Merging Data Using A Queuing Pattern

In my recent article Using Materialized Views Effectively, I offered a few ways to populate data stores that can be critical performance-improvers for your apps and BI environments that use SQL Server. I showed how we could incrementally improve the loading process for these specialized tables - starting from a simple TRUNCATE / INSERT, to a MERGE statement, to a MERGE filtered based on “since last load.” This article covers another method that I’ve been using in production situations, with good success.

In my recent article Using Materialized Views Effectively, I offered a few ways to populate data stores that can be critical performance-improvers for your apps and BI environments that use SQL Server. I showed how we could incrementally improve the loading process for these specialized tables - starting from a simple TRUNCATE / INSERT, to a MERGE statement, to a MERGE filtered based on “since the last load.” There’s another method that I’ve been using in production situations, with good success. In the remainder of this article, I’ll walk through both the implementation of this queued merge and its advantages, using a realistic test harness that I’ve made available on GitHub as one way to demonstrate and compare.

The Concept

In my previous article, I worked with imaginary “widgets” which are tracked in our SQL Server database using event records – widget arrival, departure, and cancellation events. I noted that perhaps our widget management application cares about every arrival and departure, but it cares most about the current state of each widget: when did it last arrive? When did it last depart? The basic schema and query to do this is shown here.

  1. CREATE TABLE [Source].[EventType] (  
  2. EventTypeID tinyint NOT NULL IDENTITY PRIMARY KEY,  
  3. EventTypeCode varchar(20) NOT NULL,  
  4. EventTypeDesc varchar(100) NOT NULL)  
  5. GO  
  6.   
  7. INSERT [Source].[EventType] (EventTypeCode, EventTypeDesc) VALUES ('ARRIVE''Widget Arrival');  
  8. INSERT [Source].[EventType] (EventTypeCode, EventTypeDesc) VALUES ('CAN_ARRIVE''Cancel Widget Arrival');  
  9. INSERT [Source].[EventType] (EventTypeCode, EventTypeDesc) VALUES ('LEAVE''Widget Depart');  
  10. INSERT [Source].[EventType] (EventTypeCode, EventTypeDesc) VALUES ('CAN_LEAVE''Cancel Widget Depart');  
  11. GO  
  12.    
  13. CREATE TABLE [Source].[Event] (  
  14. WidgetID int NOT NULL,  
  15. EventTypeID tinyint NOT NULL REFERENCES [Source].[EventType] (EventTypeID),  
  16. TripID int NOT NULL,  
  17. EventDate datetime NOT NULL,  
  18. PRIMARY KEY (WidgetID, EventTypeID, EventDate, TripID))  
  19. GO  
  20.   
  21. SELECT  
  22.     lw.WidgetID  
  23.     , la.LastTripID  
  24.     , lw.LastEventDate  
  25.     , la.ArrivalDate  
  26.     , (SELECT MAX(de.EventDate)  
  27.         FROM [Source].[Event] de  
  28.         WHERE de.EventTypeID = 3  
  29.         AND de.WidgetID = lw.WidgetID  
  30.         AND de.TripID = la.LastTripID  
  31.         AND NOT EXISTS  
  32.             (SELECT 0  
  33.             FROM [Source].[Event] dc  
  34.             WHERE lw.WidgetID = dc.WidgetID  
  35.             AND la.LastTripID = dc.TripID  
  36.             AND dc.EventTypeID = 4  
  37.             AND dc.EventDate > de.EventDate)) AS DepartureDate  
  38. FROM  
  39.     (SELECT  
  40.         e.WidgetID  
  41.         , MAX(e.EventDate) AS LastEventDate  
  42.     FROM  
  43.         [Source].[Event] e  
  44.     GROUP BY  
  45.         e.WidgetID) lw  
  46.     LEFT OUTER JOIN  
  47.     (SELECT  
  48.         ae.WidgetID  
  49.         , ae.TripID AS LastTripID  
  50.         , ae.EventDate AS ArrivalDate  
  51.     FROM  
  52.         [Source].[Event] ae  
  53.     WHERE  
  54.         ae.EventTypeID = 1  
  55.     AND ae.EventDate =  
  56.         (SELECT MAX(la.EventDate)  
  57.         FROM [Source].[Event] la  
  58.         WHERE la.EventTypeID = 1  
  59.         AND la.WidgetID = ae.WidgetID  
  60.         AND NOT EXISTS  
  61.             (SELECT 0  
  62.             FROM [Source].[Event] ac  
  63.             WHERE la.WidgetID = ac.WidgetID  
  64.             AND la.TripID = ac.TripID  
  65.             AND ac.EventTypeID = 2  
  66.             AND ac.EventDate > la.EventDate))) AS la ON lw.WidgetID = la.WidgetID  
  67. GO  

The complexity looks high, and the case was made to persist the query results into a table, on a scheduled basis. (We’re using subqueries and outer joins so indexed views are out-of-bounds here.) The test harness I offered on GitHub can be used to show the performance of trying to merge updates to this table using different approaches. The fastest option was to use a MERGE statement that filters to look at only widgets where there could be a possible change, based on the fact we’ve got a “last updated date” available to tell when any change has occurred that might affect results.

Let’s consider a similar situation: You’ve got a query that’s even more complex – perhaps joining 15+ tables - and given the nature of these tables, the determination of the “latest change” is harder. You might have to do a CASE statement or use a user-defined function (UDF) to encapsulate the need to find the greatest modification date among many tables. When comparing a “last run date” with this calculation to determine the maximum date among many, there’s a good chance you’ll suffer nasty performance degradation. How can we avoid this?

If our source query is efficient when run for a single “primary entity” (a widget in my example), then we can take advantage of that fact by creating a work queue that holds a simple list of entities (by key) and then “work the queue” – either one-by-one or using parallelism. We could, for example, create a queue table that looks like this.

  1. CREATE TABLE dbo.WidgetLatestStateQueue  
  2. (WidgetID int NOT NULL PRIMARY KEY)  

If we knew that our source query might be affected by an update to a related table, say “WidgetJob”, we might include the population of this queue based on a check of changes within the WidgetJob table that happened since our “last run date”,

  1. INSERT dbo.WidgetLatestStateQueue (WidgetID)  
  2. SELECT DISTINCT j.WidgetID  
  3. FROM dbo.WidgetJob j  
  4. WHERE j.LastUpdatedDate >= @since  
  5. AND NOT EXISTS (SELECT 0 FROM dbo.WidgetLatestStateQueue q WHERE q.WidgetID = j.WidgetID);  

This pattern can be repeated to handle multiple tables. This approach obviously relies on having a “last updated date” as a standardized audit field used throughout your app (for more on that, see my article Auditing Data Changes in Microsoft SQL Server). Having a non-clustered index on that field is often important for the performance of this queue population step.

Now that we have a singular list that’s based on the primary entity key, we can perform a MERGE that’s targeted directly by key. Review your execution plan to see if this offers a good benefit – if your plan isn’t improved by using a key value, this approach is likely not going to bring joy. (In fact, the GitHub harness illustrates that performance gains aren’t a “sure thing”!)

Implementation Details

If we wanted to encapsulate both the queue population and queue processing into a stored procedure, we could end up scheduling the stored procedure using something like SQL Agent, and we’d be done - if we’re happy with the row-by-row processing of the queue. What might this type of stored procedure look like? I prefer to parameterize it with an optional start date, usually.

  1. CREATE PROCEDURE dbo.up_WidgetLatestStateQueue_ProcessAll  
  2.     @start datetime2 = NULL  
  3. AS  
  4. BEGIN  
  5. EXEC dbo.up_WidgetLatestStateQueue_Populate @start;  
  6. EXEC dbo.up_WidgetLatestStateQueue_Process;  
  7. END  
  8. GO  
  9.   
  10. CREATE PROCEDURE dbo.up_WidgetLatestStateQueue_Populate  
  11.     @start datetime2 = NULL  
  12. AS  
  13. BEGIN  
  14. IF @start IS NULL  
  15.         SET @start = DATEADD(dd, -1, SYSDATETIME());  
  16.   
  17. INSERT dbo.WidgetLatestStateQueue (WidgetID)  
  18. SELECT DISTINCT e.WidgetID  
  19. FROM   
  20.         [Source].[Event] e  
  21. WHERE  
  22.         e.EventDate >= @start  
  23. AND NOT EXISTS (SELECT 0 FROM dbo.WidgetLatestStateQueue q WHERE e.WidgetID = q.WidgetID);  
  24. END  
  25. GO  

This affords a way to load historical data, such as we might want to do when we first introduce the materialized view and it’s empty. When the value is NULL, I might instead calculate the start date based on a trailing 24 hours if I know the job is running say every few hours. This might lead to some unnecessary reprocessing of records that would not cause a material change, but it also gives some leeway if the job fails for some reason: if it’s running again within a day, there’s no need to go back and explicitly reload from a start date. If the process is sufficiently fast, you’re trading some extra CPU and I/O for peace-of-mind, and you can adjust this type of “dial” to suit your preferences.

The procedure logic turns into two main steps: queue population and queue processing. Queue population can involve multiple INSERTS – all of which should check that the keys being inserted do not already exist in the queue. As a clustered primary key, our lookups in the queue will be efficient.

The queue processing step can be as simple as this,

  1. CREATE PROCEDURE dbo.up_WidgetLatestStateQueue_Process  
  2. AS  
  3. BEGIN  
  4. DECLARE @WidgetID int;  
  5. SELECT TOP 1 @WidgetID = WidgetID FROM dbo.WidgetLatestStateQueue;  
  6.   
  7. WHILE @WidgetID IS NOT NULL  
  8. BEGIN  
  9.         EXEC dbo.up_WidgetLatestStateQueue_ByID @WidgetID;  
  10.   
  11. DELETE dbo.WidgetLatestStateQueue WHERE WidgetID = @WidgetID;  
  12.   
  13.         SET @WidgetID = NULL;  
  14.         SELECT TOP 1 @WidgetID = WidgetID FROM dbo.WidgetLatestStateQueue;  
  15. END  
  16. END  
  17. GO  
  18.   
  19. CREATE PROCEDURE dbo.up_WidgetLatestStateQueue_ByID  
  20.     @WidgetID int  
  21. AS  
  22. BEGIN  
  23. MERGE [Dest].[WidgetLatestState] AS a  
  24.  USING (  
  25.  SELECT  
  26.    v.[WidgetID]  
  27.         , v.[LastTripID]  
  28.         , v.[LastEventDate]  
  29.         , v.[ArrivalDate]  
  30.         , v.[DepartureDate]  
  31.  FROM  
  32.    [Dest].[uv_WidgetLatestState] v  
  33.  WHERE  
  34.    v.WidgetID = @WidgetID  
  35.  ) AS T  
  36.  ON  
  37.  (  
  38.    a.[WidgetID] = t.[WidgetID]  
  39.  )  
  40. WHEN MATCHED   
  41.      AND t.ArrivalDate IS NOT NULL  
  42.      AND ((a.[LastTripID] <> CONVERT(int, t.[LastTripID]))  
  43.           OR (a.[LastEventDate] <> CONVERT(datetime, t.[LastEventDate]))  
  44.           OR (a.[ArrivalDate] <> CONVERT(datetime, t.[ArrivalDate]))  
  45.           OR (a.[DepartureDate] <> CONVERT(datetime, t.[DepartureDate]) OR (a.[DepartureDate] IS NULL AND t.[DepartureDate] IS NOT NULLOR (a.[DepartureDate] IS NOT NULL AND t.[DepartureDate] IS NULL))) THEN  
  46.      UPDATE  
  47.       SET LastTripID = t.LastTripID  
  48.         , LastEventDate = t.LastEventDate  
  49.         , ArrivalDate = t.ArrivalDate  
  50.         , DepartureDate = t.DepartureDate  
  51. WHEN NOT MATCHED BY TARGET AND t.ArrivalDate IS NOT NULL THEN  
  52.       INSERT (  
  53.         WidgetID  
  54.         , LastTripID  
  55.         , LastEventDate  
  56.         , ArrivalDate  
  57.         , DepartureDate  
  58.       ) VALUES (  
  59.         t.[WidgetID]  
  60.         , t.[LastTripID]  
  61.         , t.[LastEventDate]  
  62.         , t.[ArrivalDate]  
  63.         , t.[DepartureDate]  
  64.       )  
  65. WHEN MATCHED AND t.ArrivalDate IS NULL THEN  
  66.      DELETE;  
  67. END  
  68. GO  

Notice we’re pulling an entity key from the queue into a local variable which becomes our main filter against our source query for our MERGE, and then once the MERGE has succeeded, we can DELETE the value from the queue safely and continue until there’s nothing left to do. This implementation simply loops until the queue table is empty.

Nifty Benefits

One of the cool benefits of this approach is you’ve effectively instrumented your merge process. You could run this,

  1. SELECT COUNT(*) FROM dbo.WidgetLatestStateQueue 

… and you’d know how many records are left to process. (It’s geeky fun to see the number spike, then bleed down to zero.) I like to additionally log the number of expected rows to process prior to the processing loop, so you could even get a “percent complete” if you wanted to.

Another benefit can be reduced contention. If you’re doing a massive, big-bang merge/load, you might end up doing some heavy locking whereas, with this approach, your locks are very finite. In the real world in a couple of cases, it was the contention that drove me to this solution more than performance – and with the change to row-by-row, problems were reduced.

A third benefit is that we can selectively control how much data to refresh - with no code changes. Say, for example, we make a database change that adds a new table to our “big join” – say WidgetJobAttribute (a child of WidgetJob) – we might add an INSERT for our queue based on the changes in this table (like for other tables involved), but also, perform a one-time load/update of data based on this INSERT (i.e. overall job attributes that relate to all widgets).

  1. INSERT dbo.WidgetLatestStateQueue (WidgetID)  
  2. SELECT DISTINCT j.WidgetID  
  3. FROM dbo.WidgetJob j  
  4.     JOIN dbo.WidgetJobAttribute a  
  5.         ON j.WidgetJobID = a.WidgetJobID  
  6. WHERE NOT EXISTS (SELECT 0 FROM dbo.WidgetLatestStateQueue q WHERE q.WidgetID = j.WidgetID);  

Despite the benefits, a side-effect of this approach is we’ll see data appearing and/or changing bit by bit. In other words, data consistency is sacrificed here, as is acknowledged in this paper: on materialized views. One way I’ve sometimes dealt with this concern is to build my queue at a higher level of granularity than my materialized view. For example, say we have a materialized view to support our WidgetJob entity (one row per job). Instead of queuing changes by WidgetJobID (the primary key of WidgetJob), we might queue based on WidgetID instead. When merging data in this case, we’d see all WidgetJob updates for a given widget, using a single MERGE invocation. There are other design tricks to mitigate concerns, but your requirements will drive what tricks can be used and whether queuing is even feasible or not. (Having a BISM as another layer on top of your database offers another nice way to mitigate this problem.)

Comparisons and the “Real-World”

To get some concrete performance figures, I’ve taken a real-world example and tested it in three different ways, under similar load. This is not my widgets' example – I wanted a bigger, complex data set (while solving a real production requirement at the same time!).

The first approach is using a multi-step MERGE for what I’d consider a “big-bang” update. I did this by populating key values into a table that serves as a candidate list of items to process, but instead of processing items individually, I join the “to process” table to the source query and MERGE (source query to target materialized view) using all ~280,000 rows in one step. The main difference compared to the fastest approach presented in my last article is this includes the tuning step of determining the list of candidate items to insert/update ahead of time to avoid a complex query against our “last updated date” over many tables. This works – but takes 108 minutes, which is our baseline for this exercise. Note, of these 108 minutes, five seconds are what’s expended in populating the candidate list – so that aspect is clearly not expensive.

Next, I tried switching to a “queue” and record-by-record processing, largely following what’s described in the previous sections. The performance here for the same 280,000 “keys” is 55.5 minutes - so clearly, better for this workload and server load. Of note: expect greater variability in performance with this approach, in practice, based on server load. Also, if your “big bang” is usually a small number of rows or relatively simple, the benefits of row-by-row diminishes significantly. I have at times been pushed to use row-by-row knowing that I needed to one-time load millions of rows of historical data, where “big bang” was proven to be nearly impossible, but row-by-row worked like a champ.

Finally, I took out the SQL WHILE loop to process queue entries and replaced it with a SSIS package step that looks like the one highlighted here.

Merging Data Using A Queuing Pattern 

This is a script task – so what’s the core script for this?

  1. using(var conn = new SqlConnection(Dts.Variables["User::BI_ADONET_Connection"].Value.ToString())) {  
  2.     conn.Open();  
  3.     DataView keys = null;  
  4.     while (keys == null || keys.Count > 0) {  
  5.         using(var da = new SqlDataAdapter("SELECT TOP 8 JobID FROM dbo.JobSummaryQueue", conn)) {  
  6.             DataTable dt = new DataTable();  
  7.             da.Fill(dt);  
  8.             keys = dt.DefaultView;  
  9.         }  
  10.         if (keys.Count > 0) {  
  11.             Parallel.ForEach(keys.Cast < DataRowView > (), (kdrv) => {  
  12.                 using(var connInner = new SqlConnection(Dts.Variables["User::BI_ADONET_Connection"].Value.ToString())) {  
  13.                     connInner.Open();  
  14.                     using(var cmd = new SqlCommand("dbo.up_JobSummary_ByJobID", connInner)) {  
  15.                         cmd.CommandType = CommandType.StoredProcedure;  
  16.                         cmd.Parameters.AddWithValue("JobID", kdrv["JobID"]);  
  17.                         cmd.ExecuteNonQuery();  
  18.                     }  
  19.                 }  
  20.             });  
  21.         }  
  22.     }  
  23. }  

After passing in my connection string as a variable (settable via package configurations, for example), you can see how I’ve taken advantage of the .NET Parallel class to launch up to 8 concurrent threads (based on the “TOP 8”), each of which receives its own entity ID (jobs in this case) to process. The job ID is passed to a stored procedure which is doing our MERGE, filtered by JobID, and then deletes the input JobID from the queue.

There’s more overhead here since unlike a SQL WHILE loop that can execute all within SQL Server, this approach requires out-of-process communication using multiple connections. However, the timing does not lie: 12.3 minutes to process all 280,000 queue entries (89% improvement over baseline)!

The ideal level of parallelism is, once again, a big “it depends” – mostly on how much your environment can handle reasonably. Eight in my example is fine, but that’s because I have the capacity to support eight additional concurrent users – which is effectively what these requests look like to the system.

Should we care that much about speed in a largely-batch BI environment? I vote “yes”: I’ve seen this solve multiple types of problems by quietly shifting away from “real-time” data that was problematic to query, over to “near-real-time” BI data that’s much easier to query and everyone has remained happy about the timeliness of data since it can be refreshed on the scale of minutes instead of hours (or days).

The creation of this SSIS package is something that I discuss in more detail in another article, where we can leverage quite a bit of metadata and conventions-based naming to generate the required .dtsx file with no SSIS coding per se.