我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 imaplib.IMAP4_SSL。
def read(username, password, sender_of_interest):
    """Print unread Gmail INBOX messages from one sender and flag them seen.

    Prints the number of unread messages in INBOX, then the fetched
    ``BODY[TEXT]`` payloads of the unread messages whose FROM matches
    ``sender_of_interest``, and finally stores the ``\\Seen`` flag on them.

    Args:
        username: Account name passed to ``IMAP4_SSL.login``.
        password: Account password passed to ``IMAP4_SSL.login``.
        sender_of_interest: Value interpolated into the ``FROM "..."`` search key.
    """
    # Login to INBOX
    imap = imaplib.IMAP4_SSL("imap.gmail.com", 993)
    try:
        imap.login(username, password)
        imap.select('INBOX')

        status, response = imap.search(None, '(UNSEEN)')
        unread_msg_nums = response[0].split()

        # Print the count of all unread messages
        print(len(unread_msg_nums))

        # Print all unread messages from a certain sender of interest
        status, response = imap.search(None, '(UNSEEN)', '(FROM "%s")' % (sender_of_interest))
        unread_msg_nums = response[0].split()
        bodies = []
        for e_id in unread_msg_nums:
            _, response = imap.fetch(e_id, '(UID BODY[TEXT])')
            bodies.append(response[0][1])
        print(bodies)

        # Mark them as seen
        for e_id in unread_msg_nums:
            # Escaped backslash: same flag text as before, without relying
            # on the unrecognised "\S" escape sequence.
            imap.store(e_id, '+FLAGS', '\\Seen')
    finally:
        # Always end the session so the socket is not left open.
        imap.logout()
def init_server(self):
    """Open the SMTP session and, when an IMAP server is configured, the IMAP one.

    Raises:
        PySMSException: if smtplib raises ``SMTPException`` during setup or
            login, if the IMAP login or mailbox selection does not answer
            "OK", or if imaplib raises ``IMAP4.error``.
    """
    self.logger.info("Initializing SMTP/IMAP servers.")

    # PySMS at minimum uses smtp server
    try:
        if not self.ssl:
            self.smtp = smtplib.SMTP(self.smtp_server, self.smtp_port)
            # NOTE(review): STARTTLS is issued on the plain-SMTP path only;
            # the flattened source does not show the nesting - confirm.
            self.smtp.starttls()
        else:
            self.smtp = smtplib.SMTP_SSL(self.smtp_server, self.smtp_port)
        self.smtp.login(self.address, self.password)
    except smtplib.SMTPException:
        raise PySMSException("Unable to start smtp server, please check credentials.")

    # If responding functionality is enabled
    if not self.imap_server:
        return

    try:
        imap_factory = imaplib.IMAP4_SSL if self.ssl else imaplib.IMAP4
        self.imap = imap_factory(self.imap_server)

        status, _ = self.imap.login(self.address, self.password)
        if status != "OK":
            raise PySMSException("Unable to login to IMAP server with given credentials.")

        status, _ = self.imap.select(self.imap_mailbox)
        if status != "OK":
            raise PySMSException("Unable to select mailbox: {0}".format(self.imap_mailbox))
    except imaplib.IMAP4.error:
        raise PySMSException("Unable to start IMAP server, please check address and SSL/TLS settings.")
def test_imap_old(user):
    """Authenticate to Gmail over IMAP with XOAUTH2 and run an X-GM-RAW search.

    Loads credentials from the 'credentials_file' storage, authenticates as
    ``user``, selects "[Gmail]/All Mail", searches for 'vget' and prints the
    status and the message numbers returned by the search.
    """
    credentials = Storage('credentials_file').get()
    xoauth = xoauth2_str(user, credentials.access_token)

    conn = imaplib.IMAP4_SSL('imap.googlemail.com')
    conn.debug = 4
    conn.authenticate('XOAUTH2', lambda x: xoauth)

    status, labels = conn.list()
    conn.select("[Gmail]/All Mail")

    # Once authenticated, the regular imaplib.IMAP4_SSL API is used unchanged.
    typ, msgnums = conn.search(None, 'X-GM-RAW', 'vget')
    # Single-argument print calls produce the same output on Python 2 and 3.
    print('typ %s' % (typ,))
    print('num %s' % (msgnums,))
def open_connection(verbose=False):
    """Connect and log in to the IMAP server described in ``~/.pymotw``.

    The host comes from ``[server] hostname`` and the credentials from
    ``[account] username`` / ``password``.

    Args:
        verbose: When true, print the host and user name being used.

    Returns:
        The logged-in ``imaplib.IMAP4_SSL`` connection.
    """
    # Read the config file
    settings = configparser.ConfigParser()
    settings.read([os.path.expanduser('~/.pymotw')])

    # Connect to the server
    server_name = settings.get('server', 'hostname')
    if verbose:
        print('Connecting to', server_name)
    imap_conn = imaplib.IMAP4_SSL(server_name)

    # Login to our account
    user = settings.get('account', 'username')
    secret = settings.get('account', 'password')
    if verbose:
        print('Logging in as', user)
    imap_conn.login(user, secret)

    return imap_conn
def connect(self):
    """Connects to and authenticates with an IMAP4 mail server.

    An SSL connection is used when both ``self.keyfile`` and
    ``self.certfile`` are set or when ``self.ssl`` is true; otherwise a plain
    connection is opened. After login the default mailbox is selected.

    Returns:
        The logged-in imaplib connection.

    Raises:
        Any exception raised by imaplib or the socket layer; nothing is
        translated or swallowed.
    """
    import imaplib

    if (self.keyfile and self.certfile) or self.ssl:
        M = imaplib.IMAP4_SSL(self.host, self.port, self.keyfile, self.certfile)
    else:
        M = imaplib.IMAP4(self.host, self.port)

    # The previous ``except socket.error, err: raise`` only re-raised (and is
    # a syntax error on Python 3), so errors simply propagate from here.
    try:
        M.login(self.username, self.password)
        M.select()
    except Exception:
        # Do not leak the open socket when authentication/selection fails.
        M.shutdown()
        raise
    return M
def __init__(self, host='', port=None, ssl=True, keyfile=None, certfile=None, ssl_context=None):
    """Create the underlying imaplib client and reset the session state.

    :param host: host name of the IMAP server
    :param port: port number; when falsy the imaplib default for the chosen
        transport is used (IMAP4_SSL_PORT or IMAP4_PORT)
    :param ssl: build an ``imaplib.IMAP4_SSL`` client if True, else ``imaplib.IMAP4``
    :param keyfile: PEM formatted private key file, forwarded to IMAP4_SSL
    :param certfile: PEM formatted certificate chain file, forwarded to IMAP4_SSL
    :param ssl_context: SSLContext object, forwarded to IMAP4_SSL

    Note: the original documentation states that keyfile/certfile must not
    be combined with ssl_context, otherwise ValueError is raised.
    """
    # Remember the connection parameters exactly as given.
    self._host, self._port = host, port
    self._keyfile, self._certfile = keyfile, certfile
    self._ssl_context = ssl_context

    if not ssl:
        self.box = imaplib.IMAP4(host, port or imaplib.IMAP4_PORT)
    else:
        self.box = imaplib.IMAP4_SSL(
            host,
            port or imaplib.IMAP4_SSL_PORT,
            keyfile,
            certfile,
            ssl_context)

    # No credentials or folder have been supplied yet.
    self._username = None
    self._password = None
    self._initial_folder = None
    self.folder = None
def read(hostname, username, password):
    """Return the number of unread messages in the account's INBOX.

    Args:
        hostname: Mail domain; the server contacted is ``'imap.' + hostname.lower()``.
        username: Login name.
        password: Login password.

    Returns:
        The count of UNSEEN messages as an int, or the string ``'Failed'``
        when the search does not answer 'OK' or imaplib raises ``IMAP4.error``.
    """
    # Get the imap server for the host
    hostname = 'imap.' + hostname.lower()
    # Start SSL connection with the host server
    server = IMAP4_SSL(hostname)
    try:
        # Login to the given account
        server.login(username, password)
        # Select the Inbox and make it read only
        server.select('INBOX', readonly=True)
        # Check how many unread messages are in the inbox
        status, data = server.search(None, 'UnSeen')
        if status != 'OK':
            return 'Failed'
        # data[0] holds the space-separated message numbers. The previous
        # code returned the last *character* of that string and raised
        # IndexError when it was empty; count the numbers instead.
        if not data or not data[0]:
            return 0
        return len(data[0].split())
    except IMAP4.error:
        return 'Failed'
    finally:
        # Best-effort cleanup: a failing LOGOUT must not replace the result.
        try:
            server.logout()
        except Exception:
            pass
def email_verify(plusmail, googlepass):
    """Log in to Gmail over IMAP and hand the INBOX to ``proc_mail``.

    Waits five seconds, logs in as ``plusmail``, and when INBOX selection
    answers 'OK' calls ``proc_mail`` with the connection. An ``IMAP4.error``
    anywhere in the sequence is reported as a failed login.
    """
    time.sleep(5)  # Waiting 5 seconds before checking email
    mailbox = imaplib.IMAP4_SSL('imap.gmail.com')
    try:
        mailbox.login(plusmail, googlepass)
        print("Logged in to: " + plusmail)

        mailbox.list()
        status, _ = mailbox.select("INBOX")
        if status == 'OK':
            print("Processing mailbox...")
            proc_mail(mailbox)
            # NOTE(review): close() is assumed to belong to the 'OK' branch
            # and logout() to follow it unconditionally; the flattened
            # source does not show the nesting - confirm.
            mailbox.close()
        mailbox.logout()
    except imaplib.IMAP4.error:
        print("Unable to login to: " + plusmail + ". Was not verified\n")
def listen():
    """Poll the INBOX forever and forward decoded text/plain payloads.

    Logs in with the module-level credentials (exiting with -1 on any
    failure), then every two seconds searches for unseen messages whose
    subject matches 'det:toolkit'. For each text/plain part the first CRLF
    separated line is base64-decoded and passed to
    ``app_exfiltrate.retrieve_data``; decoding/handling errors are printed.
    """
    app_exfiltrate.log_message('info', "[gmail] Listening for mails...")
    client_imap = imaplib.IMAP4_SSL(server)
    try:
        client_imap.login(gmail_user, gmail_pwd)
    except:
        # NOTE(review): this message writes the password to the log in clear text.
        app_exfiltrate.log_message(
            'warning', "[gmail] Did not manage to authenticate with creds: {}:{}".format(gmail_user, gmail_pwd))
        sys.exit(-1)

    while True:
        client_imap.select("INBOX")
        typ, id_list = client_imap.uid(
            'search', None, "(UNSEEN SUBJECT 'det:toolkit')")
        for msg_id in id_list[0].split():
            msg_data = client_imap.uid('fetch', msg_id, '(RFC822)')
            # Raw message bytes -> text -> parsed message object.
            raw_email_string = msg_data[1][0][1].decode('utf-8')
            email_message = email.message_from_string(raw_email_string)

            # Walk every part of the (possibly multipart) message.
            for part in email_message.walk():
                if part.get_content_type() != "text/plain":
                    # ignore attachments/html
                    continue
                first_line = part.get_payload(decode=True).split('\r\n')[0]
                try:
                    app_exfiltrate.retrieve_data(base64.b64decode(first_line))
                except Exception as e:
                    print(e)
        time.sleep(2)
def imap(usr, pw, host):
    """Try an IMAP-over-SSL login and report the outcome.

    Args:
        usr: User name; lower-cased before use.
        pw: Password.
        host: Sequence whose first item is the host name and whose optional
            second item is the port (993 is used when it is absent).

    Returns:
        Characters 2-3 of the string form of the login result (``"OK"`` for
        a result tuple starting with 'OK'), ``False`` when imaplib raises
        ``IMAP4.error``, or ``"Error"`` for any other exception.
    """
    socket.setdefaulttimeout(time_out)
    usr = usr.lower()
    try:
        port = int(host[1]) if len(host) >= 2 else 993
        session = imaplib.IMAP4_SSL(str(host[0]), port)
        reply = str(session.login(usr, pw))
        return reply[2: 4]
    except imaplib.IMAP4.error:
        return False
    except BaseException:
        return "Error"
#/-----------------IMAP-------------------------#
#------GETUNKNOWN--HOST--------------------------#
def getunknown_imap(subb):
    """Probe common sub-domains of ``subb`` for an IMAP-over-SSL server.

    Each candidate ``<prefix>.<subb>`` is contacted and a throw-away login is
    attempted. The first candidate for which imaplib raises ``IMAP4.error``
    is returned. Any other exception ends the probing and returns ``None``;
    ``None`` is also the result when no candidate raises ``IMAP4.error``.
    """
    socket.setdefaulttimeout(time_out)
    try:
        # TODO: Change to dynamic matchers
        prefixes = [
            'imap', 'mail', 'pop', 'pop3', 'imap-mail',
            'inbound', 'mx', 'imaps', 'smtp', 'm']
        for prefix in prefixes:
            candidate = prefix + '.' + subb
            try:
                probe = imaplib.IMAP4_SSL(str(candidate))
                probe.login('test', 'test')
            except imaplib.IMAP4.error:
                return candidate
    # NOTE(review): a connection failure on any candidate lands here and
    # stops the loop, so later prefixes are never tried - confirm intent.
    except BaseException:
        return None
def reconnect(self):
    """Re-open and log in the Gmail IMAP connection, retrying on failure.

    Attempts are made while the attempt counter is not above
    ``constants.MAX_RECONNECT_RETRIES``, sleeping
    ``constants.SMTP_RECONNECT_SLEEP_TIME`` seconds after each failure.
    Returns as soon as a login succeeds; once the attempts are used up a
    critical log entry is written and an error alert is sent.
    """
    attempts = 0
    while attempts <= constants.MAX_RECONNECT_RETRIES:
        attempts += 1
        try:
            self.mail = imaplib.IMAP4_SSL(pub_config['gmail']['imap'])
            self.mail.login(private_config['gmail']['username'],
                            private_config['gmail']['password'])
        except imaplib.IMAP4.error as err:
            logging.error("Login to retrieve emails failed!")
            logging.error(err)
        except Exception as err:
            logging.error("Error from IMAP! Trying to reconnect...")
            logging.error(err)
        else:
            # Connected and authenticated: nothing more to do.
            return
        time.sleep(constants.SMTP_RECONNECT_SLEEP_TIME)

    if attempts > constants.MAX_RECONNECT_RETRIES:
        # Imported lazily, exactly where the alert is needed.
        from pnu.outbound.alerter import smtp
        logging.critical("Could not connect to IMAP handler")
        smtp.send_error("Could not connect to IMAP handler")
def connect(self, raise_errors=True):
    """Open an SSL IMAP connection to the configured Gmail host.

    The connection is stored on ``self.imap`` and returned.

    Args:
        raise_errors: Accepted for compatibility; the current body does not
            consult it, so connection errors always propagate.
    """
    host, port = self.GMAIL_IMAP_HOST, self.GMAIL_IMAP_PORT
    self.imap = imaplib.IMAP4_SSL(host, port)
    return self.imap
def readMail(self, subject):
    """Load positions from the first message whose subject matches.

    Searches the default mailbox (opened read-only) for ``subject``, fetches
    the first hit, prints its Date/From/Subject headers and stores the
    result of ``getPositionsFromMessage`` on ``self.positions``.

    Raises:
        IndexError: if no message matches ``subject`` (same exception type
            as before, now with an explanatory message).
    """
    connection = imaplib.IMAP4_SSL('imap.mail.yahoo.com')
    try:
        # SECURITY(review): account credentials are hard-coded in source;
        # they should be rotated and moved to configuration.
        connection.login('pnikoulis', 'ai024709th3669')
        connection.select(readonly=True)
        typ, data = connection.search(None, 'SUBJECT', subject)
        ids = data[0].split()
        if not ids:
            raise IndexError('No message found with subject %r' % (subject,))
        msg_id = ids[0]  # renamed from ``id`` to stop shadowing the builtin
        typ, data = connection.fetch(msg_id, '(RFC822)')
    finally:
        # End the session on every path so the socket is not leaked.
        connection.logout()

    message = email.message_from_string(data[0][1])
    # Single-argument print calls behave the same on Python 2 and 3.
    print('------------------------------------------> Message %s' % (msg_id,))
    print('Date: %s' % (message['Date'],))
    print('From: %s' % (message['From'],))
    print('Subject: %s' % (message['Subject'],))
    self.positions = getPositionsFromMessage(message)
#------------------------------------------------------------------------
# Check if a position is in portfolio (by checking symbol and exit date)
#------------------------------------------------------------------------
def connect(self):
    """Open a mail connection using the linked source server, if any.

    Without a linked ``source_id`` record the parent implementation is used.
    Otherwise an IMAP or POP3 client (SSL or plain, per ``source_id.is_ssl``)
    is created against ``source_id.server``/``source_id.port``, authenticated
    with ``self.user``/``self.password`` and given a socket timeout of
    ``MAIL_TIMEOUT``.
    """
    if not (self.source_id and self.source_id.id):
        return super(vieterp_fetchmail_server, self).connect()

    self.ensure_one()
    source = self.source_id
    if source.type == 'imap':
        imap_cls = IMAP4_SSL if source.is_ssl else IMAP4
        connection = imap_cls(source.server, int(source.port))
        connection.login(self.user, self.password)
    # NOTE(review): this branch tests ``self.type`` while the IMAP branch
    # tests ``self.source_id.type`` - confirm that is intended. When neither
    # branch matches, ``connection`` is unbound below.
    elif self.type == 'pop':
        pop_cls = POP3_SSL if source.is_ssl else POP3
        connection = pop_cls(source.server, int(source.port))
        # TODO: use this to remove only unread messages
        # connection.user("recent:"+server.user)
        connection.user(self.user)
        connection.pass_(self.password)

    # Add timeout on socket
    connection.sock.settimeout(MAIL_TIMEOUT)
    return connection
def read(self):
    """Print every message of the default mailbox of account ``self.ID``.

    The server name is 'imap.' plus the part of ``self.ID`` after '@'. Any
    exception raised along the way is reported on stdout instead of being
    propagated.
    """
    try:
        # connect to server
        domain = self.ID[self.ID.index('@') + 1:]
        server = imaplib.IMAP4_SSL('imap.' + domain)

        # login
        server.login(self.ID, self.PASSWORD)
        server.list()
        server.select()

        typ, found = server.search(None, 'ALL')
        for num in found[0].split():
            typ, fetched = server.fetch(num, '(RFC822)')
            print('Message %s\n%s\n' % (num, fetched[0][1]))
        server.logout()
    except Exception as e:
        print("Reading is failed : {}".format(e.__str__()))
def login(self, username, password):
    """Open SSL IMAP and SMTP sessions and log in to both.

    The connections are kept on ``self.M`` (IMAP) and ``self.S`` (SMTP); the
    login payloads are kept on ``self.response`` and ``self.response_s``.

    Returns:
        A ``(imap_status, smtp_status)`` tuple taken from the two login calls.
    """
    self.M = imaplib.IMAP4_SSL(self.IMAP_SERVER, self.IMAP_PORT)
    self.S = smtplib.SMTP_SSL(self.SMTP_SERVER, self.SMTP_PORT)

    imap_status, self.response = self.M.login(username, password)
    smtp_status, self.response_s = self.S.login(username, password)

    self.username = username
    return imap_status, smtp_status
def test_imap(user):
    """Log in to Gmail with OAuth2 through IMAPClient and run a raw search.

    Finds the folder flagged ``\\All``, selects it, searches for
    'X-GM-RAW uniquetokenXXX' and prints the matching message ids.

    Raises:
        Error: if no folder carries the ``\\All`` flag.
    """
    credentials = get_credentials()
    conn = IMAPClient('imap.googlemail.com', use_uid=True, ssl=True)
    conn.oauth2_login(user, credentials.access_token)

    # Locate the "all mail" folder by its flag rather than by name.
    for flags, _, box in conn.list_folders():
        if '\\All' in flags:
            all_box = box
            break
    else:
        raise Error('all message box not found')

    logging.debug('All message box is {}'.format(all_box))
    conn.select_folder(all_box)

    msgs = conn.search('X-GM-RAW uniquetokenXXX')
    print(msgs)
def connect(self, mailserver, port="993"):
    """Open an SSL IMAP connection and keep it on ``self.srv``.

    Args:
        mailserver: Host name of the IMAP server.
        port: Port to connect to (default "993").

    On failure ``self.srv`` is set to ``None`` instead of raising.
    """
    self.mailserver = mailserver
    self.port = port
    try:
        self.srv = imaplib.IMAP4_SSL(self.mailserver, self.port)
    except Exception:
        # Best effort: connection problems leave srv unset. Catching
        # Exception (not a bare except) lets Ctrl-C / SystemExit through.
        self.srv = None
#-----------------------------------------------------------------------------
# POP3 subclass of Pillager Class
#-----------------------------------------------------------------------------
def __init__(self, host, port, certfile=None, keyfile=None, timeout=None, ca_certs=None):
    """Store the extra TLS options, then run the ``imaplib.IMAP4_SSL`` initialiser.

    Args:
        host: Server host name.
        port: Server port.
        certfile: Certificate file forwarded to ``IMAP4_SSL``.
        keyfile: Private key file forwarded to ``IMAP4_SSL``.
        timeout: Saved on ``self._timeout``.
        ca_certs: Saved on ``self.ca_certs``.
    """
    self._timeout = timeout
    self.ca_certs = ca_certs
    # IMAP4_SSL.__init__ is declared (host, port, keyfile, certfile). This
    # method's own parameters are in the opposite order, so the former
    # positional call handed the certificate in as the key and vice versa.
    # NOTE(review): confirm no override in this class relied on the swap.
    imaplib.IMAP4_SSL.__init__(self, host, port, keyfile=keyfile, certfile=certfile)
def _connect_imap(self):
    """Open an SSL IMAP session to ``self._server`` and log in.

    The connection is kept on ``self._mail_server``.
    """
    print("CONNECTION ...")
    self._mail_server = imaplib.IMAP4_SSL(self._server)
    user, secret = self._user, self._password
    self._mail_server.login(user, secret)
def compute_state(self, key):
    """Return the number of unseen messages in ``key.folder``.

    Args:
        key: Object providing ``username``, ``password``, ``use_ssl``,
            ``server``, ``port`` and ``folder``.

    Returns:
        The UNSEEN count parsed from the STATUS reply, or ``None`` (after a
        warning) when username or password is missing.
    """
    if not key.username or not key.password:
        self.warn('Username and password are not configured')
        return None

    imap_cls = IMAP4_SSL if key.use_ssl else IMAP4
    mail = imap_cls(key.server, key.port)
    try:
        mail.login(key.username, key.password)
        rc, message = mail.status(key.folder, '(UNSEEN)')
    finally:
        # A new connection is opened on every call; close it every time so
        # sockets do not accumulate.
        mail.logout()

    unread_str = message[0].decode('utf-8')
    # Raw string: "\d" is a regex escape, not a string escape.
    return int(re.search(r'UNSEEN (\d+)', unread_str).group(1))
def __init__(self, target):
    """Open an IMAP session to ``target`` and prepare the authentication tag.

    Args:
        target: String of the form ``protocol://host:port``.

    An SSL connection is used when the port is 993 or the protocol is
    'imaps' (case-insensitive); otherwise a plain connection. When the
    server does not advertise AUTH=NTLM an error is logged and ``authtag``
    stays ``None``.
    """
    # Target comes as protocol://target:port
    self.target = target
    proto, host, port = target.split(':')
    host = host[2:]  # strip the leading '//'
    port = int(port)  # one conversion so both branches pass an int
    if port == 993 or proto.upper() == 'IMAPS':
        self.session = imaplib.IMAP4_SSL(host, port)
    else:
        # assume non-ssl IMAP
        self.session = imaplib.IMAP4(host, port)

    self.authtag = None
    self.lastresult = None
    if 'AUTH=NTLM' not in self.session.capabilities:
        logging.error('IMAP server does not support NTLM authentication!')
        # __init__ must return None: the former ``return False`` raised
        # TypeError instead of reporting the problem.
        return
    self.authtag = self.session._new_tag()
def authenticate(self, url, consumer, token):
    """Authenticate with the XOAUTH mechanism.

    Args:
        url: URL handed to ``oauth2.build_xoauth_string``.
        consumer: ``oauth2.Consumer`` instance or ``None``.
        token: ``oauth2.Token`` instance or ``None``.

    Raises:
        ValueError: if ``consumer`` or ``token`` is given but has the wrong type.
    """
    consumer_ok = consumer is None or isinstance(consumer, oauth2.Consumer)
    if not consumer_ok:
        raise ValueError("Invalid consumer.")

    token_ok = token is None or isinstance(token, oauth2.Token)
    if not token_ok:
        raise ValueError("Invalid token.")

    def xoauth_response(challenge):
        # The server challenge is ignored; the reply depends only on the
        # url/consumer/token captured above.
        return oauth2.build_xoauth_string(url, consumer, token)

    imaplib.IMAP4_SSL.authenticate(self, 'XOAUTH', xoauth_response)
def check_email(username):
    """Prompt for the password and print the first message of 'Inbox'.

    Connects to ``GOOGLE_IMAP_SERVER`` on port '993', logs in as
    ``username``, prints the first message returned by an ALL search (if
    any), then closes the mailbox and logs out.
    """
    mailbox = imaplib.IMAP4_SSL(GOOGLE_IMAP_SERVER, '993')
    password = getpass.getpass(prompt="Enter your Google password: ")
    mailbox.login(username, password)
    mailbox.select('Inbox')

    typ, data = mailbox.search(None, 'ALL')
    message_numbers = data[0].split()
    if message_numbers:
        # Only the first message is shown.
        first = message_numbers[0]
        typ, fetched = mailbox.fetch(first, '(RFC822)')
        print('Message %s\n%s\n' % (first, fetched[0][1]))

    mailbox.close()
    mailbox.logout()
def do_init(self):
    """Connect to Gmail over IMAP and log in with the stored credentials.

    The connection is kept on ``self.conn``.

    Returns:
        ``None`` on success; on any failure the explanatory message, which
        is also printed.
    """
    failure_notice = ("Either your credentials are wrong mate, or there is some problem going on, do me a favor, I know "
                      "you won't but whatever, just inform me in the forums.")
    try:
        self.conn = imaplib.IMAP4_SSL('imap.gmail.com')
        self.conn.debug = 0
        self.conn.login(self.gmail_address, self.password)
    except:
        print(failure_notice)
        return failure_notice
    return None
def read_message(assistant, instance_vlc, player_vlc, username, password, sender_of_interest=None):
    """Read unread Gmail INBOX messages aloud and flag them as seen.

    Args:
        assistant: Object whose ``speak`` method announces the status.
        instance_vlc: Passed through to ``read_nicely_text``.
        player_vlc: Passed through to ``read_nicely_text``.
        username: Gmail login name.
        password: Gmail password.
        sender_of_interest: When given, only unread messages whose FROM
            matches this value are read.
    """
    imap = imaplib.IMAP4_SSL("imap.gmail.com", 993)
    try:
        imap.login(username, password)
        imap.select('INBOX')

        if sender_of_interest is None:
            status, response = imap.search(None, '(UNSEEN)')
        else:
            status, response = imap.search(None, '(UNSEEN)', '(FROM "%s")' % (sender_of_interest))
        unread_msg_nums = response[0].split()

        if unread_msg_nums:
            assistant.speak("Very good! Please wait, my assistant will read your messages!")
        else:
            assistant.speak("You do not have new messages!")

        for e_id in unread_msg_nums:
            _, header = imap.fetch(e_id, '(UID BODY[HEADER])')
            header = header[0][-1]
            # Drop carriage returns, then keep the non-empty header lines.
            header = "".join(header.split("\r"))
            header = [h for h in header.split("\n") if h != ""]
            # NOTE(review): assumes the subject is the last header line and
            # the sender the third from last - confirm.
            subject = header[-1]
            sender = header[-3].split("@")[0]

            _, text = imap.fetch(e_id, '(UID BODY[1])')
            text = "Content :" + text[0][1]

            message = sender + ". " + subject + ". " + text
            read_nicely_text(message, instance_vlc, player_vlc)

        # Mark them as seen
        for e_id in unread_msg_nums:
            # Escaped backslash: same flag text, no invalid "\S" escape.
            imap.store(e_id, '+FLAGS', '\\Seen')
    finally:
        # Always end the session so the socket is not left open.
        imap.logout()
def __init__(self, account, auth_code, name='', **config):
    """Derive server settings from ``account`` and build connection factories.

    Defaults ('smtp.'/'imap.' + domain, port 0, SSL on) are overridden first
    by the ``SERVER_LIB`` entry for the domain and then by ``config``.

    Args:
        account: E-mail address of the form ``user@domain``.
        auth_code: Stored on ``self.auth_code``.
        name: Display name; the local part of ``account`` when empty.
        **config: Attribute overrides applied after the ``SERVER_LIB`` entry.
    """
    account_name, server_name = account.split('@')

    # Defaults, possibly replaced by the two update() calls below.
    self.smtp = 'smtp.' + server_name
    self.imap = 'imap.' + server_name
    self.server_name = server_name
    self.smtp_port = 0
    self.imap_port = 0
    self.use_ssl = True
    self.__dict__.update(SERVER_LIB.get(server_name, {}))
    self.__dict__.update(config)

    # Set after the overrides, so config cannot replace them.
    self.name = '%s <%s>' % (name or account_name, account)
    self.account = account
    self.auth_code = auth_code

    if self.use_ssl:
        smtp_cls, imap_cls = smtplib.SMTP_SSL, imaplib.IMAP4_SSL
    else:
        smtp_cls, imap_cls = smtplib.SMTP, imaplib.IMAP4

    # A port of 0 means "let the library pick its default".
    if self.smtp_port:
        def open_smtp():
            return smtp_cls(self.smtp, self.smtp_port)
    else:
        def open_smtp():
            return smtp_cls(self.smtp)
    self.st_SMTP = open_smtp

    if self.imap_port:
        def open_imap():
            return imap_cls(self.imap, self.imap_port)
    else:
        def open_imap():
            return imap_cls(self.imap)
    self.st_IMAP = open_imap

    self.SMTP = lambda: SMTP(self)
    self.IMAP = lambda: IMAP(self)
def __init__(self):
    """Connect to ``server`` over SSL and log in with the module credentials.

    The connection is kept on ``self.c``.
    """
    self.c = connection = imaplib.IMAP4_SSL(server)
    connection.login(gmail_user, gmail_pwd)
def fetch_unread_emails(profile, since=None, mark_read=False, limit=None):
    """
    Fetches a list of unread email objects from a user's Gmail inbox.

    Arguments:
    profile -- mapping that provides 'gmail_address' and 'gmail_password'
    since -- if provided, no emails before this date will be returned
    mark_read -- if True, the mailbox is opened read-write (not read-only)
    limit -- if set and more unread emails than this exist, the count is
             returned instead of the messages

    Returns:
    A list of unread email objects, or the number of unread emails (int)
    when that number exceeds limit.
    """
    conn = imaplib.IMAP4_SSL('imap.gmail.com')
    conn.debug = 0
    conn.login(profile['gmail_address'], profile['gmail_password'])
    conn.select(readonly=(not mark_read))

    try:
        msgs = []
        (retcode, messages) = conn.search(None, '(UNSEEN)')
        if retcode == 'OK' and messages != ['']:
            unread_nums = messages[0].split(' ')
            num_unread = len(unread_nums)
            if limit and num_unread > limit:
                return num_unread

            for num in unread_nums:
                # parse email RFC822 format
                ret, data = conn.fetch(num, '(RFC822)')
                msg = email.message_from_string(data[0][1])

                if not since or get_date(msg) > since:
                    msgs.append(msg)
        return msgs
    finally:
        # Runs on the early "over the limit" return as well, which used to
        # leave the connection open.
        conn.close()
        conn.logout()
def __init__(self, *args, **kwargs):  # {{{2
    """Open and select the configured IMAP mailbox, then init the parent.

    An SSL connection to ``settings.IMAP_SERVER``/``settings.IMAP_PORT`` is
    used when ``settings.IMAP_SSL`` is true; otherwise a plain connection to
    ``settings.IMAP_SERVER``.
    """
    if not settings.IMAP_SSL:
        self.mailbox = IMAP4(settings.IMAP_SERVER)
    else:
        self.mailbox = IMAP4_SSL(host=settings.IMAP_SERVER,
                                 port=settings.IMAP_PORT)

    self.mailbox.login(settings.IMAP_LOGIN, settings.IMAP_PASSWD)
    self.mailbox.select()
    super(Command, self).__init__(*args, **kwargs)
def __init__(self):
    """Open an SSL connection to Gmail's IMAP host and keep it on ``self.mailbox``."""
    gmail_host = 'imap.gmail.com'
    self.mailbox = imaplib.IMAP4_SSL(gmail_host)
def signIn(verbose=False):
    """Log in to Gmail over IMAP with the module-level credentials.

    Args:
        verbose: When true, print a confirmation naming the account.

    Returns:
        The logged-in ``imaplib.IMAP4_SSL`` connection.
    """
    session = imaplib.IMAP4_SSL('imap.gmail.com')
    session.login(username, password)
    if not verbose:
        return session
    print('Signed into ' + username)
    return session
def read(hostname, username, password):
    """Record unread-mail information for one account in ``unread``.

    Connects to 'imap.' + ``hostname``, opens INBOX read-only and searches
    for unseen messages. On an 'OK' reply the last element of each returned
    data item is appended to ``unread``; otherwise 0 is appended.
    """
    server = IMAP4_SSL('imap.' + hostname)
    try:
        server.login(username, password)
        server.select('INBOX', readonly=True)
        status, chunks = server.search(None, 'UnSeen')
        if status != 'OK':
            unread.append(0)
        else:
            for chunk in chunks:
                # NOTE(review): chunk[-1] is the last element of the reply
                # string, not a message count - confirm what consumers expect.
                unread.append(chunk[-1])
    except IMAP4.error:
        # NOTE(review): no ``global``/``nonlocal`` is visible, so this only
        # binds a local name - confirm against the enclosing scope.
        failed = True
def get_mails(mail_address, mail_pass):
    """Return the raw RFC822 content of the newest message in the inbox.

    Args:
        mail_address: Login name for 'mail.bilkent.edu.tr'.
        mail_pass: Login password.

    Returns:
        The raw message as returned by the FETCH, or ``None`` when the
        mailbox holds no messages.
    """
    mail = imaplib.IMAP4_SSL('mail.bilkent.edu.tr')
    try:
        mail.login(mail_address, mail_pass)
        mail.list()
        mail.select("inbox")

        result, data = mail.search(None, "ALL")
        id_list = data[0].split()
        if not id_list:
            # Empty mailbox: previously an IndexError on id_list[-1].
            return None

        latest_email_id = id_list[-1]
        result, data = mail.fetch(latest_email_id, "(RFC822)")
        return data[0][1]
    finally:
        # End the session on every path so the socket is not leaked.
        mail.logout()
def fetch_unread_emails(self, since=None, markRead=False, limit=None):
    """
    Fetches a list of unread email objects from a user's Gmail inbox.

    Credentials are read from self.profile ('gmail_address',
    'gmail_password').

    Arguments:
    since -- if provided, no emails before this date will be returned
    markRead -- if True, the mailbox is opened read-write (not read-only)
    limit -- if set and more unread emails than this exist, the count is
             returned instead of the messages

    Returns:
    A list of unread email objects, or the number of unread emails (int)
    when that number exceeds limit.
    """
    conn = imaplib.IMAP4_SSL('imap.gmail.com')
    conn.debug = 0
    conn.login(self.profile['gmail_address'], self.profile['gmail_password'])
    conn.select(readonly=(not markRead))

    try:
        msgs = []
        (retcode, messages) = conn.search(None, '(UNSEEN)')
        if retcode == 'OK' and messages != ['']:
            unread_nums = messages[0].split(' ')
            numUnread = len(unread_nums)
            if limit and numUnread > limit:
                return numUnread

            for num in unread_nums:
                # parse email RFC822 format
                ret, data = conn.fetch(num, '(RFC822)')
                msg = email.message_from_string(data[0][1])

                if not since or get_date(msg) > since:
                    msgs.append(msg)
        return msgs
    finally:
        # Runs on the early "over the limit" return as well, which used to
        # leave the connection open.
        conn.close()
        conn.logout()
def connect(self):
    """Open an SSL IMAP connection to ``self._address`` and log in.

    Raises:
        Exception: if the login call answers with anything other than "OK".
    """
    self._imap = imaplib.IMAP4_SSL(self._address)
    status, payload = self._imap.login(self._username, self._password)
    if status == "OK":
        return
    raise Exception("Error logging in to IMAP as {}: {} {}".format(self._username, status, payload))
def resetConnection(self):
    """Drop the current IMAP connection and open, log in and select a new one.

    The server is taken from the IMAP_SERVER environment variable when set,
    otherwise from the URL's host name. Scheme "imap" uses port 143, scheme
    "imaps" uses SSL on port 993. The login name is the plain user name
    unless ``globals.imap_full_address`` is set, in which case
    "@<url hostname>" is appended.
    """
    parsed_url = self.url
    if 'IMAP_SERVER' in os.environ:
        imap_server = os.environ['IMAP_SERVER']
    else:
        imap_server = parsed_url.hostname

    # Try to close the connection cleanly
    try:
        self.conn.close()
    except Exception:
        pass

    scheme = parsed_url.scheme
    if scheme == "imap":
        cl = imaplib.IMAP4
        self.conn = cl(imap_server, 143)
    elif scheme == "imaps":
        cl = imaplib.IMAP4_SSL
        self.conn = cl(imap_server, 993)

    log.Debug("Type of imap class: %s" % (cl.__name__))
    self.remote_dir = re.sub(r'^/', r'', parsed_url.path, 1)

    # Login
    if globals.imap_full_address:
        login_name = self.username + "@" + parsed_url.hostname
    else:
        login_name = self.username
    self.conn.login(login_name, self.password)
    self.conn.select(globals.imap_mailbox)
    log.Info("IMAP connected")
def fetchUnreadEmails(profile, since=None, markRead=False, limit=None):
    """
    Fetches a list of unread email objects from a user's Gmail inbox.

    Arguments:
    profile -- mapping that provides 'gmail_address' and 'gmail_password'
    since -- if provided, no emails before this date will be returned
    markRead -- if True, the mailbox is opened read-write (not read-only)
    limit -- if set and more unread emails than this exist, the count is
             returned instead of the messages

    Returns:
    A list of unread email objects, or the number of unread emails (int)
    when that number exceeds limit.
    """
    conn = imaplib.IMAP4_SSL('imap.gmail.com')
    conn.debug = 0
    conn.login(profile['gmail_address'], profile['gmail_password'])
    conn.select(readonly=(not markRead))

    try:
        msgs = []
        (retcode, messages) = conn.search(None, '(UNSEEN)')
        if retcode == 'OK' and messages != ['']:
            unread_nums = messages[0].split(' ')
            numUnread = len(unread_nums)
            if limit and numUnread > limit:
                return numUnread

            for num in unread_nums:
                # parse email RFC822 format
                ret, data = conn.fetch(num, '(RFC822)')
                msg = email.message_from_string(data[0][1])

                if not since or getDate(msg) > since:
                    msgs.append(msg)
        return msgs
    finally:
        # Runs on the early "over the limit" return as well, which used to
        # leave the connection open.
        conn.close()
        conn.logout()
def connect(self,imap_host):
    """Open an SSL IMAP connection to ``imap_host``.

    The connection is stored on ``self.mail`` and returned.
    """
    self.mail = imaplib.IMAP4_SSL(imap_host)
    connection = self.mail
    return connection
def main():
    """Interactive mail browser: show one inbox message per prompt.

    The module-level counter ``i`` selects the i-th newest UID. Input 'n'
    increments it, 'p' decrements it unless it is 1, and 'q' leaves the
    loop. Any other input prints a hint and the current message is shown
    again.
    """
    global i
    clear()
    mail = imaplib.IMAP4_SSL(SMTP_SERVER)
    mail.login(FROM_EMAIL, FROM_PWD)
    mail.list()
    mail.select('inbox')

    choice = 'n'
    while choice != 'q':
        print('Fetching mails. Please wait...')
        if choice == 'n':
            i += 1
        elif choice == 'p' and i != 1:
            i -= 1
        else:
            print('Please enter valid input.')

        # search and return uids instead
        result, data = mail.uid('search', None, "ALL")
        wanted_uid = data[0].split()[-i]
        # fetch mails
        result, data = mail.uid('fetch', wanted_uid, '(RFC822)')
        email_message = email.message_from_string(data[0][1])

        # clear screen and print mail
        clear()
        header_rows = (('To:', 'To'), ('Sent from:', 'From'),
                       ('Date:', 'Date'), ('Subject:', 'Subject'))
        for label, header in header_rows:
            # Single-argument print: same output on Python 2 and 3.
            print('%s %s' % (label, email_message[header]))
        stars = '*' * 30
        print('%s MESSAGE %s' % (stars, stars))

        maintype = email_message.get_content_maintype()
        if maintype == 'multipart':
            # get the body of the mail: payload of the first sub-part
            print(email_message.get_payload()[0].get_payload())
        elif maintype == 'text':
            # NOTE(review): indexing the payload with [0] yields its first
            # element only - confirm this is intended.
            print(email_message.get_payload()[0])
        print('*' * 69)

        welcome()
        choice = raw_input('>> Enter your choice: ').lower()
def readmail():
    """Prompt for credentials and print sender and subject of every inbox mail.

    Messages are listed from the newest to the oldest. Nothing is printed
    for an empty mailbox.
    """
    username = raw_input("Enter username : ")
    password = raw_input("Enter password : ")
    mail = imaplib.IMAP4_SSL(smtp_server)
    try:
        mail.login(username, password)
        mail.select('inbox')

        status, data = mail.search(None, 'ALL')
        id_list = data[0].split()

        # Walk the ids newest-first. The former range(last_id, first_id, -1)
        # stopped before the oldest message and failed on an empty mailbox.
        for msg_num in reversed(id_list):
            typ, parts = mail.fetch(msg_num, '(RFC822)')
            for response in parts:
                if isinstance(response, tuple):
                    msg = email.message_from_string(response[1])
                    email_subject = msg['subject']
                    email_from = msg['from']
                    print('From : ' + email_from + '\n')
                    print('Subject : ' + email_subject + '\n')
    finally:
        # End the session so the socket is not leaked.
        mail.logout()
def connect(self):
    """Open an SSL IMAP connection to ``self.imap_server``.

    The connection is kept on ``self.imap_conn``; nothing is returned.
    """
    server_name = self.imap_server
    self.imap_conn = imaplib.IMAP4_SSL(server_name)