我们从Python开源项目中,提取了以下49个代码示例,用于说明如何使用cgi.FieldStorage()。
def parse_multipart(fp, ctype, clength, encoding):
    """Parse a multipart/form-data request body.

    The body is read from ``fp`` by ``FieldStorage``; ``ctype`` and
    ``clength`` are handed to it as the ``content-type`` and
    ``content-length`` headers. ``encoding`` is accepted but not used here.

    Returns a tuple ``(form, files)`` of dicts keyed by field name. Parts
    that carry a filename are collected (as the parsed part objects) in
    ``files``; every other part contributes its value to ``form``. Each
    dict value is a list, so repeated field names keep all their entries.
    """
    part_headers = {'content-type': ctype, 'content-length': clength}
    storage = FieldStorage(
        fp=fp,
        environ=MULTIPART_ENVIRON,
        headers=part_headers,
        keep_blank_values=True,
    )

    form, files = {}, {}
    for part in storage.list:
        if not part.filename:
            form.setdefault(part.name, []).append(part.value)
            continue
        files.setdefault(part.name, []).append(part)
    return form, files
def test_app(environ, start_response):
    """Probably not the most efficient example.

    WSGI-style callable that answers ``200 OK`` with an HTML table listing
    every ``environ`` entry (sorted by key), followed by any form fields
    that ``cgi.FieldStorage`` parses from the request.

    Args:
        environ: Request environment mapping; ``environ['wsgi.input']`` is
            handed to ``cgi.FieldStorage`` as the body stream.
        start_response: Callable invoked once with the status line and the
            response header list.

    Yields:
        Chunks of the HTML document, in order.
    """
    import cgi
    try:
        from html import escape  # Python 3
    except ImportError:
        from cgi import escape  # Python 2

    row = '<tr><td>%s</td><td>%s</td></tr>\n'

    start_response('200 OK', [('Content-Type', 'text/html')])
    yield '<html><head><title>Hello World!</title></head>\n' \
          '<body>\n' \
          '<p>Hello World!</p>\n' \
          '<table border="1">'

    # repr() and sorted() replace the backtick / list.sort() spellings that
    # only parse or work on Python 2; the produced rows are the same.
    for name in sorted(environ.keys()):
        yield row % (name, escape(repr(environ[name]), False))

    form = cgi.FieldStorage(fp=environ['wsgi.input'],
                            environ=environ,
                            keep_blank_values=1)
    if form.list:
        yield '<tr><th colspan="2">Form data</th></tr>'

    # form.list is None when the body is not form-encoded, hence "or ()".
    for field in form.list or ():
        # Field names and values come straight from the client, so they are
        # escaped before being embedded in the page.
        yield row % (escape('%s' % (field.name,), False),
                     escape('%s' % (field.value,), False))

    yield '</table>\n' \
          '</body></html>\n'
def test_cgi_field_storage(self):
    # An uploaded cgi.FieldStorage field must be wrapped by
    # CgiFieldStorageDescriptor and expose the file's type, name and bytes.

    # encode a multipart form
    content_type, body, content_length = encode_multipart_data(
        files={'cat': self.cat_jpeg}
    )
    environ = dict(
        REQUEST_METHOD='POST',
        CONTENT_TYPE=content_type,
        CONTENT_LENGTH=content_length,
    )

    storage = cgi.FieldStorage(body, environ=environ)
    uploaded = storage['cat']
    descriptor = AttachableDescriptor(uploaded)

    self.assertIsInstance(descriptor, CgiFieldStorageDescriptor)
    self.assertEqual(descriptor.content_type, 'image/jpeg')
    self.assertEqual(descriptor.original_filename, split(self.cat_jpeg)[1])

    # Copy the descriptor's content out and compare checksums with the source.
    sink = io.BytesIO()
    copy_stream(descriptor, sink)
    sink.seek(0)
    self.assertEqual(md5sum(sink), md5sum(self.cat_jpeg))
def main():
    """Store an uploaded image and print any message hidden in it.

    Prints ``HTML_HEADER`` and ``HEAD``, reads the ``file`` field of the CGI
    request and, when its filename is acceptable, writes the upload to
    ``files/<filename>``. A stored ``.png`` is passed to ``lsb.reveal`` and
    a stored ``.jpg``/``.jpeg`` to ``exifHeader.reveal``; each result is
    printed. The closing paragraph and ``END`` are always printed.

    A filename is acceptable when it ends in ``.jpg``, ``.png``, ``.jpeg``
    or ``.tiff`` and has no directory component.
    """
    print(HTML_HEADER)
    print(HEAD)
    data = cgi.FieldStorage()
    fileds = data['file']
    filename = fileds.filename

    # The filename is chosen by the client, so it must not be able to steer
    # the write outside ``files/``. The previous guard compared
    # ``filename.count('/')`` with -1 (never true) and relied on ``and``
    # binding tighter than ``or``: it rejected every ``.tiff`` and checked
    # nothing at all for the other extensions.
    has_image_suffix = filename.endswith(('.jpg', '.png', '.jpeg', '.tiff'))
    stored = has_image_suffix and os.path.basename(filename) == filename

    if stored:
        target = 'files/' + filename
        with open(target, 'wb') as fout:
            shutil.copyfileobj(fileds.file, fout, 100000)

        # Only decode what was actually written above.
        if filename.endswith('.png'):
            print(lsb.reveal(target))
        if filename.endswith(('.jpg', '.jpeg')):
            print(exifHeader.reveal(target))

    print("<p>Attempted to decode.</p>")
    print(END)
def main():
    """Store an uploaded image, hide the submitted message in it and print a link.

    Prints ``HTML_HEADER`` and ``HEAD``, reads the ``file`` field of the CGI
    request and, when its filename is acceptable, writes the upload to
    ``files/<filename>``. For a stored ``.png`` the ``message`` field is
    embedded with ``lsb.hide`` and the result saved over the upload; for a
    stored ``.jpg``/``.jpeg`` it is embedded with ``exifHeader.hide``,
    which is given the same path as input and output. The success line,
    the link and ``END`` are always printed, as before.

    A filename is acceptable when it ends in ``.jpg``, ``.png``, ``.jpeg``
    or ``.tiff`` and has no directory component.
    """
    print(HTML_HEADER)
    print(HEAD)
    data = cgi.FieldStorage()
    fileds = data['file']
    filename = fileds.filename

    # The filename is chosen by the client, so it must not be able to steer
    # the write outside ``files/``. The previous guard compared
    # ``filename.count('/')`` with -1 (never true) and relied on ``and``
    # binding tighter than ``or``: it rejected every ``.tiff`` and checked
    # nothing at all for the other extensions.
    has_image_suffix = filename.endswith(('.jpg', '.png', '.jpeg', '.tiff'))
    stored = has_image_suffix and os.path.basename(filename) == filename

    if stored:
        target = 'files/' + filename
        with open(target, 'wb') as fout:
            shutil.copyfileobj(fileds.file, fout, 100000)

        # Only embed into what was actually written above.
        if filename.endswith('.png'):
            sec = lsb.hide(target, data['message'].value)
            sec.save(target)
        if filename.endswith(('.jpg', '.jpeg')):
            exifHeader.hide(target, target,
                            secret_message=data['message'].value)

    print("Successfully generated.")
    # NOTE(review): the filename is echoed into the markup unescaped.
    print('<a href="http://jonathanwong.koding.io/bstego/files/' + filename + '">Link here</a>')
    print(END)
def transcode_fs(self, fs, content_type):
    """Re-encode a parsed FieldStorage as a multipart body.

    Every field in ``fs.list`` has its name decoded; fields with a filename
    also get the filename decoded and are passed on as the field object,
    all others are passed on as their decoded value. When ``PY3`` is true
    decoding is a no-op, otherwise ``self.charset`` / ``self.errors`` are
    used. The collected pairs are written by ``_encode_multipart`` into a
    fresh ``io.BytesIO``, which is returned.
    """
    # transcode FieldStorage
    if PY3:  # pragma: no cover
        def decode(raw):
            return raw
    else:
        def decode(raw):
            return raw.decode(self.charset, self.errors)

    pairs = []
    for part in fs.list or ():
        part.name = decode(part.name)
        if not part.filename:
            pairs.append((part.name, decode(part.value)))
            continue
        part.filename = decode(part.filename)
        pairs.append((part.name, part))

    # TODO: transcode big requests to temp file
    content_type, fout = _encode_multipart(
        pairs,
        content_type,
        fout=io.BytesIO()
    )
    return fout
# TODO: remove in 1.4
def do_test(buf, method): env = {} if method == "GET": fp = None env['REQUEST_METHOD'] = 'GET' env['QUERY_STRING'] = buf elif method == "POST": fp = BytesIO(buf.encode('latin-1')) # FieldStorage expects bytes env['REQUEST_METHOD'] = 'POST' env['CONTENT_TYPE'] = 'application/x-www-form-urlencoded' env['CONTENT_LENGTH'] = str(len(buf)) else: raise ValueError("unknown method: %s" % method) try: return cgi.parse(fp, env, strict_parsing=1) except Exception as err: return ComparableException(err)
def test_strict(self):
    # Every (query, expected) pair must parse the same way through GET and
    # POST; when the expectation is a dict, FieldStorage must expose the same
    # fields through its mapping interface.
    for orig, expect in parse_strict_test_cases:
        # Test basic parsing
        via_get = do_test(orig, "GET")
        self.assertEqual(via_get, expect,
                         "Error parsing %s method GET" % repr(orig))
        via_post = do_test(orig, "POST")
        self.assertEqual(via_post, expect,
                         "Error parsing %s method POST" % repr(orig))

        fs = cgi.FieldStorage(environ={'QUERY_STRING': orig})
        if not isinstance(expect, dict):
            continue

        # test dict interface
        self.assertEqual(len(expect), len(fs))
        self.assertCountEqual(expect.keys(), fs.keys())
        ##self.assertEqual(norm(expect.values()), norm(fs.values()))
        ##self.assertEqual(norm(expect.items()), norm(fs.items()))
        self.assertEqual(fs.getvalue("nonexistent field", "default"),
                         "default")

        # test individual fields
        for key, expect_val in expect.items():
            self.assertIn(key, fs)
            # A single-valued field comes back as the bare value, not a list.
            if len(expect_val) > 1:
                wanted = expect_val
            else:
                wanted = expect_val[0]
            self.assertEqual(fs.getvalue(key), wanted)
def test_fieldstorage_multipart(self):
    #Test basic FieldStorage multipart parsing
    content_type = 'multipart/form-data; boundary={}'.format(BOUNDARY)
    env = dict(
        REQUEST_METHOD='POST',
        CONTENT_TYPE=content_type,
        CONTENT_LENGTH='558',
    )
    body = BytesIO(POSTDATA.encode('latin-1'))
    fs = cgi.FieldStorage(body, environ=env, encoding="latin-1")
    self.assertEqual(len(fs.list), 4)

    # (name, filename, value) expected for each parsed part, in order.
    expect = [
        dict(name=name, filename=filename, value=value)
        for name, filename, value in (
            ('id', None, '1234'),
            ('title', None, ''),
            ('file', 'test.txt', b'Testing 123.\n'),
            ('submit', None, ' Add '),
        )
    ]
    for part, wanted in zip(fs.list, expect):
        for attr, exp in wanted.items():
            self.assertEqual(getattr(part, attr), exp)
def files(self):
    """ The file uploads found in the parsed POST/PUT body (`url-encoded`
        or `multipart/form-data`), collected into a :class:`FormsDict`.

        Each value is a :class:`cgi.FieldStorage` instance. Its most
        useful attributes are:

        filename
            The client-side filename, or None when none was sent. This is
            *not* the name of the temporary file the upload is stored in.
        file
            A file(-like) object to read the uploaded data from.
        value
            The content as a *string*. For file uploads the file is read
            again on every access, so avoid it for large uploads.
    """
    uploads = FormsDict()
    for name, item in self.POST.iterallitems():
        # Only upload objects carry a ``filename`` attribute.
        if not hasattr(item, 'filename'):
            continue
        uploads[name] = item
    return uploads
def POST(self):
    """ :attr:`forms` and :attr:`files` merged into one :class:`FormsDict`.
        Plain form fields are stored as strings, file uploads as their
        :class:`cgi.FieldStorage` instances.
    """
    post = FormsDict()

    # Build a safe environment for cgi: only the keys it needs, with an
    # empty query string.
    safe_env = {'QUERY_STRING': ''}
    for key in ('REQUEST_METHOD', 'CONTENT_TYPE', 'CONTENT_LENGTH'):
        if key not in self.environ:
            continue
        safe_env[key] = self.environ[key]

    # Wrap the body in NCTextIOWrapper when that name is set; otherwise hand
    # the body over as it is.
    if NCTextIOWrapper:
        fb = NCTextIOWrapper(self.body, encoding='ISO-8859-1', newline='\n')
    else:
        fb = self.body

    data = cgi.FieldStorage(fp=fb, environ=safe_env, keep_blank_values=True)
    for item in data.list or []:
        if item.filename:
            post[item.name] = item
        else:
            post[item.name] = item.value
    return post
def test_fieldstorage_multipart_w3c(self):
    # Test basic FieldStorage multipart parsing (W3C sample)
    content_type = 'multipart/form-data; boundary={}'.format(BOUNDARY_W3)
    env = dict(
        REQUEST_METHOD='POST',
        CONTENT_TYPE=content_type,
        CONTENT_LENGTH=str(len(POSTDATA_W3)),
    )
    body = BytesIO(POSTDATA_W3.encode('latin-1'))
    fs = cgi.FieldStorage(body, environ=env, encoding="latin-1")

    self.assertEqual(len(fs.list), 2)
    self.assertEqual(fs.list[0].name, 'submit-name')
    self.assertEqual(fs.list[0].value, 'Larry')
    self.assertEqual(fs.list[1].name, 'files')

    # The second part is itself a list of nested parts, one per file.
    files = fs.list[1].value
    self.assertEqual(len(files), 2)
    expect = [
        dict(name=None, filename='file1.txt',
             value=b'... contents of file1.txt ...'),
        dict(name=None, filename='file2.gif',
             value=b'...contents of file2.gif...'),
    ]
    for nested, wanted in zip(files, expect):
        for attr, exp in wanted.items():
            self.assertEqual(getattr(nested, attr), exp)
def main():
    """CGI entry point: validate the ``reqid`` parameter and send the mail.

    Writes the HTTP header and charset meta tag, reads ``reqid`` from the
    request (whitespace-stripped, empty when absent) and passes it to
    ``check`` and then ``sendmail``. The final message is printed only when
    both return a true value.
    """
    print("Content-Type: text/html")
    print()
    print('''<meta http-equiv="Content-type" content="text/html;charset=UTF-8">''')

    reqid = cgi.FieldStorage().getfirst('reqid', '').strip()
    # sendmail() is only attempted once check() has accepted the id.
    if check(reqid) and sendmail(reqid):
        # NOTE(review): the message text looks mis-encoded in this copy of
        # the file; it is reproduced unchanged.
        print('???????????????????????????????')
def main():
    """CGI entry point: accept a POSTed news submission and mail a confirmation link.

    Writes the HTTP header and charset meta tag, then stops unless the
    request method is POST. The form is validated by ``check``; its result
    is unpacked into mail address, title and description, stored with
    ``save``, and the value ``save`` returns is embedded as ``reqid`` in the
    confirmation URL passed to ``sendmail``. The final message is printed
    only when ``sendmail`` returns a true value.
    """
    print("Content-Type: text/html")
    print()
    print('''<meta http-equiv="Content-type" content="text/html;charset=UTF-8">''')

    if os.environ['REQUEST_METHOD'] != 'POST':
        return

    fields = check(cgi.FieldStorage())
    if not fields:
        return
    mailaddr, title, desc = fields

    reqid = save(mailaddr, title, desc)
    confirm_url = 'http://www.python.jp/cgi-bin/confirm-news.py?reqid=%s' % reqid
    if sendmail(mailaddr, title, desc, confirm_url):
        # NOTE(review): the message text looks mis-encoded in this copy of
        # the file; it is reproduced unchanged.
        print('????????????????????????????????')
def main():
    """CGI entry point: accept a POSTed event submission and mail a confirmation link.

    Writes the HTTP header and charset meta tag, then stops unless the
    request method is POST. The form is validated by ``check``; its result
    is unpacked into mail address, title, URL, date and description, stored
    with ``save``, and the value ``save`` returns is embedded as ``reqid``
    in the confirmation URL passed to ``sendmail``. The final message is
    printed only when ``sendmail`` returns a true value.
    """
    print("Content-Type: text/html")
    print()
    print('''<meta http-equiv="Content-type" content="text/html;charset=UTF-8">''')

    if os.environ['REQUEST_METHOD'] != 'POST':
        return

    fields = check(cgi.FieldStorage())
    if not fields:
        return
    mailaddr, title, url, date, desc = fields

    reqid = save(mailaddr, title, url, date, desc)
    conf_url = 'http://www.python.jp/cgi-bin/confirm-connpass-event.py?reqid=%s' % reqid
    if sendmail(mailaddr, title, url, date, desc, conf_url):
        # NOTE(review): the message text looks mis-encoded in this copy of
        # the file; it is reproduced unchanged.
        print('????????????????????????????????')
def main():
    """CGI entry point: accept a POSTed job-board entry and mail a confirmation link.

    Writes the HTTP header and charset meta tag, then stops unless the
    request method is POST. The form is validated by ``check``; its result
    is unpacked into mail address, name, URL, date and description, stored
    with ``save``, and the value ``save`` returns is embedded as ``reqid``
    in the confirmation URL passed to ``sendmail``. The final message is
    printed only when ``sendmail`` returns a true value.
    """
    print("Content-Type: text/html")
    print()
    print('''<meta http-equiv="Content-type" content="text/html;charset=UTF-8">''')

    if os.environ['REQUEST_METHOD'] != 'POST':
        return

    fields = check(cgi.FieldStorage())
    if not fields:
        return
    mailaddr, name, url, date, desc = fields

    reqid = save(mailaddr, name, url, date, desc)
    conf_url = 'http://www.python.jp/cgi-bin/confirm-jobboard.py?reqid=%s' % reqid
    if sendmail(mailaddr, name, url, date, desc, conf_url):
        # NOTE(review): the message text looks mis-encoded in this copy of
        # the file; it is reproduced unchanged.
        print('????????????????????????????????')
def do_POST(self):
    """Answer a POST that carries a ``text`` form field with predictions as JSON.

    The form is parsed from ``self.rfile``. When a ``text`` field is present
    and its UTF-8 decoded value is at most 10000 characters long, the text
    is logged, ``make_predictions`` is run with ``self.model``, and the
    result is sent as a 200 JSON response. In every other case an empty
    400 response is sent.
    """
    request_env = {
        "REQUEST_METHOD": "POST",
        "CONTENT_TYPE": self.headers["Content-Type"],
    }
    form = cgi.FieldStorage(fp=self.rfile,
                            headers=self.headers,
                            environ=request_env)

    text = None
    if "text" in form:
        text = form["text"].value.decode("utf-8")

    # Missing or over-long input: reject without a body.
    if text is None or len(text) > 10000:
        self.send_response(400)
        self.send_header("Content-Type", "application/json")
        self.end_headers()
        return

    print(u"Document text: {}".format(text))
    example = make_predictions(text, self.model)
    print_predictions(example)

    self.send_response(200)
    self.send_header("Content-Type", "application/json")
    self.end_headers()
    self.wfile.write(json.dumps(example))
def getUploads(self, field_name = None):
    """
        Get uploads sent to this handler.
        Cheeky borrowed from blobstore_handlers.py - © 2007 Google Inc.

        Args:
            field_name: Only select uploads that were sent as a specific field.

        Returns:
            A list of BlobInfo records corresponding to each upload.
            Empty list if there are no blob-info records for field_name.
    """
    uploads = collections.defaultdict(list)
    for key, value in request.current.get().request.params.items():
        # Only FieldStorage parameters tagged with a blob key are uploads.
        if isinstance(value, cgi.FieldStorage) and "blob-key" in value.type_options:
            uploads[key].append(blobstore.parse_blob_info(value))

    if field_name:
        return list(uploads.get(field_name, []))

    # No field requested: flatten the per-field lists into one.
    return [info for infos in uploads.itervalues() for info in infos]
def getUploads(self, field_name=None):
    """
        Get uploads sent to this handler.
        Cheeky borrowed from blobstore_handlers.py - © 2007 Google Inc.

        Args:
            field_name: Only select uploads that were sent as a specific field.

        Returns:
            A list of BlobInfo records corresponding to each upload.
            Empty list if there are no blob-info records for field_name.
    """
    by_field = collections.defaultdict(list)
    for key, value in request.current.get().request.params.items():
        if not isinstance(value, cgi.FieldStorage):
            continue
        # Only parameters tagged with a blob key count as uploads.
        if 'blob-key' in value.type_options:
            by_field[key].append(blobstore.parse_blob_info(value))

    if not field_name:
        # No field requested: concatenate the per-field lists.
        results = []
        for infos in by_field.itervalues():
            results.extend(infos)
        return results
    return list(by_field.get(field_name, []))
def process(self):
    """Returns information about a commit

    Reads the commit selected by the CGI form, writes the response headers
    (plus an attachment header when ``download=true`` was submitted), then
    a ``#``-prefixed JSON rendering of the commit header, and finally the
    commit diff.
    """
    form = cgi.FieldStorage()
    commit = self.read_commit(form)

    print("Content-Type: text/plain; charset='utf-8'\r")
    print("Cache-Control: max-age=60\r")
    as_attachment = form.getfirst("download", "false") == "true"
    if as_attachment:
        print("Content-Disposition: attachment; filename=\"patch.txt\"\r")
    print("\r")

    header = PostsaiCommitViewer.format_commit_header(commit)
    print("#" + json.dumps(header, default=convert_to_builtin_type))
    # Flush the header output before the diff is dumped.
    sys.stdout.flush()
    PostsaiCommitViewer.dump_commit_diff(commit)
def test_fieldstorage_multipart_leading_whitespace(self):
    content_type = 'multipart/form-data; boundary={}'.format(BOUNDARY)
    env = dict(
        REQUEST_METHOD='POST',
        CONTENT_TYPE=content_type,
        CONTENT_LENGTH='560',
    )
    # Add some leading whitespace to our post data that will cause the
    # first line to not be the innerboundary.
    body = BytesIO(b"\r\n" + POSTDATA.encode('latin-1'))
    fs = cgi.FieldStorage(body, environ=env, encoding="latin-1")
    self.assertEqual(len(fs.list), 4)

    # (name, filename, value) expected for each parsed part, in order.
    expect = [
        dict(name=name, filename=filename, value=value)
        for name, filename, value in (
            ('id', None, '1234'),
            ('title', None, ''),
            ('file', 'test.txt', b'Testing 123.\n'),
            ('submit', None, ' Add '),
        )
    ]
    for part, wanted in zip(fs.list, expect):
        for attr, exp in wanted.items():
            self.assertEqual(getattr(part, attr), exp)
def test_fieldstorage_part_content_length(self):
    # A part that carries its own Content-Length header must still be parsed
    # into a single field with the right name and value.
    BOUNDARY = "JfISa01"
    # NOTE(review): the line breaks inside this literal were flattened in the
    # copy under review; they are laid out here as multipart framing requires
    # (one header per line, blank line before the body) -- confirm upstream.
    POSTDATA = """--JfISa01
Content-Disposition: form-data; name="submit-name"
Content-Length: 5

Larry
--JfISa01"""
    content_type = 'multipart/form-data; boundary={}'.format(BOUNDARY)
    env = dict(
        REQUEST_METHOD='POST',
        CONTENT_TYPE=content_type,
        CONTENT_LENGTH=str(len(POSTDATA)),
    )
    body = BytesIO(POSTDATA.encode('latin-1'))
    fs = cgi.FieldStorage(body, environ=env, encoding="latin-1")

    self.assertEqual(len(fs.list), 1)
    only_part = fs.list[0]
    self.assertEqual(only_part.name, 'submit-name')
    self.assertEqual(only_part.value, 'Larry')
def get_uploads(self, field_name=None):
    """Get uploads sent to this handler.

    The request parameters are scanned once; the per-field result is cached
    on the instance and reused by later calls.

    Args:
      field_name: Only select uploads that were sent as a specific field.

    Returns:
      A list of BlobInfo records corresponding to each upload.
      Empty list if there are no blob-info records for field_name.
    """
    if self.__uploads is None:
        self.__uploads = collections.defaultdict(list)
        for key, value in self.request.params.items():
            if not isinstance(value, cgi.FieldStorage):
                continue
            # Only parameters tagged with a blob key count as uploads.
            if 'blob-key' in value.type_options:
                self.__uploads[key].append(blobstore.parse_blob_info(value))

    if not field_name:
        # No field requested: flatten the per-field lists into one.
        return [info
                for infos in self.__uploads.itervalues()
                for info in infos]
    # .get() avoids creating an empty entry for an unknown field.
    return list(self.__uploads.get(field_name, []))
def get_file_infos(self, field_name=None):
    """Get the file infos associated to the uploads sent to this handler.

    The request parameters are scanned once; the per-field result is cached
    on the instance and reused by later calls.

    Args:
      field_name: Only select uploads that were sent as a specific field.
        Specify None to select all the uploads.

    Returns:
      A list of FileInfo records corresponding to each upload.
      Empty list if there are no FileInfo records for field_name.
    """
    if self.__file_infos is None:
        self.__file_infos = collections.defaultdict(list)
        for key, value in self.request.params.items():
            if not isinstance(value, cgi.FieldStorage):
                continue
            # Only parameters tagged with a blob key count as uploads.
            if 'blob-key' in value.type_options:
                self.__file_infos[key].append(blobstore.parse_file_info(value))

    if not field_name:
        # No field requested: flatten the per-field lists into one.
        return [info
                for infos in self.__file_infos.itervalues()
                for info in infos]
    # .get() avoids creating an empty entry for an unknown field.
    return list(self.__file_infos.get(field_name, []))
def test_file_request(self):
    # A file-upload form submitted without a filename must produce a
    # multipart request that the cgi module parses back to the same data.
    import cgi

    # fill in a file upload form...
    form = self.make_form()
    form["user"] = "john"
    data_control = form.find_control("data")
    data = "blah\nbaz\n"
    data_control.add_file(StringIO(data))
    req = form.click()
    self.assertTrue(
        get_header(req, "Content-type").startswith(
            "multipart/form-data; boundary="))

    # ...and check the resulting request is understood by cgi module
    fs = cgi.FieldStorage(
        StringIO(req.get_data()),
        CaseInsensitiveDict(header_items(req)),
        environ={"REQUEST_METHOD": "POST"})
    # assertEqual replaces the deprecated assert_/assertEquals aliases and
    # reports both operands when a comparison fails.
    self.assertEqual(fs["user"].value, "john")
    self.assertEqual(fs["data"].value, data)
    self.assertEqual(fs["data"].filename, "")
def test_file_request_with_filename(self):
    # A file-upload form submitted with an explicit filename must produce a
    # multipart request that the cgi module parses back to the same data
    # and filename.
    import cgi

    # fill in a file upload form...
    form = self.make_form()
    form["user"] = "john"
    data_control = form.find_control("data")
    data = "blah\nbaz\n"
    data_control.add_file(StringIO(data), filename="afilename")
    req = form.click()
    self.assertTrue(
        get_header(req, "Content-type").startswith(
            "multipart/form-data; boundary="))

    # ...and check the resulting request is understood by cgi module
    fs = cgi.FieldStorage(
        StringIO(req.get_data()),
        CaseInsensitiveDict(header_items(req)),
        environ={"REQUEST_METHOD": "POST"})
    # assertTrue/assertEqual replace the deprecated assert_ alias and report
    # both operands when a comparison fails.
    self.assertEqual(fs["user"].value, "john")
    self.assertEqual(fs["data"].value, data)
    self.assertEqual(fs["data"].filename, "afilename")