Python typing module: typing.List example source code

The following 50 code examples, extracted from open-source Python projects, show how typing.List is used in practice.
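
Before the project examples, here is a minimal, self-contained sketch of typing.List in annotations. The names and data are illustrative only and come from none of the projects below. Note that since Python 3.9 the built-in list can be parameterized directly (list[int]), so typing.List is the older spelling kept for compatibility.

from typing import List, Optional

def chunk(items: List[int], size: int = 2) -> List[List[int]]:
    """Split items into consecutive chunks of at most `size` elements."""
    return [items[i:i + size] for i in range(0, len(items), size)]

def first_or_none(values: List[str]) -> Optional[str]:
    """Return the first element, or None if the list is empty."""
    return values[0] if values else None

print(chunk([1, 2, 3, 4, 5]))  # -> [[1, 2], [3, 4], [5]]
print(first_or_none([]))       # -> None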

Project: saapy | Author: ashapochka
def run_batch_query(self, query: str,
                        labels: List[str] = None,
                        params: List[dict] = None,
                        chunk_size: int = 1000):
        node_labels = ':{0}'.format(':'.join(labels)) \
            if labels else ''
        query_template = Template("UNWIND {params} AS params " + query)
        labeled_query = query_template.safe_substitute(labels=node_labels)

        chunk_count = 1

        def batch():
            for i in range(0, len(params), chunk_size):
                logger.debug('starting chunk %s', i)
                result = (yield labeled_query,
                                dict(params=params[i:i + chunk_size]))
                logger.debug(result)

        result = self.run_in_tx(batch(), chunk_count=chunk_count)
        return result
Project: python-driver | Author: bblfsh
def _send_receive(self, nummsgs: int, outformat: str = 'json',
                      dataupdate: Optional[Dict[AnyStr, Any]] = None,
                      restart_data: bool = True) -> List[Response]:
        if restart_data:
            self._restart_data(outformat)

        if dataupdate:
            self.data.update(dataupdate)

        self._add_to_buffer(nummsgs, outformat)
        self.sendbuffer.seek(0)

        processor, _ = get_processor_instance(
                outformat,
                custom_outbuffer=self.recvbuffer,
                custom_inbuffer=self.sendbuffer
        )
        processor.process_requests(self.sendbuffer)
        return self._loadResults(outformat)
Project: djaio | Author: Sberned
def __init__(
            self,
            description: str = None,
            pre_hooks: (List, Tuple) = None,
            post_hooks: (List, Tuple) = None
    ):
        self.result = None
        self.total = None
        self.success = None
        self.errors = None
        self.params = None
        self.output = None
        self.pagination = None
        self.limit = None
        self.offset = None
        self.app = None
        self.settings = None
        self.description = description
        self.pre_hooks = pre_hooks
        self.post_hooks = post_hooks
        self.meta = {}
Project: trf | Author: aistairc
def calc_norm_lp_div_scores(
        log_prob_scores: List[float],
        unigram_scores: List[float]) -> List[Union[None, float]]:
    r"""
    .. math::
        \frac{%
            \log P_\text{model}\left(\xi\right)
        }{%
            \log P_\text{unigram}\left(\xi\right)
        }
    >>> '{:.3f}'.format(calc_norm_lp_div_scores([-14.7579], [-35.6325])[0])
    '-0.414'
    """
    results = []
    for log_prob, unigram_score in zip(log_prob_scores, unigram_scores):
        if log_prob is None or numpy.isclose(unigram_score, 0.0, rtol=1e-05):
            x = None
        else:
            x = (-1.0) * float(log_prob) / float(unigram_score)
        results.append(x)
    return results
Project: trf | Author: aistairc
def calc_norm_lp_sub_scores(
        log_prob_scores: List[float],
        unigram_scores: List[float]) -> List[Union[None, float]]:
    r"""
    .. math::
        \log P_\text{model}\left(\xi\right)
            - \log P_\text{unigram}\left(\xi\right)
    >>> '{:.3f}'.format(calc_norm_lp_sub_scores([-14.7579], [-35.6325])[0])
    '20.875'
    """

    results = []
    for log_prob, unigram_score in zip(log_prob_scores, unigram_scores):
        if log_prob is None or numpy.isclose(unigram_score, 0.0, rtol=1e-05):
            x = None
        else:
            x = float(log_prob) - float(unigram_score)
        results.append(x)
    return results
Project: trf | Author: aistairc
def calc_slor_scores(norm_lp_sub_scores: List[float],
                     lengths: List[int]) -> List[Union[None, float]]:
    r"""Calculate SLOR (Syntactic Log-Odds Ratio)
    .. math::
        \frac{%
            \log P_\text{model}\left(\xi\right)
                - \log P_\text{unigram}\left(\xi\right)
        }{%
            \text{length}\left(\xi\right)
        }
    >>> '{:.3f}'.format(calc_slor_scores([20.8746], [4])[0])
    '5.219'
    """

    results = []
    for norm_lp_sub_score, length in zip(norm_lp_sub_scores, lengths):
        if (norm_lp_sub_score is None) or length == 0:
            x = None
        else:
            x = norm_lp_sub_score / length
        results.append(x)
    return results
Project: pylongecity | Author: cyrbon
def scrape_all_posts_unflat(url: str, verbose: bool, cache: bool) -> List[List['Post']]:
    unflat_posts = []

    fget = requests.get if not cache else memory.cache(requests.get)
    page = fget(url).text # Downloads the page twice.
    # ^ we can scrape_page(page), .append, [urls - url], but KISS.
    n_of_pages = pq(page).find('.pagejump > a').eq(0).text().strip().split(' ')[-1] # Gets '10' from 'Page 1 of 10'

    # If there is only one page
    if n_of_pages == '':
        urls = [url]
    else:
        url_prefix_match = re.match('(.*)(page-[0-9]+)', url)
        url_prefix = url if url_prefix_match is None else url_prefix_match.group(1)
        if url_prefix[-1] != '/': url_prefix += '/'
        urls = [(url_prefix + 'page-' + str(n + 1)) for n in range(int(n_of_pages))]

    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        fscrape = scrape_posts if not cache else memory.cache(scrape_posts, ignore=['verbose'])
        futures = [executor.submit(fscrape, url, verbose) for url in urls]
        results, _ = concurrent.futures.wait(futures)
        for result in results:
            unflat_posts.append(result.result())
    return unflat_posts
Project: pylongecity | Author: cyrbon
def get_posts(*urls: str, **kwargs):
    '''
    Args:
        *urls (str): URLs, where each url is a unique thread
        verbose (bool): Verbosity
        cache (bool): Cache results across calls
        disambiguate_threads (bool): When scraping multiple threads, prepend
            each thread's URL to the HTML of its first post to show which
            thread it came from.
    '''
    posts_unflat = []
    disambiguate_threads = kwargs.pop('disambiguate_threads', True)
    for url in urls:
        posts = scrape_all_posts(url, **kwargs)
        # Displaying a link title to show which posts come from which thread if
        # we are getting multiple threads.
        if(disambiguate_threads and len(urls) > 1):
            posts[0].html = '''
            <div style="background-color: #3B6796;">
                <a href="{0}"><h1 style="font-size: 40px; color: white;">{0}</h1></a>
            </div>'''.format(url) + posts[0].html
        posts_unflat.append(posts)

    return [p for slist in posts_unflat for p in slist]
Project: pyppeteer | Author: miyakogi
async def querySelectorAll(self, selector: str) -> List['ElementHandle']:
        """Get all elements matching `selector`."""
        remoteObject = await self._rawEvaluate(
            'selector => Array.from(document.querySelectorAll(selector))',
            selector,
        )
        response = await self._client.send('Runtime.getProperties', {
            'objectId': remoteObject.get('objectId', ''),
            'ownProperties': True,
        })
        properties = response.get('result', {})
        result: List[ElementHandle] = []
        releasePromises = [helper.releaseObject(self._client, remoteObject)]
        for prop in properties:
            value = prop.get('value', {})
            if prop.get('enumerable') and value.get('subtype') == 'node':
                result.append(ElementHandle(self._client, value, self._mouse,
                                            self._touchscreen))
            else:
                releasePromises.append(
                    helper.releaseObject(self._client, value))
        await asyncio.gather(*releasePromises)
        return result

Project: whatstyle | Author: mikr
def __init__(self, exe, cache=None):
        # type: (str, Optional[Cache]) -> None
        if not os.path.isabs(exe):
            exe = which(exe)  # type: ignore
        self.exe = unifilename(exe)
        self.cache = cache
        self._styledefinition = styledef_make()
        self.allow_encoding_change = False
        self.languages = []  # type: List[str]

        self.initial_style = style_make()
        # These are deleted after one call to minimize_errors
        self.globaltempfiles = set()  # type: Set[str]
        # These are deleted after each round of attempts
        self.tempfiles = set()  # type: Set[str]
        self.keeptempfiles = False
        self.version_string = formatter_version(exe)
Project: whatstyle | Author: mikr
def can_process_in_parallel(self, filenames):
        # type: (List[str]) -> bool
        """
        Returns False if one of the files is too large to be processed in parallel
        with another file.
        Returns True if all files are small enough.
        """
        result = True
        for filename in filenames:
            sourcedata = get_cached_file(filename)
            if len(sourcedata) > MAX_FILESIZE_FOR_MULTIPROCESSING:
                reportwarning('Warning: %s has a size of %s bytes.' % (filename,
                                                                       len(sourcedata)))
                reportwarning('  This may cause memory swapping so we only use'
                              ' a single processor core.')
                result = False
        return result
Project: whatstyle | Author: mikr
def nested_derivations(self, style):
        # type: (Style) -> List[Style]
        options = [('BreakBeforeBraces', 'Custom')]
        nstyles = []
        for optionname, value in options:
            optdef = styledef_option(self.styledefinition, optionname)
            # We can only use this nested option if the clang version in use supports it.
            if optdef is None:
                continue
            if value not in option_configs(optdef):
                continue
            if style.get(optionname) != value:
                nstyle = Style(copy.deepcopy(style))
                set_option(nstyle, optionname, value)
                nstyles.append(nstyle)
        return nstyles
Project: whatstyle | Author: mikr
def variants_for(self, option):
        # type: (Option) -> List[Style]

        def kvpairs(vs):
            # type: (Iterable[OptionValue]) -> List[Style]
            return stylevariants(stylename, vs)

        stylename = option_name(option)
        styletype = option_type(option)
        configs = option_configs(option)

        if configs:
            return kvpairs(configs)
        if stylename == self.columnlimitname:
            return kvpairs(self.column_limit_candidates)
        if styletype == 'int':
            return kvpairs([0, 1, 2, 4, 8, 16])
        return []
Project: whatstyle | Author: mikr
def cmdargs_for_style(self, formatstyle, filename=None):
        # type: (Style, Optional[str]) -> List[str]
        assert isinstance(formatstyle, Style)
        configdata = bytestr(self.styletext(formatstyle))
        sha = shahex(configdata)
        cfg = os.path.join(tempfile.gettempdir(), 'whatstyle_uncrustify_%s.cfg' % sha)
        if not self.tempfile_exists(cfg):
            writebinary(cfg, configdata)
            self.add_tempfile(cfg)
        cmdargs = ['-c', cfg]
        # The filename extension might be ambiguous so we choose from the languages
        # registered in identify_language.
        if self.languages:
            lang = self.languages[0]
            cmdargs.extend(['-l', lang])
        return cmdargs
Project: whatstyle | Author: mikr
def register_options(self):
        # type: () -> None
        """Parse options from text like this:
        Preferences:
          [+|-]alignArguments                                        Enable/disable ...
          ...
          [+|-]spacesWithinPatternBinders                            Enable/disable ...
          -alignSingleLineCaseStatements.maxArrowIndent=[1-100]      Set Maximum number ...
          -indentSpaces=[1-10]                                       Set Number of spaces ...
        """
        exeresult = run_executable(self.exe, ['--help'], cache=self.cache)
        options = []
        text = unistr(exeresult.stdout)
        for m in re.finditer(r'^  (\[\+\|-\]|-)([a-z][a-zA-Z.]+)(?:=\[(\d+)-(\d+)\])?', text,
                             re.MULTILINE):
            optionprefix, optionname, start, end = m.groups()
            if start is None:
                optiontype = 'bool'
                configs = [True, False]  # type: List[OptionValue]
            else:
                optiontype = 'int'
                configs = list(inclusiverange(int(start), int(end)))
            options.append(option_make(optionname, optiontype, configs))
        self.styledefinition = styledef_make(options)
Project: whatstyle | Author: mikr
def cmdargs_for_style(self, formatstyle, filename=None):
        # type: (Style, Optional[str]) -> List[str]
        assert isinstance(formatstyle, Style)
        configdata = bytestr(self.styletext(formatstyle))
        sha = shahex(configdata)
        cfg = os.path.join(tempfile.gettempdir(),
                           'whatstyle_rustfmt_%s/%s' % (sha, self.configfilename))
        try:
            dirpath = os.path.dirname(cfg)
            os.makedirs(dirpath)
            self.add_tempfile(dirpath)
        except OSError as exc:
            if exc.errno != errno.EEXIST:
                raise
        if not self.tempfile_exists(cfg):
            writebinary(cfg, configdata)
            self.add_tempfile(cfg)
        cmdargs = ['--config-path', cfg]
        return cmdargs
Project: whatstyle | Author: mikr
def mget(self, keys):
        # type: (List[str]) -> List[Optional[bytes]]
        rows = []
        if self.support_mget:
            try:
                with self.conn as conn:
                    for somekeys in grouper(self.sqlite_limit_variable_number, keys):
                        keylist = list(somekeys)
                        questionmarks = ','.join(['?'] * len(keylist))
                        sql = self.kv_mget % questionmarks
                        for row in conn.execute(sql, keylist):
                            rows.append(row)
                resultdict = dict(rows)  # type: Dict[str, bytes]
                rget = resultdict.get
                return [rget(k) for k in keys]
            except sqlite3.OperationalError:
                self.support_mget = False
        return [self.__get(k) for k in keys]
Project: whatstyle | Author: mikr
def mget(self, keys):
        # type: (List[str]) -> List[Optional[bytes]]
        if not keys:
            return []
        cached = []
        uncached = []  # type: List[Tuple[int, Optional[bytes]]]
        contentkeys = super(DedupKeyValueStore, self).mget(keys)
        for idx, contentkey in enumerate(contentkeys):
            if contentkey is None:
                uncached.append((idx, None))
            else:
                sha = binary_type(contentkey)
                cached.append((idx, unistr(sha)))
        if not cached:
            return [None for _, contentkey in uncached]
        indices, existing_keys = zip(*cached)
        existing_values = self.kvstore.mget(existing_keys)
        idx_value_pairs = sorted(uncached + list(zip(indices, existing_values)))
        return list([value for _, value in idx_value_pairs])
Project: whatstyle | Author: mikr
def table_iter(pairs,            # type: List[BytesPair]
               uniqueidx,        # type: int
               enc='utf-8',      # type: str
               fromdesc='',      # type: str
               todesc='',        # type: str
               numlines=2,       # type: int
               wrapcolumn=0      # type: int
               ):
    # type: (...) -> Iterator[Tuple[str, str, str]]
    htmldiffer = HtmlMultiDiff(tabsize=8, wrapcolumn=wrapcolumn)
    htmldiffer.uniqueidx = uniqueidx
    table = htmldiffer.table_from_pairs(pairs,
                                        enc,
                                        fromdesc=fromdesc,
                                        todesc=todesc,
                                        context=True,
                                        numlines=numlines)
    for tablestart, tbody, tableend in iter_tbodies(table):
        yield tablestart, tbody, tableend
Project: whatstyle | Author: mikr
def update_evaluations(formatter,  # type: CodeFormatter
                       evaluations,  # type: List[AttemptResult]
                       finished_styles,  # type: List[AttemptResult]
                       bestdist  # type: Sequence[int]
                       ):
    # type: (...) -> Tuple[bool, bool, Sequence[int]]
    attemptresult = heapq.heappop(evaluations)
    nested_round = False
    if bestdist is None or (distquality(attemptresult.distance) < distquality(bestdist)):
        bestdist = attemptresult.distance
        heapq.heappush(evaluations, attemptresult)
    else:
        # We found a style that could no longer be improved by adding a single option value.
        heapq.heappush(finished_styles, attemptresult)
        nested_styles = formatter.nested_derivations(attemptresult.formatstyle)
        if not nested_styles:
            # This formatstyle does not unlock more options.
            return True, nested_round, bestdist
        # Restart the optimization from scratch with the attemptresult augmented with
        # every nested option as seed styles.
        bestdist = None
        ndist = (HUGE_DISTANCE, HUGE_DISTANCE, HUGE_DISTANCE, HUGE_DISTANCE)
        evaluations[:] = [AttemptResult(ndist, s) for s in nested_styles]
        nested_round = True
    return False, nested_round, bestdist
Project: whatstyle | Author: mikr
def avg_linelength_diffs(diffargs):
    # type: (List[Tuple[str, bytes]]) -> Iterable[int]
    """Returns the nudged absolute line length differences.
    """
    for filename1, content2 in diffargs:
        linelen1 = get_num_lines(filename1)
        filelen1 = len(get_cached_file(filename1))
        avg1 = 0.0
        if linelen1 > 0:
            avg1 = float(filelen1) / linelen1

        linelen2 = count_content_lines(content2)
        filelen2 = len(content2)
        avg2 = 0.0
        if linelen2 > 0:
            avg2 = float(filelen2) / linelen2

        yield int(abs(10000.0 * (avg1 - avg2)))
Project: telegram-autoposter | Author: vaniakosmos
def command_add(self, bot: Bot, update: Update, args: List[str]):
        usage_string = ('Nothing was added.\n'
                        'Usage: `/add <subreddit> <score_limit> [<subreddit> <score_limit>]*`')
        if len(args) == 0 or len(args) % 2 != 0:
            update.message.reply_text(usage_string, parse_mode=ParseMode.MARKDOWN)
            return

        subreddits = {}
        while args:
            name, score = args[:2]
            args = args[2:]
            if score.isdecimal():
                score = int(score)
            else:
                update.message.reply_text(usage_string)
                return
            subreddits[name] = score

        self.store.add(subreddits)
        self.command_list(bot, update)
Project: PicoSim | Author: Vadman97
def ripple(a: List[bool], b: List[bool], cin: bool = False, invert_b: bool = False) -> List[bool]:
    # allocate result bits
    result = list(range(0, Memory.REGISTER_WIDTH))  # type: List[bool]
    carry_wire = cin  # type: bool

    # go backwards to preserve carry propagation
    for i in range(max(len(a), len(b)) - 1, -1, -1):
        # sign extend, should not be needed as long as the memory row has all 8 bits filled out
        # if i < 8 - len(a):
        #     a_bit = a[0]
        #     b_bit = b[i]
        # elif i < 8 - len(b):
        #     a_bit = a[i]
        #     b_bit = b[0]
        # else:
        a_bit = a[i]
        b_bit = b[i]

        if invert_b:
            b_bit = not b_bit
        result[i], carry_wire = full_adder(carry_wire, a_bit, b_bit)

    return result
Project: saapy | Author: ashapochka
def import_nodes(self, nodes: List[dict],
                     labels: List[str] = None,
                     chunk_size: int = 1000):
        node_labels = ':{0}'.format(':'.join(labels)) \
            if labels else ''
        query = self.import_nodes_template.safe_substitute(labels=node_labels)

        chunk_count = 1

        def batch():
            for i in range(0, len(nodes), chunk_size):
                logger.debug('starting chunk %s', i)
                result = (yield query, dict(props=nodes[i:i + chunk_size]))
                logger.debug(result)

        result = self.run_in_tx(batch(), chunk_count=chunk_count)
        return result
Project: saapy | Author: ashapochka
def parse_name(self, name: str) -> List[str]:
        """
        splits a name into parts separated by '.', '_', camel casing, and
        similar delimiters
        :param name: potentially human name
        :return: list of name parts
        """
        parsed_name = ParsedName(**su.empty_dict(PARSED_NAME_FIELDS))
        lower_name = name.lower()
        if lower_name in self.role_names:
            parsed_name.name_type = self.role_names[lower_name]
            parsed_name.name = lower_name
        else:
            parsed_name.name_type = 'proper'
            parsed_name.name = cleanup_proper_name(name)
        return parsed_name
Project: saapy | Author: ashapochka
def parse_lexeme(self, lexeme: str) -> List:
        try:
            clean_lexeme = strip_noise(lexeme)
            low_lexeme = clean_lexeme.lower()
            if low_lexeme in self.terms:
                lexeme_parts = [clean_lexeme]
            else:
                lexeme_parts = split_lexeme(lexeme)
            parsed_lexeme = []
            for lexeme_part in lexeme_parts:
                segments = self.segment_into_words(lexeme, lexeme_part)
                parsed_lexeme.append((lexeme_part, segments))
            return parsed_lexeme
        except Exception:
            logger.exception('failed to parse lexeme {}'.format(lexeme))
            return [(lexeme, [SegmentMap('miss', lexeme, None, lexeme, [0])])]
Project: python-driver | Author: bblfsh
def _loadResults(self, format_: str) -> List[Response]:
        """Read all msgs from the recvbuffer"""
        self.recvbuffer.seek(0)

        res: List[Response] = [doc for doc in self._extract_docs(self.recvbuffer)]
        return res
Project: Zoom2Youtube | Author: Welltory
def _get_files_from_dir(self, path: str, ext: str) -> List[str]:
        """
        Return the list of files in `path` whose names end with `.ext`
        :param path: directory to search
        :param ext: file extension, without the leading dot
        :return: list of matching file names
        """
        return [x for x in os.listdir(path) if x.endswith('.{}'.format(ext))]
Project: djaio | Author: Sberned
def execute(self, db_name: str, query: str, values: List, _type: str):
        """
        Execute SQL query in connection pool
        """
        warnings.warn("Use single methods!", DeprecationWarning)

        if _type not in ('select', 'insert', 'update', 'delete'):
            raise RuntimeError(
                'Wrong request type {}'.format(_type)
            )
        if not self.dbs[db_name]['master']:
            raise RuntimeError(
                'db {} master is not initialized'.format(db_name)
            )

        pool = self.dbs[db_name]['master']
        if _type == 'select' and 'slave' in self.dbs[db_name]:
            pool = self.dbs[db_name]['slave']

        async with pool.acquire() as conn:
            async with conn.cursor(cursor_factory=DictCursor) as cursor:
                await cursor.execute(query, values)
                if _type == 'select':
                    data = await cursor.fetchall()
                else:
                    data = cursor.rowcount
        return data
Project: djaio | Author: Sberned
def select(self, query: str, values: Union[List, Dict],
                     db_name: str = 'default') -> List[DictRow]:
        return await self._select(query=query, values=values, db_name=db_name)
Project: djaio | Author: Sberned
def first(self, query: str, values: Union[List, Dict],
                    db_name: str = 'default') -> Optional[DictRow]:
        return await self._first(query=query, values=values, db_name=db_name)
Project: djaio | Author: Sberned
def insert(self, query: str, values: Union[List, Dict],
                     db_name: str = 'default', returning: bool = False):
        return await self._execute(query=query, values=values, db_name=db_name, returning=returning)
Project: djaio | Author: Sberned
def delete(self, query: str, values: Union[List, Dict], db_name: str = 'default'):
        return await self._execute(query=query, values=values, db_name=db_name)
Project: djaio | Author: Sberned
def _execute(self, query: str, values: Union[List, Dict], db_name: str = 'default',
                       returning: bool = False):
        pool = self.dbs[db_name]['master']
        if pool is None:
            raise RuntimeError('db {} master is not initialized'.format(db_name))

        async with pool.acquire() as conn:
            async with conn.cursor(cursor_factory=DictCursor) as cursor:
                await cursor.execute(query, values)
                if returning:
                    return await cursor.fetchone()
                else:
                    return cursor.rowcount
Project: djaio | Author: Sberned
def _select(self, query: str, values: Union[List, Dict], db_name: str = 'default'):
        dbs = self.dbs[db_name]
        pool = dbs.get('slave') or dbs.get('master')
        if pool is None:
            raise RuntimeError('db {} master is not initialized'.format(db_name))

        async with pool.acquire() as conn:
            async with conn.cursor(cursor_factory=DictCursor) as cursor:
                await cursor.execute(query, values)
                return await cursor.fetchall()
Project: djaio | Author: Sberned
def _first(self, query: str, values: Union[List, Dict], db_name: str = 'default'):
        dbs = self.dbs[db_name]
        pool = dbs.get('slave') or dbs.get('master')
        if pool is None:
            raise RuntimeError('db {} master is not initialized'.format(db_name))

        async with pool.acquire() as conn:
            async with conn.cursor(cursor_factory=DictCursor) as cursor:
                await cursor.execute(query, values)
                return await cursor.fetchone()
Project: modernpython | Author: rhettinger
def posts_by_user(user: User, limit: Optional[int] = None) -> List[Post]:
    return list(islice(user_posts[user], limit))
Project: modernpython | Author: rhettinger
def posts_for_user(user: User, limit: Optional[int] = None) -> List[Post]:
    relevant = merge(*[user_posts[u] for u in following[user]], reverse=True)
    return list(islice(relevant, limit))
Project: modernpython | Author: rhettinger
def get_followers(user: User) -> List[User]:
    return sorted(followers[user])
Project: modernpython | Author: rhettinger
def get_followed(user: User) -> List[User]:
    return sorted(following[user])
Project: modernpython | Author: rhettinger
def search(phrase: str, limit: Optional[int] = None) -> List[Post]:
    # XXX this could benefit from caching and from preindexing
    return list(islice((post for post in posts if phrase in post.text), limit))
Project: modernpython | Author: rhettinger
def compute_centroids(groups: Iterable[Sequence[Point]]) -> List[Centroid]:
    'Compute the centroid of each group'
    return [tuple(map(mean, transpose(group))) for group in groups]
Project: modernpython | Author: rhettinger
def k_means(data: Iterable[Point], k: int = 2, iterations: int = 10) -> List[Point]:
    'Return k-centroids for the data'
    data = list(data)
    centroids = sample(data, k)
    for i in range(iterations):
        labeled = assign_data(centroids, data)
        centroids = compute_centroids(labeled.values())
    return centroids
Project: trf | Author: aistairc
def __init__(self,
                 sentence: str,
                 chunks: List[Chunk],
                 surfaces: List[str]):

        self.sentence = sentence
        self.chunks = chunks
        self.surfaces = surfaces
        self.depth = self.depth()
Project: trf | Author: aistairc
def __init__(self, text: str, delimiter: str, rnnlm_model_path: str):

        self.text = text
        self.sentences = split_text(text, delimiter)  # type: List[str]
        lengths, self.tss = tokenize(self.sentences)

        if not os.path.isfile(rnnlm_model_path):
            raise FileNotFoundError(errno.ENOENT,
                                    os.strerror(errno.ENOENT),
                                    rnnlm_model_path)
        self.rnnlm_model_path = rnnlm_model_path

        self.word_freq, self.n_total_words = self._load_word_freq(threshold=1)

        log_prob_scores = \
            self._calc_log_prob_scores()
        unigram_scores = \
            self._calc_unigram_scores()

        mean_lp_scores = \
            calc_mean_lp_scores(log_prob_scores, lengths)
        norm_lp_div_scores = \
            calc_norm_lp_div_scores(log_prob_scores, unigram_scores)
        norm_lp_sub_scores = \
            calc_norm_lp_sub_scores(log_prob_scores, unigram_scores)
        slor_scores = \
            calc_slor_scores(norm_lp_sub_scores, lengths)

        self.log_prob = average(log_prob_scores)
        self.mean_lp = average(mean_lp_scores)
        self.norm_lp_div = average(norm_lp_div_scores)
        self.norm_lp_sub = average(norm_lp_sub_scores)
        self.slor = average(slor_scores)
Project: trf | Author: aistairc
def _calc_log_prob_scores(self) -> List[Union[None, float]]:
        """Get log likelihood scores by calling RNNLM
        """

        textfile = tempfile.NamedTemporaryFile(delete=True)
        content = '\n'.join([''.join(ts) for ts in self.tss]) + '\n'
        textfile.write(str.encode(content))
        textfile.seek(0)

        command = ['rnnlm',
                   '-rnnlm',
                   self.rnnlm_model_path,
                   '-test',
                   textfile.name]
        process = Popen(command, stdout=PIPE, stderr=PIPE)
        output, err = process.communicate()
        lines = [line.strip() for line in output.decode('UTF-8').split('\n')
                 if line.strip() != '']
        scores = []
        for line in lines:
            if line == const.OUT_OF_VOCABULARY:
                scores.append(None)
            else:
                try:
                    score = float(line)
                    scores.append(score)
                except ValueError:
                    pass
        textfile.close()
        return scores
Project: trf | Author: aistairc
def _calc_unigram_scores(self) -> List[float]:

        unigram_scores = []
        for ts in self.tss:
            unigram_score = 0.0

            for t in ts:
                n = float(self.n_total_words)
                x = float(self.word_freq.get(t, self.word_freq['<unk/>']))
                unigram_score += math.log(x / n)

            unigram_scores.append(unigram_score)

        return unigram_scores
Project: trf | Author: aistairc
def average(xs: List[Union[None, float]]) -> float:
    """Calculate the arithmetic mean of the given values (possibly None)
    >>> '{:.2f}'.format(average([None, 1.0, 2.0]))
    '1.50'
    """
    return numpy.mean([x for x in xs if x is not None])
Project: trf | Author: aistairc
def tokenize(sentences: List[str]) -> Tuple[List[int], List[str]]:

    tokenizer = Tokenizer()
    lengths = []
    texts = []
    for s in sentences:
        result = tokenizer.tokenize(s)

        surfaces = [t.surface for t in result]
        lengths.append(len(surfaces))

        text = ' '.join(surfaces)
        texts.append(text)
    return lengths, texts
Project: trf | Author: aistairc
def split_text(text: str, delimiter: str = '\n') -> List[str]:
    s = [s.strip() for s in text.split(delimiter) if len(s.strip()) > 0]
    return s