Custom logging and auditing of ADF Data Flows

ADF has a number of built-in capabilities for logging, monitoring, alerting, and auditing your pipelines. UI monitoring tools, telemetry logs, and integration with Azure Monitor provide a rich set of tools for administering your ETL and data integration processes.

However, if you’d like to apply additional custom logging and auditing to your ETL data flows, you can use the techniques below, all of which are based on functionality that exists natively in ADF:

Data Flow pipeline activity output


With this technique, you will query the output metrics from your data flow activities in the pipeline and pass the values you are interested in to another data flow. The first data flow (ExecDataFlow) is the data transformation worker and the second data flow (Log Data) is the logger activity.

If you look at the output from your data flow activity execution, you will see the JSON payload returned by the activity.


You can pick out different metrics to log, such as the time for each transformation stage, source rows read, sink rows written, and bytes read/written. For this example, I’m going to log the processing time for the Sink (the total time it took to write the rows to the sink) and the number of rows written:

@activity('ExecDataFlow').output.runStatus.metrics.sink1.rowsWritten

@activity('ExecDataFlow').output.runStatus.metrics.sink1.sinkProcessingTime

I assign those values to the logger data flow, which takes in several integer parameters and simply writes those params out to a header-less delimited text file in my output folder in ADLS. This makes the data flow very generic and reusable for logging.

The logger data flow uses an ADF data flow technique of pointing to a source CSV file in Blob Store that contains just a single row, a single column, and has no header.

The file content is simply this:

1

I call this file “dummyfile.txt” and I recommend keeping one of those around in your blob stores with an ADF dataset pointing to it. It allows you to build data flows that don’t really use the source data. Instead, data flows like this logger generate their values from parameters via Derived Column transformations.

It’s an important technique to learn and repeat in ADF data flows. With the dummy file as my source, I set the incoming parameter values in a Derived Column and then write each logger parameter to a delimited text file.
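As a minimal sketch, assuming the logger data flow defines parameters named rowsWritten and sinkProcessingTime (hypothetical names), the Derived Column simply maps each parameter, referenced with the $ prefix, to an output column (the logTime column is an optional extra):

Derived Column "SetLogValues" (source: dummyfile.txt)
  rowsWritten        = $rowsWritten
  sinkProcessingTime = $sinkProcessingTime
  logTime            = currentTimestamp()

The Sink then writes those columns to the header-less delimited text file.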


This data flow can be re-used in other pipelines as your logger.

Row Count aggregation inside the data flow

With this technique, you add logging directly inside of your data flows. Here, you can log row counts and sink those values to a text file or database table. Use a new branch in your data flow logic to create a separate logging branch. Add an Aggregate transformation with no grouping and use the count() function. In a second variation of this pattern, I write the counts of each insert, update, and upsert operation coming from my logic to audit and log my database operations. In both cases, the separate logging branch does not affect your transformation logic. However, this technique requires you to add non-reusable logic inside each data flow; the previous technique, with a separate logger data flow, allows for reuse.
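As a minimal sketch, the logging branch can be as simple as this (stream and sink names are hypothetical):

New branch -> Aggregate "CountRows"
  Group by:    (none)
  Aggregates:  rowCount = count(1)
-> Sink "AuditLog" (delimited text file or audit table)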



Row Count lookup activity verification

Another common technique is to count the number of rows from your data flow logic and compare it against the actual number of rows written to the database sink. This is important for auditing and validation.


In this example, we use the Lookup activity in the pipeline to query the total number of rows so that we can compare it to the number of rows reported by our data flow logger.

This pipeline expression is the 3rd parameter sent to our data flow logger:

@activity('GetRowCount').output.firstRow.myrowcount

Now, when we look at the output file from our data flow logger, it shows the number of rows written by the activity, the time it took to execute in milliseconds, and the number of rows counted in the actual database itself, so that we can see any discrepancy:

9128,14913,9125
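If you want the pipeline itself to act on a mismatch, a sketch of an If Condition activity expression that compares the two values, reusing the activity and column names from the examples above, could look like this:

@not(equals(activity('ExecDataFlow').output.runStatus.metrics.sink1.rowsWritten, activity('GetRowCount').output.firstRow.myrowcount))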

Late Binding vs. Early Binding in ADF Data Flows

When transforming cloud data at scale in ADF, Mapping Data Flows allows Data Engineers to graphically design an execution plan of data transformation routines that will execute against Spark. The inherent nature of complex cloud-based big data is that the data will be messy and will frequently change shape and form.

We have built-in features in Azure Data Factory to make it easy for Data Engineers to build robust ETL projects in ADF that can transform massive amounts of data in a flexible way, without writing code. To build transformations that can account for changes in data shapes, you will build your data flows in a way that exploits what we call “late binding”.

In many cases, you will build ETL projects for your business that require well-defined schemas with data sources that do not frequently change. In those cases, you can use traditional early-binding in your data flows. These are the characteristics of early binding:

  1. A well-defined schema in your dataset
  2. The data shape for your data sources does not frequently change
  3. Your transformation expressions use explicit column names from your datasets
  4. Heavy use of Select transformations to manipulate schemas manually
  5. Heavy use of manual mapping in Sink and Select transformations

An example of early binding is the set of aggregation expressions I use in my baseball data flow to generate offensive statistics based on columns that I expect to be present in my source data.


For column mapping, I know exactly which fields I wish to map in my destination and I want to control the logical names, so I’m using manual mapping.


As your data flows become more complex, include flexible schemas, and utilize late-binding, you’ll use either auto-mapping to generate output for all late-arriving columns (also known as “drifted”) or rule-based mapping.

To build data flows that are more resilient to change, you can leverage features for late binding. Keep in mind that when using these late-binding techniques, your logical flow may look a little more complex and will require you to work with your data in a patterns-based way.

  1. Use datasets that are schemaless. Your dataset is used in the Source transformations to define the shape of your data as the data is acquired into a Spark data frame. ADF datasets do not require schemas and you can take advantage of that to work with flexible data sources. Any column that ADF sees in your source that is not defined in your dataset is labeled as “drifted”.
  2. Build transformations that use patterns instead of named columns. Expressions that require explicitly named columns from a dataset are using “early binding”. Instead, use patterns and rules that look for data by position, by naming pattern, and by data type.
  3. Late binding utilizes ADF’s schema drift features, so you’ll want to make sure you’ve selected “schema drift” on your sources and sinks. This way, ADF can pass through all newly detected columns.

Here is an example of a Derived Column where I demonstrate pattern matching. From top-to-bottom:

  1. I trim every String column, keeping the column name the same
  2. I add 1.2 to the columns present at ordinal positions 2 and 3
  3. I use a regular expression to look for any column whose name contains “Customers” followed by a number. When that pattern is found, I generate a new column by appending “_customer” to the original column name. “$$” means that I am keeping the original value.

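As a rough sketch, those three column patterns look like this in the Derived Column pattern editor (matching condition, output column name, value expression); the exact regular expression is an assumption:

1. Matches: type == 'string'                     Name: $$                Expression: trim($$)
2. Matches: position == 2 || position == 3       Name: $$                Expression: $$ + 1.2
3. Matches: regexMatch(name, 'Customers[0-9]+')  Name: $$ + '_customer'  Expression: $$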

As your datasets become larger and wider, utilize rules in your Sink mapping like this example below. Here, I’m mapping any stat that contains “H” to the same target name in my destination table. I’m also including “SH” as a new column called “Sacrifices”.

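As a sketch, those two rules could look like this in rule-based mapping, where the input expression matches source columns and $$ stands in for the matched column; the exact match conditions are assumptions:

Rule 1   Input: instr(name, 'H') > 0   Output column name: $$
Rule 2   Input: name == 'SH'           Output column name: 'Sacrifices'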

These are examples of late binding that depend upon patterns and rules to define my transformation and mapping intent when I have data sources that are wide and change frequently.

For more details on these areas of ADF, read about:

  1. Data Flow schema drift
  2. Data Flow column patterns

ADF Data Flows: Distinct Rows

Below is a method to use in ADF’s Mapping Data Flows to reduce the data stream in your data flows to only include distinct rows. This sample is available as a pipeline template here.


  1. Choose your source. This will work with any type of source.
  2. Add an Aggregate transformation. Here I call it “DistinctRows”.
  3. In the group-by setting, you need to choose which column or combination of columns will make up the key(s) for ADF to determine distinct rows. In this simple demo, I’ll just pick “movie” as my key column.
  4. The inherent nature of the Aggregate transformation is to drop all columns that are not used in the group-by or aggregate settings. But here, we are using the aggregate to filter out non-distinct rows, so we need every column from the original dataset.
  5. To do this, go to the aggregate settings and choose Column Pattern.
  6. Here, you will need to make a choice between including the first set of values from the duplicate rows, or the last. Essentially, choose which row you want to be the source of truth (see the sketch after this list).
  7. That’s really it. That’s all you need to do to find distinct rows in your data.
  8. The rest of this data flow sample compares the distinct rows to the original set of data. You can keep the other transformation streams in this sample data flow so that you can compare the original data with the distinct rows and make sure the flow is behaving the way you expect.
  9. I created a copy of the original source data by using New Source and then renamed that stream as “OriginalData” by aliasing it with the Select transformation on the bottom row.
  10. The row counts are just Aggregate transformations.
  11. To create the row counts, go to the Aggregate settings and leave the group-by empty.
  12. In the aggregate function, use count(1). This will produce a count of every row.
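Putting those settings together, here is a minimal sketch of the DistinctRows Aggregate configuration, assuming “movie” is your key column from step 3; use first() to keep the first duplicate, or swap in last() to keep the last:

Group by:  movie
Aggregates (column pattern):
  Matches:      name != 'movie'
  Column name:  $$
  Expression:   first($$)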

ETL with ADF: Convert Pig to Data Flows

Here’s a brief post on migrating an ETL script written in Pig. I took an ETL example using Pig from the Hortonworks tutorials site and migrated it to ADF using Mapping Data Flows. It took me approximately 10 minutes to design it.

What we’ll do in this demo is to create projections from two separate CSV files sitting in our data lake, aggregate the hours logged and miles logged by drivers, then join the datasets together for a complete report that is stored back in a lake folder. The full tutorial in Pig is here. You can download the source files to play along at home here.

I was able to take this 13-line Pig script and build it visually, without any coding, in ADF Data Flows in about 10 minutes. All it took was configuring 6 transformations on the design surface and I didn’t need to bother with Hadoop, MapReduce, Spark, Databricks or any other Big Data tech. Just define your data transformation intent in the ADF design surface and Data Flows handles the rest by spinning-up a Spark environment JIT, executing your transformations at scale, then tearing down the cluster for you.

drivers = LOAD 'drivers.csv' USING PigStorage(',');
raw_drivers = FILTER drivers BY $0>1;
drivers_details = FOREACH raw_drivers GENERATE $0 AS driverId, $1 AS name;
timesheet = LOAD 'timesheet.csv' USING PigStorage(',');
raw_timesheet = FILTER timesheet by $0>1;
timesheet_logged = FOREACH raw_timesheet GENERATE $0 AS driverId, $2 AS hours_logged, $3 AS miles_logged;
grp_logged = GROUP timesheet_logged by driverId;
sum_logged = FOREACH grp_logged GENERATE group as driverId,
SUM(timesheet_logged.hours_logged) as sum_hourslogged,
SUM(timesheet_logged.miles_logged) as sum_mileslogged;
join_sum_logged = JOIN sum_logged by driverId, drivers_details by driverId;
join_data = FOREACH join_sum_logged GENERATE $0 as driverId, $4 as name, $1 as hours_logged, $2 as miles_logged;
dump join_data;

Here are the steps to create this as an ADF Mapping Data Flow:


  1. Create Delimited Text datasets for drivers.csv and timesheet.csv. Switch on headers in the dataset configuration and import the schema from the source.
  2. Create a new mapping data flow.
  3. Switch on Data Flow Debug mode.
  4. Add a Source transformation to the data flow canvas.
  5. Select the DriversCSV dataset that you created in step 1.
  6. Set the source projection data types to the appropriate logical types for each column.
  7. Add a 2nd Source transformation, select the timesheetCSV dataset from step 1, and set its projection data types the same way.
  8. Validate that your data looks correct in the data preview tab on both sources.
  9. Make sure that your data types and data look correct. We’ll use the driverID as the PK/FK for the Join and the hours logged and miles logged for aggregations.
  10. Let’s build the aggregations next. Add an aggregate transformation after your timesheet source transformation.
  11. Name your aggregate transformation stream to be “SumHoursAndMiles”.
  12. Configure the group-by field to be “driverId” because we’ll calculate the hours and miles per driver.
  13. In the aggregate settings, create 2 new fields using the sum() function (see the sketch after this list).
  14. Use the data preview tab on the Aggregate transformation to validate that the data is summing properly.
  15. You should see a sum total for your 2 new fields next to each driver ID.
  16. Now we can join the 2 streams together so that we can see the full driver details along with the aggregated sums.
  17. Add a Join transformation to your driversCSV source transformation.
  18. Configure the Join as an inner join on the “driverID” column.
  19. There are a few duplicate columns on both streams, so let’s prune out the duplicate columns by using a Select transformation to curate the metadata.
  20. In the Select transformation, all you have to do is click “Skip Duplicates” for both input and output.
  21. Now you have a clean projection that you can sink to your data lake.
  22. Add a Sink transformation that writes to a folder dataset in Blob Store or ADLS Gen 2.
  23. Keep auto-mapping on in the Sink and click data preview to make sure your data looks correct. You should now see driver details along with their aggregate values.
  24. That’s it for designing your data flow from the Pig example. If you want to execute this to produce output files into your lake folder sink, create a pipeline in ADF and add a Data Flow activity.
  25. The best practice is to point the data flow activity to your new data flow and test it using the Debug button in the pipeline canvas before you set a triggered schedule for it. This way, you can verify that the data is being written to your lake folder properly and you won’t have to wait for the Databricks cluster spin-up time.
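For reference, here is a sketch of the SumHoursAndMiles Aggregate settings from steps 11-13; the toInteger() casts are only needed if you did not already set numeric types in the source projection:

Group by:    driverId
Aggregates:  sum_hourslogged = sum(toInteger(hours_logged))
             sum_mileslogged = sum(toInteger(miles_logged))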

ADF Mapping Data Flows Parameters

Using Azure Data Factory Mapping Data Flows, you can make your data transformations flexible and general-purpose by using parameters. Use Data Flow parameters to create dynamic transformation expressions and dynamic contents inside of transformation settings. The online documentation for Data Flow parameters can be found here.


  1. To get started, open a Data Flow and click on the Parameters tab.
  2. Here is where you can create and manage the Parameters that you will use inside of your data flows. You have the option of setting an optional default value here.
  3. You can also create parameters from the Expression Builder.
  4. You can use those parameters in your expressions to make dynamic transformations. Access the read-only parameter values inside your expressions (see the sketch after this list).
  5. Use parameters to make the settings in your source & sink transformations dynamic. Make your data flows dynamic based upon values sent in from the pipeline execution.
  6. Click inside configurable settings fields and you’ll see the “Add dynamic content” link pop up. Click it to launch the expression builder and enter static values or dynamic expressions that utilize your parameters.
  7. Now that you have your data flow designed with parameters, you can test it from a pipeline debug run. Click on the Execute Data Flow activity and you’ll see a Parameters tab on the bottom settings panel. Click inside the Value field to set a static or dynamic value using either the Data Flow expression language or pipeline expressions (the parameter must be type string for pipeline expressions).
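Here is a minimal sketch tying these steps together; the data flow parameter, pipeline parameter, and column names (ratingLimit, RatingLimit, Rating) are hypothetical:

Data flow parameter:              ratingLimit (string, default value '100')
Filter transformation condition:  toInteger(Rating) < toInteger($ratingLimit)
Value set on the Execute Data Flow activity (pipeline expression; string parameters only):
@string(pipeline().parameters.RatingLimit)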


ADF Mapping Data Flows: Optimize for File Source and Sink

I’m going to use this blog post as a dynamic list of performance optimizations to consider when using Azure Data Factory’s Mapping Data Flows. I am going to focus this one only on files. I will post subsequent articles that list ways to optimize other sources, sinks, and data transformation types. As I receive more good practices, feedback, and other performance tunings, I will update this article accordingly.

Here is Azure SQL DB Optimizations for ADF Data Flows

Here is Azure SQL DW Optimizations for ADF Data Flows

Optimizations to consider when using ADF Mapping Data Flows with files

NOTE: When you are designing and testing Data Flows from the ADF UI, make sure to turn on the Debug switch so that you can execute your data flows in real-time without waiting for a cluster to warm up.


You can control how many partitions ADF will use


  1. On each Source & Sink transformation, as well as each individual transformation, you can set a partitioning scheme.
  2. For smaller files, you may find selecting “Single Partition” can sometimes work better and faster than asking Spark to partition your small files.
  3. If you do not have enough information about your source data, you can choose “Round Robin” partitioning and set the number of partitions.
  4. If you explore your data and find that you have columns that can be good hash keys, use the Hash partitioning option.

File naming options

  1. The default nature of writing transformed data in ADF Mapping Data Flows is to write to a dataset that has a Blob or ADLS Linked Service. You should set that dataset to point to a folder or container, not a named file.
  2. Data Flows use Azure Databricks Spark for execution, which means that your output will be split over multiple files based on either default Spark partitioning or the partitioning scheme that you’ve explicitly chosen.
  3. A very common operation in ADF Data Flows is to choose “Output to single file” so that all of your output PART files are merged together into a single output file.
  4. However, this operation requires that the output reduces to a single partition on a single cluster node.
  5. Keep this in mind when choosing this popular option. You can run out of cluster node resources if you are combining many large source files into a single output file partition.
  6. To avoid this, you can keep the default or explicit partitioning scheme in ADF, which optimizes for performance, and then add a subsequent Copy Activity in the pipeline that merges all of the PART files from the output folder to a new single file. Essentially, this technique separates the action of transformation from file merging and achieves the same result as setting “output to single file”.

Increase size of your compute engine in Azure Integration Runtime


  1. Increase the number of cores, which will increase the number of nodes, and provide you with more processing power to process your file operations.
  2. Try “Compute Optimized” and “Memory Optimized” options.

ADF Mapping Data Flows: Optimize for Azure SQL Data Warehouse

I’m going to use this blog post as a dynamic list of performance optimizations to consider when using Azure Data Factory’s Mapping Data Flows. I am going to focus this one only on Azure SQL DW. I will post subsequent articles that list ways to optimize other sources, sinks, and data transformation types. As I receive more good practices, feedback, and other performance tunings, I will update this article accordingly.

Here is Azure SQL DB Optimizations for ADF Data Flows.

Optimizations to consider when using ADF Mapping Data Flows with Azure SQL DW

NOTE: When you are designing and testing Data Flows from the ADF UI, make sure to turn on the Debug switch so that you can execute your data flows in real-time without waiting for a cluster to warm up.


You can match Spark data partitioning to your source database partitioning based on a database table column key in the Source transformation


  1. Go to “Optimize” and select “Source” partitioning. Set either a specific table column or type in a query.
  2. If you chose “column”, then pick the partition column.
  3. Also, set the maximum number of connections to your Azure SQL DW. You can try a higher setting to gain parallel connections to your database. However, some cases may result in faster performance with a limited number of connections.

Set Batch Size and Query on Source


  1. Setting batch size will instruct ADF to store data in sets in memory instead of row-by-row. It is an optional setting and you may run out of resources on the compute nodes if they are not sized properly.
  2. Setting a query can allow you to filter rows right at the source before they even arrive at the Data Flow for processing, which can make the initial data acquisition faster.

Use staging to load data in bulk via PolyBase

  1. In order to avoid row-by-row inserts into your DW, set the “Staging” option in the Sink settings. This instructs ADF to use PolyBase so that your data can be loaded in bulk.
  2. When you execute your data flow activity from a pipeline, with Staging turned on, you will need to select the Blob store location of your staging data for bulk loading.

Set Partitioning Options on your Sink

robin

  1. Even if you don’t have your data partitioned in your destination Azure SQL DW tables, go to the Optimize tab and set partitioning.
  2. Very often, simply telling ADF to use Round Robin partitioning on the Spark execution clusters results in much faster data loading instead of forcing all connections from a single node/partition.

Set isolation level on Source transformation settings for SQL datasets

  1. Read uncommitted will provide faster query results on the Source transformation


Increase size of your compute engine in Azure Integration Runtime


  1. Increase the number of cores, which will increase the number of nodes, and provide you with more processing power to query and write to your Azure SQL DW.
  2. Try “Compute Optimized” and “Memory Optimized” options

Disable indexes on write

  1. Use an ADF pipeline stored procedure activity prior to your Data Flow activity that disables indexes on your target tables that are being written to from your Sink.
  2. After your Data Flow activity, add another stored proc activity that re-enables those indexes.

Increase the size of your Azure SQL DW

  1. Schedule a resizing of your source and sink Azure SQL DW before you run your pipeline to increase the throughput and minimize Azure throttling once you reach DWU limits.
  2. After your pipeline execution is complete, you can resize your databases back to their normal run rate.

ADF Mapping Data Flows: Optimize for Azure SQL Database

I’m going to use this blog post as a dynamic list of performance optimizations to consider when using Azure Data Factory’s Mapping Data Flows. I am going to focus this one only on Azure SQL DB. I will post subsequent articles that list ways to optimize other sources, sinks, and data transformation types. As I receive more good practices, feedback, and other performance tunings, I will update this article accordingly.

Here is Azure SQL DW Optimizations for ADF Data Flows.

Optimizations to consider when using ADF Mapping Data Flows with Azure SQL DB

NOTE: When you are designing and testing Data Flows from the ADF UI, make sure to turn on the Debug switch so that you can execute your data flows in real-time without waiting for a cluster to warm up.


Partition your source query


  1. Go to “Optimize” and select “Source” partitioning. Set either a specific table column or type in a query.
  2. If you chose “column”, then pick the partition column.
  3. Also, set the maximum number of connections to your Azure SQL DB. You can try a higher setting to gain parallel connections to your database. However, some cases may result in faster performance with a limited number of connections.
  4. Your source database tables do not need to be partitioned.
  5. Setting a query in your Source transformation that matches the partitioning scheme of your database table will allow the source database engine to leverage partition elimination.
  6. If your source is not already partitioned, ADF will still use data partitioning in the Spark transformation environment based on the key that you select in the Source transformation.

Set Batch Size and Query on Source


  1. Setting batch size will instruct ADF to store data in sets in memory instead of row-by-row. It is an optional setting and you may run out of resources on the compute nodes if they are not sized properly.
  2. Setting a query can allow you to filter rows right at the source before they even arrive at the Data Flow for processing, which can make the initial data acquisition faster.
  3. If you use a query, you can add optional query hints for your Azure SQL DB, e.g. READ UNCOMMITTED.

Set isolation level on Source transformation settings for SQL datasets

  1. Read uncommitted will provide faster query results on the Source transformation


Set Sink Batch Size


  1. In order to avoid row-by-row processing of your data flows, set the “Batch size” in the Sink settings for Azure SQL DB. This tells ADF to process database writes in batches based on the size provided.

Set Partitioning Options on your Sink


  1. Even if you don’t have your data partitioned in your destination Azure SQL DB tables, go to the Optimize tab and set partitioning.
  2. Very often, simply telling ADF to use Round Robin partitioning on the Spark execution clusters results in much faster data loading instead of forcing all connections from a single node/partition.

Increase size of your compute engine in Azure Integration Runtime


  1. Increase the number of cores, which will increase the number of nodes, and provide you with more processing power to query and write to your Azure SQL DB.
  2. Try “Compute Optimized” and “Memory Optimized” options

Disable indexes on write

  1. Use an ADF pipeline stored procedure activity prior to your Data Flow activity that disables indexes on your target tables that are being written to from your Sink.
  2. After your Data Flow activity, add another stored proc activity that re-enables those indexes.

Increase the size of your Azure SQL DB

  1. Schedule a resizing of your source and sink Azure SQL DB before you run your pipeline to increase the throughput and minimize Azure throttling once you reach DTU limits.
  2. After your pipeline execution is complete, you can resize your databases back to their normal run rate.

ADF Slowly Changing Dimension Type 2 with Mapping Data Flows (complete)

I have been putting together a series of posts and videos around building SCD Type 1 and Type 2 using Mapping Data Flows with Azure Data Factory. In this latest post, I’m going to walk through a complete end-to-end Type 2. I won’t be able to provide full detail here. Instead, I’m going to touch on the different pieces that you need from ADF to make it work and then I would suggest that you download the JSON for this data flow here and walk through it on your Data Factory.

Here are links to the other parts of this series:

This post is an expansion of the first intro to SCD post above. However, this time, I’m going to expand upon some of the more complex scenarios that you’ll find in dimension handling in ETL like keeping member history in your dimension table.


Use Case

DimEmployees is a dimension in a data warehouse that analyzes projects. Attributes of those employee records will change occasionally, and when they do, we want to track them by maintaining history, creating a new row with the new employee data (SCD Type 2).

Step 1: We need 2 sources. First is the incoming new employee records, which is this CSV file:

EmpID,Region,Status,Function,Level,Role,StartDate,EndDate
1234,SER,A,ADM,A,Finance,1/1/2000,
1345,SER,A,ADM,A,Finance,4/5/2008,
1789,PNW,A,ENG,N,Engineer,7/9/2011,
2349,PNW,I,ENG,N,Engineer,9/8/1999,4/1/2019
8382,NER,A,RAD,A,Marketing,4/5/1998,

The 2nd source will be the existing DimEmployees table (DimEmp) in the existing data warehouse, which is in my Azure SQL Database:


Basic SCD Type 2 Logic

  1. Lookup incoming new Employee records against existing records in the DimEmployee table
  2. If they are new employees, then create a new surrogate key and insert the row into the DimEmployee table
  3. If the Employee member already exists, then set the “iscurrent” flag to 0 and update the End Date and Status attributes in the existing row
  4. Add a new row for the Employee with the new member attributes, set “iscurrent” to 1

The top row in the Data Flow is the “new employee” stream.


New Rows

    1. The new-employee file source is set to delete the file upon completion and uses a wildcard path to find the latest CSVs in a folder.
    2. The Null Filter removes any extraneous rows from the source caused by extra newlines, using the Filter transformation.
    3. The TypeConversions Derived Column transformation norms the data types of the incoming CSV string types to logical types and also sets an ETL processtime field to currentTimestamp(). I use this in all of my ETL processes for tracking & logging.
    4. LookupIDs finds matching employees from the DimEmp source, matching on EmpID. This is the 2nd source.
    5. I also used a type-conversion Derived Column on this 2nd source to again norm the data types, casting each field to ensure we are using the same logical types as the database table.
    6. NormNames is a Select transformation, which is used for aliasing, renaming, and column selectivity. I remove any columns from the Lookup that I do not wish to flow through, as well as the FQNs, keeping simple names with no namespace.
    7. NewRow is a Conditional Split that uses the results of the Lookup to decide whether this is a new incoming employee. If the “iscurrent” field is NULL, then we know it is new, because that column will only be present on an existing database row (see the sketch after this list).
    8. Now that we know the EmpID is new, we can create a new row for insert. The Sink has “Allow inserts” as the only database option for the DimEmp table, and the SetAttrsForNew Derived Column sets “iscurrent” to 1 and generates a new surrogate key using this formula:
    9. toString(crc32(EmpID,EmpFunction))
    10. Notice that I did not use the Surrogate Key transformation. In this case, I am not seeding all new values. The Surrogate Key transformation in Data Flows acts as a sequence generator; here, I wish to use pure non-business keys that are not sequential.
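Here is a minimal sketch of the split condition and the new-row attributes described in steps 7 and 8; the surrogate key column name (EmpSK) is hypothetical:

NewRow (Conditional Split) condition:  isNull(iscurrent)
SetAttrsForNew (Derived Column):
  iscurrent = 1
  EmpSK     = toString(crc32(EmpID, EmpFunction))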

Existing Rows

We’ll know that the incoming row is an update to an existing member because we found a value (not NULL; it could be any value) from the Conditional Split in the “checkForUpdates” branch.

      1. NameNorm2 is another Select transformation that again picks the columns we’re interested in and allows us to remove duplicate columns that originated from the Lookup.
      2. CheckForChanges is an Exists transformation and is how we determine whether there was a change in any of the existing member properties that we are interested in.
      3. I decided to only trigger an update to the table if we see a change in Region, Status, Role, Level, or End Date, using this formula in the Exists:
      4. NameNorm2@EmpID == TypeConversions@EmpID &&
        (
        NameNorm2@Region != DimEmployees@Region ||
        NameNorm2@Status != DimEmployees@Status ||
        NameNorm2@Level != DimEmployees@Level ||
        NameNorm2@Role != DimEmployees@Role ||
        NameNorm2@EndDate != DimEmployees@EndDate
        )
      5. If any rows have changes to those attributes, we write a new row, setting “iscurrent” to 1 in the SetAttrUpdate Derived Column to mark this row as the active member.
      6. On the “CheckForChanges” Exists transformation, select “New Branch”. This creates a separate copy of that data stream so that we can use any matches from the Exists check to turn the existing rows inactive.
      7. SetAttrsInactive is a Derived Column that sets “iscurrent” to 0, marking the existing member row as not current.
      8. I use a Select transformation called “InactiveFields” to choose only the columns that I wish to update on the existing, now inactive, version of the Employee member.
      9. An Alter Row transformation is added next to set the database policy on this stream to “Update”. The update condition is simply true(). This allows us to update the existing member row.
      10. The Sink is set to only “allow updates” and the mapping maps only the fields that need to be updated on the existing dimension members.


The complete JSON for this data flow is in my GitHub repo here.

I also recorded a video showing this SCD T2 Data Flow in action here.