현재 진행하는 프로젝트에 flink를 도입하려고 기존에 있던 파이프라인을 pyflink로 코드화하여 정상동작을 하고 있는지 테스트를 진행중이다.
flink에 대한 자료뿐 아니라 pyflink에 대한 자료가 적고, 의지할 수 있는것은 공식문서밖에 없기 때문에 코드를 한줄한줄 바꿔가며 적용해보고 있다.
이번 포스팅에서도 프로젝트 상황과 비슷하게 구현하여, 테스트한 내용과 기록을 남기려고 한다.
배경
먼저 현재 프로젝트에는
Kakfa에 저장되어 있는 데이터를 가져와 Flink로 전처리, partitioning 하여 S3에 parquet format으로 적재하고 있다.
이번 포스팅에서는 Kafka에서 데이터를 가져와 Flink로 전처리하여 print하는 작업에 대하여 기록하려고 한다.
Kafka에 저장되어 있는 데이터는 아래와 같다.
[{
"A" : "a",
"B" : "b",
"C" : "c",
"D" : "d"
},
{
"A" : "a",
"B" : "b",
"C" : "c",
"D" : "d"
},
{
"A" : "a",
"B" : "b",
"C" : "c",
"D" : "d"
}]
파이썬 자료구조로 설명하면, 리스트안에 딕셔너리로 들어있는 구조이다.
하지만 데이터 항상 [A, B, C, D]의 데이터만 들어오는것이 아니라, [E], 혹은 ,[A, B, C]와 같이 field 값이 달리 들어오는 상황에 대하여 대처해야 한다.
Pyflink Code
import os
import sys
import json
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import ProcessFunction, RuntimeContext
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common import Types, types
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.table.table_environment import StreamTableEnvironment, EnvironmentSettings, TableDescriptor
from pyflink.table.schema import Schema
from pyflink.table.expression import DataTypes
field_names= ["A", "B", "C", "D", "E"]
class MyProccessFuntion(ProcessFunction):
def process_element(self, value, ctx: ProcessFunction.Context):
field_dict = {name : None for name in field_names}
for data in json.loads(value):
return_dict = field_dict.copy()
return_dict.update(data)
row = types.Row(*return_dict.keys())
yield row(*return_dict.values())
BOOTSTRAP_SERVER = "kafka:9092"
TOPIC = "LIST_DATA"
if __name__ == '__main__':
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
os_name = sys.platform
if os_name == 'win32':
jars_path = os.path.join(os.getcwd(), "jars_18")
all_jar_files = os.listdir(jars_path)
jars = [os.path.join(jars_path, jar_file) for jar_file in all_jar_files]
for jar in jars:
env.add_jars(f"file:///{jar}")
t_env = StreamTableEnvironment.create(env)
source = (KafkaSource.builder()\
.set_bootstrap_servers(BOOTSTRAP_SERVER)\
.set_group_id('test')\
.set_topics(TOPIC)\
.set_starting_offsets(KafkaOffsetsInitializer.earliest())\
.set_value_only_deserializer(SimpleStringSchema())
.build())
ds = env.from_source(source, WatermarkStrategy.no_watermarks(), "kafka_source")
field_types = []
schema = Schema.new_builder()
for column in field_names:
field_types.append(Types.STRING())
schema.column(column, DataTypes.STRING())
output_type = Types.ROW_NAMED(field_names, field_types)
my_process = MyProccessFuntion()
ds = ds.process(my_process, output_type=output_type)
table =t_env.from_data_stream(ds, schema.build())
t_env.create_table("print", TableDescriptor.for_connector("print")\
.schema(schema.build())
.build())
table.execute_insert('print').wait()
설명
코드에 대한 설명을 하자면
1.KafkaSource builder를 통하여 Kafka Topic(LIST_DATA)에 저장되어 있는 메세지를, SimpleStringSchema를 사용하여 String 으로 가져온다.
- kafka에 데이터를 가져오기 위하여 pyflink datastream api를 활용했고, table api를 활용할 수 있지만, list안에 인자들을 하나하나 빼오기에는 숙련도가 부족하여 datastream api를 활용했음
2.MyProcessFuntion을 작성, field_name에 맞게 dict를 구현하고 value 모두 None으로 적용, 들어온 데이터와 결합하여 yield로 인자(dict)하나씩 전달
- yield할 때 데이터의 타입을 types.Row로 변환하여 적용
- 받아야하는 Schema는 [A, B, C, D ,E]이며, 들어오는 데이터는 [A, B, C, D]이다.
- [E]와 같이 field가 다르다면, null로 채운다.
3.미리 정의한 field_names로부터, name과 type을 정의 및 output_type을 적용한다.
4.Schema builder를 통하여, 받아야하는 schema의 형태를 정의한다.
5.print table을 생성하여, print 한다.
2번에서 받아야하는 Schema를 미리 정의하고, null 값으로 채웠기 때문에
raise Exception("Field names {0} not exist in {1}.".format(difference, self._fields))
Exception: Field names ['E'] not exist in ['A', 'B', 'C', 'D'].
위 오류에 대해서 대처할 수 있었음.
기록
1 types.Row를 지정하지 않고, dict로 전달할 때 발생
ERROR 발생
AttributeError: 'dict' object has no attribute 'get_fields_by_names'
dict 타입은 get_fields_by_names 함수가 없다는 Error로 파이썬 내부 자료구조에는 get_fields_by_names는 존재하지않음.
types.Row 공식문서 에 따르면 get_fields_by_names은 types.Row class에 정의되어 있음
ds = ds.process(my_process, output_type=output_type)
table =t_env.from_data_stream(ds, schema.build())
위 동작시 내부적으로 get_fields_by_names는을 호출하는 것으로 파악함.
또한 DataStream API Integreation 문서에서
DataStream, Table API간 전환시 오버헤드가 발생할 수 있지만,
데이터 구조는 Row <-> RowData로 변환되어야 한다고 나와있고, 일반적으로는 이 오버헤드는 무시할 수 있다고 함.
데이터의 구조를 dict가 아니라 Row형태로 변환시켜준 것.
2 output_type을 지정하지 않으면?
yield 할때, Types.Row field의 name, type을 지정했다고 생각할 수 있지만, output타입에도 따로 지정해줘야 함
ds.process 에서 output_type 인자를 TypeInformation 형태로 전달 할 수 있음
ds = ds.process(my_process)
table =t_env.from_data_stream(ds, schema.build())
schema.build()로 인하여 아래 Error 발생
Unable to find a field named 'A' in the physical data type derived from the given type information for schema declaration. Make sure that the type information is not a generic raw type. Currently available fields are: [f0]
output_type의 field name을 따로 지정하지 않으면,default로 f0로 설정되어 있는것으로 확인함
따라서 schema에서 지정한 field name과 달라 발생하는 오류를 확인.
'데이터 엔지니어링' 카테고리의 다른 글
[Trino] id/password 적용하기 (0) | 2024.10.25 |
---|---|
[PyFlink] 기록 # 2 Iceberg 도입시 Error (1) | 2024.09.21 |
[Flink] Flink Architecture (0) | 2024.08.24 |
[kafka] kafka cluster 구축하기 (2) | 2024.01.28 |
[Kafka] kafka 아키텍처와 구성 (0) | 2024.01.21 |