_Han_
나의 개발 노트
_Han_
  • 분류 전체보기 (273)
    • 데이터 엔지니어링 (30)
    • 인프라 (3)
    • 추천시스템 (11)
    • 코딩테스트 (146)
    • 부트캠프 회고 (15)
    • 회고 (4)
    • 자격증 (1)
    • 파이썬 프로그래밍 (6)
    • 통계 (2)
    • Git (21)
    • 유니티2D (33)

최근 글

반응형
hELLO · Designed By 정상우.
_Han_

나의 개발 노트

[빅데이터를 지탱하는 기술] 트위터 API를 이용한 데이터 파이프라인 만들기 # 5
데이터 엔지니어링

[빅데이터를 지탱하는 기술] 트위터 API를 이용한 데이터 파이프라인 만들기 # 5

2023. 3. 28. 12:44
반응형

트위터 API를 이용하여 데이터 파이프라인 만들기

이전 포스팅에서 했던 작업을 embulk, hive, presto, airflow를 이용하여 작업해보도록 하겠습니다.

이전 포스팅은 아래와 같습니다.

 

[빅데이터를 지탱하는 기술] 트위터 API를 이용한 데이터 파이프라인 만들기 # 4

트위터 API를 이용하여 데이터 파이프라인 만들기 이전 포스팅에 이어서 진행합니다. [빅데이터를 지탱하는 기술] 트위터 API를 이용한 데이터 파이프라인 만들기 # 3 트위터 API를 이용하여 데이

my-develop-note.tistory.com


MongoDB로 부터 데이터를 추출하기 위하여 오픈소스 벌크 전송 도구인 "Embulk"를 사용하였습니다.

 

Embulk 설치

Embulk 공식 홈페이지를 참고하여 설치하였습니다.

 

Embulk

Embulk is an open-source pluggable bulk data loader. It helps loading data to/from varieties of storages, file formats, databases, cloud services, and else.

www.embulk.org

curl --create-dirs -o ~/.embulk/bin/embulk -L "https://dl.embulk.org/embulk-latest.jar"
chmod +x ~/.embulk/bin/embulk
echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
source ~/.bashrc

embulk-0.9.24 버전을 사용하고 있습니다.

 

MongoDB용 플러그인을 설치하였습니다.

embulk gem install embulk-input-mongodb embulk-formatter-jsonl

저는 미리 설치 했기 때문에 아래와 같이 두개의 플러그인을 조회할 수 있습니다.

Embulk에 의한 데이터 추출 - Task1

#!/bin/bash

START="{{ds}}"
END="{{tomorrow_ds}}"

cat > config.yml << EOF
in :
  type: mongodb
  uri: mongodb://192.168.56.101:27017/twitter
  collection: sample
  query: '{_timestamp: {\$gte: "${START}", \$lt: "${END}"}}'
  projection: '{_timestamp : true, "data.lang" : true, "data.text" : true}'
out :
  type: file
  path_prefix: /home/bigdata/project/data/twitter_sample/twitter_sample_${START}/
  file_ext: json.gz
  formatter:
    type: jsonl
  encoders:
  - type: gzip
EOF

rm -rf /home/bigdata/project/data/twitter_sample/twitter_sample_${START}
mkdir /home/bigdata/project/data/twitter_sample/twitter_sample_${START}

embulk run config.yml

hdfs dfs -put -f /home/bigdata/project/data/twitter_sample/twitter_sample_${START}  /user/bigdata/json_data/

airflow에서 실행하는 task입니다.

START는 DAG를 실행하는 logical 날짜입니다.

END는 START의 다음 날짜 입니다.

 

먼저 MongoDB에서 특정기간의 데이터를 추출합니다. 

데이터가 어떤 모습으로 추출되는지 확인하기 위하여 mongodb를 실행하고 아래의 명령어를 입력했습니다.

  •  Start는 2023-03-14로 가정하였습니다.
#한줄입니다
db.sample.find({_timestamp : {$gte : "2023-03-14", $lt : "2023-03-15"}},\
{_timestamp : true, "data.lang" : true, "data.text" : true}).limit(5).pretty()

2023-03-14일 부터 2023-03-15일까지 _timestamp, data.lang, data.text만 추출하고 5개만 표현했습니다.

 

위와 같은 형태의 데이터가 ubuntu master VM안에 project/data/twitter_sample/ twitter_sample_날짜 디렉토리 밑에 json.gz 파일 형태로 저장됩니다.

아래의 명령어로 데이터를 확인할 수 있습니다.

zcat 000.00.json.gz | head -1

 

저는 여러번 실행해도 동일한 결과를 얻기 위한 작업을 진행했습니다.

task의 하단 부분을 확인해보면 출력 디렉토리를 제거하고 다시 만드는 작업을 하고 embulk를 실행합니다.

그리고 출력디렉토리에 mongodb에서 추출한 데이터가 저장된다면 그 디렉토리를 hdfs로 옮기는 작업을 합니다.

rm -rf /home/bigdata/project/data/twitter_sample/twitter_sample_${START}
mkdir /home/bigdata/project/data/twitter_sample/twitter_sample_${START}

embulk run config.yml

hdfs dfs -put -f /home/bigdata/project/data/twitter_sample/twitter_sample_${START}  /user/bigdata/json_data/

hdfs의 결과를 확인하면 아래와 같습니다.

같은 json.gz 파일이 저장되어 있는 것을 확인할 수 있습니다.

 

Hive 설치 & 셋업

참고한 자료입니다.

 

GettingStarted - Apache Hive - Apache Software Foundation

Table of Contents Installation and Configuration You can install a stable release of Hive by downloading a tarball, or you can download the source code and build Hive from that. Running HiveServer2 and Beeline Requirements Java 1.7Note:  Hive versions 1.

cwiki.apache.org

저는 /usr/local 디렉토리 안에 hive를 설치하였습니다.

hive는 3.1.3을 사용하고 있습니다.

 

hive/conf 아래의 hive.site.xml 파일을 아래와 같이 수정하였습니다. 

<configuration>
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://master:9083</value>
  </property>
</configuration>

 

그리고 project/batch 폴더로 이동하여 아래의 명령어를 입력하였습니다.

schematool -initSchema -dbType derby

저는 이전에 미리 작업을 했습니다.

metastore_db 라는 디렉토리가 만들어지며 hive 테이블 정보를 가지는 역할을 합니다.

 

그리고 현재 디렉토리(/project/batch) hive 메타 스토어를 기동합니다.

hive --service metastore

presto로도 집계를 해야하기 때문에 기동한채로 진행했습니다.

 

Hive에 의한 데이터 구조화 - Task2

#!/bin/bash

START="{{ds}}"

cat > load.hql << EOF

ADD JAR /usr/local/hive/hcatalog/share/hcatalog/hive-hcatalog-core-3.1.3.jar;

! echo "> ADD JAR /usr/local/hive/hcatalog/share/hcatalog/hive-hcatalog-core-3.1.3.jar";

CREATE TEMPORARY EXTERNAL TABLE twitter_sample(
  record struct<\`_timestamp\` : string, \`data\` : struct<\`lang\` : string, \`text\` : string> >
)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
STORED AS TEXTFILE LOCATION '/user/bigdata/json_data/twitter_sample_${START}/';

! echo "> CREATE TEMPORARY EXTERNAL TABLE twitter_sample";

SELECT * FROM twitter_sample LIMIT 5;

CREATE TABLE IF NOT EXISTS twitter_sample_words(
  \`time\` timestamp, \`word\` string
)
PARTITIONED BY(pt string) STORED AS ORC;

! echo "> CREATE TABLE twitter_sample_words";

INSERT OVERWRITE TABLE twitter_sample_words PARTITION (pt='${START}')
SELECT from_unixtime(unix_timestamp(record.\`_timestamp\`, "yyyy-MM-dd'T'hh:mm:ss")) as \`time\`, \`word\`
FROM twitter_sample
LATERAL VIEW explode(split(record.\`data\`.\`text\`, '\\s+')) words as \`word\`
WHERE record.\`data\`.\`lang\` = 'en'
ORDER BY \`time\`;

SELECT * FROM twitter_sample_words LIMIT 5;

EOF

hive -f load.hql -d START=${START}

START는 embulk에서 사용한 것과 같은 의미입니다.

 

저는 hive를 실행해서 데이터가 어떤 모습인지 확인하면서 진행했습니다.

먼저 JSON 파일을 읽어들이기 위한 라이브러리를 가져옵니다.

 

JSON 파일을 읽은 외부테이블 twitter_sample을 만듭니다. 

 

 

출력 테이블 twitter_sample_words를 생성합니다. PARTITIONED BY는 폴더 구조로 데이터를 분할하여 저장합니다.

STORED AS ORC로 칼럼 기반으로 파일을 저장합니다. 분석을 위하여 데이터를 집계할 때 조금 더 빠른 속도를 보일 수 있습니다.

 

 

특정 날짜를 지정해서 파티션을 덮어쓰기를 했습니다. OVERWRITE로 인하여 파티션을 덮어쓰기 때문에 매번 실행해도 같은 결과를 얻을 수 있습니다.

이렇게 만들어진 twitter_sample_words 테이블을 출력하면 위와 같이 나오게 됩니다.

 

 

반응형

'데이터 엔지니어링' 카테고리의 다른 글

[빅데이터를 지탱하는 기술] 트위터 API를 이용한 데이터 파이프라인 만들기 # 7  (0) 2023.03.29
[빅데이터를 지탱하는 기술] 트위터 API를 이용한 데이터 파이프라인 만들기 # 6  (0) 2023.03.28
[빅데이터를 지탱하는 기술] 트위터 API를 이용한 데이터 파이프라인 만들기 # 4  (0) 2023.03.28
[빅데이터를 지탱하는 기술] 트위터 API를 이용한 데이터 파이프라인 만들기 # 3  (0) 2023.03.26
[빅데이터를 지탱하는 기술] 트위터 API를 이용한 데이터 파이프라인 만들기 # 2  (0) 2023.03.24
    '데이터 엔지니어링' 카테고리의 다른 글
    • [빅데이터를 지탱하는 기술] 트위터 API를 이용한 데이터 파이프라인 만들기 # 7
    • [빅데이터를 지탱하는 기술] 트위터 API를 이용한 데이터 파이프라인 만들기 # 6
    • [빅데이터를 지탱하는 기술] 트위터 API를 이용한 데이터 파이프라인 만들기 # 4
    • [빅데이터를 지탱하는 기술] 트위터 API를 이용한 데이터 파이프라인 만들기 # 3
    _Han_
    _Han_
    학습한 것을 기록합니다.

    티스토리툴바