The following 34 code examples, extracted from open-source Python projects, illustrate how to use pyspark.SparkConf().
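Before the project examples, here is a minimal sketch of the pattern most of them share: build a SparkConf, give it an application name, a master URL, and any number of spark.* properties, then pass it to a SparkContext. The app name, master URL, and property value below are illustrative placeholders, not settings taken from any of the projects.

from pyspark import SparkConf, SparkContext

# Assemble the configuration; keys are standard spark.* property names.
conf = (SparkConf()
        .setAppName("sparkconf-demo")         # placeholder app name
        .setMaster("local[2]")                # placeholder master URL (spark://, yarn, mesos://, or local[N])
        .set("spark.executor.memory", "1g"))  # arbitrary example property

sc = SparkContext(conf=conf)
print(sc.getConf().toDebugString())           # inspect the effective configuration
sc.stop()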
def create_sc():
    sc_conf = SparkConf()
    sc_conf.setAppName("finance-similarity-app")
    sc_conf.setMaster('spark://10.21.208.21:7077')
    sc_conf.set('spark.executor.memory', '2g')
    sc_conf.set('spark.executor.cores', '4')
    sc_conf.set('spark.cores.max', '40')
    sc_conf.set('spark.logConf', True)
    print sc_conf.getAll()

    sc = None
    try:
        sc.stop()
        sc = SparkContext(conf=sc_conf)
    except:
        sc = SparkContext(conf=sc_conf)

    return sc
def get_rdd(es_index, es_type):
    if es_type == "":
        resource = es_index
    else:
        resource = es_index + "/" + es_type
    es_read_conf = {
        "es.nodes": ES_IP,
        "es.port": ES_PORT,
        "es.resource": resource,
        "es.index.read.missing.as.empty": 'yes'
    }
    conf = SparkConf().setAppName("Unfetter")
    sc = SparkContext(conf=conf)
    rdd = sc.newAPIHadoopRDD(
        inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=es_read_conf)
    return rdd
def main():
    conf = SparkConf().setMaster("local[2]").setAppName("Streamer")
    sc = SparkContext(conf=conf)

    # Create a streaming context with batch interval of 10 sec
    ssc = StreamingContext(sc, 10)
    ssc.checkpoint("checkpoint")

    pwords = load_wordlist("positive.txt")
    nwords = load_wordlist("negative.txt")

    ts = time.time()
    counts = stream(ssc, pwords, nwords, 100)
    make_plot(counts)
def getOrCreate(cls, checkpointPath, setupFunc):
    """
    Either recreate a StreamingContext from checkpoint data or create a new StreamingContext.
    If checkpoint data exists in the provided `checkpointPath`, then StreamingContext will be
    recreated from the checkpoint data. If the data does not exist, then the provided setupFunc
    will be used to create a new context.

    @param checkpointPath: Checkpoint directory used in an earlier streaming program
    @param setupFunc:      Function to create a new context and setup DStreams
    """
    cls._ensure_initialized()
    gw = SparkContext._gateway

    # Check whether valid checkpoint information exists in the given path
    ssc_option = gw.jvm.StreamingContextPythonHelper().tryRecoverFromCheckpoint(checkpointPath)
    if ssc_option.isEmpty():
        ssc = setupFunc()
        ssc.checkpoint(checkpointPath)
        return ssc

    jssc = gw.jvm.JavaStreamingContext(ssc_option.get())

    # If there is already an active instance of Python SparkContext use it, or create a new one
    if not SparkContext._active_spark_context:
        jsc = jssc.sparkContext()
        conf = SparkConf(_jconf=jsc.getConf())
        SparkContext(conf=conf, gateway=gw, jsc=jsc)

    sc = SparkContext._active_spark_context

    # update ctx in serializer
    cls._transformerSerializer.ctx = sc
    return StreamingContext(sc, None, jssc)
def main():
    conf = SparkConf().set("spark.ui.showConsoleProgress", "false")
    sc = SparkContext(appName="PythonStatusAPIDemo", conf=conf)

    def run():
        rdd = sc.parallelize(range(10), 10).map(delayed(2))
        reduced = rdd.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
        return reduced.map(delayed(2)).collect()

    result = call_in_background(run)

    status = sc.statusTracker()
    while result.empty():
        ids = status.getJobIdsForGroup()
        for id in ids:
            job = status.getJobInfo(id)
            print("Job", id, "status: ", job.status)
            for sid in job.stageIds:
                info = status.getStageInfo(sid)
                if info:
                    print("Stage %d: %d tasks total (%d active, %d complete)" %
                          (sid, info.numTasks, info.numActiveTasks, info.numCompletedTasks))
        time.sleep(1)

    print("Job results are:", result.get())
    sc.stop()
def benchmark_spark(ratings, factors, iterations=5):
    conf = (SparkConf()
            .setAppName("implicit_benchmark")
            .setMaster('local[*]')
            .set('spark.driver.memory', '16G'))
    context = SparkContext(conf=conf)
    spark = SparkSession(context)

    times = {}
    try:
        ratings = convert_sparse_to_dataframe(spark, context, ratings)

        for rank in factors:
            als = ALS(rank=rank, maxIter=iterations,
                      alpha=1, implicitPrefs=True,
                      userCol="row", itemCol="col", ratingCol="data")
            start = time.time()
            als.fit(ratings)
            elapsed = time.time() - start
            times[rank] = elapsed / iterations
            print("spark. factors=%i took %.3f" % (rank, elapsed / iterations))
    finally:
        spark.stop()

    return times
def main():
    conf = SparkConf().setMaster("local[2]").setAppName("YelpRecommender")
    sc = SparkContext(conf=conf)

    rdd_data = readElasticSearch(sc)
    parsed_mapped_data = rdd_data.filter(location_recommender)
    sorted_data = parsed_mapped_data.top(150, key=lambda a: a[1]["stars"])
    topn_data = copyUniqueData(sorted_data, 5)
    printResult(topn_data)
    clearElasticSearch()

    sorted_rdd = sc.parallelize(topn_data)
    es_data = sorted_rdd.map(remap_es)
    es_data.saveAsNewAPIHadoopFile(
        path='-',
        outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf={"es.resource": "yelpreco/resturant"})
def run(self):
    self.args = self.parse_arguments()

    conf = SparkConf().setAll((
        ("spark.task.maxFailures", "10"),
        ("spark.locality.wait", "20s"),
        ("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
    ))
    sc = SparkContext(
        appName=self.name,
        conf=conf)
    sqlc = SQLContext(sparkContext=sc)

    self.records_processed = sc.accumulator(0)
    self.warc_input_processed = sc.accumulator(0)
    self.warc_input_failed = sc.accumulator(0)

    self.run_job(sc, sqlc)

    sc.stop()
def main():
    conf = SparkConf().setAppName("binarize nifti")
    sc = SparkContext(conf=conf)
    sc.setLogLevel('ERROR')

    parser = argparse.ArgumentParser(description='Binarize images using FSL installed in a Docker container')
    parser.add_argument('threshold', type=int, help="binarization threshold")
    parser.add_argument('folder_path', type=str, help='folder path containing all of the splits')
    parser.add_argument('output_path', type=str, help='output folder path')

    args = parser.parse_args()
    print args.folder_path

    client = Config().get_client('dev')

    nibRDD = sc.binaryFiles(args.folder_path)\
        .map(lambda x: get_data(x))\
        .map(lambda x: binarize(x, args.threshold))\
        .map(lambda x: copy_to_hdfs(x, args.output_path, client)).collect()
def main():
    input = sys.argv[1]
    output = sys.argv[2]

    conf = SparkConf().setAppName('Sparse Matrix Multiplication')
    sc = SparkContext(conf=conf)
    assert sc.version >= '1.5.1'

    sparseMatrix = sc.textFile(input).map(lambda row: row.split(' ')) \
        .map(createCSRMatrix).map(multiplyMatrix).reduce(operator.add)
    outputFile = open(output, 'w')

    for row in range(len(sparseMatrix.indptr) - 1):
        col = sparseMatrix.indices[sparseMatrix.indptr[row]:sparseMatrix.indptr[row + 1]]
        data = sparseMatrix.data[sparseMatrix.indptr[row]:sparseMatrix.indptr[row + 1]]
        indexValuePairs = zip(col, data)
        formattedOutput = formatOutput(indexValuePairs)
        outputFile.write(formattedOutput + '\n')
def setUp(self):
    # Create a local Spark context with 4 cores
    spark_conf = SparkConf().setMaster('local[4]').\
        setAppName("monasca-transform unit tests").\
        set("spark.sql.shuffle.partitions", "10")
    self.spark_context = SparkContext.getOrCreate(conf=spark_conf)

    # quiet logging
    logger = self.spark_context._jvm.org.apache.log4j
    logger.LogManager.getLogger("org").setLevel(logger.Level.WARN)
    logger.LogManager.getLogger("akka").setLevel(logger.Level.WARN)
def spark_context(self, application_name):
    """Create a spark context given the parameters configured in this class.

    The caller is responsible for calling ``.close`` on the resulting spark context

    Parameters
    ----------
    application_name : string

    Returns
    -------
    sc : SparkContext
    """
    # initialize the spark configuration
    self._init_spark()
    import pyspark
    import pyspark.sql

    # initialize conf
    spark_conf = pyspark.SparkConf()
    for k, v in self._spark_conf_helper._conf_dict.items():
        spark_conf.set(k, v)

    log.info("Starting SparkContext")
    return pyspark.SparkContext(appName=application_name, conf=spark_conf)
def main():
    conf = SparkConf().setMaster("local[2]").setAppName("YelpConsumer")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 10)

    kstream = KafkaUtils.createDirectStream(ssc, topics=['yelp-stream'],
                                            kafkaParams={"metadata.broker.list": 'localhost:9092'})

    parsed_json = kstream.map(lambda (k, v): json.loads(v))
    remapped_data = parsed_json.map(remap_elastic)
    remapped_data.foreachRDD(writeElasticSearch)

    ssc.start()
    ssc.awaitTermination()
def _default_sparkconf_builder():
    """
    Build a SparkConf object that can be used for the worker's SparkContext.
    """
    from pyspark import SparkConf
    return SparkConf().setAppName('SparkCeleryTask') \
        .set('spark.dynamicAllocation.minExecutors', 1) \
        .set('spark.dynamicAllocation.executorIdleTimeout', 60) \
        .set('spark.dynamicAllocation.cachedExecutorIdleTimeout', 3600)
def sparkconfig_builder():
    from pyspark import SparkConf
    return SparkConf().setAppName('SparkCeleryTask') \
        .set('spark.dynamicAllocation.enabled', 'true') \
        .set('spark.dynamicAllocation.schedulerBacklogTimeout', 1) \
        .set('spark.dynamicAllocation.minExecutors', 1) \
        .set('spark.dynamicAllocation.executorIdleTimeout', 20) \
        .set('spark.dynamicAllocation.cachedExecutorIdleTimeout', 60)
def init_spark_context():
    # load spark context
    conf = SparkConf().setAppName("movie_recommendation-server")
    # IMPORTANT: pass additional Python modules to each worker
    # sc = SparkContext(conf=conf, pyFiles=['rec_engine.py', 'app.py'])
    sc = SparkContext(conf=conf)

    return sc
def __init__(self):
    conf = SparkConf().setAppName("ntu-speech").setMaster("local")
    self.sc = SparkContext(conf=conf)
    self.sqlCtx = SQLContext(self.sc)
def getSparkConf(mode="mesos", node=0):
    """
    get the spark configuration according to the setting
    :param mode:
    :param node:
    :return:
    """
    global options

    ''' get spark configuration '''
    sconf = SparkConf()

    ''' set spark configuration '''
    sconf.setAppName("%s" % (str(options)))

    # set run mode, now only support spark standalone and mesos coarse mode
    if (mode == 'spark'):
        sconf.setMaster(Setting.SPARK_STANDALONE_URL)
    elif (mode == 'mesos'):
        sconf.setMaster(Setting.MESOS_COARSE_URL)
        # sconf.set("spark.mesos.coarse", "false")
        sconf.set("spark.mesos.coarse", "true")
        sconf.set("spark.mesos.executor.home", Setting.SPARK_HOME)
    else:
        print("****unknown mode")
        exit(0)

    # set core limit if needed
    if (0 >= node):
        print "****Spark:no cores max"
    else:
        sconf.set("spark.cores.max", "%d" % (options.cpu * node))

    return sconf
def main():
    conf = SparkConf().setAppName("binarize nifti")
    sc = SparkContext(conf=conf)
    sc.setLogLevel('ERROR')

    parser = argparse.ArgumentParser(description='Binarize images')
    parser.add_argument('threshold', type=int, help="binarization threshold")
    parser.add_argument('folder_path', type=str, help='folder path containing all of the splits')
    parser.add_argument('output_path', type=str, help='output folder path')
    parser.add_argument('num', type=int, choices=[2, 4, 6, 8], help='number of binarization operations')
    parser.add_argument('-m', '--in_memory', type=bool, default=True, help='in memory computation')

    args = parser.parse_args()

    nibRDD = sc.binaryFiles(args.folder_path)\
        .map(lambda x: get_data(x))

    client = Config().get_client('dev')

    if args.in_memory == 'True':
        print "Performing in-memory computations"

        for i in xrange(args.num - 1):
            nibRDD = nibRDD.map(lambda x: binarize(x, args.threshold))
        nibRDD = nibRDD.map(lambda x: binarize_and_save(x, args.threshold, args.output_path, client)).collect()

    else:
        print "Writing intermediary results to disk and loading from disk"

        binRDD = nibRDD.map(lambda x: binarize_and_save(x, args.threshold, args.output_path + "1", client)).collect()

        for i in xrange(args.num - 1):
            binRDD = sc.binaryFiles(args.output_path + "1")\
                .map(lambda x: get_data(x))\
                .map(lambda x: binarize_and_save(x, args.threshold, args.output_path + "1", client)).collect()
def main():
    # Arguments parsing
    parser = argparse.ArgumentParser()

    # Required inputs
    parser.add_argument("bids_app_boutiques_descriptor",
                        help="Boutiques descriptor of the BIDS App that will process the dataset.")
    parser.add_argument("bids_dataset", help="BIDS dataset to be processed.")
    parser.add_argument("output_dir", help="Output directory.")

    # Optional inputs
    parser.add_argument("--skip-participant-analysis", action='store_true',
                        help="Skips participant analysis.")
    parser.add_argument("--skip-group-analysis", action='store_true',
                        help="Skips groups analysis.")
    parser.add_argument("--skip-participants", metavar="FILE",
                        type=lambda x: is_valid_file(parser, x),
                        help="Skips participant labels in the text file.")
    parser.add_argument("--hdfs", action='store_true',
                        help="Passes data by value rather than by reference in the pipeline. Use it with HDFS only. Requires HDFS to be started.")
    args = parser.parse_args()

    spark_bids = SparkBIDS(args.bids_app_boutiques_descriptor,
                           args.bids_dataset,
                           args.output_dir,
                           {'use_hdfs': args.hdfs,
                            'skip_participant_analysis': args.skip_participant_analysis,
                            'skip_group_analysis': args.skip_group_analysis,
                            'skip_participants_file': args.skip_participants})

    sc = None
    if spark_bids.spark_required():
        # Spark initialization
        conf = SparkConf().setAppName("BIDS pipeline")
        sc = SparkContext(conf=conf)

    # Run!
    spark_bids.run(sc)

# Execute program
def main():
    conf = SparkConf().setMaster("local[2]").setAppName("Streamer")
    sc = SparkContext(conf=conf)

    # Creating a streaming context with batch interval of 10 sec
    ssc = StreamingContext(sc, 10)
    ssc.checkpoint("checkpoint")

    pwords = load_wordlist("./Dataset/positive.txt")
    nwords = load_wordlist("./Dataset/negative.txt")

    counts = stream(ssc, pwords, nwords, 100)
    make_plot(counts)
def __init__(self, additional_options=None):
    os.environ['PYSPARK_PYTHON'] = sys.executable

    submit_args = [
        self._setup_repositories(),
        self._setup_packages(),
        self._setup_jars(),
        'pyspark-shell',
    ]
    os.environ['PYSPARK_SUBMIT_ARGS'] = ' '.join(filter(None, submit_args))

    def _create_spark_context():
        spark_conf = SparkConf()
        spark_conf.set('spark.sql.catalogImplementation', 'hive')
        spark_conf.setAll(self._setup_options(additional_options))
        return SparkContext(conf=spark_conf)

    # If we are in instant testing mode
    if InstantTesting.is_activated():
        spark_context = InstantTesting.get_context()

        # It's the first run, so we have to create context and daemonise the process.
        if spark_context is None:
            spark_context = _create_spark_context()
            if os.fork() == 0:  # Detached process.
                signal.pause()
            else:
                InstantTesting.set_context(spark_context)
    else:
        spark_context = _create_spark_context()

    # Init HiveContext
    super(SparklySession, self).__init__(spark_context)
    self._setup_udfs()

    self.read_ext = SparklyReader(self)
    self.catalog_ext = SparklyCatalog(self)

    attach_writer_to_dataframe()
def setUp(self):
    super(TestSparklySession, self).setUp()
    self.spark_conf_mock = mock.Mock(spec=SparkConf)
    self.spark_context_mock = mock.Mock(spec=SparkContext)

    self.patches = [
        mock.patch('sparkly.session.SparkConf', self.spark_conf_mock),
        mock.patch('sparkly.session.SparkContext', self.spark_context_mock),
    ]
    [p.start() for p in self.patches]
def setUpClass(cls):
    master = os.getenv('MASTER')
    assert master is not None, "Please start a Spark standalone cluster and export MASTER to your env."

    num_workers = os.getenv('SPARK_WORKER_INSTANCES')
    assert num_workers is not None, "Please export SPARK_WORKER_INSTANCES to your env."
    cls.num_workers = int(num_workers)

    spark_jars = os.getenv('SPARK_CLASSPATH')
    assert spark_jars and 'tensorflow-hadoop' in spark_jars, "Please add path to tensorflow-hadoop-*.jar to SPARK_CLASSPATH."

    cls.conf = SparkConf().set('spark.jars', spark_jars)
    cls.sc = SparkContext(master, cls.__name__, conf=cls.conf)
    cls.spark = SparkSession.builder.getOrCreate()
def init_spark_context():
    # load spark context
    conf = SparkConf().setAppName("movie_recommendation-server")
    # IMPORTANT: pass additional Python modules to each worker
    sc = SparkContext(conf=conf, pyFiles=['engine.py', 'app.py'])

    return sc
def main():
    input = sys.argv[1]
    output = sys.argv[2]

    conf = SparkConf().setAppName('Matrix Multiplication')
    sc = SparkContext(conf=conf)
    assert sc.version >= '1.5.1'

    row = sc.textFile(input).map(lambda row: row.split(' ')).cache()
    ncol = len(row.take(1)[0])
    intermediateResult = row.map(permutation).reduce(add_tuples)

    outputFile = open(output, 'w')

    result = [intermediateResult[x:x + 3] for x in range(0, len(intermediateResult), ncol)]
    for row in result:
        for element in row:
            outputFile.write(str(element) + ' ')
        outputFile.write('\n')
    outputFile.close()

    # outputResult = sc.parallelize(result).coalesce(1)
    # outputResult.saveAsTextFile(output)
def _create_sql_context(self):
    """
    Create a new SQL context within a new Spark context.

    Import of classes from pyspark has to be pushed down into this method as Spark needs
    to be available in order for the libraries to be imported successfully. Since Spark
    is not available when the ETL is started initially, we delay the import until the ETL
    has restarted under Spark.

    Side-effect: Logging is configured by the time that pyspark is loaded so we have some
    better control over filters and formatting.
    """
    from pyspark import SparkConf, SparkContext, SQLContext

    if "SPARK_ENV_LOADED" not in os.environ:
        self.logger.warning("SPARK_ENV_LOADED is not set")

    self.logger.info("Starting SparkSQL context")
    conf = (SparkConf()
            .setAppName(__name__)
            .set("spark.logConf", "true"))
    sc = SparkContext(conf=conf)

    # Copy the credentials from the session into hadoop for access to S3
    session = boto3.Session()
    credentials = session.get_credentials()
    hadoopConf = sc._jsc.hadoopConfiguration()
    hadoopConf.set("fs.s3a.access.key", credentials.access_key)
    hadoopConf.set("fs.s3a.secret.key", credentials.secret_key)

    return SQLContext(sc)
def spark_session_create(self, app_name):
    """
    Spark loader class created for the purpose of handling Spark jobs.
    """
    tfmsa_logger("Spark Session Created")

    conf = SparkConf()
    conf.setMaster('spark://{0}'.format(settings.SPARK_HOST))
    conf.setAppName(app_name)
    conf.set('spark.driver.cores', settings.SPARK_CORE)
    conf.set('spark.driver.memory', settings.SPARK_MEMORY)
    conf.set('spark.executor.cores', settings.SPARK_WORKER_CORE)
    conf.set('spark.executor.memory', settings.SPARK_WORKER_MEMORY)
    conf.set('spark.driver.allowMultipleContexts', "true")
    return SparkContext(conf=conf)
def create_session(self):
    """
    Spark loader class created for the purpose of handling Spark jobs.
    """
    try:
        tfmsa_logger("Spark Session Created")

        # #tfmsa_logger("spark_context : {0}".format(spark_context))
        # if (isinstance(spark_context, (SparkContext))):
        #     return spark_context

        conf = SparkConf()
        conf.setMaster('spark://{0}'.format(settings.SPARK_HOST))
        conf.setAppName("tfmsa_session_manager")
        conf.set('spark.driver.cores', settings.SPARK_CORE)
        conf.set('spark.driver.memory', settings.SPARK_MEMORY)
        conf.set('spark.executor.cores', settings.SPARK_WORKER_CORE)
        conf.set('spark.executor.memory', settings.SPARK_WORKER_MEMORY)
        #conf.set('spark.driver.allowMultipleContexts', "true")
        SparkSession.spark_context = SparkContext(conf=conf)
        return spark_context
    except Exception as e:
        # tfmsa_logger(e)
        # raise Exception(e)
        return spark_context
def init_spark_context():
    # load spark context
    conf = SparkConf().setAppName("movie_recommendation-server")
    # IMPORTANT: pass additional Python modules to each worker
    sc = SparkContext(conf=conf, pyFiles=['engine.py', 'app.py', 'util.py'])

    return sc
def invoke():
    # object to keep track of offsets
    ConfigInitializer.basic_config()

    # app name
    application_name = "mon_metrics_kafka"

    my_spark_conf = SparkConf().setAppName(application_name)
    spark_context = SparkContext(conf=my_spark_conf)

    # read at the configured interval
    spark_streaming_context = \
        StreamingContext(spark_context, cfg.CONF.service.stream_interval)

    kafka_stream = MonMetricsKafkaProcessor.get_kafka_stream(
        cfg.CONF.messaging.topic,
        spark_streaming_context)

    # transform to recordstore
    MonMetricsKafkaProcessor.transform_to_recordstore(kafka_stream)

    # catch interrupt, stop streaming context gracefully
    # signal.signal(signal.SIGINT, signal_handler)

    # start processing
    spark_streaming_context.start()

    # FIXME: stop spark context to relinquish resources
    # FIXME: specify cores, so as not to use all the resources on the cluster.
    # FIXME: HA deploy multiple masters, may be one on each control node

    try:
        # Wait for the Spark driver to "finish"
        spark_streaming_context.awaitTermination()
    except Exception as e:
        MonMetricsKafkaProcessor.log_debug(
            "Exception raised during Spark execution : " + str(e))
        # One exception that can occur here is the result of the saved
        # kafka offsets being obsolete/out of range. Delete the saved
        # offsets to improve the chance of success on the next execution.

        # TODO(someone) prevent deleting all offsets for an application,
        # but just the latest revision
        MonMetricsKafkaProcessor.log_debug(
            "Deleting saved offsets for chance of success on next execution")

        MonMetricsKafkaProcessor.reset_kafka_offsets(application_name)

        # delete pre hourly processor offsets
        if cfg.CONF.stage_processors.pre_hourly_processor_enabled:
            PreHourlyProcessor.reset_kafka_offsets()
def train(sc=None, user=None, name='spark_mnist', server_host='localhost', server_port=10080,
          sync_interval=100, batch_size=100, num_partition=1, num_epoch=1, server_reusable=True):
    is_new_sc = False
    if sc is None:
        sc = pyspark.SparkContext(conf=pyspark.SparkConf())
        is_new_sc = True

    image_rdd = extract_images(sc, train_image_path)
    label_rdd = extract_labels(sc, train_label_path, num_class=10, one_hot=True)
    image_label_rdd = image_rdd.join(label_rdd, numPartitions=num_partition) \
        .mapPartitions(flatten_image_label).cache()

    x = tf.placeholder(tf.float32, [None, 784])
    W = tf.Variable(tf.zeros([784, 10]))
    b = tf.Variable(tf.zeros([10]))
    y = tf.nn.softmax(tf.matmul(x, W) + b)
    y_ = tf.placeholder(tf.float32, [None, 10])
    cross_entropy = tf.reduce_mean(-tf.reduce_sum(y_ * tf.log(y), reduction_indices=[1]))
    train_step = tf.train.GradientDescentOptimizer(0.5).minimize(cross_entropy)

    init = tf.initialize_all_variables()
    sess = tf.Session()
    sess.run(init)

    feed_name_list = [x.name, y_.name]
    param_list = [W, b]

    spark_sess = sps.SparkSession(sc, sess, user=user, name=name, server_host=server_host,
                                  server_port=server_port, sync_interval=sync_interval,
                                  batch_size=batch_size)
    partitioner = par.RandomPartitioner(num_partition)
    combiner = comb.DeltaWeightCombiner()
    for i in range(num_epoch):
        spark_sess.run(train_step, feed_rdd=image_label_rdd, feed_name_list=feed_name_list,
                       param_list=param_list, weight_combiner=combiner,
                       shuffle_within_partition=True, server_reusable=server_reusable)
        if i != num_epoch - 1:
            temp_image_label_rdd = image_label_rdd.partitionBy(num_partition, partitioner).cache()
            image_label_rdd.unpersist()
            image_label_rdd = temp_image_label_rdd

    # Since the parameter server is reusable in this spark_sess.run() example, one should stop
    # the parameter server manually when it is no longer used.
    if server_reusable:
        spark_sess.stop_param_server()

    if is_new_sc:
        sc.close()

    from tensorflow.examples.tutorials.mnist import input_data
    mnist_data = input_data.read_data_sets("MNIST_data/", one_hot=True)
    correct_prediction = tf.equal(tf.argmax(y, 1), tf.argmax(y_, 1))
    accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
    print(sess.run(accuracy, feed_dict={x: mnist_data.test.images, y_: mnist_data.test.labels}))

    spark_sess.close()