我们从 Python 开源项目中提取了以下 50 个代码示例,用于说明如何使用 future.utils.PY2。
def xml_to_str(tree, encoding=None, xml_declaration=False):
    """Serialize an XML element tree to a string.

    :param tree: the root element to serialize.
    :param encoding: target encoding. When ``None``, no XML declaration is
        written and the value collected from a text stream (Python 3) or a
        byte stream (Python 2) is returned. Otherwise the result is ``bytes``
        in that encoding and is always prefixed with an XML declaration.
    :param xml_declaration: only validated; a declaration cannot be requested
        without an encoding.
    :return: the serialized document.
    :raises ValueError: if ``xml_declaration`` is true while ``encoding`` is
        ``None``.
    """
    from xml.etree.ElementTree import ElementTree

    # tostring() returns bytecode unless encoding is 'unicode', and does not
    # reliably produce an XML declaration, so the declaration is written by
    # hand and ElementTree.write() is told not to emit one.
    if encoding is None:
        # Raise explicitly: an ``assert`` would vanish under ``python -O``.
        if xml_declaration:
            raise ValueError("'xml_declaration' is not supported when 'encoding' is None")
        if PY2:
            stream = io.BytesIO()
        else:
            stream = io.StringIO()
            encoding = 'unicode'
    else:
        stream = io.BytesIO()
        stream.write(('<?xml version="1.0" encoding="%s"?>' % encoding).encode(encoding))
    ElementTree(tree).write(stream, encoding=encoding, xml_declaration=False, method='xml')
    return stream.getvalue()
def emit(self, record):
    """Pretty-print and syntax highlight a log statement if all these conditions are met:

    * This is a DEBUG message
    * We're outputting to a terminal
    * The log message args is a dict containing keys starting with 'xml_' and values as bytes
    """
    wants_pretty_xml = (
        record.levelno == logging.DEBUG
        and self.is_tty()
        and isinstance(record.args, dict)
    )
    if wants_pretty_xml:
        for arg_name, arg_value in record.args.items():
            # Only byte values under an 'xml_' key whose first bytes look like XML qualify.
            is_candidate = (
                arg_name.startswith('xml_')
                and isinstance(arg_value, bytes)
                and is_xml(arg_value[:10].decode('utf-8', errors='ignore'))
            )
            if not is_candidate:
                continue
            try:
                pretty = self.highlight_xml(self.prettify_xml(arg_value))
                record.args[arg_name] = pretty.encode('utf-8') if PY2 else pretty
            except Exception:
                # Something bad happened, but we don't want to crash the program just because logging failed
                pass
    return super(PrettyXmlHandler, self).emit(record)
def __new__(cls, offset, name=_Omitted):
    """Validate ``offset`` and ``name`` and build the instance via ``cls._create``.

    When ``name`` is omitted and ``offset`` is falsy, ``cls.utc`` is returned
    instead of a new object. Raises ``TypeError`` for a non-timedelta offset
    or a non-string name, and ``ValueError`` for an offset outside
    ``cls._minoffset``..``cls._maxoffset`` or not a whole number of minutes.
    """
    if not isinstance(offset, timedelta):
        raise TypeError("offset must be a timedelta")

    if name is cls._Omitted:
        if not offset:
            return cls.utc
        name = None
    elif not isinstance(name, str):
        ###
        # For Python-Future: a native Py2 string is accepted and decoded.
        if not (PY2 and isinstance(name, native_str)):
            raise TypeError("name must be a string")
        name = name.decode()
        ###

    within_bounds = cls._minoffset <= offset <= cls._maxoffset
    if not within_bounds:
        raise ValueError("offset must be a timedelta"
                         " strictly between -timedelta(hours=24) and"
                         " timedelta(hours=24).")

    whole_minutes = offset.microseconds == 0 and offset.seconds % 60 == 0
    if not whole_minutes:
        raise ValueError("offset must be a timedelta"
                         " representing a whole number of minutes")
    return cls._create(offset, name)
def idle_cpu_count(mincpu=1):
    """Estimate number of idle CPUs, for use by multiprocessing code
    needing to determine how many processes can be run without excessive
    load. This function uses :func:`os.getloadavg` which is only available
    under a Unix OS.

    Parameters
    ----------
    mincpu : int
      Minimum number of CPUs to report, independent of actual estimate

    Returns
    -------
    idle : int
      Estimate of number of idle CPUs; ``mincpu`` when the CPU count
      cannot be determined
    """
    if PY2:
        ncpu = mp.cpu_count()
    else:
        ncpu = os.cpu_count()
    if ncpu is None:
        # os.cpu_count() returns None when the count is undetermined;
        # subtracting the load from None would raise TypeError.
        return mincpu
    # Subtract the (floored) 1-minute load average from the CPU count.
    idle = int(ncpu - np.floor(os.getloadavg()[0]))
    return max(mincpu, idle)
def test_call_with_args_does_nothing(self):
    """``super`` called with explicit arguments must match the builtin ``super``."""
    if utils.PY2:
        from __builtin__ import super as builtin_super
    else:
        from builtins import super as builtin_super

    class Base(object):
        def calc(self, value):
            return 2 * value

    class Sub1(Base):
        def calc(self, value):
            return 7 + super().calc(value)

    class Sub2(Base):
        def calc(self, value):
            return super().calc(value) - 1

    class Diamond(Sub1, Sub2):
        def calc(self, value):
            return 3 * super().calc(value)

    hierarchy = (Base, Sub1, Sub2, Diamond)
    for klass in hierarchy:
        instance = klass()
        # Unbound form first, then the form bound to an instance.
        self.assertSuperEquals(builtin_super(klass), super(klass))
        self.assertSuperEquals(builtin_super(klass, instance), super(klass, instance))
def test_eq_bytes(self):
    """``str`` and ``bytes`` objects with the same content compare unequal."""
    text = str('ABCD')
    data = bytes(b'ABCD')
    self.assertNotEqual(text, data)
    self.assertNotEqual(str(''), bytes(b''))

    native_s = 'ABCD'
    native_b = b'ABCD'
    self.assertFalse(data == native_s)
    self.assertTrue(data != native_s)

    # Fails on Py2:
    # self.assertNotEqual(native_s, b)
    # with no obvious way to change this.

    # For backward compatibility with broken string-handling code in
    # Py2 libraries, we allow the following:
    if utils.PY2:
        self.assertTrue(native_b == text)
        self.assertFalse(text != native_b)
def test_mul(self):
    """``str * integer`` yields a ``str``; float and complex multipliers raise."""
    s = str(u'ABC')

    multipliers = [4, int(4)]
    if utils.PY2:
        multipliers.append(long(4))
    for count in multipliers:
        product = s * count
        self.assertTrue(isinstance(product, str))
        self.assertEqual(product, u'ABCABCABCABC')

    for bad_count in (3.3, 3.3 + 3j):
        with self.assertRaises(TypeError):
            s * bad_count
def test_rmul(self):
    """``integer * str`` (reflected multiplication) yields a ``str``.

    Float and complex left operands must raise ``TypeError``.
    """
    s = str(u'XYZ')
    c = 3 * s
    self.assertTrue(isinstance(c, str))
    self.assertEqual(c, u'XYZXYZXYZ')
    # The integer goes on the left so the reflected operation is the one
    # exercised; ``s * int(3)`` would only repeat what test_mul checks.
    d = int(3) * s
    self.assertTrue(isinstance(d, str))
    self.assertEqual(d, u'XYZXYZXYZ')
    if utils.PY2:
        e = long(3) * s
        self.assertTrue(isinstance(e, str))
        self.assertEqual(e, u'XYZXYZXYZ')
    with self.assertRaises(TypeError):
        3.3 * s
    with self.assertRaises(TypeError):
        (3.3 + 3j) * s
def test_suspend_hooks(self):
    """
    Code like the try/except block here appears in Pyflakes v0.6.1. This
    method tests whether suspend_hooks() works as advertised.
    """
    with standard_library.suspend_hooks():
        # An example of fragile import code that we don't want to break:
        try:
            import builtins
        except ImportError:
            example_PY2_check = True
        else:
            example_PY2_check = False

    # The import is expected to fail only on Py2 while hooks are suspended.
    check = self.assertTrue if utils.PY2 else self.assertFalse
    check(example_PY2_check)

    # The import should succeed again now:
    import builtins
def shelve_open_with_failover(filename):
    """Yield an open shelf for ``filename``, recreating the cache if it cannot be opened.

    This is a generator that yields exactly once; presumably it is wrapped
    with ``contextlib.contextmanager`` where it is defined -- confirm.

    :param filename: path passed to ``shelve.open``.
    :return: generator yielding the open shelf; the shelf is closed when the
        generator is resumed, closed, or has an exception thrown into it.
    """
    # We can expect empty or corrupt files. Whatever happens, just delete the cache file and try again.
    # 'shelve' may add a backend-specific suffix to the file, so also delete all files with a suffix.
    # We don't know which file caused the error, so just delete them all.
    try:
        shelve_handle = shelve.open(filename)
    except Exception as e:
        for f in glob.glob(filename + '*'):
            log.warning('Deleting invalid cache file %s (%r)', f, e)
            os.unlink(f)
        shelve_handle = shelve.open(filename)
    try:
        yield shelve_handle
    finally:
        # Close on every Python version and even when the consumer raised;
        # otherwise the underlying dbm file stays open.
        shelve_handle.close()
def to_xml(text):
    """Parse ``text`` with ``fromstring``, falling back to a recovering parser.

    A leading ``BOM`` is dropped before parsing. If the first parse raises
    ``ParseError``, the text is re-parsed with ``XMLParser(recover=True)``,
    serialized with ``tostring`` and parsed again.

    :param text: the document text (it must support ``startswith`` and ``encode``).
    :return: whatever ``fromstring`` returns for the (possibly recovered) document.
    :raises ParseError: if the recovered document still cannot be parsed, or if
        a ``TypeError`` occurs while re-parsing it.
    """
    try:
        if PY2:
            # On python2, fromstring expects an encoded string
            return fromstring((text[BOM_LEN:] if text.startswith(BOM) else text).encode('utf-8'))
        return fromstring(text[BOM_LEN:] if text.startswith(BOM) else text)
    except ParseError:
        # Exchange servers may spit out the weirdest XML. lxml is pretty good at recovering from errors
        log.warning('Fallback to lxml processing of faulty XML')
        magical_parser = XMLParser(recover=True)
        no_bom_text = text[BOM_LEN:] if text.startswith(BOM) else text
        root = parse(io.BytesIO(no_bom_text.encode('utf-8')), magical_parser)
        try:
            return fromstring(tostring(root))
        except ParseError as e:
            # Copy (line, column) from ``position`` when the exception carries it.
            if hasattr(e, 'position'):
                e.lineno, e.offset = e.position
            if not e.lineno:
                raise ParseError('%s' % text_type(e))
            try:
                offending_line = no_bom_text.splitlines()[e.lineno - 1]
            except IndexError:
                # The reported line does not exist in the text; re-raise without an excerpt.
                raise ParseError('%s' % text_type(e))
            else:
                # Include up to 20 characters on either side of the reported column.
                offending_excerpt = offending_line[max(0, e.offset - 20):e.offset + 20]
                raise ParseError('%s\nOffending text: [...]%s[...]' % (text_type(e), offending_excerpt))
        # NOTE(review): the source formatting was flattened; this handler is assumed
        # to belong to the inner ``try`` (the re-parse) -- confirm.
        except TypeError:
            raise ParseError('This is not XML: %s' % text)
def expectedFailurePY2(func):
    """Mark ``func`` as an expected failure on Python 2; return it unchanged otherwise."""
    return unittest.expectedFailure(func) if PY2 else func


# Renamed in Py3.3:
def readinto(self, b):
    """Read response data into the writable buffer ``b`` and return the byte count.

    Returns 0 when there is no underlying file object or the request method
    was HEAD; delegates to ``_readinto_chunked`` for chunked responses.
    """
    if self.fp is None:
        return 0

    if self._method == "HEAD":
        self._close_conn()
        return 0

    if self.chunked:
        return self._readinto_chunked(b)

    remaining = self.length
    if remaining is not None and len(b) > remaining:
        # clip the read to the "end of response"
        b = memoryview(b)[0:remaining]

    # we do not use _safe_read() here because this may be a .will_close
    # connection, and the user is reading more bytes than will be provided
    # (for example, reading in 1k chunks)
    if PY2:
        chunk = self.fp.read(len(b))
        n = len(chunk)
        b[:n] = chunk
    else:
        n = self.fp.readinto(b)

    if not n and b:
        # Ideally, we would raise IncompleteRead if the content-length
        # wasn't satisfied, but it might break compatibility.
        self._close_conn()
    elif self.length is not None:
        self.length -= n
        if not self.length:
            self._close_conn()
    return n
def __repr__(self):
    """Return ``<ClassName: key=value-repr>`` for this object."""
    shown = self.value
    if PY2 and isinstance(shown, unicode):
        # make it a newstr to remove the u prefix
        shown = str(shown)
    return '<%s: %s=%s>' % (self.__class__.__name__, str(self.key), repr(shown))
def __repr__(self):
    """Return ``<ClassName: k1=v1 k2=v2 ...>`` with the items in sorted order."""
    rendered = []
    for key, entry in sorted(self.items()):
        shown = entry.value
        if PY2 and isinstance(shown, unicode):
            # make it a newstr to remove the u prefix
            shown = str(shown)
        rendered.append('%s=%s' % (str(key), repr(shown)))
    return '<%s: %s>' % (self.__class__.__name__, _spacejoin(rendered))
def get_tempfile():
    """Create ``CACHEDIR`` if needed and return a new non-deleting temporary file in it."""
    if not os.path.exists(CACHEDIR):
        if not PY2:
            os.makedirs(CACHEDIR, exist_ok=True)
        else:
            try:
                os.mkdir(CACHEDIR)
            except OSError:
                # Tolerate the directory having appeared in the meantime;
                # any other failure is re-raised.
                if not os.path.isdir(CACHEDIR):
                    raise
    return tempfile.NamedTemporaryFile(dir=CACHEDIR, delete=False)
def test_issue_159(self):
    """
    The latest version of urllib3 (as of 2015-07-25) uses http.client.HTTPMessage,
    which isn't normally exported on Py2 through __all__ in httplib.py.
    """
    from http.client import HTTPMessage

    # Pick the base class expected for the running interpreter.
    if PY2:
        import mimetools
        expected_base = mimetools.Message
    else:
        import email.message
        expected_base = email.message.Message
    assert issubclass(HTTPMessage, expected_base)
def test_fails_for_raw_functions(self):
    """Zero-argument ``super()`` inside plain functions must raise."""
    def not_a_method():
        super().not_a_method()

    self.assertRaises(RuntimeError, not_a_method)

    def not_a_method(self):
        super().not_a_method()

    # The expected exception type differs between Py2 and Py3.
    expected_error = RuntimeError if utils.PY2 else AttributeError
    self.assertRaises(expected_error, not_a_method, self)
def test_str_join_bytes(self):
    """Joining a list containing a ``bytes`` object raises ``TypeError``."""
    joiner = str('ABCD')

    native_mix = [b'EFGH', u'IJKL']
    # We allow this on Python 2 for compatibility with old libraries:
    if utils.PY2:
        self.assertEqual(joiner.join(native_mix), u'EFGHABCDIJKL')

    wrapped_mix = [bytes(b'EFGH'), u'IJKL']
    self.assertRaises(TypeError, joiner.join, wrapped_mix)
def test_str_contains_something(self):
    """Membership tests on ``str`` accept text and reject ``bytes`` objects and ints."""
    s = str('ABCD')
    self.assertTrue('A' in s)
    if utils.PY2:
        self.assertTrue(b'A' in s)

    # A bytes object or an int on the left-hand side must raise (unlike bytes).
    for bad_needle in (bytes(b'A'), 65):
        with self.assertRaises(TypeError):
            bad_needle in s

    self.assertTrue('AB' in s)
    for absent in (str([65, 66]), 'AC', 'Z'):  # the first one: unlike bytes
        self.assertFalse(absent in s)
def test_join(self):
    """``str.join`` works on text and raises ``TypeError`` for a ``bytes`` object."""
    separator = str('-')
    self.assertEqual(separator.join('abcd'), 'a-b-c-d')
    if utils.PY2:
        # Only checks that joining a native byte string does not raise.
        separator.join(b'abcd')
    with self.assertRaises(TypeError) as raised:
        separator.join(bytes(b'abcd'))
def test_endswith(self):
    """``str.endswith`` accepts text or tuples of text and rejects ints and ``bytes`` objects."""
    s = str('abcd')

    for suffix in ('d', ('b', 'd'), str('cd')):
        self.assertTrue(s.endswith(suffix))
    self.assertFalse(s.endswith(('A', 'B')))

    if utils.PY2:
        self.assertFalse(s.endswith(b'D'))
        self.assertTrue(s.endswith((b'D', b'd')))

    for bad_suffix in (65, (bytes(b'D'),)):
        with self.assertRaises(TypeError) as raised:
            s.endswith(bad_suffix)
def test_split(self):
    """``str.split`` accepts a text separator and rejects a ``bytes`` object."""
    s = str('ABCD')
    expected = ['A', 'CD']
    self.assertEqual(s.split('B'), expected)
    if utils.PY2:
        self.assertEqual(s.split(b'B'), expected)
    with self.assertRaises(TypeError) as raised:
        s.split(bytes(b'B'))
def test_rsplit(self):
    """``str.rsplit`` accepts a text separator and rejects a ``bytes`` object."""
    s = str('ABCD')
    expected = ['A', 'CD']
    self.assertEqual(s.rsplit('B'), expected)
    if utils.PY2:
        self.assertEqual(s.rsplit(b'B'), expected)
    with self.assertRaises(TypeError) as raised:
        s.rsplit(bytes(b'B'))
def test_ne(self):
    """``str`` compares unequal to other text, numbers, lists and ``bytes`` objects."""
    s = str('ABCD')
    self.assertNotEqual('A', s)
    self.assertNotEqual(s, 'A')
    self.assertNotEqual(s, 5)
    self.assertNotEqual(2.7, s)
    self.assertNotEqual(s, ['A', 'B', 'C', 'D'])

    # A native byte string with the same content is treated as equal on Py2 only.
    check_native = self.assertFalse if utils.PY2 else self.assertTrue
    check_native(b'ABCD' != s)
    self.assertTrue(bytes(b'ABCD') != s)
def test_is_py2_stdlib_module(self):
    """
    Tests whether the internal is_py2_stdlib_module function (called by the
    sys.modules scrubbing functions) is reliable.
    """
    is_stdlib = standard_library.is_py2_stdlib_module

    externalmodules = [standard_library, utils]
    external_flags = [is_stdlib(module) for module in externalmodules]
    self.assertTrue(not any(external_flags))

    py2modules = [sys, tempfile, copy, textwrap]
    if utils.PY2:
        # Debugging:
        for module in py2modules:
            if hasattr(module, '__file__'):
                print(module.__file__, file=sys.stderr)
        stdlib_flags = [is_stdlib(module) for module in py2modules]
        self.assertTrue(all(stdlib_flags))
    else:
        stdlib_flags = [is_stdlib(module) for module in py2modules]
        self.assertTrue(not any(stdlib_flags))

# @unittest.skip("No longer relevant")
# def test_all_modules_identical(self):
#     """
#     Tests whether all of the old imports in RENAMES are accessible
#     under their new names.
#     """
#     for (oldname, newname) in standard_library.RENAMES.items():
#         if newname == 'winreg' and sys.platform not in ['win32', 'win64']:
#             continue
#         if newname in standard_library.REPLACED_MODULES:
#             # Skip this check for e.g. the stdlib's ``test`` module,
#             # which we have replaced completely.
#             continue
#         oldmod = __import__(oldname)
#         newmod = __import__(newname)
#         if '.' not in oldname:
#             self.assertEqual(oldmod, newmod)
def test_remove_hooks2(self):
    """
    As above, but with the new names
    """
    standard_library.install_hooks()
    old_meta_path = copy.copy(sys.meta_path)

    standard_library.remove_hooks()
    standard_library.scrub_future_sys_modules()

    # Removing the hooks is expected to shrink sys.meta_path by one entry on Py2 only.
    removed_entries = 1 if utils.PY2 else 0
    self.assertTrue(len(old_meta_path) == len(sys.meta_path) + removed_entries)

    # An example of fragile import code that we don't want to break:
    try:
        import builtins
    except ImportError:
        example_PY2_check = True
    else:
        example_PY2_check = False
    check = self.assertTrue if utils.PY2 else self.assertFalse
    check(example_PY2_check)

    standard_library.install_hooks()
    # The import should succeed again now:
    import builtins
    self.assertTrue(len(old_meta_path) == len(sys.meta_path))
def test_detect_hooks(self):
    """
    Tests whether the future.standard_library.detect_hooks is doing
    its job.
    """
    standard_library.install_hooks()
    if utils.PY2:
        self.assertTrue(standard_library.detect_hooks())
    # Snapshot sys.meta_path so its length can be compared after removal.
    meta_path = copy.copy(sys.meta_path)
    standard_library.remove_hooks()
    # NOTE(review): the source formatting was flattened; both assertions below
    # are assumed to sit inside the PY2 guard -- confirm.
    if utils.PY2:
        self.assertEqual(len(meta_path), len(sys.meta_path) + 1)
        self.assertFalse(standard_library.detect_hooks())
def test_next_1(self):
    """
    Custom next methods should not be converted to __next__ in stage1, but
    any obj.next() calls should be converted to next(obj).
    """
    # NOTE(review): whitespace inside the two literals below was reconstructed
    # from a flattened source -- confirm against the original file.
    before = """
    class Upper:
        def __init__(self, iterable):
            self._iter = iter(iterable)
        def next(self):  # note the Py2 interface
            return next(self._iter).upper()
        def __iter__(self):
            return self
    itr = Upper('hello')
    assert itr.next() == 'H'
    assert next(itr) == 'E'
    assert list(itr) == list('LLO')
    """
    # Expected result: only the ``itr.next()`` call becomes ``next(itr)``.
    after = """
    class Upper:
        def __init__(self, iterable):
            self._iter = iter(iterable)
        def next(self):  # note the Py2 interface
            return next(self._iter).upper()
        def __iter__(self):
            return self
    itr = Upper('hello')
    assert next(itr) == 'H'
    assert next(itr) == 'E'
    assert list(itr) == list('LLO')
    """
    self.convert_check(before, after, stages=[1], run=PY2)
def test_next_2(self):
    """
    This version of the above doesn't currently work: the self._iter.next() call in
    line 5 isn't converted to next(self._iter).
    """
    # NOTE(review): whitespace inside the two literals below was reconstructed
    # from a flattened source -- confirm against the original file.
    before = """
    class Upper:
        def __init__(self, iterable):
            self._iter = iter(iterable)
        def next(self):  # note the Py2 interface
            return self._iter.next().upper()
        def __iter__(self):
            return self
    itr = Upper('hello')
    assert itr.next() == 'H'
    assert next(itr) == 'E'
    assert list(itr) == list('LLO')
    """
    # Note that the expected text also declares ``Upper(object)``.
    after = """
    class Upper(object):
        def __init__(self, iterable):
            self._iter = iter(iterable)
        def next(self):  # note the Py2 interface
            return next(self._iter).upper()
        def __iter__(self):
            return self
    itr = Upper('hello')
    assert next(itr) == 'H'
    assert next(itr) == 'E'
    assert list(itr) == list('LLO')
    """
    self.convert_check(before, after, stages=[1], run=PY2)
def test_xrange(self):
    """
    xrange should not be changed by futurize --stage1
    """
    # NOTE(review): whitespace inside the literal below was reconstructed
    # from a flattened source -- confirm against the original file.
    code = '''
    for i in xrange(10):
        pass
    '''
    self.unchanged(code, stages=[1], run=PY2)
def test_stdlib_modules_not_changed(self):
    """
    Standard library module names should not be changed in stage 1
    """
    # NOTE(review): whitespace inside the two literals below was reconstructed
    # from a flattened source -- confirm against the original file.
    before = """
    import ConfigParser
    import HTMLParser
    import collections
    print 'Hello'
    try:
        raise AttributeError('blah')
    except AttributeError, e:
        pass
    """
    # Expected result: the print statement and except clause change, the imports do not.
    after = """
    import ConfigParser
    import HTMLParser
    import collections
    print('Hello')
    try:
        raise AttributeError('blah')
    except AttributeError as e:
        pass
    """
    self.convert_check(before, after, stages=[1], run=PY2)
def test_object_implements_py2_unicode_method(self):
    """A decorated class defining only ``__str__`` gains ``__unicode__`` on Py2.

    On Py3 no ``__unicode__`` attribute must be present. The result is also
    compared with a hand-written Py2 equivalent.
    """
    my_unicode_str = u'Unicode string: \u5b54\u5b50'

    # The assertions below expect ``__unicode__`` on Py2, which the decorator
    # provides; without it the class defines ``__str__`` only.
    @utils.python_2_unicode_compatible
    class A(object):
        def __str__(self):
            return my_unicode_str

    a = A()
    self.assertEqual(len(str(a)), 18)
    if utils.PY2:
        self.assertTrue(hasattr(a, '__unicode__'))
    else:
        self.assertFalse(hasattr(a, '__unicode__'))
    self.assertEqual(str(a), my_unicode_str)
    self.assertTrue(isinstance(str(a).encode('utf-8'), bytes))
    if utils.PY2:
        self.assertIs(type(unicode(a)), unicode)
        self.assertEqual(unicode(a), my_unicode_str)

    # Manual equivalent on Py2 without the decorator:
    if not utils.PY3:
        class B(object):
            def __unicode__(self):
                return u'Unicode string: \u5b54\u5b50'

            def __str__(self):
                return unicode(self).encode('utf-8')

        b = B()
        # Use a unittest assertion so the check is not stripped under ``-O``.
        self.assertEqual(str(a), str(b))
def test_long_special_method(self):
    """An object defining ``__int__`` converts with ``int`` (and ``long`` on Py2)."""
    class A(object):
        def __int__(self):
            return 0

    instance = A()
    self.assertEqual(int(instance), 0)
    if not utils.PY2:
        return
    self.assertEqual(long(instance), 0)
def test_mul(self):
    """``bytes * integer`` yields a ``bytes`` object with the repeated content."""
    b = bytes(b'ABC')

    multipliers = [4, int(4)]
    if utils.PY2:
        multipliers.append(long(4))
    for count in multipliers:
        product = b * count
        self.assertTrue(isinstance(product, bytes))
        self.assertEqual(product, b'ABCABCABCABC')
def test_rmul(self):
    """``integer * bytes`` (reflected multiplication) yields a ``bytes`` object."""
    b = bytes(b'XYZ')
    c = 3 * b
    self.assertTrue(isinstance(c, bytes))
    self.assertEqual(c, b'XYZXYZXYZ')
    # The integer goes on the left so the reflected operation is the one
    # exercised; ``b * int(3)`` would only repeat what test_mul checks.
    d = int(3) * b
    self.assertTrue(isinstance(d, bytes))
    self.assertEqual(d, b'XYZXYZXYZ')
    if utils.PY2:
        e = long(3) * b
        self.assertTrue(isinstance(e, bytes))
        self.assertEqual(e, b'XYZXYZXYZ')