
Pipes and Filters Architecture

In Linux, when we want to combine the results of various commands to filter out the result we want, we use a pipe: the output of one program is fed as the input of the next. This is a fine example of the Pipes and Filters pattern.

$ ps aux | grep java

Pipes and Filters is a very helpful architectural design pattern for processing a stream of data. It is also helpful when the data goes through a sequence of transformations.




It consists of a number of components called Filters that transform data before handing it over to the next Filter via connectors called Pipes.
  • Filter - transforms or filters data it receives via the pipe.
  • Pipe - is the connector that passes data from and to filters.
  • Pump - is the data producer.
  • Sink - is the end consumer of the target data.
A pipeline consists of a chain of processing elements, arranged so that the output of each element is the input of the next (Wikipedia).
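The four roles can be sketched with the standard java.util.function types. This is only an illustration, not the implementation from this post: the class name RolesSketch and the process lines are made up, and Supplier, Function and Consumer stand in for Pump, Filter and Sink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

class RolesSketch {

  static List<String> run() {
    // Pump: produces the data (made-up process lines, for illustration only).
    Supplier<List<String>> pump = () -> Arrays.asList(
        "root 1 /sbin/init",
        "app 42 java -jar service.jar",
        "app 57 python worker.py");

    // Filter: keeps only the lines that mention "java".
    Function<List<String>, List<String>> grepJava = lines -> lines.stream()
        .filter(line -> line.contains("java"))
        .collect(Collectors.toList());

    // Filter: transforms each remaining line into its second column (the PID).
    Function<List<String>, List<String>> pidColumn = lines -> lines.stream()
        .map(line -> line.split(" ")[1])
        .collect(Collectors.toList());

    // Sink: the end consumer, here it just collects what arrives.
    List<String> received = new ArrayList<>();
    Consumer<List<String>> sink = received::addAll;

    // Pipe: andThen connects the output of one filter to the input of the next.
    sink.accept(grepJava.andThen(pidColumn).apply(pump.get()));
    return received;
  }
}
```

Each filter knows nothing about its neighbours; only the line that wires them together knows the order.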

A filter can be a small class or a big component.
Input and output types are decided by the project's needs. Whether you use fixed models, use the same type as both input and output, or use fully independent input and output types all depends on your need.

Even related operations can be divided into smaller, manageable units by using this pattern. I have used (and even written myself) classes where the business logic is composed of multiple private methods. Such a class becomes very big and very difficult to unit test: all dependencies need to be mocked to test a single unit.
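As a rough sketch of the alternative, each private method can become its own small step that is tested without any mocks. The step names (ApplyDiscount, AddShipping), the amounts and the use of java.util.function.Function are assumptions made for this example only.

```java
import java.util.function.Function;

// Hypothetical step: takes 10% off a price given in cents.
class ApplyDiscount implements Function<Integer, Integer> {
  @Override
  public Integer apply(Integer cents) {
    return cents - cents / 10;
  }
}

// Hypothetical step: adds a flat shipping fee of 500 cents.
class AddShipping implements Function<Integer, Integer> {
  @Override
  public Integer apply(Integer cents) {
    return cents + 500;
  }
}

class CheckoutSketch {
  // The business logic is just the steps plugged together in order.
  static int total(int cents) {
    return new ApplyDiscount().andThen(new AddShipping()).apply(cents);
  }
}
```

A test for ApplyDiscount needs nothing but an input value, and the same is true for AddShipping.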

It is true that it looks weird at first glance. Why split known logic into separate steps and plug them together at runtime? When it comes to maintainability, readability, extensibility, etc., this is very flexible. (Well, I have failed in my attempt to convince others of this; maybe I will try another day.)

Here is the base implementation. You can find the full source code in my GitHub account.

package com.slmanju.pipes;

public interface Filter<I, O> {

  O execute(I input);

}

package com.slmanju.pipes;

class Pipe<I, O, P> implements Filter<I, P> {

  private Filter<I, O> current;
  private Filter<O, P> next;

  Pipe(Filter<I, O> current, Filter<O, P> next) {
    this.current = current;
    this.next = next;
  }

  public P execute(I input) {
    return next.execute(current.execute(input));
  }

}

package com.slmanju.pipes;

class Pipeline<I, O> {

  private Filter<I, O> current;

  Pipeline(Filter<I, O> current) {
    this.current = current;
  }
  

  <P> Pipeline<I, P> pipe(Filter<O, P> next) {
    return new Pipeline<>(new Pipe<>(current, next));
  }

  // Using functional programming.
  <P> Pipeline<I, P> pipe2(Filter<O, P> next) {
    return new Pipeline<>((input -> next.execute(current.execute(input))));
  }

  O process(I input) {
    return current.execute(input);
  }

}
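A possible way to use the pipeline is sketched below. Filter and Pipeline are repeated in condensed form (only the lambda-based pipe) so the snippet compiles on its own; TrimFilter, ParseIntFilter, DoubleFilter and PipelineUsage are hypothetical names invented for this example.

```java
// Condensed copies of the types above so the snippet compiles on its own.
interface Filter<I, O> {
  O execute(I input);
}

class Pipeline<I, O> {
  private final Filter<I, O> current;

  Pipeline(Filter<I, O> current) {
    this.current = current;
  }

  <P> Pipeline<I, P> pipe(Filter<O, P> next) {
    return new Pipeline<>(input -> next.execute(current.execute(input)));
  }

  O process(I input) {
    return current.execute(input);
  }
}

// Hypothetical filter: same type in and out.
class TrimFilter implements Filter<String, String> {
  public String execute(String input) {
    return input.trim();
  }
}

// Hypothetical filter: the output type differs from the input type.
class ParseIntFilter implements Filter<String, Integer> {
  public Integer execute(String input) {
    return Integer.parseInt(input);
  }
}

// Hypothetical filter: doubles the number it receives.
class DoubleFilter implements Filter<Integer, Integer> {
  public Integer execute(Integer input) {
    return input * 2;
  }
}

class PipelineUsage {
  static Integer run(String raw) {
    // The compiler checks that each filter accepts what the previous one returns.
    return new Pipeline<>(new TrimFilter())
        .pipe(new ParseIntFilter())
        .pipe(new DoubleFilter())
        .process(raw);
  }
}
```

Because the types are chained through the generics, plugging DoubleFilter directly after TrimFilter would fail at compile time rather than at runtime.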

Pipe vs Chain

At first glance, one might think that Pipes and Filters is the same as Chain of Responsibility. However, the two are quite different.
In Pipes and Filters, we divide a complex operation into manageable units. All the filters need to be executed to get the final result.
In Chain of Responsibility, the flow is divided into small handlers. Only (mostly) the relevant handler is executed for a given input.
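The difference can be sketched side by side. This is a simplified illustration with made-up handlers and filters, using java.util.function.Function in place of dedicated Filter and Handler types.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

class PipeVsChain {

  // Pipes and Filters: every filter runs, each on the previous filter's output.
  static String runPipeline(String input) {
    Function<String, String> trim = String::trim;
    Function<String, String> exclaim = s -> s + "!";
    return trim.andThen(exclaim).apply(input);
  }

  // Chain of Responsibility: handlers are tried in order, the first one that
  // accepts the input answers, and the remaining handlers are skipped.
  static String runChain(String input) {
    Function<String, Optional<String>> json = s ->
        s.endsWith(".json") ? Optional.of("json handler") : Optional.<String>empty();
    Function<String, Optional<String>> xml = s ->
        s.endsWith(".xml") ? Optional.of("xml handler") : Optional.<String>empty();
    List<Function<String, Optional<String>>> handlers = Arrays.asList(json, xml);

    for (Function<String, Optional<String>> handler : handlers) {
      Optional<String> result = handler.apply(input);
      if (result.isPresent()) {
        return result.get();
      }
    }
    return "unhandled";
  }
}
```

In runPipeline the result depends on both steps, while in runChain at most one handler contributes to the result.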

Advantages

  • Individual components ensure loose coupling.
  • Loose coupling enables independent development.
  • Modification in one filter does not affect others.
  • Re-usable components.
  • Highly unit testable.

Disadvantages

  • The flow should be designed carefully.
  • Can degrade performance, since data passes through each filter individually.
  • Mutating the input does not fit every situation.
