我们从Python开源项目中提取了以下50个代码示例，用于说明如何使用typing.Any。
def waitFor(self, selectorOrFunctionOrTimeout: Union[str, int, float],
            options: dict = None, **kwargs: Any) -> Awaitable:
    """Wait for a delay, a selector, or a JavaScript function.

    The first argument decides what is waited for:

    * ``int``/``float`` -- handed to ``asyncio.sleep`` and the scheduled
      sleep is returned.
    * ``str`` containing ``=>`` or starting with ``function`` (after
      stripping whitespace) -- forwarded to ``self.waitForFunction``.
    * any other ``str`` -- forwarded to ``self.waitForSelector``.
    * anything else -- a future that already failed with ``TypeError``.

    ``options`` and ``kwargs`` are merged into a new dict (keyword arguments
    win); the dict passed by the caller is left untouched.
    """
    # Build a fresh dict: ``options.update(kwargs)`` would leak the keyword
    # arguments into the caller's own dict.
    options = {**(options or {}), **kwargs}
    target = selectorOrFunctionOrTimeout
    if isinstance(target, (int, float)):
        fut: Awaitable[None] = asyncio.ensure_future(asyncio.sleep(target))
        return fut
    if not isinstance(target, str):
        fut = asyncio.get_event_loop().create_future()
        fut.set_exception(TypeError(
            'Unsupported target type: ' + str(type(target))
        ))
        return fut
    if '=>' in target or target.strip().startswith('function'):
        return self.waitForFunction(target, options)
    return self.waitForSelector(target, options)
def _send_receive(self, nummsgs: int, outformat: str = 'json',
                  dataupdate: Optional[Dict[AnyStr, Any]] = None,
                  restart_data: bool = True) -> List[Response]:
    """Push ``nummsgs`` requests through a processor and return the loaded results.

    :param nummsgs: number of messages passed to ``_add_to_buffer``
    :param outformat: format name used for the data, the buffer and the processor
    :param dataupdate: optional entries merged into ``self.data`` first
    :param restart_data: when true, ``_restart_data(outformat)`` is called first
    :return: whatever ``_loadResults(outformat)`` yields
    """
    if restart_data:
        self._restart_data(outformat)
    if dataupdate:
        self.data.update(dataupdate)

    self._add_to_buffer(nummsgs, outformat)
    # Rewind so the processor reads the queued requests from the start.
    self.sendbuffer.seek(0)

    request_processor, _unused_inbuffer = get_processor_instance(
        outformat,
        custom_outbuffer=self.recvbuffer,
        custom_inbuffer=self.sendbuffer,
    )
    request_processor.process_requests(self.sendbuffer)
    return self._loadResults(outformat)
def get_processor_instance(format_: str, custom_inbuffer: InBuffer = None,
                           custom_outbuffer: OutBuffer = None) -> Tuple[Any, Any]:
    """
    Build a request processor for ``format_``.

    The processor class and its default buffers are looked up in the
    ``ProcessorConfigs`` dictionary. Either buffer can be overridden with
    ``custom_inbuffer`` / ``custom_outbuffer``, which is mainly useful for
    unit tests.

    Returns a ``(processor_instance, inbuffer)`` tuple and raises
    ``RequestInstantiationException`` when no configuration exists for the
    requested format.
    """
    settings = ProcessorConfigs.get(format_)
    if not settings:
        raise RequestInstantiationException('No RequestProcessor found for format %s' % format_)

    # A falsy override falls back to the configured buffer.
    inbuffer = custom_inbuffer or settings['inbuffer']
    outbuffer = custom_outbuffer or settings['outbuffer']
    return settings['class'](outbuffer), inbuffer  # type: ignore
async def start(self, options: dict = None, **kwargs: Any) -> None:
    """Start tracing.

    ``options`` and ``kwargs`` are merged into a new dict (keyword arguments
    win); the caller's dict is not modified. Recognised keys:

    * ``path`` -- stored on ``self._path`` (default ``''``).
    * ``screenshots`` -- when truthy, the screenshot category is added to
      the categories sent with ``Tracing.start``.
    """
    # Declared ``async`` because the body awaits the client call.
    options = {**(options or {}), **kwargs}
    categoriesArray = [
        '-*', 'devtools.timeline', 'v8.execute',
        'disabled-by-default-devtools.timeline',
        'disabled-by-default-devtools.timeline.frame', 'toplevel',
        'blink.console', 'blink.user_timing', 'latencyInfo',
        'disabled-by-default-devtools.timeline.stack',
        'disabled-by-default-v8.cpu_profiler',
    ]
    # Test the value, not key presence, so ``screenshots=False`` disables it.
    if options.get('screenshots'):
        categoriesArray.append('disabled-by-default-devtools.screenshot')

    self._path = options.get('path', '')
    self._recording = True
    await self._client.send('Tracing.start', {
        'transferMode': 'ReturnAsStream',
        'categories': ','.join(categoriesArray),
    })
def __init__(self, client: Session, ignoreHTTPSErrors: Any,
             options: dict = None, **kwargs: Any) -> None:
    """Make new navigator watcher.

    ``options`` and ``kwargs`` are merged into a new dict (keyword arguments
    win); the caller's dict is not modified. Keys read from the result:
    ``timeout`` (default 3000), ``networkIdleTimeout`` (default 1000),
    ``networkIdleInflight`` (default 2) and ``waitUntil`` (default
    ``'load'``).

    :raises ValueError: if ``waitUntil`` is neither ``'load'`` nor
        ``'networkidle'``.
    """
    # Copy instead of ``options.update(kwargs)`` so the caller's dict stays intact.
    options = {**(options or {}), **kwargs}
    self._client = client
    self._ignoreHTTPSErrors = ignoreHTTPSErrors
    self._timeout = options.get('timeout', 3000)
    self._idleTime = options.get('networkIdleTimeout', 1000)
    self._idleTimer: Optional[Union[asyncio.Future, asyncio.Handle]] = None
    self._idleInflight = options.get('networkIdleInflight', 2)
    self._waitUntil = options.get('waitUntil', 'load')
    if self._waitUntil not in ('load', 'networkidle'):
        raise ValueError(
            f'Unknown value for options.waitUntil: {self._waitUntil}')
async def waitForNavigation(self, options: dict = None, **kwargs: Any
                            ) -> Optional[Response]:
    """Wait until navigation completes.

    ``options`` and ``kwargs`` are merged into a new dict (the caller's dict
    is not modified) and handed to a ``NavigatorWatcher``. Responses seen
    while waiting are collected by URL.

    :return: the collected response whose URL equals ``self.url``, or
        ``None`` if there is none.
    """
    options = {**(options or {}), **kwargs}
    watcher = NavigatorWatcher(self._client, self._ignoreHTTPSErrors, options)

    responses: Dict[str, Response] = dict()
    listener = helper.addEventListener(
        self._networkManager, NetworkManager.Events.Response,
        lambda response: responses.__setitem__(response.url, response)
    )
    try:
        await watcher.waitForNavigation()
    finally:
        # Always detach, otherwise a failed wait leaves the listener registered.
        helper.removeEventListeners([listener])
    return responses.get(self.url)
async def screenshot(self, options: dict = None, **kwargs: Any) -> bytes:
    """Take a screenshot.

    ``options`` and ``kwargs`` are merged into a new dict (keyword arguments
    win); the caller's dict is not modified. The image type is chosen as
    follows:

    1. an explicit ``type`` option, if present;
    2. otherwise the MIME type guessed from ``path`` (PNG or JPEG only);
    3. otherwise ``'png'``.

    :raises PageError: if ``path`` is given and its guessed MIME type is
        neither ``image/png`` nor ``image/jpeg``.
    :return: the result of ``self._screenshotTask``.
    """
    options = {**(options or {}), **kwargs}
    screenshotType = None
    if 'path' in options:
        mimeType, _ = mimetypes.guess_type(options['path'])
        if mimeType == 'image/png':
            screenshotType = 'png'
        elif mimeType == 'image/jpeg':
            screenshotType = 'jpeg'
        else:
            raise PageError('Unsupported screenshot '
                            f'mime type: {mimeType}')
    if 'type' in options:
        screenshotType = options['type']
    if not screenshotType:
        screenshotType = 'png'
    return await self._screenshotTask(screenshotType, options)
def __init__(self, options: Dict[str, Any] = None, **kwargs: Any) -> None:
    """Make new launcher.

    ``options`` and ``kwargs`` are merged into a new dict stored on
    ``self.options`` (keyword arguments win); the caller's dict is not
    modified. Headless flags are appended unless the ``headless`` option is
    present and falsy. The executable is ``options['executablePath']`` when
    given, otherwise the path reported by ``chromium_excutable()``, with
    ``download_chromium()`` called first if ``check_chromium()`` is false.
    """
    self.options = {**(options or {}), **kwargs}
    # Work on a private copy so in-place edits cannot alter the shared default.
    # NOTE(review): assumes ``_parse_args`` may edit ``self.chrome_args`` in place - confirm.
    self.chrome_args = list(DEFAULT_ARGS)
    self._tmp_user_data_dir: Optional[str] = None
    self._parse_args()

    if self.options.get('headless', True):
        self.chrome_args = self.chrome_args + [
            '--headless',
            '--disable-gpu',
            '--hide-scrollbars',
            '--mute-audio',
        ]

    if 'executablePath' in self.options:
        self.exec = self.options['executablePath']
    else:
        if not check_chromium():
            download_chromium()
        self.exec = str(chromium_excutable())
    self.cmd = [self.exec] + self.chrome_args
async def move(self, x: int, y: int, options: dict = None, **kwargs: Any
               ) -> None:
    """Move the cursor to ``(x, y)``.

    ``options`` and ``kwargs`` are merged into a new dict (keyword arguments
    win); the caller's dict is not modified. The ``steps`` option (default 1)
    is the number of ``mouseMoved`` events sent; each event carries a
    rounded point interpolated linearly between the previous position and
    the target.
    """
    options = {**(options or {}), **kwargs}
    fromX = self._x
    fromY = self._y
    self._x = x
    self._y = y
    steps = options.get('steps', 1)
    for i in range(1, steps + 1):
        fraction = i / steps
        await self._client.send('Input.dispatchMouseEvent', {
            'type': 'mouseMoved',
            'button': self._button,
            'x': round(fromX + (self._x - fromX) * fraction),
            'y': round(fromY + (self._y - fromY) * fraction),
            'modifiers': self._keyboard._modifiers,
        })
def contains_all_options(optiongroup, parentstyle, matchvalues=False):
    # type: (Dict[Any, Any], Style, bool) -> bool
    """Tell whether every option of optiongroup also appears in parentstyle.

    Nested dict values are checked recursively against the matching
    sub-options of parentstyle. With matchvalues=True the non-dict values
    must additionally have the same type and compare equal.
    """
    for optionname, value in optiongroup.items():
        if optionname not in parentstyle:
            return False
        if isinstance(value, dict):
            if not contains_all_options(value, parentstyle[optionname],
                                        matchvalues=matchvalues):
                return False
            continue
        if not matchvalues:
            continue
        pvalue = parentstyle.get(optionname)
        # Exact type comparison on purpose: True and 1 must not match.
        if type(pvalue) is not type(value) or pvalue != value:
            return False
    return True
def compute_unified_diff(filename, content2, **kwargs):
    # type: (str, bytes, **Any) -> Tuple[int, Iterable[str]]
    """Diff the cached content of filename against content2.

    Extra keyword arguments are forwarded to difflib.unified_diff; unless
    the caller passes 'n', zero context lines are requested.

    Returns a tuple (exit_code, diff). On success exit_code is OK and diff
    is the iterable produced by difflib.unified_diff. If preparing the diff
    fails, (ERROR, ()) is returned instead of raising.
    """
    diff = ()  # type: Iterable[str]
    exit_code = ERROR
    kw = kwargs.copy()
    # zero context lines unless the caller chose a value
    kw.setdefault('n', 0)
    try:
        content1 = get_cached_file(filename)
        if PY3:
            c1 = unistr(content1)
            c2 = unistr(content2)
        else:
            c1 = content1
            c2 = content2
        diff = difflib.unified_diff(c1.splitlines(True), c2.splitlines(True), **kw)
        exit_code = OK
    except Exception:
        # Best effort: ordinary failures are reported through the ERROR exit
        # code. Unlike a `return` inside `finally`, this lets
        # KeyboardInterrupt and SystemExit propagate.
        pass
    return exit_code, diff


# ---------------------------------------------------------------------
# Spare the user from specifying a formatter by finding a suitable one.
def set_value(self, key, value):
    # type: (str, Any) -> None
    """Store value under key in the configuration being modified.

    When a parser is available for the target file, the value is also
    written to it (creating the section if needed). The in-memory
    configuration is updated and the file is marked as modified either way.
    """
    self._ensure_have_load_only()

    config_file, file_parser = self._get_parser_to_modify()

    if file_parser is not None:
        section_name, option_name = _disassemble_key(key)

        # Mirror the change into the parser backing the file.
        if not file_parser.has_section(section_name):
            file_parser.add_section(section_name)
        file_parser.set(section_name, option_name, value)

    self._config[self.load_only][key] = value
    self._mark_as_modified(config_file, file_parser)
def received(self, type: str, resp: Dict[str, Any]) -> None:
    """Route an incoming message to its handler based on ``type``.

    ``'message'``, ``'notify'`` and ``'action'`` go to the matching handler
    methods; ``'advanced'`` and ``'connected'`` are ignored; ``'init'`` and
    any unknown type raise ``Exception`` (unknown types are printed first).
    """
    if type in ('advanced', 'connected'):
        # 'advanced': Fast UI Animations; 'connected': Players connected to the game
        return
    if type == 'message':
        self.message_received(resp['text'])
        return
    if type == 'notify':
        self.handle_notify(resp)
        return
    if type == 'action':
        self.decide_action(resp['can_clue'], resp['can_discard'])
        return
    if type != 'init':
        print(type, resp)
    raise Exception()
def __getitem__(self, item: str) -> Any:
    """Return the value stored for hyperparameter ``item``.

    Values already present in ``self._values`` (or any lookup while
    ``self._query_values`` is set) are answered from that dict. Otherwise
    the value is derived from ``self._vector`` through the hyperparameter's
    ``_transform``, stored in ``self._values`` and returned.

    :raises KeyError: if the vector entry for ``item`` is not finite.
    """
    if self._query_values or item in self._values:
        return self._values.get(item)

    space = self.configuration_space
    hyperparameter = space._hyperparameters[item]
    position = space._hyperparameter_idx[item]

    raw = self._vector[position]
    if not np.isfinite(raw):
        raise KeyError()

    value = hyperparameter._transform(raw)
    # Truncate the representation of the float to be of constant
    # length for a python version
    if isinstance(hyperparameter, FloatHyperparameter):
        value = float(repr(value))
    # TODO make everything faster, then it'll be possible to init all values
    # at the same time and use an OrderedDict instead of only a dict here to
    # support iterating that dict in the same order as the actual order of
    # hyperparameters
    self._values[item] = value
    return self._values[item]
def randomly_grouped_by(key_from_example: Callable[[LabeledExample], Any],
                        training_share: float = .9) -> Callable[
    [List[LabeledExample]], Tuple[List[LabeledExample], List[LabeledExample]]]:
    """Build a splitter that assigns whole groups to training or test.

    :param key_from_example: maps an example to its group key
    :param training_share: fraction of the groups (not of the examples)
        that goes to the training set
    :return: a function taking the examples and returning
        ``(training_examples, test_examples)``
    """

    def split(examples: List[LabeledExample]) -> Tuple[List[LabeledExample], List[LabeledExample]]:
        examples_by_directory = group(examples, key=key_from_example)
        # random.sample() needs a real sequence; a dict keys view raises
        # TypeError on Python 3.11+.
        directories = list(examples_by_directory.keys())

        # split must be the same every time:
        random.seed(42)
        keys = set(random.sample(directories, int(training_share * len(directories))))

        training_examples = [example for example in examples if key_from_example(example) in keys]
        test_examples = [example for example in examples if key_from_example(example) not in keys]
        return training_examples, test_examples

    return split
def __init__(self, variables: List[VariableIdentifier], lattices: Dict[Type, Type[Lattice]],
             arguments: Dict[Type, Dict[str, Any]] = None):
    """Create a mapping Var -> L from each variable in Var to the corresponding element in L.

    :param variables: list of program variables
    :param lattices: dictionary from variable types to the corresponding lattice types
    :param arguments: dictionary from variable types to arguments of the
        corresponding lattices; when omitted, every lattice is built without arguments
    :raises ValueError: if a lookup keyed by a variable's type fails
    """
    super().__init__()
    if arguments is None:
        # A fresh mapping per instance: a default created in the signature
        # would be shared (and grown by lookups) across all instances.
        arguments = defaultdict(dict)
    self._variables = variables
    self._lattices = lattices
    self._arguments = arguments
    try:
        self._store = {v: lattices[type(v.typ)](**arguments[type(v.typ)]) for v in variables}
    except KeyError as key:
        error = f"Missing lattice for variable type {repr(key.args[0])}!"
        raise ValueError(error) from key
def hashex(method,  # type: HashMethod
           key,  # type: KeyType
           **options  # type: typing.Any
           ):
    # type: (...) -> int
    """Hash key with the algorithm selected by method.

    Text keys are encoded as UTF-8 first. Methods whose lower-cased name is
    one of hashlib.algorithms_guaranteed are computed with hashlib and
    returned as an integer; the MMH3_* and SIPHASH methods are delegated to
    hash_murmur3 / hash_siphash with options forwarded. Any other method
    yields None.
    """
    if isinstance(key, six.text_type):
        key = key.encode('utf-8')

    algorithm = method.name.lower()
    if algorithm in hashlib.algorithms_guaranteed:
        return int(hashlib.new(algorithm, key).hexdigest(), 16)
    if method == HashMethod.MMH3_32:
        return hash_murmur3(key, size=32, **options)
    if method == HashMethod.MMH3_64:
        return hash_murmur3(key, size=64, **options)
    if method == HashMethod.MMH3_128:
        return hash_murmur3(key, size=128, **options)
    if method == HashMethod.SIPHASH:
        return hash_siphash(key, **options)
    return None
def dispatch(
    method: Callable[[Any, Type[T], Mapping[str, Any], PipelineContext], Any]
) -> Callable[[Any, Type[T], Mapping[str, Any], PipelineContext], Any]:
    """Wrap ``method`` so calls are routed by the requested ``type``.

    The returned wrapper looks up the implementation registered for ``type``
    (falling back to ``method``) and calls it with ``(self, query,
    context=context)``. A ``TypeError`` from that call is replaced by
    ``DataSource.unsupported(type)``. The wrapper exposes ``register(type)``
    for adding implementations and ``_provides``, the set of registered types.
    """
    dispatcher = singledispatch(method)
    provides = set()

    def wrapper(self: Any, type: Type[T], query: Mapping[str, Any], context: PipelineContext = None) -> Any:
        handler = dispatcher.dispatch(type)
        try:
            return handler(self, query, context=context)
        except TypeError:
            raise DataSource.unsupported(type)

    def register(type: Type[T]) -> Callable[[Any, Type[T], Mapping[str, Any], PipelineContext], Any]:
        provides.add(type)
        return dispatcher.register(type)

    wrapper.register = register
    wrapper._provides = provides
    # update_wrapper hands back the wrapper it was given.
    return update_wrapper(wrapper, method)
def dispatch(
    method: Callable[[Any, Type[T], Any, PipelineContext], None]
) -> Callable[[Any, Type[T], Any, PipelineContext], None]:
    """Wrap ``method`` so calls are routed by the item ``type``.

    The returned wrapper looks up the implementation registered for ``type``
    (falling back to ``method``) and calls it with ``(self, items,
    context=context)``. A ``TypeError`` from that call is replaced by
    ``DataSink.unsupported(type)``. The wrapper exposes ``register(type)``
    for adding implementations and ``_accepts``, the set of registered types.
    """
    dispatcher = singledispatch(method)
    accepts = set()

    def wrapper(self: Any, type: Type[T], items: Any, context: PipelineContext = None) -> None:
        handler = dispatcher.dispatch(type)
        try:
            return handler(self, items, context=context)
        except TypeError:
            raise DataSink.unsupported(type)

    def register(type: Type[T]) -> Callable[[Any, Type[T], Any, PipelineContext], None]:
        accepts.add(type)
        return dispatcher.register(type)

    wrapper.register = register
    wrapper._accepts = accepts
    # update_wrapper hands back the wrapper it was given.
    return update_wrapper(wrapper, method)
def _best_transform_from(self, source_type: Type[S], target_types: Iterable[Type]) -> Tuple[Callable[[S], Any], Type, int]:
    """Pick the cheapest transform from ``source_type`` to any of ``target_types``.

    Targets for which ``self._transform`` raises ``NoConversionError`` are
    skipped, and only costs strictly below ``_MAX_TRANSFORM_COST`` qualify.

    :return: ``(transform, target_type, cost)`` of the cheapest candidate
        (the first one wins on ties)
    :raises NoConversionError: if no target qualifies
    """
    best = None
    best_cost = _MAX_TRANSFORM_COST
    to_type = None

    for candidate in target_types:
        try:
            transform, cost = self._transform(source_type, candidate)
        except NoConversionError:
            continue
        if cost < best_cost:
            best, best_cost, to_type = transform, cost, candidate

    if best is None:
        raise NoConversionError("Pipeline can't convert \"{source_type}\" to any of \"{target_types}\"".format(source_type=source_type, target_types=target_types))
    return best, to_type, best_cost
def evaluate(self, query: MutableMapping[str, Any], context: PipelineContext = None) -> bool:
    """Evaluate every child and require an all-or-nothing outcome.

    Returns ``True`` when all children evaluate truthy, or when every
    falsifiable child evaluates falsy.

    :raises BoundKeyExistenceError: if neither of those holds.
    """
    # This is a little weird. We have to handle the case of can_have("x").and_("y") here.
    # Since the only type of Node that can return False rather than just raising a
    # QueryValidationError is a KeyNode that isn't required, or an OrNode whose children
    # are all KeyNodes that aren't required, we can't short circuit on a False value. If
    # we have can_have("x").and_("y"), and only some are True, we want raise a
    # QueryValidationError. If all are True or all are False, everything's fine.
    all_true = True
    all_possible_false = True
    for child in self.children:
        is_true = child.evaluate(query, context)
        all_true = all_true and is_true
        if child.falsifiable:
            all_possible_false = all_possible_false and not is_true

    if all_possible_false or all_true:
        return True
    raise BoundKeyExistenceError("Query must have all or none of the elements joined with \"and\" in a \"can_have\" statement!")
def dispatch(
    method: Callable[[Any, Type[T], F, PipelineContext], T]
) -> Callable[[Any, Type[T], F, PipelineContext], T]:
    """Wrap ``method`` so calls are routed by ``(type of value, target type)``.

    The returned wrapper looks up the implementation registered for
    ``TypePair[value.__class__, target_type]`` and calls it with ``(self,
    value, context=context)``. A ``TypeError`` from that call is replaced by
    ``DataTransformer.unsupported(target_type, value)``. The wrapper exposes
    ``register(from_type, to_type)`` and ``_transforms``, a dict mapping each
    registered source type to the set of its target types.
    """
    dispatcher = singledispatch(method)
    transforms = {}

    def wrapper(self: Any, target_type: Type[T], value: F, context: PipelineContext = None) -> T:
        handler = dispatcher.dispatch(TypePair[value.__class__, target_type])
        try:
            return handler(self, value, context=context)
        except TypeError:
            raise DataTransformer.unsupported(target_type, value)

    def register(from_type: Type[F], to_type: Type[T]) -> Callable[[Any, Type[T], F, PipelineContext], T]:
        # Create the target set on first use, then record the new target.
        transforms.setdefault(from_type, set()).add(to_type)
        return dispatcher.register(TypePair[from_type, to_type])

    wrapper.register = register
    wrapper._transforms = transforms
    # update_wrapper hands back the wrapper it was given.
    return update_wrapper(wrapper, method)
def execute(self, query: str, *, infer: bool = True, multi: bool = False) -> Any:
    """Run a Graql query against the knowledge base.

    :param query: the Graql query string to execute against the knowledge base
    :param infer: enable inference
    :param multi: treat this request as a list of queries
    :return: the decoded JSON body of the response
    :raises: GraknError, requests.exceptions.ConnectionError
    """
    response = self._post(query, infer=infer, multi=multi)
    if not response.ok:
        raise GraknError(response.json()['exception'])
    return response.json()
def _generate_form_data(**kwargs: Union[Any, BotoFairu]) -> aiohttp.FormData:
    """Build an ``aiohttp.FormData`` with one field per keyword argument.

    ``BotoFairu`` values are added with their content, content type, file
    name and transfer encoding; lists and dicts are JSON-encoded; ints,
    floats, strings and bools are added as ``str(value)``.

    :raises TypeError: for a value of any other type.
    """
    form = aiohttp.FormData()
    for field_name, field_value in kwargs.items():
        if isinstance(field_value, BotoFairu):
            form.add_field(
                field_name,
                field_value._content,
                content_type=field_value._content_type,
                filename=field_value._filename,
                content_transfer_encoding=field_value._content_transfer_encoding,
            )
            continue
        if isinstance(field_value, (list, dict)):
            form.add_field(field_name, json.dumps(field_value))
            continue
        if isinstance(field_value, (int, float, str, bool)):
            form.add_field(field_name, str(field_value))
            continue
        raise TypeError("Unknown Type: {}".format(type(field_value)))
    return form
def largest_dimensions_for_aspect_ratio(dimensions_list: List[Dimensions], desired_aspect_ratio: float,
                                        default: Any = _sentinel) -> Union[Dimensions, Any]:
    """
    Crop every entry to ``desired_aspect_ratio`` and keep the biggest result.

    Returns
    -------
    The cropped dimensions with the greatest ``resolution`` (the first one
    wins on ties), or ``default`` when ``dimensions_list`` is empty and a
    default was supplied.

    Raises
    ------
    ValueError
        If ``dimensions_list`` is empty and no default was supplied.
    """
    if not dimensions_list:
        if default is _sentinel:
            raise ValueError(f"{dimensions_list} must not be empty.")
        return default

    cropped = (crop_dimensions_to_aspect_ratio(dimensions, desired_aspect_ratio)
               for dimensions in dimensions_list)
    # max() keeps the first of several equal maxima, like a strict ">" scan.
    return max(cropped, key=lambda candidate: candidate.resolution)
def create(cls, texts: List[Any], name: str, *, locations: Opt[List[float]] = None,
           durations: Opt[List[float]] = None, default: bool = False) -> 'SubtitleTrack':
    """Build a subtitle track from texts plus either locations or durations.

    ``locations`` takes precedence when both are given. Each text is
    converted with ``str`` and paired with a start and end time; pairing
    stops at the shortest of the three sequences.

    :raises ex.ParameterError: if neither locations nor durations is provided.
    """
    if locations:
        start_times, end_times = loc_util.start_end_locations_from_locations(locations)
    elif durations:
        start_times, end_times = loc_util.start_end_locations_from_intervals(durations)
    else:
        raise ex.ParameterError(f"Must provide either locations or durations for the subtitle track {name}")

    subtitles = [
        Subtitle(str(text), start_time, end_time)
        for text, start_time, end_time in zip(texts, start_times, end_times)
    ]
    return cls(subtitles, name, default=default)
def parse_settings(config: Any) -> Optional[Dict[str, Any]]:
    """Read the slack notification settings from ``config``.

    Returns ``None`` when the webhook URL or the message template is
    missing. Otherwise returns the settings dict with the message template,
    and the duration / URL templates when non-empty, compiled into
    ``jinja2.Template`` objects.
    """
    ret = {
        'webhook-url': config.get('slack-webhook-url'),
        'tmpl-msg': config.get('slack-tmpl-msg'),
        'tmpl-duration': config.get('slack-tmpl-duration', fallback=''),
        'tmpl-url': config.get('slack-tmpl-url', fallback='')
    }  # type: Any

    if not (ret['webhook-url'] and ret['tmpl-msg']):
        log.debug('Slack settings missing, no slack notifications will be sent', 'NOTIFICATIONS')
        return None

    log.debug('Valid slack notification settings found', 'NOTIFICATIONS')
    ret['tmpl-msg'] = jinja2.Template(ret['tmpl-msg'])
    # The remaining templates are optional and only compiled when set.
    for optional_key in ('tmpl-duration', 'tmpl-url'):
        if ret[optional_key]:
            ret[optional_key] = jinja2.Template(ret[optional_key])
    return ret
def parse_settings(config: Any) -> Optional[Dict[str, Any]]:
    """Parse clicksend sms settings.

    Should only be called from sms.parse_settings.

    Returns ``None`` when the username, API key, sender or template is
    missing. Otherwise returns the settings dict with ``'tmpl'`` compiled
    into a ``jinja2.Template``.
    """
    ret = {
        'provider': 'clicksend',
        'username': config.get('sms-clicksend-username'),
        'api-key': config.get('sms-clicksend-api-key'),
        'sender': config.get('sms-clicksend-sender'),
        'tmpl': config.get('sms-tmpl'),
    }  # type: Any

    # Check the looked-up values themselves: a bare list literal such as
    # ['sender'] is always truthy and would never flag a missing setting.
    required = ('username', 'api-key', 'sender', 'tmpl')
    if not all(ret[name] for name in required):
        log.msg('SMS settings missing, no sms notifications will be sent', 'NOTIFICATIONS')
        return None

    log.debug('Valid SMS notification settings found', 'NOTIFICATIONS')
    ret['tmpl'] = jinja2.Template(ret['tmpl'])
    return ret
def ctrlc_signal_handler(sgn: int, frame: Any) -> None:
    """Signal handler that ends the process with exit status 0.

    Both arguments are ignored.
    """
    # sys.exit(0) does exactly this: raise SystemExit carrying the status.
    raise SystemExit(0)
def __call__(self, input: Any, *styles, reset: bool = True, apply: bool = True):
    """
    Styles text with ANSI styles and returns the new string.

    By default the styling is cleared at the end of the string, this can be prevented with ``reset=False``.

    Examples::

        print(sformat('Hello World!', sformat.green))
        print(sformat('ATTENTION!', sformat.bg_magenta))
        print(sformat('Some things', sformat.reverse, sformat.bold))

    :param input: the object to style with ansi codes.
    :param *styles: zero or more styles to apply to the text, should be either style instances,
        raw ints or strings matching style names.
    :param reset: if False the ansi reset code is not appended to the end of the string
    :param apply: if False no ansi codes are applied
    :raises ValueError: if a style name is not found in ``self.styles``
    """
    text = str(input)
    if not apply:
        return text
    codes = []
    for s in styles:
        # raw ints are allowed
        if not isinstance(s, self.__class__) and not isinstance(s, int):
            try:
                s = self.styles[s]
            except KeyError:
                raise ValueError('invalid style "{}"'.format(s))
        # Style instances carry their code in ``.value``; a raw int has no
        # such attribute and is used as the code directly.
        codes.append(str(getattr(s, 'value', s)))

    if codes:
        r = _ansi_template.format(';'.join(codes)) + text
    else:
        r = text

    if reset:
        r += _ansi_template.format(self.reset)
    return r
def __call__(self, value: Any, *, indent: int = 0, indent_first: bool = False, highlight: bool = False):
    """Format ``value`` into a string.

    A fresh ``io.StringIO`` is installed on ``self._stream``, ``_format``
    writes into it, and the accumulated text is returned. With
    ``highlight=True`` and a truthy ``pyg_lexer`` the text is passed through
    ``pygments.highlight`` first.
    """
    self._stream = io.StringIO()
    self._format(value, indent_current=indent, indent_first=indent_first)
    rendered = self._stream.getvalue()
    if not (highlight and pyg_lexer):
        return rendered
    # apparently highlight adds a trailing new line we don't want
    return pygments.highlight(rendered, lexer=pyg_lexer, formatter=pyg_formatter).rstrip('\n')
def _format(self, value: Any, indent_current: int, indent_first: bool):
    """Write a representation of ``value`` to ``self._stream``.

    Values whose ``repr`` is at most ``self._simple_cutoff`` characters long
    (generators excepted) are written as-is. Anything else goes to the first
    formatter in ``self._type_lookup`` whose type matches, or to
    ``_format_raw`` when none matches.

    :param indent_current: indentation width of the current level
    :param indent_first: when true, the indentation is written before the value
    """
    # ``collections.Generator`` was removed in Python 3.10; the ABC lives in
    # ``collections.abc``.
    from collections.abc import Generator

    if indent_first:
        self._stream.write(indent_current * self._c)

    value_repr = repr(value)
    if len(value_repr) <= self._simple_cutoff and not isinstance(value, Generator):
        self._stream.write(value_repr)
        return

    indent_new = indent_current + self._indent_step
    for t, func in self._type_lookup:
        if isinstance(value, t):
            func(value, value_repr, indent_current, indent_new)
            return
    self._format_raw(value, value_repr, indent_current, indent_new)
def _format_raw(self, value: Any, value_repr: str, indent_current: int, indent_new: int):
    """Write ``value_repr`` to the stream, wrapping it in parentheses if needed.

    A single-line repr that fits within ``self._width`` (counting
    ``indent_current``) is written unchanged. Otherwise each line is wrapped
    to ``self._width - indent_new`` columns and written, indented by
    ``indent_new``, between ``(`` and ``)``.
    """
    lines = value_repr.splitlines(True)
    fits_on_one_line = len(lines) <= 1 and (len(value_repr) + indent_current) < self._width
    if fits_on_one_line:
        self._stream.write(value_repr)
        return

    write = self._stream.write
    write('(\n')
    wrap_at = self._width - indent_new
    prefix = indent_new * self._c
    for line in lines:
        for sline in textwrap.wrap(line, wrap_at):
            write(prefix + sline + '\n')
    write(indent_current * self._c + ')')
def __init__(self) -> None:
    """Create an empty multimap."""
    # Keys map to lists of values, kept in insertion order.
    # (maybe defaultdict(set) is better)
    storage: OrderedDict[str, List[Any]] = OrderedDict()
    self._map = storage
def set(self, key: str, value: Any) -> None:
    """Add ``value`` under ``key`` unless it is already stored there."""
    bucket = self._map.get(key)
    if not bucket:
        # Missing (or empty) entry: install a fresh list for this key.
        bucket = self._map[key] = list()
    if value in bucket:
        return
    bucket.append(value)
def get(self, key: str) -> List[Any]:
    """Return the list stored for ``key``, or a new empty list if absent."""
    values = self._map.get(key, [])
    return values
def hasValue(self, key: str, value: Any) -> bool:
    """Check whether ``value`` is stored under ``key``."""
    return value in self._map.get(key, [])
def firstValue(self, key: str) -> Any:
    """Return the first value stored for ``key``, or ``None`` if there is none."""
    values = self._map.get(key)
    return values[0] if values else None
def valuesArray(self) -> List[Any]:
    """Return every stored value in one flat list, in key order."""
    flattened: List[Any] = [
        value for values in self._map.values() for value in values
    ]
    return flattened
async def querySelectorEval(self, selector: str, pageFunction: str,
                            *args: Any) -> Optional[Any]:
    """Execute ``pageFunction`` on the element which matches ``selector``.

    The element handle is disposed afterwards, whether or not the
    evaluation succeeds.

    :raises PageError: if no element matches ``selector``.
    :return: the result of ``elementHandle.evaluate(pageFunction, *args)``.
    """
    elementHandle = await self.querySelector(selector)
    if elementHandle is None:
        raise PageError(
            f'Error: failed to find element matching selector "{selector}"'
        )
    try:
        return await elementHandle.evaluate(pageFunction, *args)
    finally:
        # Release the handle even when the evaluation raises.
        await elementHandle.dispose()
def waitForSelector(self, selector: str, options: dict = None,
                    **kwargs: Any) -> Awaitable:
    """Wait until an element matches ``selector``.

    ``options`` and ``kwargs`` are merged into a new dict (keyword arguments
    win); the caller's dict is not modified. Keys read from the result:
    ``timeout`` (default 30000) and ``interval`` (default 0).

    :return: a ``WaitTask`` of type ``'selector'``.
    """
    # Copy instead of ``options.update(kwargs)`` so the caller's dict stays intact.
    options = {**(options or {}), **kwargs}
    timeout = options.get('timeout', 30_000)  # msec
    interval = options.get('interval', 0)  # msec
    return WaitTask(self, 'selector', selector, timeout, interval=interval)
def __init__(self, frame: Frame, _type: str, expr: str, timeout: float,
             *args: Any, interval: float = 0) -> None:
    """Make new wait task.

    The task registers itself in ``frame._waitTasks``, schedules a timer that
    terminates it with a ``BrowserError`` once ``timeout`` has elapsed, and
    schedules ``self.rerun(True)`` on the event loop.

    :arg Frame frame: frame this task is registered on.
    :arg str _type: either ``'function'`` or ``'selector'``.
    :arg str expr: expression stored on ``self.expr``.
    :arg float timeout: msec to wait for task.
    :arg args: extra positional arguments; not used in this constructor.
    :arg float interval: msec to poll for task; ``0`` (the default) means
        ``timeout / 100``.
    :raises ValueError: if ``_type`` is not one of the supported types.
    """
    if _type not in ['function', 'selector']:
        raise ValueError('Unsupported type for WaitTask: ' + _type)
    super().__init__()
    self.__frame: Frame = frame
    self.__type = _type
    self.expr = expr
    self.__timeout = timeout / 1000  # sec
    # A zero interval is falsy, so it falls back to 1/100 of the timeout.
    self.__interval = interval / 1000 or self.__timeout / 100  # sec
    self.__runCount: int = 0
    self.__terminated = False
    self.__done = False
    frame._waitTasks.add(self)
    # Since page navigation requires us to re-install the pageScript,
    # we should track timeout on our end.
    self.__loop = asyncio.get_event_loop()
    self.__timeoutTimer = self.__loop.call_later(
        self.__timeout,
        lambda: self.terminate(
            BrowserError(f'waiting failed: timeout {timeout}ms exceeded')
        )
    )
    asyncio.ensure_future(self.rerun(True))
async def setUserAgent(self, userAgent: str) -> Any:
    """Set the user agent.

    Sends ``Network.setUserAgentOverride`` with ``userAgent`` through the
    client and returns its reply.
    """
    # Declared ``async`` because the body awaits the client call.
    return await self._client.send('Network.setUserAgentOverride',
                                   {'userAgent': userAgent})
def response(self) -> Any:
    """Return the response stored on this object."""
    stored = self._response
    return stored
def _onTargetCrashed(self, *args: Any, **kwargs: Any) -> None:
    """Emit an ``'error'`` event carrying a ``PageError``.

    All arguments are ignored.
    """
    crash = PageError('Page crashed!')
    self.emit('error', crash)
async def querySelectorEval(self, selector: str, pageFunction: str,
                            *args: Any) -> Optional[Any]:
    """Execute ``pageFunction`` on the element which matches ``selector``.

    The call is delegated to the main frame's ``querySelectorEval``.

    :raises PageError: if there is no main frame.
    """
    # Declared ``async`` because the body awaits the frame call.
    frame = self.mainFrame
    if not frame:
        raise PageError('no main frame.')
    return await frame.querySelectorEval(selector, pageFunction, *args)
def _onDialog(self, event: Any) -> None:
    """Turn a dialog event into a ``Dialog`` and emit it.

    The event's ``type`` is mapped to the matching ``Dialog.Type`` member;
    an unrecognised type leaves the dialog type as ``''``.
    """
    type_attributes = (
        ('alert', 'Alert'),
        ('confirm', 'Confirm'),
        ('prompt', 'Prompt'),
        ('beforeunload', 'BeforeUnload'),
    )
    dialogType = ''
    _type = event.get('type')
    for event_type, attribute in type_attributes:
        if _type == event_type:
            dialogType = getattr(Dialog.Type, attribute)
            break
    dialog = Dialog(self._client, dialogType, event.get('message'),
                    event.get('defaultPrompt'))
    self.emit(Page.Events.Dialog, dialog)
async def goto(self, url: str, options: dict = None, **kwargs: Any
               ) -> Optional[Response]:
    """Go to ``url``.

    ``options`` and ``kwargs`` are merged into a new dict (the caller's dict
    is not modified) and handed to a ``NavigatorWatcher``. Responses seen
    during the navigation are collected by URL.

    :raises PageError: if the main frame failed to load.
    :return: the collected response whose URL equals ``self.url``, or
        ``None`` if there is none.
    """
    options = {**(options or {}), **kwargs}
    watcher = NavigatorWatcher(self._client, self._ignoreHTTPSErrors, options)
    responses: Dict[str, Response] = dict()
    listener = helper.addEventListener(
        self._networkManager, NetworkManager.Events.Response,
        lambda response: responses.__setitem__(response.url, response)
    )
    try:
        result = asyncio.ensure_future(watcher.waitForNavigation())
        referrer = self._networkManager.extraHTTPHeaders().get('referer', '')

        try:
            await self._client.send('Page.navigate',
                                    dict(url=url, referrer=referrer))
        except Exception:
            watcher.cancel()
            raise
        await result
    finally:
        # Detach on every path; otherwise a failed navigation leaves the
        # listener registered.
        helper.removeEventListeners([listener])

    if self._frameManager.isMainFrameLoadingFailed():
        raise PageError('Failed to navigate: ' + url)
    return responses.get(self.url)
async def reload(self, options: dict = None, **kwargs: Any
                 ) -> Optional[Response]:
    """Reload this page.

    Sends ``Page.reload`` and then waits for the navigation. ``options`` and
    ``kwargs`` are merged into a new dict (keyword arguments win) that is
    passed to ``waitForNavigation``; the caller's dict is not modified.
    """
    options = {**(options or {}), **kwargs}
    await self._client.send('Page.reload')
    return await self.waitForNavigation(options)
async def goForward(self, options: dict = None, **kwargs: Any
                    ) -> Optional[Response]:
    """Go forward in history.

    ``options`` and ``kwargs`` are merged into a new dict (keyword arguments
    win) that is passed to ``self._go(+1, ...)``; the caller's dict is not
    modified.
    """
    options = {**(options or {}), **kwargs}
    return await self._go(+1, options)