목차
Broker
- Kafka 는 빠른 처리를 위해 Topic 내에 여러 Partition 을 가질 수 있고,
Partition 들을 각각의Broker
에 분산 저장한다. - Kafka Client 와 데이터를 주고받기 위해 사용하는 주체
- 하나의 서버에는 한 개의 카프카
Broker
가 실행된다. - Producer 로부터 데이터를 전달받은
Broker
는 Topic 의 Partition 에 데이터를 저장하고,
Partition 의 데이터를 Segment 파일로 저장하는데, OS의 Page Cache 기능을 활용해 디스크의 입출력 속도를 높인다.
PageCache
- 페이지 캐시는 처리한 데이터를 메인 메모리 영역(RAM)에 저장해서 가지고 있다가, 다시 이 데이터에 대한 접근이 발생하면 disk에서 IO 처리를 하지 않고 메인 메모리 영역의 데이터를 반환하여 처리할 수 있도록 하는 컴포넌트다.
Kafka Cluster
- 데이터를 안전하게 보관하고 처리하기 위해 최소 3대 이상의 브로커 서버를 1개의
Kafka Cluster
로 묶어서 운영 - Partition 중에 Producer 또는 Consumer 와 직접 통신하는 Partition 을 Leader Partition 이라 부르고 Leader Partition 에게서 장애를 대비해 다른 브로커들은 Partition 을 복사해서 저장한다.
이렇게 저장하는 행위를 Replication 이라하며 Leader Partition 에게서 데이터를 가져가는 Partition 을 Follower Partition 이라 부른다. 이 덕분에 총 3개의 브로커가 운영중이라면 Leader Partition 이 있는 브로커에서 장애가나도 정상 동작한다. 이를 위해 브로커를 기본 3 이상을 쓰는 것이 좋다. Partition 의 replication 의 개수 replication.factor 또한 설정 가능하며 최솟값은 1 (복제없음) , 최댓값은 브로커 개수 만큼 설정할 수 있다. - 하나의 브로커로 Leader Partition 이 몰리지 않게
auto.leader.rebalance.enable
=enable(기본값) ,leader.imbalance.check.interval.seconse
=300sec(기본값) ,leader.imbalance.per.broker.percentage
=10 (기본값) 설정이 존재한다.
Controller
- Kafka Cluster 의 다수 브로커 중 한 대가
Controller
의 역할을 한다. Controller
는 다른 브로커들의 상태를 체크하고 브로커가 Cluster 에서 빠지는 경우 해당 브로커에 존재하는 Leader Partition 을 재분배 한다.
Data 삭제
- Consumer 가
Data
를 가져가더라도 Topic 의 데이터는 삭제되지 않는다. - Consumer 나 Producer 가
Data
의 삭제를 요청할 수도 없다. 오직 브로커만이Data
를 삭제할 수 있다. Data
는 Log Segment 에 저장되고log.segement.bytes
(기본값 1GB) 설정에 의해 용량이 차면 파일이 닫힌다. 닫힌 파일은log.retention.bytes
또는log.retention.ms
설정의 값을 넘으면 삭제된다. 닫힌 Segment 파일을 체크하는 간격은log.retention.check.interval.ms
설정에 따른다.
Consumer Offset 저장
- Consumer Group 은 토픽이 특정 Partition 으로부터 data 를 가져가서 처리하고 어느 Record까지 가져갔는지를 확인하기위해 Internal Topic 인
__consumer_offsets
Topic 에 Offset 을 commit 한다.
코디네이터 ( coordinator )
- Kafka Cluster 내의 하나의 Broker 는
coordinator
역할을 한다. - Consumer Group 의 상태를 체크하고 Partition 을 Consumer 와 매칭되도록 하는데, 요청을 보내는 최초 Consumer 인 Leader Consumer 와 상호작용하면서 분배하는 역할을 한다.
- Consumer 가 장애가 나서 매칭에서 빠지게 되면 정상 동작하는 Consumer 로 다시 할당하는 rebalance 역할을 한다.
Zookeeper
Zookeeper
는 Kafka 의 METADATA 를 관리하는 데에 사용된다. 쉽게말해 Broker 들의 목록과 설정을 유지시키고 전달해주는 역할을 한다.- 분산형 Configuration 정보 유지 구조를 가지고 있기에 Zookeeper 또한 Leader 와 Follower 가 있다. 또한 Zookeeper 는 Quorum(정족수:과반수) 기반 알고리즘으로 동작하기에, 예를들어 3대의 Zookeeper 서버중 1대가 장애가나도 2대의 과반수 를 충족하기에 정상동작한다. 때문에 Zookeeper 는 반드시 홀수기반으로 구성해야한다.
- Kafka Cluster 로 묶인 Broker 들은 동일한 경로의
Zookeeper
경로로 선언해야 같은 Kafka Broker 묶음이 된다. - 한개의
Zookeeper
에 다수의 Kafka Cluster 를 연결해서 사용할 수도 있다.
znode
znode
란 Zookeeper 에서 사용하는 data 저장 단위이다.- Zookeeper 에서 다수의 Kafka Cluster 를 사용하는 방법으로, 2개 이상의 카프카 클러스터를 구축할 때는 root
znode
(최상위 znode) 가 아닌 한 단계 아래의znode
를 Kafka Broker 옵션으로 지정하도록 한다.
ex) 파이프라인용 카프카 클러스터 : zookeeper.connect=localhost:2181/pipeline
실시간 추천용 카프카 클러스터 : zookeeper.connect=localhost:2181/recommend
Topic
Topic
은 kafka 의 시작과 끝이다. Topic 을 삭제하면 data는 삭제되고 파이프라인은 중단된다.Topic
은 Kafka 에서 data 를 구분하기 위해 사용하는 단위이다.(논리적인개념)Topic
은 1개 이상의 Partition 을 소유하고 있다.- Kafka 는
Topic
이름 변경을 지원하지 않으므로, 이름을 변경하기 위해서는 삭제후 생성하는 방법밖에 없다.
Topic 이름 제약 조건
- Topic 이름의 길이는 249자 미만으로 생성되어야 한다.
- Topic 이름은 영어 대소문자와 숫자 0부터 9 그리고 마침표(.) , 언더바(_) , 하이픈(-) 조합으로 생성할 수 있다.
- Kafka 내부 logic 관리 목적으로 사용되는 2개의 Topic (__consumer_offsets , __transaction_state)과 동일한 이름으로 생성 불가능하다.
- Topic 이름에 마침표(.) 와 언더바(_) 가 동시에 들어가면 안된다. 생성은 가능하나 WARNING 메시지가 발생한다.
- Topic 이름은 같으나 마침표와 언더바만 다른 경우에는 신규 Topic 을 생성할 수 없다.
ex) to.pic 이 존재할 경우 to_pic 을 만들 수 없다.
적절한 Topic 작명의 예시
- <환경>.<팀명>.<애플리케이션명>.<메시지타입>
ex) prd.marketing-team.sms-platform.json - <프로젝트명>.<서비스명>.<환경>.<이벤트명>
ex) commerce.payment.prd.notification - <환경>.<서비스명>.<관리번호>.<메시지-타입>
ex) dev.email-sender.jira-1234.email-vo-custom - <카프카클러스터명>.<환경>.<서비스명>.<메시지타입>
ex) aws-kafka.live.marketing-platform.json
Partition
Partition
에는 Producer 가 보낸 data 들이 들어가 저장되는데 이 data 를 Record 가고 부른다.Partition
은 Kafka 의 병렬처리의 핵심으로 Conusmer 들이 Record 를 병렬로 처리 할 수 있도록 매칭된다.Partition
은 자료구조 Queue 와 비슷하지만 Queue 는 data 를 가져가면 삭제하지만, Kafka 에서는 삭제하지 않는다.Partition
의 Record 는 Consumer 가 가져가는 것과 별개로 관리된다. 이러한 특징덕에 다양한 목적을 가진 여러 Consumer Group 들이 Topic 의 data 를 여러 번 가져갈 수 있다.
Record
Record
는 Timestamp , Message Key , Message Value , Offset , Header 로 구성되어 있다.- Producer 가 생성한
Record
가 Broker 로 전송되면 Offset 과 Timestamp 가 지정되어 저장된다. - Broker 에 한번 적재된
Record
는 수정할 수 없고, log.retention.ms 또는 log.retention.bytes 에 따라서만 삭제된다. - Message Key 가 존재할 경우 Producer 가
Record
를 전송 할 때 Message Key 의 해시값을 토대로 Partition 을 할당한다. 즉 동일한 Message Key 의 경우 동일 Partition 에 들어간다. - Message Value 는 실제 처리할 data 이다.
- Message Key 와 Value 는 직렬화되어 Broker 로 전송되기에 Consumer 또한 역직렬화를 수행해야한다. 이때 반드시 동일한 형태로 직렬화, 역직렬화 해야 한다.
- Offset 은 0 이상의 숫자로 이루어져 있다.
Record
의 Offset 은 직접 지정할 수 없고, Broker 에 저장될 때 이전에 전송된Record
의 Offset + 1 로 생성된다. 생성된 Offset 은 Kafka Consumer 가 data 를 가져갈 때 사용된다. - Header 는
Record
의 추가적인 정보를 담는 METADATA 저장소 용도로 사용한다. Header 는 key/value 형태로 data를 추가하여Record
의 속성(Schema 버전 등)을 저장하여 Consumer 에서 참조할 수 있다.
Producer
Producer
는 Kafka Broker 로 데이터를 전송할 때 내부적으로 파티셔너 , 배치 생성 단계를 거친다.- KafkaProducer 인스턴스가 send() 를 호출하게되면 ProducerRecord 는 파티셔너에서 토픽의 어느 파티션으로 전송될 것인지 정해진다. 파티셔너에 의해 구분된 레코드는 데이터를 전송하기 전에 Accumulator 에 데이터를 버퍼로 쌓아놓고 발송한다. 버퍼로 쌓인 데이터는 배치로 묶어서 전송함으로써 카프카의 프로듀서 처리량을 향상시키는 구조를 갖고 있다.
- Kafka 는 전송데이터의 Key 가 있을경우 메시지 키의 해시값과 파티션을 매칭하여 데이터를 전송하지만 Key 가 없을경우 파티셔너에따라 전송방식이 달라진다.
- Kafka 2.4.0 이전에는 RoundRobinPartitioner 가 기본 방식으로 들어오는 대로 파티션을 순회하면서 전송하기 때문에 배치로 묶이는 빈도가 적었다. 이후에는 UniformStickyPartitioner 가 기본 방식으로 되면서 Accumulator 가 데이터가 배치로 모두 묶일 때까지 기다렸다가 배치로 묶인 데이터는 모두 동일한 파티션에 정송함으로써 훨씬 향상된 성능을 가지게 되었다.
Producer 옵션
- 필수 옵션 :
bootstrap.servers : 프로듀서가 데이터를 전송할 대상 카프카 클러스터에 속한 브로커의 호스트 이름, 포트를 1개 이상을 작성
한다. 2개 이상 브로커 정보를 입력하여 일부 브로커에 이슈가 발생하더라도 접속하는 데에 이슈가 없도록 설정 가능하다.
key.serializer : 레코드의 메시지 키를 직렬화하는 클래스를 지정한다.
value.serializer : 레코드의 메시지 값을 직렬화하는 클래스를 지정한다.
- 그 외 선택 옵션 :
acks : 프로듀서가 전송한 데이터가 브로커들에게 정상적으로 저장되었는지 확인하는데 사용되는 옵션, 기본값은 1
buffer.memory : 브로커로 전송할 데이터를 배치로 모으기 위해 설정할 버퍼 메모리양을 지정, 기본값은 33554432(32MB)
retries :프로듀서가 브로커로부터 에러를 받고 난 뒤 재전송을 시도하는 횟수, 기본값은 2147483647
batch.size : 배치로 전송할 레코드 최대 용량을 지정, 기본값은 16384
linger.ms : 배치를 전송하기 전까지 기다리는 최소시간, 기본값은 0
partitioner.class : 기본값은 org.apache.kafka.clients.producer.internals.DefaultPartitioner
enable.idempotence : 멱등성 프로듀서로 동작할지 여부 설정(이후 추가적인 설정 필요), 기본값은 false
transactional.id : 프로듀서가 레코드를 전송할 때 레코드를 트랜젝션 단위로 묶을지 여부,기본값은 null
Consumer
Consumer
는 Kafk Broker 에서 적재된 데이터를 사용하기 위해 브로커로부터 데이터를 가져와서 필요한 처리를 한다.- 컨슈머는 카프카 브로커로부터 데이터를 어디까지 가져갔는지 내부토픽(__consumer_offsets) 에 Commit 을 통해 기록한다.
- 커밋의 기본설정은 poll()메서드가 수행될 때 일정 간격마다 오프셋을 커밋하도록 enable.auto.commit=true 로 설정되어 있다. 이 옵션은 auto.commit.interval.ms 값에 의해 ms 에 설정된 값이 지났을 때 그 시점까지 읽은 레코드의 오프셋을 커밋한다. 그러나 이런 자동 커밋은 poll() 메서드 호출 이후에 리밸런싱(컨슈머들의 파티션 재할당) 또는 컨슈머 강제종료 발생 시 컨슈머가 처리하는 데이터가 중복 또는 유실될 수 있기 떄문에, 데이터 중복이나 유실을 허용하지 않는 서비스이면 자동 커밋을 허용해서는 안 된다.
Consumer 옵션 :
- 필수 옵션 :
bootstrap.servers : 프로듀서가 데이터를 전송할 대상 카프카 클러스터에 속한 브로커의 호스트 이름, 포트를 1개 이상을 작성
한다. 2개 이상 브로커 정보를 입력하여 일부 브로커에 이슈가 발생하더라도 접속하는 데에 이슈가 없도록 설정 가능하다.
key.deserializer : 레코드의 메시지 키를 직렬화하는 클래스를 지정한다.
value.deserializer : 레코드의 메시지 값을 직렬화하는 클래스를 지정한다.
- 선택 옵션 :
group.id : 컨슈머 그룹 아이디를 지정한다(같은 그룹 아이디를 가진 컨슈머들은 그룹을 이룬다.). 기본값은 null이다.
auto.offset.reset : 컨슈머 그룹이 특정 파티션을 읽을 때 저장된 컨슈머 오프셋이 없는 경우 어느 오프셋부터 읽을지 선택하는 옵션, 기본값은 latest 이다.
enable.auto.commit : 자동 커밋으로 할지 수동 커밋으로 할지 선택, 기본값은 true 이다.
auto.commit.interval.ms : 자동 커밋일 경우 오프셋 커밋 간격을 설정, 기본값은 5000(5초) 이다.
max.poll.records : poll() 메서드를 통해 반환되는 레코드 개수를 지정, 기본값은 500 이다.
sesstion.timeout.ms : 컨슈머가 브로커와 연결이 끊기는 최대 시간, 이 시간내에 하트비트를 전송하지 않으면 브로커는 컨슈머에 이슈가 발생했다고 가정하고 리밸런싱을 시작한다. 보통 하트비트 시가의 3배로 설정한다. 기본값은 10000(10초) 이다.
heartbeat.internal.ms : 하트비트를 전송하는 시간 간격이다. 기본값은 3000(3초) 이다.
max.poll.interval.ms : poll() 메서드를 호출하는 간격의 최대 시간을 지정, 이 옵션이 있는 이유는 poll() 메서드 이후에 데이터를 처리하는 데에 시간이 너무 많이 걸리는 경우 비정상으로 판단하고 리밸런싱을 시작하기 때문이다. 기본값은 300000(5분) 이다.
isolation.level : 트랜젝션 프로듀서가 레코드를 트랜잭션 단위로 보낼 경우 사용한다. 이 옵션은 read_committed,
read_uncommitted 로 설정할 수 있다. read_committed 로 설정하면 커밋이 완료된 레코드만 읽는다. read_uncommitted 로 설정하면 커밋 여부와 관계없이 파티션에 있는 모든 레코드를 읽는다. 기본값은 read_uncommitted 이다.
참고서 : 아파치 카프카 애플리케이션 프로그래밍 with 자바
Partition 디렉토리에 생성되는 Files Types
- ~.log : 메시지와 metadata 를 저장
- ~.index : 각 메시지의 Offset 을 Log Segment 파일의 Byte 위치에 매핑 (Offset 번호를 가지고 빠르게 찾기위함)
- ~.timeindex : 각 메시지의 tumestamp 를 기반으로 메시지를 검색하는 데 사용 (timestamp 기반으로 빠르게 찾기위함)
- leader-epoch-checkpoint : Leader Epoch(새로운 leader 선출시 기록) 관련 Offset 정보 저장
'Kafka' 카테고리의 다른 글
Apache Kafka java Custom Producer, Consumer (0) | 2022.03.29 |
---|---|
Apache Kafka 실습 (0) | 2022.03.27 |
Apache Kafka 옵션 (0) | 2022.03.27 |