일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | ||||||
2 | 3 | 4 | 5 | 6 | 7 | 8 |
9 | 10 | 11 | 12 | 13 | 14 | 15 |
16 | 17 | 18 | 19 | 20 | 21 | 22 |
23 | 24 | 25 | 26 | 27 | 28 |
- Data engineering
- 레드시프트
- AWS
- delta lake
- 데이터 엔지니어링
- 에어플로우
- Zookeeper
- Parquet
- Data Warehouse
- 컬럼 기반
- 대용량 처리
- airflow
- kafka
- 델타레이크
- spark streaming
- 카프카 구축
- docker
- Redshift
- s3
- 데이터
- 스파크
- 데이터 엔지니어
- MySQL
- Schema Registry
- kafka rest api
- 카프카
- spark
- Data Engineer
- 데이터 웨어하우스
- 스파크 스트리밍
- Today
- Total
목록데이터 엔지니어링 (30)
데이터 엔지니어 기술 블로그
![](http://i1.daumcdn.net/thumb/C150x150/?fname=https://blog.kakaocdn.net/dn/ce3mEV/btq85k486H1/uNRIlbthY2e1bYrWOC5aMK/img.png)
개요 카프카로 데이터를 이동시킬 때 Kafka Connect를 사용하면 쉽고 편하게 이동시키는데에 도움이 된다. 예를 들면 MySQL에서 Kafka를 거쳐 S3에 넣고 싶을 때 JDBC Source Connector를 사용하여 MySQL에서 Kafka에 넣고, Kafka에서 S3 Sink Connector를 사용하여 S3에 넣을 수 있다. 여기에서 source connector는 데이터 소스에서 카프카로 데이터를 넣는 커넥터이고, sink connector는 데이터를 카프카에서 목적지로 데이터를 이동하는 커넥터이다. 이번에는 S3 Sink Connector를 사용하여 Kafka Connect가 어떻게 동작하는지 이해해보려고 한다. 시작하기 환경 Docker ubuntu:20.04 kafka 2.8.0 K..
![](http://i1.daumcdn.net/thumb/C150x150/?fname=https://blog.kakaocdn.net/dn/cRQVip/btq7iRqeV8M/6yItrraBQxKeZsJiivEZYk/img.png)
개요 Airflow는 복잡한 워크플로우를 프로그래밍 방식으로 작성해서, 스케줄링하고 모니터링할 수 있는 플랫폼이다. 데이터 파이프라인을 이루고 있는 ETL 스크립트들을 스케줄링 할 때 crontab, cloudwatch 등을 사용하는 곳이 많다. 그러나 스크립트들이 많아지고 서로에 대한 의존성이 생기게 되면 컨트롤하기 어렵고, 기존 작업이 실패했을 때 다시 스크립트를 실행하려면 로그를 확인하고 실행해야 하는 등의 문제점이 생긴다. 이러면 바로 복구할수도 없고, 어디서 잘못되었는지 확인하기도 어렵고, 의존성이 있는 스크립트가 잘못되었는데 그 다음 스크립트가 실행이 되버리는 등의 문제점이 발생할 수 있다. Airflow에는 서로에 대한 의존성을 표현할 수 있고, 스크립트가 실패했을 때 알람을 보내 확인하고 ..
![](http://i1.daumcdn.net/thumb/C150x150/?fname=https://blog.kakaocdn.net/dn/bqe1R2/btq61Kxu61u/7plqw9Gefkzk27377QtfIK/img.png)
Kinesis란? Kinesis는 실시간으로 데이터 스트림을 수집, 처리, 분석해주는 서비스이다. Data Streams 데이터 스트림 수집 및 저장 샤드의 수를 조절하여 스트림을 얼마나 받을지 조절할 수 있다. Data Firehose 데이터 스트림 처리 및 전송 Data Analytics 스트리밍 데이터 분석 실시간 분석 생성 – 지표를 계산하고, Kinesis를 통해 Amazon S3 또는 Amazon Redshift로 전송할 수 있다. 실시간 대시보드 제공 – 집계 및 처리된 스트리밍 데이터 결과를 전송하여 실시간 대시보드를 구성할 수 있다. 실시간 지표 생성 – 실시간 모니터링, 알림, 경보에 사용할 사용자 지정 지표와 트리거를 생성할 수 있다. Video Streams 재생 및 분석을 위해 미..
![](http://i1.daumcdn.net/thumb/C150x150/?fname=https://blog.kakaocdn.net/dn/ceICDo/btq6uI7Fg1c/yYLmIiTp4XwSVYZkAQOR3k/img.png)
개요 RDS에서 매일 생성되는 스냅샷으로 Athena에서 과거의 테이블을 사용할 수 있게 했다. 그러나 시간 문제가 두가지가 발생했다. 문제 첫 번째 문제 - 시간으로 필터할 수 없는 문제 S3에 데이터를 저장한 후 그대로 Glue로 크롤링 후 Athena에서 날짜로 필터해서 읽으려고 시도했다. 그러나 쿼리 결과가 비어있었다. Athena는 Java Timestamp 형식이 필요하다고 하는데, 데이터가 그렇지 않은 것으로 예상된다. 해결 방법은 다음과 같다. 열을 스트링으로 정의한다. YYYY-MM-DD HH:MM:SS.fffffffff 형식으로 변환한다. 쿼리에서 Presto의 날짜 및 시간 함수를 사용하여 DATE 또는 TIMESTAMP로 읽는다. https://aws.amazon.com/ko/pre..
![](http://i1.daumcdn.net/thumb/C150x150/?fname=https://blog.kakaocdn.net/dn/vJMLS/btq6mC7oDIO/1N9RB8r1xvyfcpAwv7dNK0/img.png)
개요 Presto는 페이스북에서 아주 많은 데이터를 빠르고 효율적으로 분석하기 위해 개발되었다. 페이스북에서는 300PB 규모의 데이터 웨어하우스에 쿼리할 때 사용하고 있으며 현재는 Airbnb와 Dropbox 등에서도 사용되고 있다. 다양한 소스를 지원해주는데 Hive, Cassandra, RDB, AWS S3 등에서 데이터를 읽을 수 있다. Hive는 처리할 때 중간 결과를 디스크에 저장하지만 Presto는 메모리에 저장하기 때문에 속도가 훨씬 빠르다. 하지만 리소스를 더 사용하게 된다. Architecture Presto는 하나의 Coordinator가 다양한 소스에서 데이터를 가져와 Worker에게 전달을 해주는 역할을 한다. 쿼리 처리 방법 클라이언트에서 Coordinator로 쿼리를 보낸다. ..
개요 This type of correlated subquery pattern is not supported due to internal error 레드시프트에서 쿼리를 보낼 때 위와 같은 에러를 반환할 때가 있다. Redshift는 쿼리 플래너에서 쿼리 재작성을 이용하여 상관관계가 있는 패턴 등을 최적화한다. MySQL과는 처리해주는 방식이 다르기 때문에 지원하지 않는 상관관계를 가진 서브쿼리들이 있다. 지원하지 않는 서브쿼리 1번) 건너뛰기 수준의 상관관계 참조 event 테이블은 가장 상위 레벨에 있으나 가장 하위의 서브쿼리에 event 테이블이 상관관계로 사용되고 있다. select event.eventname from event where not exists ( select * from list..
![](http://i1.daumcdn.net/thumb/C150x150/?fname=https://blog.kakaocdn.net/dn/cSfOmu/btq2kqKUyQq/Hl7SSG9Ae0xt4r8sRtCKF1/img.png)
About 카프카의 내부를 다 이해할 필요는 없지만 어느정도 알아두면 좀 더 최적화하거나 문제가 발생했을 때 해결하는데 도움이 된다. 내부 매커니즘 클러스터와 주키퍼 컨트롤러 브로커 중 하나이고 리더를 선출해야하는 역할을 가지고 있다. 클러스터를 시작할 때의 첫 번째 브로커가 컨트롤러로 임명된다. 모든 브로커는 시작될 때 주키퍼에서 /controller 노드가 없으면 생성하려고 한다. 컨트롤러 브로커에 문제가 생기면 카프카의 Watch가 브로커들에게 컨트롤러가 사라졌다고 알려준다. 그러면 브로커들은 /controller 노드를 생성하려고 시도한다. 첫 번째로 생성한 노드가 컨트롤러가 되며 새로운 세대 번호를 받게 된다. 브로커들은 새로운 세대 번호를 알게 되고, 이전 세대 번호의 브로커에서 명령이 오면 ..
![](http://i1.daumcdn.net/thumb/C150x150/?fname=https://blog.kakaocdn.net/dn/bBMnDp/btq2lCCC8zK/E7oy3sBmozpa5EmspC8Mkk/img.png)
About 이전에 카프카에서 Producer로 메시지를 보냈는데 메시지를 소비하기 위해 Consumer를 만들어보려고 한다. 컨슈머 그룹(Consumer Group) 프로듀서가 생산한 메시지를 하나의 컨슈머가 따라갈 수가 없다면 여러개의 컨슈머를 두어야 한다. 카프카 컨슈머는 컨슈머 그룹에 속한다. 하나의 컨슈머 그룹에서 파티션이 4개일 때 컨슈머가 2개라면 2개씩 나눠가지고, 컨슈머가 4개라면 파티션을 1개씩 나눠가진다. 만약 5개라면 컨슈머 하나는 메시지를 받을 수 없다. 컨슈머 그룹 하나에서는 서로 다른 메시지를 수신하여 처리를 한다. 실시간 처리를 하는 곳과 메시지의 데이터를 저장하는 곳을 따로 두고 싶다면 컨슈머 그룹은 두개가 필요하다. 리밸런싱(rebalancing) 토픽의 파티션 1개의 소유..
![](http://i1.daumcdn.net/thumb/C150x150/?fname=https://blog.kakaocdn.net/dn/dkDwLl/btq2h2vbRjz/Hv9KA9YYX0tc3MMHdcili0/img.png)
스파크 기본 파이썬에서 스파크 사용하기 SparkContext의 인스턴스를 만들면 스파크 클러스터에 연결해서 사용할 수 있게 해준다. SparkConf로 스파크에 대한 구성을 할 수 있다. sc = SparkContext.getOrCreate() # Verify SparkContext print(sc) # Print Spark version print(sc.version) 데이터프레임 사용하기 스파크의 코어 데이터 구조는 RDD라고 하는데 Resilient Distributed Dataset(탄력성있는 분산된 데이터셋) 이라는 뜻이다. RDD는 낮은 레벨에 있어서 사용하기 어렵기 때문에 Spark DataFrame 이라는 더 높은 레벨의 데이터프레임을 사용하는 것이 편하다. Spark DataFrames..
![](http://i1.daumcdn.net/thumb/C150x150/?fname=https://blog.kakaocdn.net/dn/bwd1gs/btq127SGJKA/FKgwlfcBgynd46esOKhctK/img.png)
About 카프카는 Java를 제공하지만 go, python 등에서 서드파티에서 사용할 수 있도록 해주기도 한다. 이번에는 파이썬 애플리케이션에서 카프카로 보내는 방법과 추가 구성들을 알아보려고 한다. Example bootstrap.servers 최초 연결을 위한 브로커 서버 목록을 설정한다. 모든 브로커를 포함할 필요는 없지만 여러개로 설정해두어야 부트스트랩 서버 중 하나가 문제가 생겼을 때 다음 서버로 시도할 수 있다. key.serializer 메시지의 키를 직렬화할 직렬처리기의 이름을 여기에 설정한다. 직렬처리기는 객체를 Byte Array로 변환해준다. ByteArraySerializer, StringSerializer, IntegerSerializer라는 직렬처리기가 존재하고 사용자가 직접 ..