我们从 Python 开源项目中提取了以下 13 个代码示例,用于说明如何使用 json.update()。
def import_submodules(package, recursive=True):
    """
    Import all submodules of a module, recursively, including subpackages.

    From http://stackoverflow.com/questions/3365740/how-to-import-all-submodules

    :param package: package (name or actual module)
    :type package: str | module
    :param recursive: when true, also descend into every subpackage found
    :type recursive: bool
    :return: mapping of fully qualified module name to the imported module;
        empty when ``package`` is a plain module without ``__path__``
    :rtype: dict[str, types.ModuleType]
    """
    import importlib
    import pkgutil

    if isinstance(package, str):
        package = importlib.import_module(package)

    # Only packages carry ``__path__``; a plain module has nothing to walk.
    search_path = getattr(package, '__path__', None)
    if search_path is None:
        return {}

    results = {}
    for _loader, name, is_pkg in pkgutil.walk_packages(search_path):
        full_name = package.__name__ + '.' + name
        results[full_name] = importlib.import_module(full_name)
        if recursive and is_pkg:
            results.update(import_submodules(full_name))

    # Returning the dict matters twice over: callers need it, and the
    # recursive call above passes the result straight to ``dict.update``.
    return results
def serialize_json(obj):
    """
    A helper for turning Cytoscape.js elements into JSON-friendly values.

    :param obj: Object to serialize
    :return: a copy of the instance attributes (as a dict) for objects whose
        ``__metaclass__`` is named ``Element`` and for ``ViewStyle``
        instances; the class name when ``obj`` is the ``Element`` class
        itself; ``obj.__str__()`` for anything else
    """
    # Concrete element: copy its attributes and drop the metaclass marker.
    if hasattr(obj, '__metaclass__') and obj.__metaclass__.__name__ == 'Element':
        attributes = dict(vars(obj))
        attributes.pop('__metaclass__', None)
        return attributes

    type_name = obj.__class__.__name__

    # The abstract ``Element`` class itself is represented by its name.
    if type_name == 'type' and obj.__name__ == 'Element':
        return obj.__name__

    if type_name == 'ViewStyle':
        return dict(vars(obj))

    return obj.__str__()
def to_activitystream(self):
    """
    Build the ActivityStreams ``Person`` dict for this object.

    The collection URIs (following, followers, outbox, inbox) are added
    only when ``self.remote`` is falsy.

    :return: dict with the actor's type, id, name and preferred username,
        plus the collection URIs when applicable
    """
    actor = {
        "type": "Person",
        "id": self.uris.id,
        "name": self.name,
        "preferredUsername": self.username,
    }

    if self.remote:
        return actor

    actor.update(
        following=self.uris.following,
        followers=self.uris.followers,
        outbox=self.uris.outbox,
        inbox=self.uris.inbox,
    )
    return actor
def _encode_send_job(self, job): json = self._encode_base_job(job) json.update({ 'subject': job.subject, 'messageId': job.message_id }) return json
def _encode_remind_job(self, job): json = self._encode_base_job(job) json.update({ 'subject': job.subject, 'threadId': job.thread_id, 'knownMessageIds': job.known_message_ids, 'onlyIfNoreply': job.only_if_noreply, 'disabledReply': job.disabled_reply }) return json
def switch_domain(self, domain):
    """
    Switch this session from its current domain to ``domain``.

    Nothing happens when the session is already on ``domain``. Otherwise an
    already-known session for that domain is reused, or a new login is
    performed with the current login parameters and the requested domain.
    You can also call session.login() with a domain key value to log
    directly into the domain of choice. The user must have permissions to
    the domain or unauthorized will be returned.
    ::

        session.login() # Log in to 'Shared Domain'
        ...
        session.switch_domain('MyDomain')

    :param domain: name of the domain to switch to
    :raises SMCConnectionError: Error logging in to specified domain.
        This typically means the domain either doesn't exist or the
        user does not have privileges to that domain.
    """
    if self.domain != domain:
        if domain in self._sessions:
            # A session for this domain already exists: reuse it.
            logger.info('Switching to existing domain session: %s', domain)
            self._session = self._sessions.get(domain)
            self._domain = domain
        else:
            logger.info('Creating session for domain: %s', domain)
            login_params = self._get_login_params()
            login_params.update(domain=domain)
            self.login(**login_params)
def _get_login_params(self): """ Spec for login parameters """ credentials = dict( url=self.url, api_version=self.api_version, timeout=self.timeout, verify=self._session.verify, domain=self.domain) credentials.update(self.credential.get_credentials()) credentials.update(**self._extra_args) return credentials
def save(self):
    '''
    Persist this object to the database and return it.

    The id and every non-relation field are queued on one pipeline (each
    field stores itself through its own ``save``), the id is added to the
    model's members set, and the pipeline is executed. When ``self.notify``
    is truthy a ``create``/``update`` event carrying the JSON form of the
    object is published on both the class key and the object key.
    '''
    model = type(self)
    redis = model.get_redis()
    pipe = to_pipeline(redis)

    pipe.hset(self.key(), 'id', self.id)

    for fieldname, field in self.proxy:
        if isinstance(field, Relation):
            continue
        field.save(getattr(self, fieldname), pipe, commit=False)

    pipe.sadd(model.members_key(), self.id)
    pipe.execute()

    if self.notify:
        # The first successful save is announced as a creation.
        event = 'update' if self._persisted else 'create'
        data = json.dumps({
            'event': event,
            'data': self.to_json(),
        })
        redis.publish(model.cls_key(), data)
        redis.publish(self.key(), data)

    self._persisted = True
    return self
def update(self, **kwargs):
    '''
    Validate the given data against this object's rules, apply it and save.

    Only fillable fields that were supplied with a non-``None`` value are
    considered. Every ``BadField`` raised during validation is collected;
    if any were collected they are raised together and nothing is saved.

    :param kwargs: new values keyed by field name
    :return: the result of ``self.save()``
    :raises ValidationErrors: when at least one field failed validation
    '''
    redis = type(self).get_redis()
    errors = ValidationErrors()

    for fieldname, field in self.proxy:
        given = kwargs.get(fieldname) if field.fillable else None
        if given is None:
            # Not fillable, not supplied, or explicitly None: leave as is.
            continue

        try:
            value = field.validate(given, redis)
        except BadField as bad_field:
            errors.append(bad_field)
        else:
            setattr(self, fieldname, value)

    if errors.has_errors():
        raise errors

    return self.save()
def add_data(self, key, value):
    """
    Store ``value`` under ``key`` in ``self.data``.

    An existing entry for ``key`` is overwritten.
    """
    store = self.data
    store[key] = value
def to_json(self, **kwargs):
    """
    Serialize this object together with its items.

    The base representation comes from ``Object.to_json``; each entry of
    ``self.items`` that is an ``Object`` is serialized through its own
    ``to_json``, any other entry is passed through unchanged.

    :param kwargs: forwarded to ``Object.to_json``
    :return: the base representation extended with an ``"items"`` list
    """
    document = Object.to_json(self, **kwargs)

    serialized_items = []
    for entry in self.items:
        if isinstance(entry, Object):
            serialized_items.append(entry.to_json())
        else:
            serialized_items.append(entry)

    document.update(items=serialized_items)
    return document
def to_activitystream(self):
    """
    Decode the stored payload and stamp it with this object's id.

    ``self.payload`` is decoded as UTF-8 and parsed as JSON; the ``"id"``
    key of the result is then set to ``self.uris.id``.

    :return: the parsed payload with its ``"id"`` overridden
    """
    document = json.loads(self.payload.decode("utf-8"))
    document.update(id=self.uris.id)
    return document
def to_json(self, *, fields=None, embed=None):
    '''
    Serialize this model to a JSON representation so it can be sent via an
    HTTP REST API.

    :param fields: optional collection of names to include; ``None`` means
        every public, non-relation field plus ``id`` and ``_type``
    :param embed: optional embed specification, parsed with ``parse_embed``
        into (relation name, subfields) pairs; relations found on
        ``self.proxy`` are serialized under their name
    :return: dict representation of the model
    '''
    include_all = fields is None
    result = {}

    if include_all or 'id' in fields:
        result['id'] = self.id

    if include_all or '_type' in fields:
        result['_type'] = type(self).cls_key()

    for fieldname, field in self.proxy:
        # Private fields and relations are never serialized here.
        if field.private or isinstance(field, Relation):
            continue
        if not include_all and fieldname not in fields:
            continue
        result[fieldname] = field.to_json(getattr(self, fieldname))

    if not embed:
        return result

    for relation_name, subfields in parse_embed(embed):
        if not hasattr(self.proxy, relation_name):
            continue

        relation = getattr(self.proxy, relation_name)

        if isinstance(relation, ForeignIdRelation):
            item = relation.get()
            # A missing related object is reported as an explicit null.
            result[relation_name] = (
                None if item is None else item.to_json(fields=subfields)
            )
        elif isinstance(relation, MultipleRelation):
            result[relation_name] = [
                member.to_json(fields=subfields) for member in relation.get()
            ]

    return result