APS Polybase for Hadoop and Windows Azure Blob Storage (WASB) Integration

Introduction

The Microsoft Analytics Platform System (APS) comes with a very powerful feature called Polybase. Polybase was introduced over 2.5 years ago and has been extended ever since to integrate the worlds of structured and unstructured data, both on-premises and in the Microsoft Azure cloud. The concept is simple: within an APS database you create an ‘External’ table that points to data located in a Hadoop HDFS file system or in Windows Azure Blob Storage, enabling hybrid data access. It allows you to seamlessly import, export and access data even over a small pipeline; in this hybrid world moving data around is a point of attention, and data compression plays a significant role in minimizing the time needed to crawl through data and turn it into insights.

Below is an overview of one of the most powerful Polybase options: ‘CREATE EXTERNAL TABLE AS SELECT’ (CETAS), which allows you to export data in a compressed format, saving time and bandwidth.

Configuring Polybase – the basics

By default Polybase is disabled on an APS appliance. All the ins and outs are described in the APS online documentation, but to give you an idea of the few steps needed, here is a quick walkthrough.

Multiple flavors and versions of Hadoop are supported by APS Polybase, including HDP running on Linux. For the tests I have in mind for this post we will stick with storing and querying data in Microsoft Azure Blob Storage (WASB): with sp_configure ‘Option 4’ you enable integration with HDInsight on Analytics Platform System (version AU2) (HDP 2.0), Hortonworks (HDP 2.0) for Windows Server and Azure Blob Storage on Microsoft Azure (WASB[S]).

EXEC sp_configure 'Hadoop Connectivity', 4; 
RECONFIGURE; 
GO

Hadoop Connectivity

 

Secondly, we have to add the Azure storage account to the core-site config file that resides on the APS Control node in the folder:

C:\Program Files\Microsoft SQL Server Parallel Data Warehouse\100\Hadoop\conf

Copy and paste the account name and the key into the core-site.xml file and restart the APS Region via the APS Configuration utility (located in C:\Program Files\Microsoft SQL Server Parallel Data Warehouse\100\dwconfig.exe).
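The entry to add to core-site.xml will look roughly like the snippet below, using the standard Hadoop WASB property naming convention (account name and key are the same dummy placeholders as in the screenshot):

<property>
  <!-- dummy storage account name; replace with your own account -->
  <name>fs.azure.account.key.henks_storage_account.blob.core.windows.net</name>
  <!-- dummy storage account key -->
  <value>YourStorageAccountKeyGoesHere==</value>
</property>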

Edit account and key (yes, dummy values!)

 

The last step is to configure DNS forwarding on the DNS servers of both APS Active Directory nodes. That’s all!

Uploading data to Windows Azure Blob Storage

APS Polybase supports various external file formats like Delimited Text, Optimized Row Columnar (ORC) and Record Columnar File (RCFile), each with its own compression options and approach, ranging from Hive Serializers and Deserializers (SerDe) to the Snappy codec and GZIP. To put them to the test we will use an APS with a relatively low upload link of 2 MByte/sec to Azure; another good reason to put the compression algorithms to the test!
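As an example of one of the other combinations, an ORC file format with Snappy compression can be defined in much the same way as the delimited-text formats used later in this post (the format name is just an example):

CREATE EXTERNAL FILE FORMAT ORC_Snappy
WITH (
      FORMAT_TYPE = ORC,
      DATA_COMPRESSION = 'org.apache.hadoop.io.compress.SnappyCodec'
     );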

Defining a Baseline

As a baseline we will upload 10 million rows from an APS table into Azure via a ‘CREATE EXTERNAL TABLE AS SELECT’ (CETAS) command. As part of the CETAS command you have to specify the external data source where to store/pull the data from, the file format and the compression type to use:

Setup external Data Source:

To list all already defined Data Sources on the APS run: SELECT * FROM sys.external_data_sources

To create a new External Data Source to connect to your Azure storage account, you have to specify the ‘LOCATION’ part properly: the first part is the container name and the second is the storage account name. The syntax will look something similar to:

CREATE EXTERNAL DATA SOURCE APS_WASB1
WITH (
      TYPE = HADOOP,
      LOCATION = 'wasbs://Henks_Container@henks_storage_account.blob.core.windows.net/'
     );

 

Setup a File format

To list all already defined File Formats on the APS run: SELECT * FROM sys.external_file_formats

To create a new external File Format to support TextDelimited:

 CREATE EXTERNAL FILE FORMAT TextDelimited
WITH (
      FORMAT_TYPE = DELIMITEDTEXT
     )

 

And now we are ready to run a CETAS baseline test:

---------------------------------------------------------------
-- TEST 1: BASELINE: 
-- Exporting 10 Million rows to Windows Azure blob Storage
---------------------------------------------------------------

CREATE EXTERNAL TABLE [Azure_10Million_Row_Baseline]
WITH 
    (
      LOCATION='/LineItem10Mill_Base',
      DATA_SOURCE = APS_WASB1,
      FILE_FORMAT = TextDelimited,
      REJECT_TYPE = VALUE,
      REJECT_VALUE = 10
    ) 
 AS SELECT * FROM [10Million_CCI_REPL]

Via the APS Admin console we monitor the progress and note the Query ID (QID60313220).
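If you prefer a query over the Admin console, the same request should also show up in the standard PDW DMV sys.dm_pdw_exec_requests; a minimal sketch:

-- List recent CETAS requests with their status and elapsed time
SELECT request_id, [status], submit_time, total_elapsed_time, command
FROM sys.dm_pdw_exec_requests
WHERE command LIKE 'CREATE EXTERNAL TABLE%'
ORDER BY submit_time DESC;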

It took 11 minutes and 39 seconds to upload the 10 million rows to Azure.

The result is an external table that we can query like any other SQL database table, but with the data stored in Azure!
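For example, a simple aggregate against the new external table transparently reads the exported file from blob storage (a minimal sketch):

-- Count the rows we just exported to Azure
SELECT COUNT(*) AS exported_rows
FROM [Azure_10Million_Row_Baseline];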


 

Microsoft Azure Block Blob Storage

To monitor the Azure side there is a great utility on CodePlex called Azure Storage Explorer. To set it up you will need your Azure Storage account name and key once more.

If you take a close look at the filename that got created you will notice that it actually contains the APS Query ID number as part of the file naming convention, as well as a date/time stamp. This makes it very easy to trace where the data originated from!

Our CETAS baseline export of 10 million rows resulted in a file of 1.75 GB in size (an effective ~2 MByte/sec upload):

Azure Storage Explorer

 

Adding compression to CETAS

As part of the ‘CREATE EXTERNAL TABLE AS SELECT’ you can specify a compression method so the data will be compressed before it is uploaded across the wire to Azure. APS supports a variety of compression codecs with the various file formats. The Delimited Text file format with either of the two supported compression algorithms (the ‘default codec’ and the ‘GZIP codec’) provides both a great improvement in throughput rate and a decent data compression ratio.

Adding the Data compression method to a Delimited Text file format called ‘TextDelimited_GZIP’:

 CREATE EXTERNAL FILE FORMAT TextDelimited_GZIP
WITH (
    FORMAT_TYPE = DELIMITEDTEXT,
    FORMAT_OPTIONS 
    ( 
      FIELD_TERMINATOR = '\t'
    ),
    DATA_COMPRESSION = 'org.apache.hadoop.io.compress.GzipCodec'
)

To start a CETAS with the GZIP compression method, just include this new file format in the statement:

-----------------------------------------------------
-- Test 2: GZIP text delimited
-----------------------------------------------------

CREATE EXTERNAL TABLE [Azure_10Mill_TextDelimited_GZIP]
WITH 
    (
      LOCATION='/LineItem10Mill_TextDelimited_GZIP',
      DATA_SOURCE = APS_WASB1,
      FILE_FORMAT = TextDelimited_GZIP,
      REJECT_TYPE = VALUE,
      REJECT_VALUE = 10
    ) 
 AS SELECT * FROM [10Million_CCI_REPL]

When we look via the APS Admin console at the MPP query plan generated for the CETAS command, something special shows up: as the very last step of the data export, statistics are created; APS Polybase adds statistical information to the exported data so it can make smarter decisions once we start querying it!
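Should you want additional statistics on specific columns later on, they can also be created manually on the external table; a sketch, assuming a column named l_orderkey exists in the exported set:

-- Create column statistics on the external table (column name is an assumption)
CREATE STATISTICS stats_l_orderkey
ON [Azure_10Mill_TextDelimited_GZIP] ([l_orderkey]);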

The filename of the data uploaded into Azure reflects the GZIP format; the file extension ‘.gz’ is added automatically. It took only 2 minutes and 48 seconds to upload the same 10 million rows; that means a 4.1 times faster upload! Also the file size decreased from 1.35 GByte to only 414.5 MBytes: with the GZIP method we achieved a 3.4x reduction in size!

 

APS automatically adds Parallelism to CETAS

When you export data from an APS distributed table, where data is striped across multiple servers and table distributions, the CETAS command will automatically start an upload into a separate file for every table distribution: in the example below an 8-node APS with 64 distributions creates 64 gzipped data files in parallel, each containing a fraction of the dataset:

CETAS of a distributed table creates parallelism automatically

If you would like to export all data from a distributed table into a single file instead, just specify a SELECT TOP (..) * as part of your CETAS command; the TOP () will serialize the upload request. (Under the hood the Data Movement Service (DMS) on the APS Control node takes over and uploads the data into a single file in the blob container.) A sketch of this variant is shown below.
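A minimal sketch of this single-file variant, reusing the data source and file format defined earlier (the external table name and the TOP value are just examples; the TOP value only needs to cover the table’s row count):

CREATE EXTERNAL TABLE [Azure_10Mill_SingleFile]
WITH 
    (
      LOCATION='/LineItem10Mill_SingleFile',
      DATA_SOURCE = APS_WASB1,
      FILE_FORMAT = TextDelimited_GZIP,
      REJECT_TYPE = VALUE,
      REJECT_VALUE = 10
    ) 
 AS SELECT TOP (10000000) * FROM [10Million_CCI_REPL]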

 

SSDT

The data we just uploaded can be queried like any other regular SQL table; to view all the External Tables available in a database, just expand the ‘Tables’ folder in SSDT and look at the names of the External Tables. Use a table naming convention to easily identify where the external data resides, since you will query these external tables as transparently as any other table. Data stored in Windows Azure Blob Storage and/or Hadoop can be joined and queried as if the data resided within the database: Polybase provides a virtual data layer.
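For example, an external table can be joined to a local table just like two regular tables; a sketch, where the local table and the join columns are hypothetical:

SELECT TOP 100 e.*
FROM [Azure_10Million_Row_Baseline] AS e
JOIN [dbo].[Orders_Local] AS o          -- hypothetical local table
  ON e.[l_orderkey] = o.[o_orderkey];   -- assumed join columns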


 

For each of the 3 supported external file formats and the various data compression methods I recorded the time it took to upload the 10 million rows over at least 2 test runs. The best upload time and file size are recorded in the table below:

Wrap-up

The APS Polybase feature is a very powerful option that provides access to data stored outside of the SQL database; it enables you to query, import and export data stored in external Hadoop data sources by using standard SQL statements. It allows you to refer to data that resides either in a distributed Hadoop file system or in the cloud in Microsoft Azure Blob Storage (WASB), as a virtual data layer.

To increase data transport throughput rates, Polybase supports various data compression methods that reduce the time needed to upload data by a factor of 4! The required storage capacity is also reduced significantly, and parallelism is added automatically where possible to maximize data throughput!


Data Merge made easy with the APS Upsert

Comparing and merging datasets are typically time-consuming operations when data volumes increase. The traditional approaches, either loading data into a staging table and comparing the content against its destination table, or using an ETL tool to run the compare, share a downside: there is a lot of data movement going on, and you have to touch the data at least twice. When you have to design a solution that can handle large volumes of data, or when you are running out of time already, there is an alternative!

The Microsoft Analytics Platform System (APS) offers a very powerful feature to upsert data (insert or update), read directly from a flat file or ETL data source, into a distributed table with just a single command: data gets loaded at maximum speed into multiple small temporary tables, and each of them is merged in parallel against the final destination table distributions without locking the destination table!

How does it work?

To start at the beginning, let’s look at the merge concept in its basic form: imagine you have to update a table to accurately reflect sales information details; when products have been returned to a store and money has been refunded to your customers, the sales history table needs to be updated accordingly. Typically you will load all incoming sales records into a staging table or ODS and update the DWH at a later stage with a MERGE command (a great feature introduced in SQL Server 2008). This means you read all the new sales transactions you have received; for each new order number a record gets inserted into the destination table and, when an order number already exists, the entire record needs to be compared and updated accordingly (see the MERGE sketch after the table definition below).

 

CREATE TABLE [Blog].[dbo].[lineitem_CCI_Blog] (
    [l_ordernumber] int NOT NULL, 
    [l_shipdate] datetime NOT NULL, 
    [l_orderkey] int NOT NULL, 
    [l_quantity] int NOT NULL, 
    [l_returnflag] char(1) )
WITH ( Clustered Columnstore Index, DISTRIBUTION = HASH([l_ordernumber]));
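For reference, on a regular SMP SQL Server the staging-table approach described above comes down to a MERGE statement along the lines of the sketch below; the staging table name is hypothetical, and on APS itself this step is exactly what the dwloader UPSERT replaces:

MERGE [Blog].[dbo].[lineitem_CCI_Blog] AS target
USING [Blog].[dbo].[lineitem_staging] AS source     -- hypothetical staging table
   ON target.[l_ordernumber] = source.[l_ordernumber]
WHEN MATCHED THEN
    -- Existing order number: update the remaining columns
    UPDATE SET [l_shipdate]   = source.[l_shipdate],
               [l_orderkey]   = source.[l_orderkey],
               [l_quantity]   = source.[l_quantity],
               [l_returnflag] = source.[l_returnflag]
WHEN NOT MATCHED THEN
    -- New order number: insert the full record
    INSERT ([l_ordernumber], [l_shipdate], [l_orderkey], [l_quantity], [l_returnflag])
    VALUES (source.[l_ordernumber], source.[l_shipdate], source.[l_orderkey],
            source.[l_quantity], source.[l_returnflag]);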

 

Source data:


Imagine the following 2 records have to be processed, resulting in 1 updated and 1 new record:

2015001|2015-05-20|4|8|2|  -- Update

2015004|2015-05-22|1|5|0|  -- Insert

With a single APS DWLoader command you can start a direct merge based on any given key column, specified with the -K option: if the merge key exists in the destination table, the row is updated; if the merge key doesn’t exist in the destination table, the row is appended.

dwloader.exe -i h:\blog\delta.tbl -M UPSERT -K L_ordernumber -S 172.16.254.1 -E -c -rt value -rv 100 -R G:\newdata\lineItem.tbl.rejects -e ascii -t "|" -r \r\n  -T blog.dbo.lineitem_blog 

Command output:

[2015-05-24 11:36:18] Starting Load

[2015-05-24 11:36:18] Connected to Microsoft SQL Server 2012 Parallel Data Warehouse (10.0.5108.1)

[2015-05-24 11:36:18] Load has started

[2015-05-24 11:36:18] Status: Running, Run Id: 112 – Total Rows Processed: 0, Total Rows Rejected: 0

[2015-05-24 11:36:20] Status: Completed, Run Id: 112 – Total Rows Processed: 2, Total Rows Rejected: 0

[2015-05-24 11:36:20] Load is complete

Result:


How to handle multiple updates to the same record

Assume we would like to process the following 2 changes: an update of the shipdate column from 5/20 to 5/21, followed by an update to both shipdate and l_orderkey:

2015001|2015-05-21|4|8|2|  -- Update 1

2015001|2015-05-20|3|8|2|  -- Update 2

This means multiple changes to the same record have to be applied; if they are both within the same source file the DWLoader Upsert command will abort with the following error message:

Status: Error, Run Id: 119 – Error Code: 8672 – Message: The MERGE statement attempted to UPDATE or DELETE the same row more than once. This happens when a target row matches more than one source row. A MERGE statement cannot UPDATE/DELETE the same row of the target table multiple times. Refine the ON clause to ensure a target row matches at most one source row, or use the GROUP BY clause to group the source rows.

[2015-06-18 22:03:07] Load has Failed.

The chance that this will happen in the real world is pretty large, so we have to take precautions by selecting multiple columns that together form a unique key representing a unique row. This way, when there are multiple updates to the same record, we will record and keep them both.

Putting the APS Upsert to the test

It’s time to pull out the 75GB/600 million row lineitem flat file once more and merge the contents with a 763GB CCI table. With the upsert -m option specified, APS performs and commits loads in parallel and a merge operation is executed for each distribution table.

(Note: without this option only a single merge operation is started per APS compute node, which processes the 8 distributions sequentially and makes the merge phase take 7 times longer.)

Let’s create a unique merge key based upon 4 columns:

dwloader.exe -i h:\75\lineitem.tbl -M UPSERT -m
  -K l_shipdate,l_orderkey,L_discount,l_suppkey
  -S 172.16.254.1 -E -c -rt value -rv 100
  -R G:\TPCH\lineItem.tbl.rejects -e ascii -t "|" -r \r\n
  -T blog.dbo.lineitem_763GB_2

 

Via the APS admin console we can track the progress: the merge operation on 600 million rows only took 31 minutes in total to complete! 5 minutes 17 seconds to stage the data (DMS_LOAD phase), followed by 26 minutes to merge the data and insert 600 million rows (LOAD_INSERT phase) on a 4-node APS. The duration of the LOAD_INSERT phase will depend on the size of the destination table and the quality of the data (please read the paragraph below on trailing spaces!).

APS Admin Console - Easy tracking of data merge operations

During the CREATE STAGING phase an empty table with the DDL copied from the destination table is created; it will be dropped during the LOAD_CLEANUP phase.

A separate distributed temp table is created to prestage the data

All 8 distributions in each compute node run a merge statement in parallel. Most data will typically already be in memory with APS, but just by monitoring SSMS I’ve seen 270+ MB/sec of data being read from disk per compute node when needed.

APS merge operation uncovered

 

ETL integration

Besides the dwloader utility, which reads from (gzipped) flat files, you can also use the SSIS PDW Destination adapter or Informatica’s PowerCenter to integrate the upsert functionality into existing ETL processes.

To compose the merge key, tick the boxes in the ‘K’ column in front of the input columns:

SSIS PDW Destination Adapter

 

For distributed tables the table distribution key has to be part of the merge key. If you don’t include it, the upsert operation will fail and the following error message will be displayed:

[SQL Server PDW Destination [75]] Error: An error occurred with the following error message: "The server could not initialize the load. Details: The KeyColumn parameter must include the Distribution Column when doing an UPSERT operation".  

Trailing Spaces

The most frequently transmitted character ever across a wire over the last decades must be, by far, the ‘space’ character. The APS DWLoader utility is very forgiving of data type incompatibilities and is optimized to trim unnecessary characters at the source before it starts loading the data into the APS database:

Merging a ~400GB file with 1 billion rows that primarily contains record updates and many spaces, as shown in the picture above, against a 7.2 billion row table completes within 4.5 hours, of which ~3 hours of extra time is needed to remove the spaces at the source.

Admin console - load pane details

 

Wrap up

If you spend a lot of time executing data merge operations, it might be worth checking the source data of your most time-consuming jobs for trailing spaces; a quick win might be around the corner! When you see the amount of data to process steadily increasing, or when you have to reduce the time it takes to merge data quickly, consider the massively parallel processing (MPP) data merge capabilities of the Microsoft Analytics Platform System (APS) to handle lots of data, all in one shot, for you!
