데이터 엔지니어 기술 블로그

[Delta Lake] 데이터 레이크하우스: 소개 및 예시 본문

데이터 엔지니어링

[Delta Lake] 데이터 레이크하우스: 소개 및 예시

jun_yeong_park 2022. 2. 7. 18:11
반응형

Delta Lake

Delta Lake는 데이터 레이크 위에 Lakehouse 아키텍처를 구축할 수 있는 오픈소스 프로젝트이다. 데이터 레이크의 문제점과 데이터 웨어하우스의 문제점을 보완해줄 수 있다. 데이터 레이크는 아주 큰 데이터를 저장할 수 있지만 체계가 정확하게 잡히지 않으면 데이터 늪이 되기가 쉽다.

S3와 같은 클라우드 스토리지는 가장 비용 효율적인 스토리지 시스템이다. 그러나 key-value로 구현이 되어있어서 ACID 트랜잭션과 같은 고성능을 구현하기는 어렵다. listing object와 같은 메타데이터 동작은 비싸며 일관성 보장은 제한적이다. 델타레이크는 이런 문제점을 보완할 수 있다. ACID 성질을 가질 수 있게 하여 트랜잭션을 구현하며, 테이블에서의 시간 여행을 가능하게 한다. upsert를 구현할 수 있고, 아주 큰 데이터에서 쿼리를 빠르게 실행할 수 있게 해준다.

제공하는 것

  • ACID 트랜잭션
  • 확장 가능한 메타데이터 처리
    • Spark 분산 처리를 활용하여 페타바이트 규모 테이블의 메타데이터를 쉽게 처리한다.
  • 스트리밍 및 통합
  • 스키마 적용
  • 시간 여행
    • Delete, Update와 같은 명령어를 사용하더라도 기존 데이터는 그대로 보존되고 삭제되고 업데이트 된 기록을 남기기 때문에 테이블 시간 여행을 가능하게 한다.
  • Upsert & Delete
    • CDC, SCD, Straeming Upsert와 같은 복잡한 기능들을 쉽게 할 수 있다.

SCD란?

데이터 웨어하우스에서 시간 경과에 따른 현재 및 과거 데이터를 모두 저장하고 관리하는 차원이다.

 

Delta Lake 버전 선택하기

Delta Lake 버전은 Spark 버전과 맞춰야하므로 현재 사용중인 스파크 버전과 맞는 델타레이크 버전을 확인한다.

 

S3와 함께 사용하기

S3와 함께 사용하기 위해서는 패키지가 필요하다.

io.delta:delta-core_<scala_version>:<delta_version>,org.apache.hadoop:hadoop-aws:<hadoop_version>

델타 테이블 코드 예시

델타 테이블 인스턴스 함수를 실행시키면 실행되는 즉시 S3에 처리되어 저장이 된다.

from delta.tables import *
from pyspark.sql.functions import *

# 델타 테이블 불러오기
deltaTable = DeltaTable.forPath(spark, "s3a://{bucket_name}/test/2/delta-table")

# 델타 테이블 UPDATE
deltaTable.update(
  condition = expr("id % 2 == 0"),
  set = { "id": expr("id + 100") })

# 델타 테이블 DELETE
deltaTable.delete(condition = expr("id % 2 == 0"))

# 델타 테이블 UPSERT 
newData = spark.range(0, 20)

deltaTable.alias("oldData") \\
  .merge(
    newData.alias("newData"),
    "oldData.id = newData.id") \\
  .whenMatchedUpdate(set = { "id": col("newData.id") }) \\
  .whenNotMatchedInsert(values = { "id": col("newData.id") }) \\
  .execute()

deltaTable.toDF().show()

구조적 스트리밍 쓰기과 읽기 예시

 

streamingDf = spark.readStream.format("rate").load()
stream = streamingDf.selectExpr("value as id").writeStream.format("delta").option("checkpointLocation", "/tmp/checkpoint").start("/tmp/delta-table")
stream2 = spark.readStream.format("delta").load("/tmp/delta-table").writeStream.format("console").start()

 

S3에 저장되는 방식

Delete시에도 파일이 제거되지 않고 파일이 추가가 되는 특징이 있다.

_delta_log/에는 한 번 커밋될 때마다 파일이 하나씩 늘어난다.

 

반응형
Comments