spark streaming(5)
-
[🔥Spark] java.lang.AssertionError: assertion failed: Concurrent update to the log. Multiple streaming jobs detected 해결방법
에러 메세지 java.lang.AssertionError: assertion failed: Concurrent update to the log. Multiple streaming jobs detected for 53 at scala.Predef$.assert(Predef.scala:223) 원인 로그에 대한 동시 업데이트, 여러 스트리밍 작업이 발견되었습니다. 스파크 스트리밍에서 동일한 체크포인트를 사용하는 두 개의 싱크 작업(writeStream)이 동시에 실행되면 발생하는 문제이다. checkpointLocation을 다른 위치로 사용하면 해결할 수 있다. Zeppelin에서 코드를 실행하고 같은 스트리밍 스크립트를 사용할 때 오류가 발생할 수 있다. 기존 같은 location을 사용한다. 동시에 같..
2021.11.04 -
[🔥Spark] Spark Streaming + Kafka Option
Spark Streaming Kafka Option Example df = spark \ .readStream \ .format("kafka") \ .option("option name", "option value") \ .load() startingTimestamp 구독중인 주제의 모든 파티션에 대한 시작 타임스탬프를 지정한다. 타입: timestamp string e.g. "1000" 기본값: none (next preference is startingOffsetsByTimestamp) startingOffsetsByTimestamp 쿼리가 시작될 때 타임스탬프의 시작점 타입: json string """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 200..
2021.11.01 -
[🔥Spark] Spark Streaming + Kafka 연동하기
개요 카프카를 사용하는 경우 데이터를 가공해서 다시 카프카로 넣거나 다른 곳으로 보내는 등의 처리를 해줄 곳이 반드시 필요하다. Spark Streaming을 사용하면 문제를 쉽게 해결할 수 있다. 자체적으로 kafka에서 읽고 처리 후 kafka로 보내는 기능이 포함되어 있으며 Spark에서는 1.2버전부터 파이썬에서 Spark Streaming을 사용할 수 있게 되었다. Spark Streaming에는 DStreams라는 기능이 있고, 그 위에 DataFrame을 사용하여 더 쉽게 처리를 할 수 있는 Structed Streaming이 있다. 여기에서는 Structed Streaming을 사용하려고 한다. 방법 spark.readStream을 사용하여 카프카의 어떤 토픽에서 데이터를 가져올지 정한다...
2021.10.29 -
[🔥Spark] 스파크 AccessControlException: Permission denied 해결방법
개요 스파크를 실행하려고 할 때 권한 오류가 발생할 수 있다. 이 경우에는 AWS EMR의 Zeppelin에서 스크립트를 실행하는데에 오류가 발생했다. org.apache.hadoop.security.AccessControlException: Permission denied: user=root, access=WRITE, inode="/":hdfs:hdfsadmingroup:drwxr-xr-x 해결방법 권한을 검사하지 않게 하는 방법도 있지만, 간단한 해결 방법은 유저에게 권한을 주면 해결할 수 있다. dfs.permissions.superusergroup 을 보면 어떤 그룹이 superusergroup인지 확인할 수 있다. >> cat /etc/hadoop/conf/hdfs-site.xml ... dfs..
2021.10.29 -
[🔥Spark] Spark Streaming 이란?
Spark Streaming 개요 스트리밍 처리는 실시간으로 들어오는 데이터를 처리하는 것을 말하는데, 스파크 스트리밍은 배치 처리를 해준다. 배치 처리란 A시간 부터 B시간까지의 처리를 한 묶음, B시간 부터 C시간까지의 처리를 한 묶음으로, 처리를 하는 것을 말한다. n초에 한 번씩 실행하는 등의 지정을 할 수 있다. 스파크 스트리밍은 특정 포트를 통해 데이터를 받아들일 수 있으며 데이터를 전부 불러와서 처리하는 것이 아닌 특정 시간 사이에 들어온 데이터를 처리하는 방식이다. Example 1. 스파크 세션을 생성한다. 2. 스트리밍 데이터프레임 생성한다. 3. 변형 작업을 한다. 4. 목적지로 출력한다. 5. 1을 더하는 출력 결과 예시는 다음과 같다. -------------------------..
2021.04.01