aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoseph Hunkeler <jhunkeler@gmail.com>2024-09-17 09:32:12 -0400
committerJoseph Hunkeler <jhunkeler@gmail.com>2024-09-18 23:07:14 -0400
commitdaf8c81c35f590f2ac145801933792a070ccbee2 (patch)
treec9570402e27990ddee331d2943fa9d319c74ffb4
parent6921cb44f5464b59a2a169167cc63c66bcc95a9f (diff)
downloadstasis-daf8c81c35f590f2ac145801933792a070ccbee2.tar.gz
Add comments, remove dead code
-rw-r--r--src/multiprocessing.c26
1 files changed, 24 insertions, 2 deletions
diff --git a/src/multiprocessing.c b/src/multiprocessing.c
index a1d961e..25b667c 100644
--- a/src/multiprocessing.c
+++ b/src/multiprocessing.c
@@ -1,6 +1,8 @@
#include "core.h"
+/// The sum of all tasks queued by mp_task()
size_t mp_global_task_count = 0;
+
static struct MultiProcessingTask *mp_pool_next_available(struct MultiProcessingPool *pool) {
return &pool->task[pool->num_used];
}
@@ -107,12 +109,15 @@ struct MultiProcessingTask *mp_task(struct MultiProcessingPool *pool, const char
return NULL;
}
+ // Set task identifier string
memset(slot->ident, 0, sizeof(slot->ident));
strncpy(slot->ident, ident, sizeof(slot->ident) - 1);
+ // Set log file path
strcat(slot->log_file, pool->log_root);
strcat(slot->log_file, "/");
+ // Create a temporary file to act as our intermediate command script
FILE *tp = NULL;
char *t_name;
t_name = xmkstemp(&tp, "w");
@@ -125,14 +130,18 @@ struct MultiProcessingTask *mp_task(struct MultiProcessingPool *pool, const char
// somewhere.
chmod(t_name, 0700);
+ // Record the script path
memset(slot->parent_script, 0, sizeof(slot->parent_script));
strncpy(slot->parent_script, t_name, PATH_MAX - 1);
guard_free(t_name);
+ // Populate the script
fprintf(tp, "#!/bin/bash\n%s\n", cmd);
fflush(tp);
fclose(tp);
+ // Create a uniquely named semaphore.
+ // This is used by the child process to prevent task execution until mp_pool_join is called
char sema_name[PATH_MAX] = {0};
sprintf(sema_name, "/sem-%zu-%s-%s", mp_global_task_count, pool->ident, slot->ident);
sem_unlink(sema_name);
@@ -142,6 +151,7 @@ struct MultiProcessingTask *mp_task(struct MultiProcessingPool *pool, const char
exit(1);
}
+ // Execute task
if (mp_task_fork(pool, slot, cmd)) {
return NULL;
}
@@ -164,12 +174,12 @@ static int show_log_contents(FILE *stream, struct MultiProcessingTask *task) {
int mp_pool_kill(struct MultiProcessingPool *pool, int signum) {
printf("Sending signal %d to pool '%s'\n", signum, pool->ident);
- //for (size_t i = 0; i < pool->num_alloc; i++) {
for (size_t i = 0; i < pool->num_used; i++) {
struct MultiProcessingTask *slot = &pool->task[i];
if (!slot) {
return -1;
}
+ // Kill tasks in progress
if (slot->pid > 0) {
int status;
printf("Sending signal %d to task '%s' (pid: %d)\n", signum, slot->ident, slot->pid);
@@ -209,10 +219,13 @@ int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags) {
exit(1);
}
+ // Has the child been processed already?
if (slot->pid == MP_POOL_PID_UNUSED) {
- // PID is already used up, skip it
+ // Child is already used up, skip it
hang_check++;
if (hang_check >= pool->num_used) {
+ // Unlikely to happen when called correctly, but if you join a pool that's already finished
+ // it will spin forever. This protects the program from entering an infinite loop.
fprintf(stderr, "%s is deadlocked\n", pool->ident);
failures++;
goto pool_deadlocked;
@@ -314,15 +327,20 @@ struct MultiProcessingPool *mp_pool_init(const char *ident, const char *log_root
return NULL;
}
+ // The pool is shared with children
pool = mmap(NULL, sizeof(*pool), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0);
+
+ // Set pool identity string
memset(pool->ident, 0, sizeof(pool->ident));
strncpy(pool->ident, ident, sizeof(pool->ident) - 1);
+ // Set logging base directory
memset(pool->log_root, 0, sizeof(pool->log_root));
strncpy(pool->log_root, log_root, sizeof(pool->log_root) - 1);
pool->num_used = 0;
pool->num_alloc = MP_POOL_TASK_MAX;
+ // Create the log directory
if (mkdirs(log_root, 0700) < 0) {
if (errno != EEXIST) {
perror(log_root);
@@ -331,6 +349,7 @@ struct MultiProcessingPool *mp_pool_init(const char *ident, const char *log_root
}
}
+ // Task array is shared with children
pool->task = mmap(NULL, (pool->num_alloc + 1) * sizeof(*pool->task), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0);
if (pool->task == MAP_FAILED) {
perror("mmap");
@@ -343,6 +362,7 @@ struct MultiProcessingPool *mp_pool_init(const char *ident, const char *log_root
void mp_pool_free(struct MultiProcessingPool **pool) {
for (size_t i = 0; i < (*pool)->num_alloc; i++) {
+ // Close all semaphores
if ((*pool)->task[i].gate) {
if (sem_close((*pool)->task[i].gate) < 0) {
perror("sem_close failed");
@@ -350,11 +370,13 @@ void mp_pool_free(struct MultiProcessingPool **pool) {
}
}
}
+ // Unmap all pool tasks
if ((*pool)->task) {
if (munmap((*pool)->task, sizeof(*(*pool)->task) * (*pool)->num_alloc) < 0) {
perror("munmap");
}
}
+ // Unmap the pool
if ((*pool)) {
if (munmap((*pool), sizeof(*(*pool))) < 0) {
perror("munmap");