我有一个Apache Beam / Dataflow管道,正在将结果写入BigQuery表。然后,我想在此表中查询管道的单独部分。但是,我似乎无法弄清楚如何正确设置此管道依赖项。我编写(然后要查询)的新表与单独的表结合在一起以进行某些过滤逻辑,这就是为什么我实际上需要编写表然后运行查询的原因。逻辑如下:
with beam.Pipeline(options=pipeline_options) as p: table_data = p | 'CreatTable' >> # ... logic to generate table ... # Write Table to BQ table_written = table_data | 'WriteTempTrainDataBQ' >> beam.io.WriteToBigQuery(...) query_results = table_written | 'QueryNewTable' >> beam.io.Read(beam.io.BigQuerySource(query=query_new_table))
ifquery_new_table实际上是对一个已经存在的BQ表的查询,我更改为query_results = p |而不是table_written正常运行。但是,如果我尝试查询正在流水线中间编写的表,那么在实际生成该表之前,我无法使流水线步骤“等待”。有什么办法可以做我所忽略的吗?
query_new_table
query_results = p |
table_written
当我尝试按顺序执行此步骤时,我收到一个断言错误assert isinstance(pbegin, pvalue.PBegin) AssertionError,table_written由于这不是有效的PCollection实例,因此我读的是问题所在。
assert isinstance(pbegin, pvalue.PBegin) AssertionError
有人知道我可以代替table_write使其实际按需顺序运行吗?
Beam当前不支持用例“在BigQuery写入完成后执行某些操作”。唯一的解决方法是运行单独的管道:让您的主程序为:运行写入BigQuery的管道;等待管道完成;运行另一个从BigQuery读取的管道。
这是一个非常常用的功能,我们开始设计这种支持(更广泛地说,是在升级各种IO写入以支持它们之后的排序操作),但是我不知道何时完成。