From 24cdd92a08d6519118b2fe7b0e6b66699383237a Mon Sep 17 00:00:00 2001 From: Joseph Hunkeler Date: Sun, 2 Nov 2025 15:48:01 -0500 Subject: Integrate semaphore --- src/lib/core/include/multiprocessing.h | 3 ++- src/lib/core/multiprocessing.c | 19 +++++++++++++++++-- 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/src/lib/core/include/multiprocessing.h b/src/lib/core/include/multiprocessing.h index ff674e9..6477818 100644 --- a/src/lib/core/include/multiprocessing.h +++ b/src/lib/core/include/multiprocessing.h @@ -3,9 +3,9 @@ #define STASIS_MULTIPROCESSING_H #include "core.h" +#include "sem.h" #include #include -#include #include #include #include @@ -38,6 +38,7 @@ struct MultiProcessingPool { char ident[255]; ///< Identity of task pool char log_root[PATH_MAX]; ///< Base directory to store stderr/stdout log files int status_interval; ///< Report a pooled task is "running" every n seconds + struct Semaphore semaphore; }; /// A multiprocessing task's initial state (i.e. "FAIL") diff --git a/src/lib/core/multiprocessing.c b/src/lib/core/multiprocessing.c index 0cf251e..a3c341f 100644 --- a/src/lib/core/multiprocessing.c +++ b/src/lib/core/multiprocessing.c @@ -79,14 +79,20 @@ int parent(struct MultiProcessingPool *pool, struct MultiProcessingTask *task, p static int mp_task_fork(struct MultiProcessingPool *pool, struct MultiProcessingTask *task) { SYSDEBUG("Preparing to fork() child task %s:%s", pool->ident, task->ident); + semaphore_wait(&pool->semaphore); pid_t pid = fork(); + int parent_status = 0; int child_status = 0; if (pid == -1) { return -1; - } else if (pid == 0) { + } + if (pid == 0) { child(pool, task); + } else { + parent_status = parent(pool, task, pid, &child_status); } - return parent(pool, task, pid, &child_status); + semaphore_post(&pool->semaphore); + return parent_status; } struct MultiProcessingTask *mp_pool_task(struct MultiProcessingPool *pool, const char *ident, char *working_dir, char *cmd) { @@ -397,6 +403,7 @@ int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags) { pool_deadlocked: puts(""); + return failures; } @@ -440,12 +447,20 @@ struct MultiProcessingPool *mp_pool_init(const char *ident, const char *log_root return NULL; } + if (semaphore_init(&pool->semaphore, "stasis_pool_lock", 2) != 0) { + fprintf(stderr, "unable to initialize semaphore\n"); + mp_pool_free(&pool); + return NULL; + } + return pool; } void mp_pool_free(struct MultiProcessingPool **pool) { for (size_t i = 0; i < (*pool)->num_alloc; i++) { } + semaphore_destroy(&(*pool)->semaphore); + // Unmap all pool tasks if ((*pool)->task) { if ((*pool)->task->cmd) { -- cgit