diff options
Diffstat (limited to 'Src/nu/threadpool/ThreadID.cpp')
-rw-r--r-- | Src/nu/threadpool/ThreadID.cpp | 274 |
1 files changed, 274 insertions, 0 deletions
diff --git a/Src/nu/threadpool/ThreadID.cpp b/Src/nu/threadpool/ThreadID.cpp new file mode 100644 index 00000000..28d6f1ca --- /dev/null +++ b/Src/nu/threadpool/ThreadID.cpp @@ -0,0 +1,274 @@ +#include "ThreadID.h" + +DWORD ThreadID::thread_func_stub(LPVOID param) +{ + ThreadID *t = static_cast<ThreadID*>(param); + if (t != NULL) + { + return t->ThreadFunction(); + } + else return 0; +} + +void ThreadID::Kill() +{ + if (threadHandle && threadHandle != INVALID_HANDLE_VALUE) + { + //cut: WaitForSingleObject(threadHandle, INFINITE); + while (WaitForMultipleObjectsEx(1, &threadHandle, FALSE, INFINITE, TRUE) != WAIT_OBJECT_0) + { + } + } + +} + +ThreadID::ThreadID(ThreadFunctions *t_f, HANDLE killswitch, HANDLE global_functions_semaphore, + ThreadPoolTypes::HandleList &inherited_handles, + volatile LONG *thread_count, HANDLE _max_load_event, + int _reserved, int _com_type) : ThreadFunctions(_reserved) +{ + /* initialize values */ + released = false; + InitializeCriticalSection(&handle_lock); + + /* grab values passed to us */ + reserved = _reserved; + com_type = _com_type; + max_load_event = _max_load_event; + global_functions = t_f; + num_threads_available = thread_count; + + /* wait_handles[0] is kill switch */ + wait_handles.push_back(killswitch); + + /* wait_handles[1] is wake switch */ + wakeHandle = CreateSemaphore(0, 0, ThreadPoolTypes::MAX_SEMAPHORE_VALUE, 0); + wait_handles.push_back(wakeHandle); + + if (reserved) + { + /* if thread is reserved, + wait_handles[2] is a Funcion Call wake semaphore + for this thread only. */ + wait_handles.push_back(functions_semaphore); // WAIT_OBJECT_0+1 == per-thread queued functions + } + else + { + /* if thread is not reserved, + wait_handles[2] is a Function Call wake semaphore + global to all threads */ + wait_handles.push_back(global_functions_semaphore); // WAIT_OBJECT_0+2 == any-thread queued functions + } + + /* add inherited handles + (handles added to thread pool before this thread was created) */ + for ( ThreadPoolTypes::HandleList::iterator itr = inherited_handles.begin(); itr != inherited_handles.end(); itr++ ) + { + wait_handles.push_back( *itr ); + } + + /* start thread */ + threadHandle = CreateThread(0, 0, thread_func_stub, this, 0, 0); +} + +ThreadID::~ThreadID() +{ + CloseHandle(threadHandle); + CloseHandle(wakeHandle); + DeleteCriticalSection(&handle_lock); +} + +bool ThreadID::TryAddHandle(HANDLE new_handle) +{ + // let's see if we get lucky and can access the handle list directly + if (TryEnterCriticalSection(&handle_lock)) + { + // made it + wait_handles.push_back(new_handle); + LeaveCriticalSection(&handle_lock); + return true; + } + else + { + ReleaseSemaphore(wakeHandle, 1, 0); // kick the thread out of WaitForMultiple... + return false; + } +} + +void ThreadID::WaitAddHandle(HANDLE handle) +{ + // wakeHandle already got released once by nature of this function being called + EnterCriticalSection(&handle_lock); + wait_handles.push_back(handle); + LeaveCriticalSection(&handle_lock); + ReleaseSemaphore(wakeHandle, 1, 0); // kick out the second wait +} + +void ThreadID::AddHandle(HANDLE new_handle) +{ + if (!TryAddHandle(new_handle)) + WaitAddHandle(new_handle); +} + +bool ThreadID::TryRemoveHandle(HANDLE handle) +{ + // let's see if we get lucky and can access the handle list directly + if (TryEnterCriticalSection(&handle_lock)) + { + RemoveHandle_Internal(handle); + LeaveCriticalSection(&handle_lock); + return true; + } + else + { + ReleaseSemaphore(wakeHandle, 1, 0); // kick the thread out of WaitForMultiple... + return false; + } + return false; +} + +void ThreadID::WaitRemoveHandle(HANDLE handle) +{ + // wakeHandle already got released once by nature of this function being called + EnterCriticalSection(&handle_lock); + RemoveHandle_Internal(handle); + LeaveCriticalSection(&handle_lock); + ReleaseSemaphore(wakeHandle, 1, 0); // kick out the second wait +} + +void ThreadID::RemoveHandle(HANDLE handle) +{ + if (!TryRemoveHandle(handle)) + WaitRemoveHandle(handle); +} + +void ThreadID::RemoveHandle_Internal(HANDLE handle) +{ + // first three handles are reserved, so start after that + for (size_t i=3;i<wait_handles.size();i++) + { + if (wait_handles[i] == handle) + { + wait_handles.erase(wait_handles.begin() + i); + i--; + } + } +} + +bool ThreadID::IsReserved() const +{ + return !!reserved; +} + +DWORD CALLBACK ThreadID::ThreadFunction() +{ + switch(com_type) + { + case api_threadpool::FLAG_REQUIRE_COM_MT: + CoInitializeEx(0, COINIT_MULTITHREADED); + break; + case api_threadpool::FLAG_REQUIRE_COM_STA: + CoInitialize(0); + break; + } + + while (1) + { + InterlockedIncrement(num_threads_available); + EnterCriticalSection(&handle_lock); + DWORD ret = WaitForMultipleObjectsEx((DWORD)wait_handles.size(), wait_handles.data(), FALSE, INFINITE, TRUE); + // cut: LeaveCriticalSection(&handle_lock); + if (InterlockedDecrement(num_threads_available) == 0 && !reserved) + SetEvent(max_load_event); // notify the watch dog if all the threads are used up + + if (ret == WAIT_OBJECT_0) + { + // killswitch + LeaveCriticalSection(&handle_lock); + break; + } + else if (ret == WAIT_OBJECT_0 + 1) + { + // we got woken up to release the handles lock + // wait for the second signal + LeaveCriticalSection(&handle_lock); + InterlockedIncrement(num_threads_available); + WaitForSingleObject(wakeHandle, INFINITE); + InterlockedDecrement(num_threads_available); + } + else if (ret == WAIT_OBJECT_0 + 2) + { + LeaveCriticalSection(&handle_lock); + api_threadpool::ThreadPoolFunc func; + void *user_data; + intptr_t id; + if (reserved) + { + // per-thread queued functions + if (PopFunction(&func, &user_data, &id)) + { + func(0, user_data, id); + } + } + else + { + // global queued functions + if (global_functions->PopFunction(&func, &user_data, &id)) + { + func(0, user_data, id); + } + } + } + else if (ret > WAIT_OBJECT_0 && ret < (WAIT_OBJECT_0 + wait_handles.size())) + { + DWORD index = ret - WAIT_OBJECT_0; + HANDLE handle = wait_handles[index]; + LeaveCriticalSection(&handle_lock); + /* !!! race condition here if someone calls ThreadPool::RemoveHandle and then CloseHandle() !!! + before calling RemoveHandle, caller needs to either + ensure that Event is unsignalled (And won't be signalled) + or call RemoveHandle from within the function callback */ + api_threadpool::ThreadPoolFunc func; + void *user_data; + intptr_t id; + if (global_functions->Get(handle, &func, &user_data, &id)) + { + func(handle, user_data, id); + } + } + else + { + LeaveCriticalSection(&handle_lock); + } + } + if (com_type & api_threadpool::MASK_COM_FLAGS) + CoUninitialize(); + return 0; +} + +bool ThreadID::CanRunCOM(int flags) const +{ + switch(com_type) + { + case api_threadpool::FLAG_REQUIRE_COM_MT: // if we're a CONIT_MULTITHREADEX thread (default) + return !(flags & api_threadpool::FLAG_REQUIRE_COM_STA); // don't let STA stuff run + case api_threadpool::FLAG_REQUIRE_COM_STA: // if we're a CoInitialize(0) thread + return !(flags & api_threadpool::FLAG_REQUIRE_COM_MT); // don't let MT stuff run + } + return false; // shouldn't get here +} + +bool ThreadID::IsReleased() const +{ + return released; +} + +void ThreadID::Reserve() +{ + released=false; +} + +void ThreadID::Release() +{ + released=true; +}
\ No newline at end of file |