Spring-Springkafka基本用法

简介

跟 Spring Data Redis、Spring Data MongoDB、Spring Data JPA 等项目类似,Spring Kafka 提供了在 Spring 应用中通过简单配置从而访问 Kafka 集群的途径。

本文主要介绍在 Spring 应用中消息生产者如何向 Kafka 集群发送消息、消息消费者如何消费消息如何批量消费消息以及多消费者组同时消费消息等等。

使用 Spring Kafka 的最新特性,以下测试代码采用了 Spring Boot 2.0.0 构建

Spring Kafka 的基本用法

在 pom.xml 中添加依赖:

1
2
3
4
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

基本配置

springBoot properties
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#kafka,更多配置:org.springframework.boot.autoconfigure.kafka.KafkaProperties
#指定kafka 代理地址,可以多个
spring.kafka.bootstrap-servers=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
#指定默认topic id
spring.kafka.template.default-topic=topic-test
#指定listener 容器中的线程数,用于提高并发量
spring.kafka.listener.concurrency=3

#=============== provider =======================
#生产者重试次数
spring.kafka.producer.retries=0
#每次批量发送消息的数量 16K
spring.kafka.producer.batch-size=16384
# 32M
spring.kafka.producer.buffer-memory=33554432
#指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer =======================
#指定默认消费者group id
spring.kafka.consumer.group-id=myGroup1
#若设置为earliest,那么会从头开始读partition
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
普通 Maven 构建项目,或者想要自定义更多配置,可以采用 JavaConfig 配置
    /**
     *  生产者配置信息
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = Maps.newHashMap();
        props.put(ProducerConfig.ACKS_CONFIG, "0");
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    /**
     *  生产者工厂
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     *  生产者模板
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     *  消费者配置信息
     */
    @Bean
     public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsConfig.getBroker());