How to Implement Continuous Querying with NCache?

Introduction

Let's assume we are building a web application for a seat reservation system that users use to book seats for some event. The application needs to be highly responsive to the seats available for an event and must reflect the change in available seats in real time for a given event.

To build such an application, we must ensure that the result set for a query is always available in the cache in order to serve data faster to the users, and any change in the backend (a seat reserved or canceled) must be immediately reflected across the cache so that users can see the updated data immediately.

What are Continuous Queries?

Continuous Queries is a technique that helps keep our application cache in sync with the underlying data source efficiently. It allows you to monitor and react to changes in cached data in real time so that the cache always reflects the most up-to-date information.

Advantages of Continuous Queries

Continuous querying in a cache offers several advantages, which makes it a useful strategy while building applications.

  • Real-time Data Synchronization
  • Live Dashboards and Analytics
  • Reduced Latency
  • Efficient Resource Utilization

Caching Providers that support Continuous Querying

Continuous Querying is a complex but powerful feature that is supported by many popular Cache Providers such as NCache, Redis, Hazelcast, Apache Ignite, Memcached, Couchbase, etc.

Among these caching providers, NCache is a feature-rich, .NET-based caching solution that supports continuous querying out of the box.

In this detailed article, let us discuss how we can implement continuous querying with NCache.

Implementing Continuous Querying with NCache


So, how does Continuous Querying work?

We can implement Continuous querying in four simple steps.

  1. Query Specification: Define a query that specifies the conditions for data we are interested in within the cache. This query can be based on various criteria, such as keys, attributes, or even complex data patterns.
  2. Subscription: Register the query with the cache provider. The cache begins monitoring the cached data for changes that match the query.
  3. Change Notifications: When data in the cache that satisfies the criteria we have defined changes, the cache provider sends real-time notifications to the subscribed application.
  4. Response: The application can then respond to these notifications immediately, ensuring that it always operates with the latest data.

Changes in the result set of a query can be triggered by the following operations.

  • Adding an item in the cache or updating any existing item that brings it into the query result set
  • Updating any existing cache item but remains in the query result set
  • Removing an item from the cache or updating any existing cached item such that it causes item removal from the query result set. It can also happen due to expiration, eviction, dependency change, or explicit API removal.

Query Indexing

One prerequisite to using continuous queries is that you need to configure query indexes over the data on which the querying will be applied. A Query Index is similar to why you use indexes over an SQL table - for performance and faster querying. In NCache, you will create a query index over the type of objects you are storing in the cache or a specific attribute of that type.

This will be created at the cache level - using the NCache Management Dashboard or by Powershell.

You can find more information about how to create Query Indexes here - https://www.alachisoft.com/resources/docs/ncache/admin-guide/configure-query-index.html

Implementing Continuous Querying with NCache with an Example

As mentioned above, implementing Continuous queries involves four steps.

  1. Create a Query specification
  2. Create a Subscription based on the Query
  3. Receive Notifications
  4. Unsubscribe to the Notifications

We can follow the above steps and create continuous queries as below.

1. Install the necessary packages required for the NCache Client libraries. You can follow this link for the prerequisite packages - https://www.alachisoft.com/resources/docs/ncache/prog-guide/client-side-api-pre-reqs.html?tabs =net

2. Once the packages are installed and libraries are ready to use, we will first create a query that filters out the data in the cache and for which we are interested in receiving notifications on any change. Then we create a continuous query for that particular query over the cache.

The below code creates a continuous query on the cached dataset over an indexed property of the records called category. It returns the created continuous query instance.

public ContinuousQuery RegisterCQ(string categoryValue)
{
    // Query for required operation
    string query = "SELECT $VALUE$ FROM Product WHERE Category = ?";
    // Create query command and add parameters
    var queryCommand = new QueryCommand(query);
    queryCommand.Parameters.Add("Category", categoryValue);
    // Create Continuous Query
    var cQuery = new ContinuousQuery(queryCommand);
    // Item add, update, remove notification
    // EventDataFilter.DataWithMetadata returns the cache keys added
    cQuery.RegisterNotification(
        new QueryDataNotificationCallback(OnChangeInQueryResultSet),
        EventType.ItemAdded | EventType.ItemUpdated | EventType.ItemRemoved,
        EventDataFilter.DataWithMetadata
    );
    // Register continuousQuery on server
    _cache.MessagingService.RegisterCQ(cQuery);
    return cQuery;
}

3. When any change over the data that matches with the specified query happens, the continuous query calls the OnChangeInQueryResultSet callback method that is registered via the QueryDataNotificationCallback object. This callback function looks like below.

Since we have registered this callback for all the event types - Add, Update, and Remove; we will handle each of these based on the CQEventArg parameter value.

public void OnChangeInQueryResultSet(string key, CQEventArg arg)
{
    switch (arg.EventType)
    {
        case EventType.ItemAdded:
            Console.WriteLine(
                $"Item with key '{key}' has been added to result set of continuous query"
            );
            break;
        case EventType.ItemUpdated:
            Console.WriteLine(
                $"Item with key '{key}' has been updated in the result set of continuous query"
            );

            // Get updated Product object
            // Item can be used if EventDataFilter is DataWithMetadata or Metadata 
            if (arg.Item != null)
            {
                Product updatedProduct = arg.Item.GetValue<Product>();
                Console.WriteLine(
                    $"Updated product '{updatedProduct.ProductName}' with key '{key}' has ID '{updatedProduct.ProductID}'"
                );
            }
            break;
        case EventType.ItemRemoved:
            Console.WriteLine(
                $"Item with key '{key}' has been removed from result set of continuous query"
            );
            break;
    }
}

4. When we are no longer interested in receiving notifications about a particular event type within a continuous query, we can do so by unregistering for the callback. In the below code block, we will pass the continuous query instance that we have received after calling the RegisterCQ method along with the EventType enum that we wish to unregister.

public void UnRegisterNotifications(ContinuousQuery cQuery, EventType eventType)
{
    // Unregister notifications for ItemAdded events from continuous query
    cQuery.UnRegisterNotification(
        new QueryDataNotificationCallback(OnChangeInQueryResultSet),
        eventType
    );
}

5. If we are no longer interested in the continuous query itself, we can also unregister from it, by which we will no longer receive notifications for any notifications under that continuous query. The below code block shows us how to do so.

public void UnRegisterCQ(ContinuousQuery cQuery)
{
    // Unregister Continuous Query from server
    _cache.MessagingService.UnRegisterCQ(cQuery);
}

The total sequence of calls can happen as below.

// create an instance of the Handler class
var cqHandler = new MyProductContinuousQueryHandler();
// create a continuous query on a particular
// category of the data set in the cache
var cQuery = cqHandler.RegisterCQ("Books");
// we will start receiving callback logs on the OnChangeInQueryResultSet method from here on
// unsubscribe from a particular event type
cqHandler.UnRegisterNotifications(EventType.ItemAdded);
// unregister the query itself
cqHandler.UnRegisterCQ(cQuery);

The complete class is as below.

public class MyProductContinuousQueryHandler
{
    public MyProductContinuousQueryHandler()
    {
        _cache = NCache.GetCache(ApplicationConstants.CACHE_NAME);
    }

    public void OnChangeInQueryResultSet(string key, CQEventArg arg)
    {
        switch (arg.EventType)
        {
            case EventType.ItemAdded:
                Console.WriteLine(
                    $"Item with key '{key}' has been added to result set of continuous query"
                );
                break;
            case EventType.ItemUpdated:
                Console.WriteLine(
                    $"Item with key '{key}' has been updated in the result set of continuous query"
                );

                // Get updated Product object
                // Item can be used if EventDataFilter is DataWithMetadata or Metadata if (arg.Item != null)
                {
                    Product updatedProduct = arg.Item.GetValue<Product>();
                    Console.WriteLine(
                        $"Updated product '{updatedProduct.ProductName}' with key '{key}' has ID '{updatedProduct.ProductID}'"
                    );
                }
                break;
            case EventType.ItemRemoved:
                Console.WriteLine(
                    $"Item with key '{key}' has been removed from result set of continuous query"
                );
                break;
        }
    }

    public ContinuousQuery RegisterCQ(string categoryValue)
    {
        // Query for required operation
        string query = "SELECT $VALUE$ FROM Product WHERE Category = ?";
        // Create query command and add parameters
        var queryCommand = new QueryCommand(query);
        queryCommand.Parameters.Add("Category", categoryValue);
        // Create Continuous Query
        var cQuery = new ContinuousQuery(queryCommand);
        // Item add, update, remove notification
        // EventDataFilter.DataWithMetadata returns the cache keys added
        cQuery.RegisterNotification(
            new QueryDataNotificationCallback(OnChangeInQueryResultSet),
            EventType.ItemAdded | EventType.ItemUpdated | EventType.ItemRemoved,
            EventDataFilter.DataWithMetadata
        );
        // Register continuousQuery on server
        _cache.MessagingService.RegisterCQ(cQuery);
        return cQuery;
    }

    public void UnRegisterNotifications(ContinuousQuery cQuery, EventType eventType)
    {
        // Unregister notifications for ItemAdded events from continuous query
        cQuery.UnRegisterNotification(
            new QueryDataNotificationCallback(OnChangeInQueryResultSet),
            eventType
        );
    }

    public void UnRegisterCQ(ContinuousQuery cQuery)
    {
        // Unregister Continuous Query from server
        _cache.MessagingService.UnRegisterCQ(cQuery);
    }
}

Conclusion

Continuous querying is a powerful technique that enables our application to receive real-time data notifications over any data in the cache we are interested in. It helps in keeping our application updated with the latest data, which helps with reduced latency and efficient resource utilization. Several cache providers - Redis, Hazelcast, Apache Ignite, Memcached, Couchbase, and NCache offer support for continuous queries with different features and capabilities.

In this article, we have looked into how we can implement continuous queries with NCache in a simple four-step process. You can checkout the official documentation, which provides further information regarding the same: https://www.alachisoft.com/resources/docs/ncache/prog-guide/continuous-query-overview.html