Data processing with Azure Data Explorer (ADX), PostgreSQL DB and Azure Databricks

Gibin Francis
Sep 12, 2022

Recently we got a requirement to aggregate data from Azure Data Explorer (ADX) and insert it into a PostgreSQL database. Considering the volume, we wanted to do this with Azure Databricks. We also tried Azure Data Factory, which is very easy and convenient, but considering future needs for customization and flexibility, we leaned towards the Databricks offering. I am not going deep into the requirements or the logic; I am just sharing some snippets that may help you with your own technical needs. Now let's walk through them.

I hope you have a Databricks environment ready to start developing. To keep all secrets safe, store them in Azure Key Vault and expose them to the notebook through a Key Vault-backed secret scope; the linked documentation shows how to set that up.

Now let's start by installing the necessary packages:

%pip install azure-kusto-data azure-kusto-ingest psycopg2

Now let's do the imports, which we will use in the subsequent steps:

from datetime import datetime as dt
from datetime import timedelta
from pandas import DataFrame
from azure.kusto.data.exceptions import KustoServiceError
from azure.kusto.data.helpers import dataframe_from_result_table
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder, ClientRequestProperties
import psycopg2
import psycopg2.extras
import pandas.io.sql as psql

Read Data From ADX

Now let's read some data from ADX. Use this approach only when the query result itself is what you insert; don't use it when you have a large volume of data to pull from ADX. In our case this is a scheduled task that takes the results on a periodic basis and inserts them into PostgreSQL. Provide the necessary connection details along with your query, and the result will be available as a DataFrame at the end of the process.

# setting variables
adx_cluster = "your_adx_cluster_url"
keyVault_Scope = "your_keyvault_scope"
adx_spn_secret = "your_spn_secret"
adx_spn_client_id = "your_spn_clientId"
adx_db = "your_adx_db_name"
authority_id = "your_azure_ad_tenant_id"
# creating kusto connection
kcsb = KustoConnectionStringBuilder.with_aad_application_key_authentication(adx_cluster, adx_spn_client_id, adx_spn_secret, authority_id)
kusto_client = KustoClient(kcsb)
print("ADX client Configured")
# database query preparation
query = "your_adx_query"
# executing query
resp = kusto_client.execute(adx_db, query)
ResFromADX = dataframe_from_result_table(resp.primary_results[0])

print("Count : " + str(len(ResFromADX)))
print(ResFromADX)
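
The ClientRequestProperties import above comes in handy when a query needs non-default behaviour, such as a longer server-side timeout for heavy aggregations. A minimal sketch, assuming a 10-minute timeout is appropriate for your query:

# optional: pass request properties with the query, e.g. a longer server timeout
# the 10-minute value is an assumption; tune it for your aggregation
properties = ClientRequestProperties()
properties.set_option(properties.request_timeout_option_name, timedelta(minutes=10))
resp = kusto_client.execute(adx_db, query, properties)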

Read Data from PostgreSQL DB

In this step we will read some data from PostgreSQL. In our case we collect the dates from the destination table to get the last-processed information, which lets us process only the relevant data without repeating anything.

# setting variables
pg_pwd = "your_postgres_pwd"
pg_dbname = "your_postgres_db_name"
pg_uname = "your_postgres_db_user_name"
pg_host = "your_postgres_db_host_url"
# creating postgres connection
pg_conn = psycopg2.connect(dbname=pg_dbname, user=pg_uname, host=pg_host, password=pg_pwd)
print("PG client Configured")
# database query preparation
query = "your_postgres_query"
# executing query
ResFromPg = psql.read_sql(query, pg_conn)

print("Count : " + str(len(ResFromPg)))
print(ResFromPg)
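
To give an idea of what your_postgres_query might look like in this scenario, here is a sketch that pulls the last processed timestamp per key from the destination table; the table and column names are made up, not the real schema:

# illustrative only: placeholder table and column names
query = """
    SELECT your_key_name, MAX(processed_time) AS last_processed_time
    FROM your_table_name
    GROUP BY your_key_name
"""
ResFromPg = psql.read_sql(query, pg_conn)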

Joining Results from ADX and PostgreSQL

In this step we use the DataFrame merge capability to join the two results above, which gives us the starting point for each query. This might be irrelevant for your use case.

joinedRes = ResFromADX.merge(ResFromPg, on='your_key_name', how="left")
print("Count : " + str(len(joinedRes)))
print(joinedRes)

Function to Insert to PostgreSQL DB

Now we can define a function to insert a DataFrame into the PostgreSQL DB.

def insertIntoPgTable(df):
    if len(df) > 0:
        df_columns = list(df)
        # create ("col1","col2",...)
        columns = "\",\"".join(df_columns)
        # create VALUES('%s', '%s',...) one '%s' per column
        values = "VALUES({})".format(",".join(["%s" for _ in df_columns]))

        # create INSERT INTO table (columns) VALUES('%s',...)
        insert_stmt = "INSERT INTO your_table_name (\"{}\") {}".format(columns, values)
        try:
            cur = pg_conn.cursor()
            psycopg2.extras.execute_batch(cur, insert_stmt, df.values)
            pg_conn.commit()
            print("Insertion to Postgres Completed")
            cur.close()
        except (Exception, psycopg2.DatabaseError) as error:
            print("Error: %s" % error)
            pg_conn.rollback()
            cur.close()

Iterate with ADX result and Insert to PostgreSQL DB

Now we will loop through the joined result, prepare the ADX query for each row, and insert the result into PostgreSQL using the function defined above.

# looping through the joined result set
for index, row in joinedRes.iterrows():

    print("Processing " + str(row.ColumnName))

    # skip rows where the string column is missing
    if not isinstance(row.stringColumnName, str):
        continue

    # skip rows where the date column is NaT/NaN/None
    if (str(row.DateColumnName).lower() == 'nat' or
            str(row.DateColumnName).lower() == 'nan' or
            row.DateColumnName is None):
        continue

    query = "your_query_with_row_ColumnName"
    response = kusto_client.execute(adx_db, query)
    RepeatedResFromADX = dataframe_from_result_table(response.primary_results[0])
    if len(RepeatedResFromADX) > 0:
        insertIntoPgTable(RepeatedResFromADX)

Now we are done with all the processing snippets. Let's enhance them with some minor additions below.

Format Datetime to query ADX

Now let's format the datetime column so it can be appended to the ADX query.

dateTimeFormat = "%Y-%m-%d %H:%M:%S.%f"
dateTimeColumn.strftime(dateTimeFormat)
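
For example, the formatted value can be dropped straight into a Kusto time filter. The table and column names below are placeholders, not from the real query:

# illustrative: embed the formatted timestamp in a Kusto time filter
# 'YourTable' and 'TimestampColumn' are placeholder names
startTime = dateTimeColumn.strftime(dateTimeFormat)
query = "YourTable | where TimestampColumn > datetime(" + startTime + ")"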

Do datetime operations like below:

days_to_subtract = 5
dateTimeVariable = dt.utcnow() - timedelta(days=days_to_subtract)

Get maximum value of column from ADX result

maxColumn = RepeatedResFromADX["ColumnName"]
maxColumnValue = maxColumn.max()

Get Secrets from Key Vault

You should move all the secrets hard-coded in the notebook to Azure Key Vault and consume them via the Databricks secrets utility:

dbutils.secrets.get(scope = keyVault_Scope, key = "your_key_name")
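
For example, the hard-coded placeholders used earlier could be replaced with lookups like these; the secret key names are assumptions, use whatever names you created in Key Vault:

# illustrative: the secret key names below are assumptions
adx_spn_secret = dbutils.secrets.get(scope = keyVault_Scope, key = "adx-spn-secret")
pg_pwd = dbutils.secrets.get(scope = keyVault_Scope, key = "postgres-password")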

Print logs on debug mode

If you want to print logs only in debug mode, you can use the snippet below. Define the variable at the root or top level so the same flag applies across the notebook.

debugMode = True
print(your_obj_to_print) if debugMode else None

Parameterizing the Notebook

Now let's parameterize the notebook so it can be called dynamically.

dbutils.widgets.text('your_parameter', '')
your_parameter_variable = dbutils.widgets.get('your_parameter')
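
Note that widgets always return strings, so cast them when you need another type. A small sketch with an assumed parameter name:

# illustrative: 'days_to_process' is an assumed parameter name
dbutils.widgets.text('days_to_process', '5')
days_to_process = int(dbutils.widgets.get('days_to_process'))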

Invoke notebook with parameters

Now that we have parameterized our notebook, let's call it from another notebook. The second argument is the timeout in seconds.

dbutils.notebook.run("your_notebook_name", 7200, {"your_parameter": value})
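
If the called notebook needs to return something, it can end with dbutils.notebook.exit and the caller receives that value as a string; a minimal sketch:

# in the called notebook: return a value to the caller
dbutils.notebook.exit("done")

# in the calling notebook: capture the returned value (always a string)
result = dbutils.notebook.run("your_notebook_name", 7200, {"your_parameter": value})
print(result)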

Hope this helps you in your development.

Keep Coding..
