일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
- kafka
- 대용량 처리
- s3
- 레드시프트
- 컬럼 기반
- Data Engineer
- Data Warehouse
- 스파크 스트리밍
- spark streaming
- AWS
- Parquet
- 카프카
- 에어플로우
- Zookeeper
- 데이터
- 데이터 엔지니어링
- 델타레이크
- docker
- kafka rest api
- airflow
- spark
- Schema Registry
- 카프카 구축
- 스파크
- Data engineering
- Redshift
- 데이터 엔지니어
- MySQL
- 데이터 웨어하우스
- delta lake
- Today
- Total
데이터 엔지니어 기술 블로그
[🔥Spark] Spark Streaming + Kafka 연동하기 본문
개요
카프카를 사용하는 경우 데이터를 가공해서 다시 카프카로 넣거나 다른 곳으로 보내는 등의 처리를 해줄 곳이 반드시 필요하다.
Spark Streaming을 사용하면 문제를 쉽게 해결할 수 있다. 자체적으로 kafka에서 읽고 처리 후 kafka로 보내는 기능이 포함되어 있으며 Spark에서는 1.2버전부터 파이썬에서 Spark Streaming을 사용할 수 있게 되었다.
Spark Streaming에는 DStreams라는 기능이 있고, 그 위에 DataFrame을 사용하여 더 쉽게 처리를 할 수 있는 Structed Streaming이 있다. 여기에서는 Structed Streaming을 사용하려고 한다.
방법
spark.readStream을 사용하여 카프카의 어떤 토픽에서 데이터를 가져올지 정한다.
kafka_bootstrap_servers = 'your kafka bootstrap servers'
topic = 'your kafka topic'
df = spark \
.readStream\
.format("kafka") \
.option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
.option("subscribe", topic) \
.load()
데이터는 key, value로 들어온다. df을 기존에 스파크에서 처리했던 것처럼 처리한다.
- withColumn, dropna, udf 등을 사용하여 처리한다.
처리된 결과를 카프카로 다시 쓴다.
topic = "your destination topic name"
ds = df \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
.option("topic", topic) \
.option("checkpointLocation", "/tmp/your/checkpoint/location") \
.start()
ds.awaitTermination()
- checkpointLocation을 반드시 작성하지 않으면 오류가 발생한다.
- checkpoint는 처리를 하다가 문제가 생길 경우 그 위치부터 다시 시작할 수 있도록 도와준다.
- 체크포인트 관련 문서
- awaitTermination()
- 프로그램이 종료될 때까지 기다린다. 운영 환경에서는 반드시 사용해야한다.
- 만약 Zeppelin에서 개발용으로 사용하고 있다면 ds.stop()과 같은 명령어로 직접 제어할 수 있다.
- 스트리밍 쿼리 제어 관련 문서
제출하기
--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2
스파크 어플리케이션을 제출하는 경우 spark-submit에 옵션으로 넣어주어야한다.
만약 Zeppelin에서 사용하는 경우 Interpreter -> spark 설정에서 Property를 추가해주어야한다.
이슈
카프카에 다시 쓸 때 dictionary 형태의 값을 그대로 쓰는 경우
{item={channel=a, message=test-message}, name=slack}
위와 같이 = 으로 구분되어 들어간다.
readStream 에러 로그 확인
Zeppelin에서 개발시 스트리밍 처리를 하다가 오류가 발생할 경우 중지될 수 있다. 그러나 오류를 확실하게 볼 수가 없는데 이럴 경우 kafka.readStream을 read로 바꿔서 먼저 테스트해보면 무슨 문제인지 쉽게 확인할 수 있다.
Required attribute 'value' not found
카프카로 다시 쓸 때 위의 오류가 발생할 수 있다. 데이터프레임에 value 컬럼이 존재해야한다.
AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".
org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2를 추가해주지 않을 경우 위의 문제가 발생한다.(위의 제출하기 참조)
'데이터 엔지니어링' 카테고리의 다른 글
[🔥Spark] StreamingQueryException: Cannot find earliest offsets of Set(topic-name-0) (0) | 2021.11.02 |
---|---|
[🔥Spark] 스파크 설정하는 방법(Spark Configuration, SparkConf) (0) | 2021.11.01 |
[🔥Spark] 스파크 AccessControlException: Permission denied 해결방법 (0) | 2021.10.29 |
[Airflow] Airflow + Celery Executor 구축(Docker) (1) | 2021.10.15 |
[🧙Kafka] Confluent License (0) | 2021.10.14 |