我们从 Python 开源项目中提取了以下 50 个代码示例，用于说明如何使用 pyramid.response.Response()。
def start_uri(self, uri):
    """Send an ``add <uri>`` command and report whether the URI was already known.

    The command is issued through ``run(self.send, 'add ' + uri)``. A truthy
    result yields a Response saying the URI is already running; otherwise the
    function returns ``None``.
    """
    val = run(self.send, 'add ' + uri)
    if val:
        return Response('URI Already running ' + uri)
    else:
        return

    # NOTE(review): both branches above return, so the lock-based bookkeeping
    # below looks unreachable. The original layout was lost when this snippet
    # was flattened onto one line -- confirm against the upstream file before
    # deleting it.
    print(self.lock, id(self.urls))
    with self.lock:
        print(self.lock, id(self.urls))
        uris = self._getQ()
        if uri in uris:
            print(uri, 'is already running')
            return Response('URI Already running')
        else:
            print('starting work for', uri)
            uris.add(uri)
            self._setQ(uris)
def update_links(self):
    """Add to or replace the links of the ticket, depending on the HTTP method.

    PATCH appends every requested link the ticket does not have yet; POST
    replaces the whole list with the requested links. Any other method
    leaves the links untouched. The transaction is committed in every case.

    :return: a Response naming the updated ticket.
    """
    import transaction
    from pyramid.response import Response
    from stalker import SimpleEntity

    requested_ids = self.get_multi_integer(self.request, 'link_id')
    requested = SimpleEntity.query.filter(
        SimpleEntity.id.in_(requested_ids)
    ).all()

    http_method = self.request.method
    if http_method == 'POST':
        self.entity.links = requested
    elif http_method == 'PATCH':
        for candidate in requested:
            if candidate not in self.entity.links:
                self.entity.links.append(candidate)

    transaction.commit()
    return Response('Updated links of ticket %s' % self.entity_id)
def delete_links(self):
    """Detach the requested links from the ticket and commit.

    Only links that are currently attached are removed; their ids are
    reported back in the response text.

    :return: a Response listing the ids that were removed.
    """
    import transaction
    from pyramid.response import Response
    from stalker import SimpleEntity

    requested_ids = self.get_multi_integer(self.request, 'link_id')
    candidates = SimpleEntity.query.filter(
        SimpleEntity.id.in_(requested_ids)
    ).all()

    removed_ids = []
    for candidate in candidates:
        if candidate not in self.entity.links:
            continue
        self.entity.links.remove(candidate)
        removed_ids.append(candidate.id)

    transaction.commit()

    id_list = ', '.join(str(removed_id) for removed_id in removed_ids)
    return Response(
        'Deleted links [%s] from ticket %s' % (id_list, self.entity_id)
    )
def get_task_reviews(request):
    """RESTful version of getting all reviews of a task.

    The task id comes from ``request.matchdict['id']`` (default ``-1``) and
    ends up inside a raw SQL ``where`` clause passed to ``get_reviews``.
    Because that clause is built by string formatting, the id is coerced to
    an integer first so a crafted path segment cannot inject SQL.

    :param request: the current request.
    :return: the result of ``get_reviews`` for the task, or a 500 Response
        when the id is not an integer.
    """
    logger.debug('get_task_reviews is running')

    raw_task_id = request.matchdict.get('id', -1)
    try:
        task_id = int(raw_task_id)
    except (TypeError, ValueError):
        from pyramid.response import Response
        return Response('There is no task with id: %s' % raw_task_id, 500)

    where_conditions = """where "Review_Tasks".id = %(task_id)s""" % {
        'task_id': task_id
    }
    return get_reviews(request, where_conditions)
def get_task_reviews_count(request):
    """RESTful version of counting the reviews of a task with status NEW.

    The task id from ``request.matchdict['id']`` is coerced to an integer
    before it is used, because it is later formatted into a raw SQL
    ``where`` clause. A non-integer id is reported exactly like an unknown
    task.

    :param request: the current request.
    :return: the number of matching reviews, or a 500 Response when the
        task does not exist.
    """
    logger.debug('get_task_reviews_count is running')

    raw_task_id = request.matchdict.get('id', -1)
    try:
        task_id = int(raw_task_id)
    except (TypeError, ValueError):
        task_id = None

    task = None
    if task_id is not None:
        task = Task.query.filter(Task.id == task_id).first()

    if not task:
        transaction.abort()
        return Response('There is no task with id: %s' % raw_task_id, 500)

    where_conditions = """where "Review_Tasks".id = %(task_id)s and "Reviews_Statuses".code ='NEW' """ % {'task_id': task_id}

    reviews = get_reviews(request, where_conditions)
    return len(reviews)
def get_user_reviews_count(request):
    """RESTful version of counting the reviews with status NEW of a reviewer.

    The reviewer id from ``request.matchdict['id']`` is coerced to an
    integer before it is used, because it is later formatted into a raw SQL
    ``where`` clause. A non-integer id is reported exactly like an unknown
    user.

    :param request: the current request.
    :return: the number of matching reviews, or a 500 Response when the
        user does not exist.
    """
    logger.debug('get_user_reviews_count is running')

    raw_reviewer_id = request.matchdict.get('id', -1)
    try:
        reviewer_id = int(raw_reviewer_id)
    except (TypeError, ValueError):
        reviewer_id = None

    reviewer = None
    if reviewer_id is not None:
        reviewer = User.query.filter(User.id == reviewer_id).first()

    if not reviewer:
        transaction.abort()
        return Response(
            'There is no user with id: %s' % raw_reviewer_id, 500
        )

    where_conditions = """where "Reviews".reviewer_id = %(reviewer_id)s and "Reviews_Statuses".code ='NEW' """ % {'reviewer_id': reviewer_id}

    reviews = get_reviews(request, where_conditions)
    return len(reviews)
def get_project_reviews(request):
    """RESTful version of getting all reviews of a project.

    The project id from ``request.matchdict['id']`` is coerced to an integer
    before it is used, because it is later formatted into a raw SQL
    ``where`` clause. A non-integer id is reported exactly like an unknown
    project.

    :param request: the current request.
    :return: the result of ``get_reviews`` for the project, or a 500
        Response when the project does not exist.
    """
    logger.debug('get_project_reviews is running')

    raw_project_id = request.matchdict.get('id', -1)
    try:
        project_id = int(raw_project_id)
    except (TypeError, ValueError):
        project_id = None

    project = None
    if project_id is not None:
        project = Project.query.filter(Project.id == project_id).first()

    if not project:
        transaction.abort()
        # The message used to say "user", copied from the sibling view.
        return Response(
            'There is no project with id: %s' % raw_project_id, 500
        )

    where_conditions = 'where "Review_Tasks".project_id = %(project_id)s' % \
        {'project_id': project_id}

    return get_reviews(request, where_conditions)
def get_project_reviews_count(request):
    """RESTful version of counting the reviews with status NEW of a project.

    The project id from ``request.matchdict['id']`` (default ``-1``) is
    formatted into a raw SQL ``where`` clause, so it is coerced to an
    integer first to keep a crafted path segment from injecting SQL.

    :param request: the current request.
    :return: the number of matching reviews, or a 500 Response when the id
        is not an integer.
    """
    logger.debug('get_project_reviews_count is running')

    raw_project_id = request.matchdict.get('id', -1)
    try:
        project_id = int(raw_project_id)
    except (TypeError, ValueError):
        from pyramid.response import Response
        return Response(
            'There is no project with id: %s' % raw_project_id, 500
        )

    where_conditions = """ where "Review_Tasks".project_id = %(project_id)s and "Reviews_Statuses".code = 'NEW' """ % {'project_id': project_id}

    reviews = get_reviews(request, where_conditions)
    return len(reviews)
def create_entity(self):
    """Create a User only when both the login and the email are available.

    Availability is taken from ``self.check_availability()``. When the login
    is taken that is reported first; otherwise a taken email is reported.

    :return: the parent class result on success, else a 500 Response
        naming the unavailable value.
    """
    availability = self.check_availability()
    login_free = availability['login_available']
    email_free = availability['email_available']

    if not login_free:
        message = 'Login not available: %s' % self.request.params.get('login')
    elif not email_free:
        message = 'Email not available: %s' % self.request.params.get('email')
    else:
        return super(UserViews, self).create_entity()

    from pyramid.response import Response
    return Response(message, status=500)
def user_add_list_view(self):
    """Show the users stored in the session under ``added_users``.

    Redirects to the ``admin_accounts`` route when the session holds no such
    users. With ``?type=csv`` (case-insensitive) the users are returned as a
    CSV attachment; otherwise a dict with the usernames, the user mapping
    and the group is returned.

    The group is taken from the ``__parent__`` of the first user in the
    mapping.
    """
    users = self.request.session.get("added_users")
    if not users:
        # Covers both a missing key and an empty mapping, which would
        # otherwise fail when looking up the first user below.
        return HTTPFound(location=self.request.route_path("admin_accounts"))

    # next(iter(...)) works on Python 2 and 3; dict.keys()[0] only on 2.
    first_username = next(iter(users))
    group = self.request.root.users[first_username].__parent__

    if "type" in self.request.GET and self.request.GET["type"].lower() == "csv":
        lines = ["Username,Password,Group\n,,,\n"]
        lines.extend(
            '%s,%s,%s\n' % (username, users[username], group.name)
            for username in users
        )
        return Response(
            body="".join(lines),
            status=200,
            content_type="text/csv",
            content_disposition="attachment"
        )

    return {
        "usernames": sorted(users.keys()),
        "users": users,
        "group": group
    }
def blog_post(request):
    """Return the blog post named by ``post_id`` in the request data.

    :param request: the current request; its data is read via
        ``build_dict``.
    :return: the post from ``MemoryDb.get_post``, or a 404 Response with a
        JSON error body when no post is found.
    """
    import json

    print("Processing {} request from {} for the HTTP service: {}, ua: {}".format(
        request.method, get_ip(request), request.url, request.user_agent
    ))

    data = build_dict(request)
    post_id = data.get('post_id')

    post = MemoryDb.get_post(post_id, get_ip(request))
    if not post:
        # json.dumps escapes quotes in the id and copes with a missing
        # (None) post_id, where string concatenation raised TypeError.
        error_body = json.dumps(
            {"error": "Post with ID not found: {}".format(post_id)},
            separators=(',', ':'),
        )
        return Response(error_body, status=404)

    return post


################################################################################
# POST /api/blog
# { blog post data... }
#
def get_post_response(dom, request):
    """Build the SOAP ``GetPostResponse`` for the post id found in ``dom``.

    :param dom: parsed request; the id is read from ``Body/GetPost/id``.
    :param request: the current request, used to resolve the caller IP.
    :return: a ``text/xml`` Response carrying the post fields.
    :raises: the 404 exception from ``exception_response`` when the post is
        not found.
    """
    from xml.sax.saxutils import escape

    id_text = dom.find('Body/GetPost/id').text
    post = MemoryDb.get_post(id_text, get_ip(request))
    if not post:
        raise exception_response(404)

    envelope = (
        '<?xml version="1.0" encoding="utf-8"?> '
        '<soap:Envelope xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" '
        'xmlns:xsd="http://www.w3.org/2001/XMLSchema" '
        'xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"> '
        '<soap:Body> '
        '<GetPostResponse xmlns="http://tempuri.org/"> '
        '<GetPostResult> '
        '<Id>{}</Id> '
        '<Title>{}</Title> '
        '<Published>{}</Published> '
        '<Content>{}</Content> '
        '<ViewCount>{}</ViewCount> '
        '</GetPostResult> '
        '</GetPostResponse> '
        '</soap:Body> '
        '</soap:Envelope>'
    )

    # Escape every field: a title or content containing '<' or '&' would
    # otherwise produce malformed XML or inject markup.
    fields = (post.id, post.title, post.published, post.content,
              post.view_count)
    resp_xml = envelope.format(*(escape(str(field)) for field in fields))

    return Response(body=resp_xml, content_type='text/xml')
def delete_post_response(dom, request):
    """Delete the post named in ``dom`` and answer with a SOAP envelope.

    :param dom: parsed request; the id is read from ``Body/DeletePost/id``.
    :param request: the current request, used to resolve the caller IP.
    :return: a ``text/xml`` Response with an empty ``DeletePostResponse``.
    :raises: the 404 exception when the post is not found, the 403
        exception when ``MemoryDb`` reports it as read-only.
    """
    caller_ip = get_ip(request)
    target = MemoryDb.get_post(dom.find('Body/DeletePost/id').text, caller_ip)

    if not target:
        raise exception_response(404)
    if MemoryDb.is_post_read_only(target.id):
        raise exception_response(403)

    MemoryDb.delete_post(target, get_ip(request))

    envelope = (
        '<?xml version="1.0" encoding="utf-8"?> '
        '<soap:Envelope xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" '
        'xmlns:xsd="http://www.w3.org/2001/XMLSchema" '
        'xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/"> '
        '<soap:Body> '
        '<DeletePostResponse xmlns="http://tempuri.org/" /> '
        '</soap:Body> '
        '</soap:Envelope>'
    )
    return Response(body=envelope, content_type='text/xml')
def proxy_write_to_master(xom, request):
    """Relay a modifying HTTP request to the master and mirror its reply.

    For replies with a status below 400 the function waits until the serial
    given in ``X-DEVPI-SERIAL`` is available locally. A 302 ``location``
    header has the master URL swapped for this application's URL.

    :return: a Response carrying the master's status, body and cleaned
        headers, plus ``X-DEVPI-PROXY: replica``.
    """
    master_reply = proxy_request_to_master(xom, request, stream=True)

    # For redirects, the body is already read and stored in the ``next``
    # attribute (see requests.sessions.send).
    already_consumed = master_reply.raw.closed and master_reply.next
    if already_consumed:
        body = master_reply.next.body
    else:
        body = master_reply.raw.read()

    if master_reply.status_code < 400:
        serial = int(master_reply.headers["X-DEVPI-SERIAL"])
        xom.keyfs.wait_tx_serial(serial)

    headers = clean_response_headers(master_reply)
    headers[str("X-DEVPI-PROXY")] = str("replica")

    if master_reply.status_code == 302:
        # Rewrite the master-related location to our replica site.
        master_location = master_reply.headers["location"]
        replica_url = request.application_url
        rewritten = master_location.replace(
            xom.config.master_url.url, replica_url)
        headers[str("location")] = str(rewritten)

    status_line = "%s %s" % (master_reply.status_code, master_reply.reason)
    return Response(status=status_line, body=body, headers=headers)
def certificate_focus_chain(self):
    """Return the upchain certificate in the format named in the route.

    ``pem`` sets the response content type and returns the PEM text,
    ``pem.txt`` returns the PEM text only, and ``cer``/``crt``/``der``
    return a Response with the DER conversion. Any other format yields the
    literal ``'chain.pem'``.
    """
    cert = self._certificate_focus()
    fmt = self.request.matchdict['format']

    if fmt == 'pem':
        self.request.response.content_type = 'application/x-pem-file'
        return cert.certificate_upchain.cert_pem

    if fmt == 'pem.txt':
        return cert.certificate_upchain.cert_pem

    if fmt in ('cer', 'crt', 'der'):
        der_bytes = lib_cert_utils.convert_pem_to_der(
            pem_data=cert.certificate_upchain.cert_pem)
        der_response = Response()
        # 'cer' is served as pkix-cert; 'crt' and 'der' as x-x509-ca-cert.
        if fmt == 'cer':
            der_response.content_type = 'application/pkix-cert'
        else:
            der_response.content_type = 'application/x-x509-ca-cert'
        der_response.body = der_bytes
        return der_response

    return 'chain.pem'
def certificate_focus_cert(self):
    """Return the focused certificate in the format named in the route.

    ``pem`` sets the response content type and returns the PEM text,
    ``pem.txt`` returns the PEM text only, and ``crt`` returns a Response
    with the DER conversion. Any other format yields the literal
    ``'cert.pem'``.
    """
    cert = self._certificate_focus()
    fmt = self.request.matchdict['format']

    if fmt == 'pem':
        self.request.response.content_type = 'application/x-pem-file'
        return cert.cert_pem

    if fmt == 'pem.txt':
        return cert.cert_pem

    if fmt == 'crt':
        der_response = Response()
        der_bytes = lib_cert_utils.convert_pem_to_der(pem_data=cert.cert_pem)
        der_response.content_type = 'application/x-x509-server-cert'
        der_response.body = der_bytes
        return der_response

    return 'cert.pem'


# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
def ca_certificate_focus_raw(self):
    """Return the focused CA certificate in the format named in the route.

    ``pem`` sets the response content type and returns the PEM text,
    ``pem.txt`` returns the PEM text only, and ``cer``/``crt``/``der``
    return a Response with the DER conversion. Any other format yields the
    literal ``'chain.?'``.
    """
    ca_cert = self._ca_certificate_focus()
    fmt = self.request.matchdict['format']

    if fmt == 'pem':
        self.request.response.content_type = 'application/x-pem-file'
        return ca_cert.cert_pem

    if fmt == 'pem.txt':
        return ca_cert.cert_pem

    if fmt in ('cer', 'crt', 'der'):
        der_bytes = lib_cert_utils.convert_pem_to_der(pem_data=ca_cert.cert_pem)
        der_response = Response()
        # 'cer' is served as pkix-cert; 'crt' and 'der' as x-x509-ca-cert.
        if fmt == 'cer':
            der_response.content_type = 'application/pkix-cert'
        else:
            der_response.content_type = 'application/x-x509-ca-cert'
        der_response.body = der_bytes
        return der_response

    return 'chain.?'
def bad_csrf_token(context: BadCSRFToken, request: Request):
    """User friendly error page about bad CSRF token.

    Logs the failure as a warning, renders ``core/badcsrftoken.html`` and
    returns it with status 400. The request's transaction is aborted before
    returning.

    :param context: the BadCSRFToken exception being handled (not read).
    :param request: the current request.
    :return: the rendered 400 Response.
    """
    session = request.session
    token = session.get_csrf_token()

    # Logger.warning() replaces the deprecated Logger.warn() alias.
    logger.warning(
        "Bad CSRF error: session: %s IP: %s cookie: %s user agent: %s",
        session.session_id, request.client_addr, token, request.user_agent)

    html = render('core/badcsrftoken.html', {}, request=request)
    resp = Response(html)
    resp.status_code = 400

    # Hint pyramid_redis_session not to generate any session cookies for this response
    resp.cache_control.public = True

    # Make sure nothing is written and no transaction is left open for this
    # error response.
    request.tm.abort()

    return resp
def activate_by_email(self, activation_code: str, location=None) -> Response:
    """Activate a user from the code sent in the activation email.

    * User clicks the link in the activation email
    * User enters the activation code on the form by hand

    When the ``websauna.login_after_activation`` setting is true the user is
    logged in through the login service; otherwise a
    RegistrationActivatedEvent is sent and the user is redirected.

    :param activation_code: token looked up in the user registry.
    :param location: redirect target overriding the route named by
        ``websauna.activate_redirect`` (default ``index``).
    :raise HTTPNotFound: when no user matches the code.
    """
    request = self.request
    settings = request.registry.settings
    registry_of_users = get_user_registry(request)

    redirect_route = settings.get('websauna.activate_redirect', 'index')
    default_target = request.route_url(redirect_route)
    auto_login = asbool(settings.get('websauna.login_after_activation', False))

    activated = registry_of_users.activate_user_by_email_token(activation_code)
    if not activated:
        raise HTTPNotFound("Activation code not found")

    if not auto_login:
        self.request.registry.notify(
            RegistrationActivatedEvent(self.request, activated, None))
        return HTTPFound(location=location or default_target)

    login_service = get_login_service(self.request.registry)
    return login_service.authenticate(self.request, activated)
def authenticate_credentials(self, username: str, password: str, login_source:str, location: str=None) -> Response:
    """Try logging in the user with username and password.

    The credentials are resolved to a user by ``self.check_credentials``
    and the login itself is delegated to ``self.authenticate_user``.

    :param username: login name given by the user.
    :param password: password given by the user.
    :param login_source: passed through to ``authenticate_user``.
    :param location: redirect override passed through to
        ``authenticate_user``.
    :return: whatever ``authenticate_user`` returns.

    NOTE(review): the earlier docstring listed ``AuthenticationError`` as
    raised here; presumably ``check_credentials`` raises it -- confirm.
    """
    user = self.check_credentials(username, password)
    return self.authenticate_user(user, login_source, location)
def reset_password(self, activation_code: str, password: str, location=None) -> Response:
    """Perform the actual password reset.

    The user either followed a password reset link (GET) or entered the
    code on a form.

    :param activation_code: reset token looked up in the user registry.
    :param password: the new password.
    :param location: redirect target overriding the configured
        ``websauna.reset_password_redirect`` route.
    :return: HTTPNotFound when the token matches no user, otherwise a
        redirect.
    """
    request = self.request
    registry_of_users = get_user_registry(request)

    account = registry_of_users.get_user_by_password_reset_token(activation_code)
    if not account:
        return HTTPNotFound("Activation code not found")

    registry_of_users.reset_password(account, password)

    messages.add(
        request,
        msg="The password reset complete. Please sign in with your new password.",
        kind='success',
        msg_id="msg-password-reset-complete",
    )

    notify = request.registry.notify
    notify(PasswordResetEvent(self.request, account, password))
    notify(UserAuthSensitiveOperation(self.request, account, "password_reset"))

    target = location or get_config_route(request, 'websauna.reset_password_redirect')
    return HTTPFound(location=target)
def get_avatar(request):
    """
    Returns the requested avatar
    ---
    avatar:
      description: 'Avatar Id'
      in: path
      required: true
      type: string
    """
    try:
        found = Avatar.objects.get(id=request.matchdict['avatar'])
    except me.DoesNotExist:
        raise NotFoundError()

    # NOTE(review): str() on the stored body is only a pass-through where
    # str is the byte type -- confirm before running on Python 3.
    mime = str(found.content_type)
    payload = str(found.body)
    return Response(content_type=mime, body=payload)
def post_add_group_member(self):
    """Add the user named in ``self.p['name']`` to the current group.

    When the user lookup fails the caller is redirected to the settings
    page with the remembered auth headers; otherwise the membership is
    stored and ``OK`` is returned.
    """
    # TODO: Invitee can respond with Y/N
    # TODO: If invitee was sole admin of previous group (but not sole
    # member), ask them to select a replacement admin
    # TODO: Log and email
    msg = ""
    settings_url = self.request.route_url('settings')
    auth_headers = remember(self.request, '%s_%s' % (self.gid, self.uid))

    try:
        member_uid = find_user_sql(self.p['name'])
    except Exception as e:
        # TODO: return with error message
        msg = str(e)
        return HTTPFound(location=settings_url, headers=auth_headers)

    post_add_group_member_sql(self.gid, member_uid)
    return Response('OK')
def post_add_task(self):
    """Adds new task if tid == -1, else updates task tid.

    Reads ``pid``, ``due_date`` and ``name`` (and the optional ``tid`` and
    ``ptid``) from ``self.p``, stores the task, logs it and emails every
    address returned by ``email_all_sql``.
    """
    params = self.p
    gid, uid = self.gid, self.uid

    pid = params['pid']
    tid = params['tid'] if 'tid' in params else -1
    ptid = params['ptid'] if 'ptid' in params else 0
    due_date = params['due_date']
    name = params['name']

    tid = post_add_task_sql(gid, pid, tid, ptid, due_date, name)

    post_add_log_sql(gid, uid, pid, tid,
                     self.user['name'] + " added new task: " + name)

    proj_name = project_name_sql(gid, pid)
    for recipient in email_all_sql(gid):
        send_email(recipient,
                   "New Task Added to " + proj_name,
                   self.user['name'] + " added new task: " + name)

    return Response('OK')
def post_mark(self):
    """Marks task as complete or incomplete.

    Reads ``pid``, ``tid``, ``wid``, ``sid`` and ``status`` from ``self.p``,
    stores the mark, logs it and emails every address returned by
    ``email_all_sql``.
    """
    params = self.p
    pid = params['pid']
    tid = params['tid']
    wid = params['wid']
    sid = params['sid']
    status = params['status']

    post_mark_sql(self.gid, pid, tid, wid, sid, status)

    # log task mark
    task_name = task_name_sql(self.gid, pid, tid)
    post_add_log_sql(
        self.gid, self.uid, pid, tid,
        self.user['name'] + " marked task " + status + ": " + task_name)

    proj_name = project_name_sql(self.gid, pid)
    for recipient in email_all_sql(self.gid):
        send_email(
            recipient,
            "Updated task status in " + proj_name,
            self.user['name'] + " marked task " + status + ": " + task_name)

    return Response('OK')
def vote_results_csv(request):
    """Return the vote results of the context's instance as CSV.

    Requires an authenticated user. An optional ``histogram`` query
    parameter must be an integer of at most 25. While the widget's activity
    state is not ``ended`` the caller also needs ``P_ADMIN_DISC``.

    :raises HTTPUnauthorized: for anonymous callers or missing permission.
    :raises HTTPBadRequest: for a non-integer or too large ``histogram``.
    """
    ctx = request.context
    if not authenticated_userid(request):
        raise HTTPUnauthorized

    bins = request.GET.get('histogram', None)
    if bins:
        try:
            bins = int(bins)
        except ValueError as err:
            raise HTTPBadRequest(err)
        if bins > 25:
            raise HTTPBadRequest(
                "Please select at most 25 bins in the histogram.")

    widget = ctx._instance.widget
    if widget.activity_state != "ended":
        if P_ADMIN_DISC not in ctx.get_permissions():
            raise HTTPUnauthorized()

    raw = BytesIO()
    text_layer = TextIOWrapper(raw, encoding='utf-8')
    ctx._instance.csv_results(text_layer, bins)
    # Detach so the wrapper does not close the byte buffer we hand over.
    text_layer.detach()
    raw.seek(0)
    return Response(body_file=raw, content_type='text/csv', charset="utf-8")
def discussion_instance_view_jsonld(request):
    """Return the JSON-LD of the context's discussion.

    Requires ``P_READ`` or ``P_READ_PUBLIC_CIF``. When a salt is available
    the data is passed through ``AESObfuscator``; callers without
    ``P_ADMIN_DISC`` and without a salt get a random one. A ``callback``
    query parameter switches the output to JSONP.
    """
    discussion = request.context._instance
    user_id, permissions, salt = read_user_token(request)

    may_read = P_READ in permissions or P_READ_PUBLIC_CIF in permissions
    if not may_read:
        raise HTTPUnauthorized()

    if not salt and P_ADMIN_DISC not in permissions:
        salt = base64.urlsafe_b64encode(urandom(6))

    payload = discussion_jsonld(discussion.id)
    if salt:
        payload = AESObfuscator(salt).obfuscate(payload)

    # TODO: Add age
    content_type = "application/ld+json"
    if "callback" in request.GET:
        payload = handle_jsonp(request.GET['callback'], payload)
        content_type = "application/javascript"

    return Response(body=payload, content_type=content_type, charset="utf-8")
def user_private_view_jsonld(request):
    """Return the private user JSON-LD of the context's discussion.

    Plain-http requests are redirected to the secure base URL when the
    ``accept_secure_connection`` setting is true. Requires ``P_READ``. When
    a salt is available the data is passed through ``AESObfuscator``;
    callers without ``P_ADMIN_DISC`` and without a salt get a random one. A
    ``callback`` query parameter switches the output to JSONP.
    """
    if request.scheme == "http":
        settings = request.registry.settings
        if asbool(settings.get('accept_secure_connection', False)):
            return HTTPFound(get_global_base_url(True) + request.path_qs)

    discussion_id = request.context.get_discussion_id()
    user_id, permissions, salt = read_user_token(request)

    if P_READ not in permissions:
        raise HTTPUnauthorized()

    if not salt and P_ADMIN_DISC not in permissions:
        salt = base64.urlsafe_b64encode(urandom(6))

    payload = userprivate_jsonld(discussion_id)
    if salt:
        payload = AESObfuscator(salt).obfuscate(payload)

    content_type = "application/ld+json"
    if "callback" in request.GET:
        payload = handle_jsonp(request.GET['callback'], payload)
        content_type = "application/javascript"

    return Response(body=payload, content_type=content_type, charset="utf-8")
def as_mind_map(request):
    """Provide a mind-map like representation of the table of ideas.

    The output type is the first ``mimetype`` query value that is a key of
    ``pygraphviz_formats``; when none matches, the best match of the Accept
    header is used.

    :raises HTTPNotAcceptable: when no supported type can be determined.
    """
    requested = request.GET.getall('mimetype')
    mimetype = next(
        (candidate for candidate in requested
         if candidate in pygraphviz_formats),
        None,
    )
    if mimetype is None:
        mimetype = request.accept.best_match(list(pygraphviz_formats.keys()))

    if not mimetype:
        # mimetype is empty or None here, so describe what was asked for;
        # concatenating it to the message raised TypeError instead of 406.
        wanted = ", ".join(requested) or str(request.accept)
        raise HTTPNotAcceptable("Not known to pygraphviz: " + wanted)

    discussion = request.context._instance
    G = discussion.as_mind_map()
    G.layout(prog='twopi')

    io = BytesIO()
    G.draw(io, format=pygraphviz_formats[mimetype])
    io.seek(0)
    return Response(body_file=io, content_type=mimetype)
def show_suggestions_test(request):
    """Render the semantic-analysis suggestions of the discussion as HTML.

    Anonymous callers are redirected to ``/login`` with the current path as
    ``next``.

    :return: a ``text/html`` Response with the analysis output.
    """
    discussion = request.context._instance
    user_id = authenticated_userid(request)
    if not user_id:
        from urllib.parse import quote
        return HTTPFound(location="/login?next="+quote(request.path))

    # The discussion was looked up a second time here; the first lookup is
    # reused instead.
    output = StringIO()

    from assembl.nlp.clusters import OpticsSemanticsAnalysisWithSuggestions
    analysis = OpticsSemanticsAnalysisWithSuggestions(
        discussion, user_id=user_id, min_samples=3, test_code=str(user_id))

    from pyramid_jinja2 import IJinja2Environment
    jinja_env = request.registry.queryUtility(
        IJinja2Environment, name='.jinja2')

    analysis.as_html(output, jinja_env)
    output.seek(0)
    return Response(body_file=output, content_type='text/html', charset="utf-8")
def auth_token(request, extra_headers=None):
    """Return an encoded token for the current user as plain text.

    When the request carries an ``origin`` header, matching Allow-Origin,
    Allow-Credentials and Expose-Headers headers are added. The token
    payload holds the consumer key, the user id (``Everyone`` for anonymous
    callers) and a ttl of 86400.

    :param extra_headers: optional header tuples appended to the response.
    """
    response_headers = []
    if 'origin' in request.headers:
        response_headers.append(
            (_ac + 'Allow-Origin', request.headers['origin']))
        response_headers.append((_ac + 'Allow-Credentials', 'true'))
        response_headers.append(
            (_ac + 'Expose-Headers', 'Location, Content-Type, Content-Length'))
    if extra_headers:
        response_headers.extend(extra_headers)

    user_id = authenticated_userid(request)
    payload = dict(consumerKey='assembl', userId=(user_id or Everyone), ttl=86400)

    secret = request.registry.settings['session.secret']
    token = encode_token(payload, secret)
    return Response(token, 200, response_headers,
                    content_type='text/plain', charset="ascii")
def test_register_view_returns_response():
    """The register view returns a dict for a plain DummyRequest."""
    from mood_bot.views.default import register

    result = register(testing.DummyRequest())
    assert isinstance(result, dict)


# def test_register_user_for_login(dummy_request):
#     """Test that checks for user login."""
#     from mood_bot.views.default import register
#     from pyramid.httpexceptions import HTTPFound
#     data_dict = {'username': 'kurtykurt', 'password': 'kurtkurt', 'password-check': 'kurtkurt'}
#     dummy_request.POST = data_dict
#     response = register(dummy_request)
#     assert response.status_code == 302
#     assert isinstance(response, HTTPFound)
def authorize_post_(request):
    """Handle the submitted OAuth authorization form.

    The ``credentials`` form field must be a JSON object containing a
    ``scopes`` entry. The user is either authenticated from the submitted
    username/password (when ``not_me`` and ``username`` are given) or taken
    from the current session.

    :return: the re-rendered form with an error message on failure,
        otherwise the OAuth authorization response.
    :raises HTTPBadRequest: when ``credentials`` is not valid JSON, is not
        an object, or lacks ``scopes``.
    """
    form = request.web_input(credentials='', username='', password='', remember_me='', mobile='', not_me='')

    try:
        credentials = json.loads(form.credentials)
        scopes = credentials.pop('scopes')
    except (ValueError, KeyError, TypeError, AttributeError):
        # ValueError: not JSON; KeyError: no scopes; TypeError and
        # AttributeError: JSON that is not an object.
        raise HTTPBadRequest()

    error = None
    if form.not_me and form.username:
        userid, error = login.authenticate_bcrypt(form.username, form.password, bool(form.remember_me))
        if error:
            error = errorcode.login_errors.get(error, 'Unknown error.')
    elif not request.userid:
        error = "You must specify a username and password."
    else:
        userid = request.userid

    if error:
        return Response(render_form(request, scopes, credentials, bool(form.mobile), error,
                                    form.username, form.password, bool(form.remember_me),
                                    bool(form.not_me)))

    credentials['userid'] = userid
    response_attrs = server.create_authorization_response(
        *(extract_params(request) + (scopes, credentials)))
    return OAuthResponse(*response_attrs)
def search_(request):
    """Render one page of marketplace search results.

    Twice the page size is fetched so the presence of a next page can be
    detected; only the first ``limit`` results are shown.
    """
    form = request.web_input(q="", min="", max="", currency="", pc="", c="", o="")
    limit = 30
    offset = define.get_int(form.o)

    commishclass = (form.pc if form.pc else form.c).lower()
    min_price = commishinfo.parse_currency(form.min)
    max_price = commishinfo.parse_currency(form.max)

    found = commishinfo.select_commissionable(
        request.userid, form.q, commishclass, min_price, max_price,
        form.currency, offset, limit * 2,
    )
    found_count = len(found)
    page = found[0:limit]
    media.populate_with_user_media(page)

    if offset == 0:
        prev_index = None
    elif offset - limit > 0:
        prev_index = offset - limit
    else:
        prev_index = 0

    if found_count - limit > 0:
        next_index = offset + limit
    else:
        next_index = None

    template_args = [page, form, commishinfo.CURRENCY_CHARMAP,
                     commishinfo.PRESET_COMMISSION_CLASSES,
                     prev_index, next_index]
    return Response(define.webpage(request.userid, "etc/marketplace.html", template_args))
def hello_world(request):
    """Answer every request with a fixed plain-text greeting."""
    greeting = 'Hello world from Pyramid!\n'
    return Response(greeting, content_type='text/plain')
def bookmarklet_wrapper(request, endpoint):
    """Return the HTML page carrying the SciBot bookmarklet.

    The bookmarklet code is ``bookmarklet_base`` filled with the
    application URL (forced to https) and ``endpoint``. It is interpolated
    into ``html_base`` twice: once escaped and stripped of newlines, and
    once verbatim.

    :param request: the current request.
    :param endpoint: second value interpolated into ``bookmarklet_base``.
    :return: a ``text/html`` Response.
    """
    code = bookmarklet_base % (request.application_url.replace('http:', 'https:'), endpoint)

    # The replacement used to map '"' to itself, which did nothing.
    # NOTE(review): presumably html_base puts this value inside a quoted
    # attribute, hence the &quot; entity -- confirm against html_base.
    bookmarklet = code.replace('"', '&quot;').replace('\n', '')

    html = html_base % (bookmarklet, request.host.split('.', 1)[-1], code)
    r = Response(html)
    r.content_type = 'text/html'
    return r
def export(request):
    """Return the sorted export rows as a gzip-compressed CSV attachment.

    The rows and the date used in the file name come from ``export_impl``.
    """
    print('starting csv export')
    rows, date_stamp = export_impl()

    buffer = StringIO()
    csv.writer(buffer).writerows(sorted(rows))
    compressed = gzip.compress(buffer.getvalue().encode())

    response = Response(compressed)
    response.content_type = 'text/csv'
    response.headers.update({
        'Content-Disposition': 'attachment;filename = RRID-data-%s.csv' % date_stamp,
        'Content-Encoding': 'gzip',
    })
    return response
def export_json(request):
    """Return the export data as gzip-compressed, pretty-printed JSON.

    The data comes from ``export_json_impl``; the date it also returns is
    not used here.
    """
    print('starting json export')
    payload, _date_stamp = export_json_impl()

    serialized = json.dumps(payload, sort_keys=True, indent=4)
    compressed = gzip.compress(serialized.encode())

    response = Response(compressed)
    response.content_type = 'application/json'
    response.headers.update({'Content-Encoding': 'gzip'})
    return response
def get_ticket_resolutions(self):
    """Return the ``ticket_resolutions`` entry of stalker's defaults as JSON."""
    from pyramid.response import Response
    from stalker import defaults

    resolutions = defaults['ticket_resolutions']
    return Response(json_body=resolutions)
def get_ticket_workflow(self):
    """Return the ``ticket_workflow`` entry of stalker's defaults as JSON."""
    from pyramid.response import Response
    from stalker import defaults

    workflow = defaults['ticket_workflow']
    return Response(json_body=workflow)


# ACTIONS
def get_references(self):
    """Return the references of this project as JSON.

    The query unions the project's own references with the references of
    tasks that belong to the project. Each entry carries the id, name and
    entity type plus a ``$ref`` built from ``entity_type_to_url``.
    """
    from pyramid.response import Response
    from sqlalchemy import text
    from stalker.db.session import DBSession
    from stalker_pyramid import entity_type_to_url

    sql = (
        'select rse.id, rse.name, rse.entity_type '
        'from "Project_References" as pr '
        'join "SimpleEntities" as rse on pr.link_id = rse.id '
        'where pr.project_id = :id '
        'union '
        'select rse.id, rse.name, rse.entity_type '
        'from "Task_References" as tr '
        'join "Tasks" as t on tr.task_id = t.id '
        'join "SimpleEntities" as rse on tr.link_id = rse.id '
        'where t.project_id = :id '
    )

    connection = DBSession.connection()
    rows = connection.execute(text(sql), id=self.entity_id).fetchall()

    project_ref_data = []
    for row in rows:
        ref_id, ref_name, ref_type = row[0], row[1], row[2]
        project_ref_data.append({
            'id': ref_id,
            'name': ref_name,
            'entity_type': ref_type,
            '$ref': '%s/%s' % (entity_type_to_url[ref_type], ref_id),
        })

    return Response(json_body=project_ref_data, status=200)
def studio_scheduling_mode(request):
    """Set the system to "in schedule" mode or "normal" mode.

    While the system is in "in schedule" mode (Studio.is_scheduling == True)
    it must not be scheduled again until the previous run has finished.

    The ``mode`` request parameter is read as an integer flag. When it is
    given, the studio's scheduling state, the user who set it and the start
    time are updated.

    :return: a 500 Response when there is no Studio, otherwise a Response
        reporting the mode.
    """
    logged_in_user = get_logged_in_user(request)

    # get the studio
    studio = Studio.query.first()

    mode = request.params.get('mode')
    logger.debug('schedule mode: %s' % mode)

    if not studio:
        transaction.abort()
        return Response(
            "There is no Studio instance\nPlease create a studio first", 500)

    if mode:
        # set the mode
        mode = bool(int(mode))
        studio.is_scheduling = mode
        studio.is_scheduling_by = logged_in_user
        studio.scheduling_started_at = local_to_utc(datetime.datetime.now())

    report = "Successfully, set the scheduling mode to: %s" % mode
    return Response(report)
def update_entity(self):
    """Update a user after checking that a new login or email is available.

    A ``login`` or ``email`` request parameter is only checked when it is
    given. Any exception raised by the parent update aborts the transaction
    and is reported as a 500 Response carrying the exception text.
    """
    from pyramid.response import Response

    # before updating the login and email, check availability
    availability = self.check_availability()

    login_name = self.request.params.get('login')
    if login_name and not availability['login_available']:
        return Response('Login not available: %s' % login_name, status=500)

    email = self.request.params.get('email')
    if email and not availability['email_available']:
        return Response('Email not available: %s' % email, status=500)

    # update super data
    try:
        return super(UserViews, self).update_entity()
    except Exception as e:
        import transaction
        transaction.abort()
        return Response(body=str(e), status=500)
def get_vacations(self):
    """Return the vacations of this user as JSON.

    Each entry carries the id, name and entity type plus a ``$ref`` built
    from ``entity_type_to_url``.
    """
    from pyramid.response import Response
    from sqlalchemy import text
    from stalker.db.session import DBSession
    from stalker_pyramid import entity_type_to_url

    sql = (
        ' select "Vacations".id, "SimpleEntities".name, '
        '"SimpleEntities".entity_type '
        'from "Vacations" '
        'join "SimpleEntities" on "Vacations".id = "SimpleEntities".id '
        'where "Vacations".user_id = :id '
    )

    connection = DBSession.connection()
    result = connection.execute(text(sql), id=self.entity_id)

    data = []
    for row in result.fetchall():
        vacation_id, name, entity_type = row[0], row[1], row[2]
        data.append({
            'id': vacation_id,
            '$ref': '%s/%s' % (entity_type_to_url[entity_type], vacation_id),
            'name': name,
            'entity_type': entity_type,
        })

    return Response(json_body=data, status=200)


# User <-> Task
def server_error(exc, request):
    """Turn an exception into a 500 Response and abort the transaction.

    The message is the exception's first argument, or empty when it has
    none.
    """
    if exc.args:
        detail = exc.args[0]
    else:
        detail = ''
    error_response = Response('Server Error: %s' % detail, 500)
    transaction.abort()
    return error_response
def timeleft_view(self):
    """Return the remaining purchase time of the queue as plain text.

    :return: ``1`` when the queue is disabled in the root properties,
        otherwise a ``text/plain`` Response with the value of
        ``Queue.purchase_time_left()``.
    """
    if not self.request.root.properties[PROP_KEYS.QUEUE_ENABLED]:
        return 1

    queue = Queue(self.request)
    # "text/plain" used to be passed as the first positional argument,
    # which is the body, so the content type was never set.
    return Response(
        str(queue.purchase_time_left()),
        content_type="text/plain",
    )
def check_username_view(self):
    """Report whether the lower-cased ``user_id`` from the route is a known user.

    :return: a Response with the text ``true`` or ``false``.
    """
    wanted = self.request.matchdict["user_id"].lower()
    known = wanted in self.request.root.users
    return Response("true" if known else "false")
def my_view(request):
    """Look up the MyModel row named ``one`` for the template.

    :return: a dict with the row and the project name, or a plain-text 500
        Response carrying ``conn_err_msg`` when the query raises
        DBAPIError.
    """
    try:
        first_match = DBSession.query(MyModel).filter(
            MyModel.name == 'one').first()
    except DBAPIError:
        return Response(conn_err_msg, content_type='text/plain', status_int=500)
    else:
        return dict(one=first_match, project='source')
def picture_handler(request):
    """Serve pictures from database binaries.

    The ``db_id`` route value selects the model (``add`` -> AddressBook,
    ``cat`` -> Category, ``att`` -> Attribute) and ``pic_id`` the row.

    :return: a Response with the stored picture and its MIME type.
    :raises HTTPNotFound: for an unknown ``db_id`` or a missing row.
    """
    from pyramid.httpexceptions import HTTPNotFound

    models_by_db_id = {
        "add": AddressBook,
        "cat": Category,
        "att": Attribute,
    }
    model = models_by_db_id.get(request.matchdict["db_id"])
    if model is None:
        # An unknown db_id used to leave picture_data unbound.
        raise HTTPNotFound()

    picture_data = request.dbsession.query(model).get(request.matchdict['pic_id'])
    if picture_data is None:
        raise HTTPNotFound()

    mime_type = picture_data.pic_mime
    if sys.version_info[0] < 3:
        mime_type = mime_type.encode('utf-8')
    return Response(content_type=mime_type, body=picture_data.picture)
def head(self):
    """Answer with a Response built with no arguments."""
    bare_response = Response()
    return bare_response