我们从Python开源项目中,提取了以下17个代码示例,用于说明如何使用aiohttp.FormData()。
def sendFile(session, url, apiKey, action, fileName): headers = { 'X-Api-Key': apiKey } filenameWithId = addUniqueIdToFile(fileName) data = {} if(action == actions.COMMAND_LOAD): data = FormData() data.add_field('file', open('data/file.gco','rb'), filename=filenameWithId, content_type='application/octet-stream') async with session.post(url,headers=headers, data=data) as response: await response.text() data = {'command': 'select'} async with session.post(url+'/'+filenameWithId, headers=headers, json=data) as responseCommand: return await responseCommand.read(), responseCommand.status
def _generate_form_data(**kwargs: Union[Any, BotoFairu]) -> aiohttp.FormData: form_fata = aiohttp.FormData() for name, value in kwargs.items(): if isinstance(value, BotoFairu): form_fata.add_field( name, value._content, content_type=value._content_type, filename=value._filename, content_transfer_encoding=value._content_transfer_encoding) elif isinstance(value, (list, dict)): form_fata.add_field(name, json.dumps(value)) elif isinstance(value, (int, float, str, bool)): form_fata.add_field(name, str(value)) else: raise TypeError("Unknown Type: {}".format(type(value))) return form_fata
def test_form_data_with_multipart(self, app, router): class MainRouting(Routing): @post('/') async def multipart(self): name = self.request.form.get('file') return name.name router.add(MainRouting) app.use(router) data = FormData() data.add_field('name', 'foo') data.add_field('file', 'file content', filename='file.file', content_type='image/img') res = TestClient(app).post('/', data=data) assert res.text == 'file.file'
def call( self, method: str, data: Dict[str, str]=None ) -> Dict[str, Any]: """Call API methods.""" async with aiohttp.ClientSession() as session: form = aiohttp.FormData(data or {}) form.add_field('token', self.config.TOKEN) async with session.post( 'https://slack.com/api/{}'.format(method), data=form ) as response: if response.status == 200: return await response.json(loads=ujson.loads) else: raise APICallError('fail to call {} with {}'.format( method, data ))
def test_POST_DATA_with_content_transfer_encoding(loop, test_client): @asyncio.coroutine def handler(request): data = yield from request.post() assert b'123' == data['name'] return web.Response() app = web.Application() app.router.add_post('/', handler) client = yield from test_client(app) form = FormData() form.add_field('name', b'123', content_transfer_encoding='base64') resp = yield from client.post('/', data=form) assert 200 == resp.status
def test_100_continue(loop, test_client): @asyncio.coroutine def handler(request): data = yield from request.post() assert b'123' == data['name'] return web.Response() form = FormData() form.add_field('name', b'123', content_transfer_encoding='base64') app = web.Application() app.router.add_post('/', handler) client = yield from test_client(app) resp = yield from client.post('/', data=form, expect100=True) assert 200 == resp.status
def generate(**kwargs: Any) -> Tuple[ Dict[str, str], Union[AnyStr, aiohttp.FormData]]: headers = {} # type: Dict[str, str] try: data = json.dumps(kwargs) headers["Content-Type"] = "application/json" except: # Fallback to Form Data. data = _generate_form_data(**kwargs) return headers, data
def upload_audio_message(api, multipart_data, peer_id) -> Optional[Attachment]: """Upload audio file `multipart_data` and return Attachment for sending to user with id `peer_id`(possibly)""" sender = api.get_default_sender("docs.getMessagesUploadServer") client = api.get_current_sender("docs.getMessagesUploadServer", sender=sender) data = aiohttp.FormData() data.add_field('file', multipart_data, filename="message.mp3", content_type='multipart/form-data') values = {'type': "audio_message", 'peer_id': peer_id} if client.group_id: values['group_id'] = client.group_id response = await api(sender=sender).docs.getMessagesUploadServer(**values) if not response or not response.get('upload_url'): return None upload_url = response['upload_url'] async with aiohttp.ClientSession() as sess: async with sess.post(upload_url, data=data) as resp: result = json.loads(await resp.text()) if not result: return None data = dict(file=result['file']) result = (await api(sender=sender).docs.save(**data))[0] return Attachment.from_upload_result(result, "doc")
def upload_doc(api, multipart_data, filename="image.png") -> Optional[Attachment]: """Upload file `multipart_data` and return Attachment for sending to user""" sender = api.get_default_sender("docs.getWallUploadServer") client = api.get_current_sender("docs.getWallUploadServer", sender=sender) data = aiohttp.FormData() data.add_field('file', multipart_data, filename=filename, content_type='multipart/form-data') values = {} if client.group_id: values['group_id'] = client.group_id response = await api(sender=sender).docs.getWallUploadServer(**values) if not response or not response.get('upload_url'): return None upload_url = response['upload_url'] async with aiohttp.ClientSession() as sess: async with sess.post(upload_url, data=data) as resp: result = json.loads(await resp.text()) if not result: return None data = dict(file=result['file']) result = (await api(sender=sender).docs.save(**data))[0] return Attachment.from_upload_result(result, "doc")
def upload_photo(api, multipart_data, peer_id=None): """ Upload photo file `multipart_data` and return Attachment for sending to user with id `peer_id`(optional but recommended) """ sender = api.get_default_sender('photos.getMessagesUploadServer') data = aiohttp.FormData() data.add_field('photo', multipart_data, filename='picture.png', content_type='multipart/form-data') kwargs = {} if peer_id: kwargs["peer_id"] = peer_id response = await api(sender=sender).photos.getMessagesUploadServer(**kwargs) if not response or not response.get('upload_url'): return None upload_url = response['upload_url'] async with aiohttp.ClientSession() as sess: async with sess.post(upload_url, data=data) as resp: result = json.loads(await resp.text()) if not result: return None data = {'photo': result['photo'], 'hash': result['hash'], 'server': result['server']} result = await api(sender=sender).photos.saveMessagesPhoto(**data) if not result: return None return Attachment.from_upload_result(result[0])
def call(method, file=None, **kwargs): r""" Perform an API call to Slack. :param file: File pointer :type file: file :param \**kwargs: see below :Keyword Arguments: All the arguments required by the method from the `Slack Web API`_. :returns: JSON response. :rtype: dict """ # JSON encode any sub-structure... for k, w in kwargs.items(): # keep str as is. if not isinstance(w, (bytes, str)): kwargs[k] = json.dumps(w) form = FormData(kwargs) # Handle file upload if file: form.add_field('file', file) logging.debug('POST (m=%s) /api/%s %s', form.is_multipart, method, kwargs) with ClientSession() as session: with Timeout(10): response = yield from session.post('https://{0}/api/{1}' .format(SLACK_DOMAIN, method), data=form) assert 200 == response.status, response try: body = yield from response.json() logging.debug('Response /api/%s %d %s', method, response.status, body) return body finally: yield from response.release()
def upload(): data = FormData() data.add_field('package', open('tests/QXmokuai_3.apk', 'rb'), filename='QXmokuai_3.apk') data.add_field('msg', 'test upload') _, response = app.test_client.post('/upload/app', data=data) response_normal_check(response) return response
def api_call(self, method, data=None): """ Slack API call. https://medium.com/@greut/a-slack-bot-with-pythons-3-5-asyncio-ad766d8b5d8f """ with aiohttp.ClientSession() as session: form = aiohttp.FormData(data or {}) form.add_field('token', self.api_token) async with session.post('https://slack.com/api/{0}'.format(method), data=form) as response: if response.status != 200: raise ValueError('{0} with {1} failed.'.format(method, data)) return await response.json()
def test_files_upload_with_same_key(loop, test_client): @asyncio.coroutine def handler(request): data = yield from request.post() files = data.getall('file') file_names = set() for _file in files: assert not _file.file.closed if _file.filename == 'test1.jpeg': assert _file.file.read() == b'binary data 1' if _file.filename == 'test2.jpeg': assert _file.file.read() == b'binary data 2' file_names.add(_file.filename) assert len(files) == 2 assert file_names == {'test1.jpeg', 'test2.jpeg'} resp = web.Response(body=b'OK') return resp app = web.Application() app.router.add_post('/', handler) client = yield from test_client(app) data = FormData() data.add_field('file', b'binary data 1', content_type='image/jpeg', filename='test1.jpeg') data.add_field('file', b'binary data 2', content_type='image/jpeg', filename='test2.jpeg') resp = yield from client.post('/', data=data) assert 200 == resp.status
def test_100_continue_custom(loop, test_client): expect_received = False @asyncio.coroutine def handler(request): data = yield from request.post() assert b'123' == data['name'] return web.Response() @asyncio.coroutine def expect_handler(request): nonlocal expect_received expect_received = True if request.version == HttpVersion11: request.transport.write(b"HTTP/1.1 100 Continue\r\n\r\n") form = FormData() form.add_field('name', b'123', content_transfer_encoding='base64') app = web.Application() app.router.add_post('/', handler, expect_handler=expect_handler) client = yield from test_client(app) resp = yield from client.post('/', data=form, expect100=True) assert 200 == resp.status assert expect_received
def test_100_continue_custom_response(loop, test_client): @asyncio.coroutine def handler(request): data = yield from request.post() assert b'123', data['name'] return web.Response() @asyncio.coroutine def expect_handler(request): if request.version == HttpVersion11: if auth_err: return web.HTTPForbidden() request.transport.write(b"HTTP/1.1 100 Continue\r\n\r\n") form = FormData() form.add_field('name', b'123', content_transfer_encoding='base64') app = web.Application() app.router.add_post('/', handler, expect_handler=expect_handler) client = yield from test_client(app) auth_err = False resp = yield from client.post('/', data=form, expect100=True) assert 200 == resp.status auth_err = True resp = yield from client.post('/', data=form, expect100=True) assert 403 == resp.status
def request_webhook(self, partialurl, content=None, username=None, avatar_url=None, tts=False, file=None, embeds=None, filename=None): """Requests an webhook with the data provided to this function.""" if self.create_form_data: self.create_form_data = False self.partialurl = partialurl self.content = content self.username = username self.avatar_url = avatar_url self.tts = tts self.file = file self.embeds = embeds if filename is None: filename = 'image.jpg' if self.partialurl is not None: if self.content is not None: self.payload['content'] = self.content if self.username is not None: self.payload['username'] = self.username if self.avatar_url is not None: self.payload['avatar_url'] = self.avatar_url if self.tts: self.payload['tts'] = self.tts if self.file is not None: self.create_form_data = True if self.embeds is not None: self.payload['embeds'] = self.embeds if self.create_form_data: self.form = aiohttp.FormData() self.form.add_field('payload_json', discord.utils.to_json(self.payload)) self.form.add_field('file', self.file, filename=filename, content_type='multipart/form-data') yield from self.http.request( WebHookRoute( 'POST', self.partialurl), data=self.form) else: yield from self.http.request( WebHookRoute( 'POST', self.partialurl), json=self.payload)