An alternative approach to pipelines

There was a lot of exciting new stuff on show at Sitecore’s recent MVP Summit and Symposium the other week. Plenty of others have written up the general goings-on at those events (have a google – there’s lots to read), so I thought I’d focus on something more specific that piqued my interest: the novel approach being taken to pipelines in some of the new code Sitecore are producing.

Some background…

Pipelines are a common concept in Sitecore, but I first came across the idea of a pipeline in code when I started working with a beta of Commerce Server back in 2000. So it’s fitting that one of the places Sitecore were showing off their new code was in Sitecore Commerce – the replacement for the old Commerce Server codebase. I’ve also made use of pipeline patterns in some personal code I’ve written over the years – most recently in my homebrew replacement for the old Google Reader, which I use to keep on top of the news.

One of the things that all these pipeline implementations have in common is that they use a single type as the input and output of the overall process, and each step in the pipeline has that type as their input and output. An object goes in the top, passes through each step and then comes out the end. A lot of the time, that’s all well and good. But sometimes I’ve felt that the object being passed through ends up a bit messy as a result of having to store state and data for every step. For example, in Commerce Server’s shopping baskets they solved the issue of needing complex data types for the pipeline by using a dictionary object, which acquired more entries and child lists and dictionaries over the course of the execution. And my RSS reader’s “processing new RSS Feed items” pipeline suffered from a similar issue. The pipeline data object there ended up with lots of fixed properties to deal with turning an entry from an RSS feed into “safe” title, metadata and content data to record in the database.

So, that background made me very interested in how some of the new code Sitecore are producing works. Having seen some presentations, I had a very interesting chat with Stephen Pope in the Symposium hotel bar, which got me interested in reproducing their idea of code-first pipelines which don’t need to have the same type for their input as for their output. It seemed to me that I might be able to tidy up some of my code – so I started investigating how it might work for me.

[NB: While I’m focusing on pipelines that are wholly defined in code here, I don’t mean to suggest that Sitecore’s implementation is code-only too. Config files and patching are just things I’ve not needed in the code I’m interested in]

And after a bit of tinkering and thinking about what Stephen had been saying, it turns out you can do quite interesting things with some pretty simple code here…

One approach…

In generic terms, a pipeline step can be described as an interface with a single processing method:

public interface IPipelineStep<INPUT, OUTPUT>
{
    OUTPUT Process(INPUT input);
}

By declaring different type parameters for the input and the output, we can describe a step that changes the type of the data it’s processing…

So a (trivial) concrete example of such a step might turn an integer to a string like so:

public class IntToStringStep : IPipelineStep<int, string>
{
    public string Process(int input)
    {
        return input.ToString();
    }
}

A pipeline needs to be able to compose together a set of these steps. The usage I’ve been thinking about would adopt a “code first” approach to setting these things up. Extension methods give an easy way to achieve this:

public static class PipelineStepExtensions
{
    public static OUTPUT Step<INPUT, OUTPUT>(this INPUT input, IPipelineStep<INPUT, OUTPUT> step)
    {
        return step.Process(input);
    }
}

So you can stack together a set of steps using code like:

int input = 5;
string result = input
                  .Step(new IntToStringStep())
                  .Step(new DoSomethingToAString());

But literal code like that isn’t much use if we want to inject a pipeline via DI, so we need a wrapper class which can be used as a base for types that will be registered for injection. At its most simple (ignoring the whole business of naming pipelines, patching of pipeline steps etc.) that just needs to expose a method to process the data, and set up the pipeline internally:

public abstract class Pipeline<INPUT, OUTPUT>
{
    public Func<INPUT, OUTPUT> PipelineSteps { get; protected set; }

    public OUTPUT Process(INPUT input)
    {
        return PipelineSteps(input);
    }
}

So you can create a concrete pipeline just by defining the set of steps you need in the constructor. Taking the trivial example from above:

public class TrivialPipeline : Pipeline<int, string>
{
    public TrivialPipeline()
    {
        PipelineSteps = input => input
            .Step(new IntToStringStep())
            .Step(new DoSomethingToAString());
    }
}
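To show the DI angle, here’s a minimal sketch of registering and resolving the trivial pipeline. This assumes the Microsoft.Extensions.DependencyInjection container (any other container would look similar) and the Pipeline base class and TrivialPipeline type from above – it’s an illustration of the idea, not Sitecore’s actual wiring:

```csharp
// A sketch of wiring the trivial pipeline above into a DI container.
// Assumes the Microsoft.Extensions.DependencyInjection package, plus the
// Pipeline<,> base class and TrivialPipeline type defined in this post.
using Microsoft.Extensions.DependencyInjection;

var services = new ServiceCollection();

// Consumers depend only on the abstract Pipeline<int, string> signature,
// so the concrete set of steps can be swapped at registration time
services.AddTransient<Pipeline<int, string>, TrivialPipeline>();

using var provider = services.BuildServiceProvider();

// A consumer asks for the pipeline by its input/output signature...
var pipeline = provider.GetRequiredService<Pipeline<int, string>>();

// ...and processes data without knowing which steps are inside
string result = pipeline.Process(5);
```

The nice property here is that the consumer never needs to know which steps sit inside the registered type – swapping the pipeline’s contents is purely a registration change.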

Already that’s looking interesting, but it doesn’t stop there.

Here’s where it gets more interesting…

So that works fairly simply and easily, and you can build up quite complex pipelines with the compiler doing all your type checking for you. But during my chat with Stephen he suggested some other really interesting ideas that can stem from this design…

First up: what if the overall pipeline class above implements the IPipelineStep interface? It’s a trivial change:

public abstract class Pipeline<INPUT, OUTPUT> : IPipelineStep<INPUT, OUTPUT>
{
    public Func<INPUT, OUTPUT> PipelineSteps { get; protected set; }

    public OUTPUT Process(INPUT input)
    {
        return PipelineSteps(input);
    }
}

On its own that changes nothing, but it allows you to nest your pipeline objects, so that the top-level pipeline can use other pipelines instead of individual steps. So you can build up more complex behaviour from groups of simpler tasks. Continuing with the trivial examples:

public class CompoundPipeline : Pipeline<int, string>
{
    public CompoundPipeline()
    {
        PipelineSteps = input => input
            .Step(new AnInitialStep())
            .Step(new InnerPipeline())
            .Step(new IntToStringStep())
            .Step(new DoSomethingToAString());
    }
}

public class InnerPipeline : Pipeline<int, int>
{
    public InnerPipeline()
    {
        PipelineSteps = input => input
            .Step(new DoSomethingWithAnInteger())
            .Step(new SomethingElseWithAnInteger());
    }
}

The second interesting thing about this approach is that pipeline steps can be decorators that add logic to an inner step. For example, what about a scenario where you want to make a particular step optional, based on some sort of criteria? Rather than baking that logic into the underlying component, you can put the logic into a decorator and wrap it around any component you want to make optional. For example:

public class OptionalStep<INPUT, OUTPUT> : IPipelineStep<INPUT, OUTPUT> where INPUT : OUTPUT
{
    private IPipelineStep<INPUT, OUTPUT> _step;
    private Func<INPUT, bool> _choice;

    public OptionalStep(Func<INPUT, bool> choice, IPipelineStep<INPUT, OUTPUT> step)
    {
        _choice = choice;
        _step = step;
    }

    public OUTPUT Process(INPUT input)
    {
        if (_choice(input))
        {
            return _step.Process(input);
        }
        else
        {
            return input;
        }
    }
}

This step takes a function that operates on the input, to decide whether to run the child step or just pass the input through. Note that unlike the previous steps we’ve looked at, this one requires that the input can stand in for the output – otherwise the “skip over” behaviour couldn’t return the input unchanged – so this has a type constraint to ensure INPUT derives from OUTPUT. So if you wanted a particular step to only run if the input was greater than 15 you could use something like:

public class PipelineWithOptionalStep : Pipeline<int, int>
{
    public PipelineWithOptionalStep()
    {
        PipelineSteps = input => input
            .Step(new DoSomething())
            .Step(new OptionalStep<int, int>(i => i > 15, new ThisStepIsOptional()))
            .Step(new DoADifferentThing());
    }
}

And it’s easy to move from this code to a step which can choose between options for the step to run as well. And of course the optional step or steps you pick from can be entire pipelines themselves… Potentially other things like “exception handling of failing steps” or logging could be wrapped up in that style.
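The “choose between options” idea might be sketched like this – ChoiceStep is a hypothetical name of mine, building on the IPipelineStep interface defined above, and isn’t taken from Sitecore’s code:

```csharp
using System;

// A sketch of a decorator that picks between two wrapped steps based on
// a predicate. Builds on the IPipelineStep<,> interface from this post.
public class ChoiceStep<INPUT, OUTPUT> : IPipelineStep<INPUT, OUTPUT>
{
    private readonly Func<INPUT, bool> _choice;
    private readonly IPipelineStep<INPUT, OUTPUT> _whenTrue;
    private readonly IPipelineStep<INPUT, OUTPUT> _whenFalse;

    public ChoiceStep(Func<INPUT, bool> choice,
        IPipelineStep<INPUT, OUTPUT> whenTrue,
        IPipelineStep<INPUT, OUTPUT> whenFalse)
    {
        _choice = choice;
        _whenTrue = whenTrue;
        _whenFalse = whenFalse;
    }

    public OUTPUT Process(INPUT input)
    {
        // Route the input to one of the two wrapped steps based on the predicate
        return _choice(input) ? _whenTrue.Process(input) : _whenFalse.Process(input);
    }
}
```

And because entire pipelines are steps too, wrapping both branches in their own pipelines gives you a full “if/else” between two multi-step processes.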

In conclusion…

While this is a very simple “proof of concept” implementation of some of the ideas I’ve heard discussed, it shows some interesting promise. I’m going to have to spend some time implementing some real pipelines for my RSS Reader when I have a free moment, and see whether I can make that code look cleaner with this approach. If you’re interested in playing with this style of pipeline, you can grab the code for my tinkering from GitHub.

[Edited to add: After a conversation on Twitter I also added a gist with a simple example of how a decorator could be used to apply a particular step to all the data in an input enumeration. Plus one for how a decorator could make pipeline steps raise start and end events.]
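The shape of that first decorator might be along these lines – a sketch of the idea rather than the gist’s exact code, with EnumerableStep as an illustrative name:

```csharp
using System.Collections.Generic;
using System.Linq;

// A sketch of a decorator that applies a single-item step to every element
// of a sequence. Builds on the IPipelineStep<,> interface from this post.
public class EnumerableStep<INPUT, OUTPUT> : IPipelineStep<IEnumerable<INPUT>, IEnumerable<OUTPUT>>
{
    private readonly IPipelineStep<INPUT, OUTPUT> _itemStep;

    public EnumerableStep(IPipelineStep<INPUT, OUTPUT> itemStep)
    {
        _itemStep = itemStep;
    }

    public IEnumerable<OUTPUT> Process(IEnumerable<INPUT> input)
    {
        // Lazily run the wrapped step over each item in the sequence
        return input.Select(item => _itemStep.Process(item));
    }
}
```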

And in the meantime I’m even more interested to see the detail of how Sitecore’s code has approached this once it gets released, so I can see how much further they’ve managed to take it…


24 thoughts on “An alternative approach to pipelines”

  1. Pingback: Getting pipelines from config | Jeremy Davis
    • It was an example really – talking about the theory of how you can put together functional-style code-first pipelines. But I have used these patterns to build real code.

      • The reason I ask, is that I am new to the theory and practice of implementing Pipelines..
        Your code has helped me a lot with my understanding and implementation.

        For the most part, the code is up and running;
However, I’m hoping you can help – in generating objects from the xml file, the parsePipelineSteps method is saying that it can’t cast from the source type to that of the Interface. Are you able to assist please?

I know it’s probably cheeky of me, but is it possible for you to provide me with a working and complete example?
        Thanks
        Dave

      • Happy to try and help if I can – that’s not a problem.

        If you’re looking at pipelines configured from XML, I’m guessing you’ve been looking at my follow-on post from this one, and hence have seen the gist I published with that? The ModifyablePipelineTest() method in Program.cs is a very simple complete example of loading a pipeline from XML.

Making a guess, if you’re seeing an error about failing to cast a concrete type to an interface, is that the error around line 48 of ConfigBasedPipeline.cs (or your own version of that logic)? That’s the point where it’s using Reflection to turn the type name data in the XML into a concrete pipeline step object.

        If so, there’s two things I’d suggest you check:

First, in your xml file, are you correctly describing the name of the type that needs to be loaded? In my gist’s example file (ConfigBasedPipeline.xml) the type attribute contains a .NET type description string. It’s the fully qualified type name, followed (after a comma) by the name of the DLL that type lives in. So you need to make sure your type data here is correct. (And if the type is in a separate DLL, then you need to make sure that DLL is in the same folder as your main program’s DLL)

(Also, note that the DLL name is dependent on the Visual Studio project file’s settings – I didn’t put the .csproj file into my gist, so if your project isn’t compiling to “StronglyTypedPipelines.exe” then you’d need to change the XML to match your project)

And secondly, you need to ensure that the type being loaded implements the correct pipeline step interface. It must implement the IPipelineStep interface. That interface is just a marker which says “this object is a pipeline step”, but it doesn’t include any methods. (The method that gets called when the step is executed is added by the generic IPipelineStep<INPUT,OUTPUT> interface, and that interface inherits from the marker interface)
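In code terms, that arrangement looks something like this – a sketch based on the description above rather than the exact gist code:

```csharp
// A sketch of the marker-interface arrangement described above.
// The non-generic interface lets config-loading code hold a collection
// of steps without knowing their type parameters...
public interface IPipelineStep
{
}

// ...while the generic interface adds the strongly-typed processing
// method, and inherits the marker so every step is also an IPipelineStep.
public interface IPipelineStep<INPUT, OUTPUT> : IPipelineStep
{
    OUTPUT Process(INPUT input);
}
```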

        Does that help?

  2. @jermdavis,

    Brilliant, many thanks to you…

    I’ve tried your code, it builds and functions as described.
    This will give me a great starting point, thank you.

    Thank you also for your tips on the configuring of the xml file. I shall bear those in mind for the future.

    Regards
    Dave

      • Jeremy,

        I’ve been looking around your github stuff..
        I see that you have done a Logging implementation, along with a version which uses event within the pipeline stuff.

        My first comment: Great Stuff.
        Secondly: I’d be interested in completing your examples by showing code of your implementations of logging (i’ve gotten a hold of your events implementation…

        Thanks
        Dave

      • So I did write an example of logging before, but the patterns were specific to a question someone asked me on Twitter, and I’m not sure if that will run as-is. (Too long ago 😉 )

Fundamentally, to make standardised logging work you need to extend the basic pipeline step interface so that a concrete step can find the right logging object. (There’s nothing to stop you using something like Trace.WriteLine() without this – but that doesn’t guarantee the same behaviour across all your pipeline steps) There are various ways you could provide the log to the steps, but giving the log object to the parent pipeline object, and giving each step a reference to its parent, works fine for a single-level pipeline. (It needs a little extra code if you can have child pipelines in your code though) From then on your individual steps can just call the log object whenever they have something to report. Alternatively, you could make the execute process for the pipeline pass the log object into each step’s process method when it’s called. Works either way – the “right” pattern probably depends on your use case.
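As a sketch of that “steps find the log via their parent pipeline” pattern – the names here (ILog, ILoggingPipeline, LoggingPipelineStep) are illustrative, not the exact gist code:

```csharp
// A sketch of the logging pattern described above, building on the
// IPipelineStep<,> interface from the post. All names are illustrative.
public interface ILog
{
    void Write(string message);
}

// The parent pipeline owns the log object...
public interface ILoggingPipeline
{
    ILog Log { get; }
}

// ...and each step holds a reference to its parent, set when the step
// is added to the pipeline's collection
public abstract class LoggingPipelineStep<INPUT, OUTPUT> : IPipelineStep<INPUT, OUTPUT>
{
    public ILoggingPipeline ParentPipeline { get; set; }

    public OUTPUT Process(INPUT input)
    {
        ParentPipeline.Log.Write($"Starting {GetType().Name}");
        var result = ProcessWithLogging(input);
        ParentPipeline.Log.Write($"Finished {GetType().Name}");
        return result;
    }

    // Concrete steps do their work here, and can also call
    // ParentPipeline.Log directly when they have something to report
    protected abstract OUTPUT ProcessWithLogging(INPUT input);
}
```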

        There’s a runnable example here, which hopefully makes it a bit clearer? [And it’s in a gist as well, as I’m sure I’ll want this code again eventually]

  3. Pingback: Thinking about errors in code pipelines | Jeremy Davis
  4. Jeremy,
Thanks for the runnable example – thinking of making use of your code as a sample..
    However, should the Pipeline be switched over for ALL projects to the logging pipeline rather than the pipeline as is?
    Thanks
    Dave

    • I think that’s an architecture decision for your code, not something that can be wholly answered by my generic examples. If that logging pattern works for you, and having logged data for debugging will help you work with the code, then yes you can use it for all your pipelines. But you could potentially have some pipelines use it and some not. And you might decide your code fits better with a different pattern for how to implement logging…

  5. Jeremy,

    Okay.
    I would like to implement the pipeline, via config file and with logging.
    Presumably, I would have to change over from IPipelineStep to ILoggingPipelineStep..
    How should I be adding the log object to the list of pipeline elements?

    Hope you can help.
    Thanks
    Dave

      • Jeremy,

        >> The gist example that had a hard coded pipeline passed the log object to the constructor of the concrete pipeline class.
        Yes, I see that, thanks 🙂

Research, trialling and testing has shown that the same code loads the pipeline class – but without a reference to the log object;

The issue is the “parseLoggingPipelineSteps” method, where the invoke method is called;
Please advise as to how the log object would be passed in?

        Should the process/flow be:
        1) load the config into a list of IPipelineStep objects;
        2) iterate over the list, casting to an ILoggingPipelineStep at that point, add in a reference to the logging object?

        Hope you can help and advise, please.

        Many Thanks

        Dave

      • The logging aware steps access the log by knowing what their parent pipeline is. The parent pipeline holds a reference to the logging object, and when it adds the steps into its collection, it passes them a reference to itself. So you need to modify the code that loads the pipeline steps from config to match that behaviour. The reflection code that loads a pipeline step will need to pass a reference to the pipeline object into the step’s constructor when reflection calls it. Then when the loaded step is added to the pipeline’s step collection, it will have access to the log object.

  6. Jeremy,

    Thanks for your reply of 27th February 2019 1144: However, words sometimes aren’t clear enough.
    Please would you provide example code? This could well be useful for others.

    Many Thanks

    Dave

      • Yes, absolutely. You can use GetConstructor() to find a constructor which takes a specific parameter. So you could use that (and the subsequent invoke of that constructor) to pass in data like a reference to the Pipeline, or pass in the log object directly to the step if you preferred.
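A sketch of that approach, as a method inside the config-based pipeline class – the ILoggingPipeline interface and the shape of the method are assumptions based on the earlier examples, not exact gist code:

```csharp
// A sketch of using GetConstructor() to pass the parent pipeline into each
// step as it's created from config. Assumes this lives inside a pipeline
// class that implements the hypothetical ILoggingPipeline interface.
private IPipelineStep createStep(string typeName)
{
    var type = Type.GetType(typeName);

    // Find a constructor which takes the parent pipeline as its parameter...
    var ctr = type.GetConstructor(new[] { typeof(ILoggingPipeline) });

    // ...and invoke it, handing the new step a reference to this pipeline
    return (IPipelineStep)ctr.Invoke(new object[] { this });
}
```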

Well, I can’t see the code you’re working on, so I’m basing this on what I’d put in gists before… (Busy with work, so typing this from memory – apologies for any typos) If you’re merging the logging behaviour with the load-from-config behaviour, you need to change two key things. Firstly you need your pipeline steps to be based on the LoggingPipelineStep<INPUT, OUTPUT> type. And you need to base your overall pipeline class on ConfigBasedPipeline<INPUT, OUTPUT> and update the loading so it sets the ParentPipeline property as each step is loaded. That happens in the parsePipelineSteps() method of the config-based pipeline. So you might end up with something like:

        private IList<IPipelineStep> parsePipelineSteps(XElement pipelineXml)
        {
            var pipeline = new List<IPipelineStep>();
        
            foreach (var xStep in pipelineXml.Elements("step"))
            {
                string typeName = xStep.Attribute("type").Value;
        
                var type = Type.GetType(typeName);
                var ctr = type.GetConstructor(Type.EmptyTypes);
                var obj = (ILoggingPipelineStep)ctr.Invoke(Type.EmptyTypes);
        
                obj.ParentPipeline = this;
        
                pipeline.Add(obj);
            }
        
            return pipeline;
        }
        

        Now that relies on having a non-generic base interface for all the steps that exposes the parent pipeline property required to access the logging object, so in this model LoggingPipelineStep<INPUT, OUTPUT> needs to inherit something like:

        public interface ILoggingPipelineStep
        {
             ILoggingPipeline ParentPipeline { get; set; }
        }
        

        There will be some other changes required to the original code, because the base type of the pipeline step has changed I think. Busy with work, so don’t have time to bottom that out – but you should be able to work through those…

  7. Jeremy,

    Further to your posting of: 27th February 2019 at 16:15

    Yes, I was trying to merge the two configurations (basic config and logging..)

    Many Thanks ..
    I’m pleased to announce that I was nearly there with my implementation ..

    All sorted now.

    Thanks very much.
