트위터 API를 이용하여 데이터 파이프라인 만들기
이전 포스팅에 이어서 진행합니다.
이번 포스팅에는 전에 진행했던 task1,2,3을 워크플로 도구인 airflow를 이용하여 자동화시킨 경험을 공유하겠습니다.
- Task 1 : Embulk에 의한 데이터 추출
- Task 2 : Hive에 의한 데이터 구조화
- Task 3 : Presto에 의한 데이터 집계
Airflow 설치 & 세팅
Airflow 공식 문서를 참고 하였습니다.
#!/bin/bash
AIRFLOW_VERSION=2.5.2
PYTHON_VERSION="$(python3 --version | cut -d ' ' -f 2 | cut -d '.' -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
Ariflow를 설치하는 bash 스크립트를 작성하고 실행시켜 설치하였습니다.
task를 병렬실행하기 위하여 airflow용 데이터베이스를 mysql을 설치하여 설정하였습니다.
- 설치 중간에 error가 발생하여 g++, python3-dev, libsasl2-dev 패키지를 추가로 설치하였습니다.
- sudo apt install g++ python3-dev libsasl2-dev
Airflow.cfg 설정
task의 병렬처리를 위하여 airflow.cfg 파일을 수정하였습니다.
기본적으로 airflow가 설치가 되면 root 디렉토리 아래에 설치가 됩니다.
airflow 디렉토리 아래에 airflow.cfg가 존재합니다.
수정한 것만 가져왔습니다.
[core]
executor = LocalExecutor
parallelism = 2
load_examples = False
...
[database]
sql_alchemy_conn = mysql://root@localhost/airflow
...
- executor : 가장 기본 executor는 SequentialExecutor로 병렬처리를 지원하지 않습니다. LocalExecutor는 병렬처리를 지원하기 때문에 변경하였습니다.
- parallelism : 모든 DAG내에서 실행되는 최대 Task 수 입니다. 2로 설정하였습니다.
- load_examples : airflow에서 기본적으로 제공해주는 example dag입니다. 운영 DAG와 혼동을 줄 수 있어 제거하였습니다.
- sql_alchemy_conn : airflow 기본 데이터베이스를 mysql로 연결했습니다.
DAG 작성
airflow/dags 디렉토리 아래에 아래와 같은 DAG를 작성하면 airflow dags에 등록할 수 있습니다.
#twitter_sample.py
import datetime
from utils import etl
from airflow import DAG
from airflow.operators.bash import BashOperator
with DAG(
'twitter_sample',
schedule='@daily',
start_date=datetime.datetime(2023,3,14),
tags=['twitter']
) as dag:
extract = BashOperator(task_id='extract', bash_command=etl.EXTRACT)
load = BashOperator(task_id='load', bash_command=etl.LOAD)
aggregate=BashOperator(task_id='aggregate', bash_command=etl.AGGREGATE)
extract >> load >> aggregate
twitter_sample에서 import 하고 있는 etl.py에는 이전에 작성했던 Task1, 2, 3가 작성되어 있습니다.
EXTRACT = """ TASK1 """
LOAD = """ TASK2 """
AGGREGATE = """ TASK3 """
쉘 스크립트를 실행하기 위하여 BashOperator를 사용했습니다.
start_date는 데이터 수집시작일인 2023-03-14로 설정하였습니다.
데이터 파이프라인 실행하기
지금까지 설정했던 hadoop, hive 메타스토어, presto--server 모두 기동시킵니다.
- spark는 사용하지 않기 때문에 기동시키지 않아도됩니다.
airflow db init
혹은
airflow db reset
명령어로 airflow 데이터베이스를 초기화합니다.
airflow webserver -p 8081
8081포트에 airflow webserver를 기동시킵니다.
airflow scheduler
airflow의 태스크의 스케줄 실행을 시작하기 위하여 airflow scheduler 명령어로 스케줄러를 기동합니다.
master:8081포트로 접속을 해보면
위에서 작성했던 twitter_sample DAG가 등록되어 있습니다.
twitte_sample DAG를 클릭해서 들어가보면 이전에 작업했던 결과가 보입니다.
airflow dags backfill twitter_sample -s 2023-03-14 -e 2023-03-22
위의 명령어로 이전에 backfill을 진행했기 때문에 backfill의 결과가 보입니다.
twitter_sample DAG의 데이터 파이프라인은 위와 같습니다.
현재는 로컬의 가상머신으로 실습을 진행하고 있고 매번 컴퓨터를 켜둘수 없기 때문에 자동화가 되는 과정은 보지 못했습니다.
aws와 같은 클라우드 서비스를 이용하면 매번 하루가 끝날때 데이터 파이프라인을 실행할 것 입니다.
Troubleshooting
parallelism 파라미터가 기본값(32)으로 설정되어 있을 때 backfill 을 실행하니 task가 Error가 발생하여 failed 되는 문제가 있었습니다.
task가 병렬처리를 할때 가상머신들의 OutOfMemory Error가 발생하였고, parallelism의 값을 2개로 줄여주니 backfill을 할 때 발생하던 메모리 오류가 발생하지 않고 backfill을 완료할 수 있었습니다.
'데이터 엔지니어링' 카테고리의 다른 글
[빅데이터를 지탱하는 기술] 빅데이터 시대의 데이터 분석 # 1 (0) | 2023.04.11 |
---|---|
[빅데이터를 지탱하는 기술] 배경 (0) | 2023.04.11 |
[빅데이터를 지탱하는 기술] 트위터 API를 이용한 데이터 파이프라인 만들기 # 6 (0) | 2023.03.28 |
[빅데이터를 지탱하는 기술] 트위터 API를 이용한 데이터 파이프라인 만들기 # 5 (0) | 2023.03.28 |
[빅데이터를 지탱하는 기술] 트위터 API를 이용한 데이터 파이프라인 만들기 # 4 (0) | 2023.03.28 |