小编典典

如何将更新的 Docker 映像部署到 Amazon ECS 任务?

all

当相应的镜像仓库(registry)中的镜像更新之后,让我的 Amazon ECS 任务更新其 Docker
镜像的正确方法是什么?


阅读 210

收藏
2022-08-17

共1个答案

小编典典

我创建了一个脚本,用于将更新的 Docker 镜像部署到 ECS 上的暂存服务,以便相应的任务定义引用 Docker
镜像的当前版本。我不确定我是否遵循最佳实践,因此欢迎提供反馈。

要使脚本正常工作,您需要一个备用的 ECS 实例,或者设置 deploymentConfiguration.minimumHealthyPercent 值,以便
ECS 可以腾出一个实例来部署更新后的任务定义。

我的算法是这样的:

  1. 用当前 Git 修订版本号为任务定义中各容器对应的 Docker 镜像打标签。
  2. 将 Docker 镜像标签推送到相应的注册表。
  3. 取消注册任务定义系列中的旧任务定义。
  4. 注册新的任务定义,使其引用以当前 Git 修订版本号标记的 Docker 镜像。
  5. 更新服务以使用新的任务定义。

我的代码粘贴在下面:

deploy-ecs

#!/usr/bin/env python3
import subprocess
import sys
import os.path
import json
import re
import argparse
import tempfile

# Absolute, normalized path of the directory that contains this script.
_root_dir = os.path.abspath(os.path.normpath(os.path.dirname(__file__)))
# Put that directory first on the import path so the `_common` module can be
# imported regardless of the directory the script is launched from.
sys.path.insert(0, _root_dir)
# Star import: pulls in whatever `_common` exports (the functions below use
# `run_command` and `log_info` without defining them, so they presumably
# come from here).
from _common import *


def _run_ecs_command(args):
    """Run ``aws ecs`` followed by the given argument list.

    The command's output is not returned to the caller.
    """
    ecs_command = ['aws', 'ecs'] + args
    run_command(ecs_command)


def _get_ecs_output(args):
    """Run ``aws ecs`` with the given argument list and parse its output.

    Returns the command's standard output decoded from JSON.
    """
    ecs_command = ['aws', 'ecs'] + args
    raw_output = run_command(ecs_command, return_stdout=True)
    return json.loads(raw_output)


def _tag_image(tag, qualified_image_name, purge):
    """Tag an image from the registry with ``tag`` and push the new tag.

    The image is pulled first, tagged locally as
    ``<qualified_image_name>:<tag>``, and the tagged reference is pushed.

    Args:
        tag: Tag to apply (the caller passes the short Git revision).
        qualified_image_name: Image name as used by ``docker pull``. It is
            used as the repository part of the new reference, so it is
            assumed to carry no tag of its own.
        purge: If true, remove both the pulled ``:latest`` image and the
            newly tagged image from the local Docker host afterwards.
    """
    tagged_image_name = '{}:{}'.format(qualified_image_name, tag)

    log_info('Tagging image \'{}\' as \'{}\'...'.format(
        qualified_image_name, tag))
    log_info('Pulling image from registry in order to tag...')
    run_command(
        ['docker', 'pull', qualified_image_name], capture_stdout=False)
    # No '-f' here: the flag was deprecated and then removed from
    # `docker tag`, so passing it makes current Docker versions reject the
    # command. Re-tagging an existing tag is allowed without it.
    run_command(['docker', 'tag', qualified_image_name, tagged_image_name])
    log_info('Pushing image tag to registry...')
    run_command(['docker', 'push', tagged_image_name], capture_stdout=False)
    if purge:
        log_info('Deleting pulled image...')
        # The untagged pull above fetched ':latest', so that is what is
        # removed alongside the freshly created tag.
        run_command(
            ['docker', 'rmi', '{}:latest'.format(qualified_image_name), ])
        run_command(['docker', 'rmi', tagged_image_name])


def _register_task_definition(task_definition_fpath, purge):
    """Register a task definition that references freshly tagged images.

    Steps, in order:
      1. Load the task definition JSON and read its ``family``.
      2. Tag every container image with the short Git revision of HEAD
         (pull, tag, push) and rewrite the container's ``image`` to
         ``<image>:<revision>``.
      3. Deregister every existing task definition of the same family.
      4. Register the rewritten definition through a temporary JSON file.

    Args:
        task_definition_fpath: Path to the task definition JSON file.
        purge: Passed through to ``_tag_image``; whether local images are
            removed after tagging.

    Returns:
        ``'<family>:<revision>'`` for the newly registered definition.
    """
    with open(task_definition_fpath, 'rt') as f:
        task_definition = json.load(f)

    task_family = task_definition['family']

    tag = run_command([
        'git', 'rev-parse', '--short', 'HEAD', ], return_stdout=True).strip()
    # Several containers may share one image; pull/tag/push it only once.
    tagged_images = set()
    for container_def in task_definition['containerDefinitions']:
        image_name = container_def['image']
        if image_name not in tagged_images:
            _tag_image(tag, image_name, purge)
            tagged_images.add(image_name)
        container_def['image'] = '{}:{}'.format(image_name, tag)

    log_info('Finding existing task definitions of family \'{}\'...'.format(
        task_family
    ))
    existing_task_definitions = _get_ecs_output(['list-task-definitions', ])[
        'taskDefinitionArns']
    # Match the whole ARN, with the family name taken literally, so only
    # revisions of exactly this family are deregistered.
    family_arn_re = re.compile(
        r'arn:aws:ecs:[^:]+:[^:]+:task-definition/{}:\d+'.format(
            re.escape(task_family)))
    for existing_task_definition in existing_task_definitions:
        if not family_arn_re.fullmatch(existing_task_definition):
            continue
        log_info('Deregistering task definition \'{}\'...'.format(
            existing_task_definition))
        _run_ecs_command([
            'deregister-task-definition', '--task-definition',
            existing_task_definition, ])

    # The CLI reads the definition from a file, so write the rewritten JSON
    # to a temporary file that lives only for the duration of the call.
    with tempfile.NamedTemporaryFile(mode='wt', suffix='.json') as f:
        json.dump(task_definition, f)
        # Flush so the CLI subprocess sees the complete file contents.
        f.flush()
        log_info('Registering task definition...')
        result = _get_ecs_output([
            'register-task-definition',
            '--cli-input-json', 'file://{}'.format(f.name),
        ])

    return '{}:{}'.format(task_family, result['taskDefinition']['revision'])


def _update_service(service_fpath, task_def_name):
    """Switch the service named in a JSON file to a new task definition.

    The service file is read only for its ``serviceName``. Every service ARN
    returned by ``aws ecs list-services`` whose service name equals that
    value is updated with ``aws ecs update-service``.

    Args:
        service_fpath: Path to the service JSON file.
        task_def_name: Task definition to deploy, e.g. ``'<family>:<rev>'``.
    """
    with open(service_fpath, 'rt') as f:
        service_config = json.load(f)

    # Match the service name exactly (a prefix match would also hit e.g.
    # 'web-api' when the configured name is 'web'). Both ARN layouts are
    # accepted: '...:service/<name>' and '...:service/<cluster>/<name>'.
    service_arn_re = re.compile(
        r'arn:aws:ecs:[^:]+:[^:]+:service/(?:[^/]+/)?{}'.format(
            re.escape(service_config['serviceName'])))

    services = _get_ecs_output(['list-services', ])['serviceArns']
    for service in services:
        if not service_arn_re.fullmatch(service):
            continue
        log_info('Updating service with new task definition...')
        _run_ecs_command([
            'update-service', '--service', service,
            '--task-definition', task_def_name,
        ])


# Command-line interface: two positional JSON files and one optional switch.
parser = argparse.ArgumentParser(
    description="""Deploy latest Docker image to staging server.
The task definition file is used as the task definition, whereas
the service file is used to configure the service.
""")
parser.add_argument(
    'task_definition_file',
    help='Your task definition JSON file',
)
parser.add_argument(
    'service_file',
    help='Your service JSON file',
)
# 'store_true' already defaults to False when the switch is absent.
parser.add_argument(
    '--purge_image',
    action='store_true',
    help='Purge Docker image after tagging?',
)
args = parser.parse_args()

# Make both paths absolute before the working directory changes below, so
# relative arguments are resolved against the caller's directory.
task_definition_file, service_file = (
    os.path.abspath(path)
    for path in (args.task_definition_file, args.service_file)
)

# Commands are run from the script's own directory.
os.chdir(_root_dir)

# Register the new task definition, then point the service at it.
task_def_name = _register_task_definition(
    task_definition_file, args.purge_image)
_update_service(service_file, task_def_name)

_common.py

import sys
import subprocess


# Names made available by ``from _common import *``.
__all__ = ['log_info', 'handle_error', 'run_command', ]


def log_info(msg):
    """Write ``msg`` to standard output as a ``* ``-prefixed line and flush."""
    line = '* {}\n'.format(msg)
    out = sys.stdout
    out.write(line)
    # Flush right away so the message is not held back in the buffer.
    out.flush()


def handle_error(msg):
    """Report ``msg`` on standard error and terminate with exit status 1.

    The message is written as a ``* ``-prefixed line; the function never
    returns because it raises ``SystemExit``.
    """
    error_line = '* {}\n'.format(msg)
    sys.stderr.write(error_line)
    raise SystemExit(1)


def run_command(
        command, ignore_error=False, return_stdout=False, capture_stdout=True):
    """Log and run an external command.

    Args:
        command: Program and arguments as a list or tuple; any other value
            is wrapped in a one-element list.
        ignore_error: If true, a non-zero exit status is ignored and
            ``None`` is returned. Otherwise ``handle_error`` is called with
            a description of the failure.
        return_stdout: If true, return the command's standard output decoded
            to ``str``.
        capture_stdout: If true, collect the command's standard output
            instead of letting it pass through to the terminal. Capturing
            is always enabled when ``return_stdout`` is true, since there
            would otherwise be nothing to return.

    Returns:
        The decoded standard output when ``return_stdout`` is true and the
        command succeeded; ``None`` in every other case.
    """
    if not isinstance(command, (list, tuple)):
        command = [command, ]
    # str() keeps the log line from failing on non-string arguments.
    command_str = ' '.join(str(part) for part in command)
    log_info('Running command {}'.format(command_str))
    try:
        # Requesting the output implies capturing it; without this the
        # combination return_stdout=True, capture_stdout=False tried to
        # decode None.
        if capture_stdout or return_stdout:
            stdout = subprocess.check_output(command)
        else:
            subprocess.check_call(command)
            stdout = None
    except subprocess.CalledProcessError as err:
        if not ignore_error:
            handle_error('Command failed: {}'.format(err))
        return None
    return stdout.decode() if return_stdout else None
2022-08-17