我们从 Python 开源项目中，提取了以下 31 个代码示例，用于说明如何使用 cgi.parse_multipart()。
def do_POST(self):
    """Echo the submitted ``message`` form field back inside an HTML page.

    The status line and headers are sent first.  For a
    ``multipart/form-data`` body the form is parsed, the first ``message``
    value is rendered in a heading followed by a fresh submission form, and
    the page is written to the client and printed to stdout.  Any other
    content type gets the headers only.

    The submitted text is HTML-escaped before it is embedded in the page,
    and a missing ``message`` field is rendered as an empty string.

    Failures are reported through ``self.log_error`` instead of being
    silently discarded.
    NOTE(review): this assumes the enclosing class derives from
    ``BaseHTTPRequestHandler`` (which supplies ``log_error``) -- confirm.
    """
    try:
        self.send_response(301)
        self.send_header('Content-type', 'text/html')
        self.end_headers()

        ctype, pdict = cgi.parse_header(
            self.headers.getheader('content-type'))
        if ctype != 'multipart/form-data':
            # Nothing to echo; the headers above are the whole response.
            return

        fields = cgi.parse_multipart(self.rfile, pdict)
        values = fields.get('message') or ['']
        # The value comes straight from the client, so escape it before
        # placing it inside markup.
        message = cgi.escape(values[0], True)

        output = "".join([
            "<html><body>",
            " <h2> Okay, how about this: </h2>",
            "<h1> %s </h1>" % message,
            '''<form method='POST' enctype='multipart/form-data' action='/hello'><h2>What would you like me to say?</h2><input name="message" type="text" ><input type="submit" value="Submit"> </form>''',
            "</body></html>",
        ])
        self.wfile.write(output)
        print(output)
    except Exception as exc:
        # Keep the handler best-effort, but leave a trace of what went wrong.
        self.log_error("do_POST failed: %s", exc)
def do_POST(self):
    """Create a restaurant from the ``/restaurants/new`` form, then redirect.

    Only requests whose path ends in ``/restaurants/new`` and whose body is
    ``multipart/form-data`` are acted on: the first ``newRestaurantName``
    value becomes the name of a new ``Restaurant`` that is added to the
    session and committed, after which a 301 redirect to ``/restaurants``
    is sent.  Every exception is swallowed.
    """
    try:
        if not self.path.endswith("/restaurants/new"):
            return

        content_type, params = cgi.parse_header(
            self.headers.getheader('content-type'))
        if content_type != 'multipart/form-data':
            return

        form = cgi.parse_multipart(self.rfile, params)
        submitted = form.get('newRestaurantName')

        # Persist the new restaurant.
        restaurant = Restaurant(name=submitted[0])
        session.add(restaurant)
        session.commit()

        # Send the browser back to the listing page.
        self.send_response(301)
        self.send_header('Content-type', 'text/html')
        self.send_header('Location', '/restaurants')
        self.end_headers()
    except:
        pass
def do_POST(self):
    """Authenticate a POSTed form and hand it to ``dispatch``.

    The body is parsed as ``multipart/form-data`` or
    ``application/x-www-form-urlencoded``; any other content type yields an
    empty form.  The form must contain ``Username`` and ``Password``, and
    their first values must pass ``validate_id``; otherwise the failure is
    logged and reported through ``self.exit_on_error``.  On success the form
    is passed to ``dispatch`` and ``{'Status': 'OK'}`` is written back.  An
    exception raised while dispatching or writing is logged and reported to
    the client as well.
    """
    # http://stackoverflow.com/questions/4233218/python-basehttprequesthandler-post-variables
    ctype, pdict = cgi.parse_header(self.headers['content-type'])
    if ctype == 'multipart/form-data':
        postvars = cgi.parse_multipart(self.rfile, pdict)
    elif ctype == 'application/x-www-form-urlencoded':
        length = int(self.headers['content-length'])
        postvars = cgi.parse_qs(self.rfile.read(length), keep_blank_values=1)
    else:
        postvars = {}

    # Both credential fields are required before anything else happens.
    if 'Username' not in postvars or 'Password' not in postvars:
        log('E', 'vali.', 'No credentials.')
        self.exit_on_error('No credentials.')
        return

    if not validate_id(postvars['Username'][0], postvars['Password'][0]):
        log('E', 'vali.', 'Wrong credentials.')
        self.exit_on_error('Wrong credentials.')
        return

    try:
        dispatch(postvars)
        self.write_response({'Status': 'OK'})
    except Exception:
        # Catch handler failures only; interpreter-exit signals propagate.
        log('E', 'hand.', 'Handler throws an exception.')
        self.exit_on_error('Handler throws an exception.')
def do_POST(self):
    """Handle the restaurant "edit" and "new" form submissions.

    * Path ending in ``/edit``: for a ``multipart/form-data`` body, the
      restaurant whose id is the third ``/``-separated path component is
      loaded, renamed to the first ``newRestaurantName`` value, saved, and
      the client is redirected to ``/restaurants``.
    * Path ending in ``/restaurants/new``: for a ``multipart/form-data``
      body, a new ``Restaurant`` with that name is saved and the client is
      redirected to ``/restaurants``.

    Every exception is swallowed.
    """

    def redirect_to_listing():
        # 301 back to the restaurant list.
        self.send_response(301)
        self.send_header('Content-type', 'text/html')
        self.send_header('Location', '/restaurants')
        self.end_headers()

    try:
        if self.path.endswith("/edit"):
            content_type, params = cgi.parse_header(
                self.headers.getheader('content-type'))
            if content_type == 'multipart/form-data':
                form = cgi.parse_multipart(self.rfile, params)
                submitted = form.get('newRestaurantName')
                restaurant_id = self.path.split("/")[2]
                restaurant = session.query(Restaurant).filter_by(
                    id=restaurant_id).one()
                if restaurant != []:
                    restaurant.name = submitted[0]
                    session.add(restaurant)
                    session.commit()
                    redirect_to_listing()

        if self.path.endswith("/restaurants/new"):
            content_type, params = cgi.parse_header(
                self.headers.getheader('content-type'))
            if content_type == 'multipart/form-data':
                form = cgi.parse_multipart(self.rfile, params)
                submitted = form.get('newRestaurantName')

                # Persist the new restaurant.
                restaurant = Restaurant(name=submitted[0])
                session.add(restaurant)
                session.commit()
                redirect_to_listing()
    except:
        pass
def do_POST(self):
    """Handle a web-control POST: check the Referer, parse the form, route.

    1. The ``Referer`` header's network location must start with
       ``127.0.0.1`` or ``localhost``; otherwise the request is logged and
       dropped without a response.  If the header cannot be read or parsed
       the check is skipped (best-effort, as before).
    2. The body is parsed into ``self.postvars`` for
       ``multipart/form-data`` and ``application/x-www-form-urlencoded``;
       any other type, or any parse failure, yields an empty dict.
    3. ``/config`` is routed to ``self.req_config_handler``; every other
       path gets a raw 404 response.

    The original spelled the second prefix test ``startswitch``.  The
    resulting AttributeError was swallowed by the surrounding ``except``,
    so a foreign Referer was never actually refused.
    """
    try:
        refer = self.headers.getheader('Referer')
        netloc = urlparse.urlparse(refer).netloc
        if not netloc.startswith(("127.0.0.1", "localhost")):
            logging.warning("web control ref:%s refuse", netloc)
            return
    except Exception:
        # Missing or unparsable Referer: fall through, as before.
        pass

    logging.debug('PHP web_control %s %s %s ', self.address_string(), self.command, self.path)
    try:
        ctype, pdict = cgi.parse_header(self.headers.getheader('content-type'))
        if ctype == 'multipart/form-data':
            self.postvars = cgi.parse_multipart(self.rfile, pdict)
        elif ctype == 'application/x-www-form-urlencoded':
            length = int(self.headers.getheader('content-length'))
            self.postvars = urlparse.parse_qs(self.rfile.read(length), keep_blank_values=1)
        else:
            self.postvars = {}
    except Exception:
        self.postvars = {}

    path = urlparse.urlparse(self.path).path
    if path == "/config":
        return self.req_config_handler()

    self.wfile.write(b'HTTP/1.1 404\r\nContent-Type: text/plain\r\nConnection: close\r\n\r\n404 Not Found')
    logging.info('%s "%s %s HTTP/1.1" 404 -', self.address_string(), self.command, self.path)
def do_POST(self):
    """Parse the POSTed form into ``self.postvars`` and route by URL path.

    ``multipart/form-data`` and ``application/x-www-form-urlencoded`` bodies
    are parsed; any other content type, or any failure while parsing, leaves
    ``self.postvars`` as an empty dict.  ``/login`` and ``/register`` both go
    to ``req_login_handler``; ``/logout``, ``/order`` and ``/transfer`` go to
    their own handlers; anything else is logged and answered through
    ``send_not_found``.
    """
    xlog.debug('x-tunnel web_control %s %s %s ', self.address_string(), self.command, self.path)

    try:
        content_type, params = cgi.parse_header(self.headers.getheader('content-type'))
        if content_type == 'multipart/form-data':
            form = cgi.parse_multipart(self.rfile, params)
        elif content_type == 'application/x-www-form-urlencoded':
            size = int(self.headers.getheader('content-length'))
            form = urlparse.parse_qs(self.rfile.read(size), keep_blank_values=1)
        else:
            form = {}
        self.postvars = form
    except:
        self.postvars = {}

    # URL path -> name of the bound method that serves it.
    routes = {
        '/login': 'req_login_handler',
        '/logout': 'req_logout_handler',
        '/register': 'req_login_handler',
        '/order': 'req_order_handler',
        '/transfer': 'req_transfer_handler',
    }
    handler_name = routes.get(urlparse.urlparse(self.path).path)
    if handler_name is not None:
        return getattr(self, handler_name)()

    xlog.info('%s "%s %s HTTP/1.1" 404 -', self.address_string(), self.command, self.path)
    return self.send_not_found()
def do_POST(self):
    """Handle a GAEProxy web-control POST: check Referer, parse form, route.

    1. The ``Referer`` header's network location must start with
       ``127.0.0.1`` or ``localhost``; otherwise the request is logged and
       dropped without a response.  If the header cannot be read or parsed
       the check is skipped (best-effort, as before).
    2. The body is parsed into ``self.postvars`` for
       ``multipart/form-data`` and ``application/x-www-form-urlencoded``;
       any other type, or any parse failure, yields an empty dict.
    3. ``/deploy``, ``/config``, ``/scan_ip`` and paths starting with
       ``/importip`` are routed to their handlers; every other path gets a
       raw 404 response.

    The original spelled the second prefix test ``startswitch``.  The
    resulting AttributeError was swallowed by the surrounding ``except``,
    so a foreign Referer was never actually refused.
    """
    try:
        refer = self.headers.getheader('Referer')
        netloc = urlparse.urlparse(refer).netloc
        if not netloc.startswith(("127.0.0.1", "localhost")):
            xlog.warn("web control ref:%s refuse", netloc)
            return
    except Exception:
        # Missing or unparsable Referer: fall through, as before.
        pass

    xlog.debug('GAEProxy web_control %s %s %s ', self.address_string(), self.command, self.path)
    try:
        ctype, pdict = cgi.parse_header(self.headers.getheader('content-type'))
        if ctype == 'multipart/form-data':
            self.postvars = cgi.parse_multipart(self.rfile, pdict)
        elif ctype == 'application/x-www-form-urlencoded':
            length = int(self.headers.getheader('content-length'))
            self.postvars = urlparse.parse_qs(self.rfile.read(length), keep_blank_values=1)
        else:
            self.postvars = {}
    except Exception:
        self.postvars = {}

    path = urlparse.urlparse(self.path).path
    if path == '/deploy':
        return self.req_deploy_handler()
    elif path == "/config":
        return self.req_config_handler()
    elif path == "/scan_ip":
        return self.req_scan_ip_handler()
    elif path.startswith("/importip"):
        return self.req_importip_handler()

    self.wfile.write(b'HTTP/1.1 404\r\nContent-Type: text/plain\r\nConnection: close\r\n\r\n404 Not Found')
    xlog.info('%s "%s %s HTTP/1.1" 404 -', self.address_string(), self.command, self.path)
def _parse_POST(self):
    """Return the POSTed form fields as a dict.

    ``multipart/form-data`` bodies are parsed with ``cgi.parse_multipart``
    and ``application/x-www-form-urlencoded`` bodies with ``cgi.parse_qs``
    (blank values kept).  Any other content type yields an empty dict.
    """
    import cgi

    content_type, params = cgi.parse_header(
        self.headers.getheader('content-type'))

    if content_type == 'multipart/form-data':
        return cgi.parse_multipart(self.rfile, params)

    if content_type == 'application/x-www-form-urlencoded':
        body_size = int(self.headers.getheader('content-length'))
        return cgi.parse_qs(self.rfile.read(body_size), keep_blank_values=1)

    return {}

# -------------- POST handler: where the magic happens --------------
def parse_post_vars(self):
    """Return the POSTed form fields as a dict.

    ``multipart/form-data`` bodies go through ``cgi.parse_multipart``;
    ``application/x-www-form-urlencoded`` bodies are read for
    ``content-length`` bytes and parsed with ``parse_qs`` (blank values
    kept).  Any other content type yields an empty dict.
    """
    content_type, params = cgi.parse_header(self.headers['content-type'])

    if content_type == 'multipart/form-data':
        return cgi.parse_multipart(self.rfile, params)

    if content_type == 'application/x-www-form-urlencoded':
        body_size = int(self.headers['content-length'])
        raw_body = self.rfile.read(body_size)
        return parse_qs(raw_body, keep_blank_values=1)

    return {}

# ugly dragons, turn back
def test_parse_multipart(self):
    """``cgi.parse_multipart`` splits POSTDATA into the expected fields.

    The latin-1 encoded ``POSTDATA`` is parsed with the latin-1 encoded
    ``BOUNDARY`` and a ``CONTENT-LENGTH`` of ``'558'``; the result must map
    each field name to a one-element list holding its bytes value.
    """
    stream = BytesIO(POSTDATA.encode('latin1'))
    params = {
        'boundary': BOUNDARY.encode('latin1'),
        'CONTENT-LENGTH': '558',
    }
    parsed = cgi.parse_multipart(stream, params)
    self.assertEqual(
        parsed,
        {
            'submit': [b' Add '],
            'id': [b'1234'],
            'file': [b'Testing 123.\n'],
            'title': [b''],
        },
    )
def _parse_request_body(self):
    """Parse request body in order to extract arguments.

    This method recognizes both `multipart/form-data` and
    `application/x-www-form-urlencoded` encoded data. So the client can
    check how the tested Nginx behaves with different kinds of params.
    It's based on: http://stackoverflow.com/a/4233452

    For multipart bodies the ``boundary`` parameter is converted to bytes
    before it is handed to ``parse_multipart``: ``parse_header`` returns it
    as ``str``, but on Python 3 ``parse_multipart`` requires bytes.  The
    request's ``content-length``, when present, is forwarded as
    ``CONTENT-LENGTH`` so the parser can bound how much it reads.

    Returns:
        It returns a dictionary that contains all the parsed data. In case
        when body did not contain any arguments - an empty dict is
        returned.
    """
    if 'content-type' not in self.headers:
        return {}

    ctype, pdict = parse_header(self.headers['content-type'])
    if ctype == 'multipart/form-data':
        boundary = pdict.get('boundary')
        if isinstance(boundary, str):
            pdict['boundary'] = boundary.encode('ascii')
        content_length = self.headers.get('content-length')
        if content_length is not None:
            pdict['CONTENT-LENGTH'] = content_length
        postvars = parse_multipart(self.rfile, pdict)
    elif ctype == 'application/x-www-form-urlencoded':
        # This should work (TM) basing on HTML5 spec:
        # Which default character encoding to use can only be determined
        # on a case-by-case basis, but generally the best character
        # encoding to use as a default is the one that was used to
        # encode the page on which the form used to create the payload
        # was itself found. In the absence of a better default,
        # UTF-8 is suggested.
        length = int(self.headers['content-length'])
        post_data = self.rfile.read(length).decode('utf-8')
        postvars = parse_qs(post_data,
                            keep_blank_values=1,
                            encoding="utf-8",
                            errors="strict",
                            )
    else:
        postvars = {}
    return postvars
def do_POST(self) -> None:
    """Handle a POST request by passing its form data to the resolved handler.

    The action is looked up with ``self._resolver.resolve(self.path, 'POST')``.
    The body is parsed as ``multipart/form-data`` or
    ``application/x-www-form-urlencoded``; any other content type yields an
    empty form.  The form is run through ``self._decode_results``, passed to
    the resolved handler, and the handler's result is wrapped by
    ``self._prepare_json_result`` and sent with ``self._send_response``.

    A ``KeyError`` raised anywhere along the way, or a resolver result of
    ``None``, produces an empty ``NOT_FOUND`` response.

    Fixes relative to the earlier version:

    * The raw ``Content-Type`` header is parsed instead of
      ``get_content_type()``, which drops parameters and therefore lost the
      multipart ``boundary``.
    * The boundary is passed to ``cgi.parse_multipart`` as bytes, as
      Python 3 requires.
    * ``urllib.parse.parse_qs`` replaces the ``cgi.parse_qs`` alias, which
      was removed in Python 3.8.
    * A missing ``Content-Length`` is treated as an empty body.
    """
    # Local import so this method does not depend on the file's import block.
    from urllib.parse import parse_qs

    try:
        action_info = self._resolver.resolve(self.path, 'POST')

        ctype, pdict = cgi.parse_header(self.headers.get('Content-Type', ''))
        # get_content_type() lower-cased the type; keep matching that way.
        ctype = ctype.lower()
        if ctype == 'multipart/form-data':
            boundary = pdict.get('boundary')
            if isinstance(boundary, str):
                pdict['boundary'] = boundary.encode('ascii')
            content_length = self.headers.get('Content-Length')
            if content_length is not None:
                pdict['CONTENT-LENGTH'] = content_length
            postvars = cgi.parse_multipart(self.rfile, pdict)
        elif ctype == 'application/x-www-form-urlencoded':
            length = int(self.headers.get('Content-Length', 0))
            postvars = parse_qs(self.rfile.read(length), keep_blank_values=1)
        else:
            postvars = {}

        if action_info is None:
            # Nothing resolved: answer rather than leave the client waiting.
            self._send_response({}, HTTPStatus.NOT_FOUND)
            return

        _verb, handler, _params = action_info
        decoded_str = self._decode_results(postvars)
        results = handler(decoded_str)
        response = self._prepare_json_result(results)
        self._send_response(*response)
    except KeyError:
        self._send_response({}, HTTPStatus.NOT_FOUND)
def parse_multipart(body, params):
    """Parse a multipart *body* using text header parameters.

    *body* is wrapped in a ``BytesIO`` and every value in *params* is
    encoded to ASCII bytes before both are handed to
    ``cgi.parse_multipart``, whose result is returned unchanged.
    """
    stream = BytesIO(body)
    encoded_params = {}
    for name, text in params.items():
        encoded_params[name] = text.encode('ascii')
    return cgi.parse_multipart(stream, encoded_params)
def _parseHeader(line):
    """Split a bytes header value into ``(key, params)`` with bytes values.

    ``cgi.parse_header`` only accepts ``str``, so *line* is decoded with the
    ``charmap`` codec first.  The main value is re-encoded to bytes, and the
    parameter dict keeps its ``str`` keys while its values are re-encoded to
    bytes -- the shape ``cgi.parse_multipart`` expects for its ``pdict``.
    """
    text_key, text_params = cgi.parse_header(line.decode('charmap'))

    byte_key = text_key.encode('charmap')
    byte_params = {}
    for name, value in text_params.items():
        byte_params[name] = value.encode('charmap')

    return (byte_key, byte_params)
def do_POST(self):
    """Authorize, parse and dispatch a POST request.

    When ``check_authorization()`` compares equal to ``False`` a 401 with a
    Basic-auth challenge is sent.  Otherwise the body is read as a
    ``cgi.FieldStorage`` (multipart), a ``parse_qs`` dict (urlencoded,
    decoded as UTF-8) or an empty dict, wrapped in a ``Request`` and passed
    to ``self.server.handle_post``.  If the handler raises, the traceback is
    logged and returned as a 500 ``Response``.  A ``Response`` result is
    sent as-is; anything else is serialized to JSON with status 200.
    """
    if self.check_authorization() == False:
        challenge = [('WWW-Authenticate', 'Basic realm="Secure Area"')]
        self.send_content(401, 'text/plain', "Authorization Required", challenge)
        return

    # An empty POST carries no content-type, so a failed parse simply
    # leaves the type unset.
    content_type = None
    try:
        (content_type, _params) = cgi.parse_header(self.headers['content-type'])
    except:
        pass

    if content_type == 'multipart/form-data':
        # FieldStorage is used here rather than cgi.parse_multipart.
        form = cgi.FieldStorage(
            fp=self.rfile,
            headers=self.headers,
            environ={
                'REQUEST_METHOD': 'POST',
                'CONTENT_TYPE': self.headers['Content-Type'],
            },
        )
    elif content_type == 'application/x-www-form-urlencoded':
        body_size = int(self.headers['content-length'])
        body_text = self.rfile.read(body_size).decode('utf-8')
        form = parse_qs(body_text, keep_blank_values=True)
    else:
        form = {}

    request = Request('POST', self.path, form, self.admin_access, self.headers)

    try:
        response = self.server.handle_post(request)
    except:
        obplayer.Log.log(traceback.format_exc(), 'error')
        response = Response(500, 'text/plain', traceback.format_exc())

    if type(response) == Response:
        self.send_content(response.status, response.mimetype, response.content, headers=response.headers)
        return

    self.send_content(200, 'application/json', json.dumps(response))
def do_POST(self):
    """Echo the uploaded ``upfile`` form field back to the client.

    For a ``multipart/form-data`` body the form is parsed, a 200 response is
    started, the first ``upfile`` value is printed to stdout and then
    written to the client after a short HTML banner.  Other content types
    are ignored, and every exception is swallowed.
    """
    global rootnode
    try:
        content_type, params = cgi.parse_header(
            self.headers.getheader('content-type'))
        if content_type != 'multipart/form-data':
            return

        form = cgi.parse_multipart(self.rfile, params)
        self.send_response(200)
        self.end_headers()

        uploaded = form.get('upfile')
        print("filecontent %s" % uploaded[0])
        self.wfile.write("<HTML>POST OK.<BR><BR>")
        self.wfile.write(uploaded[0])
    except:
        pass
def requestReceived(self, command, path, version):
    """Populate the request from its request line and body, then process it.

    The content buffer is rewound, ``method``/``uri``/``clientproto`` are
    recorded, the URI is split into ``path`` and query arguments, and the
    peer and host addresses are cached from the channel's transport.  For a
    POST with a content type, urlencoded or multipart form fields are merged
    into ``args``.  A multipart body that makes ``cgi.parse_multipart``
    raise ``KeyError('content-disposition')`` is answered with a raw 400 and
    the connection is dropped; otherwise ``self.process()`` runs.

    This method is not intended for users.
    """
    self.content.seek(0, 0)
    self.args = {}
    self.stack = []

    self.method, self.uri = command, path
    self.clientproto = version

    pieces = self.uri.split('?', 1)
    if len(pieces) == 1:
        self.path = self.uri
    else:
        self.path, query = pieces
        self.args = parse_qs(query, 1)

    # Cache the client and server information; it is serialized and sent
    # with the request later so CGIs will work remotely.
    self.client = self.channel.transport.getPeer()
    self.host = self.channel.transport.getHost()

    # Argument processing
    args = self.args
    content_type = self.getHeader('content-type')
    if self.method == "POST" and content_type:
        key, pdict = cgi.parse_header(content_type)
        if key == 'application/x-www-form-urlencoded':
            args.update(parse_qs(self.content.read(), 1))
        elif key == 'multipart/form-data':
            try:
                args.update(cgi.parse_multipart(self.content, pdict))
            except KeyError as e:
                if e.args[0] != 'content-disposition':
                    raise
                # parse_multipart can't cope with missing
                # content-disposition headers in multipart/form-data
                # parts, so tell the client it was a bad request.
                self.channel.transport.write(
                    "HTTP/1.1 400 Bad Request\r\n\r\n")
                self.channel.transport.loseConnection()
                return

    self.process()
def do_POST(self):
    """Handle the restaurant "delete", "edit" and "new" form submissions.

    * Path ending in ``/delete``: the restaurant whose id is the third
      ``/``-separated path component is loaded, deleted and committed.
    * Path ending in ``/edit``: for a ``multipart/form-data`` body, that
      restaurant is renamed to the first ``newRestaurantName`` value and
      saved.
    * Path ending in ``/restaurants/new``: for a ``multipart/form-data``
      body, a new ``Restaurant`` with that name is saved.

    Each successful action ends with a 301 redirect to ``/restaurants``.
    Every exception is swallowed.
    """

    def redirect_to_listing():
        # 301 back to the restaurant list.
        self.send_response(301)
        self.send_header('Content-type', 'text/html')
        self.send_header('Location', '/restaurants')
        self.end_headers()

    try:
        if self.path.endswith("/delete"):
            restaurant_id = self.path.split("/")[2]
            restaurant = session.query(Restaurant).filter_by(
                id=restaurant_id).one()
            if restaurant:
                session.delete(restaurant)
                session.commit()
                redirect_to_listing()

        if self.path.endswith("/edit"):
            content_type, params = cgi.parse_header(
                self.headers.getheader('content-type'))
            if content_type == 'multipart/form-data':
                form = cgi.parse_multipart(self.rfile, params)
                submitted = form.get('newRestaurantName')
                restaurant_id = self.path.split("/")[2]
                restaurant = session.query(Restaurant).filter_by(
                    id=restaurant_id).one()
                if restaurant != []:
                    restaurant.name = submitted[0]
                    session.add(restaurant)
                    session.commit()
                    redirect_to_listing()

        if self.path.endswith("/restaurants/new"):
            content_type, params = cgi.parse_header(
                self.headers.getheader('content-type'))
            if content_type == 'multipart/form-data':
                form = cgi.parse_multipart(self.rfile, params)
                submitted = form.get('newRestaurantName')

                # Persist the new restaurant.
                restaurant = Restaurant(name=submitted[0])
                session.add(restaurant)
                session.commit()
                redirect_to_listing()
    except:
        pass
def do_POST(self):
    """Rewrite a POSTed form as a query string and delegate to ``do_GET``.

    The body is parsed (urlencoded, or multipart whenever the content type
    carries a ``boundary``), each field's first value is appended to
    ``self.path`` as ``key=value`` pairs joined by ``&``, and the result of
    ``self.do_GET()`` is returned.  Values are appended verbatim; they are
    not URL-encoded.

    Fixes relative to the earlier version:

    * Field values are taken from the parsed list directly.  The old code
      sliced ``str(list)`` between the first two apostrophes, which mangled
      values containing ``'`` or non-ASCII bytes.
    * Bytes keys and values are decoded as UTF-8, so urlencoded keys no
      longer show up as ``b'key'``.
    * A content type without a ``boundary`` yields an empty form instead of
      raising ``KeyError``; a missing content-type header is treated as
      empty.
    * ``urllib.parse.parse_qs`` replaces the ``cgi.parse_qs`` alias, which
      was removed in Python 3.8.
    * The two ``rstrip`` calls whose results were discarded, and the
      statements after ``return``, never had any effect and were removed.
    """
    # Local import so this method does not depend on the file's import block.
    from urllib.parse import parse_qs

    def as_text(value):
        # Form keys/values may be bytes or str depending on the parser.
        if isinstance(value, bytes):
            return value.decode('utf-8', 'replace')
        return str(value)

    logging.debug('POST %s' % (self.path))
    ctype, pdict = cgi.parse_header(self.headers.get('content-type', ''))
    if ctype == 'application/x-www-form-urlencoded':
        length = int(self.headers['content-length'])
        postvars = parse_qs(self.rfile.read(length), keep_blank_values=1)
    elif 'boundary' in pdict:
        # multipart/form-data, or (as before) any other type that supplies
        # a boundary.
        pdict['boundary'] = bytes(pdict['boundary'], 'utf-8')
        postvars = cgi.parse_multipart(self.rfile, pdict)
    else:
        postvars = {}

    logging.debug('TYPE %s' % (ctype))
    logging.debug('PATH %s' % (self.path))
    logging.debug('ARGS %d' % (len(postvars)))

    pairs = []
    for i, key in enumerate(postvars):
        values = postvars[key]
        logging.debug('ARG[%d] %s=%s' % (i, key, values))
        first = as_text(values[0]) if values else ''
        pairs.append(as_text(key) + '=' + first)

    # The trailing rstrip mirrors the old behavior of trimming every '&'
    # at the end of the assembled string.
    self.path = (self.path + "?" + '&'.join(pairs)).rstrip('&')
    return self.do_GET()
def do_POST(self):
    """HTTP POST handler.  CGI "scripts" handled here.

    The body is parsed as ``multipart/form-data`` or
    ``application/x-www-form-urlencoded``; any other type, a missing
    content-type header, or any parse failure yields an empty form.

    Paths are matched by prefix:

    * ``/key.cgi`` / ``/spakey.cgi`` -- send the posted ``key`` to
      ``self.screen`` / ``self.spa`` and echo it in a small HTML page.
    * ``/spabinary.cgi`` -- ``self.spa.text()``, a timestamp and three
      ``1``/``0`` flags for the ``spa``, ``heat`` and ``jets`` status
      entries, separated by ``|``.
    * ``/screen.cgi`` / ``/spascreen.cgi`` -- the ``html()`` of the screen
      or spa object.
    * ``/status.cgi`` / ``/spastatus.cgi`` -- the ``status`` attribute of
      the screen or spa object.

    Anything else gets a 404.

    The content-type header is now parsed inside the ``try`` so a request
    without one falls back to an empty form instead of raising.  The echoed
    key is HTML-escaped because it comes straight from the client.
    """
    try:
        ctype, pdict = cgi.parse_header(self.headers.getheader('content-type'))
        if ctype == 'multipart/form-data':
            postvars = cgi.parse_multipart(self.rfile, pdict)
        elif ctype == 'application/x-www-form-urlencoded':
            length = int(self.headers.getheader('content-length'))
            postvars = cgi.parse_qs(self.rfile.read(length), keep_blank_values=1)
        else:
            postvars = {}
    except Exception:
        postvars = {}

    cgi_prefixes = (
        "/key.cgi",
        "/spakey.cgi",
        "/spabinary.cgi",
        "/screen.cgi",
        "/spascreen.cgi",
        "/status.cgi",
        "/spastatus.cgi",
    )
    if not self.path.startswith(cgi_prefixes):
        self.send_error(404, 'File Not Found: %s' % self.path)
        return

    mimetype = 'text/html'
    ret = ""
    if self.path.startswith("/key.cgi"):
        if 'key' in postvars:
            key = postvars['key'][0]
            self.screen.sendKey(key)
            ret = "<html><head><title>key</title></head><body>"+cgi.escape(key)+"</body></html>\n"
    elif self.path.startswith("/spakey.cgi"):
        if 'key' in postvars:
            key = postvars['key'][0]
            self.spa.sendKey(key)
            ret = "<html><head><title>key</title></head><body>"+cgi.escape(key)+"</body></html>\n"
    elif self.path.startswith("/spabinary.cgi"):
        ret = self.spa.text() + "|" + time.strftime("%_I:%M%P %_m/%d") + "|"
        # One digit per status entry, in this fixed order.
        for flag in ('spa', 'heat', 'jets'):
            ret += "1" if self.spa.status[flag] == "ON" else "0"
    elif self.path.startswith("/screen.cgi"):
        ret = self.screen.html()
    elif self.path.startswith("/spascreen.cgi"):
        ret = self.spa.html()
    elif self.path.startswith("/status.cgi"):
        ret = self.screen.status
    elif self.path.startswith("/spastatus.cgi"):
        ret = self.spa.status

    self.send_response(200)
    self.send_header('Content-Type', mimetype)
    self.end_headers()
    self.wfile.write(ret)