博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
3-kafka0.10 生产者详解
阅读量:6081 次
发布时间:2019-06-20

本文共 3720 字,大约阅读时间需要 12 分钟。

hot3.png

Kafka在0.8.1版本的时候重写了Producer。在0.9版本中又重写了Consumer,纯Java,没有了对Scala和ZK的依赖。

一、消息的发送流程:

  • KafkaProducer:
    • 等待 topic meteData 数据的更新,序列化 key,value;
    • 根据 topic 的 partition 个数和 key 的值,计算该条消息所属的 partition,将消息 append 给 RecordAccumulator;
  • RecordAccumulator:
    • 使用Map类型的 batches ( ConcurrentMap<TopicPartition, Deque<RecordBatch>> ) 维护发向所有 topic 的消息数据;
    • append 方法内将发送的这条消息 tryAppend 进对应 Deque 最后一个 RecordBatch 中。如果空间不够,该 RecordBatch 就会 flip ByteBuffer,进入只读状态。空间不够或者失败则会在 Deque 末端尝试新起一个 RecordBatch;
  • Sender:
    • KafkaProducer 初始化的时候会启一个 KafkaThread 线程,运行 Runnable 的 Sender 对象,不停地发送 RecordAccumulator 内累积的消息;
    • 调用 RecordAccumulator 的 ready 方法收集到此次发送任务的目的地,即 Broker Leader 的列表,消息都是发送给所属 Partition 目前是 Leader 的那个 Broker 节点的;
    • 调用 NetworkClient 的 ready 方法,判断收集到的每个 Leader 节点是否是 connected 状态,否的话会被移除;
    • 调用 RecordAccumulator 的 drain 方法,获得发送给每个 Broker 节点的 RecordBatch 列表。将发往每个 Broker 节点的 RecordBatch 数据,封装成一个 ClientRequest,主要的消息内容由 RequestSend 内的 Struct 结构表示。Struct 内部已将消息按 topic 分开,并是按 kafka 消息的 schema 生成,具有如下的嵌套结构:{“acks”:1,”topic_data”:[{"topic": "xxx", "data": [{"partition": 1, "record_set": ByteBuffer}]}]}。ClientRequest 内还包括发完消息后的 CallBack 处理逻辑;
    • 遍历每个 ClientRequest,调用 NetworkClient 的 send 方法,将 RequestSend 放进 Selector.channels 内对应的 KafkaChannel 中;
    • 调用 NetworkClient 的 poll 方法,将 RequestSend 真正的发送给 Broker;
  • RecordAccumulator:
    • ready 方法中检查每个 Deque 的第一个 RecordBatch 是否是 ready 的状态,并把 RecordBatch 对应的 Broker Leader 节点收集起来好向它们发送消息。判断 RecordBatch 是否 ready 涉及到这个 bath 是否满了、距离上一次检查是否够久等。例如如果 RecordBatch 所在的 Deque 长度大于1,证明这个 RecordBatch 曾今被 append 的时候发现已经满了,现在是只读待发状态,是 ready 的。需要等待的时长受是否处在 backoff 时期,是否超过 linger 时长等影响;
    • drain 方法中遍历收集到的、 connected 状态的 Broker Leader 节点,根据每个节点下归属的 Partition 对应从 batches 中的 Deque 中取出第一个 RecordBatch,拼装成 Map<Integer, List<RecordBatch>> 的结构,key 是 Broker 节点的 id, value 是发给该节点的 RecordBatch 列表;
  • NetworkClient:
    • 使用 ClusterConnectionStates (Map<String, NodeConnectionState>) 维护着每个 Broker 节点的连接状态;
    • ready 方法中判断是否跟指定的 Broker 节点是 connected 的状态,否的话会通过 Selector 的 connect 方法初始化跟其的连接,建立 SocketChannel 并 register,KafkaChannel 会 attach 在 SelectionKey 上 ;
    • poll 方法中调用 Selector 的 poll 方法,处理 Selector 内的 completedSends,completedReceives等,处理 ClientResponse, 遍历 RecordBatch 内的List<Thunk>,完成回调逻辑的处理;
  • Selector:
    • 使用 channels (Map<String, KafkaChannel>) 维护着与每个 Broker 节点的 Channel;
    • 使用 completedSends (List<Send>)  维护着已经发送完毕的 RequestSend
    • 使用 completedReceives (List<NetworkReceive>)  维护着来自 Broker 的 response;
    • poll 方法中遍历 SelectionKey, 如果 KafkaChannel ready + SelectionKeywritable,那么就将 KafkaChannel 中的 RequestSend 发送,并维护更新 completedSends;如果 KafkaChannel ready + SelectionKey readable,那么就接受来自 Broker 的 NetworkReceive,并维护更新 completedReceives;

二、延迟与吞吐量的问题:

Case1: Producer将消息一条接一条发送到 Broker,假设发送延迟是 2ms,那么 1s 可以发送 500 条消息;

Case2: Producer将消息延迟 8ms 发送,假设 8ms 内收集到 20 条消息,那么 1s 可以发送 2000 条消息;

两个重要的参数:

batch.size:  This is an upper limit of how many messages Kafka Producer will attempt to batch before sending, specified in bytes (default is 16K bytes). Kafka may send batches before this limit is, but will always send when this limit is reached. Therefore setting this limit too low will hurt throughput without improving latency. The main reason to set this low is lack of memory – Kafka will always allocate enough memory for the entire batch size, even if latency requirements cause it to send half-empty batches.

linger.ms:  How long will the producer wait before sending in order to allow more messages to get accumulated in the same batch.  (default is 0). Sometimes we are willing to wait a bit longer in order to improve the overall throughput at the expense of a little higher latency.

三、总结:

Kafka 的 Producer 通过把将要发送的消息先放在 RecordAccumulator 的 batches 内累积一段时间,然后进行小批量提交给 Broker 的方式,减少网络往返的开销,牺牲一点latency 换取 throughput。

转载于:https://my.oschina.net/u/1024107/blog/745733

你可能感兴趣的文章
支付宝app支付
查看>>
GitHub又受攻击了
查看>>
flask权限管理
查看>>
Meteor全栈开发平台 - 不仅仅是前端
查看>>
苹果移除openssl头文件
查看>>
前端碎片知识储备
查看>>
Redisson 成为 GitHub 里星星最多的 Redis Java 客户端
查看>>
C缺陷与陷阱(C Traps and Pitfalls)学习笔记
查看>>
strtr+array_combine实现简单的敏感词过滤
查看>>
域名注册商 GoDaddy 被指悄悄在托管网站页面植入脚本
查看>>
微服务架构 - 巧妙获取被墙的Docker镜像
查看>>
java EasyExcel集成及工具类使用
查看>>
服务器故障,报警声介绍
查看>>
生鲜电商“朴朴超市”完成B1轮5500万美元融资,平均送达用时24min ...
查看>>
5G商用道阻且长,加快其进程的最佳路径是什么?
查看>>
HTA免杀
查看>>
【视频分析】大规模机器学习在爱奇艺视频分析理解中的实践
查看>>
如何用纯 CSS 创作锡纸撕开的文字效果
查看>>
服务器禁ping的好处和坏处是什么
查看>>
怎么规划一个零基础学习Unity3D的“方法”或者“流程”?
查看>>