我们从Python开源项目中,提取了以下36个代码示例,用于说明如何使用twisted.internet.error.DNSLookupError()。
def error_back(self, failure): #request????error_back #???????request????????????????? #???????dns??????? #???????????redis??????? if failure.check(HttpError): #?????200????? response = failure.value.response self.logger.error('[%s] GET [%d]', response.url, response.status) if failure.check(DNSLookupError): #dns???? request = failure.request self.logger.error('[%s] DNSLookupError', request.url) if failure.check(TimeoutError, TCPTimedOutError): #???? request = failure.request self.logger.error('[%s] TimeoutError', request.url)
def connect(self): """Start connection to remote server.""" self.factory.doStart() self.factory.startedConnecting(self) if not self.servers: if self.domain is None: self.connectionFailed(error.DNSLookupError("Domain is not defined.")) return d = client.lookupService('_%s._%s.%s' % (self.service, self.protocol, self.domain)) d.addCallback(self._cbGotServers) d.addCallback(lambda x, self=self: self._reallyConnect()) d.addErrback(self.connectionFailed) elif self.connector is None: self._reallyConnect() else: self.connector.connect()
def _ebMX(self, failure, domain): from twisted.names import error, dns if self.fallbackToDomain: failure.trap(error.DNSNameError) log.msg("MX lookup failed; attempting to use hostname (%s) directly" % (domain,)) # Alright, I admit, this is a bit icky. d = self.resolver.getHostByName(domain) def cbResolved(addr): return dns.Record_MX(name=addr) def ebResolved(err): err.trap(error.DNSNameError) raise DNSLookupError() d.addCallbacks(cbResolved, ebResolved) return d elif failure.check(error.DNSNameError): raise IOError("No MX found for %r" % (domain,)) return failure
def test_socks5TorStyleFailedResolution(self): """ A Tor-style name resolution when resolution fails. """ self.assert_handshake() self.deliver_data( self.sock, struct.pack('!BBBB', 5, 0xf0, 0, 3) + struct.pack( "!B", len(b"unknown") ) + b"unknown" + struct.pack("!H", 3401) ) reply = self.sock.transport.value() self.sock.transport.clear() self.assertEqual(reply, struct.pack('!BBBB', 5, 4, 0, 0)) self.assertTrue(self.sock.transport.stringTCPTransport_closing) self.assertEqual(len(self.flushLoggedErrors(DNSLookupError)), 1)
def errback(self, failure): if failure.check(DNSLookupError): host = urlparse(failure.request.url).hostname logger.error('DNSLookupError on host[%s]', host) elif failure.check(TimeoutError, TCPTimedOutError): request = failure.request logger.error('TimeoutError on url[%s]', request.url) elif failure.check(HttpError): response = failure.value.response logger.error('HttpError on url[%s, status=%d]', response.url, response.status) else: logger.error('SpiderError on url[%s, error=%s', failure.request.url, repr(failure))
def hostname(self, request: IRequest, name: str) -> KleinRenderable: """ Hostname lookup resource. Performs a lookup on the given name and responds with the resulting address. :param request: The request to respond to. :param name: A host name. """ try: address = await getHostByName(name) except DNSNameError: request.setResponseCode(http.NOT_FOUND) return "no such host" except DNSLookupError: request.setResponseCode(http.NOT_FOUND) return "lookup error" return address
def test_hostname_lookup_error(self) -> None: """ :meth:`.dns.Application.hostname` responds with a :const:`twisted.web.http.NOT_FOUND` error if there is a DNS lookup error. """ def getHostByName(*args, **kwargs) -> Deferred: return fail(DNSLookupError()) self.patch(dns, "getHostByName", getHostByName) self.assertResponse( b"/gethostbyname/foo.example.com", response_data=b"lookup error", response_code=http.NOT_FOUND, )
def errback_httpbin(self, failure): # log all failures self.logger.error(repr(failure)) # in case you want to do something special for some errors, # you may need the failure's type: if failure.check(HttpError): # these exceptions come from HttpError spider middleware # you can get the non-200 response response = failure.value.response self.logger.error('HttpError on %s', response.url) elif failure.check(DNSLookupError): # this is the original request request = failure.request self.logger.error('DNSLookupError on %s', request.url) elif failure.check(TimeoutError, TCPTimedOutError): request = failure.request self.logger.error('TimeoutError on %s', request.url)
def test_failure(self): """ L{SimpleResolverComplexifier} translates a known error result from L{IResolverSimple.resolveHostName} into an empty result. """ simple = SillyResolverSimple() complex = SimpleResolverComplexifier(simple) receiver = ResultHolder(self) self.assertEqual(receiver._started, False) complex.resolveHostName(receiver, u"example.com") self.assertEqual(receiver._started, True) self.assertEqual(receiver._ended, False) self.assertEqual(receiver._addresses, []) simple._requests[0].errback(DNSLookupError("nope")) self.assertEqual(receiver._ended, True) self.assertEqual(receiver._addresses, [])
def test_simplifier(self): """ L{ComplexResolverSimplifier} translates an L{IHostnameResolver} into an L{IResolverSimple} for applications that still expect the old interfaces to be in place. """ self.pool, self.doThreadWork = deterministicPool() self.reactor, self.doReactorWork = deterministicReactorThreads() self.getter = FakeAddrInfoGetter() self.resolver = GAIResolver(self.reactor, lambda: self.pool, self.getter.getaddrinfo) simpleResolver = ComplexResolverSimplifier(self.resolver) self.getter.addResultForHost('example.com', ('192.168.3.4', 4321)) success = simpleResolver.getHostByName('example.com') failure = simpleResolver.getHostByName('nx.example.com') self.doThreadWork() self.doReactorWork() self.doThreadWork() self.doReactorWork() self.assertEqual(self.failureResultOf(failure).type, DNSLookupError) self.assertEqual(self.successResultOf(success), '192.168.3.4')
def _cbGotServers(self, result): answers, auth, add = result if len(answers) == 1 and answers[0].type == dns.SRV \ and answers[0].payload \ and answers[0].payload.target == dns.Name(b'.'): # decidedly not available raise error.DNSLookupError("Service %s not available for domain %s." % (repr(self.service), repr(self.domain))) self.servers = [] self.orderedServers = [] for a in answers: if a.type != dns.SRV or not a.payload: continue self.orderedServers.append(a.payload)
def errback_httpbin(self, failure): self.logger.error(repr(failure)) #if isinstance(failure.value, HttpError): if failure.check(HttpError): # you can get the response response = failure.value.response self.logger.error('HttpError on %s', response.url) #elif isinstance(failure.value, DNSLookupError): elif failure.check(DNSLookupError): # this is the original request request = failure.request self.logger.error('DNSLookupError on %s', request.url) #elif isinstance(failure.value, TimeoutError): elif failure.check(TimeoutError): request = failure.request self.logger.error('TimeoutError on %s', request.url)
def _fail(self, name, err): err = error.DNSLookupError("address %r not found: %s" % (name, err)) return failure.Failure(err)
def getHostByName(self, name, timeout = (1, 3, 11, 45)): try: address = socket.gethostbyname(name) except socket.error: msg = "address %r not found" % (name,) err = error.DNSLookupError(msg) return defer.fail(err) else: return defer.succeed(address)
def testDNSFailure(self): client = Client() d = client.startedDeferred = defer.Deferred() # if this domain exists, shoot your sysadmin reactor.connectUDP("xxxxxxxxx.zzzzzzzzz.yyyyy.", 8888, client) def didNotConnect(ign): self.assertEquals(client.stopped, 0) self.assertEquals(client.started, 0) d = self.assertFailure(d, error.DNSLookupError) d.addCallback(didNotConnect) return d
def _handle_error(self, failure): """Handle errors in connecting or resolving.""" log.err(failure) error_code = 1 if failure.check(DNSLookupError): error_code = 4 if failure.check(ConnectionRefusedError): error_code = 5 self._write_response(error_code, "0.0.0.0", 0)
def resolve(self, hostname): """ Resolve a hostname by looking it up in the C{names} dictionary. """ try: return defer.succeed(self.names[hostname]) except KeyError: return defer.fail( DNSLookupError( "FakeResolverReactor couldn't find {}".format(hostname) ) )
def process_exception(self, request, exception, spider): #???????????????????request?????? #??????????????? #???????????????????????????????????? if isinstance(exception, DNSLookupError): self.crawler.signals.send_catch_log(dnslookuperror, spider=spider) if isinstance(exception, (TimeoutError, TCPTimedOutError)): self.crawler.signals.send_catch_log(timeouterror, spider=spider) return request
def test_failure(self): """ L{ThreadedResolver.getHostByName} returns a L{Deferred} which fires a L{Failure} if the call to L{socket.gethostbyname} raises an exception. """ timeout = 30 reactor = FakeReactor() self.addCleanup(reactor._stop) def fakeGetHostByName(name): raise IOError("ENOBUFS (this is a funny joke)") self.patch(socket, 'gethostbyname', fakeGetHostByName) failedWith = [] resolver = ThreadedResolver(reactor) d = resolver.getHostByName("some.name", (timeout,)) self.assertFailure(d, DNSLookupError) d.addCallback(failedWith.append) reactor._runThreadCalls() self.assertEqual(len(failedWith), 1) # Make sure that any timeout-related stuff gets cleaned up. reactor._clock.advance(timeout + 1) self.assertEqual(reactor._clock.calls, [])
def test_timeout(self): """ If L{socket.gethostbyname} does not complete before the specified timeout elapsed, the L{Deferred} returned by L{ThreadedResolver.getHostByBame} fails with L{DNSLookupError}. """ timeout = 10 reactor = FakeReactor() self.addCleanup(reactor._stop) result = Queue() def fakeGetHostByName(name): raise result.get() self.patch(socket, 'gethostbyname', fakeGetHostByName) failedWith = [] resolver = ThreadedResolver(reactor) d = resolver.getHostByName("some.name", (timeout,)) self.assertFailure(d, DNSLookupError) d.addCallback(failedWith.append) reactor._clock.advance(timeout - 1) self.assertEqual(failedWith, []) reactor._clock.advance(1) self.assertEqual(len(failedWith), 1) # Eventually the socket.gethostbyname does finish - in this case, with # an exception. Nobody cares, though. result.put(IOError("The I/O was errorful"))
def test_failure(self): """ If no address is returned by GAI for a hostname, the connection attempt fails with L{error.DNSLookupError}. """ endpoint = endpoints.HostnameEndpoint( deterministicResolvingReactor(Clock(), []), b"example.com", 80 ) clientFactory = object() dConnect = endpoint.connect(clientFactory) exc = self.failureResultOf(dConnect, error.DNSLookupError) self.assertIn("example.com", str(exc))
def resolutionComplete(self): """ See L{IResolutionReceiver.resolutionComplete} """ if self._resolved: return self._deferred.errback(DNSLookupError(self._resolution.name))
def test_SRVNoService(self): """ Test that connecting fails when no service is present. """ payload = dns.Record_SRV(port=5269, target=b'.', ttl=60) client.theResolver.results = [dns.RRHeader(name='example.org', type=dns.SRV, cls=dns.IN, ttl=60, payload=payload)] self.connector.connect() self.assertIsNotNone(self.factory.reason) self.factory.reason.trap(DNSLookupError) self.assertEqual(self.reactor.tcpClients, [])
def _cbRecords(self, records, name, effort): (ans, auth, add) = records result = extractRecord(self, dns.Name(name), ans + auth + add, effort) if not result: raise error.DNSLookupError(name) return result
def testSimpleFailureWithFallback(self): return self.assertFailure(self.mx.getMX('test.domain'), DNSLookupError)
def test_do_failing_request(self): http_test = httpt.HTTPTest() http_test.localOptions['socksproxy'] = None http_test._setUp() yield self.assertFailure(http_test.doRequest('http://invaliddomain/'), DNSLookupError) assert http_test.report['requests'][0]['failure'] == 'dns_lookup_error'
def test_hostname_does_not_resolve(self): """ Specifying a hostname which cannot be resolved to an IP address will result in an ``DNSLookupError``. """ with ExpectedException(DNSLookupError, "DNS lookup failed: no results " "for hostname lookup: doesnotresolve."): self._authorized_request( token="test", headers=Headers({}), kubernetes_host=b"doesnotresolve" )
def _ebMX(self, failure, domain): """ Attempt to use the name of the domain directly when mail exchange lookup fails. @type failure: L{Failure} @param failure: The reason for the lookup failure. @type domain: L{bytes} @param domain: The domain name. @rtype: L{Record_MX <twisted.names.dns.Record_MX>} or L{Failure} @return: An MX record for the domain or a failure if the fallback to domain option is not in effect and an error, other than not finding an MX record, occurred during lookup. @raise IOError: When no MX record could be found and the fallback to domain option is not in effect. @raise DNSLookupError: When no MX record could be found and the fallback to domain option is in effect but no address for the domain could be found. """ from twisted.names import error, dns if self.fallbackToDomain: failure.trap(error.DNSNameError) log.msg("MX lookup failed; attempting to use hostname (%s) directly" % (domain,)) # Alright, I admit, this is a bit icky. d = self.resolver.getHostByName(domain) def cbResolved(addr): return dns.Record_MX(name=addr) def ebResolved(err): err.trap(error.DNSNameError) raise DNSLookupError() d.addCallbacks(cbResolved, ebResolved) return d elif failure.check(error.DNSNameError): raise IOError("No MX found for %r" % (domain,)) return failure