我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用ruamel.yaml.safe_load()。
def __init__(self, configfile, dryrun=False, debug=False): self.configfile = configfile self.config = yaml_load(open(configfile)) logger.info("Loaded configuration file: %s" % configfile) src_root = self.config.get("src_root", "/") if os.path.isabs(src_root): self.src_root = src_root logger.info("Source root directory: %s" % self.src_root) else: raise ValueError("Source root must be an absolute path") self.syspath = syspath.union(self.config.get("syspath", [])) logger.info("Protected system paths: {0}".format(self.syspath)) dest_root = os.path.expanduser(self.config["dest_root"]) logger.info("Check backup destination against protected paths ...") self.dest_root = self.check_dest(dest_root) logger.info("Backup destination: %s" % self.dest_root) self.dryrun = dryrun logger.info("Dry run mode: %s" % dryrun) self.debug = debug logger.info("Show DEBUG information: %s" % debug)
def load_schema(schema_file=None): schema_file = schema_file or get_configuration().schema_loc schema_file = os.path.abspath(schema_file) schema_cache = getattr(load_schema, '_schemas', {}) if schema_file in schema_cache: return schema_cache[schema_file] with open(schema_file, 'r') as sf: schema = yaml.safe_load(sf) if 'tables' not in schema: raise ValueError('Tables list missing from schema.') if 'types' not in schema: raise ValueError('Types missing from schema.') schema_cache[schema_file] = schema return schema_cache[schema_file]
def get_parsed_context(context_arg): """Parse input context string and returns context as dictionary.""" assert context_arg, ("pipeline must be invoked with --context set. For " "this yaml parser you're looking for something " "like --context './myyamlfile.yaml'") logger.debug("starting") logger.debug(f"attempting to open file: {context_arg}") with open(context_arg) as yaml_file: payload = yaml.safe_load(yaml_file) logger.debug(f"yaml file parsed. Count: {len(payload)}") if not isinstance(payload, MutableMapping): raise TypeError("yaml input should describe a dictionary at the top " "level. You should have something like " "\n'key1: value1'\n key2: value2'\n" "in the yaml top-level, not \n'- value1\n - value2'") logger.debug("done") return payload
def prepare_git_user(self): """ Tries to read the git name and email, so all git commits have correct author. Requests /api/user-git to check which user is behind the current configured ssh key. """ import aetros.api try: response = aetros.api.request('user-git') if response: user = yaml.safe_load(response) self.git_name = user['name'] self.git_email = user['email'] else: self.go_offline() except ApiConnectionError as e: self.go_offline()
def load_server(self, server_id) -> bool: if not os.path.exists("data/{}".format(server_id)): return False log.debug("Loading server: {}".format(server_id)) config = yaml.safe_load(open("data/{}/config.yml".format(server_id), "r")) sections = yaml.safe_load(open("data/{}/sections.yml".format(server_id), "r")) if os.path.exists("data/{}/notes.yml".format(server_id)): notes = yaml.safe_load(open("data/{}/notes.yml".format(server_id), "r")) else: notes = DEFAULT_NOTES if "notes_channel" not in config: config["notes_channel"] = None self.data[server_id] = { "config": config, "sections": self.load_sections(sections) } self.notes[server_id] = notes return True
def _update_from_file(self, filename): """ Helper method to update an existing configuration with the values from a file. Loads a configuration file and replaces all values in the existing configuration dictionary with the values from the file. Args: filename (str): The path and name to the configuration file. """ if os.path.exists(filename): try: with open(filename, 'r') as config_file: yaml_dict = yaml.safe_load(config_file.read()) if yaml_dict is not None: self._update_dict(self._config, yaml_dict) except IsADirectoryError: raise ConfigLoadError( 'The specified configuration file is a directory not a file') else: raise ConfigLoadError('The config file {} does not exist'.format(filename))
def initialize_kinto(loop, kinto_client, bucket, collection): """ Initialize the remote server with the initialization.yml file. """ # Leverage kinto-wizard async client. thread_pool = ThreadPoolExecutor() async_client = AsyncKintoClient(kinto_client, loop, thread_pool) initialization_manifest = pkgutil.get_data('buildhub', 'initialization.yml') config = yaml.safe_load(initialization_manifest) # Check that we push the records at the right place. if bucket not in config: raise ValueError(f"Bucket '{bucket}' not specified in `initialization.yml`.") if collection not in config[bucket]['collections']: raise ValueError(f"Collection '{collection}' not specified in `initialization.yml`.") await initialize_server(async_client, config, bucket=bucket, collection=collection, force=False)
def setup_logging( default_path='./parameter/logger.yml', default_level=logging.INFO, env_key='LOG_CFG' ): """Setup logging configuration """ path = os.path.abspath(default_path) value = os.getenv(env_key, None) if value: path = value if os.path.exists(os.path.abspath(path)): with open(path, 'rt') as f: config = yaml.safe_load(f.read()) logging.config.dictConfig(config) else: logging.basicConfig(level=default_level)
def task_example(): ''' cp|strip config.yml -> config.yml.example ''' apikey = '82_CHAR_APIKEY' punch = fmt(''' authorities: digicert: apikey: {apikey} destinations: zeus: apikey: {apikey} ''') return { 'actions': [ fmt('cp {CONFIG_YML}.example {CONFIG_YML}.bak'), fmt('cp {CONFIG_YML} {CONFIG_YML}.example'), lambda: _update_config(CONFIG_YML+'.example', yaml.safe_load(punch)), ], }
def unbundle(dirpath, cert_name): key, csr, crt, yml, readme = [None] * 5 tarpath = fmt('{dirpath}/{cert_name}.tar.gz') with tarfile.open(tarpath, 'r:gz') as tar: for info in tar.getmembers(): if info.name.endswith('.key'): key = tar.extractfile(info.name).read().decode('utf-8') elif info.name.endswith('.csr'): csr = tar.extractfile(info.name).read().decode('utf-8') elif info.name.endswith('.crt'): crt = tar.extractfile(info.name).read().decode('utf-8') elif info.name.endswith('.yml'): yml = tar.extractfile(info.name).read().decode('utf-8') yml = yaml.safe_load(yml) elif info.name == 'README': readme = tar.extractfile(info.name).read().decode('utf-8') return key, csr, crt, yml, readme
def _fixup(obj): if isinstance(obj, dict): d = deepcopy(obj) for k,v in obj.items(): if isinstance(v, str): if 'url' in k: d[k] = URL(v) elif 'path' in k: d[k] = Path(v) elif 'auth' == k: with open(fmt('{CONFIG_DIR}/{v}'), 'r') as f: d[k] = yaml.safe_load(f.read()) elif isinstance(v, dict): d[k] = _fixup(v) return d return obj
def run_step(context): """Fetch a json file from s3 and put the json values into context. Args: - context: pypyr.context.Context. Mandatory. Should contain keys for: - s3Fetch: dict. mandatory. Must contain: - Bucket: string. s3 bucket name. - Key: string. s3 key name. json parsed from the s3 file will be merged into the context. This will overwrite existing values if the same keys are already in there. I.e if s3 json has {'eggs' : 'boiled'} and context {'eggs': 'fried'} already exists, returned context['eggs'] will be 'boiled'. """ logger.debug("started") response = pypyraws.aws.s3.get_payload(context) payload = yaml.safe_load(response) logger.debug("successfully parsed yaml from s3 response bytes") context.update(payload) logger.info("loaded s3 yaml into pypyr context") logger.debug("done")
def load_yaml(filename): with open(filename) as myfile: content = myfile.read() if "win" in sys.platform: content = content.replace("\\", "/") return yaml.safe_load(content) # myfile.read())
def load(self, path_cfg: str): with open(path_cfg, 'r') as stream: try: self.cfg_dict = yaml3ed.safe_load(stream) except yaml3ed.YAMLError as exc: print(exc) self.check() self.zbx = ZabbixAgent(self.cfg_dict['zabbix']['url'], self.cfg_dict['zabbix']['login'], self.cfg_dict['zabbix']['password']) log.debug('Config loaded')
def test_package_metadata_config_gen_task(self): task = package_metadata_tasks.PackageMetadataConfigGenTask() repo_root = os.path.abspath('.') package_dependencies_yaml = os.path.join( repo_root, 'test/testdata/googleapis_test/gapic/packaging/dependencies.yaml') package_defaults_yaml = os.path.join( repo_root, 'test/testdata/googleapis_test/gapic/packaging/api_defaults.yaml') task.execute( api_name='fake', api_version='v1', gapic_api_yaml=[], language='python', local_paths={ 'googleapis': '%s/googleapis' % repo_root, 'reporoot': repo_root, }, organization_name='google-cloud', output_dir=str(self.output_dir), package_dependencies_yaml=package_dependencies_yaml, package_defaults_yaml=package_defaults_yaml, proto_deps=['googleapis-common-protos'], package_type="grpc_client", src_proto_path=['path/to/protos'], generated_package_version={'lower': '0.17.29', 'upper': '0.18dev'}, release_level='beta' ) with open(os.path.join(str(self.output_dir), 'google-cloud-fake-v1_package.yaml')) as f: actual = yaml.safe_load(f) with open('test/testdata/google-cloud-fake-v1_package.yaml') as f: expected = yaml.safe_load(f) # Don't compare files directly because yaml doesn't preserve ordering self.assertDictEqual(actual, expected)
def load(self, data): self.data = yload(data)
def read_yaml_cases(config, open): for case in yaml.safe_load(config)['cases']: time = TIME_RE.fullmatch(case['time']) if not time: raise FormatError(case['time'], 'error parsing time') memory = MEMORY_RE.fullmatch(case['memory']) if not memory: raise FormatError(case['memory'], 'error parsing memory') yield DefaultCase( partial(open, case['input']), partial(open, case['output']), int(float(time.group(1)) * TIME_UNITS[time.group(2)]), int(float(memory.group(1)) * MEMORY_UNITS[memory.group(2)]), int(case['score']))
def load_table(table_loc, table_type): """ Loads a table of type ``table_type`` from the YAML file ``table_loc``. """ with open(table_loc, 'r') as yf: table_file = yaml.safe_load(yf) assert table_file['db_version'] == DB_VERSION table_list = table_file['data'] raw_table = (table_type(**params) for params in table_list) table_by_id = {x.id: x for x in raw_table} return table_by_id
def from_file(cls, file_loc, **kwargs): if not os.path.exists(file_loc): raise IOError('File not found: {}'.format(file_loc)) with open(file_loc, 'r') as yf: config = yaml.safe_load(yf) config.update(kwargs) return cls(config_loc_=file_loc, **config)
def from_string(cls, src): return cls.from_kwargs(None, **yaml.safe_load(src, yaml.RoundTripLoader))
def load(): def _load(yaml_string): return ryaml.safe_load(dedent(yaml_string)) return _load
def from_filename(cls, filename: str) -> 'ShanghaiConfiguration': with open(filename, 'r', encoding='utf-8') as f: yaml_config = ryaml.safe_load(f) return cls(yaml_config)
def __init__(self, *args, **kwargs): self._defs = True super(Definition, self).__init__(*args, **kwargs) self.data = pkg_resources.resource_string(__name__, "provisioning.yaml") self._parsed_defs = yaml.safe_load(self.data) self.value = '_NS/provisioning/definitions'
def get_parsed_defs(self): if self._parsed_defs: return self._parsed_defs self._parsed_defs = yaml.safe_load(self.data) return self._parsed_defs
def __init__(self, *args, **kwargs): self._defs = {} super(Definition, self).__init__(*args, **kwargs) self.data = pkg_resources.resource_string(__name__, "node_agent.yaml") self._parsed_defs = yaml.safe_load(self.data) self.value = '_NS/node_agent/definitions'
def __init__(self, *args, **kwargs): self._defs = {} super(CompiledDefinitions, self).__init__(*args, **kwargs) self.data = definitions.data self._parsed_defs = yaml.safe_load(self.data) self.value = '_NS/node_agent/compiled_definitions'
def __init__(self, *args, **kwargs): self._defs = {} super(Definition, self).__init__(*args, **kwargs) self.data = pkg_resources.resource_string(__name__, "gluster.yaml") self._parsed_defs = yaml.safe_load(self.data) self.value = '_NS/integrations/gluster/definitions'
def __init__(self, *args, **kwargs): self._defs = {} super(Definition, self).__init__(*args, **kwargs) self.data = pkg_resources.resource_string(__name__, "ceph.yaml") self._parsed_defs = yaml.safe_load(self.data) self.value = '_NS/integrations/ceph/definitions'
def run_step(context): """Loads a yaml file into the pypyr context. Yaml parsed from the file will be merged into the pypyr context. This will overwrite existing values if the same keys are already in there. I.e if file yaml has {'eggs' : 'boiled'} and context {'eggs': 'fried'} already exists, returned context['eggs'] will be 'boiled'. Args: context: pypyr.context.Context. Mandatory. The following context key must exist - fetchYamlPath. path-like. Path to file on disk. Returns: None. updates context arg. Raises: FileNotFoundError: take a guess pypyr.errors.KeyNotInContextError: fetchYamlPath missing in context. pypyr.errors.KeyInContextHasNoValueError: fetchYamlPath exists but is None. """ logger.debug("started") context.assert_key_has_value(key='fetchYamlPath', caller=__name__) file_path = context.get_formatted('fetchYamlPath') logger.debug(f"attempting to open file: {file_path}") with open(file_path) as yaml_file: payload = yaml.safe_load(yaml_file) if not isinstance(payload, MutableMapping): raise TypeError("yaml input should describe a dictionary at the top " "level. You should have something like " "\n'key1: value1'\n key2: value2'\n" "in the yaml top-level, not \n'- value1\n - value2'") logger.debug("yaml file loaded. Merging into pypyr context. . .") context.update(payload) logger.info(f"yaml file merged into pypyr context. Count: {len(payload)}") logger.debug("done")
def render_yaml_file(filename): with open(os.path.join(HERE, "..", filename)) as stream: content = yaml.safe_load(stream) return web.json_response(content)
def check_yaml_resource(cli, url, filename): with open(os.path.join(HERE, "..", "pollbot", filename)) as stream: content = yaml.safe_load(stream) resp = await cli.get(url) assert await resp.json() == content
def test_oas_spec(): with open(os.path.join(HERE, "..", "pollbot", "api.yaml"), 'r') as stream: oas_spec = yaml.safe_load(stream) # example for swagger spec v2.0 validate_spec(oas_spec)
def read_config(path = 'aetros.yml', logger=None): path = os.path.normpath(os.path.expanduser(path)) config = { 'dockerfile': None, 'command': None, 'install': None, 'ignore': None, 'image': None, 'server': None, 'parameters': {}, 'servers': None, 'before_command': [], } if os.path.exists(path): f = open(path, 'r') custom_config = yaml.safe_load(f) if custom_config is None: custom_config = {} if 'storage_dir' in custom_config: del custom_config['storage_dir'] config.update(custom_config) logger and logger.debug('Config loaded from ' + os.path.realpath(path)) if 'parameters' not in config: config['parameters'] = {} return config
def main(self, args): import aetros.const parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter, prog=aetros.const.__prog__ + ' run') parser.add_argument('name', nargs='?', help="Model name") parser.add_argument('--private', action='store_true', help="Make the model private. Example: aetros init my-model --private") home_config = read_home_config() parsed_args = parser.parse_args(args) if not parsed_args.name: parser.print_help() sys.exit(1) if os.path.exists('aetros.yml'): config = yaml.safe_load(open('aetros.yml', 'r')) if isinstance(config, dict) and 'model' in config: print("failed: aetros.yml already exists with a linked model to " + config['model']) sys.exit(1) name = api.create_model(parsed_args.name or (os.path.basename(os.getcwd())), parsed_args.private) with open('aetros.yml', 'w') as f: f.write('model: ' + name) print("aetros.yml created linked with model " + name + ' in ' + os.getcwd()) print("Open AETROS Trainer to see the model at https://" + home_config['host'] + '/model/' + name)
def load_config(self, key: str) -> dict: try: data = pkg_resources.resource_string(self.load_from, 'whither.yml').decode('utf-8') data = self._filter_data(key, data) config = yaml.safe_load(data) config = config[key] except Exception: data = open(self.load_from, 'r').read() data = self._filter_data(key, data) config = yaml.safe_load(data) config = config[key] return {key: value for key, value in config.items()}
def load(path: str) -> Dict: with open(path, 'rt') as file: config = yaml_load(file.read()) validate(config) return config
def setup_logging() -> None: with open('log_config.yaml', 'r') as file: config_dict = yaml_load(file.read()) logging.config.dictConfig(config_dict)
def __init__(self, *args, **kwargs): self._defs = {} super(Definition, self).__init__(*args, **kwargs) self.data = pkg_resources.resource_string( __name__, "monitoring_integration.yaml" ) self._parsed_defs = yaml.safe_load(self.data) self.value = "_NS/monitoring/definitions"
def __init__(self, *, loop=None, **options): super().__init__(loop=loop, **options) self.banned_ids = [] self.config = yaml.safe_load(open("config.yml", "r")) self.data_manager = DataManager()
def load_yaml(self, template_file): with open(template_file) as f: try: return yaml.safe_load(f) except yaml.YAMLError as e: print(e) return []
def set_to_default(self): """ Overwrite the configuration with the default configuration. """ self._config = yaml.safe_load(self.default())
def task_config(): ''' write config.yml -> .config.yml ''' log_level = 'WARNING' filename = '{0}/LOG_LEVEL'.format(os.path.dirname(__file__)) if os.path.isfile(filename): log_level = open(filename).read().strip() log_level = get_var('LOG_LEVEL', log_level) if log_level not in LOG_LEVELS: raise UnknownLogLevelError(log_level) punch = fmt(''' logging: loggers: api: level: {log_level} handlers: console: level: {log_level} ''') return { 'actions': [ fmt('echo "cp {CONFIG_YML}\n-> {DOT_CONFIG_YML}"'), fmt('echo "setting LOG_LEVEL={log_level}"'), fmt('cp {CONFIG_YML} {DOT_CONFIG_YML}'), lambda: _update_config(DOT_CONFIG_YML, yaml.safe_load(punch)), ] }
def _load_config(cfgs): config = {} for cfg in cfgs: cfg = os.path.expanduser(cfg) if os.path.isfile(cfg): with open(cfg, 'r') as f: yml = yaml.safe_load(f) if yml: config.update(yml) return AttrDict(config)
def _load_config(filename=DOT_CONFIG_YML, roundtrip=False, fixup=True): cfg = {} if os.path.isfile(filename): try: with open(filename, 'r') as f: if roundtrip: cfg = yaml.round_trip_load(f.read()) else: cfg = yaml.safe_load(f.read()) if fixup: cfg = _fixup(cfg) except Exception as ex: print('ex =', ex) raise ConfigLoadError(filename, errors=[ex]) return AttrDict(cfg)