我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用six.byte2int()。
def _get_close_args(self, data): """ this functions extracts the code, reason from the close body if they exists, and if the self.on_close except three arguments """ import inspect # if the on_close callback is "old", just return empty list if sys.version_info < (3, 0): if not self.on_close or len(inspect.getargspec(self.on_close).args) != 3: return [] else: if not self.on_close or len(inspect.getfullargspec(self.on_close).args) != 3: return [] if data and len(data) >= 2: code = 256*six.byte2int(data[0:1]) + six.byte2int(data[1:2]) reason = data[2:].decode('utf-8') return [code, reason] return [None, None]
def _bytes_text(code_point_iter, quote, prefix=b'', suffix=b''):
    """Render code points as a quoted, escaped byte string.

    The result is ``prefix + quote + body + quote + suffix``. Within the
    body, the quote character and the backslash are backslash-escaped,
    code points accepted by ``_is_printable_ascii`` are written as-is, and
    everything else is written in the form produced by ``_escape``.
    """
    quote_code_point = six.byte2int(quote)
    backslash_code_point = six.byte2int(b'\\')

    out = BytesIO()
    out.write(prefix)
    out.write(quote)
    for code_point in code_point_iter:
        if code_point == quote_code_point:
            chunk = b'\\' + quote
        elif code_point == backslash_code_point:
            chunk = b'\\\\'
        elif _is_printable_ascii(code_point):
            chunk = six.int2byte(code_point)
        else:
            chunk = _escape(code_point)
        out.write(chunk)
    out.write(quote)
    out.write(suffix)
    return out.getvalue()
def generate_scalars_binary(scalars_map, preceding_symbols=0):
    """Yield ``_Parameter`` cases for every scalar in ``scalars_map``.

    ``scalars_map`` maps an Ion type to an iterable of ``(native, expected)``
    pairs. For each pair, one parameter is yielded for the bare native value
    and one for the value wrapped through ``_FROM_ION_TYPE``. STRING values
    additionally yield, first, a case in which the text is encoded as a
    SYMBOL.

    ``preceding_symbols`` is added to 10 to form the symbol ID used for that
    extra SYMBOL case.
    """
    for ion_type, values in six.iteritems(scalars_map):
        for native, expected in values:
            # Expected bytes for the bare (unwrapped) native value; adjusted below per type.
            native_expected = expected
            has_symbols = False
            if native is None:
                # An un-adorned 'None' doesn't contain enough information to determine its Ion type
                native_expected = b'\x0f'
            elif ion_type is IonType.CLOB:
                # All six.binary_type are treated as BLOBs unless wrapped by an _IonNature
                tid = six.byte2int(expected) + 0x10  # increment upper nibble for clob -> blob; keep lower nibble
                native_expected = bytearray([tid]) + expected[1:]
            elif ion_type is IonType.SYMBOL and native is not None:
                has_symbols = True
            elif ion_type is IonType.STRING:
                # Encode all strings as symbols too.
                symbol_expected = _serialize_symbol(
                    IonEvent(IonEventType.SCALAR, IonType.SYMBOL, SymbolToken(None, 10 + preceding_symbols)))
                yield _Parameter(IonType.SYMBOL.name + ' ' + native,
                                 IonPyText.from_value(IonType.SYMBOL, native), symbol_expected, True)
            # Case 1: the bare native value.
            yield _Parameter('%s %s' % (ion_type.name, native), native, native_expected, has_symbols)
            # Case 2: the same value wrapped so it carries its Ion type; uses the unmodified expected bytes.
            wrapper = _FROM_ION_TYPE[ion_type].from_value(ion_type, native)
            yield _Parameter(repr(wrapper), wrapper, expected, has_symbols)
def get_time(self):
    """Read out stations internal time.

    Sends the C_GET_TIME command, decodes the reply and beeps the station.

    @return: datetime, or None if the station reports an impossible time
    """
    raw = self._send_command(SIReader.C_GET_TIME, b'')[1]

    year = byte2int(raw[0])
    month = byte2int(raw[1])
    day = byte2int(raw[2])
    # Lowest bit of byte 3 selects the second half of the day.
    half_day = byte2int(raw[3]) & 0b1
    # Bytes 4-5 count seconds within the current 12 hour half.
    hour_in_half, leftover = divmod(SIReader._to_int(raw[4:6]), 3600)
    hour = half_day * 12 + hour_in_half
    minute, second = divmod(leftover, 60)
    # Byte 6 is a 1/256 second fraction, scaled to datetime's last argument.
    fraction = int(round(byte2int(raw[6]) / 256.0 * 1000000))

    self.beep()
    try:
        stamp = datetime(year, month, day, hour, minute, second, fraction)
    except ValueError:
        # return None if the time reported by the station is impossible
        return None
    return stamp
def _update_proto_config(self):
    """Read the station's protocol flags and operating mode.

    Stores the result in ``self.proto_config`` (keys ``ext_proto``,
    ``auto_send``, ``handshake``, ``pw_access``, ``punch_read``, ``mode``)
    and returns that dict.
    """
    # Read protocol configuration
    reply = self._send_command(SIReader.C_GET_SYS_VAL, SIReader.O_PROTO + b'\x01')
    flags = byte2int(reply[1][1])

    # (config key, bit position inside the configuration byte)
    flag_bits = (
        ('ext_proto', 0),
        ('auto_send', 1),
        ('handshake', 2),
        ('pw_access', 4),
        ('punch_read', 7),
    )
    self.proto_config = {}
    for key, bit in flag_bits:
        self.proto_config[key] = bool(flags & (1 << bit))

    # Read operating mode
    reply = self._send_command(SIReader.C_GET_SYS_VAL, SIReader.O_MODE + b'\x01')
    self.proto_config['mode'] = byte2int(reply[1][1])
    return self.proto_config
def _get_close_args(self, data): """ this functions extracts the code, reason from the close body if they exists, and if the self.on_close except three arguments """ import inspect # if the on_close callback is "old", just return empty list if sys.version_info < (3, 0): if not self.on_close or len(inspect.getargspec(self.on_close).args) != 3: return [] else: if not self.on_close or len(inspect.getfullargspec(self.on_close).args) != 3: return [] if data and len(data) >= 2: code = 256 * six.byte2int(data[0:1]) + six.byte2int(data[1:2]) reason = data[2:].decode('utf-8') return [code, reason] return [None, None]
def decode(self, encoded_payload):
    """Decode a transmitted payload.

    Splits ``encoded_payload`` into packets and stores them, in order, in
    ``self.packets``. Two header forms are handled per packet:

    * first byte value <= 1: the following bytes each hold one decimal
      digit of the packet length, terminated by a 255 byte;
    * otherwise: the length is ASCII decimal text terminated by ``b':'``.

    Raises ValueError if a length header has no terminator (or, for the
    text form, is not a valid integer).
    """
    self.packets = []
    while encoded_payload:
        if six.byte2int(encoded_payload[0:1]) <= 1:
            packet_len = 0
            i = 1
            while True:
                digit_byte = encoded_payload[i:i + 1]
                if not digit_byte:
                    # Ran off the end without seeing the 255 terminator;
                    # report it the same way as the text branch does
                    # instead of leaking an IndexError from byte2int.
                    raise ValueError('invalid payload')
                digit = six.byte2int(digit_byte)
                if digit == 255:
                    break
                packet_len = packet_len * 10 + digit
                i += 1
            self.packets.append(packet.Packet(
                encoded_packet=encoded_payload[i + 1:i + 1 + packet_len]))
        else:
            i = encoded_payload.find(b':')
            if i == -1:
                raise ValueError('invalid payload')
            packet_len = int(encoded_payload[0:i])
            pkt = encoded_payload[i + 1: i + 1 + packet_len]
            self.packets.append(packet.Packet(encoded_packet=pkt))
        # Drop the header and the packet just consumed.
        encoded_payload = encoded_payload[i + 1 + packet_len:]
def validate(self, skip_utf8_validation=False):
    """
    Validate the ABNF frame.

    skip_utf8_validation: skip utf8 validation.

    Raises WebSocketProtocolException when any rsv bit is set, the opcode
    is not in ABNF.OPCODES, a ping frame does not have fin set, or a close
    frame has an invalid length, a reason rejected by validate_utf8, or a
    status code rejected by _is_valid_close_status.
    """
    if self.rsv1 or self.rsv2 or self.rsv3:
        raise WebSocketProtocolException("rsv is not implemented, yet")

    if self.opcode not in ABNF.OPCODES:
        # Format explicitly: exceptions do not %-interpolate extra
        # arguments, so passing the opcode separately left a literal
        # "%r" in the message.
        raise WebSocketProtocolException("Invalid opcode %r" % (self.opcode,))

    if self.opcode == ABNF.OPCODE_PING and not self.fin:
        raise WebSocketProtocolException("Invalid ping frame.")

    if self.opcode == ABNF.OPCODE_CLOSE:
        length = len(self.data)
        if not length:
            # An empty close body is valid.
            return
        if length == 1 or length >= 126:
            raise WebSocketProtocolException("Invalid close frame.")
        if length > 2 and not skip_utf8_validation and not validate_utf8(self.data[2:]):
            raise WebSocketProtocolException("Invalid close frame.")

        # Status code: first two bytes, most significant first.
        code = 256 * six.byte2int(self.data[0:1]) + six.byte2int(self.data[1:2])
        if not self._is_valid_close_status(code):
            raise WebSocketProtocolException("Invalid close opcode.")
def _scalar_event_pairs(data, events, info):
    """Generates event pairs for all scalars.

    Each scalar is represented by a sequence whose first element is the raw data and whose following elements are the
    expected output events.

    ``info`` is unpacked as ``(delimiter, in_container)``; the delimiter is appended to ``data`` before it is fed in.
    Yields ``(input_event, expected_event)`` tuples.
    """
    first = True
    delimiter, in_container = info
    space_delimited = not (b',' in delimiter)
    for event in events:
        input_event = NEXT
        if first:
            # Only the first expected event is driven by feeding the raw data; later ones are requested with NEXT.
            input_event = e_read(data + delimiter)
            if space_delimited and event.value is not None \
                and ((event.ion_type is IonType.SYMBOL) or
                     (event.ion_type is IonType.STRING and
                      six.byte2int(b'"') != six.indexbytes(data, 0))):  # triple-quoted strings
                # Because annotations and field names are symbols, a space delimiter after a symbol isn't enough to
                # generate a symbol event immediately. Similarly, triple-quoted strings may be followed by another
                # triple-quoted string if only delimited by whitespace or comments.
                yield input_event, INC
                if in_container:
                    # Within s-expressions, these types are delimited in these tests by another value - in this case,
                    # int 0 (but it could be anything).
                    yield e_read(b'0' + delimiter), event
                    input_event, event = (NEXT, e_int(0))
                else:
                    # This is a top-level value, so it may be flushed with NEXT after INCOMPLETE.
                    input_event, event = (NEXT, event)
        first = False
        yield input_event, event
def _decode_received(self, msg):
    """Return ``msg`` as text or bytes, depending on its leading byte.

    Values that are not ``six.binary_type`` are returned untouched. Byte
    strings whose first byte is 48 (ASCII ``'0'``) or above are decoded as
    UTF-8; anything else is treated as binary and returned as-is.
    """
    if isinstance(msg, six.binary_type):
        leading = six.byte2int(msg[0:1])
        if leading >= 48:
            # no binary
            return msg.decode('utf-8')
    # already decoded, or a binary message: nothing to do
    return msg
def decode(self, encoded_packet):
    """Decode a transmitted package.

    Sets ``self.packet_type``, ``self.binary`` and ``self.data`` from the
    encoded form. Input that is not one of ``binary_types`` is UTF-8
    encoded first; other non-``bytes`` input is converted with ``bytes()``.
    """
    if not isinstance(encoded_packet, binary_types):
        encoded_packet = encoded_packet.encode('utf-8')
    elif not isinstance(encoded_packet, bytes):
        encoded_packet = bytes(encoded_packet)

    is_base64 = False
    lead = six.byte2int(encoded_packet[0:1])
    self.packet_type = lead
    if lead == 98:  # 'b' --> binary base64 encoded packet
        self.binary = True
        encoded_packet = encoded_packet[1:]
        self.packet_type = six.byte2int(encoded_packet[0:1])
        self.packet_type -= 48
        is_base64 = True
    elif lead >= 48:
        # ASCII digit: a text packet
        self.packet_type -= 48
        self.binary = False
    else:
        self.binary = True

    self.data = None
    if len(encoded_packet) <= 1:
        return

    body = encoded_packet[1:]
    if not self.binary:
        # Prefer JSON; fall back to the plain decoded text.
        try:
            self.data = self.json.loads(body.decode('utf-8'))
        except ValueError:
            self.data = body.decode('utf-8')
    elif is_base64:
        self.data = base64.b64decode(body)
    else:
        self.data = body
def extract_message(cls, raw_bytes):
    """Split one 0x1e ... 0x0a framed message off the front of ``raw_bytes``.

    Returns ``(message, rest)``, where ``message`` is the bytes between the
    start marker and the first 0x0a, or ``(None, raw_bytes)`` when the
    input is shorter than two bytes or no complete message is present yet.

    Raises FramingError if the input does not start with 0x1e, or if a
    second 0x1e appears before any 0x0a.
    """
    if len(raw_bytes) < 2:
        return None, raw_bytes

    if six.byte2int(raw_bytes) != 0x1e:
        raise FramingError(
            'Start marker is missing: %s' % raw_bytes)

    framed, end_marker, remainder = raw_bytes.partition(b'\x0a')
    if end_marker:
        # Strip the leading start marker from the message.
        return framed[1:], remainder

    if b'\x1e' in raw_bytes[1:]:
        raise FramingError(
            'End marker is missing: %s' % raw_bytes)
    return None, raw_bytes
def extract_message(cls, raw_bytes):
    """Split the first complete JSON object off the front of ``raw_bytes``.

    Returns ``(message, rest)`` once the bracket/quote stack opened by the
    leading ``{`` is empty again, or ``(None, raw_bytes)`` when the input is
    shorter than two bytes or the object is not complete yet.

    Raises FramingError if the first byte is not ``{`` (123).
    """
    if len(raw_bytes) < 2:
        return None, raw_bytes
    if six.byte2int(raw_bytes) != 123:
        raise FramingError(
            'Broken state. Expected JSON Object, got: %s' % raw_bytes)
    # Byte values used below: 123 '{', 125 '}', 91 '[', 93 ']', 34 '"',
    # 92 backslash, 117 'u'.
    stack = [123]
    # Number of bytes consumed after a backslash-u escape was opened.
    uniesc = 0
    # For the state on top of the stack: bytes that close it ...
    poppers = {91: [93], 123: [125], 34: [34]}
    # ... and bytes that open a nested state.
    adders = {91: [34, 91, 123], 123: [34, 91, 123], 34: [92], 92: [117]}
    for idx in range(1, len(raw_bytes)):
        cbyte = six.indexbytes(raw_bytes, idx)
        if cbyte in poppers.get(stack[-1], []):
            stack.pop()
        elif cbyte in adders.get(stack[-1], []):
            stack.append(cbyte)
        elif stack[-1] == 92:
            # Any other byte after a backslash ends the escape.
            stack.pop()
        elif stack[-1] == 117:
            uniesc += 1
            if uniesc >= 4:
                # Four bytes consumed after 'u': drop both the 'u' and the
                # backslash state.
                stack = stack[:-2]
                uniesc = 0
        if not stack:
            # The outermost object just closed at idx.
            return raw_bytes[:idx + 1], raw_bytes[idx + 1:]
    return None, raw_bytes
def _get_close_args(self, data): if data and len(data) >= 2: code = 256 * six.byte2int(data[0:1]) + six.byte2int(data[1:2]) reason = data[2:].decode('utf-8') return [code, reason] return [None, None]
def _get_close_args(self, data): """ this functions extracts the code, reason from the close body if they exists, and if the self.on_close except three arguments """ import inspect # if the on_close callback is "old", just return empty list if not self.on_close or len(inspect.getargspec(self.on_close).args) != 3: return [] if data and len(data) >= 2: code = 256*six.byte2int(data[0:1]) + six.byte2int(data[1:2]) reason = data[2:].decode('utf-8') return [code, reason] return [None, None]
def byte2int(x):
    """Return the first element of ``x``, or ``x`` itself if it cannot be indexed.

    Lets callers pass either a byte string or a value that is already an
    integer (indexing an int raises TypeError, which is swallowed here).
    """
    try:
        head = x[0]
    except TypeError:
        # Not subscriptable: hand the value back unchanged.
        return x
    return head
def _decode_cardnr(number):
    """Decodes a 4 byte cardnr to an int.

    SI-Card numbering is a bit odd:

    SI-Card 5:
      - byte 0: always 0 (not stored on the card)
      - byte 1: card series (stored on the card as CNS)
      - byte 2,3: card number
      - printed: 100'000*CNS + card number
      - nr range: 1-65'000 + 200'001-265'000 + 300'001-365'000 + 400'001-465'000

    SI-Card 6/6*/8/9/10/11/pCard/tCard/fCard/SIAC1:
      - byte 0: card series (SI6:00, SI9:01, SI8:02, pCard:04, tCard:06, fCard:0E, SI10+SI11+SIAC1:0F)
      - byte 1-3: card number
      - printed: only card number
      - nr range:
          - SI6: 500'000-999'999 + 2'003'000-2'003'400 (WM2003) + 16'711'680-16'777'215 (SI6*)
          - SI9: 1'000'000-1'999'999, SI8: 2'000'000-2'999'999
          - pCard: 4'000'000-4'999'999, tCard: 6'000'000-6'999'999
          - SI10: 7'000'000-7'999'999, SIAC1: 8'000'000-8'999'999
          - SI11: 9'000'000-9'999'999, fCard: 14'000'000-14'999'999

    The card nr ranges guarantee that no ambiguous values are possible
    (500'000 = 0x07A120 > 0x04FFFF = 465'535 = highest technically possible value on a SI5)
    """
    if number[0:1] != b'\x00':
        raise SIReaderException('Unknown card series')

    nr = SIReader._to_int(number[1:4])
    if nr >= 500000:
        # SI6/8/9
        return nr

    # SI5 card
    card_number = SIReader._to_int(number[2:4])
    series = byte2int(number[1])
    if series < 2:
        # Card series 0 and 1 do not have the 0/1 printed on the card
        return card_number
    return series * 100000 + card_number
def poll_punch(self):
    """Polls for new punches.

    Reads queued commands until the read times out. Offsets skipped
    between two received records are fetched with ``_read_punch``.

    @return: list of (cardnr, punchtime) tuples, empty list if no new punches are available
    """
    if not self.proto_config['ext_proto']:
        raise SIReaderException('This command only supports stations in "Extended Protocol" '
                                'mode. Switch mode first')
    if not self.proto_config['auto_send']:
        raise SIReaderException('This command only supports stations in "Autosend" '
                                'mode. Switch mode first')

    punches = []
    while True:
        try:
            received = self._read_command(timeout=0)
        except SIReaderTimeout:
            break

        command, payload = received[0], received[1]
        if command != SIReader.C_TRANS_REC:
            raise SIReaderException('Unexpected command %s received' % hex(byte2int(command)))

        offset_field = payload[SIReader.T_OFFSET:SIReader.T_OFFSET + 3]
        cur_offset = SIReader._to_int(offset_field)
        if self._next_offset is not None:
            while self._next_offset < cur_offset:
                # recover lost punches
                punches.append(self._read_punch(self._next_offset))
                self._next_offset += SIReader.REC_LEN
        self._next_offset = cur_offset + SIReader.REC_LEN

        cardnr = self._decode_cardnr(payload[SIReader.T_CN:SIReader.T_CN + 4])
        punchtime = self._decode_time(payload[SIReader.T_TIME:SIReader.T_TIME + 2])
        punches.append((cardnr, punchtime))

    return punches
def __init__(self, value=0):
    """Store ``value`` in ``self.v`` after normalisation.

    Values that are not ``six.integer_types`` are first reduced to their
    leading byte with ``six.byte2int``. The stored value is
    ``C + norm(abs(int(value)))``; ``C`` and ``norm`` are defined outside
    this block.
    """
    if isinstance(value, six.integer_types):
        number = value
    else:
        number = six.byte2int(value)
    magnitude = abs(int(number))
    self.v = C + norm(magnitude)
def validate(self, skip_utf8_validation=False):
    """
    Validate the ABNF frame.

    skip_utf8_validation: skip utf8 validation.

    Raises WebSocketProtocolException when any rsv bit is set, the opcode
    is not in ABNF.OPCODES, a ping frame does not have fin set, or a close
    frame has an invalid length, a reason rejected by validate_utf8, or a
    status code rejected by _is_valid_close_status.
    """
    if self.rsv1 or self.rsv2 or self.rsv3:
        raise WebSocketProtocolException("rsv is not implemented, yet")

    if self.opcode not in ABNF.OPCODES:
        # The opcode used to be passed as a second exception argument, so
        # the "%r" placeholder was never filled in; interpolate it here.
        raise WebSocketProtocolException("Invalid opcode %r" % (self.opcode,))

    if self.opcode == ABNF.OPCODE_PING and not self.fin:
        raise WebSocketProtocolException("Invalid ping frame.")

    if self.opcode != ABNF.OPCODE_CLOSE:
        return

    size = len(self.data)
    if not size:
        # A close frame without a body is acceptable.
        return
    if size == 1 or size >= 126:
        raise WebSocketProtocolException("Invalid close frame.")
    if size > 2 and not skip_utf8_validation and not validate_utf8(self.data[2:]):
        raise WebSocketProtocolException("Invalid close frame.")

    # Status code: first two bytes, most significant first.
    code = 256 * six.byte2int(self.data[0:1]) + six.byte2int(self.data[1:2])
    if not self._is_valid_close_status(code):
        raise WebSocketProtocolException("Invalid close opcode.")
def decode(self, encoded_packet):
    """Decode a transmitted package.

    Populates ``self.packet_type``, ``self.binary`` and ``self.data``.
    Input that is not ``six.binary_type`` is UTF-8 encoded before parsing.
    """
    if not isinstance(encoded_packet, six.binary_type):
        encoded_packet = encoded_packet.encode('utf-8')

    b64 = False
    first = six.byte2int(encoded_packet[0:1])
    self.packet_type = first
    if first == 98:  # 'b' --> binary base64 encoded packet
        self.binary = True
        encoded_packet = encoded_packet[1:]
        self.packet_type = six.byte2int(encoded_packet[0:1])
        self.packet_type -= 48
        b64 = True
    elif first >= 48:
        # ASCII digit: a text packet
        self.packet_type -= 48
        self.binary = False
    else:
        self.binary = True

    self.data = None
    if len(encoded_packet) <= 1:
        # Nothing follows the type byte.
        return

    payload = encoded_packet[1:]
    if self.binary:
        self.data = base64.b64decode(payload) if b64 else payload
        return

    # Text packet: try JSON first, otherwise keep the decoded string.
    try:
        self.data = self.json.loads(payload.decode('utf-8'))
    except ValueError:
        self.data = payload.decode('utf-8')
def byte2int(bytes):
    """Return the first element of ``bytes`` converted with ``int()``."""
    leading = bytes[0]
    return int(leading)
def test_byte2int():
    """six.byte2int returns the first byte's value and rejects empty input."""
    # Only the leading byte matters, whatever follows it.
    for raw in ("\x03", "\x03\x04"):
        assert six.byte2int(six.b(raw)) == 3
    py.test.raises(IndexError, six.byte2int, six.b(""))
def decode(self, encoded_packet):
    """Decode a transmitted package.

    Populates ``self.packet_type``, ``self.binary`` and ``self.data`` from
    ``encoded_packet``, which is used as given (no type coercion).
    """
    uses_base64 = False
    marker = six.byte2int(encoded_packet[0:1])
    self.packet_type = marker
    if marker == 98:  # 'b' --> binary base64 encoded packet
        self.binary = True
        encoded_packet = encoded_packet[1:]
        self.packet_type = six.byte2int(encoded_packet[0:1])
        self.packet_type -= 48
        uses_base64 = True
    elif marker >= 48:
        # ASCII digit: a text packet
        self.packet_type -= 48
        self.binary = False
    else:
        self.binary = True

    self.data = None
    if len(encoded_packet) > 1:
        remainder = encoded_packet[1:]
        if not self.binary:
            # Text packet: JSON if it parses, plain decoded text otherwise.
            try:
                self.data = self.json.loads(remainder.decode('utf-8'))
            except ValueError:
                self.data = remainder.decode('utf-8')
        elif uses_base64:
            self.data = base64.b64decode(remainder)
        else:
            self.data = remainder