데이터 엔지니어 기술 블로그

[🧙Kafka] 카프카 구축 (3) - 카프카 클러스터 REST API로 사용하기 본문

데이터 엔지니어링

[🧙Kafka] 카프카 구축 (3) - 카프카 클러스터 REST API로 사용하기

jun_yeong_park 2021. 4. 7. 01:39
반응형

About

카프카 구축 (1)카프카 구축 (2) 에서 구축했던 카프카 클러스터는 각자 다른 host를 가지고 있기 때문에 프로듀서가 카프카로 요청을 보낼 때 어디로 보내야 할 지 확실하지가 않다. 프로듀서에서 카프카 클러스터 중 하나의 브로커로 보낸다고 했을 때 그 브로커가 어떠한 문제로 종료되면 요청을 보낼수가 없다. 그래서 하나의 엔드포인트가 필요하다. 엔드포인트에 요청을 보내면 문제가 생긴 인스턴스가 아닌 곳으로 요청을 보내 처리할 수 있도록 고가용성을 확보할 수 있다.

직접 REST API를 만드려고 했으나 Confluent에서 오픈소스로 지원해주어서 사용하여 구축하려고 한다.

 

Confluent의 기능을 사용할 수 있는 방법은 두가지가 있다. Confluent Platform Confluent Platform using only Confluent Community components 이 있는데 기존에 카프카를 Confluent를 이용하지 않고 구축했기 때문에 두번째 방법으로 사용하려고 한다.

 

Tutorial & Example

Confluent Kafka REST 설치하기

1. 작업 할 폴더를 생성한다.

2. Dockerfile을 만들고 아래의 코드를 입력한다. 

여기에서는 6.1.1 버전으로 사용했으나 이 링크에 들어가면 가장 최신 버전의 링크를 알 수 있으므로 Dockerfile에서 conlfuent-community를 받는 부분을 최신 버전으로 수정해도 된다.

FROM ubuntu:18.04
RUN mkdir -p /root/install
RUN apt-get update

WORKDIR /root/install

ENV DEBIAN_FRONTEND noninteractive
ENV JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64

RUN apt-get install openjdk-8-jdk -y
RUN apt-get install wget -y
RUN apt-get install vim -y

# confluent-community 설치
RUN wget http://packages.confluent.io/archive/6.1/confluent-community-6.1.1.tar.gz
RUN tar -zxvf confluent-community-6.1.1.tar.gz
RUN mv confluent-6.1.1 /usr/local/confluent

# kafka-rest 설정파일 복사
COPY config/kafka-rest.properties /usr/local/confluent/etc/kafka-rest/kafka-rest.properties
RUN sed -i 's/\r//g' /usr/local/confluent/etc/kafka-rest/kafka-rest.properties

# kakfa-rest 실행
CMD /usr/local/confluent/bin/kafka-rest-start /usr/local/confluent/etc/kafka-rest/kafka-rest.properties

3. config 폴더를 생성한다.

4. config 폴더 안에 kafka-rest.properties 파일을 생성하고 아래의 속성을 입력한다. 기존에 생성했던 zookeeper와 kafka에 대한 정보들이 적혀있다.

id=default
# schema.registry.url=http://localhost:8081
zookeeper.connect=pipeline-zookeeper-a:2181,pipeline-zookeeper-b:2181,pipeline-zookeeper-c:2181
bootstrap.servers=PLAINTEXT://pipeline-kafka-1:9092,PLAINTEXT://pipeline-kafka-2:9092,PLAINTEXT://pipeline-kafka-3:9092,PLAINTEXT://pipeline-kafka-4:9092,PLAINTEXT://pipeline-kafka-5:9092

5. 적당한 곳에 docker-compose.yml 파일을 생성하고 아래의 설정을 입력한다. 8082포트는 kafka-rest가 사용하는 포트이다.

version: '3.8'

networks:
  default:
    external:
      name: zoo

services:
  pipeline-confluent-kafka-rest:
    container_name: pipeline-confluent-kafka-rest
    hostname: pipeline-confluent-kafka-rest
    image: pipeline-confluent-kafka-rest
    restart: always
    ports:
      - 8082:8082

6. 아래의 명령어로 도커를 빌드한다.

docker build --tag pipeline-confluent-kafka-rest .

7. 아래의 명령어로 docker-compose를 실행한다.

docker-compose up -d

8. docker ps 명령어로 실행이 잘 되는지 확인한다.

docker ps

 

Confluent Kafka REST 테스트하기

1. 파이썬을 사용하여 요청을 보내보려고 한다. 

2. 토픽 생성 및 보내기

kafka는 기본 설정으로 topic에 메시지를 보내거나 받으려고 할 때 존재하지 않으면 자동으로 생성한다. 이 옵션을 끄는 편이 좋다.

import requests
import json

headers = {
    'Content-Type': 'application/vnd.kafka.json.v2+json',
}

data = '{"records":[{"value":{"id":"probiotics"}}]}'

response = requests.post('http://localhost:8082/topics/topic_a', headers=headers, data=data)

print(response)
print(json.dumps(response.json(), indent=4))

 

Kafka 토픽 생성 및 보내기 요청 결과

3. 생성된 토픽 정보를 확인하려면 아까 생성했든 토픽 이름을 포함하여 이렇게 간단하게 요청을 보내면 된다.

import requests
import json
response = requests.get('http://localhost:8082/topics/topic_a/')
print(response)
print(json.dumps(response.json(), indent=4))

 

토픽 정보 확인하기

4. 프로듀서가 보낸 토픽을 소비하는 곳을 만드려면 consumer를 등록해야 한다. 아래의 요청으로 등록한다.

import requests
import json
headers = {
    'Content-Type': 'application/vnd.kafka.v2+json',
}

data = '{"name": "consumer_a_instance", "format": "json", "auto.offset.reset": "earliest"}'

response = requests.post('http://localhost:8082/consumers/consumer_a', headers=headers, data=data)

print(response)
print(json.dumps(response.json(), indent=4))

Consumer 등록

5. 아까 생성했던 topic_a를 구독하겠다고 요청을 보낸다. 결과값은 빈 값이다.

import requests
import json

headers = {
    'Content-Type': 'application/vnd.kafka.v2+json',
}

data = '{"topics":["topic_a"]}'

response = requests.post('http://localhost:8082/consumers/consumer_a/instances/consumer_a_instance/subscription', headers=headers, data=data)

print(response)
print(response.text)

6. 소비 할 수 있는 메시지가 있는지 확인한다.

import requests
import json

headers = {
    'Accept': 'application/vnd.kafka.json.v2+json',
}

response = requests.get('http://localhost:8082/consumers/consumer_a/instances/consumer_a_instance/records', headers=headers)
print(response)
print(json.dumps(response.json(), indent=4))

소비할 수 있는지 메시지 확인

7. 완료!

 

반응형
Comments