I am trying to read a table from a Google Spanner database and write it to a text file as a backup, using the Python SDK and Google Dataflow. I have written the following script:
from __future__ import absolute_import

import argparse
import itertools
import logging
import re
import time
import datetime as dt

import apache_beam as beam
from apache_beam.io import iobase
from apache_beam.io import WriteToText
from apache_beam.io.range_trackers import OffsetRangeTracker, UnsplittableRangeTracker
from apache_beam.metrics import Metrics
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions, SetupOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions

from google.cloud.spanner.client import Client
from google.cloud.spanner.keyset import KeySet

BUCKET_URL = 'gs://my_bucket'
OUTPUT = '%s/output/' % BUCKET_URL
PROJECT_ID = 'my_project'
INSTANCE_ID = 'my_instance'
DATABASE_ID = 'my_db'
JOB_NAME = 'spanner-backup'
TABLE = 'my_table'


class SpannerSource(iobase.BoundedSource):
    def __init__(self):
        logging.info('Enter __init__')
        self.spannerOptions = {
            "id": PROJECT_ID,
            "instance": INSTANCE_ID,
            "database": DATABASE_ID
        }
        self.SpannerClient = Client

    def estimate_size(self):
        logging.info('Enter estimate_size')
        return 1

    def get_range_tracker(self, start_position=None, stop_position=None):
        logging.info('Enter get_range_tracker')
        if start_position is None:
            start_position = 0
        if stop_position is None:
            stop_position = OffsetRangeTracker.OFFSET_INFINITY

        range_tracker = OffsetRangeTracker(start_position, stop_position)
        return UnsplittableRangeTracker(range_tracker)

    def read(self, range_tracker):  # This is not called when using the DataflowRunner!
        logging.info('Enter read')
        # instantiate spanner client
        spanner_client = self.SpannerClient(self.spannerOptions["id"])
        instance = spanner_client.instance(self.spannerOptions["instance"])
        database = instance.database(self.spannerOptions["database"])

        # read the column names from the table schema
        table_fields = database.execute_sql(
            "SELECT t.column_name FROM information_schema.columns AS t "
            "WHERE t.table_name = '%s'" % TABLE)
        table_fields.consume_all()
        self.columns = [x[0] for x in table_fields]
        keyset = KeySet(all_=True)

        results = database.read(table=TABLE, columns=self.columns, keyset=keyset)

        # iterate over rows
        results.consume_all()
        for row in results:
            JSON_row = {self.columns[i]: row[i] for i in range(len(self.columns))}
            yield JSON_row

    def split(self, start_position=None, stop_position=None):
        # this should not be called since the source is unsplittable
        logging.info('Enter split')
        if start_position is None:
            start_position = 0
        if stop_position is None:
            stop_position = 1

        # Because the source is unsplittable (for now), only a single source is returned
        yield iobase.SourceBundle(
            weight=1,
            source=self,
            start_position=start_position,
            stop_position=stop_position)


def run(argv=None):
    """Main entry point"""
    pipeline_options = PipelineOptions()
    google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
    google_cloud_options.project = PROJECT_ID
    google_cloud_options.job_name = JOB_NAME
    google_cloud_options.staging_location = '%s/staging' % BUCKET_URL
    google_cloud_options.temp_location = '%s/tmp' % BUCKET_URL
    # pipeline_options.view_as(StandardOptions).runner = 'DirectRunner'
    pipeline_options.view_as(StandardOptions).runner = 'DataflowRunner'
    p = beam.Pipeline(options=pipeline_options)

    output = p | 'Get Rows from Spanner' >> beam.io.Read(SpannerSource())
    iso_datetime = dt.datetime.now().replace(microsecond=0).isoformat()
    # if the following line is commented out, the job completes but does not do anything
    output | 'Store in GCS' >> WriteToText(
        file_path_prefix=OUTPUT + iso_datetime + '-' + TABLE, file_name_suffix='')

    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
However, this script only runs correctly with the DirectRunner: when I run it with the DataflowRunner, it runs for a while without producing any output, then exits with the error:

"Executing failure step failure14 [...] Workflow failed. Causes: [...] The worker lost contact with the service."

Other times, it just keeps running forever without ever creating any output.

Moreover, if I comment out the 'output = ...' line, the job completes, but without actually reading any data.

It also seems that the DataflowRunner calls the source's 'estimate_size' function, but not its 'read' or 'get_range_tracker' functions.
Does anyone have an idea what could be causing this? I know there is a (more complete) Java SDK with an experimental Spanner source/sink, but I would rather stick with Python if possible.
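For reference, the fallback I am considering is to drop the custom BoundedSource entirely and do the read inside a ParDo seeded by beam.Create. This is only a minimal sketch (ReadSpannerTableFn is just an illustrative name; it reuses the same client calls as the script above):

import apache_beam as beam
from google.cloud.spanner.client import Client
from google.cloud.spanner.keyset import KeySet

class ReadSpannerTableFn(beam.DoFn):
    """Reads all rows of one Spanner table from inside a worker."""
    def __init__(self, project_id, instance_id, database_id, table):
        self.project_id = project_id
        self.instance_id = instance_id
        self.database_id = database_id
        self.table = table

    def process(self, element):
        # Create the client lazily on the worker instead of pickling it from the driver
        database = (Client(self.project_id)
                    .instance(self.instance_id)
                    .database(self.database_id))
        # Fetch the column names, then read the whole table
        fields = database.execute_sql(
            "SELECT t.column_name FROM information_schema.columns AS t "
            "WHERE t.table_name = '%s'" % self.table)
        fields.consume_all()
        columns = [row[0] for row in fields]
        results = database.read(table=self.table, columns=columns,
                                keyset=KeySet(all_=True))
        results.consume_all()
        for row in results:
            yield dict(zip(columns, row))

# In run(), this would replace beam.io.Read(SpannerSource()):
# output = (p
#           | 'Seed' >> beam.Create([None])
#           | 'Get Rows from Spanner' >> beam.ParDo(
#               ReadSpannerTableFn(PROJECT_ID, INSTANCE_ID, DATABASE_ID, TABLE)))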
Thanks
Google has since added support for backing up Spanner with Dataflow: you can select the corresponding template when creating a Dataflow job.
For more information, see: https://cloud.google.com/blog/products/gcp/cloud-spanner-adds-import-export-functionality-to-ease-data-movement
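For example, here is a sketch of launching that export template programmatically with the Dataflow REST API via google-api-python-client, reusing the placeholder names from the question (the template path and its instanceId/databaseId/outputDir parameters are from the Cloud Spanner to GCS Avro template docs, so double-check them against the current documentation):

from googleapiclient.discovery import build

# Build a Dataflow API client (uses application default credentials)
service = build('dataflow', 'v1b3')

# Launch the public Cloud Spanner -> GCS Avro export template
request = service.projects().templates().launch(
    projectId='my_project',
    gcsPath='gs://dataflow-templates/latest/Cloud_Spanner_to_GCS_Avro',
    body={
        'jobName': 'spanner-backup',
        'parameters': {
            'instanceId': 'my_instance',
            'databaseId': 'my_db',
            'outputDir': 'gs://my_bucket/output',
        },
        'environment': {'tempLocation': 'gs://my_bucket/tmp'},
    },
)
response = request.execute()
print(response['job']['id'])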