我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 typing.Set。
def get_conditions(self) -> List[AbstractCondition]:
    """Collect one condition per child hyperparameter.

    Walks the hyperparameters in the order returned by
    ``get_hyperparameters()`` and, for each of them, the entries stored in
    ``self._children[<hyperparameter name>]``. Only the first condition
    encountered for a given child name is kept; later parents of the same
    child are skipped.

    :return: the collected conditions, in discovery order
    """
    seen_children = set()  # type: Set[str]
    collected = []
    for parent in self.get_hyperparameters():
        # TODO sort the edges by the order of their source_node in the
        # hyperparameter list!
        for child_name, condition in self._children[parent.name].items():
            if child_name in seen_children:
                continue
            seen_children.add(child_name)
            collected.append(condition)
    return collected
def create_articles(pmids: Set[str], login: object, write: bool = True) -> Dict[str, str]:
    """
    Make an article item for each of the given pmids.

    A ``wdi_helpers.PubmedItem`` is built for every pmid. When ``write`` is
    true the item is fetched or created through ``get_or_create(login)``;
    a pmid whose creation raises is reported on stdout and left out of the
    result. When ``write`` is false every pmid maps to the placeholder
    ``'Q1'``.

    :param pmids: set of pmids
    :param login: wdi_core login instance
    :param write: actually perform write
    :return: map pmid -> wdid
    """
    created = {}
    for pmid in pmids:
        item = wdi_helpers.PubmedItem(pmid)
        if not write:
            created[pmid] = 'Q1'
            continue
        try:
            wdid = item.get_or_create(login)
        except Exception as e:
            # Best effort: report the failure and carry on with the next pmid.
            print("Error creating article pmid: {}, error: {}".format(pmid, e))
            continue
        created[pmid] = wdid
    return created
async def get_contact_dict_for_active_monitor(dbcon: DBConnection, monitor_id: int) -> Dict[str, set]:
    """Get all contact addresses/numbers for a specific active monitor.

    Declared ``async`` because the body awaits
    ``get_all_contacts_for_active_monitor``; ``await`` inside a plain
    ``def`` is a syntax error.

    Args:
        dbcon: database connection handed to the contact lookup.
        monitor_id: id of the active monitor whose contacts are wanted.

    Returns:
        A dict with the keys ``'email'`` and ``'phone'``, each holding the
        set of non-empty values found on the monitor's contacts.
    """
    ret = {
        'email': set(),
        'phone': set(),
    }  # type: Dict[str, set]
    contacts = await get_all_contacts_for_active_monitor(dbcon, monitor_id)
    for contact in contacts:
        # Falsy values (None, empty string) are skipped.
        if contact.email:
            ret['email'].add(contact.email)
        if contact.phone:
            ret['phone'].add(contact.phone)
    return ret
def select_channel(
        self,
        versions: typing.Set[CustomVersion],
        update_channel: str = channel.STABLE
) -> typing.Union[CustomVersion, None]:
    """
    Selects the latest version, equals or higher than "channel"

    Args:
        versions: versions to select from
        update_channel: member of :class:`Channel`

    Returns: latest version or None

    """
    # Log the channel actually requested by the caller; the previous message
    # interpolated the imported ``channel`` module object instead.
    LOGGER.debug(f'selecting latest version amongst {len(versions)}; active channel: {update_channel}')
    options = list(self.filter_channel(versions, update_channel))
    if options:
        return max(options)
    LOGGER.debug('no version passed the test')
    return None
def analyze_must_defined_regs(
        blocks: List[BasicBlock],
        cfg: CFG,
        initial_defined: Set[Register],
        num_regs: int) -> AnalysisResult[Register]:
    """Calculate always defined registers at each CFG location.

    A register is defined if it has a value along all paths from the
    initial location. This is a forward ``MUST_ANALYSIS`` run over the
    universe of registers numbered ``0 .. num_regs - 1``.
    """
    visitor = MustDefinedVisitor()
    every_register = {Register(index) for index in range(num_regs)}
    return run_analysis(blocks=blocks,
                        cfg=cfg,
                        gen_and_kill=visitor,
                        initial=initial_defined,
                        backward=False,
                        kind=MUST_ANALYSIS,
                        universe=every_register)
def add_listener(self, keys: Union[Key, Set[Key]], callback: ListenerCallback) -> None:
    '''
    Attach ``callback`` to one or more keys. If more than one key is provided, the callback
    will be reading all messages from one queue, and they are guaranteed to arrive in the same
    order they were published.

    A consumer coroutine is wrapped in an ``aiopubsub.loop.Loop``, started
    immediately, and remembered in ``self._listeners`` under the
    subscription id.

    :param keys: One key, or a set of keys.
    :param callback: Called as ``callback(key, message)`` for each consumed message.
    '''
    subscribed_keys, sub_id = self._get_listener_subscription(keys, callback)
    for subscribed_key in subscribed_keys:
        self.subscribe(subscribed_key, sub_id)

    async def consumer() -> None:
        # One consumed message per invocation; the loop object drives repetition.
        received_key, message = await self.consume(sub_id)
        callback(received_key, message)

    listener_loop = aiopubsub.loop.Loop(consumer, delay = None)
    listener_loop.start()
    self._listeners[sub_id] = Listener(listener_loop, subscribed_keys)
def read_all_stop_words() -> Set[str]:
    """Return the union of the local stop words, local stop symbols and
    the ``get_stop_words("zh")`` list.

    Both local files are read as UTF-8, one entry per line, with newline
    characters removed from each entry.
    """
    # Data source: https://wenku.baidu.com/view/7ca26338376baf1ffc4fad6a.html
    with open("data/chinese_stop_words.txt", mode="r", encoding="utf-8") as words_file:
        raw_words = words_file.readlines()
    word_entries = [line.replace("\n", "") for line in raw_words]

    with open("data/chinese_stop_symbols.txt", mode="r", encoding="utf-8") as symbols_file:
        raw_symbols = symbols_file.readlines()
    symbol_entries = [line.replace("\n", "") for line in raw_symbols]

    library_stop_words = get_stop_words("zh")

    collected: Set[str] = set()
    for source in (word_entries, symbol_entries, library_stop_words):
        collected.update(source)
    return collected
def _get_blocks(self, blocks: Union[List[int], Set[int]]):
    """ Fetch multiple blocks from steemd at once.

    Warning:
        This method does not ensure that all blocks are returned, or that the results are ordered.
        You will probably want to use `steemd.get_blocks()` instead.

    Args:
        blocks (list): A list, or a set of block numbers.

    Returns:
        A generator with results. Falsy results are dropped and each
        remaining block gains a ``block_num`` key decoded from the first
        eight hex digits of its ``block_id``.

    """
    def with_block_num(block):
        number = int(block['block_id'][:8], base=16)
        return {**block, 'block_num': number}

    # The request is issued right away; only the post-processing is lazy.
    fetched = self.exec_multi_with_futures('get_block', blocks, max_workers=10)
    return (with_block_num(block) for block in fetched if block)
def get_majority_vote_in_set_for_event(hashgraph, s: Set[str], x: Event) -> (bool, int):
    """
    Returns the majority vote and the winning amount of stake that a set of witnesses has for another event

    Each event id in ``s`` is resolved through ``hashgraph.lookup_table``;
    the stake of its creator (``hashgraph.known_members[verify_key].stake``)
    counts *for* ``x`` when the event holds a truthy vote for ``x.id`` and
    *against* otherwise. Ties resolve in favour of ``Fame.TRUE``.

    :param hashgraph: object providing ``lookup_table`` and ``known_members``
    :param s: ids of the voting events
    :param x: the event being voted on
    :return: Tuple containing the majority vote (``Fame.TRUE`` / ``Fame.FALSE``) and the total
             stake of the majority vote (int)
    """
    stake_for = stake_against = 0
    for event_id in s:
        witness = hashgraph.lookup_table[event_id]
        voted_for = x.id in witness.votes and witness.votes[x.id]
        stake = hashgraph.known_members[witness.verify_key].stake
        if voted_for:
            stake_for += stake
        else:
            stake_against += stake

    if stake_for >= stake_against:
        return Fame.TRUE, stake_for
    return Fame.FALSE, stake_against
def superbubble_nodes(g: AssemblyGraph, source: Node,
                      sink: Node) -> Set[Node]:
    """Find all nodes inside a superbubble.

    Breadth-first walk from ``source`` along ``g.neighbors_iter``. The
    sink is marked as seen up front, so it is part of the result but is
    never expanded.
    """
    seen = {source, sink}
    pending = deque([source])
    while pending:
        node = pending.popleft()
        for successor in g.neighbors_iter(node):
            if successor in seen:
                continue
            pending.append(successor)
            seen.add(successor)
    return seen
def __init__(self, ploidy: int, copy_from: 'HaplotypeSet'=None):
    """Create ``ploidy`` haplotypes, empty or copied from another set.

    :param ploidy: number of haplotypes to hold
    :param copy_from: when this is a ``HaplotypeSet``, its first ``ploidy``
        haplotypes and read sets are shallow-copied; any other value
        yields empty haplotypes
    """
    self.ploidy = ploidy

    # Nodes spelling each haplotype
    self.haplotypes = []  # type: List[List[Node]]

    # Also keep a set of reads used for each haplotype, useful for
    # relative likelihood calculation
    self.read_sets = []  # type: List[Set[OrientedRead]]

    copying = isinstance(copy_from, HaplotypeSet)
    for index in range(ploidy):
        if copying:
            self.haplotypes.append(deque(copy_from.haplotypes[index]))
            self.read_sets.append(set(copy_from.read_sets[index]))
        else:
            self.haplotypes.append(deque())
            self.read_sets.append(set())

    self.log_rl = float('-inf')
    self.from_large_bubble = False
def extend(self, extensions: List[Tuple[Node]],
           ext_read_sets: List[Set[OrientedRead]]) -> 'HaplotypeSet':
    """Extend the haplotype set with a new set of paths.

    ``self`` is left untouched; a copy is extended and returned.

    :param extensions: one node path per haplotype
    :param ext_read_sets: the reads backing each extension, same order
    :return: the new, extended ``HaplotypeSet``
    """
    extended = HaplotypeSet(self.ploidy, copy_from=self)

    pairs = zip(extensions, ext_read_sets)
    for hap_num, (extension, read_set) in enumerate(pairs):
        nodes = extended.haplotypes[hap_num]

        # The last node of this haplotype (probably a bubble exit) may also
        # be the bubble entrance, i.e. the first node of the extension. In
        # that case skip it so the node is not duplicated.
        overlaps = len(nodes) > 0 and nodes[-1] == extension[0]
        nodes.extend(extension[1:] if overlaps else extension)

        extended.read_sets[hap_num].update(read_set)

    return extended
def _update_grammar_text(self) -> None:
    """
    Render the grammar into the ``grammarText`` widget, one rule per line.

    "B", {"aB", "bC", "a"} turns into "B -> a | aB | bC" (alternatives are
    sorted). The initial symbol, when it has productions, comes first; the
    remaining non-terminals follow in sorted order.
    """
    def render_rule(non_terminal: str, productions: Set[str]):
        alternatives = " | ".join(sorted(productions))
        return "{} -> {}".format(non_terminal, alternatives)

    start = self._grammar.initial_symbol()
    rules = self._grammar.productions()

    ordered_heads = [start] if start in rules else []
    ordered_heads.extend(sorted(set(rules.keys()) - {start}))

    lines = [render_rule(head, rules[head]) + "\n" for head in ordered_heads]
    self.grammarText.setPlainText("".join(lines))
def down(self, visited: FrozenSet[Any]=None) -> Set[Any]:
    """
    Returns the set of reachable nodes by going down on this node

    :param visited: nodes already expanded on this walk; used to stop
        cycles. A revisited operator node contributes nothing, any other
        revisited node contributes itself.
    """
    if visited is None:
        visited = frozenset()

    if self in visited:
        return set() if self.symbol in OPERATORS else {self}

    visited |= {self}
    symbol = self.symbol

    if symbol == '|':
        return self.left.down(visited) | self.right.down(visited)
    if symbol == '.':
        return self.left.down(visited)
    if symbol in ('*', '?'):
        # Either enter the operand or skip it and continue upwards.
        return self.left.down(visited) | self.right.up(visited)
    if symbol == EPSILON:
        return self.right.up(visited)
    return {self}
def up(self, visited: FrozenSet[Any]=None) -> Set[Any]:
    """
    Returns the set of reachable nodes by going up on this node

    :param visited: nodes already expanded on this walk, passed through
        unchanged to the ``down`` / ``up`` calls made from here
    """
    if visited is None:
        visited = frozenset()

    symbol = self.symbol
    if symbol == '.':
        return self.right.down(visited)
    if symbol == '*':
        return self.left.down(visited) | self.right.up(visited)
    if symbol == '?':
        return self.right.up(visited)
    if symbol != '|':
        # self.symbol == END
        return {self}

    # '|': skip the whole right sub tree
    node = self.right
    while node.symbol in ('.', '|'):
        node = node.right
    return node.right.up(visited)
def remove_state(self, state: str) -> None:
    """ Removes a state

    The initial state is never removed. For any other state, it is dropped
    from the state and final-state sets, its outgoing transitions are
    deleted, it is removed from every transition target set, and
    transitions left without targets are deleted as well.
    """
    if state == self._initial_state:
        # may not remove initial state
        return

    self._states.discard(state)
    self._final_states.discard(state)

    # remove useless transitions that come from the removed state
    for symbol in self._alphabet:
        self._transitions.pop((state, symbol), None)

    # remove transitions that go to the removed state
    emptied = set()  # type: Set[Tuple[str, str]]
    for origin, targets in self._transitions.items():
        targets.discard(state)
        if not targets:
            emptied.add(origin)

    # Deleted afterwards: a dict must not shrink while being iterated.
    for origin in emptied:
        del self._transitions[origin]
def _are_undistinguishable(
        self, state_a: str, state_b: str,
        undistinguishable: Set[FrozenSet[str]]) -> bool:
    """
    State a and b are distinguishable if they go to distinguishable
    states for some input symbol.

    A missing transition is treated as a transition to the state ``""``.
    Only the first element of each target set is considered.

    :return: ``True`` when no symbol tells the two states apart
    """
    def target_of(state, symbol):
        return list(self._transitions.get((state, symbol), {""}))[0]

    for symbol in self._alphabet:
        target_a = target_of(state_a, symbol)
        target_b = target_of(state_b, symbol)
        if target_a == target_b:
            continue
        if frozenset((target_a, target_b)) in undistinguishable:
            continue
        return False
    return True
def _determinize_state(self, states_set: Set[str]) -> None:
    """
    Make sure the state obtained by merging ``states_set`` exists.

    The merged state is named by concatenating the sorted member names.
    If that name is empty or already a state nothing happens. Otherwise
    the state is added (and marked final when any member is final), its
    transitions are computed per symbol with ``_find_reachable``, and
    every non-empty target set is determinized recursively.
    """
    merged_name = "".join(sorted(states_set))
    if not merged_name or merged_name in self._states:
        return

    self.add_state(merged_name)
    if not states_set.isdisjoint(self._final_states):
        self._final_states.add(merged_name)

    for symbol in self._alphabet:
        targets = self._find_reachable(states_set, symbol)
        if not targets:
            continue
        self._transitions[merged_name, symbol] = targets
        self._determinize_state(targets)
def _has_recursion(self, to_visit: str, visited: Set[str]) -> bool:
    """
    Checks if the automata has recursive states, using a depth
    first search approach.

    :param to_visit: state to expand next
    :param visited: states on the current path; ``to_visit`` is added to it
    :return: ``True`` as soon as a state already on the path is reached
    """
    if to_visit in visited:
        return True
    visited.add(to_visit)

    # Find the reachable through all symbols
    successors = set()  # type: Set[str]
    for symbol in self._alphabet:
        successors.update(self._find_reachable({to_visit}, symbol))

    # Each branch explores with its own copy of the path.
    return any(
        self._has_recursion(state, copy.deepcopy(visited))
        for state in successors
    )
def from_regular_grammar(grammar) -> 'NFA':
    """ Converts RegularGrammar to NFA

    Every non-terminal becomes a state and an extra accepting state "X" is
    added. A production ``aB`` yields a transition on ``a`` to ``B``; a
    single-symbol production ``a`` yields a transition on ``a`` to "X".
    "&" productions produce no transition; the initial symbol is accepting
    when it has one.
    """
    accepting = "X"
    start = grammar.initial_symbol()
    rules = grammar.productions()

    states = set(rules.keys()) | {accepting}
    alphabet = set()  # type: Set[str]
    transitions = {}  # type: Dict[Tuple[str, str], Set[str]]

    final_states = {accepting}
    if "&" in rules[start]:
        final_states.add(start)

    for head, bodies in rules.items():
        for body in bodies:
            if body == "&":
                continue
            terminal = body[0]
            target = accepting if len(body) == 1 else body[1]
            transitions.setdefault((head, terminal), set()).add(target)
            alphabet.add(terminal)

    return NFA(states, alphabet, transitions, start, final_states)
def get_remaining_shared_breaks_this_week(group_members: Set[User]) -> List[Break]:
    """
    Finds this weeks remaining common breaks between a group of users

    The shared breaks come from ``get_shared_breaks`` and are narrowed with
    ``get_this_weeks_events`` relative to the current time in
    ``BRISBANE_TIME_ZONE``.
    """
    # Mypy treats `List` as invariant: a `List[B]` cannot be given to a
    # function expecting a `List[A]`, even when B is a subclass of A. So the
    # list is cast on the way in to the helper...
    # FIXME: Get rid of these casts once the types line up without them.
    shared_breaks = cast(List[Event_], get_shared_breaks(group_members))
    current_time = datetime.now(BRISBANE_TIME_ZONE)
    remaining = get_this_weeks_events(current_time, shared_breaks)
    # ... and cast back on the way out.
    return cast(List[Break], remaining)


# FIXME: Make 'request_status' an enum: https://docs.python.org/3/library/enum.html
def search_show_ids_by_names(self, *names, exact=False) -> Set[Show]:
    """Look up shows in the ``ShowNames`` table by any of the given names.

    :param names: names to search for, one query per name
    :param exact: compare names exactly; otherwise the comparison uses the
        ``alphanum`` collation
    :return: set of the ``show`` column values of all matching rows
    """
    if exact:
        query = "SELECT show, name FROM ShowNames WHERE name = ?"
    else:
        query = "SELECT show, name FROM ShowNames WHERE name = ? COLLATE alphanum"

    found = set()
    for name in names:
        debug("Searching shows by name: {}".format(name))
        self.q.execute(query, (name,))
        for row in self.q.fetchall():
            debug(" Found match: {} | {}".format(row[0], row[1]))
            found.add(row[0])
    return found

# Helper methods

## Conversions
def init_by_vocabulary(self, lemma_counter: Counter, lemma_to_word_forms: Dict[str, Set[WordForm]],
                       lemma_case: Dict[str, LemmaCase]):
    """
    Fill the word-form and lemma indices from a vocabulary.

    NOTE(review): the original (non-English) docstring was lost to an
    encoding problem; this description is derived from the code.

    :param lemma_counter: Counter over lemmas; lemmas are processed in
        ``most_common()`` order.
    :param lemma_to_word_forms: mapping from a lemma to its word forms.
    :param lemma_case: mapping from a lemma to the case applied to each of
        its word forms via ``set_case``.
    """
    ranked_lemmas = tqdm(lemma_counter.most_common(), desc="Init vocabulary")
    # Lemma indices start at 1; index 0 is left unused here (presumably
    # reserved, e.g. for padding - TODO confirm).
    for lemma_index, (lemma, _) in enumerate(ranked_lemmas, start=1):
        for word_form in lemma_to_word_forms[lemma]:
            word_form.set_case(lemma_case[word_form.lemma])
            position = len(self.word_forms)
            self.word_forms.append(word_form)
            self.word_form_indices[word_form] = position
            assert self.word_forms[self.word_form_indices[word_form]] == word_form
            self.lemma_indices[word_form] = lemma_index
    assert self.lemma_indices[SEQ_END_WF] == 1
def __init__(self, language: str="ru", mode: Mode=Mode.GRAPHEMES, raw_dict_path=None, trie_path=None, zalyzniak_dict=ZALYZNYAK_DICT, cmu_dict=CMU_DICT) -> None: self.data = pygtrie.Trie() # type: Dict[str, Set[Stress]] self.raw_dict_path = raw_dict_path self.trie_path = trie_path if language == "ru" and mode == self.Mode.GRAPHEMES: self.__init_defaults(RU_GRAPHEME_STRESS_PATH, RU_GRAPHEME_STRESS_TRIE_PATH) if not os.path.exists(self.raw_dict_path): from rupo.dict.zaliznyak import ZalyzniakDict ZalyzniakDict.convert_to_accent_only(zalyzniak_dict, self.raw_dict_path) elif mode == self.Mode.PHONEMES and language == "en": self.__init_defaults(EN_PHONEME_STRESS_PATH, EN_PHONEME_STRESS_TRIE_PATH) if not os.path.exists(self.raw_dict_path): CMUDict.convert_to_phoneme_stress(cmu_dict, self.raw_dict_path) else: assert False if not os.path.isfile(self.raw_dict_path): raise FileNotFoundError("Dictionary raw file not found.") if os.path.isfile(self.trie_path): self.load(self.trie_path) else: self.create(self.raw_dict_path, self.trie_path)
def _find_default_mounts() -> Set[str]:
    """Return the current working directory plus the directories of all
    existing config files.

    Config files come from the command line option(s) named by
    ``args_for_setting_config_path``, falling back to
    ``default_config_files``. Each config directory found is also recorded
    in the module-level ``config_paths`` set.
    """
    global config_paths
    basepath = os.getcwd()

    miniparser = configargparse.ArgumentParser()
    miniparser.add_argument(*args_for_setting_config_path, dest="config", action="append",
                            default=[])
    args, _ = miniparser.parse_known_args()  # type: ignore, because mypy doesn't parse add_argument above correctly
    if not args.config:
        args.config = default_config_files

    paths = {basepath}
    for cfg in args.config:
        if not os.path.isfile(cfg):
            continue
        cfg_dir = os.path.abspath(os.path.dirname(cfg))
        paths.add(cfg_dir)
        config_paths.add(cfg_dir)
    return paths
def __init__(self):
    """
    Create an analyser with no program AST and empty class tables.

    :return: None
    """
    super(PyCoolSemanticAnalyser, self).__init__()

    # The program AST instance; nothing is attached yet.
    self._program_ast = None

    # Classes Map: class name (String) -> class instance (AST.Class).
    #   Dict[AnyStr, AST.Class]
    self._classes_map = {}

    # Class Inheritance Graph: parent class name (String) -> set of its
    # children classes.
    #   Dict[AnyStr, Set]
    self._inheritance_graph = defaultdict(set)

# #########################################################################
#                                PUBLIC                                   #
# #########################################################################
def _get_commands(dist  # type: setuptools.dist.Distribution
                  ):
    # type: (...) -> typing.Dict[str, typing.Set[str]]
    """Find all commands belonging to the given distribution.

    Every ``.py`` file reported by ``setuptools.findall()`` whose package
    is listed in ``dist.packages`` is parsed, and the module-, class- and
    function-level commands found in it are appended, in that order.

    Args:
        dist: The Distribution to search for docopt-compatible docstrings that
            can be used to generate command entry points.

    Returns:
        A dictionary containing a mapping of primary commands to sets of
        subcommands.
    """
    extractors = (
        _get_module_commands,
        _get_class_commands,
        _get_function_commands,
    )
    commands = {}  # type: typing.Dict[str, typing.Set[str]]
    for file_name in setuptools.findall():
        if os.path.splitext(file_name)[1].lower() != '.py':
            continue
        if _get_package_name(file_name) not in dist.packages:
            continue
        with open(file_name) as py_file:
            module = typing.cast(ast.Module, ast.parse(py_file.read()))
            module_name = _get_module_name(file_name)
            for extract in extractors:
                _append_commands(commands, module_name, extract(module))
    return commands
def _append_commands(dct,  # type: typing.Dict[str, typing.Set[str]]
                     module_name,  # type: str
                     commands  # type:typing.Iterable[_EntryPoint]
                     ):
    # type: (...) -> None
    """Append entry point strings representing the given Command objects.

    An entry point has the shape ``command[:subcommand] = module[:callable]``;
    the optional parts are left out when the attribute is falsy.

    Args:
        dct: The dictionary to append with entry point strings. Each key will
            be a primary command with a value containing a set of entry point
            strings representing a Command.
        module_name: The name of the module in which the command object
            resides.
        commands: A list of Command objects to convert to entry point strings.
    """
    for cmd in commands:
        subcommand_part = ':{}'.format(cmd.subcommand) if cmd.subcommand else ''
        callable_part = ':{}'.format(cmd.callable) if cmd.callable else ''
        entry_point = '{command}{subcommand} = {module}{callable}'.format(
            command=cmd.command,
            subcommand=subcommand_part,
            module=module_name,
            callable=callable_part,
        )
        dct.setdefault(cmd.command, set()).add(entry_point)
def set_grade(self, new_grade: float, user: User) -> None:
    """Set the grade to the new grade.

    A :class:`GradeHistory` row recording the change is added to the
    session and flushed.

    .. note:: This also passes back the grade to LTI if this is necessary
        (see :py:func:`passback_grade`).

    :param new_grade: The new grade to set
    :param user: The user setting the new grade.
    :returns: Nothing
    """
    self._grade = new_grade
    should_passback = self.assignment.should_passback

    # ``self.grade`` may differ from the raw ``_grade`` just stored; when
    # the raw value is None but a grade is still reported, the history
    # entry is flagged as a rubric grade.
    effective_grade = self.grade
    from_rubric = self._grade is None and effective_grade is not None
    recorded_grade = -1 if effective_grade is None else effective_grade

    entry = GradeHistory(
        is_rubric=from_rubric,
        grade=recorded_grade,
        passed_back=False,
        work=self,
        user=user
    )
    db.session.add(entry)
    db.session.flush()

    if should_passback:
        psef.tasks.passback_grades([self.id])
def _add_defining_attribute(self, coll: Collection, group: int, rels: Set[RF2Files.Relationship]) -> None:
    """Append the restrictions for one relationship group to ``coll``.

    :param coll: collection receiving the generated restrictions
    :param group: relationship group number; 0 means "ungrouped"
    :param rels: the relationships belonging to that group
    """
    def restriction_for(rel):
        return existential_restriction(self, as_uri(rel.typeId), as_uri(rel.destinationId))

    if group == 0:
        # Ungrouped: each relationship stands alone, wrapped in its own role
        # group unless its type is never grouped.
        for rel in rels:
            restr = restriction_for(rel)
            if rel.typeId in self._context.NEVER_GROUPED:
                coll.append(restr)
            else:
                coll.append(role_group(self, restr))
        return

    if len(rels) > 1:
        # A group whose target is an intersection of subjects + inner restrictions
        target, inner_coll = intersection(self)
        for rel in rels:
            inner_coll.append(restriction_for(rel))
        coll.append(role_group(self, target))
    else:
        only_rel = list(rels)[0]
        coll.append(restriction_for(only_rel))
def copy_tree_actions(base: str, include_patterns: t.Union[t.List[str], str] = None,
                      exclude_patterns: t.List[str] = None) -> t.List[Action]:
    """
    Actions for all files and directories in the base directory that match the given patterns.
    It's used to copy a whole directory tree.

    :param base: base directory
    :param include_patterns: patterns that match the paths that should be included;
        defaults to ``["**", "**/.*"]``
    :param exclude_patterns: patterns that match the paths that should be excluded
    :return: list of actions
    """
    if include_patterns is None:
        # A fresh list per call instead of one mutable default shared by
        # every call.
        include_patterns = ["**", "**/.*"]
    paths = matched_paths(base, include_patterns, exclude_patterns)
    files = set()  # type: t.Set[str]
    dirs = set()  # type: t.Set[str]
    ret = []  # type: t.List[Action]
    for path in paths:
        # ``dirs`` accumulates directories already handled, so each one is
        # created only once across all paths.
        ret.extend(actions_for_dir_path(path, path_acc=dirs))
        if os.path.isfile(path) and path not in files:
            files.add(path)
            ret.append(CopyFile(normalize_path(path)))
    return ret
def actions_for_dir_path(path: str, path_acc: t.Set[str] = None) -> t.List[Action]:
    """
    Returns a list of actions that is needed to create a folder and it's parent folders.

    :param path: existing file or directory path
    :param path_acc: paths already examined; updated in place with every
        directory an action is produced for. When omitted, a fresh set is
        used for this call only.
    :return: ``CreateDir`` actions, outermost directory first
    """
    if path_acc is None:
        # The former ``set()`` default was shared between calls, so
        # directories seen by one call were silently skipped by later ones.
        path_acc = set()
    path = abspath(path)
    # NOTE(review): ``create`` is not a local of this function - confirm
    # what typecheck_locals does with it.
    typecheck_locals(path=FileName(allow_non_existent=False)|DirName(), create=Bool())
    assert os.path.exists(path)
    if path == "" or path == "~":
        return []
    path = normalize_path(path)
    parts = path.split("/")
    ret = []
    # Skip the leading "~" component; include the last component only when
    # the path itself is a directory.
    first = 2 if parts[0] == "~" else 1
    stop = len(parts) + 1 if os.path.isdir(abspath(path)) else len(parts)
    for i in range(first, stop):
        subpath = "/".join(parts[:i])
        subpath_norm = normalize_path(subpath)
        if subpath_norm in path_acc:
            continue
        ret.append(CreateDir(subpath_norm))
        path_acc.add(subpath_norm)
    return ret
def combine(*messages: t.Tuple[t.Optional['StatMessage']]) -> t.List['StatMessage']:
    """
    Combines all message of the same type and with the same parent in the passed list.
    Ignores None entries.

    Two messages are merged with ``+`` when they have exactly the same
    type and ``parent.eq_except_property`` holds between their parents;
    merging repeats until no such pair is left.

    :param messages: passed list of messages
    :return: new reduced list
    """
    pending = {msg for msg in messages if msg is not None}  # t.Set['StatMessage']
    while True:
        mergeable = next(
            (
                (first, second)
                for first, second in itertools.permutations(pending, 2)
                if first.parent.eq_except_property(second.parent)
                and type(first) == type(second)
            ),
            None,
        )  # type: t.Optional[t.Tuple['StatMessage', 'StatMessage']]
        if mergeable is None:
            return list(pending)
        first, second = mergeable
        pending.remove(first)
        pending.remove(second)
        pending.add(first + second)
def test_typing():
    """``name_type`` renders ``typing`` constructs; ``Any`` parameters are
    omitted and ``Ellipsis`` is shown as ``...``."""
    from typing import Any, List, Set, Dict, Type, Tuple

    expectations = [
        (Any, "Any"),
        (List, "List"),
        (List[Any], "List"),
        (List[str], "List[str]"),
        (List[int], "List[int]"),
        (Set, "Set"),
        (Set[Any], "Set"),
        (Set[List], "Set[List]"),
        (Dict, "Dict"),
        (Dict[Any, Any], "Dict"),
        (Dict[str, int], "Dict[str, int]"),
        (Type, "Type"),
        (Type[int], "Type[int]"),
        (Type[MagicType], "Type[MagicType]"),
        (Tuple, "Tuple"),
        (Tuple[int], "Tuple[int]"),
        (Tuple[int, str, List], "Tuple[int, str, List]"),
        (Tuple[int, Ellipsis], "Tuple[int, ...]"),
        (Tuple[str, Ellipsis], "Tuple[str, ...]"),
    ]
    for typ, expected in expectations:
        assert name_type(typ) == expected
def name(self):
    """Return the display name of this ``Callable`` checker.

    ``self._args`` is either ``None`` (bare ``Callable``), starts with
    ``Ellipsis`` (any parameters, followed by the return type), or lists
    the parameter types followed by the return type.
    """
    args = self._args
    if args is None:
        return "Callable"
    if args[0] is Ellipsis:
        return "Callable[..., %s]" % checker_for_type(args[1]).name()
    params = ", ".join(checker_for_type(z).name() for z in args[:-1])
    result = checker_for_type(args[-1]).name()
    return "Callable[[%s], %s]" % (params, result)


# ------------------------------------------------------------------------------
#
# Set operations with checkers
# ------------------------------------------------------------------------------
def __init__(self, master_db, lib_name, params, used_names, **kwargs):
    # type: (MasterDB, str, Dict[str, Any], Set[str], **kwargs) -> None
    """Store the database handles and set up this master's parameters.

    :param master_db: stored as ``self._master_db``.
    :param lib_name: stored as ``self._lib_name``.
    :param params: parameter values for this master.
    :param used_names: stored as ``self._used_names``.
    :param kwargs: forwarded to ``populate_params`` on the regular path.
    """
    self._master_db = master_db
    self._lib_name = lib_name
    self._used_names = used_names

    # set parameters
    params_info = self.get_params_info()
    default_params = self.get_default_param_values()
    self.params = {}
    if params_info is None:
        # compatibility with old schematics generators
        self.params.update(params)
        self._prelim_key = self.to_immutable_id((self._get_qualified_name(), params))
        # Cell name and final key are left unset on this legacy path.
        self._cell_name = None
        self._key = None
    else:
        self.populate_params(params, params_info, default_params, **kwargs)
        # get unique cell name
        self._prelim_key = self.compute_unique_key()
        self.update_master_info()

    self.children = None
    self._finalized = False
def deserialize_abstract_type(cls, data):
    """Deserialize ``data`` according to the abstract ``typing`` type ``cls``.

    The type is classified by ``cls.__origin__`` (or ``cls`` itself when
    the origin is ``None``). Iterable abstract types are delegated to
    ``deserialize_iterable_abstract_type``; anything else is built by the
    matching concrete constructor.

    NOTE(review): both lookup tables are keyed by ``typing`` aliases, so
    this relies on ``__origin__`` being such an alias - confirm on the
    targeted Python version.
    """
    origin = cls.__origin__
    if origin is None:
        origin = cls

    element_wise_types = {
        typing.Sequence,
        typing.List,
        typing.Tuple,
        typing.Set,
        typing.AbstractSet,
        typing.Mapping,
    }
    if origin in element_wise_types:
        return deserialize_iterable_abstract_type(cls, origin, data)

    concrete_constructor = {
        typing.Sequence: list,
        typing.List: list,
        typing.Dict: dict,
        typing.Set: set,
        typing.AbstractSet: set,
    }
    return concrete_constructor[origin](data)
def hamiltonian_cycle(g: Graph, start: Vertex) -> List[Vertex]:
    """Greedy nearest-neighbour walk visiting every vertex of ``g`` once.

    Starting at ``start``, repeatedly moves to the unvisited neighbour
    with the smallest ``g.weight[current, neighbour]``.

    :param g: graph exposing ``order``, ``weight`` and ``neighbors``
    :param start: vertex the walk begins at
    :return: the vertices in visiting order, beginning with ``start``
    :raises HamiltonianCycleNotFound: if the walk gets stuck before all
        ``g.order`` vertices are on the path
    """
    path = [start]
    current = start
    visited: Set[Vertex] = set()

    while len(visited) != g.order:
        visited.add(current)
        candidates = [
            (g.weight[current, v], v)
            for v in g.neighbors(current)
            if v not in visited
        ]
        if not candidates:
            # Nowhere left to go: success if every vertex is on the path,
            # a dead end otherwise. Tested explicitly rather than by
            # catching the ValueError of min() on an empty sequence, which
            # would also hide ValueErrors raised by the graph accessors.
            if len(path) == g.order:
                return path
            raise HamiltonianCycleNotFound('graph has dead ends')
        _, nearest = min(candidates)
        path.append(nearest)
        current = nearest

    # Only reachable when the loop never runs (g.order == 0); return the
    # path instead of falling off the end with None.
    return path
def dfs(g: Graph,
        current: Vertex,
        condition: Test,
        visited: Set = None) -> Optional[Vertex]:
    """Depth-first search for the first vertex satisfying ``condition``.

    :param g: graph exposing ``neighbors(vertex)``
    :param current: vertex to start (or continue) the search from
    :param condition: predicate called with a vertex
    :param visited: vertices already explored; updated in place. A fresh
        set is created only when ``None`` is passed, so a caller-supplied
        set - even an empty one - is the set that gets filled.
    :return: the first matching vertex in depth-first order, or ``None``
    """
    if visited is None:
        visited = set()
    if current in visited:
        return None
    visited.add(current)

    if condition(current):
        return current

    for n in g.neighbors(current):
        v = dfs(g, n, condition, visited)
        if v is not None:
            return v

    return None
def transitive_closure(
        g: Graph,
        v: Vertex,
        visited: Optional[Set[Vertex]] = None) -> Set[Vertex]:
    """
    Returns a set containing all vertices reachable from v

    :param g: graph exposing ``neighbors(vertex)``
    :param v: start vertex; always part of the result
    :param visited: vertices already collected; updated in place and
        returned. A fresh set is created only when ``None`` is passed, so
        a caller-supplied empty set is filled rather than replaced.
    """
    if visited is None:
        visited = set()
    visited.add(v)
    for v_neigh in g.neighbors(v):
        if v_neigh not in visited:
            transitive_closure(g, v_neigh, visited)
    return visited
def test_typing_extractor_register(typing_extractor):
    """A handler registered for ``set`` is used to extract ``Set[int]``."""
    def extract_set(extractor, typ):
        # Fall back to Any when the set is unparametrised or Set[Any].
        args = typ.__args__
        subtype = args[0] if args and args[0] is not Any else Any
        return {
            "type": "array",
            "title": "set",
            "items": extractor.extract(extractor, subtype)
        }

    typing_extractor.register(set, extract_set)

    expected = {
        "type": "array",
        "title": "set",
        "items": {"type": "integer"}
    }
    assert typing_extractor.extract(typing_extractor, Set[int]) == expected
def from_type(t):
    '''Converts a type `t` to a Typeable

    Besides real types, three literal aliases are accepted: a one-element
    list ``[T]`` for ``typing.List[T]``, a one-element set ``{T}`` for
    ``typing.Set[T]`` and a tuple ``(A, B, ...)`` for
    ``typing.Tuple[A, B, ...]``. A ``Typeable`` is returned unchanged.

    Raises ``ValueError`` for a list or set alias that does not contain
    exactly one element.
    '''
    if isinstance(t, Typeable):
        return t

    if isinstance(t, list):
        if len(t) != 1:
            reason = ('Missing type parameter' if len(t) < 1
                      else 'Too many type parameters, only homogenous lists allowed')
            msg = 'Can only use literal list alias with a single type, `{}` is invalid: {}'
            raise ValueError(msg.format(repr(t), reason))
        element = from_type(t[0]).typ
        return from_type(typing.List[element])

    if isinstance(t, set):
        if len(t) != 1:
            reason = ('Missing type parameter' if len(t) < 1
                      else 'Too many type parameters, only homogenous sets allowed')
            msg = 'Can only use literal set alias with a single type, `{}` is invalid: {}'
            raise ValueError(msg.format(repr(t), reason))
        element = from_type(next(iter(t))).typ
        return from_type(typing.Set[element])

    if isinstance(t, tuple):
        members = tuple(from_type(a).typ for a in t)
        return from_type(typing.Tuple[members])

    return _from_typing36(t)
def test_setint_convert():
    """The literal alias ``{int}`` converts to ``typing.Set[int]``."""
    converted = typeable.from_type({int})
    assert converted.typ is typing.Set[int]

    origin = converted.origin
    assert origin.typ is typing.Set
    assert origin.origin is None
    assert origin.args == []

    assert converted.arity == 0
def __init__(self, seed: Union[int, None] = None) -> None:
    """Create an empty configuration space.

    :param seed: seed for the space's ``numpy`` random state
    """
    self._hyperparameters = OrderedDict()  # type: OrderedDict[str, Hyperparameter]
    self._hyperparameter_idx = {}  # type: Dict[str, int]
    self._idx_to_hyperparameter = {}  # type: Dict[int, str]

    # Use dictionaries to make sure that we don't accidently add
    # additional keys to these mappings (which happened with defaultdict()).
    # This once broke auto-sklearn's equal comparison of configuration
    # spaces when _children of one instance contained all possible
    # hyperparameters as keys and empty dictionaries as values while the
    # other instance not containing these.
    self._children = OrderedDict()  # type: OrderedDict[str, OrderedDict[str, Union[None, AbstractCondition]]]
    self._parents = OrderedDict()  # type: OrderedDict[str, OrderedDict[str, Union[None, AbstractCondition]]]
    # The artificial root entry exists from the start.
    self._children['__HPOlib_configuration_space_root__'] = OrderedDict()

    # changing this to a normal dict will break sampling because there is
    # no guarantee that the parent of a condition was evaluated before
    self._conditionals = set()  # type: Set[str]
    self.forbidden_clauses = []  # type: List['AbstractForbiddenComponent']
    self.random = np.random.RandomState(seed)

    # caching
    self._parent_conditions_of = {}
    self._child_conditions_of = {}
    self._parents_of = {}
    self._children_of = {}
def __init__(self, nodes: Set[Node], in_node: Node, out_node: Node, edges: Set[Edge]):
    """Control flow graph representation.

    Nodes are indexed by their ``identifier`` and edges by their
    ``(source, target)`` pair.

    :param nodes: set of nodes of the control flow graph
    :param in_node: entry node of the control flow graph
    :param out_node: exit node of the control flow graph
    :param edges: set of edges of the control flow graph
    """
    self._in_node = in_node
    self._out_node = out_node
    self._nodes = {n.identifier: n for n in nodes}
    self._edges = {(e.source, e.target): e for e in edges}
def in_edges(self, node: Node) -> Set[Edge]:
    """Ingoing edges of a given node.

    :param node: given node
    :return: set of the edges in ``self.edges`` whose target is the node
    """
    return {
        edge
        for (_, target), edge in self.edges.items()
        if target == node
    }
def predecessors(self, node: Node) -> Set[Node]:
    """Predecessors of a given node.

    :param node: given node
    :return: set of the sources of the node's ingoing edges
    """
    sources = set()
    for edge in self.in_edges(node):
        sources.add(edge.source)
    return sources
def out_edges(self, node: Node) -> Set[Edge]:
    """Outgoing edges of a given node.

    :param node: given node
    :return: set of the edges in ``self.edges`` whose source is the node
    """
    return {
        edge
        for (source, _), edge in self.edges.items()
        if source == node
    }
def successors(self, node: Node) -> Set[Node]:
    """Successors of a given node.

    :param node: given node
    :return: set of the targets of the node's outgoing edges
    """
    targets = set()
    for edge in self.out_edges(node):
        targets.add(edge.target)
    return targets
def result(self, result: Set[Expression]):
    """Store ``result`` as this object's ``_result``.

    NOTE(review): this looks like the setter half of a ``result`` property
    whose decorator is not part of this excerpt - confirm.

    :param result: set of expressions to store
    """
    self._result = result