트위터 API를 이용하여 데이터 파이프라인 만들기
이전 포스팅에 이어서 진행합니다.
[빅데이터를 지탱하는 기술] 트위터 API를 이용한 데이터 파이프라인 만들기 # 5
트위터 API를 이용하여 데이터 파이프라인 만들기 이전 포스팅에서 했던 작업을 embulk, hive, presto, airflow를 이용하여 작업해보도록 하겠습니다. 이전 포스팅은 아래와 같습니다. [빅데이터를 지탱
my-develop-note.tistory.com
Presto 설치 & 세팅
presto 공식문서를 참고하였습니다. 링크는 아래와 같습니다.
Deploying Presto — Presto 0.279 Documentation
Create an etc directory inside the installation directory. This will hold the following configuration: Catalog Properties Presto accesses data via connectors, which are mounted in catalogs. The connector provides all of the schemas and tables inside of the
prestodb.io
presto cluster를 구축하여 진행하였습니다.
presto는 coordinator와 worker로 구성이 됩니다.
coordinator는 hive의 데이터 소스를 읽어와서 worker들에게 전달하고 쿼리를 관리하며 worker가 데이터를 처리할 수 있게 task를 보내주는 역할을 합니다. 저는 master VM이 그 역할을 하도록 설정하였습니다.
worker 는 coordinator로 부터 받은 task를 기반으로 데이터 소스에 접근하고 결과를 다시 client 에게 보내줍니다. slave VM들이 그 역할을 합니다.
master와 slave 모두 presto를 다운받고 압축을 풀어줍니다.
저는 master의 모습만 가져왔지만 cluster를 구축하기 위해서는 slave 또한 설치해야 합니다.
설치 받은 presto-server-0.279/etc/로 들어갑니다. etc 디렉토리가 없다면 생성합니다.
node.properties
먼저 master VM입니다.
node.environment=prestocluster
node.id=master
node.data-dir=/home/bigdata/presto/data
slave1입니다.
node.environment=prestocluster
node.id=slave1
node.data-dir=/home/bigdata/presto/data
- node.environment : 환경이름으로 모든 노드들이 동일한 이름을 사용해야합니다. 저는 prestocluster라고 부여했습니다.
- node.id : 각 노드마다 고유한 id를 가져야 하며 저는 간단하게 hostname을 부여했습니다.
- node.data-dir : 데이터 디렉토리의 위치입니다. presto는 여기에 로그나 기타데이터를 저장합니다. 해당 경로에 디렉토리가 없어서 에러가 나온다면 생성해줍니다.
현재는 slave1만 가지고 왔지만 모든 slave에 적용하였습니다.
jvm.config
master VM입니다.
-server
-Xmx6G
-XX:+UseG1GC
-XX:G1HeapRegionSize=32M
-XX:+UseGCOverheadLimit
-XX:+ExplicitGCInvokesConcurrent
-XX:+HeapDumpOnOutOfMemoryError
-XX:+ExitOnOutOfMemoryError
slave입니다.
-server
-Xmx3G
-XX:+UseG1GC
-XX:G1HeapRegionSize=32M
-XX:+UseGCOverheadLimit
-XX:+ExplicitGCInvokesConcurrent
-XX:+HeapDumpOnOutOfMemoryError
-XX:+ExitOnOutOfMemoryError
공식문서에서 제공하는 기본템플릿을 따랐고 -Xmx부분만 다릅니다.
Xmx는 Java 힙의 최대 크기를 제어하며 jar 파일이 가질 수 있는 최대 동적 메모리입니다. 공식문서에서 제공하는 기본값은 16G로 제가 VM에게 부여한 메모리보다는 크기 때문에 VM에게 부여한 메모리에 맞춰서 변경하였습니다.
- master : 8GB를 부여하였고 6GB로 설정
- slave : 각각 4GB를 부여하였고 3GB로 설정
이 옵션으로 outofmemory로 node가 죽는 현상이 있었는데 그것을 해결할 수 있었습니다.
config.properties
master VM입니다.
coordinator=true
node-scheduler.include-coordinator=false
http-server.http.port=8080
discovery-server.enabled=true
discovery.uri=http://master:8080
slave 입니다.
coordinator=false
http-server.http.port=8080
discovery.uri=http://master:8080
공식문서에 제공하는 minimal configuration에서 살짝 변형하였습니다.
coordinator로 사용할 master VM에 coordinator=ture 옵션으로 설정했고, slave에는 coordinator=false으로 설정하였습니다.
port는 8080으로 master:8080으로 접속하면 presto server을 web ui로 확인할 수 있습니다.
Hive Connector
hive connector를 허용하여 hive에 저장되어 있는 데이터를 쿼리하도록 설정하였습니다.
아래의 공식문서를 참고하였습니다.
Hive Connector — Presto 0.279 Documentation
The Hive connector supports Apache Hadoop 2.x and derivative distributions including Cloudera CDH 5 and Hortonworks Data Platform (HDP). Create etc/catalog/hive.properties with the following contents to mount the hive-hadoop2 connector as the hive catalog,
prestodb.io
master, slave VM 모두 동일하게 설정하였습니다.
먼저 etc 디렉토리 안에 catalog 디렉토리를 생성해줍니다.
hive.properties
connector.name=hive-hadoop2
hive.metastore.uri=thrift://master:9083
hive.metastore-timeout=30s
hive.config.resources=/etc/hadoop/core-site.xml,/etc/hadoop/hdfs-site.xml
presto로 부터 hive metastore 참고하기 위하여 작성하였습니다.
Command Line Interface
presto cli를 실행하기 위하여 presto-cli를 설치합니다.
master VM에서만 설정했습니다.
Command Line Interface — Presto 0.279 Documentation
Command Line Interface The Presto CLI provides a terminal-based interactive shell for running queries. The CLI is a self-executing JAR file, which means it acts like a normal UNIX executable. Download presto-cli-0.279-executable.jar, rename it to presto, m
prestodb.io
presto-cli-0.279-executable.jar 파일을 다운받고 presto라는 이름으로 변경하고 chmod +x 권한을 부여하였습니다.
presto-server 폴더에 다운받고 이름과 권한을 변경하였습니다.
Presto 실행
지금까지 설치하고 세팅을 완료한 presto를 실행하겠습니다.
presto는 서버/클라이언트 시스템이기 때문에 presto server를 기동해야합니다.
master와 slave 모두 bin/launcher를 start 합니다.
그리고 master:8080 주소를 접속해보니 아래와 같이 node가 떠있는 것을 확인할 수 있습니다.
presto cli를 사용하여 hive에서 데이터를 쿼리해보겠습니다.
먼저 ./presto로 presto server에 접속하였습니다.
use hive.default; 로 hive의 metastore를 이용하게 설정하였습니다.
이전에 만들었던 twitter_sample_words 테이블을 쿼리해보았더니 잘 나온것을 확인할 수 있습니다.
web으로도 확인할 수 있습니다.
Presto에 의한 데이터 집계 - Task3
START="{{macros.ds_add(ds, -3)}}"
END="{{tomorrow_ds}}"
cat > query.sql << EOF
WITH word_category as
(SELECT word,
if(count > 1000, word, concat('COUNT=', cast(count AS varchar))) category
FROM(
SELECT word, count(*) count
FROM twitter_sample_words
WHERE time BETWEEN DATE('$START') AND DATE('$END') GROUP BY word )
),
range_twitter_sample_words as (
select date_trunc('hour', time) time, word
from twitter_sample_words
where time BETWEEN DATE('$START') AND DATE('$END')
)
select a.time time, b.category category, count(*) count
from range_twitter_sample_words a
left join word_category b
on a.word = b.word
group by time, category;
EOF
### 한줄입니다 ###
$PRESTO_HOME/presto --catalog hive --schema default -f query.sql
\--output-format CSV_HEADER > $HOME/project/data/word_summary/word_summary_${START}_to_{{ds}}.csv
START는 airflow logical 날짜(ds)를 기준으로 3일전 입니다.
END는 (ds)기준 다음날입니다.
spark에서 했던 작업과 동일한 작업입니다.
spark에서는 view를 사용했지만 여기서는 with구문을 사용하여 임시테이블을 생성했습니다.
2023-03-14(START)부터 2023-03-20(END)의 범위로 설정하여 쿼리를 실행했을 때 결과입니다.
마지막으로 일정 범위에 있는 데이터를 project/data/word_summary 디렉토리 안에 csv의 형태로 저장하게 됩니다.
csv의 형태로 데이터가 저장되었다면 pandas를 이용하여 데이터 분석이나 머신러닝에 활용할 수 있을 것 같습니다.
- 방금 실행한 쿼리문은 csv파일로 저장하지 않았고 프로젝트를 진행하며 실습했던 파일을 가져왔습니다.
Troubleshooting
airflow에서 backfill을 하거나 presto에서 쿼리를 실행할 때 sql 쿼리문에서 OutOfMemoryErorr가 나왔습니다.
backfill은 또 다른 문제로 OutOfMemory가 나오긴 했지만 backfill을 하기 이전부터 task3을 단일로 테스트했을 때에도 등장한 Error였습니다. 책에서 나온 코드 그대로 적용하였을때 Memory를 초과하는 오류가 나왔었고 오류를 해결한 방법을 공유하겠습니다.
presto 설치 & 세팅에서 설명한대로 presto의 jvm.config를 수정하였습니다. 그럼에도 불구하고 쿼리문이 느렸기 때문에 코드를 개선하였습니다.
[빅데이터를 지탱하는 기술] 책에서는 아래와 같이 with구문으로 word_category 임시테이블만 생성하고 left join으로 twitter_sample_words와 합치는 쿼리문을 사용하고 있습니다.
WITH word_category AS(
SELECT word,
if(count > 1000, word,
concat('COUNT=', cast(count AS varchar))) category
FROM (
SELECT word, count(*) count FROM twitter_sample_words
WHERE time BETWEEN DATE('$START') and DATE('$END')
GROUP BY word
) t
)
SELECT date_trunc('hour', a.time) time, b.category, count(*) count
FROM twitter_sample_words a
LEFT JOIN word_category b ON a.word = b.word
WHERE a.time BETWEEN DATE('$START') AND DATE('$END')
GROUP BY 1,2;
그래서 저는 임시테이블 2개를 생성하여 left join하는 쿼리문으로 다시 만들어 문제를 해결하였습니다.
WITH word_category as
(SELECT word,
if(count > 1000, word, concat('COUNT=', cast(count AS varchar))) category
FROM(
SELECT word, count(*) count
FROM twitter_sample_words
WHERE time BETWEEN DATE('$START') AND DATE('$END') GROUP BY word )
),
range_twitter_sample_words as (
select date_trunc('hour', time) time, word
from twitter_sample_words
where time BETWEEN DATE('$START') AND DATE('$END')
)
select a.time time, b.category category, count(*) count
from range_twitter_sample_words a
left join word_category b
on a.word = b.word
group by time, category;
2023-03-14부터 2023-03-20까지 데이터를 추출하는 쿼리문을 실행시켜보겠습니다.
먼저 책에서 제공한 쿼리문입니다.
다음은 제가 수정한 쿼리문입니다.
1분 13초에서 21초정도로 쿼리시간이 많이 단축된것을 확인할 수 있습니다.
'데이터 엔지니어링' 카테고리의 다른 글
[빅데이터를 지탱하는 기술] 배경 (0) | 2023.04.11 |
---|---|
[빅데이터를 지탱하는 기술] 트위터 API를 이용한 데이터 파이프라인 만들기 # 7 (0) | 2023.03.29 |
[빅데이터를 지탱하는 기술] 트위터 API를 이용한 데이터 파이프라인 만들기 # 5 (0) | 2023.03.28 |
[빅데이터를 지탱하는 기술] 트위터 API를 이용한 데이터 파이프라인 만들기 # 4 (0) | 2023.03.28 |
[빅데이터를 지탱하는 기술] 트위터 API를 이용한 데이터 파이프라인 만들기 # 3 (0) | 2023.03.26 |