我们从Python开源项目中,提取了以下25个代码示例,用于说明如何使用jsonpickle.dumps()。
def test_background_queue(): publish_event = MagicMock() publish_event.return_value = 'aaa' task_queue = TaskQueue(publish_event) @task_queue.task() def funcy(): global funcy_called funcy_called += 1 return "blah" assert funcy() == "blah" assert funcy.delay() is True event = jsonpickle.dumps((funcy.path, (), {})) publish_event.assert_called_once_with(event) task_queue.process_event(event) assert funcy_called == 2
def to_dict(cls, node): """ Creates a dict representation of a node :param node: datamodel.base.node.Node instance :return: dict """ params = dict() for name, value in node._params.items(): if callable(value): params[name] = jsonpickle.dumps(cloudpickle.dumps(value)) else: params[name] = value return {'class': str(node.__class__), 'name': node._name, 'params': params, 'output_label': node._output_label}
def _get_encoded_attachments(self): attachments = self.get_attachments() if attachments: new_attachments = [] for attachment in attachments: if isinstance(attachment, File): attachment.seek(0) new_attachments.append((attachment.name, attachment.read(), guess_type(attachment.name)[0])) else: new_attachments.append(attachment) attachments = new_attachments return jsonpickle.dumps(attachments)
def _execute(self, ctx): self._check_closed() # Temporary file used to pass arguments to the started subprocess file_descriptor, arguments_json_path = tempfile.mkstemp(prefix='executor-', suffix='.json') os.close(file_descriptor) with open(arguments_json_path, 'wb') as f: f.write(pickle.dumps(self._create_arguments_dict(ctx))) env = self._construct_subprocess_env(task=ctx.task) # Asynchronously start the operation in a subprocess proc = subprocess.Popen( [ sys.executable, os.path.expanduser(os.path.expandvars(__file__)), os.path.expanduser(os.path.expandvars(arguments_json_path)) ], env=env) self._tasks[ctx.task.id] = _Task(ctx=ctx, proc=proc)
def site_map(): output = [] for rule in app.url_map.iter_rules(): options = {} for arg in rule.arguments: options[arg] = "[{0}]".format(arg) methods = ','.join(rule.methods) url = url_for(rule.endpoint, **options) import urllib.request line = urllib.request.unquote("{:50s} {:20s} {}".format(rule.endpoint, methods, url)) output.append(line) logging.info(str(output)) response = app.response_class( response=jsonpickle.dumps(output), status=200, mimetype='application/json' ) return response
def test_schedule_queue(): publish_event = MagicMock() publish_event.return_value = 'aaa' task_queue = TaskQueue(publish_event) @task_queue.task(schedules=['bbb']) def funcy(): return "blah" task_queue.process_schedule('bbb') event = jsonpickle.dumps((funcy.path, (), {})) publish_event.assert_called_once_with(event)
def test_sns_background_queue(): with patch('xavier.aws.sns.send_sns_message') as mock_send_sns_message: mock_send_sns_message.return_value = {"MessageId": "1234"} publish_event = publish_sns_message("sns:topic") task_queue = TaskQueue(publish_event) @task_queue.task(schedules=['bbb']) def funcy(): return "Blah" funcy.delay() event = jsonpickle.dumps((funcy.path, (), {})) mock_send_sns_message.assert_called_once_with( Message=event, TopicArn="sns:topic", ) mock_background_func = MagicMock() mock_background_func.__name__ = "background_func" mock_background_func.__module__ = "testing" mock_background_func.return_value = "awesome" composed_mock = task_queue.task()(mock_background_func) event = jsonpickle.dumps((composed_mock.path, (), {})) sns_consumer = handle_sns_message(task_queue.process_event) sns_consumer({ "Records": [{ 'Sns': { 'Message': event } }] }, {}) mock_background_func.assert_called_once_with()
def delay(self, *args, **kwargs): event = jsonpickle.dumps((self.path, args, kwargs)) if not self.publish_event: logger.error("This task has not yet been registered with a task queue") return False self.publish_event(event) return True
def serialize(cls, node): """ Creates a JSON representation of a node :param node: datamodel.base.node.Node instance :return: str """ return json.dumps(cls.to_dict(node))
def serialize(cls, graph): """ Creates a JSON representation of a graph :param node: datamodel.base.graph.Graph instance :return: str """ return json.dumps(cls.to_dict(graph))
def send(self, raise_exception=False, user=None): """ Handles the preparing the notification for sending. Called to trigger the send from code. If raise_exception is True, it will raise any exceptions rather than simply logging them. returns boolean whether or not the notification was sent successfully """ context = self.get_context_data() recipients = self.get_recipients() if 'text' in self.render_types: text_content = self.render('text', context) else: text_content = None if 'html' in self.render_types: html_content = self.render('html', context) else: html_content = None sent_from = self.get_sent_from() subject = self.get_subject() extra_data = self.get_extra_data() sent_notification = SentNotification( recipients=','.join(recipients), text_content=text_content, html_content=html_content, sent_from=sent_from, subject=subject, extra_data=json.dumps(extra_data) if extra_data else None, notification_class=self.get_class_path(), attachments=self._get_encoded_attachments(), user=user, ) return self.resend(sent_notification, raise_exception=raise_exception)
def dump(obj, file_name): if file_name.endswith('.json'): with open(file_name, 'w') as f: f.write(jsonpickle.dumps(obj)) return if isinstance(obj, np.ndarray): np.save(file_name, obj) return # Using joblib instead of pickle because of http://bugs.python.org/issue11564 joblib.dump(obj, file_name, protocol=pickle.HIGHEST_PROTOCOL)
def get_state(self): return jsonpickle.dumps([o.order_id for _, o in self._delayed_orders]).encode('utf-8')
def get_state(self): result = {} for key, obj in six.iteritems(self._objects): state = obj.get_state() if state is not None: result[key] = state return jsonpickle.dumps(result).encode('utf-8')
def get_state(self): return jsonpickle.dumps({ 'open_orders': [o.get_state() for account, o in self._open_orders], 'delayed_orders': [o.get_state() for account, o in self._delayed_orders] }).encode('utf-8')
def _send_message(connection, message): # Packing the length of the entire msg using struct.pack. # This enables later reading of the content. def _pack(data): return struct.pack(_INT_FMT, len(data)) data = jsonpickle.dumps(message) msg_metadata = _pack(data) connection.send(msg_metadata) connection.sendall(data)
def get(self): try: data = dao.get_all_categories() if data is not None: return jsonify({"categoryList": json.loads(jsonpickle.dumps(data, unpicklable=False))}) else: raise except Exception as e: print("AllCategoryApi", e)
def publish(self, payload, channel=None): if channel is None: channel = self._pub_channel_name log.debug("publishing to channel: %s \n %s" % (channel, payload)) try: payload = jsonpickle.dumps(payload) except ValueError: pass self._redis_conn.publish(channel, payload)
def _send(self, data: Dict[str, object]): self.statsPublisher.send(jsonpickle.dumps(data))