aboutsummaryrefslogtreecommitdiff
path: root/Src/nu/threadpool
diff options
context:
space:
mode:
Diffstat (limited to 'Src/nu/threadpool')
-rw-r--r--Src/nu/threadpool/ThreadFunctions.cpp79
-rw-r--r--Src/nu/threadpool/ThreadFunctions.h31
-rw-r--r--Src/nu/threadpool/ThreadID.cpp274
-rw-r--r--Src/nu/threadpool/ThreadID.h56
-rw-r--r--Src/nu/threadpool/ThreadPool.cpp365
-rw-r--r--Src/nu/threadpool/ThreadPool.h98
-rw-r--r--Src/nu/threadpool/TimerHandle.hpp54
-rw-r--r--Src/nu/threadpool/api_threadpool.h92
-rw-r--r--Src/nu/threadpool/threadpool.sln23
-rw-r--r--Src/nu/threadpool/threadpool.vcproj137
-rw-r--r--Src/nu/threadpool/threadpool_types.h8
11 files changed, 1217 insertions, 0 deletions
diff --git a/Src/nu/threadpool/ThreadFunctions.cpp b/Src/nu/threadpool/ThreadFunctions.cpp
new file mode 100644
index 00000000..0ae23730
--- /dev/null
+++ b/Src/nu/threadpool/ThreadFunctions.cpp
@@ -0,0 +1,79 @@
+#include "ThreadFunctions.h"
+#include "threadpool_types.h"
+
+ThreadFunctions::ThreadFunctions(int create_function_list)
+{
+ if (create_function_list)
+ {
+ functions_semaphore = CreateSemaphore(0, 0, ThreadPoolTypes::MAX_SEMAPHORE_VALUE, 0);
+ InitializeCriticalSectionAndSpinCount(&functions_guard, 200);
+ }
+ else
+ functions_semaphore = 0;
+}
+
+ThreadFunctions::~ThreadFunctions()
+{
+ if (functions_semaphore)
+ {
+ CloseHandle(functions_semaphore);
+ DeleteCriticalSection(&functions_guard);
+ }
+}
+
+void ThreadFunctions::Add(HANDLE handle, api_threadpool::ThreadPoolFunc func, void *user_data, intptr_t id)
+{
+ Nullsoft::Utility::AutoLock l(guard);
+ Data *new_data = (Data *)calloc(1, sizeof(Data));
+ new_data->func = func;
+ new_data->user_data = user_data;
+ new_data->id = id;
+ data[handle] = new_data;
+}
+
+bool ThreadFunctions::Get(HANDLE handle, api_threadpool::ThreadPoolFunc *func, void **user_data, intptr_t *id)
+{
+ Nullsoft::Utility::AutoLock l(guard);
+ DataMap::iterator found = data.find(handle);
+ if (found == data.end())
+ return false;
+
+ const Data *d = found->second;
+ *func = d->func;
+ *user_data = d->user_data;
+ *id = d->id;
+ return true;
+}
+
+void ThreadFunctions::QueueFunction(api_threadpool::ThreadPoolFunc func, void *user_data, intptr_t id)
+{
+ Data *new_data = (Data *)calloc(1, sizeof(Data));
+ new_data->func = func;
+ new_data->user_data = user_data;
+ new_data->id = id;
+ EnterCriticalSection(&functions_guard);
+ functions_list.push_front(new_data);
+ LeaveCriticalSection(&functions_guard); // unlock before releasing the semaphore early so we don't lock convoy
+ ReleaseSemaphore(functions_semaphore, 1, 0);
+}
+
+bool ThreadFunctions::PopFunction(api_threadpool::ThreadPoolFunc *func, void **user_data, intptr_t *id)
+{
+ EnterCriticalSection(&functions_guard);
+ if (!functions_list.empty())
+ {
+ ThreadFunctions::Data *data = functions_list.back();
+ functions_list.pop_back();
+ LeaveCriticalSection(&functions_guard);
+ *func = data->func;
+ *user_data = data->user_data;
+ *id = data->id;
+ free(data);
+ return true;
+ }
+ else
+ {
+ LeaveCriticalSection(&functions_guard);
+ return false;
+ }
+}
diff --git a/Src/nu/threadpool/ThreadFunctions.h b/Src/nu/threadpool/ThreadFunctions.h
new file mode 100644
index 00000000..bbc5e6c6
--- /dev/null
+++ b/Src/nu/threadpool/ThreadFunctions.h
@@ -0,0 +1,31 @@
+#pragma once
+#include "api_threadpool.h"
+#include <map>
+#include <deque>
+#include "../AutoLock.h"
+
+class ThreadFunctions
+{
+public:
+ struct Data
+ {
+ api_threadpool::ThreadPoolFunc func;
+ void *user_data;
+ intptr_t id;
+ };
+ ThreadFunctions(int create_function_list=1);
+ ~ThreadFunctions();
+ void Add(HANDLE handle, api_threadpool::ThreadPoolFunc func, void *user_data, intptr_t id);
+ bool Get(HANDLE handle, api_threadpool::ThreadPoolFunc *func, void **user_data, intptr_t *id);
+ void QueueFunction(api_threadpool::ThreadPoolFunc func, void *user_data, intptr_t id);
+ bool PopFunction(api_threadpool::ThreadPoolFunc *func, void **user_data, intptr_t *id);
+
+ typedef std::map<HANDLE, const ThreadFunctions::Data*> DataMap;
+ DataMap data;
+ Nullsoft::Utility::LockGuard guard;
+
+ typedef std::deque<ThreadFunctions::Data*> FuncList;
+ FuncList functions_list;
+ CRITICAL_SECTION functions_guard;
+ HANDLE functions_semaphore;
+};
diff --git a/Src/nu/threadpool/ThreadID.cpp b/Src/nu/threadpool/ThreadID.cpp
new file mode 100644
index 00000000..28d6f1ca
--- /dev/null
+++ b/Src/nu/threadpool/ThreadID.cpp
@@ -0,0 +1,274 @@
+#include "ThreadID.h"
+
+DWORD ThreadID::thread_func_stub(LPVOID param)
+{
+ ThreadID *t = static_cast<ThreadID*>(param);
+ if (t != NULL)
+ {
+ return t->ThreadFunction();
+ }
+ else return 0;
+}
+
+void ThreadID::Kill()
+{
+ if (threadHandle && threadHandle != INVALID_HANDLE_VALUE)
+ {
+ //cut: WaitForSingleObject(threadHandle, INFINITE);
+ while (WaitForMultipleObjectsEx(1, &threadHandle, FALSE, INFINITE, TRUE) != WAIT_OBJECT_0)
+ {
+ }
+ }
+
+}
+
+ThreadID::ThreadID(ThreadFunctions *t_f, HANDLE killswitch, HANDLE global_functions_semaphore,
+ ThreadPoolTypes::HandleList &inherited_handles,
+ volatile LONG *thread_count, HANDLE _max_load_event,
+ int _reserved, int _com_type) : ThreadFunctions(_reserved)
+{
+ /* initialize values */
+ released = false;
+ InitializeCriticalSection(&handle_lock);
+
+ /* grab values passed to us */
+ reserved = _reserved;
+ com_type = _com_type;
+ max_load_event = _max_load_event;
+ global_functions = t_f;
+ num_threads_available = thread_count;
+
+ /* wait_handles[0] is kill switch */
+ wait_handles.push_back(killswitch);
+
+ /* wait_handles[1] is wake switch */
+ wakeHandle = CreateSemaphore(0, 0, ThreadPoolTypes::MAX_SEMAPHORE_VALUE, 0);
+ wait_handles.push_back(wakeHandle);
+
+ if (reserved)
+ {
+ /* if thread is reserved,
+ wait_handles[2] is a Funcion Call wake semaphore
+ for this thread only. */
+ wait_handles.push_back(functions_semaphore); // WAIT_OBJECT_0+1 == per-thread queued functions
+ }
+ else
+ {
+ /* if thread is not reserved,
+ wait_handles[2] is a Function Call wake semaphore
+ global to all threads */
+ wait_handles.push_back(global_functions_semaphore); // WAIT_OBJECT_0+2 == any-thread queued functions
+ }
+
+ /* add inherited handles
+ (handles added to thread pool before this thread was created) */
+ for ( ThreadPoolTypes::HandleList::iterator itr = inherited_handles.begin(); itr != inherited_handles.end(); itr++ )
+ {
+ wait_handles.push_back( *itr );
+ }
+
+ /* start thread */
+ threadHandle = CreateThread(0, 0, thread_func_stub, this, 0, 0);
+}
+
+ThreadID::~ThreadID()
+{
+ CloseHandle(threadHandle);
+ CloseHandle(wakeHandle);
+ DeleteCriticalSection(&handle_lock);
+}
+
+bool ThreadID::TryAddHandle(HANDLE new_handle)
+{
+ // let's see if we get lucky and can access the handle list directly
+ if (TryEnterCriticalSection(&handle_lock))
+ {
+ // made it
+ wait_handles.push_back(new_handle);
+ LeaveCriticalSection(&handle_lock);
+ return true;
+ }
+ else
+ {
+ ReleaseSemaphore(wakeHandle, 1, 0); // kick the thread out of WaitForMultiple...
+ return false;
+ }
+}
+
+void ThreadID::WaitAddHandle(HANDLE handle)
+{
+ // wakeHandle already got released once by nature of this function being called
+ EnterCriticalSection(&handle_lock);
+ wait_handles.push_back(handle);
+ LeaveCriticalSection(&handle_lock);
+ ReleaseSemaphore(wakeHandle, 1, 0); // kick out the second wait
+}
+
+void ThreadID::AddHandle(HANDLE new_handle)
+{
+ if (!TryAddHandle(new_handle))
+ WaitAddHandle(new_handle);
+}
+
+bool ThreadID::TryRemoveHandle(HANDLE handle)
+{
+ // let's see if we get lucky and can access the handle list directly
+ if (TryEnterCriticalSection(&handle_lock))
+ {
+ RemoveHandle_Internal(handle);
+ LeaveCriticalSection(&handle_lock);
+ return true;
+ }
+ else
+ {
+ ReleaseSemaphore(wakeHandle, 1, 0); // kick the thread out of WaitForMultiple...
+ return false;
+ }
+ return false;
+}
+
+void ThreadID::WaitRemoveHandle(HANDLE handle)
+{
+ // wakeHandle already got released once by nature of this function being called
+ EnterCriticalSection(&handle_lock);
+ RemoveHandle_Internal(handle);
+ LeaveCriticalSection(&handle_lock);
+ ReleaseSemaphore(wakeHandle, 1, 0); // kick out the second wait
+}
+
+void ThreadID::RemoveHandle(HANDLE handle)
+{
+ if (!TryRemoveHandle(handle))
+ WaitRemoveHandle(handle);
+}
+
+void ThreadID::RemoveHandle_Internal(HANDLE handle)
+{
+ // first three handles are reserved, so start after that
+ for (size_t i=3;i<wait_handles.size();i++)
+ {
+ if (wait_handles[i] == handle)
+ {
+ wait_handles.erase(wait_handles.begin() + i);
+ i--;
+ }
+ }
+}
+
+bool ThreadID::IsReserved() const
+{
+ return !!reserved;
+}
+
+DWORD CALLBACK ThreadID::ThreadFunction()
+{
+ switch(com_type)
+ {
+ case api_threadpool::FLAG_REQUIRE_COM_MT:
+ CoInitializeEx(0, COINIT_MULTITHREADED);
+ break;
+ case api_threadpool::FLAG_REQUIRE_COM_STA:
+ CoInitialize(0);
+ break;
+ }
+
+ while (1)
+ {
+ InterlockedIncrement(num_threads_available);
+ EnterCriticalSection(&handle_lock);
+ DWORD ret = WaitForMultipleObjectsEx((DWORD)wait_handles.size(), wait_handles.data(), FALSE, INFINITE, TRUE);
+ // cut: LeaveCriticalSection(&handle_lock);
+ if (InterlockedDecrement(num_threads_available) == 0 && !reserved)
+ SetEvent(max_load_event); // notify the watch dog if all the threads are used up
+
+ if (ret == WAIT_OBJECT_0)
+ {
+ // killswitch
+ LeaveCriticalSection(&handle_lock);
+ break;
+ }
+ else if (ret == WAIT_OBJECT_0 + 1)
+ {
+ // we got woken up to release the handles lock
+ // wait for the second signal
+ LeaveCriticalSection(&handle_lock);
+ InterlockedIncrement(num_threads_available);
+ WaitForSingleObject(wakeHandle, INFINITE);
+ InterlockedDecrement(num_threads_available);
+ }
+ else if (ret == WAIT_OBJECT_0 + 2)
+ {
+ LeaveCriticalSection(&handle_lock);
+ api_threadpool::ThreadPoolFunc func;
+ void *user_data;
+ intptr_t id;
+ if (reserved)
+ {
+ // per-thread queued functions
+ if (PopFunction(&func, &user_data, &id))
+ {
+ func(0, user_data, id);
+ }
+ }
+ else
+ {
+ // global queued functions
+ if (global_functions->PopFunction(&func, &user_data, &id))
+ {
+ func(0, user_data, id);
+ }
+ }
+ }
+ else if (ret > WAIT_OBJECT_0 && ret < (WAIT_OBJECT_0 + wait_handles.size()))
+ {
+ DWORD index = ret - WAIT_OBJECT_0;
+ HANDLE handle = wait_handles[index];
+ LeaveCriticalSection(&handle_lock);
+ /* !!! race condition here if someone calls ThreadPool::RemoveHandle and then CloseHandle() !!!
+ before calling RemoveHandle, caller needs to either
+ ensure that Event is unsignalled (And won't be signalled)
+ or call RemoveHandle from within the function callback */
+ api_threadpool::ThreadPoolFunc func;
+ void *user_data;
+ intptr_t id;
+ if (global_functions->Get(handle, &func, &user_data, &id))
+ {
+ func(handle, user_data, id);
+ }
+ }
+ else
+ {
+ LeaveCriticalSection(&handle_lock);
+ }
+ }
+ if (com_type & api_threadpool::MASK_COM_FLAGS)
+ CoUninitialize();
+ return 0;
+}
+
+bool ThreadID::CanRunCOM(int flags) const
+{
+ switch(com_type)
+ {
+ case api_threadpool::FLAG_REQUIRE_COM_MT: // if we're a CONIT_MULTITHREADEX thread (default)
+ return !(flags & api_threadpool::FLAG_REQUIRE_COM_STA); // don't let STA stuff run
+ case api_threadpool::FLAG_REQUIRE_COM_STA: // if we're a CoInitialize(0) thread
+ return !(flags & api_threadpool::FLAG_REQUIRE_COM_MT); // don't let MT stuff run
+ }
+ return false; // shouldn't get here
+}
+
+bool ThreadID::IsReleased() const
+{
+ return released;
+}
+
+void ThreadID::Reserve()
+{
+ released=false;
+}
+
+void ThreadID::Release()
+{
+ released=true;
+} \ No newline at end of file
diff --git a/Src/nu/threadpool/ThreadID.h b/Src/nu/threadpool/ThreadID.h
new file mode 100644
index 00000000..e5d1e09e
--- /dev/null
+++ b/Src/nu/threadpool/ThreadID.h
@@ -0,0 +1,56 @@
+#pragma once
+#include <windows.h>
+#include "ThreadFunctions.h"
+#include "threadpool_types.h"
+#include <vector>
+
+
+class ThreadID : private ThreadFunctions
+{
+public:
+ static DWORD CALLBACK thread_func_stub(LPVOID param);
+ ThreadID(ThreadFunctions *t_f, HANDLE killswitch, HANDLE global_functions_semaphore, ThreadPoolTypes::HandleList &inherited_handles, volatile LONG *thread_count, HANDLE _max_load_event, int _reserved, int _com_type);
+ ~ThreadID();
+ void Kill();
+
+ /* Try and Wait must be paired!!! */
+ bool TryAddHandle(HANDLE new_handle);
+ void WaitAddHandle(HANDLE new_handle);
+ void AddHandle(HANDLE new_handle);
+
+ /* Try and Wait must be paired!!! */
+ bool TryRemoveHandle(HANDLE handle);
+ void WaitRemoveHandle(HANDLE handle);
+ void RemoveHandle(HANDLE handle);
+
+ using ThreadFunctions::QueueFunction;
+ bool IsReserved() const;
+ bool IsReleased() const;
+ bool CanRunCOM(int flags) const;
+ void Reserve(); // re-reserves a released thread
+ void Release(); // release a reversed thread
+private:
+ void RemoveHandle_Internal(HANDLE handle);
+ DWORD CALLBACK ThreadFunction();
+
+ int reserved;
+ ThreadFunctions *global_functions;
+ volatile LONG *num_threads_available;
+ int com_type;
+ bool released;
+
+ ThreadFunctions local_functions;
+
+ // list of handles we're waiting on
+ typedef std::vector<HANDLE> HandleList;
+ HandleList wait_handles;
+ CRITICAL_SECTION handle_lock;
+
+ // handles we create/own
+ HANDLE threadHandle;
+ HANDLE wakeHandle;
+
+ // handles given to us
+ HANDLE max_load_event;
+
+};
diff --git a/Src/nu/threadpool/ThreadPool.cpp b/Src/nu/threadpool/ThreadPool.cpp
new file mode 100644
index 00000000..950df5d1
--- /dev/null
+++ b/Src/nu/threadpool/ThreadPool.cpp
@@ -0,0 +1,365 @@
+#include "ThreadPool.h"
+
+ThreadPool::ThreadPool()
+{
+ for ( int i = 0; i < THREAD_TYPES; i++ )
+ {
+ num_threads_available[ i ] = 0;
+ max_load_event[ i ] = CreateEvent( NULL, TRUE, FALSE, NULL );
+ }
+
+ killswitch = CreateEvent( NULL, TRUE, FALSE, NULL );
+
+ // one thread of each type to start
+ for ( int i = 0; i < 2; i++ )
+ CreateNewThread_Internal( i );
+
+ watchdog_thread_handle = CreateThread( 0, 0, WatchDogThreadProcedure_stub, this, 0, 0 );
+}
+
+void ThreadPool::Kill()
+{
+ SetEvent( killswitch );
+ WaitForSingleObject( watchdog_thread_handle, INFINITE );
+ CloseHandle( watchdog_thread_handle );
+
+ for ( ThreadID *l_thread : threads )
+ {
+ l_thread->Kill();
+ delete l_thread;
+ }
+
+ CloseHandle( killswitch );
+
+ for ( int i = 0; i < THREAD_TYPES; i++ )
+ CloseHandle( max_load_event[ i ] );
+}
+
+DWORD ThreadPool::WatchDogThreadProcedure_stub( LPVOID param )
+{
+ ThreadPool *_this = (ThreadPool *)param;
+ return _this->WatchDogThreadProcedure();
+}
+
+
+/*
+watchdog will get woken up when number of available threads hits zero
+it creates a new thread, sleeps for a bit to let things "settle" and then reset the event
+it will need a copy of all "any-thread" handles to build the new thread, and will need to manage in a thread safe way
+(so a new thread doesn't "miss" a handle that is added in the interim)
+*/
+DWORD CALLBACK ThreadPool::WatchDogThreadProcedure()
+{
+ // we ignore the max load event for reserved threads
+ HANDLE events[ 3 ] = { killswitch, max_load_event[ TYPE_MT ], max_load_event[ TYPE_STA ] };
+
+ while ( 1 )
+ {
+ DWORD ret = WaitForMultipleObjects( 3, events, FALSE, INFINITE );
+ if ( ret == WAIT_OBJECT_0 )
+ {
+ break;
+ }
+ else if ( ret == WAIT_OBJECT_0 + 1 || ret == WAIT_OBJECT_0 + 2 )
+ {
+ int thread_type = ret - ( WAIT_OBJECT_0 + 1 );
+ // this signal is for "full thread load reached"
+
+ // lets make sure we're actually at max capacity
+ Sleep( 10 ); // sleep a bit
+ if ( num_threads_available[ thread_type ] != 0 ) // see if we're still fully-loaded
+ continue;
+
+ guard.Lock();
+ CreateNewThread_Internal( thread_type );
+ guard.Unlock();
+
+ Sleep( 250 ); // give the system time to 'settle down' so we don't spawn a ton of threads in a row
+
+ ResetEvent( max_load_event[ thread_type ] );
+ }
+ }
+
+ return 0;
+}
+
+ThreadID *ThreadPool::ReserveThread( int flags )
+{
+
+ // first, check to see if there's any released threads we can grab
+ Nullsoft::Utility::AutoLock threadlock( guard );
+ for ( ThreadID *t : threads )
+ {
+ if ( t->IsReserved() && t->IsReleased() && t->CanRunCOM( flags ) )
+ {
+ t->Reserve();
+ return t;
+ }
+ }
+
+ // TODO: if there are enough free threads available, mark one as reserved
+// this will involve signalling the thread to switch to 'reserved' mode
+// swapping out the 'function list' semaphore with a local one
+// and removing all 'busy handles' from the queue
+// can probably use the 'wake' handle to synchronize this
+
+/*
+int thread_type = GetThreadType(flags);
+if (num_threads_available[thread_type > 2])
+{
+ for (size_t i=0;i!=threads.size();i++)
+ {
+ if (threads[i]->IsReserved() == false && threads[i]->CanRunCOM(flags))
+ {
+
+ }
+ }
+}
+*/
+
+ ThreadID *new_thread = CreateNewThread_Internal( GetThreadType( flags, 1 ) );
+
+ return new_thread;
+}
+
+void ThreadPool::ReleaseThread( ThreadID *thread_id )
+{
+ if ( thread_id )
+ {
+ // lock so there's no race condition between ReserveThread() and ReleaseThread()
+ Nullsoft::Utility::AutoLock threadlock( guard );
+ thread_id->Release();
+ }
+}
+
+
+int ThreadPool::AddHandle( ThreadID *thread_id, HANDLE handle, api_threadpool::ThreadPoolFunc func, void *user_data, intptr_t id, int flags )
+{
+ // TODO: need to ensure that handles are not duplicated
+ thread_functions.Add( handle, func, user_data, id );
+
+ if ( thread_id )
+ {
+ if ( thread_id->CanRunCOM( flags ) )
+ thread_id->AddHandle( handle );
+ else
+ return 1;
+ return 0;
+ }
+ else
+ {
+ /* increment thread counts temporarily - because the massive wake-up
+ causes thread counts to go to 0 */
+ for ( int i = 0; i < THREAD_TYPES; i++ )
+ InterlockedIncrement( &num_threads_available[ i ] );
+
+ guard.Lock();
+
+ AddHandle_Internal( 0, handle, flags );
+
+ bool thread_types[ THREAD_TYPES ];
+ GetThreadTypes( flags, thread_types );
+
+ for ( int i = 0; i < THREAD_TYPES; i++ )
+ {
+ if ( thread_types[ i ] )
+ any_thread_handles[ i ].push_back( handle );
+ }
+
+ guard.Unlock();
+
+ for ( int i = 0; i < THREAD_TYPES; i++ )
+ InterlockedDecrement( &num_threads_available[ i ] );
+
+
+ }
+
+ return 0;
+}
+
+/* helper functions for adding/removing handles,
+we keep going through the list as long as we can add/remove immediately.
+once we have to block, we recurse the function starting at the next handle
+when the function returns, we wait.
+this lets us do some work rather than sit and wait for each thread's lock */
+void ThreadPool::RemoveHandle_Internal(size_t start, HANDLE handle)
+{
+ for (;start!=threads.size();start++)
+ {
+ ThreadID *t = threads[start];
+ if (!t->TryRemoveHandle(handle)) // try to remove
+ {
+ // have to wait
+ RemoveHandle_Internal(start+1, handle); // recurse start with the next thread
+ t->WaitRemoveHandle(handle); // finish the job
+ return;
+ }
+ }
+}
+
+void ThreadPool::AddHandle_Internal(size_t start, HANDLE handle, int flags)
+{
+ for (;start<threads.size();start++)
+ {
+ ThreadID *t = threads[start];
+ if ((flags & api_threadpool::FLAG_LONG_EXECUTION) && t->IsReserved())
+ continue;
+
+ if (!t->CanRunCOM(flags))
+ continue;
+
+ if (!t->TryAddHandle(handle)) // try to add
+ {
+ // have to wait,
+ AddHandle_Internal(start+1, handle, flags); // recurse start with the next thread
+ t->WaitAddHandle(handle); // finish the job
+ return;
+ }
+ }
+}
+
+void ThreadPool::RemoveHandle(ThreadID *thread_id, HANDLE handle)
+{
+ if (thread_id)
+ {
+ thread_id->RemoveHandle(handle);
+ }
+ else
+ {
+ /* increment thread counts temporarily - because the massive wake-up
+ causes thread counts to go to 0 */
+ for (int i=0;i<THREAD_TYPES;i++)
+ InterlockedIncrement(&num_threads_available[i]);
+ guard.Lock();
+ RemoveHandle_Internal(0, handle);
+
+ for (int j=0;j<THREAD_TYPES;j++)
+ {
+ //for (ThreadPoolTypes::HandleList::iterator itr = any_thread_handles[j].begin();
+ // itr != any_thread_handles[j].end();
+ // itr++)
+
+ ThreadPoolTypes::HandleList::iterator itr = any_thread_handles[j].begin();
+ while(itr != any_thread_handles[j].end())
+ {
+ if (*itr == handle)
+ {
+ itr = any_thread_handles[j].erase(itr);
+ }
+ else
+ {
+ itr++;
+ }
+ }
+ }
+ guard.Unlock();
+ for (int i=0;i<THREAD_TYPES;i++)
+ InterlockedDecrement(&num_threads_available[i]);
+ }
+}
+
+int ThreadPool::RunFunction(ThreadID *threadid, api_threadpool::ThreadPoolFunc func, void *user_data, intptr_t id, int flags)
+{
+ if (threadid)
+ threadid->QueueFunction(func, user_data, id);
+ else
+ thread_functions.QueueFunction(func, user_data, id);
+ return 0;
+}
+
+ThreadID *ThreadPool::CreateNewThread_Internal(int thread_type)
+{
+ int reserved=0;
+ int com_type = api_threadpool::FLAG_REQUIRE_COM_MT; // default
+ switch(thread_type)
+ {
+ case TYPE_STA_RESERVED:
+ reserved=1;
+ case TYPE_STA:
+ com_type = api_threadpool::FLAG_REQUIRE_COM_STA;
+ break;
+ case TYPE_MT_RESERVED:
+ reserved=1;
+ case TYPE_MT:
+ com_type = api_threadpool::FLAG_REQUIRE_COM_MT;
+ break;
+ }
+
+ Nullsoft::Utility::AutoLock threadlock(guard); // lock here (rather than after new ThreadID) to protect any_thread_handles
+ ThreadID *new_thread = new ThreadID(&thread_functions, killswitch, thread_functions.functions_semaphore,
+ any_thread_handles[thread_type],
+ &num_threads_available[thread_type], max_load_event[thread_type],
+ reserved, com_type);
+ threads.push_back(new_thread);
+ return new_thread;
+}
+
+size_t ThreadPool::GetNumberOfThreads()
+{
+ Nullsoft::Utility::AutoLock threadlock(guard);
+ return threads.size();
+}
+
+size_t ThreadPool::GetNumberOfActiveThreads()
+{
+ size_t numThreads = GetNumberOfThreads();
+ for (int i=0;i<THREAD_TYPES;i++)
+ numThreads -= num_threads_available[i];
+ return numThreads;
+}
+
+int ThreadPool::GetThreadType(int flags, int reserved)
+{
+ flags &= api_threadpool::MASK_COM_FLAGS;
+ int thread_type=TYPE_MT;
+ switch(flags)
+ {
+ case api_threadpool::FLAG_REQUIRE_COM_STA:
+ thread_type = reserved?TYPE_STA_RESERVED:TYPE_STA;
+ break;
+ case 0: // default
+ case api_threadpool::FLAG_REQUIRE_COM_MT:
+ thread_type = reserved?TYPE_MT_RESERVED:TYPE_MT;
+ break;
+ }
+
+ return thread_type;
+}
+
+void ThreadPool::GetThreadTypes(int flags, bool types[THREAD_TYPES])
+{
+ for (int i=0;i<THREAD_TYPES;i++)
+ {
+ types[i]=true;
+ }
+
+ if (flags & api_threadpool::FLAG_REQUIRE_COM_STA)
+ {
+ types[TYPE_MT] = false;
+ types[TYPE_MT] = false;
+ }
+
+ if (flags & api_threadpool::FLAG_REQUIRE_COM_STA)
+ {
+ types[TYPE_STA] = false;
+ types[TYPE_STA_RESERVED] = false;
+ }
+
+ if (flags & api_threadpool::FLAG_LONG_EXECUTION)
+ {
+ types[TYPE_STA_RESERVED] = false;
+ types[TYPE_MT_RESERVED] = false;
+ }
+
+}
+#define CBCLASS ThreadPool
+START_DISPATCH;
+CB(RESERVETHREAD, ReserveThread)
+VCB(RELEASETHREAD, ReleaseThread)
+CB(ADDHANDLE, AddHandle)
+VCB(REMOVEHANDLE, RemoveHandle)
+CB(RUNFUNCTION, RunFunction)
+CB(GETNUMBEROFTHREADS, GetNumberOfThreads)
+CB(GETNUMBEROFACTIVETHREADS, GetNumberOfActiveThreads)
+END_DISPATCH;
+#undef CBCLASS
diff --git a/Src/nu/threadpool/ThreadPool.h b/Src/nu/threadpool/ThreadPool.h
new file mode 100644
index 00000000..f68efc12
--- /dev/null
+++ b/Src/nu/threadpool/ThreadPool.h
@@ -0,0 +1,98 @@
+#pragma once
+
+#include <windows.h>
+#include <bfc/platform/types.h>
+#include <vector>
+#include "../autolock.h"
+#include "ThreadID.h"
+#include "ThreadFunctions.h"
+#include "threadpool_types.h"
+/* random notes
+
+HANDLEs common to all threads
+
+WaitForMultipleObjectsEx() around these
+0 - killswitch
+1 - shared APC event. since threads might want to use APCs themselves, we'll use a different mechanism (thread-safe FIFO and an event). the intention is that APCs that can go on any thread will use this handle
+2 - per thread APC event.
+
+
+parameters for "run my function" method
+function pointer, user data, flags
+flags:
+interrupt - for very short non-locking functions where it is safe to interrupt another thread, uses QueueUserAPC
+no_wait - spawn a new thread if all threads are busy
+com_multithreaded - all threads are created with CoInitialize(0), if you need a COINIT_MULTITHREADED thread, use this flag
+
+parameters for "add my handle" method
+handle, function pointer, user data, flags
+flags:
+single_thread - only one thread in the pool will wait on your object, useful if your handle is not auto-reset
+
+parameters for "function call repeat" - calls your function until you return 0
+function pointer, user data, flags
+flags:
+single_thread - keep calling on the same thread
+*/
+
+
+class ThreadPool : public api_threadpool
+{
+public:
+ static const char *getServiceName() { return "Thread Pool API"; }
+ static const GUID getServiceGuid() { return ThreadPoolGUID; }
+public:
+ // Owner API:
+ ThreadPool();
+ void Kill();
+
+ // User API:
+ /* If you have multiple events, APCs, etc and you need them to always run on the same thread
+ you can reserve one */
+ ThreadID *ReserveThread(int flags);
+ /* Release a thread you've previously reserved */
+ void ReleaseThread(ThreadID *thread_id);
+
+ /* adds a waitable handle to the thread pool. when the event is signalled, your function ptr will get called
+ user_data and id values get passed to your function.
+ your function should return 1 to indicate that it can be removed
+ flags, see api_threadpool */
+ int AddHandle(ThreadID *threadid, HANDLE handle, api_threadpool::ThreadPoolFunc func, void *user_data, intptr_t id, int flags);
+ void RemoveHandle(ThreadID *threadid, HANDLE handle);
+ int RunFunction(ThreadID *threadid, api_threadpool::ThreadPoolFunc func, void *user_data, intptr_t id, int flags);
+
+ size_t GetNumberOfThreads(); // total number of threads in the threadpool
+ size_t GetNumberOfActiveThreads(); // number of threads that are currently being used (inside user function but not necessarily busy)
+
+private:
+ enum
+ {
+ TYPE_MT = 0,
+ TYPE_STA = 1,
+ TYPE_MT_RESERVED = 2,
+ TYPE_STA_RESERVED = 3,
+
+ THREAD_TYPES = 4, // two thread types, single threaded apartment COM and multithreaded COM
+ };
+private:
+ static DWORD CALLBACK WatchDogThreadProcedure_stub(LPVOID param);
+ ThreadID *CreateNewThread_Internal(int thread_type = 0);
+ DWORD CALLBACK WatchDogThreadProcedure();
+ static int GetThreadType(int flags, int reserved = 0);
+ static void GetThreadTypes(int flags, bool types[THREAD_TYPES]);
+ void RemoveHandle_Internal(size_t start, HANDLE handle); // recursive helper function for RemoveHandle()
+ void AddHandle_Internal(size_t start, HANDLE handle, int flags); // recursive helper function for RemoveHandle()
+
+ Nullsoft::Utility::LockGuard guard; // guards threads, any_thread_handles, and non_reserved_handles data structures
+ typedef std::vector<ThreadID*> ThreadList;
+ ThreadList threads;
+ ThreadPoolTypes::HandleList any_thread_handles[THREAD_TYPES];
+ HANDLE killswitch;
+ HANDLE watchdog_thread_handle;
+ volatile LONG num_threads_available[THREAD_TYPES];
+ ThreadFunctions thread_functions;
+ HANDLE max_load_event[THREAD_TYPES];
+protected:
+ RECVS_DISPATCH;
+};
+
diff --git a/Src/nu/threadpool/TimerHandle.hpp b/Src/nu/threadpool/TimerHandle.hpp
new file mode 100644
index 00000000..ccbb6ef6
--- /dev/null
+++ b/Src/nu/threadpool/TimerHandle.hpp
@@ -0,0 +1,54 @@
+#ifndef NU_THREADPOOL_TIMERHANDLE_H
+#define NU_THREADPOOL_TIMERHANDLE_H
+
+#if !defined(_WIN32_WINNT) || (_WIN32_WINNT < 0x400)
+#error Must define _WIN32_WINNT >= 0x400 to use TimerHandle
+#endif
+
+#include <windows.h>
+#include <bfc/platform/types.h>
+/*
+TimerHandle() constructor will make a new timer handle
+TimerHandle(existing_handle) will "take over" an existing handle
+~TimerHandle() DOES NOT CloseHandle as this object is meant as a helper
+call Close() to kill the timer handle
+
+The timer will be "one shot" auto-reset.
+Because it is meant to be compatible with the threadpool, manual-reset timers and periodic timers
+are not recommended!! You will have re-entrancy problems
+If you want "periodic" behavior, call Wait() at the end of your ThreadPoolFunc
+*/
+class TimerHandle
+{
+public:
+ TimerHandle() { timerHandle = CreateWaitableTimer( 0, FALSE, 0 ); }
+ TimerHandle( HANDLE p_handle ) { timerHandle = p_handle; }
+
+ void Close() { CloseHandle( timerHandle ); }
+
+ void Wait( uint64_t p_milliseconds )
+ {
+ /* MSDN notes about SetWaitableTimer: 100 nanosecond resolution, Negative values indicate relative time*/
+ LARGE_INTEGER timeout = { 0 };
+ timeout.QuadPart = -( (int64_t)p_milliseconds * 1000LL /*to microseconds*/ * 10LL /* to 100 nanoseconds */ );
+ SetWaitableTimer( timerHandle, &timeout, 0, 0, 0, FALSE );
+ }
+
+ void Poll( uint64_t p_milliseconds ) // only use on a reserved thread!!!
+ {
+ /* MSDN notes about SetWaitableTimer: 100 nanosecond resolution, Negative values indicate relative time*/
+ LARGE_INTEGER timeout = { 0 };
+ timeout.QuadPart = -( (int64_t)p_milliseconds * 1000LL /*to microseconds*/ * 10LL /* to 100 nanoseconds */ );
+ SetWaitableTimer( timerHandle, &timeout, (LONG)p_milliseconds, 0, 0, FALSE );
+ }
+
+ /* TODO: WaitUntil method for absolute times */
+
+ void Cancel() { CancelWaitableTimer( timerHandle ); }
+ operator HANDLE() { return timerHandle; }
+
+private:
+ HANDLE timerHandle;
+};
+
+#endif // !NU_THREADPOOL_TIMERHANDLE_H \ No newline at end of file
diff --git a/Src/nu/threadpool/api_threadpool.h b/Src/nu/threadpool/api_threadpool.h
new file mode 100644
index 00000000..4cf5ab4f
--- /dev/null
+++ b/Src/nu/threadpool/api_threadpool.h
@@ -0,0 +1,92 @@
+#pragma once
+
+#include <windows.h>
+#include <bfc/platform/types.h>
+#include <bfc/dispatch.h>
+
+class ThreadID;
+
+class api_threadpool : public Dispatchable
+{
+protected:
+ api_threadpool() {}
+ ~api_threadpool() {}
+public:
+ typedef int (*ThreadPoolFunc)(HANDLE handle, void *user_data, intptr_t id);
+ enum
+ {
+ // pass this flag to AddHandle or RunFunction indicate that your thread function
+ // might run for a long time
+ FLAG_LONG_EXECUTION = 0x1,
+ FLAG_REQUIRE_COM_STA = 0x2,
+ FLAG_REQUIRE_COM_MT = 0x4,
+ MASK_COM_FLAGS = 0x6,
+ };
+
+public:
+ ThreadID *ReserveThread(int flags);
+ /* Release a thread you've previously reserved */
+ void ReleaseThread(ThreadID *thread_id);
+
+ /* adds a waitable handle to the thread pool. when the event is signalled, your function ptr will get called
+ user_data and id values get passed to your function.
+ your function should return 1 to indicate that it can be removed
+ flags, see api_threadpool */
+ int AddHandle(ThreadID *threadid, HANDLE handle, api_threadpool::ThreadPoolFunc func, void *user_data, intptr_t id, int flags);
+ void RemoveHandle(ThreadID *threadid, HANDLE handle);
+ int RunFunction(ThreadID *threadid, api_threadpool::ThreadPoolFunc func, void *user_data, intptr_t id, int flags);
+
+ size_t GetNumberOfThreads(); // total number of threads in the threadpool
+ size_t GetNumberOfActiveThreads(); // number of threads that are currently being used (inside user function but not necessarily busy)
+
+ enum
+ {
+ RESERVETHREAD = 0,
+ RELEASETHREAD = 1,
+ ADDHANDLE = 2,
+ REMOVEHANDLE = 3,
+ RUNFUNCTION = 4,
+ GETNUMBEROFTHREADS = 5,
+ GETNUMBEROFACTIVETHREADS = 6,
+ };
+};
+
+inline ThreadID *api_threadpool::ReserveThread(int flags)
+{
+ return _call(RESERVETHREAD, (ThreadID *)0, flags);
+}
+
+inline void api_threadpool::ReleaseThread(ThreadID *thread_id)
+{
+ _voidcall(RELEASETHREAD, thread_id);
+}
+
+inline int api_threadpool::AddHandle(ThreadID *threadid, HANDLE handle, api_threadpool::ThreadPoolFunc func, void *user_data, intptr_t id, int flags)
+{
+ return _call(ADDHANDLE, (int)1, threadid, handle, func, user_data, id, flags);
+}
+
+inline void api_threadpool::RemoveHandle(ThreadID *threadid, HANDLE handle)
+{
+ _voidcall(REMOVEHANDLE, threadid, handle);
+}
+
+inline int api_threadpool::RunFunction(ThreadID *threadid, api_threadpool::ThreadPoolFunc func, void *user_data, intptr_t id, int flags)
+{
+ return _call(RUNFUNCTION, (int)1, threadid, func, user_data, id, flags);
+}
+
+inline size_t api_threadpool::GetNumberOfThreads()
+{
+ return _call(GETNUMBEROFTHREADS, (size_t)0);
+}
+
+inline size_t api_threadpool::GetNumberOfActiveThreads()
+{
+ return _call(GETNUMBEROFACTIVETHREADS, (size_t)0);
+}
+
+// {4DE015D3-11D8-4ac6-A3E6-216DF5252107}
+static const GUID ThreadPoolGUID =
+{ 0x4de015d3, 0x11d8, 0x4ac6, { 0xa3, 0xe6, 0x21, 0x6d, 0xf5, 0x25, 0x21, 0x7 } };
+
diff --git a/Src/nu/threadpool/threadpool.sln b/Src/nu/threadpool/threadpool.sln
new file mode 100644
index 00000000..790cf5e9
--- /dev/null
+++ b/Src/nu/threadpool/threadpool.sln
@@ -0,0 +1,23 @@
+Microsoft Visual Studio Solution File, Format Version 8.00
+Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "threadpool", "threadpool.vcproj", "{12CC5DA2-87FF-456A-AF20-A243F168EFE8}"
+ ProjectSection(ProjectDependencies) = postProject
+ EndProjectSection
+EndProject
+Global
+ GlobalSection(SolutionConfiguration) = preSolution
+ Debug = Debug
+ Release = Release
+ EndGlobalSection
+ GlobalSection(ProjectDependencies) = postSolution
+ EndGlobalSection
+ GlobalSection(ProjectConfiguration) = postSolution
+ {12CC5DA2-87FF-456A-AF20-A243F168EFE8}.Debug.ActiveCfg = Debug|Win32
+ {12CC5DA2-87FF-456A-AF20-A243F168EFE8}.Debug.Build.0 = Debug|Win32
+ {12CC5DA2-87FF-456A-AF20-A243F168EFE8}.Release.ActiveCfg = Release|Win32
+ {12CC5DA2-87FF-456A-AF20-A243F168EFE8}.Release.Build.0 = Release|Win32
+ EndGlobalSection
+ GlobalSection(ExtensibilityGlobals) = postSolution
+ EndGlobalSection
+ GlobalSection(ExtensibilityAddIns) = postSolution
+ EndGlobalSection
+EndGlobal
diff --git a/Src/nu/threadpool/threadpool.vcproj b/Src/nu/threadpool/threadpool.vcproj
new file mode 100644
index 00000000..c526cdaa
--- /dev/null
+++ b/Src/nu/threadpool/threadpool.vcproj
@@ -0,0 +1,137 @@
+<?xml version="1.0" encoding="Windows-1252"?>
+<VisualStudioProject
+ ProjectType="Visual C++"
+ Version="7.10"
+ Name="threadpool"
+ ProjectGUID="{12CC5DA2-87FF-456A-AF20-A243F168EFE8}"
+ Keyword="Win32Proj">
+ <Platforms>
+ <Platform
+ Name="Win32"/>
+ </Platforms>
+ <Configurations>
+ <Configuration
+ Name="Debug|Win32"
+ OutputDirectory="Debug"
+ IntermediateDirectory="Debug"
+ ConfigurationType="4"
+ CharacterSet="2">
+ <Tool
+ Name="VCCLCompilerTool"
+ Optimization="0"
+ AdditionalIncludeDirectories="../../Wasabi"
+ PreprocessorDefinitions="WIN32;_DEBUG;_LIB;_WIN32_WINNT=0x400"
+ MinimalRebuild="TRUE"
+ BasicRuntimeChecks="3"
+ RuntimeLibrary="5"
+ UsePrecompiledHeader="0"
+ WarningLevel="3"
+ Detect64BitPortabilityProblems="TRUE"
+ DebugInformationFormat="4"/>
+ <Tool
+ Name="VCCustomBuildTool"/>
+ <Tool
+ Name="VCLibrarianTool"
+ OutputFile="$(OutDir)/threadpool.lib"/>
+ <Tool
+ Name="VCMIDLTool"/>
+ <Tool
+ Name="VCPostBuildEventTool"/>
+ <Tool
+ Name="VCPreBuildEventTool"/>
+ <Tool
+ Name="VCPreLinkEventTool"/>
+ <Tool
+ Name="VCResourceCompilerTool"/>
+ <Tool
+ Name="VCWebServiceProxyGeneratorTool"/>
+ <Tool
+ Name="VCXMLDataGeneratorTool"/>
+ <Tool
+ Name="VCManagedWrapperGeneratorTool"/>
+ <Tool
+ Name="VCAuxiliaryManagedWrapperGeneratorTool"/>
+ </Configuration>
+ <Configuration
+ Name="Release|Win32"
+ OutputDirectory="Release"
+ IntermediateDirectory="Release"
+ ConfigurationType="4"
+ CharacterSet="2">
+ <Tool
+ Name="VCCLCompilerTool"
+ AdditionalIncludeDirectories="../../Wasabi"
+ PreprocessorDefinitions="WIN32;NDEBUG;_LIB;_WIN32_WINNT=0x400"
+ RuntimeLibrary="4"
+ UsePrecompiledHeader="0"
+ WarningLevel="3"
+ Detect64BitPortabilityProblems="TRUE"
+ DebugInformationFormat="3"/>
+ <Tool
+ Name="VCCustomBuildTool"/>
+ <Tool
+ Name="VCLibrarianTool"
+ OutputFile="$(OutDir)/threadpool.lib"/>
+ <Tool
+ Name="VCMIDLTool"/>
+ <Tool
+ Name="VCPostBuildEventTool"/>
+ <Tool
+ Name="VCPreBuildEventTool"/>
+ <Tool
+ Name="VCPreLinkEventTool"/>
+ <Tool
+ Name="VCResourceCompilerTool"/>
+ <Tool
+ Name="VCWebServiceProxyGeneratorTool"/>
+ <Tool
+ Name="VCXMLDataGeneratorTool"/>
+ <Tool
+ Name="VCManagedWrapperGeneratorTool"/>
+ <Tool
+ Name="VCAuxiliaryManagedWrapperGeneratorTool"/>
+ </Configuration>
+ </Configurations>
+ <References>
+ </References>
+ <Files>
+ <Filter
+ Name="Source Files"
+ Filter="cpp;c;cxx;def;odl;idl;hpj;bat;asm;asmx"
+ UniqueIdentifier="{4FC737F1-C7A5-4376-A066-2A32D752A2FF}">
+ <File
+ RelativePath=".\ThreadFunctions.cpp">
+ </File>
+ <File
+ RelativePath=".\ThreadID.cpp">
+ </File>
+ <File
+ RelativePath=".\ThreadPool.cpp">
+ </File>
+ </Filter>
+ <Filter
+ Name="Header Files"
+ Filter="h;hpp;hxx;hm;inl;inc;xsd"
+ UniqueIdentifier="{93995380-89BD-4b04-88EB-625FBE52EBFB}">
+ <File
+ RelativePath=".\api_threadpool.h">
+ </File>
+ <File
+ RelativePath=".\ThreadFunctions.h">
+ </File>
+ <File
+ RelativePath=".\ThreadID.h">
+ </File>
+ <File
+ RelativePath=".\ThreadPool.h">
+ </File>
+ </Filter>
+ <Filter
+ Name="Resource Files"
+ Filter="rc;ico;cur;bmp;dlg;rc2;rct;bin;rgs;gif;jpg;jpeg;jpe;resx"
+ UniqueIdentifier="{67DA6AB6-F800-4c08-8B7A-83BB121AAD01}">
+ </Filter>
+ </Files>
+ <Globals>
+ </Globals>
+</VisualStudioProject>
diff --git a/Src/nu/threadpool/threadpool_types.h b/Src/nu/threadpool/threadpool_types.h
new file mode 100644
index 00000000..9a23e833
--- /dev/null
+++ b/Src/nu/threadpool/threadpool_types.h
@@ -0,0 +1,8 @@
+#pragma once
+#include <deque>
+#include <windows.h>
+namespace ThreadPoolTypes
+{
+ typedef std::deque<HANDLE> HandleList;
+ const int MAX_SEMAPHORE_VALUE = 1024; //some arbitrarily high amount*
+} \ No newline at end of file