The following 36 code examples, extracted from open-source Python projects, illustrate how to use urllib.request.Request().
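Before the extracted examples, a minimal sketch of the common pattern most of them build on may help: construct a Request with explicit headers, then open it with urlopen. The URL and header below are placeholders of my own and are not taken from any of the projects.

import urllib.request

# Hypothetical endpoint, used purely for illustration.
url = 'https://api.example.com/resource'

# Build the Request object with an explicit header, then open it.
request = urllib.request.Request(url, headers={'Accept': 'application/json'})

with urllib.request.urlopen(request) as response:
    body = response.read().decode('utf-8')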
def run(self, suppress = False):
    """
    Blocks until our request is complete then provides the descriptors. If we
    haven't yet started our request then this does so.

    :param bool suppress: avoids raising exceptions if **True**

    :returns: list for the requested :class:`~stem.descriptor.__init__.Descriptor` instances

    :raises:
      Using the iterator can fail with the following if **suppress** is
      **False**...

        * **ValueError** if the descriptor contents is malformed
        * **socket.timeout** if our request timed out
        * **urllib2.URLError** for most request failures

      Note that the urllib2 module may fail with other exception types, in
      which case we'll pass it along.
    """

    return list(self._run(suppress))
def _pick_url(self, use_authority = False):
    """
    Provides a url that can be queried. If we have multiple endpoints then one
    will be picked randomly.

    :param bool use_authority: ignores our endpoints and uses a directory
      authority instead

    :returns: **str** for the url being queried by this request
    """

    if use_authority or not self.endpoints:
        authority = random.choice(filter(HAS_V3IDENT, get_authorities().values()))
        address, dirport = authority.address, authority.dir_port
    else:
        address, dirport = random.choice(self.endpoints)

    return 'http://%s:%i/%s' % (address, dirport, self.resource.lstrip('/'))
def _make_request(self, opener, request):
    """Make the API call and return the response.

    This is separated into its own function, so we can mock it easily for
    testing.

    :param opener:
    :type opener:
    :param request: url payload to request
    :type request: urllib.Request object
    :return: urllib response
    """
    try:
        return opener.open(request)
    except HTTPError as err:
        exc = handle_error(err)
        exc.__cause__ = None
        raise exc
def make_request(self, method, request_body=None, query_params=None,
                 request_headers=None):
    method = method.upper()
    if request_headers:
        self._set_headers(request_headers)
    request_body = json.dumps(request_body) if request_body else None
    query_params = query_params if query_params else None
    opener = urllib.build_opener()
    request = urllib.Request(self._build_url(query_params), data=request_body)
    for key, value in self.request_headers.iteritems():
        request.add_header(key, value)
    request.get_method = lambda: method
    self._response = opener.open(request)
    self._set_response(self._response)
    self._reset()
def http(method, url, body=None, headers=None):
    url_info = urlparse.urlparse(url)
    if url_info.scheme == "https":
        con = httplib.HTTPSConnection(url_info.hostname, url_info.port or 443)
    else:
        con = httplib.HTTPConnection(url_info.hostname, url_info.port or 80)
    con.request(method, url_info.path, body, headers)
    response = con.getresponse()
    try:
        if 400 <= response.status < 500:
            raise HttpClientError(response.status, response.reason, response.read())
        elif 500 <= response.status < 600:
            raise HttpServerError(response.status, response.reason, response.read())
        else:
            yield response
    finally:
        con.close()
def get_response_tweets():
    """
    Iterates through all tweets in database 1/minute
    If photo is no longer available, deletes tweet
    Otherwise, looks for responses and adds those to the database
    :return: nothing
    """
    original_tweets = pull_all_original_tweets()

    # one request/minute
    number_processed = 0
    for tweet in original_tweets:
        number_processed += 1
        if number_processed % 10 == 0:
            print("currently on tweet #" + str(number_processed) + '. Tweet id: ' + str(tweet['tweet_id']))
        if is_valid_image(tweet['image_url']):
            pull_tweet_responses(tweet['username'], tweet['tweet_id'])
            pulled = True
        else:
            delete_tweet(tweet['tweet_id'])
            pulled = False
        if pulled:
            time.sleep(60)
def download_and_unpack(url, dest):
    if not os.path.exists(dest):
        try:
            import urllib
            urllib.urlretrieve('http://google.com')
        except AttributeError:
            import urllib.request as urllib
        print("downloading " + dest + " ...")
        archive_name = dest + ".zip"
        urllib.urlretrieve(url, archive_name)
        in_file = open(archive_name, 'rb')
        z = zipfile.ZipFile(in_file)
        for name in z.namelist():
            print("extracting " + name)
            outpath = "./"
            z.extract(name, outpath)
        in_file.close()
        os.remove(archive_name)
        print("done.")
def atnt_database_directory():
    global atnt_downloaded_directory
    if atnt_downloaded_directory:
        return atnt_downloaded_directory

    if os.path.exists(atnt_default_directory):
        return atnt_default_directory

    import sys, tempfile
    if sys.version_info[0] <= 2:
        import urllib2 as urllib
    else:
        import urllib.request as urllib

    atnt_downloaded_directory = tempfile.mkdtemp(prefix='atnt_db_')
    db_url = "http://www.cl.cam.ac.uk/Research/DTG/attarchive/pub/data/att_faces.zip"
    logger.warn("Downloading the AT&T database from '%s' to '%s' ...", db_url, atnt_downloaded_directory)
    logger.warn("To avoid this, please download the database manually, extract the data and set the ATNT_DATABASE_DIRECTORY environment variable to this directory.")

    # to avoid re-downloading in parallel test execution
    os.environ['ATNT_DATABASE_DIRECTORY'] = atnt_downloaded_directory

    # download
    url = urllib.urlopen(db_url)
    local_zip_file = os.path.join(atnt_downloaded_directory, 'att_faces.zip')
    dfile = open(local_zip_file, 'wb')
    dfile.write(url.read())
    dfile.close()

    # unzip
    import zipfile
    zip = zipfile.ZipFile(local_zip_file)
    zip.extractall(atnt_downloaded_directory)
    os.remove(local_zip_file)

    return atnt_downloaded_directory
def download_setuptools(packagename, to_dir):
    # making sure we use the absolute path
    to_dir = os.path.abspath(to_dir)
    try:
        from urllib.request import urlopen
    except ImportError:
        from urllib2 import urlopen
    chksum, url = get_pypi_src_download(packagename)
    tgz_name = os.path.basename(url)
    saveto = os.path.join(to_dir, tgz_name)
    src = dst = None
    if not os.path.exists(saveto):  # Avoid repeated downloads
        try:
            log.warn("Downloading %s", url)
            src = urlopen(url)
            # Read/write all in one block, so we don't create a corrupt file
            # if the download is interrupted.
            data = src.read()
            if chksum is not None:
                data_sum = md5(data).hexdigest()
                if data_sum != chksum:
                    raise RuntimeError("Downloading %s failed: corrupt checksum" % (url,))
            dst = open(saveto, "wb")
            dst.write(data)
        finally:
            if src:
                src.close()
            if dst:
                dst.close()
    return os.path.realpath(saveto)
def get_server_descriptors(self, fingerprints = None, **query_args):
    """
    Provides the server descriptors with the given fingerprints. If no
    fingerprints are provided then this returns all descriptors in the
    present consensus.

    :param str,list fingerprints: fingerprint or list of fingerprints to be
      retrieved, gets all descriptors if **None**
    :param query_args: additional arguments for the
      :class:`~stem.descriptor.remote.Query` constructor

    :returns: :class:`~stem.descriptor.remote.Query` for the server descriptors

    :raises: **ValueError** if we request more than 96 descriptors by their
      fingerprints (this is due to a limit on the url length by squid proxies).
    """

    resource = '/tor/server/all.z'

    if isinstance(fingerprints, str):
        fingerprints = [fingerprints]

    if fingerprints:
        if len(fingerprints) > MAX_FINGERPRINTS:
            raise ValueError('Unable to request more than %i descriptors at a time by their fingerprints' % MAX_FINGERPRINTS)

        resource = '/tor/server/fp/%s.z' % '+'.join(fingerprints)

    return self.query(resource, **query_args)
def get_extrainfo_descriptors(self, fingerprints = None, **query_args):
    """
    Provides the extrainfo descriptors with the given fingerprints. If no
    fingerprints are provided then this returns all descriptors in the
    present consensus.

    :param str,list fingerprints: fingerprint or list of fingerprints to be
      retrieved, gets all descriptors if **None**
    :param query_args: additional arguments for the
      :class:`~stem.descriptor.remote.Query` constructor

    :returns: :class:`~stem.descriptor.remote.Query` for the extrainfo descriptors

    :raises: **ValueError** if we request more than 96 descriptors by their
      fingerprints (this is due to a limit on the url length by squid proxies).
    """

    resource = '/tor/extra/all.z'

    if isinstance(fingerprints, str):
        fingerprints = [fingerprints]

    if fingerprints:
        if len(fingerprints) > MAX_FINGERPRINTS:
            raise ValueError('Unable to request more than %i descriptors at a time by their fingerprints' % MAX_FINGERPRINTS)

        resource = '/tor/extra/fp/%s.z' % '+'.join(fingerprints)

    return self.query(resource, **query_args)
def get_microdescriptors(self, hashes, **query_args):
    """
    Provides the microdescriptors with the given hashes. To get these see the
    'microdescriptor_hashes' attribute of
    :class:`~stem.descriptor.router_status_entry.RouterStatusEntryV3`. Note
    that these are only provided via a microdescriptor consensus (such as
    'cached-microdesc-consensus' in your data directory).

    :param str,list hashes: microdescriptor hash or list of hashes to be retrieved
    :param query_args: additional arguments for the
      :class:`~stem.descriptor.remote.Query` constructor

    :returns: :class:`~stem.descriptor.remote.Query` for the microdescriptors

    :raises: **ValueError** if we request more than 92 microdescriptors by their
      hashes (this is due to a limit on the url length by squid proxies).
    """

    if isinstance(hashes, str):
        hashes = [hashes]

    if len(hashes) > MAX_MICRODESCRIPTOR_HASHES:
        raise ValueError('Unable to request more than %i microdescriptors at a time by their hashes' % MAX_MICRODESCRIPTOR_HASHES)

    return self.query('/tor/micro/d/%s.z' % '-'.join(hashes), **query_args)
def query(self, resource, **query_args):
    """
    Issues a request for the given resource.

    :param str resource: resource being fetched, such as '/tor/server/all.z'
    :param query_args: additional arguments for the
      :class:`~stem.descriptor.remote.Query` constructor

    :returns: :class:`~stem.descriptor.remote.Query` for the descriptors

    :raises: **ValueError** if resource is clearly invalid or the descriptor
      type can't be determined when 'descriptor_type' is **None**
    """

    args = dict(self._default_args)
    args.update(query_args)

    if 'endpoints' not in args:
        args['endpoints'] = self._endpoints

    if 'fall_back_to_authority' not in args:
        args['fall_back_to_authority'] = True

    return Query(
        resource,
        **args
    )
def download(url, server_fname, local_fname=None, progress_update_percentage=5):
    """
    An internet download utility modified from
    http://stackoverflow.com/questions/22676/
    how-do-i-download-a-file-over-http-using-python/22776#22776
    """
    try:
        import urllib
        urllib.urlretrieve('http://google.com')
    except AttributeError:
        import urllib.request as urllib
    u = urllib.urlopen(url)
    if local_fname is None:
        local_fname = server_fname
    full_path = local_fname
    meta = u.info()
    with open(full_path, 'wb') as f:
        try:
            file_size = int(meta.get("Content-Length"))
        except TypeError:
            print("WARNING: Cannot get file size, displaying bytes instead!")
            file_size = 100
        print("Downloading: %s Bytes: %s" % (server_fname, file_size))
        file_size_dl = 0
        block_sz = int(1E7)
        p = 0
        while True:
            buffer = u.read(block_sz)
            if not buffer:
                break
            file_size_dl += len(buffer)
            f.write(buffer)
            if (file_size_dl * 100. / file_size) > p:
                status = r"%10d [%3.2f%%]" % (file_size_dl,
                                              file_size_dl * 100. / file_size)
                print(status)
                p += progress_update_percentage
def api_request_native(url, data=None, token=None, https_proxy=None, method=None):
    request = urllib.Request(url)
    # print('API request url:', request.get_full_url())
    if method:
        request.get_method = lambda: method
    token = token if token != None else token_auth_string()
    request.add_header('Authorization', 'token ' + token)
    request.add_header('Accept', 'application/json')
    request.add_header('Content-Type', 'application/json')

    if data is not None:
        request.add_data(bytes(data.encode('utf8')))

    # print('API request data:', request.get_data())
    # print('API request header:', request.header_items())
    # https_proxy = https_proxy if https_proxy != None else settings.get('https_proxy')
    # if https_proxy:
    #     opener = urllib.build_opener(urllib.HTTPHandler(), urllib.HTTPSHandler(),
    #                                  urllib.ProxyHandler({'https': https_proxy}))
    #     urllib.install_opener(opener)

    try:
        with contextlib.closing(urllib.urlopen(request)) as response:
            if response.code == 204:  # No Content
                return None
            else:
                return json.loads(response.read().decode('utf8', 'ignore'))
    except urllib.HTTPError as err:
        with contextlib.closing(err):
            raise SimpleHTTPError(err.code, err.read())
def api_request_curl(url, data=None, token=None, https_proxy=None, method=None):
    command = ["curl", '-K', '-', url]

    token = token if token != None else token_auth_string()
    config = ['--header "Authorization: token ' + token + '"',
              '--header "Accept: application/json"',
              '--header "Content-Type: application/json"',
              "--silent"]

    if method:
        config.append('--request "%s"' % method)

    # https_proxy = https_proxy if https_proxy != None else settings.get('https_proxy')
    # if https_proxy:
    #     config.append(https_proxy)

    with named_tempfile() as header_output_file:
        config.append('--dump-header "%s"' % header_output_file.name)
        header_output_file.close()
        with named_tempfile() as data_file:
            if data is not None:
                data_file.write(bytes(data.encode('utf8')))
                data_file.close()
                config.append('--data-binary "@%s"' % data_file.name)

            process = subprocess.Popen(command, stdin=subprocess.PIPE,
                                       stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE)
            response, _ = process.communicate(bytes('\n'.join(config).encode('utf8')))
            returncode = process.returncode

            if returncode != 0:
                raise subprocess.CalledProcessError(returncode, 'curl')

            with open(header_output_file.name, "r") as headers:
                _, responsecode, message = headers.readline().split(None, 2)
                responsecode = int(responsecode)

            if responsecode == 204:  # No Content
                return None
            elif 200 <= responsecode < 300 or responsecode == 100:  # Continue
                return json.loads(response.decode('utf8', 'ignore'))
            else:
                raise SimpleHTTPError(responsecode, response)
def player_request(self, line, player_id=None, raw=False, wait=True):
    """Makes a single request to a particular player (or the current)"""
    try:
        player_id = (player_id
                     or self.cur_player_id
                     or list(self.players.values())[0]["playerid"])
        return self._request(["%s %s" % (player_id, line)],
                             raw=raw, wait=wait)[0]
    except IndexError:
        return None
def _update_headers(self, request_headers):
    """Update the headers for the request

    :param request_headers: headers to set for the API call
    :type response: dictionary
    :return: dictionary
    """
    self.request_headers.update(request_headers)
def _make_request(self, opener, request):
    if 200 <= self.response_code < 299:  # if successful code
        return MockResponse(self.response_code)
    else:
        raise handle_error(MockException(self.response_code))
def fetch_or_load(spec_path):
    """
    Fetch a new specification or use the cache if it's current.

    :argument spec_path: the path to a cached specification
    """
    headers = {}

    try:
        modified = datetime.utcfromtimestamp(os.path.getmtime(spec_path))
        date = modified.strftime("%a, %d %b %Y %I:%M:%S UTC")
        headers["If-Modified-Since"] = date
    except OSError as error:
        if error.errno != errno.ENOENT:
            raise

    request = urllib.Request(VALIDATION_SPEC, headers=headers)
    response = urllib.urlopen(request)

    if response.code == 200:
        with open(spec_path, "w+b") as spec:
            spec.writelines(response)
            spec.seek(0)
            return html.parse(spec)

    with open(spec_path) as spec:
        return html.parse(spec)
def test_patched_modules(self):
    new_mod = """
from eventlet import patcher
patcher.monkey_patch()
import socket
try:
    import urllib.request as urllib
except ImportError:
    import urllib
print("newmod {0} {1}".format(socket.socket, urllib.socket.socket))
"""
    self.write_to_tempfile("newmod", new_mod)
    output, lines = self.launch_subprocess('newmod.py')
    assert lines[0].startswith('newmod'), repr(output)
    self.assertEqual(lines[0].count('GreenSocket'), 2, repr(output))
def twitterreq(url, method, parameters):
    """
    Construct, sign and open a twitter request using credentials above
    :param url: request url
    :param method: POST or GET
    :param parameters: (irrelevant, for Posting)
    :return: Twitter response
    """
    req = oauth.Request.from_consumer_and_token(oauth_consumer,
                                                token=oauth_token,
                                                http_method=http_method,
                                                http_url=url,
                                                parameters=parameters)
    req.sign_request(signature_method_hmac_sha1, oauth_consumer, oauth_token)
    headers = req.to_header()

    if http_method == "POST":
        encoded_post_data = req.to_postdata()
    else:
        encoded_post_data = None
        url = req.to_url()

    opener = urllib.OpenerDirector()
    opener.add_handler(http_handler)
    opener.add_handler(https_handler)

    response = opener.open(url, encoded_post_data)
    return response
def download_goodTests(GOODTESTS_URL=None):
    '''
        download_goodTests - Attempts to download GoodTests, using the default global url (or one provided).

        @return <int> - 0 on success (program should continue), otherwise non-zero (program should abort with this exit status)
    '''
    if GOODTESTS_URL is None:
        GOODTESTS_URL = globals()['GOODTESTS_URL']

    validAnswer = False
    while validAnswer == False:
        sys.stdout.write('GoodTests not found. Would you like to install it to local folder? (y/n): ')
        sys.stdout.flush()
        answer = sys.stdin.readline().strip().lower()
        if answer not in ('y', 'n', 'yes', 'no'):
            continue
        validAnswer = True
        answer = answer[0]

    if answer == 'n':
        sys.stderr.write('Cannot run tests without installing GoodTests. http://pypi.python.org/pypi/GoodTests or https://github.com/kata198/Goodtests\n')
        return 1

    try:
        import urllib2 as urllib
    except ImportError:
        try:
            import urllib.request as urllib
        except:
            sys.stderr.write('Failed to import urllib. Trying pip.\n')
            import subprocess
            pipe = subprocess.Popen('pip install GoodTests', shell=True)
            res = pipe.wait()
            if res != 0:
                sys.stderr.write('Failed to install GoodTests with pip or direct download. aborting.\n')
                return 1

    try:
        response = urllib.urlopen(GOODTESTS_URL)
        contents = response.read()
        if str != bytes:
            contents = contents.decode('ascii')
    except Exception as e:
        sys.stderr.write('Failed to download GoodTests.py from "%s"\n%s\n' % (GOODTESTS_URL, str(e)))
        return 1

    try:
        with open('GoodTests.py', 'w') as f:
            f.write(contents)
    except Exception as e:
        sys.stderr.write('Failed to write to GoodTests.py\n%s\n' % (str(e),))
        return 1

    try:
        os.chmod('GoodTests.py', 0o775)
    except:
        sys.stderr.write('WARNING: Failed to chmod +x GoodTests.py, may not be able to be executed.\n')

    try:
        import GoodTests
    except ImportError:
        sys.stderr.write('Seemed to download GoodTests okay, but still cannot import. Aborting.\n')
        return 1

    return 0
def _request(self, lines, raw=False, wait=True):
    """
    Send multiple pipelined requests to the server, if connected,
    and return their responses, assuming order is maintained
    (which seems safe).
    :type lines list[str]
    :rtype list[str]
    """
    if not self.ssl_wrap.is_connected:
        return []
    if not (lines and len(lines)):
        return []

    lines = [l.rstrip() for l in lines]
    first_word = lines[0].split()[0]
    if not (self.ssl_wrap.is_connected or first_word == 'login'):
        print_d("Can't do '%s' - not connected" % first_word, self)
        return

    if self._debug:
        print_d("<<<< " + "\n..<< ".join(lines))
    request = "\n".join(lines) + "\n"
    raw_response = self.ssl_wrap.communicate(request, wait=wait)
    if not wait:
        return []
    if not raw_response:
        raise SqueezeboxException(
            "No further response from %s. Login problem?" % self)
    raw_response = raw_response.rstrip("\n")
    response = raw_response if raw else self._unquote(raw_response)
    if self._debug:
        print_d(">>>> " + "\n..>> ".join(response.splitlines()))

    def start_point(text):
        if first_word == 'login':
            return 6
        delta = -1 if text.endswith('?') else 1
        return len(self._unquote(text) if raw else text) + delta

    resp_lines = response.splitlines()
    if len(lines) != len(resp_lines):
        raise ValueError("Response problem: %s != %s" % (lines, resp_lines))
    return [resp_line[start_point(line):]
            for line, resp_line in zip(lines, resp_lines)]