본문 바로가기

Kafka

Apache Kafka java Custom Producer, Consumer

728x90

yml 자동설정을 활용하지 않은 방법

Producer
Consumer

Producer

@Configuration
public class KafkaTemplateConfig {

    @Bean(name = "kafkaTemplateCustom")
    public KafkaTemplate<String , String > kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    private ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerProps());
    }

    private Map<String, Object> producerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:19092,127.0.0.1:29092,127.0.0.1:39092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        return props;
    }
}

 

 

 

@Slf4j
@Service
@RequiredArgsConstructor
public class ProducerService {

    private final TopicNames topicNames;
    private final KafkaTemplate<String, String> kafkaTemplateCustom;

    public void sendMessageToKafka(String message) {
        MessageDateDto nowMessageDateDto = MessageDateDto.createNowMessageDateDto(message);
        String data = nowMessageDateDto.transToJson();

        kafkaTemplateCustom.send(topicNames.getKafka(), data)
                .addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
                    @Override
                    public void onFailure(Throwable ex) {
                        log.error(ex.getMessage(), ex);
                    }

                    @Override
                    public void onSuccess(SendResult<String, String> result) {
                        log.info(result.toString());
                    }
                });
    }
}

Consumer

@Slf4j
@Configuration
public class KafkaListenerContainerConfig {

    @Bean(name = "kafkaListenerCustomContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String , String > factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(StringConsumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        factory.setCommonErrorHandler(new DefaultErrorHandler(
                (consumerRecord, e) ->
                log.error("Failed Topic : {}, Value {}", 
                                        consumerRecord.topic(), consumerRecord.value()), 
                new FixedBackOff(1_000L, 3)));


        return factory;
    }

    private ConsumerFactory<String, String> StringConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(
                props()
        );
    }

    private Map<String ,Object> props() {
        Map<String, Object> props = new HashMap<>();
        props.put(org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:19092,127.0.0.1:29092,127.0.0.1:39092");
        props.put(org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return props;
    }

}

 

 

 

@Service
@RequiredArgsConstructor
public class ConsumerService {

    private final MessageService messageService;

    @KafkaListener(id = "from-message-slack", topics = "${kafka.topic.kafka}" , containerFactory = "kafkaListenerCustomContainerFactory")
    public void listen1(String message) {
        System.out.println("message = " + message);
        messageService.saveMessage(message);
    }

    private final SlackService slackService;

    @KafkaListener(id = "from-slack-topic", topics = "${kafka.topic.slack}", containerFactory = "kafkaListenerCustomContainerFactory")
    public void listen2(String message) {
        slackService.sendMessageToSlack(message);
    }
}
728x90

'Kafka' 카테고리의 다른 글

Apache Kafka 실습  (0) 2022.03.27
Apache Kafka 옵션  (0) 2022.03.27
Apache Kafka 개념  (0) 2022.03.26