Python yaml 模块,safe_load_all() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用yaml.safe_load_all()

项目:deckhand    作者:att-comdev    | 项目源码 | 文件源码
def test_list_cleartext_rendered_documents_insufficient_permissions(self):
        rules = {'deckhand:list_cleartext_documents': 'rule:admin_api',
                 'deckhand:create_cleartext_documents': '@'}
        self.policy.set_rules(rules)

        # Create a document for a bucket.
        documents_factory = factories.DocumentFactory(1, [1])
        payload = [documents_factory.gen_test({})[0]]
        resp = self.app.simulate_put(
            '/api/v1.0/buckets/mop/documents',
            headers={'Content-Type': 'application/x-yaml'},
            body=yaml.safe_dump_all(payload))
        self.assertEqual(200, resp.status_code)
        revision_id = list(yaml.safe_load_all(resp.text))[0]['status'][
            'revision']

        # Verify that the created document was not returned.
        resp = self.app.simulate_get(
            '/api/v1.0/revisions/%s/rendered-documents' % revision_id,
            headers={'Content-Type': 'application/x-yaml'})
        self.assertEqual(403, resp.status_code)
项目:deckhand    作者:att-comdev    | 项目源码 | 文件源码
def test_revision_rollback_cleartext_except_forbidden(self):
        rules = {'deckhand:create_cleartext_documents': '@'}
        self.policy.set_rules(rules)

        # Create a revision so we have something to roll back to.
        secrets_factory = factories.DocumentSecretFactory()
        payload = [secrets_factory.gen_test('Certificate', 'cleartext')]
        resp = self.app.simulate_put(
            '/api/v1.0/buckets/mop/documents',
            headers={'Content-Type': 'application/x-yaml'},
            body=yaml.safe_dump_all(payload))
        self.assertEqual(200, resp.status_code)
        revision_id = list(yaml.safe_load_all(resp.text))[0]['status'][
            'revision']

        rules = {'deckhand:create_cleartext_documents': 'rule:admin_api'}
        self.policy.set_rules(rules)

        resp = self.app.simulate_post(
            '/api/v1.0/rollback/%s' % revision_id,
            headers={'Content-Type': 'application/x-yaml'})
        self.assertEqual(403, resp.status_code)
项目:deckhand    作者:att-comdev    | 项目源码 | 文件源码
def test_list_cleartext_revision_documents_insufficient_permissions(self):
        rules = {'deckhand:list_cleartext_documents': 'rule:admin_api',
                 'deckhand:create_cleartext_documents': '@'}
        self.policy.set_rules(rules)

        # Create a document for a bucket.
        secrets_factory = factories.DocumentSecretFactory()
        payload = [secrets_factory.gen_test('Certificate', 'cleartext')]
        resp = self.app.simulate_put(
            '/api/v1.0/buckets/mop/documents',
            headers={'Content-Type': 'application/x-yaml'},
            body=yaml.safe_dump_all(payload))
        self.assertEqual(200, resp.status_code)
        revision_id = list(yaml.safe_load_all(resp.text))[0]['status'][
            'revision']

        # Verify that the created document was not returned.
        resp = self.app.simulate_get(
            '/api/v1.0/revisions/%s/documents' % revision_id,
            headers={'Content-Type': 'application/x-yaml'})
        self.assertEqual(403, resp.status_code)
项目:deckhand    作者:att-comdev    | 项目源码 | 文件源码
def test_put_bucket(self):
        rules = {'deckhand:create_cleartext_documents': '@'}
        self.policy.set_rules(rules)

        documents_factory = factories.DocumentFactory(2, [1, 1])
        document_mapping = {
            "_GLOBAL_DATA_1_": {"data": {"a": {"x": 1, "y": 2}}},
            "_SITE_DATA_1_": {"data": {"a": {"x": 7, "z": 3}, "b": 4}},
            "_SITE_ACTIONS_1_": {
                "actions": [{"method": "merge", "path": "."}]}
        }
        payload = documents_factory.gen_test(document_mapping)

        resp = self.app.simulate_put(
            '/api/v1.0/buckets/mop/documents',
            headers={'Content-Type': 'application/x-yaml'},
            body=yaml.safe_dump_all(payload))
        self.assertEqual(200, resp.status_code)
        created_documents = list(yaml.safe_load_all(resp.text))
        self.assertEqual(3, len(created_documents))
        expected = sorted([(d['schema'], d['metadata']['name'])
                           for d in payload])
        actual = sorted([(d['schema'], d['metadata']['name'])
                         for d in created_documents])
        self.assertEqual(expected, actual)
项目:ambassador    作者:datawire    | 项目源码 | 文件源码
def load_yaml(self, filename, serialization, k8s=False):
        try:
            # XXX This is a bit of a hack -- yaml.safe_load_all returns a
            # generator, and if we don't use list() here, any exception
            # dealing with the actual object gets deferred             
            ocount = 1

            for obj in yaml.safe_load_all(serialization):
                if k8s:
                    self.prep_k8s(filename, ocount, obj)
                else:
                    self.objects_to_process.append((filename, ocount, obj))
                ocount += 1
        except Exception as e:
            # No sense letting one attribute with bad YAML take down the whole
            # gateway, so post the error but keep any objects we were able to 
            # parse before hitting the error.
            self.filename = filename
            self.ocount = ocount

            self.post_error(RichStatus.fromError("%s: could not parse YAML" % filename))
项目:piss    作者:AOSC-Dev    | 项目源码 | 文件源码
def run_update(args):
    logging.info('Getting updates...')
    db = init_db(args.db)
    cur = db.cursor()
    cfg = next(yaml.safe_load_all(open(args.chores, 'r', encoding='utf-8')))
    chores_avail = []
    tasks = sched.scheduler(time.time)
    for name, config in cfg.items():
        result = cur.execute('SELECT updated, last_result FROM chore_status WHERE name = ?', (name,)).fetchone()
        if result:
            result = chores.ChoreStatus(*result)
        chorename = config.pop('chore')
        chore = chores.CHORE_HANDLERS[chorename](name, status=result, **config)
        chores_avail.append((chorename, chore))
    try:
        while 1:
            for chorename, chore in chores_avail:
                tasks.enterabs(
                    chore.status.updated + args.keep * 60,
                    chores.CHORE_PRIO[chorename],
                    wrap_fetch, (chore, cur)
                )
            tasks.run()
            db.commit()
            if args.keep:
                logging.info('A round of updating completed.')
            else:
                break
    except KeyboardInterrupt:
        logging.warning('Interrupted.')
    finally:
        db.commit()
项目:armada    作者:att-comdev    | 项目源码 | 文件源码
def invoke(self):
        if not self.ctx.obj.get('api', False):
            documents = yaml.safe_load_all(open(self.filename).read())
            manifest_obj = Manifest(documents).get_manifest()
            obj_check = validate_armada_object(manifest_obj)
            doc_check = validate_armada_documents(documents)

            try:
                if doc_check and obj_check:
                    self.logger.info(
                        'Successfully validated: %s', self.filename)
            except Exception:
                raise Exception('Failed to validate: %s', self.filename)
        else:
            client = self.ctx.obj.get('CLIENT')
            with open(self.filename, 'r') as f:
                resp = client.post_validate(f.read())
                if resp.get('valid', False):
                    self.logger.info(
                        'Successfully validated: %s', self.filename)
                else:
                    self.logger.error("Failed to validate: %s", self.filename)
项目:armada    作者:att-comdev    | 项目源码 | 文件源码
def test_update_dictionary_valid(self):
        expected = "{}/templates/override-{}-expected.yaml".format(
            self.basepath, '01')
        merge = "{}/templates/override-{}.yaml".format(self.basepath, '01')

        with open(self.base_manifest) as f, open(expected) as e, open(
                merge) as m:
            merging_values = list(yaml.safe_load_all(m.read()))
            doc_obj = list(yaml.safe_load_all(f.read()))
            doc_path = ['chart', 'blog-1']
            ovr = Override(doc_obj)
            ovr.update_document(merging_values)
            ovr_doc = ovr.find_manifest_document(doc_path)
            expect_doc = list(yaml.load_all(e.read()))[0]

            self.assertEqual(ovr_doc, expect_doc)
项目:drydock    作者:att-comdev    | 项目源码 | 文件源码
def load_schemas(self):
        self.v1_doc_schemas = dict()
        schema_dir = self._get_schema_dir()

        for schema_file in os.listdir(schema_dir):
            f = open(os.path.join(schema_dir, schema_file), 'r')
            for schema in yaml.safe_load_all(f):
                schema_for = schema['metadata']['name']
                if schema_for in self.v1_doc_schemas:
                    self.logger.warning(
                        "Duplicate document schemas found for document kind %s."
                        % schema_for)
                self.logger.debug(
                    "Loaded schema for document kind %s." % schema_for)
                self.v1_doc_schemas[schema_for] = schema
            f.close()
项目:drydock    作者:att-comdev    | 项目源码 | 文件源码
def load_schemas(self):
        self.v1_doc_schemas = dict()
        schema_dir = self._get_schema_dir()

        for schema_file in os.listdir(schema_dir):
            f = open(os.path.join(schema_dir, schema_file), 'r')
            for schema in yaml.safe_load_all(f):
                schema_for = schema['metadata']['name']
                if schema_for in self.v1_doc_schemas:
                    self.logger.warning(
                        "Duplicate document schemas found for document kind %s."
                        % schema_for)
                self.logger.debug(
                    "Loaded schema for document kind %s." % schema_for)
                self.v1_doc_schemas[schema_for] = schema.get('data')
            f.close()
项目:kuberdock-platform    作者:cloudlinux    | 项目源码 | 文件源码
def post(self, **params):
        user = self.get_current_user()
        data = params.get('data')
        if data is None:
            raise InsufficientData('No "data" provided')
        try:
            parsed_data = list(yaml.safe_load_all(data))
        except yaml.YAMLError as e:
            raise PredefinedAppExc.UnparseableTemplate(
                'Incorrect yaml, parsing failed: "{0}"'.format(str(e)))

        try:
            res = start_pod_from_yaml(parsed_data, user=user)
        except APIError as e:  # pass as is
            raise
        except Exception as e:
            raise PredefinedAppExc.InternalPredefinedAppError(
                details={'message': str(e)})
        send_event_to_user('pod:change', res, user.id)
        return res
项目:Magic-Spoiler    作者:Cockatrice    | 项目源码 | 文件源码
def load_file(input_file, lib_to_use):
    try:
        with open(input_file) as data_file:
            if lib_to_use == 'yaml':
                output_file = yaml.safe_load(data_file)
            elif lib_to_use == 'yaml_multi':
                output_file = []
                for doc in yaml.safe_load_all(data_file):
                    output_file.append(doc)
            return output_file
    except Exception as ex:
        print "Unable to load file: " + input_file + "\nException information:\n" + str(ex.args)
        sys.exit("Unable to load file: " + input_file)
项目:tuning-box    作者:openstack    | 项目源码 | 文件源码
def read_yaml(self):
        docs_gen = yaml.safe_load_all(self.app.stdin)
        doc = next(docs_gen)
        guard = object()
        if next(docs_gen, guard) is not guard:
            self.app.stderr.write("Warning: will use only first "
                                  "document from YAML stream")
        return doc
项目:tuning-box    作者:openstack    | 项目源码 | 文件源码
def get_value_to_set(self, parsed_args):
        type_ = parsed_args.type
        if type_ == 'null':
            return None
        elif type_ == 'bool':
            if parsed_args.value.lower() in ('1', 'true'):
                return True
            elif parsed_args.value.lower() in ('0', 'false'):
                return False
            else:
                raise Exception(
                    "Bad value for 'bool' type: '{}'. Should be one of '0', "
                    "'1', 'false', 'true'.".format(parsed_args.value))
        elif type_ == 'int':
            return int(parsed_args.value)
        elif type_ == 'str':
            return parsed_args.value
        elif type_ == 'json':
            return json.loads(parsed_args.value)
        elif type_ == 'yaml':
            return yaml.safe_load(parsed_args.value)
        elif type_ is None:
            if parsed_args.format == 'json':
                return json.load(self.app.stdin)
            elif parsed_args.format == 'yaml':
                docs_gen = yaml.safe_load_all(self.app.stdin)
                doc = next(docs_gen)
                guard = object()
                if next(docs_gen, guard) is not guard:
                    self.app.stderr.write("Warning: will use only first "
                                          "document from YAML stream")
                return doc
        assert False, "Shouldn't get here"
项目:Test_framework    作者:huilansame    | 项目源码 | 文件源码
def data(self):
        # ????????data???yaml????????????????
        if not self._data:
            with open(self.yamlf, 'rb') as f:
                self._data = list(yaml.safe_load_all(f))  # load???generator??list?????
        return self._data
项目:councillor-party    作者:carsonyl    | 项目源码 | 文件源码
def get_all_configs():
    return safe_load_all(open('config.yaml'))
项目:deckhand    作者:att-comdev    | 项目源码 | 文件源码
def _to_dict(self, body, many=False):
        """Convert YAML-formatted response body into dict or list.

        :param body: YAML-formatted response body to convert.
        :param many: Controls whether to return list or dict. If True, returns
            list, else dict. False by default.
        :rtype: dict or list
        """
        try:
            return (
                list(yaml.safe_load_all(body))
                    if many else yaml.safe_load(body)
            )
        except yaml.YAMLError:
            return None
项目:deckhand    作者:att-comdev    | 项目源码 | 文件源码
def loads(string):
        # NOTE: The simple approach to handling dictionary versus list response
        # bodies is to always parse the response body as a list and index into
        # the first element using [0] throughout the tests.
        return list(yaml.safe_load_all(string))
项目:deckhand    作者:att-comdev    | 项目源码 | 文件源码
def test_list_rendered_documents_exclude_abstract_documents(self):
        rules = {'deckhand:list_cleartext_documents': '@',
                 'deckhand:list_encrypted_documents': '@',
                 'deckhand:create_cleartext_documents': '@'}
        self.policy.set_rules(rules)

        # Create 2 docs: one concrete, one abstract.
        documents_factory = factories.DocumentFactory(2, [1, 1])
        payload = documents_factory.gen_test(
            {}, global_abstract=False, region_abstract=True)
        concrete_doc = payload[1]

        resp = self.app.simulate_put(
            '/api/v1.0/buckets/mop/documents',
            headers={'Content-Type': 'application/x-yaml'},
            body=yaml.safe_dump_all(payload))
        self.assertEqual(200, resp.status_code)
        revision_id = list(yaml.safe_load_all(resp.text))[0]['status'][
            'revision']

        # Verify that the concrete document is returned, but not the abstract
        # one.
        resp = self.app.simulate_get(
            '/api/v1.0/revisions/%s/rendered-documents' % revision_id,
            headers={'Content-Type': 'application/x-yaml'})
        self.assertEqual(200, resp.status_code)

        rendered_documents = list(yaml.safe_load_all(resp.text))
        self.assertEqual(1, len(rendered_documents))
        is_abstract = rendered_documents[0]['metadata']['layeringDefinition'][
            'abstract']
        self.assertFalse(is_abstract)
        for key, value in concrete_doc.items():
            if isinstance(value, dict):
                self.assertDictContainsSubset(value,
                                              rendered_documents[0][key])
            else:
                self.assertEqual(value, rendered_documents[0][key])
项目:deckhand    作者:att-comdev    | 项目源码 | 文件源码
def test_list_rendered_documents_multiple_buckets(self):
        rules = {'deckhand:list_cleartext_documents': '@',
                 'deckhand:list_encrypted_documents': '@',
                 'deckhand:create_cleartext_documents': '@'}
        self.policy.set_rules(rules)

        documents_factory = factories.DocumentFactory(1, [1])

        for idx in range(2):
            payload = documents_factory.gen_test({})
            if idx == 0:
                # Pop off the first entry so that a conflicting layering
                # policy isn't created during the 1st iteration.
                payload.pop(0)
            resp = self.app.simulate_put(
                '/api/v1.0/buckets/%s/documents' % test_utils.rand_name(
                    'bucket'),
                headers={'Content-Type': 'application/x-yaml'},
                body=yaml.safe_dump_all(payload))
            self.assertEqual(200, resp.status_code)
        revision_id = list(yaml.safe_load_all(resp.text))[0]['status'][
            'revision']

        resp = self.app.simulate_get(
            '/api/v1.0/revisions/%s/rendered-documents' % revision_id,
            headers={'Content-Type': 'application/x-yaml'})
        self.assertEqual(200, resp.status_code)
项目:deckhand    作者:att-comdev    | 项目源码 | 文件源码
def test_list_encrypted_rendered_documents_insufficient_permissions(self):
        rules = {'deckhand:list_cleartext_documents': '@',
                 'deckhand:list_encrypted_documents': 'rule:admin_api',
                 'deckhand:create_cleartext_documents': '@',
                 'deckhand:create_encrypted_documents': '@'}
        self.policy.set_rules(rules)

        # Create a document for a bucket.
        documents_factory = factories.DocumentFactory(1, [1])
        layering_policy = documents_factory.gen_test({})[0]
        secrets_factory = factories.DocumentSecretFactory()
        encrypted_document = secrets_factory.gen_test('Certificate',
                                                      'encrypted')
        payload = [layering_policy, encrypted_document]

        with mock.patch.object(buckets.BucketsResource, 'secrets_mgr',
                               autospec=True) as mock_secrets_mgr:
            mock_secrets_mgr.create.return_value = {
                'secret': payload[0]['data']}
            resp = self.app.simulate_put(
                '/api/v1.0/buckets/mop/documents',
                headers={'Content-Type': 'application/x-yaml'},
                body=yaml.safe_dump_all(payload))
        self.assertEqual(200, resp.status_code)
        revision_id = list(yaml.safe_load_all(resp.text))[0]['status'][
            'revision']

        # Verify that the created document was not returned.
        resp = self.app.simulate_get(
            '/api/v1.0/revisions/%s/rendered-documents' % revision_id,
            headers={'Content-Type': 'application/x-yaml'})
        self.assertEqual(200, resp.status_code)
        self.assertEmpty(list(yaml.safe_load_all(resp.text)))
项目:deckhand    作者:att-comdev    | 项目源码 | 文件源码
def test_revision_rollback_encrypted_except_forbidden(self):
        rules = {'deckhand:create_encrypted_documents': '@',
                 'deckhand:create_cleartext_documents': '@'}
        self.policy.set_rules(rules)

        # Create a revision so we have something to roll back to.
        secrets_factory = factories.DocumentSecretFactory()
        payload = [secrets_factory.gen_test('Certificate', 'encrypted')]

        with mock.patch.object(buckets.BucketsResource, 'secrets_mgr',
                               autospec=True) as mock_secrets_mgr:
            mock_secrets_mgr.create.return_value = {
                'secret': payload[0]['data']}
            resp = self.app.simulate_put(
                '/api/v1.0/buckets/mop/documents',
                headers={'Content-Type': 'application/x-yaml'},
                body=yaml.safe_dump_all(payload))
        self.assertEqual(200, resp.status_code)
        revision_id = list(yaml.safe_load_all(resp.text))[0]['status'][
            'revision']

        rules = {'deckhand:create_encrypted_documents': 'rule:admin_api',
                 'deckhand:create_cleartext_documents': '@'}
        self.policy.set_rules(rules)

        resp = self.app.simulate_post(
            '/api/v1.0/rollback/%s' % revision_id,
            headers={'Content-Type': 'application/x-yaml'})
        self.assertEqual(403, resp.status_code)
项目:deckhand    作者:att-comdev    | 项目源码 | 文件源码
def setUp(self):
        super(TestRevisionTagsController, self).setUp()
        rules = {'deckhand:create_cleartext_documents': '@'}
        self.policy.set_rules(rules)

        # Create a revision to tag.
        secrets_factory = factories.DocumentSecretFactory()
        payload = [secrets_factory.gen_test('Certificate', 'cleartext')]
        resp = self.app.simulate_put(
            '/api/v1.0/buckets/mop/documents',
            headers={'Content-Type': 'application/x-yaml'},
            body=yaml.safe_dump_all(payload))
        self.assertEqual(200, resp.status_code)
        self.revision_id = list(yaml.safe_load_all(resp.text))[0]['status'][
            'revision']
项目:deckhand    作者:att-comdev    | 项目源码 | 文件源码
def test_list_encrypted_revision_documents_insufficient_permissions(self):
        rules = {'deckhand:list_cleartext_documents': '@',
                 'deckhand:list_encrypted_documents': 'rule:admin_api',
                 'deckhand:create_cleartext_documents': '@',
                 'deckhand:create_encrypted_documents': '@'}
        self.policy.set_rules(rules)

        # Create a document for a bucket.
        secrets_factory = factories.DocumentSecretFactory()
        payload = [secrets_factory.gen_test('Certificate', 'encrypted')]
        with mock.patch.object(buckets.BucketsResource, 'secrets_mgr',
                               autospec=True) as mock_secrets_mgr:
            mock_secrets_mgr.create.return_value = {
                'secret': payload[0]['data']}
            resp = self.app.simulate_put(
                '/api/v1.0/buckets/mop/documents',
                headers={'Content-Type': 'application/x-yaml'},
                body=yaml.safe_dump_all(payload))
        self.assertEqual(200, resp.status_code)
        revision_id = list(yaml.safe_load_all(resp.text))[0]['status'][
            'revision']

        # Verify that the created document was not returned.
        resp = self.app.simulate_get(
            '/api/v1.0/revisions/%s/documents' % revision_id,
            headers={'Content-Type': 'application/x-yaml'})
        self.assertEqual(200, resp.status_code)
        self.assertEmpty(list(yaml.safe_load_all(resp.text)))
项目:deckhand    作者:att-comdev    | 项目源码 | 文件源码
def test_list_revision_documents_sorting_metadata_name(self):
        rules = {'deckhand:list_cleartext_documents': '@',
                 'deckhand:list_encrypted_documents': '@',
                 'deckhand:create_cleartext_documents': '@'}
        self.policy.set_rules(rules)

        documents_factory = factories.DocumentFactory(2, [1, 1])
        documents = documents_factory.gen_test({})
        expected_names = ['bar', 'baz', 'foo']
        for idx in range(len(documents)):
            documents[idx]['metadata']['name'] = expected_names[idx]

        resp = self.app.simulate_put(
            '/api/v1.0/buckets/mop/documents',
            headers={'Content-Type': 'application/x-yaml'},
            body=yaml.safe_dump_all(documents))
        self.assertEqual(200, resp.status_code)
        revision_id = list(yaml.safe_load_all(resp.text))[0]['status'][
            'revision']

        resp = self.app.simulate_get(
            '/api/v1.0/revisions/%s/documents' % revision_id,
            params={'sort': 'metadata.name'}, params_csv=False,
            headers={'Content-Type': 'application/x-yaml'})
        self.assertEqual(200, resp.status_code)
        retrieved_documents = list(yaml.safe_load_all(resp.text))

        self.assertEqual(3, len(retrieved_documents))
        self.assertEqual(expected_names,
                         [d['metadata']['name'] for d in retrieved_documents])
项目:deckhand    作者:att-comdev    | 项目源码 | 文件源码
def test_list_revision_documents_sorting_by_metadata_name_and_schema(self):
        rules = {'deckhand:list_cleartext_documents': '@',
                 'deckhand:list_encrypted_documents': '@',
                 'deckhand:create_cleartext_documents': '@'}
        self.policy.set_rules(rules)

        documents_factory = factories.DocumentFactory(2, [1, 1])
        documents = documents_factory.gen_test({})
        expected_names = ['foo', 'baz', 'bar']
        expected_schemas = ['deckhand/Certificate/v1',
                            'deckhand/Certificate/v1',
                            'deckhand/LayeringPolicy/v1']
        for idx in range(len(documents)):
            documents[idx]['metadata']['name'] = expected_names[idx]
            documents[idx]['schema'] = expected_schemas[idx]

        resp = self.app.simulate_put(
            '/api/v1.0/buckets/mop/documents',
            headers={'Content-Type': 'application/x-yaml'},
            body=yaml.safe_dump_all(documents))
        self.assertEqual(200, resp.status_code)
        revision_id = list(yaml.safe_load_all(resp.text))[0]['status'][
            'revision']

        resp = self.app.simulate_get(
            '/api/v1.0/revisions/%s/documents' % revision_id,
            params={'sort': ['schema', 'metadata.name']}, params_csv=False,
            headers={'Content-Type': 'application/x-yaml'})
        self.assertEqual(200, resp.status_code)
        retrieved_documents = list(yaml.safe_load_all(resp.text))

        self.assertEqual(3, len(retrieved_documents))
        self.assertEqual(['baz', 'foo', 'bar'],
                         [d['metadata']['name'] for d in retrieved_documents])
        self.assertEqual(expected_schemas,
                         [d['schema'] for d in retrieved_documents])
项目:deckhand    作者:att-comdev    | 项目源码 | 文件源码
def test_list_revision_documents_sorting_by_schema(self):
        rules = {'deckhand:list_cleartext_documents': '@',
                 'deckhand:list_encrypted_documents': '@',
                 'deckhand:create_cleartext_documents': '@'}
        self.policy.set_rules(rules)

        documents_factory = factories.DocumentFactory(2, [1, 1])
        documents = documents_factory.gen_test({})
        expected_schemas = ['deckhand/Certificate/v1',
                            'deckhand/CertificateKey/v1',
                            'deckhand/LayeringPolicy/v1']
        for idx in range(len(documents)):
            documents[idx]['schema'] = expected_schemas[idx]

        resp = self.app.simulate_put(
            '/api/v1.0/buckets/mop/documents',
            headers={'Content-Type': 'application/x-yaml'},
            body=yaml.safe_dump_all(documents))
        self.assertEqual(200, resp.status_code)
        revision_id = list(yaml.safe_load_all(resp.text))[0]['status'][
            'revision']

        resp = self.app.simulate_get(
            '/api/v1.0/revisions/%s/documents' % revision_id,
            params={'sort': 'schema'}, params_csv=False,
            headers={'Content-Type': 'application/x-yaml'})
        self.assertEqual(200, resp.status_code)
        retrieved_documents = list(yaml.safe_load_all(resp.text))

        self.assertEqual(3, len(retrieved_documents))
        self.assertEqual(expected_schemas,
                         [d['schema'] for d in retrieved_documents])
项目:deckhand    作者:att-comdev    | 项目源码 | 文件源码
def test_create_delete_then_recreate_document_in_different_bucket(self):
        """Ordiniarly creating a document with the same metadata.name/schema
        in a separate bucket raises an exception, but if we delete the document
        and re-create it in a different bucket this should be a success
        scenario.
        """
        rules = {'deckhand:create_cleartext_documents': '@'}
        self.policy.set_rules(rules)

        payload = factories.DocumentFactory(2, [1, 1]).gen_test({})
        bucket_name = test_utils.rand_name('bucket')
        alt_bucket_name = test_utils.rand_name('bucket')

        # Create the documents in the first bucket.
        resp = self.app.simulate_put(
            '/api/v1.0/buckets/%s/documents' % bucket_name,
            headers={'Content-Type': 'application/x-yaml'},
            body=yaml.safe_dump_all(payload))
        self.assertEqual(200, resp.status_code)
        documents = list(yaml.safe_load_all(resp.text))
        self.assertEqual(3, len(documents))
        self.assertEqual([bucket_name] * 3,
                         [d['status']['bucket'] for d in documents])

        # Delete the documents from the first bucket.
        resp = self.app.simulate_put(
            '/api/v1.0/buckets/%s/documents' % bucket_name,
            headers={'Content-Type': 'application/x-yaml'}, body=None)
        self.assertEqual(200, resp.status_code)

        # Re-create the documents in the second bucket.
        resp = self.app.simulate_put(
            '/api/v1.0/buckets/%s/documents' % alt_bucket_name,
            headers={'Content-Type': 'application/x-yaml'},
            body=yaml.safe_dump_all(payload))
        self.assertEqual(200, resp.status_code)
        documents = list(yaml.safe_load_all(resp.text))
        self.assertEqual(3, len(documents))
        self.assertEqual([alt_bucket_name] * 3,
                         [d['status']['bucket'] for d in documents])
项目:deckhand    作者:att-comdev    | 项目源码 | 文件源码
def _create_revision(self, payload=None):
        if not payload:
            documents_factory = factories.DocumentFactory(2, [1, 1])
            payload = documents_factory.gen_test({})
        resp = self.app.simulate_put(
            '/api/v1.0/buckets/mop/documents',
            headers={'Content-Type': 'application/x-yaml'},
            body=yaml.safe_dump_all(payload))
        self.assertEqual(200, resp.status_code)
        revision_id = list(yaml.safe_load_all(resp.text))[0]['status'][
            'revision']
        return revision_id
项目:deckhand    作者:att-comdev    | 项目源码 | 文件源码
def on_put(self, req, resp, bucket_name=None):
        document_data = req.stream.read(req.content_length or 0)
        try:
            documents = list(yaml.safe_load_all(document_data))
        except yaml.YAMLError as e:
            error_msg = ("Could not parse the document into YAML data. "
                         "Details: %s." % e)
            LOG.error(error_msg)
            raise falcon.HTTPBadRequest(description=six.text_type(e))

        # NOTE: Must validate documents before doing policy enforcement,
        # because we expect certain formatting of the documents while doing
        # policy enforcement. If any documents fail basic schema validaiton
        # raise an exception immediately.
        try:
            doc_validator = document_validation.DocumentValidation(documents)
            validations = doc_validator.validate_all()
        except (deckhand_errors.InvalidDocumentFormat,
                deckhand_errors.InvalidDocumentSchema) as e:
            LOG.exception(e.format_message())
            raise falcon.HTTPBadRequest(description=e.format_message())

        for document in documents:
            if document['metadata'].get('storagePolicy') == 'encrypted':
                policy.conditional_authorize(
                    'deckhand:create_encrypted_documents', req.context)
                break

        self._prepare_secret_documents(documents)

        created_documents = self._create_revision_documents(
            bucket_name, documents, validations)

        resp.body = self.view_builder.list(created_documents)
        resp.status = falcon.HTTP_200
项目:piss    作者:AOSC-Dev    | 项目源码 | 文件源码
def generate_config(args):
    setup_yaml()
    if args.existing:
        existing = next(yaml.safe_load_all(open(args.existing, 'r', encoding='utf-8')))
    else:
        existing = None
    cfg, failed = chores.generate_chore_config(args.db, args.bookmark, existing)
    with open(args.output, 'w', encoding='utf-8') as f:
        yaml.dump_all((cfg, failed), f, default_flow_style=False)
    logging.info('Done.')
项目:armada    作者:att-comdev    | 项目源码 | 文件源码
def invoke(self):

        if not self.ctx.obj.get('api', False):
            with open(self.filename) as f:
                armada = Armada(
                    list(yaml.safe_load_all(f.read())),
                    self.disable_update_pre,
                    self.disable_update_post,
                    self.enable_chart_cleanup,
                    self.dry_run,
                    self.set,
                    self.wait,
                    self.timeout,
                    self.tiller_host,
                    self.tiller_port,
                    self.values)

                resp = armada.sync()
                self.output(resp)
        else:
            query = {
                'disable_update_post': self.disable_update_post,
                'disable_update_pre': self.disable_update_pre,
                'dry_run': self.dry_run,
                'enable_chart_cleanup': self.enable_chart_cleanup,
                'tiller_host': self.tiller_host,
                'tiller_port': self.tiller_port,
                'timeout': self.timeout,
                'wait': self.wait
            }

            client = self.ctx.obj.get('CLIENT')

            with open(self.filename, 'r') as f:
                resp = client.post_apply(
                    manifest=f.read(), values=self.values, set=self.set,
                    query=query)
                self.output(resp.get('message'))
项目:armada    作者:att-comdev    | 项目源码 | 文件源码
def test_lint_armada_yaml_pass(self):
        template = '{}/templates/valid_armada_document.yaml'.format(
            self.basepath)
        document = yaml.safe_load_all(open(template).read())
        resp = lint.validate_armada_documents(document)
        self.assertTrue(resp)
项目:armada    作者:att-comdev    | 项目源码 | 文件源码
def test_lint_armada_manifest_no_groups(self):
        template_manifest = """
        schema: armada/Manifest/v1
        metadata:
            schema: metadata/Document/v1
            name: example-manifest
        data:
            release_prefix: example
        """
        document = yaml.safe_load_all(template_manifest)
        with self.assertRaises(Exception):
            lint.validate_armada_documents(document)
项目:armada    作者:att-comdev    | 项目源码 | 文件源码
def test_lint_validate_manifest_pass(self):
        template_manifest = """
        schema: armada/Manifest/v1
        metadata:
            schema: metadata/Document/v1
            name: example-manifest
        data:
            release_prefix: example
            chart_groups:
                - example-group
        """
        document = yaml.safe_load_all(template_manifest)
        self.assertTrue(lint.validate_manifest_document(document))
项目:armada    作者:att-comdev    | 项目源码 | 文件源码
def test_lint_validate_group_pass(self):
        template_manifest = """
        schema: armada/ChartGroup/v1
        metadata:
            schema: metadata/Document/v1
            name: example-manifest
        data:
            description: this is sample
            chart_group:
                - example-group
        """
        document = yaml.safe_load_all(template_manifest)
        self.assertTrue(lint.validate_chart_group_document(document))
项目:armada    作者:att-comdev    | 项目源码 | 文件源码
def test_lint_validate_group_no_chart_group(self):
        template_manifest = """
        schema: armada/ChartGroup/v1
        metadata:
            schema: metadata/Document/v1
            name: example-manifest
        data:
            description: this is sample
        """
        document = yaml.safe_load_all(template_manifest)
        with self.assertRaises(Exception):
            lint.validate_chart_group_document(document)
项目:armada    作者:att-comdev    | 项目源码 | 文件源码
def test_lint_validate_chart_pass(self):
        template_manifest = """
        schema: armada/Chart/v1
        metadata:
          schema: metadata/Document/v1
          name: example-chart
        data:
          name: keystone
          release: keystone
          namespace: undercloud
          timeout: 100
          install:
            no_hooks: false
          upgrade:
            no_hooks: false
          values: {}
          source:
            type: git
            location: git://github.com/example/example
            subpath: example-chart
            reference: master
          dependencies:
            - dep-chart
        """
        document = yaml.safe_load_all(template_manifest)
        self.assertTrue(lint.validate_chart_document(document))
项目:armada    作者:att-comdev    | 项目源码 | 文件源码
def test_lint_validate_chart_no_release(self):
        template_manifest = """
        schema: armada/Chart/v1
        metadata:
          schema: metadata/Document/v1
          name: example-chart
        data:
          name: keystone
          namespace: undercloud
          timeout: 100
          install:
            no_hooks: false
          upgrade:
            no_hooks: false
          values: {}
          source:
            type: git
            location: git://github.com/example/example
            subpath: example-chart
            reference: master
          dependencies:
            - dep-chart
        """
        document = yaml.safe_load_all(template_manifest)
        with self.assertRaises(Exception):
            lint.validate_chart_document(document)
项目:armada    作者:att-comdev    | 项目源码 | 文件源码
def test_pre_flight_ops(self, mock_tiller, mock_lint, mock_git):
        '''Test pre-flight checks and operations'''
        armada = Armada('')
        armada.tiller = mock_tiller
        armada.documents = yaml.safe_load_all(self.test_yaml)
        armada.config = Manifest(armada.documents).get_manifest()

        CHART_SOURCES = [('git://github.com/dummy/armada', 'chart_1'),
                         ('/tmp/dummy/armada', 'chart_2')]

        # mock methods called by pre_flight_ops()
        mock_tiller.tiller_status.return_value = True
        mock_lint.valid_manifest.return_value = True
        mock_git.git_clone.return_value = CHART_SOURCES[0][0]

        armada.pre_flight_ops()

        mock_git.git_clone.assert_called_once_with(CHART_SOURCES[0][0],
                                                   'master')
        for group in armada.config.get('armada').get('charts'):
            for counter, chart in enumerate(group.get('chart_group')):
                self.assertEqual(
                    chart.get('chart').get('source_dir')[0],
                    CHART_SOURCES[counter][0])
                self.assertEqual(
                    chart.get('chart').get('source_dir')[1],
                    CHART_SOURCES[counter][1])
项目:armada    作者:att-comdev    | 项目源码 | 文件源码
def test_find_document_type_valid(self):
        with open(self.base_manifest) as f:
            doc_obj = list(yaml.safe_load_all(f.read()))
            ovr = Override(doc_obj)
            test_group = ovr.find_document_type('chart_group')
            self.assertEqual(test_group, const.DOCUMENT_GROUP)

            test_chart = ovr.find_document_type('chart')
            self.assertEqual(test_chart, const.DOCUMENT_CHART)

            test_manifest = ovr.find_document_type('manifest')
            self.assertEqual(test_manifest, const.DOCUMENT_MANIFEST)
项目:armada    作者:att-comdev    | 项目源码 | 文件源码
def test_find_document_type_invalid(self):
        with self.assertRaises(Exception):
            with open(self.base_manifest) as f:
                doc_obj = list(yaml.safe_load_all(f.read()))
                ovr = Override(doc_obj)
                ovr.find_document_type('charts')
项目:armada    作者:att-comdev    | 项目源码 | 文件源码
def test_find_manifest_document_valid(self):
        expected = "{}/templates/override-{}-expected.yaml".format(
            self.basepath, '02')

        with open(self.base_manifest) as f, open(expected) as e:
            doc_path = ['chart', 'blog-1']
            doc_obj = list(yaml.safe_load_all(f.read()))
            ovr = Override(doc_obj).find_manifest_document(doc_path)
            expected_doc = list(yaml.safe_load_all(e.read()))[0]

            self.assertEqual(ovr, expected_doc)
项目:armada    作者:att-comdev    | 项目源码 | 文件源码
def _load_yaml_file(self, doc):
        '''
        Retrieve yaml file as a dictionary.
        '''

        try:
            with open(doc) as f:
                return list(yaml.safe_load_all(f.read()))
        except IOError:
            raise override_exceptions.InvalidOverrideFileException(doc)
项目:armada    作者:att-comdev    | 项目源码 | 文件源码
def post_apply(self, manifest=None, values=None, set=None, query=None):

        if values or set:
            document = list(yaml.safe_load_all(manifest))
            override = Override(
                document, overrides=set, values=values).update_manifests()
            manifest = yaml.dump(override)

        endpoint = self._set_endpoint('1.0', 'apply')
        resp = self.session.post(endpoint, body=manifest, query=query)

        self._check_response(resp)

        return resp.json()
项目:patrole    作者:openstack    | 项目源码 | 文件源码
def __init__(self, filepath):
            with open(filepath) as f:
                RequirementsParser.Inner._rbac_map = \
                    list(yaml.safe_load_all(f))
项目:promenade    作者:att-comdev    | 项目源码 | 文件源码
def _load_schemas():
    '''
    Fills the cache of known schemas
    '''
    schema_dir = _get_schema_dir()
    for schema_file in os.listdir(schema_dir):
        with open(os.path.join(schema_dir, schema_file)) as f:
            for schema in yaml.safe_load_all(f):
                name = schema['metadata']['name']
                if name in SCHEMAS:
                    raise RuntimeError(
                        'Duplicate schema specified for: %s' % name)

                SCHEMAS[name] = schema['data']
项目:promenade    作者:att-comdev    | 项目源码 | 文件源码
def from_streams(cls, *, streams, **kwargs):
        documents = []
        for stream in streams:
            stream_name = getattr(stream, 'name')
            if stream_name is not None:
                LOG.info('Loading documents from %s', stream_name)
            stream_documents = list(yaml.safe_load_all(stream))
            validation.check_schemas(stream_documents)
            if stream_name is not None:
                LOG.info('Successfully validated documents from %s',
                         stream_name)
            documents.extend(stream_documents)

        return cls(documents=documents, **kwargs)
项目:promenade    作者:att-comdev    | 项目源码 | 文件源码
def from_design_ref(cls, design_ref):
        response = requests.get(design_ref)
        response.raise_for_status()

        documents = list(yaml.safe_load_all(response.text))
        validation.check_schemas(documents)

        return cls(documents=documents)
项目:bernard    作者:leviroth    | 项目源码 | 文件源码
def load_yaml_config(database, subreddit, config_file):
    """Parse the given file and return a list of Browsers."""
    with config_file.open() as file:
        config = yaml.safe_load_all(file)
        return parse_subreddit_config(database, subreddit, config)