« Task » : différence entre les versions

De Banane Atomic
Aller à la navigationAller à la recherche
Aucun résumé des modifications
(47 versions intermédiaires par le même utilisateur non affichées)
Ligne 1 : Ligne 1 :
[[Category:CSharp]]
[[Category:CSharp]]
= Links =
* [[LINQ#Select_async|Select async]]
* [[LINQ#Select_async|SelectMany async]]
= Task =
= Task =
A Task represents an asynchronous operation.
A Task represents an asynchronous operation.
<kode lang=cs>
<kode lang=cs>
// create and run a task which will wait for 4 seconds then return a result
// create and run a task in a new thread
var result = await Task.Run(async () =>  
var result = await Task.Run(async () =>  
{
{
Ligne 21 : Ligne 25 :
     {
     {
         var i = 1;
         var i = 1;
         while (!token.IsCancellationRequested)
         while (!token.IsCancellationRequested) // break the loop if cancellation is requested
         {
         {
             Console.Write($"{i++} ");
             Console.Write($"{i++} ");
             await Task.Delay(1000);
             await Task.Delay(1000);
         }
         }
         token.ThrowIfCancellationRequested();
 
         token.ThrowIfCancellationRequested(); // or throw an OperationCanceledException
     }, token);
     }, token);
}
}
Ligne 33 : Ligne 38 :
     Console.WriteLine(e.Message);
     Console.WriteLine(e.Message);
}
}
cancellationTokenSource.Token.Register(() => {
    /* what to when the token is cancelled */
});
</kode>
</kode>


Ligne 43 : Ligne 52 :
async Task MyMethodAsync() {}
async Task MyMethodAsync() {}


static class TaskExtension
public static class TaskExtension
{
{
     public static void Forget(this Task _)
     public static async void AndForget(this Task task)
     {
     {
        await task;
     }
     }
}
}
Ligne 55 : Ligne 65 :
* {{boxx|Nito.Mvvm.Async}} prerelease
* {{boxx|Nito.Mvvm.Async}} prerelease
* {{boxx|FontAwesome.WPF}}
* {{boxx|FontAwesome.WPF}}
<filebox fn='MainWindow.xaml'>
 
<filebox fn='MainWindow.xaml' collapsed>
<Window xmlns:Controls="clr-namespace:System.Windows.Controls;assembly=PresentationFramework"
<Window xmlns:Controls="clr-namespace:System.Windows.Controls;assembly=PresentationFramework"
         xmlns:fa="http://schemas.fontawesome.io/icons/">
         xmlns:fa="http://schemas.fontawesome.io/icons/">
Ligne 78 : Ligne 89 :
                                 Converter={StaticResource BooleanToVisibilityConverter}}"/>
                                 Converter={StaticResource BooleanToVisibilityConverter}}"/>
</filebox>
</filebox>
<filebox fn='MainVM.cs'>
 
<filebox fn='MainVM.cs' collapsed>
private string _query;
private string _query;
public string Query
public string Query
Ligne 121 : Ligne 133 :
}
}
</filebox>
</filebox>
= Exceptions =
<kode lang='cs'>
catch (AggregateException e)
{
    Console.WriteLine("There where {0} exceptions", e.InnerExceptions.Count);
}
</kode>


= Parallel =
= Parallel =
Ligne 211 : Ligne 215 :


== Shared variable ==
== Shared variable ==
=== Lock ===
=== [https://learn.microsoft.com/en-us/dotnet/csharp/language-reference/statements/lock Lock] ===
<kode lang='cs'>
<kode lang='cs'>
private static readonly object sumLock = new();
private readonly object sumLock = new();


var sum = 0m; // shared variable, updated by threads
var sum = 0m; // shared variable, updated by threads
Ligne 236 : Ligne 240 :
Parallel.For(0, 100, i =>
Parallel.For(0, 100, i =>
{
{
     Interlocked.Add(ref sum, 1);
    Interlocked.Increment(ref sum); // add 1 to sum and return sum + 1
     Interlocked.Add(ref sum, 2);   // add 2 to sum and return sum + 2
});
});
</kode>
</kode>
Ligne 278 : Ligne 283 :
await Task.WhenAll(tasks);
await Task.WhenAll(tasks);


// Task.Run
// ForEachAsync
await Parallel.ForEachAsync(Enumerable.Range(1, 30), async (x, token) => await MyTaskAsync(x));
 
// Task.Run run task in a new thread
// even without Task.Run, Task.WhenAll will run the tasks in parallel and then for all of them to be done
var tasks = Enumerable.Range(1, 30).Select(x => Task.Run(() => MyTaskAsync(x)));
var tasks = Enumerable.Range(1, 30).Select(x => Task.Run(() => MyTaskAsync(x)));
await Task.WhenAll(tasks);
await Task.WhenAll(tasks);


private async Task<int> MyTaskAsync(int i)
private async Task MyTaskAsync(int i)
{
    Console.WriteLine(i);
    await Task.Delay(4000);
}
</kode>
 
== Handle exceptions with Task.WhenAll ==
=== The problem ===
If at least one exception occurred among the tasks, it is not possible to get the result of the working tasks.
<kode lang='cs' collapsed>
// create 10 jobs in parallel, the jobs 5 and 7 will raise an Exception
IEnumerable<Task<Job>> createJobTasks = Enumerable.Range(1, 10).Select(x => CreateJobAsync($"Job {x}"));
Task<Job[]> mainTask = Task.WhenAll(createJobTasks);
try
{
    var jobs = await mainTask;
}
catch (Exception e)
{
    e.Message; // Error Job 5, only the first exception is catched
    // AggregateException
    mainTask.Exception.Message; // One or more errors occurred. (Error Job 5) (Error Job 7)
    // ReadOnlyCollection<Exception>
    mainTask.InnerExceptions; // [0] Error Job 5, [1] Error Job 7
}
 
async Task<Job> CreateJobAsync(string name)
{
    await Task.Delay(1);
 
    if (name.EndsWith("5") || name.EndsWith("7") )
        throw new Exception($"Error {name}");
 
    return new Job { Name = name };
}
 
class Job
{
{
     var waitTime = DateTime.UtcNow.AddSeconds(4);
     public string Name { get; set; }
    while (DateTime.UtcNow < waitTime) { }
    await Task.Delay(0);
    return i;
}
}
</kode>
</kode>


== Dataflow's ActionBlock ==
=== Solution: use the task ids ===
<kode lang='cs'>
<kode lang='cs'>
using System.Threading.Tasks.Dataflow;
List<(string, Task<Job>)> jobNameWithCreateJobTasks = Enumerable.Range(1, 10).Select(x => (jobName: $"Job {x}", task: CreateJobAsync($"Job {x}"))).ToList();
Dictionary<int, string> jobNameFromTaskId = jobNameWithCreateJobTasks.ToDictionary(x => x.task.Id, x => x.jobName);
List<Task<Job>> createJobTasks = jobNameWithCreateJobTasks.Select(x => x.task).ToList();


var ids = Enumerable.Range(1, 30).ToList();
List<(string JobName, Job? Job, string ErrorMessage)> results;
var results = await TaskExtension.WhenAll(createJobTasks);


var block = new ActionBlock<int>(
results.Select(x => new
    x =>
     {
     {
         Thread.Sleep(1000);
         JobName = x.Result?.JobName ?? jobNameFromTaskId[x.taskId],
         Console.WriteLine($"{x}/{ids.Count}");
         Job = x.Result,
    },
        GlobalErrorMessage = x.Exception?.Message ?? string.Empty
     new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 });
    }));
</kode>
 
<filebox fn='TaskExtension.cs' collapsed>
public static async Task<IReadOnlyCollection<(int taskId, T? Result, AggregateException? Exception)>> WhenAll<T>(IReadOnlyCollection<Task<T>> tasks)
{
     ArgumentNullException.ThrowIfNull(tasks);


ids.ForEach(x => block.Post(x));
    List<(int taskId, T? Result, AggregateException? Exception)> results = new();


block.Complete();       // Signal completion
    try
await block.Completion; // Asynchronously wait for completion.
    {
</kode>
        await Task.WhenAll(tasks);
    }
    catch
    {
        results.AddRange(tasks.Where(x => x.IsFaulted).Select(x => (x.Id, (T?)default, x.Exception)));
    }
    finally
    {
        results.AddRange(
            tasks.Where(x => x.IsCanceled).Select(x => (x.Id, (T?)default,
            (AggregateException?)new AggregateException(new[] { new TaskCanceledException(x) }))));
        results.AddRange(
            tasks.Where(x => x.IsCompletedSuccessfully).Select(x => (x.Id, (T?)x.Result,
            (AggregateException?)default)));
    }
 
    return results;
}
</filebox>


== ParallelQuery ==
=== [https://stackoverflow.com/questions/55887028/is-it-possible-to-get-successful-results-from-a-task-whenall-when-one-of-the-tas Solution: use ContinueWith] ===
<kode lang='cs'>
<kode lang='cs' collapsed>
using MoreLinq;
var (jobs, exceptions) = await WhenAllWithExceptions(createJobTasks);


var ids = Enumerable.Range(1, 30).ToList();
static Task<(T[] Results, Exception[] Exceptions)> WhenAllWithExceptions<T>(IReadOnlyCollection<Task<T>> tasks)
{
    ArgumentNullException.ThrowIfNull(tasks);


var dop = 2;
     return Task.WhenAll(tasks).ContinueWith(t =>
var tasks = ids.Batch((ids.Count / dop) + 1)
     .AsParallel()
    .WithDegreeOfParallelism(dop)
    .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
    .Select(
        (batchedIds, workerId) =>
         {
         {
             var batchedTasks = batchedIds.Select(x => Task.Run(() =>
            T[] results = tasks
                .Where(t => t.IsCompletedSuccessfully)
                .Select(t => t.Result)
                .ToArray();
 
             var aggregateExceptions = tasks
                .Where(t => t.IsFaulted)
                .Select(t => t.Exception!);
 
            var exceptions = new AggregateException(aggregateExceptions)
                .Flatten()
                .InnerExceptions
                .ToArray();
 
            // No exceptions and at least one task was canceled
            if (exceptions.Length == 0 && t.IsCanceled)
             {
             {
                 Thread.Sleep(1000);
                 exceptions = new[] { new TaskCanceledException(t) };
                Console.WriteLine(x);
             }
            }));
             return batchedTasks;
        })
    .SelectMany(x => x);


foreach (var task in tasks)
            return (results, exceptions);
        },
        default,
        TaskContinuationOptions.DenyChildAttach | TaskContinuationOptions.ExecuteSynchronously,
        TaskScheduler.Default);
}
</kode>
 
=== [https://thesharperdev.com/csharps-whenall-and-exception-handling/ Solution: wrapping the task into a TaskResult] ===
Instead of having a unique try/catch for all the tasks, have it for each task.
<kode lang='cs' collapsed>
// the created jobs are wrapped into TaskResult to handle Exception
IEnumerable<Task<TaskResult<Job>>> createJobTasks = Enumerable.Range(1, 10).Select(x => CreateJobAsync($"Job {x}").ToTaskResultAsync());
var jobs = await Task.WhenAll(createJobTasks);
 
// if the TaskResult is a succes then access to the Result otherwise access to the Exception ErrorMessage
var writeTasks = jobs.Select(x => Task.Run(() => Console.WriteLine(x.Success ? x.Result.Name : x.ErrorMessage)));
await Task.WhenAll(writeTasks);
</kode>
 
<filebox fn='TaskExtension.cs' collapsed>
public static class TaskExtension
{
{
     await task;
     public static async Task<TaskResult<T>> ToTaskResultAsync<T>(this Task<T> task)
    {
        try
        {
            return new TaskResult<T> { Result = await task };
        }
        catch (Exception e)
        {
            return new TaskResult<T> { Exception = e };
        }
    }
 
    public class TaskResult<T>
    {
        public T? Result;
        public Exception? Exception { get; set; }
        public string ErrorMessage => Exception?.InnerException?.Message ?? Exception?.Message ?? string.Empty;
        public bool Success => Exception is null;
    }
}
}
</kode>
</filebox>


== PLINQ ==
== PLINQ ==
Ligne 357 : Ligne 472 :
// le parcourt commence même si parallelResult n'est pas au complet
// le parcourt commence même si parallelResult n'est pas au complet
parallelResult.ForAll(e => Console.WriteLine(e));
parallelResult.ForAll(e => Console.WriteLine(e));
</kode>
== [https://blogs.msdn.microsoft.com/pfxteam/2012/03/05/implementing-a-simple-foreachasync-part-2/ ForEachAsync] ==
<kode lang='cs'>
// extension de méthode
// dop = degree of parallelism = max number of threads
public static Task ParallelForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
{
    return Task.WhenAll(
        from partition in Partitioner.Create(source).GetPartitions(dop)
        select Task.Run(async delegate
        {
            using (partition)
                while (partition.MoveNext())
                    await body(partition.Current);
        }));
}
await myItems.ForEachAsync(100, async (myItem) =>
{
    myItem.MyProperty = await MyMethodAsync();
});
</kode>
</kode>

Version du 18 décembre 2023 à 23:11

Links

Task

A Task represents an asynchronous operation.

Cs.svg
// create and run a task in a new thread
var result = await Task.Run(async () => 
{
    await Task.Delay(4000);
    return 0;
});

Cancel a task

Cs.svg
var cancellationTokenSource = new CancellationTokenSource(5000); // cancel after 5s
CancellationToken token = cancellationTokenSource.Token;

try
{
    await Task.Run(async () =>
    {
        var i = 1;
        while (!token.IsCancellationRequested) // break the loop if cancellation is requested
        {
            Console.Write($"{i++} ");
            await Task.Delay(1000);
        }

        token.ThrowIfCancellationRequested(); // or throw an OperationCanceledException
    }, token);
}
catch (OperationCanceledException e)
{
    Console.WriteLine(e.Message);
}

cancellationTokenSource.Token.Register(() => {
    /* what to when the token is cancelled */
});

Fire and forget

Call an async method without waiting for the response. Exceptions will be lost.

Cs.svg
_ = MyMethodAsync().ConfigureAwait(false);  // ConfigureAwait to avoid deadlock
MyMethodAsync().Forget();

async Task MyMethodAsync() {}

public static class TaskExtension
{
    public static async void AndForget(this Task task)
    {
        await task;
    }
}

Propriété WPF

Nuget:

  • Nito.Mvvm.Async prerelease
  • FontAwesome.WPF
MainWindow.xaml
<Window xmlns:Controls="clr-namespace:System.Windows.Controls;assembly=PresentationFramework"
        xmlns:fa="http://schemas.fontawesome.io/icons/">
    <Window.Resources>
        <Controls:BooleanToVisibilityConverter x:Key="BooleanToVisibilityConverter" />
    </Window.Resources>

    <TextBox Text="{Binding Query, UpdateSourceTrigger=PropertyChanged}" />
    <TextBlock Text="{Binding ResultTask.Result}" />

    <Label Content="Loading..." 
           Visibility="{Binding ResultTask.IsNotCompleted, 
                                Converter={StaticResource BooleanToVisibilityConverter}, 
                                FallbackValue=Collapsed}"/>
    <fa:ImageAwesome Icon="Refresh" Spin="True" Height="16" Width="16" 
                     Visibility="{Binding ResultTask.IsNotCompleted, 
                     Converter={StaticResource BooleanToVisibilityConverter}, 
                     FallbackValue=Collapsed}" />

    <Label Content="{Binding NotifyValuesTask.ErrorMessage}" 
           Visibility="{Binding ResultTask.IsFaulted, 
                                Converter={StaticResource BooleanToVisibilityConverter}}"/>
MainVM.cs
private string _query;
public string Query
{
    get { return _query; }
    set
    {
        Set(() => Query, ref _query, value, true);
        ResultTask = NotifyTask.Create(GetResultAsync(_query, MyCallback));
    }
}

private void MyCallback() { /* ... */ }

private NotifyTask<string> _resultTask;
public NotifyTask<string> ResultTask
{
    get
    {
        return _resultTask;
    }
    set
    {
        Set(() => ResultTask, ref _resultTask, value, true);
    }
}

public async Task<string> GetResultAsync(string query, Action callback)
{
    var url = $"http://localhost:57157/api/v1/test/result/{query}";
    var responseMessage = await _client.GetAsync(url);
    if (responseMessage.IsSuccessStatusCode)
    {
        return await responseMessage.Content.ReadAsStringAsync();
    }
    else
    {
        return await Task.FromResult($"{responseMessage.StatusCode}: {responseMessage.ReasonPhrase}");
    }

    callback();
}

Parallel

Utile si le code n'est pas séquentiel.

Cs.svg
// for loop
Parallel.For(1, 20, i =>
{
    Console.WriteLine(i);
    Thread.Sleep(1000);
});

// for each loop
Parallel.ForEach(Enumerable.Range(1, 20), i =>
{
    Console.WriteLine(i);
    Thread.Sleep(1000);
});

// invoke actions
Parallel.Invoke(
    () => {
        Console.WriteLine(1);
        Thread.Sleep(1000);
    },
    () => {
        Console.WriteLine(2);
        Thread.Sleep(1000);
    }
);

Parallel options

Cs.svg
// after 4s throw an OperationCanceledException
// no further operations will start but don't stop currently executing operations
var cancellationTokenSource = new CancellationTokenSource(4000);

var parallelOptions = new ParallelOptions
{
    MaxDegreeOfParallelism = 12, // by default use as much computer power as possible
    TaskScheduler = null,
    CancellationToken = cancellationTokenSource.Token
}

Parallel.ForEach(
    numbers,
    parallelOptions,
    (int i, ParallelLoopState loopState) =>
{
    if (loopState.ShouldExitCurrentIteration) // check if another iteration has requested to break
    {
        loopState.Break(); // break loop
    }

    if (!cancellationTokenSource.Token.IsCancellationRequested) { /* next operation step */ } // useful for long operation to break
});

Handling exceptions

All the exceptions are catched and when all the tasks have been executed then an AggregateException is thrown if any.

Cs.svg
try
{
    Parallel.Invoke(
        () =>
        {
            var waitTime = DateTime.UtcNow.AddSeconds(4);
            while (DateTime.UtcNow < waitTime) { }
        },
        () =>
        {
            throw new Exception("MyException");
        }
    );
}
catch (AggregateException ex)
{
    ex.InnerExceptions; // ReadOnlyCollection<Exception>
}

Shared variable

Lock

Cs.svg
private readonly object sumLock = new();

var sum = 0m; // shared variable, updated by threads
Parallel.For(0, 100, i =>
{
    lock(sumLock) // only 1 thread at a time can access
    {
        sum += 0.5m; // code inside the lock should take as little time as possible
    }
});
To avoid deadlocks:
  • use 1 lock object for each shared resource
  • avoid nested locks
  • use a new object

Interlocked

Create thread-safe atomic operations.

Faster than lock, but Interlocked only works with integers.
Cs.svg
int sum = 0; // shared variable, updated by threads
Parallel.For(0, 100, i =>
{
    Interlocked.Increment(ref sum); // add 1 to sum and return sum + 1
    Interlocked.Add(ref sum, 2);    // add 2 to sum and return sum + 2
});

AsyncLocal

Allow to have a different variable for each async task.

Cs.svg
private static AsyncLocal<decimal?> asyncLocal = new();
Parallel.For(0, 100, async (i) =>
{
    asyncLocal.Value = 10; // the asyncLocal is not shared among async tasks
});

Concurrent collections

BlockingCollection<T> ajout et suppression thread-safe. Add, Take. FIFO par défaut.
ConcurrentBag<T> sans ordre, doublons autorisés. Add, TryTake, TryPeek.
ConcurrentDictionary<TKey,T> TryAdd, TryUpdate, AddOrUpdate, GetOrAdd.
ConcurrentQueue<T> FIFO. Enqueue, TryDequeue.
ConcurrentStack<T> LIFO. Push, TryPop.
Cs.svg
BlockingCollection<string> col = new BlockingCollection<string>();
col.Add("text");
string s = col.Take();

foreach (string v in col.GetConsumingEnumerable())
    Console.WriteLine(v);

Tasks

Cs.svg
// AsParallel PLINQ
var tasks = Enumerable.Range(1, 30).AsParallel().Select(x => MyTaskAsync(x));
await Task.WhenAll(tasks);

// ForEachAsync
await Parallel.ForEachAsync(Enumerable.Range(1, 30), async (x, token) => await MyTaskAsync(x));

// Task.Run run task in a new thread
// even without Task.Run, Task.WhenAll will run the tasks in parallel and then for all of them to be done
var tasks = Enumerable.Range(1, 30).Select(x => Task.Run(() => MyTaskAsync(x)));
await Task.WhenAll(tasks);

private async Task MyTaskAsync(int i)
{
    Console.WriteLine(i);
    await Task.Delay(4000);
}

Handle exceptions with Task.WhenAll

The problem

If at least one exception occurred among the tasks, it is not possible to get the result of the working tasks.

Cs.svg
// create 10 jobs in parallel, the jobs 5 and 7 will raise an Exception
IEnumerable<Task<Job>> createJobTasks = Enumerable.Range(1, 10).Select(x => CreateJobAsync($"Job {x}"));
Task<Job[]> mainTask = Task.WhenAll(createJobTasks);
try
{
    var jobs = await mainTask;
}
catch (Exception e)
{
    e.Message; // Error Job 5, only the first exception is catched
    // AggregateException
    mainTask.Exception.Message; // One or more errors occurred. (Error Job 5) (Error Job 7)
    // ReadOnlyCollection<Exception>
    mainTask.InnerExceptions; // [0] Error Job 5, [1] Error Job 7
} 

async Task<Job> CreateJobAsync(string name)
{
    await Task.Delay(1);

    if (name.EndsWith("5") || name.EndsWith("7") )
        throw new Exception($"Error {name}");

    return new Job { Name = name };
}

class Job
{
    public string Name { get; set; }
}

Solution: use the task ids

Cs.svg
List<(string, Task<Job>)> jobNameWithCreateJobTasks = Enumerable.Range(1, 10).Select(x => (jobName: $"Job {x}", task: CreateJobAsync($"Job {x}"))).ToList();
Dictionary<int, string> jobNameFromTaskId = jobNameWithCreateJobTasks.ToDictionary(x => x.task.Id, x => x.jobName);
List<Task<Job>> createJobTasks = jobNameWithCreateJobTasks.Select(x => x.task).ToList();

List<(string JobName, Job? Job, string ErrorMessage)> results;
var results = await TaskExtension.WhenAll(createJobTasks);

results.Select(x => new
    {
        JobName = x.Result?.JobName ?? jobNameFromTaskId[x.taskId],
        Job = x.Result,
        GlobalErrorMessage = x.Exception?.Message ?? string.Empty
    }));
TaskExtension.cs
public static async Task<IReadOnlyCollection<(int taskId, T? Result, AggregateException? Exception)>> WhenAll<T>(IReadOnlyCollection<Task<T>> tasks)
{
    ArgumentNullException.ThrowIfNull(tasks);

    List<(int taskId, T? Result, AggregateException? Exception)> results = new();

    try
    {
        await Task.WhenAll(tasks);
    }
    catch
    {
        results.AddRange(tasks.Where(x => x.IsFaulted).Select(x => (x.Id, (T?)default, x.Exception)));
    }
    finally
    {
        results.AddRange(
            tasks.Where(x => x.IsCanceled).Select(x => (x.Id, (T?)default,
            (AggregateException?)new AggregateException(new[] { new TaskCanceledException(x) }))));
        results.AddRange(
            tasks.Where(x => x.IsCompletedSuccessfully).Select(x => (x.Id, (T?)x.Result,
            (AggregateException?)default)));
    }

    return results;
}

Solution: use ContinueWith

Cs.svg
var (jobs, exceptions) = await WhenAllWithExceptions(createJobTasks);

static Task<(T[] Results, Exception[] Exceptions)> WhenAllWithExceptions<T>(IReadOnlyCollection<Task<T>> tasks)
{
    ArgumentNullException.ThrowIfNull(tasks);

    return Task.WhenAll(tasks).ContinueWith(t =>
        {
            T[] results = tasks
                .Where(t => t.IsCompletedSuccessfully)
                .Select(t => t.Result)
                .ToArray();

            var aggregateExceptions = tasks
                .Where(t => t.IsFaulted)
                .Select(t => t.Exception!);

            var exceptions = new AggregateException(aggregateExceptions)
                .Flatten()
                .InnerExceptions
                .ToArray();

            // No exceptions and at least one task was canceled
            if (exceptions.Length == 0 && t.IsCanceled)
            {
                exceptions = new[] { new TaskCanceledException(t) };
            }

            return (results, exceptions);
        },
        default,
        TaskContinuationOptions.DenyChildAttach | TaskContinuationOptions.ExecuteSynchronously,
        TaskScheduler.Default);
}

Solution: wrapping the task into a TaskResult

Instead of having a unique try/catch for all the tasks, have it for each task.

Cs.svg
// the created jobs are wrapped into TaskResult to handle Exception
IEnumerable<Task<TaskResult<Job>>> createJobTasks = Enumerable.Range(1, 10).Select(x => CreateJobAsync($"Job {x}").ToTaskResultAsync());
var jobs = await Task.WhenAll(createJobTasks);

// if the TaskResult is a succes then access to the Result otherwise access to the Exception ErrorMessage
var writeTasks = jobs.Select(x => Task.Run(() => Console.WriteLine(x.Success ? x.Result.Name : x.ErrorMessage)));
await Task.WhenAll(writeTasks);
TaskExtension.cs
public static class TaskExtension
{
    public static async Task<TaskResult<T>> ToTaskResultAsync<T>(this Task<T> task)
    {
        try
        {
            return new TaskResult<T> { Result = await task };
        }
        catch (Exception e)
        {
            return new TaskResult<T> { Exception = e };
        }
    }

    public class TaskResult<T>
    {
        public T? Result;
        public Exception? Exception { get; set; }
        public string ErrorMessage => Exception?.InnerException?.Message ?? Exception?.Message ?? string.Empty;
        public bool Success => Exception is null;
    }
}

PLINQ

AsParallel analyses the query to see if it is suitable for parallelization. This analysis adds overhead.
If it is unsafe or faster to run sequentially then it won't be run in parallel.

Cs.svg
var numbers = Enumerable.Range(0, 100_000_000);

var parallelResult = numbers.AsParallel()
                            .WithDegreeOfParallelism(2)
                            .WithCancellation(token)
                            .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
                            .WithMergeOptions(ParallelMergeOptions.Default)
                            .AsOrdered() // add overhead
                            .Where(i => i % 2 == 0);

// parcourt d'itération en mode parallèle, l'ordre est perdu.
// le parcourt commence même si parallelResult n'est pas au complet
parallelResult.ForAll(e => Console.WriteLine(e));