The following 43 code examples, extracted from open-source Python projects, illustrate how to use six.moves.configparser.SafeConfigParser().
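SafeConfigParser was renamed to ConfigParser in Python 3.2 (and the old alias was eventually removed in Python 3.12), which is why many of the examples below pick the parser class based on the interpreter version. As a minimal sketch of the common read pattern, assuming a hypothetical example.ini file with a [main] section and a debug option:

import six
from six.moves import configparser

# SafeConfigParser is the Python 2 name; ConfigParser replaces it on Python 3.
if six.PY2:
    parser = configparser.SafeConfigParser()
else:
    parser = configparser.ConfigParser()

parser.read('example.ini')  # 'example.ini', 'main', and 'debug' are hypothetical
if parser.has_option('main', 'debug'):
    print(parser.get('main', 'debug'))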
def get_host_and_realm(self):
    """Return the hostname and IPA realm name.

    IPA 4.4 introduced the requirement that the schema be fetched
    when calling finalize(). This is really only used by the ipa
    command-line tool but for now it is baked in. So we have to get
    a TGT first but need the hostname and realm. For now directly
    read the IPA config file which is in INI format and pull those
    two values out and return as a tuple.
    """
    config = SafeConfigParser()
    config.read('/etc/ipa/default.conf')
    hostname = config.get('global', 'host')
    realm = config.get('global', 'realm')

    return (hostname, realm)
def set_environment(self):
    config = configparser.SafeConfigParser()
    config.read(self.mcr_filepath)
    try:
        op_sys, env_var, mcr_path, set_paths = \
            self._get_mcr_config(config, 'custom')
    except (IOError, ValueError):
        try:
            op_sys, env_var, mcr_path, set_paths = \
                self._get_mcr_config(config, 'linux_default')
        except (IOError, ValueError):
            op_sys, env_var, mcr_path, set_paths = \
                self._get_mcr_config(config, 'macosx_default')

    subprocess_env = os.environ.copy()
    subprocess_env["MCR_CACHE_ROOT"] = "/tmp/emptydir"
    subprocess_env["LANG"] = "en_US.utf8"
    subprocess_env[env_var] = set_paths

    return subprocess_env, op_sys
def get_lilypond_bin_path(self):
    config = configparser.SafeConfigParser()
    lily_cfgfile = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                                'config', 'lilypond.cfg')
    config.read(lily_cfgfile)

    # check custom
    lilypath = config.get('custom', 'custom')

    # linux path might be given with $HOME; convert it to the real path
    lilypath = lilypath.replace('$HOME', os.path.expanduser('~'))

    if lilypath:
        assert os.path.exists(lilypath), \
            'The lilypond path is not found. Please correct the custom ' \
            'section in "tomato/config/lilypond.cfg".'
    else:  # defaults
        lilypath = config.defaults()[self.sys_os]
        assert (os.path.exists(lilypath) or
                self.call('"which" "{0:s}"'.format(lilypath))[0]), \
            'The lilypond path is not found. Please correct the custom ' \
            'section in "tomato/config/lilypond.cfg".'

    return lilypath
def parse_config_file(filename):
    """
    Parses a configuration file and returns a settings dictionary.

    Args:
        filename (str): File to read configuration settings from.

    Returns:
        dict: A dictionary of settings options.
    """
    parser = SafeConfigParser()
    with open(filename) as fp:
        parser.readfp(fp)

    settings = {
        section: {
            item[0]: _parse_config_val(item[1])
            for item in parser.items(section)
        }
        for section in parser.sections()
    }

    return settings
def test_passing_config_log(self):
    """
    Test the with log_file
    """
    new_config = configparser.SafeConfigParser()
    new_config.add_section("scitokens")
    new_config.set("scitokens", "log_level", "WARNING")
    tmp_file = tempfile.NamedTemporaryFile()
    new_config.set("scitokens", "log_file", tmp_file.name)
    scitokens.set_config(new_config)
    self.assertEqual(scitokens.utils.config.get("log_level"), "WARNING")
    self.assertEqual(scitokens.utils.config.get("log_file"), tmp_file.name)

    # Log a line
    logger = logging.getLogger("scitokens")
    logger.error("This is an error")
    tmp_file.flush()
    print(os.path.getsize(tmp_file.name))
    self.assertTrue(os.path.getsize(tmp_file.name) > 0)
    tmp_file.close()
def write(cfg_obj, output_file_path):
    """
    Only supports writing out a conflagration object with namespaces that
    follow the section.key=value pattern that ConfigFile.parse generates
    """
    parser = SafeConfigParser()
    for k in cfg_obj.__dict__.keys():
        parser.add_section(k)
        try:
            for sub_k, sub_v in cfg_obj.__dict__[k].__dict__.items():
                parser.set(k, sub_k, sub_v)
        except Exception:
            raise Exception(
                "Output to config file not supported for conflagrations"
                "nested beyond a one dot namespace.")
    with open(output_file_path, 'w') as f:
        parser.write(f)
def _load_support_matrix(self):
    """Reads the support-matrix.ini file and populates an instance
    of the SupportMatrix class with all the data.

    :returns: SupportMatrix instance
    """
    cfg = configparser.SafeConfigParser()
    env = self.state.document.settings.env
    fname = self.arguments[0]
    rel_fpath, fpath = env.relfn2path(fname)
    with open(fpath) as fp:
        cfg.readfp(fp)

    # This ensures that the docs are rebuilt whenever the
    # .ini file changes
    env.note_dependency(rel_fpath)

    matrix = SupportMatrix()
    matrix.targets = self._get_targets(cfg)
    matrix.features = self._get_features(cfg, matrix.targets)

    return matrix
def assume_role(self, region, profile):
    # assume role
    global connect_args
    if six.PY3:
        aws_creds = configparser.ConfigParser()
        aws_config = configparser.ConfigParser()
    else:
        aws_creds = configparser.SafeConfigParser()
        aws_config = configparser.SafeConfigParser()
    aws_creds.read(os.path.expanduser("~/.aws/credentials"))
    aws_config.read(os.path.expanduser("~/.aws/config"))
    source_profile = self.get_option(aws_config, profile, 'source_profile')
    arn = self.get_option(aws_config, profile, 'role_arn')
    aws_access_key = self.get_option(aws_creds, source_profile,
                                     'aws_access_key_id')
    aws_secret_key = self.get_option(aws_creds, source_profile,
                                     'aws_secret_access_key')
    session_name = "role_session_name_" + self.boto_profile
    sts_conn = sts.STSConnection(aws_access_key, aws_secret_key)
    assume_role = sts_conn.assume_role(role_arn=arn,
                                       role_session_name=session_name)
    connect_args['aws_access_key_id'] = assume_role.credentials.access_key
    connect_args['aws_secret_access_key'] = assume_role.credentials.secret_key
    connect_args['security_token'] = assume_role.credentials.session_token
def config_from_ini(self, ini):
    config = {}
    parser = configparser.SafeConfigParser()
    ini = textwrap.dedent(six.u(ini))
    parser.readfp(io.StringIO(ini))
    for section in parser.sections():
        config[section] = dict(parser.items(section))
    return config
def config_from_ini(self, ini):
    config = {}
    if sys.version_info >= (3, 2):
        parser = configparser.ConfigParser()
    else:
        parser = configparser.SafeConfigParser()
    ini = textwrap.dedent(six.u(ini))
    parser.readfp(io.StringIO(ini))
    for section in parser.sections():
        config[section] = dict(parser.items(section))
    return config
def __init__(self, top_config=None):
    usr_config = path.expanduser('~/.opp/opp.cfg')
    sys_config = '/etc/opp/opp.cfg'
    if not top_config:
        # User config not provided, attempt to read from env
        # This is mostly intended to be used for testing
        try:
            top_config = environ['OPP_TOP_CONFIG']
        except KeyError:
            pass

    # Create config list in order of increasing priority
    cfglist = []
    if path.isfile(sys_config):
        cfglist.append(sys_config)
    if path.isfile(usr_config):
        cfglist.append(usr_config)
    if top_config and path.isfile(top_config):
        cfglist.append(top_config)

    if sys.version_info >= (3, 2):
        self.cfg = configparser.ConfigParser()
    else:
        self.cfg = configparser.SafeConfigParser()
    if cfglist:
        self.cfg.read(cfglist)

    # Set default values
    self.def_sec = "DEFAULT"
    cfg_defaults = [
        ['secret_key', "default-insecure"],
        ['exp_delta', "300"]]
    for opt in cfg_defaults:
        if not self.cfg.has_option(self.def_sec, opt[0]):
            self.cfg.set(self.def_sec, opt[0], opt[1])
def _get_env():
    """
    Get the current environment using the ENV_FILE.

    Returns a ConfigParser.
    """
    parser = configparser.SafeConfigParser()
    # if env file doesn't exist, copy over the package default
    if not os.path.exists(ENV_FILE):
        shutil.copyfile(PKG_ENV_FILE, ENV_FILE)
    with open(ENV_FILE) as fp:
        parser.readfp(fp)
    return parser
def test_passing_config(self):
    """
    Test the passing of a configuration parser object
    """
    new_config = configparser.SafeConfigParser()
    new_config.add_section("scitokens")
    new_config.set("scitokens", "log_level", "WARNING")
    scitokens.set_config(new_config)
    self.assertEqual(scitokens.utils.config.get("log_level"), "WARNING")
def read(cfgfile):
    if not os.path.exists(cfgfile):
        ex = IOError if six.PY2 else FileNotFoundError
        raise ex('File {name} does not exist.'.format(name=cfgfile))
    data = SafeConfigParser()
    data.read(cfgfile)
    return data
def __init__(self):
    self.logger = logging.getLogger(__name__)
    self.conf = SafeConfigParser()
    if self.conf.read(CONFIG_FILE):
        self.logger.debug("Using config file at {0}".format(CONFIG_FILE))
    else:
        self.logger.warning(
            "Config file {0} not found".format(CONFIG_FILE))
def test_load_settings(self):
    """
    Test that the right calls are made and the right errors generated when
    loading configuration settings from a configuration file specified by a
    path string.
    """
    c = config.KmipServerConfig()
    c._logger = mock.MagicMock()
    c._parse_settings = mock.MagicMock()

    # Test that the right calls are made when correctly processing the
    # configuration file.
    with mock.patch('os.path.exists') as os_mock:
        os_mock.return_value = True
        with mock.patch(
            'six.moves.configparser.SafeConfigParser.read'
        ) as parser_mock:
            c.load_settings("/test/path/server.conf")
            c._logger.info.assert_any_call(
                "Loading server configuration settings from: "
                "/test/path/server.conf"
            )
            parser_mock.assert_called_with("/test/path/server.conf")
            self.assertTrue(c._parse_settings.called)

    # Test that a ConfigurationError is generated when the path is invalid.
    c._logger.reset_mock()

    with mock.patch('os.path.exists') as os_mock:
        os_mock.return_value = False
        args = ('/test/path/server.conf', )
        self.assertRaises(
            exceptions.ConfigurationError,
            c.load_settings,
            *args
        )
def load_settings(self, path):
    """
    Load configuration settings from the file pointed to by path.

    This will overwrite all current setting values.

    Args:
        path (string): The path to the configuration file containing
            the settings to load. Required.
    Raises:
        ConfigurationError: Raised if the path does not point to an
            existing file or if a setting value is invalid.
    """
    if not os.path.exists(path):
        raise exceptions.ConfigurationError(
            "The server configuration file ('{0}') could not be "
            "located.".format(path)
        )
    self._logger.info(
        "Loading server configuration settings from: {0}".format(path)
    )
    parser = configparser.SafeConfigParser()
    parser.read(path)
    self._parse_settings(parser)
def __init__(self, wrapped):
    # this is a SafeConfigParser instance
    self._conf_values = None

    # note we do not invoke the cfg_dir setter here, because we do not want
    # anything to be created/copied yet.
    # first check if there is a config dir set via environment
    if 'CHAINSAW_CFG_DIR' in os.environ:
        # TODO: probe?
        self._cfg_dir = os.environ['CHAINSAW_CFG_DIR']
    # try to read default cfg dir
    elif os.path.isdir(self.DEFAULT_CONFIG_DIR) and os.access(self.DEFAULT_CONFIG_DIR, os.W_OK):
        self._cfg_dir = self.DEFAULT_CONFIG_DIR
    # use defaults, have no cfg_dir set.
    else:
        self._cfg_dir = ''

    try:
        self.load()
    except RuntimeError as re:
        warnings.warn("unable to read default configuration file. Logging and "
                      " progress bar handling could behave bad! Error: %s" % re)

    from chainsaw.util.log import setup_logging, LoggingConfigurationError
    try:
        setup_logging(self)
    except LoggingConfigurationError as e:
        warnings.warn("Error during logging configuration. Logging might not be functional!"
                      "Error: %s" % e)

    # wrap this module
    self.wrapped = wrapped
    self.__wrapped__ = wrapped
def __read_cfg(self, filenames):
    config = configparser.SafeConfigParser()
    try:
        self._used_filenames = config.read(filenames)
    except EnvironmentError as e:
        # note: this file is mission crucial, so fail badly if this is not readable.
        raise ReadConfigException("FATAL ERROR: could not read default configuration"
                                  " file %s\n%s" % (self.default_config_file, e))
    return config
def get_config_parser():
    import sys
    python_version = sys.version_info.major
    return configparser.ConfigParser() if python_version == 3 else configparser.SafeConfigParser()
def read(benchmark_result_file):
    """
    Read benchmark

    :param benchmark_result_file: benchmark result file
    :return: {device: {metric: value, }, }
    """
    result = {}
    config = configparser.SafeConfigParser()
    with io.open(benchmark_result_file) as fp:
        config.readfp(fp)  # pylint: disable=deprecated-method

    for section in config.sections():
        try:
            device = config.get(section, _DEVICE)
            result[device] = {}
            for metric in Metrics:
                result[device][metric.value] = config.get(
                    section, metric.value
                )
        except configparser.NoOptionError:
            _LOGGER.error(
                'Incorrect section in %s', benchmark_result_file
            )
    return result
def write(benchmark_result_file, result):
    """Write benchmark result.

    Sample output file format:

        [device0]
        device = 589d88bd-8098-4041-900e-7fcac18abab3
        write_bps = 314572800
        read_bps = 314572800
        write_iops = 64000
        read_iops = 4000

    :param benchmark_result_file: benchmark result file
    :param result: {device: {metric: value, }, }
    """
    config = configparser.SafeConfigParser()
    device_count = 0
    for device, metrics in result.iteritems():
        section = _DEVICE + str(device_count)
        device_count += 1
        config.add_section(section)
        config.set(section, _DEVICE, device)
        for metric, value in metrics.iteritems():
            config.set(section, metric, str(value))

    fs.write_safe(
        benchmark_result_file,
        config.write,
        permission=0o644
    )
def _load_support_matrix(self):
    """Reads the support-matrix.ini file and populates an instance
    of the SupportMatrix class with all the data.

    :returns: SupportMatrix instance
    """
    # SafeConfigParser was deprecated in Python 3.2
    if sys.version_info >= (3, 2):
        cfg = configparser.ConfigParser()
    else:
        cfg = configparser.SafeConfigParser()
    env = self.state.document.settings.env
    fname = self.arguments[0]
    rel_fpath, fpath = env.relfn2path(fname)
    with open(fpath) as fp:
        cfg.readfp(fp)

    # This ensures that the docs are rebuilt whenever the
    # .ini file changes
    env.note_dependency(rel_fpath)

    matrix = SupportMatrix()
    matrix.targets = self._get_targets(cfg)
    matrix.features = self._get_features(cfg, matrix.targets)

    return matrix
def cfg(program, key):
    cfg = configparser.SafeConfigParser()
    cfg.read(['weak-local.cfg', 'weak.cfg'])
    if cfg.has_option(program, key):
        return cfg.get(program, key)
    return None


# make a butterworth IIR bandpass filter
def __init__(self, filename, defaults=None):
    self.filename = filename
    self._parser = configparser.SafeConfigParser()
    self.defaults = defaults or {}
    self.read(filename, doraise=False)
def __init__(self, fileName):
    cp = SafeConfigParser()
    cp.read(fileName)
    self.__parser = cp
    self.fileName = fileName
def get_config_parser():
    return configparser.ConfigParser() if sys.version_info.major == 3 else configparser.SafeConfigParser()
def __init__(self, config):
    self.classification_data = None

    self.classification_path = config.get(
        "pkglint", "info_classification_path")
    self.skip_classification_check = False

    # a default error message used if we've parsed the
    # data file, but haven't thrown any exceptions
    self.bad_classification_data = _("no sections found in data "
                                     "file {0}").format(self.classification_path)

    if os.path.exists(self.classification_path):
        try:
            if six.PY2:
                self.classification_data = \
                    configparser.SafeConfigParser()
                self.classification_data.readfp(
                    open(self.classification_path))
            else:
                # SafeConfigParser has been renamed to
                # ConfigParser in Python 3.2.
                self.classification_data = \
                    configparser.ConfigParser()
                self.classification_data.read_file(
                    open(self.classification_path))
        except Exception as err:
            # any exception thrown here results in a null
            # classification_data object. We deal with that
            # later.
            self.bad_classification_data = _(
                "unable to parse data file {path}: "
                "{err}").format(
                    path=self.classification_path, err=err)
            pass
    else:
        self.bad_classification_data = _("missing file {0}").format(
            self.classification_path)

    super(ManifestChecker, self).__init__(config)
def __init__(self, config_file=None):
    if config_file:
        try:
            # ConfigParser doesn't do a good job of
            # error reporting, so we'll just try to open
            # the file
            open(config_file, "r").close()
        except (EnvironmentError) as err:
            raise PkglintConfigException(
                _("unable to read config file: {0} ").format(err))
    try:
        if six.PY2:
            self.config = configparser.SafeConfigParser(defaults)
        else:
            # SafeConfigParser has been renamed to
            # ConfigParser in Python 3.2.
            self.config = configparser.ConfigParser(defaults)
        if not config_file:
            if six.PY2:
                self.config.readfp(
                    open("/usr/share/lib/pkg/pkglintrc"))
            else:
                self.config.read_file(
                    open("/usr/share/lib/pkg/pkglintrc"))
            self.config.read(
                [os.path.expanduser("~/.pkglintrc")])
        else:
            self.config.read(config_file)

        # sanity check our config by looking for a known key
        self.config.get("pkglint", "log_level")
    except configparser.Error as err:
        raise PkglintConfigException(
            _("missing or corrupt pkglintrc file "
              "{config_file}: {err}").format(**locals()))
def read_config_file(config_file_path):
    # type: (str) -> None
    config_file_path = os.path.abspath(os.path.expanduser(config_file_path))
    if not os.path.isfile(config_file_path):
        raise IOError("Could not read config file {}: File not found.".format(config_file_path))
    parser = SafeConfigParser()
    parser.read(config_file_path)

    for section in parser.sections():
        bots_config[section] = {
            "email": parser.get(section, 'email'),
            "key": parser.get(section, 'key'),
            "site": parser.get(section, 'site'),
        }
def do_docker_create(self, label, parameters, environment, name, image,
                     volumes, memory_limit, folders, command):
    """
    Create necessary directories in a working directory
    for the mounts in the containers.
    Write .ini file filled with given parameters in each folder.
    Create a new docker container from a given image and
    return the id of the container
    """
    # Create needed folders for mounts
    for folder in folders:
        try:
            os.makedirs(folder, 0o2775)
        # Path already exists, ignore
        except OSError:
            if not os.path.isdir(folder):
                raise

    # Create ini file for containers
    config = configparser.SafeConfigParser()
    for section in parameters:
        if not config.has_section(section):
            config.add_section(section)
        for key, value in parameters[section].items():
            # TODO: find more elegant solution for this! ugh!
            if not key == 'units':
                if not config.has_option(section, key):
                    config.set(*map(str, [section, key, value]))

    for folder in folders:
        with open(os.path.join(folder, 'input.ini'), 'w') as f:
            config.write(f)  # Yes, the ConfigParser writes to f

    # Create docker container
    client = Client(base_url=settings.DOCKER_URL)

    # We could also pass mem_reservation since docker-py 1.10
    config = client.create_host_config(binds=volumes, mem_limit=memory_limit)
    container = client.create_container(
        image,  # docker image
        name=name,
        host_config=config,  # mounts
        command=command,  # command to run
        environment=environment,  # {'uuid' = ""} for cloud fs sync
        labels=label  # type of container
    )
    container_id = container.get('Id')
    return container_id, ""
def test_do_docker_create(self, mockClient):
    """
    Assert that the docker_create task
    calls the docker client.create_container() function.
    """
    image = "IMAGENAME"
    volumes = ['/:/data/output:z', '/:/data/input:ro']
    memory_limit = '1g'
    command = "echo test"
    config = {}
    environment = {'a': 1, 'b': 2}
    label = {"type": "delft3d"}
    folder = ['input', 'output']
    name = 'test-8172318273'
    workingdir = os.path.join(os.getcwd(), 'test')
    folders = [os.path.join(workingdir, f) for f in folder]
    parameters = {u'test': {u'1': u'a', u'2': u'b', 'units': 'ignoreme'}}

    mockClient.return_value.create_host_config.return_value = config

    do_docker_create.delay(label, parameters, environment, name,
                           image, volumes, memory_limit, folders, command)

    # Assert that docker is called
    mockClient.return_value.create_container.assert_called_with(
        image,
        host_config=config,
        command=command,
        name=name,
        environment=environment,
        labels=label
    )

    # Assert that folders are created
    listdir = os.listdir(workingdir)
    for f in listdir:
        self.assertIn(f, listdir)

    for folder in folders:
        ini = os.path.join(folder, 'input.ini')
        self.assertTrue(os.path.isfile(ini))

        config = configparser.SafeConfigParser()
        config.readfp(open(ini))
        for key in parameters.keys():
            self.assertTrue(config.has_section(key))
            for option, value in parameters[key].items():
                if option != 'units':
                    self.assertTrue(config.has_option(key, option))
                    self.assertEqual(config.get(key, option), value)
                else:
                    # units should be ignored
                    self.assertFalse(config.has_option(key, option))
def _get_config(self):
    config_file = os.environ.get('ZUNCLIENT_TEST_CONFIG',
                                 DEFAULT_CONFIG_FILE)
    # SafeConfigParser was deprecated in Python 3.2
    if six.PY3:
        config = config_parser.ConfigParser()
    else:
        config = config_parser.SafeConfigParser()
    if not config.read(config_file):
        self.skipTest('Skipping, no test config found @ %s' % config_file)
    try:
        auth_strategy = config.get('functional', 'auth_strategy')
    except config_parser.NoOptionError:
        auth_strategy = 'keystone'
    if auth_strategy not in ['keystone', 'noauth']:
        raise self.fail(
            'Invalid auth type specified: %s in functional must be '
            'one of: [keystone, noauth]' % auth_strategy)

    conf_settings = []
    keystone_v3_conf_settings = []
    if auth_strategy == 'keystone':
        conf_settings += ['os_auth_url', 'os_username', 'os_password',
                          'os_project_name', 'os_identity_api_version']
        keystone_v3_conf_settings += ['os_user_domain_id',
                                      'os_project_domain_id']
    else:
        conf_settings += ['os_auth_token', 'zun_url']

    cli_flags = {}
    missing = []
    for c in conf_settings + keystone_v3_conf_settings:
        try:
            cli_flags[c] = config.get('functional', c)
        except config_parser.NoOptionError:
            # NOTE(vdrok): Here we ignore the absence of KS v3 options as
            # v2 may be used. Keystone client will do the actual check of
            # the parameters' correctness.
            if c not in keystone_v3_conf_settings:
                missing.append(c)
    if missing:
        self.fail('Missing required setting in test.conf (%(conf)s) for '
                  'auth_strategy=%(auth)s: %(missing)s' %
                  {'conf': config_file,
                   'auth': auth_strategy,
                   'missing': ','.join(missing)})
    return cli_flags
def set_config(config = None):
    """
    Set the configuration of SciTokens library

    :param config: config may be:
        A full path to a ini configuration file,
        A ConfigParser instance,
        or None, which will use all defaults.
    """
    global configuration  # pylint: disable=C0103
    if isinstance(config, six.string_types):
        configuration = configparser.SafeConfigParser(CONFIG_DEFAULTS)
        configuration.read([config])
    elif isinstance(config, configparser.RawConfigParser):
        configuration = config
    elif config is None:
        print("Using built-in defaults")
        configuration = configparser.SafeConfigParser(CONFIG_DEFAULTS)
        configuration.add_section("scitokens")
    else:
        pass

    logger = logging.getLogger("scitokens")
    if configuration.has_option("scitokens", "log_file"):
        log_file = configuration.get("scitokens", "log_file")
        if log_file is not None:
            # Create loggers with 100MB files, rotated 5 times
            logger.addHandler(logging.handlers.RotatingFileHandler(
                log_file, maxBytes=100 * (1024*1000), backupCount=5))
    else:
        logger.addHandler(logging.StreamHandler())

    # Set the logging
    log_level = configuration.get("scitokens", "log_level")
    if log_level == "DEBUG":
        logger.setLevel(logging.DEBUG)
    elif log_level == "INFO":
        logger.setLevel(logging.INFO)
    elif log_level == "WARNING":
        logger.setLevel(logging.WARNING)
    elif log_level == "ERROR":
        logger.setLevel(logging.ERROR)
    elif log_level == "CRITICAL":
        logger.setLevel(logging.CRITICAL)
    else:
        logger.setLevel(logging.WARNING)
def benchmark(directory, volume=BENCHMARK_VOLUME, rw_type=BENCHMARK_RW_TYPE,
              job_number=BENCHMARK_JOB_NUMBER,
              thread_number=BENCHMARK_THREAD_NUMBER,
              block_size=BENCHMARK_IOPS_BLOCK_SIZE,
              max_seconds=BENCHMARK_MAX_SECONDS):
    """Use fio to do benchmark.
    """
    result = {}
    config_file = os.path.join(directory, _BENCHMARK_CONFIG_FILE)
    result_file = os.path.join(directory, _BENCHMARK_RESULT_FILE)

    # prepare fio config
    config = configparser.SafeConfigParser()
    global_section = 'global'
    config.add_section(global_section)
    config.set(global_section, 'group_reporting', '1')
    config.set(global_section, 'unlink', '1')
    config.set(global_section, 'time_based', '1')
    config.set(global_section, 'direct', '1')
    config.set(global_section, 'size', volume)
    config.set(global_section, 'rw', rw_type)
    config.set(global_section, 'numjobs', job_number)
    config.set(global_section, 'iodepth', thread_number)
    config.set(global_section, 'bs', block_size)
    config.set(global_section, 'runtime', max_seconds)
    drive_section = 'drive'
    config.add_section(drive_section)
    config.set(drive_section, 'directory', directory)

    fs.write_safe(
        config_file,
        lambda f: config.write(EqualSpaceRemover(f))
    )

    # start fio
    ret = subproc.call(
        ['fio', config_file, '--norandommap',
         '--minimal', '--output', result_file]
    )

    # parse fio terse result
    # http://fio.readthedocs.io/en/latest/fio_doc.html#terse-output
    if ret == 0:
        with io.open(result_file) as fp:
            metric_list = fp.read().split(';')
            result[Metrics.READ_BPS.value] = int(
                float(metric_list[6]) * 1024
            )
            result[Metrics.READ_IOPS.value] = int(metric_list[7])
            result[Metrics.WRITE_BPS.value] = int(
                float(metric_list[47]) * 1024
            )
            result[Metrics.WRITE_IOPS.value] = int(metric_list[48])

    return result
def read_settings(self):
    ''' Reads the settings from the profitbricks_inventory.ini file '''
    if six.PY3:
        config = configparser.ConfigParser()
    else:
        config = configparser.SafeConfigParser()
    config.read(os.path.dirname(os.path.realpath(__file__)) + '/profitbricks_inventory.ini')

    # Credentials
    if config.has_option('profitbricks', 'subscription_user'):
        self.subscription_user = config.get('profitbricks', 'subscription_user')
    if config.has_option('profitbricks', 'subscription_password'):
        self.subscription_password = config.get('profitbricks', 'subscription_password')
    if config.has_option('profitbricks', 'subscription_password_file'):
        self.subscription_password_file = config.get('profitbricks', 'subscription_password_file')
    if config.has_option('profitbricks', 'api_url'):
        self.api_url = config.get('profitbricks', 'api_url')

    # Cache
    if config.has_option('profitbricks', 'cache_path'):
        self.cache_path = config.get('profitbricks', 'cache_path')
    if config.has_option('profitbricks', 'cache_max_age'):
        self.cache_max_age = config.getint('profitbricks', 'cache_max_age')

    # Group variables
    if config.has_option('profitbricks', 'vars'):
        self.vars = ast.literal_eval(config.get('profitbricks', 'vars'))

    # Groups
    group_by_options = [
        'group_by_datacenter_id',
        'group_by_location',
        'group_by_availability_zone',
        'group_by_image_name',
        'group_by_licence_type'
    ]
    for option in group_by_options:
        if config.has_option('profitbricks', option):
            setattr(self, option, config.getboolean('profitbricks', option))
        else:
            setattr(self, option, True)

    # Inventory Hostname
    option = 'server_name_as_inventory_hostname'
    if config.has_option('profitbricks', option):
        setattr(self, option, config.getboolean('profitbricks', option))
    else:
        setattr(self, option, False)
def parse_config_file(self, config_file):
    config = SafeConfigParser(self.DEFAULT_CONFIG)
    config.readfp(config_file)

    blessconfig = {
        'CLIENT_CONFIG': {
            'domain_regex': config.get('CLIENT', 'domain_regex'),
            'cache_dir': config.get('CLIENT', 'cache_dir'),
            'cache_file': config.get('CLIENT', 'cache_file'),
            'mfa_cache_dir': config.get('CLIENT', 'mfa_cache_dir'),
            'mfa_cache_file': config.get('CLIENT', 'mfa_cache_file'),
            'ip_urls': [s.strip() for s in config.get('CLIENT', 'ip_urls').split(",")],
            'update_script': config.get('CLIENT', 'update_script'),
            'user_session_length': int(config.get('CLIENT', 'user_session_length')),
            'usebless_role_session_length': int(config.get('CLIENT', 'usebless_role_session_length')),
        },
        'BLESS_CONFIG': {
            'userrole': config.get('LAMBDA', 'user_role'),
            'accountid': config.get('LAMBDA', 'account_id'),
            'functionname': config.get('LAMBDA', 'functionname'),
            'functionversion': config.get('LAMBDA', 'functionversion'),
            'certlifetime': config.getint('LAMBDA', 'certlifetime'),
            'ipcachelifetime': config.getint('LAMBDA', 'ipcachelifetime'),
            'timeoutconfig': {
                'connect': config.getint('LAMBDA', 'timeout_connect'),
                'read': config.getint('LAMBDA', 'timeout_read')
            }
        },
        'AWS_CONFIG': {
            'bastion_ips': config.get('MAIN', 'bastion_ips'),
            'remote_user': config.get('MAIN', 'remote_user')
        },
        'REGION_ALIAS': {}
    }

    regions = config.get('MAIN', 'region_aliases').split(",")
    regions = [region.strip() for region in regions]
    for region in regions:
        region = region.upper()
        kms_region_key = 'KMSAUTH_CONFIG_{}'.format(region)
        blessconfig.update({kms_region_key: self._get_region_kms_config(region, config)})
        blessconfig['REGION_ALIAS'].update({region: blessconfig[kms_region_key]['awsregion']})

    return blessconfig
def update_ini_file(self):
    ''' Update INI file with added number of nodes '''
    scriptbasename = "ocp-on-vmware"
    defaults = {'vmware': {
        'ini_path': os.path.join(os.path.dirname(__file__), '%s.ini' % scriptbasename),
        'master_nodes': '3',
        'infra_nodes': '2',
        'storage_nodes': '0',
        'app_nodes': '3'
    }}

    # where is the config?
    if six.PY3:
        config = configparser.ConfigParser()
    else:
        config = configparser.SafeConfigParser()
    vmware_ini_path = os.environ.get('VMWARE_INI_PATH', defaults['vmware']['ini_path'])
    vmware_ini_path = os.path.expanduser(os.path.expandvars(vmware_ini_path))
    config.read(vmware_ini_path)

    if 'app' in self.node_type:
        self.app_nodes = int(self.app_nodes) + int(self.node_number)
        config.set('vmware', 'app_nodes', str(self.app_nodes))
        print "Updating %s file with %s app_nodes" % (vmware_ini_path, str(self.app_nodes))
    if 'infra' in self.node_type:
        self.infra_nodes = int(self.infra_nodes) + int(self.node_number)
        config.set('vmware', 'infra_nodes', str(self.infra_nodes))
        print "Updating %s file with %s infra_nodes" % (vmware_ini_path, str(self.infra_nodes))
    if 'storage' in self.node_type:
        if 'clean' in self.tag:
            self.storage_nodes = int(self.storage_nodes) - int(self.node_number)
        else:
            self.storage_nodes = int(self.storage_nodes) + int(self.node_number)
        config.set('vmware', 'storage_nodes', str(self.storage_nodes))
        print "Updating %s file with %s storage_nodes" % (vmware_ini_path, str(self.storage_nodes))

    for line in fileinput.input(vmware_ini_path, inplace=True):
        if line.startswith("app_nodes"):
            print "app_nodes=" + str(self.app_nodes)
        elif line.startswith("infra_nodes"):
            print "infra_nodes=" + str(self.infra_nodes)
        elif line.startswith("storage_nodes"):
            print "storage_nodes=" + str(self.storage_nodes)
        else:
            print line,