Reader Level:
ARTICLE

Functional Programming with C#: Advanced Asynchronous Programming

Posted by Matthew Cochran Articles | Learn .NET January 21, 2008
I wrote about functional asynchronous programming in an earlier article and now will expand on the topic to look at ways to re-synchronize results from asynchronous function calls.
  • 0
  • 0
  • 21494

Introduction

Before we get started, I just have to say that I've been very impressed with what is achievable with a few simple and reusable extension methods in the 3.5 Framework and I would bet that this is just the tip of the iceberg of what we'll see coming out in the next few years using these features.

There are cases where we would like to launch a number of concurrent processes but then need a way to consolidate the output and continue once all of the processes have completed. We'll look at the simple cases first and work towards the more complex ones.

Part I. Simple Concurrency Example

For our first example, let's say we want to perform a series of web requests in parallel and perform some sort of action once all the responses are back as in the following code:

Action<String> f1 = uri =>
{
    WebResponse response = WebRequest.Create(uri).GetResponse();
    Console.WriteLine(String.Format("{0}: ContentType= {1}", uri, response.ContentType));
};

Action final = delegate() { Console.WriteLine("Done"); };

String[] sites = { "http://c-sharpcorner.com", "http://google.com", "http://microsoft.com" };

sites.ForEachParallel(f1, final);

I introduced a ForEach() extension method in my introductory article on functional programming, and a DoAsync() extension method in the previous async article.  We'll take a look at how we can combine these two patterns in order to execute each call concurrently and continue with another method once all concurrent methods have completed.

public static void ForEach<T>(this IEnumerable<T> items, Action<T> act)
{
    foreach (T item in items)
        act(item);
}

public static void DoAsync<TInput>(this Action<TInput> f, TInput arg, Action callback)
{
    f.BeginInvoke(arg, x => { callback(); }, null);
}

The trick is going to be waiting until all responses are back before invoking the "final" delegate.  We will accomplish this by keeping a count of the completed calls and when we have them all back, we'll invoke the consolidation delegate.

public static void ForEachParallel<T>(this IEnumerable<T> items, Action<T> act, Action finalMethod)
{
    Int32
        total = items.Count(),
        count = 0;

    items.ForEach(x =>
        act.DoAsync(x, delegate()
        {
            if (++count == total)
                finalMethod();
        })
    );
}

It is that simple... We now are able to execute a number of calls concurrently and then continue with another process when all the results are in.

Part II. ForEach() with Return Results

A more likely scenario is when we have a series of long-running processes (such as our web request) and want to process them concurrently, but wait for all the results before continuing.  In this sample (below) we see a Func<String, String> instead of an Action<String> because we are going to the url and returning a string instead of writing it to the console right away.  In this case, we want to get all the resulting strings back before we decide what to do with them.  This may be important if we wanted to fail everything if there was an exception thrown during one of the concurrent calls (which could not be done with our previous sample).

Func<String, String> f1 = uri =>
{
    WebResponse response = WebRequest.Create(uri).GetResponse();
    return String.Format("{0}: ContentType= {1}", uri, response.ContentType);
};

String[] sites = { "http://c-sharpcorner.com", "http://google.com", "http://microsoft.com" };

sites.ForEachParallel(
    f1,
    result => result.ForEach(val => Console.WriteLine(val))
);

Console.ReadLine();

We can use the same technique as in our other ForEachParallel() extension method, but this time we keep track of the results in a List<> instead of explicitly counting each result.  Once all the results are in, we call the consolidating "finalMethod" delegate.

public static void ForEachParallel<TInput, TOutput>(this IEnumerable<TInput> items, Func<TInput, TOutput> f, Action<IEnumerable<TOutput>> finalMethod)
{
    Int32 count = items.Count();

    List<TOutput> results = new List<TOutput>(count);
    items.ForEach(x =>
        f.DoAsync(x, y =>
        {
            results.Add(y);

            if (results.Count == count)
                finalMethod(results);
        })
    );
}

Part III. From the Delegate's Perspective

Situations will probably occur where we need to perform async calls from the delegate's perspective rather than the iterator and then have a consolidating process once all the results are in.  Let's take a look at performing similar functionality as we had in Part I of this article but using an extension method off of the delegate.

Action<String> act = uri =>
{
    WebResponse response = WebRequest.Create(uri).GetResponse();
    Console.WriteLine(String.Format("{0}: ContentType= {1}", uri, response.ContentType));
};

Action final = delegate() { Console.WriteLine("Done"); };

String[] sites = { "http://c-sharpcorner.com", "http://google.com", "http://microsoft.com" };

act.DoParallel(sites, final);

Console.ReadLine();

The implementation code is very similar to our first example where we are keeping track of how many methods have completed execution:

public static void DoParallel<TInput>(this Action<TInput> f, IEnumerable<TInput> arg, Action callback)
{
    Int32
        total = arg.Count(),
        count = 0;

    arg.ForEach(value =>
        f.BeginInvoke(value, result =>
        {
            if (++count == total)
                callback();
        }, null)
    );
}

Part IV. DoParallel() with Return Results

Just as we discussed previously, a more likely situation is when the process we want concurrently will have a return result and we want a method to consolidate the results once all concurrent processes have completed.

String[] sites = { "http://c-sharpcorner.com", "http://google.com", "http://microsoft.com" };

Func
<String, String> f = uri =>
{
    WebResponse response = WebRequest.Create(uri).GetResponse();
    return String.Format("{0}: ContentType= {1}", uri, response.ContentType);
};

f.DoParallel(sites, result => result.ForEach(val => Console.WriteLine(val)));

Console
.ReadLine();

Using the same pattern as before, we just keep track of all of the results in a list.

public static void DoParallel<TInput, TOutput>(this Func<TInput, TOutput> f, IEnumerable<TInput> input, Action<IEnumerable<TOutput>> consolidate)
{
    Int32 total = input.Count();
    List<TOutput> results = new List<TOutput>();

    // queue up all the work
    input.ForEach(x => f.DoAsync(x, y =>
    {
        results.Add(y);

        // if all results are in, execute consolidation delegate
        if (results.Count == total)
            consolidate(results);
    }));
}

Part V.  DoParallel() with Multiple Inputs

Although more unlikely, we may have situations where we want to execute a method with multiple inputs in parallel (if we have IEnumerable<> groups of each input).  This takes a bit more code because we have to iterate through each input manually.  Also, we have to be careful of the implications... What if the two input parameters have different lengths?  What if one input is not enumerable?  These are all situations that will most likely be solution-specific, so I'll give you an example of the most general case that will probably have to be tweaked for you application.

 public static void DoParallel<TInput1, TInput2, TOutput>(this Func<TInput1, TInput2, TOutput> f, IEnumerable<TInput1> input, IEnumerable<TInput2> input2, Action<IEnumerable<TOutput>> consolidate)
{
    Int32 total = input.Count();

    List<TOutput> results = new List<TOutput>();

    IEnumerator<TInput1> eInput1 = input.GetEnumerator();
    IEnumerator<TInput2> eInput2 = input2.GetEnumerator();

    // queue up all the work
    while (eInput1.MoveNext() && eInput2.MoveNext())
    {
        f.DoAsync(eInput1.Current, eInput2.Current,
            x =>
            {
                results.Add(x);

                // if all results are in, execute consolidation delegate
                if (results.Count == total)
                    consolidate(results);
            });
    }
}

As you can see, the extension methods we are using are very simple and reusable.  This is one of the things I find exciting about this approach to coding. I hope you found this article useful.

Until next time,
Happy coding

COMMENT USING

Trending up