diff options
author | Joseph Hunkeler <jhunkeler@gmail.com> | 2024-09-17 09:30:16 -0400 |
---|---|---|
committer | Joseph Hunkeler <jhunkeler@gmail.com> | 2024-09-18 23:07:14 -0400 |
commit | 6921cb44f5464b59a2a169167cc63c66bcc95a9f (patch) | |
tree | 14e38035ee080d0b1b063919becf260362f455e6 | |
parent | 754bb98ed952b589d41affdf677f8284926f368a (diff) | |
download | stasis-6921cb44f5464b59a2a169167cc63c66bcc95a9f.tar.gz |
Break parent/child calls into static functions
-rw-r--r-- | src/multiprocessing.c | 157 |
1 files changed, 86 insertions, 71 deletions
diff --git a/src/multiprocessing.c b/src/multiprocessing.c index 7b3f997..a1d961e 100644 --- a/src/multiprocessing.c +++ b/src/multiprocessing.c @@ -5,83 +5,99 @@ static struct MultiProcessingTask *mp_pool_next_available(struct MultiProcessing return &pool->task[pool->num_used]; } -static int mp_task_fork(struct MultiProcessingPool *pool, struct MultiProcessingTask *task, const char *cmd) { - pid_t pid = fork(); - int child_status = 0; - if (pid == -1) { +int child(struct MultiProcessingPool *pool, struct MultiProcessingTask *task, const char *cmd) { + char *cwd = NULL; + FILE *fp_log = NULL; + + // Synchronize sub-process startup + // Stop here until summoned by mp_pool_join() + if (sem_wait(task->gate) < 0) { + perror("sem_wait failed"); + exit(1); + } + + // Record the task start time + if (clock_gettime(CLOCK_REALTIME, &task->time_data.t_start) < 0) { + perror("clock_gettime"); + exit(1); + } + + // Redirect stdout and stderr to the log file + fflush(stdout); + fflush(stderr); + printf("[%s:%s] Task started (pid: %d)\n", pool->ident, task->ident, task->parent_pid); + fp_log = freopen(task->log_file, "w+", stdout); + if (!fp_log) { + fprintf(stderr, "unable to open '%s' for writing: %s\n", task->log_file, strerror(errno)); return -1; - } else if (pid == 0) { // child process - char *cwd = NULL; - FILE *fp_log = NULL; - - // Synchronize sub-process startup - // Stop here until summoned by mp_pool_join() - if (sem_wait(task->gate) < 0) { - perror("sem_wait failed"); - exit(1); - } + } + dup2(fileno(stdout), fileno(stderr)); + + // Generate timestamp for log header + time_t t = time(NULL); + char *timebuf = ctime(&t); + if (timebuf) { + // strip line feed from timestamp + timebuf[strlen(timebuf) ? strlen(timebuf) - 1 : 0] = 0; + } - // Redirect stdout and stderr to the log file - fflush(stdout); - fflush(stderr); - printf("[%s:%s] Task started (pid: %d)\n", pool->ident, task->ident, task->parent_pid); - fp_log = freopen(task->log_file, "w+", stdout); - if (!fp_log) { - fprintf(stderr, "unable to open '%s' for writing: %s\n", task->log_file, strerror(errno)); - return -1; - } - dup2(fileno(stdout), fileno(stderr)); - - // Generate timestamp for log header - time_t t = time(NULL); - char *timebuf = ctime(&t); - if (timebuf) { - // strip line feed from timestamp - timebuf[strlen(timebuf) ? strlen(timebuf) - 1 : 0] = 0; - } + // Get the path to the current working directory + // Used by the log header. Informative only. + cwd = getcwd(NULL, PATH_MAX); + if (!cwd) { + perror(cwd); + exit(1); + } - cwd = getcwd(NULL, PATH_MAX); - if (!cwd) { - perror(cwd); - exit(1); - } + // Generate log header + fprintf(fp_log, "# STARTED: %s\n", timebuf ? timebuf : "unknown"); + fprintf(fp_log, "# PID: %d\n", task->parent_pid); + fprintf(fp_log, "# WORKDIR: %s\n", cwd); + fprintf(fp_log, "# COMMAND:\n%s\n", cmd); + fprintf(fp_log, "# OUTPUT:\n"); + // Commit header to log file / clean up + fflush(fp_log); + guard_free(cwd); + + // Execute task + fflush(stdout); + fflush(stderr); + char *args[] = {"bash", "--norc", task->parent_script, (char *) NULL}; + task->status = execvp("/bin/bash", args); + return 0; // NOP return to satisfy the compiler +} - // Generate log header - fprintf(fp_log, "# STARTED: %s\n", timebuf ? timebuf : "unknown"); - fprintf(fp_log, "# PID: %d\n", task->parent_pid); - fprintf(fp_log, "# WORKDIR: %s\n", cwd); - fprintf(fp_log, "# COMMAND:\n%s\n", cmd); - fprintf(fp_log, "# OUTPUT:\n"); - // Commit header to log file / clean up - fflush(fp_log); - guard_free(cwd); - - // Execute task - fflush(stdout); - fflush(stderr); - char *args[] = {"bash", "--norc", task->parent_script, (char *) NULL}; - task->status = execvp("/bin/bash", args); - } else { // parent process - printf("[%s:%s] Task queued (pid: %d)\n", pool->ident, task->ident, pid); - - // Give the child process access to our PID value - task->pid = pid; - task->parent_pid = pid; - - // Set log file name - sprintf(task->log_file + strlen(task->log_file), "task-%zu-%d.log", mp_global_task_count, task->parent_pid); - mp_global_task_count++; - - // Check child's status - pid_t code = waitpid(pid, &child_status, WUNTRACED | WCONTINUED | WNOHANG); - if (code < 0) { - perror("waitpid failed"); - return -1; - } +int parent(struct MultiProcessingPool *pool, struct MultiProcessingTask *task, pid_t pid, int *child_status) { + printf("[%s:%s] Task queued (pid: %d)\n", pool->ident, task->ident, pid); + + // Give the child process access to our PID value + task->pid = pid; + task->parent_pid = pid; + + // Set log file name + sprintf(task->log_file + strlen(task->log_file), "task-%zu-%d.log", mp_global_task_count, task->parent_pid); + mp_global_task_count++; + + // Check child's status + pid_t code = waitpid(pid, child_status, WUNTRACED | WCONTINUED | WNOHANG); + if (code < 0) { + perror("waitpid failed"); + return -1; } return 0; } +static int mp_task_fork(struct MultiProcessingPool *pool, struct MultiProcessingTask *task, const char *cmd) { + pid_t pid = fork(); + int child_status = 0; + if (pid == -1) { + return -1; + } else if (pid == 0) { + child(pool, task, cmd); + } + return parent(pool, task, pid, &child_status); +} + struct MultiProcessingTask *mp_task(struct MultiProcessingPool *pool, const char *ident, char *cmd) { struct MultiProcessingTask *slot = mp_pool_next_available(pool); if (pool->num_used != pool->num_alloc) { @@ -212,8 +228,7 @@ int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags) { char progress[1024] = {0}; if (pid > 0) { - double percent = ((double) (tasks_complete) / (double) pool->num_used) * 100; - //double percent = ((double) (tasks_complete) / (double) pool->num_alloc) * 100; + double percent = ((double) (tasks_complete + 1) / (double) pool->num_used) * 100; sprintf(progress, "[%s:%s] [%3.1f%%]", pool->ident, slot->ident, percent); // The process ended in one the following ways |