IteratorX - 极简的 JDBC/File 读取器


Apache-2.0
跨平台
Java

软件简介

IteratorX 是一个极简主义的 jdbc/file reader。

参见:

maven:https://mvnrepository.com/artifact/io.iteratorx/iteratorx

使用:

1. Reader: JdbcReader, FileReader

超简单,从 jdbc 或 file 中读取数据到 JSONObject 中。

1.1. JdbcReader:读取 jdbc

  // create jdbc reader
  final JdbcReader jdbcReader = new JdbcReader(
      new JdbcDataSourceBuilder().setUrl("jdbc:postgresql://10.23.112.2:3333/dbname")
          .setUser("username").setPassword("password").build());
  
  // fetch by iterable
  for (final JSONObject item : jdbcReader.read("select * from tablename")) {
    System.err.println(item);
  }
  
  // fetch all into one collection
  final Collection items = jdbcReader.readAll("select * from tablename where type = ?", param);
  for (final JSONObject item : items) {
    System.err.println(item);
  }

1.2. FileReader:读取 file

  // create file reader
  final FileReader fileReader = new FileReader();

  // fetch by iterable
  for (final JSONObject item : fileReader.read(new File("data.json"), "utf-8")) {
    System.err.println(item);
  }

  // fetch all into one collection
  final Collection items = fileReader.readAll(new File("data.json"), "utf-8");
  for (final JSONObject item : items) {
    System.err.println(item);
  }

2. Parallels: Threads, Flink, RxJava

简单易用的 多线程处理。

2.1. Threads: 使用 ThreadPool 并行处理

  // process each item parallelly using thread pool
  Threads.from(jdbcReader.read("select * from tablename")).forEach(item -> {
    System.err.println(item);
  });
  
  // process batch data parallelly
  Threads.from(jdbcReader.read("select * from tablename")).forBatch(items -> {
    for (final JSONObject item : items) {
      System.err.println(item);
    }
  });

2.2. Flink: 使用 Flink 并行处理

  // process each item parallelly using Flink engine
  Flink.from(jdbcReader.read("select * from tablename")).forEach(item -> {
    System.err.println(item);
  });
  
  // process batch data parallelly
  Flink.from(jdbcReader.read("select * from tablename")).forBatch(items -> {
    for (final JSONObject item : items) {
      System.err.println(item);
    }
  });
  
  // use DataSet directly to enable all Flink power
  Flink.from(jdbcReader.read("select * from tablename")).dataSet().distinct().count();

2.3. RxJava: 使用 RxJava 并行处理

  // process each item parallely using RxJava engine
  RxJava.from(jdbcReader.read("select * from tablename")).forEach(item -> {
    System.err.println(item);
  });
  
  // process batch data parallely
  RxJava.from(jdbcReader.read("select * from tablename")).forBatch(items -> {
    for (final JSONObject item : items) {
      System.err.println(item);
    }
  });
  
  // use Observable directly
  RxJava.from(jdbcReader.read("select * from tablename")).observable().distinct().count();

Bug:使用 RxJava 的时候,程序结束了停不下来,谁能帮忙解决吗?

参见:

maven:https://mvnrepository.com/artifact/io.iteratorx/iteratorx