728x90
yml 자동 설정을 사용하지 않는 방법
Producer
/**
 * Java-based producer configuration that builds a {@link KafkaTemplate} by hand
 * instead of relying on property-driven auto-configuration.
 *
 * <p>Keys and values are both sent as {@code String}s.
 */
@Configuration
public class KafkaTemplateConfig {

    /**
     * Creates the String/String template registered under the bean name
     * {@code kafkaTemplateCustom}.
     *
     * @return a template backed by the producer factory built in this class
     */
    @Bean(name = "kafkaTemplateCustom")
    public KafkaTemplate<String, String> kafkaTemplate() {
        ProducerFactory<String, String> factory = producerFactory();
        return new KafkaTemplate<>(factory);
    }

    /** Builds the producer factory from the settings returned by {@link #producerProps()}. */
    private ProducerFactory<String, String> producerFactory() {
        Map<String, Object> settings = producerProps();
        return new DefaultKafkaProducerFactory<>(settings);
    }

    /**
     * Assembles the raw producer settings: broker list, String serializers for
     * key and value, and {@code acks=all}.
     */
    private Map<String, Object> producerProps() {
        // Both key and value use the same serializer class name.
        final String stringSerializer = StringSerializer.class.getName();

        Map<String, Object> settings = new HashMap<>();
        settings.put(ProducerConfig.ACKS_CONFIG, "all");
        settings.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, stringSerializer);
        settings.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, stringSerializer);
        settings.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:19092,127.0.0.1:29092,127.0.0.1:39092");
        return settings;
    }
}
@Slf4j
@Service
@RequiredArgsConstructor
public class ProducerService {
private final TopicNames topicNames;
private final KafkaTemplate<String, String> kafkaTemplateCustom;
public void sendMessageToKafka(String message) {
MessageDateDto nowMessageDateDto = MessageDateDto.createNowMessageDateDto(message);
String data = nowMessageDateDto.transToJson();
kafkaTemplateCustom.send(topicNames.getKafka(), data)
.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@Override
public void onFailure(Throwable ex) {
log.error(ex.getMessage(), ex);
}
@Override
public void onSuccess(SendResult<String, String> result) {
log.info(result.toString());
}
});
}
}
Consumer
/**
 * Java-based consumer configuration that builds the listener container factory
 * by hand instead of relying on property-driven auto-configuration.
 *
 * <p>Keys and values are both read as {@code String}s.
 */
@Slf4j
@Configuration
public class KafkaListenerContainerConfig {

    /**
     * Creates the container factory registered under the bean name
     * {@code kafkaListenerCustomContainerFactory}.
     *
     * <p>The factory is configured with a concurrency of 3, a poll timeout of
     * 3000 and a {@link DefaultErrorHandler} using
     * {@code FixedBackOff(1_000L, 3)} (1000 ms interval, max attempts 3).
     * The handler's recoverer logs the record's topic and value together with
     * the exception that caused the failure.
     *
     * @return the configured listener container factory
     */
    @Bean(name = "kafkaListenerCustomContainerFactory")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(stringConsumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        factory.setCommonErrorHandler(new DefaultErrorHandler(
                // Pass the exception as the trailing argument so SLF4J logs the
                // cause and stack trace instead of silently dropping it.
                (consumerRecord, e) ->
                        log.error("Failed Topic : {}, Value {}",
                                consumerRecord.topic(), consumerRecord.value(), e),
                new FixedBackOff(1_000L, 3)));
        return factory;
    }

    /** Builds the consumer factory from the settings returned by {@link #props()}. */
    private ConsumerFactory<String, String> stringConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(props());
    }

    /**
     * Assembles the raw consumer settings: broker list and String deserializers
     * for key and value.
     */
    private Map<String, Object> props() {
        Map<String, Object> props = new HashMap<>();
        // ConsumerConfig is kept fully qualified, as in the original source.
        props.put(org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:19092,127.0.0.1:29092,127.0.0.1:39092");
        props.put(org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return props;
    }
}
/**
 * Kafka listeners that hand incoming String messages to the message and Slack
 * services. Both listeners use the {@code kafkaListenerCustomContainerFactory}
 * container factory.
 */
@Service
@RequiredArgsConstructor
public class ConsumerService {

    // Declaration order is kept (messageService, then slackService) because
    // @RequiredArgsConstructor derives the constructor parameters from it.
    private final MessageService messageService;
    private final SlackService slackService;

    /**
     * Listens on the topic configured as {@code kafka.topic.kafka}; prints the
     * message to standard output and passes it to
     * {@code messageService.saveMessage}.
     *
     * @param message the record value received from Kafka
     */
    @KafkaListener(
            id = "from-message-slack",
            topics = "${kafka.topic.kafka}",
            containerFactory = "kafkaListenerCustomContainerFactory")
    public void listen1(String message) {
        final String line = "message = " + message;
        System.out.println(line);
        messageService.saveMessage(message);
    }

    /**
     * Listens on the topic configured as {@code kafka.topic.slack} and passes
     * the message to {@code slackService.sendMessageToSlack}.
     *
     * @param message the record value received from Kafka
     */
    @KafkaListener(
            id = "from-slack-topic",
            topics = "${kafka.topic.slack}",
            containerFactory = "kafkaListenerCustomContainerFactory")
    public void listen2(String message) {
        slackService.sendMessageToSlack(message);
    }
}
728x90
'Kafka' 카테고리의 다른 글
Apache Kafka 실습 (0) | 2022.03.27 |
---|---|
Apache Kafka 옵션 (0) | 2022.03.27 |
Apache Kafka 개념 (0) | 2022.03.26 |