我们从Python开源项目中,提取了以下33个代码示例,用于说明如何使用ftplib.Error()。
def test_stor(self): try: data = b'abcde12345' * 100000 self.dummy_sendfile.write(data) self.dummy_sendfile.seek(0) self.client.storbinary('stor ' + TESTFN, self.dummy_sendfile) self.client.retrbinary('retr ' + TESTFN, self.dummy_recvfile.write) self.dummy_recvfile.seek(0) datafile = self.dummy_recvfile.read() self.assertEqual(len(data), len(datafile)) self.assertEqual(hash(data), hash(datafile)) finally: # We do not use os.remove() because file could still be # locked by ftpd thread. If DELE through FTP fails try # os.remove() as last resort. if os.path.exists(TESTFN): try: self.client.delete(TESTFN) except (ftplib.Error, EOFError, socket.error): safe_remove(TESTFN)
def test_all_errors(self): exceptions = (ftplib.error_reply, ftplib.error_temp, ftplib.error_perm, ftplib.error_proto, ftplib.Error, IOError, EOFError) for x in exceptions: try: raise x('exception not included in all_errors set') except ftplib.all_errors: pass
def test_line_too_long(self): self.assertRaises(ftplib.Error, self.client.sendcmd, 'x' * self.client.maxline * 2)
def test_retrlines_too_long(self): self.client.sendcmd('SETLONGRETR %d' % (self.client.maxline * 2)) received = [] self.assertRaises(ftplib.Error, self.client.retrlines, 'retr', received.append)
def test_storlines_too_long(self): f = StringIO.StringIO('x' * self.client.maxline * 2) self.assertRaises(ftplib.Error, self.client.storlines, 'stor', f)
def test_storlines_too_long(self): f = io.BytesIO(b'x' * self.client.maxline * 2) self.assertRaises(ftplib.Error, self.client.storlines, 'stor', f)
def test_all_errors(self): exceptions = (ftplib.error_reply, ftplib.error_temp, ftplib.error_perm, ftplib.error_proto, ftplib.Error, OSError, EOFError) for x in exceptions: try: raise x('exception not included in all_errors set') except ftplib.all_errors: pass
def test_stor_ascii(self): # Test STOR in ASCII mode def store(cmd, fp, blocksize=8192): # like storbinary() except it sends "type a" instead of # "type i" before starting the transfer self.client.voidcmd('type a') with contextlib.closing(self.client.transfercmd(cmd)) as conn: while True: buf = fp.read(blocksize) if not buf: break conn.sendall(buf) return self.client.voidresp() try: data = b'abcde12345\r\n' * 100000 self.dummy_sendfile.write(data) self.dummy_sendfile.seek(0) store('stor ' + TESTFN, self.dummy_sendfile) self.client.retrbinary('retr ' + TESTFN, self.dummy_recvfile.write) expected = data.replace(b'\r\n', b(os.linesep)) self.dummy_recvfile.seek(0) datafile = self.dummy_recvfile.read() self.assertEqual(len(expected), len(datafile)) self.assertEqual(hash(expected), hash(datafile)) finally: # We do not use os.remove() because file could still be # locked by ftpd thread. If DELE through FTP fails try # os.remove() as last resort. if os.path.exists(TESTFN): try: self.client.delete(TESTFN) except (ftplib.Error, EOFError, socket.error): safe_remove(TESTFN)
def test_stor_ascii_2(self): # Test that no extra extra carriage returns are added to the # file in ASCII mode in case CRLF gets truncated in two chunks # (issue 116) def store(cmd, fp, blocksize=8192): # like storbinary() except it sends "type a" instead of # "type i" before starting the transfer self.client.voidcmd('type a') with contextlib.closing(self.client.transfercmd(cmd)) as conn: while True: buf = fp.read(blocksize) if not buf: break conn.sendall(buf) return self.client.voidresp() old_buffer = DTPHandler.ac_in_buffer_size try: # set a small buffer so that CRLF gets delivered in two # separate chunks: "CRLF", " f", "oo", " CR", "LF", " b", "ar" DTPHandler.ac_in_buffer_size = 2 data = b'\r\n foo \r\n bar' self.dummy_sendfile.write(data) self.dummy_sendfile.seek(0) store('stor ' + TESTFN, self.dummy_sendfile) expected = data.replace(b'\r\n', b(os.linesep)) self.client.retrbinary('retr ' + TESTFN, self.dummy_recvfile.write) self.dummy_recvfile.seek(0) self.assertEqual(expected, self.dummy_recvfile.read()) finally: DTPHandler.ac_in_buffer_size = old_buffer # We do not use os.remove() because file could still be # locked by ftpd thread. If DELE through FTP fails try # os.remove() as last resort. if os.path.exists(TESTFN): try: self.client.delete(TESTFN) except (ftplib.Error, EOFError, socket.error): safe_remove(TESTFN)
def test_stou(self): data = b'abcde12345' * 100000 self.dummy_sendfile.write(data) self.dummy_sendfile.seek(0) self.client.voidcmd('TYPE I') # filename comes in as "1xx FILE: <filename>" filename = self.client.sendcmd('stou').split('FILE: ')[1] try: with contextlib.closing(self.client.makeport()) as sock: conn, sockaddr = sock.accept() with contextlib.closing(conn): conn.settimeout(TIMEOUT) if hasattr(self.client_class, 'ssl_version'): conn = ssl.wrap_socket(conn) while True: buf = self.dummy_sendfile.read(8192) if not buf: break conn.sendall(buf) # transfer finished, a 226 response is expected self.assertEqual('226', self.client.voidresp()[:3]) self.client.retrbinary('retr ' + filename, self.dummy_recvfile.write) self.dummy_recvfile.seek(0) datafile = self.dummy_recvfile.read() self.assertEqual(len(data), len(datafile)) self.assertEqual(hash(data), hash(datafile)) finally: # We do not use os.remove() because file could still be # locked by ftpd thread. If DELE through FTP fails try # os.remove() as last resort. if os.path.exists(filename): try: self.client.delete(filename) except (ftplib.Error, EOFError, socket.error): safe_remove(filename)
def test_appe(self): try: data1 = b'abcde12345' * 100000 self.dummy_sendfile.write(data1) self.dummy_sendfile.seek(0) self.client.storbinary('stor ' + TESTFN, self.dummy_sendfile) data2 = b'fghil67890' * 100000 self.dummy_sendfile.write(data2) self.dummy_sendfile.seek(len(data1)) self.client.storbinary('appe ' + TESTFN, self.dummy_sendfile) self.client.retrbinary("retr " + TESTFN, self.dummy_recvfile.write) self.dummy_recvfile.seek(0) datafile = self.dummy_recvfile.read() self.assertEqual(len(data1 + data2), len(datafile)) self.assertEqual(hash(data1 + data2), hash(datafile)) finally: # We do not use os.remove() because file could still be # locked by ftpd thread. If DELE through FTP fails try # os.remove() as last resort. if os.path.exists(TESTFN): try: self.client.delete(TESTFN) except (ftplib.Error, EOFError, socket.error): safe_remove(TESTFN)
def test_abor_during_transfer(self): # Case 4: ABOR while a data transfer on DTP channel is in # progress: close data channel, respond with 426, respond # with 226. data = b'abcde12345' * 1000000 with open(TESTFN, 'w+b') as f: f.write(data) try: self.client.voidcmd('TYPE I') with contextlib.closing( self.client.transfercmd('retr ' + TESTFN)) as conn: bytes_recv = 0 while bytes_recv < 65536: chunk = conn.recv(BUFSIZE) bytes_recv += len(chunk) # stop transfer while it isn't finished yet self.client.putcmd('ABOR') # transfer isn't finished yet so ftpd should respond with 426 self.assertEqual(self.client.getline()[:3], "426") # transfer successfully aborted, so should now respond # with a 226 self.assertEqual('226', self.client.voidresp()[:3]) finally: # We do not use os.remove() because file could still be # locked by ftpd thread. If DELE through FTP fails try # os.remove() as last resort. try: self.client.delete(TESTFN) except (ftplib.Error, EOFError, socket.error): safe_remove(TESTFN)
def cmdresp(self, cmd): """Send a command and return response, also if the command failed.""" try: return self.client.sendcmd(cmd) except ftplib.Error as err: return str(err)