[PySpark] Spark 기초적인 활용 (1)
안녕하세요! 이번 포스트는 PySpark에 대해서 어떻게 session을 생성하고 기초적인 작업을 해보려고 합니다.
데이터처리 할 때 csv파일이 64GB가 넘어가는 경우가 종종 있어서, 제 컴퓨터의 메모라가 한계에 도달하는 경우가 많습니다.
PySpark를 사용하면 단일 노드의 컴퓨터에서도 잘 작동하며 빠르게 읽고 원하는 작업을 수행하기 좋습니다.
1. PySpark session 생성
PySpark를 사용하기전 제일 먼저 해야할 일은 SparkSession을 만드는 것이다.
# 기타 라이브러리 import
import pyspark
import pandas as pd
import time
# session 생성
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('MyFirstSpark').getOrCreate()
만들어진 session 정보를 확인해보자. 생성된 세션을 출력해보면 세션정보가 나오고
나는 단일 클러스터에서 실행하였기 때문에 Master만 나온다.
만약 클러스터 환경에서 해당 session을 확인했을 경우, 여러대의 클러스터가 확인될 것이다.
# session 정보 확인
spark
spark session 결과 ⬇️
2. data set 읽기
데이터 셋을 읽는 방법은 아래와 같다.
이때 header를 True로 설정하면 원래컬럼명을 반영하고
inferSchema를 True로 성정하여 모든 컬럼의 데이터타입이 원래 타입을 갖도록하자. (원래 String만 갖게되는데 이렇게 True로 설정하면 로드시간이 조금 더 걸린다)
# read the dataset
'''
header를 True하면 원래컬럼명을 반영한다.
inferSchema를 True로 설정하여 모든 데이터타입이 String이 안되도록 하자.'''
df_pyspark=spark.read.csv('./202101.csv'
,header=True
,inferSchema=True, encoding='euc-kr')
df_pyspark.show(n=3)
df_pyspark 결과 ⬇️
3. 데이터프레임 확인
3.1. type 확인
데이터를 프레임형태로 정상적으로 읽었으니, type을 확인해보자.
다음과 같이 입력하면 “pyspark.sql.dataframe.DataFrame”라고 나온다 이는 판다스 데이터프레임 type의 “pandas.core.frame.DataFrame”과 다르다.
이 타입의 프레임에선 SQL을 사용하거나 기존의 판다스 문법에서 살짝(?) 다른 형식이 요구된다.
type(df_pyspark)
3.2. 데이터프레임 정보확인
판다스의 info()와 비슷한 함수를 찾아보았으나 찾질 못하였다. 😥 컬럼 명, 컬럼 타입, 컬럼 별 눌 개수 정보를 한 눈에 알아보고 싶었기 떄문이다.
그래서 불편하지만 다음과 같이 확인하였다!
# check the schema
df_pyspark.printSchema()
Schema 확인 결과 ⬇️
df_pyspark.summary().show(n=1)
컬럼별 nonnull 확인 결과 ⬇️
4. index 추가
난 PySpark에 날짜 순으로 index 컬럼을 만들어 보고 싶었다.
코드는 다음과 같다.
# Returns a sequential number starting from 1 within a window partition
from pyspark.sql.functions import row_number
# row 간 연산을 처리하기 위한 도구
from pyspark.sql import Window
olderByDay = Window.orderBy('일자')
df_pyspark = df_pyspark.withColumn('index', row_number().over(olderByDay)-1)
df_pyspark.show(n=5)
index 컬럼 추가 결과 (맨 마지막 컬럼) ⬇️
5. 첫 번째 행과 마지막 행 index 얻기
첫 번째 행 얻기
from pyspark.sql.functions import max as max_, min as min_, col
first = df_pyspark.select(min_("index")).first()[0]
또는
first = df_pyspark.head(1)[0][-1]
first
ouput:
0
마지막 행 얻기
last = df_pyspark.select(max_("index")).first()[0]
또는
last = df_pyspark.tail(1)[0][-1]
last
ouput:
178359
댓글남기기