The following code examples, extracted from open-source Python projects, illustrate how to use pyspark.sql.Column().
import json

from pyspark.sql import Column


def add_meta(sc, col, metadata):
    """Add metadata to a column

    Adds metadata to a column for describing extra properties. This metadata
    survives serialization from dataframe to parquet and back to dataframe.
    Any manipulation of the column, such as aliasing, will lose the metadata.

    Parameters
    ----------
    sc : pyspark.SparkContext
    col : pyspark.sql.Column
    metadata : dict

    Returns
    -------
    pyspark.sql.Column
    """
    meta = sc._jvm.org.apache.spark.sql.types \
        .Metadata.fromJson(json.dumps(metadata))
    return Column(getattr(col._jc, 'as')('', meta))
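A rough usage sketch follows; the SparkSession, DataFrame, and metadata contents are assumptions for illustration. The annotated column is written back with withColumn, and the metadata then shows up on the resulting schema field.

# Illustrative sketch; the DataFrame and metadata dict are made up.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, 0.5), (2, 0.7)], ["id", "score"])

# Attach descriptive metadata to the "score" column.
annotated = add_meta(spark.sparkContext, df["score"], {"unit": "probability"})
df = df.withColumn("score", annotated)

# The metadata is visible on the schema field of the new column.
print(df.schema["score"].metadata)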
def wrap_function_cols(self, name, package_name=None, object_name=None,
                       java_class_instance=None, doc=""):
    """Utility method for wrapping a scala/java function that returns a
    spark sql Column.

    This assumes that the function that you are wrapping takes a list of
    spark sql Column objects as its arguments.
    """
    def _(*cols):
        jcontainer = self.get_java_container(
            package_name=package_name,
            object_name=object_name,
            java_class_instance=java_class_instance)
        # Ensure that every argument is a column
        col_args = [col._jc if isinstance(col, Column) else _make_col(col)._jc
                    for col in cols]
        function = getattr(jcontainer, name)
        jc = function(*col_args)
        return Column(jc)
    _.__name__ = name
    _.__doc__ = doc
    return _
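The wrapper that wrap_function_cols returns is meant to be exposed from whatever helper object defines get_java_container. The sketch below only shows the calling pattern; ScalaFunctionHelper, the package and object names, and arrayMax are invented placeholders, not real APIs.

# Purely hypothetical usage; all names here stand in for your own JVM helper.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
helper = ScalaFunctionHelper(spark)  # hypothetical class defining wrap_function_cols

array_max = helper.wrap_function_cols(
    "arrayMax",
    package_name="com.example.spark.udfs",
    object_name="ColumnFunctions",
    doc="Returns the maximum element of an array column.")

df = spark.createDataFrame([([1, 5, 3],)], ["values"])
df.select(array_max(df["values"]).alias("max_value")).show()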
from pyspark import SparkContext
from pyspark.sql import functions as F
# `_to_java_column` and `_to_seq` are pyspark-internal helpers for passing
# columns into the JVM.
from pyspark.sql.column import Column, _to_java_column, _to_seq


def at_least_n_distinct(col, limit):
    """Count distinct that works with windows

    The standard distinct count in spark sql can't be applied in a window.
    This implementation allows that to work
    """
    sc = SparkContext._active_spark_context
    j_cols = _to_seq(sc, [_to_java_column(col), _to_java_column(F.lit(limit))])
    jc = sc._jvm.org.wikimedia.search.mjolnir.AtLeastNDistinct().apply(j_cols)
    return Column(jc)
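Since the point of at_least_n_distinct is to be usable over a window, a usage sketch might look like the following. It assumes the org.wikimedia.search.mjolnir jar is on the Spark classpath, and the DataFrame is invented for illustration.

# Sketch only: requires the mjolnir jar on the Spark classpath.
from pyspark.sql import SparkSession, Window, functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("q1", "a"), ("q1", "b"), ("q1", "a"), ("q2", "c")],
    ["query", "page"])

# Flag, per query, whether at least 2 distinct pages were seen.
w = Window.partitionBy("query")
df.withColumn("enough_pages",
              at_least_n_distinct(F.col("page"), 2).over(w)).show()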
def test_column_operators(self):
    ci = self.df.key
    cs = self.df.value
    c = ci == cs
    self.assertTrue(isinstance((- ci - 1 - 2) % 3 * 2.5 / 3.5, Column))
    rcc = (1 + ci), (1 - ci), (1 * ci), (1 / ci), (1 % ci), (1 ** ci), (ci ** 1)
    self.assertTrue(all(isinstance(c, Column) for c in rcc))
    cb = [ci == 5, ci != 0, ci > 3, ci < 4, ci >= 0, ci <= 7]
    self.assertTrue(all(isinstance(c, Column) for c in cb))
    cbool = (ci & ci), (ci | ci), (~ci)
    self.assertTrue(all(isinstance(c, Column) for c in cbool))
    css = cs.like('a'), cs.rlike('a'), cs.asc(), cs.desc(), \
        cs.startswith('a'), cs.endswith('a')
    self.assertTrue(all(isinstance(c, Column) for c in css))
    self.assertTrue(isinstance(ci.cast(LongType()), Column))
def test_access_column(self):
    df = self.df
    self.assertTrue(isinstance(df.key, Column))
    self.assertTrue(isinstance(df['key'], Column))
    self.assertTrue(isinstance(df[0], Column))
    self.assertRaises(IndexError, lambda: df[2])
    self.assertRaises(AnalysisException, lambda: df["bad_key"])
    self.assertRaises(TypeError, lambda: df[{}])
def wrap_spark_sql_udf(self, name, package_name=None, object_name=None,
                       java_class_instance=None, doc=""):
    """Wraps a scala/java spark user defined function"""
    def _(*cols):
        jcontainer = self.get_java_container(
            package_name=package_name,
            object_name=object_name,
            java_class_instance=java_class_instance)
        # Look up the UDF factory on the container, instantiate the UDF,
        # then apply it to the columns converted to a scala Seq.
        function = getattr(jcontainer, name)
        judf = function()
        jc = judf.apply(self.to_scala_seq([_to_java_column(c) for c in cols]))
        return Column(jc)
    _.__name__ = name
    _.__doc__ = doc
    return _
def test_list_columns(self):
    from pyspark.sql.catalog import Column
    spark = self.spark
    spark.catalog._reset()
    spark.sql("CREATE DATABASE some_db")
    spark.sql("CREATE TABLE tab1 (name STRING, age INT)")
    spark.sql("CREATE TABLE some_db.tab2 (nickname STRING, tolerance FLOAT)")
    columns = sorted(spark.catalog.listColumns("tab1"), key=lambda c: c.name)
    columnsDefault = sorted(spark.catalog.listColumns("tab1", "default"),
                            key=lambda c: c.name)
    self.assertEquals(columns, columnsDefault)
    self.assertEquals(len(columns), 2)
    self.assertEquals(columns[0], Column(
        name="age", description=None, dataType="int",
        nullable=True, isPartition=False, isBucket=False))
    self.assertEquals(columns[1], Column(
        name="name", description=None, dataType="string",
        nullable=True, isPartition=False, isBucket=False))
    columns2 = sorted(spark.catalog.listColumns("tab2", "some_db"),
                      key=lambda c: c.name)
    self.assertEquals(len(columns2), 2)
    self.assertEquals(columns2[0], Column(
        name="nickname", description=None, dataType="string",
        nullable=True, isPartition=False, isBucket=False))
    self.assertEquals(columns2[1], Column(
        name="tolerance", description=None, dataType="float",
        nullable=True, isPartition=False, isBucket=False))
    self.assertRaisesRegexp(
        AnalysisException,
        "tab2",
        lambda: spark.catalog.listColumns("tab2"))
    self.assertRaisesRegexp(
        AnalysisException,
        "does_not_exist",
        lambda: spark.catalog.listColumns("does_not_exist"))
from functools import reduce

from pyspark.sql import Column, functions as F


def switch_case(switch, case=None, default=None, **additional_cases):
    """Switch/case style column generation.

    Args:
        switch (str, pyspark.sql.Column): column to "switch" on; its values
            are going to be compared against defined cases.
        case (dict): case statements. When a key matches the value of the
            column in a specific row, the respective value will be assigned
            to the new column for that row. This is useful when your case
            condition constants are not strings.
        default: default value to be used when the value of the switch
            column doesn't match any keys.
        additional_cases: additional "case" statements, kwargs style.
            Same semantics as cases above. If both are provided, cases
            takes precedence.

    Returns:
        pyspark.sql.Column

    Example:
        ``switch_case('state', CA='California', NY='New York', default='Other')``

        is equivalent to

        >>> F.when(
        ...     F.col('state') == 'CA', 'California'
        ... ).when(
        ...     F.col('state') == 'NY', 'New York'
        ... ).otherwise('Other')
    """
    if not isinstance(switch, Column):
        switch = F.col(switch)

    def _column_or_lit(x):
        return F.lit(x) if not isinstance(x, Column) else x

    def _execute_case(accumulator, case):
        # Transform the case into a pyspark.sql.functions.when statement,
        # then chain it to the existing when statements.
        condition_constant, assigned_value = case
        when_args = (switch == F.lit(condition_constant),
                     _column_or_lit(assigned_value))
        return accumulator.when(*when_args)

    cases = case or {}
    # `case` takes precedence over kwargs-style cases on conflicting keys.
    for conflict in set(cases.keys()) & set(additional_cases.keys()):
        del additional_cases[conflict]
    cases = list(cases.items()) + list(additional_cases.items())

    default = _column_or_lit(default)
    if not cases:
        return default
    result = reduce(_execute_case, cases, F).otherwise(default)
    return result
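A short usage sketch for switch_case; the DataFrames below are invented for illustration.

# Illustrative usage; the DataFrames are made up.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("CA",), ("NY",), ("WA",)], ["state"])

# kwargs-style cases, mirroring the docstring example.
df.withColumn(
    "state_name",
    switch_case("state", CA="California", NY="New York", default="Other"),
).show()

# Dict-style cases are handy when the case keys are not strings.
df2 = spark.createDataFrame([(1,), (2,), (3,)], ["code"])
df2.withColumn(
    "label",
    switch_case("code", {1: "one", 2: "two"}, default="many"),
).show()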