我们从 Python 开源项目中提取了以下 6 个代码示例,用于说明如何使用 email.message.Message.add_header()。
def _get_upload_content(field_storage):
    """Returns an email.Message holding the values of the file transfer.

    Reads the raw body of the field storage, undoes the encoding named by its
    Content-Transfer-Encoding header and parses the decoded content as a new
    message.

    Args:
      field_storage: cgi.FieldStorage that represents uploaded blob. Its
        `headers` object must offer either `getheader(name, default)` or
        `get(name, default)`, and its `file` must offer `read()`.

    Returns:
      An email.message.Message holding the upload information.
    """

    def _is_py3_bytes(value):
        # On Python 2 `bytes` is an alias of `str`, so this is only ever true
        # for real Python 3 byte strings.
        return isinstance(value, bytes) and not isinstance(value, str)

    headers = field_storage.headers
    # Prefer getheader() when the headers object has it, as the original code
    # did; plain dicts and Python 3 header objects only provide get().
    get_header = getattr(headers, 'getheader', None) or headers.get
    transfer_encoding = get_header('Content-Transfer-Encoding', '')

    raw_body = field_storage.file.read()
    if _is_py3_bytes(raw_body):
        # Message.get_payload(decode=True) on Python 3 can only decode text
        # payloads; surrogateescape keeps non-ASCII bytes round-trippable.
        raw_body = raw_body.decode('ascii', 'surrogateescape')

    # A throwaway message is used purely for its transfer-encoding decoder.
    decoder = email.message.Message()
    decoder.add_header('content-transfer-encoding', transfer_encoding)
    decoder.set_payload(raw_body)
    payload = decoder.get_payload(decode=True)

    if _is_py3_bytes(payload):
        # Python 3 hands back bytes, which message_from_string() rejects.
        return email.message_from_bytes(payload)
    return email.message_from_string(payload)
def _run_test_success(self, upload_data, upload_url):
    """Runs one upload through the dispatcher and checks the forwarded request.

    Args:
      upload_data: Request body passed to self.run_dispatcher.
      upload_url: Upload URL. Its path component becomes PATH_INFO and its
        last '/'-separated segment is treated as the session key.

    Returns:
      A (upload, forward_environ, forward_body) tuple, where upload is the
      single part parsed out of the forwarded multipart body.
    """
    url_path = urlparse.urlparse(upload_url)[2]

    # Get session key from upload url.
    url_segments = upload_url.split('/')
    session_key = url_segments[-1]

    self.environ['PATH_INFO'] = url_path
    self.environ['CONTENT_TYPE'] = (
        'multipart/form-data; boundary="================1234=="')

    dispatch_result = self.run_dispatcher(upload_data)
    status, _, response_body, forward_environ, forward_body = dispatch_result

    self.assertEquals('200 OK', status)
    self.assertEquals('Forwarded successfully.', response_body)
    self.assertNotEquals(None, forward_environ)

    # These must NOT be unicode strings.
    self.assertIsInstance(forward_environ['PATH_INFO'], str)
    if 'QUERY_STRING' in forward_environ:
        self.assertIsInstance(forward_environ['QUERY_STRING'], str)

    self.assertRegexpMatches(forward_environ['CONTENT_TYPE'],
                             r'multipart/form-data; boundary="[^"]+"')
    declared_length = int(forward_environ['CONTENT_LENGTH'])
    self.assertEquals(len(forward_body), declared_length)

    self.assertIn(constants.FAKE_IS_ADMIN_HEADER, forward_environ)
    self.assertEquals('1', forward_environ[constants.FAKE_IS_ADMIN_HEADER])

    # Re-parse the forwarded body as a MIME message; exactly one part is
    # expected, otherwise the unpacking below raises.
    forwarded_text = 'Content-Type: %s\n\n%s' % (
        forward_environ['CONTENT_TYPE'], forward_body)
    forwarded_request = email.message_from_string(forwarded_text)
    [upload] = forwarded_request.get_payload()
    self.assertEquals('message/external-body', upload.get_content_type())

    # A scratch message is used only to parse the blob-key parameter out of
    # the part's Content-Type header.
    content_type_holder = email.message.Message()
    content_type_holder.add_header('Content-Type', upload['Content-Type'])
    blob_key = content_type_holder.get_param('blob-key')

    reader = blobstore.BlobReader(blob_key)
    blob_contents = reader.read()
    self.assertEquals('value', blob_contents)

    # Looking up the session key in the datastore must now fail.
    self.assertRaises(datastore_errors.EntityNotFoundError,
                      datastore.Get,
                      session_key)

    return upload, forward_environ, forward_body