The following 50 code examples, extracted from open-source Python projects, illustrate how to use yaml.safe_dump().
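For orientation before the examples: yaml.safe_dump() serializes a Python object with PyYAML's SafeDumper, which only emits standard YAML tags, and either returns the document as a string or writes it to a stream. A minimal sketch (the data and filename below are illustrative, not from any of the projects):

import yaml

data = {'name': 'example', 'ports': [80, 443], 'debug': False}

# With no stream argument, safe_dump() returns the YAML document as a str.
print(yaml.safe_dump(data, default_flow_style=False))

# With a stream such as an open file, it writes the document and returns None.
with open('example.yaml', 'w') as f:
    yaml.safe_dump(data, f, default_flow_style=False)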
def find_by_id(self, id_):
    """Return ServiceDef object of the given service.

    :param str id_: uri of the service definition
    """
    _, name, version = id_.rsplit('/', 2)
    rel_fname, abs_fname = self.get_fnames(name, version)
    if self.ss_dir.isfile(rel_fname):
        return ServiceDef.create_from_file(abs_fname)

    resp = self.connection.request(method='GET', path=id_)

    # Write the yaml file
    with open(abs_fname, 'w+') as f:
        yaml.safe_dump(resp.json(), f)

    return ServiceDef.create_from_file(abs_fname)
def format_output(output, format_):
    if format_ == 'plain':
        if output is None:
            return ''
        if isinstance(output, text_type):
            if text_type is str:
                return output
            else:
                return output.encode('utf-8')
        # numbers, booleans, lists and dicts will be represented as JSON
        format_ = 'json'
    if format_ == 'json':
        return json.dumps(output)
    if format_ == 'yaml':
        # Usage of safe_dump here is crucial since PyYAML emits
        # "!!python/unicode" objects from unicode strings by default
        return yaml.safe_dump(output, default_flow_style=False)
    raise RuntimeError("Unknown format '{}'".format(format_))
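The comment in the example above points at the key difference between dump() and safe_dump(): dump() falls back to Python-specific !!python/... tags for objects it does not otherwise know how to represent, while safe_dump() refuses with a RepresenterError. A small sketch, using a made-up Point class:

import yaml

class Point(object):
    def __init__(self, x, y):
        self.x, self.y = x, y

p = Point(1, 2)

# dump() emits a Python-specific tag (roughly "!!python/object:__main__.Point")
# that only PyYAML itself can reconstruct:
print(yaml.dump(p))

# safe_dump() raises instead of producing an unportable document:
try:
    yaml.safe_dump(p)
except yaml.representer.RepresenterError as err:
    print(err)  # cannot represent an object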
def update_release(channel):
    """ Update release manifests. """
    if not os.path.exists("cluster.yml"):
        error("no cluster.yml found. Did you configure?")
    with open("cluster.yml") as fp:
        config = yaml.load(fp.read())
    if channel is None:
        channel = config["release"]["channel"]
    current_version = config["release"]["version"]
    configure.release(config, channel)
    if current_version == config["release"]["version"]:
        click.echo("No updates available for {} channel".format(channel))
        sys.exit(0)
    with open("cluster.yml", "w") as fp:
        fp.write(yaml.safe_dump(config, default_flow_style=False))
    click.echo("Updated config to {} in {} channel".format(
        config["release"]["version"], config["release"]["channel"]))
def save(self): """Persist the services to disk""" try: logger.debug("Backing up services") if os.path.exists(self._db): shutil.copyfile(self._db, "{}.bak".format(self._db)) except Exception as e: logger.exception("Failed to backup services") return try: logger.debug("Saving services") with open(self._db, 'w') as f: f.write(yaml.safe_dump(dict(((k, v) for (k, v) in self._services.iteritems( ) if k not in self.reserved_services)), default_flow_style=False, explicit_start=True)) except Exception as e: logger.exception("Failed to save services")
def save(self): """Persist the addressbook to disk""" try: logger.debug("Backing up addressbook") if os.path.exists(self._db): shutil.copyfile(self._db, "{}.bak".format(self._db)) except Exception as e: logger.exception("Failed to backup addressbook") return try: logger.debug("Saving addressbook") with open(self._db, 'w') as f: f.write(yaml.safe_dump(dict(((k, v) for (k, v) in self._addressbook.iteritems( ) if k not in self.reserved_addresses)), default_flow_style=False, explicit_start=True)) except Exception as e: logger.exception("Failed to save addressbook")
def save(self): """Persist the checks to disk""" try: logger.debug("Backing up checks") if os.path.exists(self._db): shutil.copyfile(self._db, "{}.bak".format(self._db)) except Exception as e: logger.exception("Failed to backup checks") return try: logger.debug("Saving checks") with open(self._db, 'w') as f: f.write(yaml.safe_dump(self._checks, default_flow_style=False, explicit_start=True)) except: logger.exception("Failed to save checks")
def save(self): """Persist the chains to disk""" try: logger.debug("Backing up chains") if os.path.exists(self._db): shutil.copyfile(self._db, "{}.bak".format(self._db)) except Exception as e: logger.exception("Failed to backup chains") return try: logger.debug("Saving chains") with open(self._db, 'w') as f: f.write(yaml.safe_dump(self._tables, default_flow_style=False, explicit_start=True)) except: logger.exception("Failed to save chains")
def save(self): """Persist the interfaces to disk""" try: logger.debug("Backing up interfaces") if os.path.exists(self._db): shutil.copyfile(self._db, "{}.bak".format(self._db)) except Exception as e: logger.exception("Failed to backup interfaces") return try: logger.debug("Saving interfaces") with open(self._db, 'w') as f: f.write(yaml.safe_dump(dict(((k, v) for (k, v) in self._interfaces.iteritems( ) if k not in self.reserved_interfaces)), default_flow_style=False, explicit_start=True)) except Exception as e: logger.exception("Failed to save interfaces")
def _update_system_file(system_file, name, new_kvs):
    """Update the bcbio_system.yaml file with new resource information.
    """
    if os.path.exists(system_file):
        bak_file = system_file + ".bak%s" % datetime.datetime.now().strftime(
            "%Y-%m-%d-%H-%M-%S")
        shutil.copyfile(system_file, bak_file)
        with open(system_file) as in_handle:
            config = yaml.load(in_handle)
    else:
        utils.safe_makedir(os.path.dirname(system_file))
        config = {}
    new_rs = {}
    added = False
    for rname, r_kvs in config.get("resources", {}).items():
        if rname == name:
            for k, v in new_kvs.items():
                r_kvs[k] = v
            added = True
        new_rs[rname] = r_kvs
    if not added:
        new_rs[name] = new_kvs
    config["resources"] = new_rs
    with open(system_file, "w") as out_handle:
        yaml.safe_dump(config, out_handle, default_flow_style=False,
                       allow_unicode=False)
def _setUp(self):
    super(RealPolicyFixture, self)._setUp()
    self.policy_dir = self.useFixture(fixtures.TempDir())
    self.policy_file = os.path.join(self.policy_dir.path, 'policy.yaml')
    # Load the fake_policy data and add the missing default rules.
    policy_rules = yaml.safe_load(fake_policy.policy_data)
    self.add_missing_default_rules(policy_rules)
    with open(self.policy_file, 'w') as f:
        yaml.safe_dump(policy_rules, f)
    policy_opts.set_defaults(CONF)
    self.useFixture(
        ConfPatcher(policy_dirs=[], policy_file=self.policy_file,
                    group='oslo_policy'))
    deckhand.policy.reset()
    deckhand.policy.init()
    self.addCleanup(deckhand.policy.reset)
    if self.verify:
        self._install_policy_verification_hook()
def process_response(self, req, resp, resource):
    """Converts responses to ``application/x-yaml`` content type."""
    if resp.status != '204 No Content':
        resp.set_header('Content-Type', 'application/x-yaml')
    for attr in ('body', 'data'):
        if not hasattr(resp, attr):
            continue
        resp_attr = getattr(resp, attr)
        try:
            resp_attr = json.loads(resp_attr)
        except (TypeError, ValueError):
            pass
        if isinstance(resp_attr, dict):
            setattr(resp, attr, yaml.safe_dump(resp_attr))
        elif isinstance(resp_attr, (list, tuple)):
            setattr(resp, attr, yaml.safe_dump_all(resp_attr))
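This middleware calls yaml.safe_dump_all() for lists and tuples. Unlike safe_dump(), which produces a single document, safe_dump_all() takes an iterable and emits one YAML document per item, separated by --- markers. A quick sketch with made-up payloads:

import yaml

documents = [{'name': 'first'}, {'name': 'second'}]

# Each item becomes its own YAML document:
print(yaml.safe_dump_all(documents))
# name: first
# ---
# name: second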
def json_to_yaml(filename):
    '''
    Convert a JSON file to a YAML file.

    :param filename: filename
    :type filename: str
    :return: None
    '''
    try:
        file = open(filename + ".json", "r")
        json_data = json.loads(file.read())
        yaml_file = open(filename + ".yaml", "w")
        yaml.safe_dump(json_data, yaml_file, default_flow_style=False)
        file.close()
        yaml_file.close()
    except FileNotFoundError:
        print("[Error] Bad Input File")
def test_run_heat_config(self):
    with self.write_config_file(self.data) as config_file:
        env = os.environ.copy()
        env.update({
            'HEAT_DOCKER_COMPOSE_WORKING': self.docker_compose_dir.join(),
            'HEAT_SHELL_CONFIG': config_file.name
        })
        returncode, stdout, stderr = self.run_cmd(
            [self.heat_config_docker_compose_path], env)
        self.assertEqual(0, returncode, stderr)
        compose_yml = self.docker_compose_dir.join(
            'abcdef001/docker-compose.yml')
        with open(compose_yml) as f:
            self.assertEqual(yaml.safe_dump(
                self.data[0].get('config'), default_flow_style=False),
                f.read())
def output(python_object, format="raw", pager=False):
    if format == 'yaml':
        output_string = yaml.safe_dump(python_object, default_flow_style=False,
                                       encoding='utf-8', allow_unicode=True)
    elif format == 'json':
        output_string = json.dumps(python_object, sort_keys=True, indent=4)
    elif format == 'raw':
        output_string = str(python_object)
    elif format == 'pformat':
        output_string = pprint.pformat(python_object)
    else:
        raise Exception("No valid output format provided. "
                        "Supported: 'yaml', 'json', 'raw', 'pformat'")
    if pager:
        click.echo_via_pager(output_string)
    else:
        click.echo(output_string)
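A note on the keywords used above: passing encoding= makes safe_dump() return an encoded bytes object (under Python 3) rather than a text string, and allow_unicode=True keeps non-ASCII characters literal instead of escaping them. A short sketch with an illustrative payload:

import yaml

blob = yaml.safe_dump({'greeting': 'héllo'}, encoding='utf-8',
                      allow_unicode=True)
print(type(blob))            # <class 'bytes'> on Python 3
print(blob.decode('utf-8'))  # greeting: héllo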
def write_metadata(path, meta='.meta.yaml', **params):
    """Writes metadata for a dataset.

    Args:
        path (str): path to **dataset** (not meta file) whose metadata
            is to be written. If the meta file already exists, it will
            be overwritten.
        meta (str): suffix identifying the dataset's meta file
        **params: all other keyword arguments are treated as dataset
            attributes, and added to the meta file
    """
    if 'n_channels' in params:
        del params['n_channels']
    if 'n_samples' in params:
        del params['n_samples']
    if os.path.isdir(path):
        metafile = os.path.join(path, meta[1:])
    else:
        metafile = path + meta
    for k, v in params.items():
        if isinstance(v, (np.ndarray, np.generic)):
            params[k] = v.tolist()
    with codecs.open(metafile, 'w', encoding='utf-8') as yaml_file:
        yaml_file.write(yaml.safe_dump(params, default_flow_style=False))
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--old", help="old password file", required=True)
    parser.add_argument("--new", help="new password file", required=True)
    parser.add_argument("--final", help="merged password file", required=True)
    args = parser.parse_args()
    with open(args.old, "r") as old_file:
        old_passwords = yaml.safe_load(old_file)
    with open(args.new, "r") as new_file:
        new_passwords = yaml.safe_load(new_file)
    new_passwords.update(old_passwords)
    with open(args.final, "w") as destination:
        yaml.safe_dump(new_passwords, destination, default_flow_style=False)
def serialized(obj, status=200):
    fmt = get_serializer()
    if fmt == 'json':
        ser = json.dumps
        ct = 'application/json'
    elif fmt == 'yaml':
        ser = yaml.safe_dump
        ct = 'text/plain+yaml'  # For interop with browsers
    elif fmt is None:
        return None
    else:
        abort(404)
    data = ser(obj)
    resp = make_response(data, status)
    resp.headers['Content-Type'] = ct
    return resp


# Authentication
def base_update_meta(self, meta_version, force=False):
    try:
        meta_version = normalize_meta_version(meta_version)
    except Exception, e:
        raise InvalidMetaVersion(e)
    if meta_version == self.meta_version and not force:
        return 'meta_version is already latest'
    meta = self.fetch_meta(meta_version)
    if not isinstance(meta, dict):
        return None
    self.check_giturl(meta, update=True)
    meta['giturl'] = self.giturl
    self.meta = yaml.safe_dump(meta, default_style='"')
    self.meta_version = meta_version
    if self.appname != meta['appname']:
        raise InvalidLainYaml("appname dont match: %s" % meta)
    self.save()
    return 'meta updated'
def _write_galaxy_install_info(self):
    """
    Writes a YAML-formatted file to the role's meta/ directory
    (named .galaxy_install_info) which contains some information we can
    use later for commands like 'list' and 'info'.
    """
    info = dict(
        version=self.version,
        install_date=datetime.datetime.utcnow().strftime("%c"),
    )
    info_path = os.path.join(self.path, self.META_INSTALL)
    with open(info_path, 'w+') as f:
        try:
            self._install_info = yaml.safe_dump(info, f)
        except:
            return False
    return True
def update_write_config(config_file, update_dict):
    """Update a given configuration file with updated values.

    If the configuration file does not exist, a new one is created.

    Args:
        config_file (str): the location of the config file to update
        update_dict (dict): the items to update in the config file
    """
    if not os.path.exists(config_file):
        with open(config_file, 'a'):
            pass
    with open(config_file, 'r') as f:
        config_dict = yaml.safe_load(f.read()) or {}
    for key, value in update_dict.items():
        loader = get_section_loader(key)
        loader.update(config_dict, value)
    with open(config_file, 'w') as f:
        yaml.safe_dump(config_dict, f)
def toYAML(self, **options):
    """ Serializes this Munch to YAML, using `yaml.safe_dump()` if
        no `Dumper` is provided. See the PyYAML documentation for more info.

        >>> b = Munch(foo=['bar', Munch(lol=True)], hello=42)
        >>> import yaml
        >>> yaml.safe_dump(b, default_flow_style=True)
        '{foo: [bar, {lol: true}], hello: 42}\\n'
        >>> b.toYAML(default_flow_style=True)
        '{foo: [bar, {lol: true}], hello: 42}\\n'
        >>> yaml.dump(b, default_flow_style=True)
        '!munch.Munch {foo: [bar, !munch.Munch {lol: true}], hello: 42}\\n'
        >>> b.toYAML(Dumper=yaml.Dumper, default_flow_style=True)
        '!munch.Munch {foo: [bar, !munch.Munch {lol: true}], hello: 42}\\n'
    """
    opts = dict(indent=4, default_flow_style=False)
    opts.update(options)
    if 'Dumper' not in opts:
        return yaml.safe_dump(self, **opts)
    else:
        return yaml.dump(self, **opts)
def write(self):
    ''' write to file '''
    if not self.filename:
        raise YeditException('Please specify a filename.')
    if self.backup and self.file_exists():
        shutil.copy(self.filename, self.filename + '.orig')
    tmp_filename = self.filename + '.yedit'
    try:
        with open(tmp_filename, 'w') as yfd:
            yml_dump = yaml.safe_dump(self.yaml_dict, default_flow_style=False)
            for line in yml_dump.strip().split('\n'):
                if '{{' in line and '}}' in line:
                    yfd.write(line.replace("'{{", '"{{').replace("}}'", '}}"') + '\n')
                else:
                    yfd.write(line + '\n')
    except Exception as err:
        raise YeditException(err.message)
    os.rename(tmp_filename, self.filename)
    return (True, self.yaml_dict)
def generate_group_vars_all(self):
    path = 'result/{}/group_vars/all'.format(self.grid_name)
    variables = AutoDict()
    hosts_entries = AutoDict()
    with open('result/{}/infrastructure/terraform.tfstate'.format(
            self.grid_name), 'r') as json_file:
        json_data = json.load(json_file)
    for module in json_data['modules']:
        for resource, value in module['resources'].iteritems():
            if value['type'] == 'google_compute_instance':
                host = '{}.node.{}'.format(
                    value['primary']['attributes']['name'], self.grid_name)
                ip = value['primary']['attributes']['network_interface.0.address']
                hosts_entries['hosts'][str(host)] = str(ip)
    variables['hosts'] = json.dumps(hosts_entries['hosts'])
    variables['grid_name'] = self.current_grid.name
    variables['terminal_ip'] = self._nameserver()
    self._generate_template(path, variables)
    vars_json = json.loads(self.current_config.vars)
    vars_yaml = yaml.safe_dump(vars_json, default_flow_style=False)
    with open(path, "a") as yaml_file:
        yaml_file.write(vars_yaml)
def generate_group_vars_all(self):
    path = 'result/{}/group_vars/all'.format(self.grid_name)
    variables = AutoDict()
    hosts_entries = AutoDict()
    with open('result/{}/infrastructure/terraform.tfstate'.format(
            self.grid_name), 'r') as json_file:
        json_data = json.load(json_file)
    for module in json_data['modules']:
        for resource, value in module['resources'].iteritems():
            if value['type'] == 'azure_instance':
                host = '{}.node.{}'.format(
                    value['primary']['attributes']['name'], self.grid_name)
                ip = value['primary']['attributes']['ip_address']
                hosts_entries['hosts'][str(host)] = str(ip)
    variables['hosts'] = json.dumps(hosts_entries['hosts'])
    variables['grid_name'] = self.current_grid.name
    variables['terminal_ip'] = self._nameserver()
    self._generate_template(path, variables)
    vars_json = json.loads(self.current_config.vars)
    vars_yaml = yaml.safe_dump(vars_json, default_flow_style=False)
    with open(path, "a") as yaml_file:
        yaml_file.write(vars_yaml)
def generate_group_vars_all(self):
    path = 'result/{}/group_vars/all'.format(self.grid_name)
    variables = AutoDict()
    hosts_entries = AutoDict()
    for group in self.current_groups:
        for ip in group.groupips.split(','):
            hostname = ip.replace('.', '-')
            host = '{}.node.{}'.format(hostname, self.grid_name)
            hosts_entries['hosts'][str(host)] = str(ip)
    for ip in self.current_config.mastersips.split(','):
        hostname = ip.replace('.', '-')
        host = '{}.node.{}'.format(hostname, self.grid_name)
        hosts_entries['hosts'][str(host)] = str(ip)
    terminal_ip = self.current_config.terminalips.split(',')[1]
    terminal_hostname = terminal_ip.replace('.', '-')
    terminal_host = '{}.node.{}'.format(terminal_hostname, self.grid_name)
    hosts_entries['hosts'][str(terminal_host)] = str(terminal_ip)
    variables['hosts'] = json.dumps(hosts_entries['hosts'])
    variables['grid_name'] = self.current_grid.name
    variables['terminal_ip'] = self._nameserver()
    self._generate_template(path, variables)
    vars_json = json.loads(self.current_config.vars)
    vars_yaml = yaml.safe_dump(vars_json, default_flow_style=False)
    with open(path, "a") as yaml_file:
        yaml_file.write(vars_yaml)
def generate_group_vars_all(self):
    path = 'result/{}/group_vars/all'.format(self.grid_name)
    variables = AutoDict()
    hosts_entries = AutoDict()
    with open('result/{}/infrastructure/terraform.tfstate'.format(
            self.grid_name), 'r') as json_file:
        json_data = json.load(json_file)
    for module in json_data['modules']:
        for resource, value in module['resources'].iteritems():
            if value['type'] == 'aws_instance':
                hostname = value['primary']['attributes']['private_dns'].split('.')[0]
                host = '{}.node.{}'.format(hostname, self.grid_name)
                ip = value['primary']['attributes']['private_ip']
                hosts_entries['hosts'][str(host)] = str(ip)
    variables['hosts'] = json.dumps(hosts_entries['hosts'])
    variables['grid_name'] = self.current_grid.name
    variables['terminal_ip'] = self._nameserver()
    self._generate_template(path, variables)
    vars_json = json.loads(self.current_config.vars)
    vars_yaml = yaml.safe_dump(vars_json, default_flow_style=False)
    with open(path, "a") as yaml_file:
        yaml_file.write(vars_yaml)
def generate_group_vars_all(self):
    path = 'result/{}/group_vars/all'.format(self.grid_name)
    variables = AutoDict()
    hosts_entries = AutoDict()
    with open('result/{}/infrastructure/terraform.tfstate'.format(
            self.grid_name), 'r') as json_file:
        json_data = json.load(json_file)
    for module in json_data['modules']:
        for resource, value in module['resources'].iteritems():
            if value['type'] == 'google_compute_instance':
                host = '{}.node.{}'.format(
                    value['primary']['attributes']['name'], self.grid_name)
                ip = value['primary']['attributes']['network_interface.0.address']
                hosts_entries['hosts'][str(host)] = str(ip)
    variables['hosts'] = json.dumps(hosts_entries['hosts'])
    variables['grid_name'] = self.current_grid.name
    self._generate_template(path, variables)
    vars_json = json.loads(self.current_config.vars)
    vars_yaml = yaml.safe_dump(vars_json, default_flow_style=False)
    with open(path, "a") as yaml_file:
        yaml_file.write(vars_yaml)
def generate_group_vars_all(self):
    path = 'result/{}/group_vars/all'.format(self.grid_name)
    variables = AutoDict()
    hosts_entries = AutoDict()
    with open('result/{}/infrastructure/terraform.tfstate'.format(
            self.grid_name), 'r') as json_file:
        json_data = json.load(json_file)
    for module in json_data['modules']:
        for resource, value in module['resources'].iteritems():
            if value['type'] == 'azure_instance':
                host = '{}.node.{}'.format(
                    value['primary']['attributes']['name'], self.grid_name)
                ip = value['primary']['attributes']['ip_address']
                hosts_entries['hosts'][str(host)] = str(ip)
    variables['hosts'] = json.dumps(hosts_entries['hosts'])
    variables['grid_name'] = self.current_grid.name
    self._generate_template(path, variables)
    vars_json = json.loads(self.current_config.vars)
    vars_yaml = yaml.safe_dump(vars_json, default_flow_style=False)
    with open(path, "a") as yaml_file:
        yaml_file.write(vars_yaml)
def generate_group_vars_all(self):
    path = 'result/{}/group_vars/all'.format(self.grid_name)
    variables = AutoDict()
    hosts_entries = AutoDict()
    for group in self.current_groups:
        for ip in group.groupips.split(','):
            hostname = ip.replace('.', '-')
            host = '{}.node.{}'.format(hostname, self.grid_name)
            hosts_entries['hosts'][str(host)] = str(ip)
    for ip in self.current_config.mastersips.split(','):
        hostname = ip.replace('.', '-')
        host = '{}.node.{}'.format(hostname, self.grid_name)
        hosts_entries['hosts'][str(host)] = str(ip)
    terminal_ip = self.current_config.terminalips.split(',')[1]
    terminal_hostname = terminal_ip.replace('.', '-')
    terminal_host = '{}.node.{}'.format(terminal_hostname, self.grid_name)
    hosts_entries['hosts'][str(terminal_host)] = str(terminal_ip)
    variables['hosts'] = json.dumps(hosts_entries['hosts'])
    variables['grid_name'] = self.current_grid.name
    self._generate_template(path, variables)
    vars_json = json.loads(self.current_config.vars)
    vars_yaml = yaml.safe_dump(vars_json, default_flow_style=False)
    with open(path, "a") as yaml_file:
        yaml_file.write(vars_yaml)
def generate_group_vars_all(self):
    path = 'result/{}/group_vars/all'.format(self.grid_name)
    variables = AutoDict()
    hosts_entries = AutoDict()
    with open('result/{}/infrastructure/terraform.tfstate'.format(
            self.grid_name), 'r') as json_file:
        json_data = json.load(json_file)
    for module in json_data['modules']:
        for resource, value in module['resources'].iteritems():
            if value['type'] == 'aws_instance':
                hostname = value['primary']['attributes'][
                    'private_dns'].split('.')[0]
                host = '{}.node.{}'.format(hostname, self.grid_name)
                ip = value['primary']['attributes']['private_ip']
                hosts_entries['hosts'][str(host)] = str(ip)
    variables['hosts'] = json.dumps(hosts_entries['hosts'])
    variables['grid_name'] = self.current_grid.name
    self._generate_template(path, variables)
    vars_json = json.loads(self.current_config.vars)
    vars_yaml = yaml.safe_dump(vars_json, default_flow_style=False)
    with open(path, "a") as yaml_file:
        yaml_file.write(vars_yaml)
def generate_group_vars_all(self):
    path = 'result/{}/group_vars/all'.format(self.grid_name)
    variables = AutoDict()
    hosts_entries = AutoDict()
    with open('result/{}/infrastructure/terraform.tfstate'.format(
            self.grid_name), 'r') as json_file:
        json_data = json.load(json_file)
    for module in json_data['modules']:
        for resource, value in module['resources'].iteritems():
            if value['type'] == 'openstack_compute_instance_v2':
                host = '{}.node.{}'.format(
                    value['primary']['attributes']['name'], self.grid_name)
                ip = value['primary']['attributes']['network.0.fixed_ip_v4']
                hosts_entries['hosts'][str(host)] = str(ip)
    variables['hosts'] = json.dumps(hosts_entries['hosts'])
    variables['grid_name'] = self.current_grid.name
    self._generate_template(path, variables)
    vars_json = json.loads(self.current_config.vars)
    vars_yaml = yaml.safe_dump(vars_json, default_flow_style=False)
    with open(path, "a") as yaml_file:
        yaml_file.write(vars_yaml)
def export_conf(conf, conf_file, exclude=None):
    """Export configuration file."""
    exclude = set() if exclude is None else set(exclude)
    if not conf_file:
        print('( ) No configuration file provided')
        return
    if op.isfile(conf_file):
        print('(!) File "{0}" exists, skipping export'.format(conf_file))
        return
    with open(conf_file, 'w') as f:
        # The dict() is also to get rid of the SemiFrozenDict
        dumped = dict(item for item in conf.items() if item[0] not in exclude)
        f.write(yaml.safe_dump(dumped, default_flow_style=False,
                               allow_unicode=True))
    print('( ) File "{0}" created with configuration'.format(conf_file))
def create_enable_file(certpem, keypem, source_dir, dest_dir, tht_release):
    output_dict = _open_yaml("{}environments/enable-tls.yaml".format(source_dir))
    if tht_release not in ['master', 'newton']:
        for key in output_dict["parameter_defaults"]["EndpointMap"]:
            if output_dict["parameter_defaults"]["EndpointMap"][key]["host"] == "CLOUDNAME":
                output_dict["parameter_defaults"]["EndpointMap"][key]["host"] = "IP_ADDRESS"
    output_dict["parameter_defaults"]["SSLCertificate"] = certpem
    output_dict["parameter_defaults"]["SSLKey"] = keypem
    output_dict["resource_registry"]["OS::TripleO::NodeTLSData"] = \
        "{}/puppet/extraconfig/tls/tls-cert-inject.yaml".format(source_dir)
    with open("{}enable-tls.yaml".format(dest_dir), "w") as stream:
        yaml.safe_dump(output_dict, stream, default_style='|')
def relation_set(relation_id=None, relation_settings=None, **kwargs):
    """Set relation information for the current unit"""
    relation_settings = relation_settings if relation_settings else {}
    relation_cmd_line = ['relation-set']
    accepts_file = "--file" in subprocess.check_output(
        relation_cmd_line + ["--help"], universal_newlines=True)
    if relation_id is not None:
        relation_cmd_line.extend(('-r', relation_id))
    settings = relation_settings.copy()
    settings.update(kwargs)
    for key, value in settings.items():
        # Force value to be a string: it always should, but some call
        # sites pass in things like dicts or numbers.
        if value is not None:
            settings[key] = "{}".format(value)
    if accepts_file:
        # --file was introduced in Juju 1.23.2. Use it by default if
        # available, since otherwise we'll break if the relation data is
        # too big. Ideally we should tell relation-set to read the data from
        # stdin, but that feature is broken in 1.23.2: Bug #1454678.
        with tempfile.NamedTemporaryFile(delete=False) as settings_file:
            settings_file.write(yaml.safe_dump(settings).encode("utf-8"))
        subprocess.check_call(
            relation_cmd_line + ["--file", settings_file.name])
        os.remove(settings_file.name)
    else:
        for key, value in settings.items():
            if value is None:
                relation_cmd_line.append('{}='.format(key))
            else:
                relation_cmd_line.append('{}={}'.format(key, value))
        subprocess.check_call(relation_cmd_line)
    # Flush cache of any relation-gets for local unit
    flush(local_unit())
def yaml(self, output):
    """Output data in YAML format"""
    import yaml
    yaml.safe_dump(output, self.outfile)
def yaml_format(data):
    return yaml.safe_dump(dict(data), default_flow_style=False,
                          explicit_start=True, explicit_end=True)
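For reference, the explicit_start and explicit_end options used here wrap the output in YAML's document-start (---) and document-end (...) markers, which helps when documents are concatenated or streamed. A quick sketch with a made-up payload:

import yaml

print(yaml.safe_dump({'status': 'ok'}, default_flow_style=False,
                     explicit_start=True, explicit_end=True))
# ---
# status: ok
# ...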
def show(connect, **kwargs):
    config = connect.get_config()
    if config:
        s = yaml.safe_dump(config, encoding='utf-8', allow_unicode=True)
        sys.stdout.write(s)
    else:
        print "FastScore not configured (use 'fastscore config set')"
def dump_yaml(tables, outdir, outfilename):
    stream = file(os.path.join(outdir, outfilename + '.yaml'), 'wb')
    yaml.safe_dump(tables, stream, allow_unicode=True)
    stream.close()


#
# Process files listed on command line, or all .xls files in current dir if no
# args given
#
def save_errorlog(errorlog):
    with open('out/errors.yml', 'w') as outfile:
        yaml.safe_dump(errorlog, outfile, default_flow_style=False)
def append_config(file_name, p_domain, u_domain):
    """ Append values into a yaml file """
    with open(file_name) as yfile:
        doc = yaml.load(yfile)
    doc['credentials']['abot-epc']['abot-epc']['project-domain-name'] = (
        p_domain)
    doc['credentials']['abot-epc']['abot-epc']['user-domain-name'] = (
        u_domain)
    with open(file_name, 'w') as yfile:
        yaml.safe_dump(doc, yfile, default_flow_style=False)