我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用yaml.load_all()。
def load(self, istream): """ Load the model from input stream. reset() is called to clean up network instance. """ self.reset() allLayer = {} for layer in yaml.load_all(istream): allLayer[layer.saveName] = layer if issubclass(type(layer), RawInput): self.setInput(layer, reload=True) # TODO: consider there are multiple input layer # TODO: branch and merge shouldStop = False currentLayer = self.currentLayer while not shouldStop: self.append(allLayer[currentLayer.outputLayerName[0]], reload=True) currentLayer = allLayer[currentLayer.outputLayerName[0]] if len(currentLayer.outputLayerName) <= 0: shouldStop = True
def test_update_dictionary_valid(self): expected = "{}/templates/override-{}-expected.yaml".format( self.basepath, '01') merge = "{}/templates/override-{}.yaml".format(self.basepath, '01') with open(self.base_manifest) as f, open(expected) as e, open( merge) as m: merging_values = list(yaml.safe_load_all(m.read())) doc_obj = list(yaml.safe_load_all(f.read())) doc_path = ['chart', 'blog-1'] ovr = Override(doc_obj) ovr.update_document(merging_values) ovr_doc = ovr.find_manifest_document(doc_path) expect_doc = list(yaml.load_all(e.read()))[0] self.assertEqual(ovr_doc, expect_doc)
def get_chart_templates(self, template_name, name, release_name, namespace, chart, disable_hooks, values): # returns some info LOG.info("Template( %s ) : %s ", template_name, name) stub = ReleaseServiceStub(self.channel) release_request = InstallReleaseRequest( chart=chart, dry_run=True, values=values, name=name, namespace=namespace, wait=False) templates = stub.InstallRelease( release_request, self.timeout, metadata=self.metadata) for template in yaml.load_all( getattr(templates.release, 'manifest', [])): if template_name == template.get('metadata', None).get( 'name', None): LOG.info(template_name) return template
def get_content(self): """Return a single document from YAML""" def multi_constructor(loader, tag_suffix, node): """Stores all unknown tags content into a dict Original yaml: !unknown_tag - some content Python object: {"!unknown_tag": ["some content", ]} """ if type(node.value) is list: if type(node.value[0]) is tuple: return {node.tag: loader.construct_mapping(node)} else: return {node.tag: loader.construct_sequence(node)} else: return {node.tag: loader.construct_scalar(node)} yaml.add_multi_constructor("!", multi_constructor) with self.__get_file() as file_obj: self.__documents = [x for x in yaml.load_all(file_obj)] return self.__documents[self.__document_id]
def load_camera_from_calibr(f): s = open(f).read() y = list(yaml.load_all(s))[0] K0 = Camera.buildK(y['cam0']['intrinsics']) C0 = Camera(K0, np.eye(3), np.zeros((3,))) K1 = Camera.buildK(y['cam1']['intrinsics']) M = y['cam1']['T_cn_cnm1'][:3] R1 = np.asarray([k[:3] for k in M]) t1 = np.asarray([k[3] for k in M]) C1 = Camera(K1, R1, t1) dist0 = np.asarray(y['cam0']['distortion_coeffs']) dist1 = np.array(y['cam1']['distortion_coeffs']) return C0, C1, dist0, dist1
def parse_kubeconfig(): if not os.path.exists(os.path.expanduser("~/.kube/config")): return ("", "", "") with open(os.path.expanduser("~/.kube/config"), "r") as fd: docs = yaml.load_all(fd) for doc in docs: current_context = doc.get("current-context", "") contexts = doc.get("contexts") if contexts: for index, context in enumerate(contexts): if context['name'] == current_context: KubeConfig.current_context_index = index KubeConfig.current_context_name = context['name'] if 'cluster' in context['context']: KubeConfig.clustername = context['context']['cluster'] if 'namespace' in context['context']: KubeConfig.namespace = context['context']['namespace'] if 'user' in context['context']: KubeConfig.user = context['context']['user'] return (KubeConfig.clustername, KubeConfig.user, KubeConfig.namespace) return ("", "", "")
def openCollection(self, fileName): try: f = open(unicode(fileName), 'r') Mainframe.model = model.Model() Mainframe.model.delete(0) for data in yaml.load_all(f): Mainframe.model.add(model.makeSafe(data), False) f.close() Mainframe.model.is_dirty = False except IOError: msgBox(Lang.value('MSG_IO_failed')) Mainframe.model = model.Model() except yaml.YAMLError as e: msgBox(Lang.value('MSG_YAML_failed') % e) Mainframe.model = model.Model() else: if len(Mainframe.model.entries) == 0: Mainframe.model = model.Model() Mainframe.model.filename = unicode(fileName) finally: Mainframe.sigWrapper.sigModelChanged.emit()
def read(): with open(Conf.file, 'r') as f: Conf.values = yaml.load(f) Conf.zoos = [] with open(Conf.zoo_file, 'r') as f: for zoo in yaml.load_all(f): Conf.zoos.append(zoo) with open(Conf.keywords_file, 'r') as f: Conf.keywords = yaml.load(f) with open(Conf.popeye_file, 'r') as f: Conf.popeye = yaml.load(f) with open(Conf.chest_file, 'r') as f: Conf.chest = yaml.load(f)
def display(self, f): def handle_input(key): if key in ('q', 'Q'): raise urwid.ExitMainLoop() elif key in ('right', 'j', ' '): if self.slide_id < len(self.sd) - 1: self.slide_id += 1 elif key in ('left', 'k'): if self.slide_id > 0: self.slide_id -= 1 self.update_display() self.load_charset() self.sd = list(yaml.load_all(f)) self.slide_id = 0 self.update_display() txt = urwid.Text(u"Presenting...") fill = urwid.Filler(txt, 'bottom') urwid.MainLoop(fill, unhandled_input=handle_input).run()
def runTest(self): os.environ['RANDOMSEED'] = '2' options = get_options(['--samples', '10', '--output', 'ch13_r05_test.yaml']) face_count = write_rolls(options.output_path, roll_iter(options.samples, options.seed)) self.assertDictEqual( {8: 8, 7: 6, 10: 5, 4: 3, 6: 3, 9: 3, 2: 2, 3: 1, 5: 1, 11: 1, 12: 1}, face_count) results = list(yaml.load_all(self.data_path.read_text())) self.assertListEqual( [[[1, 1]], [[1, 3], [2, 6], [6, 3], [3, 5], [2, 5]], [[1, 5], [6, 2], [4, 6], [4, 6], [5, 3], [5, 4], [5, 3], [1, 1], [3, 4]], [[3, 4]], [[4, 5], [2, 5]], [[2, 2], [2, 1], [2, 3], [2, 2]], [[5, 5], [3, 5], [6, 5], [2, 4], [4, 6]], [[5, 3], [5, 3]], [[3, 4]], [[2, 4], [6, 6], [4, 6], [5, 2]]], results)
def get_deployer_notes(stig_id): """Read deployer notes based on the STIG ID.""" filename = "{0}/rhel7/{1}.rst".format(METADATA_DIR, stig_id) # Does this deployer note exist? if not os.path.isfile(filename): return False # Read the note and parse it with YAML with open(filename, 'r') as f: rst_file = f.read() # Split the RST into frontmatter and text # NOTE(mhayden): Can't use the standard yaml.load_all() here at it will # have scanner errors in documents that have colons (:). yaml_boundary = re.compile(r'^-{3,}$', re.MULTILINE) _, metadata, text = yaml_boundary.split(rst_file, 2) # Assemble the metadata and the text from the deployer note. post = yaml.safe_load(metadata) post['content'] = text return post
def get_api_objs(self, group, manifest, ctx=None): if ctx is None: ctx = {} ctx = self.get_manifest_ctx(group, manifest, **ctx) docs = yaml.load_all( self.cluster.decode_manifest( self.cluster.config["release"][group]["manifests"][manifest], ctx, ) ) objs = collections.defaultdict(list) for doc in docs: obj = getattr(pykube.objects, doc["kind"])(self.api, doc) if obj.exists(): obj.reload() # set the shadow object to the original doc enabling proper # update handling if the object has changed in the manifest obj.obj = doc objs[doc["kind"]].append(obj) return objs
def _check_if_pod_or_more(self, app_id, app_info): only_pod = True kind_list = [] app_dir = app_info['app_location'] app_folder_name = app_info['app_folder_name'] df_dir = app_dir + "/" + app_folder_name app_yaml = app_info['app_yaml'] stream = open(df_dir + "/" + app_yaml, "r") docs = yaml.load_all(stream) for doc in docs: for k,v in doc.items(): if k == 'kind': kind_list.append(v.strip()) if 'Service' in kind_list or 'Deployment' in kind_list: only_pod = False return only_pod
def validate_app_format(app_file_name): valid = False kind_set = [] try: stream = open(app_file_name, "r") docs = yaml.load_all(stream) for doc in docs: for k,v in doc.items(): if k == 'kind': kind_set.append(v.strip()) if k == 'app': valid = True break if len(kind_set) == 1 and 'Pod' in kind_set: valid = True except Exception as e: print("Could not parse %s" % app_file_name) exit() return valid
def initialize(): f = cfg.CONF.drilldown.mappingfile with open(f, 'r') as stream: a = yaml.load_all(stream) for dictio in a: for key, value in dictio.iteritems(): for mylist in value: obj = MappingFile(mylist['sourcelabel'], mylist['targetlabel'], mylist['returnfields'], mylist['nextfields'], mylist['datasource']) odict[mylist['sourcelabel']] = obj
def get_config(): if not len(sys.argv) > 1: raise Exception('please give config file path') if not os.path.exists(sys.argv[1]): raise Exception("config file doesn't exists") stream = open(sys.argv[1], "r") docs = yaml.load_all(stream) for f in docs: return f
def pull_yaml(args, source): """ Pull YAML resources from a file-like object. """ for dct in load_all(source): for item in dct.items(): yield item
def to_dict(path): """ Normalize each resource as a dictionary for smarter comparison. Relies on link sorting performed by the pull script. """ with open(path) as file_: return { key: value for dct in load_all(file_) for key, value in dct.items() }
def fromObject(cls, obj, decode=None): orig = Documentation.fromObject(obj, decode) self = cls(orig.raw) lines = self.raw.splitlines() if '---' in lines: n = lines.index('---') this, that = '\n'.join(lines[:n]), '\n'.join(lines[n:]) self.yamlData = {} for ydoc in yaml.load_all(that): assert isinstance(ydoc, dict), "only dict-like structures allowed in yaml docstrings not %r" % type(ydoc) self.yamlData.update(ydoc) else: this = '\n'.join(lines) self.raw = this return self
def regular_load(stream, loader=yaml.loader.Loader): # LOAD fails if more than one document is there # return yaml.load(fh) # LOAD ALL gets more than one document inside the file # gen = yaml.load_all(fh) return yaml.load_all(stream, loader)
def load_all(stream): return yaml.load_all(stream, Loader=Loader)
def load_with_includes(filename): with open(filename) as f: docs = list(load_all(f)) base_dir = os.path.dirname(filename) res = AttrDict() for doc in docs: if isinstance(doc, Includes): for inc_file in doc.lst: if not os.path.isabs(inc_file): inc_file = os.path.join(base_dir, inc_file) inc_res = load_with_includes(inc_file) res._merge(inc_res) else: res._merge(doc) return res
def ds_spec(self): if not os.path.isfile(self.ds_yaml_file): return None with open(self.ds_yaml_file) as ds_conf: return [i for i in yaml.load_all(ds_conf)]
def load_current_file(): print("Loading current file: {}".format(current_file)) global current_docs, current_doc try: with open(current_file, "r", encoding="UTF-8") as f: current_docs = list(yaml.load_all(f)) current_doc = len(current_docs) except FileNotFoundError: pass except yaml.YAMLError: print("Failed to parse edit file")
def load_cases(): cases_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'cases') cases_files = [os.path.join(cases_path, f)for f in os.listdir(cases_path) if os.path.isfile(os.path.join(cases_path, f))] cases = [] for cases_file in cases_files: with open(cases_file) as f: cases.extend(list(yaml.load_all(f))) return cases
def yaml(self): if self._yaml is None: sio = six.BytesIO(self.raw()) setattr(sio, 'name', self._name) self._yaml = list(yaml.load_all(sio, yaml_loader.YamlLoader)) return self._yaml
def switch_to_next_cluster(): if not os.path.exists(os.path.expanduser("~/.kube/config")): return with open(os.path.expanduser("~/.kube/config"), "r") as fd: docs = yaml.load_all(fd) for doc in docs: contexts = doc.get("contexts") if contexts: KubeConfig.current_context_index = (KubeConfig.current_context_index+1) % len(contexts) cluster_name = contexts[KubeConfig.current_context_index]['name'] kubectl_config_use_context = "kubectl config use-context " + cluster_name cmd_process = subprocess.Popen(kubectl_config_use_context, shell=True, stdout=subprocess.PIPE) cmd_process.wait() return
def test_summary_file_entries(self): """Verifies the output summary's file format. This focuses on the format of the file instead of the content of entries, which is covered in base_test_test. """ mock_test_config = self.base_mock_test_config.copy() mock_ctrlr_config_name = mock_controller.MOBLY_CONTROLLER_CONFIG_NAME my_config = [{ 'serial': 'xxxx', 'magic': 'Magic1' }, { 'serial': 'xxxx', 'magic': 'Magic2' }] mock_test_config.controller_configs[mock_ctrlr_config_name] = my_config tr = test_runner.TestRunner(self.log_dir, self.test_bed_name) tr.add_test_class(mock_test_config, integration_test.IntegrationTest) tr.run() summary_path = os.path.join(mock_test_config.log_path, mock_test_config.test_bed_name, 'latest', records.OUTPUT_FILE_SUMMARY) with open(summary_path, 'r') as f: summary_entries = list(yaml.load_all(f)) self.assertEqual(len(summary_entries), 4) # Verify the first entry is the list of test names. self.assertEqual(summary_entries[0]['Type'], records.TestSummaryEntryType.TEST_NAME_LIST.value) self.assertEqual(summary_entries[1]['Type'], records.TestSummaryEntryType.RECORD.value)
def test_output(self): """Verifies the expected output files from a test run. * Files are correctly created. * Basic sanity checks of each output file. """ mock_test_config = self.base_mock_test_config.copy() mock_ctrlr_config_name = mock_controller.MOBLY_CONTROLLER_CONFIG_NAME my_config = [{ 'serial': 'xxxx', 'magic': 'Magic1' }, { 'serial': 'xxxx', 'magic': 'Magic2' }] mock_test_config.controller_configs[mock_ctrlr_config_name] = my_config tr = test_runner.TestRunner(self.log_dir, self.test_bed_name) tr.add_test_class(mock_test_config, integration_test.IntegrationTest) tr.run() output_dir = os.path.join(self.log_dir, self.test_bed_name, 'latest') summary_file_path = os.path.join(output_dir, records.OUTPUT_FILE_SUMMARY) debug_log_path = os.path.join(output_dir, records.OUTPUT_FILE_DEBUG_LOG) info_log_path = os.path.join(output_dir, records.OUTPUT_FILE_INFO_LOG) self.assertTrue(os.path.isfile(summary_file_path)) self.assertTrue(os.path.isfile(debug_log_path)) self.assertTrue(os.path.isfile(info_log_path)) summary_entries = [] with open(summary_file_path) as f: for entry in yaml.load_all(f): self.assertTrue(entry['Type']) summary_entries.append(entry) with open(debug_log_path, 'r') as f: content = f.read() self.assertIn('DEBUG', content) self.assertIn('INFO', content) with open(info_log_path, 'r') as f: content = f.read() self.assertIn('INFO', content) self.assertNotIn('DEBUG', content)
def __enter__(self): self._file = open(self.filename, 'rU', encoding="utf-8") return yaml.load_all(self._file)
def load(cls, config_file): """ Load experiment parameters from a YAML configuration file Parameters ---------- config_file: string path to a YAML configuration file Returns ------- dictionary (or list of dictionaries) of parameters to pass to a behavior """ try: import yaml except ImportError: raise ImportError("Pyyaml is required to use a .yaml configuration file") parameters = list() with open(config_file, "rb") as config: for val in yaml.load_all(config): parameters.append(val) if len(parameters) == 1: parameters = parameters[0] return parameters
def test(): def spec_test(name, spec): "Make a test function for a case from specification.yml" def test(*args): exc, res = None, None try: res = jsone.render(spec['template'], spec['context']) except jsone.JSONTemplateError as e: if 'error' not in spec: raise exc = e if 'error' in spec: assert exc, "expected exception" expected = spec['error'] if expected is True: # no specific expectation return eq_(str(exc), expected) else: assert not exc assert res == spec['result'], \ '{!r} != {!r}'.format(res, spec['result']) return test with open(os.path.join(os.path.dirname(__file__), '../specification.yml')) as f: with freeze_time('2017-01-19T16:27:20.974Z'): for spec in yaml.load_all(f): if 'section' in spec: section = spec['section'] continue name = '{}: {}'.format(section, spec['title']) t = spec_test(name, spec) yield (t, name)
def onPaste(self): try: data = yaml.load_all(unicode(self.clipboard.text())) if isinstance(data, dict): data = [data] except yaml.YAMLError as e: msgBox(Lang.value('MSG_YAML_failed') % e) return for entry in data: entry = model.makeSafe(entry) Mainframe.model.insert(entry, True, Mainframe.model.current + 1) self.rebuild() Mainframe.sigWrapper.sigModelChanged.emit()
def load_info(path, qt_parent=None): path_base, path_extension = _ospath.splitext(path) filename = path_base + '.yaml' try: with open(filename, 'r') as info_file: info = list(_yaml.load_all(info_file)) except FileNotFoundError as e: print('\nAn error occured. Could not find metadata file:\n{}'.format(filename)) if qt_parent is not None: _QMessageBox.critical(qt_parent, 'An error occured', 'Could not find metadata file:\n{}'.format(filename)) raise NoMetadataFileError(e) return info
def yml_2_json(): json_text.delete(1.0, tk.END) error_tips['text']=tips_str txt = yml_text.get(1.0, tk.END) if len(txt) <= 1: return try: yml_doc = yaml.load_all(txt) j = "" for obj in yml_doc: j += json.dumps(obj, indent=4) json_text.insert(1.0, j) except: error_tips['text']=tips_str + 'YAML????'
def headers(self): try: headers = next(yaml.load_all(self.read(body=False))) except StopIteration as e: raise ValueError("YAML header is missing. Please ensure that the top of your post has a header of the following form:\n" + SAMPLE_HEADER) except yaml.YAMLError as e: raise ValueError( "YAML header is incorrectly formatted or missing. The following information may be useful:\n{}\nIf you continue to have difficulties, try pasting your YAML header into an online parser such as http://yaml-online-parser.appspot.com/.".format(str(e))) for key, value in headers.copy().items(): if isinstance(value, datetime.date): headers[key] = datetime.datetime.combine(value, datetime.time(0)) if key == 'tags' and isinstance(value, list): headers[key] = [str(v) if six.PY3 else unicode(v) for v in value] return headers
def load_all(*args, **kwargs): """Delegate to yaml loadall. """ if kwargs is None: kwargs = {} kwargs['Loader'] = Loader return yaml.load_all(*args, **kwargs)
def load(stream, safe=False, many=False): Loader = SafeCustomLoader if safe else CustomLoader if many: return tuple(yaml.load_all(stream, Loader)) else: return yaml.load(stream, Loader)
def generate_yaml_tests(): file_path = join(YAML_TESTS_ROOT, "macros.yaml") with open(file_path) as f: configs = load_all(f) for config in configs: yield (config['name'], _generate_test(config))
def bulk_create_problem_yml(self, path): """Create new problem entries in database from yml document stream. Can create a yml file with individual problems delimited into documents using the `---` ... `---` document stream syntax. Parameters ---------- path: str or path-like Path to yml file """ with open(path, "r") as f: obj_all = yaml.load_all(f) for obj in obj_all: self.create_problem(**obj)
def load_all(filename): """Generate objects contained in ``filename``.""" if filename.endswith('.yaml') or filename.endswith('.yml'): with io.open(filename, encoding='utf-8') as handle: for obj in yaml.load_all(handle): yield obj else: raise UsageError("Unsupported file type (extension) in '{}'!".format(filename))
def read_merged_files(cfgfiles): """Read a list of hierachical config files, and merge their keys.""" result = {} for cfgfile in cfgfiles: for obj in load_all(cfgfile): merge_objects(result, obj) return result
def parse_file(file_name): with open(file_name) as f: theYAML = list(yaml.load_all(f, Loader=Loader)) try: theYAML.remove(None) except ValueError: pass return theYAML
def test_duplicates(self): mydir = os.path.dirname(__file__) path = os.path.join(mydir, 'dups_good') buildfilepath = os.path.join(path, 'build.yml') if os.path.exists(buildfilepath): os.remove(buildfilepath) build.generate_build_file(path) assert os.path.exists(buildfilepath) with open(buildfilepath) as fd: docs = yaml.load_all(fd) data = next(docs, None) contents = data['contents'] assert contents == { # File extensions added to "a" due to conflicts 'a_txt': {'file': 'a.txt'}, 'a_csv': {'file': 'a.csv'}, # "a" dir stays the same - but it's fine cause other "a"s got renamed 'a': {}, # Directories don't actually have extensions, so include them even with no conficts 'dir_ext': {}, # Weird characters replaced with a single "_" 'a_b_c': {'file': 'a%%b___c'}, # Prepend "n" to files that start with a number 'n1': {'file': '1'}, # ... even if there used to be an underscore there 'n123': {'file': '_123'}, # Handle conflicts with numbers, too 'n1_txt': {'file': '1.txt'}, } os.remove(buildfilepath)
def read_yml_file(fn): mydir = os.path.dirname(__file__) filepath = os.path.join(mydir, fn) with open(filepath) as fd: return next(yaml.load_all(fd), None)
def load_inputs(filename: str) -> list: """Load inputs configuration from `filename`.""" if not filename: filename = _find_config_file("inputs.yaml") _LOG.debug("loading inputs from %s", filename) if not os.path.isfile(filename): _LOG.error("loading inputs file error: '%s' not found", filename) return None try: with open(filename) as fin: inps = [doc for doc in yaml.load_all(fin) if doc and doc.get("enable", True)] _LOG.debug("loading inputs - found %d enabled inputs", len(inps)) if not inps: _LOG.error("loading inputs error: no valid/enabled " "inputs found") return inps except IOError as err: _LOG.error("loading inputs from file %s error: %s", filename, err) except yaml.error.YAMLError as err: _LOG.error("loading inputs from file %s - invalid YAML: %s", filename, err) return None
def process_all_files(result_file, file_names): for source_path in (Path(n) for n in file_names): detail_log.info("read {}".format(source_path)) with source_path.open() as source_file: game_iter = yaml.load_all(source_file) statistics = gather_stats(game_iter) result_file.write( yaml.dump(dict(statistics), explicit_start=True) )
def __init__(self, filename): with open(filename) as infile: try: text = infile.read() except: raise ZettelLoaderError( "%s: I/O error - Encoding must be UTF-8" % filename) try: self.ydocs = yaml.load_all(text) except: raise ZettelLoaderError("%s: YAML load failure" % filename)