// Parallel counterpart of a for loop: iterations 1..19 run concurrently.
Parallel.For(1, 20, index =>
{
    Console.WriteLine(index);
    Thread.Sleep(1000); // simulate one second of work per iteration
});
// Parallel counterpart of a foreach loop over any IEnumerable<T>.
Parallel.ForEach(Enumerable.Range(1, 20), value =>
{
    Console.WriteLine(value);
    Thread.Sleep(1000); // simulate one second of work per item
});
// Run several independent actions concurrently and wait for all of them.
Parallel.Invoke(
    () =>
    {
        Console.WriteLine(1);
        Thread.Sleep(1000); // simulate work
    },
    () =>
    {
        Console.WriteLine(2);
        Thread.Sleep(1000); // simulate work
    });
Parallel options
// Cancel after 4 seconds: no further iterations will start,
// but iterations already executing are not interrupted.
var cancellationTokenSource = new CancellationTokenSource(4000);

var parallelOptions = new ParallelOptions
{
    MaxDegreeOfParallelism = 12, // by default (-1) uses as much computing power as available
    TaskScheduler = null,        // null means the default task scheduler
    CancellationToken = cancellationTokenSource.Token
};

// NOTE(review): `numbers` is declared elsewhere in this file.
Parallel.ForEach(
    numbers,
    parallelOptions,
    (int i, ParallelLoopState loopState) =>
    {
        if (loopState.ShouldExitCurrentIteration) // another iteration requested Break/Stop
        {
            loopState.Break(); // stop scheduling iterations past the current index
        }

        // Inside a long-running iteration, check the token between steps
        // so the iteration itself can stop early on cancellation.
        if (!cancellationTokenSource.Token.IsCancellationRequested) { /* next operation step */ }
    });
Handling exceptions
All exceptions thrown by iterations are caught; once every iteration has finished, a single AggregateException wrapping them is thrown, if any occurred.
// Dedicated lock object — one per shared resource, never `this` or a type.
private static readonly object sumLock = new();

var sum = 0m; // shared variable, updated by many threads
Parallel.For(0, 100, i =>
{
    lock (sumLock) // only one thread at a time can enter this section
    {
        sum += 0.5m; // code inside the lock should take as little time as possible
    }
});
To avoid deadlocks:
use 1 lock object for each shared resource
avoid nested locks
lock on a dedicated private object (never `this`, a Type, or a string literal)
Interlocked
Create thread-safe atomic operations.
Faster than lock, but Interlocked only works with simple types (int, long, float, double, IntPtr) and object references.
int sum = 0; // shared variable, updated by many threads
Parallel.For(0, 100, i =>
{
    // Atomic add without taking a lock — faster than `lock` for simple counters.
    Interlocked.Add(ref sum, 1);
});
AsyncLocal
Allows each asynchronous control flow to carry its own value of the variable.
// Each async control flow sees its own copy of the value.
private static AsyncLocal<decimal?> asyncLocal = new();

// NOTE(review): an async lambda passed to Parallel.For is effectively async void —
// Parallel.For does not await it; for true async iteration prefer Parallel.ForEachAsync (.NET 6+).
Parallel.For(0, 100, async (i) =>
{
    asyncLocal.Value = 10; // the asyncLocal is not shared among async tasks
});
Concurrent collections
BlockingCollection<T>
Thread-safe add and remove (Add, Take). FIFO by default.
ConcurrentBag<T>
Unordered, duplicates allowed. Add, TryTake, TryPeek.
ConcurrentDictionary<TKey,T>
TryAdd, TryUpdate, AddOrUpdate, GetOrAdd.
ConcurrentQueue<T>
FIFO. Enqueue, TryDequeue.
ConcurrentStack<T>
LIFO. Push, TryPop.
BlockingCollection<string> col = new BlockingCollection<string>();
col.Add("text");       // thread-safe add
string s = col.Take(); // thread-safe remove; blocks until an item is available

// Signal that no more items will be added; without this the consuming
// enumerable below never completes and the foreach blocks forever.
col.CompleteAdding();

// Consumes (removes) items as it iterates, blocking until items arrive
// or CompleteAdding has been called.
foreach (string v in col.GetConsumingEnumerable())
    Console.WriteLine(v);
AsParallel analyses the query to see if it is suitable for parallelization. This analysis adds overhead.
If it is unsafe or faster to run sequentially then it won't be run in parallel.
var numbers = Enumerable.Range(0, 100_000_000);

// NOTE(review): `token` is a CancellationToken defined elsewhere in this file.
var parallelResult = numbers.AsParallel()
    .WithDegreeOfParallelism(2)                                // cap concurrently executing tasks
    .WithCancellation(token)                                   // cooperative cancellation of the query
    .WithExecutionMode(ParallelExecutionMode.ForceParallelism) // skip the suitability analysis, always parallelize
    .WithMergeOptions(ParallelMergeOptions.Default)            // how partial results are buffered before merging
    .AsOrdered()                                               // preserve source order (adds overhead)
    .Where(i => i % 2 == 0);

// Iterates in parallel; ordering is lost with ForAll.
// Iteration starts as results become available, even before the whole query completes.
parallelResult.ForAll(e => Console.WriteLine(e));