我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用email.message_from_file()。
def read_file(self, fileob): """Read the metadata values from a file object.""" msg = message_from_file(fileob) self._fields['Metadata-Version'] = msg['metadata-version'] # When reading, get all the fields we can for field in _ALL_FIELDS: if field not in msg: continue if field in _LISTFIELDS: # we can have multiple lines values = msg.get_all(field) if field in _LISTTUPLEFIELDS and values is not None: values = [tuple(value.split(',')) for value in values] self.set(field, values) else: # single line value = msg[field] if value is not None and value != 'UNKNOWN': self.set(field, value) self.set_metadata_version()
def test_default_type(self): eq = self.assertEqual fp = openfile('msg_30.txt') try: msg = email.message_from_file(fp) finally: fp.close() container1 = msg.get_payload(0) eq(container1.get_default_type(), 'message/rfc822') eq(container1.get_content_type(), 'message/rfc822') container2 = msg.get_payload(1) eq(container2.get_default_type(), 'message/rfc822') eq(container2.get_content_type(), 'message/rfc822') container1a = container1.get_payload(0) eq(container1a.get_default_type(), 'text/plain') eq(container1a.get_content_type(), 'text/plain') container2a = container2.get_payload(0) eq(container2a.get_default_type(), 'text/plain') eq(container2a.get_content_type(), 'text/plain')
def test_default_type_with_explicit_container_type(self): eq = self.assertEqual fp = openfile('msg_28.txt') try: msg = email.message_from_file(fp) finally: fp.close() container1 = msg.get_payload(0) eq(container1.get_default_type(), 'message/rfc822') eq(container1.get_content_type(), 'message/rfc822') container2 = msg.get_payload(1) eq(container2.get_default_type(), 'message/rfc822') eq(container2.get_content_type(), 'message/rfc822') container1a = container1.get_payload(0) eq(container1a.get_default_type(), 'text/plain') eq(container1a.get_content_type(), 'text/plain') container2a = container2.get_payload(0) eq(container2a.get_default_type(), 'text/plain') eq(container2a.get_content_type(), 'text/plain')
def test_message_from_file_with_class(self): unless = self.failUnless # Create a subclass class MyMessage(Message): pass fp = openfile('msg_01.txt') try: msg = email.message_from_file(fp, MyMessage) finally: fp.close() unless(isinstance(msg, MyMessage)) # Try something more complicated fp = openfile('msg_02.txt') try: msg = email.message_from_file(fp, MyMessage) finally: fp.close() for subpart in msg.walk(): unless(isinstance(subpart, MyMessage))
def test__all__(self): module = __import__('email') # Can't use sorted() here due to Python 2.3 compatibility all = module.__all__[:] all.sort() self.assertEqual(all, [ # Old names 'Charset', 'Encoders', 'Errors', 'Generator', 'Header', 'Iterators', 'MIMEAudio', 'MIMEBase', 'MIMEImage', 'MIMEMessage', 'MIMEMultipart', 'MIMENonMultipart', 'MIMEText', 'Message', 'Parser', 'Utils', 'base64MIME', # new names 'base64mime', 'charset', 'encoders', 'errors', 'generator', 'header', 'iterators', 'message', 'message_from_file', 'message_from_string', 'mime', 'parser', 'quopriMIME', 'quoprimime', 'utils', ])
def test_message_from_file_with_class(self): # Create a subclass class MyMessage(Message): pass fp = openfile('msg_01.txt') try: msg = email.message_from_file(fp, MyMessage) finally: fp.close() self.assertIsInstance(msg, MyMessage) # Try something more complicated fp = openfile('msg_02.txt') try: msg = email.message_from_file(fp, MyMessage) finally: fp.close() for subpart in msg.walk(): self.assertIsInstance(subpart, MyMessage)
def test__all__(self): module = __import__('email') all = module.__all__ all.sort() self.assertEqual(all, [ # Old names 'Charset', 'Encoders', 'Errors', 'Generator', 'Header', 'Iterators', 'MIMEAudio', 'MIMEBase', 'MIMEImage', 'MIMEMessage', 'MIMEMultipart', 'MIMENonMultipart', 'MIMEText', 'Message', 'Parser', 'Utils', 'base64MIME', # new names 'base64mime', 'charset', 'encoders', 'errors', 'generator', 'header', 'iterators', 'message', 'message_from_file', 'message_from_string', 'mime', 'parser', 'quopriMIME', 'quoprimime', 'utils', ])
def request(self, uri, method="GET", body=None, headers=None, redirections=5): path = urlparse.urlparse(uri)[2] fname = os.path.join(HTTP_SRC_DIR, path[1:]) if not os.path.exists(fname): index = self.hit_counter.get(fname, 1) if os.path.exists(fname + "." + str(index)): self.hit_counter[fname] = index + 1 fname = fname + "." + str(index) if os.path.exists(fname): f = file(fname, "r") response = message_from_file(f) f.close() body = response.get_payload() response_headers = httplib2.Response(response) return (response_headers, body) else: return (httplib2.Response({"status": "404"}), "")
def _get_version_from_pkg_metadata(package_name): """Get the version from package metadata if present. This looks for PKG-INFO if present (for sdists), and if not looks for METADATA (for wheels) and failing that will return None. """ pkg_metadata_filenames = ['PKG-INFO', 'METADATA'] pkg_metadata = {} for filename in pkg_metadata_filenames: try: pkg_metadata_file = open(filename, 'r') except (IOError, OSError): continue try: pkg_metadata = email.message_from_file(pkg_metadata_file) except email.MessageError: continue # Check to make sure we're in our own dir if pkg_metadata.get('Name', None) != package_name: return None return pkg_metadata.get('Version', None)
def _get_version_from_pkg_metadata(package_name): """Get the version from package metadata if present. This looks for PKG-INFO if present (for sdists), and if not looks for METADATA (for wheels) and failing that will return None. """ pkg_metadata_filenames = ['PKG-INFO', 'METADATA'] pkg_metadata = {} for filename in pkg_metadata_filenames: try: pkg_metadata_file = open(filename, 'r') except (IOError, OSError): continue try: pkg_metadata = email.message_from_file(pkg_metadata_file) except email.errors.MessageError: continue # Check to make sure we're in our own dir if pkg_metadata.get('Name', None) != package_name: return None return pkg_metadata.get('Version', None)
def parse(self): f = open(self.filename, 'r') headers = Message(f) c = f.read() f.close() if not c.strip(): c = headers.get_payload() if not headers.keys(): raise Exception( "File %s has no headers" % self.filename) self.description = headers['Description'] self.expect = headers.get('Expect', '') self.ignore = headers.get('Ignore') self.options = [ o.strip() for o in headers.get('Options', '').split(',') if o.strip()] parts = bar_re.split(c) self.input = parts[0].rstrip() + '\n' if parts[1:]: self.expect = parts[1].rstrip() + '\n' else: self.expect = None