The following code examples, extracted from open-source Python projects, illustrate how to use urllib2.HTTPError(). (urllib2 is the Python 2 module; in Python 3 the equivalent class is urllib.error.HTTPError.)
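Before the project examples, here is a minimal sketch of the pattern they all build on: urllib2.urlopen() raises urllib2.HTTPError for non-2xx responses, and the exception is itself a file-like response object, so the status code, headers, and error body are all available in the handler. The URL below is a placeholder for illustration only.

import urllib2

# Minimal sketch: HTTPError doubles as a response object.
try:
    body = urllib2.urlopen('http://example.com/missing').read()
except urllib2.HTTPError as e:      # subclass of URLError; catch it first
    print 'HTTP status:', e.code    # numeric status, e.g. 404
    print 'Reason:', e.msg          # textual reason phrase
    print 'Error body:', e.read()   # the server's error page
except urllib2.URLError as e:
    print 'Connection-level failure:', e.reason
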
def import_dashboard_via_user_pass(api_url, user, password, dashboard):
    payload = {'dashboard': dashboard, 'overwrite': True}
    auth_string = base64.b64encode('%s:%s' % (user, password))
    headers = {'Authorization': "Basic {}".format(auth_string),
               'Content-Type': 'application/json'}
    req = urllib2.Request(api_url + 'api/dashboards/db', headers=headers,
                          data=json.dumps(payload))
    try:
        resp = urllib2.urlopen(req)
        data = json.load(resp)
        return data
    except urllib2.HTTPError, error:
        data = json.load(error)
        return data

def get(self, url, proxy=None):
    if proxy:
        proxy = urllib2.ProxyHandler({'http': proxy})
        opener = urllib2.build_opener(proxy)
        urllib2.install_opener(opener)
    try:
        response = urllib2.urlopen(url)
    except HTTPError, e:
        resp = e.read()
        self.status_code = e.code
    except URLError, e:
        resp = e.read()
        self.status_code = e.code
    else:
        self.status_code = response.code
        resp = response.read()
    return resp

def run(self):
    data = self.getData()
    value = {
        data: {
            "type": self.data_type
        }
    }
    json_data = json.dumps(value)
    post_data = json_data.encode('utf-8')
    headers = {'Content-Type': 'application/json'}
    try:
        request = urllib2.Request('{}/hippocampe/api/v1.0/{}'.format(self.url, self.service),
                                  post_data, headers)
        response = urllib2.urlopen(request)
        report = json.loads(response.read())
        self.report(report)
    except urllib2.HTTPError:
        self.error("Hippocampe: " + str(sys.exc_info()[1]))
    except urllib2.URLError:
        self.error("Hippocampe: service is not available")
    except Exception as e:
        self.unexpectedError(e)

def send_result(email, result, title, urn):
    """
    Args:
      email (str): address to send the results
      result (obj): results to send
      title (str):
      urn (str): uniform resource name
    Returns:
      str: response from endpoint
    """
    url = 'https://mongoaud.it/results'
    headers = {'Content-type': 'application/json',
               'Accept': 'application/json'}
    values = {'email': email, 'result': result, 'title': title,
              'urn': urn, 'date': get_date()}
    try:
        req = urllib2.Request(url, json.dumps(values), headers)
        response = urllib2.urlopen(req)
        return response.read()
    except (urllib2.HTTPError, urllib2.URLError) as exc:
        return "Sadly enough, we are having technical difficulties at the moment, " \
               "please try again later.\n\n%s" % str(exc)

def check_version(version):
    # if application is binary then check for latest version
    if getattr(sys, 'frozen', False):
        try:
            url = "https://api.github.com/repos/stampery/mongoaudit/releases/latest"
            req = urllib2.urlopen(url)
            releases = json.loads(req.read())
            latest = releases["tag_name"]
            if version < latest:
                print("mongoaudit version " + version)
                print("There's a new version " + latest)
                _upgrade(releases)
        except (urllib2.HTTPError, urllib2.URLError):
            print("Couldn't check for upgrades")
        except os.error:
            print("Couldn't write mongoaudit binary")

def download_songs(url):
    time.sleep(random.random() * 0.5)
    try:
        page = urllib2.urlopen(url).read()
        soup = BeautifulSoup(page, 'html.parser')

        # Get the artist name
        artist_name = soup.findAll('h1')[0].get_text()[:-7].lower().replace(' ', '_')

        # Store all songs for a given artist
        with open('artist_data/' + artist_name + '.txt', 'wb') as w:
            for song in soup.findAll('a', {'target': '_blank'}):
                if 'lyrics/' in song['href']:
                    song_url = song['href'][1:].strip()
                    w.write(song_url + '\n')
    except urllib2.HTTPError:
        print '404 not found'

def TestSite(url):
    protocheck(url)
    print "Trying: " + url
    try:
        urllib2.urlopen(url, timeout=3)
    except urllib2.HTTPError, e:
        if e.code == 405:
            print url + " found!"
            print "Now the brute force will begin! >:)"
        if e.code == 404:
            printout(str(e), YELLOW)
            print " - XMLRPC has been moved, removed, or blocked"
            sys.exit()
    except urllib2.URLError, g:
        printout("Could not identify XMLRPC. Please verify the domain.\n", YELLOW)
        sys.exit()
    except socket.timeout as e:
        print type(e)
        printout("The socket timed out, try it again.", YELLOW)
        sys.exit()

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('-q', '--term', dest='term', default=DEFAULT_TERM,
                        type=str, help='Search term (default: %(default)s)')
    parser.add_argument('-l', '--location', dest='location',
                        default=DEFAULT_LOCATION, type=str,
                        help='Search location (default: %(default)s)')
    input_values = parser.parse_args()
    try:
        query_api(input_values.term, input_values.location)
    except HTTPError as error:
        sys.exit(
            'Encountered HTTP error {0} on {1}:\n {2}\nAbort program.'.format(
                error.code,
                error.url,
                error.read(),
            )
        )

def _do_trakt_auth_post(self, url, data):
    try:
        session = self.get_session()
        headers = {
            'Content-Type': 'application/json',
            'Authorization': 'Bearer ' + session,
            'trakt-api-version': '2',
            'trakt-api-key': self.CLIENT_ID
        }
        # timeout in seconds
        timeout = 5
        socket.setdefaulttimeout(timeout)
        request = urllib2.Request(url, data, headers)
        response = urllib2.urlopen(request).read()
        self.logger.info('Response: {0}'.format(response))
        return response
    except urllib2.HTTPError as e:
        self.logger.error('Unable to submit post data {url} - {error}'.format(url=url, error=e.reason))
        raise

def GetThatShit(head_URL):
    source = ""
    global gets
    global proxy_num
    head_URL = head_URL.replace("+", arg_eva)
    request_web = urllib2.Request(head_URL)
    request_web.add_header('User-Agent', agent)
    while len(source) < 1:
        if arg_debug == "on":
            print "\n[proxy]:", proxy_list_count[proxy_num % proxy_len] + "\n[agent]:", agent + "\n[debug]:", head_URL, "\n"
        try:
            gets += 1
            proxy_num += 1
            source = proxy_list[proxy_num % proxy_len].open(request_web).read()
        except (KeyboardInterrupt, SystemExit):
            raise
        except urllib2.HTTPError:
            print "[-] Unexpected error:", sys.exc_info()[0], "\n[-] Trying again!"
            print "[proxy]:", proxy_list_count[proxy_num % proxy_len] + "\n[agent]:", agent + "\n[debug]:", head_URL, "\n"
            break
        except:
            print "[-] Unexpected error:", sys.exc_info()[0], "\n[-] Look at the error and try to figure it out!"
            print "[proxy]:", proxy_list_count[proxy_num % proxy_len] + "\n[agent]:", agent + "\n[debug]:", head_URL, "\n"
            raise
    return source

# the guts and glory - the binary algorithm that does all the guessing for the blind methodology

def getFile(link):
    try:
        source = urllib2.urlopen(link)
    except urllib2.HTTPError, msg:
        print "\nError:", msg
        sys.exit()
    num = 1
    file = 'tmp_insidepropw_' + link.split('=')[1] + '.txt'
    while os.path.isfile(file):
        file = link.rsplit("/", 1)[1] + "." + str(num)
        num += 1
    try:
        shutil.copyfileobj(source, open(file, "w+"))
    except IOError:
        print "\nCannot write to `" + file + "' (Permission denied)."
        sys.exit(1)
    print "File downloaded", file
    newfilelist.append(file)

def run(self):
    password = getword()
    try:
        print "-" * 12
        print "User:", username, "Password:", password
        req = urllib2.Request(sys.argv[1])
        passman = urllib2.HTTPPasswordMgrWithDefaultRealm()
        passman.add_password(None, sys.argv[1], username, password)
        authhandler = urllib2.HTTPBasicAuthHandler(passman)
        opener = urllib2.build_opener(authhandler)
        fd = opener.open(req)
        print "\t\n\n[+] Login successful: Username:", username, "Password:", password, "\n"
        print "[+] Retrieved", fd.geturl()
        info = fd.info()
        for key, value in info.items():
            print "%s = %s" % (key, value)
        sys.exit(2)
    except (urllib2.HTTPError, socket.error):
        pass

def run(self):
    username, password = getword()
    try:
        print "-" * 12
        print "User:", username, "Password:", password
        req = urllib2.Request(sys.argv[1])
        passman = urllib2.HTTPPasswordMgrWithDefaultRealm()
        passman.add_password(None, sys.argv[1], username, password)
        authhandler = urllib2.HTTPBasicAuthHandler(passman)
        opener = urllib2.build_opener(authhandler)
        fd = opener.open(req)
        print "\t\n\nUsername:", username, "Password:", password, "----- Login successful!!!\n\n"
        print "Retrieved", fd.geturl()
        info = fd.info()
        for key, value in info.items():
            print "%s = %s" % (key, value)
        sys.exit(2)
    except (urllib2.HTTPError, httplib.BadStatusLine, socket.error), msg:
        print "An error occurred:", msg
        pass

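In the two brute-force examples above, opener.open() raises urllib2.HTTPError with code 401 whenever the server rejects the Basic credentials, which is why the handlers swallow the exception and move on to the next word. A minimal sketch of a single attempt, with a placeholder URL and a hypothetical helper name:

import urllib2

# Hypothetical helper: one HTTP Basic auth attempt. A 401 HTTPError
# means the credentials were rejected; other statuses propagate.
def try_basic_auth(url, username, password):
    passman = urllib2.HTTPPasswordMgrWithDefaultRealm()
    passman.add_password(None, url, username, password)
    opener = urllib2.build_opener(urllib2.HTTPBasicAuthHandler(passman))
    try:
        opener.open(url)
        return True                 # no exception: login accepted
    except urllib2.HTTPError as e:
        if e.code == 401:
            return False            # rejected credentials
        raise                       # anything else is a real error
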
def getCookie(self):
    """
    This method is the first to be called when initializing a
    Google dorking object through this library. It is used to
    retrieve the Google session cookie needed to perform the
    further search
    """
    try:
        conn = self.opener.open("http://www.google.com/ncr")
        headers = conn.info()
    except urllib2.HTTPError, e:
        headers = e.info()
    except urllib2.URLError, e:
        errMsg = "unable to connect to Google"
        raise sqlmapConnectionException, errMsg

def search(self, googleDork):
    """
    This method performs the effective search on Google providing
    the google dork and the Google session cookie
    """
    if not googleDork:
        return None
    url = "http://www.google.com/search?"
    url += "q=%s&" % urlencode(googleDork)
    url += "num=100&hl=en&safe=off&filter=0&btnG=Search"
    try:
        conn = self.opener.open(url)
        page = conn.read()
    except urllib2.HTTPError, e:
        page = e.read()
    except urllib2.URLError, e:
        errMsg = "unable to connect to Google"
        raise sqlmapConnectionException, errMsg
    self.__matches = self.__parsePage(page)
    return self.__matches

def _api_call(url, opener):
    """
    Makes a REST call against the Couchbase API.
    Args:
      url (str): The URL to get, including endpoint
    Returns:
      list: The JSON response
    """
    try:
        urllib2.install_opener(opener)
        resp = urllib2.urlopen(url, timeout=http_timeout)
    except (urllib2.HTTPError, urllib2.URLError) as e:
        collectd.error("Error making API call (%s) %s" % (e, url))
        return None
    try:
        return json.load(resp)
    except ValueError, e:
        collectd.error("Error parsing JSON for API call (%s) %s" % (e, url))
        return None

def _woxikon_de_url_handler(target):
    ''' Query woxikon for synonyms '''
    time_out_choice = float(get_variable(
        'tq_online_backends_timeout', _timeout_period_default))
    try:
        response = urlopen(fixurl(u'http://synonyms.woxikon.com/de/{0}'.format(target)).decode('ASCII'),
                           timeout=time_out_choice)
        web_content = StringIO(unescape(decode_utf_8(response.read())))
        response.close()
    except HTTPError:
        return 1
    except URLError as err:
        if isinstance(err.reason, socket.timeout):  # timeout error?
            return 1
        return -1  # other error
    except socket.timeout:  # timeout error failed to be captured by URLError
        return 1
    return web_content

def _jeck_ru_url_handler(target):
    ''' Query jeck.ru for synonyms '''
    time_out_choice = float(get_variable(
        'tq_online_backends_timeout', _timeout_period_default))
    try:
        response = urlopen(fixurl(u'http://jeck.ru/tools/SynonymsDictionary/{0}'.format(target)).decode('ASCII'),
                           timeout=time_out_choice)
        web_content = StringIO(decode_utf_8(response.read()))
        response.close()
    except HTTPError:
        return 1
    except URLError as err:
        if isinstance(err.reason, socket.timeout):  # timeout error?
            return 1
        return -1  # any other error
    except socket.timeout:  # if timeout error not captured by URLError
        return 1
    return web_content

def post_request(self, request, payload=None):
    # FIXME: provide full set of ssl options instead of this hack
    if self.server_url.startswith('https'):
        import ssl
        return urllib2.urlopen(request, data=payload, timeout=self.timeout,
                               context=ssl._create_unverified_context())
    return urllib2.urlopen(request, data=payload, timeout=self.timeout)

# def post_request(self, request, payload=None):  # @UnusedVariable
#     try:
#         try:
#             _response = urllib2.urlopen(request, timeout=self.timeout)
#         except TypeError:
#             _response = urllib2.urlopen(request)
#     except urllib2.HTTPError, e:
#         logerr("post failed: %s" % e)
#         raise weewx.restx.FailedPost(e)
#     else:
#         return _response

def gethtml(url):
    try:
        request = urllib2.Request(url)
        request.add_header('User-Agent', 'Mozilla/5.0 (Windows NT 6.1; WOW64; rv:21.0) Gecko/20100101 Firefox/21.0')
        request.add_header('Accept-Language', 'en-us;q=0.5,en;q=0.3')
        request.add_header('Referer', request.get_full_url())
        u = urllib2.urlopen(request, timeout=3)
        content = u.read()
        try:
            content = content.encode("utf-8")
        except:
            content = content.decode('gbk', 'ignore').encode("utf-8", 'ignore')
        return {"html": content, "code": u.code, "url": u.geturl()}
    except urllib2.HTTPError, e:
        try:
            return {"html": e.read(), "code": e.code, "url": e.geturl()}
        except:
            return {"html": '', "code": e.code, "url": e.geturl()}
    except:
        return {"html": "", "code": 404, "url": url}

def symlinks(user, repo):
    mappings = []
    url1 = 'https://api.github.com/repos/%s/%s/git/refs/heads/master' % (user, repo)
    try:
        r = urllib2.urlopen(url1)
    except urllib2.HTTPError:
        print("Invalid url %s. Leaving..." % url1)
        sys.exit(1)
    base = json.load(r)
    sha = base['object']['sha']
    url2 = 'https://api.github.com/repos/%s/%s/git/trees/%s?recursive=1' % (user, repo, sha)
    r = urllib2.urlopen(url2)
    try:
        base = json.load(r)
    except:
        return []
    for e in base['tree']:
        if e['mode'] == '120000':
            mappings.append(e['path'])
    return mappings

def __login(self):
    """Performs the login to Librus"""
    # Pick up the cookies
    self.__opener.addheaders = [('Authorization',
                                 'Basic MzU6NjM2YWI0MThjY2JlODgyYjE5YTMzZjU3N2U5NGNiNGY=')]
    try:
        self.__opener.open('https://synergia.librus.pl')
        list(self.__cj)[0].domain = 'api.librus.pl'
        tokens = loads(self.__opener.open('https://api.librus.pl/OAuth/Token', data=urlencode({
            'grant_type': 'password',
            'username': config.login,
            'password': config.password,
            'librus_long_term_token': '1',
        })).read())
    except urllib2.HTTPError as e:
        if e.getcode() == 400:
            raise WrongPasswordError('Invalid password')
        raise
    self.__opener.addheaders = [('Authorization', 'Bearer %s' % tokens['access_token'])]

def get_announcements(self):
    """
    Fetches data from https://librus.synergia.pl/ogloszenia
    Returns:
        a list [{"author": author, "title": title, "time": time, "content": content}]
    """
    # Load the announcements
    try:
        data = loads(self.__opener.open('https://api.librus.pl/2.0/SchoolNotices').read())
    except urllib2.HTTPError:
        raise SessionExpiredError
    print data
    return [{'author': notice[u'AddedBy'][u'Id'],
             'title': notice[u'Subject'].encode('utf-8'),
             'content': notice[u'Content'].encode('utf-8'),
             'time': notice[u'StartDate']
             } for notice in data[u'SchoolNotices']]

def basic_auth(server="http://127.0.0.1"):
    """
    to use basic login with a different server
    from gluon.contrib.login_methods.basic_auth import basic_auth
    auth.settings.login_methods.append(basic_auth('http://server'))
    """
    def basic_login_aux(username, password, server=server):
        key = base64.b64encode(username + ':' + password)
        headers = {'Authorization': 'Basic ' + key}
        request = urllib2.Request(server, None, headers)
        try:
            urllib2.urlopen(request)
            return True
        except (urllib2.URLError, urllib2.HTTPError):
            return False
    return basic_login_aux

def get_server_secret(credentials, expires):
    """ Fetch server secret from CertiVox server """
    path = 'serverSecret'
    params = urllib.urlencode({
        'app_id': credentials['app_id'],
        'expires': expires,
        'signature': sign_message(
            '{}{}{}'.format(path, credentials['app_id'], expires),
            str(credentials['app_key'])
        )
    })
    try:
        response = urllib2.urlopen('{api_url}{end_point}?{params}'.format(
            api_url=credentials['api_url'],
            end_point=path,
            params=params,
        ))
    except urllib2.HTTPError as e:
        if e.code == 408:
            print "Make sure your time is correct!"
        raise ScriptException('Response code: {} - {}'.format(e.code, e.read()))
    data = json.loads(response.read())
    return data['serverSecret']

def fetch_json(self, url):
    """Fetch remote json"""
    timeout = 1
    while True:
        try:
            logging.debug('Opening %s.', url)
            response = urllib2.urlopen(url)
            break
        except urllib2.HTTPError as err:
            if timeout <= MAX_TIMEOUT:
                logging.warn('Error opening %s, error code %d, reason is %s.',
                             url, err.code, err.reason)
                logging.warn('Waiting for %ds before retrying.', timeout)
                time.sleep(timeout)
                timeout *= 2
            else:
                logging.error('Error opening %s, error code %d, reason is %s.',
                              url, err.code, err.reason)
                raise err
    data = json.load(response)
    return data

def __init__(self):
    super(SwarfarmLogger, self).__init__()
    self.plugin_enabled = True
    config_name = 'swproxy.config'
    if not os.path.exists(config_name):
        self.config = {}
    else:
        with open(config_name) as f:
            self.config = json.load(f)
    self.plugin_enabled = not self.config.get('disable_swarfarm_logger', False)
    if self.plugin_enabled:
        # Get the list of accepted commands from the server
        logger.info('SwarfarmLogger - Retrieving list of accepted log types from SWARFARM...')
        try:
            resp = urllib2.urlopen(self.commands_url)
            self.accepted_commands = json.loads(resp.readline())
            resp.close()
            logger.info('SwarfarmLogger - Looking for the following commands to log:\r\n' +
                        ', '.join(self.accepted_commands.keys()))
        except urllib2.HTTPError:
            logger.fatal('SwarfarmLogger - Unable to retrieve accepted log types. SWARFARM logging is disabled.')
            self.plugin_enabled = False

def process_data(self, req_json, resp_json):
    command = req_json.get('command')
    if command in self.accepted_commands:
        accepted_data = self.accepted_commands[command]
        result_data = {}
        if 'request' in accepted_data:
            result_data['request'] = {item: req_json.get(item) for item in accepted_data['request']}
        if 'response' in accepted_data:
            result_data['response'] = {item: resp_json.get(item) for item in accepted_data['response']}
        if result_data:
            data = json.dumps(result_data)
            try:
                resp = urllib2.urlopen(self.log_url, data=urllib.urlencode({'data': data}))
            except urllib2.HTTPError as e:
                logger.warn('SwarfarmLogger - Error: {}'.format(e.readline()))
            else:
                resp.close()
                logger.info('SwarfarmLogger - {} logged successfully'.format(command))

def main():
    module = AnsibleModule(
        argument_spec=dict(
            url=dict(required=True),
            body=dict(required=True),
            header=dict(required=False),
        )
    )
    url = module.params['url']
    body = module.params['body']
    header = module.params['header']
    req = Request(url)
    req.add_header('Content-Type', 'application/json')
    if header:
        for k, v in header.iteritems():
            req.add_header(k, v)
    try:
        urlopen(req, json.dumps(body))
    except HTTPError as e:
        module.fail_json(msg=e.reason, code=e.code, response=e.read())
    else:
        module.exit_json(changed=True)

def download_and_import(self, repo):
    try:
        response = urllib2.urlopen(GITHUB_LINK.format(repo))
        response_sio = StringIO.StringIO(response.read())
        with zipfile.ZipFile(response_sio) as repo_zip:
            repo_zip.extractall(tempfile.tempdir)

        deck_base_name = repo.split("/")[-1]
        deck_directory_wb = Path(tempfile.tempdir).joinpath(deck_base_name + "-" + BRANCH_NAME)
        deck_directory = Path(tempfile.tempdir).joinpath(deck_base_name)
        utils.fs_remove(deck_directory)
        deck_directory_wb.rename(deck_directory)

        # Todo progressbar on download
        AnkiJsonImporter.import_deck(self.collection, deck_directory)
    except (urllib2.URLError, urllib2.HTTPError, OSError) as error:
        aqt.utils.showWarning("Error while trying to get deck from Github: {}".format(error))
        raise

def fetch_decode(url, encoding=None):
    """ Fetch url and decode. """
    try:
        req = g.opener.open(url)
    except HTTPError as e:
        if e.getcode() == 503:
            time.sleep(.5)
            return fetch_decode(url, encoding)
        else:
            raise
    ct = req.headers['content-type']
    if encoding:
        return req.read().decode(encoding)
    elif "charset=" in ct:
        dbg("charset: %s", ct)
        encoding = re.search(r"charset=([\w-]+)\s*(:?;|$)", ct).group(1)
        return req.read().decode(encoding)
    else:
        dbg("encoding unknown")
        return req.read()

def call_gdata(api, qs):
    """Make a request to the youtube gdata api."""
    qs = dict(qs)
    qs['key'] = g.api_key
    url = g.urls['gdata'] + api + '?' + urlencode(qs)
    try:
        data = g.opener.open(url).read().decode('utf-8')
    except HTTPError as e:
        try:
            errdata = e.file.read().decode()
            error = json.loads(errdata)['error']['message']
            errmsg = 'Youtube Error %d: %s' % (e.getcode(), error)
        except:
            errmsg = str(e)
        raise GdataError(errmsg)
    return json.loads(data)

def getUrlContent(self, url):
    # connect to server:
    #   if there is a 500 error, try a few more times before giving up
    #   any other error, just bail
    # print "ATB---", url
    for tries in range(3):
        try:
            resp = urllib2.urlopen(url)
            return resp.read()
        except urllib2.HTTPError as e:
            if e.getcode() == 500:
                self.writeLog("Try #{0}: ".format(tries + 1))
                time.sleep(1)
            self.writeLog(str(e) + "\n")
            if e.getcode() != 500:
                break
        except Exception as e:
            self.writeLog(str(e) + "\n")
            raise ComicVineTalkerException(ComicVineTalkerException.Network, "Network Error!")
    raise ComicVineTalkerException(ComicVineTalkerException.Unknown, "Error on Comic Vine server")

def check_for_update():
    if os.path.exists(FILE_UPDATE):
        mtime = os.path.getmtime(FILE_UPDATE)
        last = datetime.utcfromtimestamp(mtime).strftime('%Y-%m-%d')
        today = datetime.utcnow().strftime('%Y-%m-%d')
        if last == today:
            return
    try:
        with open(FILE_UPDATE, 'a'):
            os.utime(FILE_UPDATE, None)
        request = urllib2.Request(
            CORE_VERSION_URL,
            urllib.urlencode({'version': main.__version__}),
        )
        response = urllib2.urlopen(request)
        with open(FILE_UPDATE, 'w') as update_json:
            update_json.write(response.read())
    except (urllib2.HTTPError, urllib2.URLError):
        pass

def _GetAuthCookie(self, auth_token):
    """Fetches authentication cookies for an authentication token.

    Args:
      auth_token: The authentication token returned by ClientLogin.

    Raises:
      HTTPError: If there was an error fetching the authentication cookies.
    """
    continue_location = "http://localhost/"
    args = {"continue": continue_location, "auth": auth_token}
    login_path = os.environ.get("APPCFG_LOGIN_PATH", "/_ah")
    req = self._CreateRequest("%s://%s%s/login?%s" %
                              (self.scheme, self.host, login_path, urllib.urlencode(args)))
    try:
        response = self.opener.open(req)
    except urllib2.HTTPError, e:
        response = e
    if (response.code != 302 or
            response.info()["location"] != continue_location):
        raise urllib2.HTTPError(req.get_full_url(), response.code, response.msg,
                                response.headers, response.fp)
    self.authenticated = True

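The appcfg example above is one of the few that constructs urllib2.HTTPError() directly instead of only catching it; the constructor takes (url, code, msg, hdrs, fp). A hedged sketch of the same move in isolation, with hypothetical names:

import urllib2

# Hypothetical sketch: raising HTTPError yourself, as _GetAuthCookie
# does when the login redirect is not the expected one. Arguments are
# url, code, msg, hdrs (headers), and fp (file-like body, may be None).
def require_redirect(request, response, expected_location):
    if (response.code != 302 or
            response.info().get('location') != expected_location):
        raise urllib2.HTTPError(request.get_full_url(), response.code,
                                'unexpected login response',
                                response.headers, response.fp)
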
def _GetRemoteResourceLimits(logging_context):
    """Get the resource limit as reported by the admin console.

    Get the resource limits by querying the admin_console/appserver.
    The actual limits returned depend on the server we are talking to
    and could be missing values we expect or include extra values.

    Args:
      logging_context: The _ClientDeployLoggingContext for this upload.

    Returns:
      A dictionary.
    """
    try:
        yaml_data = logging_context.Send('/api/appversion/getresourcelimits')
    except urllib2.HTTPError, err:
        if err.code != 404:
            raise
        return {}
    return yaml.safe_load(yaml_data)

def Send(self, url, payload='', **kwargs):
    """Sends a request to the server, with common params."""
    start_time_usec = self.GetCurrentTimeUsec()
    request_size_bytes = len(payload)
    try:
        logging.info('Send: %s, params=%s', url, self.request_params)
        kwargs.update(self.request_params)
        result = self.rpcserver.Send(url, payload=payload, **kwargs)
        self._RegisterReqestForLogging(url, 200, start_time_usec, request_size_bytes)
        return result
    except urllib2.HTTPError, e:
        self._RegisterReqestForLogging(url, e.code, start_time_usec, request_size_bytes)
        raise e

def _IsExceptionClientDeployLoggable(self, exception):
    """Determines if an exception qualifies for client deploy log registration.

    Args:
      exception: The exception to check.

    Returns:
      True iff exception qualifies for client deploy logging -
      basically a system error rather than a user error or cancellation.
    """
    if isinstance(exception, KeyboardInterrupt):
        return False
    if (isinstance(exception, urllib2.HTTPError) and
            400 <= exception.code <= 499):
        return False
    return True

def _LogDoUploadException(exception):
    """Helper that logs exceptions that occurred during DoUpload.

    Args:
      exception: An exception that was thrown during DoUpload.
    """
    def InstanceOf(tipe):
        return isinstance(exception, tipe)

    if InstanceOf(KeyboardInterrupt):
        logging.info('User interrupted. Aborting.')
    elif InstanceOf(urllib2.HTTPError):
        logging.info('HTTP Error (%s)', exception)
    elif InstanceOf(CannotStartServingError):
        logging.error(exception.message)
    else:
        logging.exception('An unexpected error occurred. Aborting.')

def get_page(self, url, data=None):
    handlers = [PoolHTTPHandler]
    opener = urllib2.build_opener(*handlers)
    if data:
        data = urllib.urlencode(data)
    request = urllib2.Request(url, data, self.headers)
    try:
        response = opener.open(request)
        return response.read()
    except (urllib2.HTTPError, urllib2.URLError), e:
        raise BrowserError(url, str(e))
    except (socket.error, socket.sslerror), msg:
        raise BrowserError(url, msg)
    except socket.timeout, e:
        raise BrowserError(url, "timeout")
    except KeyboardInterrupt:
        raise
    except:
        raise BrowserError(url, "unknown error")

def broadcast_tx(self, tx):
    s = io.BytesIO()
    tx.stream(s)
    tx_as_hex = b2h(s.getvalue())
    data = urlencode(dict(tx=tx_as_hex)).encode("utf8")
    URL = "http://blockchain.info/pushtx"
    try:
        d = urlopen(URL, data=data).read()
        return d
    except HTTPError as ex:
        try:
            d = ex.read()
            ex.message = d
        except:
            pass
        raise ex

def discordEmbeddedPush(self, embed):
    """
    Send embedded message to discord bot huehue
    """
    data = json.dumps({"embeds": [embed]})
    req = urllib2.Request(self._discordWebhookUrl, data, {
        'Content-Type': 'application/json',
        "User-Agent": "B3DiscordbanPlugin/1.1"  # Is that a real User-Agent? Nope, but who cares.
    })
    # Final magic happens here; we will never get an error, of course ;)
    try:
        urllib2.urlopen(req)
    except urllib2.HTTPError as ex:
        self.debug("Cannot push data to Discord. Is your webhook URL right?")
        self.debug("Data: %s\nCode: %s\nRead: %s" % (data, ex.code, ex.read()))

def _do_put_request(self, resource, param_dict):
    req_url = urlparse.urlunparse(["http", self.host,
                                   "api/v%s/%s" % (self.api_version, resource), "", "", ""])
    print "req_url=%s" % (req_url)
    opener = urllib2.build_opener(urllib2.HTTPHandler)
    req = urllib2.Request(req_url, data=json.dumps(param_dict))
    req.add_header('Content-Type', 'application/json')
    req.get_method = lambda: 'PUT'
    try:
        return eval(opener.open(req).read())
    except urllib2.HTTPError, err:
        return parse_errors(err)

# ---------------------------------------------
# error parsing
# ---------------------------------------------

def Post(self, url, data, refer=None):
    try:
        # print "requesting " + str(url) + " with data:"
        # print data
        # print "Cookies: "
        # print self.__cookie
        req = urllib2.Request(url, urllib.urlencode(data))
        if refer is not None:
            req.add_header('Referer', refer)
        else:
            req.add_header('Referer', 'http://d1.web2.qq.com/proxy.html?v=20151105001&callback=1&id=2')
        # print "Headers: "
        # print req.headers
        tmp_req = urllib2.urlopen(req, timeout=180)
        self.__cookie.save('cookie/cookie.data', ignore_discard=True, ignore_expires=True)
        return tmp_req.read()
    except urllib2.HTTPError, e:
        return e.read()

def raise_for_status(self):
    """Raise stored error if one occurred.

    error will be instance of :class:`urllib2.HTTPError`
    """
    if self.error is not None:
        raise self.error
    return

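HTTPError instances are ordinary exception objects, so they can be caught, stored, and re-raised later; that is the pattern raise_for_status() above completes. A hypothetical sketch of the surrounding response wrapper:

import urllib2

# Hypothetical wrapper illustrating the store-now, raise-later pattern:
# the fetch records either the body or the HTTPError, and the caller
# decides when (or whether) to turn the stored error into an exception.
class Response(object):
    def __init__(self, url):
        self.error = None
        self.data = None
        try:
            self.data = urllib2.urlopen(url).read()
        except urllib2.HTTPError as e:
            self.error = e

    def raise_for_status(self):
        """Raise stored error if one occurred."""
        if self.error is not None:
            raise self.error
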
def run(self):
    try:
        client = DnsdbClient(self.dnsdb_server, self.dnsdb_key)
        self.report({
            "records": map(lambda r: self.update_date('time_first', self.update_date('time_last', r)),
                           self.execute_dnsdb_service(client))
        })
    except HTTPError, e:
        if e.code != 404:
            self.unexpectedError(e)
        else:
            self.report({"records": []})

def linksExtractor(url, fileFormat='png'):
    tag = 'a'
    attr = 'href'
    if fileFormat in ['png', 'jpg', 'jpeg', 'tiff', 'bmp', 'svg', 'gif']:
        tag = 'img'
        attr = 'src'
    try:
        headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64)'}
        req = urllib2.Request(url, None, headers)
        htmlDoc = urllib2.urlopen(req).read()
    except urllib2.HTTPError as err:
        print("Server Response : " + str(err.code))  # code is an attribute, not a method
        return "Server refused to connect!"
    except urllib2.URLError:
        return 'Invalid URL!'
    page = BeautifulSoup(htmlDoc, 'html.parser')
    page.prettify()
    res = []
    for link in page.find_all(tag):
        pre = link.get(attr)
        pre = str(pre)
        if pre[-len(fileFormat):] == fileFormat:
            res.append(pre)
    if len(res) < 1:
        return 'EMPTY'
    return res

def update_plex():
    Logger.info("plex - sending request to update Plex")
    url = 'http://%s/library/sections/all/refresh?X-Plex-Token=%s' % (PLEX_IP, PLEX_TOKEN)
    try:
        urllib2.urlopen(url).read()
    except urllib2.HTTPError, e:
        Logger.warning("plex - unable to make request to Plex - HTTP Error %s", str(e.code))
    except urllib2.URLError, e:
        Logger.warning("plex - unable to make request to Plex - URL Error %s", e.reason)
    else:
        Logger.info("plex - update successful")