我们从 Python 开源项目中，提取了以下 50 个代码示例，用于说明如何使用 flask_restful.marshal()。
def delete(self, model_id):
    """
    Delete the model identified by ``model_id``.

    The deletion itself is delegated to ``lda_utils.delete_model`` (the
    original inline comment states that it removes the model and its files
    on disk); this method only wraps the outcome in the API envelope.

    :param model_id: identifier of the model to delete
    :return: ``(error payload, 404)`` when the helper reports status 404,
        otherwise ``(success payload, 200)`` carrying the marshalled model
    """
    status_code, deleted_model = lda_utils.delete_model(model_id)

    # Guard clause: 404 is the only failure status handled explicitly.
    if status_code == 404:
        not_found = api_utils.prepare_error_response(
            404, 'Model not found.', more_info={'model_id': model_id})
        return not_found, 404

    model_info = marshal(deleted_model, api_utils.model_fields)
    deleted = api_utils.prepare_success_response(
        200, 'Model deleted.', more_info=model_info)
    return deleted, 200
def patch(self, model_id, topic_id):
    """
    Edit additional information (label and/or description) of a topic.

    :param model_id: identifier of the model that owns the topic
    :param topic_id: identifier of the topic; converted with ``int()``
    :return: a success payload with the marshalled topic, or an error
        payload when nothing was provided or the update returned ``None``
    """
    parser = reqparse.RequestParser(bundle_errors=True)
    parser.add_argument('label', default=None, required=False, type=str,
                        help='The human readable label of the topic.')
    parser.add_argument('description', default=None, required=False, type=str,
                        help='The human readable description of the topic.')
    args = parser.parse_args()

    new_label = args['label']
    new_description = args['description']

    # Nothing to change: both optional arguments were omitted.
    if new_label is None and new_description is None:
        return api_utils.prepare_error_response(
            500, 'Provide the label or the description to set.')

    topic = db_utils.update_topic(
        model_id, int(topic_id), new_label, new_description)
    if topic is None:
        return api_utils.prepare_error_response(
            500, 'Error during the update, check the provided topic id.')

    return api_utils.prepare_success_response(
        200, 'Topic updated.', data=marshal(topic, api_utils.topic_fields))
def make_response(data, marshal_table, cursors=None):
    """Build the JSONP-wrapped success envelope for ``data``.

    Iterable ``data`` is marshalled item by item and reported together with
    a ``count`` and optional paging cursors; anything else is marshalled as
    a single object.

    Args:
        data: a single object or an iterable of objects to marshal.
        marshal_table: flask_restful field mapping applied to each object.
        cursors: optional paging information. Either a dict that may hold
            ``'next'`` / ``'prev'`` cursors, or a single "next" cursor value.

    Returns:
        Whatever ``util.jsonpify`` returns for the assembled envelope.
    """
    if not util.is_iterable(data):
        return util.jsonpify({
            'status': 'success',
            'now': datetime.utcnow().isoformat(),
            'result': flask_restful.marshal(data, marshal_table),
        })

    response = {
        'status': 'success',
        'count': len(data),
        'now': datetime.utcnow().isoformat(),
        # A real list (not ``map``): on Python 3 ``map`` yields a lazy
        # iterator, which is not a serializable sequence.
        'result': [flask_restful.marshal(item, marshal_table)
                   for item in data],
    }

    if cursors:
        if isinstance(cursors, dict):
            if cursors.get('next'):
                response['next_cursor'] = cursors['next']
                response['next_url'] = util.generate_next_url(cursors['next'])
            if cursors.get('prev'):
                response['prev_cursor'] = cursors['prev']
                # NOTE(review): the "prev" URL is also built with
                # generate_next_url, as in the original; confirm intended.
                response['prev_url'] = util.generate_next_url(cursors['prev'])
        else:
            response['next_cursor'] = cursors
            response['next_url'] = util.generate_next_url(cursors)

    return util.jsonpify(response)
def handle_result(result, rsc):
    """Marshal ``result`` using the field mapping and envelope of ``rsc``.

    Falsy results other than an empty list yield ``None``. If marshalling
    raises the specific "'NoneType' object has no attribute 'items'"
    ``AttributeError``, the result is converted with ``to_dict`` and
    marshalled again; if that also raises ``AttributeError`` the result is
    returned as-is (when it is a dict) or wrapped under the envelope key.
    Any other ``AttributeError`` is re-raised as a new ``AttributeError``.
    """
    logdebug(result)

    if not (result or result == []):
        return None

    try:
        return marshal(result, rsc.marshal_with(), rsc.envelope())
    except AttributeError as first_error:
        missing_items = "'NoneType' object has no attribute 'items'"
        if missing_items not in str(first_error):
            raise AttributeError(first_error)

        loginfo(first_error)
        try:
            result = to_dict(result)
            loginfo(result)
            return marshal(result, rsc.marshal_with(), rsc.envelope())
        except AttributeError as second_error:
            if isinstance(result, dict):
                logdebug("result is already dict")
                return result
            loginfo(str(second_error))
            return {rsc.envelope(): result}
def marshal(self):
    """Return this object's REST API representation.

    Inside the method body the name ``marshal`` resolves to the module-level
    marshalling function, not to this method.

    Returns:
        The mapping produced by marshalling ``self`` with the field
        definitions below.
    """
    field_map = {
        'type': fields.String(attribute='__class__.__name__'),
        'id': fields.Integer(attribute='appinterface.id'),
        'name': fields.String,
        'iothost': fields.String,
        'keyname': fields.String,
        'keyvalue': fields.String,
        'poll_interval': fields.Integer,
        'started': fields.Boolean,
    }
    marshalled = marshal(self, field_map)
    return marshalled
def get(self, appeui):
    """Return all properties of the application identified by ``appeui``.

    Written as a generator that yields pending results and finishes through
    ``returnValue`` (presumably wrapped by an inlineCallbacks-style
    decorator outside this view; confirm).

    Args:
        appeui: Application EUI used to look the application up.

    Returns (via ``returnValue``):
        ``{}`` when the property query yields ``None``, otherwise a dict
        mapping the enumeration index to each marshalled property.
    """
    try:
        app = yield Application.find(where=['appeui = ?', appeui], limit=1)

        # Return a 404 if the application is not found.
        if app is None:
            abort(404, message={'error': "Application {} doesn't exist."
                                .format(euiString(appeui))})

        properties = yield app.properties.get()
        if properties is None:
            returnValue({})

        marshalled = {index: marshal(prop, self.fields)
                      for index, prop in enumerate(properties)}
        returnValue(marshalled)

    except TimeoutError:
        log.error("REST API timeout retrieving application {appeui} "
                  "properties", appeui=euiString(appeui))
def get(self):
    """Return all application interfaces, keyed by enumeration index.

    Returns (via ``returnValue``):
        ``{}`` when the interface manager returns ``None``, otherwise a dict
        of marshalled interfaces exposing ``type``, ``id`` and ``name``.
    """
    try:
        interfaces = interfaceManager.getAllInterfaces()
        if interfaces is None:
            returnValue({})

        interface_fields = {
            'type': fields.String(attribute='__class__.__name__'),
            'id': fields.Integer(attribute='appinterface.id'),
            'name': fields.String
        }
        marshalled = {index: marshal(interface, interface_fields)
                      for index, interface in enumerate(interfaces)}
        returnValue(marshalled)
        # This bare ``yield`` follows the ``returnValue`` call, but its
        # presence is what makes this function a generator; keep it.
        yield

    except TimeoutError:
        log.error("REST API timeout retrieving application interfaces")
def get(self, appeui):
    """Handle a GET request for a single application.

    Args:
        appeui (int): Application EUI

    Returns (via ``returnValue``):
        The marshalled application with an extra ``'properties'`` entry
        filled from ``self.getProperties``.
    """
    try:
        application = yield Application.find(
            where=['appeui = ?', appeui], limit=1)

        # Return a 404 if not found.
        if application is None:
            abort(404, message={'error': "Application {} doesn't exist."
                                .format(euiString(appeui))})

        payload = marshal(application, self.fields)
        payload['properties'] = yield self.getProperties(application)
        returnValue(payload)

    except TimeoutError:
        log.error("REST API timeout retrieving application {appeui}",
                  appeui=euiString(appeui))
def get(self, deveui):
    """Handle a GET request for a single device.

    Args:
        deveui (int): Device deveui

    Returns (via ``returnValue``):
        The device marshalled with ``self.fields``.
    """
    try:
        device = yield Device.find(where=['deveui = ?', deveui], limit=1)

        # Return a 404 if not found.
        if device is None:
            not_found = "Device {} doesn't exist".format(euiString(deveui))
            abort(404, message={'error': not_found})

        returnValue(marshal(device, self.fields))

    except TimeoutError:
        log.error("REST API timeout for device GET request")
def get(self, org_id, location_id, role_id, shift_id):
    """List the users eligible for a shift, flagged by cap compliance.

    Each marshalled user gets a boolean ``"within_caps"`` entry: ``True``
    for users from the first group returned by
    ``shift.get_all_eligible_users`` and ``False`` for the second group.
    Users within caps are listed first.

    Past shifts are allowed when the current user is sudo or an org admin /
    location manager for this location.

    :return: ``{API_ENVELOPE: [user, ...]}``
    """
    shift = Shift2.query.get(shift_id)

    allow_past = g.current_user.is_sudo(
    ) or g.current_user.is_org_admin_or_location_manager(org_id, location_id)

    within_caps, exceeds_caps = shift.get_all_eligible_users(
        allow_past=allow_past)

    # Build one concrete list. The previous ``map(...) + map(...)`` form only
    # works where ``map`` returns a list (Python 2); on Python 3 the map
    # objects cannot be concatenated and are exhausted by the flagging loops.
    eligible = []
    for users, is_within in ((within_caps, True), (exceeds_caps, False)):
        for user in users:
            entry = marshal(user, user_fields)
            entry["within_caps"] = is_within
            eligible.append(entry)

    return {API_ENVELOPE: eligible}
def get(self, org_id, location_id, role_id, user_id):
    """Return a user merged with their membership record for a role.

    Aborts with 404 when the user does not exist (``get_or_404``) or when
    the user has no ``RoleToUser`` row for ``role_id``.

    :return: envelope with the merged user / role-to-user fields plus the
        list of sub-resources
    """
    # get_or_404 aborts on a missing user, so no separate None check is
    # needed for ``user`` (the old one was unreachable).
    user = User.query.get_or_404(user_id)

    rtu = RoleToUser.query.filter_by(
        user_id=user_id, role_id=role_id).first()
    if rtu is None:
        abort(404)

    # Membership fields are applied last, so they win on key collisions.
    data = {}
    data.update(marshal(user, user_fields))
    data.update(marshal(rtu, role_to_user_fields))

    return {
        API_ENVELOPE: data,
        "resources": ["timeclocks", "timeoffrequests"],
    }
def get(self):
    """Return one page of organizations.

    Query arguments:
        offset (int, default 0): number of rows to skip.
        limit (int, default 25): maximum number of rows to return.

    :return: dict echoing ``offset`` and ``limit`` with the marshalled
        organizations under ``API_ENVELOPE``
    """
    parser = reqparse.RequestParser()
    parser.add_argument("offset", type=int, default=0)
    parser.add_argument("limit", type=int, default=25)
    args = parser.parse_args()

    offset = args["offset"]
    limit = args["limit"]

    organizations = Organization.query.limit(limit).offset(offset).all()

    return {
        "offset": offset,
        "limit": limit,
        # List comprehension instead of ``map``: on Python 3 ``map`` returns
        # a lazy iterator, which is not a serializable list.
        API_ENVELOPE: [
            marshal(organization, organization_fields)
            for organization in organizations
        ],
    }
def post(self):
    """Create an organization.

    Request arguments: ``name``, ``day_week_starts`` (default ``"monday"``),
    ``plan`` (default ``"flex-v1"``) and ``trial_days`` (default 14).

    :return: ``(marshalled organization, 201)`` on success, or
        ``({"message": ...}, 400)`` when validation fails. Aborts with 500
        when the commit fails.
    """
    parser = reqparse.RequestParser()
    parser.add_argument("name", type=str)
    parser.add_argument("day_week_starts", type=str, default="monday")
    # NOTE(review): "plan" is accepted but not passed to Organization below.
    parser.add_argument("plan", type=str, default="flex-v1")
    parser.add_argument("trial_days", type=int, default=14)
    parameters = parser.parse_args(strict=True)

    if parameters.get("day_week_starts") not in DAYS_OF_WEEK:
        return {"message": "day_week_starts is not valid"}, 400

    if parameters.get("trial_days") < 0:
        # Previously returned under the key "messages" with an implicit 200;
        # now aligned with the validation error above.
        return {"message": "trial_days cannot be less than 0"}, 400

    organization = Organization(
        name=parameters.get("name"),
        day_week_starts=parameters.get("day_week_starts"),
        trial_days=parameters.get("trial_days"))
    db.session.add(organization)

    try:
        db.session.commit()
    except Exception:
        # Leave the session usable for later requests before failing.
        db.session.rollback()
        abort(500)

    return marshal(organization, organization_fields), 201
def get(self):
    """Return all queued calculations.

    For each of the states ``'chomp-queue'`` and ``'chomp-processing'`` the
    schedules in that state belonging to active organizations are marshalled
    into a list.

    :return: ``{API_ENVELOPE: {state: [schedule, ...], ...}}``
    """
    by_state = {}

    for state in ['chomp-queue', 'chomp-processing']:
        # ``== True`` is deliberate: it builds a SQL expression rather than
        # evaluating a Python boolean.
        schedules = Schedule2.query.join(Role)\
            .join(Location)\
            .join(Organization)\
            .filter(Schedule2.state == state, Organization.active == True)\
            .all()  # noqa: E712

        # Concrete list instead of ``map`` so the result is serializable on
        # Python 3 as well.
        by_state[state] = [
            marshal(schedule, tasking_schedule_fields)
            for schedule in schedules
        ]

    return {API_ENVELOPE: by_state}
def post(self):
    """Return full metadata for the datasets matching ``filter_params``.

    ``filter_params`` is a JSON string whose ``'rules'`` entry is applied to
    the dataset query. If more than ``user_n_threshold`` datasets match, an
    error payload is returned instead of the data.

    :return: dict with ``count_matching_datasets``, ``filter_params`` and
        ``datasets``; or ``{'error': ...}`` on any failure
    """
    try:
        parser = reqparse.RequestParser()
        parser.add_argument('filter_params', type=str)
        args = parser.parse_args()

        filter_params = json.loads(args['filter_params'])
        rules = filter_params['rules']
        queryObject = filterDatasetQueryObjectWithRules(Dataset.query, rules)

        # Check the size first: the rows used to be loaded and marshalled
        # before the threshold test and then thrown away.
        if queryObject.count() > user_n_threshold:
            return {'error':'you are trying to get metadata for too many datasets at once. Please query the database for '+str(user_n_threshold)+' or fewer datasets at a time'}

        result = marshal(queryObject.all(), fullDatasetFields)
        return {"count_matching_datasets": len(result),
                "filter_params": filter_params,
                "datasets": result}
    except Exception as e:
        # Boundary handler: every failure is reported to the client as text.
        return {'error': str(e)}
def post(self):
    """Return full metadata for an explicit list of dataset ids.

    ``metaseek_ids`` is a JSON-encoded list. Requests naming more than
    ``user_n_threshold`` ids are rejected with an error payload.

    :return: dict with ``count_matching_datasets`` and ``datasets``; or
        ``{'error': ...}`` on any failure
    """
    try:
        parser = reqparse.RequestParser()
        parser.add_argument('metaseek_ids', type=str)
        args = parser.parse_args()
        metaseek_ids = json.loads(args['metaseek_ids'])

        # check if the number of ids you're looking up is below an acceptable
        # threshold; otherwise return error
        too_many = len(metaseek_ids) > user_n_threshold
        if too_many:
            return {'error':'you are trying to get metadata for too many datasets at once. Please query the database for '+str(user_n_threshold)+' or fewer datasets at a time'}

        queryObject = filterQueryByRule(
            Dataset, Dataset.query, 'id', 8, metaseek_ids)
        datasets = marshal(queryObject.all(), fullDatasetFields)
        return {"count_matching_datasets": len(datasets), "datasets": datasets}
    except Exception as e:
        return {'error': str(e)}

# End route functions
# Declare routing
def post(self):
    """Create a user from the submitted form, log them in and return them.

    :return: ``(marshalled user, 201)`` on success, or
        ``({'errors': ...}, 400)`` when building the user raises
        ``ValidationError``
    """
    email = request.form.get('email', '')
    password = request.form.get('password', '')

    try:
        new_user = User(email=email, password=password)
    except ValidationError as error:
        return {'errors': error.message}, 400

    db.session.add(new_user)
    db.session.commit()

    login_user(new_user)

    return marshal(new_user, user_fields), 201
def get(self, person_id):
    """Find the 2-gram of ``str_read`` that best matches ``person_id``.

    Every 2-gram of the whitespace-split ``str_read`` is compared with the
    formatted ``person_id`` through ``NGram.compare``; the first gram with
    the highest score is reported.

    :return: marshalled dict with ``occurs`` (best score times 1000, as a
        string) and ``word`` (string form of the best gram)
    """
    candidates = [str(gram) for gram in ngrams(str_read.split(), 2)]
    scores = [NGram.compare('{}'.format(person_id), candidate)
              for candidate in candidates]

    best_score = max(scores)
    # list.index returns the first occurrence, i.e. the earliest best gram.
    best_word = candidates[scores.index(best_score)]

    main_fields = {'occurs': fields.String, "word": fields.String}
    datas = {'occurs': "{}".format(best_score * 1000),
             'word': "{}".format(best_word)}
    return marshal(datas, main_fields)
def get(self, person_id):
    """Find the 2-gram of ``str_read`` that best matches ``person_id``.

    ``person_id`` is decoded as latin-1 before each comparison. Every 2-gram
    of the whitespace-split ``str_read`` is scored with ``NGram.compare``;
    the first gram with the highest score is reported.

    :return: marshalled dict with ``occurs`` (best score times 1000, as a
        string) and ``word`` (string form of the best gram)
    """
    candidates = [str(gram) for gram in ngrams(str_read.split(), 2)]
    scores = [
        NGram.compare('{}'.format(person_id.decode('latin-1')), candidate)
        for candidate in candidates
    ]

    best_score = max(scores)
    # list.index returns the first occurrence, i.e. the earliest best gram.
    best_word = candidates[scores.index(best_score)]

    main_fields = {'occurs': fields.String, "word": fields.String}
    datas = {'occurs': "{}".format(best_score * 1000),
             'word': "{}".format(best_word)}
    return marshal(datas, main_fields)
def get(self, paragraph):
    """Report which words of ``paragraph`` exist as keys in ``r_server``.

    ``accuracy`` is ``100 - 10 * (number of distinct words not found)``.

    :param paragraph: whitespace-separated text to check
    :return: marshalled dict with ``true_words`` (the found words, in input
        order, duplicates kept), ``all_words`` (the input) and ``accuracy``
    """
    paragraph_arr = paragraph.split()

    # Scan the key space once and test membership in a set, instead of
    # re-scanning every key for every word. Skipped for empty input.
    known_words = set(r_server.scan_iter()) if paragraph_arr else set()
    word_true = [word for word in paragraph_arr if word in known_words]

    main_fields = {'true_words': fields.String,
                   "all_words": fields.String,
                   "accuracy": fields.Integer}

    missing = set(paragraph_arr) - set(word_true)
    accuracy = 100 - len(missing) * 10

    datas = {'true_words': word_true,
             "all_words": paragraph,
             "accuracy": accuracy}
    return marshal(datas, main_fields)
def _assert_db_effect(self, model, key, fields, expected):
    """Assert that the stored row ``key`` of ``model`` marshals to ``expected``.

    The row is loaded inside an application context, must exist, and is
    compared after marshalling with ``fields``.
    """
    with self.app.app_context():
        row = model.query.get(key)
        self.assertIsNotNone(row)

        actual = flask_restful.marshal(row, fields)
        self.assertEqual(expected, actual)
def getResourceURL(endpoint, selector_name=None, selector_value=None, absolute=False):
    """Build the URL of ``endpoint`` through a ``fields.Url`` field.

    :param endpoint: endpoint name handed to ``fields.Url``
    :param selector_name: name of the URL selector
    :param selector_value: value of the URL selector
    :param absolute: when true, prefix the URL with ``getDomain(request)``
    :return: the generated URL string
    """
    # With no selector at all, a placeholder pair is marshalled instead.
    if selector_name is None and selector_value is None:
        selector_name = selector_value = "dummy_RDCH106"

    uri_field = {'url': fields.Url(endpoint)}
    selector = {selector_name: selector_value}

    if not absolute:
        return marshal(selector, uri_field)["url"]
    return getDomain(request) + marshal(selector, uri_field)["url"]
def get(self):
    """
    Retrieve all available models from the database.

    :return: ``(success payload, 200)`` where the payload carries the
        models marshalled with ``api_utils.models_fields``
    """
    all_models = {'models': db_utils.get_all_models()}
    marshalled = marshal(all_models, api_utils.models_fields)
    payload = api_utils.prepare_success_response(
        200, 'Models retrieved.', marshalled)
    return payload, 200
def get(self, model_id):
    """
    Get all info related to the model ``model_id``.

    :param model_id: identifier of the model to fetch
    :return: ``(error payload, 404)`` when the model does not exist,
        otherwise ``(success payload, 200)`` with the marshalled model
    """
    model = db_utils.get_model(model_id)

    if model is None:
        # Return the payload with an explicit 404, in the same
        # ``(payload, status)`` shape as the success path. It used to be
        # wrapped in ``jsonify`` with no status, i.e. sent as HTTP 200.
        not_found = api_utils.prepare_error_response(
            404, 'Model id not found.', more_info={'model_id': model_id})
        return not_found, 404

    found = api_utils.prepare_success_response(
        200, 'Model retrieved.', marshal(model, api_utils.model_fields))
    return found, 200
def get(self, model_id, topic_id=None):
    """
    Get the documents that have a topic assignment in model ``model_id``.

    When ``topic_id`` is given it is converted with ``int()`` and the
    optional ``threshold`` query argument is parsed and passed on as the
    topic weight; otherwise the weight stays ``0.0``.

    :param model_id: identifier of the model
    :param topic_id: optional topic identifier
    :return: success payload carrying the marshalled ``documents`` list
    """
    topic_weight = 0.0

    if topic_id is not None:
        topic_id = int(topic_id)

        parser = reqparse.RequestParser(bundle_errors=True)
        parser.add_argument(
            'threshold', default=0.0, required=False, type=float,
            help='The minimum probability that the specified topic should '
                 'have to consider that document as related.')
        topic_weight = parser.parse_args()['threshold']

    documents = db_utils.get_all_documents(model_id, topic_id, topic_weight)
    marshalled = marshal({'documents': documents},
                         api_utils.documents_fields)

    return api_utils.prepare_success_response(
        200, 'Documents retrieved.', marshalled['documents'])
def get(self, model_id, document_id):
    """
    Get a document of model ``model_id`` and the topics assigned to it.

    The optional ``threshold`` query argument defaults to
    ``config.defaults['minimum_threshold']['value']``.

    :param model_id: identifier of the model
    :param document_id: the document id; passed on as a string
    :return: error payload (code 400) when the document is not found,
        otherwise a success payload with the marshalled document
    """
    parser = reqparse.RequestParser(bundle_errors=True)
    parser.add_argument(
        'threshold',
        default=config.defaults['minimum_threshold']['value'],
        required=False, type=float,
        help='The minimum probability that a topic should '
             'have to be returned as related to the document.')
    threshold = parser.parse_args()['threshold']

    document = db_utils.get_document(
        model_id, str(document_id), topics_threshold=threshold)

    if document is None:
        return api_utils.prepare_error_response(
            400, 'Document not found',
            more_info={'model_id': model_id, 'document_id': document_id})

    # Echo the request parameters back inside the returned document.
    document['threshold'] = threshold
    document['model_id'] = model_id

    return api_utils.prepare_success_response(
        200, 'Document retrieved.',
        marshal(document, api_utils.document_fields))
def search(self, model_id):
    """
    Extract topics from the query text supplied in the request.

    :param model_id: identifier of the model used for topic assignment
    :return: success payload (code 200) with the marshalled query data and
        its assigned topics sorted by descending ``topic_weight``; error
        payload (code 500) when the text is blank or extraction fails
    """
    parser = reqparse.RequestParser(bundle_errors=True)
    parser.add_argument('threshold', default=0.0, required=False, type=float,
                        help='The minimum probability that a topic should have to be returned as related to the query string.')
    parser.add_argument('text', default=None, required=True, type=str,
                        help='The query to assign topics to.')
    args = parser.parse_args()

    # ``str.strip()`` never returns None, so the old ``is not None`` test
    # was always true; test for a non-blank string instead.
    if not args['text'].strip():
        return api_utils.prepare_error_response(
            500, "The text field is empty.", None)

    data = {'model_id': model_id,
            'threshold': args['threshold'],
            'textual_query': args['text']}
    topics_assignment = lda_utils.assign_topics_for_query(
        model_id, args['text'], args['threshold'])

    if topics_assignment is None:
        # Always pass a defined value as the third argument, so the failure
        # path cannot hit an unbound ``marshalled`` variable.
        return api_utils.prepare_error_response(
            500, "Error during topic extraction. Check logs.",
            marshal(data, api_utils.textual_query_fields))

    list_of_da = lda_utils.convert_topic_assignment_to_dictionary(
        topics_assignment)
    data['assigned_topics'] = sorted(
        list_of_da[0]['assigned_topics'],
        reverse=True, key=lambda d: d['topic_weight'])

    return api_utils.prepare_success_response(
        200, "Topics extracted.",
        marshal(data, api_utils.textual_query_fields))
def get(self, model_id, topic_id):
    """
    Get all info related to topic ``topic_id`` of model ``model_id``.

    :param model_id: identifier of the model
    :param topic_id: identifier of the topic; converted with ``int()``
    :return: success payload with the topic marshalled through
        ``api_utils.topic_fields``
    """
    topic = db_utils.get_topic(model_id, int(topic_id))
    return api_utils.prepare_success_response(
        200, "Topic retrieved.", marshal(topic, api_utils.topic_fields))
def get(self, domain_name=None):
    """Return one domain with its records, or list all public domains.

    :param domain_name: when given, only that domain is returned (404 if
        unknown), marshalled with ``domain_fields_with_records``
    :return: ``(marshalled data, 200)``
    """
    if not domain_name:
        # Listing: public, non-deleted domains only.
        public_domains = Domain.query.filter_by(
            public=True, deleted=False).all()
        return marshal(public_domains, domain_fields), 200

    # Include DNS records when an individual domain is requested
    domain = Domain.get_or_404(domain_name)
    return marshal(domain, domain_fields_with_records), 200
def on_join(data):
    """Join the rooms of every printer the connecting user can access.

    With authentication disabled (``Config.NONE``) a super-admin placeholder
    user is used. Otherwise the ``'jwt'`` entry of ``data`` is validated and
    the matching user loaded; the client is disconnected when the token is
    missing, undecodable or expired, or no such user exists.

    On success the accessible printers are emitted as a ``"printers"`` event.
    """
    if current_app.config["AUTH"] == Config.NONE:
        user = User("Gandalf", superadmin=True)
    else:
        token = data.get('jwt')
        if not token:
            disconnect()
            return

        try:
            payload = LoginService.parse_api_token_direct(token)
        except (DecodeError, ExpiredSignature):
            disconnect()
            return

        user = User.query.filter_by(username=payload["username"]).scalar()
        if user is None:
            # A valid token for a user that no longer exists would otherwise
            # fail below with an AttributeError on None.
            disconnect()
            return

    printers = user.get_accessible_printers()

    for printer in printers:
        join_room(str(printer.id))

    datatype = {
        'id': fields.Integer,
        'name': fields.String,
        'group': fields.List(
            fields.Nested({
                'name': fields.String
            })
        )
    }
    emit("printers", marshal(printers, datatype))
def marshal(self):
    """Return this object's REST API representation.

    Inside the method body the name ``marshal`` resolves to the module-level
    marshalling function, not to this method.

    Returns:
        The mapping produced by marshalling ``self`` with the ``type``,
        ``id``, ``name`` and ``started`` field definitions below.
    """
    field_map = {
        'type': fields.String(attribute='__class__.__name__'),
        'id': fields.Integer(attribute='appinterface.id'),
        'name': fields.String,
        'started': fields.Boolean
    }
    marshalled = marshal(self, field_map)
    return marshalled
def get(self):
    """Handle a GET request for the system configuration.

    Returns (via ``returnValue``):
        The configuration marshalled with ``self.fields``. Aborts with 404
        when no configuration row is found.
    """
    try:
        config = yield Config.find(limit=1)

        # Return a 404 if not found.
        if config is None:
            abort(404, message={'error':
                                "Could not get the system configuration"})

        returnValue(marshal(config, self.fields))

    except TimeoutError:
        # This handler has no ``appeui`` in scope; the old copy-pasted
        # message referenced it and would raise NameError on a timeout.
        log.error("REST API timeout retrieving the system configuration")
def get(self, appeui):
    """Handle a GET request for one application property.

    The property port is read from ``self.args['port']``.

    Args:
        appeui (int): Application EUI

    Returns (via ``returnValue``):
        The property marshalled with ``self.fields``. Aborts with 404 when
        the application or the property is not found.
    """
    # Bound up front so the timeout handler can always reference it, even
    # when the timeout happens before the port has been read.
    port = None
    try:
        app = yield Application.find(where=['appeui = ?', appeui], limit=1)

        # Return a 404 if not found.
        if app is None:
            abort(404, message={'error': "Application {} doesn't exist."
                                .format(euiString(appeui))})

        port = self.args['port']
        p = yield AppProperty.find(
            where=['application_id = ? AND port = ?', app.id, port])
        if p is None:
            abort(404, message={'error':
                                "Application property doesn't exist."})

        data = marshal(p, self.fields)
        returnValue(data)

    except TimeoutError:
        log.error("REST API timeout get request for application {appeui} "
                  "property {port}", appeui=euiString(appeui), port=port)
def getProperties(self, app):
    """Fetch and marshal the properties of ``app``.

    Returns (via ``returnValue``):
        dict mapping the enumeration index to each property marshalled with
        ``self.pfields``.
    """
    properties = yield app.properties.get()

    marshalled = {index: marshal(prop, self.pfields)
                  for index, prop in enumerate(properties)}
    returnValue(marshalled)
def get(self):
    """Return all applications together with their properties.

    Returns (via ``returnValue``):
        ``{}`` when the query yields ``None``, otherwise a dict mapping the
        enumeration index to each marshalled application, each with a
        ``'properties'`` entry from ``self.getProperties``.
    """
    try:
        apps = yield Application.all()
        if apps is None:
            returnValue({})

        data = {}
        for i, a in enumerate(apps):
            data[i] = marshal(a, self.fields)
            data[i]['properties'] = yield self.getProperties(a)
        returnValue(data)

    except TimeoutError:
        # This handler has no ``appeui`` in scope; the old copy-pasted
        # message referenced it and would raise NameError on a timeout.
        log.error("REST API timeout retrieving all applications")
def get(self, host):
    """Handle a GET request for the gateway at ``host``.

    Returns (via ``returnValue``):
        The gateway marshalled with ``self.fields``. Aborts with 404 when no
        gateway matches.
    """
    try:
        gateway = yield Gateway.find(where=['host = ?', host], limit=1)

        # Return a 404 if not found.
        if gateway is None:
            not_found = "Gateway {} doesn't exist.".format(host)
            abort(404, message={'error': not_found})

        returnValue(marshal(gateway, self.fields))

    except TimeoutError:
        log.error("REST API timeout retrieving gateway {host}",
                  host=host)
def get(self):
    """Return all gateways, keyed by enumeration index.

    Returns (via ``returnValue``):
        ``{}`` when the query yields ``None``, otherwise a dict of gateways
        marshalled with ``self.fields``.
    """
    try:
        gateways = yield Gateway.all()
        if gateways is None:
            returnValue({})

        marshalled = {index: marshal(gateway, self.fields)
                      for index, gateway in enumerate(gateways)}
        returnValue(marshalled)

    except TimeoutError:
        # Exception returns 500 to client
        log.error("REST API timeout retrieving all gateways")
def get(self, org_id, location_id):
    """List all managers of this location.

    Aborts with 404 when the location does not exist.

    :return: ``{API_ENVELOPE: [manager, ...]}``
    """
    location = Location.query.get_or_404(location_id)

    # Concrete list instead of ``map`` so the payload is serializable on
    # Python 3 as well.
    return {
        API_ENVELOPE: [
            marshal(manager, user_fields)
            for manager in location.managers.all()
        ],
    }
def get(self, org_id, location_id, user_id):
    """Return a single manager of this location.

    Aborts with 404 when the user does not exist; returns a 404 message
    payload when the user is not a manager of ``location_id``.
    """
    manager = User.query.get_or_404(user_id)

    if manager.is_location_manager(location_id):
        return {
            API_ENVELOPE: marshal(manager, user_fields),
            "resources": [],
        }

    return {"message": "User does not exist or not a manager"}, 404
def get(self, org_id, location_id):
    """Return a location, optionally with its roles and managers.

    Query arguments:
        recurse (bool, default False): also embed ``"roles"`` and
            ``"managers"`` in the location data.
        archived (bool, optional): with ``recurse``, filter the embedded
            roles by their ``archived`` flag.

    Aborts with 404 when the location does not exist.
    """
    response = {
        API_ENVELOPE: {},
        "resources": [
            "attendance", "managers", "shifts", "timeclocks",
            "timeoffrequests", "roles"
        ],
    }

    parser = reqparse.RequestParser()
    parser.add_argument("recurse", type=inputs.boolean, default=False)
    parser.add_argument("archived", type=inputs.boolean)
    args = parser.parse_args()
    # Drop arguments that were not supplied. ``items()`` works on Python 2
    # and 3, unlike the Python 2-only ``iteritems()``.
    args = {k: v for k, v in args.items() if v is not None}

    location = Location.query.get_or_404(location_id)
    response[API_ENVELOPE] = marshal(location, location_fields)

    if args["recurse"]:
        roles_query = Role.query.filter_by(location_id=location_id)

        if "archived" in args:
            roles_query = roles_query.filter_by(archived=args["archived"])

        roles = roles_query.all()
        # Concrete lists instead of ``map`` so the payload is serializable
        # on Python 3 as well.
        response[API_ENVELOPE].update({
            "roles": [marshal(role, role_fields) for role in roles]
        })

        # also include managers for the location
        managers = User.query.join(User.manager_of).filter(
            Location.id == location_id).all()
        response[API_ENVELOPE].update({
            "managers": [marshal(manager, user_fields)
                         for manager in managers]
        })

    return response
def get(self, org_id, location_id, role_id):
    """Return a role, optionally with its member users.

    Query arguments:
        recurse (bool, default False): also embed ``"users"`` (membership
            fields merged with user fields) in the role data.
        archived (bool, optional): with ``recurse``, filter memberships by
            their ``archived`` flag.

    Aborts with 404 when the role does not exist.
    """
    response = {
        API_ENVELOPE: {},
        "resources": ["recurringshifts", "schedules", "shifts", "users"],
    }

    parser = reqparse.RequestParser()
    parser.add_argument("recurse", type=inputs.boolean, default=False)
    parser.add_argument("archived", type=inputs.boolean)
    args = parser.parse_args()
    # Drop arguments that were not supplied. ``items()`` works on Python 2
    # and 3, unlike the Python 2-only ``iteritems()``.
    args = {k: v for k, v in args.items() if v is not None}

    role = Role.query.get_or_404(role_id)
    response[API_ENVELOPE] = marshal(role, role_fields)

    if args["recurse"]:
        rtu_query = RoleToUser.query.filter_by(role_id=role_id)

        if "archived" in args:
            rtu_query = rtu_query.filter_by(archived=args["archived"])

        memberships = []
        for member in rtu_query.all():
            # User fields are applied last, so they win on key collisions.
            rtu = marshal(member, role_to_user_fields)
            rtu.update(marshal(member.user, user_fields))
            memberships.append(rtu)

        response[API_ENVELOPE].update({"users": memberships})

    return response
def get(self, org_id, location_id, role_id):
    """Get the recurring shifts of a role, optionally filtered by user.

    Query arguments:
        user_id (int, optional): only return shifts for this user. The value
            ``constants.UNASSIGNED_USER_ID`` selects shifts whose
            ``user_id`` is NULL.

    :return: ``{constants.API_ENVELOPE: [recurring_shift, ...]}``
    """
    parser = reqparse.RequestParser()
    parser.add_argument("user_id", type=int)
    parameters = parser.parse_args()

    # Filter out null values. ``items()`` works on Python 2 and 3, unlike
    # the Python 2-only ``iteritems()``.
    parameters = {k: v for k, v in parameters.items() if v is not None}

    recurring_shifts_query = RecurringShift.query.filter_by(
        role_id=role_id)

    if "user_id" in parameters:
        requested = parameters["user_id"]
        user_id = (None if requested == constants.UNASSIGNED_USER_ID
                   else requested)
        recurring_shifts_query = recurring_shifts_query.filter_by(
            user_id=user_id)

    # Concrete list instead of ``map`` so the payload is serializable on
    # Python 3 as well.
    return {
        constants.API_ENVELOPE: [
            marshal(recurring_shift, recurring_shift_fields)
            for recurring_shift in recurring_shifts_query.all()
        ]
    }
def get(self, org_id, location_id, role_id, user_id, time_off_request_id):
    """Return a specific time off request record.

    Aborts with 404 when ``time_off_request_id`` does not exist.
    """
    record = TimeOffRequest.query.get_or_404(time_off_request_id)
    marshalled = marshal(record, time_off_request_fields)

    return {
        API_ENVELOPE: marshalled,
        "resources": [],
    }
def get(self, org_id, location_id, role_id):
    """List all members of this role.

    Query arguments:
        archived (bool, optional): filter memberships by their ``archived``
            flag.

    :return: ``{API_ENVELOPE: [member, ...]}`` where each member merges the
        user fields with the role-to-user fields
    """
    parser = reqparse.RequestParser()
    parser.add_argument("archived", type=inputs.boolean)
    parameters = parser.parse_args(strict=True)
    # Drop arguments that were not supplied. ``items()`` works on Python 2
    # and 3, unlike the Python 2-only ``iteritems()``.
    parameters = {k: v for k, v in parameters.items() if v is not None}

    users_query = RoleToUser.query.filter_by(role_id=role_id)

    # NOTE(review): the original comment said "by default, only include
    # active users", but no filter is applied unless "archived" is passed.
    if "archived" in parameters:
        users_query = users_query.filter_by(
            archived=parameters["archived"])

    members = []
    for member in users_query.all():
        # Membership fields are applied last, so they win on key collisions.
        datum = {}
        datum.update(marshal(member.user, user_fields))
        datum.update(marshal(member, role_to_user_fields))
        members.append(datum)

    return {
        API_ENVELOPE: members,
    }
def get(self, org_id, location_id, role_id, schedule_id):
    """Return a single schedule along with its sub-resource names.

    Aborts with 404 when ``schedule_id`` does not exist.
    """
    schedule = Schedule2.query.get_or_404(schedule_id)

    return {
        API_ENVELOPE: marshal(schedule, schedule_fields),
        "resources":
        ["preferences", "shifts", "timeclocks", "timeoffrequests"],
    }
def get(self, org_id, location_id, role_id, schedule_id):
    """Return the time off requests that start within a schedule's timespan.

    A request is included when it belongs to ``role_id`` and its ``start``
    lies in ``[schedule.start, schedule.stop)``.

    Query arguments:
        user_id (int, optional): only include requests of this user.

    Aborts with 404 when the schedule does not exist.
    """
    parser = reqparse.RequestParser()
    parser.add_argument("user_id", type=int)
    parameters = parser.parse_args()

    # Filter out null values. ``items()`` works on Python 2 and 3, unlike
    # the Python 2-only ``iteritems()``.
    parameters = {k: v for k, v in parameters.items() if v is not None}

    # get schedule object
    schedule = Schedule2.query.get_or_404(schedule_id)

    # prepare query
    time_off_requests = TimeOffRequest.query \
        .join(RoleToUser) \
        .filter(
            RoleToUser.role_id == role_id,
            TimeOffRequest.start >= schedule.start,
            TimeOffRequest.start < schedule.stop
        )

    # add user id if optionally added
    if "user_id" in parameters:
        time_off_requests = time_off_requests\
            .filter(
                RoleToUser.user_id == parameters.get("user_id")
            )

    # Concrete list instead of ``map`` so the payload is serializable on
    # Python 3 as well.
    return {
        API_ENVELOPE: [
            marshal(time_off_request, time_off_request_fields)
            for time_off_request in time_off_requests.all()
        ]
    }
def get(self, org_id, location_id, role_id, schedule_id):
    """Return the timeclocks that start within a schedule's timespan.

    A timeclock is included when it belongs to ``role_id`` and its ``start``
    lies in ``[schedule.start, schedule.stop)``.

    Query arguments:
        user_id (int, optional): only include timeclocks of this user.

    Aborts with 404 when the schedule does not exist.
    """
    parser = reqparse.RequestParser()
    parser.add_argument("user_id", type=int)
    parameters = parser.parse_args()

    # Filter out null values. ``items()`` works on Python 2 and 3, unlike
    # the Python 2-only ``iteritems()``.
    parameters = {k: v for k, v in parameters.items() if v is not None}

    # get schedule object
    schedule = Schedule2.query.get_or_404(schedule_id)

    # prepare query
    timeclocks = Timeclock.query \
        .filter_by(role_id=role_id) \
        .filter(Timeclock.start >= schedule.start) \
        .filter(Timeclock.start < schedule.stop)

    # add user id if optionally added
    if "user_id" in parameters:
        timeclocks = timeclocks.filter_by(
            user_id=parameters.get("user_id"))

    # Concrete list instead of ``map`` so the payload is serializable on
    # Python 3 as well.
    return {
        API_ENVELOPE: [
            marshal(timeclock, timeclock_fields)
            for timeclock in timeclocks.all()
        ]
    }
def get(self, org_id, location_id, role_id, schedule_id):
    """List all preferences recorded for a schedule.

    :return: ``{API_ENVELOPE: [preference, ...]}``
    """
    preferences = Preference.query.filter_by(schedule_id=schedule_id).all()

    # Concrete list instead of ``map`` so the payload is serializable on
    # Python 3 as well.
    return {
        API_ENVELOPE: [
            marshal(preference, preference_fields)
            for preference in preferences
        ],
    }
def post(self, org_id):
    """Create a location inside organization ``org_id``.

    Request arguments: ``name`` and ``timezone`` (defaulting to the app's
    ``DEFAULT_TIMEZONE`` setting).

    :return: ``(marshalled location, 201)`` on success, or a 400 message
        payload for an unknown timezone. Aborts with 400 when the commit
        fails (after rolling back and logging the exception).
    """
    parser = reqparse.RequestParser()
    parser.add_argument("name", type=str)
    parser.add_argument(
        "timezone",
        type=str,
        default=current_app.config.get("DEFAULT_TIMEZONE"))
    parameters = parser.parse_args(strict=True)

    timezone = parameters.get("timezone")

    # Validate the timezone name before touching the database.
    try:
        pytz.timezone(timezone)
    except pytz.exceptions.UnknownTimeZoneError as zone:
        return {"message": "Unknown timezone specified: %s" % zone}, 400

    new_location = Location(
        name=parameters.get("name"),
        organization_id=org_id,
        timezone=timezone)
    db.session.add(new_location)

    try:
        db.session.commit()
        return marshal(new_location, location_fields), 201
    except Exception as exception:
        db.session.rollback()
        current_app.logger.exception(str(exception))
        abort(400)
def get(self, org_id):
    """List all admins of this organization.

    Aborts with 404 when the organization does not exist.

    :return: ``{API_ENVELOPE: [admin, ...]}``
    """
    org = Organization.query.get_or_404(org_id)

    # Concrete list instead of ``map`` so the payload is serializable on
    # Python 3 as well.
    return {
        API_ENVELOPE: [
            marshal(admin, user_fields) for admin in org.admins.all()
        ],
    }