我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用异常类 urllib.request.URLError。
def mm_heartbeat(self):
    """Send one heartbeat request and schedule the next one.

    Does nothing once ``self.shutdown`` is set. Otherwise a timer for the
    next heartbeat (``self.hb_timer`` seconds) is started *before* the
    request is made, so a failed request does not break the chain.
    A ``URLError`` is logged and swallowed.
    """
    # Check if stop or set next timer
    if self.shutdown:
        return
    threading.Timer(self.hb_timer, self.mm_heartbeat).start()

    address = ("http://" + self.mm_host + ":" + self.mm_port +
               "/alexapi?action=AVSHB")
    logger.debug("Sending MM Heatbeat")
    try:
        response = urlopen(address).read()
    except URLError as err:
        logger.error("URLError: %s", err.reason)
        return
    # read() returns bytes on Python 3; concatenating that to a str raises
    # TypeError, so let the logger format it lazily instead.
    logger.debug("Response: %s", response)
def send_remote_shutdown_command(self):
    """Request a shutdown of the local service and wait for it to stop.

    Opens ``http://127.0.0.1:<self.port>/shutdown``. If that raises
    ``URLError`` the method returns at once. Otherwise
    ``self.is_connectable()`` is polled, sleeping one second between polls
    and at most 30 times in total.
    """
    # Prefer the Python 3 module; fall back to Python 2's urllib2.
    try:
        from urllib import request as url_request
    except ImportError:
        import urllib2 as url_request
    URLError = url_request.URLError

    try:
        url_request.urlopen("http://127.0.0.1:%d/shutdown" % self.port)
    except URLError:
        return

    waited = 0
    while self.is_connectable() and waited < 30:
        waited += 1
        time.sleep(1)
def send_remote_shutdown_command(self):
    """Request a shutdown via ``<self.service_url>/shutdown`` and wait.

    Returns immediately when the request raises ``URLError``. Otherwise
    polls ``self.is_connectable()`` up to 30 times, sleeping one second
    after every poll that still reports the service as connectable.
    """
    # Prefer the Python 3 module; fall back to Python 2's urllib2.
    try:
        from urllib import request as url_request
    except ImportError:
        import urllib2 as url_request
    URLError = url_request.URLError

    try:
        url_request.urlopen("%s/shutdown" % self.service_url)
    except URLError:
        return

    polls_left = 30
    while polls_left and self.is_connectable():
        time.sleep(1)
        polls_left -= 1
def __getitem__(self, key):
    """Return the location for *key*, consulting the cache when enabled.

    On a cache miss a new ``Location`` is filled in by the geocoding,
    timezone and elevation helpers (in that order), given a Google Maps
    URL built from its latitude/longitude, and stored in ``self.geocache``
    when ``self.cache`` is truthy.

    Raises:
        AstralError: if any of the helper lookups raises ``URLError``.
    """
    if self.cache and key in self.geocache:
        return self.geocache[key]

    place = Location()
    try:
        self._get_geocoding(key, place)
        self._get_timezone(place)
        self._get_elevation(place)
    except URLError:
        raise AstralError(('GoogleGeocoder: Unable to contact '
                           'Google maps API'))

    place.url = 'http://maps.google.com/maps?q=loc:%f,%f' % (
        place.latitude, place.longitude)

    if self.cache:
        self.geocache[key] = place
    return place
def command(self, command, value=None):
    """Send *command* (optionally with *value*) to the camera over HTTP.

    Returns True when the request succeeded. Returns False when *command*
    is not in ``self.commandMaxtrix`` or when the request raised
    ``HTTPError``, ``URLError`` or ``socket.timeout`` (which is logged as
    a warning).
    """
    # Built from the caller's arguments, before any value translation.
    func_str = 'GoProHero.command({}, {})'.format(command, value)

    if command not in self.commandMaxtrix:
        return False
    spec = self.commandMaxtrix[command]

    # '' is accepted as a synonym of None for commands without a value
    if value == '':
        value = None
    # map a symbolic value to what the camera expects, when a mapping exists
    if value is not None and value in spec['translate']:
        value = spec['translate'][value]

    target = self._commandURL(spec['cmd'], value)
    try:
        urlopen(target, timeout=self.timeout).read()
    except (HTTPError, URLError, socket.timeout) as exc:
        logging.warning('{}{} - error opening {}: {}{}'.format(
            Fore.YELLOW, func_str, target, exc, Fore.RESET))
        return False

    logging.info('{} - http success!'.format(func_str))
    return True
def retry_wrapper(func, name, mid, log=False, max_tries=5, delay=1):
    """Wrap *func* so that transient network errors are retried.

    Args:
        func: callable to run.
        name: label used in the log message.
        mid: identifier used in the log message.
        log: when true, print a line for every failed attempt.
        max_tries: number of attempts before giving up (default 5).
        delay: seconds to sleep after a failed attempt (default 1).

    Returns:
        A callable that forwards its arguments to *func* and returns its
        result.

    Raises:
        ValueError: (from the returned callable) when every attempt failed
            with ``URLError`` or ``RemoteDisconnected``. Any other
            exception from *func* propagates unchanged.
    """
    import functools

    @functools.wraps(func)
    def wrapped_func(*args, **kwargs):
        for _ in range(max_tries):
            try:
                return func(*args, **kwargs)
            except ur.URLError:
                failure = "URLError"
            except http.client.RemoteDisconnected:
                failure = "RemoteDisconnected"
            if log:
                print("{} for {}({}); sleeping for {} second".format(
                    failure, name, mid, delay))
            time.sleep(delay)
        raise ValueError("Unable to execute function.")

    return wrapped_func
def rank_checker(url, hatebu_url):
    """Return the ``data-entryrank`` of the link to *url* on a page.

    Fetches *hatebu_url*, looks for an ``<a>`` whose ``href`` equals *url*
    and returns its ``data-entryrank`` attribute.

    Returns:
        The attribute value, or None when the page cannot be fetched or no
        matching anchor exists.
    """
    try:
        html = request.urlopen(hatebu_url)
    except request.URLError as e:  # HTTPError is a subclass of URLError
        print(e.reason)
        # Without a page there is nothing to parse; previously execution
        # fell through and failed on the unbound ``html``.
        return None

    soup = BeautifulSoup(html, "lxml")
    a = soup.find("a", href=url)
    if a is None:
        return None
    return a.get("data-entryrank")
def test_open_url(self):
    """open_url() opens an existing file: URL and rejects a missing one."""
    # pathname2url is taken from a different module depending on the
    # running major version.
    if sys.version_info[0] < 3:
        p2url = urllib.pathname2url
    else:
        p2url = urllib2.pathname2url

    resource_dir = os.path.abspath(
        os.path.join(os.path.dirname(__file__), "resources"))

    existing_url = "file:%s" % p2url(
        os.path.join(resource_dir, "rwopstest.txt"))
    self.assertIsNotNone(resources.open_url(existing_url))

    missing_url = "file:%s" % p2url(os.path.join(resource_dir, "invalid"))
    self.assertRaises(urllib2.URLError, resources.open_url, missing_url)
def retrieve_url_nodecode(url):
    """ Return the raw (undecoded) content of the url page.

    A gzip-compressed body is transparently decompressed. On a connection
    error the problem is printed and an empty string is returned.
    """
    import io

    req = Request(url, headers = headers)
    try:
        response = urlopen(req)
    except URLError as errno:
        print(" ".join(("Connection error:", str(errno.reason))))
        print(" ".join(("URL:", url)))
        return ""
    dat = response.read()
    # Check if it is gzipped. read() yields bytes, so the magic number has
    # to be a bytes literal and the buffer a BytesIO; the former str-based
    # comparison never matched on Python 3.
    if dat[:2] == b'\x1f\x8b':
        # Data is gzip encoded, decode it
        dat = gzip.GzipFile(fileobj=io.BytesIO(dat)).read()
    return dat
def send_remote_shutdown_command(self):
    """Request a shutdown via ``<self.service_url>/shutdown`` and wait.

    Returns immediately when the request raises ``URLError``. Otherwise
    ``self.is_connectable()`` is polled, sleeping one second between polls
    and at most 30 times in total.
    """
    # Prefer the Python 3 module; fall back to Python 2's urllib2.
    try:
        from urllib import request as url_request
    except ImportError:
        import urllib2 as url_request
    URLError = url_request.URLError

    try:
        url_request.urlopen("%s/shutdown" % self.service_url)
    except URLError:
        return

    sleeps_left = 30
    while self.is_connectable():
        if not sleeps_left:
            break
        sleeps_left -= 1
        time.sleep(1)
def _load_image_from_url_or_local_path(self, file_path):
    """Open the image at *file_path*, which may be a URL or a local path.

    ``http``/``https`` sources are streamed through ``self._session``,
    ``ftp``/``data`` sources go through ``urlopen`` and anything else is
    handed to ``open_image`` unchanged. Every failure is logged as a
    warning ("Image skipped") and yields None.
    """
    scheme = urlparse(file_path).scheme

    if scheme in ('http', 'https'):
        try:
            source = self._session.get(file_path, stream=True).raw
        except RequestException:
            log.warning("Image skipped", exc_info=True)
            return None
    elif scheme in ("ftp", "data"):
        # ftplib.all_errors is a tuple, hence the tuple concatenation
        fetch_errors = (URLError, ) + ftplib.all_errors
        try:
            source = urlopen(file_path)
        except fetch_errors:
            log.warning("Image skipped", exc_info=True)
            return None
    else:
        source = file_path

    try:
        image = open_image(source)
    except (IOError, ValueError):
        log.warning("Image skipped", exc_info=True)
        return None
    return image
def get_location(url):
    """Return the final URL reached when opening *url*.

    Returns:
        The URL after redirects, or the string ``"fail"`` when the server
        answered with an HTTP error (the status code is printed).

    Raises:
        SystemExit: on a socket timeout or any non-HTTP ``URLError``.
    """
    import sys

    try:
        # urllib will follow redirections and it's too much code to tell
        # urllib not to do that
        with request.urlopen(url) as response:
            return response.geturl()
    except socket.timeout:
        print('request timeout')
        # sys.exit instead of the builtin exit(), which only exists when
        # the site module has been loaded.
        sys.exit()
    except request.HTTPError as e:
        print(e.code)
    except request.URLError as e:
        print(e.reason)
        sys.exit()
    return "fail"
def update_mm(self, status):
    """Report *status* to the ``alexapi`` endpoint on the configured host.

    The URL and the response are logged at debug level; a ``URLError`` is
    logged as an error and otherwise ignored.
    """
    base = "http://" + self.mm_host + ":" + self.mm_port
    address = base + "/alexapi?action=AVSSTATUS&status=" + status
    logger.debug("Calling URL: %s", address)

    try:
        response = urlopen(address).read()
    except URLError as err:
        logger.error("URLError: %s", err.reason)
    else:
        logger.debug("Response: %s", response)
def send_data(self, event):
    """Send event to VES

    Builds the listener URL from ``self._app_config`` (``UseHttps``,
    ``Domain``, ``Port``, optional ``Path``, ``ApiVersion``, optional
    ``Topic``) and POSTs *event* as JSON with HTTP basic authentication.
    All errors raised while sending are logged, never propagated.
    """
    cfg = self._app_config
    path_part = '/{}'.format(cfg['Path']) if len(cfg['Path']) > 0 else ''
    topic_part = '/{}'.format(cfg['Topic']) if len(cfg['Topic']) > 0 else ''
    server_url = "http{}://{}:{}{}/eventListener/v{}{}".format(
        's' if cfg['UseHttps'] else '', cfg['Domain'], int(cfg['Port']),
        path_part, int(cfg['ApiVersion']), topic_part)
    logging.info('Vendor Event Listener is at: {}'.format(server_url))

    credentials = base64.b64encode('{}:{}'.format(
        cfg['Username'], cfg['Password']).encode()).decode()
    # The encoded credentials are deliberately NOT logged: base64 is
    # trivially reversible, so logging them would leak the password.
    logging.debug('Using HTTP basic authentication')

    try:
        req = url.Request(server_url)
        req.add_header('Authorization', 'Basic {}'.format(credentials))
        req.add_header('Content-Type', 'application/json')
        event_str = json.dumps(event).encode()
        logging.debug("Sending {} to {}".format(event_str, server_url))
        # Close the response straight away; only delivery matters here.
        url.urlopen(req, event_str, timeout=1).close()
        logging.debug("Sent data to {} successfully".format(server_url))
    except url.HTTPError as e:
        logging.error('Vendor Event Listener exception: {}'.format(e))
    except url.URLError as e:
        logging.error(
            'Vendor Event Listener is not reachable: {}'.format(e))
    except Exception as e:
        logging.error('Vendor Event Listener error: {}'.format(e))
def request(self, host, handler, request_body, verbose=0):
    """Send XMLRPC request

    POSTs *request_body* to ``<scheme>://<host><handler>`` through an
    opener built from ``self._handlers`` and returns the parsed response.
    The *verbose* argument is not consulted; ``self.verbose`` is.

    Raises:
        InterfaceError: on ``URLError``/``HTTPError`` or ``BadStatusLine``.
    """
    uri = '{scheme}://{host}{handler}'.format(
        scheme=self._scheme, host=host, handler=handler)

    if self._passmgr:
        self._passmgr.add_password(None, uri, self._username,
                                   self._password)
    if self.verbose:
        _LOGGER.debug("FabricTransport: {0}".format(uri))

    opener = urllib2.build_opener(*self._handlers)
    req = urllib2.Request(uri, request_body, headers={
        'Content-Type': 'text/xml',
        'User-Agent': self.user_agent,
    })

    try:
        return self.parse_response(opener.open(req))
    except (urllib2.URLError, urllib2.HTTPError) as exc:
        try:
            # Errors without a ``code``/``reason`` attribute raise
            # AttributeError here and are described from their str() below.
            if exc.code == 400:
                reason, code = 'Permission denied', exc.code
            else:
                reason, code = exc.reason, -1
            msg = "{reason} ({code})".format(reason=reason, code=code)
        except AttributeError:
            msg = "SSL error" if 'SSL' in str(exc) else str(exc)
        raise InterfaceError("Connection with Fabric failed: " + msg)
    except BadStatusLine:
        raise InterfaceError("Connection with Fabric failed: check SSL")
def download_file(url, download_directory):
    """Download a remote file

    The file name is the last path segment of *url*, or the host name when
    the URL has no path.

    Args:
        url: (string) address of the file to fetch
        download_directory: (string) directory the file is written to

    Returns:
        (string) the absolute path of the file that was just downloaded

    Raises:
        DownloadError: on any failure while fetching or writing the file
    """
    Output.print_information("Downloading " + url + " ...")

    parsed_url = urlparse(url)
    if parsed_url.path in ["/", ""]:
        file_name = parsed_url.netloc
    else:
        file_name = parsed_url.path.split("/")[-1]
    download_path = abspath(join(download_directory, file_name))

    try:
        # Fetch first and only then open the target, so a failed download
        # no longer leaves an empty (truncated) file behind.
        response = urlopen(url)
        try:
            payload = response.read()
        finally:
            response.close()
        with open(download_path, 'wb') as file_object:
            file_object.write(payload)
        return download_path
    except HTTPError as expn:
        raise DownloadError("HTTP error code " + str(expn.code) + " while retrieving " \
         + url + "\n" + str(expn.reason))
    except URLError as expn:
        raise DownloadError("HTTP URL error while retrieving " + url + "\n" + str(expn.reason))
    except Exception as expn:
        raise DownloadError("Unable to retrieve " + url + "\n" + str(expn))
def async_run(self, line, cell=None):
    """Run code into cell asynchronously
    Usage:\\
      %async_run <source> (cell content)
    """
    # Cell content wins over the line argument when both are present.
    code_to_run = line if cell is None else cell

    session_id = str(uuid4())
    connection_id = format_ws_connection_id(PY_ROLE, session_id)

    # Probe the server first; without it there is nothing to connect to.
    try:
        _ = urlopen(connection_string(web_socket=False, extra='ping'))
    except URLError:
        print("Connection to server refused!", end='  ')
        print("Use %async_run_server first!")
        return None

    connector = WSConnector(connection_id, code_to_run, self.shell)
    connector.connect()

    js_connection_id = format_ws_connection_id(JS_ROLE, session_id)
    js_code = JS_WEBSOCKET_CODE.replace('__sessionid__', session_id)
    js_code = js_code.replace('__connection_id__', js_connection_id)

    html_output = LIGHT_HTML_OUTPUT_CELL.format(session_id=session_id)
    return HTML(html_output + js_code)
def send_response(event, context, response_status, reason=None,
                  response_data=None):
    """PUT a JSON status document to ``event["ResponseURL"]``.

    Args:
        event: mapping providing ``StackId``, ``RequestId``,
            ``LogicalResourceId`` and ``ResponseURL``.
        context: object providing ``log_stream_name``.
        response_status: value sent as ``Status``.
        reason: optional text sent as ``Reason``.
        response_data: optional mapping sent as ``Data``.

    Returns:
        True when the request succeeded, False on ``HTTPError`` or
        ``URLError`` (both are printed).
    """
    body = {
        "Status": response_status,
        "PhysicalResourceId": context.log_stream_name,
        "StackId": event["StackId"],
        "RequestId": event["RequestId"],
        "LogicalResourceId": event["LogicalResourceId"],
    }

    print("Responding: {}".format(response_status))

    if reason:
        print(reason)
        body["Reason"] = reason

    # ``None`` replaces the former shared ``{}`` default argument.
    if response_data:
        print(response_data)
        body["Data"] = response_data

    body = json.dumps(body).encode("utf-8")

    req = Request(event["ResponseURL"], data=body, headers={
        "Content-Length": len(body),
        "Content-Type": "",
    })
    # Request defaults to POST when data is given; force PUT.
    req.get_method = lambda: "PUT"

    try:
        urlopen(req).close()
        return True
    except HTTPError as e:
        print("Failed executing HTTP request: {}".format(e.code))
        return False
    except URLError as e:
        print("Failed to reach the server: {}".format(e.reason))
        return False
def test(self, url, toHex=True):
    """Fetch ``http://<self._ip>/<url>`` and print the response.

    Args:
        url: path appended to the camera address.
        toHex: when true, print the body as a hexadecimal string.

    ``HTTPError``, ``URLError`` and ``socket.timeout`` are printed rather
    than raised.
    """
    import binascii

    try:
        url = 'http://{}/{}'.format(self._ip, url)

        print(url)
        response = urlopen(
            url, timeout=self.timeout).read()

        if toHex:
            # bytes has no .encode('hex') on Python 3; hexlify works on
            # both major versions.
            response = binascii.hexlify(response).decode('ascii')

        print(response)
    except (HTTPError, URLError, socket.timeout) as e:
        print(e)
def catch_request(request):
    """Open *request*, converting common connection errors to a value.

    Returns:
        ``(handle, False)`` on success, or ``(None, error)`` when
        ``HTTPError``, ``URLError`` or ``socket.error`` was raised.
    """
    try:
        handle = urlopen(request)
    except (HTTPError, URLError, socket.error) as error:
        return None, error
    return handle, False
def getBestServer(servers):
    """Perform a speedtest.net latency request to determine which
    speedtest.net server has the lowest latency

    Each server's ``latency.txt`` is requested three times. A failed or
    unexpected response counts as 3600 seconds. The winning server dict
    gets a ``latency`` key (milliseconds) and is returned.
    """
    results = {}
    for server in servers:
        cum = []
        url = '%s/latency.txt' % os.path.dirname(server['url'])
        urlparts = urlparse(url)
        for _ in range(3):
            h = None
            try:
                try:
                    if urlparts[0] == 'https':
                        h = HTTPSConnection(urlparts[1])
                    else:
                        h = HTTPConnection(urlparts[1])
                    headers = {'User-Agent': user_agent}
                    start = timeit.default_timer()
                    h.request("GET", urlparts[2], headers=headers)
                    r = h.getresponse()
                    total = (timeit.default_timer() - start)
                except (HTTPError, URLError, socket.error):
                    cum.append(3600)
                    continue
                text = r.read(9)
                if int(r.status) == 200 and text == 'test=test'.encode():
                    cum.append(total)
                else:
                    cum.append(3600)
            finally:
                # Close on every path; the failure path used to leak the
                # connection.
                if h is not None:
                    h.close()
        # Three samples divided by 6, i.e. half the mean request time.
        # NOTE(review): presumably a one-way latency estimate - confirm.
        avg = round((sum(cum) / 6) * 1000, 3)
        # Servers with an identical average overwrite each other here.
        results[avg] = server
    fastest = sorted(results.keys())[0]
    best = results[fastest]
    best['latency'] = fastest
    return best
def query(self, query):
    """Run a SPARQL *query* against ``self.endpoint``.

    Returns:
        A list with one dict per result row, mapping every variable named
        in the response head to its value (None when unbound). None is
        returned when the response is empty or carries no ``results``.

    Raises:
        WdmapperError: when the endpoint cannot be reached or the response
            is not valid JSON (reported as 'query timeout' when the body
            mentions ``QueryTimeoutException``).
    """
    query = textwrap.dedent(query)
    if self.debug:
        self.debug(query)

    url = '%s?query=%s' % (self.endpoint, quote(query))
    req = Request(url)
    if not self.cache:
        req.add_header('cache-control', 'no-cache')
    req.add_header('Accept', 'application/sparql-results+json')

    try:
        raw = urlopen(req).read()
    except (URLError) as e:
        raise WdmapperError(e)

    res = raw.decode('utf8') if six.PY3 else raw
    if not res:
        return None

    try:
        data = json.loads(res)
    except ValueError as e:
        # Search the undecoded bytes: calling str.find with a bytes
        # argument (as before) raises TypeError on Python 3.
        if b'QueryTimeoutException' in raw:
            e = 'query timeout'
        raise WdmapperError(e)

    if data and 'results' in data:
        qvars = data['head']['vars']
        result = []
        for row in data['results']['bindings']:
            values = {}
            for var in qvars:
                values[var] = row[var]['value'] if var in row else None
            result.append(values)
        return result
    return None
def getBestServer(servers):
    """Perform a speedtest.net latency request to determine which
    speedtest.net server has the lowest latency

    Each server's ``latency.txt`` is requested three times. A failed or
    unexpected response counts as 3600 seconds. The winning server dict
    gets a ``latency`` key (milliseconds) and is returned.
    """
    results = {}
    for server in servers:
        cum = []
        url = '%s/latency.txt' % os.path.dirname(server['url'])
        urlparts = urlparse(url)
        for _ in range(3):
            h = None
            try:
                try:
                    if urlparts[0] == 'https':
                        h = HTTPSConnection(urlparts[1])
                    else:
                        h = HTTPConnection(urlparts[1])
                    start = timeit.default_timer()
                    h.request("GET", urlparts[2])
                    r = h.getresponse()
                    total = (timeit.default_timer() - start)
                except (HTTPError, URLError, socket.error):
                    cum.append(3600)
                    continue
                text = r.read(9)
                if int(r.status) == 200 and text == 'test=test'.encode():
                    cum.append(total)
                else:
                    cum.append(3600)
            finally:
                # Close on every path; the failure path used to leak the
                # connection.
                if h is not None:
                    h.close()
        # Three samples divided by 6, i.e. half the mean request time.
        # NOTE(review): presumably a one-way latency estimate - confirm.
        avg = round((sum(cum) / 6) * 1000, 3)
        # Servers with an identical average overwrite each other here.
        results[avg] = server
    fastest = sorted(results.keys())[0]
    best = results[fastest]
    best['latency'] = fastest
    return best
def ping(ip):
    """Return True when ``http://<ip>:1921/name`` can be fetched.

    A ``URLError`` while opening or reading the URL yields False.
    """
    target = "http://" + ip + ":1921/name"
    try:
        urlopen(target).read()
    except URLError:
        return False
    return True
def catch_request(request):
    """Open *request*, hiding common connection errors from the caller.

    Returns:
        The opened handle, or False when ``HTTPError``, ``URLError`` or
        ``socket.error`` was raised.
    """
    try:
        handle = urlopen(request)
    except (HTTPError, URLError, socket.error):
        handle = False
    return handle
def lookup(keyword):
    """Look up *keyword* online and record the outcome in a text file.

    The page at ``base_url + keyword`` is fetched. Definitions found in
    its ``<meta name="description">`` line are printed and appended to
    ``unknown_words_defs.txt``. When the page cannot be fetched the word is
    appended to ``unknown_words_notfound.txt`` instead.
    """
    # build url to lookup
    url = base_url + keyword
    req = Request(url)

    # grab web page
    try:
        grab_page = urlopen(req)
    except URLError as e:
        if hasattr(e, 'reason'):
            print(keyword, e.reason)
            # log unfound word in file; ``with`` closes it even on error
            with open("unknown_words_notfound.txt", "a") as undef_unknowns:
                undef_unknowns.write((keyword + "\n"))
        elif hasattr(e, 'code'):
            print('The server couldn\'t fulfill the request.')
            print('Error code: ', e.code)
        return

    # read web page lines, then release the connection
    try:
        web_page = grab_page.readlines()
    finally:
        grab_page.close()

    for line in web_page:
        line = line.decode('utf-8')
        if '<meta name="description"' not in line:  # find required line
            continue
        for entry in line.split('"'):  # extract bits we want
            if 'definition,' not in entry:
                continue
            write_line = keyword+": "+''.join(entry.split('definition, ')[1:])
            print(write_line)
            write_line +="\n"
            # write word + def'n to file
            with open("unknown_words_defs.txt", "a") as def_unknowns:
                def_unknowns.write(write_line)
def get_category(url):
    """Return the ``data-category-name`` of the bookmark entry for *url*.

    The value is read from the ``<html>`` element of
    ``http://b.hatena.ne.jp/entry/<url>``. When the page cannot be fetched
    the error reason is printed and None is returned.
    """
    entry_url = "http://b.hatena.ne.jp/entry/{}".format(url)
    try:
        page = request.urlopen(entry_url)
        soup = BeautifulSoup(page, "lxml")
        return soup.find("html").get("data-category-name")
    except request.URLError as e:
        # Also covers HTTPError, which is a subclass of URLError.
        print(e.reason)
    return None
def is_hatenatop(url):
    """Return True when *url* is linked from http://hatenablog.com/.

    Returns:
        True when an ``<a>`` with ``href`` equal to *url* exists on the
        page. False when no such anchor exists or the page cannot be
        fetched (the error reason is printed).
    """
    try:
        html = request.urlopen("http://hatenablog.com/")
    except request.URLError as e:
        # The handlers used to name ``urllib.HTTPError``/``urllib.URLError``
        # although the call goes through ``request``; HTTPError is a
        # subclass of URLError, so one handler covers both.
        print(e.reason)
        # Nothing to parse: report "not on the top page" instead of failing
        # on the unbound ``html``.
        return False

    soup = BeautifulSoup(html, "lxml")
    a = soup.find("a", href=url)
    if a is None:
        return False
    return url == a.get("href")
def check_connectivity(reference):
    """Return True when *reference* can be opened within one second.

    A ``URLError`` yields False; other exceptions propagate.
    """
    try:
        urlopen(reference, timeout=1)
    except URLError:
        return False
    return True
def request(url, data=None, headers=None, timeout=10):
    """ Returns a file-like object to the given URL.

    Args:
        url: address to open.
        data: optional mapping, url-encoded and sent as the request body.
        headers: optional mapping of extra request headers.
        timeout: seconds before the request times out.

    Raises:
        Forbidden: on HTTP status 401 or 403.
        NotFound: on HTTP status 404.
        TooManyRequests: on HTTP status 420 or 429.
        Timeout: on a socket error whose arguments mention 'timed out'.
        Any other ``URLError`` / ``socket.error`` is re-raised unchanged.
    """
    if cookies is not None:
        opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cookies))
    else:
        opener = urllib2.build_opener()

    # ``None`` defaults replace the former shared ``{}`` default arguments.
    payload = urlencode(data) if data else None
    try:
        f = opener.open(Request(url, payload, headers or {}),
                        timeout=timeout)
    except URLError as e:
        status = getattr(e, 'code', None)  # HTTPError
        if status in (401, 403):
            raise Forbidden
        if status == 404:
            raise NotFound
        if status in (420, 429):
            raise TooManyRequests
        raise
    except socket.error as e:
        if 'timed out' in repr(e.args):
            raise Timeout
        raise

    log.info(url)
    return f
def _internet_on(address):
    """
    Check to see if the internet is on by opening a set address.

    :param address: the IP or address to hit
    :return: a boolean - true if can be reached, false if not.
    """
    try:
        urllib2.urlopen(address, timeout=1)
    except urllib2.URLError:
        return False
    return True
def _query(self, path, before=None, after=None):
    """Yield the decoded JSON records returned for ``lookup/<path>``.

    Args:
        path: lookup path appended to ``<self.server>/lookup/``.
        before, after: optional time bounds. When both are given they are
            sent as ``time_last_before`` / ``time_first_after``; a single
            bound is sent as ``time_first_before`` / ``time_last_after``.

    Yields:
        One object per response line, parsed with ``json.loads``.

    Raises:
        QueryError: when the request fails with ``HTTPError``/``URLError``.
    """
    url = '%s/lookup/%s' % (self.server, path)

    params = {}
    if self.limit:
        params['limit'] = self.limit
    if before and after:
        params['time_first_after'] = after
        params['time_last_before'] = before
    else:
        if before:
            params['time_first_before'] = before
        if after:
            params['time_last_after'] = after
    if params:
        url += '?{0}'.format(urlencode(params))

    req = Request(url)
    req.add_header('Accept', 'application/json')
    req.add_header('X-Api-Key', self.apikey)

    proxy_args = {}
    if self.http_proxy:
        proxy_args['http'] = self.http_proxy
    if self.https_proxy:
        proxy_args['https'] = self.https_proxy
    opener = build_opener(ProxyHandler(proxy_args))

    response = None
    try:
        response = opener.open(req)
        while True:
            line = response.readline()
            if not line:
                break
            yield json.loads(line.decode('ascii'))
    except (HTTPError, URLError) as e:
        # sys.exc_traceback no longer exists on Python 3; exc_info()[2] is
        # the portable spelling of the same traceback object.
        raise QueryError(str(e), sys.exc_info()[2])
    finally:
        # Runs on exhaustion, on error and when the generator is closed.
        if response is not None:
            response.close()
def connect(self):
    """
    Attempt to connect to the bridge and return true if successful

    ``self.url`` is opened with a one-second timeout; a ``URLError``
    yields False.
    """
    self.logger.info('Connect: %s', self.url)
    try:
        urlopen(self.url, timeout=1)
    except URLError:
        return False
    return True
def _make_request(self, req, genmsg='', fail_msg='', abort_on_fail=None):
    """
    Make the web request. Doing it here avoids a lot of duplicate exception handling

    :param genmsg: Message logged at 'info' level before the request
    :param fail_msg: Message logged on failure ('Failed to make request' when empty)
    :param abort_on_fail: When truthy, exit the process with status 1 on failure
    :return: The response, or None when the request raised URLError
    """
    if genmsg:
        self.send_log(genmsg, 'info')

    try:
        return urlopen(req)
    except URLError:
        msg = fail_msg or 'Failed to make request'
        if not abort_on_fail:
            self.send_log(msg, 'error')
            return None
        self.send_log(msg, 'critical')
        self.send_log('Aborting', 'critical')
        sys.exit(1)
def get_active_plugins(self):
    """
    Store the list of enabled plugins in ``self.active_plugins``.

    The list is set to ``[]`` when the request raises URLError or when the
    processed response carries an error.
    :return: None
    """
    req = self._create_request(method='core.get_enabled_plugins', params=[])

    try:
        self._check_session()  # Make sure we still have an active session
        res = urlopen(req)
    except URLError as e:
        failure = 'Failed to get list of plugins.  HTTP Error'
        self.send_log(failure, 'error')
        print(failure)
        print(e)
        self.active_plugins = []
        return

    output = self._process_response(res)
    if not output['error']:
        self.active_plugins = output['result']
        return

    problem = 'Problem getting plugin list from {}. Error: {}'.format(
        self.torrent_client, output['error'])
    print(problem)
    self.send_log(problem, 'error')
    self.active_plugins = []
def run(self):
    """Worker loop: take links from ``self.linksToCrawl`` and crawl them.

    Each link is resolved against ``self.baseUrl``, fetched, parsed with
    BeautifulSoup, and the ``href`` of every anchor is queued when it has
    not been visited and the fetched link's host is 'tutorialedge.net'.
    Crawled links are appended to ``self.haveVisited``; links that raise
    ``URLError`` are appended to ``self.errorLinks``.

    The loop ends when ``None`` is taken from the queue or when a link that
    was already visited is taken from it.
    """
    # We create this context so that we can crawl
    # https sites
    # NOTE(review): hostname checking and certificate verification are both
    # switched off here.
    myssl = ssl.create_default_context();
    myssl.check_hostname=False
    myssl.verify_mode=ssl.CERT_NONE
    # process all the links in our queue
    while True:
        # NOTE(review): the blocking get() below runs while urlLock is held.
        self.urlLock.acquire()
        print("Queue Size: {}".format(self.linksToCrawl.qsize()))
        link = self.linksToCrawl.get()
        self.urlLock.release()
        # have we reached the end of our queue?
        # NOTE(review): both early exits below leave the loop without
        # calling task_done() for the item just taken - confirm no caller
        # relies on Queue.join().
        if link is None:
            break
        # Have we visited this link already?
        # NOTE(review): a duplicate stops this worker entirely (``break``,
        # not ``continue``) - confirm that this is intended.
        if (link in self.haveVisited):
            print("Already Visited: {}".format(link))
            break
        try:
            link = urljoin(self.baseUrl, link)
            req = Request(link, headers={'User-Agent': 'Mozilla/5.0'})
            response = urlopen(req, context=myssl)
            print("Url {} Crawled with Status: {}".format(response.geturl(), response.getcode()))
            soup = BeautifulSoup(response.read(), "html.parser")
            for atag in soup.find_all('a'):
                # An anchor without href yields None from get('href'); it is
                # queued like any other value when the condition holds.
                if (atag.get('href') not in self.haveVisited) and (urlparse(link).netloc == 'tutorialedge.net'):
                    self.linksToCrawl.put(atag.get('href'))
                else :
                    print("{} already visited or not part of website".format(atag.get('href')))
            print("Adding {} to crawled list".format(link))
            self.haveVisited.append(link)
        except URLError as e:
            print("URL {} threw this error when trying to parse: {}".format(link, e.reason))
            self.errorLinks.append(link)
        finally:
            # Mark the queue item as processed whether or not the fetch
            # succeeded.
            self.linksToCrawl.task_done()
def lookup(twitter, user_ids):
    """Resolve an entire list of user ids to screen names.

    Ids are resolved in batches of 100. A failing batch is retried until
    it succeeds; waiting between attempts is delegated to ``Fail.wait``.
    Returns the accumulated mapping built from the batch results.
    """
    users = {}
    api_limit = 100
    for start in range(0, len(user_ids), api_limit):
        fail = Fail()
        while True:
            try:
                portion = lookup_portion(
                    twitter, user_ids[start:start + api_limit])
            except TwitterError as e:
                code = e.e.code
                if code == 429:
                    # Rate limited: sleep until the window resets.
                    err("Fail: %i API rate limit exceeded" % code)
                    rls = twitter.application.rate_limit_status()
                    reset_epoch = rls.rate_limit_reset
                    reset = time.asctime(time.localtime(reset_epoch))
                    delay = int(reset_epoch - time.time()) + 5  # avoid race
                    err("Interval limit of %i requests reached, next reset on "
                        "%s: going to sleep for %i secs"
                        % (rls.rate_limit_limit, reset, delay))
                    fail.wait(delay)
                    continue
                if code == 502:
                    err("Fail: %i Service currently unavailable, retrying..."
                        % code)
                else:
                    err("Fail: %s\nRetrying..." % str(e)[:500])
                fail.wait(3)
                continue
            except urllib2.URLError as e:
                err("Fail: urllib2.URLError %s - Retrying..." % str(e))
                fail.wait(3)
                continue
            except httplib.error as e:
                err("Fail: httplib.error %s - Retrying..." % str(e))
                fail.wait(3)
                continue
            except KeyError as e:
                err("Fail: KeyError %s - Retrying..." % str(e))
                fail.wait(3)
                continue

            # Batch resolved: merge it and move on to the next one.
            users.update(portion)
            err("Resolving user ids to screen names: %i/%i"
                % (len(users), len(user_ids)))
            break
    return users
def follow(twitter, screen_name, followers=True):
    """Get the entire list of followers/following for a user.

    Pages are fetched with ``follow_portion`` until the returned cursor is
    0. A 401 response stops the walk and returns what was collected so
    far; every other failure is retried after ``Fail.wait``.
    """
    user_ids = []
    cursor = -1
    fail = Fail()
    while True:
        try:
            portion, cursor = follow_portion(twitter, screen_name, cursor,
                                             followers)
        except TwitterError as e:
            code = e.e.code
            if code == 401:
                reason = ("follow%s of that user are protected"
                          % ("ers" if followers else "ing"))
                err("Fail: %i Unauthorized (%s)" % (code, reason))
                break
            if code == 429:
                # Rate limited: sleep until the window resets.
                err("Fail: %i API rate limit exceeded" % code)
                rls = twitter.application.rate_limit_status()
                reset_epoch = rls.rate_limit_reset
                reset = time.asctime(time.localtime(reset_epoch))
                delay = int(reset_epoch - time.time()) + 5  # avoid race
                err("Interval limit of %i requests reached, next reset on %s: "
                    "going to sleep for %i secs" % (rls.rate_limit_limit,
                                                    reset, delay))
                fail.wait(delay)
                continue
            if code == 502:
                err("Fail: %i Service currently unavailable, retrying..."
                    % code)
            else:
                err("Fail: %s\nRetrying..." % str(e)[:500])
            fail.wait(3)
            continue
        except urllib2.URLError as e:
            err("Fail: urllib2.URLError %s - Retrying..." % str(e))
            fail.wait(3)
            continue
        except httplib.error as e:
            err("Fail: httplib.error %s - Retrying..." % str(e))
            fail.wait(3)
            continue
        except KeyError as e:
            err("Fail: KeyError %s - Retrying..." % str(e))
            fail.wait(3)
            continue

        # Merge the page, de-duplicating, and report how many ids are new.
        known_before = len(user_ids)
        user_ids = list(set(user_ids + portion))
        new = len(user_ids) - known_before
        what = "follow%s" % ("ers" if followers else "ing")
        err("Browsing %s %s, new: %i" % (screen_name, what, new))
        if cursor == 0:
            break
        fail = Fail()
    return user_ids
def statuses(twitter, screen_name, tweets, mentions=False, favorites=False,
             received_dms=None, isoformat=False):
    """Get all the statuses for a screen name.

    Portions are fetched with ``statuses_portion`` and merged into the
    *tweets* mapping in place. Browsing stops once a portion adds fewer
    than 190 new entries, or on a 401/404 response; every other failure is
    retried after ``Fail.wait``.
    """
    max_id = None
    fail = Fail()
    # get portions of statuses, incrementing max id until no new tweets appear
    while True:
        try:
            portion = statuses_portion(twitter, screen_name, max_id,
                                       mentions, favorites, received_dms,
                                       isoformat)
        except TwitterError as e:
            code = e.e.code
            if code == 401:
                err("Fail: %i Unauthorized (tweets of that user are protected)"
                    % code)
                break
            if code == 429:
                # Rate limited: sleep until the window resets.
                err("Fail: %i API rate limit exceeded" % code)
                rls = twitter.application.rate_limit_status()
                reset_epoch = rls.rate_limit_reset
                reset = _time.asctime(_time.localtime(reset_epoch))
                delay = int(reset_epoch - _time.time()) + 5  # avoid race
                err("Interval limit of %i requests reached, next reset on %s: "
                    "going to sleep for %i secs" % (rls.rate_limit_limit,
                                                    reset, delay))
                fail.wait(delay)
                continue
            if code == 404:
                err("Fail: %i This profile does not exist" % code)
                break
            if code == 502:
                err("Fail: %i Service currently unavailable, retrying..."
                    % code)
            else:
                err("Fail: %s\nRetrying..." % str(e)[:500])
            fail.wait(3)
            continue
        except urllib2.URLError as e:
            err("Fail: urllib2.URLError %s - Retrying..." % str(e))
            fail.wait(3)
            continue
        except httplib.error as e:
            err("Fail: httplib.error %s - Retrying..." % str(e))
            fail.wait(3)
            continue
        except KeyError as e:
            err("Fail: KeyError %s - Retrying..." % str(e))
            fail.wait(3)
            continue

        # Merge the portion and count how many entries are actually new.
        known_before = len(tweets)
        tweets.update(portion)
        new = len(tweets) - known_before
        err("Browsing %s statuses, new tweets: %i"
            % (screen_name if screen_name else "home", new))
        if new < 190:
            break
        max_id = min(portion.keys())-1  # browse backwards
        fail = Fail()
def status(self):
    """Query every status URL of the camera and build a status dict.

    Returns:
        A dict with ``summary`` ('notfound', 'sleeping', 'on' or
        'recording'), ``raw`` (hex-encoded response per command) and one
        entry per item described in ``self.statusMatrix``. After the first
        failed request no further requests are sent.
    """
    import binascii

    status = {
        # summary = 'notfound', 'sleeping', 'on', or 'recording'
        'summary': 'notfound',
        'raw': {}
    }

    # loop through different status URLs
    for cmd in self.statusMatrix:
        url = self._statusURL(cmd)

        # attempt to contact the camera
        try:
            # bytes has no .encode('hex') on Python 3; hexlify works on
            # both major versions.
            response = binascii.hexlify(urlopen(
                url, timeout=self.timeout).read()).decode('ascii')
            status['raw'][cmd] = response  # save raw response

            # loop through different parts we know how to translate
            for item in self.statusMatrix[cmd]:
                args = self.statusMatrix[cmd][item]
                if 'a' in args and 'b' in args:
                    part = response[args['a']:args['b']]
                else:
                    part = response

                # translate the response value if we know how
                if 'translate' in args:
                    status[item] = self._translate(
                        args['translate'], part)
                else:
                    status[item] = part
        except (HTTPError, URLError, socket.timeout) as e:
            logging.warning('{}{} - error opening {}: {}{}'.format(
                Fore.YELLOW, 'GoProHero.status()', url, e, Fore.RESET))
            # stop sending requests once a request has failed
            break

    # build summary
    if 'record' in status and status['record'] == 'on':
        status['summary'] = 'recording'
    elif 'power' in status and status['power'] == 'on':
        status['summary'] = 'on'
    elif 'power' in status and status['power'] == 'sleeping':
        status['summary'] = 'sleeping'

    logging.info('GoProHero.status() - result {}'.format(status))
    return status