Median function in Azure Data Factory

To compute a median (the middle value of a sorted list), you need to put a couple of transformations together. Below are the steps needed to calculate a median in ADF using data flows.

  1. Sort your data by the field whose median value you wish to find
  2. Collect the values into an array
  3. Count the number of values
  4. Find the midpoint

median1

In my demo, I’m using the movies database CSV source and I would like to find the median rating value of movies grouped by year. My final result will be a single median value for each year that represents the median rating of movies for that year.

The Sort transformation sorts Ratings so that I know that they are in ascending order for my median calculation. Next is the Aggregate transformation which I use to group the data by year. Inside the aggregate, I use collect() so that I can have an index for each value to find the middle and a count() for the total number of indexes.

median2

The last thing I need to do is find the middle. I do that as a calculation inside a Derived Column transformation. I call the new field “median” and apply this formula:

ratingsCollection[toInteger(round(ratingsCount/2)+1)]

The field ratingsCount was created in the aggregation, so I divide it by 2, round it to an integer, and then add 1. Adding 1 means I never end up with a 0 index value; I simply pick the higher of the two middle indexes.
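
To make that concrete, here is a sketch of the expressions involved, assuming the rating column is named Rating and the grouping column is named year (adjust these to your own column names). The first two fields are created in the Aggregate transformation, grouped by year, and the third is the Derived Column formula:

ratingsCollection = collect(Rating)
ratingsCount = count(Rating)
median = ratingsCollection[toInteger(round(ratingsCount/2)+1)]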

Custom logging and auditing of ADF Data Flows

ADF Data Flows Custom Logging and Auditing Video

ADF has a number of built-in capabilities for logging, monitoring, alerting, and auditing your pipelines. There are UI monitoring tools, telemetry logs, and integration with Azure Monitor to provide a rich set of tools for the administration of your ETL and data integration processes.

However, if you’d like to apply additional custom logging and auditing for your ETL data flows, you can use the techniques below, which are all based on existing functionality found natively within ADF:

Data Flow pipeline activity output

log9

With this technique, you will query the output metrics from your data flow activities in the pipeline and pass the values you are interested in to another data flow. The first data flow (ExecDataFlow) is the data transformation worker and the second data flow (Log Data) is the logger activity.

If you look at the output from your data flow activity execution, you will see the JSON payload returned by the activity.

log12
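
The portion of that payload that matters for this example looks roughly like the sketch below (the sink is named sink1 in my data flow, and the values are taken from the sample run shown later in this post):

{
  "runStatus": {
    "metrics": {
      "sink1": {
        "rowsWritten": 9128,
        "sinkProcessingTime": 14913
      }
    }
  }
}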

You can pick out different metrics to log such as time for each transformation stage, source rows read, sink rows written, bytes read/written … For this example, I’m going to log the processing time for the Sink (the total time it took to write the rows to the sink) and the number of rows written:

@activity('ExecDataFlow').output.runStatus.metrics.sink1.rowsWritten

@activity('ExecDataFlow').output.runStatus.metrics.sink1.sinkProcessingTime

I am assigning those values to the logger data flow, which takes in several integer parameters and simply serves the purpose of writing those params out to a delimited text file with no header in my output folder in ADLS. This makes the data flow very generic and reusable for logging.

The logger data flow uses an ADF data flow technique of pointing to a source CSV file in Blob Store that contains just a single row and a single column, with no header.

The file content is simply this:

1

I call this file “dummyfile.txt” and I recommend keeping one of those around in your blob stores with an ADF dataset pointing to it. It allows you to build data flows that don’t really use the source data. Instead, data flows like this logger will generate values and use parameterized values via Derived Column transformations.

It’s an important technique to learn and reuse in ADF data flows. This way, I can set my source to this dummy source, set the incoming parameter values in a Derived Column, and then write each logger parameter to a delimited text file.
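
As a sketch, assuming the logger data flow defines integer parameters named rowsWritten, sinkTime, and tableRowCount (the parameter names are up to you), the Derived Column on top of the dummy source simply surfaces those parameter values as columns, using the $ prefix that data flows use for parameters:

rowsWrittenLog = $rowsWritten
sinkTimeLog = $sinkTime
tableRowCountLog = $tableRowCount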

log14

log13

log11

This data flow can be re-used in other pipelines as your logger.

Row Count aggregation inside the data flow

With this technique, you add logging directly inside of your data flows. Here, you can log row counts and sink those values to a text file or database table. Use New Branch in your data flow logic to create a separate logging branch. Add an Aggregate transformation with no grouping and use the count() function. Notice in the 2nd example below, I’m writing the counts of each insert, update, and upsert operation coming from my logic to audit and log my database operations. In both cases, the separate logging branch does not affect your transformation logic. However, this technique requires you to add this as non-reusable logic inside each data flow, whereas the technique above with a separate logger data flow allows for reuse.

log5

log15
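
Expressed as Aggregate transformation fields, the two variations above look roughly like this. Leave the group-by empty, and for the second example place the logging branch after your Alter Row transformation so that the row markers are set. The output column names here are just placeholders:

rowCount = count(1)
insertCount = countIf(isInsert())
updateCount = countIf(isUpdate())
upsertCount = countIf(isUpsert())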

Row Count lookup activity verification

Another common technique is to count the number of rows from your data flow logic and compare it against the actual number of rows written to the database sink. This is important for auditing and validation.

log10

In this example, we use the Lookup activity in the pipeline to query the total number of rows so that we can compare it to the number of rows reported from our data flow logger.

This pipeline expression is the 3rd parameter sent to our data flow logger:

@activity('GetRowCount').output.firstRow.myrowcount

Now, when we look at the output file from our data flow logger, it shows the number of rows written from the activity, the time it took to execute in milliseconds, and the number of rows counted in the actual database itself, so that we can see the discrepancy:

9128,14913,9125
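
If you also want the pipeline itself to react to a mismatch, you can feed those same two values into an If Condition activity. A sketch, reusing the activity names from this example:

@equals(activity('ExecDataFlow').output.runStatus.metrics.sink1.rowsWritten, activity('GetRowCount').output.firstRow.myrowcount)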

Late Binding vs. Early Binding in ADF Data Flows

When transforming cloud data at scale in ADF, Mapping Data Flows allows Data Engineers to graphically design an execution plan of data transformation routines that will execute against Spark. The inherent nature of complex cloud-based big data is that the data will be messy and will frequently change shape and form.

We have built-in features in Azure Data Factory to make it easy for Data Engineers to build robust ETL projects in ADF that can transform massive amounts of data in a flexible way, without writing code. To build your transformations in a way that can account for changes in data shapes, you will build your data flows in a way that exploits what we call “late binding”.

In many cases, you will build ETL projects for your business that require well-defined schemas with data sources that do not frequently change. In those cases, you can use traditional early-binding in your data flows. These are the characteristics of early binding:

  1. A well-defined schema in your dataset
  2. The data shape for your data sources does not frequently change
  3. Your transformation expressions use explicit column names from your datasets
  4. Heavy use of Select transformations to manipulate schemas manually
  5. Heavy use of manual mapping in Sink and Select transformations

An example of early binding is the set of Aggregate expressions below, which I use in my baseball data flow to generate offensive statistics based on columns that I expect to be present in my source data:

exp11

For column mapping, I know exactly which fields I wish to map in my destination and I want to control the logical names, so I’m using manual mapping:

mapping.png

As your data flows become more complex, include flexible schemas, and utilize late-binding, you’ll use either auto-mapping to generate output for all late-arriving columns (also known as “drifted”) or rule-based mapping.

To build data flows that are more resilient to change, you can choose to leverage features for late binding. Keep in mind that when utilizing these late-binding techniques, your logical flow may look a little more complex and will require you to work with your data in a patterns-based way.

  1. Use datasets that are schemaless. Your dataset is used in the Source transformations to define the shape of your data as the data is acquired into a Spark data frame. ADF datasets do not require schemas and you can take advantage of that to work with flexible data sources. Any column that ADF sees in your source that is not defined in your dataset is labeled as “drifted”.
  2. Build transformations that use patterns instead of named columns. Expressions that require explicitly named columns from a dataset are using “early binding”. Instead, use patterns and rules that look for data by position, by naming patterns, and by data type.
  3. Late binding utilizes ADF’s schema drift features, so you’ll want to make sure you’ve selected “schema drift” on your source and sinks. This way, ADF can pass through all newly-detected columns.

Here is an example of a Derived Column where I demonstrate pattern matching. From top-to-bottom:

  1. I trim every String column, keeping the column name the same
  2. I add 1.2 to the columns present at ordinal positions 2 and 3
  3. I look for any column with the words “Customers” followed by a number using regular expressions. When that pattern is found, I generate a new column by appending “_customer” to the original column name. “$$” means that I am keeping the original value.

exp22
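
In text form, those three column patterns look roughly like this, written as matching condition, output column name, and value expression (the regex is an approximation; adjust it to your own naming convention):

type == 'string'                      | $$               | trim($$)
position == 2 || position == 3        | $$               | $$ + 1.2
regexMatch(name, 'Customers[0-9]+')   | $$ + '_customer' | $$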

As your datasets become larger and wider, utilize rules in your Sink mapping like this example below. Here, I’m mapping any stat that contains “H” to the same target name in my destination table. I’m also including “SH” as a new column called “Sacrifices”.

matches.png
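
As a sketch, those two sink rules can be written as a matching condition plus a “Name as” expression, where $$ again stands in for the matched column name:

Rule 1 matching condition: like(name, '%H%')   Name as: $$
Rule 2 matching condition: name == 'SH'        Name as: 'Sacrifices'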

These are examples of late binding that depend upon patterns and rules to define my transformation and mapping intent when I have data sources that are wide and change frequently.

For more details in these areas in ADF, read about:

  1. Data Flow schema drift
  2. Data Flow column patterns

Reduce Execution Time for Data Flow Activities in ADF Pipelines

In ADF Mapping Data Flows, there are 2 working modes: Debug mode and Pipeline mode.

Debug mode is active when you turn on the Data Flow debug switch and the light is green, showing debug as active. You will also see the Data Preview pane at the bottom of your transformation panel light up with a green light. This turns on the ability to interactively preview your data in real time as you build your transformations in the data flow UI.

You can also execute pipelines with data flow activities against that same live Azure IR. To do this, you click “Debug” from the pipeline. Your pipeline will execute immediately and you can view your data flow activity in the bottom panel.

debugpipeline.png

The other mode of working in Data Flows is from an operationalized pipeline. That is, a pipeline that has been scheduled from a trigger and runs against the live ADF service. Typically, you do this as your final testing step and then schedule your pipeline for normal operations. You can also get to this mode interactively from the pipeline screen (see above) by clicking Add Trigger > Trigger Now.

When you use Trigger Now, or a pipeline scheduled run, you no longer are using the debug Azure IR compute environment. Instead, you will use the Azure IR that is selected in each of your Data Flow activities.

activity-data-flow2.png

Each of your data flow activities can execute from a different configuration of Azure IR. That means that you can apply more or fewer resources to each execution. You define the size of the compute environment from the Data Flow properties in the Azure IR:

azureir2

If you set the TTL, ADF will spin up and maintain a pool of resources that can be reused for that period of time. Every time you request a data flow activity execution against that same Azure IR, ADF can start the cluster compute and job execution on a warm VM, reducing the overall execution time of your data flow. If you do not specify a TTL, then ADF will always spin up new compute clusters on every execution.
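
For reference, the TTL, core count, and compute type all live in the data flow properties of the Azure IR definition. The JSON below is a rough sketch of what that looks like for a Managed Azure IR; the property names are from the IR JSON schema as I recall it, so verify them against the JSON of your own integration runtime (the TTL is in minutes):

{
    "name": "DataFlowAzureIR",
    "properties": {
        "type": "Managed",
        "typeProperties": {
            "computeProperties": {
                "location": "AutoResolve",
                "dataFlowProperties": {
                    "computeType": "General",
                    "coreCount": 8,
                    "timeToLive": 10
                }
            }
        }
    }
}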

This model ADF presents is very economical because you do not have to maintain an always-running Spark cluster to serve your ETL needs. With ADF, the compute is ephemeral and only present when it is needed.

The average start-up time for a just-in-time cluster is 5-7 minutes. If you have the need to lower that start-up time, then utilize the TTL setting. This will still require that initial load time to provision a pool of resources for that Azure IR. But each subsequent execution that occurs within that TTL window will have less acquisition time required.

am2

In my example above, I executed my JSON Data Flow first against an Azure IR without a TTL and it took 13 mins. Then, I executed it again, this time using a TTL of 10 mins. In this case, the data flow took almost half that time. Because I used a 10 min TTL, the cluster spun up from warm VMs and I only had to pay for the extra 10 min TTL.

On the monitoring view, I am able to see the difference in cluster acquisition time from my Azure IR with no TTL vs. with TTL. Just keep in mind that your first activity will spin up the resource pool, which requires that same 5-7 minute build time.

mon2

mon1

ADF Data Flows: Distinct Rows

Below is a method to use in ADF’s Mapping Data Flows to reduce the data stream in your data flows to only include distinct rows. This sample is available as a pipeline template here.

distinct1

  1. Choose your source. This will work with any type of source.
  2. Add an Aggregate transformation. Here I call it “DistinctRows”.
  3. In the group by setting, you need to choose which column or combination of columns will make up the key(s) for ADF to determine distinct rows. In this simple demo, I’ll just pick “movie” as my key column: distinct3
  4. The inherent nature of the aggregate transformation is to only pass through the columns used in the group-by and aggregate settings and drop the rest. But here, we are using the aggregate to filter out non-distinct rows, so we need every column from the original dataset.
  5. To do this, go to the aggregate settings and choose Column Pattern.
  6. Here, you will need to choose between keeping the first set of values from the duplicate rows or the last. Essentially, choose which row you want to be the source of truth (see the expression sketch after this list). distinct2
  7. That’s really it. That’s all you need to do to find distinct rows in your data.
  8. The rest of this data flow sample is comparing the distinct row to the original set of data. You can keep the other transformation streams in this sample data flow so that you can compare the original data with the distinct row to make sure it is behaving the way you expect.
  9. I created a copy of the original source data by using New Source and then renamed that stream as “OriginalData” by aliasing it with the Select transformation on the bottom row.
  10. The row counts are just Aggregate transformations.
  11. To create row counts go to Aggregate settings and leave the group by empty.
  12. In the aggregate function, use the function count(1). This will produce a count of every row.
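
Putting steps 5 and 6 together, the column pattern inside the DistinctRows aggregate looks roughly like this, assuming “movie” is your group-by key: match every column except the key, keep its original name, and take the first value from each set of duplicates (use last($$) instead if you want the last row to win):

Matching condition: name != 'movie'
Column name: $$
Expression: first($$)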

ETL with ADF: Convert Pig to Data Flows

Here’s a brief post on taking an ETL script written in Pig from the Hortonworks tutorials site and migrating it to ADF using Mapping Data Flows. It took me approximately 10 minutes to design it.

What we’ll do in this demo is to create projections from two separate CSV files sitting in our data lake, aggregate the hours logged and miles logged by drivers, then join the datasets together for a complete report that is stored back in a lake folder. The full tutorial in Pig is here. You can download the source files to play along at home here.

I was able to take this 13-line Pig script and build it visually, without any coding, in ADF Data Flows in about 10 minutes. All it took was configuring 6 transformations on the design surface, and I didn’t need to bother with Hadoop, MapReduce, Spark, Databricks or any other Big Data tech. Just define your data transformation intent in the ADF design surface and Data Flows handles the rest by spinning up a Spark environment just-in-time, executing your transformations at scale, then tearing down the cluster for you.

drivers = LOAD 'drivers.csv' USING PigStorage(',');
raw_drivers = FILTER drivers BY $0>1;
drivers_details = FOREACH raw_drivers GENERATE $0 AS driverId, $1 AS name;
timesheet = LOAD 'timesheet.csv' USING PigStorage(',');
raw_timesheet = FILTER timesheet by $0>1;
timesheet_logged = FOREACH raw_timesheet GENERATE $0 AS driverId, $2 AS hours_logged, $3 AS miles_logged;
grp_logged = GROUP timesheet_logged by driverId;
sum_logged = FOREACH grp_logged GENERATE group as driverId,
SUM(timesheet_logged.hours_logged) as sum_hourslogged,
SUM(timesheet_logged.miles_logged) as sum_mileslogged;
join_sum_logged = JOIN sum_logged by driverId, drivers_details by driverId;
join_data = FOREACH join_sum_logged GENERATE $0 as driverId, $4 as name, $1 as hours_logged, $2 as miles_logged;
dump join_data;

Here are the steps to create this as an ADF Mapping Data Flow so that your end result will look something like this:

drivers1

  1. Create Delimited Text datasets for drivers.csv and timesheet.csv. Switch on headers in the dataset configuration and import the schema from source.
  2. Create a new mapping data flow.
  3. Switch on Data Flow Debug mode.
  4. Add a Source transformation to the data flow canvas.
  5. Select the DriversCSV dataset that you created in step 1.
  6. Set the projection data types to match these formats: drivers2
  7. Add a 2nd Source transformation, select the timesheetCSV dataset from step 1, and set the projection data types as such: driver9
  8. Validate that your data looks correct in the data preview tab on both sources. You should see something like this for each: drivers10 drivers3
  9. Make sure that your data types and data look correct. We’ll use the driverID as the PK/FK for the Join and the hours logged and miles logged for aggregations.
  10. Let’s build the aggregations next. Add an aggregate transformation after your timesheet source transformation.
  11. Name your aggregate transformation stream to be “SumHoursAndMiles”.
  12. Configure the group-by field to be “driverId” because we’ll calculate the hours and miles per driver.
  13. In the aggregates settings, create 2 new fields using the sum() function to look like this (see the expression sketch after this list): drivers5
  14. Use the data preview tab on the Aggregate transformation to validate that the data is summing properly: drivers6
  15. You should see a sum total for your 2 new fields next to each driver ID.
  16. Now we can join the 2 streams together so that we can see the full driver details along with the aggregated sums.
  17. Add a Join transformation to your driversCSV source transformation.
  18. Configure the Join for inner join on the “driverID” column: drivers4
  19. There are a few duplicate columns on both streams, so let’s prune out the duplicate columns by using a Select transformation to curate the metadata.
  20. In the Select transformation, all you have to do is click “Skip Duplicates” for both input and output: drivers7
  21. Now you have a clean projection that you can sink to your data lake.
  22. Add a Sink transformation that writes to a folder dataset in Blob Store or ADLS Gen 2.
  23. Keep auto-map on the Sink and click data preview to make sure your data is looking correct. You should now see driver details along with their aggregate values: drivers1
  24. That’s it for designing your data flow from the Pig example. If you want to execute this to produce output files into your lake folder sink, create a pipeline in ADF and add a Data Flow activity.
  25. The best practice is to point the data flow activity to your new data flow and test it using the Debug button in the pipeline canvas before you set a triggered schedule for it. This way, you can verify that the data is being written to your lake folder properly and you won’t have to wait for the Databricks cluster spin-up time.
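
For reference, the two aggregate fields from step 13 are simply sum() expressions over the timesheet columns, grouped by driverId. The exact column names depend on the headers in your timesheet.csv, so treat these as placeholders:

sum_hourslogged = sum(hours_logged)
sum_mileslogged = sum(miles_logged)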

ADF Mapping Data Flows Parameters

Using Azure Data Factory Mapping Data Flows, you can make your data transformations flexible and general-purpose by using parameters. Use Data Flow parameters to create dynamic transformation expressions and dynamic contents inside of transformation settings. The online documentation for Data Flow parameters can be found here.

scdT1_update

  1. To get started, open a Data Flow and click on the Parameters tab: create-params
  2. Here is where you can create and manage the Parameters that you will use inside of your data flows. You have the option of setting an optional default value here.
  3. You can also create parameters from the Expression Builder. param8
  4. You can use those parameters in your expressions to make dynamic transformations. Access the read-only parameter values inside your expressions (see the sketch after this list). params9
  5. Use parameters to make the settings in your source & sink transformations dynamic. Make your data flows dynamic based upon values sent in from the pipeline execution. params6
  6. Click inside configurable settings fields and you’ll see the “Add dynamic content” link pop-up. Click to launch the expression builder and enter static values or dynamic expressions that utilize your parameters.
  7. Now that you have your data flow designed with parameters, you can test it from a pipeline debug run. Click on the Execute Data Flow activity and you’ll see a Parameters tab on the bottom settings panel. Click inside the Value field to set a static or dynamic value in either the Data Flow expression language or using pipeline expressions (the param must be type string). params7 parameter-example
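
As a quick sketch of step 4: if you define an integer parameter named minRating on the data flow, a Filter transformation can reference it with the $ prefix, and the actual value is then supplied per run from the pipeline. The column name Rating here is just an example:

toInteger(Rating) >= $minRating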


Dynamic SQL Table Names with Azure Data Factory Data Flows

You can leverage ADF’s parameters feature with Mapping Data Flows to create pipelines that dynamically create new target tables. You can set those table names through Lookups or other activities. I’ve written a very simple post below on the tools you’ll need to do this:

tables3

  1. Create a new ADF pipeline.
  2. Switch on “Data Flow Debug”.
  3. Create 2 new datasets. We’ll use Azure SQL DB for this demo, so create 2 Azure SQL DB datasets.
  4. tables1
  5. Go to the Parameters tab in the dataset and add a parameter for “tablename” as a string.
  6. In both of the datasets, do not select a table name. Instead, leave the table name blank and click in the table name field to display the “Add Dynamic Content” link.
  7. datasparam
  8. Select the “tablename” parameter for the value in the Table field (see the snippet after this list).
  9. You’ll do this for both datasets. We’ll use 1 dataset for the source and the other for the sink destination.
  10. Add an Execute Data Flow activity to your pipeline canvas and create a new data flow.
  11. tables2
  12. Inside the data flow canvas, select the dataset for the source table.
  13. Add a Sink transformation directly after the source.
  14. Choose the dataset for the destination table.
  15. Back in the pipeline, click on the Execute Data Flow activity.
  16. dataflowparam
  17. In the Settings tab, you’ll see a prompt for the values for the incoming and outgoing table names.
  18. For this demo, I just typed in static text. I have an existing table in my SQL DB called “dbo.batting1”. I want to copy it as “dbo.batting2”.
  19. This pipeline will copy it as a different name in the same database.
  20. In a real-world scenario, you will set these dataset parameters via values from Lookup or other activities that change dynamically.
  21. Click on “Debug” to test your pipeline.
  22. After the debug run is executed, you should now see a new table in your Azure SQL DB with the name that you provided in the 2nd dataset parameter.
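
For reference, the dynamic content you place in each dataset’s Table field is simply a reference to the dataset parameter, and the pipeline then supplies a value for each dataset. The static names below are the ones used in this demo:

Table field (dynamic content): @dataset().tablename
Source dataset tablename value: dbo.batting1
Sink dataset tablename value: dbo.batting2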

ADF Mapping Data Flows: Optimize for File Source and Sink

I’m going to use this blog post as a dynamic list of performance optimizations to consider when using Azure Data Factory’s Mapping Data Flow. I am going to focus this one only on files. I will post subsequent articles that list ways to optimize other sources, sinks, and data transformation types. As I receive more good practices, feedback, and other performance tunings, I will update this article accordingly.

Here is Azure SQL DB Optimizations for ADF Data Flows

Here is Azure SQL DW Optimizations for ADF Data Flows

Optimizations to consider when using ADF Mapping Data Flows with files

NOTE: When you are designing and testing Data Flows from the ADF UI, make sure to turn on the Debug switch so that you can execute your data flows in real-time without waiting for a cluster to warm up.

debugb1

You can control how many partitions ADF will use

opt1

  1. On each Source & Sink transformation, as well as each individual transformation, you can set a partitioning scheme.
  2. For smaller files, you may find selecting “Single Partition” can sometimes work better and faster than asking Spark to partition your small files.
  3. If you do not have enough information about your source data, you can choose “Round Robin” partitioning and set the number of partitions.
  4. If you explore your data and find that you have columns that can be good hash keys, use the Hash partitioning option.

File naming options

  1. The default nature of writing transformed data in ADF Mapping Data Flows is to write to a dataset that has a Blob or ADLS Linked Service. You should set that dataset to point to a folder or container, not a named file.
  2. Data Flows use Azure Databricks Spark for execution, which means that your output will be split over multiple files based on either default Spark partitioning or the partitioning scheme that you’ve explicitly chosen.
  3. A very common operation in ADF Data Flows is to choose “Output to single file” so that all of your output PART files are merged together into a single output file.
  4. However, this operation requires that the output reduces to a single partition on a single cluster node.
  5. Keep this in mind when choosing this popular option. You can run out of cluster node resources if you are combining many large source files into a single output file partition.
  6. To avoid this, you can keep the default or explicit partitioning scheme in ADF, which optimizes for performance, and then add a subsequent Copy Activity in the pipeline that merges all of the PART files from the output folder to a new single file. Essentially, this technique separates the action of transformation from file merging and achieves the same result as setting “output to single file”.

Increase size of your compute engine in Azure Integration Runtime

ir-new

  1. Increase the number of cores, which will increase the number of nodes, and provide you with more processing power to process your file operations.
  2. Try “Compute Optimized” and “Memory Optimized” options.