일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | |||||
3 | 4 | 5 | 6 | 7 | 8 | 9 |
10 | 11 | 12 | 13 | 14 | 15 | 16 |
17 | 18 | 19 | 20 | 21 | 22 | 23 |
24 | 25 | 26 | 27 | 28 | 29 | 30 |
- AWS
- Data Engineer
- MySQL
- spark
- 데이터 엔지니어
- kafka rest api
- Schema Registry
- Data Warehouse
- 컬럼 기반
- Parquet
- spark streaming
- 카프카
- 스파크 스트리밍
- Data engineering
- delta lake
- 레드시프트
- docker
- kafka
- 데이터
- 카프카 구축
- airflow
- s3
- Redshift
- 데이터 웨어하우스
- 스파크
- 에어플로우
- 대용량 처리
- Zookeeper
- 데이터 엔지니어링
- 델타레이크
- Today
- Total
데이터 엔지니어 기술 블로그
[🔥Spark] 스파크 설정하는 방법(Spark Configuration, SparkConf) 본문
About
세 가지 위치에서 스파크 설정을 할 수 있다.
- Spark properties: SparkConf 객체 사용, Java system properties 사용
- val conf = new SparkConf() .setMaster("local[2]") .setAppName("CountingSheep") val sc = new SparkContext(conf)
- Environment variables: conf/spark-env.sh 환경 변수를 통하여 시스템별 설정 가능
# Options read when launching programs locally with # ./bin/run-example or ./bin/spark-submit # - HADOOP_CONF_DIR, to point Spark towards Hadoop configuration files # - SPARK_LOCAL_IP, to set the IP address Spark binds to on this node # - SPARK_PUBLIC_DNS, to set the public dns name of the driver program
- Logging: log4j.properties를 통하여 설정 가능
Spark Properties
Spark Properties는 어플리케이션 설정을 제어하며 각 애플리케이션마다 별도로 구성된다. SparkConf()를 사용하여 속성을 설정할 수 있다.
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("CountingSheep")
val sc = new SparkContext(conf)
동적으로 로드하기 위해서는 외부에서 런타임에 구성 값을 제공해줄 수 있으며 conf/spark-defaults.conf을 사용하여 기본값을 제공해줄 수도 있다. 외부에서 받기 위해서 내부에서는 빈 설정 값을 제공해주어야 한다.
./bin/spark-submit --name "My app" --master local[4] --conf spark.eventLog.enabled=false
--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" myApp.jar
val sc = new SparkContext(new SparkConf())
conf/spark-defaults.conf
spark.master spark://5.6.7.8:7077
spark.executor.memory 4g
spark.eventLog.enabled true
spark.serializer org.apache.spark.serializer.KryoSerializer
속성의 종류 예시
Application Properties
spark.app.name
어플리케이션의 이름이며 UI에 이름이 표시된다.
spark.driver.cores
드라이버 프로세스에 사용할 코어 수
spark.driver.maxResultSize
모든 파티션의 직렬화된 결과의 총 크기 제한
만약 결과가 이 값보다 크면 작업이 중단된다.
spark.driver.memory
드라이버 프로세스에서 사용할 메모리 값
client 모드에서는 SparkConf 지점에서 이미 정해져있기 때문에 매개변수로 옵션을 주어야 한다.
spark.executor.memory
스파크 실행기 프로세스당 사용할 메모리
spark.local.dir
디스크에 저장되는 맵 출력 파일, RDD와 같은 데이터를 저장할 공간
spark.driver.supervise
이 값이 true인 경우 종료 코드가 0이 아니면 드라이버를 자동으로 재시작하며 spark standalone cluster 또는 Mesos 클러스터 배포 모드에만 적용된다.
Runtime Environment
spark.driver.extraClassPath
추가 클래스 경로
client 모드일 경우 외부에서 전달해주어야한다.
spark.driver.defaultJavaOptions
default JVM option to pass to the driver
spark.driver.extraLibraryPath
extra JVM option to pass to the driver
spark.executor.extraClassPath
이전 스파크 버전의 호환성을 위해 사용되며 일반적으로 사용되지 않는다.
spark.executor.logs.rolling.strategy
Executor 로그 롤링 전략
spark.python.worker.memory
python 작업자 프로세스 당 사용할 메모리 양
이 값을 초과하면 디스크에 기록된다.
spark.pyspark.python
driver, executor 모두에서 사용할 python 바이너리 실행 파일
Shuffle
spark.shuffle.compress
맵 출력 파일을 압축할지에 대한 여부
spark.shuffle.file.buffer
셔플 파일 출력 스트림 메모리 내 버퍼 크기
Spark UI
spark.eventLog.compress
기록된 이벤트를 압축할지에 대한 여부
spark.eventLog.compression.codec
기본값: zstd
lz4, lzf, snappy, zstd를 사용할 수 있다.
spark.eventLog.overwrite
기존 파일을 덮어쓸지 여부
압축 & 직렬화
spark.checkpoint.compress
RDD 체크포인트를 압축할지에 대한 여부
spark.rdd.compress
직렬화된 RDD 파티션을 압축할지에 대한 여부
메모리 관리
spark.memory.storageFraction
제거에 영향을 받지 않는 스토리지 메모리의 양
Execution Behavior
spark.executor.cores
실행기에서 사용할 코어 수
spark.broadcast.checksum
브로드캐스트 체크섬 활성화 여부
spark.default.parallelism
변환에 의해 반환된 RDD의 기본 파티션 수
Networking
spark.network.timeout
모든 네트워크 상호 작용에 대한 기본 Timeout
spark.port.maxRetries
포트에 바인딩 할 때의 최대 retry 수
Scheduling
spark.excludeOnFailure.enabled
true로 설정 시 너무 많은 작업 실패로 인해 제외된 실행기는 spark가 작업을 예약하지 못하게 한다.
spark.excludeOnFailure.timeout
제외 목록에서 제거되기 전 응용 프로그램에 대해 노드 또는 Executor가 제외되는 시간
spark.task.maxFailures
작업을 실패하기 전에 특정 작업이 실패한 횟수
Dynamic Allocation
spark.dynamicAllocation.enabled
workload에 따라 동적으로 실행기 수를 늘이거나 줄인다.
spark.dynamicAllocation.maxExecutors
동적 할당이 활성화된 경우 실행기 수의 제한
SQL (Runtime)
spark.sql.shuffle.partitions
셔플 기본 파티션 수
spark.sql.streaming.checkpointLocation
스트리밍 쿼리에 대한 체크포인트 기본 저장 위치
Spark Streaming
spark.streaming.blockInterval
데이터를 스파크에 저장하기 전에 데이터 블록으로 청크하는 간격
기본값: 200ms
spark.streaming.unpersist
RDD가 메모리에서 자동으로 지속 해제되도록 한다.
만약 이 값을 false로 두면 스트리밍 앱 외부에서 접근할 수 있으나 많은 메모리 사용이 될 수 있다.