@Override public Response getRecordById(String keyspaceName, String tableName, String identifier) { long startTime = System.currentTimeMillis(); ProjectLogger.log("Cassandra Service getRecordById method started at ==" + startTime, LoggerEnum.PERF_LOG); Response response = new Response(); try { Select selectQuery = QueryBuilder.select().all().from(keyspaceName, tableName); Where selectWhere = selectQuery.where(); Clause clause = QueryBuilder.eq(Constants.IDENTIFIER, identifier); selectWhere.and(clause); ResultSet results = connectionManager.getSession(keyspaceName).execute(selectQuery); response = CassandraUtil.createResponse(results); } catch (Exception e) { ProjectLogger.log(Constants.EXCEPTION_MSG_FETCH + tableName + " : " + e.getMessage(), e); throw new ProjectCommonException(ResponseCode.SERVER_ERROR.getErrorCode(), ResponseCode.SERVER_ERROR.getErrorMessage(), ResponseCode.SERVER_ERROR.getResponseCode()); } long stopTime = System.currentTimeMillis(); long elapsedTime = stopTime - startTime; ProjectLogger.log("Cassandra Service getRecordById method end at ==" + stopTime + " ,Total time elapsed = " + elapsedTime, LoggerEnum.PERF_LOG); return response; }
@Override public Response getRecordsByProperty(String keyspaceName, String tableName, String propertyName, List<Object> propertyValueList) { long startTime = System.currentTimeMillis(); ProjectLogger.log("Cassandra Service getRecordsByProperty method started at ==" + startTime, LoggerEnum.PERF_LOG); Response response = new Response(); try { Select selectQuery = QueryBuilder.select().all().from(keyspaceName, tableName); Where selectWhere = selectQuery.where(); Clause clause = QueryBuilder.in(propertyName, propertyValueList); selectWhere.and(clause); ResultSet results = connectionManager.getSession(keyspaceName).execute(selectQuery); response = CassandraUtil.createResponse(results); } catch (Exception e) { ProjectLogger.log(Constants.EXCEPTION_MSG_FETCH + tableName + " : " + e.getMessage(), e); throw new ProjectCommonException(ResponseCode.SERVER_ERROR.getErrorCode(), ResponseCode.SERVER_ERROR.getErrorMessage(), ResponseCode.SERVER_ERROR.getResponseCode()); } long stopTime = System.currentTimeMillis(); long elapsedTime = stopTime - startTime; ProjectLogger.log("Cassandra Service getRecordsByProperty method end at ==" + stopTime + " ,Total time elapsed = " + elapsedTime, LoggerEnum.PERF_LOG); return response; }
@Override public List<E> queryByOpId(String opId) throws EventStoreException { Select select = QueryBuilder.select(CassandraEventRecorder.ENTITY_ID).from(tableName); select.where(QueryBuilder.eq(CassandraEventRecorder.OP_ID, opId)); List<Row> entityEventDatas = cassandraSession.execute(select, PagingIterable::all); Map<String, E> resultList = new HashMap<>(); for (Row entityEvent : entityEventDatas) { String entityId = entityEvent.getString(CassandraEventRecorder.ENTITY_ID); if (!resultList.containsKey(entityId)) { E value = queryEntity(entityId); if (value != null) resultList.put(entityId, value); } } return new ArrayList<>(resultList.values()); }
@Override public Response getAllRecords(String keyspaceName, String tableName) { long startTime = System.currentTimeMillis(); ProjectLogger.log("Cassandra Service getAllRecords method started at ==" + startTime, LoggerEnum.PERF_LOG); Response response = new Response(); try { Select selectQuery = QueryBuilder.select().all().from(keyspaceName, tableName); ResultSet results = connectionManager.getSession(keyspaceName).execute(selectQuery); response = CassandraUtil.createResponse(results); } catch (Exception e) { ProjectLogger.log(Constants.EXCEPTION_MSG_FETCH + tableName + " : " + e.getMessage(), e); throw new ProjectCommonException(ResponseCode.SERVER_ERROR.getErrorCode(), ResponseCode.SERVER_ERROR.getErrorMessage(), ResponseCode.SERVER_ERROR.getResponseCode()); } long stopTime = System.currentTimeMillis(); long elapsedTime = stopTime - startTime; ProjectLogger.log("Cassandra Service getAllRecords method end at ==" + stopTime + " ,Total time elapsed = " + elapsedTime, LoggerEnum.PERF_LOG); return response; }
@Override public ListenableFuture<List<EntityRelation>> findRelations(EntityId from, String relationType, RelationTypeGroup typeGroup, ThingType childType, TimePageLink pageLink) { Select.Where query = CassandraAbstractSearchTimeDao.buildQuery(ModelConstants.RELATION_BY_TYPE_AND_CHILD_TYPE_VIEW_NAME, Arrays.asList(eq(ModelConstants.RELATION_FROM_ID_PROPERTY, from.getId()), eq(ModelConstants.RELATION_FROM_TYPE_PROPERTY, from.getEntityType().name()), eq(ModelConstants.RELATION_TYPE_GROUP_PROPERTY, typeGroup.name()), eq(ModelConstants.RELATION_TYPE_PROPERTY, relationType), eq(ModelConstants.RELATION_TO_TYPE_PROPERTY, childType.name())), Arrays.asList( pageLink.isAscOrder() ? QueryBuilder.desc(ModelConstants.RELATION_TYPE_GROUP_PROPERTY) : QueryBuilder.asc(ModelConstants.RELATION_TYPE_GROUP_PROPERTY), pageLink.isAscOrder() ? QueryBuilder.desc(ModelConstants.RELATION_TYPE_PROPERTY) : QueryBuilder.asc(ModelConstants.RELATION_TYPE_PROPERTY), pageLink.isAscOrder() ? QueryBuilder.desc(ModelConstants.RELATION_TO_TYPE_PROPERTY) : QueryBuilder.asc(ModelConstants.RELATION_TO_TYPE_PROPERTY) ), pageLink, ModelConstants.RELATION_TO_ID_PROPERTY); return getFuture(executeAsyncRead(query), this::getEntityRelations); }
@Override public Event findEvent(UUID tenantId, EntityId entityId, String eventType, String eventUid) { log.debug("Search event entity by [{}][{}][{}][{}]", tenantId, entityId, eventType, eventUid); Select.Where query = select().from(getColumnFamilyName()).where( eq(ModelConstants.EVENT_TENANT_ID_PROPERTY, tenantId)) .and(eq(ModelConstants.EVENT_ENTITY_TYPE_PROPERTY, entityId.getEntityType())) .and(eq(ModelConstants.EVENT_ENTITY_ID_PROPERTY, entityId.getId())) .and(eq(ModelConstants.EVENT_TYPE_PROPERTY, eventType)) .and(eq(ModelConstants.EVENT_UID_PROPERTY, eventUid)); log.trace("Execute query [{}]", query); EventEntity entity = findOneByStatement(query); if (log.isTraceEnabled()) { log.trace("Search result: [{}] for event entity [{}]", entity != null, entity); } else { log.debug("Search result: [{}]", entity != null); } return DaoUtil.getData(entity); }
@Override protected void doHealthCheck(Health.Builder builder) throws Exception { try { Select select = QueryBuilder.select("release_version").from("system", "local"); ResultSet results = this.cassandraOperations.query(select); if (results.isExhausted()) { builder.up(); return; } String version = results.one().getString(0); builder.up().withDetail("version", version); } catch (Exception ex) { builder.down(ex); } }
@Test public void testInsert() throws InterruptedException { Book book = new Book(); book.setIsbn(UUIDs.timeBased()); book.setTitle("Spring Integration Cassandra"); book.setAuthor("Cassandra Guru"); book.setPages(521); book.setSaleDate(new Date()); book.setInStock(true); this.sink.input().send(new GenericMessage<>(book)); final Select select = QueryBuilder.select().all().from("book"); assertEqualsEventually(1, new Supplier<Integer>() { @Override public Integer get() { return cassandraTemplate.select(select, Book.class).size(); } }); this.cassandraTemplate.delete(book); }
@Test public void testIngestQuery() throws Exception { List<Book> books = getBookList(5); ObjectMapper objectMapper = new ObjectMapper(); objectMapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false); Jackson2JsonObjectMapper mapper = new Jackson2JsonObjectMapper(objectMapper); this.sink.input().send(new GenericMessage<>(mapper.toJson(books))); final Select select = QueryBuilder.select().all().from("book"); assertEqualsEventually(5, new Supplier<Integer>() { @Override public Integer get() { return cassandraTemplate.select(select, Book.class).size(); } }); this.cassandraTemplate.truncate("book"); }
@Override public ListenableFuture<List<EntityRelation>> findRelations(EntityId from, String relationType, RelationTypeGroup typeGroup, EntityType childType, TimePageLink pageLink) { Select.Where query = CassandraAbstractSearchTimeDao.buildQuery(ModelConstants.RELATION_BY_TYPE_AND_CHILD_TYPE_VIEW_NAME, Arrays.asList(eq(ModelConstants.RELATION_FROM_ID_PROPERTY, from.getId()), eq(ModelConstants.RELATION_FROM_TYPE_PROPERTY, from.getEntityType().name()), eq(ModelConstants.RELATION_TYPE_GROUP_PROPERTY, typeGroup.name()), eq(ModelConstants.RELATION_TYPE_PROPERTY, relationType), eq(ModelConstants.RELATION_TO_TYPE_PROPERTY, childType.name())), Arrays.asList( pageLink.isAscOrder() ? QueryBuilder.desc(ModelConstants.RELATION_TYPE_GROUP_PROPERTY) : QueryBuilder.asc(ModelConstants.RELATION_TYPE_GROUP_PROPERTY), pageLink.isAscOrder() ? QueryBuilder.desc(ModelConstants.RELATION_TYPE_PROPERTY) : QueryBuilder.asc(ModelConstants.RELATION_TYPE_PROPERTY), pageLink.isAscOrder() ? QueryBuilder.desc(ModelConstants.RELATION_TO_TYPE_PROPERTY) : QueryBuilder.asc(ModelConstants.RELATION_TO_TYPE_PROPERTY) ), pageLink, ModelConstants.RELATION_TO_ID_PROPERTY); return getFuture(executeAsyncRead(query), this::getEntityRelations); }
@Override public ListenableFuture<List<EntitySubtype>> findTenantDeviceTypesAsync(UUID tenantId) { Select select = select().from(ENTITY_SUBTYPE_COLUMN_FAMILY_NAME); Select.Where query = select.where(); query.and(eq(ENTITY_SUBTYPE_TENANT_ID_PROPERTY, tenantId)); query.and(eq(ENTITY_SUBTYPE_ENTITY_TYPE_PROPERTY, EntityType.DEVICE)); query.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel()); ResultSetFuture resultSetFuture = getSession().executeAsync(query); return Futures.transform(resultSetFuture, new Function<ResultSet, List<EntitySubtype>>() { @Nullable @Override public List<EntitySubtype> apply(@Nullable ResultSet resultSet) { Result<EntitySubtypeEntity> result = cluster.getMapper(EntitySubtypeEntity.class).map(resultSet); if (result != null) { List<EntitySubtype> entitySubtypes = new ArrayList<>(); result.all().forEach((entitySubtypeEntity) -> entitySubtypes.add(entitySubtypeEntity.toEntitySubtype()) ); return entitySubtypes; } else { return Collections.emptyList(); } } }); }
@Override public ListenableFuture<List<EntitySubtype>> findTenantAssetTypesAsync(UUID tenantId) { Select select = select().from(ENTITY_SUBTYPE_COLUMN_FAMILY_NAME); Select.Where query = select.where(); query.and(eq(ENTITY_SUBTYPE_TENANT_ID_PROPERTY, tenantId)); query.and(eq(ENTITY_SUBTYPE_ENTITY_TYPE_PROPERTY, EntityType.ASSET)); query.setConsistencyLevel(cluster.getDefaultReadConsistencyLevel()); ResultSetFuture resultSetFuture = getSession().executeAsync(query); return Futures.transform(resultSetFuture, new Function<ResultSet, List<EntitySubtype>>() { @Nullable @Override public List<EntitySubtype> apply(@Nullable ResultSet resultSet) { Result<EntitySubtypeEntity> result = cluster.getMapper(EntitySubtypeEntity.class).map(resultSet); if (result != null) { List<EntitySubtype> entitySubtypes = new ArrayList<>(); result.all().forEach((entitySubtypeEntity) -> entitySubtypes.add(entitySubtypeEntity.toEntitySubtype()) ); return entitySubtypes; } else { return Collections.emptyList(); } } }); }
protected Optional<Row> singleEvent(String analyseId, String jvmId, String bucketId, DateTime start, String[] fields) { List<String> dates = dates(Range.of(start.toDateTime(DateTimeZone.UTC), DateTime.now(DateTimeZone.UTC))); for (String date : dates) { Select from = QueryBuilder.select(fields).from(Strings.isNullOrEmpty(bucketId) ? TABLE_NAME : BUCKET_TABLE_NAME); Select.Where statement = from.limit(1) .where(eq("analyse_id", UUID.fromString(analyseId))) .and(eq("jvm_id", jvmId)) .and(eq("date", date)); if (!Strings.isNullOrEmpty(bucketId)) { statement = statement.and(eq("bucket_id", bucketId)); } List<Row> rows = connector.session().execute(statement).all(); if (rows.size() > 0) { return Optional.of(rows.get(0)); } } return Optional.empty(); }
public boolean schemaExists() throws StoreException { if (session != null) { boolean exists = false; Select select = QueryBuilder.select().column("keyspace_name").from("system.schema_keyspaces"); ResultSet results = session.execute(select); for (Row row : results) { String name = row.getString("keyspace_name"); if (KEYSPACE_NAME.equals(name)) { exists = true; break; } } return exists; } else { throw new StoreException("Keyspaces not available; store not open"); } }
@Override protected void doHealthCheck(Health.Builder builder) throws Exception { try { Select select = QueryBuilder.select("release_version").from("system", "local"); ResultSet results = this.cassandraAdminOperations.query(select); if (results.isExhausted()) { builder.up(); return; } String version = results.one().getString(0); builder.up().withDetail("version", version); } catch (Exception ex) { builder.down(ex); } }
public boolean findIndexRow(String table, String rowKey, byte[] key, Object indValue) { Select selectQuery = QueryBuilder.select().all().from(keys, table).allowFiltering(); Where selectWhere = selectQuery.where(); Clause rkClause = QueryBuilder.eq("id", rowKey); selectWhere.and(rkClause); Clause indClause = null; if (indValue != null) { indClause = QueryBuilder.eq("colname", indValue); } else { if (table.equalsIgnoreCase("IntegerIndice")) { indClause = QueryBuilder.eq("colname", ByteBuffer.wrap(new byte[0])); } else { indClause = QueryBuilder.eq("colname", ""); } } selectWhere.and(indClause); Clause keyClause = QueryBuilder.eq("colvalue", ByteBuffer.wrap(key)); selectWhere.and(keyClause); Query query = selectWhere.limit(1); ResultSet resultSet = session.execute(query); return !resultSet.isExhausted(); }
public static Where createRowQueryFromValues(List<byte[]> values, DboColumnMeta colMeta, Select selectQuery, String rowKey) { Where selectWhere = selectQuery.where(); Clause rkClause = QueryBuilder.eq("id", rowKey); selectWhere.and(rkClause); Object[] valStrings = new Object[values.size()]; int count = 0; for (byte[] value : values) { valStrings[count] = StandardConverters.convertFromBytes(String.class, value); count++; } Clause inClause = QueryBuilder.in("colname", valStrings); selectWhere.and(inClause); return selectWhere; }
public List<Row> get() { Select.Where where = QueryBuilder.select().from(parent.getKeyspace(), parent.getTable()).where(); for (Clause clause : clauses) { where.and(clause); } if (searchBuilder != null) { where.and(QueryBuilder.eq(parent.getIndexColumn(), searchBuilder.refresh(refresh).toJson())); } BuiltStatement statement = limit == null ? where : where.limit(limit); String query = statement.toString(); query = query.substring(0, query.length() - 1); StringBuilder sb = new StringBuilder(query); for (String extra : extras) { sb.append(" "); sb.append(extra); sb.append(" "); } return parent.execute(sb, fetchSize); }
@Inject public CassandraSearcher(CassandraSession session, MetricRegistry registry, ContextConfigurations contextConfigurations) { m_session = checkNotNull(session, "session argument"); m_searchTimer = registry.timer(name("search", "search")); m_contextConfigurations = checkNotNull(contextConfigurations, "contextConfigurations argument"); Select select = QueryBuilder.select(Schema.C_TERMS_RESOURCE).from(Schema.T_TERMS); select.where(eq(Schema.C_TERMS_CONTEXT, bindMarker(Schema.C_TERMS_CONTEXT))) .and( eq(Schema.C_TERMS_FIELD, bindMarker(Schema.C_TERMS_FIELD))) .and( eq(Schema.C_TERMS_VALUE, bindMarker(Schema.C_TERMS_VALUE))); m_searchStatement = m_session.prepare(select.toString()); select = QueryBuilder.select(Schema.C_ATTRS_ATTR, Schema.C_ATTRS_VALUE).from(Schema.T_ATTRS); select.where(eq(Schema.C_ATTRS_CONTEXT, bindMarker(Schema.C_ATTRS_CONTEXT))) .and( eq(Schema.C_ATTRS_RESOURCE, bindMarker(Schema.C_ATTRS_RESOURCE))); m_selectAttributesStatement = m_session.prepare(select.toString()); select = QueryBuilder.select(Schema.C_METRICS_NAME).from(Schema.T_METRICS); select.where(eq(Schema.C_METRICS_CONTEXT, bindMarker(Schema.C_METRICS_CONTEXT))) .and( eq(Schema.C_METRICS_RESOURCE, bindMarker(Schema.C_METRICS_RESOURCE))); m_selectMetricNamesStatement = m_session.prepare(select.toString()); }
@Override public Collection<Badge> findByEvent(String event) { Map<UUID, Badge> badges = new HashMap<>(); Select.Where select = QueryBuilder.select().distinct().column("badge_id") .from("badge_events") .where(QueryBuilder.eq("event", event)); ResultSet set = session.execute(select); for(Row row:set.all()) { UUID badgeId = row.getUUID("badge_id"); if(badges.get(badgeId) == null) { badges.put(badgeId, get(badgeId).get()); } } return badges.values(); }
@Override public List<Goal> findGoalsByBadgeId(UUID badgeId) { Select.Where goalQuery = QueryBuilder.select("event", "count") .from("badge_events") .where(QueryBuilder.eq("badge_id", badgeId)); ResultSet rows = session.execute(goalQuery); List<Goal> goals = new ArrayList<>(); for (Row row : rows) { goals.add(new Goal.Builder() .event(row.getString("event")) .value(row.getInt("count")) .build()); } return goals; }
@Override public Optional<Badge> get(UUID id) { Select.Where badgeQuery = QueryBuilder .select() .all() .from("badges") .where(QueryBuilder.eq("id", id)); ResultSet set = session.execute(badgeQuery); Row row = set.one(); return Optional.of(new Badge.Builder().id(row.getUUID("id")) .retired(row.getBool("retired")) .description(row.getString("description")) .name(row.getString("name")) .goals(findGoalsByBadgeId(id)) .url(row.getString("url")) .build()); }
@Override public boolean evaluate(long playerId, Badge badge) { if(hasEarned(playerId, badge.getId())) return false; int goalsAchieved = 0; for (Goal goal : badge.getGoals()) { Select.Where eval = QueryBuilder.select().all() .from("player_event_counts") .where(QueryBuilder.eq("player_id", playerId)) .and(QueryBuilder.eq("event", goal.getEvent())); ResultSet resultSet = session.execute(eval); Row row = resultSet.one(); if(row != null) { long count = row.getLong("counter_value"); if(count >= goal.getValue() ) { goalsAchieved++; } } } return goalsAchieved == badge.getGoals().size(); }
@Override public CassandraEndpointUserConfiguration findByUserIdAndAppTokenAndSchemaVersion( String userId, String appToken, Integer schemaVersion ) { LOG.debug("Searching for user specific configuration by user id {}, " + "application token {} and schema version {}", userId, appToken, schemaVersion); Select.Where select = select().from(getColumnFamilyName()) .where(eq(EP_USER_CONF_USER_ID_PROPERTY, userId)) .and(eq(EP_USER_CONF_APP_TOKEN_PROPERTY, appToken)) .and(eq(EP_USER_CONF_VERSION_PROPERTY, schemaVersion)); CassandraEndpointUserConfiguration userConfiguration = findOneByStatement(select); if (LOG.isTraceEnabled()) { LOG.debug("[{},{},{}] Search result: {}.", userId, appToken, schemaVersion, userConfiguration); } else { LOG.debug("[{},{},{}] Search result: {}.", userId, appToken, schemaVersion, userConfiguration != null); } return userConfiguration; }
private static String processKeys(String[] columnNames, BuiltStatement delete) { BuiltStatement query = null; boolean isWhereNeeded = true; for (String columnName : columnNames) { if (isWhereNeeded) { if (delete instanceof Delete) { query = ((Delete) delete).where(QueryBuilder.eq(columnName, "?")); } else { query = ((Select) delete).where(QueryBuilder.eq(columnName, "?")); } isWhereNeeded = false; } else { if (delete instanceof Delete) { query = ((Delete.Where) query).and(QueryBuilder.eq(columnName, "?")); } else { query = ((Select.Where) query).and(QueryBuilder.eq(columnName, "?")); } } } return query != null ? query.getQueryString() : null; }
private void createPreparedStatements() { Select.Where select = select() .all() .from( CassandraModel.CF_METRICS_METADATA_NAME ) .where( eq( KEY, bindMarker() )); getValue = DatastaxIO.getSession().prepare( select ); Insert insert = insertInto( CassandraModel.CF_METRICS_METADATA_NAME ) .value( KEY, bindMarker() ) .value( COLUMN1, bindMarker() ) .value( VALUE, bindMarker() ); putValue = DatastaxIO.getSession().prepare( insert ); putValue.setConsistencyLevel( ConsistencyLevel.LOCAL_ONE ); }
/** * Create all prepared statements use in this class for metrics_locator */ private void createPreparedStatements() { // create a generic select statement for retrieving from metrics_locator Select.Where select = QueryBuilder .select() .all() .from( CassandraModel.CF_METRICS_LOCATOR_NAME ) .where( eq ( KEY, bindMarker() )); getValue = DatastaxIO.getSession().prepare( select ); // create a generic insert statement for inserting into metrics_locator Insert insert = QueryBuilder.insertInto( CassandraModel.CF_METRICS_LOCATOR_NAME) .using(ttl(TenantTtlProvider.LOCATOR_TTL)) .value(KEY, bindMarker()) .value(COLUMN1, bindMarker()) .value(VALUE, bindMarker()); putValue = DatastaxIO.getSession() .prepare(insert) .setConsistencyLevel( ConsistencyLevel.LOCAL_ONE ); }
public DDelayedLocatorIO() { // create a generic select statement for retrieving from metrics_delayed_locator Select.Where select = QueryBuilder .select() .all() .from( CassandraModel.CF_METRICS_DELAYED_LOCATOR_NAME ) .where( eq ( KEY, bindMarker() )); getValue = DatastaxIO.getSession().prepare( select ); // create a generic insert statement for inserting into metrics_delayed_locator Insert insert = QueryBuilder.insertInto( CassandraModel.CF_METRICS_DELAYED_LOCATOR_NAME) .using(ttl(TenantTtlProvider.DELAYED_LOCATOR_TTL)) .value(KEY, bindMarker()) .value(COLUMN1, bindMarker()) .value(VALUE, bindMarker()); putValue = DatastaxIO.getSession() .prepare(insert) .setConsistencyLevel( ConsistencyLevel.LOCAL_ONE ); }
private E queryEntityInternal(String entityId, Select select) throws EventStoreException { List<Row> entityEventDatas = cassandraSession.execute(select, PagingIterable::all); E initialInstance, result = null; try { initialInstance = entityType.newInstance(); } catch (InstantiationException | IllegalAccessException e) { log.error(e.getMessage(), e); throw new EventStoreException(e); } for (Row entityEventData : entityEventDatas) { EntityEvent entityEvent = convertToEntityEvent(entityEventData); if (entityEvent.getStatus() == EventState.CREATED || entityEvent.getStatus() == EventState.SUCCEDEED) { EntityFunctionSpec<E, ?> functionSpec = functionMap.get(entityEvent.getEventType()); if (functionSpec != null) { EntityEventWrapper eventWrapper = new EntityEventWrapper<>(functionSpec.getQueryType(), objectMapper, entityEvent); EntityFunction<E, ?> entityFunction = functionSpec.getEntityFunction(); result = (E) entityFunction.apply(result == null ? initialInstance : result, eventWrapper); } else log.trace("Function Spec is not available for " + entityEvent.getEventType() + " EntityId:" + entityId + " Table:" + tableName); } if (result != null) { result.setId(entityId); result.setVersion(entityEvent.getEventKey().getVersion()); } } return (result == null || result.getId() == null) ? null : result; }
@Override public List<EntityEvent> queryHistory(String entityId) throws EventStoreException { Select select = QueryBuilder.select().from(tableName); select.where(QueryBuilder.eq(CassandraEventRecorder.ENTITY_ID, entityId)); return cassandraSession.execute(select, PagingIterable::all) .stream().map(CassandraViewQuery::convertToEntityEvent).collect(Collectors.toList()); }
@Override public E queryEntity(String entityId, int version) throws EventStoreException { Select select = QueryBuilder.select().from(tableName); select.where(QueryBuilder.eq(CassandraEventRecorder.ENTITY_ID, entityId)); select.where(QueryBuilder.lte(CassandraEventRecorder.VERSION, version)); return queryEntityInternal(entityId, select); }
/** * Get entity id by entity key. * * @param key the entity key * @param tenant tenant name * @return entity id */ public Long getIdByKey(String key, String tenant) { Select select = QueryBuilder.select(ENTITY_ID_COL).from(tenant, TABLE_KEY_ID); select.where(eq(ENTITY_KEY_COL, key)); ResultSet resultSet = session.execute(select); Row row = resultSet.one(); return row == null ? null : row.getLong(ENTITY_ID_COL); }
/** * Get entity key by entity id. * * @param entityId entity id * @param tenant tenant name * @return entity key */ public String getKeyById(Long entityId, String tenant) { Select select = QueryBuilder.select(ENTITY_KEY_COL).from(tenant, VIEW_ID_KEY); select.where(eq(ENTITY_ID_COL, entityId)); ResultSet resultSet = session.execute(select); Row row = resultSet.one(); return row == null ? null : row.getString(ENTITY_KEY_COL); }
/** * Get timelines by user key and date. * * @param userKey the user key * @param dateFrom the date from * @param dateTo the date to * @param page the next page code * @param limit the limit per page * @return timeline page with list of timelines and next page code */ public TimelinePageVM getTimelinesByUserKeyAndDate(String userKey, Instant dateFrom, Instant dateTo, String page, int limit) { Select select = QueryBuilder.select(getFields()).from(TenantContext.getCurrent().getTenant(), TABLE_TIMELINE_BY_USER_AND_DATE); select.where(eq(USER_KEY_COL, userKey)); prepareWhereClause(select, null, dateFrom, dateTo, limit); return getPage(select, page, limit); }
/** * Get timelines by user key, operation and date. * * @param userKey the user key * @param operation the operation * @param dateFrom the date from * @param dateTo the date to * @param page the next page code * @param limit the limit per page * @return timeline page with list of timelines and next page code */ public TimelinePageVM getTimelinesByUserKeyAndOpAndDate(String userKey, String operation, Instant dateFrom, Instant dateTo, String page, int limit) { Select select = QueryBuilder.select(getFields()).from(TenantContext.getCurrent().getTenant(), TABLE_TIMELINE_BY_USER_AND_OP_AND_DATE); select.where(eq(USER_KEY_COL, userKey)); prepareWhereClause(select, operation, dateFrom, dateTo, limit); return getPage(select, page, limit); }