Python docker 模块,Client() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用docker.Client()

项目:data_pipeline    作者:Yelp    | 项目源码 | 文件源码
def get_container_info(cls, project, service, docker_client=None):
        """Returns container information for the first container matching project
        and service.
        Raises ContainerUnavailableError if the container is unavailable.

        Args:
            project: Name of the project the container is hosting
            service: Name of the service that the container is hosting
            docker_client: The docker client object
        """
        if docker_client is None:
            docker_client = Client(version='auto')

        try:
            return next(
                c for c in docker_client.containers() if
                c['Labels'].get('com.docker.compose.project') == project and
                c['Labels'].get('com.docker.compose.service') == service
            )
        except StopIteration:
            raise ContainerUnavailableError(project=project, service=service)
项目:data_pipeline    作者:Yelp    | 项目源码 | 文件源码
def exec_command(cls, command, project, service, timeout_seconds=60):
        """Executes the command in the project and service container running under
        docker-compose.
        Throws ContainerUnavailableError if the retry limit is reached.

        Args:
            command: The command to be executed in the container
            project: Name of the project the container is hosting
            service: Name of the service that the container is hosting
            timeout_seconds: Retry time limit to wait for containers to start
                             Default in seconds: 60
        """
        docker_client = Client(version='auto')
        container_id = cls._get_container_id(project, service, docker_client, timeout_seconds)
        exec_id = docker_client.exec_create(container_id, command)['Id']
        docker_client.exec_start(exec_id)
项目:kliko    作者:gijzelaerr    | 项目源码 | 文件源码
def command_line_run(argv):
    format = '%(asctime)-15s %(message)s'
    logging.basicConfig(level=logging.INFO, format=format)
    image_name = first_parser(argv)
    logger.info("starting image {}".format(image_name))
    config = docker.utils.kwargs_from_env()
    docker_client = docker.Client(**config)
    raw_kliko_data = extract_params(docker_client, image_name)
    kliko_data = validate_kliko(yaml.safe_load(raw_kliko_data))
    parameters, input_path, output_path, work_path = second_parser(argv, kliko_data)
    paths = {
        'input': input_path,
        'output': output_path,
        'work': work_path,
    }
    kliko_runner(kliko_data=kliko_data, parameters=parameters,
                 docker_client=docker_client,
                 image_name=image_name, paths=paths)
项目:contrail-ansible    作者:Juniper    | 项目源码 | 文件源码
def main():
    argument_spec = dict(
        name=dict(required=False, type='list', default=[]),
        api_version=dict(required=False, type='str', default='auto')
    )

    module = AnsibleModule(argument_spec=argument_spec)

    results = dict(changed=False, _containers=[])
    client = docker.Client(version=module.params.get('api_version'))
    containers = client.containers()
    names = module.params.get('name')
    if names and not isinstance(names, list):
        names = [names]
    for container in containers:
        for container_name in container['Names']:
            # remove '/' prefix character
            container_name = container_name[1:]
            if names and container_name not in names:
                continue
            results['_containers'].append(container)
            results[container_name] = container
    module.exit_json(**results)
项目:shipy    作者:containscafeine    | 项目源码 | 文件源码
def start(self, client, cid):
        """
        shipy equivalent of docker start.
        Starts a stopped container.
        :param client: docker.Client instance
        :param cid: ID of the container to be started
        :return: ID of the started container, False otherwise
        """
        sane_start = {}
        sane_start.update({'container': cid})
        try:
            self.logger.debug('Running container {}'.format(cid))
            client.start(**sane_start)
            return cid
        except Exception as e:
            self.logger.debug(e)
            return False
项目:shipy    作者:containscafeine    | 项目源码 | 文件源码
def create(self, client, sane_create):
        """
        shipy equivalent of docker create.
        Creates a container with the supplied config.
        :param client: docker.Client instance
        :param sane_create: parsed and sanified input to shipy
        :return: ID of the container created
        """
        if len(sane_create['image'].split(':')) == 1:
            self.logger.debug('No tag provided, using tag latest')
            sane_create.update({'image': '{}:latest'.
                               format(sane_create['image'])})

        sane_create, host_config = self._host_config_gen(client, sane_create)
        if host_config:
            sane_create.update({'host_config': host_config})

        self.logger.debug('Creating container')
        container_info = client.create_container(**sane_create)
        return container_info['Id']
项目:shipy    作者:containscafeine    | 项目源码 | 文件源码
def ps(self, client, sane_input):
        """
        shipy equivalent of docker ps.
        Lists the containers on the system.
        :param client: docker.Client instance
        :param sane_input: parsed and sanified input to shipy
        :return: currently same output as client.containers(**kwargs)
        """

        ps_output = client.containers(**sane_input)
        for container in ps_output:
            self.logger.debug('Name: {}, ID: {}'.format(
                container['Names'][0].split('/')[1],
                container['Id'][:8]))

        return ps_output
项目:shipy    作者:containscafeine    | 项目源码 | 文件源码
def kill(self, client, sane_input):
        """
        shipy equivalent of docker kill.
        Kills a running container.
        :param client: docker.Client instance
        :param sane_input: parsed and sanified input to shipy
        :return: True if container killed, False otherwise
        """

        try:
            client.kill(**sane_input)
            self.logger.debug('Killed container {}'.format(sane_input['container']))
            return True
        except errors.NotFound:
            self.logger.debug('Container {} not found.'
                              .format(sane_input['container']))
            return False
项目:shipy    作者:containscafeine    | 项目源码 | 文件源码
def rm(self, client, sane_input):
        """
        shipy equivalent of docker rm.
        Removes the given container.
        :param client: docker.Client instance
        :param sane_input: parsed and sanified input to shipy
        :return: True if container removed, False otherwise
        """

        try:
            client.remove_container(**sane_input)
            self.logger.debug('Removed container {}'
                              .format(sane_input['container']))
            return True
        except errors.NotFound:
            self.logger.debug('Container {} not found.'
                              .format(sane_input['container']))
            return False
        except Exception as e:
            self.logger.debug(e.message)
            return False
项目:shipy    作者:containscafeine    | 项目源码 | 文件源码
def restart(self, client, sane_input):
        """
        shipy equivalent of docker restart.
        Restarts a running docker container.
        :param client: docker.Client instance
        :param sane_input: parsed and sanified input to shipy
        :return: True if container restarted, False otherwise
        """

        try:
            client.restart(**sane_input)
            self.logger.debug('Restarted container {}'
                              .format(sane_input['container']))
            return True

        except errors.NotFound:
            self.logger.debug('Could not find container {}'.format(
                sane_input['container']))
            return False
项目:cc-server    作者:curious-containers    | 项目源码 | 文件源码
def __init__(self, config, tee, node_name, node_config):
        self._config = config
        self._tee = tee

        self.node_name = node_name
        self.node_config = node_config

        self._thread_limit = Semaphore(self._config.docker['thread_limit'])

        tls = False
        if self.node_config.get('tls'):
            tls = docker.tls.TLSConfig(**self.node_config['tls'])

        try:
            client_class = docker.APIClient
        except AttributeError:
            client_class = docker.Client
            self._tee('Node {}: Fallback to old docker-py Client.'.format(self.node_name))

        self.client = client_class(
            base_url=self.node_config['base_url'],
            tls=tls,
            timeout=self._config.docker.get('api_timeout'),
            version='auto'
        )
项目:spc    作者:whbrewer    | 项目源码 | 文件源码
def get_docker():
    user = root.authorized()
    params = {}

    try:
        cli = docker.Client(base_url=base_url)
        images = cli.images()
        conts = cli.containers(all=True)
    except:
        images = []
        conts = []
        params['alert'] = "ERROR: there was a problem talking to the Docker daemon..."

    params['user'] = user
    params['app'] = root.active_app()

    if request.query.alert:
        params['alert'] = request.query.alert

    return template('docker', params, images=images, containers=conts)
项目:py-cloud-compute-cannon    作者:Autodesk    | 项目源码 | 文件源码
def kwargs_from_client(client, assert_hostname=False):
    """
    More or less stolen from docker-py's kwargs_from_env
    https://github.com/docker/docker-py/blob/c0ec5512ae7ab90f7fac690064e37181186b1928/docker/utils/utils.py
    :type client : docker.Client
    """
    from docker import tls
    if client.base_url == 'http+docker://localunixsocket':
        return {'base_url': 'unix://var/run/docker.sock'}

    params = {'base_url': client.base_url}
    if client.cert:
        # TODO: problem - client.cert is filepaths, and it would be insecure to send those files.
        params['tls'] = tls.TLSConfig(
            client_cert=client.cert,
            ca_cert=client.verify,
            verify=bool(client.verify),
            assert_hostname=assert_hostname)

    return params
项目:incubator-airflow-old    作者:apache    | 项目源码 | 文件源码
def test_execute_tls(self, client_class_mock, tls_class_mock):
        client_mock = mock.Mock(spec=Client)
        client_mock.create_container.return_value = {'Id': 'some_id'}
        client_mock.create_host_config.return_value = mock.Mock()
        client_mock.images.return_value = []
        client_mock.logs.return_value = []
        client_mock.pull.return_value = []
        client_mock.wait.return_value = 0

        client_class_mock.return_value = client_mock
        tls_mock = mock.Mock()
        tls_class_mock.return_value = tls_mock

        operator = DockerOperator(docker_url='tcp://127.0.0.1:2376', image='ubuntu',
                                  owner='unittest', task_id='unittest', tls_client_cert='cert.pem',
                                  tls_ca_cert='ca.pem', tls_client_key='key.pem')
        operator.execute(None)

        tls_class_mock.assert_called_with(assert_hostname=None, ca_cert='ca.pem',
                                          client_cert=('cert.pem', 'key.pem'), ssl_version=None,
                                          verify=True)

        client_class_mock.assert_called_with(base_url='https://127.0.0.1:2376', tls=tls_mock,
                                             version=None)
项目:incubator-airflow-old    作者:apache    | 项目源码 | 文件源码
def test_execute_unicode_logs(self, client_class_mock):
        client_mock = mock.Mock(spec=Client)
        client_mock.create_container.return_value = {'Id': 'some_id'}
        client_mock.create_host_config.return_value = mock.Mock()
        client_mock.images.return_value = []
        client_mock.logs.return_value = ['unicode container log ??']
        client_mock.pull.return_value = []
        client_mock.wait.return_value = 0

        client_class_mock.return_value = client_mock

        originalRaiseExceptions = logging.raiseExceptions
        logging.raiseExceptions = True

        operator = DockerOperator(image='ubuntu', owner='unittest', task_id='unittest')

        with mock.patch('traceback.print_exception') as print_exception_mock:
            operator.execute(None)
            logging.raiseExceptions = originalRaiseExceptions
            print_exception_mock.assert_not_called()
项目:LMDocker-project    作者:xiaozhazi    | 项目源码 | 文件源码
def docker_py_check():
    global docker_api_version
    if isBlank(docker_api_version):
        logging.error('Error: criu check failed [inner issue occurs].')

    cli = Client(version=docker_api_version)
    to_json = json.dumps(cli.info())
    json_info = json.loads(to_json)
    if json_info['Driver'] != 'aufs':
        logging.error('Error: now we just support aufs.')
        return False
    logging.debug(json_info['OperatingSystem'] +'.')
    print(json_info['KernelVersion'])
    logging.debug('docker py works')
    return True

#----check the kernel version----#
项目:LMDocker-project    作者:xiaozhazi    | 项目源码 | 文件源码
def get_container_info(container_name):
    cli = Client(version='1.21')
    out = cli.inspect_container(container_name)

    if 'Error' in out:
        logging.error('Error: Get container id Failed')
        return None,None

    image = out['Config']['Image']
    image_id = out['Image']
    label = container_name + '-' + image + '-' + image_id
    logging.info(label)
    pid = out['State']['Pid']
    logging.info(pid)

    return out['Id'],label,pid


#----Check whether the container is running or not.----#
项目:docker_based_cloudlet    作者:hixichen    | 项目源码 | 文件源码
def get_con_info(name):
    cli = Client(version='1.21')
    out = cli.inspect_container(name)
    if 'Error' in out:
        logging.error('get container id failed')
        return None, None

    image = out['Config']['Image']
    image_id = out['Image']
    label = name + '-' + image + '-' + image_id
    logging.info(label)

    # get pid.
    pid = out['State']['Pid']
    logging.info(pid)

    return out['Id'], label, pid
项目:schematizer    作者:Yelp    | 项目源码 | 文件源码
def _ensure_containers_up(project, service):
    docker_client = Client(version='auto')
    service_container = docker_client.containers(
        filters={
            'label': ["com.docker.compose.project={}".format(project),
                      "com.docker.compose.service={}".format(service)]
        }
    )[0]
    container_id = service_container['Id']

    timeout_in_seconds = 5
    timed_out = time.time() + timeout_in_seconds
    while time.time() < timed_out:
        inspect_info = docker_client.inspect_container(container_id)
        container_status = inspect_info['State']['Status']
        if container_status == 'running':
            return container_id
        time.sleep(0.5)
    raise Exception("Unable to start up Schematizer container.")
项目:voltha    作者:opencord    | 项目源码 | 文件源码
def get_my_containers_name():
    """
    Return the docker containers name in which this process is running.
    To look up the container name, we use the container ID extracted from the
    $HOSTNAME environment variable (which is set by docker conventions).
    :return: String with the docker container name (or None if any issue is
             encountered)
    """
    my_container_id = os.environ.get('HOSTNAME', None)

    try:
        docker_cli = Client(base_url=docker_socket)
        info = docker_cli.inspect_container(my_container_id)

    except Exception, e:
        log.exception('failed', my_container_id=my_container_id, e=e)
        raise

    name = info['Name'].lstrip('/')

    return name
项目:voltha    作者:opencord    | 项目源码 | 文件源码
def create_networking_config(name, links):
    """
    Creates a container networks based on a set of containers.
    :param name: the network name
    :param links: the set of containers to link
    :return: a network configuration
    """
    try:
        docker_cli = Client(base_url=docker_socket)
        networking_config = docker_cli.create_networking_config({
            name : docker_cli.create_endpoint_config(links=links)
        })
    except Exception, e:
        log.exception('failed-network-creation', name, e=e)
        raise

    return networking_config
项目:hyrise_rcm    作者:DukeHarris    | 项目源码 | 文件源码
def __init__(self, url):
        self.url = url
        self.consul_url = "consul://vm-imdmresearch-keller-06.eaalab.hpi.uni-potsdam.de:8500"

        self.nodes = []
        self.instances = []

        self.workload_is_set = False
        self.throughput = 0

        self.docker = Client(base_url='tcp://vm-imdmresearch-keller-01.eaalab.hpi.uni-potsdam.de:8888', timeout=180) #TODO
        self.lock = threading.Lock()

        self.dispatcher_url = "" #internal ip
        self.master_url = ""
        self.dispatcher_node_url = ""
        self.dispatcher_ip = "" # external ip to receive queries
项目:giftwrap    作者:openstack    | 项目源码 | 文件源码
def _build_image(self):
        template_vars = {
            'commands': self._commands
        }
        dockerfile_contents = self._render_dockerfile(template_vars)

        tempdir = tempfile.mkdtemp()
        dockerfile = os.path.join(tempdir, 'Dockerfile')
        with open(dockerfile, "w") as w:
            w.write(dockerfile_contents)
        docker_client = docker.Client(base_url='unix://var/run/docker.sock',
                                      timeout=10)
        build_result = docker_client.build(path=tempdir, stream=True,
                                           tag=self.image_name)
        for line in build_result:
            LOG.info(line.strip())

    # I borrowed this from docker/stackbrew, should cull it down
    # to be more sane.
项目:jupyterhub-carina    作者:jupyter-attic    | 项目源码 | 文件源码
def client(self):
        """
        The Docker client used to connect to the user's Carina cluster
        """
        # TODO: Figure out how to configure this without overriding, or tweak a bit and call super
        if self._client is None:
            creds_dir = self.docker_config['DOCKER_CERT_PATH']
            api_endpoint = self.docker_config['DOCKER_HOST'].replace("tcp://", "https://")
            tls_config = docker.tls.TLSConfig(
                client_cert=(os.path.join(creds_dir, 'cert.pem'),
                             os.path.join(creds_dir, 'key.pem')),
                ca_cert=os.path.join(creds_dir, 'ca.pem'),
                verify=os.path.join(creds_dir, 'ca.pem'),
                assert_hostname=False)

            self._client = docker.Client(version='auto', tls=tls_config, base_url=api_endpoint)

        return self._client
项目:vagrant-ansible-docker-swarm    作者:jamesdmorgan    | 项目源码 | 文件源码
def main():
    '''
        Register / De-register containers that have
        CONSUL_SERVICE_PORT env variable defined
    '''
    args = handler_args()
    setup_logging(args.verbose)

    # create a docker client object that talks to the local docker daemon
    cli = docker.Client(base_url='unix://var/run/docker.sock')
    con = consul.Consul()

    logger.info("Consul notifier processing {0}".format(args.action))

    if args.action == 'stream':
        stream(cli, con)
    elif args.action in ['register', 'deregister']:
        s = Service(cli, con, args.name)
        s.handle(args.action)
    else:
        logger.error("Unknown action {0}".format(args.action))
        sys.exit(1)
项目:cello    作者:yeasy    | 项目源码 | 文件源码
def _clean_chaincode_images(daemon_url, name_prefix, timeout=5):
    """ Clean chaincode images, whose name should have cluster id as prefix

    :param daemon_url: Docker daemon url
    :param name_prefix: image name prefix
    :param timeout: Time to wait for the response
    :return: None
    """
    logger.debug("clean chaincode images with prefix={}".format(name_prefix))
    client = Client(base_url=daemon_url, version="auto", timeout=timeout)
    images = client.images()
    id_removes = [e['Id'] for e in images if e['RepoTags'][0].startswith(
        name_prefix)]
    if id_removes:
        logger.debug("chaincode image id to removes=" + ", ".join(id_removes))
    for _ in id_removes:
        client.remove_image(_, force=True)
项目:cello    作者:yeasy    | 项目源码 | 文件源码
def _clean_project_containers(daemon_url, name_prefix, timeout=5):
    """
    Clean cluster node containers and chaincode containers

    All containers with the name prefix will be removed.

    :param daemon_url: Docker daemon url
    :param name_prefix: image name prefix
    :param timeout: Time to wait for the response
    :return: None
    """
    logger.debug("Clean project containers, daemon_url={}, prefix={}".format(
        daemon_url, name_prefix))
    client = Client(base_url=daemon_url, version="auto", timeout=timeout)
    containers = client.containers(all=True)
    id_removes = [e['Id'] for e in containers if
                  e['Names'][0].split("/")[-1].startswith(name_prefix)]
    for _ in id_removes:
        client.remove_container(_, force=True)
        logger.debug("Remove container {}".format(_))
项目:cello    作者:yeasy    | 项目源码 | 文件源码
def start_containers(daemon_url, name_prefix, timeout=5):
    """Start containers with given prefix

    The chaincode container usually has name with `name_prefix-` as prefix

    :param daemon_url: Docker daemon url
    :param name_prefix: image name prefix
    :param timeout: Time to wait for the response
    :return: None
    """
    logger.debug("Get containers, daemon_url={}, prefix={}".format(
        daemon_url, name_prefix))
    client = Client(base_url=daemon_url, version="auto", timeout=timeout)
    containers = client.containers(all=True)
    id_cc = [e['Id'] for e in containers if
             e['Names'][0].split("/")[-1].startswith(name_prefix)]
    logger.info(id_cc)
    for _ in id_cc:
        client.start(_)


#  Deprecated
#  Normal chaincode container may also become exited temporarily
项目:cello    作者:yeasy    | 项目源码 | 文件源码
def _clean_exited_containers(daemon_url):
    """ Clean those containers with exited status

    This is dangerous, as it may delete temporary containers.
    Only trigger this when no one else uses the system.

    :param daemon_url: Docker daemon url
    :return: None
    """
    logger.debug("Clean exited containers")
    client = Client(base_url=daemon_url, version="auto")
    containers = client.containers(quiet=True, all=True,
                                   filters={"status": "exited"})
    id_removes = [e['Id'] for e in containers]
    for _ in id_removes:
        logger.debug("exited container to remove, id={}", _)
        try:
            client.remove_container(_)
        except Exception as e:
            logger.error("Exception in clean_exited_containers {}".format(e))
项目:cello    作者:yeasy    | 项目源码 | 文件源码
def check_daemon(daemon_url, timeout=5):
    """ Check if the daemon is active

    Only wait for timeout seconds.

    :param daemon_url: Docker daemon url
    :param timeout: Time to wait for the response
    :return: True for active, False for inactive
    """
    if not daemon_url or not daemon_url.startswith("tcp://"):
        return False
    segs = daemon_url.split(":")
    if len(segs) != 3:
        logger.error("Invalid daemon url = ", daemon_url)
        return False
    try:
        client = Client(base_url=daemon_url, version="auto", timeout=timeout)
        return client.ping() == 'OK'
    except Exception as e:
        logger.error("Exception in check_daemon {}".format(e))
        return False
项目:cello    作者:yeasy    | 项目源码 | 文件源码
def get_swarm_node_ip(swarm_url, container_name, timeout=5):
    """
    Detect the host ip where the given container locate in the swarm cluster

    :param swarm_url: Swarm cluster api url
    :param container_name: The container name
    :param timeout: Time to wait for the response
    :return: host ip
    """
    logger.debug("Detect container={} with swarm_url={}".format(
        container_name, swarm_url))
    try:
        client = Client(base_url=swarm_url, version="auto", timeout=timeout)
        info = client.inspect_container(container_name)
        return info['NetworkSettings']['Ports']['5000/tcp'][0]['HostIp']
    except Exception as e:
        logger.error("Exception happens when detect container host!")
        logger.error(e)
        return ''
项目:mysql_streamer    作者:Yelp    | 项目源码 | 文件源码
def get_mysql_conn():
    """
    This function instantiates a MySQL database container, which the tests use
    to execute given MySQL DDL statements such as CREATE TABLE or ALTER TABLE
    statement, etc., and then extract the table information from the db.
    """
    client = Client(version='auto')

    container_id = _create_container(client)
    try:
        client.start(container=container_id)
        container_host = _get_container_host(client, container_id)
        mysql_conn = _wait_for_mysql_ready(container_host)
        yield mysql_conn
    finally:
        if mysql_conn:
            mysql_conn.close()
        _cleanup_container(client, container_id)
项目:ops    作者:xiaomatech    | 项目源码 | 文件源码
def _client(self, req):
        base_url = req.get_param(name='h')
        tlscert = docker_config.get('tlscert')
        tlskey = docker_config.get('tlskey')
        tlscacert = docker_config.get('tlscacert')
        if tlscacert is None and tlscert is None and tlskey is None:
            return docker.Client(base_url=base_url)
        elif tlscacert is None and tlscert is not None and tlskey is not None:
            tls_config = docker.tls.TLSConfig(client_cert=(
                docker_config.get('tlscert'), docker_config.get('tlskey')))
        elif tlscacert is not None and tlscert is not None and tlskey is not None:
            tls_config = docker.tls.TLSConfig(
                client_cert=(docker_config.get('tlscert'),
                             docker_config.get('tlskey')),
                verify=docker_config.get('tlscacert'))
        elif tlscacert is not None and tlscert is None and tlskey is None:
            tls_config = docker.tls.TLSConfig(ca_cert=tlscacert)

        client = docker.Client(base_url=base_url, tls=tls_config)
        return client
项目:cmonitor    作者:Colstuwjx    | 项目源码 | 文件源码
def __init__(self, configs={}):
        socket = configs["socket"]

        if "tls" in configs:
            is_tls = True

            # config_path = '/etc/pki/CA'
            config_path = configs["tls"]["config_path"]
            cert = os.path.join(config_path, configs["tls"]["cert"])
            key = os.path.join(config_path, configs["tls"]["key"])
            ca = os.path.join(config_path, configs["tls"]["ca"])
        else:
            is_tls = False

        if not is_tls:
            client = docker.Client(base_url=socket)
        else:
            tls_config = docker.tls.TLSConfig(
                client_cert=(cert, key),
                verify=ca,
            )
            client = docker.Client(base_url=socket, tls=tls_config)
        self.client = client
        self.configs = configs
项目:data_pipeline    作者:Yelp    | 项目源码 | 文件源码
def get_container_ip_address(cls, project, service, timeout_seconds=100):
        """Fetches the ip address assigned to the running container.
        Throws ContainerUnavailableError if the retry limit is reached.

        Args:
            project: Name of the project the container is hosting
            service: Name of the service that the container is hosting
            timeout_seconds: Retry time limit to wait for containers to start
                             Default in seconds: 60
        """
        docker_client = Client(version='auto')
        container_id = cls._get_container_id(project, service, docker_client, timeout_seconds)
        return docker_client.inspect_container(
            container_id
        )['NetworkSettings']['IPAddress']
项目:benchmarks    作者:tensorflow    | 项目源码 | 文件源码
def _BuildAndPushDockerImage(
    docker_client, docker_file, name, tag, push_to_gcloud=False):
  """Builds a docker image and optionally pushes it to gcloud.

  Args:
    docker_client: docker.Client object.
    docker_file: Dockerfile path.
    name: name of the benchmark to build a docker image for.
    tag: tag for docker image.
    push_to_gcloud: whether to push the image to google cloud.

  Returns:
    Docker image identifier.
  """
  local_docker_image_with_tag = '%s:%s' % (name, tag)
  remote_docker_image = _DOCKER_IMAGE_PATTERN % name
  remote_docker_image_with_tag = '%s:%s' % (remote_docker_image, tag)
  if FLAGS.docker_context_dir:
    docker_context = os.path.join(
        os.path.dirname(__file__), FLAGS.docker_context_dir)
    docker_file_name = docker_file
  else:
    docker_context = os.path.dirname(docker_file)
    docker_file_name = os.path.basename(docker_file)

  built_image = docker_client.images.build(
      path=docker_context, dockerfile=docker_file_name,
      tag=local_docker_image_with_tag,
      pull=True)
  built_image.tag(remote_docker_image, tag=tag)
  if push_to_gcloud:
    subprocess.check_call(
        ['gcloud', 'docker', '--', 'push', remote_docker_image_with_tag])
  return remote_docker_image_with_tag
项目:eventdriventalk    作者:cachedout    | 项目源码 | 文件源码
def container_exists(container):
    d = docker.Client()
    return container in d.containers()
项目:shub-image    作者:scrapinghub    | 项目源码 | 文件源码
def get_docker_client(validate=True):
    """A helper to initiate Docker client"""
    try:
        import docker
    except ImportError:
        raise ImportError('You need docker-py installed for the cmd')

    docker_host = os.environ.get('DOCKER_HOST')
    tls_config = None
    if os.environ.get('DOCKER_TLS_VERIFY', False):
        tls_cert_path = os.environ.get('DOCKER_CERT_PATH')
        if not tls_cert_path:
            tls_cert_path = os.path.join(os.path.expanduser('~'), '.docker')
        apply_path_fun = lambda name: os.path.join(tls_cert_path, name)  # noqa
        tls_config = docker.tls.TLSConfig(
            client_cert=(apply_path_fun('cert.pem'),
                         apply_path_fun('key.pem')),
            verify=apply_path_fun('ca.pem'),
            assert_hostname=False)
        docker_host = docker_host.replace('tcp://', 'https://')
    version = os.environ.get('DOCKER_VERSION', DEFAULT_DOCKER_VERSION)
    client = docker.Client(base_url=docker_host,
                           version=version,
                           tls=tls_config)
    if validate:
        validate_connection_with_docker_daemon(client)
    return client
项目:metadataproxy    作者:lyft    | 项目源码 | 文件源码
def docker_client():
    global _docker_client
    if _docker_client is None:
        _docker_client = docker.Client(base_url=app.config['DOCKER_URL'])
    return _docker_client
项目:caduc    作者:tjamet    | 项目源码 | 文件源码
def setUp(self):
        self.logger = logging.getLogger(type(self).__name__)
        self.client = docker.Client(**docker.utils.kwargs_from_env(assert_hostname=False))
        options = mock.Mock()
        options.debug = False
        options.config = ['images.test-*.grace_time=1s']
        options.config_path = None
        options.image_gracetime = '1d'
        self.options = options
        self.containers = set()
        self.images = set()
项目:caduc    作者:tjamet    | 项目源码 | 文件源码
def create_watcher(options, args):
    if options.debug:
        logging.basicConfig(level=logging.DEBUG)
    else:
        logging.basicConfig(level=logging.INFO)

    def client():
        return docker.Client(**docker.utils.kwargs_from_env(assert_hostname=False))
    config = Config(options.config, options.config_path)
    images = Images(config, client, default_timeout=options.image_gracetime)
    containers = Containers(config, client, images)
    images.update_timers()
    return Watcher(client, images, containers)
项目:open-nti    作者:Juniper    | 项目源码 | 文件源码
def test_connect_docker():
    global c
    global DOCKER_IP

    # initialize Docker Object
    if _platform == "linux" or _platform == "linux2" or _platform == "darwin":
        # linux
        c = Client(base_url='unix://var/run/docker.sock', version='1.20')

    elif _platform == "win32":
        exit

    # Check if connection to Docker work by listing all images
    list_images = c.images()
    assert len(list_images) >= 1
项目:open-nti    作者:Juniper    | 项目源码 | 文件源码
def check_docker():
    global c
    # initialize Docker Object
    if _platform == "linux" or _platform == "linux2" or _platform == "darwin":
        # linux
        c = Client(base_url='unix://var/run/docker.sock', version='1.20')
    elif _platform == "win32":
        exit

    return c
项目:open-nti    作者:Juniper    | 项目源码 | 文件源码
def test_connect_docker():
    global c
    global DOCKER_IP

    # initialize Docker Object
    if _platform == "linux" or _platform == "linux2" or _platform == "darwin":
        # linux
        c = Client(base_url='unix://var/run/docker.sock', version='1.20')

    elif _platform == "win32":
        exit

    # Check if connection to Docker work by listing all images
    list_images = c.images()
    assert len(list_images) >= 1
项目:salt-toaster    作者:openSUSE    | 项目源码 | 文件源码
def docker_client():
    client = Client(base_url='unix://var/run/docker.sock', timeout=180)
    return client
项目:pykit    作者:baishancloud    | 项目源码 | 文件源码
def _docker_cli():
    dcli = docker.Client(base_url='unix://var/run/docker.sock')
    return dcli
项目:pykit    作者:baishancloud    | 项目源码 | 文件源码
def get_client():
    dcli = docker.Client(base_url='unix://var/run/docker.sock')
    return dcli
项目:pytest-dockerpy    作者:keeppythonweird    | 项目源码 | 文件源码
def _docker_client():
    return docker.Client('unix://var/run/docker.sock', version="auto")
项目:kolla-kubernetes-personal    作者:rthallisey    | 项目源码 | 文件源码
def __init__(self, module):
        self.module = module
        self.params = self.module.params
        self.changed = False

        # TLS not fully implemented
        # tls_config = self.generate_tls()

        options = {
            'version': self.params.get('api_version')
        }

        self.dc = docker.Client(**options)
项目:kolla-kubernetes-personal    作者:rthallisey    | 项目源码 | 文件源码
def docker_client():
    try:
        docker_kwargs = docker.utils.kwargs_from_env()
        return docker.Client(version='auto', **docker_kwargs)
    except docker.errors.DockerException:
        LOG.exception('Can not communicate with docker service.'
                      'Please check docker service is running without errors')
        sys.exit(1)