In Spark < 2.4 you can use an user defined function:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, DataType, StringType
def transform(f, t=StringType()):
if not isinstance(t, DataType):
raise TypeError("Invalid type {}".format(type(t)))
@udf(ArrayType(t))
def _(xs):
if xs is not None:
return [f(x) for x in xs]
return _
foo_udf = transform(str.upper)
df.withColumn('names', foo_udf(f.col('names'))).show(truncate=False)
+------+-----------------------+
|type |names |
+------+-----------------------+
|person|[JOHN, SAM, JANE] |
|pet |[WHISKERS, ROVER, FIDO]|
+------+-----------------------+
Considering high cost of explode
+ collect_list
idiom, this approach is almost exclusively preferred, despite its intrinsic cost.
In Spark 2.4 or later you can use transform
* with upper
(see SPARK-23909):
from pyspark.sql.functions import expr
df.withColumn(
'names', expr('transform(names, x -> upper(x))')
).show(truncate=False)
+------+-----------------------+
|type |names |
+------+-----------------------+
|person|[JOHN, SAM, JANE] |
|pet |[WHISKERS, ROVER, FIDO]|
+------+-----------------------+
It is also possible to use pandas_udf
from pyspark.sql.functions import pandas_udf, PandasUDFType
def transform_pandas(f, t=StringType()):
if not isinstance(t, DataType):
raise TypeError("Invalid type {}".format(type(t)))
@pandas_udf(ArrayType(t), PandasUDFType.SCALAR)
def _(xs):
return xs.apply(lambda xs: [f(x) for x in xs] if xs is not None else xs)
return _
foo_udf_pandas = transform_pandas(str.upper)
df.withColumn('names', foo_udf(f.col('names'))).show(truncate=False)
+------+-----------------------+
|type |names |
+------+-----------------------+
|person|[JOHN, SAM, JANE] |
|pet |[WHISKERS, ROVER, FIDO]|
+------+-----------------------+
although only the latest Arrow / PySpark combinations support handling ArrayType
columns (SPARK-24259, SPARK-21187). Nonetheless this option should be more efficient than standard UDF (especially with a lower serde overhead) while supporting arbitrary Python functions.
* A number of other higher order functions are also supported, including, but not limited to filter
and aggregate
. See for example
- Querying Spark SQL DataFrame with complex types
- How to slice and sum elements of array column?
- Filter array column content
- Spark Scala row-wise average by handling null.
- How to use transform higher-order function?.