from pyspark.sql import SparkSession
from pyspark.sql.window import Window
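from pyspark.sql.functions import col, min, max, lead, lag, lit, date_format
from pyspark.sql import functions as F  # used in later cells as F.rank(), F.dense_rank(), F.lead(), F.lag()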
import pandas as pd
# create spark session for Dataframe
spark = SparkSession.builder.appName('Practice').getOrCreate()
# create spark context for RDD
sc = spark.sparkContext
RDD (Resilient Distributed Datasets):¶
a. Transformations vs Actions¶
1. Transformations:¶
- Transformations are PySpark functions that produce a new RDD, DataFrame, or Dataset as their result.
- Transformations are lazy: none of them is executed until you call an action on the RDD.
1.1 Narrow Transformations:
- These transformations convert each input partition to only one output partition.
- Each partition of the parent RDD is used by at most one partition of the child RDD; equivalently, each child partition depends on a single parent partition.
- This kind of transformation is fast.
- It does not require any data shuffling or other data movement over the cluster network.
- in RDD:
  - map()
  - filter()
  - flatMap()
  - sample()
  - union()
- in DataFrame:
  - select()
  - filter() or where()
  - withColumn()
  - drop()
  - alias()
  - sample()
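1.2 Wide Transformations:
- These transformations need data from several input partitions to build one output partition, so they shuffle data across the cluster network.
- in RDD:
  - groupByKey()
  - aggregateByKey()
  - join()
  - repartition()
- in DataFrame:
  - groupBy()
  - agg() (after groupBy())
  - rollup() (followed by aggregation)
  - orderBy() (across partitions)
  - join() with different join types
  - repartition()
  - coalesce() (listed here with repartition(), although reducing partitions with coalesce() avoids a full shuffle)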
2. Actions:¶
- Actions trigger execution of the pending transformations and return a computed value to the Spark driver program (or write it to external storage).
- RDD Actions
  - collect()
  - count()
  - first()
  - top()
  - min()
  - max()
  - take()
  - aggregate()
- DataFrame Actions
  - show(n)
  - count()
  - first() and head()
  - collect()
  - take(n)
  - foreach(func)
  - write.format(...).save(path)
b. Repartition vs Coalesce¶
1. Repartition¶
- The Spark RDD repartition() method is used to increase or decrease the number of partitions; it performs a full shuffle.
2. Coalesce¶
- The Spark RDD coalesce() method is used only to reduce the number of partitions.
- It is an optimized version of repartition(): because it merges existing partitions instead of doing a full shuffle, less data moves across partitions.
c. Cache and Persist¶
- Caching and persistence are optimization techniques for iterative and interactive Spark computations.
- They save interim partial results so they can be reused in subsequent stages.
- These interim results are kept as RDDs in memory (the default) or in more durable storage such as disk, and can optionally be replicated.
- RDDs can be cached using the cache() operation.
- They can also be persisted using the persist() operation.
- With cache(), you use only the default storage level:
  - MEMORY_ONLY for RDD
  - MEMORY_AND_DISK for Dataset
- With persist(), you can specify which storage level you want for both RDD and Dataset.
d. Broadcasting¶
- Broadcast variables let developers cache a read-only copy of a variable on each machine/node rather than shipping a copy of it with every task.
e. Spark Workflow¶
1. Driver program:
- The driver program is the main program that is executed on the driver node.
- It is responsible for creating the SparkContext object, which is the entry point to the Spark API.
2. DAG creation:
- The driver program creates a DAG of tasks to be executed.
- The DAG is a directed acyclic graph, which means that there are no circular dependencies between tasks.
3. Task scheduling:
- Once the DAG is created, the driver program schedules the tasks to be executed on the worker nodes.
- The driver program takes into account the resources available on each worker node when scheduling the tasks.
4. Task execution:
- The tasks are then executed on the worker nodes.
- The worker nodes are responsible for actually processing the data and performing the computations.
5. Result aggregation:
- Once the tasks are completed, the driver program aggregates the results from the worker nodes.
- The driver program then returns the results to the user.
f. Stages & Task Creation¶
- Each Spark action results in one or more stages; stage boundaries fall between transformations with narrow dependencies and transformations with wide dependencies.
- e.g. the map() transformation has a narrow dependency, meaning each output partition depends on only one input partition, so a job that only uses map() has a single stage.
- Each stage is divided into tasks. A task represents the computation applied to a single partition of the RDD.
- The number of tasks in a stage equals the number of partitions in the RDD.
- By default, when an RDD is created using parallelize() without specifying a number of partitions, Spark uses a default value, typically the number of available cores.
g. Executor¶
- Executors are responsible for running tasks on cluster nodes.
- An executor can run multiple tasks concurrently.
- If an executor is allocated 5 cores, it can run up to 5 tasks at the same time.
- The driver program communicates with the executors to schedule tasks on them and collect results.
h. Executor memory and Storage¶
- e.g. a 5-node cluster
- Available: 16 cores per node, 64 GB memory per node
- Reserved: 1 core and 1 GB per node for the OS and daemons, leaving 15 cores and 63 GB per node
- Executors: common practice is 5 cores per executor, i.e. 15 / 5 = 3 executors per node, so total executors = 3 * 5 nodes = 15
- Reserve 1 executor for application management, i.e. number of executors = 14
- Executor memory = 63 GB / 3 executors per node = 21 GB per executor; leave some room for memory overhead, i.e. 20 GB
- Final configuration:
--num-executors 14 --executor-memory 20G --executor-cores 5
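The sizing arithmetic above can be written as a small plain-Python helper. This is only a sketch of the rule of thumb in these notes, not a Spark API: the function name size_executors and its parameters are hypothetical, and the 1 GB overhead allowance is an assumption chosen to reproduce the 20 GB figure.

```python
def size_executors(nodes, cores_per_node, mem_per_node_gb,
                   cores_per_executor=5, reserved_cores=1,
                   reserved_mem_gb=1, overhead_gb=1):
    """Apply the rule-of-thumb sizing from the notes (hypothetical helper)."""
    usable_cores = cores_per_node - reserved_cores
    executors_per_node = usable_cores // cores_per_executor
    # One executor slot is set aside for application management.
    total_executors = executors_per_node * nodes - 1
    mem_per_executor = (mem_per_node_gb - reserved_mem_gb) // executors_per_node
    return {
        "num_executors": total_executors,
        "executor_cores": cores_per_executor,
        # Assumed allowance for memory overhead outside the executor heap.
        "executor_memory_gb": mem_per_executor - overhead_gb,
    }

config = size_executors(nodes=5, cores_per_node=16, mem_per_node_gb=64)
print(config)
```

With the 5-node example, the helper returns 14 executors with 5 cores and 20 GB each, matching the configuration line above.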
Spark Files¶
- You can add files to your Spark job using the SparkContext.addFile() method.
- This method registers the file with the driver, and Spark then makes it available for download on every node that runs tasks for the job.
- Once a file has been added using addFile(), it can be accessed from worker nodes using the SparkFiles.get() method.
- This method returns the file's local path on each node, enabling your Spark tasks to read it as though it were a local file.
- This is particularly useful when running Spark on a cluster, as it ensures that your external files are available on all nodes, regardless of where in the cluster your tasks are executed.
- Common use cases for SparkFiles include distributing lookup tables, machine learning models, or any other files that your Spark jobs need to access.
Serializers¶
1. Default PickleSerializer:
- For general use cases where custom serialization is not needed.
2. MarshalSerializer:
- When working with simpler data types and needing slightly better performance than pickle.
3. KryoSerializer:
- For large datasets and when performance is critical.
- Kryo is often used in Spark applications dealing with large-scale data processing.
4. Custom Serializers:
- In more advanced scenarios, you might even need to implement custom serializers for specific use cases or optimize serialization for custom objects.
- However, this is usually only necessary in very specific or high-performance scenarios.
from pyspark import SparkConf, SparkContext
# Kryo is configured on the JVM side; the setting must be in place before the SparkContext is created
conf = SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sc = SparkContext(conf=conf)
or
from pyspark import SparkContext
from pyspark.serializers import MarshalSerializer
sc = SparkContext("local", "Marshal Serialization", serializer=MarshalSerializer())
PySpark RDD¶
- PySpark RDDs have the following advantages:
1. In-Memory Processing:
- PySpark’s RDD loads data from disk into memory.
- RDDs can also be persisted in memory so that computations can be reused.
2. Immutability:
- RDDs are immutable, which means that once created, they cannot be modified.
- Applying any transformation to an RDD creates a new RDD.
3. Fault Tolerance:
- RDDs are fault-tolerant.
- Whenever a partition is lost, it is automatically recomputed from the RDD's lineage (the recorded chain of transformations).
- This results in seamless execution of PySpark applications.
4. Lazy Evaluation:
- Transformations are not performed as soon as they are encountered.
- They are recorded in the DAG and evaluated only when the first RDD action is reached.
5. Partitioning:
- Whenever an RDD is created from data, its elements are by default partitioned across the available cores.
Shared Variables (Broadcast, Accumulators)¶
- Normally, when you run a Spark operation, each node in the cluster works on its own copy of variables.
- However, for certain tasks, you might want to share some state or data across tasks or nodes.
- PySpark provides two types of shared variables: broadcast variables and accumulators.
1. Broadcast Variables
- Broadcast variables are used to save a copy of a large value (like a lookup table) in every node's memory, instead of shipping a copy of it with tasks.
- They are useful when tasks across multiple stages need the same data.
- They should be used when the same data is needed by all or many of the tasks.
- They help in reducing network I/O and memory usage.
2. Accumulators
- Accumulators are used for aggregating information across tasks.
- They are write-only variables for the executors (nodes processing the data) and readable only by the driver program.
- They are typically used for counters and sums.
- Useful for aggregating data from worker nodes to the driver node.
- Broadcast Variable
from pyspark import SparkContext

sc = SparkContext.getOrCreate()  # reuse the active context; only one SparkContext can run at a time
large_lookup_table = {"key1": "value1", "key2": "value2"}  # stands in for a large dataset
broadcast_var = sc.broadcast(large_lookup_table)

# Accessing the broadcast variable in an operation
rdd = sc.parallelize([1, 2])
result = rdd.map(lambda x: (x, broadcast_var.value.get(f"key{x}", None))).collect()
print(result)  # Output will use values from the broadcasted variable
- Accumulator Variable
from pyspark import SparkContext

sc = SparkContext.getOrCreate()  # reuse the active context
accum = sc.accumulator(0)
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd.foreach(lambda x: accum.add(x))

# The value of accum is available in the driver
print(accum.value)  # Output will be the sum of the RDD elements
Create an RDD¶
rdd = sc.parallelize([1,2,3,4])
rdd.collect()
[1, 2, 3, 4]
Create an RDD from text file¶
rdd = sc.textFile(r"C:\Users\uif52518\Desktop\interview\data\sample1.txt")
Word Count Program¶
rdd = rdd.flatMap(lambda line: line.split(" "))
print('flatmap: ', rdd.collect())
rdd = rdd.map(lambda word: (word, 1))
print('\nmap: ', rdd.collect())
rdd = rdd.reduceByKey(lambda a, b: a+b)
print('\nreduceByKey: ', rdd.collect())
flatmap: ['Hello', 'world', 'PySpark', 'is', 'awesome', 'Apache', 'Spark', 'is', 'fast', 'Spark', 'with', 'Python', 'is', 'great', 'Learning', 'Spark', 'is', 'fun']

map: [('Hello', 1), ('world', 1), ('PySpark', 1), ('is', 1), ('awesome', 1), ('Apache', 1), ('Spark', 1), ('is', 1), ('fast', 1), ('Spark', 1), ('with', 1), ('Python', 1), ('is', 1), ('great', 1), ('Learning', 1), ('Spark', 1), ('is', 1), ('fun', 1)]

reduceByKey: [('Hello', 1), ('world', 1), ('PySpark', 1), ('is', 4), ('awesome', 1), ('Apache', 1), ('Spark', 3), ('Python', 1), ('fast', 1), ('with', 1), ('great', 1), ('Learning', 1), ('fun', 1)]
Repartition and Coalesce¶
print(rdd.getNumPartitions())
print(rdd.glom().map(len).collect())
2
[8, 5]
rdd = rdd.repartition(4)
print(rdd.getNumPartitions())
print(rdd.glom().map(len).collect())
4
[0, 5, 0, 8]
rdd = rdd.coalesce(3)
print(rdd.getNumPartitions())
print(rdd.glom().map(len).collect())
3
[5, 8, 0]
Create DataFrame¶
person_data = [[1,'Wang','Allen'],
[2,'Alice','Bob']]
person_cols = ['personId', 'lastName', 'firstName']
add_data = [[1,2,'New York City', 'New York'],
[2,3,'Leetcode', 'California']]
add_cols = ['addressId', 'personId', 'city', 'state']
person_df = spark.createDataFrame(person_data, person_cols)
add_df = spark.createDataFrame(add_data, add_cols)
person_df.show()
+--------+--------+---------+
|personId|lastName|firstName|
+--------+--------+---------+
|       1|    Wang|    Allen|
|       2|   Alice|      Bob|
+--------+--------+---------+
add_df.show()
+---------+--------+-------------+----------+
|addressId|personId|         city|     state|
+---------+--------+-------------+----------+
|        1|       2|New York City|  New York|
|        2|       3|     Leetcode|California|
+---------+--------+-------------+----------+
Print Schema¶
person_df.printSchema()
Combine DataFrames¶
person_df.union(person_df).show()
+--------+--------+---------+
|personId|lastName|firstName|
+--------+--------+---------+
|       1|    Wang|    Allen|
|       2|   Alice|      Bob|
|       1|    Wang|    Allen|
|       2|   Alice|      Bob|
+--------+--------+---------+
Join DataFrames¶
res_sdf = person_df.join(add_df, how='left', on='personId')
res_sdf[['firstName', 'lastName', 'city', 'state']].show()
+---------+--------+-------------+--------+
|firstName|lastName|         city|   state|
+---------+--------+-------------+--------+
|    Allen|    Wang|         null|    null|
|      Bob|   Alice|New York City|New York|
+---------+--------+-------------+--------+
Group By¶
person_df.groupBy('firstName')  # returns a GroupedData object; apply an aggregation such as .count() to get a DataFrame
<pyspark.sql.group.GroupedData at 0x1bdcab623a0>
User Defined Functions¶
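from pyspark.sql.functions import udf
from pyspark.sql.types import LongType

def add_one(x):
    return x + 1

# Declare the return type explicitly; without it the UDF returns strings
add_one_udf = udf(add_one, LongType())
person_df.select('personId').withColumn('a', add_one_udf(person_df.personId)).show()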
+--------+---+
|personId|  a|
+--------+---+
|       1|  2|
|       2|  3|
+--------+---+
Window Function¶
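from pyspark.sql.window import Window
from pyspark.sql.functions import rank

# Hypothetical sample data; the original cell assumed an existing df with deptId and name columns
df = spark.createDataFrame([(1, 'Ann'), (1, 'Bob'), (2, 'Cid')], ['deptId', 'name'])
w = Window.partitionBy('deptId').orderBy('name')
df.withColumn('new', rank().over(w)).show()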
DateTime¶
from pyspark.sql.functions import col, to_date, to_timestamp, date_format,\
dayofweek, year, month, hour, minute, second
data = [("2021-01-01 10:15:30",), ("2021-06-24 12:01:19",)]
df = spark.createDataFrame(data, ["datetime_str"])
df.show()
df = df.withColumn("date", to_date(col("datetime_str"), "yyyy-MM-dd HH:mm:ss"))
df = df.withColumn("timestamp", to_timestamp(col("datetime_str"), "yyyy-MM-dd HH:mm:ss"))
df.show()
df = df.withColumn("year", year(col("timestamp")))
df = df.withColumn("month", month(col("timestamp")))
df = df.withColumn("day_of_week", dayofweek(col("timestamp")))  # 1 = Sunday, ..., 7 = Saturday
df.show()
df = df.withColumn("formatted_date", date_format(col("timestamp"), "yyyy/MM/dd HH:mm"))
df.show()
PySpark provides a wide range of functions to work with dates and times,
such as add_months, date_add, date_sub, datediff, current_date, current_timestamp,
and many others to perform various date and time-related operations.
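from pyspark.sql.functions import add_months, date_add, date_sub, datediff, current_date, current_timestamp
# The cells below assume df has date columns start_date and end_date
# Add 3 months to the start_date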
df = df.withColumn("start_plus_3_months", add_months(col("start_date"), 3))
# Subtract 1 month from the end_date
df = df.withColumn("end_minus_1_month", add_months(col("end_date"), -1))
df.show()
# Add 10 days to start_date
df = df.withColumn("start_plus_10_days", date_add(col("start_date"), 10))
# Subtract 5 days from end_date
df = df.withColumn("end_minus_5_days", date_sub(col("end_date"), 5))
df.show()
# Difference in days between start_date and end_date
df = df.withColumn("days_diff", datediff(col("end_date"), col("start_date")))
df.show()
df = df.withColumn("current_date", current_date())
df = df.withColumn("current_timestamp", current_timestamp())
df.show()
Datetime sequence¶
from pyspark.sql import SparkSession
from pyspark.sql.functions import sequence, to_date, col, explode, expr
from pyspark.sql.types import DateType
spark = SparkSession.builder.appName("sequence_example").getOrCreate()
df = spark.createDataFrame([("2022-01-01", "2022-01-05")], ["start_date", "end_date"])
df
DataFrame[start_date: string, end_date: string]
# Convert to date type
df = df.withColumn("start_date", to_date(col("start_date"), "yyyy-MM-dd"))
df = df.withColumn("end_date", to_date(col("end_date"), "yyyy-MM-dd"))
# df = df.withColumn("start_date", to_date(col("start_date")).cast(DateType()))
# df = df.withColumn("end_date", to_date(col("end_date")).cast(DateType()))
# Generate sequence of dates
df = df.withColumn("date_sequence", sequence(col("start_date"), col("end_date"), expr("interval 1 day")))
df.show()
+----------+----------+--------------------+
|start_date|  end_date|       date_sequence|
+----------+----------+--------------------+
|2022-01-01|2022-01-05|[2022-01-01, 2022...|
+----------+----------+--------------------+
# Explode the sequence to have one date per row
df = df.withColumn("date", explode(col("date_sequence")))
df.select("date").show()
+----------+
|      date|
+----------+
|2022-01-01|
|2022-01-02|
|2022-01-03|
|2022-01-04|
|2022-01-05|
+----------+
176. Second Highest Salary¶
'''
+----+--------+
| Id | Salary |
+----+--------+
| 1 | 100 |
| 2 | 200 |
| 3 | 300 |
+----+--------+
'''
emp_data = [[1,100],
[2,200],
[3,300]]
emp_cols = ['Id','Salary']
emp_sdf = spark.createDataFrame(emp_data, emp_cols)
w = Window.orderBy(emp_sdf.Salary.desc())
ds = emp_sdf.withColumn('rank', F.rank().over(w))
ds.filter(ds.rank==2)[['Salary']].withColumnRenamed('Salary', 'secondMaxSalary').show()
+---------------+
|secondMaxSalary|
+---------------+
|            200|
+---------------+
177. Nth Highest Salary¶
emp_data = [[1,100],
[2,200],
[3,300]]
emp_cols = ['Id','Salary']
emp_sdf = spark.createDataFrame(emp_data, emp_cols)
def nth_salary(n, emp_sdf):
    w = Window.orderBy(emp_sdf.Salary.desc())
    ds = emp_sdf.withColumn('dense_rank', F.dense_rank().over(w))
    ds.filter(ds.dense_rank==n)[['Salary']].withColumnRenamed('Salary', 'MaxSalary('+str(n)+')').show()
nth_salary(3, emp_sdf)
+------------+
|MaxSalary(3)|
+------------+
|         100|
+------------+
178. Rank Scores¶
+----+-------+
| Id | Score |
+----+-------+
| 1 | 3.50 |
| 2 | 3.65 |
| 3 | 4.00 |
| 4 | 3.85 |
| 5 | 4.00 |
| 6 | 3.65 |
+----+-------+
score_data = [[1, 3.50],
[2, 3.65],
[3, 4.00],
[4, 3.85],
[5, 4.00],
[6, 3.65]]
score_cols = ['Id','Score']
df = spark.createDataFrame(score_data, score_cols)
df
DataFrame[Id: bigint, Score: double]
w = Window.orderBy(df.Score.desc())
df = df.withColumn('rank', F.dense_rank().over(w))
df[['Score', 'rank']].show()
+-----+----+
|Score|rank|
+-----+----+
|  4.0|   1|
|  4.0|   1|
| 3.85|   2|
| 3.65|   3|
| 3.65|   3|
|  3.5|   4|
+-----+----+
180. Consecutive Numbers¶
# Write a SQL query to find all numbers that appear at least three times consecutively
+----+-----+
| Id | Num |
+----+-----+
| 1 | 1 |
| 2 | 1 |
| 3 | 1 |
| 4 | 2 |
| 5 | 1 |
| 6 | 2 |
| 7 | 2 |
+----+-----+
nums_data = [[1,1],
[2,1],
[3,1],
[4,2],
[5,1],
[6,2],
[7,2]]
nums_col = ['Id','Num']
sdf = spark.createDataFrame(nums_data, nums_col)
w = Window.orderBy(sdf.Id)
df = sdf.withColumn('next', F.lead(sdf.Num).over(w))
df = df.withColumn('prev', F.lag(df.Num).over(w))  # build on df so the 'next' column is kept
df.show()
+---+---+----+----+
| Id|Num|next|prev|
+---+---+----+----+
|  1|  1|   1|null|
|  2|  1|   1|   1|
|  3|  1|   2|   1|
|  4|  2|   1|   1|
|  5|  1|   2|   2|
|  6|  2|   2|   1|
|  7|  2|null|   2|
+---+---+----+----+
# A number is consecutive when it equals both its previous and its next value
df = df[(df['Num']==df['next']) & (df['Num']==df['prev'])][['Num']]
df.withColumnRenamed('Num','ConsecutiveNums').show()
+---------------+
|ConsecutiveNums|
+---------------+
|              1|
+---------------+
181. Employees Earning More Than Their Managers¶
emp_data = [[1, 'Joe', 70000, 3 ],
[2, 'Henry', 80000, 4],
[3, 'Sam', 60000, None],
[4, 'Max', 90000, None]]
emp_cols = ['Id', 'Name', 'Salary', 'ManagerId']
df = spark.createDataFrame(emp_data, emp_cols)
# e1 is the employee, e2 is that employee's manager
joined_sdf = df.alias('e1').join(df.alias('e2'), col('e1.ManagerId')==col('e2.Id'), how='inner')
joined_sdf.where('e1.salary>e2.salary').select('e1.name').show()
+----+
|name|
+----+
| Joe|
+----+
182. Duplicate Emails¶
email_data = [[1, 'a@b.com'],
[2, 'c@d.com'],
[3, 'a@b.com']]
email_cols = ['Id', 'Email']
df = spark.createDataFrame(email_data, email_cols)
df.groupBy('Email').count().alias('df').where('df.count>1').select('df.email').show()
+-------+
|  email|
+-------+
|a@b.com|
+-------+
from pyspark.ml.regression import RandomForestRegressor