Aller au contenu

« Task » : différence entre les versions

De Banane Atomic
 
(118 versions intermédiaires par le même utilisateur non affichées)
Ligne 1 : Ligne 1 :
[[Category:CSharp]]
[[Category:CSharp]]
= Description =
= Links =
Permet d'effectuer des tâches en parallèle ou des tâches sans consommer le thread de l'UI.<br />
* [[LINQ#Select_async|Select async]]
L'utilisation de Task permet de gagner en réactivité mais pas en performance.
* [[LINQ#Select_async|SelectMany async]]


= [https://msdn.microsoft.com/en-us/library/system.threading.tasks.task(v=vs.110).aspx Task] =
= Task =
Délégué asynchrone
A Task represents an asynchronous operation.
<kode lang=csharp>
<kode lang=cs>
// création et lancement asynchrone de la tâche
// create and run a task in a new thread
Task taskA = Task.Run(() => Thread.Sleep(4000));
var result = await Task.Run(async () =>  
Console.WriteLine("taskA Status: {0}", taskA.Status); // WaitingToRun
{
Console.WriteLine("taskA Status: {0}", taskA.Status);  // Running
    await Task.Delay(4000);
    return 0;
});
</kode>
 
= [https://stackoverflow.com/questions/27464287/what-is-the-difference-between-await-taskt-and-taskt-result await vs Result] =
{| class="wikitable wtp"
! await
! Result
|-
| asynchronous wait || blocking wait
|-
| "yield" the current thread || block the current thread
|-
| (re-)raise the exception || wrap the exception in an AggregateException
|}


// attente de la fin de la tâche, code bloquant
= WhenAll =
taskA.Wait();
{{info | The order of the output of WhenAll is the same as the order of the input tasks.}}
Console.WriteLine("taskA Status: {0}", taskA.Status); // RanToCompletion
<kode lang='cs'>
var inputAndTask1 = new { Input = 1, Task = GetDataAsync(1) };
var inputAndTask2 = new { Input = 2, Task = GetDataAsync(2) };
var inputAndTask3 = new { Input = 3, Task = GetDataAsync(3) };


// valeur de retour
await Task.WhenAll(task1.Task, task2.Task, task3.Task);
Task<int> t2 = Task.Run(() => 42 );
// code bloquant, attente de la fin de la tâche
t2.Result; // 42
</kode>


= Enchaînement de tâches =
// Result allow to acces to the result of the task without blocking because the WhenAll ensure all the tasks are finished
<kode lang=cs>
Console.WriteLine($"Input: {inputAndTask1.Input} - Output: {inputAndTask1.Task.Result}"));
Task<int> t = Task.Run(() => 42)
Console.WriteLine($"Input: {inputAndTask2.Input} - Output: {inputAndTask2.Task.Result}"));
.ContinueWith((antecedent) => antecedent.Result * 2);
Console.WriteLine($"Input: {inputAndTask3.Input} - Output: {inputAndTask3.Task.Result}"));


Task<int> t = Task.Run(() => 42);
async Task<int> GetDataAsync(int i)
t.ContinueWith((antecedent) =>
{
{
     Console.WriteLine("Canceled");
    await Task.Delay(2000);
}, TaskContinuationOptions.OnlyOnCanceled); // run only if the antecedent has been canceled
     Console.WriteLine($"{DateTime.Now} - GetDataAsync - {i}");
    return i * 2;
}
</kode>


t.ContinueWith((antecedent) =>
<kode lang='cs'>
{
var inputAndTasks = Enumerable.Range(1, 10)
     Console.WriteLine("Faulted");
    .Select(i => new { Input = i, Task = GetDataAsync(i) })
}, TaskContinuationOptions.OnlyOnFaulted); // run only if the antecedent threw an unhandled exception
     .ToList();


var completedTask = t.ContinueWith((antecedent) =>
await Task.WhenAll(inputAndTasks.Select(x => x.Task));
{
    Console.WriteLine("Completed");
}, TaskContinuationOptions.OnlyOnRanToCompletion); // run only if the antecedant ran to completion


completedTask.Wait();
// Result allow to acces to the result of the task without blocking because the WhenAll ensure all the tasks are finished
inputAndTasks.ForEach(x => Console.WriteLine($"{x.Input} - {x.Task.Result}"));
</kode>
</kode>


= TaskFactory et sous-tâches =
<kode lang='cs'>
<kode lang='cs'>
Task<int[]> parent = Task.Run(() =>
var inputs = Enumerable.Range(1, 10).ToArray();
{
    var results = new int[3];
    TaskFactory tf = new TaskFactory(TaskCreationOptions.AttachedToParent,
    TaskContinuationOptions.ExecuteSynchronously);
    tf.StartNew(() => results[0] = 0);
    tf.StartNew(() => results[1] = 1);
    tf.StartNew(() => results[2] = 2);
    return results;
});
 
var finalTask = parent.ContinueWith(parentTask =>
{
    foreach (int i in parentTask.Result)
        Console.WriteLine(i);
});


finalTask.Wait();
var outputs = await Task.WhenAll(inputs.Select(GetDataAsync));


// exécution simultanée des tâches
// as the order is the same, you can Zip
Task[] tasks;
var inputAndOutputs = inputs
Task.WaitAll(tasks);
    .Zip(outputs, (input, output) => (input, output))
    .ToArray();


// attende de la fin d'une des tâches
inputAndOutputs.ForEach(x => Console.WriteLine($"{input} - {output}"));
int index = Task.WaitAny(tasks);
</kode>
</kode>


= Annuler une tâche =
= Cancel a task =
<kode lang='cs'>
<kode lang='cs'>
var cancellationTokenSource = new CancellationTokenSource();
var cancellationTokenSource = new CancellationTokenSource(5000); // cancel after 5s
CancellationToken token = cancellationTokenSource.Token;
CancellationToken token = cancellationTokenSource.Token;


Task task = Task.Run(() =>
try
{
{
     while (!token.IsCancellationRequested)
     await Task.Run(async () =>
     {
     {
         Console.Write("*");
         var i = 1;
        Thread.Sleep(1000);
        while (!token.IsCancellationRequested) // break the loop if cancellation is requested
    }
        {
    token.ThrowIfCancellationRequested();
            Console.Write($"{i++} ");
}, token);
            await Task.Delay(1000);
        }


Console.WriteLine("Press enter to stop the task");
        token.ThrowIfCancellationRequested(); // or throw an OperationCanceledException
Console.ReadLine();
     }, token);
cancellationTokenSource.Cancel();
try
{
     task.Wait();
}
}
catch (AggregateException e)
catch (OperationCanceledException e)
{
{
     Console.WriteLine(e.InnerExceptions[0].Message);
     Console.WriteLine(e.Message);
}
}
cancellationTokenSource.Token.Register(() => {
    /* what to when the token is cancelled */
});
</kode>
</kode>


= async await =
= [https://www.meziantou.net/fire-and-forget-a-task-in-dotnet.htm Fire and forget] =
Permet d'écrire facilement du code asynchrone, exemple: ne pas bloquer le thread graphique lors de tâches longues à exécuter.<br>
Call an async method without waiting for the response. Exceptions will be lost.
Alternative élégante au [[BackgroundWorker_et_Dispatcher|BackgroundWorker]]<br />
<kode lang='cs'>
Une méthode {{boxx|async}} tourne séquentiellement mais permet de faire des appels {{boxx|await}} sur des blocs de code.<br />
// without await, the method is called and the following lines of code are executed without waiting the end of MyMethodAsync
{{boxx|await}} permet d’exécuter des tâches longues sans bloquer le thread UI.
MyMethodAsync();
* [https://msdn.microsoft.com/en-us/library/mt674882.aspx Programmation asynchrone]
* [http://blog.stephencleary.com/2012/02/async-and-await.html Guidelines]
<kode lang=cs>
button.Clicked += Button_Clicked;


// La méthode async doit retourner Task ou Task<T>, void est possible mais réservé au handler d'événement
// without await, the code is executed in background
async void Button_Clicked(object sender, EventArgs e)
// you may need to create a scope if you want to consume scoped services
Task.Run(async () =>
{
{
     // appel asynchrone d'une méthode asynchrone
     Method1();
     await DoItAsync();
     await Method2Async(); // wait the end of Method2Async before calling Method3
 
     Method3();
    // ou exécution direct du code avec le mot clé await
});
    label.Text = "Traitement en cours !!!";
     await Task.Delay(1000);
    label.Text = "Traitement terminé !!!";
}
 
// La méthode async doit retourner Task ou Task<T>, void est possible mais réservé au handler d'événement
// Le nom de la méthode doit se terminer par Async (convention)
async Task DoItAsync()
{
    label1.Text = "Traitement en cours !!!";


    await Task.Delay(1000);
MyMethodAsync().Forget();


    // le code suivant ne sera exécute qu'une fois la tache terminée
async Task MyMethodAsync() {}
    // ce qui simplifie le code en évitant la création d'un event TaskCompleted pour avertir de la fin de la tache
    label1.Text = "Traitement terminé !!!";
}
</kode>


== Callback ==
public static class TaskExtension
<kode lang='cs'>
private async Task DoItAsync(string param, Action callback)
{
{
     // Do some stuff
     public static void Forget(this Task task)
     await MyMethodAsync();
     {
        if (!task.IsCompleted || task.IsFaulted)
        {
            _ = ForgetAwaited(task);
        }


    // Then, when it's done, call the callback
        async static Task ForgetAwaited(Task task)
    callback();
        {
</kode>
            await task.ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
        }
    }


== Paramètre out ==
    // simple version: returns void
Les méthodes async ne supportent pas les paramètres out.<br>
    public static async void Forget(this Task task)
Utiliser un tuple comme paramètre de retour à la place.
    {
<kode lang='cs'>
        await task.ConfigureAwait(false); // ConfigureAwait to avoid deadlock
public async Task<(int status, string result)> MyMethod() {}
    }
</kode>
 
== Application Console ==
<kode lang='cs'>
// permet d'appeler une méthode async dans la méthode Main d'un projet Console
string result = MyMethodAsync().GetAwaiter().GetResult();
 
static async Task<string> MyMethodAsync()
{
}
}
</kode>
</kode>


== [https://stackoverflow.com/questions/48212998/how-to-call-an-async-method-from-a-property-setter/48217792?noredirect=1#comment83625536_48217792 Propriété WPF] ==
= [https://stackoverflow.com/questions/48212998/how-to-call-an-async-method-from-a-property-setter/48217792?noredirect=1#comment83625536_48217792 Propriété WPF] =
Nuget:
Nuget:
* {{boxx|Nito.Mvvm.Async}} prerelease
* {{boxx|Nito.Mvvm.Async}} prerelease
* {{boxx|FontAwesome.WPF}}
* {{boxx|FontAwesome.WPF}}
<filebox fn='MainWindow.xaml'>
 
<filebox fn='MainWindow.xaml' collapsed>
<Window xmlns:Controls="clr-namespace:System.Windows.Controls;assembly=PresentationFramework"
<Window xmlns:Controls="clr-namespace:System.Windows.Controls;assembly=PresentationFramework"
         xmlns:fa="http://schemas.fontawesome.io/icons/">
         xmlns:fa="http://schemas.fontawesome.io/icons/">
Ligne 194 : Ligne 172 :
                                 Converter={StaticResource BooleanToVisibilityConverter}}"/>
                                 Converter={StaticResource BooleanToVisibilityConverter}}"/>
</filebox>
</filebox>
<filebox fn='MainVM.cs'>
 
<filebox fn='MainVM.cs' collapsed>
private string _query;
private string _query;
public string Query
public string Query
Ligne 238 : Ligne 217 :
</filebox>
</filebox>


= Exceptions =
= Parallel =
Utile si le code n'est pas séquentiel.
<kode lang='cs'>
// for loop
Parallel.For(1, 20, i =>
{
    Console.WriteLine(i);
    Thread.Sleep(1000);
});
 
// for each loop
Parallel.ForEach(Enumerable.Range(1, 20), i =>
{
    Console.WriteLine(i);
    Thread.Sleep(1000);
});
 
// invoke actions
Parallel.Invoke(
    () => {
        Console.WriteLine(1);
        Thread.Sleep(1000);
    },
    () => {
        Console.WriteLine(2);
        Thread.Sleep(1000);
    }
);
</kode>
 
== Parallel options ==
<kode lang='cs'>
// after 4s throw an OperationCanceledException
// no further operations will start but don't stop currently executing operations
var cancellationTokenSource = new CancellationTokenSource(4000);
 
var parallelOptions = new ParallelOptions
{
    MaxDegreeOfParallelism = 12, // by default use as much computer power as possible
    TaskScheduler = null,
    CancellationToken = cancellationTokenSource.Token
}
 
Parallel.ForEach(
    numbers,
    parallelOptions,
    (int i, ParallelLoopState loopState) =>
{
    if (loopState.ShouldExitCurrentIteration) // check if another iteration has requested to break
    {
        loopState.Break(); // break loop
    }
 
    if (!cancellationTokenSource.Token.IsCancellationRequested) { /* next operation step */ } // useful for long operation to break
});
</kode>
 
== Handling exceptions ==
All the exceptions are catched and when all the tasks have been executed then an AggregateException is thrown if any.
<kode lang='cs'>
<kode lang='cs'>
catch (AggregateException e)
try
{
    Parallel.Invoke(
        () =>
        {
            var waitTime = DateTime.UtcNow.AddSeconds(4);
            while (DateTime.UtcNow < waitTime) { }
        },
        () =>
        {
            throw new Exception("MyException");
        }
    );
}
catch (AggregateException ex)
{
{
     Console.WriteLine("There where {0} exceptions", e.InnerExceptions.Count);
     ex.InnerExceptions; // ReadOnlyCollection<Exception>
}
}
</kode>
</kode>


= Accès aux ressources =
== Shared variable ==
=== [https://learn.microsoft.com/en-us/dotnet/csharp/language-reference/statements/lock Lock] ===
<kode lang='cs'>
private readonly object sumLock = new();
 
var sum = 0m; // shared variable, updated by threads
Parallel.For(0, 100, i =>
{
    lock(sumLock) // only 1 thread at a time can access
    {
        sum += 0.5m; // code inside the lock should take as little time as possible
    }
});
</kode>
{{warn | To avoid deadlocks:
* use 1 lock object for each shared resource
* avoid nested locks
* use a {{boxx|new object}}}}
 
=== Interlocked ===
Create thread-safe atomic operations.
{{warn | Faster than lock, but {{boxx|Interlocked}} only works with integers.}}
<kode lang='cs'>
int sum = 0; // shared variable, updated by threads
Parallel.For(0, 100, i =>
{
    Interlocked.Increment(ref sum); // add 1 to sum and return sum + 1
    Interlocked.Add(ref sum, 2);    // add 2 to sum and return sum + 2
});
</kode>
 
== AsyncLocal ==
Allow to have a different variable for each async task.
<kode lang='cs'>
private static AsyncLocal<decimal?> asyncLocal = new();
Parallel.For(0, 100, async (i) =>
{
    asyncLocal.Value = 10; // the asyncLocal is not shared among async tasks
});
</kode>
 
== Concurrent collections ==
== Concurrent collections ==
{| class="wikitable wtp wtmono1"  
{| class="wikitable wtp wtmono1"  
Ligne 269 : Ligne 360 :
</kode>
</kode>


== lock ==
== Tasks ==
Permet de synchroniser l'accès aux ressources.
{{warn | Deadlock si 2 threads s'attendent l'un l'autre.}}
<kode lang='cs'>
<kode lang='cs'>
// pour lock, utiliser un élément private de type ref sauf string
// AsParallel PLINQ
object _lock = new object();
var tasks = Enumerable.Range(1, 30).AsParallel().Select(x => MyTaskAsync(x));
await Task.WhenAll(tasks);
 
// ForEachAsync
await Parallel.ForEachAsync(Enumerable.Range(1, 30), async (x, token) => await MyTaskAsync(x));
 
// Task.Run run task in a new thread
// even without Task.Run, Task.WhenAll will run the tasks in parallel and then for all of them to be done
var tasks = Enumerable.Range(1, 30).Select(x => Task.Run(() => MyTaskAsync(x)));
await Task.WhenAll(tasks);


int n = 0;
private async Task MyTaskAsync(int i)
var up = Task.Run(() =>
{
{
     for (int i = 0; i < 1_000_000; i++)
     Console.WriteLine(i);
     {
     await Task.Delay(4000);
        lock (_lock)
}
            n++;
</kode>
    }
});


for (int i = 0; i < 1_000_000; i++)
== Handle exceptions with Task.WhenAll ==
=== The problem ===
If at least one exception occurred among the tasks, it is not possible to get the result of the working tasks.
<kode lang='cs' collapsed>
// create 10 jobs in parallel, the jobs 5 and 7 will raise an Exception
IEnumerable<Task<Job>> createJobTasks = Enumerable.Range(1, 10).Select(x => CreateJobAsync($"Job {x}"));
Task<Job[]> mainTask = Task.WhenAll(createJobTasks);
try
{
{
     lock (_lock)
     var jobs = await mainTask;
        n--;
}
}
catch (Exception e)
{
    e.Message; // Error Job 5, only the first exception is catched
    // AggregateException
    mainTask.Exception.Message; // One or more errors occurred. (Error Job 5) (Error Job 7)
    // ReadOnlyCollection<Exception>
    mainTask.InnerExceptions; // [0] Error Job 5, [1] Error Job 7
}
async Task<Job> CreateJobAsync(string name)
{
    await Task.Delay(1);
    if (name.EndsWith("5") || name.EndsWith("7") )
        throw new Exception($"Error {name}");


up.Wait();
    return new Job { Name = name };
Console.WriteLine(n);
}
// sans lock, le résultat est toujours différent
</kode>


<kode lang='cs'>
class Job
// lock sur une ressource statique
public static class MyStaticResource
{
{
    static object _lock = new object();
     public string Name { get; set; }
 
     public static void DoSomething()
    {
        lock (_lock)
        { ... }
    }
}
}
</kode>
</kode>
[[Design_Patterns#Double-checked_locking|Double-checked locking]]


== Interlocked ==
=== Solution: use the task ids ===
Permet de réaliser des opérations atomiques. Évite qu'un autre thread s’insère entre la lecture et l'écriture d'une variable.
<kode lang='cs'>
<kode lang='cs'>
var up = Task.Run(() =>
List<(string, Task<Job>)> jobNameWithCreateJobTasks = Enumerable.Range(1, 10).Select(x => (jobName: $"Job {x}", task: CreateJobAsync($"Job {x}"))).ToList();
{
Dictionary<int, string> jobNameFromTaskId = jobNameWithCreateJobTasks.ToDictionary(x => x.task.Id, x => x.jobName);
    for (int i = 0; i < 1_000_000; i++)
List<Task<Job>> createJobTasks = jobNameWithCreateJobTasks.Select(x => x.task).ToList();
        Interlocked.Increment(ref n);
});


for (int i = 0; i < 1_000_000; i++)
List<(string JobName, Job? Job, string ErrorMessage)> results;
    Interlocked.Decrement(ref n);
var results = await TaskExtension.WhenAll(createJobTasks);


// assigne une nouvelle valeur et retourne l'ancienne valeure.
results.Select(x => new
int oldValue = Interlocked.Exchange(ref value, newValue);
    {
 
        JobName = x.Result?.JobName ?? jobNameFromTaskId[x.taskId],
// si value == compareTo alors value = newValue
        Job = x.Result,
Interlocked.CompareExchange(ref value, newValue, compareTo);
        GlobalErrorMessage = x.Exception?.Message ?? string.Empty
    }));
</kode>
</kode>


= Parallel =
<filebox fn='TaskExtension.cs' collapsed>
Utile si le code n'est pas séquentiel.
public static async Task<IReadOnlyCollection<(int taskId, T? Result, AggregateException? Exception)>> WhenAll<T>(IReadOnlyCollection<Task<T>> tasks)
<kode lang='cs'>
Parallel.For(0, 10, i =>
{
{
     Thread.Sleep(1000);
     ArgumentNullException.ThrowIfNull(tasks);
     Console.WriteLine(i);
 
});
     List<(int taskId, T? Result, AggregateException? Exception)> results = new();


var numbers = Enumerable.Range(10, 20);
    try
Parallel.ForEach(numbers, (int i, ParallelLoopState loopState) =>
    {
{
        await Task.WhenAll(tasks);
    Thread.Sleep(1000);
    }
    Console.WriteLine(i);
    catch
    {
        results.AddRange(tasks.Where(x => x.IsFaulted).Select(x => (x.Id, (T?)default, x.Exception)));
    }
    finally
    {
        results.AddRange(
            tasks.Where(x => x.IsCanceled).Select(x => (x.Id, (T?)default,
            (AggregateException?)new AggregateException(new[] { new TaskCanceledException(x) }))));
        results.AddRange(
            tasks.Where(x => x.IsCompletedSuccessfully).Select(x => (x.Id, (T?)x.Result,
            (AggregateException?)default)));
    }


     // break loop
     return results;
    loopState.Break();
}
});
</filebox>
</kode>


== Tasks ==
=== [https://stackoverflow.com/questions/55887028/is-it-possible-to-get-successful-results-from-a-task-whenall-when-one-of-the-tas Solution: use ContinueWith] ===
<kode lang='cs'>
<kode lang='cs' collapsed>
var ids = Enumerable.Range(1, 30).ToList();
var (jobs, exceptions) = await WhenAllWithExceptions(createJobTasks);


var tasks = ids.Select(x => Task.Run(() =>
static Task<(T[] Results, Exception[] Exceptions)> WhenAllWithExceptions<T>(IReadOnlyCollection<Task<T>> tasks)
{
{
     Thread.Sleep(1000);
     ArgumentNullException.ThrowIfNull(tasks);
    Console.WriteLine(x);
}));


await Task.WhenAll(tasks);
    return Task.WhenAll(tasks).ContinueWith(t =>
</kode>
        {
            T[] results = tasks
                .Where(t => t.IsCompletedSuccessfully)
                .Select(t => t.Result)
                .ToArray();


== Dataflow's ActionBlock ==
            var aggregateExceptions = tasks
<kode lang='cs'>
                .Where(t => t.IsFaulted)
using System.Threading.Tasks.Dataflow;
                .Select(t => t.Exception!);


var ids = Enumerable.Range(1, 30).ToList();
            var exceptions = new AggregateException(aggregateExceptions)
                .Flatten()
                .InnerExceptions
                .ToArray();


var block = new ActionBlock<int>(
            // No exceptions and at least one task was canceled
    x =>
            if (exceptions.Length == 0 && t.IsCanceled)
    {
            {
        Thread.Sleep(1000);
                exceptions = new[] { new TaskCanceledException(t) };
        Console.WriteLine($"{x}/{ids.Count}");
            }
    },
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });


ids.ForEach(x => block.Post(x));
            return (results, exceptions);
 
        },
block.Complete();       // Signal completion
        default,
await block.Completion; // Asynchronously wait for completion.
        TaskContinuationOptions.DenyChildAttach | TaskContinuationOptions.ExecuteSynchronously,
        TaskScheduler.Default);
}
</kode>
</kode>


== ParallelQuery ==
=== [https://thesharperdev.com/csharps-whenall-and-exception-handling/ Solution: wrapping the task into a TaskResult] ===
<kode lang='cs'>
Instead of having a unique try/catch for all the tasks, have it for each task.
using MoreLinq;
<kode lang='cs' collapsed>
// the created jobs are wrapped into TaskResult to handle Exception
IEnumerable<Task<TaskResult<Job>>> createJobTasks = Enumerable.Range(1, 10).Select(x => CreateJobAsync($"Job {x}").ToTaskResultAsync());
var jobs = await Task.WhenAll(createJobTasks);


var ids = Enumerable.Range(1, 30).ToList();
// if the TaskResult is a succes then access to the Result otherwise access to the Exception ErrorMessage
var writeTasks = jobs.Select(x => Task.Run(() => Console.WriteLine(x.Success ? x.Result.Name : x.ErrorMessage)));
await Task.WhenAll(writeTasks);
</kode>


var dop = 2;
<filebox fn='TaskExtension.cs' collapsed>
var tasks = ids.Batch((ids.Count / dop) + 1)
public static class TaskExtension
    .AsParallel()
{
    .WithDegreeOfParallelism(dop)
     public static async Task<TaskResult<T>> ToTaskResultAsync<T>(this Task<T> task)
     .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
     {
     .Select(
         try
         (batchedIds, workerId) =>
         {
         {
             var batchedTasks = batchedIds.Select(x => Task.Run(() =>
             return new TaskResult<T> { Result = await task };
            {
        }
                Thread.Sleep(1000);
        catch (Exception e)
                Console.WriteLine(x);
        {
             }));
             return new TaskResult<T> { Exception = e };
            return batchedTasks;
         }
         })
     }
     .SelectMany(x => x);


foreach (var task in tasks)
    public class TaskResult<T>
{
    {
     await task;
        public T? Result;
        public Exception? Exception { get; set; }
        public string ErrorMessage => Exception?.InnerException?.Message ?? Exception?.Message ?? string.Empty;
        public bool Success => Exception is null;
     }
}
}
</kode>
</filebox>


== PLINQ ==
== PLINQ ==
{{boxx|AsParallel}} analyses the query to see if it is suitable for parallelization. This analysis adds overhead.<br>
If it is unsafe or faster to run sequentially then it won't be run in parallel.
<kode lang='cs'>
<kode lang='cs'>
var numbers = Enumerable.Range(0, 100_000_000);
var numbers = Enumerable.Range(0, 100_000_000);


var parallelResult = numbers.AsParallel().AsOrdered()
var parallelResult = numbers.AsParallel()
  .Where(i => i % 2 == 0);
                            .WithDegreeOfParallelism(2)
                            .WithCancellation(token)
                            .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
                            .WithMergeOptions(ParallelMergeOptions.Default)
                            .AsOrdered() // add overhead
                            .Where(i => i % 2 == 0);


// parcourt d'itération en mode parallèle, l'ordre est perdu.
// parcourt d'itération en mode parallèle, l'ordre est perdu.
// le parcourt commence même si parallelResult n'est pas au complet
// le parcourt commence même si parallelResult n'est pas au complet
parallelResult.ForAll(e => Console.WriteLine(e));
parallelResult.ForAll(e => Console.WriteLine(e));
</kode>
== [https://blogs.msdn.microsoft.com/pfxteam/2012/03/05/implementing-a-simple-foreachasync-part-2/ ForEachAsync] ==
<kode lang='cs'>
// extension de méthode
// dop = degree of parallelism = max number of threads
public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
{
    return Task.WhenAll(
        from partition in Partitioner.Create(source).GetPartitions(dop)
        select Task.Run(async delegate
        {
            using (partition)
                while (partition.MoveNext())
                    await body(partition.Current);
        }));
}
await myItems.ForEachAsync(100, async (myItem) =>
{
    myItem.MyProperty = await MyMethodAsync();
});
</kode>
</kode>

Dernière version du 25 novembre 2024 à 11:18

Links

Task

A Task represents an asynchronous operation.

// create and run a task in a new thread
var result = await Task.Run(async () => 
{
    await Task.Delay(4000);
    return 0;
});

await vs Result

await Result
asynchronous wait blocking wait
"yield" the current thread block the current thread
(re-)raise the exception wrap the exception in an AggregateException

WhenAll

The order of the output of WhenAll is the same as the order of the input tasks.
var inputAndTask1 = new { Input = 1, Task = GetDataAsync(1) };
var inputAndTask2 = new { Input = 2, Task = GetDataAsync(2) };
var inputAndTask3 = new { Input = 3, Task = GetDataAsync(3) };

await Task.WhenAll(task1.Task, task2.Task, task3.Task);

// Result allow to acces to the result of the task without blocking because the WhenAll ensure all the tasks are finished
Console.WriteLine($"Input: {inputAndTask1.Input} - Output: {inputAndTask1.Task.Result}"));
Console.WriteLine($"Input: {inputAndTask2.Input} - Output: {inputAndTask2.Task.Result}"));
Console.WriteLine($"Input: {inputAndTask3.Input} - Output: {inputAndTask3.Task.Result}"));

async Task<int> GetDataAsync(int i)
{
    await Task.Delay(2000);
    Console.WriteLine($"{DateTime.Now} - GetDataAsync - {i}");
    return i * 2;
}
var inputAndTasks = Enumerable.Range(1, 10)
    .Select(i => new { Input = i, Task = GetDataAsync(i) })
    .ToList();

await Task.WhenAll(inputAndTasks.Select(x => x.Task));

// Result allow to acces to the result of the task without blocking because the WhenAll ensure all the tasks are finished
inputAndTasks.ForEach(x => Console.WriteLine($"{x.Input} - {x.Task.Result}"));
var inputs = Enumerable.Range(1, 10).ToArray();

var outputs = await Task.WhenAll(inputs.Select(GetDataAsync));

// as the order is the same, you can Zip
var inputAndOutputs = inputs
    .Zip(outputs, (input, output) => (input, output))
    .ToArray();

inputAndOutputs.ForEach(x => Console.WriteLine($"{input} - {output}"));

Cancel a task

var cancellationTokenSource = new CancellationTokenSource(5000); // cancel after 5s
CancellationToken token = cancellationTokenSource.Token;

try
{
    await Task.Run(async () =>
    {
        var i = 1;
        while (!token.IsCancellationRequested) // break the loop if cancellation is requested
        {
            Console.Write($"{i++} ");
            await Task.Delay(1000);
        }

        token.ThrowIfCancellationRequested(); // or throw an OperationCanceledException
    }, token);
}
catch (OperationCanceledException e)
{
    Console.WriteLine(e.Message);
}

cancellationTokenSource.Token.Register(() => {
    /* what to when the token is cancelled */
});

Fire and forget

Call an async method without waiting for the response. Exceptions will be lost.

// without await, the method is called and the following lines of code are executed without waiting the end of MyMethodAsync
MyMethodAsync();

// without await, the code is executed in background
// you may need to create a scope if you want to consume scoped services
Task.Run(async () =>
{
    Method1();
    await Method2Async(); // wait the end of Method2Async before calling Method3
    Method3();
});

MyMethodAsync().Forget();

async Task MyMethodAsync() {}

public static class TaskExtension
{
    public static void Forget(this Task task)
    {
        if (!task.IsCompleted || task.IsFaulted)
        {
            _ = ForgetAwaited(task);
        }

        async static Task ForgetAwaited(Task task)
        {
            await task.ConfigureAwait(ConfigureAwaitOptions.SuppressThrowing);
        }
    }

    // simple version: returns void
    public static async void Forget(this Task task)
    {
        await task.ConfigureAwait(false);  // ConfigureAwait to avoid deadlock
    }
}

Propriété WPF

Nuget:

  • Nito.Mvvm.Async prerelease
  • FontAwesome.WPF
MainWindow.xaml
<Window xmlns:Controls="clr-namespace:System.Windows.Controls;assembly=PresentationFramework"
        xmlns:fa="http://schemas.fontawesome.io/icons/">
    <Window.Resources>
        <Controls:BooleanToVisibilityConverter x:Key="BooleanToVisibilityConverter" />
    </Window.Resources>

    <TextBox Text="{Binding Query, UpdateSourceTrigger=PropertyChanged}" />
    <TextBlock Text="{Binding ResultTask.Result}" />

    <Label Content="Loading..." 
           Visibility="{Binding ResultTask.IsNotCompleted, 
                                Converter={StaticResource BooleanToVisibilityConverter}, 
                                FallbackValue=Collapsed}"/>
    <fa:ImageAwesome Icon="Refresh" Spin="True" Height="16" Width="16" 
                     Visibility="{Binding ResultTask.IsNotCompleted, 
                     Converter={StaticResource BooleanToVisibilityConverter}, 
                     FallbackValue=Collapsed}" />

    <Label Content="{Binding NotifyValuesTask.ErrorMessage}" 
           Visibility="{Binding ResultTask.IsFaulted, 
                                Converter={StaticResource BooleanToVisibilityConverter}}"/>
MainVM.cs
private string _query;
public string Query
{
    get { return _query; }
    set
    {
        Set(() => Query, ref _query, value, true);
        ResultTask = NotifyTask.Create(GetResultAsync(_query, MyCallback));
    }
}

private void MyCallback() { /* ... */ }

private NotifyTask<string> _resultTask;
public NotifyTask<string> ResultTask
{
    get
    {
        return _resultTask;
    }
    set
    {
        Set(() => ResultTask, ref _resultTask, value, true);
    }
}

public async Task<string> GetResultAsync(string query, Action callback)
{
    var url = $"http://localhost:57157/api/v1/test/result/{query}";
    var responseMessage = await _client.GetAsync(url);
    if (responseMessage.IsSuccessStatusCode)
    {
        return await responseMessage.Content.ReadAsStringAsync();
    }
    else
    {
        return await Task.FromResult($"{responseMessage.StatusCode}: {responseMessage.ReasonPhrase}");
    }

    callback();
}

Parallel

Utile si le code n'est pas séquentiel.

// for loop
Parallel.For(1, 20, i =>
{
    Console.WriteLine(i);
    Thread.Sleep(1000);
});

// for each loop
Parallel.ForEach(Enumerable.Range(1, 20), i =>
{
    Console.WriteLine(i);
    Thread.Sleep(1000);
});

// invoke actions
Parallel.Invoke(
    () => {
        Console.WriteLine(1);
        Thread.Sleep(1000);
    },
    () => {
        Console.WriteLine(2);
        Thread.Sleep(1000);
    }
);

Parallel options

// after 4s throw an OperationCanceledException
// no further operations will start but don't stop currently executing operations
var cancellationTokenSource = new CancellationTokenSource(4000);

var parallelOptions = new ParallelOptions
{
    MaxDegreeOfParallelism = 12, // by default use as much computer power as possible
    TaskScheduler = null,
    CancellationToken = cancellationTokenSource.Token
}

Parallel.ForEach(
    numbers,
    parallelOptions,
    (int i, ParallelLoopState loopState) =>
{
    if (loopState.ShouldExitCurrentIteration) // check if another iteration has requested to break
    {
        loopState.Break(); // break loop
    }

    if (!cancellationTokenSource.Token.IsCancellationRequested) { /* next operation step */ } // useful for long operation to break
});

Handling exceptions

All the exceptions are catched and when all the tasks have been executed then an AggregateException is thrown if any.

try
{
    Parallel.Invoke(
        () =>
        {
            var waitTime = DateTime.UtcNow.AddSeconds(4);
            while (DateTime.UtcNow < waitTime) { }
        },
        () =>
        {
            throw new Exception("MyException");
        }
    );
}
catch (AggregateException ex)
{
    ex.InnerExceptions; // ReadOnlyCollection<Exception>
}

Shared variable

Lock

private readonly object sumLock = new();

var sum = 0m; // shared variable, updated by threads
Parallel.For(0, 100, i =>
{
    lock(sumLock) // only 1 thread at a time can access
    {
        sum += 0.5m; // code inside the lock should take as little time as possible
    }
});
To avoid deadlocks:
  • use 1 lock object for each shared resource
  • avoid nested locks
  • use a new object

Interlocked

Create thread-safe atomic operations.

Faster than lock, but Interlocked only works with integers.
int sum = 0; // shared variable, updated by threads
Parallel.For(0, 100, i =>
{
    Interlocked.Increment(ref sum); // add 1 to sum and return sum + 1
    Interlocked.Add(ref sum, 2);    // add 2 to sum and return sum + 2
});

AsyncLocal

Allow to have a different variable for each async task.

private static AsyncLocal<decimal?> asyncLocal = new();
Parallel.For(0, 100, async (i) =>
{
    asyncLocal.Value = 10; // the asyncLocal is not shared among async tasks
});

Concurrent collections

BlockingCollection<T> ajout et suppression thread-safe. Add, Take. FIFO par défaut.
ConcurrentBag<T> sans ordre, doublons autorisés. Add, TryTake, TryPeek.
ConcurrentDictionary<TKey,T> TryAdd, TryUpdate, AddOrUpdate, GetOrAdd.
ConcurrentQueue<T> FIFO. Enqueue, TryDequeue.
ConcurrentStack<T> LIFO. Push, TryPop.
BlockingCollection<string> col = new BlockingCollection<string>();
col.Add("text");
string s = col.Take();

foreach (string v in col.GetConsumingEnumerable())
    Console.WriteLine(v);

Tasks

// AsParallel PLINQ
var tasks = Enumerable.Range(1, 30).AsParallel().Select(x => MyTaskAsync(x));
await Task.WhenAll(tasks);

// ForEachAsync
await Parallel.ForEachAsync(Enumerable.Range(1, 30), async (x, token) => await MyTaskAsync(x));

// Task.Run run task in a new thread
// even without Task.Run, Task.WhenAll will run the tasks in parallel and then for all of them to be done
var tasks = Enumerable.Range(1, 30).Select(x => Task.Run(() => MyTaskAsync(x)));
await Task.WhenAll(tasks);

private async Task MyTaskAsync(int i)
{
    Console.WriteLine(i);
    await Task.Delay(4000);
}

Handle exceptions with Task.WhenAll

The problem

If at least one exception occurred among the tasks, it is not possible to get the result of the working tasks.

// create 10 jobs in parallel, the jobs 5 and 7 will raise an Exception
IEnumerable<Task<Job>> createJobTasks = Enumerable.Range(1, 10).Select(x => CreateJobAsync($"Job {x}"));
Task<Job[]> mainTask = Task.WhenAll(createJobTasks);
try
{
    var jobs = await mainTask;
}
catch (Exception e)
{
    e.Message; // Error Job 5, only the first exception is catched
    // AggregateException
    mainTask.Exception.Message; // One or more errors occurred. (Error Job 5) (Error Job 7)
    // ReadOnlyCollection<Exception>
    mainTask.InnerExceptions; // [0] Error Job 5, [1] Error Job 7
} 

async Task<Job> CreateJobAsync(string name)
{
    await Task.Delay(1);

    if (name.EndsWith("5") || name.EndsWith("7") )
        throw new Exception($"Error {name}");

    return new Job { Name = name };
}

class Job
{
    public string Name { get; set; }
}

Solution: use the task ids

List<(string, Task<Job>)> jobNameWithCreateJobTasks = Enumerable.Range(1, 10).Select(x => (jobName: $"Job {x}", task: CreateJobAsync($"Job {x}"))).ToList();
Dictionary<int, string> jobNameFromTaskId = jobNameWithCreateJobTasks.ToDictionary(x => x.task.Id, x => x.jobName);
List<Task<Job>> createJobTasks = jobNameWithCreateJobTasks.Select(x => x.task).ToList();

List<(string JobName, Job? Job, string ErrorMessage)> results;
var results = await TaskExtension.WhenAll(createJobTasks);

results.Select(x => new
    {
        JobName = x.Result?.JobName ?? jobNameFromTaskId[x.taskId],
        Job = x.Result,
        GlobalErrorMessage = x.Exception?.Message ?? string.Empty
    }));
TaskExtension.cs
public static async Task<IReadOnlyCollection<(int taskId, T? Result, AggregateException? Exception)>> WhenAll<T>(IReadOnlyCollection<Task<T>> tasks)
{
    ArgumentNullException.ThrowIfNull(tasks);

    List<(int taskId, T? Result, AggregateException? Exception)> results = new();

    try
    {
        await Task.WhenAll(tasks);
    }
    catch
    {
        results.AddRange(tasks.Where(x => x.IsFaulted).Select(x => (x.Id, (T?)default, x.Exception)));
    }
    finally
    {
        results.AddRange(
            tasks.Where(x => x.IsCanceled).Select(x => (x.Id, (T?)default,
            (AggregateException?)new AggregateException(new[] { new TaskCanceledException(x) }))));
        results.AddRange(
            tasks.Where(x => x.IsCompletedSuccessfully).Select(x => (x.Id, (T?)x.Result,
            (AggregateException?)default)));
    }

    return results;
}

Solution: use ContinueWith

var (jobs, exceptions) = await WhenAllWithExceptions(createJobTasks);

static Task<(T[] Results, Exception[] Exceptions)> WhenAllWithExceptions<T>(IReadOnlyCollection<Task<T>> tasks)
{
    ArgumentNullException.ThrowIfNull(tasks);

    return Task.WhenAll(tasks).ContinueWith(t =>
        {
            T[] results = tasks
                .Where(t => t.IsCompletedSuccessfully)
                .Select(t => t.Result)
                .ToArray();

            var aggregateExceptions = tasks
                .Where(t => t.IsFaulted)
                .Select(t => t.Exception!);

            var exceptions = new AggregateException(aggregateExceptions)
                .Flatten()
                .InnerExceptions
                .ToArray();

            // No exceptions and at least one task was canceled
            if (exceptions.Length == 0 && t.IsCanceled)
            {
                exceptions = new[] { new TaskCanceledException(t) };
            }

            return (results, exceptions);
        },
        default,
        TaskContinuationOptions.DenyChildAttach | TaskContinuationOptions.ExecuteSynchronously,
        TaskScheduler.Default);
}

Solution: wrapping the task into a TaskResult

Instead of having a unique try/catch for all the tasks, have it for each task.

// the created jobs are wrapped into TaskResult to handle Exception
IEnumerable<Task<TaskResult<Job>>> createJobTasks = Enumerable.Range(1, 10).Select(x => CreateJobAsync($"Job {x}").ToTaskResultAsync());
var jobs = await Task.WhenAll(createJobTasks);

// if the TaskResult is a succes then access to the Result otherwise access to the Exception ErrorMessage
var writeTasks = jobs.Select(x => Task.Run(() => Console.WriteLine(x.Success ? x.Result.Name : x.ErrorMessage)));
await Task.WhenAll(writeTasks);
TaskExtension.cs
public static class TaskExtension
{
    public static async Task<TaskResult<T>> ToTaskResultAsync<T>(this Task<T> task)
    {
        try
        {
            return new TaskResult<T> { Result = await task };
        }
        catch (Exception e)
        {
            return new TaskResult<T> { Exception = e };
        }
    }

    public class TaskResult<T>
    {
        public T? Result;
        public Exception? Exception { get; set; }
        public string ErrorMessage => Exception?.InnerException?.Message ?? Exception?.Message ?? string.Empty;
        public bool Success => Exception is null;
    }
}

PLINQ

AsParallel analyses the query to see if it is suitable for parallelization. This analysis adds overhead.
If it is unsafe or faster to run sequentially then it won't be run in parallel.

var numbers = Enumerable.Range(0, 100_000_000);

var parallelResult = numbers.AsParallel()
                            .WithDegreeOfParallelism(2)
                            .WithCancellation(token)
                            .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
                            .WithMergeOptions(ParallelMergeOptions.Default)
                            .AsOrdered() // add overhead
                            .Where(i => i % 2 == 0);

// parcourt d'itération en mode parallèle, l'ordre est perdu.
// le parcourt commence même si parallelResult n'est pas au complet
parallelResult.ForAll(e => Console.WriteLine(e));