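I am testing the integration between Apache Flink and Spring Boot. Both run fine from the IDE, but when I try to run the application on an Apache Flink cluster I get an exception related to the ClassLoader.
The classes are very simple: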
BootFlinkApplication
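    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.ComponentScan;

    @SpringBootApplication
    @ComponentScan("com.example.demo")
    public class BootFlinkApplication {

        public static void main(String[] args) {
            System.out.println("some test");
            SpringApplication.run(BootFlinkApplication.class, args);
        }
    }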
FlinkTest
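    import javax.annotation.PostConstruct;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.springframework.stereotype.Service;

    @Service
    public class FlinkTest {

        // Builds and runs the Flink job as soon as Spring has created this bean.
        @PostConstruct
        public void init() {
            StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
            see.fromElements(1, 2, 3, 4)
                .filter(new RemoveNumber3Filter())
                .print();
            try {
                see.execute();
            } catch (Exception e) {
                System.out.println("Error executing flink job: " + e.getMessage());
            }
        }
    }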
RemoveNumber3Filter
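    import org.apache.flink.api.common.functions.FilterFunction;

    public class RemoveNumber3Filter implements FilterFunction<Integer> {

        // Keeps every element except the number 3.
        @Override
        public boolean filter(Integer i) throws Exception {
            return i != 3;
        }
    }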
The exception:
    Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
        at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
        at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
        at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot load user class: com.example.demo.RemoveNumber3Filter
    ClassLoader info: URL ClassLoader:
        file: '/tmp/blobStore-850f3189-807e-4f8d-a8a6-3bd3c1bd76b4/job_eb93b239080b4d4e09f10f1e3605744d/blob_p-5fd56f3348976c0d333d680fde4a79573c21cd40-48ac0995eee11f38ce3ff4f890102af8' (valid JAR)
    Class not resolvable through given classloader.
        at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:232)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:355)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:282)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:126)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:231)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
        at java.lang.Thread.run(Thread.java:748)
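You are probably using the Spring Boot Maven plugin (https://docs.spring.io/spring-boot/docs/current/reference/html/build-tool-plugins-maven-plugin.html) to repackage the jar into an executable jar. The repackaged jar uses a custom boot layout that Apache Flink's internal class loader does not support, which is why the job cannot resolve com.example.demo.RemoveNumber3Filter even though the jar is valid. Next to the file you are trying to deploy there should be the original jar (.jar.original); you can use that one for the deployment on the Flink cluster.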
Alternatively, you can build a jar that contains all dependencies some other way, for example with maven-shade (https://maven.apache.org/plugins/maven-shade-plugin/).
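To check which layout a given jar actually has, a small diagnostic like the one below can help. It is only a sketch and not part of Flink or Spring Boot: the class name JarLayoutCheck and its methods are made up for illustration, and it assumes a Spring Boot version (1.4 or later) that places application classes under BOOT-INF/classes/ when repackaging. A plain URL class loader looks for com/example/demo/RemoveNumber3Filter.class at the root of the jar, so a class found only under BOOT-INF/classes/ would explain the "Class not resolvable" message.

```java
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.jar.JarFile;
import java.util.stream.Collectors;

// Hypothetical helper for illustration only; not part of Flink or Spring Boot.
public class JarLayoutCheck {

    // Jar entry that a plain URL class loader would look up for this class.
    static String entryNameFor(String className) {
        return className.replace('.', '/') + ".class";
    }

    // Returns "root" if the class sits at the jar root (standard layout),
    // "BOOT-INF" if it is only under BOOT-INF/classes/ (assumed Spring Boot
    // repackaged layout), and "missing" if it is in neither place.
    static String describe(Collection<String> entryNames, String className) {
        String entry = entryNameFor(className);
        if (entryNames.contains(entry)) {
            return "root";
        }
        if (entryNames.contains("BOOT-INF/classes/" + entry)) {
            return "BOOT-INF";
        }
        return "missing";
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.out.println("usage: java JarLayoutCheck <jar> <fully.qualified.ClassName>");
            return;
        }
        try (JarFile jar = new JarFile(args[0])) {
            // Collect all entry names of the jar, then classify the class.
            List<String> names = jar.stream()
                    .map(e -> e.getName())
                    .collect(Collectors.toList());
            System.out.println(args[1] + " -> " + describe(names, args[1]));
        }
    }
}
```

Run it once against the repackaged jar and once against the .jar.original file, passing com.example.demo.RemoveNumber3Filter as the class name. If the assumption about the layout holds, the first should report BOOT-INF and the second root.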