1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195
| unit ThreadPools;
interface
uses Winapi.Windows, System.SysUtils;
type
TThreadPool = class
public type
TState = class
private
FThreadPool :TThreadPool;
FAborted :THandle;
class function DoAbort(aThreadPool :TThreadPool) :cardinal; stdcall; static;
public
function Aborted :boolean;
procedure Abort;
constructor Create(aThreadPool :TThreadPool);
destructor Destroy; override;
end;
TCallbackProc = TProc<PTPCallbackInstance, pointer>;
TParallelProc = TProc<integer, TState>;
PCallbackData = ^TCallbackData;
TCallbackData = record
Proc :TCallbackProc;
Context :pointer;
end;
private
CBEnv :TTPCallbackEnviron;
Pool :PTPPool;
CleanupGroup :PTPCleanupGroup;
FMaxThreads :integer;
procedure SetMaxThreads(const Value: integer);
procedure CleanUp(aCancelPending :boolean);
class procedure WorkCallback(aInstance :PTPCallbackInstance; aData :PCallbackData; aWork :PTPWork); stdcall; static;
public
function Add(aProc :TCallbackProc; aContext :pointer = nil) :boolean;
property MaxThreads :integer read FMaxThreads write SetMaxThreads;
constructor Create(aMaxThreads :integer = -1);
destructor Destroy; override;
class function Parallel(aMaxThreads, aLow, aHigh :integer; aProc :TParallelProc; aMayRunLong :boolean = FALSE) :boolean; overload;
class function Parallel(aLow, aHigh :integer; aProc :TParallelProc; aMayRunLong :boolean = FALSE) :boolean; overload;
end;
implementation
{ TThreadPool.TState }
class function TThreadPool.TState.DoAbort(aThreadPool: TThreadPool): cardinal;
begin
aThreadPool.CleanUp(TRUE);
Result := ERROR_SUCCESS;
end;
procedure TThreadPool.TState.Abort;
var
ThreadID :cardinal;
begin
if not Aborted then
begin
SetEvent(FAborted);
CloseHandle(CreateThread(nil, 0, @DoAbort, FThreadPool, 0, ThreadID));
end;
end;
function TThreadPool.TState.Aborted: boolean;
begin
Result := WaitForSingleObject(FAborted, 0) = WAIT_OBJECT_0;
end;
constructor TThreadPool.TState.Create(aThreadPool :TThreadPool);
begin
FThreadPool := aThreadPool;
FAborted := CreateEvent(nil, TRUE, FALSE, '');
end;
destructor TThreadPool.TState.Destroy;
begin
CloseHandle(FAborted);
inherited;
end;
{ TThreadPool }
function TThreadPool.Add(aProc :TCallbackProc; aContext :pointer): boolean;
var
Data :PCallbackData;
begin
Data := AllocMem(SizeOf(TCallbackData));
Data.Proc := aProc;
Data.Context := aContext;
const Work = CreateThreadpoolWork(@WorkCallback, Data, CBEnv);
Result := Assigned(Work);
if Result
then SubmitThreadpoolWork(Work)
else FreeMem(Data);
end;
procedure TThreadPool.CleanUp(aCancelPending: boolean);
begin
CloseThreadpoolCleanupGroupMembers(CleanupGroup, aCancelPending, nil);
end;
constructor TThreadPool.Create(aMaxThreads :integer);
begin
InitializeThreadpoolEnvironment(CBEnv);
Pool := CreateThreadpool(nil);
CleanupGroup := CreateThreadpoolCleanupGroup;
SetThreadpoolCallbackPool(CBEnv, Pool);
CBEnv.CleanupGroup := CleanupGroup;
SetThreadpoolThreadMinimum(Pool, 1);
SetMaxThreads(aMaxThreads);
end;
destructor TThreadPool.Destroy;
begin
CleanUp(TRUE);
CloseThreadpoolCleanupGroup(CleanupGroup);
CloseThreadpool(Pool);
inherited;
end;
class function TThreadPool.Parallel(aMaxThreads, aLow, aHigh: integer; aProc: TParallelProc; aMayRunLong :boolean): boolean;
begin
if aHigh < aLow then Exit(TRUE);
const ThreadPool = TThreadPool.Create(aMaxThreads);
try
const State = TState.Create(ThreadPool);
try
const DoParallel = procedure(aIndex: integer)
begin
ThreadPool.Add(procedure(aInstance: PTPCallbackInstance; aContext :pointer)
begin
if not State.Aborted then
begin
if aMayRunLong then
CallbackMayRunLong(aInstance);
aProc(aIndex, State);
end;
end);
end;
for var i := aLow to aHigh do
DoParallel(i);
ThreadPool.CleanUp(FALSE);
finally
Result := not State.Aborted;
State.Free;
end;
finally
ThreadPool.Free;
end;
end;
class function TThreadPool.Parallel(aLow, aHigh: integer; aProc: TParallelProc; aMayRunLong :boolean): boolean;
begin
Result := Parallel(0, aLow, aHigh, aProc, aMayRunLong);
end;
procedure TThreadPool.SetMaxThreads(const Value: integer);
begin
if Value > 0
then FMaxThreads := Value
else FMaxThreads := CPUCount *2;
SetThreadpoolThreadMaximum(Pool, FMaxThreads);
end;
class procedure TThreadPool.WorkCallback(aInstance: PTPCallbackInstance; aData :PCallbackData; aWork: PTPWork);
begin
try
aData.Proc(aInstance, aData.Context);
finally
FreeMem(aData);
end;
end;
end. |
Partager