kafka版本:0.9.0
前面笔者写了一篇文章一文讲清粘包拆包 全面的讲解了TCP粘包拆包相关的问题。下面进行一个简单的总结:
TCP粘包拆包产生的原因是,应用层有意义的数据包,传输层的协议并不了解其含义,不会去根据你的业务内容去分包和发送,只会按照自己的协议格式传送数据。
知道问题的本质后,解决问题就简单了。就需要在应用层收到数据后根据标识判断一下,数据是否完整,如果完整了我们再进行数据包解析,最后交给业务代码处理。
解决粘包拆包问题的方法:
(1)消息定长;
(2)增加特殊字符进行分割,比如每条数据末尾都添加一个换行符;
(3)自定义协议,例如 len + data,其中len是代表data的字节长度;
kakfa是如何解决粘包拆包问题的呢?
首先看粘包,也就是接收到了多余的数据,该如何拆分数据包,读取到正确完整的数据包?
kafka使用到是上面的第三种解决方法,自定义协议格式。
kafka接收到数据包后,会进行这些操作:
- 先读取前4字节,转换为一个int,即长度;
- 根据长度申请内存buffer;
- 最后读取指定大小的数据到申请好的buffer中;
具体代码实现在:KafkaChannel.read()
1 | arduino复制代码public NetworkReceive read() throws IOException { |
接下来,再看看拆包代码。拆包也就是接收到数据不够组成一条完整的数据,该如何等待完整的数据包
最主要的代码,在上面的receive.complete()方法中的判断逻辑。
1 | arduino复制代码public boolean complete() { |
- !size.hasRemaining():接收到的len数据已经读取完成;
- !buffer.hasRemaining():接收到的data数据已经读取完成;
两个条件同时成立,也就是说既要读取完len,也要读取完data,才算读取了完整的一条数据。
只要一条数据没读完整,那么receive.complete()函数返回值就是false,那么最终返回的结果就是null,等待下一次OP_READ事件的时候再接着上次没读完的数据读取,直到读取一条完整的数据为止。
那么这次读取的数据就会暂存起来,存入stageReceives这个数据结构中等待下一次读取。
1 | scss复制代码if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) { |
END
本文转载自: 掘金