A record of the errors encountered while introducing Iceberg into an ongoing project.
NoClassDefFoundError: org/apache/hadoop/conf/Configuration
java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
at org.apache.iceberg.flink.FlinkCatalogFactory.clusterHadoopConf(FlinkCatalogFactory.java:211)
at org.apache.iceberg.flink.FlinkCatalogFactory.createCatalog(FlinkCatalogFactory.java:139)
at org.apache.flink.table.factories.FactoryUtil.createCatalog(FactoryUtil.java:485)
at org.apache.flink.table.catalog.CatalogManager.initCatalog(CatalogManager.java:320)
at org.apache.flink.table.catalog.CatalogManager.createCatalog(CatalogManager.java:312)
at org.apache.flink.table.operations.ddl.CreateCatalogOperation.execute(CreateCatalogOperation.java:70)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1107)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:521)
... 19 more
Judging this to be an error caused by the Hadoop classpath not being found, I installed Hadoop directly and set the Hadoop classpath as an environment variable.
Dockerfile
FROM flink:1.19.1-scala_2.12
#FROM pyflink:latest
# Switch the apt mirror to Kakao
RUN sed -i 's/archive.ubuntu.com/mirror.kakao.com/g' /etc/apt/sources.list
RUN sed -i 's/security.ubuntu.com/mirror.kakao.com/g' /etc/apt/sources.list
RUN apt update -y && \
apt install -y python3.10 python3-pip python3.10-dev && rm -rf /var/lib/apt/lists/*
RUN mkdir /opt/flink/examples/python/custom
ADD ./flink/lib /opt/flink/lib
ADD ./flink/usrlib /opt/flink/usrlib
ADD ./flink/tars /opt
RUN tar xzvf /opt/hadoop-2.8.5.tar.gz -C /opt/
# Install Hadoop directly, then set the classpath
ENV HADOOP_HOME=/opt/hadoop-2.8.5
ENV HADOOP_CLASSPATH=/opt/hadoop-2.8.5/etc/hadoop:/opt/hadoop-2.8.5/share/hadoop/common/lib/*:/opt/hadoop-2.8.5/share/hadoop/common/*:/opt/hadoop-2.8.5/share/hadoop/hdfs:/opt/hadoop-2.8.5/share/hadoop/hdfs/lib/*:/opt/hadoop-2.8.5/share/hadoop/hdfs/*:/opt/hadoop-2.8.5/share/hadoop/yarn/lib/*:/opt/hadoop-2.8.5/share/hadoop/yarn/*:/opt/hadoop-2.8.5/share/hadoop/mapreduce/lib/*:/opt/hadoop-2.8.5/share/hadoop/mapreduce/*:/contrib/capacity-scheduler/*.jar
RUN ln -s /usr/bin/python3 /usr/bin/python
RUN pip config set global.index-url https://mirror.kakao.com/pypi/simple/
RUN pip config set global.extra-index-url https://pypi.org/simple/
RUN pip config set global.trusted-host mirror.kakao.com
RUN pip install apscheduler apache-flink==1.19.1
When setting the environment variable I hardcoded the value into HADOOP_CLASSPATH directly, but I still need to look into setting it with commands (pwd, ./bin/hadoop classpath) instead; a sketch of that approach follows.
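A minimal sketch of that command-based approach, assuming the layout from the Dockerfile above and a custom entrypoint script (hypothetical here); exporting the output of hadoop classpath is the approach the Flink docs describe:

# Hypothetical Dockerfile fragment: put the Hadoop launcher on PATH so the
# classpath can be derived at container start instead of being hardcoded.
ENV HADOOP_HOME=/opt/hadoop-2.8.5
ENV PATH="${HADOOP_HOME}/bin:${PATH}"
# Then, in a custom entrypoint script, before starting Flink:
# export HADOOP_CLASSPATH=$(hadoop classpath)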
Additional setup
If an error occurs about Iceberg or Hive (catalog) classes that cannot be loaded, download the jars from the repositories below and place them under the lib path (see the Iceberg official documentation; a download sketch follows the list):
- iceberg : https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-1.19/1.6.1/
- hive : https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-2.3.9_2.12/
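For example, a hypothetical Dockerfile fragment that pulls the Iceberg runtime bundle straight into Flink's lib directory; this assumes wget is available in the image, and that the runtime jar matching your Flink and Iceberg versions is the artifact that belongs in lib:

# Hypothetical: fetch the Iceberg Flink runtime jar into Flink's lib directory.
RUN wget -P /opt/flink/lib \
    https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.19/1.6.1/iceberg-flink-runtime-1.19-1.6.1.jar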
Unresolved error
pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: Could not execute CreateTable in path `hive_catalog`.`default`.`iceberg_table`
at org.apache.flink.table.catalog.CatalogManager.execute(CatalogManager.java:1300)
at org.apache.flink.table.catalog.CatalogManager.createTable(CatalogManager.java:950)
at org.apache.flink.table.operations.ddl.CreateTableOperation.execute(CreateTableOperation.java:86)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1107)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:735)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.iceberg.hive.RuntimeMetaException: Failed to connect to Hive Metastore
at org.apache.iceberg.hive.HiveClientPool.newClient(HiveClientPool.java:85)
at org.apache.iceberg.hive.HiveClientPool.newClient(HiveClientPool.java:34)
at org.apache.iceberg.ClientPoolImpl.get(ClientPoolImpl.java:143)
at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:70)
at org.apache.iceberg.ClientPoolImpl.run(ClientPoolImpl.java:65)
at org.apache.iceberg.hive.CachedClientPool.run(CachedClientPool.java:122)
at org.apache.iceberg.hive.HiveTableOperations.doRefresh(HiveTableOperations.java:147)
at org.apache.iceberg.BaseMetastoreTableOperations.refresh(BaseMetastoreTableOperations.java:90)
at org.apache.iceberg.BaseMetastoreTableOperations.current(BaseMetastoreTableOperations.java:73)
at org.apache.iceberg.BaseMetastoreCatalog$BaseMetastoreCatalogTableBuilder.create(BaseMetastoreCatalog.java:191)
at org.apache.iceberg.CachingCatalog$CachingTableBuilder.lambda$create$0(CachingCatalog.java:262)
at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2406)
at java.base/java.util.concurrent.ConcurrentHashMap.compute(Unknown Source)
at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2404)
at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2387)
at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:108)
at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalManualCache.get(LocalManualCache.java:62)
at org.apache.iceberg.CachingCatalog$CachingTableBuilder.create(CachingCatalog.java:258)
at org.apache.iceberg.catalog.Catalog.createTable(Catalog.java:75)
at org.apache.iceberg.flink.FlinkCatalog.createIcebergTable(FlinkCatalog.java:415)
at org.apache.iceberg.flink.FlinkCatalog.createTable(FlinkCatalog.java:395)
at org.apache.flink.table.catalog.CatalogManager.lambda$createTable$18(CatalogManager.java:961)
at org.apache.flink.table.catalog.CatalogManager.execute(CatalogManager.java:1294)
... 15 more
Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient
at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1742)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:83)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:133)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:104)
at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:97)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.iceberg.common.DynMethods$UnboundMethod.invokeChecked(DynMethods.java:62)
at org.apache.iceberg.common.DynMethods$UnboundMethod.invoke(DynMethods.java:74)
at org.apache.iceberg.common.DynMethods$StaticMethod.invoke(DynMethods.java:187)
at org.apache.iceberg.hive.HiveClientPool.newClient(HiveClientPool.java:63)
... 37 more
Caused by: java.lang.reflect.InvocationTargetException
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
at java.base/java.lang.reflect.Constructor.newInstance(Unknown Source)
at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1740)
... 49 more
Caused by: MetaException(message:org/datanucleus/NucleusContext)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.<init>(RetryingHMSHandler.java:83)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(RetryingHMSHandler.java:92)
at org.apache.hadoop.hive.metastore.HiveMetaStore.newRetryingHMSHandler(HiveMetaStore.java:6902)
at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:162)
... 54 more
Caused by: java.lang.NoClassDefFoundError: org/datanucleus/NucleusContext
at java.base/java.lang.Class.forName0(Native Method)
at java.base/java.lang.Class.forName(Unknown Source)
at org.apache.hadoop.hive.metastore.MetaStoreUtils.getClass(MetaStoreUtils.java:1708)
at org.apache.hadoop.hive.metastore.RawStoreProxy.getProxy(RawStoreProxy.java:64)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.newRawStoreForConf(HiveMetaStore.java:628)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMSForConf(HiveMetaStore.java:594)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMS(HiveMetaStore.java:588)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(HiveMetaStore.java:655)
at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:431)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invokeInternal(RetryingHMSHandler.java:148)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:107)
at org.apache.hadoop.hive.metastore.RetryingHMSHandler.<init>(RetryingHMSHandler.java:79)
... 57 more
Caused by: java.lang.ClassNotFoundException: org.datanucleus.NucleusContext
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(Unknown Source)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(Unknown Source)
at java.base/java.lang.ClassLoader.loadClass(Unknown Source)
... 73 more
This error means the Hive catalog table cannot be created from PyFlink; it looks like a Hive Metastore needs to be installed and configured before proceeding. The root cause chain ends in ClassNotFoundException: org.datanucleus.NucleusContext, which suggests that, with no metastore URI configured in the CREATE CATALOG statement, Hive tried to start an embedded Metastore and the DataNucleus jars are not on the classpath.
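For when a Metastore is available, a minimal sketch of the catalog DDL pointing at it; the host and warehouse path below are hypothetical placeholders, while 'uri' and 'warehouse' are documented Iceberg Flink catalog options:

# Hypothetical: point the Iceberg Hive catalog at an external Hive Metastore
# instead of letting Hive instantiate an embedded one (which needs DataNucleus).
t_env.execute_sql("""
    CREATE CATALOG hive_catalog WITH (
        'type' = 'iceberg',
        'catalog-type' = 'hive',
        'uri' = 'thrift://metastore-host:9083',
        'warehouse' = 'hdfs://namenode:8020/warehouse'
    )
""")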
PyFlink code
import os
import sys
import json

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import ProcessFunction, RuntimeContext
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer, KafkaSink, KafkaRecordSerializationSchema
from pyflink.common.serialization import SimpleStringSchema
from pyflink.common import Types
from pyflink.common.types import Row, RowKind
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.datastream.formats.json import JsonRowSerializationSchema
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, TableDescriptor, DataTypes
from pyflink.table.schema import Schema
from pyflink.table.expressions import col, concat_ws, lit
from pyflink.table.catalog import CatalogBaseTable, CatalogDatabase

sample_data = """[{'k_year' : '2024', 'k_mon' : '09', 'k_day' : '07', 'dt' : '20240907', 'code' : 'A', 'data' : 'a'},
{'k_year' : '2024', 'k_mon' : '09', 'k_day' : '08', 'dt' : '20240908', 'code' : 'B', 'data' : 'b'},
{'k_year' : '2024', 'k_mon' : '09', 'k_day' : '09', 'dt' : '20240909', 'code' : 'C', 'data' : 'c'}]"""

# Build a Table API schema and the matching DataStream field types:
# every column in the sample data is a string.
field_names = ["k_year", "k_mon", "k_day", "dt", "code", "data"]
field_types = []
schema = Schema.new_builder()
for column in field_names:
    schema.column(column, DataTypes.STRING())
    field_types.append(Types.STRING())


class MyFunction(ProcessFunction):
    def process_element(self, value, ctx: ProcessFunction.Context):
        # The element is a string containing a list of dicts; parse it and
        # emit one named Row per dict.
        datas = eval(value)
        for data in datas:
            row = Row(*data.keys())
            yield row(*data.values())


if __name__ == '__main__':
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(2)

    # When running locally on Windows, register the connector jars explicitly.
    os_name = sys.platform
    if os_name == 'win32':
        jars_path = os.path.join(os.getcwd(), "jars_19")
        all_jar_files = os.listdir(jars_path)
        jars = [os.path.join(jars_path, jar_file) for jar_file in all_jar_files]
        for jar in jars:
            env.add_jars(f"file:///{jar}")

    t_env = StreamTableEnvironment.create(env)

    # Turn the raw string into a stream of named Rows, then expose it to the
    # Table API as a temporary view.
    stream = env.from_collection([sample_data])
    stream = stream.process(MyFunction(), output_type=Types.ROW_NAMED(field_names, field_types))
    t_env.create_temporary_view('my_table', stream)
    my_table = t_env.from_path('my_table').select(col('*'))

    # t_env.create_table("print_table", TableDescriptor.for_connector("print")
    #                    .schema(schema.build())
    #                    .build())
    # my_table.execute_insert('print_table')

    t_env.execute_sql("""
        CREATE CATALOG hive_catalog WITH(
            'type' = 'iceberg',
            'catalog-type' = 'hive'
        )
    """)
    t_env.execute_sql("""
        CREATE TABLE `hive_catalog`.`default`.`iceberg_table`(
            k_year VARCHAR,
            k_mon VARCHAR,
            k_day VARCHAR,
            dt VARCHAR,
            code VARCHAR,
            data VARCHAR
        )
    """)
    my_table.execute_insert('iceberg_table')
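One note on the script: execute_insert('iceberg_table') resolves the unqualified table name against the current catalog (the built-in default_catalog), not hive_catalog, so even with a working Metastore the final insert would not find the table. A minimal sketch of switching the current catalog first, assuming the Metastore issue above is resolved:

# Make hive_catalog / default the current namespace so the unqualified sink
# name resolves to hive_catalog.default.iceberg_table.
t_env.use_catalog('hive_catalog')
t_env.use_database('default')
my_table.execute_insert('iceberg_table').wait()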