Python docker 模块,DockerClient() 实例源码

我们从Python开源项目中,提取了以下36个代码示例,用于说明如何使用docker.DockerClient()

项目:docker-db    作者:EXASOL    | 项目源码 | 文件源码
def run(self, image, cmd=None):
        """
        Start a new privileged container from ``image`` running ``cmd``.

        A throw-away ``docker.DockerClient`` is created for this single call.
        The container is started with ``auto_remove=True``, so it is removed
        automatically once it exits.

        :param image: image to create the container from
        :param cmd: command handed to the container as ``command=cmd``
        :raises DockerError: if the container exits with an error, the image
            cannot be found, or the Docker API reports another error
        """
        # Error messages print the literal text "None" when no command is given.
        shown_cmd = cmd or "None"

        try:
            client = docker.DockerClient()
            client.containers.run(image, command=cmd, auto_remove=True, privileged=True)
        except docker.errors.ContainerError as err:
            raise DockerError("Container with image '%s' and command '%s' exited with error: %s" % (image, shown_cmd, err))
        except docker.errors.ImageNotFound as err:
            raise DockerError("Image '%s' could not be found: %s" % (image, err))
        except docker.errors.APIError as err:
            raise DockerError("Error running a container with image '%s' and command '%s': %s" % (image, shown_cmd, err))
#}}}
项目:ansible-playbook-bundle    作者:ansibleplaybookbundle    | 项目源码 | 文件源码
def build_apb(project, dockerfile=None, tag=None):
    """Build the APB image for ``project`` and return the tag it was built with.

    :param project: build context path; also passed to ``get_spec`` and
        ``update_dockerfile``
    :param dockerfile: Dockerfile name, ``"Dockerfile"`` when ``None``
    :param tag: image tag; falls back to the spec's ``name`` when empty
    :return: the tag used for the build
    :raises docker.errors.DockerException: re-raised after printing a hint
        when the Docker API call fails

    Exits the process with status 1 when the spec has no ``version`` entry.
    """
    dockerfile = "Dockerfile" if dockerfile is None else dockerfile

    spec = get_spec(project)
    if 'version' not in spec:
        print("APB spec does not have a listed version. Please update apb.yml")
        exit(1)

    # An explicit tag wins; otherwise the APB name from the spec is used.
    tag = tag or spec['name']

    update_dockerfile(project, dockerfile)
    print("Building APB using tag: [%s]" % tag)

    try:
        docker_client = docker.DockerClient(base_url='unix://var/run/docker.sock', version='auto')
        docker_client.images.build(path=project, tag=tag, dockerfile=dockerfile)
    except docker.errors.DockerException:
        print("Error accessing the docker API. Is the daemon running?")
        raise

    print("Successfully built APB image: %s" % tag)
    return tag
项目:badwolf    作者:bosondata    | 项目源码 | 文件源码
def __init__(self, context, spec, build_status=None, docker_version='auto'):
        """
        Store the build context and spec and create the Docker client.

        :param context: build context; its ``repository`` and ``source`` are read
        :param spec: build specification, stored unchanged
        :param build_status: status reporter; a ``BuildStatus`` for the
            ``badwolf/test`` key is created when none (or a falsy one) is given
        :param docker_version: API version handed to ``DockerClient``
        """
        self.context = context
        self.spec = spec
        # Last path component of "owner/repo".
        self.repo_name = context.repository.rsplit('/', 1)[-1]
        self.commit_hash = context.source['commit']['hash']

        if not build_status:
            build_status = BuildStatus(
                bitbucket,
                context.source['repository']['full_name'],
                self.commit_hash,
                'badwolf/test',
                url_for('log.build_log', sha=self.commit_hash, _external=True)
            )
        self.build_status = build_status

        app_config = current_app.config
        self.docker = DockerClient(
            base_url=app_config['DOCKER_HOST'],
            timeout=app_config['DOCKER_API_TIMEOUT'],
            version=docker_version,
        )
项目:son-emu    作者:sonata-nfv    | 项目源码 | 文件源码
def _pull_predefined_dockerimages(self):
        """
        If the package contains URLs to pre-build Docker images, we download them with this method.

        Every value of ``self.remote_docker_image_urls`` is pulled with the
        ``docker`` command-line tool. Unless ``FORCE_PULL`` is set, images
        that are already present locally are skipped. A failing pull is
        logged as a warning and does not stop the remaining pulls.
        """
        dc = DockerClient()
        # .values() works on Python 2 and 3 (itervalues() is Python 2 only).
        for url in self.remote_docker_image_urls.values():
            # only pull if not present (speedup for development)
            if not FORCE_PULL and dc.images.list(name=url):
                LOG.debug("Image %r present. Skipping pull." % url)
                continue
            LOG.info("Pulling image: %r" % url)
            # dc.images.pull(url, insecure_registry=True) seemed to fail with
            # docker api version 2.0.2, so the docker cli is used instead.
            exit_code = Popen(["docker", "pull", url]).wait()
            if exit_code != 0:
                # Surface the failure instead of silently dropping the exit status.
                LOG.warning("Pulling image %r failed: 'docker pull' exited with status %r",
                            url, exit_code)
项目:backup    作者:twindb    | 项目源码 | 文件源码
def docker_client():
    """Return a Docker client, retrying while the daemon is unavailable.

    Up to five attempts are made, five seconds apart. Each failure is logged.

    :return: a ``docker.DockerClient`` created with ``version="auto"``
    :raises DockerException: when all attempts failed
    """
    attempts = 5
    # range() works on Python 2 and 3 (xrange() is Python 2 only).
    for attempt in range(attempts):
        try:
            return docker.DockerClient(version="auto")
        except DockerException as err:
            LOG.error(err)
            # Waiting after the final attempt would only delay the error below.
            if attempt < attempts - 1:
                time.sleep(5)
    raise DockerException('Failed to get a docker client')


# noinspection PyShadowingNames
项目:SwarmOps    作者:staugur    | 项目源码 | 文件源码
def _UpdateNode(self, leader, node_id, node_role, labels=None):
        """Update the role and labels of a swarm node.

        The request goes to the Docker API on ``leader`` at ``self.port``.
        The node is always set to ``active`` availability.

        :param leader: host of the Docker API to connect to
        :param node_id: ID of the node to update
        :param node_role: value written to the spec's ``Role``
        :param labels: dict written to the spec's ``Labels``; empty when omitted
        :return: the result of ``node.update()``, or ``False`` on any error
        """
        # Use a fresh dict per call; a literal {} default is shared between calls.
        if labels is None:
            labels = {}

        client = docker.DockerClient(base_url="tcp://{}:{}".format(leader, self.port), version="auto", timeout=self.timeout)

        node_spec = {
            'Availability': 'active',
            'Role': node_role,
            'Labels': labels
        }
        logger.info("Update node spec data is {} for node_id {})".format(node_spec, node_id))

        try:
            node = client.nodes.get(node_id)
            res = node.update(node_spec)
        except Exception as e:
            # One handler is enough: docker.errors.APIError was handled the
            # same way as every other exception. "as" works on Python 2.6+ and 3.
            logger.error(e, exc_info=True)
            return False
        else:
            return res
项目:genorch    作者:genorch    | 项目源码 | 文件源码
def create_service(self, opts):
        """
        Create the service described by ``opts`` on every target.

        When ``opts`` has a ``sub_driver`` key, the attribute of that name is
        looked up on the target's Docker client. For the ``swarm`` sub driver,
        ``opts['opts']['type']`` decides what happens:

        * ``manager`` initialises a swarm and stores its join tokens in the
          database row of the target
        * ``worker`` joins the swarm of the first entry of
          ``opts['opts']['managers']`` using that manager's stored token

        Without ``sub_driver``, ``opts`` is passed as keyword arguments to
        ``containers.run`` and the container is started detached.

        :param opts: options dict; its ``sub_driver`` key is removed, if present
        """
        # Decide once, before the loop. The key is removed from ``opts`` while
        # the first target is handled, so re-checking per target would send
        # every later target down the plain container path.
        use_sub_driver = "sub_driver" in opts
        sub_driver_name = opts.get('sub_driver')

        for target in self.targets:
            docker_client = docker.DockerClient('tcp://' + common.translate_id(target)[0] +
                                                ':' + cfg.docker['API_PORT'])
            if not use_sub_driver:
                docker_client.containers.run(**opts, detach=True)
                continue

            sub_driver = getattr(docker_client, sub_driver_name)

            # Callers still see the key removed from their dict, as before.
            opts.pop('sub_driver', None)

            if sub_driver_name != 'swarm':
                continue

            sub_driver_opts = opts['opts']
            node_type = sub_driver_opts['type']

            if node_type == 'manager':
                sub_driver.init('eth0:' + cfg.docker['SWARM_PORT'], '0.0.0.0:' + cfg.docker['SWARM_PORT'])
                db.vms.update(insert_join_token(sub_driver.attrs['JoinTokens']), where('name') == target)
            elif node_type == 'worker':
                manager = db.vms.get(where('name') == sub_driver_opts['managers'][0])
                sub_driver.join([common.id_to_swarm(sub_driver_opts['managers'][0])], manager['docker']['join_tokens']['Worker'], '0.0.0.0:' + cfg.docker['SWARM_PORT'])
项目:genorch    作者:genorch    | 项目源码 | 文件源码
def create_cluster(self):
        """
        Initialise a swarm on the first manager VM and join all other VMs.

        The first entry of ``self.vms`` whose role is ``manager`` initialises
        the swarm; its join tokens are stored in the database, it is removed
        from ``self.vms`` and remembered as ``self.init``. Every remaining VM
        is then added through ``add_manager`` or ``add_worker`` by its role.
        """
        self.init = {}

        for candidate in self.vms:
            host = common.translate_id(candidate['id'])[0]
            docker_client = docker.DockerClient('tcp://' + host + ':' + cfg.docker['API_PORT'])
            swarm_client = docker_client.swarm
            if candidate['role'] != 'manager':
                continue

            swarm_port = cfg.docker['SWARM_PORT']
            swarm_client.init('eth0:' + swarm_port, '0.0.0.0:' + swarm_port)
            join_tokens = swarm_client.attrs['JoinTokens']
            db.vms.update(insert_join_token(join_tokens),
                          where('name') == candidate['id'])

            # Removing while iterating is safe here: the loop ends right away.
            self.vms.remove(candidate)
            self.init = candidate
            break

        for member in self.vms:
            role = member['role']
            if role == 'manager':
                self.add_manager(member)
            elif role == 'worker':
                self.add_worker(member)
项目:GAZE    作者:monokal    | 项目源码 | 文件源码
def __init__(self, args, config, docker_url='unix://var/run/docker.sock'):
        """
        Store the inputs, create the Gaze helper objects and a Docker client.

        :param args: (list) Arguments from the command-line.
        :param config: (dict) Config loaded from the YAML file.
        :param docker_url: (str) Base URL handed to the Docker client.
        """
        self.args, self.config, self.docker_url = args, config, docker_url

        self.helpers = GazeHelper()
        self.log = GazeLog()
        self.volume = GazeVolume()
        self.network = GazeNetwork()
        self.container = GazeContainer()

        # Instantiate a Docker client; log and exit with status 1 on an API error.
        try:
            client = docker.DockerClient(base_url=self.docker_url)
        except docker.errors.APIError:
            self.log("Failed to instantiate the Docker client.", 'exception')
            sys.exit(1)
        else:
            self.docker_client = client
项目:compose-ci    作者:docteurklein    | 项目源码 | 文件源码
def test_it_logins_against_the_registry(self):
        """Auth.login() must call login() on the project's Docker client with the stored credentials."""
        log_double = mock.create_autospec(logging.Logger)
        project = mock.create_autospec(Project)
        project.client = mock.create_autospec(docker.DockerClient)

        credentials = dict(user='test', password='test', email='test', registry='example.org')
        Auth(logger=log_double, **credentials).login(project)

        project.client.login.assert_called_with('test', 'test', 'test', 'example.org')
项目:zimfarm    作者:openzim    | 项目源码 | 文件源码
def run_redis(self, client: DockerClient):
        """
        Step 0: make sure a redis container named ``self.host_redis_name`` exists.

        If no container matches that name filter, the ``redis:latest`` image
        is pulled and a detached container with that name is started.

        :param client: Docker client used for listing, pulling and running
        """
        self.current_index = 0
        self.step_started()

        name_filter = {'name': self.host_redis_name}
        if not client.containers.list(filters=name_filter):
            client.images.pull('redis', tag='latest')
            client.containers.run('redis', detach=True, name=self.host_redis_name)

        self.step_finished(True)
项目:zimfarm    作者:openzim    | 项目源码 | 文件源码
def pull_mwoffliner(self, client: DockerClient):
        """
        Step 1: pull the ``latest`` tag of the ``openzim/mwoffliner`` image.

        :param client: Docker client used for the pull
        """
        self.current_index = 1
        self.step_started()

        image_name, image_tag = 'openzim/mwoffliner', 'latest'
        client.images.pull(image_name, tag=image_tag)

        self.step_finished(True)
项目:shipmaster    作者:damoti    | 项目源码 | 文件源码
def __init__(self, build_config: BuildConfig, args=None, build_num='0', job_num='0', commit_info=None):
        """
        Store the build parameters and set up the Docker client and plugins.

        :param build_config: configuration, stored as ``self.config``
        :param args: optional arguments, stored unchanged
        :param build_num: build number, stored unchanged
        :param job_num: job number, stored unchanged
        :param commit_info: optional commit information, stored unchanged
        """
        self.config, self.args = build_config, args
        self.build_num, self.job_num = build_num, job_num
        self.commit_info = commit_info

        self.client = DockerClient('unix://var/run/docker.sock')

        # Both calls receive this instance, so they run after the plain
        # attributes above have been set.
        self.images = self._stage_to_image_builders_mapping()
        self.plugins = PluginManager(self)
项目:slicer_cli_web    作者:girder    | 项目源码 | 文件源码
def getCliData(name, client, img, jobModel, job):
    """Collect the CLI metadata of docker image ``name``.

    ``<image> --list_cli`` is run and its output parsed as JSON, giving a
    dict keyed by CLI name. For every CLI, ``<cli> --xml`` is run and the
    output stored under ``DockerImage.xml`` in that CLI's entry. The job log
    is updated and the CLI is registered on ``img`` as each one is processed.

    :param name: name of the docker image
    :param client: must be a ``docker.DockerClient``
    :param img: must be a ``DockerImage``; receives the CLIs via ``addCLI``
    :param jobModel: model used to update the job's log and status
    :param job: job being updated
    :return: dict mapping each CLI name to its metadata
    :raises DockerImageError: on any failure, including wrong argument types
    """
    try:
        if not (isinstance(client, docker.DockerClient) and isinstance(img, DockerImage)):
            # Fail with a clear message. Falling through used to hit
            # ``return cli_dict`` with the name unbound, which surfaced as a
            # confusing "referenced before assignment" DockerImageError.
            raise TypeError(
                'client must be a docker.DockerClient and img a DockerImage, '
                'got %s and %s' % (type(client).__name__, type(img).__name__))

        # contains nested dict
        # {<cliname>:{
        #             type:<type>
        #             }
        # }
        cli_dict = json.loads(getDockerOutput(name, '--list_cli', client))

        # Only nested entries are modified, so iterating the dict is safe.
        for key in cli_dict:
            cli_xml = getDockerOutput(name, '%s --xml' % key, client)
            cli_dict[key][DockerImage.xml] = cli_xml
            jobModel.updateJob(
                job,
                log='Got image %s, cli %s metadata\n' % (name, key),
                status=JobStatus.RUNNING,
            )
            img.addCLI(key, cli_dict[key])
        return cli_dict
    except Exception as err:
        logger.exception(
            'Error getting %s cli data from image %s', name, img)
        raise DockerImageError(
            'Error getting %s cli data from image %s ' % (name, img) + str(err))
项目:console    作者:laincloud    | 项目源码 | 文件源码
def get_docker_client(docker_base_url):
    """Return the ``api`` attribute of a ``DockerClient`` for ``docker_base_url``."""
    high_level_client = docker.DockerClient(base_url=docker_base_url)
    return high_level_client.api
项目:son-emu    作者:sonata-nfv    | 项目源码 | 文件源码
def _build_images_from_dockerfiles(self):
        """
        Build Docker images for each local Dockerfile found in the package: self.local_docker_files

        Keys of ``self.local_docker_files`` are used as image tags. The build
        directory is the value with ``"Dockerfile"`` stripped from it. Nothing
        is built when ``GK_STANDALONE_MODE`` is set.
        """
        if GK_STANDALONE_MODE:
            return  # do not build anything in standalone mode
        dc = DockerClient()
        LOG.info("Building %d Docker images (this may take several minutes) ..." % len(self.local_docker_files))
        # .items() works on Python 2 and 3 (iteritems() is Python 2 only).
        for k, v in self.local_docker_files.items():
            build_dir = v.replace("Dockerfile", "")
            # The streaming build() lives on the low-level client at ``dc.api``;
            # DockerClient itself has no ``build`` attribute.
            for line in dc.api.build(path=build_dir, tag=k, rm=False, nocache=False):
                LOG.debug("DOCKER BUILD: %s" % line)
            LOG.info("Docker image created: %s" % k)
项目:son-emu    作者:sonata-nfv    | 项目源码 | 文件源码
def _check_docker_image_exists(self, image_name):
        """
        Ask the docker service whether an image with the given name exists.

        :param image_name: name of the docker image
        :return: True if at least one image is listed for that name, else False
        """
        matching_images = DockerClient().images.list(name=image_name)
        return len(matching_images) > 0
项目:son-emu    作者:sonata-nfv    | 项目源码 | 文件源码
def __init__(self):
        """Start with empty registries and a Docker client on the local socket."""
        self.dc = None

        # One empty dict per registry kept by this object.
        self.stacks = {}
        self.computeUnits = {}
        self.routers = {}
        self.flavors = {}
        self._images = {}
        self.nets = {}
        self.ports = {}
        self.port_pairs = {}
        self.port_pair_groups = {}
        self.flow_classifiers = {}
        self.port_chains = {}
        self.compute_nets = {}

        self.dcli = DockerClient(base_url='unix://var/run/docker.sock')
项目:postgraas_server    作者:blue-yonder    | 项目源码 | 文件源码
def _docker_client():
    """Return a client for the local Docker socket (auto version, 30 second timeout)."""
    settings = {
        'base_url': 'unix://var/run/docker.sock',
        'version': 'auto',
        'timeout': 30,
    }
    return docker.DockerClient(**settings)
项目:dao_manage    作者:houziyu    | 项目源码 | 文件源码
def __init__(self):
        """
        Create one Docker client per host in ``dao_config.host_list``.

        Clients are stored in ``self.docker_all`` keyed by host, each reached
        over ``tcp://<host>:2375``. ``self.docker_singleton`` ends up holding
        the client created last. The resulting mapping is printed.
        """
        self.docker_all = {}
        for host in dao_config.host_list:
            host_client = docker.DockerClient(base_url='tcp://%s:2375' % host)
            self.docker_singleton = host_client
            self.docker_all[host] = host_client
        print('docker_host_dictionary:',self.docker_all)
    #docker?????????????????????
项目:SwarmOps    作者:staugur    | 项目源码 | 文件源码
def _JoinSwarm(self, node_ip, role, swarm):
        """Join the node at ``node_ip`` to ``swarm``.

        The join token is looked up by ``role`` in the token data of the
        swarm's leader. The join request goes to the Docker API on
        ``node_ip`` at ``self.port``.

        :param node_ip: address of the joining node; also its advertise address
        :param role: key used to pick the join token
        :param swarm: swarm description; ``swarm["manager"]`` supplies the
            remote addresses
        :return: the result of ``swarm.join()``, or ``False`` on an API error
        """
        # NOTE(review): a missing role yields the literal string "Worker" as the
        # token, not the worker token - confirm this is intended.
        token = self._checkSwarmToken(self._checkSwarmLeader(swarm)).get(role, "Worker")
        client = docker.DockerClient(base_url="tcp://{}:{}".format(node_ip, self.port), version="auto", timeout=self.timeout)

        try:
            res = client.swarm.join(remote_addrs=swarm["manager"], listen_addr="0.0.0.0", advertise_addr=node_ip, join_token=token)
        except docker.errors.APIError as e:
            # "as" works on Python 2.6+ and 3; the old comma form is Python 2 only.
            logger.error(e, exc_info=True)
            return False
        else:
            return res
项目:SwarmOps    作者:staugur    | 项目源码 | 文件源码
def _LeaveSwarm(self, node_ip, force=False):
        """Make the node at ``node_ip`` leave its swarm.

        The request goes to the Docker API on ``node_ip`` at ``self.port``.

        :param node_ip: address of the node that should leave
        :param force: passed through to ``swarm.leave()``
        :return: the result of ``swarm.leave()``, or ``False`` on an API error
        """
        client = docker.DockerClient(base_url="tcp://{}:{}".format(node_ip, self.port), version="auto", timeout=self.timeout)

        try:
            res = client.swarm.leave(force=force)
        except docker.errors.APIError as e:
            # "as" works on Python 2.6+ and 3; the old comma form is Python 2 only.
            logger.error(e, exc_info=True)
            return False
        else:
            return res
项目:genorch    作者:genorch    | 项目源码 | 文件源码
def add_manager(self, vm):
            """
            Join ``vm`` to the swarm of ``self.init`` using the ``Manager`` token.

            The token is read from the database row of ``self.init``.

            :param vm: VM description; ``vm['id']`` identifies the Docker host
            """
            api_url = ('tcp://' + common.translate_id(vm['id'])[0]
                       + ':' + cfg.docker['API_PORT'])
            swarm_client = docker.DockerClient(api_url).swarm

            first_manager = db.vms.get(where('name') == self.init['id'])
            remote_addrs = [common.id_to_swarm(self.init['id'])]
            join_token = first_manager['docker']['join_tokens']['Manager']
            listen_addr = '0.0.0.0:' + cfg.docker['SWARM_PORT']

            swarm_client.join(remote_addrs, join_token, listen_addr)
项目:genorch    作者:genorch    | 项目源码 | 文件源码
def add_worker(self, vm):
            """
            Join ``vm`` to the swarm of ``self.init`` using the ``Worker`` token.

            The token is read from the database row of ``self.init``.

            :param vm: VM description; ``vm['id']`` identifies the Docker host
            """
            api_url = ('tcp://' + common.translate_id(vm['id'])[0]
                       + ':' + cfg.docker['API_PORT'])
            swarm_client = docker.DockerClient(api_url).swarm

            first_manager = db.vms.get(where('name') == self.init['id'])
            remote_addrs = [common.id_to_swarm(self.init['id'])]
            join_token = first_manager['docker']['join_tokens']['Worker']
            listen_addr = '0.0.0.0:' + cfg.docker['SWARM_PORT']

            swarm_client.join(remote_addrs, join_token, listen_addr)
项目:GAZE    作者:monokal    | 项目源码 | 文件源码
def __init__(self):
        """Create the logger and a Docker client for the local Docker socket.

        Logs the failure and exits with status 1 if creating the client
        raises ``docker.errors.APIError``.
        """
        self.log = GazeLog()

        # Instantiate a Docker client.
        try:
            client = docker.DockerClient(base_url='unix://var/run/docker.sock')
        except docker.errors.APIError:
            self.log("Failed to instantiate the Docker client.", 'exception')
            sys.exit(1)
        else:
            self.docker_client = client
项目:GAZE    作者:monokal    | 项目源码 | 文件源码
def __init__(self):
        """Set up logging and connect a Docker client to the local socket.

        When the client cannot be created because of a
        ``docker.errors.APIError``, the failure is logged and the process
        exits with status 1.
        """
        self.log = GazeLog()

        # Instantiate a Docker client.
        try:
            client = docker.DockerClient(base_url='unix://var/run/docker.sock')
        except docker.errors.APIError:
            self.log("Failed to instantiate a Docker client.", 'exception')
            sys.exit(1)
        else:
            self.docker_client = client
项目:GAZE    作者:monokal    | 项目源码 | 文件源码
def __init__(self):
        """Prepare the logger and the Docker client used by this object.

        The client talks to ``unix://var/run/docker.sock``. A
        ``docker.errors.APIError`` during creation is logged, after which the
        process exits with status 1.
        """
        self.log = GazeLog()

        # Instantiate a Docker client.
        try:
            client = docker.DockerClient(base_url='unix://var/run/docker.sock')
        except docker.errors.APIError:
            self.log("Failed to instantiate the Docker client.", 'exception')
            sys.exit(1)
        else:
            self.docker_client = client
项目:son-cli    作者:sonata-nfv    | 项目源码 | 文件源码
def get_docker_api(self, docker_api):
        """
        Return a Docker client for the requested API endpoint.

        :param docker_api: ``'local'`` for ``docker.from_env()``, otherwise
            the base URL of a remote API, e.g. ``tcp://127.0.0.1:1234``
        :return: the Docker client
        """
        if docker_api != 'local':
            # connect to remote docker api eg. tcp://127.0.0.1:1234
            return docker.DockerClient(base_url=docker_api)

        # connect to local docker api
        return docker.from_env()
项目:seaworthy    作者:praekeltfoundation    | 项目源码 | 文件源码
def test_custom_client(self):
        """
        A DockerHelper that is given its own client must keep using exactly
        that client object.
        """
        custom_client = docker.DockerClient(base_url='unix://var/run/docker.sock')

        helper = self.make_helper(client=custom_client)

        self.assertIs(helper._client, custom_client)
项目:proxysql-tools    作者:twindb    | 项目源码 | 文件源码
def docker_client():
    """Return a ``docker.DockerClient`` created with ``version="auto"``."""
    client = docker.DockerClient(version="auto")
    return client
项目:zimfarm    作者:openzim    | 项目源码 | 文件源码
def generate(self, client: DockerClient, config: {}):
        """
        Step 2: run mwoffliner in a container and report its output.

        The command line is built from ``config`` plus fixed ``redis`` and
        ``outputDirectory`` parameters. The container's output and the elapsed
        wall-clock seconds are passed to ``self.step_finished``.

        Environment variables read: ``CPU_QUOTA`` (a fraction between 0 and 1,
        exclusive), ``MEM_LIMIT`` and ``SHOW_COMMAND``.

        :param client: Docker client used to run the container
        :param config: mwoffliner parameters; not modified
        """
        # command to be executed in the container
        def get_command(params: {}):
            # Work on a copy so the caller's config dict is left untouched.
            params = dict(params, redis='redis://redis', outputDirectory='/output')
            parts: [str] = []
            for key, value in params.items():
                if isinstance(value, bool):
                    # NOTE(review): a False value still emits the bare flag -
                    # confirm that this is intended.
                    parts.append('--{name}'.format(name=key))
                else:
                    parts.append('--{name}={value}'.format(name=key, value=value))
            return 'mwoffliner {}'.format(' '.join(parts))

        # mwoffliner docker run options
        def get_options():
            options = {
                'name': '_'.join([self.project_name, self.request.id]),
                'remove': True,
                'links': {self.host_redis_name: 'redis'},
                'stdout': True,
                'stderr': True,
                'volumes': {self.offliner_output_path: {'bind': '/output', 'mode': 'rw'}},
            }

            cpu_quota = os.getenv('CPU_QUOTA')
            if cpu_quota is not None:
                cpu_quota = float(cpu_quota)
                if 0.0 < cpu_quota < 1.0:
                    # Fraction of a CPU, scaled by 1,000,000.
                    options['cpu_quota'] = int(cpu_quota * 1000000)

            mem_limit = os.getenv('MEM_LIMIT')
            if mem_limit is not None:
                options['mem_limit'] = mem_limit

            return options

        self.current_index = 2
        self.step_started()

        command = get_command(config)
        if os.getenv('SHOW_COMMAND', False):
            self.logger.info('Exec: {}'.format(command))

        start_time = datetime.utcnow()

        # TODO: - when container returns none zero exit code, aka when docker.errors.ContainerError is raized result is a `bytes` object and it is not json serializeable. Also, we need to define what to do to recover the task
        result = client.containers.run('openzim/mwoffliner', command, **get_options()).decode('utf-8')
        # total_seconds() counts whole days too; timedelta.seconds would wrap
        # around for runs longer than 24 hours.
        elapsed_seconds = int((datetime.utcnow() - start_time).total_seconds())
        self.step_finished(True, {
            'stdout': result,
            'elapsed_seconds': elapsed_seconds
        })

        # get output zim file name
        # for content in self.worker_running_output_path.joinpath(self.request.id).iterdir():
        #     if not content.is_dir():
        #         self.zim_file_name = str(content.parts[-1])
项目:ansible-playbook-bundle    作者:ansibleplaybookbundle    | 项目源码 | 文件源码
def cmdrun_push(**kwargs):
    """Push the APB of ``kwargs['base_path']`` to the registry or the broker.

    With ``kwargs['openshift']`` set, the image is built, pushed to the
    internal registry and the broker is bootstrapped. Otherwise the
    base64-encoded spec is posted to the broker's ``/apb/spec`` route. Unless
    ``kwargs['no_relist']`` is set, the service broker is relisted afterwards.

    Keys read from ``kwargs``: ``base_path``, ``broker``, ``openshift``,
    ``reg_namespace``, ``reg_svc_name``, ``namespace``, ``dockerfile``,
    ``verify``, ``no_relist`` and the optional ``basic_auth_username`` /
    ``basic_auth_password``.

    :raises Exception: when the registry service IP cannot be determined
    :raises docker.errors.DockerException: re-raised after printing a hint

    Exits the process with status 1 when the broker does not answer 200.
    """
    project = kwargs['base_path']
    spec = get_spec(project, 'string')
    dict_spec = get_spec(project, 'dict')
    blob = base64.b64encode(spec)
    broker = kwargs["broker"]
    if broker is None:
        broker = get_asb_route()
    data_spec = {'apbSpec': blob}
    print(spec)

    if kwargs['openshift']:
        namespace = kwargs['reg_namespace']
        service = kwargs['reg_svc_name']
        # Assume we are using internal registry, no need to push to broker
        registry = get_registry_service_ip(namespace, service)
        if registry is None:
            print("Failed to find registry service IP address.")
            raise Exception("Unable to get registry IP from namespace %s" % namespace)
        tag = registry + "/" + kwargs['namespace'] + "/" + dict_spec['name']
        print("Building image with the tag: " + tag)
        try:
            client = docker.DockerClient(base_url='unix://var/run/docker.sock', version='auto')
            client.images.build(path=project, tag=tag, dockerfile=kwargs['dockerfile'])
            openshift_config.load_kube_config()
            token = openshift_client.configuration.api_key['authorization'].split(" ")[1]
            client.login(username="unused", password=token, registry=registry, reauth=True)
            client.images.push(tag)
            print("Successfully pushed image: " + tag)
            bootstrap(broker, kwargs.get("basic_auth_username"),
                      kwargs.get("basic_auth_password"), kwargs["verify"])
        except docker.errors.APIError:
            # APIError derives from DockerException, so it has to be handled
            # first; listed second, this handler could never run.
            print("Failed to login to the docker API.")
            raise
        except docker.errors.DockerException:
            print("Error accessing the docker API. Is the daemon running?")
            raise

    else:
        response = broker_request(kwargs["broker"], "/apb/spec", "post", data=data_spec,
                                  verify=kwargs["verify"],
                                  basic_auth_username=kwargs.get("basic_auth_username"),
                                  basic_auth_password=kwargs.get("basic_auth_password"))

        if response.status_code != 200:
            print("Error: Attempt to add APB to the Broker returned status: %d" % response.status_code)
            print("Unable to add APB to Ansible Service Broker.")
            exit(1)

        print("Successfully added APB to Ansible Service Broker")

    if not kwargs['no_relist']:
        relist_service_broker(kwargs)
项目:badwolf    作者:bosondata    | 项目源码 | 文件源码
def build_log(sha):
    """Serve the build log of commit ``sha``.

    Lookup order:

    1. ``build.html`` directly in the commit's log directory (old layout)
    2. ``build.html`` in the ``task_id`` sub directory (new layout)
    3. a live stream of the logs of the first running container labelled
       with the ``task_id``

    The ``task_id`` comes from the request's query arguments. The request is
    aborted with 404 when it is missing (after step 1) or when no running
    container matches.
    """
    task_id = request.args.get('task_id')
    app_config = current_app.config
    commit_log_dir = os.path.join(app_config['BADWOLF_LOG_DIR'], sha)

    # old log path
    if os.path.exists(os.path.join(commit_log_dir, 'build.html')):
        return send_from_directory(commit_log_dir, 'build.html')

    if not task_id:
        abort(404)

    # new log path
    task_log_dir = os.path.join(commit_log_dir, task_id)
    if os.path.exists(os.path.join(task_log_dir, 'build.html')):
        return send_from_directory(task_log_dir, 'build.html')

    # Try realtime logs
    client = DockerClient(
        base_url=app_config['DOCKER_HOST'],
        timeout=app_config['DOCKER_API_TIMEOUT'],
        version='auto',
    )
    running = client.containers.list(filters={
        'status': 'running',
        'label': 'task_id={}'.format(task_id),
    })
    if not running:
        abort(404)

    # TODO: ensure only 1 container matched task_id
    container = running[0]

    def _streaming_gen():
        yield '<style>{}</style>'.format(deansi.styleSheet())
        yield FOLLOW_LOG_JS
        yield '<div class="ansi_terminal">'
        pending = []
        log_stream = container.logs(stdout=True, stderr=True, stream=True, follow=True)
        for piece in log_stream:
            text = str(piece)
            pending.append(text)
            if text != '\n':
                continue
            # A bare newline piece ends the line: flush what was collected.
            yield deansi.deansi(''.join(pending))
            pending = []
        if pending:
            yield deansi.deansi(''.join(pending))
        yield '</div>'

    return Response(_streaming_gen(), mimetype='text/html;charset=utf-8')
项目:badwolf    作者:bosondata    | 项目源码 | 文件源码
def _cancel_outdated_pipelines(context):
    """Cancel running pipelines made obsolete by ``context``.

    Running containers labelled with the context's repository are inspected.
    A container is skipped when the context is a tag, when its
    ``pull_request`` or ``branch`` label does not match the context, when it
    has no ``task_id`` label, or when no uncancelled future is registered for
    that task. For the rest, the container is force-removed and the future
    cancelled.

    :param context: context of the new build
    :raises docker.errors.APIError: when removing a container fails for a
        reason other than a removal that is already in progress
    """
    from docker.errors import NotFound, APIError

    app_config = current_app.config
    client = DockerClient(
        base_url=app_config['DOCKER_HOST'],
        timeout=app_config['DOCKER_API_TIMEOUT'],
        version='auto',
    )
    running = client.containers.list(filters={
        'status': 'running',
        'label': 'repo={}'.format(context.repository),
    })
    if not running:
        return

    for container in running:
        labels = container.labels

        unrelated = (
            context.type == 'tag'
            or (context.pr_id and labels.get('pull_request') != str(context.pr_id))
            or (context.type == 'branch'
                and labels.get('branch') != context.source['branch']['name'])
        )
        if unrelated:
            continue

        task_id = labels.get('task_id')
        if not task_id:
            continue

        future = _RUNNING_PIPELINES.get(task_id)
        if not future or future.cancelled():
            continue

        commit = labels['commit']
        if context.pr_id:
            logger.info('Cancelling outdated pipeline for %s pull request #%s @%s',
                        context.repository,
                        context.pr_id,
                        commit)
        else:
            logger.info('Cancelling outdated pipeline for %s @%s', context.repository, commit)

        # cancel the future and remove the container
        try:
            container.remove(force=True)
        except NotFound:
            # The container is already gone; nothing left to remove.
            pass
        except APIError as exc:
            if 'already in progress' not in exc.explanation:
                raise
        future.cancel()
项目:cmdchallenge-site    作者:jarv    | 项目源码 | 文件源码
def output_from_cmd(cmd, challenge, docker_version=None, docker_base_url=None, tls_settings=None):
    """Run ``cmd`` for ``challenge`` in a container and return the parsed JSON output.

    The command is base64-encoded and handed to ``/ro_volume/runcmd`` in the
    ``registry.gitlab.com/jarv/cmdchallenge`` image. The working directory is
    the challenge's slug under ``BASE_WORKING_DIR``. The run is wrapped in a
    ``DOCKER_TIMEOUT`` second timeout.

    When the ``DOCKER_MACHINE_NAME`` environment variable is set, the client
    is configured from the environment; otherwise from the arguments.

    :param cmd: command to execute
    :param challenge: challenge description; ``challenge['slug']`` is used
    :param docker_version: API version for the client
    :param docker_base_url: base URL for the client
    :param tls_settings: keyword arguments for ``docker.tls.TLSConfig``
    :return: the decoded JSON output of the container
    :raises DockerValidationError: on any Docker, timeout or decoding failure,
        or when the output contains an ``Error`` entry
    """
    tls_config = docker.tls.TLSConfig(**tls_settings) if tls_settings else None

    if environ.get('DOCKER_MACHINE_NAME') is not None:
        client = docker.DockerClient(**kwargs_from_env(assert_hostname=False))
    else:
        client = docker.DockerClient(version=docker_version, base_url=docker_base_url, tls=tls_config)

    b64cmd = b64encode(cmd)
    slug = challenge['slug']
    challenge_dir = path.join(BASE_WORKING_DIR, slug)
    docker_cmd = "/ro_volume/runcmd -slug {slug} {b64cmd}".format(
        slug=slug,
        b64cmd=b64cmd)

    with timeout(seconds=DOCKER_TIMEOUT):
        try:
            LOG.warn("Running `{}` in container".format(docker_cmd))
            output = client.containers.run('registry.gitlab.com/jarv/cmdchallenge', docker_cmd, working_dir=challenge_dir, **DOCKER_OPTS)
        except SSLError:
            LOG.exception("SSL validation error connecting to {}".format(docker_base_url))
            raise DockerValidationError("SSL Error")
        except ContainerError as err:
            LOG.exception("Container error")
            raise DockerValidationError("There was a problem executing the command, return code: {}".format(err.exit_status))
        except NotFound as err:
            LOG.exception("NotFound error")
            raise DockerValidationError(err.explanation)
        except CommandTimeoutError:
            LOG.exception("CommandTimeout error")
            raise DockerValidationError("Command timed out")
        except APIError:
            LOG.exception("Docker API error")
            raise DockerValidationError("Docker API error")
        except ConnectionError:
            LOG.exception("Docker ConnectionError")
            raise DockerValidationError("Docker connection error")

        try:
            output_json = json.loads(output)
        except ValueError:
            LOG.exception("JSON decode error")
            raise DockerValidationError("Command failure")

    if 'Error' in output_json:
        LOG.error("Command execution error: {}".format(output_json['Error']))
        raise DockerValidationError("Command execution error")
    return output_json
项目:docker-testnet    作者:l-n-s    | 项目源码 | 文件源码
def main():
    """Run the interactive ``testnet>`` prompt.

    A Docker client for the local socket is created and wrapped in a
    ``Testnet``. The ``I2PD_IMAGE``, ``NETNAME`` and ``DEFAULT_ARGS``
    environment variables override the matching ``Testnet`` attributes when
    set. Commands are then read and dispatched until ``quit`` is entered or
    the input ends (EOF or Ctrl-C).
    """
    # cli = docker.from_env()
    cli = docker.DockerClient(base_url='unix://var/run/docker.sock', 
            version='auto')
    testnet = Testnet(cli)

    # Optional overrides from the environment.
    if os.getenv("I2PD_IMAGE"):
        testnet.I2PD_IMAGE = os.getenv("I2PD_IMAGE")
    if os.getenv("NETNAME"):
        testnet.NETNAME = os.getenv("NETNAME")
    if os.getenv("DEFAULT_ARGS"):
        testnet.DEFAULT_ARGS = os.getenv("DEFAULT_ARGS")

    while 1:
        try:
            inpt = prompt('testnet> ', history=history, 
                    auto_suggest=AutoSuggestFromHistory(),
                    completer=TestnetCompleter)
            # Empty input: just prompt again.
            if not inpt: continue
        except (EOFError, KeyboardInterrupt):
            # Leaving the prompt does not stop anything; only warn about it.
            if testnet.NODES:
                warnings.warn("Testnet containers are still running")
            break

        # First word is the command name, the rest are its arguments.
        command = inpt.split()

        if command[0]   == "help":
            print_help()
        elif command[0] == "stop" or command[0] == "quit":
            # "quit" stops the testnet first and then leaves the loop.
            stop(testnet)
            if command[0] == "quit": break
        elif command[0] == "start":
            start(testnet)
        elif command[0] == "stats":
            stats(testnet)
        elif command[0] == "add":
            args = command[1:] if len(command) > 1 else []
            add(testnet, *args)
        elif command[0] == "create_tunnel":
            # Silently ignored unless at least six arguments follow the command.
            if len(command) < 7: continue
            create_tunnel(testnet, command[1], command[2], command[3:])
        elif command[0] == "remove":
            # Needs at least one argument.
            if len(command) < 2: continue
            remove(testnet, command[1:])
        elif command[0] == "inspect":
            # Needs exactly one argument.
            if len(command) != 2: continue
            inspect(testnet, command[1])
        elif command[0] == "root":
            # Drops into the debugger.
            import pdb; pdb.set_trace()