Python json 模块,update() 实例源码

我们从Python开源项目中,提取了以下13个代码示例,用于说明 update() 的用法(注意:标准库 json 模块本身并没有 update() 函数,这些示例大多是在名为 json 的字典变量上调用 dict.update())

项目:smc-python    作者:gabstopper    | 项目源码 | 文件源码
def import_submodules(package, recursive=True):
    """
    Import all submodules of a package and return them keyed by dotted name.

    From http://stackoverflow.com/questions/3365740/how-to-import-all-submodules

    :param package: package (name or actual module)
    :type package: str | module
    :param bool recursive: when True, subpackages are imported recursively
        and their submodules are merged into the result
    :return: mapping of fully qualified module name to the imported module
    :rtype: dict[str, types.ModuleType]
    """
    import importlib
    import pkgutil
    if isinstance(package, str):
        package = importlib.import_module(package)
    results = {}
    # iter_modules lists only the direct children of the package; descending
    # into subpackages is handled by the explicit recursion below, so that
    # lookups are always done with the fully qualified name.
    for _finder, name, is_pkg in pkgutil.iter_modules(package.__path__):
        full_name = package.__name__ + '.' + name
        results[full_name] = importlib.import_module(full_name)
        if recursive and is_pkg:
            results.update(import_submodules(full_name))
    # Without this return the function yields None and the recursive
    # results.update(...) call above fails.
    return results
项目:viewX-vscode    作者:danielkupco    | 项目源码 | 文件源码
def serialize_json(obj):
    """
    Turn a Cytoscape.js element (or related object) into a JSON-friendly value.

    :param obj: Object to serialize
    :return: a dict copy of the instance attributes (without
        ``__metaclass__``) for objects whose ``__metaclass__`` is named
        ``Element``; the string ``'Element'`` for the ``Element`` class
        itself; a dict copy of the instance attributes for ``ViewStyle``
        objects; ``obj.__str__()`` for everything else
    """
    # concrete element: copy the attributes and drop the metaclass marker
    if hasattr(obj, '__metaclass__') and obj.__metaclass__.__name__ == 'Element':
        attributes = dict(vars(obj))
        attributes.pop('__metaclass__', None)
        return attributes

    # the abstract Element class itself is represented by its name
    if obj.__class__.__name__ == 'type' and obj.__name__ == 'Element':
        return obj.__name__

    # view styles are serialized as a plain copy of their attributes
    if obj.__class__.__name__ == 'ViewStyle':
        return dict(vars(obj))

    return obj.__str__()
项目:activitypub-example    作者:tOkeshu    | 项目源码 | 文件源码
def to_activitystream(self):
    """
    Return a dict describing this object as a ``Person``.

    The dict always carries ``type``, ``id``, ``name`` and
    ``preferredUsername``. When ``self.remote`` is falsy the
    ``following``, ``followers``, ``outbox`` and ``inbox`` URIs are added.
    """
    person = dict(
        type="Person",
        id=self.uris.id,
        name=self.name,
        preferredUsername=self.username,
    )

    if self.remote:
        return person

    # collection endpoints are only emitted for non-remote objects
    uris = self.uris
    person.update(
        following=uris.following,
        followers=uris.followers,
        outbox=uris.outbox,
        inbox=uris.inbox,
    )
    return person
项目:sndlatr    作者:Schibum    | 项目源码 | 文件源码
def _encode_send_job(self, job):
        json = self._encode_base_job(job)
        json.update({
            'subject': job.subject,
            'messageId': job.message_id
        })
        return json
项目:sndlatr    作者:Schibum    | 项目源码 | 文件源码
def _encode_remind_job(self, job):
        json = self._encode_base_job(job)
        json.update({
            'subject': job.subject,
            'threadId': job.thread_id,
            'knownMessageIds': job.known_message_ids,
            'onlyIfNoreply': job.only_if_noreply,
            'disabledReply': job.disabled_reply
        })
        return json
项目:smc-python    作者:gabstopper    | 项目源码 | 文件源码
def switch_domain(self, domain):
    """
    Switch the session to another domain.

    Nothing happens when ``domain`` is already the current domain. If a
    session for ``domain`` is already stored in ``self._sessions`` it
    becomes the active session; otherwise a new login is performed using
    the current login parameters with ``domain`` substituted. The user
    must have permissions to the domain or unauthorized will be returned.
    ::

        session.login() # Log in to 'Shared Domain'
        ...
        session.switch_domain('MyDomain')

    :raises SMCConnectionError: Error logging in to specified domain.
        This typically means the domain either doesn't exist or the
        user does not have privileges to that domain.
    """
    if self.domain == domain:
        return

    if domain in self._sessions:
        # reuse the session that was previously created for this domain
        logger.info('Switching to existing domain session: %s', domain)
        self._session = self._sessions.get(domain)
        self._domain = domain
        return

    # no session yet: log in with the same parameters but the new domain
    logger.info('Creating session for domain: %s', domain)
    login_kwargs = self._get_login_params()
    login_kwargs.update(domain=domain)
    self.login(**login_kwargs)
项目:smc-python    作者:gabstopper    | 项目源码 | 文件源码
def _get_login_params(self):
        """
        Spec for login parameters
        """
        credentials = dict(
            url=self.url,
            api_version=self.api_version,
            timeout=self.timeout,
            verify=self._session.verify,
            domain=self.domain)
        credentials.update(self.credential.get_credentials())
        credentials.update(**self._extra_args)
        return credentials
项目:coralillo    作者:getfleety    | 项目源码 | 文件源码
def save(self):
    ''' Writes this object through a pipeline: the id, every non-relation
    field (each field stores itself), and the membership in the class
    members set. When ``self.notify`` is set, a 'create' or 'update' event
    is published on both the class key and the object key. Returns self '''
    model = type(self)
    redis = model.get_redis()
    pipe = to_pipeline(redis)

    pipe.hset(self.key(), 'id', self.id)

    for fieldname, field in self.proxy:
        # relations are skipped here
        if isinstance(field, Relation):
            continue
        field.save(getattr(self, fieldname), pipe, commit=False)

    pipe.sadd(model.members_key(), self.id)
    pipe.execute()

    if self.notify:
        # an object that was already persisted is reported as an update
        event = 'update' if self._persisted else 'create'
        message = json.dumps({
            'event': event,
            'data': self.to_json(),
        })
        redis.publish(model.cls_key(), message)
        redis.publish(self.key(), message)

    self._persisted = True

    return self
项目:coralillo    作者:getfleety    | 项目源码 | 文件源码
def update(self, **kwargs):
    ''' Validates the given keyword values against the fillable fields,
    assigns every value that validates, and saves. Fields that are not
    fillable, or whose given value is missing or None, are left untouched.
    Raises the collected validation errors if any field was rejected;
    otherwise returns the result of save() '''
    redis = type(self).get_redis()
    errors = ValidationErrors()

    for fieldname, field in self.proxy:
        if not field.fillable:
            continue

        given = kwargs.get(fieldname)
        if given is None:
            continue

        try:
            value = field.validate(given, redis)
        except BadField as e:
            # keep going so that all bad fields are reported together
            errors.append(e)
        else:
            setattr(self, fieldname, value)

    if errors.has_errors():
        raise errors

    return self.save()
项目:viewX-vscode    作者:danielkupco    | 项目源码 | 文件源码
def add_data(self, key, value):
    """Store ``value`` under ``key`` in ``self.data``, replacing any existing entry."""
    self.data[key] = value
项目:activitypub-example    作者:tOkeshu    | 项目源码 | 文件源码
def to_json(self, **kwargs):
    """
    Serialize via ``Object.to_json`` and add an ``items`` entry.

    Each element of ``self.items`` that is an ``Object`` is replaced by
    its own ``to_json()`` result; other elements are included unchanged.
    """
    def serialize(entry):
        return entry.to_json() if isinstance(entry, Object) else entry

    document = Object.to_json(self, **kwargs)
    document.update(items=[serialize(entry) for entry in self.items])
    return document
项目:activitypub-example    作者:tOkeshu    | 项目源码 | 文件源码
def to_activitystream(self):
    """
    Decode ``self.payload`` (UTF-8 encoded JSON) and return the parsed
    data with its ``id`` set to ``self.uris.id``.
    """
    document = json.loads(self.payload.decode("utf-8"))
    document["id"] = self.uris.id
    return document
项目:coralillo    作者:getfleety    | 项目源码 | 文件源码
def to_json(self, *, fields=None, embed=None):
    ''' Builds a dict representation of this model. ``fields`` optionally
    restricts which keys are included (``id``, ``_type`` and field names);
    private fields and relations are never included as plain fields.
    ``embed`` is parsed with parse_embed and names relations whose related
    objects should be serialized into the result '''
    def wanted(name):
        return fields is None or name in fields

    result = {}

    if wanted('id'):
        result['id'] = self.id

    if wanted('_type'):
        result['_type'] = type(self).cls_key()

    for fieldname, field in self.proxy:
        if field.private or isinstance(field, Relation):
            continue
        if not wanted(fieldname):
            continue
        result[fieldname] = field.to_json(getattr(self, fieldname))

    if embed:
        for relation_name, subfields in parse_embed(embed):
            # names that are not attributes of the proxy are ignored
            if not hasattr(self.proxy, relation_name):
                continue

            relation = getattr(self.proxy, relation_name)

            if isinstance(relation, ForeignIdRelation):
                related = relation.get()
                if related is None:
                    result[relation_name] = None
                else:
                    result[relation_name] = related.to_json(fields=subfields)
            elif isinstance(relation, MultipleRelation):
                result[relation_name] = [
                    related.to_json(fields=subfields)
                    for related in relation.get()
                ]

    return result