UDFs
In [1]:
Copied!
# Create the Spark session for this notebook: local mode, all cores.
# (The exported cell body was duplicated verbatim; deduplicated here.)
from pyspark.sql import SparkSession

spark = (
    SparkSession
    .builder
    .appName("UDFs")
    .master("local[*]")
    .getOrCreate()
)
spark  # last expression: rich display of the SparkSession info
Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 25/02/16 15:53:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Out[1]:
SparkSession - in-memory
In [4]:
Copied!
# Read employee data from CSV with an explicit schema.
# All columns are declared as string — the bonus UDF below does its own
# int() cast on salary. (Duplicated export lines removed.)
emp_schema = "employee_id string, department_id string, name string, age string, gender string, salary string, hire_date string"
emp = spark.read.format("csv").option("header", True).schema(emp_schema).load("data/emp.csv")

# How many partitions did the file load into? (Small file -> 1.)
emp.rdd.getNumPartitions()
Out[4]:
1
In [5]:
Copied!
# Create a function to generate 10% of Salary as Bonus.
# (The export duplicated this def; the duplicate is removed so the
# function is defined exactly once.)
def bonus(salary):
    """Return 10% of ``salary`` as a float bonus.

    Parameters
    ----------
    salary : str | int | None
        Salary value (stored as a string in the CSV schema). ``None``
        is passed through unchanged so the UDF is null-safe if the
        source column ever contains NULLs.

    Returns
    -------
    float | None
        ``int(salary) * 0.1``, or ``None`` for null input.
    """
    if salary is None:
        # Spark hands SQL NULL to a Python UDF as None; the original
        # int(None) would fail the whole task instead of yielding a
        # null bonus.
        return None
    return int(salary) * 0.1
In [6]:
Copied!
from pyspark.sql.functions import udf
from pyspark.sql.functions import udf
In [7]:
Copied!
# Register the Python function as a UDF for both the DataFrame API and
# SQL. The return type is declared explicitly in both registrations:
# udf() without a type defaults to StringType, which was inconsistent
# with the "double" declared for the SQL registration.
bonus_udf = udf(bonus, "double")
spark.udf.register("bonus_sql_udf", bonus, "double")
Out[7]:
<function __main__.bonus(salary)>
In [8]:
Copied!
# Create a new "bonus" column by invoking the SQL-registered UDF through
# expr(). (Duplicated export lines removed.)
from pyspark.sql.functions import expr

emp.withColumn("bonus", expr("bonus_sql_udf(salary)")).show()
+-----------+-------------+-------------+---+------+------+----------+------+ |employee_id|department_id| name|age|gender|salary| hire_date| bonus| +-----------+-------------+-------------+---+------+------+----------+------+ | 001| 101| John Doe| 30| Male| 50000|2015-01-01|5000.0| | 002| 101| Jane Smith| 25|Female| 45000|2016-02-15|4500.0| | 003| 102| Bob Brown| 35| Male| 55000|2014-05-01|5500.0| | 004| 102| Alice Lee| 28|Female| 48000|2017-09-30|4800.0| | 005| 103| Jack Chan| 40| Male| 60000|2013-04-01|6000.0| | 006| 103| Jill Wong| 32|Female| 52000|2018-07-01|5200.0| | 007| 101|James Johnson| 42| Male| 70000|2012-03-15|7000.0| | 008| 102| Kate Kim| 29|Female| 51000|2019-10-01|5100.0| | 009| 103| Tom Tan| 33| Male| 58000|2016-06-01|5800.0| | 010| 104| Lisa Lee| 27|Female| 47000|2018-08-01|4700.0| | 011| 104| David Park| 38| Male| 65000|2015-11-01|6500.0| | 012| 105| Susan Chen| 31|Female| 54000|2017-02-15|5400.0| | 013| 106| Brian Kim| 45| Male| 75000|2011-07-01|7500.0| | 014| 107| Emily Lee| 26|Female| 46000|2019-01-01|4600.0| | 015| 106| Michael Lee| 37| Male| 63000|2014-09-30|6300.0| | 016| 107| Kelly Zhang| 30|Female| 49000|2018-04-01|4900.0| | 017| 105| George Wang| 34| Male| 57000|2016-03-15|5700.0| | 018| 104| Nancy Liu| 29|Female| 50000|2017-06-01|5000.0| | 019| 103| Steven Chen| 36| Male| 62000|2015-08-01|6200.0| | 020| 102| Grace Kim| 32|Female| 53000|2018-11-01|5300.0| +-----------+-------------+-------------+---+------+------+----------+------+
Python UDFs take a lot of time to deserialize, as can be seen in the Spark UI.
In [9]:
Copied!
# Release the Spark session's resources when done.
# (The export duplicated this line; one stop() is sufficient.)
spark.stop()
In [ ]:
Copied!