일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- spark streaming
- Data Engineer
- spark
- airflow
- 델타레이크
- docker
- 데이터 엔지니어링
- 데이터
- kafka
- Zookeeper
- s3
- Schema Registry
- Redshift
- AWS
- 데이터 엔지니어
- Data Warehouse
- 레드시프트
- delta lake
- 데이터 웨어하우스
- kafka rest api
- 스파크 스트리밍
- MySQL
- 에어플로우
- 대용량 처리
- Parquet
- 카프카
- 스파크
- 카프카 구축
- Data engineering
- 컬럼 기반
- Today
- Total
목록카프카 (19)
데이터 엔지니어 기술 블로그
Spark Streaming Kafka Option Example df = spark \ .readStream \ .format("kafka") \ .option("option name", "option value") \ .load() startingTimestamp 구독중인 주제의 모든 파티션에 대한 시작 타임스탬프를 지정한다. 타입: timestamp string e.g. "1000" 기본값: none (next preference is startingOffsetsByTimestamp) startingOffsetsByTimestamp 쿼리가 시작될 때 타임스탬프의 시작점 타입: json string """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 200..
개요 카프카를 사용하는 경우 데이터를 가공해서 다시 카프카로 넣거나 다른 곳으로 보내는 등의 처리를 해줄 곳이 반드시 필요하다. Spark Streaming을 사용하면 문제를 쉽게 해결할 수 있다. 자체적으로 kafka에서 읽고 처리 후 kafka로 보내는 기능이 포함되어 있으며 Spark에서는 1.2버전부터 파이썬에서 Spark Streaming을 사용할 수 있게 되었다. Spark Streaming에는 DStreams라는 기능이 있고, 그 위에 DataFrame을 사용하여 더 쉽게 처리를 할 수 있는 Structed Streaming이 있다. 여기에서는 Structed Streaming을 사용하려고 한다. 방법 spark.readStream을 사용하여 카프카의 어떤 토픽에서 데이터를 가져올지 정한다...
카프카는 메시지를 보내는 Producer와 Consuemr로 이루어져 있다. 카프카는 Producer가 메시지를 보낸 후 Consumer가 소비하려고 할 때 누가 보낸 메시지인지 확인할 수 있는 방법이 없다. 그래서 Producer가 메시지를 기존에 보내던 것과 다른 스키마 형식으로 보낸다면 Consumer는 바뀐 메시지를 받았을 때 문제가 크게 발생할 수도 있다. 이런 일을 방지하기 위해서 스키마 레지스트리를 사용할 수 있다. Schema Registry란? 스키마 레지스트리는 Producer와 Consumer가 주고 받으려는 메시지의 스키마를 서로 알게 해주고 호환을 강제한다. 예를 들면 Producer가 처음에 정의했던 스키마와 호환되지 않는 스키마를 보내려고 할 때 보낼 수 없게 막아준다. 스키마..
개요 카프카를 활용한 인프라를 설계해보기 위해 찾아보다가 Confluent 공동 창립자 중 한명인 네하 나크헤데의 프레젠테이션을 보게 되었다. 이번에는 그 내용을 정리해보려고 한다. 프레젠테이션 내용 지난 10년간 데이터 시스템은 많이 바뀌었다. 여러 DB에서 데이터 웨어하우스로 ETL하여 데이터를 넣는 방식이었다. 최근 몇가지 데이터 추세는 ETL 아키텍처의 극적인 변화를 주도하고 있다. Single Server Database는 전사적 규모로 분산 데이터 플랫폼으로 바뀌고 있다. 그리고 일반적인 데이터가 아닌 logs, sensors, metrics 등으로 데이터 타입이 많아지고 있으며 스트림 데이터는 점점 보편화(increasingly ubiquitous)되고 있다. 그래서 그 결과는 아주 복잡한 ..
개요 카프카로 데이터를 이동시킬 때 Kafka Connect를 사용하면 쉽고 편하게 이동시키는데에 도움이 된다. 예를 들면 MySQL에서 Kafka를 거쳐 S3에 넣고 싶을 때 JDBC Source Connector를 사용하여 MySQL에서 Kafka에 넣고, Kafka에서 S3 Sink Connector를 사용하여 S3에 넣을 수 있다. 여기에서 source connector는 데이터 소스에서 카프카로 데이터를 넣는 커넥터이고, sink connector는 데이터를 카프카에서 목적지로 데이터를 이동하는 커넥터이다. 이번에는 S3 Sink Connector를 사용하여 Kafka Connect가 어떻게 동작하는지 이해해보려고 한다. 시작하기 환경 Docker ubuntu:20.04 kafka 2.8.0 K..
Kinesis란? Kinesis는 실시간으로 데이터 스트림을 수집, 처리, 분석해주는 서비스이다. Data Streams 데이터 스트림 수집 및 저장 샤드의 수를 조절하여 스트림을 얼마나 받을지 조절할 수 있다. Data Firehose 데이터 스트림 처리 및 전송 Data Analytics 스트리밍 데이터 분석 실시간 분석 생성 – 지표를 계산하고, Kinesis를 통해 Amazon S3 또는 Amazon Redshift로 전송할 수 있다. 실시간 대시보드 제공 – 집계 및 처리된 스트리밍 데이터 결과를 전송하여 실시간 대시보드를 구성할 수 있다. 실시간 지표 생성 – 실시간 모니터링, 알림, 경보에 사용할 사용자 지정 지표와 트리거를 생성할 수 있다. Video Streams 재생 및 분석을 위해 미..
신뢰성 있는 데이터 전달 카프카는 웹 클릭 로그 수집, 신용카드 트랜잭션 등 다양한 방면에서 사용된다. 카프카는 신뢰성과 속도 등 사이에서 구성으로 유연하게 조절할 수 있다. 신뢰성 보장 1. 카프카는 메시지 순서를 보장한다. 2. 카프카에 메시지를 쓸 때 리플리카에 메시지를 복제하는 작업의 구성으로 신뢰성을 조절할 수 있다. 만약 acks 를 높게 두고 요청에 실패했다면 프로듀서에서 다시 재시도 할 수 있다. - acks = 0 메시지를 보내고 확인을 하지 않는다. - acks = 1 리더가 메시지를 썼는지 확인한다. - acks = all 모든 동기화 리플리카에 복제되었는지 확인한다. 3. 최소한 하나의 리플리카가 살아있다면 메시지는 유실되지 않는다. 4. 컨슈머는 모든 리플리카에 커밋된 메시지만 읽..
About 카프카의 내부를 다 이해할 필요는 없지만 어느정도 알아두면 좀 더 최적화하거나 문제가 발생했을 때 해결하는데 도움이 된다. 내부 매커니즘 클러스터와 주키퍼 컨트롤러 브로커 중 하나이고 리더를 선출해야하는 역할을 가지고 있다. 클러스터를 시작할 때의 첫 번째 브로커가 컨트롤러로 임명된다. 모든 브로커는 시작될 때 주키퍼에서 /controller 노드가 없으면 생성하려고 한다. 컨트롤러 브로커에 문제가 생기면 카프카의 Watch가 브로커들에게 컨트롤러가 사라졌다고 알려준다. 그러면 브로커들은 /controller 노드를 생성하려고 시도한다. 첫 번째로 생성한 노드가 컨트롤러가 되며 새로운 세대 번호를 받게 된다. 브로커들은 새로운 세대 번호를 알게 되고, 이전 세대 번호의 브로커에서 명령이 오면 ..
About 이전에 카프카에서 Producer로 메시지를 보냈는데 메시지를 소비하기 위해 Consumer를 만들어보려고 한다. 컨슈머 그룹(Consumer Group) 프로듀서가 생산한 메시지를 하나의 컨슈머가 따라갈 수가 없다면 여러개의 컨슈머를 두어야 한다. 카프카 컨슈머는 컨슈머 그룹에 속한다. 하나의 컨슈머 그룹에서 파티션이 4개일 때 컨슈머가 2개라면 2개씩 나눠가지고, 컨슈머가 4개라면 파티션을 1개씩 나눠가진다. 만약 5개라면 컨슈머 하나는 메시지를 받을 수 없다. 컨슈머 그룹 하나에서는 서로 다른 메시지를 수신하여 처리를 한다. 실시간 처리를 하는 곳과 메시지의 데이터를 저장하는 곳을 따로 두고 싶다면 컨슈머 그룹은 두개가 필요하다. 리밸런싱(rebalancing) 토픽의 파티션 1개의 소유..
About 카프카에서 데이터를 전송할 때 직렬화를 통해 바이너리 배열로 변환해야 한다. 직렬화를 하는 이유는 어떤 값을 참조하는 주소가 담긴 변수를 저장했다고 했을 때, 다시 불러오게 된다고 해도 가르키던 값의 주소가 달라졌기 때문에 의미가 없다. 그래서 참조 값(Reference Type)은 저장하거나 보낼 수 없고 값(Value Type)만 저장하거나 보낼 수 있다. Serializer JSON 파이썬에서 직렬화, 역직렬화 방법은 다음과 같다. 파이썬 프로그래밍을 하면 array, dict 타입의 데이터를 파일로 저장하거나 불러올 때 아래의 함수(직렬화 및 역직렬화)를 자주 사용하게 된다. 스키마가 따로 존재하지 않아서 데이터를 보낼 때 전부 보내야하는 부담이 있다. # 직렬화 json_value =..