The following code examples, extracted from Python open-source projects, illustrate how to use struct.unpack().
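Before the project examples, a minimal illustrative sketch of the basics (not taken from any of the projects below): struct.unpack() takes a format string, whose leading character (<, >, !, or =) selects byte order, and a bytes object whose length must equal struct.calcsize(fmt); it always returns a tuple.

import struct

# Pack two values into 6 bytes: a big-endian unsigned 32-bit int and an unsigned 16-bit int.
data = struct.pack('>IH', 1024, 7)

# unpack always returns a tuple, even for a single value.
value, flags = struct.unpack('>IH', data)
assert (value, flags) == (1024, 7)

# The buffer length must match struct.calcsize() exactly.
assert struct.calcsize('>IH') == len(data) == 6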
def encode_8(bytes, key, terminator):
    """
    Encode the bytecode with the given 8-bit XOR key.

    :type  bytes: str
    :param bytes: Bytecode to encode.

    :type  key: str
    :param key: 8-bit XOR key.

    :type  terminator: str
    :param terminator: 8-bit terminator.

    :rtype:  str
    :return: Encoded bytecode.
    """
    if not bytes.endswith(terminator):
        bytes += terminator
    fmt = "B" * len(bytes)
    unpack = struct.unpack
    pad = unpack("B", key) * len(bytes)
    bytes = unpack(fmt, bytes)
    bytes = [bytes[i] ^ pad[i] for i in xrange(len(bytes))]
    return struct.pack(fmt, *bytes)
def decrypt_file(key, in_filename, out_filename=None, chunksize=24*1024):
    # Split .crypt extension to restore file format
    if not out_filename:
        out_filename = os.path.splitext(in_filename)[0]

    with open(in_filename, 'rb') as infile:
        origsize = struct.unpack('<Q', infile.read(struct.calcsize('Q')))[0]
        iv = infile.read(16)
        decryptor = AES.new(key, AES.MODE_CBC, iv)

        with open(out_filename, 'wb') as outfile:
            while True:
                chunk = infile.read(chunksize)
                if len(chunk) == 0:
                    break
                outfile.write(decryptor.decrypt(chunk))
            # Truncate file to original size
            outfile.truncate(origsize)
def parse_default(field, ftype, fdefault):
    if not (ftype == 'bool' and fdefault == 'true'):
        try:
            fdefault = literal_eval(fdefault.rstrip('LDF'))
        except (ValueError, SyntaxError):
            fdefault = None

    if type(fdefault) is int:
        if ftype[0] != 'u' and ftype[:5] != 'fixed':
            if fdefault >> 63:
                fdefault = c_long(fdefault).value
            elif fdefault >> 31 and ftype[-2:] != '64':
                fdefault = c_int(fdefault).value
        else:
            fdefault &= (1 << int(ftype[-2:])) - 1

        if ftype == 'float' and abs(fdefault) >> 23:
            fdefault = unpack('=f', pack('=i', fdefault))[0]
        elif ftype == 'double' and abs(fdefault) >> 52:
            fdefault = unpack('=d', pack('=q', fdefault))[0]

    if fdefault:
        field.default_value = str(fdefault)
def parse_bgzf_header(f):
    cur_pos = f.tell()
    header_fmt = "BBBBIBBH"
    d = f.read(12)

    # We are at EOF when read returns an empty string
    if d == '':
        return None

    header = struct.unpack(header_fmt, d)

    # Check for a valid gzip header
    if header[0] != 31 or header[1] != 139:
        raise Exception("Not a valid gzip header")

    xlen = header[7]
    bsize = get_bsize(f, f.tell(), xlen)

    next_pos = cur_pos + bsize + 1
    f.seek(next_pos)
    return next_pos
def get_user_hashes(user_key, hbootkey):
    samaddr = user_key.space
    rid = int(user_key.Name, 16)
    V = None
    for v in values(user_key):
        if v.Name == 'V':
            V = samaddr.read(v.Data.value, v.DataLength.value)
    if not V:
        return None

    hash_offset = unpack("<L", V[0x9c:0x9c+4])[0] + 0xCC

    lm_exists = True if unpack("<L", V[0x9c+4:0x9c+8])[0] == 20 else False
    nt_exists = True if unpack("<L", V[0x9c+16:0x9c+20])[0] == 20 else False

    enc_lm_hash = V[hash_offset+4:hash_offset+20] if lm_exists else ""
    enc_nt_hash = V[hash_offset+(24 if lm_exists else 8):hash_offset+(24 if lm_exists else 8)+16] if nt_exists else ""

    return decrypt_hashes(rid, enc_lm_hash, enc_nt_hash, hbootkey)
def decode(self, offset):
    """Decode a section of the data section starting at offset

    Arguments:
    offset -- the location of the data structure to decode
    """
    new_offset = offset + 1
    (ctrl_byte,) = struct.unpack(b'!B', self._buffer[offset:new_offset])
    type_num = ctrl_byte >> 5
    # Extended type
    if not type_num:
        (type_num, new_offset) = self._read_extended(new_offset)
    (size, new_offset) = self._size_from_ctrl_byte(
        ctrl_byte, new_offset, type_num)
    return self._type_decoder[type_num](self, size, new_offset)
def _size_from_ctrl_byte(self, ctrl_byte, offset, type_num):
    size = ctrl_byte & 0x1f
    if type_num == 1:
        return size, offset
    bytes_to_read = 0 if size < 29 else size - 28
    new_offset = offset + bytes_to_read
    size_bytes = self._buffer[offset:new_offset]

    # Using unpack rather than int_from_bytes as it is about 200 lookups
    # per second faster here.
    if size == 29:
        size = 29 + struct.unpack(b'!B', size_bytes)[0]
    elif size == 30:
        size = 285 + struct.unpack(b'!H', size_bytes)[0]
    elif size > 30:
        size = struct.unpack(
            b'!I', size_bytes.rjust(4, b'\x00'))[0] + 65821
    return size, new_offset
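The rjust(4, b'\x00') call above is a general trick worth noting: struct.unpack(b'!I', ...) requires exactly four bytes, so a shorter big-endian value is left-padded with zero bytes first. A minimal standalone sketch of the same idea (be_uint is an illustrative helper, not part of the decoder above):

import struct

def be_uint(data):
    """Decode 1-4 big-endian bytes as an unsigned int by zero-padding to 4 bytes."""
    return struct.unpack(b'!I', data.rjust(4, b'\x00'))[0]

assert be_uint(b'\x01') == 1
assert be_uint(b'\x01\x00') == 256
assert be_uint(b'\x01\x00\x00\x00') == 16777216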
def _read_node(self, node_number, index):
    base_offset = node_number * self._metadata.node_byte_size

    record_size = self._metadata.record_size
    if record_size == 24:
        offset = base_offset + index * 3
        node_bytes = b'\x00' + self._buffer[offset:offset + 3]
    elif record_size == 28:
        (middle,) = struct.unpack(
            b'!B', self._buffer[base_offset + 3:base_offset + 4])
        if index:
            middle &= 0x0F
        else:
            middle = (0xF0 & middle) >> 4
        offset = base_offset + index * 4
        node_bytes = byte_from_int(
            middle) + self._buffer[offset:offset + 3]
    elif record_size == 32:
        offset = base_offset + index * 4
        node_bytes = self._buffer[offset:offset + 4]
    else:
        raise InvalidDatabaseError(
            'Unknown record size: {0}'.format(record_size))
    return struct.unpack(b'!I', node_bytes)[0]
def grayscale(self):
    """
    Convert the image into a (24-bit) grayscale one, using the Y'UV method.
    """
    # http://en.wikipedia.org/wiki/YUV
    Wr = 0.299
    Wb = 0.114
    Wg = 0.587

    mod_bitmap = ""
    f = StringIO(self.bitmap_data)
    for row_num in xrange(0, self.height):
        for pix in xrange(0, self.width):
            pixel = struct.unpack("3B", f.read(3))
            out_pix = chr(int(Wr * pixel[2] + Wg * pixel[1] + Wb * pixel[0]))
            mod_bitmap += out_pix * 3
        mod_bitmap += chr(0x00) * self.padding_size
        f.seek(self.padding_size, 1)

    self.bitmap_data = mod_bitmap
    return self
def _receiveMsg(self):
    """ Receive OSC message from a socket and decode.
    If an error occurs, None is returned, else the message.
    """
    # get OSC packet size from stream which is prepended each transmission
    chunk = self._receive(4)
    if chunk == None:
        print("SERVER: Socket has been closed.")
        return None
    # extract message length from big endian unsigned long (32 bit)
    slen = struct.unpack(">L", chunk)[0]
    # receive the actual message
    chunk = self._receive(slen)
    if chunk == None:
        print("SERVER: Socket has been closed.")
        return None
    # decode OSC data and dispatch
    msg = decodeOSC(chunk)
    if msg == None:
        raise OSCError("SERVER: Message decoding failed.")
    return msg
def _receiveMsgWithTimeout(self):
    """ Receive OSC message from a socket and decode.
    If an error occurs, None is returned, else the message.
    """
    # get OSC packet size from stream which is prepended each transmission
    chunk = self._receiveWithTimeout(4)
    if not chunk:
        return None
    # extract message length from big endian unsigned long (32 bit)
    slen = struct.unpack(">L", chunk)[0]
    # receive the actual message
    chunk = self._receiveWithTimeout(slen)
    if not chunk:
        return None
    # decode OSC content
    msg = decodeOSC(chunk)
    if msg == None:
        raise OSCError("CLIENT: Message decoding failed.")
    return msg
def _receiveMsg(self):
    """ Receive OSC message from a socket and decode.
    If an error occurs, None is returned, else the message.
    """
    # get OSC packet size from stream which is prepended each transmission
    chunk = self._receive(4)
    if chunk == None:
        print "SERVER: Socket has been closed."
        return None
    # extract message length from big endian unsigned long (32 bit)
    slen = struct.unpack(">L", chunk)[0]
    # receive the actual message
    chunk = self._receive(slen)
    if chunk == None:
        print "SERVER: Socket has been closed."
        return None
    # decode OSC data and dispatch
    msg = decodeOSC(chunk)
    if msg == None:
        raise OSCError("SERVER: Message decoding failed.")
    return msg
def from_key_val_list(value):
    """Take an object and test to see if it can be represented as a
    dictionary. Unless it can not be represented as such, return an
    OrderedDict, e.g.,

    ::

        >>> from_key_val_list([('key', 'val')])
        OrderedDict([('key', 'val')])
        >>> from_key_val_list('string')
        ValueError: need more than 1 value to unpack
        >>> from_key_val_list({'key': 'val'})
        OrderedDict([('key', 'val')])

    :rtype: OrderedDict
    """
    if value is None:
        return None

    if isinstance(value, (str, bytes, bool, int)):
        raise ValueError('cannot encode objects that are not 2-tuples')

    return OrderedDict(value)
def addressInNetwork(self, ip, cidr):
    # the ip can be the empty string ('') in cases where the connection
    # is made via a web proxy. in these cases the sensor cannot report
    # the true remote IP as DNS resolution happens on the web proxy (and
    # not the endpoint)
    if '' == ip:
        return False

    try:
        net = cidr.split('/')[0]
        bits = cidr.split('/')[1]

        if int(ip) > 0:
            ipaddr = struct.unpack('<L', socket.inet_aton(ip))[0]
        else:
            ipaddr = struct.unpack('<L', socket.inet_aton(".".join(map(lambda n: str(int(ip)>>n & 0xFF), [24,16,8,0]))))[0]

        netaddr = struct.unpack('<L', socket.inet_aton(net))[0]
        netmask = ((1L << int(bits)) - 1)

        return ipaddr & netmask == netaddr & netmask
    except:
        return False
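As an aside, the inet_aton/unpack pairing above is the common idiom for turning a dotted-quad IPv4 address into a comparable integer. A minimal standalone sketch (ip_to_int is an illustrative helper; note that '!L' gives true network byte order, whereas the example above deliberately uses '<L' for both address and mask, which keeps its comparison consistent):

import socket
import struct

def ip_to_int(ip):
    """Convert a dotted-quad IPv4 string to a network-byte-order integer."""
    return struct.unpack('!L', socket.inet_aton(ip))[0]

assert ip_to_int('127.0.0.1') == 0x7F000001
assert ip_to_int('192.168.1.1') == 0xC0A80101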
def file_scanlines(self, infile):
    """
    Generates boxed rows in flat pixel format, from the input file
    `infile`.  It assumes that the input file is in a "Netpbm-like"
    binary format, and is positioned at the beginning of the first
    pixel.  The number of pixels to read is taken from the image
    dimensions (`width`, `height`, `planes`) and the number of bytes
    per value is implied by the image `bitdepth`.
    """

    # Values per row
    vpr = self.width * self.planes
    row_bytes = vpr
    if self.bitdepth > 8:
        assert self.bitdepth == 16
        row_bytes *= 2
        fmt = '>%dH' % vpr
        def line():
            return array('H', struct.unpack(fmt, infile.read(row_bytes)))
    else:
        def line():
            scanline = array('B', infile.read(row_bytes))
            return scanline
    for y in range(self.height):
        yield line()
def chunklentype(self):
    """Reads just enough of the input to determine the next
    chunk's length and type, returned as a (*length*, *type*) pair
    where *type* is a string.  If there are no more chunks, ``None``
    is returned.
    """
    x = self.file.read(8)
    if not x:
        return None
    if len(x) != 8:
        raise FormatError(
            'End of file whilst reading chunk length and type.')
    length, type = struct.unpack('!I4s', x)
    if length > 2**31-1:
        raise FormatError('Chunk %s is too large: %d.' % (type, length))
    return length, type
def _process_tRNS(self, data):
    # http://www.w3.org/TR/PNG/#11tRNS
    self.trns = data
    if self.colormap:
        if not self.plte:
            warnings.warn("PLTE chunk is required before tRNS chunk.")
        else:
            if len(data) > len(self.plte)/3:
                # Was warning, but promoted to Error as it
                # would otherwise cause pain later on.
                raise FormatError("tRNS chunk is too long.")
    else:
        if self.alpha:
            raise FormatError(
                "tRNS chunk is not valid with colour type %d." %
                self.color_type)
        try:
            self.transparent = \
                struct.unpack("!%dH" % self.color_planes, data)
        except struct.error:
            raise FormatError("tRNS chunk has incorrect length.")
def parse_policy(raw_data):
    """Parse policy data."""
    policy = {}
    raw_int = _raw_to_int(raw_data)

    policy['domain_id'] = DOMAINS_REV[raw_int[3] & 0x0F]
    policy['enabled'] = bool(raw_int[3] & 0x10)
    policy['per_domain_enabled'] = bool(raw_int[3] & 0x20)
    policy['global_enabled'] = bool(raw_int[3] & 0x40)
    policy['created_by_nm'] = not bool(raw_int[3] & 0x80)
    policy['policy_trigger'] = TRIGGERS_REV[raw_int[4] & 0x0F]
    policy['power_policy'] = bool(raw_int[4] & 0x10)
    power_correction = CPU_CORRECTION_REV[raw_int[4] & 0x60]
    policy['cpu_power_correction'] = power_correction
    policy['storage'] = STORAGE_REV[raw_int[4] & 0x80]
    policy['action'] = ACTIONS_REV[raw_int[5] & 0x01]
    policy['power_domain'] = POWER_DOMAIN_REV[raw_int[5] & 0x80]
    policy_values = struct.unpack('<HIHH', bytearray(raw_int[6:]))
    policy_names = ('target_limit', 'correction_time', 'trigger_limit',
                    'reporting_period')
    _add_to_dict(policy, policy_values, policy_names)
    return policy
def parse_capabilities(raw_data):
    """Parse capabilities data."""
    capabilities = {}
    raw_int = _raw_to_int(raw_data)

    capabilities['max_policies'] = raw_int[3]
    capabilities_values = struct.unpack('<HHIIHH', bytearray(raw_int[4:20]))
    capabilities_names = ('max_limit_value', 'min_limit_value',
                          'min_correction_time', 'max_correction_time',
                          'min_reporting_period', 'max_reporting_period')
    _add_to_dict(capabilities, capabilities_values, capabilities_names)
    capabilities['domain_id'] = DOMAINS_REV[raw_int[20] & 0x0F]
    power_domain = POWER_DOMAIN_REV[raw_int[20] & 0x80]
    capabilities['power_domain'] = power_domain
    return capabilities
def __init__(self, *args, **kwargs):
    if args:
        self.type = struct.unpack('>I', args[0][:4])[0]
        # check if it's an event data record that uses archival timestamps
        # and if we've set archival
        if self.type in ARCHIVAL_RCD_TYPES and config.test_bit(Struct.get_flags(), 23) and 'reserved' not in self._field_names_:
            self._fields_.extend([('timestamp', 'uint32', 0), ('reserved', 'uint32', 0)])
            self._field_names_.extend(['timestamp', 'reserved'])
            self._field_format_.update({'timestamp': 'I', 'reserved': 'I'})
        else:
            # The field values do not reset after being extended for some
            # reason (metaclass). Without this, all events parsed after the
            # first ARCHIVAL_RCD gets parsed as if it has the 'reserved' and
            # 'timestamp' field
            #map(self._fields_.remove, [f for f in self._fields_ if f[0] in ['timestamp', 'reserved']])
            #map(self._field_names_.remove, [f for f in self._field_names_ if f[0] in ['timestamp', 'reserved']])
            self._fields_ = [f for f in self._fields_ if f[0] not in ['timestamp', 'reserved']]
            self._field_names_ = [f for f in self._field_names_ if f[0] not in ['timestamp', 'reserved']]
            for k in ['timestamp', 'reserved']:
                self._field_format_.pop(k)
    super(EventData, self).__init__(*args, **kwargs)
    self._unpack_data()
async def __parse_header(self):
    self.header_length, = struct.unpack('<I', await self.buffer.read(4))
    self.header_chunk_count, = struct.unpack('<I', await self.buffer.read(4))

    self.header_chunks = dict()
    self.header = dict()

    # Save header data from binary.
    for nr in range(self.header_chunk_count):
        chunk_id, = struct.unpack('<I', await self.buffer.read(4))
        chunk_size, = struct.unpack('<I', await self.buffer.read(4))
        self.header_chunks[chunk_id] = chunk_size & ~0x80000000

    # Parse all header chunks.
    for chunk_id, chunk_size in self.header_chunks.items():
        self.strings.reset()
        self.header.update(await self.__parse_chunk(chunk_id, chunk_size))

    return self.header
def to_rain(cls, val):
    if val is None:
        return cls.new(typi.null, 0, 0, cls.null)
    elif val is False:
        return cls.new(typi.bool, 0, 0, cls.null)
    elif val is True:
        return cls.new(typi.bool, 0, 1, cls.null)
    elif isinstance(val, int):
        return cls.new(typi.int, 0, val, cls.null)
    elif isinstance(val, float):
        raw = struct.pack('d', val)
        intrep = struct.unpack('Q', raw)[0]
        return cls.new(typi.float, 0, intrep, cls.null)
    elif isinstance(val, str):
        str_p = ct.create_string_buffer(val.encode('utf-8'))
        cls._saves_.append(str_p)
        return cls.new(typi.str, len(val), ct.cast(str_p, ct.c_void_p).value, cls.null)

    raise Exception("Can't convert value {!r} to Rain".format(val))
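The pack('d', ...) / unpack('Q', ...) round trip above is the standard way to reinterpret a float's IEEE 754 bit pattern as an integer without changing any bits. A minimal standalone sketch (float_bits and bits_float are illustrative names; explicit '<' is used here for a platform-independent result, where the example above relies on native order):

import struct

def float_bits(x):
    """Return the 64-bit IEEE 754 representation of a float as an unsigned int."""
    return struct.unpack('<Q', struct.pack('<d', x))[0]

def bits_float(n):
    """Inverse: reinterpret a 64-bit unsigned int as a float."""
    return struct.unpack('<d', struct.pack('<Q', n))[0]

assert float_bits(1.0) == 0x3FF0000000000000
assert bits_float(float_bits(3.14)) == 3.14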
def _check_crc(self, address, binary):
    '''
    Compares the CRC of the local binary to the one calculated by the
    bootloader.
    '''
    # Check the CRC
    crc_data = self._get_crc_internal_flash(address, len(binary))

    # Now interpret the returned bytes as the CRC
    crc_bootloader = struct.unpack('<I', crc_data[0:4])[0]

    # Calculate the CRC locally
    crc_function = crcmod.mkCrcFun(0x104c11db7, initCrc=0, xorOut=0xFFFFFFFF)
    crc_loader = crc_function(binary, 0)

    if crc_bootloader != crc_loader:
        raise TockLoaderException('Error: CRC check failed. Expected: 0x{:04x}, Got: 0x{:04x}'.format(crc_loader, crc_bootloader))
    else:
        print('CRC check passed. Binaries successfully loaded.')
def _checksum(self, buffer):
    '''
    Calculate the TBF header checksum.
    '''
    # Add 0s to the end to make sure that we are multiple of 4.
    padding = len(buffer) % 4
    if padding != 0:
        padding = 4 - padding
        buffer += bytes([0]*padding)

    # Loop through the buffer, XORing each 4-byte word into the checksum.
    checksum = 0
    for i in range(0, len(buffer), 4):
        checksum ^= struct.unpack('<I', buffer[i:i+4])[0]

    return checksum
def __init__(self, file, align=True, bigendian=True, inclheader=False):
    import struct
    self.closed = False
    self.align = align      # whether to align to word (2-byte) boundaries
    if bigendian:
        strflag = '>'
    else:
        strflag = '<'
    self.file = file
    self.chunkname = file.read(4)
    if len(self.chunkname) < 4:
        raise EOFError
    try:
        self.chunksize = struct.unpack(strflag+'L', file.read(4))[0]
    except struct.error:
        raise EOFError
    if inclheader:
        self.chunksize = self.chunksize - 8 # subtract header
    self.size_read = 0
    try:
        self.offset = self.file.tell()
    except (AttributeError, IOError):
        self.seekable = False
    else:
        self.seekable = True
def py_suffix_importer(filename, finfo, fqname):
    file = filename[:-3] + _suffix
    t_py = long(finfo[8])
    t_pyc = _timestamp(file)

    code = None
    if t_pyc is not None and t_pyc >= t_py:
        f = open(file, 'rb')
        if f.read(4) == imp.get_magic():
            t = struct.unpack('<I', f.read(4))[0]
            if t == t_py:
                code = marshal.load(f)
        f.close()
    if code is None:
        file = filename
        code = _compile(file, t_py)

    return 0, code, {'__file__': file}
def scan_opcodes(self, co, unpack=struct.unpack):
    # Scan the code, and yield 'interesting' opcode combinations
    # Version for Python 2.4 and older
    code = co.co_code
    names = co.co_names
    consts = co.co_consts
    while code:
        c = code[0]
        if c in STORE_OPS:
            oparg, = unpack('<H', code[1:3])
            yield "store", (names[oparg],)
            code = code[3:]
            continue
        if c == LOAD_CONST and code[3] == IMPORT_NAME:
            oparg_1, oparg_2 = unpack('<xHxH', code[:6])
            yield "import", (consts[oparg_1], names[oparg_2])
            code = code[6:]
            continue
        if c >= HAVE_ARGUMENT:
            code = code[3:]
        else:
            code = code[1:]
def _fms(msg, gain, offset, sbyte, ebyte=None):
    """
    Extract a number from a series of bytes; the number may be 1, 2, or 4 bytes long.
    :param msg: the series of bytes
    :param gain: the value to multiply the value by
    :param offset: the offset of the value
    :param sbyte: the start byte of the value (indexed from 1)
    :param ebyte: the end byte of the value (indexed from 1); if None, the value is 1 byte long
    :return:
    """
    if ebyte is None:
        return msg[sbyte - 1] * gain + offset

    sbyte -= 1

    # interpret bytes as n-byte unsigned int,
    # multiply by gain + offset
    byte_len = ebyte - sbyte
    fmt = {
        1: '>B',
        2: '<H',
        4: '<I'
    }
    return struct.unpack(fmt[byte_len], msg[sbyte:ebyte])[0] * gain + offset
def _iesubelmsmtrptlci_(s, sid):
    """ :returns: parsed LCI optional subelement """
    ret = s
    if sid == std.EID_MSMT_RPT_LCI_AZIMUTH:
        ret = {'azimuth-rpt': _iesubelmsmtrptlicazimuth_(struct.unpack_from('=H', s)[0])}
    elif sid == std.EID_MSMT_RPT_LCI_ORIGIN:
        ret = {'originator': _hwaddr_(struct.unpack('=6B', s))}
    elif sid == std.EID_MSMT_RPT_LCI_TARGET:
        ret = {'target': _hwaddr_(struct.unpack('=6B', s))}
    elif sid == std.EID_MSMT_RPT_LCI_VEND:
        ret = _parseie_(std.EID_VEND_SPEC, s)
    return ret

# MSMT Report->Azimuth Report fields Std Fig. 8-164
def get_terminal_size():
    def ioctl_GWINSZ(fd):
        try:
            import fcntl
            import termios
            import struct
            cr = struct.unpack('hh', fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234'))
        except:
            return None
        return cr
    cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2)
    if not cr:
        try:
            fd = os.open(os.ctermid(), os.O_RDONLY)
            cr = ioctl_GWINSZ(fd)
            os.close(fd)
        except:
            pass
    if not cr:
        try:
            cr = (os.environ['LINES'], os.environ['COLUMNS'])
        except:
            cr = (25, 80)
    return int(cr[1]), int(cr[0])
def handle(self, sock):
    """
    Handle the actual TCP connection
    """
    try:
        size = struct.unpack("<I", sock.recv(4))[0]
        data = ""
        while len(data) < size:
            data += sock.recv(size - len(data))
        if len(self.testcase) >= 100:
            del self.testcase
            self.testcase = list()
        self.testcase.append(data)
        sock.close()
    except socket.error as e:
        raise PJFSocketError(e.message if hasattr(e, "message") else str(e))
    except Exception as e:
        raise PJFBaseException(e.message)
def _get_terminal_size(fd):
    handle = windll.kernel32.GetStdHandle(_handle_ids[fd])
    if handle == 0:
        raise OSError('handle cannot be retrieved')
    if handle == -1:
        raise WinError()
    csbi = create_string_buffer(22)
    res = windll.kernel32.GetConsoleScreenBufferInfo(handle, csbi)
    if res:
        res = struct.unpack("hhhhHhhhhhh", csbi.raw)
        left, top, right, bottom = res[5:9]
        columns = right - left + 1
        lines = bottom - top + 1
        return terminal_size(columns, lines)
    else:
        raise WinError()
def _async_recv_msg(self):
    """Internal use only; use 'recv_msg' with 'yield' instead.

    Message is tagged with length of the payload (data). This method
    receives length of payload, then the payload and returns the payload.
    """
    n = AsyncSocket._MsgLengthSize
    try:
        data = yield self.recvall(n)
    except socket.error as err:
        if err.args[0] == 'hangup':
            # raise socket.error(errno.EPIPE, 'Insufficient data')
            raise StopIteration('')
        else:
            raise
    if len(data) != n:
        # raise socket.error(errno.EPIPE, 'Insufficient data: %s / %s' % (len(data), n))
        raise StopIteration('')
    n = struct.unpack('>L', data)[0]
    # assert n >= 0
    if n:
        try:
            data = yield self.recvall(n)
        except socket.error as err:
            if err.args[0] == 'hangup':
                # raise socket.error(errno.EPIPE, 'Insufficient data')
                raise StopIteration('')
            else:
                raise
        if len(data) != n:
            # raise socket.error(errno.EPIPE, 'Insufficient data: %s / %s' % (len(data), n))
            raise StopIteration('')
        raise StopIteration(data)
    else:
        raise StopIteration('')
def _sync_recv_msg(self):
    """Internal use only; use 'recv_msg' instead.

    Synchronous version of async_recv_msg.
    """
    n = AsyncSocket._MsgLengthSize
    try:
        data = self._sync_recvall(n)
    except socket.error as err:
        if err.args[0] == 'hangup':
            # raise socket.error(errno.EPIPE, 'Insufficient data')
            return ''
        else:
            raise
    if len(data) != n:
        # raise socket.error(errno.EPIPE, 'Insufficient data: %s / %s' % (len(data), n))
        return ''
    n = struct.unpack('>L', data)[0]
    # assert n >= 0
    if n:
        try:
            data = self._sync_recvall(n)
        except socket.error as err:
            if err.args[0] == 'hangup':
                # raise socket.error(errno.EPIPE, 'Insufficient data')
                return ''
            else:
                raise
        if len(data) != n:
            # raise socket.error(errno.EPIPE, 'Insufficient data: %s / %s' % (len(data), n))
            return ''
        return data
    else:
        return ''
def _async_recv_msg(self):
    """Internal use only; use 'recv_msg' with 'yield' instead.

    Message is tagged with length of the payload (data). This method
    receives length of payload, then the payload and returns the payload.
    """
    n = AsyncSocket._MsgLengthSize
    try:
        data = yield self.recvall(n)
    except socket.error as err:
        if err.args[0] == 'hangup':
            # raise socket.error(errno.EPIPE, 'Insufficient data')
            raise StopIteration(b'')
        else:
            raise
    if len(data) != n:
        # raise socket.error(errno.EPIPE, 'Insufficient data: %s / %s' % (len(data), n))
        raise StopIteration(b'')
    n = struct.unpack('>L', data)[0]
    # assert n >= 0
    if n:
        try:
            data = yield self.recvall(n)
        except socket.error as err:
            if err.args[0] == 'hangup':
                # raise socket.error(errno.EPIPE, 'Insufficient data')
                raise StopIteration(b'')
            else:
                raise
        if len(data) != n:
            # raise socket.error(errno.EPIPE, 'Insufficient data: %s / %s' % (len(data), n))
            raise StopIteration(b'')
        raise StopIteration(data)
    else:
        raise StopIteration(b'')
def _sync_recv_msg(self):
    """Internal use only; use 'recv_msg' instead.

    Synchronous version of async_recv_msg.
    """
    n = AsyncSocket._MsgLengthSize
    try:
        data = self._sync_recvall(n)
    except socket.error as err:
        if err.args[0] == 'hangup':
            # raise socket.error(errno.EPIPE, 'Insufficient data')
            return b''
        else:
            raise
    if len(data) != n:
        # raise socket.error(errno.EPIPE, 'Insufficient data: %s / %s' % (len(data), n))
        return b''
    n = struct.unpack('>L', data)[0]
    # assert n >= 0
    if n:
        try:
            data = self._sync_recvall(n)
        except socket.error as err:
            if err.args[0] == 'hangup':
                # raise socket.error(errno.EPIPE, 'Insufficient data')
                return b''
            else:
                raise
        if len(data) != n:
            # raise socket.error(errno.EPIPE, 'Insufficient data: %s / %s' % (len(data), n))
            return b''
        return data
    else:
        return b''
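The four methods above all implement the same length-prefix framing pattern: a fixed-size big-endian length header ('>L') followed by the payload. A minimal synchronous sketch of the idea over a plain socket (send_msg, recv_msg, and recvall are illustrative names, not part of the library above):

import socket
import struct

def send_msg(sock, payload):
    # Prefix the payload with its length as a 4-byte big-endian unsigned int.
    sock.sendall(struct.pack('>L', len(payload)) + payload)

def recv_msg(sock):
    header = recvall(sock, 4)
    (n,) = struct.unpack('>L', header)
    return recvall(sock, n)

def recvall(sock, n):
    # Keep reading until exactly n bytes have arrived.
    buf = b''
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise EOFError('socket closed with %d of %d bytes read' % (len(buf), n))
        buf += chunk
    return buf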
def get_user_name(user_key):
    samaddr = user_key.space
    V = None
    for v in values(user_key):
        if v.Name == 'V':
            V = samaddr.read(v.Data.value, v.DataLength.value)
    if not V:
        return None

    name_offset = unpack("<L", V[0x0c:0x10])[0] + 0xCC
    name_length = unpack("<L", V[0x10:0x14])[0]

    username = V[name_offset:name_offset+name_length].decode('utf-16-le')
    return username
def __init__(self, name, address, space):
    super(Primitive, self).__init__(name, address, space)
    length, fmt = builtin_types[name]
    data = space.read(address, length)
    if not data:
        self.value = None
    else:
        self.value = unpack(fmt, data)[0]
def read_long(self, addr):
    string = self.read(addr, 4)
    # '=L' forces the standard 4-byte size; bare native 'L' can be 8 bytes on 64-bit platforms
    (longval, ) = struct.unpack('=L', string)
    return longval
def read_long_phys(self, addr):
    string = self.base.read(addr, 4)
    # '=L' forces the standard 4-byte size; bare native 'L' can be 8 bytes on 64-bit platforms
    (longval, ) = struct.unpack('=L', string)
    return longval
def _decode_packed_type(type_code, type_size, pad=False):
    # pylint: disable=protected-access, missing-docstring
    def unpack_type(self, size, offset):
        if not pad:
            self._verify_size(size, type_size)
        new_offset = offset + type_size
        packed_bytes = self._buffer[offset:new_offset]
        if pad:
            packed_bytes = packed_bytes.rjust(type_size, b'\x00')
        (value,) = struct.unpack(type_code, packed_bytes)
        return value, new_offset
    return unpack_type
def _read_extended(self, offset):
    (next_byte,) = struct.unpack(b'!B', self._buffer[offset:offset + 1])
    type_num = next_byte + 7
    if type_num < 8:
        raise InvalidDatabaseError(
            'Something went horribly wrong in the decoder. An '
            'extended type resolved to a type number < 8 '
            '({type})'.format(type=type_num))
    return type_num, offset + 1
def rgb_split(self):
    """Splits one BMP object into three; one with only the red channel, one
    with the green, and one with the blue. Returns a tuple (R, G, B) of BMP instances."""
    if self.empty == True:
        die("Attempted to call rgb_split() on an empty BMP object!")

    f = StringIO(self.bitmap_data)
    red_data = self.all_headers
    green_data = self.all_headers
    blue_data = self.all_headers

    for row_num in xrange(0, self.height):
        for pix in xrange(0, self.width):
            pixel = struct.unpack("3B", f.read(3))
            red_data += chr(0x00) + chr(0x00) + chr(pixel[2])
            green_data += chr(0x00) + chr(pixel[1]) + chr(0x00)
            blue_data += chr(pixel[0]) + chr(0x00) + chr(0x00)
        red_data += chr(0x00) * self.padding_size
        blue_data += chr(0x00) * self.padding_size
        green_data += chr(0x00) * self.padding_size
        f.seek(self.padding_size, 1)

    # I'm not fond of the constructor hack... but it was the easiest way.
    return (BMP(red_data, True), BMP(green_data, True), BMP(blue_data, True))
def rgb_merge(self, r, g, b):
    """ (Re)combine the red, green and blue color channels from three
    separate pictures of identical size into one. """
    if self.empty != True:
        warn("Running rgb_merge() on a non-empty BMP; the existing data will be overwritten!")

    # Ugh...
    if len(set((r.width, g.width, b.width))) != 1 or len(set((r.height, g.height, b.height))) != 1 or len(set((r.bpp, g.bpp, b.bpp))) != 1:
        die("The dimensions and/or bpp differs between the input images to rgb_merge()!")

    rf = StringIO(r.bitmap_data)
    gf = StringIO(g.bitmap_data)
    bf = StringIO(b.bitmap_data)

    out_bitmap_data = ""
    for row_num in xrange(0, b.height):
        for pix in xrange(0, b.width):
            red_pixel = struct.unpack("3B", rf.read(3))[2]
            green_pixel = struct.unpack("3B", gf.read(3))[1]
            blue_pixel = struct.unpack("3B", bf.read(3))[0]
            out_bitmap_data += "".join((chr(blue_pixel), chr(green_pixel), chr(red_pixel)))
        out_bitmap_data += chr(0x00) * r.padding_size
        rf.seek(r.padding_size, 1)
        gf.seek(g.padding_size, 1)
        bf.seek(b.padding_size, 1)

    return BMP(r.all_headers + out_bitmap_data, True)
def checksum(*data):
    data = b''.join(data)
    if len(data) % 2:
        data += b'\x00'
    csum = sum(struct.unpack('!H', data[x:x+2])[0] for x in range(0, len(data), 2))
    csum = (csum >> 16) + (csum & 0xffff)
    csum += csum >> 16
    return ~csum & 0xffff
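This last example computes the ones'-complement Internet checksum (RFC 1071) used by IP, ICMP, UDP, and TCP: 16-bit big-endian words are summed, the carries are folded back in, and the result is inverted. A quick illustrative check, assuming the checksum() function above is in scope:

import struct

# Checksum a 6-byte header whose checksum field (the last word) is zeroed.
hdr = struct.pack('!HHH', 0x4500, 0x0030, 0x0000)
csum = checksum(hdr)

# Re-checksumming with the computed value in place yields zero, as RFC 1071 specifies.
assert checksum(hdr[:4], struct.pack('!H', csum)) == 0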