我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用six.iterkeys()。
def _add_proposed(): """Add the PROPOSED_POCKET as /etc/apt/source.list.d/proposed.list Uses lsb_release()['DISTRIB_CODENAME'] to determine the correct staza for the deb line. For intel architecutres PROPOSED_POCKET is used for the release, but for other architectures PROPOSED_PORTS_POCKET is used for the release. """ release = lsb_release()['DISTRIB_CODENAME'] arch = platform.machine() if arch not in six.iterkeys(ARCH_TO_PROPOSED_POCKET): raise SourceConfigError("Arch {} not supported for (distro-)proposed" .format(arch)) with open('/etc/apt/sources.list.d/proposed.list', 'w') as apt: apt.write(ARCH_TO_PROPOSED_POCKET[arch].format(release))
def getFieldInfo(self): """ Return a list of fields that are known and can be queried. :return: a list of known fields. Each entry is a dictionary with name, datatype, and optionally a description. """ if self.fieldInfo is None: # cache the fieldInfo so we don't process all of the documents # every time. # TODO: either have a maximum duration or some other method of # analyzing a subset of the table; on a large table this takes a # long time. coll = self.connect() fields = {} for result in coll.find(): fields.update(result) fieldInfo = [] for field in sorted(six.iterkeys(fields)): fieldInfo.append({'name': field, 'type': 'unknown'}) self.fieldInfo = fieldInfo return self.fieldInfo
def get_active_version(self): if self._active is not None: return self.supported[self._active] key = getattr(settings, self.SETTINGS_KEY, {}).get(self.service_type) if key is None: # TODO(gabriel): support API version discovery here; we'll leave # the setting in as a way of overriding the latest available # version. key = self.preferred # Since we do a key lookup in the supported dict the type matters, # let's ensure people know if they use a string when the key isn't. if isinstance(key, six.string_types): msg = ('The version "%s" specified for the %s service should be ' 'either an integer or a float, not a string.' % (key, self.service_type)) raise exceptions.ConfigurationError(msg) # Provide a helpful error message if the specified version isn't in the # supported list. if key not in self.supported: choices = ", ".join(str(k) for k in six.iterkeys(self.supported)) msg = ('%s is not a supported API version for the %s service, ' ' choices are: %s' % (key, self.service_type, choices)) raise exceptions.ConfigurationError(msg) self._active = key return self.supported[self._active]
def run_mtz_basic_ops(self, router_type): self.tp_svrs = {} router, nets = self._test_router_with_network_and_mtz_networks( router_type) servers_client = self.manager.servers_client for net_id in six.iterkeys(nets): s_id, network, subnet, security_group = nets[net_id] """ servers_client = (self.manager.servers_client if s_id is None else self.admin_manager.servers_client) """ security_groups = [{'name': security_group['id']}] svr = self.create_server_on_network( network, security_groups, name=network['name'], servers_client=servers_client, wait_on_boot=False) self.tp_svrs[net_id] = dict(server=svr, s_id=s_id, network=network, subnet=subnet, security_group=security_group, servers_client=servers_client) self.wait_for_servers_become_active(self.tp_svrs) self.run_servers_connectivity_test(self.tp_svrs)
def _find_get_patten_match_single(dic, match_func): """ Find a matching key from dict. This function returns value when the found match is the only match in the dict. Args: dic (dict): Mapping from a key (str) to corresponding metric (numeric) from one output of LogReporter. match_func (func): A function that takes a variable (str) as input and returns a bool if it is a valid match. Returns: str or None """ # match_func(str) -> bool found_key = None for k in six.iterkeys(dic): if match_func(k): if found_key is None: found_key = k else: # The second match is found return None return found_key
def get_callable_args(function, required_only=False): """Get names of callable arguments. Special arguments (like ``*args`` and ``**kwargs``) are not included into output. If required_only is True, optional arguments (with default values) are not included into output. """ sig = get_signature(function) function_args = list(six.iterkeys(sig.parameters)) for param_name, p in six.iteritems(sig.parameters): if (p.kind in (Parameter.VAR_POSITIONAL, Parameter.VAR_KEYWORD) or (required_only and p.default is not Parameter.empty)): function_args.remove(param_name) return function_args
def iterkeys(self): """Unsorted iterator over keys""" patterns = self.patterns replace = self.replace seen = set() for key in six.iterkeys(self.base): for pattern in patterns: m = pattern.match(key) if m: ret = m.group('key') if replace else m.group() if ret not in seen: seen.add(ret) yield ret break for key in DictMethods.iterkeys(self): if key not in seen: yield key
def test_name1_tls_sni_01_1(self, mock_poll): self.mock_net.request_domain_challenges.side_effect = functools.partial( gen_dom_authzr, challs=acme_util.CHALLENGES) mock_poll.side_effect = self._validate_all authzr = self.handler.get_authorizations(["0"]) self.assertEqual(self.mock_net.answer_challenge.call_count, 1) self.assertEqual(mock_poll.call_count, 1) chall_update = mock_poll.call_args[0][0] self.assertEqual(list(six.iterkeys(chall_update)), ["0"]) self.assertEqual(len(chall_update.values()), 1) self.assertEqual(self.mock_auth.cleanup.call_count, 1) # Test if list first element is TLSSNI01, use typ because it is an achall self.assertEqual( self.mock_auth.cleanup.call_args[0][0][0].typ, "tls-sni-01") self.assertEqual(len(authzr), 1)
def test_name1_tls_sni_01_1_http_01_1_dns_1(self, mock_poll): self.mock_net.request_domain_challenges.side_effect = functools.partial( gen_dom_authzr, challs=acme_util.CHALLENGES, combos=False) mock_poll.side_effect = self._validate_all self.mock_auth.get_chall_pref.return_value.append(challenges.HTTP01) self.mock_auth.get_chall_pref.return_value.append(challenges.DNS01) authzr = self.handler.get_authorizations(["0"]) self.assertEqual(self.mock_net.answer_challenge.call_count, 3) self.assertEqual(mock_poll.call_count, 1) chall_update = mock_poll.call_args[0][0] self.assertEqual(list(six.iterkeys(chall_update)), ["0"]) self.assertEqual(len(chall_update.values()), 1) self.assertEqual(self.mock_auth.cleanup.call_count, 1) # Test if list first element is TLSSNI01, use typ because it is an achall for achall in self.mock_auth.cleanup.call_args[0][0]: self.assertTrue(achall.typ in ["tls-sni-01", "http-01", "dns-01"]) # Length of authorizations list self.assertEqual(len(authzr), 1)
def test_name3_tls_sni_01_3(self, mock_poll): self.mock_net.request_domain_challenges.side_effect = functools.partial( gen_dom_authzr, challs=acme_util.CHALLENGES) mock_poll.side_effect = self._validate_all authzr = self.handler.get_authorizations(["0", "1", "2"]) self.assertEqual(self.mock_net.answer_challenge.call_count, 3) # Check poll call self.assertEqual(mock_poll.call_count, 1) chall_update = mock_poll.call_args[0][0] self.assertEqual(len(list(six.iterkeys(chall_update))), 3) self.assertTrue("0" in list(six.iterkeys(chall_update))) self.assertEqual(len(chall_update["0"]), 1) self.assertTrue("1" in list(six.iterkeys(chall_update))) self.assertEqual(len(chall_update["1"]), 1) self.assertTrue("2" in list(six.iterkeys(chall_update))) self.assertEqual(len(chall_update["2"]), 1) self.assertEqual(self.mock_auth.cleanup.call_count, 1) self.assertEqual(len(authzr), 3)
def _flatten_location_translations(location_translations): """If location A translates to B, and B to C, then make A translate directly to C. Args: location_translations: dict of Location -> Location, where the key translates to the value. Mutated in place for efficiency and simplicity of implementation. """ sources_to_process = set(six.iterkeys(location_translations)) def _update_translation(source): """Return the proper (fully-flattened) translation for the given location.""" destination = location_translations[source] if destination not in location_translations: # "destination" cannot be translated, no further flattening required. return destination else: # "destination" can itself be translated -- do so, # and then flatten "source" to the final translation as well. sources_to_process.discard(destination) final_destination = _update_translation(destination) location_translations[source] = final_destination return final_destination while sources_to_process: _update_translation(sources_to_process.pop())
def _ensure_arguments_are_provided(expected_types, arguments): """Ensure that all arguments expected by the query were actually provided.""" # This function only checks that the arguments were specified, # and does not check types. Type checking is done as part of the actual formatting step. expected_arg_names = set(six.iterkeys(expected_types)) provided_arg_names = set(six.iterkeys(arguments)) if expected_arg_names != provided_arg_names: missing_args = expected_arg_names - provided_arg_names unexpected_args = provided_arg_names - expected_arg_names raise GraphQLInvalidArgumentError(u'Missing or unexpected arguments found: ' u'missing {}, unexpected ' u'{}'.format(missing_args, unexpected_args)) ###### # Public API ######
def sorted_dict(value): # type: (Mapping) -> Any """ Sorts a dict's keys to avoid leaking information about the backend's handling of unordered dicts. """ if isinstance(value, Mapping): return OrderedDict( (key, sorted_dict(value[key])) for key in sorted(iterkeys(value)) ) elif isinstance(value, Sequence) and not isinstance(value, string_types): return list(map(sorted_dict, value)) else: return value
def test_get_product_metadata_path(self): product = 'S2A_OPER_PRD_MSIL1C_PDMC_20160311T194734_R031_V20160311T011614_20160311T011614' v = get_product_metadata_path(product) keys = list(iterkeys(v)) assert len(keys) == 1 assert 'tiles' in v[keys[0]] assert 'metadata' in v[keys[0]] assert keys[0] == product metadata = v[keys[0]]['metadata'].split('/') tiles = v[keys[0]]['tiles'][0].split('/') self.assertEqual(metadata[-1], 'metadata.xml') self.assertEqual(tiles[-1], 'tileInfo.json') self.assertEqual(len(v[product]['tiles']), 2)
def test_get_product_metadata_path_new_format(self): product = 'S2A_MSIL1C_20161211T190342_N0204_R113_T10SDG_20161211T190344' v = get_product_metadata_path(product) keys = list(iterkeys(v)) assert len(keys) == 1 assert 'tiles' in v[keys[0]] assert 'metadata' in v[keys[0]] assert keys[0] == product metadata = v[keys[0]]['metadata'].split('/') tiles = v[keys[0]]['tiles'][0].split('/') self.assertEqual(metadata[-1], 'metadata.xml') self.assertEqual(tiles[-1], 'tileInfo.json') self.assertEqual(len(v[product]['tiles']), 1)
def _structs_eq(a, b, comparison_func): assert isinstance(a, dict) if not isinstance(b, dict): return False # TODO support multiple mappings from same field name. dict_len = len(a) if dict_len != len(b): return False for a, b in ((a, b), (b, a)): key_iter = six.iterkeys(a) while True: try: key = next(key_iter) except StopIteration: break if not comparison_func(a[key], b[key]): return False return True
def CalculatePlaneThickness(self, planesDict): """Calculates the plane thickness for each structure.""" planes = [] # Iterate over each plane in the structure for z in iterkeys(planesDict): planes.append(float(z)) planes.sort() # Determine the thickness thickness = 10000 for n in range(0, len(planes)): if (n > 0): newThickness = planes[n] - planes[n-1] if (newThickness < thickness): thickness = newThickness # If the thickness was not detected, set it to 0 if (thickness == 10000): thickness = 0 return thickness
def to_instance_dicts(batch_dict): """Converts from the internal batch format to a list of instances. Args: batch_dict: A dict in the in-memory batch format, as returned by `make_output_dict`. Returns: A list of dicts in the in-memory instance format. """ def get_instance_values(batch_dict): # SparseFeatures are represented as a 2-tuple of list of lists, so # in that case we convert to a list of 2-tuples of lists. columns = (column if not isinstance(column, tuple) else zip(*column) for column in six.itervalues(batch_dict)) return itertools.izip(*columns) return [dict(zip(six.iterkeys(batch_dict), instance_values)) for instance_values in get_instance_values(batch_dict)]
def from_schema_json(schema_json): """Translate a v1 JSON schema into a `Schema`.""" schema_dict = json.loads(schema_json) feature_column_schemas = { feature_dict['name']: _from_feature_dict(feature_dict) for feature_dict in schema_dict.get('feature', []) } sparse_feature_column_schemas = { sparse_feature_dict['name']: _from_sparse_feature_dict( sparse_feature_dict) for sparse_feature_dict in schema_dict.get('sparseFeature', []) } overlapping_keys = set(six.iterkeys(feature_column_schemas)).intersection( six.iterkeys(sparse_feature_column_schemas)) if overlapping_keys: raise ValueError('Keys of dense and sparse features overlapped. ' 'overlapping keys: %s' % overlapping_keys) feature_column_schemas.update(sparse_feature_column_schemas) return sch.Schema(feature_column_schemas)
def build(self): parts = [] for name in iterkeys(self._stats): if self._stats[name] is not None: parts.append("stats['{name}'].add(row['{name}'])".format(name=name)) if not parts: error_msg = 'Did not get any stats variables for table {}. Was add() or init() called first?'\ .format(self.table.name) raise StatsError(error_msg) code = 'def _process_row(stats, row):\n {}'.format('\n '.join(parts)) exec(code) f = locals()['_process_row'] return f, code
def load_user(): """Read user config file and return it as a dict.""" config_path = get_user_config_path() config = {} # TODO: This may be overkill and too slow just for reading a config file with open(config_path) as f: code = compile(f.read(), config_path, 'exec') exec(code, config) keys = list(six.iterkeys(config)) for k in keys: if k.startswith('_'): del config[k] return config
def __new__(mcs, name, bases, dikt): fields = {} for base in bases: fields.update(getattr(base, '_fields', {})) # Do not reorder, this class might override fields from base classes! for key, value in tuple(six.iteritems(dikt)): # not six.iterkeys() (in-place edit!) if isinstance(value, Field): fields[key] = dikt.pop(key) dikt['_orig_slots'] = dikt.get('__slots__', ()) dikt['__slots__'] = tuple( list(dikt['_orig_slots']) + list(six.iterkeys(fields))) dikt['_fields'] = fields return abc.ABCMeta.__new__(mcs, name, bases, dikt)
def new_api_object(client, obj, cls=None, **kwargs): if isinstance(obj, dict): if not cls: resource = obj.get('resource', None) cls = _resource_to_model.get(resource, None) if not cls: obj_keys = set(six.iterkeys(obj)) for keys, model in six.iteritems(_obj_keys_to_model): if keys <= obj_keys: cls = model break cls = cls or APIObject result = cls(client, **kwargs) for k, v in six.iteritems(obj): result[k] = new_api_object(client, v) return result if isinstance(obj, list): return [new_api_object(client, v, cls) for v in obj] return obj