我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用itertools.dropwhile()。
def starts_when(iterable, condition): # type: (Iterable, Union[Callable, Any]) -> Iterable """Start yielding items when a condition arise. Args: iterable: the iterable to filter. condition: if the callable returns True once, start yielding items. If it's not a callable, it will be converted to one as `lambda condition: condition == item`. Example: >>> list(starts_when(range(10), lambda x: x > 5)) [6, 7, 8, 9] >>> list(starts_when(range(10), 7)) [7, 8, 9] """ if not callable(condition): cond_value = condition def condition(x): return x == cond_value return itertools.dropwhile(lambda x: not condition(x), iterable)
def getMenu(today): day = today.weekday() try: dom = get_dom(URL) menu = dom.xpath("/html/body//div[@class='fck']/*[self::h3 or self::p]//text()") menu = dropwhile(lambda line: days_lower[day] not in line.lower(), menu) menu = islice(skip_empty_lines(menu), 1, 3) menu = '<br>'.join(menu) except: menu = '' return { 'name': 'Gólvonal', 'url': URL, 'menu': menu }
def lines(self): source = '' lines = self.definition._source[self.definition._slice] offset = self.definition.start lines_stripped = list(reversed(list(dropwhile(is_blank, reversed(lines))))) numbers_width = 0 for n, line in enumerate(lines_stripped): numbers_width = max(numbers_width, n + offset) numbers_width = len(str(numbers_width)) numbers_width = 6 for n, line in enumerate(lines_stripped): source += '%*d: %s' % (numbers_width, n + offset, line) if n > 5: source += ' ...\n' break return source
def pathgen(stroke, path, height, split_at_invisible, f=lambda v: not v.attribute.visible): """Generator that creates SVG paths (as strings) from the current stroke """ it = iter(stroke) # start first path yield path for v in it: x, y = v.point yield '{:.3f}, {:.3f} '.format(x, height - y) if split_at_invisible and v.attribute.visible is False: # end current and start new path; yield '" />' + path # fast-forward till the next visible vertex it = itertools.dropwhile(f, it) # yield next visible vertex svert = next(it, None) if svert is None: break x, y = svert.point yield '{:.3f}, {:.3f} '.format(x, height - y) # close current path yield '" />'
def getMenu(today): day = today.weekday() try: is_this_week = lambda date: datetime.strptime(date, '%Y-%m-%dT%H:%M:%S%z').date() > today.date() - timedelta(days=7) menu_filter = lambda post: is_this_week(post['created_time']) and sum(day in post['message'].lower() for day in days_lower) > 3 menu = get_filtered_fb_post(FB_ID, menu_filter) menu = dropwhile(lambda line: days_lower[day] not in line.lower(), skip_empty_lines(menu.split('\n'))) menu = islice(menu, 1, 4) menu = '<br>'.join(menu) except: menu = '' return { 'name': 'Amici Miei', 'url': FB_PAGE, 'menu': menu }
def getFBMenu(today): day = today.weekday() menu = '' try: if day < 5: is_this_week = lambda date: datetime.strptime(date, '%Y-%m-%dT%H:%M:%S%z').date() > today.date() - timedelta(days=7) menu_filter = lambda post: is_this_week(post['created_time']) and "jelmagyarázat" in post['message'].lower() menu = get_filtered_fb_post(FB_ID, menu_filter) post_parts = menu.split("HETI MENÜ") if len(post_parts) > 1: weekly_menu = post_parts[1] menu = weekly_menu.strip().split("\n") menu = islice(dropwhile(lambda l: days_lower[day] not in l, menu), 1, None) menu = takewhile(lambda l: not any(day in l for day in days_lower), menu) menu = '<br>'.join(skip_empty_lines(menu)) else: menu = f'<a href="{get_fb_cover_url(FB_ID)}">heti menü</a>' except: pass return menu
def default(self): """Return last changes in truncated unified diff format""" output = ensure_unicode(self.git.log( '-1', '-p', '--no-color', '--format=%s', ).stdout) lines = output.splitlines() return u'\n'.join( itertools.chain( lines[:1], itertools.islice( itertools.dropwhile( lambda x: not x.startswith('+++'), lines[1:], ), 1, None, ), ) )
def _trim_silence(self, audio: ndarray) -> ndarray: def trim_start(sound: ndarray) -> ndarray: return numpy.array(list(dropwhile(lambda x: x < self.silence_threshold_for_not_normalized_sound, sound))) def trim_end(sound: ndarray) -> ndarray: return flipud(trim_start(flipud(sound))) return trim_start(trim_end(audio))
def write_pot_file(potfile, msgs): """ Write the :param potfile: POT file with the :param msgs: contents, previously making sure its format is valid. """ if os.path.exists(potfile): # Strip the header msgs = '\n'.join(dropwhile(len, msgs.split('\n'))) else: msgs = msgs.replace('charset=CHARSET', 'charset=UTF-8') with io.open(potfile, 'a', encoding='utf-8') as fp: fp.write(msgs)
def write_pot_file(potfile, msgs): """ Write the :param potfile: POT file with the :param msgs: contents, previously making sure its format is valid. """ pot_lines = msgs.splitlines() if os.path.exists(potfile): # Strip the header lines = dropwhile(len, pot_lines) else: lines = [] found, header_read = False, False for line in pot_lines: if not found and not header_read: found = True line = line.replace('charset=CHARSET', 'charset=UTF-8') if not line and not found: header_read = True lines.append(line) msgs = '\n'.join(lines) with io.open(potfile, 'a', encoding='utf-8') as fp: fp.write(msgs)
def _gen_words(match, splits): groups = list(it.dropwhile(lambda x: not x, match.groups())) for s in splits: try: num = int(s) except ValueError: word = s else: word = next(it.islice(groups, num, num + 1)) yield word
def get_first_occurence(occurences, from_=None, until=None): results = occurences if until is not None: results = takewhile(lambda x: x <= until, results) if from_ is not None: results = dropwhile(lambda x: x < from_, results) return list(results)[0]
def occurences_from(from_, dates, is_ints=False): _from = from_ if is_ints: _from = from_ and dt2int(from_) or None if _from is not None: results = dropwhile(lambda x: x < _from, dates) return results return dates #@region.cache_on_arguments()
def pig_latin(word): if is_vowel(word[0]): return word + 'yay' else: remain = ''.join(dropwhile(is_consonant, word)) removed = word[:len(word) - len(remain)] return remain + removed + 'ay'
def pathgen(stroke, style, height, split_at_invisible, stroke_color_mode, f=lambda v: not v.attribute.visible): """Generator that creates SVG paths (as strings) from the current stroke """ if len(stroke) <= 1: return "" if stroke_color_mode != 'BASE': # try to use the color of the first or last vertex try: index = 0 if stroke_color_mode == 'FIRST' else -1 color = format_rgb(stroke[index].attribute.color) style["stroke"] = color except (ValueError, IndexError): # default is linestyle base color pass # put style attributes into a single svg path definition path = '\n<path ' + "".join('{}="{}" '.format(k, v) for k, v in style.items()) + 'd=" M ' it = iter(stroke) # start first path yield path for v in it: x, y = v.point yield '{:.3f}, {:.3f} '.format(x, height - y) if split_at_invisible and v.attribute.visible is False: # end current and start new path; yield '" />' + path # fast-forward till the next visible vertex it = itertools.dropwhile(f, it) # yield next visible vertex svert = next(it, None) if svert is None: break x, y = svert.point yield '{:.3f}, {:.3f} '.format(x, height - y) # close current path yield '" />'
def split_by(pred, seq): """ Splits start of sequence, consisting of items passing predicate, from the rest of it. Works similar to takewhile(pred, seq), dropwhile(pred, seq), but works with iterator seq correctly. """ a, b = _tee(seq) return _takewhile(pred, a), _dropwhile(pred, b) # # Special iteration #
def source(self): """Return the source code for the definition.""" full_src = self._source[self._slice] def is_empty_or_comment(line): return line.strip() == '' or line.strip().startswith('#') filtered_src = dropwhile(is_empty_or_comment, reversed(full_src)) return ''.join(reversed(list(filtered_src)))
def dropwhile(predicate, col): ''' Drop elements from a collection while the predicate holds. Return a list of those elements that are left ''' return list(idropwhile(predicate, col))
def ios_chunk(line_it): chunk_start = b': GoogleAnalytics ' chunk_end = b'timestamp = ' it = itertools.dropwhile(lambda x: chunk_start not in x, line_it) next(it) return itertools.takewhile(lambda x: chunk_end not in x, it)
def memoryPills(pills): gen = dropwhile(lambda s: len(s) % 2 != 0, pills + [""] * 3) next(gen) return [next(gen) for _ in range(3)]
def enumerateslice(iterable, first, last): return itertools.takewhile(lambda (i,v): i<last, itertools.dropwhile(lambda (i,v): i<first, enumerate(iterable))) ## Convert fractional day into datetime.time
def collect_docstring(lines): """Return document docstring if it exists""" lines = dropwhile(lambda x: not x.startswith('"""'), lines) doc = "" for line in lines: doc += line if doc.endswith('"""\n'): break return doc[3:-4].replace("\r", "").replace("\n", " ")
def validate_turtle(contents): all_lines = contents.splitlines() # Separate prefixes from the body prefix_lines = [line for line in all_lines if line.startswith(u"@prefix")] prefixes = { "books": "http://www.books.org/", "isbn": "http://www.books.org/isbn/", "xsd": "http://www.w3.org/2001/XMLSchema#" } # Validate that specified prefixes are there for pre, url in prefixes.items(): pattern = r"@prefix {}:[\s]* <{}> \.".format(pre, url) assert any([re.match(pattern, x) is not None for x in prefix_lines]), \ "{} is not found among prefixes".format(pre) # Validate subject grouping # Move the cursor until the first subject iter = dropwhile(lambda x: len(x) == 0 or re.match("[\s]+$", x) or x.startswith(u"@prefix"), all_lines) # Check the block for each subject for s in range(NUM_SUBJECTS): this_sub_lines = list(takewhile(lambda x: len(x) != 0 and not re.match("[\s]+$", x), iter)) assert len(this_sub_lines) == NUM_TRIPLES_PER_SUBJ # First line is where subject is defined subj_line = this_sub_lines[0] assert subj_line.startswith(u"isbn:") assert subj_line.endswith(";") # Rest of the lines starts with some whitespace assert all([re.match(r"^[\s]+", x) for x in this_sub_lines[1:]]) # Next two lines end with ; assert all([x.endswith(u";") for x in this_sub_lines[1:(NUM_TRIPLES_PER_SUBJ-1)]]) # Last line ends with a dot assert this_sub_lines[-1].endswith(u".") # Each line has a "books:" for the predicate assert all(["books:" in x for x in this_sub_lines]) # One of the lines has true or false assert any(["true" in x or "false" in x for x in this_sub_lines]) # Two of the lines has xsd: assert sum([1 for x in this_sub_lines if "xsd:" in x]) == 2
def get_after(sentinel, iterable): "Get the value after `sentinel` in an `iterable`" truncated = dropwhile(lambda el: el != sentinel, iterable) next(truncated) return next(truncated)
def parse_pr_message(message): message_by_line = message.split("\n") if len(message) == 0: return None, None title = message_by_line[0] body = "\n".join(itertools.dropwhile( operator.not_, message_by_line[1:])) return title, body
def next(self): in_list = self._get_list_attribute_is_member_off() if in_list is None: return None next_node = list(itertools.dropwhile(lambda x: x is not self, in_list))[1:] return next_node[0] if next_node else None
def next_generator(self): in_list = self._get_list_attribute_is_member_off() if in_list is None: return None generator = itertools.dropwhile(lambda x: x is not self, in_list) next(generator) return generator
def previous(self): in_list = self._get_list_attribute_is_member_off() if in_list is None: return None next_node = list(itertools.dropwhile(lambda x: x is not self, reversed(in_list)))[1:] return next_node[0] if next_node else None
def previous_generator(self): in_list = self._get_list_attribute_is_member_off() if in_list is None: return None generator = itertools.dropwhile(lambda x: x is not self, reversed(in_list)) next(generator) return generator
def load_info(self, bot): with open(os.path.join(self.path, 'plugin.yml')) as yml_file: yml = yaml.safe_load(yml_file) if 'enabled' not in yml or not yml['enabled']: raise PluginNotEnabled path = self.path.split('/') name = path.pop() # use the directory as plugin name if it is not set if 'name' not in yml: yml['name'] = name self.name = yml['name'] self.log = create_logger(self.name) self.log.info('Loading plugin {:s}'.format(self.name)) # set categories from rest of pathname if 'categories' not in yml: it = iter(path) cat = [dropwhile(lambda x: x != 'plugins', it)][1:] yml['categories'] = cat self.categories = yml['categories'] if 'commands' in yml: self.commands_info = yml['commands'] if 'events' in yml: self.events_info = yml['events']
def __order_subjects__(self): global_order = sorted(self.subjects.items(), key=itemgetter(1)) sort = list(global_order) for year in range(self.years): self.ordered_by_year[year] = list(takewhile(lambda x: x[1].year == year+1, sort)) sort = list(dropwhile(lambda x: x[1].year == year+1, sort)) return global_order
def get_repo_line(result): stdout = (line.strip() for line in result.stdout_lines) stdout = itertools.dropwhile(lambda item: not item.startswith("***"), stdout) stdout = list(stdout) if not stdout: raise ValueError("Server {0} has no installed ceph-common".format( result.srv.ip)) yield result.srv, stdout[1].split(" ", 1)[1]
def get_function_body(func): source_lines = inspect.getsourcelines(func)[0] source_lines = dropwhile(lambda x: x.startswith('@'), source_lines) def_line = next(source_lines).strip() if def_line.startswith('def ') and def_line.endswith(':'): # Handle functions that are not one-liners first_line = next(source_lines) # Find the indentation of the first line indentation = len(first_line) - len(first_line.lstrip()) return [first_line[indentation:]] + [line[indentation:] for line in source_lines] else: # Handle single line functions return [def_line.rsplit(':')[-1].strip()]
def get_imports(obj): source_lines = inspect.getsourcelines(sys.modules[obj.__module__])[0] source_lines = dropwhile(lambda x: x.startswith('@'), source_lines) import_lines = filter(lambda x: x.startswith('import ') or x.startswith('from '), source_lines) return list(import_lines)
def lstrip(self): """ Remove the left zeros and shift the exponent Warning: 0 Not supported """ result = Number(list(reversed(list( itertools.dropwhile(lambda x: x == 0, reversed(self.val)) ))), self.exponent, self.sign) if len(result.val) == 0: result.val = [0] result.sign = +1 result.exponent -= len(self.val) - len(result.val) return result
def data_deal_function(): # compress()????????????.????????????????,??????????????. # ????????????????True????? # ??,????????????.???????Python??????????,?????? # itertools.filterfalse()???????????,??????.???????????False???True??? for item in it.compress([1, 2, 3, 4, 5], [False, True, False, 0, 1]): print(item) # dropwhile()?takewhile()?????????????.??????????????????????????,???????????????. # dropwhile()??????????????????????False.?takewhile()??????????False # ??,????????????????????????(??dropwhile????,????????????,?takewhile?????????) def __single_digit(n): return n < 10 for n in it.dropwhile(__single_digit, range(20)): print(n, end=" ") for n in it.takewhile(__single_digit, range(20)): print(n, end=" ") # accumulate()?????????????????????????????(??????,????????????).??,??????? # [1,2,3,4]??,???result1?1.?????????result1?2??result2,????.????????functools???reduce()???? for n in it.accumulate([1, 2, 3, 4, ]): print(n, end=" ")
def datapoints(self, query, maxdatapoints=None): if query.statistics and (query.statistics not in ['Average', 'Sum', 'SampleCount', 'Maximum', 'Minimum']): raise InvalidMetricQuery("Query statistic invalid value `{}`".format(query.statistics)) elif query.statistics: statistics = query.statistics else: statistics = "Average" if maxdatapoints: # Calculate the Period where the number of datapoints # returned are less than maxdatapoints. # Get the first granularity that suits for return the maxdatapoints seconds = (query.get_until() - query.get_since()).total_seconds() period = next(dropwhile(lambda g: seconds / g > maxdatapoints, count(60, 60))) else: period = 60 # get a client using the region given by the query, or if it # is None using the one given by the datasource or the profile client = self._cw_client(region=query.region) kwargs = { 'Namespace': query.namespace, 'MetricName': query.metricname, 'StartTime': query.get_since(), 'EndTime': query.get_until(), 'Period': period, 'Dimensions': [{ 'Name': query.dimension_name, 'Value': query.dimension_value, }], 'Statistics': [statistics] } datapoints = self._cw_call(client, "get_metric_statistics", **kwargs) return [(point[statistics], time.mktime(point['Timestamp'].timetuple())) for point in datapoints['Datapoints']]
def getMenu(today): menu = '' try: day = today.weekday() is_this_week = lambda date: datetime.strptime(date, '%Y-%m-%dT%H:%M:%S%z').date() > today.date() - timedelta(days=7) is_today = lambda date: datetime.strptime(date, '%Y-%m-%dT%H:%M:%S%z').date() == today.date() ignore_hashtags = lambda post: " ".join(word.lower() for word in post.split() if word[0] != "#") daily_menu_filter = lambda post: is_today(post['created_time']) \ and "menü" in post['message'].lower() weekly_menu_filter = lambda post: is_this_week(post['created_time']) \ and days_lower[day] in ignore_hashtags(post['message']) weekly_menu = get_filtered_fb_post(FB_ID, weekly_menu_filter) if weekly_menu: menu_post = dropwhile(lambda line: days_lower[day] not in line.lower(), skip_empty_lines(weekly_menu.split('\n'))) else: menu_post = get_filtered_fb_post(FB_ID, daily_menu_filter).split('\n') menu_post = list(menu_post) for i, line in enumerate(menu_post): if "A:" in line: menu = "<br>".join((menu_post[i-1], menu_post[i], menu_post[i+1])) break except: pass return { 'name': 'Kompót', 'url': FB_PAGE, 'menu': menu }