aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoseph Hunkeler <jhunkeler@gmail.com>2025-11-02 15:48:01 -0500
committerJoseph Hunkeler <jhunkeler@gmail.com>2025-11-02 19:35:54 -0500
commit24cdd92a08d6519118b2fe7b0e6b66699383237a (patch)
treefaf8fab829af14ece76b4164d385f3addabc90f5
parent825b61374c29b529fce76f5a26198c30fa229b3c (diff)
downloadstasis-24cdd92a08d6519118b2fe7b0e6b66699383237a.tar.gz
Integrate semaphore
-rw-r--r--src/lib/core/include/multiprocessing.h3
-rw-r--r--src/lib/core/multiprocessing.c19
2 files changed, 19 insertions, 3 deletions
diff --git a/src/lib/core/include/multiprocessing.h b/src/lib/core/include/multiprocessing.h
index ff674e9..6477818 100644
--- a/src/lib/core/include/multiprocessing.h
+++ b/src/lib/core/include/multiprocessing.h
@@ -3,9 +3,9 @@
#define STASIS_MULTIPROCESSING_H
#include "core.h"
+#include "sem.h"
#include <signal.h>
#include <sys/wait.h>
-#include <semaphore.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <sys/stat.h>
@@ -38,6 +38,7 @@ struct MultiProcessingPool {
char ident[255]; ///< Identity of task pool
char log_root[PATH_MAX]; ///< Base directory to store stderr/stdout log files
int status_interval; ///< Report a pooled task is "running" every n seconds
+ struct Semaphore semaphore;
};
/// A multiprocessing task's initial state (i.e. "FAIL")
diff --git a/src/lib/core/multiprocessing.c b/src/lib/core/multiprocessing.c
index 0cf251e..a3c341f 100644
--- a/src/lib/core/multiprocessing.c
+++ b/src/lib/core/multiprocessing.c
@@ -79,14 +79,20 @@ int parent(struct MultiProcessingPool *pool, struct MultiProcessingTask *task, p
static int mp_task_fork(struct MultiProcessingPool *pool, struct MultiProcessingTask *task) {
SYSDEBUG("Preparing to fork() child task %s:%s", pool->ident, task->ident);
+ semaphore_wait(&pool->semaphore);
pid_t pid = fork();
+ int parent_status = 0;
int child_status = 0;
if (pid == -1) {
return -1;
- } else if (pid == 0) {
+ }
+ if (pid == 0) {
child(pool, task);
+ } else {
+ parent_status = parent(pool, task, pid, &child_status);
}
- return parent(pool, task, pid, &child_status);
+ semaphore_post(&pool->semaphore);
+ return parent_status;
}
struct MultiProcessingTask *mp_pool_task(struct MultiProcessingPool *pool, const char *ident, char *working_dir, char *cmd) {
@@ -397,6 +403,7 @@ int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags) {
pool_deadlocked:
puts("");
+
return failures;
}
@@ -440,12 +447,20 @@ struct MultiProcessingPool *mp_pool_init(const char *ident, const char *log_root
return NULL;
}
+ if (semaphore_init(&pool->semaphore, "stasis_pool_lock", 2) != 0) {
+ fprintf(stderr, "unable to initialize semaphore\n");
+ mp_pool_free(&pool);
+ return NULL;
+ }
+
return pool;
}
void mp_pool_free(struct MultiProcessingPool **pool) {
for (size_t i = 0; i < (*pool)->num_alloc; i++) {
}
+ semaphore_destroy(&(*pool)->semaphore);
+
// Unmap all pool tasks
if ((*pool)->task) {
if ((*pool)->task->cmd) {