Task

From Banane Atomic

Description

Runs work in parallel, or runs work without tying up the UI thread.
Using Task improves responsiveness, not raw performance.

Task

Asynchronous delegate

// create the task and start it asynchronously
Task taskA = Task.Run(() => Thread.Sleep(4000));
// the status observed here depends on timing: typically WaitingToRun first, then Running
Console.WriteLine("taskA Status: {0}", taskA.Status);  // WaitingToRun
Console.WriteLine("taskA Status: {0}", taskA.Status);  // Running

// wait for the task to finish (blocking call)
taskA.Wait();
Console.WriteLine("taskA Status: {0}", taskA.Status);  // RanToCompletion

// return value
Task<int> t2 = Task.Run(() => 42);
// blocking call: waits for the task to finish
int result = t2.Result;  // 42

Chaining tasks

Task<int> doubled = Task.Run(() => 42)
.ContinueWith((antecedent) => antecedent.Result * 2);

Task<int> t = Task.Run(() => 42);
t.ContinueWith((antecedent) =>
{
    Console.WriteLine("Canceled");
}, TaskContinuationOptions.OnlyOnCanceled); // run only if the antecedent has been canceled

t.ContinueWith((antecedent) =>
{
    Console.WriteLine("Faulted");
}, TaskContinuationOptions.OnlyOnFaulted); // run only if the antecedent threw an unhandled exception

var completedTask = t.ContinueWith((antecedent) =>
{
    Console.WriteLine("Completed");
}, TaskContinuationOptions.OnlyOnRanToCompletion); // run only if the antecedent ran to completion

completedTask.Wait();

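TaskFactory and child tasks

// Task.Factory.StartNew rather than Task.Run: Task.Run creates the parent with DenyChildAttach,
// so the AttachedToParent children would not be waited for and results could still be empty
Task<int[]> parent = Task.Factory.StartNew(() =>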
{
    var results = new int[3];
    TaskFactory tf = new TaskFactory(TaskCreationOptions.AttachedToParent,
    TaskContinuationOptions.ExecuteSynchronously);
    tf.StartNew(() => results[0] = 0);
    tf.StartNew(() => results[1] = 1);
    tf.StartNew(() => results[2] = 2);
    return results;
});

var finalTask = parent.ContinueWith(parentTask =>
{
    foreach (int i in parentTask.Result)
        Console.WriteLine(i);
});

finalTask.Wait();

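// wait for all the tasks to finish (blocking call)
Task[] tasks = { Task.Delay(100), Task.Delay(200) };  // placeholder tasks
Task.WaitAll(tasks);

// wait for any one of the tasks to finish; returns the index of the completed task
int index = Task.WaitAny(tasks);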

Cancelling a task

var cancellationTokenSource = new CancellationTokenSource();
CancellationToken token = cancellationTokenSource.Token;

Task task = Task.Run(() =>
{
    while (!token.IsCancellationRequested)
    {
        Console.Write("*");
        Thread.Sleep(1000);
    }
    token.ThrowIfCancellationRequested();
}, token);

Console.WriteLine("Press enter to stop the task");
Console.ReadLine();
cancellationTokenSource.Cancel();
try
{
    task.Wait();
}
catch (AggregateException e)
{
    Console.WriteLine(e.InnerExceptions[0].Message);
}

async await

Makes asynchronous code easy to write, for example to avoid blocking the UI thread during long-running work.
An elegant alternative to BackgroundWorker.
An async method runs sequentially, but it can await blocks of code.
await lets long-running work execute without blocking the UI thread.

button.Clicked += Button_Clicked;

// An async method should return Task or Task<T>; async void is allowed but reserved for event handlers
async void Button_Clicked(object sender, EventArgs e)
{
    // asynchronous call to an async method
    await DoItAsync();

    // or await directly inside the handler
    label.Text = "Processing...";
    await Task.Delay(1000);
    label.Text = "Processing complete!";
}

// By convention, the name of an async method ends with Async
async Task DoItAsync()
{
    label1.Text = "Processing...";

    await Task.Delay(1000);

    // the code below runs only once the awaited task has completed,
    // so there is no need for a TaskCompleted event to signal the end of the task
    label1.Text = "Processing complete!";
}

Callback

private async Task DoItAsync(string param, Action callback)
{
    // Do some stuff
    await MyMethodAsync();

    // Then, when it's done, call the callback
    callback();
}

out parameter

async methods cannot have out parameters.
Return a tuple instead.

public async Task<(int status, string result)> MyMethod() {}
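
A minimal sketch of the tuple approach, written as top-level statements. TryParseAsync and its simulated delay are hypothetical names used only for illustration; they stand in for whatever the real method does.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper: where a synchronous API would expose
// `bool TryParse(string text, out int value)`, the async version returns a tuple.
static async Task<(bool success, int value)> TryParseAsync(string text)
{
    await Task.Delay(10); // stands in for real asynchronous work
    bool success = int.TryParse(text, out int value);
    return (success, value);
}

// Deconstruct the tuple at the call site.
var (ok, number) = await TryParseAsync("42");
Console.WriteLine($"{ok} {number}"); // True 42

// Or keep the tuple and read its named elements.
var attempt = await TryParseAsync("abc");
Console.WriteLine(attempt.success); // False
```

Naming the tuple elements keeps the call site as readable as an out parameter would have been.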

Console application

// lets the Main method of a console project call an async method
// (blocks the calling thread until the task completes)
string result = MyMethodAsync().GetAwaiter().GetResult();

static async Task<string> MyMethodAsync()
{
}

Fire and forget

Call an async method without waiting for the response. Exceptions will be lost.

_ = MyMethodAsync().ConfigureAwait(false);  // the discard silences warning CS4014; ConfigureAwait has no effect unless the result is awaited
MyMethodAsync().Forget();

async Task MyMethodAsync() {}

static class TaskExtension
{
    public static void Forget(this Task _)
    {
    }
}
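
Since exceptions are lost with a plain fire and forget, one possible variant attaches a continuation that runs only when the task faults. The sketch below is an assumption, not part of the original page: ForgetSafely, FailingAsync and the onError callback are hypothetical names.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper: observes the exception of a forgotten task
// by running onError only if the task ends in the Faulted state.
static void ForgetSafely(Task task, Action<Exception> onError)
{
    task.ContinueWith(
        t =>
        {
            var error = t.Exception; // AggregateException wrapping the failure
            if (error != null) onError(error.GetBaseException());
        },
        TaskContinuationOptions.OnlyOnFaulted);
}

// Hypothetical async method that fails after a short delay.
static async Task FailingAsync()
{
    await Task.Delay(10);
    throw new InvalidOperationException("boom");
}

var reported = new TaskCompletionSource<string>();
ForgetSafely(FailingAsync(), ex => reported.TrySetResult(ex.Message));

// Only for the demo: wait until the error handler has been called.
string message = await reported.Task;
Console.WriteLine(message); // boom
```

In real code the callback would typically log the exception rather than complete a TaskCompletionSource.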

WPF property

Nuget:

  • Nito.Mvvm.Async prerelease
  • FontAwesome.WPF
MainWindow.xaml
<Window xmlns:Controls="clr-namespace:System.Windows.Controls;assembly=PresentationFramework"
        xmlns:fa="http://schemas.fontawesome.io/icons/">
    <Window.Resources>
        <Controls:BooleanToVisibilityConverter x:Key="BooleanToVisibilityConverter" />
    </Window.Resources>

    <TextBox Text="{Binding Query, UpdateSourceTrigger=PropertyChanged}" />
    <TextBlock Text="{Binding ResultTask.Result}" />

    <Label Content="Loading..." 
           Visibility="{Binding ResultTask.IsNotCompleted, 
                                Converter={StaticResource BooleanToVisibilityConverter}, 
                                FallbackValue=Collapsed}"/>
    <fa:ImageAwesome Icon="Refresh" Spin="True" Height="16" Width="16" 
                     Visibility="{Binding ResultTask.IsNotCompleted, 
                     Converter={StaticResource BooleanToVisibilityConverter}, 
                     FallbackValue=Collapsed}" />

    <Label Content="{Binding NotifyValuesTask.ErrorMessage}" 
           Visibility="{Binding ResultTask.IsFaulted, 
                                Converter={StaticResource BooleanToVisibilityConverter}}"/>
MainVM.cs
private string _query;
public string Query
{
    get { return _query; }
    set
    {
        Set(() => Query, ref _query, value, true);
        ResultTask = NotifyTask.Create(GetResultAsync(_query, MyCallback));
    }
}

private void MyCallback() { /* ... */ }

private NotifyTask<string> _resultTask;
public NotifyTask<string> ResultTask
{
    get
    {
        return _resultTask;
    }
    set
    {
        Set(() => ResultTask, ref _resultTask, value, true);
    }
}

public async Task<string> GetResultAsync(string query, Action callback)
{
    var url = $"http://localhost:57157/api/v1/test/result/{query}";
    var responseMessage = await _client.GetAsync(url);
    string result = responseMessage.IsSuccessStatusCode
        ? await responseMessage.Content.ReadAsStringAsync()
        : $"{responseMessage.StatusCode}: {responseMessage.ReasonPhrase}";

    // invoke the callback before returning: code placed after a return statement never runs
    callback();
    return result;
}

Exceptions

catch (AggregateException e)
{
    Console.WriteLine("There were {0} exceptions", e.InnerExceptions.Count);
}
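
For context, a sketch of where such an AggregateException can come from and how its inner exceptions might be inspected. Fail and the two demo tasks are hypothetical; Flatten and Handle are members of AggregateException.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper that always fails, used only to fault the demo tasks.
static void Fail(Exception error) => throw error;

Task[] failing =
{
    Task.Run(() => Fail(new InvalidOperationException("first"))),
    Task.Run(() => Fail(new ArgumentException("second"))),
};

var seen = new List<string>();
try
{
    Task.WaitAll(failing); // rethrows every task failure inside one AggregateException
}
catch (AggregateException e)
{
    // Flatten() unwraps nested AggregateExceptions; Handle() runs the predicate on each
    // inner exception and rethrows those for which it returns false.
    e.Flatten().Handle(inner =>
    {
        seen.Add(inner.GetType().Name);
        return true; // mark as handled
    });
}

Console.WriteLine(seen.Count); // 2
```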

Accessing shared resources

Concurrent collections

BlockingCollection<T>: thread-safe add and remove. Add, Take. FIFO by default.
ConcurrentBag<T>: unordered, duplicates allowed. Add, TryTake, TryPeek.
ConcurrentDictionary<TKey,TValue>: TryAdd, TryUpdate, AddOrUpdate, GetOrAdd.
ConcurrentQueue<T>: FIFO. Enqueue, TryDequeue.
ConcurrentStack<T>: LIFO. Push, TryPop.

BlockingCollection<string> col = new BlockingCollection<string>();
col.Add("text");
string s = col.Take();

foreach (string v in col.GetConsumingEnumerable())
    Console.WriteLine(v);
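
GetConsumingEnumerable blocks until CompleteAdding is called, so the loop above only ends once a producer signals completion. A small producer/consumer sketch under that assumption (the bounded capacity of 2 and the variable names are illustrative choices):

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

var queue = new BlockingCollection<int>(boundedCapacity: 2);
var consumed = new List<int>();

var producer = Task.Run(() =>
{
    for (int i = 1; i <= 5; i++)
        queue.Add(i);        // blocks while the collection is full
    queue.CompleteAdding();  // without this, the consumer loop never ends
});

var consumer = Task.Run(() =>
{
    // blocks while the collection is empty, exits once adding is complete and it is drained
    foreach (int item in queue.GetConsumingEnumerable())
        consumed.Add(item);
});

await Task.WhenAll(producer, consumer);
Console.WriteLine(string.Join(",", consumed)); // 1,2,3,4,5
```

With a single producer and a single consumer, the default FIFO ordering preserves the insertion order.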

lock

Synchronizes access to shared resources.

A deadlock occurs when two threads each wait for the other.

// lock on a private reference-type object, never on a string
object _lock = new object();

int n = 0;
var up = Task.Run(() =>
{
    for (int i = 0; i < 1_000_000; i++)
    {
        lock (_lock)
            n++;
    }
});

for (int i = 0; i < 1_000_000; i++)
{
    lock (_lock)
        n--;
}

up.Wait();
Console.WriteLine(n);
// without the lock, the result differs from run to run

// lock on a static resource
public static class MyStaticResource
{
    private static readonly object _lock = new object();

    public static void DoSomething()
    {
        lock (_lock)
        { ... }
    }
}

Double-checked locking

Interlocked

Performs atomic operations: no other thread can slip in between the read and the write of a variable.

var up = Task.Run(() =>
{
    for (int i = 0; i < 1_000_000; i++)
        Interlocked.Increment(ref n);
});

for (int i = 0; i < 1_000_000; i++)
    Interlocked.Decrement(ref n);

// assigns a new value and returns the old one
int oldValue = Interlocked.Exchange(ref value, newValue);

// if value == compareTo then value = newValue; always returns the original value
Interlocked.CompareExchange(ref value, newValue, compareTo);
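
CompareExchange is usually used in a retry loop: read the value, compute the new one, and write it only if nobody changed the variable in between. The sketch below applies that pattern to an atomic maximum; InterlockedMax is a hypothetical helper, not a framework method.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: atomically raises target to candidate when candidate is larger.
static void InterlockedMax(ref int target, int candidate)
{
    int seen = Volatile.Read(ref target);
    while (candidate > seen)
    {
        // Write candidate only if target still holds the value we read.
        int previous = Interlocked.CompareExchange(ref target, candidate, seen);
        if (previous == seen) return; // our write won
        seen = previous;              // another thread changed it: retry with the fresh value
    }
}

int max = 0;
Parallel.For(0, 1000, i => InterlockedMax(ref max, i));
Console.WriteLine(max); // 999
```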

Parallel

Useful when the work is not inherently sequential, that is, when iterations are independent of one another.

Parallel.For(1, 20, i =>
{
    Console.WriteLine(i);
    Thread.Sleep(1000);
});

Parallel.ForEach(Enumerable.Range(1, 20), i =>
{
    Console.WriteLine(i);
    Thread.Sleep(1000);
});

Parallel.Invoke(
    () => {
        Console.WriteLine(1);
        Thread.Sleep(1000);
    },
    () => {
        Console.WriteLine(2);
        Thread.Sleep(1000);
    }
);

// MaxDegreeOfParallelism, CancellationToken, TaskScheduler and ParallelLoopState
Parallel.ForEach(
    numbers,
    new ParallelOptions { MaxDegreeOfParallelism = 12, TaskScheduler = null , CancellationToken = cancellationToken },
    (int i, ParallelLoopState loopState) =>
{
    if (loopState.ShouldExitCurrentIteration) // true once another iteration has called Break or Stop, or has thrown
    {
        return; // leave this iteration early; call loopState.Break() where this iteration itself decides to end the loop
    }
});
Option                    Description
MaxDegreeOfParallelism    maximum number of concurrent operations; the default (-1) sets no limit, so as much computing power as possible is used

Handling exceptions

All exceptions are caught; once every task has finished, a single AggregateException wrapping them is thrown if any occurred.

try
{
    Parallel.Invoke(
        () =>
        {
            logger.Info(1);
            var waitTime = DateTime.UtcNow.AddSeconds(4);
            while (DateTime.UtcNow < waitTime) { }
        },
        () =>
        {
            throw new Exception("222");
        }
    );
}
catch (AggregateException ex)
{
    var innerExceptions = ex.InnerExceptions; // ReadOnlyCollection<Exception>
}

Shared variable

Lock

static object syncLocker = new();

var sum = 0m; // shared variable, updated by threads
Parallel.For(0, 100, i =>
{
    lock(syncLocker) // only 1 thread at a time can access
    {
        sum += 0.5m; // code inside the lock should take as little time as possible
    }
});
To avoid deadlocks:
  • use 1 lock object for each shared resource

Interlocked

Faster than lock, but Interlocked.Add and Interlocked.Increment only work with integer types (int, long); there is no decimal overload.

int sum = 0; // shared variable, updated by threads
Parallel.For(0, 100, i =>
{
    Interlocked.Add(ref sum, 1);
});
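
When the shared value is not an integer (the decimal sum of the Lock example, for instance), one option is the Parallel.For overload with thread-local state: each worker accumulates a private subtotal and takes the lock only once, when merging. A sketch, with gate and subtotal as illustrative names:

```csharp
using System;
using System.Threading.Tasks;

object gate = new object();
decimal sum = 0m; // shared variable, written only in localFinally

Parallel.For(0, 100,
    () => 0m,                                          // localInit: one private subtotal per worker
    (i, loopState, subtotal) => subtotal + 0.5m,       // body: touches no shared state, no lock needed
    subtotal => { lock (gate) { sum += subtotal; } }); // localFinally: merge once per worker

Console.WriteLine(sum == 50m); // True
```

The lock is still there, but it is taken a handful of times instead of once per iteration.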

Tasks

var task1 = Task1(cancellationToken);
var task2 = Task2(cancellationToken);

await Task.WhenAll(task1, task2);

var result1 = task1.Result;
var result2 = task2.Result;

private async Task<bool> Task1(CancellationToken cancellationToken)
{
    await Task.Delay(1000, cancellationToken);
    return true;
}


var ids = Enumerable.Range(1, 30).ToList();
var tasks = ids.Select(x => Task.Run(() =>
{
    Thread.Sleep(1000);
    Console.WriteLine(x);
}));
await Task.WhenAll(tasks);

Dataflow's ActionBlock

using System.Threading.Tasks.Dataflow;

var ids = Enumerable.Range(1, 30).ToList();

var block = new ActionBlock<int>(
    x =>
    {
        Thread.Sleep(1000);
        Console.WriteLine($"{x}/{ids.Count}");
    },
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });

ids.ForEach(x => block.Post(x));

block.Complete();       // Signal completion
await block.Completion; // Asynchronously wait for completion.

ParallelQuery

using MoreLinq;

var ids = Enumerable.Range(1, 30).ToList();

var dop = 2;
var tasks = ids.Batch((ids.Count / dop) + 1)
    .AsParallel()
    .WithDegreeOfParallelism(dop)
    .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
    .Select(
        (batchedIds, workerId) =>
        {
            var batchedTasks = batchedIds.Select(x => Task.Run(() =>
            {
                Thread.Sleep(1000);
                Console.WriteLine(x);
            }));
            return batchedTasks;
        })
    .SelectMany(x => x);

foreach (var task in tasks)
{
    await task;
}

PLINQ

var numbers = Enumerable.Range(0, 100_000_000);

var parallelResult = numbers.AsParallel().AsOrdered()
  .Where(i => i % 2 == 0);

// ForAll iterates in parallel: the order is lost, even after AsOrdered()
// iteration starts before parallelResult has been fully computed
parallelResult.ForAll(e => Console.WriteLine(e));

ForEachAsync

// extension method (must be declared in a static class)
// dop = degree of parallelism = number of partitions processed concurrently
public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
{
    return Task.WhenAll(
        from partition in Partitioner.Create(source).GetPartitions(dop)
        select Task.Run(async delegate
        {
            using (partition)
                while (partition.MoveNext())
                    await body(partition.Current);
        }));
}

await myItems.ForEachAsync(100, async (myItem) =>
{
    myItem.MyProperty = await MyMethodAsync();
});