Hemme's Blog

Stories from behind the keyboard

Some days ago Raffaele Rialdi spoke at the 18th DotNetMarche Workshop about .NET Parallel Extensions and other goodies. By the way, if you speak Italian, don't miss the recordings of the event: go here and grab Raffaele’s sessions and Alkampfer’s full recap of LINQ.

In this post I will use the .NET Parallel Extensions to implement a simple producer/consumer pattern. The scenario is quite simple: a method, a.k.a. the producer, keeps pushing data into a collection (in my case, a queue) while one or more methods, a.k.a. the consumers, access the collection in parallel and grab their own bits of data. To keep things simple, I’m implementing these methods as lambdas.

The queue I’m going to use is a ConcurrentQueue<T>: it lives in a new namespace called System.Collections.Concurrent and it is thread-safe, so several threads can enqueue and dequeue items at the same time without any explicit locking.

var queue = new ConcurrentQueue<int>();
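As a quick illustration of the queue's basic operations, here is a minimal single-threaded sketch. The QueueBasics class and its Drain() method are names made up for this example; only Enqueue(), TryDequeue() and Count come from ConcurrentQueue<T> itself.

```csharp
using System;
using System.Collections.Concurrent;

static class QueueBasics
{
    // Hypothetical helper: enqueues three numbers, then drains the queue
    // and returns the items in the order they were dequeued.
    public static int[] Drain()
    {
        var queue = new ConcurrentQueue<int>();
        queue.Enqueue(1);
        queue.Enqueue(2);
        queue.Enqueue(3);

        var result = new int[queue.Count];
        int i = 0;
        int n;
        // TryDequeue returns false (instead of throwing) when the queue is empty.
        while (queue.TryDequeue(out n))
        {
            result[i++] = n;
        }
        return result;
    }

    static void Main()
    {
        Console.WriteLine(string.Join(",", Drain())); // prints 1,2,3
    }
}
```

With a single thread the queue behaves as a plain FIFO; the Try... pattern is what makes it convenient when several threads compete for the same items.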

The producer inserts 1000 numbers into the queue, pausing 10 milliseconds after each insertion. The ForAll() extension method is available on ParallelQuery objects, which are the “parallelizable” version of a query or an IEnumerable<T>. In the following code, I use Enumerable.Range() to get one thousand integers, convert this IEnumerable<int> to a ParallelQuery<int> with AsParallel() and, finally, set up a parallel computation that pushes the integers into the queue. Because PLINQ splits the range among several worker threads, the numbers are not necessarily enqueued in ascending order.

Action producer = () => {
  Enumerable.Range(0, 1000).AsParallel().ForAll(n => {
    queue.Enqueue(n);
    Thread.Sleep(10);
  });
};
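To see what AsParallel().ForAll() does on its own, here is a small sketch that drops the Thread.Sleep() and simply checks the result. ProducerSketch and Produce() are illustrative names, not part of the original code.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;

static class ProducerSketch
{
    // Hypothetical helper: enqueues the numbers 0..count-1 from PLINQ worker threads.
    // ForAll() returns only after every element has been processed.
    public static ConcurrentQueue<int> Produce(int count)
    {
        var queue = new ConcurrentQueue<int>();
        Enumerable.Range(0, count).AsParallel().ForAll(n => queue.Enqueue(n));
        return queue;
    }

    static void Main()
    {
        var queue = Produce(1000);

        // Every number arrives exactly once...
        Console.WriteLine(queue.Count); // prints 1000

        // ...but the arrival order is not guaranteed, so sort before comparing.
        var sorted = queue.OrderBy(n => n);
        Console.WriteLine(sorted.SequenceEqual(Enumerable.Range(0, 1000))); // prints True
    }
}
```

The sort before the comparison is the point of the sketch: the queue holds all the numbers, but nothing can be assumed about the order in which the worker threads delivered them.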

The consumer is meant to consume the queue, trying to pull out the next number, if available. If the queue looks empty, the consumer returns and writes the number of items it could dequeue; this also means that a consumer can stop early if it finds the queue momentarily empty while the producer is still running. In this simulation, each cycle of the consumer is delayed by a random Thread.Sleep() of less than 150 milliseconds. The following code declares a Random object and the consumer Action (the integer index the consumer receives is used to label the messages it writes to the Console):

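var r = new Random();

Action<int> consumer = idx => {
  var count = 0;
  do {
    // Random is not thread-safe: serialize access to the shared instance.
    int delay;
    lock (r) { delay = r.Next(150); }
    Thread.Sleep(delay);

    int n;
    if (queue.TryDequeue(out n))
    {
      count++;
      Console.Write("({0}:{1})", idx, n);
    }
  } while (!queue.IsEmpty); // stop as soon as the queue looks empty

  Console.Write("\n[#{0}: count={1}]", idx, count);
};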
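Since the loop above ends as soon as the queue looks empty, a consumer that outpaces the producer may quit before all the data has arrived. One possible alternative (not the approach used in this post) is to wrap the queue in a BlockingCollection<T>, which also lives in System.Collections.Concurrent: consumers block while the collection is empty and stop only after the producer calls CompleteAdding(). The sketch below assumes this variant; BlockingSketch and Run() are made-up names, and the delays are left out to keep it short.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

static class BlockingSketch
{
    // Hypothetical helper: one producer, `consumers` consumers.
    // Returns how many items each consumer managed to take.
    public static int[] Run(int items, int consumers)
    {
        var counts = new int[consumers];

        using (var buffer = new BlockingCollection<int>(new ConcurrentQueue<int>()))
        {
            Action producer = () =>
            {
                for (int n = 0; n < items; n++)
                {
                    buffer.Add(n);
                }
                // Tells the consumers that no more items will arrive.
                buffer.CompleteAdding();
            };

            Action<int> consumer = idx =>
            {
                // Blocks while the collection is empty; the loop ends only when
                // the collection is both empty and marked as complete.
                foreach (var n in buffer.GetConsumingEnumerable())
                {
                    counts[idx]++; // each index is written by one consumer only
                }
            };

            Parallel.Invoke(producer, () => Parallel.For(0, consumers, consumer));
        }

        return counts;
    }

    static void Main()
    {
        var counts = Run(1000, 10);
        Console.WriteLine(counts.Sum()); // prints 1000
    }
}
```

How the items are split among the consumers still varies from run to run, but the total always matches what the producer sent.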

The following instruction is the magic that starts the producer and – in parallel – activates ten instances of the consumer Action:

Parallel.Invoke(producer, ()=>Parallel.For(0,10,consumer));

All of this is done thanks to the Parallel static class, which belongs to the brand new System.Threading.Tasks namespace. Parallel.Invoke() runs all the actions it receives, possibly in parallel, and returns only when every one of them has completed. In this case, it gets only two actions: the producer, and another one (written as a lambda expression) that calls the Parallel.For() method. Parallel.For() is the parallel equivalent of a for (int i = 0; i < max; i++) statement: in the previous code, it cycles from 0 (included) to 10 (excluded) and passes each value to a consumer action, letting the iterations run concurrently as worker threads become available.
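To make the behaviour of the two methods more concrete, here is a minimal sketch that just counts how many delegates get executed. InvokeSketch and Run() are illustrative names; Interlocked.Increment() is used because the counter is shared among threads.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class InvokeSketch
{
    // Hypothetical helper: returns the number of delegate calls performed.
    public static int Run()
    {
        int calls = 0;

        Parallel.Invoke(
            // First action: runs once.
            () => { Interlocked.Increment(ref calls); },
            // Second action: runs the loop body for i = 0..9, i.e. ten times.
            () => Parallel.For(0, 10, i => { Interlocked.Increment(ref calls); }));

        // Parallel.Invoke() has returned, so both actions have completed.
        return calls;
    }

    static void Main()
    {
        Console.WriteLine(Run()); // prints 11
    }
}
```

Both Parallel.Invoke() and Parallel.For() block the caller until the work is done, which is why the counter can be read safely right after the call.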

You can test the code inside a simple Console application project or, as I did, using LINQPad. The following is a sample of the output I got running the code with LINQPad. As you can see, the various instances of the consumer take turns grabbing numbers from the ConcurrentQueue<int>, each one consuming data at a different speed because of the random duration passed to Thread.Sleep().

...
(2:890)(7:922)(4:891)(0:923)(9:892)(6:924)(8:893)(5:925)(6:894)(1:926)(8:895)
(1:927)(3:896)(2:928)(7:897)(4:929)(6:898)(5:930)(1:899)(7:931)(0:900)(2:932)
(2:901)(2:933)(9:902)(3:934)(8:903)(7:935)(2:904)(4:936)(6:905)(5:937)(8:938)
(1:906)(6:939)(0:907)(9:940)(3:908)(5:941)(9:909)(7:942)(9:910)(6:943)(2:911)
(1:944)(5:976)(8:945)(8:977)(6:946)(4:978)(8:947)(7:979)(6:980)(0:948)(3:949)
(5:981)(1:982)(2:950)(8:983)(9:951)(2:984)(8:952)(6:953)(1:985)(5:954)(9:986)
(7:987)(4:955)(3:988)(0:956)(1:957)(3:989)(4:958)(8:990)(3:991)(6:959)(2:960)
(9:992)(0:993)(7:961)(5:962)(9:994)(4:995)(9:963)(7:964)(9:996)(1:997)(8:965)
(2:966)(4:998)(3:967)(6:999)(5:968)(0:969)(0:970)(7:971)(9:972)(7:973)(3:974)
(0:975)
[#0: count=113]
[#9: count=71]
[#1: count=115]
[#8: count=89]
[#2: count=107]
[#5: count=108]
[#4: count=77]
[#6: count=110]
[#3: count=112]
[#7: count=98]

The code described in this post lets you try some of the new classes (and patterns) included in the .NET Parallel Extensions. These classes provide powerful yet simple-to-use methods that let you implement parallel algorithms.

Happy programming!

1 comment:

Diego said...

Can be cool to try using Rx Framework and consume your list with code like:

var obs = list
.ToObservable(Scheduler.NewThread)
.BufferWithCount(TimeSpan.FromSeconds(interval), 1);
using (obs.Subscribe(i => {...})