Python ftplib 模块,FTP_TLS 实例源码

我们从Python开源项目中,提取了以下42个代码示例,用于说明如何使用ftplib.FTP_TLS

项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def test_context(self):
        self.client.quit()
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.assertRaises(ValueError, ftplib.FTP_TLS, keyfile=CERTFILE,
                          context=ctx)
        self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE,
                          context=ctx)
        self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE,
                          keyfile=CERTFILE, context=ctx)

        self.client = ftplib.FTP_TLS(context=ctx, timeout=10)
        self.client.connect(self.server.host, self.server.port)
        self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
        self.client.auth()
        self.assertIs(self.client.sock.context, ctx)
        self.assertIsInstance(self.client.sock, ssl.SSLSocket)

        self.client.prot_p()
        with self.client.transfercmd('list') as sock:
            self.assertIs(sock.context, ctx)
            self.assertIsInstance(sock, ssl.SSLSocket)
项目:sqlalchemy-media    作者:pylover    | 项目源码 | 文件源码
def __init__(self, hostname, root_path, base_url,
                 username=None, password=None, passive=True, secure=False, **kwargs):

        if isinstance(hostname, FTP):
            self.ftp_client = hostname

        else:  # pragma: nocover
            if secure:
                self.ftp_client = FTP_TLS(host=hostname, user=username, passwd=password, **kwargs)
                # noinspection PyUnresolvedReferences
                self.ftp_client.prot_p()

            else:
                self.ftp_client = FTP(host=hostname, user=username, passwd=password, **kwargs)

            self.ftp_client.set_pasv(passive)

        self.root_path = root_path
        self.base_url = base_url.rstrip('/')
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_context(self):
        self.client.quit()
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.assertRaises(ValueError, ftplib.FTP_TLS, keyfile=CERTFILE,
                          context=ctx)
        self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE,
                          context=ctx)
        self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE,
                          keyfile=CERTFILE, context=ctx)

        self.client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)
        self.client.connect(self.server.host, self.server.port)
        self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
        self.client.auth()
        self.assertIs(self.client.sock.context, ctx)
        self.assertIsInstance(self.client.sock, ssl.SSLSocket)

        self.client.prot_p()
        sock = self.client.transfercmd('list')
        try:
            self.assertIs(sock.context, ctx)
            self.assertIsInstance(sock, ssl.SSLSocket)
        finally:
            sock.close()
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_context(self):
        self.client.quit()
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.assertRaises(ValueError, ftplib.FTP_TLS, keyfile=CERTFILE,
                          context=ctx)
        self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE,
                          context=ctx)
        self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE,
                          keyfile=CERTFILE, context=ctx)

        self.client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)
        self.client.connect(self.server.host, self.server.port)
        self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
        self.client.auth()
        self.assertIs(self.client.sock.context, ctx)
        self.assertIsInstance(self.client.sock, ssl.SSLSocket)

        self.client.prot_p()
        sock = self.client.transfercmd('list')
        try:
            self.assertIs(sock.context, ctx)
            self.assertIsInstance(sock, ssl.SSLSocket)
        finally:
            sock.close()
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def test_context(self):
        self.client.quit()
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.assertRaises(ValueError, ftplib.FTP_TLS, keyfile=CERTFILE,
                          context=ctx)
        self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE,
                          context=ctx)
        self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE,
                          keyfile=CERTFILE, context=ctx)

        self.client = ftplib.FTP_TLS(context=ctx, timeout=2)
        self.client.connect(self.server.host, self.server.port)
        self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
        self.client.auth()
        self.assertIs(self.client.sock.context, ctx)
        self.assertIsInstance(self.client.sock, ssl.SSLSocket)

        self.client.prot_p()
        with self.client.transfercmd('list') as sock:
            self.assertIs(sock.context, ctx)
            self.assertIsInstance(sock, ssl.SSLSocket)
项目:tfc    作者:Aborres    | 项目源码 | 文件源码
def __init__(self):
    self.name = ""
    self.commands = {}
    self.commands_function = {}
    self.user = ""
    self.server = ""
    self.password = ""
    self.port = 0
    self.copy_dir = ""
    self.dest = ""
    self.assets = ""
    self.ftp = FTP_TLS()
    self.folder = ".tfc"
    self.config_file = "config.ini"
    self.config_path = self.folder + os.path.sep + self.config_file
    self.config_file_content = ""
    self.db_name = "ftp_files.db"
    self.db_path = self.folder + os.path.sep + self.db_name
    self.place_holder_config = "files/config.ini"
    self.help_path = "files/help.json"
    self.time_out = 300
    self.time = 0
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_context(self):
        self.client.quit()
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.assertRaises(ValueError, ftplib.FTP_TLS, keyfile=CERTFILE,
                          context=ctx)
        self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE,
                          context=ctx)
        self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE,
                          keyfile=CERTFILE, context=ctx)

        self.client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)
        self.client.connect(self.server.host, self.server.port)
        self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
        self.client.auth()
        self.assertIs(self.client.sock.context, ctx)
        self.assertIsInstance(self.client.sock, ssl.SSLSocket)

        self.client.prot_p()
        with self.client.transfercmd('list') as sock:
            self.assertIs(sock.context, ctx)
            self.assertIsInstance(sock, ssl.SSLSocket)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_context(self):
        self.client.quit()
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        self.assertRaises(ValueError, ftplib.FTP_TLS, keyfile=CERTFILE,
                          context=ctx)
        self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE,
                          context=ctx)
        self.assertRaises(ValueError, ftplib.FTP_TLS, certfile=CERTFILE,
                          keyfile=CERTFILE, context=ctx)

        self.client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)
        self.client.connect(self.server.host, self.server.port)
        self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
        self.client.auth()
        self.assertIs(self.client.sock.context, ctx)
        self.assertIsInstance(self.client.sock, ssl.SSLSocket)

        self.client.prot_p()
        with self.client.transfercmd('list') as sock:
            self.assertIs(sock.context, ctx)
            self.assertIsInstance(sock, ssl.SSLSocket)
项目:lama    作者:CSE-POST    | 项目源码 | 文件源码
def __init__(self, host, user, password):
        ftplib.FTP_TLS.__init__(self)
        self.connect(host, 21)
        self.login(user, password)
        # Set up encrypted data connection.
        self.prot_p()
项目:airflow-hovercraft    作者:gtoonstra    | 项目源码 | 文件源码
def get_conn(self):
        """
        Returns a FTPS connection object.
        """
        if self.conn is None:
            params = self.get_connection(self.ftp_conn_id)

            self.conn = FTP_TLS(timeout=60)
            self.conn.connect(params.host, params.port)
            self.conn.auth()
            self.conn.prot_p()
            self.conn.login(params.login, params.password)

        return self.conn
项目:pandachaika    作者:pandabuilder    | 项目源码 | 文件源码
def upload_archive_file(self, local_filename, remote_filename, target_dir):

        yield("Uploading {} to FTP in directory: {}, filename: {}".format(local_filename, target_dir, remote_filename))

        local_filesize = os.stat(local_filename).st_size
        self.upload_total = os.stat(local_filename).st_size
        self.upload_current = 0

        if self.settings.ftps['no_certificate_check']:
            context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
            context.verify_mode = ssl.CERT_NONE
            context.check_hostname = False
        else:
            context = ssl.create_default_context()
        ftps = FTP_TLS(
            host=self.settings.ftps['address'],
            user=self.settings.ftps['user'],
            passwd=self.settings.ftps['passwd'],
            context=context,
            source_address=self.settings.ftps[
                'source_address'],
            timeout=self.settings.timeout_timer
        )
        ftps.cwd(target_dir)
        ftps.encoding = 'utf8'
        ftps.prot_p()
        for line in ftps.mlsd(facts=["size"]):
            if(line[0] == remote_filename and
                    local_filesize == int(line[1]["size"])):
                yield("File exists and size is equal.")
                ftps.close()
                return
        with open(local_filename, "rb") as file:
            for retry_count in range(3):
                try:
                    ftps.storbinary(
                        'STOR %s' % remote_filename,
                        file,
                        callback=lambda data, args=self.print_method: self.print_progress(data, args)
                    )
                except (ConnectionResetError, socket.timeout, TimeoutError):
                    yield("Upload failed, retrying...")
                else:
                    break
        yield("\nFile uploaded.")
        ftps.close()
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def setUp(self):
        self.server = DummyTLS_FTPServer((HOST, 0))
        self.server.start()
        self.client = ftplib.FTP_TLS(timeout=10)
        self.client.connect(self.server.host, self.server.port)
        # enable TLS
        self.client.auth()
        self.client.prot_p()
项目:zippy    作者:securesystemslab    | 项目源码 | 文件源码
def setUp(self):
        self.server = DummyTLS_FTPServer((HOST, 0))
        self.server.start()
        self.client = ftplib.FTP_TLS(timeout=10)
        self.client.connect(self.server.host, self.server.port)
项目:incubator-airflow-old    作者:apache    | 项目源码 | 文件源码
def get_conn(self):
        """
        Returns a FTPS connection object.
        """
        if self.conn is None:
            params = self.get_connection(self.ftp_conn_id)

            if params.port:
               ftplib.FTP_TLS.port=params.port

            self.conn = ftplib.FTP_TLS(
                params.host, params.login, params.password
            )

        return self.conn
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def setUp(self):
        self.server = DummyTLS_FTPServer((HOST, 0))
        self.server.start()
        self.client = ftplib.FTP_TLS(timeout=10)
        self.client.connect(self.server.host, self.server.port)
        # enable TLS
        self.client.auth()
        self.client.prot_p()
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def setUp(self):
        self.server = DummyTLS_FTPServer((HOST, 0))
        self.server.start()
        self.client = ftplib.FTP_TLS(timeout=TIMEOUT)
        self.client.connect(self.server.host, self.server.port)
项目:oil    作者:oilshell    | 项目源码 | 文件源码
def test_check_hostname(self):
        self.client.quit()
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.check_hostname = True
        ctx.load_verify_locations(CAFILE)
        self.client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)

        # 127.0.0.1 doesn't match SAN
        self.client.connect(self.server.host, self.server.port)
        with self.assertRaises(ssl.CertificateError):
            self.client.auth()
        # exception quits connection

        self.client.connect(self.server.host, self.server.port)
        self.client.prot_p()
        with self.assertRaises(ssl.CertificateError):
            self.client.transfercmd("list").close()
        self.client.quit()

        self.client.connect("localhost", self.server.port)
        self.client.auth()
        self.client.quit()

        self.client.connect("localhost", self.server.port)
        self.client.prot_p()
        self.client.transfercmd("list").close()
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def setUp(self):
        self.server = DummyTLS_FTPServer((HOST, 0))
        self.server.start()
        self.client = ftplib.FTP_TLS(timeout=10)
        self.client.connect(self.server.host, self.server.port)
        # enable TLS
        self.client.auth()
        self.client.prot_p()
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def setUp(self):
        self.server = DummyTLS_FTPServer((HOST, 0))
        self.server.start()
        self.client = ftplib.FTP_TLS(timeout=TIMEOUT)
        self.client.connect(self.server.host, self.server.port)
项目:python2-tracer    作者:extremecoders-re    | 项目源码 | 文件源码
def test_check_hostname(self):
        self.client.quit()
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.check_hostname = True
        ctx.load_verify_locations(CAFILE)
        self.client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)

        # 127.0.0.1 doesn't match SAN
        self.client.connect(self.server.host, self.server.port)
        with self.assertRaises(ssl.CertificateError):
            self.client.auth()
        # exception quits connection

        self.client.connect(self.server.host, self.server.port)
        self.client.prot_p()
        with self.assertRaises(ssl.CertificateError):
            self.client.transfercmd("list").close()
        self.client.quit()

        self.client.connect("localhost", self.server.port)
        self.client.auth()
        self.client.quit()

        self.client.connect("localhost", self.server.port)
        self.client.prot_p()
        self.client.transfercmd("list").close()
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def setUp(self):
        self.server = DummyTLS_FTPServer((HOST, 0))
        self.server.start()
        self.client = ftplib.FTP_TLS(timeout=2)
        self.client.connect(self.server.host, self.server.port)
        # enable TLS
        self.client.auth()
        self.client.prot_p()
项目:web_ctp    作者:molebot    | 项目源码 | 文件源码
def setUp(self):
        self.server = DummyTLS_FTPServer((HOST, 0))
        self.server.start()
        self.client = ftplib.FTP_TLS(timeout=2)
        self.client.connect(self.server.host, self.server.port)
项目:tfc    作者:Aborres    | 项目源码 | 文件源码
def connect(self):
    if(self.readConfig()):
      try:
        self.ftp = FTP_TLS(self.server)
        password = self.__askPassWord()
        self.ftp.login(self.user, password)
        #self.ftp.prot_p()
        return True
      except Exception, e:
        print(color.TFC + "ftc " + color.WARNING + "Unable to log in...")
        self.__writeConfig("Config", "time", 0)
        return False
项目:POR-84    作者:kniffy    | 项目源码 | 文件源码
def do_cmd(term):
    session = getsession()
    sep_ok = getattr(term, color_secondary)(u'::')
    sep_bad = getattr(term, color_primary)(u'::')
    colors = {'highlight': getattr(term, color_primary)}
    echo(u'\r\n\r\n i hope you know glftpd cmds :)') # feel free to change these echoes to personalize your installation
    echo(u'\r\n\r\n if you dont, type quit')
    echo(u'\r\n\r\n basically all this is good for at the moment is for msg and request')
    echo (u'\r\n\r\n e.g \'msg kniffy hi\' or \'request coolthing -for:<you>\'')
    for _ in range(max_attempts):
        echo(u'\r\n\r\n{sep} tYPE cMD -> '.format(sep=sep_ok))
        handle = LineEditor(max_length, colors=colors
                            ).read() or u''

        if handle.strip() == u'':
            continue

        # user says goodbye
        if handle.lower() in bye_u:
            return

        else:
            # do cmd
            person = session.user.handle
#            session.user.handle = person
            ftps = FTP_TLS()
            #ftps.set_debuglevel(2)                  # if you broke something, uncomment this (run it directly, not from eggdrop)
            ftps.connect('127.0.0.1', '1234')        # enter your server and port within the quotes
            ftps.login('cmd', '<make this a non-sysop user please>') # enter your user and pass within the quotes (remember, not a user with privs)
#            ftps.login(person, auth)
            ftps.prot_p()

            ftps.sendcmd('site ' + handle)

            echo(u'\r\n\r\n cmd sent')

            ftps.quit()

#            echo(u'\r\n\r\n{sep} Password: '.format(sep=sep_ok))
#            password = LineEditor(password_max_length,
#                                  colors=colors,
#                                  hidden=hidden_char
#                                  ).read() or u''

#            user = authenticate_user(handle, password)
#            if not user:
#                echo(u'\r\n\r\n{sep} Login failed.'.format(sep=sep_bad))
#                continue

#        goto(top_script, handle=user.handle)

#    echo(u'\r\n\r\n{sep} Too many authentication attempts.\r\n'
#         .format(sep=sep_bad))
项目:POR-84    作者:kniffy    | 项目源码 | 文件源码
def main():

    term = getterminal()

    do_cmd(term)


#print "hi :) running cmd.."

#ftps = FTP_TLS()
#ftps.set_debuglevel(2)                 # if you broke something, uncomment this (run it directly, not from eggdrop)
#ftps.connect('127.0.0.1', '1234')          # enter your server and port within the quotes
#ftps.login('cmd', 'asdf')          # enter your user and pass within the quotes (remember, not a user with privs)
#ftps.prot_p()

#ftps.sendcmd('site' + argument)

#print "logged in, what cmd do you want to issue?"
#FANCY = raw_input()

#FANCY = raw_input(promt)

#ftps.sendcmd('site ' + FANCY)

# the tcl script i included will take any output from this python and spam
# it into the channel.. set this how you like, or turn it off
# if you're a tcl guru comment this out and make your tcl do the accounce
#print "cmd sent, sleeping now zzzz"

#ftps.quit()

#quit()
项目:POR-84    作者:kniffy    | 项目源码 | 文件源码
def delete_user(term, tgt_user, point):
    """ Delete given user. You may delete yourself. """
    _color1, _color2 = [getattr(term, _color)
                        for _color in (color_lowlight, color_highlight)]
    lb, rb, colon = _color1('['), _color1(']'), _color1(':')

    echo(term.move(*point))
    echo(u'Delete {handle} {lb}yN{rb}{colon}{clear_eos} ?\b\b'
         .format(handle=_color2(tgt_user.handle),
                 rb=rb, lb=lb, colon=colon,
                 clear_eos=term.clear_eos))
    inp = term.inkey()
    echo(inp + term.move(point.y + 2, point.x))
    if inp == u'y':
        if tgt_user.handle != 'anonymous':

            ftps = FTP_TLS()
            ftps.connect('127.0.0.1', '1234')
            ftps.login('bbs', '<please make this a special sysop user in glftpd>')
            ftps.prot_p() # for ssl mode
            ftps.sendcmd('site deluser ' + tgt_user.handle )
            ftps.sendcmd('site msg sysop user ' + tgt_user.handle + ' deleted, maybe purge them too? ' )

            ftps.quit()

            tgt_user.delete()
        echo(_color2('Deleted !'))
        time.sleep(1)
        return True

    echo(_color2('Canceled !'))
    time.sleep(1)
    return False
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def setUp(self):
        self.server = DummyTLS_FTPServer((HOST, 0))
        self.server.start()
        self.client = ftplib.FTP_TLS(timeout=10)
        self.client.connect(self.server.host, self.server.port)
        # enable TLS
        self.client.auth()
        self.client.prot_p()
项目:pefile.pypy    作者:cloudtracer    | 项目源码 | 文件源码
def setUp(self):
        self.server = DummyTLS_FTPServer((HOST, 0))
        self.server.start()
        self.client = ftplib.FTP_TLS(timeout=10)
        self.client.connect(self.server.host, self.server.port)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def setUp(self):
        self.server = DummyTLS_FTPServer((HOST, 0))
        self.server.start()
        self.client = ftplib.FTP_TLS(timeout=TIMEOUT)
        self.client.connect(self.server.host, self.server.port)
        # enable TLS
        self.client.auth()
        self.client.prot_p()
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def setUp(self):
        self.server = DummyTLS_FTPServer((HOST, 0))
        self.server.start()
        self.client = ftplib.FTP_TLS(timeout=TIMEOUT)
        self.client.connect(self.server.host, self.server.port)
项目:ouroboros    作者:pybee    | 项目源码 | 文件源码
def test_check_hostname(self):
        self.client.quit()
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.check_hostname = True
        ctx.load_verify_locations(CAFILE)
        self.client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)

        # 127.0.0.1 doesn't match SAN
        self.client.connect(self.server.host, self.server.port)
        with self.assertRaises(ssl.CertificateError):
            self.client.auth()
        # exception quits connection

        self.client.connect(self.server.host, self.server.port)
        self.client.prot_p()
        with self.assertRaises(ssl.CertificateError):
            with self.client.transfercmd("list") as sock:
                pass
        self.client.quit()

        self.client.connect("localhost", self.server.port)
        self.client.auth()
        self.client.quit()

        self.client.connect("localhost", self.server.port)
        self.client.prot_p()
        with self.client.transfercmd("list") as sock:
            pass
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def setUp(self):
        self.server = DummyTLS_FTPServer((HOST, 0))
        self.server.start()
        self.client = ftplib.FTP_TLS(timeout=10)
        self.client.connect(self.server.host, self.server.port)
        # enable TLS
        self.client.auth()
        self.client.prot_p()
项目:ndk-python    作者:gittor    | 项目源码 | 文件源码
def setUp(self):
        self.server = DummyTLS_FTPServer((HOST, 0))
        self.server.start()
        self.client = ftplib.FTP_TLS(timeout=10)
        self.client.connect(self.server.host, self.server.port)
项目:sdk-samples    作者:cradlepoint    | 项目源码 | 文件源码
def login(self, *args, **kwargs):
            ftplib.FTP_TLS.login(self, *args, **kwargs)
            self.prot_p()
项目:sdk-samples    作者:cradlepoint    | 项目源码 | 文件源码
def setUp(self):
        self.server = FTPSServer()
        self.server.start()
        self.client = ftplib.FTP_TLS(timeout=TIMEOUT)
        self.client.connect(self.server.host, self.server.port)
项目:jesse-james    作者:zeroSteiner    | 项目源码 | 文件源码
def _fetch_remote(source, destination, parsed_url, creds, tmp_file, tmp_path):
    if parsed_url['scheme'] in ('ftp', 'ftps'):
        if parsed_url['scheme'] == 'ftp':
            connection = ftplib.FTP()
            connection.connect(*smoke_zephyr.utilities.parse_server(parsed_url['netloc'], 21))
        else:
            connection = ftplib.FTP_TLS()
            connection.connect(*smoke_zephyr.utilities.parse_server(parsed_url['netloc'], 990))
        connection.login(creds.username or '', creds.password or '')
        connection.retrbinary('RETR ' + parsed_url['path'], tmp_file.write)
        connection.quit()
    elif parsed_url['scheme'] in ('git', 'git+ssh', 'git+http', 'git+https'):
        parsed_url['scheme'] = parsed_url['scheme'].split('+', 1)[-1]
        branch = parsed_url['fragment']
        if branch:
            parsed_url['fragment'] = ''
        else:
            branch = None
        os.mkdir(destination, mode=MAKEDIR_MODE)
        repo = git.Repo.clone_from(urllib.parse.urlunparse(parsed_url.values()), destination)
        if branch is None or branch == repo.active_branch.name:
            return
        origin = repo.remotes['origin']
        origin.fetch()
        branch_ref = next((ref for ref in repo.refs if isinstance(ref, git.RemoteReference) and ref.remote_head == branch), None)
        if branch_ref is None:
            raise ValueError('failed to find reference to remote branch name: ' + branch)
        branch_ref.checkout(b=branch)
    elif parsed_url['scheme'] in ('http', 'https'):
        request = urllib.request.Request(urllib.parse.urlunparse(parsed_url.values()))
        if creds.username is not None:
            request.add_header(
                'Authorization',
                'Basic ' + base64.b64encode("{0}:{1}".format(creds.username, creds.password).encode('utf-8')).decode('utf-8')
            )
        url_h = urllib.request.urlopen(request)
        shutil.copyfileobj(url_h, tmp_file)
        url_h.close()
项目:airflow    作者:apache-airflow    | 项目源码 | 文件源码
def get_conn(self):
        """
        Returns a FTPS connection object.
        """
        if self.conn is None:
            params = self.get_connection(self.ftp_conn_id)
            self.conn = ftplib.FTP_TLS(
                params.host, params.login, params.password
            )
        return self.conn
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def setUp(self):
        self.server = DummyTLS_FTPServer((HOST, 0))
        self.server.start()
        self.client = ftplib.FTP_TLS(timeout=TIMEOUT)
        self.client.connect(self.server.host, self.server.port)
        # enable TLS
        self.client.auth()
        self.client.prot_p()
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def setUp(self):
        self.server = DummyTLS_FTPServer((HOST, 0))
        self.server.start()
        self.client = ftplib.FTP_TLS(timeout=TIMEOUT)
        self.client.connect(self.server.host, self.server.port)
项目:kbe_server    作者:xiaohaoppy    | 项目源码 | 文件源码
def test_check_hostname(self):
        self.client.quit()
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.check_hostname = True
        ctx.load_verify_locations(CAFILE)
        self.client = ftplib.FTP_TLS(context=ctx, timeout=TIMEOUT)

        # 127.0.0.1 doesn't match SAN
        self.client.connect(self.server.host, self.server.port)
        with self.assertRaises(ssl.CertificateError):
            self.client.auth()
        # exception quits connection

        self.client.connect(self.server.host, self.server.port)
        self.client.prot_p()
        with self.assertRaises(ssl.CertificateError):
            with self.client.transfercmd("list") as sock:
                pass
        self.client.quit()

        self.client.connect("localhost", self.server.port)
        self.client.auth()
        self.client.quit()

        self.client.connect("localhost", self.server.port)
        self.client.prot_p()
        with self.client.transfercmd("list") as sock:
            pass
项目:inotiftpsync    作者:Major1201    | 项目源码 | 文件源码
def __init__(self, server, port=21, username="anonymous", password="", use_ssl=False):
        self._server = server
        self._port = port
        self._username = username
        self._password = password
        self._use_ssl = use_ssl
        if use_ssl:
            from ftplib import FTP_TLS
            self._ftp = FTP_TLS()
        else:
            from ftplib import FTP
            self._ftp = FTP()
项目:POR-84    作者:kniffy    | 项目源码 | 文件源码
def main(handle=u''):
    """
    Main procedure.
    """

    # set syncterm font, if any
    term = getterminal()
    if term.kind == 'ansi':
        echo(syncterm_setfont(syncterm_font))

    # reset handle to an empty string if it is any
    # of the 'new' user account alias strings
    if handle.lower() in new_usernames:
        handle = u''

    user = User(handle)

    # create new user record for manipulation
    while True:
        display_banner(art_file, encoding=art_encoding)
        user, plaintext_password = do_nua(user)

        # user canceled.
        if user is None:
            return

        # confirm
        if prompt_yesno(question='Create account'):
            assert not find_user(user.handle), (
                # prevent race condition, scenario: `billy' begins new account
                # process, waits at 'Create account [yn] ?' prompt until a
                # second `billy' finishes account creation process, then the
                # first `billy' enters 'y', overwriting the existing account.
                'Security race condition: account already exists')

#            real_ip = getssession().addrport

            ftps = FTP_TLS()
            ftps.connect('127.0.0.1', '1234') # this can be remote
            ftps.login('asdf', '<please set up a glftpd user for this>')
            ftps.prot_p()
            ftps.sendcmd('site gadduser bbsuser ' + user.handle + ' ' + plaintext_password + ' *@127.0.0.1 ' )
            ftps.sendcmd('site deluser ' + user.handle ) # for validation reasons
            ftps.sendcmd('site msg sysop ' + user.handle + ' added, please validate them ' )

            ftps.quit()

            user.save()
            goto(top_script, user.handle)