일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- spark
- docker
- 스파크 스트리밍
- 에어플로우
- 데이터
- MySQL
- AWS
- delta lake
- 컬럼 기반
- Schema Registry
- 델타레이크
- 대용량 처리
- Data engineering
- airflow
- 스파크
- 레드시프트
- 카프카
- spark streaming
- s3
- 데이터 엔지니어링
- Data Warehouse
- 카프카 구축
- Data Engineer
- Zookeeper
- kafka
- kafka rest api
- Redshift
- Parquet
- 데이터 웨어하우스
- 데이터 엔지니어
- Today
- Total
목록데이터 엔지니어링 (46)
데이터 엔지니어 기술 블로그
About 이전에 카프카에서 Producer로 메시지를 보냈는데 메시지를 소비하기 위해 Consumer를 만들어보려고 한다. 컨슈머 그룹(Consumer Group) 프로듀서가 생산한 메시지를 하나의 컨슈머가 따라갈 수가 없다면 여러개의 컨슈머를 두어야 한다. 카프카 컨슈머는 컨슈머 그룹에 속한다. 하나의 컨슈머 그룹에서 파티션이 4개일 때 컨슈머가 2개라면 2개씩 나눠가지고, 컨슈머가 4개라면 파티션을 1개씩 나눠가진다. 만약 5개라면 컨슈머 하나는 메시지를 받을 수 없다. 컨슈머 그룹 하나에서는 서로 다른 메시지를 수신하여 처리를 한다. 실시간 처리를 하는 곳과 메시지의 데이터를 저장하는 곳을 따로 두고 싶다면 컨슈머 그룹은 두개가 필요하다. 리밸런싱(rebalancing) 토픽의 파티션 1개의 소유..
스파크 기본 파이썬에서 스파크 사용하기 SparkContext의 인스턴스를 만들면 스파크 클러스터에 연결해서 사용할 수 있게 해준다. SparkConf로 스파크에 대한 구성을 할 수 있다. sc = SparkContext.getOrCreate() # Verify SparkContext print(sc) # Print Spark version print(sc.version) 데이터프레임 사용하기 스파크의 코어 데이터 구조는 RDD라고 하는데 Resilient Distributed Dataset(탄력성있는 분산된 데이터셋) 이라는 뜻이다. RDD는 낮은 레벨에 있어서 사용하기 어렵기 때문에 Spark DataFrame 이라는 더 높은 레벨의 데이터프레임을 사용하는 것이 편하다. Spark DataFrames..
About 카프카에서 데이터를 전송할 때 직렬화를 통해 바이너리 배열로 변환해야 한다. 직렬화를 하는 이유는 어떤 값을 참조하는 주소가 담긴 변수를 저장했다고 했을 때, 다시 불러오게 된다고 해도 가르키던 값의 주소가 달라졌기 때문에 의미가 없다. 그래서 참조 값(Reference Type)은 저장하거나 보낼 수 없고 값(Value Type)만 저장하거나 보낼 수 있다. Serializer JSON 파이썬에서 직렬화, 역직렬화 방법은 다음과 같다. 파이썬 프로그래밍을 하면 array, dict 타입의 데이터를 파일로 저장하거나 불러올 때 아래의 함수(직렬화 및 역직렬화)를 자주 사용하게 된다. 스키마가 따로 존재하지 않아서 데이터를 보낼 때 전부 보내야하는 부담이 있다. # 직렬화 json_value =..
About 카프카는 Java를 제공하지만 go, python 등에서 서드파티에서 사용할 수 있도록 해주기도 한다. 이번에는 파이썬 애플리케이션에서 카프카로 보내는 방법과 추가 구성들을 알아보려고 한다. Example bootstrap.servers 최초 연결을 위한 브로커 서버 목록을 설정한다. 모든 브로커를 포함할 필요는 없지만 여러개로 설정해두어야 부트스트랩 서버 중 하나가 문제가 생겼을 때 다음 서버로 시도할 수 있다. key.serializer 메시지의 키를 직렬화할 직렬처리기의 이름을 여기에 설정한다. 직렬처리기는 객체를 Byte Array로 변환해준다. ByteArraySerializer, StringSerializer, IntegerSerializer라는 직렬처리기가 존재하고 사용자가 직접 ..
About 카프카는 클라이언트 API를 가지고 있는데 이것을 사용해서 프로듀서와 컨슈머 어플리케이션을 개발할 수 있다. 카프카는 서드파티 클라이언트를 사용할 수 있는데 python, C++, go 등에서도 사용할 수 있다. 카프카는 처리량이 많은 작업에 사용될 수 있다. 예를 들면 웹사이트 클릭 로그를 수집하거나 신용카드를 사용한 거래 등에서 사용될 수 있다. 프로듀서에서 카프카 클러스터까지의 처리 방식 메시지 클래스인 ProducerRecord의 객체를 생성한다. 토픽과 값을 입력하는 것이 기본이고 파티션과 키는 옵션으로 지정할 수 있다. send() 함수를 이용해 Serializer에서 네트워크로 전송할 수 있는 Byte Array로 변환한다. 그 후 Partitioner에서 메시지를 어떤 파티션으로..
About 카프카를 GUI로 보기 위해서 kafdrop 이라는 라이브러리를 사용하려고 한다. docker로 미리 만들어진 이미지가 있어서 정말 간단하게 사용할 수 있다. Tutorial & Example 1. 작업할 폴더를 생성한다. 2. docker-compose.yml 파일을 생성하고 아래처럼 입력한다. - KAFKA_BROKERCONNECT에 기존에 생성했던 카프카 브로커를 입력해주었다. version: "3.8" services: kafdrop: image: obsidiandynamics/kafdrop restart: "always" ports: - "9000:9000" environment: KAFKA_BROKERCONNECT: "pipeline-kafka-1:9092,pipeline-kafka..
About 카프카를 사용할 때 운영으로 사용하려면 구성을 잘 조절해야 한다. 스트리밍 데이터로 많은 데이터가 들어오고 나가기 때문에 비용 차이, 속도 차이가 클 수 있다. /config/server.properties 파일을 수정하여 설정을 적용할 수 있다. 카프카 구성하기 broker.id=1 카프카는 클러스터로 구축할 때 broker.id를 기준으로 브로커를 분류하기 때문에 하나의 클러스터에서 여러개의 broker.id를 지정할 수 없고 각기 다르게 지정해주어야 한다. port=9092 카프카는 기본 포트로 9092를 사용한다. num.network.threads=3 네트워크 요청을 처리하는 쓰레드 수 지정할 수 있다. num.io.threads=8 IO가 발생했을 때 사용되어지는 쓰레드의 개수를 지..
About 카프카 구축 (1) 과 카프카 구축 (2) 에서 구축했던 카프카 클러스터는 각자 다른 host를 가지고 있기 때문에 프로듀서가 카프카로 요청을 보낼 때 어디로 보내야 할 지 확실하지가 않다. 프로듀서에서 카프카 클러스터 중 하나의 브로커로 보낸다고 했을 때 그 브로커가 어떠한 문제로 종료되면 요청을 보낼수가 없다. 그래서 하나의 엔드포인트가 필요하다. 엔드포인트에 요청을 보내면 문제가 생긴 인스턴스가 아닌 곳으로 요청을 보내 처리할 수 있도록 고가용성을 확보할 수 있다. 직접 REST API를 만드려고 했으나 Confluent에서 오픈소스로 지원해주어서 사용하여 구축하려고 한다. Confluent의 기능을 사용할 수 있는 방법은 두가지가 있다. Confluent Platform 과 Conflu..
About 이번에는 카프카를 도커로 설치하고 '카프카 설치 (1)' 에서 했던 주키퍼 앙상블을 연결하려고 한다. 하나의 컴퓨터에서 클러스터를 구성해보고, 배포할 때 크게 변경 없이 배포하기 위해 도커로 구성하려고 한다. 카프카 브로커는 5개로 구성하고 크기를 쉽게 변경 가능하도록 하려고 한다. Tutorial & Example 1. 작업할 폴더를 생성한다. 2. Dockerfile을 아래와 같이 작성한다. FROM ubuntu:18.04 RUN mkdir -p /root/install RUN apt-get update WORKDIR /root/install ENV DEBIAN_FRONTEND noninteractive ENV JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64 ..
About 카프카 브로커에서 브로커와 토픽의 메타데이터를 저장하기 위해 주키퍼를 사용한다. 주키퍼를 단독으로 구성할 수도 있지만, 주키퍼를 클러스터로 구성하여 고가용성을 확보한 것을 주키퍼 앙상블(Zookeeper Ensemble)이라고 한다. 주키퍼 서버는 홀수로 구성하는 것을 권고하고 있기 때문에 3대로 구성하려고 한다. 홀수로 구성하는 이유는 예를 들어서 4대로 구성을 했을 때 결함에 대한 장애 대비 기능(failover)가 3대로 구성한 것과 다르지 않기 때문인데, 짝수로 구성해도 다른 큰 문제는 없다. 하나의 OS에서 3개의 도커를 띄워 구성해보려고 한다. Tutorial & Example 1. 작업할 폴더를 생성한다. 2. Dockerfile을 아래와 같이 작성한다. FROM ubuntu:18..