_Han_
나의 개발 노트
_Han_
  • 분류 전체보기 (273)
    • 데이터 엔지니어링 (30)
    • 인프라 (3)
    • 추천시스템 (11)
    • 코딩테스트 (146)
    • 부트캠프 회고 (15)
    • 회고 (4)
    • 자격증 (1)
    • 파이썬 프로그래밍 (6)
    • 통계 (2)
    • Git (21)
    • 유니티2D (33)

최근 글

반응형
hELLO · Designed By 정상우.
_Han_

나의 개발 노트

[빅데이터를 지탱하는 기술] 트위터 API를 이용한 데이터 파이프라인 만들기 # 4
데이터 엔지니어링

[빅데이터를 지탱하는 기술] 트위터 API를 이용한 데이터 파이프라인 만들기 # 4

2023. 3. 28. 10:23
반응형

트위터 API를 이용하여 데이터 파이프라인 만들기

이전 포스팅에 이어서 진행합니다.

 

[빅데이터를 지탱하는 기술] 트위터 API를 이용한 데이터 파이프라인 만들기 # 3

트위터 API를 이용하여 데이터 파이프라인 만들기 이전 포스팅에 이어서 진행합니다. [빅데이터를 지탱하는 기술] 트위터 API를 이용한 데이터 파이프라인 만들기 # 2 트위터 API를 이용하여 데이

my-develop-note.tistory.com

 

이전 포스팅에서는 Hadoop Yarn에서 Spark 클러스터 분산 환경을 구축한 경험과 방법에 대하여 공유했습니다.

 

여기서는 Spark 클러스터를 이용하여 데이터 파이프라인을 구축한 경험을 이야기하고자 합니다.


먼저 파이썬용 패키지 설치 프로그램인 pip를 이용하여 pyspark를 설치하였습니다.

pip install pyspark

 

그리고 vsc를 이용하지 않고 웹 브라우저에서 코드를 작성하고 실행해볼 수 있는 jupyter notebook을 설치하여 진행했습니다.

  • vsc에서 jupyter notebook을 이용할 수 있지만 사용성은 그렇게 좋지 않았습니다..
#pip를 이용하여 설치
pip install jupyter notebook
#conda를 이용하여 설치
conda install jupyter notebook

저는 conda환경을 이용하지 않았기 때문에 pip로 설치하였습니다.

 

로컬 컴퓨터인 Windows에서 가상머신의 ubuntu환경 에서 실행되는 jupyter notebook에 원격접속하기 위하여 다음의 링크를 참고했습니다. 가상 머신의 화면이 아닌 로컬 컴퓨터의 화면에서 작업할 수 있습니다.

 

[Python]VirtualBox에 설치된 Ubuntu에서 Jupyter Notebook설치 후 Windows에서 원격접속 설정하기

먼저 아나콘다를 설치하고 진행하기 바랍니다. 아나콘다 설치는 아래의 링크를 참고해주시기 바랍니다. [Python]Ubuntu18.04 LTS에 Anaconda설치하기 1. Anaconda 설치파일 다운로드 받기 Google Chrome을 켜고

somjang.tistory.com

hdfs의 spark-warehouse에 생성한 테이블을 저장해야하기 때문에 hadoop을 실행합니다.

저는 SparkSession을 실행중이기 때문에 SparkSubmit이 보입니다.


Spark 데이터 파이프라인 - Yarn

spark 데이터 파이프라인

프로젝트에서 구축한 전체적인 데이터 파이프라인은 위와 같습니다.

 

먼저 SparkSession과 Row를 import 해줍니다.

from pyspark.sql import SparkSession
from pyspark.sql import Row

SparkSession은 spark의 기능의 시작점으로 Spark DataFrame 기능을 사용하기 위함입니다.

Row는 Spark DataFrame에서 행을 의미하며 밑에서 함수에 사용됩니다.

yarn = SparkSession\
        .builder\
        .master('yarn')\
        .appName('twitter_yarn')\
        .config('spark.submit.deployMode', 'client')\
        .config('spark.executor.instances', 4)\
        .config('spark.executor.memory', '2g')\
        .config('spark.driver.memory', '2g')\
        .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1')\
        .getOrCreate()

 

yarn이라는 변수에 SparkSession을 생성해줍니다.

  • mater를 yarn으로 입력하여 hadoop yarn위에서 spark를 실행합니다.
  • 애플리케이션의 이름은 twitter_yarn입니다.
  • slave1,2,3,4를 executor로 이용하기 위하여 spark.executor.instances를 4로 입력했습니다.
  • 각 executor의 memory를 2gb로 설정했습니다.
  • deploymode가 client로 실행되며 driver의 memory를 2gb로 설정합니다. master VM에 해당합니다.
  • packages를 mongo-spark-connector로 입력하여 spark에서 mongodb의 데이터를 불러 올 수 있도록 했습니다.

master:4040 주소로 들어가서 application UI를 확인해보았습니다.

세팅에 맞게 설정되어 있는 것을 확인할 수 있습니다.


mongoDB에서 데이터를 추출하였고 Spark DataFrame을 생성하였습니다. 

createOrReplaceTempView('tweets') 함수로 sql을 사용하여 핸들링할 수 있도록 tweets라는 이름의 view 생성합니다.

 

sql을 사용한 방식과 DataFrame의 함수를 사용한 방식 두가지를 비교하며 진행했습니다.

데이터는 약 770만개의 데이터가 존재하며 실행시간은 비슷해 보입니다.


%%time
from pyspark.sql.functions import date_format
en_tweets_sdf = df[['_timestamp', 'data.text']].withColumnRenamed('_timestamp','time')
en_tweets_sdf = en_tweets_sdf.select(date_format('time', 'yyyy-MM-dd HH:mm:ss').alias('time'), en_tweets_sdf.text)
en_tweets_sdf.show(5)

 

두 셀의 실행시간은 각각 587ms, 428ms 비슷했습니다.


데이터의 text 데이터를 split()하여 단어별로 분리하고 Row 객체로 반환하는 함수입니다.

샘플 데이터를 text_split함수를 사용하면 단어들이 공백을 기준으로 분리가 된 후에 Row형태로 만들어진 것을 확인할 수 있습니다.

rdd.flatMap()함수를 사용하고 DataFrame 형태로 변경하였습니다.

 

en_tweets_sdf 또한 같은 작업을 진행했고 sql보다 조금 더 빠른 속도를 보였지만 여기서는 생략하겠습니다.


 

생성한 words_sql view를 통하여 count(*) 쿼리문을 실행해보니 약 3800만개 정도의 데이터가 생성된것을 확인할 수 있었습니다.

데이터를 쿼리하는데 약 1분정도의 시간이 소요되었습니다.

  • words_sdf는 Error가 발생하여 words_sdf.count()함수는 실행할 수 없었습니다. Error 원인은 아직 찾지 못하였습니다.

저는 먼저 hdfs에 spark-warehouse 디렉토리를 만들었습니다.

!hdfs dfs -rmr spark-warehouse/yarn_twitter_sample_words

저는 실습을 하면서 미리 만들어놓았던 yarn_twitter_sample_words 디렉토리를 먼저 제거하였습니다.

!hdfs dfs -expunge

hdfs 휴지통도 비워주었습니다.

!hdfs dfs -ls spark-warehouse/yarn_twitter_sample_words

다시 테이블을 보관할 yarn_twitter_sample_words 디렉토리를 생성했습니다.

%%time
#words를 hdfs warehouse에 저장
words_sql.write.saveAsTable('yarn_twitter_sample_words')

words_sql을 물리적인 테이블로 보관합니다.

잘 저장 되어있는 것을 확인할 수 있습니다.

 

기존의 words_sql view와 저장한 테이블 yarn_twitter_sample_words를 비교해보겠습니다.

 

먼저 기존의 words_sql입니다.

약 1분정도의 시간이 소요되었습니다.

 

테이블 yarn_twitter_sample_words입니다.

약 8초정도 시간이 소요되었습니다.

물리적인 테이블에서 조회할 때 훨씬 빠른속도를 보입니다. 쿼리는 동일하지만 실행시간에서 많이 차이가 나는 것을 확인할 수 있습니다.

 

 

단어를 카테고리별로 나눈 디멘전 테이블입니다. 

단어, 단어의 수, 카테고리로 구성되어 있으며 카테고리의 경우에는 해당 단어의 수가 1000개가 넘는 많이 등장한 단어라면 단어 자체가 카테고리로 지정되지만 1000개 넘지 않는 단어라면 'COUNT=단어의 수' 로 지정됩니다.

 

View : word_category로 생성하였습니다.

 

디멘전 테이블의 수를 세어보니 약 310만개 정도 데이터가 생성된 것을 확인할 수 있습니다.

 

 

위에서 만들었던 word_category 디멘전 테이블과 기존의  yarn_twitter_sample_words 데이터를 join하고 1시간마다 카테고리별로 그룹화를 하였습니다.

데이터의 수는 20만개 정도로 줄어든것을 확인할 수 있습니다.

 

20만개의 데이터는 spark를 안쓰고 pandas로도 충분히 핸들링을 할 수 있습니다.

2023-03-14일부터 2023-03-21까지의 기간동안 수집한 데이터를 1시간마다 카테고리별로 그룹화 시킨것을 확인할 수 있습니다.

 

pandas 라이브러리를 통하여 원하는 기간이나 카테고리를 추출해서 데이터 분석에 활용할 수도 있습니다.

 

분석은 여기서는 다루지 않겠습니다.

 

반응형

'데이터 엔지니어링' 카테고리의 다른 글

[빅데이터를 지탱하는 기술] 트위터 API를 이용한 데이터 파이프라인 만들기 # 6  (0) 2023.03.28
[빅데이터를 지탱하는 기술] 트위터 API를 이용한 데이터 파이프라인 만들기 # 5  (0) 2023.03.28
[빅데이터를 지탱하는 기술] 트위터 API를 이용한 데이터 파이프라인 만들기 # 3  (0) 2023.03.26
[빅데이터를 지탱하는 기술] 트위터 API를 이용한 데이터 파이프라인 만들기 # 2  (0) 2023.03.24
[빅데이터를 지탱하는 기술] 트위터 API를 이용한 데이터 파이프라인 만들기 # 1  (0) 2023.03.24
    '데이터 엔지니어링' 카테고리의 다른 글
    • [빅데이터를 지탱하는 기술] 트위터 API를 이용한 데이터 파이프라인 만들기 # 6
    • [빅데이터를 지탱하는 기술] 트위터 API를 이용한 데이터 파이프라인 만들기 # 5
    • [빅데이터를 지탱하는 기술] 트위터 API를 이용한 데이터 파이프라인 만들기 # 3
    • [빅데이터를 지탱하는 기술] 트위터 API를 이용한 데이터 파이프라인 만들기 # 2
    _Han_
    _Han_
    학습한 것을 기록합니다.

    티스토리툴바