We extracted the following 50 code examples from open-source Python projects to illustrate how to use typing.Mapping().
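Before the extracted examples, a minimal sketch of what typing.Mapping expresses: a parameter annotated Mapping[K, V] promises key lookup and iteration but no mutation, so it accepts a dict while signaling that the function will not modify it (use typing.MutableMapping when writes are needed). The function name and data below are illustrative, not taken from any of the projects quoted here.

from typing import Mapping

def total_score(scores: Mapping[str, int]) -> int:
    """Sum the values of a read-only name -> score mapping."""
    # Mapping allows lookups and iteration; an assignment such as
    # scores['carol'] = 5 would be flagged by a type checker.
    return sum(scores.values())

print(total_score({'alice': 3, 'bob': 4}))  # prints 7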
def __init__(self, username: str, password: str, botModule: str,
             botconfig: Mapping, numPlayers: int, variant: Variant,
             spectators: bool, gameName: str, *args, **kwargs) -> None:
    super().__init__(*args, **kwargs)
    self.username: str = username
    self.password: str = password
    module = importlib.import_module(botModule + '.bot')
    self.botCls: Type[Bot] = module.Bot  # type: ignore
    self.botconfig: Mapping = botconfig
    self.numPlayers: int = numPlayers
    self.variant: Variant = variant
    self.spectators: bool = spectators
    self.gameName: str = gameName
    self.conn: socketIO_client.SocketIO
    self.tablePlayers: List[str] = []
    self.readyToStart: bool = False
    self.game: Optional[Game] = None
def _get_or_create(cls, lang: str,
                   client_token: Optional[str] = None,
                   mounts: Optional[Iterable[str]] = None,
                   envs: Optional[Mapping[str, str]] = None,
                   max_mem: int = 0, exec_timeout: int = 0) -> str:
    if client_token:
        assert len(client_token) > 8
    else:
        client_token = uuid.uuid4().hex
    resp = yield Request('POST', '/kernel/create', {
        'lang': lang,
        'clientSessionToken': client_token,
        'config': {
            'mounts': mounts,
            'envs': envs,
        },
        # 'limits': {
        #     'maxMem': max_mem,
        #     'execTimeout': exec_timeout,
        # },
    })
    data = resp.json()
    o = cls(data['kernelId'])  # type: ignore
    o.created = data.get('created', True)  # True is for legacy
    return o
def dispatch(method: Callable[[Any, Type[T], Mapping[str, Any], PipelineContext], Any]) \
        -> Callable[[Any, Type[T], Mapping[str, Any], PipelineContext], Any]:
    dispatcher = singledispatch(method)
    provides = set()

    def wrapper(self: Any, type: Type[T], query: Mapping[str, Any],
                context: PipelineContext = None) -> Any:
        call = dispatcher.dispatch(type)
        try:
            return call(self, query, context=context)
        except TypeError:
            raise DataSource.unsupported(type)

    def register(type: Type[T]) -> Callable[[Any, Type[T], Mapping[str, Any], PipelineContext], Any]:
        provides.add(type)
        return dispatcher.register(type)

    wrapper.register = register
    wrapper._provides = provides
    update_wrapper(wrapper, method)
    return wrapper
def testGETOpenRedirect(url: str, cookies: Mapping[str, str]) -> Optional[str]:
    """
    If the given URL redirects when accessed with the given cookies via GET,
    return the new URL, otherwise return None
    """
    driver = SeleniumDrivers.getFirefoxDriver()
    driver.setCookies(url, cookies)
    try:
        driver.get(url)
        time.sleep(config.timeout)
        if driver.current_url == url:
            driver.reset()
            return None
        else:
            url = driver.current_url
            driver.reset()
            return url
    except (TimeoutException, URLError):
        driver.reset()
        return None
def testPOSTOpenRedirect(url: str, cookies: Mapping[str, str],
                         data: Mapping[str, str]) -> Optional[str]:
    """
    If the given URL redirects when accessed with the given cookies via POST,
    return the new URL, otherwise return None
    """
    driver = SeleniumDrivers.getFirefoxDriver()
    driver.setCookies(url, cookies)
    try:
        driver.post(url, data)
        time.sleep(config.timeout)
        if driver.current_url == url:
            driver.reset()
            return None
        else:
            url = driver.current_url
            driver.reset()
            return url
    except (TimeoutException, URLError):
        driver.reset()
        return None
def testGETXSSDriver(url: str, cookies: Mapping[str, str], driver: webdriver) -> Optional[str]:
    """
    If the given URL pops an alert box when accessed with the given cookies,
    return the contents of the alert box, otherwise return None
    """
    driver.setCookies(url, cookies)
    try:
        driver.get(url)
        WebDriverWait(driver, config.timeout).until(expected_conditions.alert_is_present())
        # Note that despite the name switch_to_alert also handles prompt:
        # - http://selenium-python.readthedocs.io/navigating.html#popup-dialogs
        alert = driver.switch_to_alert()
        text = alert.text
        driver.reset()
        return text
    except (TimeoutException, URLError):
        driver.reset()
        return None
def testPOSTXSSDriver(url: str, cookies: Mapping[str, str], data: Mapping[str, str],
                      driver: webdriver) -> Optional[str]:
    """
    If the given URL pops an alert box when accessed with the given cookies,
    return the contents of the alert box, otherwise return None
    """
    driver.setCookies(url, cookies)
    try:
        driver.post(url, data)
        WebDriverWait(driver, config.timeout).until(expected_conditions.alert_is_present())
        # Note that despite the name switch_to_alert also handles prompt:
        # - http://selenium-python.readthedocs.io/navigating.html#popup-dialogs
        alert = driver.switch_to_alert()
        text = alert.text
        driver.reset()
        return text
    except (TimeoutException, URLError):
        driver.reset()
        return None
def postComment(id: str, vti: VulnTestInfo, internal=False, addStopMessage=False) -> Mapping:
    """
    Post a comment to the report with the given ID using the information in
    the given VulnTestInfo
        - Set internal=True in order to post an internal comment
        - Set addStopMessage=True in order to add the stop message
    """
    if config.DEBUG:
        print("Posting comment: internal=%s, reproduced=%s, id=%s" %
              (str(internal), str(vti.reproduced), id))
    if addStopMessage:
        message = vti.message + '\n\n' + constants.disableMessage
    else:
        message = vti.message
    postMessage("Posting Message: \n\n%s" % message)  # TODO: Delete this
    resp = requests.post('http://api:8080/v1/sendMessage',
                         json={'message': message, 'internal': internal, 'id': id},
                         auth=HTTPBasicAuth('AutoTriageBot', secrets.apiBoxToken))
    if config.triageOnReproduce and vti.reproduced:
        changeStatus(id, 'triaged')
    return resp.json()
def suggestPayoutGivenType(db: Mapping[str, BountyInfo], domains: List[str]) -> BountyInfo:
    """
    Returns a BountyInfo containing a suggested payout and the std for the
    given report given the DB for that class of vulnerability
    """
    if len(domains) == 0:
        return db['average']
    sum = 0.0
    stdSum = 0.0  # Not actually the std, but good enough™
    cnt = 0
    for domain in domains:
        try:
            sum += db[domain].average
            stdSum += db[domain].std
            cnt += 1
        except KeyError:
            pass
    try:
        return BountyInfo(average=sum/cnt, std=stdSum/cnt)
    except ZeroDivisionError:
        return db['average']
def extractJson(message: str) -> Optional[Mapping]:
    """ Returns the first json blob found in the string if any are found """
    # First pass that relies on it being in a code block
    for match in re.findall(r'`\s*?{[\s\S]*?}\s*?`', message):
        potJson = match[1:-1].strip()
        try:
            return json.loads(potJson)
        except ValueError:
            pass
    # Second pass doesn't require the code block, but it still uses the json parser
    for match in re.findall(r'{[\s\S]*}', message):
        try:
            return json.loads(match)
        except ValueError:
            pass
    # Third pass uses ast.literal_eval (which IS safe - it only evals literals)
    # and some replacements to handle malformed JSON. This is a horrible JSON
    # parser and will incorrectly parse certain types of JSON, but it is far
    # more accepting so we might as well try doing this
    for match in re.findall(r'{[\s\S]*}', message):
        try:
            return fuzzyJsonParse(match)
        except (SyntaxError, ValueError):
            pass
    return None
def __call__(self, batch: Mapping[TensorPort, np.ndarray],
             goal_ports: List[TensorPort] = None) -> Mapping[TensorPort, np.ndarray]:
    """Runs a batch and returns values/outputs for specified goal ports.

    Args:
        batch: mapping from ports to values
        goal_ports: optional output ports; defaults to the output_ports of
            this module.

    Returns:
        A mapping from goal ports to tensors.
    """
    goal_ports = goal_ports or self.output_ports

    feed_dict = self.convert_to_feed_dict(batch)
    goal_tensors = {p: self.tensors[p] for p in goal_ports
                    if p in self.output_ports or p in self.training_output_ports}
    outputs = self.tf_session.run(goal_tensors, feed_dict)

    for p in goal_ports:
        if p not in outputs and p in batch:
            outputs[p] = batch[p]

    return outputs
def create_training_output(self, shared_resources: SharedResources,
                           training_input_tensors: Mapping[TensorPort, tf.Tensor]) \
        -> Mapping[TensorPort, tf.Tensor]:
    """
    This function needs to be implemented in order to define how the module
    produces tensors only used during training, given tensors corresponding
    to the ones defined by `training_input_ports`, which might include
    tensors corresponding to ports defined by `output_ports`. This sub-graph
    should only be created during training.

    Args:
        shared_resources: contains resources shared by modules, such as
            hyper-parameters or vocabularies.
        training_input_tensors: a mapping from training input tensorports to
            tensors.

    Returns:
        mapping from defined training output ports to their tensors.
    """
    raise NotImplementedError
def __call__(self, batch: Mapping[TensorPort, np.ndarray],
             goal_ports: List[TensorPort] = None) -> Mapping[TensorPort, np.ndarray]:
    """Runs a batch and returns values/outputs for specified goal ports.

    Args:
        batch: mapping from ports to values
        goal_ports: optional output ports; defaults to the output_ports of
            this module.

    Returns:
        A mapping from goal ports to tensors.
    """
    goal_ports = goal_ports or self.output_ports

    inputs = [p.create_torch_variable(batch.get(p), gpu=torch.cuda.device_count() > 0)
              for p in self.input_ports]
    outputs = self.prediction_module.forward(*inputs)
    ret = {p: p.torch_to_numpy(t) for p, t in zip(self.output_ports, outputs)
           if p in goal_ports}

    for p in goal_ports:
        if p not in ret and p in batch:
            ret[p] = batch[p]

    return ret
def __get__(self, obj, cls: typing.Optional[type] = None):
    if obj is None:
        return self
    default, expression = self.get_raw_value(obj)
    if not default:
        if not isinstance(expression, collections.abc.Mapping):
            raise ConfigTypeError(
                '{0!r} field must be a mapping, not {1}'.format(
                    self.key, typing._type_repr(type(expression))
                )
            )
        elif 'class' not in expression:
            raise ConfigValueError(
                '{0!r} field lacks "class" field'.format(self.key)
            )
    value = self.evaluate(expression)
    self.typecheck(value)
    return value
def evaluate(self, expression) -> object:
    if not isinstance(expression, collections.abc.Mapping):
        return expression
    try:
        import_path = expression['class']
    except KeyError:
        return expression
    f = self.import_(import_path)
    args = expression.get('*', ())
    if isinstance(args, str) or \
            not isinstance(args, collections.abc.Sequence):
        raise ConfigValueError(
            '"*" field must be a list, not ' + repr(args)
        )
    kw = {k: v for k, v in expression.items() if k not in ('class', '*')}
    if self.recurse:
        args = map(self.evaluate, args)
        kw = {k: self.evaluate(v) for k, v in kw.items()}
    return f(*args, **kw)
def worker_config(self) -> typing.Mapping[str, object]:
    """(:class:`typing.Mapping`\ [:class:`str`, :class:`object`])
    The configuration mapping for the worker that will go to
    :attr:`Celery.conf <celery.Celery.conf>`.
    """
    raw_config = self.config.get('worker', {})
    if isinstance(raw_config, collections.abc.Mapping):
        celery_config = {k.upper(): v for k, v in raw_config.items()}
    else:
        celery_config = {}
    celery_config.update(
        BROKER_URL=self.worker_broker_url,
        CELERY_RESULT_BACKEND=self.worker_result_backend,
        CELERYBEAT_SCHEDULE=self.worker_schedule
    )
    return celery_config
def sorted_dict(value):
    # type: (Mapping) -> Any
    """
    Sorts a dict's keys to avoid leaking information about the backend's
    handling of unordered dicts.
    """
    if isinstance(value, Mapping):
        return OrderedDict(
            (key, sorted_dict(value[key]))
            for key in sorted(iterkeys(value))
        )
    elif isinstance(value, Sequence) and not isinstance(value, string_types):
        return list(map(sorted_dict, value))
    else:
        return value
def __init__(
        self, *, cache_sketches: bool = True,
        source_encoding: str = "utf-8",
        custom_escape_fns: Mapping[str, Callable[[Any], str]] = {}) -> None:
    self._source_encoding = source_encoding

    escape_fns = escaping.builtin_escape_fns.copy()
    if custom_escape_fns:
        escape_fns.update(custom_escape_fns)
    self._escape_fns = types.MappingProxyType(escape_fns)

    self._stmt_classes = list(statements.builtin_stmt_classes)

    class OutputStmt(statements.BaseOutput):
        _filter_fn_names = list(self.escape_fns.keys())

    self._stmt_classes.append(OutputStmt)

    self._cache_sketches = cache_sketches
def gfa2_line_to_la(reads: Mapping[str, Read]):
    def mapper(line: str):
        if not line.startswith('E'):
            raise ValueError('Given GFA2 line is not an edge.')

        sid1, sid2, arange, brange, alignment, tags = gfa2_parse_edge(line)
        a_read = reads[sid1[:-1]]
        b_read = reads[sid2[:-1]]

        return LocalAlignment(
            a_read.with_orientation(sid1[-1]),
            b_read.with_orientation(sid2[-1]),
            arange, brange, alignment)

    return mapper
def parse_data(data, measurement=None, tag_columns=None, **extra_tags):
    """Converts input data into line protocol format"""
    if isinstance(data, bytes):
        return data
    elif isinstance(data, str):
        return data.encode('utf-8')
    elif isinstance(data, pd.DataFrame):
        if measurement is None:
            raise ValueError("Missing 'measurement'")
        return parse_df(data, measurement, tag_columns, **extra_tags)
    elif isinstance(data, Mapping):
        return make_line(data, measurement, **extra_tags)
    elif isinstance(data, Iterable):
        return b'\n'.join([parse_data(i, measurement, tag_columns, **extra_tags)
                           for i in data])
    else:
        raise ValueError('Invalid input', data)
def __getitem__(self, key: str) -> Any:
    node = self.mapping
    leafs = key.split(".")
    for i, leaf in enumerate(leafs):
        if not isinstance(node, c_abc.Mapping):
            raise KeyError(f"Element {'.'.join(leafs[:i])!r} is not a mapping")
        if not leaf:
            raise KeyError(f"Empty sub-key after {'.'.join(leafs[:i])!r}")
        if leaf not in node:
            break
        node = node[leaf]
    else:
        return node
    raise KeyError(f"Cannot find '{key}'")
def _parse_servers(self, mapping: Mapping) -> List[Server]:
    servers = []
    servers_conf = mapping.get('servers')
    if not servers_conf:
        raise ConfigurationError(f"Network {self.name!r} has no servers")
    if not isinstance(servers_conf, list):
        raise ConfigurationError(f"Servers of Network {self.name!r} are not a list")
    for server_conf in mapping.get('servers', ()):
        if isinstance(server_conf, str):
            server = Server.from_string(server_conf)
        else:
            server = Server.with_optional_port(**server_conf)
        servers.append(server)
    else:
        return servers
async def execute(self, sql: str, params: typing.Mapping[str, typing.Any] = None):
    """
    Executes SQL inside the transaction.

    :param sql: The SQL to execute.
    :param params: The parameters to execute with.
    """
    # re-parameterize the query
    logger.debug("Executing query {} with params {}".format(sql, params))
    query, params = get_param_query(sql, params)
    try:
        results = await self.acquired_connection.execute(query, *params)
    except (asyncpg.IntegrityConstraintViolationError,
            asyncpg.exceptions.NotNullViolationError) as e:
        raise IntegrityError(*e.args) from e
    except asyncpg.ObjectNotInPrerequisiteStateError as e:
        raise OperationalError(*e.args) from e
    except (asyncpg.SyntaxOrAccessError, asyncpg.InFailedSQLTransactionError) as e:
        raise DatabaseException(*e.args) from e

    return results
async def execute(self, sql: str,
                  params: typing.Union[typing.Mapping, typing.Iterable] = None):
    """
    Executes SQL in the current transaction.
    """
    # lock to ensure nothing else is using the connection at once
    logger.debug("Running SQL {} with params {}".format(sql, params))
    async with self._lock:
        async with threadpool():
            for stmt in separate_statements(sql):
                try:
                    if params is None:
                        res = self.connection.execute(stmt)
                    else:
                        res = self.connection.execute(stmt, params)
                except sqlite3.IntegrityError as e:
                    raise IntegrityError(*e.args)
                except sqlite3.OperationalError as e:
                    raise DatabaseException(*e.args)
            return res
async def cursor(self, sql: str,
                 params: typing.Union[typing.Mapping, typing.Iterable] = None) \
        -> 'Sqlite3ResultSet':
    """
    Gets a cursor for the specified SQL.
    """
    logger.debug("Running SQL {} with params {}".format(sql, params))
    async with self._lock:
        async with threadpool():
            for stmt in separate_statements(sql):
                cur = self.connection.cursor()
                try:
                    if params is None:
                        cur.execute(stmt)
                    else:
                        cur.execute(stmt, params)
                except sqlite3.OperationalError as e:
                    raise DatabaseException(*e.args)
            return Sqlite3ResultSet(cur)
def map_many(self, *rows: typing.Mapping[str, typing.Any]):
    """
    Maps many records to one row.

    This will group the records by the primary key of the main query table,
    then add additional columns as appropriate.
    """
    # this assumes that the rows come in grouped by PK on the left
    # also fuck right joins

    # get the first row and construct the first table row using map_columns
    # this will also map any extra relationship data there
    first_row = rows[0]
    tbl_row = self.map_columns(first_row)

    # loop over every "extra" row
    # and update the relationship data in the table
    for runon_row in rows[1:]:
        tbl_row._update_relationships(runon_row)

    return tbl_row

# Helper methods for natural builder-style queries
def __init__(
    self, params: t.Mapping[str, str],
    lti_provider: models.LTIProvider = None
) -> None:
    self.launch_params = params

    if lti_provider is not None:
        self.lti_provider = lti_provider
    else:
        lti_id = params['lti_provider_id']
        self.lti_provider = helpers.get_or_404(
            models.LTIProvider,
            lti_id,
        )

    self.key = self.lti_provider.key
    self.secret = self.lti_provider.secret
    # TODO support more than just flask
def __init__(
    self,
    operation: str = None,
    score: str = None,
    result_data: t.Mapping[str, str] = None,
    message_identifier: str = None,
    lis_outcome_service_url: str = None,
    lis_result_sourcedid: str = None,
    consumer_key: str = None,
    consumer_secret: str = None,
    post_request: t.Any = None
) -> None:
    self.operation = operation
    self.score = score
    self.result_data = result_data
    self.outcome_response = None  # type: t.Optional['OutcomeResponse']
    self.message_identifier = message_identifier
    self.lis_outcome_service_url = lis_outcome_service_url
    self.lis_result_sourcedid = lis_result_sourcedid
    self.consumer_key = consumer_key
    self.consumer_secret = consumer_secret
    self.post_request = post_request
def get_all_permissions(self) -> t.Mapping[str, bool]:
    """Get all course permissions (:class:`Permission`) for this role.

    :returns: A name boolean mapping where the name is the name of the
        permission and the value indicates if this user has this permission.
    """
    perms: t.Sequence[Permission] = (
        Permission.query.
        filter_by(  # type: ignore
            course_permission=False
        ).all()
    )
    result: t.MutableMapping[str, bool] = {}
    for perm in perms:
        if perm.name in self._permissions:
            result[perm.name] = not perm.default_value
        else:
            result[perm.name] = perm.default_value
    return result
def __to_json__(self) -> t.Mapping[str, t.Any]: """Creates a JSON serializable representation of this object. This object will look like this: .. code:: python { 'id': int, # The id of this user. 'name': str, # The full name of this user. 'email': str, # The email of this user. 'username': str, # The username of this user. } :returns: An object as described above. """ return { 'id': self.id, 'name': self.name, 'email': self.email, 'username': self.username, }
def __extended_to_json__(self) -> t.Mapping[str, t.Any]:
    """Create an extended JSON serializable representation of this object.

    This object will look like this:

    .. code:: python

        {
            'hidden': bool,  # indicating if this user can see hidden
                             # assignments.
            **self.__to_json__()
        }

    :returns: An object as described above.
    """
    return {
        "hidden": self.can_see_hidden,
        **self.__to_json__(),
    }
def __to_json__(self) -> t.Mapping[str, t.Any]:
    """Creates a JSON serializable representation of this object.

    This object will look like this:

    .. code:: python

        {
            'name': str,  # The name of the course,
            'id': int,  # The id of this course.
            'created_at': str,  # ISO UTC date.
            'is_lti': bool,  # Is this course an LTI course.
        }

    :returns: An object as described above.
    """
    return {
        'id': self.id,
        'name': self.name,
        'created_at': self.created_at.isoformat(),
        'is_lti': self.lti_course_id is not None,
    }
def __to_json__(self) -> t.Mapping[str, t.Any]:
    """Converts a rubric of a work to an object that is JSON serializable.

    The resulting object will look like this:

    .. code:: python

        {
            'changed_at': str,  # The date the history was added.
            'is_rubric': bool,  # Was this history item added by a rubric
                                # grade.
            'grade': float,  # The new grade, -1 if the grade was deleted.
            'passed_back': bool,  # Is this grade given back to LTI.
            'user': User,  # The user that added this grade.
        }

    :returns: An object as described above.
    """
    return {
        'changed_at': self.changed_at.isoformat(),
        'is_rubric': self.is_rubric,
        'grade': self.grade,
        'passed_back': self.passed_back,
        'user': self.user,
    }
def __to_json__(self) -> t.Mapping[str, t.Union[str, bool, int]]:
    """Creates a JSON serializable representation of this object.

    This object will look like this:

    .. code:: python

        {
            'name': str,  # The name of the file or directory.
            'id': int,  # The id of this file.
            'is_directory': bool,  # Is this file a directory.
        }

    :returns: An object as described above.
    """
    return {
        'name': self.name,
        'is_directory': self.is_directory,
        'id': self.id,
    }
def __to_json__(self) -> t.Mapping[str, t.Any]:
    """Returns the JSON serializable representation of this class.

    This representation also returns a count of the :class:`LinterState` of
    the attached :class:`LinterInstance` objects.

    :returns: A dict containing JSON serializable representations of the
              attributes and the test state counts of this AssignmentLinter.
    """
    return {
        'done': self.linters_done,
        'working': self.linters_running,
        'crashed': self.linters_crashed,
        'id': self.id,
        'name': self.name,
    }
def second_phase_lti_launch(
) -> helpers.JSONResponse[t.Mapping[str, t.Union[str, models.Assignment, bool]]]:
    launch_params = jwt.decode(
        flask.request.headers.get('Jwt', None),
        app.config['LTI_SECRET_KEY'],
        algorithm='HS512'
    )['params']
    lti = CanvasLTI(launch_params)

    user, new_token = lti.ensure_lti_user()
    course = lti.get_course()
    assig = lti.get_assignment(user)
    lti.set_user_role(user)
    new_role_created = lti.set_user_course_role(user, course)
    db.session.commit()

    result: t.Mapping[str, t.Union[str, models.Assignment, bool]]
    result = {'assignment': assig, 'new_role_created': new_role_created}
    if new_token is not None:
        result['access_token'] = new_token

    return helpers.jsonify(result)
def about(
) -> JSONResponse[t.Mapping[str, t.Union[str, t.Mapping[str, bool]]]]:
    """Get the version and features of the currently running instance.

    .. :quickref: About; Get the version and features.

    :>json string version: The version of the running instance.
    :>json object features: A mapping from string to a boolean for every
        feature indicating if the current instance has it enabled.

    :returns: The mapping as described above.
    """
    features = {
        key: bool(value)
        for key, value in psef.app.config['FEATURES'].items()
    }
    return jsonify(
        {
            'version': psef.app.config['_VERSION'],
            'features': features,
        },
    )
def get_rubric(submission_id: int) -> JSONResponse[t.Mapping[str, t.Any]]:
    """Return full rubric of the :class:`.models.Assignment` of the given
    submission (:class:`.models.Work`).

    .. :quickref: Submission; Get a rubric and its selected items.

    :param int submission_id: The id of the submission
    :returns: A response containing the JSON serialized rubric as described
        in :py:meth:`.Work.__rubric_to_json__`.

    :raises APIException: If the submission with the given id does not exist.
        (OBJECT_ID_NOT_FOUND)
    :raises PermissionException: If there is no logged in user.
        (NOT_LOGGED_IN)
    :raises PermissionException: If the user can not see the assignment of
        the given submission. (INCORRECT_PERMISSION)
    """
    work = helpers.get_or_404(models.Work, submission_id)
    auth.ensure_permission('can_see_assignments', work.assignment.course_id)
    return jsonify(work.__rubric_to_json__())
def update_dict(initial: JSON, other: Mapping) -> JSON:
    """Recursively update a dictionary.

    :param initial: Dict to update.
    :type initial: dict or list
    :param other: Dict to update from.
    :type other: Mapping
    :return: Updated dict.
    :rtype: dict
    """
    for key, value in other.items():
        # the Mapping ABC lives in collections.abc; the bare collections
        # alias was removed in Python 3.10
        if isinstance(value, collections.abc.Mapping):
            r = update_dict(initial.get(key, {}), value)
            initial[key] = r
        else:
            initial[key] = other[key]
    return initial
def load_params(fname: str) -> Tuple[Dict[str, mx.nd.NDArray], Dict[str, mx.nd.NDArray]]:
    """
    Loads parameters from a file.

    :param fname: The file containing the parameters.
    :return: Mapping from parameter names to the actual parameters for both
             the arg parameters and the aux parameters.
    """
    save_dict = mx.nd.load(fname)
    arg_params = {}
    aux_params = {}
    for k, v in save_dict.items():
        tp, name = k.split(':', 1)
        if tp == 'arg':
            arg_params[name] = v
        if tp == 'aux':
            aux_params[name] = v
    return arg_params, aux_params
def get_Generic_parameters(tp, generic_supertype):
    """tp must be a subclass of generic_supertype.
    Retrieves the type values from tp that correspond to parameters
    defined by generic_supertype.

    E.g. get_Generic_parameters(tp, typing.Mapping) is equivalent to
    get_Mapping_key_value(tp) except for the error message.

    Note that get_Generic_itemtype(tp) is not exactly equal to
    get_Generic_parameters(tp, typing.Container), as that method
    additionally contains treatment for typing.Tuple and typing.Iterable.
    """
    try:
        res = _select_Generic_superclass_parameters(tp, generic_supertype)
    except TypeError:
        res = None
    if res is None:
        raise TypeError("%s has no proper parameters defined by %s." %
                        (type_str(tp), type_str(generic_supertype)))
    else:
        return tuple(res)
def from_vocab(cls, sequences: Map[int, Seq[H]], vocab: Vocabulary,
               max_len: int, pack_sequences: bool = False,
               append_eos: bool = True, eos_token: Opt[H] = DEFAULT_EOS,
               null_token: H = DEFAULT_NULL, int_id_type: str = 'long',
               shuffle: bool = True):
    """
    :param vocab: instance of Vocabulary to use for encoding/decoding tokens
    :param max_len: maximum length of sequences to sample
    :param pack_sequences: bool indicating whether to return regular Tensors
        or PackedSequence instances.
    :param int_id_type: string indicating the type of int ids to use.
        Must be a key of data.str_to_int_tensor_type.
    :param eos_token: string or hashable to append to mark end-of-sequence in
        encoding
    :param null_token: Optional hashable to use for padding sequences.
        Added to the vocab, unless none is passed and none is built, in which
        case this is considered to be an int id.
        Numpy aliases for integer types are valid, as well as 'long',
        'short', 'byte', 'char'. The default 'long' is recommended, as only
        LongTensors can be used to index Embeddings in pytorch.
    """
    encoder = SequenceTensorEncoder(vocab, append_eos=append_eos, eos_token=eos_token,
                                    null_token=null_token, int_id_type=int_id_type)
    return cls(sequences=sequences, encoder=encoder, max_len=max_len,
               pack_sequences=pack_sequences, null_token=null_token, shuffle=shuffle)
def from_token2id(cls, sequences: Map[int, Seq[H]], token2id: Dict[H, int],
                  max_len: int, pack_sequences: bool = False,
                  append_eos: bool = True, eos_token: Opt[H] = DEFAULT_EOS,
                  null_token: H = DEFAULT_NULL, oov_token: H = DEFAULT_OOV,
                  int_id_type: str = 'long', shuffle: bool = True):
    """
    :param token2id: mapping of tokens to int ids
    :param max_len: maximum length of sequences to sample
    :param pack_sequences: bool indicating whether to return regular Tensors
        or PackedSequence instances.
    :param int_id_type: string indicating the type of int ids to use.
        Must be a key of data.str_to_int_tensor_type.
    :param oov_token: hashable to insert for out-of-vocab tokens when encoding
    :param eos_token: string or hashable to append to mark end-of-sequence in
        encoding
    :param null_token: Optional hashable to use for padding sequences.
        Added to the vocab, unless none is passed and none is built, in which
        case this is considered to be an int id.
        Numpy aliases for integer types are valid, as well as 'long',
        'short', 'byte', 'char'. The default 'long' is recommended, as only
        LongTensors can be used to index Embeddings in pytorch.
    """
    vocab = Vocabulary.from_token2id(token2id, oov_token=oov_token)
    encoder = SequenceTensorEncoder(vocab, append_eos=append_eos, eos_token=eos_token,
                                    null_token=null_token, int_id_type=int_id_type)
    return cls(sequences=sequences, encoder=encoder, max_len=max_len,
               pack_sequences=pack_sequences, null_token=null_token, shuffle=shuffle)
def from_id2token(cls, sequences: Map[int, Seq[H]], id2token: Dict[int, H],
                  max_len: int, pack_sequences: bool = False,
                  append_eos: bool = True, eos_token: Opt[H] = DEFAULT_EOS,
                  null_token: H = DEFAULT_NULL, oov_token: H = DEFAULT_OOV,
                  int_id_type: str = 'long', shuffle: bool = True):
    """
    :param id2token: mapping of int ids to tokens
    :param max_len: maximum length of sequences to sample
    :param pack_sequences: bool indicating whether to return regular Tensors
        or PackedSequence instances.
    :param int_id_type: string indicating the type of int ids to use.
        Must be a key of data.str_to_int_tensor_type.
    :param oov_token: hashable to insert for out-of-vocab tokens when encoding
    :param eos_token: hashable to append to mark end-of-sequence in encoding
    :param null_token: hashable to use for padding sequences.
        Added to the vocab, unless none is passed and none is built, in which
        case this is considered to be an int id.
        Numpy aliases for integer types are valid, as well as 'long',
        'short', 'byte', 'char'. The default 'long' is recommended, as only
        LongTensors can be used to index Embeddings in pytorch.
    """
    vocab = Vocabulary.from_id2token(id2token, oov_token=oov_token)
    encoder = SequenceTensorEncoder(vocab, append_eos=append_eos, eos_token=eos_token,
                                    null_token=null_token, int_id_type=int_id_type)
    return cls(sequences=sequences, encoder=encoder, max_len=max_len,
               pack_sequences=pack_sequences, null_token=null_token, shuffle=shuffle)
def validate_type(data, type_):
    instance_check = False
    abstract_types = {typing.AbstractSet, typing.Sequence, typing.Mapping}
    if hasattr(type_, '__origin__') and type_.__origin__ in abstract_types:
        param_type = get_abstract_param_types(type_)
        # the ABCs live in collections.abc; the bare collections aliases
        # were removed in Python 3.10
        imp_types = {
            typing.AbstractSet: collections.abc.Set,
            typing.Sequence: collections.abc.Sequence,
            typing.Mapping: collections.abc.Mapping,
        }
        instance_check = isinstance(data, imp_types[type_.__origin__]) and \
            all(isinstance(item, param_type[0]) for item in data)
    else:
        try:
            instance_check = isinstance(data, type_)
        except TypeError:
            if is_union_type(type_):
                instance_check = any(
                    isinstance(data, t) for t in get_union_types(type_)
                )
            else:
                raise ValueError('{!r} cannot be validated.'.format(type_))
    return instance_check
def deserialize_abstract_type(cls, data):
    abstract_type_map = {
        typing.Sequence: list,
        typing.List: list,
        typing.Dict: dict,
        typing.Set: set,
        typing.AbstractSet: set,
    }
    cls_origin_type = cls.__origin__
    if cls_origin_type is None:
        cls_origin_type = cls
    iterable_types = {
        typing.Sequence,
        typing.List,
        typing.Tuple,
        typing.Set,
        typing.AbstractSet,
        typing.Mapping,
    }
    if cls_origin_type in iterable_types:
        return deserialize_iterable_abstract_type(cls, cls_origin_type, data)
    else:
        return abstract_type_map[cls_origin_type](data)
def __call__(self,
             client: Client,
             datatype: str,
             datavalue: Mapping[str, object]) -> object:
    try:
        type_ = datavalue['type']
    except KeyError:
        raise DatavalueError('no "type" specified', datavalue)
    assert isinstance(type_, str)
    if 'value' not in datavalue:
        raise DatavalueError('no "value" field', datavalue)
    method_name = '{}__{}'.format(datatype, type_).replace('-', '_')
    method = getattr(self, method_name, None)
    if callable(method):
        return method(client, datavalue)
    method_name = type_.replace('-', '_')
    method = getattr(self, method_name, None)
    if callable(method):
        return method(client, datavalue)
    raise DatavalueError('{!r} is unsupported type'.format(type_), datavalue)
def label_map(self) -> Optional[Mapping[str, float]]:
    """
    Returns the mapping of nominal labels to numeric labels for the data set.
    This method delegates to the selected parser.

    Returns
    -------
    map of str to float
        The mapping of nominal labels to numeric labels

    Raises
    ------
    IOError
        If the data set cannot be parsed
    """
    if not self.can_parse():
        raise IOError("unable to parse data set at {}".format(self._basedir))
    return self._parser.label_map
def do_explode(self, kind):
    if kind in basic_types or type(kind) is typing.TypeVar:
        return False
    if not issubclass(kind, (typing.Sequence, typing.Mapping)):
        self.clear()
        self.extend(Args(kind))
        return True
    return False
def __init__(self, username: str, password: str, botModule: str,
             botconfig: Mapping, *args, **kwargs) -> None:
    super().__init__(*args, **kwargs)
    self.username: str = username
    self.password: str = password
    module = importlib.import_module(botModule + '.bot')
    self.botCls: Type[Bot] = module.Bot  # type: ignore
    self.botconfig: Mapping = botconfig
    self.conn: socketIO_client.SocketIO
    self.game: Optional[Game] = None