[PyFlink] Notes #2: Errors While Adopting Iceberg

2024. 9. 21. 23:26

A record of handling the errors that came up while adopting Iceberg in an ongoing project.

NoClassDefFoundError: org/apache/hadoop/conf/Configuration

java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
        at org.apache.iceberg.flink.FlinkCatalogFactory.clusterHadoopConf(FlinkCatalogFactory.java:211)
        at org.apache.iceberg.flink.FlinkCatalogFactory.createCatalog(FlinkCatalogFactory.java:139)
        at org.apache.flink.table.factories.FactoryUtil.createCatalog(FactoryUtil.java:485)
        at org.apache.flink.table.catalog.CatalogManager.initCatalog(CatalogManager.java:320)
        at org.apache.flink.table.catalog.CatalogManager.createCatalog(CatalogManager.java:312)
        at org.apache.flink.table.operations.ddl.CreateCatalogOperation.execute(CreateCatalogOperation.java:70)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1107)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration
        at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
        at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
        ... 19 more

 
Judging this to be an error caused by the Hadoop classpath not being found, I installed Hadoop directly and set HADOOP_CLASSPATH as an environment variable.

Dockerfile

FROM flink:1.19.1-scala_2.12
#FROM pyflink:latest

# Switch the apt mirror to Kakao
RUN sed -i 's/archive.ubuntu.com/mirror.kakao.com/g' /etc/apt/sources.list
RUN sed -i 's/security.ubuntu.com/mirror.kakao.com/g' /etc/apt/sources.list

RUN apt update -y && \
    apt install -y python3.10 python3-pip python3.10-dev && \
    rm -rf /var/lib/apt/lists/*

RUN mkdir /opt/flink/examples/python/custom

ADD ./flink/lib /opt/flink/lib
ADD ./flink/usrlib /opt/flink/usrlib

ADD ./flink/tars /opt

RUN tar xzvf /opt/hadoop-2.8.5.tar.gz -C /opt/

# Install Hadoop manually and point the classpath at it
ENV HADOOP_HOME=/opt/hadoop-2.8.5

ENV HADOOP_CLASSPATH=/opt/hadoop-2.8.5/etc/hadoop:/opt/hadoop-2.8.5/share/hadoop/common/lib/*:/opt/hadoop-2.8.5/share/hadoop/common/*:/opt/hadoop-2.8.5/share/hadoop/hdfs:/opt/hadoop-2.8.5/share/hadoop/hdfs/lib/*:/opt/hadoop-2.8.5/share/hadoop/hdfs/*:/opt/hadoop-2.8.5/share/hadoop/yarn/lib/*:/opt/hadoop-2.8.5/share/hadoop/yarn/*:/opt/hadoop-2.8.5/share/hadoop/mapreduce/lib/*:/opt/hadoop-2.8.5/share/hadoop/mapreduce/*:/contrib/capacity-scheduler/*.jar

RUN ln -s /usr/bin/python3 /usr/bin/python

RUN pip config set global.index-url https://mirror.kakao.com/pypi/simple/
RUN pip config set global.extra-index-url https://pypi.org/simple/
RUN pip config set global.trusted-host mirror.kakao.com

RUN pip install apscheduler apache-flink==1.19.1
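After a build, the baked-in classpath can be sanity-checked inside the container. A quick sketch (the image tag is illustrative):

docker build -t pyflink-iceberg .
docker run --rm pyflink-iceberg bash -c 'echo "$HADOOP_CLASSPATH" | tr ":" "\n" | head'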

 
When setting the environment variable I hardcoded the value into HADOOP_CLASSPATH directly, but I still need to look into setting it via commands (pwd, ./bin/hadoop classpath) instead.
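For reference, the form the Flink docs recommend derives the value from the Hadoop binary itself. A minimal sketch, assuming the layout above; since ENV in a Dockerfile cannot evaluate commands, the export has to happen at container runtime (e.g. in an entrypoint or wrapper script):

# Derive the classpath from the installed Hadoop distribution instead of hardcoding it
export HADOOP_HOME=/opt/hadoop-2.8.5
export HADOOP_CLASSPATH=$("$HADOOP_HOME"/bin/hadoop classpath)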
 

Additional setup

If an error occurs where Iceberg or Hive (catalog) classes cannot be loaded, download the jars from the repos below and place them on the lib path; see the official Iceberg docs. A Dockerfile-style sketch follows the list.

  • iceberg : https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-1.19/1.6.1/
  • hive : https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-2.3.9_2.12/
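As a sketch, the jars could be baked into the image next to the other libs. The exact jar file names below are assumptions read off the repo listings (note that the Iceberg docs generally point at the iceberg-flink-runtime jar rather than iceberg-flink), and wget is assumed to be available in the base image:

RUN wget -P /opt/flink/lib \
      https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.19/1.6.1/iceberg-flink-runtime-1.19-1.6.1.jar
RUN wget -P /opt/flink/lib \
      https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-2.3.9_2.12/1.19.1/flink-sql-connector-hive-2.3.9_2.12-1.19.1.jar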

Unresolved Error

pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Could not execute CreateTable in path `hive_catalog`.`default`.`iceberg_table`
        at org.apache.flink.table.catalog.CatalogManager.execute(CatalogManager.java:1300)
        at org.apache.flink.table.catalog.CatalogManager.createTable(CatalogManager.java:950)
        at org.apache.flink.table.operations.ddl.CreateTableOperation.execute(CreateTableOperation.java:86)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1107)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.base/java.lang.reflect.Method.invoke(Unknown Source)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.iceberg.hive.RuntimeMetaException: Failed to connect to Hive Metastore
        at org.apache.iceberg.hive.HiveClientPool.newClient(HiveClientPool.java:85)
        at org.apache.iceberg.hive.HiveClientPool.newClient(HiveClientPool.java:34)
        at org.apache.iceberg.ClientPoolImpl.get(ClientPoolImpl.java:143)
        at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:70)
        at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:65)
        at org.apache.iceberg.hive.CachedClientPool.run(CachedClientPool.java:122)
        at org.apache.iceberg.hive.HiveTableOperations.doRefresh(HiveTableOperations.java:147)
        at org.apache.iceberg.BaseMetastoreTableOperations.refresh(BaseMetastoreTableOperations.java:90)
        at org.apache.iceberg.BaseMetastoreTableOperations.current(BaseMetastoreTableOperations.java:73)
        at org.apache.iceberg.BaseMetastoreCatalog$BaseMetastoreCatalogTableBuilder.create(BaseMetastoreCatalog.java:191)
        at org.apache.iceberg.CachingCatalog$CachingTableBuilder.lambda$create$0(CachingCatalog.java:262)
        at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2406)
        at java.base/java.util.concurrent.ConcurrentHashMap.compute(Unknown Source)
        at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2404)
        at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2387)
        at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:108)
        at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalManualCache.get(LocalManualCache.java:62)
        at org.apache.iceberg.CachingCatalog$CachingTableBuilder.create(CachingCatalog.java:258)
        at org.apache.iceberg.catalog.Catalog.createTable(Catalog.java:75)
        at org.apache.iceberg.flink.FlinkCatalog.createIcebergTable(FlinkCatalog.java:415)
        at org.apache.iceberg.flink.FlinkCatalog.createTable(FlinkCatalog.java:395)
        at org.apache.flink.table.catalog.CatalogManager.lambda$createTable$18(CatalogManager.java:961)
        at org.apache.flink.table.catalog.CatalogManager.execute(CatalogManager.java:1294)
        ... 15 more
Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
        at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1742)
        at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:83)
        at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:133)
        at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:104)
        at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:97)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.base/java.lang.reflect.Method.invoke(Unknown Source)
        at org.apache.iceberg.common.DynMethods$UnboundMethod.invokeChecked(DynMethods.java:62)
        at org.apache.iceberg.common.DynMethods$UnboundMethod.invoke(DynMethods.java:74)
        at org.apache.iceberg.common.DynMethods$StaticMethod.invoke(DynMethods.java:187)
        at org.apache.iceberg.hive.HiveClientPool.newClient(HiveClientPool.java:63)
        ... 37 more
Caused by: java.lang.reflect.InvocationTargetException
        at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
        at java.base/java.lang.reflect.Constructor.newInstance(Unknown Source)
        at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1740)
        ... 49 more
Caused by: MetaException(message:org/datanucleus/NucleusContext)
        at org.apache.hadoop.hive.metastore.RetryingHMSHandler.<init>(RetryingHMSHandler.java:83)
        at org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(RetryingHMSHandler.java:92)
        at org.apache.hadoop.hive.metastore.HiveMetaStore.newRetryingHMSHandler(HiveMetaStore.java:6902)
        at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:162)
        ... 54 more
Caused by: java.lang.NoClassDefFoundError: org/datanucleus/NucleusContext
        at java.base/java.lang.Class.forName0(Native Method)
        at java.base/java.lang.Class.forName(Unknown Source)
        at org.apache.hadoop.hive.metastore.MetaStoreUtils.getClass(MetaStoreUtils.java:1708)
        at org.apache.hadoop.hive.metastore.RawStoreProxy.getProxy(RawStoreProxy.java:64)
        at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.newRawStoreForConf(HiveMetaStore.java:628)
        at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMSForConf(HiveMetaStore.java:594)
        at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMS(HiveMetaStore.java:588)
        at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(HiveMetaStore.java:655)
        at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:431)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
        at java.base/java.lang.reflect.Method.invoke(Unknown Source)
        at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invokeInternal(RetryingHMSHandler.java:148)
        at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:107)
        at org.apache.hadoop.hive.metastore.RetryingHMSHandler.<init>(RetryingHMSHandler.java:79)
        ... 57 more
Caused by: java.lang.ClassNotFoundException: org.datanucleus.NucleusContext
        at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source)
        at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown Source)
        at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
        ... 73 more

 
This is an error where the Hive catalog and table cannot be created from PyFlink; it looks like a Hive Metastore needs to be installed and running before going further.
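Before retrying, a standalone metastore has to be reachable. A rough sketch, assuming the official apache/hive image and its default thrift port (any metastore deployment works):

docker run -d -p 9083:9083 --env SERVICE_NAME=metastore --name hive-metastore apache/hive:4.0.0

The catalog DDL can then point at it explicitly instead of relying on a hive-site.xml lookup. The 'uri' hostname assumes a shared Docker network, and the 'warehouse' value is a placeholder:

t_env.execute_sql("""
    CREATE CATALOG hive_catalog WITH (
        'type' = 'iceberg',
        'catalog-type' = 'hive',
        'uri' = 'thrift://hive-metastore:9083',
        'warehouse' = 'hdfs://namenode:8020/warehouse'
    )
""")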
 

PyFlink code

import os
import sys
import json

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import ProcessFunction, RuntimeContext
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer, KafkaSink, KafkaRecordSerializationSchema

from pyflink.common.serialization import SimpleStringSchema
from pyflink.common import Types
from pyflink.common.types import Row, RowKind
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.datastream.formats.json import JsonRowSerializationSchema

from pyflink.table import (StreamTableEnvironment, EnvironmentSettings,
                           TableDescriptor, Schema, DataTypes)
from pyflink.table.expressions import col, concat_ws, lit
from pyflink.table.catalog import CatalogBaseTable, CatalogDatabase

sample_data = """[{'k_year' : '2024', 'k_mon' : '09', 'k_day' : '07', 'dt' : '20240907', 'code' : 'A', 'data' : 'a'},
                  {'k_year' : '2024', 'k_mon' : '09', 'k_day' : '08', 'dt' : '20240908', 'code' : 'B', 'data' : 'b'},
                  {'k_year' : '2024', 'k_mon' : '09', 'k_day' : '09', 'dt' : '20240909', 'code' : 'C', 'data' : 'c'}]"""
field_names = ["k_year", "k_mon", "k_day", "dt", "code", "data"]
field_types = []
schema = Schema.new_builder()
for column in field_names:
  schema.column(column, DataTypes.STRING())
  field_types.append(Types.STRING())


class MyFunction(ProcessFunction):
  def process_element(self, value, ctx: ProcessFunction.Context):
    # The sample payload uses single quotes, so parse it with eval()
    # rather than json.loads(), which would require double-quoted JSON.
    datas = eval(value)
    for data in datas:
      # Row(*names) acts as a named-row factory; calling it with the
      # values yields one named Row per dict in the list.
      row = Row(*data.keys())
      yield row(*data.values())

if __name__ == '__main__':
  env = StreamExecutionEnvironment.get_execution_environment()
  env.set_parallelism(2)

  # On Windows (local dev), load the connector jars from ./jars_19;
  # in the container the jars already sit under /opt/flink/lib.
  os_name = sys.platform
  if os_name == 'win32':
    jars_path = os.path.join(os.getcwd(), "jars_19")
    all_jar_files = os.listdir(jars_path)
    jars = [os.path.join(jars_path, jar_file) for jar_file in all_jar_files]
    for jar in jars:
      env.add_jars(f"file:///{jar}")

  
  t_env = StreamTableEnvironment.create(env)

  # from_collection produces a DataStream with a single string element;
  # MyFunction fans it out into named Rows that back the temporary view.
  stream = env.from_collection([sample_data])
  stream = stream.process(MyFunction(), output_type=Types.ROW_NAMED(field_names, field_types))
  t_env.create_temporary_view('my_table', stream)
  my_table = t_env.from_path('my_table').select(col('*'))
  

  # t_env.create_table("print_table", TableDescriptor.for_connector("print")\
  #               .schema(schema.build())
  #               .build())
                
  # my_table.execute_insert('print_table')

  # Note: no 'uri'/'warehouse' is given, so Iceberg falls back to whatever
  # hive-site.xml it can find; with none on the classpath the client tries
  # to start an embedded metastore, which triggers the DataNucleus error above.
  t_env.execute_sql("""
    CREATE CATALOG hive_catalog WITH(
      'type' = 'iceberg',
      'catalog-type' = 'hive'
    )
  """)

  # VARCHAR with no length means VARCHAR(1) in Flink SQL, so STRING is
  # used here to avoid silent truncation.
  t_env.execute_sql("""
    CREATE TABLE `hive_catalog`.`default`.`iceberg_table`(
      k_year STRING,
      k_mon STRING,
      k_day STRING,
      dt STRING,
      code STRING,
      data STRING
    )
  """)
  my_table.execute_insert('iceberg_table')
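One more thing to check once the metastore issue is resolved: execute_insert resolves the table name against the current catalog and database, and the session here is still on the default catalog. Switching catalogs first (or fully qualifying the name) should avoid a follow-up lookup failure; a sketch:

  # Make the Iceberg catalog current so 'iceberg_table' resolves inside it
  t_env.execute_sql("USE CATALOG hive_catalog")
  t_env.execute_sql("USE `default`")
  my_table.execute_insert('iceberg_table').wait()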
