我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用urllib.request.HTTPError()。
def broadcast_tx(self, tx):
    """Push a transaction to blockchain.info.

    The transaction is streamed into an in-memory buffer, hex-encoded and
    sent as the ``tx`` form field of the request.

    Returns:
        The raw response body.

    Raises:
        HTTPError: when the request fails with an HTTP error. The error
            body, if it can be read, is attached as ``ex.message``.
    """
    s = io.BytesIO()
    tx.stream(s)
    tx_as_hex = b2h(s.getvalue())
    data = urlencode(dict(tx=tx_as_hex)).encode("utf8")
    URL = "http://blockchain.info/pushtx"
    try:
        return urlopen(URL, data=data).read()
    except HTTPError as ex:
        try:
            ex.message = ex.read()
        except Exception:
            # Attaching the body is best effort only; the original HTTP
            # error is what the caller needs to see. ``Exception`` (rather
            # than a bare except) lets KeyboardInterrupt/SystemExit through.
            pass
        # Bare raise keeps the original traceback intact.
        raise
def get_access_token(self, code, state=None):
    '''
    Exchange the ``code`` (and optional ``state``) received on the
    callback URL, e.g. http://host/callback?code=123&state=xyz, for an
    access token.

    Returns the access token as a string. Raises ApiAuthError when the
    reply contains an ``error`` entry or the request fails with HTTPError.
    '''
    params = {
        'client_id': self._client_id,
        'client_secret': self._client_secret,
        'code': code,
    }
    if self._redirect_uri:
        params['redirect_uri'] = self._redirect_uri
    if state:
        params['state'] = state

    https_opener = build_opener(HTTPSHandler)
    token_request = Request(
        'https://github.com/login/oauth/access_token',
        data=_encode_params(params),
    )
    # The verb is taken from the module's method table.
    token_request.get_method = _METHOD_MAP['POST']
    token_request.add_header('Accept', 'application/json')

    try:
        raw_reply = https_opener.open(token_request, timeout=TIMEOUT).read()
        reply = _parse_json(raw_reply)
        if 'error' in reply:
            raise ApiAuthError(str(reply.error))
        return str(reply.access_token)
    except HTTPError:
        raise ApiAuthError('HTTPError when get access token')
def command(self, command, value=None):
    """Send a named command to the camera over HTTP.

    Returns True when the request succeeds, and False when the command is
    not in ``self.commandMaxtrix`` or the request fails.
    """
    func_str = 'GoProHero.command({}, {})'.format(command, value)

    # Unknown commands fall straight through to the catch-all result.
    if command not in self.commandMaxtrix:
        return False

    spec = self.commandMaxtrix[command]

    # '' and None both mean "no value".
    if value == '':
        value = None

    # Map the caller's value onto the wire value where a translation exists.
    if value is not None and value in spec['translate']:
        value = spec['translate'][value]

    url = self._commandURL(spec['cmd'], value)

    try:
        urlopen(url, timeout=self.timeout).read()
    except (HTTPError, URLError, socket.timeout) as err:
        logging.warning('{}{} - error opening {}: {}{}'.format(
            Fore.YELLOW, func_str, url, err, Fore.RESET))
        return False
    else:
        logging.info('{} - http success!'.format(func_str))
        return True
def rank_checker(url, hatebu_url):
    """Return the ``data-entryrank`` of the link to *url* on a page.

    The page at *hatebu_url* is fetched and searched for an ``<a>`` element
    whose ``href`` equals *url*.

    Returns:
        The value of the anchor's ``data-entryrank`` attribute, or None
        when the page cannot be fetched or no such anchor exists.
    """
    try:
        html = request.urlopen(hatebu_url)
    except request.URLError as e:
        # HTTPError is a subclass of URLError, so one handler covers both.
        # Bail out here: there is no page to parse.
        print(e.reason)
        return None

    soup = BeautifulSoup(html, "lxml")
    a = soup.find("a", href=url)
    if a is None:
        return None
    return a.get("data-entryrank")
def do_request(self, req):
    """Open *req* and record the outcome on this object.

    An HTTPError is kept as the response rather than raised. The URL,
    path, body and status are stored, the cached forms are reset, and the
    result of ``self.get_response()`` is returned.
    """
    if DEBUG:
        print('requesting', req.get_method(), req.get_full_url())

    opener = self.build_opener()
    opener.add_handler(self._cookie_processor)

    try:
        response = opener.open(req)
    except HTTPError as err:
        # The error object is used as the response.
        response = err
    self._response = response

    self.url = response.geturl()
    self.path = get_selector(Request(self.url))
    self.data = response.read()
    self.status = response.code
    self._forms = self.form = None
    return self.get_response()
def call_api(self):
    """Look up ``self.prj`` on prj2epsg.org and process the JSON answer.

    The decoded response is passed to ``self.process_api_result``. An HTTP
    error or a response that cannot be decoded as JSON is logged as a
    warning and nothing is processed.
    """
    url = 'http://prj2epsg.org/search.json?'
    params = {
        'mode': 'wkt',
        'terms': self.prj
    }
    try:
        response = request.urlopen(url + urlencode(params))
    except request.HTTPError as http_exc:
        logger.warning(
            "Failed to retrieve data from prj2epsg.org API:\n Status: %s \n Message: %s"
            % (http_exc.code, http_exc.msg))
        return

    try:
        raw_resp = response.read()
    finally:
        response.close()

    try:
        resp = json.loads(raw_resp.decode('utf-8'))
    except ValueError:
        # ValueError covers both json.JSONDecodeError and UnicodeDecodeError.
        # Return here: there is no decoded result to hand on.
        logger.warning('API call succeeded but response is not JSON: %s' % raw_resp)
        return

    self.process_api_result(resp)
def _do_post(self, url, data, headers):
    """Send *data* to *url* and return the JSON-decoded response.

    Args:
        url: target URL.
        data: request body (text), or None.
        headers: mapping of request headers.

    Returns:
        The object produced by ``json.loads`` on the response body.

    Raises:
        HTTPError: re-raised after the error body has been printed.
    """
    if data is not None:
        try:
            # Python 3: the body must be bytes.
            data = bytes(data, 'utf-8')
        except (TypeError, UnicodeError):
            # Python 2 (bytes() takes a single argument) or a body that is
            # already bytes: send it unchanged.
            pass

    try:
        req = url_request.Request(url=url, data=data, headers=headers)
        req.add_header('User-Agent', KKBOXHTTP.USER_AGENT)
        f = url_request.urlopen(req)
        try:
            r = f.read()
        finally:
            f.close()
    except url_request.HTTPError as e:
        print(e.fp.read())
        # Bare raise keeps the original traceback.
        raise

    try:
        # Python 3: decode the bytes payload to text.
        r = str(r, 'utf-8')
    except (TypeError, UnicodeError):
        # Python 2 (str() takes a single argument) or undecodable bytes:
        # let json.loads deal with the raw value.
        pass

    return json.loads(r)
def download_fzf_binary(plat, arch, overwrite=False, access_token=None):
    """Download and unpack the fzf binary, then make it executable.

    Args:
        plat: platform name; ``'windows'`` selects the Windows binary path.
        arch: architecture, passed on to ``get_fzf_binary_url``.
        overwrite: download even when the binary file already exists.
        access_token: optional GitHub token appended to the download URL.

    Raises:
        RuntimeError: on a 403 with an exhausted rate limit, or on a 401
            while an access token was supplied.
        HTTPError: any other HTTP failure is re-raised unchanged.
    """
    bin_path = fzf_windows_bin_path if plat == 'windows' else fzf_bin_path
    if overwrite or not os.path.isfile(bin_path):
        asset = get_fzf_binary_url(plat, arch, access_token)
        url, ext = asset
        if access_token:
            url = '{0}?access_token={1}'.format(url, access_token)
        try:
            r = urllib2.urlopen(url)
        except urllib2.HTTPError as e:
            # Header values are strings, so compare against '0', not 0.
            if e.code == 403 and \
               e.info().get('X-RateLimit-Remaining') == '0':
                raise RuntimeError(
                    'GitHub rate limit reached. To increate the limit use '
                    '-g/--github-access-token option.\n ' + str(e)
                )
            elif e.code == 401 and access_token:
                raise RuntimeError('Invalid GitHub access token.')
            raise
        try:
            extract(r, ext, bin_path)
        finally:
            # Release the connection even when extraction fails.
            r.close()
    mode = os.stat(bin_path).st_mode
    if not (mode & 0o111):
        os.chmod(bin_path, mode | 0o111)
def install(self, version, core_count, read_replica_count, initial_port, password, verbose=False):
    """Download the enterprise package and install cores and read replicas.

    Ports are drawn from a counter starting at *initial_port*. After the
    initial password is set, the real path of ``self.path`` is returned.

    Raises RuntimeError for HTTP 401 and 403; any other HTTPError is
    re-raised unchanged.
    """
    try:
        controller = _create_controller()
        package = controller.download("enterprise", version, self.path, verbose=verbose)
        ports = count(initial_port)
        discovery_members = self._install_cores(self.path, package, core_count, ports)
        self._install_read_replicas(
            self.path, package, discovery_members, read_replica_count, ports)
        self._set_initial_password(password)
        return realpath(self.path)
    except HTTPError as error:
        status = error.code
        if status == 401:
            raise RuntimeError("Missing or incorrect authorization")
        if status == 403:
            raise RuntimeError("Could not download package from %s (403 Forbidden)" % error.url)
        raise
def get_location(url):
    """Return the final URL reached when opening *url*.

    A timeout or URL error prints a message and exits the program. An HTTP
    error prints the status code and the string "fail" is returned.
    """
    try:
        # urlopen follows redirects by itself; the final address is read
        # back from the response rather than disabling that behaviour.
        reply = request.urlopen(url)
    except socket.timeout:
        print('request timeout')
        exit()
    except request.HTTPError as err:
        print(err.code)
    except request.URLError as err:
        print(err.reason)
        exit()
    else:
        return reply.geturl()
    return "fail"
def send_data(self, event):
    """Send event to VES.

    The listener URL is assembled from ``self._app_config`` (UseHttps,
    Domain, Port, Path, ApiVersion, Topic). The event is JSON-encoded and
    sent with an HTTP Basic ``Authorization`` header built from Username
    and Password. Errors raised while sending are logged, not propagated.
    """
    config = self._app_config
    server_url = "http{}://{}:{}{}/eventListener/v{}{}".format(
        's' if config['UseHttps'] else '',
        config['Domain'],
        int(config['Port']),
        '/{}'.format(config['Path']) if len(config['Path']) > 0 else '',
        int(config['ApiVersion']),
        '/{}'.format(config['Topic']) if len(config['Topic']) > 0 else '')
    logging.info('Vendor Event Listener is at: {}'.format(server_url))
    credentials = base64.b64encode('{}:{}'.format(
        config['Username'], config['Password']).encode()).decode()
    # The Basic-auth token is only base64 of "user:password"; never write
    # it to the log.
    logging.info('Authentication credentials are: {}'.format('<redacted>'))
    try:
        request = url.Request(server_url)
        request.add_header('Authorization', 'Basic {}'.format(credentials))
        request.add_header('Content-Type', 'application/json')
        event_str = json.dumps(event).encode()
        logging.debug("Sending {} to {}".format(event_str, server_url))
        response = url.urlopen(request, event_str, timeout=1)
        # The reply body is not used; close it so the socket is released.
        response.close()
        logging.debug("Sent data to {} successfully".format(server_url))
    except url.HTTPError as e:
        logging.error('Vendor Event Listener exception: {}'.format(e))
    except url.URLError as e:
        logging.error(
            'Vendor Event Listener is is not reachable: {}'.format(e))
    except Exception as e:
        logging.error('Vendor Event Listener error: {}'.format(e))
def request(self, host, handler, request_body, verbose=0):
    """Send XMLRPC request

    The URI is built from ``self._scheme``, *host* and *handler*. The
    parsed response is returned. URL/HTTP errors and BadStatusLine are
    converted to InterfaceError.
    """
    uri = '{scheme}://{host}{handler}'.format(
        scheme=self._scheme, host=host, handler=handler)

    if self._passmgr:
        self._passmgr.add_password(None, uri, self._username, self._password)
    if self.verbose:
        _LOGGER.debug("FabricTransport: {0}".format(uri))

    opener = urllib2.build_opener(*self._handlers)
    req = urllib2.Request(
        uri,
        request_body,
        headers={
            'Content-Type': 'text/xml',
            'User-Agent': self.user_agent,
        },
    )

    try:
        return self.parse_response(opener.open(req))
    except (urllib2.URLError, urllib2.HTTPError) as exc:
        try:
            # Only a 400 reports its real status; anything else shows -1.
            if exc.code == 400:
                reason, code = 'Permission denied', exc.code
            else:
                reason, code = exc.reason, -1
            msg = "{reason} ({code})".format(reason=reason, code=code)
        except AttributeError:
            # The error carries no code/reason attribute.
            text = str(exc)
            msg = "SSL error" if 'SSL' in text else text
        raise InterfaceError("Connection with Fabric failed: " + msg)
    except BadStatusLine:
        raise InterfaceError("Connection with Fabric failed: check SSL")
def request(self, url, method="GET", body=None, headers={}):
    """Open *url* and return ``(info, content)``.

    A response with HTTP status 500 is returned like a normal response;
    every other HTTPError is re-raised. *method* is accepted but not used.
    """
    outgoing = urllib2.Request(url, body, headers)
    try:
        reply = self.request_opener(outgoing, timeout=self._timeout)
    except urllib2.HTTPError as err:
        if err.code != 500:
            raise
        # The error object exposes info()/read() like a response.
        reply = err
    return reply.info(), reply.read()
def download_file(url, download_directory):
    """Download a remote file

    The file name is the last segment of the URL path, or the host name
    when the path is empty or "/".

    Args:
        url: (string) address of the file to fetch
        download_directory: (string) directory the file is written into

    Returns:
        (string) the absolute path of the file that was just downloaded

    Raises:
        DownloadError: if the URL cannot be retrieved or the file cannot
            be written
    """
    Output.print_information("Downloading " + url + " ...")
    parsed_url = urlparse(url)
    if parsed_url.path in ["/", ""]:
        file_name = parsed_url.netloc
    else:
        file_name = parsed_url.path.split("/")[-1]
    download_path = abspath(join(download_directory, file_name))
    try:
        # Fetch first, so a failed download does not leave an empty file.
        response = urlopen(url)
        try:
            content = response.read()
        finally:
            response.close()
        with open(download_path, 'wb') as file_object:
            file_object.write(content)
        return download_path
    except HTTPError as expn:
        raise DownloadError("HTTP error code " + str(expn.code) + " while retrieving " \
         + url + "\n" + str(expn.reason))
    except URLError as expn:
        raise DownloadError("HTTP URL error while retrieving " + url + "\n" + str(expn.reason))
    except Exception as expn:
        raise DownloadError("Unable to retrieve " + url + "\n" + str(expn))
def _http(self, _method, _path, **kw):
    """Perform one API request and return the parsed JSON reply.

    For GET the keyword arguments become the query string; for POST, PATCH
    and PUT they are JSON-encoded into the body. Returns the parsed reply
    when ``self._process_resp`` reports JSON, otherwise None.

    Raises ApiNotFoundError for HTTP 404 and ApiError for any other
    HTTPError.
    """
    sends_body = _method in ('POST', 'PATCH', 'PUT')

    if _method == 'GET' and kw:
        _path = '%s?%s' % (_path, _encode_params(kw))
    payload = bytes(_encode_json(kw), 'utf-8') if sends_body else None

    url = '%s%s' % (_URL, _path)
    opener = build_opener(HTTPSHandler)
    request = Request(url, data=payload)
    request.get_method = _METHOD_MAP[_method]
    if self._authorization:
        request.add_header('Authorization', self._authorization)
    if sends_body:
        request.add_header('Content-Type', 'application/x-www-form-urlencoded')

    try:
        response = opener.open(request, timeout=TIMEOUT)
        if self._process_resp(response.headers):
            return _parse_json(response.read().decode('utf-8'))
    except HTTPError as err:
        body_is_json = self._process_resp(err.headers)
        text = err.read().decode('utf-8')
        body = _parse_json(text) if body_is_json else text
        req = JsonObject(method=_method, url=url)
        resp = JsonObject(code=err.code, json=body)
        if resp.code == 404:
            raise ApiNotFoundError(url, req, resp)
        raise ApiError(url, req, resp)
def api_request_native(url, data=None, token=None, https_proxy=None, method=None):
    """Call a JSON API endpoint with token authentication.

    Args:
        url: endpoint address.
        data: optional request body as text; it is sent UTF-8 encoded.
        token: auth token; when None, ``token_auth_string()`` supplies it.
        https_proxy: accepted but not used by this implementation.
        method: optional HTTP verb overriding the default.

    Returns:
        The decoded JSON response, or None for a 204 (No Content) reply.

    Raises:
        SimpleHTTPError: carrying the status code and body of an HTTPError.
    """
    request = urllib.Request(url)
    if method:
        request.get_method = lambda: method
    token = token if token is not None else token_auth_string()
    request.add_header('Authorization', 'token ' + token)
    request.add_header('Accept', 'application/json')
    request.add_header('Content-Type', 'application/json')

    if data is not None:
        # Request.add_data() was removed in Python 3.4; assigning the
        # ``data`` attribute works on both old and new versions.
        request.data = data.encode('utf8')

    try:
        with contextlib.closing(urllib.urlopen(request)) as response:
            if response.code == 204:  # No Content
                return None
            return json.loads(response.read().decode('utf8', 'ignore'))
    except urllib.HTTPError as err:
        with contextlib.closing(err):
            raise SimpleHTTPError(err.code, err.read())
def send_response(event, context, response_status, reason=None, response_data={}):
    """PUT a JSON status document to ``event["ResponseURL"]``.

    The document carries the status, the log stream name from *context*,
    the stack/request/logical-resource ids from *event* and, when given,
    the reason and data.

    Returns True when the request succeeds and False on an HTTP or URL
    error.
    """
    payload = {
        "Status": response_status,
        "PhysicalResourceId": context.log_stream_name,
        "StackId": event["StackId"],
        "RequestId": event["RequestId"],
        "LogicalResourceId": event["LogicalResourceId"],
    }

    print("Responding: {}".format(response_status))

    if reason:
        print(reason)
        payload["Reason"] = reason

    if response_data:
        print(response_data)
        payload["Data"] = response_data

    encoded = json.dumps(payload).encode("utf-8")
    headers = {
        "Content-Length": len(encoded),
        "Content-Type": "",
    }
    req = Request(event["ResponseURL"], data=encoded, headers=headers)
    req.get_method = lambda: "PUT"

    try:
        urlopen(req)
    except HTTPError as err:
        print("Failed executing HTTP request: {}".format(err.code))
        return False
    except URLError as err:
        print("Failed to reach the server: {}".format(err.reason))
        return False
    else:
        return True
def test(self, url, toHex=True):
    """Debug helper: fetch ``http://<self._ip>/<url>`` and print the reply.

    Args:
        url: path appended to the device address.
        toHex: when true, the response bytes are printed as hex digits.

    HTTP, URL and timeout errors are printed instead of raised.
    """
    try:
        url = 'http://{}/{}'.format(self._ip, url)

        print(url)
        response = urlopen(
            url, timeout=self.timeout).read()

        if toHex:
            # bytes.encode('hex') only exists on Python 2; hexlify works
            # on both major versions.
            import binascii
            response = binascii.hexlify(response).decode('ascii')

        print(response)
    except (HTTPError, URLError, socket.timeout) as e:
        print(e)
def catch_request(request):
    """Open *request* and report the outcome as a pair.

    Returns ``(handle, False)`` on success and ``(None, error)`` when an
    HTTP, URL or socket error occurs while connecting.
    """
    try:
        handle = urlopen(request)
    except (HTTPError, URLError, socket.error):
        # Read the active exception from sys.exc_info() instead of binding
        # it with "as".
        return None, sys.exc_info()[1]
    return handle, False
def getBestServer(servers):
    """Pick the server with the lowest measured latency.

    Each server's ``latency.txt`` is requested three times. A failed or
    unexpected reply is counted as 3600. The winning server dict is
    returned with its ``latency`` key set to the computed figure.
    """
    by_latency = {}
    for server in servers:
        samples = []
        parts = urlparse('%s/latency.txt' % os.path.dirname(server['url']))
        scheme, netloc, path = parts[0], parts[1], parts[2]
        for _attempt in range(0, 3):
            try:
                if scheme == 'https':
                    conn = HTTPSConnection(netloc)
                else:
                    conn = HTTPConnection(netloc)
                headers = {'User-Agent': user_agent}
                began = timeit.default_timer()
                conn.request("GET", path, headers=headers)
                reply = conn.getresponse()
                elapsed = (timeit.default_timer() - began)
            except (HTTPError, URLError, socket.error):
                samples.append(3600)
                continue
            body = reply.read(9)
            good = int(reply.status) == 200 and body == 'test=test'.encode()
            samples.append(elapsed if good else 3600)
            conn.close()
        # The sum is divided by 6 and scaled by 1000 before rounding.
        score = round((sum(samples) / 6) * 1000, 3)
        by_latency[score] = server
    lowest = sorted(by_latency.keys())[0]
    best = by_latency[lowest]
    best['latency'] = lowest
    return best
def getBestServer(servers):
    """Pick the server with the lowest measured latency.

    Each server's ``latency.txt`` is requested three times. A failed or
    unexpected reply is counted as 3600. The winning server dict is
    returned with its ``latency`` key set to the computed figure.
    """
    by_latency = {}
    for server in servers:
        samples = []
        parts = urlparse('%s/latency.txt' % os.path.dirname(server['url']))
        scheme, netloc, path = parts[0], parts[1], parts[2]
        for _attempt in range(0, 3):
            try:
                if scheme == 'https':
                    conn = HTTPSConnection(netloc)
                else:
                    conn = HTTPConnection(netloc)
                began = timeit.default_timer()
                conn.request("GET", path)
                reply = conn.getresponse()
                elapsed = (timeit.default_timer() - began)
            except (HTTPError, URLError, socket.error):
                samples.append(3600)
                continue
            body = reply.read(9)
            good = int(reply.status) == 200 and body == 'test=test'.encode()
            samples.append(elapsed if good else 3600)
            conn.close()
        # The sum is divided by 6 and scaled by 1000 before rounding.
        score = round((sum(samples) / 6) * 1000, 3)
        by_latency[score] = server
    lowest = sorted(by_latency.keys())[0]
    best = by_latency[lowest]
    best['latency'] = lowest
    return best
def catch_request(request):
    """Open *request*, shielding the caller from connection errors.

    Returns the opened handle, or False when an HTTP, URL or socket error
    occurs.
    """
    try:
        return urlopen(request)
    except (HTTPError, URLError, socket.error):
        return False
def send_tx(tx):
    """Push a transaction to blockchain.info.

    Returns the response body on success. On an HTTPError the error is
    printed and None is returned.
    """
    buf = io.BytesIO()
    tx.stream(buf)
    hex_tx = b2h(buf.getvalue())
    payload = urlencode({'tx': hex_tx}).encode("utf8")
    endpoint = "http://blockchain.info/pushtx"
    try:
        return urlopen(endpoint, data=payload).read()
    except HTTPError as ex:
        # The error body is read but only the exception itself is printed.
        ex.read()
        print(ex)
def get_category(url):
    """Return the ``data-category-name`` of the Hatena entry page for *url*.

    When the page cannot be fetched, the error reason is printed and None
    is returned.
    """
    entry_url = "http://b.hatena.ne.jp/entry/{}".format(url)
    try:
        page = request.urlopen(entry_url)
        root = BeautifulSoup(page, "lxml").find("html")
        return root.get("data-category-name")
    except (request.HTTPError, request.URLError) as err:
        # Both error kinds are reported the same way.
        print(err.reason)
def is_hatenatop(url):
    """Tell whether *url* is linked from the hatenablog.com front page.

    Returns:
        True when the front page has an ``<a>`` whose ``href`` equals
        *url*; False when it has none or the page cannot be fetched.
    """
    try:
        html = request.urlopen("http://hatenablog.com/")
    except request.URLError as e:
        # Catch from the same module that performs the request. HTTPError
        # is a subclass of URLError, so this covers both. Without a page
        # there is nothing to search.
        print(e.reason)
        return False

    soup = BeautifulSoup(html, "lxml")
    a = soup.find("a", href=url)
    if a is None:
        return False
    return url == a.get("href")
def get_access_token(self, code, state=None):
    '''
    Turn the ``code`` (and optional ``state``) delivered to the callback
    URL, e.g. http://host/callback?code=123&state=xyz, into an access
    token.

    Returns the token as a string; raises ApiAuthError when the reply has
    an ``error`` entry or the request ends in an HTTPError.
    '''
    form = {
        'client_id': self._client_id,
        'client_secret': self._client_secret,
        'code': code,
    }
    if self._redirect_uri:
        form['redirect_uri'] = self._redirect_uri
    if state:
        form['state'] = state

    opener = build_opener(HTTPSHandler)
    req = Request(
        'https://github.com/login/oauth/access_token',
        data=_encode_params(form))
    req.get_method = _METHOD_MAP['POST']
    req.add_header('Accept', 'application/json')

    try:
        answer = _parse_json(opener.open(req, timeout=TIMEOUT).read())
        if 'error' in answer:
            raise ApiAuthError(str(answer.error))
        return str(answer.access_token)
    except HTTPError:
        raise ApiAuthError('HTTPError when get access token')
def _http(self, _method, _path, **kw):
    """Perform one API request and return the parsed JSON reply.

    For GET the keyword arguments are encoded into the query string; for
    POST, PATCH and PUT they are JSON-encoded into the request body.

    Returns the parsed reply when ``self._process_resp`` reports JSON.

    Raises ApiNotFoundError when an HTTPError with code 404 was caught and
    ApiError for any other caught HTTPError. Both are raised from the
    ``finally`` clause below.
    """
    data = None
    # NOTE(review): ``params`` is never read in this block.
    params = None
    if _method == 'GET' and kw:
        _path = '%s?%s' % (_path, _encode_params(kw))
    if _method in ['POST', 'PATCH', 'PUT']:
        data = bytes(_encode_json(kw), 'utf-8')
    url = '%s%s' % (_URL, _path)
    opener = build_opener(HTTPSHandler)
    request = Request(url, data=data)
    request.get_method = _METHOD_MAP[_method]
    if self._authorization:
        request.add_header('Authorization', self._authorization)
    if _method in ['POST', 'PATCH', 'PUT']:
        request.add_header('Content-Type',
                           'application/x-www-form-urlencoded')

    # Placeholders so the ``finally`` checks are false unless an HTTPError
    # replaced them.
    class Resp():
        code = None
    resp = Resp()
    req = None

    try:
        response = opener.open(request, timeout=TIMEOUT)
        is_json = self._process_resp(response.headers)
        if is_json:
            return _parse_json(response.read().decode('utf-8'))
    except HTTPError as e:
        is_json = self._process_resp(e.headers)
        # The error body is kept parsed when JSON, otherwise as text.
        if is_json:
            json = _parse_json(e.read().decode('utf-8'))
        else:
            json = e.read().decode('utf-8')
        req = JsonObject(method=_method, url=url)
        resp = JsonObject(code=e.code, json=json)
    finally:
        # Runs on every exit path, including the successful return above.
        if resp.code == 404:
            raise ApiNotFoundError(url, req, resp)
        elif req:
            raise ApiError(url, req, resp)
def __get_profile_first_posts(self):
    """Load the profile page data and fill ``self.user`` from it.

    Raises ValueError('User not found.') when fetching the profile fails
    with an HTTPError. After the account fields and counters are stored,
    the success callback is invoked; for a public account with posts the
    first media nodes are processed and the page info is remembered.
    """
    profile_url = self.__profile_fp_url.format(self.username)
    try:
        data = json.loads(self.__process_url(profile_url))
    except simple_browser.HTTPError:
        raise ValueError('User not found.')

    self.user['un'] = self.username
    profile = data['user']

    # Short key in self.user <- key in the profile payload.
    field_map = (
        ('id', 'id'),
        ('fn', 'full_name'),
        ('b', 'biography'),
        ('pic', 'profile_pic_url_hd'),
        ('iv', 'is_verified'),
        ('ip', 'is_private'),
    )
    for short_key, source_key in field_map:
        self.user[short_key] = profile[source_key]

    counters = {
        COUNT_KEY_FOLLOWING: profile['follows']['count'],
        COUNT_KEY_FOLLOWED_BY: profile['followed_by']['count'],
        COUNT_KEY_POSTS: profile['media']['count'],
    }
    # The remaining counters start at zero.
    for zero_key in (
            COUNT_KEY_IMAGE_POSTS,
            COUNT_KEY_VIDEO_POSTS,
            COUNT_KEY_ALBUM_POSTS,
            COUNT_KEY_LIKES,
            COUNT_KEY_COMMENTS,
            COUNT_KEY_VIDEO_VIEWS,
            COUNT_KEY_LIKES_PER_POST,
            COUNT_KEY_COMMENTS_PER_POST,
            COUNT_KEY_VIEWS_PER_POST):
        counters[zero_key] = 0
    self.user[COUNTERS_KEY] = counters

    self.__send_success_callback('account', self.user)

    if not self.user['ip'] and self.user[COUNTERS_KEY][COUNT_KEY_POSTS]:
        self.__process_posts_first(data['user']['media']['nodes'])
        self.__tmp_req_info = data['user']['media']['page_info']
def get_access_token(self, renew=False):
    """ Return the client's access token, requesting one when needed.

    A token is requested with the client credentials grant when none is
    stored yet or when *renew* is true; otherwise the stored token is
    returned without any request.

    Args:
      renew: if True, request a new token even if one is already stored.

    Raises ApiError when the token request fails.
    """
    if self.access_token is not None and not renew:
        return self.access_token

    headers = {}
    url = self._url_for_op('token')
    # The body is form-encoded, not JSON.
    form = urlencode({'grant_type': 'client_credentials',
                      'client_id': self.CLIENT_ID,
                      'client_secret': self.CLIENT_SECRET})
    req = urllib2.Request(url, bytearray(form, 'utf-8'), headers)
    try:
        raw = urllib2.urlopen(req).read()
        parsed = self._parse_response(raw)
    except urllib2.HTTPError as e:
        raise ApiError(e.reason)
    except Exception as e:
        raise ApiError(e)
    self.access_token = parsed['access_token']
    return self.access_token
def urlopen(self, url):  # pylint:disable=no-self-use
    """ Open a URI with browser-like headers; return what urllib2.urlopen returns.

    NOTE(review): every HTTPError is reported with the "404" message below,
    whatever its actual status code is.
    """
    if download_npo.verbose:
        msg('urlopen: ' + url)

    req = urllib2.Request(url, headers={
        'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:51.0) Gecko/20100101 Firefox/51.0',
        'Cookie': 'npo_cc=tmp; npo_cc_www.npogeschiedenis.nl=tmp',
    })
    try:
        response = urllib2.urlopen(req)
    except urllib2.HTTPError:
        raise download_npo.Error(
            'De URL {} is niet gevonden (404 error)'.format(url))
    return response
def crawl_club_users(cid):
    ''' Crawl through and get all user ids in a specific club

    Member pages are requested 36 entries at a time. Crawling continues
    while a page yields exactly 36 profile links and stops on the first
    page with a different count or on an HTTPError.

    Returns the list of link texts collected from profile links.
    '''
    club_users = []
    page_num, count = 0, 36
    while count == 36:
        # Prepare url:
        url = 'https://myanimelist.net/clubs.php?action=view&t=members&id={0}&show={1}'.format(cid, str(page_num * 36))
        try:
            page = urllib2.urlopen(url)
            soup = BeautifulSoup(page)
            # Extract all links:
            all_links = soup.find_all('a', href=True)
            count = 0
            for link in all_links:
                if 'profile' in link['href']:
                    if len(link.text) > 0:
                        # These are the users
                        club_users.append(link.text)
                        count += 1
            # Get the next page number and rest:
            page_num += 1
            # randn() without arguments yields a plain float; randn(1)
            # yields a one-element array, which time.sleep() should not be
            # handed.
            time.sleep(0.5 + abs(np.random.randn()))
            if int(page_num) % 10 == 9:
                print('Moving to page ' + str(page_num))
        except urllib2.HTTPError:
            count = 0
    return club_users
def __request_handler(callee):
    """Call *callee*, retrying once after a pause on HTTP 403.

    Args:
        callee: zero-argument callable performing the request.

    Returns:
        Whatever *callee* returns.

    Raises:
        HTTPError: any HTTP error other than 403 is re-raised. An error
            raised by the retry itself also propagates.
    """
    try:
        return callee()
    except HTTPError as e:
        if e.code != 403:
            # Do not hide unrelated failures behind a silent None.
            raise
        print("Seems like I exceeded the number of requests per minute...")
        print("I'm gonna sleep for 1 min and try again later...")
        time.sleep(60)
        return callee()
def get_fzf_release(access_token=None):
    """Return the fzf release metadata as a decoded JSON object.

    A cached ``fzf-<version>-release.json`` next to this file is used when
    it can be read. Otherwise the metadata is downloaded from
    ``release_url`` and written back to that same location on a
    best-effort basis.

    Args:
        access_token: optional GitHub token appended to the request URL.

    Raises:
        RuntimeError: on a 403 with an exhausted rate limit, or on a 401
            while an access token was supplied.
        HTTPError: any other HTTP failure is re-raised unchanged.
    """
    filename = 'fzf-{0}-release.json'.format(fzf_version)
    filepath = os.path.join(os.path.dirname(__file__), filename)
    try:
        with open(filepath) as f:
            d = f.read()
    except IOError:
        url = release_url
        if access_token:
            url = '{0}?access_token={1}'.format(url, access_token)
        try:
            r = urllib2.urlopen(url)
        except urllib2.HTTPError as e:
            # Header values are strings, so compare against '0', not 0.
            if e.code == 403 and \
               e.info().get('X-RateLimit-Remaining') == '0':
                raise RuntimeError(
                    'GitHub rate limit reached. To increate the limit use '
                    '-g/--github-access-token option.\n ' + str(e)
                )
            elif e.code == 401 and access_token:
                raise RuntimeError('Invalid GitHub access token.')
            raise
        try:
            d = r.read()
        finally:
            r.close()
        mode = 'w' + ('b' if isinstance(d, bytes) else '')
        try:
            # Write the cache where it is read from (filepath), so the
            # next call finds it whatever the working directory is.
            with open(filepath, mode) as f:
                f.write(d)
        except IOError:
            # Caching is optional; the downloaded data is still returned.
            pass
    try:
        return json.loads(d)
    except TypeError:
        # json.loads rejected a bytes payload: decode it first.
        return json.loads(d.decode('utf-8'))
def _query(self, path, before=None, after=None):
    """Yield decoded JSON records from the server's lookup endpoint.

    Args:
        path: lookup path appended to ``<server>/lookup/``.
        before: optional time bound. With *after* it is sent as
            ``time_last_before``, alone as ``time_first_before``.
        after: optional time bound. With *before* it is sent as
            ``time_first_after``, alone as ``time_last_after``.

    Yields:
        One decoded JSON object per line of the response.

    Raises:
        QueryError: when the request fails with an HTTP or URL error.
    """
    url = '%s/lookup/%s' % (self.server, path)

    params = {}
    if self.limit:
        params['limit'] = self.limit
    if before and after:
        params['time_first_after'] = after
        params['time_last_before'] = before
    else:
        if before:
            params['time_first_before'] = before
        if after:
            params['time_last_after'] = after
    if params:
        url += '?{0}'.format(urlencode(params))

    req = Request(url)
    req.add_header('Accept', 'application/json')
    req.add_header('X-Api-Key', self.apikey)

    proxy_args = {}
    if self.http_proxy:
        proxy_args['http'] = self.http_proxy
    if self.https_proxy:
        proxy_args['https'] = self.https_proxy
    proxy_handler = ProxyHandler(proxy_args)
    opener = build_opener(proxy_handler)

    try:
        http = opener.open(req)
        try:
            while True:
                line = http.readline()
                if not line:
                    break
                yield json.loads(line.decode('ascii'))
        finally:
            # Also runs when the consumer abandons the generator early.
            http.close()
    except (HTTPError, URLError) as e:
        # sys.exc_traceback does not exist on Python 3; sys.exc_info()
        # is available on both major versions.
        raise QueryError(str(e), sys.exc_info()[2])
def _install(edition, version, path, **kwargs):
    """Install the given edition/version under *path* and return its home.

    *version* is stripped of surrounding whitespace before use. HTTP 401
    and 403 become RuntimeError; any other HTTPError is re-raised
    unchanged.
    """
    controller = _create_controller()
    try:
        return controller.install(edition, version.strip(), path, **kwargs)
    except HTTPError as error:
        status = error.code
        if status == 401:
            raise RuntimeError("Missing or incorrect authorization")
        if status == 403:
            raise RuntimeError("Could not download package from %s (403 Forbidden)" % error.url)
        raise
def send(self):
    """Send this message through the SendGrid mail endpoint.

    One personalization is built with every address in ``self.to``, the
    substitutions from ``self.body`` and the subject. On an HTTPError the
    error body is printed.
    """
    sg = sendgrid.SendGridAPIClient(
        apikey=settings.DJANGO_SENDGRID_PARSE_API)

    recipients = [{'email': address} for address in self.to]
    personalization = {
        'to': recipients,
        'substitutions': self.body,
        'subject': self.subject,
    }
    data = {
        'personalizations': [personalization],
        'from': {'email': self.from_email},
        'template_id': self.template_id,
    }

    try:
        sg.client.mail.send.post(request_body=data)
    except urllib.HTTPError as err:
        print(err.read())
def markdownRefresh():
    """Send the current buffer, with the cursor line marked, to the preview.

    ``flagSign`` is inserted into the cursor line, then the buffer text is
    PUT to the preview URL. On an HTTP 406 reply the buffer number is
    removed from ``g:mkdp_bufs``.
    """
    port = vim.eval('g:mkdp_port')
    buf = vim.current.buffer
    bufnr = buf.number
    pbufnr = prefix + str(bufnr)
    cursor_row = vim.current.window.cursor[0] - 1
    encoding = vim.eval('&encoding').upper()

    text = NEW_LINE.join(buf)
    if PY_VERSOIN == '2':
        # On Python 2 the joined text is decoded before it is split.
        text = text.decode(encoding)
    lines = text.split(U_NEW_LINE)

    current = lines[cursor_row]
    if tag.search(current) is not None:
        current = tag.sub(u'\\1 ' + flagSign, current, 1)
    else:
        current = B.sub(flagSign, current, 1)
    lines[cursor_row] = current

    payload = U_NEW_LINE.join(lines).encode('utf-8')
    req = urllib2.Request(URL % (port, pbufnr), data=payload)
    req.get_method = lambda: "PUT"
    try:
        urllib2.urlopen(req)
    except urllib2.HTTPError as err:
        if err.code == 406:
            vim.command('call remove(g:mkdp_bufs, %s)' % bufnr)
def stop(self):
    """Stop application process.

    Does nothing when no thread is set. Otherwise the ``/shutdown`` URL is
    requested, the thread is joined and the reference is cleared.
    """
    if not self._thread:
        return
    try:
        urlopen(self.url('/shutdown'))
    except HTTPError:
        # 500 server closed
        pass
    self._thread.join()
    self._thread = None
def _http_request(self, url, headers={}, data=None):
    """Open *url* through ``self.opener`` and return the response.

    Raises NotGitRepository for HTTP 404 and GitProtocolError for any
    other HTTPError whose code is not 200.
    """
    req = urllib2.Request(url, headers=headers, data=data)
    try:
        resp = self.opener.open(req)
    except urllib2.HTTPError as err:
        status = err.code
        if status == 404:
            raise NotGitRepository()
        elif status != 200:
            raise GitProtocolError("unexpected http response %d" % status)
    return resp