Wednesday, June 5, 2024

Databricks — Returning Customers within 7 Days in a PySpark DataFrame

PySpark is the Python API for Apache Spark, an analytical processing engine for large-scale distributed data processing and machine learning applications.

If you work as a PySpark developer, data engineer, data analyst, or data scientist in any organization, you need to be familiar with DataFrames, because data manipulation is the act of transforming, cleansing, and organizing raw data into a format that can be used for analysis and decision making.

For this tutorial, we are using Databricks Community Edition, a free, web-based service from Databricks.





Requested Task — You are given a transaction dataset and need to identify returning active customers:
    • Only the returning active customers (users)
    • Who made another transaction within 7 days of their first purchase.

Step 1: Import all the necessary libraries in our code as given below —

#import all the libraries of pyspark.sql
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from datetime import datetime

Step 2: Create or get the SparkSession — initialize the Spark session as given below —

#create or get SparkSession
spark = SparkSession.builder.appName("ReturningCustomers").getOrCreate()

Step 3: Build Sample Transaction data and Schema of the data —

### Transactional Sample data
data = [
    (1, 101, datetime.strptime('2024-05-01 10:00:00', '%Y-%m-%d %H:%M:%S'), 200.00),
    (2, 101, datetime.strptime('2024-05-05 14:30:00', '%Y-%m-%d %H:%M:%S'), 300.00),
    (3, 102, datetime.strptime('2024-05-01 12:00:00', '%Y-%m-%d %H:%M:%S'), 220.00),
    (4, 103, datetime.strptime('2024-05-01 15:00:00', '%Y-%m-%d %H:%M:%S'), 330.00),
    (5, 101, datetime.strptime('2024-05-12 10:00:00', '%Y-%m-%d %H:%M:%S'), 400.00),
    (6, 102, datetime.strptime('2024-05-10 10:00:00', '%Y-%m-%d %H:%M:%S'), 180.00),
    (7, 103, datetime.strptime('2024-05-08 16:00:00', '%Y-%m-%d %H:%M:%S'), 120.00),
    (8, 104, datetime.strptime('2024-05-01 17:00:00', '%Y-%m-%d %H:%M:%S'), 210.00),
    (9, 104, datetime.strptime('2024-05-08 18:00:00', '%Y-%m-%d %H:%M:%S'), 205.00),
    (10, 105, datetime.strptime('2024-05-10 11:00:00', '%Y-%m-%d %H:%M:%S'), 105.00),
    (11, 101, datetime.strptime('2024-05-09 11:00:00', '%Y-%m-%d %H:%M:%S'), 200.00)
]

##### Schema of the transactional data
data_schema = StructType([
    StructField("TransactionID", IntegerType(), True),
    StructField("UserID", IntegerType(), True),
    StructField("TransactionDate", TimestampType(), True),
    StructField("Amount", FloatType(), True)
])

Step 4: Load sample data into PySpark dataframe and display the dataset as given below —

### load sample data into dataframe
df = spark.createDataFrame(data=data, schema=data_schema)

#### Display the dataframe (display() works in Databricks notebooks; use df.show() outside Databricks)
df.display()

Step 5: Print the schema of the dataframe as given below —

##### print the schema of the dataframe 
df.printSchema()
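
For the schema defined in Step 3, the printed schema should look like this:

root
 |-- TransactionID: integer (nullable = true)
 |-- UserID: integer (nullable = true)
 |-- TransactionDate: timestamp (nullable = true)
 |-- Amount: float (nullable = true)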

In PySpark, a window specification allows you to define a window of rows around the current row. unboundedPreceding and unboundedFollowing are special window-frame boundaries that let you create frames covering all rows from the start of the partition up to the current row, or from the current row to the end of the partition, respectively (a short sketch contrasting the two follows the bullets below).

* unboundedPreceding: includes all rows from the beginning of the partition up to the current row.

* unboundedFollowing: includes all rows from the current row to the end of the partition.
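
As an illustration of the two boundaries, here is a minimal sketch, assuming the df built in Step 4; running_total and user_total are column names introduced only for this example.

# Sketch: contrast a running frame with a whole-partition frame
from pyspark.sql.functions import col, sum as sum_
from pyspark.sql.window import Window

# From the start of the partition up to the current row -> cumulative amount per user
running_frame = (Window.partitionBy("UserID")
                 .orderBy("TransactionDate")
                 .rowsBetween(Window.unboundedPreceding, Window.currentRow))

# The whole partition -> the user's total amount repeated on every row
full_frame = (Window.partitionBy("UserID")
              .orderBy("TransactionDate")
              .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

frame_demo = (df.withColumn("running_total", sum_(col("Amount")).over(running_frame))
                .withColumn("user_total", sum_(col("Amount")).over(full_frame)))
frame_demo.show()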

Step 6: Use of window functions — the window specification partitions the data by UserID and orders it by TransactionDate, as given below —

# Define the window specification
specification_window = (Window.partitionBy("UserID")
                        .orderBy(asc("TransactionDate"))
                        .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

Step 7: Create a dictionary of columns — we create three columns:

  • FirstTransDate: the first TransactionDate over the window specification,
  • SecTransDate: the second TransactionDate, taken with nth_value over the window specification,
  • date_diff: the number of days between the first and second transaction dates.

Note that date_diff is built from the same window expressions rather than from col("SecTransDate"), because the new columns do not yet exist on the dataframe when the expressions are resolved.

# Window expressions for the first and second transaction dates
first_trans = first("TransactionDate").over(specification_window)
sec_trans = nth_value("TransactionDate", 2).over(specification_window)

# Dictionary of columns
purchase_columns = {"FirstTransDate": first_trans,
                    "SecTransDate": sec_trans,
                    "date_diff": datediff(sec_trans, first_trans)}

### Print
purchase_columns
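
As a side note, the same "second transaction" lookup can be sketched with lead() instead of nth_value(): over an ordered window, lead() returns the next row's value, so on the row of the first transaction it holds the second transaction. This is only an alternative sketch, not part of the main flow; ordered_window, NextTransDate, rn, and returning_alt are names introduced here for illustration.

# Alternative sketch using lead() and row_number()
from pyspark.sql.functions import lead, row_number, datediff, col, asc
from pyspark.sql.window import Window

# Ordered window without an explicit frame (lead/row_number use the default frame)
ordered_window = Window.partitionBy("UserID").orderBy(asc("TransactionDate"))

returning_alt = (df.withColumn("NextTransDate", lead("TransactionDate").over(ordered_window))
                   .withColumn("rn", row_number().over(ordered_window))
                   .filter((col("rn") == 1) &
                           (datediff(col("NextTransDate"), col("TransactionDate")) <= 7)))

returning_alt.select("UserID", "TransactionDate", "NextTransDate").show()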

Step 8: Add the new columns to the existing dataframe as given below —

# Add the FirstTransDate, SecTransDate, and date_diff columns
df = df.withColumns(purchase_columns)

#display records
df.show()

Step 9: Filter based on the date difference — apply the 7-day logic to each customer's rows as given below —

#filter based on the date_difference
df = df.filter(col('date_diff') <= 7)

#display records
df.show()

Step 10: Show the distinct customers whose second transaction falls within 7 days of their first purchase, as given below —

# Display the distinct results
df.select("UserID","FirstTransDate","SecTransDate").distinct().display()
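
With the sample data above, only users 101, 103, and 104 qualify (user 102's second purchase comes 9 days after the first, and user 105 has a single transaction). Using .show() instead of .display(), the result should look roughly like this:

+------+-------------------+-------------------+
|UserID|     FirstTransDate|       SecTransDate|
+------+-------------------+-------------------+
|   101|2024-05-01 10:00:00|2024-05-05 14:30:00|
|   103|2024-05-01 15:00:00|2024-05-08 16:00:00|
|   104|2024-05-01 17:00:00|2024-05-08 18:00:00|
+------+-------------------+-------------------+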

Conclusion
In the above demo, you can see that there is no need to write row-by-row comparison logic to identify the customers who made a transaction within 7 days of their first purchase.

A few window functions (first and nth_value over a per-user window) combined with datediff are enough to express the whole requirement; a condensed sketch of the full flow is given below.
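
For reference, here is a condensed sketch of the whole flow from the steps above, assuming the data and data_schema defined in Step 3; the variable names w, first_trans, sec_trans, and returning are local to this sketch.

# Condensed version of the steps above (assumes `data` and `data_schema` from Step 3)
from pyspark.sql import SparkSession
from pyspark.sql.functions import first, nth_value, datediff, col, asc
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("ReturningCustomers").getOrCreate()
df = spark.createDataFrame(data=data, schema=data_schema)

w = (Window.partitionBy("UserID")
     .orderBy(asc("TransactionDate"))
     .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

first_trans = first("TransactionDate").over(w)
sec_trans = nth_value("TransactionDate", 2).over(w)

returning = (df.withColumn("FirstTransDate", first_trans)
               .withColumn("SecTransDate", sec_trans)
               .withColumn("date_diff", datediff(sec_trans, first_trans))
               .filter(col("date_diff") <= 7)
               .select("UserID", "FirstTransDate", "SecTransDate")
               .distinct())

returning.show()
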
To learn more, please follow us —

http://www.sql-datatools.com

To learn more, please visit our YouTube channel at — http://www.youtube.com/c/Sql-datatools

To learn more, please visit our Instagram account at — https://www.instagram.com/asp.mukesh/

To learn more, please visit our Twitter account at — https://twitter.com/macxima
