데이터 엔지니어 기술 블로그

[🔥Spark] Spark Streaming + Kafka 연동하기 본문

데이터 엔지니어링

[🔥Spark] Spark Streaming + Kafka 연동하기

jun_yeong_park 2021. 10. 29. 20:54
반응형

개요

카프카를 사용하는 경우 데이터를 가공해서 다시 카프카로 넣거나 다른 곳으로 보내는 등의 처리를 해줄 곳이 반드시 필요하다.

Spark Streaming을 사용하면 문제를 쉽게 해결할 수 있다. 자체적으로 kafka에서 읽고 처리 후 kafka로 보내는 기능이 포함되어 있으며 Spark에서는 1.2버전부터 파이썬에서 Spark Streaming을 사용할 수 있게 되었다.

Spark Streaming에는 DStreams라는 기능이 있고, 그 위에 DataFrame을 사용하여 더 쉽게 처리를 할 수 있는 Structed Streaming이 있다. 여기에서는 Structed Streaming을 사용하려고 한다.

 

 

방법

spark.readStream을 사용하여 카프카의 어떤 토픽에서 데이터를 가져올지 정한다.

kafka_bootstrap_servers = 'your kafka bootstrap servers'
topic = 'your kafka topic'
df = spark \
  .readStream\
  .format("kafka") \
  .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
  .option("subscribe", topic) \
  .load()

 

데이터는 key, value로 들어온다. df을 기존에 스파크에서 처리했던 것처럼 처리한다.

  • withColumn, dropna, udf 등을 사용하여 처리한다.

 

처리된 결과를 카프카로 다시 쓴다.

topic = "your destination topic name"
ds = df \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
  .option("topic", topic) \
  .option("checkpointLocation", "/tmp/your/checkpoint/location") \
  .start()

ds.awaitTermination()
  • checkpointLocation을 반드시 작성하지 않으면 오류가 발생한다.
    • checkpoint는 처리를 하다가 문제가 생길 경우 그 위치부터 다시 시작할 수 있도록 도와준다.
    • 체크포인트 관련 문서
  • awaitTermination()
    • 프로그램이 종료될 때까지 기다린다. 운영 환경에서는 반드시 사용해야한다.
    • 만약 Zeppelin에서 개발용으로 사용하고 있다면 ds.stop()과 같은 명령어로 직접 제어할 수 있다.
    • 스트리밍 쿼리 제어 관련 문서

 

 

제출하기

--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2

스파크 어플리케이션을 제출하는 경우 spark-submit에 옵션으로 넣어주어야한다.

만약 Zeppelin에서 사용하는 경우 Interpreter -> spark 설정에서 Property를 추가해주어야한다.

Interpreter
spark.jars.packages

 

 

이슈

카프카에 다시 쓸 때 dictionary 형태의 값을 그대로 쓰는 경우

{item={channel=a, message=test-message}, name=slack}

위와 같이 = 으로 구분되어 들어간다.

 

 

readStream 에러 로그 확인

 

Zeppelin에서 개발시 스트리밍 처리를 하다가 오류가 발생할 경우 중지될 수 있다. 그러나 오류를 확실하게 볼 수가 없는데 이럴 경우 kafka.readStream을 read로 바꿔서 먼저 테스트해보면 무슨 문제인지 쉽게 확인할 수 있다.

 

 

Required attribute 'value' not found

카프카로 다시 쓸 때 위의 오류가 발생할 수 있다. 데이터프레임에 value 컬럼이 존재해야한다.

 

 

AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".

org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2를 추가해주지 않을 경우 위의 문제가 발생한다.(위의 제출하기 참조)

 

 

 

반응형
Comments