ADF Slowly Changing Dimension Type 2 with Mapping Data Flows (complete)

I have been putting together a series of posts and videos around building SCD Type 1 and Type 2 using Mapping Data Flows with Azure Data Factory. In this latest post, I’m going to walk through a complete end-to-end SCD Type 2 data flow. I won’t be able to provide full detail here; instead, I’m going to touch on the different pieces that you need from ADF to make it work, and I suggest that you download the JSON for this data flow here and walk through it in your own Data Factory.

Here are links to the other parts of this series:

This post is an expansion of the first intro-to-SCD post above. This time, however, I’m going to cover some of the more complex scenarios that you’ll find in dimension handling in ETL, like keeping member history in your dimension table.

[Image: scdT1_001]

Use Case

DimEmployees is a dimension in a data warehouse that analyzes projects. Attributes of those employee records will change occasionally, and when they do, we want to maintain history by creating a new row with the new employee data (SCD Type 2).

Step 1: We need 2 sources. The first is the incoming new employee records, which come in as this CSV file:

EmpID  Region  Status  Function  Level  Role       StartDate  EndDate
1234   SER     A       ADM       A      Finance    1/1/2000
1345   SER     A       ADM       A      Finance    4/5/2008
1789   PNW     A       ENG       N      Engineer   7/9/2011
2349   PNW     I       ENG       N      Engineer   9/8/1999   4/1/2019
8382   NER     A       RAD       A      Marketing  4/5/1998

The 2nd source will be the existing DimEmployees table (DimEmp) in the data warehouse, which lives in my Azure SQL Database:

[Image: dimemp]

Basic SCD Type 2 Logic

  1. Lookup incoming new Employee records against existing records in the DimEmployee table
  2. If they are new employees, then create a new surrogate key and insert the row into the DimEmployee table
  3. If the Employee member already exists, then set the “iscurrent” flag to 0 and update the End Date and Status attributes in the existing row
  4. Add a new row for the Employee with the new member attributes and set “iscurrent” to 1
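In Data Flow expression terms, the decision between step 2 and steps 3–4 comes down to whether the Lookup found a match in DimEmployees. As a minimal sketch (assuming the flag column carried over from the dimension table is named “iscurrent”, as it is later in this post), the Conditional Split condition is simply:

    isNull(iscurrent)

Rows where that expression is true go down the insert path (step 2); all other rows go down the update path (steps 3 and 4).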

The top row in the Data Flow is the “new employee” stream.

[Image: sctT1_new]

New Rows

    1. The Employee new file source is set to delete the source file upon completion and uses a wildcard path to pick up the latest CSVs in a folder:

       [Image: sctT1_source1]

    2. The Null Filter uses the Filter transformation to remove any extraneous rows caused by extra newlines in the source file.
    3. The TypeConversions Derived Column transformation converts the incoming CSV string types to their logical types and also sets an ETL processtime field to currentTimestamp(). I use this in all of my ETL processes for tracking & logging.
    4. LookupIDs finds matching employees from the DimEmp source, matching on EmpID. This is the 2nd source:

       [Image: sctT1_lookup]

    5. I also used a type conversion Derived Column on this 2nd source, casting each field from the database table so that both streams use the same logical types.
    6. NormNames is a “Select” transformation, used for aliasing, renaming, and column selection. Here I remove any columns from the Lookup that I do not want to flow through, and I drop the fully qualified names (FQNs), keeping simple names with no namespace.
    7. NewRow is a Conditional Split that uses the results of the Lookup to decide whether this is a new incoming employee. If the “iscurrent” field is NULL, we know the employee is new, because that column is only populated from an existing database row.
    8. Now that we know the EmpID is new, we can create a new row for insert. The Sink will have “Allow Inserts” as the only database option for the DimEmp table, and SetAttrsForNew is a Derived Column that sets “iscurrent” to 1 and generates a new Surrogate Key using this formula:

       toString(crc32(EmpID,EmpFunction))

    9. Notice that I did not use the Surrogate Key transformation. The Surrogate Key transformation in Data Flows acts as a sequence generator, and here I am not seeding sequential values; I want purely non-business keys that are not sequential.
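For reference, here is a minimal sketch of the expressions behind a few of those steps. It uses the column names from the CSV above; the exact expressions and names in the downloadable JSON may differ, and “EmpSK” is just a placeholder name for the surrogate key column:

    Null Filter (Filter condition):     !isNull(EmpID)
    TypeConversions (Derived Column):   EmpID       : toInteger(EmpID)
                                        StartDate   : toDate(StartDate, 'M/d/yyyy')
                                        EndDate     : toDate(EndDate, 'M/d/yyyy')
                                        processtime : currentTimestamp()
    SetAttrsForNew (Derived Column):    iscurrent   : 1
                                        EmpSK       : toString(crc32(EmpID, EmpFunction))

Each Derived Column entry is a column name paired with the Data Flow expression that produces it.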

Existing Rows

[Image: scdT1_update]

We’ll know that the incoming row is an update to an existing member because the Conditional Split found a value (not NULL; it could be any value) and routed the row to the “checkForUpdates” branch.

      1. NameNorm2 is another Select transform that again picks the columns we’re interested in and lets us remove duplicate columns that originated from the Lookup.
      2. CheckForChanges is an Exists transformation; it is how we determine whether any of the existing member properties that we care about have changed.
      3. I decided to only trigger an update to the table if we see a change in Region, Status, Role, Level, or End Date, using this formula in the Exists settings:

         NameNorm2@EmpID == TypeConversions@EmpID &&
         (
         NameNorm2@Region != DimEmployees@Region ||
         NameNorm2@Status != DimEmployees@Status ||
         NameNorm2@Level != DimEmployees@Level ||
         NameNorm2@Role != DimEmployees@Role ||
         NameNorm2@EndDate != DimEmployees@EndDate
         )

      4. If any rows have changes to those attributes, we’ll write a new row by setting “iscurrent” to 1 in the SetAttrUpdate Derived Column, marking this row as the active member.
      5. On the “CheckForChanges” Exists transform, select “New Branch”. This creates a separate copy of that data stream so that we can use any matches from the Exists check to turn the existing rows inactive.
      6. SetAttrsInactive is a Derived Column that sets “iscurrent” to 0, marking the existing member row as no longer current.
      7. I use a Select transformation called “InactiveFields” to choose only the columns that I wish to update on the existing, now-inactive, version of the Employee member.
      8. An Alter Row transformation is added next to set the database policy on this stream to “Update”. The update formula is simply “true()”, which allows us to update the existing member row.
      9. The Sink is set to only “allow updates”, and the mapping maps only the fields that need to be updated on the existing dimension members:

         [Image: scdT1_sink]
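Putting the update side together, here is a minimal sketch of the key settings on that stream (again using the names from this post; how EndDate is stamped on the retired row is an assumption, shown here with currentDate()):

    SetAttrUpdate (Derived Column, new active row):   iscurrent : 1
    SetAttrsInactive (Derived Column, retired row):   iscurrent : 0
                                                      EndDate   : currentDate()
    Alter Row (inactive branch):                      Update if: true()
    Sink (DimEmp, inactive branch):                   Allow updates only

The downloadable JSON linked below is the authoritative reference for how this flow actually sets those values.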

     

The complete JSON for this data flow is in my GitHub repo here.

I also recorded a video showing this SCD T2 Data Flow in action here.


ADF Data Flows: Self-Join

Data Flows in ADF (Preview) allow you to build visual data transformation routines that ADF will compile and execute as optimized scale-out operations on Azure Databricks clusters. In this quick post, I want to talk a bit about a pattern that you’ll use from time to time in ADF Data Flows: self-joins.

In SQL database parlance, a self-join is a query where the left and right sides of the join are the same table. In ADF Data Flow, you can achieve this through a combination of the Join transformation and the Select transformation. In both cases, the join requires you to alias one side of the relationship.

The Join transform allows you to join 2 Data Flow streams (a stream can be a source or any other transformation step in your data flow), and the Select transform is used for aliasing.

Here is what a very simple example looks like when I am aggregating data in my data flow. In this case, I am creating an aggregation called “states” in Aggregate1 that is grouped by AddressID. AddressID is a unique key field, but I am using it in my group-by in the Aggregate so that I can use it in my self-join next. Aggregate transforms in ADF Data Flow will only output the columns used in the aggregation. That is, only the fields used in the group-by and the aggregated fields will be passed on to the next transformation in your data flow.
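As a rough sketch of that Aggregate (whatever calculation “states” actually performs, countDistinct(StateProvince) here is just a stand-in, and StateProvince is a hypothetical column name), the settings look roughly like this:

    Aggregate1
      Group by:    AddressID
      Aggregates:  states = countDistinct(StateProvince)

Because AddressID is in the group-by, it survives the Aggregate and is available for the join that follows.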

This is why self-joins are very important with flows that use aggregators. If you wish to include the previous columns in your flow, use a New Branch from the previous step and use the self-join pattern to connect the flow with the original metadata. The new branch is created by clicking the “+” on the Addresses source transform:

[Image: sj1]

The New Branch duplicates the stream, so both Address streams are identical. To enable a Self-Join with the Join transform at the end, I need to alias the ID column. Do this with the Select transform, called “AddressesOrig” in the diagram (I simply added “_orig” to differentiate):

[Image: sj2]

As stated above, I want to pass through the AddressID for my join on the top stream, so I included AddressID in the Group-by in my Aggregator even though it will not result in any actual grouping:

[Image: sj3]

Now, I can join the original set of column metadata back together with my new aggregation which I called “states”. I’ll do this with a Join transform. Note that I see both the AddressID from the Aggregate and the AddressID_orig from the branch with the Select alias:

[Image: sj4]
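To make the pattern concrete, here is a minimal sketch of the aliasing and the join condition (the join transform name is a placeholder; the stream@column notation is the same style used in the Exists formula earlier in this post):

    AddressesOrig (Select):   AddressID renamed to AddressID_orig (all other columns pass through)
    Join1 (Inner join):       Aggregate1@AddressID == AddressesOrig@AddressID_orig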

And to show that my self-join is working, you’ll see that my data flow now has all of the data combined with my new “states” aggregation:

[Image: sj5]

If you’d like to get started building Data Flows in ADF, please submit a request for whitelisting your Azure Subscription to enable ADF Data Flow factories: http://aka.ms/dataflowpreview.

 

New Azure Data Factory End-to-End Lab

Just a few weeks ago, we announced the public preview of the new browser-based UI for Azure Data Factory. See Gaurav’s blog here detailing the release. To help you understand how to build complex data integration projects in the ADF Visual Design interface, we’ve partnered with Pragmatic Works, who have been long-time experts in the Microsoft data integration and ETL space, to create a new set of hands-on labs that you can now use to learn how to build those DI patterns using ADF V2.

That link points to a GitHub repo for the lab, where you will find data files and scripts in the Deployment folder. There are lab manual folders for each lab, along with overview presentations; see the list below for more details. You will also find a series of PowerShell and database scripts, as well as ARM templates, that generate the resource groups the labs need so that you can successfully build out an end-to-end scenario with sample data, culminating in Power BI reports in the final Lab 9. Here is how the individual labs are divided:

  • Lab 1 – Setting up ADF and Resources. Start here to get all of the ARM resource groups and database backup files loaded properly.
  • Lab 2 – Lift and Shift of SSIS to Azure. Go to this lab if you have existing SSIS packages on-prem that you’d like to migrate directly to the cloud using the ADF SSIS-IR capability.
  • Lab 3 – Rebuilding an existing SSIS job as an ADF pipeline.
  • Lab 4 – Take the new ADF pipeline and enhance it with data from Cloud Sources.
  • Lab 5 – Modernize the DW pipeline by transforming Big Data with HDInsight.
  • Lab 6 – Go to this lab to learn how to create copy workflows in ADF that load data into Azure SQL Data Warehouse.
  • Lab 7 – Build a trigger-based schedule for your new ADF pipeline.
  • Lab 8 – You’ve operationalized your pipeline based on a schedule. Now learn how to monitor and manage that DI process.
  • Lab 9 – Bringing it all Together