diff options
Diffstat (limited to 'Src/nu/threadpool/ThreadPool.cpp')
-rw-r--r-- | Src/nu/threadpool/ThreadPool.cpp | 365 |
1 files changed, 365 insertions, 0 deletions
diff --git a/Src/nu/threadpool/ThreadPool.cpp b/Src/nu/threadpool/ThreadPool.cpp new file mode 100644 index 00000000..950df5d1 --- /dev/null +++ b/Src/nu/threadpool/ThreadPool.cpp @@ -0,0 +1,365 @@ +#include "ThreadPool.h" + +ThreadPool::ThreadPool() +{ + for ( int i = 0; i < THREAD_TYPES; i++ ) + { + num_threads_available[ i ] = 0; + max_load_event[ i ] = CreateEvent( NULL, TRUE, FALSE, NULL ); + } + + killswitch = CreateEvent( NULL, TRUE, FALSE, NULL ); + + // one thread of each type to start + for ( int i = 0; i < 2; i++ ) + CreateNewThread_Internal( i ); + + watchdog_thread_handle = CreateThread( 0, 0, WatchDogThreadProcedure_stub, this, 0, 0 ); +} + +void ThreadPool::Kill() +{ + SetEvent( killswitch ); + WaitForSingleObject( watchdog_thread_handle, INFINITE ); + CloseHandle( watchdog_thread_handle ); + + for ( ThreadID *l_thread : threads ) + { + l_thread->Kill(); + delete l_thread; + } + + CloseHandle( killswitch ); + + for ( int i = 0; i < THREAD_TYPES; i++ ) + CloseHandle( max_load_event[ i ] ); +} + +DWORD ThreadPool::WatchDogThreadProcedure_stub( LPVOID param ) +{ + ThreadPool *_this = (ThreadPool *)param; + return _this->WatchDogThreadProcedure(); +} + + +/* +watchdog will get woken up when number of available threads hits zero +it creates a new thread, sleeps for a bit to let things "settle" and then reset the event +it will need a copy of all "any-thread" handles to build the new thread, and will need to manage in a thread safe way +(so a new thread doesn't "miss" a handle that is added in the interim) +*/ +DWORD CALLBACK ThreadPool::WatchDogThreadProcedure() +{ + // we ignore the max load event for reserved threads + HANDLE events[ 3 ] = { killswitch, max_load_event[ TYPE_MT ], max_load_event[ TYPE_STA ] }; + + while ( 1 ) + { + DWORD ret = WaitForMultipleObjects( 3, events, FALSE, INFINITE ); + if ( ret == WAIT_OBJECT_0 ) + { + break; + } + else if ( ret == WAIT_OBJECT_0 + 1 || ret == WAIT_OBJECT_0 + 2 ) + { + int thread_type = ret - ( WAIT_OBJECT_0 + 1 ); + // this signal is for "full thread load reached" + + // lets make sure we're actually at max capacity + Sleep( 10 ); // sleep a bit + if ( num_threads_available[ thread_type ] != 0 ) // see if we're still fully-loaded + continue; + + guard.Lock(); + CreateNewThread_Internal( thread_type ); + guard.Unlock(); + + Sleep( 250 ); // give the system time to 'settle down' so we don't spawn a ton of threads in a row + + ResetEvent( max_load_event[ thread_type ] ); + } + } + + return 0; +} + +ThreadID *ThreadPool::ReserveThread( int flags ) +{ + + // first, check to see if there's any released threads we can grab + Nullsoft::Utility::AutoLock threadlock( guard ); + for ( ThreadID *t : threads ) + { + if ( t->IsReserved() && t->IsReleased() && t->CanRunCOM( flags ) ) + { + t->Reserve(); + return t; + } + } + + // TODO: if there are enough free threads available, mark one as reserved +// this will involve signalling the thread to switch to 'reserved' mode +// swapping out the 'function list' semaphore with a local one +// and removing all 'busy handles' from the queue +// can probably use the 'wake' handle to synchronize this + +/* +int thread_type = GetThreadType(flags); +if (num_threads_available[thread_type > 2]) +{ + for (size_t i=0;i!=threads.size();i++) + { + if (threads[i]->IsReserved() == false && threads[i]->CanRunCOM(flags)) + { + + } + } +} +*/ + + ThreadID *new_thread = CreateNewThread_Internal( GetThreadType( flags, 1 ) ); + + return new_thread; +} + +void ThreadPool::ReleaseThread( ThreadID *thread_id ) +{ + if ( thread_id ) + { + // lock so there's no race condition between ReserveThread() and ReleaseThread() + Nullsoft::Utility::AutoLock threadlock( guard ); + thread_id->Release(); + } +} + + +int ThreadPool::AddHandle( ThreadID *thread_id, HANDLE handle, api_threadpool::ThreadPoolFunc func, void *user_data, intptr_t id, int flags ) +{ + // TODO: need to ensure that handles are not duplicated + thread_functions.Add( handle, func, user_data, id ); + + if ( thread_id ) + { + if ( thread_id->CanRunCOM( flags ) ) + thread_id->AddHandle( handle ); + else + return 1; + return 0; + } + else + { + /* increment thread counts temporarily - because the massive wake-up + causes thread counts to go to 0 */ + for ( int i = 0; i < THREAD_TYPES; i++ ) + InterlockedIncrement( &num_threads_available[ i ] ); + + guard.Lock(); + + AddHandle_Internal( 0, handle, flags ); + + bool thread_types[ THREAD_TYPES ]; + GetThreadTypes( flags, thread_types ); + + for ( int i = 0; i < THREAD_TYPES; i++ ) + { + if ( thread_types[ i ] ) + any_thread_handles[ i ].push_back( handle ); + } + + guard.Unlock(); + + for ( int i = 0; i < THREAD_TYPES; i++ ) + InterlockedDecrement( &num_threads_available[ i ] ); + + + } + + return 0; +} + +/* helper functions for adding/removing handles, +we keep going through the list as long as we can add/remove immediately. +once we have to block, we recurse the function starting at the next handle +when the function returns, we wait. +this lets us do some work rather than sit and wait for each thread's lock */ +void ThreadPool::RemoveHandle_Internal(size_t start, HANDLE handle) +{ + for (;start!=threads.size();start++) + { + ThreadID *t = threads[start]; + if (!t->TryRemoveHandle(handle)) // try to remove + { + // have to wait + RemoveHandle_Internal(start+1, handle); // recurse start with the next thread + t->WaitRemoveHandle(handle); // finish the job + return; + } + } +} + +void ThreadPool::AddHandle_Internal(size_t start, HANDLE handle, int flags) +{ + for (;start<threads.size();start++) + { + ThreadID *t = threads[start]; + if ((flags & api_threadpool::FLAG_LONG_EXECUTION) && t->IsReserved()) + continue; + + if (!t->CanRunCOM(flags)) + continue; + + if (!t->TryAddHandle(handle)) // try to add + { + // have to wait, + AddHandle_Internal(start+1, handle, flags); // recurse start with the next thread + t->WaitAddHandle(handle); // finish the job + return; + } + } +} + +void ThreadPool::RemoveHandle(ThreadID *thread_id, HANDLE handle) +{ + if (thread_id) + { + thread_id->RemoveHandle(handle); + } + else + { + /* increment thread counts temporarily - because the massive wake-up + causes thread counts to go to 0 */ + for (int i=0;i<THREAD_TYPES;i++) + InterlockedIncrement(&num_threads_available[i]); + guard.Lock(); + RemoveHandle_Internal(0, handle); + + for (int j=0;j<THREAD_TYPES;j++) + { + //for (ThreadPoolTypes::HandleList::iterator itr = any_thread_handles[j].begin(); + // itr != any_thread_handles[j].end(); + // itr++) + + ThreadPoolTypes::HandleList::iterator itr = any_thread_handles[j].begin(); + while(itr != any_thread_handles[j].end()) + { + if (*itr == handle) + { + itr = any_thread_handles[j].erase(itr); + } + else + { + itr++; + } + } + } + guard.Unlock(); + for (int i=0;i<THREAD_TYPES;i++) + InterlockedDecrement(&num_threads_available[i]); + } +} + +int ThreadPool::RunFunction(ThreadID *threadid, api_threadpool::ThreadPoolFunc func, void *user_data, intptr_t id, int flags) +{ + if (threadid) + threadid->QueueFunction(func, user_data, id); + else + thread_functions.QueueFunction(func, user_data, id); + return 0; +} + +ThreadID *ThreadPool::CreateNewThread_Internal(int thread_type) +{ + int reserved=0; + int com_type = api_threadpool::FLAG_REQUIRE_COM_MT; // default + switch(thread_type) + { + case TYPE_STA_RESERVED: + reserved=1; + case TYPE_STA: + com_type = api_threadpool::FLAG_REQUIRE_COM_STA; + break; + case TYPE_MT_RESERVED: + reserved=1; + case TYPE_MT: + com_type = api_threadpool::FLAG_REQUIRE_COM_MT; + break; + } + + Nullsoft::Utility::AutoLock threadlock(guard); // lock here (rather than after new ThreadID) to protect any_thread_handles + ThreadID *new_thread = new ThreadID(&thread_functions, killswitch, thread_functions.functions_semaphore, + any_thread_handles[thread_type], + &num_threads_available[thread_type], max_load_event[thread_type], + reserved, com_type); + threads.push_back(new_thread); + return new_thread; +} + +size_t ThreadPool::GetNumberOfThreads() +{ + Nullsoft::Utility::AutoLock threadlock(guard); + return threads.size(); +} + +size_t ThreadPool::GetNumberOfActiveThreads() +{ + size_t numThreads = GetNumberOfThreads(); + for (int i=0;i<THREAD_TYPES;i++) + numThreads -= num_threads_available[i]; + return numThreads; +} + +int ThreadPool::GetThreadType(int flags, int reserved) +{ + flags &= api_threadpool::MASK_COM_FLAGS; + int thread_type=TYPE_MT; + switch(flags) + { + case api_threadpool::FLAG_REQUIRE_COM_STA: + thread_type = reserved?TYPE_STA_RESERVED:TYPE_STA; + break; + case 0: // default + case api_threadpool::FLAG_REQUIRE_COM_MT: + thread_type = reserved?TYPE_MT_RESERVED:TYPE_MT; + break; + } + + return thread_type; +} + +void ThreadPool::GetThreadTypes(int flags, bool types[THREAD_TYPES]) +{ + for (int i=0;i<THREAD_TYPES;i++) + { + types[i]=true; + } + + if (flags & api_threadpool::FLAG_REQUIRE_COM_STA) + { + types[TYPE_MT] = false; + types[TYPE_MT] = false; + } + + if (flags & api_threadpool::FLAG_REQUIRE_COM_STA) + { + types[TYPE_STA] = false; + types[TYPE_STA_RESERVED] = false; + } + + if (flags & api_threadpool::FLAG_LONG_EXECUTION) + { + types[TYPE_STA_RESERVED] = false; + types[TYPE_MT_RESERVED] = false; + } + +} +#define CBCLASS ThreadPool +START_DISPATCH; +CB(RESERVETHREAD, ReserveThread) +VCB(RELEASETHREAD, ReleaseThread) +CB(ADDHANDLE, AddHandle) +VCB(REMOVEHANDLE, RemoveHandle) +CB(RUNFUNCTION, RunFunction) +CB(GETNUMBEROFTHREADS, GetNumberOfThreads) +CB(GETNUMBEROFACTIVETHREADS, GetNumberOfActiveThreads) +END_DISPATCH; +#undef CBCLASS |