我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用copyreg.pickle()。
def test_long(self): for proto in protocols: # 256 bytes is where LONG4 begins. for nbits in 1, 8, 8*254, 8*255, 8*256, 8*257: nbase = 1 << nbits for npos in nbase-1, nbase, nbase+1: for n in npos, -npos: pickle = self.dumps(n, proto) got = self.loads(pickle) self.assertEqual(n, got) # Try a monster. This is quadratic-time in protos 0 & 1, so don't # bother with those. nbase = int("deadbeeffeedface", 16) nbase += nbase << 1000000 for n in nbase, -nbase: p = self.dumps(n, 2) got = self.loads(p) self.assertEqual(n, got)
def test_proto(self): build_none = pickle.NONE + pickle.STOP for proto in protocols: expected = build_none if proto >= 2: expected = pickle.PROTO + bytes([proto]) + expected p = self.dumps(None, proto) self.assertEqual(p, expected) oob = protocols[-1] + 1 # a future protocol badpickle = pickle.PROTO + bytes([oob]) + build_none try: self.loads(badpickle) except ValueError as detail: self.assertTrue(str(detail).startswith( "unsupported pickle protocol")) else: self.fail("expected bad protocol number to raise ValueError")
def test_singletons(self): # Map (proto, singleton) to expected opcode. expected_opcode = {(0, None): pickle.NONE, (1, None): pickle.NONE, (2, None): pickle.NONE, (3, None): pickle.NONE, (0, True): pickle.INT, (1, True): pickle.INT, (2, True): pickle.NEWTRUE, (3, True): pickle.NEWTRUE, (0, False): pickle.INT, (1, False): pickle.INT, (2, False): pickle.NEWFALSE, (3, False): pickle.NEWFALSE, } for proto in protocols: for x in None, False, True: s = self.dumps(x, proto) y = self.loads(s) self.assertTrue(x is y, (proto, x, s, y)) expected = expected_opcode[proto, x] self.assertEqual(opcode_in_pickle(expected, s), True)
def test_list_chunking(self): n = 10 # too small to chunk x = list(range(n)) for proto in protocols: s = self.dumps(x, proto) y = self.loads(s) self.assertEqual(x, y) num_appends = count_opcode(pickle.APPENDS, s) self.assertEqual(num_appends, proto > 0) n = 2500 # expect at least two chunks when proto > 0 x = list(range(n)) for proto in protocols: s = self.dumps(x, proto) y = self.loads(s) self.assertEqual(x, y) num_appends = count_opcode(pickle.APPENDS, s) if proto == 0: self.assertEqual(num_appends, 0) else: self.assertTrue(num_appends >= 2)
def test_dict_chunking(self): n = 10 # too small to chunk x = dict.fromkeys(range(n)) for proto in protocols: s = self.dumps(x, proto) self.assertIsInstance(s, bytes_types) y = self.loads(s) self.assertEqual(x, y) num_setitems = count_opcode(pickle.SETITEMS, s) self.assertEqual(num_setitems, proto > 0) n = 2500 # expect at least two chunks when proto > 0 x = dict.fromkeys(range(n)) for proto in protocols: s = self.dumps(x, proto) y = self.loads(s) self.assertEqual(x, y) num_setitems = count_opcode(pickle.SETITEMS, s) if proto == 0: self.assertEqual(num_setitems, 0) else: self.assertTrue(num_setitems >= 2)
def test_reduce_bad_iterator(self): # Issue4176: crash when 4th and 5th items of __reduce__() # are not iterators class C(object): def __reduce__(self): # 4th item is not an iterator return list, (), None, [], None class D(object): def __reduce__(self): # 5th item is not an iterator return dict, (), None, None, [] # Protocol 0 is less strict and also accept iterables. for proto in protocols: try: self.dumps(C(), proto) except (pickle.PickleError): pass try: self.dumps(D(), proto) except (pickle.PickleError): pass
def putmessage(self, message): self.debug("putmessage:%d:" % message[0]) try: s = pickle.dumps(message) except pickle.PicklingError: print("Cannot pickle:", repr(message), file=sys.__stderr__) raise s = struct.pack("<i", len(s)) + s while len(s) > 0: try: r, w, x = select.select([], [self.sock], []) n = self.sock.send(s[:BUFSIZE]) except (AttributeError, TypeError): raise IOError("socket no longer exists") except socket.error: raise else: s = s[n:]
def test_badly_quoted_string(self): # Issue #17710 badpickles = [b"S'\n.", b'S"\n.', b'S\' \n.', b'S" \n.', b'S\'"\n.', b'S"\'\n.', b"S' ' \n.", b'S" " \n.', b"S ''\n.", b'S ""\n.', b'S \n.', b'S\n.', b'S.'] for p in badpickles: self.check_unpickling_error(pickle.UnpicklingError, p)
def test_long(self): for proto in protocols: # 256 bytes is where LONG4 begins. for nbits in 1, 8, 8*254, 8*255, 8*256, 8*257: nbase = 1 << nbits for npos in nbase-1, nbase, nbase+1: for n in npos, -npos: pickle = self.dumps(n, proto) got = self.loads(pickle) self.assert_is_copy(n, got) # Try a monster. This is quadratic-time in protos 0 & 1, so don't # bother with those. nbase = int("deadbeeffeedface", 16) nbase += nbase << 1000000 for n in nbase, -nbase: p = self.dumps(n, 2) got = self.loads(p) # assert_is_copy is very expensive here as it precomputes # a failure message by computing the repr() of n and got, # we just do the check ourselves. self.assertIs(type(got), int) self.assertEqual(n, got)
def test_proto(self): for proto in protocols: pickled = self.dumps(None, proto) if proto >= 2: proto_header = pickle.PROTO + bytes([proto]) self.assertTrue(pickled.startswith(proto_header)) else: self.assertEqual(count_opcode(pickle.PROTO, pickled), 0) oob = protocols[-1] + 1 # a future protocol build_none = pickle.NONE + pickle.STOP badpickle = pickle.PROTO + bytes([oob]) + build_none try: self.loads(badpickle) except ValueError as err: self.assertIn("unsupported pickle protocol", str(err)) else: self.fail("expected bad protocol number to raise ValueError")
def test_list_chunking(self): n = 10 # too small to chunk x = list(range(n)) for proto in protocols: s = self.dumps(x, proto) y = self.loads(s) self.assert_is_copy(x, y) num_appends = count_opcode(pickle.APPENDS, s) self.assertEqual(num_appends, proto > 0) n = 2500 # expect at least two chunks when proto > 0 x = list(range(n)) for proto in protocols: s = self.dumps(x, proto) y = self.loads(s) self.assert_is_copy(x, y) num_appends = count_opcode(pickle.APPENDS, s) if proto == 0: self.assertEqual(num_appends, 0) else: self.assertTrue(num_appends >= 2)
def test_set_chunking(self): n = 10 # too small to chunk x = set(range(n)) for proto in protocols: s = self.dumps(x, proto) y = self.loads(s) self.assert_is_copy(x, y) num_additems = count_opcode(pickle.ADDITEMS, s) if proto < 4: self.assertEqual(num_additems, 0) else: self.assertEqual(num_additems, 1) n = 2500 # expect at least two chunks when proto >= 4 x = set(range(n)) for proto in protocols: s = self.dumps(x, proto) y = self.loads(s) self.assert_is_copy(x, y) num_additems = count_opcode(pickle.ADDITEMS, s) if proto < 4: self.assertEqual(num_additems, 0) else: self.assertGreaterEqual(num_additems, 2)
def check_frame_opcodes(self, pickled): """ Check the arguments of FRAME opcodes in a protocol 4+ pickle. """ frame_opcode_size = 9 last_arg = last_pos = None for op, arg, pos in pickletools.genops(pickled): if op.name != 'FRAME': continue if last_pos is not None: # The previous frame's size should be equal to the number # of bytes up to the current frame. frame_size = pos - last_pos - frame_opcode_size self.assertEqual(frame_size, last_arg) last_arg, last_pos = arg, pos # The last frame's size should be equal to the number of bytes up # to the pickle's end. frame_size = len(pickled) - last_pos - frame_opcode_size self.assertEqual(frame_size, last_arg)
def test_huge_bytes_32b(self, size): data = b"abcd" * (size // 4) try: for proto in protocols: if proto < 3: continue with self.subTest(proto=proto): try: pickled = self.dumps(data, protocol=proto) header = (pickle.BINBYTES + struct.pack("<I", len(data))) data_start = pickled.index(data) self.assertEqual( header, pickled[data_start-len(header):data_start]) finally: pickled = None finally: data = None
def test_huge_str_32b(self, size): data = "abcd" * (size // 4) try: for proto in protocols: if proto == 0: continue with self.subTest(proto=proto): try: pickled = self.dumps(data, protocol=proto) header = (pickle.BINUNICODE + struct.pack("<I", len(data))) data_start = pickled.index(b'abcd') self.assertEqual( header, pickled[data_start-len(header):data_start]) self.assertEqual((pickled.rindex(b"abcd") + len(b"abcd") - pickled.index(b"abcd")), len(data)) finally: pickled = None finally: data = None # BINUNICODE (protocols 1, 2 and 3) cannot carry more than 2**32 - 1 bytes # of utf-8 encoded unicode. BINUNICODE8 (protocol 4) supports these huge # unicode strings however.
def InitMessage(descriptor, cls): cls._decoders_by_tag = {} cls._extensions_by_name = {} cls._extensions_by_number = {} if (descriptor.has_options and descriptor.GetOptions().message_set_wire_format): cls._decoders_by_tag[decoder.MESSAGE_SET_ITEM_TAG] = ( decoder.MessageSetItemDecoder(cls._extensions_by_number), None) for field in descriptor.fields: _AttachFieldHelpers(cls, field) _AddEnumValues(descriptor, cls) _AddInitMethod(descriptor, cls) _AddPropertiesForFields(descriptor, cls) _AddPropertiesForExtensions(descriptor, cls) _AddStaticMethods(cls) _AddMessageMethods(descriptor, cls) _AddPrivateHelperMethods(descriptor, cls) copyreg.pickle(cls, lambda obj: (cls, (), obj.__getstate__()))
def test_nonIdentityHash(self): global ClassWithCustomHash class ClassWithCustomHash(styles.Versioned): def __init__(self, unique, hash): self.unique = unique self.hash = hash def __hash__(self): return self.hash v1 = ClassWithCustomHash('v1', 0) v2 = ClassWithCustomHash('v2', 0) pkl = pickle.dumps((v1, v2)) del v1, v2 ClassWithCustomHash.persistenceVersion = 1 ClassWithCustomHash.upgradeToVersion1 = lambda self: setattr(self, 'upgraded', True) v1, v2 = pickle.loads(pkl) styles.doUpgrade() self.assertEqual(v1.unique, 'v1') self.assertEqual(v2.unique, 'v2') self.assertTrue(v1.upgraded) self.assertTrue(v2.upgraded)
def test_upgradeDeserializesObjectsRequiringUpgrade(self): global ToyClassA, ToyClassB class ToyClassA(styles.Versioned): pass class ToyClassB(styles.Versioned): pass x = ToyClassA() y = ToyClassB() pklA, pklB = pickle.dumps(x), pickle.dumps(y) del x, y ToyClassA.persistenceVersion = 1 def upgradeToVersion1(self): self.y = pickle.loads(pklB) styles.doUpgrade() ToyClassA.upgradeToVersion1 = upgradeToVersion1 ToyClassB.persistenceVersion = 1 ToyClassB.upgradeToVersion1 = lambda self: setattr(self, 'upgraded', True) x = pickle.loads(pklA) styles.doUpgrade() self.assertTrue(x.y.upgraded)
def _pickleFunction(f): """ Reduce, in the sense of L{pickle}'s C{object.__reduce__} special method, a function object into its constituent parts. @param f: The function to reduce. @type f: L{types.FunctionType} @return: a 2-tuple of a reference to L{_unpickleFunction} and a tuple of its arguments, a 1-tuple of the function's fully qualified name. @rtype: 2-tuple of C{callable, native string} """ if f.__name__ == '<lambda>': raise _UniversalPicklingError( "Cannot pickle lambda function: {}".format(f)) return (_unpickleFunction, tuple([".".join([f.__module__, f.__qualname__])]))
def test_singletons(self): # Map (proto, singleton) to expected opcode. expected_opcode = {(0, None): pickle.NONE, (1, None): pickle.NONE, (2, None): pickle.NONE, (3, None): pickle.NONE, (0, True): pickle.INT, (1, True): pickle.INT, (2, True): pickle.NEWTRUE, (3, True): pickle.NEWTRUE, (0, False): pickle.INT, (1, False): pickle.INT, (2, False): pickle.NEWFALSE, (3, False): pickle.NEWFALSE, } for proto in protocols: for x in None, False, True: s = self.dumps(x, proto) y = self.loads(s) self.assertTrue(x is y, (proto, x, s, y)) expected = expected_opcode[min(proto, 3), x] self.assertTrue(opcode_in_pickle(expected, s))
def test_dict_chunking(self): n = 10 # too small to chunk x = dict.fromkeys(range(n)) for proto in protocols: s = self.dumps(x, proto) self.assertIsInstance(s, bytes_types) y = self.loads(s) self.assert_is_copy(x, y) num_setitems = count_opcode(pickle.SETITEMS, s) self.assertEqual(num_setitems, proto > 0) n = 2500 # expect at least two chunks when proto > 0 x = dict.fromkeys(range(n)) for proto in protocols: s = self.dumps(x, proto) y = self.loads(s) self.assert_is_copy(x, y) num_setitems = count_opcode(pickle.SETITEMS, s) if proto == 0: self.assertEqual(num_setitems, 0) else: self.assertTrue(num_setitems >= 2)
def test_badly_quoted_string(self): # Issue #17710 badpickles = [b"S'\n.", b'S"\n.', b'S\' \n.', b'S" \n.', b'S\'"\n.', b'S"\'\n.', b"S' ' \n.", b'S" " \n.', b"S ''\n.", b'S ""\n.', b'S \n.', b'S\n.', b'S.'] for p in badpickles: self.assertRaises(pickle.UnpicklingError, self.loads, p)
def _ufunc_reconstruct(module, name): # The `fromlist` kwarg is required to ensure that `mod` points to the # inner-most module rather than the parent package when module name is # nested. This makes it possible to pickle non-toplevel ufuncs such as # scipy.special.expit for instance. mod = __import__(module, fromlist=[name]) return getattr(mod, name)
def _ufunc_reduce(func): from pickle import whichmodule name = func.__name__ return _ufunc_reconstruct, (whichmodule(func, name), name)
def install(): try: import copy_reg except ImportError: import copyreg as copy_reg copy_reg.pickle(TracebackType, pickle_traceback) # Added by gevent # We have to defer the initialization, and especially the import of platform, # until runtime. If we're monkey patched, we need to be sure to use # the original __import__ to avoid switching through the hub due to # import locks on Python 2. See also builtins.py for details.
def _import_dump_load(): global dumps global loads try: import cPickle as pickle except ImportError: import pickle dumps = pickle.dumps loads = pickle.loads
def dump_traceback(tb): # Both _init and dump/load have to be unlocked, because # copy_reg and pickle can do imports to resolve class names; those # class names are in this module and greenlet safe though _init() return dumps(tb)
def save_dbt(self,table, sql_fields_current): tfile = self.file_open(table._dbt, 'w') pickle.dump(sql_fields_current, tfile) self.file_close(tfile)
def import_table_definitions(self, path, migrate=False, fake_migrate=False, items=None): pattern = pjoin(path,self._uri_hash+'_*.table') if items: for tablename, table in items.iteritems(): # TODO: read all field/table options fields = [] # remove unsupported/illegal Table arguments [table.pop(name) for name in ("name", "fields") if name in table] if "items" in table: for fieldname, field in table.pop("items").iteritems(): # remove unsupported/illegal Field arguments [field.pop(key) for key in ("requires", "name", "compute", "colname") if key in field] fields.append(Field(str(fieldname), **field)) self.define_table(str(tablename), *fields, **table) else: for filename in glob.glob(pattern): tfile = self._adapter.file_open(filename, 'r') try: sql_fields = pickle.load(tfile) name = filename[len(pattern)-7:-6] mf = [(value['sortable'], Field(key, type=value['type'], length=value.get('length',None), notnull=value.get('notnull',False), unique=value.get('unique',False))) \ for key, value in sql_fields.iteritems()] mf.sort(lambda a,b: cmp(a[0],b[0])) self.define_table(name,*[item[1] for item in mf], **dict(migrate=migrate, fake_migrate=fake_migrate)) finally: self._adapter.file_close(tfile)
def __init__(self, address=None, authkey=None, serializer='pickle'): if authkey is None: authkey = current_process().authkey self._address = address # XXX not final address if eg ('', 0) self._authkey = AuthenticationString(authkey) self._state = State() self._state.value = State.INITIAL self._serializer = serializer self._Listener, self._Client = listener_client[serializer]
def opcode_in_pickle(code, pickle): for op, dummy, dummy in pickletools.genops(pickle): if op.code == code.decode("latin-1"): return True return False # Return the number of times opcode code appears in pickle.
def count_opcode(code, pickle): n = 0 for op, dummy, dummy in pickletools.genops(pickle): if op.code == code.decode("latin-1"): n += 1 return n
def test_float(self): test_values = [0.0, 4.94e-324, 1e-310, 7e-308, 6.626e-34, 0.1, 0.5, 3.14, 263.44582062374053, 6.022e23, 1e30] test_values = test_values + [-x for x in test_values] for proto in protocols: for value in test_values: pickle = self.dumps(value, proto) got = self.loads(pickle) self.assertEqual(value, got)
def test_long1(self): x = 12345678910111213141516178920 for proto in protocols: s = self.dumps(x, proto) y = self.loads(s) self.assertEqual(x, y) self.assertEqual(opcode_in_pickle(pickle.LONG1, s), proto >= 2)