diff options
Diffstat (limited to 'src/multiprocessing.c')
-rw-r--r-- | src/multiprocessing.c | 448 |
1 files changed, 0 insertions, 448 deletions
diff --git a/src/multiprocessing.c b/src/multiprocessing.c deleted file mode 100644 index 2a1b350..0000000 --- a/src/multiprocessing.c +++ /dev/null @@ -1,448 +0,0 @@ -#include "core.h" - -/// The sum of all tasks started by mp_task() -size_t mp_global_task_count = 0; - -static struct MultiProcessingTask *mp_pool_next_available(struct MultiProcessingPool *pool) { - return &pool->task[pool->num_used]; -} - -int child(struct MultiProcessingPool *pool, struct MultiProcessingTask *task) { - FILE *fp_log = NULL; - - // The task starts inside the requested working directory - if (chdir(task->working_dir)) { - perror(task->working_dir); - exit(1); - } - - // Record the task start time - if (clock_gettime(CLOCK_REALTIME, &task->time_data.t_start) < 0) { - perror("clock_gettime"); - exit(1); - } - - // Redirect stdout and stderr to the log file - fflush(stdout); - fflush(stderr); - // Set log file name - sprintf(task->log_file + strlen(task->log_file), "task-%zu-%d.log", mp_global_task_count, task->parent_pid); - fp_log = freopen(task->log_file, "w+", stdout); - if (!fp_log) { - fprintf(stderr, "unable to open '%s' for writing: %s\n", task->log_file, strerror(errno)); - return -1; - } - dup2(fileno(stdout), fileno(stderr)); - - // Generate timestamp for log header - time_t t = time(NULL); - char *timebuf = ctime(&t); - if (timebuf) { - // strip line feed from timestamp - timebuf[strlen(timebuf) ? strlen(timebuf) - 1 : 0] = 0; - } - - // Generate log header - fprintf(fp_log, "# STARTED: %s\n", timebuf ? timebuf : "unknown"); - fprintf(fp_log, "# PID: %d\n", task->parent_pid); - fprintf(fp_log, "# WORKDIR: %s\n", task->working_dir); - fprintf(fp_log, "# COMMAND:\n%s\n", task->cmd); - fprintf(fp_log, "# OUTPUT:\n"); - // Commit header to log file / clean up - fflush(fp_log); - - // Execute task - fflush(stdout); - fflush(stderr); - char *args[] = {"bash", "--norc", task->parent_script, (char *) NULL}; - return execvp("/bin/bash", args); -} - -int parent(struct MultiProcessingPool *pool, struct MultiProcessingTask *task, pid_t pid, int *child_status) { - printf("[%s:%s] Task started (pid: %d)\n", pool->ident, task->ident, pid); - - // Give the child process access to our PID value - task->pid = pid; - task->parent_pid = pid; - - mp_global_task_count++; - - // Check child's status - pid_t code = waitpid(pid, child_status, WUNTRACED | WCONTINUED | WNOHANG); - if (code < 0) { - perror("waitpid failed"); - return -1; - } - return 0; -} - -static int mp_task_fork(struct MultiProcessingPool *pool, struct MultiProcessingTask *task) { - pid_t pid = fork(); - int child_status = 0; - if (pid == -1) { - return -1; - } else if (pid == 0) { - child(pool, task); - } - return parent(pool, task, pid, &child_status); -} - -struct MultiProcessingTask *mp_pool_task(struct MultiProcessingPool *pool, const char *ident, char *working_dir, char *cmd) { - struct MultiProcessingTask *slot = mp_pool_next_available(pool); - if (pool->num_used != pool->num_alloc) { - pool->num_used++; - } else { - fprintf(stderr, "Maximum number of tasks reached\n"); - return NULL; - } - - // Set default status to "error" - slot->status = -1; - - // Set task identifier string - memset(slot->ident, 0, sizeof(slot->ident)); - strncpy(slot->ident, ident, sizeof(slot->ident) - 1); - - // Set log file path - memset(slot->log_file, 0, sizeof(*slot->log_file)); - strcat(slot->log_file, pool->log_root); - strcat(slot->log_file, "/"); - - // Set working directory - if (isempty(working_dir)) { - strcpy(slot->working_dir, "."); - } else { - strncpy(slot->working_dir, working_dir, PATH_MAX - 1); - } - - // Create a temporary file to act as our intermediate command script - FILE *tp = NULL; - char *t_name = NULL; - t_name = xmkstemp(&tp, "w"); - if (!t_name || !tp) { - return NULL; - } - - // Set the script's permissions so that only the calling user can use it - // This should help prevent eavesdropping if keys are applied in plain-text - // somewhere. - chmod(t_name, 0700); - - // Record the script path - memset(slot->parent_script, 0, sizeof(slot->parent_script)); - strncpy(slot->parent_script, t_name, PATH_MAX - 1); - guard_free(t_name); - - // Populate the script - fprintf(tp, "#!/bin/bash\n%s\n", cmd); - fflush(tp); - fclose(tp); - - // Record the command(s) - slot->cmd_len = (strlen(cmd) * sizeof(*cmd)) + 1; - slot->cmd = mmap(NULL, slot->cmd_len, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0); - memset(slot->cmd, 0, slot->cmd_len); - strncpy(slot->cmd, cmd, slot->cmd_len); - - return slot; -} - -static void get_task_duration(struct MultiProcessingTask *task, struct timespec *result) { - // based on the timersub() macro in time.h - // This implementation uses timespec and increases the resolution from microseconds to nanoseconds. - struct timespec *start = &task->time_data.t_start; - struct timespec *stop = &task->time_data.t_stop; - result->tv_sec = (stop->tv_sec - start->tv_sec); - result->tv_nsec = (stop->tv_nsec - start->tv_nsec); - if (result->tv_nsec < 0) { - --result->tv_sec; - result->tv_nsec += 1000000000L; - } -} - -void mp_pool_show_summary(struct MultiProcessingPool *pool) { - print_banner("=", 79); - printf("Pool execution summary for \"%s\"\n", pool->ident); - print_banner("=", 79); - printf("STATUS PID DURATION IDENT\n"); - for (size_t i = 0; i < pool->num_used; i++) { - struct MultiProcessingTask *task = &pool->task[i]; - char status_str[10] = {0}; - if (!task->status && !task->signaled_by) { - strcpy(status_str, "DONE"); - } else if (task->signaled_by) { - strcpy(status_str, "TERM"); - } else { - strcpy(status_str, "FAIL"); - } - - struct timespec duration; - get_task_duration(task, &duration); - long diff = duration.tv_sec + duration.tv_nsec / 1000000000L; - printf("%-4s %10d %7lds %-10s\n", status_str, task->parent_pid, diff, task->ident) ; - } - puts(""); -} - -static int show_log_contents(FILE *stream, struct MultiProcessingTask *task) { - FILE *fp = fopen(task->log_file, "r"); - if (!fp) { - return -1; - } - char buf[BUFSIZ] = {0}; - while ((fgets(buf, sizeof(buf) - 1, fp)) != NULL) { - fprintf(stream, "%s", buf); - memset(buf, 0, sizeof(buf)); - } - fprintf(stream, "\n"); - fclose(fp); - return 0; -} - -int mp_pool_kill(struct MultiProcessingPool *pool, int signum) { - printf("Sending signal %d to pool '%s'\n", signum, pool->ident); - for (size_t i = 0; i < pool->num_used; i++) { - struct MultiProcessingTask *slot = &pool->task[i]; - if (!slot) { - return -1; - } - // Kill tasks in progress - if (slot->pid > 0) { - int status; - printf("Sending signal %d to task '%s' (pid: %d)\n", signum, slot->ident, slot->pid); - status = kill(slot->pid, signum); - if (status && errno != ESRCH) { - fprintf(stderr, "Task '%s' (pid: %d) did not respond: %s\n", slot->ident, slot->pid, strerror(errno)); - } else { - // Wait for process to handle the signal, then set the status accordingly - if (waitpid(slot->pid, &status, 0) >= 0) { - slot->signaled_by = WTERMSIG(status); - // Record the task stop time - if (clock_gettime(CLOCK_REALTIME, &slot->time_data.t_stop) < 0) { - perror("clock_gettime"); - exit(1); - } - // We are short-circuiting the normal flow, and the process is now dead, so mark it as such - slot->pid = MP_POOL_PID_UNUSED; - } - } - } - if (!access(slot->log_file, F_OK)) { - remove(slot->log_file); - } - if (!access(slot->parent_script, F_OK)) { - remove(slot->parent_script); - } - } - return 0; -} - -int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags) { - int status = 0; - int failures = 0; - size_t tasks_complete = 0; - size_t lower_i = 0; - size_t upper_i = jobs; - - do { - size_t hang_check = 0; - if (upper_i >= pool->num_used) { - size_t x = upper_i - pool->num_used; - upper_i -= (size_t) x; - } - - for (size_t i = lower_i; i < upper_i; i++) { - struct MultiProcessingTask *slot = &pool->task[i]; - if (slot->status == -1) { - if (mp_task_fork(pool, slot)) { - fprintf(stderr, "%s: mp_task_fork failed\n", slot->ident); - kill(0, SIGTERM); - } - } - - // Has the child been processed already? - if (slot->pid == MP_POOL_PID_UNUSED) { - // Child is already used up, skip it - hang_check++; - if (hang_check >= pool->num_used) { - // If you join a pool that's already finished it will spin - // forever. This protects the program from entering an - // infinite loop. - fprintf(stderr, "%s is deadlocked\n", pool->ident); - failures++; - goto pool_deadlocked; - } - continue; - } - - // Is the process finished? - pid_t pid = waitpid(slot->pid, &status, WNOHANG | WUNTRACED | WCONTINUED); - int task_ended = WIFEXITED(status); - int task_ended_by_signal = WIFSIGNALED(status); - int task_stopped = WIFSTOPPED(status); - int task_continued = WIFCONTINUED(status); - int status_exit = WEXITSTATUS(status); - int status_signal = WTERMSIG(status); - int status_stopped = WSTOPSIG(status); - - // Update status - slot->status = status_exit; - slot->signaled_by = status_signal; - - char progress[1024] = {0}; - if (pid > 0) { - double percent = ((double) (tasks_complete + 1) / (double) pool->num_used) * 100; - snprintf(progress, sizeof(progress) - 1, "[%s:%s] [%3.1f%%]", pool->ident, slot->ident, percent); - - // The process ended in one the following ways - // Note: SIGSTOP nor SIGCONT will not increment the tasks_complete counter - if (task_stopped) { - printf("%s Task was suspended (%d)\n", progress, status_stopped); - continue; - } else if (task_continued) { - printf("%s Task was resumed\n", progress); - continue; - } else if (task_ended_by_signal) { - printf("%s Task ended by signal %d (%s)\n", progress, status_signal, strsignal(status_signal)); - tasks_complete++; - } else if (task_ended) { - printf("%s Task ended (status: %d)\n", progress, status_exit); - tasks_complete++; - } else { - fprintf(stderr, "%s Task state is unknown (0x%04X)\n", progress, status); - } - - // Show the log (always) - if (show_log_contents(stdout, slot)) { - perror(slot->log_file); - } - - // Record the task stop time - if (clock_gettime(CLOCK_REALTIME, &slot->time_data.t_stop) < 0) { - perror("clock_gettime"); - exit(1); - } - - if (status >> 8 != 0 || (status & 0xff) != 0) { - fprintf(stderr, "%s Task failed\n", progress); - failures++; - - if (flags & MP_POOL_FAIL_FAST && pool->num_used > 1) { - mp_pool_kill(pool, SIGTERM); - return -2; - } - } else { - printf("%s Task finished\n", progress); - } - - // Clean up logs and scripts left behind by the task - if (remove(slot->log_file)) { - fprintf(stderr, "%s Unable to remove log file: '%s': %s\n", progress, slot->parent_script, strerror(errno)); - } - if (remove(slot->parent_script)) { - fprintf(stderr, "%s Unable to remove temporary script '%s': %s\n", progress, slot->parent_script, strerror(errno)); - } - - // Update progress and tell the poller to ignore the PID. The process is gone. - slot->pid = MP_POOL_PID_UNUSED; - } else if (pid < 0) { - fprintf(stderr, "waitpid failed: %s\n", strerror(errno)); - return -1; - } else { - // Track the number of seconds elapsed for each task. - // When a task has executed for longer than status_intervals, print a status update - // _seconds represents the time between intervals, not the total runtime of the task - slot->_seconds = time(NULL) - slot->_now; - if (slot->_seconds > pool->status_interval) { - slot->_now = time(NULL); - slot->_seconds = 0; - } - if (slot->_seconds == 0) { - printf("[%s:%s] Task is running (pid: %d)\n", pool->ident, slot->ident, slot->parent_pid); - } - } - } - - if (tasks_complete == pool->num_used) { - break; - } - - if (tasks_complete == upper_i) { - lower_i += jobs; - upper_i += jobs; - } - - // Poll again after a short delay - sleep(1); - } while (1); - - pool_deadlocked: - puts(""); - return failures; -} - - -struct MultiProcessingPool *mp_pool_init(const char *ident, const char *log_root) { - struct MultiProcessingPool *pool; - - if (!ident || !log_root) { - // Pool must have an ident string - // log_root must be set - return NULL; - } - - // The pool is shared with children - pool = mmap(NULL, sizeof(*pool), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0); - - // Set pool identity string - memset(pool->ident, 0, sizeof(pool->ident)); - strncpy(pool->ident, ident, sizeof(pool->ident) - 1); - - // Set logging base directory - memset(pool->log_root, 0, sizeof(pool->log_root)); - strncpy(pool->log_root, log_root, sizeof(pool->log_root) - 1); - pool->num_used = 0; - pool->num_alloc = MP_POOL_TASK_MAX; - - // Create the log directory - if (mkdirs(log_root, 0700) < 0) { - if (errno != EEXIST) { - perror(log_root); - mp_pool_free(&pool); - return NULL; - } - } - - // Task array is shared with children - pool->task = mmap(NULL, (pool->num_alloc + 1) * sizeof(*pool->task), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0); - if (pool->task == MAP_FAILED) { - perror("mmap"); - mp_pool_free(&pool); - return NULL; - } - - return pool; -} - -void mp_pool_free(struct MultiProcessingPool **pool) { - for (size_t i = 0; i < (*pool)->num_alloc; i++) { - } - // Unmap all pool tasks - if ((*pool)->task) { - if ((*pool)->task->cmd) { - if (munmap((*pool)->task->cmd, (*pool)->task->cmd_len) < 0) { - perror("munmap"); - } - } - if (munmap((*pool)->task, sizeof(*(*pool)->task) * (*pool)->num_alloc) < 0) { - perror("munmap"); - } - } - // Unmap the pool - if ((*pool)) { - if (munmap((*pool), sizeof(*(*pool))) < 0) { - perror("munmap"); - } - (*pool) = NULL; - } -}
\ No newline at end of file |