diff options
Diffstat (limited to 'Src/replicant/nu/win')
-rw-r--r-- | Src/replicant/nu/win/MessageLoop.cpp | 121 | ||||
-rw-r--r-- | Src/replicant/nu/win/MessageLoop.h | 45 | ||||
-rw-r--r-- | Src/replicant/nu/win/ThreadLoop.cpp | 146 | ||||
-rw-r--r-- | Src/replicant/nu/win/ThreadLoop.h | 40 |
4 files changed, 352 insertions, 0 deletions
diff --git a/Src/replicant/nu/win/MessageLoop.cpp b/Src/replicant/nu/win/MessageLoop.cpp new file mode 100644 index 00000000..2353801c --- /dev/null +++ b/Src/replicant/nu/win/MessageLoop.cpp @@ -0,0 +1,121 @@ +#include "MessageLoop.h" +#include <assert.h> + +lifo_t nu::MessageLoop::message_cache = {0,}; +lifo_t nu::MessageLoop::cache_bases= {0,}; + +#define MESSAAGE_CACHE_SEED 64 + +typedef uint8_t message_data_t[64]; // ensure all messages are this size + +nu::MessageLoop::MessageLoop() +{ + mpscq_init(&message_queue); + message_notification = CreateEvent(0, FALSE, FALSE, 0); +} + +nu::MessageLoop::~MessageLoop() +{ + CloseHandle(message_notification); +} + +void nu::MessageLoop::RefillCache() +{ + message_data_t *cache_seed = (message_data_t *)_aligned_malloc(MESSAAGE_CACHE_SEED*sizeof(message_data_t), 64); + + if (cache_seed) + { + int i=MESSAAGE_CACHE_SEED; + while (--i) + { + lifo_push(&message_cache, (queue_node_t *)&cache_seed[i]); + } + lifo_push(&cache_bases, (queue_node_t *)cache_seed); + } + else + { + Sleep(0); // yield and hope that someone else pops something off soon + } +} + +nu::message_node_t *nu::MessageLoop::AllocateMessage() +{ + message_node_t *apc = 0; + + do + { + apc = (message_node_t *)lifo_pop(&message_cache); + if (!apc) + RefillCache(); + } while (!apc); + return apc; +} + +void nu::MessageLoop::PostMessage(nu::message_node_t *message) +{ + if (mpscq_push(&message_queue, message) == 0) + SetEvent(message_notification); +} + +void nu::MessageLoop::FreeMessage(nu::message_node_t *message) +{ + lifo_push(&message_cache, message); +} + +nu::message_node_t *nu::MessageLoop::GetMessage() +{ + message_node_t *message = PeekMessage(); + if (message) + { + return message; + } + + while (WaitForSingleObject(message_notification, INFINITE) == WAIT_OBJECT_0) + { + message = PeekMessage(); + if (message) + { + return message; + } + } + return 0; +} + +nu::message_node_t *nu::MessageLoop::PeekMessage() +{ + for (;;) // loop because we need to handle 'busy' from the queue + { + message_node_t *message = (message_node_t *)mpscq_pop(&message_queue); + if (message == (message_node_t *)1) /* special return value that indicates a busy list */ + { + // benski> although it's tempting to return 0 here, doing so will mess up the Event logic + Sleep(0); // yield so that the thread that got pre-empted during push can finish + } + else + { + if (message) + { + return message; + } + else + { + return 0; + } + } + } +} + +nu::message_node_t *nu::MessageLoop::PeekMessage(unsigned int milliseconds) +{ + message_node_t *message = PeekMessage(); + if (message) + return message; + + if (WaitForSingleObject(message_notification, milliseconds) == WAIT_OBJECT_0) + { + message = PeekMessage(); + if (message) + return message; + } + return 0; +} diff --git a/Src/replicant/nu/win/MessageLoop.h b/Src/replicant/nu/win/MessageLoop.h new file mode 100644 index 00000000..88e4520f --- /dev/null +++ b/Src/replicant/nu/win/MessageLoop.h @@ -0,0 +1,45 @@ +#pragma once +#include "foundation/types.h" +#include "nu/lfmpscq.h" +#include "nu/LockFreeLIFO.h" +#include <windows.h> + +namespace nu +{ + + /* you can inherit from message_node_t (or combine inside a struct) + but make sure that your message isn't > 64 bytes */ + struct message_node_t : public queue_node_t + { + uint32_t message; + }; + + class MessageLoop + { + public: + MessageLoop(); + ~MessageLoop(); + /* API for Message senders */ + message_node_t *AllocateMessage(); // returns a message for you to fill out + void PostMessage(message_node_t *message); + + /* API for Message receivers */ + void FreeMessage(message_node_t *message); + message_node_t *GetMessage(); // waits forever + message_node_t *PeekMessage(); + message_node_t *PeekMessage(unsigned int milliseconds); + private: + void RefillCache(); + + HANDLE message_notification; + mpscq_t message_queue; + + /* Memory cache to be able to run APCs without having the memory manager lock + we'll allocate 100 at a time (#defined by MESSAGE_CACHE_SEED) + and allocate new ones only if the cache is empty (which unfortunately will lock) + cache_bases holds the pointers we've allocated (to free on destruction of this object) + and message_cache holds the individual pointers */ + static lifo_t message_cache; + static lifo_t cache_bases; + }; +}
\ No newline at end of file diff --git a/Src/replicant/nu/win/ThreadLoop.cpp b/Src/replicant/nu/win/ThreadLoop.cpp new file mode 100644 index 00000000..3ff89e59 --- /dev/null +++ b/Src/replicant/nu/win/ThreadLoop.cpp @@ -0,0 +1,146 @@ +#include "ThreadLoop.h" +#include <limits.h> + +lifo_t ThreadLoop::procedure_cache = {0,}; +lifo_t ThreadLoop::cache_bases= {0,}; + +#define PROCEDURE_CACHE_SEED 64 +ThreadLoop::ThreadLoop() +{ + mpscq_init(&procedure_queue); + procedure_notification = CreateSemaphoreW(0, 0, LONG_MAX, 0); + kill_switch = CreateEvent(0, TRUE, FALSE, 0); +} + +ThreadLoop::~ThreadLoop() +{ + CloseHandle(procedure_notification); + CloseHandle(kill_switch); +} + +void ThreadLoop::RefillCache() +{ + threadloop_node_t *cache_seed = (threadloop_node_t *)malloc(PROCEDURE_CACHE_SEED*sizeof(threadloop_node_t)); + + if (cache_seed) + { + int i=PROCEDURE_CACHE_SEED; + while (--i) + { + lifo_push(&procedure_cache, (queue_node_t *)&cache_seed[i]); + } + lifo_push(&cache_bases, (queue_node_t *)cache_seed); + } + else + { + Sleep(0); // yield and hope that someone else pops something off soon + } +} + +void ThreadLoop::Run() +{ + HANDLE events[] = {kill_switch, procedure_notification}; + while (WaitForMultipleObjects(2, events, FALSE, INFINITE) == WAIT_OBJECT_0 + 1) + { + for (;;) + { + threadloop_node_t *apc = (threadloop_node_t *)mpscq_pop(&procedure_queue); + if (apc == (threadloop_node_t *)1) /* special return value that indicates a busy list */ + { + Sleep(0); // yield so that the thread that got pre-empted during push can finish + } + else + { + if (apc) + { + apc->func(apc->param1, apc->param2, apc->real_value); + lifo_push(&procedure_cache, apc); + } + else + { + break; + } + } + } + } +} + +void ThreadLoop::Step(unsigned int milliseconds) +{ + HANDLE events[] = {kill_switch, procedure_notification}; + if (WaitForMultipleObjects(2, events, FALSE, milliseconds) == WAIT_OBJECT_0 + 1) + { + for (;;) + { + threadloop_node_t *apc = (threadloop_node_t *)mpscq_pop(&procedure_queue); + if (apc == (threadloop_node_t *)1) /* special return value that indicates a busy list */ + { + Sleep(0); // yield so that the thread that got pre-empted during push can finish + } + else + { + if (apc) + { + apc->func(apc->param1, apc->param2, apc->real_value); + lifo_push(&procedure_cache, apc); + } + else + { + break; + } + } + } + } +} + +void ThreadLoop::Step() +{ + HANDLE events[] = {kill_switch, procedure_notification}; + if (WaitForMultipleObjects(2, events, FALSE, INFINITE) == WAIT_OBJECT_0 + 1) + { + for (;;) + { + threadloop_node_t *apc = (threadloop_node_t *)mpscq_pop(&procedure_queue); + if (apc == (threadloop_node_t *)1) /* special return value that indicates a busy list */ + { + Sleep(0); // yield so that the thread that got pre-empted during push can finish + } + else + { + if (apc) + { + apc->func(apc->param1, apc->param2, apc->real_value); + lifo_push(&procedure_cache, apc); + } + else + { + break; + } + } + } + } +} + +threadloop_node_t *ThreadLoop::GetAPC() +{ + threadloop_node_t *apc = 0; + + do + { + apc = (threadloop_node_t *)lifo_pop(&procedure_cache); + if (!apc) + RefillCache(); + } while (!apc); + return apc; +} + +void ThreadLoop::Schedule(threadloop_node_t *apc) +{ + if (mpscq_push(&procedure_queue, apc) == 0) + ReleaseSemaphore(procedure_notification, 1, 0); +} + +void ThreadLoop::Kill() +{ + SetEvent(kill_switch); +} diff --git a/Src/replicant/nu/win/ThreadLoop.h b/Src/replicant/nu/win/ThreadLoop.h new file mode 100644 index 00000000..22f16c3d --- /dev/null +++ b/Src/replicant/nu/win/ThreadLoop.h @@ -0,0 +1,40 @@ +#pragma once +#include "nu/lfmpscq.h" +#include "nu/LockFreeLIFO.h" +#include <windows.h> + +struct threadloop_node_t : public queue_node_t +{ + void (*func)(void *param1, void *param2, double real_value); + + void *param1; + void *param2; + double real_value; +}; + +class ThreadLoop +{ +public: + ThreadLoop(); + ~ThreadLoop(); + threadloop_node_t *GetAPC(); // returns a node for you to fill out + void Schedule(threadloop_node_t *apc); + void Step(); + void Step(unsigned int milliseconds); + void Run(); + void Kill(); +private: + void RefillCache(); + + HANDLE procedure_notification; + HANDLE kill_switch; + mpscq_t procedure_queue; + + /* Memory cache to be able to run APCs without having the memory manager lock + we'll allocate 100 at a time (#defined by PROCEDURE_CACHE_SEED) + and allocate new ones only if the cache is empty (which unfortunately will lock) + cache_bases holds the pointers we've allocated (to free on destruction of this object) + and procedure_cache holds the individual pointers */ + static lifo_t procedure_cache; + static lifo_t cache_bases; +};
\ No newline at end of file |