我们从 Python 开源项目中提取了以下 42 个代码示例，用于说明如何使用 ftplib.FTP_TLS。
def test_context(self):
    """Check that ``FTP_TLS`` uses the ``SSLContext`` it is given.

    Passing ``context`` together with ``keyfile`` and/or ``certfile`` is
    expected to raise ``ValueError``.  With only ``context`` supplied, the
    control socket (after ``auth()``) and a data socket (after ``prot_p()``)
    are both expected to be ``SSLSocket`` objects bound to that same context.
    """
    self.client.quit()
    tls_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)

    # Each of these keyword sets conflicts with an explicit context.
    conflicting_kwargs = (
        {'keyfile': CERTFILE},
        {'certfile': CERTFILE},
        {'certfile': CERTFILE, 'keyfile': CERTFILE},
    )
    for extra in conflicting_kwargs:
        self.assertRaises(ValueError, ftplib.FTP_TLS, context=tls_context, **extra)

    self.client = client = ftplib.FTP_TLS(context=tls_context, timeout=10)
    client.connect(self.server.host, self.server.port)
    # Before auth() the control socket must not be wrapped yet.
    self.assertNotIsInstance(client.sock, ssl.SSLSocket)
    client.auth()
    self.assertIs(client.sock.context, tls_context)
    self.assertIsInstance(client.sock, ssl.SSLSocket)

    client.prot_p()
    with client.transfercmd('list') as data_sock:
        self.assertIs(data_sock.context, tls_context)
        self.assertIsInstance(data_sock, ssl.SSLSocket)
def __init__(self, hostname, root_path, base_url, username=None, password=None, passive=True, secure=False,
             **kwargs):
    """Bind the store to an FTP client, a remote root path and a public base URL.

    :param hostname: Either a ready-made ``FTP`` instance, which is used as-is, or a host
                     name from which a new client is created.
    :param root_path: Stored unchanged on ``self.root_path``.
    :param base_url: Stored on ``self.base_url`` with trailing slashes removed.
    :param username: User passed to the client constructor when a client is created here.
    :param password: Password passed to the client constructor when a client is created here.
    :param passive: Value handed to ``set_pasv`` on a client created here.
    :param secure: When true, an ``FTP_TLS`` client is created and ``prot_p()`` is called on it.
    :param kwargs: Extra keyword arguments forwarded to the client constructor.
    """
    if not isinstance(hostname, FTP):  # pragma: nocover
        client_factory = FTP_TLS if secure else FTP
        self.ftp_client = client = client_factory(host=hostname, user=username, passwd=password, **kwargs)
        if secure:
            # noinspection PyUnresolvedReferences
            client.prot_p()
        client.set_pasv(passive)
    else:
        # The caller handed over an existing client; leave it untouched.
        self.ftp_client = hostname

    self.root_path = root_path
    self.base_url = base_url.rstrip('/')
def test_context(self):
    """Verify that a caller-supplied ``SSLContext`` ends up on every TLS socket.

    ``FTP_TLS`` is expected to reject ``context`` combined with ``keyfile``
    and/or ``certfile`` by raising ``ValueError``.  Afterwards the control
    socket (following ``auth()``) and a data socket (following ``prot_p()``)
    are checked to be ``SSLSocket`` objects carrying the supplied context.
    """
    self.client.quit()
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    make_client = ftplib.FTP_TLS

    # An explicit context may not be combined with key/cert files.
    self.assertRaises(ValueError, make_client, keyfile=CERTFILE, context=context)
    self.assertRaises(ValueError, make_client, certfile=CERTFILE, context=context)
    self.assertRaises(ValueError, make_client, certfile=CERTFILE, keyfile=CERTFILE, context=context)

    self.client = client = make_client(context=context, timeout=TIMEOUT)
    client.connect(self.server.host, self.server.port)
    self.assertNotIsInstance(client.sock, ssl.SSLSocket)
    client.auth()
    self.assertIs(client.sock.context, context)
    self.assertIsInstance(client.sock, ssl.SSLSocket)

    client.prot_p()
    data_sock = client.transfercmd('list')
    try:
        self.assertIs(data_sock.context, context)
        self.assertIsInstance(data_sock, ssl.SSLSocket)
    finally:
        # Always release the data connection, even when an assertion fails.
        data_sock.close()
def test_context(self):
    """Exercise ``FTP_TLS`` with an explicit ``SSLContext``.

    First asserts that ``context`` cannot be combined with ``keyfile`` and/or
    ``certfile`` (``ValueError``), then that the control socket after
    ``auth()`` and a data socket after ``prot_p()`` are ``SSLSocket``
    instances whose ``context`` is the object passed in.
    """
    self.client.quit()
    ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)

    with self.assertRaises(ValueError):
        ftplib.FTP_TLS(keyfile=CERTFILE, context=ssl_ctx)
    with self.assertRaises(ValueError):
        ftplib.FTP_TLS(certfile=CERTFILE, context=ssl_ctx)
    with self.assertRaises(ValueError):
        ftplib.FTP_TLS(certfile=CERTFILE, keyfile=CERTFILE, context=ssl_ctx)

    self.client = client = ftplib.FTP_TLS(context=ssl_ctx, timeout=2)
    client.connect(self.server.host, self.server.port)
    # The control socket is only wrapped once auth() has been called.
    self.assertNotIsInstance(client.sock, ssl.SSLSocket)
    client.auth()
    self.assertIs(client.sock.context, ssl_ctx)
    self.assertIsInstance(client.sock, ssl.SSLSocket)

    client.prot_p()
    with client.transfercmd('list') as transfer_sock:
        self.assertIs(transfer_sock.context, ssl_ctx)
        self.assertIsInstance(transfer_sock, ssl.SSLSocket)
def __init__(self):
    """Give every attribute its initial value.

    An ``FTP_TLS`` client is created without a host; the remaining attributes
    are empty placeholders, fixed file names and paths derived from them.
    """
    # File names that the derived paths below are built from.
    work_dir = ".tfc"
    config_name = "config.ini"
    database_name = "ftp_files.db"

    self.name = ""
    self.commands = {}
    self.commands_function = {}

    # Connection details start out empty.
    self.user = ""
    self.server = ""
    self.password = ""
    self.port = 0

    self.copy_dir = ""
    self.dest = ""
    self.assets = ""
    self.ftp = FTP_TLS()

    # Configuration file inside the working folder.
    self.folder = work_dir
    self.config_file = config_name
    self.config_path = os.sep.join((work_dir, config_name))
    self.config_file_content = ""

    # Database file inside the same working folder.
    self.db_name = database_name
    self.db_path = os.sep.join((work_dir, database_name))

    self.place_holder_config = "files/config.ini"
    self.help_path = "files/help.json"
    self.time_out = 300
    self.time = 0
def test_context(self):
    """Confirm that the ``context`` argument of ``FTP_TLS`` is respected.

    ``ValueError`` is expected whenever ``context`` is combined with
    ``keyfile`` and/or ``certfile``.  With ``context`` alone, the control
    socket is plain until ``auth()``; afterwards it, and a data socket opened
    after ``prot_p()``, must be ``SSLSocket`` objects using that context.
    """
    self.client.quit()
    expected_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)

    # keyfile only, certfile only, then both: all conflict with ``context``.
    for conflicting in (
            dict(keyfile=CERTFILE),
            dict(certfile=CERTFILE),
            dict(certfile=CERTFILE, keyfile=CERTFILE)):
        self.assertRaises(ValueError, ftplib.FTP_TLS, context=expected_ctx, **conflicting)

    self.client = client = ftplib.FTP_TLS(context=expected_ctx, timeout=TIMEOUT)
    client.connect(self.server.host, self.server.port)
    self.assertNotIsInstance(client.sock, ssl.SSLSocket)
    client.auth()
    self.assertIs(client.sock.context, expected_ctx)
    self.assertIsInstance(client.sock, ssl.SSLSocket)

    client.prot_p()
    with client.transfercmd('list') as data_sock:
        self.assertIs(data_sock.context, expected_ctx)
        self.assertIsInstance(data_sock, ssl.SSLSocket)
def __init__(self, host, user, password):
    """Connect to *host* on port 21, log in and protect the data channel.

    The base initialiser is called without a host, so connecting, logging in
    and calling ``prot_p()`` happen here as explicit steps, in that order.

    :param host: Server to connect to.
    :param user: Login name.
    :param password: Login password.
    """
    control_port = 21
    ftplib.FTP_TLS.__init__(self)
    self.connect(host, control_port)
    self.login(user, password)
    # Set up encrypted data connection.
    self.prot_p()
def get_conn(self):
    """
    Returns a FTPS connection object.

    The client is created on first use from the connection parameters
    returned by ``self.get_connection(self.ftp_conn_id)`` and is cached on
    ``self.conn``; later calls return the cached object.

    The client is stored on ``self.conn`` only once connect, auth, prot_p
    and login have all succeeded.  If any step raises, the socket is closed
    and the exception propagates, so a later call retries from scratch
    instead of returning a half-initialised client.
    """
    if self.conn is None:
        params = self.get_connection(self.ftp_conn_id)
        conn = FTP_TLS(timeout=60)
        try:
            # ftplib compares the port with 0, so a missing port (None) would
            # raise TypeError on Python 3.  0 selects ftplib's default port.
            conn.connect(params.host, params.port or 0)
            conn.auth()
            conn.prot_p()
            conn.login(params.login, params.password)
        except Exception:
            conn.close()
            raise
        self.conn = conn
    return self.conn
def upload_archive_file(self, local_filename, remote_filename, target_dir):
    """Upload a local file over FTPS, yielding human-readable status messages.

    Connection parameters come from ``self.settings.ftps`` (keys ``address``,
    ``user``, ``passwd``, ``source_address``, ``no_certificate_check``) and
    ``self.settings.timeout_timer``.  ``self.upload_total`` is set to the
    local file size and ``self.upload_current`` is reset to 0 before
    transferring.

    The upload is skipped when the remote directory already holds an entry
    named *remote_filename* whose reported size equals the local size.
    Otherwise up to three attempts are made; connection-reset and timeout
    errors trigger a retry.

    :param local_filename: Path of the file to send.
    :param remote_filename: Name to store the file under on the server.
    :param target_dir: Remote directory to change into before uploading.
    :return: Generator of status strings.
    """
    yield "Uploading {} to FTP in directory: {}, filename: {}".format(
        local_filename, target_dir, remote_filename)
    local_filesize = os.stat(local_filename).st_size
    self.upload_total = local_filesize
    self.upload_current = 0

    ftps_settings = self.settings.ftps
    if ftps_settings['no_certificate_check']:
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
        context.verify_mode = ssl.CERT_NONE
        context.check_hostname = False
    else:
        context = ssl.create_default_context()

    ftps = FTP_TLS(
        host=ftps_settings['address'],
        user=ftps_settings['user'],
        passwd=ftps_settings['passwd'],
        context=context,
        source_address=ftps_settings['source_address'],
        timeout=self.settings.timeout_timer
    )
    # try/finally guarantees the connection is released on every exit path,
    # including exceptions and the consumer closing the generator early.
    try:
        ftps.cwd(target_dir)
        ftps.encoding = 'utf8'
        ftps.prot_p()

        for name, facts in ftps.mlsd(facts=["size"]):
            if name == remote_filename and local_filesize == int(facts["size"]):
                yield "File exists and size is equal."
                return

        uploaded = False
        with open(local_filename, "rb") as file:
            for retry_count in range(3):
                # A failed attempt leaves the file position mid-file; rewind
                # so a retry sends the whole file rather than just the tail.
                file.seek(0)
                self.upload_current = 0
                try:
                    ftps.storbinary(
                        'STOR %s' % remote_filename,
                        file,
                        callback=lambda data, args=self.print_method: self.print_progress(data, args)
                    )
                except (ConnectionResetError, socket.timeout, TimeoutError):
                    yield "Upload failed, retrying..."
                else:
                    uploaded = True
                    break

        # Only claim success when an attempt actually completed.
        if uploaded:
            yield "\nFile uploaded."
        else:
            yield "\nUpload failed, giving up after 3 attempts."
    finally:
        ftps.close()
def setUp(self):
    """Start a ``DummyTLS_FTPServer`` and connect a client with TLS enabled.

    The client calls ``auth()`` and ``prot_p()`` straight after connecting.
    """
    self.server = server = DummyTLS_FTPServer((HOST, 0))
    server.start()

    self.client = client = ftplib.FTP_TLS(timeout=10)
    client.connect(server.host, server.port)
    # enable TLS
    client.auth()
    client.prot_p()
def setUp(self):
    """Start a ``DummyTLS_FTPServer`` and connect an ``FTP_TLS`` client to it.

    ``auth()`` and ``prot_p()`` are not called here; that is left to the tests.
    """
    self.server = server = DummyTLS_FTPServer((HOST, 0))
    server.start()

    self.client = client = ftplib.FTP_TLS(timeout=10)
    client.connect(server.host, server.port)
def get_conn(self):
    """
    Returns a FTPS connection object.

    The client is created on first use from the connection parameters
    returned by ``self.get_connection(self.ftp_conn_id)`` and is cached on
    ``self.conn``.

    A configured port is applied to this client instance only.  Assigning it
    to ``ftplib.FTP_TLS.port`` would change the default port of every other
    FTP_TLS client in the process.
    """
    if self.conn is None:
        params = self.get_connection(self.ftp_conn_id)
        conn = ftplib.FTP_TLS()
        if params.port:
            # Instance attribute: shadows the class default for this client only.
            conn.port = params.port
        try:
            # Same steps the FTP_TLS constructor performs when given host/user:
            # connect only with a host, log in only with a user name.
            if params.host:
                conn.connect(params.host)
                if params.login:
                    conn.login(params.login, params.password)
        except Exception:
            conn.close()
            raise
        self.conn = conn
    return self.conn
def setUp(self):
    """Launch the dummy TLS FTP server and open a client connection to it.

    The client is connected but ``auth()`` and ``prot_p()`` are not called.
    """
    self.server = DummyTLS_FTPServer((HOST, 0))
    self.server.start()
    host, port = self.server.host, self.server.port

    self.client = ftplib.FTP_TLS(timeout=TIMEOUT)
    self.client.connect(host, port)
def test_check_hostname(self):
    """Check host name verification on the control and data connections.

    The context requires a certificate and has ``check_hostname`` enabled.
    Connecting via ``self.server.host`` is expected to raise
    ``ssl.CertificateError`` from ``auth()`` and from a data transfer, while
    connecting via ``"localhost"`` is expected to succeed for both.
    """
    self.client.quit()
    strict_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    strict_ctx.verify_mode = ssl.CERT_REQUIRED
    strict_ctx.check_hostname = True
    strict_ctx.load_verify_locations(CAFILE)
    self.client = client = ftplib.FTP_TLS(context=strict_ctx, timeout=TIMEOUT)
    server_host, server_port = self.server.host, self.server.port

    # 127.0.0.1 doesn't match SAN
    client.connect(server_host, server_port)
    with self.assertRaises(ssl.CertificateError):
        client.auth()
    # exception quits connection

    # Same mismatch, this time on the data connection.
    client.connect(server_host, server_port)
    client.prot_p()
    with self.assertRaises(ssl.CertificateError):
        client.transfercmd("list").close()
    client.quit()

    # Connecting by name: control connection.
    client.connect("localhost", server_port)
    client.auth()
    client.quit()

    # Connecting by name: data connection.
    client.connect("localhost", server_port)
    client.prot_p()
    client.transfercmd("list").close()
def setUp(self):
    """Bring up the dummy TLS FTP server and a client with TLS switched on.

    After connecting, the client calls ``auth()`` and then ``prot_p()``.
    """
    self.server = DummyTLS_FTPServer((HOST, 0))
    self.server.start()

    self.client = tls_client = ftplib.FTP_TLS(timeout=2)
    tls_client.connect(self.server.host, self.server.port)
    # enable TLS
    tls_client.auth()
    tls_client.prot_p()
def setUp(self):
    """Bring up the dummy TLS FTP server and connect a client to it.

    ``auth()`` and ``prot_p()`` are not called here.
    """
    self.server = DummyTLS_FTPServer((HOST, 0))
    self.server.start()

    self.client = tls_client = ftplib.FTP_TLS(timeout=2)
    tls_client.connect(self.server.host, self.server.port)
def connect(self):
    """Open an FTPS session using the stored configuration.

    :return: ``True`` when the login succeeded, ``False`` when connecting or
             logging in raised (a warning is printed and the ``time`` entry
             of the ``Config`` section is written as 0), and ``None`` when
             ``readConfig()`` reported failure and nothing was attempted.
    """
    if not self.readConfig():
        return None

    try:
        self.ftp = FTP_TLS(self.server)
        password = self.__askPassWord()
        self.ftp.login(self.user, password)
        # NOTE(review): prot_p() is disabled, so data transfers on this
        # session are not protected by TLS - confirm this is intended.
        #self.ftp.prot_p()
        return True
    except Exception:
        # ``except Exception, e`` is a syntax error on Python 3 and the bound
        # name was never used.  This form works on Python 2 and 3.
        print(color.TFC + "ftc " + color.WARNING + "Unable to log in...")
        self.__writeConfig("Config", "time", 0)
        return False
def do_cmd(term):
    """Prompt for glftpd commands and relay each one as ``site <cmd>`` over FTPS.

    Up to ``max_attempts`` prompts are shown.  Blank input is skipped and any
    input found in ``bye_u`` (compared lower-cased) ends the function.  Every
    other line is sent through a fresh FTPS session using the fixed service
    account below.

    The server address, port and credentials are installation-specific
    placeholders and must be edited before use.  The account should not be a
    privileged user.

    :param term: Terminal object providing the colour attributes named by
                 ``color_primary`` and ``color_secondary``.
    """
    sep_ok = getattr(term, color_secondary)(u'::')
    colors = {'highlight': getattr(term, color_primary)}

    # feel free to change these echoes to personalize your installation
    echo(u'\r\n\r\n i hope you know glftpd cmds :)')
    echo(u'\r\n\r\n if you dont, type quit')
    echo(u'\r\n\r\n basically all this is good for at the moment is for msg and request')
    echo(u'\r\n\r\n e.g \'msg kniffy hi\' or \'request coolthing -for:<you>\'')

    for _ in range(max_attempts):
        echo(u'\r\n\r\n{sep} tYPE cMD -> '.format(sep=sep_ok))
        handle = LineEditor(max_length, colors=colors).read() or u''

        if handle.strip() == u'':
            continue

        # user says goodbye
        if handle.lower() in bye_u:
            return

        # do cmd
        ftps = FTP_TLS()
        try:
            # ftps.set_debuglevel(2)  # uncomment when debugging (run it directly, not from eggdrop)
            # Enter your server and port here.  The port must be an int:
            # ftplib compares it with 0, which fails for a str on Python 3.
            ftps.connect('127.0.0.1', 1234)
            # Enter your user and pass within the quotes (remember, not a user with privs).
            ftps.login('cmd', '<make this a non-sysop user please>')
            ftps.prot_p()
            # NOTE(review): ``handle`` is raw user input appended to a SITE
            # command - confirm the service account limits what it can do.
            ftps.sendcmd('site ' + handle)
            echo(u'\r\n\r\n cmd sent')
            ftps.quit()
        finally:
            # Release the socket even when a step above raised.
            ftps.close()
def main():
    """Script entry point: pass the current terminal to ``do_cmd``."""
    # An earlier non-interactive variant (read one command with raw_input,
    # send it as a ``site`` command, then quit) used to be kept here as
    # commented-out code.  According to the original author's notes, an
    # accompanying tcl script relays whatever this script prints into the
    # channel, which is why that variant printed status lines.
    do_cmd(getterminal())
def delete_user(term, tgt_user, point):
    """
    Delete given user. You may delete yourself.

    Shows a ``[yN]`` confirmation prompt at *point*.  On ``y``, and unless the
    handle is ``anonymous``, the account is first removed from glftpd over
    FTPS (``site deluser`` plus a ``site msg`` notice) and then deleted
    locally.  The FTPS address, port and credentials are installation-specific
    placeholders.

    :param term: Terminal object used for cursor movement, colours and input.
    :param tgt_user: User record to delete; ``handle`` and ``delete()`` are used.
    :param point: Position of the prompt; used as ``(y, x)``.
    :return: ``True`` when the user answered ``y``, otherwise ``False``.
    """
    _color1, _color2 = [getattr(term, _color) for _color in (color_lowlight, color_highlight)]
    lb, rb, colon = _color1('['), _color1(']'), _color1(':')

    echo(term.move(*point))
    echo(u'Delete {handle} {lb}yN{rb}{colon}{clear_eos} ?\b\b'
         .format(handle=_color2(tgt_user.handle),
                 rb=rb, lb=lb, colon=colon,
                 clear_eos=term.clear_eos))
    inp = term.inkey()
    echo(inp + term.move(point.y + 2, point.x))

    if inp == u'y':
        if tgt_user.handle != 'anonymous':
            ftps = FTP_TLS()
            try:
                # The port must be an int: ftplib compares it with 0, which
                # raises TypeError for a str on Python 3.
                ftps.connect('127.0.0.1', 1234)
                ftps.login('bbs', '<please make this a special sysop user in glftpd>')
                ftps.prot_p()  # for ssl mode
                ftps.sendcmd('site deluser ' + tgt_user.handle)
                ftps.sendcmd('site msg sysop user ' + tgt_user.handle + ' deleted, maybe purge them too? ')
                ftps.quit()
            finally:
                # Release the socket even when a step above raised.
                ftps.close()
            tgt_user.delete()
        echo(_color2('Deleted !'))
        time.sleep(1)
        return True

    echo(_color2('Canceled !'))
    time.sleep(1)
    return False
def setUp(self):
    """Start the dummy TLS FTP server and connect a client with TLS enabled.

    The client calls ``auth()`` and ``prot_p()`` after connecting.
    """
    self.server = DummyTLS_FTPServer((HOST, 0))
    self.server.start()
    host, port = self.server.host, self.server.port

    self.client = ftplib.FTP_TLS(timeout=TIMEOUT)
    self.client.connect(host, port)
    # enable TLS
    self.client.auth()
    self.client.prot_p()
def test_check_hostname(self):
    """Verify host name checking for the control and the data connection.

    With ``check_hostname`` enabled and certificates required, connecting via
    ``self.server.host`` is expected to raise ``ssl.CertificateError`` from
    ``auth()`` and when opening a data connection.  Connecting via
    ``"localhost"`` is expected to succeed in both cases.
    """
    self.client.quit()
    verifying_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    verifying_ctx.verify_mode = ssl.CERT_REQUIRED
    verifying_ctx.check_hostname = True
    verifying_ctx.load_verify_locations(CAFILE)
    self.client = client = ftplib.FTP_TLS(context=verifying_ctx, timeout=TIMEOUT)
    port = self.server.port

    # 127.0.0.1 doesn't match SAN
    client.connect(self.server.host, port)
    with self.assertRaises(ssl.CertificateError):
        client.auth()
    # exception quits connection

    # The data connection must fail the same check.
    client.connect(self.server.host, port)
    client.prot_p()
    with self.assertRaises(ssl.CertificateError):
        with client.transfercmd("list") as sock:
            pass
    client.quit()

    # By name: control connection.
    client.connect("localhost", port)
    client.auth()
    client.quit()

    # By name: data connection.
    client.connect("localhost", port)
    client.prot_p()
    with client.transfercmd("list") as sock:
        pass
def login(self, *args, **kwargs):
    """Log in through ``ftplib.FTP_TLS.login`` and then call ``prot_p()``.

    All arguments are forwarded unchanged.  The value returned by the base
    ``login`` is discarded, so this method returns ``None``.
    """
    base_login = ftplib.FTP_TLS.login
    base_login(self, *args, **kwargs)
    # Every login is immediately followed by PROT P.
    self.prot_p()
def setUp(self):
    """Start an ``FTPSServer`` and connect an ``FTP_TLS`` client to it."""
    self.server = ftps_server = FTPSServer()
    ftps_server.start()

    self.client = client = ftplib.FTP_TLS(timeout=TIMEOUT)
    client.connect(ftps_server.host, ftps_server.port)
def _fetch_remote(source, destination, parsed_url, creds, tmp_file, tmp_path):
    """Fetch a remote resource, dispatching on ``parsed_url['scheme']``.

    * ``ftp`` / ``ftps``: download ``parsed_url['path']`` into *tmp_file*.
    * ``git`` and ``git+...``: clone into *destination*; when the URL fragment
      names a branch other than the active one, check that branch out.
    * ``http`` / ``https``: download the URL into *tmp_file*, sending HTTP
      basic authentication when ``creds.username`` is set.

    Any other scheme is ignored.  ``parsed_url`` is modified in place for git
    URLs (scheme prefix and fragment are stripped before cloning).

    :param source: Unused here.
    :param destination: Directory to create and clone into (git only).
    :param parsed_url: Mapping of URL components; its ``values()`` are passed
                       to ``urlunparse``.
    :param creds: Object with ``username`` and ``password`` attributes.
    :param tmp_file: Writable binary file object receiving downloaded data.
    :param tmp_path: Unused here.
    :raises ValueError: If the requested git branch has no remote reference.
    """
    if parsed_url['scheme'] in ('ftp', 'ftps'):
        use_tls = parsed_url['scheme'] == 'ftps'
        connection = ftplib.FTP_TLS() if use_tls else ftplib.FTP()
        try:
            # NOTE(review): 990 is conventionally the implicit-TLS port, while
            # ftplib.FTP_TLS speaks explicit TLS (AUTH TLS) - confirm this default.
            default_port = 990 if use_tls else 21
            connection.connect(*smoke_zephyr.utilities.parse_server(parsed_url['netloc'], default_port))
            connection.login(creds.username or '', creds.password or '')
            if use_tls:
                # login() only secures the control channel.  Without PROT P
                # the file contents would travel in clear text.
                connection.prot_p()
            connection.retrbinary('RETR ' + parsed_url['path'], tmp_file.write)
            connection.quit()
        finally:
            # Release the socket even when a step above raised.
            connection.close()
    elif parsed_url['scheme'] in ('git', 'git+ssh', 'git+http', 'git+https'):
        # Strip the "git+" prefix so the remaining scheme is a valid transport.
        parsed_url['scheme'] = parsed_url['scheme'].split('+', 1)[-1]
        branch = parsed_url['fragment']
        if branch:
            parsed_url['fragment'] = ''
        else:
            branch = None
        os.mkdir(destination, mode=MAKEDIR_MODE)
        repo = git.Repo.clone_from(urllib.parse.urlunparse(parsed_url.values()), destination)
        if branch is None or branch == repo.active_branch.name:
            return
        origin = repo.remotes['origin']
        origin.fetch()
        branch_ref = next(
            (ref for ref in repo.refs if isinstance(ref, git.RemoteReference) and ref.remote_head == branch),
            None
        )
        if branch_ref is None:
            raise ValueError('failed to find reference to remote branch name: ' + branch)
        branch_ref.checkout(b=branch)
    elif parsed_url['scheme'] in ('http', 'https'):
        request = urllib.request.Request(urllib.parse.urlunparse(parsed_url.values()))
        if creds.username is not None:
            request.add_header(
                'Authorization',
                'Basic ' + base64.b64encode(
                    "{0}:{1}".format(creds.username, creds.password).encode('utf-8')
                ).decode('utf-8')
            )
        # The context manager closes the response even if copying fails.
        with urllib.request.urlopen(request) as url_h:
            shutil.copyfileobj(url_h, tmp_file)
def get_conn(self):
    """
    Return the FTPS connection object, creating it on first use.

    The client is built from the host, login and password of the connection
    parameters returned by ``self.get_connection(self.ftp_conn_id)`` and is
    cached on ``self.conn``.
    """
    if self.conn is not None:
        return self.conn

    params = self.get_connection(self.ftp_conn_id)
    self.conn = ftplib.FTP_TLS(params.host, params.login, params.password)
    return self.conn
def __init__(self, server, port=21, username="anonymous", password="", use_ssl=False):
    """Store the connection settings and create an unconnected client.

    :param server: Stored on ``self._server``.
    :param port: Stored on ``self._port``.
    :param username: Stored on ``self._username``.
    :param password: Stored on ``self._password``.
    :param use_ssl: Stored on ``self._use_ssl``; selects ``ftplib.FTP_TLS``
                    when true and ``ftplib.FTP`` otherwise.
    """
    import ftplib

    self._server = server
    self._port = port
    self._username = username
    self._password = password
    self._use_ssl = use_ssl

    # The client is created without a host, so nothing is connected here.
    client_class = ftplib.FTP_TLS if use_ssl else ftplib.FTP
    self._ftp = client_class()
def main(handle=u''):
    """
    Main procedure.

    Runs the new-user application in a loop until the applicant cancels or
    confirms.  On confirmation the account is registered with glftpd over
    FTPS, saved locally, and control passes to ``top_script``.

    The FTPS address, port and credentials are installation-specific
    placeholders and must be edited before use.

    :param handle: Pre-filled user handle; reset to an empty string when it
                   is one of the ``new_usernames`` aliases.
    :raises AssertionError: If an account with the chosen handle was created
                            while the applicant was at the confirmation prompt.
    """
    # set syncterm font, if any
    term = getterminal()
    if term.kind == 'ansi':
        echo(syncterm_setfont(syncterm_font))

    # reset handle to an empty string if it is any
    # of the 'new' user account alias strings
    if handle.lower() in new_usernames:
        handle = u''

    user = User(handle)

    # create new user record for manipulation
    while True:
        display_banner(art_file, encoding=art_encoding)
        user, plaintext_password = do_nua(user)

        # user canceled.
        if user is None:
            return

        # confirm
        if prompt_yesno(question='Create account'):
            # prevent race condition, scenario: `billy' begins new account
            # process, waits at 'Create account [yn] ?' prompt until a
            # second `billy' finishes account creation process, then the
            # first `billy' enters 'y', overwriting the existing account.
            #
            # Raised explicitly rather than via ``assert`` so the guard still
            # runs under ``python -O``.  The exception type is unchanged.
            if find_user(user.handle):
                raise AssertionError(
                    'Security race condition: account already exists')

            # real_ip = getssession().addrport
            ftps = FTP_TLS()
            try:
                # This can be remote.  The port must be an int: ftplib
                # compares it with 0, which fails for a str on Python 3.
                ftps.connect('127.0.0.1', 1234)
                ftps.login('asdf', '<please set up a glftpd user for this>')
                ftps.prot_p()
                ftps.sendcmd('site gadduser bbsuser ' + user.handle + ' ' + plaintext_password + ' *@127.0.0.1 ')
                # for validation reasons
                ftps.sendcmd('site deluser ' + user.handle)
                ftps.sendcmd('site msg sysop ' + user.handle + ' added, please validate them ')
                ftps.quit()
            finally:
                # Release the socket even when a step above raised.
                ftps.close()

            user.save()
            goto(top_script, user.handle)