我们从 Python 开源项目中提取了以下 49 个代码示例,用于说明如何使用 yaml.safe_load()。
def get_config(cls):
    """Collect the Baidu Yuyin credentials from the user's profile.yml.

    Returns:
        dict: holds ``api_key`` and/or ``secret_key`` when the profile
        defines them under ``baidu_yuyin``; empty when the profile file is
        missing, empty, or has no usable ``baidu_yuyin`` section.
    """
    # FIXME: Replace this as soon as we have a config module
    config = {}
    # Try to get baidu_yuyin config from config
    profile_path = dingdangpath.config('profile.yml')
    if not os.path.exists(profile_path):
        return config

    with open(profile_path, 'r') as f:
        profile = yaml.safe_load(f)

    # safe_load() returns None for an empty document and may return a
    # non-mapping; neither can carry the section we are looking for.
    section = profile.get('baidu_yuyin') if isinstance(profile, dict) else None
    if isinstance(section, dict):
        for key in ('api_key', 'secret_key'):
            if key in section:
                config[key] = section[key]
    return config
def load_config():
    '''
    Walk backwards from __file__ looking for config.yaml, load and return the
    'options' section.

    Raises Exception when no config.yaml exists in any parent directory.
    '''
    config = None
    f = __file__
    while config is None:
        d = os.path.dirname(f)
        candidate = os.path.join(d, 'config.yaml')
        if os.path.isfile(candidate):
            config = candidate
            break
        if d == f:
            # dirname() has reached its fixed point ('/' or ''), so there is
            # no further parent to inspect; stop instead of looping forever.
            break
        f = d

    if not config:
        message = ('Could not find config.yaml in any parent directory '
                   'of %s. ' % __file__)
        logging.error(message)
        raise Exception(message)

    # Close the handle deterministically instead of relying on the GC.
    with open(config) as stream:
        return yaml.safe_load(stream)['options']
def create(sysctl_dict, sysctl_file):
    """Write a sysctl file from a YAML mapping and run ``sysctl -p`` on it.

    :param sysctl_dict: a YAML-formatted string of sysctl options
                        eg "{ 'kernel.max_pid': 1337 }"
    :type sysctl_dict: str
    :param sysctl_file: path to the sysctl file to be saved
    :type sysctl_file: str or unicode
    :returns: None
    """
    try:
        parsed = yaml.safe_load(sysctl_dict)
    except yaml.YAMLError:
        # Unparseable input: report it and do not touch the target file.
        log("Error parsing YAML sysctl_dict: {}".format(sysctl_dict),
            level=ERROR)
        return

    with open(sysctl_file, "w") as out:
        out.writelines(
            "{}={}\n".format(name, setting)
            for name, setting in parsed.items()
        )

    log("Updating sysctl_file: %s values: %s" % (sysctl_file, parsed),
        level=DEBUG)

    # Run sysctl against the file that was just written.
    check_call(["sysctl", "-p", sysctl_file])
def load_config_settings(logger, path_to_config_file):
    """
    Load config settings from config file

    :param logger:                  the logger
    :param path_to_config_file:     location of config file
    :return:                        settings dictionary containing values for:
                                    api_token, export_path, timezone,
                                    export_profiles, filename_item_id,
                                    sync_delay_in_seconds, the
                                    export-inactive-items flag and
                                    media_sync_offset_in_seconds
    """
    # Use a context manager so the file handle is closed as soon as the YAML
    # has been parsed, even if parsing raises.
    with open(path_to_config_file) as config_file:
        config_settings = yaml.safe_load(config_file)

    settings = {
        API_TOKEN: load_setting_api_access_token(logger, config_settings),
        EXPORT_PATH: load_setting_export_path(logger, config_settings),
        TIMEZONE: load_setting_export_timezone(logger, config_settings),
        EXPORT_PROFILES: load_setting_export_profile_mapping(logger, config_settings),
        FILENAME_ITEM_ID: get_filename_item_id(logger, config_settings),
        SYNC_DELAY_IN_SECONDS: load_setting_sync_delay(logger, config_settings),
        EXPORT_INACTIVE_ITEMS_TO_CSV: load_export_inactive_items_to_csv(logger, config_settings),
        MEDIA_SYNC_OFFSET_IN_SECONDS: load_setting_media_sync_offset(logger, config_settings)
    }

    return settings
def load_config_settings(logger, path_to_config_file):
    """
    Load config settings from config file

    :param logger:                  the logger
    :param path_to_config_file:     location of config file
    :return:                        settings dictionary containing values for:
                                    api_token, input_filename
    """
    # Use a context manager so the file handle is closed as soon as the YAML
    # has been parsed, even if parsing raises.
    with open(path_to_config_file) as config_file:
        config_settings = yaml.safe_load(config_file)

    settings = {
        'api_token': load_setting_api_access_token(logger, config_settings),
        'input_filename': load_setting_input_filename(logger, config_settings),
    }

    return settings
def __init__(self):
    '''
    Initialize: parse $HOME/.aws/orcaenv.yaml into self.parsedyaml.

    Every failure (HOME unset, file unreadable, invalid YAML) is reported on
    stdout and returns early, leaving self.parsedyaml unset.
    '''
    homedir = os.environ.get("HOME", None)
    if homedir is None:
        print("ERROR: Home ENV is not set")
        return

    self.__orca_config = os.path.join(homedir, ".aws/orcaenv.yaml")
    try:
        fp = open(self.__orca_config)
    except IOError as ioerr:
        print("ERROR: Failed to open [%s] [%s]" %
              (self.__orca_config, ioerr))
        # Nothing was opened, so there is nothing to parse; falling through
        # would reference an unbound file object.
        return

    # The with-block guarantees the handle is closed on every path.
    with fp:
        try:
            self.parsedyaml = yaml.safe_load(fp)
        except yaml.error.YAMLError as yamlerr:
            print("ERROR: Yaml parsing failed [file: %s] [%s]" %
                  (self.__orca_config, yamlerr))
            return
def _get_top_data(topfile):
    '''
    Load the nova topfile and return the data of every entry whose match
    expression applies, concatenated into one list.

    Raises CommandExecutionError when the file cannot be loaded or does not
    hold a dict under the top-level 'nova' key.
    '''
    topfile_path = os.path.join(_hubble_dir()[1], topfile)

    try:
        with open(topfile_path) as stream:
            parsed = yaml.safe_load(stream)
    except Exception as e:
        raise CommandExecutionError('Could not load topfile: {0}'.format(e))

    nova = parsed.get('nova') if isinstance(parsed, dict) else None
    if not isinstance(nova, dict):
        raise CommandExecutionError('Nova topfile not formatted correctly')

    matched = []
    for target, entries in nova.iteritems():
        # Keep only the entries whose compound match expression applies.
        if not __salt__['match.compound'](target):
            continue
        matched.extend(entries)
    return matched
def _metadata_unit(unit):
    """Return the parsed metadata.yaml of another unit's charm.

    The path is derived from this charm's directory: its last two path
    components are dropped and ``unit-<name>/charm/metadata.yaml`` is
    appended, with path separators in the unit name (e.g. apache2/0)
    replaced by dashes.

    :returns: metadata.yaml as a python object, or None when that file does
              not exist.
    """
    own_charm_parts = charm_dir().split(os.sep)
    parent_root = os.sep.join(own_charm_parts[:-2])
    unit_dirname = 'unit-{}'.format(unit.replace(os.sep, '-'))
    metadata_path = os.path.join(
        parent_root, unit_dirname, 'charm', 'metadata.yaml')

    if os.path.exists(metadata_path):
        with open(metadata_path) as stream:
            return yaml.safe_load(stream)
    return None
def main():
    """Read config.yaml from the application support directory and run the
    status bar application with it.

    A YAML syntax error is printed and ends the function without starting
    the application.
    """
    # Locate and parse the configuration file
    support_dir = rumps.application_support(APP_TITLE)
    config_file = os.path.join(support_dir, 'config.yaml')
    try:
        with open(config_file, 'r') as conf:
            config = yaml.safe_load(conf)
    except yaml.YAMLError as exc:
        print(exc)
        return

    # Install the CTRL+C signal handler
    signal.signal(signal.SIGINT, _signal_handler)

    # Debug mode follows the 'debug' entry of the configuration
    rumps.debug_mode(config['debug'])

    # Start the application
    app = SlackStatusBarApp(config)
    app.run()
def from_yaml(self, config_file):
    """Populate this config object from a YAML file.

    The environment name is read from ``FLASK_ENV`` (default
    ``development``) and stored lower-cased under ``ENVIRONMENT``. If the
    file has a top-level key equal to the upper-cased environment name, that
    section is used; otherwise the whole document is used. Only upper-case
    keys are copied.

    Args:
        config_file (str): the path of the config file to load.
            Can be relative or absolute.
    """
    environment = os.environ.get('FLASK_ENV', 'development')
    self['ENVIRONMENT'] = environment.lower()

    with open(config_file) as stream:
        document = yaml.safe_load(stream)

    # Fall back to the whole document when there is no per-environment section.
    section = document.get(environment.upper(), document)

    for name, value in section.items():
        if not name.isupper():
            continue
        self[name] = value
def convert(self, common):
    """
    Migrate every hardware profile file listed in ``common``.

    ``common`` maps a pathname to a collection of filenames. Each filename
    containing 'profile-' is loaded, passed through _migrate() and written
    out under the same name with 'profile-' replaced by 'migrated-profile-'.
    """
    for pathname in common:
        for filename in common[pathname]:
            if 'profile-' not in filename:
                continue

            with open(filename, "r") as source:
                content = yaml.safe_load(source)
            migrated = _migrate(content, filename)

            newfilename = re.sub('profile-', 'migrated-profile-', filename)
            # Make sure the destination directory exists before writing.
            _create_dirs(os.path.dirname(newfilename), self.pillar_dir)

            with open(newfilename, "w") as target:
                target.write(yaml.dump(migrated,
                                       Dumper=self.friendly_dumper,
                                       default_flow_style=False))
def roles():
    """
    Strip the 'roles' key from every file in /srv/pillar/ceph/cluster and
    rewrite each file in place.
    """
    # Keep yaml human readable/editable.
    # NOTE: friendly_dumper is yaml.SafeDumper itself (not a copy), so the
    # assignment below sets ignore_aliases on that shared class.
    friendly_dumper = yaml.SafeDumper
    friendly_dumper.ignore_aliases = lambda self, data: True

    cluster_dir = '/srv/pillar/ceph/cluster'
    for entry in os.listdir(cluster_dir):
        sls_path = "{}/{}".format(cluster_dir, entry)

        with open(sls_path, "r") as source:
            content = yaml.safe_load(source)
        log.info("content {}".format(content))

        if 'roles' in content:
            content.pop('roles')

        with open(sls_path, "w") as target:
            target.write(yaml.dump(content,
                                   Dumper=friendly_dumper,
                                   default_flow_style=False))
def default():
    """
    Recreate the .../stack/default directory, keeping only available_roles.

    The existing ceph/cluster.yml is read, the whole stack/default tree is
    removed, and a new ceph/cluster.yml holding just ``available_roles`` is
    written. The recreated paths are then chowned to the salt user/group.
    """
    # Keep yaml human readable/editable.
    # NOTE: friendly_dumper is yaml.SafeDumper itself (not a copy), so the
    # assignment below sets ignore_aliases on that shared class.
    friendly_dumper = yaml.SafeDumper
    friendly_dumper.ignore_aliases = lambda self, data: True

    stack_default = "/srv/pillar/ceph/stack/default"
    ceph_dir = "{}/{}".format(stack_default, 'ceph')
    pathname = "/srv/pillar/ceph/stack/default/{}/cluster.yml".format('ceph')

    with open(pathname, "r") as source:
        content = yaml.safe_load(source)
    preserve = {'available_roles': content['available_roles']}

    # Wipe the tree, then rebuild only the directory the new file needs.
    shutil.rmtree(stack_default)
    os.makedirs(ceph_dir)
    with open(pathname, "w") as target:
        target.write(yaml.dump(preserve,
                               Dumper=friendly_dumper,
                               default_flow_style=False))

    uid = pwd.getpwnam("salt").pw_uid
    gid = grp.getgrnam("salt").gr_gid
    for path in (stack_default, ceph_dir, pathname):
        os.chown(path, uid, gid)
def return_config_overrides_yaml(self, config_overrides, resultant,
                                 list_extend=True, ignore_none_type=True):
    """Merge overrides into a YAML document and return the result as YAML.

    :param config_overrides: ``dict`` merged on top of the parsed document
    :param resultant: ``str`` || ``unicode`` YAML text used as the base
    :param list_extend: ``bool`` forwarded to ``_merge_dict``
    :param ignore_none_type: ``bool`` accepted but not used by this method
    :returns: ``str``
    """
    base_items = yaml.safe_load(resultant)
    merged = self._merge_dict(
        base_items=base_items,
        new_items=config_overrides,
        list_extend=list_extend
    )
    dump_options = dict(
        Dumper=IDumper,
        default_flow_style=False,
        width=1000,
    )
    return yaml.dump(merged, **dump_options)
def get_config():
    """
    Load the ``doctr`` section of ``.travis.yml`` in the current directory.

    Returns an empty dict when the file is absent, empty, or has no
    ``doctr`` key.

    Raises ValueError when the ``doctr`` entry exists but is not a dict.
    """
    p = Path('.travis.yml')
    if not p.exists():
        return {}

    with p.open() as f:
        # safe_load() returns None for an empty file; treat that the same as
        # a file without a doctr section instead of failing on None.get().
        travis_config = yaml.safe_load(f) or {}

    config = travis_config.get('doctr', {})

    if not isinstance(config, dict):
        raise ValueError('config is not a dict: {}'.format(config))
    return config
def load_config():
    """Loads config.

    Walk backwards from __file__ looking for config.yaml, load and return the
    'options' section.

    Raises Exception when no config.yaml exists in any parent directory.
    """
    config = None
    f = __file__
    while config is None:
        d = os.path.dirname(f)
        candidate = os.path.join(d, 'config.yaml')
        if os.path.isfile(candidate):
            config = candidate
            break
        if d == f:
            # dirname() has reached its fixed point ('/' or ''), so there is
            # no further parent to inspect; stop instead of looping forever.
            break
        f = d

    if not config:
        message = ('Could not find config.yaml in any parent directory '
                   'of %s. ' % __file__)
        logging.error(message)
        raise Exception(message)

    # Close the handle deterministically instead of relying on the GC.
    with open(config) as stream:
        return yaml.safe_load(stream)['options']
def get_config(parameter, file_path):
    """
    Get config parameter.

    Returns the value of a given parameter in file.yaml
    parameter must be given in string format with dots
    Example: general.openstack.image_name

    Raises ValueError when any element of the dotted path is missing or
    cannot be looked up because its parent is not a mapping.
    """
    # The with-block closes the file; no explicit close() is needed.
    with open(file_path) as config_file:
        file_yaml = yaml.safe_load(config_file)

    value = file_yaml
    for element in parameter.split("."):
        # An empty document, scalar or list has no .get(); report the
        # parameter as undefined rather than failing with AttributeError.
        value = value.get(element) if isinstance(value, dict) else None
        if value is None:
            raise ValueError("The parameter %s is not defined in"
                             " reporting.yaml" % parameter)
    return value
def get_config(parameter, file_path):
    """
    Get config parameter.

    Returns the value of a given parameter in file.yaml
    parameter must be given in string format with dots
    Example: general.openstack.image_name

    Raises ValueError when any element of the dotted path is missing or
    cannot be looked up because its parent is not a mapping.
    """
    # The with-block closes the file; no explicit close() is needed.
    with open(file_path) as config_file:
        file_yaml = yaml.safe_load(config_file)

    value = file_yaml
    for element in parameter.split("."):
        # An empty document, scalar or list has no .get(); report the
        # parameter as undefined rather than failing with AttributeError.
        value = value.get(element) if isinstance(value, dict) else None
        if value is None:
            # Interpolate the name: passing it as a second argument to
            # ValueError would leave the '%s' placeholder unformatted.
            raise ValueError("The parameter %s is not defined in"
                             " reporting.yaml" % parameter)
    return value
def __init__(self, util_info):
    """Set up the test executor.

    Creates the VNF controller, loads the test command map YAML and reads
    ``general.protocol_stable_wait`` from the test environment config.

    :param util_info: mapping that holds a "credentials" entry (itself
                      containing "snaps_creds"); also passed to VnfController
    """
    self.logger.debug("init test exec")
    self.util = Utilvnf()
    credentials = util_info["credentials"]
    self.vnf_ctrl = VnfController(util_info)

    test_cmd_map_path = (self.util.vnf_data_dir +
                         self.util.opnfv_vnf_data_dir +
                         self.util.command_template_dir +
                         self.util.test_cmd_map_yaml_file)
    # Context manager: the handle is released even if parsing raises.
    with open(test_cmd_map_path, 'r') as test_cmd_map_file:
        self.test_cmd_map_yaml = yaml.safe_load(test_cmd_map_file)

    self.util.set_credentials(credentials["snaps_creds"])

    with open(self.util.test_env_config_yaml) as file_fd:
        test_env_config_yaml = yaml.safe_load(file_fd)

    self.protocol_stable_wait = test_env_config_yaml.get("general").get(
        "protocol_stable_wait")
def config_vnf(self, source_vnf, destination_vnf, test_cmd_file_path,
               parameter_file_path, prompt_file_path):
    """Configure the source VNF using parameters loaded from a YAML file.

    The loaded parameters are extended with the source VNF's data plane MAC
    and IP and the destination VNF's data plane IP, then handed to the VM
    controller.

    :param source_vnf: mapping with "data_plane_network_mac" and
                       "data_plane_network_ip"
    :param destination_vnf: mapping with "data_plane_network_ip"
    :param test_cmd_file_path: forwarded to ``vm_controller.config_vm``
    :param parameter_file_path: YAML file with the base command parameters
    :param prompt_file_path: forwarded to ``vm_controller.config_vm``
    :returns: whatever ``vm_controller.config_vm`` returns
    """
    # Context manager: the handle is released even if parsing raises.
    with open(parameter_file_path, 'r') as parameter_file:
        cmd_input_param = yaml.safe_load(parameter_file)

    cmd_input_param["macaddress"] = source_vnf["data_plane_network_mac"]
    cmd_input_param["source_ip"] = source_vnf["data_plane_network_ip"]
    cmd_input_param["destination_ip"] = destination_vnf[
        "data_plane_network_ip"]

    return self.vm_controller.config_vm(source_vnf,
                                        test_cmd_file_path,
                                        cmd_input_param,
                                        prompt_file_path)
def __init__(self, ip_address, user, password=None, key_filename=None):
    """Store the connection parameters and prepare a paramiko SSH client.

    The receive buffer size is read from ``general.ssh_receive_buffer`` in
    the test environment config. No connection is opened here.

    :param ip_address: address of the remote host
    :param user: login user name
    :param password: optional login password
    :param key_filename: optional private key file
    """
    self.ip_address = ip_address
    self.user = user
    self.password = password
    self.key_filename = key_filename
    self.connected = False
    self.shell = None
    self.logger.setLevel(logging.INFO)

    self.ssh = paramiko.SSHClient()
    self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())

    self.util = Utilvnf()
    # The with-block closes the file; no explicit close() is needed.
    with open(self.util.test_env_config_yaml) as file_fd:
        test_env_config_yaml = yaml.safe_load(file_fd)

    self.ssh_revieve_buff = test_env_config_yaml.get("general").get(
        "ssh_receive_buffer")
def __init__(self, util_info):
    """Set up the VM controller.

    Reads the reboot/command wait times and the SSH connect timeout and
    retry count from the ``general`` section of the test environment config.

    :param util_info: mapping that holds a "credentials" entry (itself
                      containing "snaps_creds")
    """
    self.logger.debug("initialize vm controller")
    self.command_gen = CommandGenerator()
    credentials = util_info["credentials"]

    self.util = Utilvnf()
    self.util.set_credentials(credentials["snaps_creds"])

    # The with-block closes the file; no explicit close() is needed.
    with open(self.util.test_env_config_yaml) as file_fd:
        test_env_config_yaml = yaml.safe_load(file_fd)

    # Look the section up once instead of once per setting.
    general = test_env_config_yaml.get("general")
    self.reboot_wait = general.get("reboot_wait")
    self.command_wait = general.get("command_wait")
    self.ssh_connect_timeout = general.get("ssh_connect_timeout")
    self.ssh_connect_retry_count = general.get("ssh_connect_retry_count")
def get_config(parameter, file_path):
    """
    Returns the value of a given parameter in file.yaml
    parameter must be given in string format with dots
    Example: general.openstack.image_name

    Raises ValueError when any element of the dotted path is missing or
    cannot be looked up because its parent is not a mapping.
    """
    # The with-block closes the file; no explicit close() is needed.
    with open(file_path) as config_file:
        file_yaml = yaml.safe_load(config_file)

    value = file_yaml
    for element in parameter.split("."):
        # An empty document, scalar or list has no .get(); report the
        # parameter as undefined rather than failing with AttributeError.
        value = value.get(element) if isinstance(value, dict) else None
        if value is None:
            raise ValueError("The parameter %s is not defined in"
                             " reporting.yaml" % parameter)
    return value
def excl_scenario():
    """Return the tests blacklisted for the current scenario and installer.

    Any failure while reading or interpreting the blacklist is logged at
    debug level; tests collected before the failure are still returned.
    """
    excluded = []

    try:
        with open(RallyBase.BLACKLIST_FILE, 'r') as blacklist_fd:
            blacklist = yaml.safe_load(blacklist_fd)

        installer_type = CONST.__getattribute__('INSTALLER_TYPE')
        deploy_scenario = CONST.__getattribute__('DEPLOY_SCENARIO')

        entries = []
        if (bool(installer_type) and bool(deploy_scenario) and
                'scenario' in blacklist.keys()):
            entries = blacklist['scenario']

        for entry in entries:
            # Both keys are read up front so a malformed entry aborts here.
            scenarios = entry['scenarios']
            installers = entry['installers']
            matches = RallyBase.in_iterable_re
            if not matches(deploy_scenario, scenarios):
                continue
            if not matches(installer_type, installers):
                continue
            excluded.extend(entry['tests'])
    except Exception:
        LOGGER.debug("Scenario exclusion not applied.")

    return excluded
def excl_func(self):
    """Return the tests blacklisted for unsupported functionalities.

    "no_migration" is the only functionality checked; it applies when
    ``_migration_supported()`` is false. Any failure is logged at debug
    level; tests collected before the failure are still returned.
    """
    excluded = []

    try:
        with open(RallyBase.BLACKLIST_FILE, 'r') as blacklist_fd:
            blacklist = yaml.safe_load(blacklist_fd)

        missing = [] if self._migration_supported() else ["no_migration"]

        entries = []
        if 'functionality' in blacklist.keys():
            entries = blacklist['functionality']

        for entry in entries:
            functions = entry['functions']
            for name in missing:
                if name not in functions:
                    continue
                excluded.extend(entry['tests'])
    except Exception:  # pylint: disable=broad-except
        LOGGER.debug("Functionality exclusion not applied.")

    return excluded
def get_parameter_from_yaml(parameter, file):
    """
    Returns the value of a given parameter in file.yaml
    parameter must be given in string format with dots
    Example: general.openstack.image_name

    Raises ValueError when any element of the dotted path is missing or
    cannot be looked up because its parent is not a mapping.
    """
    # The with-block closes the file; no explicit close() is needed.
    with open(file) as f:
        file_yaml = yaml.safe_load(f)

    value = file_yaml
    for element in parameter.split("."):
        # An empty document, scalar or list has no .get(); report the
        # parameter as undefined rather than failing with AttributeError.
        value = value.get(element) if isinstance(value, dict) else None
        if value is None:
            raise ValueError("The parameter %s is not defined in"
                             " %s" % (parameter, file))
    return value
def _get_user_provided_overrides(modules):
    """Load user-provided config overrides.

    Reads ``hardening.yaml`` from the directory named by the
    ``JUJU_CHARM_DIR`` environment variable.

    :param modules: stack modules to lookup in user overrides yaml file.
    :returns: overrides dictionary; empty when the file is missing or has no
              truthy entry for ``modules``.
    """
    overrides = os.path.join(os.environ['JUJU_CHARM_DIR'],
                             'hardening.yaml')
    if os.path.exists(overrides):
        log("Found user-provided config overrides file '%s'" %
            (overrides), level=DEBUG)
        # Context manager: the handle is closed once parsing is done.
        with open(overrides) as stream:
            settings = yaml.safe_load(stream)
        if settings and settings.get(modules):
            log("Applying '%s' overrides" % (modules), level=DEBUG)
            return settings.get(modules)

        log("No overrides found for '%s'" % (modules), level=DEBUG)
    else:
        log("No hardening config overrides file '%s' found in charm "
            "root dir" % (overrides), level=DEBUG)

    return {}
def load_config():
    '''Walk backwards from __file__ looking for config.yaml,
    load and return the 'options' section.

    Raises Exception when no config.yaml exists in any parent directory.
    '''
    config = None
    f = __file__
    while config is None:
        d = os.path.dirname(f)
        candidate = os.path.join(d, 'config.yaml')
        if os.path.isfile(candidate):
            config = candidate
            break
        if d == f:
            # dirname() has reached its fixed point ('/' or ''), so there is
            # no further parent to inspect; stop instead of looping forever.
            break
        f = d

    if not config:
        message = ('Could not find config.yaml in any parent directory '
                   'of %s. ' % __file__)
        logging.error(message)
        raise Exception(message)

    with open(config) as stream:
        return yaml.safe_load(stream)['options']
def load_config():
    '''
    Walk backwards from __file__ looking for config.yaml, load and return the
    'options' section.

    Raises Exception when no config.yaml exists in any parent directory.
    '''
    config = None
    f = __file__
    while config is None:
        d = os.path.dirname(f)
        candidate = os.path.join(d, 'config.yaml')
        if os.path.isfile(candidate):
            config = candidate
            break
        if d == f:
            # dirname() has reached its fixed point ('/' or ''), so there is
            # no further parent to inspect; stop instead of looping forever.
            break
        f = d

    if not config:
        # Report the starting point of the search, not where the walk ended.
        message = ('Could not find config.yaml in any parent directory '
                   'of %s. ' % __file__)
        logging.error(message)
        raise Exception(message)

    # Close the handle deterministically instead of relying on the GC.
    with open(config) as stream:
        return yaml.safe_load(stream)['options']