I want to use Dataflow to move data from Pub/Sub to GCS. Basically, I want Dataflow to accumulate messages for a fixed amount of time (for example, 15 minutes) and then write that accumulated data to GCS as a text file once the window has passed.
My end goal is a custom pipeline, so the "Pub/Sub to Cloud Storage" template isn't enough for me, and since I know no Java at all, I started tinkering with Python.
Here is what I have so far (Apache Beam Python SDK 2.10.0):
import apache_beam as beam

TOPIC_PATH = "projects/<my-project>/topics/<my-topic>"

def CombineFn(e):
    return "\n".join(e)

o = beam.options.pipeline_options.PipelineOptions()
p = beam.Pipeline(options=o)

data = (
    p
    | "Read From Pub/Sub" >> beam.io.ReadFromPubSub(topic=TOPIC_PATH)
    | "Window" >> beam.WindowInto(beam.window.FixedWindows(30))
    | "Combine" >> beam.transforms.core.CombineGlobally(CombineFn).without_defaults()
    | "Output" >> beam.io.WriteToText("<GCS path or local path>"))

res = p.run()
res.wait_until_finish()
I have no problem running this in my local environment:
python main.py
It runs locally, reads from the specified Pub/Sub topic, and writes to the specified GCS path every 30 seconds, just as expected.
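To deploy it to Dataflow I pass the usual Dataflow pipeline options on the command line (PipelineOptions() picks them up from argv); roughly along these lines, with the bucket name being a placeholder:

python main.py \
    --runner DataflowRunner \
    --project <my-project> \
    --temp_location gs://<my-bucket>/tmp \
    --streaming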
The problem is that when I run it on Google Cloud Platform, i.e. on Cloud Dataflow, it keeps throwing this cryptic exception:
java.util.concurrent.ExecutionException: java.lang.RuntimeException: Error received from SDK harness for instruction -1096: Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 148, in _execute
    response = task()
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 183, in <lambda>
    self._execute(lambda: worker.do_instruction(work), work)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 256, in do_instruction
    request.instruction_id)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/sdk_worker.py", line 272, in process_bundle
    bundle_processor.process_bundle(instruction_id)
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/runners/worker/bundle_processor.py", line 494, in process_bundle
    op.finish()
  File "apache_beam/runners/worker/operations.py", line 506, in apache_beam.runners.worker.operations.DoOperation.finish
    def finish(self):
  File "apache_beam/runners/worker/operations.py", line 507, in apache_beam.runners.worker.operations.DoOperation.finish
    with self.scoped_finish_state:
  File "apache_beam/runners/worker/operations.py", line 508, in apache_beam.runners.worker.operations.DoOperation.finish
    self.dofn_runner.finish()
  File "apache_beam/runners/common.py", line 703, in apache_beam.runners.common.DoFnRunner.finish
    self._invoke_bundle_method(self.do_fn_invoker.invoke_finish_bundle)
  File "apache_beam/runners/common.py", line 697, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 722, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise_with_traceback(new_exn)
  File "apache_beam/runners/common.py", line 695, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
    bundle_method()
  File "apache_beam/runners/common.py", line 361, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
    def invoke_finish_bundle(self):
  File "apache_beam/runners/common.py", line 364, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
    self.output_processor.finish_bundle_outputs(
  File "apache_beam/runners/common.py", line 832, in apache_beam.runners.common._OutputProcessor.finish_bundle_outputs
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 87, in apache_beam.runners.worker.operations.ConsumerSet.receive
    self.update_counters_start(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 93, in apache_beam.runners.worker.operations.ConsumerSet.update_counters_start
    self.opcounter.update_from(windowed_value)
  File "apache_beam/runners/worker/opcounters.py", line 195, in apache_beam.runners.worker.opcounters.OperationCounters.update_from
    self.do_sample(windowed_value)
  File "apache_beam/runners/worker/opcounters.py", line 213, in apache_beam.runners.worker.opcounters.OperationCounters.do_sample
    self.coder_impl.get_estimated_size_and_observables(windowed_value))
  File "apache_beam/coders/coder_impl.py", line 953, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
    def get_estimated_size_and_observables(self, value, nested=False):
  File "apache_beam/coders/coder_impl.py", line 969, in apache_beam.coders.coder_impl.WindowedValueCoderImpl.get_estimated_size_and_observables
    self._windows_coder.estimate_size(value.windows, nested=True))
  File "apache_beam/coders/coder_impl.py", line 758, in apache_beam.coders.coder_impl.SequenceCoderImpl.estimate_size
    self.get_estimated_size_and_observables(value))
  File "apache_beam/coders/coder_impl.py", line 772, in apache_beam.coders.coder_impl.SequenceCoderImpl.get_estimated_size_and_observables
    self._elem_coder.get_estimated_size_and_observables(
  File "apache_beam/coders/coder_impl.py", line 134, in apache_beam.coders.coder_impl.CoderImpl.get_estimated_size_and_observables
    return self.estimate_size(value, nested), []
  File "apache_beam/coders/coder_impl.py", line 458, in apache_beam.coders.coder_impl.IntervalWindowCoderImpl.estimate_size
    typed_value = value
TypeError: Cannot convert GlobalWindow to apache_beam.utils.windowed_value._IntervalWindowBase [while running 'generatedPtransform-1090']
        java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
        org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
        org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:280)
        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:84)
        org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:130)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1233)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:144)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:972)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction -1096: Traceback (most recent call last):
  [same Python traceback and TypeError as above]
        org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:157)
        org.apache.beam.runners.fnexecution.control.FnApiControlClient$ResponseStreamObserver.onNext(FnApiControlClient.java:140)
        org.apache.beam.vendor.grpc.v1p13p1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onMessage(ServerCalls.java:248)
        org.apache.beam.vendor.grpc.v1p13p1.io.grpc.ForwardingServerCallListener.onMessage(ForwardingServerCallListener.java:33)
        org.apache.beam.vendor.grpc.v1p13p1.io.grpc.Contexts$ContextualizedServerCallListener.onMessage(Contexts.java:76)
        org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.messagesAvailable(ServerCallImpl.java:263)
        org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1MessagesAvailable.runInContext(ServerImpl.java:683)
        org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
        org.apache.beam.vendor.grpc.v1p13p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Every time it tries to write to GCS, the exception above is raised in a non-blocking way. That leaves me in a situation where a new text file is indeed created on each output attempt, but its contents are always identical to the first window's output, which is obviously not what I want.
The exception is nested so deep in the stack trace that it's hard to guess the root cause, and I have no idea why it runs fine on the DirectRunner but not on the DataflowRunner. It seems to say that somewhere in the pipeline a globally-windowed value gets converted into a non-globally-windowed value, even though I apply a non-global windowing transform in the second stage of the pipeline. Adding a custom trigger didn't help.
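For what it's worth, the kind of workaround I've been considering is to drop WriteToText entirely and write each window's combined string to GCS from a DoFn using the google-cloud-storage client. This is only a sketch (the bucket name and prefix are placeholders, and I haven't verified it on Dataflow):

from google.cloud import storage

class WriteWindowToGcs(beam.DoFn):
    """Writes the combined string of each window to its own GCS object."""

    def __init__(self, bucket_name, prefix):
        self.bucket_name = bucket_name
        self.prefix = prefix

    def process(self, element, window=beam.DoFn.WindowParam):
        # Name the object after the window start so every window gets its own file.
        window_start = window.start.to_utc_datetime().strftime("%Y%m%d-%H%M%S")
        blob = storage.Client().bucket(self.bucket_name).blob(
            "%s/%s.txt" % (self.prefix, window_start))
        blob.upload_from_string(element)

# ...and in the pipeline, the last step would become:
#     | "Output" >> beam.ParDo(WriteWindowToGcs("<my-bucket>", "pubsub-dumps"))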
I was hitting the same error and found a workaround, but not a fix:
TypeError: Cannot convert GlobalWindow to apache_beam.utils.windowed_value._IntervalWindowBase [while running 'test-file-out/Write/WriteImpl/WriteBundles']
This happened both locally with the DirectRunner and on Dataflow with the DataflowRunner.
Reverting to apache-beam[gcp]==2.9.0 allowed my pipeline to run as expected.
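In case it helps anyone else, that just means pinning the dependency before redeploying, e.g.:

pip install "apache-beam[gcp]==2.9.0"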