Wednesday, November 15, 2023

PySpark — Retrieve matching rows from two Dataframes

Data integrity refers to the quality, consistency, and reliability of data throughout its life cycle. Data engineering pipelines are methods and structures that collect, transform, store, and analyse data from many sources.

If you are working as a PySpark developer, data engineer, data analyst, or data scientist for any organisation, you need to be familiar with dataframes, because data manipulation is the act of transforming, cleansing, and organising raw data into a format that can be used for analysis and decision making.


For example, you have some users’ data in dataframe-1 and new users’ data in dataframe-2, and you must find all the matching records between dataframe-2 and dataframe-1. In PySpark, you can retrieve matching rows from two dataframes using the join operation. The join operation combines rows from two dataframes based on a common column.

# importing SparkSession from the pyspark.sql module
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create a Spark session and give it an app name
spark = SparkSession.builder.appName("UpdateMutliColumns").getOrCreate()

Dataset 1: In this dataset, we have three columns, Name, Age, and Occupation, and a pre-defined schema for our PySpark dataframe as given below — 

# Sample data for DataFrame1
dataset1 = [("Ryan Arjun", 25, "Engineer"),
            ("Kimmy Wang", 30, "Data Scientist"),
            ("Saurabh Yadav", 22, "Analyst")]

# Define the schema for DataFrame1
ds_schema1 = ["Name", "Age", "Occupation"]

PySpark Dataframe 1 from dataset 1 — In PySpark, we call the existing pre-defined createDataFrame function, which takes two parameters (data and schema), passing in the above dataset1 and ds_schema1 as given below -  


# Create DataFrames
df1 = spark.createDataFrame(dataset1, schema=ds_schema1)

### show the schema of the dataframe
df1.printSchema()
# Show the original DataFrames
print("DataFrame 1:")
df1.show()



Dataset 2: In this dataset, we have three columns, Name, Gender, and Country, and a pre-defined schema for our PySpark dataframe as given below —

# Sample data for DataFrame2
dataset2 = [("Ryan Arjun", "Male", "Indian"),
            ("Kimmy Wang", "Female", "Japan"),
            ("Lovish Singh", "Male", "China")]

# Define the schema for DataFrame2
ds_schema2 = ["Name", "Gender", "Country"]

 

PySpark Dataframe 2 from dataset 2 — In PySpark, we again call the pre-defined createDataFrame function, which takes two parameters (data and schema), passing in the above dataset2 and ds_schema2 as given below -

# Create DataFrame for the second dataset
df2 = spark.createDataFrame(dataset2, schema=ds_schema2)

### show the schema of the dataframe
df2.printSchema()

# Show the original DataFrame
print("DataFrame 2:")
df2.show()


Get matching records from both dataframes — In this example, df1.join(df2, "Name", "inner") performs an inner join based on the "Name" column. The resulting dataframe, joined_df, contains only the rows whose "Name" value appears in both dataframes, as given below —

# Join DataFrames based on the "Name" column
joined_df = df1.join(df2, "Name", "inner")
### show the schema of the dataframe
joined_df.printSchema()
# Show the joined DataFrame
print("DataFrame with Matching rows:")
joined_df.show()



Note: You can adjust the join type (inner, left, right, full) based on your specific requirements. Additionally, if the column names are different in the two Dataframes, you can specify the join condition explicitly using the on parameter. You can adjust the join condition based on your specific use case and column names.
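For instance, if the key column had a different name in the second dataframe (say UserName, a hypothetical rename used only for illustration), a minimal sketch of an explicit join condition would look like this —

# Hypothetical case: the key column is named differently in the second dataframe
df2_renamed = df2.withColumnRenamed("Name", "UserName")

# Specify the join condition explicitly via the on parameter
joined_explicit = df1.join(df2_renamed,
                           on=df1["Name"] == df2_renamed["UserName"],
                           how="inner")

# Drop the duplicate key column from the right side and show the result
joined_explicit.drop("UserName").show()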

 

Now, you can see that it is just a piece of cake to get the matching records from both dataframes based on your matching keys.

Let's learn more about the data validation side, which is the most important part of data engineering.

Data validation — Data validation is the process of checking data against predefined rules and standards, such as data types, formats, ranges, and constraints. Common checks include:

  1. 💫Schema Validation: Verify data adherence to predefined schemas, checking types, formats, and structures.
  2. 💫Integrity Constraints: Enforce rules and constraints to maintain data integrity, preventing inconsistencies.
  3. 💫Cross-Field Validation: Validate relationships and dependencies between different fields to ensure logical coherence.
  4. 💫Data Quality Metrics: Define and track quality metrics, such as completeness, accuracy, and consistency.
  5. 💫Automated Validation Scripts: Develop and run automated scripts to check data against predefined rules and criteria.
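
As a quick illustration, here is a minimal sketch of a few of these checks in PySpark, using the df1 dataframe from above and validation rules chosen purely for demonstration —

from pyspark.sql.functions import col, count, when

# Schema validation: confirm the dataframe exposes the expected columns
expected_columns = {"Name", "Age", "Occupation"}
assert expected_columns.issubset(set(df1.columns)), "Schema mismatch"

# Integrity constraint: Age must fall within a sensible range (demo rule)
invalid_age = df1.filter((col("Age") < 0) | (col("Age") > 120)).count()
print(f"Rows with invalid Age: {invalid_age}")

# Completeness metric: count the null values in every column
df1.select([count(when(col(c).isNull(), c)).alias(c) for c in df1.columns]).show()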

 

To learn more, please follow us -
🔊 http://www.sql-datatools.com

To Learn more, please visit our YouTube channel at —
🔊 http://www.youtube.com/c/Sql-datatools

To Learn more, please visit our Instagram account at -
🔊 https://www.instagram.com/asp.mukesh/

To Learn more, please visit our twitter account at -
🔊 https://twitter.com/macxima

Tuesday, November 14, 2023

PySpark — Update multiple columns in a dataframe

Working as a PySpark developer, data engineer, data analyst, or data scientist for any organisation requires you to be familiar with dataframes because data manipulation is the act of transforming, cleansing, and organising raw data into a format that can be used for analysis and decision making.

 

Note: We are using the Databricks environment to articulate this example.


We understand that we can add a column to a dataframe and update its values with the values returned from a function or from another dataframe column, as given below -

## importing SparkSession from the pyspark.sql module
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Create a Spark session and give it an app name
spark = SparkSession.builder.appName("UpdateMutliColumns").getOrCreate()

When you have data in a Python list in PySpark, it means you have a collection of data in the PySpark driver. This collection will be parallelized when you construct a DataFrame. Here, we have 5 elements in a list, so let's convert it to a DataFrame as given below —

### Create a list of data
MyData = [("Finance", 2), ("Marketing", 4), ("Sales", 6), ("IT", 8), ("Admin", 9)]

#### convert the above list to a DataFrame
sdf = spark.createDataFrame(data=MyData, schema=['Dept', 'Code'])

### show the schema of the dataframe
sdf.printSchema()

## Show the DataFrame
sdf.show(10, False)


One of the most important features of Spark SQL & DataFrame is the PySpark UDF (User Defined Function), which is used to extend PySpark's built-in capabilities.

 

Note — UDFs are the costliest operations; therefore, use them only when you have no other option and when absolutely necessary. In the next part, I will explain in detail why utilising UDFs is a costly activity.
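
For comparison, here is a minimal sketch of the cheaper alternative: the same squares and cubes can be produced with the built-in pow function, which runs inside the JVM and avoids the Python round-trip that makes UDFs expensive —

from pyspark.sql.functions import pow, col

# Built-in column functions need no Python serialisation
sdf_builtin = sdf.withColumn("Square", pow(col("Code"), 2)) \
                 .withColumn("Cube", pow(col("Code"), 3))
sdf_builtin.show(10, False)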

 

User-defined scalar functions — Python: This section covers examples of Python user-defined functions (UDFs). It demonstrates how to register and invoke UDFs.

 

# create a square() function that returns a single value
# the passed variable is x
def square(x):
    return x * x

# create a Cube() function that returns a single value
# the passed variable is x
def Cube(x):
    return x * x * x

 

Register a function as a UDF — In PySpark, you can register custom UDFs with the Spark session as given below -

 

## Register the functions as UDFs (User-Defined Functions)
spark.udf.register("square_udf", square)
spark.udf.register("Cube_udf", Cube)

 

Add and update multiple columns in a dataframe — If you want to update multiple columns in a dataframe, make sure those columns are present in it. If the columns to update are not in your dataframe, you must create them as given below — 

 

### Add new Square and Cube columns in the dataframe using the registered UDFs
from pyspark.sql.functions import expr

sdf2 = sdf.withColumn("Square", expr("square_udf(Code)"))
sdf2 = sdf2.withColumn("Cube", expr("Cube_udf(Code)"))

## Show the DataFrame
sdf2.show(10, False)



Based on the official documentation, withColumn returns a new DataFrame by adding a column or replacing the existing column that has the same name.

 

Now, the above example shows you how to update multiple columns inside your dataframe in PySpark. By using withColumn, you can only create or modify one column at a time.
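
If you are on Spark 3.3 or later (an assumption about your runtime), withColumns accepts a dictionary and adds several columns in a single call; on older versions, a single select achieves the same thing. A minimal sketch, assuming the square_udf and Cube_udf registrations from above —

from pyspark.sql.functions import expr

# Spark 3.3+: add both columns in one call
sdf3 = sdf.withColumns({"Square": expr("square_udf(Code)"),
                        "Cube": expr("Cube_udf(Code)")})

# Equivalent on older Spark versions using a single select
sdf3 = sdf.select("*",
                  expr("square_udf(Code)").alias("Square"),
                  expr("Cube_udf(Code)").alias("Cube"))
sdf3.show(10, False)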

 

To learn more, please follow us -
http://www.sql-datatools.com

To Learn more, please visit our YouTube channel at —
http://www.youtube.com/c/Sql-datatools

To Learn more, please visit our Instagram account at -
https://www.instagram.com/asp.mukesh/

To Learn more, please visit our twitter account at -
https://twitter.com/macxima

To Learn more, please visit our medium account at -
https://macxima.medium.com/