분할되어 있는 여러 Parquet 파일을 읽어서 처리해야하는 경우, Polars, Dask, DuckDB를 이용할 수 있다.

Polars는 Apache Arrow 형식을 사용하고 Rust 언어로 구현된 데이터 프레임 라이브러리이다. Apache Arrow 형식은 Pandas를 개발한 Wes McKinney가 참여하고 있으며, 더 자세한 내용은 Apache Arrow and the Future of Data Frames with Wes McKinney에서 확인할 수 있다.

Dask는 스케일 확장에 최적화된 Python 라이브러리이다. High Performance Computing (HPC) 환경 뿐 아니라 단일 컴퓨터에서도 사용할 수 있다. 특히, 단일 컴퓨터에서 사용할 때에도 쉽게 설치할 수 있어 로컬 환경에서 테스트한 후 HPC 환경에서 사용할 수 있다.

DuckDB는 C++로 구현된 embeddable SQL OLAP database이다. 손쉽게 설치할 수 있으며 Python에서 사용할 수 있다.

예제에서 사용하는 데이터와 소스코드는 https://github.com/hyounggyu/hello_data 에서 확인할 수 있으며 Jupyter notebook은 여기이다.

데이터는 S&P 500 기업 중 25개 기업을 임의로 뽑아 2023년 1월 2일부터 2023년 12월 22일까지의 종가를 2023년 1월 2일 기준으로 상대적인 수치로 변환하였다. 아래 예,

┌────────────┬─────────┬────────────────────┐
│    date    │ ticker  │       close        │
│    date    │ varchar │       double       │
├────────────┼─────────┼────────────────────┤
│ 2023-01-03 │ a       │                1.0 │
│ 2023-01-04 │ a       │ 1.0108637696614236 │
│ 2023-01-05 │ a       │ 1.0137963209810719 │
│ 2023-01-06 │ a       │ 0.9842042122100773 │
│ 2023-01-09 │ a       │   0.98287123433751 │
└────────────┴─────────┴────────────────────┘

데이터 쿼리하기

Polars는 다음과 같이 사용할 수 있다.

pl_df = pl.read_parquet("./data/parquet/*.parquet")
pl_df.filter((pl.col("date") == pl.lit(date(2023, 12, 22))) & (pl.col("close") >= 1.5))

결과는 다음과 같다.

shape: (2, 3)
┌────────────┬────────┬──────────┐
│ date       ┆ ticker ┆ close    │
│ ---        ┆ ---    ┆ ---      │
│ date       ┆ str    ┆ f64      │
╞════════════╪════════╪══════════╡
│ 2023-12-22 ┆ intu   ┆ 1.595353 │
│ 2023-12-22 ┆ phm    ┆ 2.2171   │
└────────────┴────────┴──────────┘

scan_parquet을 활용하여 lazy evaluation을 할 수 있다.

pl_lazy_df = pl.scan_parquet("./data/parquet/*.parquet")
pl_lazy_df.filter((pl.col("date") == pl.lit(date(2023, 12, 22))) & (pl.col("close") >= 1.5)).collect()

Dask는 다음과 같이 사용할 수 있다.

dd_df = dd.read_parquet("./data/parquet/*.parquet")
dd_df[(dd_df["date"] == date(2023, 12, 22)) & (dd_df["close"] >= 1.5)].compute()

결과는 다음과 같다.

           date ticker     close
245  2023-12-22   intu  1.595353
245  2023-12-22    phm  2.217100

DuckDB는 다음과 같이 사용할 수 있다.

TABLE = "./data/parquet/*.parquet"
duckdb.query(f"""SELECT * FROM '{TABLE}' WHERE date = '2023-12-22' AND close >= 1.5""")

결과는 다음과 같다.

┌────────────┬─────────┬────────────────────┐
│    date    │ ticker  │       close        │
│    date    │ varchar │       double       │
├────────────┼─────────┼────────────────────┤
│ 2023-12-22 │ phm     │  2.217099567099567 │
│ 2023-12-22 │ intu    │ 1.5953525231351298 │
└────────────┴─────────┴────────────────────┘

Pivot과 Aggregate

위 데이터를 이용하여 pivot table을 만들어보자. 특히, closes 컬럼에는 list 형태로 모든 종목의 상대적인 수치가 들어있다.

┌─────────┬────────────┬────────────────────┬───┬────────────────────┬────────────────────┬──────────────────────┐
│ ticker  │ 2023-01-03 │     2023-01-04     │ … │     2023-12-21     │     2023-12-22     │        closes        │
│ varchar │   double   │       double       │   │       double       │       double       │       double[]       │
├─────────┼────────────┼────────────────────┼───┼────────────────────┼────────────────────┼──────────────────────┤
│ intu    │        1.0 │ 1.0009969834858632 │ … │ 1.5874789099647222 │ 1.5953525231351298 │ [1.0, 1.0009969834…  │
│ etr     │        1.0 │ 1.0007330706496838 │ … │ 0.9138641986621462 │ 0.9183542563914597 │ [1.0, 1.0007330706…  │
│ sbux    │        1.0 │ 1.0360011901219874 │ … │ 0.9454527422394129 │  0.944956858077953 │ [1.0, 1.0360011901…  │
│ xyl     │        1.0 │ 0.9984651498736006 │ … │ 1.0083965330444202 │ 1.0169736366919464 │ [1.0, 0.9984651498…  │
│ duk     │        1.0 │ 1.0107039537126326 │ … │ 0.9292189006750241 │ 0.9340405014464802 │ [1.0, 1.0107039537…  │
├─────────┴────────────┴────────────────────┴───┴────────────────────┴────────────────────┴──────────────────────┤
│ 5 rows                                                                                   248 columns (6 shown) │
└────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘

Polars는 다음과 같이 사용할 수 있다.

pl_df = pl.read_parquet("./data/parquet/*.parquet")
pl_df.pivot(index="ticker", columns="date", values="close").with_columns(
    pl.concat_list(pl.all().exclude("ticker")).alias("closes")
)

Dask는 다음과 같이 사용할 수 있다.

dd_df = dd.read_parquet("./data/parquet/*.parquet")

# Make date column to categorical to use pivot method
dd_df.date = dd_df.date.dt.strftime("%Y-%m-%d").astype("category").cat.as_known()

dd_pivot_df = dd_df.pivot_table(index='ticker', columns='date', values='close').compute()

# Aggregate the date columns
dd_agg_df = dd_df.compute().groupby('ticker').agg({"close": list})

# Merge the two dataframes
dd_final_df = dd_pivot_df.merge(dd_agg_df, on="ticker")

dd_final_df.rename(columns={"close": "closes"}, inplace=True)

NOTE: 만약 동일 작업을 lazy evaluation으로 하는 경우, Object 형식의 float64[] 타입이 string 타입으로 변환된다.

위 코드의 compute() 함수 호출 위치를 바꿔보자.

dd_dtissue_df = dd.read_parquet("./data/parquet/*.parquet")
dd_dtissue_df.date = dd_dtissue_df.date.dt.strftime("%Y-%m-%d").astype("category").cat.as_known()
dd_dtissue_pivot_df = dd_dtissue_df.pivot_table(index='ticker', columns='date', values='close')
dd_dtissue_agg_df = dd_dtissue_df.groupby('ticker').agg({"close": list})
dd_dtissue_final_df = dd_dtissue_pivot_df.merge(dd_dtissue_agg_df, on="ticker").compute()
dd_dtissue_final_df.rename(columns={"close": "closes"}, inplace=True)

type(dd_dtissue_final_df.closes.iloc[0])

위 코드를 실행하면 다음과 같이 closes 컬럼이 str로 변환된 것을 확인할 수 있다.

str

DuckDB는 다음과 같이 사용할 수 있다.

TABLE = "./data/parquet/*.parquet"

# Directly join the results of the two queries using subqueries
duckdb.sql(f"""
CREATE OR REPLACE TEMPORARY TABLE final_t AS (
    SELECT pivot_t.*, agg_t.closes
    FROM (
        PIVOT '{TABLE}' ON date USING first(close) GROUP BY ticker
    ) AS pivot_t
    INNER JOIN (
        SELECT ticker, list(close ORDER BY date ASC) AS closes
        FROM '{TABLE}'
        GROUP BY ticker
    ) AS agg_t ON pivot_t.ticker = agg_t.ticker
)
""")

다음과 같이 DuckDB의 결과를 Pandas dataframe으로 변환할 수 있다.

duckdb.sql("SELECT * FROM final_t").to_df()

그리고 DuckDB의 결과를 다음과 같이 Parquet 파일로 저장할 수 있다.

OUTPUT = "./tmp/duckdb.parquet"

duckdb.sql(f"""COPY final_t TO '{OUTPUT}' (FORMAT PARQUET)""")