Python typing 模块,Mapping() 实例源码


项目:Hanabi-AI    作者:MeGotsThis    | 项目源码 | 文件源码
def __init__(self,
                 username: str,
                 password: str,
                 botModule: str,
                 botconfig: Mapping,
                 numPlayers: int,
                 variant: Variant,
                 spectators: bool,
                 gameName: str,
                 **kwargs) -> None:
        # Stores credentials, the bot class/config and the game options on the
        # instance after delegating to the parent initializer.
        # NOTE(review): `*args` is forwarded below but is not declared in the
        # signature as shown -- confirm against the upstream file.
        super().__init__(*args, **kwargs)
        self.username: str = username
        self.password: str = password
        # The bot class is loaded dynamically from the "<botModule>.bot" module.
        module = importlib.import_module(botModule + '.bot')
        self.botCls: Type[Bot] = module.Bot  # type: ignore
        self.botconfig: Mapping = botconfig
        self.numPlayers: int = numPlayers
        self.variant: Variant = variant
        self.spectators: bool = spectators
        self.gameName: str = gameName
        # Annotation only: `conn` is declared here but not assigned a value.
        self.conn: socketIO_client.SocketIO
        self.tablePlayers: List[str] = []
        # NOTE(review): the next line is not valid Python as shown; it looks
        # like two assignments were fused -- TODO confirm the second target.
        self.readyToStart: bool = False Optional[Game] = None
项目    作者:lablup    | 项目源码 | 文件源码
def _get_or_create(cls, lang: str,
                       client_token: Optional[str]=None,
                       mounts: Optional[Iterable[str]]=None,
                       envs: Optional[Mapping[str, str]]=None,
                       max_mem: int=0, exec_timeout: int=0) -> str:
        """Generator that asks the API to create (or reuse) a kernel.

        Yields one ``Request`` for ``POST /kernel/create`` and expects the
        response object to be sent back into the generator; the response JSON
        is then used to build the ``cls`` instance that is returned.

        :param lang: Sent as ``lang`` in the request body.
        :param client_token: Client session token. When given it must be
            longer than 8 characters; when omitted a random hex UUID is used.
        :param mounts: Sent as ``config.mounts``.
        :param envs: Sent as ``config.envs``.
        :param max_mem: Unused while the ``limits`` section stays disabled.
        :param exec_timeout: Unused while the ``limits`` section stays disabled.
        :returns: ``cls(data['kernelId'])`` with ``created`` taken from the
            response (``True`` when the key is absent).
        """
        if client_token:
            assert len(client_token) > 8
        else:
            # Only generate a token when the caller did not provide one.
            client_token = uuid.uuid4().hex
        resp = yield Request('POST', '/kernel/create', {
            'lang': lang,
            'clientSessionToken': client_token,
            'config': {
                'mounts': mounts,
                'envs': envs,
            },
            # 'limits': {
            #     'maxMem': max_mem,
            #     'execTimeout': exec_timeout,
            # },
        })
        data = resp.json()
        o = cls(data['kernelId'])  # type: ignore
        o.created = data.get('created', True)  # True is for legacy
        return o
项目:datapipelines-python    作者:meraki-analytics    | 项目源码 | 文件源码
def dispatch(method: Callable[[Any, Type[T], Mapping[str, Any], PipelineContext], Any]) -> Callable[[Any, Type[T], Mapping[str, Any], PipelineContext], Any]:
        """Wrap ``method`` so that calls are routed by the requested ``type``.

        The returned wrapper resolves an implementation for ``type`` through a
        ``functools.singledispatch`` registry and calls it as
        ``call(self, query, context=context)``. Implementations are added with
        ``wrapper.register(type)``.

        :param method: Function used to seed the single-dispatch registry.
        :returns: The wrapper, which also exposes ``register`` and
            ``_provides`` (an initially empty set).
        """
        dispatcher = singledispatch(method)
        provides = set()

        def wrapper(self: Any, type: Type[T], query: Mapping[str, Any], context: PipelineContext = None) -> Any:
            call = dispatcher.dispatch(type)
            try:
                return call(self, query, context=context)
            except TypeError:
                # Any TypeError from the call is reported as an unsupported type.
                raise DataSource.unsupported(type)

        def register(type: Type[T]) -> Callable[[Any, Type[T], Mapping[str, Any], PipelineContext], Any]:
            return dispatcher.register(type)

        wrapper.register = register
        wrapper._provides = provides
        update_wrapper(wrapper, method)
        return wrapper
项目:AutoTriageBot    作者:salesforce    | 项目源码 | 文件源码
def testGETOpenRedirect(url: str, cookies: Mapping[str, str]) -> Optional[str]:
    """ If the given URL redirects when accessed with the given cookies via GET, return the new URL, otherwise
        return None """
    # NOTE(review): this snippet looks truncated -- the `try:` matching the
    # `except` below, the page-load call and the `else:` before the second
    # return are not present. Restore them from upstream before use.
    driver = SeleniumDrivers.getFirefoxDriver()
    driver.setCookies(url, cookies)



        # Compares the driver's current URL with the requested one; an
        # unchanged URL is reported as None.
        if driver.current_url == url:
            return None
            url = driver.current_url
            return url
    # Timeouts and URL errors are reported as None as well.
    except (TimeoutException, URLError):
        return None
项目:AutoTriageBot    作者:salesforce    | 项目源码 | 文件源码
def testPOSTOpenRedirect(url: str, cookies: Mapping[str, str], data: Mapping[str, str]) -> Optional[str]:
    """ If the given URL redirects when accessed with the given cookies via POST, return the new URL, otherwise
        return None """
    # NOTE(review): the `try:` line below is garbled (part of a call taking
    # `data` is fused onto it) and the `else:` before the second return is
    # missing. Restore them from upstream before use.
    driver = SeleniumDrivers.getFirefoxDriver()
    driver.setCookies(url, cookies)
    try:, data)


        # An unchanged current URL is reported as None.
        if driver.current_url == url:
            return None
            url = driver.current_url
            return url
    # Timeouts and URL errors are reported as None as well.
    except (TimeoutException, URLError):
        return None
项目:AutoTriageBot    作者:salesforce    | 项目源码 | 文件源码
def testGETXSSDriver(url: str, cookies: Mapping[str, str], driver: webdriver) -> Optional[str]:
    """ If the given URL pops an alert box when accessed with the given cookies, return the contents of the alert box,
        otherwise return None """
    # NOTE(review): the `try:` matching the `except` below and the page-load
    # call appear to be missing from this snippet -- confirm against upstream.
    driver.setCookies(url, cookies)


        # Waits up to `config.timeout` for an alert to be present.
        WebDriverWait(driver, config.timeout).until(expected_conditions.alert_is_present())
        # Note that despite the name switch_to_alert also handles prompt:
        #   -
        alert = driver.switch_to_alert()
        text = alert.text
        return text
    # Timeouts and URL errors are reported as None.
    except (TimeoutException, URLError):
        return None
项目:AutoTriageBot    作者:salesforce    | 项目源码 | 文件源码
def testPOSTXSSDriver(url: str, cookies: Mapping[str, str], data: Mapping[str, str], driver: webdriver) -> \
    """ If the given URL pops an alert box when accessed with the given cookies, return the contents of the alert box,
        otherwise return None """
    # NOTE(review): the return annotation after `->` and the call fused onto
    # the `try:` line are missing/garbled in this snippet -- restore them from
    # upstream before use.
    driver.setCookies(url, cookies)

    try:, data)

        # Waits up to `config.timeout` for an alert to be present.
        WebDriverWait(driver, config.timeout).until(expected_conditions.alert_is_present())
        # Note that despite the name switch_to_alert also handles prompt:
        #   -
        alert = driver.switch_to_alert()
        text = alert.text
        return text
    # Timeouts and URL errors are reported as None.
    except (TimeoutException, URLError):
        return None
项目:AutoTriageBot    作者:salesforce    | 项目源码 | 文件源码
def postComment(id: str, vti: VulnTestInfo, internal=False, addStopMessage=False) -> Mapping:
    """ Post a comment to the report with the given ID using the information in the given VulnTestInfo
          - Set internal=True in order to post an internal comment
          - Set addStopMessage=True in order to add the stop message """
    if config.DEBUG:
        print("Posting comment: internal=%s, reproduced=%s, id=%s" % (str(internal), str(vti.reproduced), id))

    # NOTE(review): an `else:` appears to be missing between the two
    # assignments below; as shown the second one is mis-indented and would
    # discard the stop message.
    if addStopMessage:
        message = vti.message + '\n\n' + constants.disableMessage
        message = vti.message

    postMessage("Posting Message: \n\n%s" % message)  # TODO: Delete this

    # NOTE(review): the callee of this request is missing from the snippet
    # (only its arguments remain) -- presumably an HTTP POST; confirm upstream.
    resp ='http://api:8080/v1/sendMessage',
                         json={'message': message, 'internal': internal, 'id': id},
                         auth=HTTPBasicAuth('AutoTriageBot', secrets.apiBoxToken))

    # Reproduced reports are moved to 'triaged' when the config enables it.
    if config.triageOnReproduce and vti.reproduced:
        changeStatus(id, 'triaged')

    return resp.json()
项目:AutoTriageBot    作者:salesforce    | 项目源码 | 文件源码
def suggestPayoutGivenType(db: Mapping[str, BountyInfo], domains: List[str]) -> BountyInfo:
    """ Returns a BountyInfo containing a suggested payout and the std for the given report given the DB
        for that class of vulnerability

        Domains that have no entry in the DB are skipped. When no domain is given, or none of them is
        known, the DB-wide ``db['average']`` entry is returned instead. """
    if len(domains) == 0:
        return db['average']
    total = 0.0
    stdTotal = 0.0  # Not actually the std, but good enough™
    cnt = 0
    for domain in domains:
        try:
            info = db[domain]
        except KeyError:
            # Unknown domain: it simply does not contribute to the mean.
            continue
        total += info.average
        stdTotal += info.std
        cnt += 1
    if cnt == 0:
        # Nothing matched, so there is no mean to compute.
        return db['average']
    return BountyInfo(average=total/cnt, std=stdTotal/cnt)
项目:AutoTriageBot    作者:salesforce    | 项目源码 | 文件源码
def extractJson(message: str) -> Optional[Mapping]:
    """ Returns the first json blob found in the string if any are found

        Three passes are tried in order, each moving on to the next candidate when parsing fails:
        backtick-delimited blocks, any ``{...}`` span, and finally a lenient parse of any ``{...}`` span.
        Returns None when nothing parses. """
    # First pass that relies on it being in a code block
    for match in re.findall(r'\`\s*?{[\s\S]*?}\s*?\`', message):
        potJson = match[1:-1].strip()
        try:
            return json.loads(potJson)
        except ValueError:
            pass
    # Second pass doesn't require the code block, but it still uses the json parser
    for match in re.findall(r'{[\s\S]*}', message):
        try:
            return json.loads(match)
        except ValueError:
            pass
    # Third pass uses ast.literal_eval (which IS safe-it only evals literals) and some replacements to handle malformed
    # JSON. This is a horrible JSON parser and will incorrectly parse certain types of JSON, but it is far more
    # accepting so we might as well try doing this
    for match in re.findall(r'{[\s\S]*}', message):
        try:
            return fuzzyJsonParse(match)
        except (SyntaxError, ValueError):
            pass
    return None
项目:jack    作者:uclmr    | 项目源码 | 文件源码
def __call__(self, batch: Mapping[TensorPort, np.ndarray],
                 goal_ports: List[TensorPort] = None) -> Mapping[TensorPort, np.ndarray]:
        # NOTE(review): this snippet is damaged -- the docstring below is never
        # closed and the callee of the `outputs =, feed_dict)` line is missing
        # (presumably a session run over `goal_tensors`; confirm upstream).
        # As shown: goal ports default to `self.output_ports`, the batch is
        # converted with `self.convert_to_feed_dict`, and goal ports absent
        # from the outputs are copied from `batch` when present there.
        """Runs a batch and returns values/outputs for specified goal ports.
            batch: mapping from ports to values
            goal_ports: optional output ports, defaults to output_ports of this module will be returned

            A mapping from goal ports to tensors.
        goal_ports = goal_ports or self.output_ports
        feed_dict = self.convert_to_feed_dict(batch)
        goal_tensors = {p: self.tensors[p] for p in goal_ports
                        if p in self.output_ports or p in self.training_output_ports}
        outputs =, feed_dict)

        for p in goal_ports:
            if p not in outputs and p in batch:
                outputs[p] = batch[p]

        return outputs
项目:jack    作者:uclmr    | 项目源码 | 文件源码
def create_training_output(self, shared_resources: SharedResources,
                               training_input_tensors: Mapping[TensorPort, tf.Tensor]) \
            -> Mapping[TensorPort, tf.Tensor]:
        """
        This function needs to be implemented in order to define how the module produces tensors only used
        during training given tensors corresponding to the ones defined by `training_input_ports`, which might include
        tensors corresponding to ports defined by `output_ports`. This sub-graph should only be created during training.

        Args:
            shared_resources: contains resources shared by modules, such as hyper-parameters or vocabularies.
            training_input_tensors: a mapping from training input tensorports to tensors.

        Returns:
            mapping from defined training output ports to their tensors.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError
项目:jack    作者:uclmr    | 项目源码 | 文件源码
def __call__(self, batch: Mapping[TensorPort, np.ndarray],
                 goal_ports: List[TensorPort] = None) -> Mapping[TensorPort, np.ndarray]:
        """Runs a batch and returns values/outputs for specified goal ports.

        Args:
            batch: mapping from ports to values
            goal_ports: optional output ports, defaults to output_ports of this module will be returned

        Returns:
            A mapping from goal ports to tensors. Goal ports that are not
            produced by the prediction module are copied from ``batch`` when
            present there.
        """
        goal_ports = goal_ports or self.output_ports
        # The GPU is used whenever at least one CUDA device is visible.
        use_gpu = torch.cuda.device_count() > 0
        inputs = [p.create_torch_variable(batch.get(p), gpu=use_gpu) for p in self.input_ports]
        outputs = self.prediction_module.forward(*inputs)
        ret = {p: p.torch_to_numpy(t) for p, t in zip(self.output_ports, outputs) if p in goal_ports}
        for p in goal_ports:
            if p not in ret and p in batch:
                ret[p] = batch[p]
        return ret
项目:settei    作者:spoqa    | 项目源码 | 文件源码
def __get__(self, obj, cls: typing.Optional[type]=None):
        # Descriptor getter: class-level access (obj is None) returns the
        # descriptor itself; otherwise the raw value is fetched and, when it is
        # not the default, passed through `self.evaluate`.
        # NOTE(review): this snippet is damaged -- the `isinstance(expression,`
        # call is cut off, the `raise` calls are not closed, and `value` is
        # never assigned on the default path as shown. Confirm upstream.
        if obj is None:
            return self
        default, expression = self.get_raw_value(obj)
        if not default:
            if not isinstance(expression,
                raise ConfigTypeError(
                    '{0!r} field must be a mapping, not {1}'.format(
                        self.key, typing._type_repr(type(expression))
            elif 'class' not in expression:
                raise ConfigValueError(
                    '{0!r} field lacks "class" field'.format(self.key)
            value = self.evaluate(expression)
        return value
项目:settei    作者:spoqa    | 项目源码 | 文件源码
def evaluate(self, expression) -> object:
        # Builds an object from a configuration expression: the callable is
        # imported from expression['class'], positional arguments come from the
        # '*' key and every other key becomes a keyword argument. With
        # `self.recurse` set, arguments are evaluated recursively first.
        # NOTE(review): this snippet is damaged -- both `isinstance(...,` calls
        # are cut off, the `try:` matching `except KeyError` is missing and the
        # `raise` call is not closed. Confirm against upstream.
        if not isinstance(expression,
            return expression
            import_path = expression['class']
        except KeyError:
            return expression
        f = self.import_(import_path)
        args = expression.get('*', ())
        if isinstance(args, str) or \
           not isinstance(args,
            raise ConfigValueError(
                '"*" field must be a list, not ' + repr(args)
        kw = {k: v for k, v in expression.items() if k not in ('class', '*')}
        if self.recurse:
            args = map(self.evaluate, args)
            kw = {k: self.evaluate(v) for k, v in kw.items()}
        return f(*args, **kw)
项目:settei    作者:spoqa    | 项目源码 | 文件源码
def worker_config(self) -> typing.Mapping[str, object]:
        # Reads the 'worker' section of `self.config` and returns it with
        # upper-cased keys.
        # NOTE(review): this snippet is damaged -- the docstring below is never
        # closed, the `isinstance(raw_config,` call is cut off and the `else:`
        # before the empty-dict fallback is missing. Confirm against upstream.
        """(:class:`typing.Mapping`\ [:class:`str`, :class:`object`])
        The configuration maping for worker that will go to :attr:`Celery.conf

        raw_config = self.config.get('worker', {})
        if isinstance(raw_config,
            celery_config = {k.upper(): v for k, v in raw_config.items()}
            celery_config = {}
        return celery_config
项目:filters    作者:eflglobal    | 项目源码 | 文件源码
def sorted_dict(value):
    # type: (Mapping) -> Any
    """
    Sorts a dict's keys to avoid leaking information about the
    backend's handling of unordered dicts.

    Mappings become an ``OrderedDict`` with sorted keys (values are processed
    recursively), non-string sequences become lists of processed items, and
    any other value is returned unchanged.
    """
    if isinstance(value, Mapping):
        # Iterating a mapping yields its keys, so no key-view helper is needed.
        return OrderedDict(
            (key, sorted_dict(value[key]))
            for key in sorted(value)
        )

    elif isinstance(value, Sequence) and not isinstance(value, string_types):
        return list(map(sorted_dict, value))

    else:
        return value
项目:sketchbook    作者:futursolo    | 项目源码 | 文件源码
def __init__(
        self, *, cache_sketches: bool=True,
        source_encoding: str="utf-8",
            custom_escape_fns: Mapping[str, Callable[[Any], str]]={}) -> None:
        # Keyword-only initializer: records the source encoding and cache flag,
        # copies the built-in escape functions into a read-only mapping proxy
        # and copies the built-in statement classes into a list.
        # NOTE(review): this snippet is damaged -- the body of
        # `if custom_escape_fns:` and the rest of the `OutputStmt` class body
        # are missing. Confirm against upstream.
        # NOTE(review): `custom_escape_fns` has a mutable `{}` default; as shown
        # it is only tested for truthiness.

        self._source_encoding = source_encoding

        escape_fns = escaping.builtin_escape_fns.copy()
        if custom_escape_fns:
        self._escape_fns = types.MappingProxyType(escape_fns)

        self._stmt_classes = list(statements.builtin_stmt_classes)

        # NOTE(review): reads `self.escape_fns` (no underscore) -- presumably a
        # property defined elsewhere on the class; verify.
        class OutputStmt(statements.BaseOutput):
            _filter_fn_names = list(self.escape_fns.keys())


        self._cache_sketches = cache_sketches
项目:phasm    作者:AbeelLab    | 项目源码 | 文件源码
def gfa2_line_to_la(reads: Mapping[str, Read]):
    """Create a function that turns GFA2 edge lines into local alignments.

    :param reads: Mapping used to look up the reads named by each edge.
    :returns: A ``mapper(line)`` callable that raises ``ValueError`` for lines
        not starting with ``'E'`` and otherwise returns a ``LocalAlignment``.
    """
    def mapper(line: str):
        if line.startswith('E'):
            parsed = gfa2_parse_edge(line)
            source_id, target_id, a_span, b_span, aln, _tags = parsed

            # The trailing character of each segment id is dropped before the
            # lookup; an unknown id raises KeyError here.
            # NOTE(review): the looked-up reads are not passed on as shown.
            _a_read = reads[source_id[:-1]]
            _b_read = reads[target_id[:-1]]

            return LocalAlignment(a_span, b_span, aln)

        raise ValueError('Given GFA2 line is not an edge.')

    return mapper
项目:aioinflux    作者:plugaai    | 项目源码 | 文件源码
def parse_data(data, measurement=None, tag_columns=None, **extra_tags):
    """Converts input data into line protocol format.

    :param data: ``bytes`` (returned unchanged), ``str`` (UTF-8 encoded), a
        pandas ``DataFrame``, a mapping, or an iterable of any of these
        (each item is converted and the results are joined with newlines).
    :param measurement: Measurement name; required for DataFrames.
    :param tag_columns: Passed through to ``parse_df`` for DataFrames.
    :param extra_tags: Passed through to ``parse_df`` / ``make_line``.
    :raises ValueError: If a DataFrame is given without ``measurement``, or if
        ``data`` is of an unsupported type.
    """
    if isinstance(data, bytes):
        return data
    if isinstance(data, str):
        return data.encode('utf-8')
    if isinstance(data, pd.DataFrame):
        if measurement is None:
            raise ValueError("Missing 'measurement'")
        return parse_df(data, measurement, tag_columns, **extra_tags)
    if isinstance(data, Mapping):
        return make_line(data, measurement, **extra_tags)
    if isinstance(data, Iterable):
        return b'\n'.join(parse_data(i, measurement, tag_columns, **extra_tags) for i in data)
    # Previously unreachable: unsupported input silently returned None.
    raise ValueError('Invalid input', data)
项目:shanghai    作者:chireiden    | 项目源码 | 文件源码
def __getitem__(self, key: str) -> Any:
        """Look up a value in ``self.mapping`` by a dot-separated path.

        :param key: Path such as ``"a.b.c"``; each component is used as a key
            into the mapping reached so far.
        :returns: The value found at the end of the path.
        :raises KeyError: If a component is empty, is missing, or is looked up
            in something that is not a mapping.
        """
        node = self.mapping
        leafs = key.split(".")

        for i, leaf in enumerate(leafs):
            if not isinstance(node, c_abc.Mapping):
                raise KeyError(f"Element {'.'.join(leafs[:i])!r} is not a mapping")
            if not leaf:
                raise KeyError(f"Empty sub-key after {'.'.join(leafs[:i])!r}")
            if leaf not in node:
                raise KeyError(f"Cannot find '{key}'")
            node = node[leaf]

        return node
项目:shanghai    作者:chireiden    | 项目源码 | 文件源码
def _parse_servers(self, mapping: Mapping) -> List[Server]:
        # Reads the 'servers' entry of `mapping`, rejects a missing/empty or
        # non-list value with ConfigurationError, and builds a Server per
        # entry: from a string, or from keyword arguments otherwise.
        # NOTE(review): this snippet is damaged -- the `{!r}` placeholders have
        # lost their expression, the `else:` between the two Server
        # constructions is missing, nothing is appended to `servers`, and the
        # final `return` sits inside the loop. Confirm against upstream.
        servers = []
        servers_conf = mapping.get('servers')
        if not servers_conf:
            raise ConfigurationError(f"Network {!r} has no servers")
        if not isinstance(servers_conf, list):
            raise ConfigurationError(f"Servers of Network {!r} are not a list")
        for server_conf in mapping.get('servers', ()):
            if isinstance(server_conf, str):
                server = Server.from_string(server_conf)
                server = Server.with_optional_port(**server_conf)

            return servers
项目:asyncqlio    作者:SunDwarf    | 项目源码 | 文件源码
async def execute(self, sql: str, params: typing.Mapping[str, typing.Any] = None):
        """
        Executes SQL inside the transaction.

        :param sql: The SQL to execute.
        :param params: The parameters to execute with.
        :return: Whatever the underlying connection's ``execute`` returns.
        :raises IntegrityError: On integrity-constraint or NOT NULL violations.
        :raises OperationalError: When an object is not in the required state.
        :raises DatabaseException: On syntax/access errors or when the
            transaction has already failed.
        """
        # re-parameterize the query
        logger.debug("Executing query {} with params {}".format(sql, params))
        query, params = get_param_query(sql, params)

        # Driver errors are translated into this library's exception types.
        try:
            results = await self.acquired_connection.execute(query, *params)
        except (asyncpg.IntegrityConstraintViolationError,
                asyncpg.exceptions.NotNullViolationError) as e:
            raise IntegrityError(*e.args) from e
        except asyncpg.ObjectNotInPrerequisiteStateError as e:
            raise OperationalError(*e.args) from e
        except (asyncpg.SyntaxOrAccessError, asyncpg.InFailedSQLTransactionError) as e:
            raise DatabaseException(*e.args) from e

        return results
项目:asyncqlio    作者:SunDwarf    | 项目源码 | 文件源码
async def execute(self, sql: str, params: typing.Union[typing.Mapping, typing.Iterable] = None):
        """
        Executes SQL in the current transaction.

        Each statement produced by ``separate_statements(sql)`` is executed in
        turn; the result of the last one is returned.

        :param sql: The SQL to execute.
        :param params: Parameters passed to every statement, if given.
        :raises IntegrityError: On ``sqlite3.IntegrityError``.
        :raises DatabaseException: On ``sqlite3.OperationalError``.
        """
        logger.debug("Running SQL {} with params {}".format(sql, params))
        # lock to ensure nothing else is using the connection at once
        async with self._lock:
            async with threadpool():
                for stmt in separate_statements(sql):
                    try:
                        if params is None:
                            res = self.connection.execute(stmt)
                        else:
                            res = self.connection.execute(stmt, params)
                    except sqlite3.IntegrityError as e:
                        raise IntegrityError(*e.args) from e
                    except sqlite3.OperationalError as e:
                        raise DatabaseException(*e.args) from e

            return res
项目:asyncqlio    作者:SunDwarf    | 项目源码 | 文件源码
async def cursor(self, sql: str, params: typing.Union[typing.Mapping, typing.Iterable] = None) \
            -> 'Sqlite3ResultSet':
        """
        Gets a cursor for the specified SQL.

        A fresh cursor is opened for each statement produced by
        ``separate_statements(sql)``; the cursor of the last statement is
        wrapped in the returned result set.

        :param sql: The SQL to execute.
        :param params: Parameters passed to every statement, if given.
        :raises DatabaseException: On ``sqlite3.OperationalError``.
        """
        logger.debug("Running SQL {} with params {}".format(sql, params))

        async with self._lock:
            async with threadpool():
                for stmt in separate_statements(sql):
                    cur = self.connection.cursor()
                    try:
                        if params is None:
                            cur.execute(stmt)
                        else:
                            cur.execute(stmt, params)
                    except sqlite3.OperationalError as e:
                        raise DatabaseException(*e.args) from e

        return Sqlite3ResultSet(cur)
项目:asyncqlio    作者:SunDwarf    | 项目源码 | 文件源码
def map_many(self, *rows: typing.Mapping[str, typing.Any]):
        # NOTE(review): this snippet is damaged -- the docstring quotes are
        # missing (the prose below is bare text) and the body of the
        # `for runon_row` loop is absent. Confirm against upstream.
        # As shown: the first row is mapped with `self.map_columns` and that
        # table row is returned.
        Maps many records to one row.

        This will group the records by the primary key of the main query table, then add additional
        columns as appropriate.
        # this assumes that the rows come in grouped by PK on the left
        # also fuck right joins
        # get the first row and construct the first table row using map_columns
        # this will also map any extra relationship data there
        first_row = rows[0]
        tbl_row = self.map_columns(first_row)

        # loop over every "extra" rows
        # and update the relationship data in the table
        for runon_row in rows[1:]:

        return tbl_row

    # Helper methods for natural builder-style queries
项目    作者:CodeGra-de    | 项目源码 | 文件源码
def __init__(
        params: t.Mapping[str, str],
        lti_provider: models.LTIProvider = None
    ) -> None:
        # Stores the launch parameters and resolves the LTI provider (either
        # the one passed in or one looked up by params['lti_provider_id']),
        # then copies the provider's key and secret onto the instance.
        # NOTE(review): this snippet is damaged -- `self` is not in the
        # signature, the `else:` before the lookup is missing and the
        # `helpers.get_or_404(` call is cut off. Confirm against upstream.
        self.launch_params = params

        if lti_provider is not None:
            self.lti_provider = lti_provider
            lti_id = params['lti_provider_id']
            self.lti_provider = helpers.get_or_404(

        self.key = self.lti_provider.key
        self.secret = self.lti_provider.secret

    # TODO support more than just flask
项目    作者:CodeGra-de    | 项目源码 | 文件源码
def __init__(
        self,
        operation: str = None,
        score: str = None,
        result_data: t.Mapping[str, str] = None,
        message_identifier: str = None,
        lis_outcome_service_url: str = None,
        lis_result_sourcedid: str = None,
        consumer_key: str = None,
        consumer_secret: str = None,
        post_request: t.Any = None
    ) -> None:
        """Store the given outcome-request fields on the instance.

        Every argument is optional and is copied verbatim to the attribute of
        the same name; ``outcome_response`` always starts out as ``None``.
        """
        self.operation = operation
        self.score = score
        self.result_data = result_data
        self.outcome_response = None  # type: t.Optional['OutcomeResponse']
        self.message_identifier = message_identifier
        self.lis_outcome_service_url = lis_outcome_service_url
        self.lis_result_sourcedid = lis_result_sourcedid
        self.consumer_key = consumer_key
        self.consumer_secret = consumer_secret
        self.post_request = post_request
项目    作者:CodeGra-de    | 项目源码 | 文件源码
def get_all_permissions(self) -> t.Mapping[str, bool]:
        # NOTE(review): this snippet is damaged -- the docstring below is never
        # closed, the query building `perms` is cut off, the subscripts in
        # `result[]` and the operand of `if in` are missing, and the `else:`
        # between the two assignments is absent. Confirm against upstream.
        # As shown: a permission found in `self._permissions` gets the negation
        # of its default value.
        """Get all course permissions (:class:`Permission`) for this role.

        :returns: A name boolean mapping where the name is the name of the
                  permission and the value indicates if this user has this
        perms: t.Sequence[Permission] = (
            filter_by(  # type: ignore
        result: t.MutableMapping[str, bool] = {}
        for perm in perms:
            if in self._permissions:
                result[] = not perm.default_value
                result[] = perm.default_value
        return result
项目    作者:CodeGra-de    | 项目源码 | 文件源码
def __to_json__(self) -> t.Mapping[str, t.Any]:
        # NOTE(review): this snippet is damaged -- the docstring below is never
        # closed and the returned dict is not closed. The docstring lists
        # 'id', 'name', 'email' and 'username', but only 'username' is present
        # in the code as shown. Confirm against upstream.
        """Creates a JSON serializable representation of this object.

        This object will look like this:

        .. code:: python

                'id':    int, # The id of this user.
                'name':  str, # The full name of this user.
                'email': str, # The email of this user.
                'username': str, # The username of this user.

        :returns: An object as described above.
        return {
            'username': self.username,
项目    作者:CodeGra-de    | 项目源码 | 文件源码
def __extended_to_json__(self) -> t.Mapping[str, t.Any]:
        # NOTE(review): this snippet is damaged -- the docstring below is never
        # closed and the returned dict is not closed; further entries may have
        # been lost. As shown, 'hidden' is taken from `self.can_see_hidden`.
        """Create a extended JSON serializable representation of this object.

        This object will look like this:

        .. code:: python

                'hidden': bool, # indicating if this user can once
                                # see hidden assignments.

        :returns: A object as described above.
        return {
            "hidden": self.can_see_hidden,
项目    作者:CodeGra-de    | 项目源码 | 文件源码
def __to_json__(self) -> t.Mapping[str, t.Any]:
        # NOTE(review): this snippet is damaged -- the docstring below is never
        # closed and the returned dict is not closed. The docstring lists
        # 'name' and 'id', which are absent from the code as shown.
        # As shown: 'created_at' is `self.created_at.isoformat()` and 'is_lti'
        # is true when `self.lti_course_id` is not None.
        """Creates a JSON serializable representation of this object.

        This object will look like this:

        .. code:: python

                'name': str, # The name of the course,
                'id': int, # The id of this course.
                'created_at': str, # ISO UTC date.
                'is_lti': bool, # Is the this course a LTI course,

        :returns: A object as described above.
        return {
            'created_at': self.created_at.isoformat(),
            'is_lti': self.lti_course_id is not None,
项目    作者:CodeGra-de    | 项目源码 | 文件源码
def __to_json__(self) -> t.Mapping[str, t.Any]:
        """Converts this grade history item to a object that is JSON
        serializable.

        The resulting object will look like this:

        .. code:: python

            {
                'changed_at': str, # The date the history was added.
                'is_rubric': bool, # Was this history items added by a rubric
                                   # grade.
                'grade': float, # The new grade, -1 if the grade was deleted.
                'passed_back': bool, # Is this grade given back to LTI.
                'user': User, # The user that added this grade.
            }

        :returns: A object as described above.
        """
        return {
            'changed_at': self.changed_at.isoformat(),
            'is_rubric': self.is_rubric,
            'grade': self.grade,
            'passed_back': self.passed_back,
            'user': self.user,
        }
项目    作者:CodeGra-de    | 项目源码 | 文件源码
def __to_json__(self) -> t.Mapping[str, t.Union[str, bool, int]]:
        # NOTE(review): this snippet is damaged -- the docstring below is never
        # closed and the returned dict is not closed. The docstring lists
        # 'name' and 'id', which are absent from the code as shown.
        """Creates a JSON serializable representation of this object.

        This object will look like this:

        .. code:: python

                'name': str, # The name of the file or directory.
                'id': int, # The id of this file.
                'is_directory': bool, # Is this file a directory.

        :returns: A object as described above.
        return {
            'is_directory': self.is_directory,
项目    作者:CodeGra-de    | 项目源码 | 文件源码
def __to_json__(self) -> t.Mapping[str, t.Any]:
        # NOTE(review): this snippet is damaged -- the docstring below is cut
        # off mid-sentence and never closed, and the returned dict is not
        # closed; further entries may have been lost.
        # As shown: 'done', 'working' and 'crashed' come from
        # `self.linters_done`, `self.linters_running` and
        # `self.linters_crashed`.
        """Returns the JSON serializable representation of this class.

        This representation also returns a count of the :class:`LinterState` of
        the attached :class:`LinterInstance` objects.

        :returns: A dict containing JSON serializable representations of the
                  attributes and the test state counts of this
        return {
            'done': self.linters_done,
            'working': self.linters_running,
            'crashed': self.linters_crashed,
项目    作者:CodeGra-de    | 项目源码 | 文件源码
def second_phase_lti_launch(
) -> helpers.JSONResponse[t.Mapping[str, t.Union[str, models.Assignment, bool]]
    # NOTE(review): this snippet is damaged -- the return annotation is missing
    # its closing bracket and colon, and the `jwt.decode(` call is cut off
    # after its first argument. Confirm against upstream.
    # Decodes the launch parameters from the request's 'Jwt' header.
    launch_params = jwt.decode(
        flask.request.headers.get('Jwt', None),
    lti = CanvasLTI(launch_params)

    # Resolves the user, course and assignment for this launch, and records
    # whether a new course role was created for the user.
    user, new_token = lti.ensure_lti_user()
    course = lti.get_course()
    assig = lti.get_assignment(user)
    new_role_created = lti.set_user_course_role(user, course)

    result: t.Mapping[str, t.Union[str, models.Assignment, bool]]
    result = {'assignment': assig, 'new_role_created': new_role_created}
    # The access token is only included when a new one was issued.
    if new_token is not None:
        result['access_token'] = new_token

    return helpers.jsonify(result)
项目    作者:CodeGra-de    | 项目源码 | 文件源码
def about(
) -> JSONResponse[t.Mapping[str, t.Union[str, t.Mapping[str, bool]]]]:
    # NOTE(review): this snippet is damaged -- the docstring below is never
    # closed, the object indexed by ['FEATURES'] is missing, and the dict
    # comprehension and the `jsonify(` call are cut off. Confirm upstream.
    # As shown: each feature value is coerced with `bool`.
    """Get the version and features of the currently running instance.

    .. :quickref: About; Get the version and features.

    :>json string version: The version of the running instance.
    :>json object features: A mapping from string to a boolean for every
        feature indicating if the current instance has it enabled.

    :returns: The mapping as described above.
    features = {
        key: bool(value)
        for key, value in['FEATURES'].items()
    return jsonify(
            'features': features,
项目    作者:CodeGra-de    | 项目源码 | 文件源码
def get_rubric(submission_id: int) -> JSONResponse[t.Mapping[str, t.Any]]:
    """Return full rubric of the :class:`.models.Assignment` of the given
    submission (:class:`.models.Work`).

    .. :quickref: Submission; Get a rubric and its selected items.

    :param int submission_id: The id of the submission
    :returns: A response containing the JSON serialized rubric as produced by
              the ``__rubric_to_json__`` method of the submission.

    :raises APIException: If the submission with the given id does not exist.
    :raises PermissionException: If there is no logged in user. (NOT_LOGGED_IN)
    :raises PermissionException: If the user can not see the assignment of the
                                 given submission. (INCORRECT_PERMISSION)
    """
    work = helpers.get_or_404(models.Work, submission_id)
    # The permission is checked against the course of the work's assignment.
    auth.ensure_permission('can_see_assignments', work.assignment.course_id)
    return jsonify(work.__rubric_to_json__())
项目:mimesis    作者:lk-geimfari    | 项目源码 | 文件源码
def update_dict(initial: JSON, other: Mapping) -> JSON:
    """Recursively update a dictionary.

    Values of ``other`` that are mappings are merged into the matching entry
    of ``initial`` (an empty dict is used when there is none); every other
    value replaces the entry. ``initial`` is modified in place.

    :param initial: Dict to update.
    :type initial: dict or list
    :param other: Dict to update from.
    :type other: Mapping
    :return: Updated dict.
    :rtype: dict
    """
    for key, value in other.items():
        # `collections.Mapping` is gone in Python 3.10+; use the `Mapping`
        # name this module already relies on for its annotations.
        if isinstance(value, Mapping):
            initial[key] = update_dict(initial.get(key, {}), value)
        else:
            initial[key] = value
    return initial
项目:sockeye    作者:awslabs    | 项目源码 | 文件源码
def load_params(fname: str) -> Tuple[Dict[str, mx.nd.NDArray], Dict[str, mx.nd.NDArray]]:
    """
    Loads parameters from a file.

    Keys in the file are expected to look like ``"<kind>:<name>"``; entries
    whose kind is ``arg`` or ``aux`` are collected, anything else is ignored.

    :param fname: The file containing the parameters.
    :return: Mapping from parameter names to the actual parameters for both the arg parameters and the aux parameters.
    """
    save_dict = mx.nd.load(fname)
    arg_params = {}
    aux_params = {}
    for k, v in save_dict.items():
        # Split only on the first colon so names may contain colons themselves.
        tp, name = k.split(':', 1)
        if tp == 'arg':
            arg_params[name] = v
        elif tp == 'aux':
            aux_params[name] = v
    return arg_params, aux_params
项目:pytypes    作者:Stewori    | 项目源码 | 文件源码
def get_Generic_parameters(tp, generic_supertype):
    """tp must be a subclass of generic_supertype.
    Retrieves the type values from tp that correspond to parameters
    defined by generic_supertype.

    E.g. get_Generic_parameters(tp, typing.Mapping) is equivalent
    to get_Mapping_key_value(tp) except for the error message.

    Note that get_Generic_itemtype(tp) is not exactly equal to
    get_Generic_parameters(tp, typing.Container), as that method
    additionally contains treatment for typing.Tuple and typing.Iterable.

    Returns the parameters as a tuple; raises TypeError when none can be
    determined.
    """
    try:
        res = _select_Generic_superclass_parameters(tp, generic_supertype)
    except TypeError:
        # Treated like "nothing found" so a single error message is produced.
        res = None
    if res is None:
        raise TypeError("%s has no proper parameters defined by %s."%
                (type_str(tp), type_str(generic_supertype)))
    return tuple(res)
项目:sk-torch    作者:mattHawthorn    | 项目源码 | 文件源码
def from_vocab(cls, sequences: Map[int, Seq[H]], vocab: Vocabulary, max_len: int, pack_sequences: bool=False,
                   append_eos: bool=True, eos_token: Opt[H]=DEFAULT_EOS, null_token: H=DEFAULT_NULL,
                   int_id_type: str='long', shuffle: bool=True):
        # Alternate constructor: builds a SequenceTensorEncoder from `vocab`
        # and passes it to `cls(...)` with the remaining options.
        # NOTE(review): this snippet is damaged -- the docstring quotes are
        # missing (the :param lines below are bare text) and the
        # `SequenceTensorEncoder(` call is cut off after `null_token=...,`.
        # `int_id_type` is not used anywhere in the code as shown; presumably
        # it belonged to the lost continuation line -- confirm upstream.
        :param vocab: instance of Vocabulary to use for encoding/decoding tokens
        :param max_len: maximum length of sequences to sample
        :param pack_sequences: bool indicating whether to return regular Tensors or PackedSequence instances.
        :param int_id_type: string indicating the type of int ids to use. Must be a key of data.str_to_int_tensor_type.
        :param eos_token: string or hashable to append to mark end-of-sequence in encoding
        :param null_token: Optional hashable to use for padding sequences. Added to the vocab, unless none is passed
            and none is built, in which case this is considered to be an int id.
            Numpy aliases for integer types are valid, as well as 'long', 'short', 'byte', 'char'.
            The default 'long' is recommended, as only LongTensors can be used to index Embeddings in pytorch.
        encoder = SequenceTensorEncoder(vocab, append_eos=append_eos, eos_token=eos_token, null_token=null_token,
        return cls(sequences=sequences, encoder=encoder, max_len=max_len, pack_sequences=pack_sequences,
                   null_token=null_token, shuffle=shuffle)
项目:sk-torch    作者:mattHawthorn    | 项目源码 | 文件源码
def from_token2id(cls, sequences: Map[int, Seq[H]], token2id: Dict[H, int],
                      max_len: int, pack_sequences: bool=False,
                      append_eos: bool=True, eos_token: Opt[H]=DEFAULT_EOS,
                      null_token: H=DEFAULT_NULL, oov_token: H=DEFAULT_OOV,
                      int_id_type: str='long', shuffle: bool=True):
        # Alternate constructor: builds a Vocabulary from `token2id`, wraps it
        # in a SequenceTensorEncoder and passes that to `cls(...)`.
        # NOTE(review): this snippet is damaged -- the docstring quotes are
        # missing (the :param lines below are bare text) and the
        # `SequenceTensorEncoder(` call is cut off after `null_token=...,`.
        # `int_id_type` is not used anywhere in the code as shown; presumably
        # it belonged to the lost continuation line -- confirm upstream.
        :param token2id: mapping of tokens to int ids
        :param max_len: maximum length of sequences to sample
        :param pack_sequences: bool indicating whether to return regular Tensors or PackedSequence instances.
        :param int_id_type: string indicating the type of int ids to use. Must be a key of data.str_to_int_tensor_type.
        :param oov_token: hashable to insert for out-of-vocab tokens when encoding
        :param eos_token: string or hashable to append to mark end-of-sequence in encoding
        :param null_token: Optional hashable to use for padding sequences. Added to the vocab, unless none is passed
            and none is built, in which case this is considered to be an int id.
            Numpy aliases for integer types are valid, as well as 'long', 'short', 'byte', 'char'.
            The default 'long' is recommended, as only LongTensors can be used to index Embeddings in pytorch.
        vocab = Vocabulary.from_token2id(token2id, oov_token=oov_token)
        encoder = SequenceTensorEncoder(vocab, append_eos=append_eos, eos_token=eos_token, null_token=null_token,
        return cls(sequences=sequences, encoder=encoder, max_len=max_len, pack_sequences=pack_sequences,
                   null_token=null_token, shuffle=shuffle)
项目:sk-torch    作者:mattHawthorn    | 项目源码 | 文件源码
def from_id2token(cls, sequences: Map[int, Seq[H]], id2token: Dict[int, H],
                      max_len: int, pack_sequences: bool=False,
                      append_eos: bool=True, eos_token: Opt[H]=DEFAULT_EOS,
                      null_token: H=DEFAULT_NULL, oov_token: H=DEFAULT_OOV,
                      int_id_type: str='long', shuffle: bool=True):
        """
        Alternate constructor: build an instance from sequences and an existing id -> token mapping.

        A Vocabulary is created from ``id2token``, wrapped in a SequenceTensorEncoder, and both are
        handed to ``cls`` together with the sampling options.

        :param sequences: the sequences to sample from (annotated as a mapping of int to token sequences);
            forwarded unchanged to ``cls``.
        :param id2token: mapping of int ids to tokens
        :param max_len: maximum length of sequences to sample
        :param pack_sequences: bool indicating whether to return regular Tensors or PackedSequence instances.
        :param append_eos: forwarded to the encoder together with ``eos_token``.
        :param eos_token: hashable to append to mark end-of-sequence in encoding
        :param null_token: hashable to use for padding sequences. Added to the vocab, unless none is passed
            and none is built, in which case this is considered to be an int id.
        :param oov_token: hashable to insert for out-of-vocab tokens when encoding
        :param int_id_type: string indicating the type of int ids to use. Must be a key of data.str_to_int_tensor_type.
            Numpy aliases for integer types are valid, as well as 'long', 'short', 'byte', 'char'.
            The default 'long' is recommended, as only LongTensors can be used to index Embeddings in pytorch.
        :param shuffle: forwarded unchanged to ``cls``.
        :return: a new instance of ``cls``.
        """
        vocab = Vocabulary.from_id2token(id2token, oov_token=oov_token)
        # NOTE(review): this call was truncated after ``null_token=null_token,``.
        # ``int_id_type`` is the only documented parameter not used anywhere else,
        # so it is passed here -- confirm against SequenceTensorEncoder's signature.
        encoder = SequenceTensorEncoder(vocab, append_eos=append_eos, eos_token=eos_token, null_token=null_token,
                                        int_id_type=int_id_type)
        return cls(sequences=sequences, encoder=encoder, max_len=max_len, pack_sequences=pack_sequences,
                   null_token=null_token, shuffle=shuffle)
项目:nirum-python    作者:spoqa    | 项目源码 | 文件源码
def validate_type(data, type_):
    """Return whether ``data`` is an instance of ``type_``.

    Parameterized ``typing.AbstractSet`` / ``typing.Sequence`` /
    ``typing.Mapping`` types are checked structurally: ``data`` must be an
    instance of the matching ``collections.abc`` class and every item yielded
    by iterating ``data`` must be an instance of the first type parameter
    (for a mapping that means its keys).

    Every other type is checked with a plain ``isinstance``; if that raises
    ``TypeError`` and ``type_`` is a union, ``data`` only has to match one of
    the union's member types.

    :param data: the value to check.
    :param type_: the type (or typing construct) to check against.
    :return: ``True`` if ``data`` conforms to ``type_``, ``False`` otherwise.
    :raises ValueError: if ``isinstance`` rejects ``type_`` with ``TypeError``
        and ``type_`` is not a union type.
    """
    # The ABC aliases ``collections.Set`` / ``Sequence`` / ``Mapping`` were
    # removed in Python 3.10; ``collections.abc`` is their canonical home.
    from collections import abc as collections_abc

    abstract_types = {typing.AbstractSet, typing.Sequence, typing.Mapping}
    # NOTE(review): on Python 3.7+ ``__origin__`` of a typing generic is the
    # ``collections.abc`` class rather than the ``typing`` alias -- confirm
    # which interpreter versions this branch is expected to serve.
    if hasattr(type_, '__origin__') and type_.__origin__ in abstract_types:
        param_type = get_abstract_param_types(type_)
        imp_types = {
            typing.AbstractSet: collections_abc.Set,
            typing.Sequence: collections_abc.Sequence,
            typing.Mapping: collections_abc.Mapping,
        }
        return (isinstance(data, imp_types[type_.__origin__]) and
                all(isinstance(item, param_type[0]) for item in data))
    try:
        return isinstance(data, type_)
    except TypeError:
        # ``isinstance`` refuses some typing constructs; unions are handled
        # by testing each member type instead.
        if is_union_type(type_):
            return any(
                isinstance(data, t) for t in get_union_types(type_)
            )
        raise ValueError('{!r} cannot validated.'.format(type_))
项目:nirum-python    作者:spoqa    | 项目源码 | 文件源码
def deserialize_abstract_type(cls, data):
    """Deserialize ``data`` into the concrete container implied by ``cls``.

    The origin type of ``cls`` (``cls.__origin__``, or ``cls`` itself when the
    origin is ``None``) selects the strategy: origins listed in
    ``iterable_types`` are delegated to
    ``deserialize_iterable_abstract_type``; any other origin is looked up in
    ``abstract_type_map`` and the mapped builtin is called on ``data``.

    :param cls: a typing construct exposing ``__origin__``.
    :param data: the raw value to convert.
    :return: the deserialized value.
    :raises KeyError: if the origin type is neither iterable nor present in
        ``abstract_type_map``.
    """
    abstract_type_map = {
        typing.Sequence: list,
        typing.List: list,
        typing.Dict: dict,
        typing.Set: set,
        typing.AbstractSet: set,
    }
    cls_origin_type = cls.__origin__
    if cls_origin_type is None:
        # An unparameterized typing construct is its own origin.
        cls_origin_type = cls
    iterable_types = {
        typing.Sequence, typing.List, typing.Tuple, typing.Set,
        typing.AbstractSet, typing.Mapping,
    }
    if cls_origin_type in iterable_types:
        return deserialize_iterable_abstract_type(cls, cls_origin_type, data)
    return abstract_type_map[cls_origin_type](data)
项目:wikidata    作者:dahlia    | 项目源码 | 文件源码
def __call__(self,
                 client: Client,
                 datatype: str,
                 datavalue: Mapping[str, object]) -> object:
        """Decode ``datavalue`` by dispatching to a handler method on ``self``.

        The handler name is derived from ``datatype`` and ``datavalue['type']``
        with hyphens replaced by underscores.  ``<datatype>__<type>`` is tried
        first, then ``<type>`` alone; the first callable attribute found is
        invoked as ``handler(client, datavalue)`` and its result returned.

        :param client: passed through to the handler.
        :param datatype: the datatype name used to build the specific handler name.
        :param datavalue: mapping that must contain ``'type'`` (a str) and ``'value'``.
        :return: whatever the selected handler returns.
        :raises DatavalueError: if ``'type'`` or ``'value'`` is missing, or no
            handler exists for the given type.
        """
        try:
            type_ = datavalue['type']
        except KeyError:
            raise DatavalueError('no "type" specified', datavalue)
        assert isinstance(type_, str)
        if 'value' not in datavalue:
            raise DatavalueError('no "value" field', datavalue)
        # Most specific handler first: <datatype>__<type>.
        method_name = '{}__{}'.format(datatype, type_).replace('-', '_')
        method = getattr(self, method_name, None)
        if callable(method):
            return method(client, datavalue)
        # Fall back to the handler keyed on the value type alone.
        method_name = type_.replace('-', '_')
        method = getattr(self, method_name, None)
        if callable(method):
            return method(client, datavalue)
        raise DatavalueError('{!r} is unsupported type'.format(type_),
                             datavalue)
项目:auDeep    作者:auDeep    | 项目源码 | 文件源码
def label_map(self) -> Optional[Mapping[str, float]]:
        """
        Returns the mapping of nominal labels to numeric labels for the data set.

        This method delegates to the selected parser.

        Returns
        -------
        map of str to numeric label
            The mapping of nominal labels to numeric labels, as exposed by the
            parser's ``label_map`` attribute. May be None per the return
            annotation -- presumably when the parser defines no label map;
            confirm against the parser implementations.

        Raises
        ------
        IOError
            If the data set cannot be parsed
        """
        if not self.can_parse():
            raise IOError("unable to parse data set at {}".format(self._basedir))

        return self._parser.label_map
项目:python-libjuju    作者:juju    | 项目源码 | 文件源码
def do_explode(self, kind):
        """Decide whether ``kind`` should be exploded.

        Returns False for anything in ``basic_types`` and for ``TypeVar``
        instances, and for subclasses of ``typing.Sequence`` /
        ``typing.Mapping``; returns True for every other class.

        :param kind: the type to inspect.
        :return: True if ``kind`` is not basic, not a TypeVar and not a
            sequence/mapping type; False otherwise.
        """
        if kind in basic_types or type(kind) is typing.TypeVar:
            return False
        # NOTE(review): the original condition was truncated after
        # ``typing.Sequence,``; ``typing.Mapping`` is the assumed second member.
        # Statements between this test and ``return True`` may also have been
        # lost in extraction -- verify against the upstream source.
        if not issubclass(kind, (typing.Sequence,
                                 typing.Mapping)):
            return True
        return False
项目:Hanabi-AI    作者:MeGotsThis    | 项目源码 | 文件源码
def __init__(self,
                 username: str,
                 password: str,
                 botModule: str,
                 botconfig: Mapping,
                 **kwargs) -> None:
        # Stores the login credentials and bot configuration on the instance and
        # resolves the bot class by importing the ``<botModule>.bot`` module.
        # NOTE(review): ``*args`` is forwarded below but is not a parameter of
        # the signature as shown; as written this call would raise NameError.
        super().__init__(*args, **kwargs)
        self.username: str = username
        self.password: str = password
        # The bot implementation is looked up dynamically: ``Bot`` is read from
        # the imported ``<botModule>.bot`` module.
        module = importlib.import_module(botModule + '.bot')
        self.botCls: Type[Bot] = module.Bot  # type: ignore
        self.botconfig: Mapping = botconfig
        # NOTE(review): the next line looks like two statements fused during
        # extraction (a bare ``self.conn`` annotation followed by the tail of
        # another annotated assignment); it is not valid Python as shown.
        self.conn: socketIO_client.SocketIO Optional[Game] = None