我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用requests.patch()。
def test_chain_with_different_for_the_nth_time_error(server: FakeServer):
    """Check that every mismatching step of a three-step chain is reported.

    Three PATCH requests are sent with levels 5, 6 and 7 while the
    expectation chain asks for 6, 7 and 8, so the assertion message must
    contain one "For the N time / But for the N time" pair per request.
    """
    users_url = server.base_uri + "/users"
    for level in (5, 6, 7):
        requests.patch(users_url, json={"name": "new_name", "level": level})

    with pytest.raises(AssertionError) as error:
        expect_that(
            server.was_requested("patch", "/users")
            .for_the_first_time()
            .with_content_type("application/json")
            .with_body('{"name": "new_name", "level": 6}')
            .for_the_second_time()
            .with_content_type("application/json")
            .with_body('{"name": "new_name", "level": 7}')
            .for_the_3_time()
            .with_content_type("application/json")
            .with_body('{"name": "new_name", "level": 8}')
        )

    expected_message = "\n".join([
        "Expect that server was requested with [PATCH] http://localhost:8081/users.",
        "For the 1 time: with body '{\"name\": \"new_name\", \"level\": 6}'.",
        "But for the 1 time: body was '{\"name\": \"new_name\", \"level\": 5}'.",
        "For the 2 time: with body '{\"name\": \"new_name\", \"level\": 7}'.",
        "But for the 2 time: body was '{\"name\": \"new_name\", \"level\": 6}'.",
        "For the 3 time: with body '{\"name\": \"new_name\", \"level\": 8}'.",
        "But for the 3 time: body was '{\"name\": \"new_name\", \"level\": 7}'.",
    ])
    assert str(error.value) == expected_message
def finish_proxy_log(self, data):
    """Report a proxy log as finished through the ``finish-proxy-log`` endpoint.

    ``data`` is updated in place before it is sent: ``date_finished`` is
    converted with ``timestamp_to_datetime_str``, ``is_failed`` is coerced
    to 0/1 and ``is_finished`` is set to 1.

    :param data: dict such as
        {"proxy_log_id": 123123, "date_finished": timestamp}
    :return: True when the endpoint answers 200, otherwise False (a warning
        is logged).
    """
    finished_at = data.get('date_finished')
    assert isinstance(finished_at, (int, float))

    data['date_finished'] = timestamp_to_datetime_str(data['date_finished'])
    data['is_failed'] = int(bool(data.get('is_failed')))
    data['is_finished'] = 1

    proxy_log_id = data.get('proxy_log_id') or 0
    response, _content = self.patch('finish-proxy-log', pk=proxy_log_id, data=data)
    if response.status_code == 200:
        return True
    logging.warning('Finish proxy log failed: %s' % proxy_log_id)
    return False
def _send_raw_http_request(self, method, url, data=None):
    """Send one JSON request with the session cookie attached, if any.

    POST, PUT and PATCH forward ``data`` as the request body; GET and
    DELETE send no body. Certificate verification is disabled on every
    call. Any other ``method`` raises TintriError.
    """
    self.__logger.debug('%s %s' % (method, url))
    carries_body = method in ('POST', 'PUT', 'PATCH')
    if carries_body:
        self.__logger.log(TINTRI_LOG_LEVEL_DATA, 'Data: %s' % data)

    headers = {'content-type': 'application/json'}
    if self.__session_id:
        headers['cookie'] = 'JSESSIONID=%s' % self.__session_id

    if not carries_body and method not in ('GET', 'DELETE'):
        raise TintriError(None, message='Invalid HTTP method: ' + method)  # This should never happen

    # The verb was validated above, so it maps straight onto the
    # matching requests helper (requests.get, requests.post, ...).
    send = getattr(requests, method.lower())
    if carries_body:
        httpresp = send(url, data, headers=headers, verify=False)
    else:
        httpresp = send(url, headers=headers, verify=False)
    self._httpresp = httpresp  # self._httpresp is for debugging only, not thread-safe
    return httpresp
def patch(self, udid): data = tornado.escape.json_decode(self.request.body) id = data['id'] timeout = float(data.get('timeout', 20.0)) print 'Timeout:', timeout result = self.results.get(id) if result is None: self.results[id] = self yield gen.sleep(timeout) if self.results.get(id) == self: del(self.results[id]) self.write('null') self.finish() else: self.write(json.dumps(result)) self.results.pop(id, None)
def update_ports(self, service, ports):
    """Patch the ports of a service, or delete the service if ports is empty.

    :param service: service to update; its ``metadata`` supplies the name
        and namespace
    :param ports: new ports for the service
    :return: the patch result, or None when the service was deleted
    """
    metadata = service['metadata']
    name = metadata['name']
    namespace = metadata['namespace']

    if not ports:
        rv = self.delete(name, namespace)
        raise_if_failure(rv, "Couldn't delete service")
        return None

    rv = self.patch(name, namespace, {'spec': {'ports': ports}})
    raise_if_failure(rv, "Couldn't patch service ports")
    return rv
def clear_snooze_label_if_set(github_auth, issue, snooze_label):
    """Remove ``snooze_label`` from an issue when it is currently set.

    :param github_auth: pair unpacked into ``HTTPBasicAuth``
    :param issue: issue dict; reads ``labels``, ``html_url`` and ``url``
    :param snooze_label: name of the label to clear
    :return: False when the label was not set (no request is made), True
        after the remaining labels were PATCHed successfully
    """
    label_names = set(label["name"] for label in issue.get("labels", []))
    if snooze_label not in label_names:
        logging.debug(
            "clear_snooze_label_if_set: Label {} not set on {}".
            format(snooze_label, issue["html_url"]))
        return False

    label_names.discard(snooze_label)
    credentials = requests.auth.HTTPBasicAuth(*github_auth)
    response = requests.patch(
        issue["url"],
        auth=credentials,
        json={"labels": list(label_names)},
        headers=constants.GITHUB_HEADERS,
    )
    response.raise_for_status()
    logging.debug(
        "clear_snooze_label_if_set: Removed snooze label from {}".
        format(issue["html_url"]))
    return True
def create_static(host, port, user, password, route, nexthop, insecure):
    """Function to create a static route on CSR1000V.

    Sends a PATCH to ``/api/running/native/ip/route`` on ``host:port``
    using basic auth; certificate verification is skipped when
    ``insecure`` is true.

    :return: the response body text, or -1 when sending the request raised
    """
    # The connection parameters are now taken from the arguments; the
    # previous code read HOST/PORT/USER/PASS globals and ignored them.
    url = "https://{h}:{p}/api/running/native/ip/route".format(h=host, p=port)
    headers = {'content-type': 'application/vnd.yang.data+json',
               'accept': 'application/vnd.yang.data+json'}
    try:
        # NOTE(review): `data` is not defined in this block and `route` /
        # `nexthop` are never used; presumably the payload should be built
        # from them - confirm the expected body before relying on this.
        result = requests.patch(url, auth=(user, password), data=data,
                                headers=headers, verify=not insecure)
    except Exception as exc:
        print(str(type(exc)))
        return -1

    # NOTE(review): the original returned the body unconditionally, which
    # left a "return 0 on 201, otherwise print and return -1" tail
    # unreachable. The effective behavior is kept; confirm which was meant.
    return result.text
def promote_deployment(config, application, version, release, stage, execute):
    '''Promote deployment to new stage

    Submits a resources update that sets the ``stage`` label on the
    deployment named ``<application>-<version>-<release>``. The resulting
    change request is approved and executed when ``execute`` is true,
    otherwise its id is printed.
    '''
    deployment_name = '{}-{}-{}'.format(application, version, release)
    info('Promoting deployment {} to {} stage..'.format(deployment_name, stage))

    # The namespace is looked up once; an earlier duplicate lookup whose
    # value was overwritten before use has been dropped.
    cluster_id = config.get('kubernetes_cluster')
    namespace = config.get('kubernetes_namespace')
    path = '/kubernetes-clusters/{}/namespaces/{}/resources'.format(cluster_id, namespace)

    resources_update = ResourcesUpdate()
    resources_update.set_label(deployment_name, 'stage', stage)
    response = request(config, requests.patch, path, json=resources_update.to_dict())
    change_request_id = response.json()['id']

    if execute:
        approve_and_execute(config, change_request_id)
    else:
        print(change_request_id)
def scale_deployment(config, application, version, release, replicas, execute):
    '''Scale a single deployment

    Logs in with ``kubectl_login``, then submits a resources update that
    sets the replica count of ``<application>-<version>-<release>``. The
    resulting change request is approved and executed when ``execute`` is
    true, otherwise its id is printed.
    '''
    kubectl_login(config)
    deployment_name = '{}-{}-{}'.format(application, version, release)
    info('Scaling deployment {} to {} replicas..'.format(deployment_name, replicas))

    resources_update = ResourcesUpdate()
    resources_update.set_number_of_replicas(deployment_name, replicas)

    # The namespace is read once, after login; an earlier duplicate lookup
    # whose value was overwritten before use has been dropped.
    cluster_id = config.get('kubernetes_cluster')
    namespace = config.get('kubernetes_namespace')
    path = '/kubernetes-clusters/{}/namespaces/{}/resources'.format(cluster_id, namespace)

    response = request(config, requests.patch, path, json=resources_update.to_dict())
    change_request_id = response.json()['id']

    if execute:
        approve_and_execute(config, change_request_id)
    else:
        print(change_request_id)
def save_job_source_code(base_url, token, job_id, source):
    """PATCH the source code of a job and return the decoded JSON reply.

    ``token`` is sent both as the ``X-Auth-Token`` header and as the
    ``secret`` field of the body.

    :raises RuntimeError: when the response status is not 200
    """
    request_headers = {
        'X-Auth-Token': str(token),
        'Content-Type': 'application/json',
    }
    url = '{}/jobs/{}/source-code'.format(base_url, job_id)
    body = json.dumps({'secret': token, 'source': source})

    r = requests.patch(url, data=body, headers=request_headers)
    if r.status_code != 200:
        raise RuntimeError(
            u"Error loading data from stand: HTTP {} - {} ({})".format(
                r.status_code, r.text, url))
    return json.loads(r.text)
def set_settings(
        self, explicit_filter, allow_dms,
        friend_all, friend_mutual, friend_mutual_guild):
    """PATCH the current user's settings and return ``(ok, json)``.

    Only the settings that were actually supplied are sent:

    * ``explicit_filter`` (0..2) -> ``explicit_content_filter``
    * ``allow_dms`` -> ``default_guilds_restricted`` (negated)
    * the three friend flags -> ``friend_source_flags``, sent unless all
      three arguments are the very same object (e.g. all ``None``)

    :raises ValueError: when ``explicit_filter`` is outside 0..2
    """
    payload = {}

    if explicit_filter is not None:
        in_range = 0 <= explicit_filter <= 2
        if not in_range:
            raise ValueError("Explicit filter must be from 0 to 2.")
        payload["explicit_content_filter"] = explicit_filter

    if allow_dms is not None:
        payload["default_guilds_restricted"] = not allow_dms

    flags_untouched = friend_all is friend_mutual is friend_mutual_guild
    if not flags_untouched:
        # "all" only holds when every individual source is enabled too.
        everyone = friend_all and friend_mutual and friend_mutual_guild
        payload["friend_source_flags"] = {
            "all": bool(everyone),
            "mutual_friends": bool(friend_mutual),
            "mutual_guilds": bool(friend_mutual_guild),
        }

    response = self.patch("users/@me/settings", auth=True, json=payload)
    # Returns form errors if invalid; new settings otherwise
    return (response.ok, response.json())
def update_case(self, case):
    """
    Update a case.

    Only a fixed set of attributes found in ``case.__dict__`` is sent.
    If the request raises a ``RequestException`` the process exits with
    status 1.

    :param case: The case to update. The case's `id` determines which case to update.
    :return: the response returned by ``requests.patch``
    """
    req = self.url + "/api/case/{}".format(case.id)

    # Choose which attributes to send
    updatable = (
        'title', 'description', 'severity', 'startDate', 'owner', 'flag',
        'tlp', 'tags', 'resolutionStatus', 'impactStatus', 'summary',
        'endDate', 'metrics',
    )
    payload = {}
    for key, value in case.__dict__.items():
        if key in updatable:
            payload[key] = value

    try:
        return requests.patch(
            req,
            headers={'Content-Type': 'application/json'},
            json=payload,
            proxies=self.proxies,
            auth=self.auth,
            verify=self.cert,
        )
    except requests.exceptions.RequestException:
        sys.exit(1)
def edit_issue(auth_token, username, repo, issue_number, issue_title, issue_body, insecure=False):
    """
    Edit an issue.

    :return: the decoded JSON response, or None when the request failed
        with an SSL error (the failure is reported on stdout).
    """
    # https://developer.github.com/v3/issues/#edit-an-issue
    # Parameters (not required):
    # title - String containing the title of the issue.
    # body - String containing the body of the issue.
    # NOTE(review): `username` is unused; the URL is built from `repo` alone.
    parameters = {"title": issue_title, "body": issue_body}
    url = "https://api.github.com/repos/{}/issues/{}".format(repo, issue_number)
    try:
        response = requests.patch(url, json=parameters, headers=auth_header(auth_token))
    except requests.exceptions.SSLError:
        # SSLError lives in requests.exceptions; it is not exported at the
        # top level of the package.
        print("A network error occurred.")
        if insecure:
            print("An SSL certificate error occurred.")
        # No response exists on this path; falling through used to raise
        # UnboundLocalError.
        return None
    return response.json()
def edit_issue_comment(auth_token, username, repo, comment_id, comment_body, insecure=False):
    """
    Edits an existing comment on a specified issue.

    :return: the decoded JSON response, or None when the request could not
        be completed (the failure is reported on stdout).
    """
    # https://developer.github.com/v3/issues/comments/#edit-a-comment
    # Note: Issue Comments are ordered by ascending ID.
    # Parameters:
    # body - Required string. The contents of the comment.
    # NOTE(review): `username` is unused; the URL is built from `repo` alone.
    parameters = {"body": comment_body}
    url = "https://api.github.com/repos/{}/issues/comments/{}".format(repo, comment_id)
    try:
        response = requests.patch(url, json=parameters, headers=auth_header(auth_token))
    except requests.exceptions.RequestException:
        # Narrowed from a bare `except:` so KeyboardInterrupt and
        # programming errors are no longer swallowed.
        print("A network error occurred.")
        if insecure:
            print("An SSL certificate error occurred.")
        # No response exists on this path; falling through used to raise
        # UnboundLocalError.
        return None
    return response.json()
def patch(*args, **kwargs):
    """PATCH with provider credentials, retrying while the body is not JSON.

    The request is attempted up to ``WRequests.MAX_RETRIES`` times; the
    first reply that decodes as JSON wins. The decoded reply (or None if
    every attempt failed to decode) is passed to ``raise_if_error`` and
    then returned.
    """
    credentials = {
        'X-Access-Token': appconfig.get('provider_config', 'access-token'),
        'X-Client-ID': appconfig.get('provider_config', 'client-id')
    }

    decoded = None
    for _attempt in range(WRequests.MAX_RETRIES):
        try:
            decoded = requests.patch(*args, headers=credentials, **kwargs).json()
            break
        except json.decoder.JSONDecodeError:
            decoded = None

    raise_if_error(decoded)
    return decoded
def patch(self, the_id, user_data, user_params=None, user_headers=None):
    """PATCH ``user_data`` (serialized with ujson) to the resource ``the_id``.

    :param the_id: identifier passed to ``self._url`` with the endpoint
    :param user_data: object serialized as the request body
    :param user_params: extra query parameters, given to ``self._params``
    :param user_headers: extra headers, given to ``self._headers``
    :return: ``self._formatreturn(resp)`` for a 200 response
    :raises ApiError: for any other status code
    """
    # Fresh dicts per call instead of shared mutable defaults.
    user_params = {} if user_params is None else user_params
    user_headers = {} if user_headers is None else user_headers

    strjsondata = ujson.dumps(user_data, ensure_ascii=False)
    resp = req.patch(
        self._url(self.endpoint, the_id),
        data=strjsondata,
        headers=self._headers(user_headers),
        params=self._params(user_params)
    )
    if resp.status_code != 200:
        # The error now names the verb actually used (was "GET").
        raise ApiError("PATCH", self.endpoint, resp.status_code, resp.text)
    return self._formatreturn(resp)
def test_mock_works_with_multiple_methods(self):
    """A ServiceMock declared for GET, POST and PATCH answers 200 to each."""
    double = ServiceMock(
        service='foo',
        methods=['GET', 'POST', 'PATCH'],
        url='/foo',
        input_schema={'type': 'object'},
        output_schema={'type': 'array'},
        output=[]
    )
    set_service_locations(dict(foo="http://localhost:1234/"))
    self.useFixture(double)

    # Same order as before: POST, then GET, then PATCH.
    for send in (requests.post, requests.get, requests.patch):
        status = send("http://localhost:1234/foo", json={}).status_code
        self.assertEqual(200, status)
def update_route(route_fqdn, route_name, config, logger, watcher):
    """Create a host and certificate in IPA and patch them into the route.

    The certificate body is re-wrapped at 65 characters per line and
    framed with BEGIN/END CERTIFICATE markers. The route annotation
    ``<need_cert_annotation>.state`` is set to ``created``.
    """
    ipa_client = IPAClient(
        ipa_user=config.get('ipa_user'),
        ipa_password=config.get('ipa_password'),
        ipa_url=config.get('ipa_url'),
        ca_trust=config.get('ipa_ca_cert', False)
    )
    ipa_client.create_host(route_fqdn)
    certificate, key = ipa_client.create_cert(route_fqdn, config.get('ipa_realm'))
    logger.info("[CERT CREATED]: {0}".format(route_fqdn))
    logger.debug("Cert: {0}\nKey: {1}\n".format(certificate, key.exportKey().decode('UTF-8')))

    route_url = 'https://{0}/oapi/v1/namespaces/{1}/routes/{2}'.format(
        watcher.config.k8s_endpoint, watcher.config.k8s_namespace, route_name)
    request_headers = {
        'Authorization': 'Bearer {0}'.format(watcher.config.k8s_token),
        'Content-Type': 'application/strategic-merge-patch+json',
    }

    state_annotation = '{0}.state'.format(config.get('need_cert_annotation'))
    wrapped_certificate = '\n'.join(
        certificate[i:i+65] for i in six.moves.range(0, len(certificate), 65))
    pem_certificate = '-----BEGIN CERTIFICATE-----\n{0}\n-----END CERTIFICATE-----'.format(
        wrapped_certificate)
    pem_key = '{0}'.format(key.exportKey('PEM').decode('UTF-8'))
    patch_document = {
        'metadata': {'annotations': {state_annotation: 'created'}},
        'spec': {'tls': {'certificate': pem_certificate, 'key': pem_key}},
    }

    # The response is not inspected.
    req = requests.patch(
        route_url,
        headers=request_headers,
        data=json.dumps(patch_document),
        params="",
        verify=watcher.config.k8s_ca,
    )
    logger.info("[ROUTE UPDATED]: {0}".format(route_fqdn))
def set_attributes(login: Roll20Login,
                   character_id: str,
                   attributes: Mapping[str, Union[str, int]],
                   attribute_position: AttributePosition = AttributePosition.current) -> None:
    """PATCH each given attribute of a character, one request per attribute.

    Values are sent as strings under the key ``attribute_position.value``.

    Raises:
        Roll20PermissionError: when a request is answered with 401.
        KeyError: when an attribute name has no known attribute id.
    """
    attribute_ids = get_attributes_ids(login=login, character_id=character_id)
    for attribute_name, raw_value in attributes.items():
        # Cast int values to str.
        attribute_value = str(raw_value)
        attribute_id = attribute_ids[attribute_name]
        url = (f'{login.firebase_root}{login.campaign_path}/char-attribs/char/{character_id}/'
               f'{attribute_id}/.json?auth={login.auth_token}')
        body = json.dumps({attribute_position.value: attribute_value})
        response = requests.patch(url, data=body)
        if response.status_code == requests.status_codes.codes.UNAUTHORIZED:
            raise Roll20PermissionError('Permission denied trying to set attribute.')
        updated_attribute = json.loads(response.text)[attribute_position.value]
        logger.debug(f'New {attribute_name}: {updated_attribute}')
def create_ability(login: Roll20Login,
                   character_id: str,
                   ability_name: str,
                   ability_action: str,
                   is_token_action: bool = True) -> None:
    """Create an ability on a character and store its new id on the record.

    The ability is POSTed first; the ``name`` field of the reply is used
    as the new ability id and PATCHed back as the record's ``id``.

    Raises:
        Roll20PermissionError: when the POST is answered with 401.
    """
    abilities_root = f'{login.firebase_root}{login.campaign_path}/'
    create_url = (abilities_root +
                  f'char-abils/char/{character_id}.json?auth={login.auth_token}')
    new_ability = {'action': ability_action,
                   'name': ability_name,
                   'istokenaction': is_token_action}

    response = requests.post(create_url, data=json.dumps(new_ability))
    if response.status_code == requests.status_codes.codes.UNAUTHORIZED:
        raise Roll20PermissionError('Permission denied trying to create ability.')

    new_ability_id = json.loads(response.text)['name']
    logger.debug(f'Created {ability_name} {new_ability_id}')

    record_url = (f'{login.firebase_root}{login.campaign_path}/char-abils/char/'
                  f'{character_id}/{new_ability_id}/.json?auth={login.auth_token}')
    requests.patch(record_url, data=json.dumps({'id': new_ability_id}))
def update_latest_version(tag_name):
    """Point the ``latest`` release at master and upload ``tag_name`` to it.

    The release tagged ``latest`` is looked up (and created on 404), its
    ``target_commitish`` is set to ``master``, all of its assets are
    deleted and a ``version`` text asset containing ``tag_name`` is
    uploaded.

    :raises requests.HTTPError: when the lookup is answered with an error
        status other than 404
    :raises RuntimeError: when the lookup is answered with any other
        unexpected status
    """
    params = {"access_token": GITHUB_TOKEN}

    r = requests.get(GH_BASE_URL + "/releases/tags/latest", params=params)
    if r.status_code == 404:
        release_id = _create_latest_version()
    elif r.status_code == 200:
        release_id = r.json()["id"]
    else:
        # Any other status used to fall through and fail later with an
        # UnboundLocalError on `release_id`; fail here with the real cause.
        r.raise_for_status()
        raise RuntimeError(
            "Unexpected status {0} while looking up the latest release".format(r.status_code))

    data = {"target_commitish": "master"}
    r = requests.patch(GH_BASE_URL + "/releases/{0}".format(release_id), params=params, json=data)
    assert r.status_code == 200

    # Strip the URI template suffix so the upload URL can be used directly.
    upload_url = r.json()['upload_url'].replace("{?name,label}", "")
    _delete_all_assets(release_id)
    _upload_asset(upload_url, "version", "text/plain", tag_name)
def _upload_chunk(self, data, offset, file_endpoint, headers=None, auth=None):
    """PATCH one chunk to ``file_endpoint`` and return the new upload offset.

    The offset is sent in the ``Upload-Offset`` header; caller-supplied
    ``headers`` override the defaults. The reply is validated with
    ``self.check_response_status`` and its ``Upload-Offset`` header is
    returned as an int.
    """
    floyd_logger.debug("Uploading %s bytes chunk from offset: %s", len(data), offset)

    request_headers = {
        'Content-Type': 'application/offset+octet-stream',
        'Upload-Offset': str(offset),
        'Tus-Resumable': self.TUS_VERSION,
    }
    request_headers.update(headers or {})

    response = requests.patch(file_endpoint, headers=request_headers, data=data, auth=auth)
    self.check_response_status(response)
    new_offset = response.headers["Upload-Offset"]
    return int(new_offset)
def add_or_update(self, name, config=None):
    """Create the plugin ``name``, or PATCH it when it already exists.

    The body is ``{"name": name}`` merged with ``config``. Existence is
    decided by ``self._get_plugin_id`` over the ``data`` list returned by
    ``self.list()``. Returns the response of the POST or PATCH.
    """
    # does it exist already?
    known_plugins = self.list().json().get('data', [])

    payload = {"name": name}
    if config is not None:
        payload.update(config)

    plugin_id = self._get_plugin_id(name, known_plugins)
    if plugin_id is not None:
        plugin_url = "{}/{}".format(self.base_url, plugin_id)
        return requests.patch(plugin_url, payload, auth=self.auth)
    return requests.post(self.base_url, payload, auth=self.auth)
def _partial_update(cls, entity, parameters=None, ids={}):  # pragma: no cover
    """PATCH ``entity`` to the single-resource endpoint and parse the reply.

    :param entity: a ``BaseAbstractEntity`` (sent via ``get_json_data()``)
        or any JSON-serializable object (sent via ``json.dumps``)
    :param parameters: query parameters for the request
    :param ids: values used to fill the endpoint URI; the entity's ``id``
        attribute, when present, is added under the key ``id``
    :return: the handled response, or an instance built with
        ``cls.entity_class().from_dict`` when an entity class is declared
    """
    # Work on a private copy: writing into `ids` directly mutated the
    # shared default dict (leaking an id into later calls) and the
    # caller's own mapping.
    ids = dict(ids or {})

    entity_id = getattr(entity, 'id', None)
    if entity_id is not None:
        ids['id'] = entity_id

    uri = cls.get_base_uri(cls.endpoint_single(), **ids)
    if isinstance(entity, BaseAbstractEntity):
        response = cls.REST_CLIENT.patch(
            uri,
            json=entity.get_json_data(),
            headers=make_headers(),
            params=parameters
        )
    else:
        response = cls.REST_CLIENT.patch(
            uri,
            data=json.dumps(entity),
            headers=make_headers(),
            params=parameters
        )

    response = cls.REST_CLIENT.handle_response(response)
    if cls.entity_class() is None or not issubclass(cls.entity_class(), BaseAbstractEntity):
        return response  # pragma: no cover
    return cls.entity_class().from_dict(response.json())
def enable_survey(tower_config, job_template_id):
    """Set ``survey_enabled`` to true on a job template.

    Uses basic auth from ``tower_config`` and skips certificate
    verification. Calls ``exit_failure`` when the status is not 200.
    """
    url = 'https://{0}/api/v1/job_templates/{1}/'.format(
        tower_config['host'], job_template_id)
    credentials = (tower_config['username'], tower_config['password'])
    body = json.dumps({'survey_enabled': True})

    response = requests.patch(
        url,
        verify=False,
        auth=credentials,
        headers={'Content-type': 'application/json'},
        data=body,
    )
    if response.status_code != 200:
        exit_failure('error enabling job template survey')
def test_list_assigned_submissions(mount, mount_dir, teacher_jwt, teacher_id):
    """Only submissions assigned to the teacher show up after a remount.

    Every ``Stupid1`` submission is first assigned to the teacher through
    the grader endpoint; after remounting with ``assigned_to_me=True``
    each listed entry must be a ``Stupid1`` submission or a dot-file.
    """
    course = 'Programmeertalen'

    for assig in ls(mount_dir, course):
        for sub in ls(mount_dir, course, assig):
            if 'Stupid1' not in sub:
                continue
            id_file = join(mount_dir, course, assig, sub, '.cg-submission-id')
            with open(id_file) as f:
                sub_id = f.read().strip()
            r = requests.patch(
                f'http://localhost:5000/api/v1/submissions/{sub_id}/grader',
                json={'user_id': teacher_id},
                headers={
                    'Authorization': 'Bearer ' + teacher_jwt,
                }
            )
            assert r.status_code == 204

    mount(assigned_to_me=True)

    for assig in ls(mount_dir, course):
        for sub in ls(mount_dir, course, assig):
            assert 'Stupid1' in sub or sub[0] == '.'
def saveScore(analysis_id, score):
    """Save calculated score back to pit.

    A copy of ``ANALYSIS_TEMPLATE`` filled with the id and score is
    PATCHed to ``<pit_url>/analysis/<analysis_id>``; an HTTP error status
    raises through ``raise_for_status``.
    """
    analysis = ANALYSIS_TEMPLATE.copy()
    analysis['id'] = analysis_id
    analysis['score'] = score

    target = '/'.join([pit_url, 'analysis', str(analysis_id)])
    resp = requests.patch(target, data=json.dumps(analysis), headers=post_headers)
    resp.raise_for_status()

# For an analysis item:
# * Fetch the item.
# * Fetch all relevant rules.
# * Evaluate each rule, keeping a running total of score.
# * Save score to analysis item.
def build_relationship(self, Left_TLO, Right_TLO):
    """Forge a "Related To" relationship from ``Left_TLO`` to ``Right_TLO``.

    The request is a PATCH against the left object's URL with certificate
    verification disabled. Returns the decoded JSON reply on status 200,
    otherwise None.
    """
    # handles CAPS issue: only the first letter of the right type is upper-cased
    right_type = Right_TLO.type[:1].upper() + Right_TLO.type[1:]
    submit_url = '{}/{}/{}/'.format(self.url, Left_TLO.collection, Left_TLO.get_ID())
    credentials = {'api_key': self.api_key, 'username': self.username}
    form = {
        'action': 'forge_relationship',
        'type_': Left_TLO.type,
        'id_': Left_TLO.get_ID(),
        'right_type': right_type,
        'right_id': Right_TLO.get_ID(),
        'rel_type': 'Related To',
        'rel_date': datetime.datetime.now(),
        'rel_reason': '',
        'rel_confidence': 'low',
        'get_rels': True
    }

    r = requests.patch(submit_url, params=credentials, data=form, verify=False)
    if r.status_code != 200:
        return None
    return json.loads(r.text)
def update(self, eid, data, token):
    """
    Update a given Library Entry.

    :param eid str: Entry ID
    :param data dict: Attributes
    :param token str: OAuth token
    :return: True or ServerError
    :rtype: Bool or Exception
    :raises ConnectionError: when the response status is not 200
    """
    final_dict = {"data": {"id": eid, "type": "libraryEntries", "attributes": data}}

    # Copy the shared header dict: adding Authorization to `self.header`
    # itself left the token attached to every later request made through
    # this instance.
    final_headers = dict(self.header)
    final_headers['Authorization'] = "Bearer {}".format(token)

    r = requests.patch(self.apiurl + "/library-entries/{}".format(eid),
                       json=final_dict, headers=final_headers)
    if r.status_code != 200:
        raise ConnectionError(r.text)
    return True
def update(self, uid, data, token):
    """
    Update a user's data. Requires an auth token.

    :param uid str: User ID to update
    :param data dict: The dictionary of data attributes to change. Just the attributes.
    :param token str: The authorization token for this user
    :return: True or Exception
    :rtype: Bool or ServerError
    :raises ServerError: when the response status is not 200
    """
    final_dict = {"data": {"id": uid, "type": "users", "attributes": data}}

    # Copy the shared header dict: adding Authorization to `self.header`
    # itself left the token attached to every later request made through
    # this instance.
    final_headers = dict(self.header)
    final_headers['Authorization'] = "Bearer {}".format(token)

    r = requests.patch(self.apiurl + "/users/{}".format(uid),
                       json=final_dict, headers=final_headers)
    if r.status_code != 200:
        raise ServerError
    return True
def maintain(self, nodename, remove=False):
    """
    maintain node will disable or enable deployment onto the maintained node.

    A node constraint is added with PATCH, or removed with DELETE when
    ``remove`` is true; the outcome is reported through ``info``/``error``.
    """
    node = NodeInfo(nodename)
    base_url = "http://deployd.lain:9003/api/constraints"
    operator = "Remove" if remove else "Add"

    # Both verbs target the same constraint URL.
    url = base_url + "?type=node&value=%s" % node.name
    if remove:
        info("DELETE %s" % url)
        resp = requests.delete(url)
    else:
        info("PATCH %s" % url)
        resp = requests.patch(url)

    if resp.status_code < 300:
        info("%s constraint on node %s success." % (operator, node.name))
    else:
        error("%s constraint on node %s fail: %s" % (operator, node.name, resp.text))
def partial_update(self, resource_id, raise_exception=True, headers=None, data=None):
    """
    Partially update a resource, i.e. send only some of its fields.

    The keyword arguments for the request are ``headers`` and
    ``self.auth`` plus whatever ``self.extract_write_data`` returns for
    ``data`` (called with ``partial=True``).

    Example:
        PATCH endpoint/<pk>/
    """
    request_kwargs = dict(headers=headers, auth=self.auth)
    write_kwargs = self.extract_write_data(data, raise_exception, partial=True)
    request_kwargs.update(write_kwargs)
    return requests.patch(self.format_endpoint(resource_id), **request_kwargs)
def add_or_update(self, name, config=None):
    """Create the plugin ``name``, or PATCH it when it already exists.

    The body is ``{"name": name}`` merged with ``config``. Existence is
    decided by ``self._get_plugin_id`` over the ``data`` list returned by
    ``self.list()``. Returns the response of the POST or PATCH.
    """
    # does it exist already?
    known_plugins = self.list().json().get('data', [])

    payload = {"name": name}
    if config is not None:
        payload.update(config)

    plugin_id = self._get_plugin_id(name, known_plugins)
    if plugin_id is not None:
        plugin_url = "{}/{}".format(self.base_url, plugin_id)
        return requests.patch(plugin_url, payload)
    return requests.post(self.base_url, payload)
def test_requests_mocks(self):
    """Every HTTP verb helper on ``requests`` has been replaced by a Mock."""
    verb_helpers = (
        requests.get,
        requests.put,
        requests.post,
        requests.patch,
        requests.delete,
    )
    for helper in verb_helpers:
        self.assertTrue(isinstance(helper, Mock))
def _get_url_and_header(self, path): url = self._base_url + path header = {'Content-Type': 'application/merge-patch+json', 'Accept': 'application/json'} if self.token: header.update({'Authorization': 'Bearer %s' % self.token}) return url, header
def patch_status(self, path, data):
    """PATCH ``{"status": data}`` to ``<path>/status``.

    Returns the ``status`` field of the JSON reply when the response is
    OK, otherwise raises ``K8sClientException`` with the response text.
    """
    LOG.debug("Patch_status %(path)s: %(data)s", {
        'path': path, 'data': data})

    url, header = self._get_url_and_header(path + '/status')
    response = requests.patch(
        url,
        json={"status": data},
        headers=header,
        cert=self.cert,
        verify=self.verify_server,
    )
    if not response.ok:
        raise exc.K8sClientException(response.text)
    return response.json().get('status')
def update(self, job_id, update_doc):
    """PATCH ``update_doc`` to the job and return the reply's ``version``.

    Raises ``ApiClientException`` (built from the response) when the
    status is not 200.
    """
    response = requests.patch(
        self.endpoint + job_id,
        headers=self.headers,
        data=json.dumps(update_doc),
        verify=self.verify,
    )
    if response.status_code == 200:
        return response.json()['version']
    raise exceptions.ApiClientException(response)
def update(self, action_id, update_doc):
    """PATCH ``update_doc`` to the action and return the reply's ``version``.

    Raises ``ApiClientException`` (built from the response) when the
    status is not 200.
    """
    response = requests.patch(
        self.endpoint + action_id,
        headers=self.headers,
        data=json.dumps(update_doc),
        verify=self.verify,
    )
    if response.status_code == 200:
        return response.json()['version']
    raise exceptions.ApiClientException(response)
def update(self, session_id, update_doc):
    """PATCH ``update_doc`` to the session and return the reply's ``version``.

    Raises ``ApiClientException`` (built from the response) when the
    status is not 200.
    """
    response = requests.patch(
        self.endpoint + session_id,
        headers=self.headers,
        data=json.dumps(update_doc),
        verify=self.verify,
    )
    if response.status_code == 200:
        return response.json()['version']
    raise exceptions.ApiClientException(response)
def update(api_key_id, active):
    """
    Enable or disable an api key with given ID

    Exits with status 1 when the API reports an error or the request
    itself fails; on a 200 reply the outcome is written to stdout and the
    response is handed to ``handle_api_key_response``.
    """
    action, url = _url((api_key_id,))
    payload = {
        "apikey": {
            "active": active
        }
    }
    headers = api_utils.generate_headers('owner', method='PATCH',
                                         body=json.dumps(payload), action=action)
    try:
        response = requests.patch(url, json=payload, headers=headers)
        if response_utils.response_error(response):
            sys.stderr.write('Failed to %s api key with id: %s \n' %
                             ('enable' if active else 'disable', api_key_id))
            sys.exit(1)
        elif response.status_code == 200:
            sys.stdout.write('%s api key with id: %s\n' %
                             ('Enabled' if active else 'Disabled', api_key_id))
            handle_api_key_response(response)
    except requests.exceptions.RequestException as error:
        # write() needs text; passing the exception object itself raised
        # TypeError and hid the real failure.
        sys.stderr.write('%s\n' % error)
        sys.exit(1)
def rename_team(team_id, team_name):
    """
    Rename team with the provided team_id.

    Exits with status 1 when the API reports an error or the request
    itself fails.
    """
    # as this is a patch request, it won't modify users in the team.
    # what we want is to update the name of the team only.
    team_patch = {'name': team_name, 'users': [{'id': ''}]}
    params = {'team': team_patch}
    headers = api_utils.generate_headers('rw')

    try:
        response = requests.patch(_url((team_id,))[1], json=params, headers=headers)
        failed = response_utils.response_error(response)  # Check response has no errors
        if failed:
            click.echo('Renaming team with id: %s failed.' % team_id, err=True)
            sys.exit(1)
        elif response.status_code == 200:
            click.echo("Team: '%s' renamed to: '%s'" % (team_id, team_name))
    except requests.exceptions.RequestException as error:
        click.echo(error, err=True)
        sys.exit(1)
def add_user_to_team(team_id, user_key):
    """
    Add user with the provided user_key to team with provided team_id.

    The team is fetched first so that its current name can be sent back
    together with the new user. Exits with status 1 when the team cannot
    be fetched, the update is rejected, or a request fails.
    """
    headers = api_utils.generate_headers('rw')
    params = {'teamid': team_id}
    try:
        response = requests.get(_url((team_id,))[1], params=params, headers=headers)
        if response.status_code != 200:
            if response_utils.response_error(response):
                click.echo('Cannot find team. Adding user to team %s failed.' % team_id, err=True)
                sys.exit(1)
            return

        # we are doing a patch request here so it's safe to include the user_key
        # we want to add here
        params = {
            'team': {
                'name': response.json()['team']['name'],
                'users': [{'id': user_key}]
            }
        }
        headers = api_utils.generate_headers('rw')
        try:
            response = requests.patch(_url((team_id,))[1], json=params, headers=headers)
            if response_utils.response_error(response):  # Check response has no errors
                click.echo('Adding user to team with key: %s failed.' % team_id, err=True)
                sys.exit(1)
            elif response.status_code == 200:
                click.echo('Added user with key: %s to team.' % user_key)
        except requests.exceptions.RequestException as error:
            click.echo(error, err=True)
            sys.exit(1)
    except requests.exceptions.RequestException as error:
        click.echo(error, err=True)
        sys.exit(1)
def test_handler_calls_many_times(server: FakeServer):
    """A handler registered without a call limit answers repeated requests."""
    server.on_("patch", "/channels").response(status=204)

    channels_url = server.base_uri + "/channels"
    response_0 = requests.patch(channels_url)
    response_1 = requests.patch(channels_url)

    assert response_0.status_code == 204
    assert response_1.status_code == 204
def test_handler_called_nth_times(server: FakeServer):
    """A handler limited to three calls makes the fourth request fail.

    The first three PATCH requests get the configured 204; the fourth
    gets a plain-text 500 reporting that no response is left.
    """
    server.on_("patch", "/songs").response(status=204)._3_times()

    songs_url = server.base_uri + "/songs"
    responses = [requests.patch(songs_url) for _ in range(4)]
    exhausted = responses[3]

    assert responses[0].status_code == 204
    assert responses[1].status_code == 204
    assert responses[2].status_code == 204
    assert exhausted.status_code == 500
    assert exhausted.text == "Server has not responses for [PATCH] http://localhost:8081/songs"
    assert exhausted.headers["content-type"] == "text/plain"
def test_explicit_content_type_remove_application_json(server: FakeServer):
    """An explicit content type is used even when a JSON body is configured."""
    handler = server.on_("patch", "/games")
    handler.response(status=200, json={"change": "content-type"}, content_type="text/plain")

    response = requests.patch(server.base_uri + "/games")

    assert response.status_code == 200
    assert response.headers["Content-Type"] == "text/plain"
def test_with_body_not_raise_error(server: FakeServer):
    """A matching body expectation on the first request passes silently."""
    requests.patch(server.base_uri + "/sleep", data="Good night!")

    first_request = server.was_requested("patch", "/sleep").for_the_first_time()
    expect_that(first_request.with_body("Good night!"))
def test_chain_of_different_with_errors(server: FakeServer):
    """Both the content-type and the body mismatch are reported together.

    A JSON PATCH is sent while the expectation asks for a plain-text
    request with a different body, so the assertion message must list
    both failed checks for the first request.
    """
    requests.patch(server.base_uri + "/users", json={"name": "new_name"})

    with pytest.raises(AssertionError) as error:
        expect_that(
            server.was_requested("patch", "/users")
            .for_the_first_time()
            .with_content_type("text/plain")
            .with_body("old name")
        )

    expected_message = "\n".join([
        "Expect that server was requested with [PATCH] http://localhost:8081/users.",
        "For the 1 time: with content type 'text/plain'.",
        "But for the 1 time: content type was 'application/json'.",
        "For the 1 time: with body 'old name'.",
        "But for the 1 time: body was '{\"name\": \"new_name\"}'.",
    ])
    assert str(error.value) == expected_message