The following 27 code examples, extracted from open-source Python projects, illustrate how to use pyspark.sql.SparkSession().
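As a minimal sketch before the project examples: a SparkSession is usually obtained through the builder, or constructed directly around an existing SparkContext (the app name here is a placeholder).

from pyspark.sql import SparkSession

# Preferred entry point: the builder returns the existing session or creates one.
spark = SparkSession.builder \
    .appName("example") \
    .getOrCreate()

# Equivalent construction from an already-running SparkContext,
# as several of the examples below do.
sc = spark.sparkContext
session_from_context = SparkSession(sc)

spark.stop()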
def createOrReplaceTempView(self, name):
    """Creates or replaces a local temporary view with this DataFrame.

    The lifetime of this temporary table is tied to the :class:`SparkSession`
    that was used to create this :class:`DataFrame`.

    >>> df.createOrReplaceTempView("people")
    >>> df2 = df.filter(df.age > 3)
    >>> df2.createOrReplaceTempView("people")
    >>> df3 = spark.sql("select * from people")
    >>> sorted(df3.collect()) == sorted(df2.collect())
    True
    >>> spark.catalog.dropTempView("people")
    """
    self._jdf.createOrReplaceTempView(name)
def createTempView(self, name):
    """Creates a local temporary view with this DataFrame.

    The lifetime of this temporary table is tied to the :class:`SparkSession`
    that was used to create this :class:`DataFrame`.
    throws :class:`TempTableAlreadyExistsException`, if the view name already exists
    in the catalog.

    >>> df.createTempView("people")
    >>> df2 = spark.sql("select * from people")
    >>> sorted(df.collect()) == sorted(df2.collect())
    True
    >>> df.createTempView("people")  # doctest: +IGNORE_EXCEPTION_DETAIL
    Traceback (most recent call last):
    ...
    AnalysisException: u"Temporary table 'people' already exists;"
    >>> spark.catalog.dropTempView("people")
    """
    self._jdf.createTempView(name)
def schema(self, schema):
    """Specifies the input schema.

    Some data sources (e.g. JSON) can infer the input schema automatically from data.
    By specifying the schema here, the underlying data source can skip the schema
    inference step, and thus speed up data loading.

    :param schema: a :class:`pyspark.sql.types.StructType` object
    """
    from pyspark.sql import SparkSession
    if not isinstance(schema, StructType):
        raise TypeError("schema should be StructType")
    spark = SparkSession.builder.getOrCreate()
    jschema = spark._jsparkSession.parseDataType(schema.json())
    self._jreader = self._jreader.schema(jschema)
    return self
def schema(self, schema):
    """Specifies the input schema.

    Some data sources (e.g. JSON) can infer the input schema automatically from data.
    By specifying the schema here, the underlying data source can skip the schema
    inference step, and thus speed up data loading.

    .. note:: Experimental.

    :param schema: a :class:`pyspark.sql.types.StructType` object

    >>> s = spark.readStream.schema(sdf_schema)
    """
    from pyspark.sql import SparkSession
    if not isinstance(schema, StructType):
        raise TypeError("schema should be StructType")
    spark = SparkSession.builder.getOrCreate()
    jschema = spark._jsparkSession.parseDataType(schema.json())
    self._jreader = self._jreader.schema(jschema)
    return self
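Both schema() variants above expect a StructType built from pyspark.sql.types. A minimal sketch of calling them from user code follows; the file path and column names are placeholders.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("schema_example").getOrCreate()

# Declare the columns up front so the JSON reader skips schema inference.
people_schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])

# The same pattern applies to spark.readStream for Structured Streaming sources.
df = spark.read.schema(people_schema).json("examples/src/main/resources/people.json")
df.printSchema()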
def process(time, rdd):
    print("========= %s =========" % str(time))
    try:
        # Get the singleton instance of SparkSession
        spark = getSparkSessionInstance(rdd.context.getConf())

        # Convert RDD[String] to RDD[Row] to DataFrame
        rowRdd = rdd.map(lambda w: Row(word=w))
        wordsDataFrame = spark.createDataFrame(rowRdd)

        # Creates a temporary view using the DataFrame.
        wordsDataFrame.createOrReplaceTempView("words")

        # Do word count on table using SQL and print it
        wordCountsDataFrame = \
            spark.sql("select word, count(*) as total from words group by word")
        wordCountsDataFrame.show()
    except:
        pass
def benchmark_spark(ratings, factors, iterations=5):
    conf = (SparkConf()
            .setAppName("implicit_benchmark")
            .setMaster('local[*]')
            .set('spark.driver.memory', '16G'))
    context = SparkContext(conf=conf)
    spark = SparkSession(context)

    times = {}
    try:
        ratings = convert_sparse_to_dataframe(spark, context, ratings)

        for rank in factors:
            als = ALS(rank=rank, maxIter=iterations,
                      alpha=1, implicitPrefs=True,
                      userCol="row", itemCol="col", ratingCol="data")
            start = time.time()
            als.fit(ratings)
            elapsed = time.time() - start
            times[rank] = elapsed / iterations
            print("spark. factors=%i took %.3f" % (rank, elapsed / iterations))
    finally:
        spark.stop()

    return times
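convert_sparse_to_dataframe is a project helper that is not shown here. A hypothetical sketch of what it might look like, assuming ratings arrives as a scipy.sparse matrix and that the column names must match the userCol/itemCol/ratingCol arguments of the ALS call above:

from scipy.sparse import coo_matrix

def convert_sparse_to_dataframe(spark, context, ratings):
    # Hypothetical reconstruction: turn a scipy.sparse matrix into a DataFrame
    # with the "row", "col" and "data" columns expected by the ALS call above.
    coo = coo_matrix(ratings)
    triples = zip(coo.row.tolist(), coo.col.tolist(), coo.data.tolist())
    return spark.createDataFrame(context.parallelize(list(triples)),
                                 schema=["row", "col", "data"])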
def main():
    spark = SparkSession \
        .builder \
        .appName("RandomForest") \
        .config("spark.executor.heartbeatInterval", "60s") \
        .getOrCreate()

    sc = spark.sparkContext
    sqlContext = SQLContext(sc)
    sc.setLogLevel("INFO")

    train_df = spark.read.parquet(sys.argv[1])

    # Persist the data in memory and disk
    train_df.persist(StorageLevel(True, True, False, False, 1))

    rfc = RandomForestClassifier(maxDepth=8, maxBins=2400000, numTrees=128, impurity="gini")
    rfc_model = rfc.fit(train_df)
    rfc_model.save(sys.argv[2] + "rfc_model")
def loadMovieNames():
    movieNames = {}
    with open("ml-100k/u.ITEM") as f:
        for line in f:
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

# Create a SparkSession (the config bit is only for Windows!)
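The trailing comment belongs to the next step of that script. A plausible sketch of the session it refers to, with the warehouse path and app name as assumptions (pointing spark.sql.warehouse.dir at a local temp folder is the usual Windows workaround):

from pyspark.sql import SparkSession

# Assumed continuation of the script above; path and app name are placeholders.
spark = SparkSession.builder \
    .config("spark.sql.warehouse.dir", "file:///C:/temp") \
    .appName("PopularMovies") \
    .getOrCreate()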
def setUp(self):
    self.sc = SparkContext('local[4]', "MLlib tests")
    self.spark = SparkSession(self.sc)
def setUpClass(cls):
    PySparkTestCase.setUpClass()
    cls.spark = SparkSession(cls.sc)
def _test():
    import doctest
    from pyspark.context import SparkContext
    from pyspark.sql import Row, SQLContext, SparkSession
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType
    import pyspark.sql.dataframe
    from pyspark.sql.functions import from_unixtime

    globs = pyspark.sql.dataframe.__dict__.copy()
    sc = SparkContext('local[4]', 'PythonTest')
    globs['sc'] = sc
    globs['sqlContext'] = SQLContext(sc)
    globs['spark'] = SparkSession(sc)
    globs['df'] = sc.parallelize([(2, 'Alice'), (5, 'Bob')]) \
        .toDF(StructType([StructField('age', IntegerType()),
                          StructField('name', StringType())]))
    globs['df2'] = sc.parallelize([Row(name='Tom', height=80),
                                   Row(name='Bob', height=85)]).toDF()
    globs['df3'] = sc.parallelize([Row(name='Alice', age=2),
                                   Row(name='Bob', age=5)]).toDF()
    globs['df4'] = sc.parallelize([Row(name='Alice', age=10, height=80),
                                   Row(name='Bob', age=5, height=None),
                                   Row(name='Tom', age=None, height=None),
                                   Row(name=None, age=None, height=None)]).toDF()
    globs['sdf'] = sc.parallelize([Row(name='Tom', time=1479441846),
                                   Row(name='Bob', time=1479442946)]).toDF()

    (failure_count, test_count) = doctest.testmod(
        pyspark.sql.dataframe, globs=globs,
        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
    globs['sc'].stop()
    if failure_count:
        exit(-1)
def _test():
    import doctest
    import os
    import tempfile
    import py4j
    from pyspark.context import SparkContext
    from pyspark.sql import SparkSession, Row
    import pyspark.sql.readwriter

    os.chdir(os.environ["SPARK_HOME"])

    globs = pyspark.sql.readwriter.__dict__.copy()
    sc = SparkContext('local[4]', 'PythonTest')
    try:
        spark = SparkSession.builder.enableHiveSupport().getOrCreate()
    except py4j.protocol.Py4JError:
        spark = SparkSession(sc)

    globs['tempfile'] = tempfile
    globs['os'] = os
    globs['sc'] = sc
    globs['spark'] = spark
    globs['df'] = spark.read.parquet('python/test_support/sql/parquet_partitioned')

    (failure_count, test_count) = doctest.testmod(
        pyspark.sql.readwriter, globs=globs,
        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
    sc.stop()
    if failure_count:
        exit(-1)
def queryName(self, queryName):
    """Specifies the name of the :class:`StreamingQuery` that can be started with
    :func:`start`. This name must be unique among all the currently active queries
    in the associated SparkSession.

    .. note:: Experimental.

    :param queryName: unique name for the query

    >>> writer = sdf.writeStream.queryName('streaming_query')
    """
    if not queryName or type(queryName) != str or len(queryName.strip()) == 0:
        raise ValueError('The queryName must be a non-empty string. Got: %s' % queryName)
    self._jwrite = self._jwrite.queryName(queryName)
    return self
def _test():
    import doctest
    import os
    import tempfile
    import py4j
    from pyspark.context import SparkContext
    from pyspark.sql import Row, SparkSession, SQLContext
    from pyspark.sql.types import StructType, StructField, StringType
    import pyspark.sql.streaming

    os.chdir(os.environ["SPARK_HOME"])

    globs = pyspark.sql.streaming.__dict__.copy()
    sc = SparkContext('local[4]', 'PythonTest')
    try:
        spark = SparkSession.builder.getOrCreate()
    except py4j.protocol.Py4JError:
        spark = SparkSession(sc)

    globs['tempfile'] = tempfile
    globs['os'] = os
    globs['spark'] = spark
    globs['sqlContext'] = SQLContext.getOrCreate(spark.sparkContext)
    globs['sdf'] = \
        spark.readStream.format('text').load('python/test_support/sql/streaming')
    globs['sdf_schema'] = StructType([StructField("data", StringType(), False)])
    globs['df'] = \
        globs['spark'].readStream.format('text').load('python/test_support/sql/streaming')

    (failure_count, test_count) = doctest.testmod(
        pyspark.sql.streaming, globs=globs,
        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
    globs['spark'].stop()
    if failure_count:
        exit(-1)
def getSparkSessionInstance(sparkConf):
    if ('sparkSessionSingletonInstance' not in globals()):
        globals()['sparkSessionSingletonInstance'] = SparkSession \
            .builder \
            .config(conf=sparkConf) \
            .getOrCreate()
    return globals()['sparkSessionSingletonInstance']
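getSparkSessionInstance lazily creates a single SparkSession per driver. A sketch, under the assumption that it is paired with the process() callback shown earlier, of how the pieces are wired into a Spark Streaming job; the host, port, and one-second batch interval are placeholders.

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="PythonSqlNetworkWordCount")
ssc = StreamingContext(sc, 1)

lines = ssc.socketTextStream("localhost", 9999)
words = lines.flatMap(lambda line: line.split(" "))

# Each micro-batch RDD is handed to process(), which reuses the
# singleton SparkSession returned by getSparkSessionInstance().
words.foreachRDD(process)

ssc.start()
ssc.awaitTermination()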
def spark_session(self, application_name):
    sc = self.spark_context(application_name)
    from pyspark.sql import SparkSession
    return SparkSession(sc)
def main(date, bucket, prefix, num_clusters, num_donors, kernel_bandwidth, num_pdf_points):
    spark = (SparkSession
             .builder
             .appName("taar_similarity")
             .enableHiveSupport()
             .getOrCreate())

    if num_donors < 100:
        logger.warn("Less than 100 donors were requested.", extra={"donors": num_donors})
        num_donors = 100

    logger.info("Loading the AMO whitelist...")
    whitelist = load_amo_external_whitelist()

    logger.info("Computing the list of donors...")

    # Compute the donors clusters and the LR curves.
    cluster_ids, donors_df = get_donors(spark, num_clusters, num_donors, whitelist)
    lr_curves = get_lr_curves(spark, donors_df, cluster_ids, kernel_bandwidth,
                              num_pdf_points)

    # Store them.
    donors = format_donors_dictionary(donors_df)
    store_json_to_s3(json.dumps(donors, indent=2), 'donors', date, prefix, bucket)
    store_json_to_s3(json.dumps(lr_curves, indent=2), 'lr_curves', date, prefix, bucket)

    spark.stop()
def readImages(self, path, recursive=False, numPartitions=-1,
               dropImageFailures=False, sampleRatio=1.0, seed=0):
    """
    Reads the directory of images from the local or remote source.

    .. note:: If multiple jobs are run in parallel with different sampleRatio or recursive
        flag, there may be a race condition where one job overwrites the hadoop configs of
        another.

    .. note:: If sample ratio is less than 1, sampling uses a PathFilter that is efficient
        but potentially non-deterministic.

    :param str path: Path to the image directory.
    :param bool recursive: Recursive search flag.
    :param int numPartitions: Number of DataFrame partitions.
    :param bool dropImageFailures: Drop the files that are not valid images.
    :param float sampleRatio: Fraction of the images loaded.
    :param int seed: Random number seed.
    :return: a :class:`DataFrame` with a single column of "images",
        see ImageSchema for details.

    >>> df = ImageSchema.readImages('python/test_support/image/kittens', recursive=True)
    >>> df.count()
    4

    .. versionadded:: 2.3.0
    """
    ctx = SparkContext._active_spark_context
    spark = SparkSession(ctx)
    image_schema = ctx._jvm.org.apache.spark.ml.image.ImageSchema
    jsession = spark._jsparkSession
    jresult = image_schema.readImages(path, jsession, recursive, numPartitions,
                                      dropImageFailures, float(sampleRatio), seed)
    return DataFrame(jresult, spark._wrapped)
def __init__(self, sparkContext, **options):
    super(GlueContext, self).__init__(sparkContext)
    register(sparkContext)
    self._glue_scala_context = self._get_glue_scala_context(**options)
    self.create_dynamic_frame = DynamicFrameReader(self)
    self.write_dynamic_frame = DynamicFrameWriter(self)
    self.spark_session = SparkSession(sparkContext, self._glue_scala_context.getSparkSession())
def basic_df_example(spark):
    # $example on:create_df$
    # spark is an existing SparkSession
    df = spark.read.json("/home/xieenze/Desktop/spark/testweet.json")
    # Displays the content of the DataFrame to stdout
    a = df.collect()

    df.createGlobalTempView("people")
    # Global temporary view is tied to a system preserved database `global_temp`
    b = spark.sql("SELECT * FROM global_temp.people").collect()
    return a, b
def getspark():
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL basic example") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
    return spark
def get_content():
    spark = SparkSession \
        .builder \
        .appName("Crime Detection") \
        .config("cloudant.host", "115.146.94.41:9584/") \
        .config("cloudant.username", "cadmin") \
        .config("cloudant.password", "qwerty8888") \
        .config("cloudant.protocol", "http") \
        .config('jsonstore.rdd.partitions', 32) \
        .getOrCreate()

    spark.sql(" CREATE TEMPORARY VIEW tweetTmpView USING com.cloudant.spark OPTIONS ( database 'tweet_raw_crimeresult')")

    twt = spark.sql('SELECT text,entities,geo,created_at,user FROM tweetTmpView')
    # hashtag = spark.sql('SELECT hashtags FROM tweetTmpView')
    # twt.printSchema()
    # print 'Total # of rows in tweet_raw: ' + str(twt.count())

    content = []
    for tw in twt.collect():
        content_dict = {}
        if (len(tw.entities.hashtags) == 0):
            content_dict['twitter_content'] = tw.text.encode('utf-8')
            content_dict['geo'] = tw.geo
            content_dict['created_at'] = tw.created_at
            content_dict['location'] = tw.user.location
        else:
            content_dict['twitter_content'] = tw.text.encode('utf-8')
            content_dict['hashtag'] = tw.entities.hashtags[0].text
            content_dict['geo'] = tw.geo
            content_dict['created_at'] = tw.created_at
            content_dict['location'] = tw.user.location
        content.append(content_dict)
    return content
def main():
    spark = SparkSession \
        .builder \
        .appName(app_name) \
        .config('cloudant.host', db_host) \
        .config('cloudant.username', db_user) \
        .config('cloudant.password', db_password) \
        .config('cloudant.protocol', 'http') \
        .config('jsonstore.rdd.partitions', part_size) \
        .getOrCreate()

    tweet = spark.read.load(db_source, 'com.cloudant.spark')
    tweet.cache()
    # tweet.printSchema()

    if not has_column(tweet, tag_name):
        filtered_tweet = tweet.filter(tweet.text.rlike(keyword))
        sentiment_udf = udf(sentiment, StringType())
        df = filtered_tweet.withColumn(tag_name, sentiment_udf(filtered_tweet['text']))
        # df.select('_id', 'text', 'tag').show()
        # print(df.count())
        df.write.format('com.cloudant.spark').save(db_target)
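has_column and sentiment are project helpers, not part of PySpark. A minimal hypothetical stand-in for has_column, inferred from the call site above:

def has_column(df, col_name):
    # Hypothetical helper: report whether the DataFrame already carries
    # the given column, so the tagging step runs only once.
    return col_name in df.columns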
def get_content():
    spark = SparkSession \
        .builder \
        .appName("Crime Detection") \
        .config("cloudant.host", "115.146.94.41:5000") \
        .config("cloudant.username", "cadmin") \
        .config("cloudant.password", "qwerty8888") \
        .config("cloudant.protocol", "http") \
        .config('jsonstore.rdd.partitions', 32) \
        .getOrCreate()

    spark.sql(" CREATE TEMPORARY VIEW tweetTmpView USING com.cloudant.spark OPTIONS ( database 'tweet_raw_trump')")

    twt = spark.sql('SELECT text,entities,geo,created_at FROM tweetTmpView')
    # hashtag = spark.sql('SELECT hashtags FROM tweetTmpView')
    # twt.printSchema()
    # print 'Total # of rows in tweet_raw: ' + str(twt.count())

    content = []
    for tw in twt.collect():
        content_dict = {}
        if (len(tw.entities.hashtags) == 0):
            content_dict['twitter_content'] = tw.text.encode('utf-8')
            content_dict['geo'] = tw.geo
            content_dict['created_at'] = tw.created_at
        else:
            content_dict['twitter_content'] = tw.text.encode('utf-8')
            content_dict['hashtag'] = tw.entities.hashtags[0].text
            content_dict['geo'] = tw.geo
            content_dict['created_at'] = tw.created_at
        content.append(content_dict)
    return content
def main():
    spark = SparkSession \
        .builder \
        .appName("RandomForest") \
        .config("spark.executor.heartbeatInterval", "60s") \
        .getOrCreate()

    sc = spark.sparkContext
    sqlContext = SQLContext(sc)
    sc.setLogLevel("INFO")

    # Loading the test data
    df_test = spark.read.parquet(sys.argv[1])
    df_test, df_discard = df_test.randomSplit([0.2, 0.8])

    # Load the model
    rf_model = RandomForestClassificationModel.load(sys.argv[2])

    # Make the predictions
    predictions = rf_model.transform(df_test)

    # predictionsRDD = predictions.rdd
    # predictionsRDD.saveAsTextFile(sys.argv[3] + "output.text")

    evaluator_acc = MulticlassClassificationEvaluator(predictionCol="prediction",
                                                      labelCol="label",
                                                      metricName="accuracy")
    accuracy = evaluator_acc.evaluate(predictions)
    print("accuracy *******************")
    print(accuracy)

    evaluator_pre = MulticlassClassificationEvaluator(predictionCol="prediction",
                                                      labelCol="label",
                                                      metricName="weightedPrecision")
    print("precision *******************")
    print(evaluator_pre.evaluate(predictions))

    print("recall **********************")
    print(MulticlassClassificationEvaluator(predictionCol="prediction",
                                            labelCol="label",
                                            metricName="weightedRecall").evaluate(predictions))
def tearDown(self):
    self.__class__.writer.client.drop_database(self.__class__.influx_options["database"])

# Temporarily commented out because we don't understand why this test failed
# def test_write_rdd_to_influx(self):
#     struct = ["min_packet_size", "max_traffic"]
#     config = Config(CONFIG_PATH)
#
#     self.__class__.influx_options = config.content["output"]["options"]["influx"]
#
#     client = InfluxDBClientMock(self.__class__.influx_options["host"],
#                                 self.__class__.influx_options["port"],
#                                 self.__class__.influx_options["username"],
#                                 self.__class__.influx_options["password"],
#                                 self.__class__.influx_options["database"])
#
#     self.__class__.writer = InfluxWriter(client, self.__class__.influx_options["database"],
#                                          self.__class__.influx_options["measurement"],
#                                          struct)
#     spark = SparkSession \
#         .builder \
#         .appName("TestInfluxWriter") \
#         .getOrCreate()
#
#     array = [("91.221.61.168", 68, 34816), ("192.168.30.2", 185, 189440),
#              ("91.226.13.80", 1510, 773120), ("217.69.143.60", 74, 37888)]
#
#     sample = spark.sparkContext.parallelize(array).repartition(1)
#
#     write_lambda = self.__class__.writer.get_write_lambda()
#     write_lambda(sample)
#
#     result = self.__class__.writer.client.query(
#         "select * from {0}".format(self.__class__.influx_options["measurement"]))
#
#     points = list(result.get_points())
#
#     self.assertEqual(len(points), len(array),
#                      "In {0} measurement should be written {1} points".format(
#                          self.__class__.influx_options["measurement"], len(array)))
#
#     struct.insert(0, "key")
#
#     for p_index, point in enumerate(points):
#         for s_index, name in enumerate(struct):
#             self.assertEqual(point[name], array[p_index][s_index],
#                              "Point {0} field {1} should has value {2}".format(
#                                  p_index, name, array[p_index][s_index]))
#
#     spark.stop()
def main():
    spark = SparkSession \
        .builder \
        .appName("RandomForest") \
        .config("spark.executor.heartbeatInterval", "60s") \
        .getOrCreate()

    sc = spark.sparkContext
    sqlContext = SQLContext(sc)
    sc.setLogLevel("INFO")

    # Loading the test data
    df_test = spark.read.parquet(sys.argv[1])
    df_test, df_train = df_test.randomSplit([0.3, 0.7])

    df_train_indexed = df_train.selectExpr("label as indexedLabel", "features as indexedFeatures")
    df_test_indexed = df_test.selectExpr("label as indexedLabel", "features as indexedFeatures")

    # # Load the model
    # rf_model = RandomForestClassificationModel.load(sys.argv[2])
    #
    # # Make the predictions
    # predictions = rf_model.transform(df_test)

    gbt = GBTClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures",
                        maxIter=100, maxBins=24000000)
    model = gbt.fit(df_train_indexed)
    predictions = model.transform(df_test_indexed)

    # predictionsRDD = predictions.rdd
    # predictionsRDD.saveAsTextFile(sys.argv[3] + "output.text")

    evaluator_acc = MulticlassClassificationEvaluator(predictionCol="prediction",
                                                      labelCol="indexedLabel",
                                                      metricName="accuracy")
    accuracy = evaluator_acc.evaluate(predictions)
    print("accuracy *******************")
    print(accuracy)

    evaluator_pre = MulticlassClassificationEvaluator(predictionCol="prediction",
                                                      labelCol="indexedLabel",
                                                      metricName="weightedPrecision")
    print("precision *******************")
    print(evaluator_pre.evaluate(predictions))

    print("recall **********************")
    print(MulticlassClassificationEvaluator(predictionCol="prediction",
                                            labelCol="indexedLabel",
                                            metricName="weightedRecall").evaluate(predictions))