트위터 API를 이용하여 데이터 파이프라인 만들기
이전에 작성했던 포스팅에 이어서 진행합니다.
[빅데이터를 지탱하는 기술] 트위터 API를 이용한 데이터 파이프라인 만들기 # 1
트위터 API를 이용하여 데이터 파이프라인 만들기 트위터의 API로 트위터 텍스트 데이터를 수집하고 간단한 데이터 파이프라인을 만드는 프로젝트를 진행하였고, 프로젝트는 [빅데이터를 지탱하
my-develop-note.tistory.com
데이터 수집
먼저 데이터 수집에는 트위터 API를 이용하고 파이썬 스크립트를 작성하였고 master VM에 mongodb를 이용하여 데이터에 저장했습니다.
파이썬 스크립트를 작성하기 전에 크게 2가지 작업을 해야합니다.
먼저 트위터의 API를 사용하기 위해서는 트위터에 개발자 등록을 하고 API에 접속할 수 있는 Key와 Token을 받아야합니다.
이 과정은 인터넷 블로그 자료에 많이 나와있으니 여기서는 생략하도록 하겠습니다.
MongoDB 설치
트위터 API 개발자 등록을 마쳤거나 신청을 하신다음에는 master VM에 분산스토리지를 사용할 MongoDB를 설치합니다.
MongoDB는 JSON 형태로 되어있으며, 실제 JSON 형태로 데이터를 수집하는 경우가 많다고 합니다.
아래의 MongoDB 공식문서를 따라가면 MongoDB는 쉽게 설치할 수 있습니다.
Install MongoDB Community Edition on Ubuntu — MongoDB Manual
Docs Home → MongoDB Manual MongoDB AtlasMongoDB Atlas is a hosted MongoDB service option in the cloud which requires no installation overhead and offers a free tier to get started.Use this tutorial to install MongoDB 4.4 Community Edition on LTS (long-te
www.mongodb.com
MongoDB를 설치했다면 /etc/mongo.conf
를 수정하여 외부접속을 허용하도록 만들겠습니다.
bindIp를 0.0.0.0으로 수정합니다.
sudo systemctl restart mongod
혹은
sudo service mongod restart
그리고 위의 명령으로 mongod를 재시작합니다.
mongodb가 잘 실행되는 것을 확인할 수 있습니다.
트위터 스트리밍 데이터 수집
트위터 공식 문서를 참고하였습니다.
- https://developer.twitter.com/en/docs/twitter-api/tweets/volume-streams/introduction
- https://developer.twitter.com/en/docs/twitter-api/tweets/volume-streams/quick-start/sampled-stream
- https://github.com/twitterdev/Twitter-API-v2-sample-code/blob/main/Sampled-Stream/sampled-stream.py
v1에서는 OAuth 1.0 인증을 하였는데 v2가 되면서 OAuth 2.0으로 인증이 바뀌었습니다.
그래서 책에서 나온 코드로는 더이상 트위터 API를 이용하여 데이터를 수집할 수 없기 때문에 제가 다시 작성해보았습니다.
v2가 되면서 아래와 같은 제한사항이 추가된 것으로 파악됩니다.
- 한번에 하나의 연결을 할 수 있다.
- 연결 요청은 15분당 최대 50회까지 이루어질 수 있다
두번째 조건으로 인하여 데이터를 수집하다가 연결이 강제로 끊기는 에러가 발생한 것으로 추측됩니다.
이러한 에러상황으로 인하여 에러가 발생했을때 기록할 수 있는 log를 작성하여 적용하였습니다.
- log를 작성하고 실행했더니 그 다음부터는 에러가 잘 나지 않는 아이러니한 상황이 발생했습니다..무슨 이유인지는 모르겠습니다
def record_log(error, count):
KST = timedelta(hours=9).seconds
#복구시간
recover_time = int(response.headers['x-rate-limit-reset']) + KST
#현재시간
now_time = time()+KST
#남은시간
remain = recover_time-now_time
remain_time = timedelta(seconds=remain)
logger.warning(f"### {error} Error ###")
logger.info(f"현재 시간 : {datetime.utcfromtimestamp(now_time).strftime('%Y/%m/%d %H:%M:%S')}")
logger.info(f"복구 시간 : {datetime.utcfromtimestamp(recover_time).strftime('%Y/%m/%d %H:%M:%S')}")
logger.info(f'남은 시간 : {remain_time}')
logger.info(f'count : {count}')
logger.warning(f"### {error} Error ###")
아래는 데이터를 수집하는 전체 코드입니다.
import pymongo
import requests
import json
import sys
from Key import get_key
from time import time, sleep
from pprint import pprint
from datetime import timezone, timedelta, datetime
from log import get_logger
key = get_key()
consumer_key = key['consumer_key']
consumer_secret = key['consumer_secret']
bearer_token =key['bearer_token']
def bearer_oauth(r):
r.headers['Authorization'] = f"Bearer {bearer_token}"
return r
url = 'https://api.twitter.com/2/tweets/sample/stream?tweet.fields=lang,created_at'
KST = timezone(timedelta(hours=9))
while True:
mongo = pymongo.MongoClient()
response = requests.request("GET", url, auth=bearer_oauth,stream=True)
logger = get_logger()
count = 0
try:
for line in response.iter_lines():
if line:
tweet = json.loads(line)
tweet['_timestamp'] = datetime.now(tz=KST).isoformat()
mongo.twitter.sample.insert_one(tweet)
count += 1
except Exception as err:
error = str(sys.exc_info()[0]) + str(err)
record_log(error, count)
print('15분 쉽니다.')
sleep_time = 60 * 15
sleep(sleep_time)
Key.py 파일에 트위터 API에서 발급받은 consumer_key, consumer_secret, bearer_token을 작성하고 현재 스크립트로 import하여 사용하였습니다.
- github에 공개할 때 key정보를 감추기 위하여 사용했습니다.
request 모듈을 사용하여 bearer_token을 적용하여 stream = Ture로 하여 response를 받아올 수 있었습니다.
받아온 데이터들을 mongodb의 twitter 데이터베이스.sample 컬렉션에 저장하였습니다. mongodb에 저장할 때 현재 저장 시간을 파악하기 위하여 _timestamp를 추가하여 저장하였습니다.
로컬 컴퓨터에서 MongoDB Compass 프로그램을 이용하여 master VM에 저장되어 있는 데이터를 확인해보았습니다.
url = 'https://api.twitter.com/2/tweets/sample/stream?tweet.fields=lang,created_at'
url에 tweets.fileds=lang,created_at을 볼 수 있는데 기본적으로 제공하는 데이터 외에 추가로 수집한 데이터입니다.
실제 수집된 데이터를 확인해보니 잘 저장이 되어 있습니다.
수집된 데이터는 다음과 같습니다.
- lang : 작성자의 언어
- created_at : 작성자가 트윗을 올린 날짜
- text : 작성자가 트윗한 글 내용
- _timestamp : 현재 저장 시간
수집된 데이터를 확인해보니 약 770만개의 데이터를 수집하였습니다.
수집된 데이터의 크기는 약 1.4GB 정도로 빅데이터라 부르기에는 너무 작은데이터지만 빅데이터라고 생각하고 프로젝트를 진행했습니다.
데이터를 수집한 기간은 3월 14일부터 3월 21일입니다. 약 8일정도 기간 수집을 진행했지만 8일의 모든 시간을 수집하지는 못했습니다. 에러로 수집하지 못한 시간도 있었고, 컴퓨터를 켜고 작업하지 않은 시간은 수집하지 못했습니다.
'데이터 엔지니어링' 카테고리의 다른 글
[빅데이터를 지탱하는 기술] 트위터 API를 이용한 데이터 파이프라인 만들기 # 6 (0) | 2023.03.28 |
---|---|
[빅데이터를 지탱하는 기술] 트위터 API를 이용한 데이터 파이프라인 만들기 # 5 (0) | 2023.03.28 |
[빅데이터를 지탱하는 기술] 트위터 API를 이용한 데이터 파이프라인 만들기 # 4 (0) | 2023.03.28 |
[빅데이터를 지탱하는 기술] 트위터 API를 이용한 데이터 파이프라인 만들기 # 3 (0) | 2023.03.26 |
[빅데이터를 지탱하는 기술] 트위터 API를 이용한 데이터 파이프라인 만들기 # 1 (0) | 2023.03.24 |