我们从Python开源项目中，提取了以下21个代码示例，用于说明如何使用http.client.HTTPException()。
def test_named_sequences_full(self):
    """Validate every named sequence from the Unicode NamedSequences.txt file."""
    url = ("http://www.unicode.org/Public/%s/ucd/NamedSequences.txt"
           % unicodedata.unidata_version)
    try:
        testdata = support.open_urlresource(url, encoding="utf-8",
                                            check=check_version)
    except (IOError, HTTPException):
        self.skipTest("Could not retrieve " + url)
    self.addCleanup(testdata.close)
    for raw in testdata:
        raw = raw.strip()
        # Blank lines and '#' comment lines carry no sequence data.
        if not raw or raw.startswith('#'):
            continue
        seqname, codepoints = raw.split(';')
        expected = ''.join(chr(int(cp, 16)) for cp in codepoints.split())
        self.assertEqual(unicodedata.lookup(seqname), expected)
        # Named sequences are rejected by \N{...} escapes ...
        with self.assertRaises(SyntaxError):
            self.checkletter(seqname, None)
        # ... and are absent from the 3.2.0 database snapshot.
        with self.assertRaises(KeyError):
            unicodedata.ucd_3_2_0.lookup(seqname)
def test_named_sequences_full(self):
    """Check every named sequence listed in NamedSequences.txt."""
    url = ("http://www.pythontest.net/unicode/%s/NamedSequences.txt"
           % unicodedata.unidata_version)
    try:
        testdata = support.open_urlresource(url, encoding="utf-8",
                                            check=check_version)
    except (OSError, HTTPException):
        self.skipTest("Could not retrieve " + url)
    self.addCleanup(testdata.close)
    for entry in testdata:
        entry = entry.strip()
        # Skip blanks and '#' comments.
        if not entry or entry.startswith('#'):
            continue
        seqname, cps = entry.split(';')
        expected = ''.join(chr(int(cp, 16)) for cp in cps.split())
        self.assertEqual(unicodedata.lookup(seqname), expected)
        # Sequences must not be usable as \N{...} letters ...
        with self.assertRaises(SyntaxError):
            self.checkletter(seqname, None)
        # ... and must be missing from the 3.2.0 database.
        with self.assertRaises(KeyError):
            unicodedata.ucd_3_2_0.lookup(seqname)
def test_named_sequences_full(self):
    """Exercise the lookup of all Unicode named sequences."""
    url = ("http://www.unicode.org/Public/%s/ucd/NamedSequences.txt"
           % unicodedata.unidata_version)
    try:
        testdata = support.open_urlresource(url, encoding="utf-8",
                                            check=check_version)
    except (OSError, HTTPException):
        self.skipTest("Could not retrieve " + url)
    self.addCleanup(testdata.close)
    for record in testdata:
        record = record.strip()
        if not record or record.startswith('#'):
            continue  # blank line or comment
        seqname, hexcodes = record.split(';')
        expected = ''.join(chr(int(code, 16)) for code in hexcodes.split())
        self.assertEqual(unicodedata.lookup(seqname), expected)
        # \N{...} escapes must reject sequence names ...
        with self.assertRaises(SyntaxError):
            self.checkletter(seqname, None)
        # ... and the 3.2.0 snapshot must not know them.
        with self.assertRaises(KeyError):
            unicodedata.ucd_3_2_0.lookup(seqname)
def test_download_failed_HTTPException(self, mock_urlopen):
    """A protocol-level HTTP failure must surface as a RetryableError."""
    mock_urlopen.side_effect = httplib.HTTPException()
    request = urllib2.Request('http://fakeurl.com')
    self.assertRaises(
        self.glance.RetryableError,
        self.glance._download_tarball_and_verify,
        request,
        'fake_staging_path')
def __init__(self, *args, **kw):
    """Probe the mapping file at construction so a fetch failure skips the case."""
    unittest.TestCase.__init__(self, *args, **kw)
    try:
        # Fetch (and immediately close) the resource so a download
        # problem is reported before any individual test runs.
        self.open_mapping_file().close()
    except (IOError, HTTPException):
        self.skipTest("Could not retrieve " + self.mapfileurl)
def test_failure(self):
    """host.check must raise HTTPException when the server answers non-200."""
    host = bpo.Host(util.FakeServerHost())
    session = util.FakeSession(response=util.FakeResponse(status=404))
    with self.assertRaises(client.HTTPException):
        self.run_awaitable(host.check(session, ['brettcannon']))
def test_too_many_headers(self):
    """Responses carrying more than _MAXHEADERS header lines are rejected."""
    header_lines = ['Header%d: foo' % i
                    for i in range(client._MAXHEADERS + 1)]
    text = 'HTTP/1.1 200 OK\r\n' + '\r\n'.join(header_lines) + '\r\n'
    sock = FakeSocket(text)
    resp = client.HTTPResponse(sock)
    self.assertRaisesRegex(client.HTTPException,
                           r"got more than \d+ headers", resp.begin)
def connect(self):
    """Establish an HTTP connection to the RPC server.

    Raises errors.InterfaceError when the connection cannot be opened.
    """
    host, port = self.url.hostname, self.url.port
    logger.debug("Opening connection to %s:%s", host, port)
    try:
        self.connection = httplib.HTTPConnection(host, port)
        self.connection.connect()
    except (httplib.HTTPException, socket.error) as e:
        # Wrap both protocol-level and socket-level failures uniformly.
        raise errors.InterfaceError('Unable to connect to the specified service', e)
def close(self):
    """Shut down the HTTP connection to the RPC server, if one is open."""
    if self.connection is None:
        return
    logger.debug("Closing connection to %s:%s", self.url.hostname, self.url.port)
    try:
        self.connection.close()
    except httplib.HTTPException:
        # Best effort: a failure while closing is logged, never raised.
        logger.warning("Error while closing connection", exc_info=True)
    self.connection = None
def _post_request(self, body, headers):
    # POST `body` with `headers` to the RPC endpoint and return the response.
    # Transient failures are retried up to self.max_retries times:
    #   - protocol errors (HTTPException) trigger a reconnect before retrying;
    #   - HTTP 503 (service unavailable) only waits and retries.
    # When retries are exhausted, a protocol error raises InterfaceError,
    # while a final 503 response is returned to the caller as-is.
    retry_count = self.max_retries
    while True:
        logger.debug("POST %s %r %r", self.url.path, body, headers)
        try:
            self.connection.request('POST', self.url.path, body=body,
                                    headers=headers)
            response = self.connection.getresponse()
        except httplib.HTTPException as e:
            if retry_count > 0:
                # Back-off grows as retries run out: exp(-n) is near zero
                # for large n and approaches exp(-1) on the last attempt.
                delay = math.exp(-retry_count)
                logger.debug("HTTP protocol error, will retry in %s seconds...",
                             delay, exc_info=True)
                # Re-establish the connection before the next attempt.
                self.close()
                self.connect()
                time.sleep(delay)
                retry_count -= 1
                continue
            raise errors.InterfaceError('RPC request failed', cause=e)
        else:
            if response.status == httplib.SERVICE_UNAVAILABLE:
                if retry_count > 0:
                    delay = math.exp(-retry_count)
                    logger.debug("Service unavailable, will retry in %s seconds...",
                                 delay, exc_info=True)
                    time.sleep(delay)
                    retry_count -= 1
                    continue
            return response
def setUp(self):
    """Probe the mapping file up front so a fetch problem skips the test early."""
    try:
        self.open_mapping_file().close()  # report retrieval errors before the test body
    except (OSError, HTTPException):
        self.skipTest("Could not retrieve " + self.mapfileurl)
def get_live(self, first_comment_created_at=0):
    # Poll the live broadcast once for new comments, keep those posted by
    # tracked commenters (or verified users), and persist the collection to
    # self.destination_file. Returns the timestamp cursor for the next poll.
    comments_collected = self.comments
    commenter_ids = self.user_config.commenters or []
    before_count = len(comments_collected)
    try:
        comments_res = self.api.broadcast_comments(
            self.broadcast['id'], last_comment_ts=first_comment_created_at)
        comments = comments_res.get('comments', [])
        # Advance the cursor to the newest comment, or fall back to
        # "5 seconds ago" when nothing new arrived.
        first_comment_created_at = (
            comments[0]['created_at_utc'] if comments
            else int(time.time() - 5))
        # save comment if it's in list of commenter IDs or if user is verified
        comments_collected.extend(
            list(filter(
                lambda x: (str(x['user_id']) in commenter_ids
                           or x['user']['username'] in commenter_ids
                           or x['user']['is_verified']),
                comments)))
        after_count = len(comments_collected)
        if after_count > before_count:
            # save intermediately to avoid losing comments due to unexpected errors
            broadcast = self.broadcast.copy()
            broadcast.pop('segments', None)  # save space
            broadcast['comments'] = comments_collected
            with open(self.destination_file, 'w') as outfile:
                json.dump(broadcast, outfile, indent=2)
            self.comments = comments_collected
    except (SSLError, timeout, URLError, HTTPException, SocketError) as e:
        # Probably transient network error, ignore and continue
        self.logger.warning('Comment collection error: %s' % e)
    except ClientError as e:
        if e.code == 500:
            self.logger.warning('Comment collection ClientError: %d %s'
                                % (e.code, e.error_response))
        elif e.code == 400 and not e.msg:
            # 400 error fail but no error message
            self.logger.warning('Comment collection ClientError: %d %s'
                                % (e.code, e.error_response))
        else:
            raise e
    finally:
        # Throttle polling regardless of success or failure.
        time.sleep(4)
    return first_comment_created_at
def osu_get(conn, endpoint, paramsdict=None):
    """GET /api/<endpoint>?<paramsdict>&k=args.key from conn.

    Rate-limits to 60 calls per minute, retries on transient HTTP or JSON
    errors, returns the decoded json object, and exits the process when
    the api reports an error.
    """
    global osu_treset, osu_ncalls, args
    sys.stderr.write("%s %s\n" % (endpoint, str(paramsdict)))
    # Work on a copy: the original code did `paramsdict["k"] = args.key`
    # directly, which crashed with TypeError when paramsdict was left at
    # its None default and polluted the caller's dict with the api key.
    params = dict(paramsdict) if paramsdict else {}
    params["k"] = args.key
    path = "/api/%s?%s" % (endpoint, urllib.urlencode(params))
    while True:
        # Honour the 60-requests-per-minute rate limit before each call.
        while True:
            if time.time() >= osu_treset:
                osu_ncalls = 0
                osu_treset = time.time() + 60
                sys.stderr.write("\napi ready\n")
            if osu_ncalls < 60:
                break
            else:
                sys.stderr.write("waiting for api cooldown...\r")
                time.sleep(1)
        try:
            conn.request("GET", path)
            osu_ncalls += 1
            r = conn.getresponse()
            raw = ""
            while True:
                try:
                    raw += r.read()
                    break
                except httplib.IncompleteRead as e:
                    # Keep the partial payload and continue reading.
                    raw += e.partial
            j = json.loads(raw)
            if "error" in j:
                sys.stderr.write("%s\n" % j["error"])
                sys.exit(1)
            return j
        except (httplib.HTTPException, ValueError) as e:
            sys.stderr.write("%s\n" % (traceback.format_exc()))
            try:
                # prevents exceptions on next request if the response
                # wasn't previously read due to errors
                conn.getresponse().read()
            except httplib.HTTPException:
                pass
            time.sleep(5)
def test_main(self):
    # Run the Unicode normalization conformance suite (NormalizationTest.txt)
    # against NFC/NFD/NFKC/NFKD, then verify every character outside Part1
    # is invariant under all four normal forms.
    part = None
    part1_data = {}
    # Hit the exception early
    try:
        testdata = open_urlresource(TESTDATAURL, encoding="utf-8",
                                    check=check_version)
    except (IOError, HTTPException):
        self.skipTest("Could not retrieve " + TESTDATAURL)
    self.addCleanup(testdata.close)
    for line in testdata:
        if '#' in line:
            line = line.split('#')[0]  # strip trailing comment
        line = line.strip()
        if not line:
            continue
        if line.startswith("@Part"):
            part = line.split()[0]  # track which section we are in
            continue
        try:
            c1, c2, c3, c4, c5 = [unistr(x) for x in line.split(';')[:-1]]
        except RangeError:
            # Skip unsupported characters;
            # try at least adding c1 if we are in part1
            if part == "@Part1":
                try:
                    c1 = unistr(line.split(';')[0])
                except RangeError:
                    pass
                else:
                    part1_data[c1] = 1
            continue
        # Perform tests
        self.assertTrue(c2 == NFC(c1) == NFC(c2) == NFC(c3), line)
        self.assertTrue(c4 == NFC(c4) == NFC(c5), line)
        self.assertTrue(c3 == NFD(c1) == NFD(c2) == NFD(c3), line)
        self.assertTrue(c5 == NFD(c4) == NFD(c5), line)
        self.assertTrue(c4 == NFKC(c1) == NFKC(c2) ==
                        NFKC(c3) == NFKC(c4) == NFKC(c5), line)
        self.assertTrue(c5 == NFKD(c1) == NFKD(c2) ==
                        NFKD(c3) == NFKD(c4) == NFKD(c5), line)
        # Record part 1 data
        if part == "@Part1":
            part1_data[c1] = 1
    # Perform tests for all other data
    for c in range(sys.maxunicode + 1):
        X = chr(c)
        if X in part1_data:
            continue
        self.assertTrue(X == NFC(X) == NFD(X) == NFKC(X) == NFKD(X), c)
def test_main(self):
    # Drive the Unicode normalization conformance data file through the
    # NFC/NFD/NFKC/NFKD invariants, then check all non-Part1 code points
    # are unchanged by every normal form.
    part = None
    part1_data = {}
    # Hit the exception early
    try:
        testdata = open_urlresource(TESTDATAURL, encoding="utf-8",
                                    check=check_version)
    except (IOError, HTTPException):
        self.skipTest("Could not retrieve " + TESTDATAURL)
    self.addCleanup(testdata.close)
    for line in testdata:
        if '#' in line:
            line = line.split('#')[0]  # drop trailing comment
        line = line.strip()
        if not line:
            continue
        if line.startswith("@Part"):
            part = line.split()[0]  # remember the current section
            continue
        try:
            c1, c2, c3, c4, c5 = [unistr(x) for x in line.split(';')[:-1]]
        except RangeError:
            # Skip unsupported characters;
            # try at least adding c1 if we are in part1
            if part == "@Part1":
                try:
                    c1 = unistr(line.split(';')[0])
                except RangeError:
                    pass
                else:
                    part1_data[c1] = 1
            continue
        # Perform tests
        self.assertTrue(c2 == NFC(c1) == NFC(c2) == NFC(c3), line)
        self.assertTrue(c4 == NFC(c4) == NFC(c5), line)
        self.assertTrue(c3 == NFD(c1) == NFD(c2) == NFD(c3), line)
        self.assertTrue(c5 == NFD(c4) == NFD(c5), line)
        self.assertTrue(c4 == NFKC(c1) == NFKC(c2) ==
                        NFKC(c3) == NFKC(c4) == NFKC(c5), line)
        self.assertTrue(c5 == NFKD(c1) == NFKD(c2) ==
                        NFKD(c3) == NFKD(c4) == NFKD(c5), line)
        # Record part 1 data
        if part == "@Part1":
            part1_data[c1] = 1
    # Perform tests for all other data
    for c in range(sys.maxunicode + 1):
        X = chr(c)
        if X in part1_data:
            continue
        self.assertTrue(X == NFC(X) == NFD(X) == NFKC(X) == NFKD(X), c)
def test_main(self):
    # Validate the Unicode normalization conformance test file against
    # NFC/NFD/NFKC/NFKD, then assert every code point outside Part1 is a
    # fixed point of all four normal forms.
    part = None
    part1_data = {}
    # Hit the exception early
    try:
        testdata = open_urlresource(TESTDATAURL, encoding="utf-8",
                                    check=check_version)
    except (OSError, HTTPException):
        self.skipTest("Could not retrieve " + TESTDATAURL)
    self.addCleanup(testdata.close)
    for line in testdata:
        if '#' in line:
            line = line.split('#')[0]  # strip trailing comment
        line = line.strip()
        if not line:
            continue
        if line.startswith("@Part"):
            part = line.split()[0]  # track the active section
            continue
        try:
            c1, c2, c3, c4, c5 = [unistr(x) for x in line.split(';')[:-1]]
        except RangeError:
            # Skip unsupported characters;
            # try at least adding c1 if we are in part1
            if part == "@Part1":
                try:
                    c1 = unistr(line.split(';')[0])
                except RangeError:
                    pass
                else:
                    part1_data[c1] = 1
            continue
        # Perform tests
        self.assertTrue(c2 == NFC(c1) == NFC(c2) == NFC(c3), line)
        self.assertTrue(c4 == NFC(c4) == NFC(c5), line)
        self.assertTrue(c3 == NFD(c1) == NFD(c2) == NFD(c3), line)
        self.assertTrue(c5 == NFD(c4) == NFD(c5), line)
        self.assertTrue(c4 == NFKC(c1) == NFKC(c2) ==
                        NFKC(c3) == NFKC(c4) == NFKC(c5), line)
        self.assertTrue(c5 == NFKD(c1) == NFKD(c2) ==
                        NFKD(c3) == NFKD(c4) == NFKD(c5), line)
        # Record part 1 data
        if part == "@Part1":
            part1_data[c1] = 1
    # Perform tests for all other data
    for c in range(sys.maxunicode + 1):
        X = chr(c)
        if X in part1_data:
            continue
        self.assertTrue(X == NFC(X) == NFD(X) == NFKC(X) == NFKD(X), c)