我们从 Python 开源项目中提取了以下 13 个代码示例，用于说明如何使用 urllib.response 模块。
def eight_ball():
    """Return one canned Magic 8-Ball style reply, picked uniformly at random."""
    answers = (
        "It is certain",
        "It is decidedly so",
        "Without a doubt",
        "Yes, definitely",
        "You may rely on it",
        "As I see it, yes",
        "Most likely",
        "Yes",
        "Signs point to yes",
        "Don't count on it",
        "My reply is no",
        "My sources say no",
        "Very doubtful",
    )
    # randint is inclusive on both ends, so the upper bound is the last index.
    return answers[randint(0, len(answers) - 1)]
def authenticate_with_apikey(self, api_key, scope=None):
    """
    Authenticate with an API key and store the resulting token on the client.

    The token is kept in ``self._token`` (stamped with the local retrieval
    time) so that ``execute_request`` can use it afterwards.

    api_key -- secret api key from account settings
    scope -- optional scope of the authentication request. When None, the
             literal scope "auto" is sent (described by the original
             documentation as requesting the full list of API scopes).

    Errors raised by ``urllib.request.urlopen`` propagate to the caller.
    """
    scope = "auto" if scope is None else scope
    data = {
        "grant_type": "client_credentials",
        "scope": scope
    }
    encoded_data = urllib.parse.urlencode(data).encode()
    request = urllib.request.Request(self.auth_endpoint, encoded_data, method="POST")
    # The header name was previously misspelled as "ContentType", which sent a
    # non-standard header instead of declaring the body encoding.
    request.add_header("Content-Type", "application/x-www-form-urlencoded")
    # HTTP Basic credentials: the fixed user name "APIKEY" plus the key itself.
    credentials = ('APIKEY:' + api_key).encode()
    request.add_header("Authorization", 'Basic ' + base64.standard_b64encode(credentials).decode())
    # Close the connection once the response has been parsed.
    # NOTE(review): assumes _parse_response fully consumes the response before
    # returning — confirm against its definition.
    with urllib.request.urlopen(request) as response:
        self._token = WaApiClient._parse_response(response)
    self._token.retrieved_at = datetime.datetime.now()
def authenticate_with_contact_credentials(self, username, password, scope=None):
    """
    Authenticate with contact credentials and store the resulting token.

    The token is kept in ``self._token`` (stamped with the local retrieval
    time) so that ``execute_request`` can use it afterwards.

    username -- typically a contact email
    password -- contact password
    scope -- optional scope of the authentication request. When None, the
             literal scope "auto" is sent (described by the original
             documentation as requesting the full list of API scopes).

    Errors raised by ``urllib.request.urlopen`` propagate to the caller.
    """
    scope = "auto" if scope is None else scope
    data = {
        "grant_type": "password",
        "username": username,
        "password": password,
        "scope": scope
    }
    encoded_data = urllib.parse.urlencode(data).encode()
    request = urllib.request.Request(self.auth_endpoint, encoded_data, method="POST")
    # The header name was previously misspelled as "ContentType", which sent a
    # non-standard header instead of declaring the body encoding.
    request.add_header("Content-Type", "application/x-www-form-urlencoded")
    # HTTP Basic credentials identify the client application, not the contact.
    auth_header = base64.standard_b64encode((self.client_id + ':' + self.client_secret).encode()).decode()
    request.add_header("Authorization", 'Basic ' + auth_header)
    # Close the connection once the response has been parsed.
    # NOTE(review): assumes _parse_response fully consumes the response before
    # returning — confirm against its definition.
    with urllib.request.urlopen(request) as response:
        self._token = WaApiClient._parse_response(response)
    self._token.retrieved_at = datetime.datetime.now()
def test_urllib(self):
    """
    Check that importing ``urllib.response`` while the ``future`` import hooks
    are active leaves ``urllib.__file__`` unchanged, i.e. the already-imported
    ``urllib`` is not swapped out. (Whether a swap would actually be a problem
    is an open question.)
    """
    from future import standard_library
    import urllib

    # Remember where urllib was loaded from before any hooks are involved.
    path_before_hooks = urllib.__file__

    with standard_library.hooks():
        import urllib.response

    path_after_hooks = urllib.__file__
    self.assertEqual(path_before_hooks, path_after_hooks)
def test_urllib_imports_moves(self):
    """Smoke test: ``future.moves.urllib`` and its submodules can be imported."""
    import future.moves.urllib
    import future.moves.urllib.parse
    import future.moves.urllib.request
    import future.moves.urllib.robotparser
    import future.moves.urllib.error
    import future.moves.urllib.response
    # Reaching this line means none of the imports above raised.
    self.assertTrue(True)
def test_urllib_imports_install_aliases(self):
    """
    Smoke test: after ``install_aliases()``, ``urllib`` and its submodules can
    be imported while the import hooks are suspended.
    """
    # standard_library is defined outside this block
    # (presumably future.standard_library — confirm against the file's imports).
    with standard_library.suspend_hooks():
        standard_library.install_aliases()
        import urllib
        import urllib.parse
        import urllib.request
        import urllib.robotparser
        import urllib.error
        import urllib.response
        # Reaching this line means none of the imports above raised.
        self.assertTrue(True)
def test_urllib_imports_cm(self):
    """
    Smoke test: ``urllib`` and its submodules can be imported inside the
    ``standard_library.hooks()`` context manager.
    """
    # standard_library is defined outside this block
    # (presumably future.standard_library — confirm against the file's imports).
    with standard_library.hooks():
        import urllib
        import urllib.parse
        import urllib.request
        import urllib.robotparser
        import urllib.error
        import urllib.response
        # Reaching this line means none of the imports above raised.
        self.assertTrue(True)
def test_urllib_imports_install_hooks(self):
    """
    Smoke test: ``urllib`` and its submodules can be imported after the import
    hooks have been removed and installed again.
    """
    # standard_library is defined outside this block
    # (presumably future.standard_library — confirm against the file's imports).
    standard_library.remove_hooks()
    standard_library.install_hooks()
    import urllib
    import urllib.parse
    import urllib.request
    import urllib.robotparser
    import urllib.error
    import urllib.response
    # Reaching this line means none of the imports above raised.
    self.assertTrue(True)
def execute_request(self, api_url, api_request_object=None, method=None):
    """
    Perform an API request and return the parsed response.

    api_url -- absolute or relative api resource url; a url that does not
               start with "http" is appended to ``self.api_endpoint``
    api_request_object -- any json serializable object to send to the API
    method -- HTTP method of the api request.
              Default: GET if api_request_object is None else POST

    Returns whatever ``WaApiClient._parse_response`` produces (per the
    original documentation, an ApiObject or a list of ApiObjects).

    Raises ApiException when no token has been obtained yet, or when the
    server answers HTTP 400 (the exception then carries the response body).
    Any other ``urllib.error.HTTPError`` is re-raised unchanged.
    """
    if self._token is None:
        raise ApiException("Access token is not obtained. "
                           "Call authenticate_with_apikey or authenticate_with_contact_credentials first.")

    if not api_url.startswith("http"):
        api_url = self.api_endpoint + api_url

    if method is None:
        method = "GET" if api_request_object is None else "POST"

    request = urllib.request.Request(api_url, method=method)
    if api_request_object is not None:
        request.data = json.dumps(api_request_object, cls=_ApiObjectEncoder).encode()

    request.add_header("Content-Type", "application/json")
    request.add_header("Accept", "application/json")
    request.add_header("Authorization", "Bearer " + self._get_access_token())

    try:
        # Close the connection once the response has been parsed.
        # NOTE(review): assumes _parse_response fully consumes the response
        # before returning — confirm against its definition.
        with urllib.request.urlopen(request) as response:
            return WaApiClient._parse_response(response)
    except urllib.error.HTTPError as httpErr:
        if httpErr.code == 400:
            # Surface the server's explanation while keeping the original
            # HTTP error attached as the cause.
            raise ApiException(httpErr.read()) from httpErr
        raise
def _refresh_auth_token(self):
    """
    Exchange the stored refresh token for a new token.

    Posts a ``refresh_token`` grant to ``self.auth_endpoint`` and replaces
    ``self._token`` with the parsed response, stamped with the local
    retrieval time. Errors raised by ``urllib.request.urlopen`` propagate to
    the caller.
    """
    data = {
        "grant_type": "refresh_token",
        "refresh_token": self._token.refresh_token
    }
    encoded_data = urllib.parse.urlencode(data).encode()
    request = urllib.request.Request(self.auth_endpoint, encoded_data, method="POST")
    # The header name was previously misspelled as "ContentType", which sent a
    # non-standard header instead of declaring the body encoding.
    request.add_header("Content-Type", "application/x-www-form-urlencoded")
    # HTTP Basic credentials identify the client application.
    auth_header = base64.standard_b64encode((self.client_id + ':' + self.client_secret).encode()).decode()
    request.add_header("Authorization", 'Basic ' + auth_header)
    # Close the connection once the response has been parsed.
    # NOTE(review): assumes _parse_response fully consumes the response before
    # returning — confirm against its definition.
    with urllib.request.urlopen(request) as response:
        self._token = WaApiClient._parse_response(response)
    self._token.retrieved_at = datetime.datetime.now()
def test_future_moves(self):
    """
    Ensure everything is available from the future.moves interface that we
    claim and expect. (Issue #104).

    This is an import-only smoke test: it passes when none of the imports
    below raises.
    """
    from future.moves.collections import Counter, OrderedDict  # backported to Py2.6
    from future.moves.collections import UserDict, UserList, UserString

    from future.moves import configparser
    from future.moves import copyreg

    from future.moves.itertools import filterfalse, zip_longest

    from future.moves import html
    import future.moves.html.entities
    import future.moves.html.parser

    from future.moves import http
    import future.moves.http.client
    import future.moves.http.cookies
    import future.moves.http.cookiejar
    import future.moves.http.server

    from future.moves import queue

    from future.moves import socketserver

    from future.moves.subprocess import check_output  # even on Py2.6
    from future.moves.subprocess import getoutput, getstatusoutput

    from future.moves.sys import intern

    from future.moves import urllib
    import future.moves.urllib.error
    import future.moves.urllib.parse
    import future.moves.urllib.request
    import future.moves.urllib.response
    import future.moves.urllib.robotparser

    try:
        # Is _winreg available on Py2? If so, ensure future.moves._winreg is available too:
        import _winreg
    except ImportError:
        # No _winreg here, so there is nothing further to check.
        pass
    else:
        from future.moves import winreg

    from future.moves import xmlrpc
    import future.moves.xmlrpc.client
    import future.moves.xmlrpc.server

    from future.moves import _dummy_thread
    from future.moves import _markupbase
    from future.moves import _thread
def _ensure_consent_file(server_dir, consent_path):
    """Create the consent file containing "True" when it cannot be read."""
    try:
        with open(consent_path, "r") as consent_file:
            # Read it through so an undecodable file is treated like a missing one.
            consent_file.read()
        return
    except (OSError, ValueError):
        # Missing, unreadable, or undecodable: fall through and (re)create it.
        pass

    try:
        os.mkdir(server_dir)
    except OSError:
        # Usually "already exists"; a real problem resurfaces in open() below.
        pass
    with open(consent_path, "w", encoding="utf-8") as consent_file:
        consent_file.write("True")


def _append_first_attachment_url(message):
    """Append the first attachment's url to message.content and try to fetch it."""
    try:
        url = str(message.attachments[0]["url"])
        message.content += " " + url
    except (IndexError, KeyError, TypeError):
        # No usable attachment on this message.
        return

    # NOTE(review): the downloaded bytes are discarded; the original author
    # noted that the download did not work. The request is kept only to
    # preserve the existing behaviour.
    try:
        with urllib.request.urlopen("{}".format(url)) as response:
            response.read()
    except Exception:
        # Deliberately best-effort: a failed download must not stop logging.
        pass


def _ensure_emoji_counts_file(emoji_path):
    """Reset the pickled emoji-count file to an empty dict when it cannot be loaded."""
    # NOTE(review): pickle.load executes arbitrary code from the file; this is
    # only safe while the file is written exclusively by this program.
    try:
        with open(emoji_path, "rb") as emoji_file:
            pickle.load(emoji_file)
    except Exception:
        # pickle.load can raise many exception types on a missing or corrupt
        # file; any of them means "start over with an empty dict".
        with open(emoji_path, "wb") as emoji_file:
            pickle.dump({}, emoji_file)


def logging_consent(message):
    """
    Append a chat message to the per-server chat log if logging is enabled.

    Files live in a directory named after ``message.server``:

    * ``<server>_logging_chat.txt`` -- consent flag; created with the content
      "True" when it is missing or unreadable. Logging happens only when the
      file content is exactly "True".
    * ``<server>_chatlog.txt`` -- one line per message in the form
      ``[<first 19 chars of timestamp>]<author name>:<content>``.
    * ``emoji_amount.txt`` -- pickled dict, reset to ``{}`` when it cannot be
      loaded. Emoji counting itself was never finished, so nothing else is
      written to it.

    When the message has an attachment, the url of the first one is appended
    to ``message.content`` (the message object is mutated) before logging.
    The logged line is also printed to stdout.

    message -- object providing ``server``, ``author.name``, ``content``,
               ``attachments`` and ``timestamp`` attributes
    """
    server_dir = "{}".format(message.server)
    consent_path = "{0}/{0}_logging_chat.txt".format(message.server)

    # NOTE(review): the original also looped over the file's lines looking for
    # an empty string, which can never match, so an existing empty consent
    # file is left untouched (and therefore disables logging). That behaviour
    # is preserved here.
    _ensure_consent_file(server_dir, consent_path)

    with open(consent_path, "r") as consent_file:
        logging_chat = consent_file.read()
    if logging_chat != "True":
        return

    chatlog_path = "{0}/{0}_chatlog.txt".format(message.server)
    with open(chatlog_path, "a", encoding="utf-8") as chatlog:
        _append_first_attachment_url(message)
        print("[{}]{}:{}".format(message.server, message.author.name, message.content))
        _ensure_emoji_counts_file("{}/emoji_amount.txt".format(str(message.server)))
        chatlog.write("[" + str(message.timestamp)[0:19] + "]" + message.author.name + ":" + message.content + "\n")
async def play_sound_tag(server_id, voice_channel_id, sound_tag_name, user_id):
    """
    Play the sound from the given sound tag if it is available.

    This is a coroutine: the body awaits the voice client, so it must be
    declared with ``async def`` and awaited by the caller.

    server_id -- server the sound tag belongs to
    voice_channel_id -- voice channel to play in
    sound_tag_name -- name of the sound tag to play
    user_id -- user requesting playback (used for the access check)

    Raises bot_exception when the bot is muted in the channel, when the
    mute lookup raises KeyError (reported as the user not being in a voice
    channel), or when a direct-download sound cannot be fetched or opened.
    Exceptions from the tag lookup and access check propagate as well.
    """
    global timeout_goal

    try:
        if servermanager.is_muted(server_id, voice_channel_id):
            raise bot_exception(EXCEPT_TYPE, "The bot is muted in this voice channel")
    except KeyError:
        raise bot_exception(EXCEPT_TYPE, "You are not in a voice channel (are you perhaps on a different server?)")

    sound_tag_data = get_sound_tag_data(server_id, sound_tag_name)
    check_sound_tag_access(server_id, sound_tag_data, user_id, need_owner=False)
    update_sound_tag(server_id, sound_tag_name, increment_hits=True)  # Increment hits

    # Imported here rather than at module level, as in the original
    # (presumably to avoid a circular import — confirm).
    from jshbot.botmanager import client
    from jshbot.botmanager import voice_player

    channel = botmanager.get_voice_channel(server_id, voice_channel_id)
    # Connect to the channel unless we are already connected to it.
    if client.voice is None or client.voice.channel != channel or not client.voice.is_connected():
        if client.voice:  # Disconnect from old channel
            await client.voice.disconnect()
        client.voice = await client.join_voice_channel(channel)
        voice_player.server_id = server_id

    if voice_player.player is not None and voice_player.player.is_playing():  # Stop if playing
        voice_player.player.stop()

    if sound_tag_data['type'] == 'YouTube':
        # To prevent playlist downloads
        # 'noplaylist': True
        voice_player.player = await client.voice.create_ytdl_player(sound_tag_data['url'])
    else:
        # Direct download: the file is saved to disk first because streaming
        # it straight into the player has not been worked out yet.
        temp_sound_path = '{}/tempsound'.format(configmanager.data_directory)
        try:
            urllib.request.urlretrieve(sound_tag_data['url'], temp_sound_path)
            voice_player.player = client.voice.create_ffmpeg_player(temp_sound_path)
        except Exception as error:
            # Exception (not a bare except) so task cancellation and
            # interpreter exit are not converted into a bot error.
            raise bot_exception(EXCEPT_TYPE, "An error occurred when downloading the sound file") from error

    voice_player.player.start()

    timeout = configmanager.config['voice_timeout']
    if timeout >= 0:
        # A timeout of 0 means "disconnect right after the sound ends";
        # a positive value is a number of minutes.
        timeout_reset_lock.acquire()
        try:
            timeout_goal = time.time() + ((timeout * 60) if timeout > 0 else (sound_tag_data['length'] + 1))
        finally:
            # Always release, even if computing the deadline raises.
            timeout_reset_lock.release()
        await timeout_disconnect()