我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 itertools.starmap()。
def elements(self):
    '''Return an iterator that yields every element once per unit of its count.

    >>> c = Counter('ABCABC')
    >>> sorted(c.elements())
    ['A', 'A', 'B', 'B', 'C', 'C']

    # Knuth's example for prime factors of 1836:  2**2 * 3**3 * 17**1
    >>> prime_factors = Counter({2: 2, 3: 3, 17: 1})
    >>> product = 1
    >>> for factor in prime_factors.elements():     # loop over factors
    ...     product *= factor                       # and multiply them
    >>> product
    1836

    Elements whose count is zero or negative produce no output at all.

    '''
    # Emulate Bag.do from Smalltalk and Multiset.begin from C++.
    # Each (element, count) item is expanded to repeat(element, count);
    # repeat() is empty for counts <= 0, which is what drops those elements.
    per_element_runs = _starmap(_repeat, self.items())
    return _chain.from_iterable(per_element_runs)

# Override dict methods where necessary
def elements(self):
    '''Iterate over the stored elements, emitting each one `count` times.

    >>> c = Counter('ABCABC')
    >>> sorted(c.elements())
    ['A', 'A', 'B', 'B', 'C', 'C']

    # Knuth's example for prime factors of 1836:  2**2 * 3**3 * 17**1
    >>> prime_factors = Counter({2: 2, 3: 3, 17: 1})
    >>> product = 1
    >>> for factor in prime_factors.elements():     # loop over factors
    ...     product *= factor                       # and multiply them
    >>> product
    1836

    An element whose count was set to zero or to a negative number is
    skipped by elements().

    '''
    # Emulate Bag.do from Smalltalk and Multiset.begin from C++.
    counted_items = self.iteritems()
    # repeat(element, count) for every item, flattened into one stream.
    runs = _starmap(_repeat, counted_items)
    return _chain.from_iterable(runs)

# Override dict methods where necessary
def _readBitwiseImageData(bitmapObject, name, attrs, content, ttFont):
    """Fill ``bitmapObject`` from the textual ``row`` elements found in ``content``.

    ``attrs`` supplies the ``bitDepth``, ``width`` and ``height`` values (each
    passed through ``safeEval``).  Every tuple element of ``content`` whose
    tag is ``'row'`` contributes one data row, built from the characters of
    its ``value`` attribute.  ``name`` and ``ttFont`` are not used here.
    """
    # ASCII -> binary digit.  Space, period and '0' mean a 0 bit; any other
    # character is treated as a 1 bit.
    zeroChars = {' ': '0', '.': '0', '0': '0'}

    bitDepth = safeEval(attrs['bitDepth'])
    metrics = SmallGlyphMetrics()
    metrics.width = safeEval(attrs['width'])
    metrics.height = safeEval(attrs['height'])

    dataRows = []
    for element in content:
        # Non-tuple entries are skipped (presumably whitespace between tags).
        if not isinstance(element, tuple):
            continue
        tag, rowAttrs, _children = element
        if tag != 'row':
            continue
        bits = strjoin(zeroChars.get(char, '1') for char in rowAttrs['value'])
        dataRows.append(_binary2data(bits))

    bitmapObject.setRows(dataRows, bitDepth=bitDepth, metrics=metrics, reverseBytes=True)
def __call__(self,data): if len(data) < 2: if 1 == len(data): return data[:] return [] if len(data) != self.n: self.__del__() self.n = len(data) size = c_ulong(self.n) self.workspace = real_workspace_alloc(size) self.wavetable = real_wavetable_alloc(size) a = array('d',data) # need a copy of the data real_transform(ADDRESS(a),1,self.n,self.wavetable,self.workspace) rv = [complex(a[0]),] rv.extend(itertools.starmap(complex,grouper(2,a[1:],fillvalue=0))) return rv
def pbkdf2_bin(data, salt, iterations=1000, keylen=24, hashfunc=None):
    """Returns a binary digest for the PBKDF2 hash algorithm of `data`
    with the given `salt`.  It iterates `iterations` time and produces a
    key of `keylen` bytes.  By default SHA-1 is used as hash function,
    a different hashlib `hashfunc` can be provided.

    :param data: the secret; used as the HMAC key.
    :param salt: the salt, prepended to the packed block index.
    :param iterations: number of HMAC rounds per output block.
    :param keylen: length of the returned key.
    :param hashfunc: hashlib constructor, defaults to ``hashlib.sha1``.
    :return: the derived key, truncated to `keylen`.
    """
    hashfunc = hashfunc or hashlib.sha1
    mac = hmac.new(data, None, hashfunc)

    def _pseudorandom(x, mac=mac):
        # Copy the keyed HMAC so the key setup is done only once.
        h = mac.copy()
        h.update(x)
        return map(ord, h.digest())

    buf = []
    # -(-a // b) is ceiling division: enough blocks to cover `keylen`.
    for block in xrange(1, -(-keylen // mac.digest_size) + 1):
        rv = u = _pseudorandom(salt + _pack_int(block))
        for _ in xrange(iterations - 1):
            u = _pseudorandom(''.join(map(chr, u)))
            # Materialise the running XOR on every round.  Leaving it as a
            # lazy starmap/izip would nest one iterator pair per round, keep
            # every intermediate `u` alive, and only evaluate the whole
            # chain (to a depth proportional to `iterations`) at extend().
            rv = list(starmap(xor, izip(rv, u)))
        buf.extend(rv)
    return ''.join(map(chr, buf))[:keylen]
def test_join(self):
    """Check that join() pairs up rows of two sequences sharing a key.

    ``names`` is keyed on its first field and ``fruit`` on its second; only
    keys 1 and 2 occur in both, so the row for 3 must not show up.  The
    second call passes ``no_default2`` for both defaults and is expected to
    give the same result.
    """
    names = [(1, 'one'), (2, 'two'), (3, 'three')]
    fruit = [('apple', 1), ('orange', 1), ('banana', 2), ('coconut', 2)]

    expected = {
        (1, 'one', 'apple', 1),
        (1, 'one', 'orange', 1),
        (2, 'two', 'banana', 2),
        (2, 'two', 'coconut', 2),
    }

    # Each joined pair is merged into one flat tuple via add(left, right)
    # (presumably operator.add, i.e. tuple concatenation).
    result = set(starmap(add, join(first, names, second, fruit)))
    assert result == expected

    result = set(starmap(add, join(first, names, second, fruit,
                                   left_default=no_default2,
                                   right_default=no_default2)))
    assert result == expected
def pbkdf2_bin(data, salt, iterations=1000, key_len=24, hash_func=None):
    """Returns a binary digest for the PBKDF2 hash algorithm of `data`
    with the given `salt`.  It iterates `iterations` time and produces a
    key of `key_len` bytes.  By default SHA-1 is used as hash function,
    a different hashlib `hash_func` can be provided.

    :param data: the secret as bytes; used as the HMAC key.
    :param salt: the salt as bytes, prepended to the packed block index.
    :param iterations: number of HMAC rounds per output block.
    :param key_len: number of bytes to return.
    :param hash_func: hashlib constructor, defaults to ``hashlib.sha1``.
    :return: ``bytes`` of length at most `key_len`.
    """
    hash_func = hash_func or hashlib.sha1
    mac = hmac.new(data, None, hash_func)

    def _pseudo_random(x):
        # Copy the keyed HMAC so the key setup is done only once.
        h = mac.copy()
        h.update(x)
        return h.digest()

    buf = bytearray()
    # -(-a // b) is ceiling division: enough blocks to cover `key_len`.
    for block in range(1, -(-key_len // mac.digest_size) + 1):
        rv = u = _pseudo_random(salt + _pack_int(block))
        for _ in range(iterations - 1):
            u = _pseudo_random(u)
            # Materialise the running XOR on every round.  Leaving it as a
            # lazy starmap/zip would nest one iterator pair per round, keep
            # every intermediate digest alive, and only evaluate the whole
            # chain (to a depth proportional to `iterations`) at extend().
            rv = bytes(starmap(xor, zip(rv, u)))
        buf.extend(rv)
    return bytes(buf[:key_len])
async def _show_question(self, n):
    """Send an embed that presents the current question as question ``n``.

    Stores the question's choices on ``self._choices``.  The list of
    possible answers is only added to the embed when the question type is
    not ``'boolean'``.
    """
    question = self._current_question
    self._choices = question.choices

    leader = self.leader
    # With no leader the footer reads "Current leader: None".
    leader_text = f'{leader[0]} with {leader[1]} points' if leader else None

    is_tf = question.type == 'boolean'
    tf_header = '**True or False**\n' if is_tf else ''
    question_field = f'{tf_header}{question.question}'

    embed = (discord.Embed(description=self.category.description,
                           colour=random.randint(0, 0xFFFFFF))
             .set_author(name=self.category.name)
             .add_field(name='Category', value=question.category, inline=False)
             .add_field(name=f'Question #{n}', value=question_field, inline=False)
             .set_footer(text=f'Current leader: {leader_text}')
             )

    if not is_tf:
        # Number the choices from 1, one per line.
        possible_answers = '\n'.join(
            itertools.starmap('{0}. {1}'.format, enumerate(self._choices, 1))
        )
        embed.add_field(name='Possible answers', value=possible_answers, inline=True)

    await self.ctx.send(embed=embed)
def __init__(self, ctx, opponent):
    """Set up a Tic-Tac-Toe session between ``ctx.author`` and ``opponent``.

    The board size is read from ``ctx._ttt_size``.  Tiles and the order of
    the two players are both chosen at random, and the embed used as the
    game screen is prepared up front.
    """
    size = ctx._ttt_size

    self.ctx = ctx
    self.board = Board(size)
    self.opponent = opponent

    # First draw decides who gets X and who gets O, second draw shuffles
    # the order of the two resulting players.
    tiles = random.sample((Tile.X, Tile.O), 2)
    seats = [Player(user, tile) for user, tile in zip((ctx.author, opponent), tiles)]
    self.players = random.sample(seats, 2)

    self._current = None
    self._runner = None

    instructions = ('Type the column and the row in the format `column row` to make your move!\n'
                    'Or `quit` to stop the game (you will lose though).')
    # One "<second field> = **<first field>**" line per player.
    roster = '\n'.join('{1} = **{0}**'.format(*player) for player in self.players)

    self._game_screen = (discord.Embed(colour=0x00FF00)
                         .set_author(name=f'Tic-Tac-toe - {size} x {size}')
                         .add_field(name='Players', value=roster)
                         .add_field(name='Current Player', value=None, inline=False)
                         .add_field(name='Instructions', value=instructions)
                         )
def __init__(self, ctx, opponent):
    """Set up a Connect 4 session between ``ctx.author`` and ``opponent``.

    Tiles and the order of the two players are both chosen at random, and
    the embed used as the game screen is prepared up front.
    """
    self.ctx = ctx
    self.board = Board()
    self.opponent = opponent

    # Random tile assignment first, then a random player order.
    tile_order = random.sample((Tile.X, Tile.O), 2)
    challengers = (self.ctx.author, self.opponent)
    paired = [Player(user, tile) for user, tile in zip(challengers, tile_order)]
    self.players = random.sample(paired, 2)

    self._current = None
    self._runner = None

    # One "<second field> = **<first field>**" line per player.
    player_lines = ['{1} = **{0}**'.format(*player) for player in self.players]
    player_field = '\n'.join(player_lines)
    instructions = ('Type the number of the column to play!\n'
                    'Or `quit` to stop the game (you will lose though).')

    self._game_screen = (discord.Embed(colour=0x00FF00)
                         .set_author(name=f'Connect 4 - {self.ctx.author} vs {self.opponent}')
                         .add_field(name='Players', value=player_field)
                         .add_field(name='Current Player', value=None, inline=False)
                         .add_field(name='Instructions', value=instructions)
                         )
async def tag_by(self, ctx, *, member: discord.Member = None):
    """Shows all the tags made by a member in the server.

    If no member is given, your own tags are shown.
    """
    member = member or ctx.author

    # Only tags located in this guild and owned by `member`, sorted by name.
    query = (ctx.session.select.from_(Tag)
                        .where((Tag.location_id == ctx.guild.id)
                               & (Tag.owner_id == member.id))
                        .order_by(Tag.name)
             )
    tags = [tag.name async for tag in await query.all()]

    # Numbered list of tag names, or a single placeholder line when empty.
    entries = (
        itertools.starmap('{0}. {1}'.format, enumerate(tags, 1)) if tags else
        (f"{member} didn't make any tags yet. :(", )
    )

    paginator = MemberTagPaginator(ctx, entries, member=member)
    await paginator.interact()
def _map_async(self, func, iterable, mapper, chunksize=None, callback=None, error_callback=None):
    '''
    Shared implementation behind map, starmap and their async counterparts.

    Splits `iterable` into batches of `chunksize` items, queues one task
    per batch (each executed through `mapper`) and returns the MapResult
    that collects the outcome.  Raises ValueError if the pool is not in
    the RUN state.
    '''
    if self._state != RUN:
        raise ValueError("Pool not running")

    # A length is required below, so iterables without one are copied.
    if not hasattr(iterable, '__len__'):
        iterable = list(iterable)
    n_items = len(iterable)

    if chunksize is None:
        # Default: split into about len(self._pool) * 4 chunks, rounding up.
        chunksize, leftover = divmod(n_items, len(self._pool) * 4)
        if leftover:
            chunksize += 1
    if n_items == 0:
        chunksize = 0

    task_batches = Pool._get_tasks(func, iterable, chunksize)
    result = MapResult(self._cache, chunksize, n_items, callback,
                       error_callback=error_callback)
    # Lazily produced (job, index, function, args, kwargs) task tuples.
    jobs = (
        (result._job, index, mapper, (batch,), {})
        for index, batch in enumerate(task_batches)
    )
    self._taskqueue.put((jobs, None))
    return result
def check_extras(dist, attr, value):
    """Verify that extras_require mapping is valid"""
    try:
        # Validate every (extra name, requirements) entry in turn.
        for extra, reqs in value.items():
            _check_extra(extra, reqs)
    except (TypeError, ValueError, AttributeError):
        # Covers a non-mapping `value` (no .items()) as well as any of these
        # errors raised while checking an individual entry.
        raise DistutilsSetupError(
            "'extras_require' must be a dictionary whose values are "
            "strings or lists of strings containing valid project/version "
            "requirement specifiers."
        )
def get_exclusions(self):
    """
    Return the set of paths to leave out of single_version_externally_managed
    installations.

    Every package reported by ``_all_packages`` for each namespace package
    from ``_get_SVEM_NSPs`` is combined with every path from
    ``_gen_exclusion_paths``; each combination is turned into one entry by
    ``_exclude_pkg_path``.
    """
    namespace_pkgs = self._get_SVEM_NSPs()
    packages = (
        pkg
        for ns_pkg in namespace_pkgs
        for pkg in self._all_packages(ns_pkg)
    )
    # Cartesian product: every package against every exclusion path.
    pairs = product(packages, self._gen_exclusion_paths())
    return set(self._exclude_pkg_path(pkg, path) for pkg, path in pairs)
def scan_egg_links(self, search_path):
    """Call ``scan_egg_link(path, entry)`` for each ``.egg-link`` file found.

    Only the entries of ``search_path`` that are existing directories are
    listed; the listing is not recursive.  Nothing is returned.
    """
    for directory in filter(os.path.isdir, search_path):
        for entry in os.listdir(directory):
            if entry.endswith('.egg-link'):
                self.scan_egg_link(directory, entry)
def __repr__(self): return '{}({})'.format( type(self).__name__, ', '.join(itertools.starmap('{!s}={!r}'.format, sorted(vars(self).items()))))
def __str__(self): return '<{} {}>'.format(type(self).__name__, ' '.join(starmap( lambda fn, f: '{}={}'.format(fn, repr(getattr(self, fn))), self.proxy, )))