我们从Python开源项目中，提取了以下14个代码示例，用于说明如何使用toolz.concatv()。
def visit_enumeration(self, node, children):
    """Collect the AST nodes that make up an enumeration.

    The ``value`` of every ``integer`` child and then every ``symbol`` child
    is gathered; when at least one value exists, a single
    ``enumeration_values`` node holding them is emitted. Any ``interval``
    children are appended after it. ``node`` is not used here.

    Returns
    -------
    enumerations : list
        The ``enumeration_values`` node (if any) followed by the intervals
        (if any).
    """
    assert isinstance(children, list), children

    integer_nodes = find(children, type='integer')
    symbol_nodes = find(children, type='symbol')
    values = list(pluck('value', concatv(integer_nodes, symbol_nodes)))

    enumerations = []
    if values:
        enumerations.append(
            make_json_ast_node(type='enumeration_values', values=values)
        )

    intervals = find_many_or_none(children, type='interval')
    if intervals is not None:
        enumerations.extend(intervals)
    return enumerations
def test_id_macro_dataset(self):
    """Check that a macro dataset is broadcast to every asset.

    input (self.macro_df)
       asof_date  timestamp  value
    0 2014-01-01 2014-01-01      0
    3 2014-01-02 2014-01-02      1
    6 2014-01-03 2014-01-03      2

    output (expected):
                               value
    2014-01-01 Equity(65 [A])      0
               Equity(66 [B])      0
               Equity(67 [C])      0
    2014-01-02 Equity(65 [A])      1
               Equity(66 [B])      1
               Equity(67 [C])      1
    2014-01-03 Equity(65 [A])      2
               Equity(66 [B])      2
               Equity(67 [C])      2
    """
    asset_info = asset_infos[0][0]
    nassets = len(asset_info)

    with tmp_asset_finder() as finder:
        # Each macro value (0, 1, 2) appears once per asset, in date order.
        values = list(
            concatv(*[[value] * nassets for value in range(3)])
        )
        index = pd.MultiIndex.from_product((
            self.macro_df.timestamp,
            finder.retrieve_all(asset_info.index),
        ))
        expected = pd.DataFrame(values, index=index, columns=('value',))

        self._test_id(
            self.macro_df,
            self.macro_dshape,
            expected,
            finder,
            ('value',),
        )
def __new__(mcls, name, bases, dict_):
    """Create the class and validate constructors of direct ``ADT`` subclasses.

    When the first base is ``ADT``, the type variables and constructors
    recorded on ``dict_`` are copied onto the new class and every constructor
    argument type is checked.

    Raises
    ------
    TypeError
        If a constructor argument is a ``RecursiveType`` whose ``_name``
        differs from ``name``, or if a constructor argument is itself one of
        the class's constructors.
    """
    self = super().__new__(mcls, name, bases, dict_)
    if len(bases) and bases[0] is ADT:
        # ``dict_`` is expected to expose ``_typevars`` and ``_constructors``
        # as attributes (presumably a custom namespace object -- confirm
        # against the metaclass ``__prepare__``).
        self._typevars = dict_._typevars
        self._constructors = tuple(dict_._constructors.values())
        constructors = set(self._constructors)
        for constructor in constructors:
            # Positional and keyword argument types are checked together.
            types = concatv(
                constructor._args,
                constructor._kwargs.values(),
            )
            for t in types:
                if isinstance(t, RecursiveType) and t._name != name:
                    raise TypeError(
                        'recursive type name must be the same as the type'
                        ' name, %r != %r' % (
                            t._name,
                            name,
                        ),
                    )
                if t in constructors:
                    raise TypeError(
                        'constructor %r has arguments that are other'
                        ' constructors' % constructor,
                    )
        # NOTE(review): the nesting of this check was reconstructed from
        # whitespace-collapsed source; it is placed inside the ``ADT`` branch
        # because ``_typevars`` is only assigned there -- confirm.
        if not self._typevars:
            # With no type variables the result of ``adt(self, ())`` is
            # returned instead of the class itself.
            return adt(self, ())
    return self
def visit_infix_expression(node, operators={}):
    """Render an infix expression node as a parenthesised string.

    Operands and operators are interleaved (operand first). Operands are
    rendered through ``visit_node``; operators are looked up in ``operators``
    and fall back to themselves when absent. ``operators`` is only read,
    never mutated.

    For nodes of type ``'product_expression'`` the rendered operands joined
    by ``'and'``, followed by one more ``'and'``, are placed before the
    regular tokens.
    """
    def interleave(*iterables):
        # The UnboundLocalError class serves as the padding marker for the
        # shorter iterable; padded slots are dropped.
        padded_rows = itertools.zip_longest(
            *iterables, fillvalue=UnboundLocalError
        )
        for row in padded_rows:
            for position, item in enumerate(row):
                if item != UnboundLocalError:
                    yield position, item

    tokens = []
    for position, item in interleave(node['operands'], node['operators']):
        if position == 0:
            # Position 0 is the operands iterable.
            tokens.append(visit_node(item))
        else:
            tokens.append(operators.get(item, item))

    # Transform product expressions into a lazy "and" expression in order to
    # prevent a division by 0:
    if node['type'] == 'product_expression':
        guard = interpose(el='and', seq=map(visit_node, node['operands']))
        tokens = concatv(guard, ['and'], tokens)

    rendered = ' '.join(map(str, tokens))
    return '({})'.format(rendered)


# Main visitor
def generate_stencil_points(self):
    """Return a single iterable over all stencil points.

    The items of ``self._stencil_points`` come first, followed by the items
    of ``self._stencil_iter``; the two are chained with ``concatv``.
    """
    points = self._stencil_points
    point_iter = self._stencil_iter
    return concatv(points, point_iter)
def update_usage_search_locations(self, platform: str):
    '''Refresh the set of module contexts searched for usages.

    Call this whenever you load new modules or scripts.

    Only platforms whose lower-cased name starts with ``'python'`` are
    handled; for any other platform nothing happens. For a Python platform,
    the ``module_context`` of every module node and script node whose code
    element has a truthy ``path`` is stored, as a frozenset, on
    ``jedi_dump.JediCodeElementNode.usage_resolution_modules``.
    '''
    if not platform.lower().startswith('python'):
        return

    from . import jedi_dump

    module_nodes = self.module_nodes[platform].values()
    script_nodes = self.script_nodes[platform].values()
    contexts = frozenset(
        node.module_context
        for node in tz.concatv(module_nodes, script_nodes)
        if node.code_element.path
    )
    jedi_dump.JediCodeElementNode.usage_resolution_modules = contexts
def keyPressEvent(self, event): super().keyPressEvent(event) #ll.setFocus() if not event.isAccepted(): if event.key() == Qt.Qt.Key_Right: _next = self.next_call_list() if _next and _next.count() > 0: if all(future.done() for future in tz.concatv(self.populate_futures, self.add_next_futures)): _next.setFocus() self.map_widget.ensureWidgetVisible(_next, 0, 0) if self.next_call_list().currentItem() is None: self.next_call_list().setCurrentRow(0) else: wait_item = _next.item(0) wait_item.poked += 1 if wait_item.poked == 3: wait_item.setText('QUIT POKING ME') event.accept() if event.key() == Qt.Qt.Key_Left: _prev = self.prev_call_list() if _prev: _prev.setFocus() (self.map_widget.ensureWidgetVisible(_prev, 0, 0)) event.accept() if event.key() == Qt.Qt.Key_Space: pass
def append(self, other: 'LazyList[A]') -> 'LazyList[A]':
    """Return a copy of this list with the elements of ``other`` appended.

    Two callables are handed to ``self.copy``. The first chains, in order,
    its argument, ``self.strict``, ``other.source`` and ``other.strict``;
    the second ignores its argument and returns an empty ``List``.
    Presumably ``copy`` applies them to the lazy source and the strict part
    respectively -- confirm against ``copy``.
    """
    def chained_source(s):
        return concatv(s, self.strict, other.source, other.strict)

    def empty_strict(s):
        return List()

    return self.copy(chained_source, empty_strict)
def test_deltas_macro(self):
    """Run the pipeline on a macro dataset with deltas applied.

    The deltas are built from every row of ``self.macro_df`` except the
    last, with ``value`` raised by 10 and ``timestamp`` pushed one day
    later. The expected output holds 10 for every asset on the first
    expected date and 11 for every asset on the second.
    """
    asset_info = asset_infos[0][0]
    nassets = len(asset_info)

    expr = bz.data(self.macro_df, name='expr', dshape=self.macro_dshape)
    raw_deltas = bz.data(
        self.macro_df.iloc[:-1],
        name='deltas',
        dshape=self.macro_dshape,
    )
    deltas = bz.transform(
        raw_deltas,
        value=raw_deltas.value + 10,
        timestamp=raw_deltas.timestamp + timedelta(days=1),
    )

    views_by_date = {
        '2014-01-02': repeat_last_axis(np.array([10.0, 1.0]), nassets),
        '2014-01-03': repeat_last_axis(np.array([11.0, 2.0]), nassets),
    }
    expected_views = keymap(pd.Timestamp, views_by_date)

    with tmp_asset_finder(equities=asset_info) as finder:
        values = list(concatv([10] * nassets, [11] * nassets))
        index = pd.MultiIndex.from_product((
            sorted(expected_views.keys()),
            finder.retrieve_all(asset_info.index),
        ))
        expected_output = pd.DataFrame(
            values,
            index=index,
            columns=('value',),
        )

        dates = self.dates
        self._run_pipeline(
            expr,
            deltas,
            expected_views,
            expected_output,
            finder,
            calendar=dates,
            start=dates[1],
            end=dates[-1],
            window_length=2,
            compute_fn=np.nanmax,
        )
def test_novel_deltas_macro(self):
    """Run the pipeline with deltas on a calendar that skips two days.

    The baseline has values 0 and 1 on 2014-01-01 and 2014-01-04. The deltas
    reuse the baseline with ``value`` raised by 10 and ``timestamp`` pushed
    one day later. The calendar omits 2014-01-04 and 2014-01-05.
    """
    asset_info = asset_infos[0][0]
    nassets = len(asset_info)

    base_dates = pd.DatetimeIndex([
        pd.Timestamp('2014-01-01'),
        pd.Timestamp('2014-01-04')
    ])
    baseline = pd.DataFrame({
        'value': (0, 1),
        'asof_date': base_dates,
        'timestamp': base_dates,
    })

    expr = bz.data(baseline, name='expr', dshape=self.macro_dshape)
    raw_deltas = bz.data(baseline, name='deltas', dshape=self.macro_dshape)
    deltas = bz.transform(
        raw_deltas,
        value=raw_deltas.value + 10,
        timestamp=raw_deltas.timestamp + timedelta(days=1),
    )

    views_by_date = {
        '2014-01-03': repeat_last_axis(
            np.array([10.0, 10.0, 10.0]),
            nassets,
        ),
        '2014-01-06': repeat_last_axis(
            np.array([10.0, 10.0, 11.0]),
            nassets,
        ),
    }
    expected_views = keymap(pd.Timestamp, views_by_date)

    cal = pd.DatetimeIndex([
        pd.Timestamp('2014-01-01'),
        pd.Timestamp('2014-01-02'),
        pd.Timestamp('2014-01-03'),
        # omitting the 4th and 5th to simulate a weekend
        pd.Timestamp('2014-01-06'),
    ])

    with tmp_asset_finder(equities=asset_info) as finder:
        values = list(concatv([10] * nassets, [11] * nassets))
        index = pd.MultiIndex.from_product((
            sorted(expected_views.keys()),
            finder.retrieve_all(asset_info.index),
        ))
        expected_output = pd.DataFrame(
            values,
            index=index,
            columns=('value',),
        )

        self._run_pipeline(
            expr,
            deltas,
            expected_views,
            expected_output,
            finder,
            calendar=cal,
            start=cal[2],
            end=cal[-1],
            window_length=3,
            compute_fn=op.itemgetter(-1),
        )
def summary(feature_names, features, **labels):
    """Summarize the data we are about to train with.

    Each label array and each feature slice is described by its mean,
    standard deviation, minimum and maximum. Labels are listed in sorted
    name order; features follow the order of ``feature_names``.

    Parameters
    ----------
    feature_names : iterable[str]
        The names of the features in the ``features`` array.
    features : np.ndarray
        The 3d feature array. Feature ``ix`` is read as ``features[..., ix]``.
    **labels
        The named label arrays.

    Returns
    -------
    summary : str
        A summary of the features and labels.
    """
    # NOTE(review): this block was reconstructed from whitespace-collapsed
    # source. The line breaks and field indentation inside the template
    # below, and the number of leading spaces in the other string literals
    # in this function, could not be recovered -- confirm against the
    # original file before relying on the exact output layout.
    single_attribute_template = dedent(
        """\
        {name}:
          mean: {mean}
          std: {std}
          min: {min}
          max: {max}""",
    )

    def format_attribute(name, values):
        # Render one attribute and prefix every rendered line with the same
        # indent so it nests under its section header.
        return ' ' + '\n '.join(
            single_attribute_template.format(
                name=name,
                mean=values.mean(),
                std=values.std(),
                min=values.min(),
                max=values.max(),
            ).splitlines(),
        )

    return '\n'.join(concatv(
        (
            'summary:',
            ' labels:',
        ),
        (
            format_attribute(name, value)
            for name, value in sorted(labels.items(), key=first)
        ),
        (
            'features:',
        ),
        (
            # The last axis of ``features`` indexes the individual features.
            format_attribute(name, features[..., ix])
            for ix, name in enumerate(feature_names)
        )
    ))
def load_extensions(default, extensions, strict, environ, reload=False):
    """Load all of the given extensions.

    This should be called by run_algo or the cli.

    Parameters
    ----------
    default : bool
        Load the default extension (~/.catalyst/extension.py)?
    extensions : iterable[str]
        The paths to the extensions to load. If the path ends in ``.py`` it
        is treated as a script and executed. If it does not end in ``.py``
        it is treated as a module to be imported.
    strict : bool
        Should failure to load an extension raise. If this is false it will
        still warn.
    environ : mapping
        The environment to use to find the default extension path.
    reload : bool, optional
        Reload any extensions that have already been loaded.
    """
    if default:
        default_path = pth.default_extension(environ=environ)
        pth.ensure_file(default_path)
        # The default extension is loaded first so that the remaining
        # extensions can rely on the load order.
        extensions = concatv([default_path], extensions)

    for ext in extensions:
        already_loaded = ext in _loaded_extensions
        if already_loaded and not reload:
            continue

        try:
            # ``.py`` paths are executed as scripts; anything else is
            # imported as a module.
            if ext.endswith('.py'):
                run_path(ext, run_name='<extension>')
            else:
                __import__(ext)
        except Exception as error:
            if not strict:
                # Non-strict mode reports the failure and moves on without
                # recording the extension as loaded.
                warnings.warn(
                    'Failed to load extension: %r\n%s' % (ext, error),
                    stacklevel=2
                )
                continue
            # Strict mode surfaces the original exception unchanged.
            raise

        _loaded_extensions.add(ext)
def test_deltas_macro(self):
    """Run the pipeline on a macro dataset with deltas applied.

    The deltas are built from every row of ``self.macro_df`` except the
    last, with ``value`` raised by 10 and ``timestamp`` pushed one day
    later. ``None`` is passed as the third positional argument of
    ``_run_pipeline`` (presumably "no checkpoints" -- confirm against
    ``_run_pipeline``).
    """
    nassets = len(simple_asset_info)

    expr = bz.data(self.macro_df, name='expr', dshape=self.macro_dshape)
    raw_deltas = bz.data(
        self.macro_df.iloc[:-1],
        name='deltas',
        dshape=self.macro_dshape,
    )
    deltas = bz.transform(
        raw_deltas,
        value=raw_deltas.value + 10,
        timestamp=raw_deltas.timestamp + timedelta(days=1),
    )

    views_by_date = {
        '2014-01-02': np.array([[10.0], [1.0]]),
        '2014-01-03': np.array([[11.0], [2.0]]),
    }
    expected_views = keymap(pd.Timestamp, views_by_date)

    with tmp_asset_finder(equities=simple_asset_info) as finder:
        values = list(concatv([10] * nassets, [11] * nassets))
        index = pd.MultiIndex.from_product((
            sorted(expected_views.keys()),
            finder.retrieve_all(simple_asset_info.index),
        ))
        expected_output = pd.DataFrame(
            values,
            index=index,
            columns=('value',),
        )

        dates = self.dates
        self._run_pipeline(
            expr,
            deltas,
            None,
            expected_views,
            expected_output,
            finder,
            calendar=dates,
            start=dates[1],
            end=dates[-1],
            window_length=2,
            compute_fn=np.nanmax,
        )
def _test_checkpoints_macro(self, checkpoints, ffilled_value=-1.0):
    """Simple checkpoints test for macro datasets.

    The underlying data has value -1.0 on 2014-01-01 and 1.0 on 2014-01-04.
    The pipeline is run from 2014-01-03 to 2014-01-04 with a window length
    of 1.

    Parameters
    ----------
    checkpoints : pd.DataFrame
        The checkpoints data.
    ffilled_value : float, optional
        The value expected to be read on 2014-01-03. If not provided, it is
        the base-data value that would naturally be forward filled there.
    """
    first_day = pd.Timestamp('2014-01-01')
    last_day = pd.Timestamp('2014-01-04')
    dates = first_day, last_day
    baseline = pd.DataFrame({
        'value': [-1.0, 1.0],
        'asof_date': dates,
        'timestamp': dates,
    })

    nassets = len(simple_asset_info)
    views_by_date = {
        '2014-01-03': np.array([[ffilled_value]]),
        '2014-01-04': np.array([[1.0]]),
    }
    expected_views = keymap(pd.Timestamp, views_by_date)

    with tmp_asset_finder(equities=simple_asset_info) as finder:
        values = list(concatv([ffilled_value] * nassets, [1.0] * nassets))
        index = pd.MultiIndex.from_product((
            sorted(expected_views.keys()),
            finder.retrieve_all(simple_asset_info.index),
        ))
        expected_output = pd.DataFrame(
            values,
            index=index,
            columns=('value',),
        )

        baseline_expr = bz.data(
            baseline,
            name='expr',
            dshape=self.macro_dshape,
        )
        checkpoints_expr = bz.data(
            checkpoints,
            name='expr_checkpoints',
            dshape=self.macro_dshape,
        )
        self._run_pipeline(
            baseline_expr,
            None,
            checkpoints_expr,
            expected_views,
            expected_output,
            finder,
            calendar=pd.date_range('2014-01-01', '2014-01-04'),
            start=pd.Timestamp('2014-01-03'),
            end=dates[-1],
            window_length=1,
            compute_fn=op.itemgetter(-1),
        )