我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 twisted.python.failure.value()。
def makeTodo(value):
    """
    Build a L{Todo} from a reason string or an C{(errors, reason)} tuple.

    A plain string is used as the reason of a L{Todo} created without an
    explicit C{errors} argument.  For a tuple, the first element names the
    expected error(s) and the second element is the reason; a single,
    non-iterable error is wrapped in a one-element list.

    @param value: A string or a tuple of C{(errors, reason)}, where
        C{errors} is either a single exception class or an iterable of
        exception classes.

    @return: A L{Todo} object, or C{None} when C{value} is neither a string
        nor a tuple.
    """
    if isinstance(value, str):
        return Todo(reason=value)
    if not isinstance(value, tuple):
        return None
    expected, why = value
    try:
        expected = list(expected)
    except TypeError:
        # A lone exception class is not iterable; treat it as a list of one.
        expected = [expected]
    return Todo(reason=why, errors=expected)
def failUnlessFailure(self, deferred, *expectedFailures):
    """
    Assert that C{deferred} errbacks with one of C{expectedFailures}.

    A callback and an errback are attached to C{deferred}: a successful
    result raises C{self.failureException}; a failure matching one of
    C{expectedFailures} is converted into its exception value; any other
    failure raises C{self.failureException} describing what was expected
    and what was received.

    @return: The value returned by C{deferred.addCallbacks}.  You will need
        to return it from your test case.
    """
    def _unexpectedSuccess(result):
        raise self.failureException(
            "did not catch an error, instead got %r" % (result,))

    def _checkFailure(reason):
        if not reason.check(*expectedFailures):
            report = ('\nExpected: %r\nGot:\n%s'
                      % (expectedFailures, str(reason)))
            raise self.failureException(report)
        return reason.value

    return deferred.addCallbacks(_unexpectedSuccess, _checkFailure)
def search(self, *queries, **kwarg):
    """
    Search messages in the currently selected mailbox.

    This command is allowed in the Selected state.

    Any non-zero number of queries are accepted by this method, as returned
    by the C{Query}, C{Or}, and C{Not} functions.  The queries are joined
    with single spaces to form the command arguments.

    One keyword argument is accepted: if C{uid} is passed in with a true
    value, C{UID SEARCH} is sent instead of C{SEARCH}, asking the server to
    return message UIDs instead of message sequence numbers.

    @rtype: C{Deferred}
    @return: A deferred whose callback will be invoked with a list of all
        the message sequence numbers returned by the search, or whose
        errback will be invoked if there is an error.
    """
    cmd = 'UID SEARCH' if kwarg.get('uid') else 'SEARCH'
    joined = ' '.join(queries)
    command = Command(cmd, joined, wantResponse=(cmd,))
    d = self.sendCommand(command)
    d.addCallback(self.__cbSearch)
    return d
def create(pathspec):
    """
    Create a new mailbox from the given hierarchical name.

    This is an interface declaration only; it has no implementation here.

    @type pathspec: C{str}
    @param pathspec: The full hierarchical name of a new mailbox to create.
        If any of the inferior hierarchical names to this one do not exist,
        they are created as well.

    @rtype: C{Deferred} or C{bool}
    @return: A true value if the creation succeeds, or a deferred whose
        callback will be invoked when the creation succeeds.

    @raise MailboxException: Raised if this mailbox cannot be added.  This
        may also be raised asynchronously, if a C{Deferred} is returned.
    """
def rename(oldname, newname):
    """
    Rename a mailbox.

    This is an interface declaration only; it has no implementation here.

    @type oldname: C{str}
    @param oldname: The current name of the mailbox to rename.

    @type newname: C{str}
    @param newname: The new name to associate with the mailbox.

    @rtype: C{Deferred} or C{bool}
    @return: A true value if the mailbox is successfully renamed, or a
        C{Deferred} whose callback will be invoked when the rename
        operation is completed.

    @raise MailboxException: Raised if this mailbox cannot be renamed.  This
        may also be raised asynchronously, if a C{Deferred} is returned.
    """
def requestStatus(names):
    """
    Return status information about this mailbox.

    This is an interface declaration only; it has no implementation here.

    Mailboxes which do not intend to do any special processing to generate
    the return value can use C{statusRequestHelper} to build the dictionary
    by calling the other interface methods which return the data for each
    name.

    @type names: Any iterable
    @param names: The status names to return information regarding.  The
        possible values for each name are: MESSAGES, RECENT, UIDNEXT,
        UIDVALIDITY, UNSEEN.

    @rtype: C{dict} or C{Deferred}
    @return: A dictionary containing status information about the requested
        names.  If the process of looking this information up would be
        costly, a deferred whose callback will eventually be passed this
        dictionary is returned instead.
    """
def testNonExistentDelete(self):
    """
    Log in, try to delete the mailbox C{'delete/me'}, record the failure
    passed to the errback, and check that its value stringifies to
    C{'No such mailbox'}.
    """
    def doLogin():
        return self.client.login('testuser', 'password-test')

    def doDelete():
        return self.client.delete('delete/me')

    def recordFailure(reason):
        self.failure = reason

    def checkRecordedFailure(ignored):
        self.assertEquals(str(self.failure.value), 'No such mailbox')

    # Stays None unless the delete errback runs.
    self.failure = None
    clientDone = self.connected.addCallback(strip(doLogin))
    clientDone.addCallback(strip(doDelete)).addErrback(recordFailure)
    clientDone.addCallbacks(self._cbStopClient, self._ebGeneral)
    loopbackDone = self.loopback()
    finished = defer.gatherResults([clientDone, loopbackDone])
    finished.addCallback(checkRecordedFailure)
    return finished
def testPathelogicalScatteringOfLiterals(self):
    """
    Feed a LOGIN command to the server in three pieces, with the user name
    and the password each announced as a literal, and check the server's
    reply after every piece as well as the final server state.
    """
    self.server.checker.addUser('testuser', 'password-test')
    transport = StringTransport()
    self.server.makeConnection(transport)

    # (data fed to the server, reply expected on the transport)
    exchanges = [
        ("01 LOGIN {8}\r\n", "+ Ready for 8 octets of text\r\n"),
        ("testuser {13}\r\n", "+ Ready for 13 octets of text\r\n"),
        ("password-test\r\n", "01 OK LOGIN succeeded\r\n"),
    ]
    for sent, expectedReply in exchanges:
        # Drop whatever was written before so only the new reply is seen.
        transport.clear()
        self.server.dataReceived(sent)
        self.assertEquals(transport.value(), expectedReply)

    self.assertEquals(self.server.state, 'auth')
    self.server.connectionLost(error.ConnectionDone("Connection done."))
def _on_no_port_mapping_received(self, failure, mappings): """ Called when we have no more port mappings to retreive, or an error occured while retreiving them. Either we have a "SpecifiedArrayIndexInvalid" SOAP error, and that's ok, it just means we have finished. If it returns some other error, then we fail with an UPnPError. @param mappings: the already retreived mappings @param failure: the failure @return: The existing mappings as defined in L{get_port_mappings} @raise UPnPError: When we got any other error than "SpecifiedArrayIndexInvalid" """ logging.debug("_on_no_port_mapping_received: %s", failure) err = failure.value message = err.args[0]["UPnPError"]["errorDescription"] if "SpecifiedArrayIndexInvalid" == message: return mappings else: return failure
def _format_error(failure): """ Logs the failure backtrace, and returns a json containing the error message. If a exception declares the 'expected' attribute as True, we will not print a full traceback. instead, we will dispatch the ``exception`` message attribute as the ``error`` field in the response json. """ expected = getattr(failure.value, 'expected', False) if not expected: log.error('[DISPATCHER] Unexpected error!') log.error('{0!r}'.format(failure.value)) log.error(failure.getTraceback()) # if needed, we could add here the exception type as an extra field return json.dumps({'error': failure.value.message, 'result': None})
def test_cancelDeferredListWithFireOnOneErrback(self):
    """
    Cancelling an unfired L{defer.DeferredList} created with
    C{fireOnOneErrback} set cancels every L{defer.Deferred} in the list,
    and the list itself fails with a L{defer.FirstError} wrapping a
    L{defer.CancelledError}.
    """
    first = defer.Deferred()
    second = defer.Deferred()
    combined = defer.DeferredList([first, second], fireOnOneErrback=True)
    combined.cancel()
    for child in (first, second):
        self.failureResultOf(child, defer.CancelledError)
    wrapped = self.failureResultOf(combined, defer.FirstError).value
    self.assertTrue(wrapped.subFailure.check(defer.CancelledError))
def test_errbackWithNoArgsNoDebug(self):
    """
    C{Deferred.errback()} creates a failure from the current Python
    exception.  When Deferred.debug is not set no globals or locals are
    captured in that failure.
    """
    defer.setDebugging(False)
    d = defer.Deferred()
    caught = []
    exc = GenericError("Bang")
    try:
        raise exc
    except GenericError:
        # Only the exception raised above is of interest; a bare except
        # would also intercept KeyboardInterrupt and SystemExit.
        d.errback()
    d.addErrback(caught.append)
    fail = caught[0]
    self.assertEqual(fail.value, exc)
    # The last two items of the first frame record are its locals and
    # globals.
    localz, globalz = fail.frames[0][-2:]
    self.assertEqual([], localz)
    self.assertEqual([], globalz)
def test_errbackWithNoArgs(self):
    """
    C{Deferred.errback()} creates a failure from the current Python
    exception.  When Deferred.debug is set globals and locals are captured
    in that failure.
    """
    defer.setDebugging(True)
    d = defer.Deferred()
    caught = []
    exc = GenericError("Bang")
    try:
        raise exc
    except GenericError:
        # Only the exception raised above is of interest; a bare except
        # would also intercept KeyboardInterrupt and SystemExit.
        d.errback()
    d.addErrback(caught.append)
    fail = caught[0]
    self.assertEqual(fail.value, exc)
    # The last two items of the first frame record are its locals and
    # globals.
    localz, globalz = fail.frames[0][-2:]
    self.assertNotEqual([], localz)
    self.assertNotEqual([], globalz)
def test_timedOutCustom(self):
    """
    If a custom C{onTimeoutCancel} function is provided, the
    L{defer.Deferred} returns the custom function's return value if the
    L{defer.Deferred} times out before callbacking or errbacking.
    The custom C{onTimeoutCancel} function can return a result instead of
    a failure.
    """
    fakeClock = Clock()
    timedOut = defer.Deferred()
    timedOut.addTimeout(10, fakeClock, onTimeoutCancel=_overrideFunc)
    self.assertNoResult(timedOut)

    # Move past the 10 second timeout.
    fakeClock.advance(15)

    outcome = self.successResultOf(timedOut)
    self.assertEqual("OVERRIDDEN", outcome)
def test_timedOutProvidedCancelFailure(self):
    """
    If a cancellation function is provided when the L{defer.Deferred} is
    initialized, and that function errbacks with its own error, the
    L{defer.Deferred} fails with that error rather than with a
    L{defer.CancelledError} when it times out.
    """
    fakeClock = Clock()
    error = ValueError('what!')

    def canceller(cancelled):
        return cancelled.errback(error)

    timedOut = defer.Deferred(canceller)
    timedOut.addTimeout(10, fakeClock)
    self.assertNoResult(timedOut)

    # Move past the 10 second timeout.
    fakeClock.advance(15)

    reason = self.failureResultOf(timedOut, ValueError)
    self.assertIs(reason.value, error)
def test_errbackAddedBeforeTimeoutSuppressesCancellation(self):
    """
    An errback added before a timeout is added errbacks with a
    L{defer.CancelledError} when the timeout fires.  If the errback
    suppresses the L{defer.CancelledError}, the deferred successfully
    completes.
    """
    fakeClock = Clock()
    d = defer.Deferred()
    # One-slot list so the nested errback can hand its argument back out.
    seen = [None]

    def errback(reason):
        seen[0] = reason
        reason.trap(defer.CancelledError)

    d.addErrback(errback)
    d.addTimeout(10, fakeClock)

    fakeClock.advance(15)

    recorded = seen[0]
    self.assertIsInstance(recorded, failure.Failure)
    self.assertIsInstance(recorded.value, defer.CancelledError)

    self.successResultOf(d)
def test_errbackAddedBeforeTimeoutCustom(self):
    """
    An errback added before a timeout is added with a custom timeout
    function errbacks with a L{defer.CancelledError} when the timeout
    fires.  The timeout function runs if the errback returns the
    L{defer.CancelledError}.
    """
    fakeClock = Clock()
    d = defer.Deferred()
    # One-slot list so the nested errback can hand its argument back out.
    seen = [None]

    def errback(reason):
        seen[0] = reason
        return reason

    d.addErrback(errback)
    d.addTimeout(10, fakeClock, _overrideFunc)

    fakeClock.advance(15)

    recorded = seen[0]
    self.assertIsInstance(recorded, failure.Failure)
    self.assertIsInstance(recorded.value, defer.CancelledError)

    outcome = self.successResultOf(d)
    self.assertEqual("OVERRIDDEN", outcome)
def test_errbackAddedBeforeTimeoutSuppressesCancellationCustom(self):
    """
    An errback added before a timeout is added with a custom timeout
    function errbacks with a L{defer.CancelledError} when the timeout
    fires.  The timeout function runs if the errback suppresses the
    L{defer.CancelledError}.
    """
    fakeClock = Clock()
    d = defer.Deferred()
    # One-slot list so the nested errback can hand its argument back out.
    seen = [None]

    def errback(reason):
        # Returning nothing swallows the failure.
        seen[0] = reason

    d.addErrback(errback)
    d.addTimeout(10, fakeClock, _overrideFunc)

    fakeClock.advance(15)

    recorded = seen[0]
    self.assertIsInstance(recorded, failure.Failure)
    self.assertIsInstance(recorded.value, defer.CancelledError)

    outcome = self.successResultOf(d)
    self.assertEqual("OVERRIDDEN", outcome)
def getTimeout(self):
    """
    Return the timeout value set on this test.

    The C{timeout} attribute is looked up through C{self._parents} with
    L{util.acquireAttribute}, using L{util.DEFAULT_TIMEOUT_DURATION} when
    nothing defines it.  The value found is converted to a float; if it
    cannot be converted, a C{DeprecationWarning} is issued and
    L{util.DEFAULT_TIMEOUT_DURATION} is returned instead.

    See L{TestCase} docstring for more details.
    """
    configured = util.acquireAttribute(
        self._parents, 'timeout', util.DEFAULT_TIMEOUT_DURATION)
    try:
        seconds = float(configured)
    except (ValueError, TypeError):
        # XXX -- this is here because sometimes people will have methods
        # called 'timeout', or set timeout to 'orange', or something
        # Particularly, test_news.NewsTestCase and ReactorCoreTestCase
        # both do this.
        warnings.warn("'timeout' attribute needs to be a number.",
                      category=DeprecationWarning)
        seconds = util.DEFAULT_TIMEOUT_DURATION
    return seconds
def lookup(self, serverAddress, clientAddress):
    """
    Lookup user information about the specified address pair.

    Return value should be a two-tuple of system name and username.
    Acceptable values for the system name may be found online at::

        U{http://www.iana.org/assignments/operating-system-names}

    This method may also raise any IdentError subclass (or IdentError
    itself) to indicate user information will not be provided for the
    given query.

    A Deferred may also be returned.

    This implementation provides no user information: it always raises
    L{IdentError}.

    @param serverAddress: A two-tuple representing the server endpoint
        of the address being queried.  The first element is a string holding
        a dotted-quad IP address.  The second element is an integer
        representing the port.

    @param clientAddress: Like I{serverAddress}, but represents the
        client endpoint of the address being queried.

    @raise IdentError: Always, in this implementation.
    """
    raise IdentError()
def generate(self, request, node):
    """
    Turn C{self.data} into a DOM node and hand it to a L{NodeNodeMutator}.

    Non-empty data is parsed with C{microdom.parseString}; if parsing
    fails the error is logged and the data is inserted as a text node
    instead.  Empty data is always inserted as a text node.

    @param request: the request; C{request.d} is used to create text nodes
    @param node: the node passed through to the mutator's C{generate}
    @return: whatever the L{NodeNodeMutator}'s C{generate} returns
    """
    if self.data:
        try:
            child = microdom.parseString(self.data)
        except Exception as e:
            # Best effort: data that is not well-formed XML is still
            # rendered, just as plain text.
            log.msg("Error parsing return value, probably invalid xml:", e)
            child = request.d.createTextNode(self.data)
    else:
        child = request.d.createTextNode(self.data)
    nodeMutator = NodeNodeMutator(child)
    return nodeMutator.generate(request, node)
def renderFailure(self, failure, request):
    """
    Write an HTML error page for C{failure} to C{request} and finish it.

    The page is only written the first time this is called, as tracked by
    C{self.failed}.  It contains a title built from the failure's type and
    value (or a generic title when C{failure} is false), the output of
    C{utils.renderFailure}, and the escaped XML of the partially processed
    DOM, when that could be serialized.

    @param failure: the failure to report
    @param request: the request to write to; C{request.d} is the DOM
    @return: C{failure}, unchanged
    """
    try:
        xml = request.d.toxml()
    except Exception:
        # Serializing the DOM is best effort; the page is still rendered
        # without it.  KeyboardInterrupt and SystemExit are not swallowed.
        xml = ""
#    if not hasattr(request, 'channel'):
#        log.msg("The request got away from me before I could render an error page.")
#        log.err(failure)
#        return failure
    if not self.failed:
        self.failed = 1
        if failure:
            request.write(
                "<html><head><title>%s: %s</title></head><body>\n"
                % (html.escape(str(failure.type)),
                   html.escape(str(failure.value))))
        else:
            request.write("<html><head><title>Failure!</title></head><body>\n")
        utils.renderFailure(failure, request)
        request.write("<h3>Here is the partially processed DOM:</h3>")
        request.write("\n<pre>\n")
        request.write(html.escape(xml))
        request.write("\n</pre>\n")
        request.write("</body></html>")
        request.finish()
    return failure

##########################################
# Deprecation zone
# Wear a hard hat
##########################################

# DOMView is now deprecated since the functionality was merged into domtemplate
def _ebRender(self, failure):
    """
    Convert a failure into a L{Fault} to send back.

    A failure whose value already is a L{Fault} yields that fault.  Any
    other failure is logged and replaced by a generic fault built from
    C{self.FAILURE} and the string C{"error"}.
    """
    fault = failure.value
    if not isinstance(fault, Fault):
        log.err(failure)
        fault = Fault(self.FAILURE, "error")
    return fault
def testDeferredList(self):
    """
    Fire three deferreds held by a L{defer.DeferredList} (success, failure,
    success) and check the C{(flag, value)} pairs delivered to the list's
    callback.
    """
    defr1, defr2, defr3 = defer.Deferred(), defer.Deferred(), defer.Deferred()
    dl = defer.DeferredList([defr1, defr2, defr3])
    result = []

    # The same collector is used for both the callback and the errback.
    dl.addCallbacks(result.extend, result.extend)

    defr1.callback("1")
    # The errback below is added to eat the GenericError that will be passed
    # on by the DeferredList's callback on defr2. If left unhandled, the
    # Failure object would cause a log.err() warning about "Unhandled
    # error in Deferred". Twisted's pyunit watches for log.err calls and
    # treats them as failures. So it must eat the error to prevent
    # it from flunking the test.
    defr2.addErrback(lambda err: None)
    defr2.errback(GenericError("2"))
    defr3.callback("3")

    # result[1][1] is now a Failure instead of an Exception
    secondFlag = result[1][0]
    secondText = str(result[1][1].value)
    observed = [result[0], (secondFlag, secondText), result[2]]
    expected = [(defer.SUCCESS, "1"),
                (defer.FAILURE, "2"),
                (defer.SUCCESS, "3")]
    self.failUnlessEqual(observed, expected)
def testDeferredListDontConsumeErrors(self):
    """
    Without C{consumeErrors}, an errback added to a deferred after it was
    put in a L{defer.DeferredList} still receives the failure, and the list
    reports that failure too.
    """
    inner = defer.Deferred()
    dl = defer.DeferredList([inner])

    trapped = []
    inner.addErrback(trapped.append)

    delivered = []
    dl.addCallback(delivered.append)

    inner.errback(GenericError('Bang'))

    self.failUnlessEqual('Bang', trapped[0].value.args[0])
    self.failUnlessEqual(1, len(delivered))
    # delivered[0] is the list of (flag, value) pairs; take the value of
    # the first pair.
    reported = delivered[0][0][1]
    self.failUnlessEqual('Bang', reported.value.args[0])
def testDeferredListConsumeErrors(self):
    """
    With C{consumeErrors=True}, an errback added to a deferred after it was
    put in a L{defer.DeferredList} does not receive the failure, while the
    list still reports it.
    """
    inner = defer.Deferred()
    dl = defer.DeferredList([inner], consumeErrors=True)

    trapped = []
    inner.addErrback(trapped.append)

    delivered = []
    dl.addCallback(delivered.append)

    inner.errback(GenericError('Bang'))

    self.failUnlessEqual([], trapped)
    self.failUnlessEqual(1, len(delivered))
    # delivered[0] is the list of (flag, value) pairs; take the value of
    # the first pair.
    reported = delivered[0][0][1]
    self.failUnlessEqual('Bang', reported.value.args[0])
def testImmediateFailure(self):
    """
    An errback added to a deferred created with L{defer.fail} receives a
    failure whose value stringifies to the original message.
    """
    caught = []
    failed = defer.fail(GenericError("fail"))
    failed.addErrback(caught.append)
    reason = caught[0]
    self.assertEquals(str(reason.value), "fail")
def testPausedFailure(self):
    """
    While an already-failed deferred is paused, a newly added errback is
    not run; it runs once the deferred is unpaused.
    """
    caught = []
    failed = defer.fail(GenericError("fail"))
    failed.pause()
    failed.addErrback(caught.append)
    # Nothing may be delivered while paused.
    self.assertEquals(caught, [])
    failed.unpause()
    reason = caught[0]
    self.assertEquals(str(reason.value), "fail")
def testCallbackErrors(self):
    """
    A callback that raises, and a callback that returns a
    L{failure.Failure}, both cause the following errback to receive a
    failure wrapping the C{ZeroDivisionError}.
    """
    failingCallbacks = (
        lambda _: 1/0,
        lambda _: failure.Failure(ZeroDivisionError()),
    )
    for callback in failingCallbacks:
        caught = []
        d = defer.Deferred().addCallback(callback).addErrback(caught.append)
        d.callback(1)
        self.assert_(isinstance(caught[0].value, ZeroDivisionError))
def failUnlessIn(self, containee, container, msg=None):
    """
    Fail the test if C{containee} is not found in C{container}.

    @param containee: the value that should be in C{container}
    @param container: a sequence type, or in the case of a mapping type,
        will follow semantics of 'if key in dict.keys()'
    @param msg: the failure message; when it is C{None} (or otherwise
        false) the message is '%r not in %r' % (containee, container)

    @return: C{containee}, when it was found
    @raise self.failureException: when C{containee} is not in C{container}
    """
    if containee in container:
        return containee
    text = msg or "%r not in %r" % (containee, container)
    raise self.failureException(text)
def failIfIn(self, containee, container, msg=None):
    """
    Fail the test if C{containee} is found in C{container}.

    @param containee: the value that should not be in C{container}
    @param container: a sequence type, or in the case of a mapping type,
        will follow semantics of 'if key in dict.keys()'
    @param msg: the failure message; when it is C{None} (or otherwise
        false) the message is '%r in %r' % (containee, container)

    @return: C{containee}, when it was not found
    @raise self.failureException: when C{containee} is in C{container}
    """
    if containee not in container:
        return containee
    text = msg or "%r in %r" % (containee, container)
    raise self.failureException(text)
def _getReason(self, f): if len(f.value.args) > 0: reason = f.value.args[0] else: warnings.warn(("Do not raise unittest.SkipTest with no " "arguments! Give a reason for skipping tests!"), stacklevel=2) reason = f return reason
def mktemp(self):
    """
    Returns a unique name that may be used as either a temporary
    directory or filename.

    A base directory named after this test's module, class and method
    (each truncated to 32 characters) is created relative to the current
    working directory if needed, a fresh directory is made inside it with
    C{tempfile.mkdtemp}, and the path C{'temp'} inside that fresh
    directory is returned.  The returned path itself is not created.

    @note: you must call os.mkdir on the value returned from this
        method if you wish to use it as a directory!

    @return: the path, as a C{str}
    @raise OSError: if the base directory cannot be created
    """
    MAX_FILENAME = 32  # some platforms limit lengths of filenames
    base = os.path.join(self.__class__.__module__[:MAX_FILENAME],
                        self.__class__.__name__[:MAX_FILENAME],
                        self._testMethodName[:MAX_FILENAME])
    try:
        os.makedirs(base)
    except OSError:
        # Creating first and checking afterwards avoids the race between an
        # existence check and the creation.  An already existing directory
        # is fine; anything else is a real error.
        if not os.path.isdir(base):
            raise
    dirname = tempfile.mkdtemp('', '', base)
    return os.path.join(dirname, 'temp')
def _exc_info(self, err):
    """
    Return C{err} as an exc_info style value.

    A L{failure.Failure} is unwrapped into a C{(type, value, None)} tuple;
    anything else is returned unchanged.
    """
    if not isinstance(err, failure.Failure):
        return err
    # Unwrap the Failure into a exc_info tuple.
    # XXX: if err.tb is a real traceback and not stringified, we should
    # use that.
    return (err.type, err.value, None)
def test_pyunitAddError(self):
    """
    An exc_info tuple passed to C{addError} is recorded in
    C{self.result.errors} as an object exposing the original exception as
    C{value} and its class as C{type}.
    """
    # pyunit passes an exc_info tuple directly to addError
    try:
        raise RuntimeError('foo')
    except RuntimeError as e:
        # Keep a reference under another name: on Python 3 the name bound
        # by "as" is unbound when the handler ends.
        excValue = e
        self.result.addError(self, sys.exc_info())
    recorded = self.result.errors[0][1]
    self.assertEqual(excValue, recorded.value)
    self.assertEqual(RuntimeError, recorded.type)
def test_pyunitAddFailure(self):
    """
    An exc_info tuple passed to C{addFailure} is recorded in
    C{self.result.failures} as an object exposing the original exception
    as C{value} and its class as C{type}.
    """
    # pyunit passes an exc_info tuple directly to addFailure
    try:
        raise self.failureException('foo')
    except self.failureException as e:
        # Keep a reference under another name: on Python 3 the name bound
        # by "as" is unbound when the handler ends.
        excValue = e
        self.result.addFailure(self, sys.exc_info())
    recorded = self.result.failures[0][1]
    self.assertEqual(excValue, recorded.value)
    self.assertEqual(self.failureException, recorded.type)
def _ebLookup(self, failure, sport, cport):
    """
    Send an error reply line for a failed lookup.

    A failure that is an L{IdentError} is reported using its own value.
    Any other failure is logged and reported wrapped in a new
    L{IdentError}.

    @param failure: the failure from the lookup
    @param sport: the first port number written in the reply
    @param cport: the second port number written in the reply
    """
    if failure.check(IdentError):
        reason = failure.value
    else:
        log.err(failure)
        reason = IdentError(failure.value)
    self.sendLine('%d, %d : ERROR : %s' % (sport, cport, reason))