Python docker 模块,APIClient() 实例源码

我们从Python开源项目中,提取了以下48个代码示例,用于说明如何使用docker.APIClient()

项目:bay    作者:eventbrite    | 项目源码 | 文件源码
def client(self):
        """
        Build a Docker API client for ``self.url``.

        TLS is configured when a client certificate/key pair and/or a CA
        certificate is set on the instance; verification is always enabled
        in that case. Otherwise the client is created without TLS.

        Raises DockerNotAvailableError when docker-py raises a
        DockerException while the client is being constructed.
        """
        # A client certificate is only usable when both halves are present.
        has_cert_pair = self.tls_cert and self.tls_key
        cert_pair = (self.tls_cert, self.tls_key) if has_cert_pair else None

        tls_config = None
        if cert_pair or self.tls_ca:
            tls_config = docker.tls.TLSConfig(
                ca_cert=self.tls_ca,
                client_cert=cert_pair,
                verify=True,
            )

        client_options = dict(
            base_url=self.url,
            version="auto",
            timeout=42,
            tls=tls_config,
        )
        try:
            return docker.APIClient(**client_options)
        except docker.errors.DockerException:
            raise DockerNotAvailableError("The docker host at {} is not available".format(self.url))
项目:fuxi    作者:openstack    | 项目源码 | 文件源码
def setUp(self):
        """
        Prepare the Docker, Cinder and Manila clients used by the test.

        The storage clients are first built from environment variables. If
        that fails for any reason a warning is logged and the clients are
        built from stored credentials instead.
        """
        super(FuxiBaseTest, self).setUp()
        self.docker_client = docker.APIClient(
            base_url='tcp://0.0.0.0:2375')
        try:
            self.cinder_client = get_cinder_client_from_env()
            self.manila_client = get_manila_client_from_env()
        except Exception as e:
            # The openrc file may be missing or may not have been sourced.
            message = ('Missing environment variable %s in your local. '
                       'Please add it and also check other missing '
                       'environment variables. After that please source '
                       'the openrc file. '
                       'Trying credentials from DevStack cloud.yaml ...')
            # Exceptions raised without arguments have an empty ``args``
            # tuple; fall back to the exception itself so that logging the
            # warning cannot raise IndexError and hide the real problem.
            missing = e.args[0] if e.args else e
            LOG.warning(message, missing)
            self.cinder_client = get_cinder_client_from_creds()
            self.manila_client = get_manila_client_from_creds()
项目:cc-server    作者:curious-containers    | 项目源码 | 文件源码
def __init__(self, config, tee, node_name, node_config):
        """
        Store the node settings and open a Docker client for the node.

        :param config: configuration object; its ``docker`` mapping supplies
            ``thread_limit`` and (optionally) ``api_timeout``
        :param tee: callable used to emit log messages
        :param node_name: name of the node, used in log messages
        :param node_config: mapping with ``base_url`` and optional ``tls``
            keyword arguments for ``docker.tls.TLSConfig``
        """
        self._config = config
        self._tee = tee

        self.node_name = node_name
        self.node_config = node_config

        self._thread_limit = Semaphore(self._config.docker['thread_limit'])

        # TLS stays disabled (False) unless the node config carries settings.
        if self.node_config.get('tls'):
            tls = docker.tls.TLSConfig(**self.node_config['tls'])
        else:
            tls = False

        # Older docker-py releases only expose ``Client``.
        if hasattr(docker, 'APIClient'):
            client_class = docker.APIClient
        else:
            client_class = docker.Client
            self._tee('Node {}: Fallback to old docker-py Client.'.format(self.node_name))

        client_options = {
            'base_url': self.node_config['base_url'],
            'tls': tls,
            'timeout': self._config.docker.get('api_timeout'),
            'version': 'auto',
        }
        self.client = client_class(**client_options)
项目:wrfy    作者:grahame    | 项目源码 | 文件源码
def pull_all(args):
    "pull all images"
    # Pulls every repo tag reported by Image.repotags(), in sorted order,
    # printing the streamed status of each pull. API errors are printed and
    # do not stop the remaining pulls.
    def status_title(tag, pad=None):
        # Title shown next to the progress stream, right-aligned to ``pad``.
        label = 'pull %s' % (tag)
        return label.rjust(pad) if pad else label

    def pull_tags(tags):
        # Align all titles to the widest one.
        width = max(len(status_title(name)) for name in tags)
        for name in sorted(tags):
            log_action("pulling tag: %s" % (name))
            heading = status_title(name, width)
            try:
                print_status_stream(heading, cli.pull(name, stream=True))
            except APIError as err:
                print(err)

    cli = Client()
    tags = Image.repotags(cli)
    if not tags:
        return
    pull_tags(tags)
项目:wrfy    作者:grahame    | 项目源码 | 文件源码
def rmi_dangling(args):
    "remove all dangling (untagged) images"
    # Images still in use are reported and skipped; the rest are removed
    # after confirmation (skipped when args.force is set).
    cli = Client()
    removable = []
    for image, used_by in untagged_images_with_usage(cli):
        if not used_by:
            removable.append(image)
            continue
        log_issue("not removing image: %s (in use by %s)" % (image, used_by))
    if not removable:
        return

    summary = ['The following dangling images will be removed:\n']
    summary.extend(' - %s\n' % (image) for image in removable)
    if not (args.force or confirm_action(''.join(summary), 'Remove images?')):
        return

    for image in removable:
        log_action("removing dangling image: %s" % (image))
        log_any_error(lambda: cli.remove_image(image.get('Id')))
项目:wrfy    作者:grahame    | 项目源码 | 文件源码
def rmi_matching(args):
    "remove images which have tags matching `pattern'"
    # Collects every tag of every image, filters them through
    # match_iterator_glob_or_regexp, and removes the matches after
    # confirmation (skipped when args.force is set).
    cli = Client()

    def iter_tags():
        for image in Image.all(cli):
            for tag in image.tags:
                yield tag

    matches = match_iterator_glob_or_regexp(args, iter_tags(), lambda t: t)
    to_remove = list(matches)
    if not to_remove:
        return

    summary = ['Images with the following tags will be deleted:\n']
    summary.extend(' - %s\n' % (tag) for tag in sorted(to_remove))
    if not (args.force or confirm_action(''.join(summary), 'Delete matching images?')):
        return

    for tag in to_remove:
        log_action("removing image via tag: %s" % (tag))
        log_any_error(lambda: cli.remove_image(tag))
项目:ToolDog    作者:bio-tools    | 项目源码 | 文件源码
def __init__(self, image, command, environment=None):
        """
        Open a Docker API client, pull the image and hand off to ``self.run``.

        Per the original author's note this creates the container and is not
        the same as `docker run`: the container still needs to be started
        after creation (NOTE(review): depends on what ``self.run`` does,
        which is not visible here -- confirm).

        :param image: the image to run
        :type image: STRING
        :param command: the command to be run in the container
        :type command: STRING or LIST
        :param environment: A dictionary or a list of strings in the following format {"TEST": "123"} or ["TEST=123"].
        :type environment: DICT or LIST
        """
        # Client built with docker-py defaults (no base_url/version given).
        self.client = docker.APIClient()
        self.pull(image)
        # Reset the id before run(); presumably run() fills it in -- verify.
        self.id = None
        self.run(image, command, environment)
项目:repo2docker    作者:jupyter    | 项目源码 | 文件源码
def push_image(self):
        """
        Push ``self.output_image_spec`` and log per-layer progress.

        Progress is accumulated per layer id and emitted through
        ``self.log`` at most once every 1.5 seconds. An ``error`` entry in
        the push stream is logged and terminates the process with status 1.
        """
        client = docker.APIClient(version='auto', **kwargs_from_env())
        # Latest known state of each layer, keyed by layer id.
        layers = {}
        last_emit_time = time.time()
        for raw in client.push(self.output_image_spec, stream=True):
            event = json.loads(raw.decode('utf-8'))
            if 'error' in event:
                self.log.error(event['error'], extra=dict(phase='failed'))
                sys.exit(1)
            if 'id' not in event:
                continue
            # Prefer detailed progress numbers; fall back to the status text.
            detail = event['progressDetail'] if 'progressDetail' in event else None
            layers[event['id']] = detail if detail else event['status']
            now = time.time()
            if now - last_emit_time > 1.5:
                self.log.info('Pushing image\n',
                              extra=dict(progress=layers, phase='pushing'))
                last_emit_time = time.time()
项目:repo2docker    作者:jupyter    | 项目源码 | 文件源码
def build(self, image_spec, memory_limit, build_args):
        """
        Build the image from the current working directory.

        This is a generator: it yields each decoded line of the Docker
        build output as it arrives.

        :param image_spec: tag applied to the built image
        :param memory_limit: memory limit for build containers; falsy
            values leave the memory limit unset
        :param build_args: build-time variables passed as ``buildargs``
        """
        # memswap is always -1 for builds, as in the original author's
        # note that nothing good comes of swap during a build.
        container_limits = {'memswap': -1}
        if memory_limit:
            container_limits['memory'] = memory_limit

        api = docker.APIClient(version='auto', **docker.utils.kwargs_from_env())
        build_options = dict(
            path=os.getcwd(),
            dockerfile=self.binder_path(self.dockerfile),
            tag=image_spec,
            buildargs=build_args,
            decode=True,
            forcerm=True,
            rm=True,
            container_limits=container_limits,
        )
        for chunk in api.build(**build_options):
            yield chunk
项目:AJudge    作者:AJudge-team    | 项目源码 | 文件源码
def __init__(self, host_port=None):
        """
        Create and start a sandbox container bound to ``host_port``.

        Every instance opens its own Docker client; the original author
        noted it is unknown whether a shared client would be thread-safe.

        :param host_port: host port mapped to the container's port 50000;
            must not be None
        :raises Exception: when ``host_port`` is None
        """
        IMG_SRC = "ajudgeteam/ajudge:base"
        DEFAULT_CONTAINER_PORT = 50000

        if host_port is None:
            raise Exception('host_port cannot be None. you must set it.')

        api = docker.APIClient(base_url="unix://var/run/docker.sock")
        self.docker_client = api

        port_map = {DEFAULT_CONTAINER_PORT: host_port}
        self.host_config = api.create_host_config(port_bindings=port_map)
        self.container = api.create_container(
            IMG_SRC,
            "/bin/bash",
            detach=True,
            tty=True,
            ports=[DEFAULT_CONTAINER_PORT],
            host_config=self.host_config,
        )
        api.start(container=self.container.get('Id'))
项目:ecsctl    作者:cxmcc    | 项目源码 | 文件源码
def exec_command(self):
        """
        Run ``self.command`` inside a container of the task.

        The Docker daemon is reached at ``<hostname>:<self.port>``. When no
        container name was configured, the first container name returned by
        ``get_ecs_hostname_of_task`` is used. The exec session is handed to
        ``dockerpty``.
        """
        first_container_name, hostname = self.get_ecs_hostname_of_task()
        target = first_container_name if self.container is None else self.container

        client = docker.APIClient(
            '%s:%d' % (hostname, self.port),
            version=self.api_version,
        )
        container_id = self.find_container_id(client, target)
        created = client.exec_create(
            container_id, self.command, stdin=self.stdin, tty=self.tty
        )
        dockerpty.start_exec(client, created['Id'], interactive=self.stdin)
项目:tsaotun    作者:qazbnm456    | 项目源码 | 文件源码
def __init__(self, host):
        """Load the Docker environment and verify the daemon responds.

        On macOS and Windows the client is configured from the
        ``DOCKER_*`` environment variables with TLS hostname assertion
        disabled (see https://github.com/docker/machine/issues/1335), and
        ``self.host`` is set to the host part of ``DOCKER_HOST``. When
        ``DOCKER_HOST`` is unset, or the environment yields no ``tls``
        entry, the ``host`` argument is used instead, as on every other
        platform.

        :param host: base URL of the Docker daemon used as the fallback
        """
        client = None
        resolved_host = host
        if platform.system() in ('Darwin', 'Windows'):
            try:
                from docker.utils import kwargs_from_env
                docker_host = os.environ['DOCKER_HOST']
                env_host = urlparse.urlparse(docker_host).netloc.split(':')[0]
                kwargs = kwargs_from_env()
                # KeyError here (no TLS in the environment) intentionally
                # falls through to the plain ``host`` fallback below.
                kwargs['tls'].assert_hostname = False
                # The original built a throw-away client from DOCKER_HOST
                # before this one; only the TLS-aware client is ever used,
                # so only that one is created.
                client = APIClient(**kwargs)
                resolved_host = env_host
            except KeyError:
                client = None
                resolved_host = host
        if client is None:
            client = APIClient(base_url=host)
        self.host = resolved_host
        self.client = client
        self.client.ping()
项目:docker-db    作者:EXASOL    | 项目源码 | 文件源码
def __init__(self, verbose=False, quiet=False):
        """
        Set up the handler and its docker.APIClient.

        The client is configured from the environment with a 20 second
        timeout. ``quiet`` takes precedence: when it is set, ``verbose``
        is forced to False.
        """
        client_options = dict(timeout=20, **kwargs_from_env())
        self.client = docker.APIClient(**client_options)
        self.quiet = quiet
        self.verbose = False if quiet else verbose
        self.def_container_cmd = None
#}}}

#{{{ log
项目:docker-grader    作者:elsys    | 项目源码 | 文件源码
def __init__(self):
        """Open a Docker API client on the local Unix socket."""
        socket_url = "unix://var/run/docker.sock"
        self.cli = APIClient(base_url=socket_url)
项目:SwarmSpawner    作者:cassinyio    | 项目源码 | 文件源码
def client(self):
        """Return the Docker client shared by all instances of the class.

        The client is created on first use and cached on the class as
        ``_client``. Settings from the environment override the ``tls``
        entry derived from ``self.tls_config``.
        """
        owner = self.__class__
        if owner._client is not None:
            return owner._client

        options = {}
        if self.tls_config:
            options['tls'] = docker.tls.TLSConfig(**self.tls_config)
        options.update(kwargs_from_env())
        owner._client = docker.APIClient(version='auto', **options)
        return owner._client
项目:kuryr-libnetwork    作者:openstack    | 项目源码 | 文件源码
def setUp(self):
        """
        Prepare the Docker and Neutron clients used by the test.

        The Neutron client is first built from environment variables. If
        that fails for any reason a warning is logged and the client is
        built from stored credentials instead.
        """
        super(KuryrBaseTest, self).setUp()
        self.docker_client = docker.APIClient(
            base_url='tcp://0.0.0.0:2375')
        try:
            self.neutron_client = get_neutron_client_from_env()
        except Exception as e:
            # The openrc file may be missing or may not have been sourced.
            # Fix: the first fragment lacked its trailing space, so the
            # logged text read "...in your local.Please add it...".
            message = ("Missing environment variable %s in your local. "
                       "Please add it and also check other missing "
                       "environment variables. After that please source "
                       "the openrc file. "
                       "Trying credentials from DevStack cloud.yaml ...")
            # Exceptions raised without arguments have an empty ``args``
            # tuple; fall back to the exception itself so that logging the
            # warning cannot raise IndexError and hide the real problem.
            missing = e.args[0] if e.args else e
            LOG.warning(message, missing)
            self.neutron_client = get_neutron_client_from_creds()
项目:kuryr-libnetwork    作者:openstack    | 项目源码 | 文件源码
def setup(self):
        """Create the kuryr or non-kuryr docker network for the context.

        The id and name of the created network are stored in
        ``self.context`` under ``netid`` and ``netname``. Any failure is
        logged (with traceback when debug logging is enabled) and not
        re-raised.
        """
        try:
            docker_client = docker.APIClient(base_url="tcp://0.0.0.0:2375")

            if self.config["is_kuryr"]:
                net_name = "kuryr_network"
                ipam = {
                    "Driver": "kuryr",
                    "Options": {},
                    "Config": [
                        {
                            "Subnet": self.config.get("Subnet"),
                            "IPRange": self.config.get("IPRange"),
                            "Gateway": self.config.get("Gateway")
                        }
                    ]
                }
                res = docker_client.create_network(name=net_name,
                                                   driver="kuryr",
                                                   ipam=ipam)
            else:
                net_name = "docker_network"
                res = docker_client.create_network(name=net_name)
            self.context["netid"] = res.get("Id")
            self.context["netname"] = net_name
            LOG.debug("Container network id is '%s'" % self.context["netid"])
        except Exception as e:
            # ``e.message`` does not exist on Python 3 exceptions, so the
            # original handler could itself fail with AttributeError;
            # formatting the exception directly works on both 2 and 3.
            msg = "Can't create docker network: %s" % (e,)
            if logging.is_debug():
                LOG.exception(msg)
            else:
                LOG.warning(msg)
项目:kuryr-libnetwork    作者:openstack    | 项目源码 | 文件源码
def run(self, network_list_args=None):
        """List the networks.

        Measure the "docker network ls" command performance under kuryr.

        This will call the docker client API to list networks

        TODO (baohua):
        1. may support tenant/user in future.
        2. validation.required_services add KURYR support

        :param network_list_args: dict: names, ids
        """
        docker_url = 'tcp://0.0.0.0:2375'
        self.docker_client = docker.APIClient(base_url=docker_url)
        # A missing/empty argument is normalised to an empty dict.
        list_args = network_list_args if network_list_args else {}
        self._list_networks(list_args)
项目:kuryr-libnetwork    作者:openstack    | 项目源码 | 文件源码
def run(self, network_create_args=None):
        """Create and delete a network with kuryr.

        Measure the "docker network create" and "docker network rm" command
        performance with kuryr driver.

        :param network_create_args: dict as options to create the network
        """
        docker_url = 'tcp://0.0.0.0:2375'
        self.docker_client = docker.APIClient(base_url=docker_url)
        # A missing/empty argument is normalised to an empty dict.
        create_args = network_create_args if network_create_args else {}
        created = self._create_network(is_kuryr=True,
                                       network_create_args=create_args)
        self._delete_network(created)
项目:kuryr-libnetwork    作者:openstack    | 项目源码 | 文件源码
def run(self, network_create_args=None):
        """Create and delete a network without kuryr.

        Measure the "docker network create" and "docker network rm" command
        performance with default driver.

        :param network_create_args: dict as options to create the network
        """
        docker_url = 'tcp://0.0.0.0:2375'
        self.docker_client = docker.APIClient(base_url=docker_url)
        # A missing/empty argument is normalised to an empty dict.
        create_args = network_create_args if network_create_args else {}
        created = self._create_network(is_kuryr=False,
                                       network_create_args=create_args)
        self._delete_network(created)
项目:worker-manager    作者:nebula-orchestrator    | 项目源码 | 文件源码
def __init__(self):
        """Open a Docker API client on the local Unix socket.

        The API version is negotiated with the daemon (``version="auto"``).
        """
        client_options = {
            'base_url': 'unix://var/run/docker.sock',
            'version': "auto",
        }
        self.cli = docker.APIClient(**client_options)

    # check network exists:
项目:py-cloud-compute-cannon    作者:Autodesk    | 项目源码 | 文件源码
def get_docker_apiclient(*args, **kwargs):
    """Instantiate the low-level Docker client class of the installed SDK.

    ``docker.APIClient`` is used when the installed package provides it,
    otherwise ``docker.Client``. All arguments are forwarded unchanged to
    the chosen class.
    """
    import docker

    try:
        client_factory = docker.APIClient
    except AttributeError:
        client_factory = docker.Client
    return client_factory(*args, **kwargs)
项目:son-emu    作者:sonata-nfv    | 项目源码 | 文件源码
def getDockerCli(self):
        """
        Return the Docker API client for the local docker instance.

        The client is created on first use (Unix socket at
        /var/run/docker.sock) and cached in ``self.docker_cli``.
        """
        cli = self.docker_cli
        if cli is None:
            cli = docker.APIClient(base_url='unix://var/run/docker.sock')
            self.docker_cli = cli
        return cli
项目:son-emu    作者:sonata-nfv    | 项目源码 | 文件源码
def getDockerCli(self):
        """
        Return the Docker API client for the local docker instance.

        The client is created on first use (Unix socket at
        /var/run/docker.sock) and cached in ``self.docker_cli``.
        """
        cli = self.docker_cli
        if cli is None:
            cli = docker.APIClient(base_url='unix://var/run/docker.sock')
            self.docker_cli = cli
        return cli
项目:son-emu    作者:sonata-nfv    | 项目源码 | 文件源码
def docker_container_id(container_name):
    """
    Look up the ID of a running container by its name.

    :param container_name: The full name of the docker container.
    :type container_name: ``str``
    :return: The ``Id`` field of the inspect result when the container is
        running, otherwise None.
    """
    details = APIClient().inspect_container(container_name)
    return details['Id'] if details["State"]["Running"] else None
项目:son-emu    作者:sonata-nfv    | 项目源码 | 文件源码
def docker_abs_net_io(container_id):
    """
    Sum the RX/TX byte counters of all interfaces reported by ``ifconfig``
    inside the container.

    :param container_id: The full ID of the docker container.
    :type container_id: ``str``
    :return: Dict with ``NET_in`` and ``NET_out`` (total bytes, or None when
        the corresponding counter does not appear in the ``ifconfig``
        output) and ``NET_systime`` (system time in nanoseconds taken right
        after the command ran).
    :rtype: ``dict``
    """
    c = APIClient()
    command = c.exec_create(container_id, 'ifconfig')
    ifconfig = c.exec_start(command['Id'])
    sys_time = int(time.time() * 1000000000)
    output = str(ifconfig)

    def total_bytes(direction):
        # Raw string: '\d' in a normal literal is an invalid escape
        # sequence and triggers a warning on recent Python versions.
        counters = re.findall(r'%s bytes:(\d+)' % direction, output)
        if not counters:
            return None
        return sum(int(number) for number in counters)

    return {
        'NET_in': total_bytes('RX'),
        'NET_out': total_bytes('TX'),
        'NET_systime': sys_time,
    }
项目:python-zunclient    作者:openstack    | 项目源码 | 文件源码
def connect(self):
        """
        Attach to the exec session ``self.exec_id`` over a raw socket.

        The socket is stored in ``self.socket`` and a short usage hint is
        printed. Docker API errors are re-raised as
        ``exceptions.ConnectionFailed``.
        """
        try:
            api = docker.APIClient(base_url=self.url)
            self.socket = api.exec_start(
                self.exec_id,
                socket=True,
                tty=True,
            )
            print('connected to container "%s"' % self.id)
            print('type %s. to disconnect' % self.escape)
        except docker.errors.APIError as api_error:
            raise exceptions.ConnectionFailed(api_error)
项目:wrfy    作者:grahame    | 项目源码 | 文件源码
def kill_all(args):
    "kill all running containers"
    # Containers are listed sorted by repr and killed after confirmation
    # (skipped when args.force is set).
    cli = Client()
    targets = sorted(Container.all(cli), key=repr)
    if not targets:
        return

    summary = ['The following running containers will be killed:\n']
    summary.extend(' - %s\n' % (container) for container in targets)
    if not (args.force or confirm_action(''.join(summary), 'Kill containers?')):
        return

    for container in targets:
        log_action("killing container: %s" % (container))
        log_any_error(lambda: cli.kill(container.get('Id')))
项目:wrfy    作者:grahame    | 项目源码 | 文件源码
def rm_stopped(args):
    "remove all containers which are not running"
    # Stopped containers are removed after confirmation (skipped when
    # args.force is set).
    cli = Client()
    targets = list(stopped_containers(cli))
    if not targets:
        return

    summary = ['The following stopped containers will be removed:\n']
    summary.extend(' - %s\n' % (container) for container in targets)
    if not (args.force or confirm_action(''.join(summary), 'Remove containers?')):
        return

    for container in targets:
        log_action("removing container: %s" % (container))
        log_any_error(lambda: cli.remove_container(container.get('Id')))
项目:wrfy    作者:grahame    | 项目源码 | 文件源码
def rmv_dangling(args):
    "remove all dangling volumes"
    # Dangling volumes are removed after confirmation (skipped when
    # args.force is set). Removal errors are not caught here.
    cli = Client()
    targets = list(dangling_volumes(cli))
    if not targets:
        return

    summary = ['The following dangling volumes will be removed:\n']
    summary.extend(' - %s\n' % (volume) for volume in targets)
    if not (args.force or confirm_action(''.join(summary), 'Remove volumes?')):
        return

    for volume in targets:
        log_action("removing dangling volume: %s" % (volume))
        cli.remove_volume(volume.name)
项目:wrfy    作者:grahame    | 项目源码 | 文件源码
def doctor(args):
    "check for common issues"
    cli = Client()
    # (reporter, problem title, suggested fix, check function); the checks
    # run in this order and each receives the shared client.
    checks = (
        (log_issues, "containers running from old version of tag", "restart containers", check_latest_image),
        (log_issues, "dangling volumes", "wrfy rmv-dangling", check_dangling_volumes),
        (log_issues, "dangling images", "wrfy rmi-dangling", check_untagged_images),
        (log_warnings, "stopped containers", "wrfy rm-stopped", check_stopped_containers),
    )
    for report, title, remedy, check in checks:
        report(title, remedy, check(cli))
项目:BiocImageBuilder    作者:Bioconductor-notebooks    | 项目源码 | 文件源码
def __init__(self, url, name):
        """
        Remember the connection details and open a Docker client.

        On Windows (``sys.platform == "win32"``) the client comes from
        ``docker.from_env()``; elsewhere a low-level ``APIClient`` is opened
        on ``url``.

        :param url: base URL of the Docker daemon
        :param name: stored as ``self.name``
        """
        self.url = url
        self.name = name
        on_windows = sys.platform == "win32"
        self.cli = docker.from_env() if on_windows else docker.APIClient(base_url=url)
项目:kokkuri    作者:LiGhT1EsS    | 项目源码 | 文件源码
def __init__(self):
        """Open the Docker client at the configured base URL and log the API version."""
        super(DockerPot, self).__init__()
        base_url = server_config.DOCKER_BASE_URL
        self.client = docker.APIClient(base_url=base_url)

        api_version = self.client.version().get("ApiVersion")
        logger.info("Docker API Version: {0}".format(api_version))
项目:dhubfs    作者:jeid64    | 项目源码 | 文件源码
def __init__(self, root):
        """
        Start a helper container and derive its rootfs path on the host.

        The ``root`` argument is not used: ``self.root`` is set to the
        devicemapper mount path built from the container's device name.
        The inspect result and the device name are printed.
        """
        self.containers = dict()
        self.client = docker.from_env(version='auto')
        self.apiclient = docker.APIClient(base_url='unix://var/run/docker.sock', version='auto')
        self.container = self.client.containers.run(
            "jeidtest/testfile",
            "/busybox-x86_64 sleep 100000",
            detach=True,
        )
        container_info = self.apiclient.inspect_container(self.container.id)
        print(container_info)
        device_name = container_info['GraphDriver']['Data']['DeviceName']
        print(device_name)
        # Everything after the first 22 characters of the device name is
        # used as the mount directory name.
        self.loc = device_name[22:]
        self.root = "/var/lib/docker/devicemapper/mnt/" + self.loc + "/rootfs/"
项目:aiohttp_admin    作者:aio-libs    | 项目源码 | 文件源码
def docker():
    """Return a Docker client.

    When the ``DOCKER_MACHINE_IP`` environment variable is set, the client
    is built with ``from_env`` and hostname assertion disabled; otherwise
    an ``APIClient`` with automatic API version detection is returned.
    """
    if os.environ.get('DOCKER_MACHINE_IP') is None:
        return APIClient(version='auto')
    return from_env(assert_hostname=False)
项目:autocompose    作者:rapid7    | 项目源码 | 文件源码
def main():
    """Command-line entry point.

    Parses the arguments, validates the requested command, builds the AWS
    session and Docker client, and dispatches to the command's
    ``parse_and_execute``. An unknown command exits with status -1. Errors
    raised by the command are printed and otherwise swallowed.
    """
    __require_python_version()

    args = parser.parse_args()
    command = args.COMMAND

    # Check that the user config directory and file exists.
    __setup_config_directory()

    if command not in commands:
        print('Not a command: ' + command)
        # sys.exit instead of the ``exit`` helper, which is only injected by
        # the ``site`` module and is not guaranteed to exist.
        sys.exit(-1)

    if args.service_name is not None:
        set_service_name(args.service_name)
        print('service-name set to ' + args.service_name)

    aws_session = boto3.Session(aws_access_key_id=args.aws_access_key_id,
                                aws_secret_access_key=args.aws_secret_access_key,
                                aws_session_token=args.aws_session_token,
                                region_name=args.region,
                                profile_name=args.aws_profile)

    docker_client = docker.APIClient()

    try:
        commands[command].parse_and_execute(args=args.ARGUMENTS, aws_session=aws_session, docker_client=docker_client)
    except Exception as e:
        # Deliberate best-effort: report the failure without a traceback.
        print("Unexpected error:", e)
项目:aiohttp_json_api    作者:vovanbo    | 项目源码 | 文件源码
def docker():
    """Return a low-level Docker API client built with the library defaults."""
    api_client = libdocker.APIClient()
    return api_client
项目:stakkr    作者:edyan    | 项目源码 | 文件源码
def get_api_client():
    """Return the cached Docker API client, creating it on first use.

    The client is stored in ``__st__['api_client']`` and is configured with
    the ``base_url`` and ``tls`` values derived from the environment (None
    when absent).
    """
    if 'api_client' in __st__:
        return __st__['api_client']

    from docker import APIClient, utils
    env_params = utils.kwargs_from_env()
    base_url = env_params['base_url'] if 'base_url' in env_params else None
    tls = env_params['tls'] if 'tls' in env_params else None
    __st__['api_client'] = APIClient(base_url=base_url, tls=tls)

    return __st__['api_client']
项目:cord    作者:opencord    | 项目源码 | 文件源码
def _docker_connect(self):
        """ Connect to the docker daemon and verify that it answers a ping.

        The process exits with status 1 when the connection cannot be
        established or the ping reports failure.
        """
        try:
            self.dc = DockerClient()
        except requests.ConnectionError:
            LOG.debug("Docker connection not available")
            sys.exit(1)

        if not self.dc.ping():
            LOG.error("Unable to ping docker server")
            sys.exit(1)
        LOG.debug("Docker server is responding")
项目:docker-enforcer    作者:piontec    | 项目源码 | 文件源码
def __init__(self, config: Config, client: APIClient) -> None:
        """Initialise the helper state.

        :param config: configuration object, stored as ``_config``
        :param client: Docker API client, stored as ``_client``

        The run timestamps start at ``datetime.datetime.min``, the run
        time at ``datetime.timedelta.min``, and the last periodic run is
        marked as not OK.
        """
        super().__init__()
        # Collaborators handed in by the caller.
        self._config: Config = config
        self._client: APIClient = client
        # Internal bookkeeping.
        self._padlock = threading.Lock()
        self._check_in_progress: bool = False
        self._params_cache: Dict[str, Any] = {}
        # Statistics about the most recent container check.
        not_yet_run = datetime.datetime.min
        self.last_check_containers_run_start_timestamp: datetime.datetime = not_yet_run
        self.last_check_containers_run_end_timestamp: datetime.datetime = not_yet_run
        self.last_check_containers_run_time: datetime.timedelta = datetime.timedelta.min
        self.last_periodic_run_ok: bool = False
项目:docker-enforcer    作者:piontec    | 项目源码 | 文件源码
def setUp(self):
        """Build a DockerHelper around an autospec'd APIClient plus two sample container parameter dicts."""
        # Sample container ids and their parameter dicts.
        self._cid = "cont_id1"
        self._cid2 = "cont_id2"
        self._params = {"Id": self._cid, "param1": "1"}
        self._params2 = {"Id": self._cid2, "param1": "2"}
        # Helper under test, wired to a spec'd stand-in for the API client.
        self._config = Config()
        self._client = create_autospec(docker.APIClient)
        self._helper = DockerHelper(self._config, self._client)
项目:lead    作者:derwebcoder    | 项目源码 | 文件源码
def __init__(self):
        """Create the high-level (``client``) and low-level (``ll_client``) Docker clients.

        The API version is currently always unset here, so both clients
        negotiate it with the daemon (``"auto"``).
        """
        self.client = None
        self.ll_client = None
        # docker_api_version=lead_settings.get("docker-api-version")
        docker_api_version = None
        api_version = "auto" if docker_api_version is None else docker_api_version
        self.client = docker.from_env(version=api_version)
        self.ll_client = docker.APIClient(version=api_version)
项目:kolla    作者:sieve-microservices    | 项目源码 | 文件源码
def dc(self):
        """Return the Docker API client, creating and caching it in ``_dc`` on first use.

        The client is built from a copy of ``self.docker_kwargs`` with
        automatic API version detection.
        """
        if self._dc is None:
            client_options = self.docker_kwargs.copy()
            self._dc = docker.APIClient(version='auto', **client_options)
        return self._dc
项目:codebuild-emulator    作者:horiam    | 项目源码 | 文件源码
def wait_for_container(self):
        """
        Stream the container's output, wait for it to exit and return its
        exit code.

        Output is echoed to stdout, with ``[Container] `` written after each
        newline chunk. When streaming raises, the error is printed between
        separator lines and the log stream is reopened. In debug mode a
        daemon thread running ``self._wait_for_input`` is started first and
        joined (10 s timeout) once streaming ends.

        :return: the ``State.ExitCode`` value from inspecting the container
        """
        if self._debug:
            run_thread = threading.Thread(target=self._wait_for_input)
            run_thread.daemon = True
            run_thread.start()

        while True:
            stream = self._container.logs(stdout=True, stderr=True, stream=True, follow=True)
            try:
                for c in stream:
                    sys.stdout.write(c)
                    sys.stdout.flush()
                    if c == '\n':
                        sys.stdout.write('[Container] ')
                        sys.stdout.flush()
                break
            except Exception as e:
                print('\n' + '=' * 128)
                print(str(e))
                print('\n' + '=' * 128)

        if self._debug:
            run_thread.join(timeout=10)

        # ``status`` reflects the attributes cached by the last reload(), so
        # it must be refreshed on every poll; the original reloaded only
        # once and could spin forever if the container had not exited yet.
        self._container.reload()
        while not self._container.status == 'exited':
            time.sleep(1)
            self._container.reload()

        docker_api = docker.APIClient(version=self._docker_version)
        exit_code = docker_api.inspect_container(self._container.id)['State']['ExitCode']
        return exit_code
项目:containernet    作者:containernet    | 项目源码 | 文件源码
def getDockerCli(self):
        """
        Return the Docker API client for the local docker instance.

        The client is created on first use (Unix socket at
        /var/run/docker.sock) and cached in ``self.docker_cli``.
        """
        cli = self.docker_cli
        if cli is None:
            cli = docker.APIClient(base_url='unix://var/run/docker.sock')
            self.docker_cli = cli
        return cli
项目:repo2docker    作者:jupyter    | 项目源码 | 文件源码
def run_image(self):
        """
        Run the built image, stream its logs, then exit the process with
        the container's exit code.

        Without a custom ``self.run_cmd`` a Jupyter notebook server is
        started on a free port, which is published on the same host port.
        With a custom command no ports are published. Entries of
        ``self.volumes`` are bind-mounted read-write; relative container
        paths are resolved against the image's working directory.

        The container is always removed on the way out, and killed first if
        it is still running.
        """
        client = docker.from_env(version='auto')
        # The original also reserved a port up-front whose value was always
        # overwritten or unused; a port is now looked up only when needed.
        if not self.run_cmd:
            port = str(self._get_free_port())
            run_cmd = ['jupyter', 'notebook', '--ip', '0.0.0.0',
                       '--port', port]
            ports = {'%s/tcp' % port: port}
        else:
            run_cmd = self.run_cmd
            ports = {}
        container_volumes = {}
        if self.volumes:
            api_client = docker.APIClient(
                version='auto',
                **docker.utils.kwargs_from_env()
            )
            image = api_client.inspect_image(self.output_image_spec)
            image_workdir = image['ContainerConfig']['WorkingDir']

            for k, v in self.volumes.items():
                container_volumes[os.path.abspath(k)] = {
                    'bind': v if v.startswith('/') else os.path.join(image_workdir, v),
                    'mode': 'rw'
                }

        container = client.containers.run(
            self.output_image_spec,
            ports=ports,
            detach=True,
            command=run_cmd,
            volumes=container_volumes
        )
        # Wait until the container has left the 'created' state.
        while container.status == 'created':
            time.sleep(0.5)
            container.reload()

        try:
            for line in container.logs(stream=True):
                self.log.info(line.decode('utf-8'),
                              extra=dict(phase='running'))
        finally:
            container.reload()
            if container.status == 'running':
                self.log.info('Stopping container...\n',
                              extra=dict(phase='running'))
                container.kill()
            exit_code = container.attrs['State']['ExitCode']
            container.remove()
            sys.exit(exit_code)
项目:repo2docker    作者:jupyter    | 项目源码 | 文件源码
def build(self, image_spec, memory_limit, build_args):
        """
        Build the image from an in-memory tar build context.

        The context holds the rendered Dockerfile, the files listed in
        ``self.build_script_files`` (read from next to this module) and the
        current directory under ``src/``. This is a generator yielding each
        decoded line of the Docker build output.

        :param image_spec: tag applied to the built image
        :param memory_limit: memory limit for build containers; falsy
            values leave the memory limit unset
        :param build_args: build-time variables passed as ``buildargs``
        """
        context_buffer = io.BytesIO()
        archive = tarfile.open(fileobj=context_buffer, mode='w')

        dockerfile_info = tarfile.TarInfo("Dockerfile")
        rendered = self.render().encode('utf-8')
        dockerfile_info.size = len(rendered)
        archive.addfile(dockerfile_info, io.BytesIO(rendered))

        def _normalise_owner(member):
            # Ownership metadata is pinned for everything copied into the
            # archive; per the original note it otherwise seems to vary
            # between runs and defeats the build cache
            # (related: https://github.com/docker/docker-py/pull/1582).
            member.uname = ''
            member.gname = ''
            member.uid = 1000
            member.gid = 1000
            return member

        module_dir_of = os.path.dirname
        for script in sorted(self.build_script_files):
            local_path = os.path.join(module_dir_of(__file__), *script.split('/'))
            archive.add(local_path, script, filter=_normalise_owner)

        archive.add('.', 'src/', filter=_normalise_owner)

        archive.close()
        context_buffer.seek(0)

        # memswap is always -1 for builds, as in the original author's
        # note that nothing good comes of swap during a build.
        container_limits = {'memswap': -1}
        if memory_limit:
            container_limits['memory'] = memory_limit

        api = docker.APIClient(version='auto',
                               **docker.utils.kwargs_from_env())
        build_options = dict(
            fileobj=context_buffer,
            tag=image_spec,
            custom_context=True,
            buildargs=build_args,
            decode=True,
            forcerm=True,
            rm=True,
            container_limits=container_limits,
        )
        for chunk in api.build(**build_options):
            yield chunk
项目:kolla    作者:sieve-microservices    | 项目源码 | 文件源码
def __init__(self, conf):
        """
        Initialise the build state from the parsed configuration.

        Validates that the install type is available for the chosen base
        distribution, normalises ``install_type`` / ``install_metatype``,
        prepares the image-status bookkeeping dicts and opens a Docker API
        client configured from the environment.

        :param conf: configuration object providing registry, namespace,
            base, base_tag, install_type, tag, base_arch, rpm_setup_config,
            regex and maintainer
        :raises KollaMismatchBaseTypeException: install type not available
            for the base distribution
        :raises KollaUnknownBuildTypeException: install type not recognised
        """
        self.conf = conf
        self.images_dir = self._get_images_dir()
        self.registry = conf.registry
        # Prefix the namespace with the registry when one is configured.
        self.namespace = (self.registry + '/' + conf.namespace
                          if self.registry else conf.namespace)
        self.base = conf.base
        self.base_tag = conf.base_tag
        self.install_type = conf.install_type
        self.tag = conf.tag
        self.base_arch = conf.base_arch
        self.images = list()
        repo_files = [entry for entry in conf.rpm_setup_config
                      if entry is not None]
        self.rpm_setup = self.build_rpm_setup(repo_files)

        rh_base = ['centos', 'oraclelinux', 'rhel']
        rh_type = ['source', 'binary', 'rdo', 'rhos']
        deb_base = ['ubuntu', 'debian']
        deb_type = ['source', 'binary']

        valid_for_rh = self.base in rh_base and self.install_type in rh_type
        valid_for_deb = self.base in deb_base and self.install_type in deb_type
        if not (valid_for_rh or valid_for_deb):
            raise exception.KollaMismatchBaseTypeException(
                '{} is unavailable for {}'.format(self.install_type, self.base)
            )

        # requested install type -> (effective install type, metatype)
        normalised_types = {
            'binary': ('binary', 'rdo'),
            'source': ('source', 'mixed'),
            'rdo': ('binary', 'rdo'),
            'rhos': ('binary', 'rhos'),
        }
        resolved = normalised_types.get(self.install_type)
        if resolved is None:
            raise exception.KollaUnknownBuildTypeException(
                'Unknown install type'
            )
        self.install_type, self.install_metatype = resolved

        self.image_prefix = self.base + '-' + self.install_type + '-'

        self.regex = conf.regex
        self.image_statuses_bad = dict()
        self.image_statuses_good = dict()
        self.image_statuses_unmatched = dict()
        self.image_statuses_skipped = dict()
        self.maintainer = conf.maintainer

        env_kwargs = docker.utils.kwargs_from_env()
        self.dc = docker.APIClient(version='auto', **env_kwargs)