Python requests_toolbelt 模块,MultipartEncoder() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用requests_toolbelt.MultipartEncoder()

项目:math_bot    作者:chirag9127    | 项目源码 | 文件源码
def send_image_local(recipient_id, image_path):
    """Upload a local image file to a recipient through the Graph messages endpoint.

    Args:
        recipient_id: Value sent as ``recipient.id``.
        image_path: Path of the image to upload; it is sent with the
            ``image/png`` content type.

    Returns:
        The decoded JSON body of the HTTP response.
    """
    # Function-scope import so the snippet does not rely on a file-level one.
    import json

    params = {
        'access_token': os.environ["PAGE_ACCESS_TOKEN"]
    }
    # The context manager closes the file once the request has been sent.
    with open(image_path, 'rb') as image_file:
        data = MultipartEncoder({
            # json.dumps rather than str(): str() of a dict produces a
            # single-quoted Python repr, which is not valid JSON.
            'recipient': json.dumps({
                'id': recipient_id
            }),
            'message': json.dumps({
                'attachment': {
                    'type': 'image',
                    'payload': {
                    }
                }
            }),
            'filedata': (os.path.basename(image_path), image_file,
                         'image/png')
        })
        headers = {
            'Content-Type': data.content_type
        }
        return requests.post("https://graph.facebook.com/v2.6/me/messages",
                             params=params, headers=headers,
                             data=data).json()
项目:ig_upload_bot    作者:skawm    | 项目源码 | 文件源码
def uploadPhoto(self, photo, caption = None, upload_id = None):
        """Upload a photo file to the ``upload/photo/`` endpoint.

        Args:
            photo: Path of the image file to upload.
            caption: Passed through to ``self.configure``.
            upload_id: Identifier for the upload; defaults to the current
                time in milliseconds, as a string.

        Returns:
            True when the upload answered HTTP 200 and ``self.configure``
            returned a truthy value; False otherwise.
        """
        if upload_id is None:
            upload_id = str(int(time.time() * 1000))
        # The file is only needed while the body is rendered, so close it
        # deterministically instead of leaking the handle.
        with open(photo, 'rb') as photo_file:
            data = {
                'upload_id'         : upload_id,
                '_uuid'             : self.uuid,
                '_csrftoken'        : self.token,
                'image_compression' : '{"lib_name":"jt","lib_version":"1.3.0","quality":"87"}',
                'photo'             : ('pending_media_%s.jpg'%upload_id, photo_file, 'application/octet-stream', {'Content-Transfer-Encoding':'binary'})
            }
            m = MultipartEncoder(data, boundary=self.uuid)
            self.s.headers.update ({'X-IG-Capabilities' : '3Q4=',
                                    'X-IG-Connection-Type' : 'WIFI',
                                    'Cookie2' : '$Version=1',
                                    'Accept-Language' : 'en-US',
                                    'Accept-Encoding' : 'gzip, deflate',
                                    'Content-type': m.content_type,
                                    'Connection' : 'close',
                                    'User-Agent' : self.USER_AGENT})
            # to_string() renders the whole multipart body in memory.
            body = m.to_string()
        response = self.s.post(self.API_URL + "upload/photo/", data=body)
        if response.status_code == 200:
            if self.configure(upload_id, photo, caption):
                self.expose()
                # Previously every path fell through to ``return False``.
                return True
        return False
项目:ig_upload_bot    作者:skawm    | 项目源码 | 文件源码
def uploadPhoto(self, photo, caption = None, upload_id = None):
        """Upload a photo file to the ``upload/photo/`` endpoint.

        Args:
            photo: Path of the image file to upload.
            caption: Passed through to ``self.configure``.
            upload_id: Identifier for the upload; defaults to the current
                time in milliseconds, as a string.

        Returns:
            True when the upload answered HTTP 200 and ``self.configure``
            returned a truthy value; False otherwise.
        """
        if upload_id is None:
            upload_id = str(int(time.time() * 1000))
        # The file is only needed while the body is rendered, so close it
        # deterministically instead of leaking the handle.
        with open(photo, 'rb') as photo_file:
            data = {
                'upload_id'         : upload_id,
                '_uuid'             : self.uuid,
                '_csrftoken'        : self.token,
                'image_compression' : '{"lib_name":"jt","lib_version":"1.3.0","quality":"87"}',
                'photo'             : ('pending_media_%s.jpg'%upload_id, photo_file, 'application/octet-stream', {'Content-Transfer-Encoding':'binary'})
            }
            m = MultipartEncoder(data, boundary=self.uuid)
            self.s.headers.update ({'X-IG-Capabilities' : '3Q4=',
                                    'X-IG-Connection-Type' : 'WIFI',
                                    'Cookie2' : '$Version=1',
                                    'Accept-Language' : 'en-US',
                                    'Accept-Encoding' : 'gzip, deflate',
                                    'Content-type': m.content_type,
                                    'Connection' : 'close',
                                    'User-Agent' : self.USER_AGENT})
            # to_string() renders the whole multipart body in memory.
            body = m.to_string()
        response = self.s.post(self.API_URL + "upload/photo/", data=body)
        if response.status_code == 200:
            if self.configure(upload_id, photo, caption):
                self.expose()
                # Previously every path fell through to ``return False``.
                return True
        return False
项目:InstagramPi    作者:sabas1080    | 项目源码 | 文件源码
def uploadPhoto(self, photo, caption = None, upload_id = None):
        """Upload a photo file to the ``upload/photo/`` endpoint.

        Args:
            photo: Path of the image file to upload.
            caption: Passed through to ``self.configure``.
            upload_id: Identifier for the upload; defaults to the current
                time in milliseconds, as a string.

        Returns:
            True when the upload answered HTTP 200 and ``self.configure``
            returned a truthy value; False otherwise.
        """
        if upload_id is None:
            upload_id = str(int(time.time() * 1000))
        # The file is only needed while the body is rendered, so close it
        # deterministically instead of leaking the handle.
        with open(photo, 'rb') as photo_file:
            data = {
                'upload_id'         : upload_id,
                '_uuid'             : self.uuid,
                '_csrftoken'        : self.token,
                'image_compression' : '{"lib_name":"jt","lib_version":"1.3.0","quality":"87"}',
                'photo'             : ('pending_media_%s.jpg'%upload_id, photo_file, 'application/octet-stream', {'Content-Transfer-Encoding':'binary'})
            }
            m = MultipartEncoder(data, boundary=self.uuid)
            self.s.headers.update ({'X-IG-Capabilities' : '3Q4=',
                                    'X-IG-Connection-Type' : 'WIFI',
                                    'Cookie2' : '$Version=1',
                                    'Accept-Language' : 'en-US',
                                    'Accept-Encoding' : 'gzip, deflate',
                                    'Content-type': m.content_type,
                                    'Connection' : 'close',
                                    'User-Agent' : self.USER_AGENT})
            # to_string() renders the whole multipart body in memory.
            body = m.to_string()
        response = self.s.post(self.API_URL + "upload/photo/", data=body)
        if response.status_code == 200:
            if self.configure(upload_id, photo, caption):
                self.expose()
                # Previously every path fell through to ``return False``.
                return True
        return False
项目:instagram_analyzer    作者:dsouzarc    | 项目源码 | 文件源码
def uploadPhoto(self, photo, caption = None, upload_id = None):
        """Upload a photo file to the ``upload/photo/`` endpoint.

        Args:
            photo: Path of the image file to upload.
            caption: Passed through to ``self.configure``.
            upload_id: Identifier for the upload; defaults to the current
                time in milliseconds, as a string.

        Returns:
            True when the upload answered HTTP 200 and ``self.configure``
            returned a truthy value; False otherwise.
        """
        if upload_id is None:
            upload_id = str(int(time.time() * 1000))
        # The file is only needed while the body is rendered, so close it
        # deterministically instead of leaking the handle.
        with open(photo, 'rb') as photo_file:
            data = {
                'upload_id'         : upload_id,
                '_uuid'             : self.uuid,
                '_csrftoken'        : self.token,
                'image_compression' : '{"lib_name":"jt","lib_version":"1.3.0","quality":"87"}',
                'photo'             : ('pending_media_%s.jpg'%upload_id, photo_file, 'application/octet-stream', {'Content-Transfer-Encoding':'binary'})
            }
            m = MultipartEncoder(data, boundary=self.uuid)
            self.session.headers.update ({'X-IG-Capabilities' : '3Q4=',
                                          'X-IG-Connection-Type' : 'WIFI',
                                          'Cookie2' : '$Version=1',
                                          'Accept-Language' : 'en-US',
                                          'Accept-Encoding' : 'gzip, deflate',
                                          'Content-type': m.content_type,
                                          'Connection' : 'close',
                                          'User-Agent' : self.USER_AGENT})
            # to_string() renders the whole multipart body in memory.
            body = m.to_string()
        response = self.session.post(self.API_URL + "upload/photo/", data=body)
        if response.status_code == 200:
            if self.configure(upload_id, photo, caption):
                self.expose()
                # Previously every path fell through to ``return False``.
                return True
        return False
项目:whereis    作者:matfowle    | 项目源码 | 文件源码
def sendSparkPOST(url, data):
    """POST ``data`` to ``url`` as a multipart form and return the response.

    The Authorization header is built from the name ``bearer``, which must
    be defined outside this function.
    """
    encoder = MultipartEncoder(fields=data)
    request_headers = {
        "Content-Type": encoder.content_type,
        "Authorization": "Bearer "+bearer,
    }
    return requests.post(url, data=encoder, headers=request_headers)
项目:streaming-form-data    作者:siddhantgoel    | 项目源码 | 文件源码
def test_basic_multiple(self):
        """Each of three plain fields reaches the target registered for it."""
        first, second, third = ValueTarget(), ValueTarget(), ValueTarget()

        form_fields = {
            'first': 'foo',
            'second': 'bar',
            'third': 'baz'
        }
        encoder = MultipartEncoder(fields=form_fields)

        request_headers = {'Content-Type': encoder.content_type}
        parser = StreamingFormDataParser(headers=request_headers)

        registrations = (('first', first), ('second', second), ('third', third))
        for field_name, target in registrations:
            parser.register(field_name, target)

        # The whole encoded body is delivered in a single call.
        parser.data_received(encoder.to_string())

        expectations = ((first, b'foo'), (second, b'bar'), (third, b'baz'))
        for target, expected in expectations:
            self.assertEqual(target.value, expected)
项目:streaming-form-data    作者:siddhantgoel    | 项目源码 | 文件源码
def test_chunked_single(self):
        """A single field survives the body being fed in two pieces."""
        expected_value = 'hello world'
        target = ValueTarget()

        encoder = MultipartEncoder(fields={'value': expected_value})
        body = encoder.to_string()

        request_headers = {'Content-Type': encoder.content_type}
        parser = StreamingFormDataParser(headers=request_headers)
        parser.register('value', target)

        # Split inside the field value, right before b'world'.
        split_at = body.index(b'world')
        head, tail = body[:split_at], body[split_at:]
        for piece in (head, tail):
            parser.data_received(piece)

        self.assertEqual(target.value, expected_value.encode('utf-8'))
项目:streaming-form-data    作者:siddhantgoel    | 项目源码 | 文件源码
def main():
    """Encode two sample text fields plus the given file and print the body.

    The body is printed decoded as UTF-8 when ``args.decode`` is truthy,
    otherwise as the raw bytes object.
    """
    args = parse_args()

    with open(args.filename, 'rb') as file_:
        # guess_type returns (type, encoding); only the type is used.
        content_type, _ = mimetypes.guess_type(args.filename)

        encoder = MultipartEncoder(fields={
            'name': 'hello world',
            'lines': 'first line\r\n\r\nsecond line',
            'file': (args.filename, file_, content_type)
        })
        body = encoder.to_string()

        print(body.decode('utf-8') if args.decode else body)
项目:humble.bot    作者:anisayari    | 项目源码 | 文件源码
def uploadPhoto(self, photo, caption = None, upload_id = None):
        """Upload a photo file to the ``upload/photo/`` endpoint.

        Args:
            photo: Path of the image file to upload.
            caption: Passed through to ``self.configure``.
            upload_id: Identifier for the upload; defaults to the current
                time in milliseconds, as a string.

        Returns:
            True when the upload answered HTTP 200 and ``self.configure``
            returned a truthy value; False otherwise.
        """
        if upload_id is None:
            upload_id = str(int(time.time() * 1000))
        # The file is only needed while the body is rendered, so close it
        # deterministically instead of leaking the handle.
        with open(photo, 'rb') as photo_file:
            data = {
                'upload_id'         : upload_id,
                '_uuid'             : self.uuid,
                '_csrftoken'        : self.token,
                'image_compression' : '{"lib_name":"jt","lib_version":"1.3.0","quality":"87"}',
                'photo'             : ('pending_media_%s.jpg'%upload_id, photo_file, 'application/octet-stream', {'Content-Transfer-Encoding':'binary'})
            }
            m = MultipartEncoder(data, boundary=self.uuid)
            self.s.headers.update ({'X-IG-Capabilities' : '3Q4=',
                                    'X-IG-Connection-Type' : 'WIFI',
                                    'Cookie2' : '$Version=1',
                                    'Accept-Language' : 'en-US',
                                    'Accept-Encoding' : 'gzip, deflate',
                                    'Content-type': m.content_type,
                                    'Connection' : 'close',
                                    'User-Agent' : self.USER_AGENT})
            # to_string() renders the whole multipart body in memory.
            body = m.to_string()
        response = self.s.post(self.API_URL + "upload/photo/", data=body)
        if response.status_code == 200:
            if self.configure(upload_id, photo, caption):
                self.expose()
                # Previously every path fell through to ``return False``.
                return True
        return False
项目:zabbix-alert    作者:annProg    | 项目源码 | 文件源码
def http_send_attachmail(to, cc, sub, content, filelist=None, mail_format="html", mail_from=mail_from):
    """Send a mail with file attachments through the HTTP mail API.

    Args:
        to: Recipients, sent as the ``tos`` field.
        cc: Carbon-copy recipients.
        sub: Subject line.
        content: Mail body.
        filelist: Paths of files to attach; each is sent as ``attach<N>``
            with N starting at 1. ``None`` means no attachments.
        mail_format: Sent as the ``format`` field.
        mail_from: Sent as the ``from`` field.

    Returns:
        The decoded JSON response; its ``status`` and ``msg`` entries are
        also written to the send log.
    """
    # A None default avoids sharing one mutable list between calls.
    if filelist is None:
        filelist = []

    fields = {"tos":to, "cc":cc, "subject":sub, "content":content, "from":mail_from, "format":mail_format, "attachNum":str(len(filelist))}

    opened = []
    try:
        for i, attach in enumerate(filelist, 1):
            handle = open(attach, "rb")
            opened.append(handle)
            fields['attach' + str(i)] = (attach, handle)

        m = MultipartEncoder(fields)
        headers = {"content-type":m.content_type}
        r = requests.post(mail_api, data=m, headers=headers)
    finally:
        # Close every attachment, even if opening a later one or the POST failed.
        for handle in opened:
            handle.close()

    ret = r.json()
    status = str(ret['status']) + "-" + ret['msg']
    sendlog(status, to + "|cc:" +cc, sub)
    return ret
项目:catalearn    作者:Catalearn    | 项目源码 | 文件源码
def upload_data(gpu_ip, job_hash, data_path):
    """Upload a data file to the job endpoint while showing a progress bar.

    Args:
        gpu_ip: Host part of the target URL.
        job_hash: Sent as the ``hash`` form field.
        data_path: Path of the file to upload; it is removed after the
            POST has returned.
    """
    url = 'http://%s:%s/runJobDecorator' % (gpu_ip, settings.GPU_PORT)
    file_size = path.getsize(data_path)
    pbar = tqdm(total=file_size, unit='B', unit_scale=True)

    def callback(monitor):
        # The bar expects increments, the monitor reports a running total.
        progress = monitor.bytes_read - callback.last_bytes_read
        pbar.update(progress)
        callback.last_bytes_read = monitor.bytes_read
    callback.last_bytes_read = 0

    try:
        with open(data_path, 'rb') as f:
            data = {
                'file': ('uploads.pkl', f, 'application/octet-stream'),
                'hash': job_hash
            }
            encoder = MultipartEncoder(
                fields=data
            )
            monitor = MultipartEncoderMonitor(encoder, callback)
            r = requests.post(url, data=monitor, headers={
                'Content-Type': monitor.content_type})
    finally:
        # Close the bar on user interrupts and request errors as well.
        pbar.close()

    remove(data_path)
    status_check(r)
项目:mender-backend-cli    作者:bboozzoo    | 项目源码 | 文件源码
def do_artifacts_artifact_add(opts):
    """Upload the artifact file named by ``opts.infile`` with its metadata.

    Reads ``opts.service``, ``opts.name``, ``opts.description`` and
    ``opts.infile``. Prints the created location on HTTP 201 and otherwise
    hands the response to ``errorprinter``.
    """
    logging.debug('add artifact %r', opts)
    url = artifacts_url(opts.service)
    image = {
        'name': opts.name,
        'description': opts.description,
    }
    # build contents of multipart/form-data, image meta must come first, hence
    # we use an OrderedDict to preserve the order
    files = OrderedDict()
    for k, v in image.items():
        files[k] = (None, io.StringIO(v))
    # followed by firmware data
    # but first, try to find out the size of firmware data
    files['size'] = str(os.stat(opts.infile).st_size)

    # Keep the artifact open only for the duration of the upload.
    with open(opts.infile, 'rb') as infile:
        files['artifact'] = (opts.infile, infile, "application/octet-stream", {})

        encoder = MultipartEncoder(files)

        if sys.stderr.isatty():
            # The progress bar is optional: skipped when the imports fail.
            try:
                from requests_toolbelt import MultipartEncoderMonitor
                from clint.textui.progress import Bar as ProgressBar

                pb = ProgressBar(expected_size=encoder.len, filled_char='=', every=1024*1024)
                monitor = MultipartEncoderMonitor(encoder,
                                                  lambda mon: pb.show(mon.bytes_read))
                encoder = monitor
            except ImportError:
                pass

        with api_from_opts(opts) as api:
            rsp = api.post(url, data=encoder,
                           headers={'Content-Type': encoder.content_type})
            if rsp.status_code == 201:
                # created
                location = rsp.headers.get('Location', '')
                print("created with URL: {}".format(location))
                print('artifact ID: ', location.rsplit('/')[-1])
            else:
                errorprinter(rsp)
项目:pymessenger    作者:davidchua    | 项目源码 | 文件源码
def send_attachment(self, recipient_id, attachment_type, attachment_path,
                        notification_type=NotificationType.regular):
        """Send an attachment to the specified recipient using local path.
        Input:
            recipient_id: recipient id to send to
            attachment_type: type of attachment (image, video, audio, file)
            attachment_path: Path of attachment
        Output:
            Response from API as <dict>
        """
        # Function-scope import so the method does not rely on a file-level one.
        import json

        # Close the attachment once the request has been sent.
        with open(attachment_path, 'rb') as attachment_file:
            payload = {
                # The nested values are serialised to JSON strings; the
                # previous ``{ {...} }`` literal tried to build a set holding
                # a dict and raised TypeError (unhashable type).
                'recipient': json.dumps(
                    {
                        'id': recipient_id
                    }
                ),
                # NOTE(review): passed through unchanged; confirm the encoder
                # accepts this value type.
                'notification_type': notification_type,
                'message': json.dumps(
                    {
                        'attachment': {
                            'type': attachment_type,
                            'payload': {}
                        }
                    }
                ),
                'filedata': (os.path.basename(attachment_path), attachment_file)
            }
            multipart_data = MultipartEncoder(payload)
            multipart_header = {
                'Content-Type': multipart_data.content_type
            }
            return requests.post(self.graph_url, data=multipart_data,
                                 params=self.auth_args, headers=multipart_header).json()
项目:mail.ru-uploader    作者:instefa    | 项目源码 | 文件源码
def post_file(session, domain='', file='', login=LOGIN):
    """ posts file to the cloud's upload server
    param: session - object whose ``post`` method performs the HTTP request
    param: domain - base URL the upload query string is joined to
    param: file - string filename with path
    param: login - account login, URL-quoted into the ``x-email`` parameter
    return: ``(hash, size)`` parsed from the response body, or
        ``(None, None)`` when the request or the response is unusable
    """
    assert domain is not None, 'no domain'
    assert file is not None, 'no file'

    filetype = guess_type(file)[0]
    if not filetype:
        filetype = DEFAULT_FILETYPE
        if LOGGER:
            LOGGER.warning('File {} type is unknown, using default: {}'.format(file, DEFAULT_FILETYPE))

    filename = os.path.basename(file)
    quoted_login = quote_plus(login)
    timestamp = str(int(time.mktime(datetime.datetime.now().timetuple()))) + TIME_AMEND
    url = urljoin(domain, '?cloud_domain=' + str(CLOUD_DOMAIN_ORD) + '&x-email=' + quoted_login + '&fileapi' + timestamp)

    # Close the upload source as soon as the request has finished or failed.
    with open(file, 'rb') as upload_file:
        m = MultipartEncoder(fields={'file': (quote_plus(filename), upload_file, filetype)})

        try:
            r = session.post(url, data=m, headers={'Content-Type': m.content_type}, verify = VERIFY_SSL)
        except Exception as e:
            if LOGGER:
                LOGGER.error('Post file HTTP request error: {}'.format(e))
            return (None, None)

    if r.status_code == requests.codes.ok:
        if len(r.content):
            # The first 40 bytes are the hash; the size follows one separator
            # byte later and the last two bytes are dropped.
            file_hash = r.content[:40].decode()
            size = int(r.content[41:-2])
            return (file_hash, size)
        elif LOGGER:
            LOGGER.error('File {} post error, no hash and size received'.format(file))
    elif LOGGER:
        LOGGER.error('File {} post error, http code: {}, msg: {}'.format(file, r.status_code, r.text))
    return (None, None)
项目:flickr_downloader    作者:Denisolt    | 项目源码 | 文件源码
def __repr__(self):
        """Return a debug representation that embeds ``repr(self.fields)``."""
        template = '<MultipartEncoder: {0!r}>'
        return template.format(self.fields)
项目:flickr_downloader    作者:Denisolt    | 项目源码 | 文件源码
def __init__(self, encoder, callback=None):
        """Store the monitored encoder and initialise read-progress state.

        ``callback`` is kept as given when truthy; otherwise ``IDENTITY``
        is stored in its place.
        """
        #: The :class:`MultipartEncoder` instance being monitored
        self.encoder = encoder

        #: Function invoked after a read (``IDENTITY`` when none was given)
        self.callback = callback if callback else IDENTITY

        #: Avoid the same problem in bug #80
        self.len = self.encoder.len

        #: Running count of bytes read from the :class:`MultipartEncoder`
        self.bytes_read = 0
项目:flickr_downloader    作者:Denisolt    | 项目源码 | 文件源码
def from_fields(cls, fields, boundary=None, encoding='utf-8',
                    callback=None):
        """Build a ``MultipartEncoder`` from ``fields`` and wrap it in ``cls``.

        ``boundary`` and ``encoding`` are forwarded positionally to the
        encoder; ``callback`` is forwarded to ``cls``.
        """
        wrapped = MultipartEncoder(fields, boundary, encoding)
        instance = cls(wrapped, callback)
        return instance
项目:Liljimbo-Chatbot    作者:chrisjim316    | 项目源码 | 文件源码
def __repr__(self):
        """Return a debug representation that embeds ``repr(self.fields)``."""
        template = '<MultipartEncoder: {0!r}>'
        return template.format(self.fields)
项目:Liljimbo-Chatbot    作者:chrisjim316    | 项目源码 | 文件源码
def __init__(self, encoder, callback=None):
        """Store the monitored encoder and initialise read-progress state.

        ``callback`` is kept as given when truthy; otherwise ``IDENTITY``
        is stored in its place.
        """
        #: The :class:`MultipartEncoder` instance being monitored
        self.encoder = encoder

        #: Function invoked after a read (``IDENTITY`` when none was given)
        self.callback = callback if callback else IDENTITY

        #: Avoid the same problem in bug #80
        self.len = self.encoder.len

        #: Running count of bytes read from the :class:`MultipartEncoder`
        self.bytes_read = 0
项目:Liljimbo-Chatbot    作者:chrisjim316    | 项目源码 | 文件源码
def from_fields(cls, fields, boundary=None, encoding='utf-8',
                    callback=None):
        """Build a ``MultipartEncoder`` from ``fields`` and wrap it in ``cls``.

        ``boundary`` and ``encoding`` are forwarded positionally to the
        encoder; ``callback`` is forwarded to ``cls``.
        """
        wrapped = MultipartEncoder(fields, boundary, encoding)
        instance = cls(wrapped, callback)
        return instance
项目:Liljimbo-Chatbot    作者:chrisjim316    | 项目源码 | 文件源码
def send_image(self, recipient_id, image_path):
        '''Send an image to the specified recipient.
        Image must be PNG or JPEG or GIF (more might be supported).
        https://developers.facebook.com/docs/messenger-platform/send-api-reference/image-attachment
        Input:
            recipient_id: recipient id to send to
            image_path: path to image to be sent
        Output:
            Response from API as <dict>
        '''
        # Close the image once the request has been sent instead of leaking
        # the handle.
        with open(image_path, 'rb') as image_file:
            payload = {
                'recipient': json.dumps(
                    {
                        'id': recipient_id
                    }
                ),
                'message': json.dumps(
                    {
                        'attachment': {
                            'type': 'image',
                            'payload': {}
                        }
                    }
                ),
                'filedata': (image_path, image_file)
            }
            multipart_data = MultipartEncoder(payload)
            multipart_header = {
                'Content-Type': multipart_data.content_type
            }
            return requests.post(self.base_url, data=multipart_data, headers=multipart_header).json()
项目:Liljimbo-Chatbot    作者:chrisjim316    | 项目源码 | 文件源码
def send_audio(self, recipient_id, audio_path):
        '''Send audio to the specified recipient.
        Audio must be MP3 or WAV
        https://developers.facebook.com/docs/messenger-platform/send-api-reference/audio-attachment
        Input:
            recipient_id: recipient id to send to
            audio_path: path to audio to be sent
        Output:
            Response from API as <dict>
        '''
        # Open ``audio_path``: the previous code opened the undefined name
        # ``image_path`` and raised NameError on every call.
        with open(audio_path, 'rb') as audio_file:
            payload = {
                'recipient': json.dumps(
                    {
                        'id': recipient_id
                    }
                ),
                'message': json.dumps(
                    {
                        'attachment': {
                            'type': 'audio',
                            'payload': {}
                        }
                    }
                ),
                'filedata': (audio_path, audio_file)
            }
            multipart_data = MultipartEncoder(payload)
            multipart_header = {
                'Content-Type': multipart_data.content_type
            }
            return requests.post(self.base_url, data=multipart_data, headers=multipart_header).json()
项目:Liljimbo-Chatbot    作者:chrisjim316    | 项目源码 | 文件源码
def send_file(self, recipient_id, file_path):
        '''Send file to the specified recipient.
        https://developers.facebook.com/docs/messenger-platform/send-api-reference/file-attachment
        Input:
            recipient_id: recipient id to send to
            file_path: path to file to be sent
        Output:
            Response from API as <dict>
        '''
        # Open ``file_path``: the previous code opened the undefined name
        # ``image_path`` and raised NameError on every call.
        with open(file_path, 'rb') as file_handle:
            payload = {
                'recipient': json.dumps(
                    {
                        'id': recipient_id
                    }
                ),
                'message': json.dumps(
                    {
                        'attachment': {
                            'type': 'file',
                            'payload': {}
                        }
                    }
                ),
                'filedata': (file_path, file_handle)
            }
            multipart_data = MultipartEncoder(payload)
            multipart_header = {
                'Content-Type': multipart_data.content_type
            }
            return requests.post(self.base_url, data=multipart_data, headers=multipart_header).json()
项目:tempfiles_cmdline    作者:periket2000    | 项目源码 | 文件源码
def create_upload(self, filepath):
        """Return a ``MultipartEncoder`` carrying ``filepath`` as its ``file`` part.

        The file is opened in binary mode and left open, since the returned
        encoder holds the handle; the part is labelled with the file's base
        name and ``self.file_type``.
        """
        upload_name = os.path.basename(filepath)
        file_part = (upload_name, open(filepath, 'rb'), self.file_type)
        return MultipartEncoder({'file': file_part})
项目:html-telegraph-poster    作者:mercuree    | 项目源码 | 文件源码
def _upload(title, author, text,
            author_url='', tph_uuid=None, page_id=None, user_agent=default_user_agent, convert_html=True,
            clean_html=True):
    """Post a page to the save endpoint and return the decoded result.

    Args:
        title: Page title; required.
        author: Author name.
        text: Page content; required.
        author_url: Sent as the ``author_url`` field.
        tph_uuid: Sent as a cookie, but only when ``page_id`` is also given.
        page_id: Sent as the ``page_id`` field; ``'0'`` is sent when falsy.
        user_agent: Value of the ``User-Agent`` header.
        convert_html: When true, ``text`` is passed through
            ``convert_html_to_telegraph_format`` first.
        clean_html: Forwarded to ``convert_html_to_telegraph_format``.

    Returns:
        The response dict, extended with ``tph_uuid`` and ``url``.

    Raises:
        TitleRequiredError: ``title`` is empty.
        TextRequiredError: ``text`` is empty.
        TelegraphError: The response carries no ``path`` entry.
    """
    if not title:
        raise TitleRequiredError('Title is required')
    if not text:
        raise TextRequiredError('Text is required')

    content = convert_html_to_telegraph_format(text, clean_html) if convert_html else text
    cookies = dict(tph_uuid=tph_uuid) if tph_uuid and page_id else None

    fields = {
        'Data': ('content.html', content, 'plain/text'),
        'title': title,
        'author': author,
        'author_url': author_url,
        'page_id': page_id or '0'
    }

    m = MultipartEncoder(fields, boundary='TelegraPhBoundary21')

    headers = {
        'Content-Type': m.content_type,
        'Accept': 'application/json, text/javascript, */*; q=0.01',
        'User-Agent': user_agent
    }
    # The session is a context manager so its connection pool is released
    # once the single request is done.
    with requests.Session() as r:
        r.mount('https://', requests.adapters.HTTPAdapter(max_retries=3))
        response = r.post(save_url, timeout=4, headers=headers, cookies=cookies, data=m.to_string())

    result = json.loads(response.text)
    if 'path' in result:
        # Prefer the uuid cookie from the response, else keep the caller's.
        result['tph_uuid'] = response.cookies.get('tph_uuid') or tph_uuid
        result['url'] = base_url + '/' + result['path']
        return result
    else:
        error_msg = result['error'] if 'error' in result else ''
        raise TelegraphError(error_msg)
项目:kolibri    作者:learningequality    | 项目源码 | 文件源码
def handle(self, *args, **options):
        """Upload the database file at ``DB_PATH`` to the central server.

        Reads ``options['project']`` for the ``project`` form field, then
        prints the status code and reason of the response.
        """

        self.stdout.write("Uploading database to central server...\n")

        # Close the database file once the upload has finished or failed.
        with open(DB_PATH, "rb") as db_file:
            encoder = MultipartEncoder({
                "project": options['project'],
                "file": ("db.sqlite3", db_file, "application/octet-stream")
            })
            monitor = MultipartEncoderMonitor(encoder, create_callback(encoder))
            r = requests.post(CENTRAL_SERVER_DB_UPLOAD_URL, data=monitor, headers={"Content-Type": monitor.content_type})
        print("\nUpload finished! (Returned status {0} {1})".format(r.status_code, r.reason))
项目:send-cli    作者:ehuggett    | 项目源码 | 文件源码
def api_upload(service, encData, encMeta, keys):
    '''
       Uploads data to Send.
       Caution! Data is uploaded as given, this function will not encrypt it for you

       Input:
           service: base URL; 'api/upload' is appended to it
           encData: file content to upload as the 'file' part
           encMeta: metadata, sent base64-encoded in X-File-Metadata
           keys: object providing authKey and secretKey
       Output:
           (secretUrl, fileId, fileNonce, owner_token)
    '''
    service += 'api/upload'
    files = requests_toolbelt.MultipartEncoder(fields={'file': ('blob', encData, 'application/octet-stream') })
    pbar = progbar(files.len)
    # pbar.update takes an increment, so pass the delta since the last call.
    monitor = requests_toolbelt.MultipartEncoderMonitor(files, lambda mon: pbar.update(mon.bytes_read - pbar.n))

    headers = {
        'X-File-Metadata' : unpadded_urlsafe_b64encode(encMeta),
        'Authorization' : 'send-v1 ' + unpadded_urlsafe_b64encode(keys.authKey),
        'Content-type' : monitor.content_type
    }

    try:
        r = requests.post(service, data=monitor, headers=headers, stream=True)
    finally:
        # Close the bar even when the request itself raises.
        pbar.close()
    r.raise_for_status()

    body_json = r.json()
    secretUrl = body_json['url'] + '#' + unpadded_urlsafe_b64encode(keys.secretKey)
    fileId = body_json['id']
    fileNonce = unpadded_urlsafe_b64decode(r.headers['WWW-Authenticate'].replace('send-v1 ', ''))
    # Fall back to the 'delete' entry when the response has no 'owner'.
    try:
        owner_token = body_json['owner']
    except KeyError:
        owner_token = body_json['delete']
    return secretUrl, fileId, fileNonce, owner_token
项目:kapsel    作者:conda    | 项目源码 | 文件源码
def __repr__(self):
        """Return a debug representation that embeds ``repr(self.fields)``."""
        template = '<MultipartEncoder: {0!r}>'
        return template.format(self.fields)
项目:kapsel    作者:conda    | 项目源码 | 文件源码
def __init__(self, encoder, callback=None):
        """Store the monitored encoder and initialise read-progress state.

        ``callback`` is kept as given when truthy; otherwise ``IDENTITY``
        is stored in its place.
        """
        #: The :class:`MultipartEncoder` instance being monitored
        self.encoder = encoder

        #: Function invoked after a read (``IDENTITY`` when none was given)
        self.callback = callback if callback else IDENTITY

        #: Avoid the same problem in bug #80
        self.len = self.encoder.len

        #: Running count of bytes read from the :class:`MultipartEncoder`
        self.bytes_read = 0
项目:kapsel    作者:conda    | 项目源码 | 文件源码
def from_fields(cls, fields, boundary=None, encoding='utf-8', callback=None):
        """Build a ``MultipartEncoder`` from ``fields`` and wrap it in ``cls``.

        ``boundary`` and ``encoding`` are forwarded positionally to the
        encoder; ``callback`` is forwarded to ``cls``.
        """
        wrapped = MultipartEncoder(fields, boundary, encoding)
        instance = cls(wrapped, callback)
        return instance
项目:python-avs    作者:lddias    | 项目源码 | 文件源码
def generate_payload(event):
    """
    Build the multipart body holding ``event`` as its "metadata" part.

    The event is serialised to JSON, encoded to bytes and wrapped in a
    BytesIO; the part has no filename and is typed ``application/json``.

    :param event: dict payload to send as "metadata" part in multi-part request
    :return: MultipartEncoder
    """
    serialized = json.dumps(event).encode()
    metadata_part = (None, io.BytesIO(serialized), 'application/json')
    return MultipartEncoder({"metadata": metadata_part})
项目:whereis    作者:matfowle    | 项目源码 | 文件源码
def sendSparkPOST(url, data):
    """POST ``data`` to ``url`` as a multipart form and return the response.

    The Authorization header is built from the name ``bearer``, which must
    be defined outside this function.
    """
    encoder = MultipartEncoder(fields=data)
    request_headers = {
        "Content-Type": encoder.content_type,
        "Authorization": "Bearer "+bearer,
    }
    return requests.post(url, data=encoder, headers=request_headers)
项目:green-button    作者:bernard357    | 项目源码 | 文件源码
def post_update(context, update):
    """
    Updates a Cisco Spark room

    :param context: button state and configuration
    :type context: ``dict``

    :param update: content of the update to be posted there
    :type update: ``str`` or ``dict``

    If the update is a simple string, it is sent as such to Cisco Spark.
    Else if it a dictionary, then it is encoded as MIME Multipart.

    The caller's dictionary is left untouched; the room id is added to a
    copy of it.

    :raises Exception: when the response status code is not 200
    """

    logging.info("Posting update to Cisco Spark room")

    url = 'https://api.ciscospark.com/v1/messages'
    headers = {'Authorization': 'Bearer '+context['spark']['CISCO_SPARK_BTTN_BOT']}
    room_id = context['spark']['id']

    if isinstance(update, dict):
        # Copy before adding roomId so the argument is not mutated.
        fields = dict(update)
        fields['roomId'] = room_id
        payload = MultipartEncoder(fields=fields)
        headers['Content-Type'] = payload.content_type
    else:
        payload = {'roomId': room_id, 'text': update }

    response = requests.post(url=url, headers=headers, data=payload)

    if response.status_code != 200:
        logging.info(response.json())
        raise Exception("Received error code {}".format(response.status_code))

    logging.info('- done, check the room with Cisco Spark client software')

#
# handle Twilio API
#
项目:streaming-form-data    作者:siddhantgoel    | 项目源码 | 文件源码
def load_file(path):
    """Encode the file at ``path`` as a one-part multipart body.

    The part is named after the file's base name and typed ``text/plain``.

    Returns:
        A ``(content_type, body)`` tuple taken from the encoder.
    """
    filename = os.path.split(path)[1]

    with open(path, 'rb') as file_:
        encoder = MultipartEncoder(
            fields={filename: (filename, file_, 'text/plain')})
        content_type = encoder.content_type
        body = encoder.to_string()
        return (content_type, body)
项目:streaming-form-data    作者:siddhantgoel    | 项目源码 | 文件源码
def test_smoke(self):
        """Parsing a one-field body with no registered targets does not raise."""
        encoder = MultipartEncoder(fields={'name': 'hello'})
        request_headers = {'Content-Type': encoder.content_type}

        parser = StreamingFormDataParser(headers=request_headers)

        body = encoder.to_string()
        parser.data_received(body)
项目:streaming-form-data    作者:siddhantgoel    | 项目源码 | 文件源码
def test_basic_single(self):
        """A single text field is delivered to its registered target."""
        target = ValueTarget()
        encoder = MultipartEncoder(fields={'value': 'hello world'})

        request_headers = {'Content-Type': encoder.content_type}
        parser = StreamingFormDataParser(headers=request_headers)
        parser.register('value', target)

        body = encoder.to_string()
        parser.data_received(body)

        self.assertEqual(target.value, b'hello world')
项目:streaming-form-data    作者:siddhantgoel    | 项目源码 | 文件源码
def test_break_chunk_at_boundary(self):
        """Two long fields parse correctly when the body is fed in two pieces.

        The split offset is derived from where the boundary string is found
        in ``body[50:]``.
        """
        first, second = ValueTarget(), ValueTarget()

        encoder = MultipartEncoder(fields={
            'first': 'hello' * 500,
            'second': 'hello' * 500
        })
        body = encoder.to_string()
        boundary = encoder.boundary.encode('utf-8')

        request_headers = {'Content-Type': encoder.content_type}
        parser = StreamingFormDataParser(headers=request_headers)
        for field_name, target in (('first', first), ('second', second)):
            parser.register(field_name, target)

        split_at = body[50:].index(boundary) + 5
        head, tail = body[:split_at], body[split_at:]
        parser.data_received(head)
        parser.data_received(tail)

        expected_first_value = 'hello' * 500
        expected_second_value = 'hello' * 500
        self.assertEqual(first.value, expected_first_value.encode('utf-8'))
        self.assertEqual(second.value, expected_second_value.encode('utf-8'))
项目:streaming-form-data    作者:siddhantgoel    | 项目源码 | 文件源码
def test_mixed_content_varying_chunk_size(self):
        """Text and file fields parse correctly at every two-way split point."""
        with open(data_file_path('file.txt'), 'rb') as file_:
            expected_value = file_.read()

        with open(data_file_path('file.txt'), 'rb') as file_:
            encoder = MultipartEncoder(fields={
                'name': 'hello world',
                'age': '10',
                'cv.txt': ('file.txt', file_, 'text/plain')
            })
            body = encoder.to_string()
            content_type = encoder.content_type

        # Try every offset at which the body can be cut into two pieces.
        for split_at in range(len(body)):
            name, age, cv = ValueTarget(), ValueTarget(), ValueTarget()

            parser = StreamingFormDataParser(
                headers={'Content-Type': content_type})
            for field_name, target in (('name', name), ('age', age),
                                       ('cv.txt', cv)):
                parser.register(field_name, target)

            head, tail = body[:split_at], body[split_at:]
            parser.data_received(head)
            parser.data_received(tail)

            self.assertEqual(name.value, b'hello world')
            self.assertEqual(age.value, b'10')
            self.assertEqual(cv.value, expected_value)
项目:streaming-form-data    作者:siddhantgoel    | 项目源码 | 文件源码
def test_parameter_contains_crlf(self):
        """A CRLF inside a field value is preserved by the parser."""
        target = ValueTarget()
        encoder = MultipartEncoder(fields={'value': 'hello\r\nworld'})

        request_headers = {'Content-Type': encoder.content_type}
        parser = StreamingFormDataParser(headers=request_headers)
        parser.register('value', target)

        body = encoder.to_string()
        parser.data_received(body)

        self.assertEqual(target.value, b'hello\r\nworld')
项目:streaming-form-data    作者:siddhantgoel    | 项目源码 | 文件源码
def test_parameter_ends_with_crlf(self):
        """A trailing CRLF in a field value is preserved by the parser."""
        target = ValueTarget()
        encoder = MultipartEncoder(fields={'value': 'hello\r\n'})

        request_headers = {'Content-Type': encoder.content_type}
        parser = StreamingFormDataParser(headers=request_headers)
        parser.register('value', target)

        body = encoder.to_string()
        parser.data_received(body)

        self.assertEqual(target.value, b'hello\r\n')
项目:streaming-form-data    作者:siddhantgoel    | 项目源码 | 文件源码
def test_parameter_starts_with_crlf(self):
        """A field value with a leading CRLF must keep that CRLF after parsing."""
        received = ValueTarget()

        payload = MultipartEncoder(fields={'value': '\r\nworld'})
        request_headers = {'Content-Type': payload.content_type}

        parser = StreamingFormDataParser(headers=request_headers)
        parser.register('value', received)

        # Feed the whole encoded body to the parser in a single call.
        raw_body = payload.to_string()
        parser.data_received(raw_body)

        self.assertEqual(received.value, b'\r\nworld')
项目:streaming-form-data    作者:siddhantgoel    | 项目源码 | 文件源码
def test_register_after_data_received(self):
        """Registering a target after data was fed must raise ParseFailedException."""
        payload = MultipartEncoder(fields={'name': 'hello'})
        request_headers = {'Content-Type': payload.content_type}

        parser = StreamingFormDataParser(headers=request_headers)
        parser.data_received(payload.to_string())

        # Build the target outside the guarded region so that only the
        # register() call itself is checked for the exception.
        late_target = ValueTarget()
        with self.assertRaises(ParseFailedException):
            parser.register('name', late_target)
项目:streaming-form-data    作者:siddhantgoel    | 项目源码 | 文件源码
def main():
    """Multipart-encode the file named by the parsed arguments and parse it.

    ``parse_args()`` must supply ``filename`` and ``content_type``; the file
    is encoded as the single field ``file`` and the resulting body is fed to
    a ``StreamingFormDataParser`` in one call.
    """
    options = parse_args()

    with open(options.filename, 'rb') as source:
        upload = ('file', source, options.content_type)
        encoder = MultipartEncoder(fields={'file': upload})
        request_headers = {'Content-Type': encoder.content_type}

        parser = StreamingFormDataParser(headers=request_headers)
        parser.register('file', ValueTarget())

        # to_string() reads from the file, so it must run while it is open.
        parser.data_received(encoder.to_string())
项目:napalm-panos    作者:napalm-automation-community    | 项目源码 | 文件源码
def _import_file(self, filename):
        """Upload a local file to the device's API as a configuration import.

        The file is sent as multipart form data in a POST to
        ``https://<hostname>/api/`` with ``type=import`` and
        ``category=configuration`` as query parameters.

        :param filename: path of the local file to upload.
        :returns: the base name of ``filename`` on success, or ``False`` when
            the XML response carries ``status="error"``.
        :raises requests.HTTPError: when the HTTP response status is an error
            (via ``raise_for_status``).
        """
        # Use the stored API key when there is one; otherwise ask the device.
        if not self.api_key:
            key = self.device.keygen()
        else:
            key = self.api_key

        params = {
            'type': 'import',
            'category': 'configuration',
            'key': key
        }

        path = os.path.basename(filename)

        # The encoder reads from the file while the request body is being
        # sent, so the handle has to stay open until the POST has finished.
        # The context manager then closes it (it used to be leaked).
        with open(filename, 'rb') as upload:
            mef = requests_toolbelt.MultipartEncoder(
                fields={
                    'file': (path, upload, 'application/octet-stream')
                }
            )

            # NOTE(review): certificate verification is disabled for this
            # request and the matching urllib3 warning is silenced.
            requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
            url = 'https://{0}/api/'.format(self.hostname)
            request = requests.post(
                url,
                verify=False,
                params=params,
                headers={'Content-Type': mef.content_type},
                data=mef
            )

        # if something goes wrong just raise an exception
        request.raise_for_status()
        response = xml.etree.ElementTree.fromstring(request.content)

        if response.attrib['status'] == 'error':
            return False
        else:
            return path
项目:OctoFusion    作者:tapnair    | 项目源码 | 文件源码
def __repr__(self):
        """Return a debug string that embeds the repr of ``self.fields``."""
        template = '<MultipartEncoder: {0!r}>'
        return template.format(self.fields)
项目:OctoFusion    作者:tapnair    | 项目源码 | 文件源码
def __init__(self, encoder, callback=None):
        """Wrap *encoder* and set up the read-progress bookkeeping.

        :param encoder: the encoder being monitored; its ``len`` attribute
            is copied onto this object.
        :param callback: optional callable; when it is falsy the module-level
            ``IDENTITY`` is stored instead.
        """
        #: Instance of the :class:`MultipartEncoder` being monitored
        self.encoder = encoder

        #: Function to call after a read (IDENTITY when none was given)
        if callback:
            self.callback = callback
        else:
            self.callback = IDENTITY

        #: Number of bytes already read from the :class:`MultipartEncoder`
        #: instance
        self.bytes_read = 0

        #: Avoid the same problem in bug #80
        self.len = self.encoder.len
项目:OctoFusion    作者:tapnair    | 项目源码 | 文件源码
def from_fields(cls, fields, boundary=None, encoding='utf-8',
                    callback=None):
        """Build a ``MultipartEncoder`` from *fields* and wrap it in ``cls``.

        :param fields: passed through to ``MultipartEncoder``.
        :param boundary: passed through to ``MultipartEncoder``.
        :param encoding: passed through to ``MultipartEncoder``.
        :param callback: passed to ``cls`` together with the new encoder.
        :returns: the result of ``cls(encoder, callback)``.
        """
        return cls(MultipartEncoder(fields, boundary, encoding), callback)
项目:Instagram-API-python    作者:LevPasha    | 项目源码 | 文件源码
def uploadPhoto(self, photo, caption = None, upload_id = None, is_sidecar = None):
        """Upload a photo file and, on HTTP 200, configure and expose it.

        :param photo: path of the image file to upload.
        :param caption: forwarded to ``self.configure`` after the upload.
        :param upload_id: identifier for the upload; defaults to the current
            time in milliseconds, as a string.
        :param is_sidecar: when truthy, an ``is_sidecar`` field set to
            ``'1'`` is added to the form data.
        :returns: always ``False``, whatever the outcome of the upload.
        """
        if upload_id is None:
            upload_id = str(int(time.time() * 1000))

        # Keep the file open only while the multipart body is built;
        # to_string() reads it completely, so it can be closed before the
        # request is sent (the handle used to be leaked).
        with open(photo, 'rb') as photo_file:
            data = {
                'upload_id'         : upload_id,
                '_uuid'             : self.uuid,
                '_csrftoken'        : self.token,
                'image_compression' : '{"lib_name":"jt","lib_version":"1.3.0","quality":"87"}',
                'photo'             : ('pending_media_%s.jpg'%upload_id, photo_file, 'application/octet-stream', {'Content-Transfer-Encoding':'binary'})
            }
            if is_sidecar:
                data['is_sidecar']='1'
            m = MultipartEncoder(data, boundary=self.uuid)
            self.s.headers.update ({'X-IG-Capabilities' : '3Q4=',
                                    'X-IG-Connection-Type' : 'WIFI',
                                    'Cookie2' : '$Version=1',
                                    'Accept-Language' : 'en-US',
                                    'Accept-Encoding' : 'gzip, deflate',
                                    'Content-type': m.content_type,
                                    'Connection' : 'close',
                                    'User-Agent' : self.USER_AGENT})
            body = m.to_string()

        response = self.s.post(self.API_URL + "upload/photo/", data=body)
        if response.status_code == 200:
            if self.configure(upload_id, photo, caption):
                self.expose()
        # NOTE(review): False is returned even after a successful upload;
        # kept as-is because callers may depend on it -- confirm the intent.
        return False
项目:anaconda-project    作者:Anaconda-Platform    | 项目源码 | 文件源码
def __repr__(self):
        """Return a debug string that embeds the repr of ``self.fields``."""
        template = '<MultipartEncoder: {0!r}>'
        return template.format(self.fields)