diff options
| author | Joseph Hunkeler <jhunkeler@gmail.com> | 2024-09-17 09:30:16 -0400 | 
|---|---|---|
| committer | Joseph Hunkeler <jhunkeler@gmail.com> | 2024-09-18 23:07:14 -0400 | 
| commit | 6921cb44f5464b59a2a169167cc63c66bcc95a9f (patch) | |
| tree | 14e38035ee080d0b1b063919becf260362f455e6 | |
| parent | 754bb98ed952b589d41affdf677f8284926f368a (diff) | |
| download | stasis-6921cb44f5464b59a2a169167cc63c66bcc95a9f.tar.gz | |
Break parent/child calls into static functions
| -rw-r--r-- | src/multiprocessing.c | 157 | 
1 files changed, 86 insertions, 71 deletions
| diff --git a/src/multiprocessing.c b/src/multiprocessing.c index 7b3f997..a1d961e 100644 --- a/src/multiprocessing.c +++ b/src/multiprocessing.c @@ -5,83 +5,99 @@ static struct MultiProcessingTask *mp_pool_next_available(struct MultiProcessing      return &pool->task[pool->num_used];  } -static int mp_task_fork(struct MultiProcessingPool *pool, struct MultiProcessingTask *task, const char *cmd) { -    pid_t pid = fork(); -    int child_status = 0; -    if (pid == -1) { +int child(struct MultiProcessingPool *pool, struct MultiProcessingTask *task, const char *cmd) { +    char *cwd = NULL; +    FILE *fp_log = NULL; + +    // Synchronize sub-process startup +    // Stop here until summoned by mp_pool_join() +    if (sem_wait(task->gate) < 0) { +        perror("sem_wait failed"); +        exit(1); +    } + +    // Record the task start time +    if (clock_gettime(CLOCK_REALTIME, &task->time_data.t_start) < 0) { +        perror("clock_gettime"); +        exit(1); +    } + +    // Redirect stdout and stderr to the log file +    fflush(stdout); +    fflush(stderr); +    printf("[%s:%s] Task started (pid: %d)\n", pool->ident, task->ident, task->parent_pid); +    fp_log = freopen(task->log_file, "w+", stdout); +    if (!fp_log) { +        fprintf(stderr, "unable to open '%s' for writing: %s\n", task->log_file, strerror(errno));          return -1; -    } else if (pid == 0) { // child process -        char *cwd = NULL; -        FILE *fp_log = NULL; - -        // Synchronize sub-process startup -        // Stop here until summoned by mp_pool_join() -        if (sem_wait(task->gate) < 0) { -            perror("sem_wait failed"); -            exit(1); -        } +    } +    dup2(fileno(stdout), fileno(stderr)); + +    // Generate timestamp for log header +    time_t t = time(NULL); +    char *timebuf = ctime(&t); +    if (timebuf) { +        // strip line feed from timestamp +        timebuf[strlen(timebuf) ? strlen(timebuf) - 1 : 0] = 0; +    } -        // Redirect stdout and stderr to the log file -        fflush(stdout); -        fflush(stderr); -        printf("[%s:%s] Task started (pid: %d)\n", pool->ident, task->ident, task->parent_pid); -        fp_log = freopen(task->log_file, "w+", stdout); -        if (!fp_log) { -            fprintf(stderr, "unable to open '%s' for writing: %s\n", task->log_file, strerror(errno)); -            return -1; -        } -        dup2(fileno(stdout), fileno(stderr)); - -        // Generate timestamp for log header -        time_t t = time(NULL); -        char *timebuf = ctime(&t); -        if (timebuf) { -            // strip line feed from timestamp -            timebuf[strlen(timebuf) ? strlen(timebuf) - 1 : 0] = 0; -        } +    // Get the path to the current working directory +    // Used by the log header. Informative only. +    cwd = getcwd(NULL, PATH_MAX); +    if (!cwd) { +        perror(cwd); +        exit(1); +    } -        cwd = getcwd(NULL, PATH_MAX); -        if (!cwd) { -            perror(cwd); -            exit(1); -        } +    // Generate log header +    fprintf(fp_log, "# STARTED: %s\n", timebuf ? timebuf : "unknown"); +    fprintf(fp_log, "# PID: %d\n", task->parent_pid); +    fprintf(fp_log, "# WORKDIR: %s\n", cwd); +    fprintf(fp_log, "# COMMAND:\n%s\n", cmd); +    fprintf(fp_log, "# OUTPUT:\n"); +    // Commit header to log file / clean up +    fflush(fp_log); +    guard_free(cwd); + +    // Execute task +    fflush(stdout); +    fflush(stderr); +    char *args[] = {"bash", "--norc", task->parent_script, (char *) NULL}; +    task->status = execvp("/bin/bash", args); +    return 0; // NOP return to satisfy the compiler +} -        // Generate log header -        fprintf(fp_log, "# STARTED: %s\n", timebuf ? timebuf : "unknown"); -        fprintf(fp_log, "# PID: %d\n", task->parent_pid); -        fprintf(fp_log, "# WORKDIR: %s\n", cwd); -        fprintf(fp_log, "# COMMAND:\n%s\n", cmd); -        fprintf(fp_log, "# OUTPUT:\n"); -        // Commit header to log file / clean up -        fflush(fp_log); -        guard_free(cwd); - -        // Execute task -        fflush(stdout); -        fflush(stderr); -        char *args[] = {"bash", "--norc", task->parent_script, (char *) NULL}; -        task->status = execvp("/bin/bash", args); -    } else { // parent process -        printf("[%s:%s] Task queued (pid: %d)\n", pool->ident, task->ident, pid); - -        // Give the child process access to our PID value -        task->pid = pid; -        task->parent_pid = pid; - -        // Set log file name -        sprintf(task->log_file + strlen(task->log_file), "task-%zu-%d.log", mp_global_task_count, task->parent_pid); -        mp_global_task_count++; - -        // Check child's status -        pid_t code = waitpid(pid, &child_status, WUNTRACED | WCONTINUED | WNOHANG); -        if (code < 0) { -            perror("waitpid failed"); -            return -1; -        } +int parent(struct MultiProcessingPool *pool, struct MultiProcessingTask *task, pid_t pid, int *child_status) { +    printf("[%s:%s] Task queued (pid: %d)\n", pool->ident, task->ident, pid); + +    // Give the child process access to our PID value +    task->pid = pid; +    task->parent_pid = pid; + +    // Set log file name +    sprintf(task->log_file + strlen(task->log_file), "task-%zu-%d.log", mp_global_task_count, task->parent_pid); +    mp_global_task_count++; + +    // Check child's status +    pid_t code = waitpid(pid, child_status, WUNTRACED | WCONTINUED | WNOHANG); +    if (code < 0) { +        perror("waitpid failed"); +        return -1;      }      return 0;  } +static int mp_task_fork(struct MultiProcessingPool *pool, struct MultiProcessingTask *task, const char *cmd) { +    pid_t pid = fork(); +    int child_status = 0; +    if (pid == -1) { +        return -1; +    } else if (pid == 0) { +        child(pool, task, cmd); +    } +    return parent(pool, task, pid, &child_status); +} +  struct MultiProcessingTask *mp_task(struct MultiProcessingPool *pool, const char *ident, char *cmd) {      struct MultiProcessingTask *slot = mp_pool_next_available(pool);      if (pool->num_used != pool->num_alloc) { @@ -212,8 +228,7 @@ int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags) {              char progress[1024] = {0};              if (pid > 0) { -                double percent = ((double) (tasks_complete) / (double) pool->num_used) * 100; -                //double percent = ((double) (tasks_complete) / (double) pool->num_alloc) * 100; +                double percent = ((double) (tasks_complete + 1) / (double) pool->num_used) * 100;                  sprintf(progress, "[%s:%s] [%3.1f%%]", pool->ident, slot->ident, percent);                  // The process ended in one the following ways | 
