我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用requests.post()。
def send_message_to_facebook(access_token, user_id, message_data): """ Makes use of Send API: https://developers.facebook.com/docs/messenger-platform/send-api-reference """ headers = { 'Content-Type': 'application/json', } params = { 'access_token': access_token, } payload = { 'recipient': { 'id': user_id, }, 'message': message_data, } url = 'https://graph.facebook.com/v2.6/me/messages' response = requests.post(url, headers=headers, params=params, data=json.dumps(payload)) response.raise_for_status() return response.json()
def api_call(self, url, http_method, data): headers = {"Content-Type" : "application/json", "x-api-key" : self.api_key, "authorization" : "Bearer " + self.access_token} r = None if http_method is PyCurlVerbs.POST: r = requests.post(url, data=json.dumps(data), headers=headers) elif http_method is PyCurlVerbs.DELETE: r = requests.delete(url, headers=headers) elif http_method is PyCurlVerbs.PUT: r = requests.put(url, data=json.dumps(data), headers=headers) elif http_method is PyCurlVerbs.GET: r = requests.get(url, headers=headers) return r
def primary_auth(self): """ Performs primary auth against Okta """ auth_data = { "username": self.username, "password": self.password } resp = requests.post(self.base_url+'/api/v1/authn', json=auth_data) resp_json = resp.json() if 'status' in resp_json: if resp_json['status'] == 'MFA_REQUIRED': factors_list = resp_json['_embedded']['factors'] state_token = resp_json['stateToken'] session_token = self.verify_mfa(factors_list, state_token) elif resp_json['status'] == 'SUCCESS': session_token = resp_json['sessionToken'] elif resp.status_code != 200: print(resp_json['errorSummary']) exit(1) else: print(resp_json) exit(1) return session_token
def upload_prediction(self, file_path): filename, signedRequest, headers, status_code = self.authorize(file_path) if status_code!=200: return status_code dataset_id, comp_id, status_code = self.get_current_competition() if status_code!=200: return status_code with open(file_path, 'rb') as fp: r = requests.Request('PUT', signedRequest, data=fp.read()) prepped = r.prepare() s = requests.Session() resp = s.send(prepped) if resp.status_code!=200: return resp.status_code r = requests.post(self._submissions_url, data={'competition_id':comp_id, 'dataset_id':dataset_id, 'filename':filename}, headers=headers) return r.status_code
def sendRequest(ip, port, route, data=None, protocol="http"): url = "{protocol}://{ip}:{port}{route}".format(protocol=protocol, ip=ip, port=port, route=route) if data is not None: try: resp = requests.post(url, data=data) except requests.HTTPError as e: raise PipelineServiceError("{reason}".format(reason=e)) else: try: resp = requests.get(url) except requests.HTTPError as e: raise PipelineServiceError("{reason}".format(reason=e)) return resp
def put_comments(self, resource, comment): """ Post a comment on a file or URL. The initial idea of VirusTotal Community was that users should be able to make comments on files and URLs, the comments may be malware analyses, false positive flags, disinfection instructions, etc. Imagine you have some automatic setup that can produce interesting results related to a given sample or URL that you submit to VirusTotal for antivirus characterization, you might want to give visibility to your setup by automatically reviewing samples and URLs with the output of your automation. :param resource: either a md5/sha1/sha256 hash of the file you want to review or the URL itself that you want to comment on. :param comment: the actual review, you can tag it using the "#" twitter-like syntax (e.g. #disinfection #zbot) and reference users using the "@" syntax (e.g. @VirusTotalTeam). :return: If the comment was successfully posted the response code will be 1, 0 otherwise. """ params = {'apikey': self.api_key, 'resource': resource, 'comment': comment} try: response = requests.post(self.base + 'comments/put', params=params, proxies=self.proxies) except requests.RequestException as e: return dict(error=e.message) return _return_response_and_status_code(response)
def getESCXBalance(address): try: payload = { "method": "get_balances", "params": { "filters":[{"field": "address", "op": "==", "value": address}, {"field": "asset", "op": "==", "value": "ESCX"}], "filterop": "and" }, "jsonrpc":"2.0", "id":0 } response = requests.post(url, data=json.dumps(payload), headers=headers, auth=auth) json_data = response.json() #quantity = json_data.quantity return (json_data['result'].pop()['quantity']) / 100000000 except: return 0;
def add_transaction(tx): """ POST transaction to etherscan.io. """ tx_hex = rlp.encode(tx).encode("hex") # use https://etherscan.io/pushTx to debug print("tx_hex:", tx_hex) url = 'https://api.etherscan.io/api' url += '?module=proxy&action=eth_sendRawTransaction' if ETHERSCAN_API_KEY: '&apikey=%' % ETHERSCAN_API_KEY # TODO: handle 504 timeout, 403 and other errors from etherscan response = requests.post(url, data={'hex': tx_hex}) # response is like: # {'jsonrpc': '2.0', 'result': '0x24a8...14ea', 'id': 1} # or on error like this: # {'jsonrpc': '2.0', 'id': 1, 'error': { # 'message': 'Insufficient funds...', 'code': -32010, 'data': None}} response_json = response.json() print("response_json:", response_json) PyWalib.handle_etherscan_tx_error(response_json) tx_hash = response_json['result'] # the response differs from the other responses return tx_hash
def register(name): # hit api to see if name is already registered if check_name(name)['status'] == 'error': print('{} already registered.'.format(name)) else: # generate new keypair (pub, priv) = rsa.newkeys(512) if os.path.exists(KEY_LOCATION) == False: os.mkdir(KEY_LOCATION) # save to disk with open('{}/.key'.format(KEY_LOCATION), 'wb') as f: pickle.dump((pub, priv), f, pickle.HIGHEST_PROTOCOL) r = requests.post('{}/names'.format(API_LOCATION), data = {'name' : name, 'n' : pub.n, 'e' : pub.e}) if r.json()['status'] == 'success': print('Successfully registered new name: {}'.format(name)) else: print('Error registering name: {}'.format(name))
def _make_request(self, path, cni_envs, expected_status=None): method = 'POST' address = config.CONF.cni_daemon.bind_address url = 'http://%s/%s' % (address, path) try: LOG.debug('Making request to CNI Daemon. %(method)s %(path)s\n' '%(body)s', {'method': method, 'path': url, 'body': cni_envs}) resp = requests.post(url, json=cni_envs, headers={'Connection': 'close'}) except requests.ConnectionError: LOG.exception('Looks like %s cannot be reached. Is kuryr-daemon ' 'running?', address) raise LOG.debug('CNI Daemon returned "%(status)d %(reason)s".', {'status': resp.status_code, 'reason': resp.reason}) if expected_status and resp.status_code != expected_status: LOG.error('CNI daemon returned error "%(status)d %(reason)s".', {'status': resp.status_code, 'reason': resp.reason}) raise k_exc.CNIError('Got invalid status code from CNI daemon.') return resp
def scan_file(self, this_file): """ Submit a file to be scanned by VirusTotal :param this_file: File to be scanned (32MB file size limit) :return: JSON response that contains scan_id and permalink. """ params = {'apikey': self.api_key} try: if type(this_file) == str and os.path.isfile(this_file): files = {'file': (this_file, open(this_file, 'rb'))} elif isinstance(this_file, StringIO.StringIO): files = {'file': this_file.read()} else: files = {'file': this_file} except TypeError as e: return dict(error=e.message) try: response = requests.post(self.base + 'file/scan', files=files, params=params, proxies=self.proxies) except requests.RequestException as e: return dict(error=e.message) return _return_response_and_status_code(response)
def rescan_file(self, this_hash): """ Rescan a previously submitted filed or schedule an scan to be performed in the future. :param this_hash: a md5/sha1/sha256 hash. You can also specify a CSV list made up of a combination of any of the three allowed hashes (up to 25 items), this allows you to perform a batch request with one single call. Note that the file must already be present in our file store. :return: JSON response that contains scan_id and permalink. """ params = {'apikey': self.api_key, 'resource': this_hash} try: response = requests.post(self.base + 'file/rescan', params=params, proxies=self.proxies) except requests.RequestException as e: return dict(error=e.message) return _return_response_and_status_code(response)
def scan_url(self, this_url): """ Submit a URL to be scanned by VirusTotal. :param this_url: The URL that should be scanned. This parameter accepts a list of URLs (up to 4 with the standard request rate) so as to perform a batch scanning request with one single call. The URLs must be separated by a new line character. :return: JSON response that contains scan_id and permalink. """ params = {'apikey': self.api_key, 'url': this_url} try: response = requests.post(self.base + 'url/scan', params=params, proxies=self.proxies) except requests.RequestException as e: return dict(error=e.message) return _return_response_and_status_code(response)
def scan_file(self, this_file, notify_url=None, notify_changes_only=None): """ Submit a file to be scanned by VirusTotal. Allows you to send a file for scanning with VirusTotal. Before performing your submissions we encourage you to retrieve the latest report on the files, if it is recent enough you might want to save time and bandwidth by making use of it. File size limit is 32MB, in order to submmit files up to 200MB in size you must request a special upload URL. :param this_file: The file to be uploaded. :param notify_url: A URL to which a POST notification should be sent when the scan finishes. :param notify_changes_only: Used in conjunction with notify_url. Indicates if POST notifications should be sent only if the scan results differ from the previous analysis. :return: JSON response that contains scan_id and permalink. """ params = {'apikey': self.api_key} files = {'file': (this_file, open(this_file, 'rb'))} try: response = requests.post(self.base + 'file/scan', files=files, params=params, proxies=self.proxies) except requests.RequestException as e: return dict(error=e.message) return _return_response_and_status_code(response)
def cancel_rescan_file(self, resource): """ Delete a previously scheduled scan. Deletes a scheduled file rescan task. The file rescan api allows you to schedule periodic scans of a file, this API call tells VirusTotal to stop rescanning a file that you have previously enqueued for recurrent scanning. :param resource: The md5/sha1/sha256 hash of the file whose dynamic behavioural report you want to retrieve. :return: JSON acknowledgement. In the event that the scheduled scan deletion fails for whatever reason, the response code will be -1. """ params = {'apikey': self.api_key, 'resource': resource} try: response = requests.post(self.base + 'rescan/delete', params=params, proxies=self.proxies) except requests.RequestException as e: return dict(error=e.message) return _return_response_and_status_code(response)
def put_comments(self, resource, comment): """ Post a comment on a file or URL. Allows you to place comments on URLs and files, these comments will be publicly visible in VirusTotal Community, under the corresponding tab in the reports for each particular item. Comments can range from URLs and locations where a given file was found in the wild to full reverse engineering reports on a given malware specimen, anything that may help other analysts in extending their knowledge about a particular file or URL. :param resource: Either an md5/sha1/sha256 hash of the file you want to review or the URL itself that you want to comment on. :param comment: The actual review, you can tag it using the "#" twitter-like syntax (e.g. #disinfection #zbot) and reference users using the "@" syntax (e.g. @VirusTotalTeam). :return: JSON response """ params = {'apikey': self.api_key, 'resource': resource, 'comment': comment} try: response = requests.post(self.base + 'comments/put', params=params, proxies=self.proxies) except requests.RequestException as e: return dict(error=e.message) return _return_response_and_status_code(response)
def get_auth_code(self): """ Get access token for connect to youtube api """ oauth_url = 'https://accounts.google.com/o/oauth2/token' data = dict( refresh_token=self.refresh_token, client_id=self.client_id, client_secret=self.client_secret, grant_type='refresh_token', ) headers = { 'Content-Type': 'application/x-www-form-urlencoded', 'Accept': 'application/json' } response = requests.post(oauth_url, data=data, headers=headers) response = response.json() return response.get('access_token')
def create_cd(col_name, type, display_name): url = 'https://api.kentik.com/api/v5/customdimension' json_template = ''' { "name": "{{ column }}", "type": "{{ data_type }}", "display_name": "{{ pretty_name }}" } ''' t = Template(json_template) data = json.loads(t.render(column = col_name, data_type = type, pretty_name = display_name)) response = requests.post(url, headers=headers, data=data) if response.status_code != 201: print("Unable to create custom dimension column. Exiting.") print("Status code: {}").format(response.status_code) print("Error message: {}").format(response.json()['error']) exit() else: print("Custom dimension \"{}\" created as id: {}").format(display_name, \ response.json()['customDimension']['id']) return(response.json()['customDimension']['id'])
def post_to_all(self, url = '/', data={}): if (url == '/'): res = [] for masterip in masterips: try: requests.post("http://"+getip(masterip)+":"+master_port+"/isalive/",data=data) except Exception as e: logger.debug(e) continue res.append(masterip) return res data = dict(data) data['token'] = session['token'] logger.info("Docklet Request: user = %s data = %s, url = %s"%(session['username'], data, url)) result = {} for masterip in masterips: try: res = requests.post("http://"+getip(masterip)+":"+master_port+url,data=data).json() except Exception as e: logger.debug(e) continue result[masterip] = res logger.debug("get result from " + getip(masterip)) return result
def get_token(): headers = { 'Host': 'authorization.shanghaidisneyresort.com', 'Content-Type': 'application/x-www-form-urlencoded; charset=utf-8', 'X-NewRelic-ID': 'Uw4BWVZSGwICU1VRAgkH', 'X-Conversation-Id': 'shdrA5320488-E03F-4795-A818-286C658EEBB6', 'Accept': 'application/json', 'User-Agent': 'SHDR/4.1.1 (iPhone; iOS 9.3.5; Scale/2.00)', 'Accept-Language': 'zh-Hans-CN;q=1', } data = 'assertion_type=public&client_id=DPRD-SHDR.MOBILE.IOS-PROD&client_secret=&grant_type=assertion' resp = requests.post('https://authorization.shanghaidisneyresort.com/curoauth/v1/token', headers=headers, data=data) resp.raise_for_status() token = resp.json()['access_token'] return token
def login(self): """Log in to the MyQ service.""" params = { 'username': self.username, 'password': self.password } login = requests.post( 'https://{host_uri}/{login_endpoint}'.format( host_uri=HOST_URI, login_endpoint=self.LOGIN_ENDPOINT), json=params, headers={ 'MyQApplicationId': self.brand[APP_ID], 'User-Agent': self.USERAGENT } ) auth = login.json() self.security_token = auth['SecurityToken'] self._logger.debug('Logged in to MyQ API') return True
def test_last_request(self, mock): response = requests.post(GetMock.url, headers={'custom-header': 'huseyin'}, data={'name': 'huseyin'}) self.assertEqual(response.status_code, 200) last_request = mock.last_request # Test if last request has expected values. self.assertEqual(last_request.url, GetMock.url) body = last_request.body # In python 3 httpretty backend returns binary string for body. # So we are decoding it back to unicode to test. if isinstance(body, six.binary_type): body = body.decode('utf-8') self.assertEqual(body, 'name=huseyin') self.assertEqual(last_request.headers.get('custom-header'), 'huseyin') # Make sure that class's last_request is same as instances. self.assertIs(GetMock.last_request, mock.last_request)
def test_fetch_from_url_or_retry_post_json(self): # mocked requests identifier = "1csb, 2pah" base_url = c.http_pdbe endpoint_url = "api/pdb/entry/summary/" response = response_mocker(kwargs={}, base_url=base_url, endpoint_url=endpoint_url, content_type='application/octet-stream', post=True, data=identifier) self.fetch_from_url_or_retry = MagicMock(return_value=response) url = base_url + endpoint_url + identifier r = self.fetch_from_url_or_retry(url, json=True, post=True, data=identifier, header={'application/octet-stream'}, retry_in=None, wait=0, n_retries=10, stream=False).json() self.assertEqual(r, json.loads('{"data": "some json formatted output"}'))
def call_api(self, method, endpoint, payload): url = urlparse.urljoin(self.api_base_url, endpoint) if method == 'POST': response = requests.post(url, data=payload) elif method == 'delete': response = requests.delete(url) elif method == 'put': response = requests.put(url, data=payload) else: if self.api_key: payload.update({'api_token': self.api_key}) response = requests.get(url, params=payload) content = json.loads(response.content) return content
def verify_single_factor(self, factor_id, state_token): """ Verifies a single MFA factor """ factor_answer = raw_input('Enter MFA token: ') req_data = { "stateToken": state_token, "answer": factor_answer } post_url = "%s/api/v1/authn/factors/%s/verify" % (self.base_url, factor_id) resp = requests.post(post_url, json=req_data) resp_json = resp.json() if 'status' in resp_json: if resp_json['status'] == "SUCCESS": return resp_json['sessionToken'] elif resp.status_code != 200: print(resp_json['errorSummary']) exit(1) else: print(resp_json) exit(1)
def read_and_report(force_alert=False): h, t = dht.read_retry(dht.DHT22, TEMP_SENSOR_PIN) data = { "reading": { "humidity": h, "temperature": t, "sensor_id": mac } } if force_alert: data['reading']['force_alert'] = True print("Sending reading: {}".format(data)) try: response = requests.post( "http://{DOMAIN}/readings".format(DOMAIN=SERVER_DOMAIN), data=json.dumps(data), headers=headers) print("Reading Sent") except requests.exceptions.RequestException as e: print("Error sending reading: {}".format(e))
def api_request(self, call, params, kind='auth', http_call='get'): """ General API request. Generally, use the convenience functions below :param kind: the type of request to make. 'auth' makes an authenticated call; 'basic' is unauthenticated :param call: the API call to make :param params: a dict of query parameters :return: a json response, a BitXAPIError is thrown if the api returns with an error """ url = self.construct_url(call) auth = self.auth if kind == 'auth' else None if http_call == 'get': response = requests.get(url, params, headers=self.headers, auth=auth) elif http_call == 'post': response = requests.post(url, data = params, headers=self.headers, auth=auth) else: raise ValueError('Invalid http_call parameter') try: result = response.json() except ValueError: result = {'error': 'No JSON content returned'} if response.status_code != 200 or 'error' in result: raise BitXAPIError(response) else: return result
def create_limit_order(self, order_type, volume, price): """ Create a new limit order :param order_type: 'buy' or 'sell' :param volume: the volume, in BTC :param price: the ZAR price per bitcoin :return: the order id """ data = { 'pair': self.pair, 'type': 'BID' if order_type == 'buy' else 'ASK', 'volume': str(volume), 'price': str(price) } result = self.api_request('postorder', params=data, http_call='post') return result
def validate_captcha(captcha_post): """ Validates the Google re-captcha data. :param str captcha_post: The re-captcha post data :return bool: """ validated = False data = { 'secret': settings.GOOGLE_RECAPTCHA_SECRET_KEY, 'response': captcha_post } response = requests.post('https://www.google.com/recaptcha/api/siteverify', data=data, verify=False).json() if response['success']: validated = True return validated
def send_message_android(destination, message): headers = { 'Authorization': 'key=' + settings.FIREBASE_SERVER_KEY, 'Content - Type': 'application/json' } payload = { "to": destination, "data": { "title": config.TITLE_PUSH_NOTIFICATION, "detail": message } } request = requests.post( settings.FIREBASE_API_URL, json=payload, headers=headers ) print(request.text)
def send_message_ios(destination, message): headers = { 'Authorization': 'key=' + settings.FIREBASE_SERVER_KEY, 'Content - Type': 'application/json' } payload = { "to": destination, "priority": "high", "badge": 0, "notification": { "title": config.TITLE_PUSH_NOTIFICATION, "text": message, "sound": "default", } } request = requests.post( settings.FIREBASE_API_URL, json=payload, headers=headers ) print(request.text)
def obtain_access_token(config, password): info("Will now attempt to obtain an JWT...") auth_request_data = {'client_id': 'jumpauth', 'username': config.username, 'password': password, 'grant_type': 'password'} auth_url = get_auth_url(config) auth_response = requests.post(auth_url, auth_request_data) if auth_response.status_code not in [200, 201]: log.info("Authentication failed: {0}". format(auth_response.json().get("error"))) auth_response.raise_for_status() else: log.info("Authentication OK!") # TODO bail out if there was no access token in the answer access_token = auth_response.json()['access_token'] info("Access token was received.") verbose("Access token is:\n'{0}'".format(access_token)) return access_token
def authenticate(username, password): global Cookies, Host try: postData = "password=" + getPasswordEnc(password) + "&clientType=android&username=" + \ username + "&key=" + getSessionEnc(AccessToken) + "&code=8&clientip=" + LocalIpAddress header['Content-Length'] = '219' url = 'http://' + Host + '/wf.do' resp = requests.post(url=url, headers=header, cookies=Cookies, data=postData) resp.encoding = "GBK" print resp.request.headers text = resp.text print text if 'auth00' in text > 0: print "????." return True else: print "????[" + getErrorMsg(resp) + "]" return False except Exception as e: print e print "(6/10) ????????" return False
def __call__(self): headers = { "Content-Type": "application/x-www-form-urlencoded", "Cache-Control": "no-cache", } body = urlparse.urlencode({ "client_id": self.api_key, "client_secret": self.client_secret, "jwt_token": self.jwt_token }) r = requests.post(self.endpoint, headers=headers, data=body) if r.status_code != 200: raise RuntimeError("Unable to authorize against {}:\n" "Response Code: {:d}, Response Text: {}\n" "Response Headers: {}]".format(self.endpoint, r.status_code, r.text, r.headers)) self.set_expiry(r.json()['expires_in']) return r.json()['access_token']
def nightly_build(ctx, project_name, branch_name, circle_token): url = 'https://circleci.com/api/v1/project/{}/tree/{}?circle-token={}'.format( project_name, branch_name, circle_token ) headers = { 'Accept': 'application/json', 'Content-Type': 'application/json', } body = { 'build_parameters': { 'RUN_NIGHTLY_BUILD': "true", } } response = requests.post(url, json=body, headers=headers) print("Nightly Build queued on CircleCI: http_status={}, response={}".format( response.status_code, response.json() ))
def start_job(self, job_id): """ Request to start a job :param job_id: the id of the job to start :return: the response obj: { result: string 'success' or 'already started' } """ # endpoint /v1/jobs/{job_id}/event endpoint = '{0}{1}/event'.format(self.endpoint, job_id) doc = {"start": None} r = requests.post(endpoint, headers=self.headers, data=json.dumps(doc), verify=self.verify) if r.status_code != 202: raise exceptions.ApiClientException(r) return r.json()
def stop_job(self, job_id): """ Request to stop a job :param job_id: the id of the job to start :return: the response obj: { result: string 'success' or 'already stopped' } """ # endpoint /v1/jobs/{job_id}/event endpoint = '{0}{1}/event'.format(self.endpoint, job_id) doc = {"stop": None} r = requests.post(endpoint, headers=self.headers, data=json.dumps(doc), verify=self.verify) if r.status_code != 202: raise exceptions.ApiClientException(r) return r.json()
def abort_job(self, job_id): """ Request to abort a job :param job_id: the id of the job to start :return: the response obj: { result: string 'success' or 'already stopped' } """ # endpoint /v1/jobs/{job_id}/event endpoint = '{0}{1}/event'.format(self.endpoint, job_id) doc = {"abort": None} r = requests.post(endpoint, headers=self.headers, data=json.dumps(doc), verify=self.verify) if r.status_code != 202: raise exceptions.ApiClientException(r) return r.json()
def end_session(self, session_id, job_id, session_tag, result): """ Informs the freezer service that the job has ended. Provides information about the job's result and the session tag :param session_id: :param job_id: :param session_tag: :param result: :return: """ # endpoint /v1/sessions/{sessions_id}/action endpoint = '{0}{1}/action'.format(self.endpoint, session_id) doc = {"end": { "job_id": job_id, "current_tag": session_tag, "result": result }} r = requests.post(endpoint, headers=self.headers, data=json.dumps(doc), verify=self.verify) if r.status_code != 202: raise exceptions.ApiClientException(r) return r.json()
def send(notif_type, **params): ''' Send slack notifications. ''' url = config()['base_url'] + config()['endpoint'] (text, color) = notification.get( notif_type, config=get_config(), notif_config=config(), create_link=create_link, **params ) payload = { 'attachments': [ { 'color': color, 'text': text } ] } requests.post(url, json=payload)
def send(notif_type, **params): ''' Send hipchat notifications. ''' url = API_BASE_URL.format( company_name=config()['company_name'], room_id=config()['room_id'], auth_token=config()['auth_token'] ) (text, color) = notification.get( notif_type, config=get_config(), notif_config=config(), create_link=create_link, **params ) payload = { 'color': color, 'message': text, 'notify': config()['notify'], 'message_format': 'html' } requests.post(url, json=payload)
def create_logset(logset_name=None, params=None): """ Add a new logset to the current account. If a filename is given, load the contents of the file as json parameters for the request. If a name is given, create a new logset with the given name """ if params is not None: request_params = params else: request_params = { 'logset': { 'name': logset_name } } headers = api_utils.generate_headers('rw') try: response = requests.post(_url()[1], json=request_params, headers=headers) handle_response(response, 'Creating logset failed.\n', 201) except requests.exceptions.RequestException as error: sys.stderr.write(error) sys.exit(1)
def create(payload): """ Create an api key with the provided ID """ action, url = _url() headers = api_utils.generate_headers('owner', method='POST', body=json.dumps(payload), action=action) try: response = requests.post(url, headers=headers, json=payload) if response_utils.response_error(response): sys.stderr.write('Create api key failed.') sys.exit(1) elif response.status_code == 201: handle_api_key_response(response) except requests.exceptions.RequestException as error: sys.stderr.write(error) sys.exit(1)
def execute(self, document, variable_values=None, timeout=None): query_str = print_ast(document) payload = { 'query': query_str, 'variables': variable_values or {} } data_key = 'json' if self.use_json else 'data' post_args = { 'headers': self.headers, 'auth': self.auth, 'cookies': self.cookies, 'timeout': timeout or self.default_timeout, data_key: payload } request = requests.post(self.url, **post_args) request.raise_for_status() result = request.json() assert 'errors' in result or 'data' in result, 'Received non-compatible response "{}"'.format(result) return ExecutionResult( errors=result.get('errors'), data=result.get('data') )
def refresh_access_token(self, refresh_token): """ Returns the access token for Spotify Web API using a refresh token. Args: refresh_token (string): The token has been returned by the access token request. Returns: SpotifyCredentials: The parsed response from Spotify. """ # Get new auth token payload = { 'grant_type': 'refresh_token', 'refresh_token': refresh_token } headers = self._make_authorization_headers() return requests.post( 'https://accounts.spotify.com/api/token', data=payload, headers=headers ).json()
def request_access_token(self, token): """ Request access token from Last.fm. Args: token (string): Token from redirect. Return: dict: Response from get session call. """ payload = { 'api_key': self.__key, 'method': 'auth.getSession', 'token': token } payload['api_sig'] = self.sign(payload) payload['format'] = 'json' response = requests.post( 'https://ws.audioscrobbler.com/2.0/', params=payload).json() return LastfmCredentials(response['session']['key'])