Python typing 模块,FrozenSet() 实例源码


项目:simone    作者:matheuspb    | 项目源码 | 文件源码
def down(self, visited: FrozenSet[Any]=None) -> Set[Any]:
        """
            Collect the nodes reachable when the traversal descends into this
            node.

            `visited` holds the nodes already expanded; a node met a second
            time is not expanded again.
        """
        seen = frozenset() if visited is None else visited

        if self in seen:
            # second visit: an operator contributes nothing, anything else
            # contributes itself
            if self.symbol in OPERATORS:
                return set()
            return {self}

        seen |= {self}
        symbol = self.symbol
        if symbol == '|':
            return self.left.down(seen) | self.right.down(seen)
        if symbol == '.':
            return self.left.down(seen)
        if symbol in ('*', '?'):
            return self.left.down(seen) | self.right.up(seen)
        if symbol == EPSILON:
            return self.right.up(seen)
        return {self}
项目:simone    作者:matheuspb    | 项目源码 | 文件源码
def up(self, visited: FrozenSet[Any]=None) -> Set[Any]:
        """
            Collect the nodes reachable when the traversal leaves this node
            upwards.
        """
        seen = visited if visited is not None else frozenset()
        symbol = self.symbol

        if symbol == '.':
            return self.right.down(seen)
        if symbol == '*':
            return self.left.down(seen) | self.right.up(seen)
        if symbol == '?':
            return self.right.up(seen)
        if symbol != '|':
            # any remaining symbol (self.symbol == END) is its own result
            return {self}

        # '|': step over the whole right sub tree before going further up
        current = self.right
        while current.symbol in ('.', '|'):
            current = current.right
        return current.right.up(seen)
项目:simone    作者:matheuspb    | 项目源码 | 文件源码
def _are_undistinguishable(
            self, state_a: str, state_b: str,
            undistinguishable: Set[FrozenSet[str]]) -> bool:
        """
            State a and b are distinguishable if they go to distinguishable
            states for some input symbol.

            For every symbol of `self._alphabet` the first target state of
            each of the two states is looked up in `self._transitions`
            (a missing transition counts as the state ""). The states are
            reported as distinguishable (False) as soon as the two targets
            differ and their pair is not in `undistinguishable`; otherwise
            True is returned.
        """
        for symbol in self._alphabet:
            transition_a = \
                list(self._transitions.get((state_a, symbol), {""}))[0]
            transition_b = \
                list(self._transitions.get((state_b, symbol), {""}))[0]
            if transition_a != transition_b and \
                    frozenset((transition_a, transition_b)) not in \
                    undistinguishable:
                return False
        return True
项目:arthur-redshift-etl    作者:harrystech    | 项目源码 | 文件源码
def __init__(self, discovered_files: etl.file_sets.TableFileSet) -> None:
        """
        Record where the files describing this relation live.

        For a file set whose scheme is "s3", `bucket_name` and `prefix` are
        taken from the file set's `netloc` and `path`; for any other scheme
        both are set to None. The lazily loaded attributes start out as None.
        """
        # Basic properties to locate files describing the relation
        self._fileset = discovered_files
        if discovered_files.scheme == "s3":
            self.bucket_name = discovered_files.netloc
            self.prefix = discovered_files.path
        else:
            # Without this branch the s3 values above were immediately overwritten with None.
            self.bucket_name = None
            self.prefix = None
        # Note the subtle difference to TableFileSet--here the manifest_file_name is always present since it's computed
        self.manifest_file_name = os.path.join(discovered_files.path or "", "data", self.source_path_name + ".manifest")
        # Lazy-loading of table design and query statement and any derived information from the table design
        self._table_design = None  # type: Optional[Dict[str, Any]]
        self._query_stmt = None  # type: Optional[str]
        self._dependencies = None  # type: Optional[FrozenSet[str]]
        self._is_required = None  # type: Optional[bool]
项目:simone    作者:matheuspb    | 项目源码 | 文件源码
def merge_equivalent(self) -> None:
        """
            Merges equivalent states.

            Starts from every pair of non-final states and every pair of
            final states, repeatedly drops the pairs that
            `_are_undistinguishable` rejects until a pass drops nothing, and
            then calls `_merge_states` on each remaining pair.

            Raises RuntimeError when the automaton is not deterministic.
        """
        if not self.is_deterministic():
            raise RuntimeError("Automata is non-deterministic")

        # pairs of undistinguishable states
        undistinguishable = set()  # type: Set[FrozenSet[str]]

        # initially, two states are only told apart when exactly one of them
        # is final, so every pair within the same group is a candidate
        for pair in combinations(self._states - self._final_states, 2):
            undistinguishable.add(frozenset(pair))
        for pair in combinations(self._final_states, 2):
            undistinguishable.add(frozenset(pair))

        # find new distinguishable states
        while True:
            new_distinguishable_found = False
            # iterate over a snapshot so pairs can be removed during the pass
            undistinguishable_copy = undistinguishable.copy()
            for state_a, state_b in undistinguishable_copy:
                if not self._are_undistinguishable(
                        state_a, state_b, undistinguishable_copy):
                    undistinguishable.remove(frozenset((state_a, state_b)))
                    new_distinguishable_found = True
            if not new_distinguishable_found:
                # fixed point reached: no pair was dropped in this pass
                break

        for state_a, state_b in undistinguishable:
            self._merge_states(state_a, state_b)
项目:wikidata    作者:dahlia    | 项目源码 | 文件源码
def pytest_assertrepr_compare(op: str, left, right) -> Optional[Sequence[str]]:
    """Explain a failed ``==`` between two sets of entities.

    When both operands are sets (or frozensets) that contain only ``Entity``
    instances, returns the lines of a custom explanation: both sets rendered
    as sorted ids, followed by the ids present on only one side. Returns
    ``None`` for every other comparison.

    NOTE(review): this excerpt had lost the iterable passed to ``sorted``,
    the element expression of the two id sets and the closing brackets.
    ``e.id`` is assumed to be the attribute holding the entity id; confirm
    against the upstream project.
    """
    # set of entities
    if op == '==' and isinstance(left, (set, frozenset)) and \
       isinstance(right, (set, frozenset)) and \
       all(isinstance(v, Entity) for v in left) and \
       all(isinstance(v, Entity) for v in right):
        def repr_ids(ids: AbstractSet[EntityId]) -> str:
            sorted_ids = sorted(
                ids,
                key=lambda i: (
                    # Group by the leading letter first so ids that share
                    # a numeric part still come out in a stable order.
                    i[:1],
                    # Since EntityIds usually consist of one letter followed
                    # by digits, order them numerically.  If it's not in
                    # that format they should be sorted in the other bucket.
                    (0, int(i[1:])) if i[1:].isdigit() else (1, i[1:])
                )
            )
            return '{' + ', '.join(sorted_ids) + '}'
        left = cast(Union[Set[Entity], FrozenSet[Entity]], left)
        right = cast(Union[Set[Entity], FrozenSet[Entity]], right)
        left_ids = {e.id for e in left}
        right_ids = {e.id for e in right}
        return [
            '{} == {}'.format(repr_ids(left_ids), repr_ids(right_ids)),
            'Extra entities in the left set:',
            repr_ids(left_ids - right_ids),
            'Extra entities in the right set:',
            repr_ids(right_ids - left_ids),
        ]
    return None
项目:tsukkomi    作者:spoqa    | 项目源码 | 文件源码
def check_frozenset(a: typing.FrozenSet[int]) -> typing.FrozenSet[int]:
    """Hand the given frozenset back unchanged."""
    return a
项目:rets    作者:opendoor-labs    | 项目源码 | 文件源码
def fields(self) -> FrozenSet[str]:
        """
        Return the 'SystemName' of every entry in `self.table` as a frozenset.

        The set is built on first use and kept in `self._fields`.
        """
        cached = self._fields
        if cached is not None:
            return cached
        self._fields = frozenset(entry['SystemName'] for entry in self.table)
        return self._fields
项目:master-thesis    作者:AndreasMadsen    | 项目源码 | 文件源码
# NOTE(review): only the signature of `vocabulary` survives in this excerpt;
# the body is missing, so its behavior cannot be documented from here.
def vocabulary(self) -> FrozenSet[str]:
项目:master-thesis    作者:AndreasMadsen    | 项目源码 | 文件源码
def _get_vocabulary(self) -> FrozenSet[str]:
        """
        Build the set of characters used by this dataset.

        The result combines the characters of every entry in `text_map`,
        the space character, and the decimal digits of the integers
        0 .. `self._digits` - 1.
        """
        source = ''.join(text_map)
        source_special = ' '
        # `np.str` was only an alias of the builtin `str` and has been removed
        # from NumPy; the builtin yields the same string dtype.
        target = ''.join(np.arange(0, self._digits).astype(str))
        return frozenset(source + source_special + target)
项目:pillar    作者:armadillica    | 项目源码 | 文件源码
def user_roles(self) -> typing.FrozenSet[str]:
        """Return the contents of `self._user_roles` as a new frozenset."""
        roles = self._user_roles
        return frozenset(roles)
项目:pillar    作者:armadillica    | 项目源码 | 文件源码
def user_roles_indexable(self) -> typing.FrozenSet[str]:
        """Return the contents of `self._user_roles_indexable` as a new frozenset."""
        indexable = self._user_roles_indexable
        return frozenset(indexable)
项目:pillar    作者:armadillica    | 项目源码 | 文件源码
def user_caps(self) -> typing.Mapping[str, typing.FrozenSet[str]]:
        """Return the mapping stored in `self._user_caps`, not a copy."""
        caps = self._user_caps
        return caps
项目:matchpy    作者:HPAC    | 项目源码 | 文件源码
def variables(self) -> FrozenSet[str]:
        """The names of the variables the constraint depends upon.

        Used by matchers to decide when a constraint can be evaluated (which is when all
        the dependency variables have been assigned a value). If the set is empty, the constraint will
        only be evaluated once the whole match is complete.

        Returns:
            An empty frozenset in this implementation.
        """
        return frozenset()
项目:arthur-redshift-etl    作者:harrystech    | 项目源码 | 文件源码
def dependencies(self) -> FrozenSet[str]:
        """
        Return the "depends_on" entries of `self.table_design` as a frozenset.

        A missing key gives an empty set. The set is built on first use and
        kept in `self._dependencies`.
        """
        if self._dependencies is not None:
            return self._dependencies
        declared = self.table_design.get("depends_on", [])
        self._dependencies = frozenset(declared)
        return self._dependencies
项目:moneybot    作者:elsehow    | 项目源码 | 文件源码
def _possible_investments(self, market_state: MarketState) -> FrozenSet[str]:
        """
        Returns a set of all coins that the strategy might invest in, excluding
        the fiat coin.

        NOTE(review): the excerpt had lost the docstring quotes and the
        element of the excluded set (`- {}` subtracts a dict, which raises
        TypeError). `self.fiat` is assumed to be the attribute naming the
        fiat coin; confirm against the upstream project.
        """
        return market_state.available_coins() - {self.fiat}
项目:moneybot    作者:elsehow    | 项目源码 | 文件源码
def _held_coins(self) -> FrozenSet[str]:
        """Return the keys of `self.balances` whose balance is greater than zero."""
        return frozenset(
            coin for (coin, balance)
            in self.balances.items()
            if balance > 0
        )
项目:moneybot    作者:elsehow    | 项目源码 | 文件源码
# NOTE(review): this excerpt is truncated: the docstring is never closed, the
# iterable being filtered is missing and the argument of `startswith` is
# gone. What survives suggests a prefix filter over market names; confirm
# against the upstream project before relying on it.
def available_markets(self) -> FrozenSet[str]:
        """Return a frozenset containing all available markets, e.g. 'BTC_ETH'.
        return frozenset(
                lambda market: market.startswith(,
项目:moneybot    作者:elsehow    | 项目源码 | 文件源码
def available_coins(self) -> FrozenSet[str]:
        """
        Return the second component of every available market pair, plus the
        fiat coin itself.

        NOTE(review): the excerpt ended in `| {}`, a union with a dict that
        raises TypeError. `self.fiat` is assumed to be the lost element;
        confirm against the upstream project.
        """
        markets = self.available_markets()  # All of these start with fiat
        return frozenset(split_currency_pair(m)[1] for m in markets) | {self.fiat}
项目:moneybot    作者:elsehow    | 项目源码 | 文件源码
def held_coins_with_chart_data(self) -> FrozenSet[str]:
        """Return the coins present in both `_held_coins()` and `available_coins()`."""
        held = self._held_coins()
        available = self.available_coins()
        return held & available
项目:master-thesis    作者:AndreasMadsen    | 项目源码 | 文件源码
# NOTE(review): this constructor excerpt is truncated: the call to
# `_fetch_corpus_properties` is never closed, the assignment under
# "make properties public" has lost its target and arguments, and the
# "create encoding schema", "validate corpus properties" and "setup
# tensorflow pipeline" steps have no statements left. The surviving code
# stores `tqdm`, falls back to the computed vocabulary when none is given,
# rejects vocabularies containing '^', '?' or '', and stores the source and
# target languages.
def __init__(self,
                 source_lang: str, target_lang: str,
                 key: Any=None,
                 vocabulary: FrozenSet[str]=None,
                 observations: int=None,
                 validate: bool=False,
                 name: str='unamed',
                 tqdm: bool=True,
                 **kwargs) -> None:
        # save basic properties
        self._show_tqdm = tqdm

        # compute properties
        computed_properties = self._fetch_corpus_properties(
            name, key, observations=observations
        # use computed vocabulary if not provided
        if vocabulary is None:
            vocabulary = computed_properties.vocabulary

        # make properties public = CorpusProperties(

        # validate properties
        if '^' in vocabulary:
            raise ValueError('a eos char (^) was found in the vocabulary')
        if '?' in vocabulary:
            raise ValueError('a null char (?) was found in the vocabulary')
        if '' in vocabulary:
            raise ValueError('an invalid char () was found in the vocabulary')

        # create encoding schema

        # set language properties
        self.source_lang = source_lang
        self.target_lang = target_lang

        # validate corpus properties
        if validate:

        # setup tensorflow pipeline
项目:nlp-playground    作者:jamesmishra    | 项目源码 | 文件源码
def word_vector_sum(
        documents: Iterable[str],
        vector_dict: Dict[str, np.array],
        separator: Optional[FrozenSet] = None,
        lowercase: bool = True,
        stop_words: Optional[Set] = None
) -> np.ndarray:
    """
    Calculate the word vector sum for each document in an iterable.

    NOTE(review): this excerpt had lost the end of the signature, the
    docstring quotes and the head of both summations. ``np.sum`` over
    ``vector_dict[x]`` is reconstructed from the function name, the return
    description and the surviving ``axis=0``; confirm against the upstream
    project.

    Args:
        documents: An iterable of strings representing text

        vector_dict(dict): A dictionary of words and their
            corresponding vectors.

        separator(frozenset): A frozenset of Unicode code points
            to use as separators. Defaults to ``None``, which is passed
            through to ``cypunct.split`` unchanged.

        lowercase(bool): If True, lowercase each document before
            processing it.

        stop_words(set): A set of words whose vectors should not
            be included in the sum.

    Returns:
        (np.array): Return a numpy array containing a word
            vector for each document, where each document's
            word vector is the sum of its word vectors.
    """
    # Dimension of the vectors we're using.
    dimension = len(next(iter(vector_dict.values())))
    # If every word in a document is either a stop word or
    # not in the dictionary...
    # then the list comprehension inside np.sum will return
    # []. For that case, we will insert an array of zeros.
    is_empty = [np.zeros(dimension)]
    if lowercase:
        documents = (doc.lower() for doc in documents)
    # An absent or empty stop word set excludes nothing, so a single code
    # path covers both of the original branches.
    excluded = stop_words or ()
    return np.asarray([
        np.sum([
            vector_dict[x]
            for x in cypunct.split(doc, separator)
            if x in vector_dict and
            x not in excluded
        ] or is_empty, axis=0)
        for doc in documents
    ])
项目:moneybot    作者:elsehow    | 项目源码 | 文件源码
# NOTE(review): this excerpt is truncated: `self` is used but missing from
# the parameter list, the docstring is never closed, several calls
# (`estimate_values`, `simulate_trades`) have lost their arguments, the
# `AbstractTrade(...)` lines have lost their fiat arguments and enclosing
# `append` calls, and some `if` bodies are gone. The surviving comments
# describe four steps: sell excess value into fiat, simulate those trades,
# find coins holding too little value, then plan buys from fiat. Confirm
# every detail against the upstream project.
def propose_trades_for_partial_rebalancing(
        market_state: MarketState,
        coins_to_rebalance: FrozenSet[str],
    ) -> List[AbstractTrade]:
        """TODO: Trade directly from X to Y without going through fiat.
        ideal_fiat_value_per_coin = self._ideal_fiat_value_per_coin(market_state)

        est_values = market_state.estimate_values(market_state.balances,

        # 1) Fan in to fiat, selling excess value in coins we want to rebalance
        trades_to_fiat = []
        for sell_coin in sorted(coins_to_rebalance):
            if sell_coin ==
            value = est_values.get(sell_coin, 0)
            delta = value - ideal_fiat_value_per_coin
            if delta > 0:
                    AbstractTrade(sell_coin,,, delta),

        # 2) Simulate trades and estimate portfolio state afterwards
        est_balances_after_trades = simulate_trades(
        est_values_after_trades = market_state.estimate_values(

        fiat_after_trades = est_balances_after_trades[]
        fiat_to_redistribute = fiat_after_trades - ideal_fiat_value_per_coin
        if fiat_to_redistribute <= 0:
            return trades_to_fiat

        # 3) Find coins in which we don't hold enough value
        possible_buys = set()
        for buy_coin in self._possible_investments(market_state):
            value = est_values_after_trades.get(buy_coin, 0)
            if ideal_fiat_value_per_coin > value:

        fiat_to_redistribute_per_coin = fiat_to_redistribute / len(possible_buys)

        # 4) Plan trades, fanning back out from fiat to others
        trades_from_fiat = []
        for buy_coin in sorted(possible_buys):
            value = est_values_after_trades.get(buy_coin, 0)
            delta = ideal_fiat_value_per_coin - value
            if delta > 0:
                available_fiat = min(fiat_to_redistribute_per_coin, delta)
                    AbstractTrade(, buy_coin,, available_fiat),

        return trades_to_fiat + trades_from_fiat