我们从Python开源项目中,提取了以下42个代码示例,用于说明如何使用itertools.filterfalse()。
def display_reviews(self, message): def filter_predicate(x): return self.__cache.IsInCache(self.__get_cachekey(x, message)) reviews = self.__crucible_regex.findall(message.body['text']) reviews = filterfalse(filter_predicate, reviews) if reviews: attachments = [] for reviewid in filterfalse(filter_predicate, reviews): self.__cache.AddToCache(self.__get_cachekey(reviewid, message)) try: msg = self.__get_review_message(reviewid, message._client) if msg is None: msg = self.__get_reviewnotfound_message(reviewid) attachments.append(msg) except requests.exceptions.HTTPError as e: if e.response.status_code == 401: print('Invalid auth') raise if attachments: message.send_webapi('', json.dumps(attachments))
def display_issues(self, message): attachments = [] issues = self.__jira_regex.findall(message.body['text']) def filter_predicate(x): return self.__cache.IsInCache(self.__get_cachekey(x, message)) for issue in filterfalse(filter_predicate, issues): self.__cache.AddToCache(self.__get_cachekey(issue, message)) issue_message = self.get_issue_message(issue) if issue_message is None: issue_message = self.__get_issuenotfound_message(issue) attachments.append(issue_message) if attachments: message.send_webapi('', json.dumps(attachments))
def unique_everseen(iterable, key=None): "List unique elements, preserving order. Remember all elements ever seen." # unique_everseen('AAAABBBCCDAABBB') --> A B C D # unique_everseen('ABBCcAD', str.lower) --> A B C D seen = set() seen_add = seen.add if key is None: for element in itertools.filterfalse(seen.__contains__, iterable): seen_add(element) yield element else: for element in iterable: k = key(element) if k not in seen: seen_add(k) yield element
def unique_everseen(iterable, key=None): """List unique elements, preserving order. Remember all elements ever seen. The snippet is taken form https://docs.python.org/3.6/library/itertools.html#itertools-recipes >>> unique_everseen('AAAABBBCCDAABBB') A B C D >>> unique_everseen('ABBCcAD', str.lower) A B C D """ seen = set() seen_add = seen.add if key is None: for element in filterfalse(seen.__contains__, iterable): seen_add(element) yield element else: for element in iterable: k = key(element) if k not in seen: seen_add(k) yield element
def main(): # ???????????? # ??accumulate()????????????,????islice()?????????len(items)-2?????None???????? for n in itertools.islice(itertools.accumulate(cost(item) for item in items[1:]), len(items) - 2, None): print(n) # ????????????? print(n / (len(items) - 1)) # ????????? owners = {} for ky, grp in itertools.groupby(sorted(items[1:], key=owner), key=owner): owners[ky] = len(list(grp)) for member in members[1:]: print(member[1], " : ", owners[member[0]]) # ??????returned()???,???????????????.???filterfalse()??????,???????????????,???????????? print([items[int(loan[1])] for loan in itertools.filterfalse(returned, loans)])
def make_node_set(node_set, reverse=False): ids = set() def is_unique_id(node): node_id = id(node) if node_id in ids: return False else: ids.add(node_id) return True if not isinstance(node_set, list): node_set = [node_set] non_node_member = next(filterfalse(is_any_node, node_set), False) if non_node_member: format_str = 'Constructed node set that includes {0} object "{1}"' raise HqueryEvaluationError(format_str.format(object_type_name(non_node_member), non_node_member)) node_set = list(sorted(filter(is_unique_id, node_set), key=lambda n: n.hq_doc_index, reverse=reverse)) return node_set
def download_manga(manga_name, range_start=1, range_end=None, b_make_cbz=False, remove=False): """Download a range of a chapters""" chapter_urls = get_chapter_urls(manga_name) if range_end == None : range_end = max(chapter_urls.keys()) for chapter, url in filterfalse (lambda chapter_url: chapter_url[0] < range_start or chapter_url[0] > range_end, chapter_urls.items()): chapter_number = get_chapter_number(url) print('===============================================') print('Chapter ' + chapter_number) print('===============================================') image_urls = get_chapter_image_urls(url) download_urls(image_urls, manga_name, chapter_number) download_dir = './{0}/{1}'.format(manga_name, chapter_number) if b_make_cbz is True: make_cbz(download_dir) if remove is True: shutil.rmtree(download_dir)
def members(): all_users = json.loads(database.get_all_users()) if not all_users['ok']: flask.abort(500) users = [User(username=user['username'], skills=user['skills'], points=user['points'], last_seen='Not Available') for user in all_users['response']] order = flask.request.args.get('order') lang = flask.request.args.get('lang', '').lower() if order == 'points': users.sort(key=attrgetter('points'), reverse=True) else: users.sort(key=attrgetter('username')) if lang: t1, t2 = tee(users) lang_yes = filter(lambda user: lang in user.skills.lower(), t1) lang_no = filterfalse(lambda user: lang in user.skills.lower(), t2) users = list(lang_yes) + list(lang_no) return flask.render_template('members.html', members=users)
def uniq(iterable, key=None): "List unique elements, preserving order. Remember all elements ever seen." # unique_everseen('AAAABBBCCDAABBB') --> A B C D # unique_everseen('ABBCcAD', str.lower) --> A B C D seen = set() seen_add = seen.add if key is None: for element in filterfalse(seen.__contains__, iterable): seen_add(element) yield element else: for element in iterable: k = key(element) if k not in seen: seen_add(k) yield element
def partition(iterable, predicate): """Divide the iterable into two iterables according to the predicate. >>> evens, odds = partition(range(10), lambda x: not x % 2) >>> list(evens) [0, 2, 4, 6, 8] >>> list(odds) [1, 3, 5, 7, 9] """ t1, t2 = tee(iterable) return filter(predicate, t1), filterfalse(predicate, t2)
def unique(*iterables, key=None): """Yield unique elements, preserving order. >>> ''.join(unique('AAAABBBCCDAABBB')) 'ABCD' >>> ''.join(unique('AAAA', 'BBBC', 'CDA', 'ABBB')) 'ABCD' >>> ''.join(unique('ABBCcAD', key=str.casefold)) 'ABCD' """ combined = chain.from_iterable(iterables) yielded = set() # Avoid inner-loop name lookups already_yielded = yielded.__contains__ remember = yielded.add if key is None: for element in filterfalse(already_yielded, combined): remember(element) yield element else: for element in combined: k = key(element) if not already_yielded(k): remember(k) yield element
def remove_duplicates(iterable, key=None): """ Renvoie un generateur sur un iterable qui enleve tous les elements en double dans une liste, conservant l'ordre.""" seen = set() seen_add = seen.add if key is None: for element in filterfalse(seen.__contains__, iterable): seen_add(element) yield element else: for element in iterable: k = key(element) if k not in seen: seen_add(k) yield element
def _parse_merged_entities(self): """set self._merged_entities to the longest possible(wrapping) tokens """ self._merged_entities = list(filterfalse( lambda token: self._is_wrapped(token, self.entities), self.entities))
def _parse_all_merged_entities(self): """set self._all_merged_entities to the longest possible(wrapping) tokens including non-entity tokens """ self._all_merged_entities = list(filterfalse( lambda token: self._is_wrapped(token, self.all_entities), self.all_entities))
def phase1(self): # Compute common names a = dict(zip(map(os.path.normcase, self.left_list), self.left_list)) b = dict(zip(map(os.path.normcase, self.right_list), self.right_list)) self.common = list(map(a.__getitem__, filter(b.__contains__, a))) self.left_only = list(map(a.__getitem__, filterfalse(b.__contains__, a))) self.right_only = list(map(b.__getitem__, filterfalse(a.__contains__, b)))
def _filter(flist, skip): return list(filterfalse(skip.__contains__, flist)) # Demonstration and testing. #
def test_itertools_filterfalse(self): """ Tests whether itertools.filterfalse is available. """ from itertools import filterfalse not_div_by_3 = filterfalse(lambda x: x % 3 == 0, range(8)) self.assertEqual(list(not_div_by_3), [1, 2, 4, 5, 7])
def test_install_aliases(self): """ Does the install_aliases() interface monkey-patch urllib etc. successfully? """ from future.standard_library import remove_hooks, install_aliases remove_hooks() install_aliases() from collections import Counter, OrderedDict # backported to Py2.6 from collections import UserDict, UserList, UserString # Requires Python dbm support: # import dbm # import dbm.dumb # import dbm.gnu # import dbm.ndbm from itertools import filterfalse, zip_longest from subprocess import check_output # backported to Py2.6 from subprocess import getoutput, getstatusoutput from sys import intern # test_support may not be available (e.g. on Anaconda Py2.6): # import test.support import urllib.error import urllib.parse import urllib.request import urllib.response import urllib.robotparser self.assertTrue('urlopen' in dir(urllib.request))
def riffle_shuffle(iterable, n=2): """Generator that performs a perfect riffle shuffle on the input, using a given number of subdecks.""" return itertools.filterfalse(non, itertools.chain.from_iterable(zip(*list(itertools.zip_longest(*[iter(iterable)]*n))))) # Mappings
def unique_everseen(iterable, key=None): seen = set() seen_add = seen.add if key is None: for element in filterfalse(seen.__contains__, iterable): seen_add(element) yield element else: for element in iterable: k = key(element) if k not in seen: seen_add(k) yield element
def unique_everseen(iterable, key=None): """ The generator to list unique elements, preserving the order. Remember all elements ever seen. This was taken from the itertools recipes. Args: iterable: An iterable to process. key: Optional function to run when checking elements (e.g., str.lower) Returns: Generator: Yields a generator object. """ seen = set() seen_add = seen.add if key is None: for element in filterfalse(seen.__contains__, iterable): seen_add(element) yield element else: for element in iterable: k = key(element) if k not in seen: seen_add(k) yield element
def __get_preview_window(self): return next(filterfalse(lambda x: not x.options['previewwindow'], self.vim.windows), None)
def __get_preview_window(self): return next(filterfalse(lambda x: not x.options['previewwindow'], self.vim.windows), None) # Needed for openable actions
def parse_lines(msg_list): msg_list = filterfalse(lambda x: '#' in x, msg_list) new_list = [] for line in msg_list: try: new_list.append(parse_line(line)) except (ValueError, TypeError): logger.debug('Error parsing {}'.format(line)) return convert_to_df(new_list)
def partition(predicate, iterable): """Use a predicate to partition true and false entries. Reference --------- Python itertools documentation. """ t1, t2 = tee(iterable) return filterfalse(predicate, t1), filter(predicate, t2)
def send_game_start(self, session): conn_in_room = itertools.filterfalse( lambda x: x.room is None or x.spectate is True, self.server.connections) conns = sorted(conn_in_room, key=lambda x: x.room) for room_id, room_conns in itertools.groupby(conns, key=lambda x: x.room): if not room_id: continue self.check_song_start(session, room_id, room_conns)
def difference(a, b): """ Equivalent to A-B or A\B in set theory. Difference/Relative Complement :param a: First list of dicts :param b: Second list of dicts :return: List of elements in a but not in b """ return list(filterfalse(lambda x: x in b, a))
def partition(pred, iterable): """Use a predicate to partition entries into false entries and true entries.""" # https://stackoverflow.com/questions/8793772/how-to-split-a-sequence-according-to-a-predicate # NOTE: this might iterate over the collection twice # NOTE: need to use filter(s) here because we're lazily dealing with iterators it1, it2 = itertools.tee(iterable) return itertools.filterfalse(pred, it1), filter(pred, it2) # pylint: disable=bad-builtin
def to_python(self, value): if not value: return [] if not isinstance(value, str): raise ValidationError( '%s is not a valid string value.' % str(value)) result = [item.strip() for item in filterfalse( lambda item: item.strip() == '', value.split(self.delimiter))] return result
def parse_address(branch): """ expected address structure: required: organization : str add_no : int optional: full_address : str organization synonyms : list of str country : str city : str state : str zipcode : str street : str """ success = True try: org_names = branch.findall(org_path) def condition(x): return x.attrib and 'pref' in x.attrib and x.attrib['pref'] == 'Y' # find first org with pref='Y' orgs_pref = list(filter(condition, org_names)) orgs_pref = list(map(lambda x: x.text, filterfalse(lambda x: x is None, orgs_pref))) result_dict = {'organizations_pref': orgs_pref} orgs_rest = list(filterfalse(condition, org_names)) orgs_rest = list(map(lambda x: x.text, filterfalse(lambda x: x is None, orgs_rest))) result_dict.update({'organizations': orgs_rest}) suborg_names = branch.findall(suborg_path) suborgs = list(map(lambda x: x.text, filterfalse(lambda y: y is None, suborg_names))) result_dict.update({'suborganizations': suborgs}) if branch.attrib: if add_no_key in branch.attrib: # TODO add try-catch-raise with logging # if not int-able : exception triggered addr_number = int(branch.attrib[add_no_key]) result_dict.update({add_no_key: addr_number}) else: result_dict.update({add_no_key: 1}) # entries below are optional add_entry(result_dict, branch, full_address_path) add_entry(result_dict, branch, country_path) add_entry(result_dict, branch, city_path) add_entry(result_dict, branch, state_path) add_entry(result_dict, branch, zipcode_path) add_entry(result_dict, branch, street_path) except: success = False result_dict = etree_to_dict(branch) return success, result_dict
def test_future_moves(self): """ Ensure everything is available from the future.moves interface that we claim and expect. (Issue #104). """ from future.moves.collections import Counter, OrderedDict # backported to Py2.6 from future.moves.collections import UserDict, UserList, UserString from future.moves import configparser from future.moves import copyreg from future.moves.itertools import filterfalse, zip_longest from future.moves import html import future.moves.html.entities import future.moves.html.parser from future.moves import http import future.moves.http.client import future.moves.http.cookies import future.moves.http.cookiejar import future.moves.http.server from future.moves import queue from future.moves import socketserver from future.moves.subprocess import check_output # even on Py2.6 from future.moves.subprocess import getoutput, getstatusoutput from future.moves.sys import intern from future.moves import urllib import future.moves.urllib.error import future.moves.urllib.parse import future.moves.urllib.request import future.moves.urllib.response import future.moves.urllib.robotparser try: # Is _winreg available on Py2? If so, ensure future.moves._winreg is available too: import _winreg except ImportError: pass else: from future.moves import winreg from future.moves import xmlrpc import future.moves.xmlrpc.client import future.moves.xmlrpc.server from future.moves import _dummy_thread from future.moves import _markupbase from future.moves import _thread
def filter_candidates(self, context): for source in self._current_sources: ctx = source.context ctx['matchers'] = context['matchers'] ctx['input'] = context['input'] if context['smartcase']: ctx['ignorecase'] = re.search(r'[A-Z]', ctx['input']) is None ctx['mode'] = context['mode'] ctx['async_timeout'] = 0.03 if ctx['mode'] != 'insert' else 0.02 if ctx['prev_input'] != ctx['input'] and ctx['is_interactive']: ctx['event'] = 'interactive' ctx['all_candidates'] = self._gather_source_candidates( ctx, source) ctx['prev_input'] = ctx['input'] entire = ctx['all_candidates'] if ctx['is_async']: ctx['event'] = 'async' entire += self._gather_source_candidates(ctx, source) if not entire: yield source.name, entire, [], [] continue partial = [] ctx['candidates'] = entire for i in range(0, len(entire), 1000): ctx['candidates'] = entire[i:i+1000] matchers = [self._filters[x] for x in (ctx['matchers'].split(',') if ctx['matchers'] else source.matchers) if x in self._filters] self.match_candidates(ctx, matchers) partial += ctx['candidates'] if len(partial) >= 1000: break ctx['candidates'] = partial for f in [self._filters[x] for x in source.sorters + source.converters if x in self._filters]: ctx['candidates'] = f.filter(ctx) partial = ctx['candidates'] for c in partial: c['source'] = source.name ctx['candidates'] = [] patterns = filterfalse(lambda x: x == '', ( self._filters[x].convert_pattern(context['input']) for x in source.matchers if self._filters[x])) yield source.name, entire, partial, patterns
def add_many(self, name, timestamp_pairs, chunks_size=2000, *args, **kwargs): """ :param name: :param timestamp_pairs: [("timestamp",data)] :param chunks_size: :param args: :param kwargs: :return: """ incr_key = self.incr_format.format(key=name) hash_key = self.hash_format.format(key=name) # remove exist data # todo maybe other way to optimize this filter code sorted_timestamps = sorted(timestamp_pairs, key=itemgetter(0)) max_timestamp = sorted_timestamps[-1][0] # max min_timestamp = sorted_timestamps[0][0] # min filter_data = self.get_slice(name, start=min_timestamp, end=max_timestamp) if filter_data: timestamp_set = set(map(lambda x: x[0], filter_data)) filter_results = itertools.filterfalse(lambda x: x[0] in timestamp_set, sorted_timestamps) else: filter_results = sorted_timestamps chunks_data = helper.chunks(filter_results, chunks_size) with self._pipe_acquire() as pipe: for chunks in chunks_data: start_id = self.client.get(incr_key) or 1 # if key not exist id equal 0 end_id = self.client.incrby(incr_key, amount=len(chunks)) # incr the add length start_id = int(start_id) end_id = int(end_id) ids_range = range(start_id, end_id) dumps_results = map(lambda x: (x[0], self.serializer.dumps(x[1])), chunks) mix_data = itertools.zip_longest(dumps_results, ids_range) # [(("timestamp",data),id),...] mix_data = list(mix_data) # need converted as list timestamp_ids = map(lambda seq: (seq[0][0], seq[1]), mix_data) # [("timestamp",id),...] ids_pairs = map(lambda seq: (seq[1], seq[0][1]), mix_data) # [("id",data),...] timestamp_ids = itertools.chain.from_iterable(timestamp_ids) ids_values = {k: v for k, v in ids_pairs} pipe.multi() pipe.zadd(name, *timestamp_ids) pipe.hmset(hash_key, ids_values) pipe.execute()