The following 7 code examples, extracted from open-source Python projects, illustrate how to use toolz.partition_all().
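Before the project examples, here is a minimal standalone sketch of what partition_all itself does: it splits any iterable into tuples of at most n items, with the final tuple possibly shorter. The values below are illustrative.

from toolz import partition_all

# Chunk a sequence into tuples of at most 3 items;
# the last chunk holds whatever remains (here, a single item).
chunks = list(partition_all(3, range(10)))
print(chunks)
# [(0, 1, 2), (3, 4, 5), (6, 7, 8), (9,)]

All of the examples that follow use this same pattern: feed a large or lazy sequence to partition_all and process it one fixed-size batch at a time.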
def _batches(self):
    """
    Partition the data into consecutive data sets of the specified batch size.

    :return: batched data
    :rtype: DataFrame iterator
    """
    t1 = partition_all(self.batch_size, self.data[text_1])
    t2 = partition_all(self.batch_size, self.data[text_2])
    if self._labeled:
        l = partition_all(self.batch_size, self.data[label].cat.codes)
        batches = zip(t1, t2, l)
    else:
        batches = zip(t1, t2)
    for batch in batches:
        if self._labeled:
            columns = [text_1, text_2, label]
        else:
            columns = [text_1, text_2]
        yield DataFrame(dict(zip(columns, batch)), columns=columns)
def scrape_blockchain(mongo):
    s = Steem()

    # see how far behind we are
    missing = list(range(last_block_num(mongo), s.last_irreversible_block_num))

    # if we are far behind blockchain head
    # split work in chunks of 100
    if len(missing) > 100:
        for batch in partition_all(100, missing):
            results = s.get_blocks(batch)
            insert_blocks(mongo, results)

    # otherwise continue as normal
    blockchain = Blockchain(mode="irreversible")
    hist = blockchain.stream_from(start_block=last_block_num(mongo), full_blocks=True)
    insert_blocks(mongo, hist)
def main(in_dir, out_dir, n_process=int(multiprocessing.cpu_count() * .75),
         n_thread=4, batch_size=10000):
    # Create the output directory, if it doesn't exist
    if not path.exists(out_dir):
        makedirs(out_dir)

    # Get total number of input files for tracking progress
    total_files = len(list(iter_dir(in_dir)))

    # For each input file
    for i, file in enumerate(iter_dir(in_dir)):
        # Print progress
        print('Tagging file %s of %s' % (i + 1, total_files))
        # If multiprocessing
        if n_process >= 2:
            # Split up text in the input file
            texts = partition_all(100000, iter_lines(file))
            # Parallelize the job
            parallelize(save_parses, enumerate(texts), n_process,
                        [out_dir, n_thread, batch_size],
                        backend='multiprocessing')
        # If not multiprocessing
        else:
            save_parses(0, iter_lines(file), out_dir, n_thread, batch_size)
def sync_from_file(file_path, skip_lines, chunk_size=250, is_initial_sync=False):
    with open(file_path) as f:
        # each line in file represents one block
        # we can skip the blocks we already have
        remaining = drop(skip_lines, f)

        for batch in partition_all(chunk_size, remaining):
            process_blocks(map(json.loads, batch), is_initial_sync)
def compute_date_range_chunks(sessions, start_date, end_date, chunksize):
    """Compute the start and end dates to run a pipeline for.

    Parameters
    ----------
    sessions : DatetimeIndex
        The available dates.
    start_date : pd.Timestamp
        The first date in the pipeline.
    end_date : pd.Timestamp
        The last date in the pipeline.
    chunksize : int or None
        The size of the chunks to run. Setting this to None returns one chunk.

    Returns
    -------
    ranges : iterable[(np.datetime64, np.datetime64)]
        A sequence of start and end dates to run the pipeline for.
    """
    if start_date not in sessions:
        raise KeyError("Start date %s is not found in calendar." %
                       (start_date.strftime("%Y-%m-%d"),))
    if end_date not in sessions:
        raise KeyError("End date %s is not found in calendar." %
                       (end_date.strftime("%Y-%m-%d"),))
    if end_date < start_date:
        raise ValueError("End date %s cannot precede start date %s." %
                         (end_date.strftime("%Y-%m-%d"),
                          start_date.strftime("%Y-%m-%d")))

    if chunksize is None:
        return [(start_date, end_date)]

    start_ix, end_ix = sessions.slice_locs(start_date, end_date)
    return (
        (r[0], r[-1]) for r in partition_all(
            chunksize, sessions[start_ix:end_ix]
        )
    )
def optimize(model, sampler, train, valid):
    """
    Optimize the model. TODO: implement early-stopping
    :param model: model to optimize
    :param sampler: mini-batch sampler
    :param train: train user-item matrix
    :param valid: validation user-item matrix
    :return: None
    """
    sess = tf.Session()
    sess.run(tf.global_variables_initializer())
    if model.feature_projection is not None:
        # initialize item embedding with feature projection
        sess.run(tf.assign(model.item_embeddings, model.feature_projection))

    while True:
        # create evaluator on validation set
        validation_recall = RecallEvaluator(train, valid)
        # compute recall on validate set
        valid_recalls = []
        # sample some users to calculate recall validation
        valid_users = list(set(valid.nonzero()[0]))[:300]
        for user_chunk in toolz.partition_all(300, valid_users):
            scores = sess.run(model.item_scores, {model.score_user_ids: user_chunk})
            valid_recalls.extend([validation_recall.eval(user, user_scores)
                                  for user, user_scores in zip(user_chunk, scores)])
        print("\nRecall on (sampled) validation set: {}".format(numpy.mean(valid_recalls)))
        # TODO: early stopping based on validation recall

        # train model
        losses = []
        # run n mini-batches
        for _ in tqdm(range(EVALUATION_EVERY_N_BATCHES), desc="Optimizing..."):
            user_pos, neg = sampler.next_batch()
            _, loss = sess.run((model.optimize, model.loss),
                               {model.user_positive_items_pairs: user_pos,
                                model.negative_samples: neg})
            losses.append(loss)
        print("\nTraining loss {}".format(numpy.mean(losses)))
def optimize(model, sampler, train, valid):
    """
    Optimize the model. TODO: implement early-stopping
    :param model: model to optimize
    :param sampler: mini-batch sampler
    :param train: train user-item matrix
    :param valid: validation user-item matrix
    :return: None
    """
    sess = tf.Session()
    sess.run(tf.global_variables_initializer())
    if model.feature_projection is not None:
        # initialize item embedding with feature projection
        sess.run(tf.assign(model.item_embeddings, model.feature_projection))

    # sample some users to calculate recall validation
    valid_users = numpy.random.choice(list(set(valid.nonzero()[0])),
                                      size=1000, replace=False)

    while True:
        # create evaluator on validation set
        validation_recall = RecallEvaluator(model, train, valid)
        # compute recall on validate set
        valid_recalls = []
        # compute recall in chunks to utilize speedup provided by Tensorflow
        for user_chunk in toolz.partition_all(100, valid_users):
            valid_recalls.extend([validation_recall.eval(sess, user_chunk)])
        print("\nRecall on (sampled) validation set: {}".format(numpy.mean(valid_recalls)))
        # TODO: early stopping based on validation recall

        # train model
        losses = []
        # run n mini-batches
        for _ in tqdm(range(EVALUATION_EVERY_N_BATCHES), desc="Optimizing..."):
            user_pos, neg = sampler.next_batch()
            _, loss = sess.run((model.optimize, model.loss),
                               {model.user_positive_items_pairs: user_pos,
                                model.negative_samples: neg})
            losses.append(loss)
        print("\nTraining loss {}".format(numpy.mean(losses)))