我们从Python开源项目中,提取了以下6个代码示例,用于说明如何使用exceptions.IOError()。
def connect(sdata, command, username, host, port=22, key_file=None, password=None): """ Connect to an SSH host (as it happens, persistently). """ sdata.set_conn_state('connecting') try: keys = [Key.fromFile(key_file)] if key_file else None except exceptions.IOError as e: print('### key load error:', str(e)) push_failure_message(str(e), sdata) return endpoint = SSHCommandClientEndpoint.newConnection( reactor, command, username, host, port=int(port), keys=keys, password=password, ui=None, knownHosts=PermissiveKnownHosts()) factory = Factory() factory.protocol = LineProtocol factory.sdata = sdata d = endpoint.connect(factory) # Very small race condition between here and the replacement # in connectionMade() above, but I've never managed to hit it. def disconnect(): sdata.log('Disconnecting while still attempting to connect, by request') d.cancel() sdata.transport_drop_cb = disconnect d.addErrback(lambda reason: push_failure_message(reason, sdata)) return d
def fit(self): try: for i in self.iter_fit(*self.data['train']): self.report_fun(i) except exceptions.IOError, e: pass except KeyboardInterrupt: self.quit_training() import sys sys.exit(0)
def connect(self): try: self.__client = SoapClient(self.__url + "/xmds.php?WSDL&v=" + str(self.__ver)) except exceptions.IOError, err: log.error(err) self.__client = None return self.__client is not None
def save_as(self, path): if not self.content: return 0 try: with open(path, 'w') as f: f.write(self.content) f.flush() os.fsync(f.fileno()) written = os.stat(path).st_size except IOError: written = 0 return written
def parse_file(self, path): try: with open(path, 'r') as f: return self.parse(f.read()) except IOError: return False
def download(self, sm, target_file, overwrite = False): remote_chunk = "chunks/%s" % self.get_filename() tmp_file = target_file + ".download" # if file exists, try if os.path.exists(target_file): # overwrite check if not overwrite: raise ChirriException("Chunk file '%s' already exists." % target_file) # yep! chunk is already on disk.. decide what to do... eh = chirribackup.crypto.hash_file(target_file) if eh.hash != h: # hashes doesn't match... it is surely an old partial chunk of # a previous restore operation (cancelled), delete it from disk logger.warning("Old tmp download '%s' found but corrupt. Reloading again.") os.unlink(target_file) else: # hashes match, so it is the file that we need -- continue as # usual without downloading anything logger.info("Found previous temp download '%s' with matching hash. Recycling it.") return # ok... download raw chunk sm.download_file(remote_chunk, tmp_file) # decompress it if self.compression is None: os.rename(tmp_file, target_file) else: try: decompressor = chirribackup.compression.Decompressor(self.compression, target_file) sh = chirribackup.crypto.ChirriHasher() with open(tmp_file, 'rb') as ifile: buf = ifile.read(READ_BLOCKSIZE) while len(buf) > 0: sh.update(decompressor.decompress(buf)) buf = ifile.read(READ_BLOCKSIZE) sh.update(decompressor.close()) if sh.hash != self.hash: raise ChirriException("Bad data recovered (%s, %s)" \ % (target_file, self.first_seen_as)) except exceptions.IOError, ex: os.unlink(target_file) raise ChirriException("Cannot hash & copy file '%s': %s" % (source_file, ex)) finally: os.unlink(tmp_file)