Python aiohttp 模块,post() 实例源码

我们从Python开源项目中,提取了以下11个代码示例,用于说明如何使用aiohttp.post()

项目:rci    作者:seecloud    | 项目源码 | 文件源码
def oauth(self, code, state):
        """Finish the authorization flow and build a Client.

        'code' and 'state' are the GET variables github passes to the
        authorization callback URL. The scopes recorded for ``state`` are
        consumed, and the code is exchanged for an access token at
        REQ_TOKEN_URL.

        :param string code: code given by github
        :param string state: state given by github; must have pending scopes
        :return: Client instance built from the access token and the scopes
        :raises: UnknownState when no scopes were requested for ``state``
        """
        try:
            granted_scopes = self._requested_scopes.pop(state)
        except KeyError:
            raise UnknownState(state)
        token_request = dict(
            client_id=self._client_id,
            client_secret=self._client_secret,
            code=code,
            state=state,
        )
        # Request a JSON reply; the body is decoded with .json() below.
        token_response = yield from aiohttp.post(
            REQ_TOKEN_URL,
            data=token_request,
            headers={"accept": "application/json"},
        )
        token_payload = yield from token_response.json()
        return Client(token_payload["access_token"], granted_scopes)
项目:HAL-9000    作者:AnsonRS    | 项目源码 | 文件源码
async def parsePlaylist(url):
    """Collect the video links listed on a playlist page.

    POSTs to ``url`` with the module-level ``headers``, parses the returned
    HTML and builds a ``https://www.youtube.com/watch?v=<id>`` link from the
    ``data-video-id`` attribute of every ``<tr class="pl-video yt-uix-tile ">``.

    This is a coroutine: the body awaits the HTTP request, so it must be
    declared with ``async def``.

    :param url: address of the playlist page
    :return: non-empty list of video links, or ``False`` when no video row
        was found or when fetching/parsing failed
    """
    try:
        # ``async with`` releases the response even if reading the body fails.
        async with aiohttp.post(url, headers=headers) as response:
            html = await response.text()

        soup = BeautifulSoup(html, 'html.parser')
        rows = soup.find_all("tr", class_="pl-video yt-uix-tile ")
        links = ["https://www.youtube.com/watch?v=" + row['data-video-id']
                 for row in rows]
    except Exception:
        # Best effort: any ordinary failure means "no playlist". Unlike a
        # bare ``except:``, this lets task cancellation and KeyboardInterrupt
        # propagate.
        return False
    return links if links else False
项目:KeekoBot    作者:DavidNeon    | 项目源码 | 文件源码
async def _strawpoll(self, ctx, *, question, options=None):
        """Makes a poll based on questions and choices or options. must be divided by "; "
            Examples:
            [p]strawpoll What is this person?; Who is this person?; Where is this person?; When is this person coming?
            [p]strawpoll What; Who?; Where?; When?; Why?"""
        # NOTE(review): the docstring above is left untouched on purpose; it is
        # presumably displayed as the command's help text. Confirm before editing.
        # `options` is accepted but never read; it stays for signature compatibility.

        # The first "; "-separated item is the poll title, the rest are choices.
        title, *options_list = question.split('; ')
        if len(options_list) < 2:
            await self.bot.say("You need to specify 2 or more options")
            return

        # Entries in self.settings are merged over the title/options pair.
        normal = {"title": title, "options": options_list}
        request = dict(normal, **self.settings)
        async with aiohttp.post('http://strawpoll.me/api/v2/polls',
                                headers={'content-type': 'application/json'},
                                data=json.dumps(request)) as resp:
            raw = await resp.content.read()

        poll = json.loads(raw.decode())
        poll_id = poll["id"]
        await self.bot.say("Here's your strawpoll link: http://strawpoll.me/{}".format(poll_id))
项目:rci    作者:seecloud    | 项目源码 | 文件源码
def post(self, uri, *args, full_response=False, **params):
        """POST ``params`` as a JSON body to ``API_URL + uri``.

        Generator-based coroutine (uses ``yield from``).

        :param uri: path appended to API_URL; when positional ``args`` are
            given it is first passed through ``format_uri(uri, list(args))``
        :param full_response: when true, return ``Response(resp)`` instead of
            the decoded JSON body
        :param params: keyword arguments serialized with ``json.dumps`` as the
            request body
        :return: ``Response(resp)`` or the decoded JSON body
        :raises GithubError: when the response status is outside 200-299
        """
        if args:
            uri = format_uri(uri, list(args))
        resp = yield from aiohttp.post(API_URL + uri,
                                       data=json.dumps(params),
                                       headers=self.post_headers)
        # A chained `200 > status > 300` can never be true (nothing is both
        # below 200 and above 300), so test the success range explicitly.
        if not 200 <= resp.status < 300:
            raise GithubError(Response(resp))
        if full_response:
            return Response(resp)
        return (yield from resp.json())
项目:Inkxbot    作者:InkxtheSquid    | 项目源码 | 文件源码
async def update(self):
        """Report the bot's server count to the bot-list services.

        Posts the count to the DBots stats endpoint and to
        bots.discordlist.net through ``self.session`` and logs the HTTP
        status of each reply. This is a coroutine: the body uses
        ``async with``, so it must be declared with ``async def``.
        """
        # TODO: Carbon reporting is disabled until an API key is available.
        # carbon_payload = {
        #     'key': self.bot.carbon_key,
        #     'servercount': len(self.bot.servers)
        # }
        # async with self.session.post(CARBONAPIBOTDATA, data=carbon_payload) as resp:
        #     log.info('Carbon statistics returned {0.status} for {1}'.format(resp, carbon_payload))

        server_count = len(self.bot.servers)

        payload = json.dumps({
            'server_count': server_count
        })

        headers = {
            'authorization': self.bot.dbots_key,
            'content-type': 'application/json'
        }

        url = '{0}/bots/{1.user.id}/stats'.format(DBOTSAPI, self.bot)
        async with self.session.post(url, data=payload, headers=headers) as resp:
            log.info('DBots statistics returned {0.status} for {1}'.format(resp, payload))

        dlpayload = {
            "token": self.bot.discordlist_token,
            "servers": server_count
        }

        # Logged in place of dlpayload; presumably so the token stays out of
        # the log output (confirm).
        serverdata = {
            "servers": server_count
        }

        url = "https://bots.discordlist.net/api.php"
        # The payload is sent exactly once, through the shared session; a
        # second identical request would only duplicate the report.
        async with self.session.post(url, data=dlpayload) as resp:
            log.info('DiscordList statistics returned {0.status} for {1}'.format(resp, serverdata))
项目:Inkxbot    作者:InkxtheSquid    | 项目源码 | 文件源码
async def get_new_splatnet_cookie(username, password):
    """Log in through NINTENDO_LOGIN_PAGE and return the session cookie.

    POSTs the credentials together with the SplatNet client id and callback
    URL, then reads the ``_wag_session`` cookie from the last entry of the
    response's redirect history. This is a coroutine: the body uses
    ``async with``, so it must be declared with ``async def``.

    :param username: account name sent in the login form
    :param password: account password sent in the login form
    :return: the ``_wag_session`` cookie
    :raises Exception: when the cookie is missing, including the case where
        the request was not redirected at all
    """
    parameters = {'client_id': SPLATNET_CLIENT_ID,
                  'response_type': 'code',
                  'redirect_uri': SPLATNET_CALLBACK_URL,
                  'username': username,
                  'password': password}

    async with aiohttp.post(NINTENDO_LOGIN_PAGE, data=parameters) as response:
        # An empty history means no redirect happened, so there is no hop to
        # read the cookie from; treat it like a missing cookie.
        history = response.history
        cookie = history[-1].cookies.get('_wag_session') if history else None
        if cookie is None:
            # Print the response for debugging. The name previously printed
            # here was undefined and raised NameError instead.
            print(response)
            raise Exception("Couldn't retrieve cookie")
        return cookie
项目:tuxbot-bot    作者:outout14    | 项目源码 | 文件源码
async def get_new_splatnet_cookie(username, password):
    """Log in through NINTENDO_LOGIN_PAGE and return the session cookie.

    POSTs the credentials together with the SplatNet client id and callback
    URL, then reads the ``_wag_session`` cookie from the last entry of the
    response's redirect history. This is a coroutine: the body uses
    ``async with``, so it must be declared with ``async def``.

    :param username: account name sent in the login form
    :param password: account password sent in the login form
    :return: the ``_wag_session`` cookie
    :raises Exception: when the cookie is missing, including the case where
        the request was not redirected at all
    """
    parameters = {'client_id': SPLATNET_CLIENT_ID,
                  'response_type': 'code',
                  'redirect_uri': SPLATNET_CALLBACK_URL,
                  'username': username,
                  'password': password}

    async with aiohttp.post(NINTENDO_LOGIN_PAGE, data=parameters) as response:
        # An empty history means no redirect happened, so there is no hop to
        # read the cookie from; treat it like a missing cookie.
        history = response.history
        cookie = history[-1].cookies.get('_wag_session') if history else None
        if cookie is None:
            # Print the response for debugging. The name previously printed
            # here was undefined and raised NameError instead.
            print(response)
            raise Exception("Couldn't retrieve cookie")
        return cookie
项目:foxpy    作者:plusreed    | 项目源码 | 文件源码
async def get_new_splatnet_cookie(username, password):
    """Log in through NINTENDO_LOGIN_PAGE and return the session cookie.

    POSTs the credentials together with the SplatNet client id and callback
    URL, then reads the ``_wag_session`` cookie from the last entry of the
    response's redirect history. This is a coroutine: the body uses
    ``async with``, so it must be declared with ``async def``.

    :param username: account name sent in the login form
    :param password: account password sent in the login form
    :return: the ``_wag_session`` cookie
    :raises Exception: when the cookie is missing, including the case where
        the request was not redirected at all
    """
    parameters = {'client_id': SPLATNET_CLIENT_ID,
                  'response_type': 'code',
                  'redirect_uri': SPLATNET_CALLBACK_URL,
                  'username': username,
                  'password': password}

    async with aiohttp.post(NINTENDO_LOGIN_PAGE, data=parameters) as response:
        # An empty history means no redirect happened, so there is no hop to
        # read the cookie from; treat it like a missing cookie.
        history = response.history
        cookie = history[-1].cookies.get('_wag_session') if history else None
        if cookie is None:
            # Print the response for debugging. The name previously printed
            # here was undefined and raised NameError instead.
            print(response)
            raise Exception("Couldn't retrieve cookie")
        return cookie
项目:jamdb    作者:CenterForOpenScience    | 项目源码 | 文件源码
async def patch(self, handler):
        """Apply suppression-group changes for the email of one document.

        ``handler.payload.attributes`` must contain an ``id``; it is popped
        and used to read the document from ``self.collection``. Every
        remaining attribute maps a SendGrid group to a flag: a truthy flag
        POSTs the document's email to that group's suppressions, a falsy flag
        DELETEs it. The flags are written back as booleans in the response.

        When the document does not exist or the current user lacks
        ``Permissions.UPDATE``, nothing is sent to SendGrid and a response
        with empty attributes is written.

        This is a coroutine: the body uses ``async with``, so it must be
        declared with ``async def``.

        :param handler: request handler exposing ``payload``, ``current_user``
            and ``write``
        :return: whatever ``handler.write`` returns
        :raises exceptions.BadRequest: when the payload or its ``id`` is missing
        """
        if not handler.payload:
            raise exceptions.BadRequest()

        if not handler.payload.attributes.get('id'):
            raise exceptions.BadRequest(detail='Id must be provided')

        doc_id = handler.payload.attributes.pop('id')

        try:
            doc = self.collection.read(doc_id)
        except exceptions.NotFound:
            # No document: skip the permission lookup, which needs `doc`.
            allowed = False
        else:
            permissions = handler.current_user.permissions | Permissions.get_permissions(handler.current_user, doc)
            allowed = bool(permissions & Permissions.UPDATE)

        if not allowed:
            # Missing and forbidden documents get the same empty reply.
            return handler.write({
                'data': {
                    'id': doc_id,
                    'type': 'suppressions',
                    'attributes': {}
                }
            })

        email = self.extract_email(doc=doc)
        headers = {'Authorization': 'Bearer {}'.format(self.sendgrid_key)}

        # Iterate over a snapshot because the values are rewritten in the loop.
        for group, subscribe in list(handler.payload.attributes.items()):
            group_url = 'https://api.sendgrid.com/v3/asm/groups/{}/suppressions'.format(group)
            if subscribe:
                body = json.dumps({'recipient_emails': [email]})
                async with aiohttp.post(group_url, headers=headers, data=body) as response:
                    assert response.status == 201  # TODO Handle errors
            else:
                email_url = '{}/{}'.format(group_url, quote(email))
                async with aiohttp.delete(email_url, headers=headers) as response:
                    assert response.status == 204  # TODO Handle errors
            handler.payload.attributes[group] = bool(subscribe)

        return handler.write({
            'data': {
                'id': doc_id,
                # Spelled the same as the type used in the empty reply above.
                'type': 'suppressions',
                'attributes': handler.payload.attributes
            }
        })
项目:LunaBot    作者:miraai    | 项目源码 | 文件源码
async def coliru(self, ctx, *, code : CodeBlock):
        """Compiles code via Coliru.

        You have to pass in a codeblock with the language syntax
        either set to one of these:

        - cpp
        - python
        - py
        - haskell

        Anything else isn't supported. The C++ compiler uses g++ -std=c++14.

        Please don't spam this for Stacked's sake.
        """
        # NOTE(review): the docstring above is left untouched on purpose; it is
        # presumably displayed as the command's help text. Confirm before editing.
        payload = {
            'cmd': code.command,
            'src': code.source
        }

        data = json.dumps(payload)

        # Only the request and the body read happen inside the context, so
        # the Coliru connection is released before anything else is sent.
        async with aiohttp.post('http://coliru.stacked-crooked.com/compile', data=data) as resp:
            status = resp.status
            output = await resp.text() if status == 200 else None

        if status != 200:
            await self.bot.say('Coliru did not respond in time.')
            return

        # 1992 presumably leaves room for the 8 code-fence characters within
        # a 2000-character message limit (confirm).
        if len(output) < 1992:
            fmt = '```\n{}\n```'.format(output)
            await self.bot.say(fmt)
            return

        # output is too big so post it in gist
        gist = {
            'description': 'The response for {0.author}\'s compilation.'.format(ctx.message),
            'public': True,
            'files': {
                'output': {
                    'content': output
                },
                'original': {
                    'content': code.source
                }
            }
        }

        async with aiohttp.post('https://api.github.com/gists', data=json.dumps(gist)) as gh:
            if gh.status != 201:
                await self.bot.say('Could not create gist.')
            else:
                js = await gh.json()
                await self.bot.say('Output too big. The content is in: {0[html_url]}'.format(js))
项目:ORELS-Cogs    作者:orels1    | 项目源码 | 文件源码
async def _post(self, ctx, *, url):
        """Performs POST or PUT request to selected URL"""
        # NOTE(review): the docstring above is left untouched on purpose; it is
        # presumably displayed as the command's help text. Confirm before editing.

        await self.bot.say('Set headers by typing them in a `name=value` format, one on each line, or `pass`')

        # NOTE(review): wait_for_message presumably yields None when the
        # timeout expires; confirm _parse_headers/_parse_body cope with that.
        answer = await self.bot.wait_for_message(timeout=50, author=ctx.message.author)

        parsed = self._parse_headers(answer)
        headers = parsed['headers']
        public_headers = parsed['public_headers']

        await self.bot.say('Headers are set to:\n```json\n{}```\nSet body typing in in a `name=value` one on each line or `pass`\nNested objects are not supported'
                           .format(json.dumps(public_headers, indent=4, sort_keys=True)))

        answer = await self.bot.wait_for_message(timeout=50, author=ctx.message.author)

        body = self._parse_body(answer)

        await self.bot.say('Body is set to:\n```json\n{}```'.format(json.dumps(body, indent=4, sort_keys=True)))

        url = url.strip()
        method = ctx.invoked_with

        # Pick the request function. Any other invocation name would leave
        # the result variables unbound, so bail out explicitly.
        if method == 'post':
            send = aiohttp.post
        elif method == 'put':
            if 'Content-Type' not in headers:
                headers['Content-Type'] = 'application/json'
            send = aiohttp.put
        else:
            await self.bot.say('Unsupported method: {}'.format(method))
            return

        t1 = time.perf_counter()
        async with send(url, headers=headers, data=json.dumps(body)) as r:
            # Stop the clock once the response object is available, before
            # the body is read.
            t2 = time.perf_counter()
            data = await r.text()
            status = r.status

        try:
            parsed = json.loads(data)
        except ValueError:
            # Not JSON: show an empty object rather than failing.
            parsed = {}

        if status == 200:
            color = 0x2ecc71
        elif status >= 400:
            color = 0xe74c3c
        else:
            # Same falsy value the previous and/or chain produced for every
            # other status.
            color = False

        # Truncation is decided by the raw body length, not the pretty one.
        pretty = json.dumps(parsed, indent=4, sort_keys=True)
        if len(data) >= 700:
            pretty = pretty[:700] + '\n\n...\n\n'

        embed = discord.Embed(title='Results for **{}** {}'.format(method.upper(), url),
                              color=color,
                              description='```json\n{}```'.format(pretty))
        embed.add_field(name='Status',
                        value=status)
        # Round to whole milliseconds. Slicing the float's string form to
        # three characters misreported anything at or above 1000 ms.
        embed.add_field(name='Time',
                        value='{:.0f}ms'.format((t2 - t1) * 1000))
        await self.bot.say(embed=embed)