Spring-Springkafka基本用法
简介
跟 Spring Data Redis、Spring Data MongoDB、Spring Data JPA 等项目类似,Spring Kafka 提供了在 Spring 应用中通过简单配置从而访问 Kafka 集群的途径。
本文主要介绍在 Spring 应用中消息生产者如何向 Kafka 集群发送消息、消息消费者如何消费消息、如何批量消费消息以及多消费者组同时消费消息等等。
使用 Spring Kafka 的最新特性,以下测试代码采用了 Spring Boot 2.0.0 构建
Spring Kafka 的基本用法
在 pom.xml 中添加依赖:
1 | <dependency> |
基本配置
springBoot properties
1 | #kafka,更多配置:org.springframework.boot.autoconfigure.kafka.KafkaProperties |
普通 Maven 构建项目,或者想要自定义更多配置,可以采用 JavaConfig 配置
/**
* 生产者配置信息
*/
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = Maps.newHashMap();
props.put(ProducerConfig.ACKS_CONFIG, "0");
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.RETRIES_CONFIG, retries);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
/**
* 生产者工厂
*/
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
/**
* 生产者模板
*/
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
/**
* 消费者配置信息
*/
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsConfig.getBroker());