We have extracted the following 50 code examples from open-source Python projects to illustrate how struct.calcsize() is used.
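Before the project examples, here is a minimal standalone sketch (not taken from any of the projects below) of what struct.calcsize() reports for a few common format strings; the format strings are illustrative only:

import struct

# calcsize() returns the size in bytes of the packed representation of a
# format string. With native alignment (the default '@' mode) the result can
# vary by platform; the '<', '>', '=' and '!' prefixes use standard sizes.
print(struct.calcsize('I'))       # native unsigned int, usually 4
print(struct.calcsize('<Q'))      # little-endian unsigned long long: 8
print(struct.calcsize('>IIII'))   # four big-endian unsigned ints: 16
print(struct.calcsize('P') * 8)   # pointer size in bits: 32 or 64

Several examples below rely on exactly these properties, e.g. calcsize('P') to detect a 64-bit interpreter and calcsize('>IIII') to step through a binary header.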
def decrypt_file(key, in_filename, out_filename=None, chunksize=24*1024):
    # Split .crypt extension to restore file format
    if not out_filename:
        out_filename = os.path.splitext(in_filename)[0]

    with open(in_filename, 'rb') as infile:
        origsize = struct.unpack('<Q', infile.read(struct.calcsize('Q')))[0]
        iv = infile.read(16)
        decryptor = AES.new(key, AES.MODE_CBC, iv)

        with open(out_filename, 'wb') as outfile:
            while True:
                chunk = infile.read(chunksize)
                if len(chunk) == 0:
                    break
                outfile.write(decryptor.decrypt(chunk))

            # Truncate file to original size
            outfile.truncate(origsize)
def run(self):
    print("VEDIO server starts...")
    self.sock.bind(self.ADDR)
    self.sock.listen(1)
    conn, addr = self.sock.accept()
    print("remote VEDIO client success connected...")
    data = "".encode("utf-8")
    payload_size = struct.calcsize("L")
    cv2.namedWindow('Remote', cv2.WINDOW_NORMAL)
    while True:
        while len(data) < payload_size:
            data += conn.recv(81920)
        packed_size = data[:payload_size]
        data = data[payload_size:]
        msg_size = struct.unpack("L", packed_size)[0]
        while len(data) < msg_size:
            data += conn.recv(81920)
        zframe_data = data[:msg_size]
        data = data[msg_size:]
        frame_data = zlib.decompress(zframe_data)
        frame = pickle.loads(frame_data)
        cv2.imshow('Remote', frame)
        if cv2.waitKey(1) & 0xFF == 27:
            break
def _getOffsets(self):
    """
    Calculate offsets to VDMX_Group records.
    For each ratRange return a list of offset values from the beginning of
    the VDMX table to a VDMX_Group.
    """
    lenHeader = sstruct.calcsize(VDMX_HeaderFmt)
    lenRatRange = sstruct.calcsize(VDMX_RatRangeFmt)
    lenOffset = struct.calcsize('>H')
    lenGroupHeader = sstruct.calcsize(VDMX_GroupFmt)
    lenVTable = sstruct.calcsize(VDMX_vTableFmt)
    # offset to the first group
    pos = lenHeader + self.numRatios*lenRatRange + self.numRatios*lenOffset
    groupOffsets = []
    for group in self.groups:
        groupOffsets.append(pos)
        lenGroup = lenGroupHeader + len(group) * lenVTable
        pos += lenGroup  # offset to next group
    offsets = []
    for ratio in self.ratRanges:
        groupIndex = ratio['groupIndex']
        offsets.append(groupOffsets[groupIndex])
    return offsets
def decompile(self, data, ttFont):
    sstruct.unpack2(Silf_hdr_format, data, self)
    if self.version >= 5.0:
        (data, self.scheme) = grUtils.decompress(data)
        sstruct.unpack2(Silf_hdr_format_3, data, self)
        base = sstruct.calcsize(Silf_hdr_format_3)
    elif self.version < 3.0:
        self.numSilf = struct.unpack('>H', data[4:6])
        self.scheme = 0
        self.compilerVersion = 0
        base = 8
    else:
        self.scheme = 0
        sstruct.unpack2(Silf_hdr_format_3, data, self)
        base = sstruct.calcsize(Silf_hdr_format_3)

    silfoffsets = struct.unpack_from(('>%dL' % self.numSilf), data[base:])
    for offset in silfoffsets:
        s = Silf()
        self.silfs.append(s)
        s.decompile(data[offset:], ttFont, self.version)
def read_f(self, offset=None):
    if offset is not None:
        self.fid.seek(offset)
    d = {}
    for key, fmt in self.description:
        buf = self.fid.read(struct.calcsize(fmt))
        if len(buf) != struct.calcsize(fmt):
            return None
        val = list(struct.unpack(fmt, buf))
        for i, ival in enumerate(val):
            if hasattr(ival, 'replace'):
                ival = ival.replace(unicode.encode('\x03'), unicode.encode(''))
                ival = ival.replace(unicode.encode('\x00'), unicode.encode(''))
                val[i] = ival.decode("utf-8")
        if len(val) == 1:
            val = val[0]
        d[key] = val
    return d
def read_f(self, offset=None):
    if offset is not None:
        self.fid.seek(offset)
    d = {}
    for key, fmt in self.description:
        fmt = '<' + fmt  # ensures use of standard sizes
        buf = self.fid.read(struct.calcsize(fmt))
        if len(buf) != struct.calcsize(fmt):
            return None
        val = list(struct.unpack(fmt, buf))
        for i, ival in enumerate(val):
            if hasattr(ival, 'split'):
                val[i] = ival.split('\x00', 1)[0]
        if len(val) == 1:
            val = val[0]
        d[key] = val
    return d
def read_f(self, offset=None):
    if offset is not None:
        self.fid.seek(offset)
    d = {}
    for key, fmt in self.description:
        buf = self.fid.read(struct.calcsize(fmt))
        if len(buf) != struct.calcsize(fmt):
            return None
        val = list(struct.unpack(fmt, buf))
        for i, ival in enumerate(val):
            if hasattr(ival, 'replace'):
                ival = ival.replace(str.encode('\x03'), str.encode(''))
                ival = ival.replace(str.encode('\x00'), str.encode(''))
                val[i] = ival.decode("utf-8")
        if len(val) == 1:
            val = val[0]
        d[key] = val
    return d
def _RegisterWndClass(self):
    className = "PythonSplash"
    global g_registeredClass
    if not g_registeredClass:
        message_map = {}
        wc = win32gui.WNDCLASS()
        wc.SetDialogProc()  # Make it a dialog class.
        self.hinst = wc.hInstance = win32api.GetModuleHandle(None)
        wc.lpszClassName = className
        wc.style = 0
        wc.hCursor = win32gui.LoadCursor(0, win32con.IDC_ARROW)
        wc.hbrBackground = win32con.COLOR_WINDOW + 1
        wc.lpfnWndProc = message_map  # could also specify a wndproc.
        wc.cbWndExtra = win32con.DLGWINDOWEXTRA + struct.calcsize("Pi")
        classAtom = win32gui.RegisterClass(wc)
        g_registeredClass = 1
    return className
def _extract_images(filename):
    """Extract the images from a gzipped MNIST image file.

    :param filename: path to the gzipped image file
    :return: 4-D numpy array [index, y, x, depth], dtype np.float32
    """
    images = []
    print('Extracting {}'.format(filename))
    with gzip.GzipFile(fileobj=open(filename, 'rb')) as f:
        buf = f.read()
        index = 0
        magic, num_images, rows, cols = struct.unpack_from('>IIII', buf, index)
        if magic != 2051:
            raise ValueError('Invalid magic number {} in MNIST image file: {}'.format(magic, filename))
        index += struct.calcsize('>IIII')
        for i in range(num_images):
            img = struct.unpack_from('>784B', buf, index)
            index += struct.calcsize('>784B')
            img = np.array(img, dtype=np.float32)
            # rescale pixel values from [0, 255] to [0, 1]
            img = np.multiply(img, 1.0 / 255.0)
            img = img.reshape(rows, cols, 1)
            images.append(img)
    return np.array(images, dtype=np.float32)
def _extract_labels(filename, num_classes=10):
    """Extract the labels from a gzipped MNIST label file.

    :param filename: path to the gzipped label file
    :param num_classes: number of classes for the one-hot encoding (default 10)
    :return: 2-D numpy array [index, num_classes], dtype np.float32
    """
    labels = []
    print('Extracting {}'.format(filename))
    with gzip.GzipFile(fileobj=open(filename, 'rb')) as f:
        buf = f.read()
        index = 0
        magic, num_labels = struct.unpack_from('>II', buf, index)
        if magic != 2049:
            raise ValueError('Invalid magic number {} in MNIST label file: {}'.format(magic, filename))
        index += struct.calcsize('>II')
        for i in range(num_labels):
            label = struct.unpack_from('>B', buf, index)
            index += struct.calcsize('>B')
            label_one_hot = np.zeros(num_classes, dtype=np.float32)
            label_one_hot[label[0]] = 1
            labels.append(label_one_hot)
    return np.array(labels, dtype=np.float32)
def read_image(filename):
    f = open(filename, 'rb')
    index = 0
    buf = f.read()
    f.close()

    magic, images, rows, columns = struct.unpack_from('>IIII', buf, index)
    index += struct.calcsize('>IIII')

    for i in xrange(images):
    #for i in xrange(2000):
        image = Image.new('L', (columns, rows))
        for x in xrange(rows):
            for y in xrange(columns):
                image.putpixel((y, x), int(struct.unpack_from('>B', buf, index)[0]))
                index += struct.calcsize('>B')
        print 'save ' + str(i) + 'image'
        image.save('./test/' + str(i) + '.png')
def read_label(filename, saveFilename):
    f = open(filename, 'rb')
    index = 0
    buf = f.read()
    f.close()

    magic, labels = struct.unpack_from('>II', buf, index)
    index += struct.calcsize('>II')

    labelArr = [0] * labels
    #labelArr = [0] * 2000

    for x in xrange(labels):
    #for x in xrange(2000):
        labelArr[x] = int(struct.unpack_from('>B', buf, index)[0])
        index += struct.calcsize('>B')

    save = open(saveFilename, 'w')
    save.write(','.join(map(lambda x: str(x), labelArr)))
    save.write('\n')
    save.close()
    print 'save labels success'
def unpack(self, buf):
    if len(buf) < self.pyr_len:
        raise StorageError("Buffer too short")
    self._magic, essidlen = struct.unpack(self.pyr_head, buf[:self.pyr_len])
    if self._magic == 'PYR2':
        self._delimiter = '\n'
    elif self._magic == 'PYRT':
        self._delimiter = '\00'
    else:
        raise StorageError("Not a PYRT- or PYR2-buffer.")
    headfmt = "<%ssi16s" % (essidlen, )
    headsize = struct.calcsize(headfmt)
    header = buf[self.pyr_len:self.pyr_len + headsize]
    if len(header) != headsize:
        raise StorageError("Invalid header size")
    header = struct.unpack(headfmt, header)
    self.essid, self._numElems, self._digest = header
    pmkoffset = self.pyr_len + headsize
    pwoffset = pmkoffset + self._numElems * 32
    self._pwbuffer = buf[pwoffset:]
    self._pmkbuffer = buf[pmkoffset:pwoffset]
    if len(self._pmkbuffer) % 32 != 0:
        raise StorageError("pmkbuffer seems truncated")
def valid_ranges(*types):
    # given a sequence of numeric types, collect their _type_
    # attribute, which is a single format character compatible with
    # the struct module, use the struct module to calculate the
    # minimum and maximum value allowed for this format.
    # Returns a list of (min, max) values.
    result = []
    for t in types:
        fmt = t._type_
        size = struct.calcsize(fmt)
        a = struct.unpack(fmt, ("\x00"*32)[:size])[0]
        b = struct.unpack(fmt, ("\xFF"*32)[:size])[0]
        c = struct.unpack(fmt, ("\x7F"+"\x00"*32)[:size])[0]
        d = struct.unpack(fmt, ("\x80"+"\xFF"*32)[:size])[0]
        result.append((min(a, b, c, d), max(a, b, c, d)))
    return result
def _types_bitmap(cls, data, error):
    bits = []
    o = 0
    while o < len(data):
        fmt = "!BB"
        fmtsz = struct.calcsize(fmt)
        dat = data[o:o+fmtsz]
        if len(dat) != fmtsz:
            e = ("_types_bitmap", o, 'offset out of range: data size = %d' % len(data))
            error.append(e)
            return None
        block, bytes = struct.unpack(fmt, dat)
        o += fmtsz
        for i in range(bytes):
            b = struct.unpack("!B", data[o+i:o+i+1])[0]
            for j in range(8):
                if b & (1 << (7-j)):
                    bits.append((block*32+i)*8+j)
        o += bytes
    return bits
def _do_query(cls, buf, offset, error):
    qry = {}
    res = cls._do_name(buf, offset, 0, error)
    if res is None:
        e = ("_do_query", offset, "_do_name failed")
        error.append(e)
        return None
    offset, name = res
    qry['Qname'] = name
    fmt = "!HH"
    reqlen = struct.calcsize(fmt)
    strng = buf[offset:offset + reqlen]
    if len(strng) != reqlen:
        e = ("_do_query", offset, 'offset out of range: buf size = %d' % len(buf))
        error.append(e)
        return None
    res = struct.unpack(fmt, strng)
    qry['Qtype'] = cls._type_to_text(res[0])
    qry['Qclass'] = cls._class_to_text(res[1])
    return offset + reqlen, qry
def calc_padding(fmt, align):
    """Calculate how many padding bytes are needed for ``fmt`` to be aligned
    to ``align``.

    Args:
      fmt (str): :mod:`struct` format.
      align (int): alignment (2, 4, 8, etc.)

    Returns:
      str: padding format (e.g., various number of 'x').

    >>> calc_padding('b', 2)
    'x'
    >>> calc_padding('b', 3)
    'xx'
    """
    remain = struct.calcsize(fmt) % align
    if remain == 0:
        return ""
    return 'x' * (align - remain)
def _read_extras(self):
    dbfile = self.dbfile

    # Read the extras
    HashReader._read_extras(self)

    # Set up for reading the index array
    indextype = self.extras["indextype"]
    self.indexbase = dbfile.tell()
    self.indexlen = self.extras["indexlen"]
    self.indexsize = struct.calcsize(indextype)

    # Set up the function to read values from the index array
    if indextype == "B":
        self._get_pos = dbfile.get_byte
    elif indextype == "H":
        self._get_pos = dbfile.get_ushort
    elif indextype == "i":
        self._get_pos = dbfile.get_int
    elif indextype == "I":
        self._get_pos = dbfile.get_uint
    elif indextype == "q":
        self._get_pos = dbfile.get_long
    else:
        raise Exception("Unknown index type %r" % indextype)
def doVersion(checkForArgs=True):
    forceCheck = simple = False
    if checkForArgs:
        while Cmd.ArgumentsRemaining():
            myarg = getArgument()
            if myarg == u'check':
                forceCheck = True
            elif myarg == u'simple':
                simple = True
            else:
                unknownArgumentExit()
    if simple:
        writeStdout(__version__)
        return
    import struct
    version_data = u'GAM {0} - {1}\n{2}\nPython {3}.{4}.{5} {6}-bit {7}\ngoogle-api-python-client {8}\noauth2client {9}\n{10} {11}\nPath: {12}\n'
    writeStdout(version_data.format(__version__, GAM_URL, __author__,
                                    sys.version_info[0], sys.version_info[1],
                                    sys.version_info[2], struct.calcsize(u'P')*8,
                                    sys.version_info[3],
                                    googleapiclient.__version__, oauth2client.__version__,
                                    platform.platform(), platform.machine(),
                                    GM.Globals[GM.GAM_PATH]))
    if forceCheck:
        doGAMCheckForUpdates(forceCheck=True)

# gam help
def receive(channel):
    """ Receive a message from a channel """
    size = struct.calcsize("L")
    size = channel.recv(size)
    try:
        size = socket.ntohl(struct.unpack("L", size)[0])
    except struct.error as e:
        return ''
    buf = ""
    while len(buf) < size:
        # accumulate until the full payload has arrived
        buf += channel.recv(size - len(buf))
    return pickle.loads(buf)[0]
def UnpackNMITEMACTIVATE(lparam):
    format = _nmhdr_fmt + _nmhdr_align_padding
    if is64bit:
        # the struct module doesn't handle this correctly as some of the items
        # are actually structs in structs, which get individually aligned.
        format = format + "iiiiiiixxxxP"
    else:
        format = format + "iiiiiiiP"
    buf = win32gui.PyMakeBuffer(struct.calcsize(format), lparam)
    return _MakeResult("NMITEMACTIVATE hwndFrom idFrom code iItem iSubItem uNewState uOldState uChanged actionx actiony lParam",
                       struct.unpack(format, buf))

# MENUITEMINFO struct
# http://msdn.microsoft.com/library/default.asp?url=/library/en-us/winui/WinUI/WindowsUserInterface/Resources/Menus/MenuReference/MenuStructures/MENUITEMINFO.asp
# We use the struct module to pack and unpack strings as MENUITEMINFO
# structures.  We also have special handling for the 'fMask' item in that
# structure to avoid the caller needing to explicitly check validity
# (None is used if the mask excludes/should exclude the value)
def PackMENUINFO(dwStyle=None, cyMax=None, hbrBack=None, dwContextHelpID=None,
                 dwMenuData=None, fMask=0):
    if dwStyle is None:
        dwStyle = 0
    else:
        fMask |= win32con.MIM_STYLE
    if cyMax is None:
        cyMax = 0
    else:
        fMask |= win32con.MIM_MAXHEIGHT
    if hbrBack is None:
        hbrBack = 0
    else:
        fMask |= win32con.MIM_BACKGROUND
    if dwContextHelpID is None:
        dwContextHelpID = 0
    else:
        fMask |= win32con.MIM_HELPID
    if dwMenuData is None:
        dwMenuData = 0
    else:
        fMask |= win32con.MIM_MENUDATA
    # Create the struct.
    item = struct.pack(
        _menuinfo_fmt,
        struct.calcsize(_menuinfo_fmt),  # cbSize
        fMask,
        dwStyle,
        cyMax,
        hbrBack,
        dwContextHelpID,
        dwMenuData)
    return array.array("b", item)
def _StructPackDecoder(wire_type, format):
    """Return a constructor for a decoder for a fixed-width field.

    Args:
        wire_type: The field's wire type.
        format: The format string to pass to struct.unpack().
    """

    value_size = struct.calcsize(format)
    local_unpack = struct.unpack

    # Reusing _SimpleDecoder is slightly slower than copying a bunch of code,
    # but not enough to make a significant difference.

    # Note that we expect someone up-stack to catch struct.error and convert
    # it to _DecodeError -- this way we don't have to set up exception-
    # handling blocks every time we parse one value.

    def InnerDecode(buffer, pos):
        new_pos = pos + value_size
        result = local_unpack(format, buffer[pos:new_pos])[0]
        return (result, new_pos)
    return _SimpleDecoder(wire_type, InnerDecode)
def _process_scrape(self, payload, trans):
    info_struct = '!LLL'
    info_size = struct.calcsize(info_struct)
    info_count = len(payload) / info_size
    hashes = trans['sent_hashes']
    response = {}
    for info_offset in xrange(info_count):
        off = info_size * info_offset
        info = payload[off:off + info_size]
        seeders, completed, leechers = struct.unpack(info_struct, info)
        response[hashes[info_offset]] = {
            'seeders': seeders,
            'completed': completed,
            'leechers': leechers,
        }
    return response
def send_one_ping(my_socket, dest_addr, ID):
    """
    Send one ping to the given >dest_addr<.
    """
    dest_addr = socket.gethostbyname(dest_addr)

    # Header is type (8), code (8), checksum (16), id (16), sequence (16)
    my_checksum = 0

    # Make a dummy header with a 0 checksum.
    header = struct.pack("bbHHh", ICMP_ECHO_REQUEST, 0, my_checksum, ID, 1)
    bytesInDouble = struct.calcsize("d")
    data = (192 - bytesInDouble) * "Q"
    data = struct.pack("d", default_timer()) + data

    # Calculate the checksum on the data and the dummy header.
    my_checksum = checksum(header + data)

    # Now that we have the right checksum, we put that in. It's just easier
    # to make up a new header than to stuff it into the dummy.
    header = struct.pack(
        "bbHHh", ICMP_ECHO_REQUEST, 0, socket.htons(my_checksum), ID, 1
    )
    packet = header + data
    my_socket.sendto(packet, (dest_addr, 1))  # Don't know about the 1
def get_platform():
    """ get_platform()

    Get a string that specifies the platform more specifically than
    sys.platform does. The result can be: linux32, linux64, win32,
    win64, osx32, osx64. Other platforms may be added in the future.
    """
    # Get platform
    if sys.platform.startswith('linux'):
        plat = 'linux%i'
    elif sys.platform.startswith('win'):
        plat = 'win%i'
    elif sys.platform.startswith('darwin'):
        plat = 'osx%i'
    else:  # pragma: no cover
        return None

    return plat % (struct.calcsize('P') * 8)  # 32 or 64 bits
def decryptStreamChunkOld(self, response, wfile, chunksize=24*1024, startOffset=0):
    if ENCRYPTION_ENABLE == 0:
        return
    # with open(in_filename, 'rb') as infile:
    origsize = struct.unpack('<Q', response.read(struct.calcsize('Q')))[0]
    decryptor = AES.new(self.key, AES.MODE_ECB)
    count = 0
    while True:
        chunk = response.read(chunksize)
        count = count + 1
        if len(chunk) == 0:
            break
        responseChunk = decryptor.decrypt(chunk)
        if count == 1 and startOffset != 0:
            wfile.write(responseChunk[startOffset:])
        elif (len(chunk)) < (len(responseChunk.strip())):
            wfile.write(responseChunk.strip())
        else:
            wfile.write(responseChunk)
def decryptCalculatePadding(self, response, chunksize=24*1024):
    if ENCRYPTION_ENABLE == 0:
        return
    # with open(in_filename, 'rb') as infile:
    origsize = struct.unpack('<Q', response.read(struct.calcsize('Q')))[0]
    decryptor = AES.new(self.key, AES.MODE_ECB)
    count = 0
    while True:
        chunk = response.read(chunksize)
        count = count + 1
        if len(chunk) == 0:
            break
        responseChunk = decryptor.decrypt(chunk)
        return int(len(chunk) - len(responseChunk.strip()))
def read_texture_names(self):
    """Iterate through all brush textures in the map."""
    tex_data = self.get_lump(BSP_LUMPS.TEXDATA_STRING_DATA)
    tex_table = self.get_lump(BSP_LUMPS.TEXDATA_STRING_TABLE)
    # tex_table is an array of int offsets into tex_data. tex_data is a
    # null-terminated block of strings.

    table_offsets = struct.unpack(
        # The number of ints + i, for the repetitions in the struct.
        str(len(tex_table) // struct.calcsize('i')) + 'i',
        tex_table,
    )

    for off in table_offsets:
        # Look for the NULL at the end - strings are limited to 128 chars.
        str_off = 0
        for str_off in range(off, off + 128):
            if tex_data[str_off] == 0:
                yield tex_data[off:str_off].decode('ascii')
                break
        else:
            # Reached the 128 char limit without finding a null.
            raise ValueError('Bad string at', off, 'in BSP! ("{}")'.format(
                tex_data[off:str_off]
            ))
def get_size(self):
    # XXX: even more hackish, we need a better way
    if self.type.is_void():
        return 0
    elif self.type.is_bool():
        # not strictly correct, but we cannot return 1/8
        return 0
    import struct
    return struct.calcsize(self.get_fmt())

# =============================================
# hand-written union subclasses
# =============================================
#
# As of now, the compiler is not capable of generating different subclasses
# for each union tag. In the meantime, write it by hand
def check(self, fmt, value):
    from random import randrange
    # build a buffer which is surely big enough to contain what we need
    # and check:
    #   1) that we correctly write the bytes we expect
    #   2) that we do NOT write outside the bounds
    #
    pattern = [six.int2byte(randrange(256)) for _ in range(256)]
    pattern = b''.join(pattern)
    buf = bytearray(pattern)
    buf2 = bytearray(pattern)
    offset = 16
    pack_into(ord(fmt), buf, offset, value)
    struct.pack_into(fmt, buf2, offset, value)
    assert buf == buf2
    #
    # check that it raises if it's out of bound
    out_of_bound = 256 - struct.calcsize(fmt) + 1
    pytest.raises(IndexError, "pack_into(ord(fmt), buf, out_of_bound, value)")
def _get_launcher(self, kind):
    if struct.calcsize('P') == 8:   # 64-bit
        bits = '64'
    else:
        bits = '32'
    name = '%s%s.exe' % (kind, bits)
    # Issue 31: don't hardcode an absolute package name, but
    # determine it relative to the current package
    distlib_package = __name__.rsplit('.', 1)[0]
    result = finder(distlib_package).find(name).bytes
    return result

# Public API follows
def is_64bit():
    return struct.calcsize("P") == 8
def _process_pHYs(self, data):
    # http://www.w3.org/TR/PNG/#11pHYs
    self.phys = data
    fmt = "!LLB"
    if len(data) != struct.calcsize(fmt):
        raise FormatError("pHYs chunk has incorrect length.")
    self.x_pixels_per_unit, self.y_pixels_per_unit, unit = struct.unpack(fmt, data)
    self.unit_is_meter = bool(unit)
def __unpack__(self, type_, buf, _size=None):
    fmt = self.endian + type_
    size = struct.calcsize(fmt) if _size is None else _size
    try:
        unpacked = struct.unpack(fmt, buf[:size]), buf[size:]
    except struct.error as exc:
        raise_from(UnpackError("Unable to unpack structure"), exc)
    else:
        return unpacked
def __len__(self):
    fmt = ''
    more_len = 0
    for field in self._field_names_:
        fmt_ = self._field_format_[field]
        if isinstance(fmt_, StructArray):
            fmt += fmt_.get_struct()
        elif isinstance(fmt_, type) and issubclass(fmt_, Struct):
            more_len = len(fmt)
        elif fmt_ != 'variable':
            fmt += fmt_
    hdr_len = struct.calcsize(fmt) + more_len
    if hasattr(self, 'data'):
        hdr_len += len(self.data)
    return hdr_len
def pack_main_header_keywords(hdr):
    """
    Replace the 'keywords' field of the given BLUE header dictionary
    with the X-Midas packed (str) form of the main header keywords.
    The order of the key value pairs is indeterminate.

    In packed form, keys are separated from values by a single '=',
    key-value pairs are separated from one another by a single '\0'
    and all values are stringized using str(). Hence, each key value
    pair takes up keylength + stringized value length + 2 characters.

    If the resulting packed string is longer than the max allowed for
    the BLUE header main keywords (96 characters) the string is
    truncated.

    If no 'keywords' field is present or if it is an empty dict, then
    the keyword fields are updated to represent an empty main header
    keywords section.
    """
    keydict = hdr.get('keywords', {})
    if keydict:
        hdr['keywords'] = '\0'.join([k + '=' + str(v)
                                     for k, v in keydict.items()]) + '\0'
        hdr['keylength'] = min(len(hdr['keywords']),
                               struct.calcsize(_bluestructs['HEADER']
                                               ['lookups']['keywords'][1]))
        if hdr['keylength'] < len(hdr['keywords']):
            print "WARNING: Main header keywords truncated"
    else:
        hdr['keywords'] = '\0'
        hdr['keylength'] = 0
def _getCaps(self):
    bits = struct.calcsize(self.datatype) * 8
    if self.datatype in ('f', 'd'):
        return gst.Caps('audio/x-raw-float,endianness=%s,width=%d,rate=%d,channels=1'
                        % (self.ENDIANNESS, bits, self.sample_rate))
    else:
        # In struct module, unsigned types are uppercase, signed are lower
        if self.datatype.isupper():
            signed = 'false'
        else:
            signed = 'true'
        return gst.Caps('audio/x-raw-int,endianness=%s,signed=%s,width=%d,depth=%d,rate=%d,channels=1'
                        % (self.ENDIANNESS, signed, bits, bits, self.sample_rate))
def __init__(self, name, PortTypeClass, PortTransferType=TRANSFER_TYPE, logger=None, noData=None):
    self.name = name
    self.logger = logger
    self.PortType = PortTypeClass
    self.PortTransferType = PortTransferType
    self.outConnections = {}  # key=connectionId, value=port
    self.stats = OutStats(self.name, PortTransferType)
    self.port_lock = threading.Lock()
    self.sriDict = {}  # key=streamID value=SriMapStruct
    self.filterTable = []
    if noData == None:
        self.noData = []
    else:
        self.noData = noData

    # Determine maximum transfer size in advance
    self.byteSize = 1
    if self.PortTransferType:
        self.byteSize = struct.calcsize(PortTransferType)
    # Multiply by some number < 1 to leave some margin for the CORBA header
    self.maxSamplesPerPush = int(MAX_TRANSFER_BYTES*.9)/self.byteSize
    # Make sure maxSamplesPerPush is even so that complex data case is handled properly
    if self.maxSamplesPerPush % 2 != 0:
        self.maxSamplesPerPush = self.maxSamplesPerPush - 1

    if self.logger == None:
        self.logger = logging.getLogger("redhawk.bulkio.outport." + name)

    if self.logger:
        self.logger.debug('bulkio::OutPort CTOR port:' + str(self.name))
def __init__(self, name, element_type):
    self.enabled = True
    self.bitSize = struct.calcsize(element_type) * 8
    self.historyWindow = 10
    self.receivedStatistics = {}
    self.name = name
    self.receivedStatistics_idx = {}
    self.activeStreamIDs = []
    self.connection_errors = {}
def __init__(self, name, element_type):
    self.enabled = True
    self.flushTime = None
    self.historyWindow = 10
    self.receivedStatistics = []
    self.name = name
    self.receivedStatistics_idx = 0
    self.bitSize = struct.calcsize(element_type) * 8
    self.activeStreamIDs = []
    for i in range(self.historyWindow):
        self.receivedStatistics.append(self.statPoint())
    self.runningStats = None
def FileHeader(self):
    """Return the per-file header as a string."""
    dt = self.date_time
    dosdate = (dt[0] - 1980) << 9 | dt[1] << 5 | dt[2]
    dostime = dt[3] << 11 | dt[4] << 5 | (dt[5] // 2)
    if self.flag_bits & 0x08:
        # Set these to zero because we write them after the file data
        CRC = compress_size = file_size = 0
    else:
        CRC = self.CRC
        compress_size = self.compress_size
        file_size = self.file_size

    extra = self.extra

    if file_size > ZIP64_LIMIT or compress_size > ZIP64_LIMIT:
        # File is larger than what fits into a 4 byte integer,
        # fall back to the ZIP64 extension
        fmt = '<HHQQ'
        extra = extra + struct.pack(fmt,
                                    1, struct.calcsize(fmt)-4, file_size, compress_size)
        file_size = 0xffffffff
        compress_size = 0xffffffff
        self.extract_version = max(45, self.extract_version)
        self.create_version = max(45, self.extract_version)

    filename, flag_bits = self._encodeFilenameFlags()
    header = struct.pack(structFileHeader, stringFileHeader,
                         self.extract_version, self.reserved, flag_bits,
                         self.compress_type, dostime, dosdate, CRC,
                         compress_size, file_size,
                         len(filename), len(extra))
    return header + filename + extra
def _findSoname_ldconfig(name):
    import struct
    if struct.calcsize('l') == 4:
        machine = os.uname()[4] + '-32'
    else:
        machine = os.uname()[4] + '-64'
    mach_map = {
        'x86_64-64': 'libc6,x86-64',
        'ppc64-64': 'libc6,64bit',
        'sparc64-64': 'libc6,64bit',
        's390x-64': 'libc6,64bit',
        'ia64-64': 'libc6,IA-64',
        }
    abi_type = mach_map.get(machine, 'libc6')

    # XXX assuming GLIBC's ldconfig (with option -p)
    expr = r'(\S+)\s+\((%s(?:, OS ABI:[^\)]*)?)\)[^/]*(/[^\(\)\s]*lib%s\.[^\(\)\s]*)' \
           % (abi_type, re.escape(name))
    f = os.popen('/sbin/ldconfig -p 2>/dev/null')
    try:
        data = f.read()
    finally:
        f.close()
    res = re.search(expr, data)
    if not res:
        return None
    return res.group(1)
def _check_size(typ, typecode=None):
    # Check sizeof(ctypes_type) against struct.calcsize. This
    # should protect somewhat against a misconfigured libffi.
    from struct import calcsize
    if typecode is None:
        # Most _type_ codes are the same as used in struct
        typecode = typ._type_
    actual, required = sizeof(typ), calcsize(typecode)
    if actual != required:
        raise SystemError("sizeof(%s) wrong: %d instead of %d" %
                          (typ, actual, required))