我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用six.binary_type()。
def _params_to_urlencoded(params): """ Returns a application/x-www-form-urlencoded ``str`` representing the key/value pairs in ``params``. Keys are values are ``str()``'d before calling ``urllib.urlencode``, with the exception of unicode objects which are utf8-encoded. """ def encode(o): if isinstance(o, six.binary_type): return o else: if isinstance(o, six.text_type): return o.encode('utf-8') else: return str(o).encode('utf-8') utf8_params = {encode(k): encode(v) for k, v in six.iteritems(params)} return url_encode(utf8_params)
def test_last_request(self, mock): response = requests.post(GetMock.url, headers={'custom-header': 'huseyin'}, data={'name': 'huseyin'}) self.assertEqual(response.status_code, 200) last_request = mock.last_request # Test if last request has expected values. self.assertEqual(last_request.url, GetMock.url) body = last_request.body # In python 3 httpretty backend returns binary string for body. # So we are decoding it back to unicode to test. if isinstance(body, six.binary_type): body = body.decode('utf-8') self.assertEqual(body, 'name=huseyin') self.assertEqual(last_request.headers.get('custom-header'), 'huseyin') # Make sure that class's last_request is same as instances. self.assertIs(GetMock.last_request, mock.last_request)
def csv_safe_unicode(row, ignoreErrors=True): """ Given an array of values, make sure all strings are unicode in Python 3 and str in Python 2. The csv.writer in Python 2 does not handle unicode strings and in Python 3 it does not handle byte strings. :param row: an array which could contain mixed types. :param ignoreErrors: if True, convert what is possible and ignore errors. If false, conversion errors will raise an exception. :returns: either the original array if safe, or an array with all byte strings converted to unicode in Python 3 or str in Python 2. """ # This is unicode in Python 2 and bytes in Python 3 wrong_type = six.text_type if str == six.binary_type else six.binary_type # If all of the data is not the wrong type, just return it as is. This # allows non-string types, such as integers and floats. if not any(isinstance(value, wrong_type) for value in row): return row # Convert the wrong type of string to the type needed for this version of # Python. For Python 2 unicode gets encoded to str (bytes). For Python 3 # bytes get decoded to str (unicode). func = 'encode' if str == six.binary_type else 'decode' row = [getattr(value, func)('utf8', 'ignore' if ignoreErrors else 'strict') if isinstance(value, wrong_type) else value for value in row] return row
def _from_bytes(value): """Converts bytes to a string value, if necessary. Args: value: The string/bytes value to be converted. Returns: The original value converted to unicode (if bytes) or as passed in if it started out as unicode. Raises: ValueError if the value could not be converted to unicode. """ result = (value.decode('utf-8') if isinstance(value, six.binary_type) else value) if isinstance(result, six.text_type): return result else: raise ValueError('%r could not be converted to unicode' % (value,))
def clean_headers(headers): """Forces header keys and values to be strings, i.e not unicode. The httplib module just concats the header keys and values in a way that may make the message header a unicode string, which, if it then tries to contatenate to a binary request body may result in a unicode decode error. Args: headers: dict, A dictionary of headers. Returns: The same dictionary but with all the keys converted to strings. """ clean = {} try: for k, v in six.iteritems(headers): if not isinstance(k, six.binary_type): k = str(k) if not isinstance(v, six.binary_type): v = str(v) clean[_to_bytes(k)] = _to_bytes(v) except UnicodeEncodeError: raise NonAsciiHeaderError(k, ': ', v) return clean
def list_cmd(image_name, project, endpoint, apikey): """Short version of list cmd to use with deploy cmd.""" settings = {} if project: settings = _get_project_settings(project, endpoint, apikey) # Run a local docker container to run list-spiders cmd status_code, logs = _run_list_cmd(project, image_name, settings) if status_code != 0: click.echo(logs) raise shub_exceptions.ShubException( 'Container with list cmd exited with code %s' % status_code) spiders = utils.valid_spiders( logs.decode('utf-8') if isinstance(logs, binary_type) else logs) return spiders
def make_header_prefix(self, message_class, version=Connection.protocol_version, stream_id=0): if Connection.protocol_version < 3: return six.binary_type().join(map(uint8_pack, [ 0xff & (HEADER_DIRECTION_TO_CLIENT | version), 0, # flags (compression) stream_id, message_class.opcode # opcode ])) else: return six.binary_type().join(map(uint8_pack, [ 0xff & (HEADER_DIRECTION_TO_CLIENT | version), 0, # flags (compression) 0, # MSB for v3+ stream stream_id, message_class.opcode # opcode ]))
def test_partial_header_read(self, *args): c = self.make_connection() header = self.make_header_prefix(SupportedMessage) options = self.make_options_body() message = self.make_msg(header, options) # read in the first byte c._socket.recv.return_value = message[0:1] c.handle_read(None, 0) self.assertEqual(c._iobuf.getvalue(), message[0:1]) c._socket.recv.return_value = message[1:] c.handle_read(None, 0) self.assertEqual(six.binary_type(), c._iobuf.getvalue()) # let it write out a StartupMessage c.handle_write(None, 0) header = self.make_header_prefix(ReadyMessage, stream_id=1) c._socket.recv.return_value = self.make_msg(header) c.handle_read(None, 0) self.assertTrue(c.connected_event.is_set()) self.assertFalse(c.is_defunct)
def test_partial_header_read(self, *args): c = self.make_connection() header = self.make_header_prefix(SupportedMessage) options = self.make_options_body() message = self.make_msg(header, options) c.socket.recv.return_value = message[0:1] c.handle_read() self.assertEqual(c._iobuf.getvalue(), message[0:1]) c.socket.recv.return_value = message[1:] c.handle_read() self.assertEqual(six.binary_type(), c._iobuf.getvalue()) # let it write out a StartupMessage c.handle_write() header = self.make_header_prefix(ReadyMessage, stream_id=1) c.socket.recv.return_value = self.make_msg(header) c.handle_read() self.assertTrue(c.connected_event.is_set()) self.assertFalse(c.is_defunct)
def varint_pack(big): pos = True if big == 0: return b'\x00' if big < 0: bytelength = bit_length(abs(big) - 1) // 8 + 1 big = (1 << bytelength * 8) + big pos = False revbytes = bytearray() while big > 0: revbytes.append(big & 0xff) big >>= 8 if pos and revbytes[-1] & 0x80: revbytes.append(0) revbytes.reverse() return six.binary_type(revbytes)
def parse_schema(schema): if type(schema) is six.binary_type or type(schema) is six.text_type: return { 'type': schema, } elif type(schema) is list: return { 'type': 'list', } elif type(schema) is dict: title = schema.get(':title', None) required_elts = list(get_required(schema.keys())) properties = get_object_properties(schema) return { 'type': 'object', 'title': title, 'properties': { prop_name: parse_schema(subschema) for prop_name, subschema in properties.items() }, 'required': required_elts, } else: raise Exception('Unsupported schema definition')
def native(s): """ Convert :py:class:`bytes` or :py:class:`unicode` to the native :py:class:`str` type, using UTF-8 encoding if conversion is necessary. :raise UnicodeError: The input string is not UTF-8 decodeable. :raise TypeError: The input is neither :py:class:`bytes` nor :py:class:`unicode`. """ if not isinstance(s, (binary_type, text_type)): raise TypeError("%r is neither bytes nor unicode" % s) if PY3: if isinstance(s, binary_type): return s.decode("utf-8") else: if isinstance(s, text_type): return s.encode("utf-8") return s
def get_template_files(template_data=None, template_url=None): if template_data: tpl = template_data elif template_url: with contextlib.closing(request.urlopen(template_url)) as u: tpl = u.read() else: return {}, None if not tpl: return {}, None if isinstance(tpl, six.binary_type): tpl = tpl.decode('utf-8') template = template_format.parse(tpl) files = {} _get_file_contents(template, files) return files, template
def parse(self, data): """ Returns the parsed tree. """ if isinstance(data, six.binary_type): if data[:3] == codecs.BOM_UTF8: encoding = 'utf-8-sig' else: encoding = 'latin1' content = data.decode(encoding).strip() else: content = data.strip() if not content: return ParseResults() return self.script.parseString(content, parseAll=True)
def convert_into_with_meta(self, item, resp): if isinstance(item, six.string_types): if six.PY2 and isinstance(item, six.text_type): return UnicodeWithMeta(item, resp) else: return StrWithMeta(item, resp) elif isinstance(item, six.binary_type): return BytesWithMeta(item, resp) elif isinstance(item, list): return ListWithMeta(item, resp) elif isinstance(item, tuple): return TupleWithMeta(item, resp) elif item is None: return TupleWithMeta((), resp) else: return DictWithMeta(item, resp)
def _from_bytes(value): """Converts bytes to a string value, if necessary. Args: value: The string/bytes value to be converted. Returns: The original value converted to unicode (if bytes) or as passed in if it started out as unicode. Raises: ValueError if the value could not be converted to unicode. """ result = (value.decode('utf-8') if isinstance(value, six.binary_type) else value) if isinstance(result, six.text_type): return result else: raise ValueError( '{0!r} could not be converted to unicode'.format(value))
def clean_headers(headers): """Forces header keys and values to be strings, i.e not unicode. The httplib module just concats the header keys and values in a way that may make the message header a unicode string, which, if it then tries to contatenate to a binary request body may result in a unicode decode error. Args: headers: dict, A dictionary of headers. Returns: The same dictionary but with all the keys converted to strings. """ clean = {} try: for k, v in six.iteritems(headers): if not isinstance(k, six.binary_type): k = str(k) if not isinstance(v, six.binary_type): v = str(v) clean[_helpers._to_bytes(k)] = _helpers._to_bytes(v) except UnicodeEncodeError: from oauth2client.client import NonAsciiHeaderError raise NonAsciiHeaderError(k, ': ', v) return clean
def decode_unicode(input): """Decode the unicode of the message, and encode it into utf-8.""" if isinstance(input, dict): temp = {} # If the input data is a dict, create an equivalent dict with a # predictable insertion order to avoid inconsistencies in the # message signature computation for equivalent payloads modulo # ordering for key, value in sorted(six.iteritems(input)): temp[decode_unicode(key)] = decode_unicode(value) return temp elif isinstance(input, (tuple, list)): # When doing a pair of JSON encode/decode operations to the tuple, # the tuple would become list. So we have to generate the value as # list here. return [decode_unicode(element) for element in input] elif six.PY2 and isinstance(input, six.text_type): return input.encode('utf-8') elif six.PY3 and isinstance(input, six.binary_type): return input.decode('utf-8') else: return input
def replace_vars(data, repl_vars): """Return the given data structure with appropriate variables replaced. Args: data (Any): Data of an arbitrary type. Returns: Any: Data of the same time, possibly with replaced values. """ if isinstance(data, (six.text_type, six.binary_type)): for k, v in repl_vars.items(): data = data.replace('${' + k + '}', v) return data if isinstance(data, collections.Sequence): return type(data)([replace_vars(d, repl_vars) for d in data]) if isinstance(data, collections.Mapping): return type(data)([(k, replace_vars(v, repl_vars)) for k, v in data.items()]) return data
def clean_markup(self, markup, parser=None): """Apply ``Cleaner`` to markup string or document and return a cleaned string or document.""" result_type = type(markup) if isinstance(markup, six.string_types): doc = fromstring(markup, parser=parser) else: doc = copy.deepcopy(markup) self(doc) if issubclass(result_type, six.binary_type): return tostring(doc, encoding='utf-8') elif issubclass(result_type, six.text_type): return tostring(doc, encoding='unicode') else: return doc #: A default Cleaner instance, which kills comments, processing instructions, script tags, style tags.
def __init__(self, *elements): """Initialize a Document manually by passing one or more Document elements (Paragraph, Heading, Table, etc.) Strings that are passed to this constructor are automatically wrapped into Paragraph elements. :param list[chemdataextractor.doc.element.BaseElement|string] elements: Elements in this Document. """ self._elements = [] for element in elements: # Convert raw text to Paragraph elements if isinstance(element, six.text_type): element = Paragraph(element) elif isinstance(element, six.binary_type): # Try guess encoding if byte string encoding = get_encoding(element) log.warning('Guessed bytestring encoding as %s. Use unicode strings to avoid this warning.', encoding) element = Paragraph(element.decode(encoding)) element.document = self self._elements.append(element) log.debug('%s: Initializing with %s elements' % (self.__class__.__name__, len(self.elements)))
def mixin_meta(item, resp): if isinstance(item, six.string_types): if six.PY2 and isinstance(item, six.text_type): return resource.UnicodeWithMeta(item, resp) else: return resource.StrWithMeta(item, resp) elif isinstance(item, six.binary_type): return resource.BytesWithMeta(item, resp) elif isinstance(item, list): return resource.ListWithMeta(item, resp) elif isinstance(item, tuple): return resource.TupleWithMeta(item, resp) elif item is None: return resource.TupleWithMeta((), resp) else: return resource.DictWithMeta(item, resp)
def _call_audio_metadata(self, audio_meta, filepath): if audio_meta is False: # metadata crawling is disabled audio_meta = None elif audio_meta is None: # no MBID is given, attempt to get # it from id3 tag audio_meta = self.crawl_musicbrainz_metadata(filepath) elif isinstance(audio_meta, (six.string_types, six.binary_type)): # MBID is given audio_meta = self.crawl_musicbrainz_metadata(audio_meta) elif not isinstance(audio_meta, dict): warn_str = 'The "metadata" input can be "False" (skipped), ' \ '"basestring" (MBID input), "None" (attempt to get ' \ 'the MBID from audio file tags) or "dict" (already ' \ 'computed)' warnings.warn(warn_str, stacklevel=2) return audio_meta
def _assert_shard_manager(self, content, mode, mock_tree, mock_challenges): with tempfile.NamedTemporaryFile(mode, delete=False) as tmp_file: tmp_file.write(content) tmp_file.flush() nchallenges = 2 sm = ShardManager( tmp_file.name, nchallenges=nchallenges) assert sm.filepath == tmp_file.name assert len(sm.shards) > 0 mock_challenges.assert_called_once_with(nchallenges) if isinstance(content, six.binary_type): mock_tree.assert_called_once_with( mock_challenges.return_value, content) else: mock_tree.assert_called_once_with( mock_challenges.return_value, bytes(content.encode('utf-8'))) mock_tree.reset_mock() mock_challenges.reset_mock()
def b64decode(data): """JOSE Base64 decode. :param data: Base64 string to be decoded. If it's unicode, then only ASCII characters are allowed. :type data: `bytes` or `unicode` :returns: Decoded data. :rtype: bytes :raises TypeError: if input is of incorrect type :raises ValueError: if input is unicode with non-ASCII characters """ if isinstance(data, six.string_types): try: data = data.encode('ascii') except UnicodeEncodeError: raise ValueError( 'unicode argument should contain only ASCII characters') elif not isinstance(data, six.binary_type): raise TypeError('argument should be a str or unicode') return base64.urlsafe_b64decode(data + b'=' * (4 - (len(data) % 4)))
def certificate(self, cert, name, alt_host=None, port=443): """Verifies the certificate presented at name is cert""" if alt_host is None: host = socket.gethostbyname(name) elif isinstance(alt_host, six.binary_type): host = alt_host else: host = alt_host.encode() name = name if isinstance(name, six.binary_type) else name.encode() try: presented_cert = crypto_util.probe_sni(name, host, port) except acme_errors.Error as error: logger.exception(error) return False return presented_cert.digest("sha256") == cert.digest("sha256")
def headers_to_str_headers(headers): ''' Converts dict or tuple-based headers of bytes or str to tuple-based headers of str, which is the python norm (pep 3333) ''' ret = [] if isinstance(headers, collections_abc.Mapping): h = headers.items() else: h = headers if six.PY2: #pragma: no cover return h for tup in h: k, v = tup if isinstance(k, six.binary_type): k = k.decode('iso-8859-1') if isinstance(v, six.binary_type): v = v.decode('iso-8859-1') ret.append((k, v)) return ret
def __validate(self): if type(self._key) == six.binary_type: if self._range_end: self._key = KeySet(self._key, range_end=self._range_end) else: self._key = KeySet(self._key) elif isinstance(self._key, KeySet): pass else: raise TypeError( 'key must either be bytes or a KeySet object, not {}'.format(type(self._key))) if self._key.type == KeySet._SINGLE: self._range_end = None elif self._key.type == KeySet._PREFIX: self._range_end = _increment_last_byte(self._key.key) elif self._key.type == KeySet._RANGE: self._range_end = self._key.range_end else: raise Exception('logic error')
def __validate(self): if type(self._key) == six.binary_type: self._key = KeySet(self._key) elif isinstance(self._key, KeySet): pass else: raise TypeError( 'key must either be bytes or a KeySet object, not {}'.format(type(self._key))) if self._return_previous is not None and type(self._return_previous) != bool: raise TypeError('return_previous must be bool, not {}'.format( type(self._return_previous))) if self._key.type == KeySet._SINGLE: self._range_end = None elif self._key.type == KeySet._PREFIX: self._range_end = _increment_last_byte(self._key.key) elif self._key.type == KeySet._RANGE: self._range_end = self._key.range_end else: raise Exception('logic error')
def __init__(self, key, return_previous=None): """ :param key: The key or key set to delete. :type key: bytes or :class:`txaioetcd.KeySet` :param return_previous: If enabled, return the deleted key-value pairs :type return_previous: bool or None """ Op.__init__(self) if key is not None and type(key) != six.binary_type and not isinstance(key, KeySet): raise TypeError('key must be bytes or KeySet, not {}'.format(type(key))) if isinstance(key, KeySet): self.key = key else: self.key = KeySet(key) if return_previous is not None and type(return_previous) != bool: raise TypeError('return_previous must be bool, not {}'.format(type(return_previous))) self.return_previous = return_previous
def _apply(self, value): value = super(Array, self)._apply(value) # type: Sequence if self._has_errors: return None if isinstance(value, (binary_type, text_type)): return self._invalid_value( value = value, reason = self.CODE_WRONG_TYPE, template_vars = { 'incoming': self.get_type_name(type(value)), 'allowed': self.get_allowed_type_names(), }, ) return value
def assign_attr_value(attr, val): from mmdnn.conversion.common.IR.graph_pb2 import TensorShape '''Assign value to AttrValue proto according to data type.''' if isinstance(val, bool): attr.b = val elif isinstance(val, integer_types): attr.i = val elif isinstance(val, float): attr.f = val elif isinstance(val, binary_type) or isinstance(val, text_type): if hasattr(val, 'encode'): val = val.encode() attr.s = val elif isinstance(val, TensorShape): attr.shape.MergeFromString(val.SerializeToString()) elif isinstance(val, list): if not val: return if isinstance(val[0], integer_types): attr.list.i.extend(val) elif isinstance(val[0], TensorShape): attr.list.shape.extend(val) else: raise NotImplementedError('AttrValue cannot be of list[{}].'.format(val[0])) else: raise NotImplementedError('AttrValue cannot be of %s' % type(val))
def write_np_values(values, f): """ Arguments: values: {str: np.array} f: filename or filelike object """ with ZipFile(f, 'w') as zf: for k, v in values.items(): # Need to do this because Python zipfile has some odd support for filenames: # http://bugs.python.org/issue24110 if len(k) == 16 and isinstance(k, six.binary_type): # valid UUID bytes zf.writestr(str(uuid.UUID(bytes=k)), v.tostring()) else: zf.writestr(six.u(k), v.tostring()) zf.writestr(MANIFEST_FILENAME, json_dumps_manifest(values))
def generate_scalars_binary(scalars_map, preceding_symbols=0): for ion_type, values in six.iteritems(scalars_map): for native, expected in values: native_expected = expected has_symbols = False if native is None: # An un-adorned 'None' doesn't contain enough information to determine its Ion type native_expected = b'\x0f' elif ion_type is IonType.CLOB: # All six.binary_type are treated as BLOBs unless wrapped by an _IonNature tid = six.byte2int(expected) + 0x10 # increment upper nibble for clob -> blob; keep lower nibble native_expected = bytearray([tid]) + expected[1:] elif ion_type is IonType.SYMBOL and native is not None: has_symbols = True elif ion_type is IonType.STRING: # Encode all strings as symbols too. symbol_expected = _serialize_symbol( IonEvent(IonEventType.SCALAR, IonType.SYMBOL, SymbolToken(None, 10 + preceding_symbols))) yield _Parameter(IonType.SYMBOL.name + ' ' + native, IonPyText.from_value(IonType.SYMBOL, native), symbol_expected, True) yield _Parameter('%s %s' % (ion_type.name, native), native, native_expected, has_symbols) wrapper = _FROM_ION_TYPE[ion_type].from_value(ion_type, native) yield _Parameter(repr(wrapper), wrapper, expected, has_symbols)
def convert_unicode_to_str(data): """ Py2, always translate to utf8 from unicode Py3, always translate to unicode :param data: :return: """ if six.PY2 and isinstance(data, six.text_type): return data.encode('utf8') elif six.PY3 and isinstance(data, six.binary_type): return data.decode('utf8') elif isinstance(data, collections.Mapping): return dict((Util.convert_unicode_to_str(k), Util.convert_unicode_to_str(v)) for k, v in six.iteritems(data)) elif isinstance(data, collections.Iterable) and not isinstance(data, (six.binary_type, six.string_types)): return type(data)(map(Util.convert_unicode_to_str, data)) return data
def test_get_fingerprint(self): # Make sure _get_fingerprint() generates a consistent fingerprint MyObject.VERSION = '1.1' argspec = 'vulpix' with mock.patch('inspect.getargspec') as mock_gas: mock_gas.return_value = argspec fp = self.ovc._get_fingerprint(MyObject.__name__) exp_fields = sorted(list(MyObject.fields.items())) exp_methods = sorted([('remotable_method', argspec), ('remotable_classmethod', argspec)]) expected_relevant_data = (exp_fields, exp_methods) expected_hash = hashlib.md5(six.binary_type(repr( expected_relevant_data).encode())).hexdigest() expected_fp = '%s-%s' % (MyObject.VERSION, expected_hash) self.assertEqual(expected_fp, fp, "_get_fingerprint() did not " "generate a correct fingerprint.")
def test_get_fingerprint_with_child_versions(self): # Make sure _get_fingerprint() generates a consistent fingerprint # when child_versions are present child_versions = {'1.0': '1.0', '1.1': '1.1'} MyObject.VERSION = '1.1' MyObject.child_versions = child_versions argspec = 'onix' with mock.patch('inspect.getargspec') as mock_gas: mock_gas.return_value = argspec fp = self.ovc._get_fingerprint(MyObject.__name__) exp_fields = sorted(list(MyObject.fields.items())) exp_methods = sorted([('remotable_method', argspec), ('remotable_classmethod', argspec)]) exp_child_versions = collections.OrderedDict(sorted( child_versions.items())) exp_relevant_data = (exp_fields, exp_methods, exp_child_versions) expected_hash = hashlib.md5(six.binary_type(repr( exp_relevant_data).encode())).hexdigest() expected_fp = '%s-%s' % (MyObject.VERSION, expected_hash) self.assertEqual(expected_fp, fp, "_get_fingerprint() did not " "generate a correct fingerprint.")
def _decrypt(self): decrypted_eval_data = [] for row in EvalRow.objects.all(): decrypted_row = {'pk': row.pk, 'user': row.user_identifier, 'record': row.record_identifier, 'action': row.action, 'timestamp': row.timestamp.__str__()} gpg = gnupg.GPG() gpg.import_keys(settings.CALLISTO_EVAL_PRIVATE_KEY) decrypted_eval_row = six.text_type( gpg.decrypt(six.binary_type(row.row))) if decrypted_eval_row: decrypted_row.update(json.loads(decrypted_eval_row)) decrypted_eval_data.append(decrypted_row) return decrypted_eval_data
def test_20_dump_drop_restore(self): # dump db dump_data = self.client.services.db.dump_db(self.env.super_password, self.env.dbname) self.assertIsInstance(dump_data, six.binary_type) # drop it self.client.services.db.drop_db(self.env.super_password, self.env.dbname) self.assertNotIn(self.env.dbname, self.client.services.db) # and try to restore it self.client.services.db.restore_db(self.env.super_password, self.env.dbname, dump_data) self.assertIn(self.env.dbname, self.client.services.db) # check if objects were restored time.sleep(2) cl = self.client.login(self.env.dbname, self.env.user, self.env.password) self.assertIsNotNone(cl.uid) self.assertIsNotNone(cl.user)
def recv_packet(self): try: packet_text = self._connection.recv() except WebSocketTimeoutException as e: raise TimeoutError('recv timed out (%s)' % e) except SSLError as e: raise ConnectionError('recv disconnected by SSL (%s)' % e) except WebSocketConnectionClosedException as e: raise ConnectionError('recv disconnected (%s)' % e) except SocketError as e: raise ConnectionError('recv disconnected (%s)' % e) if not isinstance(packet_text, six.binary_type): packet_text = packet_text.encode('utf-8') engineIO_packet_type, engineIO_packet_data = parse_packet_text( packet_text) yield engineIO_packet_type, engineIO_packet_data
def crypto_box(msg, nonce, pk, sk): _assert_len('nonce', nonce, crypto_box_NONCEBYTES) _assert_len('pk', pk, crypto_box_PUBLICKEYBYTES) _assert_len('sk', sk, crypto_box_SECRETKEYBYTES) c = bytearray(crypto_box_MACBYTES + len(msg)) _raise_on_error( lib.crypto_box_easy( ffi.cast("unsigned char *", ffi.from_buffer(c)), msg, len(msg), nonce, pk, sk, ), ) return binary_type(c)
def crypto_box_open(c, nonce, pk, sk): _assert_min_len('c', c, crypto_box_MACBYTES) _assert_len('nonce', nonce, crypto_box_NONCEBYTES) _assert_len('pk', pk, crypto_box_PUBLICKEYBYTES) _assert_len('sk', sk, crypto_box_SECRETKEYBYTES) msg = bytearray(len(c) - crypto_box_MACBYTES) _raise_on_error( lib.crypto_box_open_easy( ffi.cast("unsigned char *", ffi.from_buffer(msg)), c, len(c), nonce, pk, sk, ), ) return binary_type(msg)
def crypto_box_open_afternm(c, nonce, k): _assert_min_len('c', c, crypto_box_MACBYTES) _assert_len('nonce', nonce, crypto_box_NONCEBYTES) _assert_len('k', k, crypto_box_BEFORENMBYTES) msg = bytearray(len(c) - crypto_box_MACBYTES) _raise_on_error( lib.crypto_box_open_easy_afternm( ffi.cast("unsigned char *", ffi.from_buffer(msg)), c, len(c), nonce, k, ), ) return binary_type(msg)