Task Parallel Library

The .NET Framework has many great APIs that make application development simpler. Instead of spending time writing your own code for mundane tasks such as sorting, multi-threading, or handling connections, you can leverage the many built-in APIs that .NET provides. One such API is the Task Parallel Library (TPL).

TPL is a .NET API that handles much of the complexity of multi-threading, which makes developing multi-threaded applications simpler. Below are some benefits of the TPL and its Task type:

  • Representation of “work”
  • Encapsulates synchronous/asynchronous operations
  • Abstracts the complexity of thread handling (e.g. thread pools)
  • Can be executed synchronously, in parallel, or asynchronously
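
As a rough illustration of the last point, the sketch below runs the same piece of "work" in three ways. The class and method names (TaskExecutionModes, Sum, and so on) are hypothetical and chosen only for this example; it assumes a plain console application with the default task scheduler.

```csharp
using System;
using System.Threading.Tasks;

public static class TaskExecutionModes
{
    // Hypothetical unit of "work": sum the integers 1..n.
    public static int Sum(int n)
    {
        int total = 0;
        for (int i = 1; i <= n; i++) total += i;
        return total;
    }

    // Synchronous: the task is run on the current scheduler, which in a
    // plain console app normally means inline on the calling thread.
    public static int RunSynchronously(int n)
    {
        var task = new Task<int>(() => Sum(n));
        task.RunSynchronously();
        return task.Result;
    }

    // Parallel: both tasks are queued to the thread pool and may overlap.
    public static int[] RunInParallel(int a, int b)
    {
        Task<int> first = Task.Run(() => Sum(a));
        Task<int> second = Task.Run(() => Sum(b));
        Task.WaitAll(first, second);
        return new[] { first.Result, second.Result };
    }

    // Asynchronous: the caller gets a Task<int> back and can await it later.
    public static async Task<int> RunAsync(int n)
    {
        return await Task.Run(() => Sum(n));
    }

    public static async Task Main()
    {
        Console.WriteLine(RunSynchronously(10));          // 55
        int[] results = RunInParallel(10, 100);
        Console.WriteLine($"{results[0]}, {results[1]}"); // 55, 5050
        Console.WriteLine(await RunAsync(4));             // 10
    }
}
```

The work itself (Sum) is the same in all three cases; only the way the task is started and observed differs.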

Note: as with threading in general, TPL does not guarantee that additional threads are used. Even when code is written in a way that supports threading, the application may still run the work on the calling thread.
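
One case where no extra thread is involved can be sketched as follows: awaiting a task that is already complete does not suspend the method, so everything runs on the calling thread. The names NoExtraThread, GetCachedValueAsync and StaysOnCallingThreadAsync are hypothetical, made up for this illustration.

```csharp
using System;
using System.Threading.Tasks;

public static class NoExtraThread
{
    // Returns an already-completed task, so awaiting it never suspends.
    public static Task<int> GetCachedValueAsync() => Task.FromResult(42);

    // Compares the managed thread id before and after the await.
    public static async Task<bool> StaysOnCallingThreadAsync()
    {
        int before = Environment.CurrentManagedThreadId;
        int value = await GetCachedValueAsync();
        int after = Environment.CurrentManagedThreadId;
        return before == after && value == 42;
    }

    public static void Main()
    {
        Task<bool> check = StaysOnCallingThreadAsync();
        // The method ran to completion synchronously on the caller's thread.
        Console.WriteLine(check.IsCompleted); // True
        Console.WriteLine(check.Result);      // True
    }
}
```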

Async & Await

  • The “async” modifier marks a method as asynchronous; such a method usually returns a Task or Task<T>.
  • Async methods use the “await” keyword to mark a point where execution can be suspended and later resumed.
  • “await” suspends the async method until the awaited task completes. Control returns to the caller in the meantime, so the thread that called the async method is not blocked.
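
The suspension and resumption behaviour can be made visible with a small sketch. It uses a TaskCompletionSource as a manually controlled "gate"; the names AwaitResumption, WorkAsync and Run are hypothetical and exist only for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class AwaitResumption
{
    // Records a step, awaits the gate, then records a second step.
    public static async Task WorkAsync(Task gate, List<string> log)
    {
        log.Add("work: before await");
        await gate; // gate is not complete yet, so control returns to the caller
        log.Add("work: after await");
    }

    public static List<string> Run()
    {
        var log = new List<string>();
        var gate = new TaskCompletionSource<bool>();

        Task work = WorkAsync(gate.Task, log); // returns at the await
        log.Add("caller: continues while work is suspended");

        gate.SetResult(true); // lets WorkAsync resume after its await
        work.Wait();          // make sure the resumed part has finished
        return log;
    }

    public static void Main()
    {
        foreach (string line in Run()) Console.WriteLine(line);
        // work: before await
        // caller: continues while work is suspended
        // work: after await
    }
}
```

The caller's log entry lands between the two entries written by WorkAsync, which shows that the await handed control back instead of blocking.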

source – examples of async await
source – MS async example

Demo 1: Async await code

using System.Threading.Tasks;
using NLog;

namespace Example
{
    class Program
    {
        static async Task Main(string[] args)
        {
            Logger logger = LogManager.GetCurrentClassLogger();
            logger.Info("start Program");

            var breakfastPreper = new BreakfastPreper();
            // await is needed for the methods below because they do not block
            // the main thread; without await (or a blocking call such as
            // Console.ReadLine()) Main would return and the process would exit early
            await breakfastPreper.MakeOderlyBreakfast();
            await breakfastPreper.MakeConcurrentBreakfast();
            logger.Info("End Program");
        }
    }
    public class BreakfastPreper
    {
        private readonly Logger logger = LogManager.GetCurrentClassLogger();
        internal async Task MakeOderlyBreakfast()
        {
            var pourCoffee = this.AsyncTask("PourCoffee", 3);
            await pourCoffee;
            logger.Info("PourCoffee is ready");

            var fryEggsTask = this.AsyncTask("FryEggs", 3);
            await fryEggsTask;
            logger.Info("FryEggs is ready");

            var fryBanngers = this.AsyncTask("FryBanngers", 3);
            await fryBanngers;
            logger.Info("FryBanngers is ready");

            var toastBread = this.AsyncTask("ToastBread", 3);
            await toastBread;
            logger.Info("ToastBread is ready");

            var pourOJ = this.AsyncTask("PourOJ", 3);
            await pourOJ;
            logger.Info("PourOJ is ready");
        }
        internal async Task MakeConcurrentBreakfast()
        {
            var pourCoffee = this.AsyncTask("PourCoffee", 6);
            var fryEggsTask = this.AsyncTaskWithSubTask("FryEggs");
            var fryBanngers = this.AsyncTask("FryBanngers", 10);
            var toastBread = this.AsyncTask("ToastBread", 10);

            await Task.WhenAll(pourCoffee, fryEggsTask, fryBanngers, toastBread);
            logger.Info("PourCoffee is ready");
            logger.Info("FryEggs is ready");
            logger.Info("FryBanngers is ready");
            logger.Info("ToastBread is ready");

            var pourOJ = this.AsyncTask("PourOJ", 5);
            await pourOJ;
            logger.Info("PourOJ is ready");
        }
        internal Task AsyncTask(string task, int seconds)
        {
            return Task.Run(async () =>
            {
                for (int i = 0; i < seconds; i++)
                {
                    logger.Info("{0}...", task);
                    await Task.Delay(1000);
                }
            });
        }
        internal Task AsyncTaskWithSubTask(string taskWithSubTask, int eggs = 5)
        {
            return Task.Run(async () =>
            {
                for (int i = 0; i < eggs; i++)
                {
                    await Task.WhenAll(
                        this.AsyncSubTask(1, "CrackEgg"),
                        this.AsyncSubTask(1, "SaltEgg")
                    );

                    logger.Info("[{0}] {1} done ...", i, taskWithSubTask);
                }
            });
        }
        internal Task AsyncSubTask(int subTask, string subTaskName, int seconds = 1)
        {
            return Task.Run(async () =>
            {
                for (int i = 0; i < seconds; i++)
                {
                    logger.Info("[{0}] {1}...", subTask, subTaskName);
                    await Task.Delay(1000);
                }
            });
        }
    }
}

Task.Factory.StartNew(…) or Task.Run(…)

Task.Factory.StartNew() returns a Task<T>, where T is the return type of the delegate passed in. An async delegate itself returns a Task (or Task<T>), so combining the two yields a Task<Task> (or Task<Task<T>>). The problem is that waiting on the outer task with Task.WaitAll() only waits until the delegate reaches its first await that actually suspends. At that point the delegate returns the inner task and the outer task completes; the wait does not cover the inner task, which represents the whole operation.

To work around this, unwrap the task returned by Task.Factory.StartNew() before waiting on it with Task.WaitAll(). Alternatively, use Task.Run(), which unwraps the inner task for you.

Task<Task> startNewTask = Task.Factory.StartNew(...);
Task actualTask = startNewTask.Unwrap();
Task.WaitAll(actualTask);
// OR
Task runTask = Task.Run(...);
Task.WaitAll(runTask);

source – WaitAll() not waiting for Task.Factory.StartNew()
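
The difference between the outer and the inner task can be observed directly. The following is a minimal sketch, assuming a console application; StartNewVersusRun and Compare are hypothetical names, and a TaskCompletionSource stands in for real asynchronous work so the timing is under our control.

```csharp
using System;
using System.Threading.Tasks;

public static class StartNewVersusRun
{
    public static bool[] Compare()
    {
        var gate = new TaskCompletionSource<bool>();
        Func<Task> body = async () => { await gate.Task; };

        Task<Task> outer = Task.Factory.StartNew(body); // task wrapping a task
        Task run = Task.Run(body);                      // already unwrapped

        outer.Wait(); // returns as soon as body reaches its await
        bool innerDoneEarly = outer.Result.IsCompleted; // false: gate still closed
        bool runDoneEarly = run.IsCompleted;            // false: covers the whole body

        gate.SetResult(true);
        Task.WaitAll(outer.Unwrap(), run); // now waits for the whole operation

        return new[] { innerDoneEarly, runDoneEarly, outer.Result.IsCompleted, run.IsCompleted };
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Compare())); // False, False, True, True
    }
}
```

Waiting on the outer task succeeds while the inner task is still pending, which is exactly the situation in which Task.WaitAll() appears to return too early.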

Demo 2: Task.Factory.StartNew with async await

using System.Linq;
using System.Threading.Tasks;
using NLog;
using System.Threading;

namespace Example
{
    class Program
    {
        static void Main(string[] args)
        {
            Logger logger = LogManager.GetCurrentClassLogger();
            logger.Info("start Program");
            (new TaskTester()).StartTest();
            logger.Info("End Program");
        }
    }
    public class TaskTester
    {
        private readonly Logger logger = LogManager.GetCurrentClassLogger();
        public void StartTest()
        {
            var tasker = new TaskExample();
            WaitForCompletion(
                tasker.DoWork(3).Unwrap(),
                tasker.DoWork(2).Unwrap(),
                tasker.DoWork(1).Unwrap()
            );
        }
        public void WaitForCompletion(params Task[] tasks)
        {
            do
            {
                logger.Info("Still running ...");
                Thread.Sleep(1000);
            }
            while (!tasks.All(x => x.IsCompleted));
            Task.WaitAll(tasks.ToArray());
        }
    }
    public class TaskExample
    {
        private readonly Logger logger = LogManager.GetCurrentClassLogger();
        public Task<Task> DoWork(int work)
        {
            return Task.Factory.StartNew(async () =>
            {
                logger.Info("[{0}] start work", work);
                for (int i = 0; i < work; i++)
                {
                    logger.Info("[{0}] work...", work);
                    await Task.Delay(1000);
                }
                logger.Info("[{0}] done work", work);
            });
        }
    }
}

Async Exception

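When a task throws an exception, the exception is stored on the task. Blocking calls such as Task.Wait(), Task.WaitAll() or Task.Result rethrow it wrapped in an AggregateException, which aggregates the exceptions from all of the tasks that failed. By contrast, “await” rethrows only the first of those exceptions, unwrapped.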
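
A small sketch of how task exceptions surface, comparing a blocking wait with await. The names TaskExceptions, FailAsync and the two helper methods are hypothetical and used only for illustration.

```csharp
using System;
using System.Threading.Tasks;

public static class TaskExceptions
{
    // Hypothetical helper: an async method that always fails.
    public static async Task FailAsync(string message)
    {
        await Task.Yield();
        throw new InvalidOperationException(message);
    }

    // Blocking wait: the failures of all tasks arrive in one AggregateException.
    public static int CountFailuresWithWaitAll()
    {
        try
        {
            Task.WaitAll(FailAsync("first"), FailAsync("second"));
            return 0;
        }
        catch (AggregateException ex)
        {
            return ex.InnerExceptions.Count;
        }
    }

    // await rethrows a single exception without the AggregateException wrapper;
    // the full set is still available on the task's Exception property.
    public static async Task<string> DescribeFailureWithAwaitAsync()
    {
        Task all = Task.WhenAll(FailAsync("first"), FailAsync("second"));
        try
        {
            await all;
            return "no failure";
        }
        catch (InvalidOperationException ex)
        {
            return $"{ex.GetType().Name}, {all.Exception.InnerExceptions.Count} recorded";
        }
    }

    public static async Task Main()
    {
        Console.WriteLine(CountFailuresWithWaitAll());            // 2
        Console.WriteLine(await DescribeFailureWithAwaitAsync()); // InvalidOperationException, 2 recorded
    }
}
```

If all failures matter when using await, inspect the Exception property of the task returned by Task.WhenAll() rather than relying on the single exception that is caught.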
