Python flask.json 模块,dumps() 实例源码

我们从Python开源项目中,提取了以下45个代码示例,用于说明如何使用flask.json.dumps()

项目:autolipsync    作者:evgenijkatunov    | 项目源码 | 文件源码
def post_phonemes():
    """Recognise phonemes in an uploaded WAV file and return them as JSON.

    Expects a multipart upload with the audio under the ``wave`` field and
    aborts with HTTP 400 when that field is missing.

    Returns:
        str: JSON document of the form ``{"phonemes": <phoneme map>}``.
    """
    import os

    print(request.files)
    if 'wave' not in request.files:
        abort(400)
    upload = request.files['wave']

    # mkstemp creates the file atomically and leaves it on disk.  The former
    # NamedTemporaryFile was deleted by close(), so its name was reused after
    # the file had vanished (race-prone) and the re-created upload was never
    # cleaned up.
    fd, tmp_filename = tempfile.mkstemp(dir=UPLOAD_FOLDER)
    os.close(fd)
    try:
        upload.save(tmp_filename)
        wave = Wave()
        wave.load(tmp_filename)
        recogn = PhonemeRecognition()
        recogn.load('model_en_full')
        recogn.predict(wave)
        result = {'phonemes': wave.get_phoneme_map()}
    finally:
        # Always drop the uploaded copy, even when recognition fails.
        os.remove(tmp_filename)
    print(result)
    return json.dumps(result)
项目:flask-restler    作者:klen    | 项目源码 | 文件源码
def run(self, Resource, path=None, query_string=None, kwargs=None, **rkwargs):
        """Run given resource manually.

        See werkzeug.test.EnvironBuilder for rkwargs.
        """
        resource = Resource(self, raw=True)

        if isinstance(query_string, dict):
            if 'where' in query_string:
                query_string['where'] = json.dumps(query_string['where'])
            query_string = urlencode(query_string)

        rkwargs['query_string'] = query_string
        args = []
        if path:
            args.append(path)
        ctx = self.app.test_request_context(*args, **rkwargs)

        with ctx:
            kwargs = kwargs or {}
            return resource.dispatch_request(**kwargs)
项目:horse    作者:pragmaticcoders    | 项目源码 | 文件源码
def test_user_can_follow_another_user(client, users_repo):
    """After Eve follows Adam, Adam is listed among Eve's followed users."""
    eve, adam = User(name='Eve'), User(name='Adam')
    for user in (eve, adam):
        users_repo.store(user)

    follow_url = '/users/{}/follow'.format(eve.pk)
    payload = json.dumps({'pk': adam.pk})
    response = client.post(follow_url, data=payload,
                           content_type='application/json')
    assert response.status_code == 200

    response = client.get('/users/{}'.format(eve.pk))
    assert response.status_code == 200

    profile = json.loads(response.data)
    names = [entry['name'] for entry in profile['followed_users']]

    assert 'Adam' in names
项目:cobalt    作者:PressLabs    | 项目源码 | 文件源码
def create(self, data, suffix=''):
        """Store ``data`` as a new JSON-encoded entry in the data store.

        Args:
            data (dict): Values to store; serialized with ``json.dumps``.
            suffix (str): Key suffix under ``self.KEY``. When empty, the
                write is issued with ``append=True``.

        Returns:
            etcd.Result: The created object expanded
        """
        use_append = suffix == ''
        target_key = '/{}/{}'.format(self.KEY, suffix)
        payload = json.dumps(data)

        written = self.client.write(target_key, payload, append=use_append)
        return self._load_from_etcd(written)
项目:cobalt    作者:PressLabs    | 项目源码 | 文件源码
def update(self, entity):
        """Serialize ``entity.value`` to JSON and push the entity to the store.

        Args:
            entity (etcd.Result): An expanded result object; its ``value`` is
                replaced by the JSON string before the update is sent.

        Returns:
            etcd.Result: The resulting object expanded, or None if Etcd failed
        """
        serialized = json.dumps(entity.value)
        entity.value = serialized

        try:
            updated = self.client.update(entity)
        except (etcd.EtcdKeyNotFound, etcd.EtcdCompareFailed):
            # compare failed or the key is gone: report failure as None
            return None
        else:
            return self._load_from_etcd(updated)
项目:flyby    作者:Skyscanner    | 项目源码 | 文件源码
def test_server_deregister_service(dynamodb):
    """A registered service can be deleted once; later GET and DELETE give 404."""
    service_url = '/service/foo'
    registration = json.dumps({'name': 'foo', 'fqdn': 'foo.example.com'})
    app.test_client().post('/service', data=registration)

    response = app.test_client().delete(service_url)
    assert response.status_code == 200

    describe_response = app.test_client().get(service_url)
    assert describe_response.status_code == 404

    deletion_response = app.test_client().delete(service_url)
    assert deletion_response.status_code == 404
    message = deletion_response.data.decode('UTF-8')
    assert message == 'Service: foo not currently registered with Flyby.'
项目:flyby    作者:Skyscanner    | 项目源码 | 文件源码
def test_server_deregister_target_group(dynamodb):
    """Deleting a target group empties the service's list; a repeat delete is 404."""
    service_payload = json.dumps({'name': 'foo', 'fqdn': 'foo.example.com'})
    target_payload = json.dumps({
        'service_name': 'foo',
        'target_group_name': 'foo-blue',
        'weight': 50,
    })
    app.test_client().post('/service', data=service_payload)
    app.test_client().post('/target', data=target_payload)

    target_url = '/target/foo/foo-blue'
    service_url = '/service/foo'

    response = app.test_client().delete(target_url)
    assert response.status_code == 200
    service = json.loads(app.test_client().get(service_url).data)
    assert service['target_groups'] == []

    # deleting the same target group again must not change the service
    second_response = app.test_client().delete(target_url)
    assert second_response.status_code == 404
    service = json.loads(app.test_client().get(service_url).data)
    assert service['target_groups'] == []
项目:flyby    作者:Skyscanner    | 项目源码 | 文件源码
def test_server_register_backend(dynamodb):
    """Registering a backend for an existing target group echoes its host."""
    service_payload = json.dumps({'name': 'foo', 'fqdn': 'foo.example.com'})
    target_payload = json.dumps(
        {'target_group_name': 'foo-blue', 'service_name': 'foo', 'weight': 80})
    backend_payload = json.dumps({
        'host': '10.0.0.1:80',
        'service_name': 'foo',
        'target_group_name': 'foo-blue',
    })

    app.test_client().post('/service', data=service_payload)
    app.test_client().post('/target', data=target_payload)
    response = app.test_client().post('/backend', data=backend_payload)

    assert response.status_code == 200
    backend = json.loads(response.data)
    assert backend['host'] == '10.0.0.1:80'
项目:flask_ishuhui    作者:lufficc    | 项目源码 | 文件源码
def chapter(comic_id, chapter_id):
    """Render the image viewer page for one chapter of a comic.

    The image list is read from the JSON cached in ``chapter.images``; when
    nothing is cached yet it is fetched from the source URL and stored back
    on the chapter before committing the session.

    Aborts with HTTP 404 when the chapter does not belong to ``comic_id``.
    """
    comic = data.get_comic(comic_id)
    current = data.get_chapter(chapter_id)
    # Validate before dereferencing the chapter or running further queries.
    # NOTE(review): assumes data.get_chapter may return None for unknown ids;
    # confirm against its implementation.
    if current is None or current.comic_id != comic_id:
        abort(404)

    next_chapter = data.get_next_chapter(comic_id, current.chapter_number)
    prev_chapter = data.get_prev_chapter(comic_id, current.chapter_number)
    url = 'http://www.ishuhui.net/ComicBooks/ReadComicBooksToIsoV1/' + str(
        chapter_id) + '.html'

    if current.images:
        images = json.loads(current.images)
    else:
        # First visit: scrape the image list and cache it on the chapter.
        images = get_images_from_url(url)
        current.images = json.dumps(images)
        db.session.commit()
    return render_template(
        'images.html',
        comic=comic,
        chapter=current,
        next_chapter=next_chapter,
        prev_chapter=prev_chapter,
        images=images,
        url=url)
项目:guides-cms    作者:pluralsight    | 项目源码 | 文件源码
def slack_stats():
    """
    Screen-scrape slack signup app since it's dynamic with node.js and grabs
    from slack API.

    Returns a JSON response ``{"text": <stats>}`` where ``stats`` is the
    content of the page's ``<p class="status">`` element, or an empty string
    when the page did not return 200 or the element was not found.
    """

    stats = ''
    resp = requests.get(SLACK_URL)

    if resp.status_code == 200:
        # Search the decoded text: resp.content is bytes, and a str pattern
        # cannot be applied to bytes on Python 3 (TypeError).
        user_count = re.search(r'<p class="status">(.*?)</p>', resp.text)
        if user_count is not None:
            stats = user_count.group(1)

    return Response(response=json.dumps({'text': stats}), status=200,
                    mimetype='application/json')
项目:guides-cms    作者:pluralsight    | 项目源码 | 文件源码
def add_heart():
    """
    Heart the article named by the posted ``stack`` and ``title`` form fields.

    Responds with JSON ``{"count": ...}`` (status 200), or with an ``error``
    message and status 401 when no user is found.
    """

    def _json_response(payload, status):
        return Response(response=json.dumps(payload), status=status,
                        mimetype='application/json')

    user = models.find_user()
    if user is None:
        return _json_response({'error': 'Cannot heart unless logged in'}, 401)

    stack, title = request.form['stack'], request.form['title']
    count = models.add_heart(stack, title, user.login)

    return _json_response({'count': count}, 200)
项目:guides-cms    作者:pluralsight    | 项目源码 | 文件源码
def remove_heart():
    """
    Remove the heart from the article named by the posted ``stack`` and
    ``title`` form fields.

    Responds with JSON ``{"count": ...}`` (status 200), or with an ``error``
    message and status 401 when no user is found.
    """

    def _json_response(payload, status):
        return Response(response=json.dumps(payload), status=status,
                        mimetype='application/json')

    user = models.find_user()
    if user is None:
        return _json_response({'error': 'Cannot heart unless logged in'}, 401)

    stack, title = request.form['stack'], request.form['title']
    count = models.remove_heart(stack, title, user.login)

    return _json_response({'count': count}, 200)
项目:ostip    作者:kx499    | 项目源码 | 文件源码
def indicator_bulk_add():
    """Bulk-add indicators described by the request's JSON payload.

    The payload must contain the keys ``control``, ``data_type``,
    ``event_id``, ``pending`` and ``data``; ``data`` is iterated as
    ``(indicator, description)`` pairs.

    Returns:
        str: JSON with ``results`` set to ``success`` or ``error`` and the
        outcome or error text under ``data``.
    """
    req_keys = ('control', 'data_type', 'event_id', 'pending', 'data')

    try:
        pld = request.get_json(silent=True)
    except Exception as e:  # "except Exception, e" is a syntax error on Python 3
        return json.dumps({'results': 'error', 'data': '%s' % e})

    if not _valid_json(req_keys, pld):
        return json.dumps({'results': 'error', 'data': 'bad json'})

    # load related stuff
    res_dict = ResultsDict(pld['event_id'], pld['control'])
    for val, desc in pld['data']:
        res_dict.new_ind(data_type=pld['data_type'],
                         indicator=val,
                         date=None,
                         description=desc)

    results = _add_indicators(res_dict, pld['pending'])
    return json.dumps({'results': 'success', 'data': results})
项目:ostip    作者:kx499    | 项目源码 | 文件源码
def api_indicator_get():
    """Validate a ``conditions`` query payload and build the indicator query.

    The JSON payload must contain ``conditions``; every condition must
    contain ``field``, ``operator`` and ``val``.  Invalid payloads produce a
    JSON error document.
    """
    req_keys = ('conditions',)
    req_keys2 = ('field', 'operator', 'val')
    try:
        pld = request.get_json(silent=True)
    except Exception as e:  # "except Exception, e" is a syntax error on Python 3
        return json.dumps({'results': 'error', 'data': '%s' % e})

    if not _valid_json(req_keys, pld):
        return json.dumps({'results': 'error', 'data': 'Invalid json'})

    for item in pld.get('conditions'):
        if not _valid_json(req_keys2, item):
            return json.dumps({'results': 'error', 'data': 'Invalid json'})

    # NOTE(review): this excerpt ends after the query is built; the code that
    # turns ``q`` into a response is not visible here.
    q = filter_query(Indicator.query.join(Event), pld.get('conditions'))
项目:webhook-shims    作者:vmw-loginsight    | 项目源码 | 文件源码
def pushbullet(ALERTID=None, TOKEN=None):
    """
    Send a `link` notification to all devices on pushbullet with a link back to the alert's query.
    If `TOKEN` is not passed, requires `PUSHBULLETTOKEN` defined, see https://www.pushbullet.com/#settings/account
    """
    if not PUSHBULLETURL:
        return ("PUSHBULLET parameter must be set, please edit the shim!", 500, None)
    # Use a separate local name: assigning to PUSHBULLETTOKEN inside the
    # function made it local for the whole body, so reading the module-level
    # value when TOKEN was not passed raised UnboundLocalError.
    token = TOKEN if TOKEN is not None else PUSHBULLETTOKEN
    if not token:
        return ("PUSHBULLETTOKEN parameter must be set, please edit the shim!", 500, None)

    a = parse(request)

    payload = {
        "body": a['info'],
        "title": a['AlertName'],
        "type": "link",
        "url": a['url'],
    }

    headers = {'Content-type': 'application/json', 'Access-Token': token}

    return callapi(PUSHBULLETURL, 'post', json.dumps(payload), headers)
项目:webhook-shims    作者:vmw-loginsight    | 项目源码 | 文件源码
def socialcast(ALERTID=None, NUMRESULTS=10, TEAM=None, I=None, X=None):
    """
    Create a post on Socialcast containing log events.
    Limited to `NUMRESULTS` (default 10) or 1MB.
    If `TEAM/I/X` is not passed, requires `SOCIALCASTURL` defined in the form `https://TEAM.socialcast.com/api/webhooks/IIIIIIIIII/XXXXXXXXXXX`
    For more information see https://socialcast.github.io/socialcast/apidoc/incoming_webhook.html
    """
    if None not in (TEAM, I, X):
        # All three URL parts were passed: they take precedence over the
        # configured SOCIALCASTURL.  Previously the if/else below always ran
        # and either overwrote this URL or rejected the request.
        URL = 'https://' + TEAM + '.socialcast.com/api/webhooks/' + I + '/' + X
    elif not SOCIALCASTURL or 'socialcast.com/api/webhooks' not in SOCIALCASTURL:
        return ("SOCIALCASTURL parameter must be set properly, please edit the shim!\n", 500, None)
    else:
        URL = SOCIALCASTURL
    # Socialcast cares about the order of the information in the body
    # json.dumps() does not preserve the order so just use raw get_data()
    return callapi(URL, 'post', request.get_data())
项目:Url    作者:beiruan    | 项目源码 | 文件源码
def active_tasks():
    """Return the scheduler's active tasks as a JSON list.

    Query args: ``project`` (default ``""``) and ``limit`` (default 100).
    Each task gets ``updatetime``, ``updatetime_text`` and, when it has a
    ``status``, ``status_text`` added before serialization.

    Responds with ``'{}'`` and status 502 when the scheduler RPC raises
    ``socket.error``.
    """
    rpc = app.config['scheduler_rpc']
    taskdb = app.config['taskdb']
    project = request.args.get('project', "")
    # type=int makes a malformed ?limit= fall back to the default instead of
    # raising ValueError (which surfaced as a 500).
    limit = request.args.get('limit', 100, type=int)

    try:
        tasks = rpc.get_active_tasks(project, limit)
    except socket.error as e:
        app.logger.warning('connect to scheduler rpc error: %r', e)
        return '{}', 502, {'Content-Type': 'application/json'}

    result = []
    for updatetime, task in tasks:
        task['updatetime'] = updatetime
        task['updatetime_text'] = utils.format_date(updatetime)
        if 'status' in task:
            task['status_text'] = taskdb.status_to_string(task['status'])
        result.append(task)

    return json.dumps(result), 200, {'Content-Type': 'application/json'}
项目:Url    作者:beiruan    | 项目源码 | 文件源码
def counter():
    """Return per-project counters and pause status from the scheduler as JSON.

    The result maps each project to a dict holding one entry per counter type
    plus ``paused``.  An empty JSON object is returned when no scheduler RPC
    is configured or when it raises ``socket.error``.
    """
    json_headers = {'Content-Type': 'application/json'}

    rpc = app.config['scheduler_rpc']
    if rpc is None:
        # Same status and content type as every other branch.
        return json.dumps({}), 200, json_headers

    result = {}
    try:
        data = rpc.webui_update()
        # Local names avoid shadowing the builtin ``type`` and this function.
        for counter_type, counters in iteritems(data['counter']):
            for project, value in iteritems(counters):
                result.setdefault(project, {})[counter_type] = value
        for project, paused in iteritems(data['pause_status']):
            result.setdefault(project, {})['paused'] = paused
    except socket.error as e:
        app.logger.warning('connect to scheduler rpc error: %r', e)
        return json.dumps({}), 200, json_headers

    return json.dumps(result), 200, json_headers
项目:chaos-monkey-engine    作者:BBVA    | 项目源码 | 文件源码
def test_plan_add_valid_body(app, manager):
    """POSTing a valid plan body answers 200 with JSON ``{"msg": "ok"}``."""
    url = url_for("plans.add_plan")

    # register one attack and one planner in the manager's stores
    manager.attacks_store.add(attack1_module)
    manager.planners_store.add(planner1_module)

    with app.test_request_context(url):
        client = app.test_client()
        body = json.dumps(valid_request_body)
        res = client.post(url_for("plans.add_plan"),
                          content_type="application/json",
                          data=body)

        assert res.status_code == 200
        assert res.mimetype == "application/json"
        assert res.json == {"msg": "ok"}
项目:PilosusBot    作者:pilosus    | 项目源码 | 文件源码
def test_handle_empty_input(self, mocked_send_to_chat, mocked_celery_chain):
        """Empty input: 200, empty JSON body, send task got ``args=[{}]``, chain never called."""
        payload = json.dumps(TelegramUpdates.EMPTY)
        response = self.client.post(TelegramUpdates.URL_HANDLE_WEBHOOK,
                                    data=payload,
                                    follow_redirects=True,
                                    headers=TelegramUpdates.HEADERS)

        status_ok = response.status_code == 200
        self.assertTrue(status_ok,
                        'Failed to return status code 200 for empty input')

        body = json.loads(response.data)
        self.assertEqual({}, body,
                         'Failed to return an empty JSON for empty input')

        mocked_send_to_chat.apply_async.assert_called_with(args=[{}])

        # assert_called() has to fail here: the chain must not have run
        with self.assertRaises(AssertionError) as chain_err:
            mocked_celery_chain.assert_called()
        error_text = str(chain_err.exception)

        self.assertIn("Expected 'celery_chain' to have been called", error_text)
        self.assertEqual(mocked_celery_chain.call_args_list, [])
项目:PilosusBot    作者:pilosus    | 项目源码 | 文件源码
def test_handle_bad_id_bad_text(self, mocked_send_to_chat, mocked_celery_chain):
        """Bad reply id and bad text: 200, empty JSON body, chain never called."""
        payload = json.dumps(TelegramUpdates.TEXT_BAD_ID_BAD_TEXT)
        response = self.client.post(TelegramUpdates.URL_HANDLE_WEBHOOK,
                                    data=payload,
                                    follow_redirects=True,
                                    headers=TelegramUpdates.HEADERS)

        status_ok = response.status_code == 200
        self.assertTrue(status_ok,
                        'Failed to return status code 200 for an Update with '
                        'bad reply_message_id and bad text length')

        body = json.loads(response.data)
        self.assertEqual({}, body,
                         'Failed to return an empty JSON for an Update with '
                         'bad reply_message_id and bad text length')

        mocked_send_to_chat.apply_async.assert_called_with(args=[{}])

        # assert_called() has to fail here: the chain must not have run
        with self.assertRaises(AssertionError) as chain_err:
            mocked_celery_chain.assert_called()
        error_text = str(chain_err.exception)

        self.assertIn("Expected 'celery_chain' to have been called", error_text)
        self.assertEqual(mocked_celery_chain.call_args_list, [])
项目:PilosusBot    作者:pilosus    | 项目源码 | 文件源码
def test_handle_ok_id_bad_text(self, mocked_send_to_chat, mocked_celery_chain):
        """Valid reply id but bad text: 200, empty JSON body, chain never called."""
        payload = json.dumps(TelegramUpdates.TEXT_OK_ID_BAD_TEXT)
        response = self.client.post(TelegramUpdates.URL_HANDLE_WEBHOOK,
                                    data=payload,
                                    follow_redirects=True,
                                    headers=TelegramUpdates.HEADERS)

        status_ok = response.status_code == 200
        self.assertTrue(status_ok,
                        'Failed to return status code 200 for an Update with '
                        'bad text length')

        body = json.loads(response.data)
        self.assertEqual({}, body,
                         'Failed to return an empty JSON for an Update with '
                         'bad text length')

        mocked_send_to_chat.apply_async.assert_called_with(args=[{}])

        # assert_called() has to fail here: the chain must not have run
        with self.assertRaises(AssertionError) as chain_err:
            mocked_celery_chain.assert_called()
        error_text = str(chain_err.exception)

        self.assertIn("Expected 'celery_chain' to have been called", error_text)
        self.assertEqual(mocked_celery_chain.call_args_list, [])
项目:PilosusBot    作者:pilosus    | 项目源码 | 文件源码
def test_handle_bad_id_ok_text(self, mocked_send_to_chat, mocked_celery_chain):
        """Bad reply id with valid text: 200, empty JSON body, chain never called."""
        payload = json.dumps(TelegramUpdates.TEXT_BAD_ID_OK_TEXT)
        response = self.client.post(TelegramUpdates.URL_HANDLE_WEBHOOK,
                                    data=payload,
                                    follow_redirects=True,
                                    headers=TelegramUpdates.HEADERS)

        status_ok = response.status_code == 200
        self.assertTrue(status_ok,
                        'Failed to return status code 200 for an Update with '
                        'bad reply_message_id')

        body = json.loads(response.data)
        self.assertEqual({}, body,
                         'Failed to return an empty JSON for an Update with '
                         'bad reply_message_id')

        mocked_send_to_chat.apply_async.assert_called_with(args=[{}])

        # assert_called() has to fail here: the chain must not have run
        with self.assertRaises(AssertionError) as chain_err:
            mocked_celery_chain.assert_called()
        error_text = str(chain_err.exception)

        self.assertIn("Expected 'celery_chain' to have been called", error_text)
        self.assertEqual(mocked_celery_chain.call_args_list, [])
项目:PilosusBot    作者:pilosus    | 项目源码 | 文件源码
def test_handle_malformed_Message(self, mocked_send_to_chat, mocked_celery_chain):
        """Malformed Message: 200, empty JSON body, chain never called."""
        payload = json.dumps(TelegramUpdates.TEXT_MALFORMED_NO_MESSAGE)
        response = self.client.post(TelegramUpdates.URL_HANDLE_WEBHOOK,
                                    data=payload,
                                    follow_redirects=True,
                                    headers=TelegramUpdates.HEADERS)

        status_ok = response.status_code == 200
        self.assertTrue(status_ok,
                        'Failed to return status code 200 for an Update with '
                        'a malformed Message')

        body = json.loads(response.data)
        self.assertEqual({}, body,
                         'Failed to return an empty JSON for an Update with '
                         'a malformed Message')

        mocked_send_to_chat.apply_async.assert_called_with(args=[{}])

        # assert_called() has to fail here: the chain must not have run
        with self.assertRaises(AssertionError) as chain_err:
            mocked_celery_chain.assert_called()
        error_text = str(chain_err.exception)

        self.assertIn("Expected 'celery_chain' to have been called", error_text)
        self.assertEqual(mocked_celery_chain.call_args_list, [])
项目:PilosusBot    作者:pilosus    | 项目源码 | 文件源码
def test_handle_update_id_already_used(self, mocked_send_to_chat, mocked_celery_chain):
        """Already-used update id: 200, empty JSON body, chain never called."""
        # celery tasks themselves are covered by a separate test suite,
        # so the mocks only need to return nothing here
        for mocked in (mocked_send_to_chat, mocked_celery_chain):
            mocked.return_value = None

        payload = json.dumps(TelegramUpdates.TEXT_SAME_UPDATE_ID)
        response = self.client.post(TelegramUpdates.URL_HANDLE_WEBHOOK,
                                    data=payload,
                                    follow_redirects=True,
                                    headers=TelegramUpdates.HEADERS)

        status_ok = response.status_code == 200
        self.assertTrue(status_ok,
                        'Failed to return status code 200 for an Update with '
                        'an ID already used')

        body = json.loads(response.data)
        self.assertEqual({}, body,
                         'Failed to return an empty JSON for an Update with '
                         'an ID already used')

        mocked_send_to_chat.apply_async.assert_called_with(args=[{}])

        # assert_called() has to fail here: the chain must not have run
        with self.assertRaises(AssertionError) as chain_err:
            mocked_celery_chain.assert_called()
        error_text = str(chain_err.exception)

        self.assertIn("Expected 'celery_chain' to have been called", error_text)
        self.assertEqual(mocked_celery_chain.call_args_list, [])
项目:PilosusBot    作者:pilosus    | 项目源码 | 文件源码
def test_handle_valid_input(self, mocked_send_to_chat, mocked_celery_chain):
        """Valid Update: echoed back as JSON, chain called with it, send mock not called."""
        update = TelegramUpdates.TEXT_OK_ID_OK_TEXT
        response = self.client.post(TelegramUpdates.URL_HANDLE_WEBHOOK,
                                    data=json.dumps(update),
                                    follow_redirects=True,
                                    headers=TelegramUpdates.HEADERS)

        status_ok = response.status_code == 200
        self.assertTrue(status_ok,
                        'Failed to return status code 200 for a valid input')
        self.assertNotEqual({}, response.data,
                            'Failed to return a non-empty JSON for a valid input')
        self.assertEqual(update,
                         json.loads(response.data),
                         'Failed to return an Update itself for a valid input Update')

        # watch out! parse_update eliminates Updates with message_id already processed
        # so the expected call is built from the raw, unparsed Update
        message = update['message']
        mocked_update = {
            'chat_id': message['chat']['id'],
            'reply_to_message_id': message['message_id'],
            'text': message['text'],
        }
        mocked_celery_chain.assert_called_with(mocked_update)

        # assert_called() has to fail here: the send mock itself is never called
        with self.assertRaises(AssertionError) as send_err:
            mocked_send_to_chat.assert_called()
        error_text = str(send_err.exception)

        self.assertIn("Expected 'send_message_to_chat' to have been called",
                      error_text)
        self.assertEqual(mocked_send_to_chat.call_args_list, [])
项目:craftbeerpi3    作者:Manuel83    | 项目源码 | 文件源码
def update(cls, **kwargs):
        """Write the given field values to the row matching the primary key.

        Every name in ``cls.__fields__`` is set from ``kwargs`` (missing names
        become ``None``); fields listed in ``cls.__json_fields__`` are stored
        as JSON text.  Returns ``cls(kwargs)``.
        """
        cur = get_db().cursor()

        assignments = ', '.join("'%s' = ?" % str(name) for name in cls.__fields__)
        query = 'UPDATE %s SET %s WHERE %s = ?' % (
            cls.__table_name__, assignments, cls.__priamry_key__)

        values = [
            json.dumps(kwargs.get(name)) if name in cls.__json_fields__
            else kwargs.get(name)
            for name in cls.__fields__
        ]
        # the primary key value binds to the trailing WHERE placeholder
        values.append(kwargs.get(cls.__priamry_key__))

        cur.execute(query, tuple(values))
        get_db().commit()
        return cls(kwargs)
项目:craftbeerpi3    作者:Manuel83    | 项目源码 | 文件源码
def git_status(self):
        """Return tags, head commit, branch name and pending upstream commits as JSON.

        Fetches ``origin`` first, then reports every tag of the repository in
        the current directory, the active branch name (``None`` when it cannot
        be determined) and the commits in ``master..origin/master``.
        """
        repo = Repo('./')
        repo.remotes.origin.fetch()

        # Tags
        tags = [{"name": t.name, "commit": str(t.commit), "date": t.commit.committed_date,
                 "committer": t.commit.committer.name, "message": t.commit.message}
                for t in repo.tags]

        try:
            branch_name = repo.active_branch.name
        except Exception:
            # Presumably a detached HEAD (no active branch) - TODO confirm the
            # exact exception type.  Unlike the former bare ``except`` this
            # lets KeyboardInterrupt and SystemExit propagate.
            branch_name = None

        changes = [{"committer": c.committer.name, "message": c.message}
                   for c in repo.iter_commits('master..origin/master')]

        return json.dumps({"tags": tags, "headcommit": str(repo.head.commit), "branchname": branch_name,
                           "master": {"changes": changes}})
项目:craftbeerpi3    作者:Manuel83    | 项目源码 | 文件源码
def get(self):
        """List the brews stored in the uploaded Kleiner Brauhelfer database.

        Reads ``<UPLOAD_FOLDER>/kbh.db`` and returns a JSON list of
        ``{"id", "name", "brewed"}`` objects taken from the ``Sud`` table.

        Returns ``('', 404)`` when the database file does not exist and
        ``('', 500)`` when reading it fails; both cases send a notification
        through ``self.api.notify``.
        """
        conn = None
        try:
            db_path = self.api.app.config['UPLOAD_FOLDER'] + '/kbh.db'
            if not os.path.exists(db_path):
                self.api.notify(headline="File Not Found", message="Please upload a Kleiner Brauhelfer Database", type="danger")
                return ('', 404)

            conn = sqlite3.connect(db_path)
            c = conn.cursor()
            c.execute('SELECT ID, Sudname, BierWurdeGebraut FROM Sud')
            result = [{"id": row[0], "name": row[1], "brewed": row[2]}
                      for row in c.fetchall()]
            return json.dumps(result)
        except Exception as e:
            # print(e) is valid on Python 2 and 3; ``print e`` is Python 2 only
            print(e)
            self.api.notify(headline="Failed to load KHB database", message="ERROR", type="danger")
            return ('', 500)
        finally:
            if conn:
                conn.close()
项目:craftbeerpi3    作者:Manuel83    | 项目源码 | 文件源码
def get_logs_as_json(self, t, id):
        """Return log/chart data for a sensor, kettle or fermenter as JSON.

        :param t: kind of object: ``"s"`` sensor, ``"k"`` kettle, ``"f"``
            fermenter; any other value yields an empty list.
        :param id: key of the object in the matching ``cbpi.cache`` section.
        :return: JSON-encoded list of data series.
        """
        result = []
        if t == "s":
            name = cbpi.cache.get("sensors").get(id).name
            result.append({"name": name, "data": self.read_log_as_json("sensor", id)})

        if t == "k":
            kettle = cbpi.cache.get("kettle").get(id)
            chart = cbpi.get_controller(kettle.logic).get("class").chart(kettle)
            # Build a real list: map() is a lazy iterator on Python 3, which
            # json.dumps cannot serialize.
            result = [self.convert_chart_data_to_json(item) for item in chart]

        if t == "f":
            fermenter = cbpi.cache.get("fermenter").get(id)
            chart = cbpi.get_fermentation_controller(fermenter.logic).get("class").chart(fermenter)
            result = [self.convert_chart_data_to_json(item) for item in chart]

        return json.dumps(result)
项目:perseids-manifold    作者:RDACollectionsWG    | 项目源码 | 文件源码
def test_db_create_service(self):
         """A service stored via the db is read back with an equal ``dict()``."""
         with app.app_context():
             service = self.mock.service()
             self.db.set_service(service)
             stored = self.db.get_service().dict()
             self.assertDictEqual(stored, service.dict())

    # def test_db_access_created_service(self):
    #     with app.app_context():
    #         s_obj = self.mock.service()
    #         self.db.set_service(s_obj)
    #         t_obj = self.db.get_service()
    #         self.assertEqual(json.dumps(s_obj), json.dumps(t_obj))
    #
    # def test_db_overwrite_service(self):
    #     with app.app_context():
    #         s_obj = self.mock.service()
    #         t_obj = self.mock.service()
    #         t_obj.providesCollectionPids = not s_obj.providesCollectionPids
    #         self.db.set_service(s_obj)
    #         self.db.set_service(t_obj)
    #         self.assertNotEqual(json.dumps(s_obj), json.dumps(self.db.get_service()))
项目:yarnitor    作者:maxpoint    | 项目源码 | 文件源码
def spark_application(app_id):
    """Mock of the Spark jobs REST resource.

    With ``last`` in the query args, the value stored in redis under the
    request's base URL is returned.  Otherwise an example job list is
    generated, stored under that key and returned.
    """
    if 'last' in request.args:
        cached = redis.get(request.base_url)
        return jsonify(cached)

    job_fields = {
        'jobId': st.integers(0),
        'name': st.text(),
        'submissionTime': st.text(),
        'completionTime': st.text(),
        'stageIds': st.lists(st.integers(0), average_size=3),
        'status': st.sampled_from(['SUCCEEDED', 'RUNNING', 'FAILED']),
    }
    # every task/stage counter is a non-negative integer
    counter_names = (
        'numTasks', 'numActiveTasks', 'numCompletedTasks', 'numSkippedTasks',
        'numFailedTasks', 'numActiveStages', 'numCompletedStages',
        'numSkippedStages', 'numFailedStages',
    )
    for counter_name in counter_names:
        job_fields[counter_name] = st.integers(0)

    jobs = st.lists(st.fixed_dictionaries(job_fields), average_size=3)
    result = json.dumps(jobs.example())
    redis.set(request.base_url, result)
    return jsonify(result)
项目:api    作者:Yiave    | 项目源码 | 文件源码
def get_promotions():
    """Return one page of promotions as a JSON array.

    The page number is read from the ``page`` query arg (default 1).  Links
    to the neighbouring pages and the total count are sent in the
    ``X-Page-Pre``, ``X-Page-Next`` and ``X-Total-Count`` headers.
    """
    page = request.args.get('page', 1, type=int)

    pagination = Promotion.query.paginate(
        page,
        per_page=current_app.config['YIAVE_PROMOTIONS_PER_PAGE'],
        error_out=False,
    )
    items = pagination.items

    prev_url = (url_for('api.get_promotions', page=page - 1, _external=True)
                if pagination.has_prev else None)
    next_url = (url_for('api.get_promotions', page=page + 1, _external=True)
                if pagination.has_next else None)

    body = json.dumps([item.to_json() for item in items])
    response = Response(body)
    headers = response.headers
    headers['Content-Type'] = "application/json"
    headers['X-Page-Pre'] = prev_url
    headers['X-Page-Next'] = next_url
    headers['X-Total-Count'] = pagination.total
    return response
项目:api    作者:Yiave    | 项目源码 | 文件源码
def get_customers():
    """Return one page of customers as a JSON array.

    The page number is read from the ``page`` query arg (default 1).  Links
    to the neighbouring pages and the total count are sent in the
    ``X-Page-Pre``, ``X-Page-Next`` and ``X-Total-Count`` headers.
    """
    page = request.args.get('page', 1, type=int)

    pagination = Customer.query.paginate(
        page,
        per_page=current_app.config['YIAVE_BUSINESSES_PER_PAGE'],
        error_out=False,
    )
    items = pagination.items

    prev_url = (url_for('customer.get_customers', page=page - 1, _external=True)
                if pagination.has_prev else None)
    next_url = (url_for('customer.get_customers', page=page + 1, _external=True)
                if pagination.has_next else None)

    body = json.dumps([item.to_json() for item in items])
    response = Response(body)
    headers = response.headers
    headers['Content-Type'] = "application/json"
    headers['X-Page-Pre'] = prev_url
    headers['X-Page-Next'] = next_url
    headers['X-Total-Count'] = pagination.total

    return response
项目:api    作者:Yiave    | 项目源码 | 文件源码
def get_businesses():
    """Return one page of businesses as a JSON array.

    The page number is read from the ``page`` query arg (default 1).  Links
    to the neighbouring pages and the total count are sent in the
    ``X-Page-Pre``, ``X-Page-Next`` and ``X-Total-Count`` headers.
    """
    page = request.args.get('page', 1, type=int)

    pagination = Business.query.paginate(
        page,
        per_page=current_app.config['YIAVE_BUSINESSES_PER_PAGE'],
        error_out=False,
    )
    items = pagination.items

    prev_url = (url_for('api.get_businesses', page=page - 1, _external=True)
                if pagination.has_prev else None)
    next_url = (url_for('api.get_businesses', page=page + 1, _external=True)
                if pagination.has_next else None)

    body = json.dumps([item.to_json() for item in items])
    response = Response(body)
    headers = response.headers
    headers['Content-Type'] = "application/json"
    headers['X-Page-Pre'] = prev_url
    headers['X-Page-Next'] = next_url
    headers['X-Total-Count'] = pagination.total
    return response
项目:synergy_website    作者:alfredojf    | 项目源码 | 文件源码
def VPagina():
    """Return the page data of the user named by the ``idUsuario`` query arg.

    The JSON result holds ``usuario`` and ``idUsuario``, the session's
    ``actor`` when one is set, and ``fPagina`` (title and content) when the
    user already has a page.
    """
    #GET parameter
    idUsuario = request.args['idUsuario']
    res = {}
    if "actor" in session:
        res['actor'] = session['actor']
    #Action code goes here, res should be a JSON structure
    pagina_existente = db.session.query(Pagina).filter_by(id_usuario=idUsuario).first()
    if pagina_existente is not None:
        # Add the form data to res instead of replacing res, so the 'actor'
        # entry set above is no longer lost when a page exists.
        res["fPagina"] = {"titulo": pagina_existente.titulo,
                          "contenido": pagina_existente.contenido}
    res["usuario"] = {"nombre": idUsuario}
    res['idUsuario'] = idUsuario
    #Action code ends here
    return json.dumps(res)





#Use case code starts here


#Use case code ends here
项目:synergy_website    作者:alfredojf    | 项目源码 | 文件源码
def AIdentificar():
    """Check the posted credentials and return the next view as JSON.

    Reads ``usuario`` and ``clave`` from the JSON body.  On a match the
    result is the ``/VPrincipal`` entry and the session gets
    ``nombre_usuario``, ``idPaginaSitio`` and ``actor``; otherwise the
    ``/VLogin`` entry with an error message is returned.
    """
    #POST/PUT parameters
    params = request.get_json()
    if not isinstance(params, dict):
        # Missing or non-object body: treat it as a failed login instead of
        # crashing on params['usuario'].
        params = {}
    usuario = params.get('usuario')
    clave = params.get('clave')

    results = [{'label':'/VPrincipal', "actor":"duenoProducto"}, {'label':'/VLogin', 'msg':['Datos de identificación incorrectos']}, ]
    res = results[1]
    #Action code goes here, res should be a list with a label and a message

    # NOTE(review): the stored password is compared with the submitted one
    # as-is, i.e. passwords appear to be kept in plain text - consider
    # storing and checking a salted hash instead.
    if usuario is not None and clave is not None:
        for nombre_usuario, clave_guardada in db.session.query(Usuario.nombre_usuario, Usuario.clave):
            if nombre_usuario == usuario and clave_guardada == clave:
                res = results[0]
                session['nombre_usuario'] = usuario

                session['idPaginaSitio'] = " "
                res['idPaginaSitio'] = " "
                break

    #Action code ends here
    if "actor" in res:
        if res['actor'] is None:
            session.pop("actor", None)
        else:
            session['actor'] = res['actor']
    return json.dumps(res)
项目:synergy_website    作者:alfredojf    | 项目源码 | 文件源码
def VSecundaria():
    """Return the site page named by the ``idPagina`` query arg as JSON.

    ``pag`` holds the page's thread id, title, content and images, or a
    placeholder entry when no such page exists.  The session's ``actor`` and
    user name are included when present.
    """
    res = {}
    if "actor" in session:
        res['actor'] = session['actor']
    #Action code goes here, res should be a JSON structure
    idPagina = request.args['idPagina']

    pag = Sitio.query.filter_by(id=idPagina).first()
    if pag is not None:
        pagina = {
            'hilo': pag.hilo.id, 'titulo': pag.titulo,
            'contenido': pag.contenido, 'imagenes': pag.imagenes}
    else:
        # placeholder shown when the requested page is unknown
        pagina = {
            'hilo':-1, 'titulo': 'Lo sentimos',
            'contenido': 'La página que busca no existe.', 'imagenes':''}
    res['pag'] = pagina
    #Action code ends here

    if 'nombre_usuario' in session:
        res['idUsuario'] = session['nombre_usuario']
    return json.dumps(res)
项目:horse    作者:pragmaticcoders    | 项目源码 | 文件源码
def test_creating_movie(client):
    """A created movie is listed first under /movies with zero likes."""
    payload = json.dumps({'title': 'Home Alone'})
    response = client.post('/movies', data=payload,
                           content_type='application/json')
    assert response.status_code == 201

    response = client.get('/movies')
    assert response.status_code == 200

    listing = json.loads(response.data)
    movie = listing['items'][0]

    assert movie['title'] == 'Home Alone'
    assert movie['likes'] == 0
项目:horse    作者:pragmaticcoders    | 项目源码 | 文件源码
def test_creating_movie_400_error(client):
    """Posting a movie with an unexpected field is answered with 400."""
    payload = json.dumps({'wrong_field': 'Home Alone'})
    response = client.post('/movies', data=payload,
                           content_type='application/json')
    assert response.status_code == 400
项目:horse    作者:pragmaticcoders    | 项目源码 | 文件源码
def test_populate_movies(client, populate_feed, movies_repo):
    """Populating from the feed stores the expected movie titles."""
    payload = json.dumps(populate_feed)
    response = client.post('/populate', data=payload,
                           content_type='application/json')
    assert response.status_code == 201

    titles = [movie.title for movie in movies_repo.all()]

    assert 'Home alone' in titles
    assert 'Shining' in titles
项目:horse    作者:pragmaticcoders    | 项目源码 | 文件源码
def test_populate_users(client, populate_feed, users_repo):
    """Populating from the feed stores the expected user names."""
    payload = json.dumps(populate_feed)
    response = client.post('/populate', data=payload,
                           content_type='application/json')
    assert response.status_code == 201

    user_names = [user.name for user in users_repo.all()]

    assert 'Andrew' in user_names
    assert 'John' in user_names
项目:horse    作者:pragmaticcoders    | 项目源码 | 文件源码
def test_populate_follows(client, populate_feed, users_repo):
    """After populating from the feed, John follows Andrew."""
    payload = json.dumps(populate_feed)
    response = client.post('/populate', data=payload,
                           content_type='application/json')
    assert response.status_code == 201

    john = users_repo.get_by_name('John')
    followed_names = [person.name for person in john.get_followed_users()]
    assert 'Andrew' in followed_names
项目:horse    作者:pragmaticcoders    | 项目源码 | 文件源码
def test_populate_400(client):
    """Populating with an empty JSON object is answered with 400."""
    empty_payload = json.dumps({})
    response = client.post('/populate', data=empty_payload,
                           content_type='application/json')
    assert response.status_code == 400
项目:horse    作者:pragmaticcoders    | 项目源码 | 文件源码
def test_user_registration(client):
    """A registered user shows up in the /users listing."""
    payload = json.dumps({'name': 'Kevin'})
    response = client.post('/users', data=payload,
                           content_type='application/json')
    assert response.status_code == 201

    response = client.get('/users')
    assert response.status_code == 200

    listing = json.loads(response.data)
    names = [user['name'] for user in listing['items']]

    assert 'Kevin' in names