首页
技术小册
AIGC
面试刷题
技术文章
MAGENTO
云计算
视频课程
源码下载
PDF书籍
「涨薪秘籍」
登录
注册
01 | 消息引擎系统ABC
02 | 一篇文章带你快速搞定Kafka术语
03 | Kafka只是消息引擎系统吗?
04 | 我应该选择哪种Kafka?
05 | 聊聊Kafka的版本号
06 | Kafka线上集群部署方案怎么做?
07 | 最最最重要的集群参数配置
09 | 生产者消息分区机制原理剖析
10 | 生产者压缩算法面面观
11 | 无消息丢失配置怎么实现?
12 | 客户端都有哪些不常见但是很高级的功能?
13 | Java生产者是如何管理TCP连接的?
14 | 幂等生产者和事务生产者是一回事吗?
15 | 消费者组到底是什么?
16 | 揭开神秘的“位移主题”面纱
17 | 消费者组重平衡能避免吗?
18 | Kafka中位移提交那些事儿
19 | CommitFailedException异常怎么处理?
20 | 多线程开发消费者实例
21 | Java 消费者是如何管理TCP连接的?
22 | 消费者组消费进度监控都怎么实现?
23 | Kafka副本机制详解
24 | 请求是怎么被处理的?
25 | 消费者组重平衡全流程解析
26 | 你一定不能错过的Kafka控制器
27 | 关于高水位和Leader Epoch的讨论
28 | 主题管理知多少?
29 | 熟悉Kafka动态配置
30 | 怎么重设消费者组位移?
31 | 常见工具脚本大汇总
32 | KafkaAdminClient:Kafka的运维利器
33 | Kafka认证机制用哪家?
34 | 云环境下的授权该怎么做?
35 | 跨集群备份解决方案MirrorMaker
36 | 你应该怎么监控Kafka?
37 | 主流的Kafka监控框架
38 | 调优Kafka,你做到了吗?
39 | 从0搭建基于Kafka的企业级实时日志流处理平台
40 | Kafka Streams与其他流处理平台的差异在哪里?
41 | Kafka Streams DSL开发实例
42 | Kafka Streams在金融领域的应用
当前位置:
首页>>
技术小册>>
Kafka核心技术与实战
小册名称:Kafka核心技术与实战
### 第十一章 无消息丢失配置怎么实现? 在分布式消息队列系统中,Apache Kafka以其高吞吐量、低延迟和可扩展性著称,广泛应用于日志收集、事件流处理、消息传递等多个场景。然而,确保消息在生产、传输、消费过程中不丢失,是Kafka应用开发中极为关键的一环。本章将深入探讨如何在Kafka中配置和实现无消息丢失的策略,涵盖生产者、Broker(服务器)、消费者三方面的配置与优化。 #### 11.1 引言 消息丢失可能发生在Kafka系统的多个环节:生产者发送消息失败、Broker存储失败、消费者处理消息前丢失或处理后未确认。为实现无消息丢失,我们需要从这三个维度出发,合理配置Kafka及其客户端,并考虑系统容错和恢复机制。 #### 11.2 生产者配置 ##### 11.2.1 启用acks Kafka生产者发送消息时,可以通过`acks`参数控制消息确认的级别。要确保消息不丢失,应将`acks`设置为`all`(或`-1`),这意味着只有当所有副本都成功写入消息后,生产者才会收到来自服务器的确认。 ```properties acks=all ``` ##### 11.2.2 设置retries和retry.backoff.ms 在网络波动或Kafka集群负载较高时,消息发送可能会暂时失败。通过设置`retries`(重试次数)和`retry.backoff.ms`(重试间隔),生产者可以在遇到临时故障时自动重试发送消息。 ```properties retries=10 retry.backoff.ms=100 ``` ##### 11.2.3 使用同步发送 Kafka生产者提供了同步(send)和异步(sendCallback)两种发送消息的方式。为确保每条消息都得到确认,推荐使用同步发送方式。 ```java producer.send(record).get(); // 同步发送并等待结果 ``` ##### 11.2.4 设置batch.size和linger.ms 合理设置`batch.size`(批次大小)和`linger.ms`(等待时间)可以在吞吐量和消息延迟之间找到平衡点,同时也有助于减少因网络问题导致的消息丢失。较大的批次和较长的等待时间可以提高吞吐量,但可能增加消息在内存中等待的风险。 ```properties batch.size=16384 linger.ms=5 ``` #### 11.3 Broker配置 ##### 11.3.1 复制因子 提高消息的可靠性最直接的方式是增加副本数量。Kafka中的每个主题(Topic)分区都可以配置多个副本,通过`replication.factor`设置。确保`replication.factor`至少为3,以提高容错能力。 ```bash bin/kafka-topics.sh --create --topic my-topic --replication-factor 3 --partitions 1 --bootstrap-server localhost:9092 ``` ##### 11.3.2 最小同步副本数 通过`min.insync.replicas`参数设置分区必须有多少个同步副本才能继续接收新的写入请求。这可以防止因副本不足而导致的消息丢失。 ```properties min.insync.replicas=2 ``` ##### 11.3.3 清理策略 Kafka的日志清理策略(如`log.cleanup.policy`)默认是`delete`,即根据`log.retention.hours`或`log.retention.bytes`等参数自动删除旧数据。确保这些参数设置合理,避免意外删除重要消息。 ```properties log.cleanup.policy=delete log.retention.hours=168 log.retention.bytes=1073741824 ``` #### 11.4 消费者配置 ##### 11.4.1 启用自动提交偏移量(谨慎使用) Kafka消费者支持自动和手动提交偏移量。虽然自动提交简化了编程模型,但可能导致消息重复消费或丢失(如消费者崩溃前未成功提交偏移量)。在追求无消息丢失的场景下,建议谨慎使用自动提交,或结合手动提交确保数据一致性。 ```properties enable.auto.commit=false ``` ##### 11.4.2 手动提交偏移量 在消费者处理完消息后,应手动提交偏移量以确保Kafka知道哪些消息已被成功处理。可以选择同步或异步提交方式,根据具体需求决定。 ```java consumer.commitSync(); // 同步提交 // 或 consumer.commitAsync(new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { if (exception != null) { // 处理提交失败情况 } } }); ``` ##### 11.4.3 消费者组管理 确保消费者组中的每个消费者都能稳定运行,避免频繁重启导致的消息处理中断。同时,合理设置消费者数量,避免单个消费者过载或闲置。 #### 11.5 监控与日志 ##### 11.5.1 监控Kafka指标 使用Kafka自带的JMX监控、Prometheus等监控工具,实时关注Broker的负载情况、消息堆积情况等关键指标,及时发现并解决问题。 ##### 11.5.2 启用详细日志 在开发和调试阶段,可以启用Kafka的详细日志记录,帮助定位消息丢失的原因。在生产环境中,应适当调整日志级别,避免过多日志影响系统性能。 #### 11.6 容错与恢复 ##### 11.6.1 备份与恢复 定期备份Kafka集群的数据和配置,以便在发生严重故障时能够快速恢复。同时,制定详尽的灾难恢复计划,确保在关键时刻能够迅速响应。 ##### 11.6.2 集群扩展与缩容 随着业务的发展,Kafka集群可能需要扩展或缩容。在操作过程中,应确保数据的一致性和服务的连续性,避免因操作不当导致的消息丢失。 #### 11.7 实战案例 假设一个金融交易系统使用Kafka作为消息队列,要求确保所有交易信息无丢失。在配置Kafka时,可以采取以下策略: - 生产者:设置`acks=all`,`retries=10`,`retry.backoff.ms=100`,使用同步发送方式,并合理设置`batch.size`和`linger.ms`。 - Broker:为每个主题设置`replication.factor=3`,`min.insync.replicas=2`,并确保日志清理策略合理。 - 消费者:禁用自动提交偏移量,采用手动同步提交偏移量,确保消息处理完成后才更新偏移量。 - 监控与日志:启用JMX监控,设置合理的日志级别,定期备份数据。 - 容错与恢复:制定详尽的灾难恢复计划,确保在集群扩展或缩容时数据的一致性和服务的连续性。 #### 11.8 总结 实现Kafka无消息丢失配置是一个涉及生产者、Broker、消费者三方面配置与优化的复杂过程。通过合理配置`acks`、`retries`、`batch.size`等参数,确保消息在生产阶段不被丢失;通过提高复制因子、设置最小同步副本数等策略,增强Broker的容错能力;通过禁用自动提交偏移量、采用手动提交偏移量等方式,确保消息在消费阶段不被遗漏。同时,通过监控与日志、容错与恢复等措施,进一步提高Kafka系统的稳定性和可靠性。
上一篇:
10 | 生产者压缩算法面面观
下一篇:
12 | 客户端都有哪些不常见但是很高级的功能?
该分类下的相关小册推荐:
Kafka面试指南
Kafka核心源码解读
kafka入门到实战
Kafka 原理与源码精讲
消息队列入门与进阶