ADF Mapping Data Flows Parameters

Using Azure Data Factory Mapping Data Flows, you can make your data transformations flexible and general-purpose by using parameters. Use Data Flow parameters to create dynamic transformation expressions and dynamic content inside transformation settings. The online documentation for Data Flow parameters can be found here.


  1. To get started, open a Data Flow and click on the Parameters tab.
  2. Here is where you can create and manage the parameters that you will use inside of your data flows. You can also set an optional default value for each parameter.
  3. You can also create parameters from the Expression Builder.
  4. You can use those parameters in your expressions to make dynamic transformations. The parameter values are read-only inside your expressions.
  5. Use parameters to make the settings in your Source & Sink transformations dynamic, so that your data flows behave based upon values sent in from the pipeline execution.
  6. Click inside configurable settings fields and you'll see the "Add dynamic content" link pop up. Click it to launch the Expression Builder and enter static values or dynamic expressions that use your parameters.
  7. Now that you have your data flow designed with parameters, you can test it from a pipeline debug run. Click on the Execute Data Flow activity and you'll see a Parameters tab in the bottom settings panel. Click inside the Value field to set a static or dynamic value using either the Data Flow expression language or pipeline expressions (the parameter must be of type string to use pipeline expressions).

 

 


Dynamic SQL Table Names with Azure Data Factory Data Flows

You can leverage ADF's parameters feature with Mapping Data Flows to create pipelines that dynamically create new target tables. You can set those table names through Lookups or other activities. I've written a very simple post below on the tools you'll need to do this:


  1. Create a new ADF pipeline.
  2. Switch on "Data Flow Debug".
  3. Create 2 new datasets. We'll use Azure SQL DB for this demo, so create 2 Azure SQL DB datasets.
  4. Go to the Parameters tab in the dataset and add a parameter called "tablename" as a string.
  5. In both of the datasets, do not select a table name. Instead, leave the table name blank and click in the table name field to display the "Add Dynamic Content" link.
  6. Select the "tablename" parameter as the value in the Table field.
  7. You'll do this for both datasets. We'll use 1 dataset for the source and the other for the sink destination.
  8. Add an Execute Data Flow activity to your pipeline canvas and create a new data flow.
  9. Inside the data flow canvas, select the dataset for the source table.
  10. Add a Sink transformation directly after the source.
  11. Choose the dataset for the destination table.
  12. Back in the pipeline, click on the Execute Data Flow activity.
  13. In the Settings tab, you'll see a prompt for the values for the incoming and outgoing table names.
  14. For this demo, I just typed in static text. I have an existing table in my SQL DB called "dbo.batting1" and I want to copy it as "dbo.batting2".
  15. This pipeline will copy it under a different name in the same database.
  16. In a real-world scenario, you would set these dataset parameters via values from Lookup or other activities that change dynamically (a sketch of the resulting copy follows this list).
  17. Click on "Debug" to test your pipeline.
  18. After the debug run is executed, you should see a new table in your Azure SQL DB with the name that you provided in the 2nd dataset parameter.
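
Since Mapping Data Flows run on Spark, the parameterized source and sink datasets effectively resolve to a table-to-table copy at runtime. Here is a minimal PySpark sketch of that copy, with hypothetical connection details and the two table names hard-coded the way I did in the demo (an illustration only, not the code ADF generates):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical values -- in ADF these arrive through the "tablename" dataset parameters
jdbc_url = "jdbc:sqlserver://myserver.database.windows.net:1433;database=mydb"
source_table = "dbo.batting1"
sink_table = "dbo.batting2"

# Read the source table...
df = (spark.read.format("jdbc")
      .option("url", jdbc_url)
      .option("dbtable", source_table)
      .option("user", "myuser").option("password", "mypassword")
      .load())

# ...and write it back out under the new name
(df.write.format("jdbc")
 .option("url", jdbc_url)
 .option("dbtable", sink_table)
 .option("user", "myuser").option("password", "mypassword")
 .mode("overwrite")
 .save())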

ADF Mapping Data Flows: Optimize for File Source and Sink

I'm going to use this blog post as a dynamic list of performance optimizations to consider when using Azure Data Factory's Mapping Data Flow. I am going to focus only on files here. I will post subsequent articles that list ways to optimize other sources, sinks, and data transformation types. As I receive more good practices, feedback, and other performance tunings, I will update this article accordingly.

Here is Azure SQL DB Optimizations for ADF Data Flows

Here is Azure SQL DW Optimizations for ADF Data Flows

Optimizations to consider when using ADF Mapping Data Flows with files

NOTE: When you are designing and testing Data Flows from the ADF UI, make sure to turn on the Debug switch so that you can execute your data flows in real-time without waiting for a cluster to warm up.


You can control how many partitions ADF will use


  1. On each Source & Sink transformation, as well as each individual transformation, you can set a partitioning scheme.
  2. For smaller files, you may find selecting “Single Partition” can sometimes work better and faster than asking Spark to partition your small files.
  3. If you do not have enough information about your source data, you can choose “Round Robin” partitioning and set the number of partitions.
  4. If you explore your data and find that you have columns that can be good hash keys, use the Hash partitioning option.
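
Because Data Flows execute on Azure Databricks Spark, these options map roughly onto Spark's own repartitioning calls. Here is a minimal PySpark sketch of what each choice corresponds to, using a hypothetical input path and a hypothetical key column:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()
df = spark.read.csv("wasbs://data@myaccount.blob.core.windows.net/input/", header=True)  # hypothetical source folder

single = df.coalesce(1)                          # "Single partition": all rows end up in one partition
round_robin = df.repartition(20)                 # "Round Robin": rows spread evenly across 20 partitions
hashed = df.repartition(20, col("customer_id"))  # "Hash": rows sharing a key land in the same partition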

File naming options

  1. The default nature of writing transformed data in ADF Mapping Data Flows is to write to a dataset that has a Blob or ADLS Linked Service. You should set that dataset to point to a folder or container, not a named file.
  2. Data Flows use Azure Databricks Spark for execution, which means that your output will be split over multiple files based on either default Spark partitioning or the partitioning scheme that you’ve explicitly chosen.
  3. A very common operation in ADF Data Flows is to choose “Output to single file” so that all of your output PART files are merged together into a single output file.
  4. However, this operation requires that the output reduces to a single partition on a single cluster node.
  5. Keep this in mind when choosing this popular option. You can run out of cluster node resources if you are combining many large source files into a single output file partition.
  6. To avoid this, you can keep the default or explicit partitioning scheme in ADF, which optimizes for performance, and then add a subsequent Copy Activity in the pipeline that merges all of the PART files from the output folder to a new single file. Essentially, this technique separates the action of transformation from file merging and achieves the same result as setting “output to single file”.
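
To make that trade-off concrete, here is a hedged PySpark sketch of the two approaches, assuming a hypothetical transformed DataFrame and hypothetical output paths:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.read.csv("wasbs://data@myaccount.blob.core.windows.net/input/", header=True)  # hypothetical source

# "Output to single file": everything is funneled through one partition on one worker
df.coalesce(1).write.mode("overwrite").csv("wasbs://out@myaccount.blob.core.windows.net/single/", header=True)

# Alternative: keep the partitioned write for performance, then merge the PART files afterwards
# (in ADF, the merge step would be a subsequent Copy Activity reading the whole output folder)
df.write.mode("overwrite").csv("wasbs://out@myaccount.blob.core.windows.net/parts/", header=True)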

Increase size of your compute engine in Azure Integration Runtime


  1. Increase the number of cores, which will increase the number of nodes, and provide you with more processing power to process your file operations.
  2. Try “Compute Optimized” and “Memory Optimized” options.

ADF Mapping Data Flows: Optimize for Azure SQL Data Warehouse

I'm going to use this blog post as a dynamic list of performance optimizations to consider when using Azure Data Factory's Mapping Data Flow. I am going to focus only on Azure SQL DW here. I will post subsequent articles that list ways to optimize other sources, sinks, and data transformation types. As I receive more good practices, feedback, and other performance tunings, I will update this article accordingly.

Here is Azure SQL DB Optimizations for ADF Data Flows.

Optimizations to consider when using ADF Mapping Data Flows with Azure SQL DW

NOTE: When you are designing and testing Data Flows from the ADF UI, make sure to turn on the Debug switch so that you can execute your data flows in real-time without waiting for a cluster to warm up.


You can match Spark data partitioning to your source database partitioning based on a database table column key in the Source transformation


  1. Go to the "Optimize" tab and select "Source" partitioning. Then set either a specific table column or type in a query.
  2. If you chose “column”, then pick the partition column.
  3. Also, set the maximum number of connections to your Azure SQL DW. You can try a higher setting to gain parallel connections to your database. However, some cases may result in faster performance with a limited number of connections.
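
Conceptually this is the same knob as a partitioned Spark JDBC read. Here is a minimal PySpark sketch, with hypothetical connection details and a hypothetical numeric partition column:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = (spark.read.format("jdbc")
      .option("url", "jdbc:sqlserver://mydw.database.windows.net:1433;database=mydw")  # hypothetical
      .option("dbtable", "dbo.FactSales")       # hypothetical table
      .option("user", "myuser").option("password", "mypassword")
      .option("partitionColumn", "SaleId")      # the column you picked for partitioning
      .option("lowerBound", "1")
      .option("upperBound", "10000000")
      .option("numPartitions", "8")             # comparable to the "number of connections" setting
      .load())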

Set Batch Size and Query on Source


  1. Setting batch size will instruct ADF to store data in sets in memory instead of row-by-row. It is an optional setting and you may run out of resources on the compute nodes if they are not sized properly.
  2. Setting a query allows you to filter rows right at the source, before they even arrive at the Data Flow for processing, which can make the initial data acquisition faster.

Use staging to load data in bulk via Polybase

  1. In order to avoid row-by-row processing of your data flows, set the "Staging" option in the Sink settings. This instructs ADF to leverage PolyBase so that data is loaded into SQL DW in bulk rather than through row-by-row inserts.
  2. When you execute your data flow activity from a pipeline, with Staging turned on, you will need to select the Blob store location of your staging data for bulk loading.

Set Partitioning Options on your Sink


  1. Even if you don’t have your data partitioned in your destination Azure SQL DW tables, go to the Optimize tab and set partitioning.
  2. Very often, simply telling ADF to use Round Robin partitioning on the Spark execution clusters results in much faster data loading instead of forcing all connections from a single node/partition.

Set isolation level on Source transformation settings for SQL datasets

  1. Setting the isolation level to Read Uncommitted will provide faster query results on the Source transformation.


Increase size of your compute engine in Azure Integration Runtime


  1. Increase the number of cores, which will increase the number of nodes, and provide you with more processing power to query and write to your Azure SQL DW.
  2. Try “Compute Optimized” and “Memory Optimized” options

Disable indexes on write

  1. Use an ADF pipeline stored procedure activity prior to your Data Flow activity that disables indexes on your target tables that are being written to from your Sink.
  2. After your Data Flow activity, add another stored proc activity that re-enables those indexes.

Increase the size of your Azure SQL DW

  1. Schedule a resizing of your source and sink Azure SQL DW before you run your pipeline to increase the throughput and minimize Azure throttling once you reach DWU limits.
  2. After your pipeline execution is complete, you can resize your databases back to their normal run rate.

ADF Mapping Data Flows: Optimize for Azure SQL Database

I'm going to use this blog post as a dynamic list of performance optimizations to consider when using Azure Data Factory's Mapping Data Flow. I am going to focus only on Azure SQL DB here. I will post subsequent articles that list ways to optimize other sources, sinks, and data transformation types. As I receive more good practices, feedback, and other performance tunings, I will update this article accordingly.

Here is Azure SQL DW Optimizations for ADF Data Flows.

Optimizations to consider when using ADF Mapping Data Flows with Azure SQL DB

NOTE: When you are designing and testing Data Flows from the ADF UI, make sure to turn on the Debug switch so that you can execute your data flows in real-time without waiting for a cluster to warm up.


Partition your source query


  1. Go to the "Optimize" tab and select "Source" partitioning. Then set either a specific table column or type in a query.
  2. If you chose “column”, then pick the partition column.
  3. Also, set the maximum number of connections to your Azure SQL DB. You can try a higher setting to gain parallel connections to your database. However, some cases may result in faster performance with a limited number of connections.
  4. Your source database tables do not need to be partitioned.
  5. Setting a query in your Source transformation that matches the partitioning scheme of your database table will allow the source database engine to leverage partition elimination.
  6. If your source is not already partitioned, ADF will still use data partitioning in the Spark transformation environment based on the key that you select in the Source transformation.

Set Batch Size and Query on Source


  1. Setting batch size will instruct ADF to store data in sets in memory instead of row-by-row. It is an optional setting and you may run out of resources on the compute nodes if they are not sized properly.
  2. Setting a query allows you to filter rows right at the source, before they even arrive at the Data Flow for processing, which can make the initial data acquisition faster.
  3. If you use a query, you can add optional query hints for your Azure SQL DB, e.g. READ UNCOMMITTED.
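
For reference, a source query plus a read batch size is conceptually similar to the following Spark JDBC read (a hedged sketch with hypothetical connection details and query; ADF generates its own execution plan, this only illustrates the effect):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = (spark.read.format("jdbc")
      .option("url", "jdbc:sqlserver://mydb.database.windows.net:1433;database=mydb")  # hypothetical
      .option("user", "myuser").option("password", "mypassword")
      # Filter rows at the source instead of pulling the whole table across
      .option("query", "SELECT Id, Amount, SaleDate FROM dbo.Sales WHERE SaleDate >= '2019-01-01'")
      # Fetch rows from the database in batches rather than one at a time
      .option("fetchsize", "10000")
      .load())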

Set isolation level on Source transformation settings for SQL datasets

  1. Setting the isolation level to Read Uncommitted will provide faster query results on the Source transformation.


Set Sink Batch Size


  1. In order to avoid row-by-row processing of your data flows, set the "Batch size" in the Sink settings for Azure SQL DB. This will tell ADF to process database writes in batches based on the size provided.
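
Continuing the hypothetical JDBC sketch from the source section above, the rough Spark equivalent of a sink batch size is the batchsize option on a JDBC write:

# df is the transformed DataFrame from the earlier sketch (hypothetical)
(df.write.format("jdbc")
 .option("url", "jdbc:sqlserver://mydb.database.windows.net:1433;database=mydb")  # hypothetical
 .option("dbtable", "dbo.SalesClean")   # hypothetical target table
 .option("user", "myuser").option("password", "mypassword")
 .option("batchsize", "10000")          # write in batches of 10,000 rows instead of row by row
 .mode("append")
 .save())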

Set Partitioning Options on your Sink


  1. Even if you don’t have your data partitioned in your destination Azure SQL DB tables, go to the Optimize tab and set partitioning.
  2. Very often, simply telling ADF to use Round Robin partitioning on the Spark execution clusters results in much faster data loading instead of forcing all connections from a single node/partition.

Increase size of your compute engine in Azure Integration Runtime


  1. Increase the number of cores, which will increase the number of nodes, and provide you with more processing power to query and write to your Azure SQL DB.
  2. Try “Compute Optimized” and “Memory Optimized” options

Disable indexes on write

  1. Use an ADF pipeline stored procedure activity prior to your Data Flow activity that disables indexes on your target tables that are being written to from your Sink.
  2. After your Data Flow activity, add another stored proc activity that re-enables those indexes.
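
If you don't already have such stored procedures, the underlying T-SQL is simple. The sketch below shows the idea in Python with pyodbc purely for illustration (hypothetical connection string, table, and index names); in ADF you would wrap the same two statements in stored procedures and call them from Stored Procedure activities before and after the Data Flow activity:

import pyodbc

conn = pyodbc.connect(
    "DRIVER={ODBC Driver 17 for SQL Server};"
    "SERVER=mydb.database.windows.net;DATABASE=mydb;UID=myuser;PWD=mypassword"  # hypothetical
)
cur = conn.cursor()

# Before the Data Flow activity: disable the nonclustered index on the target table
cur.execute("ALTER INDEX IX_SalesClean_Customer ON dbo.SalesClean DISABLE")
conn.commit()

# ... the Data Flow sink writes happen here ...

# After the Data Flow activity: rebuild (re-enable) the index
cur.execute("ALTER INDEX IX_SalesClean_Customer ON dbo.SalesClean REBUILD")
conn.commit()
conn.close()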

Increase the size of your Azure SQL DB

  1. Schedule a resizing of your source and sink Azure SQL DB before you run your pipeline to increase throughput and minimize Azure throttling once you reach DTU limits.
  2. After your pipeline execution is complete, you can resize your databases back to their normal run rate.

ADF Slowly Changing Dimension Type 2 with Mapping Data Flows (complete)

I have been putting together a series of posts and videos around building SCD Type 1 and Type 2 using Mapping Data Flows with Azure Data Factory. In this latest post, I’m going to walk through a complete end-to-end Type 2. I won’t be able to provide full detail here. Instead, I’m going to touch on the different pieces that you need from ADF to make it work and then I would suggest that you download the JSON for this data flow here and walk through it on your Data Factory.

Here are links to the other parts of this series:

This post is an expansion of the first intro to SCD post above. However, this time, I’m going to expand upon some of the more complex scenarios that you’ll find in dimension handling in ETL like keeping member history in your dimension table.


Use Case

DimEmployees is a dimension in a data warehouse that analyzes projects. Attributes of those employee records will change occasionally and when they do, we want to track them by maintaining history, creating a new row with the new employee data (SCD Type 2).

Step 1: We need 2 sources. First is the incoming new employee records, which is this CSV file:

EmpID,Region,Status,Function,Level,Role,StartDate,EndDate
1234,SER,A,ADM,A,Finance,1/1/2000,
1345,SER,A,ADM,A,Finance,4/5/2008,
1789,PNW,A,ENG,N,Engineer,7/9/2011,
2349,PNW,I,ENG,N,Engineer,9/8/1999,4/1/2019
8382,NER,A,RAD,A,Marketing,4/5/1998,

The 2nd source will be the existing DimEmployees table (DimEmp) in the existing data warehouse, which is in my Azure SQL Database:


Basic SCD Type 2 Logic

  1. Lookup incoming new Employee records against existing records in the DimEmployee table
  2. If they are new employees, then create a new surrogate key and insert the row into the DimEmployee table
  3. If the Employee member already exists, then set the "iscurrent" flag to 0 and update the End Date and Status attributes in the existing row
  4. Add a new row for the Employee with the new member attributes and set "iscurrent" to 1 (the overall logic is sketched in PySpark below)
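
Since Mapping Data Flows execute on Spark, the same four steps can be expressed in PySpark. The following is a simplified, hypothetical sketch of that logic (illustrative column names and connection values, not the code ADF actually generates):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, crc32, concat_ws, current_timestamp

spark = SparkSession.builder.getOrCreate()

# Hypothetical sources: the incoming employee CSV and the existing dimension table
incoming = spark.read.csv("wasbs://in@myaccount.blob.core.windows.net/employees/", header=True)
dim_emp = (spark.read.format("jdbc")
           .option("url", "jdbc:sqlserver://mydb.database.windows.net:1433;database=mydw")
           .option("dbtable", "dbo.DimEmployees")
           .option("user", "myuser").option("password", "mypassword")
           .load())

# 1. Look up incoming rows against the current dimension members
current = dim_emp.filter(col("iscurrent") == 1)
joined = incoming.alias("src").join(current.alias("dim"),
                                    col("src.EmpID") == col("dim.EmpID"), "left")

# 2. Brand-new employees: generate a non-sequential surrogate key and mark as current
new_rows = (joined.filter(col("dim.EmpID").isNull())
            .select("src.*")
            .withColumn("EmpSK", crc32(concat_ws("|", col("EmpID"), col("Function"))).cast("string"))
            .withColumn("iscurrent", lit(1))
            .withColumn("processtime", current_timestamp()))

# 3. Existing members whose tracked attributes changed: expire the old row...
changed = joined.filter(col("dim.EmpID").isNotNull()).filter(
    (col("src.Region") != col("dim.Region")) | (col("src.Status") != col("dim.Status")) |
    (col("src.Level") != col("dim.Level")) | (col("src.Role") != col("dim.Role")))
expired = changed.select("dim.*").withColumn("iscurrent", lit(0))

# 4. ...and insert a new current row carrying the new attribute values
new_versions = (changed.select("src.*")
                .withColumn("EmpSK", crc32(concat_ws("|", col("EmpID"), col("Function"))).cast("string"))
                .withColumn("iscurrent", lit(1))
                .withColumn("processtime", current_timestamp()))

In the data flow itself the writes happen in the Sinks: new_rows and new_versions correspond to the insert-only Sink, and expired corresponds to the update-only Sink described further below.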

The top row in the Data Flow is the “new employee” stream.


New Rows

    1. The new Employee file source is set to delete the source file upon completion and uses a wildcard path to find the latest CSVs in a folder.
    2. The Null Filter removes any extraneous rows caused by extra newlines in the source, using the Filter transformation.
    3. The TypeConversions Derived Column transformation converts the incoming CSV string types to logical types and also sets an ETL processtime field to currentTimestamp(). I use this in all of my ETL processes for tracking & logging.
    4. LookupIDs will find matching employees from the DimEmp source, matching on EmpID. This is the 2nd source.
    5. I also used a type conversion Derived Column on this 2nd source, casting each field to ensure we are using the same logical types as the database table.
    6. NormNames is a "Select" transformation which is used for aliasing, renaming, and column selectivity. I am removing any columns from the Lookup that I do not wish to flow through, as well as removing the fully qualified names, keeping simple names with no namespace.
    7. NewRow is a Conditional Split which uses the results of the Lookup to decide whether this is a new incoming employee. If the "iscurrent" field is NULL, then we know it is new, because that column will only be present on an existing database row.
    8. Now that we know the EmpID is new, we can create a new row for insert. The Sink will have "Allow inserts" as the only database option for the DimEmp table, and SetAttrsForNew is a Derived Column that sets "iscurrent" to 1 and generates a new surrogate key using this formula: toString(crc32(EmpID,EmpFunction))
    9. Notice that I did not use the Surrogate Key transformation. In this case, I am not seeding sequential values; the Surrogate Key transformation in Data Flows acts as a sequence generator, and here I want pure non-business keys that are not sequential.

Existing Rows

We'll know that the incoming row is an update to an existing member because we found a value (not NULL; it could be any value) from the Conditional Split in the "checkForUpdates" branch.

      1. NameNorm2 is another Select transform that again picks the columns we’re interested in and allows us to remove duplicate columns that originated from the Lookup
      2. CheckForChanges is an Exists transformation and is how we’re going to make a determination that there was a change in any of the existing member properties that we are interested in.
      3. I decided to only trigger an update to the table if we see a change in Region, Status, Role, Level, or End Date using this formula in Exists:
      4. NameNorm2@EmpID == TypeConversions@EmpID &&
        (
        NameNorm2@Region != DimEmployees@Region ||
        NameNorm2@Status != DimEmployees@Status ||
        NameNorm2@Level != DimEmployees@Level ||
        NameNorm2@Role != DimEmployees@Role ||
        NameNorm2@EndDate != DimEmployees@EndDate
        )
      5. If any rows have changes to those attributes, we’ll write a new row by setting “iscurrent” to 1 in the SetAttrUpdate Derived Column, marking this row as the active member
      6. On the “CheckForChanges” Exists transform, select “New Branch”. This will create a separate copy of that data stream so that we can use any matches from that Exists check to turn the existing rows to inactive.
      7. SetAttrsInactive is a Derived Column that sets “iscurrent” to 0, marking the existing member row as not current.
      8. I use a Select transformation called “InactiveFields” so that I choose only the columns that I wish to update to the existing, now inactive, version of the Employee member.
      9. Alter Row is added next as a transformation that will set my database policy on this stream to “Update”. The formula to update is simply “true()”. This allows us to update the existing member row.
      10. The Sink is set to only "allow updates" and the mapping only maps the fields that need to be updated on the existing dimension members.

     

The complete JSON for this data flow is in my Github repo here.

I also recorded a video showing this SCD T2 Data Flow in action here.

Partition Large Files with ADF using Mapping Data Flows

A very common practice when designing Big Data ETL and Analytics solutions in the Cloud is to find creative ways to work with very large data files. Of course, Data Engineers who are working primarily on-prem also face challenges processing very large files. The added challenge with hybrid and Cloud scenarios is that you also have to build architectures with bandwidth and utility billing constraints in mind.

Big Data platforms like Spark and Hadoop provide a very natural fit for handling large file processing by leveraging distributed file systems. This allows partitioned data files across worker nodes to process data locally on each node to divide & conquer these workloads.

ADF’s Mapping Data Flow feature is built upon Spark in the Cloud, so the fundamental steps in large file processing are also available to you as an ADF user. This means that you can use Data Flows to perform the very common requirement of splitting your large file across partitioned files so that you can process and move the file in pieces.

To accomplish this in ADF Data Flows:

  1. Create a new Data Flow.
    1. You are going to create a very simple Data Flow just to leverage file partitioning. There will not be any column or row transformations, just a Source and a Sink that will take a large file and produce smaller part files.
  2. Add a Source file.
    1. For this demo, I am using the Kaggle public loans data CSV with >800k records. You do not need to set any schema or projections because we are not working with data at the column level here.
    2. Make sure to turn on the Data Flow Debug switch at the top of the UI to warm up your cluster so that you can execute this data flow later.
  3. Add a Sink folder.
    1. For the Sink dataset, choose the type of output files you would like to produce. I'm going to start with my large CSV and produce partitioned CSV files, so I'm using a Delimited Text dataset. Note that in the dataset file path I am typing in a new folder name, "output/parts". ADF will use this to generate a new folder called "parts" inside my existing "output" folder in Blob when this data flow executes.
    2. In the Sink, define the partitioning.
    3. This is where you will define how you would like the partitioned files to be generated. I'm asking for 20 equal distributions using a simple Round Robin technique. I have also set the output file names using the "pattern" option: "loans[n].csv" will produce new part files named loans1.csv, loans2.csv … loans20.csv (see the PySpark sketch after this list for the equivalent partitioned write).
    4. Notice I've also set "Clear the folder". This will ask ADF to wipe the contents of the destination folder clean before loading the new part files.
  4. Save your data flow and create a new pipeline.
  5. Add an Execute Data Flow activity and select your new file-split data flow.
  6. Execute the pipeline using the pipeline debug button.
    1. You must execute data flows from a pipeline in order to generate file output. Debugging from Data Flow does not write any data.
  7. After execution, you should now see 20 files that resulted from round robin partitioning of your large source file. You're done.
  8. In the output of your pipeline debug run, you'll see the execution results of the data flow activity. Click on the eyeglasses icon to show the details of your data flow execution. You'll see the statistics of the distribution of records across your partitioned files.
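
Because the data flow executes on Spark, the partition-and-write step above is conceptually the same as the following PySpark sketch (hypothetical paths; ADF also handles the file-name pattern and folder clearing for you):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical input: one large CSV in Blob storage
df = spark.read.csv("wasbs://data@myaccount.blob.core.windows.net/input/loans.csv", header=True)

# Split into 20 roughly equal partitions (round robin) and write one part file per partition
(df.repartition(20)
   .write.mode("overwrite")
   .csv("wasbs://data@myaccount.blob.core.windows.net/output/parts/", header=True))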

 

Azure Data Factory: Build U-SQL Tweet Analysis with ADF Data Flows

One of the most commonly used execution environments for Big Data transformations in ADF is Azure Data Lake Analytics (ADLA) using U-SQL scripts to transform data at scale. The new ADF Data Flow feature allows you to build data transformations at scale without needing to know any scripting or coding environments. You won’t need to learn and understand the details of the execution environment or big data cluster maintenance. Instead, you can remain within the ETL data integration environment to build both your workflow pipelines as well as your data transformation code-free inside the ADF visual UI.

For this example, I’ll rebuild the Tweet Analysis U-SQL demo here from Michael Rys that demonstrates some of the analytical capabilities of ADLA and U-SQL. To build this in ADF, I’ll use visual data flow capabilities against the MikeDoesBigDataTweets.csv file in this folder. I added a few additional new Tweets to the CSV to update the contents a bit using my own KromerBigData handle using http://tweetdownload.net/.

Step 1: Define the schema and count the number of Tweets grouped by Author

U-SQL: Simple Analysis


@t =
EXTRACT date string,
time string,
author string,
tweet string
FROM "/Samples/Data/Tweets/MikeDoesBigDataTweets.csv"
USING Extractors.Csv();


@res =
SELECT author,
COUNT( * ) AS tweetcount
FROM @t
GROUP BY author;

OUTPUT @res
TO "/Output/TweetAnalysis/MyTwitterAnalysis1.csv"
ORDER BY tweetcount DESC
USING Outputters.Csv();

In ADF Data Flow, I'll accomplish this with a Source transformation that defines the schema and then an Aggregate transformation for the count. Notice that I put the Aggregate in a different order than the U-SQL sample; there is no need to recreate scripts or programs in the exact same order when building a data flow. Also notice that I grouped by both author & category, taking some liberties to change the scenario slightly. You can see that I am setting my Data Flow to "Debug" mode so that I can see the Data Preview as I build the flow.
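
For readers who think in Spark terms, the Aggregate transformation in this flow computes roughly the following (a hypothetical PySpark sketch over the same CSV; the data flow also groups on the derived category column described in Step 2):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

tweets = (spark.read.csv("/Samples/Data/Tweets/MikeDoesBigDataTweets.csv", header=False)
          .toDF("date", "time", "author", "tweet"))

# Count tweets per author, like the Aggregate transform's count()
tweet_counts = tweets.groupBy("author").count().withColumnRenamed("count", "tweetcount")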


Step 2: Categorize Tweets

U-SQL: Extract Mentions

@m = SELECT m.Substring(1) AS m
, "mention" AS category
FROM @m CROSS APPLY EXPLODE(mentions) AS t(m)
WHERE m != "@";
@t =
SELECT author, "author" AS category
FROM @t
UNION ALL
SELECT *
FROM @m;
@res = SELECT author.ToLowerInvariant() AS author
, category
, COUNT( * ) AS tweetcount
FROM @t
GROUP BY author.ToLowerInvariant(), category;
OUTPUT @res
TO "/Output/TweetAnalysis/MyTwitterAnalysis2.csv"
ORDER BY tweetcount DESC
USING Outputters.Csv();

In ADF Data Flow, I’m going to categorize the Tweets by Mentions and Authors using an IIF statement based upon the first character of the Tweet Text. Any aggregation like Count, is done in the Aggregate Transform, which is why the sequencing is different when converting from a scripting environment.
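
In Spark terms, that conditional categorization looks roughly like the following (a hedged sketch that reuses the hypothetical tweets DataFrame from the Step 1 sketch):

from pyspark.sql.functions import when, substring, col, lower

categorized = (tweets
               .withColumn("category",
                           when(substring(col("tweet"), 1, 1) == "@", "mention").otherwise("author"))
               .withColumn("author", lower(col("author"))))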


Step 3: Rank Tweets

U-SQL:  Rank Tweets with Window Functions

@res =
SELECT DISTINCT
author, category, tweetcount
, PERCENTILE_DISC(0.5) WITHIN GROUP (ORDER BY tweetcount ASC)
OVER (PARTITION BY category) AS median_tweetcount_perhandle_category
, PERCENT_RANK() OVER
(PARTITION BY category ORDER BY tweetcount ASC) AS relative_rank
, ROW_NUMBER() OVER
(PARTITION BY category ORDER BY tweetcount DESC) AS absolute_rank
FROM TweetAuthorsAndMentions
WHERE tweetcount >= 50;
OUTPUT @res
TO "/Output/TweetAnalysis/MyTwitterAnalysis6.csv"
ORDER BY absolute_rank, category ASC
USING Outputters.Csv();

 

In ADF Data Flow, you’ll use a Window Transformation with the Rank function and partition the data using the Over and Range settings in the transformation. You can see in the debug data previews below that the Rank function has partitioned the Tweets both by the Category and the Author as I had set in my Aggregation previously.
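
The same ranking can be written with PySpark window functions (a hypothetical sketch, assuming a DataFrame named tweet_counts with author, category, and tweetcount columns):

from pyspark.sql import Window
from pyspark.sql.functions import percent_rank, row_number, col

w_asc = Window.partitionBy("category").orderBy(col("tweetcount").asc())
w_desc = Window.partitionBy("category").orderBy(col("tweetcount").desc())

ranked = (tweet_counts
          .withColumn("relative_rank", percent_rank().over(w_asc))
          .withColumn("absolute_rank", row_number().over(w_desc)))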


Step 4: Serialize output results to Lake

In these U-SQL samples, the scripts are divided up by function, so the results are serialized back in the lake at the end of each step. In ADF Data Flow, to serialize your results, use a Sink. To keep the data in the lake, just use an ADLS or Azure Blob Store dataset to land the results in files. If you’d like to serialize your results in a database, you can use Azure SQL DB or Azure SQL DW datasets.


ADF Data Flows: Databricks Notebook ETL vs. ADF Visual Data Flow ETL

The new (preview) feature in Azure Data Factory called Data Flows allows you to visually design, build, debug, and execute data transformations at scale on Spark by leveraging Azure Databricks clusters. You can then operationalize your data flows inside a general ADF pipeline with scheduling, triggers, monitoring, etc.

ADF Data Flows (preview) provides a visually-oriented design paradigm meant for code-free data transformation. You can also use ADF to execute code in Databricks, if you prefer to write code, using Databricks Notebooks, Python, JARs, etc. using the ADF pipeline activities.

What I wanted to do in this post was to demonstrate how to perform the same functions of a Databricks Notebook example using ADF Data Flows. In this demo, I’m going to use the European Football statistics sample from Kaggle (aka “Soccer stats”) that is used in this Databricks sample Notebook: Analyze Games from European Soccer Leagues with Apache Spark and Databricks. I am going to recreate the ETL Data Engineering steps with ADF. The completed JSON for this Data Flow is here.

Step 1:

Extract data from source CSV into DataFrames. In ADF, you will build a new Data Flow and use a Source transformation that points to that CSV file. Turn debug on so that you can see the same data profile stats and data preview as the Notebooks demo illustrates.

From Databricks:

from pyspark.sql.types import *

schema = (StructType().
          add("id_odsp", StringType()).add("id_event", StringType()).add("sort_order", IntegerType()).
          add("time", IntegerType()).add("text", StringType()).add("event_type", IntegerType()).
          add("event_type2", IntegerType()).add("side", IntegerType()).add("event_team", StringType()).
          add("opponent", StringType()).add("player", StringType()).add("player2", StringType()).
          add("player_in", StringType()).add("player_out", StringType()).add("shot_place", IntegerType()).
          add("shot_outcome", IntegerType()).add("is_goal", IntegerType()).add("location", IntegerType()).
          add("bodypart", IntegerType()).add("assist_method", IntegerType()).add("situation", IntegerType()).
          add("fast_break", IntegerType())
         )

eventsDf = (spark.read.csv("/data/eu-soccer-events/input/events.csv", 
                         schema=schema, header=True, 
                         ignoreLeadingWhiteSpace=True, 
                         ignoreTrailingWhiteSpace=True,
                         nullValue='NA'))

eventsDf = eventsDf.na.fill({'player': 'NA', 'event_team': 'NA', 'opponent': 'NA', 
                             'event_type': 99, 'event_type2': 99, 'shot_place': 99, 
                             'shot_outcome': 99, 'location': 99, 'bodypart': 99, 
                             'assist_method': 99, 'situation': 99})
display(eventsDf)


From Kaggle:


ADF Data Flows (Source Transform):


Step 2:

Transform Data by Mapping Names to Codes. The Kaggle data has codified fields which need to be mapped to names for analysis.

Databricks:


eventsDf = (
eventsDf.
withColumn("event_type_str", mapKeyToVal(evtTypeMap)("event_type")).
withColumn("event_type2_str", mapKeyToVal(evtTyp2Map)("event_type2")).
withColumn("side_str", mapKeyToVal(sideMap)("side")).
withColumn("shot_place_str", mapKeyToVal(shotPlaceMap)("shot_place")).
withColumn("shot_outcome_str", mapKeyToVal(shotOutcomeMap)("shot_outcome")).
withColumn("location_str", mapKeyToVal(locationMap)("location")).
withColumn("bodypart_str", mapKeyToVal(bodyPartMap)("bodypart")).
withColumn("assist_method_str", mapKeyToVal(assistMethodMap)("assist_method")).
withColumn("situation_str", mapKeyToVal(situationMap)("situation"))
)

joinedDf = (
eventsDf.join(gameInfDf, eventsDf.id_odsp == gameInfDf.id_odsp, 'inner').
select(eventsDf.id_odsp, eventsDf.id_event, eventsDf.sort_order, eventsDf.time, eventsDf.event_type, eventsDf.event_type_str, eventsDf.event_type2, eventsDf.event_type2_str, eventsDf.side, eventsDf.side_str, eventsDf.event_team, eventsDf.opponent, eventsDf.player, eventsDf.player2, eventsDf.player_in, eventsDf.player_out, eventsDf.shot_place, eventsDf.shot_place_str, eventsDf.shot_outcome, eventsDf.shot_outcome_str, eventsDf.is_goal, eventsDf.location, eventsDf.location_str, eventsDf.bodypart, eventsDf.bodypart_str, eventsDf.assist_method, eventsDf.assist_method_str, eventsDf.situation, eventsDf.situation_str, gameInfDf.country_code)
)

ADF: Use IIF or Case in Expression Builder, or use Lookup with mapped files. Derived column with Case expression:


Lookup transformation using a file with mapped values. In this case, we won’t hard-code names in the expression. Instead, we’ll use an external file with the mapped names and join it via Lookup.


Step 3: 

Bucketize data based on match time with 10 bins

Databricks:


from pyspark.ml.feature import QuantileDiscretizer

joinedDf = QuantileDiscretizer(numBuckets=10, inputCol="time", outputCol="time_bin").fit(joinedDf).transform(joinedDf)

display(joinedDf)

ADF Data Flow: Window transformation using nTile (10)

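In PySpark terms, the nTile approach in the Window transformation corresponds roughly to the following (a hypothetical sketch that assumes the joinedDf DataFrame from the Databricks snippet above):

from pyspark.sql import Window
from pyspark.sql.functions import ntile, col

# Note: an un-partitioned window pulls all rows into a single partition to compute the tiles
time_window = Window.orderBy(col("time"))
binned = joinedDf.withColumn("time_bin", ntile(10).over(time_window))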

Step 4: Load Data into Database as Final Step

Databricks:


%sql
CREATE DATABASE IF NOT EXISTS EURO_SOCCER_DB
LOCATION "dbfs:/FileStore/databricks-abhinav/eu-soccer-events/interm"


%sql USE EURO_SOCCER_DB


joinedDf.write.saveAsTable("GAME_EVENTS", format = "parquet", mode = "overwrite", partitionBy = "COUNTRY_CODE", path = "dbfs:/FileStore/databricks-abhinav/eu-soccer-events/interm/tr-events")

ADF: Sink Transformation
