我们从Python开源项目中，提取了以下50个代码示例，用于说明如何使用requests_toolbelt.MultipartEncoder()。
def send_image_local(recipient_id, image_path):
    """Upload a local image to a recipient through the Graph API messages endpoint.

    The request body is ``multipart/form-data`` with three parts:
    ``recipient``, ``message`` and ``filedata``.

    Args:
        recipient_id: Identifier placed in the ``recipient`` part.
        image_path: Path of the image to send; the file part is labelled
            with its base name and the MIME type ``image/png``.

    Returns:
        The decoded JSON body of the HTTP response.

    Raises:
        KeyError: If ``PAGE_ACCESS_TOKEN`` is missing from the environment.
    """
    # Function-scope import so the block does not rely on a file-level one.
    import json

    params = {'access_token': os.environ["PAGE_ACCESS_TOKEN"]}

    # The encoder streams from the file while the request is sent, so the
    # request has to be issued before the ``with`` block closes the file.
    with open(image_path, 'rb') as image_file:
        data = MultipartEncoder({
            # json.dumps rather than str(): str() of a dict yields a Python
            # repr with single quotes, which is not valid JSON.
            'recipient': json.dumps({'id': recipient_id}),
            'message': json.dumps({
                'attachment': {
                    'type': 'image',
                    'payload': {}
                }
            }),
            'filedata': (os.path.basename(image_path), image_file, 'image/png')
        })
        headers = {'Content-Type': data.content_type}
        return requests.post("https://graph.facebook.com/v2.6/me/messages",
                             params=params, headers=headers, data=data).json()
def uploadPhoto(self, photo, caption=None, upload_id=None):
    """Upload a photo file, then configure and expose it on success.

    Args:
        photo: Path of the photo file to upload.
        caption: Passed through to ``self.configure``.
        upload_id: Upload identifier; when ``None`` one is derived from the
            current time in milliseconds.

    Returns:
        Always ``False``, matching the original implementation.
        NOTE(review): the success path also falls through to
        ``return False`` -- confirm whether callers expect ``True`` there.
    """
    if upload_id is None:
        upload_id = str(int(time.time() * 1000))

    # ``to_string()`` reads the whole multipart body into memory, so the file
    # can be closed as soon as the body has been built.
    with open(photo, 'rb') as photo_file:
        data = {
            'upload_id': upload_id,
            '_uuid': self.uuid,
            '_csrftoken': self.token,
            'image_compression': '{"lib_name":"jt","lib_version":"1.3.0","quality":"87"}',
            'photo': ('pending_media_%s.jpg' % upload_id, photo_file,
                      'application/octet-stream',
                      {'Content-Transfer-Encoding': 'binary'})
        }
        m = MultipartEncoder(data, boundary=self.uuid)
        content_type = m.content_type
        body = m.to_string()

    self.s.headers.update({'X-IG-Capabilities': '3Q4=',
                           'X-IG-Connection-Type': 'WIFI',
                           'Cookie2': '$Version=1',
                           'Accept-Language': 'en-US',
                           'Accept-Encoding': 'gzip, deflate',
                           'Content-type': content_type,
                           'Connection': 'close',
                           'User-Agent': self.USER_AGENT})
    response = self.s.post(self.API_URL + "upload/photo/", data=body)
    if response.status_code == 200:
        if self.configure(upload_id, photo, caption):
            self.expose()
    return False
def uploadPhoto(self, photo, caption=None, upload_id=None):
    """Upload a photo file, then configure and expose it on success.

    Args:
        photo: Path of the photo file to upload.
        caption: Passed through to ``self.configure``.
        upload_id: Upload identifier; when ``None`` one is derived from the
            current time in milliseconds.

    Returns:
        Always ``False``, matching the original implementation.
        NOTE(review): the success path also falls through to
        ``return False`` -- confirm whether callers expect ``True`` there.
    """
    if upload_id is None:
        upload_id = str(int(time.time() * 1000))

    # ``to_string()`` reads the whole multipart body into memory, so the file
    # can be closed as soon as the body has been built.
    with open(photo, 'rb') as photo_file:
        data = {
            'upload_id': upload_id,
            '_uuid': self.uuid,
            '_csrftoken': self.token,
            'image_compression': '{"lib_name":"jt","lib_version":"1.3.0","quality":"87"}',
            'photo': ('pending_media_%s.jpg' % upload_id, photo_file,
                      'application/octet-stream',
                      {'Content-Transfer-Encoding': 'binary'})
        }
        m = MultipartEncoder(data, boundary=self.uuid)
        content_type = m.content_type
        body = m.to_string()

    self.session.headers.update({'X-IG-Capabilities': '3Q4=',
                                 'X-IG-Connection-Type': 'WIFI',
                                 'Cookie2': '$Version=1',
                                 'Accept-Language': 'en-US',
                                 'Accept-Encoding': 'gzip, deflate',
                                 'Content-type': content_type,
                                 'Connection': 'close',
                                 'User-Agent': self.USER_AGENT})
    response = self.session.post(self.API_URL + "upload/photo/", data=body)
    if response.status_code == 200:
        if self.configure(upload_id, photo, caption):
            self.expose()
    return False
def sendSparkPOST(url, data):
    """POST ``data`` to ``url`` as multipart/form-data and return the response.

    The ``Authorization`` header is built from the name ``bearer``, which is
    not defined in this function and must be available in the enclosing scope.
    """
    encoder = MultipartEncoder(fields=data)
    request_headers = {
        "Content-Type": encoder.content_type,
        "Authorization": "Bearer " + bearer,
    }
    return requests.post(url, data=encoder, headers=request_headers)
def test_basic_multiple(self):
    """Three text fields are each delivered to their own registered target."""
    encoder = MultipartEncoder(fields={
        'first': 'foo',
        'second': 'bar',
        'third': 'baz'
    })
    parser = StreamingFormDataParser(
        headers={'Content-Type': encoder.content_type})

    # One target per field name, registered in the same order as before.
    targets = {}
    for field_name in ('first', 'second', 'third'):
        targets[field_name] = ValueTarget()
        parser.register(field_name, targets[field_name])

    parser.data_received(encoder.to_string())

    self.assertEqual(targets['first'].value, b'foo')
    self.assertEqual(targets['second'].value, b'bar')
    self.assertEqual(targets['third'].value, b'baz')
def test_chunked_single(self):
    """A single field survives being fed to the parser in two chunks."""
    expected_value = 'hello world'
    encoder = MultipartEncoder(fields={'value': expected_value})
    payload = encoder.to_string()

    target = ValueTarget()
    parser = StreamingFormDataParser(
        headers={'Content-Type': encoder.content_type})
    parser.register('value', target)

    # Split the body in the middle of the field value, right before b'world'.
    split_at = payload.index(b'world')
    head, tail = payload[:split_at], payload[split_at:]
    parser.data_received(head)
    parser.data_received(tail)

    self.assertEqual(target.value, expected_value.encode('utf-8'))
def main():
    """Encode a sample multipart form around the given file and print it.

    The form has two text fields (``name`` and ``lines``) plus the file named
    on the command line. The body is printed decoded as UTF-8 when
    ``args.decode`` is truthy, otherwise as raw bytes.
    """
    args = parse_args()

    with open(args.filename, 'rb') as file_:
        guessed_type, _ = mimetypes.guess_type(args.filename)
        form = {
            'name': 'hello world',
            'lines': 'first line\r\n\r\nsecond line',
            'file': (args.filename, file_, guessed_type)
        }
        encoded = MultipartEncoder(fields=form).to_string()

        output = encoded.decode('utf-8') if args.decode else encoded
        print(output)
def http_send_attachmail(to, cc, sub, content, filelist=None,
                         mail_format="html", mail_from=mail_from):
    """Send a mail with file attachments through the HTTP mail API.

    Each attachment is added as a multipart field named ``attach1``,
    ``attach2``, ... in list order; ``attachNum`` carries their count.

    Args:
        to: Value of the ``tos`` field.
        cc: Value of the ``cc`` field.
        sub: Value of the ``subject`` field.
        content: Value of the ``content`` field.
        filelist: Paths of the files to attach; ``None`` or empty means no
            attachments.
        mail_format: Value of the ``format`` field.
        mail_from: Value of the ``from`` field.

    Returns:
        The decoded JSON body of the API response.
    """
    # ``None`` instead of a shared mutable ``[]`` default.
    attachments = list(filelist) if filelist else []

    fields = {"tos": to,
              "cc": cc,
              "subject": sub,
              "content": content,
              "from": mail_from,
              "format": mail_format,
              "attachNum": str(len(attachments))}

    opened = []
    try:
        for index, attach in enumerate(attachments, 1):
            handle = open(attach, "rb")
            opened.append(handle)
            fields['attach' + str(index)] = (attach, handle)

        m = MultipartEncoder(fields)
        headers = {"content-type": m.content_type}
        r = requests.post(mail_api, data=m, headers=headers)
    finally:
        # Close every attachment, including when an open() or the POST fails.
        for handle in opened:
            handle.close()

    ret = r.json()
    status = str(ret['status']) + "-" + ret['msg']
    sendlog(status, to + "|cc:" + cc, sub)
    return ret
def upload_data(gpu_ip, job_hash, data_path):
    """Upload the file at ``data_path`` to a GPU host, showing a progress bar.

    The file is posted as multipart/form-data together with ``job_hash``.
    After a completed request the local file is removed and the response is
    handed to ``status_check``.

    Args:
        gpu_ip: Host part of the target URL.
        job_hash: Value sent in the ``hash`` form field.
        data_path: Path of the local file to upload (deleted afterwards).
    """
    url = 'http://%s:%s/runJobDecorator' % (gpu_ip, settings.GPU_PORT)
    file_size = path.getsize(data_path)
    pbar = tqdm(total=file_size, unit='B', unit_scale=True)

    def callback(monitor):
        # The bar is advanced by the bytes read since the previous call; the
        # running total is kept as an attribute on the callback itself.
        progress = monitor.bytes_read - callback.last_bytes_read
        pbar.update(progress)
        callback.last_bytes_read = monitor.bytes_read

    callback.last_bytes_read = 0

    try:
        with open(data_path, 'rb') as f:
            data = {
                'file': ('uploads.pkl', f, 'application/octet-stream'),
                'hash': job_hash
            }
            encoder = MultipartEncoder(fields=data)
            monitor = MultipartEncoderMonitor(encoder, callback)
            r = requests.post(url, data=monitor,
                              headers={'Content-Type': monitor.content_type})
    finally:
        # Close the bar even when the upload is interrupted or raises.
        pbar.close()

    remove(data_path)
    status_check(r)
def do_artifacts_artifact_add(opts):
    """Upload an artifact file together with its name and description.

    Reads ``opts.service``, ``opts.name``, ``opts.description`` and
    ``opts.infile``. On HTTP 201 the created URL and artifact ID are printed;
    any other status is passed to ``errorprinter``.
    """
    logging.debug('add artifact %r', opts)
    url = artifacts_url(opts.service)
    image = {
        'name': opts.name,
        'description': opts.description,
    }
    # build contents of multipart/form-data, image meta must come first, hence
    # we use an OrderedDict to preserve the order
    files = OrderedDict()
    for k, v in image.items():
        files[k] = (None, io.StringIO(v))
    # followed by firmware data
    # but first, try to find out the size of firmware data
    files['size'] = str(os.stat(opts.infile).st_size)

    # The encoder streams from the file during the POST, so the file stays
    # open until the request is done and is then closed by the ``with``.
    with open(opts.infile, 'rb') as infile:
        files['artifact'] = (opts.infile, infile,
                             "application/octet-stream", {})

        encoder = MultipartEncoder(files)

        if sys.stderr.isatty():
            # The progress bar is optional: skip it when either package is
            # not installed.
            try:
                from requests_toolbelt import MultipartEncoderMonitor
                from clint.textui.progress import Bar as ProgressBar

                pb = ProgressBar(expected_size=encoder.len, filled_char='=',
                                 every=1024*1024)
                monitor = MultipartEncoderMonitor(
                    encoder, lambda mon: pb.show(mon.bytes_read))
                encoder = monitor
            except ImportError:
                pass

        with api_from_opts(opts) as api:
            rsp = api.post(url, data=encoder,
                           headers={'Content-Type': encoder.content_type})
            if rsp.status_code == 201:
                # created
                location = rsp.headers.get('Location', '')
                print("created with URL: {}".format(location))
                print('artifact ID: ', location.rsplit('/')[-1])
            else:
                errorprinter(rsp)
def send_attachment(self, recipient_id, attachment_type, attachment_path,
                    notification_type=NotificationType.regular):
    """Send an attachment to the specified recipient using a local path.

    Args:
        recipient_id: Recipient id to send to.
        attachment_type: Type of attachment (image, video, audio, file).
        attachment_path: Path of the attachment on disk.
        notification_type: Notification type sent with the message.

    Returns:
        Response from the API as a dict.
    """
    # Function-scope import so the block does not rely on a file-level one.
    import json

    # The encoder streams from the file during the POST, so the request is
    # issued inside the ``with`` block, which then closes the file.
    with open(attachment_path, 'rb') as attachment_file:
        payload = {
            # The original wrapped these dicts in an extra pair of braces,
            # building a set that contains a dict -- that raises TypeError
            # (unhashable type). Form fields are sent as JSON text instead.
            'recipient': json.dumps({
                'id': recipient_id
            }),
            # NOTE(review): NotificationType looks like an Enum; its
            # ``.value`` is used when present, otherwise the object as given.
            'notification_type': getattr(notification_type, 'value',
                                         notification_type),
            'message': json.dumps({
                'attachment': {
                    'type': attachment_type,
                    'payload': {}
                }
            }),
            'filedata': (os.path.basename(attachment_path), attachment_file)
        }
        multipart_data = MultipartEncoder(payload)
        multipart_header = {
            'Content-Type': multipart_data.content_type
        }
        return requests.post(self.graph_url, data=multipart_data,
                             params=self.auth_args,
                             headers=multipart_header).json()
def post_file(session, domain='', file='', login=LOGIN):
    """Post a file to the cloud's upload server.

    Args:
        session: Object whose ``post`` method performs the HTTP request.
        domain: Base URL of the upload server.
        file: Filename with path of the file to upload.
        login: Account login; sent URL-quoted in the ``x-email`` parameter.

    Returns:
        ``(hash, size)`` parsed from the response body on success, otherwise
        ``(None, None)``.
    """
    assert domain is not None, 'no domain'
    assert file is not None, 'no file'

    filetype = guess_type(file)[0]
    if not filetype:
        filetype = DEFAULT_FILETYPE
        if LOGGER:
            LOGGER.warning('File {} type is unknown, using default: {}'.format(file, DEFAULT_FILETYPE))

    filename = os.path.basename(file)
    quoted_login = quote_plus(login)
    timestamp = str(int(time.mktime(datetime.datetime.now().timetuple()))) + TIME_AMEND
    url = urljoin(domain, '?cloud_domain=' + str(CLOUD_DOMAIN_ORD) + '&x-email=' + quoted_login + '&fileapi' + timestamp)

    # The encoder streams from the file during the POST; the ``with`` block
    # closes it afterwards on both the success and the error path.
    with open(file, 'rb') as upload:
        m = MultipartEncoder(fields={'file': (quote_plus(filename), upload, filetype)})
        try:
            r = session.post(url, data=m, headers={'Content-Type': m.content_type}, verify=VERIFY_SSL)
        except Exception as e:
            if LOGGER:
                LOGGER.error('Post file HTTP request error: {}'.format(e))
            return (None, None)

    if r.status_code == requests.codes.ok:
        if len(r.content):
            # The first 40 bytes are the hash; the size follows one separator
            # byte later, with the last two bytes of the body dropped.
            file_hash = r.content[:40].decode()
            size = int(r.content[41:-2])
            return (file_hash, size)
        elif LOGGER:
            LOGGER.error('File {} post error, no hash and size received'.format(file))
    elif LOGGER:
        LOGGER.error('File {} post error, http code: {}, msg: {}'.format(file, r.status_code, r.text))
    return (None, None)
def __repr__(self):
    """Return a debug representation showing the ``fields`` attribute."""
    template = '<MultipartEncoder: {0!r}>'
    return template.format(self.fields)
def __init__(self, encoder, callback=None):
    """Wrap ``encoder`` and remember the callback to invoke after reads.

    A falsy ``callback`` is replaced by ``IDENTITY``.
    """
    if not callback:
        callback = IDENTITY

    #: Instance of the :class:`MultipartEncoder` being monitored
    self.encoder = encoder

    #: Optionally function to call after a read
    self.callback = callback

    #: Number of bytes already read from the :class:`MultipartEncoder`
    #: instance
    self.bytes_read = 0

    #: Avoid the same problem in bug #80
    self.len = self.encoder.len
def from_fields(cls, fields, boundary=None, encoding='utf-8', callback=None):
    """Build a ``MultipartEncoder`` from ``fields`` and wrap it in ``cls``.

    ``boundary`` and ``encoding`` are forwarded to the encoder; ``callback``
    is forwarded to ``cls``.
    """
    return cls(
        MultipartEncoder(fields, boundary, encoding),
        callback,
    )
def send_image(self, recipient_id, image_path):
    '''Send an image to the specified recipient.

    Image must be PNG or JPEG or GIF (more might be supported).
    https://developers.facebook.com/docs/messenger-platform/send-api-reference/image-attachment

    Input:
        recipient_id: recipient id to send to
        image_path: path to image to be sent
    Output:
        Response from API as a dict
    '''
    # The encoder streams from the file during the POST, so the request is
    # issued inside the ``with`` block, which then closes the file.
    with open(image_path, 'rb') as image_file:
        payload = {
            'recipient': json.dumps(
                {
                    'id': recipient_id
                }
            ),
            'message': json.dumps(
                {
                    'attachment': {
                        'type': 'image',
                        'payload': {}
                    }
                }
            ),
            'filedata': (image_path, image_file)
        }
        multipart_data = MultipartEncoder(payload)
        multipart_header = {
            'Content-Type': multipart_data.content_type
        }
        return requests.post(self.base_url, data=multipart_data,
                             headers=multipart_header).json()
def send_audio(self, recipient_id, audio_path):
    '''Send audio to the specified recipient.

    Audio must be MP3 or WAV
    https://developers.facebook.com/docs/messenger-platform/send-api-reference/audio-attachment

    Input:
        recipient_id: recipient id to send to
        audio_path: path to audio to be sent
    Output:
        Response from API as a dict
    '''
    # The original opened ``image_path``, a name that does not exist in this
    # function (NameError); the audio file itself is opened here. The request
    # is issued inside the ``with`` block so the encoder can stream from the
    # file before it is closed.
    with open(audio_path, 'rb') as audio_file:
        payload = {
            'recipient': json.dumps(
                {
                    'id': recipient_id
                }
            ),
            'message': json.dumps(
                {
                    'attachment': {
                        'type': 'audio',
                        'payload': {}
                    }
                }
            ),
            'filedata': (audio_path, audio_file)
        }
        multipart_data = MultipartEncoder(payload)
        multipart_header = {
            'Content-Type': multipart_data.content_type
        }
        return requests.post(self.base_url, data=multipart_data,
                             headers=multipart_header).json()
def send_file(self, recipient_id, file_path):
    '''Send file to the specified recipient.

    https://developers.facebook.com/docs/messenger-platform/send-api-reference/file-attachment

    Input:
        recipient_id: recipient id to send to
        file_path: path to file to be sent
    Output:
        Response from API as a dict
    '''
    # The original opened ``image_path``, a name that does not exist in this
    # function (NameError); the given file is opened here. The request is
    # issued inside the ``with`` block so the encoder can stream from the
    # file before it is closed.
    with open(file_path, 'rb') as attachment:
        payload = {
            'recipient': json.dumps(
                {
                    'id': recipient_id
                }
            ),
            'message': json.dumps(
                {
                    'attachment': {
                        'type': 'file',
                        'payload': {}
                    }
                }
            ),
            'filedata': (file_path, attachment)
        }
        multipart_data = MultipartEncoder(payload)
        multipart_header = {
            'Content-Type': multipart_data.content_type
        }
        return requests.post(self.base_url, data=multipart_data,
                             headers=multipart_header).json()
def create_upload(self, filepath):
    """Return a multipart encoder with a single ``file`` part for ``filepath``.

    The file is opened in binary mode and is not closed here, because the
    returned encoder still holds it.
    """
    upload_name = os.path.basename(filepath)
    file_part = (upload_name, open(filepath, 'rb'), self.file_type)
    return MultipartEncoder({'file': file_part})
def _upload(title, author, text, author_url='', tph_uuid=None, page_id=None,
            user_agent=default_user_agent, convert_html=True, clean_html=True):
    """Post page content to ``save_url`` and return the decoded result.

    Args:
        title: Page title; required.
        author: Value of the ``author`` field.
        text: Page content; required.
        author_url: Value of the ``author_url`` field.
        tph_uuid: Cookie value; only sent when ``page_id`` is also given.
        page_id: Value of the ``page_id`` field; ``'0'`` is sent when falsy.
        user_agent: Value of the ``User-Agent`` header.
        convert_html: When true, ``text`` is passed through
            ``convert_html_to_telegraph_format`` before sending.
        clean_html: Forwarded to ``convert_html_to_telegraph_format``.

    Returns:
        The decoded JSON result, extended with ``tph_uuid`` and ``url``.

    Raises:
        TitleRequiredError: If ``title`` is empty.
        TextRequiredError: If ``text`` is empty.
        TelegraphError: If the result contains no ``path``.
    """
    if not title:
        raise TitleRequiredError('Title is required')
    if not text:
        raise TextRequiredError('Text is required')

    content = convert_html_to_telegraph_format(text, clean_html) if convert_html else text
    cookies = dict(tph_uuid=tph_uuid) if tph_uuid and page_id else None

    fields = {
        'Data': ('content.html', content, 'plain/text'),
        'title': title,
        'author': author,
        'author_url': author_url,
        'page_id': page_id or '0'
    }

    m = MultipartEncoder(fields, boundary='TelegraPhBoundary21')

    headers = {
        'Content-Type': m.content_type,
        'Accept': 'application/json, text/javascript, */*; q=0.01',
        'User-Agent': user_agent
    }

    # The session is used for this one request only; the context manager
    # releases it (the original never closed it).
    with requests.Session() as session:
        session.mount('https://', requests.adapters.HTTPAdapter(max_retries=3))
        response = session.post(save_url, timeout=4, headers=headers,
                                cookies=cookies, data=m.to_string())

    result = json.loads(response.text)
    if 'path' in result:
        result['tph_uuid'] = response.cookies.get('tph_uuid') or tph_uuid
        result['url'] = base_url + '/' + result['path']
        return result

    raise TelegraphError(result.get('error', ''))
def handle(self, *args, **options):
    """Upload the database file at ``DB_PATH`` to the central server.

    The file is posted as multipart/form-data together with
    ``options['project']``. The returned status code and reason are printed.
    """
    self.stdout.write("Uploading database to central server...\n")

    # The encoder streams from the file during the POST; the ``with`` block
    # closes it once the request is done.
    with open(DB_PATH, "rb") as db_file:
        encoder = MultipartEncoder({
            "project": options['project'],
            "file": ("db.sqlite3", db_file, "application/octet-stream")
        })
        monitor = MultipartEncoderMonitor(encoder, create_callback(encoder))
        r = requests.post(CENTRAL_SERVER_DB_UPLOAD_URL, data=monitor,
                          headers={"Content-Type": monitor.content_type})

    print("\nUpload finished! (Returned status {0} {1})".format(r.status_code, r.reason))
def api_upload(service, encData, encMeta, keys):
    '''Upload data to Send.

    Caution! Data is uploaded as given, this function will not encrypt it
    for you.

    Args:
        service: Base URL of the service; ``api/upload`` is appended.
        encData: Data sent as the ``file`` part of the multipart body.
        encMeta: Metadata sent base64-encoded in ``X-File-Metadata``.
        keys: Object providing ``authKey`` and ``secretKey``.

    Returns:
        Tuple ``(secretUrl, fileId, fileNonce, owner_token)``.
    '''
    service += 'api/upload'
    files = requests_toolbelt.MultipartEncoder(
        fields={'file': ('blob', encData, 'application/octet-stream')})
    pbar = progbar(files.len)
    # The callback receives the monitor; the bar is advanced by the bytes
    # read since its current position.
    monitor = requests_toolbelt.MultipartEncoderMonitor(
        files, lambda mon: pbar.update(mon.bytes_read - pbar.n))

    headers = {
        'X-File-Metadata': unpadded_urlsafe_b64encode(encMeta),
        'Authorization': 'send-v1 ' + unpadded_urlsafe_b64encode(keys.authKey),
        'Content-type': monitor.content_type
    }

    try:
        r = requests.post(service, data=monitor, headers=headers, stream=True)
        r.raise_for_status()
    finally:
        # Close the bar on failure too, not only after a clean upload.
        pbar.close()

    body_json = r.json()
    secretUrl = body_json['url'] + '#' + unpadded_urlsafe_b64encode(keys.secretKey)
    fileId = body_json['id']
    fileNonce = unpadded_urlsafe_b64decode(
        r.headers['WWW-Authenticate'].replace('send-v1 ', ''))

    # Fall back to the 'delete' key when 'owner' is absent. Only the missing
    # key is caught; the original bare ``except`` swallowed everything.
    try:
        owner_token = body_json['owner']
    except KeyError:
        owner_token = body_json['delete']

    return secretUrl, fileId, fileNonce, owner_token
def generate_payload(event):
    """
    Build the multipart encoder for a request with a single "metadata" part.

    :param event: dict payload, serialized as JSON into the "metadata" part
    :return: MultipartEncoder
    """
    encoded_event = json.dumps(event).encode()
    metadata_part = (None, io.BytesIO(encoded_event), 'application/json')
    return MultipartEncoder({"metadata": metadata_part})
def post_update(context, update):
    """
    Updates a Cisco Spark room

    :param context: button state and configuration
    :type context: ``dict``

    :param update: content of the update to be posted there
    :type update: ``str`` or ``dict``

    :raises Exception: if the response status code is not 200

    If the update is a simple string, it is sent as such to Cisco Spark.
    Else if it a dictionary, then it is encoded as MIME Multipart.
    The dictionary passed by the caller is left unmodified.
    """

    logging.info("Posting update to Cisco Spark room")

    url = 'https://api.ciscospark.com/v1/messages'
    headers = {'Authorization': 'Bearer '+context['spark']['CISCO_SPARK_BTTN_BOT']}

    if isinstance(update, dict):
        # Work on a copy: the original wrote 'roomId' into the caller's dict.
        fields = update.copy()
        fields['roomId'] = context['spark']['id']
        payload = MultipartEncoder(fields=fields)
        headers['Content-Type'] = payload.content_type
    else:
        payload = {'roomId': context['spark']['id'], 'text': update}

    response = requests.post(url=url, headers=headers, data=payload)

    if response.status_code != 200:
        logging.info(response.json())
        raise Exception("Received error code {}".format(response.status_code))

    logging.info('- done, check the room with Cisco Spark client software')

#
# handle Twilio API
#
def load_file(path):
    """Encode the file at ``path`` as a one-part multipart body.

    The part is named after the file's base name and typed ``text/plain``.
    Returns a ``(content_type, body)`` tuple.
    """
    filename = os.path.split(path)[1]

    with open(path, 'rb') as file_:
        encoder = MultipartEncoder(
            fields={filename: (filename, file_, 'text/plain')})
        content_type = encoder.content_type
        body = encoder.to_string()
        return (content_type, body)
def test_smoke(self):
    """Feeding a minimal encoded form to the parser does not raise."""
    encoder = MultipartEncoder(fields={'name': 'hello'})
    request_headers = {'Content-Type': encoder.content_type}

    parser = StreamingFormDataParser(headers=request_headers)
    body = encoder.to_string()
    parser.data_received(body)
def test_basic_single(self):
    """A single text field is delivered to its registered target."""
    encoder = MultipartEncoder(fields={'value': 'hello world'})
    request_headers = {'Content-Type': encoder.content_type}

    target = ValueTarget()
    parser = StreamingFormDataParser(headers=request_headers)
    parser.register('value', target)

    body = encoder.to_string()
    parser.data_received(body)

    self.assertEqual(target.value, b'hello world')
def test_break_chunk_at_boundary(self):
    """Both fields parse correctly when a chunk ends inside a boundary."""
    expected_first_value = 'hello' * 500
    expected_second_value = 'hello' * 500

    encoder = MultipartEncoder(fields={
        'first': expected_first_value,
        'second': expected_second_value
    })
    body = encoder.to_string()
    boundary = encoder.boundary.encode('utf-8')

    first = ValueTarget()
    second = ValueTarget()
    parser = StreamingFormDataParser(
        headers={'Content-Type': encoder.content_type})
    parser.register('first', first)
    parser.register('second', second)

    # Same split point as before: the boundary's offset within body[50:],
    # plus 5.
    index = body[50:].index(boundary) + 5
    for chunk in (body[:index], body[index:]):
        parser.data_received(chunk)

    self.assertEqual(first.value, expected_first_value.encode('utf-8'))
    self.assertEqual(second.value, expected_second_value.encode('utf-8'))
def test_mixed_content_varying_chunk_size(self):
    """Text and file fields parse correctly for every two-chunk split."""
    source_path = data_file_path('file.txt')

    with open(source_path, 'rb') as file_:
        expected_value = file_.read()

    with open(data_file_path('file.txt'), 'rb') as file_:
        encoder = MultipartEncoder(fields={
            'name': 'hello world',
            'age': '10',
            'cv.txt': ('file.txt', file_, 'text/plain')
        })
        body = encoder.to_string()
        content_type = encoder.content_type

    # Try every possible position at which the body can be cut in two.
    for split_at in range(len(body)):
        name = ValueTarget()
        age = ValueTarget()
        cv = ValueTarget()

        parser = StreamingFormDataParser(
            headers={'Content-Type': content_type})
        for field_name, target in (('name', name), ('age', age), ('cv.txt', cv)):
            parser.register(field_name, target)

        parser.data_received(body[:split_at])
        parser.data_received(body[split_at:])

        self.assertEqual(name.value, b'hello world')
        self.assertEqual(age.value, b'10')
        self.assertEqual(cv.value, expected_value)
def test_parameter_contains_crlf(self):
    """A CRLF inside a field value is preserved by the parser."""
    encoder = MultipartEncoder(fields={'value': 'hello\r\nworld'})
    request_headers = {'Content-Type': encoder.content_type}

    target = ValueTarget()
    parser = StreamingFormDataParser(headers=request_headers)
    parser.register('value', target)

    body = encoder.to_string()
    parser.data_received(body)

    self.assertEqual(target.value, b'hello\r\nworld')
def test_parameter_ends_with_crlf(self):
    """A trailing CRLF in a field value is preserved by the parser."""
    encoder = MultipartEncoder(fields={'value': 'hello\r\n'})
    request_headers = {'Content-Type': encoder.content_type}

    target = ValueTarget()
    parser = StreamingFormDataParser(headers=request_headers)
    parser.register('value', target)

    body = encoder.to_string()
    parser.data_received(body)

    self.assertEqual(target.value, b'hello\r\n')
def test_parameter_starts_with_crlf(self):
    """A leading CRLF in a field value is preserved by the parser."""
    encoder = MultipartEncoder(fields={'value': '\r\nworld'})
    request_headers = {'Content-Type': encoder.content_type}

    target = ValueTarget()
    parser = StreamingFormDataParser(headers=request_headers)
    parser.register('value', target)

    body = encoder.to_string()
    parser.data_received(body)

    self.assertEqual(target.value, b'\r\nworld')
def test_register_after_data_received(self):
    """Registering a target after data was received raises ParseFailedException."""
    encoder = MultipartEncoder(fields={'name': 'hello'})
    request_headers = {'Content-Type': encoder.content_type}

    parser = StreamingFormDataParser(headers=request_headers)
    parser.data_received(encoder.to_string())

    late_target = ValueTarget()
    self.assertRaises(ParseFailedException,
                      parser.register, 'name', late_target)
def main():
    """Encode the given file as a multipart form and run it through the parser.

    The file is sent as a part named ``file`` with ``args.content_type`` and
    received into a ``ValueTarget``.
    """
    args = parse_args()

    with open(args.filename, 'rb') as fd:
        file_part = ('file', fd, args.content_type)
        encoder = MultipartEncoder(fields={'file': file_part})

        request_headers = {'Content-Type': encoder.content_type}
        parser = StreamingFormDataParser(headers=request_headers)
        parser.register('file', ValueTarget())

        parser.data_received(encoder.to_string())
def _import_file(self, filename):
    """Upload a configuration file to the device's import API.

    Uses ``self.api_key`` when set, otherwise a key from
    ``self.device.keygen()``. TLS certificate verification is disabled for
    the request.

    Args:
        filename: Path of the file to upload.

    Returns:
        The base name of the uploaded file, or ``False`` when the API reports
        ``status="error"``.

    Raises:
        requests.HTTPError: If the server answers with an HTTP error status.
    """
    if not self.api_key:
        key = self.device.keygen()
    else:
        key = self.api_key

    params = {
        'type': 'import',
        'category': 'configuration',
        'key': key
    }

    path = os.path.basename(filename)

    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    url = 'https://{0}/api/'.format(self.hostname)

    # The encoder streams from the file during the POST; the ``with`` block
    # closes it once the request is done.
    with open(filename, 'rb') as config_file:
        mef = requests_toolbelt.MultipartEncoder(
            fields={
                'file': (path, config_file, 'application/octet-stream')
            }
        )
        request = requests.post(
            url,
            verify=False,
            params=params,
            headers={'Content-Type': mef.content_type},
            data=mef
        )

    # if something goes wrong just raise an exception
    request.raise_for_status()
    response = xml.etree.ElementTree.fromstring(request.content)

    if response.attrib['status'] == 'error':
        return False
    return path
def uploadPhoto(self, photo, caption=None, upload_id=None, is_sidecar=None):
    """Upload a photo file, then configure and expose it on success.

    Args:
        photo: Path of the photo file to upload.
        caption: Passed through to ``self.configure``.
        upload_id: Upload identifier; when ``None`` one is derived from the
            current time in milliseconds.
        is_sidecar: When truthy, an ``is_sidecar`` field with value ``'1'``
            is added to the form.

    Returns:
        Always ``False``, matching the original implementation.
        NOTE(review): the success path also falls through to
        ``return False`` -- confirm whether callers expect ``True`` there.
    """
    if upload_id is None:
        upload_id = str(int(time.time() * 1000))

    # ``to_string()`` reads the whole multipart body into memory, so the file
    # can be closed as soon as the body has been built.
    with open(photo, 'rb') as photo_file:
        data = {
            'upload_id': upload_id,
            '_uuid': self.uuid,
            '_csrftoken': self.token,
            'image_compression': '{"lib_name":"jt","lib_version":"1.3.0","quality":"87"}',
            'photo': ('pending_media_%s.jpg' % upload_id, photo_file,
                      'application/octet-stream',
                      {'Content-Transfer-Encoding': 'binary'})
        }
        if is_sidecar:
            data['is_sidecar'] = '1'
        m = MultipartEncoder(data, boundary=self.uuid)
        content_type = m.content_type
        body = m.to_string()

    self.s.headers.update({'X-IG-Capabilities': '3Q4=',
                           'X-IG-Connection-Type': 'WIFI',
                           'Cookie2': '$Version=1',
                           'Accept-Language': 'en-US',
                           'Accept-Encoding': 'gzip, deflate',
                           'Content-type': content_type,
                           'Connection': 'close',
                           'User-Agent': self.USER_AGENT})
    response = self.s.post(self.API_URL + "upload/photo/", data=body)
    if response.status_code == 200:
        if self.configure(upload_id, photo, caption):
            self.expose()
    return False