The following 50 code examples, extracted from open-source Python projects, illustrate how to use pyspark.SparkContext().
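Before the project-specific examples, here is a minimal, self-contained sketch of the pattern they all share: build a SparkConf, create a SparkContext from it, run a job, and stop the context. The app name and the local master URL below are illustrative and do not come from any of the projects that follow.

from pyspark import SparkConf, SparkContext

# Illustrative configuration: run locally using all available cores.
conf = SparkConf().setAppName("sparkcontext-demo").setMaster("local[*]")
sc = SparkContext(conf=conf)
try:
    # A trivial job to confirm the context works: sum the numbers 0..99.
    total = sc.parallelize(range(100)).sum()
    print(total)  # 4950
finally:
    # Always stop the context so JVM-side resources are released.
    sc.stop()
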
def create_sc():
    sc_conf = SparkConf()
    sc_conf.setAppName("finance-similarity-app")
    sc_conf.setMaster('spark://10.21.208.21:7077')
    sc_conf.set('spark.executor.memory', '2g')
    sc_conf.set('spark.executor.cores', '4')
    sc_conf.set('spark.cores.max', '40')
    sc_conf.set('spark.logConf', True)
    print sc_conf.getAll()

    sc = None
    try:
        sc.stop()
        sc = SparkContext(conf=sc_conf)
    except:
        sc = SparkContext(conf=sc_conf)

    return sc

def get_rdd(es_index, es_type):
    if es_type is "":
        resource = es_index
    else:
        resource = es_index + "/" + es_type
    es_read_conf = {
        "es.nodes": ES_IP,
        "es.port": ES_PORT,
        "es.resource": resource,
        "es.index.read.missing.as.empty": 'yes'
    }
    conf = SparkConf().setAppName("Unfetter")
    sc = SparkContext(conf=conf)
    rdd = sc.newAPIHadoopRDD(
        inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=es_read_conf)
    return rdd

def main(argv=None):
    args = parse_arguments(argv)
    if args['very_verbose']:
        logging.basicConfig(level=logging.DEBUG)
    elif args['verbose']:
        logging.basicConfig(level=logging.INFO)
    else:
        logging.basicConfig()
    del args['verbose']
    del args['very_verbose']
    sc = SparkContext(appName="MLR: data collection pipeline")
    # spark info logging is incredibly spammy. Use warn to have some hope of
    # human decipherable output
    sc.setLogLevel('WARN')
    sqlContext = HiveContext(sc)
    run_pipeline(sc, sqlContext, **args)

def main():
    conf = SparkConf().setMaster("local[2]").setAppName("Streamer")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 10)  # Create a streaming context with batch interval of 10 sec
    ssc.checkpoint("checkpoint")

    pwords = load_wordlist("positive.txt")
    nwords = load_wordlist("negative.txt")
    ts = time.time()
    counts = stream(ssc, pwords, nwords, 100)
    make_plot(counts)

def _py2java(sc, obj):
    """ Convert Python object into Java """
    if isinstance(obj, RDD):
        obj = _to_java_object_rdd(obj)
    elif isinstance(obj, DataFrame):
        obj = obj._jdf
    elif isinstance(obj, SparkContext):
        obj = obj._jsc
    elif isinstance(obj, list):
        obj = [_py2java(sc, x) for x in obj]
    elif isinstance(obj, JavaObject):
        pass
    elif isinstance(obj, (int, long, float, bool, bytes, unicode)):
        pass
    else:
        data = bytearray(PickleSerializer().dumps(obj))
        obj = sc._jvm.org.apache.spark.ml.python.MLSerDe.loads(data)
    return obj

def main():
    conf = SparkConf().set("spark.ui.showConsoleProgress", "false")
    sc = SparkContext(appName="PythonStatusAPIDemo", conf=conf)

    def run():
        rdd = sc.parallelize(range(10), 10).map(delayed(2))
        reduced = rdd.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
        return reduced.map(delayed(2)).collect()

    result = call_in_background(run)

    status = sc.statusTracker()
    while result.empty():
        ids = status.getJobIdsForGroup()
        for id in ids:
            job = status.getJobInfo(id)
            print("Job", id, "status: ", job.status)
            for sid in job.stageIds:
                info = status.getStageInfo(sid)
                if info:
                    print("Stage %d: %d tasks total (%d active, %d complete)" %
                          (sid, info.numTasks, info.numActiveTasks, info.numCompletedTasks))
        time.sleep(1)

    print("Job results are:", result.get())
    sc.stop()

def sql_context(self, application_name):
    """Create a spark context given the parameters configured in this class.

    The caller is responsible for calling ``.close`` on the resulting spark context

    Parameters
    ----------
    application_name : string

    Returns
    -------
    sc : SparkContext
    """
    sc = self.spark_context(application_name)
    import pyspark
    sqlContext = pyspark.SQLContext(sc)
    return (sc, sqlContext)

def benchmark_spark(ratings, factors, iterations=5):
    conf = (SparkConf()
            .setAppName("implicit_benchmark")
            .setMaster('local[*]')
            .set('spark.driver.memory', '16G'))
    context = SparkContext(conf=conf)
    spark = SparkSession(context)

    times = {}
    try:
        ratings = convert_sparse_to_dataframe(spark, context, ratings)

        for rank in factors:
            als = ALS(rank=rank, maxIter=iterations,
                      alpha=1, implicitPrefs=True,
                      userCol="row", itemCol="col", ratingCol="data")
            start = time.time()
            als.fit(ratings)
            elapsed = time.time() - start
            times[rank] = elapsed / iterations
            print("spark. factors=%i took %.3f" % (rank, elapsed / iterations))
    finally:
        spark.stop()

    return times

def main():
    conf = SparkConf().setMaster("local[2]").setAppName("YelpRecommender")
    sc = SparkContext(conf=conf)
    rdd_data = readElasticSearch(sc)
    parsed_mapped_data = rdd_data.filter(location_recommender)
    sorted_data = parsed_mapped_data.top(150, key=lambda a: a[1]["stars"])
    topn_data = copyUniqueData(sorted_data, 5)
    printResult(topn_data)
    clearElasticSearch()
    sorted_rdd = sc.parallelize(topn_data)
    es_data = sorted_rdd.map(remap_es)
    es_data.saveAsNewAPIHadoopFile(
        path='-',
        outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf={"es.resource": "yelpreco/resturant"})

def run(self):
    self.args = self.parse_arguments()

    conf = SparkConf().setAll((
        ("spark.task.maxFailures", "10"),
        ("spark.locality.wait", "20s"),
        ("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
    ))
    sc = SparkContext(
        appName=self.name,
        conf=conf)
    sqlc = SQLContext(sparkContext=sc)

    self.records_processed = sc.accumulator(0)
    self.warc_input_processed = sc.accumulator(0)
    self.warc_input_failed = sc.accumulator(0)

    self.run_job(sc, sqlc)

    sc.stop()

def generate_user_actions_with_artist(sc):
    '''Add artist_id to data_source/user_actions.csv

    Args:
        sc: pyspark.SparkContext
    '''
    hdfs_file_dir = 'hdfs:/home/ProjectPOP/data_source'
    hdfs_song_path = '%s/mars_tianchi_songs.csv' % (hdfs_file_dir)
    hdfs_action_path = '%s/mars_tianchi_user_actions.csv' % (hdfs_file_dir)

    logger.info('Start generate song_artist_dict')
    song_artist_dict = dict(sc.textFile(hdfs_song_path).map(_generate_song_artist_dict).collect())
    song_artist_dict_broadcast = sc.broadcast(song_artist_dict)

    logger.info('Start process user_actions')
    user_actions = sc.textFile(hdfs_action_path).map(
        lambda l: _add_artist_into_line(l, song_artist_dict_broadcast))
    logger.info(user_actions.take(5))
    user_actions.saveAsTextFile('%s/mars_tianchi_songs_with_artist.csv' % (hdfs_file_dir))
    return True

def main():
    conf = SparkConf().setAppName("binarize nifti")
    sc = SparkContext(conf=conf)
    sc.setLogLevel('ERROR')

    parser = argparse.ArgumentParser(description='Binarize images using FSL installed in a Docker container')
    parser.add_argument('threshold', type=int, help="binarization threshold")
    parser.add_argument('folder_path', type=str, help='folder path containing all of the splits')
    parser.add_argument('output_path', type=str, help='output folder path')

    args = parser.parse_args()
    print args.folder_path
    client = Config().get_client('dev')

    nibRDD = sc.binaryFiles(args.folder_path)\
        .map(lambda x: get_data(x))\
        .map(lambda x: binarize(x, args.threshold))\
        .map(lambda x: copy_to_hdfs(x, args.output_path, client)).collect()

def main():
    input = sys.argv[1]
    output = sys.argv[2]

    conf = SparkConf().setAppName('Sparse Matrix Multiplication')
    sc = SparkContext(conf=conf)
    assert sc.version >= '1.5.1'

    sparseMatrix = sc.textFile(input).map(lambda row: row.split(' ')) \
        .map(createCSRMatrix).map(multiplyMatrix).reduce(operator.add)
    outputFile = open(output, 'w')

    for row in range(len(sparseMatrix.indptr) - 1):
        col = sparseMatrix.indices[sparseMatrix.indptr[row]:sparseMatrix.indptr[row + 1]]
        data = sparseMatrix.data[sparseMatrix.indptr[row]:sparseMatrix.indptr[row + 1]]
        indexValuePairs = zip(col, data)
        formattedOutput = formatOutput(indexValuePairs)
        outputFile.write(formattedOutput + '\n')

def main(in_loc, out_dir):
    logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO)

    sc = ps.SparkContext(appName='Word2Vec')
    logger.info('Distributing input data')
    raw_data = sc.textFile(in_loc).cache()
    data = raw_data.map(lambda line: line.split(' '))
    print(data.getNumPartitions())
    logger.info('Training Word2Vec model')
    model = Word2Vec().setVectorSize(128).setNumIterations(5).fit(data)
    w2v_dict = model.getVectors()
    logger.info('Saving word to vectors dictionary')
    with open(path.join(out_dir, 'w2v_dict.pkl'), 'wb') as f:
        cPickle.dump(w2v_dict, f, cPickle.HIGHEST_PROTOCOL)
    model.save(sc, out_dir)

def main(in_dir, out_dir):
    sc = ps.SparkContext()
    text_files = sc.textFile(in_dir)
    counts = text_files.flatMap(lambda line: line.split(' ')) \
                       .filter(lambda word: any(label in word for label in LABELS)) \
                       .map(lambda word: (word, 1)) \
                       .reduceByKey(add) \
                       .persist(storageLevel=ps.StorageLevel.MEMORY_AND_DISK)
    total_nouns = counts.values() \
                        .reduce(add)
    sorted_nouns = counts.map(lambda (word, count): (word, count / float(total_nouns))) \
                         .sortBy(lambda (word, count): count, ascending=False) \
                         .collect()
    with open(path.join(out_dir, 'sorted_nouns.txt'), 'w+') as f:
        for word in sorted_nouns:
            f.write(str(word) + '\n')

def main(argv=None):
    args = parse_arguments(argv)
    if args['very_verbose']:
        logging.basicConfig(level=logging.DEBUG)
    elif args['verbose']:
        logging.basicConfig(level=logging.INFO)
    else:
        logging.basicConfig()
    del args['verbose']
    del args['very_verbose']

    # TODO: Set spark configuration? Some can't actually be set here though, so best might be
    # to set all of it on the command line for consistency.
    sc = SparkContext(appName="MLR: training pipeline")
    sc.setLogLevel('WARN')
    sqlContext = HiveContext(sc)

    output_dir = args['output_dir']
    if os.path.exists(output_dir):
        logging.error('Output directory (%s) already exists' % (output_dir))
        sys.exit(1)

    # Maybe this is a bit early to create the path ... but should be fine.
    # The annoyance might be that an error in training requires deleting
    # this directory to try again.
    os.mkdir(output_dir)

    try:
        run_pipeline(sc, sqlContext, **args)
    except:  # noqa: E722
        # If the directory we created is still empty delete it
        # so it doesn't need to be manually re-created
        if not len(glob.glob(os.path.join(output_dir, '*'))):
            os.rmdir(output_dir)
        raise

def init_context(self):
    self.base_hostname = socket.gethostname().split(".")[0]
    master_node = 'spark://' + self.base_hostname + ':7077'
    self.context = SparkContext(master_node, 'INFO')

def setUp(self):
    self.sc = SparkContext('local[4]', "MLlib tests")
    self.spark = SparkSession(self.sc)

def load(cls, sc, path):
    """Load the GaussianMixtureModel from disk.

    :param sc: SparkContext.
    :param path: Path to where the model is stored.
    """
    model = cls._load_java(sc, path)
    wrapper = sc._jvm.org.apache.spark.mllib.api.python.GaussianMixtureModelWrapper(model)
    return cls(wrapper)

def load(cls, sc, path):
    """Load the LDAModel from disk.

    :param sc: SparkContext.
    :param path: Path to where the model is stored.
    """
    if not isinstance(sc, SparkContext):
        raise TypeError("sc should be a SparkContext, got type %s" % type(sc))
    if not isinstance(path, basestring):
        raise TypeError("path should be a basestring, got type %s" % type(path))
    model = callMLlibFunc("loadLDAModel", sc, path)
    return LDAModel(model)

def _test():
    import doctest
    import pyspark.mllib.clustering
    globs = pyspark.mllib.clustering.__dict__.copy()
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)

def setUp(self):
    self.sc = SparkContext('local[4]', "MLlib tests")
    self.ssc = StreamingContext(self.sc, 1.0)

def save(self, sc, path):
    """Save this model to the given path."""
    if not isinstance(sc, SparkContext):
        raise TypeError("sc should be a SparkContext, got type %s" % type(sc))
    if not isinstance(path, basestring):
        raise TypeError("path should be a basestring, got type %s" % type(path))
    self._java_model.save(sc._jsc.sc(), path)

def _test():
    import doctest
    import pyspark.mllib.recommendation
    from pyspark.sql import SQLContext
    globs = pyspark.mllib.recommendation.__dict__.copy()
    sc = SparkContext('local[4]', 'PythonTest')
    globs['sc'] = sc
    globs['sqlContext'] = SQLContext(sc)
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)

def createContext(host, port, outputPath):
    # If you do not see this printed, that means the StreamingContext has been loaded
    # from the new checkpoint
    print("Creating new context")
    if os.path.exists(outputPath):
        os.remove(outputPath)
    sc = SparkContext(appName="PythonStreamingRecoverableNetworkWordCount")
    ssc = StreamingContext(sc, 1)

    # Create a socket stream on target ip:port and count the
    # words in input stream of \n delimited text (eg. generated by 'nc')
    lines = ssc.socketTextStream(host, port)
    words = lines.flatMap(lambda line: line.split(" "))
    wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)

    def echo(time, rdd):
        # Get or register the blacklist Broadcast
        blacklist = getWordBlacklist(rdd.context)
        # Get or register the droppedWordsCounter Accumulator
        droppedWordsCounter = getDroppedWordsCounter(rdd.context)

        # Use blacklist to drop words and use droppedWordsCounter to count them
        def filterFunc(wordCount):
            if wordCount[0] in blacklist.value:
                droppedWordsCounter.add(wordCount[1])
                return False
            else:
                return True

        counts = "Counts at time %s %s" % (time, rdd.filter(filterFunc).collect())
        print(counts)
        print("Dropped %d word(s) totally" % droppedWordsCounter.value)
        print("Appending to " + os.path.abspath(outputPath))
        with open(outputPath, 'a') as f:
            f.write(counts + "\n")

    wordCounts.foreachRDD(echo)
    return ssc

def __init__(self, name, broker, source_topic, destination_topic):
    sc = SparkContext("local[2]", name)
    sc.setLogLevel('ERROR')
    self.ssc = StreamingContext(sc, 5)

    directKafkaStream = KafkaUtils.createDirectStream(
        self.ssc,
        [source_topic],
        {'metadata.broker.list': broker}
    )
    producer = Producer(broker, destination_topic)
    process_stream(directKafkaStream, producer)

def main():
    sc = SparkContext('local', 'practicas_spark')
    pr = definir_path_resultados('./resultados')
    ejercicio_0(sc, pr)
    ejercicio_1(sc, pr)
    ejercicio_2(sc, pr)
    ejercicio_3(sc, pr)
    ejercicio_4(sc, pr)
    ejercicio_5(sc, pr)
    ejercicio_6(sc, pr)
    ejercicio_7(sc, pr)

def _train_fp_growth_model(cls, data_store, eco_to_package_topic_dict, min_support_count,
                           additional_path, fp_num_partition):
    sc = SparkContext()
    manifest_file_list = data_store.list_files(
        prefix=os.path.join(additional_path, gnosis_constants.MANIFEST_FILEPATH))
    list_of_topic_list = list()
    for manifest_file in manifest_file_list:
        eco_to_package_list_json_array = data_store.read_json_file(manifest_file)
        for eco_to_package_list_json in eco_to_package_list_json_array:
            ecosystem = eco_to_package_list_json.get(gnosis_constants.MANIFEST_ECOSYSTEM)
            list_of_package_list = eco_to_package_list_json.get(
                gnosis_constants.MANIFEST_PACKAGE_LIST)
            for package_list in list_of_package_list:
                package_list_lowercase = [x.lower() for x in package_list]
                topic_list = cls.get_topic_list_for_package_list(package_list_lowercase,
                                                                 ecosystem,
                                                                 eco_to_package_topic_dict)
                list_of_topic_list.append(topic_list)

    transactions = sc.parallelize(list_of_topic_list)
    transactions.cache()

    min_support = float(min_support_count / float(transactions.count()))
    model = FPGrowth.train(transactions, minSupport=min_support,
                           numPartitions=fp_num_partition)
    return model

def spark_context(self, application_name):
    """Create a spark context given the parameters configured in this class.

    The caller is responsible for calling ``.close`` on the resulting spark context

    Parameters
    ----------
    application_name : string

    Returns
    -------
    sc : SparkContext
    """

    # initialize the spark configuration
    self._init_spark()
    import pyspark
    import pyspark.sql

    # initialize conf
    spark_conf = pyspark.SparkConf()
    for k, v in self._spark_conf_helper._conf_dict.items():
        spark_conf.set(k, v)

    log.info("Starting SparkContext")
    return pyspark.SparkContext(appName=application_name, conf=spark_conf)

def with_spark_context(application_name, conf=None):
    """Context manager for a spark context

    Parameters
    ----------
    application_name : string
    conf : string, optional

    Returns
    -------
    sc : SparkContext

    Examples
    --------
    Used within a context manager
    >>> with with_spark_context("MyApplication") as sc:
    ...     # Your Code here
    ...     pass
    """
    if conf is None:
        conf = default_configuration
    assert isinstance(conf, SparkConfiguration)

    sc = conf.spark_context(application_name)
    try:
        yield sc
    finally:
        sc.stop()

def with_sql_context(application_name, conf=None):
    """Context manager for a spark context

    Returns
    -------
    sc : SparkContext
    sql_context: SQLContext

    Examples
    --------
    Used within a context manager
    >>> with with_sql_context("MyApplication") as (sc, sql_context):
    ...     import pyspark
    ...     # Do stuff
    ...     pass
    """
    if conf is None:
        conf = default_configuration
    assert isinstance(conf, SparkConfiguration)

    sc = conf.spark_context(application_name)
    import pyspark.sql
    try:
        yield sc, pyspark.sql.SQLContext(sc)
    finally:
        sc.stop()

def main():
    # parameters
    num_features = 400  # vocabulary size

    # load data
    print "loading 20 newsgroups dataset..."
    categories = ['rec.autos', 'rec.sport.hockey', 'comp.graphics', 'sci.space']
    tic = time()
    dataset = fetch_20newsgroups(shuffle=True, random_state=0, categories=categories,
                                 remove=('headers', 'footers', 'quotes'))
    train_corpus = dataset.data  # a list of 11314 documents / entries
    train_labels = dataset.target
    toc = time()
    print "elapsed time: %.4f sec" % (toc - tic)

    # tf-idf vectorizer
    tfidf = TfidfVectorizer(max_df=0.5, max_features=num_features,
                            min_df=2, stop_words='english', use_idf=True)
    X_tfidf = tfidf.fit_transform(train_corpus).toarray()

    # append document labels
    train_labels = train_labels.reshape(-1, 1)
    X_all = np.hstack([train_labels, X_tfidf])

    # distribute the data
    sc = SparkContext('local', 'log_reg')
    rdd = sc.parallelize(X_all)
    labeled_corpus = rdd.map(parse_doc)
    train_RDD, test_RDD = labeled_corpus.randomSplit([8, 2], seed=0)

    # distributed logistic regression
    print "training logistic regression..."
    model = LogisticRegressionWithLBFGS.train(train_RDD, regParam=1, regType='l1',
                                              numClasses=len(categories))

    # evaluate the model on test data
    labels_and_preds = test_RDD.map(lambda p: (p.label, model.predict(p.features)))
    test_err = labels_and_preds.filter(lambda (v, p): v != p).count() / float(test_RDD.count())
    print "log-reg test error: ", test_err

    # model.save(sc, './log_reg_lbfgs_model')

def main():
    conf = SparkConf().setMaster("local[2]").setAppName("YelpConsumer")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 10)

    kstream = KafkaUtils.createDirectStream(ssc, topics=['yelp-stream'],
                                            kafkaParams={"metadata.broker.list": 'localhost:9092'})

    parsed_json = kstream.map(lambda (k, v): json.loads(v))
    remapped_data = parsed_json.map(remap_elastic)
    remapped_data.foreachRDD(writeElasticSearch)

    ssc.start()
    ssc.awaitTermination()

def setup_env(cls):
    cls.sc = SparkContext('local[*]', cls.__name__)
    cls.sql = SQLContext(cls.sc)
    cls.session = SparkSession.builder.getOrCreate()

def _count_child(job, masterHostname):
    # noinspection PyUnresolvedReferences
    from pyspark import SparkContext

    # start spark context and connect to cluster
    sc = SparkContext(master='spark://%s:7077' % masterHostname, appName='count_test')

    # create an rdd containing 0-9999 split across 10 partitions
    rdd = sc.parallelize(xrange(10000), 10)

    # and now, count it
    assert rdd.count() == 10000

def setup_backend():
    global backend

    import pyspark
    sc = pyspark.SparkContext()
    from abcpy.backends import BackendSpark as Backend
    backend = Backend(sc, parallelism=4)

def _default_sparkconf_builder():
    """
    Build a SparkConf object that can be used for the worker's SparkContext.
    """
    from pyspark import SparkConf
    return SparkConf().setAppName('SparkCeleryTask') \
        .set('spark.dynamicAllocation.minExecutors', 1) \
        .set('spark.dynamicAllocation.executorIdleTimeout', 60) \
        .set('spark.dynamicAllocation.cachedExecutorIdleTimeout', 3600)

def worker_init(self, loader):
    """
    Initialize Spark config and context now.
    """
    from pyspark import SparkContext
    from pyspark.sql import SparkSession
    sparkconf_builder = self.sparkconf_builder or _default_sparkconf_builder
    self.spark_conf = sparkconf_builder()
    self.sc = SparkContext(conf=self.spark_conf)
    self.spark = SparkSession.builder.config(conf=self.spark_conf).getOrCreate()

def __init__(self, _config):
    self._links = None
    self._sources = None
    self._orchestrator = None
    self.set_links(config.instantiate_components(_config))

    def restart_spark():
        self._ssc = streamingctx.create_streaming_context(
            self._sc,
            _config)

    self._restart_spark = restart_spark
    self._sc = pyspark.SparkContext(
        appName=_config["spark_config"]["appName"])
    self._ssc = streamingctx.create_streaming_context(self._sc, _config)

def init_spark_context():
    # load spark context
    conf = SparkConf().setAppName("movie_recommendation-server")
    # IMPORTANT: pass additional Python modules to each worker
    # sc = SparkContext(conf=conf, pyFiles=['rec_engine.py', 'app.py'])
    sc = SparkContext(conf=conf)
    return sc

def main():
    if len(sys.argv) != 4:
        print(USAGE)
        exit(-1)

    sc = SparkContext(appName="Realtime-Analytics-Engine")
    ssc = StreamingContext(sc, TIMER)

    zookeeper, in_topic, out_topic = sys.argv[1:]
    kvs = KafkaUtils.createStream(ssc, zookeeper, "analytics-engine-consumer", {in_topic: 1})
    aggRDD = aggregate(kvs)
    aggRDD.foreachRDD(lambda rec: publish(rec, out_topic))

    ssc.start()
    ssc.awaitTermination()

def get_logger(self, spark_context=None):
    """Get logger from SparkContext or (if None) from logging module"""
    if spark_context is None:
        return logging.getLogger(self.name)
    return spark_context._jvm.org.apache.log4j.LogManager \
        .getLogger(self.name)

def _test():
    import doctest
    import pyspark.mllib.fpm
    globs = pyspark.mllib.fpm.__dict__.copy()
    globs['sc'] = SparkContext('local[4]', 'PythonTest')
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)

def _test():
    import doctest
    from pyspark import SparkContext
    import pyspark.mllib.evaluation
    globs = pyspark.mllib.evaluation.__dict__.copy()
    globs['sc'] = SparkContext('local[4]', 'PythonTest')
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)

def load(cls, sc, path):
    """Load the GaussianMixtureModel from disk.

    :param sc: SparkContext
    :param path: str, path to where the model is stored.
    """
    model = cls._load_java(sc, path)
    wrapper = sc._jvm.GaussianMixtureModelWrapper(model)
    return cls(wrapper)

def load(cls, sc, path):
    """Load the LDAModel from disk.

    :param sc: SparkContext
    :param path: str, path to where the model is stored.
    """
    if not isinstance(sc, SparkContext):
        raise TypeError("sc should be a SparkContext, got type %s" % type(sc))
    if not isinstance(path, basestring):
        raise TypeError("path should be a basestring, got type %s" % type(path))
    model = callMLlibFunc("loadLDAModel", sc, path)
    return LDAModel(model)

def _test():
    import doctest
    from pyspark import SparkContext
    import pyspark.mllib.classification
    globs = pyspark.mllib.classification.__dict__.copy()
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)

def _test():
    import doctest
    globs = globals().copy()
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)

def _test():
    import doctest
    from pyspark import SparkContext
    globs = globals().copy()
    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
    globs['sc'].stop()
    if failure_count:
        exit(-1)