这篇文章展示了如何配置Spring Kafka和Spring Boot以使用JSON传送讯息并以多种格式接收它们:JSON,纯字串或字节阵列。基于此配置,您还可以将Kafka生成器从传送JSON切换到其他序列化方法。
此示例应用程序还演示了同一消费组中三个Kafka消费者的使用情况,因此讯息在三者之间进行负载平衡。每个消费者实现不同的反序列化方法。
您可以了解一些Kafka概念,如Consumer Group和Topic分割槽。
多个消费者
要更好地理解配置,请检视下图。如您所见,我们建立了一个包含三个分割槽的Kafka主题。在消费者方面,只有一个应用程序,但它实现了具有相同group.id 属性的三个Kafka消费者。

当我们启动应用程序时,Kafka会为每个消费者分配一个不同的分割槽。消费者组将以负载平衡的方式接收讯息。在这篇文章的后面,如果我们让它们具有不同的组识别符号,你会看到有什么区别(如果你熟悉Kafka,你可能知道结果)。
示例用例
我们要构建的逻辑很简单。每次我们呼叫指定REST端点hello,应用程序将生成可配置数量的讯息,并使用序列号作为Kafka金钥将它们传送到同一主题,等待消费所有讯息后返回Hello Kafka!
设定Kafka和Spring Boot
首先,您需要有一个正在执行的Kafka丛集才能连线。对于这个应用程序,我将在单个节点中使用docker-compose和Kafka。这显然远不是一个生产配置,但它足以满足这篇文章的目标。
以下是docker-compose.yml配置
version: \'2\'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_AUTO_CREATE_TOPICS_ENABLE: \'false\'
请注意,我将Kafka配置为不自动建立主题(最后一行配置)。我们将在Spring Boot应用程序建立我们的主题,因为我们想要传递一些自定义配置。如果你想玩玩这些Docker影象(例如使用多个节点),请检视wurstmeister/zookeeper影象文件
要启动Kafka和Zookeeper容器,只要在上述配置目录下执行 docker-compose up
获取SpringBoot应用程序骨架的最简单方法是到start.spring.io,使用使用YAML进行配置配置application.yml:
spring:
kafka:
consumer:
group-id: tpd-loggers
auto-offset-reset: earliest
# change this property if you are using your own
# Kafka cluster or your Docker IP is different
bootstrap-servers: localhost:9092
tpd:
topic-name: advice-topic
messages-per-request: 10
第一部分属性是Spring Kafka配置:
Kafka的组标识 group-idauto-offset-reset 属性设定为earliest,这意味着当消费者没有发现偏移量(指标)时,消费者将开始从最早的讯息中读取讯息。第三行用于连线Kafka的服务器,在这种情况下,如果您使用单节点配置,则是唯一可用的服务器。请注意,如果使用预设值 localhost:9092,则此属性是多余的 。第二部分是特定于应用程序的自定义配置。我们定义Kafka主题名称以及每次执行HTTP REST请求时要传送的讯息数。
Message类
这是我们将用作Kafka讯息的Java类。这里没有什么复杂的,只是@JsonProperty 在建构函式引数中带有注释的不可变类, 因此Jackson可以正确地反序列化它。
import com.fasterxml.jackson.annotation.JsonProperty;
public class PracticalAdvice {
private final String message;
private final int identifier;
public PracticalAdvice(@JsonProperty("message") final String message,
@JsonProperty("identifier") final int identifier) {
this.message = message;
this.identifier = identifier;
}
public String getMessage() {
return message;
}
public int getIdentifier() {
return identifier;
}
@Override
public String toString() {
return "PracticalAdvice::toString() {" +
"message=\'" + message + \'\'\' +
", identifier=" + identifier +
\'}\';
}
}
Spring Boot中的Kafka Producer配置
为了简化应用程序,我们将在Spring Boot类中新增配置。最后,我们希望在此处包含生产者和消费者配置,并使用三种不同的变体进行反序列化。请记住,您可以在GitHub储存库中找到完整的源代码。
首先,让我们关注Producer配置:
@SpringBootApplication
public class KafkaExampleApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaExampleApplication.class, args);
}
@Autowired
private KafkaProperties kafkaProperties;
@Value("${tpd.topic-name}")
private String topicName;
// Producer configuration
@Bean
public Map producerConfigs() {
Map props =
new HashMap(kafkaProperties.buildProducerProperties());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
JsonSerializer.class);
return props;
}
@Bean
public ProducerFactory producerFactory() {
return new DefaultKafkaProducerFactory(producerConfigs());
}
@Bean
public KafkaTemplate kafkaTemplate() {
return new KafkaTemplate(producerFactory());
}
@Bean
public NewTopic adviceTopic() {
return new NewTopic(topicName, 3, (short) 1);
}
}
在此配置中,我们将设定应用程序的两个部分:
KafkaTemplate例项,这是我们将使用它将讯息传送到卡夫卡。我们不想使用预设版本,因此我们需要在Spring的应用程序上下文中注入我们的自定义这个例项版本。我们键入(使用泛型)KafkaTemplate以具有普通的String键和Object作为值。将Object作为值的原因是我们希望使用相同的模板传送多个物件型别。KafkaTemplate接受我们在配置中建立的ProducerFactory作为引数。我们使用的ProducerFactory是预设的,但是我们需要在这里显式配置,因为我们想要传递我们的自定义生成器配置。Producer Configuration是一个简单的键值对映。我们使用预设属性@Autowired 来获取 KafkaProperties bean,然后构建我们的map,传递生成器的预设值,并覆盖预设的Kafka键和值序列化器。生产者将使用Kafka库将键序列化为字串,StringSerializer 并且将对值执行相同的操作,但这次使用JSON JsonSerializer,在本例中由Spring Kafka提供。我们将要使用的Kafka主题。通过注入一个NewTopic 例项,我们指示Kafka的AdminClient bean(已经在上下文中)建立一个具有给定配置的主题。第一个引数是名称(advice-topic,来自app配置),第二个是分割槽数量(3),第三个引数是复制因子(一个,因为我们无论如何都使用单个节点)。关于Java的Kafka Serializers和Deserializers
Strings 的核心Kafka库(javadoc)中提供了一些基本的Serializer ,所有型别的阵列类和字节阵列,以及Spring Kafka(javadoc)提供的JSON类。
最重要的是,您可以通过实现Serializer or ExtendedSerializer或其相应的反序列化版本来建立自己的序列化器和反序列化器。这为您提供了很大的灵活性,可以优化通过Kafka传输的资料量。正如您在这些界面中看到的那样,Kafka使用普通字节阵列,因此,无论您使用何种复杂型别,都需要将其转换为byte[]。
知道这一点,你可能想知道为什么有人想要在Kafka上使用JSON。由于您将物件转换为JSON然后转换为字节阵列,因此效率非常低。但是你必须考虑这样做有两个主要优点:
JSON比人类更可读,而不是字节阵列。如果您想除错或分析Kafka主题的内容,那么它将比检视裸字节更简单。JSON是标准,而预设字节阵列序列化器依赖于程式语言实现。因此,如果要使用来自多种程式语言的讯息,则需要在所有这些语言中复制(反)序列化器逻辑。另一方面,如果您担心Kafka中的流量负载,储存或(反)序列化速度,您可能需要选择字节阵列,甚至可以选择自己的序列器/解串器实现。
使用Spring Boot和Kafka传送讯息
我们建立一个Rest Controller,并在KafkaTemplate 请求端点时通过注入来生成一些JSON讯息。
这是控制器的第一个实现,仅包含产生讯息的逻辑。
@RestController
public class HelloKafkaController {
private static final Logger logger =
LoggerFactory.getLogger(HelloKafkaController.class);
private final KafkaTemplate template;
private final String topicName;
private final int messagesPerRequest;
private CountDownLatch latch;
public HelloKafkaController(
final KafkaTemplate template,
@Value("${tpd.topic-name}") final String topicName,
@Value("${tpd.messages-per-request}") final int messagesPerRequest) {
this.template = template;
this.topicName = topicName;
this.messagesPerRequest = messagesPerRequest;
}
@GetMapping("/hello")
public String hello() throws Exception {
latch = new CountDownLatch(messagesPerRequest);
IntStream.range(0, messagesPerRequest)
.forEach(i -> this.template.send(topicName, String.valueOf(i),
new PracticalAdvice("A Practical Advice", i))
);
latch.await(60, TimeUnit.SECONDS);
logger.info("All messages received");
return "Hello Kafka!";
}
}
在建构函式中,我们传递一些配置引数和我们自定义的KafkaTemplate,以传送String键和JSON值。然后,当API客户端请求/hello 端点时,我们传送10条讯息(这是配置值),然后我们阻止执行绪最多60秒。锁存器解锁后,我们将讯息返回Hello Kafka! 给客户端。
这整个锁定的想法不是在实际应用程序中看到的模式,但它对于这个例子来说是好的。这样,您可以检查收到的邮件数量。如果您愿意,可以在接收讯息之前删除锁存器并返回“Hello Kafka!”讯息。
Kafka消费者配置
正如前面在本文中所提到的,我们希望演示使用Spring Boot和Spring Kafka进行反序列化的不同方法,同时了解当多个消费者属于同一个消费者组时,多个消费者如何以负载均衡的方式工作。
@SpringBootApplication
public class KafkaExampleApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaExampleApplication.class, args);
}
@Autowired
private KafkaProperties kafkaProperties;
@Value("${tpd.topic-name}")
private String topicName;
// Producer configuration
// omitted...
// Consumer configuration
// If you only need one kind of deserialization, you only need to set the
// Consumer configuration properties. Uncomment this and remove all others below.
// @Bean
// public Map consumerConfigs() {
// Map props = new HashMap(
// kafkaProperties.buildConsumerProperties()
// );
// props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
// StringDeserializer.class);
// props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
// JsonDeserializer.class);
// props.put(ConsumerConfig.GROUP_ID_CONFIG,
// "tpd-loggers");
//
// return props;
// }
@Bean
public ConsumerFactory consumerFactory() {
final JsonDeserializer jsonDeserializer = new JsonDeserializer();
jsonDeserializer.addTrustedPackages("*");
return new DefaultKafkaConsumerFactory(
kafkaProperties.buildConsumerProperties(), new StringDeserializer(), jsonDeserializer
);
}
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory =
new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
return factory;
}
// String Consumer Configuration
@Bean
public ConsumerFactory stringConsumerFactory() {
return new DefaultKafkaConsumerFactory(
kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new StringDeserializer()
);
}
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerStringContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory =
new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(stringConsumerFactory());
return factory;
}
// Byte Array Consumer Configuration
@Bean
public ConsumerFactory byteArrayConsumerFactory() {
return new DefaultKafkaConsumerFactory(
kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new ByteArrayDeserializer()
);
}
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerByteArrayContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory =
new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(byteArrayConsumerFactory());
return factory;
}
}
这种配置可能看起来很繁琐,但考虑到为了演示这三种类型的反序列化,我们重复了三次建立ConsumerFactory和KafkaListenerContainerFactory例项,以便我们可以在消费者中切换它们。
配置消费者的基本步骤是:
以类似Producer的方式设定Consumer属性。我们可以跳过此步骤,因为我们需要的唯一配置是Spring Boot属性档案中指定的Group ID,以及我们将在建立自定义消费者和KafkaListener工厂,从而实现自定义的键和值反序列化器。如果您只需要一个配置,意味着始终使用相同型别的Key和Value反序列化器,那么被注释掉的程式码块符合你这一需求,可将反序列化器型别调整为你要使用的型别。要使用的KafkaListenerContainerFactory就要建立ConsumerFactory, 我们建立三个,在每种情况下将反序列化器切换为1)JSON反序列化器,2)字串反序列化器和3)字节阵列反序列化器。[list=1]请注意,在建立JSON反序列化器之后,我们将包含一个额外的步骤来指定我们信任所有包。如果需要,您可以在应用程序中对其进行微调。如果我们不这样做,我们将收到一条错误讯息,上面写着:java.lang.IllegalArgumentException: The class [] is not in the trusted packages。使用先前配置的Consumer Factory构造KafkaListenerContainerFactory(并发容器工厂)。同样,我们这样做三次,每个例项使用不同的一个。使用Spring Boot和Kafka以JSON,String和byte []格式接收讯息
现在是时候展示Kafka消费者的样子了。我们将使用@KafkaListener 注释,因为它简化了过程并负责对传递的Java型别进行反序列化。
@RestController
public class HelloKafkaController {
private static final Logger logger =
LoggerFactory.getLogger(HelloKafkaController.class);
private final KafkaTemplate template;
private final String topicName;
private final int messagesPerRequest;
private CountDownLatch latch;
public HelloKafkaController(
final KafkaTemplate template,
@Value("${tpd.topic-name}") final String topicName,
@Value("${tpd.messages-per-request}") final int messagesPerRequest) {
this.template = template;
this.topicName = topicName;
this.messagesPerRequest = messagesPerRequest;
}
@GetMapping("/hello")
public String hello() throws Exception {
latch = new CountDownLatch(messagesPerRequest);
IntStream.range(0, messagesPerRequest)
.forEach(i -> this.template.send(topicName, String.valueOf(i),
new PracticalAdvice("A Practical Advice", i))
);
latch.await(60, TimeUnit.SECONDS);
logger.info("All messages received");
return "Hello Kafka!";
}
@KafkaListener(topics = "advice-topic", clientIdPrefix = "json",
containerFactory = "kafkaListenerContainerFactory")
public void listenAsObject(ConsumerRecord cr,
@Payload PracticalAdvice payload) {
logger.info("Logger 1 [JSON] received key {}: Type [{}] | Payload: {} | Record: {}", cr.key(),
typeIdHeader(cr.headers()), payload, cr.toString());
latch.countDown();
}
@KafkaListener(topics = "advice-topic", clientIdPrefix = "string",
containerFactory = "kafkaListenerStringContainerFactory")
public void listenasString(ConsumerRecord cr,
@Payload String payload) {
logger.info("Logger 2 [String] received key {}: Type [{}] | Payload: {} | Record: {}", cr.key(),
typeIdHeader(cr.headers()), payload, cr.toString());
latch.countDown();
}
@KafkaListener(topics = "advice-topic", clientIdPrefix = "bytearray",
containerFactory = "kafkaListenerByteArrayContainerFactory")
public void listenAsByteArray(ConsumerRecord cr,
@Payload byte[] payload) {
logger.info("Logger 3 [ByteArray] received key {}: Type [{}] | Payload: {} | Record: {}", cr.key(),
typeIdHeader(cr.headers()), payload, cr.toString());
latch.countDown();
}
private static String typeIdHeader(Headers headers) {
return StreamSupport.stream(headers.spliterator(), false)
.filter(header -> header.key().equals("__TypeId__"))
.findFirst().map(header -> new String(header.value())).orElse("N/A");
}
}
这里有三个消费者。首先,让我们描述@KafkaListener 注释的引数:
所有消费者都使用相同的主题advice-topic。此引数是必需的。引数clientIdPrefix 是可选的。我在这里使用它,让日志更人性化。您将知道哪个消费者通过其名称字首做什么。卡夫卡将附加一个这个字首的数字。containerFactory 引数是可选的,您还可以依赖命名约定。如果不指定它,它将查询具有名称的bean kafkaListenerContainerFactory,这也是Spring Boot在自动配置Kafka时使用的预设名称。您也可以使用相同的名称覆盖它(尽管对于不了解约定的人来说它看起来很神奇)。我们需要明确地设定它,因为我们想为每个监听器使用不同的一个,以便能够使用不同的反序列化器。请注意,传递给所有消费者的第一个引数是相同的:一个 ConsumerRecord和@Payload,如果我们使用第一个,则第二个 是多余的。我们可以访问ConsumerRecord的方法value() 获得Payload,但我这里写在这里,让你看到它是多么简单直接通过反序列化得到的Payload。
Kafka中的TypeId标头
标头 __TypeId__预设情况下由Kafka库自动设定。我这里使用工具方法typeIdHeader 获得字串,因为从ConsumerRecord的toString() 方法只能看到字节组输出。
TypeId标头可用于反序列化,因此您可以找到要将资料对映到的型别。但是JSON反序列化却不需要它,因为这个特殊的反序列化器是由Spring团队制作,并且它们从方法的引数中推断出型别。
执行
现在我们完成了Kafka生产者和消费者,我们可以执行Kafka和Spring Boot应用程序:
$ docker-compose up -d
Starting kafka-example_zookeeper_1 ... done
Starting kafka-example_kafka_1 ... done
$ mvn spring-boot:run
pring Boot应用程序启动,消费者在Kafka中注册,Kafka为它们分配了一个分割槽。我们使用三个分割槽配置主题,因此每个消费者都会分配其中一个分割槽。
[Consumer clientId=string-0, groupId=tpd-loggers] Successfully joined group with generation 28
[Consumer clientId=string-0, groupId=tpd-loggers] Setting newly assigned partitions [advice-topic-2]
[Consumer clientId=bytearray-0, groupId=tpd-loggers] Successfully joined group with generation 28
[Consumer clientId=bytearray-0, groupId=tpd-loggers] Setting newly assigned partitions [advice-topic-0]
[Consumer clientId=json-0, groupId=tpd-loggers] Successfully joined group with generation 28
[Consumer clientId=json-0, groupId=tpd-loggers] Setting newly assigned partitions [advice-topic-1]
partitions assigned: [advice-topic-1]
partitions assigned: [advice-topic-2]
partitions assigned: [advice-topic-0]
我们现在可以尝试对服务进行HTTP呼叫。您可以使用浏览器curl,例如:
curl localhost:8080/hello
注意日志中的输出。
以上源代码: Kafka and Spring Boot Example.
解释
Kafka对讯息的key进行杂凑化(key是一个简单的字串识别符号),并根据key将讯息放入不同的分割槽。每个使用者在其分配的分割槽中获取讯息,并使用其反序列化器将其转换为Java物件。请记住,我们的生产者总是传送JSON值。
正如您在日志中看到的,每个反序列化器都设法完成其任务,因此String消费者打印原始JSON讯息,字节阵列消费者显示JSON字串的字节表示,JSON反序列化器使用Java型别对映器进行转换到原来的类PracticalAdvice。您可以检视记录的ConsumerRecord,您将看到标题,指定的分割槽,偏移量等。
这就是你如何使用Spring Boot和Kafka传送和接收JSON讯息。我希望您发现本指南很有用,下面您有一些程式码变体,以便您可以更多地了解Kafka的工作原理。
多次请求/hello
发出一些请求,然后检视讯息如何跨分割槽分布。具有相同金钥的Kafka讯息始终放在相同的分割槽中。当您希望确保指定使用者、程序或正在处理的任何逻辑的所有讯息都由同一消费者以与生成时相同的顺序接收时,此功能非常有用(在事件溯源EventSourcing时实现事件顺序,从而实现事务一致性很需要),这里就不考虑负载平衡了。
减少分割槽数量
首先,确保重新启动Kafka,这样您就可以放弃以前的配置。
然后,在应用程序中重新定义主题,使其只有2个分割槽:
@Bean
public NewTopic adviceTopic() {
return new NewTopic(topicName, 2, (short) 1);
}
现在,再次执行应用程序并向/hello 端点发出请求。
结果:其中一个消费者没有收到任何讯息。这是预期的行为,因为同一消费者组中没有可用的分割槽(我们只设置了2个分割槽)。
更改一个消费者的组识别符号
保留上一个案例的更改,该主题现在只有2个分割槽。我们现在正在改变我们的一个消费者的群组ID。
@KafkaListener(topics = "advice-topic", clientIdPrefix = "bytearray",
containerFactory = "kafkaListenerByteArrayContainerFactory",
groupId = "tpd-loggers-2")
public void listenAsByteArray(ConsumerRecord cr,
@Payload byte[] payload) {
logger.info("Logger 3 [ByteArray] received a payload with size {}", payload.length);
latch.countDown();
请注意,我们还更改了记录的讯息。现在,这个消费者负责打印payload有效载荷的大小。此外,我们需要更改CountDownLatch 它,因此它需要两倍的讯息数。
latch = new CountDownLatch(messagesPerRequest * 2);
为什么?这一次,让我们解释在执行应用程序之前会发生什么。正如我在本文开头所描述的那样,当消费者属于同一个消费者群体时,他们(在概念上)正在处理同一个任务。我们正在实现一种负载均衡机制,其中并发工作程式从会不同分割槽获取讯息,处理的讯息是彼此隔离的。
在这个例子中,我还改变了最后一个消费者的“任务”,以便更好地理解这一点:它打印的是不同的东西。由于我们更改了组ID,因此该消费者将独立工作,Kafka将为其分配两个分割槽。字节阵列消费者将接收所有讯息,与其他两个讯息分开工作。
感谢您的观看,喜欢的小伙伴可以点个赞!!!专注Java、大资料知识干货及相关领域动态分享,请多多关注哦!





























