Python yaml 模块,CLoader() 实例源码

我们从Python开源项目中,提取了以下34个代码示例,用于说明如何使用yaml.CLoader()

项目:staticsite    作者:spanezz    | 项目源码 | 文件源码
def parse_front_matter(lines):
    """
    Parse lines of front matter.

    The format is chosen from the first line: ``{`` means JSON (every line is
    part of the document), ``+++`` means TOML and ``---`` means YAML (for
    those two, the first and last lines are delimiters and are dropped).

    Returns a ``(format_name, data)`` tuple. Empty or unrecognised input
    yields ``("toml", {})`` so that callers can always unpack the result.
    """
    if not lines:
        return "toml", {}

    marker = lines[0]

    if marker == "{":
        # JSON: the braces belong to the document, so nothing is stripped
        import json
        return "json", json.loads("\n".join(lines))

    if marker == "+++":
        # TOML
        import toml
        return "toml", toml.loads("\n".join(lines[1:-1]))

    if marker == "---":
        # YAML
        import yaml
        # CLoader only exists when PyYAML was built against libyaml; fall
        # back to the pure-Python loader instead of raising AttributeError.
        # NOTE(review): Loader/CLoader can construct arbitrary Python
        # objects; switch to a safe loader if front matter may be untrusted.
        loader = getattr(yaml, "CLoader", yaml.Loader)
        return "yaml", yaml.load("\n".join(lines[1:-1]), Loader=loader)

    # Unknown marker: keep the tuple contract (this used to return a bare {},
    # which breaks any caller that unpacks two values).
    return "toml", {}
项目:cvcalib    作者:Algomorph    | 项目源码 | 文件源码
def load_app_from_config(path):
    """
    Generate app directly from config file, bypassing command line settings (useful for testing in ipython)

    Values from the YAML file at ``path`` override the defaults produced by
    ``Setting.generate_defaults_dict()``. An empty settings file leaves the
    defaults untouched.

    Returns an ``ApplicationUnsynced`` when the resulting ``unsynced`` setting
    is truthy, otherwise an ``ApplicationSynced``.

    Raises ValueError if ``path`` is not an existing file.
    """
    Setting.generate_missing_shorthands()
    defaults = Setting.generate_defaults_dict()
    if not osp.isfile(path):
        raise ValueError("Settings file not found at: {0:s}".format(path))

    # The context manager closes the file even when YAML parsing fails
    with open(path, "r", encoding="utf-8") as file_stream:
        config_defaults = load(file_stream, Loader=Loader)

    # An empty YAML document parses to None; there is nothing to override then
    if config_defaults:
        for key, value in config_defaults.items():
            defaults[key] = value

    args = ap.Namespace()
    for key, value in defaults.items():
        args.__dict__[key] = value
    if args.unsynced:
        app = ApplicationUnsynced(args)
    else:
        app = ApplicationSynced(args)
    return app
项目:cvcalib    作者:Algomorph    | 项目源码 | 文件源码
def load_app_from_config(path):
    """
    Generate app directly from config file, bypassing command line settings (useful for testing in ipython)

    Values from the YAML file at ``path`` override the defaults produced by
    ``Setting.generate_defaults_dict()``. An empty settings file leaves the
    defaults untouched.

    Returns a ``StereoMatcherApp`` built from the merged settings.

    Raises ValueError if ``path`` is not an existing file.
    """
    Setting.generate_missing_shorthands()
    defaults = Setting.generate_defaults_dict()
    if not osp.isfile(path):
        raise ValueError("Settings file not found at: {0:s}".format(path))

    # The context manager closes the file even when YAML parsing fails
    with open(path, "r", encoding="utf-8") as file_stream:
        config_defaults = load(file_stream, Loader=Loader)

    # An empty YAML document parses to None; there is nothing to override then
    if config_defaults:
        for key, value in config_defaults.items():
            defaults[key] = value

    args = ap.Namespace()
    for key, value in defaults.items():
        args.__dict__[key] = value

    app = StereoMatcherApp(args)

    return app
项目:cvcalib    作者:Algomorph    | 项目源码 | 文件源码
def load_app_from_config(path):
    """
    Generate app directly from config file, bypassing command line settings (useful for testing in ipython)

    Values from the YAML file at ``path`` override the defaults produced by
    ``Setting.generate_defaults_dict()``. An empty settings file leaves the
    defaults untouched.

    Returns an ``ApplicationUnsynced`` when the resulting ``unsynced`` setting
    is truthy, otherwise an ``ApplicationSynced``.

    Raises ValueError if ``path`` is not an existing file.
    """
    Setting.generate_missing_shorthands()
    defaults = Setting.generate_defaults_dict()
    if not osp.isfile(path):
        raise ValueError("Settings file not found at: {0:s}".format(path))

    # The context manager closes the file even when YAML parsing fails
    with open(path, "r", encoding="utf-8") as file_stream:
        config_defaults = load(file_stream, Loader=Loader)

    # An empty YAML document parses to None; there is nothing to override then
    if config_defaults:
        for key, value in config_defaults.items():
            defaults[key] = value

    args = ap.Namespace()
    for key, value in defaults.items():
        args.__dict__[key] = value
    if args.unsynced:
        app = ApplicationUnsynced(args)
    else:
        app = ApplicationSynced(args)
    return app
项目:aws-ec2rescue-linux    作者:awslabs    | 项目源码 | 文件源码
def get_module(filename_with_path):
    """
    Given a filename with an absolute path, load the contents, instantiate a Module, and set the Module's path
    attribute.

    ``Module.temp_path`` is set to the filename for the duration of the YAML
    load and is always reset to the empty string afterwards, including when
    parsing fails.

    Parameters:
        filename_with_path (str): the YAML filename with an absolute path

    Returns:
        The object produced by ``yaml.load`` for the file.

    Raises:
        ModulePathError: the file could not be opened or read.
        ModuleConstraintParseError: the YAML scanner rejected the file.
    """
    try:
        with open(filename_with_path) as config_file:
            # presumably read by the YAML Module constructor while the
            # document is being built -- verify against that constructor
            Module.temp_path = filename_with_path
            try:
                this_module = yaml.load(config_file, Loader=Loader)
            finally:
                # Do not leave a stale path behind when parsing raises
                Module.temp_path = ""
            return this_module
    except IOError:
        raise ModulePathError(filename_with_path)
    except yaml.scanner.ScannerError:
        raise ModuleConstraintParseError("Parsing of module {} failed. This is likely caused by a typo in the file."
                                         "".format(filename_with_path))
# Add the YAML Module constructor so that YAML knows to use it in situations where the tag matches.
项目:cligraphy    作者:Netflix-Skunkworks    | 项目源码 | 文件源码
def read_configuration(cligraph, custom_suffix=''):
    """Build the configuration for the given tool from its layers.

    Three layers are applied in order: ``auto`` (the result of calling
    ``automatic_configuration``), ``shared`` (``conf/<shortname>.yaml`` under
    the tool path) and ``custom`` (``~/.<shortname>/<shortname>.yaml`` plus
    ``custom_suffix``). File layers that do not exist are skipped.

    Returns a tuple ``(AttrDict of the merged config, layers)`` where
    ``layers`` maps each layer name to ``[source, loaded_data_or_None]``.
    """
    shortname = cligraph.tool_shortname
    shared_path = os.path.join(cligraph.tool_path, 'conf/%s.yaml' % shortname)
    custom_dir = os.path.abspath(os.path.expanduser('~/.' + shortname))
    custom_path = os.path.join(custom_dir, '%s.yaml%s' % (shortname, custom_suffix))

    layers = collections.OrderedDict([
        ('auto', [automatic_configuration, None]),
        ('shared', [shared_path, None]),
        ('custom', [custom_path, None]),
    ])

    cfg = {}
    for layer_name, entry in layers.items():
        source = entry[0]
        if callable(source):
            loaded = source(cligraph, layer_name)
        elif os.path.exists(source):
            with open(source, 'r') as filep:
                loaded = yaml.load(filep, Loader=Loader)
        else:
            # Missing file layer: leave its slot as None
            continue
        entry[1] = loaded
        if loaded:
            update_recursive(cfg, loaded)

    resolve_config(cfg)
    return AttrDict(**cfg), layers
项目:atom-hw    作者:flame0    | 项目源码 | 文件源码
def get_dataset(filename):
    '''
    Iterator for dataset's items

    The file is parsed completely and closed before the first item is
    yielded. The top-level document must provide a ``dataset`` entry that
    is a list.

    :param filename: Path to dataset's file
    :type filename: str
    :return: Dataset's items
    :raises OSError: if has problem with file
    :raises yaml.YAMLError: if has problem with format
    :raises ValueError: if has problem with content
    '''
    with open(filename, 'rt', encoding='utf-8') as stream:
        package = load(stream, Loader=Loader)

    try:
        dataset = package.get('dataset')
    except AttributeError:
        # Empty file (None) or a top-level scalar/list: report it as the
        # documented ValueError rather than leaking AttributeError
        raise ValueError('wrong format') from None

    if not isinstance(dataset, list):
        raise ValueError('wrong format')
    yield from dataset
项目:barometer    作者:opnfv    | 项目源码 | 文件源码
def process_measurements(self):
        """Load the measurement definitions and send every measurement.

        A ``Loader`` is created over ``self.measurements_stream`` and given
        the collector, system and config of this object before its data is
        read. Each entry of each named group is passed to ``send_data``.
        """
        loader = Loader(self.measurements_stream)
        loader.collector = self.collector
        loader.system = self.system
        loader.config = self.config

        measurements = loader.get_data()
        for name in measurements:
            entries = measurements[name]
            logging.debug('Process "{}" measurements: {}'.format(name, entries))
            for entry in entries:
                self.send_data(entry)
项目:barometer    作者:opnfv    | 项目源码 | 文件源码
def process_notify(self, notification):
        """Load the event definitions for ``notification`` and send them.

        A ``Loader`` is created over ``self.events_stream`` and given the
        notification and the system of this object before its data is read.
        Entries whose value is ``None`` are logged but not sent.
        """
        loader = Loader(self.events_stream)
        loader.notification = notification
        loader.system = self.system

        notifications = loader.get_data()
        for name in notifications:
            logging.debug('Process "{}" notification'.format(name))
            payload = notifications[name]
            if payload is None:
                continue
            self.send_data(payload)
项目:badwolf    作者:bosondata    | 项目源码 | 文件源码
def parse_file(cls, path):
        """Parse a specification from a path or a file-like object.

        ``path`` is treated as file-like when it has a callable ``read``
        attribute; otherwise it is opened with ``io.open``. The text is
        loaded as YAML and handed to ``cls.parse``.

        Raises InvalidSpecification when YAML reports a marked error.
        """
        is_file_like = hasattr(path, 'read') and callable(path.read)
        try:
            if is_file_like:
                text = path.read()
            else:
                with io.open(path) as f:
                    text = f.read()
            conf = yaml.load(text, Loader=_Loader)
        except yaml.error.MarkedYAMLError as e:
            raise InvalidSpecification(str(e))

        return cls.parse(conf)
项目:qface    作者:Pelagicore    | 项目源码 | 文件源码
def load_yaml(document: Path, required=False):
        """Load a YAML document, returning ``{}`` when that is not possible.

        A missing document yields ``{}``; when ``required`` is true a red
        message is printed as well. A YAML error is printed in red and also
        yields ``{}``.
        """
        document = Path(document)
        if document.exists():
            try:
                return yaml.load(document.text(), Loader=Loader)
            except yaml.YAMLError as exc:
                click.secho(str(exc), fg='red')
        elif required:
            click.secho('yaml document does not exists: {0}'.format(document), fg='red')
        return {}
项目:qface    作者:Pelagicore    | 项目源码 | 文件源码
def parse_annotations(self, ctx, symbol):
        """Copy the comment and tag annotations from ``ctx`` onto ``symbol``.

        The comment text, when present, is stored in ``symbol.comment``.
        The tag lines (each with its first character dropped) are joined and
        parsed as one YAML document, which is stored in ``symbol._tags``.
        A YAML error is printed in red and leaves ``symbol._tags`` untouched.
        """
        assert ctx and symbol
        if ctx.comment:
            comment = ctx.comment.text
            symbol.comment = comment
        if ctx.tagSymbol():
            lines = [tag.line.text[1:] for tag in ctx.tagSymbol()]
            try:
                data = yaml.load('\n'.join(lines), Loader=Loader)
                symbol._tags = data
            except yaml.YAMLError as exc:
                # Pass text, not the exception object: styling a non-string
                # is not supported by every click release
                click.secho(str(exc), fg='red')
项目:MUBench    作者:stg-tud    | 项目源码 | 文件源码
def read_yaml(file: str):
    """Parse the UTF-8 YAML file at ``file`` and return the loaded document."""
    # Mode 'rU' is rejected by open() on Python 3.11+; plain text mode
    # already applies universal-newline translation.
    with open(file, 'r', encoding="utf-8") as stream:
        return yaml.load(stream, Loader=Loader)
项目:showroom    作者:wlerin    | 项目源码 | 文件源码
def load_config(path):
    """Load the config file at ``path`` as YAML, falling back to JSON.

    When ``useYAML`` is set the file is first parsed as YAML. If that fails
    with a YAML error (or YAML is disabled) the file is parsed as JSON. A
    parse failure in both formats prints an error and converts an empty dict.

    Returns the loaded data passed through ``_convert_old_config``, or ``{}``
    when the file does not exist.
    """
    # TODO: support old-style setting names? i.e. pass them through ARGS_TO_SETTINGS ?
    data = {}
    yaml_err = ""
    if useYAML:
        try:
            # this assumes only one document
            with open(path, encoding='utf8') as infp:
                data = yaml_load(infp, Loader=YAMLLoader)
        except FileNotFoundError:
            return {}
        except YAMLError as e:
            yaml_err = 'YAML parsing error in file {}'.format(path)
            if hasattr(e, 'problem_mark'):
                mark = e.problem_mark
                # Append the position; the result of a bare `+` was discarded
                yaml_err += '\nError on Line:{} Column:{}'.format(mark.line + 1, mark.column + 1)
        else:
            return _convert_old_config(data)
    try:
        with open(path, encoding='utf8') as infp:
            data = json.load(infp)
    except FileNotFoundError:
        return {}
    except json.JSONDecodeError as e:
        # Prefer the YAML diagnostic when YAML was attempted first
        if useYAML and yaml_err:
            print(yaml_err)
        else:
            print('JSON parsing error in file {}'.format(path),
                  'Error on Line: {} Column: {}'.format(e.lineno, e.colno), sep='\n')

    data = _convert_old_config(data)

    # if 'directory' in data:
    #     for k, v in data['directory'].items():
    #         data['directory'][k] = os.path.expanduser(v)

    return data
项目:treadmill    作者:Morgan-Stanley    | 项目源码 | 文件源码
def load(*args, **kwargs):
    """Delegate to yaml load.

    All arguments are forwarded to ``yaml.load``; any ``Loader`` keyword
    supplied by the caller is replaced with the module-level ``Loader``.
    """
    # **kwargs is always a dict, so no None guard is needed
    kwargs['Loader'] = Loader
    return yaml.load(*args, **kwargs)
项目:treadmill    作者:Morgan-Stanley    | 项目源码 | 文件源码
def load_all(*args, **kwargs):
    """Delegate to yaml load_all.

    All arguments are forwarded to ``yaml.load_all``; any ``Loader`` keyword
    supplied by the caller is replaced with the module-level ``Loader``.
    """
    # **kwargs is always a dict, so no None guard is needed
    kwargs['Loader'] = Loader
    return yaml.load_all(*args, **kwargs)
项目:productner    作者:etano    | 项目源码 | 文件源码
def main(argv):
    if len(argv) < 2:
        usage()
    filename = sys.argv[1]
    with open(filename, 'rb') as f:
        products = []
        count, good, bad = 0, 0, 0
        out = csv.writer(open("products.csv","w"))
        for line in f:
            count += 1
            if not (count % 100000):
                print "count:", count, "good:", good, ", bad:", bad
            if ("'title':" in line) and ("'brand':" in line) and ("'categories':" in line):
                try:
                    line = line.rstrip().replace("\\'","''")
                    product = yaml.load(line, Loader=Loader)
                    title, brand, categories = product['title'], product['brand'], product['categories']
                    description = product['description'] if 'description' in product else ''
                    categories = ' / '.join([item for sublist in categories for item in sublist])
                    out.writerow([title, brand, description, categories])
                    good += 1
                except Exception as e:
                    print line
                    print e
                    bad += 1
        print "good:", good, ", bad:", bad
项目:zazo    作者:pradyunsg    | 项目源码 | 文件源码
def collect(self):
        """Yield the tests composed from the YAML file at ``self.fspath``.

        The C loader is used when this PyYAML build provides it, otherwise
        the pure-Python loader.
        """
        loader_cls = getattr(yaml, 'CLoader', yaml.Loader)
        with self.fspath.open() as f:
            text = f.read()
            data = yaml.load(text, Loader=loader_cls)
        for item in self._compose_tests(data):
            yield item
项目:pytorch    作者:pytorch    | 项目源码 | 文件源码
def parse_native_yaml(path):
    """Return the document loaded from the YAML file at ``path``."""
    with open(path, 'r') as stream:
        parsed = yaml.load(stream, Loader=Loader)
    return parsed
项目:pytorch    作者:pytorch    | 项目源码 | 文件源码
def parse_nn_yaml(filename):
    """Return the document loaded from the YAML file at ``filename``."""
    with open(filename, 'r') as stream:
        parsed = yaml.load(stream, Loader=Loader)
    return parsed
项目:pytorch    作者:pytorch    | 项目源码 | 文件源码
def load_deprecated_signatures(declarations_by_signature):
    """Build declarations for the deprecated signatures in ``deprecated_path``.

    Each deprecated entry has a ``name`` prototype and an ``aten`` call. The
    call's arguments are mapped to the prototype's parameter types (unknown
    names count as ``Scalar``) to form a signature, which is looked up in
    ``declarations_by_signature``. Every match is deep-copied, marked as
    deprecated and given the deprecated prototype and argument order.

    Returns the list of new declaration dicts.
    """
    with open(deprecated_path, 'r') as stream:
        deprecated_defs = yaml.load(stream, Loader=Loader)

    def get_signature(name, params, call_args):
        # "<type> <name>" strings become a name -> type lookup
        type_of = dict(param.split(' ')[::-1] for param in params)
        # a call argument that is not a parameter is assumed to be a
        # literal Scalar
        arg_types = [type_of.get(arg, 'Scalar') for arg in call_args]
        return '{}({})'.format(name, ', '.join(arg_types))

    result = []
    for entry in deprecated_defs:
        prototype = entry['name']
        call_args = split_name_params(entry['aten'])[1]
        name, params = split_name_params(prototype)
        signature = get_signature(name, params, call_args)

        for original in declarations_by_signature[signature]:
            decl = copy.deepcopy(original)
            decl['deprecated'] = True
            decl['call_args'] = call_args
            # in-place variants carry a trailing underscore in their name
            decl['prototype'] = (prototype.replace(name, name + '_')
                                 if decl['inplace'] else prototype)

            by_name = {arg['name']: arg for arg in decl['arguments']}
            reordered = []
            for param in params:
                _, param_name = param.split(' ')
                reordered.append(by_name[param_name])
            decl['arguments'] = reordered
            result.append(decl)
    return result
项目:bluecanary    作者:abconway    | 项目源码 | 文件源码
def load_yaml_file(filepath):
    """Load the YAML file at ``filepath`` and pass the result to ``parse_data``."""
    with open(filepath) as fp:
        document = load(fp, Loader=Loader)
        parse_data(document)
项目:random-episode    作者:Nekmo    | 项目源码 | 文件源码
def read(self):
        """Merge the YAML file at ``self.file`` into this mapping.

        After loading, the ``playlists`` and ``players`` keys are guaranteed
        to exist (defaulting to empty dicts) and ``players`` always contains
        a ``chromecast`` entry.
        """
        # Close the file once parsed instead of leaking the handle
        with open(self.file) as stream:
            # An empty file parses to None, which update() cannot accept
            self.update(load(stream, Loader) or {})
        self['playlists'] = self.get('playlists', {})
        self['players'] = self.get('players', {})
        if 'chromecast' not in self['players']:
            self['players']['chromecast'] = {'type': 'chromecast'}
项目:BigBrotherBot-For-UrT43    作者:ptitbigorneau    | 项目源码 | 文件源码
def load_data_yaml(self, *args):
        ''' Load one or several YAML stats files and merge them with current results

        Every document of every file is collected, skipping empty (None)
        documents. Returns the combined list.
        '''
        superstats = []
        for filename in args:
            # Each file is closed as soon as its documents have been read;
            # previously every handle stayed open.
            with open(filename, 'r') as stream:
                # NOTE(review): the attribute is kept for compatibility, but
                # it refers to a closed file once this loop moves on
                self.stream = stream
                superstats.extend(data for data in yaml.load_all(stream, Loader=Loader) if data is not None)
        return superstats
项目:ADLES    作者:GhostofGoes    | 项目源码 | 文件源码
def parse_yaml(filename):
    """
    Parses a YAML file and returns a nested dictionary containing its contents.

    A filename of ``-`` reads from standard input. Missing files and YAML
    errors are logged and reported by returning None.

    :param str filename: Name of YAML file to parse
    :return: Parsed file contents
    :rtype: dict or None
    """
    try:
        # '-' selects stdin instead of a file on disk
        source = sys.stdin if filename == '-' else open(filename)
        with source as handle:
            try:
                return load(handle, Loader=Loader)
            except YAMLError as exc:
                logging.critical("Could not parse YAML file %s", filename)
                if not hasattr(exc, 'problem_mark'):
                    logging.error("Error: %s", exc)
                    return None
                # Tell the user exactly where the syntax error is
                position = exc.problem_mark
                logging.error("Error position: (%s:%s)",
                              position.line + 1, position.column + 1)
    except FileNotFoundError:
        logging.critical("Could not find YAML file for parsing: %s", filename)
        return None
项目:lomap    作者:wasserfeder    | 项目源码 | 文件源码
def load(cls, filename):
        '''Read the YAML file at filename and return the loaded object.'''
        with open(filename, 'r') as stream:
            model = load(stream, Loader=Loader)
        return model
项目:Holosocket    作者:Sherlock-Holo    | 项目源码 | 文件源码
def main():
    """Run the holosocket server described by the YAML config file.

    Command-line options: ``-c/--config`` (required in practice, since every
    setting comes from it), ``-4/--ipv4`` to listen on the configured address
    only, and ``--debug`` for debug logging. Without ``--ipv4`` the server
    also listens on ``::``.

    The config must provide ``server``, ``server_port`` and ``password``;
    ``dns`` is optional. The function blocks until interrupted, then closes
    the server and the event loop.
    """
    parser = argparse.ArgumentParser(description='holosocket server')
    parser.add_argument('-c', '--config', help='config file')
    parser.add_argument('-4', '--ipv4', action='store_true', help='ipv4 only')
    parser.add_argument('--debug', action='store_true', help='debug mode')

    args = parser.parse_args()

    if not args.config:
        # Exit with a usage message instead of a NameError on `config` below
        parser.error('a config file is required (-c/--config)')

    with open(args.config, 'r') as f:
        config = yaml.load(f, Loader=Loader)

    if args.debug:
        LOGGING_MODE = logging.DEBUG
    else:
        LOGGING_MODE = logging.INFO

    logging.basicConfig(
        level=LOGGING_MODE,
        format='{asctime} {levelname} {message}',
        datefmt='%Y-%m-%d %H:%M:%S',
        style='{')

    if args.ipv4:
        SERVER = config['server']

    else:
        SERVER = (config['server'], '::')

    SERVER_PORT = config['server_port']
    KEY = config['password']
    DNS = config.get('dns')

    try:
        import uvloop
        asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
        logging.info('uvloop mode')
    except ImportError:
        logging.info('pure asyncio mode')

    # Create the loop explicitly (honouring the policy chosen above) rather
    # than relying on get_event_loop() to create one implicitly.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    server = Server(KEY, nameservers=DNS)
    # The `loop` argument was removed from start_server in Python 3.10; the
    # coroutine runs on `loop` because that loop drives it below.
    coro = asyncio.start_server(server.handle, SERVER, SERVER_PORT)
    server = loop.run_until_complete(coro)

    try:
        loop.run_forever()

    except KeyboardInterrupt:
        pass

    server.close()
    loop.run_until_complete(server.wait_closed())
    loop.close()
项目:Holosocket    作者:Sherlock-Holo    | 项目源码 | 文件源码
def main():
    """Run the holosocket local endpoint described by the YAML config file.

    Command-line options: ``-c/--config`` (required in practice, since every
    setting comes from it) and ``--debug`` for debug logging.

    The config must provide ``server``, ``server_port``, ``local``,
    ``local_port`` and ``password``; ``v6_server`` is optional. The function
    blocks until interrupted, then closes the listener and the event loop.
    """
    parser = argparse.ArgumentParser(description='holosocket local')
    parser.add_argument('-c', '--config', help='config file')
    parser.add_argument('--debug', action='store_true', help='debug mode')

    args = parser.parse_args()

    if not args.config:
        # Exit with a usage message instead of a NameError on `config` below
        parser.error('a config file is required (-c/--config)')

    with open(args.config, 'r') as f:
        config = yaml.load(f, Loader=Loader)

    if args.debug:
        LOGGING_MODE = logging.DEBUG
    else:
        LOGGING_MODE = logging.INFO

    logging.basicConfig(
        level=LOGGING_MODE,
        format='{asctime} {levelname} {message}',
        datefmt='%Y-%m-%d %H:%M:%S',
        style='{')

    SERVER = config['server']
    V6_SERVER = config.get('v6_server')

    SERVER_PORT = config['server_port']
    LOCAL = config['local']
    PORT = config['local_port']
    KEY = config['password']

    try:
        import uvloop
        asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
        logging.info('uvloop mode')
    except ImportError:
        logging.info('pure asyncio mode')

    # Create the loop explicitly (honouring the policy chosen above) rather
    # than relying on get_event_loop() to create one implicitly.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    server = Server(SERVER, V6_SERVER, SERVER_PORT, KEY)
    # The `loop` argument was removed from start_server in Python 3.10; the
    # coroutine runs on `loop` because that loop drives it below.
    coro = asyncio.start_server(server.handle, LOCAL, PORT)
    server = loop.run_until_complete(coro)

    try:
        loop.run_forever()

    except KeyboardInterrupt:
        pass

    server.close()
    loop.run_until_complete(server.wait_closed())
    loop.close()
项目:cvcalib    作者:Algomorph    | 项目源码 | 文件源码
def main():
    """Build the settings from console and settings file, then create the app.

    Console-only options are parsed first to find the settings file; its
    values override the generated defaults before the full parser runs on the
    remaining arguments. A ``folder`` of ``!settings_file_location`` is
    replaced by the directory of the settings file. When saving is requested
    the final settings (minus the two bookkeeping entries) are dumped back to
    the settings file.

    Raises ValueError if a settings file is named but does not exist.
    """
    Setting.generate_missing_shorthands()
    defaults = Setting.generate_defaults_dict()
    conf_parser = \
        Setting.generate_parser(defaults, console_only=True, description=
        "Use one or more .mp4 video files to perform calibration: " +
        "find the cameras' intrinsics and/or extrinsics.")

    # ============== STORAGE/RETRIEVAL OF CONSOLE SETTINGS ===========================================#
    args, remaining_argv = conf_parser.parse_known_args()
    defaults[Setting.save_settings.name] = args.save_settings
    if args.settings_file:
        defaults[Setting.settings_file.name] = args.settings_file
        if osp.isfile(args.settings_file):
            # The context manager closes the file even when parsing fails
            with open(args.settings_file, "r", encoding="utf-8") as file_stream:
                config_defaults = load(file_stream, Loader=Loader)
            # An empty settings file parses to None; nothing to override
            if config_defaults:
                for key, value in config_defaults.items():
                    defaults[key] = value
        else:
            raise ValueError("Settings file not found at: {0:s}".format(args.settings_file))

    parser = Setting.generate_parser(defaults, parents=[conf_parser])
    args = parser.parse_args(remaining_argv)

    # process "special" setting values
    if args.folder == "!settings_file_location":
        if args.settings_file and osp.isfile(args.settings_file):
            args.folder = osp.dirname(args.settings_file)

    # save settings if prompted to do so
    if args.save_settings and args.settings_file:
        # vars() returns the namespace's own dict, so edits affect `args`
        setting_dict = vars(args)
        with open(args.settings_file, "w", encoding="utf-8") as file_stream:
            file_name = setting_dict[Setting.save_settings.name]
            # The bookkeeping entries are not written to the file
            del setting_dict[Setting.save_settings.name]
            del setting_dict[Setting.settings_file.name]
            dump(setting_dict, file_stream, Dumper=Dumper)
        setting_dict[Setting.save_settings.name] = file_name
        # NOTE(review): settings_file is restored as True, not as the
        # original path -- confirm this is intended
        setting_dict[Setting.settings_file.name] = True

    app = MultiStereoApplication(args)
项目:cvcalib    作者:Algomorph    | 项目源码 | 文件源码
def main():
    """Build the settings from console and settings file, then run the matcher.

    Console-only options are parsed first to find the settings file; its
    values override the generated defaults before the full parser runs on the
    remaining arguments. A ``folder`` of ``!settings_file_location`` is
    replaced by the directory of the settings file. When saving is requested
    the final settings (minus the two bookkeeping entries) are dumped back to
    the settings file. Finally a ``StereoMatcherApp`` is created and its
    ``disparity2`` method is called.

    Raises ValueError if a settings file is named but does not exist.
    """
    Setting.generate_missing_shorthands()
    defaults = Setting.generate_defaults_dict()
    conf_parser = \
        Setting.generate_parser(defaults, console_only=True, description=
        "Test stereo algorithms on two image files.")

    # ============== STORAGE/RETRIEVAL OF CONSOLE SETTINGS ===========================================#
    args, remaining_argv = conf_parser.parse_known_args()
    defaults[Setting.save_settings.name] = args.save_settings
    if args.settings_file:
        defaults[Setting.settings_file.name] = args.settings_file
        if osp.isfile(args.settings_file):
            # The context manager closes the file even when parsing fails
            with open(args.settings_file, "r", encoding="utf-8") as file_stream:
                config_defaults = load(file_stream, Loader=Loader)
            # An empty settings file parses to None; nothing to override
            if config_defaults:
                for key, value in config_defaults.items():
                    defaults[key] = value
        else:
            raise ValueError("Settings file not found at: {0:s}".format(args.settings_file))

    parser = Setting.generate_parser(defaults, parents=[conf_parser])
    args = parser.parse_args(remaining_argv)

    # process "special" setting values
    if args.folder == "!settings_file_location":
        if args.settings_file and osp.isfile(args.settings_file):
            args.folder = osp.dirname(args.settings_file)

    # save settings if prompted to do so
    if args.save_settings and args.settings_file:
        # vars() returns the namespace's own dict, so edits affect `args`
        setting_dict = vars(args)
        with open(args.settings_file, "w", encoding="utf-8") as file_stream:
            file_name = setting_dict[Setting.save_settings.name]
            # The bookkeeping entries are not written to the file
            del setting_dict[Setting.save_settings.name]
            del setting_dict[Setting.settings_file.name]
            dump(setting_dict, file_stream, Dumper=Dumper)
        setting_dict[Setting.save_settings.name] = file_name
        # NOTE(review): settings_file is restored as True, not as the
        # original path -- confirm this is intended
        setting_dict[Setting.settings_file.name] = True

    app = StereoMatcherApp(args)
    app.disparity2()
项目:py-cdg    作者:musec    | 项目源码 | 文件源码
def load(stream, filename):
    """Read a call graph from ``stream`` and return it as a graph.

    The parser is chosen by the extension of ``filename``: ``.cdg`` (UBJSON),
    ``.json`` or ``.yaml``. The parsed document's ``functions`` mapping gives
    one node per function, with optional ``attributes``, outgoing ``calls``
    edges and incoming ``flows`` edges.

    Raises ValueError for any other extension.
    """
    if not filename.endswith(('.cdg', '.json', '.yaml')):
        raise ValueError('Unhandled file type: %s' % filename)

    if filename.endswith('.cdg'):
        import ubjson
        cg = ubjson.load(stream)
    elif filename.endswith('.json'):
        import json
        cg = json.load(stream)
    else:
        import yaml

        # Prefer the C loader when this PyYAML build provides it
        try:
            from yaml import CLoader as Loader
        except ImportError:
            from yaml import Loader

        cg = yaml.load(stream, Loader=Loader)

    graph = create(filename)

    for (fn_name, props) in cg['functions'].items():
        graph.add_node(fn_name)

        if 'attributes' in props:
            graph.node[fn_name].update(props['attributes'])

        # this function -> callee
        if 'calls' in props and props['calls']:
            for callee in props['calls']:
                graph.add_edge(fn_name, callee, kind = EdgeKind.Call)

        # flow source -> this function
        if 'flows' in props and props['flows']:
            for origin in props['flows']:
                graph.add_edge(origin, fn_name, kind = EdgeKind.Flow)

    return graph
项目:AMBR    作者:Algomorph    | 项目源码 | 文件源码
def __init__(self, args, out_postfix="_out", with_video_output=True):
        """Set up video input (and optionally output) from ``args``.

        Every attribute of ``args`` is copied onto the instance. If a
        ``settings.yaml`` exists in the working directory it is loaded into
        ``self.settings``: its ``datapath`` (with ``<current_user>`` replaced
        by the current user name) becomes ``self.datapath``, and a matching
        ``raw_options`` entry for the input video supplies the global offset
        and flip flag.

        ``end_with`` of -1, or beyond the last frame, is clamped to the last
        frame. When ``with_video_output`` is true a ``cv2.VideoWriter`` with
        the input's FPS and frame size is created in ``self.writer``; an
        empty ``out_video`` defaults to a name derived from the input name
        and ``out_postfix``.
        """
        self.global_video_offset = 0
        self.flip_video = False
        self.datapath = "./"
        self.__dict__.update(vars(args))
        self.writer = None

        if os.path.exists("settings.yaml"):
            # The context manager closes the file even when parsing fails
            with open("settings.yaml", mode='r') as stream:
                self.settings = load(stream, Loader=Loader)
            self.datapath = self.settings['datapath'].replace("<current_user>", getuser())
            print("Processing path: ", self.datapath)
            if 'raw_options' in self.settings:
                raw_options = self.settings['raw_options']
                if self.in_video in raw_options:
                    self.global_video_offset = raw_options[args.in_video]['global_offset']
                    self.flip_video = raw_options[args.in_video]['flip']

        self.cap = None
        self.reload_video()
        print("Processing video file {:s}.".format(self.in_video))

        last_frame = int(self.cap.get(cv2.CAP_PROP_FRAME_COUNT) - 1)

        if self.end_with == -1:
            self.end_with = last_frame
        else:
            if self.end_with > last_frame:
                print(("Warning: specified end frame ({:d}) is beyond the last video frame" +
                       " ({:d}). Stopping after last frame.")
                      .format(self.end_with, last_frame))
                self.end_with = last_frame

        print("Frame range: {:d}--{:d}".format(self.start_from, self.end_with))

        # self.writer was initialised to None above and stays None when no
        # video output is requested
        if with_video_output:
            if self.out_video == "":
                # NOTE(review): the default postfix already starts with "_",
                # so this yields a double underscore -- confirm intended
                self.out_video = args.in_video[:-4] + "_" + out_postfix + ".mp4"

            self.writer = cv2.VideoWriter(os.path.join(self.datapath, self.out_video),
                                          cv2.VideoWriter_fourcc('X', '2', '6', '4'),
                                          self.cap.get(cv2.CAP_PROP_FPS),
                                          (int(self.cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
                                           int(self.cap.get(cv2.CAP_PROP_FRAME_HEIGHT))),
                                          True)
            self.writer.set(cv2.VIDEOWRITER_PROP_NSTRIPES, cpu_count())

        self.frame = None
        self.cur_frame_number = None
项目:AMBR    作者:Algomorph    | 项目源码 | 文件源码
def process_arguments(program_arguments_enum, program_help_description):
    """Parse console arguments, merged with an optional settings file.

    Console-only options are parsed first to find the settings file; its
    values override the generated defaults before the full parser runs on the
    remaining arguments. Arguments listed in
    ``argproc.setting_file_location_args`` whose value is the wildcard are
    replaced by the directory of the settings file. When saving is requested
    the final settings (minus the two bookkeeping entries) are dumped back to
    the settings file.

    :param program_arguments_enum: passed to ``ArgumentProcessor``
    :param program_help_description: description shown in the parser's help
    :return: the parsed argument namespace
    :raises ValueError: a settings file is named, does not exist, and saving
        was not requested
    """
    argproc = ArgumentProcessor(program_arguments_enum)
    defaults = argproc.generate_defaults_dict()
    # Use the caller's description; a copy-pasted literal was used before,
    # which left this parameter without any effect.
    conf_parser = \
        argproc.generate_parser(defaults, console_only=True,
                                description=program_help_description)

    # ============== STORAGE/RETRIEVAL OF CONSOLE SETTINGS ===========================================#
    args, remaining_argv = conf_parser.parse_known_args()
    defaults[ArgumentProcessor.save_settings_name] = args.save_settings
    if args.settings_file:
        defaults[ArgumentProcessor.settings_file_name] = args.settings_file
        if os.path.isfile(args.settings_file):
            # The context manager closes the file even when parsing fails
            with open(args.settings_file, "r", encoding="utf-8") as file_stream:
                config_defaults = load(file_stream, Loader=Loader)
            if config_defaults:
                for key, value in config_defaults.items():
                    defaults[key] = value
        else:
            # A missing file is acceptable when it is about to be created
            if not args.save_settings:
                raise ValueError("Settings file not found at: {0:s}".format(args.settings_file))

    parser = argproc.generate_parser(defaults, parents=[conf_parser])
    args = parser.parse_args(remaining_argv)

    # process "special" setting values
    if args.settings_file and os.path.isfile(args.settings_file):
        for key in args.__dict__.keys():
            if key in argproc.setting_file_location_args and args.__dict__[key] == \
                    Argument.setting_file_location_wildcard:
                args.__dict__[key] = os.path.dirname(args.settings_file)

    # save settings if prompted to do so
    if args.save_settings and args.settings_file:
        # vars() returns the namespace's own dict, so edits affect `args`
        setting_dict = vars(args)
        with open(args.settings_file, "w", encoding="utf-8") as file_stream:
            file_name = setting_dict[ArgumentProcessor.save_settings_name]
            # The bookkeeping entries are not written to the file
            del setting_dict[ArgumentProcessor.save_settings_name]
            del setting_dict[ArgumentProcessor.settings_file_name]
            dump(setting_dict, file_stream, Dumper=Dumper, indent=3, default_flow_style=False)
        setting_dict[ArgumentProcessor.save_settings_name] = file_name
        # NOTE(review): settings_file is restored as True, not as the
        # original path -- confirm this is intended
        setting_dict[ArgumentProcessor.settings_file_name] = True

    return args
项目:pytorch    作者:pytorch    | 项目源码 | 文件源码
def load_aten_declarations(path):
    """Load the declarations from the YAML file at ``path`` and enrich them.

    Each declaration gains: a ``simple_type`` on every argument (the type
    without `` &``, ``const `` and with ``Generator *`` shortened),
    ``formals``, ``args``, ``api_name``, ``base_name``, ``return_type``,
    ``typed_args`` and the Python ``prototype`` string produced by
    ``FUNCTION_PROTOTYPE``.

    Returns the list of enriched declarations.
    """
    with open(path, 'r') as stream:
        declarations = yaml.load(stream, Loader=Loader)

    def python_default(arg):
        # Default shown in the Python prototype, or None when there is none
        value = None
        if arg.get('default') is not None:
            value = arg['default']
            if value in ('nullptr', '{}'):
                value = 'None'
        if arg.get('python_default_init') is not None:
            value = 'None'
        return value

    def typed_param(arg):
        # "<type>[?][[size]] <name>[=<default>]" for one argument
        typename = arg['simple_type']
        if arg.get('is_nullable'):
            typename = '{}?'.format(typename)
        if arg.get('size') is not None:
            typename = '{}[{}]'.format(typename, arg['size'])
        text = typename + ' ' + arg['name']
        default = python_default(arg)
        if default is not None:
            text += '=' + str(default)
        return text

    # enrich declarations with additional information
    for decl in declarations:
        arguments = decl['arguments']
        for arg in arguments:
            stripped = arg['type'].replace(' &', '').replace('const ', '')
            arg['simple_type'] = stripped.replace('Generator *', 'Generator')

        decl['formals'] = [arg['type'] + ' ' + arg['name'] for arg in arguments]
        decl['args'] = [arg['name'] for arg in arguments]
        decl['api_name'] = decl['name']
        decl['return_type'] = format_return_type(decl['returns'])

        decl['base_name'] = decl['name']

        # Compute the Python function prototype for argument parsing
        typed_args = []
        star_emitted = False
        for arg in arguments:
            # a single '*' goes before the first keyword-only argument
            if not star_emitted and arg.get('kwarg_only', False):
                typed_args.append('*')
                star_emitted = True
            typed_args.append(typed_param(arg))

        # Python function prototype.
        # This is the string that we give to FunctionParameter, which is
        # then parsed into the actual structure which we do parsing
        # with.
        decl['typed_args'] = typed_args
        decl['prototype'] = FUNCTION_PROTOTYPE.substitute(decl)

    return declarations