The following 50 code examples, extracted from open source Python projects, illustrate how to use struct.pack().
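Before the project examples, here is a minimal, self-contained sketch (not taken from any of the projects below) showing the basic struct.pack()/struct.unpack() calling convention: a format string describing byte order and field layout, followed by the values to encode.

import struct

# '!' selects network (big-endian) byte order with no padding;
# 'H' is an unsigned 16-bit field and 'I' an unsigned 32-bit field.
packed = struct.pack('!HI', 2020, 0x12345678)
assert packed == b'\x07\xe4\x12\x34\x56\x78'

# struct.unpack with the same format string recovers the original values.
assert struct.unpack('!HI', packed) == (2020, 0x12345678)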
def encode_8(bytes, key, terminator):
    """
    Encode the bytecode with the given 8-bit XOR key.

    :type  bytes: str
    :param bytes: Bytecode to encode.

    :type  key: str
    :param key: 8-bit XOR key.

    :type  terminator: str
    :param terminator: 8-bit terminator.

    :rtype:  str
    :return: Encoded bytecode.
    """
    if not bytes.endswith(terminator):
        bytes += terminator
    fmt = "B" * len(bytes)
    unpack = struct.unpack
    pad = unpack("B", key) * len(bytes)
    bytes = unpack(fmt, bytes)
    bytes = [ bytes[i] ^ pad[i] for i in xrange(len(bytes)) ]
    return struct.pack(fmt, *bytes)
def process_call(self, addr, cmd, val):
    """Perform a smbus process call by writing a word (2 byte) value to the
    specified register of the device, and then reading a word of response
    data (which is returned).
    """
    assert self._device is not None, 'Bus must be opened before operations are made against it!'
    # Build ctypes values to marshall between ioctl and Python.
    data = create_string_buffer(struct.pack('=BH', cmd, val))
    result = c_uint16()
    # Build ioctl request.
    request = make_i2c_rdwr_data([
        (addr, 0, 3, cast(pointer(data), POINTER(c_uint8))),           # Write data.
        (addr, I2C_M_RD, 2, cast(pointer(result), POINTER(c_uint8)))   # Read word (2 bytes).
    ])
    # Make ioctl call and return result data.
    ioctl(self._device.fileno(), I2C_RDWR, request)
    # Note the python-smbus code appears to have a rather serious bug and
    # does not return the result value!  This is fixed below by returning it.
    return result.value
def get_iphostname():
    '''Return the hostname and IP address of a Linux host.'''
    def get_ip(ifname):
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        ipaddr = socket.inet_ntoa(fcntl.ioctl(
            sock.fileno(),
            0x8915,  # SIOCGIFADDR
            struct.pack('256s', ifname[:15])
        )[20:24])
        sock.close()
        return ipaddr
    try:
        ip = get_ip('eth0')
    except IOError:
        ip = get_ip('eno1')
    hostname = socket.gethostname()
    return {'hostname': hostname, 'ip': ip}
def parse_default(field, ftype, fdefault):
    if not (ftype == 'bool' and fdefault == 'true'):
        try:
            fdefault = literal_eval(fdefault.rstrip('LDF'))
        except (ValueError, SyntaxError):
            fdefault = None
    if type(fdefault) is int:
        if ftype[0] != 'u' and ftype[:5] != 'fixed':
            if fdefault >> 63:
                fdefault = c_long(fdefault).value
            elif fdefault >> 31 and ftype[-2:] != '64':
                fdefault = c_int(fdefault).value
        else:
            fdefault &= (1 << int(ftype[-2:])) - 1
        if ftype == 'float' and abs(fdefault) >> 23:
            fdefault = unpack('=f', pack('=i', fdefault))[0]
        elif ftype == 'double' and abs(fdefault) >> 52:
            fdefault = unpack('=d', pack('=q', fdefault))[0]
    if fdefault:
        field.default_value = str(fdefault)
def _write_header(self, initlength):
    assert not self._headerwritten
    self._file.write('RIFF')
    if not self._nframes:
        self._nframes = initlength / (self._nchannels * self._sampwidth)
    self._datalength = self._nframes * self._nchannels * self._sampwidth
    self._form_length_pos = self._file.tell()
    self._file.write(struct.pack('<l4s4slhhllhh4s',
        36 + self._datalength, 'WAVE', 'fmt ', 16,
        WAVE_FORMAT_PCM, self._nchannels, self._framerate,
        self._nchannels * self._framerate * self._sampwidth,
        self._nchannels * self._sampwidth,
        self._sampwidth * 8, 'data'))
    self._data_length_pos = self._file.tell()
    self._file.write(struct.pack('<l', self._datalength))
    self._headerwritten = True
def send_testcase(json, ip, port):
    """
    Send a raw testcase
    """
    try:
        json = struct.pack("<I", len(json)) + json
        try:
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.connect((ip, int(port)))
            s.send(json)
            s.shutdown(socket.SHUT_RDWR)
            s.close()
            return True
        except socket.error:
            return False
    except socket.error as e:
        raise PJFSocketError(e.message if hasattr(e, "message") else str(e))
    except Exception as e:
        raise PJFBaseException(e.message)
def v4_int_to_packed(address):
    """The binary representation of this address.

    Args:
        address: An integer representation of an IPv4 IP address.

    Returns:
        The binary representation of this address.

    Raises:
        ValueError: If the integer is too large to be an IPv4 IP address.
    """
    if address > _BaseV4._ALL_ONES:
        raise ValueError('Address too large for IPv4')
    return Bytes(struct.pack('!I', address))
def _create_header(cls, width, height):
    """
    Internal function used to create headers when changing them is necessary,
    e.g. when image dimensions change. This function creates both a BMP header
    and a V3 DIB header, with some values left as defaults (e.g. pixels/meter).
    """
    total_header_size = cls.bmp_header_len + 40  # V3 len = 40 bytes
    padding_size = width & 3  # Magic stuff
    bitmap_size = ((width * 3) + padding_size) * height
    file_size = total_header_size + bitmap_size

    # BMP header: Magic (2 bytes), file size, 2 ignored values, bitmap offset
    header = struct.pack('<2s I 2H I', "BM", file_size, 0, 0, total_header_size)

    # DIB V3 header: header size, px width, px height, num of color planes, bpp,
    # compression method, bitmap data size, horizontal resolution, vertical resolution,
    # number of colors in palette, number of important colors used.
    # Few of these matter, so there are a bunch of default/"magic" numbers here...
    header += struct.pack('I 2i H H I I 2i 2I',
                          40, width, height, 1, 24, 0, bitmap_size,
                          0x0B13, 0x0B13, 0, 0)

    return header
def _load_bytecode_from_dump(input):

    # Read the data.
    data = input.read()

    # If it's an hexadecimal dump, decode and return it.
    if _re_is_hexa.match(data):
        hexstr = ''.join(_re_get_hexa.findall(data))
        hexdump = [ int(hexstr[i:i+2], 16) for i in xrange(0, len(hexstr), 2) ]
        return struct.pack('B' * len(hexdump), *hexdump)

    # If it's base64 encoded data, decode and return it.
    if _re_is_b64.match(data):
        return data.decode('base64')

    # Assume it's a raw binary dump and return it unchanged.
    return data

#-----------------------------------------------------------------------------#
def OSCTimeTag(time):
    """Convert a time in floating seconds to its OSC binary representation
    """
    if time > 0:
        fract, secs = math.modf(time)
        secs = secs - NTP_epoch
        binary = struct.pack('>LL', int(secs), int(fract * NTP_units_per_second))
    else:
        binary = struct.pack('>LL', 0, 1)

    return binary

######
#
# OSCMessage decoding functions
#
######
def OSCTimeTag(time):
    """Convert a time in floating seconds to its OSC binary representation
    """
    if time > 0:
        fract, secs = math.modf(time)
        secs = secs - NTP_epoch
        binary = struct.pack('>LL', long(secs), long(fract * NTP_units_per_second))
    else:
        binary = struct.pack('>LL', 0L, 1L)

    return binary

######
#
# OSCMessage decoding functions
#
######
def build_header(self):
    timestamp = utctimestamp()
    padding = str.encode('\0\0' * 14)
    data = pack(
        '<i2IQH14s',
        timestamp,
        self.metadata['incremental'],
        self.metadata['segment_size'],
        self.metadata['sectors'],
        len(self.metadata['bases']),
        padding
    )
    checksum = crc32(data)
    for i in self.metadata['bases']:
        data += i
        checksum = crc32(i, checksum)
    return data, checksum
def write_body(self, f):
    checksum = 0
    for segment, meta in self.segments.items():
        data = pack(
            '<2IH2B20s',
            segment,
            meta['incremental'],
            meta['base'],
            meta['encryption'],
            meta['compression'],
            meta['sha1_hash']
        )
        f.write(data)
        checksum = crc32(data, checksum)

    """ Backfill the body_checksum """
    f.seek(24, 0)
    f.write(pack('<I', checksum))
def close(self):
    """Close the _Stream object. No operation should be
       done on it afterwards.
    """
    if self.closed:
        return

    self.closed = True
    try:
        if self.mode == "w" and self.comptype != "tar":
            self.buf += self.cmp.flush()

        if self.mode == "w" and self.buf:
            self.fileobj.write(self.buf)
            self.buf = b""
            if self.comptype == "gz":
                self.fileobj.write(struct.pack("<L", self.crc))
                self.fileobj.write(struct.pack("<L", self.pos & 0xffffFFFF))
    finally:
        if not self._extfileobj:
            self.fileobj.close()
def __pack__(self):
    fmt = self.endian
    value_list = []
    for field in self._field_names_:
        fmt_ = self._field_format_[field]
        val = getattr(self, field)
        if isinstance(fmt_, StructArray):
            value_list.extend([
                getattr(struct_, field)
                for struct_ in fmt_.structure_list
                for field in struct_._field_names_
            ])
        elif isinstance(fmt_, basestring) and (fmt_.startswith('BBB') or fmt_.startswith('bbb')):
            value_list.extend([(val >> i & 0xFF) for i in [x for x in range(0, len(fmt_) * 8, 8)]])
        else:
            try:
                value_list.append(val.encode('ascii', 'ignore'))
            except AttributeError:
                value_list.append(val)
        fmt += str(len(val)) + 's' if fmt_ == 'variable' else fmt_.get_struct() if isinstance(fmt_, StructArray) else fmt_
    try:
        return struct.pack(fmt, *value_list)
    except struct.error as exc:
        raise_from(PackError("Unable to pack structure"), exc)
def test_ttag_values_packet(self):
    pkt = sdds_pkt.sdds_packet()
    ttag_ = 0
    ttage_ = 0
    sddstime = Time()
    sddstime.set(ttag_, ttage_)
    res = struct.pack("!QI", ttag_, ttage_)
    pkt.set_time(ttag_, ttage_)
    self.assertEqual(pkt.header.ttag.tstamp.asString(), res)
    self.assertEqual(pkt.get_SDDSTime(), sddstime)
    pkt.set_SDDSTime(sddstime)
    self.assertEqual(pkt.get_SDDSTime(), sddstime)

    ttag_ = 4294967296
    ttage_ = 8388608
    sddstime = Time()
    sddstime.set(ttag_, ttage_)
    res = struct.pack("!QI", ttag_, ttage_)
    pkt.set_time(ttag_, ttage_)
    self.assertEqual(pkt.header.ttag.tstamp.asString(), res)
    self.assertEqual(pkt.get_SDDSTime(), sddstime)
    pkt.set_SDDSTime(sddstime)
    self.assertEqual(pkt.get_SDDSTime(), sddstime)
def test_ttag_info(self):
    msptr_ = 0
    msdelta_ = 0
    ttag_ = 0
    ttage_ = 0
    res = struct.pack("!HHQI", msptr_, msdelta_, ttag_, ttage_)
    ttag_val = ttag_info()
    self.assertEqual(ttag_val.asString(), res)

    # test big endian format for number
    ttag_ = 4294967296
    ttage_ = 8388608
    msptr_ = 256
    msdelta_ = 256
    res = struct.pack("!HHQI", msptr_, msdelta_, ttag_, ttage_)
    ttag_val.info.msptr.msptr = msptr_
    ttag_val.info.msptr.msdelta = msdelta_
    ttag_val.tstamp.ttag = ttag_
    ttag_val.tstamp.ttage = ttage_
    self.assertEqual(ttag_val.asString(), res)
def pack_ext_header(hdr, structured=0):
    """
    Packs the value of the given BLUE file hdr dictionary's 'ext_header'
    key into the BLUE file extended header format and updates the value
    of 'ext_size'. The value of 'ext_header' can be a list of (key, value)
    tuples or a dict. Before writing this out to disk at the end of a
    BLUE file it must be padded out to a multiple of 512 bytes.

    If the keywords given are already a string it is presumed they are
    already packed and the 'ext_size' field is updated but the string
    itself is left alone.

    If <structured> is true, any embedded Python dictionaries, lists or
    tuples will pack their structure into the keywords with them.
    See pack_keywords() for more info.
    """
    packed = pack_keywords(hdr['ext_header'], _rep_tran[hdr['head_rep']],
                           structured=structured)
    hdr['ext_header'] = packed
    hdr['ext_size'] = len(packed)
def encrypt_file(key, in_filename, out_filename=None, chunksize=64*1024):
    if not out_filename:
        out_filename = in_filename + '.crypt'

    iv = ''.join(chr(random.randint(0, 0xFF)) for i in range(16))
    encryptor = AES.new(key, AES.MODE_CBC, iv)
    filesize = os.path.getsize(in_filename)

    with open(in_filename, 'rb') as infile:
        with open(out_filename, 'wb') as outfile:
            outfile.write(struct.pack('<Q', filesize))
            outfile.write(iv)

            while True:
                chunk = infile.read(chunksize)
                if len(chunk) == 0:
                    break
                elif len(chunk) % 16 != 0:
                    chunk += ' ' * (16 - len(chunk) % 16)
                outfile.write(encryptor.encrypt(chunk))
def Send_File_Client():
    sendSock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sendSock.connect(ADDR)
    fhead = struct.pack('IdI', 1, float(time.time()), os.stat(filename).st_size)
    print(fhead)
    sendSock.send(fhead)
    fp = open(filename, 'rb')
    while 1:
        filedata = fp.read(BUFSIZE)
        if not filedata:
            break
        sendSock.send(filedata)
    '''
    print u"?????????????...\n"
    fp.close()
    sendSock.close()
    print u"?????...\n"
    '''
def to_rain(cls, val):
    if val is None:
        return cls.new(typi.null, 0, 0, cls.null)
    elif val is False:
        return cls.new(typi.bool, 0, 0, cls.null)
    elif val is True:
        return cls.new(typi.bool, 0, 1, cls.null)
    elif isinstance(val, int):
        return cls.new(typi.int, 0, val, cls.null)
    elif isinstance(val, float):
        raw = struct.pack('d', val)
        intrep = struct.unpack('Q', raw)[0]
        return cls.new(typi.float, 0, intrep, cls.null)
    elif isinstance(val, str):
        str_p = ct.create_string_buffer(val.encode('utf-8'))
        cls._saves_.append(str_p)
        return cls.new(typi.str, len(val), ct.cast(str_p, ct.c_void_p).value, cls.null)

    raise Exception("Can't convert value {!r} to Rain".format(val))
def _change_baud_rate (self, baud_rate):
    '''
    If the bootloader on the board supports it and if it succeeds, try to
    increase the baud rate to make everything faster.
    '''
    pkt = struct.pack('<BI', 0x01, baud_rate)
    success, ret = self._issue_command(self.COMMAND_CHANGE_BAUD_RATE, pkt, True, 0, self.RESPONSE_OK, show_errors=False)

    if success:
        # The bootloader is new enough to support this.
        # Increase the baud rate.
        self.sp.baudrate = baud_rate
        # Now confirm that everything is working.
        pkt = struct.pack('<BI', 0x02, baud_rate)
        success, ret = self._issue_command(self.COMMAND_CHANGE_BAUD_RATE, pkt, False, 0, self.RESPONSE_OK, show_errors=False)
        if not success:
            # Something went wrong. Go back to old baud rate.
            self.sp.baudrate = 115200
def read_range (self, address, length):
    # Can only read up to 4095 bytes at a time.
    MAX_READ = 4095
    read = bytes()
    this_length = 0
    remaining = length
    while remaining > 0:
        if remaining > MAX_READ:
            this_length = MAX_READ
            remaining -= MAX_READ
        else:
            this_length = remaining
            remaining = 0

        message = struct.pack('<IH', address, this_length)
        success, flash = self._issue_command(self.COMMAND_READ_RANGE, message, True, this_length, self.RESPONSE_READ_RANGE)

        if not success:
            raise TockLoaderException('Error: Could not read flash')
        else:
            read += flash

        address += this_length

    return read
def _get_crc_internal_flash (self, address, length):
    '''
    Get the bootloader to compute a CRC.
    '''
    message = struct.pack('<II', address, length)
    success, crc = self._issue_command(self.COMMAND_CRC_INTERNAL_FLASH, message, True, 4, self.RESPONSE_CRC_INTERNAL_FLASH)

    # There is a bug in a version of the bootloader where the CRC returns 6
    # bytes and not just 4. Need to read just in case to grab those extra
    # bytes.
    self.sp.read(2)

    if not success:
        if crc[1] == self.RESPONSE_BADADDR:
            raise TockLoaderException('Error: RESPONSE_BADADDR: Invalid address for CRC (address: 0x{:X})'.format(address))
        elif crc[1] == self.RESPONSE_BADARGS:
            raise TockLoaderException('Error: RESPONSE_BADARGS: Invalid length for CRC check')
        else:
            raise TockLoaderException('Error: 0x{:X}'.format(crc[1]))

    return crc
def save_int(self, obj, pack=struct.pack):
    if self.bin:
        # If the int is small enough to fit in a signed 4-byte 2's-comp
        # format, we can store it more efficiently than the general
        # case.
        # First one- and two-byte unsigned ints:
        if obj >= 0:
            if obj <= 0xff:
                self.write(BININT1 + chr(obj))
                return
            if obj <= 0xffff:
                self.write("%c%c%c" % (BININT2, obj&0xff, obj>>8))
                return
        # Next check for 4-byte signed ints:
        high_bits = obj >> 31  # note that Python shift sign-extends
        if high_bits == 0 or high_bits == -1:
            # All high bits are copies of bit 2**31, so the value
            # fits in a 4-byte signed int.
            self.write(BININT + pack("<i", obj))
            return
    # Text pickle, or int too big to fit in signed 4-byte format.
    self.write(INT + repr(obj) + '\n')
def save_string(self, obj, pack=struct.pack):
    unicode = obj.isunicode()

    if self.bin:
        if unicode:
            obj = obj.encode("utf-8")
        l = len(obj)
        if l < 256 and not unicode:
            self.write(SHORT_BINSTRING + chr(l) + obj)
        else:
            s = pack("<i", l)
            if unicode:
                self.write(BINUNICODE + s + obj)
            else:
                self.write(BINSTRING + s + obj)
    else:
        if unicode:
            obj = obj.replace("\\", "\\u005c")
            obj = obj.replace("\n", "\\u000a")
            obj = obj.encode('raw-unicode-escape')
            self.write(UNICODE + obj + '\n')
        else:
            self.write(STRING + repr(obj) + '\n')
    self.memoize(obj)
def parse_netconn(self, seq, netconn):
    parts = netconn.split('|')
    new_conn = {}
    timestamp = convert_event_time(parts[0])
    try:
        new_conn['remote_ip'] = socket.inet_ntoa(struct.pack('>i', int(parts[1])))
    except:
        new_conn['remote_ip'] = '0.0.0.0'
    new_conn['remote_port'] = int(parts[2])
    new_conn['proto'] = protocols[int(parts[3])]
    new_conn['domain'] = parts[4]
    if parts[5] == 'true':
        new_conn['direction'] = 'Outbound'
    else:
        new_conn['direction'] = 'Inbound'

    return CbNetConnEvent(self.process_model, timestamp, seq, new_conn)
def parse_netconn(self, seq, netconn):
    new_conn = {}
    timestamp = convert_event_time(netconn.get("timestamp", None))

    direction = netconn.get("direction", "true")
    if direction == 'true':
        new_conn['direction'] = 'Outbound'
    else:
        new_conn['direction'] = 'Inbound'

    for ipfield in ('remote_ip', 'local_ip', 'proxy_ip'):
        try:
            new_conn[ipfield] = socket.inet_ntoa(struct.pack('>i', int(netconn.get(ipfield, 0))))
        except:
            new_conn[ipfield] = netconn.get(ipfield, '0.0.0.0')

    for portfield in ('remote_port', 'local_port', 'proxy_port'):
        new_conn[portfield] = int(netconn.get(portfield, 0))

    new_conn['proto'] = protocols.get(int(netconn.get('proto', 0)), "Unknown")
    new_conn['domain'] = netconn.get('domain', '')

    return CbNetConnEvent(self.process_model, timestamp, seq, new_conn, version=2)
def spawn(self, lines, additional_args=['-p', ''], width=None):
    (mouse_x, mouse_y) = get_mouse_location()
    if not width:
        width = 100  # some default width
    width = max(width, 101)  # width has to be 100 at least (rofi restriction)

    # first, compute the top left corner of the menu
    menu_x = min(max(mouse_x - width/2, self.x), self.x + self.panel_width - width)
    menu_y = self.y

    # then, specify these coordinates relative to the mouse cursor
    menu_x -= mouse_x
    menu_y -= mouse_y

    # compile rofi arguments
    cmd = ['rofi', '-dmenu', '-sep', '\\0']
    cmd += ['-monitor', '-3']  # position relative to mouse cursor
    cmd += ['-layout', '1']    # specify top left corner of the menu
    cmd += ['-width', str(width)]
    cmd += ['-xoffset', str(menu_x), '-yoffset', str(menu_y)]
    cmd += self.rofi_args
    cmd += additional_args

    rofi = subprocess.Popen(cmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE)
    for i in lines:
        rofi.stdin.write(i.encode('utf-8'))
        rofi.stdin.write(struct.pack('B', 0))
    rofi.stdin.close()
    rofi.wait()
def encodeDeltaRunAsBytes_(deltas, offset, stream):
    runLength = 0
    pos = offset
    numDeltas = len(deltas)
    while pos < numDeltas and runLength < 64:
        value = deltas[pos]
        if value < -128 or value > 127:
            break
        # Within a byte-encoded run of deltas, a single zero
        # is best stored literally as 0x00 value. However,
        # if there are two or more zeroes in a sequence, it is
        # better to start a new run. For example, the sequence
        # of deltas [15, 15, 0, 15, 15] becomes 6 bytes
        # (04 0F 0F 00 0F 0F) when storing the zero value
        # literally, but 7 bytes (01 0F 0F 80 01 0F 0F)
        # when starting a new run.
        if value == 0 and pos+1 < numDeltas and deltas[pos+1] == 0:
            break
        pos += 1
        runLength += 1
    assert runLength >= 1 and runLength <= 64
    stream.write(bytechr(runLength - 1))
    for i in range(offset, pos):
        stream.write(struct.pack('b', round(deltas[i])))
    return pos
def compile(self, ttFont):
    vorgs = list(self.VOriginRecords.values())
    names = list(self.VOriginRecords.keys())
    nameMap = ttFont.getReverseGlyphMap()
    lenRecords = len(vorgs)
    try:
        gids = map(operator.getitem, [nameMap]*lenRecords, names)
    except KeyError:
        nameMap = ttFont.getReverseGlyphMap(rebuild=True)
        gids = map(operator.getitem, [nameMap]*lenRecords, names)
    vOriginTable = list(zip(gids, vorgs))
    self.numVertOriginYMetrics = lenRecords
    vOriginTable.sort()  # must be in ascending GID order
    dataList = [struct.pack(">Hh", rec[0], rec[1]) for rec in vOriginTable]
    header = struct.pack(">HHhH", self.majorVersion, self.minorVersion,
                         self.defaultVertOriginY, self.numVertOriginYMetrics)
    dataList.insert(0, header)
    data = bytesjoin(dataList)
    return data
def compile(self, ttFont):
    self.tables.sort()  # sort according to the spec; see CmapSubtable.__lt__()
    numSubTables = len(self.tables)
    totalOffset = 4 + 8 * numSubTables
    data = struct.pack(">HH", self.tableVersion, numSubTables)
    tableData = b""
    seen = {}  # Some tables are the same object reference. Don't compile them twice.
    done = {}  # Some tables are different objects, but compile to the same data chunk
    for table in self.tables:
        try:
            offset = seen[id(table.cmap)]
        except KeyError:
            chunk = table.compile(ttFont)
            if chunk in done:
                offset = done[chunk]
            else:
                offset = seen[id(table.cmap)] = done[chunk] = totalOffset + len(tableData)
                tableData = tableData + chunk
        data = data + struct.pack(">HHl", table.platformID, table.platEncID, offset)
    return data + tableData
def compile(self, ttFont):
    if self.data:
        return struct.pack(">HHH", self.format, self.length, self.language) + self.data
    cmap = self.cmap
    codes = sorted(cmap.keys())
    if codes:  # yes, there are empty cmap tables.
        codes = list(range(codes[0], codes[-1] + 1))
        firstCode = codes[0]
        valueList = [cmap.get(code, ".notdef") for code in codes]
        valueList = map(ttFont.getGlyphID, valueList)
        gids = array.array("H", valueList)
        if sys.byteorder != "big":
            gids.byteswap()
        data = gids.tostring()
    else:
        data = b""
        firstCode = 0
    header = struct.pack(">HHHHH",
                         6, len(data) + 10, self.language, firstCode, len(codes))
    return header + data
def compile(self, ttFont):
    if not hasattr(self, "names"):
        # only happens when there are NO name table entries read
        # from the TTX file
        self.names = []
    names = self.names
    names.sort()  # sort according to the spec; see NameRecord.__lt__()
    stringData = b""
    format = 0
    n = len(names)
    stringOffset = 6 + n * sstruct.calcsize(nameRecordFormat)
    data = struct.pack(b">HHH", format, n, stringOffset)
    lastoffset = 0
    done = {}  # remember the data so we can reuse the "pointers"
    for name in names:
        string = name.toBytes()
        if string in done:
            name.offset, name.length = done[string]
        else:
            name.offset, name.length = done[string] = len(stringData), len(string)
            stringData = bytesjoin([stringData, string])
        data = data + sstruct.pack(nameRecordFormat, name)
    return data + stringData
def compile(self, ttFont):
    # First make sure that all the data lines up properly. Format 4
    # must have all its data lined up consecutively. If not this will fail.
    for curLoc, nxtLoc in zip(self.locations, self.locations[1:]):
        assert curLoc[1] == nxtLoc[0], "Data must be consecutive in indexSubTable format 4"

    offsets = list(self.locations[0]) + [loc[1] for loc in self.locations[1:]]
    # Image data offset must be less than or equal to the minimum of locations.
    # Resetting this offset may change the value for round tripping but is safer
    # and allows imageDataOffset to not be required to be in the XML version.
    self.imageDataOffset = min(offsets)
    offsets = [offset - self.imageDataOffset for offset in offsets]
    glyphIds = list(map(ttFont.getGlyphID, self.names))
    # Create an iterator over the ids plus a padding value.
    idsPlusPad = list(itertools.chain(glyphIds, [0]))

    dataList = [EblcIndexSubTable.compile(self, ttFont)]
    dataList.append(struct.pack(">L", len(glyphIds)))
    tmp = [struct.pack(codeOffsetPairFormat, *cop)
           for cop in zip(idsPlusPad, offsets)]
    dataList += tmp
    data = bytesjoin(dataList)
    return data
def discover_peers(self, port=None):
    """This method can be invoked (periodically?) to broadcast message to
    discover peers, if there is a chance initial broadcast message may be
    lost (as these messages are sent over UDP).
    """
    ping_msg = {'signature': self._signature, 'name': self._name, 'version': __version__}

    def _discover(addrinfo, port, task=None):
        ping_sock = AsyncSocket(socket.socket(addrinfo.family, socket.SOCK_DGRAM))
        ping_sock.settimeout(2)
        if addrinfo.family == socket.AF_INET:
            ping_sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        else:  # addrinfo.family == socket.AF_INET6
            ping_sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_HOPS,
                                 struct.pack('@i', 1))
            ping_sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, addrinfo.ifn)
        ping_sock.bind((addrinfo.ip, 0))
        if not port:
            port = addrinfo.udp_sock.getsockname()[1]
        ping_msg['location'] = addrinfo.location
        try:
            yield ping_sock.sendto('ping:'.encode() + serialize(ping_msg),
                                   (addrinfo.broadcast, port))
        except:
            pass
        ping_sock.close()

    for addrinfo in self._addrinfos:
        SysTask(_discover, addrinfo, port)
def _async_send_msg(self, data):
    """Internal use only; use 'send_msg' with 'yield' instead.

    Messages are tagged with length of the data, so on the receiving side,
    recv_msg knows how much data to receive.
    """
    yield self.sendall(struct.pack('>L', len(data)) + data)
def _sync_send_msg(self, data):
    """Internal use only; use 'send_msg' instead.

    Synchronous version of async_send_msg.
    """
    return self._sync_sendall(struct.pack('>L', len(data)) + data)
def get_pixels_slow(self, ximage):
    ''' Retrieve all pixels from a monitor. Pixels have to be RGB.
        (!) Insanely slow version, see doc/linux-slow-version. (!)
    '''

    # @TODO: this part takes most of the time. Need a better solution.
    def pix(pixel, _resultats={}, p__=pack):
        ''' Apply shifts to a pixel to get the RGB values.
            This method makes use of memoization.
        '''
        # pylint: disable=dangerous-default-value
        if pixel not in _resultats:
            _resultats[pixel] = p__('<B', (pixel & rmask) >> 16) + \
                p__('<B', (pixel & gmask) >> 8) + \
                p__('<B', pixel & bmask)
        return _resultats[pixel]

    self.width = ximage.contents.width
    self.height = ximage.contents.height
    rmask = ximage.contents.red_mask
    bmask = ximage.contents.blue_mask
    gmask = ximage.contents.green_mask
    get_pix = self.xlib.XGetPixel
    pixels = [pix(get_pix(ximage, x, y))
              for y in range(self.height) for x in range(self.width)]

    self.image = b''.join(pixels)
    return self.image
def decrypt_single_hash(rid, hbootkey, enc_hash, lmntstr):
    (des_k1, des_k2) = sid_to_key(rid)
    d1 = DES.new(des_k1, DES.MODE_ECB)
    d2 = DES.new(des_k2, DES.MODE_ECB)

    md5 = MD5.new()
    md5.update(hbootkey[:0x10] + pack("<L", rid) + lmntstr)
    rc4_key = md5.digest()
    rc4 = ARC4.new(rc4_key)
    obfkey = rc4.encrypt(enc_hash)
    hash = d1.decrypt(obfkey[:8]) + d2.decrypt(obfkey[8:])

    return hash
def write_word_data(self, addr, cmd, val):
    """Write a word (2 bytes) of data to the specified cmd register of the
    device.  Note that this will write the data in the endianness of the
    processor running Python (typically little endian)!
    """
    assert self._device is not None, 'Bus must be opened before operations are made against it!'
    # Construct a string of data to send with the command register and word value.
    data = struct.pack('=BH', cmd & 0xFF, val & 0xFFFF)
    # Send the data to the device.
    self._select_device(addr)
    self._device.write(data)
def _decode_pointer(self, size, offset):
    pointer_size = ((size >> 3) & 0x3) + 1
    new_offset = offset + pointer_size
    pointer_bytes = self._buffer[offset:new_offset]
    packed = pointer_bytes if pointer_size == 4 else struct.pack(
        b'!c', byte_from_int(size & 0x7)) + pointer_bytes
    unpacked = int_from_bytes(packed)
    pointer = unpacked + self._pointer_base + \
        self._pointer_value_offset[pointer_size]
    if self._pointer_test:
        return pointer, new_offset
    (value, _) = self.decode(pointer)
    return value, new_offset
def v6_int_to_packed(address):
    """The binary representation of this address.

    Args:
        address: An integer representation of an IPv6 IP address.

    Returns:
        The binary representation of this address.
    """
    return Bytes(struct.pack('!QQ', address >> 64, address & (2**64 - 1)))
def ipv4_checksum(ip4, udp, payload):
    ip4_src = ip4['src']
    ip4_dst = ip4['dst']
    ip4_len = len(payload) + len(UDP)
    return checksum(ip4_src or b'\x00' * 4,
                    ip4_dst or b'\x00' * 4,
                    struct.pack('!HH', ip4['p'], ip4_len),
                    udp.view,
                    payload)
def _load_bytecode_from_source(input):

    # Load the regular expressions as local variables,
    # since we're using them in a loop.
    re_is_line = _re_is_line
    re_parse_line = _re_parse_line

    # We'll accumulate the hexadecimal characters here.
    hexa = []

    # Flag to signal errors during parsing.
    parse_warning = False

    # For each line of text in the input file...
    for line in input.readlines():

        # Skip lines that don't contain bytecode.
        if re_is_line.match(line):

            # Extract hexadecimal sequences.
            sequence = re_parse_line.findall(line)
            if sequence:
                hexa.extend(sequence)

        # Flag parsing errors.
        elif '\\x' in line or '%' in line:
            parse_warning = True

    # Show a warning if we had parsing errors.
    if parse_warning:
        warnings.warn("Source code layout was changed, possible load errors!")

    # Raise an exception if no bytecode was extracted.
    if not hexa:
        raise IOError("Load error, no bytecode found")

    # Pack the bytecode and return it.
    hexdump = [int(x, 16) for x in hexa]
    return struct.pack('B' * len(hexdump), *hexdump)

#-----------------------------------------------------------------------------#
def OSCString(next):
    """Convert a string into a zero-padded OSC String.
    The length of the resulting string is always a multiple of 4 bytes.
    The string ends with 1 to 4 zero-bytes ('\x00')
    """
    OSCstringLength = math.ceil((len(next)+1) / 4.0) * 4
    if sys.version_info[0] > 2:
        next = bytes(next.encode("UTF-8"))
    else:
        next = str(next)
    return struct.pack(">%ds" % (OSCstringLength), next)
def OSCBlob(next):
    """Convert a string into an OSC Blob.
    An OSC-Blob is a binary encoded block of data, prepended by a 'size' (int32).
    The size is always a multiple of 4 bytes.
    The blob ends with 0 to 3 zero-bytes ('\x00')
    """
    if type(next) in (str, bytes):
        OSCblobLength = math.ceil((len(next)) / 4.0) * 4
        binary = struct.pack(">i%ds" % (OSCblobLength), OSCblobLength, next)
    else:
        binary = ""

    return binary