ADF Data Flows: Databricks Notebook ETL vs. ADF Visual Data Flow ETL

The new (preview) feature in Azure Data Factory called Data Flows, allows you to visually design, build, debug, and execute data transformations at scale on Spark by leveraging Azure Databricks clusters. You can then operationalize your data flows inside a general ADF pipeline with scheduling, triggers, monitoring, etc.

ADF Data Flows (preview) provides a visually-oriented design paradigm meant for code-free data transformation. You can also use ADF to execute code in Databricks, if you prefer to write code, using Databricks Notebooks, Python, JARs, etc. using the ADF pipeline activities.

What I wanted to do in this post was to demonstrate how to perform the same functions of a Databricks Notebook example using ADF Data Flows. In this demo, I’m going to use the European Football statistics sample from Kaggle (aka “Soccer stats”) that is used in this Databricks sample Notebook: Analyze Games from European Soccer Leagues with Apache Spark and Databricks. I am going to recreate the ETL Data Engineering steps with ADF. The completed JSON for this Data Flow is here.

Step 1:

Extract data from source CSV into DataFrames. In ADF, you will build a new Data Flow and use a Source transformation that points to that CSV file. Turn debug on so that you can see the same data profile stats and data preview as the Notebooks demo illustrates.

From Databricks:

from pyspark.sql.types import *

schema = (StructType().
          add("id_odsp", StringType()).add("id_event", StringType()).add("sort_order", IntegerType()).
          add("time", IntegerType()).add("text", StringType()).add("event_type", IntegerType()).
          add("event_type2", IntegerType()).add("side", IntegerType()).add("event_team", StringType()).
          add("opponent", StringType()).add("player", StringType()).add("player2", StringType()).
          add("player_in", StringType()).add("player_out", StringType()).add("shot_place", IntegerType()).
          add("shot_outcome", IntegerType()).add("is_goal", IntegerType()).add("location", IntegerType()).
          add("bodypart", IntegerType()).add("assist_method", IntegerType()).add("situation", IntegerType()).
          add("fast_break", IntegerType())

eventsDf = ("/data/eu-soccer-events/input/events.csv", 
                         schema=schema, header=True, 

eventsDf ={'player': 'NA', 'event_team': 'NA', 'opponent': 'NA', 
                             'event_type': 99, 'event_type2': 99, 'shot_place': 99, 
                             'shot_outcome': 99, 'location': 99, 'bodypart': 99, 
                             'assist_method': 99, 'situation': 99})


From Kaggle:


ADF Data Flows (Source Transform):


Step 2:

Transform Data by Mapping Names to Codes. The Kaggle data has codified fields which need to be mapped to names for analysis.


eventsDf = (
withColumn("event_type_str", mapKeyToVal(evtTypeMap)("event_type")).
withColumn("event_type2_str", mapKeyToVal(evtTyp2Map)("event_type2")).
withColumn("side_str", mapKeyToVal(sideMap)("side")).
withColumn("shot_place_str", mapKeyToVal(shotPlaceMap)("shot_place")).
withColumn("shot_outcome_str", mapKeyToVal(shotOutcomeMap)("shot_outcome")).
withColumn("location_str", mapKeyToVal(locationMap)("location")).
withColumn("bodypart_str", mapKeyToVal(bodyPartMap)("bodypart")).
withColumn("assist_method_str", mapKeyToVal(assistMethodMap)("assist_method")).
withColumn("situation_str", mapKeyToVal(situationMap)("situation"))

joinedDf = (
eventsDf.join(gameInfDf, eventsDf.id_odsp == gameInfDf.id_odsp, ‘inner’).
select(eventsDf.id_odsp, eventsDf.id_event, eventsDf.sort_order, eventsDf.time, eventsDf.event_type, eventsDf.event_type_str, eventsDf.event_type2, eventsDf.event_type2_str, eventsDf.side, eventsDf.side_str, eventsDf.event_team, eventsDf.opponent, eventsDf.player, eventsDf.player2, eventsDf.player_in, eventsDf.player_out, eventsDf.shot_place, eventsDf.shot_place_str, eventsDf.shot_outcome, eventsDf.shot_outcome_str, eventsDf.is_goal, eventsDf.location, eventsDf.location_str, eventsDf.bodypart, eventsDf.bodypart_str, eventsDf.assist_method, eventsDf.assist_method_str, eventsDf.situation, eventsDf.situation_str, gameInfDf.country_code)

ADF: Use IIF or Case in Expression Builder, or use Lookup with mapped files. Derived column with Case expression:


Lookup transformation using a file with mapped values. In this case, we won’t hard-code names in the expression. Instead, we’ll use an external file with the mapped names and join it via Lookup.


Step 3: 

Bucketize data based on match time with 10 bins


from import QuantileDiscretizer

joinedDf = QuantileDiscretizer(numBuckets=10, inputCol=”time”, outputCol=”time_bin”).fit(joinedDf).transform(joinedDf)


ADF Data Flow: Window transformation using nTile (10)


Step 4: Load Data into Database as Final Step


LOCATION "dbfs:/FileStore/databricks-abhinav/eu-soccer-events/interm"


joinedDf.write.saveAsTable("GAME_EVENTS", format = "parquet", mode = "overwrite", partitionBy = "COUNTRY_CODE", path = "dbfs:/FileStore/databricks-abhinav/eu-soccer-events/interm/tr-events")

ADF: Sink Transformation


Azure Data Factory Data Flow Preview: Sabermetrics Example

Let’s walk through an end-to-end sample scenario that utilizes the new Azure Data Factory Data Flow feature. Some of the patterns that I’ll demonstrate here are very common in ETL data integration projects, which is the target use case for ADF Data Flow.

Data Flow in ADF is current in limited preview and available only via whitelisting of your Azure Subscription. You can submit your request for access by filling out this form. All associated online documentation, videos and getting started guides are available here.

Bill James is known as the father of baseball statistics and considered a genius for those of us who love baseball stats and his Sabermetrics methodology of analyzing baseball through advanced statistics. So, rather than use a typical SQL Server or SSIS example ETL data integration project around books, bikes or toys samples, I thought I’d build a sample data flow using baseball statistics. The raw data that I used here is available for download from a number of different sites. I used the stats CSVs that cover up to the 2017 season from Chadwick Baseball Bureau:


I first created one data flow that used the raw at-bats, walks, hits, HRs, etc. to calculate basic stats batting average (BA), on-base % (OBP), Slugging (SLG) and total bases (TB). I called this Data Flow “Baseball 1 Basic Stats”. I’ll show you how I used Aggregation transforms for grouping to combine the players by player ID and year:


There are 2 source files. The raw batting stats from the above CSV screenshot and a CSV that contains details about each player. In ADF Data Flow, sources are all on the left-hand side of the design surface with vertical bars. The Sinks are your destination outputs with vertical bars on the right-hand side of your flows. In this case, I join the 2 sources of data together using a composite key that I created using ADF Data Flow’s expression language so that I can ensure that I am grouping players by name and year. You can see that key in the Join Condition below:


My sink destination is an Azure SQL Database where I generate a new schema on the fly. I do not need to define that target table schema. Data Flow will create the SQL tables for me:


To create calculations in ADF Data Flow, you will use transformations like Aggregate and Derived Column. In ETL patterns, these transformations will be very commonly used. Notice in the Data Flow diagram at the top of this blog post, there is a straight vertical line coming down from the first source. That is using the “New Branch” transform which tells Data Flow to duplicate the data stream. You will do this often in data transformation routines that use aggregates. When you aggregate data, the output schema will only contain the fields that are used in your group-by clause and your aggregate functions. So I create a separate branch just for my aggregations and I’ll join this back together with the rest of the fields that I wish to sink in the end. You can always see what metadata is coming in and metadata coming out of each transform by looking at the Inspect tab, which also shows data lineage for how you’ve derived your column values:


Also, on the Inspect tab, you can view a preview of your data as you are building your transformation. You do this by first clicking the Debug button on the top left of the Data Flow design window. Debug mode will allow you to work in a data prep environment where you can view the data as it is being transformed throughout each step in order to validate your transformation routine. I can set this to sample 50 rows out of the 100k+ rows in my dataset. It also will not require me to write the data into my Azure SQL DB:


You will see two Joins in this design. The first join on the 2nd row is combining the aggregated data back to the original source of data so that we can sink all of our interesting columns of data. The 2nd Join is joining two different ADF Blob CSV datasets: the batting data with the player data. This is what the batting data looks like when the CSV is brought into ADF via ADF dataset. I’ve adjusted the data types accordingly for my calculations:


One last transformation on this Data Flow I want to point out is the Select transform on the top row. I’ve used the Select transform here to rename the original source dataset to “OrigSourceBatting” so that I know that is the native source raw data with no aggregates and no derived columns.

The final step in this Data Flow before the Sink is my “AddTimestamp” Derived Column transform. This is another common pattern in ETL data integration design. I land this as a column in my Sink dataset (Azure SQL DB) so that when the table is written, I can have a timestamp in each row to indicate to me the last time that it was updated.

I will switch-off the “Debug” mode from my Data Flow so that I no longer incur the cost of my Azure Databricks (this is the engine that Data Flow uses for data transformation) and I’ll now build a very simple new ADF Pipeline with just 1 single activity: Data Flow. In that Data Flow, I will point to my Azure Databricks job cluster and execute the Data Flow from my pipeline so that I can now load all of my data into the Azure SQL DB. While I was in Debug mode, I was only sampling 50 rows for testing my expressions. Also, debug mode does not sink the data into my Sink location.


Once that pipeline execution is complete, I can go into the Monitoring view on the ADF UI and check to see the stats from that Data Flow run. I’ll be able to view data partitioning, data skew, the time spent in each stage of data transformation, data lineage along with other row & column statistics:


Now that I have all of my basic stats calculated and landed in a database table, I can begin adding the advanced Sabermetric stats. For this example, let’s start with RunsCreated. This will be a very simple data flow because I’ve already created the aggregate buckets (player + year) with the basic stats. Now we’re just adding more calculations on a row-by-row basis. This means that I can use the same Azure SQL DB table that I created above as my Source and use a Derived Column transform to generate Runs Created:


Switch on Debug mode before entering into the above Expression Builder so that you can see the results of that Runs Created formula as you write it. This way, you can assure that you have the proper parentheses, operands, etc. Since I don’t have to aggregate across multiple rows again, I can use this non-blocking transformation and all fields from input schema will just naturally flow to my output.


That’s all it takes for this demo, so now I can execute the advanced data flow stats from a pipeline using Trigger Now in the Pipeline with a Data Flow activity:


I’m now landing my data in Azure SQL DW because I want to take advantage of SQL DW’s scaling and pausing features when I hook it up to my Power BI and reporting tools. Here’s what the final stats look like as I begin building my Sabermetrics dashboard:


That’s pretty much it for this demo of ADF’s new Data Flow feature (currently in limited preview). A few good practices contained in this sample that you should follow when using Data Flow for your ETL and data prep projects:

  1. Land your data in Blob store for transformation. Data Flow can read directly from ADLS, Blob, Azure SQL DB, and Azure SQL DW. But working with files in Blob store was the easiest.
  2. Switch on debug mode during your Data Flow designing to validate your designs and expressions. Don’t guess, then execute from your pipeline. You can always execute Data Flows from ADF pipelines in Pipeline Debug mode. But that requires you to switch back & forth between design screens, wasting time. Note that Debug mode is not yet enabled in Data Flow. That is lighting up very soon.
  3. Use New Branch when creating aggregates so that you can join your data back together after your aggregate. Make sure to attach a key field to your agg so that you have it available for the subsequent JOIN operation. Otherwise, sink your agg data on a separate path from the rest of your columns.
  4. Use a timestamp function in a Derived Column transform so you can tag your rows written with the latest timestamp.
  5. After you have completed testing your Data Flow, use the normal ADF Data Flow mechanisms to schedule your pipeline with data flows and monitor via the Monitoring UX.
  6. If you need to turn up the knob on parallelism and partitioning on Databricks, you can use the Optimize tab on the transformations. The skewness and kurtosis of data partitioning is presented to you in the monitoring UI. There is no need for you to dive into Spark, Databricks or any other cluster operation. You can design and optimize your Data Flow all from the ADF UI.