当 Docker 镜像在相应的镜像仓库(registry)中更新之后,让我的 Amazon ECS 任务更新其 Docker 镜像的正确方法是什么?
我创建了一个脚本,用于将更新的 Docker 镜像部署到 ECS 上的暂存服务,以便相应的任务定义引用 Docker 镜像的当前版本。我不确定我是否遵循最佳实践,因此欢迎提供反馈。
要使脚本正常工作,您需要一个空闲的 ECS 实例,或者设置一个合适的 deploymentConfiguration.minimumHealthyPercent 值,使 ECS 能够腾出一个实例来部署更新后的任务定义。
deploymentConfiguration.minimumHealthyPercent
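我的算法是这样的:
1. 用当前 Git 修订号(git rev-parse --short HEAD)为任务定义中各容器对应的 Docker 镜像打标签。
2. 将这些带标签的 Docker 镜像推送到相应的镜像仓库。
3. 注销该任务定义族(family)中已有的旧任务定义。
4. 注册新的任务定义,使其引用带有当前 Git 修订号标签的 Docker 镜像。
5. 更新服务,使其使用新的任务定义。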
我的代码粘贴在下面:
#!/usr/bin/env python3
"""Deploy updated Docker images to an ECS staging service.

For every container in the given task definition file the script tags the
referenced Docker image with the current short Git revision, pushes that tag,
deregisters the existing task definitions of the family, registers a new task
definition pointing at the freshly tagged images, and finally updates the
matching ECS service(s) to use it.

Requires the ``aws``, ``docker`` and ``git`` command line tools on PATH.
"""
import subprocess
import sys
import os.path
import json
import re
import argparse
import tempfile

_root_dir = os.path.abspath(os.path.normpath(os.path.dirname(__file__)))
# Make the sibling _common module importable regardless of the caller's cwd.
sys.path.insert(0, _root_dir)

from _common import log_info, run_command  # noqa: E402


def _run_ecs_command(args):
    """Run ``aws ecs <args>`` and discard its output."""
    run_command(['aws', 'ecs', ] + args)


def _get_ecs_output(args):
    """Run ``aws ecs <args>`` and return its stdout parsed as JSON."""
    return json.loads(run_command(['aws', 'ecs', ] + args, return_stdout=True))


def _tag_image(tag, qualified_image_name, purge):
    """Pull an image, tag it as ``<image>:<tag>`` and push the tag.

    Args:
        tag: Tag to apply (the caller passes the short Git revision).
        qualified_image_name: Image name as written in the task definition.
        purge: When true, remove the pulled ``:latest`` image and the new tag
            from the local Docker daemon afterwards.
    """
    tagged_image_name = '{}:{}'.format(qualified_image_name, tag)

    log_info('Tagging image \'{}\' as \'{}\'...'.format(
        qualified_image_name, tag))
    log_info('Pulling image from registry in order to tag...')
    run_command(
        ['docker', 'pull', qualified_image_name], capture_stdout=False)
    # NOTE: no '-f' here. The flag was deprecated and later removed from
    # `docker tag`; an existing tag is overwritten by default.
    run_command(['docker', 'tag', qualified_image_name, tagged_image_name])
    log_info('Pushing image tag to registry...')
    run_command(['docker', 'push', tagged_image_name], capture_stdout=False)
    if purge:
        log_info('Deleting pulled image...')
        run_command(
            ['docker', 'rmi', '{}:latest'.format(qualified_image_name), ])
        run_command(['docker', 'rmi', tagged_image_name])


def _register_task_definition(task_definition_fpath, purge):
    """Register a new task definition revision that uses freshly tagged images.

    Every container image in the task definition file is tagged with the
    current short Git revision, all existing task definitions of the same
    family are deregistered, and the rewritten definition is registered.

    Args:
        task_definition_fpath: Path to the task definition JSON file.
        purge: Passed through to ``_tag_image``.

    Returns:
        The new task definition name in ``family:revision`` form.
    """
    with open(task_definition_fpath, 'rt') as f:
        task_definition = json.load(f)

    task_family = task_definition['family']

    tag = run_command([
        'git', 'rev-parse', '--short', 'HEAD', ], return_stdout=True).strip()
    for container_def in task_definition['containerDefinitions']:
        image_name = container_def['image']
        _tag_image(tag, image_name, purge)
        container_def['image'] = '{}:{}'.format(image_name, tag)

    log_info('Finding existing task definitions of family \'{}\'...'.format(
        task_family
    ))
    existing_task_definitions = _get_ecs_output(['list-task-definitions', ])[
        'taskDefinitionArns']
    # Escape the family name so regex metacharacters in it are taken
    # literally, and anchor the end so only '<family>:<revision>' matches.
    family_arn_re = re.compile(
        r'arn:aws:ecs:[^:]+:[^:]+:task-definition/{}:\d+$'.format(
            re.escape(task_family)))
    for existing_task_definition in existing_task_definitions:
        if not family_arn_re.match(existing_task_definition):
            continue
        log_info('Deregistering task definition \'{}\'...'.format(
            existing_task_definition))
        _run_ecs_command([
            'deregister-task-definition', '--task-definition',
            existing_task_definition,
        ])

    # The AWS CLI reads the definition from a file, so keep the temporary
    # file open (and flushed) until the register call has finished.
    with tempfile.NamedTemporaryFile(mode='wt', suffix='.json') as f:
        f.write(json.dumps(task_definition))
        f.flush()

        log_info('Registering task definition...')
        result = _get_ecs_output([
            'register-task-definition',
            '--cli-input-json', 'file://{}'.format(f.name),
        ])

    return '{}:{}'.format(task_family, result['taskDefinition']['revision'])


def _update_service(service_fpath, task_def_name):
    """Point the service named in the service file at ``task_def_name``.

    Args:
        service_fpath: Path to the service JSON file; its ``serviceName`` key
            selects which listed service(s) to update.
        task_def_name: Task definition in ``family:revision`` form.
    """
    with open(service_fpath, 'rt') as f:
        service_config = json.load(f)

    services = _get_ecs_output(['list-services', ])['serviceArns']
    # Match the exact service name only (a plain prefix match would also hit
    # e.g. 'web-admin' when deploying 'web'). The optional path segment
    # accepts ARNs of the form service/<cluster>/<name> as well.
    service_arn_re = re.compile(
        r'arn:aws:ecs:[^:]+:[^:]+:service/(?:[^/]+/)?{}$'.format(
            re.escape(service_config['serviceName'])))
    for service in services:
        if not service_arn_re.match(service):
            continue
        log_info('Updating service with new task definition...')
        _run_ecs_command([
            'update-service', '--service', service,
            '--task-definition', task_def_name,
        ])


def main():
    """Parse command line arguments and run the deployment."""
    parser = argparse.ArgumentParser(
        description="""Deploy latest Docker image to staging server.
The task definition file is used as the task definition, whereas
the service file is used to configure the service.
""")
    parser.add_argument(
        'task_definition_file', help='Your task definition JSON file')
    parser.add_argument('service_file', help='Your service JSON file')
    parser.add_argument(
        '--purge_image', action='store_true', default=False,
        help='Purge Docker image after tagging?')
    args = parser.parse_args()

    # Resolve the input paths before changing directory below.
    task_definition_file = os.path.abspath(args.task_definition_file)
    service_file = os.path.abspath(args.service_file)

    # `git rev-parse` must run inside the repository that holds this script.
    os.chdir(_root_dir)

    task_def_name = _register_task_definition(
        task_definition_file, args.purge_image)
    _update_service(service_file, task_def_name)


if __name__ == '__main__':
    main()
"""Shared helpers for the deployment scripts: logging and command execution."""
import subprocess
import sys

__all__ = ['log_info', 'handle_error', 'run_command', ]


def log_info(msg):
    """Write an informational message to stdout and flush immediately."""
    sys.stdout.write('* {}\n'.format(msg))
    sys.stdout.flush()


def handle_error(msg):
    """Write an error message to stderr and exit the process with status 1."""
    sys.stderr.write('* {}\n'.format(msg))
    sys.exit(1)


def run_command(
        command, ignore_error=False, return_stdout=False, capture_stdout=True):
    """Run an external command, logging it first.

    Args:
        command: Argument list (list or tuple of strings); any other value is
            wrapped into a one-element list.
        ignore_error: When true, a non-zero exit status is ignored instead of
            terminating the process through ``handle_error``.
        return_stdout: When true, return the command's decoded stdout.
        capture_stdout: When true, stdout is captured; when false, the
            command's output is not captured and nothing can be returned.

    Returns:
        The decoded stdout when ``return_stdout`` is true and output was
        captured; otherwise ``None`` (also when a failure was ignored).
    """
    if not isinstance(command, (list, tuple)):
        command = [command, ]
    command_str = ' '.join(command)
    log_info('Running command {}'.format(command_str))

    stdout = None
    try:
        if capture_stdout:
            stdout = subprocess.check_output(command)
        else:
            subprocess.check_call(command)
    except subprocess.CalledProcessError as err:
        if not ignore_error:
            handle_error('Command failed: {}'.format(err))
        # Reached only when the failure is deliberately ignored.
        return None

    # With capture_stdout=False there is no output to decode; previously this
    # combination crashed with AttributeError on None.decode().
    if return_stdout and stdout is not None:
        return stdout.decode()
    return None