Python six 模块,PY3 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用six.PY3

项目:tvlinker    作者:ozmartian    | 项目源码 | 文件源码
def sort(cmpfn):
        """Sort the array-like object bound to the free name ``this`` in place.

        Objects whose ``Class`` is neither 'Array' nor 'Arguments' are not
        sorted; ``this.to_object()`` is returned for them instead.  Elements
        are read through ``this.get`` with text indices, ordered with
        ``sort_compare`` and written back through ``this.put``.

        :param cmpfn: candidate comparison function; it is ignored (replaced
            by None) when ``cmpfn.is_callable()`` is false.
        :return: ``this`` after sorting.
        """
        if this.Class not in ('Array', 'Arguments'):
            return this.to_object()  # do nothing

        items = [this.get(six.text_type(idx)) for idx in xrange(len(this))]
        if not items:
            return this

        comparator = cmpfn if cmpfn.is_callable() else None

        def compare(left, right):
            return sort_compare(left, right, comparator)

        if six.PY3:
            # list.sort lost its cmp= argument on Python 3.
            items.sort(key=functools.cmp_to_key(compare))
        else:
            items.sort(cmp=compare)

        for idx, item in enumerate(items):
            this.put(six.text_type(idx), item)

        return this
项目:ironic-staging-drivers    作者:openstack    | 项目源码 | 文件源码
def parse_slave_and_channel(file_path):
    """Parse the dumped file to get slave address and channel number.

    The dump is searched on raw bytes, so the marker can only match on a
    byte boundary.  (Searching the hex text of the dump could match between
    two half-bytes of unrelated data.)

    :param file_path: file path of dumped SDR file.
    :return: tuple ``(slave_address, channel)`` of hex strings such as
        ``('0x2c', '0x06')``, or None when the marker is not present or the
        file ends before the two bytes that follow it.
    """
    # According to Intel Node Manager spec, section 4.5, for Intel NM
    # discovery OEM SDR records are type C0h. It contains manufacture ID
    # and OEM data in the record body.
    # 0-2 bytes are OEM ID, byte 3 is 0Dh and byte 4 is 01h. Byte 5, 6
    # is Intel NM device slave address and channel number/sensor owner LUN.
    marker = b'\x57\x01\x00\x0d\x01'
    with open(file_path, 'rb') as bin_fp:
        # Indexing a bytearray yields ints on both Python 2 and Python 3.
        data = bytearray(bin_fp.read())

    marker_index = data.find(marker)
    if marker_index == -1:
        return None

    payload_start = marker_index + len(marker)
    payload = data[payload_start:payload_start + 2]
    if len(payload) < 2:
        # Truncated record: both bytes after the marker are required.
        return None

    # Byte 5 is slave address. [7:4] from byte 6 is channel
    # number, so only the high nibble of that byte is reported.
    return ('0x%02x' % payload[0], '0x0%x' % (payload[1] >> 4))
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def get_mon_map(service):
    """
    Fetch the monitor status reported by Ceph and parse it as JSON.

    Runs ``ceph --id <service> mon_status --format=json``.  Both failure
    modes are logged and then re-raised unchanged.

    :param service: six.string_types. The Ceph user name to run the command under
    :return: the object produced by ``json.loads`` on the command output.
    :raise: ValueError if the output fails to parse as JSON.
      CalledProcessError if the ceph command fails.
    """
    command = ['ceph', '--id', service, 'mon_status', '--format=json']
    try:
        raw_status = check_output(command)
    except CalledProcessError as err:
        log("mon_status command failed with message: {}"
            .format(str(err)))
        raise

    # On Python 3 the command output is decoded before it is parsed.
    status_text = raw_status.decode('UTF-8') if six.PY3 else raw_status
    try:
        return json.loads(status_text)
    except ValueError as err:
        log("Unable to parse mon_status json: {}. Error: {}"
            .format(status_text, str(err)))
        raise
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def get_cache_mode(service, pool_name):
    """
    Look up the ``cache_mode`` entry of a pool in the OSD dump.

    Runs ``ceph --id <service> osd dump --format=json`` and scans the
    ``pools`` list of the parsed result.

    :param service: six.string_types. The Ceph user name to run the command under
    :param pool_name: six.string_types
    :return: the ``cache_mode`` value of the first pool whose ``pool_name``
      matches, or None when no pool matches.
    :raise: ValueError propagates unchanged if the dump is not valid JSON.
    """
    for argument in (service, pool_name):
        validator(value=argument, valid_type=six.string_types)

    dump = check_output(['ceph', '--id', service,
                         'osd', 'dump', '--format=json'])
    if six.PY3:
        dump = dump.decode('UTF-8')

    pools = json.loads(dump)['pools']
    return next((entry['cache_mode'] for entry in pools
                 if entry['pool_name'] == pool_name), None)
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def get_mon_map(service):
    """
    Fetch the monitor status reported by Ceph and parse it as JSON.

    Runs ``ceph --id <service> mon_status --format=json``.  Both failure
    modes are logged and then re-raised unchanged.

    :param service: six.string_types. The Ceph user name to run the command under
    :return: the object produced by ``json.loads`` on the command output.
    :raise: ValueError if the output fails to parse as JSON.
      CalledProcessError if the ceph command fails.
    """
    command = ['ceph', '--id', service, 'mon_status', '--format=json']
    try:
        raw_status = check_output(command)
    except CalledProcessError as err:
        log("mon_status command failed with message: {}"
            .format(str(err)))
        raise

    # On Python 3 the command output is decoded before it is parsed.
    status_text = raw_status.decode('UTF-8') if six.PY3 else raw_status
    try:
        return json.loads(status_text)
    except ValueError as err:
        log("Unable to parse mon_status json: {}. Error: {}"
            .format(status_text, str(err)))
        raise
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def get_cache_mode(service, pool_name):
    """
    Look up the ``cache_mode`` entry of a pool in the OSD dump.

    Runs ``ceph --id <service> osd dump --format=json`` and scans the
    ``pools`` list of the parsed result.

    :param service: six.string_types. The Ceph user name to run the command under
    :param pool_name: six.string_types
    :return: the ``cache_mode`` value of the first pool whose ``pool_name``
      matches, or None when no pool matches.
    :raise: ValueError propagates unchanged if the dump is not valid JSON.
    """
    for argument in (service, pool_name):
        validator(value=argument, valid_type=six.string_types)

    dump = check_output(['ceph', '--id', service,
                         'osd', 'dump', '--format=json'])
    if six.PY3:
        dump = dump.decode('UTF-8')

    pools = json.loads(dump)['pools']
    return next((entry['cache_mode'] for entry in pools
                 if entry['pool_name'] == pool_name), None)
项目:charm-swift-proxy    作者:openstack    | 项目源码 | 文件源码
def write(self, config_file):
        """
        Render one registered config file and store it at its own path.

        The rendered output is UTF-8 encoded on Python 3 and written in
        binary mode to ``config_file``.

        :raise: OSConfigException if ``config_file`` is not in
            ``self.templates``.
        """
        if config_file not in self.templates:
            log('Config not registered: %s' % config_file, level=ERROR)
            raise OSConfigException

        rendered = self.render(config_file)
        payload = rendered.encode('UTF-8') if six.PY3 else rendered

        with open(config_file, 'wb') as target:
            target.write(payload)

        log('Wrote template %s.' % config_file, level=INFO)
项目:pocket-cli    作者:rakanalh    | 项目源码 | 文件源码
def output_articles(articles):
    """Pipe the formatted articles through a ``less`` pager.

    Prints 'No articles found' and returns when ``articles`` is empty.
    Each article whose ``reading_time`` is not positive has that field
    replaced with 'Unknown' before formatting.  KeyboardInterrupt and
    ValueError raised while paging are swallowed.
    """
    if not len(articles):
        print('No articles found')
        return

    try:
        pager = subprocess.Popen(['less'],
                                 stdin=subprocess.PIPE,
                                 stdout=sys.stdout)
        sink = pager.stdin
        for entry in articles:
            if int(entry['reading_time']) <= 0:
                entry['reading_time'] = 'Unknown'
            text = format_article(entry, line=True)

            # The pager pipe takes bytes on Python 3.
            sink.write(bytearray(text, 'utf-8') if six.PY3 else text)

        sink.close()
        pager.wait()
    except (KeyboardInterrupt, ValueError):
        pass
项目:pocket-cli    作者:rakanalh    | 项目源码 | 文件源码
def write(self, data):
        """Append ``data`` to the CSV file at ``self._filename``.

        Nothing happens when ``data`` is empty.  A header row is written
        first when ``self.is_empty()`` reports true.  The column names are
        taken from the keys of the first row.

        :param data: sequence of dicts; every row is passed through
            ``self._encode_data`` before being written.
        """
        if not data:
            return

        write_header = self.is_empty()

        if six.PY3:
            # The csv module requires text files opened with newline='';
            # without it Windows gets an extra '\r' (blank line) per row.
            csv_file = open(self._filename, 'a+t', newline='')
        else:
            csv_file = open(self._filename, 'a+b')

        with csv_file:
            dict_writer = csv.DictWriter(csv_file, data[0].keys())
            if write_header:
                dict_writer.writeheader()

            dict_writer.writerows(self._encode_data(data))
项目:Splunk_CBER_App    作者:MHaggis    | 项目源码 | 文件源码
def testLongIntegers(self):
        """Check that rrule accepts ``long`` values for its numeric arguments.

        The test body is skipped entirely on Python 3.
        """
        if PY3:  # There is no longs in python3
            return

        start = datetime(1997, 9, 2, 9, 0)

        minutely = rrule(MINUTELY,
                         count=long(2),
                         interval=long(2),
                         bymonth=long(2),
                         byweekday=long(3),
                         byhour=long(6),
                         byminute=long(6),
                         bysecond=long(6),
                         dtstart=start)
        self.assertEqual(list(minutely),
                         [datetime(1998, 2, 5, 6, 6, 6),
                          datetime(1998, 2, 12, 6, 6, 6)])

        yearly = rrule(YEARLY,
                       count=long(2),
                       bymonthday=long(5),
                       byweekno=long(2),
                       dtstart=start)
        self.assertEqual(list(yearly),
                         [datetime(1998, 1, 5, 9, 0),
                          datetime(2004, 1, 5, 9, 0)])
项目:charm-heat    作者:openstack    | 项目源码 | 文件源码
def get_mon_map(service):
    """
    Fetch the monitor status reported by Ceph and parse it as JSON.

    Runs ``ceph --id <service> mon_status --format=json``.  Both failure
    modes are logged and then re-raised unchanged.

    :param service: six.string_types. The Ceph user name to run the command under
    :return: the object produced by ``json.loads`` on the command output.
    :raise: ValueError if the output fails to parse as JSON.
      CalledProcessError if the ceph command fails.
    """
    command = ['ceph', '--id', service, 'mon_status', '--format=json']
    try:
        raw_status = check_output(command)
    except CalledProcessError as err:
        log("mon_status command failed with message: {}"
            .format(str(err)))
        raise

    # On Python 3 the command output is decoded before it is parsed.
    status_text = raw_status.decode('UTF-8') if six.PY3 else raw_status
    try:
        return json.loads(status_text)
    except ValueError as err:
        log("Unable to parse mon_status json: {}. Error: {}"
            .format(status_text, str(err)))
        raise
项目:charm-heat    作者:openstack    | 项目源码 | 文件源码
def get_cache_mode(service, pool_name):
    """
    Look up the ``cache_mode`` entry of a pool in the OSD dump.

    Runs ``ceph --id <service> osd dump --format=json`` and scans the
    ``pools`` list of the parsed result.

    :param service: six.string_types. The Ceph user name to run the command under
    :param pool_name: six.string_types
    :return: the ``cache_mode`` value of the first pool whose ``pool_name``
      matches, or None when no pool matches.
    :raise: ValueError propagates unchanged if the dump is not valid JSON.
    """
    for argument in (service, pool_name):
        validator(value=argument, valid_type=six.string_types)

    dump = check_output(['ceph', '--id', service,
                         'osd', 'dump', '--format=json'])
    if six.PY3:
        dump = dump.decode('UTF-8')

    pools = json.loads(dump)['pools']
    return next((entry['cache_mode'] for entry in pools
                 if entry['pool_name'] == pool_name), None)
项目:charm-heat    作者:openstack    | 项目源码 | 文件源码
def write(self, config_file):
        """
        Render one registered config file and store it at its own path.

        The rendered output is UTF-8 encoded on Python 3 and written in
        binary mode to ``config_file``.

        :raise: OSConfigException if ``config_file`` is not in
            ``self.templates``.
        """
        if config_file not in self.templates:
            log('Config not registered: %s' % config_file, level=ERROR)
            raise OSConfigException

        rendered = self.render(config_file)
        payload = rendered.encode('UTF-8') if six.PY3 else rendered

        with open(config_file, 'wb') as target:
            target.write(payload)

        log('Wrote template %s.' % config_file, level=INFO)
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def get_mon_map(service):
    """
    Fetch the monitor status reported by Ceph and parse it as JSON.

    Runs ``ceph --id <service> mon_status --format=json``.  Both failure
    modes are logged and then re-raised unchanged.

    :param service: six.string_types. The Ceph user name to run the command under
    :return: the object produced by ``json.loads`` on the command output.
    :raise: ValueError if the output fails to parse as JSON.
      CalledProcessError if the ceph command fails.
    """
    command = ['ceph', '--id', service, 'mon_status', '--format=json']
    try:
        raw_status = check_output(command)
    except CalledProcessError as err:
        log("mon_status command failed with message: {}"
            .format(str(err)))
        raise

    # On Python 3 the command output is decoded before it is parsed.
    status_text = raw_status.decode('UTF-8') if six.PY3 else raw_status
    try:
        return json.loads(status_text)
    except ValueError as err:
        log("Unable to parse mon_status json: {}. Error: {}"
            .format(status_text, str(err)))
        raise
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def get_cache_mode(service, pool_name):
    """
    Look up the ``cache_mode`` entry of a pool in the OSD dump.

    Runs ``ceph --id <service> osd dump --format=json`` and scans the
    ``pools`` list of the parsed result.

    :param service: six.string_types. The Ceph user name to run the command under
    :param pool_name: six.string_types
    :return: the ``cache_mode`` value of the first pool whose ``pool_name``
      matches, or None when no pool matches.
    :raise: ValueError propagates unchanged if the dump is not valid JSON.
    """
    for argument in (service, pool_name):
        validator(value=argument, valid_type=six.string_types)

    dump = check_output(['ceph', '--id', service,
                         'osd', 'dump', '--format=json'])
    if six.PY3:
        dump = dump.decode('UTF-8')

    pools = json.loads(dump)['pools']
    return next((entry['cache_mode'] for entry in pools
                 if entry['pool_name'] == pool_name), None)
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def write(self, config_file):
        """
        Render one registered config file and store it at its own path.

        The rendered output is UTF-8 encoded on Python 3 and written in
        binary mode to ``config_file``.

        :raise: OSConfigException if ``config_file`` is not in
            ``self.templates``.
        """
        if config_file not in self.templates:
            log('Config not registered: %s' % config_file, level=ERROR)
            raise OSConfigException

        rendered = self.render(config_file)
        payload = rendered.encode('UTF-8') if six.PY3 else rendered

        with open(config_file, 'wb') as target:
            target.write(payload)

        log('Wrote template %s.' % config_file, level=INFO)
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def get_mon_map(service):
    """
    Fetch the monitor status reported by Ceph and parse it as JSON.

    Runs ``ceph --id <service> mon_status --format=json``.  Both failure
    modes are logged and then re-raised unchanged.

    :param service: six.string_types. The Ceph user name to run the command under
    :return: the object produced by ``json.loads`` on the command output.
    :raise: ValueError if the output fails to parse as JSON.
      CalledProcessError if the ceph command fails.
    """
    command = ['ceph', '--id', service, 'mon_status', '--format=json']
    try:
        raw_status = check_output(command)
    except CalledProcessError as err:
        log("mon_status command failed with message: {}"
            .format(str(err)))
        raise

    # On Python 3 the command output is decoded before it is parsed.
    status_text = raw_status.decode('UTF-8') if six.PY3 else raw_status
    try:
        return json.loads(status_text)
    except ValueError as err:
        log("Unable to parse mon_status json: {}. Error: {}"
            .format(status_text, str(err)))
        raise
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def get_cache_mode(service, pool_name):
    """
    Look up the ``cache_mode`` entry of a pool in the OSD dump.

    Runs ``ceph --id <service> osd dump --format=json`` and scans the
    ``pools`` list of the parsed result.

    :param service: six.string_types. The Ceph user name to run the command under
    :param pool_name: six.string_types
    :return: the ``cache_mode`` value of the first pool whose ``pool_name``
      matches, or None when no pool matches.
    :raise: ValueError propagates unchanged if the dump is not valid JSON.
    """
    for argument in (service, pool_name):
        validator(value=argument, valid_type=six.string_types)

    dump = check_output(['ceph', '--id', service,
                         'osd', 'dump', '--format=json'])
    if six.PY3:
        dump = dump.decode('UTF-8')

    pools = json.loads(dump)['pools']
    return next((entry['cache_mode'] for entry in pools
                 if entry['pool_name'] == pool_name), None)
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def write(self, config_file):
        """
        Render one registered config file and store it at its own path.

        The rendered output is UTF-8 encoded on Python 3 and written in
        binary mode to ``config_file``.

        :raise: OSConfigException if ``config_file`` is not in
            ``self.templates``.
        """
        if config_file not in self.templates:
            log('Config not registered: %s' % config_file, level=ERROR)
            raise OSConfigException

        rendered = self.render(config_file)
        payload = rendered.encode('UTF-8') if six.PY3 else rendered

        with open(config_file, 'wb') as target:
            target.write(payload)

        log('Wrote template %s.' % config_file, level=INFO)
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def get_mon_map(service):
    """
    Fetch the monitor status reported by Ceph and parse it as JSON.

    Runs ``ceph --id <service> mon_status --format=json``.  Both failure
    modes are logged and then re-raised unchanged.

    :param service: six.string_types. The Ceph user name to run the command under
    :return: the object produced by ``json.loads`` on the command output.
    :raise: ValueError if the output fails to parse as JSON.
      CalledProcessError if the ceph command fails.
    """
    command = ['ceph', '--id', service, 'mon_status', '--format=json']
    try:
        raw_status = check_output(command)
    except CalledProcessError as err:
        log("mon_status command failed with message: {}"
            .format(str(err)))
        raise

    # On Python 3 the command output is decoded before it is parsed.
    status_text = raw_status.decode('UTF-8') if six.PY3 else raw_status
    try:
        return json.loads(status_text)
    except ValueError as err:
        log("Unable to parse mon_status json: {}. Error: {}"
            .format(status_text, str(err)))
        raise
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def get_cache_mode(service, pool_name):
    """
    Look up the ``cache_mode`` entry of a pool in the OSD dump.

    Runs ``ceph --id <service> osd dump --format=json`` and scans the
    ``pools`` list of the parsed result.

    :param service: six.string_types. The Ceph user name to run the command under
    :param pool_name: six.string_types
    :return: the ``cache_mode`` value of the first pool whose ``pool_name``
      matches, or None when no pool matches.
    :raise: ValueError propagates unchanged if the dump is not valid JSON.
    """
    for argument in (service, pool_name):
        validator(value=argument, valid_type=six.string_types)

    dump = check_output(['ceph', '--id', service,
                         'osd', 'dump', '--format=json'])
    if six.PY3:
        dump = dump.decode('UTF-8')

    pools = json.loads(dump)['pools']
    return next((entry['cache_mode'] for entry in pools
                 if entry['pool_name'] == pool_name), None)
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def write(self, config_file):
        """
        Render one registered config file and store it at its own path.

        The rendered output is UTF-8 encoded on Python 3 and written in
        binary mode to ``config_file``.

        :raise: OSConfigException if ``config_file`` is not in
            ``self.templates``.
        """
        if config_file not in self.templates:
            log('Config not registered: %s' % config_file, level=ERROR)
            raise OSConfigException

        rendered = self.render(config_file)
        payload = rendered.encode('UTF-8') if six.PY3 else rendered

        with open(config_file, 'wb') as target:
            target.write(payload)

        log('Wrote template %s.' % config_file, level=INFO)
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def get_mon_map(service):
    """
    Fetch the monitor status reported by Ceph and parse it as JSON.

    Runs ``ceph --id <service> mon_status --format=json``.  Both failure
    modes are logged and then re-raised unchanged.

    :param service: six.string_types. The Ceph user name to run the command under
    :return: the object produced by ``json.loads`` on the command output.
    :raise: ValueError if the output fails to parse as JSON.
      CalledProcessError if the ceph command fails.
    """
    command = ['ceph', '--id', service, 'mon_status', '--format=json']
    try:
        raw_status = check_output(command)
    except CalledProcessError as err:
        log("mon_status command failed with message: {}"
            .format(str(err)))
        raise

    # On Python 3 the command output is decoded before it is parsed.
    status_text = raw_status.decode('UTF-8') if six.PY3 else raw_status
    try:
        return json.loads(status_text)
    except ValueError as err:
        log("Unable to parse mon_status json: {}. Error: {}"
            .format(status_text, str(err)))
        raise
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def get_cache_mode(service, pool_name):
    """
    Look up the ``cache_mode`` entry of a pool in the OSD dump.

    Runs ``ceph --id <service> osd dump --format=json`` and scans the
    ``pools`` list of the parsed result.

    :param service: six.string_types. The Ceph user name to run the command under
    :param pool_name: six.string_types
    :return: the ``cache_mode`` value of the first pool whose ``pool_name``
      matches, or None when no pool matches.
    :raise: ValueError propagates unchanged if the dump is not valid JSON.
    """
    for argument in (service, pool_name):
        validator(value=argument, valid_type=six.string_types)

    dump = check_output(['ceph', '--id', service,
                         'osd', 'dump', '--format=json'])
    if six.PY3:
        dump = dump.decode('UTF-8')

    pools = json.loads(dump)['pools']
    return next((entry['cache_mode'] for entry in pools
                 if entry['pool_name'] == pool_name), None)
项目:charm-keystone    作者:openstack    | 项目源码 | 文件源码
def write(self, config_file):
        """
        Render one registered config file and store it at its own path.

        The rendered output is UTF-8 encoded on Python 3 and written in
        binary mode to ``config_file``.

        :raise: OSConfigException if ``config_file`` is not in
            ``self.templates``.
        """
        if config_file not in self.templates:
            log('Config not registered: %s' % config_file, level=ERROR)
            raise OSConfigException

        rendered = self.render(config_file)
        payload = rendered.encode('UTF-8') if six.PY3 else rendered

        with open(config_file, 'wb') as target:
            target.write(payload)

        log('Wrote template %s.' % config_file, level=INFO)
项目:charm-nova-cloud-controller    作者:openstack    | 项目源码 | 文件源码
def get_mon_map(service):
    """
    Fetch the monitor status reported by Ceph and parse it as JSON.

    Runs ``ceph --id <service> mon_status --format=json``.  Both failure
    modes are logged and then re-raised unchanged.

    :param service: six.string_types. The Ceph user name to run the command under
    :return: the object produced by ``json.loads`` on the command output.
    :raise: ValueError if the output fails to parse as JSON.
      CalledProcessError if the ceph command fails.
    """
    command = ['ceph', '--id', service, 'mon_status', '--format=json']
    try:
        raw_status = check_output(command)
    except CalledProcessError as err:
        log("mon_status command failed with message: {}"
            .format(str(err)))
        raise

    # On Python 3 the command output is decoded before it is parsed.
    status_text = raw_status.decode('UTF-8') if six.PY3 else raw_status
    try:
        return json.loads(status_text)
    except ValueError as err:
        log("Unable to parse mon_status json: {}. Error: {}"
            .format(status_text, str(err)))
        raise
项目:charm-nova-cloud-controller    作者:openstack    | 项目源码 | 文件源码
def get_cache_mode(service, pool_name):
    """
    Look up the ``cache_mode`` entry of a pool in the OSD dump.

    Runs ``ceph --id <service> osd dump --format=json`` and scans the
    ``pools`` list of the parsed result.

    :param service: six.string_types. The Ceph user name to run the command under
    :param pool_name: six.string_types
    :return: the ``cache_mode`` value of the first pool whose ``pool_name``
      matches, or None when no pool matches.
    :raise: ValueError propagates unchanged if the dump is not valid JSON.
    """
    for argument in (service, pool_name):
        validator(value=argument, valid_type=six.string_types)

    dump = check_output(['ceph', '--id', service,
                         'osd', 'dump', '--format=json'])
    if six.PY3:
        dump = dump.decode('UTF-8')

    pools = json.loads(dump)['pools']
    return next((entry['cache_mode'] for entry in pools
                 if entry['pool_name'] == pool_name), None)
项目:charm-nova-cloud-controller    作者:openstack    | 项目源码 | 文件源码
def write(self, config_file):
        """
        Render one registered config file and store it at its own path.

        The rendered output is UTF-8 encoded on Python 3 and written in
        binary mode to ``config_file``.

        :raise: OSConfigException if ``config_file`` is not in
            ``self.templates``.
        """
        if config_file not in self.templates:
            log('Config not registered: %s' % config_file, level=ERROR)
            raise OSConfigException

        rendered = self.render(config_file)
        payload = rendered.encode('UTF-8') if six.PY3 else rendered

        with open(config_file, 'wb') as target:
            target.write(payload)

        log('Wrote template %s.' % config_file, level=INFO)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def native(s):
    """
    Return *s* as the interpreter's native :py:class:`str` type.

    :py:class:`bytes` input is UTF-8 decoded on Python 3 and
    :py:class:`unicode` input is UTF-8 encoded on Python 2; input that is
    already native is returned unchanged.

    :raise UnicodeError: The input string is not UTF-8 decodeable.

    :raise TypeError: The input is neither :py:class:`bytes` nor
        :py:class:`unicode`.
    """
    if isinstance(s, binary_type):
        return s.decode("utf-8") if PY3 else s
    if isinstance(s, text_type):
        return s if PY3 else s.encode("utf-8")
    raise TypeError("%r is neither bytes nor unicode" % s)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _bn_to_int(self, bn):
        """Convert the BIGNUM pointer ``bn`` into a Python integer.

        On Python 3 the number is exported with ``BN_bn2bin`` and read via
        ``int.from_bytes``.  On Python 2 it is exported as hex text with
        ``BN_bn2hex``, and the C string is released with ``OPENSSL_free``.
        """
        assert bn != self._ffi.NULL
        if not six.PY3:
            # Under Python 2 the best we can do is hex()
            hex_cdata = self._lib.BN_bn2hex(bn)
            self.openssl_assert(hex_cdata != self._ffi.NULL)
            digits = self._ffi.string(hex_cdata)
            self._lib.OPENSSL_free(hex_cdata)
            return int(digits, 16)

        # Python 3 has constant time from_bytes, so use that.
        size = self._lib.BN_num_bytes(bn)
        scratch = self._ffi.new("unsigned char[]", size)
        written = self._lib.BN_bn2bin(bn, scratch)
        # A zero length means the BN has value 0
        self.openssl_assert(written >= 0)
        raw = self._ffi.buffer(scratch)[:written]
        return int.from_bytes(raw, "big")
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def native(s):
    """
    Return *s* as the interpreter's native :py:class:`str` type.

    :py:class:`bytes` input is UTF-8 decoded on Python 3 and
    :py:class:`unicode` input is UTF-8 encoded on Python 2; input that is
    already native is returned unchanged.

    :raise UnicodeError: The input string is not UTF-8 decodeable.

    :raise TypeError: The input is neither :py:class:`bytes` nor
        :py:class:`unicode`.
    """
    if isinstance(s, binary_type):
        return s.decode("utf-8") if PY3 else s
    if isinstance(s, text_type):
        return s if PY3 else s.encode("utf-8")
    raise TypeError("%r is neither bytes nor unicode" % s)
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def _bn_to_int(self, bn):
        """Convert the BIGNUM pointer ``bn`` into a Python integer.

        On Python 3 the number is exported with ``BN_bn2bin`` and read via
        ``int.from_bytes``.  On Python 2 it is exported as hex text with
        ``BN_bn2hex``, and the C string is released with ``OPENSSL_free``.
        """
        assert bn != self._ffi.NULL
        if not six.PY3:
            # Under Python 2 the best we can do is hex()
            hex_cdata = self._lib.BN_bn2hex(bn)
            self.openssl_assert(hex_cdata != self._ffi.NULL)
            digits = self._ffi.string(hex_cdata)
            self._lib.OPENSSL_free(hex_cdata)
            return int(digits, 16)

        # Python 3 has constant time from_bytes, so use that.
        size = self._lib.BN_num_bytes(bn)
        scratch = self._ffi.new("unsigned char[]", size)
        written = self._lib.BN_bn2bin(bn, scratch)
        # A zero length means the BN has value 0
        self.openssl_assert(written >= 0)
        raw = self._ffi.buffer(scratch)[:written]
        return int.from_bytes(raw, "big")
项目:mos-horizon    作者:Mirantis    | 项目源码 | 文件源码
def test_stack_output(self):
        """Check ``mappings.stack_output`` for a string, None, a list, a dict and a URL."""
        self.assertEqual(u'<pre>foo</pre>', mappings.stack_output('foo'))
        self.assertEqual(u'', mappings.stack_output(None))

        # On Python 3, the pretty JSON output doesn't add space before newline
        expected_text = (
            """[\n  "one",\n  "two",\n  "three"\n]""" if six.PY3
            else """[\n  "one", \n  "two", \n  "three"\n]""")
        rendered_list = mappings.stack_output(['one', 'two', 'three'])
        self.assertEqual(u'<pre>%s</pre>' % html.escape(expected_text),
                         rendered_list)

        expected_text = """{\n  "foo": "bar"\n}"""
        rendered_dict = mappings.stack_output({'foo': 'bar'})
        self.assertEqual(u'<pre>%s</pre>' % html.escape(expected_text),
                         rendered_dict)

        url = 'http://www.example.com/foo'
        self.assertEqual(
            u'<a href="http://www.example.com/foo" target="_blank">'
            'http://www.example.com/foo</a>',
            mappings.stack_output(url))
项目:mos-horizon    作者:Mirantis    | 项目源码 | 文件源码
def test_admin_metadata_defs_create_namespace_invalid_json_post_raw(self):
        """Posting invalid raw JSON must produce the namespace-loading form error."""
        res = self.client.post(
            reverse(constants.METADATA_CREATE_URL),
            {'source_type': 'raw', 'direct_input': 'invalidjson'})

        # The tail of the expected message differs between Python 2 and 3.
        if six.PY3:
            detail = 'Expecting value: line 1 column 1 (char 0).'
        else:
            detail = 'No JSON object could be decoded.'
        err_msg = 'There was a problem loading the namespace: ' + detail
        self.assertFormError(res, "form", None, [err_msg])
项目:workflows.kyoyue    作者:wizyoung    | 项目源码 | 文件源码
def write(self, buffer):
        """Emit ``self.data`` into ``buffer`` according to ``self.mode``.

        Numeric mode writes groups of up to three digits, alphanumeric mode
        writes pairs of characters (11 bits) or a single trailing character
        (6 bits), and any other mode writes one 8-bit value per element.
        """
        if self.mode == MODE_NUMBER:
            for start in xrange(0, len(self.data), 3):
                digits = self.data[start:start + 3]
                width = NUMBER_LENGTH[len(digits)]
                buffer.put(int(digits), width)
        elif self.mode == MODE_ALPHA_NUM:
            for start in xrange(0, len(self.data), 2):
                pair = self.data[start:start + 2]
                if len(pair) > 1:
                    value = (ALPHA_NUM.find(pair[0]) * 45 +
                             ALPHA_NUM.find(pair[1]))
                    width = 11
                else:
                    value = ALPHA_NUM.find(pair)
                    width = 6
                buffer.put(value, width)
        else:
            # Iterating a bytestring in Python 3 returns an integer,
            # no need to ord().
            octets = self.data if six.PY3 else [ord(c) for c in self.data]
            for octet in octets:
                buffer.put(octet, 8)
项目:quantrocket-client    作者:quantrocket-llc    | 项目源码 | 文件源码
def write_response_to_filepath_or_buffer(filepath_or_buffer, response):
    """
    Writes the response content to the filepath or buffer.

    :param filepath_or_buffer: a path to open in binary mode, or any object
        with a ``write`` method.  On Python 3, ``sys.stdout`` is replaced by
        its underlying binary buffer.
    :param response: object whose ``iter_content(chunk_size=...)`` yields
        the body in chunks.
    :raise UnicodeDecodeError: on Python 3, when writing to a text-mode
        buffer and the body is not valid UTF-8.
    """
    if not hasattr(filepath_or_buffer, "write"):
        with open(filepath_or_buffer, "wb") as f:
            for chunk in response.iter_content(chunk_size=1024):
                if chunk:
                    f.write(chunk)
        return

    if six.PY3 and filepath_or_buffer is sys.stdout:
        # Write bytes to stdout (https://stackoverflow.com/a/23932488)
        filepath_or_buffer = filepath_or_buffer.buffer
    mode = getattr(filepath_or_buffer, "mode", "w")

    decoder = None
    if "b" not in mode and six.PY3:
        # Decode incrementally: a multi-byte UTF-8 character may be split
        # across two chunks, which a per-chunk decode() would reject.
        import codecs
        decoder = codecs.getincrementaldecoder("utf-8")()

    for chunk in response.iter_content(chunk_size=1024):
        if not chunk:
            continue
        if decoder is not None:
            chunk = decoder.decode(chunk)
            if not chunk:
                # Only part of a character so far; wait for the rest.
                continue
        filepath_or_buffer.write(chunk)

    if decoder is not None:
        # Raises if the body ended in the middle of a character.
        decoder.decode(b"", final=True)

    # Not every file-like object provides seekable(); such objects are
    # left where they are.
    seekable = getattr(filepath_or_buffer, "seekable", None)
    if seekable is not None and seekable():
        filepath_or_buffer.seek(0)
项目:txamqp    作者:txamqp    | 项目源码 | 文件源码
def read_content(queue):
    """Read one content frame tree from ``queue`` and assemble a ``Content``.

    This is a generator: it yields the result of every ``queue.get()`` call
    and of each recursive ``read_content`` call, and hands its result back
    through ``defer.returnValue``.
    NOTE(review): presumably driven by Twisted's ``inlineCallbacks``; the
    decorator is not visible here -- confirm.

    :param queue: object whose ``get()`` result, once resolved, is a frame
        with a ``payload`` attribute.
    :return: (via ``defer.returnValue``) ``Content`` built from the joined
        body, the list of child contents and a copy of the header properties.
    """
    # The first frame carries the header (weight, size, properties).
    frame = yield queue.get()
    header = frame.payload
    children = []
    # header.weight gives the number of nested child contents to read first.
    for i in range(header.weight):
        content = yield read_content(queue)
        children.append(content)
    size = header.size
    read = 0
    buf = six.StringIO()
    # Keep pulling body frames until header.size units have been collected.
    while read < size:
        body = yield queue.get()
        content = body.payload.content

        # if this is the first instance of real binary content, convert the string buffer to BytesIO
        # Not a nice fix but it preserves the original behaviour
        # NOTE: the swap starts from an empty BytesIO, so anything written
        # to the text buffer before this point is not carried over.
        if six.PY3 and isinstance(content, bytes) and isinstance(buf, six.StringIO):
            buf = six.BytesIO()

        buf.write(content)
        read += len(content)
    defer.returnValue(Content(buf.getvalue(), children, header.properties.copy()))
项目:auger    作者:laffra    | 项目源码 | 文件源码
def _instance_callable(obj):
    """Report whether ``obj`` is callable, or produces callable instances.

    Instances are checked directly for a ``__call__`` attribute.  Classes
    are checked for a ``__call__`` entry in their own or an ancestor's
    ``__dict__``."""
    if not isinstance(obj, ClassTypes):
        # already an instance
        return getattr(obj, '__call__', None) is not None

    if six.PY3:
        # *could* be broken by a class overriding __mro__ or __dict__ via
        # a metaclass
        return any(base.__dict__.get('__call__') is not None
                   for base in (obj,) + obj.__mro__)

    # uses __bases__ instead of __mro__ so that we work with old style classes
    if obj.__dict__.get('__call__') is not None:
        return True
    return any(_instance_callable(base) for base in obj.__bases__)
项目:purelove    作者:hucmosin    | 项目源码 | 文件源码
def query_params(self, value=None):
        """
        Return or set a dictionary of query params

        With ``value`` given, the result of ``URL._mutate`` with the
        url-encoded query is returned; otherwise the current query string
        is parsed with ``parse_qs``.

        :param dict value: new dictionary of values
        """
        if value is not None:
            return URL._mutate(self, query=unicode_urlencode(value, doseq=True))

        raw_query = self._tuple.query
        if raw_query is None:
            raw_query = ''

        if six.PY3:
            return parse_qs(raw_query, True)

        # In Python 2.6, urlparse needs a bytestring so we encode and then
        # decode the result.
        return dict_to_unicode(parse_qs(to_utf8(raw_query), True))
项目:panko    作者:openstack    | 项目源码 | 文件源码
def run_with(*drivers):
    """Used to mark tests that are only applicable for certain db driver.

    The decorator stores ``drivers`` as ``_run_with`` on the decorated
    object.  For a ``TestBase`` subclass it is stored on every callable
    attribute whose name starts with ``test_`` instead.
    """
    def decorator(test):
        is_test_class = isinstance(test, type) and issubclass(test, TestBase)
        if not is_test_class:
            test._run_with = drivers
            return test

        # Decorate all test methods
        for name in dir(test):
            member = getattr(test, name)
            if not (callable(member) and name.startswith('test_')):
                continue
            # Python 2 hands back a method wrapper; tag the function in it.
            target = member if six.PY3 else member.__func__
            target._run_with = drivers
        return test
    return decorator
项目:panko    作者:openstack    | 项目源码 | 文件源码
def _test_run_expirer_ttl_enabled(self, ttl_name, data_name):
        """Run ``panko-expirer`` with one TTL option set and check its output.

        Writes a temporary config file whose ``[database]`` section sets
        ``ttl_name`` to 1, runs ``panko-expirer -d`` against it, then asserts
        that the process exit status is 0 and that its stdout contains the
        "Dropping ... data with TTL 1" message for ``data_name``.

        :param ttl_name: name of the TTL option written to the config file.
        :param data_name: data kind expected in the output message.
        """
        def as_bytes(text):
            # The temp-file content and the captured stdout are compared as
            # bytes on Python 3; on Python 2 the str is used unchanged.
            return text.encode('utf-8') if six.PY3 else text

        config_text = ("[database]\n"
                       "%s=1\n"
                       "connection=log://localhost\n" % ttl_name)
        self.tempfile = fileutils.write_to_tempfile(
            content=as_bytes(config_text),
            prefix='panko',
            suffix='.conf')

        command = ['panko-expirer',
                   '-d',
                   "--config-file=%s" % self.tempfile]
        process = subprocess.Popen(command, stdout=subprocess.PIPE)
        stdout = process.communicate()[0]
        self.assertEqual(0, process.poll())

        expected = as_bytes("Dropping %s data with TTL 1" % data_name)
        self.assertIn(expected, stdout)
项目:cleverhans    作者:tensorflow    | 项目源码 | 文件源码
def _verify_docker_image_size(self, image_name):
    """Checks that a Docker image does not exceed the size limit.

    The image is pulled first, then its size is read from
    `docker inspect --format={{.Size}}`.

    Args:
      image_name: name of the Docker image.

    Returns:
      True if the image size is at most MAX_DOCKER_IMAGE_SIZE. False if it is
      larger, or if the size could not be determined.
    """
    shell_call(['docker', 'pull', image_name])
    inspect_cmd = ['docker', 'inspect', '--format={{.Size}}', image_name]
    try:
      raw_size = subprocess.check_output(inspect_cmd).strip()
      # Python 2 parses with long(); Python 3 only has int().
      to_integer = int if PY3 else long
      image_size = to_integer(raw_size)
    except (ValueError, subprocess.CalledProcessError) as e:
      logging.error('Failed to determine docker image size: %s', e)
      return False
    logging.info('Size of docker image %s is %d', image_name, image_size)
    within_limit = image_size <= MAX_DOCKER_IMAGE_SIZE
    if not within_limit:
      logging.error('Image size exceeds limit %d', MAX_DOCKER_IMAGE_SIZE)
    return within_limit
项目:tvlinker    作者:ozmartian    | 项目源码 | 文件源码
def sort(cmpfn):
        # Sort the elements of `this` and return `this`.
        #
        # cmpfn: candidate comparison function. When cmpfn.is_callable() is
        # false it is replaced by None before being passed to sort_compare.
        #
        # NOTE(review): `this` is neither a parameter nor a local here; it has
        # to be bound by whatever invokes this function. Confirm how before
        # restructuring this body (e.g. moving code into a nested scope).
        if not this.Class in ('Array', 'Arguments'):
            # Only 'Array' and 'Arguments' objects are sorted; anything else
            # is returned through to_object() without reordering.
            return this.to_object() # do nothing
        # Copy the elements out; they are read by stringified index.
        arr = []
        for i in xrange(len(this)):
            arr.append(this.get(six.text_type(i)))

        if not arr:
            # Nothing to sort (cmpfn is not even inspected in this case).
            return this
        if not cmpfn.is_callable():
            cmpfn = None
        cmp = lambda a,b: sort_compare(a, b, cmpfn)
        # list.sort() accepts cmp= only on Python 2; on Python 3 the
        # comparison function has to be wrapped into a key.
        if six.PY3:
            key = functools.cmp_to_key(cmp)
            arr.sort(key=key)
        else:
            arr.sort(cmp=cmp)
        # Write the sorted elements back under the same stringified indices.
        for i in xrange(len(arr)):
            this.put(six.text_type(i), arr[i])

        return this
项目:Texty    作者:sarthfrey    | 项目源码 | 文件源码
def compute_signature(self, uri, params, utf=PY3):
        """Compute the signature for a given request

        The signed string is the URI followed by every parameter name and
        value, concatenated in sorted order. It is signed with HMAC (``sha1``
        digest) keyed by ``self.token`` and the digest is base64-encoded.

        :param uri: full URI that Twilio requested on your server
        :param params: post vars that Twilio sent with the request
        :param utf: whether return should be bytestring or unicode (python3)

        :returns: The computed signature
        """
        payload = uri
        if len(params) > 0:
            payload += "".join(
                name + value for name, value in sorted(params.items()))

        digest = hmac.new(self.token, payload.encode("utf-8"), sha1).digest()
        signature = base64.b64encode(digest)
        # Decode to text only when the caller asked for unicode.
        return (signature.decode('utf-8') if utf else signature).strip()
项目:opentracing-python-instrumentation    作者:uber-common    | 项目源码 | 文件源码
def install_hooks(request):
    """Install the urllib2 tracing patches around a test and undo them after.

    Generator with a setup / ``yield`` / teardown shape (presumably used as a
    pytest yield-fixture; the decorator is not part of this block). The
    ``urllibver`` fixture value selects the module to patch.

    :param request: object providing ``getfixturevalue``.
    :yields: the selected module, or ``None`` when ``urllib2`` is requested on
        Python 3, in which case nothing is patched or restored.
    """
    urllibver = request.getfixturevalue('urllibver')

    if urllibver != 'urllib2':
        module = urllib.request
    elif six.PY3:
        # urllib2 was requested on Python 3: hand back None and stop.
        yield None
        return
    else:
        module = urllib2

    # Remember the state that is replaced below.
    saved_opener = module._opener
    saved_name_headers = CONFIG.callee_name_headers
    saved_endpoint_headers = CONFIG.callee_endpoint_headers

    urllib2_hooks.install_patches.__original_func()
    CONFIG.callee_name_headers = ['Remote-Loc']
    CONFIG.callee_endpoint_headers = ['Remote-Op']

    yield module

    # Teardown: put the previous opener and header configuration back.
    module.install_opener(saved_opener)
    CONFIG.callee_name_headers = saved_name_headers
    CONFIG.callee_endpoint_headers = saved_endpoint_headers
项目:python-bileanclient    作者:openstack    | 项目源码 | 文件源码
def __init__(self, data):
        """Build a fake response from a status code or a description dict.

        :param data: either a dict with optional ``status_code`` (default
            200), ``text`` and ``headers`` keys, or any other value, which is
            stored as the status code as-is.
        """
        super(TestResponse, self).__init__()
        self._content_consumed = True

        if not isinstance(data, dict):
            self.status_code = data
            return

        self.status_code = data.get('status_code', 200)

        # Fake the text attribute to streamline Response creation: dict and
        # list payloads are serialized to JSON and get a JSON content type
        # as the fallback headers.
        text = data.get('text', "")
        if isinstance(text, (dict, list)):
            body = json.dumps(text)
            default_headers = {"Content-Type": "application/json"}
        else:
            body = text
            default_headers = {}

        # On Python 3 a text body is stored as UTF-8 bytes.
        if six.PY3 and isinstance(body, six.string_types):
            body = body.encode('utf-8', 'strict')
        self._content = body

        # Missing or empty 'headers' falls back to the defaults chosen above.
        self.headers = data.get('headers') or default_headers
项目:Dshield    作者:ywjt    | 项目源码 | 文件源码
def testLongIntegers(self):
        # Python 3 has no ``long`` type, so there is nothing to check there.
        if PY3:
            return

        # Rules built from ``long`` arguments must yield the expected dates.
        start = datetime(1997, 9, 2, 9, 0)

        minutely = rrule(MINUTELY,
                         count=long(2),
                         interval=long(2),
                         bymonth=long(2),
                         byweekday=long(3),
                         byhour=long(6),
                         byminute=long(6),
                         bysecond=long(6),
                         dtstart=start)
        self.assertEqual(list(minutely),
                         [datetime(1998, 2, 5, 6, 6, 6),
                          datetime(1998, 2, 12, 6, 6, 6)])

        yearly = rrule(YEARLY,
                       count=long(2),
                       bymonthday=long(5),
                       byweekno=long(2),
                       dtstart=start)
        self.assertEqual(list(yearly),
                         [datetime(1998, 1, 5, 9, 0),
                          datetime(2004, 1, 5, 9, 0)])
项目:Dshield    作者:ywjt    | 项目源码 | 文件源码
def testToStrLongIntegers(self):
        # Python 3 has no ``long`` type, so there is nothing to check there.
        if PY3:
            return

        # Each rule built from ``long`` arguments goes through the
        # _rrulestr_reverse_test helper.
        start = datetime(1997, 9, 2, 9, 0)

        minutely = rrule(MINUTELY,
                         count=long(2),
                         interval=long(2),
                         bymonth=long(2),
                         byweekday=long(3),
                         byhour=long(6),
                         byminute=long(6),
                         bysecond=long(6),
                         dtstart=start)
        self._rrulestr_reverse_test(minutely)

        yearly = rrule(YEARLY,
                       count=long(2),
                       bymonthday=long(5),
                       byweekno=long(2),
                       dtstart=start)
        self._rrulestr_reverse_test(yearly)
项目:Dshield    作者:ywjt    | 项目源码 | 文件源码
def tzname_in_python2(namefunc):
    """Change unicode output into bytestrings in Python 2.

    tzname() API changed in Python 3. It used to return bytes, but was changed
    to unicode strings.

    :param namefunc: callable returning a timezone name, or ``None``.
    :return: a wrapper around ``namefunc`` that, when ``PY3`` is false,
        returns ``name.encode()`` for any name that is not ``None``. The
        wrapper keeps ``namefunc``'s ``__name__`` and ``__doc__``.
    """
    # Imported here so the decorator does not rely on a module-level import.
    from functools import wraps

    # Without wraps() every decorated tzname would present itself as
    # "adjust_encoding" and lose its docstring.
    @wraps(namefunc)
    def adjust_encoding(*args, **kwargs):
        name = namefunc(*args, **kwargs)
        if name is not None and not PY3:
            name = name.encode()

        return name

    return adjust_encoding


# The following is adapted from Alexander Belopolsky's tz library
# https://github.com/abalkin/tz
项目:zun    作者:openstack    | 项目源码 | 文件源码
def get_id(source_uuid):
    """Derive a short (12 character) id from a random UUID.

    :param source_uuid: a version 4 ``uuid.UUID`` object, or a string that
        ``uuid.UUID()`` can parse into one.
    :return: the lower-cased 12-character id (decoded to text on Python 3).
    :raises ValueError: if the UUID is not a version 4 UUID, including UUIDs
        whose version is undefined.
    """

    if isinstance(source_uuid, six.string_types):
        source_uuid = uuid.UUID(source_uuid)

    version = source_uuid.version
    if version != 4:
        # UUID.version is None when the variant is not RFC 4122; formatting
        # that with %d would raise TypeError instead of the ValueError
        # intended here, so %s is used.
        raise ValueError(_('Invalid UUID version (%s)') % version)

    # The "time" field of a v4 UUID contains 60 random bits
    # (see RFC4122, Section 4.4)
    random_bytes = _to_byte_string(source_uuid.time, 60)
    # The first 12 characters (= 60 bits) of base32-encoded output is our data
    encoded = base64.b32encode(six.b(random_bytes))[:12].lower()

    # b32encode returns bytes; on Python 3 hand back text instead.
    return encoded.decode('utf-8') if six.PY3 else encoded