Python yaml 模块,dump_all() 实例源码

我们从Python开源项目中,提取了以下15个代码示例,用于说明如何使用yaml.dump_all()

项目:holo    作者:TheEnigmaBlade    | 项目源码 | 文件源码
def save_current_file():
    print("Saving current file: {}".format(current_file))
    def order_dict(d):
        return OrderedDict([
            ("title", d["title"]),
            ("type", d["type"]),
            ("has_source", d["has_source"]),
            ("info", OrderedDict([
                (key, d["info"][key] if key in d["info"] else "") for key in info_keys
            ])),
            ("streams", OrderedDict([
                (key, d["streams"][key] if key in d["streams"] else "") for key in stream_keys
            ]))
        ])

    try:
        sorted_docs = [order_dict(doc) for doc in current_docs]
        with open(current_file, "w", encoding="UTF-8") as f:
            yaml.dump_all(sorted_docs, f, default_flow_style=False, indent=4, allow_unicode=True)
    except:
        from traceback import print_exc
        print_exc()
        return False
    return True
项目:analytics-platform-ops    作者:ministryofjustice    | 项目源码 | 文件源码
def dump( data, dst=unicode, safe=False,
        force_embed=False, vspacing=None, string_val_style=None, **pyyaml_kws ):
    buff = io.BytesIO()
    Dumper = PrettyYAMLDumper if safe else UnsafePrettyYAMLDumper
    Dumper = ft.partial(Dumper, force_embed=force_embed, string_val_style=string_val_style)
    yaml.dump_all( [data], buff, Dumper=Dumper,
        default_flow_style=False, allow_unicode=True, encoding='utf-8', **pyyaml_kws )

    if vspacing is not None:
        dump_add_vspacing(buff, vspacing)

    buff = buff.getvalue()
    if dst is bytes: return buff
    elif dst is unicode: return buff.decode('utf-8')
    else:
        try: dst.write(b'') # tests if dst is unicode- or bytestream
        except: dst.write(buff.decode('utf-8'))
        else: dst.write(buff)
项目:ooniprobe-debian    作者:TheTorProject    | 项目源码 | 文件源码
def test_represent_scapy(self):
        data = IP() / UDP()
        yaml.dump_all([data], Dumper=OSafeDumper)
项目:piss    作者:AOSC-Dev    | 项目源码 | 文件源码
def generate_config(args):
    setup_yaml()
    if args.existing:
        existing = next(yaml.safe_load_all(open(args.existing, 'r', encoding='utf-8')))
    else:
        existing = None
    cfg, failed = chores.generate_chore_config(args.db, args.bookmark, existing)
    with open(args.output, 'w', encoding='utf-8') as f:
        yaml.dump_all((cfg, failed), f, default_flow_style=False)
    logging.info('Done.')
项目:sesame-paste-noodle    作者:aissehust    | 项目源码 | 文件源码
def save(self, ostream):
        """
        Save model to the stream.
        """
        allLayer = []
        for layer in self.nextLayer():
            allLayer.append(layer)
        yaml.dump_all(allLayer, ostream)
项目:fuel-ccp-tests    作者:openstack    | 项目源码 | 文件源码
def write_content(self, content=None):
        if content:
            self.content = content
        self.__documents[self.__document_id] = self.content

        def representer(dumper, data):
            """Represents a dict key started with '!' as a YAML tag

            Assumes that there is only one !tag in the dict at the
            current indent.

            Python object:
            {"!unknown_tag": ["some content", ]}

            Resulting yaml:
            !unknown_tag
            - some content
            """
            key = data.keys()[0]
            if key.startswith("!"):
                value = data[key]
                if type(value) is dict:
                    node = dumper.represent_mapping(key, value)
                elif type(value) is list:
                    node = dumper.represent_sequence(key, value)
                else:
                    node = dumper.represent_scalar(key, value)
            else:
                node = dumper.represent_mapping(u'tag:yaml.org,2002:map', data)
            return node

        yaml.add_representer(dict, representer)
        with self.__get_file("w") as file_obj:
            yaml.dump_all(self.__documents, file_obj,
                          default_flow_style=self.default_flow_style,
                          default_style=self.default_style)
项目:holo    作者:TheEnigmaBlade    | 项目源码 | 文件源码
def create_season_config(config, db, output_file):
    info("Checking for new shows")
    shows = _get_primary_source_shows(config)

    debug("Outputting new shows")
    with open(output_file, "w", encoding="utf-8") as f:
        yaml.dump_all(shows, f, explicit_start=True, default_flow_style=False)
项目:promenade    作者:att-comdev    | 项目源码 | 文件源码
def _write(output_dir, docs):
    with open(os.path.join(output_dir, 'certificates.yaml'), 'w') as f:
        # Don't use safe_dump_all so we can block format certificate data.
        yaml.dump_all(
            docs,
            stream=f,
            default_flow_style=False,
            explicit_start=True,
            indent=2)
项目:MUBench    作者:stg-tud    | 项目源码 | 文件源码
def write_yamls(data: List[Dict], file: str = None):
    return __write_yaml(data, yaml.dump_all, file)
项目:tcp-qa    作者:Mirantis    | 项目源码 | 文件源码
def write_content(self, content=None):
        if content:
            self.content = content
        self.__documents[self.__document_id] = self.content

        def representer(dumper, data):
            """Represents a dict key started with '!' as a YAML tag

            Assumes that there is only one !tag in the dict at the
            current indent.

            Python object:
            {"!unknown_tag": ["some content", ]}

            Resulting yaml:
            !unknown_tag
            - some content
            """
            key = data.keys()[0]
            if key.startswith("!"):
                value = data[key]
                if type(value) is dict:
                    node = dumper.represent_mapping(key, value)
                elif type(value) is list:
                    node = dumper.represent_sequence(key, value)
                else:
                    node = dumper.represent_scalar(key, value)
            else:
                node = dumper.represent_mapping(u'tag:yaml.org,2002:map', data)
            return node

        yaml.add_representer(dict, representer)
        with self.__get_file("w") as file_obj:
            yaml.dump_all(self.__documents, file_obj,
                          default_flow_style=self.default_flow_style,
                          default_style=self.default_style)
项目:picasso    作者:jungmannlab    | 项目源码 | 文件源码
def save_info(path, info, default_flow_style=False):
    with open(path, 'w') as file:
        _yaml.dump_all(info, file, default_flow_style=default_flow_style)
项目:restcli    作者:dustinrohde    | 项目源码 | 文件源码
def dump(data, stream=None, safe=False, many=False, **kwargs):
    kwargs.setdefault('default_flow_style', False)
    Dumper = SafeCustomDumper if safe else CustomDumper
    if not many:
        data = [data]
    return yaml.dump_all(data, stream, Dumper, **kwargs)
项目:rvmi-rekall    作者:fireeye    | 项目源码 | 文件源码
def safe_dump(data, **kwargs):
    kwargs["default_flow_style"] = False
    return yaml.dump_all(
        [data], None, Dumper=PrettyPrinterDumper, **kwargs)
项目:ooniprobe-debian    作者:TheTorProject    | 项目源码 | 文件源码
def safe_dump(data, stream=None, **kw):
    """
    Safely dump to a yaml file the specified data.
    """
    return yaml.dump_all([data], stream, Dumper=OSafeDumper, **kw)
项目:yq    作者:kislyuk    | 项目源码 | 文件源码
def main(args=None):
    args, jq_args = parser.parse_known_args(args=args)
    if sys.stdin.isatty() and not args.files:
        return parser.print_help()

    try:
        # Note: universal_newlines is just a way to induce subprocess to make stdin a text buffer and encode it for us
        jq = subprocess.Popen(["jq"] + jq_args + [args.jq_filter],
                              stdin=subprocess.PIPE,
                              stdout=subprocess.PIPE if args.yaml_output else None,
                              universal_newlines=True)
    except OSError as e:
        msg = "yq: Error starting jq: {}: {}. Is jq installed and available on PATH?"
        parser.exit(msg.format(type(e).__name__, e))

    try:
        input_streams = args.files if args.files else [sys.stdin]
        if args.yaml_output:
            # TODO: enable true streaming in this branch (with asyncio, asyncproc, a multi-shot variant of
            # subprocess.Popen._communicate, etc.)
            # See https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python
            input_docs = []
            for input_stream in input_streams:
                input_docs.extend(yaml.load_all(input_stream, Loader=OrderedLoader))
            input_payload = "\n".join(json.dumps(doc, cls=JSONDateTimeEncoder) for doc in input_docs)
            jq_out, jq_err = jq.communicate(input_payload)
            json_decoder = json.JSONDecoder(object_pairs_hook=OrderedDict)
            yaml.dump_all(decode_docs(jq_out, json_decoder), stream=sys.stdout, Dumper=OrderedDumper, width=args.width,
                          allow_unicode=True, default_flow_style=False)
        else:
            for input_stream in input_streams:
                for doc in yaml.load_all(input_stream, Loader=OrderedLoader):
                    json.dump(doc, jq.stdin, cls=JSONDateTimeEncoder)
                    jq.stdin.write("\n")
            jq.stdin.close()
            jq.wait()
        for input_stream in input_streams:
            input_stream.close()
        exit(jq.returncode)
    except Exception as e:
        parser.exit("yq: Error running jq: {}: {}.".format(type(e).__name__, e))