Python yaml 模块,safe_dump() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用yaml.safe_dump()

项目:steelscript-appresponse    作者:riverbed    | 项目源码 | 文件源码
def find_by_id(self, id_):
        """Return the ServiceDef object for the service identified by ``id_``.

        The service name and version are the last two ``/``-separated
        components of ``id_``.  When the matching definition file is not
        present yet, the definition is requested through ``self.connection``
        and written out as YAML before it is loaded.

        :param str id_: uri of the service definition
        """
        _prefix, svc_name, svc_version = id_.rsplit('/', 2)
        relative_path, absolute_path = self.get_fnames(svc_name, svc_version)

        if not self.ss_dir.isfile(relative_path):
            # No local copy yet: fetch the definition and store it as YAML.
            response = self.connection.request(method='GET', path=id_)
            with open(absolute_path, 'w+') as definition_file:
                yaml.safe_dump(response.json(), definition_file)

        return ServiceDef.create_from_file(absolute_path)
项目:tuning-box    作者:openstack    | 项目源码 | 文件源码
def format_output(output, format_):
    """Render ``output`` as a string in the requested format.

    ``'plain'`` returns an empty string for ``None`` and returns text values
    as they are (UTF-8 encoded when ``text_type`` is not ``str``); every
    other value falls back to JSON.  ``'json'`` uses ``json.dumps`` and
    ``'yaml'`` uses ``yaml.safe_dump`` in block style.

    :raises RuntimeError: if ``format_`` is not one of the known formats
    """
    if format_ == 'plain':
        if output is None:
            return ''
        if isinstance(output, text_type):
            return output if text_type is str else output.encode('utf-8')
        # Numbers, booleans, lists and dicts are represented as JSON.
        format_ = 'json'

    if format_ == 'yaml':
        # Using safe_dump here is crucial: PyYAML emits "!!python/unicode"
        # objects from unicode strings by default.
        return yaml.safe_dump(output, default_flow_style=False)
    if format_ == 'json':
        return json.dumps(output)
    raise RuntimeError("Unknown format '{}'".format(format_))
项目:kelctl    作者:kelproject    | 项目源码 | 文件源码
def update_release(channel):
    """
    Update release manifests.

    Reads ``cluster.yml`` from the current directory, lets
    ``configure.release`` refresh its release information and rewrites the
    file when the release version changed.  When the version is unchanged a
    message is printed and the process exits with status 0.

    :param channel: release channel to use; when ``None`` the channel stored
        in ``cluster.yml`` is used.
    """
    if not os.path.exists("cluster.yml"):
        # NOTE(review): presumably error() aborts the command -- confirm.
        error("no cluster.yml found. Did you configure?")
    with open("cluster.yml") as fp:
        # safe_load instead of yaml.load(): load() without an explicit Loader
        # is deprecated and rejected by PyYAML >= 6, and this file is written
        # back with safe_dump below, so plain-data loading is sufficient.
        config = yaml.safe_load(fp)
    if channel is None:
        channel = config["release"]["channel"]
    current_version = config["release"]["version"]
    configure.release(config, channel)
    if current_version == config["release"]["version"]:
        click.echo("No updates available for {} channel".format(channel))
        sys.exit(0)
    with open("cluster.yml", "w") as fp:
        fp.write(yaml.safe_dump(config, default_flow_style=False))
    click.echo("Updated config to {} in {} channel".format(config["release"]["version"], config["release"]["channel"]))
项目:fluffy    作者:m4ce    | 项目源码 | 文件源码
def save(self):
        """Back up the services file, then rewrite it as YAML.

        An existing file is first copied to ``<db>.bak``; if that copy fails
        the error is logged and nothing is written.  Entries whose key is in
        ``self.reserved_services`` are left out of the saved document.
        Failures while writing are logged rather than raised.
        """
        try:
            logger.debug("Backing up services")
            if os.path.exists(self._db):
                backup_path = "{}.bak".format(self._db)
                shutil.copyfile(self._db, backup_path)
        except Exception:
            logger.exception("Failed to backup services")
            return

        try:
            logger.debug("Saving services")
            with open(self._db, 'w') as db_file:
                persisted = {
                    name: service
                    for name, service in self._services.iteritems()
                    if name not in self.reserved_services
                }
                document = yaml.safe_dump(
                    persisted, default_flow_style=False, explicit_start=True)
                db_file.write(document)
        except Exception:
            logger.exception("Failed to save services")
项目:fluffy    作者:m4ce    | 项目源码 | 文件源码
def save(self):
        """Back up the addressbook file, then rewrite it as YAML.

        An existing file is first copied to ``<db>.bak``; if that copy fails
        the error is logged and nothing is written.  Entries whose key is in
        ``self.reserved_addresses`` are left out of the saved document.
        Failures while writing are logged rather than raised.
        """
        try:
            logger.debug("Backing up addressbook")
            if os.path.exists(self._db):
                backup_path = "{}.bak".format(self._db)
                shutil.copyfile(self._db, backup_path)
        except Exception:
            logger.exception("Failed to backup addressbook")
            return

        try:
            logger.debug("Saving addressbook")
            with open(self._db, 'w') as db_file:
                persisted = {
                    name: address
                    for name, address in self._addressbook.iteritems()
                    if name not in self.reserved_addresses
                }
                document = yaml.safe_dump(
                    persisted, default_flow_style=False, explicit_start=True)
                db_file.write(document)
        except Exception:
            logger.exception("Failed to save addressbook")
项目:fluffy    作者:m4ce    | 项目源码 | 文件源码
def save(self):
        """Persist the checks to disk.

        An existing file is first copied to ``<db>.bak``; if that copy fails
        the error is logged and nothing is written.  ``self._checks`` is then
        dumped as block-style YAML with an explicit document start.  Failures
        while saving are logged rather than raised.
        """

        try:
            logger.debug("Backing up checks")
            if os.path.exists(self._db):
                shutil.copyfile(self._db, "{}.bak".format(self._db))
        except Exception:
            logger.exception("Failed to backup checks")
            return

        try:
            logger.debug("Saving checks")
            # Serialize before opening: open(..., 'w') truncates the file, so
            # a dump failure has to happen before that point.
            document = yaml.safe_dump(self._checks,
                                      default_flow_style=False, explicit_start=True)
            with open(self._db, 'w') as f:
                f.write(document)
        except Exception:
            # Not a bare ``except:`` -- that would also swallow
            # KeyboardInterrupt and SystemExit.
            logger.exception("Failed to save checks")
项目:fluffy    作者:m4ce    | 项目源码 | 文件源码
def save(self):
        """Persist the chains to disk.

        An existing file is first copied to ``<db>.bak``; if that copy fails
        the error is logged and nothing is written.  ``self._tables`` is then
        dumped as block-style YAML with an explicit document start.  Failures
        while saving are logged rather than raised.
        """

        try:
            logger.debug("Backing up chains")
            if os.path.exists(self._db):
                shutil.copyfile(self._db, "{}.bak".format(self._db))
        except Exception:
            logger.exception("Failed to backup chains")
            return

        try:
            logger.debug("Saving chains")
            # Serialize before opening: open(..., 'w') truncates the file, so
            # a dump failure has to happen before that point.
            document = yaml.safe_dump(self._tables,
                                      default_flow_style=False, explicit_start=True)
            with open(self._db, 'w') as f:
                f.write(document)
        except Exception:
            # Not a bare ``except:`` -- that would also swallow
            # KeyboardInterrupt and SystemExit.
            logger.exception("Failed to save chains")
项目:fluffy    作者:m4ce    | 项目源码 | 文件源码
def save(self):
        """Back up the interfaces file, then rewrite it as YAML.

        An existing file is first copied to ``<db>.bak``; if that copy fails
        the error is logged and nothing is written.  Entries whose key is in
        ``self.reserved_interfaces`` are left out of the saved document.
        Failures while writing are logged rather than raised.
        """
        try:
            logger.debug("Backing up interfaces")
            if os.path.exists(self._db):
                backup_path = "{}.bak".format(self._db)
                shutil.copyfile(self._db, backup_path)
        except Exception:
            logger.exception("Failed to backup interfaces")
            return

        try:
            logger.debug("Saving interfaces")
            with open(self._db, 'w') as db_file:
                persisted = {
                    name: interface
                    for name, interface in self._interfaces.iteritems()
                    if name not in self.reserved_interfaces
                }
                document = yaml.safe_dump(
                    persisted, default_flow_style=False, explicit_start=True)
                db_file.write(document)
        except Exception:
            logger.exception("Failed to save interfaces")
项目:zgtoolkits    作者:xuzhougeng    | 项目源码 | 文件源码
def _update_system_file(system_file, name, new_kvs):
    """Update the bcbio_system.yaml file with new resource information.

    An existing file is copied to a timestamped ``.bak`` file before it is
    read; a missing file starts from an empty configuration.  The entries of
    ``new_kvs`` are merged into ``resources[name]`` (the section is created
    when absent) and the configuration is written back.

    :param system_file: path of the system YAML file
    :param name: resource name under the ``resources`` section
    :param new_kvs: dict of key/value pairs to set for that resource
    """
    if os.path.exists(system_file):
        bak_file = system_file + ".bak%s" % datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
        shutil.copyfile(system_file, bak_file)
        with open(system_file) as in_handle:
            # safe_load: yaml.load() without a Loader is deprecated and
            # rejected by PyYAML >= 6.  ``or {}`` covers an empty file, which
            # loads as None.
            config = yaml.safe_load(in_handle) or {}
    else:
        utils.safe_makedir(os.path.dirname(system_file))
        config = {}
    # ``or {}`` also covers a ``resources:`` key with no value.
    resources = dict(config.get("resources") or {})
    if name in resources:
        resources[name].update(new_kvs)
    else:
        resources[name] = new_kvs
    config["resources"] = resources
    with open(system_file, "w") as out_handle:
        yaml.safe_dump(config, out_handle, default_flow_style=False, allow_unicode=False)
项目:deckhand    作者:att-comdev    | 项目源码 | 文件源码
def _setUp(self):
        """Write a policy file into a temp dir and re-initialise policy.

        The rules parsed from ``fake_policy.policy_data`` are completed via
        ``add_missing_default_rules`` and dumped to ``policy.yaml``; the
        ``oslo_policy`` configuration is then patched to use that file and
        ``deckhand.policy`` is reset and initialised (and reset again on
        cleanup).  When ``self.verify`` is set, the policy verification hook
        is installed as well.
        """
        super(RealPolicyFixture, self)._setUp()
        self.policy_dir = self.useFixture(fixtures.TempDir())
        self.policy_file = os.path.join(self.policy_dir.path, 'policy.yaml')

        # Load the fake_policy data and add the missing default rules.
        rules = yaml.safe_load(fake_policy.policy_data)
        self.add_missing_default_rules(rules)
        with open(self.policy_file, 'w') as policy_fh:
            yaml.safe_dump(rules, policy_fh)

        policy_opts.set_defaults(CONF)
        conf_patch = ConfPatcher(policy_dirs=[],
                                 policy_file=self.policy_file,
                                 group='oslo_policy')
        self.useFixture(conf_patch)

        deckhand.policy.reset()
        deckhand.policy.init()
        self.addCleanup(deckhand.policy.reset)

        if self.verify:
            self._install_policy_verification_hook()
项目:deckhand    作者:att-comdev    | 项目源码 | 文件源码
def process_response(self, req, resp, resource):
        """Converts responses to ``application/x-yaml`` content type.

        The ``Content-Type`` header is set unless the status is
        ``204 No Content``.  For each of the ``body`` and ``data`` attributes
        present on ``resp``, a value that is (or JSON-decodes to) a dict is
        replaced by its YAML dump, and a list/tuple by a multi-document YAML
        dump; anything else is left untouched.
        """
        if resp.status != '204 No Content':
            resp.set_header('Content-Type', 'application/x-yaml')

        for attr in ('body', 'data'):
            if hasattr(resp, attr):
                payload = getattr(resp, attr)
                try:
                    payload = json.loads(payload)
                except (TypeError, ValueError):
                    # Not JSON text (or not a string at all): use it as-is.
                    pass

                if isinstance(payload, dict):
                    dumped = yaml.safe_dump(payload)
                elif isinstance(payload, (list, tuple)):
                    dumped = yaml.safe_dump_all(payload)
                else:
                    continue
                setattr(resp, attr, dumped)
项目:pyrgg    作者:sepandhaghighi    | 项目源码 | 文件源码
def json_to_yaml(filename):
    '''
    Convert ``<filename>.json`` into ``<filename>.yaml``.

    Prints an error message instead of raising when a file cannot be found.

    :param filename: file name without extension
    :type filename: str
    :return: None
    '''
    try:
        # Context managers close both files even when parsing or dumping
        # raises; the original leaked the handles in that case.
        with open(filename + ".json", "r") as json_file:
            json_data = json.load(json_file)
        with open(filename + ".yaml", "w") as yaml_file:
            yaml.safe_dump(json_data, yaml_file, default_flow_style=False)
    except FileNotFoundError:
        print("[Error] Bad Input File")
项目:heat-agents    作者:openstack    | 项目源码 | 文件源码
def test_run_heat_config(self):
        """Run the docker-compose hook and check the compose file it writes.

        The hook is executed with ``HEAT_DOCKER_COMPOSE_WORKING`` and
        ``HEAT_SHELL_CONFIG`` set in its environment; it must exit with 0 and
        ``abcdef001/docker-compose.yml`` must equal the block-style YAML dump
        of the first config entry.
        """
        with self.write_config_file(self.data) as config_file:
            env = os.environ.copy()
            env['HEAT_DOCKER_COMPOSE_WORKING'] = self.docker_compose_dir.join()
            env['HEAT_SHELL_CONFIG'] = config_file.name

            command = [self.heat_config_docker_compose_path]
            returncode, stdout, stderr = self.run_cmd(command, env)

            self.assertEqual(0, returncode, stderr)

            compose_yml = self.docker_compose_dir.join(
                'abcdef001/docker-compose.yml')
            with open(compose_yml) as compose_file:
                expected = yaml.safe_dump(self.data[0].get('config'),
                                          default_flow_style=False)
                self.assertEqual(expected, compose_file.read())
项目:freckles    作者:makkus    | 项目源码 | 文件源码
def output(python_object, format="raw", pager=False):
    """Print ``python_object`` through click in the requested format.

    :param python_object: the object to print
    :param format: one of ``'yaml'``, ``'json'``, ``'raw'`` (``str()``) or
        ``'pformat'`` (``pprint.pformat``)
    :param pager: when true, the text is shown via ``click.echo_via_pager``
        instead of ``click.echo``
    :raises Exception: if ``format`` is not a supported value
    """
    if format == 'yaml':
        output_string = yaml.safe_dump(python_object, default_flow_style=False, encoding='utf-8', allow_unicode=True)
    elif format == 'json':
        # sort_keys is a boolean flag; the original passed 4 (apparently
        # copied from ``indent``), which only worked because it is truthy.
        output_string = json.dumps(python_object, sort_keys=True, indent=4)
    elif format == 'raw':
        output_string = str(python_object)
    elif format == 'pformat':
        output_string = pprint.pformat(python_object)
    else:
        raise Exception("No valid output format provided. Supported: 'yaml', 'json', 'raw', 'pformat'")

    if pager:
        click.echo_via_pager(output_string)
    else:
        click.echo(output_string)
项目:bark    作者:kylerbrown    | 项目源码 | 文件源码
def write_metadata(path, meta='.meta.yaml', **params):
    """Write the YAML meta file that accompanies a dataset.

    Args:
        path (str): path to the **dataset** (not the meta file).  For a
            directory the meta file is placed inside it, named ``meta``
            without its first character; otherwise ``meta`` is appended to
            ``path``.  An existing meta file is overwritten.
        meta (str): suffix identifying the dataset's meta file
        **params: all other keyword arguments are written as dataset
            attributes, except ``n_channels`` and ``n_samples``, which are
            dropped.  NumPy arrays and scalars are converted with
            ``tolist()`` first.
    """
    for dropped in ('n_channels', 'n_samples'):
        params.pop(dropped, None)

    metafile = os.path.join(path, meta[1:]) if os.path.isdir(path) else path + meta

    attributes = {
        key: value.tolist() if isinstance(value, (np.ndarray, np.generic)) else value
        for key, value in params.items()
    }
    document = yaml.safe_dump(attributes, default_flow_style=False)
    with codecs.open(metafile, 'w', encoding='utf-8') as yaml_file:
        yaml_file.write(document)
项目:kolla-ansible    作者:openstack    | 项目源码 | 文件源码
def main():
    """Merge an old and a new password file into a final one.

    Command-line arguments ``--old``, ``--new`` and ``--final`` name the
    files.  The result holds every key of the new file, with values taken
    from the old file wherever the old file defines the key.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--old", help="old password file", required=True)
    parser.add_argument("--new", help="new password file", required=True)
    parser.add_argument("--final", help="merged password file", required=True)
    args = parser.parse_args()

    # An empty YAML file loads as None; fall back to an empty mapping so the
    # merge below does not fail on it.
    with open(args.old, "r") as old_file:
        old_passwords = yaml.safe_load(old_file) or {}

    with open(args.new, "r") as new_file:
        new_passwords = yaml.safe_load(new_file) or {}

    # Old values win over freshly generated ones.
    new_passwords.update(old_passwords)

    with open(args.final, "w") as destination:
        yaml.safe_dump(new_passwords, destination, default_flow_style=False)
项目:kolla-ansible    作者:openstack    | 项目源码 | 文件源码
def main():
    """Merge an old and a new password file into a final one.

    Command-line arguments ``--old``, ``--new`` and ``--final`` name the
    files.  The result holds every key of the new file, with values taken
    from the old file wherever the old file defines the key.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--old", help="old password file", required=True)
    parser.add_argument("--new", help="new password file", required=True)
    parser.add_argument("--final", help="merged password file", required=True)
    args = parser.parse_args()

    # An empty YAML file loads as None; fall back to an empty mapping so the
    # merge below does not fail on it.
    with open(args.old, "r") as old_file:
        old_passwords = yaml.safe_load(old_file) or {}

    with open(args.new, "r") as new_file:
        new_passwords = yaml.safe_load(new_file) or {}

    # Old values win over freshly generated ones.
    new_passwords.update(old_passwords)

    with open(args.final, "w") as destination:
        yaml.safe_dump(new_passwords, destination, default_flow_style=False)
项目:gru    作者:similarweb    | 项目源码 | 文件源码
def serialized(obj, status=200):
    """Build a response carrying ``obj`` in the format chosen by the request.

    The format comes from ``get_serializer()``: ``'json'`` and ``'yaml'``
    are supported, ``None`` makes this function return ``None``, and any
    other value aborts with 404.

    :param obj: the object to serialize
    :param status: HTTP status code of the response (default 200)
    :return: the response object, or ``None`` when no serializer applies
    """
    fmt = get_serializer()
    if fmt == 'json':
        ser = json.dumps
        ct = 'application/json'
    elif fmt == 'yaml':
        ser = yaml.safe_dump
        ct = 'text/plain+yaml'  # For interop with browsers
    elif fmt is None:
        return None
    else:
        abort(404)
    data = ser(obj)
    # Honour the caller's status; the original hard-coded 200 here and
    # silently ignored the ``status`` argument.
    resp = make_response(data, status)
    resp.headers['Content-Type'] = ct
    return resp


# Authentication
项目:console    作者:laincloud    | 项目源码 | 文件源码
def base_update_meta(self, meta_version, force=False):
        """Fetch the meta for ``meta_version`` and store it on this object.

        :param meta_version: version to update to; it is normalized first
        :param force: update even when the version equals the current one
        :return: ``'meta_version is already latest'`` when nothing needs
            doing, ``None`` when the fetched meta is not a dict, otherwise
            ``'meta updated'``
        :raises InvalidMetaVersion: if the version cannot be normalized
        :raises InvalidLainYaml: if the meta's appname differs from
            ``self.appname``
        """
        try:
            meta_version = normalize_meta_version(meta_version)
        except Exception as e:
            # ``except Exception as e`` works on Python 2.6+ and 3; the
            # original ``except Exception, e`` form is Python 2 only.
            raise InvalidMetaVersion(e)
        if meta_version == self.meta_version and not force:
            return 'meta_version is already latest'
        meta = self.fetch_meta(meta_version)
        if not isinstance(meta, dict):
            return None

        self.check_giturl(meta, update=True)
        meta['giturl'] = self.giturl

        self.meta = yaml.safe_dump(meta, default_style='"')
        self.meta_version = meta_version
        if self.appname != meta['appname']:
            raise InvalidLainYaml("appname dont match: %s" % meta)
        self.save()
        return 'meta updated'
项目:DevOps    作者:YoLoveLife    | 项目源码 | 文件源码
def _write_galaxy_install_info(self):
        """
        Writes a YAML-formatted file to the role's meta/ directory
        (named .galaxy_install_info) which contains some information
        we can use later for commands like 'list' and 'info'.

        Returns True on success and False when dumping the data fails.
        """

        info = dict(
            version=self.version,
            install_date=datetime.datetime.utcnow().strftime("%c"),
        )
        info_path = os.path.join(self.path, self.META_INSTALL)
        with open(info_path, 'w+') as f:
            try:
                # safe_dump returns None when it is given a stream, so this
                # leaves _install_info set to None.
                self._install_info = yaml.safe_dump(info, f)
            except Exception:
                # Narrowed from a bare ``except:`` so KeyboardInterrupt and
                # SystemExit are no longer turned into ``return False``.
                return False

        return True
项目:MDT    作者:cbclab    | 项目源码 | 文件源码
def update_write_config(config_file, update_dict):
    """Apply ``update_dict`` to a YAML configuration file on disk.

    A missing configuration file is created empty first.  Each key of
    ``update_dict`` selects a section loader via ``get_section_loader`` and
    that loader merges the value into the loaded configuration, which is
    then written back.

    Args:
        config_file (str): the location of the config file to update
        update_dict (dict): the items to update in the config file
    """
    if not os.path.exists(config_file):
        # Touch the file so the read below always finds one.
        open(config_file, 'a').close()

    with open(config_file, 'r') as f:
        loaded = yaml.safe_load(f.read())
    # An empty file loads as None.
    config_dict = loaded or {}

    for key, value in update_dict.items():
        get_section_loader(key).update(config_dict, value)

    with open(config_file, 'w') as f:
        yaml.safe_dump(config_dict, f)
项目:vidcutter    作者:ozmartian    | 项目源码 | 文件源码
def toYAML(self, **options):
        """ Dump this Munch as YAML. `yaml.safe_dump()` is used unless a
            `Dumper` option is given, in which case `yaml.dump()` is used.
            Defaults are `indent=4` and `default_flow_style=False`; any
            keyword option overrides them and is passed through to PyYAML.

            >>> b = Munch(foo=['bar', Munch(lol=True)], hello=42)
            >>> import yaml
            >>> yaml.safe_dump(b, default_flow_style=True)
            '{foo: [bar, {lol: true}], hello: 42}\\n'
            >>> b.toYAML(default_flow_style=True)
            '{foo: [bar, {lol: true}], hello: 42}\\n'
            >>> yaml.dump(b, default_flow_style=True)
            '!munch.Munch {foo: [bar, !munch.Munch {lol: true}], hello: 42}\\n'
            >>> b.toYAML(Dumper=yaml.Dumper, default_flow_style=True)
            '!munch.Munch {foo: [bar, !munch.Munch {lol: true}], hello: 42}\\n'

        """
        opts = {'indent': 4, 'default_flow_style': False}
        opts.update(options)
        dump = yaml.dump if 'Dumper' in opts else yaml.safe_dump
        return dump(self, **opts)
项目:spinesible    作者:getspine    | 项目源码 | 文件源码
def write(self):
        ''' Write ``self.yaml_dict`` to ``self.filename`` as YAML.

        When ``self.backup`` is set and the file exists, it is first copied
        to ``<filename>.orig``.  The YAML is written to ``<filename>.yedit``
        and then renamed over the target.  On lines containing both ``{{``
        and ``}}``, single quotes around the braces are replaced by double
        quotes.

        Returns ``(True, self.yaml_dict)``.
        Raises YeditException when no filename is set or writing fails.
        '''
        if not self.filename:
            raise YeditException('Please specify a filename.')

        if self.backup and self.file_exists():
            shutil.copy(self.filename, self.filename + '.orig')

        tmp_filename = self.filename + '.yedit'
        try:
            with open(tmp_filename, 'w') as yfd:
                yml_dump = yaml.safe_dump(self.yaml_dict, default_flow_style=False)
                for line in yml_dump.strip().split('\n'):
                    if '{{' in line and '}}' in line:
                        yfd.write(line.replace("'{{", '"{{').replace("}}'", '}}"') + '\n')
                    else:
                        yfd.write(line + '\n')
        except Exception as err:
            # str(err) instead of err.message: exceptions have no ``message``
            # attribute on Python 3, so the original raised AttributeError
            # here and hid the real failure.
            raise YeditException(str(err))

        os.rename(tmp_filename, self.filename)

        return (True, self.yaml_dict)
项目:cloud-deploy-grid    作者:elodina    | 项目源码 | 文件源码
def generate_group_vars_all(self):
        """Generate ``result/<grid>/group_vars/all`` for this grid.

        Every ``google_compute_instance`` resource in the grid's
        ``terraform.tfstate`` contributes a ``<name>.node.<grid>`` host entry
        mapped to its ``network_interface.0.address``.  The host map (as
        JSON text), the grid name and the nameserver address are passed to
        ``self._generate_template``; the YAML dump of
        ``self.current_config.vars`` is then appended to the file.
        """
        path = 'result/{}/group_vars/all'.format(self.grid_name)
        variables = AutoDict()
        hosts_entries = AutoDict()
        with open('result/{}/infrastructure/terraform.tfstate'.format(
                self.grid_name), 'r') as json_file:
            json_data = json.load(json_file)
        for module in json_data['modules']:
            # ``resources`` is a plain dict from json.load; values() works on
            # Python 2 and 3, whereas iteritems() is Python 2 only.
            for value in module['resources'].values():
                if value['type'] != 'google_compute_instance':
                    continue
                attributes = value['primary']['attributes']
                host = '{}.node.{}'.format(attributes['name'], self.grid_name)
                ip = attributes['network_interface.0.address']
                hosts_entries['hosts'][str(host)] = str(ip)
        variables['hosts'] = json.dumps(hosts_entries['hosts'])
        variables['grid_name'] = self.current_grid.name
        variables['terminal_ip'] = self._nameserver()
        self._generate_template(path, variables)
        vars_json = json.loads(self.current_config.vars)
        vars_yaml = yaml.safe_dump(vars_json, default_flow_style=False)
        with open(path, "a") as yaml_file:
            yaml_file.write(vars_yaml)
项目:cloud-deploy-grid    作者:elodina    | 项目源码 | 文件源码
def generate_group_vars_all(self):
        """Generate ``result/<grid>/group_vars/all`` for this grid.

        Every ``azure_instance`` resource in the grid's ``terraform.tfstate``
        contributes a ``<name>.node.<grid>`` host entry mapped to its
        ``ip_address``.  The host map (as JSON text), the grid name and the
        nameserver address are passed to ``self._generate_template``; the
        YAML dump of ``self.current_config.vars`` is then appended to the
        file.
        """
        path = 'result/{}/group_vars/all'.format(self.grid_name)
        variables = AutoDict()
        hosts_entries = AutoDict()
        with open('result/{}/infrastructure/terraform.tfstate'.format(self.grid_name), 'r') as json_file:
            json_data = json.load(json_file)
        for module in json_data['modules']:
            # ``resources`` is a plain dict from json.load; values() works on
            # Python 2 and 3, whereas iteritems() is Python 2 only.
            for value in module['resources'].values():
                if value['type'] != 'azure_instance':
                    continue
                attributes = value['primary']['attributes']
                host = '{}.node.{}'.format(attributes['name'], self.grid_name)
                ip = attributes['ip_address']
                hosts_entries['hosts'][str(host)] = str(ip)
        variables['hosts'] = json.dumps(hosts_entries['hosts'])
        variables['grid_name'] = self.current_grid.name
        variables['terminal_ip'] = self._nameserver()
        self._generate_template(path, variables)
        vars_json = json.loads(self.current_config.vars)
        vars_yaml = yaml.safe_dump(vars_json, default_flow_style=False)
        with open(path, "a") as yaml_file:
            yaml_file.write(vars_yaml)
项目:cloud-deploy-grid    作者:elodina    | 项目源码 | 文件源码
def generate_group_vars_all(self):
        """Generate ``result/<grid>/group_vars/all`` from configured IPs.

        For every group in ``self.current_groups`` the group IPs, the master
        IPs and the second entry of the terminal IP list are registered as
        ``<ip-with-dashes>.node.<grid>`` host entries.  The host map (as JSON
        text), the grid name and the nameserver address are passed to
        ``self._generate_template``; the YAML dump of
        ``self.current_config.vars`` is then appended to the file.
        """
        path = 'result/{}/group_vars/all'.format(self.grid_name)
        variables = AutoDict()
        hosts_entries = AutoDict()

        def register(address):
            # The host name is the address with dots turned into dashes.
            fqdn = '{}.node.{}'.format(address.replace('.', '-'), self.grid_name)
            hosts_entries['hosts'][str(fqdn)] = str(address)

        for group in self.current_groups:
            for group_ip in group.groupips.split(','):
                register(group_ip)
            for master_ip in self.current_config.mastersips.split(','):
                register(master_ip)
            register(self.current_config.terminalips.split(',')[1])

        variables['hosts'] = json.dumps(hosts_entries['hosts'])
        variables['grid_name'] = self.current_grid.name
        variables['terminal_ip'] = self._nameserver()
        self._generate_template(path, variables)

        extra_vars = json.loads(self.current_config.vars)
        with open(path, "a") as yaml_file:
            yaml_file.write(yaml.safe_dump(extra_vars, default_flow_style=False))
项目:cloud-deploy-grid    作者:elodina    | 项目源码 | 文件源码
def generate_group_vars_all(self):
        """Generate ``result/<grid>/group_vars/all`` for this grid.

        Every ``aws_instance`` resource in the grid's ``terraform.tfstate``
        contributes a host entry named after the first label of its
        ``private_dns`` (``<label>.node.<grid>``) mapped to its
        ``private_ip``.  The host map (as JSON text), the grid name and the
        nameserver address are passed to ``self._generate_template``; the
        YAML dump of ``self.current_config.vars`` is then appended to the
        file.
        """
        path = 'result/{}/group_vars/all'.format(self.grid_name)
        variables = AutoDict()
        hosts_entries = AutoDict()
        with open('result/{}/infrastructure/terraform.tfstate'.format(self.grid_name), 'r') as json_file:
            json_data = json.load(json_file)
        for module in json_data['modules']:
            # ``resources`` is a plain dict from json.load; values() works on
            # Python 2 and 3, whereas iteritems() is Python 2 only.
            for value in module['resources'].values():
                if value['type'] != 'aws_instance':
                    continue
                attributes = value['primary']['attributes']
                hostname = attributes['private_dns'].split('.')[0]
                host = '{}.node.{}'.format(hostname, self.grid_name)
                ip = attributes['private_ip']
                hosts_entries['hosts'][str(host)] = str(ip)
        variables['hosts'] = json.dumps(hosts_entries['hosts'])
        variables['grid_name'] = self.current_grid.name
        variables['terminal_ip'] = self._nameserver()
        self._generate_template(path, variables)
        vars_json = json.loads(self.current_config.vars)
        vars_yaml = yaml.safe_dump(vars_json, default_flow_style=False)
        with open(path, "a") as yaml_file:
            yaml_file.write(vars_yaml)
项目:cloud-deploy-grid    作者:elodina    | 项目源码 | 文件源码
def generate_group_vars_all(self):
        """Generate ``result/<grid>/group_vars/all`` for this grid.

        Every ``google_compute_instance`` resource in the grid's
        ``terraform.tfstate`` contributes a ``<name>.node.<grid>`` host entry
        mapped to its ``network_interface.0.address``.  The host map (as
        JSON text) and the grid name are passed to
        ``self._generate_template``; the YAML dump of
        ``self.current_config.vars`` is then appended to the file.
        """
        path = 'result/{}/group_vars/all'.format(self.grid_name)
        variables = AutoDict()
        hosts_entries = AutoDict()
        with open('result/{}/infrastructure/terraform.tfstate'.format(
                self.grid_name), 'r') as json_file:
            json_data = json.load(json_file)
        for module in json_data['modules']:
            # ``resources`` is a plain dict from json.load; values() works on
            # Python 2 and 3, whereas iteritems() is Python 2 only.
            for value in module['resources'].values():
                if value['type'] != 'google_compute_instance':
                    continue
                attributes = value['primary']['attributes']
                host = '{}.node.{}'.format(attributes['name'], self.grid_name)
                ip = attributes['network_interface.0.address']
                hosts_entries['hosts'][str(host)] = str(ip)
        variables['hosts'] = json.dumps(hosts_entries['hosts'])
        variables['grid_name'] = self.current_grid.name
        self._generate_template(path, variables)
        vars_json = json.loads(self.current_config.vars)
        vars_yaml = yaml.safe_dump(vars_json, default_flow_style=False)
        with open(path, "a") as yaml_file:
            yaml_file.write(vars_yaml)
项目:cloud-deploy-grid    作者:elodina    | 项目源码 | 文件源码
def generate_group_vars_all(self):
        """Generate ``result/<grid>/group_vars/all`` for this grid.

        Every ``azure_instance`` resource in the grid's ``terraform.tfstate``
        contributes a ``<name>.node.<grid>`` host entry mapped to its
        ``ip_address``.  The host map (as JSON text) and the grid name are
        passed to ``self._generate_template``; the YAML dump of
        ``self.current_config.vars`` is then appended to the file.
        """
        path = 'result/{}/group_vars/all'.format(self.grid_name)
        variables = AutoDict()
        hosts_entries = AutoDict()
        with open('result/{}/infrastructure/terraform.tfstate'.format(
                self.grid_name), 'r') as json_file:
            json_data = json.load(json_file)
        for module in json_data['modules']:
            # ``resources`` is a plain dict from json.load; values() works on
            # Python 2 and 3, whereas iteritems() is Python 2 only.
            for value in module['resources'].values():
                if value['type'] != 'azure_instance':
                    continue
                attributes = value['primary']['attributes']
                host = '{}.node.{}'.format(attributes['name'], self.grid_name)
                ip = attributes['ip_address']
                hosts_entries['hosts'][str(host)] = str(ip)
        variables['hosts'] = json.dumps(hosts_entries['hosts'])
        variables['grid_name'] = self.current_grid.name
        self._generate_template(path, variables)
        vars_json = json.loads(self.current_config.vars)
        vars_yaml = yaml.safe_dump(vars_json, default_flow_style=False)
        with open(path, "a") as yaml_file:
            yaml_file.write(vars_yaml)
项目:cloud-deploy-grid    作者:elodina    | 项目源码 | 文件源码
def generate_group_vars_all(self):
        """Generate ``result/<grid>/group_vars/all`` from configured IPs.

        For every group in ``self.current_groups`` the group IPs, the master
        IPs and the second entry of the terminal IP list are registered as
        ``<ip-with-dashes>.node.<grid>`` host entries.  The host map (as JSON
        text) and the grid name are passed to ``self._generate_template``;
        the YAML dump of ``self.current_config.vars`` is then appended to
        the file.
        """
        path = 'result/{}/group_vars/all'.format(self.grid_name)
        variables = AutoDict()
        hosts_entries = AutoDict()

        def register(address):
            # The host name is the address with dots turned into dashes.
            fqdn = '{}.node.{}'.format(address.replace('.', '-'), self.grid_name)
            hosts_entries['hosts'][str(fqdn)] = str(address)

        for group in self.current_groups:
            for group_ip in group.groupips.split(','):
                register(group_ip)
            for master_ip in self.current_config.mastersips.split(','):
                register(master_ip)
            register(self.current_config.terminalips.split(',')[1])

        variables['hosts'] = json.dumps(hosts_entries['hosts'])
        variables['grid_name'] = self.current_grid.name
        self._generate_template(path, variables)

        extra_vars = json.loads(self.current_config.vars)
        with open(path, "a") as yaml_file:
            yaml_file.write(yaml.safe_dump(extra_vars, default_flow_style=False))
项目:cloud-deploy-grid    作者:elodina    | 项目源码 | 文件源码
def generate_group_vars_all(self):
        """Generate ``result/<grid>/group_vars/all`` for this grid.

        Every ``aws_instance`` resource in the grid's ``terraform.tfstate``
        contributes a host entry named after the first label of its
        ``private_dns`` (``<label>.node.<grid>``) mapped to its
        ``private_ip``.  The host map (as JSON text) and the grid name are
        passed to ``self._generate_template``; the YAML dump of
        ``self.current_config.vars`` is then appended to the file.
        """
        path = 'result/{}/group_vars/all'.format(self.grid_name)
        variables = AutoDict()
        hosts_entries = AutoDict()
        with open('result/{}/infrastructure/terraform.tfstate'.format(
                self.grid_name), 'r') as json_file:
            json_data = json.load(json_file)
        for module in json_data['modules']:
            # ``resources`` is a plain dict from json.load; values() works on
            # Python 2 and 3, whereas iteritems() is Python 2 only.
            for value in module['resources'].values():
                if value['type'] != 'aws_instance':
                    continue
                attributes = value['primary']['attributes']
                hostname = attributes['private_dns'].split('.')[0]
                host = '{}.node.{}'.format(hostname, self.grid_name)
                ip = attributes['private_ip']
                hosts_entries['hosts'][str(host)] = str(ip)
        variables['hosts'] = json.dumps(hosts_entries['hosts'])
        variables['grid_name'] = self.current_grid.name
        self._generate_template(path, variables)
        vars_json = json.loads(self.current_config.vars)
        vars_yaml = yaml.safe_dump(vars_json, default_flow_style=False)
        with open(path, "a") as yaml_file:
            yaml_file.write(vars_yaml)
项目:cloud-deploy-grid    作者:elodina    | 项目源码 | 文件源码
def generate_group_vars_all(self):
        """Generate ``result/<grid>/group_vars/all`` for this grid.

        Every ``openstack_compute_instance_v2`` resource in the grid's
        ``terraform.tfstate`` contributes a ``<name>.node.<grid>`` host entry
        mapped to its ``network.0.fixed_ip_v4``.  The host map (as JSON
        text) and the grid name are passed to ``self._generate_template``;
        the YAML dump of ``self.current_config.vars`` is then appended to
        the file.
        """
        path = 'result/{}/group_vars/all'.format(self.grid_name)
        variables = AutoDict()
        hosts_entries = AutoDict()
        with open('result/{}/infrastructure/terraform.tfstate'.format(
                self.grid_name), 'r') as json_file:
            json_data = json.load(json_file)
        for module in json_data['modules']:
            # ``resources`` is a plain dict from json.load; values() works on
            # Python 2 and 3, whereas iteritems() is Python 2 only.
            for value in module['resources'].values():
                if value['type'] != 'openstack_compute_instance_v2':
                    continue
                attributes = value['primary']['attributes']
                host = '{}.node.{}'.format(attributes['name'], self.grid_name)
                ip = attributes['network.0.fixed_ip_v4']
                hosts_entries['hosts'][str(host)] = str(ip)
        variables['hosts'] = json.dumps(hosts_entries['hosts'])
        variables['grid_name'] = self.current_grid.name
        self._generate_template(path, variables)
        vars_json = json.loads(self.current_config.vars)
        vars_yaml = yaml.safe_dump(vars_json, default_flow_style=False)
        with open(path, "a") as yaml_file:
            yaml_file.write(vars_yaml)
项目:GraphDash    作者:AmadeusITGroup    | 项目源码 | 文件源码
def export_conf(conf, conf_file, exclude=None):
    """Write ``conf`` to ``conf_file`` as YAML unless that file exists.

    Nothing is written when ``conf_file`` is empty or already exists; a
    message is printed in each case.  Keys listed in ``exclude`` are left
    out of the exported document.
    """
    excluded = set(exclude) if exclude is not None else set()

    if not conf_file:
        print('( ) No configuration file provided')
        return

    if op.isfile(conf_file):
        print('(!) File "{0}" exists, skipping export'.format(conf_file))
        return

    with open(conf_file, 'w') as f:
        # Building a plain dict also gets rid of the SemiFrozenDict.
        exported = {key: value for key, value in conf.items()
                    if key not in excluded}
        document = yaml.safe_dump(exported, default_flow_style=False, allow_unicode=True)
        f.write(document)

    print('( ) File "{0}" created with configuration'.format(conf_file))
项目:ansible-role-tripleo-ssl    作者:redhat-openstack    | 项目源码 | 文件源码
def create_enable_file(certpem, keypem, source_dir, dest_dir, tht_release):
    """Write ``<dest_dir>enable-tls.yaml`` from the template in ``source_dir``.

    The template ``<source_dir>environments/enable-tls.yaml`` is loaded, the
    certificate and key are stored in its ``parameter_defaults`` and the
    ``OS::TripleO::NodeTLSData`` registry entry is pointed at
    ``<source_dir>/puppet/extraconfig/tls/tls-cert-inject.yaml``.  For
    releases other than ``master`` and ``newton``, endpoint hosts equal to
    ``CLOUDNAME`` are replaced with ``IP_ADDRESS``.
    """
    template_path = "{}environments/enable-tls.yaml".format(source_dir)
    output_dict = _open_yaml(template_path)
    parameter_defaults = output_dict["parameter_defaults"]

    if tht_release not in ['master', 'newton']:
        for endpoint in parameter_defaults["EndpointMap"].values():
            if endpoint["host"] == "CLOUDNAME":
                endpoint["host"] = "IP_ADDRESS"

    parameter_defaults["SSLCertificate"] = certpem
    parameter_defaults["SSLKey"] = keypem

    tls_inject_path = "{}/puppet/extraconfig/tls/tls-cert-inject.yaml".format(source_dir)
    output_dict["resource_registry"]["OS::TripleO::NodeTLSData"] = tls_inject_path

    destination = "{}enable-tls.yaml".format(dest_dir)
    with open(destination, "w") as stream:
        yaml.safe_dump(output_dict, stream, default_style='|')
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def relation_set(relation_id=None, relation_settings=None, **kwargs):
    """Set relation information for the current unit.

    :param relation_id: relation to operate on; passed to ``relation-set``
        with ``-r`` when it is not None.
    :param relation_settings: optional dict of settings to publish.
    :param kwargs: extra settings; they override same-named keys of
        ``relation_settings``.
    :raises subprocess.CalledProcessError: when ``relation-set`` exits
        with a non-zero status.
    """
    relation_settings = relation_settings if relation_settings else {}
    relation_cmd_line = ['relation-set']
    accepts_file = "--file" in subprocess.check_output(
        relation_cmd_line + ["--help"], universal_newlines=True)
    if relation_id is not None:
        relation_cmd_line.extend(('-r', relation_id))
    settings = relation_settings.copy()
    settings.update(kwargs)
    for key, value in settings.items():
        # Force value to be a string: it always should, but some call
        # sites pass in things like dicts or numbers.
        if value is not None:
            settings[key] = "{}".format(value)
    if accepts_file:
        # --file was introduced in Juju 1.23.2. Use it by default if
        # available, since otherwise we'll break if the relation data is
        # too big. Ideally we should tell relation-set to read the data from
        # stdin, but that feature is broken in 1.23.2: Bug #1454678.
        settings_file = tempfile.NamedTemporaryFile(delete=False)
        try:
            # Close the file before relation-set reads it so the data is
            # flushed to disk.
            with settings_file:
                settings_file.write(yaml.safe_dump(settings).encode("utf-8"))
            subprocess.check_call(
                relation_cmd_line + ["--file", settings_file.name])
        finally:
            # delete=False means nobody else cleans up: remove the file
            # even when the dump or the relation-set call fails.
            os.remove(settings_file.name)
    else:
        for key, value in settings.items():
            if value is None:
                relation_cmd_line.append('{}='.format(key))
            else:
                relation_cmd_line.append('{}={}'.format(key, value))
        subprocess.check_call(relation_cmd_line)
    # Flush cache of any relation-gets for local unit
    flush(local_unit())
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def relation_set(relation_id=None, relation_settings=None, **kwargs):
    """Set relation information for the current unit.

    :param relation_id: relation to operate on; passed to ``relation-set``
        with ``-r`` when it is not None.
    :param relation_settings: optional dict of settings to publish.
    :param kwargs: extra settings; they override same-named keys of
        ``relation_settings``.
    :raises subprocess.CalledProcessError: when ``relation-set`` exits
        with a non-zero status.
    """
    relation_settings = relation_settings if relation_settings else {}
    relation_cmd_line = ['relation-set']
    accepts_file = "--file" in subprocess.check_output(
        relation_cmd_line + ["--help"], universal_newlines=True)
    if relation_id is not None:
        relation_cmd_line.extend(('-r', relation_id))
    settings = relation_settings.copy()
    settings.update(kwargs)
    for key, value in settings.items():
        # Force value to be a string: it always should, but some call
        # sites pass in things like dicts or numbers.
        if value is not None:
            settings[key] = "{}".format(value)
    if accepts_file:
        # --file was introduced in Juju 1.23.2. Use it by default if
        # available, since otherwise we'll break if the relation data is
        # too big. Ideally we should tell relation-set to read the data from
        # stdin, but that feature is broken in 1.23.2: Bug #1454678.
        settings_file = tempfile.NamedTemporaryFile(delete=False)
        try:
            # Close the file before relation-set reads it so the data is
            # flushed to disk.
            with settings_file:
                settings_file.write(yaml.safe_dump(settings).encode("utf-8"))
            subprocess.check_call(
                relation_cmd_line + ["--file", settings_file.name])
        finally:
            # delete=False means nobody else cleans up: remove the file
            # even when the dump or the relation-set call fails.
            os.remove(settings_file.name)
    else:
        for key, value in settings.items():
            if value is None:
                relation_cmd_line.append('{}='.format(key))
            else:
                relation_cmd_line.append('{}={}'.format(key, value))
        subprocess.check_call(relation_cmd_line)
    # Flush cache of any relation-gets for local unit
    flush(local_unit())
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def relation_set(relation_id=None, relation_settings=None, **kwargs):
    """Set relation information for the current unit.

    :param relation_id: relation to operate on; passed to ``relation-set``
        with ``-r`` when it is not None.
    :param relation_settings: optional dict of settings to publish.
    :param kwargs: extra settings; they override same-named keys of
        ``relation_settings``.
    :raises subprocess.CalledProcessError: when ``relation-set`` exits
        with a non-zero status.
    """
    relation_settings = relation_settings if relation_settings else {}
    relation_cmd_line = ['relation-set']
    accepts_file = "--file" in subprocess.check_output(
        relation_cmd_line + ["--help"], universal_newlines=True)
    if relation_id is not None:
        relation_cmd_line.extend(('-r', relation_id))
    settings = relation_settings.copy()
    settings.update(kwargs)
    for key, value in settings.items():
        # Force value to be a string: it always should, but some call
        # sites pass in things like dicts or numbers.
        if value is not None:
            settings[key] = "{}".format(value)
    if accepts_file:
        # --file was introduced in Juju 1.23.2. Use it by default if
        # available, since otherwise we'll break if the relation data is
        # too big. Ideally we should tell relation-set to read the data from
        # stdin, but that feature is broken in 1.23.2: Bug #1454678.
        settings_file = tempfile.NamedTemporaryFile(delete=False)
        try:
            # Close the file before relation-set reads it so the data is
            # flushed to disk.
            with settings_file:
                settings_file.write(yaml.safe_dump(settings).encode("utf-8"))
            subprocess.check_call(
                relation_cmd_line + ["--file", settings_file.name])
        finally:
            # delete=False means nobody else cleans up: remove the file
            # even when the dump or the relation-set call fails.
            os.remove(settings_file.name)
    else:
        for key, value in settings.items():
            if value is None:
                relation_cmd_line.append('{}='.format(key))
            else:
                relation_cmd_line.append('{}={}'.format(key, value))
        subprocess.check_call(relation_cmd_line)
    # Flush cache of any relation-gets for local unit
    flush(local_unit())
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def yaml(self, output):
        """Serialise ``output`` as YAML onto ``self.outfile``."""
        # Imported inside the body because this function itself is named
        # ``yaml``.
        from yaml import safe_dump
        safe_dump(output, stream=self.outfile)
项目:cbas    作者:ImmobilienScout24    | 项目源码 | 文件源码
def yaml_format(data):
        """Return ``data`` as one block-style YAML document.

        ``data`` is converted with ``dict()`` first; the output carries
        explicit document start and end markers.
        """
        mapping = dict(data)
        return yaml.safe_dump(
            mapping,
            default_flow_style=False,
            explicit_start=True,
            explicit_end=True,
        )
项目:fastscore-cli    作者:opendatagroup    | 项目源码 | 文件源码
def show(connect, **kwargs):
    """Print the current FastScore configuration as YAML.

    :param connect: object whose ``get_config()`` returns the configuration.
    :param kwargs: accepted for interface compatibility and ignored.

    When ``get_config()`` returns a falsy value a hint is printed instead.
    """
    config = connect.get_config()
    if config:
        # No explicit ``encoding``: safe_dump then returns the native
        # ``str`` type (UTF-8 bytes on Python 2, text on Python 3), which
        # is what sys.stdout.write() accepts on either version.
        s = yaml.safe_dump(config, allow_unicode=True)
        sys.stdout.write(s)
    else:
        # print() with a single argument is valid on Python 2 and 3 alike.
        print("FastScore not configured (use 'fastscore config set')")
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def dump_yaml(tables, outdir, outfilename):
    """Write ``tables`` as YAML to ``<outdir>/<outfilename>.yaml``.

    :param tables: data to serialise with ``yaml.safe_dump``.
    :param outdir: directory that receives the file.
    :param outfilename: file name without the ``.yaml`` extension.
    """
    path = os.path.join(outdir, outfilename + '.yaml')
    # open() replaces the Python 2-only file() builtin, and the context
    # manager closes the handle even when the dump raises.
    with open(path, 'wb') as stream:
        # The stream is binary, so ask for encoded output explicitly;
        # 'utf-8' is already the Python 2 default.
        yaml.safe_dump(tables, stream, allow_unicode=True, encoding='utf-8')

#
# Process files listed on command line, or all .xls files in current dir if no
# args given
#
项目:charm-heat    作者:openstack    | 项目源码 | 文件源码
def relation_set(relation_id=None, relation_settings=None, **kwargs):
    """Set relation information for the current unit.

    :param relation_id: relation to operate on; passed to ``relation-set``
        with ``-r`` when it is not None.
    :param relation_settings: optional dict of settings to publish.
    :param kwargs: extra settings; they override same-named keys of
        ``relation_settings``.
    :raises subprocess.CalledProcessError: when ``relation-set`` exits
        with a non-zero status.
    """
    relation_settings = relation_settings if relation_settings else {}
    relation_cmd_line = ['relation-set']
    accepts_file = "--file" in subprocess.check_output(
        relation_cmd_line + ["--help"], universal_newlines=True)
    if relation_id is not None:
        relation_cmd_line.extend(('-r', relation_id))
    settings = relation_settings.copy()
    settings.update(kwargs)
    for key, value in settings.items():
        # Force value to be a string: it always should, but some call
        # sites pass in things like dicts or numbers.
        if value is not None:
            settings[key] = "{}".format(value)
    if accepts_file:
        # --file was introduced in Juju 1.23.2. Use it by default if
        # available, since otherwise we'll break if the relation data is
        # too big. Ideally we should tell relation-set to read the data from
        # stdin, but that feature is broken in 1.23.2: Bug #1454678.
        settings_file = tempfile.NamedTemporaryFile(delete=False)
        try:
            # Close the file before relation-set reads it so the data is
            # flushed to disk.
            with settings_file:
                settings_file.write(yaml.safe_dump(settings).encode("utf-8"))
            subprocess.check_call(
                relation_cmd_line + ["--file", settings_file.name])
        finally:
            # delete=False means nobody else cleans up: remove the file
            # even when the dump or the relation-set call fails.
            os.remove(settings_file.name)
    else:
        for key, value in settings.items():
            if value is None:
                relation_cmd_line.append('{}='.format(key))
            else:
                relation_cmd_line.append('{}={}'.format(key, value))
        subprocess.check_call(relation_cmd_line)
    # Flush cache of any relation-gets for local unit
    flush(local_unit())
项目:charm-heat    作者:openstack    | 项目源码 | 文件源码
def relation_set(relation_id=None, relation_settings=None, **kwargs):
    """Set relation information for the current unit.

    :param relation_id: relation to operate on; passed to ``relation-set``
        with ``-r`` when it is not None.
    :param relation_settings: optional dict of settings to publish.
    :param kwargs: extra settings; they override same-named keys of
        ``relation_settings``.
    :raises subprocess.CalledProcessError: when ``relation-set`` exits
        with a non-zero status.
    """
    relation_settings = relation_settings if relation_settings else {}
    relation_cmd_line = ['relation-set']
    accepts_file = "--file" in subprocess.check_output(
        relation_cmd_line + ["--help"], universal_newlines=True)
    if relation_id is not None:
        relation_cmd_line.extend(('-r', relation_id))
    settings = relation_settings.copy()
    settings.update(kwargs)
    for key, value in settings.items():
        # Force value to be a string: it always should, but some call
        # sites pass in things like dicts or numbers.
        if value is not None:
            settings[key] = "{}".format(value)
    if accepts_file:
        # --file was introduced in Juju 1.23.2. Use it by default if
        # available, since otherwise we'll break if the relation data is
        # too big. Ideally we should tell relation-set to read the data from
        # stdin, but that feature is broken in 1.23.2: Bug #1454678.
        settings_file = tempfile.NamedTemporaryFile(delete=False)
        try:
            # Close the file before relation-set reads it so the data is
            # flushed to disk.
            with settings_file:
                settings_file.write(yaml.safe_dump(settings).encode("utf-8"))
            subprocess.check_call(
                relation_cmd_line + ["--file", settings_file.name])
        finally:
            # delete=False means nobody else cleans up: remove the file
            # even when the dump or the relation-set call fails.
            os.remove(settings_file.name)
    else:
        for key, value in settings.items():
            if value is None:
                relation_cmd_line.append('{}='.format(key))
            else:
                relation_cmd_line.append('{}={}'.format(key, value))
        subprocess.check_call(relation_cmd_line)
    # Flush cache of any relation-gets for local unit
    flush(local_unit())
项目:charm-heat    作者:openstack    | 项目源码 | 文件源码
def yaml(self, output):
        """Serialise ``output`` as YAML onto ``self.outfile``."""
        # Imported inside the body because this function itself is named
        # ``yaml``.
        from yaml import safe_dump
        safe_dump(output, stream=self.outfile)
项目:Magic-Spoiler    作者:Cockatrice    | 项目源码 | 文件源码
def save_errorlog(errorlog):
    """Dump ``errorlog`` as block-style YAML into ``out/errors.yml``."""
    with open('out/errors.yml', 'w') as handle:
        yaml.safe_dump(errorlog, stream=handle, default_flow_style=False)
项目:functest    作者:opnfv    | 项目源码 | 文件源码
def append_config(file_name, p_domain, u_domain):
    """Set the abot-epc domain names in a YAML file, rewriting it in place.

    :param file_name: path of the YAML file to update.
    :param p_domain: value stored under ``project-domain-name``.
    :param u_domain: value stored under ``user-domain-name``.
    """
    with open(file_name) as yfile:
        # safe_load instead of yaml.load: a bare yaml.load() needs an
        # explicit Loader on recent PyYAML and may construct arbitrary
        # objects. Whatever safe_dump below can write, safe_load can read.
        doc = yaml.safe_load(yfile)

    credentials = doc['credentials']['abot-epc']['abot-epc']
    credentials['project-domain-name'] = p_domain
    credentials['user-domain-name'] = u_domain

    with open(file_name, 'w') as yfile:
        yaml.safe_dump(doc, yfile, default_flow_style=False)
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def relation_set(relation_id=None, relation_settings=None, **kwargs):
    """Set relation information for the current unit.

    :param relation_id: relation to operate on; passed to ``relation-set``
        with ``-r`` when it is not None.
    :param relation_settings: optional dict of settings to publish.
    :param kwargs: extra settings; they override same-named keys of
        ``relation_settings``.
    :raises subprocess.CalledProcessError: when ``relation-set`` exits
        with a non-zero status.
    """
    relation_settings = relation_settings if relation_settings else {}
    relation_cmd_line = ['relation-set']
    accepts_file = "--file" in subprocess.check_output(
        relation_cmd_line + ["--help"], universal_newlines=True)
    if relation_id is not None:
        relation_cmd_line.extend(('-r', relation_id))
    settings = relation_settings.copy()
    settings.update(kwargs)
    for key, value in settings.items():
        # Force value to be a string: it always should, but some call
        # sites pass in things like dicts or numbers.
        if value is not None:
            settings[key] = "{}".format(value)
    if accepts_file:
        # --file was introduced in Juju 1.23.2. Use it by default if
        # available, since otherwise we'll break if the relation data is
        # too big. Ideally we should tell relation-set to read the data from
        # stdin, but that feature is broken in 1.23.2: Bug #1454678.
        settings_file = tempfile.NamedTemporaryFile(delete=False)
        try:
            # Close the file before relation-set reads it so the data is
            # flushed to disk.
            with settings_file:
                settings_file.write(yaml.safe_dump(settings).encode("utf-8"))
            subprocess.check_call(
                relation_cmd_line + ["--file", settings_file.name])
        finally:
            # delete=False means nobody else cleans up: remove the file
            # even when the dump or the relation-set call fails.
            os.remove(settings_file.name)
    else:
        for key, value in settings.items():
            if value is None:
                relation_cmd_line.append('{}='.format(key))
            else:
                relation_cmd_line.append('{}={}'.format(key, value))
        subprocess.check_call(relation_cmd_line)
    # Flush cache of any relation-gets for local unit
    flush(local_unit())
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def yaml(self, output):
        """Serialise ``output`` as YAML onto ``self.outfile``."""
        # Imported inside the body because this function itself is named
        # ``yaml``.
        from yaml import safe_dump
        safe_dump(output, stream=self.outfile)
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def relation_set(relation_id=None, relation_settings=None, **kwargs):
    """Set relation information for the current unit.

    :param relation_id: relation to operate on; passed to ``relation-set``
        with ``-r`` when it is not None.
    :param relation_settings: optional dict of settings to publish.
    :param kwargs: extra settings; they override same-named keys of
        ``relation_settings``.
    :raises subprocess.CalledProcessError: when ``relation-set`` exits
        with a non-zero status.
    """
    relation_settings = relation_settings if relation_settings else {}
    relation_cmd_line = ['relation-set']
    accepts_file = "--file" in subprocess.check_output(
        relation_cmd_line + ["--help"], universal_newlines=True)
    if relation_id is not None:
        relation_cmd_line.extend(('-r', relation_id))
    settings = relation_settings.copy()
    settings.update(kwargs)
    for key, value in settings.items():
        # Force value to be a string: it always should, but some call
        # sites pass in things like dicts or numbers.
        if value is not None:
            settings[key] = "{}".format(value)
    if accepts_file:
        # --file was introduced in Juju 1.23.2. Use it by default if
        # available, since otherwise we'll break if the relation data is
        # too big. Ideally we should tell relation-set to read the data from
        # stdin, but that feature is broken in 1.23.2: Bug #1454678.
        settings_file = tempfile.NamedTemporaryFile(delete=False)
        try:
            # Close the file before relation-set reads it so the data is
            # flushed to disk.
            with settings_file:
                settings_file.write(yaml.safe_dump(settings).encode("utf-8"))
            subprocess.check_call(
                relation_cmd_line + ["--file", settings_file.name])
        finally:
            # delete=False means nobody else cleans up: remove the file
            # even when the dump or the relation-set call fails.
            os.remove(settings_file.name)
    else:
        for key, value in settings.items():
            if value is None:
                relation_cmd_line.append('{}='.format(key))
            else:
                relation_cmd_line.append('{}={}'.format(key, value))
        subprocess.check_call(relation_cmd_line)
    # Flush cache of any relation-gets for local unit
    flush(local_unit())
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def yaml(self, output):
        """Serialise ``output`` as YAML onto ``self.outfile``."""
        # Imported inside the body because this function itself is named
        # ``yaml``.
        from yaml import safe_dump
        safe_dump(output, stream=self.outfile)