The following 50 code examples, extracted from open-source Python projects, illustrate how to use collections.OrderedDict.fromkeys().
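Before the project examples, here is a minimal standalone sketch (with made-up sample values) of the two usages that recur below: building an ordered mapping in which every key gets the same default value, and deduplicating a sequence while preserving first-seen order.

from collections import OrderedDict

# Build an ordered mapping where every key gets the same default value.
params = OrderedDict.fromkeys(['host', 'port', 'user'], None)
# OrderedDict([('host', None), ('port', None), ('user', None)])

# Deduplicate a sequence while preserving first-seen order
# (the idiom used by most of the examples below).
urls = ['a.mp3', 'b.mp3', 'a.mp3', 'c.mp3']
unique_urls = list(OrderedDict.fromkeys(urls))
# ['a.mp3', 'b.mp3', 'c.mp3']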
def parse_stream_url(self, url):
    logger.debug('Extracting URIs from %s', url)
    extension = urlparse(url).path[-4:]
    if extension in ['.mp3', '.wma']:
        logger.debug('Got %s', url)
        return [url]  # Catch these easy ones

    results = []
    playlist, content_type = self._get_playlist(url)
    if playlist:
        parser = find_playlist_parser(extension, content_type)
        if parser:
            playlist_data = StringIO.StringIO(playlist)
            try:
                results = [u for u in parser(playlist_data) if u and u != url]
            except Exception as exp:  # pylint: disable=broad-except
                logger.error('TuneIn playlist parsing failed %s', exp)
            if not results:
                logger.debug('Parsing failure, '
                             'malformed playlist: %s', playlist)
    elif content_type:
        results = [url]

    logger.debug('Got %s', results)
    return list(OrderedDict.fromkeys(results))
def __init__(self, agent, pattern=None, *args, **kwargs):
    # agent is posting agent
    super(WorkTodoForm, self).__init__(*args, **kwargs)
    contexts = agent.related_contexts()
    self.fields["context_agent"].choices = list(set([(ct.id, ct) for ct in contexts]))
    peeps = [agent,]
    from_agent_choices = [('', 'Unassigned'), (agent.id, agent),]
    #import pdb; pdb.set_trace()
    for context in contexts:
        if agent.is_manager_of(context):
            peeps.extend(context.task_assignment_candidates())
    if len(peeps) > 1:
        peeps = list(OrderedDict.fromkeys(peeps))
        from_agent_choices = [('', 'Unassigned')] + [(peep.id, peep) for peep in peeps]
    self.fields["from_agent"].choices = from_agent_choices
    #import pdb; pdb.set_trace()
    if pattern:
        self.pattern = pattern
        #self.fields["resource_type"].choices = [(rt.id, rt) for rt in pattern.todo_resource_types()]
        self.fields["resource_type"].queryset = pattern.todo_resource_types()
def test_fromkeys(self):
    for cls in [MIMapping, MIDict, FrozenMIDict]:
        for keys in [[], [1]]:
            for value in [None, 0]:
                for names in [None, ['a', 'b']]:
                    d = cls.fromkeys(keys, value, names)
                    self.assertEqual(d.__class__, cls)
                    self.assertEqual(list(d.keys()), keys)
                    self.assertEqual(list(d.values()), [value for k in keys])
                    if keys:
                        if names is None:
                            names = ['index_1', 'index_2']
                        self.assertEqual(list(d.indices.keys()), names)

    with self.assertRaises(ValueError):
        MIMapping.fromkeys([1, 2, 3])
def filter_push_list(push_list, option):
    allowed_types = option['allowed_types']
    floor_limit = option['floor_limit']
    step = option['step']
    duplicate = option['duplicate']
    amount = option['amount']

    if floor_limit:
        push_list = push_list[:floor_limit]

    # step
    push_list = [v for i, v in enumerate(push_list) if (i + 1) % step == 0]

    # push type
    push_list = [x for x in push_list if (x['push'] in allowed_types)]

    id_list = [x['id'] for x in push_list]

    if not duplicate:
        id_list = list(OrderedDict.fromkeys(id_list))

    if amount:
        id_list = id_list[:amount]

    return id_list
def _adjust_defaults(self):
    ''' Adjust the default parameters to include necessary parameters

    For any query involving DAP DB, always return the spaxel index
    TODO: change this to spaxel x and y
    TODO: change this entirely
    '''
    dapschema = ['dapdb' in c.class_.__table__.schema for c in self.queryparams]
    if any(dapschema):
        dapcols = ['spaxelprop.x', 'spaxelprop.y', 'bintype.name', 'template.name']
        self.defaultparams.extend(dapcols)
        self.params.extend(dapcols)
        self.params = list(OrderedDict.fromkeys(self.params))
        self._create_query_modelclasses()
        # qpdap = self.marvinform._param_form_lookup.mapToColumn(dapcols)
        # self.queryparams.extend(qpdap)
        # self.queryparams_order.extend([q.key for q in qpdap])
def get_inputs_of_variables(variables):
    """
    Return the inputs required to compute the given (tensor variable) variables.
    The order of the inputs is toposorted.

    Parameters
    ----------
    variables: list
        a list of (tensor variable)s to inspect.
        usually this is a theano function output list. (loss, accuracy, etc.)

    Returns
    -------
    list
        a list of required inputs to compute the variables.
    """
    # assert
    assert isinstance(variables, list), 'Variables should be a list of tensor variable(s).'
    assert all(isinstance(var, T.TensorVariable) for var in variables), 'All input should be a tensor variable.'

    # do
    variable_inputs = [var for var in graph.inputs(variables) if isinstance(var, T.TensorVariable)]
    variable_inputs = list(OrderedDict.fromkeys(variable_inputs))  # preserve order and make to list
    print('Required inputs are:', variable_inputs)
    return variable_inputs
def test_hnames_to_names():
    hnames = OrderedDict.fromkeys(['a', 'b'])
    names = hdf.hnames_to_names(hnames)
    assert names == ['a', 'b']

    hnames = OrderedDict()
    hnames['a'] = ['a1', 'a2']
    hnames['b'] = ['b1', 'b2']
    names = hdf.hnames_to_names(hnames)
    assert names == ['a/a1', 'a/a2', 'b/b1', 'b/b2']

    hnames = OrderedDict()
    hnames['a'] = 'a1'
    hnames['b'] = ['b1', 'b2']
    hnames['c'] = None
    names = hdf.hnames_to_names(hnames)
    assert names == ['a/a1', 'b/b1', 'b/b2', 'c']
def _evaluate_concatenate(e, self):
    """Concatenate nodes flatten and concatenate shapes."""
    ops = [self(o) for o in e.children]
    fids = tuple(OrderedDict.fromkeys(itertools.chain(*(o.fids for o in ops))))
    fshape = tuple(i.extent for i in fids)
    arrs = []
    for o in ops:
        # Create temporary with correct shape
        arr = numpy.empty(fshape + o.shape)
        # Broadcast for extra free indices
        arr[:] = o.broadcast(fids)
        # Flatten shape
        arr = arr.reshape(arr.shape[:arr.ndim - len(o.shape)] + (-1,))
        arrs.append(arr)
    arrs = numpy.concatenate(arrs, axis=-1)
    return Result(arrs, fids)
def restart_on_change(restart_map, func):
    """Restart services using provided function based
    on configuration files changing"""
    def wrap(f):
        def wrapped_f(*args):
            checksums = {}
            for path in restart_map:
                checksums[path] = file_hash(path)
            f(*args)
            restarts = []
            for path in restart_map:
                if checksums[path] != file_hash(path):
                    restarts += restart_map[path]
            services_list = list(OrderedDict.fromkeys(restarts))
            for s_name in services_list:
                func(s_name)
        return wrapped_f
    return wrap
def parse_stream_url(self, url):
    logger.debug('Extracting URIs from %s', url)
    extension = urlparse.urlparse(url).path[-4:]
    if extension in ['.mp3', '.wma']:
        logger.debug('Got %s', url)
        return [url]  # Catch these easy ones

    results = []
    playlist, content_type = self._get_playlist(url)
    if playlist:
        parser = find_playlist_parser(extension, content_type)
        if parser:
            playlist_data = StringIO.StringIO(playlist)
            try:
                results = [u for u in parser(playlist_data) if u and u != url]
            except Exception as e:
                logger.error('TuneIn playlist parsing failed %s' % e)
            if not results:
                logger.debug('Parsing failure, '
                             'malformed playlist: %s' % playlist)
    elif content_type:
        results = [url]

    logger.debug('Got %s', results)
    return list(OrderedDict.fromkeys(results))
def merge_version(lgr_set):
    """
    Merge versions from LGR set.

    :param lgr_set: The LGRs in the set
    :return: The merged version object
    """
    values = OrderedDict()
    comments = OrderedDict()
    for version in [lgr.metadata.version for lgr in lgr_set]:
        if not version:
            continue
        if version.value:
            values.update(OrderedDict.fromkeys([version.value]))
        if version.comment:
            comments.update(OrderedDict.fromkeys([version.comment]))

    return Version('|'.join(values.keys()),
                   '|'.join(comments.keys()))
def restart_on_change_helper(lambda_f, restart_map, stopstart=False,
                             restart_functions=None):
    """Helper function to perform the restart_on_change function.

    This is provided for decorators to restart services if files described
    in the restart_map have changed after an invocation of lambda_f().

    @param lambda_f: function to call.
    @param restart_map: {file: [service, ...]}
    @param stopstart: whether to stop, start or restart a service
    @param restart_functions: nonstandard functions to use to restart services
                              {svc: func, ...}
    @returns result of lambda_f()
    """
    if restart_functions is None:
        restart_functions = {}
    checksums = {path: path_hash(path) for path in restart_map}
    r = lambda_f()
    # create a list of lists of the services to restart
    restarts = [restart_map[path]
                for path in restart_map
                if path_hash(path) != checksums[path]]
    # create a flat list of ordered services without duplicates from lists
    services_list = list(OrderedDict.fromkeys(itertools.chain(*restarts)))
    if services_list:
        actions = ('stop', 'start') if stopstart else ('restart',)
        for service_name in services_list:
            if service_name in restart_functions:
                restart_functions[service_name](service_name)
            else:
                for action in actions:
                    service(action, service_name)
    return r
def __new__(cls, name, bases, classdict):
    self = super().__new__(cls, name, bases, dict(classdict))

    attrs_lists = [getattr(b, '__attrs__', ()) for b in bases]
    attrs_lists += [[a for a in classdict if not a.startswith('__')]]
    # use ordered dict like an ordered set
    self.__attrs__ = tuple(OrderedDict.fromkeys(chain.from_iterable(attrs_lists)))

    base_fields = {}
    for base in bases:
        for attr in getattr(base, '__fields__', ()):
            if attr not in base_fields:
                base_fields[attr] = getattr(base, attr).clone(self)

    fields = {}
    for attr, value in list(classdict.items()):
        if attr.startswith('__'):
            continue
        if isinstance(value, Field):
            fields[attr] = value._setup(self, attr, base=base_fields.get(attr, NOTSET))
        elif attr in base_fields:
            fields[attr] = base_fields[attr].clone(self, default=value)

    for attr, base_field in base_fields.items():
        if attr not in fields:
            fields[attr] = base_field.clone(self)

    for attr, field in fields.items():
        for prop_name in ('fget', 'fset', 'fdel'):
            prop_value = getattr(field, prop_name)
            if isinstance(prop_value, FunctionType):
                prop_value = prop_value.__name__
            if isinstance(prop_value, str):
                prop_value = getattr(self, prop_value)
                setattr(field, prop_name, prop_value)
        setattr(self, attr, field)

    self.__fields__ = tuple(a for a in self.__attrs__ if a in fields)
    return self
def distinct(sequence: List[E]) -> List[E]:
    return list(OrderedDict.fromkeys(sequence))
def dedup(values):
    """
    Removes duplicate items from a list.
    Note that it does not change the original list.

    :param values: list
    :type values: list
    :returns: de-duped list
    :rtype: list
    """
    return list(OrderedDict.fromkeys(values))
def fromkeys(cls, seq, value=None):
    d = DotMap()
    d._map = OrderedDict.fromkeys(seq, value)
    return d
def tune(self, station):
    logger.debug('Tuning station id %s', station['guide_id'])
    args = '&id=' + station['guide_id']
    stream_uris = []
    for stream in self._tunein('Tune.ashx', args):
        if 'url' in stream:
            stream_uris.append(stream['url'])
    if not stream_uris:
        logger.error('Failed to tune station id %s', station['guide_id'])
    return list(OrderedDict.fromkeys(stream_uris))
def build_vocab(self, *args, **kwargs):
    """Construct the Vocab object for this field from one or more datasets.

    Arguments:
        Positional arguments: Dataset objects or other iterable data
            sources from which to construct the Vocab object that
            represents the set of possible values for this field. If
            a Dataset object is provided, all columns corresponding
            to this field are used; individual columns can also be
            provided directly.
        Remaining keyword arguments: Passed to the constructor of Vocab.
    """
    counter = Counter()
    sources = []
    for arg in args:
        if isinstance(arg, Dataset):
            sources += [getattr(arg, name) for name, field in
                        arg.fields.items() if field is self]
        else:
            sources.append(arg)
    for data in sources:
        for x in data:
            if not self.sequential:
                x = [x]
            counter.update(x)
    specials = list(OrderedDict.fromkeys(
        tok for tok in [self.unk_token, self.pad_token, self.init_token,
                        self.eos_token]
        if tok is not None))
    self.vocab = self.vocab_cls(counter, specials=specials, **kwargs)
def __init__(self, configuration, input_parameter_values):
    """
    To initialize an entity, a corresponding entity configuration together
    with values for the input parameter(s) are needed.

    :param configuration: an object of class EntityConfiguration
    :param input_parameter_values: A dictionary with values for the input parameters defined in the configuration.
    """
    # corresponding entity configuration
    self.configuration = configuration
    # parameters needed to identify entity (or for validation)
    self.input_parameters = OrderedDict.fromkeys(configuration.input_parameters)
    # parameters that should be retrieved using the API
    self.output_parameters = OrderedDict.fromkeys(configuration.output_parameter_mapping.keys())

    # set values for input parameters
    for parameter in configuration.input_parameters:
        if parameter in input_parameter_values:
            self.input_parameters[parameter] = input_parameter_values[parameter]
        else:
            raise IllegalArgumentError("Illegal input parameter: " + parameter)

    # get uri for this entity from uri template in the configuration
    uri_variable_values = {
        **self.input_parameters,
        "api_key": self.configuration.api_key
    }
    self.uri = self.configuration.uri_template.replace_variables(uri_variable_values)
def yield_source_records(source_db_paths: Dict[Text, PathInfo],
                         source_fieldnames: Sequence[Text]
                         ) -> Generator[NamedTuple, None, None]:
    """ Returns a generator of named tuple which can yield a record across all database files.
    Accepts dict of profile names and their database filepaths; and inclusive list of fieldnames.

    source_db_paths: {Profile names: profile database filepaths}
    source_fieldnames: list of fieldnames inclusive of all the fieldnames across all database files.

    returns: Generator of namedtuple which can yield each record.
    """
    global DBRecord
    # Additional field to store last_visited_date field value converted from microseconds to human usable format.
    # source_fieldnames.append('visited_on')  # will likely be moved to browser specific settings, when other browsers are added.
    DBRecord = namedtuple('DBRecord', source_fieldnames)
    incr = helpers.incrementer()
    source_records_template = odict.fromkeys(source_fieldnames, None)
    for profile_name, profile_db_path in source_db_paths.items():
        with sqlite3.connect(profile_db_path) as source_conn:
            source_conn.row_factory = sqlite3.Row
            try:
                for db_record_yielder in source_conn.execute("""SELECT * FROM moz_places WHERE title IS NOT NULL"""):
                    '''
                    Prevents adding additional keys, only updates keys/fields specified in source_fieldnames.
                    Prevents field mismatches among profiles, ex: favicon_url in Firefox exists in some profiles not in others.
                    '''
                    source_records_template = odict(
                        (key, dict(db_record_yielder).setdefault(key, None))
                        for key in source_records_template)
                    # Couldn't figure out how to make AUTOINCREMENT PRIMARY KEY work in SQL, hence this serial# generator.
                    source_records_template['id'] = next(incr)
                    try:
                        source_records_template['last_visit_date_readable'] = dt.fromtimestamp(
                            source_records_template['last_visit_date'] // 10**6).strftime('%x %X')
                    except TypeError:
                        pass
                    # OrderedDict converted to NamedTuple as tuples easily convert to SQL query bindings.
                    yield DBRecord(*source_records_template.values())
            except sqlite3.OperationalError:
                print(f'This browser profile does not seem to have any data: {profile_name}')
def arguments(self):
    """Returns a list of the arguments provided in the query and/or POST.

    The return value is an ordered list of strings.
    """
    return list(OrderedDict.fromkeys(self.params.keys()))
def phone_to_num(phone_values):
    """Converts all phone strings into numerical values.

    This helper function iterates over a set of different phones and returns
    an alphabetically ordered dictionary where the phone string is the key and
    the numerical value is the corresponding value.

    :params phone_values: a set (or list) of phone values
    :returns: an ordered dictionary with the numerical values as values
    """
    phone_values = OrderedDict.fromkeys(sorted(phone_values))
    for i, k in enumerate(phone_values.keys()):
        phone_values[k] = i
    phone_values['None'] = -1

    return phone_values
def deduplicate(seq):
    return list(OrderedDict.fromkeys(seq))
def get_backward_ops(seed_tensors, treat_as_inputs=None):
    """
    Get backward ops from inputs to `seed_tensors` by topological order.

    :param seed_tensors: A Tensor or list of Tensors, for which to get all
        preceding Tensors.
    :param treat_as_inputs: None or a list of Tensors that is treated as
        inputs during the search (where to stop searching the backward graph).

    :return: A list of tensorflow `Operation` s in topological order.
    """
    if treat_as_inputs is None:
        treat_as_inputs = []
    treat_as_inputs = set(treat_as_inputs)
    if not isinstance(seed_tensors, (list, tuple)):
        seed_tensors = [seed_tensors]
    seed_tensors = [t for t in seed_tensors if t not in treat_as_inputs]
    seed_ops = list(OrderedDict.fromkeys(t.op for t in seed_tensors))
    q = deque(seed_ops)
    seen = set()
    done = set()
    ret = []
    while q:
        op = q[0]
        if op not in seen:
            seen.add(op)
            for tensor in reversed(op.inputs):
                if tensor not in treat_as_inputs:
                    q.appendleft(tensor.op)
            q.extendleft(reversed(op.control_inputs))
        else:
            # have seen this op before
            q.popleft()
            if op not in done:
                done.add(op)
                ret.append(op)
    return ret
def _get_tenant_network_types():
    default_tenant_network_type = config('default-tenant-network-type')
    tenant_network_types = _get_overlay_network_type()
    tenant_network_types.extend(NON_OVERLAY_NET_TYPES)
    if default_tenant_network_type:
        if (default_tenant_network_type in TENANT_NET_TYPES and
                default_tenant_network_type in tenant_network_types):
            tenant_network_types[:0] = [default_tenant_network_type]
        else:
            raise ValueError('Unsupported or unconfigured '
                             'default-tenant-network-type'
                             ' {}'.format(default_tenant_network_type))
    # Dedupe list but preserve order
    return list(OrderedDict.fromkeys(tenant_network_types))