我目前正在使用Python处理数据流 模板 ,我想访问作业ID并将其保存到特定的Firestore文档。
是否可以访问作业ID?
我在文档中找不到与此有关的任何内容。
您可以通过dataflow.projects().locations().jobs().list在管道中进行调用来实现(请参见下面的完整代码)。一种可能性是始终使用相同的作业名称来调用模板,这很有意义,否则可以将作业前缀作为运行时参数传递。使用正则表达式解析作业列表,以查看该作业是否包含名称前缀,如果包含名称前缀,则返回该作业ID。如果有多个,它将仅返回最新的一个(当前正在运行的一个)。
dataflow.projects().locations().jobs().list
在定义PROJECT和BUCKET变量之后,使用以下命令暂存该模板:
PROJECT
BUCKET
python script.py \ --runner DataflowRunner \ --project $PROJECT \ --staging_location gs://$BUCKET/staging \ --temp_location gs://$BUCKET/temp \ --template_location gs://$BUCKET/templates/retrieve_job_id
然后,myjobprefix在执行模板化作业时指定所需的作业名称(在我的情况下):
myjobprefix
gcloud dataflow jobs run myjobprefix \ --gcs-location gs://$BUCKET/templates/retrieve_job_id
该retrieve_job_id函数将从作业中返回作业ID,将更job_prefix改为与给定名称匹配。
retrieve_job_id
job_prefix
import argparse, logging, re from googleapiclient.discovery import build from oauth2client.client import GoogleCredentials import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import SetupOptions def retrieve_job_id(element): project = 'PROJECT_ID' job_prefix = "myjobprefix" location = 'us-central1' logging.info("Looking for jobs with prefix {} in region {}...".format(job_prefix, location)) try: credentials = GoogleCredentials.get_application_default() dataflow = build('dataflow', 'v1b3', credentials=credentials) result = dataflow.projects().locations().jobs().list( projectId=project, location=location, ).execute() job_id = "none" for job in result['jobs']: if re.findall(r'' + re.escape(job_prefix) + '', job['name']): job_id = job['id'] break logging.info("Job ID: {}".format(job_id)) return job_id except Exception as e: logging.info("Error retrieving Job ID") raise KeyError(e) def run(argv=None): parser = argparse.ArgumentParser() known_args, pipeline_args = parser.parse_known_args(argv) pipeline_options = PipelineOptions(pipeline_args) pipeline_options.view_as(SetupOptions).save_main_session = True p = beam.Pipeline(options=pipeline_options) init_data = (p | 'Start' >> beam.Create(["Init pipeline"]) | 'Retrieve Job ID' >> beam.FlatMap(retrieve_job_id)) p.run() if __name__ == '__main__': run()