我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用struct.error()。
def v4_int_to_packed(address):
    """Pack an integer IPv4 address into 4 bytes, network byte order.

    Args:
        address: An integer representation of an IPv4 IP address.

    Returns:
        The integer address packed as 4 bytes in network (big-endian)
        order.

    Raises:
        ValueError: If the integer is negative or too large to be an
            IPv4 IP address.
    """
    try:
        packed = _compat_to_bytes(address, 4, 'big')
    except (struct.error, OverflowError):
        # Both failure modes are reported as the documented ValueError.
        raise ValueError("Address negative or too large for IPv4")
    return packed
def _parse_hextet(cls, hextet_str):
    """Parse one IPv6 hextet string and return its integer value.

    Args:
        hextet_str: A string, the number to parse.

    Returns:
        The hextet as an integer.

    Raises:
        ValueError: if the input isn't strictly a hex number from
            [0..FFFF].
    """
    # int() accepts signs, whitespace, prefixes and more, so only
    # characters from the class-level hex digit set are let through.
    only_hex = cls._HEX_DIGITS.issuperset(hextet_str)
    if not only_hex:
        raise ValueError("Only hex digits permitted in %r" % hextet_str)
    # The length is checked after the characters because an
    # invalid-character message is the more helpful of the two.
    if len(hextet_str) > 4:
        raise ValueError("At most 4 characters permitted in %r" % hextet_str)
    # At most four hex digits cannot exceed 0xFFFF, so no range check.
    return int(hextet_str, 16)
def main():
    """Small command-line driver.

    Parses the -d/-e/-u/-t options from sys.argv. With -t it runs
    test() and returns. Otherwise it applies the selected function
    (encode by default, decode for -d or -u) to the file named by the
    first positional argument, or to standard input when there is no
    argument or the argument is '-'. Output goes to sys.stdout.buffer.
    On an option error the usage text is printed to stderr and the
    process exits with status 2.
    """
    import sys
    import getopt

    try:
        opts, args = getopt.getopt(sys.argv[1:], 'deut')
    except getopt.error as msg:
        # Redirect so that both the error and the usage go to stderr.
        sys.stdout = sys.stderr
        print(msg)
        print("""usage: %s [-d|-e|-u|-t] [file|-] -d, -u: decode -e: encode (default) -t: encode and decode string 'Aladdin:open sesame'"""%sys.argv[0])
        sys.exit(2)

    action = encode
    for flag, _unused in opts:
        if flag == '-t':
            test()
            return
        if flag == '-e':
            action = encode
        elif flag in ('-d', '-u'):
            action = decode

    use_named_file = bool(args) and args[0] != '-'
    if use_named_file:
        with open(args[0], 'rb') as f:
            action(f, sys.stdout.buffer)
    else:
        action(sys.stdin.buffer, sys.stdout.buffer)
def _process_tRNS(self, data):
    """Record a tRNS chunk and, for non-palette images, decode it.

    The raw chunk is always stored in ``self.trns``.  When
    ``self.colormap`` is set, the chunk is only validated against the
    palette.  Otherwise it is unpacked into ``self.transparent`` as
    ``self.color_planes`` big-endian 16-bit values.

    Raises:
        FormatError: the chunk is longer than the palette allows, is
            used with an alpha colour type, or has the wrong length.
    """
    # http://www.w3.org/TR/PNG/#11tRNS
    self.trns = data

    if self.colormap:
        if not self.plte:
            warnings.warn("PLTE chunk is required before tRNS chunk.")
        elif len(data) > len(self.plte)/3:
            # Was warning, but promoted to Error as it
            # would otherwise cause pain later on.
            raise FormatError("tRNS chunk is too long.")
        return

    if self.alpha:
        raise FormatError(
            "tRNS chunk is not valid with colour type %d." %
            self.color_type)
    layout = "!%dH" % self.color_planes
    try:
        self.transparent = struct.unpack(layout, data)
    except struct.error:
        raise FormatError("tRNS chunk has incorrect length.")
def _handle_parsing_error(func):
    """Decorator for handling errors in raw output data.

    The wrapped callable takes a single ``raw_data`` argument.  Any
    IndexError, struct.error, KeyError or ValueError it raises is
    re-raised as ``ironic_exception.IPMIFailure`` with a message that
    names the kind of problem.
    """
    @six.wraps(func)
    def wrapper(raw_data):
        template = _('Data from Intel Node Manager %s')
        try:
            return func(raw_data)
        except (IndexError, struct.error, KeyError, ValueError) as exc:
            # Pick the reason in the same precedence order the separate
            # handlers would have been tried in.
            if isinstance(exc, (IndexError, struct.error)):
                reason = _('has wrong length.')
            elif isinstance(exc, KeyError):
                reason = _('is corrupted.')
            else:
                reason = _('cannot be converted.')
            raise ironic_exception.IPMIFailure(template % reason)
    return wrapper
def __pack__(self):
    """Pack every field of this structure into a byte string.

    Walks ``self._field_names_`` and builds a struct format string
    (starting with ``self.endian``) together with the matching list of
    values, then hands both to ``struct.pack``.

    Raises:
        PackError: chained (via ``raise_from``) from the ``struct.error``
            raised when the values do not fit the format.
    """
    pack_format = self.endian
    values = []
    for name in self._field_names_:
        field_format = self._field_format_[name]
        value = getattr(self, name)

        if isinstance(field_format, StructArray):
            # Flatten every field of every structure in the array.
            flattened = []
            for element in field_format.structure_list:
                for sub_name in element._field_names_:
                    flattened.append(getattr(element, sub_name))
            values.extend(flattened)
        elif isinstance(field_format, basestring) and (
                field_format.startswith('BBB') or
                field_format.startswith('bbb')):
            # One integer is split into single bytes, lowest byte first,
            # one per character of the format.
            values.extend([
                (value >> shift) & 0xFF
                for shift in range(0, len(field_format) * 8, 8)
            ])
        else:
            try:
                values.append(value.encode('ascii', 'ignore'))
            except AttributeError:
                # No usable .encode(); pack the value as it is.
                values.append(value)

        if field_format == 'variable':
            pack_format += str(len(value)) + 's'
        elif isinstance(field_format, StructArray):
            pack_format += field_format.get_struct()
        else:
            pack_format += field_format

    try:
        return struct.pack(pack_format, *values)
    except struct.error as exc:
        raise_from(PackError("Unable to pack structure"), exc)
def __init__(self, file, align=True, bigendian=True, inclheader=False):
    """Read a chunk header from ``file`` and initialise the reader state.

    The header is a 4-byte chunk name followed by a 4-byte unsigned
    size, read big-endian unless ``bigendian`` is false.  When
    ``inclheader`` is true the stored size counts the 8 header bytes,
    so they are subtracted.  ``self.seekable`` records whether
    ``file.tell()`` worked.

    Raises:
        EOFError: fewer than 4 bytes were available for the name, or
            the size field could not be unpacked.
    """
    import struct

    self.closed = False
    self.align = align      # whether to align to word (2-byte) boundaries
    strflag = '>' if bigendian else '<'
    self.file = file

    self.chunkname = file.read(4)
    if len(self.chunkname) < 4:
        raise EOFError
    try:
        (self.chunksize,) = struct.unpack(strflag + 'L', file.read(4))
    except struct.error:
        raise EOFError
    if inclheader:
        self.chunksize -= 8     # subtract header
    self.size_read = 0

    try:
        self.offset = self.file.tell()
    except (AttributeError, IOError):
        self.seekable = False
    else:
        self.seekable = True
def seek(self, pos, whence=0):
    """Seek to specified position into the chunk.

    Default position is 0 (start of chunk).  ``whence`` 0 treats ``pos``
    as an offset from the start of the chunk, 1 as relative to the
    current read position, and 2 as relative to the end of the chunk.

    Raises:
        ValueError: the chunk has been closed.
        IOError: the underlying file is not seekable.
        RuntimeError: the resulting position lies outside the chunk.
    """
    # The call form of ``raise`` is used because the
    # ``raise Class, "message"`` statement is rejected by Python 3.
    if self.closed:
        raise ValueError("I/O operation on closed file")
    if not self.seekable:
        raise IOError("cannot seek")
    if whence == 1:
        pos = pos + self.size_read
    elif whence == 2:
        pos = pos + self.chunksize
    if pos < 0 or pos > self.chunksize:
        raise RuntimeError
    # self.offset is the file position of the first data byte.
    self.file.seek(self.offset + pos, 0)
    self.size_read = pos
def close(self, remove=os.unlink, error=os.error):
    """Close the pipe and delete the temporary file, if either exists.

    ``remove`` and ``error`` are bound as defaults when the function is
    defined, so the body does not look up ``os`` at call time.

    Returns:
        Whatever ``self.pipe.close()`` returns, or 255 when there is
        no pipe.
    """
    rc = self.pipe.close() if self.pipe else 255
    tmpfile = self.tmpfile
    if tmpfile:
        try:
            remove(tmpfile)
        except error:
            # Best effort only: failure to remove the file is ignored.
            pass
    return rc

# Alias
def _node(default=''):
    """Helper to determine the node name of this machine.

    Returns ``socket.gethostname()``, or ``default`` when the socket
    module cannot be imported or the lookup raises ``socket.error``.
    """
    try:
        import socket
    except ImportError:
        # No sockets...
        return default
    else:
        try:
            hostname = socket.gethostname()
        except socket.error:
            # Still not working...
            return default
        return hostname

# os.path.abspath is new in Python 1.5.2:
def _iesubelmsmtreqstasta_(s, sid):
    """
    :param s: raw bytes of the subelement
    :param sid: subelement id
    :returns: parsed subelement of type sta request for sta counters in
     msmt request; ``s`` unchanged when ``sid`` is not one of the two
     ids handled here
    """
    if sid == std.EID_MSMT_REQ_SUBELEMENT_STA_RPT:
        # Std Fig. 8-117: one 4-byte count and two 2-byte fields,
        # unpacked in native byte order with standard sizes.
        cnt, timeout, trigger = struct.unpack_from('=I2H', s)
        remaining = s[8:]
        conds = _stacntrtriggerconds_(trigger)
        thresholds = {}
        parsed = {'msmt-cnt': cnt,
                  'trigger-timeout': timeout,
                  'sta-cntr-trigger-cond': conds,
                  'thresholds': thresholds}

        # optional count fields are 4-bytes assuming they are appending
        # in order
        ordered = ('fail', 'fcs-error', 'mult-retry', 'dup',
                   'rts-fail', 'ack-fail', 'retry-cnt')
        for key in ordered:
            if conds[key]:
                thresholds[key] = struct.unpack_from('=I', remaining)[0]
                remaining = remaining[4:]
        return parsed

    if sid == std.EID_MSMT_REQ_SUBELEMENT_STA_VEND:
        return _parseie_(std.EID_VEND_SPEC, s)

    return s

# STA Counter Trigger Conditions Std Fig. 8-118
def ampduflags_get(mn, f):
    """Look up A-MPDU flag ``f`` for ``mn`` via ``bitmask_get``.

    The lookup uses the ``_AMPDU_FLAGS_`` table.  A ``KeyError`` from
    the lookup is re-raised as the module's ``error`` naming the flag.
    """
    try:
        value = bitmask_get(_AMPDU_FLAGS_, mn, f)
    except KeyError:
        raise error("Invalid A-MPDU flag {0}".format(f))
    return value

# --> VHT Known <-- http://www.radiotap.org/defined-fields/VHT
# stbc STBC known
# txop TXOP_PS_NOT_ALLOWED known
# gi Guard Interval
# short Short GI NYSI disambiguation known
# ldpc LDPC extra OFDM symbol known
# beam Beamformed known
# bw Bandwidth known
# gid Group ID known
# paid Partial AID known
def vhtknown_get(mn, f):
    """Look up VHT Known field ``f`` for ``mn`` via ``bitmask_get``.

    The lookup uses the ``_VHT_KNOWN_`` table.  A ``KeyError`` from the
    lookup is re-raised as the module's ``error`` naming the field.
    """
    try:
        value = bitmask_get(_VHT_KNOWN_, mn, f)
    except KeyError:
        raise error("Invalid VHT Known field {0}".format(f))
    return value

# --> VHT Flags <-- http://www.radiotap.org/defined-fields/VHT
# stbc Space-Time Block Coding (Set to 0 if no spatial streams of any user has STBC.
# Set to 1 if all spatial streams of all users have STBC.
# txop TXOP_PS_NOT_ALLOWED known Valid only for AP transmitters. Set to 0 if STAs
# may doze during TXOP. Set to 1 if STAs may not doze during TXOP or trans
# is non-AP.
# gi Guard Interval Set to 0 for long GI. Set to 1 for short GI.
# short Short GI NYSI disambiguation Valid only if short GI is used. Set to 0 if
# NSYM mod 10 != 9 or short GI not used. Set to 1 if NSYM mod 10 = 9.
# ldpc LDPC extra OFDM symbol Set to 1 if >= 1 users are using LDPC and the encoding
# process resulted in extra OFDM symbol(s). Set to 0 otherwise
# beam Beamformed Valid for SU PPDUs only
def close(self):
    """
    Closes the file pointer, if possible.

    After this call the image core is gone and its memory released;
    the image data can no longer be used.

    Calling this is only necessary for images whose file has not
    already been read and closed by the
    :py:meth:`~PIL.Image.Image.load` method.
    """
    try:
        self.fp.close()
    except Exception as exc:
        # Closing is best effort; the failure is only logged.
        logger.debug("Error closing: %s", exc)

    # Rather than plain None, install a deferred error so that later
    # use of the core image object explains that the image is closed.
    closed_error = ValueError("Operation on closed image")
    self.im = deferred_error(closed_error)
def getData(self):
    """Assemble the data for this writer/table, without subtables.

    Items that have a ``getData`` attribute are replaced by their
    offset relative to ``self.pos``, packed as an unsigned long when
    the item has ``longOffset`` set and as an unsigned short otherwise.
    All other items are joined as they are.

    Raises:
        OTLOffsetOverflowError: a short offset could not be packed.
    """
    items = list(self.items)    # shallow copy; self.items is untouched
    base = self.pos
    for index, item in enumerate(items):
        if not hasattr(item, "getData"):
            continue
        if item.longOffset:
            items[index] = packULong(item.pos - base)
            continue
        try:
            items[index] = packUShort(item.pos - base)
        except struct.error:
            # provide data to fix overflow problem.
            overflowErrorRecord = self.getOverflowErrorRecord(item)
            raise OTLOffsetOverflowError(overflowErrorRecord)
    return bytesjoin(items)
def _bson_to_dict(data, opts):
    """Decode a BSON string to document_class.

    The first four bytes are unpacked as the declared object size,
    which must equal ``len(data)``, and the final byte must be NUL.
    Any exception raised while decoding the elements is re-raised as
    ``InvalidBSON`` with the original traceback.
    """
    try:
        header = _UNPACK_INT(data[:4])
    except struct.error as exc:
        raise InvalidBSON(str(exc))
    obj_size = header[0]

    if len(data) != obj_size:
        raise InvalidBSON("invalid object size")
    last_byte = data[obj_size - 1:obj_size]
    if last_byte != b"\x00":
        raise InvalidBSON("bad eoo")

    try:
        if _raw_document_class(opts.document_class):
            return opts.document_class(data, opts)
        return _elements_to_dict(data, 4, obj_size - 1, opts)
    except InvalidBSON:
        raise
    except Exception:
        # Change exception type to InvalidBSON but preserve traceback.
        exc_value, exc_tb = sys.exc_info()[1:]
        reraise(InvalidBSON, exc_value, exc_tb)
def crc(self, cid, data):
    """Read and verify checksum

    Reads four bytes from ``self.fp`` as two 16-bit values and compares
    them with the CRC computed over ``cid`` and ``data``.

    Raises:
        SyntaxError: the checksum does not match, or could not be read
            completely.
    """
    # Skip CRC checks for ancillary chunks if allowed to load truncated
    # images.  Bit 5 of the first chunk-id byte is the one tested
    # [specs, section 5.4].
    if ImageFile.LOAD_TRUNCATED_IMAGES and (i8(cid[0]) >> 5 & 1):
        self.crc_skip(cid, data)
        return

    try:
        computed = Image.core.crc32(data, Image.core.crc32(cid))
        first = i16(self.fp.read(2))
        second = i16(self.fp.read(2))
        if computed != (first, second):
            raise SyntaxError("broken PNG file (bad header checksum in %r)"
                              % cid)
    except struct.error:
        raise SyntaxError("broken PNG file (incomplete checksum in %r)"
                          % cid)
def close(self):
    """
    Closes the file pointer, if possible.

    After this call the image core is gone and its memory released;
    the image data can no longer be used.

    Calling this is only necessary for images whose file has not
    already been read and closed by the
    :py:meth:`~PIL.Image.Image.load` method.
    """
    try:
        self.fp.close()
        self.fp = None
    except Exception as exc:
        # Closing is best effort; the failure is only logged.
        logger.debug("Error closing: %s", exc)

    has_map = getattr(self, 'map', None)
    if has_map:
        self.map = None

    # Rather than plain None, install a deferred error so that later
    # use of the core image object explains that the image is closed.
    closed_error = ValueError("Operation on closed image")
    self.im = deferred_error(closed_error)
def _read_elf_file(self, f):
    """Read and validate the ELF file header, then load the sections.

    Stores the entry point in ``self.entrypoint`` and passes the
    section header offset and string table index on to
    ``self._read_sections``.

    Raises:
        FatalError: the header cannot be unpacked, the magic bytes are
            wrong, or ``e_machine`` is not 0x5e.
    """
    # read the ELF file header
    LEN_FILE_HEADER = 0x34
    raw_header = f.read(LEN_FILE_HEADER)
    try:
        (ident, _type, machine, _version,
         self.entrypoint, _phoff, shoff, _flags,
         _ehsize, _phentsize, _phnum, _shentsize,
         _shnum, shstrndx) = struct.unpack("<16sHHLLLLLHHHHHH", raw_header)
    except struct.error as e:
        raise FatalError("Failed to read a valid ELF header from %s: %s" % (self.name, e))

    magic_ok = byte(ident, 0) == 0x7f and ident[1:4] == b'ELF'
    if not magic_ok:
        raise FatalError("%s has invalid ELF magic header" % self.name)
    if machine != 0x5e:
        raise FatalError("%s does not appear to be an Xtensa ELF file. e_machine=%04x" % (self.name, machine))
    self._read_sections(f, shoff, shstrndx)
def __init__(self, address):
    """Build an IPv4 address from an integer, packed bytes or a string.

    Args:
        address: A string or integer representing the IP
          '192.168.1.1'

          Additionally, an integer can be passed, so
          IPv4Address('192.168.1.1') == IPv4Address(3232235777).
          or, more generally
          IPv4Address(int(IPv4Address('192.168.1.1'))) ==
            IPv4Address('192.168.1.1')

    Raises:
        AddressValueError: If ipaddr isn't a valid IPv4 address.
    """
    _BaseV4.__init__(self, address)

    # Efficient constructor from integer.
    if isinstance(address, (int, long)):
        self._ip = address
        if not 0 <= address <= self._ALL_ONES:
            raise AddressValueError(address)
        return

    # Constructing from a packed address
    if isinstance(address, Bytes):
        try:
            (self._ip,) = struct.unpack('!I', address)
        except struct.error:
            raise AddressValueError(address)  # Wrong length.
        return

    # Anything else is converted with str() and parsed as a formatted
    # IP string.
    self._ip = self._ip_int_from_string(str(address))
def _compat_to_bytes(intval, length, endianess):
    """Pack ``intval`` into ``length`` big-endian bytes.

    Only lengths 4 and 16 and the byte order ``'big'`` are supported.

    Raises:
        struct.error: ``intval`` is negative or does not fit in
            ``length`` bytes.
        NotImplementedError: ``length`` is neither 4 nor 16.
    """
    assert isinstance(intval, _compat_int_types)
    assert endianess == 'big'

    if length == 4:
        if not 0 <= intval < 2 ** 32:
            raise struct.error("integer out of range for 'I' format code")
        return struct.pack(b'!I', intval)

    if length == 16:
        if not 0 <= intval < 2 ** 128:
            raise struct.error("integer out of range for 'QQ' format code")
        # struct has no 128-bit code, so pack as two 64-bit halves.
        high = intval >> 64
        low = intval & 0xffffffffffffffff
        return struct.pack(b'!QQ', high, low)

    raise NotImplementedError()
def v6_int_to_packed(address):
    """Pack an integer IPv6 address into 16 bytes, network byte order.

    Args:
        address: An integer representation of an IPv6 IP address.

    Returns:
        The integer address packed as 16 bytes in network (big-endian)
        order.

    Raises:
        ValueError: If the integer is negative or too large to be an
            IPv6 IP address.
    """
    try:
        packed = _compat_to_bytes(address, 16, 'big')
    except (struct.error, OverflowError):
        # Both failure modes are reported as ValueError.
        raise ValueError("Address negative or too large for IPv6")
    return packed
def _parse_octet(cls, octet_str):
    """Parse one decimal IPv4 octet string and return its integer value.

    Args:
        octet_str: A string, the number to parse.

    Returns:
        The octet as an integer.

    Raises:
        ValueError: if the octet isn't strictly a decimal from [0..255].
    """
    if not octet_str:
        raise ValueError("Empty octet not permitted")

    # int() accepts signs, whitespace and more, so only characters from
    # the class-level decimal digit set are let through.
    if not cls._DECIMAL_DIGITS.issuperset(octet_str):
        raise ValueError("Only decimal digits permitted in %r" % octet_str)

    # The length is checked after the characters because an
    # invalid-character message is the more helpful of the two.
    if len(octet_str) > 3:
        raise ValueError("At most 3 characters permitted in %r" % octet_str)

    # Convert to integer (we know digits are legal)
    value = int(octet_str, 10)

    # A leading zero suggests octal.  Values up to 7 read the same in
    # octal and decimal; anything larger is rejected as ambiguous.
    has_leading_zero = octet_str[0] == '0'
    if value > 7 and has_leading_zero:
        raise ValueError(
            "Ambiguous (octal/decimal) value in %r not permitted" % octet_str)
    if value > 255:
        raise ValueError("Octet %d (> 255) not permitted" % value)
    return value
def iterstraight(self, raw):
    """Iterator that undoes the effect of filtering, and yields
    each row in serialised format (as a sequence of bytes).
    Assumes input is straightlaced.  `raw` should be an iterable
    that yields the raw bytes in chunks of arbitrary size.

    Raises FormatError when the bytes left over after the last
    complete row are not empty.
    """
    # Each stored row is one filter-type byte followed by row_bytes
    # bytes of scanline data.
    stride = self.row_bytes + 1
    pending = array('B')
    # The previous (reconstructed) scanline.  None indicates first
    # line of image.
    previous = None
    for piece in raw:
        pending.extend(piece)
        while len(pending) >= stride:
            filter_type = pending[0]
            scanline = pending[1:stride]
            del pending[:stride]
            previous = self.undo_filter(filter_type, scanline, previous)
            yield previous
    if len(pending) != 0:
        # :file:format We get here with a file format error:
        # when the available bytes (after decompressing) do not
        # pack into exact rows.
        raise FormatError(
            'Wrong size for decompressed IDAT chunk.')
    assert len(pending) == 0
def _process_bKGD(self, data):
    """Decode a bKGD chunk into ``self.background``.

    With ``self.colormap`` set the chunk is a single unsigned byte;
    otherwise it is ``self.color_planes`` big-endian 16-bit values.

    Raises:
        FormatError: the chunk does not have the expected length.
    """
    try:
        if self.colormap:
            if not self.plte:
                warnings.warn(
                    "PLTE chunk is required before bKGD chunk.")
            layout = 'B'
        else:
            layout = "!%dH" % self.color_planes
        self.background = struct.unpack(layout, data)
    except struct.error:
        raise FormatError("bKGD chunk has incorrect length.")
def _process_gAMA(self, data):
    """Decode a gAMA chunk into ``self.gamma``.

    The chunk is one big-endian unsigned 32-bit integer, stored here
    divided by 100000.0.

    Raises:
        FormatError: the chunk does not have the expected length.
    """
    try:
        (scaled,) = struct.unpack("!L", data)
    except struct.error:
        raise FormatError("gAMA chunk has incorrect length.")
    self.gamma = scaled / 100000.0
def __unpack__(self, type_, buf, _size=None):
    """Unpack one value group from the front of ``buf``.

    Args:
        type_: struct format characters, without the byte-order prefix
            (``self.endian`` is prepended).
        buf: the buffer to read from.
        _size: number of bytes to consume; defaults to the calculated
            size of the format.

    Returns:
        A pair ``(values, rest)``: the tuple from ``struct.unpack`` and
        the part of ``buf`` after the consumed bytes.

    Raises:
        UnpackError: chained (via ``raise_from``) from the
            ``struct.error`` raised when the bytes do not fit the format.
    """
    fmt = self.endian + type_
    if _size is None:
        size = struct.calcsize(fmt)
    else:
        size = _size
    try:
        values = struct.unpack(fmt, buf[:size])
        rest = buf[size:]
    except struct.error as exc:
        raise_from(UnpackError("Unable to unpack structure"), exc)
    else:
        return values, rest