private void advanceToNonEmptyRow() { KeySlice row = m_cassandraRows.get(m_rowIndex); m_currentCols = row.getColumns(); int skipSize = 0; while (m_currentCols.size() == skipSize && m_rowIndex < m_cassandraRows.size() - 1) { m_rowIndex++; row = m_cassandraRows.get(m_rowIndex); m_currentCols = row.getColumns(); } if (m_currentCols.size() == skipSize) { // we've been through the batch and there are no columns in any of these // rows - // so nothing to output! Indicate this by setting currentCols to null m_currentCols = null; } }
/**
 * Fetches up to {@code count} row keys from the given store, beginning at the
 * row identified by {@code continuationToken}.
 *
 * @param storeName         name of the store (column family) to scan
 * @param continuationToken key of the row at which the range scan starts
 * @param count             maximum number of rows requested from the range scan
 * @return the keys of the returned rows, converted to strings, in the order
 *         delivered by the range query
 */
@Override
public List<String> getRows(String storeName, String continuationToken, int count) {
    DBConn dbConn = getDBConnection();
    try {
        // Only the row keys are used below; the predicate limits each row to one column.
        List<KeySlice> slices = dbConn.getRangeSlices(
                CassandraDefs.columnParent(storeName),
                CassandraDefs.slicePredicateStartEndCol(null, null, 1),
                CassandraDefs.keyRangeStartRow(Utils.toBytes(continuationToken), count));

        List<String> rowKeys = new ArrayList<>(slices.size());
        for (KeySlice slice : slices) {
            byte[] rawKey = slice.getKey();
            rowKeys.add(Utils.toString(rawKey));
        }
        return rowKeys;
    } finally {
        // Always hand the connection back, even when the query fails.
        returnDBConnection(dbConn);
    }
}
@Override protected Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>> computeNext() { maybeInit(); if (rows == null) { return endOfData(); } KeySlice ks = rows.get(0); SortedMap<ByteBuffer, IColumn> map = new TreeMap<ByteBuffer, IColumn>(comparator); for (ColumnOrSuperColumn cosc : ks.columns) { IColumn column = unthriftify(cosc); map.put(column.name(), column); } // return new Pair<ByteBuffer, SortedMap<ByteBuffer, IColumn>>(ks.key, map); return Pair.create(ks.key, map); }
/**
 * Return the decoded key value of a row. Assumes that the supplied row comes
 * from the column family that this meta data represents!!
 *
 * If the key validator names "BytesType" the raw key bytes are returned
 * as-is; otherwise the key is decoded via {@code getColumnValue} using the
 * key validator.
 *
 * @param row a Cassandra row
 * @return the decoded key value
 * @throws KettleException if a deserializer can't be determined
 */
public Object getKeyValue(KeySlice row) throws KettleException {
  // ">= 0" rather than "> 0": a validator given by its short name
  // ("BytesType") matches at index 0 and must be recognised as well.
  if (m_keyValidator.indexOf("BytesType") >= 0) {
    return row.getKey();
  }

  ByteBuffer key = row.bufferForKey();
  return getColumnValue(key, m_keyValidator);
}
/**
 * Converts a list of key slices into a list of column maps, one map per
 * non-empty row. Rows without columns are skipped.
 *
 * Each map holds the row key (decoded as UTF-8) under {@code primaryKeyName},
 * plus one entry per column, with column name and value decoded as UTF-8.
 * A value equal to {@code NULL_VALUE_STRING} is stored as {@code null}.
 *
 * @param keySliceList   rows to convert
 * @param primaryKeyName map key under which the row key is stored
 * @return the converted rows, in input order
 * @throws StorageException if UTF-8 decoding is reported as unsupported
 */
private List<Map<String,Object>> convertKeySliceList(List<KeySlice> keySliceList, String primaryKeyName) {
    List<Map<String,Object>> converted = new ArrayList<Map<String,Object>>();
    try {
        for (KeySlice slice : keySliceList) {
            List<ColumnOrSuperColumn> columns = slice.getColumns();
            if (columns.isEmpty()) {
                continue;
            }

            Map<String,Object> rowMap = new HashMap<String,Object>();
            rowMap.put(primaryKeyName, new String(slice.getKey(), "UTF-8"));

            for (ColumnOrSuperColumn wrapper : columns) {
                Column col = wrapper.getColumn();
                String name = new String(col.getName(), "UTF-8");
                String text = new String(col.getValue(), "UTF-8");
                rowMap.put(name, text.equals(NULL_VALUE_STRING) ? null : text);
            }
            converted.add(rowMap);
        }
        return converted;
    } catch (UnsupportedEncodingException exc) {
        throw new StorageException("Character encoding exception with key range", exc);
    }
}
/** * get * * @throws Exception */ @Test public void getByKey() throws Exception { String KEYSPACE = "mock"; client.set_keyspace(KEYSPACE); String COLUMN_FAMILY = "student"; // 讀取整筆 ColumnParent columnParent = new ColumnParent(COLUMN_FAMILY); // 術語 SlicePredicate predicate = new SlicePredicate(); // InvalidRequestException(why:predicate column_names and slice_range // may not both be present) // 範圍 // SliceRange sliceRange = new SliceRange(); // sliceRange.setStart(new byte[0]);// 開始 // sliceRange.setFinish(new byte[0]);// 結束 // predicate.setSlice_range(sliceRange); // 讀取1個column predicate.addToColumn_names(ByteBufferHelper.toByteBuffer("grad")); // key範圍 KeyRange keyRange = new KeyRange(); keyRange.setStart_key(new byte[0]); keyRange.setEnd_key(new byte[0]); keyRange.setCount(100); // 結果 // column_parent, predicate, range, consistency_level List<KeySlice> results = client.get_range_slices(columnParent, predicate, keyRange, ConsistencyLevel.ONE); for (KeySlice keySlice : results) { for (ColumnOrSuperColumn cos : keySlice.getColumns()) { Column column = cos.column; System.out.println(ByteHelper.toString(keySlice.getKey()) + ", " + ByteHelper.toString(column.getName()) + ": " + ByteHelper.toString(column.getValue()) + ", " + column.getTimestamp()); // Rose, grad, 4, 1380931646061000 // Jack, art, 87, 1380933848350 // Jack, grad, 5, 1380932164492000 // Jack, math, 97, 1380933848305 } } }
public List<POI> findPOIByHotel(String hotel) throws Exception { // /query SlicePredicate predicate = new SlicePredicate(); SliceRange sliceRange = new SliceRange(); sliceRange.setStart(hotel.getBytes()); sliceRange.setFinish(hotel.getBytes()); predicate.setSlice_range(sliceRange); // read all columns in the row String scFamily = "PointOfInterest"; ColumnParent parent = new ColumnParent(scFamily); KeyRange keyRange = new KeyRange(); keyRange.start_key = bytes(""); keyRange.end_key = bytes(""); List<POI> pois = new ArrayList<POI>(); // instead of a simple list, we get a map whose keys are row keys // and the values the list of columns returned for each // only row key + first column are indexed Connector cl = new Connector(); Cassandra.Client client = cl.connect(); List<KeySlice> slices = client.get_range_slices(parent, predicate, keyRange, CL); for (KeySlice slice : slices) { List<ColumnOrSuperColumn> cols = slice.columns; POI poi = new POI(); poi.name = new String(ByteBufferUtil.string(slice.key)); for (ColumnOrSuperColumn cosc : cols) { SuperColumn sc = cosc.super_column; List<Column> colsInSc = sc.columns; for (Column c : colsInSc) { String colName = new String(c.name.array(), UTF8); if (colName.equals("desc")) { poi.desc = new String(c.value.array(), UTF8); } if (colName.equals("phone")) { poi.phone = new String(c.value.array(), UTF8); } } LOG.debug("Found something neat nearby: " + poi.name + ". \nDesc: " + poi.desc + ". \nPhone: " + poi.phone); pois.add(poi); } } cl.close(); return pois; }
public List<Hotel> findHotelByCity(String city, String state) throws Exception { LOG.debug("Seaching for hotels in " + city + ", " + state); String key = city + ":" + state.toUpperCase(); // /query SlicePredicate predicate = new SlicePredicate(); SliceRange sliceRange = new SliceRange(); sliceRange.setStart(new byte[0]); sliceRange.setFinish(new byte[0]); predicate.setSlice_range(sliceRange); // read all columns in the row String columnFamily = "HotelByCity"; ColumnParent parent = new ColumnParent(columnFamily); KeyRange keyRange = new KeyRange(); keyRange.setStart_key(key.getBytes()); keyRange.setEnd_key("".getBytes()); // just outside lexical range keyRange.count = 5; Connector cl = new Connector(); Cassandra.Client client = cl.connect(); List<KeySlice> keySlices = client.get_range_slices(parent, predicate, keyRange, CL); List<Hotel> results = new ArrayList<Hotel>(); for (KeySlice ks : keySlices) { List<ColumnOrSuperColumn> coscs = ks.columns; LOG.debug(new String("Using key " + ks.key)); for (ColumnOrSuperColumn cs : coscs) { Hotel hotel = new Hotel(); hotel.name = ByteBufferUtil.string(cs.column.name); hotel.city = city; hotel.state = state; results.add(hotel); LOG.debug("Found hotel result for " + hotel.name); } } // /end query cl.close(); return results; }
/**
 * Fetches the next page of columns via {@code get_range_slices}, advancing to
 * a new start token when the current row appears exhausted.
 *
 * On return, {@code rows} holds the fetched page, or is {@code null} when
 * nothing more was found (empty result, end of split reached, or the slice
 * start did not advance). Any exception from the fetch is rethrown wrapped in
 * a {@code RuntimeException}.
 */
private void maybeInit() {
    // A row is loaded and the last page held fewer than rowPageSize columns:
    // treat that row as finished and set up for the next one.
    if (rows != null && columnsRead < rowPageSize) {
        columnsRead = 0;
        // Next query starts from the token of the row just finished.
        startToken = partitioner.getTokenFactory().toString(partitioner.getToken(rows.get(0).key));
        // Reset the column slice start to its initial value for the new row.
        predicate.getSlice_range().setStart(startSlicePredicate);
        rows = null;
        prevStartSlice = null;
        totalRead++;
    }
    if (startToken == null) {
        // First call: begin at the start of the split.
        startToken = split.getStartToken();
    } else if (startToken.equals(split.getEndToken()) && rows == null) {
        // reached end of the split
        return;
    }
    KeyRange keyRange = new KeyRange(batchRowCount) .setStart_token(startToken) .setEnd_token(split.getEndToken());
    try {
        rows = client.get_range_slices(new ColumnParent(cfName), predicate, keyRange, consistencyLevel);
        // nothing new? reached the end
        if (rows.isEmpty()) {
            rows = null;
            return;
        }
        // Detect an infinite loop: the slice start is the same as the one
        // recorded before the previous fetch, so paging is not advancing.
        if (prevStartSlice != null && ByteBufferUtil.compareUnsigned(prevStartSlice, predicate.slice_range.start) == 0) {
            rows = null;
            return;
        }
        // prepare for the next slice to be read
        KeySlice row = rows.get(0);
        if (row.getColumnsSize() > 0) {
            // Last column of this page; its name becomes the next slice start.
            ColumnOrSuperColumn cosc = row.getColumns().get(row.getColumnsSize() - 1);
            // Remember the start used for this fetch (for the loop check above).
            prevStartSlice = predicate.slice_range.start;
            // Prepare next slice: take the name from whichever variant is set.
            if (cosc.column != null) {
                predicate.slice_range.start = cosc.column.name;
            }
            if (cosc.super_column != null) {
                predicate.slice_range.start = cosc.super_column.name;
            }
            if (cosc.counter_column != null) {
                predicate.slice_range.start = cosc.counter_column.name;
            }
            if (cosc.counter_super_column != null) {
                predicate.slice_range.start = cosc.counter_super_column.name;
            }
            columnsRead = row.getColumnsSize();
            // If we've hit the max columns, remove the last column from this
            // page: its name is the start of the next slice, so it will be
            // fetched again and would otherwise overlap.
            if (columnsRead == rowPageSize) {
                row.getColumns().remove(columnsRead - 1);
            }
        }
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}