Azure Data Factory: Build U-SQL Tweet Analysis with ADF Data Flows

One of the most commonly used execution environments for Big Data transformations in ADF is Azure Data Lake Analytics (ADLA), using U-SQL scripts to transform data at scale. The new ADF Data Flow feature lets you build data transformations at scale without writing any script or code, and without having to learn the details of the execution environment or maintain a big data cluster. Instead, you stay inside the ETL data integration environment and build both your workflow pipelines and your data transformations code-free in the ADF visual UI.

For this example, I'll rebuild Michael Rys' Tweet Analysis U-SQL demo, which demonstrates some of the analytical capabilities of ADLA and U-SQL. To build this in ADF, I'll use the visual data flow capabilities against the MikeDoesBigDataTweets.csv file in this folder. I added a few new Tweets from my own KromerBigData handle to update the contents a bit, using http://tweetdownload.net/.

Step 1: Define the schema and count the number of Tweets grouped by Author

U-SQL: Simple Analysis


@t =
    EXTRACT date string,
            time string,
            author string,
            tweet string
    FROM "/Samples/Data/Tweets/MikeDoesBigDataTweets.csv"
    USING Extractors.Csv();

@res =
    SELECT author,
           COUNT( * ) AS tweetcount
    FROM @t
    GROUP BY author;

OUTPUT @res
TO "/Output/TweetAnalysis/MyTwitterAnalysis1.csv"
ORDER BY tweetcount DESC
USING Outputters.Csv();

In ADF Data Flow, I'll accomplish this with a Source transform that defines the schema, followed by an Aggregate transform for the count. Notice that I placed the Aggregate in a different order than the U-SQL sample: there is no need to recreate scripts or programs in the exact same order when building a data flow. Also notice that I grouped by both author and category; I took some liberties to change the scenario slightly. You can see that I set my Data Flow to "Debug" mode so that I can see the Data Preview as I build the flow.
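Under the covers, Data Flows execute on Spark, so for readers who think in code, here is roughly what that Source + Aggregate pair computes, expressed as a PySpark sketch. This is only an illustration, not the code ADF generates; the file path and column names come from the U-SQL sample above.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Source transform: read the CSV and assign the four-column schema
tweets = (spark.read
          .csv("/Samples/Data/Tweets/MikeDoesBigDataTweets.csv")
          .toDF("date", "time", "author", "tweet"))

# Aggregate transform: count of Tweets grouped by author
tweet_counts = (tweets
                .groupBy("author")
                .count()
                .withColumnRenamed("count", "tweetcount")
                .orderBy("tweetcount", ascending=False))

tweet_counts.show()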

[Image: usql1]

[Image: usql4]

[Image: usql3]

Step 2: Categorize Tweets

U-SQL: Extract Mentions

// Build the array of @mentions from each Tweet (this extraction step is implied by the sample but was missing from the snippet)
@m =
    SELECT new SQL.ARRAY<string>(
               tweet.Split(' ').Where(x => x.StartsWith("@"))) AS mentions
    FROM @t;

@m =
    SELECT m.Substring(1) AS author,
           "mention" AS category
    FROM @m CROSS APPLY EXPLODE(mentions) AS t(m)
    WHERE m != "@";

@t =
    SELECT author, "author" AS category
    FROM @t
    UNION ALL
    SELECT *
    FROM @m;

@res =
    SELECT author.ToLowerInvariant() AS author,
           category,
           COUNT( * ) AS tweetcount
    FROM @t
    GROUP BY author.ToLowerInvariant(), category;

OUTPUT @res
TO "/Output/TweetAnalysis/MyTwitterAnalysis2.csv"
ORDER BY tweetcount DESC
USING Outputters.Csv();

In ADF Data Flow, I'm going to categorize the Tweets as Mentions or Authors with an IIF expression based on the first character of the Tweet text. Any aggregation, like Count, is done in the Aggregate transform, which is why the sequencing is different when converting from a scripting environment.
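Continuing the PySpark sketch from Step 1, here is roughly what that Derived Column + Aggregate combination does, under the assumption that the category is derived from whether the Tweet text starts with "@" (in ADF this is an iif() expression in the Expression Builder, not Python):

from pyspark.sql.functions import when, col, lower

# Derived Column transform: tag each row as a mention or an author
categorized = tweets.withColumn(
    "category",
    when(col("tweet").startswith("@"), "mention").otherwise("author"))

# Aggregate transform: Tweet counts per (author, category)
counts = (categorized
          .groupBy(lower(col("author")).alias("author"), "category")
          .count()
          .withColumnRenamed("count", "tweetcount"))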

[Image: usql2]

Step 3: Rank Tweets

U-SQL: Rank Tweets with Window Functions

@res =
SELECT DISTINCT
author, category, tweetcount
, PERCENTILE_DISC(0.5) WITHIN GROUP (ORDER BY tweetcount ASC)
OVER (PARTITION BY category) AS median_tweetcount_perhandle_category
, PERCENT_RANK() OVER
(PARTITION BY category ORDER BY tweetcount ASC) AS relative_rank
, ROW_NUMBER() OVER
(PARTITION BY category ORDER BY tweetcount DESC) AS absolute_rank
FROM TweetAuthorsAndMentions
WHERE tweetcount >= 50;
OUTPUT @res
TO "/Output/TweetAnalysis/MyTwitterAnalysis6.csv"
ORDER BY absolute_rank, category ASC
USING Outputters.Csv();


In ADF Data Flow, you’ll use a Window Transformation with the Rank function and partition the data using the Over and Range settings in the transformation. You can see in the debug data previews below that the Rank function has partitioned the Tweets both by the Category and the Author as I had set in my Aggregation previously.
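For comparison, the same ranking logic over the aggregated counts from the earlier sketch, written as PySpark window functions (the PERCENTILE_DISC median from the U-SQL sample is omitted here for brevity):

from pyspark.sql import Window
from pyspark.sql.functions import col, percent_rank, row_number

asc_by_count = Window.partitionBy("category").orderBy(col("tweetcount").asc())
desc_by_count = Window.partitionBy("category").orderBy(col("tweetcount").desc())

ranked = (counts
          .withColumn("relative_rank", percent_rank().over(asc_by_count))
          .withColumn("absolute_rank", row_number().over(desc_by_count)))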

[Image: usql5]

[Image: usql6]

Step 4: Serialize output results to Lake

In these U-SQL samples, the scripts are divided up by function, so the results are serialized back to the lake at the end of each step. In ADF Data Flow, you serialize your results with a Sink. To keep the data in the lake, use an ADLS or Azure Blob Storage dataset to land the results in files. If you'd like to serialize your results to a database, you can use Azure SQL DB or Azure SQL DW datasets.
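In Spark terms, a Sink is simply a write. Continuing the sketch, landing the ranked results back in the lake as CSV might look like this (the output folder mirrors the U-SQL samples and is illustrative only):

# Sink transform: serialize the results back to the lake as CSV files
(ranked.write
       .mode("overwrite")
       .option("header", True)
       .csv("/Output/TweetAnalysis/MyTwitterAnalysis"))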

[Image: usql7]


ADF Data Flows: Databricks Notebook ETL vs. ADF Visual Data Flow ETL

The new (preview) feature in Azure Data Factory called Data Flows allows you to visually design, build, debug, and execute data transformations at scale on Spark by leveraging Azure Databricks clusters. You can then operationalize your data flows inside a general ADF pipeline with scheduling, triggers, monitoring, etc.

ADF Data Flows (preview) provides a visually oriented design paradigm meant for code-free data transformation. If you prefer to write code, you can also use ADF pipeline activities to execute code in Databricks via Notebooks, Python, JARs, etc.

What I wanted to do in this post was to demonstrate how to perform the same functions as a Databricks Notebook example using ADF Data Flows. In this demo, I'm going to use the European Football statistics sample from Kaggle (aka "Soccer stats") that is used in this Databricks sample Notebook: Analyze Games from European Soccer Leagues with Apache Spark and Databricks. I am going to recreate the ETL data engineering steps with ADF. The completed JSON for this Data Flow is here.

Step 1: Extract data from source CSV into DataFrames

In ADF, you will build a new Data Flow and use a Source transformation that points to the CSV file. Turn debug on so that you can see the same data profile stats and data preview that the Notebook demo illustrates.

From Databricks:

from pyspark.sql.types import *

schema = (StructType().
          add("id_odsp", StringType()).add("id_event", StringType()).add("sort_order", IntegerType()).
          add("time", IntegerType()).add("text", StringType()).add("event_type", IntegerType()).
          add("event_type2", IntegerType()).add("side", IntegerType()).add("event_team", StringType()).
          add("opponent", StringType()).add("player", StringType()).add("player2", StringType()).
          add("player_in", StringType()).add("player_out", StringType()).add("shot_place", IntegerType()).
          add("shot_outcome", IntegerType()).add("is_goal", IntegerType()).add("location", IntegerType()).
          add("bodypart", IntegerType()).add("assist_method", IntegerType()).add("situation", IntegerType()).
          add("fast_break", IntegerType())
         )

eventsDf = (spark.read.csv("/data/eu-soccer-events/input/events.csv", 
                         schema=schema, header=True, 
                         ignoreLeadingWhiteSpace=True, 
                         ignoreTrailingWhiteSpace=True,
                         nullValue='NA'))

eventsDf = eventsDf.na.fill({'player': 'NA', 'event_team': 'NA', 'opponent': 'NA', 
                             'event_type': 99, 'event_type2': 99, 'shot_place': 99, 
                             'shot_outcome': 99, 'location': 99, 'bodypart': 99, 
                             'assist_method': 99, 'situation': 99})
display(eventsDf)

[Image: image8-2]

From Kaggle:

[Image: kaggle]

ADF Data Flows (Source Transform):

[Image: soccer1]

Step 2: Transform Data by Mapping Codes to Names

The Kaggle data has codified fields which need to be mapped to names for analysis.

Databricks:


# mapKeyToVal(), the *Map dictionaries, and gameInfDf are defined in earlier cells of the sample Notebook
eventsDf = (
  eventsDf.
  withColumn("event_type_str", mapKeyToVal(evtTypeMap)("event_type")).
  withColumn("event_type2_str", mapKeyToVal(evtTyp2Map)("event_type2")).
  withColumn("side_str", mapKeyToVal(sideMap)("side")).
  withColumn("shot_place_str", mapKeyToVal(shotPlaceMap)("shot_place")).
  withColumn("shot_outcome_str", mapKeyToVal(shotOutcomeMap)("shot_outcome")).
  withColumn("location_str", mapKeyToVal(locationMap)("location")).
  withColumn("bodypart_str", mapKeyToVal(bodyPartMap)("bodypart")).
  withColumn("assist_method_str", mapKeyToVal(assistMethodMap)("assist_method")).
  withColumn("situation_str", mapKeyToVal(situationMap)("situation"))
)

joinedDf = (
  eventsDf.join(gameInfDf, eventsDf.id_odsp == gameInfDf.id_odsp, 'inner').
  select(eventsDf.id_odsp, eventsDf.id_event, eventsDf.sort_order, eventsDf.time, eventsDf.event_type, eventsDf.event_type_str, eventsDf.event_type2, eventsDf.event_type2_str, eventsDf.side, eventsDf.side_str, eventsDf.event_team, eventsDf.opponent, eventsDf.player, eventsDf.player2, eventsDf.player_in, eventsDf.player_out, eventsDf.shot_place, eventsDf.shot_place_str, eventsDf.shot_outcome, eventsDf.shot_outcome_str, eventsDf.is_goal, eventsDf.location, eventsDf.location_str, eventsDf.bodypart, eventsDf.bodypart_str, eventsDf.assist_method, eventsDf.assist_method_str, eventsDf.situation, eventsDf.situation_str, gameInfDf.country_code)
)
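As a rough idea of what such a mapping UDF looks like, here is my own sketch (not the Notebook's code verbatim, and the sample mapping values are illustrative only):

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def mapKeyToVal(mapping):
    """Return a UDF that translates a coded value into its display name."""
    def translate(key):
        return mapping.get(key)
    return udf(translate, StringType())

# Illustrative mapping only
sideMap = {1: "Home", 2: "Away"}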

ADF: Use IIF or Case in the Expression Builder, or use a Lookup against a file of mapped values. Here is a Derived Column with a Case expression:

[Image: soccer4]

Here is the Lookup transformation using a file with the mapped values. In this case, we won't hard-code names in the expression; instead, we'll use an external file with the mapped names and join it via a Lookup.
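Functionally, the Lookup transform behaves like a left outer join against that mapping file. A rough PySpark equivalent, where the mapping file path and its columns are hypothetical:

# Hypothetical mapping file with columns: event_type, event_type_str
eventTypeMap = (spark.read
                .option("header", True)
                .option("inferSchema", True)
                .csv("/data/eu-soccer-events/lookups/event_type_map.csv"))

# Lookup transform ~= left outer join on the code column
eventsDf = eventsDf.join(eventTypeMap, on="event_type", how="left")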

[Image: soccer3]

Step 3: Bucketize data based on match time with 10 bins

Databricks:


from pyspark.ml.feature import QuantileDiscretizer

joinedDf = QuantileDiscretizer(numBuckets=10, inputCol="time", outputCol="time_bin").fit(joinedDf).transform(joinedDf)

display(joinedDf)

ADF Data Flow: Window transformation using nTile (10)
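In PySpark terms, that Window transformation computes roughly the following (a sketch; in ADF you configure the Over, Sort, and Range settings visually rather than in code):

from pyspark.sql import Window
from pyspark.sql.functions import ntile

# Assign each event to one of 10 buckets based on match time
time_window = Window.orderBy("time")
joinedDf = joinedDf.withColumn("time_bin", ntile(10).over(time_window))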

[Image: soccer6]

Step 4: Load Data into Database as Final Step

Databricks:


%sql
CREATE DATABASE IF NOT EXISTS EURO_SOCCER_DB
LOCATION "dbfs:/FileStore/databricks-abhinav/eu-soccer-events/interm"


%sql USE EURO_SOCCER_DB


joinedDf.write.saveAsTable("GAME_EVENTS", format = "parquet", mode = "overwrite", partitionBy = "COUNTRY_CODE", path = "dbfs:/FileStore/databricks-abhinav/eu-soccer-events/interm/tr-events")

ADF: Sink Transformation

[Image: soccer5]

Azure Data Factory Data Flow: Building Slowly Changing Dimensions

Here is a quick walk-through on how to use Azure Data Factory’s new Data Flow feature (limited preview) to build Slowly Changing Dimension (SCD) ETL patterns.

The Data Flow feature in ADF is currently in limited preview. If you would like to try this out on your Data Factories, please fill out this form to request whitelisting your Azure Subscription for ADF Data Flows: http://aka.ms/dataflowpreview. Once your subscription has been enabled, you will see “Data Factory V2 (with data flows)” as an option from the Azure Portal when creating Data Factories.

I'm going to start super-simple by building just the path in my data flow that handles an SCD Type 2 insert, for the case where the dimension member does not already exist in the target Azure SQL DW. In ADF Data Flow, you can have multiple paths in your data flows (called streams) that enable you to perform different data transformations on different data sources in a single Data Flow.

In this case, I will look up in the DW to see whether the incoming product already exists in the DW dimension table. If it does, I'll branch off and update the row with the new attributes (I'll blog that path later). For this post, we'll complete the path that creates a new surrogate key for the row and cleans up some of the attributes before loading the new row into the target DW dimension table.

[Image: scd1]

The first thing I'll do is add two sources: one for the Azure SQL OLTP dataset, which has the products that were sold today, and a second that queries the existing DW DimProducts table for the unique, currently active rows in the products dimension. Because I am demonstrating a Type 2 SCD, the active dimension members can be signified with NULL as the End Date or 'Y' in the Status column to indicate that this is the row to use in calculations. To use this query for the second source at the bottom of the diagram, I created a SQL view with the proper query predicate and used the view as my source in the ADF dataset. However, I could also have expressed the query predicate directly in the Data Flow with a Filter transform and an expression such as isNull(EndDate) or EndDate > addMonths(currentDate(), 999), depending on the method you choose for SCD management in your dimension tables.
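For reference, the active-row predicate expressed in PySpark looks roughly like this (a sketch only; dimProducts is a hypothetical DataFrame over the DimProducts table, and in ADF this logic lives in the SQL view or in the Filter transform expression above):

from pyspark.sql.functions import col, add_months, current_date

# Keep only the active (open-ended) dimension rows
activeDims = dimProducts.filter(
    col("EndDate").isNull() | (col("EndDate") > add_months(current_date(), 999)))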

[Image: scd7]

ADF Data Flow is smart enough to take your end-to-end flows and optimize the execution with pushdown techniques when they are available. So a Filter transform against what looks like a complete table scan in the design view may not actually execute that way when you attach your Data Flow to a pipeline. It is best to experiment with different techniques and use Monitoring in ADF to gather timings and partition counts for your Data Flow activity executions.

There are several different ways to proceed from here. In this demo, I used a Left Outer Join so that I can get all of the new products and all of the existing products in a single query, then I split the flow with a Conditional Split based on whether the surrogate key (stored in ProductAlternateKey) was set.
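In code terms, that join-and-split step amounts to something like the following sketch, continuing from the filter above (oltpProducts and the ProductNumber business key are illustrative names; the split condition on ProductAlternateKey comes from the description above):

from pyspark.sql.functions import col

# Left outer join: every incoming OLTP product, plus its DW dimension row if one exists
joined = oltpProducts.join(activeDims, on="ProductNumber", how="left")

# Conditional Split: rows with no surrogate key are new dimension members
newMembers = joined.filter(col("ProductAlternateKey").isNull())
existingMembers = joined.filter(col("ProductAlternateKey").isNotNull())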

[Image: scd2]

If the OLTP source row already has this value, then I drop to the bottom stream of the Conditional Split. I have not built out that flow yet, but what I will do there is check whether the stored attributes in the DimProduct table have changed. If I determine that the change requires a new dimension row, I'll add that row to the DW in the bottom stream.

Using the Lookup transform within ADF Data Flow is also a very common way to look up reference values that could be leveraged in this case. You can use this pattern for SCD Type 2 when you want to match the DW member values against the incoming values and update the table when an attribute changes.

Real quick, I want to point out the Select transform immediately following my Join. This is an important concept in ADF Data Flow: all of your columns automatically propagate through your streams. As columns accumulate, you can use the Select transform to "select" the columns that you wish to keep. You will see stream names appended to the column names to indicate the origins and lineage of those columns, which helps you make the proper determination.

[Image: scd9]

While you can always choose column selectivity in the Sink transform at the end of your Data Flow, maintaining column hygiene along the way can help you prune your column lists. The downside to this approach is that you lose that metadata downstream and will not be able to access it once you've dropped it with a Select.

Now we need to add a surrogate key. Use the Surrogate Key transform to generate a unique key that will be stored, in this case, in ProductAlternateKey, providing a key that is used for inner joins in the DW star schema and for our BI/analytical solutions. This is not a business key and is not part of the OLTP system; it is specific to the data warehouse, for joining facts to dimensions.

[Image: scd5]

The ADF Data Flow Surrogate Key transform provides an incrementing value, so I'm going to prepend some text to the beginning of that value to demonstrate how to give your own flavor to the key. I decided to do this all in one Derived Column transform that I called "SetAttributes", shown below:

[Image: scd6]

You can always create separate Derived Column transforms for these formulas, but I just found it easier to build all of my expressions in one transform for this sample.

The other formulas set up attributes that I'll use to hold some stored data for publishing into the SQL DW DimProd table: 'Y' for the active status of the dimension member, a StartDate for the row, an EndDate in the future, empty strings instead of NULLs, and a bit column that my target schema requires.
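As a code sketch, the Surrogate Key and "SetAttributes" Derived Column steps amount to roughly the following (the "DIM-" prefix, the attribute column names, and the columns in the fill are illustrative, and monotonically_increasing_id() is only an approximation of ADF's incrementing key):

from pyspark.sql.functions import (add_months, concat, current_date, lit,
                                   monotonically_increasing_id)

newRows = (newMembers
    # Surrogate Key transform: an incrementing value, with my own text prepended
    .withColumn("ProductAlternateKey",
                concat(lit("DIM-"), monotonically_increasing_id().cast("string")))
    # Derived Column transform: SCD Type 2 housekeeping attributes
    .withColumn("Status", lit("Y"))
    .withColumn("StartDate", current_date())
    .withColumn("EndDate", add_months(current_date(), 999))
    # Empty strings instead of NULLs for string attributes (column names illustrative)
    .na.fill({"Color": "", "Size": ""}))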

Now all you have to do is sink your fields into your Azure SQL DW with a Sink transform and your SCD Type 2 data flow will be complete. To execute this flow on a schedule, create a new ADF pipeline and add a Data Flow activity that points to this SCD Data Flow. You'll be able to use the full range of ADF schedules and monitoring to operationalize your data warehouse loading in ADF.

Azure Data Factory: Delete Files From Azure Data Lake Store (ADLS)

In a previous post over at Kromer Big Data, I posted examples of deleting files from Azure Blob Storage and Table Storage as part of your ETL pipeline using Azure Data Factory (ADF). In those examples, I built a small, quick Logic App that used the Azure Storage APIs to delete data. In this post, I'm going to demonstrate how to remove files from Azure Data Lake Store (ADLS). For this demo, we'll use ADF's V2 service.

Deleting / removing files after they’ve been processed is a very common task in ETL Data Integration routines. Here’s how to do that for Azure Data Lake Store files in ADF:

[Image: adfweb]

  1. Start by creating a new Data Factory from Azure
  2. Click “Author & Monitor” from your factory in order to launch the ADF UI.
  3. Create a new pipeline and add a single Web Activity.
  4. Switch to the “Settings” tab on the properties pane at the bottom of the pipeline builder UI.
  5. The URL in the Web Activity will need to be the URI pointing to the ADLS file you wish to delete
    • https://<yourstorename>.azuredatalakestore.net/webhdfs/v1/mytempdir/myinputfile1.txt?op=DELETE
  6. The URL above (i.e. file names, folder names) can be parameterized. Click the “Add Dynamic Content” link when editing the URL text box.
  7. Set the Web Activity “Method” to “DELETE”.
  8. For authentication, you will need an Azure AD access token (see the Python sketch after this list for one way to produce one).
  9. The access token returned will need to be captured and used in the Web Activity header as such:
    • Header = "Authorization"  Expression = "Bearer <ACCESS TOKEN>"
  10. You can now validate and test run your pipeline with the Web Activity. Click the “Debug” button to give it a try.
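If you want to see exactly what the Web Activity is doing, here is a minimal Python sketch of the same two REST calls: requesting an Azure AD token for ADLS (Gen1) with a service principal, then issuing the WebHDFS DELETE. The tenant, app registration, and store name are placeholders; this is only an illustration of the calls, not part of ADF itself.

import requests

TENANT_ID = "<your-tenant-id>"
CLIENT_ID = "<your-app-id>"
CLIENT_SECRET = "<your-app-secret>"
STORE_NAME = "<yourstorename>"

# 1. Get an AAD access token for Azure Data Lake Store (Gen1)
token_response = requests.post(
    f"https://login.microsoftonline.com/{TENANT_ID}/oauth2/token",
    data={
        "grant_type": "client_credentials",
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "resource": "https://datalake.azure.net/",
    })
access_token = token_response.json()["access_token"]

# 2. Call the WebHDFS endpoint with op=DELETE and the Bearer token header
delete_response = requests.delete(
    f"https://{STORE_NAME}.azuredatalakestore.net/webhdfs/v1/mytempdir/myinputfile1.txt",
    params={"op": "DELETE"},
    headers={"Authorization": f"Bearer {access_token}"})

print(delete_response.status_code, delete_response.text)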


Control Flow Introduced in Azure Data Factory V2

One of the most exciting new features in Azure Data Factory will be very familiar to data developers, data engineers and ETLers familiar with hand-coding data integration jobs or designing SSIS packages …

“Control Flow”

In general computer science terms, Control Flow is described as "the order in which individual statements, instructions or function calls of an imperative program are executed or evaluated" (Wikipedia: https://en.wikipedia.org/wiki/Control_flow). In the Microsoft data integration world, tools like SQL Server Integration Services (SSIS) enable control flow via tasks and containers (https://docs.microsoft.com/en-us/sql/integration-services/control-flow/control-flow). Azure Data Factory's V1 service was focused on building sequenced pipelines for Big Data Analytics based on time windows. The V2 (preview) service now includes workflow capabilities in pipelines that enable control flow: parameterization, conditional execution, loops, and if conditions.

If-Then

In ADF pipelines, activities are equivalent to SSIS tasks. So an IF statement in ADF Control Flow will need to be written as an activity of type “IfCondition”:

"name": "MyIfCondition",
"type": "IfCondition",
"typeProperties": { "expression":
  { "value": "@bool(pipeline().parameters.routeSelection)",
    "type": "Expression" }

ifTrueActivities and ifFalseActivities are properties of that activity that contain the activities to execute for each branch:

"ifTrueActivities":
  [ { "name": "CopyFromBlobToBlob1", "type": "Copy" ... ]

"ifFalseActivities":
  [ { "name": "CopyFromBlobToBlob2", "type": "Copy" ... ]

https://docs.microsoft.com/en-us/azure/data-factory/control-flow-if-condition-activity

Loops

Do-Until: https://docs.microsoft.com/en-us/azure/data-factory/control-flow-until-activity

"name": "Adfv2QuickStartPipeline",
"properties":
  { "activities":
    [ { "type": "Until", "typeProperties":
        { "expression": { "value": "@equals('false', pipeline().parameters.repeat)",
           "type": "Expression" }, "timeout": "00:01:00", "activities":
              [ { "name": "CopyFromBlobToBlob", "type": "Copy" ...

For-Each: https://docs.microsoft.com/en-us/azure/data-factory/control-flow-for-each-activity

"name": "<MyForEachPipeline>",
    "properties":
        { "activities":
             [ { "name": "<MyForEachActivity>", "type": "ForEach",
                   "typeProperties": { "isSequential": "true",
                    "items": "@pipeline().parameters.mySinkDatasetFolderPath", "activities":
                       [ { "name": "MyCopyActivity", "type": "Copy", ...


Parameters

Parameters are very common in SSIS packages as well, making execution flexible so that packages can be generalized and re-used. The same capability is now available in ADF via parameters in control flow:

https://docs.microsoft.com/en-us/azure/data-factory/control-flow-expression-language-functions

You can build a parameterized pipeline, like this example with parameterized input and output paths:

"name": "Adfv2QuickStartPipeline",
"properties": {
"activities": [
 {
"name": "CopyFromBlobToBlob",
"type": "Copy",
"inputs": [
 {
"referenceName": "BlobDataset",
"parameters": {
"path": "@pipeline().parameters.inputPath"
 },
"type": "DatasetReference"
 }
 ],
"outputs": [
 {
"referenceName": "BlobDataset",
"parameters": {
"path": "@pipeline().parameters.outputPath"
 },
"type": "DatasetReference"

You can then execute that pipeline via the Invoke-AzureRmDataFactoryV2Pipeline PowerShell cmdlet: https://docs.microsoft.com/en-us/powershell/module/azurerm.datafactoryv2/Invoke-AzureRmDataFactoryV2Pipeline?view=azurermps-4.4.1. That command lets you pass the values for inputPath and outputPath either on the PowerShell command line or via a parameter file, e.g. params.json:

{
    "inputPath": "markkdhi/tmp",
    "outputPath": "markkdhi/user"
}
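If you prefer Python over PowerShell, the ADF management SDK exposes the same operation. Here is a hedged sketch assuming the azure-mgmt-datafactory and azure-identity packages, with placeholder resource names:

from azure.identity import DefaultAzureCredential
from azure.mgmt.datafactory import DataFactoryManagementClient

adf_client = DataFactoryManagementClient(DefaultAzureCredential(), "<subscription-id>")

# Trigger the pipeline run, passing the same parameters as params.json
run = adf_client.pipelines.create_run(
    resource_group_name="<resource-group>",
    factory_name="<data-factory-name>",
    pipeline_name="Adfv2QuickStartPipeline",
    parameters={"inputPath": "markkdhi/tmp", "outputPath": "markkdhi/user"})

print(run.run_id)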


Azure Data Factory V2 Quickstart Template

If you'd like to try a quick & easy way to get up & running with your first Azure Data Factory V2 pipeline, check out the video that I just recorded walking you through how to use the Azure Quickstart Template.

To get to the template, you can search the Azure Quickstart template gallery, or here is a direct link to the ADF V2 template.

Deploying this template to your Azure subscription will build a very simple ADF pipeline using the V2 public preview service, which was announced at the end of September at Ignite.

The pipeline executes a copy activity that will copy a file in blob store. In the example video I linked to above, I use it to make a copy of a file in the same blob container but with a new filename.

To learn more about the new features in ADF V2, see our updated public documentation home page: https://docs.microsoft.com/en-us/azure/data-factory/.