我知道Mule使用元素对数据的gzip压缩提供了极大的支持。但是客户端现在需要zip压缩,因为该文件必须作为zip压缩文件放在FTP上:(
在以下情况下,我在m子中遇到困难:
我创建了一个Spring bean,其中包含文件。我想使用ZipOutputStream类压缩此文件,并将其传递给我们的ftp。
这是我的流程配置:
<flow name="testFlow" initialState="stopped"> <file:inbound-endpoint path="${home.dir}/out" moveToDirectory="${hip.dir}/out/hist" fileAge="10000" responseTimeout="10000" connector-ref="input"/> <component> <spring-object bean="zipCompressor"/> </component> <set-variable value="#[message.inboundProperties.originalFilename]" variableName="originalFilename" /> <ftp:outbound-endpoint host="${ftp.host}" port="${ftp.port}" user="${ftp.username}" password="${ftp.password}" path="${ftp.root.out}" outputPattern="#[flowVars['originalFilename']].zip" /> </flow>
这是我的zipCompressor的代码:
@Component public class ZipCompressor implements Callable { private static final Logger LOG = LogManager.getLogger(ZipCompressor.class.getName()); @Override @Transactional public Object onCall(MuleEventContext eventContext) throws Exception { if (eventContext.getMessage().getPayload() instanceof File) { final File srcFile = (File) eventContext.getMessage().getPayload(); final String fileName = srcFile.getName(); final File zipFile = new File(fileName + ".zip"); try { // create byte buffer byte[] buffer = new byte[1024]; FileOutputStream fos = new FileOutputStream(zipFile); ZipOutputStream zos = new ZipOutputStream(fos); FileInputStream fis = new FileInputStream(srcFile); // begin writing a new ZIP entry, positions the stream to the start of the entry data zos.putNextEntry(new ZipEntry(srcFile.getName())); int length; while ((length = fis.read(buffer)) > 0) { zos.write(buffer, 0, length); } zos.closeEntry(); // close the InputStream fis.close(); // close the ZipOutputStream zos.close(); } catch (IOException ioe) { LOG.error("Error creating zip file" + ioe); } eventContext.getMessage().setPayload(zipFile); } return eventContext.getMessage(); } }
我编写了一个单元测试,压缩效果很好。实际上,已经使用正确的名称将文件传输到FTP,但是zip文件无效,并且通过在NotePad ++中打开它,它仅包含原始文件名。
我认为将zip文件传回m子流程时做错了事,但此刻我被困住了,因此不胜感激!
我已经为此实现了变压器
package com.test.transformer; import java.io.IOException; import java.io.InputStream; import java.util.zip.ZipEntry; import java.util.zip.ZipOutputStream; import org.apache.commons.io.IOUtils; import org.apache.commons.io.output.ByteArrayOutputStream; import org.mule.api.MuleMessage; import org.mule.api.transformer.TransformerException; import org.mule.transformer.AbstractMessageTransformer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ZipTransformer extends AbstractMessageTransformer { private static final Logger log = LoggerFactory.getLogger(ZipTransformer.class); public static final int DEFAULT_BUFFER_SIZE = 32768; public static byte[] MAGIC = { 'P', 'K', 0x3, 0x4 }; public ZipTransformer() { registerSourceType(InputStream.class); registerSourceType(byte[].class); } public Object transformMessage(MuleMessage message, String outputEncoding) throws TransformerException { Object payload = message.getPayload(); try{ byte[] data; if (payload instanceof byte[]) { data = (byte[]) payload; } else if (payload instanceof InputStream) { data = IOUtils.toByteArray((InputStream)payload); } else if (payload instanceof String) { data = ((String) payload).getBytes(outputEncoding); } else { data = muleContext.getObjectSerializer().serialize(payload); } return compressByteArray(data); }catch (Exception ioex) { throw new TransformerException(this, ioex); } } public Object compressByteArray(byte[] bytes) throws IOException { if (bytes == null || isCompressed(bytes)) { if (logger.isDebugEnabled()) { logger.debug("Data already compressed; doing nothing"); } return bytes; } if (logger.isDebugEnabled()) { logger.debug("Compressing message of size: " + bytes.length); } ByteArrayOutputStream baos = null; ZipOutputStream zos = null; try { baos = new ByteArrayOutputStream(DEFAULT_BUFFER_SIZE); zos = new ZipOutputStream(baos); zos.putNextEntry(new ZipEntry("test.txt")); zos.write(bytes, 0, bytes.length); zos.finish(); zos.close(); byte[] compressedByteArray = baos.toByteArray(); baos.close(); if (logger.isDebugEnabled()) { logger.debug("Compressed message to size: " + compressedByteArray.length); } return compressedByteArray; } catch (IOException ioex) { throw ioex; } finally { IOUtils.closeQuietly(zos); IOUtils.closeQuietly(baos); } } public boolean isCompressed(byte[] bytes) throws IOException { if ((bytes == null) || (bytes.length < 4 )) { return false; } else { for (int i = 0; i < MAGIC.length; i++) { if (bytes[i] != MAGIC[i]) { return false; } } return true; } } }
用作
<custom-transformer class="com.test.transformer.ZipTransformer" doc:name="file zip transformer"/>
截至目前,将文件名设置为test.txt。您可以更改使用任何属性或变量。
希望这可以帮助。