Python six.moves.configparser 模块,SafeConfigParser() 实例源码

我们从Python开源项目中,提取了以下43个代码示例,用于说明如何使用six.moves.configparser.SafeConfigParser()

项目:novajoin    作者:openstack    | 项目源码 | 文件源码
def get_host_and_realm(self):
        """Return the IPA hostname and realm name as a ``(host, realm)`` tuple.

           Since IPA 4.4 the schema has to be fetched when finalize() is
           called. That is really only needed by the ipa command-line
           tool, but for now it is baked in, so a TGT must be obtained
           first, and that in turn needs the hostname and the realm.
           For now both values are pulled straight out of the ``[global]``
           section of the INI-formatted IPA config file.
        """
        ipa_conf = SafeConfigParser()
        ipa_conf.read('/etc/ipa/default.conf')

        # Options are looked up in the same order as the returned tuple.
        return tuple(ipa_conf.get('global', option)
                     for option in ('host', 'realm'))
项目:tomato    作者:sertansenturk    | 项目源码 | 文件源码
def set_environment(self):
        """Build the environment used to launch MCR-based subprocesses.

        The MCR settings are read from ``self.mcr_filepath``. The sections
        are tried in the order ``custom``, ``linux_default`` and finally
        ``macosx_default``; an ``IOError``/``ValueError`` from the first
        two moves on to the next one, while a failure of the last lookup
        propagates to the caller.

        Returns:
            tuple: ``(subprocess_env, op_sys)`` where ``subprocess_env`` is
            a copy of ``os.environ`` with the MCR variables added.
        """
        mcr_config = configparser.SafeConfigParser()
        mcr_config.read(self.mcr_filepath)

        for section in ('custom', 'linux_default'):
            try:
                op_sys, env_var, mcr_path, set_paths = \
                    self._get_mcr_config(mcr_config, section)
            except (IOError, ValueError):
                continue
            break
        else:
            # neither of the preferred sections was usable
            op_sys, env_var, mcr_path, set_paths = \
                self._get_mcr_config(mcr_config, 'macosx_default')

        subprocess_env = os.environ.copy()
        subprocess_env.update({
            "MCR_CACHE_ROOT": "/tmp/emptydir",
            "LANG": "en_US.utf8",
        })
        # assigned last so that the configured variable wins on a clash
        subprocess_env[env_var] = set_paths

        return subprocess_env, op_sys
项目:tomato    作者:sertansenturk    | 项目源码 | 文件源码
def get_lilypond_bin_path(self):
        """Return the path of the LilyPond binary.

        The path is taken from ``config/lilypond.cfg`` next to this module:
        the ``custom`` option of the ``custom`` section wins when it is
        non-empty, otherwise the default registered for ``self.sys_os`` is
        used. An ``AssertionError`` is raised when the chosen path cannot
        be found.
        """
        not_found = ('The lilypond path is not found. Please correct the custom '
                     'section in "tomato/config/lilypond.cfg".')

        module_dir = os.path.dirname(os.path.abspath(__file__))
        parser = configparser.SafeConfigParser()
        parser.read(os.path.join(module_dir, 'config', 'lilypond.cfg'))

        # a linux path might be given with $HOME; expand it to the real path
        lilypath = parser.get('custom', 'custom').replace(
            '$HOME', os.path.expanduser('~'))

        if not lilypath:
            # no custom path given: fall back to the per-OS default, which
            # may also be a bare command name resolvable through "which"
            lilypath = parser.defaults()[self.sys_os]
            assert (os.path.exists(lilypath) or
                    self.call('"which" "{0:s}"'.format(lilypath))[0]), \
                not_found
            return lilypath

        assert os.path.exists(lilypath), not_found
        return lilypath
项目:stor    作者:counsyl    | 项目源码 | 文件源码
def parse_config_file(filename):
    """
    Read an INI-style configuration file into a nested settings dictionary.

    Args:
        filename (str): File to read configuration settings from.

    Returns:
        dict: Maps each section name to a dictionary of its options, with
            every raw value passed through ``_parse_config_val``.
    """
    parser = SafeConfigParser()
    with open(filename) as config_fp:
        parser.readfp(config_fp)

    settings = {}
    for section in parser.sections():
        section_settings = {}
        for option, raw_value in parser.items(section):
            section_settings[option] = _parse_config_val(raw_value)
        settings[section] = section_settings

    return settings
项目:scitokens    作者:scitokens    | 项目源码 | 文件源码
def test_passing_config_log(self):
        """
        Check that a passed-in configuration carrying a log_file is applied
        and that logging an error leaves the log file non-empty.
        """
        section = "scitokens"

        new_config = configparser.SafeConfigParser()
        new_config.add_section(section)
        new_config.set(section, "log_level", "WARNING")
        tmp_file = tempfile.NamedTemporaryFile()
        log_path = tmp_file.name
        new_config.set(section, "log_file", log_path)

        scitokens.set_config(new_config)

        self.assertEqual(scitokens.utils.config.get("log_level"), "WARNING")
        self.assertEqual(scitokens.utils.config.get("log_file"), log_path)

        # Emit one record, then make sure something reached the file
        logging.getLogger("scitokens").error("This is an error")
        tmp_file.flush()
        print(os.path.getsize(log_path))
        self.assertTrue(os.path.getsize(log_path) > 0)

        tmp_file.close()
项目:conflagration    作者:Machinarum    | 项目源码 | 文件源码
def write(cfg_obj, output_file_path):
        """
        Write a conflagration object out as an INI-style config file.

        Only supports writing out a conflagration object with namespaces that
        follow the section.key=value pattern that ConfigFile.parse generates:
        every attribute of ``cfg_obj`` becomes a section and every attribute
        of that namespace becomes an option of the section.

        :param cfg_obj: object whose attributes are the section namespaces
        :param output_file_path: path of the file to (over)write
        :raises Exception: if a section cannot be flattened into
            key/value options
        """
        parser = SafeConfigParser()
        for section, namespace in cfg_obj.__dict__.items():
            parser.add_section(section)
            try:
                for option, value in namespace.__dict__.items():
                    parser.set(section, option, value)
            except Exception:
                # The trailing space matters: the two literals are joined
                # into a single sentence.
                raise Exception(
                    "Output to config file not supported for conflagrations "
                    "nested beyond a one dot namespace.")

        with open(output_file_path, 'w') as f:
            parser.write(f)
项目:Trusted-Platform-Module-nova    作者:BU-NU-CLOUD-SP16    | 项目源码 | 文件源码
def _load_support_matrix(self):
        """Build a SupportMatrix from the support-matrix.ini file.

        The file name is taken from the first directive argument and
        resolved through the build environment.

        :returns: SupportMatrix instance
        """
        parser = configparser.SafeConfigParser()
        build_env = self.state.document.settings.env
        rel_fpath, fpath = build_env.relfn2path(self.arguments[0])
        with open(fpath) as ini_fp:
            parser.readfp(ini_fp)

        # Registering the .ini file as a dependency ensures the docs are
        # rebuilt whenever it changes
        build_env.note_dependency(rel_fpath)

        matrix = SupportMatrix()
        matrix.targets = self._get_targets(parser)
        # features are resolved against the targets loaded just above
        matrix.features = self._get_features(parser, matrix.targets)
        return matrix
项目:ansible-tower-automated-deployment    作者:OliverCable    | 项目源码 | 文件源码
def assume_role(self, region, profile):
        """Assume the role configured for ``profile`` and store its credentials.

        The role ARN and source profile are read from ``~/.aws/config`` and
        the source profile's keys from ``~/.aws/credentials``. The temporary
        credentials returned by STS are written into the module-level
        ``connect_args`` dict. ``region`` is not used by this method.
        """
        global connect_args

        # SafeConfigParser is only used on Python 2
        parser_cls = (configparser.ConfigParser if six.PY3
                      else configparser.SafeConfigParser)
        aws_creds = parser_cls()
        aws_config = parser_cls()

        aws_creds.read(os.path.expanduser("~/.aws/credentials"))
        aws_config.read(os.path.expanduser("~/.aws/config"))

        source_profile = self.get_option(aws_config, profile, 'source_profile')
        arn = self.get_option(aws_config, profile, 'role_arn')
        aws_access_key = self.get_option(
            aws_creds, source_profile, 'aws_access_key_id')
        aws_secret_key = self.get_option(
            aws_creds, source_profile, 'aws_secret_access_key')
        session_name = "role_session_name_" + self.boto_profile

        sts_conn = sts.STSConnection(aws_access_key, aws_secret_key)
        assumed = sts_conn.assume_role(role_arn=arn,
                                       role_session_name=session_name)
        temp_creds = assumed.credentials
        connect_args['aws_access_key_id'] = temp_creds.access_key
        connect_args['aws_secret_access_key'] = temp_creds.secret_key
        connect_args['security_token'] = temp_creds.session_token
项目:devsecops-example-helloworld    作者:boozallen    | 项目源码 | 文件源码
def config_from_ini(self, ini):
        """Parse an INI-formatted string into ``{section: {option: value}}``.

        The text is dedented first, so indented literals can be passed in.
        """
        ini_text = textwrap.dedent(six.u(ini))
        parser = configparser.SafeConfigParser()
        parser.readfp(io.StringIO(ini_text))
        return {name: dict(parser.items(name)) for name in parser.sections()}
项目:RPoint    作者:george17-meet    | 项目源码 | 文件源码
def config_from_ini(self, ini):
        """Parse an INI-formatted string into ``{section: {option: value}}``.

        The text is dedented first, so indented literals can be passed in.
        ``ConfigParser`` is used on Python 3.2+ and ``SafeConfigParser`` on
        older interpreters.
        """
        config = {}
        if sys.version_info >= (3, 2):
            parser = configparser.ConfigParser()
        else:
            parser = configparser.SafeConfigParser()
        ini = textwrap.dedent(six.u(ini))
        stream = io.StringIO(ini)
        # readfp() was deprecated in Python 3.2 and removed in 3.12, so use
        # read_file() whenever the parser offers it.
        if hasattr(parser, 'read_file'):
            parser.read_file(stream)
        else:
            parser.readfp(stream)
        for section in parser.sections():
            config[section] = dict(parser.items(section))
        return config
项目:opp    作者:openpassphrase    | 项目源码 | 文件源码
def __init__(self, top_config=None):
        """Load the opp configuration and fill in default values.

        Config files are read in order of increasing priority: the system
        file, the user file, then ``top_config``. When ``top_config`` is
        not given it is taken from the ``OPP_TOP_CONFIG`` environment
        variable, if set. Missing files are skipped.
        """
        usr_config = path.expanduser('~/.opp/opp.cfg')
        sys_config = '/etc/opp/opp.cfg'

        if not top_config:
            # User config not provided, attempt to read from env
            # This is mostly intended to be used for testing
            top_config = environ.get('OPP_TOP_CONFIG', top_config)

        # Existing config files, lowest priority first
        cfglist = [candidate
                   for candidate in (sys_config, usr_config, top_config)
                   if candidate and path.isfile(candidate)]

        parser_cls = (configparser.ConfigParser
                      if sys.version_info >= (3, 2)
                      else configparser.SafeConfigParser)
        self.cfg = parser_cls()

        if cfglist:
            self.cfg.read(cfglist)

        # Fill in defaults for anything the files did not provide
        self.def_sec = "DEFAULT"
        for option, fallback in (('secret_key', "default-insecure"),
                                 ('exp_delta', "300")):
            if self.cfg.has_option(self.def_sec, option):
                continue
            self.cfg.set(self.def_sec, option, fallback)
项目:stor    作者:counsyl    | 项目源码 | 文件源码
def _get_env():
    """
    Load the current environment from ENV_FILE.

    If ENV_FILE does not exist yet, the package default (PKG_ENV_FILE) is
    copied into place first.

    Returns a ConfigParser.
    """
    if not os.path.exists(ENV_FILE):
        # seed the env file from the package default
        shutil.copyfile(PKG_ENV_FILE, ENV_FILE)

    env_parser = configparser.SafeConfigParser()
    with open(ENV_FILE) as env_fp:
        env_parser.readfp(env_fp)
    return env_parser
项目:scitokens    作者:scitokens    | 项目源码 | 文件源码
def test_passing_config(self):
        """
        Check that a configuration parser object handed to set_config
        is picked up by the library configuration.
        """
        section = "scitokens"
        parser = configparser.SafeConfigParser()
        parser.add_section(section)
        parser.set(section, "log_level", "WARNING")

        scitokens.set_config(parser)

        configured_level = scitokens.utils.config.get("log_level")
        self.assertEqual(configured_level, "WARNING")
项目:conflagration    作者:Machinarum    | 项目源码 | 文件源码
def read(cfgfile):
        """Parse ``cfgfile`` and return the populated config parser.

        Raises ``IOError`` on Python 2 and ``FileNotFoundError`` on Python 3
        when the file does not exist.
        """
        if os.path.exists(cfgfile):
            parser = SafeConfigParser()
            parser.read(cfgfile)
            return parser

        error_cls = IOError if six.PY2 else FileNotFoundError
        raise error_cls('File {name} does not exist.'.format(name=cfgfile))
项目:deb-python-pykmip    作者:openstack    | 项目源码 | 文件源码
def __init__(self):
        """Create the module logger and try to load CONFIG_FILE.

        A missing or unreadable config file is not an error; it is only
        reported through a warning log record.
        """
        self.logger = logging.getLogger(__name__)
        self.conf = SafeConfigParser()

        # read() returns the list of files that were parsed successfully
        parsed_files = self.conf.read(CONFIG_FILE)
        if not parsed_files:
            self.logger.warning(
                "Config file {0} not found".format(CONFIG_FILE))
        else:
            self.logger.debug("Using config file at {0}".format(CONFIG_FILE))
项目:deb-python-pykmip    作者:openstack    | 项目源码 | 文件源码
def test_load_settings(self):
        """
        Test that the right calls are made and the right errors generated when
        loading configuration settings from a configuration file specified by
        a path string.
        """
        c = config.KmipServerConfig()
        c._logger = mock.MagicMock()
        c._parse_settings = mock.MagicMock()

        # Happy path: the file "exists", so it is logged, read and parsed.
        with mock.patch('os.path.exists') as exists_mock, \
                mock.patch(
                    'six.moves.configparser.SafeConfigParser.read'
                ) as read_mock:
            exists_mock.return_value = True
            c.load_settings("/test/path/server.conf")
            c._logger.info.assert_any_call(
                "Loading server configuration settings from: "
                "/test/path/server.conf"
            )
            read_mock.assert_called_with("/test/path/server.conf")
            self.assertTrue(c._parse_settings.called)

        # Error path: a missing file must raise a ConfigurationError.
        c._logger.reset_mock()

        with mock.patch('os.path.exists') as exists_mock:
            exists_mock.return_value = False
            with self.assertRaises(exceptions.ConfigurationError):
                c.load_settings('/test/path/server.conf')
项目:deb-python-pykmip    作者:openstack    | 项目源码 | 文件源码
def load_settings(self, path):
        """
        Read the configuration file at ``path`` and apply its settings.

        This will overwrite all current setting values.

        Args:
            path (string): The path to the configuration file containing
                the settings to load. Required.
        Raises:
            ConfigurationError: Raised if the path does not point to an
                existing file or if a setting value is invalid.
        """
        if os.path.exists(path):
            self._logger.info(
                "Loading server configuration settings from: {0}".format(path)
            )

            settings_parser = configparser.SafeConfigParser()
            settings_parser.read(path)
            self._parse_settings(settings_parser)
            return

        raise exceptions.ConfigurationError(
            "The server configuration file ('{0}') could not be "
            "located.".format(path)
        )
项目:coordinates    作者:markovmodel    | 项目源码 | 文件源码
def __init__(self, wrapped):
        """Pick the configuration directory, load the config and set up logging.

        The directory comes from the ``CHAINSAW_CFG_DIR`` environment
        variable when set, otherwise from ``DEFAULT_CONFIG_DIR`` if that is
        a writable directory, otherwise no directory is used. Failures
        while loading the config or configuring logging are turned into
        warnings. ``wrapped`` is stored as the wrapped module.
        """
        # this is a SafeConfigParser instance
        self._conf_values = None

        # note we do not invoke the cfg_dir setter here, because we do not want anything to be created/copied yet.
        env_cfg_dir = os.environ.get('CHAINSAW_CFG_DIR')
        if env_cfg_dir is not None:
            # a directory given via the environment takes precedence
            # TODO: probe?
            self._cfg_dir = env_cfg_dir
        elif os.path.isdir(self.DEFAULT_CONFIG_DIR) and os.access(self.DEFAULT_CONFIG_DIR, os.W_OK):
            # the default cfg dir exists and is writable
            self._cfg_dir = self.DEFAULT_CONFIG_DIR
        else:
            # use defaults, have no cfg_dir set.
            self._cfg_dir = ''

        try:
            self.load()
        except RuntimeError as load_err:
            warnings.warn("unable to read default configuration file. Logging and "
                          " progress bar handling could behave bad! Error: %s" % load_err)

        from chainsaw.util.log import setup_logging, LoggingConfigurationError
        try:
            setup_logging(self)
        except LoggingConfigurationError as log_err:
            warnings.warn("Error during logging configuration. Logging might not be functional!"
                          "Error: %s" % log_err)

        # wrap this module
        self.wrapped = wrapped
        self.__wrapped__ = wrapped
项目:coordinates    作者:markovmodel    | 项目源码 | 文件源码
def __read_cfg(self, filenames):
        """Parse ``filenames`` into a fresh config parser and return it.

        The list of files that were actually read is stored in
        ``self._used_filenames``. An ``EnvironmentError`` while reading is
        re-raised as ``ReadConfigException``.
        """
        parser = configparser.SafeConfigParser()

        try:
            self._used_filenames = parser.read(filenames)
        except EnvironmentError as read_err:
            # note: this file is mission crucial, so fail badly if this is not readable.
            raise ReadConfigException("FATAL ERROR: could not read default configuration"
                                      " file %s\n%s" % (self.default_config_file, read_err))
        return parser
项目:deb-python-kmip    作者:openstack    | 项目源码 | 文件源码
def __init__(self):
        """Create the module logger and try to load CONFIG_FILE.

        A missing or unreadable config file is not an error; it is only
        reported through a warning log record.
        """
        self.logger = logging.getLogger(__name__)
        self.conf = SafeConfigParser()

        # read() returns the list of files that were parsed successfully
        parsed_files = self.conf.read(CONFIG_FILE)
        if not parsed_files:
            self.logger.warning(
                "Config file {0} not found".format(CONFIG_FILE))
        else:
            self.logger.debug("Using config file at {0}".format(CONFIG_FILE))
项目:deb-python-kmip    作者:openstack    | 项目源码 | 文件源码
def test_load_settings(self):
        """
        Test that the right calls are made and the right errors generated when
        loading configuration settings from a configuration file specified by
        a path string.
        """
        c = config.KmipServerConfig()
        c._logger = mock.MagicMock()
        c._parse_settings = mock.MagicMock()

        # Happy path: the file "exists", so it is logged, read and parsed.
        with mock.patch('os.path.exists') as exists_mock, \
                mock.patch(
                    'six.moves.configparser.SafeConfigParser.read'
                ) as read_mock:
            exists_mock.return_value = True
            c.load_settings("/test/path/server.conf")
            c._logger.info.assert_any_call(
                "Loading server configuration settings from: "
                "/test/path/server.conf"
            )
            read_mock.assert_called_with("/test/path/server.conf")
            self.assertTrue(c._parse_settings.called)

        # Error path: a missing file must raise a ConfigurationError.
        c._logger.reset_mock()

        with mock.patch('os.path.exists') as exists_mock:
            exists_mock.return_value = False
            with self.assertRaises(exceptions.ConfigurationError):
                c.load_settings('/test/path/server.conf')
项目:deb-python-kmip    作者:openstack    | 项目源码 | 文件源码
def load_settings(self, path):
        """
        Read the configuration file at ``path`` and apply its settings.

        This will overwrite all current setting values.

        Args:
            path (string): The path to the configuration file containing
                the settings to load. Required.
        Raises:
            ConfigurationError: Raised if the path does not point to an
                existing file or if a setting value is invalid.
        """
        if os.path.exists(path):
            self._logger.info(
                "Loading server configuration settings from: {0}".format(path)
            )

            settings_parser = configparser.SafeConfigParser()
            settings_parser.read(path)
            self._parse_settings(settings_parser)
            return

        raise exceptions.ConfigurationError(
            "The server configuration file ('{0}') could not be "
            "located.".format(path)
        )
项目:azure-cli    作者:Azure    | 项目源码 | 文件源码
def get_config_parser():
    """Return a new config parser instance matching the running Python.

    ``ConfigParser`` is used on Python 3, ``SafeConfigParser`` otherwise.
    """
    import sys

    if sys.version_info.major == 3:
        return configparser.ConfigParser()
    return configparser.SafeConfigParser()
项目:treadmill    作者:Morgan-Stanley    | 项目源码 | 文件源码
def read(benchmark_result_file):
    """
    Load benchmark results from an INI-style result file.

    Sections lacking the device option or one of the metric options are
    reported through the module logger.

    :param benchmark_result_file: benchmark result file
    :return: {device: {metric: value, }, }
    """
    parser = configparser.SafeConfigParser()
    with io.open(benchmark_result_file) as result_fp:
        parser.readfp(result_fp)  # pylint: disable=deprecated-method

    result = {}
    for section in parser.sections():
        try:
            device = parser.get(section, _DEVICE)
            device_metrics = result[device] = {}
            for metric in Metrics:
                device_metrics[metric.value] = parser.get(
                    section,
                    metric.value
                )
        except configparser.NoOptionError:
            _LOGGER.error(
                'Incorrect section in %s',
                benchmark_result_file
            )

    return result
项目:treadmill    作者:Morgan-Stanley    | 项目源码 | 文件源码
def write(benchmark_result_file, result):
    """Write benchmark result.

    Each device gets its own numbered section holding the device id and
    one option per metric.

    Sample output file format:
        [device0]
        device = 589d88bd-8098-4041-900e-7fcac18abab3
        write_bps = 314572800
        read_bps = 314572800
        write_iops = 64000
        read_iops = 4000

    :param benchmark_result_file:
        benchmark result file
    :param result:
        {device: {metric: value, }, }
    """
    config = configparser.SafeConfigParser()
    # dict.iteritems() only exists on Python 2; items() works on both.
    for device_count, (device, metrics) in enumerate(result.items()):
        section = _DEVICE + str(device_count)
        config.add_section(section)
        config.set(section, _DEVICE, device)
        for metric, value in metrics.items():
            # option values are stored as strings
            config.set(section, metric, str(value))

    fs.write_safe(
        benchmark_result_file,
        config.write,
        permission=0o644
    )
项目:nova-dpm    作者:openstack    | 项目源码 | 文件源码
def _load_support_matrix(self):
        """Reads the support-matrix.ini file and populates an instance
        of the SupportMatrix class with all the data.

        The file name is taken from the first directive argument and
        resolved through the build environment.

        :returns: SupportMatrix instance
        """

        # SafeConfigParser and readfp() were deprecated in Python 3.2 and
        # removed in 3.12, so use their replacements on Python 3.
        if sys.version_info >= (3, 2):
            cfg = configparser.ConfigParser()
            read_file = cfg.read_file
        else:
            cfg = configparser.SafeConfigParser()
            read_file = cfg.readfp
        env = self.state.document.settings.env
        fname = self.arguments[0]
        rel_fpath, fpath = env.relfn2path(fname)
        with open(fpath) as fp:
            read_file(fp)

        # This ensures that the docs are rebuilt whenever the
        # .ini file changes
        env.note_dependency(rel_fpath)

        matrix = SupportMatrix()
        matrix.targets = self._get_targets(cfg)
        matrix.features = self._get_features(cfg, matrix.targets)

        return matrix
项目:weakmon    作者:rtmrtmrtmrtm    | 项目源码 | 文件源码
def cfg(program, key):
    """Return option ``key`` from section ``program``, or None if absent.

    The options are read from ``weak-local.cfg`` and ``weak.cfg`` in the
    current working directory; files that do not exist are skipped.
    """
    parser = configparser.SafeConfigParser()
    parser.read(['weak-local.cfg', 'weak.cfg'])
    return parser.get(program, key) if parser.has_option(program, key) else None

# make a butterworth IIR bandpass filter
项目:nodepy    作者:nodepy    | 项目源码 | 文件源码
def __init__(self, filename, defaults=None):
    """Remember ``filename`` and ``defaults``, then load the file.

    The file is read with ``doraise=False``. A falsy ``defaults`` is
    replaced by a new empty dict.
    """
    self.filename = filename
    self._parser = configparser.SafeConfigParser()
    self.defaults = defaults if defaults else {}
    self.read(filename, doraise=False)
项目:PyWebDAV3    作者:andrewleech    | 项目源码 | 文件源码
def __init__(self, fileName):
        """Parse ``fileName`` and keep both the parser and the file name."""
        parser = SafeConfigParser()
        parser.read(fileName)

        self.__parser = parser
        self.fileName = fileName
项目:knack    作者:Microsoft    | 项目源码 | 文件源码
def get_config_parser():
    """Return a new config parser instance matching the running Python.

    ``ConfigParser`` is used on Python 3, ``SafeConfigParser`` otherwise.
    """
    if sys.version_info.major == 3:
        return configparser.ConfigParser()
    return configparser.SafeConfigParser()
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def __init__(self, config):
                """Load the package classification data named in config.

                The data file location is read from option
                "info_classification_path" of the "pkglint" section.  On
                success self.classification_data holds the parsed file.
                If the file is missing or cannot be opened or parsed,
                it stays None and self.bad_classification_data carries
                a message saying why.
                """

                self.classification_data = None

                self.classification_path = config.get(
                    "pkglint", "info_classification_path")
                self.skip_classification_check = False

                # a default error message used if we've parsed the
                # data file, but haven't thrown any exceptions
                self.bad_classification_data = _("no sections found in data "
                    "file {0}").format(self.classification_path)

                if os.path.exists(self.classification_path):
                        try:
                                if six.PY2:
                                        parser = \
                                            configparser.SafeConfigParser()
                                        load = parser.readfp
                                else:
                                        # SafeConfigParser has been renamed to
                                        # ConfigParser in Python 3.2.
                                        parser = \
                                            configparser.ConfigParser()
                                        load = parser.read_file
                                # the with-block closes the data file
                                # even when parsing fails
                                with open(self.classification_path) as \
                                    data_file:
                                        load(data_file)
                                # only publish the parser after a clean
                                # parse, so that a failure really leaves
                                # classification_data as None
                                self.classification_data = parser
                        except Exception as err:
                                # any exception thrown here results in a null
                                # classification_data object.  We deal with that
                                # later.
                                self.bad_classification_data = _(
                                    "unable to parse data file {path}: "
                                    "{err}").format(
                                    path=self.classification_path,
                                    err=err)
                else:
                        self.bad_classification_data = _("missing file {0}").format(
                            self.classification_path)
                super(ManifestChecker, self).__init__(config)
项目:solaris-ips    作者:oracle    | 项目源码 | 文件源码
def __init__(self, config_file=None):
                """Load the pkglint configuration into self.config.

                With no config_file, the system pkglintrc is read first
                and then ~/.pkglintrc; otherwise only config_file is
                read.  The result is sanity-checked by looking up the
                "log_level" option of the "pkglint" section.

                Raises PkglintConfigException if config_file cannot be
                opened, or if the configuration is missing or corrupt.
                """

                if config_file:
                        try:
                                # ConfigParser doesn't do a good job of
                                # error reporting, so we'll just try to open
                                # the file
                                open(config_file, "r").close()
                        except (EnvironmentError) as err:
                                raise PkglintConfigException(
                                    _("unable to read config file: {0} ").format(
                                    err))
                try:
                        if six.PY2:
                                self.config = configparser.SafeConfigParser(
                                    defaults)
                        else:
                                # SafeConfigParser has been renamed to
                                # ConfigParser in Python 3.2.
                                self.config = configparser.ConfigParser(
                                    defaults)
                        if not config_file:
                                # the with-block closes the system rc file
                                # even when parsing it fails
                                with open("/usr/share/lib/pkg/pkglintrc") as \
                                    rc_file:
                                        if six.PY2:
                                                self.config.readfp(rc_file)
                                        else:
                                                self.config.read_file(rc_file)
                                self.config.read(
                                    [os.path.expanduser("~/.pkglintrc")])
                        else:
                                self.config.read(config_file)

                        # sanity check our config by looking for a known key
                        self.config.get("pkglint", "log_level")
                except configparser.Error as err:
                        raise PkglintConfigException(
                            _("missing or corrupt pkglintrc file "
                            "{config_file}: {err}").format(**locals()))
项目:Hawkeye    作者:tozhengxq    | 项目源码 | 文件源码
def config_from_ini(self, ini):
        """Parse an INI-formatted string into ``{section: {option: value}}``.

        The text is dedented first, so indented literals can be passed in.
        """
        ini_text = textwrap.dedent(six.u(ini))
        parser = configparser.SafeConfigParser()
        parser.readfp(io.StringIO(ini_text))
        return {name: dict(parser.items(name)) for name in parser.sections()}
项目:python-zulip-api    作者:zulip    | 项目源码 | 文件源码
def read_config_file(config_file_path):
    # type: (str) -> None
    """Load bot credentials from ``config_file_path`` into ``bots_config``.

    Every section of the file becomes one ``bots_config`` entry holding its
    ``email``, ``key`` and ``site`` options. ``IOError`` is raised when the
    path (after ``~`` expansion) is not an existing file.
    """
    config_file_path = os.path.abspath(os.path.expanduser(config_file_path))
    if not os.path.isfile(config_file_path):
        raise IOError("Could not read config file {}: File not found.".format(config_file_path))

    parser = SafeConfigParser()
    parser.read(config_file_path)

    for section in parser.sections():
        bots_config[section] = {
            option: parser.get(section, option)
            for option in ('email', 'key', 'site')
        }
项目:delft3d-gt-server    作者:openearth    | 项目源码 | 文件源码
def do_docker_create(self, label, parameters, environment, name, image,
                     volumes, memory_limit, folders, command):
    """
    Prepare the working folders and create a docker container.

    Creates the directories in ``folders`` (used for the container
    mounts), writes an ``input.ini`` filled with ``parameters`` into each
    of them, then creates a new docker container from ``image``.

    Returns a tuple of the new container's id and an empty string.
    """
    # Create needed folders for mounts
    for folder in folders:
        try:
            os.makedirs(folder, 0o2775)
        except OSError:
            # An already existing directory is fine; anything else is not
            if not os.path.isdir(folder):
                raise

    # Build the ini content shared by all containers folders
    ini = configparser.SafeConfigParser()
    for section, options in parameters.items():
        if not ini.has_section(section):
            ini.add_section(section)
        for key, value in options.items():
            # TODO: find more elegant solution for this! ugh!
            # 'units' entries and already-set options are left out
            if key == 'units' or ini.has_option(section, key):
                continue
            ini.set(str(section), str(key), str(value))

    for folder in folders:
        ini_path = os.path.join(folder, 'input.ini')
        with open(ini_path, 'w') as ini_file:
            ini.write(ini_file)

    # Create docker container
    client = Client(base_url=settings.DOCKER_URL)
    # We could also pass mem_reservation since docker-py 1.10
    host_config = client.create_host_config(binds=volumes,
                                            mem_limit=memory_limit)
    container = client.create_container(
        image,  # docker image
        name=name,
        host_config=host_config,  # mounts
        command=command,  # command to run
        environment=environment,  # {'uuid' = ""} for cloud fs sync
        labels=label  # type of container
    )
    return container.get('Id'), ""
项目:delft3d-gt-server    作者:openearth    | 项目源码 | 文件源码
def test_do_docker_create(self, mockClient):
        """
        Assert that the docker_create task
        calls the docker client.create_container() function,
        creates the working folders and writes the given parameters
        (except 'units') to an input.ini in each of them.
        """
        image = "IMAGENAME"
        volumes = ['/:/data/output:z',
                   '/:/data/input:ro']
        memory_limit = '1g'
        command = "echo test"
        config = {}
        environment = {'a': 1, 'b': 2}
        label = {"type": "delft3d"}
        folder = ['input', 'output']
        name = 'test-8172318273'
        workingdir = os.path.join(os.getcwd(), 'test')
        folders = [os.path.join(workingdir, f) for f in folder]
        parameters = {u'test':
                      {u'1': u'a', u'2': u'b', 'units': 'ignoreme'}
                      }
        mockClient.return_value.create_host_config.return_value = config

        do_docker_create.delay(label, parameters, environment, name,
                               image, volumes, memory_limit, folders, command)

        # Assert that docker is called
        mockClient.return_value.create_container.assert_called_with(
            image,
            host_config=config,
            command=command,
            name=name,
            environment=environment,
            labels=label
        )

        # Assert that folders are created: every requested folder name
        # must show up in the working directory (iterating over the
        # directory listing itself would always pass)
        listdir = os.listdir(workingdir)
        for f in folder:
            self.assertIn(f, listdir)

        for folder_path in folders:
            ini = os.path.join(folder_path, 'input.ini')
            self.assertTrue(os.path.isfile(ini))

            parser = configparser.SafeConfigParser()
            # close the ini file again once it has been parsed
            with open(ini) as ini_file:
                parser.readfp(ini_file)
            for key in parameters.keys():
                self.assertTrue(parser.has_section(key))
                for option, value in parameters[key].items():
                    if option != 'units':
                        self.assertTrue(parser.has_option(key, option))
                        self.assertEqual(parser.get(key, option), value)
                    else:  # units should be ignored
                        self.assertFalse(parser.has_option(key, option))
项目:python-zunclient    作者:openstack    | 项目源码 | 文件源码
def _get_config(self):
        """Load the CLI flags for the functional tests from the test config.

        The config file path comes from the ``ZUNCLIENT_TEST_CONFIG``
        environment variable, falling back to ``DEFAULT_CONFIG_FILE``. Every
        setting is read from the ``[functional]`` section.

        The test is skipped when the config file cannot be read, and failed
        when ``auth_strategy`` is not supported or a required setting is
        absent.

        :returns: dict mapping each setting found in the config file to its
                  value.
        """
        config_file = os.environ.get('ZUNCLIENT_TEST_CONFIG',
                                     DEFAULT_CONFIG_FILE)
        # SafeConfigParser was deprecated in Python 3.2
        if six.PY3:
            config = config_parser.ConfigParser()
        else:
            config = config_parser.SafeConfigParser()
        if not config.read(config_file):
            self.skipTest('Skipping, no test config found @ %s' % config_file)

        # A file without a [functional] section raises NoSectionError rather
        # than NoOptionError; both simply mean "setting not provided".
        not_provided = (config_parser.NoSectionError,
                        config_parser.NoOptionError)

        try:
            auth_strategy = config.get('functional', 'auth_strategy')
        except not_provided:
            auth_strategy = 'keystone'
        if auth_strategy not in ('keystone', 'noauth'):
            # fail() raises on its own, so no ``raise`` statement is needed.
            self.fail(
                'Invalid auth type specified: %s in functional must be '
                'one of: [keystone, noauth]' % auth_strategy)

        conf_settings = []
        keystone_v3_conf_settings = []
        if auth_strategy == 'keystone':
            conf_settings += ['os_auth_url', 'os_username',
                              'os_password', 'os_project_name',
                              'os_identity_api_version']
            keystone_v3_conf_settings += ['os_user_domain_id',
                                          'os_project_domain_id']
        else:
            conf_settings += ['os_auth_token', 'zun_url']

        cli_flags = {}
        missing = []
        for c in conf_settings + keystone_v3_conf_settings:
            try:
                cli_flags[c] = config.get('functional', c)
            except not_provided:
                # NOTE(vdrok): Here we ignore the absence of KS v3 options as
                # v2 may be used. Keystone client will do the actual check of
                # the parameters' correctness.
                if c not in keystone_v3_conf_settings:
                    missing.append(c)
        if missing:
            self.fail('Missing required setting in test.conf (%(conf)s) for '
                      'auth_strategy=%(auth)s: %(missing)s' %
                      {'conf': config_file,
                       'auth': auth_strategy,
                       'missing': ','.join(missing)})
        return cli_flags
项目:scitokens    作者:scitokens    | 项目源码 | 文件源码
def set_config(config = None):
    """
    Choose the configuration used by the SciTokens library and set up the
    "scitokens" logger from it.

    :param config: one of: a string path to an ini configuration file,
        a ConfigParser instance (anything derived from RawConfigParser),
        or None to rely on the built-in defaults only. Any other value
        leaves the current module-level configuration as it is.
    """
    global configuration # pylint: disable=C0103

    if config is None:
        print("Using built-in defaults")
        configuration = configparser.SafeConfigParser(CONFIG_DEFAULTS)
        configuration.add_section("scitokens")
    elif isinstance(config, six.string_types):
        configuration = configparser.SafeConfigParser(CONFIG_DEFAULTS)
        configuration.read([config])
    elif isinstance(config, configparser.RawConfigParser):
        configuration = config

    scitokens_logger = logging.getLogger("scitokens")

    if not configuration.has_option("scitokens", "log_file"):
        # No log file option at all: log to the default stream instead.
        scitokens_logger.addHandler(logging.StreamHandler())
    else:
        log_path = configuration.get("scitokens", "log_file")
        if log_path is not None:
            # Roughly 100MB per log file, keeping 5 rotated backups
            file_handler = logging.handlers.RotatingFileHandler(
                log_path, maxBytes=100 * (1024*1000), backupCount=5)
            scitokens_logger.addHandler(file_handler)

    # Map the configured level name onto the logging constant; anything
    # unrecognised falls back to WARNING.
    requested_level = configuration.get("scitokens", "log_level")
    known_levels = (
        ("DEBUG", logging.DEBUG),
        ("INFO", logging.INFO),
        ("WARNING", logging.WARNING),
        ("ERROR", logging.ERROR),
        ("CRITICAL", logging.CRITICAL),
    )
    for level_name, level_value in known_levels:
        if requested_level == level_name:
            scitokens_logger.setLevel(level_value)
            break
    else:
        scitokens_logger.setLevel(logging.WARNING)
项目:treadmill    作者:Morgan-Stanley    | 项目源码 | 文件源码
def benchmark(directory,
              volume=BENCHMARK_VOLUME,
              rw_type=BENCHMARK_RW_TYPE,
              job_number=BENCHMARK_JOB_NUMBER,
              thread_number=BENCHMARK_THREAD_NUMBER,
              block_size=BENCHMARK_IOPS_BLOCK_SIZE,
              max_seconds=BENCHMARK_MAX_SECONDS):
    """Run a fio benchmark inside ``directory`` and collect its metrics.

    A fio job file is written into ``directory``, fio is run against it, and
    the read/write bandwidth and IOPS are extracted from fio's terse output.

    Returns a dict keyed by ``Metrics`` values; the dict is empty when fio
    exits with a non-zero status.
    """
    config_file = os.path.join(directory, _BENCHMARK_CONFIG_FILE)
    result_file = os.path.join(directory, _BENCHMARK_RESULT_FILE)

    # fio job description: (section, ((option, value), ...)) in write order
    job_layout = (
        ('global', (
            ('group_reporting', '1'),
            ('unlink', '1'),
            ('time_based', '1'),
            ('direct', '1'),
            ('size', volume),
            ('rw', rw_type),
            ('numjobs', job_number),
            ('iodepth', thread_number),
            ('bs', block_size),
            ('runtime', max_seconds),
        )),
        ('drive', (
            ('directory', directory),
        )),
    )

    fio_config = configparser.SafeConfigParser()
    for section, options in job_layout:
        fio_config.add_section(section)
        for option, value in options:
            fio_config.set(section, option, value)

    fs.write_safe(
        config_file,
        lambda f: fio_config.write(EqualSpaceRemover(f))
    )

    # run fio with terse (semicolon separated) output written to result_file
    fio_args = ['fio', config_file, '--norandommap',
                '--minimal', '--output', result_file]
    exit_code = subproc.call(fio_args)

    result = {}
    if exit_code != 0:
        return result

    # field positions follow fio's terse output layout:
    # http://fio.readthedocs.io/en/latest/fio_doc.html#terse-output
    with io.open(result_file) as fp:
        fields = fp.read().split(';')

        # bandwidth fields are scaled by 1024 (presumably KiB/s -> bytes/s)
        result[Metrics.READ_BPS.value] = int(float(fields[6]) * 1024)
        result[Metrics.READ_IOPS.value] = int(fields[7])
        result[Metrics.WRITE_BPS.value] = int(float(fields[47]) * 1024)
        result[Metrics.WRITE_IOPS.value] = int(fields[48])

    return result
项目:ci    作者:dlang    | 项目源码 | 文件源码
def read_settings(self):
        ''' Load options from profitbricks_inventory.ini onto this instance.

        The ini file is looked up next to this source file and every option
        is read from its [profitbricks] section. Optional values are copied
        only when present; the boolean switches always get a value, using
        their fallback when the option is absent.
        '''

        parser = (configparser.ConfigParser() if six.PY3
                  else configparser.SafeConfigParser())

        section = 'profitbricks'
        ini_path = os.path.dirname(os.path.realpath(__file__)) + '/profitbricks_inventory.ini'
        parser.read(ini_path)

        # Credentials, API endpoint and cache location: plain string options
        text_options = (
            'subscription_user',
            'subscription_password',
            'subscription_password_file',
            'api_url',
            'cache_path',
        )
        for name in text_options:
            if parser.has_option(section, name):
                setattr(self, name, parser.get(section, name))

        # Cache lifetime is stored as an integer
        if parser.has_option(section, 'cache_max_age'):
            self.cache_max_age = parser.getint(section, 'cache_max_age')

        # Group variables are written as a Python literal in the ini file
        if parser.has_option(section, 'vars'):
            self.vars = ast.literal_eval(parser.get(section, 'vars'))

        # Boolean switches paired with the value used when the option is absent:
        # grouping is on by default, server-name-as-hostname is off by default
        switches = (
            ('group_by_datacenter_id', True),
            ('group_by_location', True),
            ('group_by_availability_zone', True),
            ('group_by_image_name', True),
            ('group_by_licence_type', True),
            ('server_name_as_inventory_hostname', False),
        )
        for name, fallback in switches:
            if parser.has_option(section, name):
                enabled = parser.getboolean(section, name)
            else:
                enabled = fallback
            setattr(self, name, enabled)
项目:python-blessclient    作者:lyft    | 项目源码 | 文件源码
def parse_config_file(self, config_file):
        """Parse an open config file into the nested bless config dict.

        ``config_file`` is a file-like object handed to the parser's
        ``readfp``; ``self.DEFAULT_CONFIG`` supplies the parser defaults.

        The returned dict holds the ``CLIENT_CONFIG``, ``BLESS_CONFIG`` and
        ``AWS_CONFIG`` sub-dicts, one ``KMSAUTH_CONFIG_<REGION>`` entry per
        alias listed in ``MAIN.region_aliases``, and ``REGION_ALIAS`` mapping
        each upper-cased alias to that entry's ``awsregion``.
        """
        parser = SafeConfigParser(self.DEFAULT_CONFIG)
        parser.readfp(config_file)

        get = parser.get
        getint = parser.getint

        # [CLIENT] section
        client_config = {
            'domain_regex': get('CLIENT', 'domain_regex'),
            'cache_dir': get('CLIENT', 'cache_dir'),
            'cache_file': get('CLIENT', 'cache_file'),
            'mfa_cache_dir': get('CLIENT', 'mfa_cache_dir'),
            'mfa_cache_file': get('CLIENT', 'mfa_cache_file'),
            # comma separated list of URLs, surrounding whitespace dropped
            'ip_urls': [url.strip() for url in get('CLIENT', 'ip_urls').split(",")],
            'update_script': get('CLIENT', 'update_script'),
            'user_session_length': int(get('CLIENT', 'user_session_length')),
            'usebless_role_session_length': int(get('CLIENT', 'usebless_role_session_length')),
        }

        # [LAMBDA] section
        lambda_config = {
            'userrole': get('LAMBDA', 'user_role'),
            'accountid': get('LAMBDA', 'account_id'),
            'functionname': get('LAMBDA', 'functionname'),
            'functionversion': get('LAMBDA', 'functionversion'),
            'certlifetime': getint('LAMBDA', 'certlifetime'),
            'ipcachelifetime': getint('LAMBDA', 'ipcachelifetime'),
            'timeoutconfig': {
                'connect': getint('LAMBDA', 'timeout_connect'),
                'read': getint('LAMBDA', 'timeout_read')
            }
        }

        # [MAIN] section
        aws_config = {
            'bastion_ips': get('MAIN', 'bastion_ips'),
            'remote_user': get('MAIN', 'remote_user')
        }

        region_alias = {}
        blessconfig = {
            'CLIENT_CONFIG': client_config,
            'BLESS_CONFIG': lambda_config,
            'AWS_CONFIG': aws_config,
            'REGION_ALIAS': region_alias
        }

        # One KMS auth entry per configured region alias
        for alias in get('MAIN', 'region_aliases').split(","):
            region = alias.strip().upper()
            kms_region_key = 'KMSAUTH_CONFIG_{}'.format(region)
            blessconfig[kms_region_key] = self._get_region_kms_config(region, parser)
            region_alias[region] = blessconfig[kms_region_key]['awsregion']
        return blessconfig
项目:openshift-ansible-contrib    作者:openshift    | 项目源码 | 文件源码
def update_ini_file(self):
        ''' Update INI file with added number of nodes

        The INI path is taken from the VMWARE_INI_PATH environment variable
        (with ~ and environment variables expanded), defaulting to
        ocp-on-vmware.ini next to this file.

        For each of 'app', 'infra' and 'storage' found in self.node_type the
        matching self.<kind>_nodes counter is increased by self.node_number;
        the storage counter is decreased instead when 'clean' is in self.tag.
        The app_nodes / infra_nodes / storage_nodes lines of the INI file are
        then rewritten in place with the current counters.
        '''
        # Function-scope import so the block does not depend on a
        # module-level "import sys".
        import sys

        scriptbasename = "ocp-on-vmware"
        defaults = {'vmware': {
            'ini_path': os.path.join(os.path.dirname(__file__), '%s.ini' % scriptbasename),
            'master_nodes':'3',
            'infra_nodes':'2',
            'storage_nodes': '0',
            'app_nodes':'3' }
        }
        # where is the config?
        if six.PY3:
            config = configparser.ConfigParser()
        else:
            config = configparser.SafeConfigParser()

        vmware_ini_path = os.environ.get('VMWARE_INI_PATH', defaults['vmware']['ini_path'])
        vmware_ini_path = os.path.expanduser(os.path.expandvars(vmware_ini_path))
        config.read(vmware_ini_path)

        # NOTE: the parser object is never written back; the file on disk is
        # updated by the in-place line rewrite at the end of this method.
        # print(...) with a single argument behaves the same on Python 2 and 3.
        if 'app' in self.node_type:
            self.app_nodes = int(self.app_nodes) + int(self.node_number)
            config.set('vmware', 'app_nodes', str(self.app_nodes))
            print("Updating %s file with %s app_nodes" % (vmware_ini_path, str(self.app_nodes)))
        if 'infra' in self.node_type:
            self.infra_nodes = int(self.infra_nodes) + int(self.node_number)
            config.set('vmware', 'infra_nodes', str(self.infra_nodes))
            print("Updating %s file with %s infra_nodes" % (vmware_ini_path, str(self.infra_nodes)))
        if 'storage' in self.node_type:
            if 'clean' in self.tag:
                self.storage_nodes = int(self.storage_nodes) - int(self.node_number)
            else:
                self.storage_nodes = int(self.storage_nodes) + int(self.node_number)
            config.set('vmware', 'storage_nodes', str(self.storage_nodes))
            print("Updating %s file with %s storage_nodes" % (vmware_ini_path, str(self.storage_nodes)))

        # With inplace=True fileinput redirects sys.stdout into the file being
        # read, so everything written here becomes the new file content.
        # sys.stdout.write is used instead of the Python 2 only
        # "print line," statement so the block also parses on Python 3.
        ini_lines = fileinput.input(vmware_ini_path, inplace=True)
        try:
            for line in ini_lines:
                if line.startswith("app_nodes"):
                    sys.stdout.write("app_nodes=" + str(self.app_nodes) + "\n")
                elif line.startswith("infra_nodes"):
                    sys.stdout.write("infra_nodes=" + str(self.infra_nodes) + "\n")
                elif line.startswith("storage_nodes"):
                    sys.stdout.write("storage_nodes=" + str(self.storage_nodes) + "\n")
                else:
                    sys.stdout.write(line)
        finally:
            # Always restore sys.stdout, even if the rewrite fails midway.
            fileinput.close()
项目:openshift-ansible    作者:nearform    | 项目源码 | 文件源码
def update_ini_file(self):
        ''' Update INI file with added number of nodes

        The INI path is taken from the VMWARE_INI_PATH environment variable
        (with ~ and environment variables expanded), defaulting to
        ocp-on-vmware.ini next to this file.

        For each of 'app', 'infra' and 'storage' found in self.node_type the
        matching self.<kind>_nodes counter is increased by self.node_number;
        the storage counter is decreased instead when 'clean' is in self.tag.
        The app_nodes / infra_nodes / storage_nodes lines of the INI file are
        then rewritten in place with the current counters.
        '''
        # Function-scope import so the block does not depend on a
        # module-level "import sys".
        import sys

        scriptbasename = "ocp-on-vmware"
        defaults = {'vmware': {
            'ini_path': os.path.join(os.path.dirname(__file__), '%s.ini' % scriptbasename),
            'master_nodes':'3',
            'infra_nodes':'2',
            'storage_nodes': '0',
            'app_nodes':'3' }
        }
        # where is the config?
        if six.PY3:
            config = configparser.ConfigParser()
        else:
            config = configparser.SafeConfigParser()

        vmware_ini_path = os.environ.get('VMWARE_INI_PATH', defaults['vmware']['ini_path'])
        vmware_ini_path = os.path.expanduser(os.path.expandvars(vmware_ini_path))
        config.read(vmware_ini_path)

        # NOTE: the parser object is never written back; the file on disk is
        # updated by the in-place line rewrite at the end of this method.
        # print(...) with a single argument behaves the same on Python 2 and 3.
        if 'app' in self.node_type:
            self.app_nodes = int(self.app_nodes) + int(self.node_number)
            config.set('vmware', 'app_nodes', str(self.app_nodes))
            print("Updating %s file with %s app_nodes" % (vmware_ini_path, str(self.app_nodes)))
        if 'infra' in self.node_type:
            self.infra_nodes = int(self.infra_nodes) + int(self.node_number)
            config.set('vmware', 'infra_nodes', str(self.infra_nodes))
            print("Updating %s file with %s infra_nodes" % (vmware_ini_path, str(self.infra_nodes)))
        if 'storage' in self.node_type:
            if 'clean' in self.tag:
                self.storage_nodes = int(self.storage_nodes) - int(self.node_number)
            else:
                self.storage_nodes = int(self.storage_nodes) + int(self.node_number)
            config.set('vmware', 'storage_nodes', str(self.storage_nodes))
            print("Updating %s file with %s storage_nodes" % (vmware_ini_path, str(self.storage_nodes)))

        # With inplace=True fileinput redirects sys.stdout into the file being
        # read, so everything written here becomes the new file content.
        # sys.stdout.write is used instead of the Python 2 only
        # "print line," statement so the block also parses on Python 3.
        ini_lines = fileinput.input(vmware_ini_path, inplace=True)
        try:
            for line in ini_lines:
                if line.startswith("app_nodes"):
                    sys.stdout.write("app_nodes=" + str(self.app_nodes) + "\n")
                elif line.startswith("infra_nodes"):
                    sys.stdout.write("infra_nodes=" + str(self.infra_nodes) + "\n")
                elif line.startswith("storage_nodes"):
                    sys.stdout.write("storage_nodes=" + str(self.storage_nodes) + "\n")
                else:
                    sys.stdout.write(line)
        finally:
            # Always restore sys.stdout, even if the rewrite fails midway.
            fileinput.close()