/** * * @param topKvalueCandidates the topK results per entity, acquired from value similarity * @param rawTriples1 the rdf triples of the first entity collection * @param rawTriples2 the rdf triples of the second entity collection * @param SEPARATOR the delimiter that separates subjects, predicates and objects in the rawTriples1 and rawTriples2 files * @param entityIds1 the mapping of entity urls to entity ids, as it was used in blocking * @param entityIds2 * @param MIN_SUPPORT_THRESHOLD the minimum support threshold, below which, relations are discarded from top relations * @param K the K for topK candidate matches * @param N the N for topN rdf relations (and neighbors) * @param jsc the java spark context used to load files and broadcast variables * @return topK neighbor candidates per entity */ public JavaPairRDD<Integer, IntArrayList> run(JavaPairRDD<Integer,Int2FloatLinkedOpenHashMap> topKvalueCandidates, JavaRDD<String> rawTriples1, JavaRDD<String> rawTriples2, String SEPARATOR, JavaRDD<String> entityIds1, JavaRDD<String> entityIds2, float MIN_SUPPORT_THRESHOLD, int K, int N, JavaSparkContext jsc) { Map<Integer,IntArrayList> inNeighbors = new HashMap<>(new RelationsRank().run(rawTriples1, SEPARATOR, entityIds1, MIN_SUPPORT_THRESHOLD, N, true, jsc)); inNeighbors.putAll(new RelationsRank().run(rawTriples2, SEPARATOR, entityIds2, MIN_SUPPORT_THRESHOLD, N, false, jsc)); Broadcast<Map<Integer,IntArrayList>> inNeighbors_BV = jsc.broadcast(inNeighbors); //JavaPairRDD<Integer, IntArrayList> topKneighborCandidates = getTopKNeighborSims(topKvalueCandidates, inNeighbors_BV, K); JavaPairRDD<Integer, IntArrayList> topKneighborCandidates = getTopKNeighborSimsSUM(topKvalueCandidates, inNeighbors_BV, K); return topKneighborCandidates; }
/**
 * Derives the top-K neighbor-based candidate matches per entity: ranks the
 * in-neighbors of both collections, broadcasts them, and scores the
 * value-based candidates through their neighbors (SUM aggregation).
 * @param topKvalueCandidates the topK results per entity, acquired from value similarity
 * @param rawTriples1 the rdf triples of the first entity collection
 * @param rawTriples2 the rdf triples of the second entity collection
 * @param SEPARATOR the delimiter that separates subjects, predicates and objects in the rawTriples1 and rawTriples2 files
 * @param entityIds1 the mapping of entity urls to entity ids, as it was used in blocking
 * @param entityIds2 the mapping of entity urls to entity ids for the second collection
 * @param MIN_SUPPORT_THRESHOLD the minimum support threshold, below which, relations are discarded from top relations
 * @param K the K for topK candidate matches
 * @param N the N for topN rdf relations (and neighbors)
 * @param jsc the java spark context used to load files and broadcast variables
 * @return topK neighbor candidates per entity
 */
public JavaPairRDD<Integer, IntArrayList> run(JavaPairRDD<Integer,Int2FloatLinkedOpenHashMap> topKvalueCandidates, JavaRDD<String> rawTriples1, JavaRDD<String> rawTriples2, String SEPARATOR, JavaRDD<String> entityIds1, JavaRDD<String> entityIds2, float MIN_SUPPORT_THRESHOLD, int K, int N, JavaSparkContext jsc) {
    // rank in-neighbors of the first collection, then merge in those of the second
    Map<Integer, IntArrayList> topInNeighbors = new HashMap<>(
            new RelationsRank().run(rawTriples1, SEPARATOR, entityIds1, MIN_SUPPORT_THRESHOLD, N, true, jsc));
    topInNeighbors.putAll(
            new RelationsRank().run(rawTriples2, SEPARATOR, entityIds2, MIN_SUPPORT_THRESHOLD, N, false, jsc));

    // ship the neighbor map once per executor
    Broadcast<Map<Integer, IntArrayList>> broadcastInNeighbors = jsc.broadcast(topInNeighbors);

    return getTopKNeighborSimsSUM(topKvalueCandidates, broadcastInNeighbors, K);
}
/**
 * Same pipeline as {@code run}, but keeps the aggregate neighbor-similarity
 * scores in the result instead of only the ranked candidate ids.
 * @param topKvalueCandidates the topK results per entity, acquired from value similarity
 * @param rawTriples1 the rdf triples of the first entity collection
 * @param rawTriples2 the rdf triples of the second entity collection
 * @param SEPARATOR the delimiter that separates subjects, predicates and objects in the rawTriples1 and rawTriples2 files
 * @param entityIds1 the mapping of entity urls to entity ids, as it was used in blocking
 * @param entityIds2 the mapping of entity urls to entity ids for the second collection
 * @param MIN_SUPPORT_THRESHOLD the minimum support threshold, below which, relations are discarded from top relations
 * @param K the K for topK candidate matches
 * @param N the N for topN rdf relations (and neighbors)
 * @param jsc the java spark context used to load files and broadcast variables
 * @return topK neighbor candidates per entity, along with their scores
 */
public JavaPairRDD<Integer, Int2FloatLinkedOpenHashMap> run2(JavaPairRDD<Integer,Int2FloatLinkedOpenHashMap> topKvalueCandidates, JavaRDD<String> rawTriples1, JavaRDD<String> rawTriples2, String SEPARATOR, JavaRDD<String> entityIds1, JavaRDD<String> entityIds2, float MIN_SUPPORT_THRESHOLD, int K, int N, JavaSparkContext jsc) {
    // rank in-neighbors of the first collection, then merge in those of the second
    Map<Integer, IntArrayList> topInNeighbors = new HashMap<>(
            new RelationsRank().run(rawTriples1, SEPARATOR, entityIds1, MIN_SUPPORT_THRESHOLD, N, true, jsc));
    topInNeighbors.putAll(
            new RelationsRank().run(rawTriples2, SEPARATOR, entityIds2, MIN_SUPPORT_THRESHOLD, N, false, jsc));

    // ship the neighbor map once per executor
    Broadcast<Map<Integer, IntArrayList>> broadcastInNeighbors = jsc.broadcast(topInNeighbors);

    return getTopKNeighborSimsSUMWithScores(topKvalueCandidates, broadcastInNeighbors, K);
}
/** * * @param topKvalueCandidates the topK results per entity, acquired from value similarity * @param rawTriples1 the rdf triples of the first entity collection * @param rawTriples2 the rdf triples of the second entity collection * @param SEPARATOR the delimiter that separates subjects, predicates and objects in the rawTriples1 and rawTriples2 files * @param entityIds1 the mapping of entity urls to entity ids, as it was used in blocking * @param entityIds2 * @param MIN_SUPPORT_THRESHOLD the minimum support threshold, below which, relations are discarded from top relations * @param K the K for topK candidate matches * @param N the N for topN rdf relations (and neighbors) * @param jsc the java spark context used to load files and broadcast variables * @return topK neighbor candidates per entity */ public JavaPairRDD<Integer, IntArrayList> run(JavaPairRDD<Integer,Int2FloatLinkedOpenHashMap> topKvalueCandidates, JavaRDD<String> rawTriples1, JavaRDD<String> rawTriples2, String SEPARATOR, JavaRDD<String> entityIds1, JavaRDD<String> entityIds2, float MIN_SUPPORT_THRESHOLD, int K, int N, JavaSparkContext jsc) { Map<Integer,IntArrayList> inNeighbors = new HashMap<>(new RelationsRank().run(rawTriples1, SEPARATOR, entityIds1, MIN_SUPPORT_THRESHOLD, N, true, jsc)); inNeighbors.putAll(new RelationsRank().run(rawTriples2, SEPARATOR, entityIds2, MIN_SUPPORT_THRESHOLD, N, false, jsc)); Broadcast<Map<Integer,IntArrayList>> inNeighbors_BV = jsc.broadcast(inNeighbors); //JavaPairRDD<Tuple2<Integer, Integer>, Float> neighborSims = getNeighborSims(topKvalueCandidates, inNeighbors_BV); //JavaPairRDD<Integer, IntArrayList> topKneighborCandidates = getTopKNeighborSimsOld(neighborSims, K); JavaPairRDD<Integer, IntArrayList> topKneighborCandidates = getTopKNeighborSims(topKvalueCandidates, inNeighbors_BV, K); return topKneighborCandidates; }
/** * Aggregates the two lists of candidate matches per entity using Borda, and returns the top-1 aggregate candidate match per entity. * @param topKValueCandidates the top candidate matches per entity based on values, in the form: key: entityId, value: map of [candidateMatch, valueSim(entityId,candidateMatch)] * @param topKNeighborCandidates the top candidate matches per entity based on neighbors, in the form: key: entityId, value: ranked list of [candidateMatch] * @param LISTS_WITH_COMMON_CANDIDATES * @return the top-1 aggregate candidate match per entity */ public JavaPairRDD<Integer,Integer> getTopCandidatePerEntity(JavaPairRDD<Integer, Int2FloatLinkedOpenHashMap> topKValueCandidates, JavaPairRDD<Integer, IntArrayList> topKNeighborCandidates, LongAccumulator LISTS_WITH_COMMON_CANDIDATES) { return topKValueCandidates .mapValues(x -> new IntArrayList(Utils.sortByValue(x, true).keySet())) //sort the int2floatopenhashmap and get the keys (entityIds) sorted by values (value similarity) (descending) .fullOuterJoin(topKNeighborCandidates) .mapValues(x -> top1Borda(x, LISTS_WITH_COMMON_CANDIDATES)) .filter((x -> x._2() != null)); }
/** * Aggregates the two lists of candidate matches per entity using Borda, and returns the top-1 aggregate candidate match per entity. * @param topKValueCandidates the top candidate matches per entity based on values, in the form: key: entityId, value: map of [candidateMatch, valueSim(entityId,candidateMatch)] * @param topKNeighborCandidates the top candidate matches per entity based on neighbors, in the form: key: entityId, value: ranked list of [candidateMatch] * @param LISTS_WITH_COMMON_CANDIDATES * @param K how many candidates to keep per entity * @return the top-K aggregate candidate match per entity */ public JavaPairRDD<Integer,IntArrayList> getTopKCandidatesPerEntity(JavaPairRDD<Integer, Int2FloatLinkedOpenHashMap> topKValueCandidates, JavaPairRDD<Integer, IntArrayList> topKNeighborCandidates, LongAccumulator LISTS_WITH_COMMON_CANDIDATES, int K, LongAccumulator RESULTS_FROM_VALUES, LongAccumulator RESULTS_FROM_NEIGHBORS, LongAccumulator RESULTS_FROM_SUM, LongAccumulator RESULTS_FROM_VALUES_WITHOUT_NEIGHBORS, LongAccumulator RESULTS_FROM_NEIGHBORS_WITHOUT_VALUES) { return topKValueCandidates .mapValues(x -> new IntArrayList(Utils.sortByValue(x, true).keySet())) //sort the int2floatopenhashmap and get the keys (entityIds) sorted by values (value similarity) (descending) .fullOuterJoin(topKNeighborCandidates) .mapValues(x -> topKBorda(x, LISTS_WITH_COMMON_CANDIDATES, K, RESULTS_FROM_VALUES, RESULTS_FROM_NEIGHBORS, RESULTS_FROM_SUM, RESULTS_FROM_VALUES_WITHOUT_NEIGHBORS, RESULTS_FROM_NEIGHBORS_WITHOUT_VALUES)) .filter((x -> x._2() != null)); }
/** * Aggregates the two lists of candidate matches per entity using Borda, and returns the top-1 aggregate candidate match per entity. * @param topKValueCandidates the top candidate matches per entity based on values, in the form: key: entityId, value: map of [candidateMatch, valueSim(entityId,candidateMatch)] * @param topKNeighborCandidates the top candidate matches per entity based on neighbors, in the form: key: entityId, value: ranked list of [candidateMatch] * @param LISTS_WITH_COMMON_CANDIDATES * @param K how many candidates to keep per entity * @return the top-K aggregate candidate match per entity */ public JavaPairRDD<Integer,PriorityQueue<ComparableIntFloatPairDUMMY>> getTopKCandidatesPerEntityDEBUGGING(JavaPairRDD<Integer, Int2FloatLinkedOpenHashMap> topKValueCandidates, JavaPairRDD<Integer, IntArrayList> topKNeighborCandidates, LongAccumulator LISTS_WITH_COMMON_CANDIDATES, int K, LongAccumulator RESULTS_FROM_VALUES, LongAccumulator RESULTS_FROM_NEIGHBORS, LongAccumulator RESULTS_FROM_SUM, LongAccumulator RESULTS_FROM_VALUES_WITHOUT_NEIGHBORS, LongAccumulator RESULTS_FROM_NEIGHBORS_WITHOUT_VALUES) { return topKValueCandidates .mapValues(x -> new IntArrayList(Utils.sortByValue(x, true).keySet())) //sort the int2floatopenhashmap and get the keys (entityIds) sorted by values (value similarity) (descending) .fullOuterJoin(topKNeighborCandidates) .mapValues(x -> topKBordaDEBUGGING(x, LISTS_WITH_COMMON_CANDIDATES, K, RESULTS_FROM_VALUES, RESULTS_FROM_NEIGHBORS, RESULTS_FROM_SUM, RESULTS_FROM_VALUES_WITHOUT_NEIGHBORS, RESULTS_FROM_NEIGHBORS_WITHOUT_VALUES)) .filter((x -> x._2() != null)); }
/** * * @param blocksFromEI * @param totalWeightsBV * @param K * @param numNegativeEntities * @param numPositiveEntities * @return key: an entityId, value: a list of pairs of candidate matches along with their value_sim with the key */ public JavaPairRDD<Integer,Int2FloatLinkedOpenHashMap> getTopKValueSims(JavaPairRDD<Integer, IntArrayList> blocksFromEI, int K, long numNegativeEntities, long numPositiveEntities) { //key: an entityId, value: a list of candidate matches, with first number being the number of entities from the same collection in this block JavaPairRDD<Integer, IntArrayList> mapOutput = CNPMapPhase.getMapOutputWJS(blocksFromEI); //reduce phase //metaBlockingResults: key: a negative entityId, value: a list of candidate matches (positive entity ids) along with their value_sim with the key return mapOutput .groupByKey() //for each entity create an iterable of arrays of candidate matches (one array from each common block) .mapToPair(x -> { int entityId = x._1(); //compute the numerators Int2FloatOpenHashMap counters = new Int2FloatOpenHashMap(); //number of common blocks with current entity per candidate match for(IntArrayList candidates : x._2()) { int numNegativeEntitiesInBlock = candidates.getInt(0); //the first element is the number of entities from the same collection int numPositiveEntitiesInBlock = candidates.size()-1; //all the other candidates are positive entity ids if (entityId >= 0) { numPositiveEntitiesInBlock = candidates.getInt(0); numNegativeEntitiesInBlock = candidates.size()-1; } float weight1 = (float) Math.log10((double)numNegativeEntities/numNegativeEntitiesInBlock); float weight2 = (float) Math.log10((double)numPositiveEntities/numPositiveEntitiesInBlock); candidates = new IntArrayList(candidates.subList(1, candidates.size())); //remove the first element which is the number of entities in this block from the same collection as the entityId for (int candidateId : candidates) { counters.addTo(candidateId, weight1+weight2); } } //keep 
the top-K weights Int2FloatLinkedOpenHashMap weights = new Int2FloatLinkedOpenHashMap(Utils.sortByValue(counters, true)); Int2FloatLinkedOpenHashMap weightsToEmit = new Int2FloatLinkedOpenHashMap(); int i = 0; for (Map.Entry<Integer, Float> neighbor : weights.entrySet()) { if (i == weights.size() || i == K) { break; } weightsToEmit.put(neighbor.getKey().intValue(), weights.get(neighbor.getKey().intValue())); i++; } return new Tuple2<>(entityId, weightsToEmit); }) .filter(x-> !x._2().isEmpty()); }