데이터 엔지니어 기술 블로그

[🧙Kafka] 카프카 구축 (6) - 프로듀서로 카프카에 메시지 쓰기 본문

데이터 엔지니어링

[🧙Kafka] 카프카 구축 (6) - 프로듀서로 카프카에 메시지 쓰기

jun_yeong_park 2021. 4. 8. 12:42
반응형

About

카프카는 클라이언트 API를 가지고 있는데 이것을 사용해서 프로듀서와 컨슈머 어플리케이션을 개발할 수 있다. 카프카는 서드파티 클라이언트를 사용할 수 있는데 python, C++, go 등에서도 사용할 수 있다.

카프카는 처리량이 많은 작업에 사용될 수 있다. 예를 들면 웹사이트 클릭 로그를 수집하거나 신용카드를 사용한 거래 등에서 사용될 수 있다.

 

프로듀서에서 카프카 클러스터까지의 처리 방식

카프카 프로듀서 메시지 처리 방식

  1. 메시지 클래스인 ProducerRecord의 객체를 생성한다. 토픽과 값을 입력하는 것이 기본이고 파티션과 키는 옵션으로 지정할 수 있다.
  2. send() 함수를 이용해 Serializer에서 네트워크로 전송할 수 있는 Byte Array로 변환한다.
  3. 그 후 Partitioner에서 메시지를 어떤 파티션으로 보낼지 판단한다. 만약 ProducerRecord 객체를 만들 때 Partition과 Key를 지정했다면 생략된다.
  4. 그 후 전송할 레코드들을 모은 Batch에 추가한다. 
  5. Batch에 모아진 값을 다른 스레드에서 카프카 클러스터에 전송을 한다.
  6. 카프카 브로커가 성공여부와 메타데이터를 반환하고, 프로듀서는 RecordMetadata 객체로 결과를 받을 수 있다.
  7. 프로듀서는 처리된 결과를 보고 재전송을 시도하거나 계속 진행하는 등의 로직을 처리하면 된다.

 

Example

Example

1. 크롤러는 카프카 프로듀서로 적당하진 않지만 실시간으로 들어오는 데이터는 따로 없기 때문에 게시글을 크롤링하고, 실시간으로 카프카로 보내려고 한다.

2. Producer에서 아래와 같은 json 형태로 메시지를 카프카로 보낸다.

{
  'key': <key>,
  'source': <source>,
  'text': <text>
}
headers = {
	'Content-Type': 'application/vnd.kafka.json.v2+json',
}

data = {
	'records': records
}


response = requests.post('http://localhost:8082/topics/titles', headers=headers, data=json.dumps(data))

실제 실행된 크롤러의 로그메시지

3. [다음] 받은 메시지를 처리해주는 Consumer를 작성한다. 여기에서 konlp의 명사 추출 기능을 사용하여 키워드를 가져온다.

4. [다음] 데이터 시각화 툴로 처리된 데이터를 보낸다.

5. [다음] 데이터 시각화 툴에서 차트로 시각화해준다.

6. 완료

 

 

 

반응형
Comments