트위터 API를 이용하여 데이터 파이프라인 만들기
이전 포스팅에서 했던 작업을 embulk, hive, presto, airflow를 이용하여 작업해보도록 하겠습니다.
이전 포스팅은 아래와 같습니다.
[빅데이터를 지탱하는 기술] 트위터 API를 이용한 데이터 파이프라인 만들기 # 4
트위터 API를 이용하여 데이터 파이프라인 만들기 이전 포스팅에 이어서 진행합니다. [빅데이터를 지탱하는 기술] 트위터 API를 이용한 데이터 파이프라인 만들기 # 3 트위터 API를 이용하여 데이
my-develop-note.tistory.com
MongoDB로 부터 데이터를 추출하기 위하여 오픈소스 벌크 전송 도구인 "Embulk"를 사용하였습니다.
Embulk 설치
Embulk 공식 홈페이지를 참고하여 설치하였습니다.
Embulk
Embulk is an open-source pluggable bulk data loader. It helps loading data to/from varieties of storages, file formats, databases, cloud services, and else.
www.embulk.org
curl --create-dirs -o ~/.embulk/bin/embulk -L "https://dl.embulk.org/embulk-latest.jar"
chmod +x ~/.embulk/bin/embulk
echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
source ~/.bashrc
embulk-0.9.24 버전을 사용하고 있습니다.
MongoDB용 플러그인을 설치하였습니다.
embulk gem install embulk-input-mongodb embulk-formatter-jsonl
저는 미리 설치 했기 때문에 아래와 같이 두개의 플러그인을 조회할 수 있습니다.
Embulk에 의한 데이터 추출 - Task1
#!/bin/bash
START="{{ds}}"
END="{{tomorrow_ds}}"
cat > config.yml << EOF
in :
type: mongodb
uri: mongodb://192.168.56.101:27017/twitter
collection: sample
query: '{_timestamp: {\$gte: "${START}", \$lt: "${END}"}}'
projection: '{_timestamp : true, "data.lang" : true, "data.text" : true}'
out :
type: file
path_prefix: /home/bigdata/project/data/twitter_sample/twitter_sample_${START}/
file_ext: json.gz
formatter:
type: jsonl
encoders:
- type: gzip
EOF
rm -rf /home/bigdata/project/data/twitter_sample/twitter_sample_${START}
mkdir /home/bigdata/project/data/twitter_sample/twitter_sample_${START}
embulk run config.yml
hdfs dfs -put -f /home/bigdata/project/data/twitter_sample/twitter_sample_${START} /user/bigdata/json_data/
airflow에서 실행하는 task입니다.
START는 DAG를 실행하는 logical 날짜입니다.
END는 START의 다음 날짜 입니다.
먼저 MongoDB에서 특정기간의 데이터를 추출합니다.
데이터가 어떤 모습으로 추출되는지 확인하기 위하여 mongodb를 실행하고 아래의 명령어를 입력했습니다.
- Start는 2023-03-14로 가정하였습니다.
#한줄입니다
db.sample.find({_timestamp : {$gte : "2023-03-14", $lt : "2023-03-15"}},\
{_timestamp : true, "data.lang" : true, "data.text" : true}).limit(5).pretty()
2023-03-14일 부터 2023-03-15일까지 _timestamp, data.lang, data.text만 추출하고 5개만 표현했습니다.
위와 같은 형태의 데이터가 ubuntu master VM안에 project/data/twitter_sample/ twitter_sample_날짜 디렉토리 밑에 json.gz 파일 형태로 저장됩니다.
아래의 명령어로 데이터를 확인할 수 있습니다.
zcat 000.00.json.gz | head -1
저는 여러번 실행해도 동일한 결과를 얻기 위한 작업을 진행했습니다.
task의 하단 부분을 확인해보면 출력 디렉토리를 제거하고 다시 만드는 작업을 하고 embulk를 실행합니다.
그리고 출력디렉토리에 mongodb에서 추출한 데이터가 저장된다면 그 디렉토리를 hdfs로 옮기는 작업을 합니다.
rm -rf /home/bigdata/project/data/twitter_sample/twitter_sample_${START}
mkdir /home/bigdata/project/data/twitter_sample/twitter_sample_${START}
embulk run config.yml
hdfs dfs -put -f /home/bigdata/project/data/twitter_sample/twitter_sample_${START} /user/bigdata/json_data/
hdfs의 결과를 확인하면 아래와 같습니다.
같은 json.gz 파일이 저장되어 있는 것을 확인할 수 있습니다.
Hive 설치 & 셋업
참고한 자료입니다.
GettingStarted - Apache Hive - Apache Software Foundation
Table of Contents Installation and Configuration You can install a stable release of Hive by downloading a tarball, or you can download the source code and build Hive from that. Running HiveServer2 and Beeline Requirements Java 1.7Note: Hive versions 1.
cwiki.apache.org
저는 /usr/local 디렉토리 안에 hive를 설치하였습니다.
hive는 3.1.3을 사용하고 있습니다.
hive/conf 아래의 hive.site.xml 파일을 아래와 같이 수정하였습니다.
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>thrift://master:9083</value>
</property>
</configuration>
그리고 project/batch 폴더로 이동하여 아래의 명령어를 입력하였습니다.
schematool -initSchema -dbType derby
저는 이전에 미리 작업을 했습니다.
metastore_db 라는 디렉토리가 만들어지며 hive 테이블 정보를 가지는 역할을 합니다.
그리고 현재 디렉토리(/project/batch) hive 메타 스토어를 기동합니다.
hive --service metastore
presto로도 집계를 해야하기 때문에 기동한채로 진행했습니다.
Hive에 의한 데이터 구조화 - Task2
#!/bin/bash
START="{{ds}}"
cat > load.hql << EOF
ADD JAR /usr/local/hive/hcatalog/share/hcatalog/hive-hcatalog-core-3.1.3.jar;
! echo "> ADD JAR /usr/local/hive/hcatalog/share/hcatalog/hive-hcatalog-core-3.1.3.jar";
CREATE TEMPORARY EXTERNAL TABLE twitter_sample(
record struct<\`_timestamp\` : string, \`data\` : struct<\`lang\` : string, \`text\` : string> >
)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS TEXTFILE LOCATION '/user/bigdata/json_data/twitter_sample_${START}/';
! echo "> CREATE TEMPORARY EXTERNAL TABLE twitter_sample";
SELECT * FROM twitter_sample LIMIT 5;
CREATE TABLE IF NOT EXISTS twitter_sample_words(
\`time\` timestamp, \`word\` string
)
PARTITIONED BY(pt string) STORED AS ORC;
! echo "> CREATE TABLE twitter_sample_words";
INSERT OVERWRITE TABLE twitter_sample_words PARTITION (pt='${START}')
SELECT from_unixtime(unix_timestamp(record.\`_timestamp\`, "yyyy-MM-dd'T'hh:mm:ss")) as \`time\`, \`word\`
FROM twitter_sample
LATERAL VIEW explode(split(record.\`data\`.\`text\`, '\\s+')) words as \`word\`
WHERE record.\`data\`.\`lang\` = 'en'
ORDER BY \`time\`;
SELECT * FROM twitter_sample_words LIMIT 5;
EOF
hive -f load.hql -d START=${START}
START는 embulk에서 사용한 것과 같은 의미입니다.
저는 hive를 실행해서 데이터가 어떤 모습인지 확인하면서 진행했습니다.
먼저 JSON 파일을 읽어들이기 위한 라이브러리를 가져옵니다.
JSON 파일을 읽은 외부테이블 twitter_sample을 만듭니다.
출력 테이블 twitter_sample_words를 생성합니다. PARTITIONED BY는 폴더 구조로 데이터를 분할하여 저장합니다.
STORED AS ORC로 칼럼 기반으로 파일을 저장합니다. 분석을 위하여 데이터를 집계할 때 조금 더 빠른 속도를 보일 수 있습니다.
특정 날짜를 지정해서 파티션을 덮어쓰기를 했습니다. OVERWRITE로 인하여 파티션을 덮어쓰기 때문에 매번 실행해도 같은 결과를 얻을 수 있습니다.
이렇게 만들어진 twitter_sample_words 테이블을 출력하면 위와 같이 나오게 됩니다.
'데이터 엔지니어링' 카테고리의 다른 글
[빅데이터를 지탱하는 기술] 트위터 API를 이용한 데이터 파이프라인 만들기 # 7 (0) | 2023.03.29 |
---|---|
[빅데이터를 지탱하는 기술] 트위터 API를 이용한 데이터 파이프라인 만들기 # 6 (0) | 2023.03.28 |
[빅데이터를 지탱하는 기술] 트위터 API를 이용한 데이터 파이프라인 만들기 # 4 (0) | 2023.03.28 |
[빅데이터를 지탱하는 기술] 트위터 API를 이용한 데이터 파이프라인 만들기 # 3 (0) | 2023.03.26 |
[빅데이터를 지탱하는 기술] 트위터 API를 이용한 데이터 파이프라인 만들기 # 2 (0) | 2023.03.24 |