Java 类com.google.common.collect.MigrateMap 实例源码

项目:yugong    文件:FullRecordApplier.java   
protected void doApply(List<Record> records) {
    Map<List<String>, List<Record>> buckets = MigrateMap.makeComputingMap(new Function<List<String>, List<Record>>() {

        public List<Record> apply(List<String> names) {
            return Lists.newArrayList();
        }
    });

    // 根据目标库的不同,划分为多个bucket
    for (Record record : records) {
        buckets.get(Arrays.asList(record.getSchemaName(), record.getTableName())).add(record);
    }

    JdbcTemplate jdbcTemplate = new JdbcTemplate(context.getTargetDs());
    for (final List<Record> batchRecords : buckets.values()) {
        TableSqlUnit sqlUnit = getSqlUnit(batchRecords.get(0));
        if (context.isBatchApply()) {
            applierByBatch(jdbcTemplate, batchRecords, sqlUnit);
        } else {
            applyOneByOne(jdbcTemplate, batchRecords, sqlUnit);
        }
    }
}
项目:yugong    文件:CheckRecordApplier.java   
public void start() {
    super.start();

    dbType = YuGongUtils.judgeDbType(context.getTargetDs());
    tableCache = MigrateMap.makeComputingMap(new Function<List<String>, Table>() {

        public Table apply(List<String> names) {
            if (names.size() != 2) {
                throw new YuGongException("names[" + names.toString() + "] is not valid");
            }

            return TableMetaGenerator.getTableMeta(context.getTargetDs(),
                context.isIgnoreSchema() ? null : names.get(0),
                names.get(1));
        }
    });

    selectSqlCache = MigrateMap.makeMap();
}
项目:yugong    文件:CheckRecordApplier.java   
protected void doApply(List<Record> records) {
    Map<List<String>, List<Record>> buckets = MigrateMap.makeComputingMap(new Function<List<String>, List<Record>>() {

        public List<Record> apply(List<String> names) {
            return Lists.newArrayList();
        }
    });

    // 根据目标库的不同,划分为多个bucket
    for (Record record : records) {
        buckets.get(Arrays.asList(record.getSchemaName(), record.getTableName())).add(record);
    }

    JdbcTemplate jdbcTemplate = new JdbcTemplate(context.getTargetDs());
    for (final List<Record> batchRecords : buckets.values()) {
        List<Record> queryRecords = null;
        if (context.isBatchApply()) {
            queryRecords = queryByBatch(jdbcTemplate, batchRecords);
        } else {
            queryRecords = queryOneByOne(jdbcTemplate, batchRecords);
        }

        diff(batchRecords, queryRecords);
    }
}
项目:canal    文件:MemoryMetaManager.java   
public void start() {
    super.start();

    batches = MigrateMap.makeComputingMap(new Function<ClientIdentity, MemoryClientIdentityBatch>() {

        public MemoryClientIdentityBatch apply(ClientIdentity clientIdentity) {
            return MemoryClientIdentityBatch.create(clientIdentity);
        }

    });

    cursors = new MapMaker().makeMap();

    destinations = MigrateMap.makeComputingMap(new Function<String, List<ClientIdentity>>() {

        public List<ClientIdentity> apply(String destination) {
            return Lists.newArrayList();
        }
    });
}
项目:canal    文件:FileMixedLogPositionManager.java   
public FileMixedLogPositionManager(File dataDir, long period, MemoryLogPositionManager memoryLogPositionManager){
    if (dataDir == null) {
        throw new NullPointerException("null dataDir");
    }
    if (period <= 0) {
        throw new IllegalArgumentException("period must be positive, given: " + period);
    }
    if (memoryLogPositionManager == null) {
        throw new NullPointerException("null memoryLogPositionManager");
    }
    this.dataDir = dataDir;
    this.period = period;
    this.memoryLogPositionManager = memoryLogPositionManager;

    this.dataFileCaches = MigrateMap.makeComputingMap(new Function<String, File>() {

        public File apply(String destination) {
            return getDataFile(destination);
        }
    });

    this.executorService = Executors.newScheduledThreadPool(1);
    this.persistTasks = Collections.synchronizedSet(new HashSet<String>());
}
项目:yugong    文件:OracleMaterializedIncRecordExtractor.java   
public void start() {
    super.start();

    masterSqlCache = MigrateMap.makeMap();
    String schemaName = context.getTableMeta().getSchema();
    String tableName = context.getTableMeta().getName();

    // 后去mlog表名
    String mlogTableName = TableMetaGenerator.getMLogTableName(context.getSourceDs(), schemaName, tableName);
    if (StringUtils.isEmpty(mlogTableName)) {
        throw new YuGongException("not found mlog table for [" + schemaName + "." + tableName + "]");
    }
    // 获取mlog表结构
    mlogMeta = TableMetaGenerator.getTableMeta(context.getSourceDs(),
        context.getTableMeta().getSchema(),
        mlogTableName);

    // 构造mlog sql
    String colstr = SqlTemplates.COMMON.makeColumn(mlogMeta.getColumns());
    mlogExtractSql = new MessageFormat(MLOG_EXTRACT_FORMAT).format(new Object[] { colstr, schemaName, mlogTableName });
    mlogCleanSql = new MessageFormat(MLOG_CLEAN_FORMAT).format(new Object[] { schemaName, mlogTableName });

    executorName = this.getClass().getSimpleName() + "-" + context.getTableMeta().getFullName();
    if (executor == null) {
        executor = new ThreadPoolExecutor(threadSize,
            threadSize,
            60,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(threadSize * 2),
            new NamedThreadFactory(executorName),
            new ThreadPoolExecutor.CallerRunsPolicy());
    }

    tracer.update(context.getTableMeta().getFullName(), ProgressStatus.INCING);
}
项目:yugong    文件:IncrementRecordApplier.java   
public void start() {
    super.start();
    dbType = YuGongUtils.judgeDbType(context.getTargetDs());
    insertSqlCache = MigrateMap.makeMap();
    updateSqlCache = MigrateMap.makeMap();
    deleteSqlCache = MigrateMap.makeMap();
}
项目:yugong    文件:DataSourceFactory.java   
public void start() {
    super.start();
    dataSources = MigrateMap.makeComputingMap(new Function<DataSourceConfig, DataSource>() {

        public DataSource apply(DataSourceConfig config) {
            return createDataSource(config.getUrl(),
                config.getUsername(),
                config.getPassword(),
                config.getType(),
                config.getProperties());
        }
    });

}
项目:cango    文件:DataSourceFactory.java   
public void start() {
    super.start();
    dataSources = MigrateMap.makeComputingMap(new Function<DataSourceConfig, DataSource>() {

        public DataSource apply(DataSourceConfig config) {
            return createDataSource(config.getUrl(),
                    config.getUsername(),
                    config.getPassword(),
                    config.getType(),
                    config.getProperties());
        }
    });

}
项目:otter    文件:LoadStatsTracker.java   
public LoadThroughput(Identity identity){
    counters = MigrateMap.makeComputingMap(new Function<Long, LoadCounter>() {

        public LoadCounter apply(Long pairId) {
            return new LoadCounter(pairId);
        }
    });
}
项目:canal    文件:CanalServerWithEmbedded.java   
public void start() {
    if (!isStart()) {
        super.start();

        canalInstances = MigrateMap.makeComputingMap(new Function<String, CanalInstance>() {

            public CanalInstance apply(String destination) {
                return canalInstanceGenerator.generate(destination);
            }
        });

        // lastRollbackPostions = new MapMaker().makeMap();
    }
}
项目:yugong    文件:FullRecordApplier.java   
public void start() {
    super.start();
    dbType = YuGongUtils.judgeDbType(context.getTargetDs());
    applierSqlCache = MigrateMap.makeMap();
}