我们从 Python 开源项目中提取了以下 36 个代码示例，用于说明如何使用 urllib.request.data()。
def _try_send(self, ip, port, body, header, data):
    """Send one SOAP request to a device's ``basicevent1`` control URL.

    Args:
        ip: Host part of the target URL.
        port: Port part of the target URL.
        body: XML fragment placed inside the ``<s:Body>`` element.
        header: Value sent in the ``SOAPACTION`` request header.
        data: Passed through unchanged to ``self._extract`` together with
            the decoded response text.

    Returns:
        Whatever ``self._extract`` returns, or ``None`` when any step fails.
    """
    try:
        request = urllib.request.Request(
            'http://%s:%s/upnp/control/basicevent1' % (ip, port))
        request.add_header('Content-type', 'text/xml; charset="utf-8"')
        request.add_header('SOAPACTION', header)
        request_body = ''.join((
            '<?xml version="1.0" encoding="utf-8"?>',
            '<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/" s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">',
            '<s:Body>%s</s:Body></s:Envelope>' % body,
        ))
        request.data = request_body.encode()
        # The context manager closes the HTTP response (and its socket)
        # even when reading or decoding fails.
        with urllib.request.urlopen(request, timeout=3) as result:
            payload = result.read().decode()
        return self._extract(payload, data)
    except Exception as e:
        # Deliberately best-effort: report the problem and signal failure
        # with None instead of propagating.
        print(str(e))
        return None
def urlopen(url, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, **_3to2kwargs):
    """Open *url*, optionally through a certificate-verifying HTTPS opener.

    ``cafile``, ``capath`` and ``cadefault`` are accepted through
    ``**_3to2kwargs`` (emulated keyword-only arguments). When none of them
    is truthy the module-level ``_opener`` is used, being built on first use.

    Raises:
        ValueError: a CA option was given but ``_have_ssl`` is false.
    """
    global _opener
    cadefault = _3to2kwargs.pop('cadefault', False)
    capath = _3to2kwargs.pop('capath', None)
    cafile = _3to2kwargs.pop('cafile', None)

    if not (cafile or capath or cadefault):
        # Plain request: reuse (or lazily create) the shared opener.
        if _opener is None:
            _opener = build_opener()
        return _opener.open(url, data, timeout)

    if not _have_ssl:
        raise ValueError('SSL support not available')
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.options |= ssl.OP_NO_SSLv2
    context.verify_mode = ssl.CERT_REQUIRED
    if cafile or capath:
        context.load_verify_locations(cafile, capath)
    else:
        context.set_default_verify_paths()
    https_handler = HTTPSHandler(context=context, check_hostname=True)
    # A verifying opener is built per call and never stored in _opener.
    return build_opener(https_handler).open(url, data, timeout)
def __init__(self, url, data=None, headers={}, origin_req_host=None, unverifiable=False, method=None):
    """Initialise the request from *url* and optional body, headers and method.

    ``headers`` is only iterated, never modified. When ``origin_req_host``
    is ``None`` it is taken from ``request_host(self)``.
    """
    # unwrap('<URL:type://host/path>') --> 'type://host/path'
    self.full_url = unwrap(url)
    # Separate the '#fragment' part from the URL proper.
    self.full_url, self.fragment = splittag(self.full_url)
    self.data = data
    self.headers = {}
    self._tunnel_host = None
    for name, content in headers.items():
        self.add_header(name, content)
    self.unredirected_hdrs = {}
    self.origin_req_host = (
        request_host(self) if origin_req_host is None else origin_req_host
    )
    self.unverifiable = unverifiable
    self.method = method
    self._parse()
def http_error_407(self, url, fp, errcode, errmsg, headers, data=None, retry=False):
    """Error 407 -- proxy authentication required.

    This function supports Basic authentication only. Anything else (no
    challenge header, unparsable challenge, non-Basic scheme, or
    ``retry`` false) is handed to ``URLopener.http_error_default``.
    Otherwise the ``retry_proxy_<type>_basic_auth`` method of ``self`` is
    called with the URL, the realm and, if given, the request data.
    """
    def use_default_handler():
        # NOTE(review): execution continues after this call, so it is
        # presumably expected to raise -- confirm against URLopener.
        URLopener.http_error_default(self, url, fp, errcode, errmsg, headers)

    if 'proxy-authenticate' not in headers:
        use_default_handler()
    challenge = headers['proxy-authenticate']
    parsed = re.match('[ \t]*([^ \t]+)[ \t]+realm="([^"]*)"', challenge)
    if not parsed:
        use_default_handler()
    scheme, realm = parsed.groups()
    if scheme.lower() != 'basic':
        use_default_handler()
    if not retry:
        use_default_handler()

    retry_handler = getattr(self, 'retry_proxy_' + self.type + '_basic_auth')
    if data is None:
        return retry_handler(url, realm)
    return retry_handler(url, realm, data)
def urlopen(url, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, *, cafile=None, capath=None):
    """Open *url*, verifying HTTPS certificates when a CA file/path is given.

    Without ``cafile``/``capath`` the module-level ``_opener`` is used,
    being built on first use.

    Raises:
        ValueError: a CA option was given but ``_have_ssl`` is false.
    """
    global _opener
    if not (cafile or capath):
        if _opener is None:
            _opener = build_opener()
        return _opener.open(url, data, timeout)

    if not _have_ssl:
        raise ValueError('SSL support not available')
    context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    context.options |= ssl.OP_NO_SSLv2
    # A CA source is known to be set on this path, so certificates are
    # always required and host names always checked.
    context.verify_mode = ssl.CERT_REQUIRED
    context.load_verify_locations(cafile, capath)
    https_handler = HTTPSHandler(context=context, check_hostname=True)
    return build_opener(https_handler).open(url, data, timeout)
def __init__(self, url, data=None, headers={}, origin_req_host=None, unverifiable=False):
    """Initialise the request from *url* and optional body and headers.

    ``headers`` is only iterated, never modified. When ``origin_req_host``
    is ``None`` it is taken from ``request_host(self)``.
    """
    # unwrap('<URL:type://host/path>') --> 'type://host/path'
    self.full_url = unwrap(url)
    # Separate the '#fragment' part from the URL proper.
    self.full_url, self.fragment = splittag(self.full_url)
    self.data = data
    self.headers = {}
    self._tunnel_host = None
    for name, content in headers.items():
        self.add_header(name, content)
    self.unredirected_hdrs = {}
    self.origin_req_host = (
        request_host(self) if origin_req_host is None else origin_req_host
    )
    self.unverifiable = unverifiable
    self._parse()
def http_error_401(self, url, fp, errcode, errmsg, headers, data=None, retry=False):
    """Error 401 -- authentication required.

    This function supports Basic authentication only. Anything else (no
    challenge header, unparsable challenge, non-Basic scheme, or
    ``retry`` false) is handed to ``URLopener.http_error_default``.
    Otherwise the ``retry_<type>_basic_auth`` method of ``self`` is called
    with the URL, the realm and, if given, the request data.
    """
    def use_default_handler():
        # NOTE(review): execution continues after this call, so it is
        # presumably expected to raise -- confirm against URLopener.
        URLopener.http_error_default(self, url, fp, errcode, errmsg, headers)

    if 'www-authenticate' not in headers:
        use_default_handler()
    challenge = headers['www-authenticate']
    import re
    parsed = re.match('[ \t]*([^ \t]+)[ \t]+realm="([^"]*)"', challenge)
    if not parsed:
        use_default_handler()
    scheme, realm = parsed.groups()
    if scheme.lower() != 'basic':
        use_default_handler()
    if not retry:
        use_default_handler()

    retry_handler = getattr(self, 'retry_' + self.type + '_basic_auth')
    if data is None:
        return retry_handler(url, realm)
    return retry_handler(url, realm, data)
def http_error_407(self, url, fp, errcode, errmsg, headers, data=None, retry=False):
    """Error 407 -- proxy authentication required.

    This function supports Basic authentication only. Anything else (no
    challenge header, unparsable challenge, non-Basic scheme, or
    ``retry`` false) is handed to ``URLopener.http_error_default``.
    Otherwise the ``retry_proxy_<type>_basic_auth`` method of ``self`` is
    called with the URL, the realm and, if given, the request data.
    """
    def use_default_handler():
        # NOTE(review): execution continues after this call, so it is
        # presumably expected to raise -- confirm against URLopener.
        URLopener.http_error_default(self, url, fp, errcode, errmsg, headers)

    if 'proxy-authenticate' not in headers:
        use_default_handler()
    challenge = headers['proxy-authenticate']
    import re
    parsed = re.match('[ \t]*([^ \t]+)[ \t]+realm="([^"]*)"', challenge)
    if not parsed:
        use_default_handler()
    scheme, realm = parsed.groups()
    if scheme.lower() != 'basic':
        use_default_handler()
    if not retry:
        use_default_handler()

    retry_handler = getattr(self, 'retry_proxy_' + self.type + '_basic_auth')
    if data is None:
        return retry_handler(url, realm)
    return retry_handler(url, realm, data)
def proxy_bypass_environment(host):
    """Test if proxies should not be used for a particular host.

    Checks the environment for a variable named no_proxy (falling back to
    NO_PROXY), which should be a list of DNS suffixes separated by commas,
    or '*' for all hosts.

    Returns 1 when the host should bypass the proxy, 0 otherwise.
    """
    configured = os.environ.get('no_proxy', '') or os.environ.get('NO_PROXY', '')
    # '*' is special case for always bypass
    if configured == '*':
        return 1
    # strip port off host
    hostonly, _port = splitport(host)
    # check if the host (with or without port) ends with any of the DNS
    # suffixes; empty entries are ignored
    suffixes = (entry.strip() for entry in configured.split(','))
    matched = any(
        suffix and (hostonly.endswith(suffix) or host.endswith(suffix))
        for suffix in suffixes
    )
    # otherwise, don't bypass
    return 1 if matched else 0


# This code tests an OSX specific data structure but is testable on all
# platforms
def authenticate_with_apikey(self, api_key, scope=None):
    """Authenticate with an API key and store the token for later requests.

    Args:
        api_key: Secret API key from the account settings.
        scope: Optional scope of the authentication request. ``"auto"`` is
            sent when it is ``None``.

    Side effects:
        Sets ``self._token`` to the parsed response and stamps its
        ``retrieved_at`` with the current local time.
    """
    scope = "auto" if scope is None else scope
    data = {
        "grant_type": "client_credentials",
        "scope": scope
    }
    encoded_data = urllib.parse.urlencode(data).encode()
    request = urllib.request.Request(self.auth_endpoint, encoded_data, method="POST")
    # The standard header name is hyphenated; "ContentType" is not
    # recognised as the content-type header.
    request.add_header("Content-Type", "application/x-www-form-urlencoded")
    credentials = base64.standard_b64encode(('APIKEY:' + api_key).encode()).decode()
    request.add_header("Authorization", 'Basic ' + credentials)
    # Parse inside the with-block so the response is still open while it
    # is read, and is closed afterwards.
    with urllib.request.urlopen(request) as response:
        self._token = WaApiClient._parse_response(response)
    self._token.retrieved_at = datetime.datetime.now()
def authenticate_with_contact_credentials(self, username, password, scope=None):
    """Authenticate with contact credentials and store the token.

    Args:
        username: Typically a contact email.
        password: Contact password.
        scope: Optional scope of the authentication request. ``"auto"`` is
            sent when it is ``None``.

    Side effects:
        Sets ``self._token`` to the parsed response and stamps its
        ``retrieved_at`` with the current local time.
    """
    scope = "auto" if scope is None else scope
    data = {
        "grant_type": "password",
        "username": username,
        "password": password,
        "scope": scope
    }
    encoded_data = urllib.parse.urlencode(data).encode()
    request = urllib.request.Request(self.auth_endpoint, encoded_data, method="POST")
    # The standard header name is hyphenated; "ContentType" is not
    # recognised as the content-type header.
    request.add_header("Content-Type", "application/x-www-form-urlencoded")
    # The client id/secret pair identifies the application via HTTP Basic.
    auth_header = base64.standard_b64encode(
        (self.client_id + ':' + self.client_secret).encode()).decode()
    request.add_header("Authorization", 'Basic ' + auth_header)
    # Parse inside the with-block so the response is still open while it
    # is read, and is closed afterwards.
    with urllib.request.urlopen(request) as response:
        self._token = WaApiClient._parse_response(response)
    self._token.retrieved_at = datetime.datetime.now()
def test_http_closed(self):
    """Test the connection is cleaned up when the response is closed"""
    # One case per way of delimiting the response body.
    cases = (
        ("Connection: close", b"data"),
        ("Transfer-Encoding: chunked", b"4\r\ndata\r\n0\r\n\r\n"),
        ("Content-Length: 4", b"data"),
    )
    for transfer, data in cases:
        raw_reply = "HTTP/1.1 200 OK\r\n{}\r\n\r\n".format(transfer).encode() + data
        conn = test_urllib.fakehttp(raw_reply)
        handler = urllib.request.AbstractHTTPHandler()
        req = Request("http://dummy/")
        req.timeout = None
        with handler.do_open(conn, req) as resp:
            resp.read()
        failure_note = "Connection not closed with {!r}".format(transfer)
        self.assertTrue(conn.fakesock.closed, failure_note)
def urlopen(url, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, *, cafile=None, capath=None, cadefault=False, context=None):
    """Open *url*, optionally with a custom or CA-derived SSL context.

    With any of ``cafile``/``capath``/``cadefault`` a default
    server-auth context is created from ``cafile`` and ``capath``. With a
    truthy ``context`` that context is used. Otherwise the module-level
    ``_opener`` is used, being built on first use.

    Raises:
        ValueError: ``context`` was combined with a CA option, or a CA
            option was given but ``_have_ssl`` is false.
    """
    global _opener
    wants_ca_verification = cafile or capath or cadefault
    if wants_ca_verification:
        if context is not None:
            raise ValueError(
                "You can't pass both context and any of cafile, capath, and "
                "cadefault"
            )
        if not _have_ssl:
            raise ValueError('SSL support not available')
        context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH,
                                             cafile=cafile,
                                             capath=capath)

    if wants_ca_verification or context:
        # A context-specific opener is built per call and never cached.
        opener = build_opener(HTTPSHandler(context=context))
    else:
        if _opener is None:
            _opener = build_opener()
        opener = _opener
    return opener.open(url, data, timeout)
def __init__(self, url, data=None, headers={}, origin_req_host=None, unverifiable=False, method=None):
    """Initialise the request from *url* and optional body, headers and method.

    ``headers`` is only iterated, never modified. When ``origin_req_host``
    is ``None`` it is taken from ``request_host(self)``. ``self.method``
    is only assigned when *method* is truthy.
    """
    self.full_url = url
    self.headers = {}
    self.unredirected_hdrs = {}
    # _data is initialised before data is assigned.
    self._data = None
    self.data = data
    self._tunnel_host = None
    for name, content in headers.items():
        self.add_header(name, content)
    self.origin_req_host = (
        request_host(self) if origin_req_host is None else origin_req_host
    )
    self.unverifiable = unverifiable
    if method:
        self.method = method
def build_http_request_obj(self, request_body):
    """Build a JSON POST request to ``self.url`` carrying *request_body*.

    *request_body* is serialised with ``json.dumps`` and encoded to bytes.
    Returns the prepared ``urllib.request.Request``; nothing is sent here.
    """
    payload = json.dumps(request_body).encode()
    return urllib.request.Request(
        self.url,
        data=payload,
        headers={
            "Content-Type": "application/json",
            "User-Agent": "gemstone-client",
        },
        method="POST",
    )
def authentication_required(f):
    """Decorator that authenticates a request from its XML POST body.

    The body must contain ``./Authentication/username`` and
    ``./Authentication/token`` elements whose texts pass ``authenticate``.
    On an empty body, missing elements or failed authentication the
    request is aborted with 403; otherwise *f* is called unchanged.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        body = request.data.decode('utf-8')
        if not body:
            app.logger.error('Authentication: No xml post data in request')
            return abort(403)

        root = ETdefused.fromstring(body)
        user_node = root.find("./Authentication/username")
        token_node = root.find("./Authentication/token")
        if user_node is None or token_node is None:
            app.logger.error('Authentication: Invalid XML, token not present or empty')
            return abort(403)

        username = user_node.text
        if not authenticate(username, token_node.text):
            app.logger.error("Authentication failure for user %s", username)
            return abort(403)
        return f(*args, **kwargs)
    return decorated_function
def checkCommunityUser():
    """Check whether the request was sent with the community credentials.

    Reads ``./Authentication/username`` and ``./Authentication/token``
    from the XML POST body.

    Returns:
        True when both match the configured ``COMMUNITYUSER`` /
        ``COMMUNITYTOKEN``; False when they are other credentials accepted
        by ``authenticate``. Every other case is aborted with 403.
    """
    body = request.data.decode('utf-8')
    if not body:
        app.logger.error('no xml post data in request')
        return abort(403)

    root = ETdefused.fromstring(body)
    user_node = root.find("./Authentication/username")
    token_node = root.find("./Authentication/token")
    if user_node is None or token_node is None:
        app.logger.error('Invalid XML: token not present or empty')
        return abort(403)

    username = user_node.text
    password = token_node.text
    is_community = (
        username == app.config['COMMUNITYUSER']
        and password == app.config['COMMUNITYTOKEN']
    )
    if is_community:
        return True
    if not authenticate(username, password):
        app.logger.error("simplePostMessage-Authentication failure for user %s", username)
        return abort(403)
    return False
def querySingleIP():
    """Retrieve attack data from the index about a single IP.

    A cached result keyed on the request URL is returned when present.
    Otherwise the data for the ``ip`` query parameter is queried,
    formatted, stored in the cache and returned.

    Returns:
        A ``Response`` with mimetype ``text/xml`` in both cases.
    """
    # get result from cache
    getCacheResult = getCache(request.url, "url")
    if getCacheResult is not False:
        app.logger.debug('Returning /querySingleIP from Cache for %s', request.remote_addr)
        # The cached value is what the uncached path returns, so it must
        # be served with the same mimetype.
        return Response(getCacheResult, mimetype='text/xml')

    # query ES
    returnResult = formatSingleIP(
        queryForSingleIP(app.config['MAXALERTS'],
                         request.args.get('ip'),
                         checkCommunityIndex(request)))
    # NOTE(review): 60 is presumably the cache lifetime in seconds --
    # confirm against setCache.
    setCache(request.url, returnResult, 60, "url")
    app.logger.debug('Returning /querySingleIP from ES for %s', request.remote_addr)
    return Response(returnResult, mimetype='text/xml')


# Routes with both XML and JSON output
def postSimpleMessage():
    """Accept posted alert data and hand it to the put service.

    When the request has a body and ``putservice.checkPostData`` returns a
    truthy tree, the alerts are handled and an XML "OK" result is
    returned. Otherwise the configured ``DEFAULTRESPONSE`` is returned.
    """
    tree = putservice.checkPostData(request.data) if request.data else None
    if not tree:
        return app.config['DEFAULTRESPONSE']

    # The community-user check is only evaluated for a valid tree.
    putservice.handleAlerts(tree, checkCommunityUser(), es, cache)
    message = "<Result><StatusCode>OK</StatusCode><Text></Text></Result>"
    return Response(message, mimetype='text/xml')


###############
### Main
###############
def get_method(self):
    """Return a string indicating the HTTP request method.

    An explicitly set ``self.method`` wins; otherwise the method is
    "POST" when ``self.data`` is set and "GET" when it is not.
    """
    explicit = self.method
    if explicit is not None:
        return explicit
    return "GET" if self.data is None else "POST"
def add_data(self, data):
    """Deprecated way to set the request body; assigns ``self.data``.

    Emits a ``DeprecationWarning`` on every call before storing *data*.
    """
    msg = "Request.add_data method is deprecated."
    # stacklevel=2 attributes the warning to the caller's line instead of
    # this line, so users see where the deprecated call is made.
    warnings.warn(msg, DeprecationWarning, stacklevel=2)
    self.data = data