diff options
Diffstat (limited to 'Src/replicant/nu/win-amd64')
-rw-r--r-- | Src/replicant/nu/win-amd64/LockFreeLIFO.h | 34 | ||||
-rw-r--r-- | Src/replicant/nu/win-amd64/ThreadLoop.cpp | 70 | ||||
-rw-r--r-- | Src/replicant/nu/win-amd64/ThreadLoop.h | 38 |
3 files changed, 142 insertions, 0 deletions
diff --git a/Src/replicant/nu/win-amd64/LockFreeLIFO.h b/Src/replicant/nu/win-amd64/LockFreeLIFO.h new file mode 100644 index 00000000..5cf087e7 --- /dev/null +++ b/Src/replicant/nu/win-amd64/LockFreeLIFO.h @@ -0,0 +1,34 @@ +#pragma once + + +#include "nu/queue_node.h" +#include <windows.h> +#include <malloc.h> + +/* lock free stack object +multiple threads can push and pop without locking +note that order is not guaranteed. that is, if Thread 1 calls Insert before Thread 2, Thread 2's item might still make it in before. +*/ + +#ifdef __cplusplus +#define NU_LIFO_INLINE inline +extern "C" { +#else +#define NX_ATOMIC_INLINE +#endif + + typedef SLIST_HEADER lifo_t; + + /* use this to allocate an object that will go into this */ + NU_LIFO_INLINE static queue_node_t *lifo_malloc(size_t bytes) { return (queue_node_t *)_aligned_malloc(bytes, MEMORY_ALLOCATION_ALIGNMENT); } + NU_LIFO_INLINE static void lifo_free(queue_node_t *ptr) { _aligned_free(ptr); } + + NU_LIFO_INLINE static lifo_t *lifo_create() { return (lifo_t *)_aligned_malloc(sizeof(SLIST_HEADER), MEMORY_ALLOCATION_ALIGNMENT); } + NU_LIFO_INLINE static void lifo_destroy(lifo_t *lifo) { _aligned_free(lifo); } + NU_LIFO_INLINE static void lifo_init(lifo_t *lifo) { InitializeSListHead(lifo); } + NU_LIFO_INLINE static void lifo_push(lifo_t *lifo, queue_node_t *cl) { InterlockedPushEntrySList(lifo, cl); } + NU_LIFO_INLINE static queue_node_t *lifo_pop(lifo_t *lifo) { return InterlockedPopEntrySList(lifo); } + +#ifdef __cplusplus +} +#endif diff --git a/Src/replicant/nu/win-amd64/ThreadLoop.cpp b/Src/replicant/nu/win-amd64/ThreadLoop.cpp new file mode 100644 index 00000000..4c78a431 --- /dev/null +++ b/Src/replicant/nu/win-amd64/ThreadLoop.cpp @@ -0,0 +1,70 @@ +#include "ThreadLoop.h" + +lifo_t ThreadLoop::procedure_cache = {0,}; +lifo_t ThreadLoop::cache_bases= {0,}; + +#define PROCEDURE_CACHE_SEED 64 +ThreadLoop::ThreadLoop() +{ + mpscq_init(&procedure_queue); + procedure_notification = CreateSemaphore(0, 0, LONG_MAX, 0); + kill_switch = CreateEvent(0, TRUE, FALSE, 0); +} + +void ThreadLoop::RefillCache() +{ + threadloop_node_t *cache_seed = (threadloop_node_t *)lifo_malloc(PROCEDURE_CACHE_SEED*sizeof(threadloop_node_t)); + if (cache_seed) + { + memset(cache_seed, 0, PROCEDURE_CACHE_SEED*sizeof(threadloop_node_t)); + for (int i=0;i<PROCEDURE_CACHE_SEED;i++) + { + lifo_push(&procedure_cache, &cache_seed[i]); + } + lifo_push(&cache_bases, cache_seed); + } +} + +void ThreadLoop::Run() +{ + HANDLE events[] = {kill_switch, procedure_notification}; + while (WaitForMultipleObjects(2, events, FALSE, INFINITE) == WAIT_OBJECT_0 + 1) + { + threadloop_node_t *apc; + for (;;) + { + apc = (threadloop_node_t *)mpscq_pop(&procedure_queue); + if (!apc) + { + Sleep(0); + continue; + } + apc->func(apc->param1, apc->param2, apc->real_value); + } + lifo_push(&procedure_cache, apc); + } +} + +threadloop_node_t *ThreadLoop::GetAPC() +{ + threadloop_node_t *apc = 0; + + do + { + apc = (threadloop_node_t *)lifo_pop(&procedure_cache); + if (!apc) + RefillCache(); + } while (!apc); + return apc; +} + +void ThreadLoop::Schedule(threadloop_node_t *apc) +{ + mpscq_push(&procedure_queue, apc); + ReleaseSemaphore(procedure_notification, 1, 0); +} + +void ThreadLoop::Kill() +{ + SetEvent(kill_switch); +} diff --git a/Src/replicant/nu/win-amd64/ThreadLoop.h b/Src/replicant/nu/win-amd64/ThreadLoop.h new file mode 100644 index 00000000..f1552b51 --- /dev/null +++ b/Src/replicant/nu/win-amd64/ThreadLoop.h @@ -0,0 +1,38 @@ +#pragma once +#include "nu/lfmpscq.h" +#include "nu/LockFreeLIFO.h" +#define WIN32_LEAN_AND_MEAN +#include <windows.h> + +struct threadloop_node_t : public queue_node_t +{ + void (*func)(void *param1, void *param2, double real_value); + + void *param1; + void *param2; + double real_value; +}; + +class ThreadLoop +{ +public: + ThreadLoop(); + threadloop_node_t *GetAPC(); // returns a node for you to fill out + void Schedule(threadloop_node_t *apc); + void Run(); + void Kill(); +private: + void RefillCache(); + + HANDLE procedure_notification; + HANDLE kill_switch; + mpscq_t procedure_queue; + + /* Memory cache to be able to run APCs without having the memory manager lock + we'll allocate 100 at a time (#defined by PROCEDURE_CACHE_SEED) + and allocate new ones only if the cache is empty (which unfortunately will lock) + cache_bases holds the pointers we've allocated (to free on destruction of this object) + and procedure_cache holds the individual pointers */ + static lifo_t procedure_cache; + static lifo_t cache_bases; +};
\ No newline at end of file |