/**
 * Round-trips an {@link ExtendedSession} through {@code writeToBuffer} /
 * {@code readFromBuffer} and checks that the creation time, id, timeout and
 * data of the restored session match the original.
 */
@Test
public void extendeSessionShouldBeClusterSerializable() throws InterruptedException {
  SessionImpl delegate = new SessionImpl(3000);
  ExtendedSession extendedSession = ExtendedSession.adapt(delegate);
  assertThat(extendedSession).isInstanceOf(ClusterSerializable.class);

  long createdAt = extendedSession.createdAt();
  extendedSession.put("key1", "value");
  extendedSession.put("key2", 20);
  // NOTE(review): presumably here so that the session adapted below gets a
  // different creation time than the original one; confirm before removing.
  Thread.sleep(300);

  Buffer buffer = Buffer.buffer();
  ((ClusterSerializable) extendedSession).writeToBuffer(buffer);
  // A bare assertThat(boolean) verifies nothing; assert on the length itself.
  assertThat(buffer.length()).isGreaterThan(0);

  ExtendedSession fromBuffer = ExtendedSession.adapt(new SessionImpl(0));
  ((ClusterSerializable) fromBuffer).readFromBuffer(0, buffer);

  assertThat(fromBuffer.createdAt()).isEqualTo(createdAt);
  assertThat(fromBuffer.id()).isEqualTo(delegate.id());
  assertThat(fromBuffer.timeout()).isEqualTo(delegate.timeout());
  assertThat(fromBuffer.data()).isEqualTo(delegate.data());
}
/**
 * Serializes {@code object} into a byte array.
 *
 * <p>Layout for a {@link ClusterSerializable}: {@code true}, the class name
 * (modified UTF-8), the payload length as an {@code int}, then the payload
 * produced by {@code writeToBuffer}. Layout for any other object:
 * {@code false} followed by its Java serialization stream.
 *
 * @param object the value to serialize
 * @return the serialized bytes
 * @throws IOException if writing to the underlying streams fails
 */
protected byte[] asByte(Object object) throws IOException {
  ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
  DataOutputStream dataOutput = new DataOutputStream(byteOut);
  if (object instanceof ClusterSerializable) {
    ClusterSerializable clusterSerializable = (ClusterSerializable) object;
    dataOutput.writeBoolean(true);
    dataOutput.writeUTF(object.getClass().getName());
    Buffer buffer = Buffer.buffer();
    clusterSerializable.writeToBuffer(buffer);
    byte[] bytes = buffer.getBytes();
    dataOutput.writeInt(bytes.length);
    dataOutput.write(bytes);
  } else {
    dataOutput.writeBoolean(false);
    ByteArrayOutputStream javaByteOut = new ByteArrayOutputStream();
    // Closing the ObjectOutputStream flushes its internal block buffer, so the
    // backing array is guaranteed complete before it is copied out.
    try (ObjectOutputStream objectOutput = new ObjectOutputStream(javaByteOut)) {
      objectOutput.writeObject(object);
    }
    dataOutput.write(javaByteOut.toByteArray());
  }
  dataOutput.flush();
  return byteOut.toByteArray();
}
/**
 * Restores {@code data} from the stream.
 *
 * <p>The stream starts with a boolean flag. When it is {@code true} the value
 * is rebuilt from a class name, a one-byte length prefix and that many payload
 * bytes, which are handed to {@code readFromBuffer}. Otherwise the value is
 * read with {@link ObjectInput#readObject()}.
 *
 * @param in the stream to read from
 * @throws IOException if the stream ends early or cannot be read
 * @throws ClassNotFoundException if the serialized class cannot be loaded
 */
@Override
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
  boolean isClusterSerializable = in.readBoolean();
  if (isClusterSerializable) {
    String className = in.readUTF();
    Class<?> clazz = Thread.currentThread().getContextClassLoader().loadClass(className);
    try {
      data = (T) clazz.newInstance();
      // The length prefix is a single unsigned byte; read() yields -1 at end of stream.
      int length = in.read();
      if (length < 0) {
        throw new java.io.EOFException("Stream ended before the payload length of " + className);
      }
      byte[] bytes = new byte[length];
      // read(byte[]) may return after a partial read; readFully fills the array or throws.
      in.readFully(bytes);
      ((ClusterSerializable) data).readFromBuffer(0, Buffer.buffer(bytes));
    } catch (InstantiationException | IllegalAccessException e) {
      throw new VertxException(e);
    }
  } else {
    data = (T) in.readObject();
  }
}
@Override public void writeToBuffer(Buffer buffer) { // try to get the user from the context otherwise fall back to any cached version User user = context != null ? context.user() : this.user; if (user != null && user instanceof ClusterSerializable) { buffer.appendByte((byte)1); String className = user.getClass().getCanonicalName(); if (className == null) { throw new IllegalStateException("Cannot serialize " + user.getClass().getName()); } byte[] bytes = className.getBytes(StandardCharsets.UTF_8); buffer.appendInt(bytes.length); buffer.appendBytes(bytes); ClusterSerializable cs = (ClusterSerializable)user; cs.writeToBuffer(buffer); } else { buffer.appendByte((byte)0); } }
/**
 * Restores the user from {@code buffer}, starting at {@code pos}.
 *
 * <p>A leading marker byte of {@code 1} is followed by an {@code int} length,
 * the UTF-8 class name, and the serialized state of that class. Any other
 * marker sets the user to {@code null}.
 *
 * @param pos offset of the marker byte
 * @param buffer the buffer to read from
 * @return the offset just past the consumed data
 * @throws VertxException if loading, instantiating or reading the user fails
 */
@Override
public int readFromBuffer(int pos, Buffer buffer) {
  int cursor = pos;
  byte marker = buffer.getByte(cursor++);
  if (marker != (byte) 1) {
    user = null;
    return cursor;
  }

  int nameLength = buffer.getInt(cursor);
  cursor += 4;
  byte[] nameBytes = buffer.getBytes(cursor, cursor + nameLength);
  cursor += nameLength;
  String className = new String(nameBytes, StandardCharsets.UTF_8);

  try {
    Class<?> userClass = Utils.getClassLoader().loadClass(className);
    ClusterSerializable restored = (ClusterSerializable) userClass.newInstance();
    cursor = restored.readFromBuffer(cursor, buffer);
    user = (User) restored;
  } catch (Exception e) {
    throw new VertxException(e);
  }
  return cursor;
}
/**
 * Rebuilds a {@link ClusterSerializable} from its wrapped form: loads the
 * recorded class through the thread context class loader, instantiates it
 * and feeds it the recorded bytes.
 *
 * @param value the class name and serialized bytes
 * @return the restored instance
 * @throws IllegalStateException if any step of the reconstruction fails
 */
private static ClusterSerializable unmarshal0(ClusterSerializableValue value) {
  try {
    ClassLoader loader = Thread.currentThread().getContextClassLoader();
    Class<?> type = loader.loadClass(value.getClassName());
    ClusterSerializable restored = (ClusterSerializable) type.newInstance();
    Buffer payload = Buffer.buffer(value.getData());
    restored.readFromBuffer(0, payload);
    return restored;
  } catch (Exception e) {
    throw new IllegalStateException("Failed to load class " + value.getClassName(), e);
  }
}
/**
 * Wraps {@code t} in its Infinispan counterpart when one exists.
 *
 * <p>The checks run in order: {@link ServerID}, {@link ClusterNodeInfo},
 * {@link ClusterSerializable}. The first match wins; anything else is
 * returned as is.
 *
 * @param t the value to convert
 * @return the wrapper for {@code t}, or {@code t} itself
 */
public static <T> Object toCachedObject(T t) {
  final Object cached;
  if (t instanceof ServerID) {
    cached = new InfinispanServerID((ServerID) t);
  } else if (t instanceof ClusterNodeInfo) {
    cached = new InfinispanClusterNodeInfo((ClusterNodeInfo) t);
  } else if (t instanceof ClusterSerializable) {
    cached = new InfinispanClusterSerializable((ClusterSerializable) t);
  } else {
    cached = t;
  }
  return cached;
}
/**
 * Creates a new Vert.x compatible serializer.
 *
 * <p>The namespace does not require registration, includes
 * {@code KryoNamespaces.BASIC} and {@link ServerID}, and handles
 * {@link ClusterSerializable} through a {@code ClusterSerializableSerializer}.
 */
private Serializer createSerializer() {
  KryoNamespace namespace = KryoNamespace.builder()
      .setRegistrationRequired(false)
      .register(KryoNamespaces.BASIC)
      .register(ServerID.class)
      .register(new ClusterSerializableSerializer<>(), ClusterSerializable.class)
      .build();
  return Serializer.using(namespace);
}
/**
 * Writes {@code data} to the stream.
 *
 * <p>A boolean flag comes first. For a {@link ClusterSerializable} it is
 * followed by the class name, a one-byte length prefix and the payload
 * produced by {@code writeToBuffer}. Any other value is written with
 * {@link ObjectOutput#writeObject(Object)}.
 *
 * @param out the stream to write to
 * @throws IOException if writing fails, or if the payload is longer than the
 *     one-byte length prefix can represent
 */
@Override
public void writeExternal(ObjectOutput out) throws IOException {
  boolean isClusterSerializable = ClusterSerializable.class.isInstance(data);
  if (!isClusterSerializable) {
    out.writeBoolean(false);
    out.writeObject(data);
    return;
  }

  // Serialize first so an oversized payload is rejected before anything is written.
  Buffer buffer = Buffer.buffer();
  ((ClusterSerializable) data).writeToBuffer(buffer);
  byte[] bytes = buffer.getBytes();

  // write(int) emits only the low-order 8 bits. A longer payload would be
  // written with a truncated length and could not be read back, so fail here.
  // NOTE(review): lifting this limit means widening the prefix (writeInt) in
  // the matching reader as well.
  final int maxLength = 0xFF;
  if (bytes.length > maxLength) {
    throw new IOException("Serialized form of " + data.getClass().getName() + " is "
        + bytes.length + " bytes, which exceeds the " + maxLength + "-byte limit of the length prefix");
  }

  out.writeBoolean(true);
  out.writeUTF(data.getClass().getName());
  out.write(bytes.length);
  out.write(bytes);
}
/**
 * Stores the serialized form of {@code session} under its id, then reports
 * success to {@code resultHandler} via {@code vertx.runOnContext}.
 *
 * @param session the session to store; it is cast to {@link ClusterSerializable}
 * @param resultHandler receives a succeeded result once the session is stored
 */
@Override
public void put(Session session, Handler<AsyncResult<Void>> resultHandler) {
  Buffer serialized = Buffer.buffer();
  ((ClusterSerializable) session).writeToBuffer(serialized);
  sessions.put(session.id(), serialized);
  vertx.runOnContext(ignored -> resultHandler.handle(Future.succeededFuture()));
}
/**
 * Wraps {@code obj} in a {@code DataSerializableHolder} when it implements
 * {@link ClusterSerializable}; otherwise returns it unchanged.
 *
 * @param obj the value to convert
 * @return the holder (unchecked-cast to {@code T}) or {@code obj} itself
 */
@SuppressWarnings("unchecked")
public static <T> T convertParam(T obj) {
  if (!(obj instanceof ClusterSerializable)) {
    return obj;
  }
  DataSerializableHolder holder = new DataSerializableHolder((ClusterSerializable) obj);
  return (T) holder;
}
/**
 * Reads a class name, an {@code int} length and that many bytes from the
 * input, then instantiates the class via the thread context class loader and
 * restores its state from the bytes.
 *
 * @param objectDataInput the input to read from
 * @throws IOException if reading from the input fails
 * @throws IllegalStateException if the class cannot be loaded, instantiated or restored
 */
@Override
public void readData(ObjectDataInput objectDataInput) throws IOException {
  final String className = objectDataInput.readUTF();
  final byte[] payload = new byte[objectDataInput.readInt()];
  objectDataInput.readFully(payload);
  try {
    ClassLoader loader = Thread.currentThread().getContextClassLoader();
    Class<?> type = loader.loadClass(className);
    clusterSerializable = (ClusterSerializable) type.newInstance();
    clusterSerializable.readFromBuffer(0, Buffer.buffer(payload));
  } catch (Exception e) {
    throw new IllegalStateException("Failed to load class " + e.getMessage(), e);
  }
}
/**
 * Appends {@code createdAt} as a {@code long}, followed by the delegate's own
 * serialized form.
 *
 * @param buffer the buffer to append to
 */
@Override
public void writeToBuffer(Buffer buffer) {
  buffer.appendLong(createdAt);
  ClusterSerializable serializableDelegate = (ClusterSerializable) delegate;
  serializableDelegate.writeToBuffer(buffer);
}
/**
 * Reads {@code createdAt} as a {@code long} at {@code pos}, then lets the
 * delegate restore itself from the bytes that follow.
 *
 * @param pos offset of the {@code createdAt} value
 * @param buffer the buffer to read from
 * @return the offset returned by the delegate's {@code readFromBuffer}
 */
@Override
public int readFromBuffer(int pos, Buffer buffer) {
  createdAt = buffer.getLong(pos);
  ClusterSerializable serializableDelegate = (ClusterSerializable) delegate;
  // The delegate's data starts right after the 8-byte long.
  int delegateStart = pos + 8;
  return serializableDelegate.readFromBuffer(delegateStart, buffer);
}
/**
 * Serializes {@code obj} into a fresh buffer and wraps its class name and
 * the resulting bytes in a {@link ClusterSerializableValue}.
 *
 * @param obj the object to serialize
 * @return the wrapped class name and bytes
 */
private static ClusterSerializableValue marshal0(ClusterSerializable obj) {
  Buffer serialized = Buffer.buffer();
  obj.writeToBuffer(serialized);
  String className = obj.getClass().getName();
  byte[] payload = serialized.getBytes();
  return new ClusterSerializableValue(className, payload);
}
/**
 * Creates a wrapper around {@code data}.
 *
 * @param data the value to wrap
 * @throws NullPointerException if {@code data} is {@code null}
 */
public InfinispanClusterSerializable(ClusterSerializable data) {
  this.data = Objects.requireNonNull(data);
}
/**
 * @return the {@link ClusterSerializable} held in {@code data}
 */
public ClusterSerializable getData() {
  return this.data;
}
/**
 * Creates a holder for the given instance.
 *
 * @param clusterSerializable the instance to hold
 */
private DataSerializableHolder(ClusterSerializable clusterSerializable) {
  this.clusterSerializable = clusterSerializable;
}
/**
 * @return the {@link ClusterSerializable} held by this instance
 */
public ClusterSerializable clusterSerializable() {
  return this.clusterSerializable;
}
/**
 * Converts an object to its {@link ClusterSerializableValue} form when it
 * implements {@link ClusterSerializable}; any other object passes through
 * untouched.
 *
 * @param obj Object.
 * @return the result of {@code marshal0} (cast to {@code T}) if {@code obj} implements
 * {@link ClusterSerializable}, otherwise {@code obj} itself.
 */
public static <T> T marshal(T obj) {
  if (!(obj instanceof ClusterSerializable)) {
    return obj;
  }
  ClusterSerializableValue wrapped = marshal0((ClusterSerializable) obj);
  return (T) wrapped;
}