@Override public MessageDecoderResult decodable(IoSession session, IoBuffer buf) { int remain = buf.remaining(); if(remain <= 1){ return MessageDecoderResult.NOT_OK; } buf.mark(); byte[] data = new byte[remain - 2]; buf.get(data); byte cs = buf.get(); int t = 0; for( int i =0; i< data.length; i ++){ t += data[i]; } int cs_cal = (t%256) ; //如果校验码校验不通过,则为无效消息 if ( (cs & 0xFF) != (cs_cal & 0xFF)){ buf.reset(); return MessageDecoderResult.NOT_OK; } buf.reset(); return MessageDecoderResult.OK; }
public MessageDecoderResult decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws ProtocolCodecException { int messageCount = 0; while (parseMessage(in, out)) { messageCount++; } if (messageCount > 0) { // Mina will compact the buffer because we can't detect a header if (in.remaining() < HEADER_PATTERN.length) { position = 0; } return MessageDecoderResult.OK; } else { // Mina will compact the buffer position -= in.position(); return MessageDecoderResult.NEED_DATA; } }
/** * 解码并处理断包 */ @Override public MessageDecoderResult decode(IoSession session, IoBuffer buffer, ProtocolDecoderOutput output) throws Exception { int nRemainning = buffer.remaining(); if (nRemainning < MessageCodec.HEAD_LENGTH) {// min length return MessageDecoderResult.NEED_DATA; } else { buffer.mark();// 标记位置mark=pos int nLen = buffer.getShort();// 包长(包括消息头) buffer.reset();// 重置位置pos=mark // 如果buffer中可读的长度小于包长说明断包返回 NEED_DATA if (nRemainning < nLen) { // buffer.reset();//重置位置pos=mark return MessageDecoderResult.NEED_DATA; } // buffer.reset();//重置位置pos=mark } Object proObj = decodeBody(session, buffer); output.write(proObj);// 解码后输出 return MessageDecoderResult.OK; }
protected MessageDecoderResult decodeCommonHeader(AbstractMessage message, IoBuffer in) { //Common decoding part if (in.remaining() < 2) { return NEED_DATA; } byte h1 = in.get(); byte messageType = (byte) ((h1 & 0x00F0) >> 4); boolean dupFlag = ((byte) ((h1 & 0x0008) >> 3) == 1); byte qosLevel = (byte) ((h1 & 0x0006) >> 1); boolean retainFlag = ((byte) (h1 & 0x0001) == 1); int remainingLength = Utils.decodeRemainingLenght(in); if (remainingLength == -1) { return NEED_DATA; } message.setMessageType(messageType); message.setDupFlag(dupFlag); message.setQos(AbstractMessage.QOSType.values()[qosLevel]); message.setRetainFlag(retainFlag); message.setRemainingLength(remainingLength); return OK; }
public MessageDecoderResult decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { //Common decoding part UnsubscribeMessage message = new UnsubscribeMessage(); if (decodeCommonHeader(message, in) == NEED_DATA) { return NEED_DATA; } //check qos level if (message.getQos() != QOSType.LEAST_ONE) { return NOT_OK; } int start = in.position(); //read messageIDs message.setMessageID(Utils.readWord(in)); int readed = in.position() - start; while (readed < message.getRemainingLength()) { message.addTopic(Utils.decodeString(in)); readed = in.position() - start; } out.write(message); return OK; }
public MessageDecoderResult decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { //Common decoding part SubscribeMessage message = new SubscribeMessage(); if (decodeCommonHeader(message, in) == NEED_DATA) { return NEED_DATA; } //check qos level if (message.getQos() != QOSType.LEAST_ONE) { return NOT_OK; } int start = in.position(); //read messageIDs message.setMessageID(Utils.readWord(in)); int readed = in.position() - start; while (readed < message.getRemainingLength()) { decodeSubscription(in, message); readed = in.position() - start; } out.write(message); return OK; }
static MessageDecoderResult checkDecodable(byte type, IoBuffer in) { if (in.remaining() < 1) { return MessageDecoderResult.NEED_DATA; } byte h1 = in.get(); byte messageType = (byte) ((h1 & 0x00F0) >> 4); int remainingLength = Utils.decodeRemainingLenght(in); if (remainingLength == -1) { return MessageDecoderResult.NEED_DATA; } //check remaining length if (in.remaining() < remainingLength) { return MessageDecoderResult.NEED_DATA; } return messageType == type ? MessageDecoderResult.OK : MessageDecoderResult.NOT_OK; }
public MessageDecoderResult decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { //Common decoding part SubAckMessage message = new SubAckMessage(); if (decodeCommonHeader(message, in) == NEED_DATA) { return NEED_DATA; } int remainingLength = message.getRemainingLength(); //MessageID message.setMessageID(Utils.readWord(in)); remainingLength -= 2; //Qos array if (in.remaining() < remainingLength ) { return NEED_DATA; } for (int i = 0; i < remainingLength; i++) { byte qos = in.get(); message.addType(QOSType.values()[qos]); } out.write(message); return OK; }
@Test public void testBadFlagUserPwd() throws UnsupportedEncodingException, Exception { m_buff = IoBuffer.allocate(14); m_buff.clear().put((byte)(AbstractMessage.CONNECT << 4)).put((byte)12); //Proto name encodeString(m_buff, "MQIsdp"); //version m_buff.put((byte)3); //conn flags m_buff.put((byte)0x4E); //sets user to false and password to true //keepAlive m_buff.put((byte)0).put((byte) 0x0A); m_buff.flip(); //Excercise MessageDecoderResult res = m_msgdec.decode(null, m_buff, m_mockProtoDecoder); assertNull(m_mockProtoDecoder.getMessage()); assertEquals(MessageDecoder.NOT_OK, res); }
@Test public void testMultiTopic() throws Exception { m_buff = IoBuffer.allocate(4).setAutoExpand(true); String topic1 = "a/b"; String topic2 = "c/d/e"; initMultiTopic(m_buff, 123, topic1, topic2); m_buff.flip(); //Excercise MessageDecoderResult res = m_msgdec.decode(null, m_buff, m_mockProtoDecoder); //Verify assertEquals(MessageDecoderResult.OK, res); assertEquals(2, m_mockProtoDecoder.getMessage().topics().size()); assertEquals(topic1, m_mockProtoDecoder.getMessage().topics().get(0)); assertEquals(topic2, m_mockProtoDecoder.getMessage().topics().get(1)); assertEquals(AbstractMessage.UNSUBSCRIBE, m_mockProtoDecoder.getMessage().getMessageType()); }
@Test public void testDecodeSingleTopic_bug() throws Exception { //A2 0C 00 01 00 06 2F 74 6F 70 69 63 //12 byte byte[] overallMessage = new byte[] {(byte)0xA2, 0x0A, //fixed header 0x00, 0x01, //MSG ID 0x00, 0x06, 0x2F, 0x74, 0x6F, 0x70, 0x69, 0x63}; //"/topic" string m_buff = IoBuffer.allocate(overallMessage.length).setAutoExpand(true); m_buff.put(overallMessage); m_buff.flip(); MessageDecoderResult res = m_msgdec.decode(null, m_buff, m_mockProtoDecoder); assertNotNull(m_mockProtoDecoder.getMessage()); UnsubscribeMessage message = (UnsubscribeMessage) m_mockProtoDecoder.getMessage(); // assertEquals(0x0A, message.getMessageID()); // assertEquals(1, message.types().size()); // assertEquals(AbstractMessage.QOSType.LEAST_ONE, message.types().get(0)); }
@Test public void testBadQos() throws Exception { initHeaderQos(m_buff, 0xAABB, QOSType.LEAST_ONE, QOSType.MOST_ONE, QOSType.MOST_ONE); m_buff.flip(); //Excercise MessageDecoderResult res = m_msgdec.decode(null, m_buff, m_mockProtoDecoder); //Verify assertEquals(MessageDecoderResult.OK, res); assertEquals(0xAABB, m_mockProtoDecoder.getMessage().getMessageID().intValue()); List<QOSType> qoses = m_mockProtoDecoder.getMessage().types(); assertEquals(3, qoses.size()); assertEquals(QOSType.LEAST_ONE, qoses.get(0)); assertEquals(QOSType.MOST_ONE, qoses.get(1)); assertEquals(QOSType.MOST_ONE, qoses.get(2)); assertEquals(AbstractMessage.SUBACK, m_mockProtoDecoder.getMessage().getMessageType()); }
@Test public void testBugBadRemainingCalculation() throws Exception { byte[] overallMessage = new byte[] {(byte)0x90, 0x03, //fixed header 0x00, 0x0A, //MSG ID 0x01}; //QoS array m_buff = IoBuffer.allocate(overallMessage.length).setAutoExpand(true); m_buff.put(overallMessage); m_buff.flip(); //Exercise MessageDecoderResult res = m_msgdec.decode(null, m_buff, m_mockProtoDecoder); assertNotNull(m_mockProtoDecoder.getMessage()); assertEquals(MessageDecoder.OK, res); SubAckMessage message = (SubAckMessage) m_mockProtoDecoder.getMessage(); assertEquals(0x0A, message.getMessageID().intValue()); assertEquals(1, message.types().size()); assertEquals(AbstractMessage.QOSType.LEAST_ONE, message.types().get(0)); }
@Test public void testMultiTopic() throws Exception { m_buff = IoBuffer.allocate(4).setAutoExpand(true); Couple c1 = new Couple((byte)2, "a/b"); Couple c2 = new Couple((byte)1, "c/d/e"); initMultiTopic(m_buff, 123, c1, c2); m_buff.flip(); //Excercise MessageDecoderResult res = m_msgdec.decode(null, m_buff, m_mockProtoDecoder); //Verify assertEquals(MessageDecoderResult.OK, res); assertEquals(2, m_mockProtoDecoder.getMessage().subscriptions().size()); assertEquals(AbstractMessage.SUBSCRIBE, m_mockProtoDecoder.getMessage().getMessageType()); }
@Test public void testHeaderWithMessageID_Payload() throws Exception { m_buff = IoBuffer.allocate(14).setAutoExpand(true); int messageID = 123; byte[] payload = new byte[]{0x0A, 0x0B, 0x0C}; initHeaderWithMessageID_Payload(m_buff, messageID, payload); m_buff.flip(); //Exercise MessageDecoderResult res = m_msgdec.decode(null, m_buff, m_mockProtoDecoder); assertNotNull(m_mockProtoDecoder.getMessage()); assertEquals(MessageDecoder.OK, res); assertEquals("Fake Topic", m_mockProtoDecoder.getMessage().getTopicName()); assertEquals(messageID, (int) m_mockProtoDecoder.getMessage().getMessageID()); TestUtils.verifyEquals(payload, m_mockProtoDecoder.getMessage().getPayload()); }
@Test public void testBugOnRealCase() throws Exception { byte[] overallMessage = new byte[]{0x30, 0x17, //fixed header, 25 byte lenght 0x00, 0x06, 0x2f, 0x74, 0x6f, 0x70, 0x69, 0x63, //[/topic] string 2 len + 6 content 0x54, 0x65, 0x73, 0x74, 0x20, 0x6d, 0x79, // [Test my payload] encoding 0x20, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64}; m_buff = IoBuffer.allocate(overallMessage.length).setAutoExpand(true); m_buff.put(overallMessage); m_buff.flip(); //Exercise MessageDecoderResult res = m_msgdec.decode(null, m_buff, m_mockProtoDecoder); assertNotNull(m_mockProtoDecoder.getMessage()); assertEquals(MessageDecoder.OK, res); }
@Test public void testDecodeBigContent() throws Exception { int size = 129; IoBuffer payload = TestUtils.generateRandomPayload(size); IoBuffer firstPublish = generatePublishQoS0(payload); IoBuffer secondPublish = generatePublishQoS0(TestUtils.generateRandomPayload(size)); IoBuffer doubleMessageBuf = IoBuffer.allocate(size * 2).setAutoExpand(true); doubleMessageBuf.put(firstPublish).put(secondPublish).flip(); //Exercise MessageDecoderResult res = m_msgdec.decode(null, doubleMessageBuf, m_mockProtoDecoder); assertEquals(MessageDecoder.OK, res); PublishMessage pubMsg = m_mockProtoDecoder.getMessage(); assertNotNull(pubMsg); res = m_msgdec.decode(null, doubleMessageBuf, m_mockProtoDecoder); assertNotNull(m_mockProtoDecoder.getMessage()); assertEquals(MessageDecoder.OK, res); m_buff.flip(); }
/**
 * Emits the next text message as a trimmed US-ASCII string.
 *
 * <p>Relies on the {@code length} field, which is set outside this method. A length
 * of 0 or 1 consumes a single byte and emits an empty string. Otherwise
 * {@code length} is incremented and that many bytes are read. Any exception is
 * printed and swallowed; the method returns {@code OK} in every case.
 *
 * @param session the I/O session (not used)
 * @param in      buffer to read from
 * @param out     receives the decoded string
 * @return always {@code OK}
 * @throws Exception declared by the decoder contract
 */
public MessageDecoderResult decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
    try {
        final boolean emptyMessage = (length == 0) || (length == 1);
        if (emptyMessage) {
            in.get();
            out.write("");
            return MessageDecoderResult.OK;
        }

        length++;
        final byte[] raw = new byte[length];
        int filled = 0;
        while (filled < length) {
            raw[filled] = in.get();
            filled++;
        }

        // Buffer fully consumed: the next message starts a fresh scan.
        if (in.remaining() == 0) {
            notfirstmessage = false;
        }

        out.write(new String(raw, "us-ascii").trim());
        return MessageDecoderResult.OK;
    } catch (Exception e) {
        e.printStackTrace();
    }
    return MessageDecoderResult.OK;
}
public MessageDecoderResult decodable(IoSession session, IoBuffer in) { // 长度检查 if (in.remaining() < 12) { return MessageDecoderResult.NEED_DATA; } // 控制码检查(未实现) /*byte b = in.array(); if (tag == (short) 0x0001 || tag == (short) 0x8001) { logger.info("请求标识符:" + tag); } else { logger.error("未知的解码类型...."); return MessageDecoderResult.NOT_OK; }*/ // 数据长度检查(未实现) /*int len = in.getInt(); if (in.remaining() < len) { return MessageDecoderResult.NEED_DATA; }*/ //校验代码 return MessageDecoderResult.OK; }
@Override public MessageDecoderResult decodable(IoSession session, IoBuffer in) { Context context = (Context) session.getAttribute(CONTEXT); //表示数据不够,需要读到新的数据后,再次调用decode()方法。 if(context == null){ context = new Context(); // 跳过前4字节 //in.skip(4); //获取第一个字符用于判断是否可以被当前解码器解码 context.dataType = in.getInt(); if(context.dataType == BeanUtil.UPLOAD_FILE){ System.out.println("我收到1了"); context.strLength = in.getInt(); context.byteStr = new byte[context.strLength]; context.fileSize = in.getInt(); context.byteFile = new byte[context.fileSize]; session.setAttribute(CONTEXT, context); return MessageDecoderResult.OK; }else{ return MessageDecoderResult.NOT_OK; } }else{ if(context.dataType == BeanUtil.UPLOAD_FILE){ //表示可以解码 return MessageDecoderResult.OK; }else{ //表示不能解码,会抛出异常 return MessageDecoderResult.NOT_OK; } } }
@Override public MessageDecoderResult decodable(IoSession session, IoBuffer in) { Context context = (Context) session.getAttribute(CONTEXT); if(context!=null){ if(context.limit_data){ in.buf().position(0); } } //表示数据不够,需要读到新的数据后,再次调用decode()方法。 if (in.remaining() < 2){ return MessageDecoderResult.NEED_DATA; } else{ context = new Context(); //获取一个字符表示新的数据开始 用于判断异常数据 in.skip(1);//这个数据是十六进制的 01 也就是1 //获取第一个字符用于判断是否可以被当前解码器解码 context.dataType = in.getInt(); if(context.dataType == BeanUtil.UPLOAD_STR){ //读取标题长度 context.strLength = in.getInt(); //声明数组长度 context.byteStr = new byte[context.strLength]; //System.out.println("我收到2了"); session.setAttribute(CONTEXT, context); //表示可以解码 return MessageDecoderResult.OK; }else{ //System.out.println("服务端收到意外数据"); return MessageDecoderResult.NOT_OK; } } }
@Override public MessageDecoderResult decodable(IoSession session, IoBuffer buffer) { // 如果buffer中可读的长度还没达到消息头长度返回 NEED_DATA if (buffer.remaining() < MessageCodec.HEAD_LENGTH) { return MessageDecoderResult.NEED_DATA; } else { if (!MessageCodec.checkValid(buffer.buf())) { // 不符合解析条件 return MessageDecoderResult.NOT_OK; } } return MessageDecoderResult.OK; }
/**
 * Reports whether a complete message is buffered.
 *
 * @param session the I/O session (not used)
 * @param in      buffer holding the received bytes
 * @return {@code OK} when {@code messageComplete} says so, {@code NEED_DATA} when it
 *         does not, and {@code NOT_OK} when the check throws (the stack trace is
 *         printed)
 */
public MessageDecoderResult decodable(IoSession session, IoBuffer in) {
    final boolean complete;
    try {
        complete = messageComplete(in);
    } catch (Exception ex) {
        ex.printStackTrace();
        return MessageDecoderResult.NOT_OK;
    }

    if (complete) {
        return MessageDecoderResult.OK;
    }
    return MessageDecoderResult.NEED_DATA;
}
public MessageDecoderResult decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { HttpRequestMessage m = decodeBody(in); // Return NEED_DATA if the body is not fully read. if (m == null) { return MessageDecoderResult.NEED_DATA; } out.write(m); return MessageDecoderResult.OK; }
public MessageDecoderResult decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { //Common decoding part DisconnectMessage message = new DisconnectMessage(); if (decodeCommonHeader(message, in) == NEED_DATA) { return NEED_DATA; } out.write(message); return OK; }
public MessageDecoderResult decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { //Common decoding part PingRespMessage message = new PingRespMessage(); if (decodeCommonHeader(message, in) == NEED_DATA) { return NEED_DATA; } out.write(message); return OK; }
public MessageDecoderResult decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { //Common decoding part PingReqMessage message = new PingReqMessage(); if (decodeCommonHeader(message, in) == NEED_DATA) { return NEED_DATA; } out.write(message); return OK; }
public MessageDecoderResult decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { //Common decoding part ConnAckMessage message = new ConnAckMessage(); if (decodeCommonHeader(message, in) == NEED_DATA) { return NEED_DATA; } //skip reserved byte in.skip(1); //read return code message.setReturnCode(in.get()); out.write(message); return OK; }
public MessageDecoderResult decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { LOG.debug("decode invoked with buffer " + in); int startPos = in.position(); //Common decoding part PublishMessage message = new PublishMessage(); if (decodeCommonHeader(message, in) == NEED_DATA) { LOG.info("decode ask for more data after " + in); return NEED_DATA; } int remainingLength = message.getRemainingLength(); //Topic name String topic = Utils.decodeString(in); if (topic == null) { return NEED_DATA; } message.setTopicName(topic); if (message.getQos() == QOSType.LEAST_ONE || message.getQos() == QOSType.EXACTLY_ONCE) { message.setMessageID(Utils.readWord(in)); } int stopPos = in.position(); //read the payload int payloadSize = remainingLength - (stopPos - startPos - 2) + (Utils.numBytesToEncode(remainingLength) - 1); if (in.remaining() < payloadSize) { return NEED_DATA; } byte[] b = new byte[payloadSize]; in.get(b); message.setPayload(b); out.write(message); return OK; }
public MessageDecoderResult decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception { //Common decoding part MessageIDMessage message = createMessage(); if (decodeCommonHeader(message, in) == NEED_DATA) { return NEED_DATA; } //read messageIDs message.setMessageID(Utils.readWord(in)); out.write(message); return OK; }
@Test public void testBadQos() throws Exception { m_buff = IoBuffer.allocate(2); initHeaderBadQos(m_buff); m_buff.flip(); //Excercise MessageDecoderResult res = m_msgdec.decode(null, m_buff, m_mockProtoDecoder); //Verify assertEquals(MessageDecoderResult.NOT_OK, res); }
@Test public void testHeader() throws Exception { m_buff = IoBuffer.allocate(14); int messageId = 0xAABB; initHeader(m_buff, messageId); m_buff.flip(); //Exercise MessageDecoderResult res = m_msgdec.decode(null, m_buff, m_mockProtoDecoder); assertNotNull(m_mockProtoDecoder.getMessage()); assertEquals(MessageDecoder.OK, res); assertEquals(messageId, m_mockProtoDecoder.getMessage().getMessageID().intValue()); assertEquals(AbstractMessage.PUBACK, m_mockProtoDecoder.getMessage().getMessageType()); }
@Test public void testHeader() throws Exception { m_buff = IoBuffer.allocate(14); initHeader(m_buff); m_buff.flip(); //Exercise MessageDecoderResult res = m_msgdec.decode(null, m_buff, m_mockProtoDecoder); assertNotNull(m_mockProtoDecoder.getMessage()); assertEquals(MessageDecoder.OK, res); assertEquals(ConnAckMessage.CONNECTION_ACCEPTED, m_mockProtoDecoder.getMessage().getReturnCode()); assertEquals(AbstractMessage.CONNACK, m_mockProtoDecoder.getMessage().getMessageType()); }
@Test public void testHeader() throws Exception { m_buff = IoBuffer.allocate(14); initHeader(m_buff); m_buff.flip(); //Exercise MessageDecoderResult res = m_msgdec.decode(null, m_buff, m_mockProtoDecoder); assertNotNull(m_mockProtoDecoder.getMessage()); assertEquals(MessageDecoder.OK, res); assertEquals("Fake Topic", m_mockProtoDecoder.getMessage().getTopicName()); assertNull(m_mockProtoDecoder.getMessage().getMessageID()); assertEquals(AbstractMessage.PUBLISH, m_mockProtoDecoder.getMessage().getMessageType()); }
@Test public void testHeaderWithMessageID() throws Exception { m_buff = IoBuffer.allocate(14).setAutoExpand(true); int messageID = 123; initHeaderWithMessageID(m_buff, messageID); m_buff.flip(); //Exercise MessageDecoderResult res = m_msgdec.decode(null, m_buff, m_mockProtoDecoder); assertNotNull(m_mockProtoDecoder.getMessage()); assertEquals(MessageDecoder.OK, res); assertEquals("Fake Topic", m_mockProtoDecoder.getMessage().getTopicName()); assertEquals(messageID, (int) m_mockProtoDecoder.getMessage().getMessageID()); }
/**
 * Scans the buffer, by absolute index, for the next {@code ']'} terminator.
 *
 * <p>State kept in fields: {@code flag} is the index where scanning starts (and, once
 * a terminator is found, the index of that terminator), {@code flaglast} is the
 * previous value of {@code flag}, {@code length} is the distance between the two, and
 * {@code notfirstmessage} tells whether the scan continues after an earlier
 * terminator. Exceptions raised while scanning are printed and treated as "no
 * terminator found".
 *
 * @param session the I/O session (not used)
 * @param in      buffer holding the received bytes; its position is not moved
 * @return {@code OK} when a terminator was found, otherwise {@code NEED_DATA}
 */
public MessageDecoderResult decodable(IoSession session, IoBuffer in) {
    final int readable = in.remaining();

    // Continue just past the previous terminator, or start over from index 0.
    if (notfirstmessage) {
        flag++;
    } else {
        flag = 0;
    }
    final int scanEnd = readable + flag;

    try {
        int index = flag;
        while (index < scanEnd) {
            if (in.get(index) == ']') {
                flaglast = flag;
                flag = index;
                length = flag - flaglast;
                notfirstmessage = true;
                return MessageDecoderResult.OK;
            }
            index++;
        }
    } catch (Exception e) {
        e.printStackTrace();
    }

    notfirstmessage = false;
    return MessageDecoderResult.NEED_DATA;
}
@Override public MessageDecoderResult decode(IoSession session, IoBuffer buf, ProtocolDecoderOutput out) throws Exception { CjyTcpMessage message = new CjyTcpMessage(); logger.info("收到采集仪消息" + buf.getHexDump()); buf.mark(); //帧起始符 0x68 buf.get(); // 获取设备编码 int address_i = buf.getInt(); message.setAddress(CommUtils.highAndLowAddressSwap(address_i)); //帧起始符 0x16 if ( buf.get() != 0x68){ return null; } //控制码 byte control = buf.get(); message.setD7(control >> 7); message.setD6(control >> 6); //控制码 , 协议簇 message.setCmd(control&0x0f); //数据体长度 int length = buf.get(); message.setLength(length); //数据体 byte[] data = MessageUtil.getBytes(buf, length); message.setData(data); //CRC byte crc = (buf.get()); //结束标志 int finished = buf.get(); if ( finished != 0x16 ){ return MessageDecoderResult.NOT_OK; } buf.reset(); //获取校验位前字节 byte[] cpData = MessageUtil.getBytes(buf, 8 + length); int t = 0; for( int i =0; i< cpData.length; i ++){ t += cpData[i]; } int crc_cal = (t%256) ; logger.info("获取的CRC值" + (crc & 0xFF) + ",计算的crc值:" + crc_cal + ",address:" + message.getAddress()); //如果校验码校验不通过,则为无效消息 if ( (crc & 0xFF) != (crc_cal & 0xFF)){ return MessageDecoderResult.NOT_OK; } buf.get(); buf.get(); out.write(message); return MessageDecoderResult.OK; }
@Override public MessageDecoderResult decode(IoSession session, IoBuffer in,ProtocolDecoderOutput outPut) throws Exception { //将客户端发来的对象进行封装 System.out.println("开始解码:"); Context context = (Context) session.getAttribute(CONTEXT); if(!context.init){ context.init = true; in.getInt(); in.getInt(); in.getInt(); } byte[] byteFile = context.byteFile; int count = context.count; while(in.hasRemaining()){ byte b = in.get(); if(!context.isReadName){ context.byteStr[count] = b; if(count == context.strLength-1){ context.fileName = new String(context.byteStr,BeanUtil.charset); System.out.println(context.fileName); count = -1; context.isReadName = true; } } if(context.isReadName && count != -1){ byteFile[count] = b; } // byteFile[count] = b; count++; } context.count = count; System.out.println("count:"+count); System.out.println("context.fileSize:"+context.fileSize); session.setAttribute(CONTEXT, context); if(context.count == context.fileSize){ BaseMessage message = new BaseMessage(); message.setDataType(context.dataType); FileBean bean = new FileBean(); bean.setFileName(context.fileName); bean.setFileSize(context.fileSize); bean.setFileContent(context.byteFile); message.setData(bean); outPut.write(message); context.reset(); } return MessageDecoderResult.OK; }
@Override public MessageDecoderResult decode(IoSession session, IoBuffer in,ProtocolDecoderOutput outPut) throws Exception { //将客户端发来的对象进行封装 //TODO 等待测试超长数据是否能正常解码 Context context = (Context) session.getAttribute(CONTEXT); //跳过第一个字节 if(!context.init){ in.buf().position(0);//由于分包之后mina不能让没有改变的缓冲数据返回正常,于是先移动了下游标,这里给归0 context.init = true; in.skip(1); in.getInt(); in.getInt(); } int count = context.count; //System.out.println("这里是第一次COnut:"+context.count); //System.out.println("一共有"+in.remaining()+"数据"); while (in.hasRemaining()) { //System.out.println("循环里面的Count:"+count); byte b = in.get(); if(b == 1){ //收到下一条的起始数据了,证明此条数据已经残缺 重置缓冲区 作废此信息 in.buf().position(1);//移动游标,mina不允许不使用数据就返回 context.reset();//重置 context.limit_data = true;//用于判断是否是前一次包破损后遗留下的新数据包 decodable中判断 session.setAttribute(CONTEXT, context); return MessageDecoderResult.OK;//给他返回个正常解码,然后才能继续解码 } new String(context.byteStr,BeanUtil.charset); //如果标题没读完 继续读 if(!context.isReadName){ context.byteStr[count] = b; if(count == context.strLength-1){ //标题读完 byte[]转换为字符串 context.fileName = new String(context.byteStr,BeanUtil.charset); //System.out.println(context.fileName); //count = -1; context.isReadName = true; //跳出程序 break; } } /* if(context.isReadName && count != -1){ //如果读取完了标题那么读取其他内容 //byteFile[count] = b; } //byteFile[count] = b; */ //这里并未判断是否后面还有数据就加了1 if(in.buf().position()<=in.buf().limit()) count++; } context.count = count; session.setAttribute(CONTEXT, context); //如果内容全部读完 那么就存入并返回数据 //System.out.println("Count:"+context.count+";----StrLen:"+context.strLength); if(context.count == context.strLength-1){ //这里就代表数据全部接收完了 返回出去 BaseMessage message = new BaseMessage(); message.setDataType(context.dataType); StringBean bean = new StringBean(); bean.setFileName(context.fileName); bean.setStrLength(context.strLength); message.setData(bean); outPut.write(message); //重置 context.reset(); }else{ return MessageDecoderResult.NEED_DATA; } //this.finishDecode(session, outPut); return MessageDecoderResult.OK; }