我们从Python开源项目中,提取了以下42个代码示例,用于说明如何使用http.client.OK。
def test_validate_image_status_before_upload_ok_v1(self): mock_conn = mock.Mock() fake_url = 'http://fake_host/fake_path/fake_image_id' mock_check_resp_status_and_retry = self.mock_patch_object( self.glance, 'check_resp_status_and_retry') mock_head_resp = mock.Mock() mock_head_resp.status = httplib.OK mock_head_resp.read.return_value = 'fakeData' mock_head_resp.getheader.return_value = 'queued' mock_conn.getresponse.return_value = mock_head_resp self.glance.validate_image_status_before_upload_v1( mock_conn, fake_url, extra_headers=mock.Mock()) self.assertTrue(mock_conn.getresponse.called) self.assertEqual(mock_head_resp.read.call_count, 2) self.assertFalse(mock_check_resp_status_and_retry.called)
def test_validate_image_status_before_upload_req_head_exception_v1(self): mock_conn = mock.Mock() mock_conn.request.side_effect = Fake_HTTP_Request_Error() fake_url = 'http://fake_host/fake_path/fake_image_id' mock_head_resp = mock.Mock() mock_head_resp.status = httplib.OK mock_head_resp.read.return_value = 'fakeData' mock_head_resp.getheader.return_value = 'queued' mock_conn.getresponse.return_value = mock_head_resp self.assertRaises(self.glance.RetryableError, self.glance.validate_image_status_before_upload_v1, mock_conn, fake_url, extra_headers=mock.Mock()) mock_conn.request.assert_called_once() mock_head_resp.read.assert_not_called() mock_conn.getresponse.assert_not_called()
def test_validate_image_status_before_upload_ok_v2_using_api_service(self): mock_conn = mock.Mock() fake_url = 'http://fake_host:fake_port/fake_path/fake_image_id' mock_check_resp_status_and_retry = self.mock_patch_object( self.glance, 'check_resp_status_and_retry') mock_head_resp = mock.Mock() mock_head_resp.status = httplib.OK mock_head_resp.read.return_value = '{"status": "queued"}' mock_conn.getresponse.return_value = mock_head_resp fake_extra_headers = mock.Mock() expected_api_path = '/v2/images/%s' % 'fake_image_id' self.glance.validate_image_status_before_upload_v2( mock_conn, fake_url, fake_extra_headers, expected_api_path) self.assertTrue(mock_conn.getresponse.called) self.assertEqual( mock_head_resp.read.call_count, 2) self.assertFalse(mock_check_resp_status_and_retry.called) mock_conn.request.assert_called_with('GET', '/v2/images/fake_image_id', headers=fake_extra_headers)
def test_validate_image_status_before_upload_ok_v2_using_uwsgi(self): mock_conn = mock.Mock() fake_url = 'http://fake_host/fake_path/fake_image_id' mock_check_resp_status_and_retry = self.mock_patch_object( self.glance, 'check_resp_status_and_retry') mock_head_resp = mock.Mock() mock_head_resp.status = httplib.OK mock_head_resp.read.return_value = '{"status": "queued"}' mock_conn.getresponse.return_value = mock_head_resp fake_extra_headers = mock.Mock() fake_patch_path = 'fake_patch_path' self.glance.validate_image_status_before_upload_v2( mock_conn, fake_url, fake_extra_headers, fake_patch_path) self.assertTrue(mock_conn.getresponse.called) self.assertEqual( mock_head_resp.read.call_count, 2) self.assertFalse(mock_check_resp_status_and_retry.called) mock_conn.request.assert_called_with('GET', 'fake_patch_path', headers=fake_extra_headers)
def fetch_from_url(url, config, data=None, handlers=None): """Returns data retrieved from a URL. @param url: URL to attempt to open @type url: basestring @param config: SSL context configuration @type config: Configuration @return data retrieved from URL or None """ return_code, return_message, response = open_url(url, config, data=data, handlers=handlers) if return_code and return_code == http_client_.OK: return_data = response.read() response.close() return return_data else: raise URLFetchError(return_message)
def fetch_from_url_to_file(url, config, output_file, data=None, handlers=None): """Writes data retrieved from a URL to a file. @param url: URL to attempt to open @type url: basestring @param config: SSL context configuration @type config: Configuration @param output_file: output file @type output_file: basestring @return: tuple ( returned HTTP status code or 0 if an error occurred returned message boolean indicating whether access was successful) """ return_code, return_message, response = open_url(url, config, data=data, handlers=handlers) if return_code == http_client_.OK: return_data = response.read() response.close() outfile = open(output_file, "w") outfile.write(return_data) outfile.close() return return_code, return_message, return_code == http_client_.OK
def fetch_stream_from_url(url, config, data=None, handlers=None): """Returns data retrieved from a URL. @param url: URL to attempt to open @type url: basestring @param config: SSL context configuration @type config: Configuration @param data: HTTP POST data @type data: str @param handlers: list of custom urllib2 handlers to add to the request @type handlers: iterable @return: data retrieved from URL or None @rtype: file derived type """ return_code, return_message, response = open_url(url, config, data=data, handlers=handlers) if return_code and return_code == http_client_.OK: return response else: raise URLFetchError(return_message)
def test_get_picture__success(self): test_utils.setup_images() item = Item.create(**TEST_ITEM) picture = Picture.create(item=item, **TEST_PICTURE) open("{path}/{picture_uuid}.jpg".format( path=utils.get_image_folder(), picture_uuid=picture.uuid), "wb") resp = self.app.get('/pictures/{picture_uuid}'.format( picture_uuid=picture.uuid)) assert resp.status_code == client.OK test_picture = TEST_PICTURE.copy() test_picture['item_uuid'] = item.uuid assert resp.data == b'' assert resp.headers['Content-Type'] == 'image/jpeg' test_utils.clean_images()
def post(self): request_data = request.get_json(force=True) if 'email' not in request_data or 'password' not in request_data: abort(client.BAD_REQUEST) email = request_data['email'] password = request_data['password'] try: user = User.get(User.email == email) except User.DoesNotExist: abort(client.BAD_REQUEST) if not user.verify_password(password): abort(client.UNAUTHORIZED) login_user(user) return generate_response({}, client.OK)
def get(self): query = request.args.get('query') limit = int(request.args.get('limit', -1)) min_limit, max_limit = 0, 100 limit_in_range = limit > min_limit and limit <= max_limit if query is not None and limit_in_range: matches = Item.search(query, Item.select(), limit) return generate_response(Item.json_list(matches), client.OK) def fmt_error(msg): return {'detail': msg} errors = {"errors": []} if not query: errors['errors'].append(fmt_error('Missing query.')) if not limit_in_range: msg = 'Limit out of range. must be between {} and {}. Requested: {}' errors['errors'].append( fmt_error(msg.format(min_limit, max_limit, limit))) return errors, client.BAD_REQUEST
def _is_valid_service(self): """Checks if the service url is a valid url or not. Returns: True - if the service url is valid False - if the service url is invalid Raises: requests Connection Error -- requests.exceptions.ConnectionError requests Timeout Error -- requests.exceptions.Timeout """ try: response = requests.get(self._commcell_object._web_service, timeout=184) # Valid service if the status code is 200 and response is True return response.status_code == httplib.OK and response.ok except (requests.exceptions.ConnectionError, requests.exceptions.Timeout) as error: raise error
def _logout(self): """Posts a logout request to the server. Returns: str - response string from server upon logout success """ flag, response = self.make_request('POST', self._commcell_object._services['LOGOUT']) if flag: self._commcell_object._headers['Authtoken'] = None if response.status_code == httplib.OK: return response.text else: return 'Failed to logout the user' else: return 'User already logged out'
def download_pdf(self, qbbo, item_id): url = self.api_url + "/company/{0}/{1}/{2}/pdf".format(self.company_id, qbbo.lower(), item_id) if self.session is None: self.create_session() headers = { 'Content-Type': 'application/pdf', 'Accept': 'application/pdf, application/json', } response = self.session.request("GET", url, True, self.company_id, headers=headers) if response.status_code is not httplib.OK: try: json = response.json() except: raise QuickbooksException("Error reading json response: {0}".format(response.text), 10000) self.handle_exceptions(json["Fault"]) else: return response.content
def test_upload_tarball_by_url_http_v1(self): fake_conn = mock.Mock() mock_HTTPConn = self.mock_patch_object( self.glance, '_create_connection', fake_conn) mock_validate_image = self.mock_patch_object( self.glance, 'validate_image_status_before_upload_v1') mock_create_tarball = self.mock_patch_object( self.glance.utils, 'create_tarball') mock_check_resp_status = self.mock_patch_object( self.glance, 'check_resp_status_and_retry') self.glance._create_connection().getresponse = mock.Mock() self.glance._create_connection().getresponse().status = httplib.OK fake_extra_headers = {} fake_properties = {} fake_endpoint = 'http://fake_netloc/fake_path' expected_url = "%(glance_endpoint)s/v1/images/%(image_id)s" % { 'glance_endpoint': fake_endpoint, 'image_id': 'fake_image_id'} self.glance._upload_tarball_by_url_v1( 'fake_staging_path', 'fake_image_id', fake_endpoint, fake_extra_headers, fake_properties) self.assertTrue(mock_HTTPConn.called) mock_validate_image.assert_called_with(fake_conn, expected_url, fake_extra_headers) self.assertTrue(mock_create_tarball.called) self.assertTrue( mock_HTTPConn.return_value.getresponse.called) self.assertFalse(mock_check_resp_status.called)
def test_upload_tarball_by_url_https_v1(self): fake_conn = mock.Mock() mock_HTTPSConn = self.mock_patch_object( self.glance, '_create_connection', fake_conn) mock_validate_image = self.mock_patch_object( self.glance, 'validate_image_status_before_upload_v1') mock_create_tarball = self.mock_patch_object( self.glance.utils, 'create_tarball') mock_check_resp_status = self.mock_patch_object( self.glance, 'check_resp_status_and_retry') self.glance._create_connection().getresponse = mock.Mock() self.glance._create_connection().getresponse().status = httplib.OK fake_extra_headers = {} fake_properties = {} fake_endpoint = 'https://fake_netloc/fake_path' expected_url = "%(glance_endpoint)s/v1/images/%(image_id)s" % { 'glance_endpoint': fake_endpoint, 'image_id': 'fake_image_id'} self.glance._upload_tarball_by_url_v1( 'fake_staging_path', 'fake_image_id', fake_endpoint, fake_extra_headers, fake_properties) self.assertTrue(mock_HTTPSConn.called) mock_validate_image.assert_called_with(fake_conn, expected_url, fake_extra_headers) self.assertTrue(mock_create_tarball.called) self.assertTrue( mock_HTTPSConn.return_value.getresponse.called) self.assertFalse(mock_check_resp_status.called)
def test_update_image_meta_ok_v2_using_api_service(self): fake_conn = mock.Mock() fake_extra_headers = {'fake_type': 'fake_content'} fake_properties = {'fake_path': True} new_fake_properties = {'path': '/fake-path', 'value': "True", 'op': 'add'} fake_body = [ {"path": "/container_format", "value": "ovf", "op": "add"}, {"path": "/disk_format", "value": "vhd", "op": "add"}, {"path": "/visibility", "value": "private", "op": "add"}] fake_body.append(new_fake_properties) fake_body_json = json.dumps(fake_body) fake_headers = { 'Content-Type': 'application/openstack-images-v2.1-json-patch'} fake_headers.update(**fake_extra_headers) fake_conn.getresponse.return_value = mock.Mock() fake_conn.getresponse().status = httplib.OK expected_api_path = '/v2/images/%s' % 'fake_image_id' self.glance._update_image_meta_v2(fake_conn, fake_extra_headers, fake_properties, expected_api_path) fake_conn.request.assert_called_with('PATCH', '/v2/images/%s' % 'fake_image_id', body=fake_body_json, headers=fake_headers) fake_conn.getresponse.assert_called()
def test_update_image_meta_ok_v2_using_uwsgi_service(self): fake_conn = mock.Mock() fake_extra_headers = {'fake_type': 'fake_content'} fake_properties = {'fake_path': True} new_fake_properties = {'path': '/fake-path', 'value': "True", 'op': 'add'} fake_body = [ {"path": "/container_format", "value": "ovf", "op": "add"}, {"path": "/disk_format", "value": "vhd", "op": "add"}, {"path": "/visibility", "value": "private", "op": "add"}] fake_body.append(new_fake_properties) fake_body_json = json.dumps(fake_body) fake_headers = { 'Content-Type': 'application/openstack-images-v2.1-json-patch'} fake_headers.update(**fake_extra_headers) fake_conn.getresponse.return_value = mock.Mock() fake_conn.getresponse().status = httplib.OK expected_wsgi_path = '/fake_path/v2/images/%s' % 'fake_image_id' self.glance._update_image_meta_v2(fake_conn, fake_extra_headers, fake_properties, expected_wsgi_path) fake_conn.request.assert_called_with('PATCH', '/fake_path/v2/images/%s' % 'fake_image_id', body=fake_body_json, headers=fake_headers) fake_conn.getresponse.assert_called()
def test_validate_image_status_before_upload_rep_body_too_long_v1(self): mock_conn = mock.Mock() fake_url = 'http://fake_host/fake_path/fake_image_id' mock_head_resp = mock.Mock() mock_head_resp.status = httplib.OK mock_head_resp.read.return_value = 'fakeData longer than 8' mock_head_resp.getheader.return_value = 'queued' mock_conn.getresponse.return_value = mock_head_resp self.assertRaises(self.glance.RetryableError, self.glance.validate_image_status_before_upload_v1, mock_conn, fake_url, extra_headers=mock.Mock()) mock_conn.request.assert_called_once() mock_conn.getresponse.assert_called_once() mock_head_resp.read.assert_called_once()
def test_validate_image_status_before_upload_get_image_failed_v2(self): mock_conn = mock.Mock() mock_conn.request.side_effect = Fake_HTTP_Request_Error() fake_url = 'http://fake_host/fake_path/fake_image_id' mock_head_resp = mock.Mock() mock_head_resp.status = httplib.OK mock_conn.getresponse.return_value = mock_head_resp expected_wsgi_path = '/fake_path/v2/images/%s' % 'fake_image_id' self.assertRaises(self.glance.RetryableError, self.glance.validate_image_status_before_upload_v2, mock_conn, fake_url, mock.Mock(), expected_wsgi_path) mock_conn.request.assert_called_once() mock_head_resp.read.assert_not_called() mock_conn.getresponse.assert_not_called()
def test_get_item_pictures__success(self): item = Item.create(**TEST_ITEM) Picture.create(item=item, **TEST_PICTURE) Picture.create(item=item, **TEST_PICTURE2) resp = self.app.get('/items/{item_uuid}/pictures/'.format( item_uuid=item.uuid)) assert resp.status_code == client.OK test_utils.assert_valid_response( resp.data, EXPECTED_RESULTS['get_item_pictures__success'])
def test_get_items__success(self): Item.create(**TEST_ITEM) Item.create(**TEST_ITEM2) resp = self.app.get('/items/') assert resp.status_code == client.OK assert_valid_response(resp.data, EXPECTED_RESULTS['get_items__success'])
def test_get_item__success(self): item = Item.create(**TEST_ITEM) resp = self.app.get('/items/{item_uuid}'.format(item_uuid=item.uuid)) assert resp.status_code == client.OK assert_valid_response(resp.data, EXPECTED_RESULTS['get_item__success'])
def test_patch_change1item__success(self): item = Item.create(**TEST_ITEM) post_data = format_jsonapi_request('item', {'name': 'new-name'}) resp = self.app.patch('/items/{item_uuid}'.format(item_uuid=item.uuid), data=json.dumps(post_data), content_type='application/json') assert resp.status_code == client.OK expected_result = EXPECTED_RESULTS['patch_change1item__success'] assert_valid_response(resp.data, expected_result) assert Item.get().name == 'new-name'
def get(self, item_uuid): """Retrieve every picture of an item""" pictures = Picture.select().join(Item).where(Item.uuid == item_uuid) if pictures: data = Picture.json_list(pictures) return generate_response(data, client.OK) return None, client.NOT_FOUND
def get(self): user_addrs = list(Address.select().where( Address.user == auth.current_user)) return generate_response(Address.json_list(user_addrs), OK)
def get(self, address_uuid): try: addr = Address.get( Address.user == auth.current_user, Address.uuid == address_uuid ) return generate_response(addr.json(), OK) except Address.DoesNotExist: return None, NOT_FOUND
def patch(self, address_uuid): try: obj = Address.get(Address.user == auth.current_user, Address.uuid == address_uuid) except Address.DoesNotExist: return None, NOT_FOUND res = request.get_json(force=True) errors = Address.validate_input(res) if errors: return errors, BAD_REQUEST data = res['data']['attributes'] country = data.get('country') city = data.get('city') post_code = data.get('post_code') address = data.get('address') phone = data.get('phone') if country and country != obj.country: obj.country = country if city and city != obj.city: obj.city = city if post_code and post_code != obj.post_code: obj.post_code = post_code if address and address != obj.address: obj.address = address if phone and phone != obj.phone: obj.phone = phone obj.save() return generate_response(obj.json(), OK)
def get(self): """Retrieve every item""" data = Item.json_list(Item.select()) return generate_response(data, client.OK)
def get(self, item_uuid): """Retrieve the item specified by item_uuid""" try: item = Item.get(Item.uuid == item_uuid) return generate_response(item.json(), client.OK) except Item.DoesNotExist: return None, client.NOT_FOUND
def patch(self, item_uuid): """Edit the item specified by item_uuid""" try: obj = Item.get(Item.uuid == item_uuid) except Item.DoesNotExist: return None, client.NOT_FOUND request_data = request.get_json(force=True) errors = Item.validate_input(request_data, partial=True) if errors: return errors, client.BAD_REQUEST data = request_data['data']['attributes'] name = data.get('name') price = data.get('price') description = data.get('description') availability = data.get('availability') category = data.get('category') if name: obj.name = name if price: obj.price = price if description: obj.description = description if availability: obj.availability = availability if category: obj.category = category obj.save() return generate_response(obj.json(), client.OK)
def _apply(self, request_data, expected_response_type=None): logger.debug("Sending request\n%s", pprint.pformat(request_data)) request_name = request_data.__class__.__name__ message = common_pb2.WireMessage() message.name = 'org.apache.calcite.avatica.proto.Requests${}'.format(request_name) message.wrapped_message = request_data.SerializeToString() body = message.SerializeToString() headers = {'content-type': 'application/x-google-protobuf'} response = self._post_request(body, headers) response_body = response.read() if response.status != httplib.OK: logger.debug("Received response\n%s", response_body) if b'<html>' in response_body: parse_error_page(response_body) else: # assume the response is in protobuf format parse_error_protobuf(response_body) raise errors.InterfaceError('RPC request returned invalid status code', response.status) message = common_pb2.WireMessage() message.ParseFromString(response_body) logger.debug("Received response\n%s", message) if expected_response_type is None: expected_response_type = request_name.replace('Request', 'Response') expected_response_type = 'org.apache.calcite.avatica.proto.Responses$' + expected_response_type if message.name != expected_response_type: raise errors.InterfaceError('unexpected response type "{}"'.format(message.name)) return message.wrapped_message
def make_request(self, request_type, url, request_body=None, content_type='application/json'): params = {} if self.minorversion: params['minorversion'] = self.minorversion print "params",type (params), params if not request_body: request_body = {} if self.session is None: self.create_session() headers = { 'Content-Type': content_type, 'Accept': 'application/json' } req = self.session.request(request_type, url, True, self.company_id, headers=headers, params=params, data=request_body) try: result = req.json() except: raise QuickbooksException("Error reading json response: {0}".format(req.text), 10000) if req.status_code is not httplib.OK or "Fault" in result: self.handle_exceptions(result["Fault"]) else: return result
def make_request_query(self, request_type, url, request_body=None, content_type='application/json'): params = {} if self.minorversion: params['minorversion'] = self.minorversion params['query'] = request_body print "params",type (params), params ,type (params['query']) if not request_body: request_body = {} if self.session is None: self.create_session() headers = { 'Content-Type': content_type, 'Accept': 'application/json' } req = self.session.request(request_type, url, True, str(self.company_id), headers=headers, params=params) # req = self.session.request(request_type, url, True, self.company_id, headers=headers, params=params, data=request_body) try: result = req.json() except: raise QuickbooksException("Error reading json response: {0}".format(req.text), 10000) if req.status_code is not httplib.OK or "Fault" in result: self.handle_exceptions(result["Fault"]) else: return result
def init_app(self, app): """ Initialize the OpenAPI instance for the given app. This should be used for a deferred initialization, supporting the Flask factory pattern. Args: app (flask.Flask): The Flask app to apply this extension to. """ self.app = app for key, value in _DEFAULT_CONFIG.items(): app.config.setdefault(key, value) swagger_json_url = self._config('swagger_json_url') swagger_yaml_url = self._config('swagger_yaml_url') @self.response(OK, { 'description': 'This OpenAPI specification document.' }) @app.route(swagger_json_url) def json_handler(): """ Get the `swagger` object in JSON format. """ return jsonify(self.swagger) @self.response(OK, { 'description': 'This OpenAPI specification document.' }) @app.route(swagger_yaml_url) def yaml_handler(): """ Get the `swagger` object in YAML format. """ # YAML doesn't have an official mime type yet. # https://www.ietf.org/mail-archive/web/media-types/current/msg00688.html return yaml.dump(self.swagger, default_flow_style=False), { 'Content-Type': 'application/yaml' }