我们从 Python 开源项目中提取了以下 27 个代码示例，用于说明如何使用 ftplib.error_temp 异常。
def close(self):
    """Close the file and release the FTP resources it holds.

    Does nothing when the file is already closed. Otherwise, while holding
    ``self._lock``, the write and read data connections are closed (when
    present) and reset to ``None``, the FTP session is ended with ``quit()``,
    and finally the parent class ``close()`` is invoked -- even if one of the
    earlier steps raised.
    """
    if self.closed:
        return
    with self._lock:
        try:
            # Same order as always: write connection first, then read.
            for attr in ('_write_conn', '_read_conn'):
                conn = getattr(self, attr)
                if conn is not None:
                    conn.close()
                    setattr(self, attr, None)
            try:
                self.ftp.quit()
            except error_temp:  # pragma: nocover
                # A temporary error reply to QUIT is deliberately ignored.
                pass
        finally:
            super(FTPFile, self).close()
def test_max_connections_per_ip(self):
    """Check the ``FTPServer.max_cons_per_ip`` attribute.

    With the limit set to 3, three clients connect and a fourth is expected
    to be refused with ``ftplib.error_temp`` and then be unusable.
    """
    with self.server.lock:
        self.server.server.max_cons_per_ip = 3
    self.client.quit()
    clients = [self.client_class() for _ in range(4)]
    accepted, rejected = clients[:3], clients[3]
    try:
        for conn in accepted:
            conn.connect(self.server.host, self.server.port)
        self.assertRaises(ftplib.error_temp, rejected.connect,
                          self.server.host, self.server.port)
        # Make sure the client has been disconnected: socket.error
        # (Windows) or EOFError (Linux) is supposed to be raised.
        self.assertRaises((socket.error, EOFError), rejected.sendcmd, 'noop')
    finally:
        for conn in clients:
            try:
                conn.quit()
            except (socket.error, EOFError):  # already disconnected
                conn.close()
def test_active_conn_error(self):
    """Check the reply when an active (PORT) data connection cannot complete.

    A socket is bound but ``accept()`` is never invoked on it, reproducing
    the error condition described at
    http://code.google.com/p/pyftpdlib/source/detail?r=905

    Accepted outcomes: a temporary error whose code is 425, a client-side
    timeout, or a reply whose code is anything but 200.
    """
    # Only real exception classes may appear in an ``except`` tuple; on
    # Python 3 a placeholder such as ``object()`` raises TypeError as soon
    # as an exception is matched against it. Add SSLError only when the
    # ``ssl`` name in scope actually provides it.
    timeout_errors = (socket.timeout,)
    ssl_error = getattr(ssl, "SSLError", None)
    if ssl_error is not None:
        timeout_errors += (ssl_error,)

    with contextlib.closing(socket.socket()) as sock:
        sock.bind((HOST, 0))
        port = sock.getsockname()[1]
        self.client.sock.settimeout(.1)
        try:
            resp = self.client.sendport(HOST, port)
        except ftplib.error_temp as err:
            self.assertEqual(str(err)[:3], '425')
        except timeout_errors:
            pass
        else:
            self.assertNotEqual(str(resp)[:3], '200')
def list(self, dir, skip_mtime=False):
    """List a remote directory by parsing ``ls -l`` style output.

    The listing is first requested with the ``-a`` flag; if the server
    answers with a temporary error the request is repeated without it.

    :param dir: remote directory path handed to ``self.ftp.dir``.
    :param skip_mtime: when true, timestamps are not parsed and every
        file's ``mtime`` is reported as ``0``.
    :returns: ``(dirs, files)`` -- ``dirs`` is a list of sub-directory
        names; ``files`` maps each file name to a dict with ``'size'``
        (int, from the size column) and ``'mtime'`` (int seconds, produced
        by ``time.mktime`` and therefore interpreted as local time).
    :raises KeyError, ValueError: if a file line carries a month or a
        numeric column that cannot be parsed.
    """
    month_to_int = {'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5,
                    'Jun': 6, 'Jul': 7, 'Aug': 8, 'Sep': 9, 'Oct': 10,
                    'Nov': 11, 'Dec': 12}
    buffer = []
    try:
        self.ftp.dir('-a ', dir, buffer.append)
    except ftplib.error_temp:
        # Start from a clean buffer so lines received before the failure
        # are not mixed into the second listing.
        buffer = []
        self.ftp.dir(dir, buffer.append)

    dirs = []
    files = {}
    for line in buffer:
        cols = line.split(None, 8)
        # "total N" summaries, blank lines and other short lines have no
        # name column; reading cols[8] on them used to raise IndexError.
        if len(cols) < 9 or cols[0] == 'total':
            continue
        name = os.path.split(cols[8])[1]
        if name in ('.', '..'):
            continue
        if cols[0].startswith('d'):
            dirs.append(name)
            continue

        if skip_mtime:
            mtime = 0
        else:
            month = month_to_int[cols[5]]
            day = int(cols[6])
            if ':' in cols[7]:
                # Recent entries show HH:MM in place of the year.
                hour, minute = [int(s) for s in cols[7].split(':')]
                today = datetime.date.today()
                year = today.year
                # Such an entry cannot lie in the future: a date later than
                # today (one day of slack for clock/time-zone differences)
                # belongs to the previous year.
                latest = today + datetime.timedelta(days=1)
                if datetime.date(year, month, day) > latest:
                    year -= 1
            else:
                year = int(cols[7])
                hour = minute = 0
            stamp = datetime.datetime(year, month, day, hour, minute)
            mtime = int(time.mktime(stamp.timetuple()))

        files[name] = {
            'size': int(cols[4]),
            'mtime': mtime,
        }
    return (dirs, files)
def test_exceptions(self):
    """Each echoed reply code must raise the matching ftplib exception."""
    expectations = (
        (ftplib.error_temp, 'echo 400'),
        (ftplib.error_temp, 'echo 499'),
        (ftplib.error_perm, 'echo 500'),
        (ftplib.error_perm, 'echo 599'),
        (ftplib.error_proto, 'echo 999'),
    )
    for expected_exc, command in expectations:
        self.assertRaises(expected_exc, self.client.sendcmd, command)
def test_all_errors(self): exceptions = (ftplib.error_reply, ftplib.error_temp, ftplib.error_perm, ftplib.error_proto, ftplib.Error, IOError, EOFError) for x in exceptions: try: raise x('exception not included in all_errors set') except ftplib.all_errors: pass
def ftp_errors(fs, path=None):
    """Run the caller's block under ``fs._lock`` and translate FTP failures.

    Written as a generator with a single ``yield``; presumably wrapped by
    ``contextlib.contextmanager`` outside this view -- TODO confirm.

    Translation performed:

    * ``socket.error`` -> ``errors.RemoteConnectionError``
    * ``error_temp``   -> ``errors.ResourceError`` when ``path`` is given,
      otherwise ``errors.OperationFailed``
    * ``error_perm``   -> ``errors.InsufficientStorage`` for code 552,
      ``errors.ResourceNotFound`` for codes 501 and 550, and
      ``errors.PermissionDenied`` for anything else

    :param fs: filesystem object providing ``_lock`` and ``host``.
    :param path: optional resource path used in the raised errors.
    """
    try:
        with fs._lock:
            yield
    except socket.error:
        connect_msg = 'unable to connect to {}'.format(fs.host)
        raise errors.RemoteConnectionError(msg=connect_msg)
    except error_temp as error:
        if path is None:
            raise errors.OperationFailed(
                msg='ftp error ({})'.format(error)
            )
        raise errors.ResourceError(
            path,
            msg="ftp error on resource '{}' ({})".format(path, error)
        )
    except error_perm as error:
        code, message = _parse_ftp_error(error)
        if code == 552:
            raise errors.InsufficientStorage(path=path, msg=message)
        if code in (501, 550):
            raise errors.ResourceNotFound(path=path)
        raise errors.PermissionDenied(msg=message)
def test_all_errors(self): exceptions = (ftplib.error_reply, ftplib.error_temp, ftplib.error_perm, ftplib.error_proto, ftplib.Error, OSError, EOFError) for x in exceptions: try: raise x('exception not included in all_errors set') except ftplib.all_errors: pass
def test_stou_rest(self):
    """STOU preceded by REST makes no sense and must be refused."""
    for setup_command in ('type i', 'rest 10'):
        self.client.sendcmd(setup_command)
    self.assertRaisesRegex(
        ftplib.error_temp,
        "Can't STOU while REST",
        self.client.sendcmd,
        'stou',
    )
def test_appe_rest(self):
    """APPE preceded by REST makes no sense and must be refused."""
    for setup_command in ('type i', 'rest 10'):
        self.client.sendcmd(setup_command)
    self.assertRaisesRegex(
        ftplib.error_temp,
        "Can't APPE while REST",
        self.client.sendcmd,
        'appe x',
    )
def test_max_connections(self):
    """Check the ``FTPServer.max_cons`` attribute.

    After setting the limit, two clients connect and further connection
    attempts are expected to fail with ``ftplib.error_temp`` -- first with
    no data channel, then with a passive data channel established, with a
    passive data socket waiting for a connection, and with an active data
    channel established.
    """
    with self.server.lock:
        self.server.server.max_cons = 3
    self.client.quit()
    first = self.client_class()
    second = self.client_class()
    third = self.client_class()

    def expect_refused(client):
        # The repeated assertion: connecting must raise a temporary error.
        self.assertRaises(ftplib.error_temp, client.connect,
                          self.server.host, self.server.port)

    try:
        first.connect(self.server.host, self.server.port)
        second.connect(self.server.host, self.server.port)
        expect_refused(third)

        # with passive data channel established
        second.quit()
        first.login(USER, PASSWD)
        first.makepasv()
        expect_refused(second)

        # with passive data socket waiting for connection
        first.login(USER, PASSWD)
        first.sendcmd('pasv')
        expect_refused(second)

        # with active data channel established
        first.login(USER, PASSWD)
        with contextlib.closing(first.makeport()):
            expect_refused(second)
    finally:
        for client in (first, second, third):
            try:
                client.quit()
            except (socket.error, EOFError):  # already disconnected
                client.close()
def test_on_incomplete_file_received(self):
    """Abort an upload midway and wait for ``on_incomplete_file_received``.

    A handler subclass records the file passed to the callback. The upload
    is interrupted with ABOR once ``INTERRUPTED_TRANSF_SIZE`` bytes have
    been sent (or the source is exhausted), the pending reply is expected
    to raise ``ftplib.error_temp``, and the test then waits until the
    recorded value equals the absolute path of ``TESTFN``.
    """
    received = []

    class TestHandler(FTPHandler):
        def on_incomplete_file_received(self, file):
            received.append(file)

    self._setUp(TestHandler)
    self.dummyfile.write(b'abcde12345' * 100000)
    self.dummyfile.seek(0)

    data_conn = self.client.transfercmd('stor ' + TESTFN)
    with contextlib.closing(data_conn) as conn:
        sent = 0
        while True:
            chunk = self.dummyfile.read(BUFSIZE)
            conn.sendall(chunk)
            sent += len(chunk)
            # stop the transfer while it isn't finished yet
            if not chunk or sent >= INTERRUPTED_TRANSF_SIZE:
                self.client.putcmd('abor')
                break
    self.assertRaises(ftplib.error_temp, self.client.getresp)  # 426
    self.client.quit()  # prevent race conditions
    call_until(lambda: received, "ret == [os.path.abspath(TESTFN)]")
def main_run(self, args):
    """Traverse one root directory, one thread-pool task per leading entry.

    :param args: work description for a single root directory. In normal
        mode it is ``(root, (base, leadings))``; in resume mode
        (``self.resume`` true) it is ``(root, leadings)`` and ``base``
        defaults to ``['/']``.

        * ``root`` -- the parent directory assigned to this call.
        * ``base`` -- sequence whose first item is put on
          ``self.all_path`` when there are no leadings to traverse.
        * ``leadings`` -- in normal mode, sub-directory names; each is
          turned into a ``(absolute_path, root)`` pair. In resume mode
          the items are passed to ``traverse_branch`` unchanged.
    :type args: iterable
    :rtype: None

    ``ftplib.error_temp``, ``ftplib.error_perm`` and ``socket.gaierror``
    raised during traversal are printed rather than propagated.
    """
    if self.resume:
        root, leadings = args
        base = ['/']
    else:
        root, (base, leadings) = args
    print ('---' * 5, datetime.now(), '{}'.format(root), '---' * 5)
    try:
        if not self.resume:
            leadings = [(ospath.join('/', root, i.strip('/')), root)
                        for i in leadings]
        if leadings:
            pool = ThreadPool()
            try:
                pool.map(self.traverse_branch, leadings)
            finally:
                # Always shut the pool down: previously an exception from
                # a worker skipped close()/join() and left its threads alive.
                pool.close()
                pool.join()
        else:
            self.all_path.put(base[0])
    except (ftplib.error_temp, ftplib.error_perm, socket.gaierror) as exp:
        print(exp)
def _retreive_file_lines(self, filename_format, station, year): string = BytesIO() if self.ftp is None: self.ftp = self._get_ftp_connection() for station_id in self._get_potential_station_ids(station): filename = filename_format.format(station=station_id, year=year) try: self.ftp.retrbinary('RETR {}'.format(filename), string.write) except (IOError, ftplib.error_perm) as e1: logger.warn( "Failed FTP RETR for station {}: {}." " Not attempting reconnect." .format(station_id, e1) ) except (ftplib.error_temp, EOFError) as e2: # Bad connection. attempt to reconnect. logger.warn( "Failed FTP RETR for station {}: {}." " Attempting reconnect." .format(station_id, e2) ) self.ftp.close() self.ftp = self._get_ftp_connection() try: self.ftp.retrbinary('RETR {}'.format(filename), string.write) except (IOError, ftplib.error_perm) as e3: logger.warn( "Failed FTP RETR for station {}: {}." " Trying another station id." .format(station_id, e3) ) else: break else: break logger.info( 'Successfully retrieved ftp://ftp.ncdc.noaa.gov{}' .format(filename) ) string.seek(0) f = gzip.GzipFile(fileobj=string) lines = f.readlines() string.close() return lines