我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用flask.jsonify()。
def index():
    """Handle a landing request that may carry a ``code`` query argument.

    Returns the literal ``"logged_in"`` when ``g.signin`` is truthy. Otherwise,
    when ``code`` is present, it is passed to ``Get_Access_Token``, the
    resulting access token to ``Get_User_Info``, and the user data is returned
    as JSON with a ``logged_in`` cookie set. Without a code the client is
    redirected to the ``login`` endpoint.
    """
    auth_code = request.args.get("code", "")
    app.logger.debug("code: %s" % auth_code)

    if g.signin:
        return "logged_in"
    if not auth_code:
        return redirect(url_for("login"))

    token_payload = Get_Access_Token(auth_code)
    user_info = Get_User_Info(token_payload.get('access_token'))
    app.logger.debug(user_info)

    reply = jsonify(user_info)
    reply.set_cookie(key="logged_in", value='true', expires=None)
    return reply
def omw_lang_selector():
    """Return, as JSON, the HTML for the two language ``<select>`` elements.

    Options are built from the first value returned by ``fetch_langs()``. In
    each element the option whose id equals the matching cookie
    (``selected_lang`` / ``selected_lang2``) is marked ``selected``.
    """
    first_choice = request.cookies.get('selected_lang')
    second_choice = request.cookies.get('selected_lang2')
    lang_id, lang_code = fetch_langs()

    def render_select(opening_tag, chosen):
        # Build one complete <select> element, pre-selecting `chosen`.
        parts = [opening_tag]
        for lid in lang_id.keys():
            if chosen == str(lid):
                template = '<option value="{}" selected>{}</option> '
            else:
                template = '<option value="{}">{}</option> '
            parts.append(template.format(lid, lang_id[lid][1]))
        parts.append('</select>')
        return ''.join(parts)

    markup = render_select(
        '<select name="lang" style="font-size: 85%; width: 9em" required>',
        first_choice,
    )
    markup += render_select(
        '<select name="lang2" style="font-size: 85%; width: 9em" required>',
        second_choice,
    )
    return jsonify(result=markup)
def nowplaying():
    """Report what a Last.fm user is currently listening to.

    Reads ``text`` (the Last.fm username) and ``user_id`` from the submitted
    form. When ``text`` is empty, the ``lastfm`` attribute of the ``User`` row
    matching ``user_id`` is used instead.

    Returns:
        A ``(body, 200)`` tuple. The body is a plain-text hint when no
        username can be resolved or nothing is playing, otherwise a JSON
        payload with ``response_type``, ``text`` and ``unfurl_links``.
    """
    # The original also computed "@" + form['user_name'], which was never used
    # and raised TypeError whenever the field was missing; it is dropped here.
    lastfm_user = request.form.get('text', None)
    if not lastfm_user:
        user_id = request.form.get('user_id', None)
        user = User.query.filter_by(user_id=user_id).first()
        if user is None:
            return "Please enter a valid username", 200
        lastfm_user = user.lastfm
        if lastfm_user is None:
            return "Please enter a valid username", 200

    np = lastfm_network.get_user(lastfm_user).get_now_playing()
    if np is None:
        return "No song playing.", 200

    text = "<http://last.fm/user/{0}|{0}> is currently listening to <{1}|{2} - {3}>".format(
        lastfm_user, np.get_url(), np.artist.name, np.title
    )
    payload = {
        "response_type": "in_channel",
        "text": text,
        "unfurl_links": False,
    }
    return jsonify(payload), 200
def get_chapter(book_id):
    """Return one page of the chapters of ``book_id`` as JSON.

    The page number is read from the ``page`` query argument (default 1) and
    the page size from ``current_app.config['CHAPTER_PER_PAGE']``.

    Returns:
        JSON with ``chapters`` (each chapter's ``to_json()``) and the absolute
        ``prev`` / ``next`` page URLs, which are ``None`` when there is no
        such page.
    """
    page = request.args.get('page', 1, type=int)
    # Keyword arguments work on every Flask-SQLAlchemy release; newer releases
    # no longer accept `page` positionally.
    pagination = Chapter.query.filter_by(book_id=book_id).paginate(
        page=page,
        per_page=current_app.config['CHAPTER_PER_PAGE'],
        error_out=False,
    )

    # Named *_url so the builtin `next` is not shadowed.
    prev_url = None
    if pagination.has_prev:
        prev_url = url_for('api.get_chapter', book_id=book_id, page=page - 1,
                           _external=True)
    next_url = None
    if pagination.has_next:
        next_url = url_for('api.get_chapter', book_id=book_id, page=page + 1,
                           _external=True)

    return jsonify({
        'chapters': [chapter.to_json() for chapter in pagination.items],
        'prev': prev_url,
        'next': next_url,
    })
def is_hot_dog():
    """Check whether an uploaded image is a hot dog.

    Only acts on POST requests. The ``file`` upload is written into
    ``app.config['UPLOAD_FOLDER']``, passed by name to
    ``rekognizer.get_confidence`` and removed again afterwards.

    Returns:
        ``({'error': 'no file'}, 400)`` when no file was sent,
        ``{'error': 'bad-type'}`` when the content type is not in
        ``valid_mimetypes``, otherwise JSON with ``is_hot_dog``
        (``'true'``/``'false'``) and ``confidence``.
    """
    if request.method == 'POST':
        if 'file' not in request.files:
            return jsonify({'error': 'no file'}), 400

        img_file = request.files.get('file')
        # Drop any directory components supplied by the client so the upload
        # cannot be written (or deleted) outside UPLOAD_FOLDER.
        img_name = os.path.basename(img_file.filename)
        mimetype = img_file.content_type

        # NOTE(review): this error is returned with status 200, unlike the
        # 'no file' case above — confirm whether clients depend on that.
        if mimetype not in valid_mimetypes:
            return jsonify({'error': 'bad-type'})

        img_path = os.path.join(app.config['UPLOAD_FOLDER'], img_name)
        img_file.save(img_path)
        try:
            hot_dog_conf = rekognizer.get_confidence(img_name)
        finally:
            # Remove the temporary image even when the analysis raises.
            os.remove(img_path)

        verdict = 'false' if hot_dog_conf == 0 else 'true'
        return jsonify({
            'is_hot_dog': verdict,
            'confidence': hot_dog_conf,
        })
def create():
    """Create a user from the JSON request body.

    Returns:
        ``(form.errors, 400)`` as JSON when the form does not validate,
        otherwise ``({'user': ..., 'access_token': ...}, 200)`` for the newly
        saved user.
    """
    form = UserCreateForm.from_json(request.get_json())
    if not form.validate():
        return jsonify(form.errors), 400

    new_user = User()
    for field_name in ('email', 'first_name', 'last_name'):
        setattr(new_user, field_name, form.data.get(field_name))
    new_user.avatar = form.data.get('avatar', None)
    new_user.password = User.make_password(form.data.get('password'))
    new_user.save()

    token = jwt.jwt_encode_callback(new_user)
    body = {
        'user': new_user.serialize(),
        'access_token': token.decode('utf-8'),
    }
    return jsonify(body), 200
def login():
    """Authenticate a user from a JSON body and return an access token.

    The body must be a JSON object with exactly two entries, keyed by the
    configured ``JWT_AUTH_USERNAME_KEY`` and ``JWT_AUTH_PASSWORD_KEY``.

    Returns:
        ``({'user': ..., 'access_token': ...}, 200)`` on success, otherwise a
        ``401`` JSON message (invalid credentials or inactive user).
    """
    invalid = {'message': 'Invalid credentials'}

    data = request.get_json()
    # A missing body or a JSON value that is not an object used to crash on
    # data.get / len(data); treat it like any other bad credential payload.
    if not isinstance(data, dict):
        return jsonify(invalid), 401

    username = data.get(current_app.config.get('JWT_AUTH_USERNAME_KEY'), None)
    password = data.get(current_app.config.get('JWT_AUTH_PASSWORD_KEY'), None)
    if not (username and password and len(data) == 2):
        return jsonify(invalid), 401

    user = jwt.authentication_callback(username, password)
    if not user:
        return jsonify(invalid), 401
    if not user.is_active:
        return jsonify({'message': 'InActive User'}), 401

    access_token = jwt.jwt_encode_callback(user)
    return jsonify({'user': user.serialize(),
                    'access_token': access_token.decode('utf-8')}), 200
def rundeck_list_groups():
    '''
    Return the list of IAM groups from all available profiles.

    The JSON object maps "(<profile>) <group name>" to the group name.
    Profiles whose ``list_groups`` call raises ``ClientError`` contribute
    no entries.
    '''
    resp_obj = {}
    awsconfig = aws_config.AwsConfig()
    profiles = awsconfig.get_profiles()
    for profile in profiles:
        session = boto3.Session(profile_name=profile)
        iamclient = session.client('iam')
        try:
            groups = iamclient.list_groups()['Groups']
        except botocore.exceptions.ClientError:
            # The original assigned into `groupinfo`, which is unbound on the
            # first profile and holds the previous profile's data afterwards.
            groups = []
        for group in groups:
            grouptext = "(%s) %s" % (profile, group['GroupName'])
            resp_obj[grouptext] = group['GroupName']
    return jsonify(resp_obj)
def rundeck_list_iam_policies():
    '''
    Return the list of IAM policies from all available profiles.

    The JSON object maps "(<profile>) <policy name>" to the policy name.
    Profiles whose ``list_policies`` call raises ``ClientError`` contribute
    no entries.
    '''
    resp_obj = {}
    awsconfig = aws_config.AwsConfig()
    profiles = awsconfig.get_profiles()
    for profile in profiles:
        session = boto3.Session(profile_name=profile)
        iamclient = session.client('iam')
        try:
            policies = iamclient.list_policies()['Policies']
        except botocore.exceptions.ClientError:
            # The original assigned into `policyinfo`, which is unbound on the
            # first profile and holds the previous profile's data afterwards.
            policies = []
        for policy in policies:
            policytext = "(%s) %s" % (profile, policy['PolicyName'])
            resp_obj[policytext] = policy['PolicyName']
    return jsonify(resp_obj)
def detailed_id():
    """Return the rating and comment history for one id as an HTML fragment.

    The id is read from the ``ili_id`` query argument. Ratings come from
    ``fetch_rate_id`` and comments from ``fetch_comment_id``; each entry is
    rendered as ``<userID> (<t>): <value> <br>``.

    Returns:
        JSON ``{"result": "<td ...>...</td>"}``.
    """
    # Local stdlib import keeps this block self-contained.
    from html import escape as html_escape

    ili_id = request.args.get('ili_id', None)
    rate_hist = fetch_rate_id([ili_id])
    comm_hist = fetch_comment_id([ili_id])
    users = fetch_allusers()
    key = int(ili_id)

    def render_history(entries):
        # Values are escaped because comments and user ids are user-supplied
        # text that would otherwise be injected into the page verbatim.
        # NOTE(review): confirm stored values are not already HTML-escaped.
        return "".join(
            '{} ({}): {} <br>'.format(
                html_escape(str(users[u]['userID'])),
                html_escape(str(t)),
                html_escape(str(value)),
            )
            for value, u, t in entries
        )

    r_html = render_history(rate_hist[key])
    c_html = render_history(comm_hist[key])

    fragment = (
        ' <td colspan="9">'
        ' <div style="width: 49%; float:left;"> <h6>Ratings</h6> {}</div>'
        ' <div style="width: 49%; float:right;"> <h6>Comments</h6> {}</div>'
        ' </td>'
    ).format(r_html, c_html)
    return jsonify(result=fragment)
def min_omw_concepts(ss=None, ili_id=None):
    """Render ``min_omw_concept.html`` and return the markup as JSON.

    When ``ili_id`` is truthy the ids are looked up with
    ``f_ss_id_by_ili_id``; otherwise the single ``ss`` value is used.
    """
    wanted_ids = f_ss_id_by_ili_id(ili_id) if ili_id else [ss]

    pos = fetch_pos()
    langs_id, langs_code = fetch_langs()
    synsets, senses, defs, exes, links = fetch_ss_basic(wanted_ids)
    ssrels = fetch_ssrel()

    context = {
        'pos': pos,
        'langs': langs_id,
        'senses': senses,
        'ss': synsets,
        'links': links,
        'ssrels': ssrels,
        'defs': defs,
        'exes': exes,
    }
    return jsonify(result=render_template('min_omw_concept.html', **context))
def network(interface=None):
    """Return address data for one interface or for all of them as JSON.

    When ``interface`` is given and ``netifaces.ifaddresses`` raises
    ``ValueError`` for it, a 404 JSON message is returned. Interfaces without
    an entry under key ``2`` get an "AF_INET data missing" message instead.
    """
    if not interface:
        names = netifaces.interfaces()
    else:
        try:
            netifaces.ifaddresses(interface)
        except ValueError:
            body = {"message": "interface {} not available".format(interface)}
            return jsonify(body), 404
        names = [interface]

    result = {}
    for name in names:
        try:
            # Key 2 is presumably AF_INET, judging by the fallback message.
            result[name] = netifaces.ifaddresses(name)[2]
        except KeyError:
            result[name] = {"message": "AF_INET data missing"}
    return jsonify(result)
def run_selected_case():
    """Run the test cases named in the JSON body and return the HTML report.

    The cases are fetched with ``get_case_by_name(data['case_name'])``, run
    with ``app.config['THREAD_NUM']`` as second argument, and the outcome is
    rendered through ``testResult.html``.

    Returns:
        JSON ``{"html": <rendered report>}``.
    """
    payload = request.get_json()
    started_at = handle.now_str()

    # Look the case(s) up by name (original comment was garbled in transit).
    store = MongoClient()
    cases = store.get_case_by_name(payload.get('case_name'))

    runner = apiFunc()
    outcome = runner.run_tests(cases, app.config['THREAD_NUM'])
    report_file = runner.writeReport(outcome)

    context = {
        'failNum': outcome['failed_num'],
        'ignored': outcome['ignored'],
        'successNum': outcome['success_num'],
        'total': outcome['total'],
        'start': started_at,
        'end': handle.now_str(),
        'cost': "{:.2}?".format(handle.delta(started_at, handle.now_str())),
        'fileName': report_file,
    }
    rendered = render_template('testResult.html', **context)
    return jsonify(dict(html=rendered))
def taskstatus(task_id):
    """Return the progress of the ``analyzetweets`` task ``task_id`` as JSON.

    ``current`` is 0 while the task is PENDING and 1 when it is in FAILURE;
    in every other state it comes from ``task.info`` together with the
    optional ``subjectivityavg`` / ``sentimentavg`` values.
    """
    task = analyzetweets.AsyncResult(str(task_id))

    if task.state == "PENDING":
        return jsonify({
            "state": task.state,
            "current": 0,
            "total": NUMBER_OF_TWEETS,
        })

    if task.state == "FAILURE":
        return jsonify({
            "state": task.state,
            "current": 1,
            "total": NUMBER_OF_TWEETS,
        })

    progress = {
        "state": task.state,
        "current": task.info.get("current", 0),
        "total": NUMBER_OF_TWEETS,
    }
    # Copy the optional averages only once the task has published them.
    for optional_key in ("subjectivityavg", "sentimentavg"):
        if optional_key in task.info:
            progress[optional_key] = task.info[optional_key]
    return jsonify(progress)
def login():
    """Login POST handler.

    Only runs when ``/login`` is hit with a POST method. There is no GET
    method equivalent, as it is handled by the navigation template.

    Reads ``signin_username`` and ``signin_password`` from the form. On
    success the id returned by ``fe.set_login_id()`` is stored in
    ``session['logged_in']``.

    Returns:
        JSON ``{"login": "success"}``, or ``{"login": "failed"}`` with status
        code ``401`` on login failure.
    """
    log.debug("Entering login, attempting to authenticate user.")
    username = request.form['signin_username']
    password = request.form['signin_password']
    log.debug("Username: {0}".format(username))

    if fe.check_auth(username, password):
        log.debug("User authenticated. Trying to set session.")
        session_id = fe.set_login_id()
        session['logged_in'] = session_id
        log.debug("Session ID: {0}, returning to user".format(session_id))
        return jsonify({"login": "success"})

    log.debug("Username or password not recognized, sending 401.")
    # The original set `response.status = 401` on a name Flask does not
    # provide; returning the status with the body gives the documented 401.
    return jsonify({"login": "failed"}), 401
def star():
    """Starring/Highlighting handler.

    Attempts to toggle a star/highlight on a particular show. The show ID
    must be passed in the ``id`` query string. If the user is unauthenticated,
    the function is aborted with a ``404`` message to hide the page.

    Returns:
        JSON formatted output describing success and the ID of the show
        starred.
    """
    log.debug("Entering star, trying to toggle star.")
    # session.get avoids a KeyError for visitors without a session entry, so
    # they receive the documented 404 instead of an unhandled error.
    login_id = session.get('logged_in')
    if login_id is not None and fe.check_login_id(escape(login_id)):
        show_id = request.args['id']
        log.debug("Sending show ID {0} to function".format(show_id))
        fe.star_show(show_id)
        log.debug("Returning to user.")
        return jsonify({"star": "success", "id": show_id})

    log.debug("User cannot be authenticated, send 404 to hide page.")
    abort(404)
def scan_scrapers():
    """On demand scraper scanning handler.

    For some reason the scheduler doesn't always work, this endpoint allows
    for instant scanning, assuming it's not already occurring. The function
    is aborted with a ``404`` message to hide the page if the user is not
    authenticated.

    Scanning can take a long time - 20 to 30 minutes - so it's recommended
    this endpoint be called asynchronously.

    Returns:
        JSON formatted output to identify that scanning has completed or is
        already ongoing.
    """
    log.debug("Entering scan_scrapers.")
    # session.get avoids a KeyError for visitors without a session entry, so
    # they receive the documented 404 instead of an unhandled error.
    login_id = session.get('logged_in')
    if login_id is not None and fe.check_login_id(escape(login_id)):
        log.debug("User is logged in, attempting to begin scan.")
        if not fe.scrape_shows():
            log.debug("scrape_shows returned false, either the lockfile exists incorrectly or scraping is ongoing.")
            return jsonify({"scan": "failure", "reason": "A scan is ongoing"})
        log.debug("scrape_shows just returned. Returning success.")
        return jsonify({"scan": "success"})

    log.debug("User cannot be authenticated, send 404 to hide page.")
    abort(404)
def get_role_credentials(api_version, requested_role, junk=None):
    """Return assumed-role credentials for the caller's IP as JSON.

    Returns an empty body with status 403 on ``roles.UnexpectedRoleError``.
    On ``roles.GetRoleError`` the body is empty and the status is taken from
    ``e.args[0][0]``. ``junk`` is accepted but not used.
    """
    try:
        params_for_role = roles.get_role_params_from_ip(
            request.remote_addr,
            requested_role=requested_role,
        )
    except roles.UnexpectedRoleError:
        return '', 403

    try:
        credentials = roles.get_assumed_role_credentials(
            role_params=params_for_role,
            api_version=api_version,
        )
    except roles.GetRoleError as err:
        status_code = err.args[0][0]
        return '', status_code
    else:
        return jsonify(credentials)
def get_instance_identity_document(api_version):
    """Return a mocked instance identity document as JSON.

    Every field is a fixed value except ``instanceId``, which is built from
    ``app.config['MOCKED_INSTANCE_ID']``, and ``pendingTime``, which is the
    current UTC time. ``api_version`` is accepted but not used.
    """
    utc_now = datetime.datetime.now(dateutil.tz.tzutc())

    document = {
        'privateIp': '127.255.0.1',
        'devpayProductCodes': None,
        'availabilityZone': 'us-east-1a',
        'version': '2010-08-31',
        'accountId': '1234',
        'billingProducts': None,
        'instanceType': 't2.medium',
        'imageId': 'ami-1234',
        'kernelId': None,
        'ramdiskId': None,
        'architecture': 'x86_64',
        'region': 'us-east-1',
    }
    document['instanceId'] = 'i-{0}'.format(app.config['MOCKED_INSTANCE_ID'])
    # This may be a terrible mock for this...
    document['pendingTime'] = utc_now.strftime("%Y-%m-%dT%H:%M:%SZ")
    return jsonify(document)
def iam_sts_credentials(api_version, requested_role, junk=None):
    """Return assumed-role credentials for the caller's IP as JSON.

    Requests for an API version rejected by ``_supports_iam`` are handed to
    ``passthrough``. A ``roles.UnexpectedRoleError`` is logged and answered
    with an empty 404. ``junk`` is accepted but not used.
    """
    if not _supports_iam(api_version):
        return passthrough(request.path)

    try:
        params_for_role = roles.get_role_params_from_ip(
            request.remote_addr,
            requested_role=requested_role,
        )
    except roles.UnexpectedRoleError:
        template = "Role name {0} doesn't match expected role for container"
        log.error(template.format(requested_role))
        return '', 404

    log.debug('Providing assumed role credentials for {0}'.format(params_for_role['name']))
    credentials = roles.get_assumed_role_credentials(
        role_params=params_for_role,
        api_version=api_version,
    )
    return jsonify(credentials)
def post(self, params=None):
    """Create and store a new combine session from ``params``.

    Builds a ``SharedSessionMaster`` from ``client_alias``, a
    ``SharedSessionSecret`` from the ``session_policies`` entry and a
    ``CombineSession`` tying them together, then stores the session.

    Returns:
        JSON with the session's ``to_api`` view (authorised as the new master
        user) and its id as a string.
    """
    master = SharedSessionMaster.new(alias=params['client_alias'])

    policies = params['session_policies']
    secret = SharedSessionSecret.new(
        shares=policies['shares'],
        quorum=policies['quorum'],
        protocol=policies['protocol'],
    )

    new_session = CombineSession.new(
        session_id=params.get('session_id'),
        master=master,
        alias=params['session_alias'],
        session_type=params['session_type'],
        secret=secret,
    )
    stored = new_session.store()

    body = {
        "session": stored.to_api(auth=master.uuid),
        "session_id": str(stored.uuid),
    }
    return flask.jsonify(body)
def put(self, session_id, params=None):
    """Join or re-enter a combine session and optionally submit a share.

    Raises:
        exceptions.ObjectNotFoundException: no session has ``session_id``.
        exceptions.ObjectDeniedException: no user could be resolved or joined.

    Returns:
        JSON with the session's ``to_api`` view for the user and its id.
    """
    session = CombineSession.get(session_id)
    if not session:
        raise exceptions.ObjectNotFoundException

    # Same semantics as the original `a and b or c` expression: a falsy
    # lookup result falls through to join().
    # NOTE(review): this means a bad `auth` still joins — confirm intended.
    member = None
    if params.get('auth'):
        member = session.get_user(params['auth'], alias=str(params['client_alias']))
    if not member:
        member = session.join(params['client_alias'])
    if not member:
        raise exceptions.ObjectDeniedException

    member.session = session
    if params.get('share'):
        session.secret.add_share(Share(params['share'], str(member.uuid)))
    session.update()

    body = {
        "session": session.to_api(auth=str(member.uuid)),
        "session_id": str(session.uuid),
    }
    return flask.jsonify(body)
# GET handler for a split session.
# Looks the session up with SplitSession.get(session_id, auth=params['auth']);
# raises exceptions.ObjectNotFoundException when nothing is returned and
# exceptions.ObjectExpiredException when session.ttl is falsy. When the current
# user is a shareholder, the secret is already split and the user has no share
# yet, the user is attached to a share. Returns JSON with the session's to_api
# view and its id as a string.
# NOTE(review): this line arrived with its line breaks and indentation
# collapsed, so it cannot be determined here whether session.update() belongs
# inside the preceding `if` — confirm against upstream before reformatting.
# NOTE(review): the attach call goes through session.current_user.secret while
# the checks use session.secret — presumably the same object; verify.
def get(self, session_id, params=None): session = SplitSession.get(session_id, auth=params['auth']) if not session: raise exceptions.ObjectNotFoundException if not session.ttl: raise exceptions.ObjectExpiredException if session.current_user.is_shareholder and session.secret.splitted and not \ session.secret.user_have_share(session.current_user): session.current_user.secret.attach_user_to_share(session.current_user) session.update() return flask.jsonify( { "session": session.to_api(auth=params['auth']), "session_id": str(session.uuid) } )
def upload():
    """Store uploaded video files and start processing them in a thread.

    Files are read from the ``vidfiles`` form field and saved, with
    lower-cased names, under ``./videos/<groupid>``. ``groupid`` comes from
    the query string when present; otherwise one is generated from the
    current time plus a random number.

    Returns:
        JSON with ``groupid`` and ``time`` (seconds spent saving the files),
        or a 400 JSON error when the supplied ``groupid`` is not a plain
        directory name.
    """
    starttime = time.time()

    groupid = request.args.get('groupid')
    if groupid:
        # groupid becomes a directory name: refuse anything that could point
        # outside ./videos.
        if groupid in ('.', '..') or os.path.basename(groupid) != groupid:
            return jsonify(error='invalid groupid'), 400
    else:
        groupid = (int(time.time()) * 1000) + random.randint(0, 999)

    savelocation = './videos/{0}'.format(groupid)
    if not os.path.exists(savelocation):
        os.makedirs(savelocation)

    for upload_file in request.files.getlist('vidfiles'):
        # Keep only the final path component of the client-supplied name.
        safe_name = os.path.basename(upload_file.filename.lower())
        if safe_name in ('', '.', '..'):
            continue
        upload_file.save(os.path.join(savelocation, safe_name))

    totaltime = time.time() - starttime

    thread = threading.Thread(target=video_processing, args=(str(groupid),))
    thread.start()
    return jsonify(groupid=groupid, time=totaltime)
def get(self, testcase_name):  # pylint: disable=no-self-use
    """GET the info of one testcase.

    Returns the ``api_utils.result_handler`` error result when
    ``Testcase().show`` finds nothing; otherwise JSON holding the testcase
    name, its remaining attributes and its ``dependency`` as a dict.
    """
    found = Testcase().show(testcase_name)
    if not found:
        message = ("The test case '%s' does not exist or is not supported"
                   % testcase_name)
        return api_utils.result_handler(status=1, data=message)

    info = api_utils.change_obj_to_dict(found)
    dependency = api_utils.change_obj_to_dict(info.get('dependency'))
    for replaced_key in ('name', 'dependency'):
        info.pop(replaced_key)

    payload = {'testcase': testcase_name}
    payload.update(info)
    payload['dependency'] = dependency
    return jsonify(payload)
def post(self):
    """Return one page (five entries) of the current user's messages.

    The page number is read from the ``page`` form field. Messages addressed
    to ``session['user_id']`` are listed newest first.

    Returns:
        JSON with ``status``; on success also ``data`` (the messages),
        ``len``, ``now`` (the requested page) and ``all`` (total pages).
    """
    page_size = 5

    check_user = check_login()
    if check_user is None:
        return jsonify({'status': 'error', 'msg': 'no authority'})
    if check_user == -1:
        return jsonify({'status': 'error', 'msg': 'user is valid'})

    user_id = session.get('user_id')
    inbox = db.session.query(Message).filter(Message.to_id == user_id)
    counts = inbox.count()

    # type=int falls back to the default instead of raising on junk input.
    page = request.form.get('page', 0, type=int)
    # Pages 0 and 1 both map to offset 0. The original produced offset -5 for
    # the default page 0, i.e. a negative slice on the query.
    offset = max(page - 1, 0) * page_size
    msgs = inbox.order_by(Message.id.desc())[offset: offset + page_size]

    # Ceiling division: a partially filled last page still counts.
    all_page = (counts + page_size - 1) // page_size
    return jsonify({'status': 'ok',
                    'data': [msg.to_json() for msg in msgs],
                    'len': len(msgs),
                    'now': page,
                    'all': all_page})
def reset_api_key(name):
    """
    Reset API-KEY for user.

    On POST the user ``name`` gets a freshly generated key (404 when no such
    user exists), the cached user summary is dropped, a success message is
    flashed and the client is redirected to the profile page. Any other
    method returns JSON carrying a new CSRF token.

    """
    if request.method != 'POST':
        return jsonify(dict(form=dict(csrf=generate_csrf())))

    user = user_repo.get_by_name(name)
    if not user:
        return abort(404)

    ensure_authorized_to('update', user)
    user.api_key = model.make_uuid()
    user_repo.update(user)
    cached_users.delete_user_summary(user.name)

    flash(gettext('New API-KEY generated'), 'success')
    profile_url = url_for('account.profile', name=name)
    return redirect_content_type(profile_url)
def run_cmd():
    """Execute general (non-host specific) command.

    The command name is read from the ``command`` query argument and its
    arguments from the repeated ``args`` query argument. Aborts with 400 when
    the command is missing or listed in ``EXCLUDED_FUNCTIONS``.

    Returns:
        The plugin's result as JSON, with status 200 when its ``code`` is
        ``ReturnCodes.OK`` and 404 otherwise.
    """
    command = request.args.get('command')
    if not command or command in EXCLUDED_FUNCTIONS:
        abort(400)

    try:
        # Every argument is passed to the plugin as a string.
        call_args = [str(value) for value in request.args.getlist('args')]
    except KeyError:
        call_args = []

    outcome = RSPET_API.call_plugin(command, call_args)
    succeeded = outcome['code'] == rspet_server.ReturnCodes.OK
    return make_response(jsonify(outcome), 200 if succeeded else 404)
def general_help():
    """Return general help.

    Entries named in ``EXCLUDED_FUNCTIONS`` are removed from the help table,
    and only commands whose ``states`` contain the ``state`` query argument
    are listed.
    """
    help_table = RSPET_API.help()
    current_state = request.args.get('state')

    # Remove excluded functions.
    for excluded in EXCLUDED_FUNCTIONS:
        help_table.pop(excluded, None)

    # Keep only the commands that are in scope for the current state.
    in_scope = {
        name: entry
        for name, entry in help_table.items()
        if current_state in entry['states']
    }
    public = [make_public_help(name, help_table[name]) for name in in_scope]
    return jsonify({'commands': public})
def index():
    """Handle a landing request that may carry a ``code`` query argument.

    Returns ``"logged_in"`` when ``g.signin`` is truthy. With a ``code`` the
    access token obtained from ``Get_Access_Token`` is used to fetch the user
    data, which is returned as JSON with a ``logged_in`` cookie. Otherwise
    the client is redirected to ``login``.
    """
    auth_code = request.args.get("code", "")

    if g.signin:
        return "logged_in"
    if not auth_code:
        return redirect(url_for("login"))

    token_payload = Get_Access_Token(auth_code)
    user_info = Get_User_Info(token_payload['access_token'])
    app.logger.debug(user_info)

    reply = jsonify(user_info)
    reply.set_cookie(key="logged_in", value='true', expires=None)
    return reply
def index():
    """Handle a landing request that may carry a ``code`` query argument.

    Returns ``"logged_in"`` when ``g.signin`` is truthy. With a ``code`` the
    ``access_token`` and ``uid`` obtained from ``Get_Access_Token`` are used
    to fetch the user data, which is returned as JSON with a ``logged_in``
    cookie. Otherwise the client is redirected to ``login``.
    """
    auth_code = request.args.get("code", "")

    if g.signin:
        return "logged_in"
    if not auth_code:
        return redirect(url_for("login"))

    token_payload = Get_Access_Token(auth_code)
    app.logger.debug(token_payload)
    user_info = Get_User_Info(token_payload['access_token'],
                              token_payload['uid'])
    app.logger.debug(user_info)

    reply = jsonify(user_info)
    reply.set_cookie(key="logged_in", value='true', expires=None)
    return reply
def index():
    """Handle a landing request that may carry a ``code`` query argument.

    Returns ``"logged_in"`` when ``g.signin`` is truthy. With a ``code`` the
    access token from ``Get_Access_Token`` and the ``openid`` from
    ``Get_OpenID`` are used to fetch the user data, which is returned as JSON
    with a ``logged_in`` cookie. Otherwise the client is redirected to
    ``login``.
    """
    auth_code = request.args.get("code", "")

    if g.signin:
        return "logged_in"
    if not auth_code:
        return redirect(url_for("login"))

    token = Get_Access_Token(auth_code)['access_token']
    open_id = Get_OpenID(token)['openid']
    user_info = Get_User_Info(token, open_id)
    app.logger.debug(user_info)

    reply = jsonify(user_info)
    reply.set_cookie(key="logged_in", value='true', expires=None)
    return reply
def upload_music():
    """Insert a music record built from the submitted form fields.

    Returns:
        ``({"error": 0, "msg": "OK"}, 200)`` when ``db.music_insert`` returns
        0, ``({"error": 21, ...}, 409)`` when it returns 1, and a generic 500
        JSON error for any other result.
    """
    form = request.form
    name = form.get('name')
    author = form.get('author')
    # NOTE(review): int() raises when authorID is missing or not numeric.
    authorID = int(form.get('authorID'))
    date = form.get('date')
    music_name = form.get('musicName')
    img_name = form.get('imgName')
    info = form.get('musicInfo')
    author_avatar = form.get('authorAvatarName')

    upload_result = db.music_insert(name, author, authorID, author_avatar,
                                    date, music_name, img_name, info)
    if upload_result == 0:
        return jsonify({"error": 0, "msg": "OK"}), 200
    if upload_result == 1:
        return jsonify({"error": 21, "msg": "Exist the same name"}), 409

    # The original fell off the end here and returned None, which Flask
    # rejects; answer with an explicit error instead.
    return jsonify({"error": -1, "msg": "Unknown error"}), 500
def predict():
    """Predict a Pokemon name from the uploaded ``file``.

    Flashes a message and redirects back to the request URL when no file
    part or no filename was sent.

    Returns:
        JSON with ``name``, ``description`` and ``msg``. ``msg`` carries the
        error text (and the other fields are ``None``) when prediction
        raises or the file is not accepted by ``allowed_file``.
    """
    # The leftover ipdb.set_trace() breakpoint was removed: it blocks every
    # request while waiting for a debugger.
    if 'file' not in request.files:
        flash('No file part')
        return redirect(request.url)

    upload = request.files['file']
    if upload.filename == '':
        flash('No selected file')
        return redirect(request.url)

    if upload and allowed_file(upload.filename):
        try:
            pokemon_name = predict_mlp(upload).capitalize()
            pokemon_desc = pokemon_entries.get(pokemon_name)
            msg = ""
        except Exception as e:
            # Any prediction failure is reported to the client in `msg`.
            pokemon_name = None
            pokemon_desc = None
            msg = str(e)
        return jsonify({'name': pokemon_name, 'description': pokemon_desc,
                        "msg": msg})

    # Previously a rejected file produced no valid response at all.
    return jsonify({'name': None, 'description': None,
                    "msg": "File type not allowed"}), 400
def get_concepts():
    """Search concepts by keyword and return the parsed results as JSON.

    Reads ``keywords``, ``semanticGroups``, ``pageSize`` (default 1) and
    ``pageNumber`` (default 1) from the query string, validates them, runs a
    ``GolrSearchQuery`` and returns ``parse_concept`` of each document.
    """
    query_args = request.args
    keywords = query_args.get('keywords', None)
    semantic_groups = query_args.get('semanticGroups', None)
    page_size = int(query_args.get('pageSize', 1))
    page_number = int(query_args.get('pageNumber', 1))

    validatePagination(page_size, page_number)
    validateKeywords(keywords)

    search = GolrSearchQuery(
        term=keywords,
        category=build_categories(semantic_groups),
        rows=page_size,
        start=getStartIndex(page_number, page_size),
    )
    found = search.exec()
    return jsonify([parse_concept(doc) for doc in found['docs']])
def get_types():
    """Return the frequency of every semantic group as a JSON list.

    Each key of ``semantic_mapping`` starts at zero; the ``subject_category``
    and ``object_category`` facet counts of a ``GolrAssociationQuery`` are
    added after mapping each facet key through ``monarch_to_UMLS``.
    """
    totals = dict.fromkeys(semantic_mapping, 0)

    query = GolrAssociationQuery(
        rows=0,
        facet_fields=['subject_category', 'object_category'],
    )
    facet_counts = query.exec()['facet_counts']
    subject_counts = facet_counts['subject_category']
    object_counts = facet_counts['object_category']

    for counts in (subject_counts, object_counts):
        for category in counts:
            totals[monarch_to_UMLS(category)] += counts[category]

    entries = [
        {'id': group, 'idmap': None, 'frequency': count}
        for group, count in totals.items()
    ]
    return jsonify(entries)
def get_predicates():
    """
    Return the list of predicates; currently always an empty JSON list.

    Not yet implemented: it is unclear how best to get at all the predicates
    and tag them as relations with ids.
    """
    # Draft kept from the original for reference (never executed):
    #
    #   results = GolrAssociationQuery(
    #       rows=0,
    #       facet_fields=['relation']
    #   ).exec()
    #   facet_counts = results['facet_counts']
    #   relations = facet_counts['relation']
    #   return jsonify([{'id' : "biolink:"+c, 'name' : c, 'definition' : None}
    #                   for key in relations])
    no_predicates = []
    return jsonify(no_predicates)
def make_json_app(import_name, **kwargs):
    """
    Creates a JSON-oriented Flask app.

    All error responses that you don't specifically manage yourself will have
    application/json content type, and will contain JSON like this (just an
    example):

    { "message": "405: Method Not Allowed" }

    Args:
        import_name: passed to ``Flask`` unchanged.
        **kwargs: passed to ``Flask`` unchanged.

    Returns:
        The ``Flask`` app with a JSON error handler registered for every code
        in ``default_exceptions``.
    """
    def make_json_error(ex):
        response = jsonify(message=str(ex))
        # Non-HTTP exceptions carry no status code; report them as 500.
        response.status_code = ex.code if isinstance(ex, HTTPException) else 500
        return response

    app = Flask(import_name, **kwargs)

    # Iterating the mapping directly works on Python 2 and 3;
    # dict.iterkeys() no longer exists on Python 3.
    for code in default_exceptions:
        app.register_error_handler(code, make_json_error)

    return app
def messages(room_id):
    """Return a room and its latest 100 messages as JSON.

    Answers 400 "Unknown Room" when the current user (``g.user``) has no
    ``RoomMember`` row for ``room_id``. Every member's user entry gets a
    ``username`` composed of first and last name.
    """
    membership = (RoomMember.where('room_id', room_id)
                  .where('user_id', g.user['id'])
                  .first())
    if not membership:
        return jsonify({'message': "Unknown Room"}), 400

    room = Room.where('id', room_id).with_('members.user').first().serialize()
    for member in room['members']:
        person = member['user']
        person['username'] = '%s %s' % (person['first_name'], person['last_name'])

    history = (
        Message.select('messages.*', 'u.first_name', 'u.last_name', 'u.avatar')
        .where('room_id', room_id)
        .join('users as u', 'u.id', '=', 'messages.sender_id')
        .order_by('created_at', 'desc')
        .limit(100)
        .get()
    )
    return jsonify({'room': room, 'messages': history.serialize()}), 200
def s_connect():
    """Register a newly connected socket user.

    Stores the JWT user in ``g.user`` (answering an empty 403 when there is
    none), joins the ``room-<id>`` room of every room the user belongs to,
    records the connection in ``connected_users`` and emits ``user_connect``.
    """
    g.user = get_jwt_user()
    if not g.user:
        return jsonify({}), 403

    # get user rooms
    memberships = (RoomMember.select('room_id')
                   .where('user_id', g.user['id'])
                   .group_by('room_id')
                   .get())
    for membership in memberships:
        join_room('room-%s' % membership.room_id)

    connection = {'id': g.user['id'], 'sid': request.sid}
    connected_users.append(connection)
    socketio.emit('user_connect', {'id': g.user['id']})
def do_summary(self, *args, **kwargs):
    """Return ``self.df.describe()`` converted to a dict as a JSON response.

    Positional and keyword arguments are accepted but not used.
    """
    summary = self.df.describe()
    return flask.jsonify(summary.to_dict())
def _process_json(self, df, **kwargs):
    """Slice ``df`` by the requested time range and return it as JSON.

    Reads ``time_frame``, ``from_time``, ``to_time``, ``columns_data`` and
    ``orient`` from ``kwargs``. A ``time_frame`` other than ``'custom'``
    replaces the explicit bounds with ``self._time_frame(time_frame)``. When
    ``columns_data`` is truthy the result of ``self.columns_data(df)`` is
    returned instead of the ``to_json`` serialisation.
    """
    # Falsy values are normalised to None.
    time_frame = kwargs['time_frame'] or None
    lower = kwargs['from_time'] or None
    upper = kwargs['to_time'] or None

    if time_frame is not None and time_frame != 'custom':
        lower, upper = self._time_frame(time_frame)

    if lower or upper:
        df = df[lower:upper]

    if kwargs['columns_data']:
        return flask.jsonify(self.columns_data(df))

    serialised = df.to_json(
        date_format='iso',
        orient=kwargs['orient'],
        double_precision=2,
        date_unit='s',
    )
    return flask.jsonify(json.loads(serialised))
def do_describe(self, *args, **kwargs):
    """Return a JSON description of this object.

    The response holds the data file, last update, number of samples, the
    auto-refresh setting, the status and the available commands. Commands
    are the ``do_*`` attributes with the prefix removed and underscores
    turned into dashes.
    """
    prefix = 'do_'
    # startswith + slicing only picks up real do_* attributes; the previous
    # substring test also matched names that merely contain "do_".
    commands = [
        attr[len(prefix):].replace('_', '-')
        for attr in dir(self)
        if attr.startswith(prefix)
    ]
    ret = {
        'csv_file': self.handler.data_file,
        'last_update': self.last_update,
        'samples': len(self.df),
        'auto_refresh': self.auto_refresh,
        'commands': commands,
        'status': self._status(),
    }
    return flask.jsonify(ret)
def do_describe(self, *args, **kwargs):
    """Return a JSON description of this object.

    The response holds the data file, start time, last update, the status
    and the available commands. Commands are the ``do_*`` attributes with
    the prefix removed and underscores turned into dashes.
    """
    prefix = 'do_'
    # startswith + slicing only picks up real do_* attributes; the previous
    # substring test also matched names that merely contain "do_".
    commands = [
        attr[len(prefix):].replace('_', '-')
        for attr in dir(self)
        if attr.startswith(prefix)
    ]
    ret = {
        'csv_file': self.handler.data_file,
        'started': self.started,
        'last_update': self.last_update,
        'commands': commands,
        'status': self._status(),
    }
    return flask.jsonify(ret)
def index():
    '''
    Default path to orca server. List all the services available.

    Returns JSON of the form {"services": ["s3", "iam", "ec2"]}.
    '''
    available = ['s3', 'iam', 'ec2']
    return jsonify({'services': available})
def rundeck_list_resources():
    '''
    Return a list of S3 and EC2 Resources from all available profiles.

    The JSON object maps "s3: (<profile>) <bucket>" to the bucket name and
    "ec2: (<profile>) <instance id>" to the instance id. A profile whose
    listing call raises ``ClientError`` contributes no entries for that
    service.
    '''
    resp_obj = {}
    awsconfig = aws_config.AwsConfig()
    profiles = awsconfig.get_profiles()

    # Populate s3 buckets.
    for profile in profiles:
        session = boto3.Session(profile_name=profile)
        s3client = session.client('s3')
        try:
            buckets = s3client.list_buckets()['Buckets']
        except botocore.exceptions.ClientError:
            # The original assigned into `s3info`, which is unbound on the
            # first profile and stale afterwards.
            buckets = []
        for bucket in buckets:
            bucket_text = "s3: (%s) %s" % (profile, bucket['Name'])
            resp_obj[bucket_text] = bucket['Name']

    # Populate ec2 instances.
    for profile in profiles:
        session = boto3.Session(profile_name=profile)
        ec2client = session.client('ec2', region_name="us-east-1")
        try:
            reservations = ec2client.describe_instances()['Reservations']
        except botocore.exceptions.ClientError:
            # The original fallback set an 'Instances' key although the loop
            # below reads 'Reservations'.
            reservations = []
        for reservation in reservations:
            for instance in reservation['Instances']:
                instance_text = "ec2: (%s) %s" % \
                    (profile, instance['InstanceId'])
                resp_obj[instance_text] = instance['InstanceId']

    return jsonify(resp_obj)
def list_groups(profile):
    '''
    Return all the IAM groups of one profile.

    The JSON response has ``status`` set to 'FAIL' when ``profile`` is not
    among the configured profiles. Otherwise ``status`` is 'OK' and
    ``groups`` lists the group names (empty when ``list_groups`` raises
    ``ClientError``).
    '''
    resp_obj = {'status': 'OK'}
    awsconfig = aws_config.AwsConfig()
    profiles = awsconfig.get_profiles()

    if profile not in profiles:
        resp_obj['status'] = 'FAIL'
        return jsonify(resp_obj)

    session = boto3.Session(profile_name=profile)
    iamclient = session.client('iam')
    try:
        group_entries = iamclient.list_groups()['Groups']
    except botocore.exceptions.ClientError:
        # The original assigned into `groupinfo`, which is unbound when the
        # call fails.
        group_entries = []

    resp_obj['groups'] = [group['GroupName'] for group in group_entries]
    return jsonify(resp_obj)
def custResponse(code=404, message="Error: Not Found", data=None):
    """Build a JSON response carrying ``status`` and ``message``.

    Every entry of ``data`` (when truthy) is copied into the body, and
    ``code`` is also used as the HTTP status code of the response.
    """
    body = {"status": code, "message": message}
    if data:
        body.update(data.items())

    resp = jsonify(body)
    resp.status_code = code
    return resp
def leaderboard():
    """Return the leaderboard as JSON or as the rendered HTML page.

    When ``request_wants_json()`` is truthy the leaders are generated with
    ``freeze_leader`` as factory and returned under ``leaders``; otherwise
    the leader generator is handed to ``leaderboard.html``.
    """
    if not request_wants_json():
        return render_template(
            'leaderboard.html',
            leader_gen=users.generate_leaders(),
        )

    frozen_leaders = users.generate_leaders(leader_factory=freeze_leader)
    return jsonify({'leaders': list(frozen_leaders)})