/** * {@inheritDoc} */ @Override public TableInfo get(@Nonnull @NonNull final ConnectorRequestContext context, @Nonnull @NonNull final QualifiedName name) { final String keyspace = name.getDatabaseName(); final String table = name.getTableName(); log.debug("Attempting to get metadata for Cassandra table {}.{} for request {}", keyspace, table, context); try { final KeyspaceMetadata keyspaceMetadata = this.getCluster().getMetadata().getKeyspace(keyspace); if (keyspaceMetadata == null) { throw new DatabaseNotFoundException(name); } final TableMetadata tableMetadata = keyspaceMetadata.getTable(table); if (tableMetadata == null) { throw new TableNotFoundException(name); } final TableInfo tableInfo = this.getTableInfo(name, tableMetadata); log.debug("Successfully got metadata for Cassandra table {}.{} for request {}", keyspace, table, context); return tableInfo; } catch (final DriverException de) { log.error(de.getMessage(), de); throw this.getExceptionMapper().toConnectorException(de, name); } }
static RuntimeException propagateCause(ExecutionException e) { Throwable cause = e.getCause(); if (cause instanceof Error) throw ((Error) cause); // We could just rethrow e.getCause(). However, the cause of the ExecutionException has likely been // created on the I/O thread receiving the response. Which means that the stacktrace associated // with said cause will make no mention of the current thread. This is painful for say, finding // out which execute() statement actually raised the exception. So instead, we re-create the // exception. if (cause instanceof DriverException) throw ((DriverException) cause).copy(); else throw new DriverInternalError("Unexpected exception thrown", cause); }
/** * {@inheritDoc} */ @Override public void create( @Nonnull @NonNull final ConnectorRequestContext context, @Nonnull @NonNull final DatabaseInfo resource ) { final String keyspace = resource.getName().getDatabaseName(); log.debug("Attempting to create a Cassandra Keyspace named {} for request {}", keyspace, context); try { // TODO: Make this take parameters for replication and the class this.executeQuery( "CREATE KEYSPACE " + keyspace + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};" ); log.debug("Successfully created Cassandra Keyspace named {} for request {}", keyspace, context); } catch (final DriverException de) { log.error(de.getMessage(), de); throw this.getExceptionMapper().toConnectorException(de, resource.getName()); } }
/** * {@inheritDoc} */ @Override public DatabaseInfo get( @Nonnull @NonNull final ConnectorRequestContext context, @Nonnull @NonNull final QualifiedName name ) { final String keyspace = name.getDatabaseName(); log.debug("Attempting to get keyspace metadata for keyspace {} for request {}", keyspace, context); try { final KeyspaceMetadata keyspaceMetadata = this.getCluster().getMetadata().getKeyspace(keyspace); if (keyspaceMetadata == null) { throw new DatabaseNotFoundException(name); } log.debug("Successfully found the keyspace metadata for {} for request {}", name, context); return DatabaseInfo.builder().name(name).build(); } catch (final DriverException de) { log.error(de.getMessage(), de); throw this.getExceptionMapper().toConnectorException(de, name); } }
/** * Convert the given Cassandra driver exception to a corresponding ConnectorException if possible, otherwise * return a generic ConnectorException. * * @param de The Cassandra driver exception * @param name The fully qualified name of the resource which was attempting to be accessed or modified at time of * error * @return A connector exception wrapping the DriverException */ public ConnectorException toConnectorException( @Nonnull @NonNull final DriverException de, @Nonnull @NonNull final QualifiedName name ) { if (de instanceof AlreadyExistsException) { final AlreadyExistsException ae = (AlreadyExistsException) de; if (ae.wasTableCreation()) { return new TableAlreadyExistsException(name, ae); } else { return new DatabaseAlreadyExistsException(name, ae); } } else { return new ConnectorException(de.getMessage(), de); } }
/** * Add a single Permission to the Role's Permission Collection * * @param trans * @param role * @param perm * @param type * @param action * @return */ public Result<Void> addPerm(AuthzTrans trans, RoleDAO.Data role, PermDAO.Data perm) { // Note: Prepared Statements for Collection updates aren't supported String pencode = perm.encode(); try { getSession(trans).execute(UPDATE_SP + TABLE + " SET perms = perms + {'" + pencode + "'} WHERE " + "ns = '" + role.ns + "' AND name = '" + role.name + "';"); } catch (DriverException | APIException | IOException e) { reportPerhapsReset(trans,e); return Result.err(Result.ERR_Backend, CassAccess.ERR_ACCESS_MSG); } wasModified(trans, CRUD.update, role, "Added permission " + pencode + " to role " + role.fullName()); return Result.ok(); }
/** * Remove a single Permission from the Role's Permission Collection * @param trans * @param role * @param perm * @param type * @param action * @return */ public Result<Void> delPerm(AuthzTrans trans, RoleDAO.Data role, PermDAO.Data perm) { // Note: Prepared Statements for Collection updates aren't supported String pencode = perm.encode(); //ResultSet rv = try { getSession(trans).execute(UPDATE_SP + TABLE + " SET perms = perms - {'" + pencode + "'} WHERE " + "ns = '" + role.ns + "' AND name = '" + role.name + "';"); } catch (DriverException | APIException | IOException e) { reportPerhapsReset(trans,e); return Result.err(Result.ERR_Backend, CassAccess.ERR_ACCESS_MSG); } //TODO how can we tell when it doesn't? wasModified(trans, CRUD.update, role, "Removed permission " + pencode + " from role " + role.fullName() ); return Result.ok(); }
/** * Add description to role * * @param trans * @param ns * @param name * @param description * @return */ public Result<Void> addDescription(AuthzTrans trans, String ns, String name, String description) { try { getSession(trans).execute(UPDATE_SP + TABLE + " SET description = '" + description + "' WHERE ns = '" + ns + "' AND name = '" + name + "';"); } catch (DriverException | APIException | IOException e) { reportPerhapsReset(trans,e); return Result.err(Result.ERR_Backend, CassAccess.ERR_ACCESS_MSG); } Data data = new Data(); data.ns=ns; data.name=name; wasModified(trans, CRUD.update, data, "Added description " + description + " to role " + data.fullName(), null ); return Result.ok(); }
@Override public Result<Void> delete(AuthzTrans trans, Data data, boolean reread) { TimeTaken tt = trans.start("Delete NS Attributes " + data.name, Env.REMOTE); try { StringBuilder stmt = new StringBuilder(); attribDeleteAllStmt(stmt, data); try { getSession(trans).execute(stmt.toString()); } catch (DriverException | APIException | IOException e) { reportPerhapsReset(trans,e); trans.info().log(stmt); return Result.err(Result.ERR_Backend, CassAccess.ERR_ACCESS_MSG); } } finally { tt.done(); } return super.delete(trans, data, reread); }
public Result<Map<String,String>> readAttribByNS(AuthzTrans trans, String ns) { Map<String,String> map = new HashMap<String,String>(); TimeTaken tt = trans.start("readAttribByNS " + ns, Env.REMOTE); try { ResultSet rs = getSession(trans).execute("SELECT key,value FROM " + TABLE_ATTRIB + " WHERE ns='" + ns + "';"); for(Iterator<Row> iter = rs.iterator();iter.hasNext(); ) { Row r = iter.next(); map.put(r.getString(0), r.getString(1)); } } catch (DriverException | APIException | IOException e) { reportPerhapsReset(trans,e); return Result.err(Result.ERR_Backend, CassAccess.ERR_ACCESS_MSG); } finally { tt.done(); } return Result.ok(map); }
public Result<Set<String>> readNsByAttrib(AuthzTrans trans, String key) { Set<String> set = new HashSet<String>(); TimeTaken tt = trans.start("readNsBykey " + key, Env.REMOTE); try { ResultSet rs = getSession(trans).execute("SELECT ns FROM " + TABLE_ATTRIB + " WHERE key='" + key + "';"); for(Iterator<Row> iter = rs.iterator();iter.hasNext(); ) { Row r = iter.next(); set.add(r.getString(0)); } } catch (DriverException | APIException | IOException e) { reportPerhapsReset(trans,e); return Result.err(Result.ERR_Backend, CassAccess.ERR_ACCESS_MSG); } finally { tt.done(); } return Result.ok(set); }
/** * Add a single Permission to the Role's Permission Collection * * @param trans * @param roleFullName * @param perm * @param type * @param action * @return */ public Result<Void> addRole(AuthzTrans trans, PermDAO.Data perm, String roleFullName) { // Note: Prepared Statements for Collection updates aren't supported //ResultSet rv = try { getSession(trans).execute(UPDATE_SP + TABLE + " SET roles = roles + {'" + roleFullName + "'} " + "WHERE " + "ns = '" + perm.ns + "' AND " + "type = '" + perm.type + "' AND " + "instance = '" + perm.instance + "' AND " + "action = '" + perm.action + "';" ); } catch (DriverException | APIException | IOException e) { reportPerhapsReset(trans,e); return Result.err(Result.ERR_Backend, CassAccess.ERR_ACCESS_MSG); } wasModified(trans, CRUD.update, perm, "Added role " + roleFullName + " to perm " + perm.ns + '.' + perm.type + '|' + perm.instance + '|' + perm.action); return Result.ok(); }
/** * Remove a single Permission from the Role's Permission Collection * @param trans * @param roleFullName * @param perm * @param type * @param action * @return */ public Result<Void> delRole(AuthzTrans trans, PermDAO.Data perm, String roleFullName) { // Note: Prepared Statements for Collection updates aren't supported //ResultSet rv = try { getSession(trans).execute(UPDATE_SP + TABLE + " SET roles = roles - {'" + roleFullName + "'} " + "WHERE " + "ns = '" + perm.ns + "' AND " + "type = '" + perm.type + "' AND " + "instance = '" + perm.instance + "' AND " + "action = '" + perm.action + "';" ); } catch (DriverException | APIException | IOException e) { reportPerhapsReset(trans,e); return Result.err(Result.ERR_Backend, CassAccess.ERR_ACCESS_MSG); } //TODO how can we tell when it doesn't? wasModified(trans, CRUD.update, perm, "Removed role " + roleFullName + " from perm " + perm.ns + '.' + perm.type + '|' + perm.instance + '|' + perm.action); return Result.ok(); }
/** * Add description to this permission * * @param trans * @param ns * @param type * @param instance * @param action * @param description * @return */ public Result<Void> addDescription(AuthzTrans trans, String ns, String type, String instance, String action, String description) { try { getSession(trans).execute(UPDATE_SP + TABLE + " SET description = '" + description + "' WHERE ns = '" + ns + "' AND type = '" + type + "'" + "AND instance = '" + instance + "' AND action = '" + action + "';"); } catch (DriverException | APIException | IOException e) { reportPerhapsReset(trans,e); return Result.err(Result.ERR_Backend, CassAccess.ERR_ACCESS_MSG); } Data data = new Data(); data.ns=ns; data.type=type; data.instance=instance; data.action=action; wasModified(trans, CRUD.update, data, "Added description " + description + " to permission " + data.encode(), null ); return Result.ok(); }
private BoundStatement ps(TransStore trans) throws APIException, IOException { if(ps==null) { synchronized(this) { if(ps==null) { TimeTaken tt = trans.start("Preparing PSInfo " + crud.toString().toUpperCase() + " on " + name,Env.SUB); try { ps = new BoundStatement(getSession(trans).prepare(cql)); ps.setConsistencyLevel(consistency); } catch (DriverException e) { reportPerhapsReset(trans,e); throw e; } finally { tt.done(); } } } } return ps; }
/** * Read the Data from Cassandra given a Prepared Statement (defined by the * DAO Instance) * * This is common behavior among all DAOs. * @throws DAOException */ public Result<List<DATA>> read(TRANS trans, String text, Object[] key) { TimeTaken tt = trans.start(text,Env.REMOTE); ResultSet rs; try { rs = getSession(trans).execute(key==null?ps(trans):ps(trans).bind(key)); /// TEST CODE for Exception // boolean force = true; // if(force) { // Map<InetSocketAddress, Throwable> misa = new HashMap<InetSocketAddress,Throwable>(); // //misa.put(new InetSocketAddress(444),new Exception("no host was tried")); // misa.put(new InetSocketAddress(444),new Exception("Connection has been closed")); // throw new com.datastax.driver.core.exceptions.NoHostAvailableException(misa); //// throw new com.datastax.driver.core.exceptions.AuthenticationException(new InetSocketAddress(9999),"no host was tried"); // } //// END TEST CODE } catch (DriverException | APIException | IOException e) { AbsCassDAO.this.reportPerhapsReset(trans,e); return Result.err(Status.ERR_Backend,"%s-%s executing %s",e.getClass().getName(),e.getMessage(), cql); } finally { tt.done(); } return extract(loader,rs,null /*let Array be created if necessary*/,dflt); }
/** * Creates a cluster object. */ public void buildCluster() { try { if (protocolVersion != null && protocolVersion.length() != 0) { ProtocolVersion version = getCassandraProtocolVersion(); cluster = Cluster.builder().addContactPoint(node).withCredentials(userName, password).withProtocolVersion(version).build(); } else { cluster = Cluster.builder().addContactPoint(node).withCredentials(userName, password).build(); } } catch (DriverException ex) { throw new RuntimeException("closing database resource", ex); } catch (Throwable t) { DTThrowable.rethrow(t); } }
/** * Create connection with database. */ @Override public void connect() { try { if (cluster == null) { buildCluster(); } session = cluster.connect(); logger.debug("Cassandra connection Success"); } catch (DriverException ex) { throw new RuntimeException("closing database resource", ex); } catch (Throwable t) { DTThrowable.rethrow(t); } }
@Override public long getCommittedWindowId(String appId, int operatorId) { try { BoundStatement boundStatement = new BoundStatement(lastWindowFetchCommand); lastWindowFetchStatement = boundStatement.bind(appId,operatorId); long lastWindow = -1; ResultSet resultSet = session.execute(lastWindowFetchStatement); if (!resultSet.isExhausted()) { lastWindow = resultSet.one().getLong(0); } lastWindowFetchCommand.disableTracing(); return lastWindow; } catch (DriverException ex) { throw new RuntimeException(ex); } }
public void insertEventsInTable(int numEvents) { try { Cluster cluster = Cluster.builder().addContactPoint(NODE).build(); Session session = cluster.connect(KEYSPACE); String insert = "INSERT INTO " + TABLE_NAME_INPUT + " (ID,lastname,age)" + " VALUES (?,?,?);"; PreparedStatement stmt = session.prepare(insert); BoundStatement boundStatement = new BoundStatement(stmt); for (int i = 0; i < numEvents; i++) { ids.add(i); mapNames.put(i, "test" + i); mapAge.put(i, i + 10); session.execute(boundStatement.bind(i, "test" + i, i + 10)); } } catch (DriverException e) { throw new RuntimeException(e); } }
/** * Run cql statements on this embedded cassandra instance. * @throws EmbeddedCassandraException if the cql statements can't be run */ public void runCQL(final Session session, final String cql) { checkState(running.get(), "not running"); for (String statement : Splitter.on(';').omitEmptyStrings().trimResults().split(cql)) { int retries = CQL_RETRIES; while (retries-- > 0) { try { session.execute(statement + ';'); break; } catch (DriverException e) { if (retries > 0) { LOG.warn("Cql error ({}). {} retries left", e.getMessage(), retries); } else { LOG.error("Cql error", e); throw new EmbeddedCassandraException("Can't run cql statement: " + statement, e); } } } } }
protected void checkCassandraException(Exception e) { _mexceptions.incr(); if (e instanceof AlreadyExistsException || e instanceof AuthenticationException || e instanceof DriverException || e instanceof DriverInternalError || e instanceof InvalidConfigurationInQueryException || e instanceof InvalidQueryException || e instanceof InvalidTypeException || e instanceof QueryExecutionException || e instanceof QueryTimeoutException || e instanceof QueryValidationException || e instanceof ReadTimeoutException || e instanceof SyntaxError || e instanceof TraceRetrievalException || e instanceof TruncateException || e instanceof UnauthorizedException || e instanceof UnavailableException || e instanceof ReadTimeoutException || e instanceof WriteTimeoutException) { throw new ReportedFailedException(e); } else { throw new RuntimeException(e); } }
@Override public void commit(Long txid) { DriverException lastException = null; // Read current value. //if we failed to apply the update , maybe the state has change already , we need to calculate the new state and apply it again for (Map.Entry<K, V> entry : aggregateValues.entrySet()) { int attempts = 0; boolean applied = false; while (!applied && attempts < maxAttempts) { try{ applied = updateState(entry, txid); } catch(QueryExecutionException e) { lastException = e; LOG.warn("Catching {} attempt {}"+txid+"-"+partitionIndex, e.getMessage(), attempts); } attempts++; } if(!applied) { if(lastException != null) { throw new CassandraCqlIncrementalStateException("Ran out of attempts ["+attempts+"] max of ["+maxAttempts+"] "+txid+"-"+ partitionIndex, lastException); } else { throw new CassandraCqlIncrementalStateException("Ran out of attempts ["+attempts+"] max of ["+maxAttempts+"] "+txid+"-"+ partitionIndex); } } } }
static void load(SessionContext sessionContext, List<String> cqlStatements) { if (!cqlStatements.isEmpty()) { sessionContext.checkClusterHealth(); } try { cqlStatements.stream() .map(stringStatement -> new SimpleStatement(stringStatement).setConsistencyLevel(sessionContext.getWriteConsistencyLevel())) .forEach(statement -> { LOGGER.debug("Executing cql statement {}", statement); sessionContext.getSession().execute(statement); }); } catch (DriverException e) { LOGGER.error("Failed to execute cql statements {}: {}", cqlStatements, e.getMessage()); throw e; } }
/** * {@inheritDoc} * <p> * Returns true if successfully inserted lock. * Returns false if current lock is owned by this client. * Returns false if WriteTimeoutException thrown. * * @throws CannotAcquireLockException if any DriverException thrown while executing queries. */ @Override public boolean acquire(String clientId) throws CannotAcquireLockException { try { ResultSet resultSet = session.execute(insertLockQuery.bind(lockName, clientId)); Row currentLock = resultSet.one(); // we could already hold the lock and not be aware if a previous acquire had a writetimeout as a timeout is not a failure in cassandra if (currentLock.getBool("[applied]") || clientId.equals(currentLock.getString("client"))) { return true; } else { log.info("Lock currently held by {}", currentLock); return false; } } catch (WriteTimeoutException wte) { log.warn("Query to acquire lock for {} failed to execute: {}", clientId, wte.getMessage()); return false; } catch (DriverException de) { throw new CannotAcquireLockException(String.format("Query to acquire lock %s for client %s failed to execute", lockName, clientId), de); } }
@Test public void shouldThrowExceptionIfQueryFailsToExecuteWhenAcquiringLock() throws Exception { //given primingClient.prime(PrimingRequest.preparedStatementBuilder() .withQuery("INSERT INTO cqlmigrate.locks (name, client) VALUES (?, ?) IF NOT EXISTS") .withThen(then() .withResult(Result.unavailable)) .build() ); //when Throwable throwable = catchThrowable(() -> lockingMechanism.acquire(CLIENT_ID)); //then assertThat(throwable) .isNotNull() .isInstanceOf(CannotAcquireLockException.class) .hasCauseInstanceOf(DriverException.class) .hasMessage(String.format("Query to acquire lock %s.schema_migration for client %s failed to execute", LOCK_KEYSPACE, CLIENT_ID)); }
@Test public void shouldThrowExceptionIfQueryFailsToExecuteWhenReleasingLock() throws Exception { //given primingClient.prime(PrimingRequest.preparedStatementBuilder() .withQuery("DELETE FROM cqlmigrate.locks WHERE name = ? IF client = ?") .withThen(then() .withResult(Result.unavailable)) .build() ); //when Throwable throwable = catchThrowable(() -> lockingMechanism.release(CLIENT_ID)); //then assertThat(throwable) .isNotNull() .isInstanceOf(CannotReleaseLockException.class) .hasCauseInstanceOf(DriverException.class) .hasMessage("Query failed to execute"); }
public boolean updateUser(User user) { Session session = cassandraSupport.getSession(); PreparedStatement statement = session.prepare( "update user set firstname = ?, lastname = ?, email = ?, password = ?, reputation = ? where login = ?;" ); BoundStatement boundStatement = new BoundStatement(statement); try { session.execute(boundStatement.bind(user.getFirstName(), user.getLastName(), user.getEmail(), md5(user.getPassword()), user.getReputation(), user.getLogin())); return true; } catch (DriverException e) { Logger.error("unable update user", e); } return false; }
@Override public void run() { pendingRequestCounter.decrementAndGet(); try { future.getUninterruptibly(); } catch (DriverException e) { cassandraErrorCount.increment(); if (event != null) { if (event.get(JetstreamReservedKeys.MessageAffinityKey .toString()) == null) { event.put(JetstreamReservedKeys.MessageAffinityKey .toString(), (String) event .get(MCConstant.METRIC_NAME)); } getAdviceListener().retry(event, RetryEventCode.MSG_RETRY, e.getMessage()); eventSentToAdviceListener.increment(); } registerError(e); } }
@Override public void close() { if (session != null && !session.isClosed()) { if (schemaOption.shouldDropSchema()) { try { session.execute(String.format("DROP KEYSPACE %s; ", keySpace)); LOG.info("Dropped keyspace {}", keySpace); } catch (DriverException e) { LOG.error("Failed to drop keyspace : {}. err={}", keySpace, e); } } session.close(); session = null; } this.connectionContext.close(this); }
private void retryQuery(String id, Statement query, final long startTime, int retryCount, DriverException e) throws DriverException { if (retryCount >= maxWriteRetries) { logger.error("[{}]: Query aborted after {} retry: ", id, retryCount, e.getMessage()); metricFailed.inc(((BatchStatement) query).size()); commitTimer.update(System.nanoTime() - startTime, TimeUnit.NANOSECONDS); throw e; } else { logger.warn("[{}]: Query failed, retrying {} of {}: {} ", id, retryCount, maxWriteRetries, e.getMessage()); try { Thread.sleep(1000 * (1 << retryCount)); } catch (InterruptedException ie) { logger.debug("[{}]: Interrupted: {}", id, ie); } _executeQuery(id, query, startTime, retryCount++); } }
private <T> Observable.Transformer<T, T> applyInsertRetryPolicy() { return tObservable -> tObservable .retryWhen(errors -> { Observable<Integer> range = Observable.range(1, 2); return errors .zipWith(range, (t, i) -> { if (t instanceof DriverException) { return i; } throw Exceptions.propagate(t); }) .flatMap(retryCount -> { long delay = (long) Math.min(Math.pow(2, retryCount) * 1000, 3000); log.debug("Retrying batch insert in " + delay + " ms"); return Observable.timer(delay, TimeUnit.MILLISECONDS); }); }); }
private <T> Observable.Transformer<T, T> applyRetryPolicy() { return tObservable -> tObservable .retryWhen(observable -> { Observable<Integer> range = Observable.range(1, Integer.MAX_VALUE); Observable<Observable<?>> zipWith = observable.zipWith(range, (t, i) -> { log.debug("Attempt #" + i + " to retry the operation after Cassandra client" + " exception"); if (t instanceof DriverException) { return Observable.timer(i, TimeUnit.SECONDS).onBackpressureDrop(); } else { return Observable.error(t); } }); return Observable.merge(zipWith); }) .doOnError(t -> log.error("Failure while trying to apply compression, skipping block", t)) .onErrorResumeNext(Observable.empty()); }
/** * {@inheritDoc} */ public List<String> getSubscriptionIDs(String queryName) throws NoSuchNameExceptionResponse, SecurityExceptionResponse, ValidationExceptionResponse, ImplementationExceptionResponse { try { LOG.info("Invoking 'getSubscriptionIDs'"); Session session = null; try { session = cassandraResource.createOrGetSession(); // TODO: filter by queryName?! Map<String, QuerySubscriptionScheduled> subscribedMap = loadSubscriptions(session); Set<String> temp = subscribedMap.keySet(); return new ArrayList<String>(temp); } finally { LOG.debug("DB connection closed"); } } catch (DriverException e) { ImplementationException iex = new ImplementationException(); String msg = "SQL error during query execution: " + e.getMessage(); LOG.error(msg, e); iex.setReason(msg); iex.setSeverity(ImplementationExceptionSeverity.ERROR); throw new ImplementationExceptionResponse(msg, iex, e); } }
/** * Creates a new {@link CassandraEntityStoreFactory} instance. * * @param host * a Cassandra cluster host to connect to * @param port * port number for Cassandra's native protocol * @param keyspace * application keyspace * @param consistency * default consistency level */ @Inject public CassandraEntityStoreFactory(@Named("cassandraHost") String host, @Named("cassandraPort") int port, @Named("cassandraKeyspace") String keyspace, @Named("cassandraConsistency") ConsistencyLevel consistency) { checkNotNull(host, "Cassandra hostname"); checkNotNull(port, "Cassandra port number"); checkNotNull(keyspace, "Cassandra keyspace"); checkNotNull(consistency, "Cassandra consistency level"); m_consistency = consistency; Cluster cluster = Cluster.builder().withPort(port).addContactPoint(host).build(); try { m_session = cluster.connect(keyspace); } catch (DriverException e) { throw new LucidityException(e); } }
/** * {@inheritDoc} */ @Override public void delete(@Nonnull @NonNull final ConnectorRequestContext context, @Nonnull @NonNull final QualifiedName name) { final String keyspace = name.getDatabaseName(); final String table = name.getTableName(); log.debug("Attempting to delete Cassandra table {}.{} for request {}", keyspace, table, context); try { this.executeQuery("USE " + keyspace + "; DROP TABLE IF EXISTS " + table + ";"); log.debug("Successfully deleted Cassandra table {}.{} for request {}", keyspace, table, context); } catch (final DriverException de) { log.error(de.getMessage(), de); throw this.getExceptionMapper().toConnectorException(de, name); } }
/** * {@inheritDoc} */ @Override public void delete(@Nonnull @NonNull final ConnectorRequestContext context, @Nonnull @NonNull final QualifiedName name) { final String keyspace = name.getDatabaseName(); log.debug("Attempting to drop Cassandra keyspace {} for request {}", keyspace, context); try { this.executeQuery("DROP KEYSPACE IF EXISTS " + keyspace + ";"); log.debug("Successfully dropped {} keyspace", keyspace); } catch (final DriverException de) { log.error(de.getMessage(), de); throw this.getExceptionMapper().toConnectorException(de, name); } }
/** * {@inheritDoc} */ @Override public List<QualifiedName> listViewNames( @Nonnull @NonNull final ConnectorRequestContext context, @Nonnull @NonNull final QualifiedName databaseName ) { final String catalogName = databaseName.getCatalogName(); final String keyspace = databaseName.getDatabaseName(); log.debug("Attempting to get materialized view names for keyspace {} due to request {}", keyspace, context); try { final KeyspaceMetadata keyspaceMetadata = this.getCluster().getMetadata().getKeyspace(keyspace); if (keyspaceMetadata == null) { throw new DatabaseNotFoundException(databaseName); } final ImmutableList.Builder<QualifiedName> viewsBuilder = ImmutableList.builder(); for (final MaterializedViewMetadata view : keyspaceMetadata.getMaterializedViews()) { viewsBuilder.add( QualifiedName.ofView(catalogName, keyspace, view.getBaseTable().getName(), view.getName()) ); } final List<QualifiedName> views = viewsBuilder.build(); log.debug("Successfully found {} views for keyspace {} due to request {}", views.size(), keyspace, context); return views; } catch (final DriverException de) { log.error(de.getMessage(), de); throw this.getExceptionMapper().toConnectorException(de, databaseName); } }
/** * {@inheritDoc} */ @Override public List<QualifiedName> listNames( @Nonnull @NonNull final ConnectorRequestContext context, @Nonnull @NonNull final QualifiedName name, @Nullable final QualifiedName prefix, @Nullable final Sort sort, @Nullable final Pageable pageable ) { log.debug("Attempting to list keyspaces for request {}", context); try { final List<QualifiedName> names = Lists.newArrayList(); for (final KeyspaceMetadata keyspace : this.getCluster().getMetadata().getKeyspaces()) { final String keyspaceName = keyspace.getName(); if (prefix != null && !keyspaceName.startsWith(prefix.getDatabaseName())) { continue; } names.add(QualifiedName.ofDatabase(name.getCatalogName(), keyspaceName)); } if (sort != null) { // We can only really sort by the database name at this level so ignore SortBy field final Comparator<QualifiedName> comparator = Comparator.comparing(QualifiedName::getDatabaseName); ConnectorUtils.sort(names, sort, comparator); } final List<QualifiedName> results = ConnectorUtils.paginate(names, pageable); log.debug("Finished listing keyspaces for request {}", context); return results; } catch (final DriverException de) { log.error(de.getMessage(), de); throw this.getExceptionMapper().toConnectorException(de, name); } }
@Override public Status process() throws EventDeliveryException { Status status = Status.BACKOFF; Transaction txn = this.getChannel().getTransaction(); try { txn.begin(); List<Event> eventList = this.takeEventsFromChannel(this.getChannel(), this.batchSize); status = Status.READY; if (!eventList.isEmpty()) { if (eventList.size() == this.batchSize) { this.sinkCounter.incrementBatchCompleteCount(); } else { this.sinkCounter.incrementBatchUnderflowCount(); } for (final CassandraTable table : tables) { table.save(eventList); } this.sinkCounter.addToEventDrainSuccessCount(eventList.size()); } else { this.sinkCounter.incrementBatchEmptyCount(); } txn.commit(); status = Status.READY; } catch (Throwable t) { try { txn.rollback(); } catch (Exception e) { log.error("Exception in rollback. Rollback might not have been successful.", e); } log.error("Failed to commit transaction. Rolled back.", t); if (t instanceof DriverException || t instanceof IllegalArgumentException) { throw new EventDeliveryException("Failed to commit transaction. Rolled back.", t); } else { // (t instanceof Error || t instanceof RuntimeException) Throwables.propagate(t); } } finally { txn.close(); } return status; }