From b7251ce3bf65bcbec7ecbb98a0eb0b3c9abde507 Mon Sep 17 00:00:00 2001 From: Joseph Hunkeler Date: Fri, 13 Sep 2024 09:35:34 -0400 Subject: Move guard_ macros to core_mem.h * Move core_mem.h below config.h --- src/multiprocessing.c | 373 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 373 insertions(+) create mode 100644 src/multiprocessing.c (limited to 'src/multiprocessing.c') diff --git a/src/multiprocessing.c b/src/multiprocessing.c new file mode 100644 index 0000000..09190d3 --- /dev/null +++ b/src/multiprocessing.c @@ -0,0 +1,373 @@ +#include "core_mem.h" +#include "multiprocessing.h" +#include "utils.h" + +size_t mp_global_task_count = 0; +static struct MultiProcessingTask *mp_pool_next_available(struct MultiProcessingPool *pool) { + return &pool->task[pool->num_used]; +} + +struct MultiProcessingTask *mp_task(struct MultiProcessingPool *pool, const char *ident, char *cmd) { + struct MultiProcessingTask *slot = mp_pool_next_available(pool); + //struct MultiProcessingTask *slot = mp_pool_any_available(pool); + pool->num_used++; + + memset(slot->ident, 0, sizeof(slot->ident)); + strncpy(slot->ident, ident, sizeof(slot->ident) - 1); + + strcat(slot->log_file, pool->log_root); + strcat(slot->log_file, "/"); + + FILE *tp = NULL; + char *t_name; + t_name = xmkstemp(&tp, "w"); + if (!t_name || !tp) { + return NULL; + } + + // Set the script's permissions so that only the calling user can use it + // This should help prevent eavesdropping if keys are applied in plain-text + // somewhere. + chmod(t_name, 0700); + + memset(slot->parent_script, 0, sizeof(slot->parent_script)); + strncpy(slot->parent_script, t_name, PATH_MAX - 1); + + fprintf(tp, "#!/bin/bash\n%s\n", cmd); + fflush(tp); + fclose(tp); + + pid_t pid = fork(); + int child_status = 0; + if (pid == -1) { + return NULL; + } else if (pid == 0) { // child process + char *cwd = NULL; + FILE *fp_log = NULL; + + // Synchronize sub-process startup + // Stop here until summoned by mp_pool_join() + if (sem_wait(slot->gate) < 0) { + perror("sem_wait failed"); + exit(1); + } + + + // Redirect stdout and stderr to the log file + fflush(stdout); + fflush(stderr); + printf("[%s:%s] Task started (pid: %d)\n", pool->ident, slot->ident, slot->parent_pid); + fp_log = freopen(slot->log_file, "w+", stdout); + if (!fp_log) { + fprintf(stderr, "unable to open '%s' for writing: %s\n", slot->log_file, strerror(errno)); + return NULL; + } + dup2(fileno(stdout), fileno(stderr)); + + // Generate timestamp for log header + time_t t = time(NULL); + char *timebuf = ctime(&t); + if (timebuf) { + // strip line feed from timestamp + timebuf[strlen(timebuf) ? strlen(timebuf) - 1 : 0] = 0; + } + + cwd = getcwd(NULL, PATH_MAX); + if (!cwd) { + perror(cwd); + exit(1); + } + + // Generate log header + fprintf(fp_log, "# STARTED: %s\n", timebuf ? timebuf : "unknown"); + fprintf(fp_log, "# PID: %d\n", slot->parent_pid); + fprintf(fp_log, "# WORKDIR: %s\n", cwd); + fprintf(fp_log, "# COMMAND:\n%s\n", cmd); + fprintf(fp_log, "# OUTPUT:\n"); + // Commit header to log file / clean up + fflush(fp_log); + guard_free(cwd); + + // Execute task + fflush(stdout); + fflush(stderr); + char *args[] = {"bash", "--norc", t_name, (char *) NULL}; + slot->status = execvp("/bin/bash", args); + } else { // parent process + printf("[%s:%s] Task queued (pid: %d)\n", pool->ident, slot->ident, pid); + + // Give the child process access to our PID value + slot->pid = pid; + slot->parent_pid = pid; + + // Set log file name + sprintf(slot->log_file + strlen(slot->log_file), "task-%zu-%d.log", mp_global_task_count, slot->parent_pid); + mp_global_task_count++; + + // Check child's status + pid_t code = waitpid(pid, &child_status, WUNTRACED | WCONTINUED | WNOHANG); + if (code < 0) { + perror("waitpid failed"); + return NULL; + } + return slot; + } + return NULL; +} + +static int show_log_contents(FILE *stream, struct MultiProcessingTask *task) { + FILE *fp = fopen(task->log_file, "r"); + if (!fp) { + return -1; + } + char buf[BUFSIZ] = {0}; + while ((fgets(buf, sizeof(buf) - 1, fp)) != NULL) { + fprintf(stream, "%s", buf); + memset(buf, 0, sizeof(buf)); + } + fprintf(stream, "\n"); + return 0; +} + +int mp_pool_kill(struct MultiProcessingPool *pool, int signum) { + printf("Sending signal %d to pool '%s'\n", signum, pool->ident); + //for (size_t i = 0; i < pool->num_alloc; i++) { + for (size_t i = 0; i < pool->num_used; i++) { + struct MultiProcessingTask *slot = &pool->task[i]; + if (!slot) { + return -1; + } + if (slot->pid > 0) { + int status; + printf("Sending signal %d to task '%s' (pid: %d)\n", signum, slot->ident, slot->pid); + status = kill(slot->pid, signum); + if (status && errno != ESRCH) { + fprintf(stderr, "Task '%s' (pid: %d) did not respond: %s\n", slot->ident, slot->pid, strerror(errno)); + } + } + if (!access(slot->log_file, F_OK)) { + remove(slot->log_file); + } + if (!access(slot->parent_script, F_OK)) { + remove(slot->parent_script); + } + } + return 0; +} + +int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags) { + int status = 0; + int watcher = 0; + int failures = 0; + size_t tasks_complete = 0; + size_t lower_i = 0; + size_t upper_i = jobs; + + do { + //if (upper_i >= pool->num_alloc) { + if (upper_i >= pool->num_used) { + size_t x = upper_i - pool->num_used; + //size_t x = upper_i - pool->num_alloc; + upper_i -= (size_t) x; + } + for (size_t i = lower_i; i < upper_i; i++) { + struct MultiProcessingTask *slot = &pool->task[i]; + // Unlock the semaphore to allow the queued processes to start up + if (sem_post(slot->gate) < 0) { + perror("sem_post failed"); + exit(1); + } + + if (slot->pid == MP_POOL_PID_UNUSED) { + // PID is already used up, skip it + continue; + } + + // Is the process finished? + pid_t pid = waitpid(slot->pid, &status, WNOHANG | WUNTRACED | WCONTINUED); + + // Update status + slot->status = status; + + char progress[1024] = {0}; + if (pid > 0) { + double percent = ((double) (tasks_complete) / (double) pool->num_used) * 100; + //double percent = ((double) (tasks_complete) / (double) pool->num_alloc) * 100; + sprintf(progress, "[%s:%s] [%3.1f%%]", pool->ident, slot->ident, percent); + + // The process ended in one the following ways + // Note: SIGSTOP nor SIGCONT will not increment the tasks_complete counter + if (WIFEXITED(status)) { + printf("%s Task finished (status: %d)\n", progress, WEXITSTATUS(status)); + tasks_complete++; + } else if (WIFSIGNALED(status)) { + printf("%s Task ended by signal %d (%s)\n", progress, WTERMSIG(status), strsignal(WTERMSIG(status))); + tasks_complete++; + } else if (WIFSTOPPED(status)) { + printf("%s Task was suspended (%d)\n", progress, WSTOPSIG(status)); + continue; + } else if (WIFCONTINUED(status)) { + printf("%s Task was resumed\n", progress); + continue; + } else { + fprintf(stderr, "%s Task state is unknown (0x%04X)\n", progress, status); + } + + // Show the log (always) + if (show_log_contents(stdout, slot)) { + perror(slot->log_file); + } + if (status >> 8 != 0 || (status & 0xff) != 0) { + fprintf(stderr, "%s Task failed\n", progress); + if (flags & MP_POOL_FAIL_FAST) { + mp_pool_kill(pool, SIGTERM); + return -2; + } + } else { + printf("%s Task finished\n", progress); + } + + // Clean up logs and scripts left behind by the task + if (remove(slot->log_file)) { + fprintf(stderr, "%s Unable to remove log file: '%s': %s\n", progress, slot->parent_script, strerror(errno)); + } + if (remove(slot->parent_script)) { + fprintf(stderr, "%s Unable to remove temporary script '%s': %s\n", progress, slot->parent_script, strerror(errno)); + } + + // Update progress and tell the poller to ignore the PID. The process is gone. + slot->pid = MP_POOL_PID_UNUSED; + failures += status; + } else if (pid < 0) { + fprintf(stderr, "waitpid failed: %s\n", strerror(errno)); + return -1; + } else { + if (watcher > 9) { + printf("[%s:%s] Task is running (pid: %d)\n", pool->ident, slot->ident, slot->parent_pid); + watcher = 0; + } else { + watcher++; + } + } + } + + if (tasks_complete == pool->num_used) { + break; + } + + if (tasks_complete == upper_i) { + lower_i += jobs; + upper_i += jobs; + } + + // Poll again after a short delay + sleep(1); + } while (1); + + puts(""); + return failures; +} + + +struct MultiProcessingPool *mp_pool_init(const char *ident, const char *log_root) { + struct MultiProcessingPool *pool; + + if (!log_root) { + // log_root must be set + return NULL; + } + + //pool = malloc(1 * sizeof(*pool)); + pool = mmap(NULL, sizeof(*pool), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS | MAP_POPULATE, -1, 0); + pool->ident = ident; + pool->log_root = log_root; + pool->num_used = 0; + pool->num_alloc = MP_POOL_TASK_MAX; + + if (mkdirs(log_root, 0700) < 0) { + if (errno != EEXIST) { + perror(log_root); + mp_pool_free(&pool); + return NULL; + } + } + + //pool->task = calloc(pool->num_alloc + 1, sizeof(*pool->task)); + pool->task = mmap(NULL, (pool->num_alloc + 1) * sizeof(*pool->task), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS | MAP_POPULATE, -1, 0); + if (pool->task == MAP_FAILED) { + perror("mmap"); + mp_pool_free(&pool); + return NULL; + } + + for (size_t i = 0; i < pool->num_alloc; i++) { + struct MultiProcessingTask *slot = &pool->task[i]; + slot->gate = mmap(NULL, sizeof(*slot->gate), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS | MAP_POPULATE, -1, 0); + if (slot->gate == MAP_FAILED) { + perror("mmap failed"); + exit(1); + } + + if (sem_init(slot->gate, 1, 0) < 0) { + perror("sem_init failed"); + exit(1); + } + } + + + return pool; +} + +void mp_pool_free(struct MultiProcessingPool **pool) { + //for (size_t i = 0; i < (*pool)->num_used; i++) { + for (size_t i = 0; i < (*pool)->num_alloc; i++) { + if ((*pool)->task[i].gate) { + if (sem_destroy((*pool)->task[i].gate) < 0) { + perror("sem_destroy failed"); + exit(1); + } + if (munmap((*pool)->task[i].gate, sizeof(*(*pool)->task[i].gate)) < 0) { + perror("munmap"); + exit(1); + } + } + } + if ((*pool)->task) { + if (munmap((*pool)->task, sizeof(*(*pool)->task) * (*pool)->num_alloc) < 0) { + perror("munmap"); + } + } + if ((*pool)) { + if (munmap((*pool), sizeof(*(*pool))) < 0) { + perror("munmap"); + } + (*pool) = NULL; + } +} + +int exmain(int argc, char *argv[]) { + size_t i; + struct MultiProcessingPool *pool = mp_pool_init("generic", "logs"); + if (!pool) { + perror("pool init"); + exit(1); + } + + char *commands[] = { + "sleep 2; dd if=/dev/zero of=file.dat bs=1M count=1", + "/bin/echo hello world; sleep 5", + "python -c 'print(1+1)'", + "(for x in {1..10}; do echo $x; sleep 0.5; done)", + "echo stdout >&1; echo stderr >&2; exit 1" + }; + + for (i = 0; i < sizeof(commands) / sizeof(*commands); i++) { + if (mp_task(pool, "commands array", commands[i]) == NULL) { + printf("Too many tasks queued (max: %d)\n", MP_POOL_TASK_MAX); + break; + } + } + mp_pool_join(pool, get_cpu_count() - 1, MP_POOL_FAIL_FAST); + mp_pool_free(&pool); + return 0; +} -- cgit From 87971633eea3306a0b85a55d7d581841b9dbf905 Mon Sep 17 00:00:00 2001 From: Joseph Hunkeler Date: Fri, 13 Sep 2024 10:10:21 -0400 Subject: Fixing headers --- src/multiprocessing.c | 2 -- 1 file changed, 2 deletions(-) (limited to 'src/multiprocessing.c') diff --git a/src/multiprocessing.c b/src/multiprocessing.c index 09190d3..6793f2d 100644 --- a/src/multiprocessing.c +++ b/src/multiprocessing.c @@ -1,6 +1,4 @@ -#include "core_mem.h" #include "multiprocessing.h" -#include "utils.h" size_t mp_global_task_count = 0; static struct MultiProcessingTask *mp_pool_next_available(struct MultiProcessingPool *pool) { -- cgit From 1fe385d782ae117d2a68266e14777d890eddf4e0 Mon Sep 17 00:00:00 2001 From: Joseph Hunkeler Date: Fri, 13 Sep 2024 12:16:41 -0400 Subject: Darwin portability: Use sem_open and sem_close instead of sem_init and sem_destroy --- src/multiprocessing.c | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) (limited to 'src/multiprocessing.c') diff --git a/src/multiprocessing.c b/src/multiprocessing.c index 6793f2d..0a91a56 100644 --- a/src/multiprocessing.c +++ b/src/multiprocessing.c @@ -306,7 +306,11 @@ struct MultiProcessingPool *mp_pool_init(const char *ident, const char *log_root exit(1); } - if (sem_init(slot->gate, 1, 0) < 0) { + char sema_name[PATH_MAX] = {0}; + sprintf(sema_name, "/sem-%zu-%s-%s", mp_global_task_count, pool->ident, slot->ident); + sem_unlink(sema_name); + slot->gate = sem_open(sema_name, O_CREAT, 0600, sizeof(*slot->gate)); + if (slot->gate == SEM_FAILED) { perror("sem_init failed"); exit(1); } @@ -317,11 +321,10 @@ struct MultiProcessingPool *mp_pool_init(const char *ident, const char *log_root } void mp_pool_free(struct MultiProcessingPool **pool) { - //for (size_t i = 0; i < (*pool)->num_used; i++) { for (size_t i = 0; i < (*pool)->num_alloc; i++) { if ((*pool)->task[i].gate) { - if (sem_destroy((*pool)->task[i].gate) < 0) { - perror("sem_destroy failed"); + if (sem_close((*pool)->task[i].gate) < 0) { + perror("sem_close failed"); exit(1); } if (munmap((*pool)->task[i].gate, sizeof(*(*pool)->task[i].gate)) < 0) { -- cgit From 4e0e40bf54f68a98b2cfbb419c8d1cbabf7986ba Mon Sep 17 00:00:00 2001 From: Joseph Hunkeler Date: Fri, 13 Sep 2024 12:22:43 -0400 Subject: Darwin: Remove mmap MAP_POPULATE flag --- src/multiprocessing.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'src/multiprocessing.c') diff --git a/src/multiprocessing.c b/src/multiprocessing.c index 0a91a56..76bd241 100644 --- a/src/multiprocessing.c +++ b/src/multiprocessing.c @@ -276,7 +276,7 @@ struct MultiProcessingPool *mp_pool_init(const char *ident, const char *log_root } //pool = malloc(1 * sizeof(*pool)); - pool = mmap(NULL, sizeof(*pool), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS | MAP_POPULATE, -1, 0); + pool = mmap(NULL, sizeof(*pool), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0); pool->ident = ident; pool->log_root = log_root; pool->num_used = 0; @@ -291,7 +291,7 @@ struct MultiProcessingPool *mp_pool_init(const char *ident, const char *log_root } //pool->task = calloc(pool->num_alloc + 1, sizeof(*pool->task)); - pool->task = mmap(NULL, (pool->num_alloc + 1) * sizeof(*pool->task), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS | MAP_POPULATE, -1, 0); + pool->task = mmap(NULL, (pool->num_alloc + 1) * sizeof(*pool->task), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0); if (pool->task == MAP_FAILED) { perror("mmap"); mp_pool_free(&pool); @@ -300,7 +300,7 @@ struct MultiProcessingPool *mp_pool_init(const char *ident, const char *log_root for (size_t i = 0; i < pool->num_alloc; i++) { struct MultiProcessingTask *slot = &pool->task[i]; - slot->gate = mmap(NULL, sizeof(*slot->gate), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS | MAP_POPULATE, -1, 0); + slot->gate = mmap(NULL, sizeof(*slot->gate), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0); if (slot->gate == MAP_FAILED) { perror("mmap failed"); exit(1); -- cgit From 17d3d0517123f5f07b4ac6bb9f1dec73c1c8ce4c Mon Sep 17 00:00:00 2001 From: Joseph Hunkeler Date: Fri, 13 Sep 2024 13:36:09 -0400 Subject: Fix sem_open initial state * Move slot->gate assignment to mp_pool_task() * Remove mmap() to slot->gate. * Change type of ident and log_root variables for the sake of easy (fewer maps) --- src/multiprocessing.c | 41 ++++++++++++++--------------------------- 1 file changed, 14 insertions(+), 27 deletions(-) (limited to 'src/multiprocessing.c') diff --git a/src/multiprocessing.c b/src/multiprocessing.c index 76bd241..ad6dec5 100644 --- a/src/multiprocessing.c +++ b/src/multiprocessing.c @@ -35,6 +35,15 @@ struct MultiProcessingTask *mp_task(struct MultiProcessingPool *pool, const char fflush(tp); fclose(tp); + char sema_name[PATH_MAX] = {0}; + sprintf(sema_name, "/sem-%zu-%s-%s", mp_global_task_count, pool->ident, slot->ident); + sem_unlink(sema_name); + slot->gate = sem_open(sema_name, O_CREAT, 0644, 0); + if (slot->gate == SEM_FAILED) { + perror("sem_open failed"); + exit(1); + } + pid_t pid = fork(); int child_status = 0; if (pid == -1) { @@ -275,10 +284,12 @@ struct MultiProcessingPool *mp_pool_init(const char *ident, const char *log_root return NULL; } - //pool = malloc(1 * sizeof(*pool)); pool = mmap(NULL, sizeof(*pool), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0); - pool->ident = ident; - pool->log_root = log_root; + memset(pool->ident, 0, sizeof(pool->ident)); + strncpy(pool->ident, ident, sizeof(pool->ident) - 1); + + memset(pool->log_root, 0, sizeof(pool->log_root)); + strncpy(pool->log_root, log_root, sizeof(pool->log_root) - 1); pool->num_used = 0; pool->num_alloc = MP_POOL_TASK_MAX; @@ -290,7 +301,6 @@ struct MultiProcessingPool *mp_pool_init(const char *ident, const char *log_root } } - //pool->task = calloc(pool->num_alloc + 1, sizeof(*pool->task)); pool->task = mmap(NULL, (pool->num_alloc + 1) * sizeof(*pool->task), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0); if (pool->task == MAP_FAILED) { perror("mmap"); @@ -298,25 +308,6 @@ struct MultiProcessingPool *mp_pool_init(const char *ident, const char *log_root return NULL; } - for (size_t i = 0; i < pool->num_alloc; i++) { - struct MultiProcessingTask *slot = &pool->task[i]; - slot->gate = mmap(NULL, sizeof(*slot->gate), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0); - if (slot->gate == MAP_FAILED) { - perror("mmap failed"); - exit(1); - } - - char sema_name[PATH_MAX] = {0}; - sprintf(sema_name, "/sem-%zu-%s-%s", mp_global_task_count, pool->ident, slot->ident); - sem_unlink(sema_name); - slot->gate = sem_open(sema_name, O_CREAT, 0600, sizeof(*slot->gate)); - if (slot->gate == SEM_FAILED) { - perror("sem_init failed"); - exit(1); - } - } - - return pool; } @@ -327,10 +318,6 @@ void mp_pool_free(struct MultiProcessingPool **pool) { perror("sem_close failed"); exit(1); } - if (munmap((*pool)->task[i].gate, sizeof(*(*pool)->task[i].gate)) < 0) { - perror("munmap"); - exit(1); - } } } if ((*pool)->task) { -- cgit From 63cf3148cc3f60edf5c604a562ec95b40b8cb010 Mon Sep 17 00:00:00 2001 From: Joseph Hunkeler Date: Fri, 13 Sep 2024 15:01:29 -0400 Subject: Add multiprocessing.h to core.h * Remove multiprocessing.h from other files --- src/multiprocessing.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/multiprocessing.c') diff --git a/src/multiprocessing.c b/src/multiprocessing.c index ad6dec5..bdd74ba 100644 --- a/src/multiprocessing.c +++ b/src/multiprocessing.c @@ -1,4 +1,4 @@ -#include "multiprocessing.h" +#include "core.h" size_t mp_global_task_count = 0; static struct MultiProcessingTask *mp_pool_next_available(struct MultiProcessingPool *pool) { -- cgit From 7c0b2a96a8c2cff5ddf57186f583125f5a02668b Mon Sep 17 00:00:00 2001 From: Joseph Hunkeler Date: Fri, 13 Sep 2024 17:37:20 -0400 Subject: Guard against overrun --- src/multiprocessing.c | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) (limited to 'src/multiprocessing.c') diff --git a/src/multiprocessing.c b/src/multiprocessing.c index bdd74ba..70f86be 100644 --- a/src/multiprocessing.c +++ b/src/multiprocessing.c @@ -7,8 +7,12 @@ static struct MultiProcessingTask *mp_pool_next_available(struct MultiProcessing struct MultiProcessingTask *mp_task(struct MultiProcessingPool *pool, const char *ident, char *cmd) { struct MultiProcessingTask *slot = mp_pool_next_available(pool); - //struct MultiProcessingTask *slot = mp_pool_any_available(pool); - pool->num_used++; + if (pool->num_used != pool->num_alloc) { + pool->num_used++; + } else { + fprintf(stderr, "Maximum number of tasks reached\n"); + return NULL; + } memset(slot->ident, 0, sizeof(slot->ident)); strncpy(slot->ident, ident, sizeof(slot->ident) - 1); -- cgit From ead0756a0df1de9d2011359393234a605b6f0313 Mon Sep 17 00:00:00 2001 From: Joseph Hunkeler Date: Sun, 15 Sep 2024 16:12:02 -0400 Subject: Split mp_task into to functions --- src/multiprocessing.c | 137 +++++++++++++++++++++++++++----------------------- 1 file changed, 74 insertions(+), 63 deletions(-) (limited to 'src/multiprocessing.c') diff --git a/src/multiprocessing.c b/src/multiprocessing.c index 70f86be..7b3f997 100644 --- a/src/multiprocessing.c +++ b/src/multiprocessing.c @@ -5,73 +5,30 @@ static struct MultiProcessingTask *mp_pool_next_available(struct MultiProcessing return &pool->task[pool->num_used]; } -struct MultiProcessingTask *mp_task(struct MultiProcessingPool *pool, const char *ident, char *cmd) { - struct MultiProcessingTask *slot = mp_pool_next_available(pool); - if (pool->num_used != pool->num_alloc) { - pool->num_used++; - } else { - fprintf(stderr, "Maximum number of tasks reached\n"); - return NULL; - } - - memset(slot->ident, 0, sizeof(slot->ident)); - strncpy(slot->ident, ident, sizeof(slot->ident) - 1); - - strcat(slot->log_file, pool->log_root); - strcat(slot->log_file, "/"); - - FILE *tp = NULL; - char *t_name; - t_name = xmkstemp(&tp, "w"); - if (!t_name || !tp) { - return NULL; - } - - // Set the script's permissions so that only the calling user can use it - // This should help prevent eavesdropping if keys are applied in plain-text - // somewhere. - chmod(t_name, 0700); - - memset(slot->parent_script, 0, sizeof(slot->parent_script)); - strncpy(slot->parent_script, t_name, PATH_MAX - 1); - - fprintf(tp, "#!/bin/bash\n%s\n", cmd); - fflush(tp); - fclose(tp); - - char sema_name[PATH_MAX] = {0}; - sprintf(sema_name, "/sem-%zu-%s-%s", mp_global_task_count, pool->ident, slot->ident); - sem_unlink(sema_name); - slot->gate = sem_open(sema_name, O_CREAT, 0644, 0); - if (slot->gate == SEM_FAILED) { - perror("sem_open failed"); - exit(1); - } - +static int mp_task_fork(struct MultiProcessingPool *pool, struct MultiProcessingTask *task, const char *cmd) { pid_t pid = fork(); int child_status = 0; if (pid == -1) { - return NULL; + return -1; } else if (pid == 0) { // child process char *cwd = NULL; FILE *fp_log = NULL; // Synchronize sub-process startup // Stop here until summoned by mp_pool_join() - if (sem_wait(slot->gate) < 0) { + if (sem_wait(task->gate) < 0) { perror("sem_wait failed"); exit(1); } - // Redirect stdout and stderr to the log file fflush(stdout); fflush(stderr); - printf("[%s:%s] Task started (pid: %d)\n", pool->ident, slot->ident, slot->parent_pid); - fp_log = freopen(slot->log_file, "w+", stdout); + printf("[%s:%s] Task started (pid: %d)\n", pool->ident, task->ident, task->parent_pid); + fp_log = freopen(task->log_file, "w+", stdout); if (!fp_log) { - fprintf(stderr, "unable to open '%s' for writing: %s\n", slot->log_file, strerror(errno)); - return NULL; + fprintf(stderr, "unable to open '%s' for writing: %s\n", task->log_file, strerror(errno)); + return -1; } dup2(fileno(stdout), fileno(stderr)); @@ -91,7 +48,7 @@ struct MultiProcessingTask *mp_task(struct MultiProcessingPool *pool, const char // Generate log header fprintf(fp_log, "# STARTED: %s\n", timebuf ? timebuf : "unknown"); - fprintf(fp_log, "# PID: %d\n", slot->parent_pid); + fprintf(fp_log, "# PID: %d\n", task->parent_pid); fprintf(fp_log, "# WORKDIR: %s\n", cwd); fprintf(fp_log, "# COMMAND:\n%s\n", cmd); fprintf(fp_log, "# OUTPUT:\n"); @@ -102,28 +59,77 @@ struct MultiProcessingTask *mp_task(struct MultiProcessingPool *pool, const char // Execute task fflush(stdout); fflush(stderr); - char *args[] = {"bash", "--norc", t_name, (char *) NULL}; - slot->status = execvp("/bin/bash", args); + char *args[] = {"bash", "--norc", task->parent_script, (char *) NULL}; + task->status = execvp("/bin/bash", args); } else { // parent process - printf("[%s:%s] Task queued (pid: %d)\n", pool->ident, slot->ident, pid); + printf("[%s:%s] Task queued (pid: %d)\n", pool->ident, task->ident, pid); // Give the child process access to our PID value - slot->pid = pid; - slot->parent_pid = pid; + task->pid = pid; + task->parent_pid = pid; // Set log file name - sprintf(slot->log_file + strlen(slot->log_file), "task-%zu-%d.log", mp_global_task_count, slot->parent_pid); + sprintf(task->log_file + strlen(task->log_file), "task-%zu-%d.log", mp_global_task_count, task->parent_pid); mp_global_task_count++; // Check child's status pid_t code = waitpid(pid, &child_status, WUNTRACED | WCONTINUED | WNOHANG); if (code < 0) { perror("waitpid failed"); - return NULL; + return -1; } - return slot; } - return NULL; + return 0; +} + +struct MultiProcessingTask *mp_task(struct MultiProcessingPool *pool, const char *ident, char *cmd) { + struct MultiProcessingTask *slot = mp_pool_next_available(pool); + if (pool->num_used != pool->num_alloc) { + pool->num_used++; + } else { + fprintf(stderr, "Maximum number of tasks reached\n"); + return NULL; + } + + memset(slot->ident, 0, sizeof(slot->ident)); + strncpy(slot->ident, ident, sizeof(slot->ident) - 1); + + strcat(slot->log_file, pool->log_root); + strcat(slot->log_file, "/"); + + FILE *tp = NULL; + char *t_name; + t_name = xmkstemp(&tp, "w"); + if (!t_name || !tp) { + return NULL; + } + + // Set the script's permissions so that only the calling user can use it + // This should help prevent eavesdropping if keys are applied in plain-text + // somewhere. + chmod(t_name, 0700); + + memset(slot->parent_script, 0, sizeof(slot->parent_script)); + strncpy(slot->parent_script, t_name, PATH_MAX - 1); + guard_free(t_name); + + fprintf(tp, "#!/bin/bash\n%s\n", cmd); + fflush(tp); + fclose(tp); + + char sema_name[PATH_MAX] = {0}; + sprintf(sema_name, "/sem-%zu-%s-%s", mp_global_task_count, pool->ident, slot->ident); + sem_unlink(sema_name); + slot->gate = sem_open(sema_name, O_CREAT, 0644, 0); + if (slot->gate == SEM_FAILED) { + perror("sem_open failed"); + exit(1); + } + + if (mp_task_fork(pool, slot, cmd)) { + return NULL; + } + return slot; } static int show_log_contents(FILE *stream, struct MultiProcessingTask *task) { @@ -173,12 +179,10 @@ int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags) { size_t tasks_complete = 0; size_t lower_i = 0; size_t upper_i = jobs; - do { - //if (upper_i >= pool->num_alloc) { + size_t hang_check = 0; if (upper_i >= pool->num_used) { size_t x = upper_i - pool->num_used; - //size_t x = upper_i - pool->num_alloc; upper_i -= (size_t) x; } for (size_t i = lower_i; i < upper_i; i++) { @@ -191,6 +195,12 @@ int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags) { if (slot->pid == MP_POOL_PID_UNUSED) { // PID is already used up, skip it + hang_check++; + if (hang_check >= pool->num_used) { + fprintf(stderr, "%s is deadlocked\n", pool->ident); + failures++; + goto pool_deadlocked; + } continue; } @@ -275,6 +285,7 @@ int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags) { sleep(1); } while (1); + pool_deadlocked: puts(""); return failures; } -- cgit From 6921cb44f5464b59a2a169167cc63c66bcc95a9f Mon Sep 17 00:00:00 2001 From: Joseph Hunkeler Date: Tue, 17 Sep 2024 09:30:16 -0400 Subject: Break parent/child calls into static functions --- src/multiprocessing.c | 157 +++++++++++++++++++++++++++----------------------- 1 file changed, 86 insertions(+), 71 deletions(-) (limited to 'src/multiprocessing.c') diff --git a/src/multiprocessing.c b/src/multiprocessing.c index 7b3f997..a1d961e 100644 --- a/src/multiprocessing.c +++ b/src/multiprocessing.c @@ -5,83 +5,99 @@ static struct MultiProcessingTask *mp_pool_next_available(struct MultiProcessing return &pool->task[pool->num_used]; } -static int mp_task_fork(struct MultiProcessingPool *pool, struct MultiProcessingTask *task, const char *cmd) { - pid_t pid = fork(); - int child_status = 0; - if (pid == -1) { +int child(struct MultiProcessingPool *pool, struct MultiProcessingTask *task, const char *cmd) { + char *cwd = NULL; + FILE *fp_log = NULL; + + // Synchronize sub-process startup + // Stop here until summoned by mp_pool_join() + if (sem_wait(task->gate) < 0) { + perror("sem_wait failed"); + exit(1); + } + + // Record the task start time + if (clock_gettime(CLOCK_REALTIME, &task->time_data.t_start) < 0) { + perror("clock_gettime"); + exit(1); + } + + // Redirect stdout and stderr to the log file + fflush(stdout); + fflush(stderr); + printf("[%s:%s] Task started (pid: %d)\n", pool->ident, task->ident, task->parent_pid); + fp_log = freopen(task->log_file, "w+", stdout); + if (!fp_log) { + fprintf(stderr, "unable to open '%s' for writing: %s\n", task->log_file, strerror(errno)); return -1; - } else if (pid == 0) { // child process - char *cwd = NULL; - FILE *fp_log = NULL; - - // Synchronize sub-process startup - // Stop here until summoned by mp_pool_join() - if (sem_wait(task->gate) < 0) { - perror("sem_wait failed"); - exit(1); - } + } + dup2(fileno(stdout), fileno(stderr)); + + // Generate timestamp for log header + time_t t = time(NULL); + char *timebuf = ctime(&t); + if (timebuf) { + // strip line feed from timestamp + timebuf[strlen(timebuf) ? strlen(timebuf) - 1 : 0] = 0; + } - // Redirect stdout and stderr to the log file - fflush(stdout); - fflush(stderr); - printf("[%s:%s] Task started (pid: %d)\n", pool->ident, task->ident, task->parent_pid); - fp_log = freopen(task->log_file, "w+", stdout); - if (!fp_log) { - fprintf(stderr, "unable to open '%s' for writing: %s\n", task->log_file, strerror(errno)); - return -1; - } - dup2(fileno(stdout), fileno(stderr)); - - // Generate timestamp for log header - time_t t = time(NULL); - char *timebuf = ctime(&t); - if (timebuf) { - // strip line feed from timestamp - timebuf[strlen(timebuf) ? strlen(timebuf) - 1 : 0] = 0; - } + // Get the path to the current working directory + // Used by the log header. Informative only. + cwd = getcwd(NULL, PATH_MAX); + if (!cwd) { + perror(cwd); + exit(1); + } - cwd = getcwd(NULL, PATH_MAX); - if (!cwd) { - perror(cwd); - exit(1); - } + // Generate log header + fprintf(fp_log, "# STARTED: %s\n", timebuf ? timebuf : "unknown"); + fprintf(fp_log, "# PID: %d\n", task->parent_pid); + fprintf(fp_log, "# WORKDIR: %s\n", cwd); + fprintf(fp_log, "# COMMAND:\n%s\n", cmd); + fprintf(fp_log, "# OUTPUT:\n"); + // Commit header to log file / clean up + fflush(fp_log); + guard_free(cwd); + + // Execute task + fflush(stdout); + fflush(stderr); + char *args[] = {"bash", "--norc", task->parent_script, (char *) NULL}; + task->status = execvp("/bin/bash", args); + return 0; // NOP return to satisfy the compiler +} - // Generate log header - fprintf(fp_log, "# STARTED: %s\n", timebuf ? timebuf : "unknown"); - fprintf(fp_log, "# PID: %d\n", task->parent_pid); - fprintf(fp_log, "# WORKDIR: %s\n", cwd); - fprintf(fp_log, "# COMMAND:\n%s\n", cmd); - fprintf(fp_log, "# OUTPUT:\n"); - // Commit header to log file / clean up - fflush(fp_log); - guard_free(cwd); - - // Execute task - fflush(stdout); - fflush(stderr); - char *args[] = {"bash", "--norc", task->parent_script, (char *) NULL}; - task->status = execvp("/bin/bash", args); - } else { // parent process - printf("[%s:%s] Task queued (pid: %d)\n", pool->ident, task->ident, pid); - - // Give the child process access to our PID value - task->pid = pid; - task->parent_pid = pid; - - // Set log file name - sprintf(task->log_file + strlen(task->log_file), "task-%zu-%d.log", mp_global_task_count, task->parent_pid); - mp_global_task_count++; - - // Check child's status - pid_t code = waitpid(pid, &child_status, WUNTRACED | WCONTINUED | WNOHANG); - if (code < 0) { - perror("waitpid failed"); - return -1; - } +int parent(struct MultiProcessingPool *pool, struct MultiProcessingTask *task, pid_t pid, int *child_status) { + printf("[%s:%s] Task queued (pid: %d)\n", pool->ident, task->ident, pid); + + // Give the child process access to our PID value + task->pid = pid; + task->parent_pid = pid; + + // Set log file name + sprintf(task->log_file + strlen(task->log_file), "task-%zu-%d.log", mp_global_task_count, task->parent_pid); + mp_global_task_count++; + + // Check child's status + pid_t code = waitpid(pid, child_status, WUNTRACED | WCONTINUED | WNOHANG); + if (code < 0) { + perror("waitpid failed"); + return -1; } return 0; } +static int mp_task_fork(struct MultiProcessingPool *pool, struct MultiProcessingTask *task, const char *cmd) { + pid_t pid = fork(); + int child_status = 0; + if (pid == -1) { + return -1; + } else if (pid == 0) { + child(pool, task, cmd); + } + return parent(pool, task, pid, &child_status); +} + struct MultiProcessingTask *mp_task(struct MultiProcessingPool *pool, const char *ident, char *cmd) { struct MultiProcessingTask *slot = mp_pool_next_available(pool); if (pool->num_used != pool->num_alloc) { @@ -212,8 +228,7 @@ int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags) { char progress[1024] = {0}; if (pid > 0) { - double percent = ((double) (tasks_complete) / (double) pool->num_used) * 100; - //double percent = ((double) (tasks_complete) / (double) pool->num_alloc) * 100; + double percent = ((double) (tasks_complete + 1) / (double) pool->num_used) * 100; sprintf(progress, "[%s:%s] [%3.1f%%]", pool->ident, slot->ident, percent); // The process ended in one the following ways -- cgit From daf8c81c35f590f2ac145801933792a070ccbee2 Mon Sep 17 00:00:00 2001 From: Joseph Hunkeler Date: Tue, 17 Sep 2024 09:32:12 -0400 Subject: Add comments, remove dead code --- src/multiprocessing.c | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) (limited to 'src/multiprocessing.c') diff --git a/src/multiprocessing.c b/src/multiprocessing.c index a1d961e..25b667c 100644 --- a/src/multiprocessing.c +++ b/src/multiprocessing.c @@ -1,6 +1,8 @@ #include "core.h" +/// The sum of all tasks queued by mp_task() size_t mp_global_task_count = 0; + static struct MultiProcessingTask *mp_pool_next_available(struct MultiProcessingPool *pool) { return &pool->task[pool->num_used]; } @@ -107,12 +109,15 @@ struct MultiProcessingTask *mp_task(struct MultiProcessingPool *pool, const char return NULL; } + // Set task identifier string memset(slot->ident, 0, sizeof(slot->ident)); strncpy(slot->ident, ident, sizeof(slot->ident) - 1); + // Set log file path strcat(slot->log_file, pool->log_root); strcat(slot->log_file, "/"); + // Create a temporary file to act as our intermediate command script FILE *tp = NULL; char *t_name; t_name = xmkstemp(&tp, "w"); @@ -125,14 +130,18 @@ struct MultiProcessingTask *mp_task(struct MultiProcessingPool *pool, const char // somewhere. chmod(t_name, 0700); + // Record the script path memset(slot->parent_script, 0, sizeof(slot->parent_script)); strncpy(slot->parent_script, t_name, PATH_MAX - 1); guard_free(t_name); + // Populate the script fprintf(tp, "#!/bin/bash\n%s\n", cmd); fflush(tp); fclose(tp); + // Create a uniquely named semaphore. + // This is used by the child process to prevent task execution until mp_pool_join is called char sema_name[PATH_MAX] = {0}; sprintf(sema_name, "/sem-%zu-%s-%s", mp_global_task_count, pool->ident, slot->ident); sem_unlink(sema_name); @@ -142,6 +151,7 @@ struct MultiProcessingTask *mp_task(struct MultiProcessingPool *pool, const char exit(1); } + // Execute task if (mp_task_fork(pool, slot, cmd)) { return NULL; } @@ -164,12 +174,12 @@ static int show_log_contents(FILE *stream, struct MultiProcessingTask *task) { int mp_pool_kill(struct MultiProcessingPool *pool, int signum) { printf("Sending signal %d to pool '%s'\n", signum, pool->ident); - //for (size_t i = 0; i < pool->num_alloc; i++) { for (size_t i = 0; i < pool->num_used; i++) { struct MultiProcessingTask *slot = &pool->task[i]; if (!slot) { return -1; } + // Kill tasks in progress if (slot->pid > 0) { int status; printf("Sending signal %d to task '%s' (pid: %d)\n", signum, slot->ident, slot->pid); @@ -209,10 +219,13 @@ int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags) { exit(1); } + // Has the child been processed already? if (slot->pid == MP_POOL_PID_UNUSED) { - // PID is already used up, skip it + // Child is already used up, skip it hang_check++; if (hang_check >= pool->num_used) { + // Unlikely to happen when called correctly, but if you join a pool that's already finished + // it will spin forever. This protects the program from entering an infinite loop. fprintf(stderr, "%s is deadlocked\n", pool->ident); failures++; goto pool_deadlocked; @@ -314,15 +327,20 @@ struct MultiProcessingPool *mp_pool_init(const char *ident, const char *log_root return NULL; } + // The pool is shared with children pool = mmap(NULL, sizeof(*pool), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0); + + // Set pool identity string memset(pool->ident, 0, sizeof(pool->ident)); strncpy(pool->ident, ident, sizeof(pool->ident) - 1); + // Set logging base directory memset(pool->log_root, 0, sizeof(pool->log_root)); strncpy(pool->log_root, log_root, sizeof(pool->log_root) - 1); pool->num_used = 0; pool->num_alloc = MP_POOL_TASK_MAX; + // Create the log directory if (mkdirs(log_root, 0700) < 0) { if (errno != EEXIST) { perror(log_root); @@ -331,6 +349,7 @@ struct MultiProcessingPool *mp_pool_init(const char *ident, const char *log_root } } + // Task array is shared with children pool->task = mmap(NULL, (pool->num_alloc + 1) * sizeof(*pool->task), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0); if (pool->task == MAP_FAILED) { perror("mmap"); @@ -343,6 +362,7 @@ struct MultiProcessingPool *mp_pool_init(const char *ident, const char *log_root void mp_pool_free(struct MultiProcessingPool **pool) { for (size_t i = 0; i < (*pool)->num_alloc; i++) { + // Close all semaphores if ((*pool)->task[i].gate) { if (sem_close((*pool)->task[i].gate) < 0) { perror("sem_close failed"); @@ -350,11 +370,13 @@ void mp_pool_free(struct MultiProcessingPool **pool) { } } } + // Unmap all pool tasks if ((*pool)->task) { if (munmap((*pool)->task, sizeof(*(*pool)->task) * (*pool)->num_alloc) < 0) { perror("munmap"); } } + // Unmap the pool if ((*pool)) { if (munmap((*pool), sizeof(*(*pool))) < 0) { perror("munmap"); -- cgit From 3c3468d0c4bba45ed8626894613cc44e5fcd51c0 Mon Sep 17 00:00:00 2001 From: Joseph Hunkeler Date: Tue, 17 Sep 2024 09:32:17 -0400 Subject: Set task status to -1 by default --- src/multiprocessing.c | 3 +++ 1 file changed, 3 insertions(+) (limited to 'src/multiprocessing.c') diff --git a/src/multiprocessing.c b/src/multiprocessing.c index 25b667c..0c993d7 100644 --- a/src/multiprocessing.c +++ b/src/multiprocessing.c @@ -109,6 +109,9 @@ struct MultiProcessingTask *mp_task(struct MultiProcessingPool *pool, const char return NULL; } + // Set default status to "error" + slot->status = -1; + // Set task identifier string memset(slot->ident, 0, sizeof(slot->ident)); strncpy(slot->ident, ident, sizeof(slot->ident) - 1); -- cgit From 0f95b43e2d3853995106c5c6aa55bf92f63fb331 Mon Sep 17 00:00:00 2001 From: Joseph Hunkeler Date: Tue, 17 Sep 2024 09:35:07 -0400 Subject: Wait for signaled processes to hang up * Only initiate a kill if we have more than one process. The current process is already failed out, no need to terminate it again. --- src/multiprocessing.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) (limited to 'src/multiprocessing.c') diff --git a/src/multiprocessing.c b/src/multiprocessing.c index 0c993d7..fe15ab6 100644 --- a/src/multiprocessing.c +++ b/src/multiprocessing.c @@ -189,6 +189,10 @@ int mp_pool_kill(struct MultiProcessingPool *pool, int signum) { status = kill(slot->pid, signum); if (status && errno != ESRCH) { fprintf(stderr, "Task '%s' (pid: %d) did not respond: %s\n", slot->ident, slot->pid, strerror(errno)); + // Wait for process to handle the signal, then set the status accordingly + if (waitpid(slot->pid, &status, 0) >= 0) { + slot->signaled_by = WTERMSIG(status); + } } } if (!access(slot->log_file, F_OK)) { @@ -271,7 +275,7 @@ int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags) { } if (status >> 8 != 0 || (status & 0xff) != 0) { fprintf(stderr, "%s Task failed\n", progress); - if (flags & MP_POOL_FAIL_FAST) { + if (flags & MP_POOL_FAIL_FAST && pool->num_used > 1) { mp_pool_kill(pool, SIGTERM); return -2; } -- cgit From db1a3056296ea3ed13c5a425cf1f11602b43a6c7 Mon Sep 17 00:00:00 2001 From: Joseph Hunkeler Date: Tue, 17 Sep 2024 09:37:14 -0400 Subject: Add pool summary and elapsed time output * Add get_task_duration() * Add get_pool_show_summary() * Add signaled_by member to MultiProcessingTask * Add time_data member to MultiProcessingTask for duration tracking --- src/multiprocessing.c | 48 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) (limited to 'src/multiprocessing.c') diff --git a/src/multiprocessing.c b/src/multiprocessing.c index fe15ab6..5e605a6 100644 --- a/src/multiprocessing.c +++ b/src/multiprocessing.c @@ -161,6 +161,41 @@ struct MultiProcessingTask *mp_task(struct MultiProcessingPool *pool, const char return slot; } +static void get_task_duration(struct MultiProcessingTask *task, struct timespec *result) { + struct timespec *start = &task->time_data.t_start; + struct timespec *stop = &task->time_data.t_stop; + result->tv_sec = (stop->tv_sec - start->tv_sec); + result->tv_nsec = (stop->tv_nsec - start->tv_nsec); + if (result->tv_nsec < 0) { + --result->tv_sec; + result->tv_nsec += 1000000000L; + } +} + +void mp_pool_show_summary(struct MultiProcessingPool *pool) { + print_banner("=", 79); + printf("Pool execution summary for \"%s\"\n", pool->ident); + print_banner("=", 79); + printf("STATUS PID DURATION IDENT\n"); + for (size_t i = 0; i < pool->num_used; i++) { + struct MultiProcessingTask *task = &pool->task[i]; + char status_str[10] = {0}; + if (!task->status && !task->signaled_by) { + strcpy(status_str, "DONE"); + } else if (task->signaled_by) { + strcpy(status_str, "TERM"); + } else { + strcpy(status_str, "FAIL"); + } + + struct timespec duration; + get_task_duration(task, &duration); + long diff = duration.tv_sec + duration.tv_nsec / 1000000000L; + printf("%-4s %-10d %7lds %-10s\n", status_str, task->parent_pid, diff, task->ident) ; + } + puts(""); +} + static int show_log_contents(FILE *stream, struct MultiProcessingTask *task) { FILE *fp = fopen(task->log_file, "r"); if (!fp) { @@ -189,9 +224,15 @@ int mp_pool_kill(struct MultiProcessingPool *pool, int signum) { status = kill(slot->pid, signum); if (status && errno != ESRCH) { fprintf(stderr, "Task '%s' (pid: %d) did not respond: %s\n", slot->ident, slot->pid, strerror(errno)); + } else { // Wait for process to handle the signal, then set the status accordingly if (waitpid(slot->pid, &status, 0) >= 0) { slot->signaled_by = WTERMSIG(status); + // Record the task stop time + if (clock_gettime(CLOCK_REALTIME, &slot->time_data.t_stop) < 0) { + perror("clock_gettime"); + exit(1); + } } } } @@ -273,6 +314,13 @@ int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags) { if (show_log_contents(stdout, slot)) { perror(slot->log_file); } + + // Record the task stop time + if (clock_gettime(CLOCK_REALTIME, &slot->time_data.t_stop) < 0) { + perror("clock_gettime"); + exit(1); + } + if (status >> 8 != 0 || (status & 0xff) != 0) { fprintf(stderr, "%s Task failed\n", progress); if (flags & MP_POOL_FAIL_FAST && pool->num_used > 1) { -- cgit From 60c0a3c8607b835f73e77d89b5f67a8e6bd5c8b3 Mon Sep 17 00:00:00 2001 From: Joseph Hunkeler Date: Tue, 17 Sep 2024 10:12:02 -0400 Subject: Fix format spacing --- src/multiprocessing.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/multiprocessing.c') diff --git a/src/multiprocessing.c b/src/multiprocessing.c index 5e605a6..5eb9f81 100644 --- a/src/multiprocessing.c +++ b/src/multiprocessing.c @@ -191,7 +191,7 @@ void mp_pool_show_summary(struct MultiProcessingPool *pool) { struct timespec duration; get_task_duration(task, &duration); long diff = duration.tv_sec + duration.tv_nsec / 1000000000L; - printf("%-4s %-10d %7lds %-10s\n", status_str, task->parent_pid, diff, task->ident) ; + printf("%-4s %10d %7lds %-10s\n", status_str, task->parent_pid, diff, task->ident) ; } puts(""); } -- cgit From 3ef3a2758c572dcdeed7d693f52c5049394f11f5 Mon Sep 17 00:00:00 2001 From: Joseph Hunkeler Date: Tue, 17 Sep 2024 10:29:13 -0400 Subject: Fix test status expectation * Fix child not returning result of execvp(). task->status is for program status, not fork() status. --- src/multiprocessing.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'src/multiprocessing.c') diff --git a/src/multiprocessing.c b/src/multiprocessing.c index 5eb9f81..8709911 100644 --- a/src/multiprocessing.c +++ b/src/multiprocessing.c @@ -65,8 +65,7 @@ int child(struct MultiProcessingPool *pool, struct MultiProcessingTask *task, co fflush(stdout); fflush(stderr); char *args[] = {"bash", "--norc", task->parent_script, (char *) NULL}; - task->status = execvp("/bin/bash", args); - return 0; // NOP return to satisfy the compiler + return execvp("/bin/bash", args); } int parent(struct MultiProcessingPool *pool, struct MultiProcessingTask *task, pid_t pid, int *child_status) { -- cgit From 6f7cf6e1094e54e593801cc452b65ae9c7b4824a Mon Sep 17 00:00:00 2001 From: Joseph Hunkeler Date: Tue, 17 Sep 2024 10:56:41 -0400 Subject: Remove short circuit test code * Remove exmain() and dead comments from main() --- src/multiprocessing.c | 29 +---------------------------- 1 file changed, 1 insertion(+), 28 deletions(-) (limited to 'src/multiprocessing.c') diff --git a/src/multiprocessing.c b/src/multiprocessing.c index 8709911..8e11a93 100644 --- a/src/multiprocessing.c +++ b/src/multiprocessing.c @@ -437,31 +437,4 @@ void mp_pool_free(struct MultiProcessingPool **pool) { } (*pool) = NULL; } -} - -int exmain(int argc, char *argv[]) { - size_t i; - struct MultiProcessingPool *pool = mp_pool_init("generic", "logs"); - if (!pool) { - perror("pool init"); - exit(1); - } - - char *commands[] = { - "sleep 2; dd if=/dev/zero of=file.dat bs=1M count=1", - "/bin/echo hello world; sleep 5", - "python -c 'print(1+1)'", - "(for x in {1..10}; do echo $x; sleep 0.5; done)", - "echo stdout >&1; echo stderr >&2; exit 1" - }; - - for (i = 0; i < sizeof(commands) / sizeof(*commands); i++) { - if (mp_task(pool, "commands array", commands[i]) == NULL) { - printf("Too many tasks queued (max: %d)\n", MP_POOL_TASK_MAX); - break; - } - } - mp_pool_join(pool, get_cpu_count() - 1, MP_POOL_FAIL_FAST); - mp_pool_free(&pool); - return 0; -} +} \ No newline at end of file -- cgit From 4efce32c6efcb98e3d4eda4886a75872aff6fff9 Mon Sep 17 00:00:00 2001 From: Joseph Hunkeler Date: Tue, 17 Sep 2024 22:33:46 -0400 Subject: Bugfix: log_show_contents() did not close FILE pointer --- src/multiprocessing.c | 1 + 1 file changed, 1 insertion(+) (limited to 'src/multiprocessing.c') diff --git a/src/multiprocessing.c b/src/multiprocessing.c index 8e11a93..fce1a80 100644 --- a/src/multiprocessing.c +++ b/src/multiprocessing.c @@ -206,6 +206,7 @@ static int show_log_contents(FILE *stream, struct MultiProcessingTask *task) { memset(buf, 0, sizeof(buf)); } fprintf(stream, "\n"); + fclose(fp); return 0; } -- cgit From 8b47235f7c81e04fa5efef492974509789f40273 Mon Sep 17 00:00:00 2001 From: Joseph Hunkeler Date: Wed, 18 Sep 2024 10:04:21 -0400 Subject: Rename mp_task to mp_pool_task --- src/multiprocessing.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/multiprocessing.c') diff --git a/src/multiprocessing.c b/src/multiprocessing.c index fce1a80..c784073 100644 --- a/src/multiprocessing.c +++ b/src/multiprocessing.c @@ -99,7 +99,7 @@ static int mp_task_fork(struct MultiProcessingPool *pool, struct MultiProcessing return parent(pool, task, pid, &child_status); } -struct MultiProcessingTask *mp_task(struct MultiProcessingPool *pool, const char *ident, char *cmd) { +struct MultiProcessingTask *mp_pool_task(struct MultiProcessingPool *pool, const char *ident, char *cmd) { struct MultiProcessingTask *slot = mp_pool_next_available(pool); if (pool->num_used != pool->num_alloc) { pool->num_used++; -- cgit From 528b3b271ce13a5094d00f98e9b204ee0e24e795 Mon Sep 17 00:00:00 2001 From: Joseph Hunkeler Date: Wed, 18 Sep 2024 10:12:06 -0400 Subject: mp_pool_kill marks PIDs as unused --- src/multiprocessing.c | 2 ++ 1 file changed, 2 insertions(+) (limited to 'src/multiprocessing.c') diff --git a/src/multiprocessing.c b/src/multiprocessing.c index c784073..5478ca5 100644 --- a/src/multiprocessing.c +++ b/src/multiprocessing.c @@ -233,6 +233,8 @@ int mp_pool_kill(struct MultiProcessingPool *pool, int signum) { perror("clock_gettime"); exit(1); } + // We are short-circuiting the normal flow, and the process is now dead, so mark it as such + slot->pid = MP_POOL_PID_UNUSED; } } } -- cgit From 94e473cf5670f5b0373c3ec0692707e2ef9618ad Mon Sep 17 00:00:00 2001 From: Joseph Hunkeler Date: Wed, 18 Sep 2024 10:15:49 -0400 Subject: Move process status checks outside of the if-statement * Reverse the order of checks so signals actually get reported when they occur * Changed "Task finished" to "Task ended". "Finished" sounds more like "success" and might get confusing when an error occurs --- src/multiprocessing.c | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) (limited to 'src/multiprocessing.c') diff --git a/src/multiprocessing.c b/src/multiprocessing.c index 5478ca5..8c7c2e8 100644 --- a/src/multiprocessing.c +++ b/src/multiprocessing.c @@ -285,9 +285,17 @@ int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags) { // Is the process finished? pid_t pid = waitpid(slot->pid, &status, WNOHANG | WUNTRACED | WCONTINUED); + int task_ended = WIFEXITED(status); + int task_ended_by_signal = WIFSIGNALED(status); + int task_stopped = WIFSTOPPED(status); + int task_continued = WIFCONTINUED(status); + int status_exit = WEXITSTATUS(status); + int status_signal = WTERMSIG(status); + int status_stopped = WSTOPSIG(status); // Update status - slot->status = status; + slot->status = status_exit; + slot->signaled_by = status_signal; char progress[1024] = {0}; if (pid > 0) { @@ -296,18 +304,18 @@ int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags) { // The process ended in one the following ways // Note: SIGSTOP nor SIGCONT will not increment the tasks_complete counter - if (WIFEXITED(status)) { - printf("%s Task finished (status: %d)\n", progress, WEXITSTATUS(status)); - tasks_complete++; - } else if (WIFSIGNALED(status)) { - printf("%s Task ended by signal %d (%s)\n", progress, WTERMSIG(status), strsignal(WTERMSIG(status))); - tasks_complete++; - } else if (WIFSTOPPED(status)) { - printf("%s Task was suspended (%d)\n", progress, WSTOPSIG(status)); + if (task_stopped) { + printf("%s Task was suspended (%d)\n", progress, status_stopped); continue; - } else if (WIFCONTINUED(status)) { + } else if (task_continued) { printf("%s Task was resumed\n", progress); continue; + } else if (task_ended_by_signal) { + printf("%s Task ended by signal %d (%s)\n", progress, status_signal, strsignal(status_signal)); + tasks_complete++; + } else if (task_ended) { + printf("%s Task ended (status: %d)\n", progress, status_exit); + tasks_complete++; } else { fprintf(stderr, "%s Task state is unknown (0x%04X)\n", progress, status); } -- cgit From 37f68b28cf1624d9188872bea1e8efc15141824c Mon Sep 17 00:00:00 2001 From: Joseph Hunkeler Date: Wed, 18 Sep 2024 10:16:17 -0400 Subject: Comment get_task_duration --- src/multiprocessing.c | 2 ++ 1 file changed, 2 insertions(+) (limited to 'src/multiprocessing.c') diff --git a/src/multiprocessing.c b/src/multiprocessing.c index 8c7c2e8..4ae9b31 100644 --- a/src/multiprocessing.c +++ b/src/multiprocessing.c @@ -161,6 +161,8 @@ struct MultiProcessingTask *mp_pool_task(struct MultiProcessingPool *pool, const } static void get_task_duration(struct MultiProcessingTask *task, struct timespec *result) { + // based on the timersub() macro in time.h + // This implementation uses timespec and increases the resolution from microseconds to nanoseconds. struct timespec *start = &task->time_data.t_start; struct timespec *stop = &task->time_data.t_stop; result->tv_sec = (stop->tv_sec - start->tv_sec); -- cgit From 4a4cdeb3f0255b0c9979397635c3a0c930fd153f Mon Sep 17 00:00:00 2001 From: Joseph Hunkeler Date: Wed, 18 Sep 2024 10:18:06 -0400 Subject: Fix incorrect failure count * No longer adds the raw status value to the failure count. Just increment it instead. --- src/multiprocessing.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'src/multiprocessing.c') diff --git a/src/multiprocessing.c b/src/multiprocessing.c index 4ae9b31..7452314 100644 --- a/src/multiprocessing.c +++ b/src/multiprocessing.c @@ -335,6 +335,8 @@ int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags) { if (status >> 8 != 0 || (status & 0xff) != 0) { fprintf(stderr, "%s Task failed\n", progress); + failures++; + if (flags & MP_POOL_FAIL_FAST && pool->num_used > 1) { mp_pool_kill(pool, SIGTERM); return -2; @@ -353,7 +355,6 @@ int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags) { // Update progress and tell the poller to ignore the PID. The process is gone. slot->pid = MP_POOL_PID_UNUSED; - failures += status; } else if (pid < 0) { fprintf(stderr, "waitpid failed: %s\n", strerror(errno)); return -1; -- cgit From fb4a7d1b212c9c6a51551e5043f6941a2371d075 Mon Sep 17 00:00:00 2001 From: Joseph Hunkeler Date: Thu, 26 Sep 2024 09:51:59 -0400 Subject: Remove usage of POSIX semaphores --- src/multiprocessing.c | 35 +++++++++-------------------------- 1 file changed, 9 insertions(+), 26 deletions(-) (limited to 'src/multiprocessing.c') diff --git a/src/multiprocessing.c b/src/multiprocessing.c index 7452314..433d57a 100644 --- a/src/multiprocessing.c +++ b/src/multiprocessing.c @@ -11,10 +11,9 @@ int child(struct MultiProcessingPool *pool, struct MultiProcessingTask *task, co char *cwd = NULL; FILE *fp_log = NULL; - // Synchronize sub-process startup - // Stop here until summoned by mp_pool_join() - if (sem_wait(task->gate) < 0) { - perror("sem_wait failed"); + // The task starts inside the requested working directory + if (chdir(task->working_dir)) { + perror(task->working_dir); exit(1); } @@ -142,16 +141,6 @@ struct MultiProcessingTask *mp_pool_task(struct MultiProcessingPool *pool, const fflush(tp); fclose(tp); - // Create a uniquely named semaphore. - // This is used by the child process to prevent task execution until mp_pool_join is called - char sema_name[PATH_MAX] = {0}; - sprintf(sema_name, "/sem-%zu-%s-%s", mp_global_task_count, pool->ident, slot->ident); - sem_unlink(sema_name); - slot->gate = sem_open(sema_name, O_CREAT, 0644, 0); - if (slot->gate == SEM_FAILED) { - perror("sem_open failed"); - exit(1); - } // Execute task if (mp_task_fork(pool, slot, cmd)) { @@ -265,11 +254,12 @@ int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags) { } for (size_t i = lower_i; i < upper_i; i++) { struct MultiProcessingTask *slot = &pool->task[i]; - // Unlock the semaphore to allow the queued processes to start up - if (sem_post(slot->gate) < 0) { - perror("sem_post failed"); - exit(1); - } + if (slot->status == -1) { + if (mp_task_fork(pool, slot)) { + fprintf(stderr, "%s: mp_task_fork failed\n", slot->ident); + exit(1); + } + } // Has the child been processed already? if (slot->pid == MP_POOL_PID_UNUSED) { @@ -430,13 +420,6 @@ struct MultiProcessingPool *mp_pool_init(const char *ident, const char *log_root void mp_pool_free(struct MultiProcessingPool **pool) { for (size_t i = 0; i < (*pool)->num_alloc; i++) { - // Close all semaphores - if ((*pool)->task[i].gate) { - if (sem_close((*pool)->task[i].gate) < 0) { - perror("sem_close failed"); - exit(1); - } - } } // Unmap all pool tasks if ((*pool)->task) { -- cgit From 3e610e935858995f411df7bb0a77e2efeeae3d66 Mon Sep 17 00:00:00 2001 From: Joseph Hunkeler Date: Thu, 26 Sep 2024 09:56:25 -0400 Subject: Implement cmd and working_dir --- src/multiprocessing.c | 29 +++++++++++++---------------- 1 file changed, 13 insertions(+), 16 deletions(-) (limited to 'src/multiprocessing.c') diff --git a/src/multiprocessing.c b/src/multiprocessing.c index 433d57a..5c66672 100644 --- a/src/multiprocessing.c +++ b/src/multiprocessing.c @@ -7,8 +7,7 @@ static struct MultiProcessingTask *mp_pool_next_available(struct MultiProcessing return &pool->task[pool->num_used]; } -int child(struct MultiProcessingPool *pool, struct MultiProcessingTask *task, const char *cmd) { - char *cwd = NULL; +int child(struct MultiProcessingPool *pool, struct MultiProcessingTask *task) { FILE *fp_log = NULL; // The task starts inside the requested working directory @@ -42,23 +41,14 @@ int child(struct MultiProcessingPool *pool, struct MultiProcessingTask *task, co timebuf[strlen(timebuf) ? strlen(timebuf) - 1 : 0] = 0; } - // Get the path to the current working directory - // Used by the log header. Informative only. - cwd = getcwd(NULL, PATH_MAX); - if (!cwd) { - perror(cwd); - exit(1); - } - // Generate log header fprintf(fp_log, "# STARTED: %s\n", timebuf ? timebuf : "unknown"); fprintf(fp_log, "# PID: %d\n", task->parent_pid); - fprintf(fp_log, "# WORKDIR: %s\n", cwd); - fprintf(fp_log, "# COMMAND:\n%s\n", cmd); + fprintf(fp_log, "# WORKDIR: %s\n", task->working_dir); + fprintf(fp_log, "# COMMAND:\n%s\n", task->cmd); fprintf(fp_log, "# OUTPUT:\n"); // Commit header to log file / clean up fflush(fp_log); - guard_free(cwd); // Execute task fflush(stdout); @@ -87,18 +77,18 @@ int parent(struct MultiProcessingPool *pool, struct MultiProcessingTask *task, p return 0; } -static int mp_task_fork(struct MultiProcessingPool *pool, struct MultiProcessingTask *task, const char *cmd) { +static int mp_task_fork(struct MultiProcessingPool *pool, struct MultiProcessingTask *task) { pid_t pid = fork(); int child_status = 0; if (pid == -1) { return -1; } else if (pid == 0) { - child(pool, task, cmd); + child(pool, task); } return parent(pool, task, pid, &child_status); } -struct MultiProcessingTask *mp_pool_task(struct MultiProcessingPool *pool, const char *ident, char *cmd) { +struct MultiProcessingTask *mp_pool_task(struct MultiProcessingPool *pool, const char *ident, char *working_dir, char *cmd) { struct MultiProcessingTask *slot = mp_pool_next_available(pool); if (pool->num_used != pool->num_alloc) { pool->num_used++; @@ -118,6 +108,13 @@ struct MultiProcessingTask *mp_pool_task(struct MultiProcessingPool *pool, const strcat(slot->log_file, pool->log_root); strcat(slot->log_file, "/"); + // Set working directory + if (isempty(working_dir)) { + strcpy(slot->working_dir, "."); + } else { + strcpy(slot->working_dir, working_dir); + } + // Create a temporary file to act as our intermediate command script FILE *tp = NULL; char *t_name; -- cgit From 34aaf0e2bbf47e712a0b55fa3f94387b466f314b Mon Sep 17 00:00:00 2001 From: Joseph Hunkeler Date: Thu, 26 Sep 2024 09:58:12 -0400 Subject: Zero log_file to avoid garbage output --- src/multiprocessing.c | 1 + 1 file changed, 1 insertion(+) (limited to 'src/multiprocessing.c') diff --git a/src/multiprocessing.c b/src/multiprocessing.c index 5c66672..cd48c6f 100644 --- a/src/multiprocessing.c +++ b/src/multiprocessing.c @@ -105,6 +105,7 @@ struct MultiProcessingTask *mp_pool_task(struct MultiProcessingPool *pool, const strncpy(slot->ident, ident, sizeof(slot->ident) - 1); // Set log file path + memset(slot->log_file, 0, sizeof(*slot->log_file)); strcat(slot->log_file, pool->log_root); strcat(slot->log_file, "/"); -- cgit From 25cb7df692786c0802dd67f5e32edf77fe344e5c Mon Sep 17 00:00:00 2001 From: Joseph Hunkeler Date: Thu, 26 Sep 2024 09:58:47 -0400 Subject: Do not fork immediately after a task is created --- src/multiprocessing.c | 4 ---- 1 file changed, 4 deletions(-) (limited to 'src/multiprocessing.c') diff --git a/src/multiprocessing.c b/src/multiprocessing.c index cd48c6f..2b77059 100644 --- a/src/multiprocessing.c +++ b/src/multiprocessing.c @@ -140,10 +140,6 @@ struct MultiProcessingTask *mp_pool_task(struct MultiProcessingPool *pool, const fclose(tp); - // Execute task - if (mp_task_fork(pool, slot, cmd)) { - return NULL; - } return slot; } -- cgit From c645252886ce1ff63df9ebb252acca81ffa7b8bf Mon Sep 17 00:00:00 2001 From: Joseph Hunkeler Date: Thu, 26 Sep 2024 09:58:57 -0400 Subject: Spacing --- src/multiprocessing.c | 1 + 1 file changed, 1 insertion(+) (limited to 'src/multiprocessing.c') diff --git a/src/multiprocessing.c b/src/multiprocessing.c index 2b77059..3c3c5eb 100644 --- a/src/multiprocessing.c +++ b/src/multiprocessing.c @@ -246,6 +246,7 @@ int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags) { size_t x = upper_i - pool->num_used; upper_i -= (size_t) x; } + for (size_t i = lower_i; i < upper_i; i++) { struct MultiProcessingTask *slot = &pool->task[i]; if (slot->status == -1) { -- cgit From c59871a5018927836af991799022c2831cdccb5d Mon Sep 17 00:00:00 2001 From: Joseph Hunkeler Date: Fri, 27 Sep 2024 10:00:20 -0400 Subject: "Task started" message is redundant --- src/multiprocessing.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'src/multiprocessing.c') diff --git a/src/multiprocessing.c b/src/multiprocessing.c index 3c3c5eb..1bd9ec0 100644 --- a/src/multiprocessing.c +++ b/src/multiprocessing.c @@ -25,7 +25,8 @@ int child(struct MultiProcessingPool *pool, struct MultiProcessingTask *task) { // Redirect stdout and stderr to the log file fflush(stdout); fflush(stderr); - printf("[%s:%s] Task started (pid: %d)\n", pool->ident, task->ident, task->parent_pid); + // Set log file name + sprintf(task->log_file + strlen(task->log_file), "task-%zu-%d.log", mp_global_task_count, task->parent_pid); fp_log = freopen(task->log_file, "w+", stdout); if (!fp_log) { fprintf(stderr, "unable to open '%s' for writing: %s\n", task->log_file, strerror(errno)); -- cgit From b2cf2c2acbdda61a510b6cf36f07d14375163887 Mon Sep 17 00:00:00 2001 From: Joseph Hunkeler Date: Fri, 27 Sep 2024 10:00:43 -0400 Subject: Move log_file path assignment into the child --- src/multiprocessing.c | 2 -- 1 file changed, 2 deletions(-) (limited to 'src/multiprocessing.c') diff --git a/src/multiprocessing.c b/src/multiprocessing.c index 1bd9ec0..8677909 100644 --- a/src/multiprocessing.c +++ b/src/multiprocessing.c @@ -65,8 +65,6 @@ int parent(struct MultiProcessingPool *pool, struct MultiProcessingTask *task, p task->pid = pid; task->parent_pid = pid; - // Set log file name - sprintf(task->log_file + strlen(task->log_file), "task-%zu-%d.log", mp_global_task_count, task->parent_pid); mp_global_task_count++; // Check child's status -- cgit From d657705546d8a4758ec1c65ce13ae7d2f954bc95 Mon Sep 17 00:00:00 2001 From: Joseph Hunkeler Date: Fri, 27 Sep 2024 10:01:22 -0400 Subject: Terminate everything when forking fails --- src/multiprocessing.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/multiprocessing.c') diff --git a/src/multiprocessing.c b/src/multiprocessing.c index 8677909..bd4678a 100644 --- a/src/multiprocessing.c +++ b/src/multiprocessing.c @@ -251,7 +251,7 @@ int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags) { if (slot->status == -1) { if (mp_task_fork(pool, slot)) { fprintf(stderr, "%s: mp_task_fork failed\n", slot->ident); - exit(1); + kill(0, SIGTERM); } } -- cgit From 8012b6f930210701dd62f2ffb21247e58037d80f Mon Sep 17 00:00:00 2001 From: Joseph Hunkeler Date: Fri, 27 Sep 2024 10:04:39 -0400 Subject: Show "Task is running" every X seconds, instead of X iterations --- src/multiprocessing.c | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) (limited to 'src/multiprocessing.c') diff --git a/src/multiprocessing.c b/src/multiprocessing.c index bd4678a..6a97b3b 100644 --- a/src/multiprocessing.c +++ b/src/multiprocessing.c @@ -234,8 +234,8 @@ int mp_pool_kill(struct MultiProcessingPool *pool, int signum) { int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags) { int status = 0; - int watcher = 0; int failures = 0; + time_t watcher = time(NULL); size_t tasks_complete = 0; size_t lower_i = 0; size_t upper_i = jobs; @@ -253,7 +253,7 @@ int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags) { fprintf(stderr, "%s: mp_task_fork failed\n", slot->ident); kill(0, SIGTERM); } - } + } // Has the child been processed already? if (slot->pid == MP_POOL_PID_UNUSED) { @@ -343,11 +343,12 @@ int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags) { fprintf(stderr, "waitpid failed: %s\n", strerror(errno)); return -1; } else { - if (watcher > 9) { + time_t watcher_diff = time(NULL) - watcher; + if (watcher_diff == 0) { printf("[%s:%s] Task is running (pid: %d)\n", pool->ident, slot->ident, slot->parent_pid); - watcher = 0; - } else { - watcher++; + } + if (watcher > 9) { + watcher = time(NULL); } } } -- cgit From 108242ce16fc7d7d9e81a1a6e9783fd9bda8b60c Mon Sep 17 00:00:00 2001 From: Joseph Hunkeler Date: Fri, 27 Sep 2024 13:23:07 -0400 Subject: mp_pool_init(): return NULL when ident argument is NULL * reported by @kmacdonald-stsci --- src/multiprocessing.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'src/multiprocessing.c') diff --git a/src/multiprocessing.c b/src/multiprocessing.c index 6a97b3b..5d22901 100644 --- a/src/multiprocessing.c +++ b/src/multiprocessing.c @@ -375,7 +375,8 @@ int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags) { struct MultiProcessingPool *mp_pool_init(const char *ident, const char *log_root) { struct MultiProcessingPool *pool; - if (!log_root) { + if (!ident || !log_root) { + // Pool must have an ident string // log_root must be set return NULL; } -- cgit From 9f535ba4e016f02c6d1bca45d6adfd04a036c9c1 Mon Sep 17 00:00:00 2001 From: Joseph Hunkeler Date: Fri, 27 Sep 2024 13:24:10 -0400 Subject: Fix missing COMMAND string in the log header --- src/multiprocessing.c | 10 ++++++++++ 1 file changed, 10 insertions(+) (limited to 'src/multiprocessing.c') diff --git a/src/multiprocessing.c b/src/multiprocessing.c index 5d22901..1320a2a 100644 --- a/src/multiprocessing.c +++ b/src/multiprocessing.c @@ -138,6 +138,11 @@ struct MultiProcessingTask *mp_pool_task(struct MultiProcessingPool *pool, const fflush(tp); fclose(tp); + // Record the command(s) + slot->cmd_len = (strlen(cmd) * sizeof(*cmd)) + 1; + slot->cmd = mmap(NULL, slot->cmd_len, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0); + memset(slot->cmd, 0, slot->cmd_len); + strncpy(slot->cmd, cmd, slot->cmd_len); return slot; } @@ -419,6 +424,11 @@ void mp_pool_free(struct MultiProcessingPool **pool) { } // Unmap all pool tasks if ((*pool)->task) { + if ((*pool)->task->cmd) { + if (munmap((*pool)->task->cmd, (*pool)->task->cmd_len) < 0) { + perror("munmap"); + } + } if (munmap((*pool)->task, sizeof(*(*pool)->task) * (*pool)->num_alloc) < 0) { perror("munmap"); } -- cgit From a84b874a027fd8007efc10e0602396c4b5da170c Mon Sep 17 00:00:00 2001 From: Joseph Hunkeler Date: Fri, 27 Sep 2024 16:29:03 -0400 Subject: Fix leak * When strdup fails and the temporary file handle is open, close the handle and die. * reported by @kmacdonald-stsci --- src/multiprocessing.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/multiprocessing.c') diff --git a/src/multiprocessing.c b/src/multiprocessing.c index 1320a2a..74ba981 100644 --- a/src/multiprocessing.c +++ b/src/multiprocessing.c @@ -117,7 +117,7 @@ struct MultiProcessingTask *mp_pool_task(struct MultiProcessingPool *pool, const // Create a temporary file to act as our intermediate command script FILE *tp = NULL; - char *t_name; + char *t_name = NULL; t_name = xmkstemp(&tp, "w"); if (!t_name || !tp) { return NULL; -- cgit From 4e45792a63fad1ed8ff159168b44387136e23b12 Mon Sep 17 00:00:00 2001 From: Joseph Hunkeler Date: Mon, 30 Sep 2024 12:39:43 -0400 Subject: Shorten comment --- src/multiprocessing.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'src/multiprocessing.c') diff --git a/src/multiprocessing.c b/src/multiprocessing.c index 74ba981..039383e 100644 --- a/src/multiprocessing.c +++ b/src/multiprocessing.c @@ -265,8 +265,9 @@ int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags) { // Child is already used up, skip it hang_check++; if (hang_check >= pool->num_used) { - // Unlikely to happen when called correctly, but if you join a pool that's already finished - // it will spin forever. This protects the program from entering an infinite loop. + // If you join a pool that's already finished it will spin + // forever. This protects the program from entering an + // infinite loop. fprintf(stderr, "%s is deadlocked\n", pool->ident); failures++; goto pool_deadlocked; -- cgit From 3881ec0a37f9880d6e602a8205b0fb2bd4d54b4f Mon Sep 17 00:00:00 2001 From: Joseph Hunkeler Date: Mon, 30 Sep 2024 12:40:03 -0400 Subject: Replace sprintf with snprintf --- src/multiprocessing.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/multiprocessing.c') diff --git a/src/multiprocessing.c b/src/multiprocessing.c index 039383e..5f5451e 100644 --- a/src/multiprocessing.c +++ b/src/multiprocessing.c @@ -292,7 +292,7 @@ int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags) { char progress[1024] = {0}; if (pid > 0) { double percent = ((double) (tasks_complete + 1) / (double) pool->num_used) * 100; - sprintf(progress, "[%s:%s] [%3.1f%%]", pool->ident, slot->ident, percent); + snprintf(progress, sizeof(progress) - 1, "[%s:%s] [%3.1f%%]", pool->ident, slot->ident, percent); // The process ended in one the following ways // Note: SIGSTOP nor SIGCONT will not increment the tasks_complete counter -- cgit From d5fa0c23542746c642a00d24956f20afa639392c Mon Sep 17 00:00:00 2001 From: Joseph Hunkeler Date: Mon, 30 Sep 2024 12:48:49 -0400 Subject: Replace strlcpy with strncpy (maybe later) --- src/multiprocessing.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/multiprocessing.c') diff --git a/src/multiprocessing.c b/src/multiprocessing.c index 5f5451e..38d85fc 100644 --- a/src/multiprocessing.c +++ b/src/multiprocessing.c @@ -112,7 +112,7 @@ struct MultiProcessingTask *mp_pool_task(struct MultiProcessingPool *pool, const if (isempty(working_dir)) { strcpy(slot->working_dir, "."); } else { - strcpy(slot->working_dir, working_dir); + strncpy(slot->working_dir, working_dir, PATH_MAX - 1); } // Create a temporary file to act as our intermediate command script -- cgit From 23253d98bce3c4b21b887f175c7cd7135614ea5a Mon Sep 17 00:00:00 2001 From: Joseph Hunkeler Date: Tue, 1 Oct 2024 15:49:11 -0400 Subject: Use watcher_diff to see how many seconds have elapsed. --- src/multiprocessing.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'src/multiprocessing.c') diff --git a/src/multiprocessing.c b/src/multiprocessing.c index 38d85fc..1c585ff 100644 --- a/src/multiprocessing.c +++ b/src/multiprocessing.c @@ -352,8 +352,7 @@ int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags) { time_t watcher_diff = time(NULL) - watcher; if (watcher_diff == 0) { printf("[%s:%s] Task is running (pid: %d)\n", pool->ident, slot->ident, slot->parent_pid); - } - if (watcher > 9) { + } else if (watcher_diff > 9) { watcher = time(NULL); } } -- cgit From 9028e5ef90c1b7f5a42c6bf969ac3c838b570a7e Mon Sep 17 00:00:00 2001 From: Joseph Hunkeler Date: Wed, 2 Oct 2024 14:58:31 -0400 Subject: Allow user to define the time interval for "task is running" message --- src/multiprocessing.c | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) (limited to 'src/multiprocessing.c') diff --git a/src/multiprocessing.c b/src/multiprocessing.c index 1c585ff..baa6df7 100644 --- a/src/multiprocessing.c +++ b/src/multiprocessing.c @@ -240,10 +240,10 @@ int mp_pool_kill(struct MultiProcessingPool *pool, int signum) { int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags) { int status = 0; int failures = 0; - time_t watcher = time(NULL); size_t tasks_complete = 0; size_t lower_i = 0; size_t upper_i = jobs; + do { size_t hang_check = 0; if (upper_i >= pool->num_used) { @@ -349,11 +349,16 @@ int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags) { fprintf(stderr, "waitpid failed: %s\n", strerror(errno)); return -1; } else { - time_t watcher_diff = time(NULL) - watcher; - if (watcher_diff == 0) { + // Track the number of seconds elapsed for each task. + // When a task has executed for longer than status_intervals, print a status update + // _seconds represents the time between intervals, not the total runtime of the task + slot->_seconds = time(NULL) - slot->_now; + if (slot->_seconds > pool->status_interval) { + slot->_now = time(NULL); + slot->_seconds = 0; + } + if (slot->_seconds == 0) { printf("[%s:%s] Task is running (pid: %d)\n", pool->ident, slot->ident, slot->parent_pid); - } else if (watcher_diff > 9) { - watcher = time(NULL); } } } -- cgit From b041e8216c05940f5b3676ef5c7e0ce2b4441bc8 Mon Sep 17 00:00:00 2001 From: Joseph Hunkeler Date: Wed, 2 Oct 2024 15:00:57 -0400 Subject: "Task started" is more accurate than "queued" when this is printed --- src/multiprocessing.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/multiprocessing.c') diff --git a/src/multiprocessing.c b/src/multiprocessing.c index baa6df7..2a1b350 100644 --- a/src/multiprocessing.c +++ b/src/multiprocessing.c @@ -1,6 +1,6 @@ #include "core.h" -/// The sum of all tasks queued by mp_task() +/// The sum of all tasks started by mp_task() size_t mp_global_task_count = 0; static struct MultiProcessingTask *mp_pool_next_available(struct MultiProcessingPool *pool) { @@ -59,7 +59,7 @@ int child(struct MultiProcessingPool *pool, struct MultiProcessingTask *task) { } int parent(struct MultiProcessingPool *pool, struct MultiProcessingTask *task, pid_t pid, int *child_status) { - printf("[%s:%s] Task queued (pid: %d)\n", pool->ident, task->ident, pid); + printf("[%s:%s] Task started (pid: %d)\n", pool->ident, task->ident, pid); // Give the child process access to our PID value task->pid = pid; -- cgit