Using the Intercepting Filter Pattern to create a Generic Reusable Processing Pipeline

by Jon Wojtowicz

A pipeline is a common architectural style for sequential processing of data based on the pipes and filters, interceptor or intercepting filter patterns. This implementation is based on the intercepting filter pattern.



A pipeline can be thought of as water flowing through a pipe. The pipe connects a series of stages where processing occurs. Because all the stages use the same interface they can be composed into different solutions by connecting the stages to different pipes. This allows for adding, omitting or rearranging stages without having to change the individual stages themselves.

The problem this pattern addresses is how to implement a configurable series of steps for processing data. Typically this logic is written by hard-coding the sequence and using many if or case statements. By breaking the sequence into pieces, simpler classes can be designed that are ignorant of the other processing steps. This allows developers to focus on small parts of the solution and not get overwhelmed by the entire problem. Smaller component parts also bring greater reuse and a pluggable architecture.

A pipeline is for simple linear processing and is not orchestration. An orchestration provides conditional branching, whereas a pipeline implies a linear flow from stage to stage. A typical scenario for a pipeline would be order processing. This may include checking inventory, validating credit card information, applying sales tax, calculating shipping charges, etc., prior to saving the final order. Another possible use is decompressing and decrypting a stream before processing. This is also the pattern used for applying IHttpModules to an ASP.NET request.

Structure

The following is the structure of the pipeline.

Sequence

PipelineManager

The PipelineManager manages filter processing. It contains the FilterChain and initiates processing. It also calls the processor that will complete the processing.

FilterChain

The FilterChain is an ordered collection of independent filters.

FilterOne, FilterTwo, FilterThree

These are the individual filters. The FilterChain coordinates their processing.

CoreProcessor

The CoreProcessor processes the result of the filters.

Pipeline Class Design

The pipeline is composed of four main types: PipelineManager, FilterChain, IFilter and ICoreProcessor, which are elaborated on later. The pipeline accepts the data as a reference parameter and returns a Boolean indicating success or failure of the processing steps. During processing the data can be modified or completely replaced with new data. A second parameter can be used to indicate that processing should stop on failure, that is, when a step returns false.

The intercepting filter pattern uses a controller to call each stage in the pipeline, as opposed to the other pattern implementations, which chain the stages into a linked list where each stage calls the next. In my opinion the controller approach is more advantageous since it prevents a developer from forgetting to call the next stage in their filter code.
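
To make the risk concrete, here is a minimal sketch of the linked-list alternative. All names in it (ChainedStage, TrimStage, ForgetfulStage, AppendStage) are hypothetical and are not part of the sample code for this article; the sketch only illustrates how a single stage can break a chain in which each stage calls the next.

```csharp
using System;

// Hypothetical linked-list style stage: each stage holds a reference to the next one.
public abstract class ChainedStage<T>
{
    private ChainedStage<T> next;

    public ChainedStage<T> Next
    {
        get { return next; }
        set { next = value; }
    }

    public abstract void Process(ref T data);

    // Every stage must remember to call this, or all later stages are skipped.
    protected void CallNext(ref T data)
    {
        if (next != null)
            next.Process(ref data);
    }
}

public class TrimStage : ChainedStage<string>
{
    public override void Process(ref string data)
    {
        data = data.Trim();
        CallNext(ref data);
    }
}

public class ForgetfulStage : ChainedStage<string>
{
    public override void Process(ref string data)
    {
        data = data.ToUpperInvariant();
        // Bug: CallNext is never invoked, so the chain silently ends here.
    }
}

public class AppendStage : ChainedStage<string>
{
    public override void Process(ref string data)
    {
        data = data + "!";
        CallNext(ref data);
    }
}
```

If these stages are linked as TrimStage, ForgetfulStage, AppendStage, the AppendStage never runs and no error is raised. With a controller iterating over the stages, a filter has no such responsibility to forget.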

The classes for the Pipeline are as shown.

The IFilter interface provides the processing stages for each pipeline. The ref data parameter allows the data to be completely replaced during processing. The Process method returns a Boolean value to indicate success or failure of the step. This can be useful in determining whether processing needs to be terminated due to a failed step. It also allows the pipeline to be used as a non-classic Chain of Responsibility.

public interface IFilter<T>
{
    bool Process(ref T data);
}
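
As an illustration, two small filters for string data might look like the following. The class names TrimFilter and NotEmptyFilter are made up for this sketch, and it assumes the IFilter<T> interface above is in scope. The first filter replaces the data through the ref parameter, the second only validates it.

```csharp
// Assumes the IFilter<T> interface from this article is in scope.

// Hypothetical filter that replaces the data with a trimmed copy.
public class TrimFilter : IFilter<string>
{
    public bool Process(ref string data)
    {
        if (data == null)
            return false;       // nothing to trim: report failure
        data = data.Trim();     // the ref parameter lets the filter swap in new data
        return true;
    }
}

// Hypothetical filter that only inspects the data and reports success or failure.
public class NotEmptyFilter : IFilter<string>
{
    public bool Process(ref string data)
    {
        return !string.IsNullOrEmpty(data);
    }
}
```

Neither filter knows about the other or about its position in the pipeline, which is what allows stages to be added, omitted or rearranged.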

The FilterChain is an extension of the List class. It is responsible for executing the individual filters in the pipeline. A second, optional parameter indicates whether it should stop as soon as any filter returns false (failure); by default every filter runs.

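using System.Collections.Generic;

public class FilterChain<T> : List<IFilter<T>>
{
    // Runs the filters in order; returns false if any filter reported failure.
    internal bool Process(ref T data, bool stopOnFailure)
    {
        bool success = true;
        foreach (IFilter<T> processor in this)
        {
            if (!processor.Process(ref data))
                success = false;
            if (stopOnFailure && !success)
                break;
        }
        return success;
    }

    // Runs every filter, even if an earlier one failed.
    internal bool Process(ref T data)
    {
        return Process(ref data, false);
    }
}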

The PipelineManager contains the filter chain and the final processor of the data. This is the class that should be used by the client when using the pipeline. On a design note, the final processor was implemented as an interface rather than a delegate to be more explicit and preserve the OO design. This can be implemented with a delegate with minimal changes to the code. The processor can also be null if no further processing is required.

public class PipelineManager<T>
{
    private FilterChain<T> filters;
    private ICoreProcessor<T> processor;

    public PipelineManager()
    {
        filters = new FilterChain<T>();
    }

    public PipelineManager(FilterChain<T> filters)
    {
        this.filters = filters;
    }

    public FilterChain<T> Filters
    {
        get { return filters; }
    }

    public ICoreProcessor<T> Processor
    {
        get { return processor; }
        set { processor = value; }
    }

    public bool ProcessFilter(ref T data, bool stopOnFailure)
    {
        bool success = filters.Process(ref data, stopOnFailure);
        // Skip the final processor only when a failure was meant to stop the pipeline.
        if ((!stopOnFailure || success) && processor != null)
            processor.Execute(data);
        return success;
    }

    public bool ProcessFilter(ref T data)
    {
        return ProcessFilter(ref data, false);
    }
}
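
The following sketch shows how a client might assemble and run a pipeline for the order processing scenario mentioned earlier. Order, QuantityFilter, SalesTaxFilter, SaveOrderProcessor and the 7% tax rate are all invented for illustration; the sketch assumes the IFilter<T>, ICoreProcessor<T>, FilterChain<T> and PipelineManager<T> types from this article are compiled into the same project.

```csharp
// Assumes IFilter<T>, ICoreProcessor<T>, FilterChain<T> and PipelineManager<T>
// from this article are in the same project. Everything else is hypothetical.

public class Order
{
    public int Quantity;
    public decimal Subtotal;
    public decimal Tax;
}

// Fails the order when nothing was ordered.
public class QuantityFilter : IFilter<Order>
{
    public bool Process(ref Order data)
    {
        return data.Quantity > 0;
    }
}

// Modifies the order in place; the 7% rate is purely illustrative.
public class SalesTaxFilter : IFilter<Order>
{
    public bool Process(ref Order data)
    {
        data.Tax = data.Subtotal * 0.07m;
        return true;
    }
}

// Stands in for the final step, such as saving the order.
public class SaveOrderProcessor : ICoreProcessor<Order>
{
    public int Saved;

    public void Execute(Order data)
    {
        Saved++;
    }
}

public static class OrderPipelineDemo
{
    public static bool Run(Order order, SaveOrderProcessor saver)
    {
        PipelineManager<Order> pipeline = new PipelineManager<Order>();
        pipeline.Filters.Add(new QuantityFilter());   // filters run in the order added
        pipeline.Filters.Add(new SalesTaxFilter());
        pipeline.Processor = saver;
        return pipeline.ProcessFilter(ref order);
    }
}
```

For an order with a positive quantity both filters succeed, Run returns true, and the processor is executed once. For an order with a quantity of zero, Run returns false because QuantityFilter reported a failure.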

The ICoreProcessor interface provides the final target for the data. It is used by the PipelineManager.

public interface ICoreProcessor<T>
{
    void Execute(T data);
}
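
For readers who prefer the delegate style mentioned above, one option that leaves the PipelineManager untouched is a small adapter that wraps a delegate in the interface. This is a sketch of that alternative, not the approach used in the sample code; the DelegateProcessor name is hypothetical and the sketch assumes ICoreProcessor<T> is in scope.

```csharp
using System;

// Assumes the ICoreProcessor<T> interface from this article is in scope.
// Hypothetical adapter: lets any Action<T> delegate act as the final processor.
public class DelegateProcessor<T> : ICoreProcessor<T>
{
    private readonly Action<T> action;

    public DelegateProcessor(Action<T> action)
    {
        if (action == null)
            throw new ArgumentNullException("action");
        this.action = action;
    }

    public void Execute(T data)
    {
        action(data);   // forward the pipeline result to the wrapped delegate
    }
}
```

A pipeline over strings could then be given a processor such as new DelegateProcessor<string>(Console.WriteLine) without writing a dedicated class.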

Creating Pipelines from a Configuration File

To increase the usefulness of the pipelines, it should be possible to specify them in a configuration file. This allows each stage to become a pluggable part of the pipeline. Several classes were added to allow the specification within a configuration. The configuration classes and the pipeline factory are as follows.

The configuration section uses the add/remove/clear semantics for the list of processing stages in a pipeline. The type of pipeline created from a particular section must match the type of stages specified in the configuration. This results in a configuration section that has the following structure.

<filterPipeline>
    <handlers>
        <add type="PipelineTest.StringFilter, PipelineTest" />
    </handlers>
</filterPipeline>
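
A configuration section of this shape can be modeled with the System.Configuration classes roughly as follows. This is a sketch under assumptions: the class names FilterPipelineSection, HandlerCollection and HandlerElement are hypothetical and may differ from the sample code, and only the element and attribute names are taken from the XML above.

```csharp
using System.Configuration;

// Hypothetical class names; only filterPipeline, handlers, add and type
// come from the configuration XML shown in the article.
public class FilterPipelineSection : ConfigurationSection
{
    [ConfigurationProperty("handlers")]
    public HandlerCollection Handlers
    {
        get { return (HandlerCollection)base["handlers"]; }
    }
}

// The default collection type is AddRemoveClearMap, which supplies the
// add, remove and clear elements.
[ConfigurationCollection(typeof(HandlerElement))]
public class HandlerCollection : ConfigurationElementCollection
{
    protected override ConfigurationElement CreateNewElement()
    {
        return new HandlerElement();
    }

    // The type name doubles as the key, so a remove element can refer to it.
    protected override object GetElementKey(ConfigurationElement element)
    {
        return ((HandlerElement)element).Type;
    }
}

public class HandlerElement : ConfigurationElement
{
    [ConfigurationProperty("type", IsRequired = true, IsKey = true)]
    public string Type
    {
        get { return (string)base["type"]; }
    }
}
```

To be read at run time, the section would also need to be registered under configSections in the application configuration file and retrieved with ConfigurationManager.GetSection, which requires a reference to the System.Configuration assembly.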

One of the issues in loading from a configuration file was determining the actual type a pipeline can handle. Generics require the classes for each type be created at compile time. This issue was resolved by using a generic method in a factory which forces the pipeline user to determine the types that will be handled. The factory contains one method for creating the pipeline from the configuration.
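
A generic factory method along these lines could look like the sketch below. It is an illustration rather than the factory from the sample code: the PipelineFactorySketch name is hypothetical, the type names are passed in directly instead of being read from the configuration section, and it assumes IFilter<T> and PipelineManager<T> from this article are in scope. Each stage must have a public parameterless constructor.

```csharp
using System;
using System.Collections.Generic;

// Assumes IFilter<T> and PipelineManager<T> from this article are in scope.
// Hypothetical factory: the caller supplies T, the type names supply the stages.
public static class PipelineFactorySketch
{
    public static PipelineManager<T> Create<T>(IEnumerable<string> filterTypeNames)
    {
        PipelineManager<T> pipeline = new PipelineManager<T>();
        foreach (string typeName in filterTypeNames)
        {
            // Throws if the assembly-qualified type name cannot be resolved.
            Type type = Type.GetType(typeName.Trim(), true);

            // The stage must match the pipeline's data type.
            IFilter<T> filter = Activator.CreateInstance(type) as IFilter<T>;
            if (filter == null)
                throw new InvalidOperationException(
                    type.FullName + " does not implement IFilter<" + typeof(T).Name + ">.");

            pipeline.Filters.Add(filter);
        }
        return pipeline;
    }
}
```

Because the caller writes Create<string>(...) or Create<Order>(...), the compiler knows the pipeline type, and a stage configured for the wrong data type is rejected when the pipeline is built rather than when it runs.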

Using a configuration file for the stage determination allows for a very flexible processing framework. This allows for completely reconfiguring the pipeline without modifying the code. Since the name of the configuration section for each pipeline is specified in the client code, multiple pipelines can be created and used within a single application.

This pattern can be used with other patterns to create a powerful and flexible framework. Download the sample code, written in Visual Studio 2005, and look at this simple yet powerful pattern.

Download the Visual Studio 2005 Solution that accompanies this article

Jon Wojtowicz is a C# MVP and a Systems Analyst at a large insurance company in Chattanooga, TN, where he currently provides developer support and internal training. He has worked as a consultant with Microsoft technologies, including ASP, COM, VB6 and .NET (both C# and VB.NET) since Beta 1. He has been an MCSD since 1999 and an MCT since 2000. Prior to getting a degree in Computer Science he worked as a process engineer focusing on process automation, programmable controllers and equipment installations. In his spare time he likes woodworking and gardening.