Python future.utils 模块,viewitems() 实例源码


项目:fallball-connector    作者:ingrammicro    | 项目源码 | 文件源码
def test_config_load(self):
        config = InlineClass({
            'debug': True,
            'fallball_service_url': 'PUT_HERE_FALLBALL_SERVICE_URI',
            'fallball_service_authorization_token': 'PUT_HERE_FALLBALL_SERVICE_AUTHORIZATION_TOKEN',
            'oauth_key': 'PUT_HERE_OAUTH_KEY',
            'oauth_secret': 'PUT_HERE_OAUTH_SECRET',
            'diskspace_resource': 'DISKSPACE',
            'devices_resource': 'DEVICES',

        for (key, value) in viewitems(config.__dict__):
            self.assertTrue(hasattr(Config, key))

        Config.conf_file = 'tests/fake_config_invalid.json'
        with self.assertRaises(RuntimeError):

        Config.conf_file = 'not_exists'
        with self.assertRaises(IOError):
项目:ngraph    作者:NervanaSystems    | 项目源码 | 文件源码
def __iter__(self):
        while self.index < self.total_iterations:
            idx = self.index % self.nbatches
            self.index += 1
            dict = {}
            for k, x in viewitems(self.data_arrays):
                if k == 'inp_txt' or k == 'teacher_tgt':
                    dict[k] = np.squeeze(x[:, idx:(idx + 1), :, :])
                    dict[k] = np.squeeze(x[:, idx:(idx + 1), :])

            yield dict
项目:OpenPoGoBot    作者:tehp    | 项目源码 | 文件源码
def _merge_config(d, u):
    for k, v in viewitems(u):
        if isinstance(v, collections.Mapping):
            r = _merge_config(d.get(k, {}), v)
            d[k] = r
            d[k] = u[k]
    return d
项目:envmgr-cli    作者:trainline    | 项目源码 | 文件源码
def __init__(self, options, *args, **kwargs):
        self.args = args
        self.kwargs = kwargs
        self.opts = {}
        self.cli_args = {}
        self.cmds = {}
        self.spinner = None
        self.register = {}
        user_agent = BaseCommand.get_user_agent()

        for (k, v) in options.items():
            if v is not None:
                cli_arg ='\<([\w\-]+)\>', k)
                cli_opt ='^\-\-([a-zA-Z0-9]+[\w\-]+)', k)

                if cli_arg is not None:
                    self.cli_args[] = v
                elif cli_opt is not None:
                    self.opts[] = v
                    self.cmds[k] = v

        safe_log_opts = {k: v for k, v in viewitems(self.opts) if k != 'pass' }

        logging.debug('Args: {0}'.format(self.cli_args))
        logging.debug('Opts: {0}'.format(safe_log_opts))
        logging.debug('User-Agent: {0}'.format(user_agent))

        host = self.get_config('host', 'ENVMGR_HOST')
        user = self.get_config('user', 'ENVMGR_USER')
        pwrd = self.get_password('pass', 'ENVMGR_PASS')
        headers = {'User-Agent':user_agent}
        envmgr.config(host, user, b64encode(pwrd.encode('ascii')), default_headers=headers)
项目:logger    作者:oval-group    | 项目源码 | 文件源码
def get(self):
        res = dict()
        for (name, child) in viewitems(self.children):
            res[name] = child.get()
        return res
项目:geckoboard-python    作者:helium    | 项目源码 | 文件源码
def from_json(cls, session, json):
        id = json.get('id')
        fields = json.get('fields')
        unique_fields = frozenset(json.get('unique_by', []))

        def _build_field(name, value):
            unique = name in unique_fields
            return Field.from_schema(value, unique=unique)

        fields = {f[0]: _build_field(f[0], f[1]) for f in viewitems(fields)}
        return cls(session, id, fields)
项目:geckoboard-python    作者:helium    | 项目源码 | 文件源码
def create(cls, session, id, fields):
        json = {
            'fields': {f[0]: f[1].to_schema() for f in viewitems(fields)}
        unique_by = [f[0] for f in viewitems(fields) if f[1].unique]
        if unique_by:
            json['unique_by'] = unique_by
        url = session.build_url('datasets', id)
        response = session.put(url, json=json)
        return cls.from_json(session, response.json())
项目:geckoboard-python    作者:helium    | 项目源码 | 文件源码
def _build_data_json(self, data):
        def _fields_json(entry):
            fields = self.fields
            return {f[0]: fields[f[0]].to_json(f[1])
                    for f in viewitems(entry)}

        json = {
            'data': [_fields_json(entry) for entry in data]
        return json
项目:stream2segment    作者:rizac    | 项目源码 | 文件源码
def yaml_load(filepath, **updates):
    """Loads a yaml file producing the corresponding Python dict (`safe_load`). Then:
    1. normalizes non-absolute sqlite path values relative to `filepath`, if any
    2. updates the dict values with `updqtes`
    and returns the yaml dict.
    :param updates: arguments which will updates the yaml dict before it is returned
    with open(filepath, 'r') as stream:
        ret = yaml.safe_load(stream)
    # convert sqlite into absolute paths, if any
    configfilepath = abspath(dirname(filepath))
    # convert relative sqlite path to absolute, assuming they are relative to the config:
    sqlite_prefix = 'sqlite:///'
    # we cannot modify a dict while in iteration, thus create a new dict of possibly
    # modified sqlite paths and use later dict.update
    newdict = {}
    for k, v in viewitems(ret):
            if v.startswith(sqlite_prefix) and ":memory:" not in v:
                dbpath = v[len(sqlite_prefix):]
                if not isabs(dbpath):
                    newdict[k] = sqlite_prefix + abspath(normpath(join(configfilepath, dbpath)))
        except AttributeError:
    return ret
项目:toxiproxy-python    作者:douglas    | 项目源码 | 文件源码
def proxies(self):
        """ Returns all the proxies registered in the server """

        proxies = APIConsumer.get("/proxies").json()
        proxies_dict = {}

        for name, values in viewitems(proxies):
            # Lets create a Proxy object to hold all its data
            proxy = Proxy(**values)

            # Add the new proxy to the toxiproxy proxies collection
            proxies_dict.update({name: proxy})

        return proxies_dict