ADF Slowly Changing Dimension Type 2 with Mapping Data Flows (complete)

I have been putting together a series of posts and videos around building SCD Type 1 and Type 2 dimensions using Mapping Data Flows with Azure Data Factory. In this latest post, I’m going to walk through a complete end-to-end Type 2 flow. I won’t be able to provide full detail here; instead, I’m going to touch on the different pieces that you need from ADF to make it work, and then I suggest you download the JSON for this data flow here and walk through it in your own Data Factory.

Here are links to the other parts of this series:

This post is an expansion of the first intro-to-SCD post above, but this time I’m going to cover some of the more complex scenarios that you’ll find in dimension handling in ETL, such as keeping member history in your dimension table.

scdT1_001

Use Case

DimEmployees is a dimension in a data warehouse that analyzes projects. Attributes of those employee records will change occasionally and when they do, we want to track them by maintaining history, creating a new row with the new employee data (SCD Type 2).

Step 1: We need 2 sources. The first is the incoming new employee records, which is this CSV file:

EmpID  Region  Status  Function  Level  Role       StartDate  EndDate
1234   SER     A       ADM       A      Finance    1/1/2000
1345   SER     A       ADM       A      Finance    4/5/2008
1789   PNW     A       ENG       N      Engineer   7/9/2011
2349   PNW     I       ENG       N      Engineer   9/8/1999   4/1/2019
8382   NER     A       RAD       A      Marketing  4/5/1998

The 2nd source will be the existing DimEmployees table (DimEmp) in the existing data warehouse, which is in my Azure SQL Database:

dimemp

Basic SCD Type 2 Logic

  1. Look up incoming new Employee records against existing records in the DimEmployee table
  2. If they are new employees, then create a new surrogate key and insert the row into the DimEmployee table
  3. If the Employee member already exists, then set the “iscurrent” flag to 0 and update the End Date and Status attributes in the existing row
  4. Add a new row for the Employee with the new member attributes and set “iscurrent” to 1

The top row in the Data Flow is the “new employee” stream.

sctT1_new

New Rows

    1. The Employee new file source is set to delete upon completion and uses a wildcard path to find the latest CSVs in a folder
    2. sctT1_source1
    3. The Null Filter uses the Filter transformation to remove any extraneous rows that extra newlines in the source file would otherwise introduce
    4. The TypeConversions Derived Column transformation converts the incoming CSV string types to logical data types and also sets an ETL processtime field to currentTimestamp(). I use this in all of my ETL processes for tracking & logging.
    5. LookupIDs will find matching employees from the DimEmp source, matching on EmpID. This is the 2nd source:
    6. sctT1_lookup
    7. I also used a type conversion Derived Column here to again normalize data types, casting each field so that we use the same logical types on this 2nd source from the database table.
    8. NormNames is a “Select” transformation which is used for aliasing, renaming, and column selectivity. I am removing any columns from the Lookup that I do not wish to flow through here, as well as removing the fully qualified names (FQNs), keeping simple names with no namespace.
    9. NewRow is a Conditional Split that uses the results of the Lookup to decide whether this is a new incoming employee. If the “iscurrent” field is NULL, then we know it is new, because that column will only be populated from an existing database row (see the example expressions after this list).
    10. Now that we know the empID is new, we can create a new row for Insert. The Sink will have “Allow Inserts” as the only database option for the DimEmp table and the SetAttrsForNew is a Derived Column that will set “iscurrent” to 1 and will generate a new Surrogate Key using this formula:
    11. toString(crc32(EmpID,EmpFunction))
    12. Notice that I did not use the Surrogate Key transformation. The SK transformation in Data Flows acts as a sequence generator, and in this case I am not seeding and incrementing new values; I want pure non-business keys that are not sequential.
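
For reference, here is a minimal sketch of the expressions described in steps 9 through 11, written in the Data Flow expression language. The column names follow this walkthrough and the surrogate key column name is illustrative, so adjust them to your own schema:

    NewRow split condition: isNull(iscurrent)
    SetAttrsForNew: iscurrent = 1, SurrogateKey = toString(crc32(EmpID,EmpFunction))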

Existing Rows

scdT1_update

We’ll know that the incoming row is an update to an existing member because we found a value (not NULL, could be any value) from the Conditional Split in the “checkForUpdates” branch.

      1. NameNorm2 is another Select transform that again picks the columns we’re interested in and allows us to remove duplicate columns that originated from the Lookup
      2. CheckForChanges is an Exists transformation; it is how we determine whether there was a change in any of the existing member properties that we are interested in.
      3. I decided to only trigger an update to the table if we see a change in Region, Status, Role, Level, or End Date using this formula in Exists:
      4. NameNorm2@EmpID == TypeConversions@EmpID &&
        (
        NameNorm2@Region != DimEmployees@Region ||
        NameNorm2@Status != DimEmployees@Status ||
        NameNorm2@Level != DimEmployees@Level ||
        NameNorm2@Role != DimEmployees@Role ||
        NameNorm2@EndDate != DimEmployees@EndDate
        )
      5. If any rows have changes to those attributes, we’ll write a new row by setting “iscurrent” to 1 in the SetAttrUpdate Derived Column, marking this row as the active member
      6. On the “CheckForChanges” Exists transform, select “New Branch”. This will create a separate copy of that data stream so that we can use any matches from that Exists check to turn the existing rows to inactive.
      7. SetAttrsInactive is a Derived Column that sets “iscurrent” to 0, marking the existing member row as not current.
      8. I use a Select transformation called “InactiveFields” so that I choose only the columns that I wish to update to the existing, now inactive, version of the Employee member.
      9. Alter Row is added next as a transformation that will set my database policy on this stream to “Update”. The formula to update is simply “true()”. This allows us to update the existing member row.
      10. The Sink is set to only “allow updates” and the mapping only maps the fields that need to be updated to the existing dimension members:
      11. scdT1_sink

     

The complete JSON for this data flow is in my Github repo here.

I also recorded a video showing this SCD T2 Data Flow in action here.


Partition Large Files with ADF using Mapping Data Flows

A very common practice when designing Big Data ETL and Analytics solutions in the Cloud is to find creative ways to work with very large data files. Of course, Data Engineers who are working primarily on-prem also face challenges processing very large files. The added challenge with hybrid and Cloud scenarios is that you also have to build architectures with bandwidth and utility billing constraints in mind.

Big Data platforms like Spark and Hadoop provide a very natural fit for handling large file processing by leveraging distributed file systems. By partitioning data files across worker nodes, each node can process its data locally, dividing and conquering these workloads.

ADF’s Mapping Data Flow feature is built upon Spark in the Cloud, so the fundamental steps in large file processing are also available to you as an ADF user. This means that you can use Data Flows to meet the very common requirement of splitting a large file into partitioned files so that you can process and move it in pieces.
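
Under the covers, this is the same round-robin repartitioning you could express against Spark directly. Here is a minimal PySpark sketch of the equivalent operation, just to show what the engine is doing; the file paths are placeholders, not the ones used in the walkthrough below:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read the single large CSV (placeholder path)
df = spark.read.csv("/data/input/loans.csv", header=True)

# Round-robin repartition into 20 partitions, then write 20 part files
(df.repartition(20)
   .write
   .mode("overwrite")
   .option("header", True)
   .csv("/data/output/parts"))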

To accomplish this in ADF Data Flows:

  1. Create a new Data Flow
    1. You are going to create a very simple Data Flow just to leverage file partitioning. There will not be any column or row transformations: just a Source and a Sink that will take a large file and produce smaller part files.
       filesplit1
  2. Add a Source file
    1. For this demo, I am using the Kaggle public loans data CSV with >800k records. You do not need to set any schema or projections because we are not working with data at the column level here.
    2. Make sure to turn on the Data Flow Debug switch at the top of the browser UI so that your cluster is warmed up to execute this data flow later
  3. Add a Sink folder
    1. For the Sink dataset, choose the type of output files you would like to produce. I’m going to start with my large CSV and produce partitioned CSV files, so I’m using a Delimited Text dataset. Note that in the dataset file path I am typing in a new folder name, “output/parts”. ADF will use this to generate a new folder called “parts” inside my existing “output” folder in Blob when this data flow executes.
       filesplit4
    2. In the Sink, define the partitioning
    3. filesplit2
    4. This is where you will define how you would like the partitioned files to be generated. I’m asking for 20 equal distributions using a simple Round Robin technique. I have also set the output file names using the “pattern” option. “loans[n].csv” will produce new part files named loans1.csv, loans2.csv … loans20.csv.
    5. filesplit3
    6. Notice I’ve also set “Clear the folder”. This will ask ADF to wipe the contents of the destination folder clean before loading new part files
  4. Save your data flow and create a new pipeline
  5. Add an Execute Data Flow activity and select your new file split data flow
  6. Execute the pipeline using the pipeline debug button
    1. You must execute data flows from a pipeline in order to generate file output. Debugging from Data Flow does not write any data.
  7. After execution, you should now see 20 files that resulted from round robin partitioning of your large source file. You’re done:
    1. filesplit6
  8. In the output of your pipeline debug run, you’ll see the execution results of the data flow activity. Click on the eyeglasses icon to show the details of your data flow execution. You’ll see the statistics of the distribution of records in your partitioned files:
    1. filesplit5

 

Azure Data Factory: Build U-SQL Tweet Analysis with ADF Data Flows

One of the most commonly used execution environments for Big Data transformations in ADF is Azure Data Lake Analytics (ADLA) using U-SQL scripts to transform data at scale. The new ADF Data Flow feature allows you to build data transformations at scale without needing to know any scripting or coding environments. You won’t need to learn and understand the details of the execution environment or big data cluster maintenance. Instead, you can remain within the ETL data integration environment to build both your workflow pipelines as well as your data transformation code-free inside the ADF visual UI.

For this example, I’ll rebuild the Tweet Analysis U-SQL demo here from Michael Rys that demonstrates some of the analytical capabilities of ADLA and U-SQL. To build this in ADF, I’ll use visual data flow capabilities against the MikeDoesBigDataTweets.csv file in this folder. I added a few new Tweets from my own KromerBigData handle to the CSV via http://tweetdownload.net/ to update the contents a bit.

Step 1: Define the schema and count the number of Tweets grouped by Author

U-SQL: Simple Analysis


@t =
EXTRACT date string,
time string,
author string,
tweet string
FROM "/Samples/Data/Tweets/MikeDoesBigDataTweets.csv"
USING Extractors.Csv();


@res =
SELECT author,
COUNT( * ) AS tweetcount
FROM @t
GROUP BY author;

OUTPUT @res
TO "/Output/TweetAnalysis/MyTwitterAnalysis1.csv"
ORDER BY tweetcount DESC
USING Outputters.Csv();

In ADF Data Flow, I’ll accomplish this with a Source transform that defines the schema and then an Aggregate transform for the count. Notice that I put the Aggregate in a different place than the U-SQL sample; there is no need to recreate scripts or programs in the exact same order when building a data flow. Also, notice that I grouped by both author & category, taking some liberties to change the scenario slightly. You can see that I am setting my Data Flow to “Debug” mode so that I can see the Data Preview as I build the flow.
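
The Aggregate transform settings for that step amount to roughly the following (a sketch; the aggregate column name is mine, not necessarily what the screenshots use):

    Group by: author, category
    Aggregates: tweetcount = count()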

usql1

usql4

usql3

Step 2: Categorize Tweets

U-SQL: Extract Mentions

@m = SELECT m.Substring(1) AS m
, "mention" AS category
FROM @m CROSS APPLY EXPLODE(mentions) AS t(m)
WHERE m != "@";
@t =
SELECT author, "author" AS category
FROM @t
UNION ALL
SELECT *
FROM @m;
@res = SELECT author.ToLowerInvariant() AS author
, category
, COUNT( * ) AS tweetcount
FROM @t
GROUP BY author.ToLowerInvariant(), category;
OUTPUT @res
TO "/Output/TweetAnalysis/MyTwitterAnalysis2.csv"
ORDER BY tweetcount DESC
USING Outputters.Csv();

In ADF Data Flow, I’m going to categorize the Tweets by Mentions and Authors using an IIF statement based upon the first character of the Tweet text. Any aggregation, like Count, is done in the Aggregate transform, which is why the sequencing is different when converting from a scripting environment.
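
A hedged sketch of the kind of Derived Column expression I am describing, assuming the Tweet text column is named tweet:

    category = iif(startsWith(tweet, '@'), 'mention', 'author')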

usql2

Step 3: Rank Tweets

U-SQL:  Rank Tweets with Window Functions

@res =
SELECT DISTINCT
author, category, tweetcount
, PERCENTILE_DISC(0.5) WITHIN GROUP (ORDER BY tweetcount ASC)
OVER (PARTITION BY category) AS median_tweetcount_perhandle_category
, PERCENT_RANK() OVER
(PARTITION BY category ORDER BY tweetcount ASC) AS relative_rank
, ROW_NUMBER() OVER
(PARTITION BY category ORDER BY tweetcount DESC) AS absolute_rank
FROM TweetAuthorsAndMentions
WHERE tweetcount >= 50;
OUTPUT @res
TO "/Output/TweetAnalysis/MyTwitterAnalysis6.csv"
ORDER BY absolute_rank, category ASC
USING Outputters.Csv();

 

In ADF Data Flow, you’ll use a Window Transformation with the Rank function and partition the data using the Over and Range settings in the transformation. You can see in the debug data previews below that the Rank function has partitioned the Tweets both by the Category and the Author as I had set in my Aggregation previously.
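
The Window transform settings here boil down to something like this (a sketch, not the exact configuration from the screenshots; the column name is borrowed from the U-SQL sample):

    Over (partition by): category
    Sort: tweetcount descending
    Window columns: absolute_rank = rank()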

usql5

usql6

Step 4: Serialize output results to Lake

In these U-SQL samples, the scripts are divided up by function, so the results are serialized back in the lake at the end of each step. In ADF Data Flow, to serialize your results, use a Sink. To keep the data in the lake, just use an ADLS or Azure Blob Store dataset to land the results in files. If you’d like to serialize your results in a database, you can use Azure SQL DB or Azure SQL DW datasets.

usql7

ADF Data Flows: Databricks Notebook ETL vs. ADF Visual Data Flow ETL

The new (preview) feature in Azure Data Factory called Data Flows allows you to visually design, build, debug, and execute data transformations at scale on Spark by leveraging Azure Databricks clusters. You can then operationalize your data flows inside a general ADF pipeline with scheduling, triggers, monitoring, etc.

ADF Data Flows (preview) provides a visually oriented design paradigm meant for code-free data transformation. If you prefer to write code, you can also use ADF pipeline activities to execute code in Databricks using Notebooks, Python, JARs, etc.

What I wanted to do in this post was to demonstrate how to perform the same functions of a Databricks Notebook example using ADF Data Flows. In this demo, I’m going to use the European Football statistics sample from Kaggle (aka “Soccer stats”) that is used in this Databricks sample Notebook: Analyze Games from European Soccer Leagues with Apache Spark and Databricks. I am going to recreate the ETL Data Engineering steps with ADF. The completed JSON for this Data Flow is here.

Step 1:

Extract data from source CSV into DataFrames. In ADF, you will build a new Data Flow and use a Source transformation that points to that CSV file. Turn debug on so that you can see the same data profile stats and data preview as the Notebooks demo illustrates.

From Databricks:

from pyspark.sql.types import *

schema = (StructType().
          add("id_odsp", StringType()).add("id_event", StringType()).add("sort_order", IntegerType()).
          add("time", IntegerType()).add("text", StringType()).add("event_type", IntegerType()).
          add("event_type2", IntegerType()).add("side", IntegerType()).add("event_team", StringType()).
          add("opponent", StringType()).add("player", StringType()).add("player2", StringType()).
          add("player_in", StringType()).add("player_out", StringType()).add("shot_place", IntegerType()).
          add("shot_outcome", IntegerType()).add("is_goal", IntegerType()).add("location", IntegerType()).
          add("bodypart", IntegerType()).add("assist_method", IntegerType()).add("situation", IntegerType()).
          add("fast_break", IntegerType())
         )

eventsDf = (spark.read.csv("/data/eu-soccer-events/input/events.csv", 
                         schema=schema, header=True, 
                         ignoreLeadingWhiteSpace=True, 
                         ignoreTrailingWhiteSpace=True,
                         nullValue='NA'))

eventsDf = eventsDf.na.fill({'player': 'NA', 'event_team': 'NA', 'opponent': 'NA', 
                             'event_type': 99, 'event_type2': 99, 'shot_place': 99, 
                             'shot_outcome': 99, 'location': 99, 'bodypart': 99, 
                             'assist_method': 99, 'situation': 99})
display(eventsDf)

image8-2-1024x393

From Kaggle:

kaggle

ADF Data Flows (Source Transform):

soccer1

Step 2:

Transform Data by Mapping Names to Codes. The Kaggle data has codified fields which need to be mapped to names for analysis.

Databricks:


eventsDf = (
eventsDf.
withColumn("event_type_str", mapKeyToVal(evtTypeMap)("event_type")).
withColumn("event_type2_str", mapKeyToVal(evtTyp2Map)("event_type2")).
withColumn("side_str", mapKeyToVal(sideMap)("side")).
withColumn("shot_place_str", mapKeyToVal(shotPlaceMap)("shot_place")).
withColumn("shot_outcome_str", mapKeyToVal(shotOutcomeMap)("shot_outcome")).
withColumn("location_str", mapKeyToVal(locationMap)("location")).
withColumn("bodypart_str", mapKeyToVal(bodyPartMap)("bodypart")).
withColumn("assist_method_str", mapKeyToVal(assistMethodMap)("assist_method")).
withColumn("situation_str", mapKeyToVal(situationMap)("situation"))
)

joinedDf = (
eventsDf.join(gameInfDf, eventsDf.id_odsp == gameInfDf.id_odsp, 'inner').
select(eventsDf.id_odsp, eventsDf.id_event, eventsDf.sort_order, eventsDf.time, eventsDf.event_type, eventsDf.event_type_str, eventsDf.event_type2, eventsDf.event_type2_str, eventsDf.side, eventsDf.side_str, eventsDf.event_team, eventsDf.opponent, eventsDf.player, eventsDf.player2, eventsDf.player_in, eventsDf.player_out, eventsDf.shot_place, eventsDf.shot_place_str, eventsDf.shot_outcome, eventsDf.shot_outcome_str, eventsDf.is_goal, eventsDf.location, eventsDf.location_str, eventsDf.bodypart, eventsDf.bodypart_str, eventsDf.assist_method, eventsDf.assist_method_str, eventsDf.situation, eventsDf.situation_str, gameInfDf.country_code)
)

ADF: Use IIF or Case in Expression Builder, or use Lookup with mapped files. Derived column with Case expression:

soccer4
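
For the hard-coded approach, a Derived Column expression along these lines performs the mapping. The specific codes and labels below are illustrative only, not the full Kaggle code list:

    event_type_str = case(event_type == 0, 'Announcement', event_type == 1, 'Attempt', event_type == 2, 'Corner', 'Other')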

Lookup transformation using a file with mapped values. In this case, we won’t hard-code names in the expression. Instead, we’ll use an external file with the mapped names and join it via Lookup.

soccer3

Step 3: 

Bucketize data based on match time with 10 bins

Databricks:


from pyspark.ml.feature import QuantileDiscretizer

joinedDf = QuantileDiscretizer(numBuckets=10, inputCol="time", outputCol="time_bin").fit(joinedDf).transform(joinedDf)

display(joinedDf)

ADF Data Flow: Window transformation using nTile (10)

soccer6

Step 4: Load Data into Database as Final Step

Databricks:


%sql
CREATE DATABASE IF NOT EXISTS EURO_SOCCER_DB
LOCATION "dbfs:/FileStore/databricks-abhinav/eu-soccer-events/interm"


%sql USE EURO_SOCCER_DB


joinedDf.write.saveAsTable("GAME_EVENTS", format = "parquet", mode = "overwrite", partitionBy = "COUNTRY_CODE", path = "dbfs:/FileStore/databricks-abhinav/eu-soccer-events/interm/tr-events")

ADF: Sink Transformation

soccer5

ADF Data Flows: Self-Join

Data Flows in ADF (Preview) allow you to build visual data transformation routines that ADF will compile and execute as optimized scale-out operations on Azure Databricks clusters. In this quick post, I want to talk a bit about a pattern that you’ll use from time-to-time in ADF Data Flows: Self Joins.

In SQL database parlance, a self-join is a query where both the left and right side tables are the same. In ADF Data Flow, you can achieve this through a combination of the Join transformation and the Select transformation. In either case, the Join operation will require you to alias one of the two relationships.

The Join transform will allow you to join 2 Data Flow streams (can be a source or any other transform step in your data flow) and the Select transform will be used for aliasing.

Here is what a very simple example would look like when I am aggregating data in my data flow. In this case, I am creating an aggregation called “states” in Aggregate1 that is grouped by AddressID. AddressID is a unique key field, but I am using it in my group-by in the Aggregate so that I can use it in my self-join next. Aggregate transforms in ADF Data Flow will only output the columns used in the aggregation; that is, only the fields used in the group-by and the aggregated fields will be passed on to the next transformation in your data flow.

This is why self-joins are very important with flows that use aggregators. If you wish to include the previous columns in your flow, use a New Branch from the previous step and use the self-join pattern to connect the flow with the original metadata. The new branch is created by clicking the “+” on the Addresses source transform:

sj1

The New Branch duplicates the stream, so both Address streams are identical. To enable a Self-Join with the Join transform at the end, I need to alias the ID column. Do this with the Select transform, called “AddressesOrig” in the diagram (I simply added “_orig” to differentiate):

sj2

As stated above, I want to pass through the AddressID for my join on the top stream, so I included AddressID in the Group-by in my Aggregator even though it will not result in any actual grouping:

sj3

Now, I can join the original set of column metadata back together with my new aggregation which I called “states”. I’ll do this with a Join transform. Note that I see both the AddressID from the Aggregate and the AddressID_orig from the branch with the Select alias:

sj4

And to show that my self-join is working, you’ll see that my data flow now has all of the data combined with my new “states” aggregation:

sj5
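
To make the pattern concrete, here is a minimal PySpark sketch of the same idea, since Data Flows execute on Spark. The file path, column names, and the collect_list aggregate are my assumptions for illustration, not necessarily what the screenshots above use:

from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_list

spark = SparkSession.builder.getOrCreate()
addresses = spark.read.csv("/data/addresses.csv", header=True)  # placeholder path

# Aggregate branch: keep AddressID in the group-by so it survives the aggregation
states = (addresses
          .groupBy("AddressID")
          .agg(collect_list("StateProvince").alias("states")))

# Alias the ID on the duplicated stream, then self-join to recover all original columns
orig = addresses.withColumnRenamed("AddressID", "AddressID_orig")
joined = orig.join(states, orig["AddressID_orig"] == states["AddressID"], "inner")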

If you’d like to get started building Data Flows in ADF, please submit a request for whitelisting your Azure Subscription to enable ADF Data Flow factories: http://aka.ms/dataflowpreview.

 

Azure Data Factory Data Flow: Building Slowly Changing Dimensions

Here is a quick walk-through on how to use Azure Data Factory’s new Data Flow feature (limited preview) to build Slowly Changing Dimension (SCD) ETL patterns.

The Data Flow feature in ADF is currently in limited preview. If you would like to try this out on your Data Factories, please fill out this form to request whitelisting your Azure Subscription for ADF Data Flows: http://aka.ms/dataflowpreview. Once your subscription has been enabled, you will see “Data Factory V2 (with data flows)” as an option from the Azure Portal when creating Data Factories.

I’m going to start super-simple by building just the path in my data flow for an SCD Type 2 in the instance where the dimension member does not already exist in the target Azure SQL DW. In ADF Data Flow, you can have multiple paths in your data flows (called streams) that enable you to perform different data transformations on different data sources in a single Data Flow.

In this case, I will look-up from the DW to see if the incoming product already exists in the DW dimension table. If it does, I’ll branch off and update the row with the new attributes (I’ll blog that path later). For this blog, we’ll complete the path to create a new surrogate key for the row and clean up some of the attributes before loading the new row into the target DW dimension table.

scd1

The first thing I’ll do is add 2 sources: one for the Azure SQL OLTP dataset, which has the products that were sold today. The 2nd source is a query from the existing DW DimProducts table, which has the existing unique rows from the products dimension table. Because I am demonstrating a Type 2 SCD, the active dimension members can be signified with NULL as the End Date or ‘Y’ in the Status column to indicate that is the row to use in calculations. To use this query for the 2nd source at the bottom of the diagram, I created a SQL View with the proper query predicate and used the view as my source in the ADF Dataset. However, I could have also applied the predicate directly in the Data Flow using the Filter transform and an expression of isNull(EndDate) or EndDate > addMonths(currentDate(), 999), depending on the method that you choose to utilize for your SCD management in your dimension tables.

scd7.png

ADF Data Flow is smart enough to take your end-to-end flows and optimize the execution utilizing pushdown techniques when available. So, using a Filter transform against what appears like a complete table scan in the design view may not actually execute as such when you attach your Data Flow to a pipeline. It is best to experiment with different techniques and use the Monitoring in ADF to gather timings and partition counts on your Data Flow activity executions.

There are several different ways to proceed from here. In this demo, I used a Left Outer Join so that I can get all of the new products and all of the existing products in a single query, then I split the flow with a Conditional Split based on whether the surrogate key (stored in ProductAlternateKey) was set.
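
The Conditional Split condition for that check is essentially a null test on the surrogate key coming out of the left outer join, something like the following (the split stream name is illustrative):

    NewMember: isNull(ProductAlternateKey)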

scd2

If the OLTP source has this value in this sample, then I can drop to the bottom stream in the Conditional Split. I did not yet build out this flow, but what I will do here is check whether the stored attributes in the DimProduct table have changed. If I determine that the dimension member requires a new row, then I’ll add that row to the DW in the bottom stream.

Using the Lookup transform within ADF Data Flow is also a very common mechanism for looking up reference values that can be leveraged in this case. You can utilize this pattern for SCD Type 2 when you want to match the DW member values against the incoming values and update the table when an attribute changes.

Real quick, I want to point out the Select transform immediately following my Join. This is an important concept in ADF Data Flow. All of your columns will automatically propagate throughout your streams. As your columns accumulate, you can use the Select transform to “select” the columns that you wish to keep. You will see stream names appended to the column names to indicate the origins and lineage of those columns to help you make the proper determination.

scd9

While you can always choose column selectivity in the Sink transform at the end of your Data Flow, maintaining column hygiene may help you to prune your column lists. The downside to this approach is that you will lose that metadata downstream and will not be able to access it once you’ve dropped it from your metadata with a Select.

Now, we need to add a surrogate key. Use the Surrogate Key transform to generate a unique key that will be stored, in this case, in ProductAlternateKey as a way to provide a key that is used for inner joins in the DW star schema and for our BI/analytical solutions. This is a key that is not a business key and is not part of the OLTP system. This is specific to the data warehouse in joining facts to dimensions.

scd5

The ADF Data Flow Surrogate Key transform will provide an incrementing value, so I’m going to prepend some text to that value to demonstrate how to give your own flavor to the key. I decided to do this all in one Derived Column transform that I called “SetAttributes” below:

scd6

You can always create separate Derived Column transforms for these formulas, but I just found it easier to build all of my expressions in one transform for this sample.

The other formulas set up attributes that I’ll use to hold some stored data for publishing into the SQL DW DimProd table. I’m using ‘Y’ for the active status of the dimension member, setting a StartDate for the row and an EndDate in the future, using empty strings instead of NULLs, and setting a bit because my target schema requires it.
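
A hedged sketch of what those Derived Column expressions could look like; the column names (Status, StartDate, EndDate, Color) and the far-future end date are illustrative conventions, not the exact values from my table:

    Status: 'Y'
    StartDate: currentDate()
    EndDate: toDate('9999-12-31')
    Color: iif(isNull(Color), '', Color)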

Now all you have to do is sink your fields in your Azure SQL DW with a Sink transform and your SCD Type 2 data flow will be complete. In order to execute this flow on a schedule, create a new ADF Pipeline and add a Data Flow activity that points to this SCD Data Flow. You’ll be able to use the full range of ADF schedules and monitoring to operationalize your data warehouse loading in ADF.

Azure Data Factory Data Flow Preview: Sabermetrics Example

Let’s walk through an end-to-end sample scenario that utilizes the new Azure Data Factory Data Flow feature. Some of the patterns that I’ll demonstrate here are very common in ETL data integration projects, which is the target use case for ADF Data Flow.

Data Flow in ADF is currently in limited preview and available only via whitelisting of your Azure Subscription. You can submit your request for access by filling out this form. All associated online documentation, videos and getting started guides are available here.

Bill James is known as the father of baseball statistics and is considered a genius by those of us who love baseball stats and his Sabermetrics methodology of analyzing baseball through advanced statistics. So, rather than use a typical SQL Server or SSIS example ETL data integration project around books, bikes, or toys samples, I thought I’d build a sample data flow using baseball statistics. The raw data that I used here is available for download from a number of different sites. I used the stats CSVs that cover up to the 2017 season from the Chadwick Baseball Bureau:

bb_excel1.png

I first created one data flow that uses the raw at-bats, walks, hits, HRs, etc. to calculate the basic stats: batting average (BA), on-base percentage (OBP), slugging (SLG), and total bases (TB). I called this Data Flow “Baseball 1 Basic Stats”. I’ll show you how I used Aggregate transforms for grouping to combine the players by player ID and year:

bb_basic1
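
For reference, these are the standard formulas behind those basic stats; the column names (H, AB, BB, HBP, SF, 2B, 3B, HR) are assumptions based on a typical batting CSV, so map them to your own source fields:

    TB  = (H - 2B - 3B - HR) + 2*2B + 3*3B + 4*HR
    BA  = H / AB
    SLG = TB / AB
    OBP = (H + BB + HBP) / (AB + BB + HBP + SF)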

There are 2 source files. The raw batting stats from the above CSV screenshot and a CSV that contains details about each player. In ADF Data Flow, sources are all on the left-hand side of the design surface with vertical bars. The Sinks are your destination outputs with vertical bars on the right-hand side of your flows. In this case, I join the 2 sources of data together using a composite key that I created using ADF Data Flow’s expression language so that I can ensure that I am grouping players by name and year. You can see that key in the Join Condition below:

bb_join1.png
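
That composite key is just a string concatenation in the Data Flow expression language; a hedged example, assuming the batting file’s playerID and yearID columns:

    concat(playerID, toString(yearID))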

My sink destination is an Azure SQL Database where I generate a new schema on the fly. I do not need to define that target table schema. Data Flow will create the SQL tables for me:

bb_ssms1.png

To create calculations in ADF Data Flow, you will use transformations like Aggregate and Derived Column. In ETL patterns, these transformations are very commonly used. Notice in the Data Flow diagram at the top of this blog post, there is a straight vertical line coming down from the first source. That is the “New Branch” transform, which tells Data Flow to duplicate the data stream. You will do this often in data transformation routines that use aggregates. When you aggregate data, the output schema will only contain the fields that are used in your group-by clause and your aggregate functions. So I create a separate branch just for my aggregations and join it back together with the rest of the fields that I wish to sink in the end. You can always see what metadata is coming into and out of each transform by looking at the Inspect tab, which also shows data lineage for how you’ve derived your column values:

bb_inspect1

Also, on the Inspect tab, you can view a preview of your data as you are building your transformation. You do this by first clicking the Debug button on the top left of the Data Flow design window. Debug mode will allow you to work in a data prep environment where you can view the data as it is being transformed throughout each step in order to validate your transformation routine. I can set this to sample 50 rows out of the 100k+ rows in my dataset. It also will not require me to write the data into my Azure SQL DB:

bb_debug1.png

You will see two Joins in this design. The first join on the 2nd row is combining the aggregated data back to the original source of data so that we can sink all of our interesting columns of data. The 2nd Join is joining two different ADF Blob CSV datasets: the batting data with the player data. This is what the batting data looks like when the CSV is brought into ADF via ADF dataset. I’ve adjusted the data types accordingly for my calculations:

bb_source1

One last transformation on this Data Flow I want to point out is the Select transform on the top row. I’ve used the Select transform here to rename the original source dataset to “OrigSourceBatting” so that I know that is the native source raw data with no aggregates and no derived columns.

The final step in this Data Flow before the Sink is my “AddTimestamp” Derived Column transform. This is another common pattern in ETL data integration design. I land this as a column in my Sink dataset (Azure SQL DB) so that when the table is written, I can have a timestamp in each row to indicate to me the last time that it was updated.

I will switch off “Debug” mode in my Data Flow so that I no longer incur the cost of my Azure Databricks cluster (this is the engine that Data Flow uses for data transformation), and I’ll now build a very simple new ADF Pipeline with just one activity: Data Flow. In that activity, I will point to my Azure Databricks job cluster and execute the Data Flow from my pipeline so that I can now load all of my data into the Azure SQL DB. While I was in Debug mode, I was only sampling 50 rows for testing my expressions. Also, debug mode does not sink the data into my Sink location.

bb_pipe1

Once that pipeline execution is complete, I can go into the Monitoring view on the ADF UI and check to see the stats from that Data Flow run. I’ll be able to view data partitioning, data skew, the time spent in each stage of data transformation, data lineage along with other row & column statistics:

bb_mon1.png

Now that I have all of my basic stats calculated and landed in a database table, I can begin adding the advanced Sabermetric stats. For this example, let’s start with RunsCreated. This will be a very simple data flow because I’ve already created the aggregate buckets (player + year) with the basic stats. Now we’re just adding more calculations on a row-by-row basis. This means that I can use the same Azure SQL DB table that I created above as my Source and use a Derived Column transform to generate Runs Created:

bb_runscreated1

Switch on Debug mode before entering the above Expression Builder so that you can see the results of the Runs Created formula as you write it. This way, you can ensure that you have the proper parentheses, operands, etc. Since I don’t have to aggregate across multiple rows again, I can use this non-blocking transformation, and all fields from the input schema will naturally flow to my output.
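
For reference, the formula being reproduced in the Derived Column is Bill James’s basic Runs Created; the column names assume the basic-stats table created earlier:

    RunsCreated = ((H + BB) * TB) / (AB + BB)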

bb_adv1

That’s all it takes for this demo, so now I can execute the advanced data flow stats from a pipeline using Trigger Now in the Pipeline with a Data Flow activity:

bb_pipe2

I’m now landing my data in Azure SQL DW because I want to take advantage of SQL DW’s scaling and pausing features when I hook it up to my Power BI and reporting tools. Here’s what the final stats look like as I begin building my Sabermetrics dashboard:

bb_ssms2.png

That’s pretty much it for this demo of ADF’s new Data Flow feature (currently in limited preview). A few good practices contained in this sample that you should follow when using Data Flow for your ETL and data prep projects:

  1. Land your data in Blob store for transformation. Data Flow can read directly from ADLS, Blob, Azure SQL DB, and Azure SQL DW. But working with files in Blob store was the easiest.
  2. Switch on debug mode while designing your Data Flow to validate your designs and expressions rather than guessing and then executing from your pipeline. You can always execute Data Flows from ADF pipelines in Pipeline Debug mode, but that requires you to switch back & forth between design screens, wasting time. Note that Debug mode is not yet enabled in Data Flow; that is lighting up very soon.
  3. Use New Branch when creating aggregates so that you can join your data back together after your aggregate. Make sure to attach a key field to your agg so that you have it available for the subsequent JOIN operation. Otherwise, sink your agg data on a separate path from the rest of your columns.
  4. Use a timestamp function in a Derived Column transform so you can tag your rows written with the latest timestamp.
  5. After you have completed testing your Data Flow, use the normal ADF Data Flow mechanisms to schedule your pipeline with data flows and monitor via the Monitoring UX.
  6. If you need to turn up the knob on parallelism and partitioning on Databricks, you can use the Optimize tab on the transformations. The skewness and kurtosis of your data partitioning are presented to you in the monitoring UI. There is no need for you to dive into Spark, Databricks or any other cluster operation. You can design and optimize your Data Flow all from the ADF UI.

Azure Data Factory: Delete Files From Azure Data Lake Store (ADLS)

In a previous post over at Kromer Big Data, I posted examples of deleting files from Azure Blob Storage and Table Storage as part of your ETL pipeline using Azure Data Factory (ADF). In those examples, I built a small, quick Logic App that used the Azure Storage APIs to delete data. In this post, I’m going to demonstrate how to remove files from Azure Data Lake Store (ADLS). For this demo, we’ll use ADF’s V2 service.

Deleting / removing files after they’ve been processed is a very common task in ETL Data Integration routines. Here’s how to do that for Azure Data Lake Store files in ADF:

adfweb

  1. Start by creating a new Data Factory from Azure
  2. Click “Author & Monitor” from your factory in order to launch the ADF UI.
  3. Create a new pipeline and add a single Web Activity.
  4. Switch to the “Settings” tab on the properties pane at the bottom of the pipeline builder UI.
  5. The URL in the Web Activity will need to be the URI pointing to the ADLS file you wish to delete
    • https://<yourstorename>.azuredatalakestore.net/webhdfs/v1/mytempdir/myinputfile1.txt?op=DELETE
  6. The URL above (i.e. file names, folder names) can be parameterized. Click the “Add Dynamic Content” link when editing the URL text box.
  7. Set the Web Activity “Method” to “DELETE”.
  8. For authentication, you will need to have an access token. You can use this method to produce one:
  9. The access token returned will need to be captured and used in the Web Activity header as shown below (see also the Python sketch after this list):
    • Header = "Authorization"  Expression = "Bearer <ACCESS TOKEN>"
  10. You can now validate and test run your pipeline with the Web Activity. Click the “Debug” button to give it a try.
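
To see what the Web Activity is doing under the covers, here is a minimal Python sketch of the same REST call. It assumes you have already obtained an AAD access token by your preferred method, and it uses the placeholder store and file names from step 5:

import requests

access_token = "<ACCESS TOKEN>"  # obtained separately via AAD
url = ("https://<yourstorename>.azuredatalakestore.net"
       "/webhdfs/v1/mytempdir/myinputfile1.txt?op=DELETE")

# Same DELETE operation the Web Activity performs, with the bearer token header
resp = requests.delete(url, headers={"Authorization": "Bearer " + access_token})
resp.raise_for_status()
print(resp.status_code, resp.text)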

 

 

New Azure Data Factory End-to-End Lab

Just a few weeks ago, we announced the public preview of the new browser-based UI for Azure Data Factory. See Gaurav’s blog here detailing the release. To help you understand how to build complex data integration projects in the ADF Visual Design interface, we’ve partnered with Pragmatic Works, who have been long-time experts in the Microsoft data integration and ETL space, to create a new set of hands-on labs that you can now use to learn how to build those DI patterns using ADF V2.

That link points to a GitHub repo for the lab, where you will find data files and scripts in the Deployment folder. There are lab manual folders and overview presentations for each lab, shown below, with more details. You will also find a series of PowerShell and DB scripts as well as ARM templates that will generate the resource groups that the labs need in order for you to successfully build out an end-to-end scenario with sample data that you can use for Power BI reports in the final Lab 9. Here is how the individual labs are divided:

  • Lab 1 – Setting up ADF and Resources. Start here to get all of the ARM resource groups and database backup files loaded properly.
  • Lab 2 – Lift and Shift of SSIS to Azure. Go to this lab if you have existing SSIS packages on-prem that you’d like to migrate directly to the cloud using the ADF SSIS-IR capability.
  • Lab 3 – Rebuilding an existing SSIS job as an ADF pipeline.
  • Lab 4 – Take the new ADF pipeline and enhance it with data from Cloud Sources.
  • Lab 5 – Modernize the DW pipeline by transforming Big Data with HDInsight.
  • Lab 6 – Go to this lab to learn how to create copy workflows in ADF into Azure SQL Data Warehouse.
  • Lab 7 – Build a trigger-based schedule for your new ADF pipeline.
  • Lab 8 – You’ve operationalized your pipeline based on a schedule. Now learn how to monitor and manage that DI process.
  • Lab 9 – Bringing it all Together