Integrating the Kafka client in Spring Boot

With Spring Boot 1.5.6, the Kafka client can be integrated with just a few configuration properties, instead of writing a complicated config class and filling in parameters by hand as before. First add the Maven dependencies; note that the pom-type, import-scoped Spring Boot BOM must sit inside <dependencyManagement>:

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-parent</artifactId>
                <version>1.5.6.RELEASE</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.2.2.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <version>1.2.2.RELEASE</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
  • Spring Boot application.properties. Note: even for a local broker, don't write localhost, write 127.0.0.1.
kafkaTopic=recom
spring.kafka.producer.bootstrap-servers=127.0.0.1:9092
spring.kafka.consumer.bootstrap-servers=127.0.0.1:9092
spring.kafka.consumer.group-id=recom
spring.kafka.template.default-topic=my-replicated-topic
spring.kafka.listener.concurrency=3
spring.kafka.producer.retries=3
spring.kafka.producer.batch-size=1000
spring.kafka.consumer.auto-offset-reset=earliest
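
With spring-kafka on the classpath, Spring Boot turns the spring.kafka.* properties above into an auto-configured KafkaTemplate and listener container factory, while the custom kafkaTopic key is an ordinary property that has to be injected by hand. A minimal sketch of wiring both into a bean (the KafkaWiringCheck class name is made up for illustration):

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class KafkaWiringCheck {

    // kafkaTopic is the custom key from application.properties above; it is not a
    // spring.kafka.* property, so it has to be injected explicitly.
    @Value("${kafkaTopic}")
    private String topic;

    // Auto-configured by Spring Boot from the spring.kafka.producer.* properties.
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void probe() {
        // Goes to the topic injected from kafkaTopic (recom).
        kafkaTemplate.send(topic, "probe");
        // sendDefault() goes to spring.kafka.template.default-topic (my-replicated-topic).
        kafkaTemplate.sendDefault("probe");
    }
}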
  • Producer, wrapped as a small REST API
import com.alibaba.fastjson.JSONObject;   // assumed: the original reads fields via content.getString(...), which matches fastjson's JSONObject
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.sleuth.annotation.NewSpan;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;
import java.util.Map;

// Constants, StatusCode, BaseResponse and ServiceUtils are this application's own classes.

@RestController
public class MessageController {

    @Autowired
    private ServiceUtils serviceUtils;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Send a message to the message queue through this endpoint.
     * The request body carries the topic and the message payload.
     */
    @RequestMapping(value = "/api/msg", method = RequestMethod.POST, consumes = Constants.REQUEST_HEAD)
    @NewSpan(name = "sendMsg")
    public ResponseEntity sendMsg(@RequestBody JSONObject content) {
        String topic = content.getString(Constants.REQUEST_MQ_TOPIC);
        String msg = content.getString(Constants.REQUEST_MQ_MSG);
        if (StringUtils.isEmpty(topic)) {
            return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(new BaseResponse(StatusCode.PARAM_NO_TOPIC_ERROR));
        }
        if (StringUtils.isEmpty(msg)) {
            return ResponseEntity.status(HttpStatus.BAD_REQUEST).body(new BaseResponse(StatusCode.PARAM_NO_MSG_ERROR));
        }
        send(topic, msg);
        Map<String, Object> response = new HashMap<>();
        response.put(Constants.RESULT_STATUSES, StatusCode.NO_ERROR);
        return new ResponseEntity(response, HttpStatus.ACCEPTED);
    }

    public void send(String topic, String msg) {
        // Register the listener before sending; a listener set after send() would miss this record.
        // In practice it only needs to be registered once, not on every call.
        kafkaTemplate.setProducerListener(new ProducerListener<String, String>() {
            @Override
            public void onSuccess(String topic, Integer partition, String key, String value, RecordMetadata recordMetadata) {
                // called for each acknowledged record, but only if isInterestedInSuccess() returns true
            }

            @Override
            public void onError(String topic, Integer partition, String key, String value, Exception exception) {
                // called when a send fails
            }

            @Override
            public boolean isInterestedInSuccess() {
                return false;
            }
        });
        kafkaTemplate.send(topic, msg);
        // metrics() exposes the underlying producer's metrics; the return value is not used here.
        kafkaTemplate.metrics();
    }
}
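
A global ProducerListener applies to every send that goes through the template. If per-call feedback is needed instead, the ListenableFuture returned by KafkaTemplate.send() can be used; a minimal sketch (class and method names are illustrative):

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

public class CallbackSendExample {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public CallbackSendExample(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendWithCallback(String topic, String msg) {
        // send() returns a ListenableFuture, so success and failure can be handled per record.
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, msg);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                // result.getRecordMetadata() carries the partition and offset of the acknowledged record
            }

            @Override
            public void onFailure(Throwable ex) {
                // retry, log or alert here
            }
        });
    }
}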
  • Consumer
import org.apache.logging.log4j.LogManager;  
import org.apache.logging.log4j.Logger;  
import org.springframework.kafka.annotation.KafkaListener;  
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumer {

    private static final Logger logger = LogManager.getLogger(KafkaConsumer.class.getName());

    @KafkaListener(topics = "${kafkaTopic}")
    public void receive(String payload) {
        logger.info("kafka receive:received payload='{}'", payload);
    }
}
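
The listener above only sees the message value. If the partition, offset or key is also needed, the listener method can take the whole ConsumerRecord instead; a minimal sketch, assuming the same ${kafkaTopic} property (the KafkaRecordConsumer class name is illustrative):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaRecordConsumer {

    // Receiving the whole ConsumerRecord exposes topic, partition, offset and key, not just the value.
    @KafkaListener(topics = "${kafkaTopic}")
    public void receive(ConsumerRecord<String, String> record) {
        System.out.printf("topic=%s partition=%d offset=%d key=%s value=%s%n",
                record.topic(), record.partition(), record.offset(), record.key(), record.value());
    }
}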