데이터 엔지니어 기술 블로그

[🔥Spark] 스파크로 머신러닝 시작하기 본문

데이터 엔지니어링

[🔥Spark] 스파크로 머신러닝 시작하기

jun_yeong_park 2021. 4. 10. 12:37
반응형

스파크 기본

파이썬에서 스파크 사용하기

SparkContext의 인스턴스를 만들면 스파크 클러스터에 연결해서 사용할 수 있게 해준다.

SparkConf로 스파크에 대한 구성을 할 수 있다.

sc = SparkContext.getOrCreate()

# Verify SparkContext
print(sc)

# Print Spark version
print(sc.version)

 

데이터프레임 사용하기

스파크의 코어 데이터 구조는 RDD라고 하는데 Resilient Distributed Dataset(탄력성있는 분산된 데이터셋) 이라는 뜻이다.

RDD는 낮은 레벨에 있어서 사용하기 어렵기 때문에 Spark DataFrame 이라는 더 높은 레벨의 데이터프레임을 사용하는 것이 편하다.

Spark DataFrames를 사용하는 것이 자동으로 최적화하는데 도움이 된다.

 

스파크 세션 생성하기

SparkSession은 Spark 2.0 이상부터 사용할 수 있다. Spark DataFrame을 사용할 수 있고, 기존에 SparkContext에서 사용하던 기능들도 사용할 수 있다.

기존에는 SparkContext를 사용하였지만, 이후부터는 SparkSession을 사용하려고 한다. 멀티 세션 상황에서 Spark Context의 충돌을 방지하고 하나를 공유해서 사용할 수 있게 해준다.

동일한 jvm에 여러개의 스파크 컨텍스트를 갖는 것은 충돌을 일으킬 수 있기 때문에 권장되지 않는다.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

print(spark)

 

테이블 보기

SparkSession은 catalog라는 속성을 가지고 있다. 여기에는 몇 가지 사용할 수 있는 기능들이 있다.

.listTables() 를 사용하면 클러스터에 있는 테이블 리스트를 볼 수 있다.

spark.catalog.listTables()
# 결과
# [Table(name='flights', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

 

스파크에서 sql 사용하기

DataFrame을 사용할 때 최근에 사용하는 방법으로 SQL을 사용할 수도 있다.

SparkSession의 .sql() 을 사용하면 된다.

query = 'SELECT * FROM flights LIMIT 10'

flights = spark.sql(query)
flights.show()

# 테이블 정보가 결과로 보여진다.

 

스파크 데이터프레임을 판다스 데이터프레임으로 변환하기

 

때로는 스파크로 작업하는 것보다 Pandas를 사용하여 작업하는 것이 효율적인 경우가 있다.

SparkSession의 DataFrame을 .to_pandas() 라는 함수를 사용하여 Pandas Dataframe으로 변환할 수 있다.

pd_counts = flight_counts.toPandas()

 

 

판다스 데이터프레임을 스파크 데이터프레임으로 변환하기

 

pandas 데이터를 Spark DataFrame으로 변환할 수도 있다.

SparkSession의 .createDataFrame() 함수를 사용하면 된다. 하지만 이런 방식으로 만든 데이터프레임은 로컬로밖에 사용할 수 없어서 다른 컨텍스트에서는 접근을 할 수가 없다.

.createOrReplaceTempView() 를 사용하면 현재 세션에서 사용할 테이블을 스파크 카탈로그에 등록할 수 있다.

spark_temp = spark.createDataFrame(pd_dataframe)

spark_temp.createOrReplaceTempView('temp')

spark.sql('select * from temp limit 10')

 

 

csv로 데이터프레임 만들기

 

스파크 데이터프레임을 만들 때 다양한 소스로 만들 수 있는다.

csv 통해서도 만들 수 있는데 아래와 같은 방식으로 만들 수 있다.

filepath = '/foo/bar/airports.csv'

airports = spark.read.csv(file_path, header=True)

airports.show()

 

컬럼 생성하기

컬럼을 새로 생성하려고 할 때 .withColumn 함수를 사용할 수 있다.

df = df.withColumn('your_column_name', df.id + 10)

 

데이터 조작

데이터 필터링(where)

SQL에서 데이터를 필터링하기 위해 WHERE 문을 사용했었다. 스파크에서는 .filter() 를 사용하여 필터링을 할 수 있는데 사용 방법은 string으로 컬럼과 비교값 등을 전달해서 filter 함수에서 파싱해서 처리하는 것과 컬럼을 비교시킨다는 의미의 값을 전달하는 것이다.

df1 = df.filter('id > 1000')
df2 = df.filter(df.id > 1000)
# df.id > 1000의 결과값
Column<b'(id > 1000)')>

데이터 선택(select)

Spark DataFrame에서 데이터를 가져오려면 select() 함수를 사용하면 된다.

df1 = df.select('id', 'name')

아래와 같은 방식으로도 사용이 가능하다.

new_id = (df.id / 60).alias('new_id')
df1 = df.select('id', new_id)

df2 = df.selectExpr('id', '(id / 60) as new_id')

Aggregating

최댓값, 최솟값 등을 구하는 어그리게이션을 구하는 것도 간단하게 가능하다. .min(), .max() 등의 함수를 사용하려면 .groupBy() 를 통해 <pyspark.sql.group.GroupedData> 를 가져오면 어그리게이션 함수들을 사용할 수 있다. groupBy의 매개변수를 빈 값으로 두어도 사용할 수 있다.

df.groupBy().min('id').show()
df.groupBy().min('id').show()
df.groupby('class').count()

위에서 사용했던 방법 외에 다른 방법으로도 할 수 있다. pyspark.sql.functions 를 이용하는 방법이다. 여기에는 표준 편차를 구하는 것과 같은 다양한 어그리게이션 방법들이 존재한다.

import pyspark.sql.functions as F

df.agg(F.stddev('number')).show()

 

스파크로 머신러닝 파이프라인 만들기

조인

pyspark에서도 물론 조인을 할 수 있다. 데이터 프레임은 .join() 함수를 가지고 있는데 이 함수는 세가지 매개변수를 받는다. 데이터 프레임과 결합할 다른 데이터프레임을 첫 번째 매개변수로 두고, 어디에서 조인할지를 결정하는 on 매개변수가 두번째이고, 방법을 결정하는 how 매개변수가 세번째이다.

df.join(df2, on='id', how='leftouter')

컬럼명 변경하기

컬럼의 이름을 변경해야 할 때가 있다. 그럴때는 DataFrame이 가지고 있는 .withColumnRenamed 함수를 사용하면 된다. 사용 방법은 다음과 같다.

df.withColumnRenamed('name', 'nickname')

데이터 타입 변경하기

지금까지는 숫자 데이터만 다루어봤는데 스파크 데이터프레임은 여러가지 데이터 타입을 다룰 수 있다. 데이터프레임에서 import를 할 때 스파크는 데이터를 보고 어떤 타입인지 추측을 한다. 만약 데이터타입을 제대로 추출해내지 못했을 때에는 .cast() 함수를 사용해서 데이터 타입을 변경해주면 된다. 이 함수는 .withColumn() 함수와 같이 사용된다.

df = df.withColumn('your_column_name', df.your_column_name.cast('integer'))

Boolean 형식 만들기

Boolean 형식으로 바꾸기 위해서 다음과 같이 사용할 수 있다.

model_data = df.withColumn('ls_late', model_data.arr_deply > 0)

model_data = model_data.withColumn('label', model_data.is_late.cast('integer'))

model_data = model_data.filter('arr_delay is not NULL')

 

파이프라인(Pipeline) 생성하기

pyspark에서 머신러닝을 할 때 pyspark.ml 을 사용하면 된다.

pyspark.ml.feature.StringIndexer

  • 이 클래스는 string 값을 각자의 숫자로 바꾸어준다.
  • 예를 들면 very good, good, bad, so bad 와 같은 단어들을 각자 1, 2, 3, 4 로 바꿔주는 것이다. 이렇게 바꾸어주는 이유는 머신러닝을 할 때 숫자로 변환을 해야 빠르게 인식을 할 수 있기 때문이다.
carr_indexer = StringIndexer(inputCol='carrier', outputcol='carrier_index')

pyspark.ml.feature.OneHotEncoder

  • 이 클래스는 0 또는 1을 사용한 배열로 바꾸어준다.
  • 예를 들면 2는 [0, 0, 1, 0] 0은 [1, 0, 0, 0] 4는 [0, 0, 0, 0]으로 매핑이 된다.
carr_encoder = OneHotEncoder(inputCol='carrier_index', outputCol="carrier_fact")

VectorAssembler은 여러 열을 하나의 벡터 열로 병합하는 변환기이다.

vec_assembler = VectorAssembler(inputCols=['month', 'air_time'], outputCol='features')

머신러닝을 하기 위해 파이프라인을 만드려면 위에서 만들었던 것을 이용해서 다음과 같이 만들면 된다.

from pyspark.ml import Pipeline

pipe = Pipeline(stages=[carr_indexer, carr_encoder, vec_assembler])

 

데이터 준비하기

 

생성한 파이프라인에 학습시키기 위한 데이터를 준비하기 위해 .fit().transform() 함수를 사용할 수 있다.

piped_data = pipe.fit(model_data).transform(model_data)
piped_data.show()

train set, test set 분리하기

데이터를 학습하고 테스트하기 위해 train set과 test set으로 분리를 하려면 .randomSplit() 함수를 사용하면 된다.

training, test = piped_data.randomSplit([0.6, 0.4])

 

모델 튜닝 및 선택하기

로지스틱 회귀란?

선형 회귀는 종속 변수 y와 한개 이상의 독립변수 X의 선형 상관관계를 모델링하는 회귀분석 기법이다.

독립변수 1개와 종속변수 1개일 때 다음과 같이 선형 회귀를 할 수 있다.

출처: wikipedia

로지스틱 회귀는 선형회귀와 비슷하다. 다른 점은 종속 변수가 범주형 데이터(혈액형과 같이 몇 개의 범주로 나누어진 데이터)를 대상으로 하여 특정 분류로 나누어서 분류 기법이라고 볼 수 있다.

로지스틱 회귀는 어떤 사건이 발생할지에 대한 직접 예측이 아니라 사건이 발생할 확률을 예측한다. 예를 들면 암인지 아닌지에 대한 확률을 계산할 수 있다. 그래서 결과값은 숫자 변수가 아니라 0~1 사이의 값이 나온다.

Hyper Parameter는 모델의 특정 속성값을 조율할 수 있게 해주는 값이다. 이 값을 통해 모델 성능을 향상시킬 수 있다.

 

모델 생성하기

LogisticRegression 모델은 pyspark.ml.classification 안에 있다.

from pyspark.ml.classification import LogsitcRegression

lr = LogisticRegression()

 

교차 검증

로지스틱 회귀 모델을 조정하기 위해서 k-fold cross validation을 사용한다. 이 방법은 test 데이터프레임과 같은 보이지 않는 데이터에 대한 모델의 성능을 추정하는 방법이다.

 

evaluator 생성하기

교차 검증을 위해 필요한 것은 다른 모델을 비교하는 것이다. pyspark.ml.evaluation 서브 모듈에는 다양한 종류의 모델을 평가하기 위한 것들이 있다.

import pyspark.ml.evaluation as evals
evaluator = evals.BinaryClassificationEvaluator(metricName="areaUnderROC")

 

그리드 생성하기

최적의 하이퍼 파라미터를 찾기 위해 그리드를 만든다. .addGrid.build() 메서드를 사용하여 그리드를 만들 수 있다.

import pyspark.ml.tuning as tune
grid = tune.ParamGridBuilder()
# lr = logstic regression
grid = grid.addGrid(lr.regParam, np.arange(0, .1, .01))
grid = grid.addGrid(lr.elasticNetParam, [0, 1])
grid = grid.build()

 

validator 생성하기

pyspark.ml.tuning에는 CrossValidator라는 클래스가 있다. 이 것으로 교차 검증을 할 수 있다.

# lr => logstic regression
# evaluator => BinaryClassificationEvaluator
cv = tune.CrossValidator(estimator=lr,
               estimatorParamMaps=grid,
               evaluator=evaluator)

 

모델 피팅하기

# CrossValidator를 통해 좋은 모델을 가져온다.
models = cv.fit(training)
best_lr = models.bestModel

 

모델 평가하기

 

ROC

  • TN(예측 0, 실제 0), TP(예측: 1, 실제: 1), FN(예측: 0, 실제: 1), FP(예측: 1, 실제: 0)
    • True, False, Positive, Negative의 약자이다.
    • 예측값이 맞으면 True이다.
  • FN, FP가 많을 수록 직선에 가까워지는 그래프이다.

AUC

  • 1에 가까울수록 좋은 모델이다.
  • Area Under the Curve
  • ROC curve의 아래쪽 면적을 말한다.
  • 아래쪽 면적이 적을수록 많이 틀렸다는 것이다.
test_results = best_lr.transform(test)

# AUC를 계산해서 돌려준다.
print(evaluator.evaluate(test_results))

 

머신러닝 파이프라인 요약

 

반응형
Comments