我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 typing.Dict。
def _send_receive(self, nummsgs: int, outformat: str='json', dataupdate: Optional[Dict[AnyStr, Any]]=None, restart_data: bool=True) -> List[Response]:
    """Buffer ``nummsgs`` messages, run them through a processor and load the results.

    :param nummsgs: number of messages handed to ``_add_to_buffer``
    :param outformat: format name passed to every helper and to
        ``get_processor_instance``
    :param dataupdate: optional mapping merged into ``self.data`` before
        the messages are buffered
    :param restart_data: when true, ``_restart_data`` is called first
    :return: whatever ``_loadResults`` returns for ``outformat``
    """
    if restart_data:
        self._restart_data(outformat)
    if dataupdate:
        self.data.update(dataupdate)

    self._add_to_buffer(nummsgs, outformat)
    # Rewind so the processor reads the send buffer from its beginning.
    self.sendbuffer.seek(0)

    processor_and_extra = get_processor_instance(
        outformat,
        custom_outbuffer=self.recvbuffer,
        custom_inbuffer=self.sendbuffer,
    )
    processor, _unused = processor_and_extra
    processor.process_requests(self.sendbuffer)
    return self._loadResults(outformat)
def calc_rs_modality(self) -> Dict[str, float]:
    """Calculate, for each modality tag, the ratio of sentences containing it.

    Every sentence is parsed with ``self.knp``; the feature strings of all
    entries returned by ``bnst_list()`` are concatenated and scanned for
    ``<モダリティ-...>`` tags.  A tag is counted at most once per sentence.

    Returns:
        dict: modality name -> number of sentences containing it divided by
        the total number of sentences (empty when nothing was found).
    """
    modality_counter = Counter()
    for sentence in self.sentences:
        chunks = [
            Chunk(chunk_id=bnst.bnst_id, link=bnst.parent, description=bnst.fstring)
            for bnst in self.knp.parse(sentence).bnst_list()
        ]
        features = "".join(chunk.description for chunk in chunks)
        # NOTE(review): the pattern previously read "<?????-(.+?)>", which the
        # re module rejects ("multiple repeat").  The five placeholder
        # characters look like a mis-encoded "モダリティ" -- confirm upstream.
        modality_counter.update(set(re.findall("<モダリティ-(.+?)>", features)))
    n = len(self.sentences)
    return {name: float(count) / n for name, count in modality_counter.items()}
def _load_word_freq(self, threshold: int) -> Tuple[Dict[str, int], int]:
    """Read word frequencies from the file at ``self.rnnlm_model_path``.

    Every line must hold exactly ``<word> <count>`` separated by one space.
    Words whose count exceeds ``threshold`` are stored with that count; each
    other line adds 1 to the ``'<unk/>'`` entry.

    Returns:
        tuple: ``(word -> frequency mapping, number of lines read)``
    """
    unknown_key = '<unk/>'
    frequencies = {}
    lines_read = 0
    with open(self.rnnlm_model_path, mode='r') as model_file:
        for lines_read, entry in enumerate(model_file, start=1):
            token, raw_count = entry.split(' ')
            count = int(raw_count)
            if count > threshold:
                frequencies[token] = count
            else:
                frequencies[unknown_key] = frequencies.get(unknown_key, 0) + 1
    return (frequencies, lines_read)
def __init__(self, client: Session) -> None:
    """Make new NetworkManager.

    Initialises the request bookkeeping containers and subscribes the
    ``_on*`` handlers to the Network events emitted by ``client``.
    """
    super().__init__()
    self._client = client
    self._requestIdToRequest: Dict[str, Request] = {}
    self._interceptionIdToRequest: Dict[str, Request] = {}
    self._extraHTTPHeaders: OrderedDict[str, str] = OrderedDict()
    self._requestInterceptionEnabled = False
    self._requestHashToRequestIds = Multimap()
    self._requestHashToInterceptions = Multimap()

    # Registration order is kept identical to the original sequence.
    subscriptions = (
        ('Network.requestWillBeSent', self._onRequestWillBeSent),
        ('Network.requestIntercepted', self._onRequestIntercepted),
        ('Network.responseReceived', self._onResponseReceived),
        ('Network.loadingFinished', self._onLoadingFinished),
        ('Network.loadingFailed', self._onLoadingFailed),
    )
    for event_name, callback in subscriptions:
        self._client.on(event_name, callback)
async def waitForNavigation(self, options: dict = None, **kwargs: Any) -> Optional[Response]:
    """Wait until navigation completes.

    :param options: optional settings forwarded to ``NavigatorWatcher``
    :param kwargs: additional settings; they override entries in ``options``
    :return: the response recorded for ``self.url`` while waiting, or
        ``None`` if no response with that URL was seen
    """
    # Merge into a fresh dict so the caller's ``options`` is never mutated.
    merged = dict(options) if options else dict()
    merged.update(kwargs)
    watcher = NavigatorWatcher(self._client, self._ignoreHTTPSErrors, merged)
    responses: Dict[str, Response] = dict()
    listener = helper.addEventListener(
        self._networkManager, NetworkManager.Events.Response,
        lambda response: responses.__setitem__(response.url, response)
    )
    try:
        await watcher.waitForNavigation()
    finally:
        # Always detach the listener, even when the wait raises.
        helper.removeEventListeners([listener])
    return responses.get(self.url)
def __init__(self, options: Dict[str, Any] = None, **kwargs: Any) -> None:
    """Make new launcher.

    :param options: optional launch settings; ``headless`` and
        ``executablePath`` are read here
    :param kwargs: additional settings; they override entries in ``options``

    When no ``executablePath`` is given, chromium is downloaded if
    ``check_chromium()`` reports it missing.
    """
    # Copy so this instance aliases neither the caller's dict nor the
    # module-level DEFAULT_ARGS list (an in-place change through
    # self.chrome_args would otherwise leak into every other launcher).
    self.options = dict(options) if options else dict()
    self.options.update(kwargs)
    self.chrome_args = list(DEFAULT_ARGS)
    self._tmp_user_data_dir: Optional[str] = None
    self._parse_args()
    # Headless is the default: only an explicit falsy value turns it off.
    if self.options.get('headless', True):
        self.chrome_args = self.chrome_args + [
            '--headless',
            '--disable-gpu',
            '--hide-scrollbars',
            '--mute-audio',
        ]
    if 'executablePath' in self.options:
        self.exec = self.options['executablePath']
    else:
        if not check_chromium():
            download_chromium()
        self.exec = str(chromium_excutable())
    self.cmd = [self.exec] + self.chrome_args
async def _visibleCenter(self) -> Dict[str, int]:
    """Scroll the element into view and return the centre of its visible part.

    Returns:
        dict: the ``{x, y}`` object produced by the in-page function.

    Raises:
        BrowserError: if the evaluation yields a falsy result (the in-page
            function returns ``null`` when the element is not contained in
            its owner document).
    """
    center = await self.evaluate(''' element => { if (!element.ownerDocument.contains(element)) return null; element.scrollIntoViewIfNeeded(); let rect = element.getBoundingClientRect(); return { x: (Math.max(rect.left, 0) + Math.min(rect.right, window.innerWidth)) / 2, y: (Math.max(rect.top, 0) + Math.min(rect.bottom, window.innerHeight)) / 2 }; } ''')  # noqa: E501
    if not center:
        raise BrowserError('No node found for selector: ')
    return center
def __init__(self, database, tabledesc=None, timestamp=False):
    # type: (str, Optional[Dict[str, str]], bool) -> None
    """Store the database name and pre-format the SQL statement templates.

    ``tabledesc`` supplies the ``str.format`` fields for the ``_sq*``
    templates; when omitted, ``self.prefixdesc()`` provides it.  With
    ``timestamp`` set, ``self._sqtimestamp`` is spliced into the CREATE
    statement, otherwise an empty string is.
    """
    self.database = database
    self.tabledesc = self.prefixdesc() if tabledesc is None else tabledesc
    fields = self.tabledesc

    create_template = self._sqcreate % (self._sqtimestamp if timestamp else '')
    self.kv_create = create_template.format(**fields)
    for operation in ('get', 'mget', 'put', 'delete'):
        template = getattr(self, '_sq' + operation)
        setattr(self, 'kv_' + operation, template.format(**fields))

    self._connection = None  # type: Optional[sqlite3.Connection]
    self.sqlite_limit_variable_number = 999
    self.support_mget = True
def mget(self, keys):
    # type: (List[str]) -> List[Optional[bytes]]
    """Fetch the values for ``keys`` in order, ``None`` for keys with no row.

    While ``self.support_mget`` is set, keys are looked up in batches through
    the ``kv_mget`` statement.  If SQLite raises ``OperationalError`` the
    batched path is disabled for this instance and every key is fetched
    individually instead.
    """
    if self.support_mget:
        found = []
        try:
            with self.conn as conn:
                # presumably grouper yields batches of at most
                # sqlite_limit_variable_number keys -- it is defined elsewhere
                for batch in grouper(self.sqlite_limit_variable_number, keys):
                    params = list(batch)
                    placeholders = ','.join('?' for _ in params)
                    found.extend(conn.execute(self.kv_mget % placeholders, params))
            lookup = dict(found)  # type: Dict[str, bytes]
            return [lookup.get(key) for key in keys]
        except sqlite3.OperationalError:
            self.support_mget = False
    return [self.__get(key) for key in keys]
def contains_all_options(optiongroup, parentstyle, matchvalues=False):
    # type: (Dict[Any, Any], Style, bool) -> bool
    """Tell whether every option of optiongroup also exists in parentstyle.

    Dict-valued options are compared recursively against the matching
    sub-dict of parentstyle.  With matchvalues set, non-dict values must
    additionally have exactly the same type and compare equal.
    """
    for name, expected in optiongroup.items():
        if name not in parentstyle:
            return False
        if isinstance(expected, dict):
            if not contains_all_options(expected, parentstyle[name],
                                        matchvalues=matchvalues):
                return False
            continue
        if not matchvalues:
            continue
        actual = parentstyle.get(name)
        if type(actual) != type(expected) or actual != expected:
            return False
    return True
def misses_to_frame(parsed_lexemes: Iterable, terms: Dict[str, str]=None) -> pd.DataFrame:
    """Tabulate the misses collected from ``parsed_lexemes``.

    One row per key of ``collect_misses(parsed_lexemes)``: the lower-cased
    miss (used as the index), its entry in ``terms`` (the lower-cased miss
    itself when there is none) and the space-joined values stored for it.
    """
    replacements = terms if terms else {}
    miss_dict = collect_misses(parsed_lexemes)

    def as_record(miss):
        lowered = miss.lower()
        return OrderedDict([
            ('miss', lowered),
            ('term', replacements.get(lowered, lowered)),
            ('lexemes', ' '.join(miss_dict[miss])),
        ])

    return pd.DataFrame.from_records(
        [as_record(miss) for miss in miss_dict],
        index='miss', columns=['miss', 'term', 'lexemes'])
def get_meta(self) -> t.Generator[t.Dict, None, None]:
    """Yield ``get_field_meta(f)`` for every selected field of ``self.model``.

    A field is skipped when its name is in ``self.exclude`` or when
    ``self.fields`` is non-empty and does not list it.  When ``self.fields``
    is empty, the field's ``concrete``, ``auto_created`` and ``editable``
    flags must each be contained in the matching ``*_in`` collection.
    """
    candidates = self.model._meta.get_fields(
        include_parents=self.include_parents,
        include_hidden=self.include_hidden
    )
    for field in candidates:
        name = field.name
        if name in self.exclude:
            continue
        if name not in self.fields:
            if self.fields:
                # An explicit field list was given and this one is not on it.
                continue
            # NOTE(review): in the one-line source the nesting of the three
            # flag filters is ambiguous; they are treated as applying only
            # when no explicit field list is given -- confirm.
            if (field.concrete not in self.concrete_in
                    or field.auto_created not in self.auto_created_in
                    or field.editable not in self.editable_in):
                continue
        yield self.get_field_meta(field)
def update_game_state(self) -> None:
    """Recompute ``nextCardPlay`` and ``maxCardPlay`` for every pile colour.

    ``nextCardPlay[c]`` is one more than the number of cards already played
    on colour ``c``.  ``maxCardPlay[c]`` starts at ``Value.V5`` and is set to
    the first value, scanning ``Value`` in reverse, of which fewer than
    ``num_copies`` cards have been discarded.
    """
    colors = self.game.variant.pile_colors

    # Per colour: discard count indexed by card value (six slots each).
    discarded: Dict[Color, List[int]] = {color: [0] * 6 for color in colors}
    for deck_index in self.game.discards:
        info: CardInfo = gameCards[deck_index]
        discarded[info.color][info.value] += 1

    self.nextCardPlay: Dict[Color, int] = {
        color: len(self.game.playedCards[color]) + 1 for color in colors
    }
    self.maxCardPlay: Dict[Color, Value] = {color: Value.V5 for color in colors}
    for color in colors:
        # NOTE(review): the scan stops at the highest value that still has a
        # surviving copy, even if a lower value is fully discarded -- confirm
        # this is the intended meaning of "max card play".
        for value in reversed(Value):  # type: ignore
            if discarded[color][value] < value.num_copies:
                self.maxCardPlay[color] = value
                break
def received(self, type: str, resp: Dict[str, Any]) -> None:
    """Dispatch one incoming server message by its ``type``.

    'message', 'notify' and 'action' are forwarded to the matching handler,
    'advanced' and 'connected' are ignored, and 'init' or any unknown type
    raises ``Exception`` (unknown types are printed first).
    """
    # 'advanced' (fast UI animations) and 'connected' (players connected to
    # the game) need no reaction.
    if type in ('advanced', 'connected'):
        return
    if type == 'message':
        self.message_received(resp['text'])
        return
    if type == 'notify':
        self.handle_notify(resp)
        return
    if type == 'action':
        self.decide_action(resp['can_clue'], resp['can_discard'])
        return
    if type != 'init':
        print(type, resp)
    raise Exception()
def updateMaxScore(self) -> None:
    """Recompute ``self.maxScore`` from the discard piles.

    Each suit contributes 5 unless all copies of some rank have been
    discarded; then it contributes the value just below the lowest such
    rank.  The per-suit contributions are summed.
    """
    total: int = 0
    for suit in self.variant.pile_suits:
        discarded: Dict[Rank, int] = {rank: 0 for rank in Rank}
        for deck_index in self.discards[suit]:
            card: ServerCard = self.deck[deck_index]
            discarded[card.rank] += 1

        # One additional copy is expected for the Extra suit in OneOfEach.
        bonus = 1 if self.variant == Variant.OneOfEach and suit == Suit.Extra else 0
        reachable: int = 5
        # Highest rank first, so the lowest exhausted rank is applied last.
        for rank in reversed(Rank):  # type: ignore
            if discarded[rank] == rank.num_copies + bonus:
                reachable = rank.value - 1
        total += reachable
    self.maxScore = total
def _extract_positional_label_by_id(self, files: Iterable[Path]) -> Dict[str, Union[PositionalLabel, str]]:
    """Map ``<xml base name><microphone ending>`` ids to the label of that XML file.

    Only ``.xml`` files whose base name matches ``self.id_filter_regex`` are
    considered, and an id is emitted only when the corresponding ``.wav``
    file exists next to the XML file.
    """
    xml_ending = ".xml"
    microphone_endings = [
        "_Yamaha",
        "_Kinect-Beam",
        "_Kinect-RAW",
        "_Realtek",
        "_Samson",
        "_Microsoft-Kinect-Raw"
    ]

    xml_files = []
    for candidate in files:
        if not candidate.name.endswith(xml_ending):
            continue
        if self.id_filter_regex.match(name_without_extension(candidate)):
            xml_files.append(candidate)

    labels_by_id = OrderedDict()
    for file in xml_files:
        for microphone_ending in microphone_endings:
            audio_name = name_without_extension(file) + microphone_ending + ".wav"
            if not (Path(file.parent) / audio_name).exists():
                continue
            recording_id = name_without_extension(file) + microphone_ending
            labels_by_id[recording_id] = self._extract_label_from_xml(file)
    return labels_by_id
def _input_dictionary_for_loss_net(self, labeled_spectrogram_batch: List[LabeledSpectrogram]) -> Dict[str, ndarray]:
    """Build the named input arrays for one batch of labeled spectrograms.

    The result holds the input batch, prediction lengths, encoded labels,
    label lengths (one row per example) and the learning-phase flag.
    """
    batch_size = len(labeled_spectrogram_batch)
    spectrograms = [example.z_normalized_transposed_spectrogram()
                    for example in labeled_spectrogram_batch]
    labels = [example.label for example in labeled_spectrogram_batch]

    input_batch, prediction_lengths = self._input_batch_and_prediction_lengths(spectrograms)

    # Sets learning phase to training to enable dropout (see
    # backend.learning_phase documentation for more info):
    training_phase_flag_tensor = array([True])

    label_lengths = reshape(array([len(label) for label in labels]), (batch_size, 1))

    names = Wav2Letter.InputNames
    inputs = {}
    inputs[names.input_batch] = input_batch
    inputs[names.prediction_lengths] = self._prediction_length_batch(
        prediction_lengths, batch_size=len(spectrograms))
    inputs[names.label_batch] = self.grapheme_encoding.encode_label_batch(labels)
    inputs[names.label_lengths] = label_lengths
    inputs['keras_learning_phase'] = training_phase_flag_tensor
    return inputs
def create_articles(pmids: Set[str], login: object, write: bool = True) -> Dict[str, str]:
    """
    Make an article item for each of the given pmids

    :param pmids: set of pmids
    :param login: wdi_core login instance
    :param write: actually perform write; when false every pmid maps to the
        placeholder 'Q1'
    :return: map pmid -> wdid; pmids whose creation failed are left out
    """
    pmid_map = {}
    for pmid in pmids:
        item = wdi_helpers.PubmedItem(pmid)
        if not write:
            pmid_map[pmid] = 'Q1'
            continue
        try:
            wdid = item.get_or_create(login)
        except Exception as e:
            # Best effort: report the failure and carry on with the next pmid.
            print("Error creating article pmid: {}, error: {}".format(pmid, e))
        else:
            pmid_map[pmid] = wdid
    return pmid_map
def __init__(self, variables: List[VariableIdentifier], lattices: Dict[Type, Type[Lattice]], arguments: Dict[Type, Dict[str, Any]] = None):
    """Create a mapping Var -> L from each variable in Var to the corresponding element in L.

    :param variables: list of program variables
    :param lattices: dictionary from variable types to the corresponding lattice types
    :param arguments: dictionary from variable types to arguments of the
        corresponding lattices; when omitted, every lattice is built without
        arguments
    :raises ValueError: if a lookup by variable type raises ``KeyError``
    """
    super().__init__()
    if arguments is None:
        # A fresh mapping per instance: the former ``defaultdict`` default was
        # created once at definition time, shared by all instances, and grew
        # a new key on every lookup.
        arguments = defaultdict(dict)
    self._variables = variables
    self._lattices = lattices
    self._arguments = arguments
    try:
        self._store = {v: lattices[type(v.typ)](**arguments[type(v.typ)]) for v in variables}
    except KeyError as key:
        error = f"Missing lattice for variable type {key.args[0]!r}!"
        raise ValueError(error) from key
def preprocess_samples(
    self,
    states: List[Dict[str, float]],
    actions: List[Dict[str, float]],
    rewards: List[float],
    next_states: List[Dict[str, float]],
    next_actions: List[Dict[str, float]],
    is_terminals: List[bool],
    possible_next_actions: List[List[Dict[str, float]]],
    reward_timelines: List[Dict[int, float]],
) -> TrainingDataPage:
    """Preprocess via ``GridworldContinuous`` and compact the state matrices.

    ``states`` and ``next_states`` of the returned page are replaced by a
    float32 column holding, for every entry equal to 1.0, its column index
    (presumably the inputs are one-hot rows -- confirm).
    """
    def hot_columns(matrix):
        # Second element of np.where: column indices of the matching entries.
        return np.where(matrix == 1.0)[1].reshape(-1, 1).astype(np.float32)

    tdp = GridworldContinuous.preprocess_samples(
        self, states, actions, rewards, next_states, next_actions,
        is_terminals, possible_next_actions, reward_timelines
    )
    tdp.states = hot_columns(tdp.states)
    tdp.next_states = hot_columns(tdp.next_states)
    return tdp
def preprocess_samples(
    self,
    states: List[Dict[str, float]],
    actions: List[str],
    rewards: List[float],
    next_states: List[Dict[str, float]],
    next_actions: List[str],
    is_terminals: List[bool],
    possible_next_actions: List[List[str]],
    reward_timelines: Optional[List[Dict[int, float]]],
) -> TrainingDataPage:
    """Preprocess via ``preprocess_samples_discrete`` and compact the state matrices.

    ``states`` and ``next_states`` of the returned page are replaced by a
    float32 column holding, for every entry equal to 1.0, its column index
    (presumably the inputs are one-hot rows -- confirm).
    """
    def hot_columns(matrix):
        # Second element of np.where: column indices of the matching entries.
        return np.where(matrix == 1.0)[1].reshape(-1, 1).astype(np.float32)

    tdp = self.preprocess_samples_discrete(
        states, actions, rewards, next_states, next_actions,
        is_terminals, possible_next_actions, reward_timelines
    )
    tdp.states = hot_columns(tdp.states)
    tdp.next_states = hot_columns(tdp.next_states)
    return tdp
def to_dict(self) -> Dict[str, str or float or Dict[str, str or int]]:
    """
    Serializes the message, including its contacts, into plain data
    :return: The message data as a dictionary which can be used to
             store a message as a JSON file; "sender_group" is None
             when the message has no sender group
    """
    if self.sender_group is None:
        sender_group = None
    else:
        sender_group = self.sender_group.to_dict()

    data = {}
    data["message_title"] = self.message_title
    data["message_body"] = self.message_body
    data["sender"] = self.sender.to_dict()
    data["sender_group"] = sender_group
    data["receiver"] = self.receiver.to_dict()
    data["timestamp"] = self.timestamp
    return data
def from_dict(data: Dict[str, str or float or Dict[str, str or int]])\
        -> Message:
    """
    Generates a Message object from a dictionary
    :param data: The dictionary to turn into a Message
    :return: The generated Message object
    """
    group_data = data["sender_group"]
    sender_group = None if group_data is None else contact_from_dict(group_data)

    title = data["message_title"]
    body = data["message_body"]
    receiver = contact_from_dict(data["receiver"])
    sender = contact_from_dict(data["sender"])
    # Positional order is receiver, then sender, exactly as before.
    return Message(title, body, receiver, sender, sender_group, data["timestamp"])
def test_importing(self):
    """
    Tests that the string import handler returns the imported objects
    :return: None
    """
    from typing import Dict

    handle_import = self.config_handler.__handle_import_statement__
    imported_os = handle_import("import os")
    imported_dict = handle_import("from typing import Dict")

    self.assertEqual(os, imported_os)
    self.assertEqual(imported_dict, Dict)
def get_unsent_reminders(database: sqlite3.Connection) \
        -> List[Dict[str, str or int or datetime]]:
    """
    Retrieves all unsent reminders from the database
    :param database: The database to use
    :return: A list of dictionaries that contain the reminder information
             under the keys "id", "message", "due_time" and "receiver"
    """
    query = (
        "SELECT reminder.id, reminder.msg_text, reminder.due_time, "
        "address_book.address, address_book.id, address_book.display_name "
        "FROM reminder "
        "JOIN address_book ON reminder.sender_id = address_book.id "
        "WHERE reminder.sent = 0"
    )
    return [
        {
            "id": reminder_id,
            "message": text,
            "due_time": convert_string_to_datetime(due_time),
            "receiver": Contact(contact_id, display_name, address)
        }
        for reminder_id, text, due_time, address, contact_id, display_name
        in database.execute(query)
    ]
def thread_exists(thread: Dict[str, str or int], db: sqlite3.Connection) \
        -> bool:
    """
    Checks whether a reddit discussion thread with the same show name,
    episode and URL is already stored in the database
    :param thread: The thread to check for
    :param db: The database to check
    :return: True if the thread exists, false otherwise
    """
    parameters = (thread["show_name"], thread["episode"], thread["url"])
    cursor = db.execute(
        "SELECT * FROM anime_reminder_threads "
        "WHERE show_name=? AND episode=? AND thread=?",
        parameters
    )
    matches = cursor.fetchall()
    return bool(matches)
# noinspection SqlNoDataSourceInspection,SqlDialectInspection,SqlResolve
def define_language_text(self) -> Dict[str, Dict[str, str]]:
    """
    Hook that supplies the translation table used by the translate()
    method. Implementations return a dictionary shaped like this:

        {
            key: {"language": "text_in_language", ...},
            ...
        }

    Keep in mind that every occurrence of a 'key' value is replaced while
    translating

    :return: The dictionary to create translations with
    """
    raise NotImplementedError()
# noinspection PyMethodMayBeStatic
async def send_alert_notification(settings: Dict, tmpl_args: Dict):
    """Render the alert templates and send them as one slack attachment.

    ``settings`` must provide 'tmpl-msg', 'tmpl-duration', 'tmpl-url' and
    'webhook-url'.  The rendered message is used as both fallback and
    pretext; the duration and URL fields are added only when their
    templates are set.
    """
    message = settings['tmpl-msg'].render(**tmpl_args)
    fields = []
    attachment = {
        'fallback': message,
        'fields': fields,
    }
    attachment['pretext'] = message
    for title, key in (('Duration', 'tmpl-duration'), ('URL', 'tmpl-url')):
        template = settings[key]
        if template:
            fields.append({
                'title': title,
                'value': template.render(**tmpl_args),
                'short': False,
            })
    await send_slack_notification(settings['webhook-url'], [attachment])
def parse_settings(config: Any) -> Optional[Dict[str, Any]]:
    """Parse slack notification settings.

    Returns None when the webhook url or the message template is missing;
    otherwise a dict in which every configured template string has been
    compiled into a jinja2 Template.
    """
    ret = {
        'webhook-url': config.get('slack-webhook-url'),
        'tmpl-msg': config.get('slack-tmpl-msg'),
        'tmpl-duration': config.get('slack-tmpl-duration', fallback=''),
        'tmpl-url': config.get('slack-tmpl-url', fallback='')
    }  # type: Any
    if not (ret['webhook-url'] and ret['tmpl-msg']):
        log.debug('Slack settings missing, no slack notifications will be sent', 'NOTIFICATIONS')
        return None

    log.debug('Valid slack notification settings found', 'NOTIFICATIONS')
    ret['tmpl-msg'] = jinja2.Template(ret['tmpl-msg'])
    for optional_key in ('tmpl-duration', 'tmpl-url'):
        if ret[optional_key]:
            ret[optional_key] = jinja2.Template(ret[optional_key])
    return ret
def parse_settings(config: Any) -> Optional[Dict[str, Any]]:
    """Parse clicksend sms settings.

    Should only be called from sms.parse_settings.

    Returns None when any of username, api key, sender or template is
    missing; otherwise the settings dict with 'tmpl' compiled into a jinja2
    Template.
    """
    ret = {
        'provider': 'clicksend',
        'username': config.get('sms-clicksend-username'),
        'api-key': config.get('sms-clicksend-api-key'),
        'sender': config.get('sms-clicksend-sender'),
        'tmpl': config.get('sms-tmpl'),
    }  # type: Any
    # Every one of these must be set.  The previous check tested the list
    # literals ['sender'] and ['tmpl'], which are always truthy, so a missing
    # sender or template was never detected.
    required = ('username', 'api-key', 'sender', 'tmpl')
    if not all(ret[key] for key in required):
        log.msg('SMS settings missing, no sms notifications will be sent', 'NOTIFICATIONS')
        return None

    log.debug('Valid SMS notification settings found', 'NOTIFICATIONS')
    ret['tmpl'] = jinja2.Template(ret['tmpl'])
    return ret
async def update_monitor_group(dbcon: DBConnection, monitor_group_id: int, data: Dict[str, Any]):
    """Update a monitor group in the database.

    Data is a dict with parent_id/name values that will be updated.

    Each key is validated and then written by its own UPDATE statement; all
    of them run inside one ``dbcon.transact`` call.  Any other key raises
    ``errors.IrisettError``; a parent_id equal to the group itself or
    naming a non-existent group raises ``errors.InvalidArguments``.
    """

    async def _run(cur: Cursor) -> None:
        for key, value in data.items():
            if key not in ('parent_id', 'name'):
                raise errors.IrisettError('invalid monitor_group key %s' % key)
            if key == 'parent_id' and value:
                if monitor_group_id == int(value):
                    raise errors.InvalidArguments('monitor group can\'t be its own parent')
                if not await monitor_group_exists(dbcon, value):
                    raise errors.InvalidArguments('parent monitor group does not exist')
            # Interpolating the column name is safe here: key was checked
            # against the fixed whitelist above.
            q = """update monitor_groups set %s=%%s where id=%%s""" % key
            q_args = (value, monitor_group_id)
            await cur.execute(q, q_args)

    await dbcon.transact(_run)
def require_dict(value: Optional[Dict[Any, Any]], key_type: Any=None, value_type: Any=None, allow_none: bool=False) -> Any:
    """Make sure a value is a Dict[key_type, value_type].

    Used when dealing with http input data.

    ``None`` is returned unchanged when ``allow_none`` is set.  Types are
    compared exactly (subclasses are rejected).  Raises ``InvalidData`` when
    the value is not a dict or a key/value has the wrong type.
    """
    if value is None and allow_none:
        return value
    if type(value) != dict:
        raise InvalidData('value was %s(%s), expected dict' % (type(value), value))
    value = cast(Dict, value)
    if key_type or value_type:
        for k, v in value.items():
            if key_type and type(k) != key_type:
                raise InvalidData('dict key was %s(%s), expected %s' % (type(k), k, key_type))
            if value_type and type(v) != value_type:
                # Report the expected *value* type (this used to print key_type).
                raise InvalidData('dict value was %s(%s), expected %s' % (type(v), v, value_type))
    return value
async def _get_monitor_metadata(self, dbcon: DBConnection) -> Optional[Dict[int, Dict[str, str]]]:
    """Collect active monitor metadata for the current request.

    Returns None unless the request parameter ``include_metadata`` is
    truthy.  Which metadata is loaded depends on the first of the query
    parameters ``id``, ``meta_key`` and ``monitor_group_id`` that is present;
    without any of them, metadata for all active monitors is loaded.

    The result maps each object id to a dict of its metadata key/value pairs.
    """
    include_metadata = require_bool(
        get_request_param(self.request, 'include_metadata', error_if_missing=False),
        convert=True) or False
    if not include_metadata:
        return None

    query_params = self.request.rel_url.query
    if 'id' in query_params:
        metadata_models = await metadata.get_metadata_for_object(
            dbcon, 'active_monitor', require_int(cast(str, get_request_param(self.request, 'id'))))
    elif 'meta_key' in query_params:
        meta_key = require_str(get_request_param(self.request, 'meta_key'))
        meta_value = require_str(get_request_param(self.request, 'meta_value'))
        metadata_models = await metadata.get_metadata_for_object_metadata(
            dbcon, meta_key, meta_value, 'active_monitor', 'active_monitors')
    elif 'monitor_group_id' in query_params:
        metadata_models = await monitor_group.get_active_monitor_metadata_for_monitor_group(
            dbcon, require_int(cast(str, get_request_param(self.request, 'monitor_group_id'))))
    else:
        metadata_models = await metadata.get_metadata_for_object_type(dbcon, 'active_monitor')

    metadata_dict = {}  # type: Dict[int, Dict[str, str]]
    for metadata_model in metadata_models:
        per_object = metadata_dict.setdefault(metadata_model.object_id, {})
        per_object[metadata_model.key] = metadata_model.value
    return metadata_dict
def __init__(self, config: Dict):
    """Keep the configuration and start with an empty ``dbs`` mapping."""
    self.dbs = dict()
    self.config = config
async def select(self, query: str, values: Union[List, Dict], db_name: str = 'default') -> List[DictRow]:
    """Run ``query`` with ``values`` on ``db_name`` and return the rows from ``_select``."""
    return await self._select(query=query, values=values, db_name=db_name)
async def first(self, query: str, values: Union[List, Dict], db_name: str = 'default') -> Optional[DictRow]:
    """Run ``query`` with ``values`` on ``db_name`` and return the result of ``_first``."""
    return await self._first(query=query, values=values, db_name=db_name)
async def insert(self, query: str, values: Union[List, Dict], db_name: str = 'default', returning: bool = False):
    """Execute an insert ``query`` through ``_execute`` and return its result.

    ``returning`` is forwarded unchanged to ``_execute``.
    """
    return await self._execute(query=query, values=values, db_name=db_name, returning=returning)
async def update(self, query: str, values: Union[List, Dict], db_name: str = 'default', returning: bool = False):
    """Execute an update ``query`` through ``_execute`` and return its result.

    ``returning`` is forwarded unchanged to ``_execute``.
    """
    return await self._execute(query=query, values=values, db_name=db_name, returning=returning)
async def _execute(self, query: str, values: Union[List, Dict], db_name: str = 'default', returning: bool = False):
    """Execute ``query`` on the master pool of ``db_name``.

    Returns the first fetched row when ``returning`` is set, otherwise the
    cursor's ``rowcount``.

    Raises:
        RuntimeError: if ``db_name`` has no master pool (entry missing or
            ``None``).
    """
    # .get() so a missing 'master' entry reports the same RuntimeError as a
    # None one, matching how _select/_first look up their pool.
    pool = self.dbs[db_name].get('master')
    if pool is None:
        raise RuntimeError('db {} master is not initialized'.format(db_name))
    async with pool.acquire() as conn:
        async with conn.cursor(cursor_factory=DictCursor) as cursor:
            await cursor.execute(query, values)
            if returning:
                return await cursor.fetchone()
            return cursor.rowcount
async def _select(self, query: str, values: Union[List, Dict], db_name: str = 'default'):
    """Execute ``query`` on ``db_name`` and return all fetched rows.

    The 'slave' pool is preferred; the 'master' pool is used when no slave
    is set.

    Raises:
        RuntimeError: if neither pool is available.
    """
    dbs = self.dbs[db_name]
    pool = dbs.get('slave') or dbs.get('master')
    if pool is None:
        raise RuntimeError('db {} master is not initialized'.format(db_name))
    async with pool.acquire() as conn:
        async with conn.cursor(cursor_factory=DictCursor) as cursor:
            await cursor.execute(query, values)
            return await cursor.fetchall()
async def _first(self, query: str, values: Union[List, Dict], db_name: str = 'default'):
    """Execute ``query`` on ``db_name`` and return the first fetched row.

    The 'slave' pool is preferred; the 'master' pool is used when no slave
    is set.

    Raises:
        RuntimeError: if neither pool is available.
    """
    dbs = self.dbs[db_name]
    pool = dbs.get('slave') or dbs.get('master')
    if pool is None:
        raise RuntimeError('db {} master is not initialized'.format(db_name))
    async with pool.acquire() as conn:
        async with conn.cursor(cursor_factory=DictCursor) as cursor:
            await cursor.execute(query, values)
            return await cursor.fetchone()
def _create_request_data(request: Request) -> Dict:
    """Summarise ``request`` as a plain dict: url (as str), query string, method and headers (copied into a dict)."""
    return dict(
        url=str(request.url),
        query_string=request.query_string,
        method=request.method,
        headers=dict(request.headers),
    )
def assign_data(centroids: Sequence[Centroid], data: Iterable[Point]) -> Dict[Centroid, Sequence[Point]]:
    """Assign each data point to its closest centroid.

    Returns a plain dict mapping every centroid that received at least one
    point to the list of its points, in input order.
    """
    groups : DefaultDict[Point, List[Point]] = defaultdict(list)
    for point in data:
        distance_from_point = partial(dist, point)
        nearest: Point = min(centroids, key=distance_from_point)
        groups[nearest].append(point)
    return dict(groups)
def quality(labeled: Dict[Centroid, Sequence[Point]]) -> float:
    """Mean of the squared distances from each point to its assigned centroid."""
    squared_distances = []
    for centroid, points in labeled.items():
        for point in points:
            squared_distances.append(dist(centroid, point) ** 2)
    return mean(squared_distances)
def calc_rs_pos(self) -> Dict[str, float]:
    """Calculate the ratio of each part of speech among the words of the input text.

    Returns:
        dict: part-of-speech name -> its share of all analysed words
        (empty when no words were found)
    """
    pos_counter = Counter()
    # TODO: It may take a long time when the number of sentences is large
    for sentence in self.sentences:
        analysed = self.juman.analysis(sentence)
        pos_counter.update(mrph.hinsi for mrph in analysed.mrph_list())

    total = sum(pos_counter.values())
    ratios = {}
    for name, num in pos_counter.items():
        ratios[name] = float(num) / total
    return ratios
async def setExtraHTTPHeaders(self, extraHTTPHeaders: Dict[str, str]) -> None:
    """Set extra http headers.

    The headers are stored on the instance and a separate copy is sent to
    the client with the 'Network.setExtraHTTPHeaders' command.
    """
    self._extraHTTPHeaders = OrderedDict()
    headers = OrderedDict()  # type: Dict[str, str]
    for k, v in extraHTTPHeaders.items():
        self._extraHTTPHeaders[k] = v
        headers[k] = v
    await self._client.send('Network.setExtraHTTPHeaders', {'headers': headers})
def extraHTTPHeaders(self) -> Dict[str, str]:
    """Get extra http headers as a new plain dict."""
    stored = self._extraHTTPHeaders
    snapshot = dict(**stored)
    return snapshot
async def setExtraHTTPHeaders(self, headers: Dict[str, str]):
    """Set extra http headers by delegating to the network manager."""
    return await self._networkManager.setExtraHTTPHeaders(headers)
async def goto(self, url: str, options: dict = None, **kwargs: Any) -> Optional[Response]:
    """Go to url.

    :param url: address to navigate to
    :param options: optional settings forwarded to ``NavigatorWatcher``
    :param kwargs: additional settings; they override entries in ``options``
    :return: the response recorded for ``self.url`` during navigation, or
        ``None`` if no response with that URL was seen
    :raises PageError: if the frame manager reports that loading the main
        frame failed
    """
    # Merge into a fresh dict so the caller's ``options`` is never mutated.
    merged = dict(options) if options else dict()
    merged.update(kwargs)
    watcher = NavigatorWatcher(self._client, self._ignoreHTTPSErrors, merged)
    responses: Dict[str, Response] = dict()
    listener = helper.addEventListener(
        self._networkManager, NetworkManager.Events.Response,
        lambda response: responses.__setitem__(response.url, response)
    )
    try:
        # Start watching before the navigation is requested.
        result = asyncio.ensure_future(watcher.waitForNavigation())
        referrer = self._networkManager.extraHTTPHeaders().get('referer', '')
        try:
            await self._client.send('Page.navigate', dict(url=url, referrer=referrer))
        except Exception:
            watcher.cancel()
            raise
        await result
    finally:
        # Always detach the listener, including when navigation fails.
        helper.removeEventListeners([listener])
    if self._frameManager.isMainFrameLoadingFailed():
        raise PageError('Failed to navigate: ' + url)
    return responses.get(self.url)
def addEventListener(emitter: EventEmitter, eventName: str, handler: Callable) -> Dict[str, Any]:
    """Register handler on the emitter and return an emitter/eventName/handler record."""
    emitter.on(eventName, handler)
    return dict(emitter=emitter, eventName=eventName, handler=handler)