Python six 模块,binary_type() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用six.binary_type()

项目:DropboxConnect    作者:raguay    | 项目源码 | 文件源码
def _params_to_urlencoded(params):
    """
    Returns a application/x-www-form-urlencoded ``str`` representing the
    key/value pairs in ``params``.

    Keys are values are ``str()``'d before calling ``urllib.urlencode``, with
    the exception of unicode objects which are utf8-encoded.
    """
    def encode(o):
        if isinstance(o, six.binary_type):
            return o
        else:
            if isinstance(o, six.text_type):
                return o.encode('utf-8')
            else:
                return str(o).encode('utf-8')

    utf8_params = {encode(k): encode(v) for k, v in six.iteritems(params)}
    return url_encode(utf8_params)
项目:placebo    作者:huseyinyilmaz    | 项目源码 | 文件源码
def test_last_request(self, mock):
        response = requests.post(GetMock.url,
                                 headers={'custom-header': 'huseyin'},
                                 data={'name': 'huseyin'})
        self.assertEqual(response.status_code, 200)
        last_request = mock.last_request
        # Test if last request has expected values.
        self.assertEqual(last_request.url, GetMock.url)

        body = last_request.body
        # In python 3 httpretty backend returns binary string for body.
        # So we are decoding it back to unicode to test.
        if isinstance(body, six.binary_type):
            body = body.decode('utf-8')
        self.assertEqual(body, 'name=huseyin')
        self.assertEqual(last_request.headers.get('custom-header'),
                         'huseyin')
        # Make sure that class's last_request is same as instances.
        self.assertIs(GetMock.last_request, mock.last_request)
项目:database_assetstore    作者:OpenGeoscience    | 项目源码 | 文件源码
def csv_safe_unicode(row, ignoreErrors=True):
    """
    Given an array of values, make sure all strings are unicode in Python 3 and
    str in Python 2.  The csv.writer in Python 2 does not handle unicode
    strings and in Python 3 it does not handle byte strings.

    :param row: an array which could contain mixed types.
    :param ignoreErrors: if True, convert what is possible and ignore errors.
        If false, conversion errors will raise an exception.
    :returns: either the original array if safe, or an array with all byte
        strings converted to unicode in Python 3 or str in Python 2.
    """
    # This is unicode in Python 2 and bytes in Python 3
    wrong_type = six.text_type if str == six.binary_type else six.binary_type
    # If all of the data is not the wrong type, just return it as is.  This
    # allows non-string types, such as integers and floats.
    if not any(isinstance(value, wrong_type) for value in row):
        return row
    # Convert the wrong type of string to the type needed for this version of
    # Python.  For Python 2 unicode gets encoded to str (bytes).  For Python 3
    # bytes get decoded to str (unicode).
    func = 'encode' if str == six.binary_type else 'decode'
    row = [getattr(value, func)('utf8', 'ignore' if ignoreErrors else 'strict')
           if isinstance(value, wrong_type) else value for value in row]
    return row
项目:oscars2016    作者:0x0ece    | 项目源码 | 文件源码
def _from_bytes(value):
    """Converts bytes to a string value, if necessary.

    Args:
        value: The string/bytes value to be converted.

    Returns:
        The original value converted to unicode (if bytes) or as passed in
        if it started out as unicode.

    Raises:
        ValueError if the value could not be converted to unicode.
    """
    result = (value.decode('utf-8')
              if isinstance(value, six.binary_type) else value)
    if isinstance(result, six.text_type):
        return result
    else:
        raise ValueError('%r could not be converted to unicode' % (value,))
项目:oscars2016    作者:0x0ece    | 项目源码 | 文件源码
def clean_headers(headers):
    """Forces header keys and values to be strings, i.e not unicode.

    The httplib module just concats the header keys and values in a way that
    may make the message header a unicode string, which, if it then tries to
    contatenate to a binary request body may result in a unicode decode error.

    Args:
        headers: dict, A dictionary of headers.

    Returns:
        The same dictionary but with all the keys converted to strings.
    """
    clean = {}
    try:
        for k, v in six.iteritems(headers):
            if not isinstance(k, six.binary_type):
                k = str(k)
            if not isinstance(v, six.binary_type):
                v = str(v)
            clean[_to_bytes(k)] = _to_bytes(v)
    except UnicodeEncodeError:
        raise NonAsciiHeaderError(k, ': ', v)
    return clean
项目:shub-image    作者:scrapinghub    | 项目源码 | 文件源码
def list_cmd(image_name, project, endpoint, apikey):
    """Short version of list cmd to use with deploy cmd."""

    settings = {}
    if project:
        settings = _get_project_settings(project, endpoint, apikey)

    # Run a local docker container to run list-spiders cmd
    status_code, logs = _run_list_cmd(project, image_name, settings)
    if status_code != 0:
        click.echo(logs)
        raise shub_exceptions.ShubException(
            'Container with list cmd exited with code %s' % status_code)

    spiders = utils.valid_spiders(
        logs.decode('utf-8') if isinstance(logs, binary_type) else logs)
    return spiders
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def make_header_prefix(self, message_class, version=Connection.protocol_version, stream_id=0):
        if Connection.protocol_version < 3:
            return six.binary_type().join(map(uint8_pack, [
                0xff & (HEADER_DIRECTION_TO_CLIENT | version),
                0,  # flags (compression)
                stream_id,
                message_class.opcode  # opcode
            ]))
        else:
            return six.binary_type().join(map(uint8_pack, [
                0xff & (HEADER_DIRECTION_TO_CLIENT | version),
                0,  # flags (compression)
                0,  # MSB for v3+ stream
                stream_id,
                message_class.opcode  # opcode
            ]))
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_partial_header_read(self, *args):
        c = self.make_connection()

        header = self.make_header_prefix(SupportedMessage)
        options = self.make_options_body()
        message = self.make_msg(header, options)

        # read in the first byte
        c._socket.recv.return_value = message[0:1]
        c.handle_read(None, 0)
        self.assertEqual(c._iobuf.getvalue(), message[0:1])

        c._socket.recv.return_value = message[1:]
        c.handle_read(None, 0)
        self.assertEqual(six.binary_type(), c._iobuf.getvalue())

        # let it write out a StartupMessage
        c.handle_write(None, 0)

        header = self.make_header_prefix(ReadyMessage, stream_id=1)
        c._socket.recv.return_value = self.make_msg(header)
        c.handle_read(None, 0)

        self.assertTrue(c.connected_event.is_set())
        self.assertFalse(c.is_defunct)
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def test_partial_header_read(self, *args):
        c = self.make_connection()

        header = self.make_header_prefix(SupportedMessage)
        options = self.make_options_body()
        message = self.make_msg(header, options)

        c.socket.recv.return_value = message[0:1]
        c.handle_read()
        self.assertEqual(c._iobuf.getvalue(), message[0:1])

        c.socket.recv.return_value = message[1:]
        c.handle_read()
        self.assertEqual(six.binary_type(), c._iobuf.getvalue())

        # let it write out a StartupMessage
        c.handle_write()

        header = self.make_header_prefix(ReadyMessage, stream_id=1)
        c.socket.recv.return_value = self.make_msg(header)
        c.handle_read()

        self.assertTrue(c.connected_event.is_set())
        self.assertFalse(c.is_defunct)
项目:deb-python-cassandra-driver    作者:openstack    | 项目源码 | 文件源码
def varint_pack(big):
    pos = True
    if big == 0:
        return b'\x00'
    if big < 0:
        bytelength = bit_length(abs(big) - 1) // 8 + 1
        big = (1 << bytelength * 8) + big
        pos = False
    revbytes = bytearray()
    while big > 0:
        revbytes.append(big & 0xff)
        big >>= 8
    if pos and revbytes[-1] & 0x80:
        revbytes.append(0)
    revbytes.reverse()
    return six.binary_type(revbytes)
项目:Projects    作者:it2school    | 项目源码 | 文件源码
def _params_to_urlencoded(params):
    """
    Returns a application/x-www-form-urlencoded ``str`` representing the
    key/value pairs in ``params``.

    Keys are values are ``str()``'d before calling ``urllib.urlencode``, with
    the exception of unicode objects which are utf8-encoded.
    """
    def encode(o):
        if isinstance(o, six.binary_type):
            return o
        else:
            if isinstance(o, six.text_type):
                return o.encode('utf-8')
            else:
                return str(o).encode('utf-8')

    utf8_params = {encode(k): encode(v) for k, v in six.iteritems(params)}
    return url_encode(utf8_params)
项目:drf-swagger-extras    作者:ssaavedra    | 项目源码 | 文件源码
def parse_schema(schema):
    if type(schema) is six.binary_type or type(schema) is six.text_type:
        return {
            'type': schema,
        }
    elif type(schema) is list:
        return {
            'type': 'list',
        }
    elif type(schema) is dict:
        title = schema.get(':title', None)
        required_elts = list(get_required(schema.keys()))
        properties = get_object_properties(schema)
        return {
            'type': 'object',
            'title': title,
            'properties': {
                prop_name: parse_schema(subschema)
                for prop_name, subschema in properties.items()
            },
            'required': required_elts,
        }
    else:
        raise Exception('Unsupported schema definition')
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def native(s):
    """
    Convert :py:class:`bytes` or :py:class:`unicode` to the native
    :py:class:`str` type, using UTF-8 encoding if conversion is necessary.

    :raise UnicodeError: The input string is not UTF-8 decodeable.

    :raise TypeError: The input is neither :py:class:`bytes` nor
        :py:class:`unicode`.
    """
    if not isinstance(s, (binary_type, text_type)):
        raise TypeError("%r is neither bytes nor unicode" % s)
    if PY3:
        if isinstance(s, binary_type):
            return s.decode("utf-8")
    else:
        if isinstance(s, text_type):
            return s.encode("utf-8")
    return s
项目:noc-orchestrator    作者:DirceuSilvaLabs    | 项目源码 | 文件源码
def native(s):
    """
    Convert :py:class:`bytes` or :py:class:`unicode` to the native
    :py:class:`str` type, using UTF-8 encoding if conversion is necessary.

    :raise UnicodeError: The input string is not UTF-8 decodeable.

    :raise TypeError: The input is neither :py:class:`bytes` nor
        :py:class:`unicode`.
    """
    if not isinstance(s, (binary_type, text_type)):
        raise TypeError("%r is neither bytes nor unicode" % s)
    if PY3:
        if isinstance(s, binary_type):
            return s.decode("utf-8")
    else:
        if isinstance(s, text_type):
            return s.encode("utf-8")
    return s
项目:mos-horizon    作者:Mirantis    | 项目源码 | 文件源码
def get_template_files(template_data=None, template_url=None):
    if template_data:
        tpl = template_data
    elif template_url:
        with contextlib.closing(request.urlopen(template_url)) as u:
            tpl = u.read()
    else:
        return {}, None
    if not tpl:
        return {}, None
    if isinstance(tpl, six.binary_type):
        tpl = tpl.decode('utf-8')
    template = template_format.parse(tpl)
    files = {}
    _get_file_contents(template, files)
    return files, template
项目:gixy    作者:yandex    | 项目源码 | 文件源码
def parse(self, data):
        """
        Returns the parsed tree.
        """
        if isinstance(data, six.binary_type):
            if data[:3] == codecs.BOM_UTF8:
                encoding = 'utf-8-sig'
            else:
                encoding = 'latin1'
            content = data.decode(encoding).strip()
        else:
            content = data.strip()

        if not content:
            return ParseResults()

        return self.script.parseString(content, parseAll=True)
项目:python-pankoclient    作者:openstack    | 项目源码 | 文件源码
def convert_into_with_meta(self, item, resp):
        if isinstance(item, six.string_types):
            if six.PY2 and isinstance(item, six.text_type):
                return UnicodeWithMeta(item, resp)
            else:
                return StrWithMeta(item, resp)
        elif isinstance(item, six.binary_type):
            return BytesWithMeta(item, resp)
        elif isinstance(item, list):
            return ListWithMeta(item, resp)
        elif isinstance(item, tuple):
            return TupleWithMeta(item, resp)
        elif item is None:
            return TupleWithMeta((), resp)
        else:
            return DictWithMeta(item, resp)
项目:GAMADV-XTD    作者:taers232c    | 项目源码 | 文件源码
def _from_bytes(value):
    """Converts bytes to a string value, if necessary.

    Args:
        value: The string/bytes value to be converted.

    Returns:
        The original value converted to unicode (if bytes) or as passed in
        if it started out as unicode.

    Raises:
        ValueError if the value could not be converted to unicode.
    """
    result = (value.decode('utf-8')
              if isinstance(value, six.binary_type) else value)
    if isinstance(result, six.text_type):
        return result
    else:
        raise ValueError(
            '{0!r} could not be converted to unicode'.format(value))
项目:GAMADV-XTD    作者:taers232c    | 项目源码 | 文件源码
def clean_headers(headers):
    """Forces header keys and values to be strings, i.e not unicode.

    The httplib module just concats the header keys and values in a way that
    may make the message header a unicode string, which, if it then tries to
    contatenate to a binary request body may result in a unicode decode error.

    Args:
        headers: dict, A dictionary of headers.

    Returns:
        The same dictionary but with all the keys converted to strings.
    """
    clean = {}
    try:
        for k, v in six.iteritems(headers):
            if not isinstance(k, six.binary_type):
                k = str(k)
            if not isinstance(v, six.binary_type):
                v = str(v)
            clean[_helpers._to_bytes(k)] = _helpers._to_bytes(v)
    except UnicodeEncodeError:
        from oauth2client.client import NonAsciiHeaderError
        raise NonAsciiHeaderError(k, ': ', v)
    return clean
项目:panko    作者:openstack    | 项目源码 | 文件源码
def decode_unicode(input):
    """Decode the unicode of the message, and encode it into utf-8."""
    if isinstance(input, dict):
        temp = {}
        # If the input data is a dict, create an equivalent dict with a
        # predictable insertion order to avoid inconsistencies in the
        # message signature computation for equivalent payloads modulo
        # ordering
        for key, value in sorted(six.iteritems(input)):
            temp[decode_unicode(key)] = decode_unicode(value)
        return temp
    elif isinstance(input, (tuple, list)):
        # When doing a pair of JSON encode/decode operations to the tuple,
        # the tuple would become list. So we have to generate the value as
        # list here.
        return [decode_unicode(element) for element in input]
    elif six.PY2 and isinstance(input, six.text_type):
        return input.encode('utf-8')
    elif six.PY3 and isinstance(input, six.binary_type):
        return input.decode('utf-8')
    else:
        return input
项目:artman    作者:googleapis    | 项目源码 | 文件源码
def replace_vars(data, repl_vars):
    """Return the given data structure with appropriate variables replaced.

    Args:
        data (Any): Data of an arbitrary type.

    Returns:
        Any: Data of the same time, possibly with replaced values.
    """
    if isinstance(data, (six.text_type, six.binary_type)):
        for k, v in repl_vars.items():
            data = data.replace('${' + k + '}', v)
        return data
    if isinstance(data, collections.Sequence):
        return type(data)([replace_vars(d, repl_vars) for d in data])
    if isinstance(data, collections.Mapping):
        return type(data)([(k, replace_vars(v, repl_vars))
                           for k, v in data.items()])
    return data
项目:ChemDataExtractor    作者:mcs07    | 项目源码 | 文件源码
def clean_markup(self, markup, parser=None):
        """Apply ``Cleaner`` to markup string or document and return a cleaned string or document."""
        result_type = type(markup)
        if isinstance(markup, six.string_types):
            doc = fromstring(markup, parser=parser)
        else:
            doc = copy.deepcopy(markup)
        self(doc)
        if issubclass(result_type, six.binary_type):
            return tostring(doc, encoding='utf-8')
        elif issubclass(result_type, six.text_type):
            return tostring(doc, encoding='unicode')
        else:
            return doc


#: A default Cleaner instance, which kills comments, processing instructions, script tags, style tags.
项目:ChemDataExtractor    作者:mcs07    | 项目源码 | 文件源码
def __init__(self, *elements):
        """Initialize a Document manually by passing one or more Document elements (Paragraph, Heading, Table, etc.)

        Strings that are passed to this constructor are automatically wrapped into Paragraph elements.

        :param list[chemdataextractor.doc.element.BaseElement|string] elements: Elements in this Document.
        """
        self._elements = []
        for element in elements:
            # Convert raw text to Paragraph elements
            if isinstance(element, six.text_type):
                element = Paragraph(element)
            elif isinstance(element, six.binary_type):
                # Try guess encoding if byte string
                encoding = get_encoding(element)
                log.warning('Guessed bytestring encoding as %s. Use unicode strings to avoid this warning.', encoding)
                element = Paragraph(element.decode(encoding))
            element.document = self
            self._elements.append(element)
        log.debug('%s: Initializing with %s elements' % (self.__class__.__name__, len(self.elements)))
项目:OpenStackClient_VBS    作者:Huawei    | 项目源码 | 文件源码
def mixin_meta(item, resp):
        if isinstance(item, six.string_types):
            if six.PY2 and isinstance(item, six.text_type):
                return resource.UnicodeWithMeta(item, resp)
            else:
                return resource.StrWithMeta(item, resp)
        elif isinstance(item, six.binary_type):
            return resource.BytesWithMeta(item, resp)
        elif isinstance(item, list):
            return resource.ListWithMeta(item, resp)
        elif isinstance(item, tuple):
            return resource.TupleWithMeta(item, resp)
        elif item is None:
            return resource.TupleWithMeta((), resp)
        else:
            return resource.DictWithMeta(item, resp)
项目:tomato    作者:sertansenturk    | 项目源码 | 文件源码
def _call_audio_metadata(self, audio_meta, filepath):
        if audio_meta is False:  # metadata crawling is disabled
            audio_meta = None
        elif audio_meta is None:  # no MBID is given, attempt to get
            # it from id3 tag
            audio_meta = self.crawl_musicbrainz_metadata(filepath)
        elif isinstance(audio_meta, (six.string_types, six.binary_type)):
            # MBID is given
            audio_meta = self.crawl_musicbrainz_metadata(audio_meta)
        elif not isinstance(audio_meta, dict):
            warn_str = 'The "metadata" input can be "False" (skipped), ' \
                       '"basestring" (MBID input), "None" (attempt to get ' \
                       'the MBID from audio file tags) or "dict" (already ' \
                       'computed)'
            warnings.warn(warn_str, stacklevel=2)
        return audio_meta
项目:storj-python-sdk    作者:Storj    | 项目源码 | 文件源码
def _assert_shard_manager(self, content, mode, mock_tree, mock_challenges):
        with tempfile.NamedTemporaryFile(mode, delete=False) as tmp_file:
            tmp_file.write(content)
            tmp_file.flush()

            nchallenges = 2
            sm = ShardManager(
                tmp_file.name,
                nchallenges=nchallenges)

            assert sm.filepath == tmp_file.name
            assert len(sm.shards) > 0

        mock_challenges.assert_called_once_with(nchallenges)
        if isinstance(content, six.binary_type):
            mock_tree.assert_called_once_with(
                mock_challenges.return_value, content)
        else:
            mock_tree.assert_called_once_with(
                mock_challenges.return_value, bytes(content.encode('utf-8')))

        mock_tree.reset_mock()
        mock_challenges.reset_mock()
项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def b64decode(data):
    """JOSE Base64 decode.

    :param data: Base64 string to be decoded. If it's unicode, then
                 only ASCII characters are allowed.
    :type data: `bytes` or `unicode`

    :returns: Decoded data.
    :rtype: bytes

    :raises TypeError: if input is of incorrect type
    :raises ValueError: if input is unicode with non-ASCII characters

    """
    if isinstance(data, six.string_types):
        try:
            data = data.encode('ascii')
        except UnicodeEncodeError:
            raise ValueError(
                'unicode argument should contain only ASCII characters')
    elif not isinstance(data, six.binary_type):
        raise TypeError('argument should be a str or unicode')

    return base64.urlsafe_b64decode(data + b'=' * (4 - (len(data) % 4)))
项目:TCP-IP    作者:JackZ0    | 项目源码 | 文件源码
def certificate(self, cert, name, alt_host=None, port=443):
        """Verifies the certificate presented at name is cert"""
        if alt_host is None:
            host = socket.gethostbyname(name)
        elif isinstance(alt_host, six.binary_type):
            host = alt_host
        else:
            host = alt_host.encode()
        name = name if isinstance(name, six.binary_type) else name.encode()

        try:
            presented_cert = crypto_util.probe_sni(name, host, port)
        except acme_errors.Error as error:
            logger.exception(error)
            return False

        return presented_cert.digest("sha256") == cert.digest("sha256")
项目:warcio    作者:webrecorder    | 项目源码 | 文件源码
def headers_to_str_headers(headers):
    '''
    Converts dict or tuple-based headers of bytes or str to
    tuple-based headers of str, which is the python norm (pep 3333)
    '''
    ret = []

    if isinstance(headers, collections_abc.Mapping):
        h = headers.items()
    else:
        h = headers

    if six.PY2:  #pragma: no cover
        return h

    for tup in h:
        k, v = tup
        if isinstance(k, six.binary_type):
            k = k.decode('iso-8859-1')
        if isinstance(v, six.binary_type):
            v = v.decode('iso-8859-1')
        ret.append((k, v))
    return ret
项目:aws-cfn-plex    作者:lordmuffin    | 项目源码 | 文件源码
def native(s):
    """
    Convert :py:class:`bytes` or :py:class:`unicode` to the native
    :py:class:`str` type, using UTF-8 encoding if conversion is necessary.

    :raise UnicodeError: The input string is not UTF-8 decodeable.

    :raise TypeError: The input is neither :py:class:`bytes` nor
        :py:class:`unicode`.
    """
    if not isinstance(s, (binary_type, text_type)):
        raise TypeError("%r is neither bytes nor unicode" % s)
    if PY3:
        if isinstance(s, binary_type):
            return s.decode("utf-8")
    else:
        if isinstance(s, text_type):
            return s.encode("utf-8")
    return s
项目:txaio-etcd    作者:crossbario    | 项目源码 | 文件源码
def __validate(self):
        if type(self._key) == six.binary_type:
            if self._range_end:
                self._key = KeySet(self._key, range_end=self._range_end)
            else:
                self._key = KeySet(self._key)
        elif isinstance(self._key, KeySet):
            pass
        else:
            raise TypeError(
                'key must either be bytes or a KeySet object, not {}'.format(type(self._key)))

        if self._key.type == KeySet._SINGLE:
            self._range_end = None
        elif self._key.type == KeySet._PREFIX:
            self._range_end = _increment_last_byte(self._key.key)
        elif self._key.type == KeySet._RANGE:
            self._range_end = self._key.range_end
        else:
            raise Exception('logic error')
项目:txaio-etcd    作者:crossbario    | 项目源码 | 文件源码
def __validate(self):
        if type(self._key) == six.binary_type:
            self._key = KeySet(self._key)
        elif isinstance(self._key, KeySet):
            pass
        else:
            raise TypeError(
                'key must either be bytes or a KeySet object, not {}'.format(type(self._key)))

        if self._return_previous is not None and type(self._return_previous) != bool:
            raise TypeError('return_previous must be bool, not {}'.format(
                type(self._return_previous)))

        if self._key.type == KeySet._SINGLE:
            self._range_end = None
        elif self._key.type == KeySet._PREFIX:
            self._range_end = _increment_last_byte(self._key.key)
        elif self._key.type == KeySet._RANGE:
            self._range_end = self._key.range_end
        else:
            raise Exception('logic error')
项目:txaio-etcd    作者:crossbario    | 项目源码 | 文件源码
def __init__(self, key, return_previous=None):
        """

        :param key: The key or key set to delete.
        :type key: bytes or :class:`txaioetcd.KeySet`

        :param return_previous: If enabled, return the deleted key-value pairs
        :type return_previous: bool or None
        """
        Op.__init__(self)

        if key is not None and type(key) != six.binary_type and not isinstance(key, KeySet):
            raise TypeError('key must be bytes or KeySet, not {}'.format(type(key)))

        if isinstance(key, KeySet):
            self.key = key
        else:
            self.key = KeySet(key)

        if return_previous is not None and type(return_previous) != bool:
            raise TypeError('return_previous must be bool, not {}'.format(type(return_previous)))

        self.return_previous = return_previous
项目:filters    作者:eflglobal    | 项目源码 | 文件源码
def _apply(self, value):
        value = super(Array, self)._apply(value) # type: Sequence

        if self._has_errors:
            return None

        if isinstance(value, (binary_type, text_type)):
            return self._invalid_value(
                value   = value,
                reason  = self.CODE_WRONG_TYPE,

                template_vars = {
                    'incoming': self.get_type_name(type(value)),
                    'allowed':  self.get_allowed_type_names(),
                },
            )

        return value
项目:MMdnn    作者:Microsoft    | 项目源码 | 文件源码
def assign_attr_value(attr, val):
    from mmdnn.conversion.common.IR.graph_pb2 import TensorShape
    '''Assign value to AttrValue proto according to data type.'''
    if isinstance(val, bool):
        attr.b = val
    elif isinstance(val, integer_types):
        attr.i = val
    elif isinstance(val, float):
        attr.f = val
    elif isinstance(val, binary_type) or isinstance(val, text_type):
        if hasattr(val, 'encode'):
            val = val.encode()
        attr.s = val
    elif isinstance(val, TensorShape):
        attr.shape.MergeFromString(val.SerializeToString())
    elif isinstance(val, list):
        if not val: return
        if isinstance(val[0], integer_types):
            attr.list.i.extend(val)
        elif isinstance(val[0], TensorShape):
            attr.list.shape.extend(val)
        else:
            raise NotImplementedError('AttrValue cannot be of list[{}].'.format(val[0]))
    else:
        raise NotImplementedError('AttrValue cannot be of %s' % type(val))
项目:ngraph    作者:NervanaSystems    | 项目源码 | 文件源码
def write_np_values(values, f):
    """
    Arguments:
        values: {str: np.array}
        f: filename or filelike object
    """
    with ZipFile(f, 'w') as zf:
        for k, v in values.items():
            # Need to do this because Python zipfile has some odd support for filenames:
            # http://bugs.python.org/issue24110
            if len(k) == 16 and isinstance(k, six.binary_type):  # valid UUID bytes
                zf.writestr(str(uuid.UUID(bytes=k)), v.tostring())
            else:
                zf.writestr(six.u(k), v.tostring())

        zf.writestr(MANIFEST_FILENAME, json_dumps_manifest(values))
项目:ion-python    作者:amzn    | 项目源码 | 文件源码
def generate_scalars_binary(scalars_map, preceding_symbols=0):
    for ion_type, values in six.iteritems(scalars_map):
        for native, expected in values:
            native_expected = expected
            has_symbols = False
            if native is None:
                # An un-adorned 'None' doesn't contain enough information to determine its Ion type
                native_expected = b'\x0f'
            elif ion_type is IonType.CLOB:
                # All six.binary_type are treated as BLOBs unless wrapped by an _IonNature
                tid = six.byte2int(expected) + 0x10  # increment upper nibble for clob -> blob; keep lower nibble
                native_expected = bytearray([tid]) + expected[1:]
            elif ion_type is IonType.SYMBOL and native is not None:
                has_symbols = True
            elif ion_type is IonType.STRING:
                # Encode all strings as symbols too.
                symbol_expected = _serialize_symbol(
                    IonEvent(IonEventType.SCALAR, IonType.SYMBOL, SymbolToken(None, 10 + preceding_symbols)))
                yield _Parameter(IonType.SYMBOL.name + ' ' + native,
                                 IonPyText.from_value(IonType.SYMBOL, native), symbol_expected, True)
            yield _Parameter('%s %s' % (ion_type.name, native), native, native_expected, has_symbols)
            wrapper = _FROM_ION_TYPE[ion_type].from_value(ion_type, native)
            yield _Parameter(repr(wrapper), wrapper, expected, has_symbols)
项目:aliyun-log-python-sdk    作者:aliyun    | 项目源码 | 文件源码
def convert_unicode_to_str(data):
        """
        Py2, always translate to utf8 from unicode
        Py3, always translate to unicode
        :param data:
        :return:
        """
        if six.PY2 and isinstance(data, six.text_type):
            return data.encode('utf8')
        elif six.PY3 and isinstance(data, six.binary_type):
            return data.decode('utf8')
        elif isinstance(data, collections.Mapping):
            return dict((Util.convert_unicode_to_str(k), Util.convert_unicode_to_str(v))
                        for k, v in six.iteritems(data))
        elif isinstance(data, collections.Iterable) and not isinstance(data, (six.binary_type, six.string_types)):
            return type(data)(map(Util.convert_unicode_to_str, data))

        return data
项目:Intranet-Penetration    作者:yuxiaokui    | 项目源码 | 文件源码
def native(s):
    """
    Convert :py:class:`bytes` or :py:class:`unicode` to the native
    :py:class:`str` type, using UTF-8 encoding if conversion is necessary.

    :raise UnicodeError: The input string is not UTF-8 decodeable.

    :raise TypeError: The input is neither :py:class:`bytes` nor
        :py:class:`unicode`.
    """
    if not isinstance(s, (binary_type, text_type)):
        raise TypeError("%r is neither bytes nor unicode" % s)
    if PY3:
        if isinstance(s, binary_type):
            return s.decode("utf-8")
    else:
        if isinstance(s, text_type):
            return s.encode("utf-8")
    return s
项目:deb-oslo.versionedobjects    作者:openstack    | 项目源码 | 文件源码
def test_get_fingerprint(self):
        # Make sure _get_fingerprint() generates a consistent fingerprint
        MyObject.VERSION = '1.1'
        argspec = 'vulpix'

        with mock.patch('inspect.getargspec') as mock_gas:
            mock_gas.return_value = argspec
            fp = self.ovc._get_fingerprint(MyObject.__name__)

        exp_fields = sorted(list(MyObject.fields.items()))
        exp_methods = sorted([('remotable_method', argspec),
                              ('remotable_classmethod', argspec)])
        expected_relevant_data = (exp_fields, exp_methods)
        expected_hash = hashlib.md5(six.binary_type(repr(
            expected_relevant_data).encode())).hexdigest()
        expected_fp = '%s-%s' % (MyObject.VERSION, expected_hash)

        self.assertEqual(expected_fp, fp, "_get_fingerprint() did not "
                                          "generate a correct fingerprint.")
项目:deb-oslo.versionedobjects    作者:openstack    | 项目源码 | 文件源码
def test_get_fingerprint_with_child_versions(self):
        # Make sure _get_fingerprint() generates a consistent fingerprint
        # when child_versions are present
        child_versions = {'1.0': '1.0', '1.1': '1.1'}
        MyObject.VERSION = '1.1'
        MyObject.child_versions = child_versions
        argspec = 'onix'

        with mock.patch('inspect.getargspec') as mock_gas:
            mock_gas.return_value = argspec
            fp = self.ovc._get_fingerprint(MyObject.__name__)

        exp_fields = sorted(list(MyObject.fields.items()))
        exp_methods = sorted([('remotable_method', argspec),
                              ('remotable_classmethod', argspec)])
        exp_child_versions = collections.OrderedDict(sorted(
            child_versions.items()))
        exp_relevant_data = (exp_fields, exp_methods, exp_child_versions)

        expected_hash = hashlib.md5(six.binary_type(repr(
            exp_relevant_data).encode())).hexdigest()
        expected_fp = '%s-%s' % (MyObject.VERSION, expected_hash)

        self.assertEqual(expected_fp, fp, "_get_fingerprint() did not "
                                          "generate a correct fingerprint.")
项目:callisto-core    作者:project-callisto    | 项目源码 | 文件源码
def _decrypt(self):
        decrypted_eval_data = []
        for row in EvalRow.objects.all():
            decrypted_row = {'pk': row.pk,
                             'user': row.user_identifier,
                             'record': row.record_identifier,
                             'action': row.action,
                             'timestamp': row.timestamp.__str__()}
            gpg = gnupg.GPG()
            gpg.import_keys(settings.CALLISTO_EVAL_PRIVATE_KEY)
            decrypted_eval_row = six.text_type(
                gpg.decrypt(six.binary_type(row.row)))
            if decrypted_eval_row:
                decrypted_row.update(json.loads(decrypted_eval_row))
            decrypted_eval_data.append(decrypted_row)
        return decrypted_eval_data
项目:odoo-rpc-client    作者:katyukha    | 项目源码 | 文件源码
def test_20_dump_drop_restore(self):
        # dump db
        dump_data = self.client.services.db.dump_db(self.env.super_password,
                                                    self.env.dbname)
        self.assertIsInstance(dump_data, six.binary_type)

        # drop it
        self.client.services.db.drop_db(self.env.super_password,
                                        self.env.dbname)
        self.assertNotIn(self.env.dbname, self.client.services.db)

        # and try to restore it
        self.client.services.db.restore_db(self.env.super_password,
                                           self.env.dbname,
                                           dump_data)
        self.assertIn(self.env.dbname, self.client.services.db)

        # check if objects were restored
        time.sleep(2)
        cl = self.client.login(self.env.dbname,
                               self.env.user,
                               self.env.password)
        self.assertIsNotNone(cl.uid)
        self.assertIsNotNone(cl.user)
项目:MKFQ    作者:maojingios    | 项目源码 | 文件源码
def native(s):
    """
    Convert :py:class:`bytes` or :py:class:`unicode` to the native
    :py:class:`str` type, using UTF-8 encoding if conversion is necessary.

    :raise UnicodeError: The input string is not UTF-8 decodeable.

    :raise TypeError: The input is neither :py:class:`bytes` nor
        :py:class:`unicode`.
    """
    if not isinstance(s, (binary_type, text_type)):
        raise TypeError("%r is neither bytes nor unicode" % s)
    if PY3:
        if isinstance(s, binary_type):
            return s.decode("utf-8")
    else:
        if isinstance(s, text_type):
            return s.encode("utf-8")
    return s
项目:deb-python-oauth2client    作者:openstack    | 项目源码 | 文件源码
def _from_bytes(value):
    """Converts bytes to a string value, if necessary.

    Args:
        value: The string/bytes value to be converted.

    Returns:
        The original value converted to unicode (if bytes) or as passed in
        if it started out as unicode.

    Raises:
        ValueError if the value could not be converted to unicode.
    """
    result = (value.decode('utf-8')
              if isinstance(value, six.binary_type) else value)
    if isinstance(result, six.text_type):
        return result
    else:
        raise ValueError(
            '{0!r} could not be converted to unicode'.format(value))
项目:deb-python-oauth2client    作者:openstack    | 项目源码 | 文件源码
def clean_headers(headers):
    """Forces header keys and values to be strings, i.e not unicode.

    The httplib module just concats the header keys and values in a way that
    may make the message header a unicode string, which, if it then tries to
    contatenate to a binary request body may result in a unicode decode error.

    Args:
        headers: dict, A dictionary of headers.

    Returns:
        The same dictionary but with all the keys converted to strings.
    """
    clean = {}
    try:
        for k, v in six.iteritems(headers):
            if not isinstance(k, six.binary_type):
                k = str(k)
            if not isinstance(v, six.binary_type):
                v = str(v)
            clean[_helpers._to_bytes(k)] = _helpers._to_bytes(v)
    except UnicodeEncodeError:
        from oauth2client.client import NonAsciiHeaderError
        raise NonAsciiHeaderError(k, ': ', v)
    return clean
项目:btlejuice-python-bindings    作者:DigitalSecurity    | 项目源码 | 文件源码
def recv_packet(self):
        try:
            packet_text = self._connection.recv()
        except WebSocketTimeoutException as e:
            raise TimeoutError('recv timed out (%s)' % e)
        except SSLError as e:
            raise ConnectionError('recv disconnected by SSL (%s)' % e)
        except WebSocketConnectionClosedException as e:
            raise ConnectionError('recv disconnected (%s)' % e)
        except SocketError as e:
            raise ConnectionError('recv disconnected (%s)' % e)
        if not isinstance(packet_text, six.binary_type):
            packet_text = packet_text.encode('utf-8')
        engineIO_packet_type, engineIO_packet_data = parse_packet_text(
            packet_text)
        yield engineIO_packet_type, engineIO_packet_data
项目:csodium    作者:ereOn    | 项目源码 | 文件源码
def crypto_box(msg, nonce, pk, sk):
    _assert_len('nonce', nonce, crypto_box_NONCEBYTES)
    _assert_len('pk', pk, crypto_box_PUBLICKEYBYTES)
    _assert_len('sk', sk, crypto_box_SECRETKEYBYTES)

    c = bytearray(crypto_box_MACBYTES + len(msg))
    _raise_on_error(
        lib.crypto_box_easy(
            ffi.cast("unsigned char *", ffi.from_buffer(c)),
            msg,
            len(msg),
            nonce,
            pk,
            sk,
        ),
    )

    return binary_type(c)
项目:csodium    作者:ereOn    | 项目源码 | 文件源码
def crypto_box_open(c, nonce, pk, sk):
    _assert_min_len('c', c, crypto_box_MACBYTES)
    _assert_len('nonce', nonce, crypto_box_NONCEBYTES)
    _assert_len('pk', pk, crypto_box_PUBLICKEYBYTES)
    _assert_len('sk', sk, crypto_box_SECRETKEYBYTES)

    msg = bytearray(len(c) - crypto_box_MACBYTES)
    _raise_on_error(
        lib.crypto_box_open_easy(
            ffi.cast("unsigned char *", ffi.from_buffer(msg)),
            c,
            len(c),
            nonce,
            pk,
            sk,
        ),
    )

    return binary_type(msg)
项目:csodium    作者:ereOn    | 项目源码 | 文件源码
def crypto_box_open_afternm(c, nonce, k):
    _assert_min_len('c', c, crypto_box_MACBYTES)
    _assert_len('nonce', nonce, crypto_box_NONCEBYTES)
    _assert_len('k', k, crypto_box_BEFORENMBYTES)

    msg = bytearray(len(c) - crypto_box_MACBYTES)
    _raise_on_error(
        lib.crypto_box_open_easy_afternm(
            ffi.cast("unsigned char *", ffi.from_buffer(msg)),
            c,
            len(c),
            nonce,
            k,
        ),
    )

    return binary_type(msg)