IteratorX is a minimalist JDBC/file reader.
See also:
Maven: https://mvnrepository.com/artifact/io.iteratorx/iteratorx
Usage:
Reading data from JDBC or from a file into JSONObject instances is very simple.

    // create the JDBC reader
    final JdbcReader jdbcReader = new JdbcReader(
            new JdbcDataSourceBuilder()
                    .setUrl("jdbc:postgresql://10.23.112.2:3333/dbname")
                    .setUser("username")
                    .setPassword("password")
                    .build());

    // fetch lazily through an Iterable
    for (final JSONObject item : jdbcReader.read("select * from tablename")) {
        System.err.println(item);
    }

    // fetch everything into one collection; param is bound to the ? placeholder
    final Collection<JSONObject> items = jdbcReader.readAll("select * from tablename where type = ?", param);
    for (final JSONObject item : items) {
        System.err.println(item);
    }


    // create the file reader
    final FileReader fileReader = new FileReader();

    // fetch lazily through an Iterable
    for (final JSONObject item : fileReader.read(new File("data.json"), "utf-8")) {
        System.err.println(item);
    }

    // fetch everything into one collection
    final Collection<JSONObject> items = fileReader.readAll(new File("data.json"), "utf-8");
    for (final JSONObject item : items) {
        System.err.println(item);
    }

Simple, easy-to-use multithreaded processing.

    // process each item in parallel using a thread pool
    Threads.from(jdbcReader.read("select * from tablename")).forEach(item -> {
        System.err.println(item);
    });

    // process the data in parallel, one batch at a time
    Threads.from(jdbcReader.read("select * from tablename")).forBatch(items -> {
        for (final JSONObject item : items) {
            System.err.println(item);
        }
    });


    // process each item in parallel using the Flink engine
    Flink.from(jdbcReader.read("select * from tablename")).forEach(item -> {
        System.err.println(item);
    });

    // process the data in parallel, one batch at a time
    Flink.from(jdbcReader.read("select * from tablename")).forBatch(items -> {
        for (final JSONObject item : items) {
            System.err.println(item);
        }
    });

    // use the DataSet directly to get the full Flink API
    Flink.from(jdbcReader.read("select * from tablename")).dataSet().distinct().count();


    // process each item in parallel using the RxJava engine
    RxJava.from(jdbcReader.read("select * from tablename")).forEach(item -> {
        System.err.println(item);
    });

    // process the data in parallel, one batch at a time
    RxJava.from(jdbcReader.read("select * from tablename")).forBatch(items -> {
        for (final JSONObject item : items) {
            System.err.println(item);
        }
    });

    // use the Observable directly
    RxJava.from(jdbcReader.read("select * from tablename")).observable().distinct().count();

Bug: when using RxJava, the program does not exit after processing has finished. Can anyone help fix this?
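
A possible direction for the bug above, offered as a guess rather than a diagnosis: in Java, a JVM that refuses to exit is often kept alive by a pool of non-daemon worker threads that was never shut down. Whether the RxJava wrapper in IteratorX actually runs on such a pool is an assumption; the sketch below uses only java.util.concurrent, and the names ShutdownSketch and processAll are hypothetical and not part of IteratorX.

```java
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, not IteratorX code: run work on a pool, then shut it down.
public class ShutdownSketch {

    // Processes every item on a fixed thread pool and returns how many were processed.
    static int processAll(final Iterable<String> items, final int threads) {
        final ExecutorService pool = Executors.newFixedThreadPool(threads);
        final AtomicInteger processed = new AtomicInteger();
        try {
            for (final String item : items) {
                // stand-in for the real per-item work
                pool.execute(() -> processed.incrementAndGet());
            }
        } finally {
            // without shutdown(), the idle non-daemon workers keep the JVM alive
            pool.shutdown();
        }
        try {
            // wait for the submitted tasks to finish before returning
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                pool.shutdownNow();
            }
        } catch (final InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }

    public static void main(final String[] args) {
        final int count = processAll(Arrays.asList("a", "b", "c"), 2);
        System.err.println("processed " + count + " items");
    }
}
```

If the wrapper does hand a custom executor to RxJava, shutting that executor down once the stream completes (or creating its threads as daemon threads) would be the analogous fix.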