我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用requests.delete()。
def api_call(self, url, http_method, data): headers = {"Content-Type" : "application/json", "x-api-key" : self.api_key, "authorization" : "Bearer " + self.access_token} r = None if http_method is PyCurlVerbs.POST: r = requests.post(url, data=json.dumps(data), headers=headers) elif http_method is PyCurlVerbs.DELETE: r = requests.delete(url, headers=headers) elif http_method is PyCurlVerbs.PUT: r = requests.put(url, data=json.dumps(data), headers=headers) elif http_method is PyCurlVerbs.GET: r = requests.get(url, headers=headers) return r
def delete(api_key_id): """ Delete an api key with the provided ID """ action, url = _url((api_key_id,)) headers = api_utils.generate_headers('owner', method='DELETE', body='', action=action) try: response = requests.delete(url, headers=headers) if response_utils.response_error(response): sys.stderr.write('Deleting api key failed.') sys.exit(1) elif response.status_code == 204: sys.stdout.write('Deleted api key with id: %s \n' % api_key_id) except requests.exceptions.RequestException as error: sys.stderr.write(error) sys.exit(1)
def call_api(self, method, endpoint, payload): url = urlparse.urljoin(self.api_base_url, endpoint) if method == 'POST': response = requests.post(url, data=payload) elif method == 'delete': response = requests.delete(url) elif method == 'put': response = requests.put(url, data=payload) else: if self.api_key: payload.update({'api_token': self.api_key}) response = requests.get(url, params=payload) content = json.loads(response.content) return content
def delete_pipeline(app='', pipeline_name=''): """Delete _pipeline_name_ from _app_.""" safe_pipeline_name = normalize_pipeline_name(name=pipeline_name) url = murl.Url(API_URL) LOG.warning('Deleting Pipeline: %s', safe_pipeline_name) url.path = 'pipelines/{app}/{pipeline}'.format(app=app, pipeline=safe_pipeline_name) response = requests.delete(url.url, verify=GATE_CA_BUNDLE, cert=GATE_CLIENT_CERT) if not response.ok: LOG.debug('Delete response code: %d', response.status_code) if response.status_code == requests.status_codes.codes['method_not_allowed']: raise SpinnakerPipelineDeletionFailed('Failed to delete "{0}" from "{1}", ' 'possibly invalid Pipeline name.'.format(safe_pipeline_name, app)) else: LOG.debug('Pipeline missing, no delete required.') LOG.debug('Deleted "%s" Pipeline response:\n%s', safe_pipeline_name, response.text) return response.text
def delete_log(log_id): """ Delete a log with the provided log ID """ headers = api_utils.generate_headers('rw') try: response = requests.delete(_url(('logs', log_id))[1], headers=headers) if response_utils.response_error(response): sys.stderr.write('Delete log failed.') sys.exit(1) elif response.status_code == 204: sys.stdout.write('Deleted log with id: %s \n' % log_id) except requests.exceptions.RequestException as error: sys.stderr.write(error) sys.exit(1)
def delete_team(team_id): """ Delete a team with the provided team ID. """ headers = api_utils.generate_headers('rw') try: response = requests.delete(_url((team_id,))[1], headers=headers) if response_utils.response_error(response): # Check response has no errors click.echo('Delete team failed.', err=True) sys.exit(1) elif response.status_code == 204: click.echo('Deleted team with id: %s.' % team_id) except requests.exceptions.RequestException as error: click.echo(error, err=True) sys.exit(1)
def remove_show(self, beid): """Remove a given show from sonarr. It will not delete files. The backend ID we're given is not the ID we need, so the show is looked up first. It will only delete shows if the DELETE_SHOWS config value is set. Args: beid (int): The TVDB ID of the show. """ _logger.debug("Entering remove_show. Getting all shows being watched from sonarr.") if self._delete_shows: _logger.debug("Config line set to delete shows from sonarr. Continuing.") shows = self.get_watching_shows() _logger.debug("Got all shows. Attempting to find show with ID {0}".format(beid)) show = [x for x in shows if x['beid'] == beid][0] _logger.debug("Found show {0}. Deleting.".format(show['title'])) requests.delete("{0}/api/series/{1}?apikey={2}".format(self.url, show['id'], self.api_key)) _logger.debug("Config line set to leave shows in sonarr, exiting with no changes.")
def post_account_metadata(self, params, action): """ post account's metadata for add, delete :param params: params should iterable by key, value :param action: means add or delete ['add', 'delete'] :return: result status code """ # post account's metadata url = self.url headers = self.base_headers for key, value in params.iteritems(): if action == 'add': headers['X-Account-Meta-' + key] = value else: headers['X-Remove-Account-Meta-' + key] = value response = requests.post(url, headers=headers) return response.status_code
def post_container_metadata(self, container_name, params, action): """ post account's metadata for add, delete :param params: params should iterable by key, value :param action: means add or delete ['add', 'delete'] :return: result status code """ # post account's metadata url = self.url + '/' + container_name headers = self.base_headers for key, value in params.iteritems(): if action == 'add': headers['X-Container-Meta-' + key] = value else: headers['X-Remove-Container-Meta-' + key] = value response = requests.post(url, headers=headers) return response.status_code
def post_object_metadata(self, container_name, file_name, params, action): """ post account's metadata for add, delete :param params: params should iterable by key, value :param action: means add or delete ['add', 'delete'] :return: result status code """ # post account's metadata url = self.url + '/' + container_name + '/' + file_name headers = self.base_headers for key, value in params.iteritems(): if action == 'add': headers['X-Container-Meta-' + key] = value else: headers['X-Remove-Container-Meta-' + key] = value response = check_response_status( requests.post(url, headers=headers) ) return response.status_code
def post_container_metadata(self, container_name, params, action): """ post account's metadata for add, delete :param container_name: target container name :param params: params should iterable by key, value :param action: means add or delete ['add', 'delete'] :return: result status code """ # post account's metadata url = self.url + '/' + container_name headers = self.base_headers for key, value in params.iteritems(): if action == 'add': headers['X-Container-Meta-' + key] = value else: headers['X-Remove-Container-Meta-' + key] = value response = requests.post(url, headers=headers) return response.status_code
def delete_post(self, pk, force=False): """ Delete a Post. Arguments --------- pk : int The post id you want to delete. force : bool Whether to bypass trash and force deletion. """ resp = self._delete('posts/{0}'.format(pk), params=locals()) if resp.status_code == 200: return True else: raise Exception(resp.json()) # Post Reivion Methods
def deladmin(apikey, orgid, adminid, suppressprint=False): # # Confirm API Key has Admin Access Otherwise Raise Error # __hasorgaccess(apikey, orgid) calltype = 'Administrator' delurl = '{0}/organizations/{1}/admins/{2}'.format(str(base_url), str(orgid), str(adminid)) headers = { 'x-cisco-meraki-api-key': format(str(apikey)), 'Content-Type': 'application/json' } dashboard = requests.delete(delurl, headers=headers) # # Call return handler function to parse Dashboard response # result = __returnhandler(dashboard.status_code, dashboard.text, calltype, suppressprint) return result ### CLIENTS ### # List the clients of a device, up to a maximum of a month ago. The usage of each client is returned in kilobytes. If the device is a switch, the switchport is returned; otherwise the switchport field is null. # https://dashboard.meraki.com/api_docs#list-the-clients-of-a-device-up-to-a-maximum-of-a-month-ago
def deltemplate(apikey, orgid, templateid, suppressprint=False): # # Confirm API Key has Admin Access Otherwise Raise Error # __hasorgaccess(apikey, orgid) calltype = 'Template' delurl = '{0}/organizations/{1}/configTemplates/{2}'.format(str(base_url), str(orgid), str(templateid)) headers = { 'x-cisco-meraki-api-key': format(str(apikey)), 'Content-Type': 'application/json' } dashboard = requests.delete(delurl, headers=headers) # # Call return handler function to parse Dashboard response # result = __returnhandler(dashboard.status_code, dashboard.text, calltype, suppressprint) return result ### DEVICES ### # List the devices in a network # https://dashboard.meraki.com/api_docs#list-the-devices-in-a-network
def delnetwork(apikey, networkid, suppressprint=False): calltype = 'Network' delurl = '{0}/networks/{1}'.format(str(base_url), str(networkid)) headers = { 'x-cisco-meraki-api-key': format(str(apikey)), 'Content-Type': 'application/json' } dashboard = requests.delete(delurl, headers=headers) # # Call return handler function to parse Dashboard response # result = __returnhandler(dashboard.status_code, dashboard.text, calltype, suppressprint) return result # Bind a network to a template. # https://dashboard.meraki.com/api_docs#bind-a-network-to-a-template
def delphone(apikey, networkid, serial, suppressprint=False): calltype = 'Phone Assignment' delurl = '{0}/networks/{1}/phoneAssignments/{2}'.format(str(base_url), str(networkid), str(serial)) headers = { 'x-cisco-meraki-api-key': format(str(apikey)), 'Content-Type': 'application/json' } dashboard = requests.delete(delurl, headers=headers) result = __returnhandler(dashboard.status_code, dashboard.text, calltype, suppressprint) return result ### PHONE CONTACTS ### # List the phone contacts in a network # https://dashboard.meraki.com/api_docs#list-the-phone-contacts-in-a-network
def delcontact(apikey, networkid, contactid, suppressprint=False): calltype = 'Phone Contact' delurl = '{0}/networks/{1}/phoneContacts/{2}'.format(str(base_url), str(networkid), str(contactid)) headers = { 'x-cisco-meraki-api-key': format(str(apikey)), 'Content-Type': 'application/json' } dashboard = requests.delete(delurl, headers=headers) result = __returnhandler(dashboard.status_code, dashboard.text, calltype, suppressprint) return result ### PHONE NUMBERS ### # List all the phone numbers in a network # https://dashboard.meraki.com/api_docs#list-all-the-phone-numbers-in-a-network
def delsamlrole(apikey, orgid, roleid, suppressprint=False): # # Confirm API Key has Admin Access Otherwise Raise Error # __hasorgaccess(apikey, orgid) calltype = 'SAML Role' delurl = '{0}/organizations/{1}/samlRoles/{2}'.format(str(base_url), str(orgid), str(roleid)) headers = { 'x-cisco-meraki-api-key': format(str(apikey)), 'Content-Type': 'application/json' } dashboard = requests.delete(delurl, headers=headers) # # Call return handler function to parse Dashboard response # result = __returnhandler(dashboard.status_code, dashboard.text, calltype, suppressprint) return result ### SM (Systems Manager) ### # List the devices enrolled in an SM network with various specified fields and filters # https://dashboard.meraki.com/api_docs#list-the-devices-enrolled-in-an-sm-network-with-various-specified-fields-and-filters
def addstaticroute(apikey, networkid, name, subnet, ip, suppressprint=False): calltype = 'Static Route' posturl = '{0}/networks/{1}/staticRoutes'.format(str(base_url), str(networkid)) headers = { 'x-cisco-meraki-api-key': format(str(apikey)), 'Content-Type': 'application/json' } postdata = { 'name': format(str(name)), 'subnet': format(str(subnet)), 'gatewayIp': format(str(ip)) } dashboard = requests.post(posturl, data=json.dumps(postdata), headers=headers) result = __returnhandler(dashboard.status_code, dashboard.text, calltype, suppressprint) return result # Delete a static route from a network # https://dashboard.meraki.com/api_docs#delete-a-static-route-from-a-network
def delstaticroute(apikey, networkid, routeid, suppressprint=False): calltype = 'Static Route' delurl = '{0}/networks/{1}/staticRoutes/{2}'.format(str(base_url), str(networkid), str(routeid)) headers = { 'x-cisco-meraki-api-key': format(str(apikey)), 'Content-Type': 'application/json' } dashboard = requests.delete(delurl, headers=headers) result = __returnhandler(dashboard.status_code, dashboard.text, calltype, suppressprint) return result ### SWITCH PORTS ### # List the switch ports for a switch # https://dashboard.meraki.com/api_docs#list-the-switch-ports-for-a-switch
def delvlan(apikey, networkid, vlanid, suppressprint=False): calltype = 'VLAN' delurl = '{0}/networks/{1}/vlans/{2}'.format(str(base_url), str(networkid), str(vlanid)) headers = { 'x-cisco-meraki-api-key': format(str(apikey)), 'Content-Type': 'application/json' } dashboard = requests.delete(delurl, headers=headers) # # Call return handler function to parse Dashboard response # result = __returnhandler(dashboard.status_code, dashboard.text, calltype, suppressprint) return result # MX performance score
def deltemplate(apikey, orgid, templateid, suppressprint=False): # # Confirm API Key has Admin Access Otherwise Raise Error # __hasorgaccess(apikey, orgid) calltype = 'Template' delurl = '{0}/organizations/{1}/configTemplates/{2}'.format(str(base_url), str(orgid), str(templateid)) headers = { 'x-cisco-meraki-api-key': format(str(apikey)), 'Content-Type': 'application/json' } dashboard = requests.delete(delurl, headers=headers) # # Call return handler function to parse Dashboard response # result = __returnhandler(dashboard.status_code, dashboard.text, calltype, suppressprint) return result
def delsamlrole(apikey, orgid, roleid, suppressprint=False): # # Confirm API Key has Admin Access Otherwise Raise Error # __hasorgaccess(apikey, orgid) calltype = 'SAML Role' delurl = '{0}/organizations/{1}/samlRoles/{2}'.format(str(base_url), str(orgid), str(roleid)) headers = { 'x-cisco-meraki-api-key': format(str(apikey)), 'Content-Type': 'application/json' } dashboard = requests.delete(delurl, headers=headers) # # Call return handler function to parse Dashboard response # result = __returnhandler(dashboard.status_code, dashboard.text, calltype, suppressprint) return result
def deladmin(apikey, orgid, adminid, suppressprint=False): # # Confirm API Key has Admin Access Otherwise Raise Error # __hasorgaccess(apikey, orgid) calltype = 'Administrator' delurl = '{0}/organizations/{1}/admins/{2}'.format(str(base_url), str(orgid), str(adminid)) headers = { 'x-cisco-meraki-api-key': format(str(apikey)), 'Content-Type': 'application/json' } dashboard = requests.delete(delurl, headers=headers) # # Call return handler function to parse Dashboard response # result = __returnhandler(dashboard.status_code, dashboard.text, calltype, suppressprint) return result
def del_admin(self, admin_id): """ Delete a specified admin account. Args: admin_id: ID string or email of the admin to be deleted. Returns: deleted: The request object of the deleted admin, or None if the passed admin ID doesn't exist. """ exists = self.__admin_exists(admin_id) if not exists: return None elif not admin_id.isdigit(): admin_id = exists["id"] url = self.url+"/"+admin_id return requests.delete(url, headers=self.headers)
def cleanup_os_logical_ports(self): """ Delete all logical ports created by OpenStack """ lports = self.get_logical_ports() os_lports = self.get_os_resources(lports) LOG.info("Number of OS Logical Ports to be deleted: %s", len(os_lports)) # logical port vif detachment self.update_logical_port_attachment(os_lports) for p in os_lports: endpoint = '/logical-ports/%s' % p['id'] response = self.delete(endpoint=endpoint) if response.status_code == requests.codes.ok: LOG.info("Successfully deleted logical port %s", p['id']) else: LOG.error("Failed to delete lport %(port_id)s, response " "code %(code)s", {'port_id': p['id'], 'code': response.status_code})
def _ghost2loggergpurge(self): # Let's deal with the JSON API calls here self._logger.info('[Ghost2logger]: Purge Ghost2logger') HEADERS = {"Content-Type": 'application/json'} _URL = self._ghost_url + "/v1/all" try: r = requests.delete(_URL, headers=HEADERS, verify=False, auth=(self._username, self._password)) _data = r.json() self._logger.info('[Ghost2logger]: Purged ghost2logger' + str(_data)) except Exception as e: self._logger.info('[Ghost2logger]: Cannot purge Ghost2logger: ' + str(e))
def _delete_test_cluster(self, cluster_id): self.logger.info("Attempting to clean up the test cluster that was spun up for the unit test: %s" % cluster_id) config = load_config() url, user_id, password = get_rnr_credentials(config) response = requests.get('%s/v1/solr_clusters/%s' % (url, cluster_id), auth=(user_id, password), headers={'x-global-transaction-id': 'Rishavs app', 'Content-type': 'application/json'}) response_text = json.dumps(response.json(), indent=4, sort_keys=True) if response.status_code == 200: self.logger.info('Found a test cluster that needs cleanup: %s' % response_text) response = requests.delete('%s/v1/solr_clusters/%s' % (url, cluster_id), auth=(user_id, password), headers={'x-global-transaction-id': 'Rishavs app', 'Content-type': 'application/json'}) response.raise_for_status() self.logger.info("Successfully deleted test cluster: %s" % cluster_id) else: self.logger.info('No cleanup required for cluster id: %s (got response: %s)' % (cluster_id, response_text))
def _try_deleting_ranker(self, ranker_id): config = load_config() url, user_id, password = get_rnr_credentials(config) response = requests.get(path.join(url, 'v1/rankers', ranker_id), auth=(user_id, password), headers={'x-global-transaction-id': 'Rishavs app', 'Content-type': 'application/json'}) response_text = json.dumps(response.json(), indent=4, sort_keys=True) if response.status_code == 200: self.logger.info('Found a test ranker that needs cleanup: %s' % response_text) response = requests.delete(path.join(url, 'v1/rankers', ranker_id), auth=(user_id, password), headers={'x-global-transaction-id': 'Rishavs app', 'Content-type': 'application/json'}) response.raise_for_status() self.logger.info("Successfully deleted test ranker: %s" % ranker_id) else: self.logger.info('No cleanup required for ranker id: %s (got response: %s)' % (ranker_id, response_text))
def delete_recipe(recipe_id): # clear fault injection rules url = '{0}/v1/rules?tag={1}'.format(a8_controller_url, recipe_id) headers = {} if a8_controller_token != "" : headers['Authorization'] = "Bearer " + token try: r = requests.delete(url, headers=headers) except Exception, e: sys.stderr.write("Could not DELETE {0}".format(url)) sys.stderr.write("\n") sys.stderr.write(str(e)) sys.stderr.write("\n") abort(500, "Could not DELETE {0}".format(url)) if r.status_code != 200 and r.status_code != 204: abort(r.status_code) return ""
def clear_rules_from_all_proxies(self): """ Clear fault injection rules from all known service proxies. """ if self.debug: print 'Clearing rules' try: headers = {"Content-Type" : "application/json"} if self.a8_controller_token != "" : headers['Authorization'] = "Bearer " + self.a8_controller_token for rule_id in self._rule_ids: resp = requests.delete(self.a8_controller_url + "?id=" + rule_id, headers = headers) resp.raise_for_status() except requests.exceptions.ConnectionError, e: print "FAILURE: Could not communicate with control plane %s" % self.a8_controller_url print e sys.exit(3) #TODO: Create a plugin model here, to support gremlinproxy and nginx
def roll_back_instantiation(self, serv_id): """ This method tries to roll back the instantiation workflow if an error occured. It will send messages to the SMR and the IA to remove deployed SSMs, FSMs and stacks. It will instruct the Repositories to delete the records. """ # Kill the stack corr_id = str(uuid.uuid4()) payload = json.dumps({'instance_uuid': serv_id}) self.manoconn.notify(t.IA_REMOVE, payload, reply_to=t.IA_REMOVE, correlation_id=corr_id) # Kill the SSMs and FSMs self.terminate_ssms(serv_id, require_resp=False) self.terminate_fsms(serv_id, require_resp=False) LOG.info("Instantiation aborted, cleanup completed") # TODO: Delete the records
def tearDown(self): #Killing the scaling Executive if self.plex_proc is not None: self.plex_proc.terminate() del self.plex_proc #Killing the connection with the broker try: self.manoconn.stop_connection() self.sm_connection.stop_connection() except Exception as e: LOG.exception("Stop connection exception.") #Clearing the threading helpers del self.wait_for_event1 del self.wait_for_event2 url_user = "{0}/api/users/specific-management".format(self.man_host) url_vhost = "{0}/api/vhosts/fsm-1234".format(self.man_host) requests.delete(url=url_user, headers=self.headers, auth=('guest', 'guest')) requests.delete(url=url_vhost, headers=self.headers, auth=('guest', 'guest')) #Method that terminates the timer that waits for an event
def delete_configuration(self, EP): try: url = self.base_url + self.CORPS_EP + self.corp + self.SITES_EP + self.site + EP with open(self.file) as data_file: data = json.load(data_file) for config in data['data']: requests.delete(url + "/" + config['id'], cookies=self.authn.cookies, headers=self.get_headers()) print("Delete complete!") except Exception as e: print('Error: %s ' % str(e)) print('Query: %s ' % url) quit()
def delete_all_zendesk_articles(self, category): ''' delete all articles in the category being filled with UV articles for our app, we had multiple uservoice knowledge bases, and each gets put in a zendesk category ''' print "**DELETING ALL SECTIONS/ARTICLES in destination category {}".format(category) url = '{}/api/v2/help_center/categories/{}/sections.json'.format(self.zendesk_url, category) response = requests.get(url, headers=self.headers, auth=self.credentials) if response.status_code != 200: print('FAILED to delete articles with error {}'.format(response.status_code)) exit() sections = response.json()['sections'] section_ids=list(section['id'] for section in sections) for section_id in section_ids: url = "{}/api/v2/help_center/sections/{}.json".format(self.zendesk_url, section_id) response = requests.delete(url, headers=self.headers, auth=self.credentials) if response.status_code != 204: print('FAILED to delete sections for category {} with error {}'.format(category, response.status_code)) exit()
def _send_raw_http_request(self, method, url, data=None): self.__logger.debug('%s %s' % (method, url)) if method in ['POST', 'PUT', 'PATCH']: self.__logger.log(TINTRI_LOG_LEVEL_DATA, 'Data: %s' % data) headers = {'content-type': 'application/json'} if self.__session_id: headers['cookie'] = 'JSESSIONID=%s' % self.__session_id if method in ['GET', 'POST', 'PUT', 'PATCH', 'DELETE']: if method == 'GET': httpresp = requests.get(url, headers=headers, verify=False) elif method == 'POST': httpresp = requests.post(url, data, headers=headers, verify=False) elif method == 'PUT': httpresp = requests.put(url, data, headers=headers, verify=False) elif method == 'PATCH': httpresp = requests.patch(url, data, headers=headers, verify=False) elif method == 'DELETE': httpresp = requests.delete(url, headers=headers, verify=False) self._httpresp = httpresp # self._httpresp is for debugging only, not thread-safe return httpresp else: raise TintriError(None, message='Invalid HTTP method: ' + method) # This should never happen
def sections_delete(self, section_id,): headers = {'token': self.token} p = requests.delete(self.base + "sections/%s/" %(section_id), headers=headers) print p.text sections_delete = json.loads(p.text) if p.status_code != 200: logging.error("phpipam.sections_delete: Failure %s" % (p.status_code)) logging.error(sections_delete) self.error = p.status_code self.error_message = sections_delete['message'] return self.error, self.error_message if not sections_delete['success']: logging.error("phpipam.sections_delete: FAILURE: %s" % (sections_delete['code'])) self.error = sections_delete['code'] return self.error logging.info("phpipam.sections_delete: success %s" % (sections_delete['success'])) return sections_delete['code']
def subnet_delete(self, subnet_id, ): headers = {'token': self.token} p = requests.delete(self.base + "subnets/%s/" % (subnet_id), headers=headers) print p.text subnets_delete = json.loads(p.text) if p.status_code != 200: logging.error("phpipam.subnets_delete: Failure %s" % (p.status_code)) logging.error(subnets_delete) self.error = p.status_code self.error_message = subnets_delete['message'] return self.error, self.error_message if not subnets_delete['success']: logging.error("phpipam.subnets_delete: FAILURE: %s" % (subnets_delete['code'])) self.error = subnets_delete['code'] return self.error logging.info("phpipam.subnets_delete: success %s" % (subnets_delete['success'])) return subnets_delete['code']
def vlan_delete(self, vlan_id, ): headers = {'token': self.token} p = requests.delete(self.base + "vlans/%s/" % (vlan_id), headers=headers) print p.text vlans_delete = json.loads(p.text) if p.status_code != 200: logging.error("phpipam.vlans_delete: Failure %s" % (p.status_code)) logging.error(vlans_delete) self.error = p.status_code self.error_message = vlans_delete['message'] return self.error, self.error_message if not vlans_delete['success']: logging.error("phpipam.vlans_delete: FAILURE: %s" % (vlans_delete['code'])) self.error = vlans_delete['code'] return self.error logging.info("phpipam.vlans_delete: success %s" % (vlans_delete['success'])) return vlans_delete['code']
def delete_domain_whitelist(): """ Deletes the domain whitelist set previously . :usage: >>> response = fbbotw.delete_domain_whitelist() :return: `Response object <http://docs.python-requests.org/en/\ master/api/#requests.Response>`_ :facebook docs: `/domain-whitelisting#delete <https://developers.facebook.\ com/docs/messenger-platform/messenger-profile/domain-whitelisting#delete>`_ """ url = MESSENGER_PROFILE_URL.format(access_token=PAGE_ACCESS_TOKEN) payload = {} payload['fields'] = ["whitelisted_domains"] data = json.dumps(payload) status = requests.delete(url, headers=HEADER, data=data) return status
def _pdns_delete(url): # We first delete the zone from nslord, the main authoritative source of our DNS data. # However, we do not want to wait for the zone to expire on the slave ("nsmaster"). # We thus issue a second delete request on nsmaster to delete the zone there immediately. r1 = requests.delete(settings.NSLORD_PDNS_API + url, headers=headers_nslord) if r1.status_code < 200 or r1.status_code >= 300: # Deletion technically does not fail if the zone didn't exist in the first place if r1.status_code == 422 and 'Could not find domain' in r1.text: pass else: raise PdnsException(r1) # Delete from nsmaster as well r2 = requests.delete(settings.NSMASTER_PDNS_API + url, headers=headers_nsmaster) if r2.status_code < 200 or r2.status_code >= 300: # Deletion technically does not fail if the zone didn't exist in the first place if r2.status_code == 422 and 'Could not find domain' in r2.text: pass else: raise PdnsException(r2) return (r1, r2)
def logs_download(): """Download (and then delete) the logs from DocuSign The logs are stored in files/log/<account_id>/log_<timestamp>_<orig_name> <orig_name> is the original name of the file from the platform. Eg: 00_OK_GetAccountSharedAccess.txt 01_OK_GetFolderList.txt 02_OK_ExecuteLoggedApiBusinessLogic.txt 03_Created_RequestRecipientToken.txt 04_OK_GetEnvelope.txt 05_OK_GetEnvelope.txt 06_OK_SendEnvelope.txt Returns {err: False or a problem string, entries: log_entries, err_code } returns an array of just the new log_entries """ global auth auth = ds_authentication.get_auth() if auth["err"]: return {"err": auth["err"], "err_code": auth["err_code"]} r = logging_do_download() # returns {err, new_entries} return r
def delete(self, path='', **kwargs): """ :param path: str api path :param kwargs: additional parameters for request :return: dict response :raises: HTTPError :raises: InvalidJSONError """ r = requests.delete(self._path_join(path), headers=self.auth_custom, auth=self.auth, **kwargs) r.raise_for_status() try: return r.json() except ValueError: raise InvalidJSONError(r.content)
def httpdo(method, url, data=None): """ Do HTTP Request """ if isinstance(data, dict): data = json.dumps(data) if DEBUG: print "Shell: curl -X {method} -d '{data}' '{url}'".format(method=method, data=data or '', url=url) fn = dict(GET=requests.get, POST=requests.post, DELETE=requests.delete)[method] response = fn(url, data=data) retjson = response.json() if DEBUG: print 'Return:', json.dumps(retjson, indent=4) r = convert(retjson) if r.status != 0: raise WebDriverError(r.status, r.value) return r
def delete_logstore(self, project_name, logstore_name): """ delete log store Unsuccessful opertaion will cause an LogException. :type project_name: string :param project_name: the Project name :type logstore_name: string :param logstore_name: the logstore name :return: DeleteLogStoreResponse :raise: LogException """ headers = {} params = {} resource = "/logstores/" + logstore_name (resp, header) = self._send("DELETE", project_name, None, resource, params, headers) return DeleteLogStoreResponse(header, resp)
def delete_shard(self, project_name, logstore_name, shardId): """ delete a readonly shard Unsuccessful opertaion will cause an LogException. :type project_name: string :param project_name: the Project name :type logstore_name: string :param logstore_name: the logstore name :type shardId: int :param shardId: the read only shard id :return: ListShardResponse :raise: LogException """ headers = {} params = {} resource = "/logstores/" + logstore_name + "/shards/" + str(shardId) (resp, header) = self._send("DELETE", project_name, None, resource, params, headers) return DeleteShardResponse(header, resp)
def delete_index(self, project_name, logstore_name): """ delete index of a logstore Unsuccessful opertaion will cause an LogException. :type project_name: string :param project_name: the Project name :type logstore_name: string :param logstore_name: the logstore name :return: DeleteIndexResponse :raise: LogException """ headers = {} params = {} resource = "/logstores/" + logstore_name + "/index" (resp, header) = self._send("DELETE", project_name, None, resource, params, headers) return DeleteIndexResponse(header, resp)
def delete_logtail_config(self, project_name, config_name): """ delete logtail config in a project Unsuccessful opertaion will cause an LogException. :type project_name: string :param project_name: the Project name :type config_name: string :param config_name: the logtail config name :return: DeleteLogtailConfigResponse :raise: LogException """ headers = {} params = {} resource = "/configs/" + config_name (resp, headers) = self._send("DELETE", project_name, None, resource, params, headers) return DeleteLogtailConfigResponse(headers, resp)