/**
 * Wraps {@code delegate} in a {@link JobDataWrapper} and registers a phantom reference
 * on the wrapper, so that {@code delegate} is closed once the wrapper has been
 * garbage collected.
 *
 * <p>The reference is stored in {@code jobResultReferences} until its cleanup
 * callback runs; the callback removes it again before closing the delegate.
 *
 * @param delegate the job data to wrap; closed by the cleanup callback
 * @return the wrapper around {@code delegate}
 */
private JobData newJobDataReference(final JobData delegate) {
    final JobData wrapper = new JobDataWrapper(delegate);

    // The callback captures only the delegate, never the wrapper it is tracking.
    jobResultReferences.add(
        new FinalizablePhantomReference<JobData>(wrapper, FINALIZABLE_REFERENCE_QUEUE) {
            @Override
            public void finalizeReferent() {
                // Unregister first so the reference is dropped even if closing fails.
                jobResultReferences.remove(this);
                closeDelegateQuietly(delegate);
            }
        });

    return wrapper;
}

/**
 * Closes {@code delegate}, logging (rather than propagating) any exception it throws.
 *
 * @param delegate the job data to close
 */
private void closeDelegateQuietly(final JobData delegate) {
    try {
        delegate.close();
    } catch (Exception e) {
        final String message =
            String.format("Failed to close the data object for job %s", delegate.getJobId());
        logger.warn(message, e);
    }
}
/**
 * Offers {@code source} to this cache.
 *
 * <p>If no entry is cached under the source's identifier, the source is put into
 * {@code cache}, registered via {@code register(...)}, and tracked by a phantom
 * reference kept in {@code regs}. When that reference is finalized it closes the
 * registration and removes itself from {@code regs}. If an entry is already cached
 * for the identifier, this method does nothing.
 *
 * @param source the source being offered
 */
@Override
protected void offer(final T source) {
    final T present = cache.getIfPresent(source.getIdentifier());
    if (present != null) {
        // Already cached under this identifier: keep the existing entry.
        return;
    }

    cache.put(source.getIdentifier(), source);
    final SchemaSourceRegistration<T> reg = register(source.getIdentifier());

    // The callback captures only the registration, not the source it is tracking.
    final FinalizablePhantomReference<T> ref = new FinalizablePhantomReference<T>(source, queue) {
        @Override
        public void finalizeReferent() {
            try {
                reg.close();
            } finally {
                // Always unregister, even when close() throws; otherwise the dead
                // reference would stay in regs indefinitely.
                regs.remove(this);
            }
        }
    };
    regs.add(ref);
}
/**
 * Releases everything held by this object: finalizes every reference still present
 * in {@code regs}, then invalidates {@code cache} and closes {@code queue}.
 *
 * <p>Each reference is removed from {@code regs} before its callback is invoked, so
 * the loop always makes progress regardless of what the callback does. A
 * {@link RuntimeException} thrown by one callback does not prevent the remaining
 * references from being finalized, nor the cache and queue from being cleaned up.
 *
 * @throws RuntimeException the first exception thrown by a callback, rethrown after
 *         all cleanup has run; later callback exceptions are attached as suppressed
 */
@Override
public void close() {
    RuntimeException failure = null;
    try {
        while (!regs.isEmpty()) {
            final FinalizablePhantomReference<?> ref = regs.get(0);
            // Remove explicitly instead of relying on the callback to do it: a callback
            // that throws (or does not unregister itself) must not stall this loop.
            regs.remove(ref);
            try {
                ref.finalizeReferent();
            } catch (RuntimeException e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
    } finally {
        // Cache and queue are released no matter how the loop above ended.
        try {
            cache.invalidateAll();
        } finally {
            queue.close();
        }
    }

    if (failure != null) {
        throw failure;
    }
}