private ObjectNode fetchObject(EntityKey key, TupleContext tupleContext) { LightblueEntityMetadataId entityId = LightblueEntityMetadataId.extractEntityInfo(key.getMetadata()); DataFindRequest request = new DataFindRequest(entityId.entityName, entityId.entityVersion); request.where(new ValueQuery("_id", ExpressionOperation.EQUALS, key.getColumnValues()[0].toString())); List<Projection> projections = new ArrayList<Projection>(Arrays.asList(projectionsFromColumns(tupleContext.getSelectableColumns()))); projections.add(new FieldProjection("_id", true, true)); request.select(projections); try { LightblueResponse response = provider.getLightblueClient().data(request); if (response.hasError()) { throw new RuntimeException("Error returned in response: " + response.getText()); } return (ObjectNode) response.getJson().get("processed").get(0); } catch (LightblueException e) { throw new RuntimeException("Unable to communicate with lightblue.", e); } }
@Override public void insertOrUpdateTuple(EntityKey key, TuplePointer tuplePointer, TupleContext tupleContext) throws TupleAlreadyExistsException { IgniteCache<Object, BinaryObject> entityCache = provider.getEntityCache( key.getMetadata() ); Tuple tuple = tuplePointer.getTuple(); Object keyObject = null; BinaryObjectBuilder builder = null; IgniteTupleSnapshot tupleSnapshot = (IgniteTupleSnapshot) tuple.getSnapshot(); keyObject = tupleSnapshot.getCacheKey(); if ( tuple.getSnapshotType() == SnapshotType.UPDATE ) { builder = provider.createBinaryObjectBuilder( tupleSnapshot.getCacheValue() ); } else { builder = provider.createBinaryObjectBuilder( provider.getEntityTypeName( key.getMetadata().getTable() ) ); } for ( String columnName : tuple.getColumnNames() ) { Object value = tuple.get( columnName ); if ( value != null ) { builder.setField( StringHelper.realColumnName( columnName ), value ); } else { builder.removeField( StringHelper.realColumnName( columnName ) ); } } BinaryObject valueObject = builder.build(); entityCache.put( keyObject, valueObject ); tuplePointer.setTuple( new Tuple( new IgniteTupleSnapshot( keyObject, valueObject, key.getMetadata() ), SnapshotType.UPDATE ) ); }
@Override public Tuple getTuple(EntityKey key, TupleContext tupleContext) { ObjectNode object = fetchObject(key, tupleContext); if (object != null) { return new Tuple(new LightblueTupleSnapshot(object, key.getMetadata(), OperationType.UPDATE)); } if (isInQueue(key, tupleContext)) { return createTuple(key, tupleContext); } return null; }
@Override public void removeTuple(EntityKey key, TupleContext tupleContext) { LightblueEntityMetadataId entityId = LightblueEntityMetadataId.extractEntityInfo(key.getMetadata()); DataDeleteRequest request = new DataDeleteRequest(entityId.entityName, entityId.entityVersion); request.where(new ValueQuery("_id", ExpressionOperation.EQUALS, key.getColumnValues()[0].toString())); try { LightblueResponse data = provider.getLightblueClient().data(request); System.out.println(data.getJson()); } catch (Exception e) { throw new RuntimeException(e); } }
@Override public void removeTuple(EntityKey key, TupleContext tupleContext) { remove( key ); }
@Override public void removeTuple(EntityKey key, TupleContext tupleContext) { IgniteCache<Object, BinaryObject> entityCache = provider.getEntityCache( key.getMetadata() ); entityCache.remove( provider.createKeyObject( key ) ); }
@Override public int executeBackendUpdateQuery(BackendQuery<IgniteQueryDescriptor> query, QueryParameters queryParameters, TupleContext tupleContext) { throw new UnsupportedOperationException( "executeBackendUpdateQuery() is not implemented" ); }
private boolean isInQueue(EntityKey key, TupleContext tupleContext) { return tupleContext.getOperationsQueue() != null && tupleContext.getOperationsQueue().contains(key); }
@Override public Tuple createTuple(EntityKey key, TupleContext tupleContext) { return new Tuple(new LightblueTupleSnapshot(mapper.createObjectNode(), key.getMetadata(), OperationType.INSERT)); }