일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- delta lake
- 카프카
- spark streaming
- Redshift
- 데이터 웨어하우스
- Data engineering
- AWS
- 스파크
- 컬럼 기반
- 스파크 스트리밍
- 데이터 엔지니어링
- Parquet
- 데이터 엔지니어
- Data Warehouse
- 데이터
- spark
- s3
- 에어플로우
- Schema Registry
- MySQL
- docker
- kafka
- 대용량 처리
- airflow
- Data Engineer
- Zookeeper
- 델타레이크
- kafka rest api
- 레드시프트
- 카프카 구축
- Today
- Total
데이터 엔지니어 기술 블로그
[🔥Spark] Spark Streaming + Kafka Option 본문
Spark Streaming Kafka Option
Example
df = spark \
.readStream \
.format("kafka") \
.option("option name", "option value") \
.load()
startingTimestamp
구독중인 주제의 모든 파티션에 대한 시작 타임스탬프를 지정한다.
타입: timestamp string e.g. "1000"
기본값: none (next preference is startingOffsetsByTimestamp)
startingOffsetsByTimestamp
쿼리가 시작될 때 타임스탬프의 시작점
타입: json string """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """
기본값: none (next preference is startingOffsets)
startingOffsets
쿼리가 시작될 때의 오프셋 시작점. 스트리밍 쿼리의 경우 새 쿼리가 시작될 때만 적용된다.
타입: "earliest", "latest" (streaming only), or json string """ {"topicA":{"0":23,"1":-1},"topicB":{"0":-2}} """
기본값: "latest" for streaming, "earliest" for batch
endingTimestamp
배치 쿼리 전용
마지막 타임스탬프
타입: timestamp string e.g. "1000"
기본값: none (next preference is endingOffsetsByTimestamp)
endingOffsetsByTimestamp
배치 쿼리 전용
타임스탬프의 끝점
타입: json string """ {"topicA":{"0": 1000, "1": 1000}, "topicB": {"0": 2000, "1": 2000}} """
기본값: none (next preference is endingOffsets)
endingOffsets
배치 쿼리 전용
오프셋 끝점
타입: latest or json string {"topicA":{"0":23,"1":-1},"topicB":{"0":-1}}
기본값: latest
failOnDataLoss
topic이 삭제되거나 offset이 범위를 벗어나는 경우 쿼리를 실패시킬 수 있다.
예상대로 작동하지 않는 경우 비활성화할 수 있다.
타입: true or false
기본값: true
kafkaConsumer.pollTimeoutMs
카프카 데이터 폴링하는 작업의 Timeout
이 값이 정의되지 않으면 spark.network.timeout으로 대체된다.
타입: long
기본값: 120000
fetchOffset.numRetries
오프셋을 가져올 때 실패할 경우의 재시도 횟수
타입: int
기본값: 3
fetchOffset.retryIntervalMs
오프셋을 가져오기 위해 다시 시도하기 전에 대기하는 시간
타입: long
기본값: 10
maxOffsetsPerTrigger
트리거 간격당 처리되는 최대 오프셋 수
타입: long
기본값: none
minOffsetsPerTrigger
트리거 간격당 처리되는 최소 오프셋 수
타입: long
기본값: none
maxTriggerDelay
트리거가 지연될 수 있는 최대 시간
minOffsetsPerTrigger가 설정된 경우에만 적용된다.
타입: time with units
기본값: 15m
minPartitions
Kafka에서 읽을 최소 파티션 수
기본적으로 스파크 파티션과 카프카 파티션을 1:1로 매치한다.
Kafka topicPartitions 보다 큰 값으로 설정하면 Spark는 큰 카프카 파티션을 더 작게 나눈다.
타입: int
기본값: none
groupIdPrefix
소비자 그룹 식별자의 접두사이며, kafka.group.id가 사용되면 이 옵션은 무시된다.
타입: string
기본값: spark-kafka-source
kafka.group.id
카프카 소비자 그룹 식별자
타입: string
기본값: none
includeHeaders
행에 Kafka 헤더를 포함할지에 대한 여부
타입: boolean
기본값: false
startingOffsetsByTimestampStrategy
타임스탬프로 지정된 시작 오프셋이 반환된 오프셋과 일치하지 않는 경우 사용되는 전략
타입: "error" or "latest"
기본값: "error"