我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 functools.reduce()。
def update_bibs_in(grouped_bibs, db_abbrev):
    """Interactively abbreviate the journals of a list of bib entries.

    The user is asked whether to update automatically ("y"), manually
    ("m") or not at all ("n"); the question is repeated until one of these
    answers is given.  The entries are sorted in place by their 'journal'
    key, grouped by journal, each group is handed to the chosen action and
    the per-group results are concatenated.

    Args:
        grouped_bibs: list of mappings that each have a 'journal' key.
            The list is sorted in place.
        db_abbrev: abbreviation database passed through to ``update_in`` /
            ``manual_update_in``.

    Returns:
        The concatenation of the per-journal results; an empty list when
        there is nothing to process.
    """
    actions = {
        "y": lambda items: [update_in(bibs, db_abbrev) for bibs in items],
        "m": lambda items: [manual_update_in(bibs, db_abbrev) for bibs in items],
        "n": lambda items: items,
    }

    # Ask again in a loop rather than by recursion, so repeated invalid
    # answers cannot exhaust the call stack.
    while True:
        print("\n ")
        action = input("Abbreviate everthing?" +
                       "y(yes, automatic)/m(manual)/n(do nothing)")
        if action in actions:
            break

    # groupby() only merges adjacent items, hence the sort on the same key.
    by_journal = operator.itemgetter('journal')
    grouped_bibs.sort(key=by_journal)
    grouped_by_journal = [list(items)
                          for _, items in groupby(grouped_bibs, by_journal)]

    updated_bibs = actions[action](grouped_by_journal)
    if not updated_bibs:
        # reduce() without an initial value raises on an empty sequence.
        return []
    return reduce(lambda a, b: a + b, updated_bibs)
def checksum(self, samples):
    """Append an XOR checksum to every sentence and join them with CR LF.

    Each sentence must start with "$".  The checksum is the XOR of the
    code points of all characters after that first one, appended as
    "*XX" (two upper-case hex digits).

    Args:
        samples: iterable of sentence strings, or None.

    Returns:
        All completed sentences, each terminated by CR LF, as one string;
        None when ``samples`` is None or contains no sentence.
    """
    if samples is None:
        return None

    completed = []
    for sentence in samples:
        assert sentence[0] == "$"
        # 0 is the XOR identity, so a sentence with nothing after "$"
        # yields checksum 00 instead of making reduce() raise.
        cksum = reduce(xor, (ord(s) for s in sentence[1:]), 0)
        completed.append("%s*%02X" % (sentence, cksum))

    if not completed:
        return None

    # NMEA0183 uses \r\n as line separator even on Unix systems.
    return "".join(line + "\r\n" for line in completed)
def get_comma_separated_data(raw):
    """Parse a ``header = v, v, ...;`` block of comma-separated values.

    The header must contain a bracketed shape (e.g. ``name[2, 3]``).  The
    values are read as consecutive records, each made of one year followed
    by as many values as the shape has elements.

    Args:
        raw: iterable of strings that are concatenated before parsing.

    Returns:
        Tuple ``(header, years, data)``: the header text, an integer array
        of years, and the per-year arrays stacked along a new last axis.
    """
    # Glue the pieces together and separate the header from the values.
    header, payload = "".join(raw).strip().split(" = ")

    # The value list must end with a semicolon, which is dropped.
    assert payload[-1] == ';'
    payload = payload[:-1]

    # NOTE(review): eval() executes arbitrary expressions; only safe when
    # the input comes from a trusted source.
    values = eval(payload.replace("\n", ''))
    bracketed = header[header.index("["):header.index("]") + 1]
    shape = tuple(eval(bracketed))

    # One record = the year plus prod(shape) values.
    step_size = functools.reduce(operator.mul, shape) + 1
    years = np.array(values[::step_size], dtype=int)

    blocks = []
    for index in range(len(years)):
        start = 1 + index * step_size
        stop = (index + 1) * step_size
        blocks.append(np.array(values[start:stop]).reshape(shape))
    return header, years, np.stack(blocks, axis=-1)
def get_foreign(self, queryset, search, filters):
    """Filter ``queryset`` by a search string and an optional category.

    A row matches when ``search`` occurs (case-insensitively) in its code
    or in the name of any language listed in
    ``settings.LANGUAGES_DATABASES``.  The category is taken from the first
    of the known form keys in ``filters`` whose value is not None.

    Returns:
        The filtered queryset, cut to ``settings.LIMIT_FOREIGNKEY`` rows.
    """
    # Filter with search string
    conditions = [Q(code__icontains=search)]
    conditions.extend(
        Q(**{"{}__name__icontains".format(lang.lower()): search})
        for lang in settings.LANGUAGES_DATABASES
    )
    qs = queryset.filter(reduce(operator.or_, conditions))

    # First form key that carries a value wins.
    category = None
    for key in ('ProductForm_category',
                'ProductFormCreate_category',
                'ProductFormCreateCustom_category'):
        category = filters.get(key, None)
        if category is not None:
            break

    if category:
        qs = qs.filter(category__pk=category)
    return qs[:settings.LIMIT_FOREIGNKEY]
def query_or(cls, query, *values_list, **annotations):
    """Run an OR-combined filter and return annotated value dictionaries.

    Args:
        query: a single filter object, or an iterable of them that is
            combined with ``or_``.
        *values_list: field names passed to ``.values()``.
        **annotations: ``alias=field_name`` pairs; every value is wrapped
            in ``F()`` and passed to ``.annotate()``.  The special keyword
            ``pop_annotations`` is consumed here: when truthy, the keys
            named by the annotation *values* are popped from each result
            dictionary.

    Returns:
        The resulting queryset of dictionaries.
    """
    pop_annotations = annotations.pop('pop_annotations', False)
    # Snapshot the source names before the mapping is rebuilt below.
    annotated_keys = list(annotations.values())
    annotations = {alias: F(source) for alias, source in annotations.items()}

    if isinstance(query, Iterable):
        query = reduce(or_, query)

    result = cls.objects.filter(query).values(*values_list).annotate(**annotations)
    if pop_annotations:
        for querydict in result:
            for source in annotated_keys:
                querydict.pop(source)
    return result


# tax types applicable to products
def GetDistinctColumnsFromSQLQueryData(
        self, queries: [sql_query_model.SQLQueryModel]) -> [str]:
    """Collect the distinct snake_case column names of several queries.

    Args:
      queries ([sql_query_model.SQLQueryModel]): the SQL query data objects
          whose ``columns`` are inspected.

    Returns:
      list[str]: sorted, de-duplicated snake_case names of all columns;
          an empty list when no query is given.
    """
    if len(queries) == 0:
        return []

    # A set comprehension de-duplicates while walking every column once.
    distinct_columns = {
        column.GetColumnAsSnakeCase()
        for query in queries
        for column in query.columns
    }
    return sorted(distinct_columns)
def ratio(self):
    """Return a measure of the sequences' similarity (float in [0,1]).

    Where T is the total number of elements in both sequences, and
    M is the number of matches, this is 2.0*M / T.
    Note that this is 1 if the sequences are identical, and 0 if
    they have nothing in common.

    .ratio() is expensive to compute if you haven't already computed
    .get_matching_blocks() or .get_opcodes(), in which case you may
    want to try .quick_ratio() or .real_quick_ratio() first to get an
    upper bound.

    >>> s = SequenceMatcher(None, "abcd", "bcde")
    >>> s.ratio()
    0.75
    >>> s.quick_ratio()
    0.75
    >>> s.real_quick_ratio()
    1.0
    """
    # The last item of every matching block is the length of that match.
    matches = sum(block[-1] for block in self.get_matching_blocks())
    total = len(self.a) + len(self.b)
    return _calculate_ratio(matches, total)
def __init__(self, n_out, n_state=31, taps=[27, 30]):
    """Build a shift register that emits ``n_out`` bits per clock.

    Each of the ``n_out`` steps computes the inverted XOR of the bits at
    the ``taps`` positions and shifts it in at position 0.
    NOTE(review): this looks like a linear-feedback shift register
    (pseudo-random bit source) - confirm against the enclosing class.

    Args:
        n_out: width of the output signal ``self.o``.
        n_state: width of the internal state signal.
        taps: positions of the bits XOR-ed into the feedback bit.
    """
    self.o = Signal(n_out)

    # # #

    state = Signal(n_state)
    # Current bits, zero-padded when more output than state bits exist.
    curval = [state[i] for i in range(n_state)] + [0] * (n_out - n_state)
    for _ in range(n_out):
        feedback = ~reduce(xor, [curval[tap] for tap in taps])
        # Shift by one: new bit in front, oldest bit dropped.
        curval = [feedback] + curval[:-1]

    self.sync += [
        state.eq(Cat(*curval[:n_state])),
        self.o.eq(Cat(*curval)),
    ]
def calc_structured_append_parity(content):
    """\
    Calculates the parity data for the Structured Append mode.

    The content is encoded as ISO-8859-1, falling back to Shift JIS and
    finally UTF-8, and all resulting byte values are XOR-ed together.

    :param str content: The content. Non-string values are converted
            with ``str()`` first.
    :rtype: int
    :return: XOR of all encoded bytes; ``0`` for empty content.
    """
    if not isinstance(content, str_type):
        content = str(content)
    try:
        data = content.encode('iso-8859-1')
    except UnicodeError:
        try:
            data = content.encode('shift-jis')
        except (LookupError, UnicodeError):
            data = content.encode('utf-8')
    if _PY2:
        # Python 2 byte strings iterate as 1-char strings, not ints.
        data = (ord(c) for c in data)
    # 0 is the XOR identity: non-empty results are unchanged, and empty
    # content no longer makes reduce() raise TypeError.
    return reduce(xor, data, 0)
def _Lookup_PairPosFormat1_subtables_flatten(lst, font):
    """Merge several PairPos subtables into one Format 1 PairPos table.

    The value formats of the result are the bitwise OR of the formats of
    all subtables; the coverage glyphs and pair sets are aligned with
    ``_merge_GlyphOrders`` and every aligned column of pair sets is
    flattened with ``_PairSet_flatten``.

    Args:
        lst: the subtables to merge.
        font: passed through to the merge helpers.

    Returns:
        The newly built ``ot.PairPos`` table.
    """
    assert _all_equal([l.ValueFormat2 == 0 for l in lst if l.PairSet]), "Report bug against fonttools."

    # Union of the value-format bit masks over all subtables.
    value_format1 = 0
    value_format2 = 0
    for subtable in lst:
        value_format1 |= subtable.ValueFormat1
    for subtable in lst:
        value_format2 |= subtable.ValueFormat2

    self = ot.PairPos()
    self.Format = 1
    self.Coverage = ot.Coverage()
    self.Coverage.Format = 1
    self.ValueFormat1 = value_format1
    self.ValueFormat2 = value_format2

    # Align them
    glyphs, padded = _merge_GlyphOrders(font,
                                        [v.Coverage.glyphs for v in lst],
                                        [v.PairSet for v in lst])

    self.Coverage.glyphs = glyphs
    pair_sets = []
    for values in zip(*padded):
        present = [v for v in values if v is not None]
        pair_sets.append(_PairSet_flatten(present, font))
    self.PairSet = pair_sets
    self.PairSetCount = len(self.PairSet)
    return self
def train_stats(m, trainloader, param_list = None):
    """Train ``m`` and collect size and timing statistics.

    Args:
        m: the model handed to ``filtered_params`` and ``train``.
        trainloader: the data source handed to ``train``.
        param_list: optional parameter selection forwarded to both helpers.

    Returns:
        dict with the keys 'variables_optimized' (number of selected
        parameter tensors), 'params_optimized' (total number of scalar
        elements in them), 'training_time' (seconds spent in ``train``),
        'training_loss' (last loss, NaN when there is none) and
        'training_losses' (everything ``train`` returned).
    """
    stats = {}
    params = filtered_params(m, param_list)

    # Element count per parameter; each item is indexed as p[1] to reach
    # the tensor.  The initial 1 makes a 0-dim tensor count as one element
    # instead of crashing reduce().
    sizes = [reduce(lambda d1, d2: d1 * d2, p[1].size(), 1) for p in params]
    # len()/sum() report 0 variables for an empty selection; the previous
    # enumerate/accumulate idiom reported 1.
    stats['variables_optimized'] = len(sizes)
    stats['params_optimized'] = sum(sizes)

    before = time.time()
    losses = train(m, trainloader, param_list=param_list)
    stats['training_time'] = time.time() - before

    stats['training_loss'] = losses[-1] if len(losses) else float('nan')
    stats['training_losses'] = losses
    return stats
def filter_queryset(self, term, queryset=None):
    """
    Return QuerySet filtered by search_fields matching the passed term.

    The term is split on spaces, tabs and newlines.  Every resulting word
    must match at least one of the search fields.

    Args:
        term (str): Search term
        queryset: QuerySet to filter; ``self.get_queryset()`` when None.

    Returns:
        QuerySet: Filtered QuerySet

    """
    if queryset is None:
        queryset = self.get_queryset()
    search_fields = self.get_search_fields()

    select = Q()
    words = term.replace('\t', ' ').replace('\n', ' ').split(' ')
    for word in words:
        if word == '':
            continue
        # OR over all search fields, seeded with the first one.
        any_field = Q(**{search_fields[0]: word})
        for field in search_fields:
            any_field = any_field | Q(**{field: word})
        select &= any_field

    return queryset.filter(select).distinct()
def test_calculate_SNR_positive_1(self):
    """SNR of a loud segment framed by two quiet segments.

    The signal consists of 6000 quiet sample pairs, 8000 loud pairs and
    7000 quiet pairs; the expected value is 20*log10 of the ratio between
    the mean energies of the loud and quiet parts.
    """
    source_array = [89, -89] * 6000 + [502, -502] * 8000 + [89, -89] * 7000
    # Big-endian 16-bit samples, concatenated in order.
    source_data = b''.join(struct.pack('>h', sample) for sample in source_array)
    sampling_frequency = 8000
    bounds_of_speech = [(2.0 * 6000.0 / sampling_frequency,
                         2.0 * (6000.0 + 8000.0) / sampling_frequency)]

    def energy(samples):
        # Sum of squares, seeded with vad.EPS.
        return reduce(lambda acc, value: acc + value * value, samples, vad.EPS)

    speech_start = 2 * 6000
    speech_stop = 2 * (6000 + 8000)
    silence = source_array[0:speech_start] + source_array[speech_stop:]
    speech = source_array[speech_start:speech_stop]

    silence_energy = energy(silence) / (2.0 * (6000.0 + 7000.0))
    speech_energy = energy(speech) / (2.0 * 8000.0)
    target_snr = 20.0 * math.log10(speech_energy / silence_energy)
    self.assertAlmostEqual(
        target_snr,
        vad.calculate_SNR(source_data, sampling_frequency, bounds_of_speech))
def _column_type(strings, has_invisible=True):
    """The least generic type all column values are convertible to.

    >>> _column_type(["1", "2"]) is _int_type
    True
    >>> _column_type(["1", "2.3"]) is _float_type
    True
    >>> _column_type(["1", "2.3", "four"]) is _text_type
    True
    >>> _column_type(["four", '\u043f\u044f\u0442\u044c']) is _text_type
    True
    >>> _column_type([None, "brux"]) is _text_type
    True
    >>> _column_type([1, 2, None]) is _int_type
    True
    >>> import datetime as dt
    >>> _column_type([dt.datetime(1991,2,19), dt.time(17,35)]) is _text_type
    True

    """
    types = [_type(s, has_invisible) for s in strings]
    # Start from int and widen with every value's type.
    column_type = int
    for value_type in types:
        column_type = _more_generic(column_type, value_type)
    return column_type
def get_space_separated_data(raw):
    """Parse a ``header = [ ... ];`` block of space-separated rows.

    The first line must end with ``= [`` and the last with ``];``.  The
    header must contain a bracketed shape.  The lines in between form
    consecutive records: one year line followed by ``step_size`` data
    lines, where ``step_size`` is the product of all but the last shape
    dimension (1 for a one-dimensional shape).

    Args:
        raw: sequence of text lines.

    Returns:
        Tuple ``(header, years, data)``: the header text, an integer array
        of years, and the per-year arrays stacked along a new last axis.
    """
    first, last = raw[0], raw[-1]
    assert first.strip().endswith("= [")
    assert last.strip().endswith("];")

    header = first.replace("= [", "").strip()
    # NOTE(review): eval() executes arbitrary expressions; only safe when
    # the input comes from a trusted source.
    bracketed = header[header.index("["):header.index("]") + 1]
    shape = tuple(eval(bracketed))
    data = [eval(line.strip().replace(" ", ",")) for line in raw[1:-1]]

    step_size = 1 if len(shape) == 1 else functools.reduce(operator.mul, shape[:-1])
    # One record = the year line plus step_size data lines.
    record_len = step_size + 1
    years = np.array(data[::record_len], dtype=int)

    subarrays = []
    for index in range(len(years)):
        start = index * record_len + 1
        stop = (index + 1) * record_len
        subarrays.append(np.array(data[start:stop]).reshape(shape))
    return header, years, np.stack(subarrays, axis=-1)
def get_current_status(cls):
    """Get the worst status of all check results.

    Results whose status is above ``Plugin.STATUS_CRIT`` are ignored; of
    the remaining ones the highest status code wins (0 when there is
    none).

    :returns: (status as str, code)
    :rtype: tuple
    """
    from sauna.plugins.base import Plugin
    from sauna import check_results_lock, check_results

    code = 0
    with check_results_lock:
        for result in check_results.values():
            status = result.status
            if status > Plugin.STATUS_CRIT:
                continue
            if not code > status:
                code = status
    return Plugin.status_code_to_str(code), code
def __init__(self, jobs):
    """
    Create a new Scheduler.

    The tick duration is the greatest common divisor of all job
    periodicities, which must therefore be integers.

    >>> s = Scheduler([Job(1, max, 100, 200)])
    >>> for jobs in s:
    ...    time.sleep(s.tick_duration)

    :param jobs: Sequence of jobs to schedule
    """
    # fractions.gcd() was deprecated in Python 3.5 and removed in 3.9;
    # math.gcd() is its replacement.  Imported locally so the block does
    # not depend on the module's import section.
    import math

    periodicities = {job.periodicity for job in jobs}
    self.tick_duration = reduce(math.gcd, periodicities)
    self._ticks = self.find_minimum_ticks_required(self.tick_duration,
                                                   periodicities)
    self._jobs = jobs
    self._current_tick = 0
    logger.debug('Scheduler has {} ticks, each one is {} seconds'.
                 format(self._ticks, self.tick_duration))
def temperature(self, check_config):
    """Check the highest sensor temperature against the configuration.

    When ``check_config`` has a non-empty 'sensors' entry, only sensors
    whose device name is listed there are considered.

    :returns: ``(status, message)``; ``STATUS_UNKNOWN`` when no sensor is
        left, otherwise the status computed for the highest value.
    """
    # Sentinel that any real reading is expected to beat.
    dummy_sensor = Sensor(device_name='Dummy', label='Dummy', value=-1000)
    sensors = self._get_temperatures()

    wanted = check_config.get('sensors')
    if wanted:
        sensors = [s for s in sensors if s.device_name in wanted]

    # Highest value wins; on ties the later sensor is kept.
    hottest = dummy_sensor
    for candidate in sensors:
        if hottest.value > candidate.value:
            continue
        hottest = candidate

    if hottest is dummy_sensor:
        return self.STATUS_UNKNOWN, 'No sensor found'

    status = self._value_to_status_less(hottest.value, check_config)
    if status > self.STATUS_OK:
        message = 'Sensor {}/{} {}°C'.format(hottest.device_name,
                                             hottest.label,
                                             hottest.value)
        return status, message
    return self.STATUS_OK, 'Temperature okay ({}°C)'.format(hottest.value)
def linear(input_, output_size,
           weights_initializer=initializers.xavier_initializer(),
           biases_initializer=tf.zeros_initializer,
           activation_fn=None, trainable=True, name='linear'):
    """Apply a fully connected layer ``input_ @ w + b``.

    Inputs with more than two dimensions are first reshaped to
    ``[-1, prod(shape[1:])]``.

    Args:
        input_: input tensor.
        output_size: number of output units.
        weights_initializer: initializer for the weight variable 'w'.
        biases_initializer: initializer for the bias variable 'b'.
        activation_fn: optional callable applied to the layer output.
        trainable: whether the created variables are trainable.
        name: variable scope in which 'w' and 'b' are created.

    Returns:
        Tuple ``(output, w, b)``.
    """
    shape = input_.get_shape().as_list()

    if len(shape) > 2:
        flat_dim = reduce(lambda x, y: x * y, shape[1:])
        input_ = tf.reshape(input_, [-1, flat_dim])
        shape = input_.get_shape().as_list()

    with tf.variable_scope(name):
        w = tf.get_variable('w', [shape[1], output_size], tf.float32,
                            initializer=weights_initializer,
                            trainable=trainable)
        b = tf.get_variable('b', [output_size],
                            initializer=biases_initializer,
                            trainable=trainable)
        out = tf.nn.bias_add(tf.matmul(input_, w), b)

        # NOTE(review): the source was whitespace-collapsed, so it is not
        # visible whether the activation is applied inside or outside the
        # variable scope; it is kept inside here - confirm.
        if activation_fn is None:
            return out, w, b
        return activation_fn(out), w, b
def test_sumup(nr_sites, local_dim, rank, rgen, dtype):
    """``mp.sumup`` must agree with pairwise addition, with and without weights."""
    n_summands = 1 if rank is np.nan else rank
    mpas = [factory.random_mpa(nr_sites, local_dim, 3, dtype=dtype, randstate=rgen)
            for _ in range(n_summands)]

    def check(sum_naive, sum_mp):
        # Same array, bounded ranks, unchanged dtype.
        assert_array_almost_equal(sum_naive.to_array(), sum_mp.to_array())
        assert all(r <= 3 * rank for r in sum_mp.ranks)
        assert(sum_mp.dtype is dtype)

    # Plain sum.
    check(ft.reduce(mp.MPArray.__add__, mpas), mp.sumup(mpas))

    # Weighted sum.
    weights = rgen.randn(len(mpas))
    summands = [w * mpa for w, mpa in zip(weights, mpas)]
    check(ft.reduce(mp.MPArray.__add__, summands),
          mp.sumup(mpas, weights=weights))
def test_staff(self):
    """Staffing must assign the expected number of collectors per day."""
    dfw2 = self.dfw.copy()
    dfw2 = 0
    staffing = Staffing(self.dfs, self.dfw, dfw2, self.dtr,
                        self.staffing_parameters, self.config)
    roster, s, variables = staffing.staff()

    def collectors_on(day):
        # Total of the solved 'DSSG' variables of all N staff for one day.
        values = [s.getVal(variables[i, 'DSSG', day])
                  for i in range(0, self.staffing_parameters['N'])]
        return reduce(lambda x, y: x + y, values)

    collectors_day0 = collectors_on(datetime(2011, 11, 11))
    collectors_day1 = collectors_on(datetime(2011, 11, 12))
    collectors_day2 = collectors_on(datetime(2011, 11, 13))
    collectors_day5 = collectors_on(datetime(2011, 11, 16))

    # Need 2 people on Monday, 3 people on Tuesday, and 1 (-> 2) people
    # on Wednesday. Zero on other days.
    self.assertEqual(collectors_day0, 2)
    self.assertEqual(collectors_day1, 3)
    self.assertEqual(collectors_day2, 2)
    self.assertEqual(collectors_day5, 0)
    self.assertEqual(roster.shape[0], 1)

    first_three_days = [datetime(2011, 11, 11),
                        datetime(2011, 11, 12),
                        datetime(2011, 11, 13)]
    self.assertEqual(
        list(roster.loc['DSSG', first_three_days].values),
        [collectors_day0, collectors_day1, collectors_day2])
def __init__(self, env_name, num_episodes, alpha, gamma, epsilon, policy, **kwargs):
    """
    Base-class setup: validates the environment's spaces and allocates an
    uninitialised feature tensor of shape (action_size, obs_size).

    :param env_name: name of environment, currently environments whose observation space is Box and action space is
     Discrete are supported. see https://github.com/openai/gym/wiki/Table-of-environments
    :param num_episodes: number of episode for training
    :param alpha: forwarded to the parent constructor
    :param gamma: forwarded to the parent constructor
    :param epsilon: forwarded to the parent constructor as keyword
    :param policy: forwarded to the parent constructor
    :param kwargs: other arguments.
    :raises NotImplementedError: when the action space is not Discrete or
     the observation space is not Box
    """
    super(FABase, self).__init__(env_name, num_episodes, alpha, gamma, policy, epsilon=epsilon, **kwargs)

    env = self.env
    supported = (isinstance(env.action_space, gym.spaces.Discrete) and
                 isinstance(env.observation_space, gym.spaces.Box))
    if not supported:
        raise NotImplementedError("action_space should be discrete and "
                                  "observation_space should be box")

    self.obs_shape = env.observation_space.shape
    # Number of scalar entries in one observation.
    self.obs_size = reduce(lambda x, y: x * y, self.obs_shape)
    self.action_size = env.action_space.n
    self._feature = torch.Tensor(self.action_size, self.obs_size)
    self._weight = None
def __init__(self, env_name, num_episodes, alpha, gamma, policy, **kwargs):
    """
    base class for RL using policy gradient; validates the environment's
    spaces and records the observation and action sizes.

    :param env_name: name of environment, currently environments whose observation space is Box and action space is
     Discrete are supported. see https://github.com/openai/gym/wiki/Table-of-environments
    :param num_episodes: forwarded to the parent constructor
    :param alpha: forwarded to the parent constructor
    :param gamma: forwarded to the parent constructor
    :param policy: forwarded to the parent constructor
    :param kwargs: other arguments, forwarded to the parent constructor
    :raises NotImplementedError: when the action space is not Discrete or
     the observation space is not Box
    """
    super(PGBase, self).__init__(env_name, num_episodes, alpha, gamma, policy, **kwargs)

    env = self.env
    supported = (isinstance(env.action_space, gym.spaces.Discrete) and
                 isinstance(env.observation_space, gym.spaces.Box))
    if not supported:
        raise NotImplementedError("action_space should be discrete and "
                                  "observation_space should be box")

    self.obs_shape = env.observation_space.shape
    # Number of scalar entries in one observation.
    self.obs_size = reduce(lambda x, y: x * y, self.obs_shape)
    self.action_size = env.action_space.n
    self._feature = None
    self._weight = None
def search_read(cls, domain, offset=0, limit=None, order=None, fields_names=None):
    '''
    Call search and read functions at once.
    Useful for the client to reduce the number of calls.

    :param domain: search domain passed to ``cls.search``
    :param offset: passed to ``cls.search``
    :param limit: passed to ``cls.search``
    :param order: passed to ``cls.search``
    :param fields_names: names of the fields to read; all fields of the
        model when empty. 'id' is always read. The caller's list is not
        modified.
    :return: the rows returned by ``cls.read``, in the order of the
        search result
    '''
    records = cls.search(domain, offset=offset, limit=limit, order=order)

    # Always work on a private list: dict.keys() is a view without
    # append() on Python 3, and the caller's list must not be mutated.
    if not fields_names:
        fields_names = list(cls._fields.keys())
    else:
        fields_names = list(fields_names)
    if 'id' not in fields_names:
        fields_names.append('id')

    # Materialise the ids; map() is a one-shot iterator on Python 3.
    rows = cls.read(list(map(int, records)), fields_names)

    # Restore the order of the search result.
    index = {r.id: i for i, r in enumerate(records)}
    rows.sort(key=lambda r: index[r['id']])
    return rows
def view(self, *args):
    """Return a new tensor sharing this storage but with another size.

    Args:
        *args: either a single ``torch.Size`` or the individual
            dimensions; the sizes are passed through ``_infer_sizes``
            together with the element count.

    Raises:
        ValueError: when the requested size does not hold exactly
            ``self.nelement()`` elements, or the tensor is not contiguous.
    """
    dst = self.new()

    single_size = len(args) == 1 and isinstance(args[0], torch.Size)
    sizes = args[0] if single_size else torch.Size(args)
    sizes = _infer_sizes(sizes, self.nelement())

    if len(sizes) > 0:
        numel = reduce(lambda a, b: a * b, sizes)
    else:
        numel = 0

    if numel != self.nelement():
        def format_size(size):
            return 'x'.join(str(v) for v in size) if len(size) > 0 else '0'
        raise ValueError(
            "view of size '{0}' is invalid for input of size '{1}'"
            .format(format_size(sizes), format_size(self.size())))
    if not self.is_contiguous():
        raise ValueError("input should be contiguous")

    # An empty tensor has no storage to share.
    if self.storage() is not None:
        dst.set_(self.storage(), self.storage_offset(), sizes)
    return dst
def generate_words(files):
    """
    Transform list of files to list of words,
    removing new line character
    and replace name entity '<NE>...</NE>' and abbreviation '<AB>...</AB>' symbol

    Every line is split on "|"; pieces that consist only of the newline
    character are dropped.

    :param files: iterable of paths to the text files to read
    :return: list with the words of all files, in reading order
    """
    repls = {'<NE>': '', '</NE>': '', '<AB>': '', '</AB>': ''}

    words_all = []
    for path in files:
        # The context manager closes every file; the handle used to leak.
        with open(path, 'r') as lines:
            for line in lines:
                line = reduce(lambda a, kv: a.replace(*kv), repls.items(), line)
                # Compare by value: identity against a string literal only
                # works by accident of interning.
                words_all.extend(word for word in line.split("|")
                                 if word != '\n')
    return words_all
def status(self):
    """Summarise the current client, load, nodes and servers.

    Returns:
        dict with 'Client' (location of the current computation's pulse
        task, '' without a computation), 'Pending' (sum of ``cpus_used``
        over all nodes), 'Nodes' (node keys) and 'Servers' (server keys of
        all nodes, in node order).
    """
    nodes = self._nodes
    pending = sum(node.cpus_used for node in nodes.values())
    servers = [server
               for node in nodes.values()
               for server in node.servers.keys()]

    if self._cur_computation:
        client = self._cur_computation._pulse_task.location
    else:
        client = ''

    return {
        'Client': client,
        'Pending': pending,
        'Nodes': list(nodes.keys()),
        'Servers': servers,
    }
def numel(x):
    """Return the number of elements described by ``x``.

    The dimensions are taken from, in order of preference, ``x.shape``,
    ``x.size`` or ``x`` itself when it is iterable, and multiplied
    together.  Anything else must provide the count as ``x.n``.

    An empty dimension sequence (a scalar shape ``()``) yields 1.
    """
    def product(dims):
        # 1 is the multiplicative identity: results for non-empty
        # dimensions are unchanged and an empty shape no longer makes
        # reduce() raise TypeError.
        return reduce(lambda a, b: a * b, dims, 1)

    if hasattr(x, 'shape'):
        return product(x.shape)
    if hasattr(x, 'size'):
        return product(x.size)
    if isinstance(x, Iterable):
        return product(x)
    return x.n
def resolve(self):
    """
    Resolve the entry point from its module and attrs.

    Imports ``self.module_name`` and follows ``self.attrs`` one attribute
    at a time. A missing attribute is reported as ImportError.
    """
    target = __import__(self.module_name, fromlist=['__name__'], level=0)
    try:
        for attr in self.attrs:
            target = getattr(target, attr)
    except AttributeError as exc:
        raise ImportError(str(exc))
    return target
def __invert__(self):
    """Return the combination of all members that share no bit with self.

    A member is kept when it is not part of the decomposition of this
    value and its value has no bit in common with this value.
    """
    cls = self.__class__
    members, uncovered = _decompose(cls, self._value_)

    inverted_members = [
        m for m in cls
        if not (m in members or m._value_ & self._value_)
    ]

    inverted = cls(0)
    for member in inverted_members:
        inverted = _or_(inverted, member)
    return cls(inverted)
def gather_nd(params, indices, shape):
    """Gather elements of ``params`` addressed by multi-dimensional indices.

    ``params`` is flattened and every index tuple is converted to a flat
    offset using the strides derived from ``shape``.

    Args:
        params: tensor to gather from.
        indices: tensor whose last axis holds one index per dimension of
            ``shape`` (it is moved to the front and unstacked).
        shape: sequence with the dimensions of ``params``.

    Returns:
        The gathered values.
    """
    rank = len(shape)
    flat_params = tf.reshape(params, [-1])

    def stride(axis):
        # Elements skipped when the index on `axis` grows by one.
        return reduce(lambda x, y: x*y, shape[axis+1:], 1)

    multipliers = [stride(axis) for axis in range(0, rank)]

    # Bring the index axis to the front so it can be unstacked per dimension.
    perm = [rank - 1] + list(range(0, rank - 1))
    indices_unpacked = tf.unstack(tf.transpose(indices, perm))

    flat_indices = sum([a*b for a, b in zip(multipliers, indices_unpacked)])
    return tf.gather(flat_params, flat_indices)


# ctc_label_dense_to_sparse is taken from https://github.com/tensorflow/tensorflow/issues/1742#issuecomment-205291527
#
# The CTC implementation in TensorFlow needs labels in a sparse representation,
# but sparse data and queues don't mix well, so we store padded tensors in the
# queue and convert to a sparse representation after dequeuing a batch.
#
def get_foreign(self, queryset, search, filters):
    """Filter ``queryset`` by a search string and an optional family.

    A row matches when ``search`` occurs (case-insensitively) in its code
    or in the name of any language listed in
    ``settings.LANGUAGES_DATABASES``.  The family is taken from the first
    of the known form keys in ``filters`` whose value is not None.

    Returns:
        The filtered queryset, cut to ``settings.LIMIT_FOREIGNKEY`` rows.
    """
    # Filter with search string
    conditions = [Q(code__icontains=search)]
    conditions.extend(
        Q(**{"{}__name__icontains".format(lang.lower()): search})
        for lang in settings.LANGUAGES_DATABASES
    )
    qs = queryset.filter(reduce(operator.or_, conditions))

    # First form key that carries a value wins.
    family = None
    for key in ('FeatureForm_family',
                'AttributeForm_family',
                'FeatureSpecialForm_family',
                'ProductForm_family',
                'ProductFormCreate_family',
                'ProductFormCreateCustom_family'):
        family = filters.get(key, None)
        if family is not None:
            break

    if family:
        qs = qs.filter(family__pk=family)
    return qs[:settings.LIMIT_FOREIGNKEY]


# ###########################################
def model_baseline(s_x_, s_pdpo_):
    '''very simple logistic regression model

    The input is flattened to (batch, idim), passed through one linear
    layer named 'm' and dropout, and finished with a softmax.  ``idim`` is
    the product of ``g_dataset.imsize`` and ``odim`` the number of entries
    in ``g_dataset.label_map``.
    '''
    global g_mdl, g_dataset
    s_bsize = T.shape(s_x_)[0]
    idim = reduce(int.__mul__, g_dataset.imsize)
    odim = len(g_dataset.label_map)

    s_flat = T.reshape(s_x_, (s_bsize, idim))
    s_linear = g_mdl.lyr_linear('m', s_flat, idim, odim)
    return T.nnet.softmax(g_mdl.op_dropout(s_linear, s_pdpo_))
def visit_Assign(self, t):
    """Emit code for an assignment: the value first, then every target.

    The code of the targets is folded left to right, each step prefixing
    ``op.DUP_TOP`` to the combination of the two parts.
    """
    value_code = self(t.value)
    targets_code = reduce(lambda left, right: op.DUP_TOP + left + right,
                          map(self, t.targets))
    return value_code + targets_code
def visit_BoolOp(self, t):
    """Emit code for a boolean operation over all operand values.

    The operands are folded left to right; each step places the jump
    selected from ``self.ops_bool`` for the operator type between the two
    parts, targeting a fresh label placed after the right-hand part.
    """
    jump_to = self.ops_bool[type(t.op)]

    def chain(so_far, operand):
        end = Label()
        return so_far + jump_to(end) + OffsetStack() + operand + end

    return reduce(chain, map(self, t.values))
def has_result_of_status(self, status, results):
    """Search entries of this inbound with the given status and results.

    An entry matches when its inbound equals this item's
    operation/inbound value, its inbound status id equals
    ``status.get_status_id()`` and its result equals any of ``results``.

    Returns:
        Whatever ``self.search_db`` returns for the combined query.
    """
    inbound_str = self.items["operation"]["inbound"]
    query = Query()

    any_result = reduce(or_, (query.result == a_result for a_result in results))
    same_inbound = query.inbound == inbound_str
    same_status = query.inbound_status_id == status.get_status_id()

    combined_query = reduce(and_, (same_inbound, same_status, any_result))
    return self.search_db(combined_query)
def get_result_summaries_by_results(self, results):
    """Search entries whose result equals any of ``results``.

    Returns:
        Whatever ``self.search_db`` returns for the OR-combined query.
    """
    query = Query()
    any_result = reduce(or_, (query.result == a_result for a_result in results))
    return self.search_db(any_result)