我们从Python开源项目中提取了以下50个代码示例，用于说明如何使用typing.Optional[...]。
def _send_receive(self, nummsgs: int, outformat: str='json',
                  dataupdate: Optional[Dict[AnyStr, Any]]=None,
                  restart_data: bool=True) -> List[Response]:
    """Buffer ``nummsgs`` messages, run them through a processor, return the results.

    :param nummsgs: number of messages handed to ``_add_to_buffer``.
    :param outformat: format name forwarded to every helper (default ``'json'``).
    :param dataupdate: optional mapping merged into ``self.data`` before the
        messages are buffered; skipped when it is empty or ``None``.
    :param restart_data: when true, ``_restart_data`` is called first.
    :return: whatever ``_loadResults`` returns for ``outformat``.
    """
    if restart_data:
        self._restart_data(outformat)
    if dataupdate:
        self.data.update(dataupdate)

    self._add_to_buffer(nummsgs, outformat)
    # Rewind so the processor reads the buffered requests from the start.
    self.sendbuffer.seek(0)

    processor, _unused = get_processor_instance(
        outformat,
        custom_outbuffer=self.recvbuffer,
        custom_inbuffer=self.sendbuffer,
    )
    processor.process_requests(self.sendbuffer)
    return self._loadResults(outformat)
def format(self, content: Optional[QqTag], blanks_to_pars=True, keep_end_pars=True) -> str:
    """
    Render every child of ``content`` and concatenate the results.

    :param content: could be QqTag or any iterable of QqTags (and strings);
        ``None`` yields the empty string
    :param blanks_to_pars: when true, escaped text is additionally passed
        through ``self.blanks_to_pars``
    :param keep_end_pars: keep end paragraphs
    :return: str: text of tag
    """
    if content is None:
        return ""

    def render(child):
        # Non-string children are tags and go through the tag handler.
        if not isinstance(child, str):
            return self.handle(child)
        if blanks_to_pars:
            # NOTE(review): keep_end_pars is handed to html_escape here, not to
            # self.blanks_to_pars -- confirm that this is intended.
            return self.blanks_to_pars(html_escape(child, keep_end_pars))
        return html_escape(child)

    return "".join([render(child) for child in content])
def get_counter_for_tag(self, tag: QqTag) -> Optional[Counter]:
    """
    Find the counter registered for ``tag`` in ``self.counters``.

    The lookup starts with the tag's own name. When the entry found is a
    nested dict, the search continues in that dict using the name of the
    tag's parent.

    :param tag: tag to look a counter up for
    :return: the Counter, or None when the current tag has a ``nonumber``
        child or no counter entry is found
    """
    level = self.counters
    node = tag
    while not node.exists('nonumber'):
        entry = level.get(node.name)
        if isinstance(entry, Counter):
            return entry
        if not isinstance(entry, dict):
            # Missing entry (None) or an entry of an unexpected kind.
            return None
        # Nested table: descend one level and move up to the parent tag.
        level = entry
        node = node.parent
    return None
def __init__(self, client: Session, mouse: Mouse, touchscreen: Touchscreen
             ) -> None:
    """Make new frame manager.

    Stores the collaborators, starts with no frames and no main frame, and
    subscribes to the frame / execution-context events emitted by ``client``.
    """
    super().__init__()
    self._client = client
    self._mouse = mouse
    self._touchscreen = touchscreen
    self._frames: Dict[str, Frame] = {}
    self._mainFrame: Optional[Frame] = None

    # (event name, handler) pairs; they are registered in this order.
    subscriptions = (
        ('Page.frameAttached',
         lambda event: self._onFrameAttached(
             event.get('frameId', ''), event.get('parentFrameId', ''))),
        ('Page.frameNavigated',
         lambda event: self._onFrameNavigated(event.get('frame'))),
        ('Page.frameDetached',
         lambda event: self._onFrameDetached(event.get('frameId'))),
        ('Runtime.executionContextCreated',
         lambda event: self._onExecutionContextCreated(
             event.get('context'))),
    )
    for eventName, handler in subscriptions:
        client.on(eventName, handler)
def __init__(self, client: Session, ignoreHTTPSErrors: Any,
             options: Optional[dict] = None, **kwargs: Any) -> None:
    """Make new navigator watcher.

    :param client: session object, stored as given.
    :param ignoreHTTPSErrors: stored as given.
    :param options: optional settings dict. Recognised keys are ``timeout``
        (default 3000), ``networkIdleTimeout`` (default 1000),
        ``networkIdleInflight`` (default 2) and ``waitUntil``
        (default ``'load'``). The dict is only read, never modified.
    :param kwargs: extra settings; they override same-named ``options`` keys.
    :raises ValueError: if ``waitUntil`` is neither ``'load'`` nor
        ``'networkidle'``.
    """
    # Merge into a fresh dict: updating ``options`` in place would leak the
    # keyword arguments into the dict owned by the caller.
    settings = dict(options) if options is not None else {}
    settings.update(kwargs)

    self._client = client
    self._ignoreHTTPSErrors = ignoreHTTPSErrors
    self._timeout = settings.get('timeout', 3000)
    self._idleTime = settings.get('networkIdleTimeout', 1000)
    self._idleTimer: Optional[Union[asyncio.Future, asyncio.Handle]] = None
    self._idleInflight = settings.get('networkIdleInflight', 2)
    self._waitUntil = settings.get('waitUntil', 'load')
    if self._waitUntil not in ('load', 'networkidle'):
        raise ValueError(
            f'Unknown value for options.waitUntil: {self._waitUntil}')
async def waitForNavigation(self, options: Optional[dict] = None,
                            **kwargs: Any) -> Optional[Response]:
    """Wait navigation completes.

    While waiting, every response reported by the network manager is
    recorded by URL.

    :param options: optional settings forwarded to ``NavigatorWatcher``;
        the dict is only read, never modified.
    :param kwargs: extra settings; they override same-named ``options`` keys.
    :return: the recorded response whose URL equals ``self.url``, or ``None``
        when no such response was seen.
    """
    # The body awaits, so this has to be a coroutine function (``await``
    # inside a plain ``def`` is a SyntaxError).
    # Merge into a fresh dict so the caller's ``options`` stays untouched.
    settings = dict(options) if options is not None else {}
    settings.update(kwargs)

    watcher = NavigatorWatcher(self._client, self._ignoreHTTPSErrors,
                               settings)
    responses: Dict[str, Response] = dict()
    listener = helper.addEventListener(
        self._networkManager,
        NetworkManager.Events.Response,
        lambda response: responses.__setitem__(response.url, response),
    )
    try:
        await watcher.waitForNavigation()
    finally:
        # Detach the listener even when waiting fails, so it cannot pile up
        # on the network manager.
        helper.removeEventListeners([listener])
    return responses.get(self.url)
def __init__(self, exe, cache=None):
    # type: (str, Optional[Cache]) -> None
    """Set up the formatter wrapper for the executable ``exe``.

    A non-absolute ``exe`` is first resolved with ``which``; ``cache`` is
    stored unchanged.
    """
    resolved = exe if os.path.isabs(exe) else which(exe)  # type: ignore
    self.exe = unifilename(resolved)
    self.cache = cache
    self._styledefinition = styledef_make()
    self.allow_encoding_change = False
    self.languages = []  # type: List[str]
    self.initial_style = style_make()
    # These are deleted after one call to minimize_errors
    self.globaltempfiles = set()  # type: Set[str]
    # These are deleted after each round of attempts
    self.tempfiles = set()  # type: Set[str]
    self.keeptempfiles = False
    self.version_string = formatter_version(resolved)
def cmdargs_for_style(self, formatstyle, filename=None):
    # type: (Style, Optional[str]) -> List[str]
    """Write ``formatstyle`` to a config file and return the options pointing at it.

    The config file lives in a directory below the system temp directory
    whose name contains the hash of the config text. Directory and file are
    registered via ``add_tempfile``. ``filename`` is not used here.

    Returns ``['--config-path', <path of the config file>]``.
    """
    assert isinstance(formatstyle, Style)
    configdata = bytestr(self.styletext(formatstyle))
    relative_cfg = 'whatstyle_rustfmt_%s/%s' % (shahex(configdata),
                                                self.configfilename)
    cfg = os.path.join(tempfile.gettempdir(), relative_cfg)
    dirpath = os.path.dirname(cfg)
    try:
        os.makedirs(dirpath)
        self.add_tempfile(dirpath)
    except OSError as exc:
        # An already existing directory is fine; anything else is an error.
        if exc.errno != errno.EEXIST:
            raise
    if not self.tempfile_exists(cfg):
        writebinary(cfg, configdata)
        self.add_tempfile(cfg)
    return ['--config-path', cfg]
def __init__(self, database, tabledesc=None, timestamp=False):
    # type: (str, Optional[Dict[str, str]], bool) -> None
    """Prepare the SQL statements for a key/value table; no connection is opened here.

    ``tabledesc`` defaults to ``self.prefixdesc()`` and supplies the format
    fields of the SQL templates. When ``timestamp`` is true the timestamp
    fragment ``self._sqtimestamp`` is spliced into the CREATE statement.
    """
    self.database = database
    self.tabledesc = self.prefixdesc() if tabledesc is None else tabledesc
    desc = self.tabledesc

    if timestamp:
        timestampsql = self._sqtimestamp
    else:
        timestampsql = ''
    self.kv_create = (self._sqcreate % timestampsql).format(**desc)
    self.kv_get = self._sqget.format(**desc)
    self.kv_mget = self._sqmget.format(**desc)
    self.kv_put = self._sqput.format(**desc)
    self.kv_delete = self._sqdelete.format(**desc)

    self._connection = None  # type: Optional[sqlite3.Connection]
    self.sqlite_limit_variable_number = 999
    self.support_mget = True
def mget(self, keys):
    # type: (List[str]) -> List[Optional[bytes]]
    """Return the stored values for ``keys`` in order, ``None`` for missing keys.

    The keys are fetched in groups of at most
    ``self.sqlite_limit_variable_number`` per statement. If that raises
    ``sqlite3.OperationalError``, multi-get is switched off for this object
    and the keys are fetched one by one instead.
    """
    if self.support_mget:
        fetched = []
        try:
            with self.conn as conn:
                for somekeys in grouper(self.sqlite_limit_variable_number, keys):
                    keylist = list(somekeys)
                    # One '?' placeholder per key of this group.
                    placeholders = ','.join(['?'] * len(keylist))
                    fetched.extend(conn.execute(self.kv_mget % placeholders, keylist))
            found = dict(fetched)  # type: Dict[str, bytes]
            return [found.get(k) for k in keys]
        except sqlite3.OperationalError:
            self.support_mget = False
    return [self.__get(k) for k in keys]
def mget(self, keys):
    # type: (List[str]) -> List[Optional[bytes]]
    """Return the values for ``keys`` in order, resolving each key in two steps.

    The parent store maps each key to a content key (or ``None``); the content
    keys that exist are then looked up in ``self.kvstore``. Positions whose
    first lookup returned ``None`` stay ``None`` in the result.
    """
    if not keys:
        return []

    contentkeys = super(DedupKeyValueStore, self).mget(keys)
    cached = []
    uncached = []  # type: List[Tuple[int, Optional[bytes]]]
    for idx, contentkey in enumerate(contentkeys):
        if contentkey is not None:
            sha = binary_type(contentkey)
            cached.append((idx, unistr(sha)))
        else:
            uncached.append((idx, None))

    if not cached:
        return [None] * len(uncached)

    indices, existing_keys = zip(*cached)
    existing_values = self.kvstore.mget(existing_keys)
    # Merge both groups and restore the original key order by index.
    idx_value_pairs = uncached + list(zip(indices, existing_values))
    idx_value_pairs.sort()
    return [value for _, value in idx_value_pairs]
def mixtohash(self,
              args=(),  # type: Sequence[AnyStr]
              exe=None,  # type: Optional[str]
              depfiles=(),  # type: Sequence[str]
              hashobj=None  # type: Optional[Any]
              ):
    # type: (...) -> Any
    """Feed the dependency files, the arguments and the executable into a hash object.

    For every file in ``depfiles`` its name and ``filesha`` digest are added,
    then every entry of ``args``; each item is followed by a NUL byte. If
    ``exe`` is given, ``self.digest_for_exe(exe)`` is added last.

    ``hashobj`` is updated in place when supplied; otherwise a new
    ``HASHFUNC()`` object is created. The hash object is returned.
    """
    digest = HASHFUNC() if hashobj is None else hashobj
    feed = digest.update
    separator = b'\x00'

    for filename in depfiles:
        feed(sysfilename(filename))
        feed(filesha(filename))
        feed(separator)
    for arg in args:
        feed(sysfilename(arg))
        feed(separator)
    if exe is not None:
        feed(self.digest_for_exe(exe))
    return digest
def apply(func,  # type: Callable[..., bytes]
          args=(),  # type: Sequence[AnyStr]
          exe=None,  # type: Optional[str]
          depfiles=(),  # type: Sequence[str]
          cache=None  # type: Optional[Cache]
          ):
    """Return func(*args), consulting ``cache`` first when one is given.

    The cache key is the hex digest of ``cache.mixtohash(args, exe=exe,
    depfiles=depfiles)``. A cached value of None counts as a miss, so func
    must return bytes and never None. On a miss func is evaluated and its
    result is stored in the cache under the key.
    """
    if cache is None:
        # No cache: nothing to look up and nothing to store.
        return func(*args)

    key = cache.mixtohash(args, exe=exe, depfiles=depfiles).hexdigest()
    value = cache.get(key)
    if value is not None:
        return value

    value = func(*args)
    if key is not None:
        cache.set(key, value)
    return value
def unified_diff(filename, content2=None):
    # type: (str, Optional[bytes]) -> Tuple[int, Iterable[str]]
    """Compute a unified diff between the contents of filename and content2.

    When content2 is None the second text is read from the standard input
    and the diff is also written out, e.g. from the command line:

        echo 123 > d.txt ; echo 456 | ./whatstyle.py --stdindiff d.txt

    We get this result:

        ---
        +++
        @@ -1 +1 @@
        -123
        +456

    Returns the pair (exit_code, diff) produced by compute_unified_diff.
    """
    use_stdin = content2 is None
    if use_stdin:
        # Read binary input stream
        econtent2 = bytestr(rawstream(sys.stdin).read())
    else:
        econtent2 = content2

    exit_code, diff = compute_unified_diff(filename, econtent2, lineterm='')
    if use_stdin:
        # NOTE(review): joining consumes diff if it is a one-shot iterator;
        # confirm that compute_unified_diff returns a reusable sequence.
        write('\n'.join(diff))
    return exit_code, diff
def __init__(self, username: str, password: str, botModule: str,
             botconfig: Mapping, numPlayers: int, variant: Variant,
             spectators: bool, gameName: str, *args, **kwargs) -> None:
    """Store the login and table settings and load the bot class.

    The bot class is the ``Bot`` attribute of the module ``<botModule>.bot``.
    Extra positional and keyword arguments go to the parent constructor.
    """
    super().__init__(*args, **kwargs)

    # Login credentials.
    self.username: str = username
    self.password: str = password

    # Bot implementation, imported by module name.
    module = importlib.import_module(botModule + '.bot')
    self.botCls: Type[Bot] = module.Bot  # type: ignore
    self.botconfig: Mapping = botconfig

    # Settings of the table.
    self.numPlayers: int = numPlayers
    self.variant: Variant = variant
    self.spectators: bool = spectators
    self.gameName: str = gameName

    # Declared only; no connection object is assigned here.
    self.conn: socketIO_client.SocketIO

    # Session state.
    self.tablePlayers: List[str] = []
    self.readyToStart: bool = False
    self.game: Optional[Game] = None
def isNowPlayable(self, color: Optional[Color], value: Optional[Value]) -> bool:
    '''Returns True if the color and/or value is playable.

    At least one of color and value must be given. With both, the answer is
    delegated to isPlayable. With only a color, the next value of that pile
    is checked. With only a value, every color is checked for needing exactly
    that value next. In the last two cases a card whose locatedCount entry is
    truthy does not count.
    '''
    assert color is not None or value is not None

    if color is not None:
        if value is not None:
            return self.isPlayable(color, value)
        nextValue = len(self.game.playedCards[color]) + 1
        # The range check comes first so that locatedCount is never indexed
        # with a value above 5.
        return nextValue <= 5 and not self.locatedCount[color][nextValue]

    if value is not None:
        return any(len(self.game.playedCards[c]) + 1 == value
                   and not self.locatedCount[c][value]
                   for c in self.colors)

    assert False
def maybeGiveEarlyGameHint(self, highValue: bool) -> bool:
    '''Give the fittest early-game hint, if any candidate has positive fitness.

    Each other player is visited in turn order after this bot. A candidate
    with non-negative fitness whose target has no Playable hand state gets a
    bonus of (numPlayers - offset) * 2.

    Returns True when a hint was given, False otherwise.
    '''
    bestHint: Hint = Hint()
    offset: int
    for offset in range(1, self.game.numPlayers):
        player: int = (self.position + offset) % self.game.numPlayers
        candidate: Optional[Hint]
        candidate = self.bestEarlyHintForPlayer(player, highValue)
        if candidate is None:
            continue
        if (candidate.fitness >= 0
                and HandState.Playable not in self.handState(player)):
            candidate.fitness += (self.game.numPlayers - offset) * 2
        if candidate.fitness > bestHint.fitness:
            bestHint = candidate

    if bestHint.fitness <= 0:
        return False
    bestHint.give(self)
    return True
def deck_draw(self, player: int, deckidx: int,
              color: Optional[Color], value: Optional[Value]) -> None:
    '''Record that player drew the card at deck index deckidx.

    For the bot's own draw the card is created without color and value.
    For any other player, color and value are first converted with Suit
    and Rank.
    '''
    # NOTE(review): the nesting of the last two calls was reconstructed from
    # whitespace-stripped text; confirm which of them belong to the
    # other-player branch only.
    ownDraw = player == self.botPosition
    if ownDraw:
        card = self.bot.create_own_card(deckidx)
        self.deck[deckidx] = card
        self.bot.drew_card(deckidx)
    else:
        color = Suit(color).color(self.variant)
        value = Rank(value).value_()
        card = self.bot.create_player_card(player, deckidx, color, value)
        self.deck[deckidx] = card
        self.players[player].drew_card(deckidx)
    self.bot.someone_drew(player, deckidx)
def register_command(handler: Callable[[argparse.Namespace], None],
                     main_parser: Optional[ArgParserType]=None,
                     ) -> Callable[[argparse.Namespace], None]:
    """Register ``handler`` as a sub-command of ``main_parser``.

    The command name is the handler's ``__name__`` with underscores replaced
    by dashes. The handler's docstring becomes the sub-parser description and
    its first paragraph the help text. A handler without a docstring is
    registered without description and help.

    :param handler: function called with the parsed arguments.
    :param main_parser: parser to attach to; defaults to ``global_argparser``.
    :return: a wrapper around ``handler`` carrying two extra attributes:
        ``register_command`` (registers nested commands below this one) and
        ``add_argument`` (the sub-parser's ``add_argument``).
    """
    if main_parser is None:
        main_parser = global_argparser

    # One sub-parsers action per parser, created on first use.
    parser_key = id(main_parser)
    if parser_key not in _subparsers:
        _subparsers[parser_key] = main_parser.add_subparsers(
            title='commands', dest='command')
    subparsers = _subparsers[parser_key]

    @functools.wraps(handler)
    def wrapped(args):
        handler(args)

    doc = handler.__doc__
    # __doc__ is None for a handler without a docstring; calling .split on
    # it would raise AttributeError.
    doc_summary = doc.split('\n\n')[0] if doc is not None else None
    inner_parser = subparsers.add_parser(handler.__name__.replace('_', '-'),
                                         description=doc,
                                         help=doc_summary)
    inner_parser.set_defaults(function=wrapped)
    wrapped.register_command = functools.partial(register_command,
                                                 main_parser=inner_parser)
    wrapped.add_argument = inner_parser.add_argument
    return wrapped
def _list(cls, user_id: int, is_active: Optional[bool]=None,
          fields: Optional[Iterable[str]]=None):
    """Generator that requests the keypairs of ``user_id`` via the admin GraphQL endpoint.

    It yields a single ``Request`` and expects the response to be sent back
    in; the generator's return value is the ``'keypairs'`` entry of the
    response JSON.

    ``fields`` names the keypair fields to query and defaults to
    access_key, secret_key, is_active and is_admin.
    """
    if fields is None:
        fields = ('access_key', 'secret_key', 'is_active', 'is_admin')

    template = (
        'query($user_id: Int!, $is_active: Boolean) {'
        ' keypairs(user_id: $user_id, is_active: $is_active) {'
        ' $fields'
        ' }'
        '}'
    )
    query = template.replace('$fields', ' '.join(fields))
    resp = yield Request('POST', '/admin/graphql', {
        'query': query,
        'variables': {
            'user_id': user_id,
            'is_active': is_active,
        },
    })
    return resp.json()['keypairs']
def _get_or_create(cls, lang: str, client_token: Optional[str]=None,
                   mounts: Optional[Iterable[str]]=None,
                   envs: Optional[Mapping[str, str]]=None,
                   max_mem: int=0, exec_timeout: int=0) -> str:
    """Generator that requests a kernel for ``lang`` and wraps the answer in ``cls``.

    It yields a single ``Request`` and expects the response to be sent back
    in. A missing ``client_token`` is replaced by a random hex token; a given
    one must be longer than 8 characters.

    The generator's return value is ``cls(kernelId)`` with its ``created``
    attribute taken from the response.
    """
    if not client_token:
        client_token = uuid.uuid4().hex
    else:
        assert len(client_token) > 8

    # NOTE: max_mem and exec_timeout are accepted but not sent; the 'limits'
    # section of the payload ('maxMem', 'execTimeout') is disabled.
    payload = {
        'lang': lang,
        'clientSessionToken': client_token,
        'config': {
            'mounts': mounts,
            'envs': envs,
        },
    }
    resp = yield Request('POST', '/kernel/create', payload)
    data = resp.json()
    kernel = cls(data['kernelId'])  # type: ignore
    kernel.created = data.get('created', True)  # True is for legacy
    return kernel
def __init__(self, endpoint: Optional[str]=None,
             version: str='v2.20170315',
             user_agent: Optional[str]=None,
             access_key: Optional[str]=None,
             secret_key: Optional[str]=None,
             hash_type: Optional[str]=None) -> None:
    """Resolve each setting from its argument, falling back when it is empty.

    Fallbacks: ``endpoint`` comes from ``get_env('ENDPOINT', ...)`` with the
    class default; ``version`` and ``user_agent`` from ``self.DEFAULTS``;
    the keys from ``get_env('ACCESS_KEY')`` / ``get_env('SECRET_KEY')``;
    ``hash_type`` becomes ``'sha256'``. A given ``hash_type`` is lower-cased.
    """
    self._endpoint = endpoint or get_env('ENDPOINT', self.DEFAULTS['endpoint'])
    self._version = version or self.DEFAULTS['version']
    self._user_agent = user_agent or self.DEFAULTS['user_agent']
    self._access_key = access_key or get_env('ACCESS_KEY')
    self._secret_key = secret_key or get_env('SECRET_KEY')
    if hash_type:
        self._hash_type = hash_type.lower()
    else:
        self._hash_type = 'sha256'
def indices_to_load_by_target_index(allowed_characters_for_loaded_model: List[str],
                                    allowed_characters: List[str]) -> List[Optional[int]]:
    """Map every target character to its index in the loaded model's character list.

    Characters are one-character strings, hence ``List[str]``; ``chr`` is a
    builtin function, not a type.

    :param allowed_characters_for_loaded_model: characters of the loaded model.
    :param allowed_characters: characters of the target model.
    :return: one entry per target character, in order: whatever
        ``single_or_none`` returns for the list of matching indices in the
        loaded model.
    """
    load_character_set = set(allowed_characters_for_loaded_model)
    target_character_set = set(allowed_characters)

    ignored = load_character_set - target_character_set
    if ignored:
        log("Ignoring characters {} from loaded model.".format(sorted(ignored)))

    extra = target_character_set - load_character_set
    if extra:
        log("Initializing extra characters {} not found in model.".format(sorted(extra)))

    def character_index_to_load(target_character: str) -> Optional[int]:
        matches = [index
                   for index, character in enumerate(allowed_characters_for_loaded_model)
                   if character == target_character]
        return single_or_none(matches)

    character_mapping = [character_index_to_load(character)
                         for character in allowed_characters]
    log("Character mapping: {}".format(character_mapping))
    return character_mapping
def __init__(self,
             get_raw_audio: Callable[[], ndarray],
             sample_rate: int = 16000,
             id: Optional[str] = None,
             label: Optional[str] = "nolabel",
             fourier_window_length: int = 512,
             hop_length: int = 128,
             mel_frequency_count: int = 128,
             label_with_tags: str = None,
             positional_label: Optional[PositionalLabel] = None):
    """Store the audio source and the spectrogram settings of this example.

    ``id`` and ``label`` are passed on to the parent constructor; all other
    arguments are kept as attributes of the same name.
    """
    super().__init__(id=id, label=label)

    # Audio source.
    self.get_raw_audio = get_raw_audio
    self.sample_rate = sample_rate

    # The default values for hop_length and fourier_window_length are powers
    # of 2 near the values specified in the wave2letter paper.
    self.fourier_window_length = fourier_window_length
    self.hop_length = hop_length
    self.mel_frequency_count = mel_frequency_count

    # Optional label variants.
    self.label_with_tags = label_with_tags
    self.positional_label = positional_label
def load(corpus_csv_file: Path, sampled_training_example_count: Optional[int] = None) -> 'Corpus':
    """Build a Corpus from a CSV file of labeled examples.

    Each row holds: id, audio file path, label, phase name, serialized
    positional label (an empty string means no positional label). Relative
    audio paths are resolved against the directory of the CSV file.

    Rows are sorted into training and test examples by their phase.
    """
    import csv

    def to_absolute(audio_file_path: Path) -> Path:
        if audio_file_path.is_absolute():
            return audio_file_path
        return Path(corpus_csv_file.parent) / audio_file_path

    with corpus_csv_file.open(encoding='utf8') as opened_csv:
        rows = csv.reader(opened_csv, delimiter=',', quotechar='"',
                          quoting=csv.QUOTE_MINIMAL)

        training_examples = []
        test_examples = []
        for example_id, audio_file_path, label, phase_name, serialized_positions in rows:
            audio_file = to_absolute(Path(audio_file_path))
            if serialized_positions == "":
                positional_label = None
            else:
                positional_label = PositionalLabel.deserialize(serialized_positions)
            example = LabeledExampleFromFile(
                audio_file=audio_file,
                id=example_id,
                label=label,
                positional_label=positional_label)

            phase = Phase[phase_name]
            if phase == Phase.training:
                training_examples.append(example)
            if phase == Phase.test:
                test_examples.append(example)

        return Corpus(training_examples=training_examples,
                      test_examples=test_examples,
                      sampled_training_example_count=sampled_training_example_count)
def get_max_q_values(
    self,
    next_states: np.ndarray,
    possible_next_actions: Optional[np.ndarray] = None,
    use_target_network: Optional[bool] = True
) -> np.ndarray:
    """
    Returns the row-wise maximum of the Q values of ``next_states`` as a
    column (``keepdims=True``).

    :param next_states: states handed to ``get_q_values_all_actions``.
    :param possible_next_actions: optional mask with the same shape as the
        Q values; where an entry is falsy, ``self.ACTION_NOT_POSSIBLE_VAL``
        is added to the Q value before the maximum is taken.
    :param use_target_network: forwarded to ``get_q_values_all_actions``.
    """
    q_values = self.get_q_values_all_actions(next_states, use_target_network)
    if possible_next_actions is None:
        return np.max(q_values, axis=1, keepdims=True)

    impossible = np.logical_not(possible_next_actions)
    # In-place add: the array returned by get_q_values_all_actions is modified.
    q_values += np.multiply(impossible, self.ACTION_NOT_POSSIBLE_VAL)
    return np.max(q_values, axis=1, keepdims=True)
def get_q_values_all_actions(
    self, states: np.ndarray, use_target_network: Optional[bool] = True
) -> np.ndarray:
    """
    Takes in a set of states and returns the Q values of all actions for
    them: Q(states, actions), with shape (batch_size, action_dim), where
    Q(states, actions)[i][j] is an approximation of Q*(states[i], action_j).

    :param states: Numpy array with shape (batch_size, state_dim). Each row
        contains a representation of a state.
    :param use_target_network: Boolean that indicates whether or not to use
        this trainer's TargetNetwork to compute Q values. When falsy,
        ``self.score`` is used instead.
    """
    if not use_target_network:
        return self.score(states)
    return self.target_network.target_values(states)
def preprocess_samples(
    self,
    states: List[Dict[str, float]],
    actions: List[str],
    rewards: List[float],
    next_states: List[Dict[str, float]],
    next_actions: List[str],
    is_terminals: List[bool],
    possible_next_actions: List[List[str]],
    reward_timelines: Optional[List[Dict[int, float]]],
) -> TrainingDataPage:
    """
    Runs ``preprocess_samples_discrete`` on the samples and then replaces
    ``states`` and ``next_states`` of the resulting page: each becomes a
    float32 column holding the column indices of the entries equal to 1.0.
    """
    tdp = self.preprocess_samples_discrete(
        states, actions, rewards, next_states, next_actions, is_terminals,
        possible_next_actions, reward_timelines
    )

    def hot_columns(matrix):
        # Second array of np.where = column index of every 1.0 entry.
        columns = np.where(matrix == 1.0)[1]
        return columns.reshape(-1, 1).astype(np.float32)

    tdp.states = hot_columns(tdp.states)
    tdp.next_states = hot_columns(tdp.next_states)
    return tdp
def add_assignment(
        self, *,
        service_code: netsgiro.ServiceCode,
        assignment_type: netsgiro.AssignmentType,
        agreement_id: Optional[str]=None,
        number: str,
        account: str,
        date: Optional[datetime.date]=None
) -> 'Assignment':
    """Add an assignment to the transmission.

    All arguments are keyword-only and are passed to ``Assignment``
    (``assignment_type`` as ``type``). The new assignment is appended to
    ``self.assignments`` and returned.
    """
    fields = dict(
        service_code=service_code,
        type=assignment_type,
        agreement_id=agreement_id,
        number=number,
        account=account,
        date=date,
    )
    assignment = Assignment(**fields)
    self.assignments.append(assignment)
    return assignment
def create(cls, texts: List[Any], name: str, *,
           locations: Opt[List[float]] = None,
           durations: Opt[List[float]] = None,
           default: bool = False) -> 'SubtitleTrack':
    """Build a subtitle track named ``name`` with one subtitle per text.

    Start and end times are derived from ``locations`` when that is
    non-empty, otherwise from ``durations``. Texts are converted with
    ``str``.

    :raises ex.ParameterError: when neither locations nor durations is
        provided (or both are empty).
    """
    if locations:
        bounds = loc_util.start_end_locations_from_locations(locations)
    elif durations:
        bounds = loc_util.start_end_locations_from_intervals(durations)
    else:
        raise ex.ParameterError(f"Must provide either locations or durations for the subtitle track {name}")
    start_times, end_times = bounds

    subtitles = [Subtitle(str(text), start_time, end_time)
                 for text, start_time, end_time in zip(texts, start_times, end_times)]
    return cls(subtitles, name, default=default)
async def first(self, query: str, values: Union[List, Dict],
                db_name: str = 'default') -> Optional[DictRow]:
    """Await ``self._first`` with the given arguments and return its result.

    Declared ``async`` because the body awaits; ``await`` inside a plain
    ``def`` is a SyntaxError.

    :param query: query text, passed through unchanged.
    :param values: query parameters, passed through unchanged.
    :param db_name: name of the database to use; defaults to ``'default'``.
    :return: whatever ``self._first`` returns.
    """
    return await self._first(query=query, values=values, db_name=db_name)
def post_message(user: User, text: str, timestamp: Optional[Timestamp]=None) -> None:
    """Record a new post by ``user``.

    The post is put at the left end of both the global ``posts`` sequence
    and the user's own sequence in ``user_posts``.

    :param user: author of the post; the name is interned before use.
    :param text: body of the post.
    :param timestamp: time of the post; the current time is used only when
        it is ``None``, so an explicit timestamp of 0 is kept.
    """
    user = intern(user)
    if timestamp is None:
        timestamp = time()
    post = Post(timestamp, user, text)
    posts.appendleft(post)
    user_posts[user].appendleft(post)
def posts_by_user(user: User, limit: Optional[int] = None) -> List[Post]:
    """Return up to ``limit`` posts from ``user_posts[user]``, in stored order.

    With ``limit=None`` all posts of the user are returned.
    """
    own_posts = user_posts[user]
    return list(islice(own_posts, limit))
def posts_for_user(user: User, limit: Optional[int] = None) -> List[Post]:
    """Return up to ``limit`` posts written by the users that ``user`` follows.

    The per-user post sequences are combined with ``merge(..., reverse=True)``.
    With ``limit=None`` all merged posts are returned.
    """
    followed_streams = [user_posts[u] for u in following[user]]
    relevant = merge(*followed_streams, reverse=True)
    return list(islice(relevant, limit))
def search(phrase: str, limit: Optional[int] = None) -> List[Post]:
    """Return up to ``limit`` posts whose text contains ``phrase``.

    The global ``posts`` sequence is scanned in its stored order.
    """
    # XXX this could benefit from caching and from preindexing
    matching = filter(lambda post: phrase in post.text, posts)
    return list(islice(matching, limit))
def hash_password(password: str, salt: Optional[bytes] = None) -> HashAndSalt:
    """Return ``(hash, salt)`` for ``password``.

    A missing (or empty) ``salt`` is replaced by 16 random bytes. The hash is
    PBKDF2-HMAC-SHA512 with 100000 rounds, computed over salt + UTF-8
    password, with the fixed pepper passed as the PBKDF2 salt argument.
    """
    pepper = b'alchemists discovered that gold came from earth air fire and water'
    if not salt:
        salt = secrets.token_bytes(16)
    digest = hashlib.pbkdf2_hmac(
        'sha512', salt + password.encode('utf-8'), pepper, 100000)
    return digest, salt
def write_and_open_in_browser(html: str, filepath: Optional[str]):
    ''' Writes html to filepath and opens it.

    When filepath is None, a named temporary file is used instead; it is
    created with delete=False, so it is still there when the browser opens it.

    The file is written as UTF-8 and is closed before the browser is started,
    also when writing fails.
    '''
    if filepath is None:
        file = NamedTemporaryFile('w', encoding='utf-8', delete=False)
    else:
        file = open(filepath, 'w', encoding='utf-8')
    # The with block closes the handle even if write() raises.
    with file:
        file.write(html)
    webbrowser.open(file.name)
def render(posts: Iterable[Post], output_filepath: Optional[str] = None):
    ''' Puts html from posts into output_filepath (.html) and opens that in a browser.

    When there are no posts, nothing is written and 'Nothing found' is printed.
    '''
    materialized = list(posts)
    if not materialized:
        print('Nothing found')
        return
    write_and_open_in_browser(render_to_html(materialized), output_filepath)
def split_by_predicate(seq: Sequence[T], predicate, zero_delim: Optional[T]=None) \
        -> Iterator[Sequence[T]]:
    """
    Splits a sequence into chunks, each starting with its delimiter

    A new chunk begins at every element that satisfies predicate. The first
    chunk begins with zero_delim instead, so it is produced even when seq is
    empty.

    split_by_predicate([0, "One", 1, 2, 3, "Two", 4, 5, 6, 7, "Three", "Four"],
                       predicate=lambda x: isinstance(x, str),
                       zero_delim="Nothing")
    [["Nothing", 0], ["One", 1, 2, 3], ["Two", 4, 5, 6, 7], ["Three"], ["Four"]]

    :param seq: sequence to process
    :param predicate: checks whether an element is a delimiter
    :param zero_delim: pseudo-delimiter prepended to the sequence
    :return: iterator over the chunks (lists)
    """
    chunk = [zero_delim]
    for item in seq:
        if predicate(item):
            # A delimiter closes the current chunk and opens the next one.
            yield chunk
            chunk = [item]
        else:
            chunk.append(item)
    yield chunk
# END BASED
def spawn_or_create_counter(parent: Optional[Counter]):
    """Return a child spawned from ``parent``, or a new ``Counter`` when there is no parent."""
    if parent is None:
        return Counter()
    return parent.spawn_child()
def url_for_eq_snippet(self, label: str) -> Optional[str]:
    """
    Returns the url of the equation snippet with the given label

    This base version always raises; subclasses have to implement it.

    :param label: label of the equation
    :return: url of the snippet
    :raises NotImplementedError: always, unless overridden
    """
    raise NotImplementedError()
def __init__(self, *,
             warnings: Optional[bool]=None,
             highlight: Optional[bool]=None,
             frame_context_length: int=50):
    """Configure warnings, highlighting and the frame context length.

    ``warnings`` and ``highlight`` are resolved through ``self._env_bool``
    together with the names ``PY_DEVTOOLS_WARNINGS`` (fallback ``True``) and
    ``PY_DEVTOOLS_HIGHLIGHT`` (fallback ``None``).
    """
    # 50 lines should be enough to make sure we always get the entire
    # function definition
    self._frame_context_length = frame_context_length
    self._show_warnings = self._env_bool(warnings, 'PY_DEVTOOLS_WARNINGS', True)
    self._highlight = self._env_bool(highlight, 'PY_DEVTOOLS_HIGHLIGHT', None)
def mainFrame(self) -> Optional['Frame']:
    """Return main frame (``None`` while no main frame is set)."""
    frame = self._mainFrame
    return frame
def __init__(self, client: Session, mouse: Mouse, touchscreen: Touchscreen,
             parentFrame: Optional['Frame'], frameId: str) -> None:
    """Make new frame.

    The frame starts with an empty url, not detached, and without wait tasks
    or child frames. If a parent frame is given, this frame adds itself to
    the parent's child frames.
    """
    # Collaborators.
    self._client = client
    self._mouse = mouse
    self._touchscreen = touchscreen

    # Identity and position in the frame tree.
    self._id = frameId
    self._parentFrame = parentFrame
    self._childFrames: Set[Frame] = set()  # maybe list

    # Initial state.
    self._url = ''
    self._detached = False
    self._defaultContextId = '<not-initialized>'
    self._waitTasks: Set[WaitTask] = set()  # maybe list

    if parentFrame:
        parentFrame._childFrames.add(self)
async def querySelector(self, selector: str) -> Optional['ElementHandle']:
    """Get element which matches `selector` string.

    If `selector` matches multiple elements, return first-matched element.
    If nothing matches, the remote object of the evaluation is released and
    ``None`` is returned.

    Declared ``async`` because the body awaits; ``await`` inside a plain
    ``def`` is a SyntaxError.
    """
    remoteObject = await self._rawEvaluate(
        'selector => document.querySelector(selector)', selector)
    if remoteObject.get('subtype') == 'node':
        return ElementHandle(self._client, remoteObject, self._mouse,
                             self._touchscreen)
    # Not a node: release the remote object instead of wrapping it.
    await helper.releaseObject(self._client, remoteObject)
    return None
async def querySelectorEval(self, selector: str, pageFunction: str,
                            *args: Any) -> Optional[Any]:
    """Execute function on element which matches selector.

    Declared ``async`` because the body awaits; ``await`` inside a plain
    ``def`` is a SyntaxError.

    :param selector: selector handed to ``self.querySelector``.
    :param pageFunction: function text evaluated on the matched element.
    :param args: extra arguments passed to the evaluation.
    :return: result of the evaluation.
    :raises PageError: if no element matches ``selector``.
    """
    elementHandle = await self.querySelector(selector)
    if elementHandle is None:
        raise PageError(
            f'Error: failed to find element matching selector "{selector}"'
        )
    try:
        return await elementHandle.evaluate(pageFunction, *args)
    finally:
        # Dispose the handle also when the evaluation raises, so it does
        # not stay alive.
        await elementHandle.dispose()
def clearTimeout(fut: Optional[Union[asyncio.Future, asyncio.Handle]]) -> None:
    """Cancel timer task; does nothing when ``fut`` is ``None`` (or falsy)."""
    if not fut:
        return
    fut.cancel()
def mainFrame(self) -> Optional['Frame']:
    """Get main frame, as currently held by the frame manager."""
    manager = self._frameManager
    return manager._mainFrame
async def querySelector(self, selector: str) -> Optional['ElementHandle']:
    """Get Element which matches `selector`.

    The lookup is delegated to the main frame. Declared ``async`` because
    the body awaits; ``await`` inside a plain ``def`` is a SyntaxError.

    :raises PageError: if there is no main frame.
    """
    frame = self.mainFrame
    if not frame:
        raise PageError('no main frame.')
    return await frame.querySelector(selector)
async def querySelectorEval(self, selector: str, pageFunction: str,
                            *args: Any) -> Optional[Any]:
    """Execute function on element which matches selector.

    The call is delegated to the main frame. Declared ``async`` because the
    body awaits; ``await`` inside a plain ``def`` is a SyntaxError.

    :raises PageError: if there is no main frame.
    """
    frame = self.mainFrame
    if not frame:
        raise PageError('no main frame.')
    return await frame.querySelectorEval(selector, pageFunction, *args)