Pipelines and async

Ages ago I wrote some posts about an approach to pipeline-style patterns in C#. Recently I got a question on a gist I’d written to go with that article, because someone was having issues adding async code into pipeline components. Async patterns are much more common now, but this wasn’t something I’d actually tried. Cue some interesting experiments…

Async gets everywhere…

In my original code I had a basic pipeline step pattern that could map an input to an output:

public interface IPipelineStep<INPUT, OUTPUT>
{
	OUTPUT Process(INPUT input);
}

If you want to make the Process() method awaitable, then you have to declare it slightly differently:

public interface IAsyncPipelineStep<INPUT, OUTPUT>
{
	Task<OUTPUT> Process(INPUT input);
}

It has to return a Task<T> to keep the compiler happy. Behind the scenes the compiler does some trickery so that your C# reads like a simple synchronous program while parts of it run asynchronously, and that Task<T> return type is part of this magic. But this causes a bit of a problem: now the next step in the pipeline has to take a Task<INPUT> as its input. What does its output become? The code above would suggest it turns into Task<Task<OUTPUT>>, and that doesn’t sound right. What would happen in a ten-step pipeline? The nested Tasks would grow out of control…

We want to define our pipeline steps based on their data inputs and outputs. We don’t want to have to make the definition based on which position in the overall pipeline it will occupy. That would make our pipeline components really hard to work with – if you have to insert a new step at the start of the pipeline, all subsequent definitions would need to change. And that is Not Good…

So what can we do instead?

A first try at fixing this…

If the output of a pipeline step that awaits things needs to be a Task<OUTPUT> then perhaps we should make the input Task<INPUT> too? That solves our order-of-steps issue because every step can have the same signature.

So the basic definition becomes:

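public interface IAsyncPipelineStep<INPUT, OUTPUT>
{
	// Named ProcessAsync so it matches the pipeline, the Step() helper and the step classes that follow
	Task<OUTPUT> ProcessAsync(Task<INPUT> input);
}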

Based on that, we need to tweak the definitions of the abstract pipeline based on these steps, to follow the same pattern:

public abstract class AsyncPipeline<TIn, TOut> : IAsyncPipelineStep<TIn, TOut>
{
	public Func<Task<TIn>, Task<TOut>> _pipelineSteps { get; protected set; }

	public Task<TOut> ProcessAsync(Task<TIn> Input)
	{
		return _pipelineSteps(Input);
	}
}

And similarly adjust the helper method which allows us to compose the steps together:

public static class AsyncPipelineStepExtensions
{
	public static Task<TOut> Step<TIn, TOut>(this Task<TIn> Input, IAsyncPipelineStep<TIn, TOut> Step)
	{
		return Step.ProcessAsync(Input);
	}
}
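
To see the composition helper in isolation, here is a minimal, network-free sketch. It assumes the interface method is called ProcessAsync (which is what the pipeline and the helper call), and ToUpperStep, LengthStep and StepDemo are hypothetical names invented purely for illustration, not part of the original gist.

```csharp
using System;
using System.Threading.Tasks;

public interface IAsyncPipelineStep<INPUT, OUTPUT>
{
	Task<OUTPUT> ProcessAsync(Task<INPUT> input);
}

public static class AsyncPipelineStepExtensions
{
	// Hand the pending input to the step and return the step's pending output
	public static Task<TOut> Step<TIn, TOut>(this Task<TIn> Input, IAsyncPipelineStep<TIn, TOut> Step)
	{
		return Step.ProcessAsync(Input);
	}
}

// Hypothetical step: upper-cases a string
public class ToUpperStep : IAsyncPipelineStep<string, string>
{
	public async Task<string> ProcessAsync(Task<string> Input)
	{
		var text = await Input;
		return text.ToUpperInvariant();
	}
}

// Hypothetical step: measures a string
public class LengthStep : IAsyncPipelineStep<string, int>
{
	public async Task<int> ProcessAsync(Task<string> Input)
	{
		var text = await Input;
		return text.Length;
	}
}

public static class StepDemo
{
	// Chains the two steps; each .Step() call takes a Task and returns a Task
	public static Task<int> RunAsync(string text)
	{
		return Task.FromResult(text)
			.Step(new ToUpperStep())
			.Step(new LengthStep());
	}

	public static async Task Main()
	{
		Console.WriteLine(await RunAsync("hello")); // prints 5
	}
}
```

Because every step takes a Task<T> and returns a Task<T>, the chain reads the same no matter how many steps are added or where a new one is inserted.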

That gives us a basis to try defining some steps… How about an async fetch for some HTML from a website?

public class HttpFetchAsyncStep : IAsyncPipelineStep<Uri, string>
{
	private static readonly HttpClient _client = new HttpClient();
	
	public async Task<string> ProcessAsync(Task<Uri> Input)
	{
		var uri = await Input;
		return await _client.GetStringAsync(uri);
	}
}

The interesting thing here is that our input to each step is a Task<INPUT> – that’s the promise that there will be an instance of the INPUT class at some point in the future. That means our step needs to wait for that future-point before it does anything with the input. Hence the await Input being the first line here…

* sound of ominous foreshadowing *
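
A small sketch of what “waiting for that future-point” means in practice. It assumes the interface method is named ProcessAsync, and uses a hypothetical DoubleStep plus a TaskCompletionSource<int> standing in for a slow upstream step, so that the moment the input arrives is under our control.

```csharp
using System;
using System.Threading.Tasks;

public interface IAsyncPipelineStep<INPUT, OUTPUT>
{
	Task<OUTPUT> ProcessAsync(Task<INPUT> input);
}

// Hypothetical step: doubles a number once the number actually exists
public class DoubleStep : IAsyncPipelineStep<int, int>
{
	public async Task<int> ProcessAsync(Task<int> Input)
	{
		var value = await Input; // nothing after this line runs until the input task completes
		return value * 2;
	}
}

public static class PendingInputDemo
{
	public static async Task<(bool completedEarly, int result)> RunAsync()
	{
		// A task we complete by hand, standing in for a slow upstream step
		var upstream = new TaskCompletionSource<int>();

		// The step returns straight away with a task that is still pending
		var output = new DoubleStep().ProcessAsync(upstream.Task);
		var completedEarly = output.IsCompleted;

		// Now the upstream value arrives and the step can finish
		upstream.SetResult(21);
		return (completedEarly, await output);
	}

	public static async Task Main()
	{
		var (completedEarly, result) = await RunAsync();
		Console.WriteLine(completedEarly); // prints False
		Console.WriteLine(result);         // prints 42
	}
}
```

The step hands back its own Task immediately, and that Task only completes after the input Task does.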

And writing to disk can be async too:

public class DiskWriteAsyncStep : IAsyncPipelineStep<string, string>
{
	public async Task<string> ProcessAsync(Task<string> Input)
	{
		var data = await Input;
		var fileName = System.IO.Path.GetTempFileName();
		
		await System.IO.File.WriteAllTextAsync(fileName, data);
		
		return fileName;
	}
}

This is slightly different, because we’re awaiting something which doesn’t return data – but it’s broadly the same pattern.

And we can compose those together into a pipeline:

public class ExampleAsyncPipeline : AsyncPipeline<Uri, string>
{
	public ExampleAsyncPipeline()
	{
		_pipelineSteps = input => input
			.Step(new HttpFetchAsyncStep())
			.Step(new DiskWriteAsyncStep());
		
	}
}

That doesn’t change from our pre-async definition of a new pipeline. But because our steps always have to take a Task<T> as input, we do need a little tweak to how we use it, to make sure we have a Task for the input to the pipeline:

async Task Main()
{
	var pipeline = new ExampleAsyncPipeline();
	var uri = new Uri("https://news.bbc.co.uk/");

	var input = Task.FromResult(uri);

	var tempFile = await pipeline.ProcessAsync(input);

	Console.WriteLine($"{uri} saved to {tempFile}");
}

Now this runs, and it gives us some valid output:

https://news.bbc.co.uk/ saved to C:\Users\jeremy\AppData\Local\Temp\tmpF77A.tmp

But thinking about it, the code should probably hide the need for the Task.FromResult() call for us. We can amend the abstract AsyncPipeline to do this easily:

public abstract class AsyncPipeline<TIn, TOut> : IAsyncPipelineStep<TIn, TOut>
{
	public Func<Task<TIn>, Task<TOut>> _pipelineSteps { get; protected set; }

	public Task<TOut> ProcessAsync(Task<TIn> Input)
	{
		return _pipelineSteps(Input);
	}

	public Task<TOut> ProcessAsync(TIn Input)
	{
		var inputTask = Task.FromResult(Input);
		return _pipelineSteps(inputTask);
	}
}

Abstracting that away means we won’t forget to do it later, and it makes the code to run the pipeline marginally neater too:

async Task Main()
{
	var pipeline = new ExampleAsyncPipeline();
	var uri = new Uri("https://news.bbc.co.uk/");

	var tempFile = await pipeline.ProcessAsync(uri);

	Console.WriteLine($"{uri} saved to {tempFile}");
}
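
If you want to check that the two ProcessAsync overloads behave the same without hitting the network, a sketch along these lines should do. TrimStep, WordCountStep and WordCountPipeline are hypothetical names made up for this example, and the interface method is assumed to be called ProcessAsync.

```csharp
using System;
using System.Threading.Tasks;

public interface IAsyncPipelineStep<INPUT, OUTPUT>
{
	Task<OUTPUT> ProcessAsync(Task<INPUT> input);
}

public static class AsyncPipelineStepExtensions
{
	public static Task<TOut> Step<TIn, TOut>(this Task<TIn> Input, IAsyncPipelineStep<TIn, TOut> Step)
	{
		return Step.ProcessAsync(Input);
	}
}

public abstract class AsyncPipeline<TIn, TOut> : IAsyncPipelineStep<TIn, TOut>
{
	public Func<Task<TIn>, Task<TOut>> _pipelineSteps { get; protected set; }

	public Task<TOut> ProcessAsync(Task<TIn> Input)
	{
		return _pipelineSteps(Input);
	}

	// Convenience overload: wrap a plain value in an already-completed Task
	public Task<TOut> ProcessAsync(TIn Input)
	{
		return _pipelineSteps(Task.FromResult(Input));
	}
}

// Hypothetical step: strips leading and trailing whitespace
public class TrimStep : IAsyncPipelineStep<string, string>
{
	public async Task<string> ProcessAsync(Task<string> Input)
	{
		return (await Input).Trim();
	}
}

// Hypothetical step: counts space-separated words
public class WordCountStep : IAsyncPipelineStep<string, int>
{
	public async Task<int> ProcessAsync(Task<string> Input)
	{
		var text = await Input;
		return text.Split(new[] { ' ' }, StringSplitOptions.RemoveEmptyEntries).Length;
	}
}

public class WordCountPipeline : AsyncPipeline<string, int>
{
	public WordCountPipeline()
	{
		_pipelineSteps = input => input
			.Step(new TrimStep())
			.Step(new WordCountStep());
	}
}

public static class OverloadDemo
{
	public static async Task Main()
	{
		var pipeline = new WordCountPipeline();
		var text = "  pipelines and async  ";

		var fromValue = await pipeline.ProcessAsync(text);                 // TIn overload
		var fromTask = await pipeline.ProcessAsync(Task.FromResult(text)); // Task<TIn> overload

		Console.WriteLine($"{fromValue} {fromTask}"); // prints 3 3
	}
}
```

The compiler picks the overload from the argument type, so callers with a plain value and callers that already hold a Task can both use the same pipeline.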

And now all our pipelines can go async if necessary…

As previously, all the code above is in a gist if you want to tinker with it. (I’ve pasted that from LINQPad, so it’s not a complete C# program, but it does include all the relevant code.)

(And I want to add a “thank you!” to Walanem Figueiredo for asking the question that prompted me to stop and think about this – it’s turned out to be a surprisingly interesting topic)

But!!! There’s a problem here, I think

The code above works, and I was going to leave it at that. But that foreshadowing… Further back-and-forth in the comments on my original gist has made me realise that this is probably not the right answer. Having the await Input logic in each pipeline step is annoying boilerplate that you have to remember every time you write a step, so that’s not ideal. But it could also be the cause of some interesting bugs, I think. And next time out I’ll explain a bit about how I realised that, and how I thought about correcting that problem.
