일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
- 스파크
- AWS
- delta lake
- 데이터 웨어하우스
- 에어플로우
- 데이터 엔지니어
- s3
- Data Engineer
- 카프카 구축
- MySQL
- Data engineering
- spark streaming
- 카프카
- docker
- kafka
- 데이터
- 대용량 처리
- 컬럼 기반
- 스파크 스트리밍
- 데이터 엔지니어링
- 레드시프트
- spark
- 델타레이크
- Parquet
- airflow
- kafka rest api
- Schema Registry
- Data Warehouse
- Zookeeper
- Redshift
- Today
- Total
데이터 엔지니어 기술 블로그
[🧙Kafka] 카프카 구축 (7) - 카프카 프로듀서 파이썬에서 구현하기 본문
About
카프카는 Java를 제공하지만 go, python 등에서 서드파티에서 사용할 수 있도록 해주기도 한다. 이번에는 파이썬 애플리케이션에서 카프카로 보내는 방법과 추가 구성들을 알아보려고 한다.
Example
bootstrap.servers
최초 연결을 위한 브로커 서버 목록을 설정한다. 모든 브로커를 포함할 필요는 없지만 여러개로 설정해두어야 부트스트랩 서버 중 하나가 문제가 생겼을 때 다음 서버로 시도할 수 있다.
key.serializer
메시지의 키를 직렬화할 직렬처리기의 이름을 여기에 설정한다. 직렬처리기는 객체를 Byte Array로 변환해준다. ByteArraySerializer, StringSerializer, IntegerSerializer라는 직렬처리기가 존재하고 사용자가 직접 만들수도 있다.
value.serializer
메시지의 값을 직렬화할 때 사용한다. 위의 key.serializer와 방법은 동일하다.
위의 속성들을 사용한 간단한 예시
python3 -m pip install kafka-python
import random
import traceback
import json
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers=['pipeline-kafka-1', 'pipeline-kafka-2', 'pipeline-kafka-5'],
client_id='sample_producer',
key_serializer=None,
value_serializer=lambda v: json.dumps(v).encode('utf-8'))
record = []
for i in range(10):
record.append({
'number': random.randint(0, 1000)
})
try:
response = producer.send(topic='sample', value=record).get()
except:
traceback.print_exc()
print(response)
1. 부트스트랩 서버로 카프카 1, 2, 3번 브로커를 사용했다.
2. value_serializer로 배열을 json dumps로 처리한 후 보낼 수 있도록 했다.
3. 결과는 다음과 같다.
전송 방법
프로듀서가 카프카에 전송할 때 3가지 방식으로 전송할 수 있다.
Fire-and-forget
send 메시지로 전송만 하고 성공 및 실패 여부는 신경쓰지 않는 방법이다. 메시지가 유실될 수 있다.
producer.send(topic='sample', value=record)
Synchronous send
메시지를 전송하면 Futuer 객체가 반환되는데 이 객체에 있는 get() 함수를 사용하면 기다렸다가 어떻게 처리가 되었는지 확인할 수 있다.
response = producer.send(topic='sample', value=record).get()
Asynchronous send
send 함수를 호출할 때 처리 완료 후 어떤 동작을 할 것인지 정하는 callback() 메서드를 구현하는 방식이다. 프로세스는 요청만 보내고 계속 진행을 하면서 이후에 응답이 오면 따로 처리할 수 있다.
def on_send_success(record_metadata):
print(record_metadata.topic)
print(record_metadata.partition)
print(record_metadata.offset)
def on_send_error(excp):
print(excp)
response = producer.send(topic='sample', value=record).add_callback(on_send_success).add_errback(on_send_error)
프로듀서 매개변수
위의 세가지 매개변수 외에도 다른 구성을 할 수 있는 것들이 많다.
매개변수 이름(kafka-python KafkaProducer.send()의 매개변수 이름) 형식으로 작성했다.
acks(acks)
acks=0
- 브로커의 응답을 기다리지 않기 때문에 굉장히 빠르다. 매우 높은 처리량이 필요할 때 사용된다.
acks=1
- 리더 리플리카가 메시지를 받으면 수신했다는 응답을 받는다.
- 만약 리더가 문제가 생겼다면 클라이언트에서 다시 처리할 수 있다.
- 만약 리더가 중단되고 아직 복제하지 못한 경우 메시지가 유실될 수도 있다.
acks=2
- 동기화된 모든 리플리카가 메시지를 받으면 성공 응답을 받는다. 가장 안전한 형태이지만 오래 걸린다.
buffer.memory(buffer_memory)
전송될 메시지의 버퍼로 사용할 메모리의 양을 지정할 수 있다.
만약 서버로 전송되는 것보다 메시지가 더 많이 생긴다면 버퍼 메모리가 부족한 문제가 발생할 수 있다.
compression.type(compression_type)
이 매개변수를 사용하면 메시지를 압축해서 제공할 수 있다.
snappy, gzip, lz4 중 하나로 설정할 수 있다.
네트워크 사용량과 CPU 사용량 등을 보면서 네트워크 사용량이 너무 많다면 이 값을 사용해서 CPU 사용량을 늘리고 네트워크 사용량을 줄이는 방법을 생각할 수 있다.
retries(retries)
만약 메시지를 보냈을 때 실패하면 재시도를 얼마나 할 지 정할 수 있다.
batch.size(batch_size)
파티션에서 사용하는 배치(batch)의 사이즈를 조절할 수 있다.
batch는 한꺼번에 전송할 수 있도록 모아두는 곳인데, 다 차지 않더라도 시간이 되면 전송을 한다.
이 값이 너무 작다면 너무 네트워크에 부담이 갈 수 있다.
linger.ms(linger_ms)
배치를 전송할 때까지 기다리는 시간을 나타낸다.
client.id(client_id)
어떤 클라이언트에서 전송된 메시지를 식별하기 위해 사용된다.
kafka-python에서는 kafka-python-producer-#(# = unique number)가 기본값으로 설정되어 있다.
max.in.flight.requests.per.connection(max_in_flight_requests_per_connection)
이 값을 높이면 메모리 사용량은 증가하지만 처리량이 좋아진다.
실패시 메시지의 순서가 뒤바뀔 수 있다. 이 값을 1로 둔다면 메시지의 전송 순서대로 브로커가 쓰게 된다.
기본값은 5이다.
request.timeout.ms(request_timeout_ms)
클라이언트에서 요청할 때의 타임아웃을 정할 수 있다.
max.block.ms(max_block_ms)
전송 버퍼가 가득차는 등의 문제가 발생했을 때 프로듀서가 이 시간동안 일시 중단되는데 이 시간 내에 해결이 되어야 한다.
max.request.size(max_request_size)
프로듀서가 send()함수로 전송할 수 있는 최대 메시지 크기를 정할 수 있다. 기본값은 1MB이다.
message.max.bytes 값과 같은 값으로 설정하는 것이 좋다. 파이썬 카프카에서는 message_max_bytes 라는 옵션은 없다.
receive.buffer.bytes(receive_buffer_bytes), send.buffer.bytes(send_buffer_bytes)
소켓이 사용하는 TCP 송수신 버퍼의 크기를 나타낸다. 이 값이 -1이면 기본값이 사용된다.
네트워크 처리가 느린 곳에서는 이 값을 늘리는 것이 좋다.
파티션
ProducerRecord 객체는 키의 기본값이 null 이지만 대부분의 애플리케이션에서는 기본값으로 두지 않는다. 왜냐하면 메시지를 구분하는 추가 정보를 가질 수 있고, 파티션을 결정할 수 있기 때문이다.
키가 null일 때는 라운드 로빈 방식으로 균등하게 배분된다. 키가 있다면 해시값을 구한 후 특정 파티션으로 분류한다.
파티셔너는 기본은 Round Robin 방식이지만 커스텀으로 구현할 수도 있다.
'데이터 엔지니어링' 카테고리의 다른 글
[🔥Spark] 스파크로 머신러닝 시작하기 (0) | 2021.04.10 |
---|---|
[🧙Kafka] 카프카 구축 (8) - Serializer: JSON vs Avro vs Thrift 비교 (0) | 2021.04.09 |
[🧙Kafka] 카프카 구축 (6) - 프로듀서로 카프카에 메시지 쓰기 (0) | 2021.04.08 |
[🧙Kafka] 카프카 구축 (5) - 카프카 UI 간단하게 구성하기 (0) | 2021.04.08 |
[🧙Kafka] 카프카 구축 (4) - 카프카 설정 구성하기 (1) | 2021.04.07 |