# Filters And Pipes Reloaded

## Abstract

The very interesting article FiltersSourcesAndSinks by DiegoNehab together with section 9.2 of PIL (http://www.lua.org/pil/9.2.html) inspired me to experiment with pipes and to think about notation issues and about what makes pipes different from what we are used to working with in imperative programming. In this article I will discuss what pipes have in common with chained function or method calls, and what makes them different. I will then present and continually evolve a set of functions for working with pipes in Lua. Finally I will use some metaprogramming to get a more pipe-like notation.

## The Tao of Data Transformation

Many applications of computers involve the transformation of data from one representation into another, possibly including various forms of mixing and matching with other data. Such transformations can naturally grow more or less complex.

Complex data transformations - as any other algorithm - are easier to understand if they can be decomposed into smaller pieces, each of which performs a simple and well defined task. Additional power results if such pieces can be built in a reusable way, forming a set of building blocks from which complex transformations can be easily assembled.

The simplest form of composition is probably applying a number of transformations in sequence, that is using the result of every transformation as input to the next one. Depending on the environment we are working in, the notations and inner workings differ:

If you are working with functions, you will probably write something like

```
result = transformation3( transformation2( transformation1( data ) ) )
```

This doesn't look very intuitive, since the transformations are written in the opposite order to that in which they are applied. (If they take additional parameters, the notation tends to get even worse.) The notation, however, doesn't change the essence of what's going on and can be improved without changing that. If you have a mechanism for function composition, you can write something similar to the following to have the transformations in the order they are applied:

```
result = chain( transformation1, transformation2, transformation3 )( data )
```

(Using more higher order functions you can even stuff additional parameters with their transformations.)
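A function such as `chain` is not part of Lua. The following is a minimal sketch of how it might look, assuming the functions are passed in a single table rather than as separate arguments; `increment` and `double` are made-up transformations used only for illustration:

```lua
-- Hypothetical helper: composes the functions in pFunctions so that they are
-- applied from left to right.
function chain( pFunctions )
    return function( pData )
        local lCurrent = pData
        for _, lFunction in ipairs( pFunctions ) do
            lCurrent = lFunction( lCurrent )
        end
        return lCurrent
    end
end

-- Two made-up transformations for illustration
function increment( pValue ) return pValue + 1 end
function double( pValue ) return pValue * 2 end

-- (3 + 1) * 2, converted to a string by tostring
result = chain{ increment, double, tostring }( 3 )
print( result )
```

Note that this is still nothing more than a sequence of nested function calls: every transformation receives the complete result of its predecessor.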

Further, if you get help from the compiler or some preprocessor that turns, for instance, the | operator into calling its right operand with the left one as parameter, you could achieve an even more intuitive notation, putting the data you start with at the beginning of the chain and possibly writing something like

```
result = data | transformation1 | transformation2 | transformation3
```

and, if you liked, with even more help you could even drop the familiar assignment notation and put the receiver for the result at the end of the chain to end up with a notation that looks much like a UNIX pipe, showing how data travels through a series of transformations like this

```
data > transformation1 > transformation2 > transformation3 > result
```

or something similar. Note though, that all this doesn't change what's happening under the hood: a simple sequence of wrapped function invocations!

In object oriented programming, including mechanisms like C# extension methods, you can have the sequenced notation we saw above for free using chained method calls:

```
result = data.transformation1().transformation2().transformation3()
```

The parentheses in this notation also make it fairly visible that the transformation "pipe" through which our data "flows" is simply a series of function calls. (By the way, the fact that in OO the possible transformations are bundled as methods with their input data types is no limitation whatsoever but only an implementation detail.)

Coming to the heart of our discussion, UNIX has brought to us a fundamentally different concept of applying a sequence of transformations to data: UNIX pipes. They come out of the box with an intuitive piping notation like the one we created above (without actually having pipes then!):

```
data > transformation1 | transformation2 | transformation3 > result
```

We will see shortly what constitutes the fundamental difference of pipes to function or method call chains, but let's note first that all three mechanisms, independent of notation, reflect the same principle: applying a sequence of transformations to some data.

That said, there is one aspect that fundamentally distinguishes pipes from function or method calls, and, as we saw above, it is neither the notation nor anything visible from it: pipes process data in chunks. This applies to the individual stages of the pipeline (called filters) as well as to the pipeline as a whole. Only the amount of data that is actually needed for producing the next output is held in memory or other temporary storage at any stage, and no intermediate results are assembled and stored between the stages whatsoever.

This property makes an important difference. Not only are pipes able to process unlimited amounts of data, they can even process infinite streams of data such as sensor measurements, yielding results as soon as they are available. This wouldn't be possible using function or method calls: an unlimited sequence of data cannot be collected before processing it, and there cannot be any result before the end of the data stream, meaning that there can never be any result.

This difference causes a fundamental difference in the interface by which the transformations are connected within pipes. While function or method calls communicate by parameters and return values, filters communicate by receiving and sending chunks of data. An important aspect of this interface is the inherently decoupled nature of input and output, meaning that not every input needs to yield output, and not every output needs to be caused by input. (We could say that a filter is a generalization of a flattening operation and the opposite of it, and of a mix between the two.)

The way pipes work is thus fundamentally different from the synchronous nature of function calls and makes it impossible to implement a pipe as a simple loop of read-transform-transform-transform-write operations like this:

```
-- Constructs a pipe connecting the given functions. The pipe will read data
-- by calling its first parameter, then transform it by invoking all given
-- functions in sequence, and output the result by calling its second parameter.
-- This will continue until input is exhausted.
-- THIS IS NOT A REAL PIPE!
function pipe( ... )
    local filters = arg
    return function( input, output )
        for data in input do
            local current = data
            for _, filter in ipairs( filters ) do
                current = filter( current )
            end
            output( current )
        end
    end
end

p = pipe( transformation1, transformation2, transformation3 )
-- io.lines() returns the generator that the loop above iterates over
p( io.lines(), print )
```

Such an implementation will work only for the simplest transformations, where every input is turned into exactly one output at every stage. This is not what filters do in the general case. (This doesn't mean, however, that pipes are capable of doing transformations that are impossible using function calls: If all data to be transformed can be collected and given to the transformation as a whole, it can just as well be processed using ordinary function calls - the difference will be mostly in memory footprint. There is, however, a difference for infinite data streams. On the other hand, the individual stages of a pipe very well can and commonly do use ordinary functions to accomplish their processing tasks. This is because they are free to collect as much data as necessary as input for a concrete function call.)

Diego implements filters using a function call interface and addresses the impedance mismatch between the decoupled nature of input and output of filters vs. the synchronous nature of function calls by using a special protocol that allows for non-output and non-input "filter" calls. While this complicates not only the scheduler, but more importantly the filter implementations as well, it works very well for the sort of processing he targets with his framework.

More straightforward implementations of the filter algorithms become possible when filters are implemented as separate processes, each having its own control loop, reading input and writing output by calling system functions. This is the way filters are implemented in UNIX, and in Lua we can use coroutines to achieve the same effect. All scheduling and state-keeping is left to the runtime, so filter writers can concentrate on their algorithm only, using a natural and easy notation. Let's see how we can implement pipes and filters in Lua.
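To make the idea concrete before the real implementation follows, here is a small self-contained sketch. The names `sum_pairs` and `count_to` are made up for this illustration and are not part of the functions developed later. The filter has its own control loop and reads two inputs for every output it writes; it runs inside a coroutine, with `coroutine.yield` serving as its output function:

```lua
-- A filter with its own control loop: it sums adjacent pairs of numbers, so
-- it calls its input twice for every call to its output.
function sum_pairs( pInput, pOutput )
    while true do
        local lFirst = pInput()
        if lFirst == nil then break end
        local lSecond = pInput() or 0   -- a missing partner counts as 0
        pOutput( lFirst + lSecond )
    end
end

-- A plain closure serving as input: returns 1, 2, ..., pLimit and then nil.
function count_to( pLimit )
    local lValue = 0
    return function()
        if lValue < pLimit then
            lValue = lValue + 1
            return lValue
        end
    end
end

-- Run the filter as a coroutine: every output call suspends it and hands
-- the value to whoever resumed it.
local lInput = count_to( 5 )
next_sum = coroutine.wrap( function()
    sum_pairs( lInput, coroutine.yield )
end )

sums = {}
for lSum in next_sum do
    table.insert( sums, lSum )
end
print( sums[ 1 ], sums[ 2 ], sums[ 3 ] )
```

The filter itself contains no bookkeeping about where it was interrupted; the coroutine keeps that state for it.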

## Exploring Filters and Pipes

Before we begin, a word about the naming convention I use, so you hopefully won't get confused by the variable names in the code below: A prefix of "p" denotes a function parameter, a prefix of "l" a local variable, a prefix of "s" a static (module global) variable, and a prefix of "g" a global variable (not used here). Function names have no prefix (which definitely doesn't mean that a function couldn't be held in a variable, which then must have a prefix). Your preferences may vary, but I get a couple of important advantages from this naming convention:

• The scope of a variable is obvious from reading its name. This helps to understand the code.
• Calling a function held in a variable looks fundamentally different from a normal function call. This too helps understanding the code.
• There can't be any collisions with reserved words, library functions and other stuff like even my own functions. (Don't ever, for instance, name a variable or parameter `error` or `type`, but there can't be any problem if you name them `lError` or `pType` and so on.)
• I get local names for transformed function parameters for free. Why should I invent another name for the local variable different from the one I use for the parameter, when both denote the same thing? The following code, for instance, converts a boolean parameter that defaults to true to its real value (similar transformations of function parameters to local representations happen quite often):

```
local lCondition = (pCondition == nil) or pCondition
```

The code I will present below is written for Lua 5.0.

Let's now first define the big picture and then see how we can implement it and see what we can do with it.

### The Big Picture

First, we want to be able to write filters as functions taking two functions as parameters, one for input and one for output. Upon execution they will run from beginning to end, transforming all input into output. Input will be obtained by successively calling the input function, and any output will be written by calling the output function. And, instead of only strings, we want to be able to pass multiple values of any type between filters. Thus, a filter implementation will roughly follow one of the following templates:

```
function my_filter( pInput, pOutput )
    for ... in pInput do
        ...
        pOutput( ... )
        ...
    end
end
```

or

```
function my_filter( pInput, pOutput )
    while ... do
        ...
        ... = pInput()
        ...
        pOutput( ... )
        ...
    end
end
```

For now, the filters don't return any value.

Second, we want to be able to connect filters implemented this way into chains (pipes) that feed the output of one filter into the input of the next. The input of the first filter and the output of the last filter will be identical to the input and output of the pipe. Thus, a pipe will itself be a filter and as such can again be combined with other filters or pipes. The code we want to write will look similar to the following, where transformation1, transformation2, transformation3 are filters implemented as shown above:

```
p = pipe( transformation1, transformation2, transformation3 )
p( io.lines(), print )
```

This example reads all lines from standard input, transforms this stream by piping it through the three filters in sequence, and prints the results to the console.

As you can see, in contrast to Diego's solution, we don't need any pumps for this to happen! This is interesting, and it results from the fact that our filters in contrast to Diego's ones are active entities, pumping their input to output themselves. (This is, by the way, the reason why we need coroutines to have multiple filters work together.)

### Implementing Pipes

As we said, we want to be able to pass multiple values of any type between filters. In addition, `nil` values should be allowed too; for this we define that the end of the data stream is reached when the first of the values transferred by the input/output handshake is `nil`.
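The convention can be illustrated with a small sketch. The generator `pairs_with_gaps` is made up for this purpose: it delivers two values per call, the second of which may be `nil`, and only a `nil` in first position ends the stream:

```lua
-- Hypothetical generator delivering (key, value) pairs. The second value of
-- a pair may be nil; only a nil in first position ends the stream.
function pairs_with_gaps()
    local lKeys = { "a", "b", "c" }
    local lValues = { a = 1, c = 3 }   -- deliberately no entry for "b"
    local lPosition = 0
    return function()
        lPosition = lPosition + 1
        local lKey = lKeys[ lPosition ]
        if lKey == nil then
            return nil   -- end of stream
        end
        return lKey, lValues[ lKey ]
    end
end

seen = {}
local lInput = pairs_with_gaps()
while true do
    local lKey, lValue = lInput()
    if lKey == nil then break end   -- test only the first value
    table.insert( seen, lKey .. "=" .. tostring( lValue ) )
end
print( table.concat( seen, " " ) )
```

The pair for "b" passes through with a `nil` value without ending the loop.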

To implement this in Lua 5.0 we need a simple utility function doing the opposite of Lua's `unpack()`, since simply wrapping a call in braces, as is usually done to collect its return values, won't do the job when there are `nil`s:

```
-- Inverse of Lua unpack(). Returns all its arguments packed into an array.
function pack( ... )
    return arg
end
```

Now, let's implement pipes. The following function constructs a pipe. It takes a number of filters and returns a new filter representing the chain. The returned function, upon invocation, wraps all filters but the last one into coroutines and creates a function for each of them that resumes the coroutine, passing the filter's input as input and coroutine.yield as output, and returns the yielded value(s) (that is, the filter's output). This function, a generator, acts as the input for the next filter in the chain; the first filter gets the pipe's input as input. After building the chain, the last filter is invoked with its input and the output of the pipe, and what it returns is returned from the pipe (tail call). We don't need the returned value(s) for now, but we will see later what this gives us.

```
-- Creates a new filter acting as the concatenation of the given filters. A
-- filter is a function with two parameters, input and output, that obtains
-- its input by calling input and writes output by calling output. Optionally,
-- return values may be produced, the return values of the last filter being
-- returned by the pipe.
function pipe( ... )
    local lFilters = arg
    local lFilterCount = table.getn( lFilters )
    return function( pInput, pOutput )
        local lCurrentInput = pInput
        for lFilterPosition = 1, lFilterCount - 1 do
            local lCurrentFilter = coroutine.create( lFilters[ lFilterPosition ] )
            local lPreviousInput = lCurrentInput
            lCurrentInput = function()
                local lResult = pack( coroutine.resume( lCurrentFilter, lPreviousInput, coroutine.yield ) )
                local lSuccess = table.remove( lResult, 1 )
                if lSuccess then
                    if coroutine.status( lCurrentFilter ) == "suspended" then
                        return unpack( lResult )
                    end
                else
                    error( lResult[ 1 ] )
                end
            end
        end
        return lFilters[ lFilterCount ]( lCurrentInput, pOutput )
    end
end
```

Some additional notes about the code above: First, when resuming the coroutines, we pass the filter's input and output. Strictly speaking, this is necessary only for the first resume, which starts the execution of the coroutine. After that, the two arguments will be returned to the filter by every output call. Since they will be ignored by the filter, this doesn't hurt and we need not implement special handling. Second, since filters may return some value (even if we hadn't allowed it by contract), we have to check after every resume whether the coroutine has yielded or has ended, to decide whether the result we got from the resume has to be output.
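Both notes rest on standard coroutine behaviour, which the following standalone sketch demonstrates (the variable names are chosen for this illustration only): the arguments of the first resume become the parameters of the coroutine body, the arguments of later resumes become the return values of `coroutine.yield`, and `coroutine.status` tells a yield from a return:

```lua
co = coroutine.create( function( pFirst, pSecond )
    -- pFirst and pSecond receive the arguments of the first resume.
    local lThird, lFourth = coroutine.yield( pFirst + pSecond )
    -- lThird and lFourth receive the arguments of the second resume.
    return lThird + lFourth
end )

ok1, value1 = coroutine.resume( co, 1, 2 )
status1 = coroutine.status( co )   -- the coroutine has yielded
ok2, value2 = coroutine.resume( co, 3, 4 )
status2 = coroutine.status( co )   -- the coroutine has returned

print( ok1, value1, status1 )
print( ok2, value2, status2 )
```

In both cases `resume` reports success and delivers a value; only the status differs, which is why `pipe()` checks for "suspended" before passing a result on.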

Note also that the pipe function doesn't build the filter chain itself; this is deferred until the filter is actually run. Otherwise it would not be possible to execute a pipe more than once, and it wouldn't be a filter in the strict sense.

This small function is all we need to have pipes in Lua! Let's see what we can do with this.

### Sources, Sinks and Sealed Pipes

First, let's think about filters and where our data will come from and where it goes. As it turns out, our filter interface allows for two special types of filters: A filter that doesn't call its input parameter produces its output out of itself - it's a source. Conversely, a filter that doesn't call its output parameter only consumes input - it's a sink. Obviously sources and sinks can occur only at the beginning and the end of a pipe, respectively. A pipe beginning with a source is itself a source, one ending with a sink is itself a sink. A pipe beginning with a source and ending with a sink is a special case: it doesn't do any input or output, pumping upon invocation all data from the source to the sink in one rush. We'll look at those pipes shortly - they have a number of interesting properties. First let's examine where sources and sinks get their data from and where they put it.

What could the input and output parameters in case of a source and sink, respectively, mean? They are not needed here for obtaining input or writing output, but they can be used for describing the (real) data source or destination to the filter (source or sink) as, for instance, the name of a file to read from or write to, or a string to pull data from by parsing it. The pipe containing these sources or sinks will have these parameters as its own input and output parameters, passing them to the source and sink unchanged.

Now, imagine for a moment that our sink is something that collects the data it receives into, for instance, a table, or that computes some aggregated value from it like a word count or average. This is where the return value of filters comes into play: Sinks can, instead of (or in addition to) using their output parameter, return a value (or possibly multiple values). The pipe (which is in this case also a sink) will return this result without modification.
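As a sketch of both ideas, the made-up sink `into_table` below treats its output parameter as a description of the destination (a table to append to) and additionally returns an aggregated value (the number of values consumed). For simplicity it is fed directly from a string-matching generator instead of a pipe; the first line merely picks whichever of `string.gmatch` and `string.gfind` the Lua version at hand provides:

```lua
local gmatch = string.gmatch or string.gfind   -- string.gfind in Lua 5.0

-- Hypothetical sink: appends every input value to the table given as output
-- parameter and returns the number of values it has consumed.
function into_table( pInput, pTable )
    local lCount = 0
    for lValue in pInput do
        table.insert( pTable, lValue )
        lCount = lCount + 1
    end
    return lCount
end

target = {}
count = into_table( gmatch( "to be or not", "%a+" ), target )
print( count, table.concat( target, "," ) )
```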

Now, let's look again at pipes which are at the same time source and sink. As stated, they don't perform any input or output calls. We can call such pipes sealed because they keep their data flow inside and instead expose a function call interface: They communicate input and output by using parameter and return values. That is, they work effectively as ordinary functions with one or two parameters and possibly returning a result! Like functions, they can be freely invoked, upon every invocation "pumping" the data that was given them in one piece in the input parameter through itself and returning the result again in one piece (either as return value or putting it where specified by the output parameter). Note that there is also no state kept between successive data like in normal filter operation: Every input gets its own fresh pipe instance that "dies" when this data has been processed and the result returned. So, sealed pipes effectively turn pipes into functions!

Let's now see how using pipes looks in practice.

## Using Pipes

For experimentation we will define a simple data source yielding a sequence of integers starting with 1 and continuing until the limit it has been given as input parameter is reached:

```
-- Produce a sequence of integers
function seq( pLimit, pOutput )
    for lValue = 1, pLimit do
        pOutput( lValue )
    end
end
```

We can test this filter already. As it turns out (at least as long as `print()` gives us enough information about our values to be useful), we can invoke any filter or pipe for debugging purposes with its input and `print` as output to get its output printed to the console! Nifty, eh? Let's print the output of our number sequence source to the console:

```
> seq( 4, print )
1
2
3
4
```

Now we will define a data sink computing the average of all values read from input:

```
-- Compute the average from a sequence of numbers
function avg( pInput )
    local lSum, lCount = 0, 0
    for lValue in pInput do
        lSum = lSum + lValue
        lCount = lCount + 1
    end
    return lSum / lCount
end
```

Let's test it by feeding it the data produced by our data source. We will build a pipe for this:

```
> average_of_sequence = pipe( seq, avg )
> return average_of_sequence( 4 )
2.5
```

Now, let's do some filtering. The following filter reads numbers from input and writes their squares to output:

```
-- Produce the squares of the input values
function square( pInput, pOutput )
    for lValue in pInput do
        pOutput( lValue * lValue )
    end
end
```

Using this filter, we'll build a pipe delivering a sequence of the first n squares, then append another square filter to it to get another pipe yielding a sequence of powers of 4, and finally we build a pipe from scratch to compute the average of the first n powers of 4:

```
> seq_squares = pipe( seq, square )
> seq_squares( 4, print )
1
4
9
16
> seq_squares_squared = pipe( seq_squares, square )
> seq_squares_squared( 4, print )
1
16
81
256
> average_of_seq_of_powers = pipe( seq, square, square, avg )
> return average_of_seq_of_powers( 4 )
88.5
```

This apparently works very well, but our square filter indeed isn't that interesting: It doesn't make use of the decoupled input and output pipes provide. The transformations above could have been very well implemented using the simplified pipe implementation from the introduction. Let's thus write two other more interesting filters. The first one will output only the odd numbers read from input, and the second will collect its input into pairs:

```
-- Let only odd numbers pass through
function odd( pInput, pOutput )
    for lValue in pInput do
        if math.mod( lValue, 2 ) == 1 then
            pOutput( lValue )
        end
    end
end

-- Collect input into pairs
function pair( pInput, pOutput )
    for lFirstValue in pInput do
        local lSecondValue = pInput()
        pOutput( lFirstValue, lSecondValue )
        if lSecondValue == nil then
            break
        end
    end
end
```

If we connect the filters with our number sequence source we see that they work as expected:

```
> seq_odd = pipe( seq, odd )
> seq_odd( 4, print )
1
3
> seq_pairs = pipe( seq, pair )
> seq_pairs( 4, print )
1       2
3       4
```

We won't build an example for a filter now that calls output more often than input since we'll see this in action when we discuss flattening filters later on. For now let's have a look at how we can interface our pipes with code we might already have or intend to write.

## Interfacing Pipes with Existing Code

### Creating Sources and Sinks

As you might have noted, filters and pipes take a generator as their first (input) parameter. A number of functions is available that return generators like `io.lines()` or `string.gfind()`, which we can directly use as input for our pipes. To process, for instance, all numbers contained in the string `data` with the pipe `my_pipe` we could write:

```
> my_pipe( string.gfind( data, "%d+" ), print )
```

It would be nice, however, if we could make a data source from `string.gfind`, so we could put this inside our pipe at its beginning, and instead write:

```
> my_pipe( data, print )
```

The following utility function helps us with this, constructing a data source from a function returning a generator. Upon execution, the data source calls the given function and writes all data delivered by the returned generator to its output:

```
-- Wraps a function returning a generator as data source
function source( pFunction )
    return function( pInput, pOutput )
        local lInput = pFunction( pInput )
        while true do
            local lData = pack( lInput() )
            if lData[ 1 ] == nil then
                break
            end
            pOutput( unpack( lData ) )
        end
    end
end
```

In passing, for our convenience we define the following function. It takes a function with several parameters together with some values, and wraps them into a function of only one argument that calls the given function with its argument followed by these values. Since most functions of the Lua library take the value to act on (the "this" value) as the first argument, this function gives us in Lua what partial function application gives in functional programming:

```
-- Binds the given values as trailing arguments of a function, returning a
-- function that takes only the first argument
function curry( pFunction, ... )
    return function( pParam )
        return pFunction( pParam, unpack( arg ) )
    end
end
```

Now we can build a data source that uses `string.gfind` with a pattern "%d+" to find all numbers contained in the string given as input parameter and write them to its output (as strings):

```
> parse_numbers = source( curry( string.gfind, "%d+" ) )
> parse_numbers( "123 5 78 abc 12", print )
123
5
78
12
```

We'll work with this data source in the next section, after we have introduced another interface function.

So, after we defined a function that builds a data source from a function returning a generator, how about building a data sink from something we already have? What would be the opposite operation?

As it turns out, there's nothing we have to build: The opposite operation to a function taking a value and returning a generator would be a function taking a generator and returning a value. But that's exactly the description of our filter interface for the case of sinks: Any function that takes a generator (and optionally a second argument) and (optionally) returns some value is already a sink - there is no need to build one!

### Passing Values through Functions

The next thing we would like to have is building filters from existing functions. The pattern is easy: Read input, call the function and write the result to output. The following function accomplishes this:

```
-- Wraps a function as filter
function pass( pFunction )
    return function( pInput, pOutput )
        while true do
            local lData = pack( pInput() )
            if lData[ 1 ] == nil then
                break
            end
            pOutput( pFunction( unpack( lData ) ) )
        end
    end
end
```

With this function at hand we can create a filter that uses `tonumber()` to convert the strings passed from our number parser to numbers, so we can use them to compute their average. We build the following pipe:

```
> avg_from_string = pipe( parse_numbers, pass( tonumber ), avg )
> return avg_from_string( "123 5 78 abc 12" )
54.5
```

Note that we can use `pass()` with a sealed pipe to use it as a subpipe and pass data through it.

The following function is a variant of `pass` that doesn't do any output, effectively consuming all input without returning anything:

```
-- Wraps a function as data sink, consuming any input without any output or
-- return
function consume( pFunction )
    return function( pInput )
        while true do
            local lData = pack( pInput() )
            if lData[ 1 ] == nil then
                break
            end
            pFunction( unpack( lData ) )
        end
    end
end
```

### Reading from and Writing to Pipes

Let's now see if we can interface pipes with custom program logic that processes data in loops. There are two ends of the pipe to interface with: processing the output flowing out of a pipe, and feeding values into a pipe. (Actively feeding a filter's input and reading its output in the same thread probably wouldn't be of much use because of the buffering and synchronization issue it causes. We won't go into this, therefore.)

To process the data coming out of a pipe by a loop we need to turn the pipe into a generator. The following function accomplishes this, wrapping a pipe (or any filter) into a reusable function with one argument that returns a generator yielding the output of the pipe when it's given the argument as input (This function exposes the same signature pattern as is used by `string.gfind()`, `io.lines()` and the like):

```
-- Wraps a filter into a function with one argument returning a generator that
-- yields all values produced by the filter from the input given as argument.
function drain( pFilter )
    return function( pInput )
        local lGenerator = coroutine.create( pFilter )
        return function()
            local lResult = pack( coroutine.resume( lGenerator, pInput, coroutine.yield ) )
            local lSuccess = table.remove( lResult, 1 )
            if lSuccess then
                if coroutine.status( lGenerator ) == "suspended" then
                    return unpack( lResult )
                end
            else
                error( lResult[ 1 ] )
            end
        end
    end
end
```

We can now write code of the following sort:

```
for ... in drain( pipe( ... ) )( ... ) do
    ...
end
```

We stated initially that the pipe to drain has to be a data source, but in fact, the function above makes no assumptions about that and thus does not require the pipe to be a data source. It depends on what kind of parameter we intend to give to the returned function: If we plan to give it a generator, the pipe need not be a data source.
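For comparison, here is a shorter variant built on `coroutine.wrap`, which does the resuming and error propagation itself. This is only a sketch under the assumption that the filter's return values are not needed (they are discarded here, just as `drain()` ignores them); the name `drain_wrapped` is made up, and `seq` is repeated from above to keep the example self-contained:

```lua
-- Simplified variant of drain(), assuming the filter's return values are not
-- needed: coroutine.wrap does the resuming and error propagation for us.
function drain_wrapped( pFilter )
    return function( pInput )
        return coroutine.wrap( function()
            pFilter( pInput, coroutine.yield )
        end )
    end
end

-- The number sequence source from above
function seq( pLimit, pOutput )
    for lValue = 1, pLimit do
        pOutput( lValue )
    end
end

total = 0
for lValue in drain_wrapped( seq )( 4 ) do
    total = total + lValue
end
print( total )
```

One difference to keep in mind: the generator returned here must not be called again after it has signalled the end of the stream, because resuming a finished coroutine raises an error, whereas the generator returned by `drain()` reports that error through its own `error()` call.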

Now, how about the opposite binding, that is, turning a pipe that is a data sink into a function that we can call repeatedly to feed data into the input of the pipe? We can implement a function returning such a feeder as follows (for simplicity we don't return a reusable function here as in the case of `drain()` but bind the output parameter to the pipe in one step):

```
-- Wraps a pipe that must be a data sink into a function that feeds its
-- arguments into the pipe's input.
-- THIS FUNCTION DOES NOT WORK WITH THE PIPES WE USE HERE!
function feed( pPipe, pOutput )
    local lOutput = coroutine.create( pPipe )
    local lPush = function( ... )
        local lSuccess, lResult = coroutine.resume( lOutput, unpack( arg ) )
        if not lSuccess then
            error( lResult )
        end
        return coroutine.status( lOutput ) == "suspended"
    end
    lPush( function() return coroutine.yield() end, pOutput )
    return lPush
end
```

The function creates a coroutine from the pipe passing it `coroutine.yield` as input, then executes it until it yields (requesting its first input), and returns a wrapper that resumes the coroutine on every invocation passing its arguments to it. The wrapper returns `true` if the coroutine yields again, that is if it will accept more input. Let's use it:

```
> feeder = feed( square, print )
stdin:6: attempt to yield across metamethod/C-call boundary
stack traceback:
[C]: in function `error'
stdin:6: in function `lPush'
stdin:10: in function `feed'
stdin:1: in main chunk
[C]: ?
```

Oops! We stumbled upon a limitation of Lua: Lua doesn't allow the generator called from the header of a `for` loop to yield! If we rewrite our `square` filter, we can work around this limitation:

```
> function square2( pInput, pOutput )
>>     while true do
>>         local lValue = pInput()
>>         if lValue == nil then break end
>>         pOutput( lValue * lValue )
>>     end
>> end
> feeder = feed( square2, print )
> return feeder( 2 )
4
true
```

There is, however, another problem: The `feed` function will only work for simple filters, it won't work for pipes. The reason is that in our pipes implementation the coroutines executing the filters are "stacked" one above the other in a resume/yield chain, and the filter that receives `coroutine.yield` as input has already been resumed by the filter following it, so upon yield it will return there and not to the code calling the pipe at the top level. We won't get the processor back, having the pipe waiting for input, as we intended.

We can improve our pipes implementation so that it doesn't have this problem: For this, we wrap all contained filters into coroutines and have them yield on input and output, and control them by a main loop that calls input and output itself:

```
-- A symmetric pipe implementation. Pipes of this sort can be resumed from
-- both ends but prevent the constituent filters from using 'for' loops for
-- reading input. Also, sources and sinks cannot use the input and output
-- parameters of the pipe.
function symmetric_pipe( ... )
    local lFilters = arg
    local lFilterCount = table.getn( lFilters )
    return function( pInput, pOutput )
        local lHandover
        local lInput = function()
            lHandover = nil
            coroutine.yield()
            return unpack( lHandover )
        end
        local lOutput = function( ... )
            lHandover = arg
            coroutine.yield()
        end
        local lProcessors = {}
        for _, lFilter in ipairs( lFilters ) do
            table.insert( lProcessors, coroutine.create( lFilter ) )
        end
        local lCurrentProcessor = lFilterCount
        while lCurrentProcessor <= lFilterCount do
            if not lProcessors[ lCurrentProcessor ] then
                error( "Requesting input from closed pipe" )
            end
            local lSuccess, lResult = coroutine.resume( lProcessors[ lCurrentProcessor ], lInput, lOutput )
            if not lSuccess then
                error( lResult )
            end
            if coroutine.status( lProcessors[ lCurrentProcessor ] ) == "suspended" then
                if lHandover == nil then
                    lCurrentProcessor = lCurrentProcessor - 1
                else
                    lCurrentProcessor = lCurrentProcessor + 1
                end
                if lCurrentProcessor == 0 then
                    lHandover = pack( pInput() )
                    lCurrentProcessor = 1
                elseif lCurrentProcessor > lFilterCount then
                    pOutput( unpack( lHandover ) )
                    lCurrentProcessor = lFilterCount
                end
            else
                lHandover = {}
                lProcessors[ lCurrentProcessor ] = nil
                lCurrentProcessor = lCurrentProcessor + 1
            end
        end
    end
end
```

This pipes implementation allows pipes to be used with `feed()` too. However, these pipes have another problem: since all filters now yield on input, they suffer from the Lua limitation we saw above, which effectively prohibits us from using `for` loops in any filter. This is too severe a limitation, so we abandon this otherwise elegant implementation and accept that there is no way to feed data into pipes from within a loop. If we want to write code that feeds data into a pipe, we have to either wrap it into a generator using `coroutine.yield` and `coroutine.wrap`, or write it as a data source, put it in front of the pipe, and then execute the pipe. Our pipes must always be in the controlling position, and there is no way to have our loop control execution!

There is another issue with the `symmetric_pipe()` implementation above: data sources and sinks could not use the input and output parameters of the pipe as their own parameters, because they never get to see them. We would lose quite a bit of flexibility this way.
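The workaround mentioned above, wrapping the producing loop into a generator, might look as follows. This is only a minimal sketch, assuming Lua 5.1 or later; `numbers_from_loop` and `doubler` are hypothetical names that are not part of the functions developed in this article.

```lua
-- Sketch (assumption: Lua 5.1+). The loop that produces the data is wrapped
-- into a generator, so the filter stays in control and pulls values from it.
function numbers_from_loop( pCount )
    return coroutine.wrap( function()
        for lIndex = 1, pCount do
            coroutine.yield( lIndex )   -- hand one value to whoever asked for input
        end
    end )
end

-- Hypothetical filter using the article's interface: reads pInput, writes pOutput.
function doubler( pInput, pOutput )
    for lValue in pInput do
        pOutput( 2 * lValue )
    end
end

doubled = {}
doubler( numbers_from_loop( 3 ), function( pValue ) table.insert( doubled, pValue ) end )
print( table.concat( doubled, " " ) )   --> 2 4 6
```

The producing loop never calls the filter; it only yields, and the filter decides when the next value is requested.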

## Reading and Writing Files

Using our `source()` function from above, we can simply wrap `io.lines` to get a data source that reads a file and writes its contents line by line to its output. There is, alas, no function in the standard library that we could use as a file sink, so we have to write our own using `io.write`. For symmetry, we also implement a file source using `io.read`. We implement it in such a way that we can use it either as a source (taking two parameters) or as a function returning a generator (when invoked with one parameter). This allows us to use it at our discretion for building pipes as well as for invoking them:

```-- Returns a function that can be used as data source yielding the contents of
-- the file named in its input parameter, processed by the given formats, or
-- that can be called with a filename alone to return a generator yielding this
-- data. If no formats are given, line-by-line is assumed.
function filereader( ... )
    local lFormats = arg
    return function( pInput, pOutput )
        local lInput = io.open( pInput )
        local lOutput = pOutput or coroutine.yield
        local lFeeder = function()
            while true do
                local lData = pack( lInput:read( unpack( lFormats ) ) )
                if lData[ 1 ] ~= nil then
                    lOutput( unpack( lData ) )
                end
                if lData[ table.getn( lData ) ] == nil then
                    break
                end
            end
            lInput:close()
        end
        if pOutput then
            lFeeder()
        else
            return coroutine.wrap( lFeeder )
        end
    end
end

-- Returns a data sink that writes its input, optionally formatted by the
-- given format string using string.format(), to the file named in its output
-- parameter. If no format is given, the processing is analogous to print().
function filewriter( pFormat )
    return function( pInput, pOutput )
        local lOutput = io.open( pOutput, "w" )
        while true do
            local lData = pack( pInput() )
            if lData[ 1 ] == nil then
                break
            end
            if pFormat then
                lOutput:write( string.format( pFormat, unpack( lData ) ) )
            else
                for lIndex = 1, table.getn( lData ) do
                    if lIndex > 1 then
                        lOutput:write( "\t" )
                    end
                    lOutput:write( tostring( lData[ lIndex ] ) )
                end
                lOutput:write( "\n" )
            end
        end
        lOutput:close()
    end
end
```

Let's use both to copy a file line by line:

```> copy = pipe( filereader(), filewriter() )
> copy( "data.in", "data.out" )
```

If we stuff various filters between the reader and writer, we can do any processing we want on the lines traveling the pipe. A simple scenario is a utility like grep. The following function builds a filter we can use for this:

```-- Returns a filter that passes on only the input matching the given Lua pattern
function grep( pPattern )
    return function( pInput, pOutput )
        for lData in pInput do
            if string.find( lData, pPattern ) then
                pOutput( lData )
            end
        end
    end
end
```

We won't test this right now but instead compose a slightly more interesting example. You might have noticed that the function above doesn't mention lines in any way. Indeed, the filter is independent from the input being lines or not. Let's write another filter that collects lines into paragraphs and stuff the two together to do the grep on paragraphs instead of lines (like `grep -p`).

```-- Filter collecting lines into paragraphs. The trailing newline is preserved.
function paragraphize( pInput, pOutput )
    local lBuffer = {}
    for lLine in pInput do
        table.insert( lBuffer, lLine )
        if lLine == "" then
            pOutput( table.concat( lBuffer, "\n" ) )
            lBuffer = {}
        end
    end
    if next( lBuffer ) then
        pOutput( table.concat( lBuffer, "\n" ) )
    end
end
```

```> filter_foobar_paragraphs = pipe( filereader(), paragraphize, grep( "foobar" ), filewriter() )
> filter_foobar_paragraphs( "data.in", "data.out" )
```

If we liked, we could have omitted the file writer, giving `print` instead of an output file name as second parameter to the pipe to have the results printed to the console.
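The paragraph grep can also be tried without any files. The following is a minimal sketch, assuming Lua 5.1 or later: `mini_pipe` is a simplified stand-in for `pipe()` that handles single values only, and `table_lines` is a hypothetical source that takes its lines from a table. `grep` and `paragraphize` are repeated from above so that the block runs on its own.

```lua
-- Sketch (assumptions: Lua 5.1+; mini_pipe is a simplified stand-in for the
-- article's pipe(), passing single values only).
function mini_pipe( ... )
    local lFilters = { ... }
    return function( pInput, pOutput )
        local lSource = pInput
        -- Every filter but the last runs in its own coroutine and yields its output.
        for lIndex = 1, #lFilters - 1 do
            local lFilter, lFilterInput = lFilters[ lIndex ], lSource
            lSource = coroutine.wrap( function() lFilter( lFilterInput, coroutine.yield ) end )
        end
        return lFilters[ #lFilters ]( lSource, pOutput )
    end
end

-- Hypothetical source: its input parameter is a table of lines.
function table_lines( pInput, pOutput )
    for _, lLine in ipairs( pInput ) do pOutput( lLine ) end
end

function grep( pPattern )
    return function( pInput, pOutput )
        for lData in pInput do
            if string.find( lData, pPattern ) then pOutput( lData ) end
        end
    end
end

function paragraphize( pInput, pOutput )
    local lBuffer = {}
    for lLine in pInput do
        table.insert( lBuffer, lLine )
        if lLine == "" then
            pOutput( table.concat( lBuffer, "\n" ) )
            lBuffer = {}
        end
    end
    if next( lBuffer ) then pOutput( table.concat( lBuffer, "\n" ) ) end
end

found = {}
grep_paragraphs = mini_pipe( table_lines, paragraphize, grep( "foobar" ) )
grep_paragraphs( { "one", "two foobar", "", "three", "", "foobar four" },
                 function( pParagraph ) table.insert( found, pParagraph ) end )
print( #found )   --> 2
```

Here a collecting function takes the place of `print` as the output parameter, so the matching paragraphs end up in the table `found`.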

An especially interesting kind of file reader is one that reads Lua data files of the sort described in section 10.1 of PIL (http://www.lua.org/pil/10.1.html): we want it to write the entries read from the file to its output, one after the other. The following function works - similarly to the filereader - as a data source or as a function returning a generator. It executes the Lua file in an empty sandbox and uses some metaprogramming to supply the executing chunk with the function(s) it calls to submit a record (`entry` in the PIL example). This is done by defining the `__index` metamethod of the sandbox so that it provides the chunk with a function that outputs or yields the record together with its tag name (`entry` in our case). Writing the tag name to output as well allows input files to carry more than one kind of data (entity type). For instance, if we wanted to keep the contents of an entire (sufficiently small) relational database in one data file, we could use the table names as tag names and write the contents of all tables to one file.

```-- Invocation with a single argument returns a generator yielding the records
-- from the named Lua file. When invoked with two arguments this function acts
-- as a data source yielding this data on output.
function luareader( pInput, pOutput )
    if pInput == nil then
        error( "bad argument #1 to `loadfile' (string expected, got nil)" )
    end
    local lInput, lError = loadfile( pInput )
    if not lInput then
        error( lError )
    end
    local lOutput = pOutput or coroutine.yield
    local lSandbox = {}
    setmetatable( lSandbox, {
        __index = function( pSandbox, pTagname )
            local lWorker = function( pRecord )
                lOutput( pRecord, pTagname )
            end
            pSandbox[ pTagname ] = lWorker
            return lWorker
        end
    } )
    setfenv( lInput, lSandbox )
    if pOutput then
        lInput()
    else
        return coroutine.wrap( lInput )
    end
end
```

If we test this with the data from http://www.lua.org/pil/10.1.html (only one record) we get:

```> reader = luareader( "data.lua" )
> return reader()
table: 2001af18 entry
> return reader()
>
```
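The `__index` trick can be explored without a data file by loading the chunk from a string. The following is a minimal sketch under stated assumptions: it is meant to run on Lua 5.1 through 5.4 (using `setfenv` where it exists and the environment argument of `load` otherwise), and `load_in_sandbox`, `records_of`, and the sample records are hypothetical and not part of `luareader`.

```lua
-- Sketch (assumption: Lua 5.1 through 5.4). Loads a chunk from a string so
-- that it sees pSandbox as its global environment.
local function load_in_sandbox( pCode, pSandbox )
    if setfenv then                                          -- Lua 5.1 / LuaJIT
        return setfenv( assert( loadstring( pCode ) ), pSandbox )
    end
    return assert( load( pCode, "data", "t", pSandbox ) )    -- Lua 5.2+
end

-- Hypothetical in-memory variant of luareader: returns a generator yielding
-- every record together with the name of the function it was submitted to.
function records_of( pCode )
    local lSandbox = setmetatable( {}, {
        __index = function( pSandbox, pTagname )
            local lWorker = function( pRecord )
                coroutine.yield( pRecord, pTagname )
            end
            pSandbox[ pTagname ] = lWorker   -- cache: __index fires once per tag
            return lWorker
        end
    } )
    return coroutine.wrap( load_in_sandbox( pCode, lSandbox ) )
end

-- Made-up sample data with two different tag names.
next_record = records_of( [[
    entry{ name = "first" }
    book{ title = "second" }
]] )
record, tag = next_record()
print( tag .. ": " .. record.name )    --> entry: first
record, tag = next_record()
print( tag .. ": " .. record.title )   --> book: second
```

Because any unknown global name is turned into a submitting function, the chunk needs no declarations, and a consumer can dispatch on the tag name that accompanies each record.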

Let's now examine some more complex transformations possible with pipes.

## Flattening and Folding Filters

You might have asked yourself already how we would feed data into a pipe from multiple input sources so that the input to the pipe is the concatenation of the output of those sources.

If you think about this, you will recognize that the core of this operation is not restricted to sources but can happen at any stage of a pipeline: what we need is a filter that produces a sequence from every input item and writes the concatenation of all those sequences to output. This is essentially a flattening operation. With this generalization available as a filter, we no longer need to build something special like a multisource: to concatenate several inputs we only need to produce a sequence of those inputs, or of values describing them, and apply a filter that unfolds every element into its contained data. The best way to do this is to call a function that takes the element as parameter and returns a generator yielding the data resulting from unfolding it.

The following function takes a function returning a generator (such as `io.lines`) and builds a filter from it that calls this function for every input, feeding the output of the generator into its output:

```-- Returns a filter that calls the given function for every input and feeds
-- the output of the returned generator into its output.
function expand( pFunction )
    return function( pInput, pOutput, ... )
        while true do
            local lInput = pack( pInput() )
            if lInput[ 1 ] == nil then
                break
            end
            local lGenerator = pFunction( unpack( lInput ) )
            while true do
                local lData = pack( lGenerator() )
                if lData[ 1 ] == nil then
                    break
                end
                pOutput( unpack( lData ) )
            end
        end
    end
end
```

Say, we have a file containing a list of filenames, one per line, and we want to concatenate the contents of those files line by line into one output file. This is how we could do that:

```> concat_files = pipe( filereader(), expand( filereader() ), filewriter() )
> concat_files( "filelist.in", "data.out" )
```
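Flattening is not tied to files. As a minimal sketch, assuming Lua 5.1 or later, the following restates `expand()` for single values (`expand_single`) and uses it to unfold lines into words; `words` and `values_of` are hypothetical helpers introduced only for this illustration.

```lua
-- Sketch (assumption: Lua 5.1+, single values only). A simplified restatement
-- of expand(): every input item is unfolded by pFunction into a generator
-- whose values are written to output one after the other.
function expand_single( pFunction )
    return function( pInput, pOutput )
        for lItem in pInput do
            for lValue in pFunction( lItem ) do
                pOutput( lValue )
            end
        end
    end
end

-- Hypothetical helper: returns a generator over the words of a line.
function words( pLine )
    return string.gmatch( pLine, "%S+" )
end

-- Hypothetical helper: turns an array into a generator.
function values_of( pArray )
    local lIndex = 0
    return function()
        lIndex = lIndex + 1
        return pArray[ lIndex ]
    end
end

flat = {}
expand_single( words )( values_of{ "to be", "or not", "to be" },
                        function( pWord ) table.insert( flat, pWord ) end )
print( table.concat( flat, "," ) )   --> to,be,or,not,to,be
```

The boundaries between the original lines are gone in the output, which is exactly what makes this a flattening operation.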

By the way, this example shows the hidden power behind the double nature of the functions returned by `filereader()`.

Let's now think about the opposite operation: assembling data flowing through the pipe into larger units. Since data flowing through a pipe is by its nature flat, an assembling filter must know by itself (possibly by also looking at the data) where to draw the boundaries that separate the data going into one assembly from the data going into the next. There are no external hints, like the end of input in the flattening case, that tell it when to switch from one assembly to the next.

The easiest way to describe an assembling algorithm is as a function that takes an iterator as argument, pulls as much data out of it as it needs, and returns the assembly. Such a function can be wrapped into a folding filter by the following function:

```-- Filter that calls the given function with its input until it is exhausted,
-- feeding the returned values into its output.
function assemble( pFunction )
    return function( pInput, pOutput )
        local lTerminated = false
        local function lInput()
            local lData = pack( pInput() )
            if lData[ 1 ] == nil then
                lTerminated = true
            end
            return unpack( lData )
        end
        repeat
            pOutput( pFunction( lInput ) )
        until lTerminated
    end
end
```

With this function at hand, we can provide an easier implementation for the filter collecting lines into paragraphs we implemented earlier:

```-- Collects a paragraph by pulling lines from a generator. The trailing newline
-- is preserved.
function collect_paragraph( pInput )
    local lBuffer = {}
    for lLine in pInput do
        table.insert( lBuffer, lLine )
        if lLine == "" then
            break
        end
    end
    return table.concat( lBuffer, "\n" )
end

paragraphize = assemble( collect_paragraph )
```
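To see how the folding filter behaves, it can be run over an in-memory list of lines. The following is a minimal sketch, assuming Lua 5.1 or later: `assemble_single` restates `assemble()` for single values, `collect_paragraph` is repeated from above, and `paragraphs_of` is a hypothetical helper that drives the filter.

```lua
-- Sketch (assumption: Lua 5.1+, single values only): assemble() and
-- collect_paragraph() restated so that the example runs on its own.
function assemble_single( pFunction )
    return function( pInput, pOutput )
        local lTerminated = false
        local function lInput()
            local lData = pInput()
            if lData == nil then lTerminated = true end
            return lData
        end
        repeat
            pOutput( pFunction( lInput ) )
        until lTerminated
    end
end

function collect_paragraph( pInput )
    local lBuffer = {}
    for lLine in pInput do
        table.insert( lBuffer, lLine )
        if lLine == "" then break end
    end
    return table.concat( lBuffer, "\n" )
end

-- Hypothetical helper: runs the filter over an array and returns its output.
function paragraphs_of( pLines )
    local lIndex, lResult = 0, {}
    local lFilter = assemble_single( collect_paragraph )
    lFilter( function() lIndex = lIndex + 1; return pLines[ lIndex ] end,
             function( pParagraph ) table.insert( lResult, pParagraph ) end )
    return lResult
end

print( #paragraphs_of{ "a", "b", "", "c" } )   --> 2
print( #paragraphs_of{ "a", "" } )             --> 2
```

One thing this sketch suggests: when the input ends directly after an empty line, the assembling function is called once more on exhausted input and contributes an empty string (the second result of the second call above), which the hand-written `paragraphize` from earlier would not emit. Whether that matters depends on the consumer.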

## Nested Pipes

### Processing Complex Data

In the previous section we saw how to disassemble and assemble composite data (files and paragraphs in this case). The constituent parts of these composite data were fed along the same linear pipe as the composite data themselves, resulting in one single linear path of transformations. (This is, after all, what pipes are about, isn't it?) There are two facts to note about this scenario:

• The data stream fed between any two filters is homogeneous, meaning there is data of only one sort that is exchanged with every handshake.
• The boundaries of any composite data disassembled and assembled on their way through a pipe need not match; that is, the data resulting from one composite entity need not be collected after transformation into exactly one new composite entity.

After all, the latter wouldn't even be possible in the general case since the intervening filters are free to mangle the disassembled data at their will, blurring the borders between the initial composites.

Now, imagine what topology would have to be used instead of one linear pipe if we wanted to process complex data with a heterogeneous structure where the constituent parts would have to undergo different transformations, and we wanted to use pipes for those transformations as well. The topology that comes to mind would be a bunch of parallel "subpipes" coming out of one node and joining into another. The question is, how are inputs and outputs of the subpipes synchronized in those nodes and possibly between them?

Obviously the parallel pipes must not "eat each other's food" individually pulling input, and everything produced by any of the pipes must go into the result constructed from the output of all pipes somehow. Naturally, we come to the concept that all subpipes must have their input and output in the same node (filter), and that they get exactly one input (read by the filter controlling the subpipes) and must produce exactly one output (assembled and written to the output by the controlling filter). That is, the subpipes are kept apart from the filter's input and output which is served by the controlling filter node itself. In fact, the subpipes are thus run as functions taking argument(s) and producing return value(s). Such pipes, as we know, must start with a data source (getting the input value(s)) and end with a data sink (returning the result value(s)). From the filter node's point of view there is no difference to a function call altogether.

As it turns out, processing the constituent parts of complex values using parallel "subpipes" poses no special requirement on the filter having them as input/output - it's like calling functions. Since there's nothing specific to pipes in this, I won't provide a function for constructing a structured value from another one by calling a set of functions.

We can, however, provide a simple example of using a subpipe that does not involve complex values. We'll use `pass()` to run a sealed pipe as a subpipe to process the individual values flowing through the pipe. Remember our `average_of_seq_of_powers` example above, which computes the average of the fourth powers of the numbers 1 to `n`. We are now interested in how this value changes with `n` and build a corresponding sequence by passing every `n` through the pipe `average_of_seq_of_powers` we already have, using it as a subpipe:

```> seq_of_average_of_seq_of_powers = pipe( seq, pass( average_of_seq_of_powers ) )
> seq_of_average_of_seq_of_powers( 4, print )
1
8.5
32.666666666667
88.5
```

This is possible because `average_of_seq_of_powers` is a sealed pipe and thus behaves like a function. "Passing" may be a slightly misleading term for using subpipes this way, since there is no live subpipe that reads input and writes output - it's a completely different communication pattern: a new instance of the pipe is created and run to completion for every value "passed". But "passing" is the terminology we also use when invoking functions, and the value indeed manages to "go" through the subpipe, undergoing transformation, so this is probably justified.

### Processing Heterogeneous Data Streams

Apart from what we discussed in the previous section, we could imagine another scenario using parallel pipes where the subpipes remain pipes to the outer pipe instead of masquerading as functions: The data stream could be at some stage made up of data of different kinds that have to undergo different transformations before again joining the main pipe. There would have to be some sort of switch routing the data to one of a set of subpipes with some collector pulling the data from the ends of the subpipes in the order the corresponding input was fed. The latter condition forces the node collecting the data to be the one doing the input routing (since there are no other communication channels the two nodes could use for synchronization). The difference from the complex data scenario above would be that the subpipes need not be synchronized in input/output behaviour and can even have interleaving input and output.

To implement this sort of processing we would have to be able to suspend pipes on input, feeding data when it becomes available. This isn't possible with the sort of pipes we have here and would require symmetric pipes, so we abandon this idea.

Out of curiosity, let's nevertheless ask whether, given these preconditions, it would be possible to have even a real pipe spaghetti, that is, a web of pipes emanating from one source and converging into one sink. Obviously, such a construct couldn't be built by recursively assembling less complex structures into more complex ones; we'd have to wire the filters and subpipes together by individually connecting their inputs and outputs. If we think about how data would travel through such a web, we see that instead of our pull style communication scheme (asking generators for input) we would need a push style communication (calling functions for processing output). (Our pull style scheme doesn't harmonize with the type of output dispatching the switches perform - being able to examine the data to be routed - and would require dispatching the request for input based only on the data that has already been output, that is, without being able to examine the input.) Having that, we could even imagine processing webs with multiple inputs and outputs. This would be an interesting project to implement and experiment with to see what it could do for us.

## Working on Notation

While the main goal we pursued by implementing pipes was to get a natural notation for implementing data transformations (for data streams of undefined length, that is) that can then be combined into more complex transformations, let's see if the notation for using these transformations can be enhanced. For this, we will apply some metaprogramming.

The inline notation for invoking pipes, that is, building a pipe for single use and - without storing it in a variable - using it immediately, looks a bit clumsy. Take as an example our grep utility (omitting `paragraphize`):

```> pipe( filereader(), grep( "foobar" ), filewriter() )( "data.in", "data.out" )
```

This is far from the cleanness of the UNIX notation for pipes which would be roughly equivalent to

```$ filereader "data.in" | grep "foobar" | filewriter "data.out"
```

When thinking about how we could use an operator notation to construct pipes, we first note that we have two options. Remember that a filter is a function taking a generator as input parameter and a function as output parameter. If we have the generator at hand (which is the case when we are going to execute the pipe we are building on the fly), we can substitute `coroutine.yield` for output and wrap the following filter into a coroutine, thus getting another generator that yields the input as processed by the filter. We can then take this as input for the next filter and repeat the procedure, ending, after the last filter, with a generator that yields the output of the whole pipe. We could use this in a `for` loop, but to execute the pipe we would have to add another construct to drain the generator or, better, append the sink by some other construct that provides it with its output parameter. The notation for output would thus differ from the one used for feeding input. Also, if we build generator chains this way, we reimplement most of what `pipe()` does but can't use this notation to create pipes that we could use as subpipes, for instance.
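This first option, chaining generators on the fly, can be sketched in a few lines. The sketch assumes Lua 5.1 or later; `through`, `square_filter`, and `count_up_to` are hypothetical names chosen for the illustration.

```lua
-- Sketch (assumption: Lua 5.1+). Wraps a filter around an existing generator,
-- giving a new generator that yields the filter's output.
function through( pGenerator, pFilter )
    return coroutine.wrap( function()
        pFilter( pGenerator, coroutine.yield )
    end )
end

-- Hypothetical example filter using the article's interface.
function square_filter( pInput, pOutput )
    for lValue in pInput do pOutput( lValue * lValue ) end
end

-- Hypothetical generator counting from 1 to pLimit.
function count_up_to( pLimit )
    local lCurrent = 0
    return function()
        if lCurrent < pLimit then
            lCurrent = lCurrent + 1
            return lCurrent
        end
    end
end

chain = through( through( count_up_to( 3 ), square_filter ), square_filter )
fourth_powers = {}
for lValue in chain do               -- the extra construct needed to drain the chain
    table.insert( fourth_powers, lValue )
end
print( table.concat( fourth_powers, " " ) )   --> 1 16 81
```

The draining loop at the end is the asymmetry described above: input is supplied as the first argument of the innermost call, whereas output needs a separate construct.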

I therefore settled for the second option we have: building a reusable pipe and executing it immediately.

Building pipes is easy. All we need is to wrap the first filter into a proxy, that is, a table with attached metatable that defines the metamethods implementing the operators we need. Since the first filter is often a data source built by `source()` or one provided by our library like `luareader`, this wrapping can mostly be hidden. For those cases where it must be done explicitly, we make the `bind()` function publicly visible:

```-- Metatable
local sPipeOperations

-- Binds its argument to the library by wrapping it into a proxy
function bind( pValue )
    local lProxy = { payload = pValue }
    setmetatable( lProxy, sPipeOperations )
    return lProxy
end
```

The metatable defines the `__mul` metamethod to implement the * operator for connecting filters. I chose multiplication instead of, for instance, addition or concatenation, because the effects of filters stuffed together indeed do multiply, not add. (You can see this very clearly in our `seq_squares_squared` example above.) Besides, the asterisk not only looks prominent but, more importantly, binds more tightly than most other operators, making it possible to apply those to a pipe without using parentheses. The operator returns a proxy as well so we can chain it:

```sPipeOperations = {
    -- Chains a proxy with a function or other proxy
    __mul = function( pHeadProxy, pFilter )
        local lProxy = { head = pHeadProxy, payload = fncompile( pFilter ) }
        setmetatable( lProxy, sPipeOperations )
        return lProxy
    end,
    ...
}
```

To be able to call the wrapped functions transparently, we also implement the `__call` metamethod as executing the function the proxy represents with the given arguments. We build the function represented by the proxy using two internal utility functions:

```-- Collects the payloads of a proxy chain into an array. Defined first because
-- fncompile() below refers to it as a local.
local function unchain( pProxy )
    if not pProxy then return {} end
    local lResult = unchain( pProxy.head )
    table.insert( lResult, pProxy.payload )
    return lResult
end

-- Compiles a proxy to the function it represents
local function fncompile( pSpec )
    if type( pSpec ) == "function" then
        return pSpec
    else
        if pSpec.head then
            return fncompile( pipe( unpack( unchain( pSpec ) ) ) )
        else
            return fncompile( pSpec.payload )
        end
    end
end

sPipeOperations = {
    ...
    __call = function( pProxy, ... )
        return fncompile( pProxy )( unpack( arg ) )
    end,
    ...
}
```

The reason `fncompile()` is implemented as a separate function and is also called on the result of `pipe()` is that we want to be able to freely mix and match pipes constructed using * or `pipe()`. To achieve this, `pipe()`, like any other function taking functions as arguments, uses `fncompile()` to convert them before use and, like any function building filters, returns a bound function. (I won't provide the changed functions here again, though.)

We can now stuff together pipes or call functions taking a pipe as parameter without using `pipe()` anymore, and if the data source or the first filter is bound also without using `bind()`:

```> seq = bind( seq )
> average_of_seq_of_powers = seq * square * square * avg
> return average_of_seq_of_powers( 4 )
88.5
> filter_foobar_lines = filereader() * grep( "foobar" ) * filewriter()
> filter_foobar_lines( "data.in", "data.out" )
```

Concerning inline calls, we still haven't won much, since all we can remove from the ugly notation above are four letters: `pipe`. We need to keep the parentheses around the pipe constructor expression since the function call operator binds more tightly than multiplication:

```> (filereader() * grep( "foobar" ) * filewriter())( "data.in", "data.out" )
```

So, this isn't yet where we would like to arrive. For pipes that must be called with two arguments there is probably nothing more we can easily do without overcomplicating matters.

We can, however, achieve a pipe-like notation for feeding data in, combined with an assignment for the result of the transformation, when the pipe is to be called with only an input argument and returns its result (that is, works as a function call with a single argument). Remember our `average_of_seq_of_powers` example above, which we will now rewrite as follows:

```> result = 4 .. seq * square * square * avg
```

Data transformations whose result goes into a program variable for later processing are a common case, and we are accustomed to using an assignment notation in such cases. So this notation seems like a good compromise, and most likely we wouldn't gain much more if we tried to tweak the notation further until we got the receiver of the result at the right end of the pipe, as in UNIX.

The above notation becomes possible if we implement the `__concat` metamethod, which Lua calls for the .. operator if it is provided by either of the two operands:

```sPipeOperations = {
    ...
    __concat = function( pInput, pProxy )
        return fncompile( pProxy )( pInput )
    end
}
```

(Note that we cannot use the nicer-looking > operator by implementing the `__lt` metamethod, since Lua would convert its result to a boolean, which is not what we need.)
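Put together, the three metamethods fit into a small self-contained sketch. It assumes Lua 5.1 or later and is not the library code itself: the chaining loop inside `fncompile` is a simplified stand-in for `pipe()` that passes single values only, and `seq`, `square`, and `avg` are re-stated here in a minimal form so that the block runs on its own.

```lua
-- Sketch (assumptions: Lua 5.1+; single values only; the loop in fncompile is
-- a simplified stand-in for the article's pipe()).
local lOperations = {}

function bind( pValue )
    return setmetatable( { payload = pValue }, lOperations )
end

-- Collects the payloads of a proxy chain, head first.
local function unchain( pProxy )
    if not pProxy then return {} end
    local lResult = unchain( pProxy.head )
    table.insert( lResult, pProxy.payload )
    return lResult
end

-- Turns a function or a proxy chain into a plain function.
local function fncompile( pSpec )
    if type( pSpec ) == "function" then return pSpec end
    local lFilters = unchain( pSpec )
    return function( pInput, pOutput )
        local lSource = pInput
        for lIndex = 1, #lFilters - 1 do
            local lFilter, lFilterInput = lFilters[ lIndex ], lSource
            lSource = coroutine.wrap( function() lFilter( lFilterInput, coroutine.yield ) end )
        end
        return lFilters[ #lFilters ]( lSource, pOutput )
    end
end

lOperations.__mul = function( pHead, pFilter )
    return setmetatable( { head = pHead, payload = fncompile( pFilter ) }, lOperations )
end
lOperations.__call = function( pProxy, ... )
    return fncompile( pProxy )( ... )
end
lOperations.__concat = function( pInput, pProxy )
    return fncompile( pProxy )( pInput )
end

-- Source writing 1..pInput, filter squaring its input, sink returning the average.
seq = bind( function( pInput, pOutput )
    for lValue = 1, pInput do pOutput( lValue ) end
end )
function square( pInput, pOutput )
    for lValue in pInput do pOutput( lValue * lValue ) end
end
function avg( pInput )
    local lSum, lCount = 0, 0
    for lValue in pInput do lSum, lCount = lSum + lValue, lCount + 1 end
    return lSum / lCount
end

print( 4 .. seq * square * square * avg )   --> 88.5
```

Since * binds more tightly than .., the whole pipe is built first and only then applied to the input on the left.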

Let's finally illustrate the power of this notation, returning to our `seq_of_average_of_seq_of_powers` subpipe example. As we are forced to compute a value instead of writing to output, we'll add another average computation: for the sequence lengths 1 up to the given number, we take the averages of the fourth powers and then compute the average of those averages:

```> return 4 .. seq * pass( seq * square * square * avg ) * avg
32.666666666667
```

This completes our metaprogramming trip. Before coming to some finishing thoughts, let's note the following observation: We could use the .. notation we implemented for calling any function taking one argument if we `bind` it before use:

```> return 1 + "4" .. bind( tonumber )
5
```

And, if all functions we have at hand are bound, we could construct entire call chains this way by using the concatenation operator! (Using other operators in conjunction with it, as in the above example, would, however, be subject to operator precedence, so I wouldn't recommend it.) There is, however, a better way to represent such call chains if our data is represented as tables (which is more often than not the case): we could equip our data with a metatable defining a `__call` metamethod that calls the function given to it as first argument with the data and the remaining arguments. The resulting notation could look like this (the `map` function, not provided here, builds a table by storing the values from the source table, converted by the function provided to it, under their original keys in the result table):

```t (map, tonumber) (map, square) (avg)
```

This looks like a pretty nice function call chain converting an array of strings representing numbers `t` to numbers, squaring them and computing the average. It evaluates to

```avg( map( map( t, tonumber ), square ) )
```

which is much less readable.
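A minimal sketch of such callable data, assuming Lua 5.1 or later, could look like this. All names are hypothetical: `chainable` attaches the metatable, `map` is one possible implementation of the function described above (it returns a chainable table so that the chain can continue), and `square_value` and `average` are plain functions on values and tables, not filters.

```lua
-- Sketch (assumption: Lua 5.1+). Data tables get a metatable whose __call
-- applies the given function to the data plus any remaining arguments.
local lChainable = {}
lChainable.__call = function( pData, pFunction, ... )
    return pFunction( pData, ... )
end

-- Hypothetical helper: marks a table as chainable.
function chainable( pTable )
    return setmetatable( pTable, lChainable )
end

-- Hypothetical map(): converts every value, keeps the keys, and returns a
-- chainable table.
function map( pTable, pFunction )
    local lResult = {}
    for lKey, lValue in pairs( pTable ) do
        lResult[ lKey ] = pFunction( lValue )
    end
    return chainable( lResult )
end

function square_value( pNumber ) return pNumber * pNumber end

function average( pTable )
    local lSum, lCount = 0, 0
    for _, lValue in pairs( pTable ) do lSum, lCount = lSum + lValue, lCount + 1 end
    return lSum / lCount
end

t = chainable{ "1", "2" }
print( t (map, tonumber) (map, square_value) (average) )   --> 2.5
```

The chain ends as soon as a function returns something that is not chainable, here the number computed by `average`.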

Note that the functions `square` and `avg` above are not those we defined earlier. Those were filters, and we could use them in a pipe to compute the same result (provided we had a function `values` accepting a table and returning a generator yielding its values). We could then use one of the following notations, the first one exploiting the `__call` metamethod of the table, and the second one the `__concat` metamethod of the pipe proxy:

```t (source( values ) * pass( tonumber ) * square * avg)
t .. source( values ) * pass( tonumber ) * square * avg
```

Note that while the second notation looks much nicer, it cannot be used as a statement like the first one - Lua requires us to do something with the returned value. This isn't normally a problem, since we'd want to use the result the pipe provides as return value anyway, but it would be a limitation if the result of the expression consisted entirely of side effects. In that case, however, the notation would probably suffer anyway from the lack of a way to provide an output parameter, which could easily be done with the first notation:

```t (source( values ) * pass( tonumber ) * square, print)
```

## Some Final Considerations

We have arrived at the end of our discussion. Let's add some final considerations about the mechanism we implemented.

You might have wondered why we don't need a boolean return from the output function in our filter interface to indicate to the filter that the output went o.k. and, more importantly, that more output will be accepted. After all, we implemented our filters as active entities writing output, and there must be someone to receive it. On UNIX, we see a `pipe closed` error (reported by the system as "Broken pipe") when a filter tries to write to the input of the next filter after that one has already finished execution and closed its input, so that no more data is accepted.

The reason we don't need this is that in our implementation any filter is resumed by the filter following it when that filter requests input. That is, it can only execute if the previous output has been processed and new output is accepted (it's even expected, indeed). So there can never be a situation where a filter has processed its input and cannot deliver its output: output can never fail - we will never see the pipe closed error we know from UNIX! This is an immediate result of our pull style communication scheme. This communication scheme thus also results in a nice symmetry of our filter interface: the input function takes no value and returns data, and the output function takes data and returns no value.

A side effect of this is that there is no guarantee that any filter reads its input to the end. Premature end of input processing will thus go unnoticed. It would be easy to implement such a check - no filter may die before its predecessor - but it is probably not worth the effort. (Files may, however, remain open if, for instance, file readers aren't exhausted, but throwing an error wouldn't help with this anyway.)
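Both observations can be illustrated with a sink that stops reading early. This is a minimal sketch, assuming Lua 5.1 or later; `five_values` and `take_two` are hypothetical names used only here.

```lua
-- Sketch (assumption: Lua 5.1+). A source that wants to deliver five values
-- and a sink that stops reading after two.
produced = 0
function five_values( pOutput )
    for lValue = 1, 5 do
        produced = produced + 1
        pOutput( lValue )
    end
end

function take_two( pInput )
    local lFirst = pInput()
    local lSecond = pInput()
    return lFirst + lSecond
end

generator = coroutine.wrap( function() five_values( coroutine.yield ) end )
sum = take_two( generator )
print( sum )        --> 3
print( produced )   --> 2
```

The source only ever runs when the sink asks for input, so it produces exactly two values; it then stays suspended in its third iteration, and nothing reports that it was never run to the end.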

If we implemented pipes using a push style communication scheme (having filters yield on input and resume their output) we'd have a different situation: when output returns (from the resume call to the next filter), it knows whether this filter is still alive and thus will accept more input. It could communicate this to the calling filter so it could stop processing (or ignore it, provoking a pipe closed error on the next output). The input/output interface would then lose its symmetry, and the loops making up filter implementations would no longer be controlled by input alone. As it seems, our intuitive pull style implementation is preferable also from this perspective and yields more natural implementations.

Thinking again about output being unable to fail, there seems to be a discrepancy with reality, where output, for instance to a network connection, can surely fail. Let's therefore have a closer look:

Any filter in a pipe but the last gets `coroutine.yield` as output. This call indeed can never fail. (It returns when the coroutine gets resumed again.) The last filter in a pipe, on the other hand, is supposed to be a data sink, so it won't call its output, meaning the output call can't fail for it either. So, where do the output errors emerge then? Obviously where the data sink calls some function that may fail. This means that it is the data sinks that need to have a contract with whatever they deliver their data to, one that allows for communicating the state of the output channel and handling any errors. Alternatively, instead of writing a custom data sink, we could `drain()` the pipe in a custom program loop to feed its output into some channel, handling any issues ourselves. In neither case does any filter have to deal with communication errors. (If, however, we give a function that may fail as output to a pipe not ending with a sink, errors will either go unnoticed or will be thrown to our code that called the pipe.)

What about input errors? Any error occurring in any filter is transparently forwarded (re-thrown along the input chain) to the data sink. Thus the sink or, if we prefer, a loop we write to read from the end of the pipe, will handle (or not handle) input errors as well as output errors.
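The forwarding of input errors can be observed with a source that fails after its first value. The following is a minimal sketch, assuming Lua 5.1 or later and generators built with `coroutine.wrap`; `failing_source` and `careful_sink` are hypothetical names.

```lua
-- Sketch (assumption: Lua 5.1+). An error raised inside a filter travels along
-- the chain of generators and surfaces where the sink reads its input.
function failing_source( pOutput )
    pOutput( 1 )
    error( "source broke" )
end

-- Hypothetical sink that counts its input and catches input errors itself.
function careful_sink( pInput )
    local lCount = 0
    local lSuccess, lMessage = pcall( function()
        for _ in pInput do lCount = lCount + 1 end
    end )
    return lCount, lSuccess, lMessage
end

input = coroutine.wrap( function() failing_source( coroutine.yield ) end )
count, ok, message = careful_sink( input )
print( count )   --> 1
print( ok )      --> false
```

The sink receives the first value normally and sees the error on its second request for input; `message` then carries the text raised in the source.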

Another interesting observation: Remember the scenario of reading and writing input and output of a pipe from controlling code running in one thread.

To solve the synchronization problems it poses we could decide to decouple the code handling both ends of the pipe by using a callback architecture: giving a `callback` to the pipe through which it can obtain its input, and thus having the controlling code to manage only the output end of the pipe. As it turns out, this is exactly how we work with pipes in this article: The `callback` is the input function given to the pipe (the generator)! This is the scenario of `drain()`.

Looking from the other end, we could provide the pipe with a `call forward` to use for delivering its output and have the controlling code handle only the input end of the pipe. Again, this is what our filter interface is about: what we give as the output parameter is such a `call forward` function. This is the scenario of `feed()`.
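One way to picture the call forward scenario is the following sketch. It is not the article's `feed()`: `make_feeder` is a hypothetical helper that wraps a filter so that it yields on input, and the filter is written with a `while` loop to stay clear of the restrictions on `for` loops in this style.

```lua
-- Hypothetical helper: wrap a filter so that the controlling code pushes
-- items in, while results are delivered through `output` (the call forward).
local function make_feeder(filter, output)
  local co = coroutine.create(function()
    filter(coroutine.yield, output)   -- the filter yields when it wants input
  end)
  assert(coroutine.resume(co))        -- run up to the first input request
  return function(item)
    if coroutine.status(co) == "dead" then
      return false                    -- the pipe has already finished
    end
    assert(coroutine.resume(co, item))
    return coroutine.status(co) ~= "dead"
  end
end

-- A filter written with a while loop; a nil item ends the stream.
local function square(input, output)
  while true do
    local n = input()
    if n == nil then break end
    output(n * n)
  end
end

-- The call forward function simply collects what the pipe delivers.
local received = {}
local feed = make_feeder(square, function(value)
  received[#received + 1] = value
end)

-- The controlling code handles only the input end of the pipe.
feed(2)
feed(3)
feed(nil)   -- signal end of input
```

Each call to `feed` runs the filter just far enough to consume one item and forward whatever it produces; after the final `nil` the coroutine is finished and further calls return `false`.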

If we compare the two scenarios, we see that in both of them the pipes (which are, as we know, inherently active, as is every filter) are wrapped into passive entities represented by functions, reacting only to an external stimulus (a function call): either pulling data from the output end of the pipe, as in the pull-type implementation we use here, or feeding data into the input end of the pipe, as we would if we implemented pipes in push style, having the filters yield on input instead of output. (The latter leads to a less natural and more complicated solution than the generator approach we used here, and additionally imposes the stated restrictions on using for loops in filter implementations.)

As long as pipes are only executed (that is, contain both a data source and a data sink), there is no externally visible difference between the two pipe implementations.

At this point, take a final look at our filter web from the same section: it is, in fact, an event-driven application built from a web of active processing nodes. It reacts to events happening on its inputs (the input functions being called from outside to feed data in), transforming them into events on its outputs (calling the output functions to deliver data) and/or changing its internal state (possibly even reconfiguring itself). There might be any sort of processing nodes, by the way, not just what we consider to be filters! While this is in essence a `call forward` architecture, event-driven applications are typically seen as having a `callback` architecture. This is interesting, but there is no contradiction: everything depends on the viewpoint from which we look at a call, that of the called (application) code or that of the calling (framework) code. So when we connect our processing web to some framework that feeds its inputs from external sources (mouse events, for instance) and maybe also provides the functions it uses for output (graphics routines, for instance), we'll naturally provide our input functions to it as `callbacks`.
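The change of viewpoint can be illustrated with a toy example. Everything here is hypothetical: `framework`, its `on`, `dispatch` and `draw` functions, and the `click_counter` node are invented for this sketch and do not correspond to any real library or to code from the article.

```lua
-- Toy "framework": stores callbacks, dispatches events to them, and
-- offers an output routine that records what it was asked to draw.
local framework = { handlers = {}, drawn = {} }

function framework.on(name, callback)
  framework.handlers[name] = callback
end

function framework.dispatch(name, payload)
  local handler = framework.handlers[name]
  if handler then
    handler(payload)
  end
end

function framework.draw(text)
  framework.drawn[#framework.drawn + 1] = text
end

-- A processing node: given its output function (a call forward from the
-- node's point of view), it returns its input function.
local function click_counter(output)
  local count = 0                     -- internal state changed by events
  return function(event)
    count = count + 1
    output("click " .. count .. " at " .. event.x .. "," .. event.y)
  end
end

-- From the framework's point of view, the node's input is a callback.
local node_input = click_counter(framework.draw)
framework.on("mouse", node_input)

framework.dispatch("mouse", { x = 10, y = 20 })
framework.dispatch("mouse", { x = 11, y = 25 })
```

The same function, `node_input`, is the node's input when seen from the application and a registered callback when seen from the framework.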

Last edited March 28, 2012 7:00 am GMT