我们从Python开源项目中,提取了以下32个代码示例,用于说明如何使用yaml.Dumper()。
def load_config():
    """Return the effective configuration, creating the config file if needed.

    Built-in defaults are overlaid with the mapping stored in
    ``$HOME/.gpx_upload.yaml``.  When that file cannot be opened or read
    (``IOError``), the defaults are written to it on a best-effort basis and
    returned unchanged.

    Returns:
        dict: the defaults, updated with any keys found in the config file.

    Raises:
        KeyError: if the ``HOME`` environment variable is not set.
    """
    home = os.environ['HOME']
    obj = {
        'cache_dir': '%s/.cache/gpx' % home,
        'enable_upload': True,
        'overpass_server': 'http://overpass-api.de/api/interpreter',
    }
    config_file = '%s/.gpx_upload.yaml' % home
    try:
        with open(config_file, 'r') as f:
            # NOTE(review): yaml.Loader can construct arbitrary Python objects.
            # Kept as-is for compatibility with existing config files; consider
            # yaml.safe_load if the file may be edited by untrusted parties.
            loaded = yaml.load(f, Loader=yaml.Loader)
    except IOError:
        # Best effort: seed the file with the defaults; ignore write failures.
        try:
            with open(config_file, 'w') as f:
                f.write(yaml.dump(obj, Dumper=yaml.Dumper))
        except IOError:
            pass
        return obj

    # An empty (or comment-only) YAML file loads as None; treat it as
    # "no overrides" instead of failing on None.keys().
    if loaded is not None:
        for key in loaded.keys():
            obj[key] = loaded[key]
    return obj
def toYAML(self, **options):
    """ Render this Munch as a YAML string.

        ``yaml.safe_dump()`` is used unless the caller supplies a `Dumper`
        option, in which case ``yaml.dump()`` is used instead. The defaults
        ``indent=4`` and ``default_flow_style=False`` apply unless overridden
        through `options`. See the PyYAML documentation for more info.

        >>> b = Munch(foo=['bar', Munch(lol=True)], hello=42)
        >>> import yaml
        >>> yaml.safe_dump(b, default_flow_style=True)
        '{foo: [bar, {lol: true}], hello: 42}\\n'
        >>> b.toYAML(default_flow_style=True)
        '{foo: [bar, {lol: true}], hello: 42}\\n'
        >>> yaml.dump(b, default_flow_style=True)
        '!munch.Munch {foo: [bar, !munch.Munch {lol: true}], hello: 42}\\n'
        >>> b.toYAML(Dumper=yaml.Dumper, default_flow_style=True)
        '!munch.Munch {foo: [bar, !munch.Munch {lol: true}], hello: 42}\\n'
    """
    opts = {'indent': 4, 'default_flow_style': False}
    opts.update(options)
    # An explicit Dumper means the caller opted out of the safe serializer.
    serialize = yaml.dump if 'Dumper' in opts else yaml.safe_dump
    return serialize(self, **opts)
def create(self, flow, *tasks, deps=None):
    """Write the definition of ``flow`` to ``<self.flow_dir>/<flow>.yml``.

    The file holds two keys: ``tasks`` (the task names as a list) and
    ``deps`` (one ``"name->a,b,c"`` string per entry of ``deps``).

    :param flow: flow name; also used as the file stem
    :param tasks: task names belonging to the flow
    :param deps: mapping of a name to an iterable of names; a falsy value
        is treated as an empty mapping
    """
    deps = deps or {}

    # The instance is discarded; it is built before anything is written.
    # NOTE(review): presumably this validates the definition - confirm.
    Flow(flow, tasks, deps)

    class IndentDumper(yaml.Dumper):
        """Dumper that never uses the "indentless" layout."""

        def increase_indent(self, flow=False, indentless=False):
            return super().increase_indent(flow, False)

    create_flow = {
        'tasks': list(tasks),
        'deps': [name + '->' + ','.join(targets) for name, targets in deps.items()],
    }

    os.makedirs(self.flow_dir, exist_ok=True)
    flow_file = os.path.join(self.flow_dir, flow + '.yml')
    with open(flow_file, 'w') as handle:
        yaml.dump(create_flow, handle, Dumper=IndentDumper, default_flow_style=False)
def convert_yaml(data):
    """
    Convert a data structure to YAML and post-process the dump.

    :param data: structure to convert; objects whose type is named exactly
        ``OrderedDict`` are dumped through ``_ordered_dump``
    :return: the dumped YAML after being passed through ``_format_yaml_dump``
    """
    # Options shared by both dump paths.
    dump_options = dict(
        Dumper=yaml.SafeDumper,
        indent=indent_spaces,
        block_seq_indent=2,
        width=12288,
        allow_unicode=True,
        default_flow_style=False,
    )

    # Deliberately an exact type-name comparison (not isinstance): subclasses
    # of OrderedDict keep going through the plain yaml.dump path, as before.
    if type(data).__name__ == 'OrderedDict':
        # Only the ordered path passes an explicit YAML version.
        sdata = _ordered_dump(data, version=yaml_version, **dump_options)
    else:
        sdata = yaml.dump(data, **dump_options)

    return _format_yaml_dump(sdata)
def _ordered_dump(data, stream=None, Dumper=yaml.Dumper, **kwds):
    """
    Dump ``data`` to YAML, emitting OrderedDict instances as plain mappings.

    Use this instead of calling yaml.dump with yaml.Dumper/yaml.SafeDumper
    directly when the data contains OrderedDict objects.
    Usage example: _ordered_dump(data, Dumper=yaml.SafeDumper)

    :param data: data structure to dump
    :param stream: stream to write to
    :param Dumper: yaml dumper class to derive from
    :param kwds: additional keywords, passed on to yaml.dump
    :return: whatever yaml.dump returns for the given arguments
    """
    mapping_tag = yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG

    class OrderedDumper(Dumper):
        """Local subclass that receives the OrderedDict representer."""

    def represent_ordered(dumper, mapping):
        # Hand over the item pairs rather than the mapping object itself.
        return dumper.represent_mapping(mapping_tag, mapping.items())

    OrderedDumper.add_representer(OrderedDict, represent_ordered)
    return yaml.dump(data, stream, OrderedDumper, **kwds)
def write_configuration_file(filename, conf):
    """Dump the ``conf`` dict into ``filename`` as block-style YAML.

    Beforehand the existing file is copied to ``<filename>.back`` and the
    parent directory is created. An ``IOError`` from the copy and an
    ``OSError`` from the directory creation are ignored.
    """
    import shutil

    backup_name = '%s.back' % filename
    try:
        shutil.copy2(filename, backup_name)
    except IOError:
        # Ignored: the backup step is best effort.
        pass

    parent_dir = os.path.dirname(filename)
    try:
        os.makedirs(parent_dir)
    except OSError:
        # Ignored: if the directory is really unusable, open() below fails.
        pass

    with open(filename, 'w') as handle:
        yaml.dump(conf, handle, indent=4, default_flow_style=False, Dumper=Dumper)
def ordered_dump(data, stream=None, dumper_class=yaml.Dumper, default_flow_style=False, **kwds):
    """Dump ``data`` to YAML, emitting OrderedDict instances as plain mappings.

    ``indent`` (4) and ``line_break`` (``'\\n'``) are always forced, and
    ``default_flow_style`` is taken from the named parameter; values for
    these keys supplied through ``kwds`` are overwritten.

    :param data: data structure to dump
    :param stream: optional stream to write to
    :param dumper_class: yaml dumper class to derive from
    :param kwds: further keyword arguments for ``yaml.dump``
    :return: result of ``yaml.dump``; None when ``stream`` is provided
    """
    mapping_tag = yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG

    # pylint: disable=too-many-ancestors
    class OrderedDumper(dumper_class):
        """Local subclass that receives the OrderedDict representer."""

    def represent_ordered(dumper, mapping):
        return dumper.represent_mapping(mapping_tag, mapping.items())

    OrderedDumper.add_representer(OrderedDict, represent_ordered)

    kwds['default_flow_style'] = default_flow_style
    kwds['indent'] = 4
    kwds['line_break'] = '\n'

    # WARN: if stream is provided, return value is None
    return yaml.dump(data, stream, OrderedDumper, **kwds)
def ordered_dump(data, stream=None, Dumper=yaml.Dumper, **kwds):
    """Dump ``data`` to YAML, emitting OrderedDict instances as plain mappings.

    The representer is registered on a subclass of ``Dumper`` created for
    this call. Remaining arguments are forwarded to ``yaml.dump``, whose
    result is returned.
    """
    class OrderedDumper(Dumper):
        """Local subclass that receives the OrderedDict representer."""

    OrderedDumper.add_representer(
        OrderedDict,
        lambda dumper, mapping: dumper.represent_mapping(
            yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG, mapping.items()),
    )
    return yaml.dump(data, stream, OrderedDumper, **kwds)
def dumps(obj):
    """Serialize ``obj`` to YAML with the module's ``Dumper``, UTF-8 encoded."""
    text = yaml.dump(obj, Dumper=Dumper)
    return text.encode('utf-8')
def __write_yaml(data, dump, file):
    """Escape ``data`` and serialize it with the supplied ``dump`` callable.

    :param data: structure to serialize; passed through ``__escape_str`` first
    :param dump: yaml-style dump callable accepting ``Dumper`` and
        ``default_flow_style`` keywords
    :param file: target path; when falsy nothing is written to disk
    :return: the value returned by ``dump``
    """
    escaped = __escape_str(data)

    # No target file: return whatever dump produces without touching disk.
    if not file:
        return dump(escaped, Dumper=Dumper, default_flow_style=False)

    create_file(file)
    with open(file, "w", encoding="utf-8") as stream:
        return dump(escaped, stream, Dumper=Dumper,
                    default_flow_style=False, encoding="utf-8")
def dump_ordered_yaml(ordered_data, output_filename, Dumper=yaml.Dumper):
    """Write ``ordered_data`` to ``output_filename`` as YAML, keeping key order.

    ``OrderedDict`` instances (at any nesting level) are emitted as ordinary
    YAML mappings whose keys appear in insertion order.

    The previous implementation registered its representer only for a
    function-local ``UnsortableOrderedDict`` class. No caller could create
    instances of that class, so the representer never applied to the data
    that was actually passed in.

    :param ordered_data: data structure to dump
    :param output_filename: path of the file to (over)write
    :param Dumper: yaml dumper class to derive from
    """
    class OrderedDumper(Dumper):
        """Local subclass that receives the OrderedDict representer."""

    def represent_ordered(dumper, mapping):
        # Handing over the item pairs (instead of the mapping object) keeps
        # the dumper from re-sorting the keys.
        return dumper.represent_mapping(
            yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG, mapping.items())

    OrderedDumper.add_representer(OrderedDict, represent_ordered)

    with open(output_filename, "w") as f:
        yaml.dump(ordered_data, f, Dumper=OrderedDumper)
def convert(self):
    """
    Convert the RAML info into the Word document.

    Loads ``self.inputname`` with ``ramlparser``, opens ``self.resourcedoc``
    as the document to work on, generates the sections for
    ``self.resource_name`` and saves the result to ``self.resource_out``.
    Problems while loading either input are printed and end the conversion
    early.

    :return: None
    """
    try:
        parsetree = ramlparser.load(self.inputname)
    except ValidationError as e:
        print('validation error:', e.errors)
        print("could not load file: error loading file")
        traceback.print_exc()
        return

    # make it a member..
    self.parsetree = parsetree
    self.list_x_resources(parsetree)

    try:
        self.document = Document(docx=self.resourcedoc)
    except Exception:
        # Narrowed from a bare ``except:`` so that KeyboardInterrupt and
        # SystemExit are no longer reported as a missing docx file.
        print("could not load file: ", self.resourcedoc)
        print("make sure that docx file exist..")
        return

    self.generate_sections(parsetree, self.resource_name)
    self.document.save(self.resource_out)
    print("document saved..", self.resource_out)
def convert(self): try: parsetree = ramlparser.load(self.inputname) except ValidationError as e: print 'validation error:', e.errors #print "could not load file: error loading file" #traceback.print_exc() return # make it a member.. self.parsetree = parsetree self.listXResources(parsetree) #print parsetree #output = dump(parsetree, Dumper=Dumper,default_flow_style=False) #output = dump(parsetree, Dumper=SafeDumper) #print output try: self.document = Document(docx=self.resourcedoc) except: print "could not load file: ", self.resourcedoc print "make sure that docx file exist in same directory as executable" return self.generateSections(parsetree, self.ResourceName) self.document.save(self.resourceout)
def sync_config_yaml(config_metadata_file_path, config_file_path):
    """
    Generate the configuration file from the configuration metadata file.

    For every command listed under ``commands`` in the metadata, each of its
    options is mapped to the ``default`` recorded for that option under the
    top-level ``options`` key. The result is written as block-style YAML.

    Args:
        config_metadata_file_path (str): configuration metadata file path
        config_file_path (str): configuration file path
    """
    class NoAliasDumper(yaml.Dumper):
        """Dumper that never emits anchors/aliases, to keep the YAML simple."""

        def ignore_aliases(self, _data):
            return True

    with open(config_metadata_file_path) as metadata_file:
        metadata = yaml.safe_load(metadata_file)

    config = {
        command_name: {
            option_name: metadata["options"][option_name]["default"]
            for option_name in command_dict["options"]
        }
        for command_name, command_dict in metadata["commands"].items()
    }

    rendered = yaml.dump(config, default_flow_style=False, Dumper=NoAliasDumper)
    with open(config_file_path, "w") as config_file:
        config_file.write(rendered)
def _yaml_save_roundtrip(filename, data):
    """
    Dump yaml using the RoundTripDumper and correct line spacing in the output file

    The dump is post-processed line by line: an empty line is re-inserted
    before a comment that follows a much deeper indented line, and some
    whitespace-only lines are dropped. The result is written to
    ``filename + '.yaml'``.

    :param filename: target path without the ``.yaml`` extension
    :param data: data structure to dump
    """
    sdata = yaml.dump(data, Dumper=yaml.RoundTripDumper, version=yaml_version, indent=indent_spaces, block_seq_indent=2, width=12288, allow_unicode=True)

    ldata = sdata.split('\n')
    rdata = []
    for index, line in enumerate(ldata):
        # Fix for ruamel.yaml handling: Reinsert empty line before comment of next section
        if len(line.lstrip()) > 0 and line.lstrip()[0] == '#':
            # Indentation (in leading spaces) of the comment and of the previous line.
            # NOTE(review): for index 0, ldata[index-1] is the LAST line of the dump.
            indentcomment = len(line) - len(line.lstrip(' '))
            indentprevline = len(ldata[index-1]) - len(ldata[index-1].lstrip(' '))
            # Previous line is at least two indent levels deeper than the comment.
            if indentprevline - indentcomment >= 2*indent_spaces:
                rdata.append('')
            rdata.append(line)
        # Fix for ruamel.yaml handling: Remove empty line with spaces that have been inserted
        elif line.strip() == '' and line != '':
            # A whitespace-only line is kept only when the previous line is not
            # a truly empty line; otherwise it is dropped.
            if ldata[index-1] != '':
                rdata.append(line)
        else:
            rdata.append(line)

    sdata = '\n'.join(rdata)
    # Drop a single leading newline, if present.
    # NOTE(review): sdata[0] raises IndexError if the result is empty.
    if sdata[0] == '\n':
        sdata =sdata[1:]

    with open(filename+'.yaml', 'w') as outfile:
        outfile.write( sdata )
def dump(data, stream=None, safe=False, many=False, **kwargs):
    """Serialize ``data`` with the module's custom dumpers via ``yaml.dump_all``.

    :param data: a single document, or an iterable of documents if ``many``
    :param stream: optional stream to write to
    :param safe: use ``SafeCustomDumper`` instead of ``CustomDumper``
    :param many: treat ``data`` as a sequence of documents
    :param kwargs: forwarded to ``yaml.dump_all``; ``default_flow_style``
        defaults to False when not given
    :return: the result of ``yaml.dump_all``
    """
    if 'default_flow_style' not in kwargs:
        kwargs['default_flow_style'] = False

    if safe:
        dumper_cls = SafeCustomDumper
    else:
        dumper_cls = CustomDumper

    # dump_all expects an iterable of documents; wrap a single one.
    documents = data if many else [data]
    return yaml.dump_all(documents, stream, dumper_cls, **kwargs)
def get_YAML(ngtree):
    """Return ``ngtree`` dumped as block-style YAML using ``yaml.Dumper``."""
    return yaml.dump(
        ngtree,
        Dumper=yaml.Dumper,
        default_flow_style=False,
    )
def ordered_dump(data, stream=None, Dumper=yaml.Dumper, **kwds):
    """Dump ``data`` to YAML, emitting OrderedDict instances as plain mappings.

    Example: ``ordered_dump(data, Dumper=yaml.SafeDumper)``

    :param data: data structure to dump
    :param stream: optional stream to write to
    :param Dumper: yaml dumper class to derive from
    :param kwds: further keyword arguments for ``yaml.dump``
    :return: the result of ``yaml.dump``
    """
    def _represent_as_mapping(dumper, ordered):
        pairs = ordered.items()
        tag = yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG
        return dumper.represent_mapping(tag, pairs)

    class OrderedDumper(Dumper):
        """Local subclass that receives the OrderedDict representer."""

    OrderedDumper.add_representer(OrderedDict, _represent_as_mapping)
    return yaml.dump(data, stream, OrderedDumper, **kwds)
def save_results(self, verbose):
    """Average the accumulated error counts and write them to the output file.

    Each cumulative counter (``cum_fp``, ``cum_fn``, ``cum_wfp``,
    ``cum_wfn``) is divided by ``tested_frame_coutner``; all averages are 0
    when no frame was tested. The averages, the frame count and
    ``self.args_dict`` are dumped to ``self.output_file`` inside
    ``self.datapath``.

    :param verbose: when true, also print the frame count and the averages
    :return: 0
    """
    frame_count = self.tested_frame_coutner
    if frame_count > 0:
        ave_fp = self.cum_fp / frame_count
        ave_fn = self.cum_fn / frame_count
        ave_wfp = self.cum_wfp / frame_count
        ave_wfn = self.cum_wfn / frame_count
    else:
        ave_fp = 0.
        ave_fn = 0.
        ave_wfp = 0.
        ave_wfn = 0.

    if verbose:
        print("Tested frame count: {:d}".format(frame_count))
        print("Avg. false positives: {:.2f}\nAvg. false negatives: {:.2f}".format(
            ave_fp, ave_fn))
        print("Avg. weighted false positives: {:.2f}\nAvg. weighted false negatives: {:.2f}".format(
            ave_wfp, ave_wfn))

    out = {"average_false_positives": float(ave_fp),
           "average_false_negatives": float(ave_fn),
           "average_weighted_false_positives": float(ave_wfp),
           "average_weighted_false_negatives": float(ave_wfn),
           "tested_frame_count": frame_count,
           "args": self.args_dict}

    # ``with`` closes the file even when dump() raises; the explicit
    # open()/close() pair used previously leaked the handle in that case.
    out_path = os.path.join(self.datapath, self.output_file)
    with open(out_path, "w", encoding="utf_8") as out_file:
        dump(out, out_file, Dumper=Dumper)
    return 0
def format(self, record):
    """Render the attribute dict of ``record`` as YAML, followed by a ``---`` line."""
    attributes = vars(record)
    document = yaml.dump(attributes, Dumper=Dumper)
    return document + "\n---\n"
def save_data_yaml(self, filename=None, *args):
    '''
    Save or show the results in YAML (Python 2 code).

    Output goes to ``filename`` when given, otherwise to standard output;
    the chosen stream is kept in ``self.stream`` and is not closed here.
    Each positional argument is expected to be a ``(game_log, stats)`` pair.
    If anything fails while writing the pairs, all arguments are dumped in
    one go with ``yaml.dump_all`` instead.
    '''
    self.stream = open(filename, 'w') if filename else sys.stdout

    try:
        for game_log, stats in args:
            header = '### Stats per second of the log file %s:\n' % game_log
            print >> self.stream, header
            zero_note = '# Zero is significant (count missing lines): %s' % str(self.significantzero)
            print >> self.stream, zero_note
            print >> self.stream, yaml.dump(stats, default_flow_style=False, Dumper=Dumper)
            # YAML objects separator
            print >> self.stream, '---'
    except:
        print >> self.stream, yaml.dump_all(args, default_flow_style=False, Dumper=Dumper)
def dump_config(config_path, config, dumper=yaml.Dumper):
    """Write ``config`` as YAML to ``config_path`` (UTF-8).

    The parent directory of ``config_path`` is created when it is missing.

    :param config_path: path of the file to (over)write
    :param config: data structure to dump
    :param dumper: yaml dumper class passed to ``yaml.dump``
    """
    dirname = os.path.dirname(config_path)
    # ``dirname`` is '' for a bare filename such as "config.yaml";
    # os.makedirs('') raises, so only create a directory when there is one.
    if dirname and not os.path.exists(dirname):
        os.makedirs(dirname)

    with codecs.open(config_path, mode='w', encoding='utf-8') as stream:
        stream.write(to_utf8(yaml.dump(config, Dumper=dumper)))
def to_yaml(obj, stream=None, dumper_cls=yaml.Dumper, default_flow_style=False, **kwargs):
    """
    Convert ``obj`` with ``to_dict`` and dump the result as YAML.

    OrderedDict instances are emitted as plain mappings, and
    ``default_flow_style`` defaults to False.

    OrderedDict reference: http://stackoverflow.com/a/21912744
    default_flow_style reference: http://stackoverflow.com/a/18210750

    :param obj: python object to be serialized
    :param stream: stream to serialize to; optional
    :param dumper_cls: base Dumper class to extend
    :param default_flow_style: passed on to ``yaml.dump``
    :param kwargs: arguments to pass to to_dict
    :return: the result of ``yaml.dump`` for the given ``stream``
    """
    mapping_tag = yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG

    class OrderedDumper(dumper_cls):
        """Local subclass that receives the OrderedDict representer."""

    def represent_ordered(dumper, mapping):
        return dumper.represent_mapping(mapping_tag, mapping.items())

    OrderedDumper.add_representer(OrderedDict, represent_ordered)

    obj_dict = to_dict(obj, **kwargs)
    return yaml.dump(
        obj_dict,
        stream,
        OrderedDumper,
        default_flow_style=default_flow_style,
    )
def save(self, filename):
    """Dump this object to ``filename`` using the module's ``dump`` and ``Dumper``.

    The file is opened in text write mode, so existing content is replaced.
    """
    with open(filename, 'w') as yaml_file:
        dump(self, yaml_file, Dumper=Dumper)
def main():
    """Parse the settings and construct the ``MultiStereoApplication``.

    Steps:
      1. Parse the console-only arguments to find the settings file.
      2. If a settings file is named and exists, use its content to override
         the defaults; a named but missing file raises ``ValueError``.
      3. Parse the remaining arguments with the merged defaults.
      4. If requested, write the resulting settings back to the settings file.

    :raises ValueError: if the given settings file does not exist
    """
    Setting.generate_missing_shorthands()
    defaults = Setting.generate_defaults_dict()
    conf_parser = Setting.generate_parser(
        defaults, console_only=True,
        description="Use one or more .mp4 video files to perform calibration: " +
                    "find the cameras' intrinsics and/or extrinsics.")

    # ============== STORAGE/RETRIEVAL OF CONSOLE SETTINGS ===========================================#
    args, remaining_argv = conf_parser.parse_known_args()
    defaults[Setting.save_settings.name] = args.save_settings
    if args.settings_file:
        defaults[Setting.settings_file.name] = args.settings_file
        if osp.isfile(args.settings_file):
            # ``with`` closes the file even if loading fails.
            with open(args.settings_file, "r", encoding="utf-8") as file_stream:
                config_defaults = load(file_stream, Loader=Loader)
            # An empty settings file loads as None; treat it as "no overrides".
            if config_defaults:
                for key, value in config_defaults.items():
                    defaults[key] = value
        else:
            raise ValueError("Settings file not found at: {0:s}".format(args.settings_file))

    parser = Setting.generate_parser(defaults, parents=[conf_parser])
    args = parser.parse_args(remaining_argv)

    # process "special" setting values
    if args.folder == "!settings_file_location":
        if args.settings_file and osp.isfile(args.settings_file):
            args.folder = osp.dirname(args.settings_file)

    # save settings if prompted to do so
    if args.save_settings and args.settings_file:
        # vars(args) is the namespace's own dict: the two bookkeeping entries
        # are removed for the dump and put back afterwards.
        setting_dict = vars(args)
        with open(args.settings_file, "w", encoding="utf-8") as file_stream:
            file_name = setting_dict[Setting.save_settings.name]
            del setting_dict[Setting.save_settings.name]
            del setting_dict[Setting.settings_file.name]
            dump(setting_dict, file_stream, Dumper=Dumper)
        setting_dict[Setting.save_settings.name] = file_name
        # NOTE(review): the settings-file entry is restored as True, not as
        # the original path - kept as in the original code; confirm intent.
        setting_dict[Setting.settings_file.name] = True

    app = MultiStereoApplication(args)
def main():
    """Parse the settings, then run ``StereoMatcherApp.disparity2``.

    Steps:
      1. Parse the console-only arguments to find the settings file.
      2. If a settings file is named and exists, use its content (when not
         empty) to override the defaults; a named but missing file raises
         ``ValueError``.
      3. Parse the remaining arguments with the merged defaults.
      4. If requested, write the resulting settings back to the settings file.

    :raises ValueError: if the given settings file does not exist
    """
    Setting.generate_missing_shorthands()
    defaults = Setting.generate_defaults_dict()
    conf_parser = Setting.generate_parser(
        defaults, console_only=True,
        description="Test stereo algorithms on two image files.")

    # ============== STORAGE/RETRIEVAL OF CONSOLE SETTINGS ===========================================#
    args, remaining_argv = conf_parser.parse_known_args()
    defaults[Setting.save_settings.name] = args.save_settings
    if args.settings_file:
        defaults[Setting.settings_file.name] = args.settings_file
        if osp.isfile(args.settings_file):
            # ``with`` closes the file even if loading fails.
            with open(args.settings_file, "r", encoding="utf-8") as file_stream:
                config_defaults = load(file_stream, Loader=Loader)
            if config_defaults:
                for key, value in config_defaults.items():
                    defaults[key] = value
        else:
            raise ValueError("Settings file not found at: {0:s}".format(args.settings_file))

    parser = Setting.generate_parser(defaults, parents=[conf_parser])
    args = parser.parse_args(remaining_argv)

    # process "special" setting values
    if args.folder == "!settings_file_location":
        if args.settings_file and osp.isfile(args.settings_file):
            args.folder = osp.dirname(args.settings_file)

    # save settings if prompted to do so
    if args.save_settings and args.settings_file:
        # vars(args) is the namespace's own dict: the two bookkeeping entries
        # are removed for the dump and put back afterwards.
        setting_dict = vars(args)
        with open(args.settings_file, "w", encoding="utf-8") as file_stream:
            file_name = setting_dict[Setting.save_settings.name]
            del setting_dict[Setting.save_settings.name]
            del setting_dict[Setting.settings_file.name]
            dump(setting_dict, file_stream, Dumper=Dumper)
        setting_dict[Setting.save_settings.name] = file_name
        # NOTE(review): the settings-file entry is restored as True, not as
        # the original path - kept as in the original code; confirm intent.
        setting_dict[Setting.settings_file.name] = True

    app = StereoMatcherApp(args)
    app.disparity2()
def main():
    """Parse the settings, then run the synced or unsynced calibration.

    Steps:
      1. Parse the console-only arguments to find the settings file.
      2. If a settings file is named and exists, use its content to override
         the defaults; a named but missing file raises ``ValueError``.
      3. Parse the remaining arguments with the merged defaults.
      4. If requested, write the resulting settings back to the settings file.
      5. Run ``ApplicationUnsynced`` or ``ApplicationSynced`` depending on
         ``args.unsynced``.

    :return: 0
    :raises ValueError: if the given settings file does not exist
    """
    Setting.generate_missing_shorthands()
    defaults = Setting.generate_defaults_dict()
    conf_parser = Setting.generate_parser(
        defaults, console_only=True,
        description="Use one or more .mp4 video files to perform calibration: " +
                    "find the cameras' intrinsics and/or extrinsics.")

    # ============== STORAGE/RETRIEVAL OF CONSOLE SETTINGS ===========================================#
    args, remaining_argv = conf_parser.parse_known_args()
    defaults[Setting.save_settings.name] = args.save_settings
    if args.settings_file:
        defaults[Setting.settings_file.name] = args.settings_file
        if osp.isfile(args.settings_file):
            # ``with`` closes the file even if loading fails.
            with open(args.settings_file, "r", encoding="utf-8") as file_stream:
                config_defaults = load(file_stream, Loader=Loader)
            # An empty settings file loads as None; treat it as "no overrides".
            if config_defaults:
                for key, value in config_defaults.items():
                    defaults[key] = value
        else:
            raise ValueError("Settings file not found at: {0:s}".format(args.settings_file))

    parser = Setting.generate_parser(defaults, parents=[conf_parser])
    args = parser.parse_args(remaining_argv)

    # process "special" setting values
    if args.folder == "!settings_file_location":
        if args.settings_file and osp.isfile(args.settings_file):
            args.folder = osp.dirname(args.settings_file)

    # save settings if prompted to do so
    if args.save_settings and args.settings_file:
        # vars(args) is the namespace's own dict: the two bookkeeping entries
        # are removed for the dump and put back afterwards.
        setting_dict = vars(args)
        with open(args.settings_file, "w", encoding="utf-8") as file_stream:
            file_name = setting_dict[Setting.save_settings.name]
            del setting_dict[Setting.save_settings.name]
            del setting_dict[Setting.settings_file.name]
            dump(setting_dict, file_stream, Dumper=Dumper)
        setting_dict[Setting.save_settings.name] = file_name
        # NOTE(review): the settings-file entry is restored as True, not as
        # the original path - kept as in the original code; confirm intent.
        setting_dict[Setting.settings_file.name] = True

    if args.unsynced:
        app = ApplicationUnsynced(args)
        app.gather_frame_data()
        app.calibrate_time_reprojection(save_data=True)
    else:
        app = ApplicationSynced(args)
        app.gather_frame_data()
        app.run_calibration()
    return 0
def process_arguments(program_arguments_enum, program_help_description):
    """Build the argument namespace from console arguments and a settings file.

    Steps:
      1. Parse the console-only arguments to find the settings file.
      2. If a settings file is named and exists, use its content (when not
         empty) to override the defaults. A named but missing file is an
         error unless the settings are about to be saved.
      3. Parse the remaining arguments with the merged defaults.
      4. Replace the settings-file-location wildcard with the directory of
         the settings file, for the arguments that support it.
      5. If requested, write the resulting settings back to the settings file.

    :param program_arguments_enum: argument definitions handed to
        ``ArgumentProcessor``
    :param program_help_description: description shown in the program's help
    :return: the parsed argument namespace
    :raises ValueError: if the settings file is missing and saving was not
        requested
    """
    argproc = ArgumentProcessor(program_arguments_enum)
    defaults = argproc.generate_defaults_dict()
    # Use the caller's description; previously this parameter was ignored in
    # favour of a hard-coded text belonging to one specific program.
    conf_parser = argproc.generate_parser(
        defaults, console_only=True, description=program_help_description)

    # ============== STORAGE/RETRIEVAL OF CONSOLE SETTINGS ===========================================#
    args, remaining_argv = conf_parser.parse_known_args()
    defaults[ArgumentProcessor.save_settings_name] = args.save_settings
    if args.settings_file:
        defaults[ArgumentProcessor.settings_file_name] = args.settings_file
        if os.path.isfile(args.settings_file):
            # ``with`` closes the file even if loading fails.
            with open(args.settings_file, "r", encoding="utf-8") as file_stream:
                config_defaults = load(file_stream, Loader=Loader)
            if config_defaults:
                for key, value in config_defaults.items():
                    defaults[key] = value
        else:
            # A missing file is fine when it is about to be created by saving.
            if not args.save_settings:
                raise ValueError("Settings file not found at: {0:s}".format(args.settings_file))

    parser = argproc.generate_parser(defaults, parents=[conf_parser])
    args = parser.parse_args(remaining_argv)

    # process "special" setting values
    if args.settings_file and os.path.isfile(args.settings_file):
        for key in list(args.__dict__.keys()):
            if key in argproc.setting_file_location_args and args.__dict__[key] == \
                    Argument.setting_file_location_wildcard:
                args.__dict__[key] = os.path.dirname(args.settings_file)

    # save settings if prompted to do so
    if args.save_settings and args.settings_file:
        # vars(args) is the namespace's own dict: the two bookkeeping entries
        # are removed for the dump and put back afterwards.
        setting_dict = vars(args)
        with open(args.settings_file, "w", encoding="utf-8") as file_stream:
            file_name = setting_dict[ArgumentProcessor.save_settings_name]
            del setting_dict[ArgumentProcessor.save_settings_name]
            del setting_dict[ArgumentProcessor.settings_file_name]
            dump(setting_dict, file_stream, Dumper=Dumper, indent=3, default_flow_style=False)
        setting_dict[ArgumentProcessor.save_settings_name] = file_name
        # NOTE(review): the settings-file entry is restored as True, not as
        # the original path - kept as in the original code; confirm intent.
        setting_dict[ArgumentProcessor.settings_file_name] = True

    return args