我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 requests.put()。
def api_call(self, url, http_method, data):
    """Send an authenticated JSON request using the given HTTP verb.

    ``data`` is serialized with ``json.dumps`` for POST and PUT and is not
    sent for DELETE and GET.  Returns the ``requests`` response, or ``None``
    when ``http_method`` is none of the four ``PyCurlVerbs`` members handled
    here.
    """
    auth_headers = {
        "Content-Type": "application/json",
        "x-api-key": self.api_key,
        "authorization": "Bearer " + self.access_token,
    }
    if http_method is PyCurlVerbs.POST:
        return requests.post(url, data=json.dumps(data), headers=auth_headers)
    if http_method is PyCurlVerbs.DELETE:
        return requests.delete(url, headers=auth_headers)
    if http_method is PyCurlVerbs.PUT:
        return requests.put(url, data=json.dumps(data), headers=auth_headers)
    if http_method is PyCurlVerbs.GET:
        return requests.get(url, headers=auth_headers)
    return None
def send_response(event, context, responseStatus, responseData):
    """PUT a custom-resource result to the URL given in ``event['ResponseURL']``.

    The body carries the status, the caller's data and the identifiers
    copied from ``event``; the log stream name doubles as the physical
    resource id.

    Raises:
        Exception: when the endpoint answers with a status other than 200.
        requests.exceptions.RequestException: re-raised after being printed.
    """
    responseBody = {
        'Status': responseStatus,
        'Reason': 'See the details in CloudWatch Log Stream: ' + context.log_stream_name,
        'PhysicalResourceId': context.log_stream_name,
        'StackId': event['StackId'],
        'RequestId': event['RequestId'],
        'LogicalResourceId': event['LogicalResourceId'],
        'Data': responseData,
    }
    req = None
    try:
        req = requests.put(event['ResponseURL'], data=json.dumps(responseBody))
        if req.status_code != 200:
            print(req.text)
            raise Exception('Recieved non 200 response while sending response to CFN.')
        return
    except requests.exceptions.RequestException as e:
        # Identity test: ``req`` stays None when the PUT itself failed.
        if req is not None:
            print(req.text)
        print(e)
        raise
def set_state(self, device_id, state):
    """Set device state.

    Sends the desired door state for ``device_id`` as form data and returns
    True when the service answers with HTTP 200.
    """
    endpoint = 'https://{host_uri}/{device_set_endpoint}'.format(
        host_uri=HOST_URI,
        device_set_endpoint=self.DEVICE_SET_ENDPOINT)
    form = {
        'attributeName': 'desireddoorstate',
        'myQDeviceId': device_id,
        'AttributeValue': state,
    }
    request_headers = {
        'MyQApplicationId': self.brand[APP_ID],
        'SecurityToken': self.security_token,
        'User-Agent': self.USERAGENT
    }
    reply = requests.put(endpoint, data=form, headers=request_headers)
    return reply.status_code == 200
def call_api(self, method, endpoint, payload):
    """Call ``endpoint`` (joined onto ``self.api_base_url``) and decode the JSON reply.

    Args:
        method: HTTP verb name, matched case-insensitively. 'post', 'delete'
            and 'put' are dispatched explicitly; anything else is a GET.
        endpoint: path joined onto the base URL.
        payload: form data for POST/PUT, query parameters for GET. May be
            None.

    Returns:
        The decoded JSON content of the response.
    """
    url = urlparse.urljoin(self.api_base_url, endpoint)
    # Normalize the verb: the original compared against 'POST' but
    # 'delete'/'put', so a lowercase 'post' silently became a GET.
    verb = method.lower()
    if verb == 'post':
        response = requests.post(url, data=payload)
    elif verb == 'delete':
        response = requests.delete(url)
    elif verb == 'put':
        response = requests.put(url, data=payload)
    else:
        params = payload
        if self.api_key:
            # Work on a copy so the caller's dict is not mutated, and so a
            # missing payload (None) does not raise AttributeError.
            params = dict(payload or {})
            params['api_token'] = self.api_key
        response = requests.get(url, params=params)
    return json.loads(response.content)
def replace_log(log_id, params):
    """
    Replace the given log with the details provided

    Sends ``params`` as JSON in a PUT request. On an error response, or when
    the request itself fails, a message is written to stderr and the process
    exits with status 1. On HTTP 200 the updated log is pretty-printed.
    """
    headers = api_utils.generate_headers('rw')

    try:
        response = requests.put(_url(('logs', log_id))[1], json=params, headers=headers)
        if response_utils.response_error(response):
            sys.stderr.write('Update log failed.\n')
            sys.exit(1)
        elif response.status_code == 200:
            sys.stdout.write('Log: %s updated to:\n' % log_id)
            api_utils.pretty_print_string_as_json(response.text)
    except requests.exceptions.RequestException as error:
        # write() only accepts text; passing the exception object itself
        # raised TypeError and hid the real failure.
        sys.stderr.write('%s\n' % error)
        sys.exit(1)
def cfn_response(response_status, response_data, reason, physical_resource_id, event, context):
    """PUT a custom-resource result to the pre-signed URL in ``event['ResponseURL']``.

    Args:
        response_status: value sent as 'Status'.
        response_data: value sent as 'Data'.
        reason: text sent as 'Reason'; when falsy, a pointer to the
            CloudWatch log stream is sent instead.
        physical_resource_id: value sent as 'PhysicalResourceId'.
        event: request event providing StackId, RequestId,
            LogicalResourceId and ResponseURL.
        context: object providing ``log_stream_name``.

    Raises:
        Exception: when the endpoint answers with a status other than 200.
    """
    # Put together the response to be sent to the S3 pre-signed URL
    if not reason:
        reason = 'See the details in CloudWatch Log Stream: ' + context.log_stream_name

    responseBody = {
        'Status': response_status,
        # The original hard-coded the fallback text here, discarding the
        # caller-supplied reason.
        'Reason': reason,
        'PhysicalResourceId': physical_resource_id,
        'StackId': event['StackId'],
        'RequestId': event['RequestId'],
        'LogicalResourceId': event['LogicalResourceId'],
        'Data': response_data,
    }
    print('Response Body:', responseBody)
    response = requests.put(event['ResponseURL'], data=json.dumps(responseBody))
    if response.status_code != 200:
        print(response.text)
        raise Exception('Response error received.')
    return
def __requests_put(self, uri, data, headers):
    """PUT ``data`` to ``self.mesos_host + uri`` and return the decoded JSON reply.

    Basic-auth credentials are attached only when ``self.mesos_user`` is
    set. Certificate verification is disabled for the request.
    """
    target = self.mesos_host + uri
    if not self.mesos_user:
        return requests.put(target, data, headers=headers, verify=False).json()
    credentials = (self.mesos_user, self.mesos_pass)
    reply = requests.put(target, data, headers=headers, verify=False, auth=credentials)
    return reply.json()
def __requests_put(self, uri, data, headers):
    """PUT ``data`` to ``self.marathon_host + uri`` and return the raw response.

    Basic-auth credentials are attached only when ``self.marathon_user`` is
    set. Certificate verification is disabled for the request.
    """
    target = self.marathon_host + uri
    if not self.marathon_user:
        return requests.put(target, data, headers=headers, verify=False)
    credentials = (self.marathon_user, self.marathon_pass)
    return requests.put(target, data, headers=headers, verify=False, auth=credentials)
def sendResponse(event, context, responseStatus, responseData):
    """PUT a custom-resource result to the URL given in ``event['ResponseURL']``.

    The serialized body is printed before it is sent.

    Raises:
        Exception: when the endpoint answers with a status other than 200.
        requests.exceptions.RequestException: re-raised after being printed.
    """
    responseBody = {
        'Status': responseStatus,
        'Reason': 'See the details in CloudWatch Log Stream: ' + context.log_stream_name,
        'PhysicalResourceId': context.log_stream_name,
        'StackId': event['StackId'],
        'RequestId': event['RequestId'],
        'LogicalResourceId': event['LogicalResourceId'],
        'Data': responseData,
    }
    # Single-argument print(...) behaves the same on Python 2 and 3. The
    # label's line break had lost its backslash ('RESPONSE BODY:n').
    print('RESPONSE BODY:\n' + json.dumps(responseBody))
    try:
        req = requests.put(event['ResponseURL'], data=json.dumps(responseBody))
        if req.status_code != 200:
            print(req.text)
            raise Exception('Recieved non 200 response while sending response to CFN.')
        return
    except requests.exceptions.RequestException as e:
        print(e)
        raise
def create_cloud(self, cloud, data=None):
    """Create a cloud named ``cloud`` and optionally attach variables to it.

    The created record is stored on ``self.cloud``. When ``data`` is truthy
    it is PUT to the new cloud's variables endpoint; a non-200 reply to
    that PUT is only printed.

    Raises:
        Exception: when the create request does not answer with 201.
    """
    payload = {"name": cloud}
    print("Creating cloud entry for %s with data %s" % (payload, data))

    created = requests.post(self.url + "/clouds", headers=self.headers,
                            data=json.dumps(payload), verify=False)
    if created.status_code != 201:
        raise Exception(created.text)
    self.cloud = created.json()

    if not data:
        return
    variables_url = self.url + "/clouds/%s/variables" % self.cloud["id"]
    updated = requests.put(variables_url, headers=self.headers,
                           data=json.dumps(data), verify=False)
    if updated.status_code != 200:
        print(updated.text)
def create_region(self, region, data=None):
    """Create a region under ``self.cloud`` and optionally attach variables.

    The created record is stored on ``self.region``. When ``data`` is truthy
    it is PUT to the new region's variables endpoint; a non-200 reply to
    that PUT is only printed.

    Raises:
        Exception: when the create request does not answer with 201.
    """
    payload = {"name": region, "cloud_id": self.cloud.get("id")}
    print("Creating region entry for %s with data %s" % (payload, data))

    creation = requests.post(self.url + "/regions", headers=self.headers,
                             data=json.dumps(payload), verify=False)
    if creation.status_code != 201:
        raise Exception(creation.text)
    self.region = creation.json()

    if data:
        variables_url = self.url + "/regions/%s/variables" % self.region["id"]
        outcome = requests.put(variables_url, headers=self.headers,
                               data=json.dumps(data), verify=False)
        if outcome.status_code != 200:
            print(outcome.text)
def create_cell(self, cell, data=None):
    """Create a cell under ``self.region``/``self.cloud`` and optionally attach variables.

    The created record is stored on ``self.cell``. When ``data`` is truthy
    it is PUT to the new cell's variables endpoint; a non-200 reply to that
    PUT is only printed.

    Raises:
        Exception: when the create request does not answer with 201.
    """
    cells_url = self.url + "/cells"
    payload = {
        "region_id": self.region.get("id"),
        "cloud_id": self.cloud.get("id"),
        "name": cell,
    }
    print("Creating cell entry %s with data %s" % (payload, data))

    created = requests.post(cells_url, headers=self.headers,
                            data=json.dumps(payload), verify=False)
    if created.status_code != 201:
        raise Exception(created.text)
    self.cell = created.json()

    if not data:
        return
    cell_id = created.json()["id"]
    variables_url = self.url + "/cells/%s/variables" % cell_id
    stored = requests.put(variables_url, headers=self.headers,
                          data=json.dumps(data), verify=False)
    if stored.status_code != 200:
        print(stored.text)
def upload_es_json(es_json, es_index, es_base, id, doc_type='article'):
    """Upload a document to the ElasticSearch index with a PUT request.

    The target URL is built from ``es_base``, the lower-cased ``es_index``,
    ``doc_type`` and ``id``. A response status of 400 or above is handed to
    ``recoverable_error`` together with ``config.bypass_errors``.
    """
    # ES index names must be lowercase
    index_name = es_index.lower()
    target = "{es_base}/{index}/{doc_type}/{id}".format(
        es_base=es_base,
        index=index_name,
        doc_type=doc_type,
        id=id,
    )
    logger.info("Uploading to ES: PUT %s" % target)
    reply = requests.put(target, headers={"Content-Type": "application/json"}, data=es_json)
    if reply.status_code < 400:
        return
    recoverable_error("ES upload failed with error: '%s'" % reply.text, config.bypass_errors)
def predict(path):
    """Score the Firebase node that is the parent of ``path`` and write the result back.

    The parent node is fetched as JSON, and every value whose key contains
    "x_" becomes one feature of a single-row sample. When the feature count
    equals the module-level ``n``, the module-level ``model`` is applied,
    'y_predicted' and 'action' are added to the node, and the node is PUT
    back. Otherwise only a diagnostic is printed.
    """
    # ``n`` and ``model`` are only read here, so no ``global`` is needed.
    parentNode = os.path.split(path)[0]
    node_url = FIREBASE_URL + parentNode + FIREBASE_ENDING_URL
    j = requests.get(node_url).json()
    # NOTE(review): feature order follows the key order of the returned
    # JSON object - confirm that matches the order the model was trained on.
    sample = np.asarray([value for key, value in j.items() if "x_" in key]).reshape(1, -1)
    if sample.shape[1] != n:
        # Single-argument print(...) works on Python 2 and 3; the doubled
        # spaces reproduce the separators of the old print statement.
        print("Couldn't make prediction, wrong dimension:  %s  vs  %s" % (sample.shape[1], n))
        return
    probabilities = model.predict_proba(sample)
    j['y_predicted'] = probabilities[0][1]
    j['action'] = model.predict(sample)[0]
    requests.put(node_url, data=json.dumps(j))
    print("Features:  %s" % (sample,))
    print("Prediction:  %s" % (probabilities,))
# 2nd: SSEClient that prints stuff
def upload_model_to_nexus(project):
    """
    Use 'pyb -P model_name="<model_name>" upload_model_to_nexus' to upload archived model to Nexus repository
    of the lab. If model_name == 'deeppavlov_docs', then documentation from build/docs will be archived
    and uploaded.
    archive_model task will be executed before.

    Changes the working directory to 'build' and uploads
    '<model_name>_<yymmdd>.tar.gz' from there.

    Raises:
        requests.exceptions.HTTPError: when the repository rejects the upload.
    """
    import datetime
    import requests

    os.chdir('build')
    model_name = project.get_property('model_name')
    file_name = model_name + '_' + datetime.date.today().strftime("%y%m%d") + '.tar.gz'
    url = 'http://share.ipavlov.mipt.ru:8080/repository/'
    url += 'docs/' if model_name == 'deeppavlov_docs' else 'models/'
    headers = {'Content-Type': 'application/binary'}
    with open(file_name, 'rb') as artifact:
        # NOTE(review): credentials are hard-coded and sent over plain HTTP;
        # they should come from configuration or a secret store.
        response = requests.put(url + model_name + '/' + file_name, headers=headers,
                                data=artifact, auth=('jenkins', 'jenkins123'))
    # The response used to be discarded, so a rejected upload looked like a
    # successful build step.
    response.raise_for_status()
def renameorg(apikey, orgid, neworgname, suppressprint=False):
    """Rename organization ``orgid`` to ``neworgname`` with a PUT request.

    ``__hasorgaccess`` is called first. The response status and text are
    passed to ``__returnhandler`` and its result is returned.
    """
    __hasorgaccess(apikey, orgid)
    calltype = 'Organization Rename'
    endpoint = '{0}/organizations/{1}'.format(str(base_url), str(orgid))
    request_headers = {
        'x-cisco-meraki-api-key': str(apikey),
        'Content-Type': 'application/json'
    }
    body = json.dumps({'name': str(neworgname)})
    reply = requests.put(endpoint, data=body, headers=request_headers)
    # Let the shared return handler interpret the Dashboard response.
    return __returnhandler(reply.status_code, reply.text, calltype, suppressprint)
# Create a new organization
# https://dashboard.meraki.com/api_docs#create-a-new-organization
def updatecontact(apikey, networkid, contactid, name, suppressprint=False):
    """Set the name of phone contact ``contactid`` in network ``networkid``.

    The response status and text are passed to ``__returnhandler`` and its
    result is returned.
    """
    calltype = 'Phone Contact'
    endpoint = '{0}/networks/{1}/phoneContacts/{2}'.format(
        str(base_url), str(networkid), str(contactid))
    request_headers = {
        'x-cisco-meraki-api-key': str(apikey),
        'Content-Type': 'application/json'
    }
    reply = requests.put(endpoint, data=json.dumps({'name': name}), headers=request_headers)
    return __returnhandler(reply.status_code, reply.text, calltype, suppressprint)
# Delete a phone contact. Google Directory contacts cannot be removed.
# https://dashboard.meraki.com/api_docs#delete-a-phone-contact
def updatevlan(apikey, networkid, vlanid, vlanname=None, mxip=None, subnetip=None, suppressprint=False):
    """Update VLAN ``vlanid`` of network ``networkid`` with a PUT request.

    Only the arguments that are not None are sent: ``vlanname`` as 'name',
    ``mxip`` as 'applianceIp' and ``subnetip`` as 'subnet', each converted
    to a string. The response status and text are passed to
    ``__returnhandler`` and its result is returned.
    """
    calltype = 'VLAN'
    endpoint = '{0}/networks/{1}/vlans/{2}'.format(str(base_url), str(networkid), str(vlanid))
    request_headers = {
        'x-cisco-meraki-api-key': str(apikey),
        'Content-Type': 'application/json'
    }
    candidates = (
        ('name', vlanname),
        ('applianceIp', mxip),
        ('subnet', subnetip),
    )
    changes = {field: str(value) for field, value in candidates if value is not None}
    reply = requests.put(endpoint, data=json.dumps(changes), headers=request_headers)
    # Let the shared return handler interpret the Dashboard response.
    return __returnhandler(reply.status_code, reply.text, calltype, suppressprint)
def renameorg(apikey, orgid, neworgname, suppressprint=False):
    """Send a PUT that changes the name of organization ``orgid``.

    Access is checked through ``__hasorgaccess`` before the request. Returns
    whatever ``__returnhandler`` produces for the response.
    """
    __hasorgaccess(apikey, orgid)
    calltype = 'Organization Rename'
    target = '{0}/organizations/{1}'.format(str(base_url), str(orgid))
    new_name = {'name': str(neworgname)}
    dashboard = requests.put(
        target,
        data=json.dumps(new_name),
        headers={
            'x-cisco-meraki-api-key': str(apikey),
            'Content-Type': 'application/json'
        },
    )
    # Hand the raw status and text to the common response parser.
    outcome = __returnhandler(dashboard.status_code, dashboard.text, calltype, suppressprint)
    return outcome
def lockTask(self, task):
    """Lock a task so other users cannot work on it at the same time.

    ``task`` is a task dictionary as obtained from getTasks(). The step
    element is fetched first to read its ETag, which is then sent with the
    'lock' action as the 'If-Match' request parameter. Returns None.

    Usage:
    >>> pe.lockTask(task)
    """
    step_url = self.client.baseurl + task['stepElement']
    current = requests.get(step_url, auth=self.client.cred)
    lock_query = {'action': 'lock', 'If-Match': current.headers['ETag']}
    requests.put(step_url, auth=self.client.cred, params=lock_query)
def endTask(self, task, comment=None):
    """Finish a task by sending the 'dispatch' action for its step.

    When the step offers responses but none has been selected, nothing is
    sent and an explanatory message string is returned. Otherwise an
    optional comment is saved first, the task is locked, and the dispatch
    request is issued; in that case None is returned.

    Usage:
    >>> pe.endTask(task)
    #or
    >>> pe.endTask(task, u'Completed the task!')
    """
    system_props = self.getStep(task).get('systemProperties')
    if system_props.get('responses') and not system_props.get('selectedResponse'):
        return "This task needs to be updated. Check the updateTask method."

    if comment:
        task = self.saveAndUnlockTask(task, comment)
    self.lockTask(task)

    query = {
        'action': 'dispatch',
        'selectedResponse': 1,
        'If-Match': task['ETag'],
    }
    requests.put(self.client.baseurl + task['stepElement'],
                 auth=self.client.cred,
                 params=query)
def upload_lifecycle(self, data):
    """PUT ``data`` as form data to the lifecycle_event endpoint and return the JSON reply.

    The request uses HTTP Basic authentication built from ``self.username``
    and ``self.password`` and is sent with certificate verification
    disabled. When the module-level ``DEBUG`` flag is truthy, the payload,
    status code and response text are printed.
    """
    url = self.url + '/api/1.0/lifecycle_event/'

    # b64encode needs bytes: on Python 3 the joined credentials are text
    # and must be encoded first; on Python 2 ``str`` already is bytes.
    credentials = self.username + ':' + self.password
    if not isinstance(credentials, bytes):
        credentials = credentials.encode('utf-8')
    token = base64.b64encode(credentials).decode('ascii')

    headers = {
        'Authorization': 'Basic ' + token,
        'Content-Type': 'application/x-www-form-urlencoded'
    }
    r = requests.put(url, data=data, headers=headers, verify=False)
    if DEBUG:
        # Single-argument print(...) behaves the same on Python 2 and 3.
        print('\t[+] Posting data: %s' % str(data))
        print('\t[*] Status code: %d' % r.status_code)
        print('\t[*] Response: %s' % str(r.text))
    return r.json()
def _upload_file(self, url: str, data: str, content_type: str, rrs: bool = False):
    """PUT ``data`` to ``url`` using AWS4 request signing for the 's3' service.

    Cache-Control, ACL and storage-class headers are always sent; the
    storage class is 'REDUCED_REDUNDANCY' when ``rrs`` is truthy and
    'STANDARD' otherwise. A Content-Type header is added only when
    ``content_type`` is truthy.

    Raises:
        S3Error: when the response status is outside the 2xx range.
    """
    ensure_aws4auth()
    signer = AWS4Auth(self.access_key, self.secret_key, self.region, 's3')

    upload_headers = {
        'Cache-Control': 'max-age=' + str(self.max_age),
        'x-amz-acl': self.acl,
        'x-amz-storage-class': 'REDUCED_REDUNDANCY' if rrs else 'STANDARD',
    }
    if content_type:
        upload_headers['Content-Type'] = content_type

    reply = requests.put(url, auth=signer, data=data, headers=upload_headers)
    if 200 <= reply.status_code < 300:
        return
    raise S3Error(reply.text)
def setdevicedata(p_apikey, p_shardhost, p_nwid, p_device, p_field, p_value, p_movemarker):
    """Set one field of a device record with a PUT request.

    Args:
        p_apikey: API key sent in the X-Cisco-Meraki-API-Key header.
        p_shardhost: host name used to build the request URL.
        p_nwid: network id.
        p_device: device identifier used in the URL.
        p_field: name of the field to modify.
        p_value: new value for the field.
        p_movemarker: boolean; sent as 'moveMapMarker' "true"/"false".

    Returns:
        'ok' when the API answers with HTTP 200, otherwise 'null'.

    Exits the process with status 2 when the request cannot be sent.
    """
    movevalue = "true" if p_movemarker else "false"

    time.sleep(API_EXEC_DELAY)

    url = 'https://%s/api/v0/networks/%s/devices/%s' % (p_shardhost, p_nwid, p_device)
    # Serialize outside the try block so that a bad value is reported as
    # such instead of as a connectivity problem.
    body = json.dumps({p_field: p_value, 'moveMapMarker': movevalue})
    headers = {'X-Cisco-Meraki-API-Key': p_apikey, 'Content-Type': 'application/json'}
    try:
        r = requests.put(url, data=body, headers=headers)
    except requests.exceptions.RequestException:
        # A bare ``except:`` here also swallowed KeyboardInterrupt/SystemExit.
        printusertext('ERROR 16: Unable to contact Meraki cloud')
        sys.exit(2)

    if r.status_code != requests.codes.ok:
        return 'null'
    return 'ok'
def saucelabsSetup(self):
    """Find the remote job for this test and set its name to the build id.

    The full job list is fetched and logged, the job id is looked up via
    ``saucelabsFindJobId``, ``self.saucelabsJobURL`` and
    ``self._resultReference`` are set from it, and finally the job is
    updated with a PUT that sets its name to ``self._buildID()``.
    """
    self.log("Setup for saucelabs")

    credentials = (self._remoteUsername, os.getenv("REMOTE_ACCESSKEY"))
    baseURL = "{0}/{1}".format(self.SAUCELABS_API_BASE, self._remoteUsername)

    listing = requests.get(
        "{0}/jobs".format(baseURL),
        auth=credentials,
        headers={'content-type': 'application/json'},
        params={'full': 'true'},
    )
    self.log("response={0}".format(json.dumps(listing.json())))

    remoteJobid = self.saucelabsFindJobId(listing.json(), self._remoteTestID())
    self.saucelabsJobURL = '{0}/jobs/{1}'.format(baseURL, remoteJobid)
    self._resultReference = "{0}/{1}".format(self.SAUCELABS_RESULTS_BASE, remoteJobid)

    job_name = json.dumps({'name': self._buildID()})
    requests.put(
        self.saucelabsJobURL,
        auth=credentials,
        headers={'content-type': 'application/json'},
        data=job_name,
    )
def execute(self):
    """Upload ``self.local_file_name`` with a signed PUT request and return the response.

    Authorization, Date, Content-Length and Content-Type are written into
    ``self.http_headers`` before the request is sent to
    ``self.dss_url + self.dss_op_path``.
    """
    # get signature
    auth = DSSAuth(self.http_method, self.access_key, self.secret_key, self.dss_op_path,
                   content_type='application/octet-stream')
    self.http_headers['Authorization'] = auth.get_signature()
    self.http_headers['Date'] = formatdate(usegmt=True)
    # Header values must be text; an int is rejected by current requests
    # releases.
    self.http_headers['Content-Length'] = str(os.stat(self.local_file_name).st_size)
    self.http_headers['Content-Type'] = 'application/octet-stream'

    # construct request
    request_url = self.dss_url + self.dss_op_path

    # make request; the context manager closes the file once the body has
    # been sent (the original never closed it).
    with open(self.local_file_name, 'rb') as data:
        resp = requests.put(request_url, headers=self.http_headers, data=data,
                            verify=self.is_secure_request)
    return resp
def execute(self):
    """Upload ``self.local_file_name`` as one part with a signed PUT and return the response.

    The signature is computed over ``self.dss_query_str_for_signature``;
    ``self.dss_query_str``, when not None, is appended to the request URL.
    Authorization, Date, Content-Length and Content-Type are written into
    ``self.http_headers`` before the request is sent.
    """
    # get signature (the unused local ``query_str`` was dropped; only
    # ``self.dss_query_str_for_signature`` is ever passed on)
    auth = DSSAuth(self.http_method, self.access_key, self.secret_key, self.dss_op_path,
                   query_str=self.dss_query_str_for_signature,
                   content_type='application/octet-stream')
    self.http_headers['Authorization'] = auth.get_signature()
    self.http_headers['Date'] = formatdate(usegmt=True)
    # Header values must be text; an int is rejected by current requests
    # releases.
    self.http_headers['Content-Length'] = str(os.stat(self.local_file_name).st_size)
    self.http_headers['Content-Type'] = 'application/octet-stream'

    # construct request
    request_url = self.dss_url + self.dss_op_path
    if self.dss_query_str is not None:
        request_url += '?' + self.dss_query_str

    # make request; the context manager closes the file once the body has
    # been sent (the original never closed it).
    with open(self.local_file_name, 'rb') as data:
        resp = requests.put(request_url, headers=self.http_headers, data=data,
                            verify=self.is_secure_request)
    return resp
def create_vh(self, sm_type, uuid):
    """Create the virtual host '<sm_type>-<uuid>' and its permissions if missing.

    The existing virtual hosts are listed first. When one already has the
    target name nothing is changed.

    Returns:
        ``(0, 0)`` when the virtual host already exists, otherwise the
        status codes of the create request and the permission request.
    """
    vh_name = '{0}-{1}'.format(sm_type, uuid)
    api = '/api/vhosts/'
    url_list = '{0}{1}'.format(self.host, api)
    url_create = '{0}{1}{2}'.format(self.host, api, vh_name)
    url_permission = '{0}/api/permissions/{1}/specific-management'.format(self.host, vh_name)
    data = '{"configure":".*","write":".*","read":".*"}'
    # NOTE(review): the default guest/guest account is hard-coded here.
    guest_auth = ('guest', 'guest')

    # Iterate the entries directly instead of indexing through a local
    # that shadowed the ``list`` builtin.
    vhosts = requests.get(url=url_list, auth=guest_auth).json()
    if any(vhost['name'] == vh_name for vhost in vhosts):
        return 0, 0

    response = requests.put(url=url_create, headers=self.headers, auth=guest_auth)
    permission = requests.put(url=url_permission, headers=self.headers, data=data,
                              auth=guest_auth)
    return response.status_code, permission.status_code
def delete_all_zendesk_articles(self, category):
    '''
    delete all articles in the category being filled with UV articles
    for our app, we had multiple uservoice knowledge bases, and each gets put in a zendesk category

    Lists the sections of ``category`` and deletes each one. The process
    exits when the listing does not answer 200 or a deletion does not
    answer 204.
    '''
    import sys

    # Single-argument print(...) behaves the same on Python 2 and 3.
    print("**DELETING ALL SECTIONS/ARTICLES in destination category {}".format(category))
    url = '{}/api/v2/help_center/categories/{}/sections.json'.format(self.zendesk_url, category)
    response = requests.get(url, headers=self.headers, auth=self.credentials)
    if response.status_code != 200:
        print('FAILED to delete articles with error {}'.format(response.status_code))
        # sys.exit() instead of the site-provided exit(), which is not
        # guaranteed to exist outside interactive sessions.
        # NOTE(review): this still exits with status 0 on failure.
        sys.exit()

    section_ids = [section['id'] for section in response.json()['sections']]
    for section_id in section_ids:
        url = "{}/api/v2/help_center/sections/{}.json".format(self.zendesk_url, section_id)
        response = requests.delete(url, headers=self.headers, auth=self.credentials)
        if response.status_code != 204:
            print('FAILED to delete sections for category {} with error {}'.format(category, response.status_code))
            sys.exit()
def _upload_assets_to_OSF(dlgr_id, osf_id, provider="osfstorage"):
    """Upload experimental assets to the OSF.

    Sends 'snapshots/<dlgr_id>-code.zip' with a PUT request to the given
    storage provider of resource ``osf_id``.

    Raises:
        requests.exceptions.HTTPError: when the upload is rejected.
    """
    root = "https://files.osf.io/v1"
    snapshot_filename = "{}-code.zip".format(dlgr_id)
    snapshot_path = os.path.join("snapshots", snapshot_filename)
    # The context manager closes the snapshot after the upload; the
    # original left the handle open.
    with open(snapshot_path, 'rb') as snapshot:
        r = requests.put(
            "{}/resources/{}/providers/{}/".format(
                root,
                osf_id,
                provider,
            ),
            params={
                "kind": "file",
                "name": snapshot_filename,
            },
            headers={
                "Authorization": "Bearer {}".format(
                    config.get("osf_access_token")),
                "Content-Type": "text/plain",
            },
            data=snapshot,
        )
    r.raise_for_status()
def _send_raw_http_request(self, method, url, data=None):
    """Send one JSON request with the given HTTP method and return the response.

    The session cookie is attached when a session id is set, and
    certificate verification is disabled. ``data`` is sent only for POST,
    PUT and PATCH. The response is also kept on ``self._httpresp``.

    Raises:
        TintriError: when ``method`` is not GET, POST, PUT, PATCH or DELETE.
    """
    self.__logger.debug('%s %s' % (method, url))
    carries_body = method in ('POST', 'PUT', 'PATCH')
    if carries_body:
        self.__logger.log(TINTRI_LOG_LEVEL_DATA, 'Data: %s' % data)

    headers = {'content-type': 'application/json'}
    if self.__session_id:
        headers['cookie'] = 'JSESSIONID=%s' % self.__session_id

    if carries_body:
        senders = {'POST': requests.post, 'PUT': requests.put, 'PATCH': requests.patch}
        httpresp = senders[method](url, data, headers=headers, verify=False)
    elif method == 'GET':
        httpresp = requests.get(url, headers=headers, verify=False)
    elif method == 'DELETE':
        httpresp = requests.delete(url, headers=headers, verify=False)
    else:
        # This should never happen
        raise TintriError(None, message='Invalid HTTP method: ' + method)

    # self._httpresp is for debugging only, not thread-safe
    self._httpresp = httpresp
    return httpresp
def toggle_tracking(self, enabled: bool=True):
    """
    Enable or disable automatic deck tracking

    The change is sent as a form POST carrying ``_method=put``.

    :param bool enabled: If True, tracking is enabled, else it is disabled
    :return None:
    :raises: requests.exceptions.HTTPError on error
    """
    logger.info('Called toggle_tracking()')
    endpoint = '/profile/settings/decks/toggle'
    if enabled:
        flag = 'true'
    else:
        flag = 'false'
    form = {'user[deck_tracking]': flag, '_method': 'put'}
    logger.debug('POST on %s', endpoint)
    reply = requests.post(self._url + endpoint, auth=self._auth, data=form)
    reply.raise_for_status()
def put(self, path='', **kwargs):
    """
    Issue a PUT request against the joined API path and decode the JSON reply.

    :param path: str api path
    :param kwargs: additional parameters for request
    :return: dict response
    :raises: HTTPError
    :raises: InvalidJSONError
    """
    target = self._path_join(path)
    reply = requests.put(target, headers=self.auth_custom, auth=self.auth, **kwargs)
    reply.raise_for_status()
    try:
        decoded = reply.json()
    except ValueError:
        raise InvalidJSONError(reply.content)
    return decoded
def send_response(event, context, status, reason, data):
    """PUT a custom-resource result to the URL given in ``event['ResponseURL']``.

    The JSON body carries ``status``, ``reason`` and ``data`` plus the
    identifiers copied from ``event``; the log stream name is used as the
    physical resource id. The response is not inspected and nothing is
    returned.
    """
    import requests

    body = json.dumps(
        {
            'Status': status,
            'RequestId': event['RequestId'],
            'StackId': event['StackId'],
            'PhysicalResourceId': context.log_stream_name,
            'Reason': reason,
            'LogicalResourceId': event['LogicalResourceId'],
            'Data': data
        }
    )

    headers = {
        # NOTE(review): the empty content type is presumably required by the
        # pre-signed URL - confirm before changing it.
        'content-type': '',
        # Header values must be text; an int is rejected by current
        # requests releases.
        'content-length': str(len(body))
    }

    requests.put(event['ResponseURL'], data=body, headers=headers)
def test_requests_mocks(self):
    """Check that the get, put, post, patch and delete helpers of requests are Mock objects."""
    helpers = (
        requests.get,
        requests.put,
        requests.post,
        requests.patch,
        requests.delete,
    )
    for helper in helpers:
        self.assertTrue(isinstance(helper, Mock))
def put(self, path, params=None, body=None):
    """PUT to ``self.endpoint + path`` and return the response.

    ``body`` is sent as the request data only when it is truthy.
    Certificate verification is disabled.
    """
    options = {'params': params, 'headers': self.headers, 'verify': False}
    if body:
        options['data'] = body
    return requests.put(self.endpoint + path, **options)
def put_json(location, json_data):
    """
    Performs a PUT and passes the data to the URL location

    Uses the module-level USERNAME/PASSWORD, SSL_VERIFY and POST_HEADERS
    settings and returns the decoded JSON body of the response.
    """
    credentials = (USERNAME, PASSWORD)
    reply = requests.put(
        location,
        data=json_data,
        auth=credentials,
        verify=SSL_VERIFY,
        headers=POST_HEADERS,
    )
    return reply.json()
def authenticated_request_put(self, url, data):
    """PUT ``data`` to ``url`` as JSON using ``self.custom_http_headers``.

    The 'content-type' entry is added to the shared header dict only for
    the duration of this call and is removed again afterwards. Returns the
    response.
    """
    self.custom_http_headers['content-type'] = 'application/json'
    try:
        return requests.put(url, data, headers=self.custom_http_headers)
    finally:
        # Clean up even when the request raises, so a failed call does not
        # leave the JSON content type on the shared headers.
        self.custom_http_headers.pop('content-type', None)
def put(self, endpoint, payload):
    """Forward a 'put' call for ``endpoint`` with ``payload`` to ``call_api`` and return its result."""
    result = self.call_api('put', endpoint, payload)
    return result
def delete(self, endpoint):
    """Forward a 'delete' call for ``endpoint`` to ``call_api`` with no payload and return its result."""
    result = self.call_api('delete', endpoint, payload=None)
    return result
# put methods
def update(self, element_id, **kwargs):
    """
    updates an element on pipedrive
    using /endpoint/:id as url

    The keyword arguments are passed to ``self.put`` as the payload and its
    result is returned.
    """
    resource_path = "/".join((str(self.endpoint), str(element_id)))
    return self.put(self.restify(resource_path), kwargs)
def update(self, api_obj, data, obj_id=0, url_params=''):
    """PUT ``data`` as JSON to '<server>/<api_obj>/<id><params>'.

    The id comes from ``obj_id`` or, when that is falsy, from
    ``data['id']``. A truthy ``url_params`` is appended as a query string
    with exactly one leading '?'. Returns the result of the class's result
    check on the response.

    Raises:
        TypeError: when ``data`` is falsy.
    """
    if not data:
        raise TypeError("Missing object data.")

    query = url_params
    if query:
        query = '?' + query.lstrip("?")

    target_id = obj_id if obj_id else data['id']
    url = self.server + '/' + api_obj + '/' + str(target_id) + query
    reply = requests.put(url, data=json.dumps(data), headers=self.tokenHeaderJson,
                         verify=self.sslVerify)
    return self.__check_result(reply)
# Delete object using HTTP DELETE request.
def add_job(self, session_id, job_id):
    """Attach job ``job_id`` to session ``session_id`` with a PUT request.

    Raises:
        exceptions.ApiClientException: when the reply status is not 204.
    """
    # endpoint /v1/sessions/{sessions_id}/jobs/{job_id}
    job_url = '{0}{1}/jobs/{2}'.format(self.endpoint, session_id, job_id)
    reply = requests.put(job_url, headers=self.headers, verify=self.verify)
    if reply.status_code == 204:
        return
    raise exceptions.ApiClientException(reply)
def replace_logset(logset_id, params):
    """
    Replace a given logset with the details provided

    Sends ``params`` as JSON in a PUT request and hands the response to
    ``handle_response``. When the request itself fails, the error is written
    to stderr and the process exits with status 1.
    """
    headers = api_utils.generate_headers('rw')
    try:
        response = requests.put(_url((logset_id,))[1], json=params, headers=headers)
        handle_response(response, 'Update logset with details %s failed.\n' % params, 200)
    except requests.exceptions.RequestException as error:
        # write() only accepts text; passing the exception object itself
        # raised TypeError and hid the real failure.
        sys.stderr.write('%s\n' % error)
        sys.exit(1)
def delete_user_from_team(team_id, user_key):
    """
    Delete a user from a team.

    The team is fetched first; on HTTP 200 it is PUT back with its name
    unchanged and every user whose id equals ``user_key`` removed. Error
    responses and request failures are echoed to stderr and end the process
    with status 1.
    """
    headers = api_utils.generate_headers('rw')
    try:
        lookup = requests.request('GET', _url((team_id,))[1],
                                  params={'teamid': team_id}, headers=headers)
        if lookup.status_code == 200:
            team = lookup.json()['team']
            remaining = [member for member in lookup.json()['team']['users']
                         if member['id'] != user_key]
            replacement = {'team': {'name': team['name'], 'users': remaining}}
            headers = api_utils.generate_headers('rw')
            update = requests.put(_url((team_id,))[1], json=replacement, headers=headers)
            if response_utils.response_error(update):  # Check response has no errors
                click.echo('Deleting user from team with key: %s failed.' % team_id, err=True)
                sys.exit(1)
            elif update.status_code == 200:
                click.echo("Deleted user with key: '%s' from team: %s" % (user_key, team_id))
        elif response_utils.response_error(lookup):
            click.echo('Cannot find team. Deleting user from team %s failed.' % team_id, err=True)
            sys.exit(1)
    except requests.exceptions.RequestException as error:
        # One handler covers both requests; the original nested an identical
        # handler around the PUT.
        click.echo(error, err=True)
        sys.exit(1)
def put_json(location, json_data):
    """
    Performs a PUT and passes the data to the URL location

    Uses the module-level USERNAME/PASSWORD and POST_HEADERS settings with
    certificate verification enabled, and returns the decoded JSON body of
    the response.
    """
    request_options = {
        'data': json_data,
        'auth': (USERNAME, PASSWORD),
        'verify': True,
        'headers': POST_HEADERS,
    }
    return requests.put(location, **request_options).json()
def add_update_show(self, beid, subgroup):
    """Adds or edits a show in sonarr.

    A show already present in the watching list only has its tags replaced
    and is sent with PUT; otherwise the show is looked up and created with
    POST.

    Args:
        beid (int): The TVDB ID of the show we're adding or editing.
        subgroup (str): The subgroup we're using for this show.
    """
    _logger.debug("Entering add_update_show.")
    tag = self._subgroup_tag(subgroup)

    _logger.debug("Getting all shows being watched and searching for show with ID {0}.".format(beid))
    existing = None
    for candidate in self.get_watching_shows():
        if int(candidate['tvdbId']) != int(beid):
            continue
        _logger.debug("Found show {0} with ID {1} in watching shows! Updating instead of creating.".format(candidate['title'], candidate['tvdbId']))
        # No break: a later entry with the same id replaces an earlier one.
        existing = candidate

    series_url = "{0}/api/series?apikey={1}".format(self.url, self.api_key)

    if existing:
        existing['tags'] = [tag]
        requests.put(series_url, json=existing)
        return

    _logger.debug("Show not found in watching list, creating a new one.")
    details = self.get_show(beid)
    payload = {
        "tvdbId": int(beid),
        "title": details['title'],
        "qualityProfileId": self._quality_profile,
        "titleSlug": details['titleSlug'],
        "images": details['images'],
        "seasons": details['seasons'],
        "rootFolderPath": self._library_path,
        "seriesType": "anime",
        "seasonFolder": True,
        "addOptions": {"ignoreEpisodesWithFiles": True},
        "tags": [tag]
    }
    requests.post(series_url, json=payload)
def put_container(self, container_name):
    """
    create container for authorized account
    :param container_name: container's name for create
    :return: http response
    """
    # a PUT on the container URL creates or updates the container
    container_url = self.url + '/' + container_name
    return requests.put(container_url, headers=self.base_headers)
def put_object_to_container(self, container_name, file_path=None, file_name=None, file_stream=None):
    """
    upload file to target container

    When ``file_name`` is falsy it is taken from the last '/'-separated
    component of ``file_path``. When ``file_stream`` is falsy the content is
    read from ``file_path``.

    :param container_name: target container_name - string required=True
    :param file_path: file's path for upload - string
    :param file_name: file's name for get urls - string
    :param file_stream: file's data string - string
    :return: submit response
    """
    # make file object to comfortable for uploading
    if not file_name:
        file_name = file_path.split('/')[-1]
    url = self.url + '/' + container_name + '/' + file_name
    if not file_stream:
        # Read through a context manager so the handle is closed; the
        # original ``open(...).read()`` left it to the garbage collector.
        with open(file_path, 'rb') as source:
            file_stream = source.read()
    return requests.put(url, data=file_stream, headers=self.base_headers)