APP下载

怎样使用Spring Boot的Kafka讯息机制实现请求响应模型

消息来源:baojiabao.com 作者: 发布时间:2026-05-20

报价宝综合消息怎样使用Spring Boot的Kafka讯息机制实现请求响应模型

Spring Apache Kafka (spring-kafka)提供了基于卡夫卡的讯息传递解决方案的高阶抽象。传统的请求响应模型中,响应容易被堵塞,造成两个系统耦合,呼叫者需要等待到响应返回才能继续做自己的工作,这在分散式系统中,流量比较大情况下几乎不现实,使用讯息模型只能每次请求一个讯息,响应再来一个讯息,用两个讯息组合成请求响应,虽然程式设计没有传统请求响应方便,但是系统松耦合,相互协调好。

spring-kafka使用起来了很简单:

引入Maven包:

org.springframework.kafka

spring-kafka

讯息生产者程式码:

@Autowired

KafkaTemplate kafkaTemplate;

public void send(@RequestParam String productId) {

kafkaTemplate.send("cdProduct", productId);

}

生产者的配置application.property:

spring.kafka.consumer.group-id=ecomm

spring.kafka.bootstrap-servers=localhost:9092

讯息接受者程式码,这里接受到productId以后,查询到Product物件,再给传送者发回去,模拟请求-响应模型:

@KafkaListener(topics = "cdProduct")

public void onAction(ConsumerRecord, ?> consumerRecord) {

System.out.printf("接受到=" + consumerRecord);

String productId = (String) consumerRecord.value();

System.out.printf("接受到productId=" + productId);

Product product = productRepo.findById(productId).orElse(new Product());

kafkaTemplate.send("cdProductReply", product);

}

消费者需要配置 JSON 序列化将Product变成JSON,这里只要配置在application.property中即可,无需做程式码生成自己ProducerFactory工厂:

spring.kafka.consumer.group-id=ecomm

spring.kafka.bootstrap-servers=localhost:9092

spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

生产者接受到消费者查询到的Product放入自己的快取。

@KafkaListener(topics = "cdProductReply")

public void onAction(Product product) {

System.out.printf("接受到新的Product" + product.getName());

cache.put(product.getId(), product);

}

为了实现Product直接序列化接受,需要在自己的入口类Application中加入:

@Bean

public StringJsonMessageConverter jsonConverter() {

return new StringJsonMessageConverter();

}

无需配置监听器连线工厂ConcurrentKafkaListenerContainerFactory即可有用。

感谢您的观看,喜欢的小伙伴可以点个赞!!!专注Java、大资料知识干货及相关领域动态分享,请多多关注哦!

2019-12-29 01:50:00

相关文章