Python urllib 模块,response() 实例源码

我们从Python开源项目中,提取了以下13个代码示例,用于说明如何使用urllib.response()

项目:Basic-discord-bot    作者:TheWizoid    | 项目源码 | 文件源码
def eight_ball():
    response = [
    "It is certain",
    "It is decidedly so",
    "Without a doubt",
    "Yes, definitely",
    "You may rely on it",
    "As I see it, yes",
    "Most likely",
    "Yes",
    "Signs point to yes",
    "Don't count on it",
    "My reply is no",
    "My sources say no",
    "Very doubtful"]
    index = randint(0,12)
    return response[index]
项目:prms-door    作者:prmakerspace    | 项目源码 | 文件源码
def authenticate_with_apikey(self, api_key, scope=None):
        """perform authentication by api key and store result for execute_request method
        api_key -- secret api key from account settings
        scope -- optional scope of authentication request. If None full list of API scopes will be used.
        """
        scope = "auto" if scope is None else scope
        data = {
            "grant_type": "client_credentials",
            "scope": scope
        }
        encoded_data = urllib.parse.urlencode(data).encode()
        request = urllib.request.Request(self.auth_endpoint, encoded_data, method="POST")
        request.add_header("ContentType", "application/x-www-form-urlencoded")
        request.add_header("Authorization", 'Basic ' + base64.standard_b64encode(('APIKEY:' + api_key).encode()).decode())
        response = urllib.request.urlopen(request)
        self._token = WaApiClient._parse_response(response)
        self._token.retrieved_at = datetime.datetime.now()
项目:prms-door    作者:prmakerspace    | 项目源码 | 文件源码
def authenticate_with_contact_credentials(self, username, password, scope=None):
        """perform authentication by contact credentials and store result for execute_request method
        username -- typically a contact email
        password -- contact password
        scope -- optional scope of authentication request. If None full list of API scopes will be used.
        """
        scope = "auto" if scope is None else scope
        data = {
            "grant_type": "password",
            "username": username,
            "password": password,
            "scope": scope
        }
        encoded_data = urllib.parse.urlencode(data).encode()
        request = urllib.request.Request(self.auth_endpoint, encoded_data, method="POST")
        request.add_header("ContentType", "application/x-www-form-urlencoded")
        auth_header = base64.standard_b64encode((self.client_id + ':' + self.client_secret).encode()).decode()
        request.add_header("Authorization", 'Basic ' + auth_header)
        response = urllib.request.urlopen(request)
        self._token = WaApiClient._parse_response(response)
        self._token.retrieved_at = datetime.datetime.now()
项目:packaging    作者:blockstack    | 项目源码 | 文件源码
def test_urllib(self):
        """
        Tests that urllib isn't changed from under our feet. (This might not
        even be a problem?)
        """
        from future import standard_library
        import urllib
        orig_file = urllib.__file__
        with standard_library.hooks():
            import urllib.response
        self.assertEqual(orig_file, urllib.__file__)
项目:packaging    作者:blockstack    | 项目源码 | 文件源码
def test_urllib_imports_moves(self):
        import future.moves.urllib
        import future.moves.urllib.parse
        import future.moves.urllib.request
        import future.moves.urllib.robotparser
        import future.moves.urllib.error
        import future.moves.urllib.response
        self.assertTrue(True)
项目:packaging    作者:blockstack    | 项目源码 | 文件源码
def test_urllib_imports_install_aliases(self):
        with standard_library.suspend_hooks():
            standard_library.install_aliases()
            import urllib
            import urllib.parse
            import urllib.request
            import urllib.robotparser
            import urllib.error
            import urllib.response
            self.assertTrue(True)
项目:packaging    作者:blockstack    | 项目源码 | 文件源码
def test_urllib_imports_cm(self):
        with standard_library.hooks():
            import urllib
            import urllib.parse
            import urllib.request
            import urllib.robotparser
            import urllib.error
            import urllib.response
        self.assertTrue(True)
项目:packaging    作者:blockstack    | 项目源码 | 文件源码
def test_urllib_imports_install_hooks(self):
        standard_library.remove_hooks()
        standard_library.install_hooks()
        import urllib
        import urllib.parse
        import urllib.request
        import urllib.robotparser
        import urllib.error
        import urllib.response
        self.assertTrue(True)
项目:prms-door    作者:prmakerspace    | 项目源码 | 文件源码
def execute_request(self, api_url, api_request_object=None, method=None):
        """
        perform api request and return result as an instance of ApiObject or list of ApiObjects
        api_url -- absolute or relative api resource url
        api_request_object -- any json serializable object to send to API
        method -- HTTP method of api request. Default: GET if api_request_object is None else POST
        """
        if self._token is None:
            raise ApiException("Access token is not abtained. "
                               "Call authenticate_with_apikey or authenticate_with_contact_credentials first.")

        if not api_url.startswith("http"):
            api_url = self.api_endpoint + api_url

        if method is None:
            if api_request_object is None:
                method = "GET"
            else:
                method = "POST"

        request = urllib.request.Request(api_url, method=method)
        if api_request_object is not None:
            request.data = json.dumps(api_request_object, cls=_ApiObjectEncoder).encode()

        request.add_header("Content-Type", "application/json")
        request.add_header("Accept", "application/json")
        request.add_header("Authorization", "Bearer " + self._get_access_token())

        try:
            response = urllib.request.urlopen(request)
            return WaApiClient._parse_response(response)
        except urllib.error.HTTPError as httpErr:
            if httpErr.code == 400:
                raise ApiException(httpErr.read())
            else:
                raise
项目:prms-door    作者:prmakerspace    | 项目源码 | 文件源码
def _refresh_auth_token(self):
        data = {
            "grant_type": "refresh_token",
            "refresh_token": self._token.refresh_token
        }
        encoded_data = urllib.parse.urlencode(data).encode()
        request = urllib.request.Request(self.auth_endpoint, encoded_data, method="POST")
        request.add_header("ContentType", "application/x-www-form-urlencoded")
        auth_header = base64.standard_b64encode((self.client_id + ':' + self.client_secret).encode()).decode()
        request.add_header("Authorization", 'Basic ' + auth_header)
        response = urllib.request.urlopen(request)
        self._token = WaApiClient._parse_response(response)
        self._token.retrieved_at = datetime.datetime.now()
项目:packaging    作者:blockstack    | 项目源码 | 文件源码
def test_future_moves(self):
        """
        Ensure everything is available from the future.moves interface that we
        claim and expect. (Issue #104).
        """
        from future.moves.collections import Counter, OrderedDict   # backported to Py2.6
        from future.moves.collections import UserDict, UserList, UserString

        from future.moves import configparser
        from future.moves import copyreg

        from future.moves.itertools import filterfalse, zip_longest

        from future.moves import html
        import future.moves.html.entities
        import future.moves.html.parser

        from future.moves import http
        import future.moves.http.client
        import future.moves.http.cookies
        import future.moves.http.cookiejar
        import future.moves.http.server

        from future.moves import queue

        from future.moves import socketserver

        from future.moves.subprocess import check_output              # even on Py2.6
        from future.moves.subprocess import getoutput, getstatusoutput

        from future.moves.sys import intern

        from future.moves import urllib
        import future.moves.urllib.error
        import future.moves.urllib.parse
        import future.moves.urllib.request
        import future.moves.urllib.response
        import future.moves.urllib.robotparser

        try:
            # Is _winreg available on Py2? If so, ensure future.moves._winreg is available too:
            import _winreg
        except ImportError:
            pass
        else:
            from future.moves import winreg

        from future.moves import xmlrpc
        import future.moves.xmlrpc.client
        import future.moves.xmlrpc.server

        from future.moves import _dummy_thread
        from future.moves import _markupbase
        from future.moves import _thread
项目:Basic-discord-bot    作者:TheWizoid    | 项目源码 | 文件源码
def logging_consent(message):
    try:
        logging_consent = open("{0}/{0}_logging_chat.txt".format(message.server),"r",)
        for i in logging_consent:
            if i == "":
                raise FileNotFoundError
    except:
        try:
            os.mkdir("{}".format(message.server))
        except:
            pass
        logging_consent = open("{0}/{0}_logging_chat.txt".format(message.server),"w",encoding="utf-8")
        logging_consent.write("True")

    logging_consent.close()
    logging_consent = open("{0}/{0}_logging_chat.txt".format(message.server),"r",)
    logging_chat = logging_consent.read()
    logging_consent.close()

    if logging_chat == "True":
        chatlog = open("{0}/{0}_chatlog.txt".format(message.server),"a",encoding="utf-8")
        try:
            message.content += (" " + str(message.attachments[0]["url"]))
            url = str(message.attachments[0]["url"])
            file_name = url.split('/')[-1]
            response = urllib.request.urlopen("{}".format(url))
            #Discord doesn't allow me to download files?
            response_temp = response.read()
        except:
            pass
        print("[{}]{}:{}".format(message.server,message.author.name,message.content))
        try:
            start = int("1f1e6", 16)
            end = int("1f93f", 16)
            emoji_dict = pickle.load(open("{}/emoji_amount.txt".format(str(message.server)),"rb"))
        except:
            pickle.dump({},open("{}/emoji_amount.txt".format(str(message.server)),"wb"))
            emoji_dict = pickle.load(open("{}/emoji_amount.txt".format(str(message.server)),"rb"))

            for i in range(start,end):
                emoji_dict[i] = 0
        """
        unicode_message_content = str.encode(message.content,"utf-8")
        print(unicode_message_content)
        if bytes(128515) in unicode_message_content:
            print("PogChamp")
        print(bytes(128515))
        print(bytes("64",encoding="utf-8"))

        for i in range(start,end):#still doesn't work
            temp = int(str.lower(hex(i)),16)
            if bytes(temp) in unicode_message_content:
                emoji_dict[temp] += 1

        print(emoji_dict)
        #print(emoji_dict["0x1f604"])

        pickle.dump(emoji_dict,open("{}/emoji_amount.txt".format(str(message.server)),"wb"))
        """
        chatlog.write("[" +str(message.timestamp)[0:19]+ "]"+ message.author.name + ":" + message.content + "\n")
        chatlog.close()
项目:JshBot-legacy    作者:jkchen2    | 项目源码 | 文件源码
def play_sound_tag(server_id, voice_channel_id, sound_tag_name, user_id):
    """Plays the sound from the given sound tag if it is available."""

    try:
        if servermanager.is_muted(server_id, voice_channel_id):
            raise bot_exception(EXCEPT_TYPE, "The bot is muted in this voice channel")
    except KeyError:
        raise bot_exception(EXCEPT_TYPE, "You are not in a voice channel (are you perhaps on a different server?)")

    sound_tag_data = get_sound_tag_data(server_id, sound_tag_name)
    check_sound_tag_access(server_id, sound_tag_data, user_id, need_owner=False)
    update_sound_tag(server_id, sound_tag_name, increment_hits=True) # Increment hits

    from jshbot.botmanager import client
    from jshbot.botmanager import voice_player
    global timeout_goal

    channel = botmanager.get_voice_channel(server_id, voice_channel_id)
    if client.voice == None or client.voice.channel != channel or not client.voice.is_connected(): # Connect to channel
        if client.voice: # Disconnect from old channel
            await client.voice.disconnect()
        client.voice = await client.join_voice_channel(channel)
        voice_player.server_id = server_id

    if voice_player.player is not None and voice_player.player.is_playing(): # Stop if playing
        voice_player.player.stop()

    if sound_tag_data['type'] == 'YouTube':
        # To prevent playlist downloads
        # 'noplaylist': True
        voice_player.player = await client.voice.create_ytdl_player(sound_tag_data['url'])
    else: # Direct download (or stream? Not sure)
        try:
            # One day, I will figure out how to stream this crap. But today is not that day.
            #response = urllib.request.urlopen(sound_tag_data['url'])
            #voice_player.player = client.voice.create_ffmpeg_player(response, use_avconv=True)
            urllib.request.urlretrieve (sound_tag_data['url'], '{}/tempsound'.format(configmanager.data_directory))
            voice_player.player = client.voice.create_ffmpeg_player('{}/tempsound'.format(configmanager.data_directory))
        except:
            raise bot_exception(EXCEPT_TYPE, "An error occurred when downloading the sound file")
    voice_player.player.start()

    timeout = configmanager.config['voice_timeout']
    if timeout >= 0:
        timeout_reset_lock.acquire()
        timeout_goal = time.time() + ((timeout*60) if timeout > 0 else (sound_tag_data['length']+1))
        timeout_reset_lock.release()
        await timeout_disconnect()