Pipes and Filters Pattern

Requirements

To follow this article, you need basic knowledge of OOP and Visual Studio, along with basic C# programming knowledge.

I have tried to present the article in a way that simulates a typical software project's environment. The program developed here does not actually publish a message to any queue. To simulate a message being encrypted and having its priority set, I have added properties to the Message class that record those states. In an actual project, the work might be done differently.

Existing system

Company ABC wants to build a new application called “Messenger” that publishes messages to MSMQ. Two new consumers of the application are expected as soon as it is developed. One consumer will publish sensitive data, so it expects the message to be encrypted when it is published to the queue, and it also wants to set the message priority. The other consumer will publish simple text data that needs no encryption, and only sets the message priority.

Requests from other consumers are expected soon, so the application should be built in a way that lets it accommodate future changes within the existing architecture.

So the application manager approaches the technical team with the following problem statement:

I want to publish a message to a queue and it should undergo certain transformations or perform certain operations before publishing it.

I want my code design to support adding new operations or removing the existing operations.


Then the Architect/Technical lead jumps in and proposes the “Pipes and Filters Pattern” as the solution to the problem and provides the design to the development team with the high-level details as in Figure 1.

Figure 1: Publishing message to the Queue

In the preceding figure, the incoming message is shown in green. It turns blue after the message priority is set, and orange after it is encrypted and sent as output.

High-Level Details

  • Pipes and Filters is one of the messaging patterns. It splits a large set of operations on a message into separate processes, so that each process works on the message independently and completes its own part of the transformation.

  • Each process is called a “Filter”, and the filters are connected by channels or connectors called “Pipes”.

  • All the filters implement a common interface, so they all conform to the same contract.

  • The message from the source is the input to the first process. The output of each process becomes the input to the next, and so on until the message reaches the sink, or destination. (Note: the terms “Process” and “Filter” are used interchangeably here.)

  • In the preceding diagram, the first process receives the incoming message through a pipe and sets the message priority, then sends its output to another process that encrypts the message.

  • Then the outgoing message is published to the Queue.
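
The flow described in the list above can be sketched with plain delegates before any classes are introduced. This is only an illustration under assumptions: the names PipesAndFiltersSketch, RunThrough and Demo are hypothetical, and the two lambdas merely stand in for "set priority" and "encrypt" by decorating a string.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical names, for illustration only; not part of the article's code.
public static class PipesAndFiltersSketch
{
    // Passes the source value through each filter in order.
    // The output of one filter is the input of the next.
    public static T RunThrough<T>(T source, IEnumerable<Func<T, T>> filters)
    {
        T current = source;
        foreach (Func<T, T> filter in filters)
        {
            current = filter(current); // the "pipe": hand the result to the next filter
        }
        return current; // the value that reaches the sink
    }

    public static string Demo()
    {
        var filters = new List<Func<string, string>>
        {
            text => "[priority=1] " + text,                // stands in for "set priority"
            text => "<encrypted>" + text + "</encrypted>"  // stands in for "encrypt"
        };
        return RunThrough("hello", filters);
    }
}
```

Calling PipesAndFiltersSketch.Demo() applies the filters in list order, so the priority marker ends up inside the "encrypted" wrapper. Reordering or removing an entry in the list changes the result without touching the other filter.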

Advantages and disadvantages of this architectural pattern

  • All the filters, or operations, are independent, and each can be plugged in or unplugged without affecting the others. This makes it easy to tailor the set of operations to each consumer.

  • Because the filters are independent of each other, they cannot communicate with one another directly, and the message can only pass through them sequentially, in other words in a linear fashion.

Then the architect provides the low-level design for implementing the solution, as shown in Figure 2.

Class Diagram

Figure 2: Class Diagram

Class Diagram in detail

All operations implement one common interface, so they all follow the same contract. In our example, setting the message priority and encrypting the message are the two operations, or filters, that must be applied to the source message before publishing.

So we created a generic interface, “IOperation<T>”, with a single method, “Execute(T input)”, which takes a value of type T, performs an operation on it, and returns a value of the same type. Each implementing class fixes T to the type it works on, so operations on different types can all implement the same interface.

Note: Since we are working with messages, a message will always be the input and output for the filter or operation methods and so our method Execute also supports that.
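
To illustrate how one generic interface can serve different types, here is a small sketch. The interface matches the one described above; the two implementing classes, TrimText and ClampPriority, are hypothetical and do not appear in the article's project, and the range 0 to 7 is an arbitrary choice for the example.

```csharp
using System;

// The interface as described in the article.
public interface IOperation<T>
{
    T Execute(T input);
}

// Hypothetical filter over strings: removes surrounding whitespace.
public class TrimText : IOperation<string>
{
    public string Execute(string input)
    {
        return input.Trim();
    }
}

// Hypothetical filter over integers: clamps a value to the range 0..7.
public class ClampPriority : IOperation<int>
{
    public int Execute(int input)
    {
        return Math.Max(0, Math.Min(7, input));
    }
}
```

Both classes satisfy the same contract, but the compiler prevents a TrimText from being registered in a pipeline of integers, because IOperation<string> and IOperation<int> are distinct types.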

Now getting to the operations

We need to create classes that each perform one operation, taking an input and returning a result of the same type. Based on the requirements, we create two classes, “EncryptMessage” and “MessagePriority”, which encrypt the message and set its priority respectively.

Next, something must decide which operations are registered for an incoming message. That is the job of the pipeline classes. So we created an abstract, generic base class for pipelines, “Pipelinebase”, with methods for registering operations and for performing them. Our requirements call for one pipeline that sends a message from the client to the message queue, so we named that class “SendPipeline”; it inherits from the base class “Pipelinebase”. We also created an interface, “IMessage”, that defines our message, and SendPipeline works with classes that implement IMessage.

The SendPipeline class also has a constructor that takes one Boolean parameter per message operation. Since we have only two operations, the constructor takes two Boolean parameters. If you have more operations, it is advisable to use an enum with the Flags attribute, or a settings object that carries all the options.
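
As a rough sketch of the Flags enum alternative mentioned above, the following shows how a set of options could replace a growing list of Boolean parameters. SendOptions, SendOptionsSketch and the Compress option are hypothetical names introduced only for this example. The method returns operation names rather than registering real operations, to keep the sketch self-contained.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical option set; the names are illustrative, not from the article's code.
[Flags]
public enum SendOptions
{
    None = 0,
    SetPriority = 1,
    Encrypt = 2,
    Compress = 4 // an imagined future operation
}

public static class SendOptionsSketch
{
    // Returns the names of the operations a pipeline constructor
    // would register for the given options, in registration order.
    public static List<string> OperationsFor(SendOptions options)
    {
        var registered = new List<string>();
        if (options.HasFlag(SendOptions.SetPriority)) registered.Add("MessagePriority");
        if (options.HasFlag(SendOptions.Encrypt)) registered.Add("EncryptMessage");
        if (options.HasFlag(SendOptions.Compress)) registered.Add("CompressMessage");
        return registered;
    }
}
```

A caller would combine options with the bitwise OR operator, for example SendOptionsSketch.OperationsFor(SendOptions.SetPriority | SendOptions.Encrypt). Adding a new operation then means adding one enum member and one check, rather than changing the constructor signature for every client.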

The development team then produces the implementation based on the class diagram and specifications provided by the architect, as shown in the following section.

Coding in Action

IOperation interface

This interface is generic. Every class that implements it provides an Execute method that takes and returns the same type.

    namespace PipesAndFiltersExample
    {
        /// <summary>
        /// Operation Interface
        /// All operations must implement the interface
        /// </summary>
        /// <typeparam name="T">Data type supported in the operation</typeparam>
        public interface IOperation<T>
        {
            /// <summary>
            /// Executes the operation
            /// </summary>
            /// <param name="input">The Input Parameter</param>
            /// <returns>Type defined for the operation</returns>
            T Execute(T input);
        }
    }
EncryptMessage class

This class encrypts the incoming message and implements the interface IOperation<IMessage>.
    namespace PipesAndFiltersExample
    {
        /// <summary>
        /// Encrypts the message
        /// </summary>
        class EncryptMessage : IOperation<IMessage>
        {
            /// <summary>
            /// Executes the operation on the message
            /// </summary>
            /// <param name="input">The Input Message</param>
            /// <returns>Type of IMessage</returns>
            public IMessage Execute(IMessage input)
            {
                return Encrypt(input);
            }

            /// <summary>
            /// Encrypts the message
            /// </summary>
            /// <param name="input">The Input Message</param>
            /// <returns>Type of IMessage</returns>
            private IMessage Encrypt(IMessage input)
            {
                input.IsEncrypted = true;
                //Encryption code
                return input;
            }
        }
    }
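
The Encrypt method above only sets a flag and leaves the real work as a placeholder comment. As one possible way to fill that placeholder, the sketch below encrypts a string with AES from System.Security.Cryptography. BodyCipher is a hypothetical helper that is not part of the article's project, and the sketch assumes the caller supplies a valid AES key (16, 24 or 32 bytes) and a 16-byte IV. Key storage and IV handling are deliberately left out.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

// Hypothetical helper; not part of the article's code.
public static class BodyCipher
{
    // Encrypts UTF-8 text with AES and returns the ciphertext as Base64.
    public static string Encrypt(string plainText, byte[] key, byte[] iv)
    {
        using (Aes aes = Aes.Create())
        using (ICryptoTransform encryptor = aes.CreateEncryptor(key, iv))
        {
            byte[] plainBytes = Encoding.UTF8.GetBytes(plainText);
            byte[] cipherBytes = encryptor.TransformFinalBlock(plainBytes, 0, plainBytes.Length);
            return Convert.ToBase64String(cipherBytes);
        }
    }

    // Reverses Encrypt, given the same key and IV.
    public static string Decrypt(string cipherBase64, byte[] key, byte[] iv)
    {
        using (Aes aes = Aes.Create())
        using (ICryptoTransform decryptor = aes.CreateDecryptor(key, iv))
        {
            byte[] cipherBytes = Convert.FromBase64String(cipherBase64);
            byte[] plainBytes = decryptor.TransformFinalBlock(cipherBytes, 0, cipherBytes.Length);
            return Encoding.UTF8.GetString(plainBytes);
        }
    }
}
```

Note that IMessage exposes Body as a read-only property, so a real filter built on this helper would need some way to carry the encrypted text, for example by returning a new message instance. That design decision is outside the scope of the article's example.
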
MessagePriority class

This class sets the message priority and implements the interface IOperation<IMessage>.
    namespace PipesAndFiltersExample
    {
        /// <summary>
        /// Sets Message priority to the incoming message
        /// </summary>
        class MessagePriority : IOperation<IMessage>
        {
            /// <summary>
            /// Executes the operations
            /// </summary>
            /// <param name="input">The input message</param>
            /// <returns>Message with the priority set</returns>
            public IMessage Execute(IMessage input)
            {
                input.Priority = 1;

                return input;
            }
        }
    }
IMessage interface
    namespace PipesAndFiltersExample
    {
        /// <summary>
        /// Message Interface
        /// </summary>
        public interface IMessage
        {
            /// <summary>
            /// Priority of the message
            /// </summary>
            int Priority { get; set; }

            /// <summary>
            /// Flag that records whether the message has been encrypted;
            /// must be provided by the implementing classes
            /// </summary>
            bool IsEncrypted { get; set; }

            /// <summary>
            /// Tells if the message is encrypted
            /// </summary>
            /// <returns>True if the message is encrypted</returns>
            bool IsMessageEncrypted();

            /// <summary>
            /// Tells if the priority is set
            /// </summary>
            /// <returns>True if the priority is set</returns>
            bool IsMessagePrioritySet();

            /// <summary>
            /// Message Id of the message
            /// </summary>
            string MessageId { get; }

            /// <summary>
            /// Body of the message
            /// </summary>
            string Body { get; }

            /// <summary>
            /// Header of the message
            /// </summary>
            string Header { get; }

            /// <summary>
            /// Subject of the message
            /// </summary>
            string Subject { get; }
        }
    }
Message class
    namespace PipesAndFiltersExample
    {
        /// <summary>
        /// Message class
        /// </summary>
        public class Message : IMessage
        {
            // Backing fields for the read-only properties
            private string messageId;
            private string body;
            private string header;
            private string subject;

            /// <summary>
            /// Priority of the message; set by the MessagePriority operation
            /// </summary>
            public int Priority { get; set; }

            /// <summary>
            /// Encryption flag; set by the EncryptMessage operation
            /// </summary>
            public bool IsEncrypted { get; set; }

            /// <summary>
            /// Property that returns MessageId
            /// </summary>
            public string MessageId
            {
                get { return messageId; }
            }

            /// <summary>
            /// Property that returns body of the message
            /// </summary>
            public string Body
            {
                get { return body; }
            }

            /// <summary>
            /// Property that returns the header of the message
            /// </summary>
            public string Header
            {
                get { return header; }
            }

            /// <summary>
            /// Property that returns subject of the message
            /// </summary>
            public string Subject
            {
                get { return subject; }
            }

            /// <summary>
            /// Constructor for the class Message
            /// </summary>
            /// <param name="messageId">The MessageId</param>
            /// <param name="body">The Body</param>
            /// <param name="header">The Header</param>
            /// <param name="subject">The Subject</param>
            public Message(string messageId, string body, string header, string subject)
            {
                this.messageId = messageId;
                this.body = body;
                this.header = header;
                this.subject = subject;
            }

            /// <summary>
            /// Informs if the message is encrypted
            /// </summary>
            /// <returns>The value true/false</returns>
            public bool IsMessageEncrypted()
            {
                return IsEncrypted;
            }

            /// <summary>
            /// Informs if the message priority is set
            /// </summary>
            /// <returns>The value true/false</returns>
            public bool IsMessagePrioritySet()
            {
                return (Priority != 0);
            }
        }
    }
Pipelinebase abstract class

This class has two methods, Register and PerformOperation. The Register method takes an IOperation<T> and adds it to the class's list of operations, so the operations each client needs are added to the list through Register. The PerformOperation method then goes through the operations in order and calls Execute on each one. Each operation takes the message, does its work, and passes the result on to the next operation, until all the operations have completed.
    using System.Collections.Generic;
    using System.Linq;

    namespace PipesAndFiltersExample
    {
        /// <summary>
        /// Base class for the pipeline classes
        /// </summary>
        /// <typeparam name="T">Type supported by the Pipeline base</typeparam>
        public abstract class Pipelinebase<T>
        {
            private readonly List<IOperation<T>> operations = new List<IOperation<T>>();

            /// <summary>
            /// Registers the operation
            /// </summary>
            /// <param name="operation">The operation</param>
            /// <returns>The instance of the class</returns>
            public Pipelinebase<T> Register(IOperation<T> operation)
            {
                this.operations.Add(operation);
                return this;
            }

            /// <summary>
            /// Performs the registered operations in order
            /// </summary>
            /// <param name="input">The input message</param>
            /// <returns>The message after all operations have been applied</returns>
            public T PerformOperation(T input)
            {
                // Each operation receives the result of the previous one
                return this.operations.Aggregate(input, (current, operation) => operation.Execute(current));
            }
        }
    }
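
The Aggregate call in PerformOperation may be unfamiliar. It is a left fold: it starts from the input and feeds each intermediate result into the next operation. The sketch below places it next to an equivalent foreach loop so the two can be compared. AppendSuffix and FoldSketch are hypothetical names used only for this comparison.

```csharp
using System.Collections.Generic;
using System.Linq;

public interface IOperation<T>
{
    T Execute(T input);
}

// Hypothetical filter, used only to make the order of execution visible.
public class AppendSuffix : IOperation<string>
{
    private readonly string suffix;

    public AppendSuffix(string suffix)
    {
        this.suffix = suffix;
    }

    public string Execute(string input)
    {
        return input + suffix;
    }
}

public static class FoldSketch
{
    // Same shape as PerformOperation: a left fold over the operations.
    public static T WithAggregate<T>(T input, List<IOperation<T>> operations)
    {
        return operations.Aggregate(input, (current, operation) => operation.Execute(current));
    }

    // The equivalent explicit loop.
    public static T WithLoop<T>(T input, List<IOperation<T>> operations)
    {
        T current = input;
        foreach (IOperation<T> operation in operations)
        {
            current = operation.Execute(current);
        }
        return current;
    }
}
```

Both methods apply the operations in list order and return the final result. With an empty list, both return the input unchanged.
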
SendPipeline class

This class has a constructor that takes two Boolean parameters, setPriority and encryptMessage, and registers the operation classes based on their values.
    namespace PipesAndFiltersExample
    {
        /// <summary>
        /// Pipeline that sends message to the destination by performing operations
        /// on the incoming message
        /// </summary>
        public class SendPipeline : Pipelinebase<IMessage>
        {
            /// <summary>
            /// Constructor for the class where the operations are registered
            /// for the messages
            /// </summary>
            /// <param name="setPriority">The Set Priority parameter</param>
            /// <param name="encryptMessage">The Encrypt Message parameter</param>
            public SendPipeline(bool setPriority, bool encryptMessage)
            {
                if (setPriority)
                {
                    Register(new MessagePriority());
                }

                if (encryptMessage)
                {
                    Register(new EncryptMessage());
                }
            }
        }
    }
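
Because Register returns the pipeline itself, registrations can be chained, and a new filter can be added without changing the existing ones. The following is a reduced sketch of that idea, not the article's project: DemoMessage, SetDemoPriority, StampMessage and DemoPipeline are hypothetical stand-ins, and the base class is repeated in compact form only to keep the example self-contained.

```csharp
using System.Collections.Generic;
using System.Linq;

public interface IOperation<T>
{
    T Execute(T input);
}

// Compact copy of the article's base class, repeated to keep the sketch self-contained.
public abstract class Pipelinebase<T>
{
    private readonly List<IOperation<T>> operations = new List<IOperation<T>>();

    public Pipelinebase<T> Register(IOperation<T> operation)
    {
        operations.Add(operation);
        return this;
    }

    public T PerformOperation(T input)
    {
        return operations.Aggregate(input, (current, operation) => operation.Execute(current));
    }
}

// Hypothetical stand-in for the article's message type, reduced to two properties.
public class DemoMessage
{
    public int Priority { get; set; }
    public bool IsStamped { get; set; }
}

// Existing filter: sets the priority.
public class SetDemoPriority : IOperation<DemoMessage>
{
    public DemoMessage Execute(DemoMessage input)
    {
        input.Priority = 1;
        return input;
    }
}

// Hypothetical new filter, added without modifying SetDemoPriority.
public class StampMessage : IOperation<DemoMessage>
{
    public DemoMessage Execute(DemoMessage input)
    {
        input.IsStamped = true;
        return input;
    }
}

public class DemoPipeline : Pipelinebase<DemoMessage>
{
    public DemoPipeline()
    {
        // Register returns the pipeline, so the calls can be chained.
        Register(new SetDemoPriority()).Register(new StampMessage());
    }
}
```

Running new DemoPipeline().PerformOperation(new DemoMessage()) yields a message with both the priority and the stamp set. Removing the second Register call unplugs the new filter again.
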
Two console applications have been created to show the two clients. One client wants to encrypt and set the message priority and the other just wants the message priority to be set.

Client 1 program
    using PipesAndFiltersExample;
    using System;

    namespace Client1
    {
        class Program
        {
            static void Main(string[] args)
            {
                IMessage message = new Message(messageId: "1",
                    body: "This is the message body",
                    header: "Header Information",
                    subject: "Please set priority and encrypt the message");

                // Send the message through the pipeline: the priority is set and the message is encrypted
                SendPipeline sendPipeline = new SendPipeline(true, true);
                var publishedMessage = sendPipeline.PerformOperation(message);
                Console.WriteLine("I am client1 and my messages should be prioritized and encrypted!!!");
                Console.WriteLine("My message Priority Set? {0}", publishedMessage.IsMessagePrioritySet());
                Console.WriteLine("My message Encrypted? {0}", publishedMessage.IsMessageEncrypted());
                Console.WriteLine();

                // Keep the console window open
                Console.Read();
            }
        }
    }
Client 2 program
    using PipesAndFiltersExample;
    using System;

    namespace Client2
    {
        class Program
        {
            static void Main(string[] args)
            {
                IMessage message = new Message(messageId: "1",
                    body: "This is the message body",
                    header: "Header Information",
                    subject: "Please set priority and encrypt the message");

                // Send the message through the pipeline: only the priority is set, no encryption
                SendPipeline sendPipeline = new SendPipeline(true, false);
                var publishedMessage = sendPipeline.PerformOperation(message);
                Console.WriteLine("I am client2 and my messages should only be prioritized and not encrypted!!!");
                Console.WriteLine("My message Priority Set? {0}", publishedMessage.IsMessagePrioritySet());
                Console.WriteLine("My message Encrypted? {0}", publishedMessage.IsMessageEncrypted());
                Console.WriteLine();

                // Keep the console window open
                Console.Read();
            }
        }
    }
When you run the two clients, you should see output as in Figures 3 and 4.

Client 1 program

Figure 3: Client 1 Program output shows that the message is encrypted and the priority is set

Client 2 program

Figure 4: Client 2 Program output shows that the message priority is set but not encrypted

To run both console applications at once, right-click the solution and select “Set StartUp Projects”, choose “Multiple startup projects”, and set the “Action” column to “Start” for the two client projects, as in the following screenshot:

click Set StartUp Projects

Set StartUp Projects