/** * Creates a table based on a select on another type and inserts the data (currently not possible to * perform this action 'WITH NO DATA'. * @param sql * @param createAsSelect * @param index * @return the number of rows inserted * @throws SQLException */ public int execute(String sql, CreateTableAsSelect createAsSelect, String index) throws SQLException { if(!createAsSelect.isWithData()) throw new SQLException("Not yet possible to create table as select without data (create emtpy table, " + "insert data and delete it will have the same effect"); // first create the index SqlParser parser = new SqlParser(); int queryIdx = sql.toLowerCase().indexOf(" as "); try{ String createSql = sql.substring(0, queryIdx)+" (_id String)" ; CreateTable create = (CreateTable)parser.createStatement(createSql); this.execute(createSql, create, index); }catch(SQLException sqle) { throw sqle; }catch(Exception e){ throw new SQLException("Unable to create table due to: "+e.getMessage(), e); } // secondly add the documents from the query String insertSql = "INSERT INTO "+createAsSelect.getName().toString()+" "+sql.substring(queryIdx+4); Insert insert = (Insert)parser.createStatement(insertSql); int res = this.execute(insertSql, insert, index); this.statement.getConnection().getTypeMap(); // trigger a reload of the table&column set for the connection return res; }
/**
 * Appends the textual form of an INSERT statement to the builder: the target, an optional
 * parenthesized column list and finally the query that is formatted via {@code process}.
 */
@Override
protected Void visitInsert(Insert node, Integer indent)
{
    builder.append("INSERT INTO ");
    builder.append(node.getTarget());
    builder.append(" ");

    // the column list is only written when the statement named its columns explicitly
    node.getColumns().ifPresent(columns -> {
        builder.append("(");
        builder.append(Joiner.on(", ").join(columns));
        builder.append(") ");
    });

    process(node.getQuery(), indent);
    return null;
}
@Override public int executeUpdate(String sql) throws SQLException { //System.out.println("QUERY: ["+sql+"]"); sql = sql.replaceAll("\r", " ").replaceAll("\n", " ").trim(); // custom stuff to support UPDATE statements since Presto does not parse it if(sql.toLowerCase().startsWith("update")){ return updateState.execute(sql); } com.facebook.presto.sql.tree.Statement statement = parser.createStatement(sql); if(statement instanceof Query) throw new SQLException("A regular query cannot be executed as an Update"); if(statement instanceof Insert){ //if(connection.getSchema() == null) throw new SQLException("No active index set for this driver. Pleas specify an active index or alias by executing 'USE <index/alias>' first"); return updateState.execute(sql, (Insert)statement, connection.getSchema()); }else if(statement instanceof Delete){ if(connection.getSchema() == null) throw new SQLException("No active index set for this driver. Pleas specify an active index or alias by executing 'USE <index/alias>' first"); return updateState.execute(sql, (Delete)statement, connection.getSchema()); }else if(statement instanceof CreateTable){ return updateState.execute(sql, (CreateTable)statement, connection.getSchema()); }else if(statement instanceof CreateTableAsSelect){ return updateState.execute(sql, (CreateTableAsSelect)statement, connection.getSchema()); }else if(statement instanceof CreateView){ return updateState.execute(sql, (CreateView)statement, connection.getSchema()); }else if(statement instanceof Use){ connection.setSchema( ((Use)statement).getSchema()); //connection.getTypeMap(); // updates the type mappings found in properties return 0; }else if(statement instanceof DropTable){ return updateState.execute(sql, (DropTable)statement); }else if(statement instanceof DropView){ return updateState.execute(sql, (DropView)statement); }throw new SQLFeatureNotSupportedException("Unable to parse provided update sql"); }
/**
 * Parses the Insert statement and returns the values as one list (even if multiple value sets were
 * found). The columns named by the statement are added to the heading of the provided state.
 *
 * @param insert the insert statement, which must name its columns and use a VALUES clause
 * @param state the state handed to the visitor; its heading receives one column per named field
 * @return the values produced by visiting the VALUES clause
 * @throws SQLException if the statement has no column references, has no query body, is not based
 *         on VALUES, or if the state holds an exception after the values were visited
 */
public List<Object> parse(Insert insert, QueryState state) throws SQLException{
    if(!insert.getColumns().isPresent()){
        throw new SQLException("Unable to insert data without column references");
    }
    if(insert.getQuery().getQueryBody() == null){
        throw new SQLException("Unable to insert data without any values");
    }
    boolean valuesBased = insert.getQuery().getQueryBody() instanceof Values;
    if(!valuesBased){
        throw new SQLException("Unable to insert data from a query, use ... VALUES (...)");
    }

    List<String> columnNames = insert.getColumns().get();
    List<Object> parsedValues = insert.getQuery().getQueryBody().accept(this, state);
    if(state.hasException()){
        throw state.getException();
    }

    // the heading is only extended once the values were visited without a recorded exception
    for(String columnName : columnNames){
        state.getHeading().add(new Column(columnName));
    }
    return parsedValues;
}
/** * Parses and executes the provided insert statement and returns 1 if execution was successful * @param sql * @param insert * @param index * @return the number of executed inserts * @throws SQLException */ public int execute(String sql, Insert insert, String index) throws SQLException{ if(insert.getQuery().getQueryBody() instanceof Values){ // parse one or multiple value sets (... VALUES (1,2,'a'), (2,4,'b'), ...) return this.insertFromValues(sql, insert, index, Utils.getIntProp(props, Utils.PROP_FETCH_SIZE, 2500)); }else if(insert.getQuery().getQueryBody() instanceof QuerySpecification){ // insert data based on a SELECT statement return this.insertFromSelect(sql, insert, index, Utils.getIntProp(props, Utils.PROP_FETCH_SIZE, 2500)); }else throw new SQLException("Unknown set of values to insert ("+insert.getQuery().getQueryBody()+")"); }
/**
 * Executes the statements collected in the bulk list of this state, one after the other, and clears
 * the list afterwards.
 * <p>
 * A failing statement does not abort the run: its failure is recorded in the result and the
 * remaining statements are still executed.
 *
 * @return an integer indicator for each statement, in the order of the bulk list:
 *         Statement.SUCCESS_NO_INFO for success, Statement.EXECUTE_FAILED when the statement could
 *         not be parsed, failed during execution or is of a type that is not executed here
 */
public int[] executeBulk(){
    int[] result = new int[bulkList.size()];
    SqlParser parser = new SqlParser();
    for(int i=0; i<bulkList.size(); i++){
        try{
            String sql = bulkList.get(i);
            com.facebook.presto.sql.tree.Statement st = parser.createStatement(sql);
            if(st instanceof DropTable){
                this.execute(sql, (DropTable)st);
            }else if(st instanceof DropView){
                this.execute(sql, (DropView)st);
            }else if(st instanceof CreateTable){
                this.execute(sql, (CreateTable)st, this.statement.getConnection().getSchema());
            }else if(st instanceof CreateTableAsSelect){
                this.execute(sql, (CreateTableAsSelect)st, this.statement.getConnection().getSchema());
            }else if(st instanceof CreateView){
                this.execute(sql, (CreateView)st, this.statement.getConnection().getSchema());
            }else if(st instanceof Delete){
                this.execute(sql, (Delete)st, this.statement.getConnection().getSchema());
            }else if(st instanceof Insert){
                this.execute(sql, (Insert)st, this.statement.getConnection().getSchema());
            }else{
                // nothing was executed for this statement, so it must not be reported as a success
                result[i] = Statement.EXECUTE_FAILED;
                continue;
            }
            result[i]= Statement.SUCCESS_NO_INFO;
        }catch (Exception e){
            // deliberate best effort: the failure is reported through the result array only
            result[i] = Statement.EXECUTE_FAILED;
        }
    }
    this.clearBulk();
    return result;
}
/**
 * Builds an {@link Insert} node from an INSERT INTO parse context, consisting of the qualified
 * target name, the optional column aliases and the visited query.
 */
@Override
public Node visitInsertInto(SqlBaseParser.InsertIntoContext context)
{
    Insert insert = new Insert(
            getQualifiedName(context.qualifiedName()),
            // ofNullable turns a null alias list into an empty Optional
            Optional.ofNullable(getColumnAliases(context.columnAliases())),
            (Query) visit(context.query()));
    return insert;
}
/**
 * Checks that INSERT INTO ... SELECT is parsed into the expected {@link Insert} node, both without
 * and with an explicit column list.
 */
@Test
public void testInsertInto()
        throws Exception
{
    QualifiedName target = QualifiedName.of("a");
    Query source = simpleQuery(selectList(new AllColumns()), table(QualifiedName.of("t")));

    // no column list: the columns are expected to be an empty Optional
    Insert withoutColumns = new Insert(target, Optional.empty(), source);
    assertStatement("INSERT INTO a SELECT * FROM t", withoutColumns);

    // explicit column list
    Insert withColumns = new Insert(target, Optional.of(ImmutableList.of("c1", "c2")), source);
    assertStatement("INSERT INTO a (c1, c2) SELECT * FROM t", withColumns);
}
/** * Creates a set of indexrequests based on the result of a query * @param sql * @param insert * @param index * @return * @throws SQLException */ @SuppressWarnings("unchecked") private int insertFromSelect(String sql, Insert insert, String index, int maxRequestsPerBulk) throws SQLException { queryState.buildRequest(sql, insert.getQuery().getQueryBody(), index); String[] indexAndType = this.getIndexAndType(insert.getTarget().toString(), sql, "into\\s+", "\\s+select", index); index = indexAndType[0]; String type = indexAndType[1]; // execute query using nested resultsets ResultSet rs = queryState.execute(false); Heading headingToInsert = headingFromResultSet(rs.getMetaData()); // read the resultset (recursively if nested) HashMap<String, Object> fieldValues = new HashMap<String, Object>(); List<IndexRequestBuilder> indexReqs = new ArrayList<IndexRequestBuilder>(); int indexCount = 0; while(rs != null){ while(rs.next()){ for(Column col : headingToInsert.columns()){ String label = col.getLabel(); Object value = rs.getObject(label); if(value == null) continue; Map<String, Object> nested = fieldValues; if(label.contains(".")){ String[] parts = label.split("\\."); for(int i=0; i<parts.length-1; i++){ String part = parts[i]; if(!nested.containsKey(part)) nested.put(part, new HashMap<String, Object>()); nested = (Map<String, Object>)nested.get(part); } label = parts[parts.length - 1]; } if(value instanceof ResultSet){ value = buildSource((ResultSet)value); }else if(value instanceof Array){ Object[] arrayVal = (Object[])((Array)value).getArray(); if(arrayVal.length > 0 && arrayVal[0] instanceof ResultSet){ for(int i=0; i<arrayVal.length; i++){ arrayVal[i] = buildSource((ResultSet)arrayVal[i]); } } value = arrayVal; } nested.put(label, value); } IndexRequestBuilder indexReq = client.prepareIndex().setIndex(index) .setType(type) .setSource(fieldValues); indexReqs.add(indexReq); if(indexReqs.size() >= maxRequestsPerBulk){ indexCount += this.execute(indexReqs, 
maxRequestsPerBulk); indexReqs.clear(); } fieldValues = new HashMap<String, Object>(); } rs.close(); rs = queryState.moreResults(false); } if(indexReqs.size() > 0) indexCount += this.execute(indexReqs, maxRequestsPerBulk); return indexCount; }
/** * creates a set of index requests based on a set of explicit VALUES * @param insert * @param index * @return * @throws SQLException */ @SuppressWarnings("unchecked") private int insertFromValues(String sql, Insert insert, String index, int maxRequestsPerBulk) throws SQLException { Heading heading = new Heading(); QueryState state = new BasicQueryState(sql, heading, this.props); List<Object> values = updateParser.parse(insert, state); if(state.hasException()) throw state.getException(); if(heading.hasLabel("_index") || heading.hasLabel("_type")) throw new SQLException("Not possible to set _index and _type fields"); String[] indexAndType = this.getIndexAndType(insert.getTarget().toString(), sql, "into\\s+", "\\s+values", index); index = indexAndType[0]; String type = indexAndType[1]; if(values.size() % heading.getColumnCount() != 0) throw new SQLException("Number of columns does not match number of values for one of the inserts"); List<IndexRequestBuilder> indexReqs = new ArrayList<IndexRequestBuilder>(); int indexCount = 0; int valueIdx = 0; while(valueIdx < values.size()){ HashMap<String, Object> fieldValues = new HashMap<String, Object>(); String id = null; for(Column col : heading.columns()){ Object value = values.get(valueIdx); valueIdx++; if(col.getColumn().equals("_id")){ id = value.toString(); continue; } if(col.getColumn().indexOf('.') == -1) { fieldValues.put(col.getColumn(), value); continue; } // create nested object Map<String, Object> map = fieldValues; String[] objectDef = col.getColumn().split("\\."); for(int k=0; k<objectDef.length; k++){ String key = objectDef[k]; if(k == objectDef.length-1) map.put(key, value); else{ if(!map.containsKey(key)) map.put(key, new HashMap<String, Object>()); map = (Map<String, Object>)map.get(key); } } } // create index request IndexRequestBuilder indexReq = client.prepareIndex().setIndex(index).setType(type); if(id != null) indexReq.setId(id); indexReq.setSource(fieldValues); indexReqs.add(indexReq); 
if(indexReqs.size() >= maxRequestsPerBulk){ indexCount += this.execute(indexReqs, maxRequestsPerBulk); indexReqs.clear(); } } if(indexReqs.size() > 0) indexCount += this.execute(indexReqs, maxRequestsPerBulk); return indexCount; }
@Override protected RelationType visitInsert(Insert insert, AnalysisContext context) { QualifiedObjectName targetTable = createQualifiedObjectName(session, insert, insert.getTarget()); if (metadata.getView(session, targetTable).isPresent()) { throw new SemanticException(NOT_SUPPORTED, insert, "Inserting into views is not supported"); } analysis.setUpdateType("INSERT"); // analyze the query that creates the data RelationType queryDescriptor = process(insert.getQuery(), context); // verify the insert destination columns match the query Optional<TableHandle> targetTableHandle = metadata.getTableHandle(session, targetTable); if (!targetTableHandle.isPresent()) { throw new SemanticException(MISSING_TABLE, insert, "Table '%s' does not exist", targetTable); } accessControl.checkCanInsertIntoTable(session.getRequiredTransactionId(), session.getIdentity(), targetTable); TableMetadata tableMetadata = metadata.getTableMetadata(session, targetTableHandle.get()); List<String> tableColumns = tableMetadata.getVisibleColumnNames(); final List<String> insertColumns; if (insert.getColumns().isPresent()) { insertColumns = insert.getColumns().get().stream() .map(String::toLowerCase) .collect(toImmutableList()); Set<String> columnNames = new HashSet<>(); for (String insertColumn : insertColumns) { if (!tableColumns.contains(insertColumn)) { throw new SemanticException(MISSING_COLUMN, insert, "Insert column name does not exist in target table: %s", insertColumn); } if (!columnNames.add(insertColumn)) { throw new SemanticException(DUPLICATE_COLUMN_NAME, insert, "Insert column name is specified more than once: %s", insertColumn); } } } else { insertColumns = tableColumns; } Map<String, ColumnHandle> columnHandles = metadata.getColumnHandles(session, targetTableHandle.get()); analysis.setInsert(new Analysis.Insert( targetTableHandle.get(), insertColumns.stream().map(column -> columnHandles.get(column)).collect(toImmutableList()))); Iterable<Type> tableTypes = insertColumns.stream() 
.map(insertColumn -> tableMetadata.getColumn(insertColumn).getType()) .collect(toImmutableList()); Iterable<Type> queryTypes = transform(queryDescriptor.getVisibleFields(), Field::getType); if (!elementsEqual(tableTypes, queryTypes)) { throw new SemanticException(MISMATCHED_SET_COLUMN_TYPES, insert, "Insert query has mismatched column types: " + "Table: (" + Joiner.on(", ").join(tableTypes) + "), " + "Query: (" + Joiner.on(", ").join(queryTypes) + ")"); } return new RelationType(Field.newUnqualified("rows", BIGINT)); }