The following 50 code examples, extracted from open-source Python projects, illustrate how to use jsonpickle.decode().
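For orientation before the examples, here is a minimal round trip (a sketch; the Point class is made up for illustration). jsonpickle.encode() serializes an object together with its type information, and jsonpickle.decode() uses that information to reconstruct an instance of the original class:

import jsonpickle

class Point:
    """Hypothetical class, used only to illustrate the round trip."""
    def __init__(self, x, y):
        self.x = x
        self.y = y

# encode() records the object's class under the "py/object" key,
# which decode() uses to rebuild a real Point instance.
serialized = jsonpickle.encode(Point(1, 2))
restored = jsonpickle.decode(serialized)
assert isinstance(restored, Point) and restored.x == 1 and restored.y == 2

Several of the examples below (e.g. the Invoice ones) set this "py/object" key by hand so that jsonpickle.decode() rebuilds their model classes from plain JSON.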
def json_deserialize(json, unboxing_function=None):
    """JSON deserialization of a given string.

    Args:
        json (str): The JSON serialized string to deserialize.

    Returns:
        dict: A dictionary representing the data contained in the
            JSON serialized string.
    """
    if json is None:
        return None
    try:
        decoded = jsonpickle.decode(json)
    except Exception:
        return json
    if unboxing_function is None:
        return decoded
    elif isinstance(decoded, list):
        return [unboxing_function(element) for element in decoded]
    else:
        return unboxing_function(decoded)
def json_deserialize(cls, json):
    """JSON deserialization of a given string.

    Args:
        json (str): The JSON serialized string to deserialize.

    Returns:
        dict: A dictionary representing the data contained in the
            JSON serialized string.
    """
    if json is None:
        return None
    return jsonpickle.decode(json)
def convert_to_deploy_azure_vm_resource_model(deployment_request, cloudshell_session, logger):
    """Convert deployment request JSON to the DeployAzureVMResourceModel model

    :param str deployment_request: JSON string
    :param cloudshell.api.cloudshell_api.CloudShellAPISession cloudshell_session: instance
    :param logging.Logger logger:
    :return: deploy_azure_vm_resource_models.DeployAzureVMResourceModel instance
    :rtype: DeployAzureVMResourceModel
    """
    data = jsonpickle.decode(deployment_request)
    deployment_resource_model = DeployAzureVMResourceModel()
    data_attributes = data['Attributes']
    deployment_resource_model.image_offer = data_attributes['Image Offer']
    deployment_resource_model.image_publisher = data_attributes['Image Publisher']
    deployment_resource_model.image_sku = data_attributes['Image SKU']
    deployment_resource_model.image_version = data_attributes['Image Version']
    AzureModelsParser._set_base_deploy_azure_vm_model_params(
        deployment_resource_model=deployment_resource_model,
        data_holder=data,
        cloudshell_session=cloudshell_session,
        logger=logger)
    return deployment_resource_model
def test__str__(self):
    """
    Test whether the __str__ method successfully generates a json string
    representation of the object.
    """
    # Confirm that the string representation of the current object and that
    # of an object decoded from this string representation are equal.
    str_o = str(self.o)
    new_o = jsonpickle.decode(str_o)
    self.assertEqual(str_o, str(new_o))
    # Confirm that the property values are preserved through string
    # encoding and decoding.
    self.assertEqual(new_o.name, 'NameA')
    self.assertEqual(new_o.description, 'DescriptionA')
def handle_message(customer_uuid):
    """Handles messages from a phone number with a body

    @param customer_uuid The UUID of the sender
    @data message_body The body of the message
    """
    data = jsonpickle.decode(request.data.decode("utf-8"))
    message_body = data["message_body"]

    # Load the customer and hand the message to the messaging layer
    customer = Customer.load_from_db(customer_uuid)
    messaging.on_message_recieve(customer, message_body)

    return jsonpickle.encode(dict(
        success=True
    ))
def enforce_request_json_schema(schema, no_extra=False):
    """Decorator that throws an exception if the request data doesn't match the given schema

    @param schema A one level deep dictionary that maps keys to types
    @param no_extra A flag which asserts that the request data had no superfluous keys
    """
    def validate_data(data):
        # dict keys support set comparison: `>` checks for a strict superset
        if no_extra and data.keys() > schema.keys():
            return False
        check_type = lambda key: isinstance(data[key], schema[key])
        return all(map(check_type, schema.keys()))

    def wraps(f):
        def decorated(*args, **kwargs):
            data = jsonpickle.decode(request.data.decode("utf-8"))
            if not validate_data(data):
                raise DecidePoliticsException(Errors.INVALID_DATA_PRESENT)
            return f(*args, **kwargs)
        return decorated
    return wraps
def openMemeDB():
    # Create an empty database file on first use.
    if not os.path.exists("MemeDB.json"):
        with open("MemeDB.json", "w") as memeDbFile:
            memeDbFile.write("[]")
    with open("MemeDB.json", "r") as memeDbFile:
        memesList = jsonpickle.decode(memeDbFile.read())
    return memesList

# This function will take a list of meme objects and save it to MemeDB.json
def check_tree_generator():
    # First, read in the appropriate values:
    # Input:
    resources = os.path.join("resources", "FormatterUnitTests")
    with open(os.path.join(resources, "JsonOutput.txt")) as f:
        objectModel = jsonpickle.decode(f.read())

    # Verify that they have all the same lines.
    with open(os.path.join(resources, "TreeDepth3.txt")) as f:
        treeDepth3 = f.read()
    assert collections.Counter(
        utils.get_result_as_tree(objectModel, depth=3).split("\n")
    ) == collections.Counter(treeDepth3.split("\n"))

    # Verify that if we change the depth, it still has all the same lines.
    with open(os.path.join(resources, "TreeDepth5.txt")) as f:
        treeDepth5 = f.read()
    assert collections.Counter(
        utils.get_result_as_tree(objectModel, depth=5).split("\n")
    ) == collections.Counter(treeDepth5.split("\n"))

    print("Tree generator is working.")
def json_deserialize(json, unboxing_function=None):
    """JSON deserialization of a given string.

    Args:
        json (str): The JSON serialized string to deserialize.

    Returns:
        dict: A dictionary representing the data contained in the
            JSON serialized string.
    """
    if json is None:
        return None
    try:
        decoded = jsonpickle.decode(json)
    except Exception:
        return json
    if unboxing_function is None:
        return decoded
    elif isinstance(decoded, list):
        return [unboxing_function(element) for element in decoded]
    else:
        return unboxing_function(decoded)
def load_game(save_path):
    jsonpickle.set_preferred_backend('simplejson')
    #cur_path = os.path.dirname(__file__)
    #save_path = os.path.relpath('..\\Save Game\\' + file_name, cur_path)
    with open(save_path, 'r') as save:
        state = jsonpickle.decode(save.read(), keys=True)
    #print("State:")
    #for s in state:
    #    print(s)
    #save = shelve.open(file_name, writeback=False)
    #state = dict()
    #for k, v in save.items():
    #    state[k] = v
    #save.close()
    #save = shelve.open(file_name, flag="n", writeback=False)
    return state
def load_metadata(self):
    """
    Loads this node's metadata which is stored in a child comment encoded in json.
    """
    for child in self:
        if type(child) is NodeComment:
            if child.text.startswith("<designer.metadata.do.not.edit>"):
                try:
                    self.metadata = decode(child.text.split(maxsplit=1)[1])
                except JSONDecodeError:
                    continue
                self.model_item.setText(self.metadata.get("name", self.update_item_name()))
                self.user_sort_order = self.metadata.get("user_sort", "0".zfill(7))
                if not self.hidden_children:
                    hidden_nodes = self.metadata.get("hidden_nodes", [])
                    for node_string in hidden_nodes:
                        node_string = node_string.replace("<!- -", "<!--").replace("- ->", "-->")
                        node = copy_node(etree.fromstring(node_string), self)  # type: _NodeElement
                        self.add_child(node) if node.tag is not etree.Comment else self.append(node)
                        node.set_hidden(True)
                    self.sort()
                    self.model_item.sortChildren(0)
def aad_cache():
    """AAD token cache."""
    return jsonpickle.decode(get_config_value('aad_token', fallback=None)), \
        jsonpickle.decode(get_config_value('aad_cache', fallback=None))
def json_loads(string):
    return jsonpickle.decode(string)
def _get_flow_for_token(csrf_token, request):
    """Looks up the flow in session to recover information about requested scopes.

    Args:
        csrf_token: The token passed in the callback request that should
            match the one previously generated and stored in the request on
            the initial authorization view.

    Returns:
        The OAuth2 Flow object associated with this flow based on the
        CSRF token.
    """
    flow_pickle = request.session.get(_FLOW_KEY.format(csrf_token), None)
    return None if flow_pickle is None else jsonpickle.decode(flow_pickle)
def to_python(self, value):
    """Overrides ``models.Field`` method. This is used to convert
    bytes (from serialization etc) to an instance of this class."""
    if value is None:
        return None
    elif isinstance(value, oauth2client.client.Credentials):
        return value
    else:
        try:
            return jsonpickle.decode(
                base64.b64decode(encoding.smart_bytes(value)).decode())
        except ValueError:
            return pickle.loads(
                base64.b64decode(encoding.smart_bytes(value)))
def json_deserialize(json):
    """JSON deserialization of a given string.

    Args:
        json (str): The JSON serialized string to deserialize.

    Returns:
        dict: A dictionary representing the data contained in the
            JSON serialized string.
    """
    if json is None:
        return None
    return jsonpickle.decode(json)
def from_json(bundle_json):
    return jsonpickle.decode(bundle_json)
def test_mask_body_with_none_and_empty_masks(self):
    body = jsonpickle.decode('{"a": [1, 2, 3, {"b": 1}], "b": [1, 2, 3, {"c": 1}]}')

    masks = None
    result = mask_body(body, masks)
    self.assertEqual(body, result)

    masks = []
    result = mask_body(body, masks)
    self.assertEqual(body, result)
def test_mask_body_with_deep_item(self):
    body = jsonpickle.decode("""
        {
            "a": [1, 2, 3, {"b": 1}],
            "b": [1, 2, 3, {"c": 1}],
            "c": 123,
            "d": "a string"
        }
    """)
    masks = ['c', 'd']
    result = mask_body(body, masks)
    self.assertIsNone(result.get('d'))
    self.assertIsNone(result['b'][3].get('c'))
def __init__(self, model_dir):
    self.model_dir = model_dir

    def load_model(arch_file, weights_file):
        """
        Load Keras model from files - YAML architecture, HDF5 weights.
        """
        with open(arch_file) as f:
            model = keras.models.model_from_yaml(f.read())
        model.load_weights(weights_file)
        model.compile(loss='categorical_crossentropy', optimizer='adam',
                      metrics=['accuracy'])
        return model

    def load_model_from_dir(model_dir):
        """
        Load Keras model stored into a given directory with some file-name
        conventions. YAML architecture, HDF5 weights.
        """
        return load_model(model_dir + '/model_arch.yaml',
                          model_dir + '/model_weights.h5')

    self.model = load_model_from_dir(model_dir)

    with open(model_dir + '/preproc_transformers.json', 'r') as f:
        self.instr_family_le, self.scaler, self.ch = \
            jsonpickle.decode(f.read())
def process_result_value(self, value, engine):
    if value:
        return jsonpickle.decode(value)
    else:
        # default can also be a list
        return {}
def set_state(self, state):
    state = state.decode('utf-8')
    value = jsonpickle.decode(state)
    self._start_date = value['start_date']
    self._static_unit_net_value = value['static_unit_net_value']
    self._units = value['units']
    for k, v in six.iteritems(value['accounts']):
        if k == 'ACCOUNT_TYPE.STOCK':
            self._accounts[ACCOUNT_TYPE.STOCK].set_state(v)
        elif k == 'ACCOUNT_TYPE.FUTURE':
            self._accounts[ACCOUNT_TYPE.FUTURE].set_state(v)
        else:
            raise NotImplementedError
def queryInvoice(query, context):
    # get realm for OAuth1 from view session and OAuth2 from settings
    if settings.oauth_flag == 1:
        realm_id = context.realmId
    else:
        realm_id = settings.realm_id

    url = settings.base_url + realm_id + "/query?query=" + quote(query) + "&minorversion=9"
    request = services.makeRequest(url, 'GET', context)
    queryResponse = request.json()

    if len(queryResponse["QueryResponse"].keys()) > 0:
        invoiceList = []
        for each in queryResponse["QueryResponse"]['Invoice']:
            # Add the "py/object" key so jsonpickle rebuilds model instances
            deserialize_obj_string = "py/object"
            each["py/object"] = "SampleInvoiceCRUD.models.Invoice"
            lines = each["Line"]
            for line in lines:
                line[deserialize_obj_string] = "SampleInvoiceCRUD.models.LineItem"
            invoice_json_str = json.dumps(each)
            invoice_obj = jsonpickle.decode(invoice_json_str)
            invoiceList.append(invoice_obj)
        return invoiceList
    else:
        message = "Your query returned an empty response."
        return message

# Similar to create, need to provide Id, SyncToken and other updated fields
def readInvoice(invoiceId, context):
    # get realm for OAuth1 from view session and OAuth2 from settings
    if settings.oauth_flag == 1:
        realm_id = context.realmId
    else:
        realm_id = settings.realm_id

    url = settings.base_url + realm_id + "/invoice/" + str(invoiceId) + "?minorversion=9"
    request = services.makeRequest(url, 'GET', context)
    invoice_json = request.json()

    if request.status_code == 200:
        # Add key value for jsonpickle to work
        deserialize_obj_string = "py/object"
        invoice_json["Invoice"][deserialize_obj_string] = "SampleInvoiceCRUD.models.Invoice"
        lines = invoice_json["Invoice"]["Line"]
        for line in lines:
            line[deserialize_obj_string] = "SampleInvoiceCRUD.models.LineItem"
        invoice = invoice_json["Invoice"]
        invoice_json_str = json.dumps(invoice)
        invoice_obj = jsonpickle.decode(invoice_json_str)
        if isinstance(invoice_obj, Invoice):
            return invoice_obj
        else:
            return "Could not deserialize invoice. Please use this object as a dictionary."
    else:
        return 'No invoice found with Id ' + str(invoiceId)
def load(self):
    with open(self.dump_filename, "r", encoding='utf-8') as f:
        vectorizer = jsonpickle.decode(f.read())
    self.__dict__.update(vectorizer.__dict__)
def test_classification_result(self):
    result = ClassificationResult(5)
    result.additions["iambos"].append(StressCorrection(0, 0, 0, "", 0))
    self.assertEqual(result, jsonpickle.decode(result.to_json()))
def loads(json, **args):
    return jsonpickle.decode(json)
def get_creds(request):
    flow = jsonpickle.decode(request.session['flow'])
    credential = flow.step2_exchange(request.GET.get('code', False))
    storage = DjangoORMStorage(GoogleCredentials, 'id', request.user, 'credential')
    storage.put(credential)
    if request.GET.get('state', False):
        return redirect(unquote_plus(request.GET.get('state')))
    else:
        return HttpResponse(status=200)
def prepare_connectivity(self, context, request, cancellation_context):
    """Creates a connectivity for the Sandbox:

    1. Resource group
    2. Storage account
    3. Key pair
    4. Network Security Group
    5. Creating a subnet under the

    :param context:
    :param request:
    :param cancellation_context cloudshell.shell.core.driver_context.CancellationContext instance
    :return:
    """
    with LoggingSessionContext(context) as logger:
        with ErrorHandlingContext(logger):
            logger.info('Preparing Connectivity for Azure VM...')
            with CloudShellSessionContext(context) as cloudshell_session:
                cloud_provider_model = self.model_parser.convert_to_cloud_provider_resource_model(
                    resource=context.resource,
                    cloudshell_session=cloudshell_session)
                azure_clients = AzureClientsManager(cloud_provider_model)
                prepare_connectivity_request = DeployDataHolder(jsonpickle.decode(request))
                prepare_connectivity_request = getattr(prepare_connectivity_request, 'driverRequest', None)
                result = self.prepare_connectivity_operation.prepare_connectivity(
                    reservation=self.model_parser.convert_to_reservation_model(context.reservation),
                    cloud_provider_model=cloud_provider_model,
                    storage_client=azure_clients.storage_client,
                    resource_client=azure_clients.resource_client,
                    network_client=azure_clients.network_client,
                    logger=logger,
                    request=prepare_connectivity_request,
                    cancellation_context=cancellation_context)
                logger.info('End Preparing Connectivity for Azure VM')
                return self.command_result_parser.set_command_result(
                    {'driverResponse': {'actionResults': result}})
def cleanup_connectivity(self, command_context, request):
    with LoggingSessionContext(command_context) as logger:
        with ErrorHandlingContext(logger):
            logger.info('Teardown...')
            with CloudShellSessionContext(command_context) as cloudshell_session:
                cloud_provider_model = self.model_parser.convert_to_cloud_provider_resource_model(
                    resource=command_context.resource,
                    cloudshell_session=cloudshell_session)
                azure_clients = AzureClientsManager(cloud_provider_model)
                resource_group_name = command_context.reservation.reservation_id
                cleanup_connectivity_request = getattr(
                    DeployDataHolder(jsonpickle.decode(request)), 'driverRequest', None)
                result = self.delete_azure_vm_operation.cleanup_connectivity(
                    network_client=azure_clients.network_client,
                    resource_client=azure_clients.resource_client,
                    cloud_provider_model=cloud_provider_model,
                    resource_group_name=resource_group_name,
                    request=cleanup_connectivity_request,
                    logger=logger)
                logger.info('End Teardown')
                return self.command_result_parser.set_command_result(
                    {'driverResponse': {'actionResults': [result]}})
def _set_base_deploy_azure_vm_model_params(self, deployed_resource, resource):
    """Convert all basic parameters for VM deploy models

    :param deployed_resource: deploy_azure_vm_resource_models.BaseDeployAzureVMResourceModel subclass instance
    :param resource: The context of the resource
    :return:
    """
    deployed_resource.group_name = ""  # needs to be auto generated
    deployed_resource.vm_name = ""  # needs to be auto generated
    deployed_resource.cloud_provider = (resource.attributes['Cloud Provider']
                                        if 'Cloud Provider' in resource.attributes else None)
    deployed_resource.vm_size = resource.attributes['VM Size']
    deployed_resource.autoload = self._convert_to_bool(resource.attributes['Autoload'])
    deployed_resource.add_public_ip = self._convert_to_bool(resource.attributes['Add Public IP'])
    deployed_resource.inbound_ports = resource.attributes['Inbound Ports']
    deployed_resource.public_ip_type = resource.attributes['Public IP Type']
    deployed_resource.disk_type = resource.attributes['Disk Type']
    deployed_resource.extension_script_file = resource.attributes['Extension Script file']
    deployed_resource.extension_script_configurations = resource.attributes['Extension Script Configurations']
    deployed_resource.extension_script_timeout = int(resource.attributes['Extension Script Timeout'])

    app_request = jsonpickle.decode(resource.app_context.app_request_json)
    attrs = app_request["logicalResource"]["attributes"]
    deployed_resource.username = AzureModelsParser.get_attribute_value_by_name_ignoring_namespace(attrs, "User")
    deployed_resource.password = AzureModelsParser.get_attribute_value_by_name_ignoring_namespace(attrs, "Password")
def convert_app_resource_to_deployed_app(resource):
    json_str = jsonpickle.decode(resource.app_context.deployed_app_json)
    data_holder = DeployDataHolder(json_str)
    return data_holder
def Deploy(self, context, request=None, cancellation_context=None):
    app_request = jsonpickle.decode(request)
    deployment_name = app_request['DeploymentServiceName']
    if deployment_name in self.deployments.keys():
        deploy_method = self.deployments[deployment_name]
        return deploy_method(context, request, cancellation_context)
    else:
        raise Exception('Could not find the deployment')
def load_game():
    with open('savegame.json', 'r') as save_file:
        data = json.load(save_file)

    game = jsonpickle.decode(data['serialized_game'])
    player_index = jsonpickle.decode(data['serialized_player_index'])
    camera = jsonpickle.decode(data['serialized_cam'])
    player = game.level.current_entities[player_index]

    return game, player, camera

# main loop
def read(path):
    with open(path, 'r') as file:
        s = file.read()
    result = jsonpickle.decode(s)
    result._init()
    return result
def test___str__(self):
    """
    Test whether the __str__ method successfully generates a json string
    representation of the object.
    """
    # Confirm that the string representation of the current object and that
    # of an object decoded from this string representation are equal.
    str_o = str(self.o)
    new_o = jsonpickle.decode(str_o)
    self.assertEqual(str_o, str(new_o))
def deserialize(self, byte_array):
    if not byte_array:
        return None
    else:
        return byte_array.decode("UTF-8")
def deserialize(self, byte_array):
    if byte_array:
        return jsonpickle.decode(byte_array.decode("UTF-8"))
    else:
        return None
def create_new_customer():
    """Creates a new customer from the given data

    @data The attributes to be present in the customer item"""
    data = jsonpickle.decode(request.data.decode("utf-8"))

    customer = Customer.create_new(attributes=data)
    customer.create()

    return jsonpickle.encode(dict(
        success=True
    ))
def deserialize_session(session):
    '''
    Takes a dictionary having a session object's attributes and
    deserializes it into a session object.
    '''
    decoded = jsonpickle.decode(session)
    new_session = requests.session()
    new_session.__dict__.update(decoded)
    return new_session
def on_message(self, message):
    if message:
        decoded_message = jsonpickle.decode(message)
        logging.info("incoming ws message %s " % decoded_message)
        handler = ZynthianWebSocketMessageHandlerFactory(decoded_message['handler_name'], self)
        handler.on_websocket_message(decoded_message['data'])
        self.handlers.append(handler)

# client disconnected
def load(self, filepath):
    """Load the optimizer parameters from the specified path as JSON.

    Parameters
    ----------
    filepath: str
        The file path.
    """
    with open(filepath, 'r') as f:
        json = f.read()
    model = jsonpickle.decode(json)
    self.__dict__.update(model.__dict__)