我们有一个Hadoop集群,我们在上面存储使用Kryo(序列化框架)序列化为字节的数据。我们用于此目的的Kryo版本是从2.21正式版本派生而来的,以将我们自己的补丁应用于我们使用Kryo遇到的问题。当前的Kryo版本2.22也解决了这些问题,但是具有不同的解决方案。结果,我们不能仅仅更改我们使用的Kryo版本,因为这意味着我们将不再能够读取已经存储在Hadoop集群中的数据。为了解决这个问题,我们想运行一个Hadoop作业
问题在于,在一个Java程序中(更确切地说,在Hadoop作业的mapper类中)使用同一类的两个不同版本并不容易。
在一个Hadoop作业中,如何使用同一序列化框架的两个不同版本对对象进行反序列化和序列化?
我们想到的第一种方法是使用Maven Shade插件的重定位功能在我们自己的Kryo分支中重命名软件包,并使用不同的工件ID释放它,以便我们可以依赖转换工作项目中的两个工件。然后,我们将实例化旧版本和新版本的Kryo对象,并使用旧的Kryo对象进行反序列化,并使用新的Kryo对象再次序列化该对象。
问题 我们没有在Hadoop作业中显式使用Kryo,而是通过我们自己的库的多层访问它。对于这些库中的每一个,都有必要
为了使事情变得更加混乱,我们还使用了其他第三方库提供的Kryo序列化器,对此我们必须做同样的事情。
我们想到的第二种方法是完全不依赖包含转换作业的Maven项目中的Kryo,而是从每个版本的JAR加载所需的类,该类存储在Hadoop的分布式缓存中。然后序列化一个对象看起来像这样:
public byte[] serialize(Object foo, JarClassLoader cl) { final Class<?> kryoClass = cl.loadClass("com.esotericsoftware.kryo.Kryo"); Object k = kryoClass.getConstructor().newInstance(); ByteArrayOutputStream baos = new ByteArrayOutputStream(); final Class<?> outputClass = cl.loadClass("com.esotericsoftware.kryo.io.Output"); Object output = outputClass.getConstructor(OutputStream.class).newInstance(baos); Method writeObject = kryoClass.getMethod("writeObject", outputClass, Object.class); writeObject.invoke(k, output, foo); outputClass.getMethod("close").invoke(output); baos.close(); byte[] bytes = baos.toByteArray(); return bytes; }
问题 尽管此方法可能会实例化未配置的Kryo对象并序列化/还原某些对象,但我们使用了更为复杂的Kryo配置。这包括几个自定义序列化程序,注册的类ID等。例如,我们无法找到一种方法来设置类的自定义序列化程序,而不会出现NoClassDefFoundError- 以下代码不起作用:
Class<?> kryoClass = this.loadClass("com.esotericsoftware.kryo.Kryo"); Object kryo = kryoClass.getConstructor().newInstance(); Method addDefaultSerializer = kryoClass.getMethod("addDefaultSerializer", Class.class, Class.class); addDefaultSerializer.invoke(kryo, URI.class, URISerializer.class); // throws NoClassDefFoundError
最后一行抛出一个
java.lang.NoClassDefFoundError: com/esotericsoftware/kryo/Serializer
因为URISerializer该类引用了Kryo的Serializer类,并尝试使用其自己的类加载器(即System类加载器)加载它,而该类加载器不知道Serializer该类。
URISerializer
Serializer
当前,最有前途的方法似乎是使用独立的中间序列化,例如使用Gson或类似方法的JSON ,然后运行两个单独的作业:
问题 此解决方案的最大问题是它使处理的数据的空间消耗大致翻了一番。此外,我们需要另一种序列化方法,该方法对所有数据均无问题,我们需要首先对其进行研究。
我将使用多重类加载器方法。
(软件包重命名也可以。看起来确实很丑陋,但这是一次性的技巧,因此美观和正确性可以让位。中间序列化似乎有风险- 您使用Kryo是有原因的,而该理由将被否定。通过使用其他中间形式)。
总体设计为:
child classloaders: Old Kryo New Kryo <-- both with simple wrappers \ / \ / \ / \ / | default classloader: domain model; controller for the re-serialization
使用修改后的Kryo版本和包装器代码加载Jar。包装器具有一个带有一个参数的静态“ main”方法:要反序列化的文件名。通过默认类加载器的反射调用main方法:
Class deserializer = deserializerClassLoader.loadClass("com.example.deserializer.Main"); Method mainIn = deserializer.getMethod("main", String.class); Object graph = mainIn.invoke(null, "/path/to/input/file");
当调用返回时,使用一个简单的包装器用新的序列化框架加载另一个Jar。包装器有一个静态的“ main”方法和一个参数来传递要序列化的文件名。通过默认类加载器的反射来调用main方法:
Class serializer = deserializerClassLoader.loadClass("com.example.serializer.Main");
Method mainOut = deserializer.getMethod(“main”, Object.class, String.class); mainOut.invoke(null, graph, “/path/to/output/file”);
这个方法
注意事项
在代码片段中,为每个对象的序列化和反序列化创建了一个类加载器。您可能只想加载一次类加载器,发现主要方法并遍历文件,例如:
for (String file: files) { Object graph = mainIn.invoke(null, file + ".in"); mainOut.invoke(null, graph, file + ".out"); }
域对象是否引用 任何 Kryo类?如果是这样,您将遇到困难:
无论哪种情况,您的第一种方法都是检查这些参考文献并消除它们。确保已完成此操作的一种方法是确保默认的类加载器无法访问 任何 Kryo版本。如果域对象以任何方式引用Kryo,则引用将失败(如果直接引用该类,则引发ClassNotFoundError;如果使用反射,则引发ClassNotFoundException)。