Python yaml module: load() code examples

We extracted the following 50 code examples from open-source Python projects to illustrate how to use yaml.load().
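
As a baseline, here is a minimal, self-contained sketch of the pattern shared by the snippets below: open a YAML file and parse it into Python objects. The file name config.yml is only a placeholder. In PyYAML 5.1 and later, calling yaml.load() without an explicit Loader argument emits a warning, so yaml.safe_load() (or yaml.load(f, Loader=yaml.SafeLoader)) is the safer choice for untrusted input.

import yaml

def load_yaml_file(path):
    # Minimal sketch: parse one YAML document into Python objects.
    # safe_load() constructs only plain Python types (dict, list, str, ...),
    # which is safer than plain load() for untrusted input.
    with open(path, 'r') as f:
        return yaml.safe_load(f)

config = load_yaml_file('config.yml')  # placeholder path
print(config)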

Project: qgis_wp    Author: Zverik
def openGeoPackage(self, filename=None):
        if not filename:
            filename = QFileDialog.getOpenFileName(
                parent=None,
                caption=self.tr(u'Select GeoPackage file'),
                filter=self.tr(u'GeoPackage File') + u' (*.gpkg *.geopackage)')
        if not filename or not os.path.isfile(filename):
            return
        filename = os.path.abspath(filename)

        styleFile = os.path.join(self.path, 'res', 'wp_style.yaml')
        with open(styleFile, 'r') as f:
            style = yaml.load(f)
        applyStyle(filename, style)
        for layer in self.iface.legendInterface().layers():
            self.iface.legendInterface().refreshLayerSymbology(layer)
        self.createPie()
Project: botterlord    Author: Marchearth
def retrieve(cont, filename):
    stream = open(filename, 'r')  # the Python 2 file() builtin is replaced by open()
    data = yaml.load(stream)
    #return yaml.dump(data, encoding=('utf-8'), default_flow_style=False, allow_unicode=True)
    return data[cont].encode('utf-8')
Project: botterlord    Author: Marchearth
def internal_data(filename, io, entry, cont, cont_in = None, cont_in2 = None): #Supports up to 3 containers stacked on top.
    "filename = 'string',, io = [in, out],, entry = val,, cont,,..."
    stream = open(filename, 'r')
    prof = yaml.load(stream)
    if io == 'out':
        if cont_in == None:
            val = prof[cont]
        else:
            if cont_in2 == None:
                val = prof[cont][cont_in]
            else:
                val = prof[cont][cont_in][cont_in2]
        return val

    if io == 'in':
        if cont_in == None:
            prof[cont] = entry
        else:
            if cont_in2 == None:
                prof[cont][cont_in] = entry
            else:
                prof[cont][cont_in][cont_in2] = entry
        with open(filename, 'w') as yaml_file:
            yaml_file.write(yaml.dump(prof, default_flow_style = False))
Project: fuel-nailgun-extension-iac    Author: openstack
def __init__(self):
        settings_files = []
        project_path = os.path.dirname(__file__)
        project_settings_file = os.path.join(project_path, 'settings.yaml')
        settings_files.append(project_settings_file)
        settings_files.append('/etc/git-exension-settings.yaml')
        settings_files.append('/etc/nailgun/git-exension-settings.yaml')
        self.config = {}
        for sf in settings_files:
            try:
                logger.debug("Trying to read config file %s" % sf)
                with open(sf) as custom_config:
                    self.config.update(yaml.load(custom_config.read()))
            except Exception as e:
                logger.error("Error while reading config file %s: %s" %
                             (sf, str(e)))
Project: consul-pg    Author: adamcstephens
def configure(self):
        # load config values
        try:
            with open(self.configfile) as configfile_contents:
                self.config = json.load(configfile_contents)
        except:
            self.config = {}

        try:
            self.agent_services = self.api_session.get(self.api_endpoint + '/agent/services?stale').json()
        except:
            print_exc()
            exit(135)
        self.managed_service = self.agent_services[self.service]

        if self.managed_service['Tags'] == None:
            self.managed_service['Tags'] = []

        if self.role_source == "facter":
            self.get_facter_state(self.DEFAULT_FACTERFILE)
        else:
            print("!! unsupported PG role source !!")
            exit(140)
Project: ownbot    Author: michaelimfeld
def __load_config(self):
        """Loads the configuration file.

            Loads all usergroups and users as a dict from
            the configuration file into the config attribute.
        """
        if not os.path.exists(self.USERS_CONF_PATH):
            self.__config = {}
            return

        with open(self.USERS_CONF_PATH, "r") as config_file:
            config = yaml.load(config_file)
            if not config:
                self.__config = {}
                return

            self.__config = config
Project: PyPlanet    Author: PyPlanet
def load(self):
        # Prepare + load directory.
        super().load()

        # Load the files and parse Yaml.
        parsed_settings = dict()

        try:
            for file_name in self.files:
                file_path = os.path.join(self.directory, file_name)
                with open(file_path, 'r') as file_handle:
                    parsed_settings.update(yaml.load(file_handle))
        except (yaml.YAMLError, yaml.MarkedYAMLError) as e:
            raise ImproperlyConfigured(
                'Your settings file(s) contain invalid YAML syntax! Please fix and restart!, {}'.format(str(e))
            )

        # Loop and set in local settings (+ uppercase keys).
        for key, value in parsed_settings.items():
            self.settings[key.upper()] = value
Project: seq2seq    Author: google
def test_without_extra_args(self):
    pipeline_def = yaml.load("""
      class: ParallelTextInputPipeline
      params:
        source_files: ["file1"]
        target_files: ["file2"]
        num_epochs: 1
        shuffle: True
    """)
    pipeline = input_pipeline.make_input_pipeline_from_def(
        pipeline_def, tf.contrib.learn.ModeKeys.TRAIN)
    self.assertIsInstance(pipeline, input_pipeline.ParallelTextInputPipeline)
    #pylint: disable=W0212
    self.assertEqual(pipeline.params["source_files"], ["file1"])
    self.assertEqual(pipeline.params["target_files"], ["file2"])
    self.assertEqual(pipeline.params["num_epochs"], 1)
    self.assertEqual(pipeline.params["shuffle"], True)
Project: seq2seq    Author: google
def test_with_extra_args(self):
    pipeline_def = yaml.load("""
      class: ParallelTextInputPipeline
      params:
        source_files: ["file1"]
        target_files: ["file2"]
        num_epochs: 1
        shuffle: True
    """)
    pipeline = input_pipeline.make_input_pipeline_from_def(
        def_dict=pipeline_def,
        mode=tf.contrib.learn.ModeKeys.TRAIN,
        num_epochs=5,
        shuffle=False)
    self.assertIsInstance(pipeline, input_pipeline.ParallelTextInputPipeline)
    #pylint: disable=W0212
    self.assertEqual(pipeline.params["source_files"], ["file1"])
    self.assertEqual(pipeline.params["target_files"], ["file2"])
    self.assertEqual(pipeline.params["num_epochs"], 5)
    self.assertEqual(pipeline.params["shuffle"], False)
Project: yt-browser    Author: juanfgs
def __init__(self):
        self.config_dir = os.path.expanduser("~/.config/ytbrowser/")
        self.defaults['format'] = "mkv"
        self.defaults['quality'] = "bestvideo"
        self.defaults['preferredcodec'] = "mp3"
        self.defaults['preferredquality'] = 192
        self.defaults['developerKey'] = "AIzaSyDFuK00HWV0fd1VMb17R8GghRVf_iQx9uk"
        self.defaults['apiServiceName'] = "youtube"
        self.defaults['apiVersion'] = "v3"

        if not os.path.exists(self.config_dir):
            os.makedirs(self.config_dir)

        if not os.path.exists(self.config_dir + "config.yml"):
            open(self.config_dir + "config.yml", "a").close()

        with open(self.config_dir + "config.yml", 'r') as ymlfile:
            self.user_settings = yaml.load(ymlfile)

        if self.user_settings is None:
            self.user_settings = {}
Project: tripletloss    Author: luhaofang
def setup(self, bottom, top):
        """Setup the RoIDataLayer."""
        # parse the layer parameter string, which must be valid YAML
        layer_params = yaml.load(self.param_str_)    
        self._batch_size = config.BATCH_SIZE
        self._triplet = self._batch_size/3
        assert self._batch_size % 3 == 0
        self._name_to_top_map = {
            'data': 0,
            'labels': 1}

        self.data_container =  sampledata() 
        self._index = 0

        # data blob: holds a batch of N images, each with 3 channels
        # The height and width (100 x 100) are dummy values
        top[0].reshape(self._batch_size, 3, 224, 224)

        top[1].reshape(self._batch_size)
Project: cbapi-python    Author: carbonblack
def __getattr__(self, item):
        try:
            val = super(NewBaseModel, self).__getattribute__(item)
        except AttributeError:
            pass         # fall through to the rest of the logic...

        # try looking up via self._info, if we already have it.
        if item in self._info:
            return self._info[item]

        # if we're still here, let's load the object if we haven't done so already.
        if not self._full_init:
            self._refresh()

        # try one more time.
        if item in self._info:
            return self._info[item]
        else:
            raise AttributeError("'{0}' object has no attribute '{1}'".format(self.__class__.__name__,
                                                                              item))
Project: cbapi-python    Author: carbonblack
def __str__(self):
        lines = []
        lines.append("{0:s} object, bound to {1:s}.".format(self.__class__.__name__, self._cb.session.server))
        if self._last_refresh_time:
            lines.append(" Last refreshed at {0:s}".format(time.ctime(self._last_refresh_time)))
        if not self._full_init:
            lines.append(" Partially initialized. Use .refresh() to load all attributes")
        lines.append("-"*79)
        lines.append("")

        for attr in sorted(self._info):
            status = "   "
            if attr in self._dirty_attributes:
                if self._dirty_attributes[attr] is None:
                    status = "(+)"
                else:
                    status = "(*)"
            val = str(self._info[attr])
            if len(val) > 50:
                val = val[:47] + u"..."
            lines.append(u"{0:s} {1:>20s}: {2:s}".format(status, attr, val))

        return "\n".join(lines)
Project: sm-engine-ansible    Author: METASPACE2020
def __init__(self, ansible_config_path, aws_key_name=None, interval=60,
                 qname='sm_annotate', debug=False):

        with open(ansible_config_path) as fp:
            self.ansible_config = yaml.load(fp)

        self.interval = min(interval, 1200)
        self.aws_key_name = aws_key_name or self.ansible_config['aws_key_name']
        self.master_hostgroup = self.ansible_config['cluster_configuration']['instances']['master']['hostgroup']
        self.slave_hostgroup = self.ansible_config['cluster_configuration']['instances']['slave']['hostgroup']
        self.stage = self.ansible_config['stage']
        self.qname = qname
        self.debug = debug

        self._setup_logger()
        self.ec2 = boto3.resource('ec2', self.ansible_config['aws_region'])
Project: boss    Author: kabirbaidhya
def resolve_dotenv_file(path, stage=None):
    '''
    Resolve dotenv file and load environment vars if it exists.
    If stage parameter is provided, then stage specific .env file is resolved,
    for instance .env.production if stage=production etc.
    If stage is None, just .env file is resolved.
    '''
    filename = '.env' + ('' if not stage else '.{}'.format(stage))
    dotenv_path = os.path.join(path, filename)
    fallback_path = os.path.join(path, '.env')

    if fs.exists(dotenv_path):
        info('Resolving env file: {}'.format(cyan(dotenv_path)))
        dotenv.load_dotenv(dotenv_path)

    elif fs.exists(fallback_path):
        info('Resolving env file: {}'.format(cyan(fallback_path)))
        dotenv.load_dotenv(fallback_path)
Project: boss    Author: kabirbaidhya
def load(filename=DEFAULT_CONFIG_FILE, stage=None):
    ''' Load the configuration and return it. '''
    try:
        # pass
        file_contents = fs.read(filename)
        resolve_dotenv_file(os.path.dirname(filename), stage)

        # Expand the environment variables used in the yaml config.
        loaded_config = os.path.expandvars(file_contents)

        # Parse the yaml configuration.
        # And merge it with the defaults before it's used everywhere.
        loaded_config = yaml.load(loaded_config)
        merged_config = merge_config(loaded_config)

        _config.update(merged_config)

        return get()

    except KeyError:
        halt('Invalid configuration file "{}"'.format(filename))

    except IOError:
        halt('Error loading config file "%s"' % filename)
Project: PyWebRunner    Author: IntuitiveWebSolutions
def main():
    global ARGS

    parser = argparse.ArgumentParser(description='Run a PyWebRunner YAML/JSON script.')
    parser.add_argument('-b', '--browser', help='Which browser to load. Defaults to Chrome.')
    parser.add_argument('--base-url', help='Base URL to use with goto command.')
    parser.add_argument('-t', '--timeout', help='Global wait timeout (in seconds). Defaults to 30.')
    parser.add_argument('-p', '--processes', help='Number of processes (browsers) to use. Defaults to 1')
    parser.add_argument('-do', '--default-offset', help='New default offset for scroll_to_element. (Default is 0)')
    parser.add_argument('--errors', dest='errors', action='store_true', help='Show errors.')
    parser.add_argument('--focus', dest='focus', action='store_true', help='Focus the browser on launch.')
    parser.add_argument('-v', '--verbose', dest='verbose', action='store_true', help='Verbose output of commands being executed.')
    parser.add_argument('files', nargs='*')
    ARGS = parser.parse_args()

    processes = ARGS.processes or 1
    pool = Pool(int(processes))

    pool.map(run_test, ARGS.files)

    pool.close()
    pool.join()
Project: Auto_Analysis    Author: ztwo
def get_device_info():
    """
    Read the test_info and test_device YAML files and merge them into a list of devices
    :return: list of merged device dicts
    """
    device_list = []
    ini = U.ConfigIni()
    test_info = ini.get_ini('test_info', 'info')
    test_device = ini.get_ini('test_device', 'device')
    with open(test_info) as f:
        test_dic = yaml.load(f)[0]

    with open(test_device) as f:
        for device in yaml.load(f):
            device_list.append(dict(test_dic.items() + device.items()))

    return device_list
Project: pybot    Author: spillai
def iter_keys_values(self, keys, inds=None, verbose=False): 
        for key in keys: 
            if key not in self.keys_: 
                raise RuntimeError('Key %s not found in dataset. keys: %s' % (key, self.keys_))

        idx, ii = 0, 0
        total_chunks = len(self.meta_file_.chunks)
        inds = np.sort(inds) if inds is not None else None

        for chunk_idx, chunk in enumerate(progressbar(self.meta_file_.chunks, size=total_chunks, verbose=verbose)): 
            data = AttrDict.load(self.get_chunk_filename(chunk_idx))

            # if inds is None: 
            items = (data[key] for key in keys)
            for item in izip(*items): 
                yield item
            # else:
            #     for i, item in enumerate(data[key]): 
            #         if inds[ii] == idx + i: 
            #             yield item
            #             ii += 1
            #             if ii >= len(inds): break
            #     idx += len(data[key])
Project: pybot    Author: spillai
def iterchunks(self, key, batch_size=10, verbose=False): 
        if key not in self.keys_: 
            raise RuntimeError('Key %s not found in dataset. keys: %s' % (key, self.keys_))

        idx, ii = 0, 0
        total_chunks = len(self.meta_file_.chunks)
        batch_chunks = grouper(range(len(self.meta_file_.chunks)), batch_size)

        for chunk_group in progressbar(batch_chunks, size=total_chunks / batch_size, verbose=verbose): 
            items = []
            # print key, chunk_group
            for chunk_idx in chunk_group: 
                # grouper will fill chunks with default none values
                if chunk_idx is None: continue
                # Load chunk
                data = AttrDict.load(self.get_chunk_filename(chunk_idx))
                for item in data[key]: 
                    items.append(item)
            yield items
Project: gransk    Author: pcbje
def test_config(self):
    with open('config.yml') as inp:
      config = yaml.load(inp.read())

    _find_entities = find_entities.Subscriber(test_helper.get_mock_pipeline([]))
    _find_entities.setup(config)

    doc = document.get_document('dummy')

    for entity_type, pattern_conf in config.get(helper.ENTITIES, {}).items():
      if not isinstance(pattern_conf['test'], list):
        pattern_conf['test'] = [pattern_conf['test']]

      for test in pattern_conf['test']:
        doc.text = 'dum dum {} dum'.format(test)
        _find_entities.consume(doc, None)
        entities = doc.entities.get_all()

        self.assertEqual(1, len(entities),
                         msg='regex for %s found nothing' % entity_type)
        self.assertEqual(entity_type, entities[0][1]['type'])
        self.assertEqual(test, entities[0][1]['value'])
Project: saapy    Author: ashapochka
def import_scitools_yaml_to_neo4j(ctx, yaml_path, neo4j_url='bolt://localhost',
                                  user='neo4j', labels=''):
    """

    :param labels:
    :param ctx:
    :param yaml_path:
    :param neo4j_url:
    :param user:
    """
    label_list = to_label_list(labels)
    with open(yaml_path, 'r') as input_stream:
        scitools_db = yaml.load(input_stream)
    neo4j_client = connect_neo4j(ctx, neo4j_url, user)
    ScitoolsETL.import_to_neo4j(scitools_db, neo4j_client, labels=label_list)


# noinspection PyUnusedLocal
Project: webtzite    Author: materialsproject
def split_config(config):
        """Split the 'config' object into a set of fields.

        :param config: Configuration
        :type param: dict or str
        :raises: yaml.error.YAMLError if config is a str that doesn't parse
        """
        if not isinstance(config, dict):
            config = yaml.load(config)
        db = config.get("db", None)
        host = config.get("host", "0.0.0.0")
        user_name = config.get("user_name", None)
        password = config.get("password", None)
        port = int(config.get("port", 27017))
        coll = config.get("collection", None)
        return db, host, user_name, password, port, coll
Project: messier    Author: conorsch
def parse_messier_config(self, config_filepath=".messier"):
        """
        Read YAML config file for Messier. Defaults to .messier.
        Supported options include:

          `serverspec_commands`: list of shell commands to run for Serverspec
          `serverspec_base_directory`: directory to cd into prior to running Serverspec
        """
        try:
            config_file = open(config_filepath,'r')
        except IOError:
            config = {}
        else:
            config = yaml.load(config_file)
            if not config:
                config = {}
        return config


    # Elegant solution from https://gist.github.com/LeoHuckvale/8f50f8f2a6235512827b
    # Stuffing this method into class because it's harder to reference otherwise
Project: caduc    Author: tjamet
def __init__(self, options=[], config_path=None):
        if config_path is None:
            config_path = os.path.join(os.path.expanduser("~"), ".caduc", "config.yml")
            if os.path.exists(config_path):
                config = yaml.load(open(config_path, 'r'))
            else:
                config = {}
        else:
            config = yaml.load(open(config_path, 'r'))
        super(Config, self).__init__(**config)
        for opt in options:
            k, v = self.parse_kv(opt)
            node = {}
            child = node
            keys = self.parse_key(k)
            for key in keys[:-1]:
                child[key] = {}
                child = child[key]
            child[keys[-1]] = v
            self.update(node)
Project: DeepSea    Author: SUSE
def _get_job_parameters(self, job_spec, job_log_dir, client):
        with open('{}/{}'.format(self.bench_dir, job_spec), 'r') as yml:
            try:
                job = yaml.load(yml)
            except yaml.YAMLError as error:
                log.error('Error parsing job spec in file {}/fio/{}'.format(self.bench_dir, job_spec))
                log.error(error)
                raise error
        output_options = '''
        write_bw_log={logdir}/output
        write_lat_log={logdir}/output
        write_hist_log={logdir}/output
        write_iops_log={logdir}/output
        '''.format(logdir=job_log_dir)
        job.update({'dir': self.work_dir, 'output_options': output_options,
                    'client': client})
        return job
Project: DeepSea    Author: SUSE
def _record_filter(args, base_dir):
    """
    Save the filter provided
    """
    filter_file = '{}/.filter'.format(base_dir)

    if not isfile(filter_file):
        # do a touch filter_file
        open(filter_file, 'a').close()

    current_filter = {}
    with open(filter_file) as filehandle:
        current_filter = yaml.load(filehandle)
    if current_filter is None:
        current_filter = {}

    pprint.pprint(current_filter)

    # filter a bunch of salt content and the target key before writing
    rec_args = {k: v for k, v in args.items() if k != 'target' and not
                k.startswith('__')}
    current_filter[args['target']] = rec_args

    with open(filter_file, 'w') as filehandle:
        yaml.dump(current_filter, filehandle, default_flow_style=False)
Project: manage    Author: rochacbruno
def load_manage_dict(filename=None):
    manage_filename = None
    if not MANAGE_DICT:
        if filename:
            manage_filename = filename
        elif os.path.exists(MANAGE_FILE):
            manage_filename = MANAGE_FILE
        elif os.path.exists(HIDDEN_MANAGE_FILE):
            manage_filename = HIDDEN_MANAGE_FILE
        else:
            MANAGE_DICT.update(copy.deepcopy(default_manage_dict))
            MANAGE_DICT['shell']['banner']['message'] = (
                "WARNING: This is not a managed project\n"
                "\tPlease `exit()` and \n"
                "\trun `$ manage init`\n"
                "\tand edit `manage.yml` file with desired options"
            )
            MANAGE_DICT['shell']['auto_import']['display'] = False
        if manage_filename:
            with open(manage_filename) as manage_file:
                MANAGE_DICT.update(yaml.load(manage_file))
    return MANAGE_DICT
Project: clouds-aws    Author: elias5000
def load_parameters(stack):
    """load parameters from yaml file and return as dictionary"""
    params = []
    param_path = path.join('stacks', stack, 'parameters.yaml')

    if not path.exists(param_path):
        return params

    with open(param_path, encoding='utf-8') as file:
        params_raw = yaml.load(file.read())

        # build parameter dict
        for param in params_raw.keys():
            params.append({
                'ParameterKey': param,
                'ParameterValue': params_raw[param]
            })
    return params
Project: privcount    Author: privcount
def run(self):
        '''
        Called by twisted
        '''
        # load initial config
        self.refresh_config()
        if self.config is None:
            logging.critical("cannot start due to error in config file")
            return

        # refresh and check status every event_period seconds
        self.refresh_task = task.LoopingCall(self.refresh_loop)
        refresh_deferred = self.refresh_task.start(self.config['event_period'], now=False)
        refresh_deferred.addErrback(errorCallback)

        # setup server for receiving blinded counts from the DC nodes and key shares from the SK nodes
        listen_port = self.config['listen_port']
        key_path = self.config['key']
        cert_path = self.config['cert']
        ssl_context = ssl.DefaultOpenSSLContextFactory(key_path, cert_path)

        logging.info("Tally Server listening on port {}".format(listen_port))
        reactor.listenSSL(listen_port, self, ssl_context)
        reactor.run()
Project: klaxer    Author: klaxer
def __init__(self):
        self._classification_rules = {}
        self._exclusion_rules = {}
        self._enrichment_rules = {}
        self._routing_rules = {}
        self._config = None

        try:
            # TODO: Absolute path? Where should this live?
            with open('config/klaxer.yml', 'r') as ymlfile:
                self._config = yaml.load(ymlfile)
        except yaml.YAMLError as ye:
            raise ConfigurationError('failed to parse config') from ye

        for section in self._config:
            # Subsequent definitions of the same service will overwrite the
            # previous ones.
            self._build_rules(section)
Project: functest    Author: opnfv
def process_abot_test_result(file_path):
    """ Process ABoT Result """
    with open(file_path) as test_result:
        data = json.load(test_result)
        res = []
        for tests in data:
            tests = update_data(tests)
            try:
                flatten_steps = tests['elements'][0].pop('flatten_steps')
                for steps in flatten_steps:
                    steps['result'] = steps['step_status']
                    res.append(steps)
            except:
                logging.error("Could not post data to ElasticSearch host")
                raise
        return res
Project: Ushio    Author: Hanaasagi
def post(self):
        if self.current_user['level'] != 0:
            self.custom_error()
        settings = {
            'init_money': int(self.get_body_argument('init_money')),
            'reg_type': self.get_body_argument('reg_type'),
            'cookie_secret': self.get_body_argument('cookie_secret') or self.settings['cookie_secret'],
            'site': {
                'name': self.get_body_argument('sitename'),
                'keyword': self.get_body_argument('keyword'),
                'description': self.get_body_argument('description')
            }
        }
        self.settings.update(settings)
        custom_settings = {}
        with open(self.settings['config_file'], 'r') as f:
            custom_settings = yaml.load(f)
            custom_settings['global'].update(settings)
        with open(self.settings['config_file'], 'w') as f:
            yaml.dump(custom_settings, f,
                      default_flow_style=False, default_style='"')
        self.redirect('/ushio/setting')
Project: charm-plumgrid-gateway    Author: openstack
def parse_config(conf_file):
    if not os.path.isfile(conf_file):
        logging.error('Invalid config file: %s.' % conf_file)
        return False
    return yaml.load(open(conf_file).read())
Project: charm-plumgrid-gateway    Author: openstack
def __init__(self, *args):
        self.required_options = args
        self['config'] = hookenv.config()
        with open(os.path.join(hookenv.charm_dir(), 'config.yaml')) as fp:
            self.config = yaml.load(fp).get('options', {})
Project: charm-plumgrid-gateway    Author: openstack
def read_context(self, file_name):
        if not os.path.isabs(file_name):
            file_name = os.path.join(hookenv.charm_dir(), file_name)
        with open(file_name, 'r') as file_stream:
            data = yaml.load(file_stream)
            if not data:
                raise OSError("%s is empty" % file_name)
            return data
Project: charm-plumgrid-gateway    Author: openstack
def get_matchmaker_map(mm_file='/etc/oslo/matchmaker_ring.json'):
    mm_map = {}
    if os.path.isfile(mm_file):
        with open(mm_file, 'r') as f:
            mm_map = json.load(f)
    return mm_map
Project: charm-plumgrid-gateway    Author: openstack
def _git_yaml_load(projects_yaml):
    """
    Load the specified yaml into a dictionary.
    """
    if not projects_yaml:
        return None

    return yaml.load(projects_yaml)
Project: qgis_wp    Author: Zverik
def __init__(self, iface):
        self.iface = iface
        self.path = os.path.dirname(os.path.realpath(__file__))
        locale = QSettings().value("locale/userLocale")[0:2]
        localePath = os.path.join(self.path, 'i18n', '{}.qm'.format(locale))
        if os.path.exists(localePath):
            self.translator = QTranslator()
            self.translator.load(localePath)
            QCoreApplication.installTranslator(self.translator)
Project: qgis_wp    Author: Zverik
def createRotationLayer(self):
        pies = QgsMapLayerRegistry.instance().mapLayersByName(PIE_LAYER)
        if not pies:
            self.iface.messageBar().pushCritical(
                self.tr(u'No layer'), self.tr(u'Please add "{}" layer.').format(PIE_LAYER))
            return
        pie = pies[0]
        if not pie.featureCount():
            self.iface.messageBar().pushInfo(
                self.tr(u'No data'), self.tr(u'No features in the "{}" layer.').format(PIE_LAYER))
            return
        if pie.isEditable():
            self.iface.vectorLayerTools().saveEdits(pie)

        boxes = runalg('qgis:orientedminimumboundingbox', pie, True, None)
        boxesLayer = QgsVectorLayer(boxes['OUTPUT'], ROTATION_LAYER, 'ogr')
        if not boxesLayer.isValid():
            self.iface.messageBar().pushCritical(
                self.tr(u'Access error'), self.tr(u'Failed to load a temporary processing layer.'))
            return

        self.addFieldToLayer(boxesLayer, NAME_FIELD, QVariant.String)
        rotIndex = boxesLayer.dataProvider().fieldNameIndex('ANGLE')
        nameIndex = boxesLayer.dataProvider().fieldNameIndex(NAME_FIELD)
        iterpie = pie.getFeatures()
        for box in boxesLayer.getFeatures():
            name = next(iterpie)['name']
            angle = round(box['ANGLE'])
            if box['WIDTH'] > box['HEIGHT']:
                angle += 90 if angle < 0 else -90
            geom = QgsGeometry(box.geometry())
            geom.rotate(angle, box.geometry().boundingBox().center())
            boxesLayer.dataProvider().changeAttributeValues(
                {box.id(): {rotIndex: angle, nameIndex: name}})
            boxesLayer.dataProvider().changeGeometryValues({box.id(): geom})
        QgsMapLayerRegistry.instance().addMapLayer(boxesLayer)
        self.iface.legendInterface().setLayerVisible(boxesLayer, False)
        self.iface.legendInterface().setLayerVisible(pie, False)
        return boxesLayer
Project: AutoML5    Author: djajetic
def show_io(input_dir, output_dir):     
    swrite('\n=== DIRECTORIES ===\n\n')
    # Show this directory
    swrite("-- Current directory " + pwd() + ":\n")
    write_list(ls('.'))
    write_list(ls('./*'))
    write_list(ls('./*/*'))
    swrite("\n")

    # List input and output directories
    swrite("-- Input directory " + input_dir + ":\n")
    write_list(ls(input_dir))
    write_list(ls(input_dir + '/*'))
    write_list(ls(input_dir + '/*/*'))
    write_list(ls(input_dir + '/*/*/*'))
    swrite("\n")
    swrite("-- Output directory  " + output_dir + ":\n")
    write_list(ls(output_dir))
    write_list(ls(output_dir + '/*'))
    swrite("\n")

    # write meta data to sdterr
    swrite('\n=== METADATA ===\n\n')
    swrite("-- Current directory " + pwd() + ":\n")
    try:
        metadata = yaml.load(open('metadata', 'r'))
        for key,value in metadata.items():
            swrite(key + ': ')
            swrite(str(value) + '\n')
    except:
        swrite("none\n");
    swrite("-- Input directory " + input_dir + ":\n")
    try:
        metadata = yaml.load(open(os.path.join(input_dir, 'metadata'), 'r'))
        for key,value in metadata.items():
            swrite(key + ': ')
            swrite(str(value) + '\n')
        swrite("\n")
    except:
        swrite("none\n");
Project: decouvrez_django    Author: oc-courses
def handle(self, *args, **options):
        reference = 0
        # open file with data
        directory = os.path.dirname(os.path.dirname(__file__))
        path = os.path.join(directory, 'data', 'albums.yml')
        with open(path, 'r') as file:
            data = yaml.load(file)
            albums = data['albums']
            for album in albums:
                # Create artists
                artists = []
                for artist in album['artists']:
                    try:
                        stored_artist = Artist.objects.get(name=artist)
                        lg.info('Artist found: %s'%stored_artist)
                    except ObjectDoesNotExist:
                        stored_artist = Artist.objects.create(name=artist)
                        lg.info('Artist created: %s'%stored_artist)
                    artists.append(stored_artist)
                # Find or create album
                try:
                    stored_album = Album.objects.get(title=album['title'])
                    lg.info('Album found: %s'%stored_album.title)
                except ObjectDoesNotExist:
                    reference += 1
                    album = Album.objects.create(
                        title=album['title'],
                        reference=reference,
                        picture=album['picture']
                    )
                    album.artists = artists
                    lg.info('New album: %s'%album.title)
Project: hdcp_test    Author: imamotts
def __init__(self, conf_file):

        f = open(conf_file, "r")
        conf = yaml.load(f)
        f.close()

        cert_bin = self.__hex2bin(conf["cert_hex"])

        priv = conf["priv_key"]
        P_bin           = self.__hex2bin(priv["P_hex"])
        Q_bin           = self.__hex2bin(priv["Q_hex"])
        d_mod_p_1_bin   = self.__hex2bin(priv["d_mod_p-1_hex"])
        d_mod_q_1_bin   = self.__hex2bin(priv["d_mod_q-1_hex"])
        inv_q_mod_p_bin = self.__hex2bin(priv["inv_q_mod_p_hex"])

        self.__cert_bin   = cert_bin
        self.__priv_key_P = int.from_bytes(P_bin, "big")
        self.__priv_key_Q = int.from_bytes(Q_bin, "big")
        self.__priv_key_d_mod_p_1   = int.from_bytes(d_mod_p_1_bin, "big")
        self.__priv_key_d_mod_q_1   = int.from_bytes(d_mod_q_1_bin, "big")
        self.__priv_key_inv_q_mod_p = int.from_bytes(inv_q_mod_p_bin, "big")


        f = open("yaml/global.yaml", "r")
        conf = yaml.load(f)
        f.close()

        global_constant_hex = re.sub(r'\s', "", conf["global_constant"])
        self.__global_constant_bin = bytes.fromhex(global_constant_hex)
Project: hdcp_test    Author: imamotts
def __init__(self, conf_yaml):
        f = open(conf_yaml, "r")
        conf = yaml.load(f)
        f.close()
Project: spoonybard    Author: notnownikki
def load(self, filename):
        self.filename = filename
        with open(filename, 'r') as stream:
            self.config = yaml.load(stream)
Project: spoonybard    Author: notnownikki
def reload(self):
        self.load(self.filename)
Project: spoonybard    Author: notnownikki
def load(self, name, force_reload_if_unmanaged=False):
        if name in sys.modules and name not in self.plugins:
            # we're getting an already loaded module, which we has not been
            # loaded through PluginManager, return it from sys.modules and
            # add it to our list
            module = sys.modules[name]
            if force_reload_if_unmanaged:
                importlib.reload(module)
        else:
            module = importlib.import_module(name)
        self.plugins.add(name)
        return module
Project: spoonybard    Author: notnownikki
def load_yaml(self, job_yaml):
        obj = yaml.load(job_yaml)
        job_steps = []
        for step in obj['steps']:
            handler_name = list(step.keys())[0]
            plugin = engine.plugins.get_job_handler(handler_name)
            job_steps.append(plugin(step[handler_name]))
        job = Job()
        job.steps = job_steps
        return job
Project: sensor21    Author: 21dotco
def docs():
    """
    Provides the manifest.json file for the 21 endpoint crawler.
    """
    with open('./manifest.yaml', 'r') as f:
        manifest = yaml.load(f)
    return json.dumps(manifest)
Project: rust_pypi_example    Author: mckaymatt
def load_yaml_config(filepath):
    with open(filepath) as f:
        return yaml.load(f)