我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用imaplib.IMAP4。
def init_server(self): self.logger.info("Initializing SMTP/IMAP servers.") # PySMS at minimum uses smtp server try: if self.ssl: self.smtp = smtplib.SMTP_SSL(self.smtp_server, self.smtp_port) else: self.smtp = smtplib.SMTP(self.smtp_server, self.smtp_port) self.smtp.starttls() self.smtp.login(self.address, self.password) except smtplib.SMTPException: raise PySMSException("Unable to start smtp server, please check credentials.") # If responding functionality is enabled if self.imap_server: try: if self.ssl: self.imap = imaplib.IMAP4_SSL(self.imap_server) else: self.imap = imaplib.IMAP4(self.imap_server) r, data = self.imap.login(self.address, self.password) if r == "OK": r, data = self.imap.select(self.imap_mailbox) if r != "OK": raise PySMSException("Unable to select mailbox: {0}".format(self.imap_mailbox)) else: raise PySMSException("Unable to login to IMAP server with given credentials.") except imaplib.IMAP4.error: raise PySMSException("Unable to start IMAP server, please check address and SSL/TLS settings.")
def __init__(self, hostname, port=None, ssl=True): self.hostname = hostname self.port = port kwargs = {} if ssl: self.transport = IMAP4_SSL if not self.port: self.port = 993 else: self.transport = IMAP4 if not self.port: self.port = 143 self.server = self.transport(self.hostname, self.port) logger.debug("Created IMAP4 transport for {host}:{port}" .format(host=self.hostname, port=self.port))
def connect(self): """Connects to and authenticates with an IMAP4 mail server""" import imaplib M = None try: if (self.keyfile and self.certfile) or self.ssl: M = imaplib.IMAP4_SSL(self.host, self.port, self.keyfile, self.certfile) else: M = imaplib.IMAP4(self.host, self.port) M.login(self.username, self.password) M.select() except socket.error, err: raise else: return M
def __init__(self, host='', port=None, ssl=True, keyfile=None, certfile=None, ssl_context=None): """ :param host: host's name (default: localhost) :param port: port number (default: standard IMAP4 SSL port) :param ssl: use client class over SSL connection (IMAP4_SSL) if True, else use IMAP4 :param keyfile: PEM formatted file that contains your private key (default: None) :param certfile: PEM formatted certificate chain file (default: None) :param ssl_context: SSLContext object that contains your certificate chain and private key (default: None) Note: if ssl_context is provided, then parameters keyfile or certfile should not be set otherwise ValueError is raised. """ self._host = host self._port = port self._keyfile = keyfile self._certfile = certfile self._ssl_context = ssl_context if ssl: self.box = imaplib.IMAP4_SSL( host, port or imaplib.IMAP4_SSL_PORT, keyfile, certfile, ssl_context) else: self.box = imaplib.IMAP4(host, port or imaplib.IMAP4_PORT) self._username = None self._password = None self._initial_folder = None self.folder = None
def test_aborted_authentication(self): class MyServer(SimpleIMAPHandler): def cmd_AUTHENTICATE(self, tag, args): self._send_textline('+') self.response = yield if self.response == b'*\r\n': self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] aborted') else: self._send_tagged(tag, 'OK', 'MYAUTH successful') with self.reaped_pair(MyServer) as (server, client): with self.assertRaises(imaplib.IMAP4.error): code, data = client.authenticate('MYAUTH', lambda x: None)
def connect(self): if self.source_id and self.source_id.id: self.ensure_one() if self.source_id.type == 'imap': if self.source_id.is_ssl: connection = IMAP4_SSL(self.source_id.server, int(self.source_id.port)) else: connection = IMAP4(self.source_id.server, int(self.source_id.port)) connection.login(self.user, self.password) elif self.type == 'pop': if self.source_id.is_ssl: connection = POP3_SSL(self.source_id.server, int(self.source_id.port)) else: connection = POP3(self.source_id.server, int(self.source_id.port)) # TODO: use this to remove only unread messages # connection.user("recent:"+server.user) connection.user(self.user) connection.pass_(self.password) # Add timeout on socket connection.sock.settimeout(MAIL_TIMEOUT) return connection return super(vieterp_fetchmail_server, self).connect()
def connect(self, mailserver, port="143"): self.mailserver = mailserver self.port = port try: self.srv = imaplib.IMAP4(self.mailserver) except: self.srv = None pass
def validate(self, user, password): if (not self.srv): return self.user = user self.password = password try: self.srv.login(user, password) except ssl.SSLError as e: return False except imaplib.IMAP4.error as e: return False return True
def run(self): value = getword() try: print "-"*12 print "User:",user[:-1],"Password:",value M = imaplib.IMAP4(ip) M = login(user[:-1], value) print "\t\nLogin successful:",user, value M.close() M.logout() work.join() sys.exit(2) except(IMAP4.error, socket.gaierror, socket.error, socket.herror), msg: print "An error occurred:", msg pass
def run(self): value = getword() try: print "-"*12 print "User:",user[:-1],"Password:",value M = imaplib.IMAP4(ipaddr[0]) M = login(user[:-1], value) print "\t\nLogin successful:",user, value M.close() M.logout() work.join() sys.exit(2) except(IMAP4.error, socket.gaierror, socket.error, socket.herror), msg: print "An error occurred:", msg pass
def run(self): value, user = getword() try: print "-"*12 print "User:",user,"Password:",value M = imaplib.IMAP4(sys.argv[1]) M = login(user, value) print "\t\nLogin successful:",user, value M.close() M.logout() work.join() sys.exit(2) except(IMAP4.error, socket.gaierror, socket.error, socket.herror), msg: print "An error occurred:", msg pass
def __init__(self, host, port, timeout=None): self._timeout = timeout imaplib.IMAP4.__init__(self, host, port)
def run_mailbox(self, min_delay=5.0, max_delay=60.0): mailbox = None try: while True: item = yield idiokit.next() while True: delay = min(min_delay, max_delay) while mailbox is None: try: mailbox = yield idiokit.thread(self.connect) except (imaplib.IMAP4.abort, socket.error) as error: self.log.error("Failed IMAP connection ({0})".format(utils.format_exception(error))) else: break self.log.info("Retrying connection in {0:.2f} seconds".format(delay)) yield idiokit.sleep(delay) delay = min(2 * delay, max_delay) event, name, args, keys = item if event.result().unsafe_is_set(): break try: method = getattr(mailbox, name) result = yield idiokit.thread(method, *args, **keys) except (imaplib.IMAP4.abort, socket.error) as error: yield idiokit.thread(self.disconnect, mailbox) self.log.error("Lost IMAP connection ({0})".format(utils.format_exception(error))) mailbox = None except imaplib.IMAP4.error as error: event.fail(type(error), error, None) break else: event.succeed(result) break finally: if mailbox is not None: yield idiokit.thread(self.disconnect, mailbox)
def connect(self): self.log.info("Connecting to IMAP server {0!r} port {1}".format( self.mail_server, self.mail_port)) if self.mail_disable_ssl: mailbox = _IMAP4( self.mail_server, self.mail_port, timeout=self.mail_connection_timeout ) else: mailbox = _IMAP4_SSL( self.mail_server, self.mail_port, timeout=self.mail_connection_timeout, ca_certs=self.mail_ca_certs ) self.log.info("Logging in to IMAP server {0!r} port {1}".format( self.mail_server, self.mail_port)) mailbox.login(self.mail_user, self.mail_password) try: status, msgs = mailbox.select(self.mail_box, readonly=False) if status != "OK": for msg in msgs: raise imaplib.IMAP4.abort(msg) except: mailbox.logout() raise self.log.info("Logged in to IMAP server {0!r} port {1}".format( self.mail_server, self.mail_port)) return mailbox
def disconnect(self, mailbox): try: mailbox.close() except (imaplib.IMAP4.error, socket.error): pass try: mailbox.logout() except (imaplib.IMAP4.error, socket.error): pass
def compute_state(self, key): if not key.username or not key.password: self.warn('Username and password are not configured') return None if key.use_ssl: mail = IMAP4_SSL(key.server, key.port) else: mail = IMAP4(key.server, key.port) mail.login(key.username, key.password) rc, message = mail.status(key.folder, '(UNSEEN)') unread_str = message[0].decode('utf-8') unread_count = int(re.search('UNSEEN (\d+)', unread_str).group(1)) return unread_count
def __init__(self, target): # Target comes as protocol://target:port self.target = target proto, host, port = target.split(':') host = host[2:] if int(port) == 993 or proto.upper() == 'IMAPS': self.session = imaplib.IMAP4_SSL(host,int(port)) else: #assume non-ssl IMAP self.session = imaplib.IMAP4(host,port) if 'AUTH=NTLM' not in self.session.capabilities: logging.error('IMAP server does not support NTLM authentication!') return False self.authtag = self.session._new_tag() self.lastresult = None
def is_authenticated(self, user, password): host = "" if self.configuration.has_option("auth", "imap_host"): host = self.configuration.get("auth", "imap_host") secure = True if self.configuration.has_option("auth", "imap_secure"): secure = self.configuration.getboolean("auth", "imap_secure") try: if ":" in host: address, port = host.rsplit(":", maxsplit=1) else: address, port = host, 143 address, port = address.strip("[] "), int(port) except ValueError as e: raise RuntimeError( "Failed to parse address %r: %s" % (host, e)) from e if sys.version_info < (3, 4) and secure: raise RuntimeError("Secure IMAP is not availabe in Python < 3.4") try: connection = imaplib.IMAP4(host=address, port=port) try: if sys.version_info < (3, 4): connection.starttls() else: connection.starttls(ssl.create_default_context()) except (imaplib.IMAP4.error, ssl.CertificateError) as e: if secure: raise self.logger.debug("Failed to establish secure connection: %s", e, exc_info=True) try: connection.login(user, password) except imaplib.IMAP4.error as e: self.logger.debug( "IMAP authentication failed: %s", e, exc_info=True) return False connection.logout() return True except (OSError, imaplib.IMAP4.error) as e: raise RuntimeError("Failed to communicate with IMAP server %r: " "%s" % (host, e)) from e
def fetch(self): """Fetches email messages from an IMAP4 server""" messages = {} typ, data = self.handle.search(None, 'ALL') for num in data[0].split(): typ, data = self.handle.fetch(num, '(RFC822)') messages[num] = self.parse_email(data[0][1]) return messages
def disconnect(self): """Closes the IMAP4 handle""" self.handle.expunge() self.handle.close() self.handle.logout()
def test_issue5949(self): class EOFHandler(socketserver.StreamRequestHandler): def handle(self): # EOF without sending a complete welcome message. self.wfile.write(b'* OK') with self.reaped_server(EOFHandler) as server: self.assertRaises(imaplib.IMAP4.abort, self.imap_class, *server.server_address)
def test_line_termination(self): class BadNewlineHandler(SimpleIMAPHandler): def cmd_CAPABILITY(self, tag, args): self._send(b'* CAPABILITY IMAP4rev1 AUTH\n') self._send('{} OK CAPABILITY completed\r\n'.format(tag).encode('ASCII')) with self.reaped_server(BadNewlineHandler) as server: self.assertRaises(imaplib.IMAP4.abort, self.imap_class, *server.server_address)
def __init__(self, account, auth_code, name='', **config): account_name, server_name = account.split('@') self.smtp = 'smtp.' + server_name self.imap = 'imap.' + server_name self.server_name = server_name self.smtp_port = 0 self.imap_port = 0 self.use_ssl = True self.__dict__.update(SERVER_LIB.get(server_name, {})) self.__dict__.update(config) self.name = '%s <%s>' % (name or account_name, account) self.account = account self.auth_code = auth_code st_SMTP = smtplib.SMTP_SSL if self.use_ssl else smtplib.SMTP st_IMAP = imaplib.IMAP4_SSL if self.use_ssl else imaplib.IMAP4 if self.smtp_port: self.st_SMTP = lambda : st_SMTP(self.smtp, self.smtp_port) else: self.st_SMTP = lambda : st_SMTP(self.smtp) if self.imap_port: self.st_IMAP = lambda : st_IMAP(self.imap, self.imap_port) else: self.st_IMAP = lambda : st_IMAP(self.imap) self.SMTP = lambda : SMTP(self) self.IMAP = lambda : IMAP(self)
def test_issue5949(self): class EOFHandler(SocketServer.StreamRequestHandler): def handle(self): # EOF without sending a complete welcome message. self.wfile.write('* OK') with self.reaped_server(EOFHandler) as server: self.assertRaises(imaplib.IMAP4.abort, self.imap_class, *server.server_address)
def test_linetoolong(self): class TooLongHandler(SimpleIMAPHandler): def handle(self): # Send a very long response line self.wfile.write('* OK ' + imaplib._MAXLINE*'x' + '\r\n') with self.reaped_server(TooLongHandler) as server: self.assertRaises(imaplib.IMAP4.error, self.imap_class, *server.server_address)
def __init__(self, *args, **kwargs): # {{{2 if settings.IMAP_SSL: self.mailbox = IMAP4_SSL( host = settings.IMAP_SERVER, port = settings.IMAP_PORT) else: self.mailbox = IMAP4( settings.IMAP_SERVER ) self.mailbox.login( settings.IMAP_LOGIN, settings.IMAP_PASSWD ) self.mailbox.select() super( Command, self ).__init__( *args, **kwargs )
def test_line_termination(self): class BadNewlineHandler(SimpleIMAPHandler): def cmd_CAPABILITY(self, tag, args): self._send(b'* CAPABILITY IMAP4rev1 AUTH\n') self._send_tagged(tag, 'OK', 'CAPABILITY completed') with self.reaped_server(BadNewlineHandler) as server: self.assertRaises(imaplib.IMAP4.abort, self.imap_class, *server.server_address)
def test_bad_auth_name(self): class MyServer(SimpleIMAPHandler): def cmd_AUTHENTICATE(self, tag, args): self._send_tagged(tag, 'NO', 'unrecognized authentication ' 'type {}'.format(args[0])) with self.reaped_pair(MyServer) as (server, client): with self.assertRaises(imaplib.IMAP4.error): client.authenticate('METHOD', lambda: 1)
def test_invalid_authentication(self): class MyServer(SimpleIMAPHandler): def cmd_AUTHENTICATE(self, tag, args): self._send_textline('+') self.response = yield self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] invalid') with self.reaped_pair(MyServer) as (server, client): with self.assertRaises(imaplib.IMAP4.error): code, data = client.authenticate('MYAUTH', lambda x: b'fake')
def test_linetoolong(self): class TooLongHandler(SimpleIMAPHandler): def handle(self): # Send a very long response line self.wfile.write(b'* OK ' + imaplib._MAXLINE*b'x' + b'\r\n') with self.reaped_server(TooLongHandler) as server: self.assertRaises(imaplib.IMAP4.error, self.imap_class, *server.server_address)
def get_inbox(self): mailbox = imaplib.IMAP4(self.host) mailbox.login(bytes(self.username, 'utf8'), bytes(self.password, 'utf8')) mailbox.select() x, data = mailbox.search(None, 'ALL') messages = [] for num in data[0].split(): x, message = mailbox.fetch(num, '(RFC822)') messages.append(message[0][1]) return messages
def resetConnection(self): parsed_url = self.url try: imap_server = os.environ['IMAP_SERVER'] except KeyError: imap_server = parsed_url.hostname # Try to close the connection cleanly try: self.conn.close() except Exception: pass if (parsed_url.scheme == "imap"): cl = imaplib.IMAP4 self.conn = cl(imap_server, 143) elif (parsed_url.scheme == "imaps"): cl = imaplib.IMAP4_SSL self.conn = cl(imap_server, 993) log.Debug("Type of imap class: %s" % (cl.__name__)) self.remote_dir = re.sub(r'^/', r'', parsed_url.path, 1) # Login if (not(globals.imap_full_address)): self.conn.login(self.username, self.password) self.conn.select(globals.imap_mailbox) log.Info("IMAP connected") else: self.conn.login(self.username + "@" + parsed_url.hostname, self.password) self.conn.select(globals.imap_mailbox) log.Info("IMAP connected")
def _put(self, source_path, remote_filename): f = source_path.open("rb") allowedTimeout = globals.timeout if (allowedTimeout == 0): # Allow a total timeout of 1 day allowedTimeout = 2880 while allowedTimeout > 0: try: self.conn.select(remote_filename) body = self.prepareBody(f, remote_filename) # If we don't select the IMAP folder before # append, the message goes into the INBOX. self.conn.select(globals.imap_mailbox) self.conn.append(globals.imap_mailbox, None, None, body) break except (imaplib.IMAP4.abort, socket.error, socket.sslerror): allowedTimeout -= 1 log.Info("Error saving '%s', retrying in 30s " % remote_filename) time.sleep(30) while allowedTimeout > 0: try: self.resetConnection() break except (imaplib.IMAP4.abort, socket.error, socket.sslerror): allowedTimeout -= 1 log.Info("Error reconnecting, retrying in 30s ") time.sleep(30) log.Info("IMAP mail with '%s' subject stored" % remote_filename)
def __init__(self, timeout, host='', port=993, keyfile=None, certfile=None): self.keyfile = keyfile self.certfile = certfile self.timeout = timeout self.sslobj = None imaplib.IMAP4.__init__(self, host, port) # pylint: disable=W0222
def open(self, host, port): """Setup connection to remote server on "host:port". (default: localhost:standard IMAP4 SSL port). This connection will be used by the routines: read, readline, send, shutdown. """ self.host = host self.port = port self.sock = socket.create_connection((host, port)) self.sslobj = socket.ssl(self.sock, self.keyfile, self.certfile)
def socket(self): """Return socket instance used to connect to IMAP4 server. socket = <instance>.socket() """ return self.sock