一光年

[Spring-Kafka] SpringBoot上使用Kafka的最简Demo

2019.09.06

1. 安装和启动Kafka

参见前篇

2. 加入Maven依赖

在pom.xml中加入kafka相关dependency。

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

3. 配置kafka

修改application.properties,加入以下内容

...

# kafka配置
spring.kafka.bootstrap-servers=127.0.0.1:9092
spring.kafka.consumer.group-id=kafka-test
spring.kafka.consumer.auto-offset-reset=earliest

4. 新建-消息发送类

新建类KafkaProducer.java

...

@Component
public class KafkaProducer {
  
    @Autowired
    private KafkaTemplate template;

    /**
     * 发送消息
     * @param topic 主题
     * @param content 内容
     * @throws NrException 异常
     */
    public void sendMessage(String topic, String content) throws Exception {

        try {
            template.send(topic, content);
        } catch (Exception e) {
            log.error("发送KAFKA消息失败,内容=" + content);
            throw new Exception("发送KAFKA消息失败");
        }
    }
}

5. 新建-测试Controller

Controller代码分两部分,一部分调用KafkaProducer来发送消息,一部分用来接受消息。

...

@RestController
@RequestMapping("/kafka")
@Slf4j
public class KafkaTestController {

    @Autowired
    private KafkaProducer kafkaProducer;

    @GetMapping("")
    public void testKafka() throws Exception {

        // 发送producer消息
        kafkaProducer.sendMessage("kafka-test", "Test Content");
    }

    @KafkaListener(topics="kafka-test3")
    public void handleReceiveKafkaMsg(ConsumerRecord<String, String> record) throws Exception {
    
        // 接受消息
        String content = record.value();

        log.info("接收到Kafka消息,内容= " + content);
    }
}

## 6. 测试

启动SpringBoot应用,打开浏览器输入127.0.0.1:8080/kafka,从运行log中看到以下内容即算成功:
> 接收到Kafka消息,内容= Test Content