일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- 데이터 웨어하우스
- docker
- AWS
- spark streaming
- 레드시프트
- kafka
- 데이터 엔지니어링
- s3
- 데이터 엔지니어
- 컬럼 기반
- airflow
- MySQL
- Data engineering
- 스파크 스트리밍
- Schema Registry
- 델타레이크
- Data Engineer
- 스파크
- Parquet
- 에어플로우
- kafka rest api
- 카프카
- spark
- delta lake
- 데이터
- 카프카 구축
- 대용량 처리
- Redshift
- Zookeeper
- Data Warehouse
- Today
- Total
목록데이터 엔지니어링 (46)
데이터 엔지니어 기술 블로그
About 세 가지 위치에서 스파크 설정을 할 수 있다. Spark properties: SparkConf 객체 사용, Java system properties 사용 val conf = new SparkConf() .setMaster("local[2]") .setAppName("CountingSheep") val sc = new SparkContext(conf) Environment variables: conf/spark-env.sh 환경 변수를 통하여 시스템별 설정 가능 # Options read when launching programs locally with # ./bin/run-example or ./bin/spark-submit # - HADOOP_CONF_DIR, to point Spark t..
개요 카프카를 사용하는 경우 데이터를 가공해서 다시 카프카로 넣거나 다른 곳으로 보내는 등의 처리를 해줄 곳이 반드시 필요하다. Spark Streaming을 사용하면 문제를 쉽게 해결할 수 있다. 자체적으로 kafka에서 읽고 처리 후 kafka로 보내는 기능이 포함되어 있으며 Spark에서는 1.2버전부터 파이썬에서 Spark Streaming을 사용할 수 있게 되었다. Spark Streaming에는 DStreams라는 기능이 있고, 그 위에 DataFrame을 사용하여 더 쉽게 처리를 할 수 있는 Structed Streaming이 있다. 여기에서는 Structed Streaming을 사용하려고 한다. 방법 spark.readStream을 사용하여 카프카의 어떤 토픽에서 데이터를 가져올지 정한다...
개요 스파크를 실행하려고 할 때 권한 오류가 발생할 수 있다. 이 경우에는 AWS EMR의 Zeppelin에서 스크립트를 실행하는데에 오류가 발생했다. org.apache.hadoop.security.AccessControlException: Permission denied: user=root, access=WRITE, inode="/":hdfs:hdfsadmingroup:drwxr-xr-x 해결방법 권한을 검사하지 않게 하는 방법도 있지만, 간단한 해결 방법은 유저에게 권한을 주면 해결할 수 있다. dfs.permissions.superusergroup 을 보면 어떤 그룹이 superusergroup인지 확인할 수 있다. >> cat /etc/hadoop/conf/hdfs-site.xml ... dfs..
개요 에어플로우를 LocalExecutor를 사용해서 하나의 인스턴스에 실행시키고 있었다. 그러나 인스턴스에 문제가 생기면 Airflow도 동작하지 않을 수 있는 문제가 있고, 인프라가 커졌을 때 확장이 힘들어서 Worker를 분리해야했다. airflow에서 공식적으로 제공하는 docker-compose에서는 celery를 사용하며, 보통 airflow를 구축할 때 celery를 사용하는 것 같다. 이렇게 클러스터로 구축을 하게 되면 리소스가 필요할 때 worker의 갯수만 늘리면 되고, 리소스가 필요 없을 때 불필요하게 낭비하지 않고 worker의 갯수를 줄이면 되기 때문에 좋다. 구축할 때는 AWS ECS, AWS EFS 환경에서 구축했다. 구축 과정 Dask Cluster를 활용하여 구축 시도 가장..
개요 카프카를 구축할 때 Confluent에서 제공해주는 서비스가 정말 유용한데, 일부는 무료이고 일부는 사용기간 이후에는 라이센스가 필요한 경우가 있다. 서비스마다 다르므로 잘 확인하고 사용해야한다. 라이센스 정보 Enterprise (Subscription) License Confluent Server Confluent Control Center Confluent for Kubernetes Confluent Replicator MQTT Proxy Community License Confluent REST Proxy ksqlDB Confluent Schema Registry Confluent Admin REST API Connector의 경우 커넥터는 종류에 따라 라이센스가 다르다. 이 링크에서 확인할..
개요 Redshift는 데이터 웨어하우스를 만들 수 있게 도와준다. AWS에서 제공하는 서비스이며, 다양한 소스와 쉽게 결합할 수 있는 기능이 있는데, 연합 쿼리라고 하며 하나의 쿼리 안에 S3, Athena, RDS, Redshift에 있는 데이터를 이용할 수 있다. 레드시프트는 비용이 비싸서 모든 데이터를 여기에 넣을 수 없어서 큰 데이터들은 S3에 넣는 경우가 많으며, 서비스는 RDS를 사용하는데 이 기능을 사용하면 쉽게 결합할 수 있다. S3에서 데이터를 가져올 경우 Redshift 인스턴스는 사용하지 않고 독립적인 인스턴스를 사용하는 Redshift Spectrum을 이용하게 되는데 비용은 1TB에 5$정도 된다. 사용 방법: https://docs.aws.amazon.com/redshift/l..
레드시프트란? AWS에서 제공하고, AWS와 쉽게 통합할 수 있으며 컬럼 기반, PostgreSQL을 기반으로 만들어져 있다. 리더 노드와 실제로 쿼리를 처리하는 컴퓨터 노드가 있는 클러스터로 구성되어 있어서 쉽게 확장할 수 있다. 클라우드 데이터 웨어하우스를 만들 때 사용한다. 레드시프트 기능 정렬 키(SORT KEY) 정렬키에 따라 순서대로 데이터베이스에 저장한다. 자동 테이블 최적화 작업을 하면 정렬키를 선택할 필요가 없다. AUTO 키워드를 사용하여 자동으로 정렬키를 선택할 수 있다. 자주 사용되거나 조인되는 경우 사용하면 된다. 분산 테이블을 생성할 때 4가지(AUTO, EVEN, KEY, ALL)중 하나를 선택할 수 있다. 분산키를 선택하지 않으면 AUTO를 사용한다. 기본 키 및 외래 키 제..
개요 파이써 패키지를 만들어서 사용해야하는데, public pypi 에 올릴 수 없는 경우 private PyPI 서버를 만들어야한다. 서버를 띄워둘 때 새로운 EBS를 연결하지 않은 EC2나 EFS를 연결하지 않은 ECS 등의 상태가 유지되지 않을 수 있는 환경에서 띄우기엔 애매할 때 S3를 백엔드로 사용할 수 있다. 내용 S3를 백엔드로 사용하려면 pypicloud라는 라이브러리를 사용하면 된다. S3외에도 GCS, Blob Storage를 사용할 수 있다. 공식 페이지 https://pypi.org/project/pypicloud/ Getting Started https://pypicloud.readthedocs.io/en/latest/topics/getting_started.html#uploadi..
개요 카프카에서는 토픽을 많이 생성해서 사용하는데 Naming Convention 없이 사용하게 된다면 나중에 복잡해질수도 있기 때문에 알아보려고 한다. Convention 카프카에서 토픽을 생성할 때 유효한 문자는 [영문, 숫자, '.', '_', '-']만 사용할 수 있다. 그리고 유의할 점은 마침표(.)와 밑줄(_)은 충돌할 수 있기 때문에 둘 중 하나만 사용하는 것이 좋다. 선택할 수 있는 예시들은 다음과 같다. .. -- ... .. .. ... .. .. 이 게시물을 작성한 사람은 이 데이터 타입으로 성공했다고 한다. 이 방식으로 사용하면 인프라가 어느정도 커져도 커버할 수 있을 것으로 예상된다. message type logging queuing tracking etl/db streaming..
개요 데이터를 수집하고 가공하고 제공을 하기 위해서 보통 아주 많은 양의 데이터들을 다루게 된다. 파일을 저장할 때 압축을 하지 않는다면 파일의 크기가 커지는 동시에 비용도 증가하게 된다. 이것을 줄이기 위해서 보통은 압축을 해서 저장을 하게 된다. 저장하는 방식에는 여러가지가 있는데 이번에는 각 압축 방식마다 어떤 특징이 있는지 알아보려고 한다. 압축 방식들 gzip GNU zip의 줄임말이며 초기 유닉스 시스템에 쓰이던 압축 프로그램을 대체하기 위해 만들어졌다. 1992년 10월 31일에 처음 공개되었다. 무손실 압축 데이터 알고리즘(DEFLATE)을 사용한다. 파일 형식 중에 하나인 타르(tar)와 함께 .tar.gz 형식으로 자주 사용된다. 이 경우 다른 파일끼리 중복되는 부분을 압축시킬 수 있다..