Apache Kafka 是一个分布式流处理平台,它能够以高吞吐量的方式处理大规模数据流。作为大数据生态中的重要一员,Kafka 广泛应用于日志收集、消息系统、事件流处理等多个场景。为了高效地与 Kafka 交互,Apache 官方提供了多种语言的客户端库,其中 Java 客户端因其直接性、丰富性和广泛的使用基础而备受青睐。本章将深入介绍如何使用 Kafka Java 客户端构建生产者和消费者,以实现数据的发布和消费。
在开始之前,请确保你已经安装了 Kafka 服务器,并且 Kafka 服务正在运行。同时,你需要将 Kafka 的 Java 客户端库添加到你的项目中。如果你使用 Maven,可以在 pom.xml
文件中添加如下依赖(版本号请根据实际情况调整):
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>你的Kafka客户端版本号</version>
</dependency>
生产者负责将数据发布到 Kafka 的主题(Topic)中。构建生产者主要涉及到设置配置参数、创建生产者实例以及发送消息等步骤。
Kafka 生产者的配置项非常灵活,允许你根据具体需求调整其行为。以下是一些常见的配置项:
bootstrap.servers
:Kafka 集群的地址列表,格式为 host1:port1,host2:port2,...
。key.serializer
和 value.serializer
:用于指定键和值的序列化器,常用的有 org.apache.kafka.common.serialization.StringSerializer
。acks
:生产者收到哪些确认时认为消息发送成功,0
表示不等待任何服务器响应,1
表示等待领导节点确认,-1
或 all
表示等待所有副本确认。retries
和 retry.backoff.ms
:当消息发送失败时,重试次数和重试之间的时间间隔。batch.size
:批量发送消息时,一个批次的大小(以字节为单位)。
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
public class SimpleProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
for (int i = 0; i < 100; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", Integer.toString(i), "message-" + i);
producer.send(record, (RecordMetadata metadata, Exception e) -> {
if (e != null) {
e.printStackTrace();
} else {
System.out.printf("The offset of the record we just sent is: %d%n", metadata.offset());
}
});
}
}
}
}
消费者负责从 Kafka 主题中读取数据。与生产者类似,构建消费者也涉及配置参数的设置、消费者实例的创建以及消息的读取等步骤。
消费者同样有一系列配置项,用以调整其行为:
bootstrap.servers
:同上,Kafka 集群的地址列表。group.id
:消费者所属的消费组 ID,用于实现负载均衡和故障恢复。key.deserializer
和 value.deserializer
:键和值的反序列化器。auto.offset.reset
:当找不到消费者组的偏移量或偏移量无效时(例如,数据已被删除),如何操作。enable.auto.commit
:是否自动提交偏移量。auto.commit.interval.ms
:自动提交偏移量的时间间隔(仅当 enable.auto.commit
为 true
时有效)。
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class SimpleConsumer {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(Arrays.asList("test-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
}
}
}
}
在生产者中,除了同步发送消息外,还可以利用回调机制实现异步发送,从而提高性能。如上述生产者示例所示,通过为 send
方法提供一个回调函数,可以在消息发送成功后执行特定的逻辑。
Kafka 消费者组中的消费者会自动分配订阅主题的分区。当消费者加入或离开组时,Kafka 会触发再平衡过程,以重新分配分区。了解这一机制对于构建高可用的消费者应用至关重要。
Kafka 允许消费者控制其读取的数据偏移量。自动提交偏移量虽然简单,但在某些场景下可能会导致数据重复消费或丢失。因此,根据应用场景,你可能需要手动管理偏移量。
Kafka 从 0.11 版本开始支持事务性消息,允许生产者和消费者以事务的方式操作,确保数据的一致性和准确性。这对于需要严格数据一致性的场景非常有用。
通过本章的学习,你应该已经掌握了如何使用 Kafka Java 客户端构建生产者和消费者,并能够处理基本的消息发布和消费任务。然而,Kafka 的功能远不止于此,随着你对 Kafka 的深入了解,你将能够探索更多高级特性和最佳实践,以更好地满足你的业务需求。