diff options
Diffstat (limited to 'Src/nu/threadpool')
-rw-r--r-- | Src/nu/threadpool/ThreadFunctions.cpp | 79 | ||||
-rw-r--r-- | Src/nu/threadpool/ThreadFunctions.h | 31 | ||||
-rw-r--r-- | Src/nu/threadpool/ThreadID.cpp | 274 | ||||
-rw-r--r-- | Src/nu/threadpool/ThreadID.h | 56 | ||||
-rw-r--r-- | Src/nu/threadpool/ThreadPool.cpp | 365 | ||||
-rw-r--r-- | Src/nu/threadpool/ThreadPool.h | 98 | ||||
-rw-r--r-- | Src/nu/threadpool/TimerHandle.hpp | 54 | ||||
-rw-r--r-- | Src/nu/threadpool/api_threadpool.h | 92 | ||||
-rw-r--r-- | Src/nu/threadpool/threadpool.sln | 23 | ||||
-rw-r--r-- | Src/nu/threadpool/threadpool.vcproj | 137 | ||||
-rw-r--r-- | Src/nu/threadpool/threadpool_types.h | 8 |
11 files changed, 1217 insertions, 0 deletions
diff --git a/Src/nu/threadpool/ThreadFunctions.cpp b/Src/nu/threadpool/ThreadFunctions.cpp new file mode 100644 index 00000000..0ae23730 --- /dev/null +++ b/Src/nu/threadpool/ThreadFunctions.cpp @@ -0,0 +1,79 @@ +#include "ThreadFunctions.h" +#include "threadpool_types.h" + +ThreadFunctions::ThreadFunctions(int create_function_list) +{ + if (create_function_list) + { + functions_semaphore = CreateSemaphore(0, 0, ThreadPoolTypes::MAX_SEMAPHORE_VALUE, 0); + InitializeCriticalSectionAndSpinCount(&functions_guard, 200); + } + else + functions_semaphore = 0; +} + +ThreadFunctions::~ThreadFunctions() +{ + if (functions_semaphore) + { + CloseHandle(functions_semaphore); + DeleteCriticalSection(&functions_guard); + } +} + +void ThreadFunctions::Add(HANDLE handle, api_threadpool::ThreadPoolFunc func, void *user_data, intptr_t id) +{ + Nullsoft::Utility::AutoLock l(guard); + Data *new_data = (Data *)calloc(1, sizeof(Data)); + new_data->func = func; + new_data->user_data = user_data; + new_data->id = id; + data[handle] = new_data; +} + +bool ThreadFunctions::Get(HANDLE handle, api_threadpool::ThreadPoolFunc *func, void **user_data, intptr_t *id) +{ + Nullsoft::Utility::AutoLock l(guard); + DataMap::iterator found = data.find(handle); + if (found == data.end()) + return false; + + const Data *d = found->second; + *func = d->func; + *user_data = d->user_data; + *id = d->id; + return true; +} + +void ThreadFunctions::QueueFunction(api_threadpool::ThreadPoolFunc func, void *user_data, intptr_t id) +{ + Data *new_data = (Data *)calloc(1, sizeof(Data)); + new_data->func = func; + new_data->user_data = user_data; + new_data->id = id; + EnterCriticalSection(&functions_guard); + functions_list.push_front(new_data); + LeaveCriticalSection(&functions_guard); // unlock before releasing the semaphore early so we don't lock convoy + ReleaseSemaphore(functions_semaphore, 1, 0); +} + +bool ThreadFunctions::PopFunction(api_threadpool::ThreadPoolFunc *func, void **user_data, intptr_t *id) +{ + EnterCriticalSection(&functions_guard); + if (!functions_list.empty()) + { + ThreadFunctions::Data *data = functions_list.back(); + functions_list.pop_back(); + LeaveCriticalSection(&functions_guard); + *func = data->func; + *user_data = data->user_data; + *id = data->id; + free(data); + return true; + } + else + { + LeaveCriticalSection(&functions_guard); + return false; + } +} diff --git a/Src/nu/threadpool/ThreadFunctions.h b/Src/nu/threadpool/ThreadFunctions.h new file mode 100644 index 00000000..bbc5e6c6 --- /dev/null +++ b/Src/nu/threadpool/ThreadFunctions.h @@ -0,0 +1,31 @@ +#pragma once +#include "api_threadpool.h" +#include <map> +#include <deque> +#include "../AutoLock.h" + +class ThreadFunctions +{ +public: + struct Data + { + api_threadpool::ThreadPoolFunc func; + void *user_data; + intptr_t id; + }; + ThreadFunctions(int create_function_list=1); + ~ThreadFunctions(); + void Add(HANDLE handle, api_threadpool::ThreadPoolFunc func, void *user_data, intptr_t id); + bool Get(HANDLE handle, api_threadpool::ThreadPoolFunc *func, void **user_data, intptr_t *id); + void QueueFunction(api_threadpool::ThreadPoolFunc func, void *user_data, intptr_t id); + bool PopFunction(api_threadpool::ThreadPoolFunc *func, void **user_data, intptr_t *id); + + typedef std::map<HANDLE, const ThreadFunctions::Data*> DataMap; + DataMap data; + Nullsoft::Utility::LockGuard guard; + + typedef std::deque<ThreadFunctions::Data*> FuncList; + FuncList functions_list; + CRITICAL_SECTION functions_guard; + HANDLE functions_semaphore; +}; diff --git a/Src/nu/threadpool/ThreadID.cpp b/Src/nu/threadpool/ThreadID.cpp new file mode 100644 index 00000000..28d6f1ca --- /dev/null +++ b/Src/nu/threadpool/ThreadID.cpp @@ -0,0 +1,274 @@ +#include "ThreadID.h" + +DWORD ThreadID::thread_func_stub(LPVOID param) +{ + ThreadID *t = static_cast<ThreadID*>(param); + if (t != NULL) + { + return t->ThreadFunction(); + } + else return 0; +} + +void ThreadID::Kill() +{ + if (threadHandle && threadHandle != INVALID_HANDLE_VALUE) + { + //cut: WaitForSingleObject(threadHandle, INFINITE); + while (WaitForMultipleObjectsEx(1, &threadHandle, FALSE, INFINITE, TRUE) != WAIT_OBJECT_0) + { + } + } + +} + +ThreadID::ThreadID(ThreadFunctions *t_f, HANDLE killswitch, HANDLE global_functions_semaphore, + ThreadPoolTypes::HandleList &inherited_handles, + volatile LONG *thread_count, HANDLE _max_load_event, + int _reserved, int _com_type) : ThreadFunctions(_reserved) +{ + /* initialize values */ + released = false; + InitializeCriticalSection(&handle_lock); + + /* grab values passed to us */ + reserved = _reserved; + com_type = _com_type; + max_load_event = _max_load_event; + global_functions = t_f; + num_threads_available = thread_count; + + /* wait_handles[0] is kill switch */ + wait_handles.push_back(killswitch); + + /* wait_handles[1] is wake switch */ + wakeHandle = CreateSemaphore(0, 0, ThreadPoolTypes::MAX_SEMAPHORE_VALUE, 0); + wait_handles.push_back(wakeHandle); + + if (reserved) + { + /* if thread is reserved, + wait_handles[2] is a Funcion Call wake semaphore + for this thread only. */ + wait_handles.push_back(functions_semaphore); // WAIT_OBJECT_0+1 == per-thread queued functions + } + else + { + /* if thread is not reserved, + wait_handles[2] is a Function Call wake semaphore + global to all threads */ + wait_handles.push_back(global_functions_semaphore); // WAIT_OBJECT_0+2 == any-thread queued functions + } + + /* add inherited handles + (handles added to thread pool before this thread was created) */ + for ( ThreadPoolTypes::HandleList::iterator itr = inherited_handles.begin(); itr != inherited_handles.end(); itr++ ) + { + wait_handles.push_back( *itr ); + } + + /* start thread */ + threadHandle = CreateThread(0, 0, thread_func_stub, this, 0, 0); +} + +ThreadID::~ThreadID() +{ + CloseHandle(threadHandle); + CloseHandle(wakeHandle); + DeleteCriticalSection(&handle_lock); +} + +bool ThreadID::TryAddHandle(HANDLE new_handle) +{ + // let's see if we get lucky and can access the handle list directly + if (TryEnterCriticalSection(&handle_lock)) + { + // made it + wait_handles.push_back(new_handle); + LeaveCriticalSection(&handle_lock); + return true; + } + else + { + ReleaseSemaphore(wakeHandle, 1, 0); // kick the thread out of WaitForMultiple... + return false; + } +} + +void ThreadID::WaitAddHandle(HANDLE handle) +{ + // wakeHandle already got released once by nature of this function being called + EnterCriticalSection(&handle_lock); + wait_handles.push_back(handle); + LeaveCriticalSection(&handle_lock); + ReleaseSemaphore(wakeHandle, 1, 0); // kick out the second wait +} + +void ThreadID::AddHandle(HANDLE new_handle) +{ + if (!TryAddHandle(new_handle)) + WaitAddHandle(new_handle); +} + +bool ThreadID::TryRemoveHandle(HANDLE handle) +{ + // let's see if we get lucky and can access the handle list directly + if (TryEnterCriticalSection(&handle_lock)) + { + RemoveHandle_Internal(handle); + LeaveCriticalSection(&handle_lock); + return true; + } + else + { + ReleaseSemaphore(wakeHandle, 1, 0); // kick the thread out of WaitForMultiple... + return false; + } + return false; +} + +void ThreadID::WaitRemoveHandle(HANDLE handle) +{ + // wakeHandle already got released once by nature of this function being called + EnterCriticalSection(&handle_lock); + RemoveHandle_Internal(handle); + LeaveCriticalSection(&handle_lock); + ReleaseSemaphore(wakeHandle, 1, 0); // kick out the second wait +} + +void ThreadID::RemoveHandle(HANDLE handle) +{ + if (!TryRemoveHandle(handle)) + WaitRemoveHandle(handle); +} + +void ThreadID::RemoveHandle_Internal(HANDLE handle) +{ + // first three handles are reserved, so start after that + for (size_t i=3;i<wait_handles.size();i++) + { + if (wait_handles[i] == handle) + { + wait_handles.erase(wait_handles.begin() + i); + i--; + } + } +} + +bool ThreadID::IsReserved() const +{ + return !!reserved; +} + +DWORD CALLBACK ThreadID::ThreadFunction() +{ + switch(com_type) + { + case api_threadpool::FLAG_REQUIRE_COM_MT: + CoInitializeEx(0, COINIT_MULTITHREADED); + break; + case api_threadpool::FLAG_REQUIRE_COM_STA: + CoInitialize(0); + break; + } + + while (1) + { + InterlockedIncrement(num_threads_available); + EnterCriticalSection(&handle_lock); + DWORD ret = WaitForMultipleObjectsEx((DWORD)wait_handles.size(), wait_handles.data(), FALSE, INFINITE, TRUE); + // cut: LeaveCriticalSection(&handle_lock); + if (InterlockedDecrement(num_threads_available) == 0 && !reserved) + SetEvent(max_load_event); // notify the watch dog if all the threads are used up + + if (ret == WAIT_OBJECT_0) + { + // killswitch + LeaveCriticalSection(&handle_lock); + break; + } + else if (ret == WAIT_OBJECT_0 + 1) + { + // we got woken up to release the handles lock + // wait for the second signal + LeaveCriticalSection(&handle_lock); + InterlockedIncrement(num_threads_available); + WaitForSingleObject(wakeHandle, INFINITE); + InterlockedDecrement(num_threads_available); + } + else if (ret == WAIT_OBJECT_0 + 2) + { + LeaveCriticalSection(&handle_lock); + api_threadpool::ThreadPoolFunc func; + void *user_data; + intptr_t id; + if (reserved) + { + // per-thread queued functions + if (PopFunction(&func, &user_data, &id)) + { + func(0, user_data, id); + } + } + else + { + // global queued functions + if (global_functions->PopFunction(&func, &user_data, &id)) + { + func(0, user_data, id); + } + } + } + else if (ret > WAIT_OBJECT_0 && ret < (WAIT_OBJECT_0 + wait_handles.size())) + { + DWORD index = ret - WAIT_OBJECT_0; + HANDLE handle = wait_handles[index]; + LeaveCriticalSection(&handle_lock); + /* !!! race condition here if someone calls ThreadPool::RemoveHandle and then CloseHandle() !!! + before calling RemoveHandle, caller needs to either + ensure that Event is unsignalled (And won't be signalled) + or call RemoveHandle from within the function callback */ + api_threadpool::ThreadPoolFunc func; + void *user_data; + intptr_t id; + if (global_functions->Get(handle, &func, &user_data, &id)) + { + func(handle, user_data, id); + } + } + else + { + LeaveCriticalSection(&handle_lock); + } + } + if (com_type & api_threadpool::MASK_COM_FLAGS) + CoUninitialize(); + return 0; +} + +bool ThreadID::CanRunCOM(int flags) const +{ + switch(com_type) + { + case api_threadpool::FLAG_REQUIRE_COM_MT: // if we're a CONIT_MULTITHREADEX thread (default) + return !(flags & api_threadpool::FLAG_REQUIRE_COM_STA); // don't let STA stuff run + case api_threadpool::FLAG_REQUIRE_COM_STA: // if we're a CoInitialize(0) thread + return !(flags & api_threadpool::FLAG_REQUIRE_COM_MT); // don't let MT stuff run + } + return false; // shouldn't get here +} + +bool ThreadID::IsReleased() const +{ + return released; +} + +void ThreadID::Reserve() +{ + released=false; +} + +void ThreadID::Release() +{ + released=true; +}
\ No newline at end of file diff --git a/Src/nu/threadpool/ThreadID.h b/Src/nu/threadpool/ThreadID.h new file mode 100644 index 00000000..e5d1e09e --- /dev/null +++ b/Src/nu/threadpool/ThreadID.h @@ -0,0 +1,56 @@ +#pragma once +#include <windows.h> +#include "ThreadFunctions.h" +#include "threadpool_types.h" +#include <vector> + + +class ThreadID : private ThreadFunctions +{ +public: + static DWORD CALLBACK thread_func_stub(LPVOID param); + ThreadID(ThreadFunctions *t_f, HANDLE killswitch, HANDLE global_functions_semaphore, ThreadPoolTypes::HandleList &inherited_handles, volatile LONG *thread_count, HANDLE _max_load_event, int _reserved, int _com_type); + ~ThreadID(); + void Kill(); + + /* Try and Wait must be paired!!! */ + bool TryAddHandle(HANDLE new_handle); + void WaitAddHandle(HANDLE new_handle); + void AddHandle(HANDLE new_handle); + + /* Try and Wait must be paired!!! */ + bool TryRemoveHandle(HANDLE handle); + void WaitRemoveHandle(HANDLE handle); + void RemoveHandle(HANDLE handle); + + using ThreadFunctions::QueueFunction; + bool IsReserved() const; + bool IsReleased() const; + bool CanRunCOM(int flags) const; + void Reserve(); // re-reserves a released thread + void Release(); // release a reversed thread +private: + void RemoveHandle_Internal(HANDLE handle); + DWORD CALLBACK ThreadFunction(); + + int reserved; + ThreadFunctions *global_functions; + volatile LONG *num_threads_available; + int com_type; + bool released; + + ThreadFunctions local_functions; + + // list of handles we're waiting on + typedef std::vector<HANDLE> HandleList; + HandleList wait_handles; + CRITICAL_SECTION handle_lock; + + // handles we create/own + HANDLE threadHandle; + HANDLE wakeHandle; + + // handles given to us + HANDLE max_load_event; + +}; diff --git a/Src/nu/threadpool/ThreadPool.cpp b/Src/nu/threadpool/ThreadPool.cpp new file mode 100644 index 00000000..950df5d1 --- /dev/null +++ b/Src/nu/threadpool/ThreadPool.cpp @@ -0,0 +1,365 @@ +#include "ThreadPool.h" + +ThreadPool::ThreadPool() +{ + for ( int i = 0; i < THREAD_TYPES; i++ ) + { + num_threads_available[ i ] = 0; + max_load_event[ i ] = CreateEvent( NULL, TRUE, FALSE, NULL ); + } + + killswitch = CreateEvent( NULL, TRUE, FALSE, NULL ); + + // one thread of each type to start + for ( int i = 0; i < 2; i++ ) + CreateNewThread_Internal( i ); + + watchdog_thread_handle = CreateThread( 0, 0, WatchDogThreadProcedure_stub, this, 0, 0 ); +} + +void ThreadPool::Kill() +{ + SetEvent( killswitch ); + WaitForSingleObject( watchdog_thread_handle, INFINITE ); + CloseHandle( watchdog_thread_handle ); + + for ( ThreadID *l_thread : threads ) + { + l_thread->Kill(); + delete l_thread; + } + + CloseHandle( killswitch ); + + for ( int i = 0; i < THREAD_TYPES; i++ ) + CloseHandle( max_load_event[ i ] ); +} + +DWORD ThreadPool::WatchDogThreadProcedure_stub( LPVOID param ) +{ + ThreadPool *_this = (ThreadPool *)param; + return _this->WatchDogThreadProcedure(); +} + + +/* +watchdog will get woken up when number of available threads hits zero +it creates a new thread, sleeps for a bit to let things "settle" and then reset the event +it will need a copy of all "any-thread" handles to build the new thread, and will need to manage in a thread safe way +(so a new thread doesn't "miss" a handle that is added in the interim) +*/ +DWORD CALLBACK ThreadPool::WatchDogThreadProcedure() +{ + // we ignore the max load event for reserved threads + HANDLE events[ 3 ] = { killswitch, max_load_event[ TYPE_MT ], max_load_event[ TYPE_STA ] }; + + while ( 1 ) + { + DWORD ret = WaitForMultipleObjects( 3, events, FALSE, INFINITE ); + if ( ret == WAIT_OBJECT_0 ) + { + break; + } + else if ( ret == WAIT_OBJECT_0 + 1 || ret == WAIT_OBJECT_0 + 2 ) + { + int thread_type = ret - ( WAIT_OBJECT_0 + 1 ); + // this signal is for "full thread load reached" + + // lets make sure we're actually at max capacity + Sleep( 10 ); // sleep a bit + if ( num_threads_available[ thread_type ] != 0 ) // see if we're still fully-loaded + continue; + + guard.Lock(); + CreateNewThread_Internal( thread_type ); + guard.Unlock(); + + Sleep( 250 ); // give the system time to 'settle down' so we don't spawn a ton of threads in a row + + ResetEvent( max_load_event[ thread_type ] ); + } + } + + return 0; +} + +ThreadID *ThreadPool::ReserveThread( int flags ) +{ + + // first, check to see if there's any released threads we can grab + Nullsoft::Utility::AutoLock threadlock( guard ); + for ( ThreadID *t : threads ) + { + if ( t->IsReserved() && t->IsReleased() && t->CanRunCOM( flags ) ) + { + t->Reserve(); + return t; + } + } + + // TODO: if there are enough free threads available, mark one as reserved +// this will involve signalling the thread to switch to 'reserved' mode +// swapping out the 'function list' semaphore with a local one +// and removing all 'busy handles' from the queue +// can probably use the 'wake' handle to synchronize this + +/* +int thread_type = GetThreadType(flags); +if (num_threads_available[thread_type > 2]) +{ + for (size_t i=0;i!=threads.size();i++) + { + if (threads[i]->IsReserved() == false && threads[i]->CanRunCOM(flags)) + { + + } + } +} +*/ + + ThreadID *new_thread = CreateNewThread_Internal( GetThreadType( flags, 1 ) ); + + return new_thread; +} + +void ThreadPool::ReleaseThread( ThreadID *thread_id ) +{ + if ( thread_id ) + { + // lock so there's no race condition between ReserveThread() and ReleaseThread() + Nullsoft::Utility::AutoLock threadlock( guard ); + thread_id->Release(); + } +} + + +int ThreadPool::AddHandle( ThreadID *thread_id, HANDLE handle, api_threadpool::ThreadPoolFunc func, void *user_data, intptr_t id, int flags ) +{ + // TODO: need to ensure that handles are not duplicated + thread_functions.Add( handle, func, user_data, id ); + + if ( thread_id ) + { + if ( thread_id->CanRunCOM( flags ) ) + thread_id->AddHandle( handle ); + else + return 1; + return 0; + } + else + { + /* increment thread counts temporarily - because the massive wake-up + causes thread counts to go to 0 */ + for ( int i = 0; i < THREAD_TYPES; i++ ) + InterlockedIncrement( &num_threads_available[ i ] ); + + guard.Lock(); + + AddHandle_Internal( 0, handle, flags ); + + bool thread_types[ THREAD_TYPES ]; + GetThreadTypes( flags, thread_types ); + + for ( int i = 0; i < THREAD_TYPES; i++ ) + { + if ( thread_types[ i ] ) + any_thread_handles[ i ].push_back( handle ); + } + + guard.Unlock(); + + for ( int i = 0; i < THREAD_TYPES; i++ ) + InterlockedDecrement( &num_threads_available[ i ] ); + + + } + + return 0; +} + +/* helper functions for adding/removing handles, +we keep going through the list as long as we can add/remove immediately. +once we have to block, we recurse the function starting at the next handle +when the function returns, we wait. +this lets us do some work rather than sit and wait for each thread's lock */ +void ThreadPool::RemoveHandle_Internal(size_t start, HANDLE handle) +{ + for (;start!=threads.size();start++) + { + ThreadID *t = threads[start]; + if (!t->TryRemoveHandle(handle)) // try to remove + { + // have to wait + RemoveHandle_Internal(start+1, handle); // recurse start with the next thread + t->WaitRemoveHandle(handle); // finish the job + return; + } + } +} + +void ThreadPool::AddHandle_Internal(size_t start, HANDLE handle, int flags) +{ + for (;start<threads.size();start++) + { + ThreadID *t = threads[start]; + if ((flags & api_threadpool::FLAG_LONG_EXECUTION) && t->IsReserved()) + continue; + + if (!t->CanRunCOM(flags)) + continue; + + if (!t->TryAddHandle(handle)) // try to add + { + // have to wait, + AddHandle_Internal(start+1, handle, flags); // recurse start with the next thread + t->WaitAddHandle(handle); // finish the job + return; + } + } +} + +void ThreadPool::RemoveHandle(ThreadID *thread_id, HANDLE handle) +{ + if (thread_id) + { + thread_id->RemoveHandle(handle); + } + else + { + /* increment thread counts temporarily - because the massive wake-up + causes thread counts to go to 0 */ + for (int i=0;i<THREAD_TYPES;i++) + InterlockedIncrement(&num_threads_available[i]); + guard.Lock(); + RemoveHandle_Internal(0, handle); + + for (int j=0;j<THREAD_TYPES;j++) + { + //for (ThreadPoolTypes::HandleList::iterator itr = any_thread_handles[j].begin(); + // itr != any_thread_handles[j].end(); + // itr++) + + ThreadPoolTypes::HandleList::iterator itr = any_thread_handles[j].begin(); + while(itr != any_thread_handles[j].end()) + { + if (*itr == handle) + { + itr = any_thread_handles[j].erase(itr); + } + else + { + itr++; + } + } + } + guard.Unlock(); + for (int i=0;i<THREAD_TYPES;i++) + InterlockedDecrement(&num_threads_available[i]); + } +} + +int ThreadPool::RunFunction(ThreadID *threadid, api_threadpool::ThreadPoolFunc func, void *user_data, intptr_t id, int flags) +{ + if (threadid) + threadid->QueueFunction(func, user_data, id); + else + thread_functions.QueueFunction(func, user_data, id); + return 0; +} + +ThreadID *ThreadPool::CreateNewThread_Internal(int thread_type) +{ + int reserved=0; + int com_type = api_threadpool::FLAG_REQUIRE_COM_MT; // default + switch(thread_type) + { + case TYPE_STA_RESERVED: + reserved=1; + case TYPE_STA: + com_type = api_threadpool::FLAG_REQUIRE_COM_STA; + break; + case TYPE_MT_RESERVED: + reserved=1; + case TYPE_MT: + com_type = api_threadpool::FLAG_REQUIRE_COM_MT; + break; + } + + Nullsoft::Utility::AutoLock threadlock(guard); // lock here (rather than after new ThreadID) to protect any_thread_handles + ThreadID *new_thread = new ThreadID(&thread_functions, killswitch, thread_functions.functions_semaphore, + any_thread_handles[thread_type], + &num_threads_available[thread_type], max_load_event[thread_type], + reserved, com_type); + threads.push_back(new_thread); + return new_thread; +} + +size_t ThreadPool::GetNumberOfThreads() +{ + Nullsoft::Utility::AutoLock threadlock(guard); + return threads.size(); +} + +size_t ThreadPool::GetNumberOfActiveThreads() +{ + size_t numThreads = GetNumberOfThreads(); + for (int i=0;i<THREAD_TYPES;i++) + numThreads -= num_threads_available[i]; + return numThreads; +} + +int ThreadPool::GetThreadType(int flags, int reserved) +{ + flags &= api_threadpool::MASK_COM_FLAGS; + int thread_type=TYPE_MT; + switch(flags) + { + case api_threadpool::FLAG_REQUIRE_COM_STA: + thread_type = reserved?TYPE_STA_RESERVED:TYPE_STA; + break; + case 0: // default + case api_threadpool::FLAG_REQUIRE_COM_MT: + thread_type = reserved?TYPE_MT_RESERVED:TYPE_MT; + break; + } + + return thread_type; +} + +void ThreadPool::GetThreadTypes(int flags, bool types[THREAD_TYPES]) +{ + for (int i=0;i<THREAD_TYPES;i++) + { + types[i]=true; + } + + if (flags & api_threadpool::FLAG_REQUIRE_COM_STA) + { + types[TYPE_MT] = false; + types[TYPE_MT] = false; + } + + if (flags & api_threadpool::FLAG_REQUIRE_COM_STA) + { + types[TYPE_STA] = false; + types[TYPE_STA_RESERVED] = false; + } + + if (flags & api_threadpool::FLAG_LONG_EXECUTION) + { + types[TYPE_STA_RESERVED] = false; + types[TYPE_MT_RESERVED] = false; + } + +} +#define CBCLASS ThreadPool +START_DISPATCH; +CB(RESERVETHREAD, ReserveThread) +VCB(RELEASETHREAD, ReleaseThread) +CB(ADDHANDLE, AddHandle) +VCB(REMOVEHANDLE, RemoveHandle) +CB(RUNFUNCTION, RunFunction) +CB(GETNUMBEROFTHREADS, GetNumberOfThreads) +CB(GETNUMBEROFACTIVETHREADS, GetNumberOfActiveThreads) +END_DISPATCH; +#undef CBCLASS diff --git a/Src/nu/threadpool/ThreadPool.h b/Src/nu/threadpool/ThreadPool.h new file mode 100644 index 00000000..f68efc12 --- /dev/null +++ b/Src/nu/threadpool/ThreadPool.h @@ -0,0 +1,98 @@ +#pragma once + +#include <windows.h> +#include <bfc/platform/types.h> +#include <vector> +#include "../autolock.h" +#include "ThreadID.h" +#include "ThreadFunctions.h" +#include "threadpool_types.h" +/* random notes + +HANDLEs common to all threads + +WaitForMultipleObjectsEx() around these +0 - killswitch +1 - shared APC event. since threads might want to use APCs themselves, we'll use a different mechanism (thread-safe FIFO and an event). the intention is that APCs that can go on any thread will use this handle +2 - per thread APC event. + + +parameters for "run my function" method +function pointer, user data, flags +flags: +interrupt - for very short non-locking functions where it is safe to interrupt another thread, uses QueueUserAPC +no_wait - spawn a new thread if all threads are busy +com_multithreaded - all threads are created with CoInitialize(0), if you need a COINIT_MULTITHREADED thread, use this flag + +parameters for "add my handle" method +handle, function pointer, user data, flags +flags: +single_thread - only one thread in the pool will wait on your object, useful if your handle is not auto-reset + +parameters for "function call repeat" - calls your function until you return 0 +function pointer, user data, flags +flags: +single_thread - keep calling on the same thread +*/ + + +class ThreadPool : public api_threadpool +{ +public: + static const char *getServiceName() { return "Thread Pool API"; } + static const GUID getServiceGuid() { return ThreadPoolGUID; } +public: + // Owner API: + ThreadPool(); + void Kill(); + + // User API: + /* If you have multiple events, APCs, etc and you need them to always run on the same thread + you can reserve one */ + ThreadID *ReserveThread(int flags); + /* Release a thread you've previously reserved */ + void ReleaseThread(ThreadID *thread_id); + + /* adds a waitable handle to the thread pool. when the event is signalled, your function ptr will get called + user_data and id values get passed to your function. + your function should return 1 to indicate that it can be removed + flags, see api_threadpool */ + int AddHandle(ThreadID *threadid, HANDLE handle, api_threadpool::ThreadPoolFunc func, void *user_data, intptr_t id, int flags); + void RemoveHandle(ThreadID *threadid, HANDLE handle); + int RunFunction(ThreadID *threadid, api_threadpool::ThreadPoolFunc func, void *user_data, intptr_t id, int flags); + + size_t GetNumberOfThreads(); // total number of threads in the threadpool + size_t GetNumberOfActiveThreads(); // number of threads that are currently being used (inside user function but not necessarily busy) + +private: + enum + { + TYPE_MT = 0, + TYPE_STA = 1, + TYPE_MT_RESERVED = 2, + TYPE_STA_RESERVED = 3, + + THREAD_TYPES = 4, // two thread types, single threaded apartment COM and multithreaded COM + }; +private: + static DWORD CALLBACK WatchDogThreadProcedure_stub(LPVOID param); + ThreadID *CreateNewThread_Internal(int thread_type = 0); + DWORD CALLBACK WatchDogThreadProcedure(); + static int GetThreadType(int flags, int reserved = 0); + static void GetThreadTypes(int flags, bool types[THREAD_TYPES]); + void RemoveHandle_Internal(size_t start, HANDLE handle); // recursive helper function for RemoveHandle() + void AddHandle_Internal(size_t start, HANDLE handle, int flags); // recursive helper function for RemoveHandle() + + Nullsoft::Utility::LockGuard guard; // guards threads, any_thread_handles, and non_reserved_handles data structures + typedef std::vector<ThreadID*> ThreadList; + ThreadList threads; + ThreadPoolTypes::HandleList any_thread_handles[THREAD_TYPES]; + HANDLE killswitch; + HANDLE watchdog_thread_handle; + volatile LONG num_threads_available[THREAD_TYPES]; + ThreadFunctions thread_functions; + HANDLE max_load_event[THREAD_TYPES]; +protected: + RECVS_DISPATCH; +}; + diff --git a/Src/nu/threadpool/TimerHandle.hpp b/Src/nu/threadpool/TimerHandle.hpp new file mode 100644 index 00000000..ccbb6ef6 --- /dev/null +++ b/Src/nu/threadpool/TimerHandle.hpp @@ -0,0 +1,54 @@ +#ifndef NU_THREADPOOL_TIMERHANDLE_H +#define NU_THREADPOOL_TIMERHANDLE_H + +#if !defined(_WIN32_WINNT) || (_WIN32_WINNT < 0x400) +#error Must define _WIN32_WINNT >= 0x400 to use TimerHandle +#endif + +#include <windows.h> +#include <bfc/platform/types.h> +/* +TimerHandle() constructor will make a new timer handle +TimerHandle(existing_handle) will "take over" an existing handle +~TimerHandle() DOES NOT CloseHandle as this object is meant as a helper +call Close() to kill the timer handle + +The timer will be "one shot" auto-reset. +Because it is meant to be compatible with the threadpool, manual-reset timers and periodic timers +are not recommended!! You will have re-entrancy problems +If you want "periodic" behavior, call Wait() at the end of your ThreadPoolFunc +*/ +class TimerHandle +{ +public: + TimerHandle() { timerHandle = CreateWaitableTimer( 0, FALSE, 0 ); } + TimerHandle( HANDLE p_handle ) { timerHandle = p_handle; } + + void Close() { CloseHandle( timerHandle ); } + + void Wait( uint64_t p_milliseconds ) + { + /* MSDN notes about SetWaitableTimer: 100 nanosecond resolution, Negative values indicate relative time*/ + LARGE_INTEGER timeout = { 0 }; + timeout.QuadPart = -( (int64_t)p_milliseconds * 1000LL /*to microseconds*/ * 10LL /* to 100 nanoseconds */ ); + SetWaitableTimer( timerHandle, &timeout, 0, 0, 0, FALSE ); + } + + void Poll( uint64_t p_milliseconds ) // only use on a reserved thread!!! + { + /* MSDN notes about SetWaitableTimer: 100 nanosecond resolution, Negative values indicate relative time*/ + LARGE_INTEGER timeout = { 0 }; + timeout.QuadPart = -( (int64_t)p_milliseconds * 1000LL /*to microseconds*/ * 10LL /* to 100 nanoseconds */ ); + SetWaitableTimer( timerHandle, &timeout, (LONG)p_milliseconds, 0, 0, FALSE ); + } + + /* TODO: WaitUntil method for absolute times */ + + void Cancel() { CancelWaitableTimer( timerHandle ); } + operator HANDLE() { return timerHandle; } + +private: + HANDLE timerHandle; +}; + +#endif // !NU_THREADPOOL_TIMERHANDLE_H
\ No newline at end of file diff --git a/Src/nu/threadpool/api_threadpool.h b/Src/nu/threadpool/api_threadpool.h new file mode 100644 index 00000000..4cf5ab4f --- /dev/null +++ b/Src/nu/threadpool/api_threadpool.h @@ -0,0 +1,92 @@ +#pragma once + +#include <windows.h> +#include <bfc/platform/types.h> +#include <bfc/dispatch.h> + +class ThreadID; + +class api_threadpool : public Dispatchable +{ +protected: + api_threadpool() {} + ~api_threadpool() {} +public: + typedef int (*ThreadPoolFunc)(HANDLE handle, void *user_data, intptr_t id); + enum + { + // pass this flag to AddHandle or RunFunction indicate that your thread function + // might run for a long time + FLAG_LONG_EXECUTION = 0x1, + FLAG_REQUIRE_COM_STA = 0x2, + FLAG_REQUIRE_COM_MT = 0x4, + MASK_COM_FLAGS = 0x6, + }; + +public: + ThreadID *ReserveThread(int flags); + /* Release a thread you've previously reserved */ + void ReleaseThread(ThreadID *thread_id); + + /* adds a waitable handle to the thread pool. when the event is signalled, your function ptr will get called + user_data and id values get passed to your function. + your function should return 1 to indicate that it can be removed + flags, see api_threadpool */ + int AddHandle(ThreadID *threadid, HANDLE handle, api_threadpool::ThreadPoolFunc func, void *user_data, intptr_t id, int flags); + void RemoveHandle(ThreadID *threadid, HANDLE handle); + int RunFunction(ThreadID *threadid, api_threadpool::ThreadPoolFunc func, void *user_data, intptr_t id, int flags); + + size_t GetNumberOfThreads(); // total number of threads in the threadpool + size_t GetNumberOfActiveThreads(); // number of threads that are currently being used (inside user function but not necessarily busy) + + enum + { + RESERVETHREAD = 0, + RELEASETHREAD = 1, + ADDHANDLE = 2, + REMOVEHANDLE = 3, + RUNFUNCTION = 4, + GETNUMBEROFTHREADS = 5, + GETNUMBEROFACTIVETHREADS = 6, + }; +}; + +inline ThreadID *api_threadpool::ReserveThread(int flags) +{ + return _call(RESERVETHREAD, (ThreadID *)0, flags); +} + +inline void api_threadpool::ReleaseThread(ThreadID *thread_id) +{ + _voidcall(RELEASETHREAD, thread_id); +} + +inline int api_threadpool::AddHandle(ThreadID *threadid, HANDLE handle, api_threadpool::ThreadPoolFunc func, void *user_data, intptr_t id, int flags) +{ + return _call(ADDHANDLE, (int)1, threadid, handle, func, user_data, id, flags); +} + +inline void api_threadpool::RemoveHandle(ThreadID *threadid, HANDLE handle) +{ + _voidcall(REMOVEHANDLE, threadid, handle); +} + +inline int api_threadpool::RunFunction(ThreadID *threadid, api_threadpool::ThreadPoolFunc func, void *user_data, intptr_t id, int flags) +{ + return _call(RUNFUNCTION, (int)1, threadid, func, user_data, id, flags); +} + +inline size_t api_threadpool::GetNumberOfThreads() +{ + return _call(GETNUMBEROFTHREADS, (size_t)0); +} + +inline size_t api_threadpool::GetNumberOfActiveThreads() +{ + return _call(GETNUMBEROFACTIVETHREADS, (size_t)0); +} + +// {4DE015D3-11D8-4ac6-A3E6-216DF5252107} +static const GUID ThreadPoolGUID = +{ 0x4de015d3, 0x11d8, 0x4ac6, { 0xa3, 0xe6, 0x21, 0x6d, 0xf5, 0x25, 0x21, 0x7 } }; + diff --git a/Src/nu/threadpool/threadpool.sln b/Src/nu/threadpool/threadpool.sln new file mode 100644 index 00000000..790cf5e9 --- /dev/null +++ b/Src/nu/threadpool/threadpool.sln @@ -0,0 +1,23 @@ +Microsoft Visual Studio Solution File, Format Version 8.00 +Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "threadpool", "threadpool.vcproj", "{12CC5DA2-87FF-456A-AF20-A243F168EFE8}" + ProjectSection(ProjectDependencies) = postProject + EndProjectSection +EndProject +Global + GlobalSection(SolutionConfiguration) = preSolution + Debug = Debug + Release = Release + EndGlobalSection + GlobalSection(ProjectDependencies) = postSolution + EndGlobalSection + GlobalSection(ProjectConfiguration) = postSolution + {12CC5DA2-87FF-456A-AF20-A243F168EFE8}.Debug.ActiveCfg = Debug|Win32 + {12CC5DA2-87FF-456A-AF20-A243F168EFE8}.Debug.Build.0 = Debug|Win32 + {12CC5DA2-87FF-456A-AF20-A243F168EFE8}.Release.ActiveCfg = Release|Win32 + {12CC5DA2-87FF-456A-AF20-A243F168EFE8}.Release.Build.0 = Release|Win32 + EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + EndGlobalSection + GlobalSection(ExtensibilityAddIns) = postSolution + EndGlobalSection +EndGlobal diff --git a/Src/nu/threadpool/threadpool.vcproj b/Src/nu/threadpool/threadpool.vcproj new file mode 100644 index 00000000..c526cdaa --- /dev/null +++ b/Src/nu/threadpool/threadpool.vcproj @@ -0,0 +1,137 @@ +<?xml version="1.0" encoding="Windows-1252"?> +<VisualStudioProject + ProjectType="Visual C++" + Version="7.10" + Name="threadpool" + ProjectGUID="{12CC5DA2-87FF-456A-AF20-A243F168EFE8}" + Keyword="Win32Proj"> + <Platforms> + <Platform + Name="Win32"/> + </Platforms> + <Configurations> + <Configuration + Name="Debug|Win32" + OutputDirectory="Debug" + IntermediateDirectory="Debug" + ConfigurationType="4" + CharacterSet="2"> + <Tool + Name="VCCLCompilerTool" + Optimization="0" + AdditionalIncludeDirectories="../../Wasabi" + PreprocessorDefinitions="WIN32;_DEBUG;_LIB;_WIN32_WINNT=0x400" + MinimalRebuild="TRUE" + BasicRuntimeChecks="3" + RuntimeLibrary="5" + UsePrecompiledHeader="0" + WarningLevel="3" + Detect64BitPortabilityProblems="TRUE" + DebugInformationFormat="4"/> + <Tool + Name="VCCustomBuildTool"/> + <Tool + Name="VCLibrarianTool" + OutputFile="$(OutDir)/threadpool.lib"/> + <Tool + Name="VCMIDLTool"/> + <Tool + Name="VCPostBuildEventTool"/> + <Tool + Name="VCPreBuildEventTool"/> + <Tool + Name="VCPreLinkEventTool"/> + <Tool + Name="VCResourceCompilerTool"/> + <Tool + Name="VCWebServiceProxyGeneratorTool"/> + <Tool + Name="VCXMLDataGeneratorTool"/> + <Tool + Name="VCManagedWrapperGeneratorTool"/> + <Tool + Name="VCAuxiliaryManagedWrapperGeneratorTool"/> + </Configuration> + <Configuration + Name="Release|Win32" + OutputDirectory="Release" + IntermediateDirectory="Release" + ConfigurationType="4" + CharacterSet="2"> + <Tool + Name="VCCLCompilerTool" + AdditionalIncludeDirectories="../../Wasabi" + PreprocessorDefinitions="WIN32;NDEBUG;_LIB;_WIN32_WINNT=0x400" + RuntimeLibrary="4" + UsePrecompiledHeader="0" + WarningLevel="3" + Detect64BitPortabilityProblems="TRUE" + DebugInformationFormat="3"/> + <Tool + Name="VCCustomBuildTool"/> + <Tool + Name="VCLibrarianTool" + OutputFile="$(OutDir)/threadpool.lib"/> + <Tool + Name="VCMIDLTool"/> + <Tool + Name="VCPostBuildEventTool"/> + <Tool + Name="VCPreBuildEventTool"/> + <Tool + Name="VCPreLinkEventTool"/> + <Tool + Name="VCResourceCompilerTool"/> + <Tool + Name="VCWebServiceProxyGeneratorTool"/> + <Tool + Name="VCXMLDataGeneratorTool"/> + <Tool + Name="VCManagedWrapperGeneratorTool"/> + <Tool + Name="VCAuxiliaryManagedWrapperGeneratorTool"/> + </Configuration> + </Configurations> + <References> + </References> + <Files> + <Filter + Name="Source Files" + Filter="cpp;c;cxx;def;odl;idl;hpj;bat;asm;asmx" + UniqueIdentifier="{4FC737F1-C7A5-4376-A066-2A32D752A2FF}"> + <File + RelativePath=".\ThreadFunctions.cpp"> + </File> + <File + RelativePath=".\ThreadID.cpp"> + </File> + <File + RelativePath=".\ThreadPool.cpp"> + </File> + </Filter> + <Filter + Name="Header Files" + Filter="h;hpp;hxx;hm;inl;inc;xsd" + UniqueIdentifier="{93995380-89BD-4b04-88EB-625FBE52EBFB}"> + <File + RelativePath=".\api_threadpool.h"> + </File> + <File + RelativePath=".\ThreadFunctions.h"> + </File> + <File + RelativePath=".\ThreadID.h"> + </File> + <File + RelativePath=".\ThreadPool.h"> + </File> + </Filter> + <Filter + Name="Resource Files" + Filter="rc;ico;cur;bmp;dlg;rc2;rct;bin;rgs;gif;jpg;jpeg;jpe;resx" + UniqueIdentifier="{67DA6AB6-F800-4c08-8B7A-83BB121AAD01}"> + </Filter> + </Files> + <Globals> + </Globals> +</VisualStudioProject> diff --git a/Src/nu/threadpool/threadpool_types.h b/Src/nu/threadpool/threadpool_types.h new file mode 100644 index 00000000..9a23e833 --- /dev/null +++ b/Src/nu/threadpool/threadpool_types.h @@ -0,0 +1,8 @@ +#pragma once +#include <deque> +#include <windows.h> +namespace ThreadPoolTypes +{ + typedef std::deque<HANDLE> HandleList; + const int MAX_SEMAPHORE_VALUE = 1024; //some arbitrarily high amount* +}
\ No newline at end of file |