我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用six.moves.configparser.ConfigParser()。
def main(args=None): args = parse_args(args) if not os.path.exists(args.aws_credentials): print("%s does not exist. Please run 'aws configure' or specify an " "alternate credentials file with --aws-credentials." % args.aws_credentials, file=sys.stderr) return USER_RECOVERABLE_ERROR if PY2: credentials = configparser.ConfigParser() else: credentials = configparser.ConfigParser(default_section=None) credentials.read(args.aws_credentials) err = one_mfa(args, credentials) if err != OK: return err if args.rotate_identity_keys: err = rotate(args, credentials) if err != OK: return err if args.env: print_env_vars(credentials, args.target_profile) return OK
def _parse_config(self, domain_name): config_path = os.path.join(utils.CONFIG_PATH, domain_name, 'config') if not os.path.exists(config_path): raise exception.DomainNotFound(domain=domain_name) config = configparser.ConfigParser() config.read(config_path) bmc = {} for item in ('username', 'password', 'address', 'domain_name', 'libvirt_uri', 'libvirt_sasl_username', 'libvirt_sasl_password'): try: value = config.get(DEFAULT_SECTION, item) except configparser.NoOptionError: value = None bmc[item] = value # Port needs to be int bmc['port'] = config.getint(DEFAULT_SECTION, 'port') return bmc
def _get_config(self): config_file = os.environ.get('ECS_TEST_CONFIG_FILE', os.path.join(os.getcwd(), "tests/test.conf")) config = configparser.ConfigParser() config.read(config_file) self.config = config if config.has_section('func_test'): self.token_endpoint = config.get('func_test', 'token_endpoint') self.ecs_endpoint = config.get('func_test', 'ecs_endpoint') self.username = config.get('func_test', 'username') self.password = config.get('func_test', 'password') self.api_version = config.get('func_test', 'api_version') license_file = config.get('func_test', 'license_file') with open(license_file) as f: self.license_text = f.read() else: self.skip_tests = True
def __init__(self): self.config = configparser.ConfigParser({ 'firsttime' : 'yes', 'style' : 'default' }) self.config.add_section('Help Files') self.config.add_section('Layout') self.config.set('Help Files', 'command', 'help_dump.json') self.config.set('Help Files', 'history', 'history.txt') self.config.set('Layout', 'command_description', 'yes') self.config.set('Layout', 'param_description', 'yes') self.config.set('Layout', 'examples', 'yes') azure_folder = get_config_dir() if not os.path.exists(azure_folder): os.makedirs(azure_folder) if not os.path.exists(os.path.join(get_config_dir(), CONFIG_FILE_NAME)): with open(os.path.join(get_config_dir(), CONFIG_FILE_NAME), 'w') as config_file: self.config.write(config_file) else: with open(os.path.join(get_config_dir(), CONFIG_FILE_NAME), 'r') as config_file: self.config.readfp(config_file) # pylint: disable=deprecated-method self.update()
def _parse_config(self, domain_name): config_path = os.path.join(self.config_dir, domain_name, 'config') if not os.path.exists(config_path): raise exception.DomainNotFound(domain=domain_name) config = configparser.ConfigParser() config.read(config_path) bmc = {} for item in ('username', 'password', 'address', 'domain_name', 'libvirt_uri', 'libvirt_sasl_username', 'libvirt_sasl_password'): try: value = config.get(DEFAULT_SECTION, item) except configparser.NoOptionError: value = None bmc[item] = value # Port needs to be int bmc['port'] = config.getint(DEFAULT_SECTION, 'port') return bmc
def read_config_file(self, file="./config.ini"): #read config file Config = configparser.ConfigParser() try: Config.read(file) except Exception as e: self.log.warn("Error reading config file %s" %e) self.log.info("No Roomba specified, and no config file found - " "attempting discovery") if Password(self.address, file): return self.read_config_file(file) else: return False self.log.info("reading info from config file %s" % file) addresses = Config.sections() if self.address is None: if len(addresses) > 1: self.log.warn("config file has entries for %d Roombas, " "only configuring the first!") self.address = addresses[0] self.blid = Config.get(self.address, "blid"), self.password = Config.get(self.address, "password") # self.roombaName = literal_eval( # Config.get(self.address, "data"))["robotname"] return True
def read_string(self, string, source='<string>'): """ Read configuration from a string. A backwards-compatible version of the ConfigParser.read_string() method that was introduced in Python 3. """ # Python 3 added read_string() method if six.PY3: ConfigParser.read_string(self, string, source=source) # Python 2 requires StringIO buffer else: import StringIO self.readfp(StringIO.StringIO(string))
def main(): args = get_options() config = ConfigParser.ConfigParser() config.read(args.conffile) if config.has_option('default', 'pidfile'): pid_fn = os.path.expanduser(config.get('default', 'pidfile')) else: pid_fn = '/var/run/germqtt.pid' if args.foreground: _main(args, config) else: pid = pid_file_module.TimeoutPIDLockFile(pid_fn, 10) with daemon.DaemonContext(pidfile=pid): _main(args, config)
def main(argv): root = os.path.dirname(os.path.dirname(os.path.realpath(__file__))) if os.environ.get('tools_path'): root = os.environ['tools_path'] venv = os.path.join(root, '.venv') if os.environ.get('venv'): venv = os.environ['venv'] pip_requires = os.path.join(root, 'requirements.txt') test_requires = os.path.join(root, 'test-requirements.txt') py_version = "python%s.%s" % (sys.version_info[0], sys.version_info[1]) setup_cfg = configparser.ConfigParser() setup_cfg.read('setup.cfg') project = setup_cfg.get('metadata', 'name') install = install_venv.InstallVenv( root, venv, pip_requires, test_requires, py_version, project) options = install.parse_args(argv) install.check_python_version() install.check_dependencies() install.create_virtualenv(no_site_packages=options.no_site_packages) install.install_dependencies() install.post_process() print_help(project, venv, root)
def _read_config_file(path=None): """ Reads config file. First look for config file in the current directory, then in the user's home directory, then in the same directory as this file. Tries to find config file both with and without preceeding 'dot' for hidden files (prefer non-hidden). """ cfg = configparser.ConfigParser() if path is None: # pragma: no cover dirs = [os.curdir, os.path.expanduser('~'), os.path.dirname(os.path.realpath(__file__))] locations = map(os.path.abspath, dirs) for loc in locations: if cfg.read(os.path.join(loc, 'gpflowrc')): break if cfg.read(os.path.join(loc, '.gpflowrc')): break else: if not cfg.read(path): raise RuntimeError("Config at '{0}' cannot be read".format(path)) return cfg
def __init__(self, config_file, workspace_dir): if not os.path.isfile(config_file): raise Exception("Error: file {} does not " "exist.".format(config_file)) config = configparser.ConfigParser() config.read(config_file) for section in config.sections(): section_options = self.get_config_options_for_section(config, section) self.validate_config_options_for_section(section_options, section) if section == RUNWAY_CONFIG_SECTION: self.runway_options = section_options else: self.sections.append(section) self.components_options[section] = section_options self.workspace_dir = workspace_dir
def parse_config(cfg, ext_config={}): config = {} try: options = cfg.options('pypdns') except ConfigParser.NoSectionError: log.debug('No config files provided and default not present') options = [] for option in options: config[option] = cfg.get('pypdns', option) for ckey, cvalue in iteritems(DEFAULT_CONFIG): if ckey not in config: config[ckey] = cvalue for k in ('endpoint', 'apikey'): if ext_config.get(k) is not None: config[k] = ext_config[k] assert (config.get('endpoint') is not None and config.get('apikey') is not None), 'Configuration not found' return config
def __init__(self): self.config = configparser.ConfigParser({ 'firsttime': 'yes', 'style': 'default' }) self.config.add_section('Help Files') self.config.add_section('Layout') self.config.set('Help Files', 'command', 'help_dump.json') self.config.set('Help Files', 'history', 'history.txt') self.config.set('Help Files', 'frequency', 'frequency.json') self.config.set('Layout', 'command_description', 'yes') self.config.set('Layout', 'param_description', 'yes') self.config.set('Layout', 'examples', 'yes') azure_folder = get_config_dir() if not os.path.exists(azure_folder): os.makedirs(azure_folder) if not os.path.exists(os.path.join(get_config_dir(), CONFIG_FILE_NAME)): with open(os.path.join(get_config_dir(), CONFIG_FILE_NAME), 'w') as config_file: self.config.write(config_file) else: with open(os.path.join(get_config_dir(), CONFIG_FILE_NAME), 'r') as config_file: self.config.readfp(config_file) # pylint: disable=deprecated-method self.update()
def __init__(self, args): self.args = args self.image_width = 256 self.image_height = 256 self.color_map = None self.dimension_colors = None self.dimension_labels = None logger = get_logger() if self.args.color_map: color_map_config = ConfigParser() with FileIO(self.args.color_map, 'r') as config_f: color_map_config.readfp(config_f) self.color_map = parse_color_map_from_configparser(color_map_config) color_label_map = { (int(k), int(k), int(k)): v for k, v in color_map_config.items('color_labels') } sorted_keys = sorted(six.iterkeys(self.color_map)) self.dimension_colors = [self.color_map[k] for k in sorted_keys] self.dimension_labels = [color_label_map.get(k) for k in sorted_keys] logger.debug("dimension_colors: %s", self.dimension_colors) logger.debug("dimension_labels: %s", self.dimension_labels)
def parse_color_map(f): color_map_config = ConfigParser() color_map_config.readfp(f) num_pattern = re.compile(r'(\d+)') rgb_pattern = re.compile(r'\((\d+),(\d+),(\d+)\)') def parse_color(s): m = num_pattern.match(s) if m: x = int(m.group(1)) return (x, x, x) else: m = rgb_pattern.match(s) if m: return (int(m.group(1)), int(m.group(2)), int(m.group(3))) raise Exception('invalid color value: {}'.format(s)) color_map = dict() for k, v in color_map_config.items('color_map'): color_map[parse_color(k)] = parse_color(v) return color_map
def remove_all_auto_profiles(filePath): """ remove all profiles from the credentials file that contain 'auto-refresh-' """ log.info('Removing all autoAwsume profiles') #remove the auto-awsume profiles from the credentials file autoAwsumeProfileParser = ConfigParser.ConfigParser() autoAwsumeProfileParser.read(filePath) #scan all profiles to find auto-refresh profiles for profile in autoAwsumeProfileParser._sections: if 'auto-refresh-' in profile: log.debug('Removing profile ' + profile + ' from credentials file') autoAwsumeProfileParser.remove_section(profile) #save changes autoAwsumeProfileParser.write(open(filePath, 'w'))
def _get_config_object(config_path, use_cashed_config=True): ''' Returns a ConfigParser for the config file at the given path. If no file exists, an empty config file is created. :param config_path: :param use_cashed_config: If set to True, will return the previously created ConfigParser file (if previously created). If set to False, will re-read the config file from disk. If a ConfigParser was previously created, it will not be replaced! :return: ''' if config_path not in _CONFIG_OBJECTS or not use_cashed_config: config = ConfigParser() if not os.path.exists(config_path): with open(config_path,'w') as f: config.write(f) else: config.read(config_path) if use_cashed_config: _CONFIG_OBJECTS[config_path] = config else: config = _CONFIG_OBJECTS[config_path] return config
def init_bridge(): """Parse the configuration file and set relevant variables.""" conf_path = os.path.abspath(os.getenv('WAT_CONF', '')) if not conf_path or not os.path.isfile(conf_path): sys.exit('Could not find configuration file') parser = configparser.ConfigParser() parser.read(conf_path) # Whatsapp settings SETTINGS['wa_phone'] = parser.get('wa', 'phone') SETTINGS['wa_password'] = parser.get('wa', 'password') # Telegram settings SETTINGS['owner'] = parser.getint('tg', 'owner') SETTINGS['tg_token'] = parser.get('tg', 'token') # TinyDB global DB DB = TinyDB(parser.get('db', 'path')) DB.table_class = SmartCacheTable
def _get_config(self, unit, filename): """Get a ConfigParser object for parsing a unit's config file.""" file_contents = unit.file_contents(filename) # NOTE(beisner): by default, ConfigParser does not handle options # with no value, such as the flags used in the mysql my.cnf file. # https://bugs.python.org/issue7005 config = configparser.ConfigParser(allow_no_value=True) config.readfp(io.StringIO(file_contents)) return config
def getConfig(): ''' Retrieve skdaccess configuration @return configParser.ConfigParser object of configuration ''' config_location = os.path.join(os.path.expanduser('~'), '.skdaccess.conf') conf = configparser.ConfigParser() conf.read(config_location) return conf
def writeConfig(conf): ''' Write config to disk @param conf: configparser.ConfigParser object ''' config_location = os.path.join(os.path.expanduser('~'), '.skdaccess.conf') config_handle = open(config_location, "w") conf.write(config_handle) config_handle.close()
def update_credentials_file(filename, target_profile, source_profile, credentials, new_access_key): if target_profile != source_profile: credentials.remove_section(target_profile) # Hack: Python 2's implementation of ConfigParser rejects new sections # named 'default'. if PY2 and target_profile == 'default': # noinspection PyProtectedMember credentials._sections[ target_profile] = configparser._default_dict() else: credentials.add_section(target_profile) for k, v in credentials.items(source_profile): credentials.set(target_profile, k, v) credentials.set(target_profile, 'aws_access_key_id', new_access_key['AccessKeyId']) credentials.set(target_profile, 'aws_secret_access_key', new_access_key['SecretAccessKey']) if 'SessionToken' in new_access_key: credentials.set(target_profile, 'aws_session_token', new_access_key['SessionToken']) credentials.set(target_profile, 'awsmfa_expiration', new_access_key['Expiration'].isoformat()) else: credentials.remove_option(target_profile, 'aws_session_token') credentials.remove_option(target_profile, 'awsmfa_expiration') temp_credentials_file = filename + ".tmp" with open(temp_credentials_file, "w") as out: credentials.write(out) try: os.rename(temp_credentials_file, filename) except WindowsError as E: os.remove(filename) os.rename(temp_credentials_file, filename)
def add(self, username, password, port, address, domain_name, libvirt_uri, libvirt_sasl_username, libvirt_sasl_password): # check libvirt's connection and if domain exist prior to adding it utils.check_libvirt_connection_and_domain( libvirt_uri, domain_name, sasl_username=libvirt_sasl_username, sasl_password=libvirt_sasl_password) domain_path = os.path.join(utils.CONFIG_PATH, domain_name) try: os.makedirs(domain_path) except OSError as e: if e.errno == errno.EEXIST: sys.exit('Domain %s already exist' % domain_name) config_path = os.path.join(domain_path, 'config') with open(config_path, 'w') as f: config = configparser.ConfigParser() config.add_section(DEFAULT_SECTION) config.set(DEFAULT_SECTION, 'username', username) config.set(DEFAULT_SECTION, 'password', password) config.set(DEFAULT_SECTION, 'port', port) config.set(DEFAULT_SECTION, 'address', address) config.set(DEFAULT_SECTION, 'domain_name', domain_name) config.set(DEFAULT_SECTION, 'libvirt_uri', libvirt_uri) if libvirt_sasl_username and libvirt_sasl_password: config.set(DEFAULT_SECTION, 'libvirt_sasl_username', libvirt_sasl_username) config.set(DEFAULT_SECTION, 'libvirt_sasl_password', libvirt_sasl_password) config.write(f)
def __init__(self): config = configparser.ConfigParser() config.read(CONFIG_FILE) self._conf_dict = self._as_dict(config) self._validate()
def setUpModule(): # Backup local config file if it exists if os.path.exists(local_cfg): os.rename(local_cfg, local_cfg + '.bak') # Re-read the configs girder_worker.config = ConfigParser() _cfgs = ('worker.dist.cfg', 'worker.local.cfg') girder_worker.config.read( [os.path.join(girder_worker.PACKAGE_DIR, f) for f in _cfgs])
def _parse_options(self): cp = configparser.ConfigParser() cp.add_section('testoptions') try: cp.read(self.option_file) except NoFileError: pass for name, value in cp.items('testoptions'): conv = self._CONVERTERS.get(name, lambda v: v) self.options[name] = conv(value)
def assume_role(self, region, profile): # assume role global connect_args if six.PY3: aws_creds = configparser.ConfigParser() aws_config = configparser.ConfigParser() else: aws_creds = configparser.SafeConfigParser() aws_config = configparser.SafeConfigParser() aws_creds.read(os.path.expanduser("~/.aws/credentials")) aws_config.read(os.path.expanduser("~/.aws/config")) source_profile = self.get_option(aws_config, profile, 'source_profile') arn = self.get_option(aws_config, profile, 'role_arn') aws_access_key = self.get_option(aws_creds, source_profile, 'aws_access_key_id') aws_secret_key = self.get_option(aws_creds, source_profile, 'aws_secret_access_key') session_name = "role_session_name_" + self.boto_profile sts_conn = sts.STSConnection(aws_access_key, aws_secret_key) assume_role = sts_conn.assume_role(role_arn=arn, role_session_name=session_name) connect_args['aws_access_key_id'] = assume_role.credentials.access_key connect_args['aws_secret_access_key'] = assume_role.credentials.secret_key connect_args['security_token'] = assume_role.credentials.session_token
def get_config(config_file_path="~/.glogcli.cfg"): config = configparser.ConfigParser() try: open(os.path.expanduser(config_file_path)) except Exception: click.echo("[WARNING] - Could not find %s file. Please create the configuration file like:" % config_file_path) click.echo("\n[environment:default]\n" "host=mygraylogserver.com\n" "port=443\n" "username=john.doe\n" "default_stream=*\n" "\n" "[environment:dev]\n" "host=mygraylogserver.dev.com\n" "port=443\n" "proxy=mycompanyproxy.com\n" "username=john.doe\n" "default_stream=57e14cde6fb78216a60d35e8\n" "\n" "[format:default]\n" "format={host} {level} {facility} {timestamp} {message}\n" "\n" "[format:short]\n" "format=[{timestamp}] {level} {message}\n" "\n" "[format:long]\n" "format=time: [{timestamp}] level: {level} msg: {message} tags: {tags}\n" "color=false\n" "\n") config.read(os.path.expanduser(config_file_path)) return config
def add(self, username, password, port, address, domain_name, libvirt_uri, libvirt_sasl_username, libvirt_sasl_password): # check libvirt's connection and if domain exist prior to adding it utils.check_libvirt_connection_and_domain( libvirt_uri, domain_name, sasl_username=libvirt_sasl_username, sasl_password=libvirt_sasl_password) domain_path = os.path.join(self.config_dir, domain_name) try: os.makedirs(domain_path) except OSError as e: if e.errno == errno.EEXIST: raise exception.DomainAlreadyExists(domain=domain_name) raise exception.VirtualBMCError( 'Failed to create domain %(domain)s. Error: %(error)s' % {'domain': domain_name, 'error': e}) config_path = os.path.join(domain_path, 'config') with open(config_path, 'w') as f: config = configparser.ConfigParser() config.add_section(DEFAULT_SECTION) config.set(DEFAULT_SECTION, 'username', username) config.set(DEFAULT_SECTION, 'password', password) config.set(DEFAULT_SECTION, 'port', six.text_type(port)) config.set(DEFAULT_SECTION, 'address', address) config.set(DEFAULT_SECTION, 'domain_name', domain_name) config.set(DEFAULT_SECTION, 'libvirt_uri', libvirt_uri) if libvirt_sasl_username and libvirt_sasl_password: config.set(DEFAULT_SECTION, 'libvirt_sasl_username', libvirt_sasl_username) config.set(DEFAULT_SECTION, 'libvirt_sasl_password', libvirt_sasl_password) config.write(f)
def initialize(self): config = configparser.ConfigParser() config.read(CONFIG_FILE) self._conf_dict = self._as_dict(config) self._validate()
def config_from_file(): if os.path.exists(".osfcli.config"): config_ = configparser.ConfigParser() config_.read(".osfcli.config") # for python2 compatibility config = dict(config_.items('osf')) else: config = {} return config
def init(args): """Initialize or edit an existing .osfcli.config file.""" # reading existing config file, convert to configparser object config = config_from_file() config_ = configparser.ConfigParser() config_.add_section('osf') if 'username' not in config.keys(): config_.set('osf', 'username', '') else: config_.set('osf', 'username', config['username']) if 'project' not in config.keys(): config_.set('osf', 'project', '') else: config_.set('osf', 'project', config['project']) # now we can start asking for new values print('Provide a username for the config file [current username: {}]:'.format( config_.get('osf', 'username'))) username = input() if username: config_.set('osf', 'username', username) print('Provide a project for the config file [current project: {}]:'.format( config_.get('osf', 'project'))) project = input() if project: config_.set('osf', 'project', project) cfgfile = open(".osfcli.config", "w") config_.write(cfgfile) cfgfile.close()
def config_from_ini(self, ini): config = {} if sys.version_info >= (3, 2): parser = configparser.ConfigParser() else: parser = configparser.SafeConfigParser() ini = textwrap.dedent(six.u(ini)) parser.readfp(io.StringIO(ini)) for section in parser.sections(): config[section] = dict(parser.items(section)) return config
def update_credentials(profile, credentials): credentials_file = os.path.expanduser('~/.aws/credentials') config = configparser.ConfigParser() config.read(credentials_file) # Create profile section in credentials file if not config.has_section(profile): config.add_section(profile) # Set access credentials # `aws_security_token` is used by boto # `aws_session_token` is used by aws cli config.set( profile, 'aws_access_key_id', credentials['AccessKeyId']) config.set( profile, 'aws_secret_access_key', credentials['SecretAccessKey']) config.set( profile, 'aws_session_token', credentials['SessionToken']) config.set( profile, 'aws_security_token', credentials['SessionToken']) # Update credentials file with open(credentials_file, 'w') as credentials_file: config.write(credentials_file) print( "# Aws credentials file got updated with temporary access for profile %s" % profile )
def __init__(self, *args, **kwargs): ConfigParser.__init__(self, *args, **kwargs) self.read_string(parameterized_config(DEFAULT_CONFIG)) self.is_validated = False