1 분 소요

안녕하세요! 이번 포스트는 PySpark에 대해서 어떻게 session을 생성하고 기초적인 작업을 해보려고 합니다.

데이터처리 할 때 csv파일이 64GB가 넘어가는 경우가 종종 있어서, 제 컴퓨터의 메모라가 한계에 도달하는 경우가 많습니다.

PySpark를 사용하면 단일 노드의 컴퓨터에서도 잘 작동하며 빠르게 읽고 원하는 작업을 수행하기 좋습니다.

1. PySpark session 생성

PySpark를 사용하기전 제일 먼저 해야할 일은 SparkSession을 만드는 것이다.

# 기타 라이브러리 import
import pyspark
import pandas as pd
import time

# session 생성
from pyspark.sql import SparkSession

spark=SparkSession.builder.appName('MyFirstSpark').getOrCreate()

만들어진 session 정보를 확인해보자. 생성된 세션을 출력해보면 세션정보가 나오고

나는 단일 클러스터에서 실행하였기 때문에 Master만 나온다.

만약 클러스터 환경에서 해당 session을 확인했을 경우, 여러대의 클러스터가 확인될 것이다.

# session 정보 확인
spark

spark session 결과 ⬇️

image

2. data set 읽기

데이터 셋을 읽는 방법은 아래와 같다.

이때 header를 True로 설정하면 원래컬럼명을 반영하고

inferSchema를 True로 성정하여 모든 컬럼의 데이터타입이 원래 타입을 갖도록하자. (원래 String만 갖게되는데 이렇게 True로 설정하면 로드시간이 조금 더 걸린다)

# read the dataset
'''
header를 True하면 원래컬럼명을 반영한다.
inferSchema를 True로 설정하여 모든 데이터타입이 String이 안되도록 하자.'''
df_pyspark=spark.read.csv('./202101.csv'
                          ,header=True
                          ,inferSchema=True, encoding='euc-kr')
df_pyspark.show(n=3)

df_pyspark 결과 ⬇️

image

3. 데이터프레임 확인

3.1. type 확인

데이터를 프레임형태로 정상적으로 읽었으니, type을 확인해보자.

다음과 같이 입력하면 “pyspark.sql.dataframe.DataFrame”라고 나온다 이는 판다스 데이터프레임 type의 “pandas.core.frame.DataFrame”과 다르다.

이 타입의 프레임에선 SQL을 사용하거나 기존의 판다스 문법에서 살짝(?) 다른 형식이 요구된다.

type(df_pyspark)

3.2. 데이터프레임 정보확인

판다스의 info()와 비슷한 함수를 찾아보았으나 찾질 못하였다. 😥 컬럼 명, 컬럼 타입, 컬럼 별 눌 개수 정보를 한 눈에 알아보고 싶었기 떄문이다.

그래서 불편하지만 다음과 같이 확인하였다!

# check the schema
df_pyspark.printSchema()

Schema 확인 결과 ⬇️

image

df_pyspark.summary().show(n=1)

컬럼별 nonnull 확인 결과 ⬇️ image

4. index 추가

난 PySpark에 날짜 순으로 index 컬럼을 만들어 보고 싶었다.

코드는 다음과 같다.

# Returns a sequential number starting from 1 within a window partition
from pyspark.sql.functions import row_number
# row 간 연산을 처리하기 위한 도구
from pyspark.sql import Window

olderByDay = Window.orderBy('일자')
df_pyspark = df_pyspark.withColumn('index', row_number().over(olderByDay)-1)
df_pyspark.show(n=5)

index 컬럼 추가 결과 (맨 마지막 컬럼) ⬇️

image

5. 첫 번째 행과 마지막 행 index 얻기

첫 번째 행 얻기

from pyspark.sql.functions import max as max_, min as min_, col

first = df_pyspark.select(min_("index")).first()[0]
또는
first = df_pyspark.head(1)[0][-1]

first

ouput:

0

마지막 행 얻기

last = df_pyspark.select(max_("index")).first()[0]
또는
last = df_pyspark.tail(1)[0][-1]

last

ouput:

178359

댓글남기기