private boolean isSyncNeeded(Document doc, BsonTimestamp oplogTs) { syncFlg = doc.getBoolean(SyncConstants.SYNC_FLAG, false); syncTime = doc.get(SyncConstants.SYNC_TIME); boolean syncNeeded = false; if (syncFlg) { syncNeeded = true; } else { if (!isRestrictedSyncEnabled) { if (syncTime != null) { if (oplogTs.getTime() - (Long.valueOf(String.valueOf(syncTime)) / 1000) > SYNC_DIFF) { syncNeeded = true; } } else { syncNeeded = true; } } } return syncNeeded; }
@Override public Struct toStruct(Document document, Schema schema) { final Struct messageStruct = new Struct(schema); final BsonTimestamp bsonTimestamp = (BsonTimestamp) document.get("ts"); final Integer seconds = bsonTimestamp.getTime(); final Integer order = bsonTimestamp.getInc(); messageStruct.put("timestamp", seconds); messageStruct.put("order", order); messageStruct.put("operation", document.get("op")); messageStruct.put("database", document.get("ns")); final Document modifiedDocument = (Document) document.get("o"); messageStruct.put("object", modifiedDocument.toJson()); return messageStruct; }
private DateTime getLastDateTime(Object lastDateValue) { if (lastDateValue == null) { return null; } // ObjectId类型 if ("id".equals(since_type)) { ObjectId objectId = (ObjectId) lastDateValue; return new DateTime(objectId.getDate()); } else { Class<?> clazz = lastDateValue.getClass(); if (String.class.isAssignableFrom(clazz)) { // TODO format } else if (BsonTimestamp.class.isAssignableFrom(clazz)) { // TODO convert } } return null; }
/** * Returns the timestamp of the latest oplog entry. * * @param collection The oplog {@link MongoCollection} * @return The latest timestamp or {@code null} if no entry is available */ public static BsonTimestamp getLatestOplogTimestamp(MongoCollection<BsonDocument> collection) { final AtomicReference<BsonTimestamp> timestamp = new AtomicReference<>(); final AtomicReference<Throwable> error = new AtomicReference<>(); final CountDownLatch waiter = new CountDownLatch(1); collection.find().sort(new Document("$natural", -1)).limit(1).first(new SingleResultCallback<BsonDocument>() { @Override public void onResult(BsonDocument document, Throwable throwable) { if (throwable != null) error.set(throwable); if (document != null) timestamp.set(document.getTimestamp("ts")); waiter.countDown(); } }); ConcurrentUtils.safeAwait(waiter); Throwable realError = error.get(); if (realError != null) throw Throwables.propagate(realError); return timestamp.get(); }
public void process() { MongoDatabase db = mongoClient.getDatabase("local"); MongoCollection<RawBsonDocument> oplog = db.getCollection("oplog.rs", RawBsonDocument.class); RawBsonDocument lastOplogEntry = oplog.find().sort(new Document("$natural", -1)).first(); BsonTimestamp lastTimestamp = (BsonTimestamp) lastOplogEntry.get("ts"); System.out.println(lastTimestamp); Document query = new Document("ts", new Document("$lt", lastTimestamp)); for (RawBsonDocument doc : oplog.find(query).noCursorTimeout(true)) { BsonString ns = (BsonString) doc.get("ns"); BsonString op = (BsonString) doc.get("op"); // ignore no-op if (!op.getValue().equals("n")) { OplogEntryKey key = new OplogEntryKey(ns.getValue(), op.getValue()); EntryAccumulator accum = accumulators.get(key); if (accum == null) { accum = new EntryAccumulator(key); accumulators.put(key, accum); } long len = doc.getByteBuffer().asNIO().array().length; accum.addExecution(len); } if (stop) { mongoClient.close(); report(); break; } } }
public MngOpLogReader(String collectionName, String mongoDbName, String mongoUserName, BlockingQueue<Document> dataBuffer, BsonTimestamp lastReadTime, SyncMarker marker, CountDownLatch latch , ObjectId eventId) { super(); this.ns = mongoDbName + QueryConstants.DOT + collectionName; this.mongoDbName = mongoDbName; this.mongoUserName = mongoUserName; this.dataBuffer = dataBuffer; this.lastReadTime = lastReadTime; this.marker = marker; this.latch=latch; this.eventId=eventId; this.eventDao= new SyncEventDao(); }
public static CombinedId random() { CombinedId combinedId = new CombinedId(); combinedId.idPart1 = "Hi " + randomGenerator.nextInt(); combinedId.idPart2 = new ObjectId(); combinedId.idPart3 = new BsonTimestamp(randomGenerator.nextInt(), 0); return combinedId; }
@Override public Struct toStruct(Document document, Schema schema) { Struct messageStruct = new Struct(schema); BsonTimestamp bsonTimestamp = (BsonTimestamp) document.get("ts"); Integer seconds = bsonTimestamp.getTime(); Integer order = bsonTimestamp.getInc(); messageStruct.put("timestamp", seconds); messageStruct.put("order", order); messageStruct.put("operation", document.get("op")); messageStruct.put("database", document.get("ns")); messageStruct.put("object", document.get("o").toString()); return messageStruct; }
/** * Calculates the timestamp of the message. * * @param message from which retrieve the timestamp * @return BsonTimestamp formatted as a String (seconds+inc) */ private String getTimestamp(Document message) { BsonTimestamp timestamp = (BsonTimestamp) message.get("ts"); return new StringBuilder() .append(timestamp.getTime()) .append("_") .append(timestamp.getInc()) .toString(); }
private Bson getFilterLastTimeStamp(MongoCollection<Document> tsCollection, MongoClientWrapper client) { Document lastTimeStamp = tsCollection.find(new Document("_id", client.getHost())).limit(1) .first(); return getTimeQuery(lastTimeStamp == null ? null : (BsonTimestamp) lastTimeStamp .get(OPLOG_TIMESTAMP)); }
private void updateHostOperationTimeStamp(MongoCollection<Document> tsCollection, BsonTimestamp lastTimeStamp, String host) { try { tsCollection.replaceOne(new Document("_id", host), new Document("_id", host).append(OPLOG_TIMESTAMP, lastTimeStamp), (new UpdateOptions()).upsert(true)); } catch (Exception e) { logger.error(e.getMessage()); } }
@Test public void testTimeStampType() throws IOException { BsonDocument bsonDoc = new BsonDocument(); bsonDoc.append("ts", new BsonTimestamp(1000, 10)); writer.reset(); bsonReader.write(writer, new BsonDocumentReader(bsonDoc)); SingleMapReaderImpl mapReader = (SingleMapReaderImpl) writer.getMapVector().getReader(); assertEquals(1000l, mapReader.reader("ts").readDateTime().getMillis()); }
/** * {@inheritDoc} */ @Override public void run() { BsonTimestamp timestamp = OpLogUtils.getLatestOplogTimestamp(oplog); if (timestamp == null) { LOGGER.severe("OpLog is not ready. Please make sure that the server maintains an oplog and restart this server."); return; } final AtomicReference<BsonTimestamp> last = new AtomicReference<>(timestamp); //noinspection InfiniteLoopStatement while (true) { final CountDownLatch waiter = new CountDownLatch(1); oplog.find(Filters.and(Filters.gt("ts", last.get()), Filters.eq("ns", namespace))).cursorType(CursorType.TailableAwait).forEach( new Block<BsonDocument>() { @Override public void apply(BsonDocument document) { BsonTimestamp current = document.getTimestamp("ts"); if (current.getTime() > last.get().getTime()) { last.set(current); parser.emit(document); } } }, new SingleResultCallback<Void>() { @Override public void onResult(Void aVoid, Throwable throwable) { waiter.countDown(); } } ); ConcurrentUtils.safeAwait(waiter); } }
@Override public Long convertFromMongoDbType(BsonTimestamp data) { int inc = data.getInc(); if (inc < 0 || inc >= 1000) { throw new RuntimeException("Overflow occurs while converting BsonTimestamp into long: " + data); } return (long) data.getTime() * 1000 + inc; }
@Override public BsonTimestamp convertToMongoDbType(String data) { Matcher matcher = TIMESTAMP_PATTERN.matcher(data); if (!matcher.matches()) { throw new RuntimeException("Invalid BSONTimestamp " + data); } int time = Integer.parseInt(matcher.group(1)); int inc = Integer.parseInt(matcher.group(2)); return new BsonTimestamp(time, inc); }
private void prepareCursor(int timestampSeconds, int ordinal, List<OplogOpType> filterOplogTypes, int batchSize) { LOG.debug("Getting new cursor with offset - TimeStampInSeconds:'{}', Ordinal : '{}' and Batch Size : '{}'",timestampSeconds, ordinal, batchSize); FindIterable<Document> mongoCursorIterable = mongoCollection .find() //As the collection is a capped collection we use Tailable cursor which will return results in natural order in this case //based on ts timestamp field. //Tailable Await does not return and blocks, so we are using tailable. .cursorType(CursorType.Tailable) .batchSize(batchSize); List<Bson> andFilters = new ArrayList<>(); //Only filter if we already have saved/initial offset specified or else both time_t and ordinal will not be -1. if (timestampSeconds > 0 && ordinal >= 0) { andFilters.add(Filters.gt(TIMESTAMP_FIELD, new BsonTimestamp(timestampSeconds, ordinal))); } if (!filterOplogTypes.isEmpty()) { List<Bson> oplogOptypeFilters = new ArrayList<>(); Set<OplogOpType> oplogOpTypesSet = new HashSet<>(); for (OplogOpType filterOplogopType : filterOplogTypes) { if (oplogOpTypesSet.add(filterOplogopType)) { oplogOptypeFilters.add(Filters.eq(OP_TYPE_FIELD, filterOplogopType.getOp())); } } //Add an or filter for filtered Or Types andFilters.add(Filters.or(oplogOptypeFilters)); } //Finally and timestamp with oplog filters if (!andFilters.isEmpty()) { mongoCursorIterable = mongoCursorIterable.filter(Filters.and(andFilters)); } cursor = mongoCursorIterable.iterator(); }
private Record getOplogRecord() throws IOException { Document doc = cursor.tryNext(); if (doc != null) { validateOpLogDocument(doc); BsonTimestamp timestamp = (BsonTimestamp) doc.get(TIMESTAMP_FIELD); lastOffsetTsSeconds = timestamp.getTime(); lastOffsetTsOrdinal = timestamp.getInc(); //This does not seem to be always increasing, but is unique, // we are not using it for offset but using it for source record id Long opId = doc.getLong(OP_LONG_HASH_FIELD); Record record = getContext().createRecord( MongoDBSourceUtil.getSourceRecordId( configBean.mongoConfig.connectionString, configBean.mongoConfig.database, configBean.mongoConfig.collection, String.valueOf(opId) + "::" + createOffset() ) ); String ns = doc.getString(NS_FIELD); String opType = doc.getString(OP_TYPE_FIELD); record.getHeader().setAttribute(NS_FIELD, ns); record.getHeader().setAttribute(OP_TYPE_FIELD, opType); //Populate Generic operation type populateGenericOperationTypeInHeader(record, opType); record.set(Field.create(MongoDBSourceUtil.createFieldFromDocument(doc))); return record; } else { LOG.trace("Document from Cursor is null, No More Records"); } return null; }
@Override public void accept(ObjectMapper mapper) { SimpleModule module = new SimpleModule(); addSerializer(module, BsonBoolean.class, (value, gen) -> { gen.writeBoolean(value.getValue()); }); addSerializer(module, BsonDateTime.class, (value, gen) -> { if (Config.USE_TIMESTAMPS) { gen.writeString(DataConverterRegistry.convert(String.class, new Date(value.getValue()))); } else { gen.writeNumber(value.getValue()); } }); addSerializer(module, BsonDouble.class, (value, gen) -> { gen.writeNumber(value.getValue()); }); addSerializer(module, BsonInt32.class, (value, gen) -> { gen.writeNumber(value.getValue()); }); addSerializer(module, BsonInt64.class, (value, gen) -> { gen.writeNumber(value.getValue()); }); addSerializer(module, BsonNull.class, (value, gen) -> { gen.writeNull(); }); addSerializer(module, BsonRegularExpression.class, (value, gen) -> { gen.writeString(value.getPattern()); }); addSerializer(module, BsonString.class, (value, gen) -> { gen.writeString(value.getValue()); }); addSerializer(module, BsonTimestamp.class, (value, gen) -> { if (Config.USE_TIMESTAMPS) { gen.writeString(DataConverterRegistry.convert(String.class, new Date(value.getTime() * 1000L))); } else { gen.writeNumber(value.getTime()); } }); addSerializer(module, BsonUndefined.class, (value, gen) -> { gen.writeNull(); }); addSerializer(module, Binary.class, (value, gen) -> { gen.writeString(BASE64.encode(value.getData())); }); addSerializer(module, Code.class, (value, gen) -> { gen.writeString(value.getCode()); }); addSerializer(module, Decimal128.class, (value, gen) -> { gen.writeNumber(value.bigDecimalValue()); }); addSerializer(module, ObjectId.class, (value, gen) -> { gen.writeString(value.toHexString()); }); addSerializer(module, Symbol.class, (value, gen) -> { gen.writeString(value.getSymbol()); }); mapper.registerModule(module); }
@Override public void accept(ExtensibleRepresenter representer) { addSerializer(representer, BsonBoolean.class, (value) -> { return Boolean.toString(value.getValue()); }); addSerializer(representer, BsonDateTime.class, (value) -> { if (Config.USE_TIMESTAMPS) { return DataConverterRegistry.convert(String.class, new Date(value.getValue())); } return Long.toString(value.getValue()); }); addSerializer(representer, BsonDouble.class, (value) -> { return Double.toString(value.getValue()); }); addSerializer(representer, BsonInt32.class, (value) -> { return Integer.toString(value.getValue()); }); addSerializer(representer, BsonInt64.class, (value) -> { return Long.toString(value.getValue()); }); addSerializer(representer, BsonNull.class, (value) -> { return null; }); addSerializer(representer, BsonRegularExpression.class, (value) -> { return value.getPattern(); }); addSerializer(representer, BsonString.class, (value) -> { return value.getValue(); }); addSerializer(representer, BsonTimestamp.class, (value) -> { if (Config.USE_TIMESTAMPS) { return DataConverterRegistry.convert(String.class, new Date(value.getTime() * 1000L)); } return Integer.toString(value.getTime()); }); addSerializer(representer, BsonUndefined.class, (value) -> { return null; }); addSerializer(representer, Binary.class, (value) -> { return BASE64.encode(value.getData()); }); addSerializer(representer, Code.class, (value) -> { return value.getCode(); }); addSerializer(representer, Decimal128.class, (value) -> { return value.bigDecimalValue().toPlainString(); }); addSerializer(representer, ObjectId.class, (value) -> { return value.toHexString(); }); addSerializer(representer, Symbol.class, (value) -> { return value.getSymbol(); }); }
@Override public void accept(MapperBuilder builder) { addSerializer(builder, BsonBoolean.class, Boolean.class, (value) -> { return value.getValue(); }); if (Config.USE_TIMESTAMPS) { addSerializer(builder, BsonDateTime.class, String.class, (value) -> { return DataConverterRegistry.convert(String.class, new Date(value.getValue())); }); } else { addSerializer(builder, BsonDateTime.class, Long.class, (value) -> { return value.getValue(); }); } addSerializer(builder, BsonDouble.class, Double.class, (value) -> { return value.getValue(); }); addSerializer(builder, BsonInt32.class, Integer.class, (value) -> { return value.getValue(); }); addSerializer(builder, BsonInt64.class, Long.class, (value) -> { return value.getValue(); }); addSerializer(builder, BsonNull.class, Object.class, (value) -> { // Johnzon fails from null values return "null"; }); addSerializer(builder, BsonRegularExpression.class, String.class, (value) -> { return value.getPattern(); }); addSerializer(builder, BsonString.class, String.class, (value) -> { return value.getValue(); }); if (Config.USE_TIMESTAMPS) { addSerializer(builder, BsonTimestamp.class, String.class, (value) -> { return DataConverterRegistry.convert(String.class, new Date(value.getTime() * 1000L)); }); } else { addSerializer(builder, BsonTimestamp.class, Integer.class, (value) -> { return value.getTime(); }); } addSerializer(builder, BsonUndefined.class, String.class, (value) -> { // Johnzon fails from null values return "null"; }); addSerializer(builder, Binary.class, String.class, (value) -> { return BASE64.encode(value.getData()); }); addSerializer(builder, Code.class, String.class, (value) -> { return value.getCode(); }); addSerializer(builder, Decimal128.class, BigDecimal.class, (value) -> { return value.bigDecimalValue(); }); addSerializer(builder, ObjectId.class, String.class, (value) -> { return value.toHexString(); }); addSerializer(builder, Symbol.class, String.class, (value) -> { return value.getSymbol(); }); }
@Override public void accept(GensonBuilder builder) { addSerializer(builder, BsonBoolean.class, (value, writer, ctx) -> { writer.writeBoolean(value.getValue()); }); addSerializer(builder, BsonDateTime.class, (value, writer, ctx) -> { if (Config.USE_TIMESTAMPS) { writer.writeString(DataConverterRegistry.convert(String.class, new Date(value.getValue()))); } else { writer.writeNumber(value.getValue()); } }); addSerializer(builder, BsonDouble.class, (value, writer, ctx) -> { writer.writeNumber(value.getValue()); }); addSerializer(builder, BsonInt32.class, (value, writer, ctx) -> { writer.writeNumber(value.getValue()); }); addSerializer(builder, BsonInt64.class, (value, writer, ctx) -> { writer.writeNumber(value.getValue()); }); addSerializer(builder, BsonNull.class, (value, writer, ctx) -> { writer.writeNull(); }); addSerializer(builder, BsonRegularExpression.class, (value, writer, ctx) -> { writer.writeString(value.getPattern()); }); addSerializer(builder, BsonString.class, (value, writer, ctx) -> { writer.writeString(value.getValue()); }); addSerializer(builder, BsonTimestamp.class, (value, writer, ctx) -> { if (Config.USE_TIMESTAMPS) { writer.writeString(DataConverterRegistry.convert(String.class, new Date(value.getTime() * 1000L))); } else { writer.writeNumber(value.getTime()); } }); addSerializer(builder, BsonUndefined.class, (value, writer, ctx) -> { writer.writeNull(); }); addSerializer(builder, Binary.class, (value, writer, ctx) -> { writer.writeString(BASE64.encode(value.getData())); }); addSerializer(builder, Code.class, (value, writer, ctx) -> { writer.writeString(value.getCode()); }); addSerializer(builder, Decimal128.class, (value, writer, ctx) -> { writer.writeNumber(value.bigDecimalValue()); }); addSerializer(builder, ObjectId.class, (value, writer, ctx) -> { writer.writeString(value.toHexString()); }); addSerializer(builder, Symbol.class, (value, writer, ctx) -> { writer.writeString(value.getSymbol()); }); }
@Override public void accept(DslJson<Object> dslJson) { dslJson.registerWriter(BsonBoolean.class, (writer, value) -> { BoolConverter.serialize(value.getValue(), writer); }); dslJson.registerWriter(BsonDateTime.class, (writer, value) -> { if (Config.USE_TIMESTAMPS) { StringConverter.serialize(DataConverterRegistry.convert(String.class, new Date(value.getValue())), writer); } else { NumberConverter.serialize(value.getValue(), writer); } }); dslJson.registerWriter(BsonDouble.class, (writer, value) -> { NumberConverter.serialize(value.getValue(), writer); }); dslJson.registerWriter(BsonInt32.class, (writer, value) -> { NumberConverter.serialize(value.getValue(), writer); }); dslJson.registerWriter(BsonInt64.class, (writer, value) -> { NumberConverter.serialize(value.getValue(), writer); }); dslJson.registerWriter(BsonNull.class, (writer, value) -> { writer.writeNull(); }); dslJson.registerWriter(BsonRegularExpression.class, (writer, value) -> { StringConverter.serializeNullable(value.getPattern(), writer); }); dslJson.registerWriter(BsonString.class, (writer, value) -> { StringConverter.serializeNullable(value.getValue(), writer); }); dslJson.registerWriter(BsonTimestamp.class, (writer, value) -> { if (Config.USE_TIMESTAMPS) { StringConverter.serialize( DataConverterRegistry.convert(String.class, new Date(value.getTime() * 1000L)), writer); } else { NumberConverter.serialize(value.getTime(), writer); } }); dslJson.registerWriter(BsonUndefined.class, (writer, value) -> { writer.writeNull(); }); dslJson.registerWriter(Binary.class, (writer, value) -> { StringConverter.serialize(BASE64.encode(value.getData()), writer); }); dslJson.registerWriter(Code.class, (writer, value) -> { StringConverter.serializeNullable(value.getCode(), writer); }); dslJson.registerWriter(Decimal128.class, (writer, value) -> { NumberConverter.serialize(value.bigDecimalValue(), writer); }); dslJson.registerWriter(ObjectId.class, (writer, value) -> { StringConverter.serialize(value.toHexString(), writer); }); dslJson.registerWriter(Symbol.class, (writer, value) -> { StringConverter.serializeNullable(value.getSymbol(), writer); }); }
@Override public void accept(MessagePack mapper) { addSerializer(mapper, BsonBoolean.class, (packer, value) -> { packer.write(value.getValue()); }); addSerializer(mapper, BsonDateTime.class, (packer, value) -> { packer.write(value.getValue()); }); addSerializer(mapper, BsonDouble.class, (packer, value) -> { packer.write(value.getValue()); }); addSerializer(mapper, BsonInt32.class, (packer, value) -> { packer.write(value.getValue()); }); addSerializer(mapper, BsonInt64.class, (packer, value) -> { packer.write(value.getValue()); }); addSerializer(mapper, BsonNull.class, (packer, value) -> { packer.writeNil(); }); addSerializer(mapper, BsonRegularExpression.class, (packer, value) -> { packer.write(value.getPattern()); }); addSerializer(mapper, BsonString.class, (packer, value) -> { packer.write(value.getValue()); }); addSerializer(mapper, BsonTimestamp.class, (packer, value) -> { packer.write(value.getTime() * 1000L); }); addSerializer(mapper, BsonUndefined.class, (packer, value) -> { packer.writeNil(); }); addSerializer(mapper, Binary.class, (packer, value) -> { packer.write(BASE64.encode(value.getData())); }); addSerializer(mapper, Code.class, (packer, value) -> { packer.write(value.getCode()); }); addSerializer(mapper, Decimal128.class, (packer, value) -> { packer.write(value.bigDecimalValue()); }); addSerializer(mapper, ObjectId.class, (packer, value) -> { packer.write(value.toHexString()); }); addSerializer(mapper, Symbol.class, (packer, value) -> { packer.write(value.getSymbol()); }); }
@Override public void accept(SerializeConfig config) { addSerializer(config, BsonBoolean.class, (value, serializer) -> { serializer.write(value.getValue()); }); addSerializer(config, BsonDateTime.class, (value, serializer) -> { if (Config.USE_TIMESTAMPS) { serializer.write(DataConverterRegistry.convert(String.class, new Date(value.getValue()))); } else { serializer.write(value.getValue()); } }); addSerializer(config, BsonDouble.class, (value, serializer) -> { serializer.write(value.getValue()); }); addSerializer(config, BsonInt32.class, (value, serializer) -> { serializer.write(value.getValue()); }); addSerializer(config, BsonInt64.class, (value, serializer) -> { serializer.write(value.getValue()); }); addSerializer(config, BsonNull.class, (value, serializer) -> { serializer.writeNull(); }); addSerializer(config, BsonRegularExpression.class, (value, serializer) -> { serializer.write(value.getPattern()); }); addSerializer(config, BsonString.class, (value, serializer) -> { serializer.write(value.getValue()); }); addSerializer(config, BsonTimestamp.class, (value, serializer) -> { if (Config.USE_TIMESTAMPS) { serializer.write(DataConverterRegistry.convert(String.class, new Date(value.getTime() * 1000L))); } else { serializer.write(value.getTime()); } }); addSerializer(config, BsonUndefined.class, (value, serializer) -> { serializer.writeNull(); }); addSerializer(config, Binary.class, (value, serializer) -> { serializer.write(BASE64.encode(value.getData())); }); addSerializer(config, Code.class, (value, serializer) -> { serializer.write(value.getCode()); }); addSerializer(config, Decimal128.class, (value, serializer) -> { serializer.write(value.bigDecimalValue()); }); addSerializer(config, ObjectId.class, (value, serializer) -> { serializer.write(value.toHexString()); }); addSerializer(config, Symbol.class, (value, serializer) -> { serializer.write(value.getSymbol()); }); }
@Override public void accept(JsonSerializer serializer) { addSerializer(serializer, BsonBoolean.class, (value, ctx) -> { ctx.write(Boolean.toString(value.getValue())); }); addSerializer(serializer, BsonDateTime.class, (value, ctx) -> { if (Config.USE_TIMESTAMPS) { ctx.writeString(DataConverterRegistry.convert(String.class, new Date(value.getValue()))); } else { ctx.writeNumber(value.getValue()); } }); addSerializer(serializer, BsonDouble.class, (value, ctx) -> { ctx.writeNumber(value.getValue()); }); addSerializer(serializer, BsonInt32.class, (value, ctx) -> { ctx.writeNumber(value.getValue()); }); addSerializer(serializer, BsonInt64.class, (value, ctx) -> { ctx.writeNumber(value.getValue()); }); addSerializer(serializer, BsonNull.class, (value, ctx) -> { ctx.write("null"); }); addSerializer(serializer, BsonRegularExpression.class, (value, ctx) -> { ctx.writeString(value.getPattern()); }); addSerializer(serializer, BsonString.class, (value, ctx) -> { ctx.writeString(value.getValue()); }); addSerializer(serializer, BsonTimestamp.class, (value, ctx) -> { if (Config.USE_TIMESTAMPS) { ctx.writeString( DataConverterRegistry.convert(String.class, new Date(value.getTime() * 1000L))); } else { ctx.writeNumber(value.getTime()); } }); addSerializer(serializer, BsonUndefined.class, (value, ctx) -> { ctx.write("null"); }); addSerializer(serializer, Binary.class, (value, ctx) -> { ctx.writeString(BASE64.encode(value.getData())); }); addSerializer(serializer, Code.class, (value, ctx) -> { ctx.writeString(value.getCode()); }); addSerializer(serializer, Decimal128.class, (value, ctx) -> { ctx.writeNumber(value.bigDecimalValue()); }); addSerializer(serializer, ObjectId.class, (value, ctx) -> { ctx.writeString(value.toHexString()); }); addSerializer(serializer, Symbol.class, (value, ctx) -> { ctx.writeString(value.getSymbol()); }); }
@Override public void accept(TypeTransformerMap map) { addSerializer(map, BsonBoolean.class, (value, ctx) -> { ctx.write(Boolean.toString(value.getValue())); }); addSerializer(map, BsonDateTime.class, (value, ctx) -> { if (Config.USE_TIMESTAMPS) { ctx.writeQuoted(DataConverterRegistry.convert(String.class, new Date(value.getValue()))); } else { ctx.write(Long.toString(value.getValue())); } }); addSerializer(map, BsonDouble.class, (value, ctx) -> { ctx.write(Double.toString(value.getValue())); }); addSerializer(map, BsonInt32.class, (value, ctx) -> { ctx.write(Integer.toString(value.getValue())); }); addSerializer(map, BsonInt64.class, (value, ctx) -> { ctx.write(Long.toString(value.getValue())); }); addSerializer(map, BsonNull.class, (value, ctx) -> { ctx.write("null"); }); addSerializer(map, BsonRegularExpression.class, (value, ctx) -> { ctx.writeQuoted(value.getPattern()); }); addSerializer(map, BsonString.class, (value, ctx) -> { ctx.writeQuoted(value.getValue()); }); addSerializer(map, BsonTimestamp.class, (value, ctx) -> { if (Config.USE_TIMESTAMPS) { ctx.writeQuoted(DataConverterRegistry.convert(String.class, new Date(value.getTime() * 1000L))); } else { ctx.write(Integer.toString(value.getTime())); } }); addSerializer(map, BsonUndefined.class, (value, ctx) -> { ctx.write("null"); }); addSerializer(map, Binary.class, (value, ctx) -> { ctx.writeQuoted(BASE64.encode(value.getData())); }); addSerializer(map, Code.class, (value, ctx) -> { ctx.writeQuoted(value.getCode()); }); addSerializer(map, Decimal128.class, (value, ctx) -> { ctx.write(value.bigDecimalValue().toPlainString()); }); addSerializer(map, ObjectId.class, (value, ctx) -> { ctx.writeQuoted(value.toHexString()); }); addSerializer(map, Symbol.class, (value, ctx) -> { ctx.writeQuoted(value.getSymbol()); }); }
@Override public void accept(HashMap<Class<?>, Function<Object, Object>> converters) { addSerializer(converters, BsonBoolean.class, (value) -> { return value.getValue(); }); addSerializer(converters, BsonDateTime.class, (value) -> { return new Date(value.getValue()); }); addSerializer(converters, BsonDouble.class, (value) -> { return value.getValue(); }); addSerializer(converters, BsonInt32.class, (value) -> { return value.getValue(); }); addSerializer(converters, BsonInt64.class, (value) -> { return value.getValue(); }); addSerializer(converters, BsonNull.class, (value) -> { return null; }); addSerializer(converters, BsonRegularExpression.class, (value) -> { return value.getPattern(); }); addSerializer(converters, BsonString.class, (value) -> { return value.getValue(); }); addSerializer(converters, BsonTimestamp.class, (value) -> { return new Date(value.getTime() * 1000L); }); addSerializer(converters, BsonUndefined.class, (value) -> { return null; }); addSerializer(converters, Binary.class, (value) -> { return value.getData(); }); addSerializer(converters, Code.class, (value) -> { return value.getCode(); }); addSerializer(converters, Decimal128.class, (value) -> { return value.bigDecimalValue(); }); addSerializer(converters, ObjectId.class, (value) -> { return value.toHexString(); }); addSerializer(converters, Symbol.class, (value) -> { return value.getSymbol(); }); }
@Override public void accept(GsonBuilder builder) { addSerializer(builder, BsonBoolean.class, (value) -> { return new JsonPrimitive(value.getValue()); }); addSerializer(builder, BsonDateTime.class, (value) -> { if (Config.USE_TIMESTAMPS) { return new JsonPrimitive(DataConverterRegistry.convert(String.class, new Date(value.getValue()))); } else { return new JsonPrimitive(value.getValue()); } }); addSerializer(builder, BsonDouble.class, (value) -> { return new JsonPrimitive(value.getValue()); }); addSerializer(builder, BsonInt32.class, (value) -> { return new JsonPrimitive(value.getValue()); }); addSerializer(builder, BsonInt64.class, (value) -> { return new JsonPrimitive(value.getValue()); }); addSerializer(builder, BsonNull.class, (value) -> { return JsonNull.INSTANCE; }); addSerializer(builder, BsonRegularExpression.class, (value) -> { return new JsonPrimitive(value.getPattern()); }); addSerializer(builder, BsonString.class, (value) -> { return new JsonPrimitive(value.getValue()); }); addSerializer(builder, BsonTimestamp.class, (value) -> { if (Config.USE_TIMESTAMPS) { return new JsonPrimitive( DataConverterRegistry.convert(String.class, new Date(value.getTime() * 1000L))); } else { return new JsonPrimitive(value.getTime()); } }); addSerializer(builder, BsonUndefined.class, (value) -> { return JsonNull.INSTANCE; }); addSerializer(builder, Binary.class, (value) -> { return new JsonPrimitive(BASE64.encode(value.getData())); }); addSerializer(builder, Code.class, (value) -> { return new JsonPrimitive(value.getCode()); }); addSerializer(builder, Decimal128.class, (value) -> { return new JsonPrimitive(value.bigDecimalValue()); }); addSerializer(builder, ObjectId.class, (value) -> { return new JsonPrimitive(value.toHexString()); }); addSerializer(builder, Symbol.class, (value) -> { return new JsonPrimitive(value.getSymbol()); }); }
@Override public void accept(XStream mapper) { addSerializer(mapper, BsonBoolean.class, (value) -> { return Boolean.toString(value.getValue()); }); addSerializer(mapper, BsonDateTime.class, (value) -> { if (Config.USE_TIMESTAMPS) { return DataConverterRegistry.convert(String.class, new Date(value.getValue())); } return Long.toString(value.getValue()); }); addSerializer(mapper, BsonDouble.class, (value) -> { return Double.toString(value.getValue()); }); addSerializer(mapper, BsonInt32.class, (value) -> { return Integer.toString(value.getValue()); }); addSerializer(mapper, BsonInt64.class, (value) -> { return Long.toString(value.getValue()); }); addSerializer(mapper, BsonNull.class, (value) -> { return "null"; }); addSerializer(mapper, BsonRegularExpression.class, (value) -> { return value.getPattern(); }); addSerializer(mapper, BsonString.class, (value) -> { return value.getValue(); }); addSerializer(mapper, BsonTimestamp.class, (value) -> { if (Config.USE_TIMESTAMPS) { return DataConverterRegistry.convert(String.class, new Date(value.getTime() * 1000L)); } return Integer.toString(value.getTime()); }); addSerializer(mapper, BsonUndefined.class, (value) -> { return "null"; }); addSerializer(mapper, Binary.class, (value) -> { return BASE64.encode(value.getData()); }); addSerializer(mapper, Code.class, (value) -> { return value.getCode(); }); addSerializer(mapper, Decimal128.class, (value) -> { return value.bigDecimalValue().toPlainString(); }); addSerializer(mapper, ObjectId.class, (value) -> { return value.toHexString(); }); addSerializer(mapper, Symbol.class, (value) -> { return value.getSymbol(); }); }
@Test public void testMongoTypes() throws Exception { // JSON-Simple and JsonUtil aren't extendable APIs String writerClass = TreeWriterRegistry.getWriter(TreeWriterRegistry.JSON).getClass().toString(); boolean unsupportedAPI = writerClass.contains("Simple") || writerClass.contains("JsonUtil"); if (unsupportedAPI) { return; } Document doc = new Document(); doc.put("BsonBoolean", new BsonBoolean(true)); long time = System.currentTimeMillis(); doc.put("BsonDateTime", new BsonDateTime(time)); doc.put("BsonDouble", new BsonDouble(123.456)); doc.put("BsonInt32", new BsonInt32(123)); doc.put("BsonInt64", new BsonInt64(123456)); doc.put("BsonNull", new BsonNull()); doc.put("BsonRegularExpression", new BsonRegularExpression("abc")); doc.put("BsonString", new BsonString("abcdefgh")); doc.put("BsonTimestamp", new BsonTimestamp(12, 23)); doc.put("BsonUndefined", new BsonUndefined()); doc.put("Binary", new Binary("abcdefgh".getBytes())); doc.put("Code", new Code("var a = 5;")); doc.put("Decimal128", new Decimal128(123456789)); ObjectId objectID = new ObjectId(); doc.put("ObjectId", objectID); doc.put("Symbol", new Symbol("s")); Tree t = new Tree(doc, null); String json = t.toString(); System.out.println("-------------------- BSON --------------------"); System.out.println("Output of " + writerClass + " serializer (MongoDB types):"); System.out.println(json); t = new Tree(json); assertTrue(t.get("BsonBoolean", false)); Date date = t.get("BsonDateTime", new Date()); assertEquals(time / 1000L, date.getTime() / 1000L); assertEquals(123.456, t.get("BsonDouble", 1d)); assertEquals(123, t.get("BsonInt32", 1)); assertEquals(123456L, t.get("BsonInt64", 1L)); assertNull(t.get("BsonNull", "?")); assertEquals("abc", t.get("BsonRegularExpression", "?")); assertEquals("abcdefgh", t.get("BsonString", "?")); // String or Number date = t.get("BsonTimestamp", new Date()); assertEquals(12000L, date.getTime()); assertNull(t.get("BsonUndefined", "?")); assertEquals("abcdefgh", new String(t.get("Binary", "?".getBytes()))); assertEquals("var a = 5;", t.get("Code", "?")); assertEquals(123456789L, t.get("Decimal128", 1L)); assertEquals(objectID.toHexString(), t.get("ObjectId", "?")); assertEquals("s", t.get("Symbol", "?")); }
public void updateLastReadTime(ObjectId eventId, BsonTimestamp lastReadTime) { syncEvents.findOneAndUpdate(Filters.eq(SyncAttrs.ID, eventId), Updates.set(SyncAttrs.LAST_READ_TIME, lastReadTime)); }
public BsonTimestamp getLastReadTime() { return lastReadTime; }
public void setLastReadTime(BsonTimestamp lastReadTime) { this.lastReadTime = lastReadTime; }
@Test public void testMongoTypes() throws Exception { Document doc = new Document(); doc.put("BsonBoolean", new BsonBoolean(true)); doc.put("BsonDateTime", new BsonDateTime(12345)); doc.put("BsonDouble", new BsonDouble(123.456)); doc.put("BsonInt32", new BsonInt32(123)); doc.put("BsonInt64", new BsonInt64(123456)); doc.put("BsonNull", new BsonNull()); doc.put("BsonRegularExpression", new BsonRegularExpression("abc")); doc.put("BsonString", new BsonString("abcdefgh")); doc.put("BsonTimestamp", new BsonTimestamp(12, 23)); doc.put("BsonUndefined", new BsonUndefined()); doc.put("Binary", new Binary("abcdefgh".getBytes())); doc.put("Code", new Code("var a = 5;")); doc.put("Decimal128", new Decimal128(123456789)); doc.put("ObjectId", new ObjectId()); doc.put("Symbol", new Symbol("s")); Document map = new Document(); map.put("a", "b"); map.put("c", 5); doc.put("map", map); ArrayList<Object> list = new ArrayList<>(); list.add("c"); list.add("b"); list.add("a"); doc.put("list", list); Tree t = new Tree(doc, null); String json = t.toString(); String writerClass = TreeWriterRegistry.getWriter(TreeWriterRegistry.JSON).getClass().toString(); System.out.println("--------------------------------------------------------------"); System.out.println("Output of " + writerClass + " serializer:"); System.out.println(json); Tree t2 = new Tree(json); assertEquals(true, t2.get("BsonBoolean", false)); // assertEquals(12345, t2.get("BsonDateTime", -1)); assertEquals(123.456, t2.get("BsonDouble", 1d)); assertEquals(123, t2.get("BsonInt32", 345)); assertEquals(123456, t2.get("BsonInt64", 1)); assertNull(t2.get("BsonNull", "X")); assertEquals("abc", t2.get("BsonRegularExpression", "xcf")); assertEquals("abcdefgh", t2.get("BsonString", "fsdfasdf")); // doc.put("BsonTimestamp", new BsonTimestamp(12, 23)); // doc.put("BsonUndefined", new BsonUndefined()); // doc.put("Binary", new Binary("abcdefgh".getBytes())); // doc.put("Code", new Code("var a = 5;")); // doc.put("Decimal128", new Decimal128(123456789)); // doc.put("ObjectId", new ObjectId()); // doc.put("Symbol", new Symbol("s")); }
@Override public BsonTimestamp decode(BsonValue bsonValue) { return bsonValue.asTimestamp(); }
@Override public BsonTimestamp encode(BsonTimestamp object) { return object; }
@Override public BsonTimestamp decode(BsonReader bsonReader) { return bsonReader.readTimestamp(); }
@Override public void encode(BsonWriter bsonWriter, BsonTimestamp value) { bsonWriter.writeTimestamp(value); }
private Bson getTimeQuery(BsonTimestamp lastTimeStamp) { return lastTimeStamp == null ? new Document() : gt(OPLOG_TIMESTAMP, lastTimeStamp); }
private void putOperationOnOpsQueue(Entry<String, FindPublisher<Document>> publisher, MongoCollection<Document> tsCollection, Document t) throws InterruptedException { updateHostOperationTimeStamp(tsCollection, t.get(OPLOG_TIMESTAMP, BsonTimestamp.class), publisher.getKey()); putOperationOnOpsQueueIfFullyReplicated(t); }