Read data from Azure Blob Store and Data Lake using Azure Databricks
From Blob Store
In this article, we cover a very basic data import from Azure Blob Storage using Azure Databricks.
In this approach we use the mounting facility on Databricks: we mount our blob container as a volume on Databricks, and then we can run further queries against the mount.
storage_account_name = "your_storage_account_name"
storage_account_key = "your_storage_account_key"
container = "your_blob_container_name"

# Set the configuration details
spark.conf.set("fs.azure.account.key.{0}.blob.core.windows.net".format(storage_account_name), storage_account_key)

dbutils.fs.mount(
    source = "wasbs://{0}@{1}.blob.core.windows.net".format(container, storage_account_name),
    mount_point = "/mount_folder_name",
    extra_configs = {"fs.azure.account.key.{0}.blob.core.windows.net".format(storage_account_name): storage_account_key}
)
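As a quick sanity check, the wasbs source URL passed to dbutils.fs.mount above is plain string formatting. A minimal sketch, using the same placeholder names as above (wasbs_url is a hypothetical helper, not part of dbutils):

```python
def wasbs_url(container, storage_account):
    # Build the wasbs:// source URL used by dbutils.fs.mount
    return "wasbs://{0}@{1}.blob.core.windows.net".format(container, storage_account)

url = wasbs_url("your_blob_container_name", "your_storage_account_name")
# → "wasbs://your_blob_container_name@your_storage_account_name.blob.core.windows.net"
```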
Now that we have successfully mounted our blob storage as a volume, let's validate the files present on it:
display(dbutils.fs.ls("/mount_folder_name/your_sub_folder"))
Now let's run some queries on the volume:
import pyspark.sql.functions as psf
# Create conditions to filter DFs
target_string = "value"
target_array = ["value1", "value2"]

condition_statement = (
    (psf.col("Your_String_Column_1") == target_string) &
    (psf.col("Your_String_Column_2").isin(target_array))
)

df = (
    spark.read.option("header", True)
    .format("json")
    .load("/mount_folder_name/your_sub_folder/*.json.gz")
    .filter(condition_statement)
)
# You can change the format and options above to match the file format you need.
display(df)
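To see what the filter condition does without Spark, here is the same predicate applied to plain Python dicts. This is only an illustration with hypothetical sample values, not Spark code; the column names are the same placeholders as above:

```python
target_string = "value"
target_array = ["value1", "value2"]

# Sample rows standing in for the data frame (hypothetical values)
rows = [
    {"Your_String_Column_1": "value", "Your_String_Column_2": "value1"},
    {"Your_String_Column_1": "other", "Your_String_Column_2": "value1"},
    {"Your_String_Column_1": "value", "Your_String_Column_2": "value3"},
]

# Same logic as condition_statement: an equality test AND a membership test
matches = [
    row for row in rows
    if row["Your_String_Column_1"] == target_string
    and row["Your_String_Column_2"] in target_array
]
# Only the first row satisfies both conditions
```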
The data frame now contains the data from our blob store.
From Data Lake
Now let's ingest similar data from the Data Lake.
This time we go a bit more advanced: the data lake is partitioned by date, and we process only specific dates.
First, do some imports:
import datetime
import pandas
import pyspark.sql.functions as psf
from datetime import timedelta
from functools import reduce
from pyspark.sql import DataFrame
Let's set some configurations so that you can access your data lake:
spark.conf.set("dfs.adls.oauth2.access.token.provider.type", "ClientCredential")
spark.conf.set("dfs.adls.oauth2.client.id", "your_client_id")
spark.conf.set("dfs.adls.oauth2.credential", "your_client_credentials")
spark.conf.set("dfs.adls.oauth2.refresh.url", "https://login.microsoftonline.com/{your_tenant_id}/oauth2/token")
Let's prepare the date range:
start_date = datetime.date(2022, 9, 12)
end_date = datetime.date(2022, 9, 15)

target_daily_dates_range = pandas.date_range(start_date, end_date, freq='d').strftime("DateTimeSource_DT=%Y-%m-%d")
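The pandas call above produces one partition-folder name per day. The same list can be built with just the standard library; a minimal sketch (partition_paths is a hypothetical helper name):

```python
import datetime
from datetime import timedelta

def partition_paths(start, end, pattern="DateTimeSource_DT=%Y-%m-%d"):
    # One folder name per day, inclusive of both endpoints
    days = (end - start).days + 1
    return [(start + timedelta(days=i)).strftime(pattern) for i in range(days)]

paths = partition_paths(datetime.date(2022, 9, 12), datetime.date(2022, 9, 15))
# Four names, from DateTimeSource_DT=2022-09-12 to DateTimeSource_DT=2022-09-15
```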
Let's set the base path for the data lake:
data_root_folder = "adl://your_data_lake.azuredatalakestore.net/your_sub_folders/"
Now let's loop through the paths and build a list of data frames:
target_dates_dfs = []
for target_date in target_daily_dates_range:
    print(f"Loading {data_root_folder}{target_date}")
    target_file = f"{data_root_folder}{target_date}"
    temp_df = spark.read.format("delta").load(target_file)
    target_dates_dfs.append(temp_df)
Now that we have a collection of data frames, let's combine and display them:
target_dates_dfs_merged = reduce(DataFrame.union, target_dates_dfs)
display(target_dates_dfs_merged)
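The reduce(DataFrame.union, ...) call folds the list pairwise: union(union(df1, df2), df3), and so on. The same pattern on plain lists, as an illustration only (the values are hypothetical):

```python
from functools import reduce

# Each inner list stands in for one per-day data frame (hypothetical values)
daily_parts = [[1, 2], [3], [4, 5]]

# reduce applies the two-argument combiner left to right,
# the same way reduce(DataFrame.union, target_dates_dfs) folds the data frames
merged = reduce(lambda left, right: left + right, daily_parts)
# → [1, 2, 3, 4, 5]
```

Note that, like list concatenation, DataFrame.union matches columns by position, so every daily data frame must share the same schema.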
You can combine this with the filter statement from the blob storage steps if you want to filter the merged data as well.
Happy Coding… :)