aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoseph Hunkeler <jhunkeler@gmail.com>2024-09-17 09:30:16 -0400
committerJoseph Hunkeler <jhunkeler@gmail.com>2024-09-18 23:07:14 -0400
commit6921cb44f5464b59a2a169167cc63c66bcc95a9f (patch)
tree14e38035ee080d0b1b063919becf260362f455e6
parent754bb98ed952b589d41affdf677f8284926f368a (diff)
downloadstasis-6921cb44f5464b59a2a169167cc63c66bcc95a9f.tar.gz
Break parent/child calls into static functions
-rw-r--r--src/multiprocessing.c157
1 files changed, 86 insertions, 71 deletions
diff --git a/src/multiprocessing.c b/src/multiprocessing.c
index 7b3f997..a1d961e 100644
--- a/src/multiprocessing.c
+++ b/src/multiprocessing.c
@@ -5,83 +5,99 @@ static struct MultiProcessingTask *mp_pool_next_available(struct MultiProcessing
return &pool->task[pool->num_used];
}
-static int mp_task_fork(struct MultiProcessingPool *pool, struct MultiProcessingTask *task, const char *cmd) {
- pid_t pid = fork();
- int child_status = 0;
- if (pid == -1) {
+int child(struct MultiProcessingPool *pool, struct MultiProcessingTask *task, const char *cmd) {
+ char *cwd = NULL;
+ FILE *fp_log = NULL;
+
+ // Synchronize sub-process startup
+ // Stop here until summoned by mp_pool_join()
+ if (sem_wait(task->gate) < 0) {
+ perror("sem_wait failed");
+ exit(1);
+ }
+
+ // Record the task start time
+ if (clock_gettime(CLOCK_REALTIME, &task->time_data.t_start) < 0) {
+ perror("clock_gettime");
+ exit(1);
+ }
+
+ // Redirect stdout and stderr to the log file
+ fflush(stdout);
+ fflush(stderr);
+ printf("[%s:%s] Task started (pid: %d)\n", pool->ident, task->ident, task->parent_pid);
+ fp_log = freopen(task->log_file, "w+", stdout);
+ if (!fp_log) {
+ fprintf(stderr, "unable to open '%s' for writing: %s\n", task->log_file, strerror(errno));
return -1;
- } else if (pid == 0) { // child process
- char *cwd = NULL;
- FILE *fp_log = NULL;
-
- // Synchronize sub-process startup
- // Stop here until summoned by mp_pool_join()
- if (sem_wait(task->gate) < 0) {
- perror("sem_wait failed");
- exit(1);
- }
+ }
+ dup2(fileno(stdout), fileno(stderr));
+
+ // Generate timestamp for log header
+ time_t t = time(NULL);
+ char *timebuf = ctime(&t);
+ if (timebuf) {
+ // strip line feed from timestamp
+ timebuf[strlen(timebuf) ? strlen(timebuf) - 1 : 0] = 0;
+ }
- // Redirect stdout and stderr to the log file
- fflush(stdout);
- fflush(stderr);
- printf("[%s:%s] Task started (pid: %d)\n", pool->ident, task->ident, task->parent_pid);
- fp_log = freopen(task->log_file, "w+", stdout);
- if (!fp_log) {
- fprintf(stderr, "unable to open '%s' for writing: %s\n", task->log_file, strerror(errno));
- return -1;
- }
- dup2(fileno(stdout), fileno(stderr));
-
- // Generate timestamp for log header
- time_t t = time(NULL);
- char *timebuf = ctime(&t);
- if (timebuf) {
- // strip line feed from timestamp
- timebuf[strlen(timebuf) ? strlen(timebuf) - 1 : 0] = 0;
- }
+ // Get the path to the current working directory
+ // Used by the log header. Informative only.
+ cwd = getcwd(NULL, PATH_MAX);
+ if (!cwd) {
+ perror(cwd);
+ exit(1);
+ }
- cwd = getcwd(NULL, PATH_MAX);
- if (!cwd) {
- perror(cwd);
- exit(1);
- }
+ // Generate log header
+ fprintf(fp_log, "# STARTED: %s\n", timebuf ? timebuf : "unknown");
+ fprintf(fp_log, "# PID: %d\n", task->parent_pid);
+ fprintf(fp_log, "# WORKDIR: %s\n", cwd);
+ fprintf(fp_log, "# COMMAND:\n%s\n", cmd);
+ fprintf(fp_log, "# OUTPUT:\n");
+ // Commit header to log file / clean up
+ fflush(fp_log);
+ guard_free(cwd);
+
+ // Execute task
+ fflush(stdout);
+ fflush(stderr);
+ char *args[] = {"bash", "--norc", task->parent_script, (char *) NULL};
+ task->status = execvp("/bin/bash", args);
+ return 0; // NOP return to satisfy the compiler
+}
- // Generate log header
- fprintf(fp_log, "# STARTED: %s\n", timebuf ? timebuf : "unknown");
- fprintf(fp_log, "# PID: %d\n", task->parent_pid);
- fprintf(fp_log, "# WORKDIR: %s\n", cwd);
- fprintf(fp_log, "# COMMAND:\n%s\n", cmd);
- fprintf(fp_log, "# OUTPUT:\n");
- // Commit header to log file / clean up
- fflush(fp_log);
- guard_free(cwd);
-
- // Execute task
- fflush(stdout);
- fflush(stderr);
- char *args[] = {"bash", "--norc", task->parent_script, (char *) NULL};
- task->status = execvp("/bin/bash", args);
- } else { // parent process
- printf("[%s:%s] Task queued (pid: %d)\n", pool->ident, task->ident, pid);
-
- // Give the child process access to our PID value
- task->pid = pid;
- task->parent_pid = pid;
-
- // Set log file name
- sprintf(task->log_file + strlen(task->log_file), "task-%zu-%d.log", mp_global_task_count, task->parent_pid);
- mp_global_task_count++;
-
- // Check child's status
- pid_t code = waitpid(pid, &child_status, WUNTRACED | WCONTINUED | WNOHANG);
- if (code < 0) {
- perror("waitpid failed");
- return -1;
- }
+int parent(struct MultiProcessingPool *pool, struct MultiProcessingTask *task, pid_t pid, int *child_status) {
+ printf("[%s:%s] Task queued (pid: %d)\n", pool->ident, task->ident, pid);
+
+ // Give the child process access to our PID value
+ task->pid = pid;
+ task->parent_pid = pid;
+
+ // Set log file name
+ sprintf(task->log_file + strlen(task->log_file), "task-%zu-%d.log", mp_global_task_count, task->parent_pid);
+ mp_global_task_count++;
+
+ // Check child's status
+ pid_t code = waitpid(pid, child_status, WUNTRACED | WCONTINUED | WNOHANG);
+ if (code < 0) {
+ perror("waitpid failed");
+ return -1;
}
return 0;
}
+static int mp_task_fork(struct MultiProcessingPool *pool, struct MultiProcessingTask *task, const char *cmd) {
+ pid_t pid = fork();
+ int child_status = 0;
+ if (pid == -1) {
+ return -1;
+ } else if (pid == 0) {
+ child(pool, task, cmd);
+ }
+ return parent(pool, task, pid, &child_status);
+}
+
struct MultiProcessingTask *mp_task(struct MultiProcessingPool *pool, const char *ident, char *cmd) {
struct MultiProcessingTask *slot = mp_pool_next_available(pool);
if (pool->num_used != pool->num_alloc) {
@@ -212,8 +228,7 @@ int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags) {
char progress[1024] = {0};
if (pid > 0) {
- double percent = ((double) (tasks_complete) / (double) pool->num_used) * 100;
- //double percent = ((double) (tasks_complete) / (double) pool->num_alloc) * 100;
+ double percent = ((double) (tasks_complete + 1) / (double) pool->num_used) * 100;
sprintf(progress, "[%s:%s] [%3.1f%%]", pool->ident, slot->ident, percent);
// The process ended in one the following ways