Diffstat (limited to 'Src/nu/threadpool/ThreadPool.cpp')
-rw-r--r--	Src/nu/threadpool/ThreadPool.cpp	365
1 file changed, 365 insertions(+), 0 deletions(-)
diff --git a/Src/nu/threadpool/ThreadPool.cpp b/Src/nu/threadpool/ThreadPool.cpp
new file mode 100644
index 00000000..950df5d1
--- /dev/null
+++ b/Src/nu/threadpool/ThreadPool.cpp
@@ -0,0 +1,365 @@
+#include "ThreadPool.h"
+
+ThreadPool::ThreadPool()
+{
+ for ( int i = 0; i < THREAD_TYPES; i++ )
+ {
+ num_threads_available[ i ] = 0;
+ max_load_event[ i ] = CreateEvent( NULL, TRUE, FALSE, NULL );
+ }
+
+ killswitch = CreateEvent( NULL, TRUE, FALSE, NULL );
+
+	// start with one thread of each non-reserved type
+	// (TYPE_MT and TYPE_STA are the first two thread types)
+ for ( int i = 0; i < 2; i++ )
+ CreateNewThread_Internal( i );
+
+ watchdog_thread_handle = CreateThread( 0, 0, WatchDogThreadProcedure_stub, this, 0, 0 );
+}
+
+void ThreadPool::Kill()
+{
+ SetEvent( killswitch );
+ WaitForSingleObject( watchdog_thread_handle, INFINITE );
+ CloseHandle( watchdog_thread_handle );
+
+ for ( ThreadID *l_thread : threads )
+ {
+ l_thread->Kill();
+ delete l_thread;
+ }
+
+ CloseHandle( killswitch );
+
+ for ( int i = 0; i < THREAD_TYPES; i++ )
+ CloseHandle( max_load_event[ i ] );
+}
+
+DWORD ThreadPool::WatchDogThreadProcedure_stub( LPVOID param )
+{
+ ThreadPool *_this = (ThreadPool *)param;
+ return _this->WatchDogThreadProcedure();
+}
+
+
+/*
+the watchdog gets woken up when the number of available threads of a type hits zero.
+it creates a new thread, sleeps for a bit to let things "settle", and then resets the event.
+it needs a copy of all "any-thread" handles to build the new thread, and must manage
+them in a thread-safe way (so a new thread doesn't "miss" a handle added in the interim).
+*/
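+/*
+example sequence (assuming worker threads signal the max-load event when their
+type's available count reaches zero): both MT threads go busy ->
+num_threads_available[TYPE_MT] hits 0 -> max_load_event[TYPE_MT] is signalled ->
+the watchdog wakes, re-checks after 10ms, creates one more MT thread under the
+guard, waits 250ms to settle, then resets the event.
+*/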
+DWORD CALLBACK ThreadPool::WatchDogThreadProcedure()
+{
+ // we ignore the max load event for reserved threads
+ HANDLE events[ 3 ] = { killswitch, max_load_event[ TYPE_MT ], max_load_event[ TYPE_STA ] };
+
+ while ( 1 )
+ {
+ DWORD ret = WaitForMultipleObjects( 3, events, FALSE, INFINITE );
+ if ( ret == WAIT_OBJECT_0 )
+ {
+ break;
+ }
+ else if ( ret == WAIT_OBJECT_0 + 1 || ret == WAIT_OBJECT_0 + 2 )
+ {
+ int thread_type = ret - ( WAIT_OBJECT_0 + 1 );
+ // this signal is for "full thread load reached"
+
+			// let's make sure we're actually at max capacity
+			Sleep( 10 ); // brief pause before re-checking
+ if ( num_threads_available[ thread_type ] != 0 ) // see if we're still fully-loaded
+ continue;
+
+ guard.Lock();
+ CreateNewThread_Internal( thread_type );
+ guard.Unlock();
+
+ Sleep( 250 ); // give the system time to 'settle down' so we don't spawn a ton of threads in a row
+
+ ResetEvent( max_load_event[ thread_type ] );
+ }
+ }
+
+ return 0;
+}
+
+ThreadID *ThreadPool::ReserveThread( int flags )
+{
+	// first, check whether there are any released reserved threads we can grab
+ Nullsoft::Utility::AutoLock threadlock( guard );
+ for ( ThreadID *t : threads )
+ {
+ if ( t->IsReserved() && t->IsReleased() && t->CanRunCOM( flags ) )
+ {
+ t->Reserve();
+ return t;
+ }
+ }
+
+	// TODO: if there are enough free threads available, mark one as reserved.
+	// this will involve signalling the thread to switch to 'reserved' mode,
+	// swapping out the 'function list' semaphore with a local one,
+	// and removing all 'busy handles' from the queue.
+	// can probably use the 'wake' handle to synchronize this.
+
+/*
+int thread_type = GetThreadType(flags);
+if (num_threads_available[thread_type] > 2)
+{
+	for (size_t i=0;i!=threads.size();i++)
+	{
+		if (threads[i]->IsReserved() == false && threads[i]->CanRunCOM(flags))
+		{
+			// promote this thread to 'reserved' mode, per the TODO above
+		}
+	}
+}
+*/
+
+	// nothing reusable was found: create a brand-new reserved thread of the right COM type
+	ThreadID *new_thread = CreateNewThread_Internal( GetThreadType( flags, 1 ) );
+
+ return new_thread;
+}
+
+void ThreadPool::ReleaseThread( ThreadID *thread_id )
+{
+ if ( thread_id )
+ {
+ // lock so there's no race condition between ReserveThread() and ReleaseThread()
+ Nullsoft::Utility::AutoLock threadlock( guard );
+ thread_id->Release();
+ }
+}
+
+
+int ThreadPool::AddHandle( ThreadID *thread_id, HANDLE handle, api_threadpool::ThreadPoolFunc func, void *user_data, intptr_t id, int flags )
+{
+	// TODO: need to ensure that handles are not duplicated
+
+	// check the COM model up front, so a failed add doesn't leave a stale
+	// entry registered in the function list
+	if ( thread_id && !thread_id->CanRunCOM( flags ) )
+		return 1;
+
+	thread_functions.Add( handle, func, user_data, id );
+
+	if ( thread_id )
+	{
+		thread_id->AddHandle( handle );
+		return 0;
+	}
+	else
+	{
+		/* temporarily bump the available-thread counts: the broadcast wake-up
+		below briefly drives them to zero, which would falsely signal the
+		watchdog's max-load events */
+ for ( int i = 0; i < THREAD_TYPES; i++ )
+ InterlockedIncrement( &num_threads_available[ i ] );
+
+ guard.Lock();
+
+ AddHandle_Internal( 0, handle, flags );
+
+ bool thread_types[ THREAD_TYPES ];
+ GetThreadTypes( flags, thread_types );
+
+ for ( int i = 0; i < THREAD_TYPES; i++ )
+ {
+ if ( thread_types[ i ] )
+ any_thread_handles[ i ].push_back( handle );
+ }
+
+ guard.Unlock();
+
+ for ( int i = 0; i < THREAD_TYPES; i++ )
+ InterlockedDecrement( &num_threads_available[ i ] );
+
+ }
+
+ return 0;
+}
+
+/* helper functions for adding/removing handles.
+we keep going through the thread list as long as we can add/remove immediately.
+once we would have to block, we recurse starting at the next thread;
+when the recursive call returns, we wait on the thread we skipped.
+this lets us do useful work rather than sit and wait for each thread's lock */
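+/*
+e.g. with threads A, B, C: if B's lock is contended, we recurse and service C
+immediately, and only then come back and block on B -- so one slow thread
+doesn't serialize the whole walk.
+*/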
+void ThreadPool::RemoveHandle_Internal(size_t start, HANDLE handle)
+{
+ for (;start!=threads.size();start++)
+ {
+ ThreadID *t = threads[start];
+ if (!t->TryRemoveHandle(handle)) // try to remove
+ {
+ // have to wait
+ RemoveHandle_Internal(start+1, handle); // recurse start with the next thread
+ t->WaitRemoveHandle(handle); // finish the job
+ return;
+ }
+ }
+}
+
+void ThreadPool::AddHandle_Internal(size_t start, HANDLE handle, int flags)
+{
+ for (;start<threads.size();start++)
+ {
+ ThreadID *t = threads[start];
+ if ((flags & api_threadpool::FLAG_LONG_EXECUTION) && t->IsReserved())
+ continue;
+
+ if (!t->CanRunCOM(flags))
+ continue;
+
+ if (!t->TryAddHandle(handle)) // try to add
+ {
+ // have to wait,
+ AddHandle_Internal(start+1, handle, flags); // recurse start with the next thread
+ t->WaitAddHandle(handle); // finish the job
+ return;
+ }
+ }
+}
+
+void ThreadPool::RemoveHandle(ThreadID *thread_id, HANDLE handle)
+{
+ if (thread_id)
+ {
+ thread_id->RemoveHandle(handle);
+ }
+ else
+ {
+		/* temporarily bump the available-thread counts: the broadcast wake-up
+		below briefly drives them to zero, which would falsely signal the
+		watchdog's max-load events */
+ for (int i=0;i<THREAD_TYPES;i++)
+ InterlockedIncrement(&num_threads_available[i]);
+ guard.Lock();
+ RemoveHandle_Internal(0, handle);
+
+ for (int j=0;j<THREAD_TYPES;j++)
+ {
+ ThreadPoolTypes::HandleList::iterator itr = any_thread_handles[j].begin();
+ while(itr != any_thread_handles[j].end())
+ {
+ if (*itr == handle)
+ {
+ itr = any_thread_handles[j].erase(itr);
+ }
+ else
+ {
+ itr++;
+ }
+ }
+ }
+ guard.Unlock();
+ for (int i=0;i<THREAD_TYPES;i++)
+ InterlockedDecrement(&num_threads_available[i]);
+ }
+}
+
+int ThreadPool::RunFunction(ThreadID *threadid, api_threadpool::ThreadPoolFunc func, void *user_data, intptr_t id, int flags)
+{
+	// note: 'flags' is currently unused; the function just gets queued
+	if (threadid)
+		threadid->QueueFunction(func, user_data, id); // run on the reserved thread
+	else
+		thread_functions.QueueFunction(func, user_data, id); // run on any pool thread
+	return 0;
+}
+
+ThreadID *ThreadPool::CreateNewThread_Internal(int thread_type)
+{
+ int reserved=0;
+ int com_type = api_threadpool::FLAG_REQUIRE_COM_MT; // default
+ switch(thread_type)
+ {
+ case TYPE_STA_RESERVED:
+			reserved=1;
+			// fall through
+ case TYPE_STA:
+ com_type = api_threadpool::FLAG_REQUIRE_COM_STA;
+ break;
+ case TYPE_MT_RESERVED:
+			reserved=1;
+			// fall through
+ case TYPE_MT:
+ com_type = api_threadpool::FLAG_REQUIRE_COM_MT;
+ break;
+ }
+
+ Nullsoft::Utility::AutoLock threadlock(guard); // lock here (rather than after new ThreadID) to protect any_thread_handles
+ ThreadID *new_thread = new ThreadID(&thread_functions, killswitch, thread_functions.functions_semaphore,
+ any_thread_handles[thread_type],
+ &num_threads_available[thread_type], max_load_event[thread_type],
+ reserved, com_type);
+ threads.push_back(new_thread);
+ return new_thread;
+}
+
+size_t ThreadPool::GetNumberOfThreads()
+{
+ Nullsoft::Utility::AutoLock threadlock(guard);
+ return threads.size();
+}
+
+size_t ThreadPool::GetNumberOfActiveThreads()
+{
+	// snapshot only: the available counts are read without holding the guard
+	size_t numThreads = GetNumberOfThreads();
+	for (int i=0;i<THREAD_TYPES;i++)
+		numThreads -= num_threads_available[i];
+	return numThreads;
+}
+
+int ThreadPool::GetThreadType(int flags, int reserved)
+{
+ flags &= api_threadpool::MASK_COM_FLAGS;
+ int thread_type=TYPE_MT;
+ switch(flags)
+ {
+ case api_threadpool::FLAG_REQUIRE_COM_STA:
+ thread_type = reserved?TYPE_STA_RESERVED:TYPE_STA;
+ break;
+ case 0: // default
+ case api_threadpool::FLAG_REQUIRE_COM_MT:
+ thread_type = reserved?TYPE_MT_RESERVED:TYPE_MT;
+ break;
+ }
+
+ return thread_type;
+}
+
+void ThreadPool::GetThreadTypes(int flags, bool types[THREAD_TYPES])
+{
+ for (int i=0;i<THREAD_TYPES;i++)
+ {
+ types[i]=true;
+ }
+
+	if (flags & api_threadpool::FLAG_REQUIRE_COM_STA)
+	{
+		// STA required: rule out the MT thread types
+		types[TYPE_MT] = false;
+		types[TYPE_MT_RESERVED] = false;
+	}
+
+	if (flags & api_threadpool::FLAG_REQUIRE_COM_MT)
+	{
+		// MT required: rule out the STA thread types
+		types[TYPE_STA] = false;
+		types[TYPE_STA_RESERVED] = false;
+	}
+
+ if (flags & api_threadpool::FLAG_LONG_EXECUTION)
+ {
+ types[TYPE_STA_RESERVED] = false;
+ types[TYPE_MT_RESERVED] = false;
+ }
+
+}
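+/*
+resulting type sets, for reference:
+  FLAG_REQUIRE_COM_STA   -> { TYPE_STA, TYPE_STA_RESERVED }
+  FLAG_REQUIRE_COM_MT    -> { TYPE_MT, TYPE_MT_RESERVED }
+  no COM flag            -> all four types
+  | FLAG_LONG_EXECUTION  -> additionally removes both reserved types
+*/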
+#define CBCLASS ThreadPool
+START_DISPATCH;
+CB(RESERVETHREAD, ReserveThread)
+VCB(RELEASETHREAD, ReleaseThread)
+CB(ADDHANDLE, AddHandle)
+VCB(REMOVEHANDLE, RemoveHandle)
+CB(RUNFUNCTION, RunFunction)
+CB(GETNUMBEROFTHREADS, GetNumberOfThreads)
+CB(GETNUMBEROFACTIVETHREADS, GetNumberOfActiveThreads)
+END_DISPATCH;
+#undef CBCLASS
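+
+/* a minimal usage sketch against the dispatch surface above (illustrative only:
+   'pool' stands for the api_threadpool service obtained elsewhere, 'MyWork' is a
+   hypothetical callback matching api_threadpool::ThreadPoolFunc, and 'evt' /
+   'my_data' are placeholders):
+
+	// fire-and-forget: queue a function for any compatible pool thread
+	pool->RunFunction(NULL, MyWork, my_data, 0, api_threadpool::FLAG_REQUIRE_COM_MT);
+
+	// dedicated waiter: reserve a thread and have it watch an event handle
+	ThreadID *t = pool->ReserveThread(api_threadpool::FLAG_REQUIRE_COM_STA);
+	pool->AddHandle(t, evt, MyWork, my_data, 0, api_threadpool::FLAG_REQUIRE_COM_STA);
+	// ... later, undo in reverse order ...
+	pool->RemoveHandle(t, evt);
+	pool->ReleaseThread(t);
+*/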