我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 os.strerror()。
def timeout(seconds=10, error_message=os.strerror(errno.ETIME)):
    """Build a decorator that aborts the wrapped call after *seconds*.

    The limit is enforced with ``signal.alarm``/``SIGALRM``; when the alarm
    fires, ``TimeoutError(error_message)`` is raised inside the wrapped call.

    :param seconds: whole seconds passed to ``signal.alarm``.
    :param error_message: message carried by the raised ``TimeoutError``;
        defaults to the system text for ``errno.ETIME``.
    :return: a decorator preserving the wrapped function's metadata.
    """
    def decorator(func):
        def _handle_timeout(signum, frame):
            raise TimeoutError(error_message)

        def wrapper(*args, **kwargs):
            # signal.signal returns the handler it replaces; remember it so
            # the caller's SIGALRM handling is not permanently clobbered.
            previous = signal.signal(signal.SIGALRM, _handle_timeout)
            signal.alarm(seconds)
            try:
                return func(*args, **kwargs)
            finally:
                # Always cancel the pending alarm, even when func raised.
                signal.alarm(0)
                # None means the old handler was not installed from Python
                # and therefore cannot be re-installed through this API.
                if previous is not None:
                    signal.signal(signal.SIGALRM, previous)

        return wraps(func)(wrapper)

    return decorator
def make_filesystem(blk_device, fstype='ext4', timeout=10):
    """Make a new filesystem on the specified block device.

    Polls once per second until ``blk_device`` exists, then runs
    ``mkfs -t <fstype> <blk_device>``.

    :param blk_device: path of the block device to format.
    :param fstype: filesystem type handed to ``mkfs -t``.
    :param timeout: number of one-second polls to wait for the device.
    :raises IOError: with ``errno.ENOENT`` if the device has not appeared
        after ``timeout`` polls.
    """
    # ``os.errno`` was an undocumented alias that Python 3.7 removed; use the
    # real module. Imported locally so the block does not rely on file-level
    # imports that may be missing.
    import errno

    count = 0
    e_noent = errno.ENOENT
    while not os.path.exists(blk_device):
        if count >= timeout:
            log('Gave up waiting on block device %s' % blk_device,
                level=ERROR)
            raise IOError(e_noent, os.strerror(e_noent), blk_device)

        log('Waiting for block device %s to appear' % blk_device,
            level=DEBUG)
        count += 1
        time.sleep(1)

    # The loop only ends normally once the device exists (it never breaks).
    log('Formatting block device %s as filesystem %s.' %
        (blk_device, fstype), level=INFO)
    check_call(['mkfs', '-t', fstype, blk_device])
def test_connection_refused(self):
    """A fetch against a refusing port yields a 599 response.

    Outside cygwin the response error must also mention the
    connection-refused errno and its ``os.strerror`` text.
    """
    cleanup_func, port = refusing_port()
    self.addCleanup(cleanup_func)
    with ExpectLog(gen_log, ".*", required=False):
        self.http_client.fetch("http://127.0.0.1:%d/" % port, self.stop)
        response = self.wait()
    self.assertEqual(599, response.code)

    if sys.platform == 'cygwin':
        # cygwin returns EPERM instead of ECONNREFUSED here
        return

    error_text = str(response.error)
    contains_errno = str(errno.ECONNREFUSED) in error_text
    if not contains_errno and hasattr(errno, "WSAECONNREFUSED"):
        # Fall back to the Winsock errno where the platform defines one.
        contains_errno = str(errno.WSAECONNREFUSED) in error_text
    self.assertTrue(contains_errno, response.error)

    # This is usually "Connection refused".
    # On windows, strerror is broken and returns "Unknown error".
    expected_message = os.strerror(errno.ECONNREFUSED)
    self.assertTrue(expected_message in error_text, response.error)
def _handle_connect(self):
    """Finish a pending connect: report failure or notify the waiters.

    Reads the socket's ``SO_ERROR`` value. A non-zero value is stored on
    ``self.error`` as a ``socket.error`` and the stream is closed. Otherwise
    the stored connect callback and/or future are consumed (each at most
    once) and ``self._connecting`` is cleared.
    """
    err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
    if err != 0:
        self.error = socket.error(err, os.strerror(err))
        # IOLoop implementations may vary: some of them return
        # an error state before the socket becomes writable, so
        # in that case a connection failure would be handled by the
        # error path in _handle_events instead of here.
        if self._connect_future is None:
            # Only warn when no future is waiting on this connect.
            gen_log.warning("Connect error on fd %s: %s",
                            self.socket.fileno(), errno.errorcode[err])
        self.close()
        return
    if self._connect_callback is not None:
        # Clear the attribute before running the callback so it fires once.
        callback = self._connect_callback
        self._connect_callback = None
        self._run_callback(callback)
    if self._connect_future is not None:
        # Same hand-off for the future: detach it, then resolve with self.
        future = self._connect_future
        self._connect_future = None
        future.set_result(self)
    self._connecting = False
def _check(ret):
    """Validate a libusb return value, raising ``USBError`` on failure.

    ``None`` and negative values are failures; anything else is returned
    (unwrapped from its ``.value`` attribute when it has one).
    """
    if ret is not None:
        # Unwrap objects that carry their result in ``.value``.
        ret = getattr(ret, 'value', ret)
        if ret >= 0:
            return ret

    errmsg = _lib.usb_strerror()
    # No error means that we need to get the error
    # message from the return code
    # Thanks to Nicholas Wheeler to point out the problem...
    # Also see issue #2860940
    if ret is not None and errmsg.lower() == 'no error':
        errmsg = os.strerror(-ret)
    raise USBError(errmsg, ret)
def formatExit(self, process):
    """Read the return register from *process* and format it as text.

    Stores the value on ``self.result`` (converted by ``ulong2long`` unless
    ``self.restype`` ends with ``*``) and returns its textual form.

    Raises NotImplementedError when none of the CPU flags is set.
    """
    if CPU_I386:
        regname = "eax"
    elif CPU_X86_64:
        regname = "rax"
    elif CPU_PPC:
        regname = "result"
    else:
        raise NotImplementedError()

    raw = process.getreg(regname)
    self.result = raw

    # Return types ending in "*" are formatted as addresses.
    if self.restype.endswith("*"):
        return formatAddress(raw)

    signed = ulong2long(raw)
    self.result = signed
    if signed < 0:
        # Negative results are shown with the strerror text of -result.
        return "%s (%s)" % (signed, strerror(-signed))
    if 0 <= signed <= 9:
        return str(signed)
    # Larger values also show the hex form of the unconverted register value.
    return "%s (%s)" % (signed, formatWordHex(raw))
def decorator(minutes=1, error_message=os.strerror(errno.ETIME)):
    """Build a decorator that aborts the wrapped call after *minutes*.

    When ``minutes > 0`` a ``SIGALRM`` alarm of ``int(minutes * 60)`` seconds
    is armed around the call. If it fires, a test note is recorded and
    ``TimeoutException(error_message)`` is raised. With ``minutes <= 0`` the
    call runs without a time limit.

    :param minutes: time limit in minutes (may be fractional).
    :param error_message: text used for the note and the exception; defaults
        to the system text for ``errno.ETIME``.
    :return: a decorator preserving the wrapped function's metadata.
    """
    def dec(func):
        def _handle_timeout(signum, frame):
            msg = 'Timeout Error: %s' % (error_message)
            add_test_note(msg)
            raise TimeoutException(error_message)

        def wrapper(*args, **kwargs):
            installed = False
            previous = None
            if minutes > 0:
                # Keep the handler being replaced so it can be put back.
                previous = signal.signal(signal.SIGALRM, _handle_timeout)
                installed = True
                signal.alarm(int(minutes * 60))
            try:
                return func(*args, **kwargs)
            finally:
                # Cancel any pending alarm regardless of how func exited.
                signal.alarm(0)
                # Restore the caller's handler instead of leaking ours; None
                # means the old handler was not installed from Python.
                if installed and previous is not None:
                    signal.signal(signal.SIGALRM, previous)

        return wraps(func)(wrapper)

    return dec
def __init__(self, cloexec=True, nonblock=True):
    """Create the inotify descriptor and the buffers used to read events.

    :param cloexec: include ``self.CLOEXEC`` in the init flags.
    :param nonblock: include ``self.NONBLOCK`` in the init flags.
    :raises INotifyError: when the init call returns -1; the message is the
        ``os.strerror`` text for the current ctypes errno.
    """
    (self._init1, self._add_watch,
     self._rm_watch, self._read) = load_inotify()

    flags = (self.CLOEXEC if cloexec else 0) | (self.NONBLOCK if nonblock else 0)
    self._inotify_fd = self._init1(flags)
    if self._inotify_fd == -1:
        raise INotifyError(os.strerror(ctypes.get_errno()))

    self._buf = ctypes.create_string_buffer(5000)
    self.fenc = get_preferred_file_name_encoding()
    self.hdr = struct.Struct(b'iIII')
    # We keep a reference to os to prevent it from being deleted
    # during interpreter shutdown, which would lead to errors in the
    # __del__ method
    self.os = os
def read(self, get_name=True):
    """Drain pending inotify data and dispatch every decoded event.

    Reads until the descriptor reports no more data, then walks the collected
    bytes header by header, calling
    ``self.process_event(wd, mask, cookie, name)`` for each record.

    :param get_name: when true, pass the NUL-stripped name bytes to
        ``process_event``; otherwise pass ``None``.
    :raises OSError: for any read error other than EAGAIN or EINTR.
    """
    chunks = []
    while True:
        count = self._read(self._inotify_fd, self._buf, len(self._buf))
        if count > 0:
            chunks.append(self._buf.raw[:count])
            continue
        if count == 0:
            break
        err = ctypes.get_errno()
        if err == errno.EINTR:
            continue  # Interrupted, try again
        if err == errno.EAGAIN:
            break  # No more data
        raise OSError(err, self.os.strerror(err))

    data = b''.join(chunks)
    header = self.hdr
    total = len(data)
    offset = 0
    while total - offset >= header.size:
        wd, mask, cookie, name_len = header.unpack_from(data, offset)
        offset += header.size
        if get_name:
            name = data[offset:offset + name_len].rstrip(b'\0')
        else:
            name = None
        # Skip the name field whether or not it was decoded.
        offset += name_len
        self.process_event(wd, mask, cookie, name)
def timeout(seconds=10, error_message=os.strerror(errno.ETIME)):
    """Return a decorator that limits the wrapped call to *seconds*.

    A ``SIGALRM`` alarm is armed before the call and cancelled afterwards; if
    it fires first, ``TimeoutError(error_message)`` is raised from the handler.

    :param seconds: alarm delay handed to ``signal.alarm``.
    :param error_message: message for the ``TimeoutError``; defaults to the
        system text for ``errno.ETIME``.
    """
    def decorator(func):
        def _handle_timeout(signum, frame):
            raise TimeoutError(error_message)

        def wrapper(*args, **kwargs):
            # Remember whichever handler was active so that wrapping a call
            # does not permanently replace the process-wide SIGALRM handler.
            old_handler = signal.signal(signal.SIGALRM, _handle_timeout)
            signal.alarm(seconds)
            try:
                result = func(*args, **kwargs)
            finally:
                signal.alarm(0)
                # A None handler was installed outside Python and cannot be
                # re-registered via signal.signal.
                if old_handler is not None:
                    signal.signal(signal.SIGALRM, old_handler)
            return result

        return wraps(func)(wrapper)

    return decorator
# special test case for running ssh commands at module level
def chdir(self, path):
    """
    Set the emulated "current directory" of this SFTP session.

    The working directory is tracked on this object: after a successful call
    the normalized path is stored, UTF-8 encoded, in C{self._cwd}. Passing
    C{None} clears it.

    @param path: new current working directory, or C{None} to stop using one
    @type path: str

    @raise SFTPError: with C{errno.ENOTDIR} if C{path} is not a directory
        (the original documentation describes this as an IOError)

    @since: 1.4
    """
    if path is None:
        self._cwd = None
        return

    mode = self.stat(path).st_mode
    if stat.S_ISDIR(mode):
        self._cwd = self.normalize(path).encode('utf-8')
        return

    message = "%s: %s" % (os.strerror(errno.ENOTDIR), path)
    raise SFTPError(errno.ENOTDIR, message)
def ccid_xfr_block(self, data, timeout=0.1):
    """Send host command *data* wrapped in a CCID block and return the reply.

    A 10-byte header (message type 0x6F, little-endian payload length, five
    zero bytes) is prepended to *data* and written to the transport. The
    response is read with *timeout* converted to milliseconds, and must start
    with 0x80 and carry, at bytes 1-4, a little-endian length equal to the
    number of bytes following its own 10-byte header.

    :param data: command payload bytes.
    :param timeout: seconds to wait for the response.
    :return: the response payload (everything after the 10-byte header).
    :raises IOError: ``errno.EIO`` if the response is missing, too short, of
        the wrong type, or its length field does not match.
    """
    frame = struct.pack("<BI5B", 0x6F, len(data), 0, 0, 0, 0, 0) + data
    self.transport.write(bytearray(frame))
    frame = self.transport.read(int(timeout * 1000))
    if not frame or len(frame) < 10:
        log.error("insufficient data for decoding ccid response")
        raise IOError(errno.EIO, os.strerror(errno.EIO))
    if frame[0] != 0x80:
        log.error("expected a RDR_to_PC_DataBlock")
        raise IOError(errno.EIO, os.strerror(errno.EIO))
    # unpack_from reads the length in place at offset 1; the previous
    # ``buffer(frame, 1, 4)`` only exists on Python 2.
    payload_length = struct.unpack_from("<I", frame, 1)[0]
    if len(frame) != 10 + payload_length:
        log.error("RDR_to_PC_DataBlock length mismatch")
        raise IOError(errno.EIO, os.strerror(errno.EIO))
    return frame[10:]
def command(self, cmd_code, cmd_data, timeout):
    """Send a host command and return the chip response.

    The command is framed as ``D4 <cmd_code> <cmd_data>``, prefixed with
    ``FF 00 00 00 <len>`` and exchanged via ``self.ccid_xfr_block``. The reply
    must start with ``D5 <cmd_code + 1>`` and end with ``90 00``.

    :param cmd_code: command code byte; also indexes ``self.CMD`` for logging.
    :param cmd_data: command payload bytes.
    :param timeout: forwarded to ``ccid_xfr_block``.
    :return: the response without its two leading and two trailing bytes.
    :raises IOError: ``errno.EIO`` when the reply is too short, has an
        unexpected response code, or does not end with ``90 00``.
    """
    # Pass the pieces as lazy logging arguments: the old ``str + hexlify()``
    # concatenation raises TypeError on Python 3 (hexlify returns bytes) and
    # built the message even when this level is disabled.
    log.log(logging.DEBUG-1, "%s %s", self.CMD[cmd_code],
            hexlify(cmd_data).decode("ascii"))
    frame = bytearray([0xD4, cmd_code]) + bytearray(cmd_data)
    frame = bytearray([0xFF, 0x00, 0x00, 0x00, len(frame)]) + frame
    frame = self.ccid_xfr_block(frame, timeout)
    if not frame or len(frame) < 4:
        log.error("insufficient data for decoding chip response")
        raise IOError(errno.EIO, os.strerror(errno.EIO))
    if not (frame[0] == 0xD5 and frame[1] == cmd_code + 1):
        log.error("received invalid chip response")
        raise IOError(errno.EIO, os.strerror(errno.EIO))
    if not (frame[-2] == 0x90 and frame[-1] == 0x00):
        log.error("received pseudo apdu with error status")
        raise IOError(errno.EIO, os.strerror(errno.EIO))
    return frame[2:-2]
def read(self, timeout):
    """Read one frame from the serial port.

    Does nothing (returns ``None``) when ``self.tty`` is ``None``. Otherwise
    the port timeout is set to *timeout* milliseconds (at least 50 ms) and six
    bytes are read. A frame equal to ``00 00 ff 00 ff 00`` is returned as is
    (presumably an ACK frame - confirm against the chip protocol). For other
    frames the length comes from byte 3, or from bytes 5-6 after reading three
    more bytes when byte 3 is 0xFF, and ``length + 1`` further bytes are read.

    :param timeout: read timeout in milliseconds.
    :return: the received frame as a ``bytearray``.
    :raises IOError: ``errno.ETIMEDOUT`` when the initial read yields nothing.
    """
    if self.tty is not None:
        self.tty.timeout = max(timeout/1E3, 0.05)
        # bytearray(None) raises TypeError, so the old ``frame is None`` test
        # could never trigger; map a None read onto an empty frame instead.
        frame = bytearray(self.tty.read(6) or b"")
        if not frame:
            raise IOError(errno.ETIMEDOUT, os.strerror(errno.ETIMEDOUT))
        if frame.startswith(b"\x00\x00\xff\x00\xff\x00"):
            # hexlify matches the logging below and, unlike
            # str(frame).encode("hex"), also works on Python 3.
            log.log(logging.DEBUG-1, "<<< %s", hexlify(frame))
            return frame
        LEN = frame[3]
        if LEN == 0xFF:
            # Length 0xFF: the real length is the 16-bit big-endian value in
            # the following bytes.
            frame += self.tty.read(3)
            LEN = frame[5] << 8 | frame[6]
        frame += self.tty.read(LEN + 1)
        log.log(logging.DEBUG-1, "<<< %s", hexlify(frame))
        return frame
def read(self, timeout=0):
    """Read one frame from the USB bulk-in endpoint.

    Returns ``None`` when ``self.usb_inp`` is ``None``. Otherwise performs a
    bulk read of up to 300 bytes and returns the data as a ``bytearray``.

    :param timeout: passed unchanged to ``bulkRead``.
    :raises IOError: ``ETIMEDOUT`` on a USB timeout, ``ENODEV`` when the
        device is gone, ``EIO`` for other USB errors or an empty read.
    """
    if self.usb_inp is None:
        return

    try:
        ep_addr = self.usb_inp.getAddress()
        frame = self.usb_dev.bulkRead(ep_addr, 300, timeout)
    except libusb.USBErrorTimeout:
        raise IOError(errno.ETIMEDOUT, os.strerror(errno.ETIMEDOUT))
    except libusb.USBErrorNoDevice:
        raise IOError(errno.ENODEV, os.strerror(errno.ENODEV))
    except libusb.USBError as error:
        log.error("%r", error)
        raise IOError(errno.EIO, os.strerror(errno.EIO))

    if len(frame) == 0:
        log.error("bulk read returned zero data")
        raise IOError(errno.EIO, os.strerror(errno.EIO))

    frame = bytearray(frame)
    log.log(logging.DEBUG-1, "<<< %s", hexlify(frame))
    return frame
def __init__(self, chipset, logger):
    """Store the chipset and logger, then verify the chipset responds.

    :param chipset: object providing ``diagnose('line')``.
    :param logger: logger used to report a failed communication test.
    :raises IOError: ``errno.EIO`` when ``diagnose('line')`` returns ``False``
        or raises ``Chipset.Error``.
    """
    self.chipset, self.log = chipset, logger

    try:
        line_ok = self.chipset.diagnose('line')
    except Chipset.Error:
        # A chipset error counts as a failed communication test.
        line_ok = False

    if line_ok is False:
        self.log.error("chipset communication test failed")
        raise IOError(errno.EIO, os.strerror(errno.EIO))

    # Disabled debugging aids (register page and XRAM dumps):
    # for line in self._print_ciu_register_page(0, 1, 2, 3):
    #     self.log.debug(line)
    # for addr in range(0, 0x03FF, 16):
    #     xram = self.chipset.read_register(*range(addr, addr+16))
    #     xram = ' '.join(["%02X" % x for x in xram])
    #     self.log.debug("0x%04X: %s", addr, xram)
def formatError(self, errorcode):
    """
    Returns the string associated with a Windows error message, such as the
    ones found in socket.error.

    Lookup order: C{self.winError}, then C{self.formatMessage}, then the
    C{self.errorTab} mapping, and finally C{os.strerror}. A source set to
    C{None} is skipped, as is an C{errorTab} with no entry for the code.

    @param errorcode: the Windows error code
    @type errorcode: C{int}

    @return: The error message string
    @rtype: C{str}
    """
    if self.winError is not None:
        # winError builds an exception object; read the message from its
        # ``strerror`` attribute. Indexing the exception (``[1]``) only
        # worked on Python 2, where exceptions were subscriptable.
        return self.winError(errorcode).strerror
    if self.formatMessage is not None:
        return self.formatMessage(errorcode)
    if self.errorTab is not None:
        result = self.errorTab.get(errorcode)
        if result is not None:
            return result
    return os.strerror(errorcode)
async def _serve_one_listener(listener, handler_nursery, handler):
    """Accept connections from *listener* forever, spawning a handler for each.

    Every accepted stream is handed to ``_run_handler(stream, handler)``,
    started in *handler_nursery*. When ``accept`` fails with an ``OSError``
    whose errno is in ``ACCEPT_CAPACITY_ERRNOS``, the error is logged and the
    loop sleeps ``SLEEP_TIME`` seconds before retrying; any other ``OSError``
    propagates. The listener is used as an async context manager for the
    whole loop.

    This must be a coroutine function: ``async with`` and ``await`` are syntax
    errors inside a plain ``def``.
    """
    async with listener:
        while True:
            try:
                stream = await listener.accept()
            except OSError as exc:
                if exc.errno in ACCEPT_CAPACITY_ERRNOS:
                    LOGGER.error(
                        "accept returned %s (%s); retrying in %s seconds",
                        errno.errorcode[exc.errno],
                        os.strerror(exc.errno),
                        SLEEP_TIME,
                        exc_info=True
                    )
                    await trio.sleep(SLEEP_TIME)
                else:
                    raise
            else:
                handler_nursery.start_soon(_run_handler, stream, handler)
def formatError(self, errorcode):
    """
    Returns the string associated with a Windows error message, such as the
    ones found in socket.error.

    Tries C{self.winError} first, then C{self.formatMessage}, then the
    C{self.errorTab} mapping, and falls back to C{os.strerror}. Sources that
    are C{None} are skipped.

    @param errorcode: the Windows error code
    @type errorcode: C{int}

    @return: The error message string
    @rtype: C{str}
    """
    if self.winError is not None:
        # str() of the exception yields the whole formatted error (code and
        # message); the documented result is only the message text, which is
        # available as ``strerror``.
        return self.winError(errorcode).strerror
    if self.formatMessage is not None:
        return self.formatMessage(errorcode)
    if self.errorTab is not None:
        result = self.errorTab.get(errorcode)
        if result is not None:
            return result
    return os.strerror(errorcode)