我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用typing.Type()。
def __init__(self, username: str, password: str, botModule: str, botconfig: Mapping, numPlayers: int, variant: Variant, spectators: bool, gameName: str, *args, **kwargs) -> None: super().__init__(*args, **kwargs) self.username: str = username self.password: str = password module = importlib.import_module(botModule + '.bot') self.botCls: Type[Bot] = module.Bot # type: ignore self.botconfig: Mapping = botconfig self.numPlayers: int = numPlayers self.variant: Variant = variant self.spectators: bool = spectators self.gameName: str = gameName self.conn: socketIO_client.SocketIO self.tablePlayers: List[str] = [] self.readyToStart: bool = False self.game: Optional[Game] = None
def __init__(self, variables: List[VariableIdentifier], lattices: Dict[Type, Type[Lattice]], arguments: Dict[Type, Dict[str, Any]] = defaultdict(lambda: dict())): """Create a mapping Var -> L from each variable in Var to the corresponding element in L. :param variables: list of program variables :param lattices: dictionary from variable types to the corresponding lattice types :param arguments: dictionary from variable types to arguments of the corresponding lattices """ super().__init__() self._variables = variables self._lattices = lattices self._arguments = arguments try: self._store = {v: lattices[type(v.typ)](**arguments[type(v.typ)]) for v in variables} except KeyError as key: error = f"Missing lattice for variable type {repr(key.args[0])}!" raise ValueError(error)
def dispatch(method: Callable[[Any, Type[T], Mapping[str, Any], PipelineContext], Any]) -> Callable[[Any, Type[T], Mapping[str, Any], PipelineContext], Any]: dispatcher = singledispatch(method) provides = set() def wrapper(self: Any, type: Type[T], query: Mapping[str, Any], context: PipelineContext = None) -> Any: call = dispatcher.dispatch(type) try: return call(self, query, context=context) except TypeError: raise DataSource.unsupported(type) def register(type: Type[T]) -> Callable[[Any, Type[T], Mapping[str, Any], PipelineContext], Any]: provides.add(type) return dispatcher.register(type) wrapper.register = register wrapper._provides = provides update_wrapper(wrapper, method) return wrapper
def dispatch(method: Callable[[Any, Type[T], Any, PipelineContext], None]) -> Callable[[Any, Type[T], Any, PipelineContext], None]: dispatcher = singledispatch(method) accepts = set() def wrapper(self: Any, type: Type[T], items: Any, context: PipelineContext = None) -> None: call = dispatcher.dispatch(type) try: return call(self, items, context=context) except TypeError: raise DataSink.unsupported(type) def register(type: Type[T]) -> Callable[[Any, Type[T], Any, PipelineContext], None]: accepts.add(type) return dispatcher.register(type) wrapper.register = register wrapper._accepts = accepts update_wrapper(wrapper, method) return wrapper
def _transform(self, source_type: Type[S], target_type: Type[T]) -> Tuple[Callable[[S], T], int]: try: LOGGER.info("Searching type graph for shortest path from \"{source_type}\" to \"{target_type}\"".format(source_type=source_type.__name__, target_type=target_type.__name__)) path = dijkstra_path(self._type_graph, source=source_type, target=target_type, weight="cost") LOGGER.info("Found a path from \"{source_type}\" to \"{target_type}\"".format(source_type=source_type.__name__, target_type=target_type.__name__)) except (KeyError, NetworkXNoPath): raise NoConversionError("Pipeline can't convert \"{source_type}\" to \"{target_type}\"".format(source_type=source_type, target_type=target_type)) LOGGER.info("Building transformer chain from \"{source_type}\" to \"{target_type}\"".format(source_type=source_type.__name__, target_type=target_type.__name__)) chain = [] cost = 0 for source, target in _pairwise(path): transformer = self._type_graph.adj[source][target][_TRANSFORMER] chain.append((transformer, target)) cost += transformer.cost LOGGER.info("Built transformer chain from \"{source_type}\" to \"{target_type}\"".format(source_type=source_type.__name__, target_type=target_type.__name__)) if not chain: return _identity, 0 return partial(_transform, transformer_chain=chain), cost
def _best_transform_from(self, source_type: Type[S], target_types: Iterable[Type]) -> Tuple[Callable[[S], Any], Type, int]: best = None best_cost = _MAX_TRANSFORM_COST to_type = None for target_type in target_types: try: transform, cost = self._transform(source_type, target_type) if cost < best_cost: best = transform best_cost = cost to_type = target_type except NoConversionError: pass if best is None: raise NoConversionError("Pipeline can't convert \"{source_type}\" to any of \"{target_types}\"".format(source_type=source_type, target_types=target_types)) return best, to_type, best_cost
def _best_transform_to(self, target_type: Type[T], source_types: Iterable[Type]) -> Tuple[Callable[[T], Any], Type, int]: best = None best_cost = _MAX_TRANSFORM_COST from_type = None for source_type in source_types: try: transform, cost = self._transform(source_type, target_type) if cost < best_cost: best = transform best_cost = cost from_type = source_type except NoConversionError: pass if best is None: raise NoConversionError("Pipeline can't convert from any of \"{source_types}\" to \"{target_type}\"".format(source_types=source_types, target_type=target_type)) return best, from_type, best_cost
def _create_source_handlers(self, type: Type[T]) -> List[_SourceHandler]: source_handlers = [] for source, targets in self._sources: if TYPE_WILDCARD in source.provides or type in source.provides: sink_handlers = self._create_sink_handlers(type, targets) source_handlers.append(_SourceHandler(source, type, _identity, {sink_handler: False for sink_handler in sink_handlers})) else: try: transform, source_type, cost = self._best_transform_to(type, source.provides) # If we got past the above function call, then there is a transformer from `source_type` to `type` pre_handlers, post_handlers = self._create_sink_handlers_simultaneously(source_type, transform, type, targets) sink_handlers = {sink_handler: False for sink_handler in pre_handlers} sink_handlers.update({sink_handler: True for sink_handler in post_handlers}) source_handlers.append(_SourceHandler(source, source_type, transform, sink_handlers)) except NoConversionError: pass return source_handlers
def put(self, type: Type[T], item: T) -> None: """Puts an objects into the data pipeline. The object may be transformed into a new type for insertion if necessary. Args: item: The object to be inserted into the data pipeline. """ LOGGER.info("Getting SinkHandlers for \"{type}\"".format(type=type.__name__)) try: handlers = self._put_types[type] except KeyError: try: LOGGER.info("Building new SinkHandlers for \"{type}\"".format(type=type.__name__)) handlers = self._put_handlers(type) except NoConversionError: handlers = None self._get_types[type] = handlers LOGGER.info("Creating new PipelineContext") context = self._new_context() LOGGER.info("Sending item \"{item}\" to SourceHandlers".format(item=item)) if handlers is not None: for handler in handlers: handler.put(item, context)
def put_many(self, type: Type[T], items: Iterable[T]) -> None: """Puts multiple objects of the same type into the data sink. The objects may be transformed into a new type for insertion if necessary. Args: items: An iterable (e.g. list) of objects to be inserted into the data pipeline. """ LOGGER.info("Getting SinkHandlers for \"{type}\"".format(type=type.__name__)) try: handlers = self._put_types[type] except KeyError: try: LOGGER.info("Building new SinkHandlers for \"{type}\"".format(type=type.__name__)) handlers = self._put_handlers(type) except NoConversionError: handlers = None self._get_types[type] = handlers LOGGER.info("Creating new PipelineContext") context = self._new_context() LOGGER.info("Sending items \"{items}\" to SourceHandlers".format(items=items)) if handlers is not None: items = list(items) for handler in handlers: handler.put_many(items, context)
def with_default(self, value: Union[Any, Callable[[MutableMapping[str, Any]], Any]], supplies_type: Type = None) -> "QueryValidator": if self._current is None or self._current.child is not None: raise QueryValidatorStructureError("No key is selected! Try using \"can_have\" before \"with_default\".") if self._current.required: raise QueryValidatorStructureError("Can't assign a default value to a required key! Try using \"can_have\" instead of \"have\".") if supplies_type: expected_type = supplies_type else: expected_type = type(value) default_node = _DefaultValueNode(self._current.key, value, supplies_type) result = self.as_(expected_type) result._current.child.child = default_node return result
def dispatch(method: Callable[[Any, Type[T], F, PipelineContext], T]) -> Callable[[Any, Type[T], F, PipelineContext], T]: dispatcher = singledispatch(method) transforms = {} def wrapper(self: Any, target_type: Type[T], value: F, context: PipelineContext = None) -> T: call = dispatcher.dispatch(TypePair[value.__class__, target_type]) try: return call(self, value, context=context) except TypeError: raise DataTransformer.unsupported(target_type, value) def register(from_type: Type[F], to_type: Type[T]) -> Callable[[Any, Type[T], F, PipelineContext], T]: try: target_types = transforms[from_type] except KeyError: target_types = set() transforms[from_type] = target_types target_types.add(to_type) return dispatcher.register(TypePair[from_type, to_type]) wrapper.register = register wrapper._transforms = transforms update_wrapper(wrapper, method) return wrapper
def _initializer_wrapper(init_function: Callable[..., None]) -> Type[Initializer]: class Init(Initializer): def __init__(self, **kwargs): self._init_function = init_function self._kwargs = kwargs def __call__(self, tensor: torch.autograd.Variable) -> None: self._init_function(tensor, **self._kwargs) def __repr__(self): return 'Init: %s, with params: %s' % (self._init_function, self._kwargs) @classmethod def from_params(cls, params: Params): return cls(**params.as_dict()) return Init # There are no classes to decorate, so we hack these into Registrable._registry
def multimask_images(images: Iterable[SpatialImage], masks: Sequence[np.ndarray], image_type: type = None ) -> Iterable[Sequence[np.ndarray]]: """Mask images with multiple masks. Parameters ---------- images: Images to mask. masks: Masks to apply. image_type: Type to cast images to. Yields ------ Sequence[np.ndarray] For each mask, a masked image. """ for image in images: yield [mask_image(image, mask, image_type) for mask in masks]
def mask_images(images: Iterable[SpatialImage], mask: np.ndarray, image_type: type = None) -> Iterable[np.ndarray]: """Mask images. Parameters ---------- images: Images to mask. mask: Mask to apply. image_type: Type to cast images to. Yields ------ np.ndarray Masked image. """ for images in multimask_images(images, (mask,), image_type): yield images[0]
def get_parsers_classes(self, filter_name: str=None) -> List[Type['BaseParser']]: parsers_list = list() for parser in self.parsers: parser_name = getattr(parser, 'name') if filter_name: if filter_name in parser_name: if parser_name == 'generic': parsers_list.append(parser) else: parsers_list.insert(0, parser) else: if parser_name == 'generic': parsers_list.append(parser) else: parsers_list.insert(0, parser) return parsers_list
def get_table(self, table_name: str) -> 'typing.Type[Table]': """ Gets a table from the current metadata. :param table_name: The name of the table to get. :return: A :class:`.Table` object. """ try: return self.tables[table_name] except KeyError: # we can load this from the name instead for table in self.tables.values(): if table.__name__ == table_name: return table else: return None
def run_update_query(self, query: 'md_query.BaseQuery'): """ Executes an update query. :param query: The :class:`.RowUpdateQuery` or :class:`.BulkUpdateQuery` to execute. """ if isinstance(query, md_query.RowUpdateQuery): for row, (sql, params) in zip(query.rows_to_update, query.generate_sql()): if md_inspection._get_mangled(row, "deleted"): raise RuntimeError("Row '{}' is marked as deleted".format(row)) if sql is None and params is None: continue await self.execute(sql, params) # copy the history of the row row._previous_values = row._values elif isinstance(query, md_query.BulkUpdateQuery): sql, params = query.generate_sql() await self.execute(sql, params) else: raise TypeError("Type {0.__class__.__name__} is not an update query".format(query)) return query
def run_delete_query(self, query: 'md_query.RowDeleteQuery'): """ Executes a delete query. :param query: The :class:`.RowDeleteQuery` or :class:`.BulkDeleteQuery` to execute. """ if isinstance(query, md_query.RowDeleteQuery): for row, (sql, params) in zip(query.rows_to_delete, query.generate_sql()): if md_inspection._get_mangled(row, "deleted"): raise RuntimeError("Row '{}' is already marked as deleted".format(row)) if sql is None and params is None: continue await self.execute(sql, params) md_inspection._set_mangled(row, "deleted", True) elif isinstance(query, md_query.BulkDeleteQuery): sql, params = query.generate_sql() await self.execute(sql, params) else: raise TypeError("Type {0.__class__.__name__} is not a delete query".format(query)) return query
def __init__(self, lsf_path: str, types: List[Type[pyimc.Message]] = None, make_index=True): """ Reads an LSF file. :param lsf_path: The path to the LSF file. :param types: The message types to return. List of pyimc message classes. :param make_index: If true, an index that speeds up subsequent reads is created. """ self.fpath = lsf_path self.f = None # type: io.BufferedIOBase self.header = IMCHeader() # Preallocate header buffer self.parser = pyimc.Parser() self.idx = {} # type: Dict[Union[int, str], List[int]] self.make_index = make_index if types: self.msg_types = [pyimc.Factory.id_from_abbrev(x.__name__) for x in types] else: self.msg_types = None
def get_all_subclasses(cls: t.Type[T]) -> t.Iterable[t.Type['T']]: """Returns all subclasses of the given class. Stolen from: https://stackoverflow.com/questions/3862310/how-can-i-find-all-subclasses-of-a-class-given-its-name :param cls: The parent class :returns: A list of all subclasses """ all_subclasses = [] for subclass in cls.__subclasses__(): all_subclasses.append(subclass) all_subclasses.extend(get_all_subclasses(subclass)) return all_subclasses
def _filter_or_404(model: t.Type[Y], get_all: bool, criteria: t.Tuple) -> t.Union[Y, t.Sequence[Y]]: """Get the specified object by filtering or raise an exception. :param get_all: Get all objects if ``True`` else get a single one. :param model: The object to get. :param criteria: The criteria to filter with. :returns: The requested object. :raises APIException: If no object with the given id could be found. (OBJECT_ID_NOT_FOUND) """ crit_str = ' AND '.join(str(crit) for crit in criteria) query = model.query.filter(*criteria) # type: ignore obj = query.all() if get_all else query.one_or_none() if not obj: raise psef.errors.APIException( f'The requested {model.__name__.lower()} was not found', f'There is no "{model.__name__}" when filtering with {crit_str}', psef.errors.APICodes.OBJECT_ID_NOT_FOUND, 404 ) return obj
def filter_all_or_404(model: t.Type[Y], *criteria: t.Any) -> t.Sequence[Y]: """Get all objects of the specified model filtered by the specified criteria. .. note:: ``Y`` is bound to :py:class:`psef.models.Base`, so it should be a SQLAlchemy model. :param model: The object to get. :param criteria: The criteria to filter with. :returns: The requested objects. :raises APIException: If no object with the given id could be found. (OBJECT_ID_NOT_FOUND) """ return t.cast(t.Sequence[Y], _filter_or_404(model, True, criteria))
def filter_single_or_404(model: t.Type[Y], *criteria: t.Any) -> Y: """Get a single object of the specified model by filtering or raise an exception. .. note:: ``Y`` is bound to :py:class:`psef.models.Base`, so it should be a SQLAlchemy model. :param model: The object to get. :param criteria: The criteria to filter with. :returns: The requested object. :raises APIException: If no object with the given id could be found. (OBJECT_ID_NOT_FOUND) """ return t.cast(Y, _filter_or_404(model, False, criteria))
def get_or_404(model: t.Type[Y], object_id: t.Any) -> Y: """Get the specified object by primary key or raise an exception. .. note:: ``Y`` is bound to :py:class:`psef.models.Base`, so it should be a SQLAlchemy model. :param model: The object to get. :param object_id: The primary key identifier for the given object. :returns: The requested object. :raises APIException: If no object with the given id could be found. (OBJECT_ID_NOT_FOUND) """ obj: t.Optional[Y] = model.query.get(object_id) if obj is None: raise psef.errors.APIException( f'The requested "{model.__name__}" was not found', f'There is no "{model.__name__}" with primary key {object_id}', psef.errors.APICodes.OBJECT_ID_NOT_FOUND, 404 ) return obj
def create_from_request(cls: t.Type['LTI'], req: flask.Request) -> 'LTI': params = req.form.copy() lti_provider = models.LTIProvider.query.filter_by( key=params['oauth_consumer_key'] ).first() if lti_provider is None: lti_provider = models.LTIProvider(key=params['oauth_consumer_key']) db.session.add(lti_provider) db.session.commit() params['lti_provider_id'] = lti_provider.id # This is semi sensitive information so it should not end up in the JWT # token. launch_params = {} for key, value in params.items(): if not key.startswith('oauth'): launch_params[key] = value self = cls(launch_params, lti_provider) auth.ensure_valid_oauth(self.key, self.secret, req) return self
def ensure_valid_oauth( key: str, secret: str, request: t.Any, parser_cls: t.Type = _FlaskOAuthValidator ) -> None: """Make sure the given oauth key and secret is valid for the given request. :param str key: The oauth key to be used for validating. :param str secret: The oauth secret to be used for validating. :param object request: The request that should be validated. :param RequestValidatorMixin parser_cls: The class used to parse the given ``request`` it should subclass :py:class:`RequestValidatorMixin` and should at least override the :func:`RequestValidatorMixin.parse_request` method. :returns: Nothing """ validator = parser_cls(key, secret) if not validator.is_valid_request(request): raise PermissionException( 'No valid oauth request could be found.', 'The given request is not a valid oauth request.', APICodes.INVALID_OAUTH_REQUEST, 400 )
def setup_plugin(cls: Type[NvimPlugin], name: str, prefix: str, debug: bool) -> None: help = Helpers(cls, name, prefix) cls.name = name cls.prefix = prefix cls.debug = debug help.msg_cmd('show_log_info', ShowLogInfo) help.short_handler('log_level', command, cls.set_log_level) help.msg_fun('mapping', Mapping) help.name_handler('stage_1', command, cls.stage_1, sync=True) help.name_handler('stage_2', command, cls.stage_2, sync=True) help.name_handler('stage_3', command, cls.stage_3, sync=True) help.name_handler('stage_4', command, cls.stage_4, sync=True) help.name_handler('quit', command, cls.quit, sync=True) help.name_handler('rpc_handlers', function, cls.rpc_handlers, sync=True) help.name_handler('append_python_path', function, cls.append_python_path) help.name_handler('show_python_path', function, cls.show_python_path) help.name_handler('send', function, cls.send_message)
def __init__( self, name: str, desc: str, help: str, prefix: bool, tpe: Type[A], ctor: Callable[[A], B], default: Either[str, B], ) -> None: self.name = name self.desc = desc self.help = help self.prefix = prefix self.tpe = tpe self.ctor = ctor self.default = default
def __init__( self, name: str, prefix: Optional[str]=None, components: Map[str, Union[str, type]]=Map(), state_type: Optional[Type[S]]=None, state_ctor: Optional[Callable[['Config', NvimFacade], S]]=None, settings: Optional[Settings]=None, request_handlers: List[RequestHandler]=Nil, core_components: List[str]=Nil, default_components: List[str]=Nil ) -> None: self.name = name self.prefix = prefix or name self.components = components self.state_type = state_type or AutoData self.state_ctor = state_ctor or (lambda c, v: self.state_type(config=c, vim_facade=Just(v))) self.settings = settings or PluginSettings(name=name) self.request_handlers = RequestHandlers.cons(*request_handlers) self.core_components = core_components self.default_components = default_components
def test_Type(): from typing import Type, Any class A(object): pass class B(A): pass class C(B): pass class D(A): pass assert is_type(A, type) assert is_type(A, Type) assert is_type(A, Type[Any]) assert is_type(A, Type[object]) assert is_type(A, Type[A]) assert is_type(B, Type[A]) assert is_type(C, Type[A]) assert is_type(C, Type[B]) assert is_type(D, Type[A]) assert not is_type(A, Type[B]) assert not is_type(D, Type[B]) assert not is_type("str", Type) assert not is_type(None, Type[A])
def test_typing(): from typing import Any, List, Set, Dict, Type, Tuple assert name_type(Any) == "Any" assert name_type(List) == "List" assert name_type(List[Any]) == "List" assert name_type(List[str]) == "List[str]" assert name_type(List[int]) == "List[int]" assert name_type(Set) == "Set" assert name_type(Set[Any]) == "Set" assert name_type(Set[List]) == "Set[List]" assert name_type(Dict) == "Dict" assert name_type(Dict[Any, Any]) == "Dict" assert name_type(Dict[str, int]) == "Dict[str, int]" assert name_type(Type) == "Type" assert name_type(Type[int]) == "Type[int]" assert name_type(Type[MagicType]) == "Type[MagicType]" assert name_type(Tuple) == "Tuple" assert name_type(Tuple[int]) == "Tuple[int]" assert name_type(Tuple[int, str, List]) == "Tuple[int, str, List]" assert name_type(Tuple[int, Ellipsis]) == "Tuple[int, ...]" assert name_type(Tuple[str, Ellipsis]) == "Tuple[str, ...]"
def create_master_instance(self, gen_cls, lib_name, params, used_cell_names, **kwargs): # type: (Type[MasterType], str, Dict[str, Any], Set[str], **kwargs) -> MasterType """Create a new non-finalized master instance. This instance is used to determine if we created this instance before. Parameters ---------- gen_cls : Type[MasterType] the generator Python class. lib_name : str generated instance library name. params : Dict[str, Any] instance parameters dictionary. used_cell_names : Set[str] a set of all used cell names. **kwargs optional arguments for the generator. Returns ------- master : MasterType the non-finalized generated instance. """ raise NotImplementedError('not implemented')
def _import_class_from_str(class_str): # type: (str) -> Type """Given a Python class string, convert it to the Python class. Parameters ---------- class_str : str a Python class string/ Returns ------- py_class : class a Python class. """ sections = class_str.split('.') module_str = '.'.join(sections[:-1]) class_str = sections[-1] modul = importlib.import_module(module_str) return getattr(modul, class_str)
def make_user(self, user_data: dict, *, user_klass: typing.Type[UserType] = User, override_cache: bool = False) -> UserType: """ Creates a new user and caches it. :param user_data: The user data to use to create. :param user_klass: The type of user to create. :param override_cache: Should the cache be overridden? :return: A new :class`~.User` (hopefully). """ id = int(user_data.get("id", 0)) if id in self._users and not override_cache: return self._users[id] user = user_klass(self.client, **user_data) self._users[user.id] = user return user
def load_plugin(self, klass: typing.Type[Plugin], *args, module: str = None): """ Loads a plugin. .. note:: The client instance will automatically be provided to the Plugin's ``__init__``. :param klass: The plugin class to load. :param args: Any args to provide to the plugin. :param module: The module name provided with this plugin. Only used interally. """ # get the name and create the plugin object plugin_name = getattr(klass, "plugin_name", klass.__name__) instance = klass(self.client, *args) # call load, of course await instance.load() self.plugins[plugin_name] = instance if module is not None: self._module_plugins[module].append(instance) return instance
def _warn(self, msg, category: Type[Warning]=RuntimeWarning): if self._show_warnings: warnings.warn(msg, category)
def from_dict(cls: Type[T], dikt) -> T: """ Returns the dict as a model """ return deserialize_model(dikt, cls)
def __init__(self, username: str, password: str, botModule: str, botconfig: Mapping, *args, **kwargs) -> None: super().__init__(*args, **kwargs) self.username: str = username self.password: str = password module = importlib.import_module(botModule + '.bot') self.botCls: Type[Bot] = module.Bot # type: ignore self.botconfig: Mapping = botconfig self.conn: socketIO_client.SocketIO self.game: Optional[Game] = None
def __copy__(self) -> 'CardKnowledge': cls: Type[CardKnowledge] = self.__class__ result: CardKnowledge = cls.__new__(cls) result.__dict__.update(self.__dict__) result.cantBe = {c: self.cantBe[c][:] for c in self.bot.colors} return result
def __init__(self, connection: Any, variant: Variant, names: List[str], botPosition: int, botCls: Type['bot.Bot'], **kwargs) -> None: self.connection: Any = connection self.variant: Variant = variant self.numPlayers: int = len(names) self.botPosition: int = botPosition self.bot: bot.Bot self.bot = botCls(self, botPosition, names[botPosition], **kwargs) self.players: List[Player] = [self.bot.create_player(p, names[p]) for p in range(self.numPlayers)] self.turnCount: int = -1 self.deckCount: int = -1 self.scoreCount: int = 0 self.clueCount: int = 8 self.strikeCount: int = 0 self.currentPlayer: int = -1 self.deck: Dict[int, Card] = {} self.discards: List[int] = [] self.playedCards: Dict[Color, List[Card]] self.playedCards = {c: [] for c in variant.pile_colors} self.actionLog: List[str] = [] self._lastAction: Optional[Action] = None self._cardMoved: Optional[int] = None self._cardPosition: Optional[int] = None self._striked: bool = False
def __exit__(self, exc_type: Optional[Type[BaseException]], exc_value: Optional[Exception], traceback: Optional[TracebackType]) \ -> None: """Terminate the server and join the thread on exit.""" self.server.terminate() self.server.join()
def __init__(self, lattice: Type, arguments: Dict[str, Any]): """Create a stack of elements of a lattice. :param lattice: type of the lattice """ super().__init__() self._stack = [lattice(**arguments)]
def unsupported(type: Type[T]) -> UnsupportedError: return UnsupportedError("The type \"{type}\" is not supported by this DataSource!".format(type=type.__name__))
def provides(self): # type: Union[Iterable[Type[T]], Type[Any]] """The types of objects the data store provides.""" types = set() any_dispatch = False try: types.update(getattr(self.__class__, "get")._provides) any_dispatch = True except AttributeError: pass try: types.update(getattr(self.__class__, "get_many")._provides) any_dispatch = True except AttributeError: pass return types if any_dispatch else TYPE_WILDCARD
def get(self, type: Type[T], query: Mapping[str, Any], context: PipelineContext = None) -> T: """Gets a query from the data source. Args: query: The query being requested. context: The context for the extraction (mutable). Returns: The requested object. """ pass
def get_many(self, type: Type[T], query: Mapping[str, Any], context: PipelineContext = None) -> Iterable[T]: """Gets a query from the data source, which contains a request for multiple objects. Args: query: The query being requested (contains a request for multiple objects). context: The context for the extraction (mutable). Returns: The requested objects. """ pass
def get_many(self, type: Type[T], query: Mapping[str, Any], context: PipelineContext = None) -> Iterable[T]: try: sources = self._sources[type] except KeyError as error: raise DataSource.unsupported(type) from error for source in sources: try: return source.get_many(type, deepcopy(query), context) except NotFoundError: continue raise NotFoundError()
def get(self, type: Type[T], query: Mapping[str, Any], context: PipelineContext = None) -> T: try: sources = self._sources[type] except KeyError as error: raise DataSource.unsupported(type) from error for source in sources: try: return source.get(type, deepcopy(query), context) except NotFoundError: continue raise NotFoundError()
def unsupported(type: Type[T]) -> UnsupportedError: return UnsupportedError("The type \"{type}\" is not supported by this DataSink!".format(type=type.__name__))