데이터 엔지니어 기술 블로그

[🧙Kafka] S3 Sink Connector: 카프카 S3와 연동하기 본문

데이터 엔지니어링

[🧙Kafka] S3 Sink Connector: 카프카 S3와 연동하기

jun_yeong_park 2021. 7. 9. 02:18
반응형

개요

kafka connect

카프카로 데이터를 이동시킬 때 Kafka Connect를 사용하면 쉽고 편하게 이동시키는데에 도움이 된다. 예를 들면 MySQL에서 Kafka를 거쳐 S3에 넣고 싶을 때 JDBC Source Connector를 사용하여 MySQL에서 Kafka에 넣고, Kafka에서 S3 Sink Connector를 사용하여 S3에 넣을 수 있다. 여기에서 source connector는 데이터 소스에서 카프카로 데이터를 넣는 커넥터이고, sink connector는 데이터를 카프카에서 목적지로 데이터를 이동하는 커넥터이다.

 

이번에는 S3 Sink Connector를 사용하여 Kafka Connect가 어떻게 동작하는지 이해해보려고 한다.

 

 

시작하기

환경

  • Docker ubuntu:20.04
  • kafka 2.8.0

 

 

Kafka 설치하기

카프카 클러스터가 아닌 명령어를 실행하기 위한 카프카를 설치한다.

wget https://mirror.navercorp.com/apache/kafka/2.8.0/kafka_2.13-2.8.0.tgz
tar -xvf kafka_2.13-2.8.0.tgz

 

Worker 실행하기

Kafka Connector가 동작을 하기 위해서는 Worker가 필요하다.

Worker는 분산 모드와 스탠드얼론 모드로 실행할 수 있으며 분산 모드에서는 내결함성을 확보할 수 있다. 분산 모드를 위해 Worker가 두 대의 컴퓨터에 설치되었다고 하고 같은 group.id를 사용한다면, 카프카는 자동으로 커넥터를 나눠서 배포하고 만약 커넥터 A가 동작하고 있는 워커에서 문제가 생긴다면 자동으로 다른 워커에서 받아서 이어서 처리한다.

 

스탠드얼론 모드로 실행할 때는 worker 설정과 connector의 설정까지 같이 넣어서 실행하고, 분산 모드에서는 worker 설정만 입력해서 실행하며그 뒤에는 REST API로 통신하여 connector를 실행할 수 있다.

 

워커는 자신의 작업이 어디까지 진행되었고 어떤 상태인지 구성, 오프셋, 상태 등을 저장하는데 이를 내부 토픽이라고 한다. worker 설정 파일에서 다음 속성이 내부 토픽과 관련이 있다.

  • config.storage.topic
    config.storage.topic example
  • status.storage.topic
  • offset.storage.topic

 

 

worker.properties 준비하기

{your_kafka_path}/config/connect-distributed.properties 파일에 속성을 다음과 같이 설정한다.

  • bootstrap.servers=kafka-bootstrap-1:9092,kafka-bootstrap-2:9092
    • 카프카 클러스터에 대한 초기 연결을 설정하는 host:port 목록
  • group.id=connect-cluster
    • connect 클러스터를 구분할 때 사용한다. 같은 이름의 group.id worker는 같은 클러스터에 포함된다.
  • key.converter=org.apache.kafka.connect.json.JsonConverter
    • 커넥터를 지나면서 key를 어떻게 변환할지 설정할 수 있다.
  • value.converter=org.apache.kafka.connect.json.JsonConverter
    • 커넥터를 지나면서 value를 어떻게 변환할지 설정할 수 있다.
  • key.converter.schemas.enable=false
    • 이 값을 true로 두면 메세지의 key에 스키마를 포함할 수 있다.
  • value.converter.schemas.enable=false
    • 이 값을 true로 두면 메세지의 value에 스키마를 포함할 수 있다.
  • offset.storage.topic=connect-offsets
    • 오프셋 저장에 사용할 주제
  • offset.storage.replication.factor=3
  • offset.storage.partitions=25
  • config.storage.topic=connect-configs
    • 커넥터 작업 구성을 저장하는데 사용할 주제
  • config.storage.replication.factor=3
  • status.storage.topic=connect-status
    • 상태 저장에 사용할 주제
  • status.storage.replication.factor=3
  • status.storage.partitions=5
  • offset.flush.interval.ms=10000
    • 작업에 대한 오프셋 커밋을 하는 시간 간격
  • rest.host.name=
    • worker rest api host name
  • rest.port=8083
    • worker rest api port
  • plugin.path=/usr/local/share/kafka/plugins
    • 플러그인이 저장될 경로

 

standalone mode(스탠드얼론 모드) 실행 방법 - 여기에서는 이 방법으로 실행 안함

bin/connect-standalone worker.properties connector1.properties [connector2.properties connector3.properties ...]
  • worker.properties
    • 워커의 설정이 담긴 파일의 경로를 입력하면 된다.
  • connector1.properties
    • 커넥터의 설정이 담긴 파일의 경로를 입력하면 된다.

 

distributed mode(분산 모드) 실행 방법 - 여기에서는 이 방법으로 실행

bin/connect-distributed config/connect-distributed.properties
  • 분산 모드에서는 아래의 속성들이 일치해야 카프카에서 잘 분산해서 작업을 시킬 수 있다. 만약 다른 connect 클러스터를 구축하려면 group.id와 내부 주제를 다른 이름으로 사용해야 한다.
    • group.id
    • status.storage.topic
    • offset.storage.topic
    • config.storage.topic
    • config.storage.replication.factor
    • offset.storage.replication.factor
    • offset.storage.partitions
    • status.storage.replication.factor
    • status.storage.partitions

worker 실행 화면

  • 잘 실행되었는지 확인해보기 위해 Worker에 요청을 보내본다.
    • curl -XGET http://localhost:8083
      {"version":"2.8.0","commit":"ebb1d6e21c92130","kafka_cluster_id":"yRPN8K4URxyAr7kjiAAzA"}
      curl -XGET http://localhost:8083/connector-plugins
      [{"class":"io.confluent.connect.s3.S3SinkConnector","type":"sink","version":"10.0.0"},{"class":"io.confluent.connect.storage.tools.SchemaSourceConnector","type":"source","version":"2.8.0"},{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"2.8.0"},{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector","type":"source","version":"2.8.0"},{"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector","type":"source","version":"1"},{"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","type":"source","version":"1"}]
  • 더 자세한 REST API는 이 링크를 참조한다.

 

 

S3 Sink Connector 설치 및 등록하기

S3 Sink Connector를 등록하기 위해서는 Plugin으로 설치를 해야한다. 플러그인의 경로는 worker 설정파일에서 plugin.path 속성을 변경하면 된다.

 

플러그인은 두가지 방법으로 사용할 수 있다. 

  1. uber jar을 지정된 경로에 배치한다.
    • uber jar란 자바 어플리케이션의 모든 패키지와 그와 의존관계에 있는 패키지 라이브러리까지 모두 하나의 jar에 묶은것이다.
    • 어플리케이션 배포 시 의존관계를 생각하지 않아도 된다.
  2. 플러그인 디렉토리를 배치한다.
    • 이 방법을 주로 사용한다.

 

S3로 이동할 Topic 생성

bin/kafka-topics.sh --create --if-not-exists --topic "{your_topic_name}" --zookeeper "{zookepper_str}" --replication-factor 2 --partitions 1

 

S3 Sink Connector 다운로드

  1. 이 링크에서 다운로드한 후 plugin.path에 설정한 경로(/usr/local/share/kafka/plugins)에 압축 해제한다.
    • 이대로 커넥터를 사용하면 Error: Java.lang.NoClassDefFoundError: com/google/common/base/Preconditions 오류가 발생한다. 내부에서 이 클래스를 사용하는 것 같은데 lib에 존재하지 않아서 생기는 문제로 예상된다.
    • guava-30.1.1-jre.jar이 링크에서 다운로드 받아서 lib폴더에 넣어서 사용하면 해결할 수 있다.
  2. worker를 다시 실행한다.

 

S3 Sink Connector 등록하기

  1. worker에 connector를 실행해달라고 요청하기 위해 준비한다.
    • 설정에 대한 자세한 설명은 이 링크를 참조한다.
  2. 아래와 같은 요청으로 커넥터를 등록한다.
    import json
    import requests
    
    
    headers = {
        'Content-Type': 'application/json',
        'Accept': 'application/json'
        }
    
    data = {
        "name": "your-connector-name",
        "config": {
            "topics": "your-topic-name",
            "tasks.max": 3,
            "connector.class": "io.confluent.connect.s3.S3SinkConnector",
            "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
            "storage.class": "io.confluent.connect.s3.storage.S3Storage",
            "flush.size": 1000,
            "s3.bucket.name": "your-bucket-name",
            "s3.region": "ap-northeast-1",
            "topics.dir": "your-s3-dir",
            "s3.compression.type": "gzip",
            "locale": "ko_KR",
            "timezone": "Asia/Seoul"
        }
    }
    
    response = requests.post(f'http://localhost:8083/connectors', json.dumps(data, ensure_ascii=False), headers=headers)
  3. 입력한 토픽에 프로듀서로 메세지를 보내면 flush.size인 1000개마다 S3에 저장하는 것을 확인할 수 있다.
  4. 파티셔너 설정으로 json 파일 안의 timestamp 컬럼으로 S3에 파티션을 나누어 저장하거나, 현재 시간에 기반하여 파티션을 나누어 저장하는 등의 설정을 할 수 있다. 자세한 설명은 이 링크를 참조한다.

 

 

 

반응형
Comments