Python six 模块,itervalues() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用six.itervalues()

项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def handle_data(self, data):
        if not self.ordered:
            for s in data:
                self.order(self.sid(s), 100)
            self.ordered = True

        if not self.exited:
            amounts = [pos.amount for pos
                       in itervalues(self.portfolio.positions)]
            if (
                all([(amount == 100) for amount in amounts]) and
                (len(amounts) == len(data.keys()))
            ):
                for stock in self.portfolio.positions:
                    self.order(self.sid(stock), -100)
                self.exited = True

        # Should be 0 when all positions are exited.
        self.record(num_positions=len(self.portfolio.positions))
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def get_treasury_data(start_date, end_date):
    bill_data = load_frame(
        format_bill_url(start_date, end_date, start_date),
        # We skip fewer rows here because we query for fewer bill fields,
        # which makes the header smaller.
        skiprows=18,
    )

    bond_data = load_frame(
        format_bond_url(start_date, end_date, start_date),
        skiprows=22,
    )
    check_known_inconsistencies(bill_data, bond_data)

    # dropna('any') removes the rows for which we only had data for one of
    # bills/bonds.
    out = pd.concat([bond_data, bill_data], axis=1).dropna(how='any')
    assert set(out.columns) == set(six.itervalues(COLUMN_NAMES))

    # Multiply by 0.01 to convert from percentages to expected output format.
    return out * 0.01
项目:mos-horizon    作者:Mirantis    | 项目源码 | 文件源码
def _get_file_contents(from_data, files):
    if not isinstance(from_data, (dict, list)):
        return
    if isinstance(from_data, dict):
        recurse_data = six.itervalues(from_data)
        for key, value in six.iteritems(from_data):
            if _ignore_if(key, value):
                continue
            if not value.startswith(('http://', 'https://')):
                raise exceptions.GetFileError(value, 'get_file')
            if value not in files:
                file_content = heat_utils.read_url_content(value)
                if template_utils.is_template(file_content):
                    template = get_template_files(template_url=value)[1]
                    file_content = jsonutils.dumps(template)
                files[value] = file_content
    else:
        recurse_data = from_data
    for value in recurse_data:
        _get_file_contents(value, files)
项目:vmware-nsx-tempest-plugin    作者:openstack    | 项目源码 | 文件源码
def get_server_info(self, cmgr, server_id):
        """Get server's ip addresses"""
        svr = cmgr.servers_client.show_server(server_id)
        svr = svr.get('server', svr)
        sinfo = dict(id=svr['id'], name=svr['name'],
                     security_gropus=svr['security_groups'],
                     fixed_ip_address=None, floating_ip_address=None)
        addresses = svr.get('addresses')
        for n_addresses in six.itervalues(addresses):
            for n_addr in n_addresses:
                if n_addr['OS-EXT-IPS:type'] == 'fixed':
                    if not sinfo['fixed_ip_address']:
                        sinfo['fixed_ip_address'] = n_addr['addr']
                elif n_addr['OS-EXT-IPS:type'] == 'floating':
                    if not sinfo['floating_ip_address']:
                        sinfo['floating_ip_address'] = n_addr['addr']
        return sinfo
项目:ggbackup    作者:lindegroup    | 项目源码 | 文件源码
def write_members(self):
        """Write the members CSV."""
        for group in itervalues(self.groups):
            filename = group['email'] + '-membership.csv'
            if self.datestamp:
                filename = self.append_datestamp(filename)
            path = os.path.join(self.path, filename)

            logger.debug('Writing %s...', path)

            with open(path, 'w') as csvfile:
                writer = csv.DictWriter(csvfile, fieldnames=[
                    'kind', 'id', 'email', 'role', 'type', 'status', 'etag'])
                writer.writeheader()
                for member in group['members']:
                    writer.writerow(member)
项目:Cloud-Custodian    作者:jtroberts83    | 项目源码 | 文件源码
def process_bucket(self, bucket):
        n = bucket['Notification']
        if not n:
            return

        statement_ids = self.data.get('statement_ids')
        if statement_ids == 'matched':
            statement_ids = bucket.get(BucketNotificationFilter.annotation_key, ())
        if not statement_ids:
            return

        cfg = defaultdict(list)

        for t in six.itervalues(BucketNotificationFilter.FIELDS):
            for c in n.get(t, []):
                if c['Id'] not in statement_ids:
                    cfg[t].append(c)

        client = bucket_client(local_session(self.manager.session_factory), bucket)
        client.put_bucket_notification_configuration(
            Bucket=bucket['Name'],
            NotificationConfiguration=cfg)
项目:psyplot    作者:Chilipp    | 项目源码 | 文件源码
def _get_ds_descriptions_unsorted(
            cls, data, ignore_keys=['attrs', 'plotter'], nums=None):
        """Recursive method to get all the file names or datasets out of a
        dictionary `data` created with the :meth`array_info` method"""
        ds_description = {'ds', 'fname', 'num', 'arr', 'store'}
        if 'ds' in data:
            # make sure that the data set has a number assigned to it
            data['ds'].psy.num
        keys_in_data = ds_description.intersection(data)
        if keys_in_data:
            return {key: data[key] for key in keys_in_data}
        for key in ignore_keys:
            data.pop(key, None)
        func = partial(cls._get_ds_descriptions_unsorted,
                       ignore_keys=ignore_keys, nums=nums)
        return chain(*map(lambda d: [d] if isinstance(d, dict) else d,
                          map(func, six.itervalues(data))))
项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def _report_failed_challs(failed_achalls):
    """Notifies the user about failed challenges.

    :param set failed_achalls: A set of failed
        :class:`certbot.achallenges.AnnotatedChallenge`.

    """
    problems = dict()
    for achall in failed_achalls:
        if achall.error:
            problems.setdefault(achall.error.typ, []).append(achall)

    reporter = zope.component.getUtility(interfaces.IReporter)
    for achalls in six.itervalues(problems):
        reporter.add_message(
            _generate_failed_chall_msg(achalls), reporter.MEDIUM_PRIORITY)
项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def _set_webroots(self, achalls):
        if self.conf("path"):
            webroot_path = self.conf("path")[-1]
            logger.info("Using the webroot path %s for all unmatched domains.",
                        webroot_path)
            for achall in achalls:
                self.conf("map").setdefault(achall.domain, webroot_path)
        else:
            known_webroots = list(set(six.itervalues(self.conf("map"))))
            for achall in achalls:
                if achall.domain not in self.conf("map"):
                    new_webroot = self._prompt_for_webroot(achall.domain,
                                                           known_webroots)
                    # Put the most recently input
                    # webroot first for easy selection
                    try:
                        known_webroots.remove(new_webroot)
                    except ValueError:
                        pass
                    known_webroots.insert(0, new_webroot)
                    self.conf("map")[achall.domain] = new_webroot
项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def get_type_cls(cls, jobj):
        """Get the registered class for ``jobj``."""
        if cls in six.itervalues(cls.TYPES):
            if cls.type_field_name not in jobj:
                raise errors.DeserializationError(
                    "Missing type field ({0})".format(cls.type_field_name))
            # cls is already registered type_cls, force to use it
            # so that, e.g Revocation.from_json(jobj) fails if
            # jobj["type"] != "revocation".
            return cls

        if not isinstance(jobj, dict):
            raise errors.DeserializationError(
                "{0} is not a dictionary object".format(jobj))
        try:
            typ = jobj[cls.type_field_name]
        except KeyError:
            raise errors.DeserializationError("missing type field")

        try:
            return cls.TYPES[typ]
        except KeyError:
            raise errors.UnrecognizedTypeError(typ, jobj)
项目:pyramid_webpack    作者:stevearc    | 项目源码 | 文件源码
def includeme(config):
    """ Add pyramid_webpack methods and config to the app """
    settings = config.registry.settings
    root_package_name = config.root_package.__name__
    config.registry.webpack = {
        'DEFAULT': WebpackState(settings, root_package_name)
    }
    for extra_config in aslist(settings.get('webpack.configs', [])):
        state = WebpackState(settings, root_package_name, name=extra_config)
        config.registry.webpack[extra_config] = state

    # Set up any static views
    for state in six.itervalues(config.registry.webpack):
        if state.static_view:
            config.add_static_view(name=state.static_view_name,
                                   path=state.static_view_path,
                                   cache_max_age=state.cache_max_age)

    config.add_request_method(get_webpack, 'webpack')
项目:paragraph2vec    作者:thunlp    | 项目源码 | 文件源码
def compactify(self):
        """
        Assign new word ids to all words.

        This is done to make the ids more compact, e.g. after some tokens have
        been removed via :func:`filter_tokens` and there are gaps in the id series.
        Calling this method will remove the gaps.
        """
        logger.debug("rebuilding dictionary, shrinking gaps")

        # build mapping from old id -> new id
        idmap = dict(izip(itervalues(self.token2id), xrange(len(self.token2id))))

        # reassign mappings to new ids
        self.token2id = dict((token, idmap[tokenid]) for token, tokenid in iteritems(self.token2id))
        self.id2token = {}
        self.dfs = dict((idmap[tokenid], freq) for tokenid, freq in iteritems(self.dfs))
项目:paragraph2vec    作者:thunlp    | 项目源码 | 文件源码
def cossim(vec1, vec2):
    """
    Return cosine similarity between two sparse vectors.
    The similarity is a number between <-1.0, 1.0>, higher is more similar.
    """
    vec1, vec2 = dict(vec1), dict(vec2)
    if not vec1 or not vec2:
        return 0.0
    vec1len = 1.0 * math.sqrt(sum(val * val for val in itervalues(vec1)))
    vec2len = 1.0 * math.sqrt(sum(val * val for val in itervalues(vec2)))
    assert vec1len > 0.0 and vec2len > 0.0, "sparse documents must not contain any explicit zero entries"
    if len(vec2) < len(vec1):
        vec1, vec2 = vec2, vec1 # swap references so that we iterate over the shorter vector
    result = sum(value * vec2.get(index, 0.0) for index, value in iteritems(vec1))
    result /= vec1len * vec2len # rescale by vector lengths
    return result
项目:ngraph    作者:NervanaSystems    | 项目源码 | 文件源码
def __call__(self, *args, **kwargs):
        """
        Executes child computations in parallel.

        :arg args: list of values to the placeholders specified in __init__ *args

        :return: tuple of return values, one per return specified in __init__ returns list.
        """
        args = self.unpack_args_or_feed_dict(args, kwargs)
        for child in itervalues(self.child_computations):
            child.feed_input([args[i] for i in child.param_idx])

        return_vals = dict()
        for child in itervalues(self.child_computations):
            return_vals.update(child.get_results())

        if isinstance(self.computation_op.returns, Op):
            return return_vals[self.computation_op.returns]
        elif isinstance(self.computation_op.returns, (collections.Sequence, OrderedSet)):
            return tuple(return_vals[op] for op in self.computation_op.returns)
        elif isinstance(self.computation_op.returns, collections.Set):
            return return_vals
        else:
            return None
项目:ngraph    作者:NervanaSystems    | 项目源码 | 文件源码
def add_tensor_decl(self, tensor_decl):
        if tensor_decl in self.tensors_with_nodes:
            return
        if tensor_decl in self.tensors_without_nodes:
            self.tensors_without_nodes.remove(tensor_decl)
        self.tensors_with_nodes.add(tensor_decl)

        views_labels = ' | '.join(['<{}>'.format(self.tensor_view_decl_ext(tensor_view_decl))
                                   for tensor_view_decl in
                                   six.itervalues(tensor_decl.tensor_view_decls)])
        label = '{ <tensor> ' + tensor_decl.name + ' | { ' + views_labels + ' } }'
        self.graph.node(self.tensor_decl_name(tensor_decl), label=label, shape='Mrecord',
                        fillcolor=tensor_color, style='filled')
        if False:
            self.graph.edge(self.tensor_decl_name(tensor_decl), self.exop_name(tensor_decl),
                            color=tensor_edge_color, style='dashed')
项目:networking-ovn    作者:netgroup-polito    | 项目源码 | 文件源码
def get_acls(self, context):
        """create the list of ACLS in OVN.

        @param context: neutron context
        @type  context: object of type neutron.context.Context
        @var   lswitch_names: List of lswitch names
        @var   acl_list: List of NB acls
        @var   acl_list_dict: Dictionary of acl-lists based on lport as key
        @return: acl_list-dict
        """
        lswitch_names = set([])
        for network in self.core_plugin.get_networks(context):
            lswitch_names.add(network['id'])
        acl_dict, ignore1, ignore2 = \
            self.ovn_api.get_acls_for_lswitches(lswitch_names)
        acl_list = list(itertools.chain(*six.itervalues(acl_dict)))
        acl_list_dict = {}
        for acl in acl_list:
            key = acl['lport']
            if key in acl_list_dict:
                acl_list_dict[key].append(acl)
            else:
                acl_list_dict[key] = list([acl])
        return acl_list_dict
项目:deb-oslo.versionedobjects    作者:openstack    | 项目源码 | 文件源码
def test_object_dict_syntax(self):
        obj = MyObj(foo=123, bar=u'text')
        self.assertEqual(obj['foo'], 123)
        self.assertIn('bar', obj)
        self.assertNotIn('missing', obj)
        self.assertEqual(sorted(iter(obj)),
                         ['bar', 'foo'])
        self.assertEqual(sorted(obj.keys()),
                         ['bar', 'foo'])
        self.assertEqual(sorted(obj.iterkeys()),
                         ['bar', 'foo'])
        self.assertEqual(sorted(obj.values(), key=str),
                         [123, u'text'])
        self.assertEqual(sorted(obj.itervalues(), key=str),
                         [123, u'text'])
        self.assertEqual(sorted(obj.items()),
                         [('bar', u'text'), ('foo', 123)])
        self.assertEqual(sorted(list(obj.iteritems())),
                         [('bar', u'text'), ('foo', 123)])
        self.assertEqual(dict(obj),
                         {'foo': 123, 'bar': u'text'})
项目:transform    作者:tensorflow    | 项目源码 | 文件源码
def to_instance_dicts(batch_dict):
  """Converts from the internal batch format to a list of instances.

  Args:
    batch_dict: A dict in the in-memory batch format, as returned by
      `make_output_dict`.

  Returns:
    A list of dicts in the in-memory instance format.
  """
  def get_instance_values(batch_dict):
    # SparseFeatures are represented as a 2-tuple of list of lists, so
    # in that case we convert to a list of 2-tuples of lists.
    columns = (column if not isinstance(column, tuple) else zip(*column)
               for column in six.itervalues(batch_dict))
    return itertools.izip(*columns)

  return [dict(zip(six.iterkeys(batch_dict), instance_values))
          for instance_values in get_instance_values(batch_dict)]
项目:catalyst    作者:enigmampc    | 项目源码 | 文件源码
def handle_data(self, data):
        if not self.ordered:
            for s in self.sids:
                self.order(self.sid(s), 1)
            self.ordered = True

        if not self.exited:
            amounts = [pos.amount for pos
                       in itervalues(self.portfolio.positions)]

            if (
                len(amounts) > 0 and
                all([(amount == 1) for amount in amounts])
            ):
                for stock in self.portfolio.positions:
                    self.order(self.sid(stock), -1)
                self.exited = True

        # Should be 0 when all positions are exited.
        self.record(num_positions=len(self.portfolio.positions))
项目:catalyst    作者:enigmampc    | 项目源码 | 文件源码
def get_treasury_data(start_date, end_date):
    bill_data = load_frame(
        format_bill_url(start_date, end_date, start_date),
        # We skip fewer rows here because we query for fewer bill fields,
        # which makes the header smaller.
        skiprows=18,
    )
    bond_data = load_frame(
        format_bond_url(start_date, end_date, start_date),
        skiprows=22,
    )
    check_known_inconsistencies(bill_data, bond_data)

    # dropna('any') removes the rows for which we only had data for one of
    # bills/bonds.
    out = pd.concat([bond_data, bill_data], axis=1).dropna(how='any')
    assert set(out.columns) == set(six.itervalues(COLUMN_NAMES))

    # Multiply by 0.01 to convert from percentages to expected output format.
    return out * 0.01
项目:Url    作者:beiruan    | 项目源码 | 文件源码
def _check_delete(self):
        '''Check project delete'''
        now = time.time()
        for project in list(itervalues(self.projects)):
            if project.db_status != 'STOP':
                continue
            if now - project.updatetime < self.DELETE_TIME:
                continue
            if 'delete' not in self.projectdb.split_group(project.group):
                continue

            logger.warning("deleting project: %s!", project.name)
            del self.projects[project.name]
            self.taskdb.drop(project.name)
            self.projectdb.drop(project.name)
            if self.resultdb:
                self.resultdb.drop(project.name)
            for each in self._cnt.values():
                del each[project.name]
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def complete_contexts(self):
        '''
        Returns a list of context interfaces that yield a complete context.
        '''
        interfaces = []
        [interfaces.extend(i.complete_contexts())
         for i in six.itervalues(self.templates)]
        return interfaces
项目:charm-plumgrid-gateway    作者:openstack    | 项目源码 | 文件源码
def get_incomplete_context_data(self, interfaces):
        '''
        Return dictionary of relation status of interfaces and any missing
        required context data. Example:
            {'amqp': {'missing_data': ['rabbitmq_password'], 'related': True},
             'zeromq-configuration': {'related': False}}
        '''
        incomplete_context_data = {}

        for i in six.itervalues(self.templates):
            for context in i.contexts:
                for interface in interfaces:
                    related = False
                    if interface in context.interfaces:
                        related = context.get_related()
                        missing_data = context.missing_data
                        if missing_data:
                            incomplete_context_data[interface] = {'missing_data': missing_data}
                        if related:
                            if incomplete_context_data.get(interface):
                                incomplete_context_data[interface].update({'related': True})
                            else:
                                incomplete_context_data[interface] = {'related': True}
                        else:
                            incomplete_context_data[interface] = {'related': False}
        return incomplete_context_data
项目:sentry-plugins    作者:getsentry    | 项目源码 | 文件源码
def build_description(self, event):
        enhanced_privacy = event.organization.flags.enhanced_privacy
        if enhanced_privacy:
            return ENHANCED_PRIVACY_BODY

        interface_list = []
        for interface in six.itervalues(event.interfaces):
            body = interface.to_string(event)
            if not body:
                continue
            interface_list.append((interface.get_title(), body))

        return '\n\n'.join(('{}\n-----------\n\n{}'.format(k, v) for k, v in interface_list))
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def complete_contexts(self):
        '''
        Returns a list of context interfaces that yield a complete context.
        '''
        interfaces = []
        [interfaces.extend(i.complete_contexts())
         for i in six.itervalues(self.templates)]
        return interfaces
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def get_incomplete_context_data(self, interfaces):
        '''
        Return dictionary of relation status of interfaces and any missing
        required context data. Example:
            {'amqp': {'missing_data': ['rabbitmq_password'], 'related': True},
             'zeromq-configuration': {'related': False}}
        '''
        incomplete_context_data = {}

        for i in six.itervalues(self.templates):
            for context in i.contexts:
                for interface in interfaces:
                    related = False
                    if interface in context.interfaces:
                        related = context.get_related()
                        missing_data = context.missing_data
                        if missing_data:
                            incomplete_context_data[interface] = {'missing_data': missing_data}
                        if related:
                            if incomplete_context_data.get(interface):
                                incomplete_context_data[interface].update({'related': True})
                            else:
                                incomplete_context_data[interface] = {'related': True}
                        else:
                            incomplete_context_data[interface] = {'related': False}
        return incomplete_context_data
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def calc_position_values(amounts,
                         last_sale_prices,
                         value_multipliers):
    iter_amount_price_multiplier = zip(
        amounts,
        last_sale_prices,
        itervalues(value_multipliers),
    )
    return [
        price * amount * multiplier for
        price, amount, multiplier in iter_amount_price_multiplier
    ]
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def calc_position_exposures(amounts,
                            last_sale_prices,
                            exposure_multipliers):
    iter_amount_price_multiplier = zip(
        amounts,
        last_sale_prices,
        itervalues(exposure_multipliers),
    )
    return [
        price * amount * multiplier for
        price, amount, multiplier in iter_amount_price_multiplier
    ]
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def __init__(self, terms):
        super(TermGraph, self).__init__(self)

        self._frozen = False
        parents = set()
        for term in itervalues(terms):
            self._add_to_graph(term, parents, extra_rows=0)
            # No parents should be left between top-level terms.
            assert not parents

        self._outputs = terms
        self._ordered = topological_sort(self)

        # Mark that no more terms should be added to the graph.
        self._frozen = True
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def test_retrieve_specific_type(self, type_, lookup_name, failure_type):
        equities = make_simple_equity_info(
            range(5),
            start_date=pd.Timestamp('2014-01-01'),
            end_date=pd.Timestamp('2015-01-01'),
        )
        max_equity = equities.index.max()
        futures = make_commodity_future_info(
            first_sid=max_equity + 1,
            root_symbols=['CL'],
            years=[2014],
        )
        equity_sids = [0, 1]
        future_sids = [max_equity + 1, max_equity + 2, max_equity + 3]
        if type_ == Equity:
            success_sids = equity_sids
            fail_sids = future_sids
        else:
            fail_sids = equity_sids
            success_sids = future_sids

        with tmp_asset_finder(equities=equities, futures=futures) as finder:
            # Run twice to exercise caching.
            lookup = getattr(finder, lookup_name)
            for _ in range(2):
                results = lookup(success_sids)
                self.assertIsInstance(results, dict)
                self.assertEqual(set(results.keys()), set(success_sids))
                self.assertEqual(
                    valmap(int, results),
                    dict(zip(success_sids, success_sids)),
                )
                self.assertEqual(
                    {type_},
                    {type(asset) for asset in itervalues(results)},
                )
                with self.assertRaises(failure_type):
                    lookup(fail_sids)
                with self.assertRaises(failure_type):
                    # Should fail if **any** of the assets are bad.
                    lookup([success_sids[0], fail_sids[0]])
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def tearDown(self):
        """
        Each test consumes a source, we need to rewind it.
        """
        for _, source in itervalues(self.sim_and_source):
            source.rewind()
项目:zipline-chinese    作者:zhanghan1990    | 项目源码 | 文件源码
def everything_but(k, d):
    """
    Return iterator of all values in d except the values in k.
    """
    assert k in d
    return concat(itervalues(keyfilter(ne(k), d)))
项目:caduc    作者:tjamet    | 项目源码 | 文件源码
def test_update_timers(self):
        images = self.getImages()
        img_mocks = {}
        for key in self.faker.pyiterable(10, True, str):
            img = mock.Mock()
            img.update_timer = mock.Mock()
            img_mocks[key] = img
        images.update(img_mocks)
        images.update_timers()
        for img in six.itervalues(img_mocks):
            img.update_timer.assert_called_once_with()
项目:caduc    作者:tjamet    | 项目源码 | 文件源码
def update_timers(self):
        for image in six.itervalues(self):
            image.update_timer()
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_iter(self):
        keys = ['first', 'middle', 'last']
        values = list(range(len(keys)))
        items = list(zip(keys, values))
        om = OrderedMap(items)

        itr = iter(om)
        self.assertEqual(sum([1 for _ in itr]), len(keys))
        self.assertRaises(StopIteration, six.next, itr)

        self.assertEqual(list(iter(om)), keys)
        self.assertEqual(list(six.iteritems(om)), items)
        self.assertEqual(list(six.itervalues(om)), values)
项目:girder_worker    作者:girder    | 项目源码 | 文件源码
def __check(self, key=None, oldvalue=None, newvalue=None, **kw):
        """Call all registered validation methods on this spec."""
        for func in six.itervalues(self.__checks):
            func(self, key, oldvalue, newvalue)
项目:charm-heat    作者:openstack    | 项目源码 | 文件源码
def complete_contexts(self):
        '''
        Returns a list of context interfaces that yield a complete context.
        '''
        interfaces = []
        [interfaces.extend(i.complete_contexts())
         for i in six.itervalues(self.templates)]
        return interfaces
项目:charm-heat    作者:openstack    | 项目源码 | 文件源码
def get_incomplete_context_data(self, interfaces):
        '''
        Return dictionary of relation status of interfaces and any missing
        required context data. Example:
            {'amqp': {'missing_data': ['rabbitmq_password'], 'related': True},
             'zeromq-configuration': {'related': False}}
        '''
        incomplete_context_data = {}

        for i in six.itervalues(self.templates):
            for context in i.contexts:
                for interface in interfaces:
                    related = False
                    if interface in context.interfaces:
                        related = context.get_related()
                        missing_data = context.missing_data
                        if missing_data:
                            incomplete_context_data[interface] = {'missing_data': missing_data}
                        if related:
                            if incomplete_context_data.get(interface):
                                incomplete_context_data[interface].update({'related': True})
                            else:
                                incomplete_context_data[interface] = {'related': True}
                        else:
                            incomplete_context_data[interface] = {'related': False}
        return incomplete_context_data
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def complete_contexts(self):
        '''
        Returns a list of context interfaces that yield a complete context.
        '''
        interfaces = []
        [interfaces.extend(i.complete_contexts())
         for i in six.itervalues(self.templates)]
        return interfaces
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def get_incomplete_context_data(self, interfaces):
        '''
        Return dictionary of relation status of interfaces and any missing
        required context data. Example:
            {'amqp': {'missing_data': ['rabbitmq_password'], 'related': True},
             'zeromq-configuration': {'related': False}}
        '''
        incomplete_context_data = {}

        for i in six.itervalues(self.templates):
            for context in i.contexts:
                for interface in interfaces:
                    related = False
                    if interface in context.interfaces:
                        related = context.get_related()
                        missing_data = context.missing_data
                        if missing_data:
                            incomplete_context_data[interface] = {'missing_data': missing_data}
                        if related:
                            if incomplete_context_data.get(interface):
                                incomplete_context_data[interface].update({'related': True})
                            else:
                                incomplete_context_data[interface] = {'related': True}
                        else:
                            incomplete_context_data[interface] = {'related': False}
        return incomplete_context_data
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def complete_contexts(self):
        '''
        Returns a list of context interfaces that yield a complete context.
        '''
        interfaces = []
        [interfaces.extend(i.complete_contexts())
         for i in six.itervalues(self.templates)]
        return interfaces
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def complete_contexts(self):
        '''
        Returns a list of context interfaces that yield a complete context.
        '''
        interfaces = []
        [interfaces.extend(i.complete_contexts())
         for i in six.itervalues(self.templates)]
        return interfaces
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def get_incomplete_context_data(self, interfaces):
        '''
        Return dictionary of relation status of interfaces and any missing
        required context data. Example:
            {'amqp': {'missing_data': ['rabbitmq_password'], 'related': True},
             'zeromq-configuration': {'related': False}}
        '''
        incomplete_context_data = {}

        for i in six.itervalues(self.templates):
            for context in i.contexts:
                for interface in interfaces:
                    related = False
                    if interface in context.interfaces:
                        related = context.get_related()
                        missing_data = context.missing_data
                        if missing_data:
                            incomplete_context_data[interface] = {'missing_data': missing_data}
                        if related:
                            if incomplete_context_data.get(interface):
                                incomplete_context_data[interface].update({'related': True})
                            else:
                                incomplete_context_data[interface] = {'related': True}
                        else:
                            incomplete_context_data[interface] = {'related': False}
        return incomplete_context_data
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def complete_contexts(self):
        '''
        Returns a list of context interfaces that yield a complete context.
        '''
        interfaces = []
        [interfaces.extend(i.complete_contexts())
         for i in six.itervalues(self.templates)]
        return interfaces
项目:charm-nova-cloud-controller    作者:openstack    | 项目源码 | 文件源码
def complete_contexts(self):
        '''
        Returns a list of context interfaces that yield a complete context.
        '''
        interfaces = []
        [interfaces.extend(i.complete_contexts())
         for i in six.itervalues(self.templates)]
        return interfaces
项目:charm-nova-cloud-controller    作者:openstack    | 项目源码 | 文件源码
def get_incomplete_context_data(self, interfaces):
        '''
        Return dictionary of relation status of interfaces and any missing
        required context data. Example:
            {'amqp': {'missing_data': ['rabbitmq_password'], 'related': True},
             'zeromq-configuration': {'related': False}}
        '''
        incomplete_context_data = {}

        for i in six.itervalues(self.templates):
            for context in i.contexts:
                for interface in interfaces:
                    related = False
                    if interface in context.interfaces:
                        related = context.get_related()
                        missing_data = context.missing_data
                        if missing_data:
                            incomplete_context_data[interface] = {'missing_data': missing_data}
                        if related:
                            if incomplete_context_data.get(interface):
                                incomplete_context_data[interface].update({'related': True})
                            else:
                                incomplete_context_data[interface] = {'related': True}
                        else:
                            incomplete_context_data[interface] = {'related': False}
        return incomplete_context_data
项目:thorn    作者:robinhood    | 项目源码 | 文件源码
def override_worker_setting(self, setting_name, new_value):
        old_value = list(values(self.setenv(setting_name, new_value)))[0]
        try:
            yield
        finally:
            self.setenv(setting_name, old_value)
项目:thorn    作者:robinhood    | 项目源码 | 文件源码
def assert_ok_pidbox_response(self, replies):
        for reply in values(replies):
            if not reply['ok']:
                raise RuntimeError(
                    'Worker remote control command raised: {0!r}'.format(
                        reply.get('error', reply)))
        return replies
项目:thorn    作者:robinhood    | 项目源码 | 文件源码
def contribute_to_model(self, model):
        # type: (type) -> type
        [event.connect_model(model) for event in values(self.events) if event]
        model.webhooks = self
        model.webhook_events = self  # XXX remove for Thorn 2.0
        return model
项目:thorn    作者:robinhood    | 项目源码 | 文件源码
def override_worker_setting(self, setting_name, new_value):
        old_value = list(values(self.setenv(setting_name, new_value)))[0]
        try:
            yield
        finally:
            self.setenv(setting_name, old_value)