我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 six.PY3。
def sort(cmpfn):
    """Sort the elements of the array-like ``this`` in place.

    :param cmpfn: comparison object; it is ignored (replaced by ``None``)
        when its ``is_callable()`` returns false.
    :return: ``this`` after its indexed elements were rewritten in sorted
        order, or ``this.to_object()`` when ``this.Class`` is neither
        ``'Array'`` nor ``'Arguments'``.
    """
    if this.Class not in ('Array', 'Arguments'):
        return this.to_object()  # do nothing

    # Pull every indexed element out into a plain Python list.
    items = [this.get(six.text_type(idx)) for idx in xrange(len(this))]
    if not items:
        return this

    if not cmpfn.is_callable():
        cmpfn = None

    def compare(a, b):
        return sort_compare(a, b, cmpfn)

    if six.PY3:
        # list.sort() lost its ``cmp`` argument on Python 3.
        items.sort(key=functools.cmp_to_key(compare))
    else:
        items.sort(cmp=compare)

    # Write the sorted values back under their string indices.
    for idx, item in enumerate(items):
        this.put(six.text_type(idx), item)
    return this
def parse_slave_and_channel(file_path):
    """Extract the slave address and channel number from a dumped SDR file.

    :param file_path: file path of dumped SDR file.
    :return: tuple ``(slave_address, channel)`` of hex strings for the
        target device, or None when the OEM marker is not present.
    """
    # According to Intel Node Manager spec, section 4.5, for Intel NM
    # discovery OEM SDR records are type C0h. It contains manufacture ID
    # and OEM data in the record body.
    # 0-2 bytes are OEM ID, byte 3 is 0Dh and byte 4 is 01h. Byte 5, 6
    # is Intel NM device slave address and channel number/sensor owner LUN.
    oem_marker = '5701000d01'

    with open(file_path, 'rb') as sdr_file:
        hex_dump = binascii.hexlify(sdr_file.read())
    if six.PY3:
        # hexlify() returns bytes on Python 3; search on text instead.
        hex_dump = hex_dump.decode()

    marker_pos = hex_dump.find(oem_marker)
    if marker_pos == -1:
        return None

    payload_start = marker_pos + len(oem_marker)
    payload = hex_dump[payload_start:payload_start + 4]
    # Byte 5 is slave address. [7:4] from byte 6 is channel
    # number, so just pick payload[2] here.
    return ('0x' + payload[0:2], '0x0' + payload[2])
def get_mon_map(service):
    """Fetch and parse the output of ``ceph mon_status``.

    :param service: six.string_types. The Ceph user name to run the
        command under
    :return: the object produced by ``json.loads`` on the command output.
    :raise: ValueError if the output fails to parse as JSON. Also raises
        CalledProcessError if the ceph command fails.
    """
    command = ['ceph', '--id', service, 'mon_status', '--format=json']
    try:
        raw_status = check_output(command)
        if six.PY3:
            raw_status = raw_status.decode('UTF-8')
        try:
            return json.loads(raw_status)
        except ValueError as parse_error:
            message = "Unable to parse mon_status json: {}. Error: {}".format(
                raw_status, str(parse_error))
            log(message)
            raise
    except CalledProcessError as call_error:
        message = "mon_status command failed with message: {}".format(
            str(call_error))
        log(message)
        raise
def get_cache_mode(service, pool_name):
    """Look up the ``cache_mode`` of a pool in ``ceph osd dump``.

    :param service: six.string_types. The Ceph user name to run the
        command under
    :param pool_name: six.string_types
    :return: the ``cache_mode`` entry of the first pool whose
        ``pool_name`` matches, or None when no pool matches.
    :raise: ValueError (propagated from ``json.loads``) if the dump is not
        valid JSON.
    """
    validator(value=service, valid_type=six.string_types)
    validator(value=pool_name, valid_type=six.string_types)

    dump = check_output(
        ['ceph', '--id', service, 'osd', 'dump', '--format=json'])
    if six.PY3:
        dump = dump.decode('UTF-8')

    osd_map = json.loads(dump)
    matching_modes = (entry['cache_mode']
                      for entry in osd_map['pools']
                      if entry['pool_name'] == pool_name)
    return next(matching_modes, None)
def write(self, config_file):
    """Render one registered config file and write it to disk.

    :param config_file: path of the config file; it must be a key of
        ``self.templates``.
    :raise: OSConfigException if ``config_file`` is not registered.
    """
    if config_file not in self.templates:
        log('Config not registered: %s' % config_file, level=ERROR)
        raise OSConfigException

    rendered = self.render(config_file)
    # The file is opened in binary mode, so Python 3 needs bytes.
    payload = rendered.encode('UTF-8') if six.PY3 else rendered
    with open(config_file, 'wb') as target:
        target.write(payload)

    log('Wrote template %s.' % config_file, level=INFO)
def output_articles(articles):
    """Pipe the formatted articles through the ``less`` pager.

    Prints a notice and returns when ``articles`` is empty. Articles whose
    ``reading_time`` is not positive get it replaced by ``'Unknown'``
    (the article dict is modified in place). KeyboardInterrupt and
    ValueError raised while paging are swallowed.
    """
    if len(articles) == 0:
        print('No articles found')
        return

    try:
        less_proc = subprocess.Popen(['less'],
                                     stdin=subprocess.PIPE,
                                     stdout=sys.stdout)
        for entry in articles:
            if int(entry['reading_time']) <= 0:
                entry['reading_time'] = 'Unknown'
            rendered = format_article(entry, line=True)
            # The pipe is binary; Python 3 text has to be encoded first.
            payload = bytearray(rendered, 'utf-8') if six.PY3 else rendered
            less_proc.stdin.write(payload)
        less_proc.stdin.close()
        less_proc.wait()
    except (KeyboardInterrupt, ValueError):
        pass
def write(self, data):
    """Append rows to the CSV file at ``self._filename``.

    A header row built from the keys of the first row is written first
    when ``self.is_empty()`` reports true before the file is opened.

    :param data: sequence of dicts, one per row; nothing happens when it
        is empty or None.
    """
    if not data:
        return

    # Must be evaluated before open(): append mode creates the file.
    write_header = bool(self.is_empty())

    if six.PY3:
        # The csv module requires text files opened with newline='' on
        # Python 3; otherwise the writer's own '\r\n' terminator is
        # translated again on platforms that rewrite '\n' (e.g. Windows).
        open_kwargs = {'mode': 'a+t', 'newline': ''}
    else:
        open_kwargs = {'mode': 'a+b'}

    with open(self._filename, **open_kwargs) as csv_file:
        dict_writer = csv.DictWriter(csv_file, data[0].keys())
        if write_header:
            dict_writer.writeheader()
        dict_writer.writerows(self._encode_data(data))
def testLongIntegers(self):
    """rrule accepts ``long`` arguments (Python 2 only)."""
    if PY3:
        # There is no longs in python3
        return

    minutely = rrule(MINUTELY,
                     count=long(2),
                     interval=long(2),
                     bymonth=long(2),
                     byweekday=long(3),
                     byhour=long(6),
                     byminute=long(6),
                     bysecond=long(6),
                     dtstart=datetime(1997, 9, 2, 9, 0))
    self.assertEqual(list(minutely),
                     [datetime(1998, 2, 5, 6, 6, 6),
                      datetime(1998, 2, 12, 6, 6, 6)])

    yearly = rrule(YEARLY,
                   count=long(2),
                   bymonthday=long(5),
                   byweekno=long(2),
                   dtstart=datetime(1997, 9, 2, 9, 0))
    self.assertEqual(list(yearly),
                     [datetime(1998, 1, 5, 9, 0),
                      datetime(2004, 1, 5, 9, 0)])
def native(s):
    """
    Return *s* as the interpreter's native :py:class:`str` type.

    Bytes are decoded on Python 3 and unicode is encoded on Python 2, both
    with UTF-8; a value that already is the native type is returned as is.

    :raise UnicodeError: The input string is not UTF-8 decodeable.

    :raise TypeError: The input is neither :py:class:`bytes` nor
        :py:class:`unicode`.
    """
    if not isinstance(s, (binary_type, text_type)):
        raise TypeError("%r is neither bytes nor unicode" % s)
    if PY3 and isinstance(s, binary_type):
        return s.decode("utf-8")
    if not PY3 and isinstance(s, text_type):
        return s.encode("utf-8")
    return s
def _bn_to_int(self, bn):
    """Convert an OpenSSL BIGNUM pointer into a Python integer.

    :param bn: BIGNUM cdata pointer; must not be NULL.
    :return: the integer value of ``bn``.
    """
    assert bn != self._ffi.NULL

    if not six.PY3:
        # Under Python 2 the best we can do is hex()
        hex_cdata = self._lib.BN_bn2hex(bn)
        self.openssl_assert(hex_cdata != self._ffi.NULL)
        hex_str = self._ffi.string(hex_cdata)
        # The hex buffer came from OpenSSL, so hand it back to OpenSSL.
        self._lib.OPENSSL_free(hex_cdata)
        return int(hex_str, 16)

    # Python 3 has constant time from_bytes, so use that.
    capacity = self._lib.BN_num_bytes(bn)
    raw = self._ffi.new("unsigned char[]", capacity)
    written = self._lib.BN_bn2bin(bn, raw)
    # A zero length means the BN has value 0
    self.openssl_assert(written >= 0)
    return int.from_bytes(self._ffi.buffer(raw)[:written], "big")
def test_stack_output(self):
    """mappings.stack_output renders strings, None, lists, dicts and URLs."""

    def preformatted(text):
        return u'<pre>%s</pre>' % html.escape(text)

    self.assertEqual(u'<pre>foo</pre>', mappings.stack_output('foo'))
    self.assertEqual(u'', mappings.stack_output(None))

    outputs = ['one', 'two', 'three']
    # On Python 3, the pretty JSON output doesn't add space before newline
    if six.PY3:
        expected_text = """[\n "one",\n "two",\n "three"\n]"""
    else:
        expected_text = """[\n "one", \n "two", \n "three"\n]"""
    self.assertEqual(preformatted(expected_text),
                     mappings.stack_output(outputs))

    outputs = {'foo': 'bar'}
    expected_text = """{\n "foo": "bar"\n}"""
    self.assertEqual(preformatted(expected_text),
                     mappings.stack_output(outputs))

    expected_link = (u'<a href="http://www.example.com/foo" target="_blank">'
                     'http://www.example.com/foo</a>')
    self.assertEqual(expected_link,
                     mappings.stack_output('http://www.example.com/foo'))
def test_admin_metadata_defs_create_namespace_invalid_json_post_raw(self):
    """Posting invalid raw JSON yields the JSON decoder's error on the form."""
    form_data = {'source_type': 'raw', 'direct_input': 'invalidjson'}
    url = reverse(constants.METADATA_CREATE_URL)
    res = self.client.post(url, form_data)

    # The decoder's message differs between Python 2 and Python 3.
    if six.PY3:
        detail = 'Expecting value: line 1 column 1 (char 0).'
    else:
        detail = 'No JSON object could be decoded.'
    err_msg = 'There was a problem loading the namespace: ' + detail

    self.assertFormError(res, "form", None, [err_msg])
def write(self, buffer):
    """Emit ``self.data`` into ``buffer`` according to ``self.mode``.

    Numeric data is written in groups of up to three characters,
    alphanumeric data in pairs (11 bits) with a 6-bit single trailing
    character, and any other mode as one 8-bit value per element.

    :param buffer: object exposing ``put(value, bit_length)``.
    """
    if self.mode == MODE_NUMBER:
        for start in xrange(0, len(self.data), 3):
            group = self.data[start:start + 3]
            bit_length = NUMBER_LENGTH[len(group)]
            buffer.put(int(group), bit_length)
        return

    if self.mode == MODE_ALPHA_NUM:
        for start in xrange(0, len(self.data), 2):
            pair = self.data[start:start + 2]
            if len(pair) > 1:
                value = (ALPHA_NUM.find(pair[0]) * 45 +
                         ALPHA_NUM.find(pair[1]))
                buffer.put(value, 11)
            else:
                buffer.put(ALPHA_NUM.find(pair), 6)
        return

    if six.PY3:
        # Iterating a bytestring in Python 3 returns an integer,
        # no need to ord().
        values = self.data
    else:
        values = [ord(char) for char in self.data]
    for value in values:
        buffer.put(value, 8)
def write_response_to_filepath_or_buffer(filepath_or_buffer, response):
    """
    Writes the response content to the filepath or buffer.

    :param filepath_or_buffer: a path (opened here in binary mode) or any
        object with a ``write`` method. On Python 3, ``sys.stdout`` is
        swapped for its underlying binary buffer, and buffers whose
        ``mode`` lacks ``"b"`` (or that have no ``mode``) receive UTF-8
        decoded text.
    :param response: object exposing ``iter_content(chunk_size=...)`` that
        yields byte chunks.
    :raise UnicodeDecodeError: when text output is required and the
        content is not valid UTF-8.
    """
    import codecs  # local import: only this function needs it

    # Same test that six.PY3 performs.
    py3 = sys.version_info[0] == 3

    if not hasattr(filepath_or_buffer, "write"):
        with open(filepath_or_buffer, "wb") as f:
            for chunk in response.iter_content(chunk_size=1024):
                if chunk:
                    f.write(chunk)
        return

    if py3 and filepath_or_buffer is sys.stdout:
        # Write bytes to stdout (https://stackoverflow.com/a/23932488)
        filepath_or_buffer = filepath_or_buffer.buffer

    mode = getattr(filepath_or_buffer, "mode", "w")

    # Decode incrementally: a multi-byte UTF-8 sequence may be split
    # across two chunks, and decoding each chunk on its own would fail.
    decoder = None
    if py3 and "b" not in mode:
        decoder = codecs.getincrementaldecoder("utf-8")()

    for chunk in response.iter_content(chunk_size=1024):
        if not chunk:
            continue
        if decoder is not None:
            chunk = decoder.decode(chunk)
            if not chunk:
                # Only part of a character so far; wait for the rest.
                continue
        filepath_or_buffer.write(chunk)

    if decoder is not None:
        # Flush; raises UnicodeDecodeError if the stream ended mid-character.
        tail = decoder.decode(b"", final=True)
        if tail:
            filepath_or_buffer.write(tail)

    if filepath_or_buffer.seekable():
        filepath_or_buffer.seek(0)
def read_content(queue):
    """Assemble a ``Content`` object from frames pulled off ``queue``.

    Generator that yields the results of ``queue.get()`` (and of nested
    ``read_content`` calls for each child counted by the header's
    ``weight``) and delivers its result through ``defer.returnValue``.
    Body payloads are accumulated until ``header.size`` units were read.
    """
    frame = yield queue.get()
    header = frame.payload

    children = []
    for _ in range(header.weight):
        child = yield read_content(queue)
        children.append(child)

    remaining = header.size
    buf = six.StringIO()
    while remaining > 0:
        body = yield queue.get()
        content = body.payload.content

        # if this is the first instance of real binary content, convert the
        # string buffer to BytesIO
        # Not a nice fix but it preserves the original behaviour
        needs_bytes_buffer = (six.PY3 and isinstance(content, bytes)
                              and isinstance(buf, six.StringIO))
        if needs_bytes_buffer:
            buf = six.BytesIO()

        buf.write(content)
        remaining -= len(content)

    defer.returnValue(
        Content(buf.getvalue(), children, header.properties.copy()))
def _instance_callable(obj):
    """Return True when ``obj`` is callable.

    For a class the answer describes its instances: True when the class or
    one of its ancestors defines ``__call__``.
    """
    if not isinstance(obj, ClassTypes):
        # already an instance
        return getattr(obj, '__call__', None) is not None

    if six.PY3:
        # *could* be broken by a class overriding __mro__ or __dict__ via
        # a metaclass
        lineage = (obj,) + obj.__mro__
        return any(base.__dict__.get('__call__') is not None
                   for base in lineage)

    # uses __bases__ instead of __mro__ so that we work with old style classes
    if obj.__dict__.get('__call__') is not None:
        return True
    return any(_instance_callable(parent) for parent in obj.__bases__)
def query_params(self, value=None):
    """
    Return or set a dictionary of query params

    With ``value`` given, the result of ``URL._mutate`` with the
    url-encoded query is returned; otherwise the current query string is
    parsed with ``parse_qs`` and returned.

    :param dict value: new dictionary of values
    """
    if value is not None:
        encoded = unicode_urlencode(value, doseq=True)
        return URL._mutate(self, query=encoded)

    raw_query = self._tuple.query
    query = '' if raw_query is None else raw_query

    if six.PY3:
        return parse_qs(query, True)

    # In Python 2.6, urlparse needs a bytestring so we encode and then
    # decode the result.
    return dict_to_unicode(parse_qs(to_utf8(query), True))
def run_with(*drivers):
    """Mark tests as applicable only to the given db drivers.

    The returned decorator stores ``drivers`` in a ``_run_with`` attribute:
    on every callable ``test_*`` attribute when applied to a ``TestBase``
    subclass, otherwise on the decorated object itself.
    """
    def decorator(test):
        is_test_class = isinstance(test, type) and issubclass(test, TestBase)
        if not is_test_class:
            test._run_with = drivers
            return test

        # Decorate all test methods
        for attr in dir(test):
            member = getattr(test, attr)
            if not (callable(member) and attr.startswith('test_')):
                continue
            # On Python 2 the attribute is set on the underlying function
            # via __func__; on Python 3 it is set on the member directly.
            target = member if six.PY3 else member.__func__
            target._run_with = drivers
        return test

    return decorator
def _test_run_expirer_ttl_enabled(self, ttl_name, data_name):
    """Run ``panko-expirer`` with a TTL option set and check its output.

    :param ttl_name: name of the TTL option written to the ``[database]``
        section with value 1.
    :param data_name: data kind expected in the "Dropping ... data" line.
    """
    def as_output_type(text):
        # Temp-file content and subprocess output are bytes on Python 3.
        return text.encode('utf-8') if six.PY3 else text

    content = ("[database]\n"
               "%s=1\n"
               "connection=log://localhost\n" % ttl_name)
    self.tempfile = fileutils.write_to_tempfile(
        content=as_output_type(content),
        prefix='panko',
        suffix='.conf')

    command = ['panko-expirer', '-d', "--config-file=%s" % self.tempfile]
    subp = subprocess.Popen(command, stdout=subprocess.PIPE)
    out, __ = subp.communicate()
    self.assertEqual(0, subp.poll())

    msg = "Dropping %s data with TTL 1" % data_name
    self.assertIn(as_output_type(msg), out)
def _verify_docker_image_size(self, image_name):
    """Pulls a Docker image and checks its size against the limit.

    Args:
      image_name: name of the Docker image.

    Returns:
      True if image size is within MAX_DOCKER_IMAGE_SIZE, False otherwise
      (including when the size could not be determined).
    """
    shell_call(['docker', 'pull', image_name])

    inspect_cmd = ['docker', 'inspect', '--format={{.Size}}', image_name]
    try:
        raw_size = subprocess.check_output(inspect_cmd).strip()
        if PY3:
            image_size = int(raw_size)
        else:
            image_size = long(raw_size)
    except (ValueError, subprocess.CalledProcessError) as e:
        logging.error('Failed to determine docker image size: %s', e)
        return False

    logging.info('Size of docker image %s is %d', image_name, image_size)
    within_limit = image_size <= MAX_DOCKER_IMAGE_SIZE
    if not within_limit:
        logging.error('Image size exceeds limit %d', MAX_DOCKER_IMAGE_SIZE)
    return within_limit
def compute_signature(self, uri, params, utf=PY3):
    """Compute the signature for a given request

    The signed string is ``uri`` followed by each key and value of
    ``params`` in sorted key order; it is UTF-8 encoded, signed with
    HMAC-SHA1 keyed by ``self.token`` and base64 encoded.

    :param uri: full URI that Twilio requested on your server
    :param params: post vars that Twilio sent with the request
    :param utf: whether return should be bytestring or unicode (python3)

    :returns: The computed signature
    """
    pieces = [uri]
    if len(params) > 0:
        pieces.extend(key + value for key, value in sorted(params.items()))
    signed_text = "".join(pieces)

    # compute signature and compare signatures
    digest = hmac.new(self.token, signed_text.encode("utf-8"), sha1).digest()
    computed = base64.b64encode(digest)
    if utf:
        computed = computed.decode('utf-8')
    return computed.strip()
def install_hooks(request):
    """Generator fixture: install the urllib2 patches around a test.

    Yields the urllib module selected by the ``urllibver`` fixture value
    (``None`` for ``'urllib2'`` on Python 3, where nothing is patched).
    After the yield, the previous opener and the CONFIG header settings
    are restored.
    """
    urllibver = request.getfixturevalue('urllibver')

    if urllibver != 'urllib2':
        module = urllib.request
    elif six.PY3:
        yield None
        return
    else:
        module = urllib2

    # Remember the state that is overwritten below.
    saved_opener = module._opener
    saved_name_headers = CONFIG.callee_name_headers
    saved_endpoint_headers = CONFIG.callee_endpoint_headers

    urllib2_hooks.install_patches.__original_func()
    CONFIG.callee_name_headers = ['Remote-Loc']
    CONFIG.callee_endpoint_headers = ['Remote-Op']

    yield module

    module.install_opener(saved_opener)
    CONFIG.callee_name_headers = saved_name_headers
    CONFIG.callee_endpoint_headers = saved_endpoint_headers
def __init__(self, data):
    """Build a fake response from a dict description or a bare status code.

    :param data: either a dict with optional ``status_code`` (default 200),
        ``text`` and ``headers`` keys, or any other value, which is stored
        as the status code.
    """
    super(TestResponse, self).__init__()
    self._content_consumed = True

    if not isinstance(data, dict):
        self.status_code = data
        return

    self.status_code = data.get('status_code', 200)

    # Fake the text attribute to streamline Response creation
    text = data.get('text', "")
    if isinstance(text, (dict, list)):
        # Structured payloads are serialized and labelled as JSON.
        self._content = json.dumps(text)
        default_headers = {"Content-Type": "application/json"}
    else:
        self._content = text
        default_headers = {}

    if six.PY3 and isinstance(self._content, six.string_types):
        self._content = self._content.encode('utf-8', 'strict')
    self.headers = data.get('headers') or default_headers
def testToStrLongIntegers(self):
    """Run the rrulestr reverse test on rules built from ``long`` (Python 2 only)."""
    if PY3:
        # There is no longs in python3
        return

    minutely = rrule(MINUTELY,
                     count=long(2),
                     interval=long(2),
                     bymonth=long(2),
                     byweekday=long(3),
                     byhour=long(6),
                     byminute=long(6),
                     bysecond=long(6),
                     dtstart=datetime(1997, 9, 2, 9, 0))
    self._rrulestr_reverse_test(minutely)

    yearly = rrule(YEARLY,
                   count=long(2),
                   bymonthday=long(5),
                   byweekno=long(2),
                   dtstart=datetime(1997, 9, 2, 9, 0))
    self._rrulestr_reverse_test(yearly)
def tzname_in_python2(namefunc):
    """Change unicode output into bytestrings in Python 2

    tzname() API changed in Python 3. It used to return bytes, but was
    changed to unicode strings

    :param namefunc: the ``tzname``-style callable to wrap.
    :return: a wrapper that calls ``namefunc`` and, when not running on
        Python 3, encodes a non-None result with ``.encode()``. The
        wrapper keeps the name and docstring of ``namefunc``.
    """
    import functools  # local import: only this decorator needs it

    # wraps() keeps the decorated method's __name__/__doc__ so that
    # introspection and generated docs still describe the original.
    @functools.wraps(namefunc)
    def adjust_encoding(*args, **kwargs):
        name = namefunc(*args, **kwargs)
        if name is not None and not PY3:
            name = name.encode()
        return name

    return adjust_encoding


# The following is adapted from Alexander Belopolsky's tz library
# https://github.com/abalkin/tz
def get_id(source_uuid):
    """Derive a short (12 character) id from a random UUID.

    The supplied UUID must be a version 4 UUID, given either as a UUID
    object or as its string form.

    :raise ValueError: if the UUID is not version 4.
    """
    if isinstance(source_uuid, six.string_types):
        source_uuid = uuid.UUID(source_uuid)

    version = source_uuid.version
    if version != 4:
        raise ValueError(_('Invalid UUID version (%d)') % version)

    # The "time" field of a v4 UUID contains 60 random bits
    # (see RFC4122, Section 4.4)
    random_bytes = _to_byte_string(source_uuid.time, 60)
    # The first 12 bytes (= 60 bits) of base32-encoded output is our data
    short_id = base64.b32encode(six.b(random_bytes))[:12].lower()

    # b32encode() returns bytes; hand text back on Python 3.
    return short_id.decode('utf-8') if six.PY3 else short_id