ETL with ADF: Convert Pig to Data Flows

Here’s a brief posting on taking an ETL script written in Pig. I took an ETL example using Pig from the Hortonworks tutorials site and migrating it to ADF using Mapping Data Flows. It took me approximately 10 minutes to design it.

What we’ll do in this demo is to create projections from two separate CSV files sitting in our data lake, aggregate the hours logged and miles logged by drivers, then join the datasets together for a complete report that is stored back in a lake folder. The full tutorial in Pig is here. You can download the source files to play along at home here.

I was able to take this 13-line Pig script and build it visually, without any coding, in ADF Data Flows in about 10 minutes. All it took was configuring 6 transformations on the design surface and I didn’t need to bother with Hadoop, MapReduce, Spark, Databricks or any other Big Data tech. Just define your data transformation intent in the ADF design surface and Data Flows handles the rest by spinning-up a Spark environment JIT, executing your transformations at scale, then tearing down the cluster for you.

drivers = LOAD 'drivers.csv' USING PigStorage(',');
raw_drivers = FILTER drivers BY $0>1;
drivers_details = FOREACH raw_drivers GENERATE $0 AS driverId, $1 AS name;
timesheet = LOAD 'timesheet.csv' USING PigStorage(',');
raw_timesheet = FILTER timesheet by $0>1;
timesheet_logged = FOREACH raw_timesheet GENERATE $0 AS driverId, $2 AS hours_logged, $3 AS miles_logged;
grp_logged = GROUP timesheet_logged by driverId;
sum_logged = FOREACH grp_logged GENERATE group as driverId,
SUM(timesheet_logged.hours_logged) as sum_hourslogged,
SUM(timesheet_logged.miles_logged) as sum_mileslogged;
join_sum_logged = JOIN sum_logged by driverId, drivers_details by driverId;
join_data = FOREACH join_sum_logged GENERATE $0 as driverId, $4 as name, $1 as hours_logged, $2 as miles_logged;
dump join_data;

Here are the steps to create this as an ADF Mapping Data Flow so that your end result will look something like this:

drivers1

  1. Create Delimited Text datasets for drivers.csv and timsesheet.csv. Switch on headers in the dataset configuration and import schema from source.
  2. Create a new mapping data flow.
  3. Switch on Data Flow Debug mode.
  4. Add a Source transformation to the data flow canvas.
  5. Select the DriversCSV dataset that you created in step 1.
  6. Set the projection data types to match these formats:drivers2
  7. Add a 2nd Source transformation and select the timesheetCSV dataset from step 1 and set the projection data types as such:driver9
  8. Validate that your data looks correct in the data preview tab on both sources. You should see something like this for each:drivers10drivers3
  9. Make sure that your data types and data look correct. We’ll use the driverID as the PK/FK for the Join and the hours logged and miles logged for aggregations.
  10. Let’s build the aggregations next. Add an aggregate transformation after your timesheet source transformation.
  11. Name your aggregate transformation stream to be “SumHoursAndMiles”.
  12. Configure the group-by field to be “driverId” because we’ll calculate the hours and miles per driver.
  13. In the aggregates settings, create 2 new fields using the sum() function to look like this:drivers5
  14. Use the data preview tab on the Aggregate transformation to validate that the data is summing properly:drivers6
  15. You should see a sum total for your 2 new fields next to each driver ID.
  16. Now we can join the 2 streams together so that we can see the full driver details along with the aggregated sums.
  17. Add a Join transformation to your driversCSV source transformation.
  18. Configure the Join for inner join on the “driverID” column:drivers4
  19. There are a few duplicate columns on both streams, so let’s prune out the duplicate columns by using a Select transformation to curate the metadata.
  20. In the Select transformation, all you have to do is click “Skip Duplicates” for both input and output:drivers7
  21. Now you have a clean projection that you can sink to your data lake.
  22. Add a Sink transformation that writes to a folder dataset in Blob Store or ADLS Gen 2.
  23. Keep auto-map on the Sink and click data preview to make sure your data is looking correct. You should now see driver details along with their aggregate values:drivers1
  24. That’s it for designing your data flow from the Pig example. If you want to execute this to produce output files into your lake folder sink, create a pipeline in ADF and add a Data Flow activity.
  25. The best practice is to point the data flow activity to your new data flow and test it using the Debug button in the pipeline canvas before you set a triggered schedule for it. This way, you can verify that the data is being written to your lake folder properly and you won’t have to wait for the Databricks cluster spin-up time.
Advertisements

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google photo

You are commenting using your Google account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s