The following 34 code examples, extracted from open-source Python projects, illustrate how to use yaml.CLoader().
def parse_front_matter(lines):
    """Parse lines of front matter.

    Detects the front-matter format from the first line and returns a
    ``(format_name, data)`` tuple, where ``format_name`` is one of
    ``"json"``, ``"toml"`` or ``"yaml"`` and ``data`` is the parsed mapping.

    :param lines: list of str, the raw front-matter lines (delimiters included)
    :return: (str, dict) tuple
    """
    if not lines:
        return "toml", {}
    if lines[0] == "{":
        # JSON: the whole block (including braces) is the document
        import json
        return "json", json.loads("\n".join(lines))
    if lines[0] == "+++":
        # TOML: strip the +++ delimiter lines
        import toml
        return "toml", toml.loads("\n".join(lines[1:-1]))
    if lines[0] == "---":
        # YAML: strip the --- delimiter lines. Fall back to the pure-Python
        # loader when PyYAML was built without libyaml (no CLoader attribute).
        import yaml
        loader = getattr(yaml, "CLoader", yaml.Loader)
        return "yaml", yaml.load("\n".join(lines[1:-1]), Loader=loader)
    # Fix: every branch must return a (format, data) pair; the original
    # returned a bare {} here, which broke callers that unpack two values.
    return "toml", {}
def load_app_from_config(path):
    """Generate app directly from config file, bypassing command line settings
    (useful for testing in ipython).

    :param path: str, path to the YAML settings file
    :return: ApplicationUnsynced or ApplicationSynced instance
    :raises ValueError: if no file exists at *path*
    """
    Setting.generate_missing_shorthands()
    defaults = Setting.generate_defaults_dict()
    if osp.isfile(path):
        # 'with' guarantees the handle is closed even if the YAML parse raises
        # (the original open/close pair leaked the file on error).
        with open(path, "r", encoding="utf-8") as file_stream:
            config_defaults = load(file_stream, Loader=Loader)
        for key, value in config_defaults.items():
            defaults[key] = value
    else:
        raise ValueError("Settings file not found at: {0:s}".format(path))
    # Re-pack the merged settings into a namespace the app constructors expect.
    args = ap.Namespace()
    for key, value in defaults.items():
        args.__dict__[key] = value
    if args.unsynced:
        app = ApplicationUnsynced(args)
    else:
        app = ApplicationSynced(args)
    return app
def load_app_from_config(path):
    """Generate app directly from config file, bypassing command line settings
    (useful for testing in ipython).

    :param path: str, path to the YAML settings file
    :return: StereoMatcherApp instance
    :raises ValueError: if no file exists at *path*
    """
    Setting.generate_missing_shorthands()
    defaults = Setting.generate_defaults_dict()
    if osp.isfile(path):
        # 'with' guarantees the handle is closed even if the YAML parse raises
        # (the original open/close pair leaked the file on error).
        with open(path, "r", encoding="utf-8") as file_stream:
            config_defaults = load(file_stream, Loader=Loader)
        for key, value in config_defaults.items():
            defaults[key] = value
    else:
        raise ValueError("Settings file not found at: {0:s}".format(path))
    # Re-pack the merged settings into a namespace the app constructor expects.
    args = ap.Namespace()
    for key, value in defaults.items():
        args.__dict__[key] = value
    app = StereoMatcherApp(args)
    return app
def get_module(filename_with_path):
    """
    Given a filename with an absolute path, load the contents,
    instantiate a Module, and set the Module's path attribute.

    Parameters:
        filename_with_path (str): the YAML filename with an absolute path

    Raises:
        ModulePathError: if the file cannot be opened
        ModuleConstraintParseError: if the YAML content cannot be scanned
    """
    try:
        with open(filename_with_path) as config_file:
            # Module.temp_path is a side channel read by the YAML constructor;
            # the finally block guarantees it is cleared even when parsing
            # raises (the original left it set on a ScannerError).
            Module.temp_path = filename_with_path
            try:
                return yaml.load(config_file, Loader=Loader)
            finally:
                Module.temp_path = ""
    except IOError:
        raise ModulePathError(filename_with_path)
    except yaml.scanner.ScannerError:
        raise ModuleConstraintParseError("Parsing of module {} failed. This is likely caused by a typo in the file."
                                         "".format(filename_with_path))


# Add the YAML Module constructor so that YAML knows to use it in situations where the tag matches.
def read_configuration(cligraph, custom_suffix=''):
    """Read configuration dict for the given tool.

    Merges three configuration layers, in order of increasing precedence:
    'auto' (generated programmatically), 'shared' (the tool's bundled
    conf/<tool>.yaml) and 'custom' (a per-user ~/.<tool>/<tool>.yaml file,
    optionally suffixed with *custom_suffix*).

    :param cligraph: tool context providing tool_path and tool_shortname
    :param custom_suffix: str appended to the custom layer's filename
    :return: (AttrDict of merged config, OrderedDict of per-layer raw data)
    """
    cfg = {}
    # OrderedDict: later layers must override earlier ones during the merge.
    # Each value is a [source, loaded-data] pair; index 1 is filled in below.
    layers = collections.OrderedDict()
    layers['auto'] = [automatic_configuration, None]
    layers['shared'] = [os.path.join(cligraph.tool_path,
                                     'conf/%s.yaml' % cligraph.tool_shortname), None]
    layers['custom'] = [os.path.join(os.path.abspath(os.path.expanduser('~/.' + cligraph.tool_shortname)),
                                     '%s.yaml%s' % (cligraph.tool_shortname, custom_suffix)), None]
    for layer_name, layer_data in layers.items():
        if callable(layer_data[0]):
            # 'auto' layer: source is a callable, not a file path.
            layer = layer_data[0](cligraph, layer_name)
        else:
            # File-based layers are optional; skip silently when absent.
            if not os.path.exists(layer_data[0]):
                continue
            with open(layer_data[0], 'r') as filep:
                layer = yaml.load(filep, Loader=Loader)
        # Record the raw layer data so callers can inspect provenance.
        layers[layer_name][1] = layer
        if layer:
            update_recursive(cfg, layer)
    resolve_config(cfg)
    return AttrDict(**cfg), layers
def get_dataset(filename):
    '''
    Iterator for dataset's items

    :param filename: Path to dataset's file
    :type filename: str
    :return: Dataset's items
    :raises OSError: if has problem with file
    :raises yaml.YAMLError: if has problem with format
    :raises ValueError: if has problem with content
    '''
    # Renamed the file handle from 'input' (which shadowed the builtin).
    with open(filename, 'rt', encoding='utf-8') as stream:
        package = load(stream, Loader=Loader)
        dataset = package.get('dataset')
        if not isinstance(dataset, list):
            raise ValueError('wrong format')
        yield from dataset
def process_measurements(self):
    """Load measurement definitions and forward every entry to send_data."""
    loader = Loader(self.measurements_stream)
    # The custom Loader reads these attributes while building the data.
    loader.collector = self.collector
    loader.system = self.system
    loader.config = self.config
    measurements = loader.get_data()
    for name in measurements:
        entries = measurements[name]
        logging.debug('Process "{}" measurements: {}'.format(name, entries))
        for entry in entries:
            self.send_data(entry)
def process_notify(self, notification):
    """Load event definitions for *notification* and send the non-empty ones."""
    loader = Loader(self.events_stream)
    # The custom Loader reads these attributes while building the data.
    loader.notification = notification
    loader.system = self.system
    notifications = loader.get_data()
    for name in notifications:
        logging.debug('Process "{}" notification'.format(name))
        payload = notifications[name]
        if payload is not None:
            self.send_data(payload)
def parse_file(cls, path):
    """Parse a specification from *path*, which may be a filesystem path or
    any file-like object exposing a callable ``read``.

    :raises InvalidSpecification: when the YAML content is malformed
    """
    try:
        if hasattr(path, 'read') and callable(path.read):
            # Already a file-like object: consume it directly.
            text = path.read()
        else:
            with io.open(path) as f:
                text = f.read()
        conf = yaml.load(text, Loader=_Loader)
    except yaml.error.MarkedYAMLError as e:
        raise InvalidSpecification(str(e))
    return cls.parse(conf)
def load_yaml(document: Path, required=False):
    """Load a YAML document from *document*.

    Returns {} when the file is missing or cannot be parsed; a red error
    message is printed for parse errors, and for a missing file only when
    *required* is true.
    """
    document = Path(document)
    if document.exists():
        try:
            return yaml.load(document.text(), Loader=Loader)
        except yaml.YAMLError as exc:
            click.secho(str(exc), fg='red')
            return {}
    if required:
        click.secho('yaml document does not exists: {0}'.format(document), fg='red')
    return {}
def parse_annotations(self, ctx, symbol):
    """Attach the doc comment and YAML tag annotations from *ctx* to *symbol*.

    :param ctx: parse-tree context exposing .comment and .tagSymbol()
    :param symbol: target object; .comment and ._tags are written
    """
    assert ctx and symbol
    if ctx.comment:
        symbol.comment = ctx.comment.text
    if ctx.tagSymbol():
        # Drop the leading tag marker character from every tag line.
        lines = [tag.line.text[1:] for tag in ctx.tagSymbol()]
        try:
            symbol._tags = yaml.load('\n'.join(lines), Loader=Loader)
        except yaml.YAMLError as exc:
            # Fix: click.secho expects a message string; pass str(exc)
            # instead of the exception object.
            click.secho(str(exc), fg='red')
def read_yaml(file: str):
    """Parse the YAML file at *file* and return its content."""
    # Fix: the 'U' open-mode flag was deprecated and removed in Python 3.11;
    # text mode already performs universal-newline translation.
    with open(file, 'r', encoding="utf-8") as stream:
        return yaml.load(stream, Loader=Loader)
def load_config(path):
    """Load the config at *path*, trying YAML first (when available) and
    falling back to JSON, then convert old-style settings.

    Returns {} when the file does not exist. On a parse failure the error is
    reported and the (possibly empty) data is still run through
    _convert_old_config, matching the original fallback behavior.
    """
    # TODO: support old-style setting names? i.e. pass them through ARGS_TO_SETTINGS ?
    data = {}
    yaml_err = ""
    if useYAML:
        try:
            # this assumes only one document
            with open(path, encoding='utf8') as infp:
                data = yaml_load(infp, Loader=YAMLLoader)
        except FileNotFoundError:
            return {}
        except YAMLError as e:
            yaml_err = 'YAML parsing error in file {}'.format(path)
            if hasattr(e, 'problem_mark'):
                mark = e.problem_mark
                # Fix: the original computed this concatenation and discarded
                # the result ('+' instead of '+='), losing the position info.
                yaml_err += '\nError on Line:{} Column:{}'.format(mark.line + 1, mark.column + 1)
        else:
            return _convert_old_config(data)
    # YAML unavailable or failed: retry the same file as JSON.
    try:
        with open(path, encoding='utf8') as infp:
            data = json.load(infp)
    except FileNotFoundError:
        return {}
    except json.JSONDecodeError as e:
        if useYAML and yaml_err:
            # Prefer the YAML diagnostic: the file was probably YAML after all.
            print(yaml_err)
        else:
            print('JSON parsing error in file {}'.format(path),
                  'Error on Line: {} Column: {}'.format(e.lineno, e.colno),
                  sep='\n')
    data = _convert_old_config(data)
    return data
def load(*args, **kwargs):
    """Delegate to yaml.load, always forcing this module's Loader.

    Any caller-supplied ``Loader`` keyword is overridden.
    """
    # Fix: removed the dead 'if kwargs is None' check — **kwargs is always a
    # dict, never None.
    kwargs['Loader'] = Loader
    return yaml.load(*args, **kwargs)
def load_all(*args, **kwargs):
    """Delegate to yaml.load_all, always forcing this module's Loader.

    Any caller-supplied ``Loader`` keyword is overridden.
    """
    # Fix: removed the dead 'if kwargs is None' check — **kwargs is always a
    # dict, never None.
    kwargs['Loader'] = Loader
    return yaml.load_all(*args, **kwargs)
def main(argv):
    """Scan a metadata dump line by line and write products.csv with
    (title, brand, description, flattened categories) rows.

    Lines that do not mention title/brand/categories are skipped; lines that
    fail to parse are counted as bad and echoed for debugging.
    """
    if len(argv) < 2:
        usage()
    filename = sys.argv[1]
    count, good, bad = 0, 0, 0
    # Fixes: ported Python 2 'print' statements to the print() function, opened
    # the input in text mode (the lines are compared against str fragments),
    # scoped the CSV output file in 'with' so it is flushed and closed, and
    # dropped the unused 'products' list.
    with open(filename, 'r') as f, open("products.csv", "w") as csv_file:
        out = csv.writer(csv_file)
        for line in f:
            count += 1
            if not (count % 100000):
                # Periodic progress report for very large dumps.
                print("count:", count, "good:", good, ", bad:", bad)
            if ("'title':" in line) and ("'brand':" in line) and ("'categories':" in line):
                try:
                    # Escape single quotes so the repr-ish line parses as YAML.
                    line = line.rstrip().replace("\\'", "''")
                    product = yaml.load(line, Loader=Loader)
                    title, brand, categories = product['title'], product['brand'], product['categories']
                    description = product['description'] if 'description' in product else ''
                    # categories is a list of lists; flatten to one string.
                    categories = ' / '.join([item for sublist in categories for item in sublist])
                    out.writerow([title, brand, description, categories])
                    good += 1
                except Exception as e:
                    print(line)
                    print(e)
                    bad += 1
    print("good:", good, ", bad:", bad)
def collect(self):
    """Yield the tests composed from this file's YAML content."""
    # Prefer the libyaml-backed loader when PyYAML was built with it.
    if hasattr(yaml, 'CLoader'):
        loader = yaml.CLoader
    else:
        loader = yaml.Loader
    with self.fspath.open() as f:
        data = yaml.load(f.read(), Loader=loader)
    yield from self._compose_tests(data)
def parse_native_yaml(path):
    """Load and return the native-declarations YAML file at *path*."""
    with open(path, 'r') as source:
        return yaml.load(source, Loader=Loader)
def parse_nn_yaml(filename):
    """Load and return the NN-declarations YAML file at *filename*."""
    with open(filename, 'r') as source:
        return yaml.load(source, Loader=Loader)
def load_deprecated_signatures(declarations_by_signature):
    """Load deprecated aten definitions and expand them into declarations.

    For every deprecated prototype, finds the matching modern declarations
    (keyed by a normalized type signature) and emits deep copies marked
    ``deprecated`` with their arguments reordered to the deprecated
    parameter order.

    :param declarations_by_signature: dict mapping 'name(T1, T2, ...)'
        signature strings to lists of declaration dicts
    :return: list of rewritten declaration dicts
    """
    with open(deprecated_path, 'r') as f:
        deprecated_defs = yaml.load(f, Loader=Loader)
    declarations = []

    def get_signature(name, params, call_args):
        # create a mapping of parameter name to parameter type
        types = dict([param.split(' ')[::-1] for param in params])
        # if the name in the call is not in the parameter list, assume it's
        # a literal Scalar
        rearranged_types = [types.get(arg, 'Scalar') for arg in call_args]
        return '{}({})'.format(name, ', '.join(rearranged_types))

    for deprecated in deprecated_defs:
        prototype = deprecated['name']
        call_args = split_name_params(deprecated['aten'])[1]
        name, params = split_name_params(prototype)
        # Signature of the modern declaration this deprecated form maps onto.
        signature = get_signature(name, params, call_args)
        for declaration in declarations_by_signature[signature]:
            # Deep copy: the same base declaration may back several
            # deprecated prototypes and must not be mutated in place.
            declaration = copy.deepcopy(declaration)
            declaration['deprecated'] = True
            declaration['call_args'] = call_args
            if declaration['inplace']:
                # In-place variants use the trailing-underscore convention.
                declaration['prototype'] = prototype.replace(name, name + '_')
            else:
                declaration['prototype'] = prototype
            # Reorder arguments to match the deprecated parameter order.
            args_by_name = {arg['name']: arg for arg in declaration['arguments']}
            declaration['arguments'] = []
            for arg in params:
                _, arg_name = arg.split(' ')
                declaration['arguments'].append(args_by_name[arg_name])
            declarations.append(declaration)
    return declarations
def load_yaml_file(filepath):
    """Read the YAML file at *filepath* and hand its content to parse_data."""
    with open(filepath) as handle:
        parse_data(load(handle, Loader=Loader))
def read(self):
    """Load the YAML config file into this mapping and guarantee that the
    'playlists' and 'players' sections (with a chromecast player) exist.
    """
    # Fix: the original leaked the file handle (open() without close/with).
    with open(self.file) as stream:
        self.update(load(stream, Loader))
    self['playlists'] = self.get('playlists', {})
    self['players'] = self.get('players', {})
    if 'chromecast' not in self['players']:
        self['players']['chromecast'] = {'type': 'chromecast'}
def load_data_yaml(self, *args):
    '''
    Load one or several YAML stats files and merge them with current results.

    :param args: one filename per positional argument
    :return: list of non-None documents aggregated across all files
    '''
    superstats = []
    for filename in args:
        # Fix: each handle is now closed deterministically ('with') instead of
        # being leaked; self.stream is still assigned for compatibility with
        # existing callers that read it.
        with open(filename, 'r') as stream:
            self.stream = stream
            superstats.extend(data for data in yaml.load_all(self.stream, Loader=Loader)
                              if data is not None)
    return superstats
def parse_yaml(filename):
    """
    Parses a YAML file and returns a nested dictionary containing its contents.

    :param str filename: Name of YAML file to parse
    :return: Parsed file contents
    :rtype: dict or None
    """
    try:
        # '-' selects stdin so the tool can participate in shell pipelines.
        source = sys.stdin if filename == '-' else open(filename)
        with source as f:
            try:
                # Parses the YAML file into a dict
                return load(f, Loader=Loader)
            except YAMLError as exc:
                logging.critical("Could not parse YAML file %s", filename)
                if hasattr(exc, 'problem_mark'):
                    # Tell user exactly where the syntax error is
                    mark = exc.problem_mark
                    logging.error("Error position: (%s:%s)", mark.line + 1, mark.column + 1)
                else:
                    logging.error("Error: %s", exc)
                return None
    except FileNotFoundError:
        logging.critical("Could not find YAML file for parsing: %s", filename)
        return None
def load(cls, filename):
    '''Load model from file in YAML format.'''
    # 'load' inside the body resolves to the module-level YAML loader,
    # not to this classmethod.
    with open(filename, 'r') as source:
        return load(source, Loader=Loader)
def main():
    """Entry point for the holosocket server: parse CLI options, load the
    YAML config, configure logging, and run the asyncio server loop until
    interrupted."""
    parser = argparse.ArgumentParser(description='holosocket server')
    parser.add_argument('-c', '--config', help='config file')
    parser.add_argument('-4', '--ipv4', action='store_true', help='ipv4 only')
    parser.add_argument('--debug', action='store_true', help='debug mode')
    args = parser.parse_args()

    # NOTE(review): config is only assigned when --config is given; the code
    # below reads it unconditionally, so running without -c raises NameError.
    if args.config:
        with open(args.config, 'r') as f:
            config = yaml.load(f, Loader=Loader)

    if args.debug:
        LOGGING_MODE = logging.DEBUG
    else:
        LOGGING_MODE = logging.INFO

    logging.basicConfig(
        level=LOGGING_MODE,
        format='{asctime} {levelname} {message}',
        datefmt='%Y-%m-%d %H:%M:%S',
        style='{')

    if args.ipv4:
        SERVER = config['server']
    else:
        # Bind both the configured address and the IPv6 wildcard.
        SERVER = (config['server'], '::')
    SERVER_PORT = config['server_port']
    KEY = config['password']

    # Optional DNS override from the config file.
    try:
        DNS = config['dns']
    except KeyError:
        DNS = None

    # Use uvloop when installed for a faster event loop; otherwise fall back
    # to the stock asyncio implementation.
    try:
        import uvloop
        asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
        logging.info('uvloop mode')
    except ImportError:
        logging.info('pure asyncio mode')

    loop = asyncio.get_event_loop()
    server = Server(KEY, nameservers=DNS)
    coro = asyncio.start_server(server.handle, SERVER, SERVER_PORT, loop=loop)
    # Rebinds 'server' from the handler object to the asyncio server.
    server = loop.run_until_complete(coro)

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        # Ctrl-C: fall through to the orderly shutdown below.
        pass

    server.close()
    loop.run_until_complete(server.wait_closed())
    loop.close()
def main():
    """Entry point for the holosocket local client: parse CLI options, load
    the YAML config, configure logging, and run the asyncio proxy loop until
    interrupted."""
    parser = argparse.ArgumentParser(description='holosocket local')
    parser.add_argument('-c', '--config', help='config file')
    parser.add_argument('--debug', action='store_true', help='debug mode')
    args = parser.parse_args()

    # NOTE(review): config is only assigned when --config is given; the code
    # below reads it unconditionally, so running without -c raises NameError.
    if args.config:
        with open(args.config, 'r') as f:
            config = yaml.load(f, Loader=Loader)

    if args.debug:
        LOGGING_MODE = logging.DEBUG
    else:
        LOGGING_MODE = logging.INFO

    logging.basicConfig(
        level=LOGGING_MODE,
        format='{asctime} {levelname} {message}',
        datefmt='%Y-%m-%d %H:%M:%S',
        style='{')

    SERVER = config['server']
    # Optional IPv6 remote endpoint.
    try:
        V6_SERVER = config['v6_server']
    except KeyError:
        V6_SERVER = None
    SERVER_PORT = config['server_port']
    LOCAL = config['local']
    PORT = config['local_port']
    KEY = config['password']

    # Use uvloop when installed for a faster event loop; otherwise fall back
    # to the stock asyncio implementation.
    try:
        import uvloop
        asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
        logging.info('uvloop mode')
    except ImportError:
        logging.info('pure asyncio mode')

    loop = asyncio.get_event_loop()
    server = Server(SERVER, V6_SERVER, SERVER_PORT, KEY)
    coro = asyncio.start_server(server.handle, LOCAL, PORT, loop=loop)
    # Rebinds 'server' from the handler object to the asyncio server.
    server = loop.run_until_complete(coro)

    try:
        loop.run_forever()
    except KeyboardInterrupt:
        # Ctrl-C: fall through to the orderly shutdown below.
        pass

    server.close()
    loop.run_until_complete(server.wait_closed())
    loop.close()
def main():
    """Entry point for the multi-camera calibration tool: merge defaults,
    settings-file values and CLI arguments, optionally save the merged
    settings back to the file, then launch the application."""
    Setting.generate_missing_shorthands()
    defaults = Setting.generate_defaults_dict()
    conf_parser = \
        Setting.generate_parser(defaults, console_only=True, description=
                                "Use one or more .mp4 video files to perform calibration: " +
                                "find the cameras' intrinsics and/or extrinsics.")

    # ============== STORAGE/RETRIEVAL OF CONSOLE SETTINGS ===========================================#
    # First pass parses only the console-level options (settings file, save flag);
    # the remaining argv is re-parsed later against the merged defaults.
    args, remaining_argv = conf_parser.parse_known_args()
    defaults[Setting.save_settings.name] = args.save_settings
    if args.settings_file:
        defaults[Setting.settings_file.name] = args.settings_file
        if osp.isfile(args.settings_file):
            file_stream = open(args.settings_file, "r", encoding="utf-8")
            config_defaults = load(file_stream, Loader=Loader)
            file_stream.close()
            # Values from the settings file override the generated defaults.
            for key, value in config_defaults.items():
                defaults[key] = value
        else:
            raise ValueError("Settings file not found at: {0:s}".format(args.settings_file))

    # Second pass: full parser seeded with the merged defaults.
    parser = Setting.generate_parser(defaults, parents=[conf_parser])
    args = parser.parse_args(remaining_argv)

    # process "special" setting values
    # The wildcard resolves the working folder to the settings file's directory.
    if args.folder == "!settings_file_location":
        if args.settings_file and osp.isfile(args.settings_file):
            args.folder = osp.dirname(args.settings_file)

    # save settings if prompted to do so
    if args.save_settings and args.settings_file:
        setting_dict = vars(args)
        file_stream = open(args.settings_file, "w", encoding="utf-8")
        # Temporarily strip the meta-settings so they are not persisted,
        # then restore them on the live namespace afterwards.
        file_name = setting_dict[Setting.save_settings.name]
        del setting_dict[Setting.save_settings.name]
        del setting_dict[Setting.settings_file.name]
        dump(setting_dict, file_stream, Dumper=Dumper)
        file_stream.close()
        setting_dict[Setting.save_settings.name] = file_name
        setting_dict[Setting.settings_file.name] = True

    app = MultiStereoApplication(args)
def main():
    """Entry point for the stereo-algorithm test tool: merge defaults,
    settings-file values and CLI arguments, optionally save the merged
    settings back to the file, then run the matcher."""
    Setting.generate_missing_shorthands()
    defaults = Setting.generate_defaults_dict()
    conf_parser = \
        Setting.generate_parser(defaults, console_only=True, description=
                                "Test stereo algorithms on two image files.")

    # ============== STORAGE/RETRIEVAL OF CONSOLE SETTINGS ===========================================#
    # First pass parses only the console-level options (settings file, save flag);
    # the remaining argv is re-parsed later against the merged defaults.
    args, remaining_argv = conf_parser.parse_known_args()
    defaults[Setting.save_settings.name] = args.save_settings
    if args.settings_file:
        defaults[Setting.settings_file.name] = args.settings_file
        if osp.isfile(args.settings_file):
            file_stream = open(args.settings_file, "r", encoding="utf-8")
            config_defaults = load(file_stream, Loader=Loader)
            file_stream.close()
            # An empty settings file yields None; only merge real content.
            if config_defaults:
                for key, value in config_defaults.items():
                    defaults[key] = value
        else:
            raise ValueError("Settings file not found at: {0:s}".format(args.settings_file))

    # Second pass: full parser seeded with the merged defaults.
    parser = Setting.generate_parser(defaults, parents=[conf_parser])
    args = parser.parse_args(remaining_argv)

    # process "special" setting values
    # The wildcard resolves the working folder to the settings file's directory.
    if args.folder == "!settings_file_location":
        if args.settings_file and osp.isfile(args.settings_file):
            args.folder = osp.dirname(args.settings_file)

    # save settings if prompted to do so
    if args.save_settings and args.settings_file:
        setting_dict = vars(args)
        file_stream = open(args.settings_file, "w", encoding="utf-8")
        # Temporarily strip the meta-settings so they are not persisted,
        # then restore them on the live namespace afterwards.
        file_name = setting_dict[Setting.save_settings.name]
        del setting_dict[Setting.save_settings.name]
        del setting_dict[Setting.settings_file.name]
        dump(setting_dict, file_stream, Dumper=Dumper)
        file_stream.close()
        setting_dict[Setting.save_settings.name] = file_name
        setting_dict[Setting.settings_file.name] = True

    app = StereoMatcherApp(args)
    app.disparity2()
def load(stream, filename):
    """Deserialize a call graph from *stream*, choosing the parser from the
    extension of *filename* (.cdg = UBJSON, .json = JSON, .yaml = YAML), and
    rebuild it as a graph of nodes, call edges and flow edges.

    :raises ValueError: for an unrecognized file extension
    """
    if filename.endswith('.cdg'):
        import ubjson
        cg = ubjson.load(stream)
    elif filename.endswith('.json'):
        import json
        cg = json.load(stream)
    elif filename.endswith('.yaml'):
        import yaml
        # Prefer the C-accelerated loader; fall back to pure Python when
        # PyYAML was built without libyaml.
        try:
            from yaml import CLoader as Loader
        except ImportError:
            from yaml import Loader
        cg = yaml.load(stream, Loader=Loader)
    else:
        raise ValueError('Unhandled file type: %s' % filename)

    graph = create(filename)
    for (name, props) in cg['functions'].items():
        graph.add_node(name)
        if 'attributes' in props:
            # Copy serialized node attributes onto the graph node.
            graph.node[name].update(props['attributes'])
        if 'calls' in props:
            calls = props['calls']
            if calls:
                # Outgoing call edges: this function calls each target.
                for target in calls:
                    graph.add_edge(name, target, kind = EdgeKind.Call)
        if 'flows' in props:
            flows = props['flows']
            if flows:
                # Incoming flow edges: data flows from each source into
                # this function.
                for source in flows:
                    graph.add_edge(source, name, kind = EdgeKind.Flow)
    return graph
def __init__(self, args, out_postfix="_out", with_video_output=True):
    """Set up video processing from parsed arguments.

    Reads optional per-video overrides from settings.yaml, opens the input
    video, clamps the requested frame range to the file's length, and (when
    *with_video_output* is true) prepares an H.264 VideoWriter for the output.

    :param args: parsed argument namespace; its vars() are copied onto self
    :param out_postfix: str appended to the input name to form the default
        output filename
    :param with_video_output: bool, whether to create the output writer
    """
    self.global_video_offset = 0
    self.flip_video = False
    self.datapath = "./"
    # Promote every CLI argument to an instance attribute.
    self.__dict__.update(vars(args))
    self.writer = None

    if os.path.exists("settings.yaml"):
        stream = open("settings.yaml", mode='r')
        self.settings = load(stream, Loader=Loader)
        stream.close()
        # "<current_user>" placeholder in the configured path is resolved
        # to the actual login name.
        self.datapath = self.settings['datapath'].replace("<current_user>", getuser())
        print("Processing path: ", self.datapath)
        if 'raw_options' in self.settings:
            # Per-video overrides: time offset and vertical flip.
            raw_options = self.settings['raw_options']
            if self.in_video in raw_options:
                self.global_video_offset = raw_options[args.in_video]['global_offset']
                self.flip_video = raw_options[args.in_video]['flip']

    self.cap = None
    self.reload_video()
    print("Processing video file {:s}.".format(self.in_video))

    # Clamp the requested end frame to the actual length of the video.
    last_frame = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT) - 1)
    if self.end_with == -1:
        # -1 means "process to the end".
        self.end_with = last_frame
    else:
        if self.end_with > last_frame:
            print(("Warning: specified end frame ({:d}) is beyond the last video frame" +
                   " ({:d}). Stopping after last frame.")
                  .format(self.end_with, last_frame))
            self.end_with = last_frame

    print("Frame range: {:d}--{:d}".format(self.start_from, self.end_with))

    if with_video_output:
        if self.out_video == "":
            # Default output name: input stem + postfix, as .mp4.
            self.out_video = args.in_video[:-4] + "_" + out_postfix + ".mp4"
        # Match the input's FPS and frame size; X264 fourcc selects H.264.
        self.writer = cv2.VideoWriter(os.path.join(self.datapath, self.out_video),
                                      cv2.VideoWriter_fourcc('X', '2', '6', '4'),
                                      self.cap.get(cv2.CAP_PROP_FPS),
                                      (int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
                                       int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))),
                                      True)
        # Parallelize encoding across all available cores.
        self.writer.set(cv2.VIDEOWRITER_PROP_NSTRIPES, cpu_count())
    else:
        self.writer = None

    self.frame = None
    self.cur_frame_number = None
def process_arguments(program_arguments_enum, program_help_description):
    """Merge defaults, settings-file values and CLI arguments into a single
    namespace, optionally persisting the result back to the settings file.

    :param program_arguments_enum: enum describing the program's arguments
    :param program_help_description: unused here; the parser description is
        hard-coded below (NOTE(review): likely intended to be passed through)
    :return: fully-resolved argparse.Namespace
    :raises ValueError: if a settings file is named but absent and we are not
        about to create it via --save-settings
    """
    argproc = ArgumentProcessor(program_arguments_enum)
    defaults = argproc.generate_defaults_dict()
    conf_parser = \
        argproc.generate_parser(defaults, console_only=True, description=
                                "Test stereo algorithms on two image files.")

    # ============== STORAGE/RETRIEVAL OF CONSOLE SETTINGS ===========================================#
    # First pass parses only the console-level options (settings file, save
    # flag); the remaining argv is re-parsed later against merged defaults.
    args, remaining_argv = conf_parser.parse_known_args()
    defaults[ArgumentProcessor.save_settings_name] = args.save_settings
    if args.settings_file:
        defaults[ArgumentProcessor.settings_file_name] = args.settings_file
        if os.path.isfile(args.settings_file):
            file_stream = open(args.settings_file, "r", encoding="utf-8")
            config_defaults = load(file_stream, Loader=Loader)
            file_stream.close()
            # An empty settings file yields None; only merge real content.
            if config_defaults:
                for key, value in config_defaults.items():
                    defaults[key] = value
        else:
            # Missing file is only an error when we are not about to create it.
            if not args.save_settings:
                raise ValueError("Settings file not found at: {0:s}".format(args.settings_file))

    # Second pass: full parser seeded with the merged defaults.
    parser = argproc.generate_parser(defaults, parents=[conf_parser])
    args = parser.parse_args(remaining_argv)

    # process "special" setting values
    # The wildcard resolves path-like settings to the settings file's folder.
    if args.settings_file and os.path.isfile(args.settings_file):
        for key in args.__dict__.keys():
            if key in argproc.setting_file_location_args and args.__dict__[key] == \
                    Argument.setting_file_location_wildcard:
                args.__dict__[key] = os.path.dirname(args.settings_file)

    # save settings if prompted to do so
    if args.save_settings and args.settings_file:
        setting_dict = vars(args)
        file_stream = open(args.settings_file, "w", encoding="utf-8")
        # Temporarily strip the meta-settings so they are not persisted,
        # then restore them on the live namespace afterwards.
        file_name = setting_dict[ArgumentProcessor.save_settings_name]
        del setting_dict[ArgumentProcessor.save_settings_name]
        del setting_dict[ArgumentProcessor.settings_file_name]
        dump(setting_dict, file_stream, Dumper=Dumper, indent=3, default_flow_style=False)
        file_stream.close()
        setting_dict[ArgumentProcessor.save_settings_name] = file_name
        setting_dict[ArgumentProcessor.settings_file_name] = True

    return args
def load_aten_declarations(path):
    """Load ATen declarations from the YAML file at *path* and enrich each
    one with derived fields (simple argument types, formals, names, return
    type and the Python-style prototype used for argument parsing).

    :param path: str path to the declarations YAML file
    :return: list of enriched declaration dicts
    """
    with open(path, 'r') as f:
        declarations = yaml.load(f, Loader=Loader)

    # enrich declarations with additional information
    for declaration in declarations:
        for arg in declaration['arguments']:
            # Strip C++ reference/const qualifiers and normalize the
            # Generator pointer type down to a bare type name.
            simple_type = arg['type']
            simple_type = simple_type.replace(' &', '').replace('const ', '')
            simple_type = simple_type.replace('Generator *', 'Generator')
            arg['simple_type'] = simple_type
        declaration['formals'] = [arg['type'] + ' ' + arg['name']
                                  for arg in declaration['arguments']]
        declaration['args'] = [arg['name'] for arg in declaration['arguments']]
        declaration['api_name'] = declaration['name']
        declaration['return_type'] = format_return_type(declaration['returns'])
        declaration['base_name'] = declaration['name']

        # Compute the Python function prototype for argument parsing
        typed_args = []
        positional = True
        for arg in declaration['arguments']:
            if arg.get('kwarg_only', False) and positional:
                # '*' marks the start of keyword-only parameters.
                typed_args.append('*')
                positional = False
            typename = arg['simple_type']
            if arg.get('is_nullable'):
                # '?' suffix denotes a nullable type.
                typename = '{}?'.format(typename)
            if arg.get('size') is not None:
                # '[n]' suffix denotes a fixed-size list type.
                typename = '{}[{}]'.format(typename, arg['size'])
            param = typename + ' ' + arg['name']
            default = None
            if arg.get('default') is not None:
                default = arg['default']
                # Map the C++ empty defaults onto Python's None.
                if default == 'nullptr' or default == '{}':
                    default = 'None'
            if arg.get('python_default_init') is not None:
                default = 'None'
            if default is not None:
                param += '=' + str(default)
            typed_args.append(param)

        # Python function prototype.
        # This is the string that we give to FunctionParameter, which is
        # then parsed into the actual structure which we do parsing
        # with.
        declaration['typed_args'] = typed_args
        declaration['prototype'] = FUNCTION_PROTOTYPE.substitute(declaration)
    return declarations