Python typing 模块,Union 实例源码

我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 typing.Union(以下标形式 Union[X, Y] 使用,而非函数调用)。

项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
def waitFor(self, selectorOrFunctionOrTimeout: Union[str, int, float],
                options: dict = None, **kwargs: Any) -> Awaitable:
        """Wait for a timeout, a function expression, or a selector.

        The dispatch depends on ``selectorOrFunctionOrTimeout``:

        * ``int``/``float``: handed to ``asyncio.sleep`` and the scheduled
          future is returned.
        * ``str`` containing ``'=>'`` or starting (after stripping) with
          ``'function'``: forwarded to ``self.waitForFunction``.
        * any other ``str``: forwarded to ``self.waitForSelector``.
        * anything else: a future whose exception is already set to a
          ``TypeError`` is returned.

        ``options`` and ``kwargs`` are merged (``kwargs`` wins on duplicate
        keys) into a new dict; the mapping passed by the caller is never
        modified.
        """
        # Merge into a fresh dict instead of ``options.update(kwargs)`` so a
        # dict owned by the caller is not mutated as a side effect.
        merged = {**(options or {}), **kwargs}

        if isinstance(selectorOrFunctionOrTimeout, (int, float)):
            fut: Awaitable[None] = asyncio.ensure_future(
                asyncio.sleep(selectorOrFunctionOrTimeout))
            return fut
        if not isinstance(selectorOrFunctionOrTimeout, str):
            # Report the misuse through the returned awaitable rather than
            # raising synchronously.
            fut = asyncio.get_event_loop().create_future()
            fut.set_exception(TypeError(
                'Unsupported target type: ' +
                str(type(selectorOrFunctionOrTimeout))
            ))
            return fut
        if ('=>' in selectorOrFunctionOrTimeout or
                selectorOrFunctionOrTimeout.strip().startswith('function')):
            return self.waitForFunction(selectorOrFunctionOrTimeout, merged)
        return self.waitForSelector(selectorOrFunctionOrTimeout, merged)
项目:greenstalk    作者:mayhewj    | 项目源码 | 文件源码
def __init__(self,
                 host: str = '127.0.0.1',
                 port: int = 11300,
                 encoding: Optional[str] = 'utf-8',
                 use: str = DEFAULT_TUBE,
                 watch: Union[str, Iterable[str]] = DEFAULT_TUBE) -> None:
        """Open a connection and select the tubes to use and watch.

        A TCP connection to ``(host, port)`` is created and a binary reader
        is made from it. ``self.use`` is called only when ``use`` differs
        from ``DEFAULT_TUBE``.

        ``watch`` may be a single tube name or an iterable of names. Every
        requested tube is watched, and ``DEFAULT_TUBE`` is ignored afterwards
        unless it was one of the requested tubes.
        """
        self._sock = socket.create_connection((host, port))
        self._reader = self._sock.makefile('rb')  # type: BinaryIO
        self.encoding = encoding

        if use != DEFAULT_TUBE:
            self.use(use)

        if isinstance(watch, str):
            if watch != DEFAULT_TUBE:
                self.watch(watch)
                self.ignore(DEFAULT_TUBE)
        else:
            # Record whether the default tube was requested while iterating.
            # Testing ``DEFAULT_TUBE not in watch`` after the loop gives the
            # wrong answer for one-shot iterables (e.g. generators), which
            # are already exhausted by then.
            default_requested = False
            for tube in watch:
                self.watch(tube)
                if tube == DEFAULT_TUBE:
                    default_requested = True
            if not default_requested:
                self.ignore(DEFAULT_TUBE)
项目:python-libjuju    作者:juju    | 项目源码 | 文件源码
def retspec(defs):
    """Build the return annotation for the definition named ``defs``.

    Only a single value is ever returned, so when the registry lists more
    than one type (the expected type or an error) they are combined into a
    ``Union``.

    Returns ``None`` when ``defs`` is falsy or the registry has nothing for
    it, ``strcast(defs, False)`` for basic types, a ``Union`` of the cast
    member types when there are several, and the single cast type otherwise.
    """
    if not defs:
        return None
    if defs in basic_types:
        return strcast(defs, False)

    candidates = _registry.getObj(_types[defs])
    if not candidates:
        return None
    if len(candidates) <= 1:
        return strcast(candidates[0][1], False)
    members = tuple(strcast(entry[1], True) for entry in candidates)
    return Union[members]
项目:python-libjuju    作者:juju    | 项目源码 | 文件源码
def type_anno_func(func, defs, is_result=False):
    """Add annotations looked up for ``defs`` to ``func`` and return it.

    With ``is_result`` true only the ``"return"`` annotation is set: ``None``
    when the registry has no types, a ``Union`` when it has several, and the
    single type otherwise. With ``is_result`` false every ``(name, type)``
    pair becomes a parameter annotation keyed by ``name_to_py(name)``.

    ``func`` is returned untouched when ``defs`` is falsy.
    """
    if not defs:
        return func

    rtypes = _registry.getObj(_types[defs])
    if is_result:
        if not rtypes:
            result_type = None
        elif len(rtypes) > 1:
            result_type = Union[tuple(entry[1] for entry in rtypes)]
        else:
            result_type = rtypes[0][1]
        new_annotations = {"return": result_type}
    else:
        new_annotations = {
            name_to_py(name): rtype for name, rtype in rtypes
        }

    func.__annotations__.update(new_annotations)
    return func
项目:trf    作者:aistairc    | 项目源码 | 文件源码
def calc_mean_lp_scores(log_prob_scores: List[float],
                        lengths: List[int]) -> List[Union[None, float]]:
    r"""Divide each log-probability score by the matching length.

    .. math::

        \frac{%
            \log P_\text{model}\left(\xi\right)
            }{%
              \text{length}\left(\xi\right)
            }

    Scores and lengths are paired with ``zip``. An entry is ``None`` when
    its score is ``None`` or its length equals ``0``.

    >>> '{:.3f}'.format(calc_mean_lp_scores([-14.7579], [4])[0])
    '-3.689'
    """
    return [
        None if score is None or length == 0
        else float(score) / float(length)
        for score, length in zip(log_prob_scores, lengths)
    ]
项目:trf    作者:aistairc    | 项目源码 | 文件源码
def calc_norm_lp_div_scores(
        log_prob_scores: List[float],
        unigram_scores: List[float]) -> List[Union[None, float]]:
    r"""Divide each model log probability by its unigram log probability.

    .. math::

        \frac{%
            \log P_\text{model}\left(\xi\right)
        }{%
            \log P_\text{unigram}\left(\xi\right)
        }

    The quotient is multiplied by ``-1.0``. An entry is ``None`` when the
    model score is ``None`` or the unigram score is close to zero according
    to ``numpy.isclose``.

    >>> '{:.3f}'.format(calc_norm_lp_div_scores([-14.7579], [-35.6325])[0])
    '-0.414'
    """
    return [
        None
        if log_prob is None or numpy.isclose(unigram_score, 0.0, rtol=1e-05)
        else (-1.0) * float(log_prob) / float(unigram_score)
        for log_prob, unigram_score in zip(log_prob_scores, unigram_scores)
    ]
项目:trf    作者:aistairc    | 项目源码 | 文件源码
def calc_norm_lp_sub_scores(
        log_prob_scores: List[float],
        unigram_scores: List[float]) -> List[Union[None, float]]:
    r"""Subtract each unigram log probability from the model log probability.

    .. math::

        \log P_\text{model}\left(\xi\right)
            - \log P_\text{unigram}\left(\xi\right)

    An entry is ``None`` when the model score is ``None`` or the unigram
    score is close to zero according to ``numpy.isclose``.

    >>> '{:.3f}'.format(calc_norm_lp_sub_scores([-14.7579], [-35.6325])[0])
    '20.875'
    """
    return [
        None
        if log_prob is None or numpy.isclose(unigram_score, 0.0, rtol=1e-05)
        else float(log_prob) - float(unigram_score)
        for log_prob, unigram_score in zip(log_prob_scores, unigram_scores)
    ]
项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
def __init__(self, client: Session, ignoreHTTPSErrors: Any,
                 options: dict = None, **kwargs: Any) -> None:
        """Make new navigator watcher.

        ``options`` and ``kwargs`` are merged (``kwargs`` wins on duplicate
        keys) into a new dict, so the caller's ``options`` is not modified.
        Recognised keys and their defaults: ``timeout`` (3000),
        ``networkIdleTimeout`` (1000), ``networkIdleInflight`` (2) and
        ``waitUntil`` (``'load'``).

        Raises ``ValueError`` when ``waitUntil`` is neither ``'load'`` nor
        ``'networkidle'``.
        """
        # Merge into a fresh dict instead of ``options.update(kwargs)`` so a
        # dict owned by the caller is not mutated as a side effect.
        settings = {**(options or {}), **kwargs}

        self._client = client
        self._ignoreHTTPSErrors = ignoreHTTPSErrors
        self._timeout = settings.get('timeout', 3000)
        self._idleTime = settings.get('networkIdleTimeout', 1000)
        self._idleTimer: Optional[Union[asyncio.Future, asyncio.Handle]] = None
        self._idleInflight = settings.get('networkIdleInflight', 2)
        self._waitUntil = settings.get('waitUntil', 'load')
        if self._waitUntil not in ('load', 'networkidle'):
            raise ValueError(
                f'Unknown value for options.waitUntil: {self._waitUntil}')
项目:pyppeteer    作者:miyakogi    | 项目源码 | 文件源码
def convertPrintParameterToInches(parameter: Union[None, int, float, str]
                                  ) -> Optional[float]:
    """Convert a print parameter to inches.

    ``None`` is passed through. Numbers are treated as pixels. Strings are
    parsed as a number optionally followed by a two-character unit found in
    ``unitToPixels``; without a known unit the whole string is parsed as
    pixels. The pixel value is divided by 96 to obtain inches.

    Raises ``ValueError`` when the numeric part cannot be parsed and
    ``TypeError`` for any other parameter type.
    """
    if parameter is None:
        return None
    if isinstance(parameter, (int, float)):
        return parameter / 96
    if not isinstance(parameter, str):
        raise TypeError('page.pdf() Cannot handle parameter type: ' +
                        str(type(parameter)))

    suffix = parameter[-2:].lower()
    if suffix in unitToPixels:
        unit, number_text = suffix, parameter[:-2]
    else:
        # No recognised unit suffix: the whole string is a pixel count.
        unit, number_text = 'px', parameter
    try:
        value = float(number_text)
    except ValueError:
        raise ValueError('Failed to parse parameter value: ' + parameter)
    return value * unitToPixels[unit] / 96
项目:ConfigSpace    作者:automl    | 项目源码 | 文件源码
def get_parents_of(self, name: Union[str, Hyperparameter]) -> List[Hyperparameter]:
        """Collect the parents of a hyperparameter across all its conditions.

        Parameters
        ----------
        name : str or Hyperparameter
            Either the name of a hyperparameter or the hyperparameter
            object itself.

        Returns
        -------
        list
            The parents reported by every parent condition of ``name``,
            concatenated in condition order.
        """
        return [
            parent
            for condition in self.get_parent_conditions_of(name)
            for parent in condition.get_parents()
        ]
项目:backend.ai-client-py    作者:lablup    | 项目源码 | 文件源码
def content(self) -> Union[bytes, bytearray, None]:
        '''
        Return the stored content converted according to ``content_type``.

        ``application/octet-stream`` and ``multipart/form-data`` content is
        returned as stored, ``application/json`` is decoded as UTF-8 and
        parsed into ordered dictionaries, and ``text/plain`` is decoded as
        UTF-8 text.

        Private codes should NOT use this as it incurs duplicate
        encoding/decoding.

        Raises ``ValueError`` when no content has been set.
        '''
        raw = self._content
        if raw is None:
            raise ValueError('content is not set.')

        kind = self.content_type
        if kind in ('application/octet-stream', 'multipart/form-data'):
            return raw
        if kind == 'application/json':
            return json.loads(raw.decode('utf-8'),
                              object_pairs_hook=OrderedDict)
        if kind == 'text/plain':
            return raw.decode('utf-8')
        raise RuntimeError('should not reach here')  # pragma: no cover
项目:speechless    作者:JuliusKunze    | 项目源码 | 文件源码
def _extract_positional_label_by_id(self, files: Iterable[Path]) -> Dict[str, Union[PositionalLabel, str]]:
        """Map ``<xml name><microphone ending>`` ids to labels read from XML.

        Only files ending in ``.xml`` whose extension-less name matches
        ``self.id_filter_regex`` are considered. For each of them, every
        microphone ending is tried, and an entry is added only when the
        corresponding ``.wav`` file exists next to the XML file.
        """
        xml_ending = ".xml"

        microphone_endings = [
            "_Yamaha",
            "_Kinect-Beam",
            "_Kinect-RAW",
            "_Realtek",
            "_Samson",
            "_Microsoft-Kinect-Raw"
        ]

        # First pass: select the XML files that pass the id filter.
        xml_files = []
        for file in files:
            if not file.name.endswith(xml_ending):
                continue
            if self.id_filter_regex.match(name_without_extension(file)):
                xml_files.append(file)

        # Second pass: one entry per (XML file, microphone) with a recording.
        labels_by_id = OrderedDict()
        for file in xml_files:
            for microphone_ending in microphone_endings:
                sample_id = name_without_extension(file) + microphone_ending
                if (Path(file.parent) / (sample_id + ".wav")).exists():
                    labels_by_id[sample_id] = self._extract_label_from_xml(file)
        return labels_by_id
项目:datapipelines-python    作者:meraki-analytics    | 项目源码 | 文件源码
def with_default(self, value: Union[Any, Callable[[MutableMapping[str, Any]], Any]], supplies_type: Type = None) -> "QueryValidator":
        """Attach a default value to the currently selected optional key.

        The expected type is ``supplies_type`` when it is truthy, otherwise
        ``type(value)``. That type is applied through ``self.as_`` and a
        default-value node is attached beneath the resulting type node.

        Raises ``QueryValidatorStructureError`` when no key is selected (or
        the selected key already has a child) and when the selected key is
        required.
        """
        current = self._current
        if current is None or current.child is not None:
            raise QueryValidatorStructureError("No key is selected! Try using \"can_have\" before \"with_default\".")

        if current.required:
            raise QueryValidatorStructureError("Can't assign a default value to a required key! Try using \"can_have\" instead of \"have\".")

        default_node = _DefaultValueNode(current.key, value, supplies_type)
        result = self.as_(supplies_type if supplies_type else type(value))
        result._current.child.child = default_node
        return result
项目:nucypher-kms    作者:nucypher    | 项目源码 | 文件源码
def _ecies_gen_ephemeral_key(
        recp_pubkey: Union[bytes, elliptic_curve.ec_element]
) -> Tuple[bytes, Tuple[bytes, bytes]]:
    """
    Create an ephemeral private key and encrypt it for `recp_pubkey`.

    A symmetric key is encapsulated for the recipient with
    `API.ecies_encapsulate`; a fresh private key from `API.ecies_gen_priv`
    is then encrypted under that symmetric key with `API.symm_encrypt`.

    :param recp_pubkey: Recipient's pubkey

    :return: Tuple of the ephemeral privkey, and a tuple of the encrypted
             symmetric key and the encrypted ephemeral privkey
    """
    shared_key, wrapped_shared_key = API.ecies_encapsulate(recp_pubkey)
    ephemeral_priv = API.ecies_gen_priv()
    wrapped_ephemeral_priv = API.symm_encrypt(shared_key, ephemeral_priv)

    return ephemeral_priv, (wrapped_shared_key, wrapped_ephemeral_priv)
项目:nucypher-kms    作者:nucypher    | 项目源码 | 文件源码
def ecdsa_priv2pub(
        privkey: bytes,
        to_bytes: bool = True
) -> Union[bytes, Tuple[int]]:
    """
    Derive the public key for an ECDSA private key.

    :param privkey: Private key, passed unchanged to `privtopub`
    :param to_bytes: When true, return the pubkey serialized by
                     `ecdsa_pub2bytes` and prefixed with
                     `SIG_KEYPAIR_BYTE + PUB_KEY_BYTE`; otherwise return the
                     value from `privtopub` as-is

    :return: Byte encoded pubkey, or the unserialized pubkey
    """
    # NOTE(review): the annotation says Tuple[int]; confirm whether the
    # unserialized pubkey is really a one-element tuple.
    pubkey = privtopub(privkey)
    if not to_bytes:
        return pubkey
    return SIG_KEYPAIR_BYTE + PUB_KEY_BYTE + ecdsa_pub2bytes(pubkey)
项目:nucypher-kms    作者:nucypher    | 项目源码 | 文件源码
def ecdsa_verify(
        v: int,
        r: int,
        s: int,
        msghash: bytes,
        pubkey: Union[bytes, Tuple[int, int]]
) -> bool:
    """
    Takes a v, r, s, a pubkey, and a hash of a message to verify via ECDSA.

    The key recovered from the signature by `ecdsa_raw_recover` is compared
    for equality with `pubkey`. A pubkey given as bytes is first decoded
    with `ecdsa_bytes2pub`.

    :param v: V of sig
    :param r: R of sig
    :param s: S of sig
    :param bytes msghash: The hashed message to verify
    :param pubkey: Pubkey to validate signature for, as bytes or as an
                   already decoded key

    :rtype: Boolean
    :return: Is the signature valid or not?
    """
    # isinstance rather than an exact ``type(...) ==`` comparison, so that
    # subclasses of bytes are decoded as well.
    if isinstance(pubkey, bytes):
        pubkey = ecdsa_bytes2pub(pubkey)

    verify_sig = ecdsa_raw_recover(msghash, (v, r, s))
    # TODO: Should this equality test be done better?
    return verify_sig == pubkey
项目:nucypher-kms    作者:nucypher    | 项目源码 | 文件源码
def ecies_priv2pub(
        privkey: Union[bytes, elliptic_curve.ec_element],
        to_bytes: bool = True
) -> Union[bytes, elliptic_curve.ec_element]:
    """
    Takes a private key (secret bytes or an elliptic_curve.ec_element) and
    derives the Public key from it.

    :param privkey: The Private key to derive the public key from
    :param to_bytes: Return the byte serialization of the pubkey? The first
                     byte of `elliptic_curve.serialize` output is dropped.

    :return: The Public component of the Private key provided
    """
    # isinstance rather than an exact ``type(...) ==`` comparison, so that
    # subclasses of bytes are converted as well.
    if isinstance(privkey, bytes):
        privkey = priv_bytes2ec(privkey)

    pubkey = PRE.priv2pub(privkey)
    if to_bytes:
        return elliptic_curve.serialize(pubkey)[1:]
    return pubkey
项目:nucypher-kms    作者:nucypher    | 项目源码 | 文件源码
def ecies_rekey(
        privkey_a: Union[bytes, elliptic_curve.ec_element],
        privkey_b: Union[bytes, elliptic_curve.ec_element],
        to_bytes: bool = True
) -> Union[bytes, umbral.RekeyFrag]:
    """
    Generates a re-encryption key from privkey_a to privkey_b.

    Keys given as bytes are converted with `priv_bytes2ec` first.

    :param privkey_a: Private key to re-encrypt from
    :param privkey_b: Private key to re-encrypt to
    :param to_bytes: Format result as bytes? The first byte of the
                     `elliptic_curve.serialize` output of `rk.key` is dropped.

    :return: Re-encryption key
    """
    # isinstance rather than an exact ``type(...) ==`` comparison, so that
    # subclasses of bytes are converted as well.
    if isinstance(privkey_a, bytes):
        privkey_a = priv_bytes2ec(privkey_a)
    if isinstance(privkey_b, bytes):
        privkey_b = priv_bytes2ec(privkey_b)

    rk = PRE.rekey(privkey_a, privkey_b)
    if to_bytes:
        return elliptic_curve.serialize(rk.key)[1:]
    return rk
项目:nucypher-kms    作者:nucypher    | 项目源码 | 文件源码
def ecies_split_rekey(
        privkey_a: Union[bytes, elliptic_curve.ec_element],
        privkey_b: Union[bytes, elliptic_curve.ec_element],
        min_shares: int,
        total_shares: int
) -> List[umbral.RekeyFrag]:
    """
    Performs a split-key re-encryption key generation where a minimum
    number of shares `min_shares` are required to reproduce a rekey.
    Will split a rekey into `total_shares`.

    Keys given as bytes are converted with `priv_bytes2ec` first. Each
    fragment produced by `PRE.split_rekey` is wrapped in a `KFrag`.

    :param privkey_a: Privkey to re-encrypt from
    :param privkey_b: Privkey to re-encrypt to
    :param min_shares: Minimum shares needed to reproduce rekey
    :param total_shares: Total shares to generate from split-rekey gen

    :return: A list of KFrag-wrapped rekey fragments to distribute
    """
    # isinstance rather than an exact ``type(...) ==`` comparison, so that
    # subclasses of bytes are converted as well.
    if isinstance(privkey_a, bytes):
        privkey_a = priv_bytes2ec(privkey_a)
    if isinstance(privkey_b, bytes):
        privkey_b = priv_bytes2ec(privkey_b)
    umbral_rekeys = PRE.split_rekey(privkey_a, privkey_b,
                                    min_shares, total_shares)
    return [KFrag(umbral_kfrag=u) for u in umbral_rekeys]
项目:nucypher-kms    作者:nucypher    | 项目源码 | 文件源码
def ecies_ephemeral_split_rekey(
        privkey_a: Union[bytes, elliptic_curve.ec_element],
        pubkey_b: Union[bytes, elliptic_curve.ec_element],
        min_shares: int,
        total_shares: int
) -> Tuple[List[umbral.RekeyFrag], Tuple[bytes, bytes]]:
    """
    Performs a split-key re-encryption key generation where a minimum
    number of shares `min_shares` are required to reproduce a rekey.
    Will split a rekey into `total_shares`.
    This also generates an ephemeral keypair for the recipient `pubkey_b`;
    the rekey is split from `privkey_a` to the ephemeral private key.

    :param privkey_a: Privkey to re-encrypt from
    :param pubkey_b: Public key to re-encrypt for (w/ ephemeral key)
    :param min_shares: Minimum shares needed to reproduce a rekey
    :param total_shares: Total shares to generate from split-rekey gen

    :return: A tuple containing a list of rekey frags, and a PFrag built
             from the encrypted ephemeral key data
    """
    # NOTE(review): the annotation promises Tuple[bytes, bytes] as the second
    # element, but a PFrag is returned; confirm which one callers expect.
    ephemeral_priv, (wrapped_key, wrapped_message) = \
        _internal._ecies_gen_ephemeral_key(pubkey_b)
    fragments = ecies_split_rekey(
        privkey_a, ephemeral_priv, min_shares, total_shares)
    policy_fragment = PFrag(
        ephemeral_data_as_bytes=None,
        encrypted_key=wrapped_key,
        encrypted_message=wrapped_message)

    return fragments, policy_fragment
项目:nucypher-kms    作者:nucypher    | 项目源码 | 文件源码
def ecies_reencrypt(
        rekey: Union[bytes, umbral.RekeyFrag],
        enc_key: Union[bytes, umbral.EncryptedKey],
) -> umbral.EncryptedKey:
    """
    Re-encrypts the key provided.

    Arguments given as bytes are converted with `priv_bytes2ec` and wrapped
    in `umbral.RekeyFrag` / `umbral.EncryptedKey` before `PRE.reencrypt` is
    called. The result is returned wrapped in a `CFrag`.

    :param rekey: Re-encryption key to use
    :param enc_key: Encrypted key to re-encrypt

    :return: The re-encrypted key
    """
    # isinstance rather than an exact ``type(...) ==`` comparison, so that
    # subclasses of bytes are converted as well.
    if isinstance(rekey, bytes):
        rekey = umbral.RekeyFrag(None, priv_bytes2ec(rekey))
    if isinstance(enc_key, bytes):
        enc_key = umbral.EncryptedKey(priv_bytes2ec(enc_key), None)
    reencrypted_data = PRE.reencrypt(rekey, enc_key)
    return CFrag(reencrypted_data=reencrypted_data)
项目:nucypher-kms    作者:nucypher    | 项目源码 | 文件源码
def add_key(self,
                keypair: Union[keypairs.EncryptingKeypair,
                               keypairs.SigningKeypair],
                store_pub: bool = True) -> bytes:
        """
        Fingerprints one half of a keypair and stores its serialization.

        :param keypair: Keypair whose public or private key is stored
        :param store_pub: Store the public key when true, the private key
                          otherwise

        :return: Fingerprint of the stored key
        """
        if store_pub:
            key_material = keypair.pubkey
            serialize = keypair.serialize_pubkey
        else:
            key_material = keypair.privkey
            serialize = keypair.serialize_privkey

        fingerprint = self._get_fingerprint(key_material)

        # Create new Key object and commit to db
        self.session.add(Key(serialize()))
        self.session.commit()
        return fingerprint
项目:botodesu    作者:futursolo    | 项目源码 | 文件源码
def _generate_form_data(**kwargs: Union[Any, BotoFairu]) -> aiohttp.FormData:
    """Build an ``aiohttp.FormData`` with one field per keyword argument.

    ``BotoFairu`` values are added with their content, content type,
    filename and transfer encoding. Lists and dicts are JSON-encoded.
    ``int``, ``float``, ``str`` and ``bool`` values are converted with
    ``str``. Any other value raises ``TypeError``.
    """
    form = aiohttp.FormData()

    for field_name, field_value in kwargs.items():
        if isinstance(field_value, BotoFairu):
            form.add_field(
                field_name, field_value._content,
                content_type=field_value._content_type,
                filename=field_value._filename,
                content_transfer_encoding=field_value._content_transfer_encoding)
            continue

        if isinstance(field_value, (list, dict)):
            payload = json.dumps(field_value)
        elif isinstance(field_value, (int, float, str, bool)):
            payload = str(field_value)
        else:
            raise TypeError("Unknown Type: {}".format(type(field_value)))

        form.add_field(field_name, payload)

    return form
项目:mugen    作者:scherroman    | 项目源码 | 文件源码
def largest_dimensions_for_aspect_ratio(dimensions_list: List[Dimensions], desired_aspect_ratio: float,
                                        default: Any = _sentinel) -> Union[Dimensions, Any]:
    """
    Crops every entry to the desired aspect ratio and picks the one with the
    highest resolution. On ties the earliest entry wins.

    Returns
    -------
    The largest dimensions after cropping each dimensions to reach desired aspect ratio,
    or ``default`` when the list is empty and a default was supplied

    Raises
    ------
    ValueError
        If ``dimensions_list`` is empty and no default was supplied
    """
    if not dimensions_list:
        if default is _sentinel:
            raise ValueError(f"{dimensions_list} must not be empty.")
        return default

    cropped = (crop_dimensions_to_aspect_ratio(dimensions, desired_aspect_ratio)
               for dimensions in dimensions_list)
    # max() keeps the first of several equally large candidates.
    return max(cropped, key=lambda candidate: candidate.resolution)
项目:mugen    作者:scherroman    | 项目源码 | 文件源码
def __init__(self, sources: Opt[Union[List[Union[Source, 'VideoSourceList']], str]] = None, **kwargs):
        """
        Parameters
        ----------
        sources
            A list of sources.
            Accepts arbitrarily nested video files, file globs, directories, VideoSources, and VideoSourceLists.
            A string is treated as a directory when ``os.path.isdir`` says so, otherwise as a file glob,
            and its filename becomes ``self.name``.
            When omitted, an empty list of sources is used.
        """
        self.name = None

        # ``sources`` used to be declared as ``sources=Opt[...]``, which made
        # the typing object itself the default value. It is now a real
        # annotation with a ``None`` default meaning "no sources".
        # NOTE(review): assumes _fill_in_sources accepts an empty list - confirm.
        if sources is None:
            sources = []

        if isinstance(sources, str):
            self.name = paths.filename_from_path(sources)

            # Build list of sources from directory or file glob
            if os.path.isdir(sources):
                sources = self._sources_from_files(util.files_from_directory(sources))
            else:
                sources = self._sources_from_files(globber.glob(sources))
        else:
            # Convert any source files to VideoSources, and any lists, file globs or directories to VideoSourceLists
            sources = self._fill_in_sources(sources)

        super().__init__(sources, **kwargs)
项目:mugen    作者:scherroman    | 项目源码 | 文件源码
def speed_multiply(self, speed: Union[float, Fraction],
                       offset: Opt[int] = None):
        """
        Speeds up or slows down events by grouping them together or splitting them up.
        For slowdowns, event type group boundaries and isolated events are preserved.

        Parameters
        ----------
        speed
            Factor to speedup or slowdown by.
            Must be of the form x (speedup) or 1/x (slowdown), where x is a natural number.
            Otherwise, 0 to remove all events.
            Values that are not already a ``Fraction`` are converted to the nearest
            simple fraction first. A speed of exactly 1 does nothing.

        offset
            Offsets the grouping of events for slowdowns.
            Takes a max offset of x - 1 for a slowdown of 1/x, where x is a natural number
        """
        if not isinstance(speed, Fraction):
            # A plain float has no ``numerator``/``denominator`` attributes.
            # limit_denominator() snaps binary approximations such as
            # 0.3333... back to the intended ratio (1/3).
            speed = Fraction(speed).limit_denominator()

        if speed == 0:
            self.clear()
        elif speed > 1:
            self._split(speed.numerator)
        elif speed < 1:
            self._merge(speed.denominator, offset)
项目:mugen    作者:scherroman    | 项目源码 | 文件源码
def __init__(self, groups: Opt[Union[List[EventList], List[List[TIME_FORMAT]]]] = None, *,
                 selected: List[EventList] = None):
        """
        Parameters
        ----------
        groups
            Event groups. Entries that are not already an EventList are
            converted with ``EventList(entry)``. The list passed in is left
            unmodified.

        selected
            A subset of groups to track
        """
        if groups is not None:
            # Build a new list instead of assigning into ``groups`` so the
            # caller's list is not changed as a side effect.
            groups = [group if isinstance(group, EventList) else EventList(group)
                      for group in groups]

        super().__init__(groups)

        self._selected_groups = selected if selected is not None else []
项目:irisett    作者:beebyte    | 项目源码 | 文件源码
async def send_email(loop: asyncio.AbstractEventLoop, mail_from: str, mail_to: Union[Iterable, str],
                     subject: str, body: str, server: str='localhost') -> None:
    """Send an email to one or more recipients.

    Only supports plain text emails with a single message body.
    No attachments etc.

    A separate message is sent to each recipient over one SMTP connection
    to ``server`` on port 25. SMTP errors are logged, not raised.

    This is a coroutine: the body awaits the SMTP client, which is only
    valid inside ``async def``.
    """
    # isinstance rather than an exact type comparison, so str subclasses are
    # also treated as a single recipient instead of being iterated per
    # character.
    if isinstance(mail_to, str):
        mail_to = [mail_to]
    smtp = aiosmtplib.SMTP(hostname=server, port=25, loop=loop)
    try:
        await smtp.connect()
        for rcpt in mail_to:
            msg = MIMEText(body)
            msg['Subject'] = subject
            msg['From'] = mail_from
            msg['To'] = rcpt
            await smtp.send_message(msg)
        await smtp.quit()
    except aiosmtplib.errors.SMTPException as e:
        log.msg('Error sending smtp notification: %s' % (str(e)), 'NOTIFICATIONS')
项目:irisett    作者:beebyte    | 项目源码 | 文件源码
def require_bool(value: Optional[Union[bool, str, int]], convert: bool=False, allow_none: bool=False) -> Any:
    """Make sure a value is a boolean.

    Used when dealing with http input data.

    ``None`` is returned unchanged when ``allow_none`` is set. Real booleans
    are returned as-is. Other values raise ``InvalidData`` unless ``convert``
    is set, in which case ``None``, ``0``, ``'0'``, ``'false'`` and
    ``'False'`` become ``False``, ``1``, ``'1'``, ``'true'`` and ``'True'``
    become ``True``, and anything else raises ``InvalidData``.
    """
    if value is None and allow_none:
        return value
    if isinstance(value, bool):
        return cast(bool, value)
    if not convert:
        raise InvalidData()

    if value in [None, 0, '0', 'false', 'False']:
        return cast(bool, False)
    if value in [1, '1', 'true', 'True']:
        return cast(bool, True)
    raise InvalidData('value was %s(%s), expected bool' % (type(value), value))
项目:irisett    作者:beebyte    | 项目源码 | 文件源码
def __init__(self, id: int, args: Dict[str, str], monitor_def: ActiveMonitorDef, state: str, state_ts: float,
                 msg: str, alert_id: Union[int, None], checks_enabled: bool,
                 alerts_enabled: bool, manager: ActiveMonitorManager) -> None:
        """Store the monitor's settings and reset its runtime bookkeeping.

        ``state_ts`` falls back to the current time when it is falsy. After
        the attributes are set, a ``CREATE_ACTIVE_MONITOR`` event is emitted
        and the ``num_monitors`` counter is incremented.
        """
        # Identity and configuration supplied by the caller.
        self.id = id
        self.args = args
        self.monitor_def = monitor_def
        self.manager = manager
        self.checks_enabled = checks_enabled
        self.alerts_enabled = alerts_enabled

        # State as supplied by the caller, plus check bookkeeping.
        self.state = state
        self.msg = msg
        self.alert_id = alert_id
        self.last_check_state = None  # type: Optional[str]
        self.consecutive_checks = 0
        self.last_check = time.time()
        self.state_ts = state_ts or time.time()

        # Runtime flags and scheduling handles.
        self.monitoring = False
        self.deleted = False
        self._pending_reset = False
        self.scheduled_job = None  # type: Optional[asyncio.Handle]
        self.scheduled_job_ts = 0.0

        event.running('CREATE_ACTIVE_MONITOR', monitor=self)
        stats.inc('num_monitors', 'ACT_MON')
项目:allennlp    作者:allenai    | 项目源码 | 文件源码
def __init__(self,
                 label: Union[str, int],
                 label_namespace: str = 'labels',
                 skip_indexing: bool = False) -> None:
        """Store the label and validate it against ``skip_indexing``.

        With ``skip_indexing`` the label must be an ``int`` and is used
        directly as the label id. Without it the label must be a ``str`` and
        the label id stays ``None``. A mismatch raises
        ``ConfigurationError``.
        """
        self.label = label
        self._label_namespace = label_namespace
        self._label_id = None
        self._maybe_warn_for_namespace(label_namespace)

        if skip_indexing:
            if not isinstance(label, int):
                raise ConfigurationError("In order to skip indexing, your labels must be integers. "
                                         "Found label = {}".format(label))
            self._label_id = label
        elif not isinstance(label, str):
            raise ConfigurationError("LabelFields must be passed a string label if skip_indexing=False. "
                                     "Found label: {} with type: {}.".format(label, type(label)))
项目:allennlp    作者:allenai    | 项目源码 | 文件源码
def __init__(self,
                 labels: Union[List[str], List[int]],
                 sequence_field: SequenceField,
                 label_namespace: str = 'labels') -> None:
        """Store the labels and check them against the sequence field.

        Raises ``ConfigurationError`` when the number of labels differs from
        ``sequence_field.sequence_length()``, or when the labels are neither
        all ``int`` nor all ``str``. Labels that are all ``int`` are used
        directly as the indexed labels.
        """
        self.labels = labels
        self.sequence_field = sequence_field
        self._label_namespace = label_namespace
        self._indexed_labels = None
        self._maybe_warn_for_namespace(label_namespace)

        expected_length = sequence_field.sequence_length()
        if len(labels) != expected_length:
            raise ConfigurationError("Label length and sequence length "
                                     "don't match: %d and %d" % (len(labels), expected_length))

        if all(isinstance(label, int) for label in labels):
            self._indexed_labels = labels
            return

        if not all(isinstance(label, str) for label in labels):
            raise ConfigurationError("SequenceLabelFields must be passed either all "
                                     "strings or all ints. Found labels {} with "
                                     "types: {}.".format(labels, [type(x) for x in labels]))
项目:allennlp    作者:allenai    | 项目源码 | 文件源码
def from_dataset(cls,
                     dataset,
                     min_count: int = 1,
                     max_vocab_size: Union[int, Dict[str, int]] = None,
                     non_padded_namespaces: Sequence[str] = DEFAULT_NON_PADDED_NAMESPACES,
                     pretrained_files: Optional[Dict[str, str]] = None,
                     only_include_pretrained_words: bool = False) -> 'Vocabulary':
        """
        Constructs a vocabulary given a :class:`.Dataset` and some parameters.  We count all of the
        vocabulary items in the dataset, then pass those counts, and the other parameters, to
        :func:`__init__`.  See that method for a description of what the other parameters do.

        The instance is created through ``cls``, so calling this on a subclass yields an instance
        of that subclass.
        """
        logger.info("Fitting token dictionary from dataset.")
        namespace_token_counts: Dict[str, Dict[str, int]] = defaultdict(lambda: defaultdict(int))
        for instance in tqdm.tqdm(dataset.instances):
            instance.count_vocab_items(namespace_token_counts)

        # Use ``cls`` instead of naming Vocabulary directly so this alternate
        # constructor also works for subclasses.
        # NOTE(review): assumes subclasses keep the same __init__ keywords.
        return cls(counter=namespace_token_counts,
                   min_count=min_count,
                   max_vocab_size=max_vocab_size,
                   non_padded_namespaces=non_padded_namespaces,
                   pretrained_files=pretrained_files,
                   only_include_pretrained_words=only_include_pretrained_words)
项目:PrivacyScore    作者:PrivacyScore    | 项目源码 | 文件源码
def test_site(url: str, previous_results: dict, remote_host: str = None) -> Dict[str, Dict[str, Union[str, bytes]]]:
    """Run ``run_testssl`` against the first MX record of a site.

    The hostname is read from ``previous_results['mx_records'][0][1]``.
    When that entry is missing (``KeyError``/``IndexError``) the result
    carries empty data instead of a scan.

    Returns a dict with a single ``'jsonresult'`` entry holding the MIME
    type and the data.
    """
    # test first mx
    try:
        hostname = previous_results['mx_records'][0][1]
    except (KeyError, IndexError):
        data = b''
    else:
        data = run_testssl(hostname, True, remote_host)

    return {
        'jsonresult': {
            'mime_type': 'application/json',
            'data': data,
        },
    }
项目:osbrain    作者:opensistemas-hub    | 项目源码 | 文件源码
def _subscribe_to_topic(self, alias: str, topic: Union[bytes, str]):
        '''
        Subscribe the socket registered under `alias` to `topic` through
        ZeroMQ. This method only makes sense to be called on SUB/SYNC_SUB
        sockets.

        For an `AgentAddress` the socket stored under `alias` subscribes to
        the topic as-is. For an `AgentChannel` the socket stored under the
        channel's receiver subscribes to the topic prefixed with the
        channel's uuid. Any other address type raises `NotImplementedError`.

        Note that the handler is not set within this function.
        '''
        topic = topic_to_bytes(topic)
        address = self.address[alias]

        if isinstance(address, AgentAddress):
            self.socket[alias].setsockopt(zmq.SUBSCRIBE, topic)
            return

        if isinstance(address, AgentChannel):
            receiver = address.receiver
            prefixed_topic = address.uuid + topic
            self.socket[receiver].setsockopt(zmq.SUBSCRIBE, prefixed_topic)
            return

        raise NotImplementedError('Unsupported address type %s!' %
                                  address)
项目:osbrain    作者:opensistemas-hub    | 项目源码 | 文件源码
def topics_to_bytes(handlers: Dict[Union[bytes, str], Any], uuid: bytes = b''):
    '''
    Convert the topics of topic/handler pairs so they are ready for the
    actual ZeroMQ subscription.

    Each topic is passed through `topic_to_bytes` and prefixed with `uuid`.

    Parameters
    ----------
    handlers
        Contains pairs "topic - handler".
    uuid
        uuid of the SYNC_PUB/SYNC_SUB channel (if applies). For normal
        PUB/SUB communication, this should be `b''`.

    Returns
    -------
    Dict[bytes, Any]
    '''
    return {
        uuid + topic_to_bytes(topic): handler
        for topic, handler in handlers.items()
    }
项目:greenstalk    作者:mayhewj    | 项目源码 | 文件源码
def _parse_simple_yaml(buf: bytes) -> Stats:
    """Parse a flat ``key: value`` YAML document into a dict.

    The buffer is decoded as ASCII and must start with the ``---`` header
    line. Each remaining line is split on the first ``': '``; values that
    parse as integers are stored as ``int``, all others as ``str``.
    """
    text = buf.decode('ascii')

    assert text[:4] == '---\n'

    stats = {}
    # Skip the 4-character YAML head before splitting into lines.
    for line in text[4:].splitlines():
        key, value = line.split(': ', 1)
        parsed = value  # type: Union[int, str]
        try:
            parsed = int(value)
        except ValueError:
            pass
        stats[key] = parsed

    return stats
项目:postix    作者:c3cashdesk    | 项目源码 | 文件源码
def reverse_session_view(request: HttpRequest, pk: int) -> Union[HttpRequest, HttpResponseRedirect]:
    """Show the confirmation page on GET and reverse the session on POST.

    The session is looked up with ``get_object_or_404``. On POST,
    ``reverse_session`` is called; a ``FlowError`` is reported as an error
    message and success as a success message, and the user is redirected to
    the session detail page either way. Other methods fall through and
    return ``None``.
    """
    session = get_object_or_404(CashdeskSession, pk=pk)

    if request.method == 'GET':
        context = {
            'session': session,
        }
        return render(request, 'backoffice/reverse_session.html', context)

    if request.method == 'POST':
        try:
            reverse_session(session)
        except FlowError as e:
            messages.error(request, str(e))
        else:
            messages.success(request, _('All transactions in the session have been cancelled.'))
        return redirect('backoffice:session-detail', pk=pk)
项目:parsita    作者:drhagen    | 项目源码 | 文件源码
def rep1sep(parser: Union[Parser, Sequence[Input]], separator: Union[Parser, Sequence[Input]]) \
        -> RepeatedOnceSeparatedParser:
    """Match one or more occurrences of a parser, delimited by a separator.

    Repeated sequences of ``parser`` separated by ``separator`` are matched.
    When at least one match exists, the result is a list of the values
    produced by ``parser``; whatever ``separator`` produced is dropped. When
    ``parser`` does not match even once, the combined parser fails.

    String arguments are wrapped with ``lit`` before use.

    Args:
        parser: Parser or literal
        separator: Parser or literal
    """
    element = lit(parser) if isinstance(parser, str) else parser
    delimiter = lit(separator) if isinstance(separator, str) else separator
    return RepeatedOnceSeparatedParser(element, delimiter)
项目:parsita    作者:drhagen    | 项目源码 | 文件源码
def repsep(parser: Union[Parser, Sequence[Input]], separator: Union[Parser, Sequence[Input]]) \
        -> RepeatedSeparatedParser:
    """Match zero or more occurrences of a parser, delimited by a separator.

    Repeated sequences of ``parser`` separated by ``separator`` are matched.
    The result is a list holding the value of every ``parser`` match;
    whatever ``separator`` produced is dropped. Without any match the result
    is an empty list.

    String arguments are wrapped with ``lit`` before use.

    Args:
        parser: Parser or literal
        separator: Parser or literal
    """
    element = lit(parser) if isinstance(parser, str) else parser
    delimiter = lit(separator) if isinstance(separator, str) else separator
    return RepeatedSeparatedParser(element, delimiter)
项目:EMFT    作者:132nd-etcher    | 项目源码 | 文件源码
def select_channel(
        self,
        versions: typing.Set[CustomVersion],
        update_channel: str = channel.STABLE
    ) -> typing.Union[CustomVersion, None]:
        """
        Selects the latest version, equal to or higher than "update_channel"

        The candidates are whatever ``self.filter_channel`` yields for the
        given versions and channel; the greatest of them (per ``max``) wins.

        Args:
            versions: versions to select from
            update_channel: member of :class:`Channel`

        Returns: latest version or None when no candidate is left

        """
        # Log the channel that was actually requested. The previous message
        # formatted the `channel` object itself, so it never reflected the
        # argument passed by the caller.
        LOGGER.debug(f'selecting latest version amongst {len(versions)}; active channel: {update_channel}')
        options = list(self.filter_channel(versions, update_channel))
        if options:
            return max(options)
        LOGGER.debug('no version passed the test')
        return None
项目:scarlett_os    作者:bossjones    | 项目源码 | 文件源码
def load_yaml(fname: str) -> Union[List, Dict]:
    """Read *fname* as UTF-8 text and parse it as YAML.

    A falsy parse result (for instance ``None`` from an empty file) is
    replaced by an empty dict. YAML syntax errors and decoding errors are
    logged and re-raised as ``ScarlettError``.
    """
    try:
        with open(fname, encoding='utf-8') as stream:
            parsed = yaml.load(stream, Loader=SafeLineLoader)
    except yaml.YAMLError as err:
        logger.error(err)
        raise ScarlettError(err)
    except UnicodeDecodeError as err:
        logger.error('Unable to read file %s: %s', fname, err)
        raise ScarlettError(err)

    # An empty configuration file parses to None; hand back a dict instead.
    return parsed if parsed else {}


# def clear_secret_cache() -> None:
#     """Clear the secret cache.
#
#     Async friendly.
#     """
#     __SECRET_CACHE.clear()
项目:djaio    作者:Sberned    | 项目源码 | 文件源码
async def select(self, query: str, values: Union[List, Dict],
                 db_name: str = 'default') -> List[DictRow]:
        """Run ``query`` with ``values`` through ``self._select`` and return its result.

        Declared ``async`` because the body awaits; ``await`` inside a plain
        ``def`` is a syntax error.

        Args:
            query: statement to execute.
            values: parameters passed along with the statement.
            db_name: key of the database configuration to use.
        """
        return await self._select(query=query, values=values, db_name=db_name)
项目:djaio    作者:Sberned    | 项目源码 | 文件源码
async def first(self, query: str, values: Union[List, Dict],
                db_name: str = 'default') -> Optional[DictRow]:
        """Run ``query`` with ``values`` through ``self._first`` and return its result.

        Declared ``async`` because the body awaits; ``await`` inside a plain
        ``def`` is a syntax error.

        Args:
            query: statement to execute.
            values: parameters passed along with the statement.
            db_name: key of the database configuration to use.
        """
        return await self._first(query=query, values=values, db_name=db_name)
项目:djaio    作者:Sberned    | 项目源码 | 文件源码
async def insert(self, query: str, values: Union[List, Dict],
                 db_name: str = 'default', returning: bool = False):
        """Run an insert statement through ``self._execute`` and return its result.

        Declared ``async`` because the body awaits; ``await`` inside a plain
        ``def`` is a syntax error.

        Args:
            query: statement to execute.
            values: parameters passed along with the statement.
            db_name: key of the database configuration to use.
            returning: forwarded unchanged to ``self._execute``.
        """
        return await self._execute(query=query, values=values, db_name=db_name, returning=returning)
项目:djaio    作者:Sberned    | 项目源码 | 文件源码
async def update(self, query: str, values: Union[List, Dict],
                 db_name: str = 'default', returning: bool = False):
        """Run an update statement through ``self._execute`` and return its result.

        Declared ``async`` because the body awaits; ``await`` inside a plain
        ``def`` is a syntax error.

        Args:
            query: statement to execute.
            values: parameters passed along with the statement.
            db_name: key of the database configuration to use.
            returning: forwarded unchanged to ``self._execute``.
        """
        return await self._execute(query=query, values=values, db_name=db_name, returning=returning)
项目:djaio    作者:Sberned    | 项目源码 | 文件源码
async def delete(self, query: str, values: Union[List, Dict], db_name: str = 'default'):
        """Run a delete statement through ``self._execute`` and return its result.

        Declared ``async`` because the body awaits; ``await`` inside a plain
        ``def`` is a syntax error.

        Args:
            query: statement to execute.
            values: parameters passed along with the statement.
            db_name: key of the database configuration to use.
        """
        return await self._execute(query=query, values=values, db_name=db_name)
项目:djaio    作者:Sberned    | 项目源码 | 文件源码
async def _select(self, query: str, values: Union[List, Dict], db_name: str = 'default'):
        """Execute ``query`` and return everything ``cursor.fetchall()`` yields.

        The pool is taken from ``self.dbs[db_name]``: the ``'slave'`` entry is
        preferred and ``'master'`` is the fallback when no slave is set.

        Declared ``async`` because the body uses ``async with`` and ``await``,
        which are syntax errors inside a plain ``def``.

        Raises:
            KeyError: ``db_name`` is not a key of ``self.dbs``.
            RuntimeError: neither a slave nor a master pool is available.
        """
        dbs = self.dbs[db_name]
        pool = dbs.get('slave') or dbs.get('master')
        if pool is None:
            raise RuntimeError('db {} master is not initialized'.format(db_name))

        async with pool.acquire() as conn:
            async with conn.cursor(cursor_factory=DictCursor) as cursor:
                await cursor.execute(query, values)
                return await cursor.fetchall()
项目:djaio    作者:Sberned    | 项目源码 | 文件源码
async def _first(self, query: str, values: Union[List, Dict], db_name: str = 'default'):
        """Execute ``query`` and return what ``cursor.fetchone()`` yields.

        The pool is taken from ``self.dbs[db_name]``: the ``'slave'`` entry is
        preferred and ``'master'`` is the fallback when no slave is set.

        Declared ``async`` because the body uses ``async with`` and ``await``,
        which are syntax errors inside a plain ``def``.

        Raises:
            KeyError: ``db_name`` is not a key of ``self.dbs``.
            RuntimeError: neither a slave nor a master pool is available.
        """
        dbs = self.dbs[db_name]
        pool = dbs.get('slave') or dbs.get('master')
        if pool is None:
            raise RuntimeError('db {} master is not initialized'.format(db_name))

        async with pool.acquire() as conn:
            async with conn.cursor(cursor_factory=DictCursor) as cursor:
                await cursor.execute(query, values)
                return await cursor.fetchone()
项目:trf    作者:aistairc    | 项目源码 | 文件源码
def _calc_log_prob_scores(self) -> List[Union[None, float]]:
        """Get log likelihood scores by calling RNNLM.

        The entries of ``self.tss`` are written one per line (each entry's
        items joined without a separator) to a temporary file, which is
        handed to the external ``rnnlm`` command together with
        ``self.rnnlm_model_path``. The command's standard output is then
        parsed line by line.

        Returns:
            One item per usable output line: ``None`` for a line equal to
            ``const.OUT_OF_VOCABULARY``, otherwise the line parsed as a
            float. Blank lines and lines that are not numbers are skipped.
        """
        content = '\n'.join([''.join(ts) for ts in self.tss]) + '\n'

        # The context manager closes (and thereby deletes) the temporary file
        # even when launching or talking to rnnlm raises.
        with tempfile.NamedTemporaryFile(delete=True) as textfile:
            textfile.write(content.encode())
            # rnnlm opens the file by name, so the data must be on disk first.
            textfile.flush()

            command = ['rnnlm',
                       '-rnnlm',
                       self.rnnlm_model_path,
                       '-test',
                       textfile.name]
            process = Popen(command, stdout=PIPE, stderr=PIPE)
            # stderr is captured so it does not leak to the console; it is not used.
            output, _ = process.communicate()

        scores = []
        for raw_line in output.decode('UTF-8').split('\n'):
            line = raw_line.strip()
            if not line:
                continue
            if line == const.OUT_OF_VOCABULARY:
                scores.append(None)
                continue
            try:
                scores.append(float(line))
            except ValueError:
                # Deliberate best-effort: presumably rnnlm also prints
                # non-score lines, which are ignored (verify against the tool).
                continue
        return scores