我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用typing.Union()。
def waitFor(self, selectorOrFunctionOrTimeout: Union[str, int, float], options: dict = None, **kwargs: Any) -> Awaitable: """Wait until `selectorOrFunctionOrTimeout`.""" if options is None: options = dict() options.update(kwargs) if isinstance(selectorOrFunctionOrTimeout, (int, float)): fut: Awaitable[None] = asyncio.ensure_future( asyncio.sleep(selectorOrFunctionOrTimeout)) return fut if not isinstance(selectorOrFunctionOrTimeout, str): fut = asyncio.get_event_loop().create_future() fut.set_exception(TypeError( 'Unsupported target type: ' + str(type(selectorOrFunctionOrTimeout)) )) return fut if ('=>' in selectorOrFunctionOrTimeout or selectorOrFunctionOrTimeout.strip().startswith('function')): return self.waitForFunction(selectorOrFunctionOrTimeout, options) return self.waitForSelector(selectorOrFunctionOrTimeout, options)
def __init__(self, host: str = '127.0.0.1', port: int = 11300, encoding: Optional[str] = 'utf-8', use: str = DEFAULT_TUBE, watch: Union[str, Iterable[str]] = DEFAULT_TUBE) -> None: self._sock = socket.create_connection((host, port)) self._reader = self._sock.makefile('rb') # type: BinaryIO self.encoding = encoding if use != DEFAULT_TUBE: self.use(use) if isinstance(watch, str): if watch != DEFAULT_TUBE: self.watch(watch) self.ignore(DEFAULT_TUBE) else: for tube in watch: self.watch(tube) if DEFAULT_TUBE not in watch: self.ignore(DEFAULT_TUBE)
def retspec(defs): # return specs # only return 1, so if there is more than one type # we need to include a union # In truth there is only 1 return # Error or the expected Type if not defs: return None if defs in basic_types: return strcast(defs, False) rtypes = _registry.getObj(_types[defs]) if not rtypes: return None if len(rtypes) > 1: return Union[tuple([strcast(r[1], True) for r in rtypes])] return strcast(rtypes[0][1], False)
def type_anno_func(func, defs, is_result=False): annos = {} if not defs: return func rtypes = _registry.getObj(_types[defs]) if is_result: kn = "return" if not rtypes: annos[kn] = None elif len(rtypes) > 1: annos[kn] = Union[tuple([r[1] for r in rtypes])] else: annos[kn] = rtypes[0][1] else: for name, rtype in rtypes: name = name_to_py(name) annos[name] = rtype func.__annotations__.update(annos) return func
def calc_mean_lp_scores(log_prob_scores: List[float], lengths: List[int]) -> List[Union[None, float]]: r""" .. math: \frac{% \log P_\text{model}\left(\xi\right) }{% \text{length}\left(\xi\right) } >>> '{:.3f}'.format(calc_mean_lp_scores([-14.7579], [4])[0]) '-3.689' """ mean_lp_scores = [] for score, length in zip(log_prob_scores, lengths): x = None \ if score is None or length == 0 \ else float(score) / float(length) mean_lp_scores.append(x) return mean_lp_scores
def calc_norm_lp_div_scores( log_prob_scores: List[float], unigram_scores: List[float]) -> List[Union[None, float]]: r""" .. math: \frac{% \log P_\text{model}\left(\xi\right) }{% \log P_\text{unigram}\left(\xi\right) } >>> '{:.3f}'.format(calc_norm_lp_div_scores([-14.7579], [-35.6325])[0]) '-0.414' """ results = [] for log_prob, unigram_score in zip(log_prob_scores, unigram_scores): if log_prob is None or numpy.isclose(unigram_score, 0.0, rtol=1e-05): x = None else: x = (-1.0) * float(log_prob) / float(unigram_score) results.append(x) return results
def calc_norm_lp_sub_scores( log_prob_scores: List[float], unigram_scores: List[float]) -> List[Union[None, float]]: r""" .. math: \log P_\text{model}\left(\xi\right) - \log P_\text{unigram}\left(\xi\right) >>> '{:.3f}'.format(calc_norm_lp_sub_scores([-14.7579], [-35.6325])[0]) '20.875' """ results = [] for log_prob, unigram_score in zip(log_prob_scores, unigram_scores): if log_prob is None or numpy.isclose(unigram_score, 0.0, rtol=1e-05): x = None else: x = float(log_prob) - float(unigram_score) results.append(x) return results
def __init__(self, client: Session, ignoreHTTPSErrors: Any, options: dict = None, **kwargs: Any) -> None: """Make new navigator watcher.""" if options is None: options = {} options.update(kwargs) self._client = client self._ignoreHTTPSErrors = ignoreHTTPSErrors self._timeout = options.get('timeout', 3000) self._idleTime = options.get('networkIdleTimeout', 1000) self._idleTimer: Optional[Union[asyncio.Future, asyncio.Handle]] = None self._idleInflight = options.get('networkIdleInflight', 2) self._waitUntil = options.get('waitUntil', 'load') if self._waitUntil not in ('load', 'networkidle'): raise ValueError( f'Unknown value for options.waitUntil: {self._waitUntil}')
def convertPrintParameterToInches(parameter: Union[None, int, float, str] ) -> Optional[float]: """Convert print parameter to inches.""" if parameter is None: return None if isinstance(parameter, (int, float)): pixels = parameter elif isinstance(parameter, str): text = parameter unit = text[-2:].lower() if unit in unitToPixels: valueText = text[:-2] else: unit = 'px' valueText = text try: value = float(valueText) except ValueError: raise ValueError('Failed to parse parameter value: ' + text) pixels = value * unitToPixels[unit] else: raise TypeError('page.pdf() Cannot handle parameter type: ' + str(type(parameter))) return pixels / 96
def get_parents_of(self, name: Union[str, Hyperparameter]) -> List[Hyperparameter]: """Return the parent hyperparameters of a given hyperparameter. Parameters ---------- name : str or Hyperparameter Can either be the name of a hyperparameter or the hyperparameter object. Returns ------- list List with all parent hyperparameters. """ conditions = self.get_parent_conditions_of(name) parents = [] # type: List[Hyperparameter] for condition in conditions: parents.extend(condition.get_parents()) return parents
def content(self) -> Union[bytes, bytearray, None]: ''' Retrieves the content in the original form. Private codes should NOT use this as it incurs duplicate encoding/decoding. ''' if self._content is None: raise ValueError('content is not set.') if self.content_type == 'application/octet-stream': return self._content elif self.content_type == 'application/json': return json.loads(self._content.decode('utf-8'), object_pairs_hook=OrderedDict) elif self.content_type == 'text/plain': return self._content.decode('utf-8') elif self.content_type == 'multipart/form-data': return self._content else: raise RuntimeError('should not reach here') # pragma: no cover
def _extract_positional_label_by_id(self, files: Iterable[Path]) -> Dict[str, Union[PositionalLabel, str]]: xml_ending = ".xml" microphone_endings = [ "_Yamaha", "_Kinect-Beam", "_Kinect-RAW", "_Realtek", "_Samson", "_Microsoft-Kinect-Raw" ] xml_files = [file for file in files if file.name.endswith(xml_ending) if self.id_filter_regex.match(name_without_extension(file))] return OrderedDict( (name_without_extension(file) + microphone_ending, self._extract_label_from_xml(file)) for file in xml_files for microphone_ending in microphone_endings if (Path(file.parent) / (name_without_extension(file) + microphone_ending + ".wav")).exists())
def with_default(self, value: Union[Any, Callable[[MutableMapping[str, Any]], Any]], supplies_type: Type = None) -> "QueryValidator": if self._current is None or self._current.child is not None: raise QueryValidatorStructureError("No key is selected! Try using \"can_have\" before \"with_default\".") if self._current.required: raise QueryValidatorStructureError("Can't assign a default value to a required key! Try using \"can_have\" instead of \"have\".") if supplies_type: expected_type = supplies_type else: expected_type = type(value) default_node = _DefaultValueNode(self._current.key, value, supplies_type) result = self.as_(expected_type) result._current.child.child = default_node return result
def _ecies_gen_ephemeral_key( recp_pubkey: Union[bytes, elliptic_curve.ec_element] ) -> Tuple[bytes, Tuple[bytes, bytes]]: """ Generates and encrypts an ephemeral key for the `recp_pubkey`. :param recp_pubkey: Recipient's pubkey :return: Tuple of the eph_privkey, and a tuple of the encrypted symmetric key, and encrypted ephemeral privkey """ symm_key, enc_symm_key = API.ecies_encapsulate(recp_pubkey) eph_privkey = API.ecies_gen_priv() enc_eph_privkey = API.symm_encrypt(symm_key, eph_privkey) return (eph_privkey, (enc_symm_key, enc_eph_privkey))
def ecdsa_priv2pub( privkey: bytes, to_bytes: bool = True ) -> Union[bytes, Tuple[int]]: """ Returns the public component of an ECDSA private key. :param privkey: Private key as an int or bytestring :param to_bytes: Serialize to bytes or not? :return: Byte encoded or Tuple[int] ECDSA pubkey """ pubkey = privtopub(privkey) if to_bytes: return SIG_KEYPAIR_BYTE + PUB_KEY_BYTE + ecdsa_pub2bytes(pubkey) return pubkey
def ecdsa_verify( v: int, r: int, s: int, msghash: bytes, pubkey: Union[bytes, Tuple[int, int]] ) -> bool: """ Takes a v, r, s, a pubkey, and a hash of a message to verify via ECDSA. :param v: V of sig :param r: R of sig :param s: S of sig :param bytes msghash: The hashed message to verify :param bytes pubkey: Pubkey to validate signature for :rtype: Boolean :return: Is the signature valid or not? """ if bytes == type(pubkey): pubkey = ecdsa_bytes2pub(pubkey) verify_sig = ecdsa_raw_recover(msghash, (v, r, s)) # TODO: Should this equality test be done better? return verify_sig == pubkey
def ecies_priv2pub( privkey: Union[bytes, elliptic_curve.ec_element], to_bytes: bool = True ) -> Union[bytes, elliptic_curve.ec_element]: """ Takes a private key (secret bytes or an elliptic_curve.ec_element) and derives the Public key from it. :param privkey: The Private key to derive the public key from :param to_bytes: Return the byte serialization of the pubkey? :return: The Public component of the Private key provided """ if type(privkey) == bytes: privkey = priv_bytes2ec(privkey) pubkey = PRE.priv2pub(privkey) if to_bytes: return elliptic_curve.serialize(pubkey)[1:] return pubkey
def ecies_rekey( privkey_a: Union[bytes, elliptic_curve.ec_element], privkey_b: Union[bytes, elliptic_curve.ec_element], to_bytes: bool = True ) -> Union[bytes, umbral.RekeyFrag]: """ Generates a re-encryption key from privkey_a to privkey_b. :param privkey_a: Private key to re-encrypt from :param privkey_b: Private key to re-encrypt to :param to_bytes: Format result as bytes? :return: Re-encryption key """ if type(privkey_a) == bytes: privkey_a = priv_bytes2ec(privkey_a) if type(privkey_b) == bytes: privkey_b = priv_bytes2ec(privkey_b) rk = PRE.rekey(privkey_a, privkey_b) if to_bytes: return elliptic_curve.serialize(rk.key)[1:] return rk
def ecies_split_rekey( privkey_a: Union[bytes, elliptic_curve.ec_element], privkey_b: Union[bytes, elliptic_curve.ec_element], min_shares: int, total_shares: int ) -> List[umbral.RekeyFrag]: """ Performs a split-key re-encryption key generation where a minimum number of shares `min_shares` are required to reproduce a rekey. Will split a rekey into `total_shares`. :param privkey_a: Privkey to re-encrypt from :param privkey_b: Privkey to re-encrypt to :param min_shares: Minimum shares needed to reproduce rekey :param total_shares: Total shares to generate from split-rekey gen :return: A list of RekeyFrags to distribute """ if type(privkey_a) == bytes: privkey_a = priv_bytes2ec(privkey_a) if type(privkey_b) == bytes: privkey_b = priv_bytes2ec(privkey_b) umbral_rekeys = PRE.split_rekey(privkey_a, privkey_b, min_shares, total_shares) return [KFrag(umbral_kfrag=u) for u in umbral_rekeys]
def ecies_ephemeral_split_rekey( privkey_a: Union[bytes, elliptic_curve.ec_element], pubkey_b: Union[bytes, elliptic_curve.ec_element], min_shares: int, total_shares: int ) -> Tuple[List[umbral.RekeyFrag], Tuple[bytes, bytes]]: """ Performs a split-key re-encryption key generation where a minimum number of shares `min_shares` are required to reproduce a rekey. Will split a rekey inot `total_shares`. This also generates an ephemeral keypair for the recipient as `pubkey_b`. :param privkey_a: Privkey to re-encrypt from :param pubkey_b: Public key to re-encrypt for (w/ ephemeral key) :param min_shares: Minium shares needed to reproduce a rekey :param total_shares: Total shares to generate from split-rekey gen :return: A tuple containing a list of rekey frags, and a tuple of the encrypted ephemeral key data (enc_symm_key, enc_eph_privkey) """ eph_privkey, (encrypted_key, encrypted_message) = _internal._ecies_gen_ephemeral_key(pubkey_b) kfrags = ecies_split_rekey(privkey_a, eph_privkey, min_shares, total_shares) pfrag = PFrag(ephemeral_data_as_bytes=None, encrypted_key=encrypted_key, encrypted_message=encrypted_message) return (kfrags, pfrag)
def ecies_reencrypt( rekey: Union[bytes, umbral.RekeyFrag], enc_key: Union[bytes, umbral.EncryptedKey], ) -> umbral.EncryptedKey: """ Re-encrypts the key provided. :param rekey: Re-encryption key to use :param enc_key: Encrypted key to re-encrypt :return: The re-encrypted key """ if type(rekey) == bytes: rekey = umbral.RekeyFrag(None, priv_bytes2ec(rekey)) if type(enc_key) == bytes: enc_key = umbral.EncryptedKey(priv_bytes2ec(enc_key), None) reencrypted_data = PRE.reencrypt(rekey, enc_key) return CFrag(reencrypted_data=reencrypted_data)
def add_key(self, keypair: Union[keypairs.EncryptingKeypair, keypairs.SigningKeypair], store_pub: bool = True) -> bytes: """ Gets a fingerprint of the key and adds it to the keystore. :param key: Key, in bytes, to add to lmdb :return: Fingerprint, in bytes, of the added key """ if store_pub: fingerprint = self._get_fingerprint(keypair.pubkey) key = keypair.serialize_pubkey() else: fingerprint = self._get_fingerprint(keypair.privkey) key = keypair.serialize_privkey() # Create new Key object and commit to db self.session.add(Key(key)) self.session.commit() return fingerprint
def _generate_form_data(**kwargs: Union[Any, BotoFairu]) -> aiohttp.FormData: form_fata = aiohttp.FormData() for name, value in kwargs.items(): if isinstance(value, BotoFairu): form_fata.add_field( name, value._content, content_type=value._content_type, filename=value._filename, content_transfer_encoding=value._content_transfer_encoding) elif isinstance(value, (list, dict)): form_fata.add_field(name, json.dumps(value)) elif isinstance(value, (int, float, str, bool)): form_fata.add_field(name, str(value)) else: raise TypeError("Unknown Type: {}".format(type(value))) return form_fata
def largest_dimensions_for_aspect_ratio(dimensions_list: List[Dimensions], desired_aspect_ratio: float, default: Any = _sentinel) -> Union[Dimensions, Any]: """ Returns ------- The largest dimensions after cropping each dimensions to reach desired aspect ratio """ if not dimensions_list: if default is not _sentinel: return default raise ValueError(f"{dimensions_list} must not be empty.") largest_dimensions = crop_dimensions_to_aspect_ratio(dimensions_list[0], desired_aspect_ratio) for dimensions in dimensions_list[1:]: nearest_dimensions_to_aspect_ratio = crop_dimensions_to_aspect_ratio(dimensions, desired_aspect_ratio) if nearest_dimensions_to_aspect_ratio.resolution > largest_dimensions.resolution: largest_dimensions = nearest_dimensions_to_aspect_ratio return largest_dimensions
def __init__(self, sources=Opt[Union[List[Union[Source, 'VideoSourceList']], str]], **kwargs): """ Parameters ---------- sources A list of sources. Accepts arbitrarily nested video files, file globs, directories, VideoSources, and VideoSourceLists. """ self.name = None if isinstance(sources, str): self.name = paths.filename_from_path(sources) # Build list of sources from directory or file glob if os.path.isdir(sources): sources = self._sources_from_files(util.files_from_directory(sources)) else: sources = self._sources_from_files(globber.glob(sources)) else: # Convert any source files to VideoSources, and any lists, file globs or directories to VideoSourceLists sources = self._fill_in_sources(sources) super().__init__(sources, **kwargs)
def speed_multiply(self, speed: Union[float, Fraction], offset: Opt[int] = None): """ Speeds up or slows down events by grouping them together or splitting them up. For slowdowns, event type group boundaries and isolated events are preserved. Parameters ---------- speed Factor to speedup or slowdown by. Must be of the form x (speedup) or 1/x (slowdown), where x is a natural number. Otherwise, 0 to remove all events. offset Offsets the grouping of events for slowdowns. Takes a max offset of x - 1 for a slowdown of 1/x, where x is a natural number """ if speed == 0: self.clear() elif speed > 1: self._split(speed.numerator) elif speed < 1: self._merge(speed.denominator, offset)
def __init__(self, groups: Opt[Union[List[EventList], List[List[TIME_FORMAT]]]] = None, *, selected: List[EventList] = None): """ Parameters ---------- groups selected A subset of groups to track """ if groups is not None: for index, group in enumerate(groups): if not isinstance(group, EventList): # Convert event locations to EventList groups[index] = EventList(group) super().__init__(groups) self._selected_groups = selected if selected is not None else []
def send_email(loop: asyncio.AbstractEventLoop, mail_from: str, mail_to: Union[Iterable, str], subject: str, body: str, server: str='localhost') -> None: """Send an email to one or more recipients. Only supports plain text emails with a single message body. No attachments etc. """ if type(mail_to) == str: mail_to = [mail_to] smtp = aiosmtplib.SMTP(hostname=server, port=25, loop=loop) try: await smtp.connect() for rcpt in mail_to: msg = MIMEText(body) msg['Subject'] = subject msg['From'] = mail_from msg['To'] = rcpt await smtp.send_message(msg) await smtp.quit() except aiosmtplib.errors.SMTPException as e: log.msg('Error sending smtp notification: %s' % (str(e)), 'NOTIFICATIONS')
def require_bool(value: Optional[Union[bool, str, int]], convert: bool=False, allow_none: bool=False) -> Any: """Make sure a value is a boolean. Used when dealing with http input data. """ if value is None and allow_none: return value if type(value) != bool: if not convert: raise InvalidData() if value in [None, 0, '0', 'false', 'False']: value = False elif value in [1, '1', 'true', 'True']: value = True else: raise InvalidData('value was %s(%s), expected bool' % (type(value), value)) return cast(bool, value)
def __init__(self, id: int, args: Dict[str, str], monitor_def: ActiveMonitorDef, state: str, state_ts: float, msg: str, alert_id: Union[int, None], checks_enabled: bool, alerts_enabled: bool, manager: ActiveMonitorManager) -> None: self.id = id self.args = args self.monitor_def = monitor_def self.state = state self.manager = manager self.last_check_state = None # type: Optional[str] self.consecutive_checks = 0 self.last_check = time.time() self.msg = msg self.alert_id = alert_id self.state_ts = state_ts if not self.state_ts: self.state_ts = time.time() self.monitoring = False self.deleted = False self.checks_enabled = checks_enabled self.alerts_enabled = alerts_enabled self._pending_reset = False self.scheduled_job = None # type: Optional[asyncio.Handle] self.scheduled_job_ts = 0.0 event.running('CREATE_ACTIVE_MONITOR', monitor=self) stats.inc('num_monitors', 'ACT_MON')
def __init__(self, label: Union[str, int], label_namespace: str = 'labels', skip_indexing: bool = False) -> None: self.label = label self._label_namespace = label_namespace self._label_id = None self._maybe_warn_for_namespace(label_namespace) if skip_indexing: if not isinstance(label, int): raise ConfigurationError("In order to skip indexing, your labels must be integers. " "Found label = {}".format(label)) else: self._label_id = label else: if not isinstance(label, str): raise ConfigurationError("LabelFields must be passed a string label if skip_indexing=False. " "Found label: {} with type: {}.".format(label, type(label)))
def __init__(self, labels: Union[List[str], List[int]], sequence_field: SequenceField, label_namespace: str = 'labels') -> None: self.labels = labels self.sequence_field = sequence_field self._label_namespace = label_namespace self._indexed_labels = None self._maybe_warn_for_namespace(label_namespace) if len(labels) != sequence_field.sequence_length(): raise ConfigurationError("Label length and sequence length " "don't match: %d and %d" % (len(labels), sequence_field.sequence_length())) if all([isinstance(x, int) for x in labels]): self._indexed_labels = labels elif not all([isinstance(x, str) for x in labels]): raise ConfigurationError("SequenceLabelFields must be passed either all " "strings or all ints. Found labels {} with " "types: {}.".format(labels, [type(x) for x in labels]))
def from_dataset(cls, dataset, min_count: int = 1, max_vocab_size: Union[int, Dict[str, int]] = None, non_padded_namespaces: Sequence[str] = DEFAULT_NON_PADDED_NAMESPACES, pretrained_files: Optional[Dict[str, str]] = None, only_include_pretrained_words: bool = False) -> 'Vocabulary': """ Constructs a vocabulary given a :class:`.Dataset` and some parameters. We count all of the vocabulary items in the dataset, then pass those counts, and the other parameters, to :func:`__init__`. See that method for a description of what the other parameters do. """ logger.info("Fitting token dictionary from dataset.") namespace_token_counts: Dict[str, Dict[str, int]] = defaultdict(lambda: defaultdict(int)) for instance in tqdm.tqdm(dataset.instances): instance.count_vocab_items(namespace_token_counts) return Vocabulary(counter=namespace_token_counts, min_count=min_count, max_vocab_size=max_vocab_size, non_padded_namespaces=non_padded_namespaces, pretrained_files=pretrained_files, only_include_pretrained_words=only_include_pretrained_words)
def test_site(url: str, previous_results: dict, remote_host: str = None) -> Dict[str, Dict[str, Union[str, bytes]]]: # test first mx try: hostname = previous_results['mx_records'][0][1] except (KeyError, IndexError): return { 'jsonresult': { 'mime_type': 'application/json', 'data': b'', }, } jsonresult = run_testssl(hostname, True, remote_host) return { 'jsonresult': { 'mime_type': 'application/json', 'data': jsonresult, }, }
def _subscribe_to_topic(self, alias: str, topic: Union[bytes, str]): ''' Do the actual ZeroMQ subscription of a socket given by its alias to a specific topic. This method only makes sense to be called on SUB/SYNC_SUB sockets. Note that the handler is not set within this function. ''' topic = topic_to_bytes(topic) if isinstance(self.address[alias], AgentAddress): self.socket[alias].setsockopt(zmq.SUBSCRIBE, topic) elif isinstance(self.address[alias], AgentChannel): channel = self.address[alias] sub_address = channel.receiver treated_topic = channel.uuid + topic self.socket[sub_address].setsockopt(zmq.SUBSCRIBE, treated_topic) else: raise NotImplementedError('Unsupported address type %s!' % self.address[alias])
def topics_to_bytes(handlers: Dict[Union[bytes, str], Any], uuid: bytes = b''): ''' Given some pairs topic/handler, leaves them prepared for making the actual ZeroMQ subscription. Parameters ---------- handlers Contains pairs "topic - handler". uuid uuid of the SYNC_PUB/SYNC_SUB channel (if applies). For normal PUB/SUB communication, this should be `b''`. Returns ------- Dict[bytes, Any] ''' curated_handlers = {} for topic, value in handlers.items(): topic = topic_to_bytes(topic) curated_handlers[uuid + topic] = value return curated_handlers
def _parse_simple_yaml(buf: bytes) -> Stats: data = buf.decode('ascii') assert data[:4] == '---\n' data = data[4:] # strip YAML head stats = {} for line in data.splitlines(): key, value = line.split(': ', 1) try: v = int(value) # type: Union[int, str] except ValueError: v = value stats[key] = v return stats
def reverse_session_view(request: HttpRequest, pk: int) -> Union[HttpRequest, HttpResponseRedirect]: session = get_object_or_404(CashdeskSession, pk=pk) if request.method == 'POST': try: reverse_session(session) except FlowError as e: messages.error(request, str(e)) else: messages.success(request, _('All transactions in the session have been cancelled.')) return redirect('backoffice:session-detail', pk=pk) elif request.method == 'GET': return render(request, 'backoffice/reverse_session.html', { 'session': session, })
def rep1sep(parser: Union[Parser, Sequence[Input]], separator: Union[Parser, Sequence[Input]]) \ -> RepeatedOnceSeparatedParser: """Match a parser one or more times separated by another parser. This matches repeated sequences of ``parser`` separated by ``separator``. If there is at least one match, a list containing the values of the ``parser`` matches is returned. The values from ``separator`` are discarded. If it does not match ``parser`` at all, it fails. Args: parser: Parser or literal separator: Parser or literal """ if isinstance(parser, str): parser = lit(parser) if isinstance(separator, str): separator = lit(separator) return RepeatedOnceSeparatedParser(parser, separator)
def repsep(parser: Union[Parser, Sequence[Input]], separator: Union[Parser, Sequence[Input]]) \ -> RepeatedSeparatedParser: """Match a parser zero or more times separated by another parser. This matches repeated sequences of ``parser`` separated by ``separator``. A list is returned containing the value from each match of ``parser``. The values from ``separator`` are discarded. If there are no matches, an empty list is returned. Args: parser: Parser or literal separator: Parser or literal """ if isinstance(parser, str): parser = lit(parser) if isinstance(separator, str): separator = lit(separator) return RepeatedSeparatedParser(parser, separator)
def select_channel( self, versions: typing.Set[CustomVersion], update_channel: str = channel.STABLE ) -> typing.Union[CustomVersion, None]: """ Selects the latest version, equals or higher than "channel" Args: versions: versions to select from update_channel: member of :class:`Channel` Returns: latest version or None """ LOGGER.debug(f'selecting latest version amongst {len(versions)}; active channel: {str(channel)}') options = list(self.filter_channel(versions, update_channel)) if options: latest = max(options) return latest LOGGER.debug('no version passed the test') return None
def load_yaml(fname: str) -> Union[List, Dict]: """Load a YAML file.""" try: with open(fname, encoding='utf-8') as conf_file: # If configuration file is empty YAML returns None # We convert that to an empty dict return yaml.load(conf_file, Loader=SafeLineLoader) or {} except yaml.YAMLError as exc: logger.error(exc) raise ScarlettError(exc) except UnicodeDecodeError as exc: logger.error('Unable to read file %s: %s', fname, exc) raise ScarlettError(exc) # def clear_secret_cache() -> None: # """Clear the secret cache. # # Async friendly. # """ # __SECRET_CACHE.clear()
def select(self, query: str, values: Union[List, Dict], db_name: str = 'default') -> List[DictRow]: return await self._select(query=query, values=values, db_name=db_name)
def first(self, query: str, values: Union[List, Dict], db_name: str = 'default') -> Optional[DictRow]: return await self._first(query=query, values=values, db_name=db_name)
def insert(self, query: str, values: Union[List, Dict], db_name: str = 'default', returning: bool = False): return await self._execute(query=query, values=values, db_name=db_name, returning=returning)
def update(self, query: str, values: Union[List, Dict], db_name: str = 'default', returning: bool = False): return await self._execute(query=query, values=values, db_name=db_name, returning=returning)
def delete(self, query: str, values: Union[List, Dict], db_name: str = 'default'): return await self._execute(query=query, values=values, db_name=db_name)
def _select(self, query: str, values: Union[List, Dict], db_name: str = 'default'): dbs = self.dbs[db_name] pool = dbs.get('slave') or dbs.get('master') if pool is None: raise RuntimeError('db {} master is not initialized'.format(db_name)) async with pool.acquire() as conn: async with conn.cursor(cursor_factory=DictCursor) as cursor: await cursor.execute(query, values) return await cursor.fetchall()
def _first(self, query: str, values: Union[List, Dict], db_name: str = 'default'): dbs = self.dbs[db_name] pool = dbs.get('slave') or dbs.get('master') if pool is None: raise RuntimeError('db {} master is not initialized'.format(db_name)) async with pool.acquire() as conn: async with conn.cursor(cursor_factory=DictCursor) as cursor: await cursor.execute(query, values) return await cursor.fetchone()
def _calc_log_prob_scores(self) -> List[Union[None, float]]: """Get log likelihood scores by calling RNNLM """ textfile = tempfile.NamedTemporaryFile(delete=True) content = '\n'.join([''.join(ts) for ts in self.tss]) + '\n' textfile.write(str.encode(content)) textfile.seek(0) command = ['rnnlm', '-rnnlm', self.rnnlm_model_path, '-test', textfile.name] process = Popen(command, stdout=PIPE, stderr=PIPE) output, err = process.communicate() lines = [line.strip() for line in output.decode('UTF-8').split('\n') if line.strip() != ''] scores = [] for line in lines: if line == const.OUT_OF_VOCABULARY: scores.append(None) else: try: score = float(line) scores.append(score) except ValueError: pass textfile.close() return scores