我们从 Python 开源项目中提取了以下 48 个代码示例,用于说明如何使用 yaml.dump()。
def hugepage_support(user, group='hugetlb', nr_hugepages=256,
                     max_map_count=65536, mnt_point='/run/hugepages/kvm',
                     pagesize='2MB', mount=True, set_shmmax=False):
    """Enable hugepages on system.

    Args:
    user (str) -- Username to allow access to hugepages to
    group (str) -- Group name to own hugepages
    nr_hugepages (int) -- Number of pages to reserve
    max_map_count (int) -- Number of Virtual Memory Areas a process can own
    mnt_point (str) -- Directory to mount hugepages on
    pagesize (str) -- Size of hugepages
    mount (bool) -- Whether to Mount hugepages
    set_shmmax (bool) -- Whether to raise kernel.shmmax when too small
    """
    group_info = add_group(group)
    gid = group_info.gr_gid
    add_user_to_group(user, group)
    # Each hugepage consumes at least one VMA; keep 2x headroom.
    if max_map_count < 2 * nr_hugepages:
        max_map_count = 2 * nr_hugepages
    sysctl_settings = {
        'vm.nr_hugepages': nr_hugepages,
        'vm.max_map_count': max_map_count,
        'vm.hugetlb_shm_group': gid,
    }
    if set_shmmax:
        # Only ever raise shmmax; never lower a sufficient current value.
        shmmax_current = int(check_output(['sysctl', '-n', 'kernel.shmmax']))
        shmmax_minsize = bytes_from_string(pagesize) * nr_hugepages
        if shmmax_minsize > shmmax_current:
            sysctl_settings['kernel.shmmax'] = shmmax_minsize
    sysctl.create(yaml.dump(sysctl_settings), '/etc/sysctl.d/10-hugepage.conf')
    mkdir(mnt_point, owner='root', group='root', perms=0o755, force=False)
    # Replace any existing fstab entry for this mountpoint before adding ours.
    lfstab = fstab.Fstab()
    fstab_entry = lfstab.get_entry_by_attr('mountpoint', mnt_point)
    if fstab_entry:
        lfstab.remove_entry(fstab_entry)
    entry = lfstab.Entry('nodev', mnt_point, 'hugetlbfs',
                         'mode=1770,gid={},pagesize={}'.format(gid, pagesize),
                         0, 0)
    lfstab.add_entry(entry)
    if mount:
        fstab_mount(mnt_point)
def retrieve(cont, filename):
    """Return the value stored under key *cont* in YAML file *filename*,
    encoded as UTF-8 bytes.
    """
    # open() replaces the Python-2-only file() builtin; the context
    # manager guarantees the handle is closed (the original leaked it).
    with open(filename, 'r') as stream:
        # safe_load: never construct arbitrary Python objects from the file.
        data = yaml.safe_load(stream)
    return data[cont].encode('utf-8')
def internal_data(filename, io, entry, cont, cont_in=None, cont_in2=None):
    """Read or write a value in a YAML file, up to three keys deep.

    Args:
        filename: path to the YAML file.
        io: 'out' to read a value, 'in' to write one.
        entry: value to store when io == 'in' (ignored for 'out').
        cont, cont_in, cont_in2: nested keys, outermost first.

    Returns:
        The requested value for io == 'out'; otherwise None.
    """
    # Context manager closes the read handle (original leaked it);
    # safe_load avoids arbitrary object construction.
    with open(filename, 'r') as stream:
        prof = yaml.safe_load(stream)
    if io == 'out':
        if cont_in is None:
            return prof[cont]
        if cont_in2 is None:
            return prof[cont][cont_in]
        return prof[cont][cont_in][cont_in2]
    if io == 'in':
        if cont_in is None:
            prof[cont] = entry
        elif cont_in2 is None:
            prof[cont][cont_in] = entry
        else:
            prof[cont][cont_in][cont_in2] = entry
        with open(filename, 'w') as yaml_file:
            yaml_file.write(yaml.dump(prof, default_flow_style=False))
def begin(self):
    """Build the checkpoint restore map for variables under the configured
    prefix and create the Saver that will restore them."""
    variables = tf.contrib.framework.get_variables(scope=self.params["prefix"])

    def varname_in_checkpoint(name):
        """Removes the prefix from the variable name.
        """
        # Strip everything up to (but not including) the last prefix
        # component, so the final component is kept in checkpoint names.
        prefix_parts = self.params["prefix"].split("/")
        checkpoint_prefix = "/".join(prefix_parts[:-1])
        return name.replace(checkpoint_prefix + "/", "")

    target_names = [varname_in_checkpoint(_.op.name) for _ in variables]
    restore_map = {k: v for k, v in zip(target_names, variables)}
    tf.logging.info("Restoring variables: \n%s",
                    yaml.dump({k: v.op.name for k, v in restore_map.items()}))
    self._saver = tf.train.Saver(restore_map)
def test_invalid_form_data_unknown_url_name(self):
    """A RedirectProcessor config pointing at a non-existent URL name must
    fail validation with a non-field ('__all__') error."""
    data = {
        'url': '/test/',
        'alias': 'test-page',
        'description': 'At vero eos et accusamus et iusto odio',
        'keywords': 'lorem ipsum dolor sit amet',
        'page_processor': 'powerpages.RedirectProcessor',
        'page_processor_config': yaml.dump({
            'to name': 'not-existing-url'
        }),
        'template': '<h1>{{ website_page.title }}</h1>\n',
        'title': 'De Finibus Bonorum et Malorum'
    }
    form = PageAdminForm(data, instance=Page())
    self.assertFalse(form.is_valid())
    self.assertEqual(list(form.errors.keys()), ['__all__'])
def test_invalid_form_data_unknown_parent_template(self):
    """A DefaultPageProcessor config naming a missing base template must
    fail validation with a non-field ('__all__') error."""
    # No parent page
    data = {
        'url': '/test/',
        'alias': 'test-page',
        'description': 'At vero eos et accusamus et iusto odio',
        'keywords': 'lorem ipsum dolor sit amet',
        'page_processor': 'powerpages.DefaultPageProcessor',
        'page_processor_config': yaml.dump({
            'base template': "this-template-does-not-exist.html"
        }),
        'template': '<h1>{{ website_page.title }}</h1>\n',
        'title': 'De Finibus Bonorum et Malorum'
    }
    form = PageAdminForm(data, instance=Page())
    self.assertFalse(form.is_valid())
    self.assertEqual(list(form.errors.keys()), ['__all__'])
def edit_mkdocs_config(self):
    """
    Create mkdocs.yml file from metadata.

    NOTE(review): the original docstring claimed a boolean success value,
    but the method returns None; errors from open()/yaml.dump simply
    propagate to the caller.
    """
    mkdocs_yml = os.path.join(MKDOCS_DIR, "mkdocs.yml")
    cfg = dict(
        site_name=self._wiki_name,
        theme='readthedocs',
        docs_dir=self._wiki_name,
        site_dir=self._out_dir
    )
    with open(mkdocs_yml, 'w') as outfile:
        yaml.dump(cfg, outfile, default_flow_style=False)
def set_device_yaml():
    """Collect the Android version of every attached device and write the
    resulting capability dicts (platformVersion/deviceName/platformName)
    to the YAML file named in the test_device/device ini setting."""
    device_lst = []
    for device in get_device():
        adb = lib.adbUtils.ADB(device)
        U.Logging.success(
            'get device:{},Android version:{}'.format(
                device, adb.get_android_version()))
        device_lst.append({'platformVersion': adb.get_android_version(),
                           'deviceName': device,
                           'platformName': 'Android'})
    ini = U.ConfigIni()
    # The with-block closes the file; the original's redundant
    # f.close() inside the context manager is removed.
    with open(ini.get_ini('test_device', 'device'), 'w') as f:
        yaml.dump(device_lst, f)
def __load_analysis(self):
    """Run the post-test analysis pipeline.

    Steps: save the Android log, analyse the run's YAML results, persist
    the error summary as YAML, then collect the screenshot files.

    :return: result of __save_screen_file (screenshot collection)
    """
    U.Logging.success('read the yaml file')
    self.__save_android_log()
    error_msg = self.__analysis_yaml(self.path_yaml)
    # The with-block closes the handle; the original's stray f.close()
    # after the block is removed.
    with open(self.__save_error_status(), 'w') as f:
        yaml.dump({'error_msg': error_msg}, f)
    U.Logging.debug(str('results of the:%s' % error_msg))
    return self.__save_screen_file()
def convert(self, common):
    """ Process all hardware profiles """
    # common maps a directory path to the filenames found beneath it.
    for pathname in common.keys():
        for filename in common[pathname]:
            if 'profile-' in filename:
                with open(filename, "r") as yml:
                    content = yaml.safe_load(yml)
                migrated = _migrate(content, filename)
                # Write the migrated copy alongside as migrated-profile-*.
                newfilename = re.sub('profile-', 'migrated-profile-', filename)
                path_dir = os.path.dirname(newfilename)
                _create_dirs(path_dir, self.pillar_dir)
                with open(newfilename, "w") as yml:
                    yml.write(yaml.dump(migrated,
                                        Dumper=self.friendly_dumper,
                                        default_flow_style=False))
def _record_filter(args, base_dir):
    """ Save the filter provided """
    filter_file = '{}/.filter'.format(base_dir)
    if not isfile(filter_file):
        # do a touch filter_file
        open(filter_file, 'a').close()
    current_filter = {}
    with open(filter_file) as filehandle:
        # safe_load: the filter file is plain data.
        current_filter = yaml.safe_load(filehandle)
    if current_filter is None:
        current_filter = {}
    pprint.pprint(current_filter)
    # filter a bunch of salt content and the target key before writing.
    # '!=' replaces the original "k is not 'target'": identity comparison
    # with a string literal is undefined behaviour and a SyntaxWarning
    # on CPython 3.8+.
    rec_args = {k: v for k, v in args.items()
                if k != 'target' and not k.startswith('__')}
    current_filter[args['target']] = rec_args
    with open(filter_file, 'w') as filehandle:
        yaml.dump(current_filter, filehandle, default_flow_style=False)
def _update_grains(self, content, filename="/etc/salt/grains"):
    """ Update the yaml file without destroying other content """
    log.info("Updating {}".format(filename))
    # Keep yaml human readable/editable
    friendly_dumper = yaml.SafeDumper
    # NOTE(review): this mutates the shared yaml.SafeDumper class itself,
    # not a subclass -- every later SafeDumper user inherits the
    # ignore_aliases override.
    friendly_dumper.ignore_aliases = lambda self, data: True
    with open(filename, 'w') as minion_grains:
        minion_grains.write(yaml.dump(content,
                                      Dumper=friendly_dumper,
                                      default_flow_style=False))
    log.info("Syncing grains")
    __salt__['saltutil.sync_grains']()
def roles():
    """ Remove the roles from the cluster/*.sls files """
    # Keep yaml human readable/editable
    friendly_dumper = yaml.SafeDumper
    friendly_dumper.ignore_aliases = lambda self, data: True

    cluster_dir = '/srv/pillar/ceph/cluster'
    for filename in os.listdir(cluster_dir):
        pathname = "{}/{}".format(cluster_dir, filename)
        content = None
        with open(pathname, "r") as sls_file:
            content = yaml.safe_load(sls_file)
        log.info("content {}".format(content))
        # safe_load yields None for an empty file; guard before the
        # membership test so we don't raise TypeError on 'in None'.
        if content and 'roles' in content:
            content.pop('roles')
            with open(pathname, "w") as sls_file:
                sls_file.write(yaml.dump(content,
                                         Dumper=friendly_dumper,
                                         default_flow_style=False))
def default(): """ Remove the .../stack/defaults directory. Preserve available_roles """ # Keep yaml human readable/editable friendly_dumper = yaml.SafeDumper friendly_dumper.ignore_aliases = lambda self, data: True preserve = {} content = None pathname = "/srv/pillar/ceph/stack/default/{}/cluster.yml".format('ceph') with open(pathname, "r") as sls_file: content = yaml.safe_load(sls_file) preserve['available_roles'] = content['available_roles'] stack_default = "/srv/pillar/ceph/stack/default" shutil.rmtree(stack_default) os.makedirs("{}/{}".format(stack_default, 'ceph')) with open(pathname, "w") as sls_file: sls_file.write(yaml.dump(preserve, Dumper=friendly_dumper, default_flow_style=False)) uid = pwd.getpwnam("salt").pw_uid gid = grp.getgrnam("salt").gr_gid for path in [stack_default, "{}/{}".format(stack_default, 'ceph'), pathname]: os.chown(path, uid, gid)
def init(banner, hidden, backup):
    """Initialize a manage shell in current directory
        $ manage init --banner="My awesome app shell"
        initializing manage...
        creating manage.yml
    """
    import copy
    manage_file = HIDDEN_MANAGE_FILE if hidden else MANAGE_FILE
    if os.path.exists(manage_file):
        if not click.confirm('Rewrite {0}?'.format(manage_file)):
            return
        if backup:
            bck = '.bck_{0}'.format(manage_file)
            with open(manage_file, 'r') as source, open(bck, 'w') as bck_file:
                bck_file.write(source.read())
    with open(manage_file, 'w') as output:
        # deepcopy: the original aliased the module-level default dict and
        # mutated it in place, leaking the custom banner into every later
        # use of default_manage_dict.
        data = copy.deepcopy(default_manage_dict)
        if banner:
            data['shell']['banner']['message'] = banner
        output.write(yaml.dump(data, default_flow_style=False))
def save_parameters(stack, params):
    """saves parameters to disk"""
    # Flatten the boto-style list of {ParameterKey, ParameterValue} pairs.
    params_dict = {item['ParameterKey']: item['ParameterValue']
                   for item in params}
    stack_dir = path.join('stacks', stack)
    param_path = path.join(stack_dir, 'parameters.yaml')
    # ensure paths are present
    if not path.exists('stacks'):
        mkdir('stacks')
    if not path.exists(stack_dir):
        mkdir(stack_dir)
    rendered = yaml.dump(params_dict, default_flow_style=False,
                         explicit_start=True)
    with open(param_path, mode='w', encoding='utf-8') as file:
        file.write(rendered)
def return_config_overrides_yaml(self,
                                 config_overrides,
                                 resultant,
                                 list_extend=True,
                                 ignore_none_type=True):
    """Return config yaml.

    Parses *resultant* as YAML, deep-merges *config_overrides* on top of
    it, and re-renders the result with the project dumper.

    :param config_overrides: ``dict``
    :param resultant: ``str`` || ``unicode``
    :param list_extend: ``bool`` -- extend lists instead of replacing them
    :param ignore_none_type: ``bool`` -- not referenced in this body;
        presumably kept for signature parity with sibling methods
    :returns: ``str``
    """
    original_resultant = yaml.safe_load(resultant)
    merged_resultant = self._merge_dict(
        base_items=original_resultant,
        new_items=config_overrides,
        list_extend=list_extend
    )
    # width=1000 keeps long scalar lines from being wrapped.
    return yaml.dump(
        merged_resultant,
        Dumper=IDumper,
        default_flow_style=False,
        width=1000,
    )
def test_add_and_remove_config(self):
    """Adding a catalog config should echo it back as YAML on stdout;
    removing it should make the entry disappear from the output."""
    self.init_with_remote_catalog()
    with self.captured_output() as (out, err):
        StateHolder.skip_docker = True
        poco = Poco(home_dir=self.tmpdir,
                    argv=["catalog", "config", "add", "teszt",
                          "ssh://teszt.teszt/teszt"])
        poco.run()
    self.assertEqual(0, len(err.getvalue()))
    data = dict()
    data["teszt"] = dict()
    data["teszt"]["repositoryType"] = "git"
    data["teszt"]["server"] = "ssh://teszt.teszt/teszt"
    # The CLI output must contain the YAML rendering of the new entry.
    self.assertIn(yaml.dump(data, default_flow_style=False,
                            default_style='', indent=4).strip(),
                  out.getvalue().strip())
    self.clean_states()
    with self.captured_output() as (out, err):
        StateHolder.skip_docker = True
        poco = Poco(home_dir=self.tmpdir,
                    argv=["catalog", "config", "remove", "teszt"])
        poco.run()
    self.assertEqual(0, len(err.getvalue()))
    self.assertNotIn("teszt", out.getvalue().strip())
def generate_test_accounts_file(tenant_id):
    """
    Add needed tenant and user params into test_accounts.yaml

    :param tenant_id: tenant identifier recorded alongside the configured
        tenant/user credentials.
    """
    logger.debug("Add needed params into test_accounts.yaml...")
    # getattr() is the idiomatic spelling of CONST.__getattribute__(...).
    accounts_list = [
        {
            'tenant_name': getattr(CONST, 'tempest_identity_tenant_name'),
            'tenant_id': str(tenant_id),
            'username': getattr(CONST, 'tempest_identity_user_name'),
            'password': getattr(CONST, 'tempest_identity_user_password')
        }
    ]
    with open(TEST_ACCOUNTS_FILE, "w") as f:
        yaml.dump(accounts_list, f, default_flow_style=False)
def post(self):
    """Persist site-wide settings submitted by the super-admin, merge them
    into the on-disk config file, and redirect to the settings page."""
    # Only the super-admin (level 0) may change global settings.
    if self.current_user['level'] != 0:
        self.custom_error()
    settings = {
        'init_money': int(self.get_body_argument('init_money')),
        'reg_type': self.get_body_argument('reg_type'),
        # Keep the existing secret when the field is submitted empty.
        'cookie_secret': self.get_body_argument('cookie_secret') or self.settings['cookie_secret'],
        'site': {
            'name': self.get_body_argument('sitename'),
            'keyword': self.get_body_argument('keyword'),
            'description': self.get_body_argument('description')
        }
    }
    self.settings.update(settings)
    custom_settings = {}
    with open(self.settings['config_file'], 'r') as f:
        # safe_load: the config file is plain data; the original
        # yaml.load would allow arbitrary object construction.
        custom_settings = yaml.safe_load(f)
    custom_settings['global'].update(settings)
    with open(self.settings['config_file'], 'w') as f:
        yaml.dump(custom_settings, f, default_flow_style=False, default_style='"')
    self.redirect('/ushio/setting')
def test_install_hook_git(self):
    """install() with a git openstack-origin should pass the projects
    YAML to git_install and run the full install sequence."""
    self.git_install_requested.return_value = True
    self.determine_packages.return_value = ['foo', 'bar']
    self.determine_ports.return_value = [80, 81, 82]
    repo = 'cloud:trusty-juno'
    openstack_origin_git = {
        'repositories': [
            {'name': 'requirements',
             'repository': 'git://git.openstack.org/openstack/requirements',  # noqa
             'branch': 'stable/juno'},
            {'name': 'nova',
             'repository': 'git://git.openstack.org/openstack/nova',
             'branch': 'stable/juno'}
        ],
        'directory': '/mnt/openstack-git',
    }
    projects_yaml = yaml.dump(openstack_origin_git)
    self.test_config.set('openstack-origin', repo)
    self.test_config.set('openstack-origin-git', projects_yaml)
    hooks.install()
    # git_install must receive the YAML exactly as configured.
    self.git_install.assert_called_with(projects_yaml)
    self.apt_install.assert_called_with(['foo', 'bar'], fatal=True)
    self.assertTrue(self.execd_preinstall.called)
    self.assertTrue(self.disable_services.called)
    self.cmd_all_services.assert_called_with('stop')
def main():
    """CLI entry point: fetch a key from a config file and print it.

    Dict/list values are rendered as YAML; scalars print verbatim.
    Exits with status 1 when the key is missing.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('key', help='key name to be fetched ("." to fetch all)')
    parser.add_argument('conf', help='path to a config file')
    ns = parser.parse_args()
    try:
        val = fetch_value_from_file(ns.key, ns.conf)
    except KeyError:
        log('no such key: ' + ns.key)
        sys.exit(1)
    # Parenthesized print works on both Python 2 and 3; the original
    # bare print statements were Python-2 only.
    if isinstance(val, (dict, list)):
        print(yaml.dump(val))
    else:
        print(val)
def to_yaml(self):
    """ Pretty print dump as YAML. """
    dump_options = dict(
        # show every document in its own block
        default_flow_style=False,
        # start a new document (via "---") before every resource
        explicit_start=True,
        # follow (modern) PEP8 max line length and indent
        width=99,
        indent=4,
        Dumper=SafeDumper,
    )
    return dump(self.to_safe_dict(), **dump_options)
def test_basic_config():
    """Round-trip: write testcfg to a temp file, generate the supervisor
    config, and verify the sshuttle tunnel command appears in it."""
    fd, path = tempfile.mkstemp()
    f = os.fdopen(fd, 'w')
    f.write(yaml.dump(testcfg))
    f.flush()
    cfg = ny.get_config(path)
    ny.write_supervisor_conf()
    config = ConfigParser.ConfigParser()
    config.readfp(open(cfg['supervisor.conf']))
    # from IPython import embed
    # embed()
    print(config.get('program:testtunnel', 'command'))
    assert 'sshuttle -r 1.1.1.1 2.2.2.2 -x 3.3.3.3' in config.get('program:testtunnel', 'command')
def fake_metta(matrix_dict, metadata):
    """Stores matrix and metadata in a metta-data-like form

    Args:
        matrix_dict (dict) of form { columns: values }.
            Expects an entity_id to be present which it will use as the index
        metadata (dict). Any metadata that should be set

    Yields:
        tuple of filenames for matrix and metadata
    """
    matrix = pandas.DataFrame.from_dict(matrix_dict).set_index('entity_id')
    # Both temp files live only while the consumer holds the generator.
    with tempfile.NamedTemporaryFile() as matrix_file:
        with tempfile.NamedTemporaryFile('w') as metadata_file:
            # Matrix goes into an HDF5 store under key 'title';
            # metadata is serialized as YAML.
            hdf = pandas.HDFStore(matrix_file.name)
            hdf.put('title', matrix, data_columns=True)
            matrix_file.seek(0)
            yaml.dump(metadata, metadata_file)
            metadata_file.seek(0)
            yield (matrix_file.name, metadata_file.name)
def fake_metta(matrix_dict, metadata):
    """Stores matrix and metadata in a metta-data-like form

    Args:
        matrix_dict (dict) of form { columns: values }.
            Expects an entity_id to be present which it will use as the index
        metadata (dict). Any metadata that should be set

    Yields:
        tuple of filenames for matrix and metadata
    """
    matrix = pd.DataFrame.from_dict(matrix_dict).set_index('entity_id')
    # Both temp files live only while the consumer holds the generator.
    with tempfile.NamedTemporaryFile() as matrix_file:
        with tempfile.NamedTemporaryFile('w') as metadata_file:
            # Matrix goes into an HDF5 store under key 'title';
            # metadata is serialized as YAML.
            hdf = pd.HDFStore(matrix_file.name)
            hdf.put('title', matrix, data_columns=True)
            matrix_file.seek(0)
            yaml.dump(metadata, metadata_file)
            metadata_file.seek(0)
            yield (matrix_file.name, metadata_file.name)
def publish_updating(self):
    """Publish SSM/FSM update requests built from the update descriptors.

    Context managers replace the original manual open()/close() pairs;
    safe_load avoids arbitrary object construction from the files.
    """
    LOG.info("Sending updating request")
    with open('test/test_descriptors/nsdu.yml', 'r') as nsd:
        message = {'NSD': yaml.safe_load(nsd),
                   'UUID': '937213ae-890b-413c-a11e-45c62c4eee3f'}
    self.manoconn.call_async(self._on_publish_ins_response,
                             'specific.manager.registry.ssm.update',
                             yaml.dump(message))
    with open('test/test_descriptors/vnfdu.yml', 'r') as vnfd1:
        message = {'VNFD': yaml.safe_load(vnfd1),
                   'UUID': '754fe4fe-96c9-484d-9683-1a1e8b9a31a3'}
    self.manoconn.call_async(self._on_publish_ins_response,
                             'specific.manager.registry.fsm.update',
                             yaml.dump(message))
def publish_terminating(self):
    """Publish SSM/FSM terminate requests built from the terminate
    descriptors.

    Context managers replace the original manual open()/close() pairs;
    safe_load avoids arbitrary object construction from the files.
    """
    with open('test/test_descriptors/nsdt.yml', 'r') as nsd:
        message = {'NSD': yaml.safe_load(nsd),
                   'UUID': '937213ae-890b-413c-a11e-45c62c4eee3f'}
    self.manoconn.call_async(self._on_publish_ins_response,
                             'specific.manager.registry.ssm.terminate',
                             yaml.dump(message))
    with open('test/test_descriptors/vnfdt1.yml', 'r') as vnfd1:
        message = {'VNFD': yaml.safe_load(vnfd1),
                   'UUID': 'c32b731f-7eea-4afd-9c60-0b0d0ea37eed'}
    self.manoconn.call_async(self._on_publish_ins_response,
                             'specific.manager.registry.fsm.terminate',
                             yaml.dump(message))
    with open('test/test_descriptors/vnfdt2.yml', 'r') as vnfd2:
        message = {'VNFD': yaml.safe_load(vnfd2),
                   'UUID': '754fe4fe-96c9-484d-9683-1a1e8b9a31a3'}
    self.manoconn.call_async(self._on_publish_ins_response,
                             'specific.manager.registry.fsm.terminate',
                             yaml.dump(message))
def publish_nsd(self):
    """Publish SSM/FSM on-board requests built from the test descriptors.

    Context managers replace the original manual open()/close() pairs;
    safe_load avoids arbitrary object construction from the files.
    """
    LOG.info("Sending onboard request")
    with open('test/test_descriptors/nsd.yml', 'r') as nsd:
        message = {'NSD': yaml.safe_load(nsd)}
    self.manoconn.call_async(self._on_publish_nsd_response,
                             'specific.manager.registry.ssm.on-board',
                             yaml.dump(message))
    with open('test/test_descriptors/vnfd1.yml', 'r') as vnfd1:
        message = {'VNFD': yaml.safe_load(vnfd1)}
    self.manoconn.call_async(self._on_publish_nsd_response,
                             'specific.manager.registry.fsm.on-board',
                             yaml.dump(message))
    with open('test/test_descriptors/vnfd2.yml', 'r') as vnfd2:
        message = {'VNFD': yaml.safe_load(vnfd2)}
    self.manoconn.call_async(self._on_publish_nsd_response,
                             'specific.manager.registry.fsm.on-board',
                             yaml.dump(message))
def publish_sid(self):
    """Publish SSM/FSM instantiate requests built from the test
    descriptors.

    Context managers replace the original manual open()/close() pairs;
    safe_load avoids arbitrary object construction from the files.
    """
    LOG.info("Sending instantiate request")
    with open('test/test_descriptors/nsd.yml', 'r') as nsd:
        message = {'NSD': yaml.safe_load(nsd),
                   'UUID': '937213ae-890b-413c-a11e-45c62c4eee3f'}
    self.manoconn.call_async(self._on_publish_sid_response,
                             'specific.manager.registry.ssm.instantiate',
                             yaml.dump(message))
    with open('test/test_descriptors/vnfd1.yml', 'r') as vnfd1:
        message = {'VNFD': yaml.safe_load(vnfd1),
                   'UUID': 'c32b731f-7eea-4afd-9c60-0b0d0ea37eed'}
    self.manoconn.call_async(self._on_publish_sid_response,
                             'specific.manager.registry.fsm.instantiate',
                             yaml.dump(message))
    with open('test/test_descriptors/vnfd2.yml', 'r') as vnfd2:
        message = {'VNFD': yaml.safe_load(vnfd2),
                   'UUID': '754fe4fe-96c9-484d-9683-1a1e8b9a31a3'}
    self.manoconn.call_async(self._on_publish_sid_response,
                             'specific.manager.registry.fsm.instantiate',
                             yaml.dump(message))
def yaml(self):
    """Serialize the object to yaml"""
    rendered = yaml.dump(self.data)
    return rendered
def save(self):
    """Save this config to disk.

    If the charm is using the :mod:`Services Framework <services.base>`
    or :meth:'@hook <Hooks.hook>' decorator, this is called automatically
    at the end of successful hook execution. Otherwise, it should be called
    directly by user code.

    To disable automatic saves, set ``implicit_save=False`` on this
    instance.
    """
    # json.dump(self, ...) relies on this object being a dict subclass
    # (JSON-serializable); self.path is the target file.
    with open(self.path, 'w') as f:
        json.dump(self, f)
def store_context(self, file_name, config_data):
    """Dump *config_data* as YAML into *file_name*.

    Relative paths are resolved against the charm directory.  The file
    is chmod'ed 0600 before any data is written, presumably because the
    context may contain secrets.
    """
    if not os.path.isabs(file_name):
        file_name = os.path.join(hookenv.charm_dir(), file_name)
    with open(file_name, 'w') as file_stream:
        # Tighten permissions before the first byte hits the file.
        os.fchmod(file_stream.fileno(), 0o600)
        yaml.dump(config_data, file_stream)
def rewrite_yml(data, filename='toc.yml'):
    """Write *data* as block-style, UTF-8 YAML.

    :param data: structure to serialize.
    :param filename: target path; defaults to 'toc.yml' (the original
        hard-coded destination), so existing callers are unaffected.
    """
    with io.open(filename, 'w', encoding='utf8') as outfile:
        yaml.dump(data, outfile, default_flow_style=False, allow_unicode=True)
def save_yaml_config(filepath, config):
    """Serialize *config* into *filepath* as block-style YAML."""
    rendered = yaml.dump(config, default_flow_style=False)
    with open(filepath, 'w') as out:
        out.write(rendered)
def save(self, cfg_file=None):
    """
    Save's configuration back to disk.

    :param cfg_file: optional alternate destination; defaults to the
        path the configuration was loaded from.
    :returns: True on success, False when the file could not be written.
    """
    if cfg_file is None:
        cfg_file = self.cfg_file
    else:
        # acquire new configuration path
        cfg_file = abspath(expanduser(cfg_file))

    try:
        with open(cfg_file, 'w') as fp:
            yaml.dump(self.cfg_data, fp, default_flow_style=False)
    # 'as e' replaces the Python-2-only 'except IOError, e' syntax.
    except IOError as e:
        logger.debug('%s' % (str(e)))
        logger.error('Failed to write configuration file %s' % (
            cfg_file,
        ))
        return False

    # Update central configuration
    self.cfg_file = cfg_file
    # NOTE(review): hasattr(self, '__mask_re') tests the literal name,
    # but 'self.__mask_re' below is name-mangled to '_<Class>__mask_re'
    # inside a class body -- this guard likely never fires; confirm intent.
    if hasattr(self, '__mask_re'):
        # Presumably something has changed if the user called save so we
        # destroy our cached mask to be safe
        del self.__mask_re
    return True
def insert(data, filename):
    """Serialize *data* to *filename* as YAML."""
    rendered = yaml.dump(data)
    with open(filename, 'w') as target:
        target.write(rendered)
def enter_data(cont, valinput, filename):
    """Set top-level key *cont* to *valinput* in YAML file *filename*.

    Player information is stored here (profile.yml).
    """
    # Context manager closes the read handle (original leaked it);
    # safe_load avoids constructing arbitrary objects from the file.
    with open(filename, 'r') as stream:
        prof = yaml.safe_load(stream)
    prof[cont] = valinput
    with open(filename, 'w') as yaml_file:
        yaml_file.write(yaml.dump(prof, default_flow_style=False))
def get_data(cont, filename):
    """Return the value stored under top-level key *cont* in *filename*.

    Player information is stored here.
    """
    # Context manager closes the handle (original leaked it); safe_load
    # avoids arbitrary object construction.  The original also re-dumped
    # the whole profile to a string and discarded it -- dead work removed.
    with open(filename, 'r') as stream:
        profile = yaml.safe_load(stream)
    return profile[cont]
def saveUserPreferences(self):
    '''
    write user preferences into the related configuration file
    '''
    rendered = yaml.dump(self._userPrefs, default_flow_style=False)
    with open(self._userPrfesPathName, 'w') as prefs_file:
        prefs_file.write(rendered)
def write(data):
    """Serialize *data* as flow-style YAML to the enclosing-scope *path*."""
    serialized = yaml.dump(data, default_flow_style=True)
    with open(path, 'w') as out:
        out.write(serialized)
def load_source(in_f, out_f):
    """Convert CSV rows from *in_f* into a YAML list written to *out_f*."""
    rows = csv.reader(in_f)
    converted = [make_entry(row) for row in rows]
    yaml.dump(converted, out_f, allow_unicode=True, default_flow_style=False)
def __save_config(self):
    """Saves the configuration.

    Saves the config attribute to the configuration file.
    """
    rendered = yaml.dump(self.__config)
    with open(self.USERS_CONF_PATH, "w+") as config_file:
        config_file.write(rendered)