diff options
author | Joseph Hunkeler <jhunkeler@gmail.com> | 2024-09-15 16:12:02 -0400 |
---|---|---|
committer | Joseph Hunkeler <jhunkeler@gmail.com> | 2024-09-18 23:07:14 -0400 |
commit | ead0756a0df1de9d2011359393234a605b6f0313 (patch) | |
tree | f2b84e3f0169f2d3914bf7f58e3712dc9caa40c2 | |
parent | 7c0b2a96a8c2cff5ddf57186f583125f5a02668b (diff) | |
download | stasis-ead0756a0df1de9d2011359393234a605b6f0313.tar.gz |
Split mp_task into to functions
-rw-r--r-- | src/multiprocessing.c | 137 |
1 files changed, 74 insertions, 63 deletions
diff --git a/src/multiprocessing.c b/src/multiprocessing.c index 70f86be..7b3f997 100644 --- a/src/multiprocessing.c +++ b/src/multiprocessing.c @@ -5,73 +5,30 @@ static struct MultiProcessingTask *mp_pool_next_available(struct MultiProcessing return &pool->task[pool->num_used]; } -struct MultiProcessingTask *mp_task(struct MultiProcessingPool *pool, const char *ident, char *cmd) { - struct MultiProcessingTask *slot = mp_pool_next_available(pool); - if (pool->num_used != pool->num_alloc) { - pool->num_used++; - } else { - fprintf(stderr, "Maximum number of tasks reached\n"); - return NULL; - } - - memset(slot->ident, 0, sizeof(slot->ident)); - strncpy(slot->ident, ident, sizeof(slot->ident) - 1); - - strcat(slot->log_file, pool->log_root); - strcat(slot->log_file, "/"); - - FILE *tp = NULL; - char *t_name; - t_name = xmkstemp(&tp, "w"); - if (!t_name || !tp) { - return NULL; - } - - // Set the script's permissions so that only the calling user can use it - // This should help prevent eavesdropping if keys are applied in plain-text - // somewhere. - chmod(t_name, 0700); - - memset(slot->parent_script, 0, sizeof(slot->parent_script)); - strncpy(slot->parent_script, t_name, PATH_MAX - 1); - - fprintf(tp, "#!/bin/bash\n%s\n", cmd); - fflush(tp); - fclose(tp); - - char sema_name[PATH_MAX] = {0}; - sprintf(sema_name, "/sem-%zu-%s-%s", mp_global_task_count, pool->ident, slot->ident); - sem_unlink(sema_name); - slot->gate = sem_open(sema_name, O_CREAT, 0644, 0); - if (slot->gate == SEM_FAILED) { - perror("sem_open failed"); - exit(1); - } - +static int mp_task_fork(struct MultiProcessingPool *pool, struct MultiProcessingTask *task, const char *cmd) { pid_t pid = fork(); int child_status = 0; if (pid == -1) { - return NULL; + return -1; } else if (pid == 0) { // child process char *cwd = NULL; FILE *fp_log = NULL; // Synchronize sub-process startup // Stop here until summoned by mp_pool_join() - if (sem_wait(slot->gate) < 0) { + if (sem_wait(task->gate) < 0) { perror("sem_wait failed"); exit(1); } - // Redirect stdout and stderr to the log file fflush(stdout); fflush(stderr); - printf("[%s:%s] Task started (pid: %d)\n", pool->ident, slot->ident, slot->parent_pid); - fp_log = freopen(slot->log_file, "w+", stdout); + printf("[%s:%s] Task started (pid: %d)\n", pool->ident, task->ident, task->parent_pid); + fp_log = freopen(task->log_file, "w+", stdout); if (!fp_log) { - fprintf(stderr, "unable to open '%s' for writing: %s\n", slot->log_file, strerror(errno)); - return NULL; + fprintf(stderr, "unable to open '%s' for writing: %s\n", task->log_file, strerror(errno)); + return -1; } dup2(fileno(stdout), fileno(stderr)); @@ -91,7 +48,7 @@ struct MultiProcessingTask *mp_task(struct MultiProcessingPool *pool, const char // Generate log header fprintf(fp_log, "# STARTED: %s\n", timebuf ? timebuf : "unknown"); - fprintf(fp_log, "# PID: %d\n", slot->parent_pid); + fprintf(fp_log, "# PID: %d\n", task->parent_pid); fprintf(fp_log, "# WORKDIR: %s\n", cwd); fprintf(fp_log, "# COMMAND:\n%s\n", cmd); fprintf(fp_log, "# OUTPUT:\n"); @@ -102,28 +59,77 @@ struct MultiProcessingTask *mp_task(struct MultiProcessingPool *pool, const char // Execute task fflush(stdout); fflush(stderr); - char *args[] = {"bash", "--norc", t_name, (char *) NULL}; - slot->status = execvp("/bin/bash", args); + char *args[] = {"bash", "--norc", task->parent_script, (char *) NULL}; + task->status = execvp("/bin/bash", args); } else { // parent process - printf("[%s:%s] Task queued (pid: %d)\n", pool->ident, slot->ident, pid); + printf("[%s:%s] Task queued (pid: %d)\n", pool->ident, task->ident, pid); // Give the child process access to our PID value - slot->pid = pid; - slot->parent_pid = pid; + task->pid = pid; + task->parent_pid = pid; // Set log file name - sprintf(slot->log_file + strlen(slot->log_file), "task-%zu-%d.log", mp_global_task_count, slot->parent_pid); + sprintf(task->log_file + strlen(task->log_file), "task-%zu-%d.log", mp_global_task_count, task->parent_pid); mp_global_task_count++; // Check child's status pid_t code = waitpid(pid, &child_status, WUNTRACED | WCONTINUED | WNOHANG); if (code < 0) { perror("waitpid failed"); - return NULL; + return -1; } - return slot; } - return NULL; + return 0; +} + +struct MultiProcessingTask *mp_task(struct MultiProcessingPool *pool, const char *ident, char *cmd) { + struct MultiProcessingTask *slot = mp_pool_next_available(pool); + if (pool->num_used != pool->num_alloc) { + pool->num_used++; + } else { + fprintf(stderr, "Maximum number of tasks reached\n"); + return NULL; + } + + memset(slot->ident, 0, sizeof(slot->ident)); + strncpy(slot->ident, ident, sizeof(slot->ident) - 1); + + strcat(slot->log_file, pool->log_root); + strcat(slot->log_file, "/"); + + FILE *tp = NULL; + char *t_name; + t_name = xmkstemp(&tp, "w"); + if (!t_name || !tp) { + return NULL; + } + + // Set the script's permissions so that only the calling user can use it + // This should help prevent eavesdropping if keys are applied in plain-text + // somewhere. + chmod(t_name, 0700); + + memset(slot->parent_script, 0, sizeof(slot->parent_script)); + strncpy(slot->parent_script, t_name, PATH_MAX - 1); + guard_free(t_name); + + fprintf(tp, "#!/bin/bash\n%s\n", cmd); + fflush(tp); + fclose(tp); + + char sema_name[PATH_MAX] = {0}; + sprintf(sema_name, "/sem-%zu-%s-%s", mp_global_task_count, pool->ident, slot->ident); + sem_unlink(sema_name); + slot->gate = sem_open(sema_name, O_CREAT, 0644, 0); + if (slot->gate == SEM_FAILED) { + perror("sem_open failed"); + exit(1); + } + + if (mp_task_fork(pool, slot, cmd)) { + return NULL; + } + return slot; } static int show_log_contents(FILE *stream, struct MultiProcessingTask *task) { @@ -173,12 +179,10 @@ int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags) { size_t tasks_complete = 0; size_t lower_i = 0; size_t upper_i = jobs; - do { - //if (upper_i >= pool->num_alloc) { + size_t hang_check = 0; if (upper_i >= pool->num_used) { size_t x = upper_i - pool->num_used; - //size_t x = upper_i - pool->num_alloc; upper_i -= (size_t) x; } for (size_t i = lower_i; i < upper_i; i++) { @@ -191,6 +195,12 @@ int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags) { if (slot->pid == MP_POOL_PID_UNUSED) { // PID is already used up, skip it + hang_check++; + if (hang_check >= pool->num_used) { + fprintf(stderr, "%s is deadlocked\n", pool->ident); + failures++; + goto pool_deadlocked; + } continue; } @@ -275,6 +285,7 @@ int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags) { sleep(1); } while (1); + pool_deadlocked: puts(""); return failures; } |