aboutsummaryrefslogtreecommitdiff
path: root/Src/replicant/nu/win
diff options
context:
space:
mode:
Diffstat (limited to 'Src/replicant/nu/win')
-rw-r--r--Src/replicant/nu/win/MessageLoop.cpp121
-rw-r--r--Src/replicant/nu/win/MessageLoop.h45
-rw-r--r--Src/replicant/nu/win/ThreadLoop.cpp146
-rw-r--r--Src/replicant/nu/win/ThreadLoop.h40
4 files changed, 352 insertions, 0 deletions
diff --git a/Src/replicant/nu/win/MessageLoop.cpp b/Src/replicant/nu/win/MessageLoop.cpp
new file mode 100644
index 00000000..2353801c
--- /dev/null
+++ b/Src/replicant/nu/win/MessageLoop.cpp
@@ -0,0 +1,121 @@
+#include "MessageLoop.h"
+#include <assert.h>
+
+lifo_t nu::MessageLoop::message_cache = {0,};
+lifo_t nu::MessageLoop::cache_bases= {0,};
+
+#define MESSAAGE_CACHE_SEED 64
+
+typedef uint8_t message_data_t[64]; // ensure all messages are this size
+
+nu::MessageLoop::MessageLoop()
+{
+ mpscq_init(&message_queue);
+ message_notification = CreateEvent(0, FALSE, FALSE, 0);
+}
+
+nu::MessageLoop::~MessageLoop()
+{
+ CloseHandle(message_notification);
+}
+
+void nu::MessageLoop::RefillCache()
+{
+ message_data_t *cache_seed = (message_data_t *)_aligned_malloc(MESSAAGE_CACHE_SEED*sizeof(message_data_t), 64);
+
+ if (cache_seed)
+ {
+ int i=MESSAAGE_CACHE_SEED;
+ while (--i)
+ {
+ lifo_push(&message_cache, (queue_node_t *)&cache_seed[i]);
+ }
+ lifo_push(&cache_bases, (queue_node_t *)cache_seed);
+ }
+ else
+ {
+ Sleep(0); // yield and hope that someone else pops something off soon
+ }
+}
+
+nu::message_node_t *nu::MessageLoop::AllocateMessage()
+{
+ message_node_t *apc = 0;
+
+ do
+ {
+ apc = (message_node_t *)lifo_pop(&message_cache);
+ if (!apc)
+ RefillCache();
+ } while (!apc);
+ return apc;
+}
+
+void nu::MessageLoop::PostMessage(nu::message_node_t *message)
+{
+ if (mpscq_push(&message_queue, message) == 0)
+ SetEvent(message_notification);
+}
+
+void nu::MessageLoop::FreeMessage(nu::message_node_t *message)
+{
+ lifo_push(&message_cache, message);
+}
+
+nu::message_node_t *nu::MessageLoop::GetMessage()
+{
+ message_node_t *message = PeekMessage();
+ if (message)
+ {
+ return message;
+ }
+
+ while (WaitForSingleObject(message_notification, INFINITE) == WAIT_OBJECT_0)
+ {
+ message = PeekMessage();
+ if (message)
+ {
+ return message;
+ }
+ }
+ return 0;
+}
+
+nu::message_node_t *nu::MessageLoop::PeekMessage()
+{
+ for (;;) // loop because we need to handle 'busy' from the queue
+ {
+ message_node_t *message = (message_node_t *)mpscq_pop(&message_queue);
+ if (message == (message_node_t *)1) /* special return value that indicates a busy list */
+ {
+ // benski> although it's tempting to return 0 here, doing so will mess up the Event logic
+ Sleep(0); // yield so that the thread that got pre-empted during push can finish
+ }
+ else
+ {
+ if (message)
+ {
+ return message;
+ }
+ else
+ {
+ return 0;
+ }
+ }
+ }
+}
+
+nu::message_node_t *nu::MessageLoop::PeekMessage(unsigned int milliseconds)
+{
+ message_node_t *message = PeekMessage();
+ if (message)
+ return message;
+
+ if (WaitForSingleObject(message_notification, milliseconds) == WAIT_OBJECT_0)
+ {
+ message = PeekMessage();
+ if (message)
+ return message;
+ }
+ return 0;
+}
diff --git a/Src/replicant/nu/win/MessageLoop.h b/Src/replicant/nu/win/MessageLoop.h
new file mode 100644
index 00000000..88e4520f
--- /dev/null
+++ b/Src/replicant/nu/win/MessageLoop.h
@@ -0,0 +1,45 @@
+#pragma once
+#include "foundation/types.h"
+#include "nu/lfmpscq.h"
+#include "nu/LockFreeLIFO.h"
+#include <windows.h>
+
+namespace nu
+{
+
+ /* you can inherit from message_node_t (or combine inside a struct)
+ but make sure that your message isn't > 64 bytes */
+ struct message_node_t : public queue_node_t
+ {
+ uint32_t message;
+ };
+
+ class MessageLoop
+ {
+ public:
+ MessageLoop();
+ ~MessageLoop();
+ /* API for Message senders */
+ message_node_t *AllocateMessage(); // returns a message for you to fill out
+ void PostMessage(message_node_t *message);
+
+ /* API for Message receivers */
+ void FreeMessage(message_node_t *message);
+ message_node_t *GetMessage(); // waits forever
+ message_node_t *PeekMessage();
+ message_node_t *PeekMessage(unsigned int milliseconds);
+ private:
+ void RefillCache();
+
+ HANDLE message_notification;
+ mpscq_t message_queue;
+
+ /* Memory cache to be able to run APCs without having the memory manager lock
+ we'll allocate 100 at a time (#defined by MESSAGE_CACHE_SEED)
+ and allocate new ones only if the cache is empty (which unfortunately will lock)
+ cache_bases holds the pointers we've allocated (to free on destruction of this object)
+ and message_cache holds the individual pointers */
+ static lifo_t message_cache;
+ static lifo_t cache_bases;
+ };
+} \ No newline at end of file
diff --git a/Src/replicant/nu/win/ThreadLoop.cpp b/Src/replicant/nu/win/ThreadLoop.cpp
new file mode 100644
index 00000000..3ff89e59
--- /dev/null
+++ b/Src/replicant/nu/win/ThreadLoop.cpp
@@ -0,0 +1,146 @@
+#include "ThreadLoop.h"
+#include <limits.h>
+
+lifo_t ThreadLoop::procedure_cache = {0,};
+lifo_t ThreadLoop::cache_bases= {0,};
+
+#define PROCEDURE_CACHE_SEED 64
+ThreadLoop::ThreadLoop()
+{
+ mpscq_init(&procedure_queue);
+ procedure_notification = CreateSemaphoreW(0, 0, LONG_MAX, 0);
+ kill_switch = CreateEvent(0, TRUE, FALSE, 0);
+}
+
+ThreadLoop::~ThreadLoop()
+{
+ CloseHandle(procedure_notification);
+ CloseHandle(kill_switch);
+}
+
+void ThreadLoop::RefillCache()
+{
+ threadloop_node_t *cache_seed = (threadloop_node_t *)malloc(PROCEDURE_CACHE_SEED*sizeof(threadloop_node_t));
+
+ if (cache_seed)
+ {
+ int i=PROCEDURE_CACHE_SEED;
+ while (--i)
+ {
+ lifo_push(&procedure_cache, (queue_node_t *)&cache_seed[i]);
+ }
+ lifo_push(&cache_bases, (queue_node_t *)cache_seed);
+ }
+ else
+ {
+ Sleep(0); // yield and hope that someone else pops something off soon
+ }
+}
+
+void ThreadLoop::Run()
+{
+ HANDLE events[] = {kill_switch, procedure_notification};
+ while (WaitForMultipleObjects(2, events, FALSE, INFINITE) == WAIT_OBJECT_0 + 1)
+ {
+ for (;;)
+ {
+ threadloop_node_t *apc = (threadloop_node_t *)mpscq_pop(&procedure_queue);
+ if (apc == (threadloop_node_t *)1) /* special return value that indicates a busy list */
+ {
+ Sleep(0); // yield so that the thread that got pre-empted during push can finish
+ }
+ else
+ {
+ if (apc)
+ {
+ apc->func(apc->param1, apc->param2, apc->real_value);
+ lifo_push(&procedure_cache, apc);
+ }
+ else
+ {
+ break;
+ }
+ }
+ }
+ }
+}
+
+void ThreadLoop::Step(unsigned int milliseconds)
+{
+ HANDLE events[] = {kill_switch, procedure_notification};
+ if (WaitForMultipleObjects(2, events, FALSE, milliseconds) == WAIT_OBJECT_0 + 1)
+ {
+ for (;;)
+ {
+ threadloop_node_t *apc = (threadloop_node_t *)mpscq_pop(&procedure_queue);
+ if (apc == (threadloop_node_t *)1) /* special return value that indicates a busy list */
+ {
+ Sleep(0); // yield so that the thread that got pre-empted during push can finish
+ }
+ else
+ {
+ if (apc)
+ {
+ apc->func(apc->param1, apc->param2, apc->real_value);
+ lifo_push(&procedure_cache, apc);
+ }
+ else
+ {
+ break;
+ }
+ }
+ }
+ }
+}
+
+void ThreadLoop::Step()
+{
+ HANDLE events[] = {kill_switch, procedure_notification};
+ if (WaitForMultipleObjects(2, events, FALSE, INFINITE) == WAIT_OBJECT_0 + 1)
+ {
+ for (;;)
+ {
+ threadloop_node_t *apc = (threadloop_node_t *)mpscq_pop(&procedure_queue);
+ if (apc == (threadloop_node_t *)1) /* special return value that indicates a busy list */
+ {
+ Sleep(0); // yield so that the thread that got pre-empted during push can finish
+ }
+ else
+ {
+ if (apc)
+ {
+ apc->func(apc->param1, apc->param2, apc->real_value);
+ lifo_push(&procedure_cache, apc);
+ }
+ else
+ {
+ break;
+ }
+ }
+ }
+ }
+}
+
+threadloop_node_t *ThreadLoop::GetAPC()
+{
+ threadloop_node_t *apc = 0;
+
+ do
+ {
+ apc = (threadloop_node_t *)lifo_pop(&procedure_cache);
+ if (!apc)
+ RefillCache();
+ } while (!apc);
+ return apc;
+}
+
+void ThreadLoop::Schedule(threadloop_node_t *apc)
+{
+ if (mpscq_push(&procedure_queue, apc) == 0)
+ ReleaseSemaphore(procedure_notification, 1, 0);
+}
+
+void ThreadLoop::Kill()
+{
+ SetEvent(kill_switch);
+}
diff --git a/Src/replicant/nu/win/ThreadLoop.h b/Src/replicant/nu/win/ThreadLoop.h
new file mode 100644
index 00000000..22f16c3d
--- /dev/null
+++ b/Src/replicant/nu/win/ThreadLoop.h
@@ -0,0 +1,40 @@
+#pragma once
+#include "nu/lfmpscq.h"
+#include "nu/LockFreeLIFO.h"
+#include <windows.h>
+
+struct threadloop_node_t : public queue_node_t
+{
+ void (*func)(void *param1, void *param2, double real_value);
+
+ void *param1;
+ void *param2;
+ double real_value;
+};
+
+class ThreadLoop
+{
+public:
+ ThreadLoop();
+ ~ThreadLoop();
+ threadloop_node_t *GetAPC(); // returns a node for you to fill out
+ void Schedule(threadloop_node_t *apc);
+ void Step();
+ void Step(unsigned int milliseconds);
+ void Run();
+ void Kill();
+private:
+ void RefillCache();
+
+ HANDLE procedure_notification;
+ HANDLE kill_switch;
+ mpscq_t procedure_queue;
+
+ /* Memory cache to be able to run APCs without having the memory manager lock
+ we'll allocate 100 at a time (#defined by PROCEDURE_CACHE_SEED)
+ and allocate new ones only if the cache is empty (which unfortunately will lock)
+ cache_bases holds the pointers we've allocated (to free on destruction of this object)
+ and procedure_cache holds the individual pointers */
+ static lifo_t procedure_cache;
+ static lifo_t cache_bases;
+}; \ No newline at end of file