如果你的应用使用了Apache Kafka,你需要把它和Spring Cloud进行整合。需要在应用中添加如下依赖。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
然后就是Spring Cloud Stream的标准配置了。需要在@Configuration
类上使用@EnableBinding
声明需要应用的Binding。
@EnableBinding({Source.class, Sink.class})
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
上面代码定义需要使用的Binding是Source和Sink接口中声明的input和output两个Binding。然后可以在application.properties文件中声明这两个Binding对应的destination,它们对应于kafka的Topic。如果指定的Topic还未创建,默认会自动进行创建。
spring.cloud.stream.bindings.output.destination=test-topic
spring.cloud.stream.bindings.input.destination=test-topic
spring.cloud.stream.bindings.input.group=test-group
如果你的Kafka服务器不是本机或者监听端口不是默认的9092,则还需要通过spring.cloud.stream.kafka.binder.brokers
指定Kafka的服务地址。
spring.cloud.stream.kafka.binder.brokers=localhost:9092
之后就是照常的使用Spring Cloud Stream的相关API进行操作了。如下是发送消息的示例。
@Component
@Slf4j
public class SourceProducer {
@Autowired
private Source source;
public void sendMessages(String msg) {
Message<String> message = MessageBuilder.withPayload(msg).build();
log.info("发送了一条消息-{}", msg);
this.source.output().send(message);
}
}
如下是监听消息的示例。
@Component
@Slf4j
public class SinkConsumer {
@StreamListener(Sink.INPUT)
public void inputConsumer(Message<String> message) {
String payload = message.getPayload();
MessageHeaders headers = message.getHeaders();
log.info("从Binding-{}收到信息-{}, headers:{}", Sink.INPUT, payload, headers);
}
}
由于笔者的上一篇文章——Spring Cloud Stream基于RocketMQ的实现已经介绍了Spring Cloud Stream的一些规范,这里就不再赘述了。
从Kafka服务,也就是从Spring Cloud Stream的Binder的角度来讲可以定义的参数可以参考org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties
。比较核心的参数有:
- spring.cloud.stream.kafka.binder.brokers:用来指定Kafka服务的地址,可以是host,也可以是host:port格式,如:
spring.cloud.stream.kafka.binder.brokers=localhost,10.10.10.1:9092
。默认是localhost。 - spring.cloud.stream.kafka.binder.defaultBrokerPort:Kafka服务的完整地址应该是host+port,当brokers只定义了host时,将默认取该属性定义的port作为Kafka服务的port,默认是9092。
- spring.cloud.stream.kafka.binder.autoCreateTopics:指定Topic不存在时是否需要自动创建,默认是true。
Spring Cloud Stream有多种不同的实现,比如RocketMQ/Kafka/RabbitMQ。不同的实现者在Producer和Consumer上也可能是有些差别的,或者是有些特性的。整合Spring Cloud Stream后这些特性的属性也是可以进行配置的。可以通过spring.cloud.stream.kafka.bindings.xxx.consumer.yyy
指定名为xxx的这个Consumer角色的yyy属性。可以通过spring.cloud.stream.kafka.bindings.xxx.producer.yyy
指定名为xxx的Producer的yyy属性。Kafka实现的Producer的特性属性配置可以参考org.springframework.cloud.stream.binder.kafka.properties.KafkaProducerProperties
,Consumer的特性属性配置可以参考org.springframework.cloud.stream.binder.kafka.properties.KafkaConsumerProperties
。Consumer特性的参数主要有:
- autoRebalanceEnabled:默认为true。当设置为true时,会对分区进行负载均衡,有Consumer加入或退出时会对Topic的分区重新分配。设置为false时,每个Consumer分配的Topic分区是固定的,不会变。
- autoCommitOffset:默认为true。当设置为true时表示消息处理完后会自动提交offset;如果设置为false则会在消息的header中添加一个key为kafka_acknowledgment,value为
org.springframework.kafka.support.Acknowledgment
类型的对象的header,消费者可以在处理消息后从header中获取该对象进行手动响应消息的处理情况。 - startOffset:指定新的消费者组加入的时候起始的消费位置,可选值有earliest和latest。默认是null,相当于earliest。