diff options
author | Joseph Hunkeler <jhunkeler@gmail.com> | 2024-09-17 09:32:12 -0400 |
---|---|---|
committer | Joseph Hunkeler <jhunkeler@gmail.com> | 2024-09-18 23:07:14 -0400 |
commit | daf8c81c35f590f2ac145801933792a070ccbee2 (patch) | |
tree | c9570402e27990ddee331d2943fa9d319c74ffb4 | |
parent | 6921cb44f5464b59a2a169167cc63c66bcc95a9f (diff) | |
download | stasis-daf8c81c35f590f2ac145801933792a070ccbee2.tar.gz |
Add comments, remove dead code
-rw-r--r-- | src/multiprocessing.c | 26 |
1 files changed, 24 insertions, 2 deletions
diff --git a/src/multiprocessing.c b/src/multiprocessing.c index a1d961e..25b667c 100644 --- a/src/multiprocessing.c +++ b/src/multiprocessing.c @@ -1,6 +1,8 @@ #include "core.h" +/// The sum of all tasks queued by mp_task() size_t mp_global_task_count = 0; + static struct MultiProcessingTask *mp_pool_next_available(struct MultiProcessingPool *pool) { return &pool->task[pool->num_used]; } @@ -107,12 +109,15 @@ struct MultiProcessingTask *mp_task(struct MultiProcessingPool *pool, const char return NULL; } + // Set task identifier string memset(slot->ident, 0, sizeof(slot->ident)); strncpy(slot->ident, ident, sizeof(slot->ident) - 1); + // Set log file path strcat(slot->log_file, pool->log_root); strcat(slot->log_file, "/"); + // Create a temporary file to act as our intermediate command script FILE *tp = NULL; char *t_name; t_name = xmkstemp(&tp, "w"); @@ -125,14 +130,18 @@ struct MultiProcessingTask *mp_task(struct MultiProcessingPool *pool, const char // somewhere. chmod(t_name, 0700); + // Record the script path memset(slot->parent_script, 0, sizeof(slot->parent_script)); strncpy(slot->parent_script, t_name, PATH_MAX - 1); guard_free(t_name); + // Populate the script fprintf(tp, "#!/bin/bash\n%s\n", cmd); fflush(tp); fclose(tp); + // Create a uniquely named semaphore. + // This is used by the child process to prevent task execution until mp_pool_join is called char sema_name[PATH_MAX] = {0}; sprintf(sema_name, "/sem-%zu-%s-%s", mp_global_task_count, pool->ident, slot->ident); sem_unlink(sema_name); @@ -142,6 +151,7 @@ struct MultiProcessingTask *mp_task(struct MultiProcessingPool *pool, const char exit(1); } + // Execute task if (mp_task_fork(pool, slot, cmd)) { return NULL; } @@ -164,12 +174,12 @@ static int show_log_contents(FILE *stream, struct MultiProcessingTask *task) { int mp_pool_kill(struct MultiProcessingPool *pool, int signum) { printf("Sending signal %d to pool '%s'\n", signum, pool->ident); - //for (size_t i = 0; i < pool->num_alloc; i++) { for (size_t i = 0; i < pool->num_used; i++) { struct MultiProcessingTask *slot = &pool->task[i]; if (!slot) { return -1; } + // Kill tasks in progress if (slot->pid > 0) { int status; printf("Sending signal %d to task '%s' (pid: %d)\n", signum, slot->ident, slot->pid); @@ -209,10 +219,13 @@ int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags) { exit(1); } + // Has the child been processed already? if (slot->pid == MP_POOL_PID_UNUSED) { - // PID is already used up, skip it + // Child is already used up, skip it hang_check++; if (hang_check >= pool->num_used) { + // Unlikely to happen when called correctly, but if you join a pool that's already finished + // it will spin forever. This protects the program from entering an infinite loop. fprintf(stderr, "%s is deadlocked\n", pool->ident); failures++; goto pool_deadlocked; @@ -314,15 +327,20 @@ struct MultiProcessingPool *mp_pool_init(const char *ident, const char *log_root return NULL; } + // The pool is shared with children pool = mmap(NULL, sizeof(*pool), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0); + + // Set pool identity string memset(pool->ident, 0, sizeof(pool->ident)); strncpy(pool->ident, ident, sizeof(pool->ident) - 1); + // Set logging base directory memset(pool->log_root, 0, sizeof(pool->log_root)); strncpy(pool->log_root, log_root, sizeof(pool->log_root) - 1); pool->num_used = 0; pool->num_alloc = MP_POOL_TASK_MAX; + // Create the log directory if (mkdirs(log_root, 0700) < 0) { if (errno != EEXIST) { perror(log_root); @@ -331,6 +349,7 @@ struct MultiProcessingPool *mp_pool_init(const char *ident, const char *log_root } } + // Task array is shared with children pool->task = mmap(NULL, (pool->num_alloc + 1) * sizeof(*pool->task), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0); if (pool->task == MAP_FAILED) { perror("mmap"); @@ -343,6 +362,7 @@ struct MultiProcessingPool *mp_pool_init(const char *ident, const char *log_root void mp_pool_free(struct MultiProcessingPool **pool) { for (size_t i = 0; i < (*pool)->num_alloc; i++) { + // Close all semaphores if ((*pool)->task[i].gate) { if (sem_close((*pool)->task[i].gate) < 0) { perror("sem_close failed"); @@ -350,11 +370,13 @@ void mp_pool_free(struct MultiProcessingPool **pool) { } } } + // Unmap all pool tasks if ((*pool)->task) { if (munmap((*pool)->task, sizeof(*(*pool)->task) * (*pool)->num_alloc) < 0) { perror("munmap"); } } + // Unmap the pool if ((*pool)) { if (munmap((*pool), sizeof(*(*pool))) < 0) { perror("munmap"); |