编程知识 cdmana.com

' connector.type 'expectations' filesystem', but is' Kafka '

The complete error report is as follows :

##########################################################################################################################################################

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/table/factories/StreamTableSourceFactory
	at java.lang.ClassLoader.defineClass1(Native Method)
	at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
	at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
	at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
	at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	at java.lang.ClassLoader.defineClass1(Native Method)
	at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
	at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
	at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
	at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:348)
	at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:370)
	at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
	at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
	at java.util.Iterator.forEachRemaining(Iterator.java:116)
	at org.apache.flink.table.factories.TableFactoryService.discoverFactories(TableFactoryService.java:214)
	at org.apache.flink.table.factories.TableFactoryService.findAllInternal(TableFactoryService.java:170)
	at org.apache.flink.table.factories.TableFactoryService.findAll(TableFactoryService.java:125)
	at org.apache.flink.table.factories.ComponentFactoryService.find(ComponentFactoryService.java:48)
	at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl$.lookupExecutor(StreamTableEnvironmentImpl.scala:315)
	at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl$.create(StreamTableEnvironmentImpl.scala:285)
	at org.apache.flink.table.api.bridge.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:476)
	at FlinkKafkaDDLDemo$.main(FlinkKafkaDDLDemo.scala:27)
	at FlinkKafkaDDLDemo.main(FlinkKafkaDDLDemo.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.factories.StreamTableSourceFactory
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	... 39 more
 Add the following dependencies :
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner_2.12</artifactId>
    <version>1.12.0</version>
    <scope>provided</scope>
</dependency>

##########################################################################################################################################################

obtain :

Exception in thread "main" org.apache.flink.table.api.TableException: Could not instantiate the executor. Make sure a planner module is on the classpath
	at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl$.lookupExecutor(StreamTableEnvironmentImpl.scala:331)
	at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl$.create(StreamTableEnvironmentImpl.scala:285)
	at org.apache.flink.table.api.bridge.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:476)
	at FlinkKafkaDDLDemo$.main(FlinkKafkaDDLDemo.scala:26)
	at FlinkKafkaDDLDemo.main(FlinkKafkaDDLDemo.scala)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.delegation.ExecutorFactory' in
the classpath.

Reason: No factory supports the additional filters.

The following properties are requested:
class-name=org.apache.flink.table.planner.delegation.BlinkExecutorFactory
streaming-mode=true

The following factories have been considered:
org.apache.flink.table.executor.StreamExecutorFactory
	at org.apache.flink.table.factories.ComponentFactoryService.find(ComponentFactoryService.java:71)
	at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl$.lookupExecutor(StreamTableEnvironmentImpl.scala:315)
	... 4 more

Add the following dependencies :

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>1.12.0</version>
</dependency>

##########################################################################################################################################################

Continue debugging, we have a problem :

Exception in thread "main" java.lang.NoSuchMethodError: scala.Predef$.refArrayOps([Ljava/lang/Object;)[Ljava/lang/Object;
	at org.apache.flink.table.planner.delegation.PlannerBase.<init>(PlannerBase.scala:118)
	at org.apache.flink.table.planner.delegation.StreamPlanner.<init>(StreamPlanner.scala:47)
	at org.apache.flink.table.planner.delegation.BlinkPlannerFactory.create(BlinkPlannerFactory.java:50)
	at org.apache.flink.table.api.bridge.scala.internal.StreamTableEnvironmentImpl$.create(StreamTableEnvironmentImpl.scala:294)
	at org.apache.flink.table.api.bridge.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:476)
	at org.apache.flink.table.api.bridge.scala.StreamTableEnvironment$.create(StreamTableEnvironment.scala:448)
	at FlinkKafkaDDLDemo$.main(FlinkKafkaDDLDemo.scala:28)
	at FlinkKafkaDDLDemo.main(FlinkKafkaDDLDemo.scala)

project structure in sdk Change it to scala2.12.8

##########################################################################################################################################################

Continue debugging, we have a problem :

Exception in thread "main" org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
	at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:49)
	at org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.findAndCreateLegacyTableSource(LegacyCatalogSourceTable.scala:193)
	at org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.toRel(LegacyCatalogSourceTable.scala:96)
	at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
	at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:165)
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:157)
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:823)
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:795)
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:250)
	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:639)
	at FlinkKafkaDDLDemo$.main(FlinkKafkaDDLDemo.scala:86)
	at FlinkKafkaDDLDemo.main(FlinkKafkaDDLDemo.scala)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: Required context properties mismatch.

The matching candidates:
org.apache.flink.table.sources.CsvAppendTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'
'format.type' expects 'csv', but is 'json'

The following properties are requested:
connector.properties.0.key=zookeeper.connect
connector.properties.0.value=Desktop:2181
connector.properties.1.key=bootstrap.servers
connector.properties.1.value=Desktop:9091
connector.startup-mode=earliest-offset
connector.topic=kafka_ddl
connector.type=kafka
connector.version=2.5.0
format.derive-schema=true
format.type=json
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=name
schema.1.data-type=VARCHAR(2147483647)
schema.1.name=age
schema.2.data-type=VARCHAR(2147483647)
schema.2.name=city
schema.3.data-type=VARCHAR(2147483647)
schema.3.name=address
schema.4.data-type=TIMESTAMP(6)
schema.4.name=ts
update-mode=append

The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
	at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
	at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
	at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
	at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
	at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:46)
	... 20 more

Process finished with exit code 1

Uncertain , The current code is :

https://gitee.com/appleyuchi/Flink_Code/blob/master/flink%E8%AF%BBkafka/Scala/src/main/scala/FlinkKafkaDDLDemo.scala

版权声明
本文为[Applied Sciences]所创,转载请带上原文链接,感谢
https://cdmana.com/2020/12/20201224214810595m.html

Scroll to Top