The following 38 code examples, extracted from open-source Python projects, illustrate how to use urllib.request (the Python 3 replacement for urllib2).
def files(self):
    """Fetch, cache and return the torrent's file listing.

    Scrapes the ajax file-list page and caches the result in
    ``self._files`` so the page is only requested once.

    :returns: dict mapping file name to its human-readable size
    """
    if not self._files:
        path = '/ajax_details_filelist.php'
        url = self.url.path(path).query_param('id', self.id)
        request = urllib.request.Request(
            url, headers={'User-Agent': "Magic Browser"})
        response = urllib.request.urlopen(request).read()
        root = html.document_fromstring(response)
        rows = root.findall('.//tr')
        # A single row whose <td> spans both columns is the site's
        # "no files" placeholder.
        if len(rows) == 1 and rows[0].find('td').get('colspan') == '2':
            self._files = {}
        else:
            # Fix: initialize the dict before filling it — the original
            # assigned into self._files without ever creating it here.
            self._files = {}
            for row in rows:
                # Fix: str() instead of Python-2-only unicode(); this
                # module already uses the Python 3 urllib.request API.
                name, size = [str(v.text_content())
                              for v in row.findall('.//td')]
                # Replace non-breaking spaces with plain spaces.
                self._files[name] = size.replace('\xa0', ' ')
    return self._files
def run(self, suppress = False):
    """
    Starts the request if it has not begun yet, blocks until it finishes,
    and returns the resulting descriptors as a list.

    :param bool suppress: avoids raising exceptions if **True**

    :returns: list of the requested
      :class:`~stem.descriptor.__init__.Descriptor` instances

    :raises:
      When **suppress** is **False** this can fail with...

        * **ValueError** if the descriptor contents is malformed
        * **socket.timeout** if our request timed out
        * **urllib2.URLError** for most request failures

      The urllib2 module may also raise other exception types, which
      are passed straight through.
    """
    results = self._run(suppress)
    return list(results)
def _pick_url(self, use_authority = False): """ Provides a url that can be queried. If we have multiple endpoints then one will be picked randomly. :param bool use_authority: ignores our endpoints and uses a directory authority instead :returns: **str** for the url being queried by this request """ if use_authority or not self.endpoints: authority = random.choice(filter(HAS_V3IDENT, get_authorities().values())) address, dirport = authority.address, authority.dir_port else: address, dirport = random.choice(self.endpoints) return 'http://%s:%i/%s' % (address, dirport, self.resource.lstrip('/'))
def _get_data(url): """Helper function to get data over http or from a local file""" if url.startswith('http://'): # Try Python 2, use Python 3 on exception try: resp = urllib.urlopen(url) encoding = resp.headers.dict.get('content-encoding', 'plain') except AttributeError: resp = urllib.request.urlopen(url) encoding = resp.headers.get('content-encoding', 'plain') data = resp.read() if encoding == 'plain': pass elif encoding == 'gzip': data = StringIO(data) data = gzip.GzipFile(fileobj=data).read() else: raise RuntimeError('unknown encoding') else: with open(url, 'r') as fid: data = fid.read() return data
def _get_data(url): """Helper function to get data over http or from a local file""" if url.startswith('http://'): # Try Python 2, use Python 3 on exception try: resp = urllib.urlopen(url) encoding = resp.headers.dict.get('content-encoding', 'plain') except AttributeError: resp = urllib.request.urlopen(url) encoding = resp.headers.get('content-encoding', 'plain') data = resp.read() if encoding == 'plain': pass elif encoding == 'gzip': data = StringIO(data) data = gzip.GzipFile(fileobj=data).read() else: raise RuntimeError('unknown encoding') else: with open(url, 'r') as fid: data = fid.read() fid.close() return data
def _make_request(self, opener, request): """Make the API call and return the response. This is separated into it's own function, so we can mock it easily for testing. :param opener: :type opener: :param request: url payload to request :type request: urllib.Request object :return: urllib response """ try: return opener.open(request) except HTTPError as err: exc = handle_error(err) exc.__cause__ = None raise exc
def make_request(self, method, request_body=None, query_params=None, request_headers=None):
    """Build and send a single API request, then record its response.

    :param method: HTTP verb (case-insensitive)
    :param request_body: object to JSON-encode as the request payload
    :param query_params: optional dict of query string parameters
    :param request_headers: optional headers merged into the defaults
    """
    method = method.upper()
    if request_headers:
        self._set_headers(request_headers)
    body = json.dumps(request_body) if request_body else None
    params = query_params if query_params else None
    opener = urllib.build_opener()
    request = urllib.Request(self._build_url(params), data=body)
    for header_name, header_value in self.request_headers.iteritems():
        request.add_header(header_name, header_value)
    # urllib only exposes GET/POST directly; override to honor `method`.
    request.get_method = lambda: method
    self._response = opener.open(request)
    self._set_response(self._response)
    self._reset()
def atnt_database_directory():
    """Return a directory holding the AT&T face database, downloading it if needed.

    Checks, in order: a previously downloaded temp directory, the default
    directory, and finally downloads + extracts the zip into a fresh temp
    directory (cached for the rest of the process via a global and an
    environment variable).

    :returns: path to the database directory
    """
    global atnt_downloaded_directory
    if atnt_downloaded_directory:
        return atnt_downloaded_directory

    if os.path.exists(atnt_default_directory):
        return atnt_default_directory

    import sys, tempfile
    if sys.version_info[0] <= 2:
        import urllib2 as urllib
    else:
        import urllib.request as urllib

    atnt_downloaded_directory = tempfile.mkdtemp(prefix='atnt_db_')
    db_url = "http://www.cl.cam.ac.uk/Research/DTG/attarchive/pub/data/att_faces.zip"
    logger.warn("Downloading the AT&T database from '%s' to '%s' ...", db_url, atnt_downloaded_directory)
    logger.warn("To avoid this, please download the database manually, extract the data and set the ATNT_DATABASE_DIRECTORY environment variable to this directory.")
    # to avoid re-downloading in parallel test execution
    os.environ['ATNT_DATABASE_DIRECTORY'] = atnt_downloaded_directory

    # download — fix: use `with` so the zip file handle is closed even if
    # the download or write fails partway through
    url = urllib.urlopen(db_url)
    local_zip_file = os.path.join(atnt_downloaded_directory, 'att_faces.zip')
    with open(local_zip_file, 'wb') as dfile:
        dfile.write(url.read())

    # unzip — fix: don't shadow the builtin `zip`, and close the archive
    import zipfile
    with zipfile.ZipFile(local_zip_file) as zip_file:
        zip_file.extractall(atnt_downloaded_directory)
    os.remove(local_zip_file)

    return atnt_downloaded_directory
def items(self):
    """
    Request URL and parse response. Yield a ``Torrent`` for every
    torrent on page.
    """
    request = urllib.request.Request(
        self.url, headers={'User-Agent': "Magic Browser"})
    response = urllib.request.urlopen(request).read()
    root = html.document_fromstring(str(response))
    torrents = [self._build_torrent(row)
                for row in self._get_torrent_rows(root)]
    for torrent in torrents:
        yield torrent
def info(self):
    """Fetch (once) and return the torrent's description text.

    The page is requested only on first access; the parsed description
    is cached in ``self._info`` afterwards.
    """
    if self._info is None:
        request = urllib.request.Request(
            self.url, headers={'User-Agent': "Magic Browser"})
        page = urllib.request.urlopen(request).read()
        document = html.document_fromstring(page)
        self._info = document.cssselect('#details .nfo pre')[0].text_content()
    return self._info
def get_server_descriptors(self, fingerprints = None, **query_args):
    """
    Provides the server descriptors with the given fingerprints. If no
    fingerprints are provided then this returns all descriptors in the
    present consensus.

    :param str,list fingerprints: fingerprint or list of fingerprints to be
      retrieved, gets all descriptors if **None**
    :param query_args: additional arguments for the
      :class:`~stem.descriptor.remote.Query` constructor

    :returns: :class:`~stem.descriptor.remote.Query` for the server descriptors

    :raises: **ValueError** if we request more than 96 descriptors by their
      fingerprints (this is due to a limit on the url length by squid proxies).
    """
    if isinstance(fingerprints, str):
        fingerprints = [fingerprints]

    resource = '/tor/server/all.z'
    if fingerprints:
        if len(fingerprints) > MAX_FINGERPRINTS:
            raise ValueError('Unable to request more than %i descriptors at a time by their fingerprints' % MAX_FINGERPRINTS)
        resource = '/tor/server/fp/%s.z' % '+'.join(fingerprints)

    return self.query(resource, **query_args)
def get_extrainfo_descriptors(self, fingerprints = None, **query_args):
    """
    Provides the extrainfo descriptors with the given fingerprints. If no
    fingerprints are provided then this returns all descriptors in the
    present consensus.

    :param str,list fingerprints: fingerprint or list of fingerprints to be
      retrieved, gets all descriptors if **None**
    :param query_args: additional arguments for the
      :class:`~stem.descriptor.remote.Query` constructor

    :returns: :class:`~stem.descriptor.remote.Query` for the extrainfo descriptors

    :raises: **ValueError** if we request more than 96 descriptors by their
      fingerprints (this is due to a limit on the url length by squid proxies).
    """
    if isinstance(fingerprints, str):
        fingerprints = [fingerprints]

    resource = '/tor/extra/all.z'
    if fingerprints:
        if len(fingerprints) > MAX_FINGERPRINTS:
            raise ValueError('Unable to request more than %i descriptors at a time by their fingerprints' % MAX_FINGERPRINTS)
        resource = '/tor/extra/fp/%s.z' % '+'.join(fingerprints)

    return self.query(resource, **query_args)
def get_microdescriptors(self, hashes, **query_args):
    """
    Provides the microdescriptors with the given hashes. To get these see the
    'microdescriptor_hashes' attribute of
    :class:`~stem.descriptor.router_status_entry.RouterStatusEntryV3`. Note
    that these are only provided via a microdescriptor consensus (such as
    'cached-microdesc-consensus' in your data directory).

    :param str,list hashes: microdescriptor hash or list of hashes to be
      retrieved
    :param query_args: additional arguments for the
      :class:`~stem.descriptor.remote.Query` constructor

    :returns: :class:`~stem.descriptor.remote.Query` for the microdescriptors

    :raises: **ValueError** if we request more than 92 microdescriptors by
      their hashes (this is due to a limit on the url length by squid proxies).
    """
    hash_list = [hashes] if isinstance(hashes, str) else hashes

    if len(hash_list) > MAX_MICRODESCRIPTOR_HASHES:
        raise ValueError('Unable to request more than %i microdescriptors at a time by their hashes' % MAX_MICRODESCRIPTOR_HASHES)

    resource = '/tor/micro/d/%s.z' % '-'.join(hash_list)
    return self.query(resource, **query_args)
def query(self, resource, **query_args):
    """
    Issues a request for the given resource.

    :param str resource: resource being fetched, such as '/tor/server/all.z'
    :param query_args: additional arguments for the
      :class:`~stem.descriptor.remote.Query` constructor

    :returns: :class:`~stem.descriptor.remote.Query` for the descriptors

    :raises: **ValueError** if resource is clearly invalid or the descriptor
      type can't be determined when 'descriptor_type' is **None**
    """
    # Start from our defaults, let callers override, then fill in the
    # downloader-level fallbacks for anything still unset.
    args = dict(self._default_args)
    args.update(query_args)
    args.setdefault('endpoints', self._endpoints)
    args.setdefault('fall_back_to_authority', True)
    return Query(resource, **args)
def api_request_native(url, data=None, token=None, https_proxy=None, method=None):
    """Perform an API request with urllib and return the decoded JSON body.

    :param url: full API endpoint
    :param data: optional request payload (str, sent UTF-8 encoded)
    :param token: auth token; looked up via token_auth_string() when None
    :param https_proxy: unused here (proxy support is commented out)
    :param method: optional HTTP verb override
    :return: parsed JSON response, or None for a 204 No Content
    :raises SimpleHTTPError: on any HTTP error response
    """
    request = urllib.Request(url)
    if method:
        request.get_method = lambda: method
    if token is None:
        token = token_auth_string()
    request.add_header('Authorization', 'token ' + token)
    request.add_header('Accept', 'application/json')
    request.add_header('Content-Type', 'application/json')
    if data is not None:
        request.add_data(bytes(data.encode('utf8')))

    # https_proxy = https_proxy if https_proxy != None else settings.get('https_proxy')
    # if https_proxy:
    #     opener = urllib.build_opener(urllib.HTTPHandler(), urllib.HTTPSHandler(),
    #                                  urllib.ProxyHandler({'https': https_proxy}))
    #     urllib.install_opener(opener)

    try:
        with contextlib.closing(urllib.urlopen(request)) as response:
            if response.code == 204:  # No Content
                return None
            return json.loads(response.read().decode('utf8', 'ignore'))
    except urllib.HTTPError as err:
        with contextlib.closing(err):
            raise SimpleHTTPError(err.code, err.read())
def api_request_curl(url, data=None, token=None, https_proxy=None, method=None):
    """Perform an API request by shelling out to curl and return the decoded
    JSON body.

    :param url: full API endpoint
    :param data: optional request payload (str, sent UTF-8 encoded)
    :param token: auth token; looked up via token_auth_string() when None
    :param https_proxy: unused here (proxy support is commented out)
    :param method: optional HTTP verb override
    :return: parsed JSON response, or None for a 204 No Content
    :raises subprocess.CalledProcessError: when curl itself exits non-zero
    :raises SimpleHTTPError: on a non-success HTTP status code
    """
    # '-K -' makes curl read the rest of its options from stdin, so the
    # token never appears on the command line (visible via `ps`).
    command = ["curl", '-K', '-', url]
    token = token if token != None else token_auth_string()
    config = ['--header "Authorization: token ' + token + '"',
              '--header "Accept: application/json"',
              '--header "Content-Type: application/json"',
              "--silent"]
    if method:
        config.append('--request "%s"' % method)
    # https_proxy = https_proxy if https_proxy != None else settings.get('https_proxy')
    # if https_proxy:
    #     config.append(https_proxy)
    with named_tempfile() as header_output_file:
        # Response headers are dumped to a temp file; we close our handle
        # so curl can write to it, but the `with` keeps the file alive.
        config.append('--dump-header "%s"' % header_output_file.name)
        header_output_file.close()
        with named_tempfile() as data_file:
            if data is not None:
                # Payload goes through a second temp file, referenced by
                # name in the curl config ('@file' syntax).
                data_file.write(bytes(data.encode('utf8')))
                data_file.close()
                config.append('--data-binary "@%s"' % data_file.name)
            process = subprocess.Popen(command,
                                       stdin=subprocess.PIPE,
                                       stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE)
            # Feed the accumulated options to curl via stdin (see '-K -').
            response, _ = process.communicate(bytes('\n'.join(config).encode('utf8')))
            returncode = process.returncode
            if returncode != 0:
                raise subprocess.CalledProcessError(returncode, 'curl')
            # First line of the dumped headers is the status line, e.g.
            # "HTTP/1.1 200 OK" -> protocol, code, message.
            with open(header_output_file.name, "r") as headers:
                _, responsecode, message = headers.readline().split(None, 2)
            responsecode = int(responsecode)
            if responsecode == 204:  # No Content
                return None
            elif 200 <= responsecode < 300 or responsecode == 100:  # Continue
                return json.loads(response.decode('utf8', 'ignore'))
            else:
                raise SimpleHTTPError(responsecode, response)
def test_s3upload_get_presigned_url_bad_region(self):
    """Test that the region parser provides a valid default region when an invalid region used."""
    import json
    endpoint = "https://30yinsv8k6.execute-api.us-east-1.amazonaws.com/prod/get-signed-url"
    responses.add(responses.POST, endpoint, body="http://test/", status=200)
    uploader_url = ("https://aws-support-uploader.s3.amazonaws.com/uploader?"
                    "account-id=9999999999&case-id=99999999&expiration=1486577795&"
                    "key=92e1ab350e7f5302551e0b05a89616381bb6c66"
                    "9c9492d9acfbf63701e455ef6")
    ec2rlcore.s3upload.get_presigned_url(uploader_url, "test", "in-valid-1")
    sent_payload = json.loads(responses.calls[0].request.body)
    self.assertEqual(len(responses.calls), 1)
    # The invalid region must be replaced by the us-east-1 default.
    self.assertEqual("us-east-1", sent_payload["region"])
def test_s3upload_get_presigned_url_good_region(self):
    """Test that the region parser provides a valid region."""
    import json
    endpoint = "https://30yinsv8k6.execute-api.us-east-1.amazonaws.com/prod/get-signed-url"
    responses.add(responses.POST, endpoint, body="http://test/", status=200)
    uploader_url = ("https://aws-support-uploader.s3.amazonaws.com/uploader?"
                    "account-id=9999999999&case-id=99999999&expiration=1486577795&"
                    "key=92e1ab350e7f5302551e0b05a89616381bb6c66"
                    "9c9492d9acfbf63701e455ef6")
    ec2rlcore.s3upload.get_presigned_url(uploader_url, "test", "eu-west-1")
    sent_payload = json.loads(responses.calls[0].request.body)
    self.assertEqual(len(responses.calls), 1)
    # A valid region must be passed through untouched.
    self.assertEqual("eu-west-1", sent_payload["region"])
def _update_headers(self, request_headers): """Update the headers for the request :param request_headers: headers to set for the API call :type response: dictionary :return: dictionary """ self.request_headers.update(request_headers)
def _make_request(self, opener, request):
    """Test double for ``_make_request``: returns a canned response or
    raises the mapped error based on ``self.response_code``.

    :param opener: ignored (kept for signature compatibility)
    :param request: ignored (kept for signature compatibility)
    :return: MockResponse carrying ``self.response_code``
    :raises: the exception produced by handle_error() for non-2xx codes
    """
    # Fix: success is 200-299 inclusive; the original `< 299` boundary
    # wrongly treated 299 as an error.
    if 200 <= self.response_code < 300:  # if successful code
        return MockResponse(self.response_code)
    else:
        raise handle_error(MockException(self.response_code))
def fetch_or_load(spec_path):
    """
    Fetch a new specification or use the cache if it's current.

    :argument spec_path: the path to a cached specification
    """
    headers = {}

    # If a cached copy exists, send If-Modified-Since so the server can
    # answer 304 Not Modified and we keep using the cache.
    try:
        modified = datetime.utcfromtimestamp(os.path.getmtime(spec_path))
        date = modified.strftime("%a, %d %b %Y %I:%M:%S UTC")
        headers["If-Modified-Since"] = date
    except OSError as error:
        # A missing cache file is expected on first run; anything else
        # (permissions, etc.) is a real problem and is re-raised.
        if error.errno != errno.ENOENT:
            raise

    # NOTE(review): urllib.Request / urllib.urlopen here implies `urllib`
    # is urllib2 (or urllib.request) aliased at import time — confirm
    # against the file's import block.
    request = urllib.Request(VALIDATION_SPEC, headers=headers)
    response = urllib.urlopen(request)

    if response.code == 200:
        # Got a fresh copy: overwrite the cache, rewind, and parse the
        # file we just wrote so cache and result can't diverge.
        with open(spec_path, "w+b") as spec:
            spec.writelines(response)
            spec.seek(0)
            return html.parse(spec)

    # Not modified (or any other non-200): parse the cached copy.
    with open(spec_path) as spec:
        return html.parse(spec)
def twitterreq(url, method, parameters):
    """
    Construct, sign and open a twitter request using credentials above

    :param url: request url
    :param method: POST or GET
    :param parameters: (irrelevant, for Posting)
    :return: Twitter response
    """
    # Fix: honor the `method` argument — the original accepted it but
    # silently used the module-level `http_method` global instead.
    req = oauth.Request.from_consumer_and_token(oauth_consumer,
                                                token=oauth_token,
                                                http_method=method,
                                                http_url=url,
                                                parameters=parameters)
    req.sign_request(signature_method_hmac_sha1, oauth_consumer, oauth_token)
    headers = req.to_header()

    if method == "POST":
        encoded_post_data = req.to_postdata()
    else:
        encoded_post_data = None
        # GET parameters ride in the query string of the signed url.
        url = req.to_url()

    opener = urllib.OpenerDirector()
    opener.add_handler(http_handler)
    opener.add_handler(https_handler)
    response = opener.open(url, encoded_post_data)
    return response
def download_goodTests(GOODTESTS_URL=None):
    '''
        download_goodTests - Attempts to download GoodTests, using the default global url (or one provided).

        @return <int> - 0 on success (program should continue), otherwise non-zero (program should abort with this exit status)
    '''
    if GOODTESTS_URL is None:
        GOODTESTS_URL = globals()['GOODTESTS_URL']

    # Ask the user for permission before touching the filesystem.
    validAnswer = False
    while validAnswer == False:
        sys.stdout.write('GoodTests not found. Would you like to install it to local folder? (y/n): ')
        sys.stdout.flush()
        answer = sys.stdin.readline().strip().lower()
        if answer not in ('y', 'n', 'yes', 'no'):
            continue
        validAnswer = True
        answer = answer[0]

    if answer == 'n':
        sys.stderr.write('Cannot run tests without installing GoodTests. http://pypi.python.org/pypi/GoodTests or https://github.com/kata198/Goodtests\n')
        return 1

    # Resolve a urllib implementation (Python 2 or 3).
    urllib = None
    try:
        import urllib2 as urllib
    except ImportError:
        try:
            import urllib.request as urllib
        except ImportError:
            pass

    if urllib is None:
        # Fix: the original fell through to urllib.urlopen() after a
        # successful pip install, crashing with NameError because no
        # urllib module was ever bound. Install via pip, then skip
        # straight to the import check below.
        sys.stderr.write('Failed to import urllib. Trying pip.\n')
        import subprocess
        pipe = subprocess.Popen('pip install GoodTests', shell=True)
        res = pipe.wait()
        if res != 0:
            sys.stderr.write('Failed to install GoodTests with pip or direct download. Aborting.\n')
            return 1
    else:
        try:
            response = urllib.urlopen(GOODTESTS_URL)
            contents = response.read()
            if str != bytes:
                contents = contents.decode('ascii')
        except Exception as e:
            sys.stderr.write('Failed to download GoodTests.py from "%s"\n%s\n' % (GOODTESTS_URL, str(e)))
            return 1
        try:
            with open('GoodTests.py', 'w') as f:
                f.write(contents)
        except Exception as e:
            sys.stderr.write('Failed to write to GoodTests.py\n%s\n' % (str(e),))
            return 1
        try:
            os.chmod('GoodTests.py', 0o775)
        except OSError:
            # Fix: narrowed the bare except; chmod failure is best-effort.
            sys.stderr.write('WARNING: Failed to chmod +x GoodTests.py, may not be able to be executed.\n')

    try:
        import GoodTests
    except ImportError:
        sys.stderr.write('Seemed to download GoodTests okay, but still cannot import. Aborting.\n')
        return 1
    return 0