The following 44 code examples, extracted from open source Python projects, illustrate how to use toml.loads().
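All of the examples below share the same basic pattern: read a TOML document into a string and pass it to toml.loads(), which returns a plain dict. Here is a minimal sketch of that pattern; the [server] table and its keys are made up purely for illustration.

import toml

# toml.loads() parses a TOML string into a dict; toml.load() is the
# companion function that accepts a file object or path instead.
document = """
[server]
host = "127.0.0.1"
port = 8080
"""

config = toml.loads(document)
print(config["server"]["port"])  # prints 8080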
def parse_front_matter(lines):
    """ Parse lines of front matter """
    if not lines:
        return "toml", {}
    if lines[0] == "{":
        # JSON
        import json
        return "json", json.loads("\n".join(lines))
    if lines[0] == "+++":
        # TOML
        import toml
        return "toml", toml.loads("\n".join(lines[1:-1]))
    if lines[0] == "---":
        # YAML
        import yaml
        return "yaml", yaml.load("\n".join(lines[1:-1]), Loader=yaml.CLoader)
    return {}
def getConfigDict(configPath):
    configDict = None
    try:
        configRaw = open(configPath, "rb").read()
    except IOError:
        print("ERROR: I/O fatal error.", file=sys.stderr)
        sys.exit(1)
    if configRaw.startswith(codecs.BOM_UTF8):
        configRaw = configRaw[3:]
    try:
        configDict = toml.loads(configRaw)
    except toml.TomlDecodeError as tomlExp:
        for string in tomlExp.args:
            print("ERROR: Invalid TOML syntax. " + string, file=sys.stderr)
        sys.exit(1)
    except TypeError as typeExp:
        for string in typeExp.args:
            print("ERROR: Invalid config file. " + string, file=sys.stderr)
        sys.exit(1)
    except:
        print("ERROR: Invalid config file. Please make sure it is UTF-8 encoded and complies with the TOML specification.", file=sys.stderr)
        print("Please review the TOML specification at: https://github.com/toml-lang/toml", file=sys.stderr)
        sys.exit(1)
    return configDict
def load_from_path(path, mutable=False):
    """Loads a TOML file from the path

    :param path: Path to the TOML file
    :type path: str
    :param mutable: True if the returned Toml object should be mutable
    :type mutable: boolean
    :returns: Map for the configuration file
    :rtype: Toml | MutableToml
    """

    util.ensure_file_exists(path)
    util.enforce_file_permissions(path)
    with util.open_file(path, 'r') as config_file:
        try:
            toml_obj = toml.loads(config_file.read())
        except Exception as e:
            raise DCOSException(
                'Error parsing config file at [{}]: {}'.format(path, e))
        return (MutableToml if mutable else Toml)(toml_obj)
def read_config(args):
    """ Read configuration options from ~/.shakedown (if exists)

        :param args: a dict of arguments
        :type args: dict

        :return: a dict of arguments
        :rtype: dict
    """

    configfile = os.path.expanduser('~/.shakedown')

    if os.path.isfile(configfile):
        with open(configfile, 'r') as f:
            config = toml.loads(f.read())

        for key in config:
            param = key.replace('-', '_')

            if param not in args or args[param] in [False, None]:
                args[param] = config[key]

    return args
def _get_log_config(filename=None):
    """Determines if there is a log config in the config directory
       and returns it. If it does not exist, return None.

    Arguments:
        filename (str): The name of the logging config specific to the
            transaction processor that is being started.

    Returns:
        log_config (dict): The dictionary to pass to logging.config.dictConfig
    """
    if filename is not None:
        conf_file = os.path.join(get_config_dir(), filename)
        if os.path.exists(conf_file):
            with open(conf_file) as fd:
                raw_config = fd.read()
            if filename.endswith(".yaml"):
                log_config = yaml.safe_load(raw_config)
            else:
                log_config = toml.loads(raw_config)
            return log_config
    return None
def _get_processor_config(filename=None):
    """Determines if there is a processor config in the config directory
       and returns it. If it does not exist, return None.

    Arguments:
        filename (str): The name of the processor config specific to the
            transaction processor that is being started.

    Returns:
        processor_config (dict): The dictionary used to configure the
            transaction processor.
    """
    if filename is not None:
        conf_file = os.path.join(get_config_dir(), filename)
        if os.path.exists(conf_file):
            with open(conf_file) as fd:
                raw_config = fd.read()
            processor_config = toml.loads(raw_config)
            return processor_config
    return None
def _load_toml_cli_config(filename=None):
    if filename is None:
        filename = os.path.join(
            _get_config_dir(),
            'cli.toml')

    if not os.path.exists(filename):
        LOGGER.info(
            "Skipping CLI config loading from non-existent config file: %s",
            filename)
        return {}

    LOGGER.info("Loading CLI information from config: %s", filename)

    try:
        with open(filename) as fd:
            raw_config = fd.read()
    except IOError as e:
        raise CliConfigurationError(
            "Unable to load CLI configuration file: {}".format(str(e)))

    return toml.loads(raw_config)
def configure(args):
    opts = parse_args(args)
    config = {}
    # Only read the config file when one was actually supplied.
    if opts["config"] is not None:
        config.update(toml.loads(open(opts["config"]).read()))
    opts = {key: value for key, value in opts.items() if value is not None}
    config.update(opts)
    if config["Verbose"]:
        print("Configuration:")
        PP.pprint(config)
    return config
def _get_config():
    """Determines if there is a log config in the config directory
       and returns it. If it does not exist, return None.

    Returns:
        log_config (dict): The dictionary to pass to logging.config.dictConfig
    """
    conf_file = os.path.join(_get_config_dir(), 'log_config.toml')
    if os.path.exists(conf_file):
        with open(conf_file) as fd:
            raw_config = fd.read()
        log_config = toml.loads(raw_config)
        return log_config

    conf_file = os.path.join(_get_config_dir(), 'log_config.yaml')
    if os.path.exists(conf_file):
        with open(conf_file) as fd:
            raw_config = fd.read()
        log_config = yaml.safe_load(raw_config)
        return log_config

    return None
def load_from_path(path, mutable=False):
    """Loads a TOML file from the path

    :param path: Path to the TOML file
    :type path: str
    :param mutable: True if the returned Toml object should be mutable
    :type mutable: boolean
    :returns: Map for the configuration file
    :rtype: Toml | MutableToml
    """

    util.ensure_file_exists(path)
    with util.open_file(path, 'r') as config_file:
        try:
            toml_obj = toml.loads(config_file.read())
        except Exception as e:
            raise DCOSException(
                'Error parsing config file at [{}]: {}'.format(path, e))
        return (MutableToml if mutable else Toml)(toml_obj)
def get_config_schema(command):
    """
    :param command: the subcommand name
    :type command: str
    :returns: the subcommand's configuration schema
    :rtype: dict
    """

    # core.* config variables are special. They're valid, but don't
    # correspond to any particular subcommand, so we must handle them
    # separately.
    if command == "core":
        return json.loads(
            pkg_resources.resource_string(
                'dcos',
                'data/config-schema/core.json').decode('utf-8'))
    executable = subcommand.command_executables(command)
    return subcommand.config_schema(executable, command)
def loadConfig(self):
    """Read in TOML-structured config file."""
    self.sanityCheckConfigFile()
    with open(self.configFilenameAbs) as f:
        config = toml.loads(f.read())
    if self.verbose:
        print("# Loaded config file {}".format(self.configFilenameAbs))
    self.config = config
def __init__(self, file):
    with open(file) as conffile:
        config = toml.loads(conffile.read())

    # Sections of the config file
    self.general = config['general']
    self.katana = config['katana']
    self.files = config['files']
    self.lcd = config['lcd']
    self.leds = config['leds']
    self.buttons = config['buttons']
def read(self, config_file):
    config = {}
    with open(config_file) as conffile:
        config = toml.loads(conffile.read())
    return config
def read(self, config_file):
    config = {}
    with open(config_file) as conffile:
        config = json.loads(conffile.read())
    return config
def get_config():
    # Read config parameters from a TOML file.
    config = None
    config_file_path = sys.argv[1]
    with open(config_file_path) as config_file:
        config = toml.loads(config_file.read())
    return config
def load_metadata_from_manifest(section):
    with open('Cargo.toml', 'rt') as manifest_file:
        manifest = toml.loads(manifest_file.read())
    return (manifest
            .get('package', {})
            .get('metadata', {})
            .get(section, {}))
def get_conf(conf_file_path):
    """Read a TOML conf file for later use.

    :param conf_file_path: absolute path of the conf file.
    :return: a dict containing the configured information.
    """
    if version_info[0] == 3:
        with open(conf_file_path, encoding='utf-8') as conf_file:
            config = toml.loads(conf_file.read())
    else:
        with open(conf_file_path) as conf_file:
            config = toml.loads(conf_file.read())
    return config
def _load_toml(self, event):
    """Loads TOML if necessary"""
    return toml.loads(event["toml"]) if "toml" in event else event
def config_file_as_dict(**kwargs):
    cfgfile = kwargs["cfgfile"]
    cfgfile_contents = templated_file_contents(kwargs, kwargs["cfgfile"])
    # print(cfgfile_contents)
    cfgdata = {}
    if cfgfile.endswith(".json"):
        cfgdata = json.loads(cfgfile_contents)
    elif cfgfile.endswith(".toml"):
        cfgdata = toml.loads(cfgfile_contents)
    elif cfgfile.endswith(".yaml"):
        # http://stackoverflow.com/questions/5484016/how-can-i-do-string-concatenation-or-string-replacement-in-yaml
        yaml.add_constructor('!join', join)
        cfgdata = yaml.load(cfgfile_contents)
    else:
        raise ValueError("Invalid config file format")
    return merge_two_dicts(kwargs, cfgdata)
def test_should_generate_toml_frontmatter(self, pypandocMock, blogObjClass):
    item = MainTests.posts["items"][0]
    fm = getFrontMatter(item)
    fmObj = toml.loads(fm)
    assert fmObj["title"] == 'title'
    assert fmObj["id"] == '100'
    assert fmObj["aliases"][0] == 'url'
def get_config_schema(command):
    """
    :param command: the subcommand name
    :type command: str
    :returns: the subcommand's configuration schema
    :rtype: dict
    """

    # import here to avoid circular import
    from dcos.subcommand import (
        command_executables, config_schema, default_subcommands)

    # core.* config variables are special. They're valid, but don't
    # correspond to any particular subcommand, so we must handle them
    # separately.
    if command == "core":
        return json.loads(
            pkg_resources.resource_string(
                'dcos',
                'data/config-schema/core.json').decode('utf-8'))
    elif command in default_subcommands():
        return json.loads(
            pkg_resources.resource_string(
                'dcos',
                'data/config-schema/{}.json'.format(command)).decode('utf-8'))
    else:
        executable = command_executables(command)
        return config_schema(executable, command)
def parse(self):
    # Open the Pipfile.
    with open(self.filename) as f:
        content = f.read()

    # Load the default configuration.
    default_config = {
        u'source': [{u'url': u'https://pypi.python.org/simple',
                     u'verify_ssl': True,
                     'name': "pypi"}],
        u'packages': {},
        u'requires': {},
        u'dev-packages': {}
    }

    config = {}
    config.update(default_config)

    # Load the Pipfile's configuration.
    config.update(toml.loads(content))

    # Structure the data for output.
    data = {
        '_meta': {
            'sources': config['source'],
            'requires': config['requires']
        },
    }

    # TODO: Validate given data here.
    self.groups['default'] = config['packages']
    self.groups['develop'] = config['dev-packages']

    # Update the data structure with group information.
    data.update(self.groups)
    return data
def parsed_pipfile(self):
    # Open the pipfile, read it into memory.
    with open(self.pipfile_location) as f:
        contents = f.read()

    # If any outline tables are present...
    if ('[packages.' in contents) or ('[dev-packages.' in contents):

        data = toml.loads(contents)

        # Convert all outline tables to inline tables.
        for section in ('packages', 'dev-packages'):
            for package in data.get(section, {}):

                # Convert things to inline tables -- fancy :)
                if hasattr(data[section][package], 'keys'):
                    _data = data[section][package]
                    data[section][package] = toml._get_empty_inline_table(dict)
                    data[section][package].update(_data)

        # We lose comments here, but it's for the best.
        try:
            return contoml.loads(toml.dumps(data, preserve=True))
        except RuntimeError:
            return toml.loads(toml.dumps(data, preserve=True))

    else:
        # Fallback to toml parser, for large files.
        try:
            return contoml.loads(contents)
        except Exception:
            return toml.loads(contents)
def _lockfile(self):
    """Pipfile.lock divided by PyPI and external dependencies."""
    pfile = pipfile.load(self.pipfile_location)
    lockfile = json.loads(pfile.lock())

    for section in ('default', 'develop'):
        lock_section = lockfile.get(section, {})

        for key in list(lock_section.keys()):
            norm_key = pep423_name(key)
            lockfile[section][norm_key] = lock_section.pop(key)

    return lockfile
def __init__(self, path, builder=Builder()):
    self._path = path
    self._dir = os.path.realpath(os.path.dirname(path))
    self._builder = builder
    self._git_config = None
    self._name = None
    self._version = None
    self._description = None
    self._authors = []
    self._homepage = None
    self._repository = None
    self._keywords = []
    self._python_versions = []
    self._dependencies = []
    self._dev_dependencies = []
    self._pip_dependencies = []
    self._pip_dev_dependencies = []
    self._features = {}
    self._scripts = {}
    self._entry_points = {}
    self._license = None
    self._readme = None
    self._include = []
    self._exclude = []
    self._extensions = {}

    with open(self._path) as f:
        self._config = toml.loads(f.read())

    self.load()
def load_file(self, file):
    with open(file) as conffile:
        d = toml.loads(conffile.read())
    self.i_rate = 1 + d.get('inflation', 0) / 100   # inflation rate: 2.5 -> 1.025
    self.r_rate = 1 + d.get('returns', 6) / 100     # invest rate: 6 -> 1.06

    self.startage = d['startage']
    self.endage = d.get('endage', max(96, self.startage + 5))

    if 'prep' in d:
        self.workyr = d['prep']['workyears']
        self.maxsave = d['prep']['maxsave']
        self.worktax = 1 + d['prep'].get('tax_rate', 25) / 100
    else:
        self.workyr = 0
    self.retireage = self.startage + self.workyr
    self.numyr = self.endage - self.retireage

    self.aftertax = d.get('aftertax', {'bal': 0})
    if 'basis' not in self.aftertax:
        self.aftertax['basis'] = 0

    self.IRA = d.get('IRA', {'bal': 0})
    if 'maxcontrib' not in self.IRA:
        self.IRA['maxcontrib'] = 18000 + 5500 * 2

    self.roth = d.get('roth', {'bal': 0})
    if 'maxcontrib' not in self.roth:
        self.roth['maxcontrib'] = 5500 * 2

    self.parse_expenses(d)

    self.sepp_end = max(5, 59 - self.retireage)  # first year you can spend IRA reserved for SEPP
    self.sepp_ratio = 25                         # money per-year from SEPP (bal/ratio)
def check_json(filename, contents):
    if not filename.endswith(".json"):
        raise StopIteration

    try:
        json.loads(contents, object_pairs_hook=check_json_requirements(filename))
    except ValueError as e:
        match = re.search(r"line (\d+) ", e.message)
        line_no = match and match.group(1)
        yield (line_no, e.message)
    except KeyError as e:
        yield (None, e.message)
def test_normal(self, capsys, table_name, header, value, expected):
    writer = table_writer_class()
    writer.table_name = table_name
    writer.header_list = header
    writer.value_matrix = value
    writer.write_table()

    out, _err = capsys.readouterr()

    print("[expected]\n{}".format(expected))
    print("[actual]\n{}".format(out))

    assert toml.loads(out) == toml.loads(expected)
def _loads(content, fmt=None):
    if fmt == 'toml':
        return toml.loads(content)
    elif fmt == 'json':
        return json.loads(content, object_hook=json_util.object_hook)
    elif fmt == 'python':
        return ast.literal_eval(content)
    elif fmt == 'pickle':
        return pickle.loads(content)
    else:
        return content
def parse_toml(tml_profile):
    with open(tml_profile) as conf:
        config = toml.loads(conf.read())
    return config


# read_policy reads the resource policy (js, css, img etc.) from the conf dict
def build_inheritance(current_fnm):
    with open(current_fnm) as f:
        current_dict = toml.loads(f.read())
    if 'inherits' in current_dict.keys():
        config_dir = os.path.dirname(current_fnm)
        inherits_fnm = os.path.join(config_dir, current_dict['inherits'])
        parent_dict = build_inheritance(inherits_fnm)
        current_dict = update_recursively(parent_dict, current_dict)
    return current_dict
def _get_dir(toml_config_setting, sawtooth_home_dir, windows_dir, default_dir):
    """Determines the directory path based on configuration.

    Arguments:
        toml_config_setting (str): The name of the config setting related
            to the directory which will appear in path.toml.
        sawtooth_home_dir (str): The directory under the SAWTOOTH_HOME
            environment variable.  For example, for 'data' if the data
            directory is $SAWTOOTH_HOME/data.
        windows_dir (str): The windows path relative to the computed base
            directory.
        default_dir (str): The default path on Linux.

    Returns:
        directory (str): The path.
    """
    conf_file = os.path.join(get_config_dir(), 'path.toml')
    if os.path.exists(conf_file):
        with open(conf_file) as fd:
            raw_config = fd.read()
        toml_config = toml.loads(raw_config)
        if toml_config_setting in toml_config:
            return toml_config[toml_config_setting]

    if 'SAWTOOTH_HOME' in os.environ:
        return os.path.join(os.environ['SAWTOOTH_HOME'], sawtooth_home_dir)

    if os.name == 'nt':
        base_dir = \
            os.path.dirname(os.path.dirname(os.path.abspath(sys.argv[0])))
        return os.path.join(base_dir, windows_dir)

    return default_dir
def initialize(cls, config_dir, data_dir):
    # See if our configuration file exists.  If so, then we are going to
    # see if there is a configuration value for the validator ID.  If so,
    # then we'll use that when constructing the simulated anti-Sybil ID.
    # Otherwise, we are going to fall back on trying to create one that is
    # unique.
    validator_id = datetime.datetime.now().isoformat()

    config_file = os.path.join(config_dir, 'poet_enclave_simulator.toml')
    if os.path.exists(config_file):
        LOGGER.info(
            'Loading PoET enclave simulator config from : %s',
            config_file)

        try:
            with open(config_file) as fd:
                toml_config = toml.loads(fd.read())
        except IOError as e:
            LOGGER.info(
                'Error loading PoET enclave simulator configuration: %s',
                e)
            LOGGER.info('Continuing with default configuration')

        invalid_keys = set(toml_config.keys()).difference(['validator_id'])
        if invalid_keys:
            LOGGER.warning(
                'Ignoring invalid keys in PoET enclave simulator config: '
                '%s',
                ', '.join(sorted(list(invalid_keys))))

        validator_id = toml_config.get('validator_id', validator_id)

    LOGGER.debug(
        'PoET enclave simulator creating anti-Sybil ID from: %s',
        validator_id)

    # Create an anti-Sybil ID that is unique for this validator
    cls._anti_sybil_id = hashlib.sha256(validator_id.encode()).hexdigest()
def _get_dir(toml_config_setting, sawtooth_home_dir, windows_dir, default_dir):
    """Determines the directory path based on configuration.

    Arguments:
        toml_config_setting (str): The name of the config setting related
            to the directory which will appear in path.toml.
        sawtooth_home_dir (str): The directory under the SAWTOOTH_HOME
            environment variable.  For example, for 'data' if the data
            directory is $SAWTOOTH_HOME/data.
        windows_dir (str): The windows path relative to the computed base
            directory.
        default_dir (str): The default path on Linux.

    Returns:
        directory (str): The path.
    """
    conf_file = os.path.join(_get_config_dir(), 'path.toml')
    if os.path.exists(conf_file):
        with open(conf_file) as fd:
            raw_config = fd.read()
        toml_config = toml.loads(raw_config)
        if toml_config_setting in toml_config:
            return toml_config[toml_config_setting]

    if 'SAWTOOTH_HOME' in os.environ:
        return os.path.join(os.environ['SAWTOOTH_HOME'], sawtooth_home_dir)

    if os.name == 'nt':
        base_dir = \
            os.path.dirname(os.path.dirname(os.path.abspath(sys.argv[0])))
        return os.path.join(base_dir, windows_dir)

    return default_dir
def _read_json_request(self):
    data_string = self.rfile.read(
        int(self.headers['Content-Length']))

    return json.loads(data_string.decode('utf-8'))
def get_server():
    config_file = os.path.join(config.get_config_dir(), 'ias_proxy.toml')
    LOGGER.info('Loading IAS Proxy config from: %s', config_file)

    # Lack of a config file is a fatal error, so let the exception percolate
    # up to caller
    with open(config_file) as fd:
        proxy_config = toml.loads(fd.read())

    # Verify the integrity (as best we can) of the TOML configuration file
    valid_keys = set(['proxy_name', 'proxy_port', 'ias_url',
                      'spid_cert_file'])
    found_keys = set(proxy_config.keys())

    invalid_keys = found_keys.difference(valid_keys)
    if invalid_keys:
        raise \
            ValueError(
                'IAS Proxy config file contains the following invalid '
                'keys: {}'.format(
                    ', '.join(sorted(list(invalid_keys)))))

    missing_keys = valid_keys.difference(found_keys)
    if missing_keys:
        raise \
            ValueError(
                'IAS Proxy config file missing the following keys: '
                '{}'.format(
                    ', '.join(sorted(list(missing_keys)))))

    return IasProxyServer(proxy_config)
def load_toml_rest_api_config(filename):
    """Returns a RestApiConfig created by loading a TOML file from the
    filesystem.
    """
    if not os.path.exists(filename):
        LOGGER.info(
            "Skipping rest api loading from non-existent config file: %s",
            filename)
        return RestApiConfig()

    LOGGER.info("Loading rest api information from config: %s", filename)

    try:
        with open(filename) as fd:
            raw_config = fd.read()
    except IOError as e:
        raise RestApiConfigurationError(
            "Unable to load rest api configuration file: {}".format(str(e)))

    toml_config = toml.loads(raw_config)

    invalid_keys = set(toml_config.keys()).difference(
        ['bind', 'connect', 'timeout', 'opentsdb_db', 'opentsdb_url',
         'opentsdb_username', 'opentsdb_password'])
    if invalid_keys:
        raise RestApiConfigurationError(
            "Invalid keys in rest api config: {}".format(
                ", ".join(sorted(list(invalid_keys)))))

    config = RestApiConfig(
        bind=toml_config.get("bind", None),
        connect=toml_config.get('connect', None),
        timeout=toml_config.get('timeout', None),
        opentsdb_url=toml_config.get('opentsdb_url', None),
        opentsdb_db=toml_config.get('opentsdb_db', None),
        opentsdb_username=toml_config.get('opentsdb_username', None),
        opentsdb_password=toml_config.get('opentsdb_password', None),
    )

    return config
def load_toml_path_config(filename):
    """Returns a PathConfig created by loading a TOML file from the
    filesystem.
    """
    if not os.path.exists(filename):
        LOGGER.info(
            "Skipping path loading from non-existent config file: %s",
            filename)
        return PathConfig()

    LOGGER.info("Loading path information from config: %s", filename)

    try:
        with open(filename) as fd:
            raw_config = fd.read()
    except IOError as e:
        raise LocalConfigurationError(
            "Unable to load path configuration file: {}".format(str(e)))

    toml_config = toml.loads(raw_config)

    invalid_keys = set(toml_config.keys()).difference(
        ['data_dir', 'key_dir', 'log_dir', 'policy_dir'])
    if invalid_keys:
        raise LocalConfigurationError("Invalid keys in path config: {}".format(
            ", ".join(sorted(list(invalid_keys)))))

    config = PathConfig(
        config_dir=None,
        data_dir=toml_config.get('data_dir', None),
        key_dir=toml_config.get('key_dir', None),
        log_dir=toml_config.get('log_dir', None),
        policy_dir=toml_config.get('policy_dir', None)
    )

    return config
def check_lock(file_name, contents):
    def find_reverse_dependencies(name, content):
        for package in itertools.chain([content["root"]], content["package"]):
            for dependency in package.get("dependencies", []):
                if dependency.startswith("{} ".format(name)):
                    yield package["name"], dependency

    if not file_name.endswith(".lock"):
        raise StopIteration

    # Package names to be neglected (as named by cargo)
    exceptions = config["ignore"]["packages"]

    content = toml.loads(contents)

    packages_by_name = {}
    for package in content.get("package", []):
        if "replace" in package:
            continue
        source = package.get("source", "")
        if source == r"registry+https://github.com/rust-lang/crates.io-index":
            source = "crates.io"
        packages_by_name.setdefault(package["name"], []).append((package["version"], source))

    for (name, packages) in packages_by_name.iteritems():
        if name in exceptions or len(packages) <= 1:
            continue

        message = "duplicate versions for package `{}`".format(name)
        packages.sort()
        packages_dependencies = list(find_reverse_dependencies(name, content))
        for version, source in packages:
            short_source = source.split("#")[0].replace("git+", "")
            message += "\n\t\033[93mThe following packages depend on version {} from '{}':\033[0m" \
                       .format(version, short_source)
            for name, dependency in packages_dependencies:
                if version in dependency and short_source in dependency:
                    message += "\n\t\t" + name
        yield (1, message)

    # Check to see if we are transitively using any blocked packages
    for package in content.get("package", []):
        package_name = package.get("name")
        package_version = package.get("version")
        for dependency in package.get("dependencies", []):
            dependency = dependency.split()
            dependency_name = dependency[0]
            whitelist = config['blocked-packages'].get(dependency_name)
            if whitelist is not None:
                if package_name not in whitelist:
                    fmt = "Package {} {} depends on blocked package {}."
                    message = fmt.format(package_name, package_version, dependency_name)
                    yield (1, message)
def load_toml_xo_config(filename):
    """Returns an XOConfig created by loading a TOML file from the
    filesystem.

    Args:
        filename (string): The name of the file to load the config from

    Returns:
        config (XOConfig): The XOConfig created from the stored toml file.

    Raises:
        LocalConfigurationError
    """
    if not os.path.exists(filename):
        LOGGER.info(
            "Skipping transaction processor config loading from non-existent"
            " config file: %s", filename)
        return XOConfig()

    LOGGER.info("Loading transaction processor information from config: %s",
                filename)

    try:
        with open(filename) as fd:
            raw_config = fd.read()
    except IOError as e:
        raise LocalConfigurationError(
            "Unable to load transaction processor configuration file:"
            " {}".format(str(e)))

    toml_config = toml.loads(raw_config)
    invalid_keys = set(toml_config.keys()).difference(
        ['connect'])
    if invalid_keys:
        raise LocalConfigurationError(
            "Invalid keys in transaction processor config: "
            "{}".format(", ".join(sorted(list(invalid_keys)))))

    config = XOConfig(
        connect=toml_config.get("connect", None)
    )

    return config
def load_toml_settings_config(filename):
    """Returns a SettingsConfig created by loading a TOML file from the
    filesystem.

    Args:
        filename (string): The name of the file to load the config from

    Returns:
        config (SettingsConfig): The SettingsConfig created from the stored
            toml file.

    Raises:
        LocalConfigurationError
    """
    if not os.path.exists(filename):
        LOGGER.info(
            "Skipping transaction processor config loading from non-existent"
            " config file: %s", filename)
        return SettingsConfig()

    LOGGER.info("Loading transaction processor information from config: %s",
                filename)

    try:
        with open(filename) as fd:
            raw_config = fd.read()
    except IOError as e:
        raise LocalConfigurationError(
            "Unable to load transaction processor configuration file:"
            " {}".format(str(e)))

    toml_config = toml.loads(raw_config)
    invalid_keys = set(toml_config.keys()).difference(
        ['connect'])
    if invalid_keys:
        raise LocalConfigurationError(
            "Invalid keys in transaction processor config: "
            "{}".format(", ".join(sorted(list(invalid_keys)))))

    config = SettingsConfig(
        connect=toml_config.get("connect", None)
    )

    return config
def load_toml_identity_config(filename):
    """Returns an IdentityConfig created by loading a TOML file from the
    filesystem.

    Args:
        filename (string): The name of the file to load the config from

    Returns:
        config (IdentityConfig): The IdentityConfig created from the stored
            toml file.

    Raises:
        LocalConfigurationError
    """
    if not os.path.exists(filename):
        LOGGER.info(
            "Skipping transaction processor config loading from non-existent"
            " config file: %s", filename)
        return IdentityConfig()

    LOGGER.info("Loading transaction processor information from config: %s",
                filename)

    try:
        with open(filename) as fd:
            raw_config = fd.read()
    except IOError as e:
        raise LocalConfigurationError(
            "Unable to load transaction processor configuration file:"
            " {}".format(str(e)))

    toml_config = toml.loads(raw_config)
    invalid_keys = set(toml_config.keys()).difference(
        ['connect'])
    if invalid_keys:
        raise LocalConfigurationError(
            "Invalid keys in transaction processor config: "
            "{}".format(", ".join(sorted(list(invalid_keys)))))

    config = IdentityConfig(
        connect=toml_config.get("connect", None)
    )

    return config