diff options
author | Joseph Hunkeler <jhunkeler@users.noreply.github.com> | 2024-10-04 08:40:39 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-10-04 08:40:39 -0400 |
commit | d7e3deba72703ad36c497f5becf6772ca00a0d6d (patch) | |
tree | eff3b2ec3dcc31126041529c8e00a714997f2d7b /src/multiprocessing.c | |
parent | 9691ccf51b3efd8113e9620c4afa8b5382d7f161 (diff) | |
parent | f0ba8cd378a460f927644e41f49be95d0e956f81 (diff) | |
download | stasis-d7e3deba72703ad36c497f5becf6772ca00a0d6d.tar.gz |
Merge pull request #46 from jhunkeler/split-delivery-code
Add multiprocessing / Split delivery code
Diffstat (limited to 'src/multiprocessing.c')
-rw-r--r-- | src/multiprocessing.c | 448 |
1 files changed, 448 insertions, 0 deletions
diff --git a/src/multiprocessing.c b/src/multiprocessing.c new file mode 100644 index 0000000..2a1b350 --- /dev/null +++ b/src/multiprocessing.c @@ -0,0 +1,448 @@ +#include "core.h" + +/// The sum of all tasks started by mp_task() +size_t mp_global_task_count = 0; + +static struct MultiProcessingTask *mp_pool_next_available(struct MultiProcessingPool *pool) { + return &pool->task[pool->num_used]; +} + +int child(struct MultiProcessingPool *pool, struct MultiProcessingTask *task) { + FILE *fp_log = NULL; + + // The task starts inside the requested working directory + if (chdir(task->working_dir)) { + perror(task->working_dir); + exit(1); + } + + // Record the task start time + if (clock_gettime(CLOCK_REALTIME, &task->time_data.t_start) < 0) { + perror("clock_gettime"); + exit(1); + } + + // Redirect stdout and stderr to the log file + fflush(stdout); + fflush(stderr); + // Set log file name + sprintf(task->log_file + strlen(task->log_file), "task-%zu-%d.log", mp_global_task_count, task->parent_pid); + fp_log = freopen(task->log_file, "w+", stdout); + if (!fp_log) { + fprintf(stderr, "unable to open '%s' for writing: %s\n", task->log_file, strerror(errno)); + return -1; + } + dup2(fileno(stdout), fileno(stderr)); + + // Generate timestamp for log header + time_t t = time(NULL); + char *timebuf = ctime(&t); + if (timebuf) { + // strip line feed from timestamp + timebuf[strlen(timebuf) ? strlen(timebuf) - 1 : 0] = 0; + } + + // Generate log header + fprintf(fp_log, "# STARTED: %s\n", timebuf ? timebuf : "unknown"); + fprintf(fp_log, "# PID: %d\n", task->parent_pid); + fprintf(fp_log, "# WORKDIR: %s\n", task->working_dir); + fprintf(fp_log, "# COMMAND:\n%s\n", task->cmd); + fprintf(fp_log, "# OUTPUT:\n"); + // Commit header to log file / clean up + fflush(fp_log); + + // Execute task + fflush(stdout); + fflush(stderr); + char *args[] = {"bash", "--norc", task->parent_script, (char *) NULL}; + return execvp("/bin/bash", args); +} + +int parent(struct MultiProcessingPool *pool, struct MultiProcessingTask *task, pid_t pid, int *child_status) { + printf("[%s:%s] Task started (pid: %d)\n", pool->ident, task->ident, pid); + + // Give the child process access to our PID value + task->pid = pid; + task->parent_pid = pid; + + mp_global_task_count++; + + // Check child's status + pid_t code = waitpid(pid, child_status, WUNTRACED | WCONTINUED | WNOHANG); + if (code < 0) { + perror("waitpid failed"); + return -1; + } + return 0; +} + +static int mp_task_fork(struct MultiProcessingPool *pool, struct MultiProcessingTask *task) { + pid_t pid = fork(); + int child_status = 0; + if (pid == -1) { + return -1; + } else if (pid == 0) { + child(pool, task); + } + return parent(pool, task, pid, &child_status); +} + +struct MultiProcessingTask *mp_pool_task(struct MultiProcessingPool *pool, const char *ident, char *working_dir, char *cmd) { + struct MultiProcessingTask *slot = mp_pool_next_available(pool); + if (pool->num_used != pool->num_alloc) { + pool->num_used++; + } else { + fprintf(stderr, "Maximum number of tasks reached\n"); + return NULL; + } + + // Set default status to "error" + slot->status = -1; + + // Set task identifier string + memset(slot->ident, 0, sizeof(slot->ident)); + strncpy(slot->ident, ident, sizeof(slot->ident) - 1); + + // Set log file path + memset(slot->log_file, 0, sizeof(*slot->log_file)); + strcat(slot->log_file, pool->log_root); + strcat(slot->log_file, "/"); + + // Set working directory + if (isempty(working_dir)) { + strcpy(slot->working_dir, "."); + } else { + strncpy(slot->working_dir, working_dir, PATH_MAX - 1); + } + + // Create a temporary file to act as our intermediate command script + FILE *tp = NULL; + char *t_name = NULL; + t_name = xmkstemp(&tp, "w"); + if (!t_name || !tp) { + return NULL; + } + + // Set the script's permissions so that only the calling user can use it + // This should help prevent eavesdropping if keys are applied in plain-text + // somewhere. + chmod(t_name, 0700); + + // Record the script path + memset(slot->parent_script, 0, sizeof(slot->parent_script)); + strncpy(slot->parent_script, t_name, PATH_MAX - 1); + guard_free(t_name); + + // Populate the script + fprintf(tp, "#!/bin/bash\n%s\n", cmd); + fflush(tp); + fclose(tp); + + // Record the command(s) + slot->cmd_len = (strlen(cmd) * sizeof(*cmd)) + 1; + slot->cmd = mmap(NULL, slot->cmd_len, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0); + memset(slot->cmd, 0, slot->cmd_len); + strncpy(slot->cmd, cmd, slot->cmd_len); + + return slot; +} + +static void get_task_duration(struct MultiProcessingTask *task, struct timespec *result) { + // based on the timersub() macro in time.h + // This implementation uses timespec and increases the resolution from microseconds to nanoseconds. + struct timespec *start = &task->time_data.t_start; + struct timespec *stop = &task->time_data.t_stop; + result->tv_sec = (stop->tv_sec - start->tv_sec); + result->tv_nsec = (stop->tv_nsec - start->tv_nsec); + if (result->tv_nsec < 0) { + --result->tv_sec; + result->tv_nsec += 1000000000L; + } +} + +void mp_pool_show_summary(struct MultiProcessingPool *pool) { + print_banner("=", 79); + printf("Pool execution summary for \"%s\"\n", pool->ident); + print_banner("=", 79); + printf("STATUS PID DURATION IDENT\n"); + for (size_t i = 0; i < pool->num_used; i++) { + struct MultiProcessingTask *task = &pool->task[i]; + char status_str[10] = {0}; + if (!task->status && !task->signaled_by) { + strcpy(status_str, "DONE"); + } else if (task->signaled_by) { + strcpy(status_str, "TERM"); + } else { + strcpy(status_str, "FAIL"); + } + + struct timespec duration; + get_task_duration(task, &duration); + long diff = duration.tv_sec + duration.tv_nsec / 1000000000L; + printf("%-4s %10d %7lds %-10s\n", status_str, task->parent_pid, diff, task->ident) ; + } + puts(""); +} + +static int show_log_contents(FILE *stream, struct MultiProcessingTask *task) { + FILE *fp = fopen(task->log_file, "r"); + if (!fp) { + return -1; + } + char buf[BUFSIZ] = {0}; + while ((fgets(buf, sizeof(buf) - 1, fp)) != NULL) { + fprintf(stream, "%s", buf); + memset(buf, 0, sizeof(buf)); + } + fprintf(stream, "\n"); + fclose(fp); + return 0; +} + +int mp_pool_kill(struct MultiProcessingPool *pool, int signum) { + printf("Sending signal %d to pool '%s'\n", signum, pool->ident); + for (size_t i = 0; i < pool->num_used; i++) { + struct MultiProcessingTask *slot = &pool->task[i]; + if (!slot) { + return -1; + } + // Kill tasks in progress + if (slot->pid > 0) { + int status; + printf("Sending signal %d to task '%s' (pid: %d)\n", signum, slot->ident, slot->pid); + status = kill(slot->pid, signum); + if (status && errno != ESRCH) { + fprintf(stderr, "Task '%s' (pid: %d) did not respond: %s\n", slot->ident, slot->pid, strerror(errno)); + } else { + // Wait for process to handle the signal, then set the status accordingly + if (waitpid(slot->pid, &status, 0) >= 0) { + slot->signaled_by = WTERMSIG(status); + // Record the task stop time + if (clock_gettime(CLOCK_REALTIME, &slot->time_data.t_stop) < 0) { + perror("clock_gettime"); + exit(1); + } + // We are short-circuiting the normal flow, and the process is now dead, so mark it as such + slot->pid = MP_POOL_PID_UNUSED; + } + } + } + if (!access(slot->log_file, F_OK)) { + remove(slot->log_file); + } + if (!access(slot->parent_script, F_OK)) { + remove(slot->parent_script); + } + } + return 0; +} + +int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags) { + int status = 0; + int failures = 0; + size_t tasks_complete = 0; + size_t lower_i = 0; + size_t upper_i = jobs; + + do { + size_t hang_check = 0; + if (upper_i >= pool->num_used) { + size_t x = upper_i - pool->num_used; + upper_i -= (size_t) x; + } + + for (size_t i = lower_i; i < upper_i; i++) { + struct MultiProcessingTask *slot = &pool->task[i]; + if (slot->status == -1) { + if (mp_task_fork(pool, slot)) { + fprintf(stderr, "%s: mp_task_fork failed\n", slot->ident); + kill(0, SIGTERM); + } + } + + // Has the child been processed already? + if (slot->pid == MP_POOL_PID_UNUSED) { + // Child is already used up, skip it + hang_check++; + if (hang_check >= pool->num_used) { + // If you join a pool that's already finished it will spin + // forever. This protects the program from entering an + // infinite loop. + fprintf(stderr, "%s is deadlocked\n", pool->ident); + failures++; + goto pool_deadlocked; + } + continue; + } + + // Is the process finished? + pid_t pid = waitpid(slot->pid, &status, WNOHANG | WUNTRACED | WCONTINUED); + int task_ended = WIFEXITED(status); + int task_ended_by_signal = WIFSIGNALED(status); + int task_stopped = WIFSTOPPED(status); + int task_continued = WIFCONTINUED(status); + int status_exit = WEXITSTATUS(status); + int status_signal = WTERMSIG(status); + int status_stopped = WSTOPSIG(status); + + // Update status + slot->status = status_exit; + slot->signaled_by = status_signal; + + char progress[1024] = {0}; + if (pid > 0) { + double percent = ((double) (tasks_complete + 1) / (double) pool->num_used) * 100; + snprintf(progress, sizeof(progress) - 1, "[%s:%s] [%3.1f%%]", pool->ident, slot->ident, percent); + + // The process ended in one the following ways + // Note: SIGSTOP nor SIGCONT will not increment the tasks_complete counter + if (task_stopped) { + printf("%s Task was suspended (%d)\n", progress, status_stopped); + continue; + } else if (task_continued) { + printf("%s Task was resumed\n", progress); + continue; + } else if (task_ended_by_signal) { + printf("%s Task ended by signal %d (%s)\n", progress, status_signal, strsignal(status_signal)); + tasks_complete++; + } else if (task_ended) { + printf("%s Task ended (status: %d)\n", progress, status_exit); + tasks_complete++; + } else { + fprintf(stderr, "%s Task state is unknown (0x%04X)\n", progress, status); + } + + // Show the log (always) + if (show_log_contents(stdout, slot)) { + perror(slot->log_file); + } + + // Record the task stop time + if (clock_gettime(CLOCK_REALTIME, &slot->time_data.t_stop) < 0) { + perror("clock_gettime"); + exit(1); + } + + if (status >> 8 != 0 || (status & 0xff) != 0) { + fprintf(stderr, "%s Task failed\n", progress); + failures++; + + if (flags & MP_POOL_FAIL_FAST && pool->num_used > 1) { + mp_pool_kill(pool, SIGTERM); + return -2; + } + } else { + printf("%s Task finished\n", progress); + } + + // Clean up logs and scripts left behind by the task + if (remove(slot->log_file)) { + fprintf(stderr, "%s Unable to remove log file: '%s': %s\n", progress, slot->parent_script, strerror(errno)); + } + if (remove(slot->parent_script)) { + fprintf(stderr, "%s Unable to remove temporary script '%s': %s\n", progress, slot->parent_script, strerror(errno)); + } + + // Update progress and tell the poller to ignore the PID. The process is gone. + slot->pid = MP_POOL_PID_UNUSED; + } else if (pid < 0) { + fprintf(stderr, "waitpid failed: %s\n", strerror(errno)); + return -1; + } else { + // Track the number of seconds elapsed for each task. + // When a task has executed for longer than status_intervals, print a status update + // _seconds represents the time between intervals, not the total runtime of the task + slot->_seconds = time(NULL) - slot->_now; + if (slot->_seconds > pool->status_interval) { + slot->_now = time(NULL); + slot->_seconds = 0; + } + if (slot->_seconds == 0) { + printf("[%s:%s] Task is running (pid: %d)\n", pool->ident, slot->ident, slot->parent_pid); + } + } + } + + if (tasks_complete == pool->num_used) { + break; + } + + if (tasks_complete == upper_i) { + lower_i += jobs; + upper_i += jobs; + } + + // Poll again after a short delay + sleep(1); + } while (1); + + pool_deadlocked: + puts(""); + return failures; +} + + +struct MultiProcessingPool *mp_pool_init(const char *ident, const char *log_root) { + struct MultiProcessingPool *pool; + + if (!ident || !log_root) { + // Pool must have an ident string + // log_root must be set + return NULL; + } + + // The pool is shared with children + pool = mmap(NULL, sizeof(*pool), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0); + + // Set pool identity string + memset(pool->ident, 0, sizeof(pool->ident)); + strncpy(pool->ident, ident, sizeof(pool->ident) - 1); + + // Set logging base directory + memset(pool->log_root, 0, sizeof(pool->log_root)); + strncpy(pool->log_root, log_root, sizeof(pool->log_root) - 1); + pool->num_used = 0; + pool->num_alloc = MP_POOL_TASK_MAX; + + // Create the log directory + if (mkdirs(log_root, 0700) < 0) { + if (errno != EEXIST) { + perror(log_root); + mp_pool_free(&pool); + return NULL; + } + } + + // Task array is shared with children + pool->task = mmap(NULL, (pool->num_alloc + 1) * sizeof(*pool->task), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0); + if (pool->task == MAP_FAILED) { + perror("mmap"); + mp_pool_free(&pool); + return NULL; + } + + return pool; +} + +void mp_pool_free(struct MultiProcessingPool **pool) { + for (size_t i = 0; i < (*pool)->num_alloc; i++) { + } + // Unmap all pool tasks + if ((*pool)->task) { + if ((*pool)->task->cmd) { + if (munmap((*pool)->task->cmd, (*pool)->task->cmd_len) < 0) { + perror("munmap"); + } + } + if (munmap((*pool)->task, sizeof(*(*pool)->task) * (*pool)->num_alloc) < 0) { + perror("munmap"); + } + } + // Unmap the pool + if ((*pool)) { + if (munmap((*pool), sizeof(*(*pool))) < 0) { + perror("munmap"); + } + (*pool) = NULL; + } +}
\ No newline at end of file |