ADF Data Flows: Databricks Notebook ETL vs. ADF Visual Data Flow ETL

The new (preview) Data Flows feature in Azure Data Factory lets you visually design, build, debug, and execute data transformations at scale on Spark by leveraging Azure Databricks clusters. You can then operationalize your data flows inside a general ADF pipeline with scheduling, triggers, monitoring, etc.

ADF Data Flows (preview) provides a visually oriented design paradigm meant for code-free data transformation. If you prefer to write code, you can instead use ADF pipeline activities to execute Databricks Notebooks, Python scripts, JARs, etc.

What I wanted to do in this post is demonstrate how to perform the same functions as a Databricks Notebook example using ADF Data Flows. In this demo, I’m going to use the European Football statistics sample from Kaggle (aka “Soccer stats”) that is used in the Databricks sample Notebook Analyze Games from European Soccer Leagues with Apache Spark and Databricks, and recreate its ETL data engineering steps with ADF. The completed JSON for this Data Flow is here.

Step 1:

Extract data from the source CSV into a DataFrame. In ADF, you will build a new Data Flow and use a Source transformation that points to that CSV file. Turn debug mode on so that you can see the same data profile stats and data preview that the Notebook demo illustrates.

From Databricks:

from pyspark.sql.types import *

# Define the events schema explicitly rather than inferring it from the CSV.
schema = (StructType().
          add("id_odsp", StringType()).add("id_event", StringType()).add("sort_order", IntegerType()).
          add("time", IntegerType()).add("text", StringType()).add("event_type", IntegerType()).
          add("event_type2", IntegerType()).add("side", IntegerType()).add("event_team", StringType()).
          add("opponent", StringType()).add("player", StringType()).add("player2", StringType()).
          add("player_in", StringType()).add("player_out", StringType()).add("shot_place", IntegerType()).
          add("shot_outcome", IntegerType()).add("is_goal", IntegerType()).add("location", IntegerType()).
          add("bodypart", IntegerType()).add("assist_method", IntegerType()).add("situation", IntegerType()).
          add("fast_break", IntegerType())
         )

# Read the events CSV with the explicit schema, trimming whitespace and treating 'NA' as null.
eventsDf = (spark.read.csv("/data/eu-soccer-events/input/events.csv", 
                         schema=schema, header=True, 
                         ignoreLeadingWhiteSpace=True, 
                         ignoreTrailingWhiteSpace=True,
                         nullValue='NA'))

# Replace nulls with sentinel values: 'NA' for strings, 99 for coded integer fields.
eventsDf = eventsDf.na.fill({'player': 'NA', 'event_team': 'NA', 'opponent': 'NA', 
                             'event_type': 99, 'event_type2': 99, 'shot_place': 99, 
                             'shot_outcome': 99, 'location': 99, 'bodypart': 99, 
                             'assist_method': 99, 'situation': 99})
display(eventsDf)

[Screenshot: data preview of eventsDf in the Databricks notebook]

From Kaggle:

[Screenshot: the European Soccer events dataset on Kaggle]

ADF Data Flows (Source Transform):

[Screenshot: Source transformation in the ADF Data Flow designer]

Step 2:

Transform the data by mapping codes to names. The Kaggle data uses codified fields which need to be mapped to readable names for analysis.

Databricks:


# mapKeyToVal is a UDF factory defined earlier in the sample notebook;
# each *Map dictionary holds the code-to-name mapping for one field.
eventsDf = (
  eventsDf.
  withColumn("event_type_str", mapKeyToVal(evtTypeMap)("event_type")).
  withColumn("event_type2_str", mapKeyToVal(evtTyp2Map)("event_type2")).
  withColumn("side_str", mapKeyToVal(sideMap)("side")).
  withColumn("shot_place_str", mapKeyToVal(shotPlaceMap)("shot_place")).
  withColumn("shot_outcome_str", mapKeyToVal(shotOutcomeMap)("shot_outcome")).
  withColumn("location_str", mapKeyToVal(locationMap)("location")).
  withColumn("bodypart_str", mapKeyToVal(bodyPartMap)("bodypart")).
  withColumn("assist_method_str", mapKeyToVal(assistMethodMap)("assist_method")).
  withColumn("situation_str", mapKeyToVal(situationMap)("situation"))
)

# gameInfDf (per-game info, including country_code) is loaded earlier in the notebook.
joinedDf = (
  eventsDf.join(gameInfDf, eventsDf.id_odsp == gameInfDf.id_odsp, 'inner').
  select(eventsDf.id_odsp, eventsDf.id_event, eventsDf.sort_order, eventsDf.time,
         eventsDf.event_type, eventsDf.event_type_str, eventsDf.event_type2, eventsDf.event_type2_str,
         eventsDf.side, eventsDf.side_str, eventsDf.event_team, eventsDf.opponent,
         eventsDf.player, eventsDf.player2, eventsDf.player_in, eventsDf.player_out,
         eventsDf.shot_place, eventsDf.shot_place_str, eventsDf.shot_outcome, eventsDf.shot_outcome_str,
         eventsDf.is_goal, eventsDf.location, eventsDf.location_str,
         eventsDf.bodypart, eventsDf.bodypart_str, eventsDf.assist_method, eventsDf.assist_method_str,
         eventsDf.situation, eventsDf.situation_str, gameInfDf.country_code)
)
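The mapKeyToVal helper used above isn’t shown in this excerpt; it is defined earlier in the sample notebook. A minimal sketch of such a UDF factory, assuming each map is a plain Python dict keyed by the integer codes:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def mapKeyToVal(mapping):
    # Build a UDF that looks up a column's value in the given dict,
    # returning null (None) for codes with no mapping.
    def mapKeyToVal_(col):
        return mapping.get(col)
    return udf(mapKeyToVal_, StringType())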

ADF: Use iif() or case() in the Expression Builder, or use a Lookup against mapping files. First, a Derived Column transformation with a case expression:

soccer4
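For comparison, the same kind of code-to-name mapping can be written in PySpark without a UDF, using chained when/otherwise expressions. A sketch for the side column; the 1 = Home / 2 = Away labels are illustrative assumptions, not values taken from the sample:

from pyspark.sql import functions as F

# Map the coded side column to a readable label;
# unmapped codes fall through to 'NA'.
eventsDf = eventsDf.withColumn(
    "side_str",
    F.when(F.col("side") == 1, "Home")
     .when(F.col("side") == 2, "Away")
     .otherwise("NA")
)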

Alternatively, use a Lookup transformation with a file of mapped values. In this case, we won’t hard-code the names in an expression. Instead, we’ll keep the mapped names in an external file and join it in via Lookup.

[Screenshot: Lookup transformation in the ADF Data Flow designer]

Step 3: 

Bucketize the data into 10 bins based on match time.

Databricks:


from pyspark.ml.feature import QuantileDiscretizer

# Split match time into 10 roughly equal-frequency buckets.
joinedDf = QuantileDiscretizer(numBuckets=10, inputCol="time", outputCol="time_bin").fit(joinedDf).transform(joinedDf)

display(joinedDf)

ADF Data Flow: Window transformation using nTile(10)

[Screenshot: Window transformation with nTile(10) in the ADF Data Flow designer]
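The Window transformation’s nTile(10) produces essentially the same equal-frequency bucketing as QuantileDiscretizer. For reference, a sketch of the equivalent window logic in PySpark:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# ntile(10) assigns each row to one of 10 equal-sized buckets ordered by match time.
# Note: an unpartitioned window pulls all rows into a single partition, so this is
# illustrative rather than how you would do it at scale.
joinedDf = joinedDf.withColumn("time_bin", F.ntile(10).over(Window.orderBy("time")))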

Step 4: Load the Data into a Database as the Final Step

Databricks:


%sql
CREATE DATABASE IF NOT EXISTS EURO_SOCCER_DB
LOCATION "dbfs:/FileStore/databricks-abhinav/eu-soccer-events/interm"


%sql USE EURO_SOCCER_DB


# Persist the transformed events as a partitioned Parquet table in the database created above.
joinedDf.write.saveAsTable("GAME_EVENTS", format="parquet", mode="overwrite",
                           partitionBy="COUNTRY_CODE",
                           path="dbfs:/FileStore/databricks-abhinav/eu-soccer-events/interm/tr-events")
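If you wanted the Notebook route to land in a relational database rather than a Parquet table, Spark’s JDBC writer works too. A minimal sketch; the server, database, and credentials below are placeholders, not values from the sample:

# Hypothetical connection details; substitute your own. Requires a JDBC driver
# (e.g., the SQL Server driver) available on the cluster.
jdbcUrl = "jdbc:sqlserver://<server>.database.windows.net:1433;database=<database>"

(joinedDf.write
    .format("jdbc")
    .option("url", jdbcUrl)
    .option("dbtable", "GAME_EVENTS")
    .option("user", "<user>")
    .option("password", "<password>")
    .mode("overwrite")
    .save())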

ADF: Sink Transformation

[Screenshot: Sink transformation in the ADF Data Flow designer]


4 thoughts on “ADF Data Flows: Databricks Notebook ETL vs. ADF Visual Data Flow ETL”

We have not yet published performance metrics. But you can tune performance with both mechanisms because in both cases you control the size and scale of the Azure Databricks clusters. Additionally, in both approaches you can control the data partitioning scheme, which has a large impact on performance for big data scenarios. In the Notebooks case, you set the partitioning in code. In Data Flows, you set the partitioning per transformation (including source & sink) in the UI using the Optimize tab: https://github.com/kromerm/adfdataflowdocs/blob/master/Transformations/adf-data-flow-transformations-optimize-tab.md
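For example, in a Notebook you might set the partitioning explicitly before an expensive stage (a sketch; the partition count and key here are illustrative):

# Illustrative only: repartition by the write/join key with an arbitrary count.
joinedDf = joinedDf.repartition(200, "country_code")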

In a similar vein to this article, I would love to see what Databricks queries are auto-generated by the various ADF Data Flow components, i.e., a two-column table: [Data Factory Component], [Resulting Databricks Code]. This would be fairly easy, albeit time-consuming, to create: monitor the cluster as you execute various Data Flows. However, I understand that an article like this would grow stale as MS refactors the Data Flow query federation code.
