Diffstat (limited to 'Src/nu/threadpool/ThreadID.cpp')
-rw-r--r-- | Src/nu/threadpool/ThreadID.cpp | 274
1 file changed, 274 insertions(+), 0 deletions(-)
diff --git a/Src/nu/threadpool/ThreadID.cpp b/Src/nu/threadpool/ThreadID.cpp
new file mode 100644
index 00000000..28d6f1ca
--- /dev/null
+++ b/Src/nu/threadpool/ThreadID.cpp
@@ -0,0 +1,274 @@
+#include "ThreadID.h"
+
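+/* CreateThread entry point: forwards control to the owning ThreadID
+ instance's ThreadFunction(). */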
+DWORD CALLBACK ThreadID::thread_func_stub(LPVOID param)
+{
+ ThreadID *t = static_cast<ThreadID*>(param);
+ if (t != NULL)
+ {
+ return t->ThreadFunction();
+ }
+ else return 0;
+}
+
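+/* Kill: blocks until the worker thread has exited, using an alertable
+ wait so queued APCs can still run, looping until the thread handle
+ itself is signalled. */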
+void ThreadID::Kill()
+{
+ if (threadHandle && threadHandle != INVALID_HANDLE_VALUE)
+ {
+ //cut: WaitForSingleObject(threadHandle, INFINITE);
+ while (WaitForMultipleObjectsEx(1, &threadHandle, FALSE, INFINITE, TRUE) != WAIT_OBJECT_0)
+ {
+ }
+ }
+
+}
+
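+/* Constructor: builds the wait-handle list in a fixed order --
+ [0] kill switch, [1] wake semaphore (handle-list updates),
+ [2] function-queue semaphore (per-thread if reserved, global otherwise) --
+ then appends any handles inherited from the pool and starts the worker thread. */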
+ThreadID::ThreadID(ThreadFunctions *t_f, HANDLE killswitch, HANDLE global_functions_semaphore,
+ ThreadPoolTypes::HandleList &inherited_handles,
+ volatile LONG *thread_count, HANDLE _max_load_event,
+ int _reserved, int _com_type) : ThreadFunctions(_reserved)
+{
+ /* initialize values */
+ released = false;
+ InitializeCriticalSection(&handle_lock);
+
+ /* grab values passed to us */
+ reserved = _reserved;
+ com_type = _com_type;
+ max_load_event = _max_load_event;
+ global_functions = t_f;
+ num_threads_available = thread_count;
+
+ /* wait_handles[0] is kill switch */
+ wait_handles.push_back(killswitch);
+
+ /* wait_handles[1] is wake switch */
+ wakeHandle = CreateSemaphore(0, 0, ThreadPoolTypes::MAX_SEMAPHORE_VALUE, 0);
+ wait_handles.push_back(wakeHandle);
+
+ if (reserved)
+ {
+ /* if thread is reserved,
+ wait_handles[2] is a Function Call wake semaphore
+ for this thread only. */
+ wait_handles.push_back(functions_semaphore); // WAIT_OBJECT_0+2 == per-thread queued functions
+ }
+ else
+ {
+ /* if thread is not reserved,
+ wait_handles[2] is a Function Call wake semaphore
+ global to all threads */
+ wait_handles.push_back(global_functions_semaphore); // WAIT_OBJECT_0+2 == any-thread queued functions
+ }
+
+ /* add inherited handles
+ (handles added to thread pool before this thread was created) */
+ for ( ThreadPoolTypes::HandleList::iterator itr = inherited_handles.begin(); itr != inherited_handles.end(); itr++ )
+ {
+ wait_handles.push_back( *itr );
+ }
+
+ /* start thread */
+ threadHandle = CreateThread(0, 0, thread_func_stub, this, 0, 0);
+}
+
+ThreadID::~ThreadID()
+{
+ CloseHandle(threadHandle);
+ CloseHandle(wakeHandle);
+ DeleteCriticalSection(&handle_lock);
+}
+
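+/* Handle-list updates use a two-step handshake with the worker thread:
+ TryAddHandle tries a non-blocking lock; if the worker is parked inside
+ WaitForMultipleObjectsEx holding handle_lock, wakeHandle is released to
+ kick it out, and the caller falls back to WaitAddHandle, which takes the
+ lock, edits the list, then releases wakeHandle again so the worker resumes
+ waiting on the updated list. */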
+bool ThreadID::TryAddHandle(HANDLE new_handle)
+{
+ // let's see if we get lucky and can access the handle list directly
+ if (TryEnterCriticalSection(&handle_lock))
+ {
+ // made it
+ wait_handles.push_back(new_handle);
+ LeaveCriticalSection(&handle_lock);
+ return true;
+ }
+ else
+ {
+ ReleaseSemaphore(wakeHandle, 1, 0); // kick the thread out of WaitForMultiple...
+ return false;
+ }
+}
+
+void ThreadID::WaitAddHandle(HANDLE handle)
+{
+ // wakeHandle was already released once by TryAddHandle before this function was called
+ EnterCriticalSection(&handle_lock);
+ wait_handles.push_back(handle);
+ LeaveCriticalSection(&handle_lock);
+ ReleaseSemaphore(wakeHandle, 1, 0); // kick out the second wait
+}
+
+void ThreadID::AddHandle(HANDLE new_handle)
+{
+ if (!TryAddHandle(new_handle))
+ WaitAddHandle(new_handle);
+}
+
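+/* Removal follows the same two-step handshake as AddHandle above. */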
+bool ThreadID::TryRemoveHandle(HANDLE handle)
+{
+ // let's see if we get lucky and can access the handle list directly
+ if (TryEnterCriticalSection(&handle_lock))
+ {
+ RemoveHandle_Internal(handle);
+ LeaveCriticalSection(&handle_lock);
+ return true;
+ }
+ else
+ {
+ ReleaseSemaphore(wakeHandle, 1, 0); // kick the thread out of WaitForMultiple...
+ return false;
+ }
+}
+
+void ThreadID::WaitRemoveHandle(HANDLE handle)
+{
+ // wakeHandle was already released once by TryRemoveHandle before this function was called
+ EnterCriticalSection(&handle_lock);
+ RemoveHandle_Internal(handle);
+ LeaveCriticalSection(&handle_lock);
+ ReleaseSemaphore(wakeHandle, 1, 0); // kick out the second wait
+}
+
+void ThreadID::RemoveHandle(HANDLE handle)
+{
+ if (!TryRemoveHandle(handle))
+ WaitRemoveHandle(handle);
+}
+
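+/* Strips every occurrence of the handle from the wait list, leaving the
+ three reserved slots alone. Callers must hold handle_lock. */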
+void ThreadID::RemoveHandle_Internal(HANDLE handle)
+{
+ // first three handles are reserved, so start after that
+ for (size_t i=3;i<wait_handles.size();i++)
+ {
+ if (wait_handles[i] == handle)
+ {
+ wait_handles.erase(wait_handles.begin() + i);
+ i--;
+ }
+ }
+}
+
+bool ThreadID::IsReserved() const
+{
+ return !!reserved;
+}
+
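+/* Worker thread main loop: initializes COM as requested, then waits on the
+ handle list. Index 0 = kill switch (exit), index 1 = wake semaphore
+ (pause for a handle-list edit), index 2 = function-queue semaphore
+ (pop and run a queued function), anything higher = a user-registered
+ handle whose callback is looked up via global_functions->Get(). */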
+DWORD CALLBACK ThreadID::ThreadFunction()
+{
+ switch(com_type)
+ {
+ case api_threadpool::FLAG_REQUIRE_COM_MT:
+ CoInitializeEx(0, COINIT_MULTITHREADED);
+ break;
+ case api_threadpool::FLAG_REQUIRE_COM_STA:
+ CoInitialize(0);
+ break;
+ }
+
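+ /* num_threads_available is shared with the pool: it counts threads parked
+ in the wait below; if it hits zero on a non-reserved thread, max_load_event
+ is set to notify the pool's watchdog that every thread is busy. */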
+ while (1)
+ {
+ InterlockedIncrement(num_threads_available);
+ EnterCriticalSection(&handle_lock);
+ DWORD ret = WaitForMultipleObjectsEx((DWORD)wait_handles.size(), wait_handles.data(), FALSE, INFINITE, TRUE);
+ // cut: LeaveCriticalSection(&handle_lock);
+ if (InterlockedDecrement(num_threads_available) == 0 && !reserved)
+ SetEvent(max_load_event); // notify the watchdog if all the threads are used up
+
+ if (ret == WAIT_OBJECT_0)
+ {
+ // killswitch
+ LeaveCriticalSection(&handle_lock);
+ break;
+ }
+ else if (ret == WAIT_OBJECT_0 + 1)
+ {
+ // we got woken up to release the handles lock
+ // wait for the second signal
+ LeaveCriticalSection(&handle_lock);
+ InterlockedIncrement(num_threads_available);
+ WaitForSingleObject(wakeHandle, INFINITE);
+ InterlockedDecrement(num_threads_available);
+ }
+ else if (ret == WAIT_OBJECT_0 + 2)
+ {
+ LeaveCriticalSection(&handle_lock);
+ api_threadpool::ThreadPoolFunc func;
+ void *user_data;
+ intptr_t id;
+ if (reserved)
+ {
+ // per-thread queued functions
+ if (PopFunction(&func, &user_data, &id))
+ {
+ func(0, user_data, id);
+ }
+ }
+ else
+ {
+ // global queued functions
+ if (global_functions->PopFunction(&func, &user_data, &id))
+ {
+ func(0, user_data, id);
+ }
+ }
+ }
+ else if (ret > WAIT_OBJECT_0 && ret < (WAIT_OBJECT_0 + wait_handles.size()))
+ {
+ DWORD index = ret - WAIT_OBJECT_0;
+ HANDLE handle = wait_handles[index];
+ LeaveCriticalSection(&handle_lock);
+ /* !!! race condition here if someone calls ThreadPool::RemoveHandle and then CloseHandle() !!!
+ before calling RemoveHandle, the caller needs to either
+ ensure that the event is unsignalled (and won't be signalled)
+ or call RemoveHandle from within the function callback */
+ api_threadpool::ThreadPoolFunc func;
+ void *user_data;
+ intptr_t id;
+ if (global_functions->Get(handle, &func, &user_data, &id))
+ {
+ func(handle, user_data, id);
+ }
+ }
+ else
+ {
+ LeaveCriticalSection(&handle_lock);
+ }
+ }
+ if (com_type & api_threadpool::MASK_COM_FLAGS)
+ CoUninitialize();
+ return 0;
+}
+
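+/* CanRunCOM: decides whether work with the given COM-requirement flags may
+ run on this thread -- an MTA thread rejects STA-only work and vice versa. */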
+bool ThreadID::CanRunCOM(int flags) const
+{
+ switch(com_type)
+ {
+ case api_threadpool::FLAG_REQUIRE_COM_MT: // if we're a COINIT_MULTITHREADED thread (default)
+ return !(flags & api_threadpool::FLAG_REQUIRE_COM_STA); // don't let STA stuff run
+ case api_threadpool::FLAG_REQUIRE_COM_STA: // if we're a CoInitialize(0) thread
+ return !(flags & api_threadpool::FLAG_REQUIRE_COM_MT); // don't let MT stuff run
+ }
+ return false; // shouldn't get here
+}
+
+bool ThreadID::IsReleased() const
+{
+ return released;
+}
+
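+/* Reserve()/Release() just toggle the 'released' flag; the pool presumably
+ uses it to track whether a reserved thread is currently checked out. */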
+void ThreadID::Reserve()
+{
+ released = false;
+}
+
+void ThreadID::Release()
+{
+ released = true;
+}
\ No newline at end of file