我们从Python开源项目中,提取了以下28个代码示例,用于说明如何使用requests.options()。
def authorization(self, controller):
    """Ask the API which methods are available on ``controller``.

    Sends an OPTIONS request to ``self.base + controller + "/"`` with
    ``self.token`` in the ``token`` header and decodes the reply body as JSON.

    Returns:
        * ``(status_code, message)`` when the HTTP status is not 200; the two
          values are also stored on ``self.error`` / ``self.error_message``.
        * the payload's ``code`` entry (also stored on ``self.error``) when
          the payload's ``success`` entry is falsy.
        * ``payload['data']['methods']`` otherwise.
    """
    token_header = {'token': self.token}
    reply = requests.options(self.base + "%s/" % (controller), headers=token_header)
    # NOTE(review): the body is decoded before the status is inspected, so a
    # non-JSON error page raises from json.loads -- confirm that is intended.
    payload = json.loads(reply.text)

    if reply.status_code != 200:
        logging.error("phpipam.authorization: Failure %s" % (reply.status_code))
        logging.error(payload)
        self.error = reply.status_code
        self.error_message = payload['message']
        return self.error, self.error_message

    if not payload['success']:
        logging.error("phpipam.authorization: FAILURE: %s" % (payload['code']))
        self.error = payload['code']
        return self.error

    logging.info("phpipam.authorization: success %s" % (payload['success']))
    return payload['data']['methods']
def show_options(self, url, view):
    """Send an OPTIONS request to `url` and display the results in a pop-up.

    Nothing is shown when the response is not OK. Otherwise the pop-up lists
    whichever of the ``Allow``, ``Access-Control-Allow-Methods`` and
    ``Access-Control-Max-Age`` headers are present, followed by the
    pretty-printed JSON body when the body decodes as JSON.

    Args:
        url: Address to send the OPTIONS request to.
        view: Object whose ``show_popup(content, max_width=..., max_height=...)``
            is called with the generated HTML.
    """
    res = options(url, timeout=3)
    if not res.ok:
        return

    names = ['Allow', 'Access-Control-Allow-Methods', 'Access-Control-Max-Age']
    headers = [res.headers.get(name, None) for name in names]
    items = '\n'.join('<li>{}: {}</li>'.format(n, h) for n, h in zip(names, headers) if h)
    content = '<h2>OPTIONS: {}</h2>\n<ul>{}</ul>'.format(url, items)

    try:
        json_dict = res.json()
    except ValueError:
        # Body is not JSON: show the header summary only. Catching ValueError
        # (instead of a bare except) no longer hides unrelated failures.
        pass
    else:
        # Closing tags are emitted in the reverse order of the opening tags.
        content = '{}\n<pre><code>{}</code></pre>'.format(
            content,
            json.dumps(json_dict, sort_keys=True, indent=2, separators=(',', ': '))
        )
    view.show_popup(content, max_width=700, max_height=500)
def test_can_support_custom_cors(smoke_test_app):
    """Check the custom CORS headers on GET and OPTIONS for ``/custom_cors``."""
    endpoint = smoke_test_app.url + '/custom_cors'
    expected_allow_origin = 'https://foo.example.com'

    get_response = requests.get(endpoint)
    get_response.raise_for_status()
    assert get_response.headers['Access-Control-Allow-Origin'] == expected_allow_origin

    # Should also have injected an OPTIONs request.
    options_response = requests.options(endpoint)
    options_response.raise_for_status()
    headers = options_response.headers
    assert headers['Access-Control-Allow-Origin'] == expected_allow_origin
    assert headers['Access-Control-Allow-Headers'] == (
        'Authorization,Content-Type,X-Amz-Date,X-Amz-Security-Token,'
        'X-Api-Key,X-Special-Header')
    _assert_contains_access_control_allow_methods(
        headers, ['GET', 'POST', 'PUT', 'OPTIONS'])
    assert headers['Access-Control-Max-Age'] == '600'
    assert headers['Access-Control-Expose-Headers'] == 'X-Special-Header'
    assert headers['Access-Control-Allow-Credentials'] == 'true'
def send_http_request_used_exec(self, url, method, request_body="", header="", cookie=None):
    """Send an HTTP request with the ``requests`` function named by ``method``.

    The previous implementation assembled Python source text and ran it with
    the ``exec`` statement. The generated call placed a positional argument
    after keyword arguments and used ``header=`` rather than ``headers=``, so
    it could never execute. The call is now dispatched directly.

    Args:
        url: Target URL.
        method: One of "get", "post", "put", "delete", "head", "options"
            (lower case, as before).
        request_body: Passed to ``requests`` as ``data``.
        header: "" for no extra headers; otherwise a mapping, or a string
            holding a Python dict literal (parsed with ``ast.literal_eval``,
            which evaluates literals only).
        cookie: Passed to ``requests`` as ``cookies``.

    Returns:
        The response object returned by ``requests``.

    Raises:
        Exception: If ``method`` is not one of the supported names.
    """
    import ast

    if method not in ("get", "post", "put", "delete", "head", "options"):
        raise Exception("Not supported method: %s" % method)

    request_kwargs = {"data": request_body, "cookies": cookie}
    if header != "":
        if isinstance(header, str):
            header = ast.literal_eval(header)
        request_kwargs["headers"] = header

    return getattr(requests, method)(url, **request_kwargs)
def test_cors_preflight_allowed(self):
    """Verify the CORS preflight (OPTIONS) reply for the API URL."""
    preflight_headers = {
        const.HTTP_HEADER_ORIGIN: HTTP_BASE_URL,
        'Access-Control-Request-Method': 'GET',
        'Access-Control-Request-Headers': 'x-ha-access'
    }
    req = requests.options(_url(const.URL_API), headers=preflight_headers)

    allow_origin = const.HTTP_HEADER_ACCESS_CONTROL_ALLOW_ORIGIN
    allow_headers = const.HTTP_HEADER_ACCESS_CONTROL_ALLOW_HEADERS

    assert req.status_code == 200
    assert req.headers.get(allow_origin) == HTTP_BASE_URL
    expected_allow_headers = const.HTTP_HEADER_HA_AUTH.upper()
    assert req.headers.get(allow_headers) == expected_allow_headers
def exploit(request, response, method, key, is_array=False):
    """Report whether the target advertises PUT or PATCH in its ``Allow`` header.

    Returns ``None`` without probing when the result table already holds a
    non-'continue' row for this plugin (named after this file) and the
    request's host. Otherwise sends an OPTIONS request to ``request['uri']``
    and returns a result dict marked 'vul' or 'safe'.

    ``response``, ``method``, ``key`` and ``is_array`` are not used here.
    """
    db = config.dbconn()
    plugin_name = os.path.basename(__file__)[:-3]
    # NOTE(review): the condition is assembled by string interpolation from
    # request['host']; confirm the value is trusted or escaped by fetch_rows.
    condition = "exploit='%s' and result != 'continue' and `host`='%s'" % (plugin_name, request['host'])
    previous = db.fetch_rows('result', condition=condition, order="id asc", limit="1", fetchone=True)
    if previous:
        return

    allow = requests.options(request['uri']).headers.get('Allow', '')
    if 'PUT' in allow or 'PATCH' in allow:
        return {'result': 'vul', 'info': "Server support put/patch method", 'hash': None, 'level': "middle"}
    return {'result': 'safe', 'info': "Server does not support put/patch method", 'hash': None, 'level': "middle"}
def test_should_respond_to_any_options(self):
    """An 'OPTIONS' rule with status 200 answers an OPTIONS request with 200."""
    server.on('OPTIONS').status(200)

    target = 'http://localhost:8080/some/url'
    extra_headers = {'foo': 'bar'}
    res = requests.options(target, headers=extra_headers)

    self.assertEqual(res.status_code, 200)
def test_should_always_respond_to_matching_queries(self):
    """An 'always' rule answers two identical OPTIONS requests with 200."""
    server.always('OPTIONS').status(200)

    # Two identical requests in a row; both must be answered by the rule.
    for _ in range(2):
        res = requests.options('http://localhost:8080/some/url', headers={'foo': 'bar'})
        self.assertEqual(res.status_code, 200)
def test_should_reset_always_rules(self):
    """After ``server.reset()`` an 'always' rule is gone; the test expects 500."""
    server.always('OPTIONS').status(200)
    server.reset()

    target = 'http://localhost:8080/some/url'
    res = requests.options(target, headers={'foo': 'bar'})

    self.assertEqual(res.status_code, 500)
def __accept_post_options(self, inbox, **kwargs):
    """Pick a content type from the inbox's ``accept-post`` header.

    Sends an OPTIONS request to ``inbox`` (extra keyword arguments are
    forwarded to ``requests.options``). Returns ``None`` when the status is
    not OK or the header is missing, ``self.JSON_LD`` when that value occurs
    in the header, and otherwise the first comma-separated entry passed
    through ``self.content_type_to_mime_type``.
    """
    r = requests.options(inbox, **kwargs)
    if r.status_code != requests.codes.ok or 'accept-post' not in r.headers:
        return None

    if self.JSON_LD in r.headers['accept-post']:
        return self.JSON_LD

    # Only the first listed content type is ever considered.
    first_type = r.headers['accept-post'].split(',')[0]
    return self.content_type_to_mime_type(first_type)
def test_can_accept_options_request(config, sample_app, local_server_factory):
    """OPTIONS on ``/test-cors`` yields an empty body and the CORS methods header."""
    local_server, port = local_server_factory(sample_app, config)

    response = local_server.make_call(requests.options, '/test-cors', port)
    headers = response.headers

    assert headers['Content-Length'] == '0'
    assert headers['Access-Control-Allow-Methods'] == 'POST,OPTIONS'
    assert response.text == ''
def test_can_accept_multiple_options_request(config, sample_app, local_server_factory):
    """Two consecutive OPTIONS calls on ``/test-cors`` give the same result."""
    local_server, port = local_server_factory(sample_app, config)

    # The same request and checks are performed twice against one server.
    for _ in range(2):
        response = local_server.make_call(requests.options, '/test-cors', port)
        assert response.headers['Content-Length'] == '0'
        assert response.headers['Access-Control-Allow-Methods'] == 'POST,OPTIONS'
        assert response.text == ''
def test_can_support_cors(smoke_test_app):
    """Check the default CORS headers on GET and OPTIONS for ``/cors``."""
    endpoint = smoke_test_app.url + '/cors'

    get_response = requests.get(endpoint)
    get_response.raise_for_status()
    assert get_response.headers['Access-Control-Allow-Origin'] == '*'

    # Should also have injected an OPTIONs request.
    options_response = requests.options(endpoint)
    options_response.raise_for_status()
    headers = options_response.headers
    assert headers['Access-Control-Allow-Origin'] == '*'
    assert headers['Access-Control-Allow-Headers'] == (
        'Authorization,Content-Type,X-Amz-Date,X-Amz-Security-Token,'
        'X-Api-Key')
    _assert_contains_access_control_allow_methods(
        headers, ['GET', 'POST', 'PUT', 'OPTIONS'])
def is_cross_origin_accessible(path, origin='http://example.org'):
    """Assert that ``path`` can be reached from ``origin``.

    Sends an OPTIONS request carrying ``origin`` in the ``Origin`` header.
    Returns True when the status is 200 and ``is_allowed_origin`` accepts the
    response; raises AssertionError otherwise.
    """
    origin_header = {'Origin': origin}
    preflight = requests.options(path, headers=origin_header)

    assert preflight.status_code == 200
    assert is_allowed_origin(origin, preflight)
    return True
def do_options(self):
    """Send an OPTIONS request to ``self.url`` and post-process the response.

    ``self.parameters`` is expanded into keyword arguments for
    ``requests.options``; the response goes to ``self.post_processing_request``.
    """
    logger.debug(self.neuron_name + " do_options method called")
    response = requests.options(url=self.url, **self.parameters)
    self.post_processing_request(response)
def test_OPTIONS_CORS_headers_valid_origin(self):
    """An OPTIONS preflight from an allowed origin gets CORS headers and stores nothing."""
    # Before sending a POST, the browser sends an OPTIONS request as a
    # preflight to see the CORS headers. The backend only returns the
    # required CORS headers if the Origin is set to an allowed domain.
    valid_origin = 'http://testdomain.com'
    post_payload = create_post_payload()

    preflight_response = requests.options(
        url=COMMENT_SIDECAR_URL,
        json=post_payload,
        headers={'Origin': valid_origin},
    )

    assert_cors_headers_exists(preflight_response, valid_origin)
    assert_that(preflight_response.text).is_empty()
    stored_comments = get_comments().json()
    assert_that(stored_comments)\
        .described_as("No comment should have been created after an OPTIONS request")\
        .is_empty()
def test_OPTIONS_CORS_headers_invalid_origin(self):
    """An OPTIONS preflight from 'http://invalid.com' gets no CORS headers and stores nothing."""
    invalid_origin = 'http://invalid.com'
    post_payload = create_post_payload()

    preflight_response = requests.options(
        url=COMMENT_SIDECAR_URL,
        json=post_payload,
        headers={'Origin': invalid_origin},
    )

    assert_cors_headers_doesnt_exists(preflight_response)
    assert_that(preflight_response.text).is_empty()
    stored_comments = get_comments().json()
    assert_that(stored_comments)\
        .described_as("No comment should have been created after an OPTIONS request")\
        .is_empty()
def get(env) -> "(url -- response_object)":
    "Returns a request object that represents a OPTIONS request to the given url"
    # Pop the URL off the stack, perform the OPTIONS request, and push the
    # wrapped result back.
    target = env.stack.pop().val
    env.stack.push(wrap_req(requests.options(target)))
def get_site_method(self):
    """Return the methods the site advertises in its ``Allow`` header.

    Sends an OPTIONS request to ``'http://' + self.url``.

    Returns:
        ``{"Method": <Allow header value>}`` when the header is present,
        otherwise ``{"Method": "NO Information"}``. The fallback is also
        returned when anything goes wrong, so this lookup stays best-effort.
    """
    no_information = {"Method": "NO Information"}
    try:
        # Query the response's own header mapping instead of a plain-dict
        # copy so the lookup does not depend on the header's letter case.
        # `in` replaces dict.has_key(), which exists only on Python 2.
        response_headers = requests.options('http://' + self.url).headers
        if 'Allow' in response_headers:
            return {"Method": response_headers["Allow"]}
        return no_information
    except Exception:
        # Still best-effort, but KeyboardInterrupt/SystemExit now propagate.
        return no_information
def requests_method(self):
    """Send the request chosen in the UI and display the response.

    Reads the HTTP method from ``ui.httpmthdcmbbx``, a free-text argument
    from ``ui.httpmthdlnedt`` and the output mode from ``ui.dlwthrspnscmbbx``.
    The response is stored in the module-level ``r``, printed according to the
    output mode, appended (as text) to ``messages`` and shown in
    ``ui.showmssg``.
    """
    # Read the current selections from the UI widgets.
    httpmthdcmbbx = ui.httpmthdcmbbx
    httpmthd_1 = httpmthdcmbbx.currentText()
    httpmthdlnedt = ui.httpmthdlnedt
    httpmthd_2 = httpmthdlnedt.text()
    dlwthrspnscmbbx = ui.dlwthrspnscmbbx
    httpmthd_3 = dlwthrspnscmbbx.currentText()
    global r

    # Dispatch on the selected method; "head" sends no extra argument and
    # anything unrecognised falls back to OPTIONS.
    if httpmthd_1 == "get":
        r = requests.get(self.url, httpmthd_2)
    elif httpmthd_1 == "post":
        r = requests.post(self.url, httpmthd_2)
    elif httpmthd_1 == "put":
        r = requests.put(self.url, httpmthd_2)
    elif httpmthd_1 == "delete":
        r = requests.delete(self.url, httpmthd_2)
    elif httpmthd_1 == "head":
        r = requests.head(self.url)
    else:
        r = requests.options(self.url, httpmthd_2)

    # Print the response in the selected representation.
    # NOTE(review): the literals compared below look mis-encoded ('?' runs);
    # confirm them against the combo box item texts.
    if httpmthd_3 == "????????":
        print(r.text)
    elif httpmthd_3 == "?????????":
        print(r.content)
    elif httpmthd_3 == "??json????":
        # Call json() -- printing the bare attribute only showed the bound
        # method object. Fall back to the raw text when the body is not JSON.
        try:
            print(r.json())
        except ValueError:
            print(r.text)
    else:
        pass

    messages.append(r.text)
    ui.showmssg.setText('\n'.join(messages))
    print(httpmthd_1)
    print(httpmthd_2)
def test_preflight(testserver):
    """OPTIONS against the test server returns 200 and a wildcard allow-origin."""
    preflight = requests.options(testserver)
    allow_origin = preflight.headers.get('Access-Control-Allow-Origin')

    assert preflight.status_code == 200
    assert allow_origin == '*'
def options(self, uri, **kwargs):  # pragma: no cover
    """Send an OPTIONS request to ``self._base_url() + uri``.

    A ``timeout`` keyword is filled in from ``self._timeout()`` when the
    caller did not supply one. The call is logged through ``self._log``.

    Raises:
        TimeoutException: When ``requests`` reports a timeout.
        ApiUnavailableException: When ``requests`` reports a connection error.
    """
    if 'timeout' not in kwargs:
        kwargs['timeout'] = self._timeout()

    try:
        target = self._base_url() + uri
        self._log("OPTIONS", target, **kwargs)
        return requests.options(target, **kwargs)
    except rexc.Timeout as err:  # pragma: no cover
        raise TimeoutException(-1, err)
    except rexc.ConnectionError as err:  # pragma: no cover
        raise ApiUnavailableException(-1, err)
def do_OPTIONS(self):
    """Handle an OPTIONS request by forwarding it.

    Clears ``self.data_bytes``, selects ``requests.options`` as the outgoing
    call and hands over to ``self.forward('OPTIONS')``.
    """
    self.data_bytes = None
    self.method = requests.options
    self.forward('OPTIONS')
def do_OPTIONS(self):
    """Handle an OPTIONS request by forwarding it.

    Selects ``requests.options`` as the outgoing call and hands over to
    ``self.forward('OPTIONS')``.
    """
    self.method = requests.options
    self.forward('OPTIONS')
def test_options_request(self):
    """OPTIONS on the local server returns the expected CORS headers."""
    # Send the OPTIONS request and compare each expected header.
    expected = {
        'Access-Control-Allow-Origin': '*',
        'Access-Control-Allow-Methods': 'POST'
    }
    r = requests.options('http://127.0.0.1:8080')

    for header_name, header_value in expected.items():
        self.assertTrue(header_name in r.headers)
        self.assertEqual(header_value, r.headers[header_name])
def options(self, headers=None):
    """Make an HTTP OPTIONS request on the resource's endpoint.

    Args:
        headers: Optional headers forwarded to ``requests.options``.

    Returns:
        The response from ``requests.options`` for ``self.endpoint``,
        sent with ``self.auth``.
    """
    return requests.options(self.endpoint, headers=headers, auth=self.auth)
def assembleRequest(data, method, URL, safe, headerDict, proxyDict, verbose):
    """Send one request to ``URL`` and print the outcome.

    The original used Python 2-only ``print`` statements and
    ``except Exception, e``. Every ``print`` is now a single-argument call and
    the handler uses ``as``, both valid on Python 2 and Python 3.

    Args:
        data: Second positional argument for GET, POST and PUT.
        method: "GET", "POST", "PUT", "DELETE", "OPTIONS" or "HEAD".
        URL: Address of the API endpoint.
        safe: Passed as ``verify`` (certificate verification).
        headerDict: Headers for GET, POST, PUT and DELETE.
        proxyDict: Passed as ``proxies``.
        verbose: When true, also print response headers and body.

    Raises:
        SystemExit: With no status for an unsupported ``method``; with status
            1 after an unexpected (non-``requests``) error.
    """
    # Reject unknown methods before the try block. SystemExit was never
    # caught by the handlers below, so the outcome is unchanged.
    if method not in ("GET", "POST", "PUT", "DELETE", "OPTIONS", "HEAD"):
        print("method not currently supported")
        print("current methods: GET, POST, PUT, DELETE, OPTIONS, HEAD")
        sys.exit()

    # Connect to the API endpoint and send the data that was given.
    # `response` holds everything received back from the server.
    try:
        if method == "GET":
            response = requests.get(URL, data, headers=headerDict, proxies=proxyDict, verify=safe)
        elif method == "POST":
            response = requests.post(URL, data, headers=headerDict, verify=safe, proxies=proxyDict)
        elif method == "PUT":
            response = requests.put(URL, data, headers=headerDict, verify=safe, proxies=proxyDict)
        elif method == "DELETE":
            response = requests.delete(URL, headers=headerDict, verify=safe, proxies=proxyDict)
        elif method == "OPTIONS":
            response = requests.options(URL, verify=safe, proxies=proxyDict)
        else:  # "HEAD"
            response = requests.head(URL, verify=safe, proxies=proxyDict)

        print("")
        # Print the response code (i.e. 404, 500, 200).
        print(response.status_code)
        if verbose:
            # Response headers behave like a dictionary; print each entry.
            for i in response.headers:
                print("%s: %s" % (i, response.headers[i]))
            print("")
            # Output the response body.
            print(response.content)
        print("")
        print("")
    except requests.exceptions.Timeout:
        print("Connection timeout - make sure a WAF or firewall rule is not blocking traffic.")
    except requests.exceptions.TooManyRedirects:
        print("Too many redirects - URL may be bad")
    except requests.exceptions.SSLError:
        print("could not verify HTTPS certificate.")
        print('pURL tries to verify HTTPS certificates. If using a proxy try "--unsafe"')
    except requests.exceptions.RequestException as e:
        print(e)
    except Exception as e:
        print("catastrophic failure see below for details")
        print(e)
        sys.exit(1)
def send_http_request(self, url, method, request_body="", header="", cookie=None):
    """Send an HTTP request and return the ``requests`` response.

    Args:
        url: Target URL.
        method: "get", "post", "put", "delete", "head" or "options", in any
            letter case.
        request_body: Passed to ``requests`` as ``data``.
        header: "" for no extra headers; otherwise a string that is
            evaluated to obtain the headers mapping.
        cookie: Passed to ``requests`` as ``cookies``.

    Raises:
        HTTPRequestException: Wraps any failure, including an unsupported
            ``method``, carrying the original error text.
    """
    try:
        verb = method.lower()
        if verb not in ("get", "post", "put", "delete", "head", "options"):
            raise Exception("Not supported method: %s" % method)

        request_kwargs = {"data": request_body, "cookies": cookie}
        # Compare by value: `is not ""` tested object identity, which is not
        # a reliable way to detect an empty string.
        if header != "":
            # NOTE(review): eval() executes arbitrary code. If `header` can
            # come from an untrusted source, switch to ast.literal_eval or
            # accept a dict directly.
            request_kwargs["headers"] = eval(header)

        # One dispatch replaces six near-identical branches; each of them
        # passed exactly these keyword arguments.
        return getattr(requests, verb)(url, **request_kwargs)
    except Exception as ex:
        # str(ex) works on Python 2 and 3; `ex.message` exists only on
        # Python 2.
        raise HTTPRequestException(str(ex))