데이터 엔지니어 기술 블로그

[Delta Lake] 데이터 레이크하우스: 테이블 활용하기 본문

데이터 엔지니어링

[Delta Lake] 데이터 레이크하우스: 테이블 활용하기

jun_yeong_park 2022. 3. 18. 13:04
반응형

1. 테이블 읽기 및 쓰기

1.1 테이블 생성하는 방법

spark.sql

CREATE TABLE delta.`s3://bucket_name/foo/bar/table_name` (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
) USING DELTA

write

df.write.format("delta").mode("overwrite").save("s3://bucket_name/foo/bar/table_name")

1.2 시간 여행

델타레이크에서는 timestamp와 version으로 시간 여행을 할 수 있다. 사용자의 실수로 테이블의 어떤 데이터를 삭제하나 업데이트 한 경우 시간 여행으로 과거의 테이블로 돌아가 다시 쓸 수 있다.

df1 = spark.read.format("delta").option("timestampAsOf", timestamp_string).load("s3://bucket_name/foo/bar/table_name")
df2 = spark.read.format("delta").option("versionAsOf", version).load("s3://bucket_name/foo/bar/table_name")
spark.read.format("delta").load("s3://bucket_name/foo/bar/table_name@20190101000000000") # table on 2019-01-01 00:00:00.000
spark.read.format("delta").load("s3://bucket_name/foo/bar/table_name@v123")              # table on version 123

1.3 데이터 보존

델타테이블의 데이터는 자동으로 삭제되지 않는다. 삭제를 하기 위해서는 VACUUM 명령어를 실행해야 한다. 로그 파일은 체크포인트가 작성되면 자동으로 제거된다.

관련 옵션

  • delta.logRetentionDuration: 기본값 30일 / 로그를 보존할 기간을 설정한다. 이 기간에 따라서 시간 여행을 할 수 있는 기간이 다르다.
  • delta.deletedFileRetentionDuration: 기본값 7일 / VACUUM에 처리되는 파일이 되기 전에 파일이 삭제되어야 하는 기간이다.

노트

체크포인트 이전에 연속적인 로그 항목이 필요하다.

버전 0~19에 대한 로그 항목과 버전 10에 대한 체크포인트가 있는 경우

  • 버전 0 로그 항목이 정리되면 0 ~ 9는 시간여행을 할 수 없다.
  • 버전 10 ~ 19는 가능하다.

1.4 테이블에 쓰기

append mode

df.write.format("delta").mode("append").save("s3://bucket_name/foo/bar/table_name")

overwrite mode

df.write.format("delta").mode("overwrite").save("s3://bucket_name/foo/bar/table_name")

replaceWhere

df.write \\
  .format("delta") \\
  .mode("overwrite") \\
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'") \\
  .save("s3://bucket_name/foo/bar/table_name")

커밋 메시지 작성 방법

df.write.format("delta") \\
  .mode("overwrite") \\
  .option("userMetadata", "overwritten-for-fixing-incorrect-data") \\
  .save("s3://bucket_name/foo/bar/table_name")

1.5 Schema validation

Dataframe이 스키마와 호환되는지 자동으로 확인한다.

Validation

  1. 모든 DataFrame 컬럼이 테이블에 없으면 호환되지 않는다.
    • 테이블에는 있지만 DataFrame에는 없는 컬럼이 있으면 null로 설정된다.
  2. DataFrame 컬럼 데이터 타입과 대상 테이블의 컬럼 데이터 타입이 일치해야한다.
  3. 대소문자만 다르게 같은 열을 정의할 수 없다.

테이블 스키마 업데이트 방법

overwrite 모드로 덮어씌운다.

자동 스키마 업데이트 방법

write or writeStream have .option("mergeSchema", "true")

1.6 Table properties

테이블을 생성할 때나 변경할 때 TBLPROPERTIES를 사용하여 테이블 메타데이터를 설정할 수 있으며 델타테이블 메타데이터의 일부로 저장된다.

DESCRIBE DETAIL: 스키마, 테이블 크기에 대한 정보를 제공한다.

DESCRIBE HISTORY: 사용자 등의 정보와 각 쓰기 작업에 대한 기록을 제공한다.

2. 테이블 스트리밍 읽기 및 쓰기

2.1 Source

spark.readStream.format("delta").load("/delta/events")

import io.delta.implicits._
spark.readStream.delta("/delta/events")

입력 속도 제한

  • maxFilesPerTrigger: 마이크로 배치에서 고려할 새 파일의 수
  • maxBytesPerTrigger: 각 마이크로 배치에서 처리되는 데이터의 양

시작하는 곳 설정

  • startingVersion: Delta Lake 버전
  • startingTimestamp: 시작할 타임스탬프

2.2 Sink

append mode

하나씩 추가하기 위해서는 append mode를 사용한다.

events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json")
  .start("/delta/events")

complete mode

완전히 바꾸기 위해서는 complete mode를 사용한다.

spark.readStream
  .format("delta")
  .load("/delta/events")
  .groupBy("customerId")
  .count()
  .writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/delta/eventsByCustomer/_checkpoints/streaming-agg")
  .start("/delta/eventsByCustomer")

3. Delete, Update, Merge

3.1 Delete

델타레이크 테이블은 삭제를 하더라도 물리적으로 삭제되는 것이 아니라 삭제가 되었다고 기록을 남기기 때문에 오히려 스토리지 용량이 늘어난다.

# 삭제하는 방법 예시
from delta.tablesimport *
from pyspark.sql.functionsimport *

deltaTable = DeltaTable.forPath(spark, "/data/events/")

deltaTable.delete("date < '2017-01-01'")

deltaTable.delete(col("date") < "2017-01-01")

3.2 Update

from delta.tablesimport *
from pyspark.sql.functionsimport *

deltaTable = DeltaTable.forPath(spark, "/data/events/")

deltaTable.update("eventType = 'clck'", { "eventType": "'click'" } )

deltaTable.update(col("eventType") == "clck", { "eventType": lit("click") } )

3.3 Merge

델타테이블은 merge 함수를 통해 upsert를 할 수 있다.

  • whenMatchedUpdate: 컨디션에 매치될 경우 Update
  • whenNotMatchedInsert: 컨디션에 매치되지 않을 경우 Insert
from delta.tables import *

deltaTable = DeltaTable.forPath(spark, "/data/events/")

deltaTable.alias("events").merge(
    updatesDF.alias("updates"),
    "events.eventId = updates.eventId") \\
  .whenMatchedUpdate(set = { "data" : "updates.data" } ) \\
  .whenNotMatchedInsert(values =
    {
      "date": "updates.date",
      "eventId": "updates.eventId",
      "data": "updates.data"
    }
  ) \\
  .execute()

SCD 유형 2를 델타 테이블로 변경하기

SCD 유형 2는 모든 변경기록을 유지 관리하는 것을 말한다. 이 것을 델타 테이블로 만들기 위해서는 키의 이전 값을 이전 값이라고 표시하고 새로운 값을 삽입한다.

3.4 Performance tuning

3.4.1 파티션

파티션으로 나누고 제약 조건을 추가하여 검색 공간을 줄인다.

3.4.2 압축 파일

작은 파일이 여러개를 읽어야한다면 속도가 느려질 수 있으므로 더 큰 파일로 압축하여 처리량을 향상시킨다.

3.4.3 셔플 파티션 제어

데이터를 계산하기 위해 여러번 섞는데, 셔플에 사용되는 작업자 수는 spark.sql.shuffle.partitions 옵션으로 조절할 수 있다. 이 값에 따라서 파일의 수도 달라진다.

4. 테이블 관리

4.1 Delta Table Vacuum

델타레이크 테이블은 Delete로 테이블의 데이터를 제거하더라도 이전 데이터도 그대로 남아있고 삭제가 되었다는 기록이 추가가 된다. 그러므로 델타레이크 테이블의 과거 기록을 정리하기 위해서는 명시적으로 정리를 해주어야한다.

델타레이크 테이블에서 더이상 참조하지 않는 파일을 제거하기 위해서 vacuum 명령어를 사용하여 제거할 수 있다.

deltaTable.vacuum()

이 명령어를 사용하면 delta.logRetentionDuration, delta.deletedFileRetentionDuration의 설정값을 기준으로 더 이상 사용되지 않는 파일들을 정리한다.

4.2 Generate Manifest file

manifest file은 델타레이크 파일들의 메타데이터를 포함하는 파일이다.

deltaTable = DeltaTable.forPath(<path-to-delta-table>)
deltaTable.generate("symlink_format_manifest")

매니페스트 파일은 Trino(Presto), Athena 등에서 델타레이크 테이블을 읽을 때 사용한다.

4.3 제약 조건

4.3.1 NOT NULL

열의 값이 NULL이 될 수 없음을 나타낸다.

4.3.2 CHECK

값이 원하는 조건에 맞춰서 들어오는지 확인한다.

# CHECK 예시
ALTER TABLE default.people10m ADD CONSTRAINT dateWithinRange CHECK (birthDate > '1900-01-01');
반응형
Comments