프로젝트/장기 프로젝트

[사이드 프로젝트] 크롤링 마이그레이션, Pandas CSV 용량 줄이기 - Parquet 으로 1/10 용량으로 압축하기

Chipmunks 2024. 3. 3.
728x90

 

최근에 사이드프로젝트 백엔드를 작업하다, 데이터 작업도 같이 맡게 되었습니다.

기존엔 프론트엔드 개발자인 형 혼자서 담당했는데, 개발 리소스를 분산하고자 배우고 있네요. ☺️

데이터를 크롤링하고 CSV 파일로 만들고 AWS S3에 업로드하는, 기존 크롤링 코드의 언어 마이그레이션 작업을 맡았습니다.

 

기존엔 JavaScript async-await 코드로 크롤링이 이뤄졌는데요.

매일 수집되는 CSV 용량이 100MB 가 훌쩍 넘어가다보니 비용 문제가 발생했습니다.

이를 해결하기 위해 Parquet 으로 압축하려는 시도를 했으나

자바스크립트의 parquetjs 패키지가 현재 환경에서 동작되지 않는 이슈가 있었습니다.

4년전이 마지막 업데이트다보니 호환이 되지 않는 것 같더라고요.

 

데이터 전처리 코드도 Python 의 Pandas DataFrame 으로 작업하고 있고

DataFrame 에서 아예 to_parquet 메소드를 지원하고 있어서 파이썬 코드로 옮기자! 로 결정했어요.

기존 데이터 처리 로직도 익힐겸, 제가 그 작업을 맡았습니다.

 

1. 자바스크립트 수집 코드를, 파이썬으로 마이그레이션

파이썬 코드로 마이그레이션하며 신경 쓴 점은 다음과 같습니다.

  1. 역할에 맞는 클래스 설계 & 확장성 챙기기
  2. 비동기 작업
  3. 용량 압축

지금은 동작만(?) 가능하게 한 점이 있는데, 차차 리팩터링으로 나아가고자 합니다. ㅎㅎ

 

1.1. 역할에 맞는 클래스 설계 & 확장성 챙기기

기존 코드는 한 파일에 여러 메소드로 작성되었는데요.

개인적으로 파이썬 코드도 동일하게 작성하면, 나중에 코드 파악하기 힘들 것 같더라고요.

각 메소드를 역할로 구분하고 클래스로 만들고자 했습니다.

 

데이터 수집은 다음과 같이 이뤄집니다.

  • 일감 가져오기
  • 일감 분배하기
  • 분배된 일감으로 수집하기
  • 데이터 저장하기
  • AWS S3 으로 데이터 업로드하기

직접 크롤링하는 코드는 외부에 이미 배포가 되어 있습니다.

우리가 만들 스크래퍼(Scrapper)는, '어디'를 크롤링할 건가, 보다는 '어떻게' 크롤링할 건가에 초점이 맞춰져 있습니다.

일감은 대략 40,000 여개이며, 각각 크롤링하며 정보를 수집하는 구조입니다.

 

각각 다음과 같이 이름을 붙였습니다.

  • 일감 가져오기 : Loader
  • 일감 분배하기 : Scheduler
  • 분배된 일감으로 수집하기 : Scrapper
  • 데이터 저장하기 : Storage
  • AWS S3 으로 데이터 업로드하기 : Uploader

사실 Storage 와 Uploader 는 아직입니다...ㅎ

메인에 때려박고 글부터 쓰고(?) 바꾸려고 합니다.

 

추상 클래스로 만든 다음, 자식 클래스로 상속 받아 구현하고 있습니다.

 


 

Loader 의 경우 '어디서' 가져올 것 인가, 에 따라 달라지는 걸 염두했습니다.

 

### loader/__init__.py

from .json_loader import *
from .postgres_loader import *
### loader/loader.py

# Abstract class
from abc import ABCMeta, abstractmethod

# Pandas
import pandas as pd


class AbstractLoader(metaclass=ABCMeta):
    @abstractmethod
    async def load(self) -> pd.DataFrame:
        raise NotImplementedError

 

### loader/json_loader.py

# Common Setup
from .loader import AbstractLoader

# Effective JSON Load
# https://pythonspeed.com/articles/faster-python-json-parsing/
# 74MB 0.51 compared to python json 173MB 0.58
from msgspec.json import decode
from msgspec import Struct

# Pandas
from pandas import DataFrame

# Typing
from typing import Union


class MyData(Struct):
    data1: int
    data2: str
    data3: int
    data4: str
    data5: str
    data6: Union[int, str]
    data7: float
    data8: float
    data9: int
    data10: int
    data11: int
    data12: int
    data13: Union[int, str]
    data14: int
    data15: int
    data16: int
    data17: int
    data18: str


class JsonLoader(AbstractLoader):
    """
    Json 파일을 가져옵니다.
    """

    def __init__(self, file_name: str):
        self.file_name = file_name

    async def load(self, struct=MyData) -> DataFrame:
        with open(self.file_name, "r") as f:
            data = decode(f.read(), type=list[struct])
            return DataFrame(data)

 

### loader/postgres_loader.py

# Common Setup
from config import *

logger = get_logger("PostgresLoader")

from loader import AbstractLoader

# Database Setup
from common.db import *

# Pandas
import pandas as pd


class PostgresLoader(AbstractLoader):
    async def load(self) -> pd.DataFrame:
        logger.info("Postgres DB 에서 XXX 데이터를 불러옵니다.")

        query = "SELECT * FROM XXX"
        data = pd.read_sql_query(query, engine)
        return data

 

AbstractLoader 추상 클래스를 만들고, 일감을 가져오는 역할임을 추상 메소드로 표현했습니다.

Json 으로 불러올 수 있고, PostgreSQL 로도 불러올 수 있도록 확장성을 챙겼습니다.

공통된 반환 형태로는 Pandas 의 DataFrame 입니다.

 

원래 클래스명에 특정 도메인 이름이 있었어요. ( eg. AccountPostgresLoader )

당장 있어도 큰 상관은 없지만 가독성과 확장성을 위해 제거했습니다.

 

기존 코드가 json 을 불러와서 어떻게 해야 Json 을 효율적으로 불러올 수 있을까, 고민했었어요.

코드에도 나와있듯이 이 포스팅을 참고했습니다.

속도도 빠르고 빌트인 json 패키지보다 메모리를 100MB 나 줄일 수 있는 msgspec 패키지를 채택했습니다.

어떤 용량이든 고정된 메모리 길이로 스트리밍 처리하는 라이브러리도 고려했었어요.

다만 불러오는 CSV 의 크기가 몇 십 GB 단위가 아니라 단순 100MB 정도기도 하고,

로드할 때 부터 청크 단위로 코드 구조를 짜는 게 당장은 어렵다고 판단했습니다.

 

무엇보다 중요한 건 더이상 json 으로 불러오지 않고, PostgreSQL 으로 불러온다는 점이었는데요...!

PostgreSQL 코드로 급하게 바꾸면서 json 코드는 손을 대지 않았습니다. 😁

PostgreSQL 도 페이지네이션으로 변경하면 yield 구조로 바꾸지 않을까 싶네요.

그러면서 json 도 스트리밍으로 불러오는 ijson 패키지를 사용하지 않을까 싶네요.

 


 

Scheduler 는 일감을 어떻게 분배할 것인가, 에 초점을 맞춥니다.

현재 가용 가능한 크롤링 리소스는 4개입니다.

그래서 단순하게 4등분만 해주는 EvenScheduler 만 작업했습니다.

 

### scheduler/__init__.py

from .even_scheduler import *
### scheduler/scheduler.py

# Pandas
from pandas import DataFrame

# Typing
from typing import List

# Abstract class
from abc import ABCMeta, abstractmethod


class AbstractScheduler(metaclass=ABCMeta):
    @abstractmethod
    def schedule(self, df: DataFrame, total_num: int) -> List[DataFrame]:
        raise NotImplementedError
### scheduler/even_scheduler.py

# Builtins
import math

# Common Setup
from config import *

logger = get_logger("EvenScheduler")

# Pandas
from pandas import DataFrame

# Typing
from typing import List

# Abstract class
from .scheduler import AbstractScheduler


class EvenScheduler(AbstractScheduler):
    def schedule(self, df: DataFrame, total_num: int) -> List[DataFrame]:
        if total_num <= 0:
            return []

        logger.info("스케쥴러를 실행합니다.")

        length = len(df)
        chunk_size = math.ceil(length / total_num)
        result = []

        for i in range(0, length, chunk_size):
            logger.info("%d 번째에 %d 개를 할당", i + 1, chunk_size)
            result.append(df[i:i + chunk_size])

        logger.info("스케쥴러를 종료합니다.")
        return result

 

일감 하나 하나마다 작업에 드는 비용이 다른데요.

어떤 일감은 한 번의 네트워크 호출만 하면 되지만, 어떤 일감은 굉장히 여러 번의 네트워크 호출로 정보를 수집하게 됩니다.

단순히 4등분하는 것 만으로는 부족하지 않을까 싶어요.

 

어렴풋이 배운 OS 지식에서 차용을 해봤는데요..!

미리 4등분하는 정적 로드밸런싱보다 동적으로 일감을 계속 가져가는 것도 좋은 것 같아요.

정적으로 등분하면, 각 등분별로 작업량의 차이가 많이 날 수도 있습니다.

동적으로 가져간다면 첫 번째 등분한 게, 네 번째 등분한 것 보다 작업량이 월등히 많은 경우를 방지할 수 있겠죠?

 

또는 이전에 작업한 내역을 따로 저장하여, 이를 다음 스케쥴링에 활용하는 것도 하나의 방법이겠다 싶더라고요.

하나의 스크랩퍼가 균등한 네트워크 작업량을 가질 수 있도록 분배할 수 있겠군요.

현재는 하나의 일감이 얼마나 많이 작업되는가도 모르는 상태예요.

 

실시간 데이터를 수집하는 터라 이전에 작업한 내역이, 반드시 현재와 같은 법은 없습니다만,

도메인 특성상 하루 만에 그 변동이 크지 않을 것 같다는 예상이 듭니다.

한 일감에 20번을 작업하는 것과, 변동이 생겨 18~22 번 작업하는 것의 오차는 괜찮지 않을까 싶네요.

반대로 한 1번의 작업량을 가진 일감이, 2~3 번 작업할 확률도 적기도 합니다.

 

작업량대로 우선순위를 매겨, 빠르게 업데이트가 되어야하는 일감과 느리게 업데이트해도 괜찮은 일감을 구분할 수도 있을 것 같아요.

물론 현재는 모든 작업이 완료가 되어야지만 반영이 되는데

실시간으로 반영되게끔 구조를 확장하는 것도 (아주 미래지만) 생각 중입니다.

 


 

Scrapper 는 외부에 배포된 스크랩퍼에게 일감을 전달하고 그 응답을 가져옵니다.

한 일감을 여러 번 전달해야하는 경우가 있는데요.

한 일감에 수집해야 하는 데이터가 100 개라면, 한 번 가져오는 데 최대 20개밖에 가져오지 못합니다.

따라서 총 5번을 전달하고 응답을 받아야 합니다.

또한 로봇 탐지에 걸린다면 새로운 토큰을 발급받고 재시도를 해야하는 Fallback 처리 로직도 들어가 있습니다.

 

async def scrap(self, mydata: DataFrame, worker_index: int, start_index: int):
    if len(mydata) == 0:
        return []

    token = await self.get_token()
    result = []
    page_count = 1
    scrap_count = 0

    for index, data in mydata.iterrows():
        success = False

        while not success:
            try:
                is_more_data = True
                while is_more_data:
                    res = await self.get_response(data['data1'], page_count, token)
                    for data_result in res["result"]:
                        data_result_with_filtered_key = {
                            'worker_index': worker_index,
                            'data_index': start_index + index
                        }

                        for key in keys:
                            data_result_with_filtered_key[key] = data_result.get(key)

                        result.append(
                            data_result_with_filtered_key
                        )
					
                    # 토큰 재발급
                    if scrap_count % 1000 == 0:
                        token = await self.get_token()

                    is_more_data = res["isMoreData"]
                    if is_more_data:
                        page_count = res["page"] + 1
                    else:
                        page_count = 1
                        scrap_count = scrap_count + 1

                    await asyncio.sleep(1)
                success = True
            except Exception as e:
                logger.error(e)
                # 토큰 재발급하여 재시도
                token = await self.get_token()
                await asyncio.sleep(1)

    return result

 

이 역시 코드가 깔끔하지 못한데...

사실 핵심 로직은 네트워크 호출하고 데이터 프레임으로 필요한 키만 저장하고 모아서 반환하는 로직이 끝이에요.

기타, 더 받아와야 하는 경우나 Fallback 처리 로직이 많이 들어가 있어, 이걸 어떻게 분리해야할까 고민 포인트입니다..!

핵심 로직만 눈에 딱 들어오면 좋겠네요. 😁

 


 

Storage 와 Uploader 도 방금 급조한 거긴 합니다. 😅

Loader 의 반댓말인 Saver 로 지을까, Stroage 로 할까 여전히 고민이지만...

어쨌든 데이터프레임을 파일로 저장하는 역할을 합니다.

아직은 CSV 파일만 저장하지만 다른 확장자로 저장할 수도 있고요~

 

또한 Pandas 패키지의 의존성을 줄일 수도 있을 것 같네요.

가능하면 줄이고 싶지만... Pandas DataFrame 이 주는 어마어마한 전처리 이점도 있기에...

당장은 쉽지 않을 것 같네요.

 

Uploader 는 S3에 보낼 수도, NAS나 다른 스토리지 벤더로 보낼 수도 있습니다.

별 일이 없으면은 S3 고정일 것 같긴 합니다.

 

여담으로, 추후 후술할 데이터 압축도 Compresser 요런 걸로도 만들어야 하나 싶긴 하네요.

Storage 의 책임으로 넣는 걸로도 생각이 기울였긴 합니다.

 

1.2. 비동기 작업

기존 자바스크립트의 async - await 구조를 asyncio 으로 대체하고 있습니다.

일단 async 키워드 덕지 덕지 붙이고(?) 4개의 스크랩퍼가 병렬로 실행되도록 구성했습니다.

...

# Asynchronous
import asyncio

...

async def main():
	...
    while True:
    	...
        
        futures = []

        for index, scheduled_chunks in enumerate(scheduled_chunks):
            scrapper = XXXScrapper(XXX_URL_LIST[index])
	    ...
            futures.append(
                scrapper.scrap(chunk, index, step)
            )

        collected_result = await asyncio.gather(*futures)
		
        ...

    ...

if __name__ == '__main__':
    asyncio.run(main())

 

큰 비동기 작업은 각 스크랩퍼가 병렬로 실행되게끔 합니다.

앞으로, 일감을 가져오는 것 부터 S3 으로 보내는 것 까지, 비동기로 변환할 부분은 없나

눈을 씻고 찾아보는 걸로 개선할 것 같아요.

 

안타깝게도 DataFrame 에선 asyncio 를 지원하지 않는다고 하는데요.

asyncio 으로 감싸서 작업을 나눠 실행하든지, Pandas 를 학습하여 자원을 효율적으로 사용하게끔 리팩터링이 들어갈 것 같네요.

당장은 개인 PC 에서 돌아갈 크롤링이라 자원은 크게 신경쓰지 않아도 될 부분이긴 합니다.

 

2. Parquet 으로 1/10 용량으로 압축하기

현재 작업의 큰 요구사항 중 하나가 바로 Parquet 으로 압축하여 저장하는 겁니다!

근데 Parquet 란 무엇일까요?

 

Apache Parquet 란, 컬럼(열) 기반의 형식으로 효율적으로 저장해주는 오픈 소스 데이터 파일 형식입니다.
복잡한 데이터를 열로 구성되어, 스토리지 공간을 절약하고 분석 쿼리 속도를 향상시킵니다.
데이터 압축과 해제의 효율이 높고 언어를 가리지 않습니다.
따라서 클라우드 스토리지 공간에 저장하는 데 용이합니다.
출처 : https://www.databricks.com/kr/glossary/what-is-parquet

 

간단히 말하면, 데이터를 압축하여 용량을 줄여주는 친구입니다.

장기적으로 저장하는 데 있어 압축보다 좋은 선택지는 없을 겁니다.

데이터 프로젝트 전역적으로 Pandas 라이브러리를 사용하는 터라

to_parquet 메소드로 데이터프레임을 간편하게 저장할 수도 있습니다.

 

2.1. dtype 데이터 타입 튜닝

파케이로 저장하기 전에 수집한 CSV 를 데이터 프레임으로 올려놓아야 합니다.

현재는 수집할 때 마다 CSV 데이터 맨 뒤에 추가(Append)하는 코드로 되어 있습니다.

기존에는 한 번에 CSV 파일로 쓰는 로직이었습니다만,

중간에 오류가 나면 처음부터 다시 실행해야 하는 문제점이 있었어요. (가뜩이나 오래 걸리는데...)

그래서 맨 뒤에 추가하는 로직으로 마이그레이션 했습니다.

 

CSV를 데이터프레임으로 읽을 때 dtype 을 지정하면, 읽는 속도도 빠르고 parquet 저장 시 용량도 줄여줍니다.

기본적으로 각 칼럼의 데이터 타입이 int64, object 으로 불러와지는데요.

일단... 큰 숫자를 다루지 않습니다. 1,000 도 안넘습니다...

object 도 모든 데이터 타입을 아우르는 구조인 것 같죠?!

이를 저장하는 데 필요한 공간도 딱 봐도 많아 보입니다.

결국엔 데이터프레임으로 변환하면서, 모든 행을 읽어 어떤 타입인지 판별을 해야하는 로직도 들어가 있어 보입니다.

여러모로 손해만 봐서 데이터 타입을 직접 지정했습니다.

 

데이터는 약 7만건이 넘는데요.

CSV 용량만 135 MB 입니다.

칼럼의 dtype 을 지정하는 것 만으로도 용량 변화가 눈에 띄었어요.

기존 int64, object 로 구성된 22개 칼럼을 uint16 (0~65,536) 양의 정수 칼럼 4개, string 문자열 칼럼 18개, float32 실수 칼럼 2개 으로 구성했습니다.

 

Parquet 기본 압축(snappy)한 결과

  • 20.7 MB -> 19.8 MB

으로 약 1 MB 나 차이가 나는 걸 확인할 수 있습니다.

데이터 프레임으로 먼저 데이터 범위 동향을 분석해보고 적절한 타입으로 지정해주면,

읽기 속도도 빨라지고 용량도 확보할 수 있습니다.

all_df = pd.read_csv('data.csv', keep_default_na=False, dtype={
    'data1': 'uint16',
    'data2': 'uint16',
    'data3': 'string',
    'data4': 'uint16',
    'data5': 'uint16',
    'data6': 'string',
    'data7': 'string',
    'data8': 'string',
    'data9': 'string',
    'data10': 'string',
    'data11': 'string',
    'data12': 'string',
    'data13': 'string',
    'data14': 'string',
    'data15': 'string',
    'data16': 'string',
    'data17': 'string',
    'data18': 'string',
    'data19': 'string',
    'data20': 'string',
    'data21': "float32",
    'data22': "float32",
    'data23': 'string',
    'data24': 'string'
})

 

2.2. 데이터에 맞는 압축 방법 찾기

Parquet 으로 저장할 때 압축 방법을 직접 설정할 수 있습니다.

Parquet engine 은 압축 속도도 빠르고 대중적으로 많이 쓰이는 pyarrow 으로 설정했습니다.

(관련 라이브러리 패키지 설치 필요, 없으면 오류로 알려줌)

 

pyarrow 외에 fastparquet 라는 선택지가 있는데요.

fastparquet 는 압축 속도는 pyarrow 보다 느리지만, 설치 용량이 훨씬 가벼운 특징이 있습니다.

pyarrow 는 빌드 파일이 25MB 내외인데 반해, fastparquet 는 1MB 내외입니다.

compressions = ['snappy', 'gzip', 'brotli', 'lz4', 'zstd']
for compression in compressions:
    all_df.to_parquet("data.parquet.%s" % compression, compression=compression, engine="pyarrow", index=False)

 

135 MB CSV 를 Parquet 파일로 만든 결과 용량 차이가 눈에 띄었습니다!

  • snappy : 19.8 MB ( 85.3 % )
  • gzip : 14 MB ( 89.6 % )
  • brotli : 13.6 MB ( 90 % )
  • lz4 : 20.1 MB ( 85.1 % )
  • zstd : 15.6 MB ( 88.4 % )

그 중 brotli 가 1/10 수준으로 압축을 해줍니다!

찾아보니 텍스트 데이터에 효율이 높은 압축 방법이라고 합니다.
앞으로 brotli 압축 방법으로 채택하여 업로드하고,
기존에 저장된 CSV 데이터들도 brotli 압축 방법으로 변환할 것 같네요. 🙂
출처 : https://www.linkedin.com/pulse/parquet-file-compression-everyone-zstd-brotli-lz4-gzip-alex-merced/ 

 

3. 앞으로의 고민

적은 용량으로 압축하는 건 나름 잘 해결한 것 같은데요,

그 외에도 아주 여러가지 고민들은 여전합니다.

 

  • 객체지향적으로 설계를 잘 했는지, 좋은 코드 구조는 어떤걸까?
  • DataFrame 에 의존적이라 관련 조작 코드가 계속 들어간다... 타협을 해야할까? 분리한다면 어떻게 분리하면 좋을까? Spring 에서 Mapper 로 변환하듯 그래야할까?
  • 여전히 자바스크립트에서 실행한 코드처럼 느리다... 7만건 전체 수집 5시간 20분 걸렸다. 제일 중요한 데이터가 나중에 작업되면 어쩌지? 작업 우선순위를 어떻게 나누면 좋을까?
  • 현재는 전체 일감을 바로 DB 에서 불러온다. 먼저 전체 총량만 가져오고, 실제 일감 불러오는 건 동적으로 나눠서 불러오면 좋을듯, 페이지네이션 들어가면 yield 로 되는 등, 요청 구간을 매개 변수로 넘기는 등으로 코드 구조가 바뀔듯
  • 본문에 나와있는 스케쥴링 방법과 로드밸런싱 방법 고민

 

당장 중요도가 높은 항목은 아니라, 백로그로 남겨서

나중에 리팩터링 시간이 날 때 추가로 진행하는 쪽으로 될 것 같네요.

자세한 코드를 남기진 못하지만, 좋은 의견을 주면 아주 긍정적으로 검토해보겠습니다. 😁😁

긴 글 읽어주셔서 감사합니다.

 

댓글