我们从 Python 开源项目中提取了以下 12 个代码示例,用于说明如何使用 toolz.compose()。
def doctable(ctx):
    """Render the flight-options CSV as a formatted table in a Word document.

    Reads ``./docs/flight-options.csv``, opens
    ``./docs/style-reference.docx``, inserts ``./docs/diagrams_002.png``
    sized to the width between the first section's margins, appends the
    table through ``add_table`` with per-column formatters, and saves the
    result as ``./docs/test.docx``.

    ``ctx`` is accepted but not used by this function.
    """
    flights = pd.read_csv('./docs/flight-options.csv')

    # Start from an existing document rather than a blank one.
    document = docx.Document('./docs/style-reference.docx')

    whole_number = partial(format_decimal, format='#')
    dollars = partial(format_currency, currency='USD')

    # Usable width: page width minus the left and right margins.
    first_section = document.sections[0]
    usable_width = (
        first_section.page_width
        - first_section.left_margin
        - first_section.right_margin
    )
    document.add_picture('./docs/diagrams_002.png', width=usable_width)

    # One formatter per column name; 'selected' is coerced to int and then
    # looked up in a 0/1 -> No/Yes mapping.
    column_formatters = {
        'ticket_price': dollars,
        'total_hours': whole_number,
        'trip': whole_number,
        'airline': partial(shorten_long_name, width=20),
        'selected': compose({0: 'No', 1: 'Yes'}.get, int),
    }
    add_table(
        flights,
        document,
        table_style='Plain Table 3',
        formatters=column_formatters,
    )

    # Write the finished document to disk.
    document.save('./docs/test.docx')
def matches(self, pattern):
    """
    Elementwise regex match.

    Parameters
    ----------
    pattern : str or compiled regex
        Anything that does not already expose a ``match`` attribute (for
        example a plain pattern string) is compiled with ``re.compile``
        first; objects that do are used as-is.

    Returns
    -------
    matches : np.ndarray[bool]
        An array with the same shape as self indicating whether each
        element of self was matched by ``pattern``.
    """
    import re

    # The documented contract accepts raw pattern strings, which have no
    # ``match`` method of their own, so compile them before use.
    if not hasattr(pattern, 'match'):
        pattern = re.compile(pattern)

    match = pattern.match
    # ``match`` returns a match object or None; collapse that to a bool.
    return self.map_predicate(lambda element: bool(match(element)))

# These types all implement an O(N) __contains__, so pre-emptively
# coerce to `set`.
def create_influxdb_writer(influxdb_client, series_name="gpu_measurements", **tags):
    """
    Build a callable that writes measurement lines to influxdb.

    The write pipeline runs ``parse_line`` first, then ``_to_json_dict``,
    then (only when tags were given) the tagger from ``_add_tags``, and
    finally the influxdb writer for ``series_name``.

    Parameters
    ----------
    influxdb_client:
        Client handed to ``_influxdb_writer_for``.
    series_name: (str)
        Series name handed to ``_influxdb_writer_for``.
    tags:
        Extra tags to be added to the measurements.

    Returns
    -------
    The pipeline wrapped by ``_call_when`` together with a predicate that
    is true for inputs that are not None and contain no ``'#'``
    (presumably the pipeline only runs when the predicate holds -- confirm
    against ``_call_when``).
    """
    sink = _influxdb_writer_for(influxdb_client, series_name)

    if not tags:
        logger.debug('Creating writer')
        stages = (sink, _to_json_dict, parse_line)
    else:
        logger.debug('Creating writer with tags')
        stages = (sink, _add_tags(tags), _to_json_dict, parse_line)

    # compose applies right-to-left, so parse_line sees the raw input.
    pipeline = compose(*stages)

    def _is_writable(line):
        return line is not None and '#' not in line

    return _call_when(pipeline, _is_writable)
def test_quantiles_uneven_buckets(self):
    """Check masked quantiles when the bin count (3 or 7) differs from
    the five columns in each row.

    Input rows and expected rows go through the same ``permute_rows``
    shuffle (argument 5) so that expectations line up with the input.
    """
    shape = (5, 5)
    shuffle = partial(permute_rows, 5)

    factor_data = shuffle(log1p(arange(25, dtype=float).reshape(shape)))
    mask_data = shuffle(self.eye_mask(shape=shape))

    factor = F()
    mask_term = Mask()

    # Build an int64 array from a literal, then apply the same shuffle.
    shuffled_ints = compose(shuffle, partial(array, dtype=int64_dtype))

    terms = {
        '3_masked': factor.quantiles(bins=3, mask=mask_term),
        '7_masked': factor.quantiles(bins=7, mask=mask_term),
    }
    workspace = {
        factor: factor_data,
        mask_term: mask_data,
    }
    # Before shuffling, every expected row carries -1 on the diagonal.
    expected = {
        '3_masked': shuffled_ints([
            [-1, 0, 0, 1, 2],
            [0, -1, 0, 1, 2],
            [0, 0, -1, 1, 2],
            [0, 0, 1, -1, 2],
            [0, 0, 1, 2, -1],
        ]),
        '7_masked': shuffled_ints([
            [-1, 0, 2, 4, 6],
            [0, -1, 2, 4, 6],
            [0, 2, -1, 4, 6],
            [0, 2, 4, -1, 6],
            [0, 2, 4, 6, -1],
        ]),
    }
    overall_mask = self.build_mask(self.ones_mask(shape=shape))

    self.check_terms(
        terms=terms,
        initial_workspace=workspace,
        expected=expected,
        mask=overall_mask,
    )
def __call__(self, *args, **kwargs):
    """Build a contract from the arguments and wrap it in ``ContractSugar``.

    ``self.contract`` is called with the given arguments. When the result
    is a class, a composed callable is returned that first calls the class
    (creating the instance) and then wraps that instance; otherwise the
    result is wrapped immediately.
    """
    contract = self.contract(*args, **kwargs)
    # isinstance, unlike ``type(contract) == type``, also recognises
    # classes created by a custom metaclass (e.g. ABCMeta).
    if isinstance(contract, type):
        # contract still needs to be initialized, wrap afterwards
        return compose(ContractSugar, contract)
    return ContractSugar(contract)
def format_results(terminal_width, key_list, separator, text_list,
                   left_align=True, min_factor=3, **kwargs):
    """Yield formatted result lines laid out in two columns.

    Parameters
    ----------
    terminal_width : int
        Total width available. A falsy value disables both key-column
        clamping and text wrapping.
    key_list : sequence of str
        Entries for the first column.
    separator : str
        Text placed between the two columns.
    text_list : sequence of str
        Entries for the second column, paired with ``key_list`` by
        position.
    left_align : bool
        Left-justify keys when true, right-justify otherwise.
    min_factor : int
        Text is wrapped only when ``text_width * min_factor`` exceeds
        ``terminal_width``.
    **kwargs
        Forwarded to ``textwrap.wrap``.

    Yields
    ------
    str
        One formatted line per key, or two when the key is wider than the
        key column: the key on its own line, then the text on the next.
        Nothing is yielded when ``key_list`` is empty.
    """
    key_lengths = [len(key) for key in key_list]
    # max() raises ValueError on an empty sequence; with no keys there is
    # simply nothing to lay out.
    key_width = max(key_lengths) if key_lengths else 0
    separator_length = len(separator)

    wrap_width = None
    continuation = ''
    if terminal_width:
        # Keys taking more than half of the line are clamped; longer keys
        # then get a line of their own below.
        if key_width / terminal_width > .5:
            key_width = terminal_width // 2 - 3
        text_width = terminal_width - key_width - separator_length
        if text_width * min_factor > terminal_width:
            wrap_width = text_width
            # Continuation lines are indented to start under the text column.
            continuation = '\n' + ' ' * (key_width + separator_length)

    if left_align:
        fmt = '%-*s%s%s'
    else:
        fmt = '%*s%s%s'

    for key, text in zip(key_list, text_list):
        if wrap_width is not None:
            text = continuation.join(
                textwrap.wrap(text, width=wrap_width, **kwargs)
            )
        if len(key) > key_width:
            yield fmt % (key_width, key, separator, '')
            yield fmt % (key_width, '', ' ' * separator_length, text)
        else:
            yield fmt % (key_width, key, separator, text)
def transform_paragraphs(notebook, transformations=None):
    """Apply each transformation, in order, to every paragraph of a notebook.

    Parameters
    ----------
    notebook : dict
        Mapping whose ``"paragraphs"`` entry (treated as empty when
        missing) holds the paragraphs. It is modified in place.
    transformations : iterable of callables, optional
        One-argument functions applied left to right to each paragraph.
        Any iterable is accepted, including one-shot generators.

    Returns
    -------
    dict
        The same ``notebook`` object. When no transformations are given it
        is returned untouched; otherwise ``notebook["paragraphs"]`` is
        replaced by the list of transformed paragraphs.
    """
    # Materialise once: the steps are reused for every paragraph, and a
    # generator is always truthy even when it would produce nothing.
    steps = list(transformations) if transformations else []
    if not steps:
        return notebook

    def apply_steps(paragraph):
        for step in steps:
            paragraph = step(paragraph)
        return paragraph

    notebook["paragraphs"] = [
        apply_steps(paragraph) for paragraph in notebook.get("paragraphs", [])
    ]
    return notebook
def test_quantiles_unmasked(self, seed):
    """Check quantiles for 2, 3 and 6 bins on a 6x6 input, with no
    per-term mask passed to ``quantiles``.
    """
    shape = (6, 6)
    shuffle = partial(permute_rows, seed)

    # The input rows are shuffled so the result cannot depend on row
    # order, and passed through log1p so it cannot depend on linear
    # scaling or on the inputs being integral.
    factor_data = shuffle(log1p(arange(36, dtype=float).reshape(shape)))

    factor = self.f

    # Expectations get the very same shuffle as the input rows, which
    # keeps the literal arrays readable while still exercising a range of
    # input orderings.
    shuffled_ints = compose(shuffle, partial(array, dtype=int64_dtype))

    terms = {
        '2': factor.quantiles(bins=2),
        '3': factor.quantiles(bins=3),
        '6': factor.quantiles(bins=6),
    }
    workspace = {
        factor: factor_data,
    }
    expected = {
        # Values increase along each input row, so the first half of a
        # row lands in the bottom bucket and the second half in the top.
        '2': shuffled_ints([[0, 0, 0, 1, 1, 1]] * 6),
        # Same idea with three buckets.
        '3': shuffled_ints([[0, 0, 1, 1, 2, 2]] * 6),
        # Limiting case: every column gets its own bucket.
        '6': shuffled_ints([[0, 1, 2, 3, 4, 5]] * 6),
    }
    overall_mask = self.build_mask(self.ones_mask(shape=shape))

    self.check_terms(
        terms=terms,
        initial_workspace=workspace,
        expected=expected,
        mask=overall_mask,
    )
def expect_types(__funcname=_qualified_name, **named):
    """
    Preprocessing decorator that checks argument types before the call.

    Each keyword maps an argument name to a type or a tuple of types.

    Examples
    --------
    >>> @expect_types(x=int, y=str)
    ... def foo(x, y):
    ...    return x, y
    ...
    >>> foo(2, '3')
    (2, '3')
    >>> foo(2.0, '3')  # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS
    Traceback (most recent call last):
       ...
    TypeError: ...foo() expected a value of type int for argument 'x',
    but got float instead.

    Notes
    -----
    Passing the special argument ``__funcname`` as a string overrides the
    function name shown in error messages. This is mostly useful on
    __init__ or __new__ methods, so that errors mention the class name
    rather than the function name.

    Raises
    ------
    TypeError
        Immediately, if any expected value in ``named`` is neither a type
        nor a tuple.
    """
    for arg_name, expected in named.items():
        if isinstance(expected, (type, tuple)):
            continue
        raise TypeError(
            "expect_types() expected a type or tuple of types for "
            "argument '{name}', but got {type_} instead.".format(
                name=arg_name,
                type_=expected,
            )
        )

    def _build_check(expected):
        # A tuple of types is rendered as "A or B"; a single type by name.
        if isinstance(expected, tuple):
            described = ' or '.join(_qualified_name(t) for t in expected)
        else:
            described = _qualified_name(expected)

        # Only {type_or_types} is filled in here; the %(...)s fields stay
        # in the template that is handed to make_check.
        message = (
            "%(funcname)s() expected a value of type {type_or_types} "
            "for argument '%(argname)s', but got %(actual)s instead."
        ).format(type_or_types=described)

        def _has_wrong_type(value):
            return not isinstance(value, expected)

        return make_check(
            exc_type=TypeError,
            template=message,
            pred=_has_wrong_type,
            actual=compose(_qualified_name, type),
            funcname=__funcname,
        )

    checks = {arg_name: _build_check(expected)
              for arg_name, expected in named.items()}
    return preprocess(**checks)
def test_top_and_bottom_with_groupby_and_mask(self, dtype, seed):
    """Check ``top(2)`` and ``bottom(2)`` grouped by a classifier on an
    8x8 input, under a mask built from a rotated ``eye_mask``.
    """
    shuffle = partial(permute_rows, seed)
    # Defaults to int64; the boolean expectations below override dtype.
    shuffled_array = compose(shuffle, partial(array, dtype=int64_dtype))

    shape = (8, 8)

    # Rows are shuffled so that picking the top values cannot rely on
    # the order of the input.
    factor_data = shuffle(arange(0, 64, dtype=dtype).reshape(shape))
    classifier_data = shuffled_array([
        [0, 0, 1, 1, 2, 2, 0, 0],
        [0, 0, 1, 1, 2, 2, 0, 0],
        [0, 1, 2, 3, 0, 1, 2, 3],
        [0, 1, 2, 3, 0, 1, 2, 3],
        [0, 0, 0, 0, 1, 1, 1, 1],
        [0, 0, 0, 0, 1, 1, 1, 1],
        [0, 0, 0, 0, 0, 0, 0, 0],
        [0, 0, 0, 0, 0, 0, 0, 0],
    ])

    factor = self.f
    classifier = self.c

    terms = {
        'top2': factor.top(2, groupby=classifier),
        'bottom2': factor.bottom(2, groupby=classifier),
    }
    workspace = {
        factor: factor_data,
        classifier: classifier_data,
    }
    expected = {
        # Should be the rightmost two entries in classifier_data,
        # ignoring the off-diagonal.
        'top2': shuffled_array([
            [0, 1, 1, 1, 1, 1, 1, 0],
            [0, 1, 1, 1, 1, 1, 0, 1],
            [1, 1, 1, 1, 1, 0, 1, 1],
            [1, 1, 1, 1, 0, 1, 1, 1],
            [0, 1, 1, 0, 0, 0, 1, 1],
            [0, 1, 0, 1, 0, 0, 1, 1],
            [0, 0, 0, 0, 0, 0, 1, 1],
            [0, 0, 0, 0, 0, 0, 1, 1],
        ], dtype=bool),
        # The comment carried over from the top2 case says "rightmost two
        # entries in classifier_data, ignoring the off-diagonal".
        # NOTE(review): for bottom this presumably means the leftmost
        # two -- confirm.
        'bottom2': shuffled_array([
            [1, 1, 1, 1, 1, 1, 0, 0],
            [1, 1, 1, 1, 1, 1, 0, 0],
            [1, 1, 1, 1, 1, 0, 1, 1],
            [1, 1, 1, 1, 0, 1, 1, 1],
            [1, 1, 0, 0, 1, 1, 0, 0],
            [1, 1, 0, 0, 1, 1, 0, 0],
            [1, 0, 1, 0, 0, 0, 0, 0],
            [0, 1, 1, 0, 0, 0, 0, 0],
        ], dtype=bool),
    }
    overall_mask = self.build_mask(
        shuffle(rot90(self.eye_mask(shape=shape)))
    )

    self.check_terms(
        terms=terms,
        initial_workspace=workspace,
        expected=expected,
        mask=overall_mask,
    )