일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- Zookeeper
- 카프카
- 데이터 엔지니어
- spark
- 스파크
- Data Warehouse
- 데이터
- 카프카 구축
- 레드시프트
- 대용량 처리
- kafka
- s3
- kafka rest api
- MySQL
- Parquet
- Data Engineer
- Redshift
- Schema Registry
- delta lake
- AWS
- 컬럼 기반
- 델타레이크
- 에어플로우
- Data engineering
- 스파크 스트리밍
- docker
- airflow
- spark streaming
- 데이터 엔지니어링
- 데이터 웨어하우스
- Today
- Total
데이터 엔지니어 기술 블로그
[🧙Kafka] 카프카 구축 (3) - 카프카 클러스터 REST API로 사용하기 본문
About
카프카 구축 (1) 과 카프카 구축 (2) 에서 구축했던 카프카 클러스터는 각자 다른 host를 가지고 있기 때문에 프로듀서가 카프카로 요청을 보낼 때 어디로 보내야 할 지 확실하지가 않다. 프로듀서에서 카프카 클러스터 중 하나의 브로커로 보낸다고 했을 때 그 브로커가 어떠한 문제로 종료되면 요청을 보낼수가 없다. 그래서 하나의 엔드포인트가 필요하다. 엔드포인트에 요청을 보내면 문제가 생긴 인스턴스가 아닌 곳으로 요청을 보내 처리할 수 있도록 고가용성을 확보할 수 있다.
직접 REST API를 만드려고 했으나 Confluent에서 오픈소스로 지원해주어서 사용하여 구축하려고 한다.
Confluent의 기능을 사용할 수 있는 방법은 두가지가 있다. Confluent Platform 과 Confluent Platform using only Confluent Community components 이 있는데 기존에 카프카를 Confluent를 이용하지 않고 구축했기 때문에 두번째 방법으로 사용하려고 한다.
Tutorial & Example
Confluent Kafka REST 설치하기
1. 작업 할 폴더를 생성한다.
2. Dockerfile을 만들고 아래의 코드를 입력한다.
여기에서는 6.1.1 버전으로 사용했으나 이 링크에 들어가면 가장 최신 버전의 링크를 알 수 있으므로 Dockerfile에서 conlfuent-community를 받는 부분을 최신 버전으로 수정해도 된다.
FROM ubuntu:18.04
RUN mkdir -p /root/install
RUN apt-get update
WORKDIR /root/install
ENV DEBIAN_FRONTEND noninteractive
ENV JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
RUN apt-get install openjdk-8-jdk -y
RUN apt-get install wget -y
RUN apt-get install vim -y
# confluent-community 설치
RUN wget http://packages.confluent.io/archive/6.1/confluent-community-6.1.1.tar.gz
RUN tar -zxvf confluent-community-6.1.1.tar.gz
RUN mv confluent-6.1.1 /usr/local/confluent
# kafka-rest 설정파일 복사
COPY config/kafka-rest.properties /usr/local/confluent/etc/kafka-rest/kafka-rest.properties
RUN sed -i 's/\r//g' /usr/local/confluent/etc/kafka-rest/kafka-rest.properties
# kakfa-rest 실행
CMD /usr/local/confluent/bin/kafka-rest-start /usr/local/confluent/etc/kafka-rest/kafka-rest.properties
3. config 폴더를 생성한다.
4. config 폴더 안에 kafka-rest.properties 파일을 생성하고 아래의 속성을 입력한다. 기존에 생성했던 zookeeper와 kafka에 대한 정보들이 적혀있다.
id=default
# schema.registry.url=http://localhost:8081
zookeeper.connect=pipeline-zookeeper-a:2181,pipeline-zookeeper-b:2181,pipeline-zookeeper-c:2181
bootstrap.servers=PLAINTEXT://pipeline-kafka-1:9092,PLAINTEXT://pipeline-kafka-2:9092,PLAINTEXT://pipeline-kafka-3:9092,PLAINTEXT://pipeline-kafka-4:9092,PLAINTEXT://pipeline-kafka-5:9092
5. 적당한 곳에 docker-compose.yml 파일을 생성하고 아래의 설정을 입력한다. 8082포트는 kafka-rest가 사용하는 포트이다.
version: '3.8'
networks:
default:
external:
name: zoo
services:
pipeline-confluent-kafka-rest:
container_name: pipeline-confluent-kafka-rest
hostname: pipeline-confluent-kafka-rest
image: pipeline-confluent-kafka-rest
restart: always
ports:
- 8082:8082
6. 아래의 명령어로 도커를 빌드한다.
docker build --tag pipeline-confluent-kafka-rest .
7. 아래의 명령어로 docker-compose를 실행한다.
docker-compose up -d
8. docker ps 명령어로 실행이 잘 되는지 확인한다.
Confluent Kafka REST 테스트하기
1. 파이썬을 사용하여 요청을 보내보려고 한다.
2. 토픽 생성 및 보내기
kafka는 기본 설정으로 topic에 메시지를 보내거나 받으려고 할 때 존재하지 않으면 자동으로 생성한다. 이 옵션을 끄는 편이 좋다.
import requests
import json
headers = {
'Content-Type': 'application/vnd.kafka.json.v2+json',
}
data = '{"records":[{"value":{"id":"probiotics"}}]}'
response = requests.post('http://localhost:8082/topics/topic_a', headers=headers, data=data)
print(response)
print(json.dumps(response.json(), indent=4))
3. 생성된 토픽 정보를 확인하려면 아까 생성했든 토픽 이름을 포함하여 이렇게 간단하게 요청을 보내면 된다.
import requests
import json
response = requests.get('http://localhost:8082/topics/topic_a/')
print(response)
print(json.dumps(response.json(), indent=4))
4. 프로듀서가 보낸 토픽을 소비하는 곳을 만드려면 consumer를 등록해야 한다. 아래의 요청으로 등록한다.
import requests
import json
headers = {
'Content-Type': 'application/vnd.kafka.v2+json',
}
data = '{"name": "consumer_a_instance", "format": "json", "auto.offset.reset": "earliest"}'
response = requests.post('http://localhost:8082/consumers/consumer_a', headers=headers, data=data)
print(response)
print(json.dumps(response.json(), indent=4))
5. 아까 생성했던 topic_a를 구독하겠다고 요청을 보낸다. 결과값은 빈 값이다.
import requests
import json
headers = {
'Content-Type': 'application/vnd.kafka.v2+json',
}
data = '{"topics":["topic_a"]}'
response = requests.post('http://localhost:8082/consumers/consumer_a/instances/consumer_a_instance/subscription', headers=headers, data=data)
print(response)
print(response.text)
6. 소비 할 수 있는 메시지가 있는지 확인한다.
import requests
import json
headers = {
'Accept': 'application/vnd.kafka.json.v2+json',
}
response = requests.get('http://localhost:8082/consumers/consumer_a/instances/consumer_a_instance/records', headers=headers)
print(response)
print(json.dumps(response.json(), indent=4))
7. 완료!
'데이터 엔지니어링' 카테고리의 다른 글
[🧙Kafka] 카프카 구축 (5) - 카프카 UI 간단하게 구성하기 (0) | 2021.04.08 |
---|---|
[🧙Kafka] 카프카 구축 (4) - 카프카 설정 구성하기 (1) | 2021.04.07 |
[🧙Kafka] 카프카 구축 (2) - 카프카 도커로 쉽게 설치하기 (0) | 2021.04.06 |
[🧙Kafka] 카프카 구축 (1) - 주키퍼 앙상블 쉽게 구축하기 (2) | 2021.04.06 |
[🔥Spark] Spark Streaming 이란? (0) | 2021.04.01 |