Python six 模块,binary_type() 实例源码


项目:DropboxConnect    作者:raguay    | 项目源码 | 文件源码
def _params_to_urlencoded(params):
    """
    Returns a application/x-www-form-urlencoded ``str`` representing the
    key/value pairs in ``params``.

    Keys and values are ``str()``'d before being passed to ``url_encode``,
    with the exception of unicode objects which are utf8-encoded and byte
    strings which are passed through unchanged.

    :param params: mapping of form field names to values.
    :returns: whatever ``url_encode`` returns for the utf8-encoded mapping.
    """
    def encode(o):
        # Bytes are already encoded; text is UTF-8 encoded; anything else is
        # stringified first and then encoded.
        if isinstance(o, six.binary_type):
            return o
        if isinstance(o, six.text_type):
            return o.encode('utf-8')
        return str(o).encode('utf-8')

    utf8_params = {encode(k): encode(v) for k, v in six.iteritems(params)}
    return url_encode(utf8_params)
项目:placebo    作者:huseyinyilmaz    | 项目源码 | 文件源码
def test_last_request(self, mock):
        # Checks that the mock records the most recent request (URL and body)
        # and that the class-level ``last_request`` is the instance's one.
        # NOTE(review): the right-hand side of ``response =`` is incomplete in
        # this listing -- presumably an HTTP call against ``GetMock.url``;
        # confirm against the upstream test before running.
        response =,
                                 headers={'custom-header': 'huseyin'},
                                 data={'name': 'huseyin'})
        self.assertEqual(response.status_code, 200)
        last_request = mock.last_request
        # Test if last request has expected values.
        self.assertEqual(last_request.url, GetMock.url)

        body = last_request.body
        # In python 3 httpretty backend returns binary string for body.
        # So we are decoding it back to unicode to test.
        if isinstance(body, six.binary_type):
            body = body.decode('utf-8')
        self.assertEqual(body, 'name=huseyin')
        # Make sure that class's last_request is same as instances.
        self.assertIs(GetMock.last_request, mock.last_request)
项目:database_assetstore    作者:OpenGeoscience    | 项目源码 | 文件源码
def csv_safe_unicode(row, ignoreErrors=True):
    """
    Given an array of values, make sure all strings are unicode in Python 3 and
    str in Python 2.  The csv.writer in Python 2 does not handle unicode
    strings and in Python 3 it does not handle byte strings.

    :param row: an array which could contain mixed types.
    :param ignoreErrors: if True, convert what is possible and ignore errors.
        If false, conversion errors will raise an exception.
    :returns: either the original array if safe, or an array with all byte
        strings converted to unicode in Python 3 or str in Python 2.
    """
    # ``str`` is the bytes type only on Python 2.
    native_is_bytes = str == six.binary_type
    # This is unicode in Python 2 and bytes in Python 3
    wrong_type = six.text_type if native_is_bytes else six.binary_type
    # If all of the data is not the wrong type, just return it as is.  This
    # allows non-string types, such as integers and floats.
    if not any(isinstance(value, wrong_type) for value in row):
        return row
    # Convert the wrong type of string to the type needed for this version of
    # Python.  For Python 2 unicode gets encoded to str (bytes).  For Python 3
    # bytes get decoded to str (unicode).
    func = 'encode' if native_is_bytes else 'decode'
    errors = 'ignore' if ignoreErrors else 'strict'
    return [getattr(value, func)('utf8', errors)
            if isinstance(value, wrong_type) else value for value in row]
项目:oscars2016    作者:0x0ece    | 项目源码 | 文件源码
def _from_bytes(value):
    """Converts bytes to a string value, if necessary.

    Args:
        value: The string/bytes value to be converted.

    Returns:
        The original value converted to unicode (if bytes) or as passed in
        if it started out as unicode.

    Raises:
        ValueError if the value could not be converted to unicode.
    """
    result = (value.decode('utf-8')
              if isinstance(value, six.binary_type) else value)
    if isinstance(result, six.text_type):
        return result
    # Anything that is neither bytes nor text ends up here.
    raise ValueError('%r could not be converted to unicode' % (value,))
项目:oscars2016    作者:0x0ece    | 项目源码 | 文件源码
def clean_headers(headers):
    """Forces header keys and values to be strings, i.e not unicode.

    The httplib module just concats the header keys and values in a way that
    may make the message header a unicode string, which, if it then tries to
    concatenate to a binary request body may result in a unicode decode error.

    Args:
        headers: dict, A dictionary of headers.

    Returns:
        A new dictionary with every key and value passed through
        ``_to_bytes``.

    Raises:
        NonAsciiHeaderError: if converting a key or value raises
            ``UnicodeEncodeError``.
    """
    clean = {}
    try:
        for k, v in six.iteritems(headers):
            # Non-bytes keys/values are stringified before conversion.
            if not isinstance(k, six.binary_type):
                k = str(k)
            if not isinstance(v, six.binary_type):
                v = str(v)
            clean[_to_bytes(k)] = _to_bytes(v)
    except UnicodeEncodeError:
        # k and v still hold the offending pair from the failed iteration.
        raise NonAsciiHeaderError(k, ': ', v)
    return clean
项目:shub-image    作者:scrapinghub    | 项目源码 | 文件源码
def list_cmd(image_name, project, endpoint, apikey):
    """Short version of list cmd to use with deploy cmd.

    Runs the list command for ``image_name`` via ``_run_list_cmd`` and returns
    the result of ``utils.valid_spiders`` on its (decoded) output.  Raises
    ``ShubException`` when the command exits with a non-zero status.
    """
    # Project settings are only fetched when a project is given.
    settings = (_get_project_settings(project, endpoint, apikey)
                if project else {})

    # Run a local docker container to run list-spiders cmd
    exit_code, output = _run_list_cmd(project, image_name, settings)
    if exit_code != 0:
        raise shub_exceptions.ShubException(
            'Container with list cmd exited with code %s' % exit_code)

    if isinstance(output, binary_type):
        output = output.decode('utf-8')
    return utils.valid_spiders(output)
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def make_header_prefix(self, message_class, version=Connection.protocol_version, stream_id=0):
        """Build the leading bytes of a server-to-client frame header.

        Each field is packed as one unsigned byte with ``uint8_pack`` and the
        results are joined into a byte string.

        NOTE(review): the ``stream_id`` entries and the ``else`` branch were
        reconstructed from the otherwise unused ``stream_id`` parameter and
        the "MSB for v3+ stream" comment -- confirm against upstream.
        """
        fields = [
            0xff & (HEADER_DIRECTION_TO_CLIENT | version),
            0,  # flags (compression)
        ]
        if Connection.protocol_version >= 3:
            fields.append(0)  # MSB for v3+ stream
        fields.append(stream_id)
        fields.append(message_class.opcode)  # opcode
        return six.binary_type().join(map(uint8_pack, fields))
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_partial_header_read(self, *args):
        """Feed a message one byte first, then the rest, and check buffering."""
        conn = self.make_connection()

        supported_header = self.make_header_prefix(SupportedMessage)
        options_body = self.make_options_body()
        payload = self.make_msg(supported_header, options_body)

        # read in the first byte
        conn._socket.recv.return_value = payload[0:1]
        conn.handle_read(None, 0)
        self.assertEqual(conn._iobuf.getvalue(), payload[0:1])

        # deliver the remainder; the buffer should be drained afterwards
        conn._socket.recv.return_value = payload[1:]
        conn.handle_read(None, 0)
        self.assertEqual(six.binary_type(), conn._iobuf.getvalue())

        # let it write out a StartupMessage
        conn.handle_write(None, 0)

        ready_header = self.make_header_prefix(ReadyMessage, stream_id=1)
        conn._socket.recv.return_value = self.make_msg(ready_header)
        conn.handle_read(None, 0)

项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_partial_header_read(self, *args):
        # Variant of the partial-header test that stubs ``c.socket.recv``.
        # NOTE(review): no call that drives the connection's read/write
        # handlers is visible between the stubbing and the assertions, so the
        # buffer checks cannot pass as listed -- presumably those lines were
        # lost from this listing; confirm against the upstream test.
        c = self.make_connection()

        header = self.make_header_prefix(SupportedMessage)
        options = self.make_options_body()
        message = self.make_msg(header, options)

        # first byte only
        c.socket.recv.return_value = message[0:1]
        self.assertEqual(c._iobuf.getvalue(), message[0:1])

        # remainder of the message
        c.socket.recv.return_value = message[1:]
        self.assertEqual(six.binary_type(), c._iobuf.getvalue())

        # let it write out a StartupMessage

        header = self.make_header_prefix(ReadyMessage, stream_id=1)
        c.socket.recv.return_value = self.make_msg(header)

项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def varint_pack(big):
    """Serialize an integer as a minimal big-endian two's-complement byte string.

    :param big: the integer to encode (any size, positive or negative).
    :returns: ``bytes`` holding the encoded value; zero encodes as ``b'\\x00'``.

    The builtin ``int.bit_length`` and ``bytes`` are used so the function is
    self-contained; ``bytes`` is the same type as ``six.binary_type`` on both
    Python 2 and Python 3.
    """
    if big == 0:
        return b'\x00'
    pos = big > 0
    if not pos:
        # Map the negative value onto its two's-complement representation in
        # the smallest whole number of bytes that can hold it.
        bytelength = (abs(big) - 1).bit_length() // 8 + 1
        big = (1 << bytelength * 8) + big
    # Collect bytes least-significant first, then flip at the end.
    revbytes = bytearray()
    while big > 0:
        revbytes.append(big & 0xff)
        big >>= 8
    if pos and revbytes[-1] & 0x80:
        # A set top bit would read back as negative; pad with a zero byte.
        revbytes.append(0)
    revbytes.reverse()
    return bytes(revbytes)
项目:Projects    作者:it2school    | 项目源码 | 文件源码
def _params_to_urlencoded(params):
    """
    Returns a application/x-www-form-urlencoded ``str`` representing the
    key/value pairs in ``params``.

    Keys and values are ``str()``'d before being passed to ``url_encode``,
    with the exception of unicode objects which are utf8-encoded and byte
    strings which are passed through unchanged.

    :param params: mapping of form field names to values.
    :returns: whatever ``url_encode`` returns for the utf8-encoded mapping.
    """
    def encode(o):
        # Bytes are already encoded; text is UTF-8 encoded; anything else is
        # stringified first and then encoded.
        if isinstance(o, six.binary_type):
            return o
        if isinstance(o, six.text_type):
            return o.encode('utf-8')
        return str(o).encode('utf-8')

    utf8_params = {encode(k): encode(v) for k, v in six.iteritems(params)}
    return url_encode(utf8_params)
项目:drf-swagger-extras    作者:ssaavedra    | 项目源码 | 文件源码
def parse_schema(schema):
    """Translate a compact schema definition into a nested dict description.

    A string yields ``{'type': <string>}``, a list yields a ``'list'`` entry
    and a dict yields an ``'object'`` entry whose properties are parsed
    recursively.

    :raises Exception: for any other schema type.

    NOTE(review): the list branch only carries ``'type'`` here; any item
    description the upstream code may add is not visible -- confirm.
    """
    # Exact type checks (not isinstance) are kept from the original.
    if type(schema) is six.binary_type or type(schema) is six.text_type:
        return {
            'type': schema,
        }
    elif type(schema) is list:
        return {
            'type': 'list',
        }
    elif type(schema) is dict:
        title = schema.get(':title', None)
        required_elts = list(get_required(schema.keys()))
        properties = get_object_properties(schema)
        return {
            'type': 'object',
            'title': title,
            'properties': {
                prop_name: parse_schema(subschema)
                for prop_name, subschema in properties.items()
            },
            'required': required_elts,
        }
    else:
        raise Exception('Unsupported schema definition')
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def native(s):
    """
    Convert :py:class:`bytes` or :py:class:`unicode` to the native
    :py:class:`str` type, using UTF-8 encoding if conversion is necessary.

    :raise UnicodeError: The input string is not UTF-8 decodeable.

    :raise TypeError: The input is neither :py:class:`bytes` nor
        :py:class:`unicode`.
    """
    if not isinstance(s, (binary_type, text_type)):
        # Wrap in a tuple so the message also formats for tuple inputs.
        raise TypeError("%r is neither bytes nor unicode" % (s,))
    if PY3:
        # Native str is text: only bytes need decoding.
        if isinstance(s, binary_type):
            return s.decode("utf-8")
    else:
        # Native str is bytes: only unicode needs encoding.
        if isinstance(s, text_type):
            return s.encode("utf-8")
    return s
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def native(s):
    """
    Convert :py:class:`bytes` or :py:class:`unicode` to the native
    :py:class:`str` type, using UTF-8 encoding if conversion is necessary.

    :raise UnicodeError: The input string is not UTF-8 decodeable.

    :raise TypeError: The input is neither :py:class:`bytes` nor
        :py:class:`unicode`.
    """
    if not isinstance(s, (binary_type, text_type)):
        # Wrap in a tuple so the message also formats for tuple inputs.
        raise TypeError("%r is neither bytes nor unicode" % (s,))
    if PY3:
        # Native str is text: only bytes need decoding.
        if isinstance(s, binary_type):
            return s.decode("utf-8")
    else:
        # Native str is bytes: only unicode needs encoding.
        if isinstance(s, text_type):
            return s.encode("utf-8")
    return s
项目:mos-horizon    作者:Mirantis    | 项目源码 | 文件源码
def get_template_files(template_data=None, template_url=None):
    """Load a template and collect the files it references.

    :param template_data: raw template contents; takes precedence when truthy.
    :param template_url: URL to fetch the template from when no data is given.
    :returns: ``(files, template)``; ``({}, None)`` when neither argument is
        given or the template is empty.
    """
    if template_data:
        tpl = template_data
    elif template_url:
        # NOTE(review): ``u.read()`` was reconstructed from the surrounding
        # urlopen/closing context -- confirm against upstream.
        with contextlib.closing(request.urlopen(template_url)) as u:
            tpl = u.read()
    else:
        return {}, None
    if not tpl:
        return {}, None
    if isinstance(tpl, six.binary_type):
        tpl = tpl.decode('utf-8')
    template = template_format.parse(tpl)
    files = {}
    # _get_file_contents is handed ``files`` to populate.
    _get_file_contents(template, files)
    return files, template
项目:gixy    作者:yandex    | 项目源码 | 文件源码
def parse(self, data):
        """
        Returns the parsed tree.

        Byte input is decoded first: as ``utf-8-sig`` when it starts with a
        UTF-8 BOM, otherwise as ``latin1``.  Content that is empty after
        stripping yields an empty ``ParseResults``.
        """
        if isinstance(data, six.binary_type):
            if data[:3] == codecs.BOM_UTF8:
                encoding = 'utf-8-sig'
            else:
                encoding = 'latin1'
            content = data.decode(encoding).strip()
        else:
            content = data.strip()

        if not content:
            return ParseResults()

        return self.script.parseString(content, parseAll=True)
项目:python-pankoclient    作者:openstack    | 项目源码 | 文件源码
def convert_into_with_meta(self, item, resp):
        """Wrap ``item`` in the ``*WithMeta`` class matching its type.

        ``resp`` is passed to the wrapper unchanged.  ``None`` becomes an
        empty ``TupleWithMeta``; any type not matched falls back to
        ``DictWithMeta``.
        """
        if isinstance(item, six.string_types):
            # Only Python 2 has a separate unicode text type.
            if six.PY2 and isinstance(item, six.text_type):
                return UnicodeWithMeta(item, resp)
            else:
                return StrWithMeta(item, resp)
        elif isinstance(item, six.binary_type):
            return BytesWithMeta(item, resp)
        elif isinstance(item, list):
            return ListWithMeta(item, resp)
        elif isinstance(item, tuple):
            return TupleWithMeta(item, resp)
        elif item is None:
            return TupleWithMeta((), resp)
        else:
            return DictWithMeta(item, resp)
项目:GAMADV-XTD    作者:taers232c    | 项目源码 | 文件源码
def _from_bytes(value):
    """Converts bytes to a string value, if necessary.

    Args:
        value: The string/bytes value to be converted.

    Returns:
        The original value converted to unicode (if bytes) or as passed in
        if it started out as unicode.

    Raises:
        ValueError if the value could not be converted to unicode.
    """
    result = (value.decode('utf-8')
              if isinstance(value, six.binary_type) else value)
    if isinstance(result, six.text_type):
        return result
    # Anything that is neither bytes nor text ends up here.
    raise ValueError(
        '{0!r} could not be converted to unicode'.format(value))
项目:GAMADV-XTD    作者:taers232c    | 项目源码 | 文件源码
def clean_headers(headers):
    """Forces header keys and values to be strings, i.e not unicode.

    The httplib module just concats the header keys and values in a way that
    may make the message header a unicode string, which, if it then tries to
    concatenate to a binary request body may result in a unicode decode error.

    Args:
        headers: dict, A dictionary of headers.

    Returns:
        A new dictionary with every key and value passed through
        ``_helpers._to_bytes``.

    Raises:
        NonAsciiHeaderError: if converting a key or value raises
            ``UnicodeEncodeError``.
    """
    clean = {}
    try:
        for k, v in six.iteritems(headers):
            # Non-bytes keys/values are stringified before conversion.
            if not isinstance(k, six.binary_type):
                k = str(k)
            if not isinstance(v, six.binary_type):
                v = str(v)
            clean[_helpers._to_bytes(k)] = _helpers._to_bytes(v)
    except UnicodeEncodeError:
        # Imported here, as in the original, rather than at module level.
        from oauth2client.client import NonAsciiHeaderError
        raise NonAsciiHeaderError(k, ': ', v)
    return clean
项目:panko    作者:openstack    | 项目源码 | 文件源码
def decode_unicode(input):
    """Recursively convert strings in ``input`` to the native ``str`` type.

    Dicts and tuples/lists are rebuilt element by element (tuples come back
    as lists).  On Python 2 unicode is encoded to UTF-8; on Python 3 bytes
    are decoded from UTF-8.  Anything else is returned unchanged.
    """
    if isinstance(input, dict):
        temp = {}
        # If the input data is a dict, create an equivalent dict with a
        # predictable insertion order to avoid inconsistencies in the
        # message signature computation for equivalent payloads modulo
        # ordering
        for key, value in sorted(six.iteritems(input)):
            temp[decode_unicode(key)] = decode_unicode(value)
        return temp
    elif isinstance(input, (tuple, list)):
        # When doing a pair of JSON encode/decode operations to the tuple,
        # the tuple would become list. So we have to generate the value as
        # list here.
        return [decode_unicode(element) for element in input]
    elif six.PY2 and isinstance(input, six.text_type):
        return input.encode('utf-8')
    elif six.PY3 and isinstance(input, six.binary_type):
        return input.decode('utf-8')
    else:
        return input
项目:artman    作者:googleapis    | 项目源码 | 文件源码
def replace_vars(data, repl_vars):
    """Return the given data structure with appropriate variables replaced.

    Args:
        data (Any): Data of an arbitrary type.
        repl_vars (dict): Maps variable names to replacement values; every
            ``${name}`` occurrence in a string is replaced by its value.

    Returns:
        Any: Data of the same type, possibly with replaced values.
    """
    # The ABC aliases in ``collections`` were removed in Python 3.10; import
    # from ``collections.abc`` and fall back for Python 2.
    try:
        from collections.abc import Mapping, Sequence
    except ImportError:
        from collections import Mapping, Sequence

    # Strings must be handled before the Sequence check below, since they
    # are sequences themselves.
    if isinstance(data, (six.text_type, six.binary_type)):
        for k, v in repl_vars.items():
            data = data.replace('${' + k + '}', v)
        return data
    if isinstance(data, Sequence):
        return type(data)([replace_vars(d, repl_vars) for d in data])
    if isinstance(data, Mapping):
        return type(data)([(k, replace_vars(v, repl_vars))
                           for k, v in data.items()])
    return data
项目:ChemDataExtractor    作者:mcs07    | 项目源码 | 文件源码
def clean_markup(self, markup, parser=None):
        """Apply ``Cleaner`` to markup string or document and return a cleaned string or document.

        The return type mirrors the input: bytes in, UTF-8 bytes out; text
        in, text out; anything else returns the (copied) document.
        """
        # NOTE(review): no step that actually applies the cleaner to ``doc``
        # is visible in this listing -- confirm against upstream.
        result_type = type(markup)
        if isinstance(markup, six.string_types):
            doc = fromstring(markup, parser=parser)
        else:
            # Work on a copy so the caller's document is left untouched.
            doc = copy.deepcopy(markup)
        if issubclass(result_type, six.binary_type):
            return tostring(doc, encoding='utf-8')
        elif issubclass(result_type, six.text_type):
            return tostring(doc, encoding='unicode')
        else:
            return doc

#: A default Cleaner instance, which kills comments, processing instructions, script tags, style tags.
项目:ChemDataExtractor    作者:mcs07    | 项目源码 | 文件源码
def __init__(self, *elements):
        """Initialize a Document manually by passing one or more Document elements (Paragraph, Heading, Table, etc.)

        Strings that are passed to this constructor are automatically wrapped into Paragraph elements.

        :param list[chemdataextractor.doc.element.BaseElement|string] elements: Elements in this Document.
        """
        self._elements = []
        for element in elements:
            # Convert raw text to Paragraph elements
            if isinstance(element, six.text_type):
                element = Paragraph(element)
            elif isinstance(element, six.binary_type):
                # Try guess encoding if byte string
                encoding = get_encoding(element)
                log.warning('Guessed bytestring encoding as %s. Use unicode strings to avoid this warning.', encoding)
                element = Paragraph(element.decode(encoding))
            element.document = self
            # NOTE(review): append reconstructed -- without it ``_elements``
            # is never filled; confirm against upstream.
            self._elements.append(element)
        log.debug('%s: Initializing with %s elements' % (self.__class__.__name__, len(self.elements)))
项目:OpenStackClient_VBS    作者:Huawei    | 项目源码 | 文件源码
def mixin_meta(item, resp):
        """Wrap ``item`` in the ``resource.*WithMeta`` class matching its type.

        ``resp`` is passed to the wrapper unchanged.  ``None`` becomes an
        empty ``TupleWithMeta``; any type not matched falls back to
        ``DictWithMeta``.
        """
        if isinstance(item, six.string_types):
            # Only Python 2 has a separate unicode text type.
            if six.PY2 and isinstance(item, six.text_type):
                return resource.UnicodeWithMeta(item, resp)
            else:
                return resource.StrWithMeta(item, resp)
        elif isinstance(item, six.binary_type):
            return resource.BytesWithMeta(item, resp)
        elif isinstance(item, list):
            return resource.ListWithMeta(item, resp)
        elif isinstance(item, tuple):
            return resource.TupleWithMeta(item, resp)
        elif item is None:
            return resource.TupleWithMeta((), resp)
        else:
            return resource.DictWithMeta(item, resp)
项目:tomato    作者:sertansenturk    | 项目源码 | 文件源码
def _call_audio_metadata(self, audio_meta, filepath):
        # Normalise the ``audio_meta`` argument: False disables crawling,
        # None crawls using ``filepath``, a string is crawled as given, and
        # any other non-dict value triggers a warning.  The (possibly
        # replaced) value is returned.
        if audio_meta is False:  # metadata crawling is disabled
            audio_meta = None
        elif audio_meta is None:  # no MBID is given, attempt to get
            # it from id3 tag
            audio_meta = self.crawl_musicbrainz_metadata(filepath)
        elif isinstance(audio_meta, (six.string_types, six.binary_type)):
            # MBID is given
            audio_meta = self.crawl_musicbrainz_metadata(audio_meta)
        elif not isinstance(audio_meta, dict):
            # NOTE(review): the message literal below ends on a line
            # continuation with no closing fragment in this listing --
            # presumably a final piece of the string was lost; confirm.
            warn_str = 'The "metadata" input can be "False" (skipped), ' \
                       '"basestring" (MBID input), "None" (attempt to get ' \
                       'the MBID from audio file tags) or "dict" (already ' \
            warnings.warn(warn_str, stacklevel=2)
        return audio_meta
项目:storj-python-sdk    作者:Storj    | 项目源码 | 文件源码
def _assert_shard_manager(self, content, mode, mock_tree, mock_challenges):
        # Test helper built around a temporary file and a ShardManager.
        # NOTE(review): several expressions are missing in this listing (the
        # ShardManager arguments, the right-hand side of the filepath
        # comparison and the calls wrapping the last two argument lists), so
        # the intended checks cannot be stated here -- confirm upstream.
        with tempfile.NamedTemporaryFile(mode, delete=False) as tmp_file:

            nchallenges = 2
            sm = ShardManager(

            assert sm.filepath ==
            assert len(sm.shards) > 0

        # Bytes content is used as-is; the other line encodes it as UTF-8.
        if isinstance(content, six.binary_type):
                mock_challenges.return_value, content)
                mock_challenges.return_value, bytes(content.encode('utf-8')))

项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def b64decode(data):
    """JOSE Base64 decode.

    :param data: Base64 string to be decoded. If it's unicode, then
                 only ASCII characters are allowed.
    :type data: `bytes` or `unicode`

    :returns: Decoded data.
    :rtype: bytes

    :raises TypeError: if input is of incorrect type
    :raises ValueError: if input is unicode with non-ASCII characters

    """
    if isinstance(data, six.string_types):
        try:
            data = data.encode('ascii')
        except UnicodeEncodeError:
            raise ValueError(
                'unicode argument should contain only ASCII characters')
    elif not isinstance(data, six.binary_type):
        raise TypeError('argument should be a str or unicode')

    # Append '=' padding before handing the data to the URL-safe decoder
    # (between one and four characters are added).
    return base64.urlsafe_b64decode(data + b'=' * (4 - (len(data) % 4)))
项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def certificate(self, cert, name, alt_host=None, port=443):
        """Verifies the certificate presented at name is cert

        The host probed is ``alt_host`` when given, otherwise the address
        ``name`` resolves to.  Returns ``False`` when the probe raises
        ``acme_errors.Error``; otherwise compares SHA-256 digests.
        """
        if alt_host is None:
            host = socket.gethostbyname(name)
        elif isinstance(alt_host, six.binary_type):
            host = alt_host
        else:
            host = alt_host.encode()
        name = name if isinstance(name, six.binary_type) else name.encode()

        try:
            presented_cert = crypto_util.probe_sni(name, host, port)
        except acme_errors.Error:
            # A failed probe counts as a failed verification.
            return False

        return presented_cert.digest("sha256") == cert.digest("sha256")
项目:warcio    作者:webrecorder    | 项目源码 | 文件源码
def headers_to_str_headers(headers):
    """
    Converts dict or tuple-based headers of bytes or str to
    tuple-based headers of str, which is the python norm (pep 3333)

    On Python 2 the headers (or the mapping's items) are returned unchanged.
    On Python 3 byte keys and values are decoded as ISO-8859-1.
    """
    ret = []

    if isinstance(headers, collections_abc.Mapping):
        h = headers.items()
    else:
        h = headers

    if six.PY2:  #pragma: no cover
        return h

    for tup in h:
        k, v = tup
        if isinstance(k, six.binary_type):
            k = k.decode('iso-8859-1')
        if isinstance(v, six.binary_type):
            v = v.decode('iso-8859-1')
        ret.append((k, v))
    return ret
项目:aws-cfn-plex    作者:lordmuffin    | 项目源码 | 文件源码
def native(s):
    """
    Convert :py:class:`bytes` or :py:class:`unicode` to the native
    :py:class:`str` type, using UTF-8 encoding if conversion is necessary.

    :raise UnicodeError: The input string is not UTF-8 decodeable.

    :raise TypeError: The input is neither :py:class:`bytes` nor
        :py:class:`unicode`.
    """
    if not isinstance(s, (binary_type, text_type)):
        # Wrap in a tuple so the message also formats for tuple inputs.
        raise TypeError("%r is neither bytes nor unicode" % (s,))
    if PY3:
        # Native str is text: only bytes need decoding.
        if isinstance(s, binary_type):
            return s.decode("utf-8")
    else:
        # Native str is bytes: only unicode needs encoding.
        if isinstance(s, text_type):
            return s.encode("utf-8")
    return s
项目:txaio-etcd    作者:crossbario    | 项目源码 | 文件源码
def __validate(self):
        """Normalise ``self._key`` to a ``KeySet`` and derive ``self._range_end``.

        Raises ``TypeError`` when the key is neither bytes nor a ``KeySet``.
        """
        if type(self._key) == six.binary_type:
            # Wrap raw bytes, carrying over an explicit range end if set.
            if self._range_end:
                self._key = KeySet(self._key, range_end=self._range_end)
            else:
                self._key = KeySet(self._key)
        elif not isinstance(self._key, KeySet):
            raise TypeError(
                'key must either be bytes or a KeySet object, not {}'.format(type(self._key)))

        if self._key.type == KeySet._SINGLE:
            self._range_end = None
        elif self._key.type == KeySet._PREFIX:
            self._range_end = _increment_last_byte(self._key.key)
        elif self._key.type == KeySet._RANGE:
            self._range_end = self._key.range_end
        else:
            raise Exception('logic error')
项目:txaio-etcd    作者:crossbario    | 项目源码 | 文件源码
def __validate(self):
        """Normalise ``self._key`` to a ``KeySet``, check ``_return_previous``
        and derive ``self._range_end``.

        Raises ``TypeError`` when the key is neither bytes nor a ``KeySet``,
        or when ``_return_previous`` is set but not a bool.
        """
        if type(self._key) == six.binary_type:
            self._key = KeySet(self._key)
        elif not isinstance(self._key, KeySet):
            raise TypeError(
                'key must either be bytes or a KeySet object, not {}'.format(type(self._key)))

        if self._return_previous is not None and type(self._return_previous) != bool:
            raise TypeError('return_previous must be bool, not {}'.format(
                type(self._return_previous)))

        if self._key.type == KeySet._SINGLE:
            self._range_end = None
        elif self._key.type == KeySet._PREFIX:
            self._range_end = _increment_last_byte(self._key.key)
        elif self._key.type == KeySet._RANGE:
            self._range_end = self._key.range_end
        else:
            raise Exception('logic error')
项目:txaio-etcd    作者:crossbario    | 项目源码 | 文件源码
def __init__(self, key, return_previous=None):
        """

        :param key: The key or key set to delete.
        :type key: bytes or :class:`txaioetcd.KeySet`

        :param return_previous: If enabled, return the deleted key-value pairs
        :type return_previous: bool or None

        :raises TypeError: if ``key`` or ``return_previous`` has the wrong type.
        """
        if key is not None and type(key) != six.binary_type and not isinstance(key, KeySet):
            raise TypeError('key must be bytes or KeySet, not {}'.format(type(key)))

        # Raw keys are wrapped so ``self.key`` is always a KeySet.
        if isinstance(key, KeySet):
            self.key = key
        else:
            self.key = KeySet(key)

        if return_previous is not None and type(return_previous) != bool:
            raise TypeError('return_previous must be bool, not {}'.format(type(return_previous)))

        self.return_previous = return_previous
项目:filters    作者:eflglobal    | 项目源码 | 文件源码
def _apply(self, value):
        """Run the parent filter, then reject string values.

        Returns ``None`` when the parent step recorded errors, the result of
        ``_invalid_value`` for text/bytes input, and the value otherwise.
        """
        value = super(Array, self)._apply(value) # type: Sequence

        if self._has_errors:
            return None

        # Strings are rejected explicitly with CODE_WRONG_TYPE.
        if isinstance(value, (binary_type, text_type)):
            return self._invalid_value(
                value   = value,
                reason  = self.CODE_WRONG_TYPE,

                template_vars = {
                    'incoming': self.get_type_name(type(value)),
                    'allowed':  self.get_allowed_type_names(),
                },
            )

        return value
项目:MMdnn    作者:Microsoft    | 项目源码 | 文件源码
def assign_attr_value(attr, val):
    # Stores ``val`` on the field of ``attr`` that matches its Python type.
    # NOTE(review): the bodies of the TensorShape branch and of both list
    # element branches are missing in this listing, as are the ``else``
    # lines before the two raises -- confirm against upstream.
    from mmdnn.conversion.common.IR.graph_pb2 import TensorShape
    # The string below follows the import, so it is a plain expression
    # statement rather than the function's docstring.
    '''Assign value to AttrValue proto according to data type.'''
    # bool is tested before integer_types because bool is a subclass of int.
    if isinstance(val, bool):
        attr.b = val
    elif isinstance(val, integer_types):
        attr.i = val
    elif isinstance(val, float):
        attr.f = val
    elif isinstance(val, binary_type) or isinstance(val, text_type):
        # Values exposing ``encode`` are encoded before being stored.
        if hasattr(val, 'encode'):
            val = val.encode()
        attr.s = val
    elif isinstance(val, TensorShape):
    elif isinstance(val, list):
        # An empty list leaves ``attr`` untouched.
        if not val: return
        if isinstance(val[0], integer_types):
        elif isinstance(val[0], TensorShape):
            raise NotImplementedError('AttrValue cannot be of list[{}].'.format(val[0]))
        raise NotImplementedError('AttrValue cannot be of %s' % type(val))
项目:ngraph    作者:NervanaSystems    | 项目源码 | 文件源码
def write_np_values(values, f):
    """
    Write each array in ``values`` as one member of a zip archive, followed
    by a manifest entry.

    Arguments:
        values: {str: np.array}
        f: filename or filelike object
    """
    with ZipFile(f, 'w') as zf:
        for k, v in values.items():
            # ``tobytes`` replaces the deprecated ``ndarray.tostring`` alias
            # and produces the same bytes.
            raw = v.tobytes()
            # Need to do this because Python zipfile has some odd support for filenames:
            if len(k) == 16 and isinstance(k, six.binary_type):  # valid UUID bytes
                zf.writestr(str(uuid.UUID(bytes=k)), raw)
            else:
                zf.writestr(six.u(k), raw)

        zf.writestr(MANIFEST_FILENAME, json_dumps_manifest(values))
项目:ion-python    作者:amzn    | 项目源码 | 文件源码
def generate_scalars_binary(scalars_map, preceding_symbols=0):
    # Generator yielding ``_Parameter`` test cases for each (native, expected)
    # pair in ``scalars_map``: one for the plain native value and one for the
    # value wrapped via ``_FROM_ION_TYPE``; strings additionally yield a
    # symbol-encoded case.
    # NOTE(review): the first operand of both parameter labels is missing in
    # this listing (``_Parameter( + ...`` and ``% (, native)``) -- presumably
    # a name derived from ``ion_type``; confirm against upstream.
    for ion_type, values in six.iteritems(scalars_map):
        for native, expected in values:
            native_expected = expected
            has_symbols = False
            if native is None:
                # An un-adorned 'None' doesn't contain enough information to determine its Ion type
                native_expected = b'\x0f'
            elif ion_type is IonType.CLOB:
                # All six.binary_type are treated as BLOBs unless wrapped by an _IonNature
                tid = six.byte2int(expected) + 0x10  # increment upper nibble for clob -> blob; keep lower nibble
                native_expected = bytearray([tid]) + expected[1:]
            elif ion_type is IonType.SYMBOL and native is not None:
                has_symbols = True
            elif ion_type is IonType.STRING:
                # Encode all strings as symbols too.
                symbol_expected = _serialize_symbol(
                    IonEvent(IonEventType.SCALAR, IonType.SYMBOL, SymbolToken(None, 10 + preceding_symbols)))
                yield _Parameter( + ' ' + native,
                                 IonPyText.from_value(IonType.SYMBOL, native), symbol_expected, True)
            yield _Parameter('%s %s' % (, native), native, native_expected, has_symbols)
            wrapper = _FROM_ION_TYPE[ion_type].from_value(ion_type, native)
            yield _Parameter(repr(wrapper), wrapper, expected, has_symbols)
项目:aliyun-log-python-sdk    作者:aliyun    | 项目源码 | 文件源码
def convert_unicode_to_str(data):
        """
        Py2, always translate to utf8 from unicode
        Py3, always translate to unicode
        :param data: a string, mapping, iterable or any other value
        :return: ``data`` with strings converted, recursing into mappings
            (returned as ``dict``) and other iterables (rebuilt with their
            own type); other values are returned unchanged
        """
        # The ABC aliases in ``collections`` were removed in Python 3.10;
        # import from ``collections.abc`` and fall back for Python 2.
        try:
            from collections.abc import Iterable, Mapping
        except ImportError:
            from collections import Iterable, Mapping

        if six.PY2 and isinstance(data, six.text_type):
            return data.encode('utf8')
        elif six.PY3 and isinstance(data, six.binary_type):
            return data.decode('utf8')
        elif isinstance(data, Mapping):
            return dict((Util.convert_unicode_to_str(k), Util.convert_unicode_to_str(v))
                        for k, v in six.iteritems(data))
        elif isinstance(data, Iterable) and not isinstance(data, (six.binary_type, six.string_types)):
            return type(data)(map(Util.convert_unicode_to_str, data))

        return data
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def native(s):
    """
    Convert :py:class:`bytes` or :py:class:`unicode` to the native
    :py:class:`str` type, using UTF-8 encoding if conversion is necessary.

    :raise UnicodeError: The input string is not UTF-8 decodeable.

    :raise TypeError: The input is neither :py:class:`bytes` nor
        :py:class:`unicode`.
    """
    if not isinstance(s, (binary_type, text_type)):
        # Wrap in a tuple so the message also formats for tuple inputs.
        raise TypeError("%r is neither bytes nor unicode" % (s,))
    if PY3:
        # Native str is text: only bytes need decoding.
        if isinstance(s, binary_type):
            return s.decode("utf-8")
    else:
        # Native str is bytes: only unicode needs encoding.
        if isinstance(s, text_type):
            return s.encode("utf-8")
    return s
项目:deb-oslo.versionedobjects    作者:openstack    | 项目源码 | 文件源码
def test_get_fingerprint(self):
        # Make sure _get_fingerprint() generates a consistent fingerprint
        MyObject.VERSION = '1.1'
        argspec = 'vulpix'

        # inspect.getargspec is patched to return the fixed ``argspec`` value.
        with mock.patch('inspect.getargspec') as mock_gas:
            mock_gas.return_value = argspec
            fp = self.ovc._get_fingerprint(MyObject.__name__)

        exp_fields = sorted(list(MyObject.fields.items()))
        exp_methods = sorted([('remotable_method', argspec),
                              ('remotable_classmethod', argspec)])
        expected_relevant_data = (exp_fields, exp_methods)
        # NOTE(review): the hash expression below is cut off in this listing
        # (unclosed calls) -- presumably it digests the repr of
        # ``expected_relevant_data``; confirm against upstream.
        expected_hash = hashlib.md5(six.binary_type(repr(
        expected_fp = '%s-%s' % (MyObject.VERSION, expected_hash)

        self.assertEqual(expected_fp, fp, "_get_fingerprint() did not "
                                          "generate a correct fingerprint.")
项目:deb-oslo.versionedobjects    作者:openstack    | 项目源码 | 文件源码
def test_get_fingerprint_with_child_versions(self):
        # Make sure _get_fingerprint() generates a consistent fingerprint
        # when child_versions are present
        child_versions = {'1.0': '1.0', '1.1': '1.1'}
        MyObject.VERSION = '1.1'
        MyObject.child_versions = child_versions
        argspec = 'onix'

        # inspect.getargspec is patched to return the fixed ``argspec`` value.
        with mock.patch('inspect.getargspec') as mock_gas:
            mock_gas.return_value = argspec
            fp = self.ovc._get_fingerprint(MyObject.__name__)

        exp_fields = sorted(list(MyObject.fields.items()))
        exp_methods = sorted([('remotable_method', argspec),
                              ('remotable_classmethod', argspec)])
        # NOTE(review): the next expression and the hash expression further
        # down are cut off in this listing (unclosed calls) -- presumably
        # they sort ``child_versions`` and digest the repr of
        # ``exp_relevant_data``; confirm against upstream.
        exp_child_versions = collections.OrderedDict(sorted(
        exp_relevant_data = (exp_fields, exp_methods, exp_child_versions)

        expected_hash = hashlib.md5(six.binary_type(repr(
        expected_fp = '%s-%s' % (MyObject.VERSION, expected_hash)

        self.assertEqual(expected_fp, fp, "_get_fingerprint() did not "
                                          "generate a correct fingerprint.")
项目:callisto-core    作者:project-callisto    | 项目源码 | 文件源码
def _decrypt(self):
        # Builds and returns a list from every EvalRow, pairing row metadata
        # with decrypted content.
        # NOTE(review): the 'pk' value, the argument of ``six.text_type(``
        # and the body of the final ``if`` are missing in this listing, so
        # the decryption and append steps are not visible -- confirm upstream.
        decrypted_eval_data = []
        for row in EvalRow.objects.all():
            decrypted_row = {'pk':,
                             'user': row.user_identifier,
                             'record': row.record_identifier,
                             'action': row.action,
                             'timestamp': row.timestamp.__str__()}
            gpg = gnupg.GPG()
            decrypted_eval_row = six.text_type(
            if decrypted_eval_row:
        return decrypted_eval_data
项目:odoo-rpc-client    作者:katyukha    | 项目源码 | 文件源码
def test_20_dump_drop_restore(self):
        # Per the step comments below, this test dumps a database, drops it,
        # restores it, and then checks that objects were restored.
        # NOTE(review): almost every statement is truncated in this listing
        # (only stray commas remain); the only intact check is that the dump
        # is an instance of six.binary_type. Restore the calls from the
        # upstream source before running.

        # dump db
        dump_data =,
        self.assertIsInstance(dump_data, six.binary_type)

        # drop it,

        # and try to restore it,

        # check if objects were restored
        # NOTE(review): the login call below is cut off after its first
        # argument and the assertions that followed it are missing.
        cl = self.client.login(self.env.dbname,
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def native(s):
    """
    Convert :py:class:`bytes` or :py:class:`unicode` to the native
    :py:class:`str` type, using UTF-8 encoding if conversion is necessary.

    On Python 3 the native type is text, so bytes are decoded; on Python 2
    the native type is bytes, so unicode is encoded. A value that is already
    of the native type is returned unchanged.

    :param s: The bytes or text value to convert.

    :return: ``s`` as the interpreter's native :py:class:`str` type.

    :raise UnicodeError: The input string is not UTF-8 decodeable.

    :raise TypeError: The input is neither :py:class:`bytes` nor
        :py:class:`unicode`.
    """
    if not isinstance(s, (binary_type, text_type)):
        # Wrap in a 1-tuple so a tuple argument cannot be mistaken for the
        # list of format arguments.
        raise TypeError("%r is neither bytes nor unicode" % (s,))
    if PY3:
        if isinstance(s, binary_type):
            return s.decode("utf-8")
    elif isinstance(s, text_type):
        # Python 2 only: the native str is a byte string.
        return s.encode("utf-8")
    return s
项目:deb-python-oauth2client    作者:openstack    | 项目源码 | 文件源码
def _from_bytes(value):
    """Converts bytes to a string value, if necessary.

    Args:
        value: The string/bytes value to be converted.

    Returns:
        The original value converted to unicode (if bytes) or as passed in
        if it started out as unicode.

    Raises:
        ValueError if the value could not be converted to unicode.
    """
    result = (value.decode('utf-8')
              if isinstance(value, six.binary_type) else value)
    # Anything that is still not text at this point was neither bytes nor
    # unicode to begin with; reject it instead of silently returning None.
    if not isinstance(result, six.text_type):
        raise ValueError(
            '{0!r} could not be converted to unicode'.format(value))
    return result
项目:deb-python-oauth2client    作者:openstack    | 项目源码 | 文件源码
def clean_headers(headers):
    """Forces header keys and values to be strings, i.e not unicode.

    The httplib module just concats the header keys and values in a way that
    may make the message header a unicode string, which, if it then tries to
    concatenate to a binary request body may result in a unicode decode error.

    Args:
        headers: dict, A dictionary of headers.

    Returns:
        A new dictionary in which every key and value has been passed
        through ``_helpers._to_bytes``; non-binary keys and values are
        converted with ``str()`` first.

    Raises:
        NonAsciiHeaderError: if converting a key or value raises
            UnicodeEncodeError.
    """
    clean = {}
    try:
        for k, v in six.iteritems(headers):
            if not isinstance(k, six.binary_type):
                k = str(k)
            if not isinstance(v, six.binary_type):
                v = str(v)
            clean[_helpers._to_bytes(k)] = _helpers._to_bytes(v)
    except UnicodeEncodeError:
        # Imported here rather than at module level, as in the original
        # (presumably to avoid a circular import -- confirm).
        from oauth2client.client import NonAsciiHeaderError
        raise NonAsciiHeaderError(k, ': ', v)
    return clean
项目:btlejuice-python-bindings    作者:DigitalSecurity    | 项目源码 | 文件源码
def recv_packet(self):
        """Receive one message from the connection and yield it parsed.

        Yields:
            A single ``(engineIO_packet_type, engineIO_packet_data)`` pair as
            returned by ``parse_packet_text``.

        Raises:
            TimeoutError: if the receive raises WebSocketTimeoutException.
            ConnectionError: if the receive raises SSLError,
                WebSocketConnectionClosedException or SocketError.
        """
        try:
            packet_text = self._connection.recv()
        except WebSocketTimeoutException as e:
            raise TimeoutError('recv timed out (%s)' % e)
        except SSLError as e:
            # Handled before SocketError so SSL failures keep their own
            # message.
            raise ConnectionError('recv disconnected by SSL (%s)' % e)
        except WebSocketConnectionClosedException as e:
            raise ConnectionError('recv disconnected (%s)' % e)
        except SocketError as e:
            raise ConnectionError('recv disconnected (%s)' % e)
        # The parser is always given bytes: encode anything that is not
        # already binary_type.
        if not isinstance(packet_text, six.binary_type):
            packet_text = packet_text.encode('utf-8')
        # NOTE(review): the argument of this call was truncated in the
        # listing; packet_text is the only candidate in scope -- confirm
        # against the upstream source.
        engineIO_packet_type, engineIO_packet_data = parse_packet_text(
            packet_text)
        yield engineIO_packet_type, engineIO_packet_data
项目:csodium    作者:ereOn    | 项目源码 | 文件源码
def crypto_box(msg, nonce, pk, sk):
    # Returns a binary_type built from a buffer that is
    # crypto_box_MACBYTES longer than ``msg``.
    # The nonce and both keys are first checked against their expected sizes
    # via _assert_len (presumably raising on mismatch -- confirm).
    _assert_len('nonce', nonce, crypto_box_NONCEBYTES)
    _assert_len('pk', pk, crypto_box_PUBLICKEYBYTES)
    _assert_len('sk', sk, crypto_box_SECRETKEYBYTES)

    # Output buffer: the message length plus crypto_box_MACBYTES.
    c = bytearray(crypto_box_MACBYTES + len(msg))
    # NOTE(review): the call that fills ``c`` is truncated in this listing;
    # only one argument (a pointer cast of the output buffer) survives.
    # Restore the full call from the upstream source.
            ffi.cast("unsigned char *", ffi.from_buffer(c)),

    # Convert the mutable buffer to binary_type for the caller.
    return binary_type(c)
项目:csodium    作者:ereOn    | 项目源码 | 文件源码
def crypto_box_open(c, nonce, pk, sk):
    # Returns a binary_type built from a buffer that is
    # crypto_box_MACBYTES shorter than ``c``.
    # ``c`` is checked via _assert_min_len and the nonce and keys via
    # _assert_len (presumably raising on mismatch -- confirm).
    _assert_min_len('c', c, crypto_box_MACBYTES)
    _assert_len('nonce', nonce, crypto_box_NONCEBYTES)
    _assert_len('pk', pk, crypto_box_PUBLICKEYBYTES)
    _assert_len('sk', sk, crypto_box_SECRETKEYBYTES)

    # Output buffer: the length of ``c`` minus crypto_box_MACBYTES.
    msg = bytearray(len(c) - crypto_box_MACBYTES)
    # NOTE(review): the call that fills ``msg`` is truncated in this listing;
    # only one argument (a pointer cast of the output buffer) survives.
    # Restore the full call from the upstream source.
            ffi.cast("unsigned char *", ffi.from_buffer(msg)),

    # Convert the mutable buffer to binary_type for the caller.
    return binary_type(msg)
项目:csodium    作者:ereOn    | 项目源码 | 文件源码
def crypto_box_open_afternm(c, nonce, k):
    # Returns a binary_type built from a buffer that is
    # crypto_box_MACBYTES shorter than ``c``; takes a single key ``k``
    # checked against crypto_box_BEFORENMBYTES.
    # ``c`` is checked via _assert_min_len and the nonce and key via
    # _assert_len (presumably raising on mismatch -- confirm).
    _assert_min_len('c', c, crypto_box_MACBYTES)
    _assert_len('nonce', nonce, crypto_box_NONCEBYTES)
    _assert_len('k', k, crypto_box_BEFORENMBYTES)

    # Output buffer: the length of ``c`` minus crypto_box_MACBYTES.
    msg = bytearray(len(c) - crypto_box_MACBYTES)
    # NOTE(review): the call that fills ``msg`` is truncated in this listing;
    # only one argument (a pointer cast of the output buffer) survives.
    # Restore the full call from the upstream source.
            ffi.cast("unsigned char *", ffi.from_buffer(msg)),

    # Convert the mutable buffer to binary_type for the caller.
    return binary_type(msg)