1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
|
/// <summary>
/// Invokes <paramref name="action"/> once for every element of <paramref name="source"/>,
/// distributing fixed-size batches of elements across thread-pool work items, and blocks
/// the calling thread until every work item has finished.
/// </summary>
/// <typeparam name="T">Element type of the sequence.</typeparam>
/// <param name="source">
/// Sequence to process. It is enumerated at most once: a source that implements
/// <see cref="IList{T}"/> is indexed directly, anything else is copied into a list first.
/// </param>
/// <param name="action">
/// Callback run for each element. It is called concurrently from several threads and in
/// no particular order, so it must be safe to run in parallel.
/// </param>
/// <exception cref="ArgumentNullException">
/// <paramref name="source"/> or <paramref name="action"/> is <c>null</c>.
/// </exception>
/// <exception cref="AggregateException">
/// One or more invocations of <paramref name="action"/> threw; the original exceptions are
/// available through <see cref="AggregateException.InnerExceptions"/>.
/// </exception>
public static void ParallelForEach<T>(IEnumerable<T> source, Action<T> action)
{
    if (source == null)
    {
        throw new ArgumentNullException("source");
    }

    if (action == null)
    {
        throw new ArgumentNullException("action");
    }

    // Materialize once. Calling Count() and then ElementAt(i) per element on a plain
    // IEnumerable<T> re-enumerates the sequence for every item (quadratic work, repeated
    // side effects) and would create enumerators concurrently from several threads.
    IList<T> items = source as IList<T> ?? source.ToList();
    int exclusiveUpperBound = items.Count;
    if (exclusiveUpperBound == 0)
    {
        return;
    }

    const int batchSize = 3;

    // Never queue more work items than there are batches to hand out.
    int batchCount = (int)(((long)exclusiveUpperBound + batchSize - 1) / batchSize);
    int workerCount = Math.Min(Environment.ProcessorCount, batchCount);
    int remainingWorkItems = workerCount;

    // A 64-bit cursor cannot overflow no matter how many workers overshoot the end.
    long nextIteration = 0;

    object failureGate = new object();
    List<Exception> failures = new List<Exception>();

    using (ManualResetEvent mre = new ManualResetEvent(false))
    {
        for (int p = 0; p < workerCount; p++)
        {
            ThreadPool.QueueUserWorkItem(delegate
            {
                try
                {
                    long start;

                    // Atomically claim the next batch [start, start + batchSize).
                    while ((start = Interlocked.Add(ref nextIteration, batchSize) - batchSize) < exclusiveUpperBound)
                    {
                        int end = (int)Math.Min(start + batchSize, (long)exclusiveUpperBound);
                        for (int i = (int)start; i < end; i++)
                        {
                            action(items[i]);
                        }
                    }
                }
                catch (Exception ex)
                {
                    lock (failureGate)
                    {
                        failures.Add(ex);
                    }

                    // Move the cursor to the end so no worker claims another batch.
                    Interlocked.Exchange(ref nextIteration, exclusiveUpperBound);
                }
                finally
                {
                    // Always count this worker as done, even on failure; otherwise the
                    // waiting caller would never be released.
                    if (Interlocked.Decrement(ref remainingWorkItems) == 0)
                    {
                        mre.Set();
                    }
                }
            });
        }

        // Wait for all work items to complete.
        mre.WaitOne();
    }

    // Surface worker failures on the calling thread instead of losing them on the pool.
    if (failures.Count > 0)
    {
        throw new AggregateException(failures);
    }
}
Partager