我们从 Python 开源项目中，提取了以下 18 个代码示例，用于说明如何使用 xmlrpc.client.Binary()。
def test_dump_bytes(self): sample = b"my dog has fleas" self.assertEqual(sample, xmlrpclib.Binary(sample)) for type_ in bytes, bytearray, xmlrpclib.Binary: value = type_(sample) s = xmlrpclib.dumps((value,)) result, m = xmlrpclib.loads(s, use_builtin_types=True) (newvalue,) = result self.assertEqual(newvalue, sample) self.assertIs(type(newvalue), bytes) self.assertIsNone(m) result, m = xmlrpclib.loads(s, use_builtin_types=False) (newvalue,) = result self.assertEqual(newvalue, sample) self.assertIs(type(newvalue), xmlrpclib.Binary) self.assertIsNone(m)
def send_back_binary(self, bin):
    """Echo a Binary argument: read its payload, log it, and wrap it again.

    Returns a new ``Binary`` built from ``bin.data``.
    """
    payload = bin.data
    message = 'send_back_binary({!r})'.format(payload)
    print(message)
    return Binary(payload)
def xmlrpc_run(self, port=24444, bind='127.0.0.1', logRequests=False):
    """Expose this object's control functions over XML-RPC and run the loop.

    Registers ``_quit``, ``size``, ``fetch`` and ``counter`` on a WSGI
    XML-RPC application, hosts it in a Tornado HTTP server listening on
    ``bind:port`` with its own ``IOLoop`` instance, and starts that loop.
    ``logRequests`` is accepted but not used inside this method.
    """
    import umsgpack
    from pyspider.libs.wsgi_xmlrpc import WSGIXMLRPCApplication
    try:
        from xmlrpc.client import Binary
    except ImportError:
        # Fall back to the Python 2 module name.
        from xmlrpclib import Binary

    def fetch_packed(task):
        # msgpack-encode the fetch result and wrap it as XML-RPC binary data.
        return Binary(umsgpack.packb(self.sync_fetch(task)))

    def counter_snapshot(_time, _type):
        return self._cnt[_time].to_dict(_type)

    app = WSGIXMLRPCApplication()
    app.register_function(self.quit, '_quit')
    app.register_function(self.size)
    app.register_function(fetch_packed, 'fetch')
    app.register_function(counter_snapshot, 'counter')

    import tornado.wsgi
    import tornado.ioloop
    import tornado.httpserver

    wsgi_container = tornado.wsgi.WSGIContainer(app)
    self.xmlrpc_ioloop = tornado.ioloop.IOLoop()
    # NOTE(review): the io_loop keyword was removed from HTTPServer in
    # Tornado 5.0 -- confirm the Tornado version this project pins.
    self.xmlrpc_server = tornado.httpserver.HTTPServer(
        wsgi_container, io_loop=self.xmlrpc_ioloop
    )
    self.xmlrpc_server.listen(port=port, address=bind)
    logger.info('fetcher.xmlrpc listening on %s:%s', bind, port)
    self.xmlrpc_ioloop.start()
def test_default(self): t = xmlrpclib.Binary() self.assertEqual(str(t), '')
def test_string(self): d = b'\x01\x02\x03abc123\xff\xfe' t = xmlrpclib.Binary(d) self.assertEqual(str(t), str(d, "latin-1"))
def test_decode(self): d = b'\x01\x02\x03abc123\xff\xfe' de = base64.encodebytes(d) t1 = xmlrpclib.Binary() t1.decode(de) self.assertEqual(str(t1), str(d, "latin-1")) t2 = xmlrpclib._binary(de) self.assertEqual(str(t2), str(d, "latin-1"))
def retrieve(self, stub, jobNumber, password):
    """Fetch the final results of a job and write them to ``<stub>.sol``.

    Args:
        stub: Output path, with or without a trailing ``.sol`` extension.
        jobNumber: Job identifier forwarded to ``self.neos.getFinalResults``.
        password: Job password forwarded to ``self.neos.getFinalResults``.

    The results are written verbatim, so if the server replied with an error
    report instead of a solution, that text ends up in the ``.sol`` file.
    """
    # NEOS should return results as uu-encoded xmlrpclib.Binary data
    results = self.neos.getFinalResults(jobNumber, password)
    if isinstance(results, xmlrpclib.Binary):
        results = results.data
    elif isinstance(results, str):
        # A plain-text reply (e.g. an error report) cannot be written to a
        # binary-mode file as-is; encode it instead of raising TypeError.
        results = results.encode("utf-8")

    # Normalise the stub so the extension is appended exactly once.
    if stub.endswith('.sol'):
        stub = stub[:-4]

    # The context manager closes the file even if the write fails.
    with open(stub + ".sol", "wb") as solfile:
        solfile.write(results)