Python imaplib 模块,IMAP4_SSL 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用imaplib.IMAP4_SSL

项目:Personal_AI_Assistant    作者:PratylenClub    | 项目源码 | 文件源码
def read(username, password, sender_of_interest):
    """Print unread Gmail INBOX messages from one sender and mark them seen.

    Prints the number of unread messages in INBOX, then the list of
    ``BODY[TEXT]`` payloads of the unread messages matching
    ``sender_of_interest``, and finally adds the ``\\Seen`` flag to those
    messages.

    :param username: account name passed to ``IMAP4_SSL.login``.
    :param password: password passed to ``IMAP4_SSL.login``.
    :param sender_of_interest: value placed in the IMAP ``FROM`` criterion.
    """
    # Login to INBOX
    imap = imaplib.IMAP4_SSL("imap.gmail.com", 993)
    try:
        imap.login(username, password)
        imap.select('INBOX')

        # Print the count of all unread messages
        _, response = imap.search(None, '(UNSEEN)')
        print(len(response[0].split()))

        # Print all unread messages from a certain sender of interest
        _, response = imap.search(
            None, '(UNSEEN)', '(FROM "%s")' % (sender_of_interest))
        unread_msg_nums = response[0].split()
        da = []
        for e_id in unread_msg_nums:
            _, response = imap.fetch(e_id, '(UID BODY[TEXT])')
            da.append(response[0][1])
        print(da)

        # Mark them as seen
        for e_id in unread_msg_nums:
            imap.store(e_id, '+FLAGS', '\\Seen')
    finally:
        # The session used to be left open; always end it, even on errors.
        imap.logout()
项目:PySMS    作者:clayshieh    | 项目源码 | 文件源码
def init_server(self):
        """Open and authenticate the SMTP session and, if configured, the IMAP one.

        The SMTP connection uses ``SMTP_SSL`` when ``self.ssl`` is true and
        plain ``SMTP`` followed by ``starttls()`` otherwise. When
        ``self.imap_server`` is set, an IMAP connection is opened, logged in
        and ``self.imap_mailbox`` is selected.

        Raises:
            PySMSException: on any ``smtplib.SMTPException``, on
                ``imaplib.IMAP4.error``, or when the IMAP login/select
                status is not ``"OK"``.
        """
        self.logger.info("Initializing SMTP/IMAP servers.")

        # PySMS at minimum uses smtp server
        try:
            if not self.ssl:
                self.smtp = smtplib.SMTP(self.smtp_server, self.smtp_port)
                self.smtp.starttls()
            else:
                self.smtp = smtplib.SMTP_SSL(self.smtp_server, self.smtp_port)
            self.smtp.login(self.address, self.password)
        except smtplib.SMTPException:
            raise PySMSException("Unable to start smtp server, please check credentials.")

        # Nothing more to do unless responding functionality is enabled
        if not self.imap_server:
            return

        try:
            imap_class = imaplib.IMAP4_SSL if self.ssl else imaplib.IMAP4
            self.imap = imap_class(self.imap_server)
            login_status, _ = self.imap.login(self.address, self.password)
            if login_status != "OK":
                raise PySMSException("Unable to login to IMAP server with given credentials.")
            select_status, _ = self.imap.select(self.imap_mailbox)
            if select_status != "OK":
                raise PySMSException("Unable to select mailbox: {0}".format(self.imap_mailbox))
        except imaplib.IMAP4.error:
            raise PySMSException("Unable to start IMAP server, please check address and SSL/TLS settings.")
项目:sndlatr    作者:Schibum    | 项目源码 | 文件源码
def test_imap_old(user):
    storage = Storage('credentials_file')
    credentials = storage.get()
    xoauth = xoauth2_str(user, credentials.access_token)
    conn = imaplib.IMAP4_SSL('imap.googlemail.com')
    conn.debug = 4

    conn.authenticate('XOAUTH2', lambda x: xoauth)

    status, labels = conn.list()

    conn.select("[Gmail]/All Mail")
    # Once authenticated everything from the impalib.IMAP4_SSL class will
    # work as per usual without any modification to your code.
    typ, msgnums = conn.search(None, 'X-GM-RAW', 'vget')

    print 'typ', typ
    print 'num', msgnums
    # conn.select('INBOX')
    # print conn.list()
项目:pymotw3    作者:reingart    | 项目源码 | 文件源码
def open_connection(verbose=False):
    """Return an IMAP4_SSL connection logged in with settings from ``~/.pymotw``.

    The config file supplies ``[server] hostname`` and ``[account]``
    ``username`` / ``password``. When ``verbose`` is true, a progress line
    is printed before connecting and before logging in.
    """
    def progress(*parts):
        if verbose:
            print(*parts)

    # Read the config file
    settings = configparser.ConfigParser()
    settings.read([os.path.expanduser('~/.pymotw')])

    # Connect to the server
    server_name = settings.get('server', 'hostname')
    progress('Connecting to', server_name)
    conn = imaplib.IMAP4_SSL(server_name)

    # Login to our account
    account = settings.get('account', 'username')
    secret = settings.get('account', 'password')
    progress('Logging in as', account)
    conn.login(account, secret)
    return conn
项目:prestashop-sync    作者:dragoon    | 项目源码 | 文件源码
def connect(self):
        """Connect to and authenticate with an IMAP4 mail server.

        Uses ``IMAP4_SSL`` when both ``self.keyfile`` and ``self.certfile``
        are set or ``self.ssl`` is true, plain ``IMAP4`` otherwise, then
        logs in and selects the default mailbox.

        Returns:
            The logged-in connection object.

        Raises:
            Whatever the connection, login or select raises; errors are
            not translated.
        """

        import imaplib

        if (self.keyfile and self.certfile) or self.ssl:
            M = imaplib.IMAP4_SSL(self.host, self.port, self.keyfile, self.certfile)
        else:
            M = imaplib.IMAP4(self.host, self.port)

        try:
            M.login(self.username, self.password)
            M.select()
        except Exception:
            # The caller never receives M on failure, so close the socket
            # here instead of leaking it, then let the error propagate.
            M.shutdown()
            raise
        return M
项目:imap_tools    作者:ikvk    | 项目源码 | 文件源码
def __init__(self, host='', port=None, ssl=True, keyfile=None, certfile=None, ssl_context=None):
        """
        Open the IMAP connection and store it in ``self.box``.

        :param host: host's name (default: localhost)
        :param port: port number; when falsy, ``imaplib.IMAP4_SSL_PORT`` is used
                     for SSL connections and ``imaplib.IMAP4_PORT`` otherwise
        :param ssl: use the SSL client class (IMAP4_SSL) if True, else IMAP4
        :param keyfile: PEM formatted file that contains your private key (default: None)
        :param certfile: PEM formatted certificate chain file (default: None)
        :param ssl_context: SSLContext object that contains your certificate chain and private key (default: None)
        Note: if ssl_context is provided, then parameters keyfile or
              certfile should not be set otherwise ValueError is raised.
        """
        self._host, self._port = host, port
        self._keyfile, self._certfile = keyfile, certfile
        self._ssl_context = ssl_context
        if not ssl:
            self.box = imaplib.IMAP4(host, port or imaplib.IMAP4_PORT)
        else:
            self.box = imaplib.IMAP4_SSL(
                host, port or imaplib.IMAP4_SSL_PORT, keyfile, certfile, ssl_context)
        # Login state is filled in later; nothing is authenticated here.
        self._username = self._password = None
        self._initial_folder = None
        self.folder = None
项目:textMe    作者:RParkerE    | 项目源码 | 文件源码
def read(hostname, username, password):
    """Return the number of unread INBOX messages as a string.

    Connects over SSL to ``imap.<hostname>`` (lower-cased), logs in and
    searches the read-only INBOX for unseen messages.

    :returns: the unread count as a decimal string (``'0'`` when there are
        none), or ``'Failed'`` when the search status is not ``'OK'`` or an
        ``IMAP4.error`` is raised.
    """
    # Get the imap server for the host
    hostname = 'imap.' + hostname.lower()
    # Start SSL connection with the host server
    server = IMAP4_SSL(hostname)
    try:
        # Login to the given account
        server.login(username, password)
        # Select the Inbox and make it read only
        server.select('INBOX', readonly=True)
        # Check how many unread messages are in the inbox
        status, data = server.search(None, 'UnSeen')
        if status != 'OK':
            return 'Failed'
        # data holds one space-separated string of message numbers. The
        # previous code returned only the last *character* of that string
        # (e.g. '0' for "8 9 10") and raised on an empty result.
        ids = data[0].split() if data and data[0] else []
        return str(len(ids))
    except IMAP4.error:
        return 'Failed'
    finally:
        # Best-effort cleanup: a failing logout must not replace the result.
        try:
            server.logout()
        except Exception:
            pass
项目:Pikaptcha    作者:sriyegna    | 项目源码 | 文件源码
def email_verify(plusmail, googlepass):
    """Log in to Gmail as ``plusmail`` and run ``proc_mail`` on its INBOX.

    Sleeps five seconds first. If INBOX can be selected, ``proc_mail`` is
    called with the connection and the mailbox is closed; the session is
    then logged out. An ``imaplib.IMAP4.error`` anywhere in that sequence
    is reported with a printed message.
    """
    # Waiting 5 seconds before checking email
    time.sleep(5)
    email_address = plusmail
    M = imaplib.IMAP4_SSL('imap.gmail.com')
    try:
        M.login(email_address, googlepass)
        print("Logged in to: " + email_address)

        M.list()
        select_status = M.select("INBOX")[0]
        if select_status == 'OK':
            print("Processing mailbox...")
            proc_mail(M)
            M.close()
        M.logout()
    except imaplib.IMAP4.error:
        print("Unable to login to: " + email_address + ". Was not verified\n")
项目:DET    作者:sensepost    | 项目源码 | 文件源码
def listen():
    """Poll the INBOX for unseen ``det:toolkit`` mails and pass their payloads on.

    Logs in to ``server`` with the module-level Gmail credentials and exits
    the process with status -1 if that fails. Then, every two seconds, it
    fetches each unseen message matching the subject search, base64-decodes
    the first line of every ``text/plain`` part and hands the result to
    ``app_exfiltrate.retrieve_data``. This function never returns.
    """
    app_exfiltrate.log_message('info', "[gmail] Listening for mails...")
    client_imap = imaplib.IMAP4_SSL(server)
    try:
        client_imap.login(gmail_user, gmail_pwd)
    except Exception:
        # Catch Exception (not everything) so Ctrl-C is not mistaken for a
        # login failure, and keep the password out of the log.
        app_exfiltrate.log_message(
            'warning', "[gmail] Did not manage to authenticate as: {}".format(gmail_user))
        sys.exit(-1)

    while True:
        client_imap.select("INBOX")
        typ, id_list = client_imap.uid(
            'search', None, "(UNSEEN SUBJECT 'det:toolkit')")
        for msg_id in id_list[0].split():
            msg_data = client_imap.uid('fetch', msg_id, '(RFC822)')
            raw_email_string = msg_data[1][0][1].decode('utf-8')
            email_message = email.message_from_string(raw_email_string)
            # Walk every part of a (possibly multipart) message
            for part in email_message.walk():
                if part.get_content_type() != "text/plain":  # ignore attachments/html
                    continue
                body = part.get_payload(decode=True)
                data = body.split('\r\n')[0]
                try:
                    app_exfiltrate.retrieve_data(base64.b64decode(data))
                except Exception as e:
                    # A bad payload must not stop the polling loop.
                    print(e)
        time.sleep(2)
项目:Atlantr    作者:SUP3RIA    | 项目源码 | 文件源码
def imap(usr, pw, host):
    """Attempt an IMAP-over-SSL login and report the outcome.

    ``host`` is a sequence: ``host[0]`` is the server name and, when the
    sequence has at least two items, ``host[1]`` is the port (993 is used
    otherwise). The user name is lower-cased before logging in.

    Returns characters 2-3 of ``str()`` of the login result, ``False`` on
    ``imaplib.IMAP4.error`` and ``"Error"`` on any other exception.
    """
    socket.setdefaulttimeout(time_out)
    usr = usr.lower()
    try:
        port = int(host[1]) if len(host) >= 2 else 993
        connection = imaplib.IMAP4_SSL(str(host[0]), port)
        login_repr = str(connection.login(usr, pw))
        return login_repr[2:4]
    except imaplib.IMAP4.error:
        return False
    except BaseException:
        return "Error"

#/-----------------IMAP-------------------------#


#------GETUNKNOWN--HOST--------------------------#
项目:Atlantr    作者:SUP3RIA    | 项目源码 | 文件源码
def getunknown_imap(subb):
    """Return the first ``<prefix>.<subb>`` host that rejects a dummy IMAP login.

    Each candidate prefix is tried in order over IMAP4_SSL with the
    credentials ``test`` / ``test``. A host is returned as soon as its
    attempt raises ``imaplib.IMAP4.error``. Any other exception ends the
    search, and ``None`` is returned then or when no candidate matched.
    """
    socket.setdefaulttimeout(time_out)
    # TODO: Change to dynamic matchers
    prefixes = (
        'imap',
        'mail',
        'pop',
        'pop3',
        'imap-mail',
        'inbound',
        'mx',
        'imaps',
        'smtp',
        'm',
    )
    try:
        for prefix in prefixes:
            candidate = prefix + '.' + subb
            try:
                connection = imaplib.IMAP4_SSL(str(candidate))
                connection.login('test', 'test')
            except imaplib.IMAP4.error:
                return candidate
    except BaseException:
        return None
项目:pnu    作者:erasaur    | 项目源码 | 文件源码
def reconnect(self):
        """Re-open and log in the Gmail IMAP session, retrying on failure.

        Attempts continue while the attempt counter is at most
        ``constants.MAX_RECONNECT_RETRIES``, sleeping
        ``constants.SMTP_RECONNECT_SLEEP_TIME`` after each failure. Returns
        as soon as a login succeeds; once the attempts are used up, a
        critical log entry is written and an error mail is sent.
        """
        attempts = 0

        while attempts <= constants.MAX_RECONNECT_RETRIES:
            attempts += 1

            try:
                self.mail = imaplib.IMAP4_SSL(pub_config['gmail']['imap'])
                self.mail.login(private_config['gmail']['username'],
                                private_config['gmail']['password'])
            except imaplib.IMAP4.error as e:
                logging.error("Login to retrieve emails failed!")
                logging.error(e)
            except Exception as e:
                logging.error("Error from IMAP! Trying to reconnect...")
                logging.error(e)
            else:
                # Connected and authenticated: nothing more to do.
                return

            time.sleep(constants.SMTP_RECONNECT_SLEEP_TIME)

        if attempts > constants.MAX_RECONNECT_RETRIES:
            from pnu.outbound.alerter import smtp
            logging.critical("Could not connect to IMAP handler")
            smtp.send_error("Could not connect to IMAP handler")
项目:qxf2-page-object-model    作者:qxf2    | 项目源码 | 文件源码
def connect(self, raise_errors=True):
        """Open an IMAP4_SSL connection to the configured Gmail host and return it.

        The connection is also stored in ``self.imap``. ``raise_errors`` is
        accepted but not consulted by the current code: any connection
        error simply propagates.
        """
        connection = imaplib.IMAP4_SSL(self.GMAIL_IMAP_HOST, self.GMAIL_IMAP_PORT)
        self.imap = connection
        return connection
项目:Positions    作者:nikoulis    | 项目源码 | 文件源码
def readMail(self, subject):
        """Load positions from the first mailbox message matching ``subject``.

        Searches the default mailbox (opened read-only) for ``subject``,
        fetches the first hit, prints its Date/From/Subject headers and
        stores ``getPositionsFromMessage(message)`` in ``self.positions``.

        Raises:
            IndexError: if no message matches ``subject``.
        """
        connection = imaplib.IMAP4_SSL('imap.mail.yahoo.com')
        try:
            # NOTE(review): credentials are hard-coded in source; they
            # should be moved to configuration and rotated.
            connection.login('pnikoulis', 'ai024709th3669')
            connection.select(readonly=True)
            typ, data = connection.search(None, 'SUBJECT', subject)
            ids = data[0].split()
            if not ids:
                # Same exception type as before, but with a usable message.
                raise IndexError('no message found with subject %r' % (subject,))
            msg_id = ids[0]
            typ, data = connection.fetch(msg_id, '(RFC822)')
        finally:
            # The session used to be left open; always end it.
            connection.logout()
        message = email.message_from_string(data[0][1])
        print('------------------------------------------> Message %s' % msg_id)
        print('Date: %s' % message['Date'])
        print('From: %s' % message['From'])
        print('Subject: %s' % message['Subject'])
        self.positions = getPositionsFromMessage(message)

    #------------------------------------------------------------------------
    # Check if a position is in portfolio (by checking symbol and exit date)
    #------------------------------------------------------------------------
项目:Positions    作者:nikoulis    | 项目源码 | 文件源码
def readMail(self, subject):
        """Load positions from the first mailbox message matching ``subject``.

        Searches the default mailbox (opened read-only) for ``subject``,
        fetches the first hit, prints its Date/From/Subject headers and
        stores ``getPositionsFromMessage(message)`` in ``self.positions``.

        Raises:
            IndexError: if no message matches ``subject``.
        """
        connection = imaplib.IMAP4_SSL('imap.mail.yahoo.com')
        try:
            # NOTE(review): credentials are hard-coded in source; they
            # should be moved to configuration and rotated.
            connection.login('pnikoulis', 'ai024709th3669')
            connection.select(readonly=True)
            typ, data = connection.search(None, 'SUBJECT', subject)
            ids = data[0].split()
            if not ids:
                # Same exception type as before, but with a usable message.
                raise IndexError('no message found with subject %r' % (subject,))
            msg_id = ids[0]
            typ, data = connection.fetch(msg_id, '(RFC822)')
        finally:
            # The session used to be left open; always end it.
            connection.logout()
        message = email.message_from_string(data[0][1])
        print('------------------------------------------> Message %s' % msg_id)
        print('Date: %s' % message['Date'])
        print('From: %s' % message['From'])
        print('Subject: %s' % message['Subject'])
        self.positions = getPositionsFromMessage(message)

    #------------------------------------------------------------------------
    # Check if a position is in portfolio (by checking symbol and exit date)
    #------------------------------------------------------------------------
项目:vieterp-mailbox    作者:vieterp    | 项目源码 | 文件源码
def connect(self):
        """Return a logged-in IMAP or POP connection for this record's source.

        When ``self.source_id`` is unset (or has no id) the parent class's
        ``connect()`` is used. Otherwise a connection to the source's
        server and port is opened (SSL variant when ``is_ssl`` is set),
        authenticated with ``self.user`` / ``self.password``, and given a
        socket timeout of ``MAIL_TIMEOUT``.
        """
        source = self.source_id
        if not (source and source.id):
            return super(vieterp_fetchmail_server, self).connect()

        self.ensure_one()
        if source.type == 'imap':
            imap_class = IMAP4_SSL if source.is_ssl else IMAP4
            connection = imap_class(source.server, int(source.port))
            connection.login(self.user, self.password)
        # NOTE(review): this branch tests self.type while the one above
        # tests self.source_id.type -- confirm which is intended.
        elif self.type == 'pop':
            pop_class = POP3_SSL if source.is_ssl else POP3
            connection = pop_class(source.server, int(source.port))
            # TODO: use this to remove only unread messages
            # connection.user("recent:"+server.user)
            connection.user(self.user)
            connection.pass_(self.password)
        # NOTE(review): if neither branch ran, ``connection`` is unbound here.
        # Add timeout on socket
        connection.sock.settimeout(MAIL_TIMEOUT)
        return connection
项目:Python    作者:Gitcha-Beginners    | 项目源码 | 文件源码
def read(self):
        """Print every message in the default mailbox of the account ``self.ID``.

        The server name is ``imap.`` followed by the part of ``self.ID``
        after the ``@``. Any exception, including a missing ``@``, is
        caught and reported with a printed message; nothing is returned.
        """
        try:
            # connect to server
            domain = self.ID[self.ID.index('@') + 1:]
            server = imaplib.IMAP4_SSL('imap.' + domain)

            # login
            server.login(self.ID, self.PASSWORD)
            server.list()
            server.select()

            _, found = server.search(None, 'ALL')
            for num in found[0].split():
                _, fetched = server.fetch(num, '(RFC822)')
                print('Message %s\n%s\n' % (num, fetched[0][1]))

            server.logout()
        except Exception as e:
            print("Reading is failed : {}".format(str(e)))
项目:code    作者:ActiveState    | 项目源码 | 文件源码
def login(self, username, password):
        """Open the IMAP and SMTP SSL sessions and log in to both.

        The connections are stored in ``self.M`` (IMAP) and ``self.S``
        (SMTP), the login responses in ``self.response`` and
        ``self.response_s``, and the user name in ``self.username``.

        Returns:
            A ``(rc, sc)`` tuple: the first item of the IMAP login result
            and the first item of the SMTP login result.
        """
        imap_conn = imaplib.IMAP4_SSL(self.IMAP_SERVER, self.IMAP_PORT)
        self.M = imap_conn
        smtp_conn = smtplib.SMTP_SSL(self.SMTP_SERVER, self.SMTP_PORT)
        self.S = smtp_conn

        rc, self.response = imap_conn.login(username, password)
        sc, self.response_s = smtp_conn.login(username, password)
        self.username = username
        return rc, sc
项目:sndlatr    作者:Schibum    | 项目源码 | 文件源码
def test_imap(user):
    """Manual check: OAuth2 login through IMAPClient and an X-GM-RAW search.

    Logs in as ``user``, selects the first folder whose flags contain
    ``\\All`` and prints the result of searching for ``uniquetokenXXX``.
    Raises ``Error`` when no such folder exists.
    """
    credentials = get_credentials()
    conn = IMAPClient('imap.googlemail.com', use_uid=True, ssl=True)
    conn.oauth2_login(user, credentials.access_token)

    # Find the "all mail" folder by its flag rather than by name.
    for flags, _, box in conn.list_folders():
        if '\\All' in flags:
            all_box = box
            break
    else:
        raise Error('all message box not found')

    logging.debug('All message box is {}'.format(all_box))
    conn.select_folder(all_box)

    # Once authenticated, everything from the imaplib.IMAP4_SSL class
    # works as usual without any modification to your code.
    # Thread id kept for the X-GM-THRID variant of the search:
    # msgs = conn.search('X-GM-THRID {}'.format(tid))
    tid = int('14095f27c538b207', 16)
    msgs = conn.search('X-GM-RAW uniquetokenXXX')

    print(msgs)
项目:SPF    作者:Exploit-install    | 项目源码 | 文件源码
def connect(self, mailserver, port="993"):
        """Try to open an IMAP4_SSL connection and store it in ``self.srv``.

        ``mailserver`` and ``port`` are remembered on the instance. When the
        connection cannot be opened, ``self.srv`` is set to ``None`` instead
        of raising.
        """
        self.mailserver = mailserver
        self.port = port
        try:
            self.srv = imaplib.IMAP4_SSL(self.mailserver, self.port)
        except Exception:
            # Connection failures are tolerated; KeyboardInterrupt and
            # SystemExit are no longer swallowed as they were by a bare except.
            self.srv = None

#-----------------------------------------------------------------------------
# POP3 subclass of Pillager Class
#-----------------------------------------------------------------------------
项目:abusehelper    作者:Exploit-install    | 项目源码 | 文件源码
def __init__(self, host, port, certfile=None, keyfile=None, timeout=None, ca_certs=None):
        """Store the timeout and CA bundle, then initialise ``IMAP4_SSL``.

        ``timeout`` and ``ca_certs`` are set before the base initialiser
        runs. ``certfile`` and ``keyfile`` are forwarded to
        ``imaplib.IMAP4_SSL.__init__``.
        """
        self._timeout = timeout
        self.ca_certs = ca_certs

        # IMAP4_SSL's positional order is (host, port, keyfile, certfile).
        # Passing (certfile, keyfile) positionally swapped the two files,
        # so forward them by keyword.
        imaplib.IMAP4_SSL.__init__(self, host, port, keyfile=keyfile, certfile=certfile)
项目:abusehelper    作者:Exploit-install    | 项目源码 | 文件源码
def __init__(self, host, port, certfile=None, keyfile=None, timeout=None, ca_certs=None):
        """Store the timeout and CA bundle, then initialise ``IMAP4_SSL``.

        ``timeout`` and ``ca_certs`` are set before the base initialiser
        runs. ``certfile`` and ``keyfile`` are forwarded to
        ``imaplib.IMAP4_SSL.__init__``.
        """
        self._timeout = timeout
        self.ca_certs = ca_certs

        # IMAP4_SSL's positional order is (host, port, keyfile, certfile).
        # Passing (certfile, keyfile) positionally swapped the two files,
        # so forward them by keyword.
        imaplib.IMAP4_SSL.__init__(self, host, port, keyfile=keyfile, certfile=certfile)
项目:lama    作者:CSE-POST    | 项目源码 | 文件源码
def _connect_imap(self):
        """Open an IMAP4_SSL session to ``self._server`` and log in.

        The connection is stored in ``self._mail_server``.
        """
        print("CONNECTION ...")
        server = imaplib.IMAP4_SSL(self._server)
        self._mail_server = server
        server.login(self._user, self._password)
项目:centos-base-consul    作者:zeroc0d3lab    | 项目源码 | 文件源码
def compute_state(self, key):
        """Return the number of unseen messages in ``key.folder``.

        ``key`` supplies ``username``, ``password``, ``use_ssl``,
        ``server``, ``port`` and ``folder``.

        Returns:
            The unseen count as an int, or ``None`` (after a warning) when
            credentials are missing or the STATUS reply has no count.
        """
        if not key.username or not key.password:
            self.warn('Username and password are not configured')
            return None
        imap_class = IMAP4_SSL if key.use_ssl else IMAP4
        mail = imap_class(key.server, key.port)
        try:
            mail.login(key.username, key.password)
            rc, message = mail.status(key.folder, '(UNSEEN)')
        finally:
            # A new session is opened on every call, so always end it.
            mail.logout()
        unread_str = message[0].decode('utf-8')
        match = re.search(r'UNSEEN (\d+)', unread_str)
        if match is None:
            # Previously this raised AttributeError on the missing match.
            self.warn('Unexpected STATUS response: ' + unread_str)
            return None
        return int(match.group(1))
项目:PiBunny    作者:tholum    | 项目源码 | 文件源码
def __init__(self, target):
        """Open an IMAP session to ``target`` and check that NTLM auth is offered.

        ``target`` has the form ``protocol://host:port``. IMAP4_SSL is used
        when the port is 993 or the protocol is ``imaps`` (any case); plain
        IMAP4 otherwise.

        Raises:
            RuntimeError: if the server does not advertise ``AUTH=NTLM``.
        """
        # Target comes as protocol://target:port
        self.target = target
        proto, host, port = target.split(':')
        host = host[2:]  # drop the "//" left over from the split
        port = int(port)
        if port == 993 or proto.upper() == 'IMAPS':
            self.session = imaplib.IMAP4_SSL(host, port)
        else:
            #assume non-ssl IMAP
            self.session = imaplib.IMAP4(host, port)
        if 'AUTH=NTLM' not in self.session.capabilities:
            logging.error('IMAP server does not support NTLM authentication!')
            # __init__ must return None: "return False" here surfaced as an
            # unrelated TypeError, so raise an explicit error instead.
            raise RuntimeError('IMAP server does not support NTLM authentication!')
        self.authtag = self.session._new_tag()
        self.lastresult = None
项目:Taigabot    作者:FrozenPigs    | 项目源码 | 文件源码
def authenticate(self, url, consumer, token):
        """Authenticate this IMAP4_SSL session with the XOAUTH mechanism.

        Raises:
            ValueError: if ``consumer`` is neither None nor an
                ``oauth2.Consumer``, or ``token`` is neither None nor an
                ``oauth2.Token``.
        """
        if not (consumer is None or isinstance(consumer, oauth2.Consumer)):
            raise ValueError("Invalid consumer.")

        if not (token is None or isinstance(token, oauth2.Token)):
            raise ValueError("Invalid token.")

        def xoauth_response(challenge):
            # The server challenge is ignored; the string is built from
            # the arguments given to authenticate().
            return oauth2.build_xoauth_string(url, consumer, token)

        imaplib.IMAP4_SSL.authenticate(self, 'XOAUTH', xoauth_response)
项目:Fetch    作者:bourdakos1    | 项目源码 | 文件源码
def authenticate(self, url, consumer, token):
        """Authenticate this IMAP4_SSL session with the XOAUTH mechanism.

        Raises:
            ValueError: if ``consumer`` is neither None nor an
                ``oauth2.Consumer``, or ``token`` is neither None nor an
                ``oauth2.Token``.
        """
        if not (consumer is None or isinstance(consumer, oauth2.Consumer)):
            raise ValueError("Invalid consumer.")

        if not (token is None or isinstance(token, oauth2.Token)):
            raise ValueError("Invalid token.")

        def xoauth_response(challenge):
            # The server challenge is ignored; the string is built from
            # the arguments given to authenticate().
            return oauth2.build_xoauth_string(url, consumer, token)

        imaplib.IMAP4_SSL.authenticate(self, 'XOAUTH', xoauth_response)
项目:Python-Network-Programming-Cookbook-Second-Edition    作者:PacktPublishing    | 项目源码 | 文件源码
def check_email(username):
    """Print the first message found in the Gmail inbox of ``username``.

    Connects to ``GOOGLE_IMAP_SERVER``, prompts for the password on the
    terminal, logs in and prints the first message returned by an ``ALL``
    search (nothing is printed when the search is empty). The mailbox is
    then closed and the session logged out.
    """
    mailbox = imaplib.IMAP4_SSL(GOOGLE_IMAP_SERVER, '993')
    password = getpass.getpass(prompt="Enter your Google password: ")
    mailbox.login(username, password)
    mailbox.select('Inbox')

    _, found = mailbox.search(None, 'ALL')
    message_ids = found[0].split()
    if message_ids:
        # Only the first message is shown.
        first = message_ids[0]
        _, fetched = mailbox.fetch(first, '(RFC822)')
        print ('Message %s\n%s\n' % (first, fetched[0][1]))

    mailbox.close()
    mailbox.logout()
项目:stephanie-va    作者:SlapBot    | 项目源码 | 文件源码
def do_init(self):
        """Connect to Gmail over IMAP/SSL and log in with the stored credentials.

        The connection is kept in ``self.conn``.

        Returns:
            ``None`` on success; on failure, the error message string that
            was also printed.
        """
        try:
            self.conn = imaplib.IMAP4_SSL('imap.gmail.com')
            self.conn.debug = 0
            self.conn.login(self.gmail_address, self.password)
        except Exception:
            # Narrowed from a bare except so KeyboardInterrupt/SystemExit
            # are not reported as a credentials problem.
            response = ("Either your credentials are wrong mate, or there is some problem going on, do me a favor, I know "
                        "you won't but whatever, just inform me in the forums.")
            print(response)
            return response
项目:Personal_AI_Assistant    作者:PratylenClub    | 项目源码 | 文件源码
def read_message(assistant, instance_vlc, player_vlc, username, password, sender_of_interest=None):
    """Read unread Gmail INBOX messages aloud and mark them as seen.

    Searches for unseen messages (restricted to ``sender_of_interest`` when
    given), announces through ``assistant.speak`` whether there are any,
    passes "sender. subject. Content :text" for each message to
    ``read_nicely_text`` and finally adds the ``\\Seen`` flag to them.
    """
    imap = imaplib.IMAP4_SSL("imap.gmail.com", 993)
    try:
        imap.login(username, password)
        imap.select('INBOX')
        if sender_of_interest is None:
            _, response = imap.search(None, '(UNSEEN)')
        else:
            _, response = imap.search(None, '(UNSEEN)', '(FROM "%s")' % (sender_of_interest))
        unread_msg_nums = response[0].split()

        if unread_msg_nums:
            assistant.speak("Very good! Please wait, my assistant will read your messages!")
        else:
            assistant.speak("You do not have new messages!")

        for e_id in unread_msg_nums:
            _, header = imap.fetch(e_id, '(UID BODY[HEADER])')
            header = header[0][-1]
            header = "".join(header.split("\r"))
            header = [h for h in header.split("\n") if h != ""]
            # NOTE(review): relies on the subject being the last header
            # line and the sender the third from last -- confirm.
            subject = header[-1]
            sender = header[-3].split("@")[0]
            _, text = imap.fetch(e_id, '(UID BODY[1])')
            text = "Content :" + text[0][1]
            message = sender + ". " + subject + ". " + text
            read_nicely_text(message, instance_vlc, player_vlc)

        # Mark them as seen
        for e_id in unread_msg_nums:
            imap.store(e_id, '+FLAGS', '\\Seen')
    finally:
        # The session used to be left open; always end it, even on errors.
        imap.logout()
项目:qqbot    作者:pandolia    | 项目源码 | 文件源码
def __init__(self, account, auth_code, name='', **config):
        """Derive SMTP/IMAP settings from ``account`` and build connection factories.

        Defaults (``smtp.<domain>``, ``imap.<domain>``, port 0, SSL on) are
        overridden first by the ``SERVER_LIB`` entry for the domain and then
        by ``config``. ``self.st_SMTP`` / ``self.st_IMAP`` create the raw
        smtplib/imaplib connections, passing a port only when the
        configured port is non-zero. ``self.SMTP`` / ``self.IMAP`` create
        the ``SMTP`` / ``IMAP`` wrappers for this account.
        """
        account_name, server_name = account.split('@')

        self.smtp = 'smtp.' + server_name
        self.imap = 'imap.' + server_name
        self.server_name = server_name
        self.smtp_port = 0
        self.imap_port = 0
        self.use_ssl = True

        self.__dict__.update(SERVER_LIB.get(server_name, {}))
        self.__dict__.update(config)

        self.name = '%s <%s>' % (name or account_name, account)
        self.account = account
        self.auth_code = auth_code

        if self.use_ssl:
            smtp_class, imap_class = smtplib.SMTP_SSL, imaplib.IMAP4_SSL
        else:
            smtp_class, imap_class = smtplib.SMTP, imaplib.IMAP4

        # Which factory is used is decided once, here; the host and port
        # themselves are read when the factory is called.
        if self.smtp_port:
            def open_smtp():
                return smtp_class(self.smtp, self.smtp_port)
        else:
            def open_smtp():
                return smtp_class(self.smtp)
        self.st_SMTP = open_smtp

        if self.imap_port:
            def open_imap():
                return imap_class(self.imap, self.imap_port)
        else:
            def open_imap():
                return imap_class(self.imap)
        self.st_IMAP = open_imap

        def make_smtp():
            return SMTP(self)

        def make_imap():
            return IMAP(self)

        self.SMTP = make_smtp
        self.IMAP = make_imap
项目:gdog    作者:maldevel    | 项目源码 | 文件源码
def __init__(self):
        """Connect to ``server`` over IMAP/SSL and log in.

        Uses the module-level ``gmail_user`` / ``gmail_pwd``; the
        connection is stored in ``self.c``.
        """
        connection = imaplib.IMAP4_SSL(server)
        self.c = connection
        connection.login(gmail_user, gmail_pwd)
项目:CVE-2017-7494    作者:joxeankoret    | 项目源码 | 文件源码
def __init__(self, target):
        """Open an IMAP session to ``target`` and check that NTLM auth is offered.

        ``target`` has the form ``protocol://host:port``. IMAP4_SSL is used
        when the port is 993 or the protocol is ``imaps`` (any case); plain
        IMAP4 otherwise.

        Raises:
            RuntimeError: if the server does not advertise ``AUTH=NTLM``.
        """
        # Target comes as protocol://target:port
        self.target = target
        proto, host, port = target.split(':')
        host = host[2:]  # drop the "//" left over from the split
        port = int(port)
        if port == 993 or proto.upper() == 'IMAPS':
            self.session = imaplib.IMAP4_SSL(host, port)
        else:
            #assume non-ssl IMAP
            self.session = imaplib.IMAP4(host, port)
        if 'AUTH=NTLM' not in self.session.capabilities:
            logging.error('IMAP server does not support NTLM authentication!')
            # __init__ must return None: "return False" here surfaced as an
            # unrelated TypeError, so raise an explicit error instead.
            raise RuntimeError('IMAP server does not support NTLM authentication!')
        self.authtag = self.session._new_tag()
        self.lastresult = None
项目:jessy    作者:jessy-project    | 项目源码 | 文件源码
def fetch_unread_emails(profile, since=None, mark_read=False, limit=None):
    """
        Fetches a list of unread email objects from a user's Gmail inbox.

        Arguments:
        profile -- contains information related to the user (e.g., Gmail
                   address)
        since -- if provided, no emails before this date will be returned
        mark_read -- if True, marks all returned emails as read in target
                     inbox (the mailbox is opened read-only otherwise)
        limit -- if provided and the number of unread emails exceeds it,
                 that number is returned instead of a list

        Returns:
        A list of unread email objects, or the unread count (int) when it
        exceeds ``limit``.
    """
    conn = imaplib.IMAP4_SSL('imap.gmail.com')
    conn.debug = 0
    conn.login(profile['gmail_address'], profile['gmail_password'])
    conn.select(readonly=(not mark_read))

    try:
        msgs = []
        (retcode, messages) = conn.search(None, '(UNSEEN)')

        if retcode == 'OK' and messages != ['']:
            unread_ids = messages[0].split(' ')
            if limit and len(unread_ids) > limit:
                return len(unread_ids)

            for num in unread_ids:
                # parse email RFC822 format
                ret, data = conn.fetch(num, '(RFC822)')
                msg = email.message_from_string(data[0][1])

                if not since or get_date(msg) > since:
                    msgs.append(msg)
        return msgs
    finally:
        # Runs on the early "limit" return and on errors too, which
        # previously left the connection open.
        conn.close()
        conn.logout()
项目:grical    作者:wikical    | 项目源码 | 文件源码
def __init__(self, *args, **kwargs): # {{{2
        """Open the mailbox described by the IMAP settings, then initialise the command.

        Uses IMAP4_SSL with ``settings.IMAP_PORT`` when ``settings.IMAP_SSL``
        is true and plain IMAP4 otherwise, logs in and selects the default
        mailbox. The connection is stored in ``self.mailbox``.
        """
        if not settings.IMAP_SSL:
            self.mailbox = IMAP4( settings.IMAP_SERVER )
        else:
            self.mailbox = IMAP4_SSL(
                    host = settings.IMAP_SERVER, port = settings.IMAP_PORT)
        mailbox = self.mailbox
        mailbox.login( settings.IMAP_LOGIN, settings.IMAP_PASSWD )
        mailbox.select()
        super( Command, self ).__init__( *args, **kwargs )
项目:luath    作者:kerrmarin    | 项目源码 | 文件源码
def __init__(self):
        """
        Open an SSL connection to Google's IMAP server (``imap.gmail.com``)
        and keep it in ``self.mailbox``; no login is performed here.
        """
        gmail_connection = imaplib.IMAP4_SSL('imap.gmail.com')
        self.mailbox = gmail_connection
项目:OneClickDTU    作者:satwikkansal    | 项目源码 | 文件源码
def authenticate(self, url, consumer, token):
        """Authenticate this IMAP4_SSL session with the XOAUTH mechanism.

        Raises:
            ValueError: if ``consumer`` is neither None nor an
                ``oauth2.Consumer``, or ``token`` is neither None nor an
                ``oauth2.Token``.
        """
        if not (consumer is None or isinstance(consumer, oauth2.Consumer)):
            raise ValueError("Invalid consumer.")

        if not (token is None or isinstance(token, oauth2.Token)):
            raise ValueError("Invalid token.")

        def xoauth_response(challenge):
            # The server challenge is ignored; the string is built from
            # the arguments given to authenticate().
            return oauth2.build_xoauth_string(url, consumer, token)

        imaplib.IMAP4_SSL.authenticate(self, 'XOAUTH', xoauth_response)
项目:SWCheckIn    作者:gsugar87    | 项目源码 | 文件源码
def signIn(verbose=False):
    """Return a Gmail IMAP4_SSL connection logged in with the module-level credentials.

    When ``verbose`` is true, a confirmation line naming ``username`` is
    printed after the login.
    """
    session = imaplib.IMAP4_SSL('imap.gmail.com')
    session.login(username, password)
    if not verbose:
        return session
    print('Signed into ' + username)
    return session
项目:textMe    作者:RParkerE    | 项目源码 | 文件源码
def read(hostname, username, password):
    """Record the unseen-message search result of an account in ``unread``.

    Connects over SSL to ``imap.<hostname>``, logs in and searches the
    read-only INBOX for unseen messages. For each item of the search data,
    its last element is appended to ``unread``; ``0`` is appended when the
    search status is not ``'OK'``.
    """
    server = IMAP4_SSL('imap.' + hostname)
    try:
        server.login(username, password)
        server.select('INBOX', readonly=True)
        result = server.search(None, 'UnSeen')
        if result[0] != 'OK':
            unread.append(0)
        else:
            for id_string in result[1]:
                unread.append(id_string[-1])
    except IMAP4.error:
        # NOTE(review): this binds a function-local name only; if a
        # module-level ``failed`` flag is intended it is never updated.
        failed = True
项目:SrsAutoLogin    作者:ebatuhankaynak    | 项目源码 | 文件源码
def get_mails(mail_address, mail_pass):
    """Return the raw RFC822 content of the last message in the inbox.

    Logs in to ``mail.bilkent.edu.tr``, searches the inbox for ``ALL``
    messages and fetches the last id in the result.

    Raises:
        IndexError: if the search returns no message ids.
    """
    mail = imaplib.IMAP4_SSL('mail.bilkent.edu.tr')
    try:
        mail.login(mail_address, mail_pass)
        mail.list()
        mail.select("inbox")
        result, data = mail.search(None, "ALL")

        id_list = data[0].split()
        latest_email_id = id_list[-1]

        result, data = mail.fetch(latest_email_id, "(RFC822)")
        return data[0][1]
    finally:
        # The session used to be left open; always end it, even on errors.
        mail.logout()
项目:TumblrVideos    作者:moedje    | 项目源码 | 文件源码
def authenticate(self, url, consumer, token):
        """Authenticate this IMAP4_SSL session with the XOAUTH mechanism.

        Raises:
            ValueError: if ``consumer`` is neither None nor an
                ``oauth2.Consumer``, or ``token`` is neither None nor an
                ``oauth2.Token``.
        """
        if not (consumer is None or isinstance(consumer, oauth2.Consumer)):
            raise ValueError("Invalid consumer.")

        if not (token is None or isinstance(token, oauth2.Token)):
            raise ValueError("Invalid token.")

        def xoauth_response(challenge):
            # The server challenge is ignored; the string is built from
            # the arguments given to authenticate().
            return oauth2.build_xoauth_string(url, consumer, token)

        imaplib.IMAP4_SSL.authenticate(self, 'XOAUTH', xoauth_response)
项目:j2f    作者:jasper2fork    | 项目源码 | 文件源码
def fetch_unread_emails(self, since=None, markRead=False, limit=None):
        """
            Fetches a list of unread email objects from a user's Gmail inbox.

            Arguments:
            since -- if provided, no emails before this date will be returned
            markRead -- if True, marks all returned emails as read in target
                        inbox (the mailbox is opened read-only otherwise)
            limit -- if provided and the number of unread emails exceeds
                     it, that number is returned instead of a list

            Returns:
            A list of unread email objects, or the unread count (int) when
            it exceeds ``limit``.
        """
        conn = imaplib.IMAP4_SSL('imap.gmail.com')
        conn.debug = 0
        conn.login(self.profile['gmail_address'],
                   self.profile['gmail_password'])
        conn.select(readonly=(not markRead))

        try:
            msgs = []
            (retcode, messages) = conn.search(None, '(UNSEEN)')

            if retcode == 'OK' and messages != ['']:
                unread_ids = messages[0].split(' ')
                if limit and len(unread_ids) > limit:
                    return len(unread_ids)

                for num in unread_ids:
                    # parse email RFC822 format
                    ret, data = conn.fetch(num, '(RFC822)')
                    msg = email.message_from_string(data[0][1])

                    if not since or get_date(msg) > since:
                        msgs.append(msg)
            return msgs
        finally:
            # Runs on the early "limit" return and on errors too, which
            # previously left the connection open.
            conn.close()
            conn.logout()
项目:manage-maintenance    作者:colinmcintosh    | 项目源码 | 文件源码
def connect(self):
        """Open an IMAP4_SSL session to ``self._address`` and log in.

        The connection is stored in ``self._imap``.

        Raises:
            Exception: if the login status is not ``"OK"``.
        """
        self._imap = imaplib.IMAP4_SSL(self._address)
        return_code, data = self._imap.login(self._username, self._password)
        if return_code == "OK":
            return
        raise Exception("Error logging in to IMAP as {}: {} {}".format(self._username, return_code, data))
项目:alicloud-duplicity    作者:aliyun    | 项目源码 | 文件源码
def resetConnection(self):
        """Drop the current IMAP connection and open, log in and select a new one.

        The server comes from the ``IMAP_SERVER`` environment variable when
        set, otherwise from the URL's hostname. The ``imap`` scheme uses
        plain IMAP4 on port 143 and ``imaps`` uses IMAP4_SSL on port 993.
        When ``globals.imap_full_address`` is set, the login name is
        ``<username>@<url hostname>``.
        """
        parsed_url = self.url
        if 'IMAP_SERVER' in os.environ:
            imap_server = os.environ['IMAP_SERVER']
        else:
            imap_server = parsed_url.hostname

        #  Try to close the connection cleanly
        try:
            self.conn.close()
        except Exception:
            pass

        scheme = parsed_url.scheme
        if scheme == "imap":
            cl = imaplib.IMAP4
            self.conn = cl(imap_server, 143)
        elif scheme == "imaps":
            cl = imaplib.IMAP4_SSL
            self.conn = cl(imap_server, 993)

        log.Debug("Type of imap class: %s" % (cl.__name__))
        self.remote_dir = re.sub(r'^/', r'', parsed_url.path, 1)

        #  Login
        if globals.imap_full_address:
            login_name = self.username + "@" + parsed_url.hostname
        else:
            login_name = self.username
        self.conn.login(login_name, self.password)
        self.conn.select(globals.imap_mailbox)
        log.Info("IMAP connected")
项目:jasper-modules    作者:mattcurrycom    | 项目源码 | 文件源码
def fetchUnreadEmails(profile, since=None, markRead=False, limit=None):
    """
        Fetches a list of unread email objects from a user's Gmail inbox.

        Arguments:
        profile -- contains information related to the user; the keys
                   'gmail_address' and 'gmail_password' are used to log in
        since -- if provided, only emails whose getDate() value is later
                 than this are returned
        markRead -- if True, the mailbox is opened read-write instead of
                    read-only, so that fetched emails get marked as read
        limit -- if provided and the number of unread emails exceeds it,
                 nothing is fetched and the unread count is returned

        Returns:
        A list of unread email objects, or the number of unread emails
        (an int) when that number exceeds limit.
    """
    conn = imaplib.IMAP4_SSL('imap.gmail.com')
    conn.debug = 0
    conn.login(profile['gmail_address'], profile['gmail_password'])
    conn.select(readonly=(not markRead))

    # try/finally guarantees the mailbox is closed and the session logged
    # out on every exit path, including the early "over the limit" return.
    try:
        msgs = []
        (retcode, messages) = conn.search(None, '(UNSEEN)')

        if retcode == 'OK' and messages != ['']:
            unread_ids = messages[0].split(' ')
            numUnread = len(unread_ids)
            if limit and numUnread > limit:
                return numUnread

            for num in unread_ids:
                # parse email RFC822 format
                ret, data = conn.fetch(num, '(RFC822)')
                msg = email.message_from_string(data[0][1])

                if not since or getDate(msg) > since:
                    msgs.append(msg)

        return msgs
    finally:
        conn.close()
        conn.logout()
项目:qxf2-page-object-model    作者:qxf2    | 项目源码 | 文件源码
def connect(self,imap_host):
        """Open an SSL IMAP connection to ``imap_host``.

        The connection object is stored on ``self.mail`` and also returned.
        """
        connection = imaplib.IMAP4_SSL(imap_host)
        self.mail = connection
        return connection
项目:dotfiles    作者:gbraad    | 项目源码 | 文件源码
def compute_state(self, key):
        """Return the number of unseen messages reported for ``key.folder``.

        Args:
            key: object providing ``username``, ``password``, ``use_ssl``,
                ``server``, ``port`` and ``folder`` attributes.

        Returns:
            The UNSEEN count parsed from the server's STATUS response as an
            int, or ``None`` (after calling ``self.warn``) when the username
            or password is empty.
        """
        if not key.username or not key.password:
            self.warn('Username and password are not configured')
            return None
        if key.use_ssl:
            mail = IMAP4_SSL(key.server, key.port)
        else:
            mail = IMAP4(key.server, key.port)
        try:
            mail.login(key.username, key.password)
            _, message = mail.status(key.folder, '(UNSEEN)')
        finally:
            # Always end the session so each call does not leak a socket.
            # Cleanup is best-effort: a failure here must not mask an error
            # raised by login/status above.
            try:
                mail.logout()
            except (IMAP4.error, OSError):
                pass
        unread_str = message[0].decode('utf-8')
        # Raw string: '\d' in a normal literal is an invalid escape sequence.
        unread_count = int(re.search(r'UNSEEN (\d+)', unread_str).group(1))
        return unread_count
项目:Automation-Bots    作者:ab-anand    | 项目源码 | 文件源码
def main():
    """Interactively browse the inbox one mail at a time.

    The global counter ``i`` selects the mail to show, counted back from
    the end of the UID search result (``i == 1`` is the last UID). Input
    'n' increases ``i``, 'p' decreases it (never below 1) and 'q' quits.
    For each mail the To/From/Date/Subject headers and the body are
    printed, then ``welcome()`` is shown and the next choice is read.
    """
    global i
    clear()
    mail = imaplib.IMAP4_SSL(SMTP_SERVER)
    mail.login(FROM_EMAIL, FROM_PWD)
    mail.list()
    mail.select('inbox')
    inp = 'n'

    # try/finally so the IMAP session is logged out however the loop ends.
    try:
        while inp != 'q':
            print('Fetching mails. Please wait...')
            if inp == 'n':
                i += 1
            elif inp == 'p' and i != 1:
                i -= 1
            else:
                print('Please enter valid input.')
            result, data = mail.uid('search', None, "ALL")  # search and return uids instead
            uids = data[0].split()
            if not uids:
                # Nothing to index into; the original crashed with IndexError.
                print('No mails found.')
                break
            # Keep the position inside the mailbox so that going past the
            # first or last mail stays on it instead of raising IndexError.
            i = max(1, min(i, len(uids)))
            latest_email_uid = uids[-i]  # fetch mails
            result, data = mail.uid('fetch', latest_email_uid, '(RFC822)')
            raw_email = data[0][1]
            email_message = email.message_from_string(raw_email)

            clear()  # clear screen and print mail
            print('To: %s' % email_message['To'])
            print('Sent from: %s' % email_message['From'])
            print('Date: %s' % email_message['Date'])
            print('Subject: %s' % email_message['Subject'])
            print('*' * 30 + ' MESSAGE ' + '*' * 30)
            maintype = email_message.get_content_maintype()

            if maintype == 'multipart':  # get the body of the mail
                # first sub-part only, to get the plain text
                print(email_message.get_payload()[0].get_payload())
            elif maintype == 'text':
                # For a non-multipart message get_payload() is the body
                # string itself; indexing it with [0] printed one character.
                print(email_message.get_payload())
            print('*' * 69)
            welcome()
            inp = raw_input('>> Enter your choice: ').lower()
    finally:
        mail.logout()
项目:Web-Scraping-and-Python-Codes    作者:dushyantRathore    | 项目源码 | 文件源码
def readmail():
    """Prompt for credentials and print sender and subject of each inbox mail.

    Logs in to the server named by ``smtp_server`` over SSL, searches the
    'inbox' mailbox for ALL messages and walks the result from the last id
    to the first, printing the From and Subject headers of each message.
    An empty mailbox prints nothing.
    """
    username = raw_input("Enter username : ")
    password = raw_input("Enter password : ")

    mail = imaplib.IMAP4_SSL(smtp_server)

    # try/finally so the session is logged out even if a fetch fails.
    try:
        mail.login(username, password)
        mail.select('inbox')

        typ, data = mail.search(None, 'ALL')
        id_list = data[0].split()

        # reversed() visits every id, including the first one; the previous
        # range(last_id, first_id, -1) stopped before it and raised
        # IndexError on an empty mailbox.
        for msg_id in reversed(id_list):
            typ, data = mail.fetch(msg_id, '(RFC822)')

            for response in data:
                if isinstance(response, tuple):
                    msg = email.message_from_string(response[1])
                    # %-formatting tolerates a missing header (None), where
                    # string concatenation raised TypeError.
                    print('From : %s\n' % msg['from'])
                    print('Subject : %s\n' % msg['subject'])
    finally:
        mail.logout()
项目:Steam-Trade-Bot    作者:YegorDia    | 项目源码 | 文件源码
def connect(self):
        self.imap_conn = imaplib.IMAP4_SSL(self.imap_server)