The following 50 code examples, extracted from open-source Python projects, illustrate how to use typing.List().
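Before the extracted examples, here is a minimal, self-contained sketch of the pattern they all share: typing.List[...] used as a generic annotation for parameters, return values, and variables. The function and variable names below are illustrative only and do not come from any of the projects quoted later. (Since Python 3.9 the built-in list[...] can be used directly in annotations, but typing.List remains valid.)

from typing import List, Optional

def dedupe(items: List[str]) -> List[str]:
    """Return the unique strings from items, preserving first-seen order."""
    seen = set()
    result: List[str] = []  # a variable annotated with List
    for item in items:
        if item not in seen:
            seen.add(item)
            result.append(item)
    return result

maybe_names: List[Optional[str]] = ['ada', None, 'ada', 'bob']
print(dedupe([n for n in maybe_names if n is not None]))  # ['ada', 'bob']
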
def run_batch_query(self, query: str, labels: List[str] = None, params: List[dict] = None, chunk_size: int = 1000):
    node_labels = ':{0}'.format(':'.join(labels)) \
        if labels else ''
    query_template = Template("UNWIND {params} AS params " + query)
    labeled_query = query_template.safe_substitute(labels=node_labels)
    chunk_count = 1

    def batch():
        for i in range(0, len(params), chunk_size):
            logger.debug('starting chunk %s', i)
            result = (yield labeled_query, dict(params=params[i:i + chunk_size]))
            logger.debug(result)

    result = self.run_in_tx(batch(), chunk_count=chunk_count)
    return result

def _send_receive(self,
                  nummsgs: int,
                  outformat: str = 'json',
                  dataupdate: Optional[Dict[AnyStr, Any]] = None,
                  restart_data: bool = True) -> List[Response]:
    if restart_data:
        self._restart_data(outformat)
    if dataupdate:
        self.data.update(dataupdate)
    self._add_to_buffer(nummsgs, outformat)
    self.sendbuffer.seek(0)
    processor, _ = get_processor_instance(
        outformat,
        custom_outbuffer=self.recvbuffer,
        custom_inbuffer=self.sendbuffer
    )
    processor.process_requests(self.sendbuffer)
    return self._loadResults(outformat)

def __init__(
        self,
        description: str = None,
        pre_hooks: (List, Tuple) = None,
        post_hooks: (List, Tuple) = None
):
    self.result = None
    self.total = None
    self.success = None
    self.errors = None
    self.params = None
    self.output = None
    self.pagination = None
    self.limit = None
    self.offset = None
    self.app = None
    self.settings = None
    self.description = description
    self.pre_hooks = pre_hooks
    self.post_hooks = post_hooks
    self.meta = {}

def calc_norm_lp_div_scores(
        log_prob_scores: List[float],
        unigram_scores: List[float]) -> List[Union[None, float]]:
    r"""
    .. math:
        \frac{%
            \log P_\text{model}\left(\xi\right)
        }{%
            \log P_\text{unigram}\left(\xi\right)
        }

    >>> '{:.3f}'.format(calc_norm_lp_div_scores([-14.7579], [-35.6325])[0])
    '-0.414'
    """
    results = []
    for log_prob, unigram_score in zip(log_prob_scores, unigram_scores):
        if log_prob is None or numpy.isclose(unigram_score, 0.0, rtol=1e-05):
            x = None
        else:
            x = (-1.0) * float(log_prob) / float(unigram_score)
        results.append(x)
    return results

def calc_norm_lp_sub_scores(
        log_prob_scores: List[float],
        unigram_scores: List[float]) -> List[Union[None, float]]:
    r"""
    .. math:
        \log P_\text{model}\left(\xi\right)
        - \log P_\text{unigram}\left(\xi\right)

    >>> '{:.3f}'.format(calc_norm_lp_sub_scores([-14.7579], [-35.6325])[0])
    '20.875'
    """
    results = []
    for log_prob, unigram_score in zip(log_prob_scores, unigram_scores):
        if log_prob is None or numpy.isclose(unigram_score, 0.0, rtol=1e-05):
            x = None
        else:
            x = float(log_prob) - float(unigram_score)
        results.append(x)
    return results

def calc_slor_scores(norm_lp_sub_scores: List[float],
                     lengths: List[int]) -> List[Union[None, float]]:
    r"""Calculate SLOR (Syntactic Log-Odds Ratio)

    .. math:
        \frac{%
            \log P_\text{model}\left(\xi\right)
            - \log P_\text{unigram}\left(\xi\right)
        }{%
            \text{length}\left(\xi\right)
        }

    >>> '{:.3f}'.format(calc_slor_scores([20.8746], [4])[0])
    '5.219'
    """
    results = []
    for norm_lp_sub_score, length in zip(norm_lp_sub_scores, lengths):
        if (norm_lp_sub_score is None) or length == 0:
            x = None
        else:
            x = norm_lp_sub_score / length
        results.append(x)
    return results

def scrape_all_posts_unflat(url: str, verbose: bool, cache: bool) -> List[List['Post']]:
    unflat_posts = []
    fget = requests.get if not cache else memory.cache(requests.get)
    page = fget(url).text  # Downloads the page twice.
    # ^ we could scrape_page(page), .append, [urls - url], but KISS.
    n_of_pages = pq(page).find('.pagejump > a').eq(0).text().strip().split(' ')[-1]
    # Gets '10' from 'Page 1 of 10'

    # If there is only one page
    if n_of_pages == '':
        urls = [url]
    else:
        url_prefix_match = re.match('(.*)(page-[0-9]+)', url)
        url_prefix = url if url_prefix_match is None else url_prefix_match.group(1)
        if url_prefix[-1] != '/':
            url_prefix += '/'
        urls = [(url_prefix + 'page-' + str(n + 1)) for n in range(int(n_of_pages))]

    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        fscrape = scrape_posts if not cache else memory.cache(scrape_posts, ignore=['verbose'])
        futures = [executor.submit(fscrape, url, verbose) for url in urls]
        results, _ = concurrent.futures.wait(futures)
        for result in results:
            unflat_posts.append(result.result())

    return unflat_posts

def get_posts(*urls: List[str], **kwargs):
    '''
    Args:
        *urls (List[str]): Url, where each url is a unique thread
        verbose (bool): Verbosity
        cache (bool): Cache results across calls
        disambiguate_threads (bool): When scraping multiple threads will add
            url to html of the first post to show thread.
    '''
    posts_unflat = []
    disambiguate_threads = True if 'disambiguate_threads' not in kwargs else kwargs['disambiguate_threads']
    kwargs.pop('disambiguate_threads', None)
    for url in urls:
        posts = scrape_all_posts(url, **kwargs)
        # Displaying a link title to show which posts come from which thread if
        # we are getting multiple threads.
        if disambiguate_threads and len(urls) > 1:
            posts[0].html = '''
                <div style="background-color: #3B6796;">
                    <a href="{0}"><h1 style="font-size: 40px; color: white;">{0}</h1></a>
                </div>'''.format(url) + posts[0].html
        posts_unflat.append(posts)
    return [p for slist in posts_unflat for p in slist]

async def querySelectorAll(self, selector: str) -> List['ElementHandle']:
    """Get all elements which match `selector`."""
    remoteObject = await self._rawEvaluate(
        'selector => Array.from(document.querySelectorAll(selector))',
        selector,
    )
    response = await self._client.send('Runtime.getProperties', {
        'objectId': remoteObject.get('objectId', ''),
        'ownProperties': True,
    })
    properties = response.get('result', {})
    result: List[ElementHandle] = []
    releasePromises = [helper.releaseObject(self._client, remoteObject)]
    for prop in properties:
        value = prop.get('value', {})
        if prop.get('enumerable') and value.get('subtype') == 'node':
            result.append(ElementHandle(self._client, value,
                                        self._mouse, self._touchscreen))
        else:
            releasePromises.append(
                helper.releaseObject(self._client, value))
    await asyncio.gather(*releasePromises)
    return result

#: Alias to querySelector

def __init__(self, exe, cache=None):
    # type: (str, Optional[Cache]) -> None
    if not os.path.isabs(exe):
        exe = which(exe)  # type: ignore
    self.exe = unifilename(exe)
    self.cache = cache
    self._styledefinition = styledef_make()
    self.allow_encoding_change = False
    self.languages = []  # type: List[str]
    self.initial_style = style_make()
    # These are deleted after one call to minimize_errors
    self.globaltempfiles = set()  # type: Set[str]
    # These are deleted after each round of attempts
    self.tempfiles = set()  # type: Set[str]
    self.keeptempfiles = False
    self.version_string = formatter_version(exe)

def can_process_in_parallel(self, filenames):
    # type: (List[str]) -> bool
    """
    Returns False if one of the files is too large to be processed
    in parallel with another file. Returns True if all files are small enough.
    """
    result = True
    for filename in filenames:
        sourcedata = get_cached_file(filename)
        if len(sourcedata) > MAX_FILESIZE_FOR_MULTIPROCESSING:
            reportwarning('Warning: %s has a size of %s bytes.' %
                          (filename, len(sourcedata)))
            reportwarning('  This may cause memory swapping so we only use'
                          ' a single processor core.')
            result = False
    return result

def nested_derivations(self, style):
    # type: (Style) -> List[Style]
    options = [('BreakBeforeBraces', 'Custom')]
    nstyles = []
    for optionname, value in options:
        optdef = styledef_option(self.styledefinition, optionname)
        # We can only use this nested option if the clang version in use supports it.
        if optdef is None:
            continue
        if value not in option_configs(optdef):
            continue
        if style.get(optionname) != value:
            nstyle = Style(copy.deepcopy(style))
            set_option(nstyle, optionname, value)
            nstyles.append(nstyle)
    return nstyles

def variants_for(self, option):
    # type: (Option) -> List[Style]

    def kvpairs(vs):
        # type: (Iterable[OptionValue]) -> List[Style]
        return stylevariants(stylename, vs)

    stylename = option_name(option)
    styletype = option_type(option)
    configs = option_configs(option)
    if configs:
        return kvpairs(configs)
    if stylename == self.columnlimitname:
        return kvpairs(self.column_limit_candidates)
    if styletype == 'int':
        return kvpairs([0, 1, 2, 4, 8, 16])
    return []

def cmdargs_for_style(self, formatstyle, filename=None):
    # type: (Style, Optional[str]) -> List[str]
    assert isinstance(formatstyle, Style)
    configdata = bytestr(self.styletext(formatstyle))
    sha = shahex(configdata)
    cfg = os.path.join(tempfile.gettempdir(),
                       'whatstyle_uncrustify_%s.cfg' % sha)
    if not self.tempfile_exists(cfg):
        writebinary(cfg, configdata)
        self.add_tempfile(cfg)
    cmdargs = ['-c', cfg]
    # The filename extension might be ambiguous so we choose from the languages
    # registered in identify_language.
    if self.languages:
        lang = self.languages[0]
        cmdargs.extend(['-l', lang])
    return cmdargs

def register_options(self):
    # type: () -> None
    """Parse options from text like this:

    Preferences:
      [+|-]alignArguments                         Enable/disable ...
      ...
      [+|-]spacesWithinPatternBinders             Enable/disable ...
      -alignSingleLineCaseStatements.maxArrowIndent=[1-100] Set Maximum number ...
      -indentSpaces=[1-10]                        Set Number of spaces ...
    """
    exeresult = run_executable(self.exe, ['--help'], cache=self.cache)
    options = []
    text = unistr(exeresult.stdout)
    for m in re.finditer(r'^ (\[\+\|-\]|-)([a-z][a-zA-Z.]+)(?:=\[(\d+)-(\d+)\])?',
                         text, re.MULTILINE):
        optionprefix, optionname, start, end = m.groups()
        if start is None:
            optiontype = 'bool'
            configs = [True, False]  # type: List[OptionValue]
        else:
            optiontype = 'int'
            configs = list(inclusiverange(int(start), int(end)))
        options.append(option_make(optionname, optiontype, configs))
    self.styledefinition = styledef_make(options)

def cmdargs_for_style(self, formatstyle, filename=None):
    # type: (Style, Optional[str]) -> List[str]
    assert isinstance(formatstyle, Style)
    configdata = bytestr(self.styletext(formatstyle))
    sha = shahex(configdata)
    cfg = os.path.join(tempfile.gettempdir(),
                       'whatstyle_rustfmt_%s/%s' % (sha, self.configfilename))
    try:
        dirpath = os.path.dirname(cfg)
        os.makedirs(dirpath)
        self.add_tempfile(dirpath)
    except OSError as exc:
        if exc.errno != errno.EEXIST:
            raise
    if not self.tempfile_exists(cfg):
        writebinary(cfg, configdata)
        self.add_tempfile(cfg)
    cmdargs = ['--config-path', cfg]
    return cmdargs

def mget(self, keys):
    # type: (List[str]) -> List[Optional[bytes]]
    rows = []
    if self.support_mget:
        try:
            with self.conn as conn:
                for somekeys in grouper(self.sqlite_limit_variable_number, keys):
                    keylist = list(somekeys)
                    questionmarks = ','.join(['?'] * len(keylist))
                    sql = self.kv_mget % questionmarks
                    for row in conn.execute(sql, keylist):
                        rows.append(row)
            resultdict = dict(rows)  # type: Dict[str, bytes]
            rget = resultdict.get
            return [rget(k) for k in keys]
        except sqlite3.OperationalError:
            self.support_mget = False
    return [self.__get(k) for k in keys]

def mget(self, keys):
    # type: (List[str]) -> List[Optional[bytes]]
    if not keys:
        return []
    cached = []
    uncached = []  # type: List[Tuple[int, Optional[bytes]]]
    contentkeys = super(DedupKeyValueStore, self).mget(keys)
    for idx, contentkey in enumerate(contentkeys):
        if contentkey is None:
            uncached.append((idx, None))
        else:
            sha = binary_type(contentkey)
            cached.append((idx, unistr(sha)))
    if not cached:
        return [None for _, contentkey in uncached]
    indices, existing_keys = zip(*cached)
    existing_values = self.kvstore.mget(existing_keys)
    idx_value_pairs = sorted(uncached + list(zip(indices, existing_values)))
    return list([value for _, value in idx_value_pairs])

def table_iter(pairs,        # type: List[BytesPair]
               uniqueidx,    # type: int
               enc='utf-8',  # type: str
               fromdesc='',  # type: str
               todesc='',    # type: str
               numlines=2,   # type: int
               wrapcolumn=0  # type: int
               ):
    # type: (...) -> Iterator[Tuple[str, str, str]]
    htmldiffer = HtmlMultiDiff(tabsize=8, wrapcolumn=wrapcolumn)
    htmldiffer.uniqueidx = uniqueidx
    table = htmldiffer.table_from_pairs(pairs,
                                        enc,
                                        fromdesc=fromdesc,
                                        todesc=todesc,
                                        context=True,
                                        numlines=numlines)
    for tablestart, tbody, tableend in iter_tbodies(table):
        yield tablestart, tbody, tableend

def update_evaluations(formatter,        # type: CodeFormatter
                       evaluations,      # type: List[AttemptResult]
                       finished_styles,  # type: List[AttemptResult]
                       bestdist          # type: Sequence[int]
                       ):
    # type: (...) -> Tuple[bool, bool, Sequence[int]]
    attemptresult = heapq.heappop(evaluations)
    nested_round = False
    if bestdist is None or (distquality(attemptresult.distance) < distquality(bestdist)):
        bestdist = attemptresult.distance
        heapq.heappush(evaluations, attemptresult)
    else:
        # We found a style that could no longer be improved by adding a single option value.
        heapq.heappush(finished_styles, attemptresult)
        nested_styles = formatter.nested_derivations(attemptresult.formatstyle)
        if not nested_styles:
            # This formatstyle does not unlock more options.
            return True, nested_round, bestdist
        # Restart the optimization from scratch with the attemptresult augmented with
        # every nested option as seed styles.
        bestdist = None
        ndist = (HUGE_DISTANCE, HUGE_DISTANCE, HUGE_DISTANCE, HUGE_DISTANCE)
        evaluations[:] = [AttemptResult(ndist, s) for s in nested_styles]
        nested_round = True
    return False, nested_round, bestdist

def avg_linelength_diffs(diffargs):
    # type: (List[Tuple[str, bytes]]) -> Iterable[int]
    """Returns the nudged absolute line length differences.
    """
    for filename1, content2 in diffargs:
        linelen1 = get_num_lines(filename1)
        filelen1 = len(get_cached_file(filename1))
        avg1 = 0.0
        if linelen1 > 0:
            avg1 = float(filelen1) / linelen1

        linelen2 = count_content_lines(content2)
        filelen2 = len(content2)
        avg2 = 0.0
        if linelen2 > 0:
            avg2 = float(filelen2) / linelen2

        yield int(abs(10000.0 * (avg1 - avg2)))

def command_add(self, bot: Bot, update: Update, args: List[str]):
    usage_string = ('Nothing was added.\n'
                    'Usage: `/add <subreddit> <score_limit> [<subreddit> <score_limit>]*`')

    if len(args) == 0 or len(args) % 2 != 0:
        update.message.reply_text(usage_string, parse_mode=ParseMode.MARKDOWN)
        return

    subreddits = {}
    while args:
        name, score = args[:2]
        args = args[2:]
        if score.isdecimal():
            score = int(score)
        else:
            update.message.reply_text(usage_string)
            return
        subreddits[name] = score

    self.store.add(subreddits)
    self.command_list(bot, update)

def ripple(a: List[bool], b: List[bool], cin: bool = False, invert_b: bool = False) -> List[bool]:
    # allocate result bits
    result = list(range(0, Memory.REGISTER_WIDTH))  # type: List[bool]
    carry_wire = cin  # type: bool

    # go backwards to preserve carry propagation
    for i in range(max(len(a), len(b)) - 1, -1, -1):
        # sign extend, should not be needed as long as the memory row has all 8 bits filled out
        # if i < 8 - len(a):
        #     a_bit = a[0]
        #     b_bit = b[i]
        # elif i < 8 - len(b):
        #     a_bit = a[i]
        #     b_bit = b[0]
        # else:
        a_bit = a[i]
        b_bit = b[i]

        if invert_b:
            b_bit = not b_bit

        result[i], carry_wire = full_adder(carry_wire, a_bit, b_bit)

    return result

def import_nodes(self, nodes: List[dict], labels: List[str] = None, chunk_size: int = 1000):
    node_labels = ':{0}'.format(':'.join(labels)) \
        if labels else ''
    query = self.import_nodes_template.safe_substitute(labels=node_labels)
    chunk_count = 1

    def batch():
        for i in range(0, len(nodes), chunk_size):
            logger.debug('starting chunk %s', i)
            result = (yield query, dict(props=nodes[i:i + chunk_size]))
            logger.debug(result)

    result = self.run_in_tx(batch(), chunk_count=chunk_count)
    return result

def parse_name(self, name: str) -> List[str]:
    """
    splits a name into parts separated by ., _, camel casing and similar
    :param name: potentially human name
    :return: list of name parts
    """
    parsed_name = ParsedName(**su.empty_dict(PARSED_NAME_FIELDS))
    lower_name = name.lower()
    if lower_name in self.role_names:
        parsed_name.name_type = self.role_names[lower_name]
        parsed_name.name = lower_name
    else:
        parsed_name.name_type = 'proper'
        parsed_name.name = cleanup_proper_name(name)
    return parsed_name

def parse_lexeme(self, lexeme: str) -> List:
    try:
        clean_lexeme = strip_noise(lexeme)
        low_lexeme = clean_lexeme.lower()
        if low_lexeme in self.terms:
            lexeme_parts = [clean_lexeme]
        else:
            lexeme_parts = split_lexeme(lexeme)
        parsed_lexeme = []
        for lexeme_part in lexeme_parts:
            segments = self.segment_into_words(lexeme, lexeme_part)
            parsed_lexeme.append((lexeme_part, segments))
        return parsed_lexeme
    except Exception:
        logger.exception('failed to parse lexeme {}'.format(lexeme))
        return [(lexeme, [SegmentMap('miss', lexeme, None, lexeme, [0])])]

def _loadResults(self, format_: str) -> List[Response]:
    """Read all msgs from the recvbuffer"""
    self.recvbuffer.seek(0)
    res: List[Response] = []
    res = [doc for doc in self._extract_docs(self.recvbuffer)]
    return res

def _get_files_from_dir(self, path: str, ext: str) -> List[str]:
    """
    Return list of files with .ext
    :param path:
    :param ext:
    :return:
    """
    return [x for x in os.listdir(path) if x.endswith('.{}'.format(ext))]

async def execute(self, db_name: str, query: str, values: List, _type: str):
    """
    Execute SQL query in connection pool
    """
    warnings.warn("Use single methods!", DeprecationWarning)
    if _type not in ('select', 'insert', 'update', 'delete'):
        raise RuntimeError(
            'Wrong request type {}'.format(_type)
        )
    if not self.dbs[db_name]['master']:
        raise RuntimeError(
            'db {} master is not initialized'.format(db_name)
        )
    pool = self.dbs[db_name]['master']
    if _type == 'select' and 'slave' in self.dbs[db_name]:
        pool = self.dbs[db_name]['slave']
    async with pool.acquire() as conn:
        async with conn.cursor(cursor_factory=DictCursor) as cursor:
            await cursor.execute(query, values)
            if _type == 'select':
                data = await cursor.fetchall()
            else:
                data = cursor.rowcount
            return data

async def select(self, query: str, values: Union[List, Dict],
                 db_name: str = 'default') -> List[DictRow]:
    return await self._select(query=query, values=values, db_name=db_name)

async def first(self, query: str, values: Union[List, Dict],
                db_name: str = 'default') -> Optional[DictRow]:
    return await self._first(query=query, values=values, db_name=db_name)

async def insert(self, query: str, values: Union[List, Dict],
                 db_name: str = 'default', returning: bool = False):
    return await self._execute(query=query, values=values,
                               db_name=db_name, returning=returning)

async def delete(self, query: str, values: Union[List, Dict], db_name: str = 'default'):
    return await self._execute(query=query, values=values, db_name=db_name)

async def _execute(self, query: str, values: Union[List, Dict],
                   db_name: str = 'default', returning: bool = False):
    pool = self.dbs[db_name]['master']
    if pool is None:
        raise RuntimeError('db {} master is not initialized'.format(db_name))
    async with pool.acquire() as conn:
        async with conn.cursor(cursor_factory=DictCursor) as cursor:
            await cursor.execute(query, values)
            if returning:
                return await cursor.fetchone()
            else:
                return cursor.rowcount

async def _select(self, query: str, values: Union[List, Dict], db_name: str = 'default'):
    dbs = self.dbs[db_name]
    pool = dbs.get('slave') or dbs.get('master')
    if pool is None:
        raise RuntimeError('db {} master is not initialized'.format(db_name))
    async with pool.acquire() as conn:
        async with conn.cursor(cursor_factory=DictCursor) as cursor:
            await cursor.execute(query, values)
            return await cursor.fetchall()

async def _first(self, query: str, values: Union[List, Dict], db_name: str = 'default'):
    dbs = self.dbs[db_name]
    pool = dbs.get('slave') or dbs.get('master')
    if pool is None:
        raise RuntimeError('db {} master is not initialized'.format(db_name))
    async with pool.acquire() as conn:
        async with conn.cursor(cursor_factory=DictCursor) as cursor:
            await cursor.execute(query, values)
            return await cursor.fetchone()

def posts_by_user(user: User, limit: Optional[int] = None) -> List[Post]:
    return list(islice(user_posts[user], limit))

def posts_for_user(user: User, limit: Optional[int] = None) -> List[Post]:
    relevant = merge(*[user_posts[u] for u in following[user]], reverse=True)
    return list(islice(relevant, limit))

def get_followers(user: User) -> List[User]:
    return sorted(followers[user])

def get_followed(user: User) -> List[User]:
    return sorted(following[user])

def search(phrase: str, limit: Optional[int] = None) -> List[Post]:
    # XXX this could benefit from caching and from preindexing
    return list(islice((post for post in posts if phrase in post.text), limit))

def compute_centroids(groups: Iterable[Sequence[Point]]) -> List[Centroid]:
    'Compute the centroid of each group'
    return [tuple(map(mean, transpose(group))) for group in groups]

def k_means(data: Iterable[Point], k: int = 2, iterations: int = 10) -> List[Point]:
    'Return k-centroids for the data'
    data = list(data)
    centroids = sample(data, k)
    for i in range(iterations):
        labeled = assign_data(centroids, data)
        centroids = compute_centroids(labeled.values())
    return centroids

def __init__(self, sentence: str, chunks: List[Chunk], surfaces: List[str]):
    self.sentence = sentence
    self.chunks = chunks
    self.surfaces = surfaces
    self.depth = self.depth()

def __init__(self, text: str, delimiter: str, rnnlm_model_path: str):
    self.text = text
    self.sentences = split_text(text, delimiter)  # type: List[str]
    lengths, self.tss = tokenize(self.sentences)
    if not os.path.isfile(rnnlm_model_path):
        raise FileNotFoundError(errno.ENOENT,
                                os.strerror(errno.ENOENT),
                                rnnlm_model_path)
    self.rnnlm_model_path = rnnlm_model_path
    self.word_freq, self.n_total_words = self._load_word_freq(threshold=1)

    log_prob_scores = \
        self._calc_log_prob_scores()
    unigram_scores = \
        self._calc_unigram_scores()

    mean_lp_scores = \
        calc_mean_lp_scores(log_prob_scores, lengths)
    norm_lp_div_scores = \
        calc_norm_lp_div_scores(log_prob_scores, unigram_scores)
    norm_lp_sub_scores = \
        calc_norm_lp_sub_scores(log_prob_scores, unigram_scores)
    slor_scores = \
        calc_slor_scores(norm_lp_sub_scores, lengths)

    self.log_prob = average(log_prob_scores)
    self.mean_lp = average(mean_lp_scores)
    self.norm_lp_div = average(norm_lp_div_scores)
    self.norm_lp_sub = average(norm_lp_sub_scores)
    self.slor = average(slor_scores)

def _calc_log_prob_scores(self) -> List[Union[None, float]]:
    """Get log likelihood scores by calling RNNLM
    """
    textfile = tempfile.NamedTemporaryFile(delete=True)
    content = '\n'.join([''.join(ts) for ts in self.tss]) + '\n'
    textfile.write(str.encode(content))
    textfile.seek(0)

    command = ['rnnlm',
               '-rnnlm',
               self.rnnlm_model_path,
               '-test',
               textfile.name]
    process = Popen(command, stdout=PIPE, stderr=PIPE)
    output, err = process.communicate()
    lines = [line.strip() for line in output.decode('UTF-8').split('\n')
             if line.strip() != '']
    scores = []
    for line in lines:
        if line == const.OUT_OF_VOCABULARY:
            scores.append(None)
        else:
            try:
                score = float(line)
                scores.append(score)
            except ValueError:
                pass
    textfile.close()

    return scores

def _calc_unigram_scores(self) -> List[float]:
    unigram_scores = []
    for ts in self.tss:
        unigram_score = 0.0
        for t in ts:
            n = float(self.n_total_words)
            x = float(self.word_freq.get(t, self.word_freq['<unk/>']))
            unigram_score += math.log(x / n)
        unigram_scores.append(unigram_score)
    return unigram_scores

def average(xs: List[Union[None, float]]) -> float:
    """Calculate the arithmetic mean of the given values (possibly None)

    >>> '{:.2f}'.format(average([None, 1.0, 2.0]))
    '1.50'
    """
    return numpy.mean([x for x in xs if x is not None])

def tokenize(sentences: List[str]) -> Tuple[List[int], List[List[str]]]:
    tokenizer = Tokenizer()
    lengths = []
    texts = []
    for s in sentences:
        result = tokenizer.tokenize(s)
        surfaces = [t.surface for t in result]
        lengths.append(len(surfaces))
        text = ' '.join(surfaces)
        texts.append(text)
    return lengths, texts

def split_text(text: str, delimiter: str = '\n') -> List[str]:
    s = [s.strip() for s in text.split(delimiter) if len(s.strip()) > 0]
    return s