Python simplejson 模块,dumps() 实例源码

我们从Python开源项目中,提取了以下50个代码示例,用于说明如何使用simplejson.dumps()

项目:gee-bridge    作者:francbartoli    | 项目源码 | 文件源码
def DumpReadable(self, *args):
        """
        DumpReadable(Feature self)

        void
        OGR_F_DumpReadable(OGRFeatureH hFeat, FILE *fpOut)

        Dump this feature in a human readable form.

        This dumps the attributes, and geometry; however, it doesn't
        definition information (other than field types and names), nor does it
        report the geometry spatial reference system.

        This function is the same as the C++ method
        OGRFeature::DumpReadable().

        Parameters:
        -----------

        hFeat:  handle to the feature to dump.

        fpOut:  the stream to write to, such as strout. 
        """
        return _ogr.Feature_DumpReadable(self, *args)
项目:data_pipeline    作者:Yelp    | 项目源码 | 文件源码
def _register_avro_schema(
        self,
        namespace,
        source,
        schema_json=None,
        **overrides
    ):
        schema_json = schema_json or {
            'type': 'record',
            'name': source,
            'namespace': namespace,
            'doc': 'test',
            'fields': [{'type': 'int', 'doc': 'test', 'name': 'foo'}]
        }
        params = {
            'namespace': namespace,
            'source': source,
            'source_owner_email': self.source_owner_email,
            'schema': simplejson.dumps(schema_json),
            'contains_pii': False
        }
        if overrides:
            params.update(**overrides)
        return self._get_client().schemas.register_schema(body=params).result()
项目:data_pipeline    作者:Yelp    | 项目源码 | 文件源码
def pk_topic_resp(self, yelp_namespace_name, usr_src_name):
        pk_schema_json = {
            'type': 'record',
            'name': usr_src_name,
            'namespace': yelp_namespace_name,
            'doc': 'test',
            'fields': [
                {'type': 'int', 'doc': 'test', 'name': 'id', 'pkey': 1},
                {'type': 'int', 'doc': 'test', 'name': 'data'}
            ],
            'pkey': ['id']
        }
        return self._register_avro_schema(
            yelp_namespace_name,
            usr_src_name,
            schema=simplejson.dumps(pk_schema_json)
        ).topic
项目:data_pipeline    作者:Yelp    | 项目源码 | 文件源码
def _register_avro_schema(self, namespace, source, two_fields):
        fields = [{'type': 'int', 'doc': 'test', 'name': 'foo'}]
        if two_fields:
            fields.append({'type': 'int', 'doc': 'test', 'name': 'bar'})
        schema_json = {
            'type': 'record',
            'name': source,
            'namespace': namespace,
            'doc': 'test',
            'fields': fields
        }
        return get_schematizer().register_schema(
            namespace=namespace,
            source=source,
            schema_str=simplejson.dumps(schema_json),
            source_owner_email=self.source_owner_email,
            contains_pii=False,
            base_schema_id=None
        )
项目:data_pipeline    作者:Yelp    | 项目源码 | 文件源码
def _format_message(self, message):
        result_dict = self._get_message_result_dict(message)
        if self.options.iso_time:
            self._iso_time(result_dict)
        if self.options.json:
            return simplejson.dumps(
                obj=result_dict,
                sort_keys=True,

                # Objects can use _asdict() to be encoded as JSON objects
                namedtuple_as_object=True,

                # Use an object's __repr__() to return a serializable version
                # of an object, rather than raising a TypeError, if the object
                # does not define an _asdict() method
                default=lambda x: repr(x)
            )
        else:
            return result_dict
项目:sat6_scripts    作者:RedHatSatellite    | 项目源码 | 文件源码
def get_product(org_id, cp_id):
    """
    Find and return the label of the given product ID
    """
    prod_list = helpers.get_p_json(
        helpers.KATELLO_API + "/products/", \
                json.dumps(
                        {
                           "organization_id": org_id,
                           "per_page": '1000',
                        }
                ))

    for prod in prod_list['results']:
        if prod['cp_id'] == cp_id:
            prodlabel = prod['label']
            return prodlabel
项目:oscars2016    作者:0x0ece    | 项目源码 | 文件源码
def test_separators(self):
        lst = [1,2,3,4]
        expect = '[\n1,\n2,\n3,\n4\n]'
        expect_spaces = '[\n1, \n2, \n3, \n4\n]'
        # Ensure that separators still works
        self.assertEqual(
            expect_spaces,
            json.dumps(lst, indent=0, separators=(', ', ': ')))
        # Force the new defaults
        self.assertEqual(
            expect,
            json.dumps(lst, indent=0, separators=(',', ': ')))
        # Added in 2.1.4
        self.assertEqual(
            expect,
            json.dumps(lst, indent=0))
项目:oscars2016    作者:0x0ece    | 项目源码 | 文件源码
def test_iterable(self):
        for l in ([], [1], [1, 2], [1, 2, 3]):
            for opts in [{}, {'indent': 2}]:
                for dumps in (json.dumps, iter_dumps, sio_dump):
                    expect = dumps(l, **opts)
                    default_expect = dumps(sum(l), **opts)
                    # Default is False
                    self.assertRaises(TypeError, dumps, iter(l), **opts)
                    self.assertRaises(TypeError, dumps, iter(l), iterable_as_array=False, **opts)
                    self.assertEqual(expect, dumps(iter(l), iterable_as_array=True, **opts))
                    # Ensure that the "default" gets called
                    self.assertEqual(default_expect, dumps(iter(l), default=sum, **opts))
                    self.assertEqual(default_expect, dumps(iter(l), iterable_as_array=False, default=sum, **opts))
                    # Ensure that the "default" does not get called
                    self.assertEqual(
                        expect,
                        dumps(iter(l), iterable_as_array=True, default=sum, **opts))
项目:oscars2016    作者:0x0ece    | 项目源码 | 文件源码
def test_tuple_array_dump(self):
        t = (1, 2, 3)
        expect = json.dumps(list(t))
        # Default is True
        sio = StringIO()
        json.dump(t, sio)
        self.assertEqual(expect, sio.getvalue())
        sio = StringIO()
        json.dump(t, sio, tuple_as_array=True)
        self.assertEqual(expect, sio.getvalue())
        self.assertRaises(TypeError, json.dump, t, StringIO(),
                          tuple_as_array=False)
        # Ensure that the "default" does not get called
        sio = StringIO()
        json.dump(t, sio, default=repr)
        self.assertEqual(expect, sio.getvalue())
        sio = StringIO()
        json.dump(t, sio, tuple_as_array=True, default=repr)
        self.assertEqual(expect, sio.getvalue())
        # Ensure that the "default" gets called
        sio = StringIO()
        json.dump(t, sio, tuple_as_array=False, default=repr)
        self.assertEqual(
            json.dumps(repr(t)),
            sio.getvalue())
项目:oscars2016    作者:0x0ece    | 项目源码 | 文件源码
def test_listrecursion(self):
        x = []
        x.append(x)
        try:
            json.dumps(x)
        except ValueError:
            pass
        else:
            self.fail("didn't raise ValueError on list recursion")
        x = []
        y = [x]
        x.append(y)
        try:
            json.dumps(x)
        except ValueError:
            pass
        else:
            self.fail("didn't raise ValueError on alternating list recursion")
        y = []
        x = [y, y]
        # ensure that the marker is cleared
        json.dumps(x)
项目:os-xenapi    作者:openstack    | 项目源码 | 文件源码
def run_command(session, args):
    cmd = json.loads(args.get('cmd'))
    if cmd and cmd[0] not in ALLOWED_CMDS:
        msg = _("Dom0 execution of '%s' is not permitted") % cmd[0]
        raise PluginError(msg)
    returncode, out, err = _run_command(
        cmd, json.loads(args.get('cmd_input', 'null')))
    if not err:
        err = ""
    if not out:
        out = ""
    # This runs in Dom0, will return to neutron-ovs-agent in compute node
    result = {'returncode': returncode,
              'out': out,
              'err': err}
    return json.dumps(result)
项目:os-xenapi    作者:openstack    | 项目源码 | 文件源码
def key_init(self, arg_dict):
    """Handles the Diffie-Hellman key exchange with the agent to

    establish the shared secret key used to encrypt/decrypt sensitive
    info to be passed, such as passwords. Returns the shared
    secret key value.
    """
    timeout = int(arg_dict.pop('timeout', DEFAULT_TIMEOUT))
    # WARNING: Some older Windows agents will crash if the public key isn't
    # a string
    pub = arg_dict["pub"]
    arg_dict["value"] = json.dumps({"name": "keyinit", "value": pub})
    request_id = arg_dict["id"]
    arg_dict["path"] = "data/host/%s" % request_id
    xenstore.write_record(self, arg_dict)
    try:
        resp = _wait_for_agent(self, request_id, arg_dict, timeout)
    except TimeoutError, e:  # noqa
        raise PluginError(e)
    return resp
项目:os-xenapi    作者:openstack    | 项目源码 | 文件源码
def password(self, arg_dict):
    """Writes a request to xenstore that tells the agent to set

    the root password for the given VM. The password should be
    encrypted using the shared secret key that was returned by a
    previous call to key_init. The encrypted password value should
    be passed as the value for the 'enc_pass' key in arg_dict.
    """
    timeout = int(arg_dict.pop('timeout', DEFAULT_TIMEOUT))
    enc_pass = arg_dict["enc_pass"]
    arg_dict["value"] = json.dumps({"name": "password", "value": enc_pass})
    request_id = arg_dict["id"]
    arg_dict["path"] = "data/host/%s" % request_id
    xenstore.write_record(self, arg_dict)
    try:
        resp = _wait_for_agent(self, request_id, arg_dict, timeout)
    except TimeoutError, e:  # noqa
        raise PluginError(e)
    return resp
项目:os-xenapi    作者:openstack    | 项目源码 | 文件源码
def agent_update(self, arg_dict):
    """Expects an URL and md5sum of the contents

     Then directs the agent to update itself.
    """
    timeout = int(arg_dict.pop('timeout', DEFAULT_TIMEOUT))
    request_id = arg_dict["id"]
    url = arg_dict["url"]
    md5sum = arg_dict["md5sum"]
    arg_dict["value"] = json.dumps({"name": "agentupdate",
                                    "value": "%s,%s" % (url, md5sum)})
    arg_dict["path"] = "data/host/%s" % request_id
    xenstore.write_record(self, arg_dict)
    try:
        resp = _wait_for_agent(self, request_id, arg_dict, timeout)
    except TimeoutError, e:  # noqa
        raise PluginError(e)
    return resp
项目:os-xenapi    作者:openstack    | 项目源码 | 文件源码
def _get_agent_features(self, arg_dict):
    """Return an array of features that an agent supports."""
    timeout = int(arg_dict.pop('timeout', DEFAULT_TIMEOUT))
    tmp_id = commands.getoutput("uuidgen")
    dct = {}
    dct.update(arg_dict)
    dct["value"] = json.dumps({"name": "features", "value": ""})
    dct["path"] = "data/host/%s" % tmp_id
    xenstore.write_record(self, dct)
    try:
        resp = _wait_for_agent(self, tmp_id, dct, timeout)
    except TimeoutError, e:  # noqa
        raise PluginError(e)
    response = json.loads(resp)
    if response['returncode'] != 0:
        return response["message"].split(",")
    else:
        return {}
项目:os-xenapi    作者:openstack    | 项目源码 | 文件源码
def test_iptables_config(self, mock_json_loads):
        self.mock_patch_object(self.pluginlib,
                               'exists',
                               'fake_cmd_args')
        self.mock_patch_object(self.pluginlib,
                               'optional',
                               'fake_cmd_pro_input')
        self.host._run_command.return_value = 'fake_run_cmd_resule'
        mock_json_loads.return_value = ['iptables-save']
        expected = json.dumps(dict(out='fake_run_cmd_resule', err=''))

        ret_str = self.host.iptables_config('fake_session', 'fake_args')
        self.pluginlib.exists.assert_called_once()
        self.pluginlib.optional.assert_called_once()
        mock_json_loads.assert_called_with('fake_cmd_args')
        self.assertEqual(ret_str, expected)
        self.host._run_command.assert_called_with(
            map(str, ['iptables-save']), 'fake_cmd_pro_input')
项目:os-xenapi    作者:openstack    | 项目源码 | 文件源码
def test_version_timout_exception(self):
        tmp_arg_dict = FAKE_ARG_DICT
        tmp_arg_dict["value"] = json.dumps({"name": "version",
                                            "value": "agent"})
        request_id = tmp_arg_dict["id"]
        tmp_arg_dict["path"] = "data/host/%s" % request_id
        side_effects = [FakeTimeoutException('TIME_OUT')]
        self.agent.PluginError = FakeTimeoutException
        self.agent._wait_for_agent.side_effect = side_effects

        self.assertRaises(self.agent.PluginError,
                          self.agent.version,
                          self.agent, FAKE_ARG_DICT)
        self.agent._wait_for_agent.assert_called_once()
        self.agent.xenstore.write_record.assert_called_with(self.agent,
                                                            tmp_arg_dict)
项目:os-xenapi    作者:openstack    | 项目源码 | 文件源码
def test_password_timout_exception(self):
        tmp_arg_dict = FAKE_ARG_DICT
        enc_pass = tmp_arg_dict["enc_pass"]
        tmp_arg_dict["value"] = json.dumps({"name": "password",
                                            "value": enc_pass})
        request_id = tmp_arg_dict["id"]
        tmp_arg_dict["path"] = "data/host/%s" % request_id
        side_effects = [FakeTimeoutException('TIME_OUT')]
        self.agent.PluginError = FakeTimeoutException
        self.agent._wait_for_agent.side_effect = side_effects

        self.assertRaises(self.agent.PluginError,
                          self.agent.password,
                          self.agent, FAKE_ARG_DICT)
        self.agent._wait_for_agent.assert_called_once()
        self.agent.xenstore.write_record.assert_called_with(self.agent,
                                                            tmp_arg_dict)
项目:os-xenapi    作者:openstack    | 项目源码 | 文件源码
def test_reset_network_timout_exception(self):
        tmp_arg_dict = FAKE_ARG_DICT
        tmp_arg_dict['value'] = json.dumps({'name': 'resetnetwork',
                                            'value': ''})
        request_id = tmp_arg_dict['id']
        tmp_arg_dict['path'] = "data/host/%s" % request_id
        side_effects = [FakeTimeoutException('TIME_OUT')]
        self.agent.PluginError = FakeTimeoutException
        self.agent._wait_for_agent.side_effect = side_effects

        self.assertRaises(self.agent.PluginError,
                          self.agent.resetnetwork,
                          self.agent, FAKE_ARG_DICT)
        self.agent._wait_for_agent.assert_called_once()
        self.agent.xenstore.write_record.assert_called_with(self.agent,
                                                            tmp_arg_dict)
项目:os-xenapi    作者:openstack    | 项目源码 | 文件源码
def test_inject_file_with_new_agent(self):
        tmp_arg_dict = FAKE_ARG_DICT
        request_id = tmp_arg_dict["id"]
        b64_path = tmp_arg_dict["b64_path"]
        b64_file = tmp_arg_dict["b64_contents"]
        self.mock_patch_object(self.agent,
                               '_get_agent_features',
                               'file_inject')
        tmp_arg_dict["value"] = json.dumps({"name": "file_inject",
                                            "value": {"b64_path": b64_path,
                                                      "b64_file": b64_file}})
        tmp_arg_dict["path"] = "data/host/%s" % request_id
        self.agent.inject_file(self.agent, FAKE_ARG_DICT)

        self.agent._wait_for_agent.assert_called_once()
        self.agent.xenstore.write_record.assert_called_with(self.agent,
                                                            tmp_arg_dict)
        self.agent._get_agent_features.assert_called_once()
项目:os-xenapi    作者:openstack    | 项目源码 | 文件源码
def test_inject_file_Timeout_exception(self):
        tmp_arg_dict = FAKE_ARG_DICT
        request_id = tmp_arg_dict["id"]
        b64_path = tmp_arg_dict["b64_path"]
        b64_file = tmp_arg_dict["b64_contents"]
        tmp_arg_dict["value"] = json.dumps({"name": "file_inject",
                                            "value": {"b64_path": b64_path,
                                                      "b64_file": b64_file}})
        tmp_arg_dict["path"] = "data/host/%s" % request_id
        self.mock_patch_object(self.agent,
                               '_get_agent_features',
                               'file_inject')
        side_effects = [FakeTimeoutException('TIME_OUT')]
        self.agent.PluginError = FakeTimeoutException
        self.agent._wait_for_agent.side_effect = side_effects

        self.assertRaises(self.agent.PluginError,
                          self.agent.inject_file,
                          self.agent, FAKE_ARG_DICT)
        self.agent._wait_for_agent.assert_called_once()
        self.agent.xenstore.write_record.assert_called_with(self.agent,
                                                            tmp_arg_dict)
项目:os-xenapi    作者:openstack    | 项目源码 | 文件源码
def test_agent_update_timout_exception(self):
        tmp_arg_dict = FAKE_ARG_DICT
        request_id = tmp_arg_dict["id"]
        url = tmp_arg_dict["url"]
        md5sum = tmp_arg_dict["md5sum"]
        tmp_arg_dict["value"] = json.dumps({"name": "agentupdate",
                                            "value": "%s,%s" % (url, md5sum)})
        tmp_arg_dict["path"] = "data/host/%s" % request_id
        side_effects = [FakeTimeoutException('TIME_OUT')]
        self.agent.PluginError = FakeTimeoutException
        self.agent._wait_for_agent.side_effect = side_effects

        self.assertRaises(self.agent.PluginError,
                          self.agent.agent_update,
                          self.agent, FAKE_ARG_DICT)
        self.agent._wait_for_agent.assert_called_once()
        self.agent.xenstore.write_record.assert_called_once()
项目:Flask_Blog    作者:sugarguo    | 项目源码 | 文件源码
def generate_adapter(adapter, name='url_for', map_name='url_map'):
    """Generates the url building function for a map."""
    values = {
        u'server_name':     dumps(adapter.server_name),
        u'script_name':     dumps(adapter.script_name),
        u'subdomain':       dumps(adapter.subdomain),
        u'url_scheme':      dumps(adapter.url_scheme),
        u'name':            name,
        u'map_name':        map_name
    }
    return u'''\
var %(name)s = %(map_name)s(
    %(server_name)s,
    %(script_name)s,
    %(subdomain)s,
    %(url_scheme)s
);''' % values
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def writerow(self, row):
        """Write row."""
        line = []
        for s in row:
            if (type(s) == dict):
                line.append(json.dumps(s))
            else:
                line.append(unicode(s).encode("utf-8"))
        self.writer.writerow(line)
        # Fetch UTF-8 output from the queue ...
        data = self.queue.getvalue()
        data = data.decode("utf-8")
        # ... and reencode it into the target encoding
        data = self.encoder.encode(data)
        # write to the target stream
        self.stream.write(data)
        # empty queue
        self.queue.truncate(0)
项目:FRG-Crowdsourcing    作者:97amarnathk    | 项目源码 | 文件源码
def get_disqus_sso_payload(user):
    """Return remote_auth_s3 and api_key for user."""
    DISQUS_PUBLIC_KEY = current_app.config.get('DISQUS_PUBLIC_KEY')
    DISQUS_SECRET_KEY = current_app.config.get('DISQUS_SECRET_KEY')
    if DISQUS_PUBLIC_KEY and DISQUS_SECRET_KEY:
        if user:
            data = simplejson.dumps({
                'id': user.id,
                'username': user.name,
                'email': user.email_addr,
            })
        else:
            data = simplejson.dumps({})
        # encode the data to base64
        message = base64.b64encode(data)
        # generate a timestamp for signing the message
        timestamp = int(time.time())
        # generate our hmac signature
        sig = hmac.HMAC(DISQUS_SECRET_KEY, '%s %s' % (message, timestamp),
                        hashlib.sha1).hexdigest()

        return message, timestamp, sig, DISQUS_PUBLIC_KEY
    else:
        return None, None, None, None
项目:swjtu-pyscraper    作者:Desgard    | 项目源码 | 文件源码
def generate_adapter(adapter, name='url_for', map_name='url_map'):
    """Generates the url building function for a map."""
    values = {
        u'server_name':     dumps(adapter.server_name),
        u'script_name':     dumps(adapter.script_name),
        u'subdomain':       dumps(adapter.subdomain),
        u'url_scheme':      dumps(adapter.url_scheme),
        u'name':            name,
        u'map_name':        map_name
    }
    return u'''\
var %(name)s = %(map_name)s(
    %(server_name)s,
    %(script_name)s,
    %(subdomain)s,
    %(url_scheme)s
);''' % values
项目:gemstone    作者:vladcalin    | 项目源码 | 文件源码
def test_rpc_call_with_positional_parameters(http_client, base_url):
    base_url += "/api"
    body = json.dumps({"jsonrpc": "2.0", "method": "subtract", "params": [42, 23], "id": 1})
    result = yield http_client.fetch(base_url, method="POST", body=body,
                                     headers={"content-type": "application/json"})

    assert result.code == 200
    response_body = json.loads(result.body)
    assert response_body["jsonrpc"] == "2.0"
    assert response_body["result"] == 19
    assert response_body["id"] == 1

    body = json.dumps({"jsonrpc": "2.0", "method": "subtract", "params": [23, 42], "id": 2})
    result = yield http_client.fetch(base_url, method="POST", body=body,
                                     headers={"content-type": "application/json"})

    assert result.code == 200
    response_body = json.loads(result.body)
    assert response_body["jsonrpc"] == "2.0"
    assert response_body["result"] == -19
    assert response_body["id"] == 2
项目:gemstone    作者:vladcalin    | 项目源码 | 文件源码
def test_rpc_call_with_named_parameters(http_client, base_url):
    base_url += "/api"
    body = json.dumps(
        {"jsonrpc": "2.0", "method": "subtract", "params": {"a": 42, "b": 23}, "id": 3})
    result = yield http_client.fetch(base_url, method="POST", body=body,
                                     headers={"content-type": "application/json"})
    assert result.code == 200
    response_body = json.loads(result.body)
    assert response_body["jsonrpc"] == "2.0"
    assert response_body["result"] == 19
    assert response_body["id"] == 3

    body = json.dumps(
        {"jsonrpc": "2.0", "method": "subtract", "params": {"b": 42, "a": 23}, "id": 4})
    result = yield http_client.fetch(base_url, method="POST", body=body,
                                     headers={"content-type": "application/json"})
    assert result.code == 200
    response_body = json.loads(result.body)
    assert response_body["jsonrpc"] == "2.0"
    assert response_body["result"] == -19
    assert response_body["id"] == 4
项目:gemstone    作者:vladcalin    | 项目源码 | 文件源码
def test_simple_call_no_parameters(http_client, base_url):
    base_url += "/api"
    body = {
        "id": 1,
        "method": "say_hello",
        "jsonrpc": "2.0"
    }

    result = yield http_client.fetch(base_url, method="POST", body=json.dumps(body),
                                     headers={"content-type": "application/json"})
    assert result.code == 200
    resp_body = json.loads(result.body)

    assert resp_body["id"] == 1
    assert resp_body["error"] is None
    assert resp_body["result"] == "hello"
    assert resp_body["jsonrpc"] == "2.0"
项目:gemstone    作者:vladcalin    | 项目源码 | 文件源码
def test_simple_call_keyword_arguments(http_client, base_url):
    base_url += "/api"
    body = {
        "id": 1,
        "method": "subtract",
        "params": {"b": 20, "a": 10},
        "jsonrpc": "2.0"
    }

    result = yield http_client.fetch(base_url, method="POST", body=json.dumps(body),
                                     headers={"content-type": "application/json"})
    assert result.code == 200
    resp_body = json.loads(result.body)

    assert resp_body["id"] == 1
    assert resp_body["error"] is None
    assert resp_body["result"] == -10
    assert resp_body["jsonrpc"] == "2.0"
项目:gemstone    作者:vladcalin    | 项目源码 | 文件源码
def test_simple_call_variable_length_positional_arguments(http_client, base_url):
    base_url += "/api"
    body = {
        "id": 1,
        "method": "sum",
        "params": list(range(0, 100)),
        "jsonrpc": "2.0"
    }

    result = yield http_client.fetch(base_url, method="POST", body=json.dumps(body),
                                     headers={"content-type": "application/json"})
    assert result.code == 200
    resp_body = json.loads(result.body)

    assert resp_body["id"] == 1
    assert resp_body["error"] is None
    assert resp_body["result"] == 4950
    assert resp_body["jsonrpc"] == "2.0"
项目:gemstone    作者:vladcalin    | 项目源码 | 文件源码
def test_simple_call_raise_exception(http_client, base_url):
    base_url += "/api"
    body = {
        "id": 1,
        "method": "divide",
        "params": {"a": 10, "b": 0},
        "jsonrpc": "2.0"
    }

    result = yield http_client.fetch(base_url, method="POST", body=json.dumps(body),
                                     headers={"content-type": "application/json"})
    assert result.code == 200
    resp_body = json.loads(result.body)

    assert resp_body["id"] == 1
    assert resp_body["result"] is None
    assert resp_body["error"] is not None
    assert resp_body["error"]["code"] == -32603
    assert resp_body["error"]["message"] == "Internal error"
    assert resp_body["error"]["data"]["class"] == "ZeroDivisionError"
    assert resp_body["error"]["data"]["info"] == "division by zero"
    assert resp_body["jsonrpc"] == "2.0"
项目:gemstone    作者:vladcalin    | 项目源码 | 文件源码
def test_simple_call_wrong_arguments_too_few_positional_arguments(http_client, base_url):
    base_url += "/api"
    body = {
        "id": 1,
        "method": "subtract",
        "params": [10],
        "jsonrpc": "2.0"
    }

    result = yield http_client.fetch(base_url, method="POST", body=json.dumps(body),
                                     headers={"content-type": "application/json"})
    assert result.code == 200
    resp_body = json.loads(result.body)
    print(resp_body)
    assert resp_body["id"] == 1
    assert resp_body["result"] is None
    assert resp_body["error"] is not None
    assert resp_body["error"]["code"] == -32602
    assert resp_body["error"]["message"] == "Invalid params"
    assert resp_body["jsonrpc"] == "2.0"
项目:gemstone    作者:vladcalin    | 项目源码 | 文件源码
def test_simple_call_wrong_arguments_too_many_positional_arguments(http_client, base_url):
    base_url += "/api"
    body = {
        "id": 1,
        "method": "subtract",
        "params": [10, 20, 30],
        "jsonrpc": "2.0"
    }

    result = yield http_client.fetch(base_url, method="POST", body=json.dumps(body),
                                     headers={"content-type": "application/json"})
    assert result.code == 200
    resp_body = json.loads(result.body)
    print(resp_body)
    assert resp_body["id"] == 1
    assert resp_body["result"] is None
    assert resp_body["error"] is not None
    assert resp_body["error"]["code"] == -32602
    assert resp_body["error"]["message"] == "Invalid params"
    assert resp_body["jsonrpc"] == "2.0"
项目:gemstone    作者:vladcalin    | 项目源码 | 文件源码
def test_simple_call_wrong_arguments_too_many_keyword_arguments(http_client, base_url):
    base_url += "/api"
    body = {
        "id": 1,
        "method": "subtract",
        "params": {"a": 10, "b": 20, "c": 30},
        "jsonrpc": "2.0"
    }

    result = yield http_client.fetch(base_url, method="POST", body=json.dumps(body),
                                     headers={"content-type": "application/json"})
    assert result.code == 200
    resp_body = json.loads(result.body)
    print(resp_body)
    assert resp_body["id"] == 1
    assert resp_body["result"] is None
    assert resp_body["error"] is not None
    assert resp_body["error"]["code"] == -32602
    assert resp_body["error"]["message"] == "Invalid params"
    assert resp_body["jsonrpc"] == "2.0"
项目:gemstone    作者:vladcalin    | 项目源码 | 文件源码
def test_simple_call_wrong_arguments_no_arguments(http_client, base_url):
    base_url += "/api"
    body = {
        "id": 1,
        "method": "subtract",
        "jsonrpc": "2.0"
    }

    result = yield http_client.fetch(base_url, method="POST", body=json.dumps(body),
                                     headers={"content-type": "application/json"})
    assert result.code == 200
    resp_body = json.loads(result.body)
    print(resp_body)
    assert resp_body["id"] == 1
    assert resp_body["result"] is None
    assert resp_body["error"] is not None
    assert resp_body["error"]["code"] == -32602
    assert resp_body["error"]["message"] == "Invalid params"
    assert resp_body["jsonrpc"] == "2.0"
项目:gemstone    作者:vladcalin    | 项目源码 | 文件源码
def test_notification_valid(http_client, base_url):
    base_url += "/api"
    body = {
        "method": "subtract",
        "params": [20, 10],
        "jsonrpc": "2.0"
    }

    result = yield http_client.fetch(base_url, method="POST", body=json.dumps(body),
                                     headers={"content-type": "application/json"})
    assert result.code == 200
    resp_body = json.loads(result.body)
    print(resp_body)
    assert "id" not in resp_body
    assert resp_body["result"] is None
    assert resp_body["error"] is None
    assert resp_body["jsonrpc"] == "2.0"
项目:gemstone    作者:vladcalin    | 项目源码 | 文件源码
def test_notification_method_not_found(http_client, base_url):
    base_url += "/api"
    body = {
        "method": "foobar",
        "params": [20, 10],
        "jsonrpc": "2.0"
    }

    result = yield http_client.fetch(base_url, method="POST", body=json.dumps(body),
                                     headers={"content-type": "application/json"})
    assert result.code == 200
    resp_body = json.loads(result.body)
    print(resp_body)
    assert "id" not in resp_body
    assert resp_body["result"] is None
    assert resp_body["error"] is None
    assert resp_body["jsonrpc"] == "2.0"
项目:gemstone    作者:vladcalin    | 项目源码 | 文件源码
def test_notification_invalid_params(http_client, base_url):
    base_url += "/api"
    body = {
        "method": "divide",
        "params": [20, 10, 30],
        "jsonrpc": "2.0"
    }

    result = yield http_client.fetch(base_url, method="POST", body=json.dumps(body),
                                     headers={"content-type": "application/json"})
    assert result.code == 200
    resp_body = json.loads(result.body)
    print(resp_body)
    assert "id" not in resp_body
    assert resp_body["result"] is None
    assert resp_body["error"] is None
    assert resp_body["jsonrpc"] == "2.0"
项目:gemstone    作者:vladcalin    | 项目源码 | 文件源码
def test_private_call_wrong_token(http_client, base_url):
    base_url += "/api"
    body = {
        "id": 3,
        "method": "private_sum",
        "params": [13, -13],
        "jsonrpc": "2.0"
    }

    result = yield http_client.fetch(base_url, method="POST", body=json.dumps(body),
                                     headers={"content-type": "application/json",
                                              "X-Testing-Token": "bad_token"})
    assert result.code == 200
    resp_body = json.loads(result.body)
    print(resp_body)
    assert resp_body["id"] == 3
    assert resp_body["result"] is None
    assert resp_body["error"] is not None
    assert resp_body["error"]["code"] == -32001
    assert resp_body["error"]["message"] == "Access denied"
    assert resp_body["jsonrpc"] == "2.0"
项目:gemstone    作者:vladcalin    | 项目源码 | 文件源码
def test_private_call_wrong_token_and_wrong_parameters(http_client, base_url):
    # without a valid token, somebody should not be able to get information
    # about certain methods, such as the parameters
    base_url += "/api"
    body = {
        "id": 3,
        "method": "private_sum",
        "params": [13],
        "jsonrpc": "2.0"
    }

    result = yield http_client.fetch(base_url, method="POST", body=json.dumps(body),
                                     headers={"content-type": "application/json",
                                              "X-Testing-Token": "bad_token"})
    assert result.code == 200
    resp_body = json.loads(result.body)
    print(resp_body)
    assert resp_body["id"] == 3
    assert resp_body["result"] is None
    assert resp_body["error"] is not None
    assert resp_body["error"]["code"] == -32001
    assert resp_body["error"]["message"] == "Access denied"
    assert resp_body["jsonrpc"] == "2.0"
项目:gemstone    作者:vladcalin    | 项目源码 | 文件源码
def test_private_call_ok_token(http_client, base_url):
    # without a valid token, somebody should not be able to get information
    # about certain methods, such as the parameters
    base_url += "/api"
    body = {
        "id": 3,
        "method": "private_sum",
        "params": [13, -13],
        "jsonrpc": "2.0"
    }

    result = yield http_client.fetch(base_url, method="POST", body=json.dumps(body),
                                     headers={"content-type": "application/json",
                                              "X-Testing-Token": "testing_token"})
    assert result.code == 200
    resp_body = json.loads(result.body)
    print(resp_body)
    assert resp_body["id"] == 3
    assert resp_body["result"] == 0
    assert resp_body["error"] is None
    assert resp_body["jsonrpc"] == "2.0"
项目:gemstone    作者:vladcalin    | 项目源码 | 文件源码
def test_private_call_ok_token_wrong_parameters(http_client, base_url):
    # without a valid token, somebody should not be able to get information
    # about certain methods, such as the parameters
    base_url += "/api"
    body = {
        "id": 3,
        "method": "private_sum",
        "params": [13],
        "jsonrpc": "2.0"
    }

    result = yield http_client.fetch(base_url, method="POST", body=json.dumps(body),
                                     headers={"content-type": "application/json",
                                              "X-Testing-Token": "testing_token"})
    assert result.code == 200
    resp_body = json.loads(result.body)
    print(resp_body)
    assert resp_body["id"] == 3
    assert resp_body["result"] is None
    assert resp_body["error"] is not None
    assert resp_body["error"]["code"] == -32602
    assert resp_body["error"]["message"] == "Invalid params"
    assert resp_body["jsonrpc"] == "2.0"
项目:OpsManage    作者:welliamcao    | 项目源码 | 文件源码
def connect(self, message):
        try:
            webssh = Global_Config.objects.get(id=1).webssh
        except Exception,ex:
            webssh = 0
        if webssh != 1:
            self.message.reply_channel.send({"text":json.dumps(['stdout','\033[1;3;31m????????WebSSH??~\033[0m'])},immediately=True)  
            self.disconnect(message)            
        if self.message.user:
            #??????
            user = User.objects.get(username=self.message.user)    
            if user.is_superuser:
                self.message.reply_channel.send({"accept": True})
            else:
                try:
                    sid = int(self.message.content.get('query_string').split('=')[1])
                except Exception:
                    self.message.reply_channel.send({"text":json.dumps(['stdout','\033[1;3;31m"???????????"\033[0m'])},immediately=True)  
                    self.disconnect(message) 
                server = User_Server.objects.get(user_id=user.id,server_id=sid)   
                if server:self.message.reply_channel.send({"accept": True})                          
        else:
            self.disconnect(message)
项目:OpsManage    作者:welliamcao    | 项目源码 | 文件源码
def posix_shell(chan,channel,log_name=None,width=90,height=40):
    from OpsManage.asgi import channel_layer
    stdout = list()
    begin_time = time.time()
    last_write_time = {'last_activity_time':begin_time}    
    try:
        chan.settimeout(0.0)
        while True:
            try:               
                x = u(chan.recv(1024))
                if len(x) == 0:
                    channel_layer.send(channel, {'text': json.dumps(['disconnect',smart_unicode('\r\n*** EOF\r\n')]) })
                    break
                now = time.time()
                delay = now - last_write_time['last_activity_time']
                last_write_time['last_activity_time'] = now                
                if x == "exit\r\n" or x == "logout\r\n" or x == 'logout':
                    chan.close()
                else:
                    stdout.append([delay,codecs.getincrementaldecoder('UTF-8')('replace').decode(x)]) 
                channel_layer.send(channel, {'text': json.dumps(['stdout',smart_unicode(x)]) })
            except socket.timeout:
                pass
            except Exception,e:
                channel_layer.send(channel, {'text': json.dumps(['stdout','A bug find,You can report it to me' + smart_unicode(e)]) })

    finally:
        pass
项目:health-mosconi    作者:GNUHealth-Mosconi    | 项目源码 | 文件源码
def set(cls, model, domain, child_name, nodes, selected_nodes):
        # Normalize the json domain
        domain = json.dumps(json.loads(domain))
        current_user = Transaction().user
        records = cls.search([
                ('user', '=', current_user),
                ('model', '=', model),
                ('domain', '=', domain),
                ('child_name', '=', child_name),
                ])
        cls.delete(records)
        cls.create([{
                    'user': current_user,
                    'model': model,
                    'domain': domain,
                    'child_name': child_name,
                    'nodes': nodes,
                    'selected_nodes': selected_nodes,
                    }])
项目:health-mosconi    作者:GNUHealth-Mosconi    | 项目源码 | 文件源码
def get(cls, model, domain, child_name):
        # Normalize the json domain
        domain = json.dumps(json.loads(domain))
        current_user = Transaction().user
        try:
            expanded_info, = cls.search([
                    ('user', '=', current_user),
                    ('model', '=', model),
                    ('domain', '=', domain),
                    ('child_name', '=', child_name),
                    ],
                limit=1)
        except ValueError:
            return (cls.default_nodes(), cls.default_selected_nodes())
        state = cls(expanded_info)
        return (state.nodes or cls.default_nodes(),
            state.selected_nodes or cls.default_selected_nodes())
项目:devops-playground    作者:jerrywardlow    | 项目源码 | 文件源码
def write_to_cache(self):
        ''' Writes data in JSON format to a file '''
        data = {'data': self.data, 'inventory': self.inventory}
        json_data = json.dumps(data, sort_keys=True, indent=2)

        cache = open(self.cache_filename, 'w')
        cache.write(json_data)
        cache.close()

    ###########################################################################
    # Utilities
    ###########################################################################
项目:devops-playground    作者:jerrywardlow    | 项目源码 | 文件源码
def json_format_dict(self, data, pretty=False):
        ''' Converts a dict to a JSON object and dumps it as a formatted
        string '''

        if pretty:
            return json.dumps(data, sort_keys=True, indent=2)
        else:
            return json.dumps(data)


# Run the script
项目:devops-playground    作者:jerrywardlow    | 项目源码 | 文件源码
def json_format_dict(self, data, pretty=False):
        ''' Converts a dict to a JSON object and dumps it as a formatted
        string '''

        if pretty:
            return json.dumps(data, sort_keys=True, indent=2)
        else:
            return json.dumps(data)


# Run the script