Data processing with Azure Data Explorer (ADX), PostgreSQL DB and Azure Databricks
Recently we got a requirement to aggregate data from Azure Data Explorer (ADX) and insert it into a PostgreSQL database. Considering the volume, we wanted to do this with Azure Databricks. We also tried Azure Data Factory, which is very easy and convenient, but considering the future need for customization and flexibility, we leaned towards the Databricks offering. I am not going deep into my requirements or the logic; I am just sharing some snippets from the project that may help you with your own technical needs. Now let's explore them.
I hope you have a Databricks environment ready to start developing. You should also configure a Key Vault-backed secret scope, which will help you keep all secrets safe; the Databricks documentation describes how to set this up.
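Once the scope is configured, you can verify it from the notebook. A minimal sketch, assuming a Key Vault-backed scope matching the keyVault_Scope placeholder used below:
# list the secret scopes available to this workspace
print(dbutils.secrets.listScopes())
# list the secret names (values stay redacted) in our scope
print(dbutils.secrets.list("your_keyvault_scope"))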
Now let's start by installing the necessary packages (in a Databricks notebook, use the %pip magic):
%pip install azure-kusto-data azure-kusto-ingest psycopg2
Now let's do the imports, which we will be using in the subsequent steps below
from datetime import datetime as dt
from datetime import timedelta
from pandas import DataFrame
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder, ClientRequestProperties
import psycopg2
import psycopg2.extras
import pandas.io.sql as psql
Read Data From ADX
Now let's read some data from ADX. Use this method only when the query result is what you insert; don't use it when you have a large volume of data to ingest from ADX. This is a scheduled task that takes results on a periodic basis and inserts them into PostgreSQL. Provide all the necessary connection information along with your query, and the result will be available as a DataFrame at the end of the process.
# setting variables
adx_cluster = "your_adx_cluster_url"
keyVault_Scope = "your_keyvault_scope"
adx_spn_secret = "your_spn_secret"
adx_spn_client_id = "your_spn_clientId"
adx_db = "your_adx_db_name"
authority_id = "your_azure_ad_tenant_id"

# creating the kusto connection
kcsb = KustoConnectionStringBuilder.with_aad_application_key_authentication(adx_cluster, adx_spn_client_id, adx_spn_secret, authority_id)
kusto_client = KustoClient(kcsb)
print("ADX client configured")

# database query preparation
query = "your_adx_query"

# executing the query
resp = kusto_client.execute(adx_db, query)
ResFromADX = dataframe_from_result_table(resp.primary_results[0])
print("Count : " + str(len(ResFromADX)))
print(ResFromADX)
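The ClientRequestProperties import above comes in handy when a query runs long. Below is a minimal sketch, assuming you want to raise the server-side timeout; the option name comes from the azure-kusto-data SDK.
# raise the server-side timeout for a long-running query
props = ClientRequestProperties()
props.set_option(ClientRequestProperties.request_timeout_option_name, timedelta(minutes=10))
resp = kusto_client.execute(adx_db, query, props)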
Read Data from PostgreSQL DB
In this step we will read some data from PostgreSQL. In our case we do this to collect the dates from the destination table, i.e. the last-processed information; this helps us process only relevant data without repeating anything.
# setting variables
pg_pwd = "your_postgres_pwd"
pg_dbname = "your_postgres_db_name"
pg_uname = "your_postgres_db_user_name"
pg_host = "your_postgres_db_host_url"

# creating the postgres connection
pg_conn = psycopg2.connect(dbname=pg_dbname, user=pg_uname, host=pg_host, password=pg_pwd)
print("PG client configured")

# database query preparation
query = "your_postgres_query"

# executing the query
ResFromPg = psql.read_sql(query, pg_conn)
print("Count : " + str(len(ResFromPg)))
print(ResFromPg)
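To illustrate the kind of query used here, below is a minimal sketch, assuming a hypothetical destination table named processed_events that tracks a last-processed timestamp per key; your table and column names will differ.
# hypothetical query: fetch the last processed timestamp per key
query = """
SELECT your_key_name, MAX(event_time) AS last_processed
FROM processed_events
GROUP BY your_key_name
"""
ResFromPg = psql.read_sql(query, pg_conn)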
Joining Results from ADX and PostgreSQL
In this step we use pandas' merge capability to combine the two result sets above, which gives us the starting point for our queries. This might be irrelevant for your use case.
joinedRes = ResFromADX.merge(ResFromPg, on='your_key_name', how="left")
print("Count : " + str(len(joinedRes)))
print(joinedRes)
Function to Insert to PostgreSQL DB
Now we can define a function to insert into the PostgreSQL DB
def insertIntoPgTable(df):
    if len(df) > 0:
        df_columns = list(df)
        # create "col1","col2",...
        columns = "\",\"".join(df_columns)
        # create VALUES(%s, %s, ...) with one %s per column
        values = "VALUES({})".format(",".join(["%s" for _ in df_columns]))
        # create INSERT INTO table ("col1",...) VALUES(%s,...)
        insert_stmt = "INSERT INTO your_table_name (\"{}\") {}".format(columns, values)
        try:
            cur = pg_conn.cursor()
            psycopg2.extras.execute_batch(cur, insert_stmt, df.values)
            pg_conn.commit()
            print("Insertion to Postgres completed")
            cur.close()
        except (Exception, psycopg2.DatabaseError) as error:
            print("Error: %s" % error)
            pg_conn.rollback()
            cur.close()
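For larger batches, psycopg2's execute_values usually performs better than execute_batch because it folds many rows into a single INSERT statement. A minimal sketch of the same insert, reusing the columns string built inside the function above:
# alternative: fold many rows into one INSERT with execute_values
insert_stmt = "INSERT INTO your_table_name (\"{}\") VALUES %s".format(columns)
cur = pg_conn.cursor()
psycopg2.extras.execute_values(cur, insert_stmt, [tuple(r) for r in df.values])
pg_conn.commit()
cur.close()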
Iterate with ADX result and Insert to PostgreSQL DB
Now we will loop through the joined result, prepare our ADX query for each row, and insert the results into PostgreSQL using the function defined above.
# looping through the joined result set
for index, row in joinedRes.iterrows():
    print("Processing " + str(row.ColumnName))
    # skip rows whose string column is missing or not a string
    if not isinstance(row.stringColumnName, str):
        continue
    # skip rows whose date column is NaT/NaN/None
    if (str(row.DateColumnName).lower() == 'nat' or
            str(row.DateColumnName).lower() == 'nan' or
            row.DateColumnName is None):
        continue
    query = "your_query_with_row_ColumnName"
    response = kusto_client.execute(adx_db, query)
    RepeatedResFromADX = dataframe_from_result_table(response.primary_results[0])
    if len(RepeatedResFromADX) > 0:
        insertIntoPgTable(RepeatedResFromADX)
Now we are good with all our processing snippets. Let's enhance them with some minor additions below.
Format Datetime to query ADX
Now let's format the datetime column so it can be appended to the ADX query.
dateTimeFormat = "%Y-%m-%d %H:%M:%S.%f"
dateTimeColumn.strftime(dateTimeFormat)
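For example, the formatted value can be embedded in a KQL time filter. A minimal sketch, assuming a hypothetical ADX table YourTable with a Timestamp column:
# hypothetical KQL filter built from the formatted datetime
formattedDate = dateTimeColumn.strftime(dateTimeFormat)
query = "YourTable | where Timestamp > datetime(" + formattedDate + ")"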
You can do datetime operations like below:
days_to_subtract = 5
dateTimeVariable = dt.utcnow() - timedelta(days=days_to_subtract)
Get the maximum value of a column from the ADX result
maxColumn = RepeatedResFromADX["ColumnName"]
maxColumnValue = maxColumn.max()
Get Secrets from Key Vault
You should move all the secrets used in the notebook to Azure Key Vault and consume them via the secrets utility:
dbutils.secrets.get(scope = keyVault_Scope, key = "your_key_name")
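For example, the plaintext placeholders from the earlier snippets can be replaced like this; the secret names below are hypothetical, so use whatever names you created in your Key Vault.
# hypothetical secret names; use the names defined in your Key Vault
adx_spn_secret = dbutils.secrets.get(scope = keyVault_Scope, key = "adx-spn-secret")
pg_pwd = dbutils.secrets.get(scope = keyVault_Scope, key = "postgres-password")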
Print logs in debug mode
If you want to print logs conditionally in debug mode, you can use the snippet below. Define the variable at the root or top level so the same setting applies across the notebook.
debugMode = True
print(your_obj_to_print) if debugMode else None
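If the pattern shows up in many places, a small helper (hypothetical, not part of the original notebook) keeps the call sites tidy:
# hypothetical helper wrapping the conditional print
def debug_print(obj):
    if debugMode:
        print(obj)

debug_print(joinedRes)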
Parameterizing the Notebook
Now let's parameterize the notebook, so that we can call it dynamically.
dbutils.widgets.text('your_parameter', '')
your_parameter_variable = dbutils.widgets.get('your_parameter')
Invoke notebook with parameters
Now that we have parameterized our notebook, let's call it from another notebook.
dbutils.notebook.run("your_notebook_name", 7200, {"your_parameter": value})
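dbutils.notebook.run returns whatever the called notebook passes to dbutils.notebook.exit, so the caller can act on the result:
# in the called notebook: return a status value to the caller
dbutils.notebook.exit("done")

# in the calling notebook: capture the returned value
result = dbutils.notebook.run("your_notebook_name", 7200, {"your_parameter": value})
print(result)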
Hope this helps you in your development.
Keep coding!