데이터 엔지니어 기술 블로그

[🧙Kafka] 카프카 구축 (9) - 카프카 컨슈머로 카프카에서 데이터 읽기 본문

데이터 엔지니어링

[🧙Kafka] 카프카 구축 (9) - 카프카 컨슈머로 카프카에서 데이터 읽기

jun_yeong_park 2021. 4. 11. 18:52
반응형

About

Apache Kafka

이전에 카프카에서 Producer로 메시지를 보냈는데 메시지를 소비하기 위해 Consumer를 만들어보려고 한다.

 

컨슈머 그룹(Consumer Group)

프로듀서가 생산한 메시지를 하나의 컨슈머가 따라갈 수가 없다면 여러개의 컨슈머를 두어야 한다. 카프카 컨슈머는 컨슈머 그룹에 속한다. 하나의 컨슈머 그룹에서 파티션이 4개일 때 컨슈머가 2개라면 2개씩 나눠가지고, 컨슈머가 4개라면 파티션을 1개씩 나눠가진다. 만약 5개라면 컨슈머 하나는 메시지를 받을 수 없다.

 

컨슈머 그룹 하나에서는 서로 다른 메시지를 수신하여 처리를 한다. 실시간 처리를 하는 곳과 메시지의 데이터를 저장하는 곳을 따로 두고 싶다면 컨슈머 그룹은 두개가 필요하다.

 

리밸런싱(rebalancing)

토픽의 파티션 1개의 소유권을 동시에 여러개의 컨슈머가 받을수는 없다. 만약 작동하던 컨슈머가 문제가 생긴다면 다른 컨슈머가 이를 받아서 처리하려고 한다. 이렇게 파티션의 소유권을 이전하는 과정을 리밸런싱이라고 한다. 리밸런싱은 가용성과 확장성을 높여준다.

 

그룹 조정자(Group Coordinator)

특정 브로커는 그룹 조정자로 지정될 수 있다. 컨슈머가 heartbeat를 그룹 조정자인 브로커에게 전송하면 파티션 소유권을 유지할 수 있다. 컨슈머는 일정 간격으로 heartbeat를 전송한다. 컨슈머가 polling(메시지 가져오기) 또는 commit(오프셋 등록하기) 시에 자동으로 전송된다. 만약 특정 시간동안 heartbeat를 전송하지 못하여 타임아웃이 발생한다면 그룹 조정자 브로커는 전송하지 못한 컨슈머를 버리고 리밸런싱을 시작한다.

 

ConsumerRecord

컨슈머로 데이터를 가져오면 ConsumerRecord가 반환되고 예시는 다음과 같다.

 

ConsumerRecord(topic='titles', 
              partition=0, 
              offset=0, 
              timestamp=1617829961260, 
              timestamp_type=0, 
              key=None, 
              value={'key': 205657150, 'source': 'naver', 'text': '테스트 타이틀'}, 
              checksum=-1575239064, 
              serialized_key_size=-1, 
              serialized_value_size=71)

 

하나의 컨슈머로 특정 데이터의 파티션을 처리하려는 경우 

토픽을 구독하지 않고 파티션을 할당하면 스스로 메시지를 처리할 수 있다. .assign() 함수를 사용하면 된다.

 

 

Commit

파티션을 가지고 있는 컨슈머는 오프셋을 등록하는데 파티션에 __consumer_offsets에 등록한다. 오프셋이란 메시지를 어디까지 읽고 처리했는지 확인하기 위해 북마크를 하는 것이다. 만약 하나의 컨슈머 그룹에서 작동하던 컨슈머 하나가 문제가 생기면 다른 컨슈머가 북마크를 보고 계속 일을 할 수 있다.

 

커밋 방법은 여러가지가 있다.

 

1. 자동 커밋(Auto Commit)

enable.auto.commit=true 로 설정하게 되면 poll() 함수로 받은 오프셋 중 가장 큰 값을 5초에 한 번씩 커밋한다. 가장 간단한 방법이지만 문제가 생기면 두번 이상 처리하는 경우가 생길 수 있다.

 

2. 동기 커밋(Commit Sync)

커밋 메시지를 수동으로 보내는데 성공적으로 커밋이 될 때까지 기다리면서 재시도하기때문에 안전하다. 하지만 문제는 기다리는 동안 폴링 루프를 계속 실행할 수 없기 때문에 메시지를 처리하는데에 오래걸리게 된다.

파이썬에서는 .commit() 함수를 사용한다.

 

3. 비동기 커밋(Commit Async)

동기 커밋의 기다렸다가 폴링 루프를 진행하는 문제를 해결하기 위해서 비동기로 호출한다. 메시지가 처리가 되었을 때 실행 할 콜백 함수를 매개변수로 받을 수 있다.

파이썬에서는 .commit_async() 함수를 사용한다.

 

 

동기 커밋과 비동기 커밋 함께 사용하기

동기 커밋과 비동기 커밋을 같이 사용하는 방법이 있는데, 폴링 루프에 문제가 생겼을 때 마지막에 동기 커밋을 실행하여 성공적으로 커밋을 하는 것이고, 비동기 커밋은 폴링 루프 내에 실행시켜서 빠르게 커밋을 하는 것이다. 

 

Example

컨슈머 인스턴스 생성

- 메시지를 json.dumps()를 한 뒤 bytes array로 변경해서 보내기 때문에 value_deserializer로 데이터를 다시 사용할 수 있는 형태로 변환했다. 

- 커밋을 직접 해줄것이기 때문에 enable_auto_commitFalse로 두었다.

consumer = kafka.KafkaConsumer(('titles'), client_id='title-parser', 
                    group_id='parser',
                    bootstrap_servers=['pipeline-kafka-1', 'pipeline-kafka-2', 'pipeline-kafka-5'],
                    value_deserializer=lambda v: json.loads(v.decode('utf-8')), 
                    auto_offset_reset='latest', 
                    enable_auto_commit=False)

 

 

폴링 루프 생성

try:
    while True:
        # 여기에서 poll을 한다. 

        consumer.commit_async()
        time.sleep(1)
finally:
    consumer.commit()
    consumer.close()

 

폴링 루프 처리 부분 작성

max_records = 10

for _, records in consumer.poll(max_records=max_records).items():
    for record in records:
        
        try:
        	# 여기에서 record를 처리하는 동작을 한다.

        except:
            logging.error(traceback.format_exc())

 

컨슈머 구성

 

fetch.min.bytes

레코드를 가져올 때 데이터의 최소량을 정할 수 있다. 이 값보다 작다면 브로커는 메시지를 보내지 않고 모았다가 충족되면 보낸다.

 

fetch.max.wait.ms

fetch.min.bytes 에서 정한 값이 fetch.max.wait.ms 동안 채워지지 않을 경우 다 충족되지 않아도 보낸다.

 

max.partition.fetch.bytes

파티션당 반환하는 최대 바이트 수를 제어한다. ConsumerRecords 객체는 max.partition.fetch.bytes 보다 크지 않게 된다. 

 

session.timeout.ms

컨슈머와 브로커가 연결되는 세션이며 기본은 10초이다. 컨슈머가 이 시간만큼 하트비트를 전송하지 않으면 연결이 끊겼다고 알게 된다. heartbeat.interval.ms 와 관련이 있다.

 

auto.offset.reset

컨슈머가 읽고 있는 파티션의 오프셋을 조절할 수 있다.

 

enable.auto.commit

특정 시간마다 자동으로 커밋할 수 있게 한다. auto.commit.interval.ms 와 관련이 있다.

 

partition.assignment.strategy

컨슈머 그룹 내의 컨슈머들에게 파티션을 할당하는 방법을 정할 수 있다.

1. Range - T1, T2 토픽이 각 세개의 파티션을 가지고 있을 때 C1 컨슈머가 0, 1을 받고 C2 컨슈머가 2를 받는다.

2. RoundRobin - 컨슈머들에게 순서대로 할당한다.

 

client.id

이 문자열은 카프카 브로커로 전송되며, 카프카 브로커에서는 전송된 client.id로 어디서 구독하고 있는지 알 수 있다.

 

max.poll.records

한 번에 가져올 레코드의 개수를 제어할 수 있다.

 

receive.buffer.bytes, send.buffer.bytes

TCP 송수신 버퍼의 크기를 제어할 수 있다. -1로 설정하면 운영체제의 기본값이 설정된다.

 

 

반응형
Comments