diff options
author | Joseph Hunkeler <jhunkeler@gmail.com> | 2024-09-13 09:35:34 -0400 |
---|---|---|
committer | Joseph Hunkeler <jhunkeler@gmail.com> | 2024-09-18 23:06:08 -0400 |
commit | b7251ce3bf65bcbec7ecbb98a0eb0b3c9abde507 (patch) | |
tree | 120eac93d20476ed344b2b188f262ec86f1cc38a | |
parent | 9691ccf51b3efd8113e9620c4afa8b5382d7f161 (diff) | |
download | stasis-b7251ce3bf65bcbec7ecbb98a0eb0b3c9abde507.tar.gz |
Move guard_ macros to core_mem.h
* Move core_mem.h below config.h
-rw-r--r-- | include/core.h | 13 | ||||
-rw-r--r-- | include/core_mem.h | 18 | ||||
-rw-r--r-- | include/multiprocessing.h | 120 | ||||
-rw-r--r-- | src/CMakeLists.txt | 1 | ||||
-rw-r--r-- | src/multiprocessing.c | 373 |
5 files changed, 515 insertions, 10 deletions
diff --git a/include/core.h b/include/core.h index ef90e96..614a894 100644 --- a/include/core.h +++ b/include/core.h @@ -21,6 +21,7 @@ #define HTTP_ERROR(X) X >= 400 #include "config.h" +#include "core_mem.h" #include "envctl.h" #include "template.h" #include "utils.h" @@ -42,16 +43,6 @@ #include "github.h" #include "template_func_proto.h" -#define guard_runtime_free(X) do { if (X) { runtime_free(X); X = NULL; } } while (0) -#define guard_strlist_free(X) do { if ((*X)) { strlist_free(X); (*X) = NULL; } } while (0) -#define guard_free(X) do { if (X) { free(X); X = NULL; } } while (0) -#define GENERIC_ARRAY_FREE(ARR) do { \ - for (size_t ARR_I = 0; ARR && ARR[ARR_I] != NULL; ARR_I++) { \ - guard_free(ARR[ARR_I]); \ - } \ - guard_free(ARR); \ -} while (0) - #define COE_CHECK_ABORT(COND, MSG) \ do {\ if (!globals.continue_on_error && COND) { \ @@ -71,6 +62,8 @@ struct STASIS_GLOBAL { bool enable_testing; //!< Enable package testing bool enable_overwrite; //!< Enable release file clobbering bool enable_rewrite_spec_stage_2; //!< Enable automatic @STR@ replacement in output files + long cpu_limit; + long parallel_fail_fast; struct StrList *conda_packages; //!< Conda packages to install after initial activation struct StrList *pip_packages; //!< Pip packages to install after initial activation char *tmpdir; //!< Path to temporary storage directory diff --git a/include/core_mem.h b/include/core_mem.h new file mode 100644 index 0000000..ef07a00 --- /dev/null +++ b/include/core_mem.h @@ -0,0 +1,18 @@ +// +// Created by jhunk on 9/13/24. 
+// + +#ifndef STASIS_CORE_MEM_H +#define STASIS_CORE_MEM_H + +#define guard_runtime_free(X) do { if (X) { runtime_free(X); X = NULL; } } while (0) +#define guard_strlist_free(X) do { if ((*X)) { strlist_free(X); (*X) = NULL; } } while (0) +#define guard_free(X) do { if (X) { free(X); X = NULL; } } while (0) +#define GENERIC_ARRAY_FREE(ARR) do { \ + for (size_t ARR_I = 0; ARR && ARR[ARR_I] != NULL; ARR_I++) { \ + guard_free(ARR[ARR_I]); \ + } \ + guard_free(ARR); \ +} while (0) + +#endif //STASIS_CORE_MEM_H diff --git a/include/multiprocessing.h b/include/multiprocessing.h new file mode 100644 index 0000000..6bcf18e --- /dev/null +++ b/include/multiprocessing.h @@ -0,0 +1,120 @@ +/// @file multiprocessing.h +#ifndef STASIS_MULTIPROCESSING_H +#define STASIS_MULTIPROCESSING_H + +#include <stdio.h> +#include <unistd.h> +#include <sys/wait.h> +#include <errno.h> +#include <stdlib.h> +#include <string.h> +#include <semaphore.h> +#include <sys/mman.h> +#include <time.h> +#include <fcntl.h> +#include <linux/limits.h> +#include <sys/stat.h> +#include <unistd.h> + +struct MultiProcessingTask { + sem_t *gate; ///< Child process startup lock + pid_t pid; ///< Program PID + pid_t parent_pid; ///< Program PID (parent process) + int status; ///< Child process exit status + char ident[NAME_MAX]; ///< Identity of the pool task + char log_file[NAME_MAX]; ///< Path to stdout/stderr log file + char parent_script[PATH_MAX]; ///< Path to temporary script executing the task +}; + +struct MultiProcessingPool { + struct MultiProcessingTask *task; ///< Array of tasks to execute + size_t num_used; ///< Number of tasks populated in the task array + size_t num_alloc; ///< Number of tasks allocated by the task array + const char *ident; ///< Identity of task pool + const char *log_root; ///< Base directory to store stderr/stdout log files +}; + +///!< Maximum number of multiprocessing tasks STASIS can execute +#define MP_POOL_TASK_MAX 1000 + +///!< Value signifies a process is unused or 
finished executing +#define MP_POOL_PID_UNUSED 0 + +// Option flags for mp_pool_join() +#define MP_POOL_FAIL_FAST 1 << 1 + +/** + * Create a multiprocessing pool + * + * ```c + * #include "multiprocessing.h" + * #include "utils.h" // for get_cpu_count() + * + * int main(int argc, char *argv[]) { + * struct MultiProcessingPool *mp; + * mp = mp_pool_init("mypool", "/tmp/mypool_logs"); + * if (mp) { + * char *commands[] = { + * "/bin/echo hello world", + * "/bin/echo world hello", + * NULL + * }; + * for (size_t i = 0; commands[i] != NULL; i++) { + * struct MultiProcessingTask *task; + * char task_name[100]; + * + * sprintf(task_name, "mytask%zu", i); + * task = mp_task(mp, task_name, commands[i]); + * if (!task) { + * // handle task creation error + * } + * } + * if (mp_pool_join(mp, get_cpu_count(), 0)) { + * // handle pool execution error + * } + * mp_pool_free(&mp); + * } else { + * // handle pool initialization error + * } + * } + * ``` + * + * @param ident a name to identify the pool + * @param log_root the path to store program output + * @return pointer to initialized MultiProcessingPool + * @return NULL on error + */ +struct MultiProcessingPool *mp_pool_init(const char *ident, const char *log_root); + +/** + * Create a multiprocessing pool task + * + * @param pool a pointer to MultiProcessingPool + * @param ident a name to identify the task + * @param cmd a command to execute + * @return pointer to MultiProcessingTask structure + * @return NULL on error + */ +struct MultiProcessingTask *mp_task(struct MultiProcessingPool *pool, const char *ident, char *cmd); + +/** + * Execute all tasks in a pool + * + * @param pool a pointer to MultiProcessingPool + * @param jobs the number of processes to spawn at once (for serial execution use `1`) + * @param flags option to be OR'd (MP_POOL_FAIL_FAST) + * @return 0 on success + * @return >0 on failure + * @return <0 on error + */ +int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags); + +/** + * 
Release resources allocated by mp_pool_init() + * + * @param pool a pointer to MultiProcessingPool + */ +void mp_pool_free(struct MultiProcessingPool **pool); + + +#endif //STASIS_MULTIPROCESSING_H diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 2399dc5..5d1c8ff 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -25,6 +25,7 @@ add_library(stasis_core STATIC github.c template_func_proto.c envctl.c + multiprocessing.c ) add_executable(stasis diff --git a/src/multiprocessing.c b/src/multiprocessing.c new file mode 100644 index 0000000..09190d3 --- /dev/null +++ b/src/multiprocessing.c @@ -0,0 +1,373 @@ +#include "core_mem.h" +#include "multiprocessing.h" +#include "utils.h" + +size_t mp_global_task_count = 0; +static struct MultiProcessingTask *mp_pool_next_available(struct MultiProcessingPool *pool) { + return &pool->task[pool->num_used]; +} + +struct MultiProcessingTask *mp_task(struct MultiProcessingPool *pool, const char *ident, char *cmd) { + struct MultiProcessingTask *slot = mp_pool_next_available(pool); + //struct MultiProcessingTask *slot = mp_pool_any_available(pool); + pool->num_used++; + + memset(slot->ident, 0, sizeof(slot->ident)); + strncpy(slot->ident, ident, sizeof(slot->ident) - 1); + + strcat(slot->log_file, pool->log_root); + strcat(slot->log_file, "/"); + + FILE *tp = NULL; + char *t_name; + t_name = xmkstemp(&tp, "w"); + if (!t_name || !tp) { + return NULL; + } + + // Set the script's permissions so that only the calling user can use it + // This should help prevent eavesdropping if keys are applied in plain-text + // somewhere. 
+ chmod(t_name, 0700); + + memset(slot->parent_script, 0, sizeof(slot->parent_script)); + strncpy(slot->parent_script, t_name, PATH_MAX - 1); + + fprintf(tp, "#!/bin/bash\n%s\n", cmd); + fflush(tp); + fclose(tp); + + pid_t pid = fork(); + int child_status = 0; + if (pid == -1) { + return NULL; + } else if (pid == 0) { // child process + char *cwd = NULL; + FILE *fp_log = NULL; + + // Synchronize sub-process startup + // Stop here until summoned by mp_pool_join() + if (sem_wait(slot->gate) < 0) { + perror("sem_wait failed"); + exit(1); + } + + + // Redirect stdout and stderr to the log file + fflush(stdout); + fflush(stderr); + printf("[%s:%s] Task started (pid: %d)\n", pool->ident, slot->ident, slot->parent_pid); + fp_log = freopen(slot->log_file, "w+", stdout); + if (!fp_log) { + fprintf(stderr, "unable to open '%s' for writing: %s\n", slot->log_file, strerror(errno)); + return NULL; + } + dup2(fileno(stdout), fileno(stderr)); + + // Generate timestamp for log header + time_t t = time(NULL); + char *timebuf = ctime(&t); + if (timebuf) { + // strip line feed from timestamp + timebuf[strlen(timebuf) ? strlen(timebuf) - 1 : 0] = 0; + } + + cwd = getcwd(NULL, PATH_MAX); + if (!cwd) { + perror(cwd); + exit(1); + } + + // Generate log header + fprintf(fp_log, "# STARTED: %s\n", timebuf ? 
timebuf : "unknown"); + fprintf(fp_log, "# PID: %d\n", slot->parent_pid); + fprintf(fp_log, "# WORKDIR: %s\n", cwd); + fprintf(fp_log, "# COMMAND:\n%s\n", cmd); + fprintf(fp_log, "# OUTPUT:\n"); + // Commit header to log file / clean up + fflush(fp_log); + guard_free(cwd); + + // Execute task + fflush(stdout); + fflush(stderr); + char *args[] = {"bash", "--norc", t_name, (char *) NULL}; + slot->status = execvp("/bin/bash", args); + } else { // parent process + printf("[%s:%s] Task queued (pid: %d)\n", pool->ident, slot->ident, pid); + + // Give the child process access to our PID value + slot->pid = pid; + slot->parent_pid = pid; + + // Set log file name + sprintf(slot->log_file + strlen(slot->log_file), "task-%zu-%d.log", mp_global_task_count, slot->parent_pid); + mp_global_task_count++; + + // Check child's status + pid_t code = waitpid(pid, &child_status, WUNTRACED | WCONTINUED | WNOHANG); + if (code < 0) { + perror("waitpid failed"); + return NULL; + } + return slot; + } + return NULL; +} + +static int show_log_contents(FILE *stream, struct MultiProcessingTask *task) { + FILE *fp = fopen(task->log_file, "r"); + if (!fp) { + return -1; + } + char buf[BUFSIZ] = {0}; + while ((fgets(buf, sizeof(buf) - 1, fp)) != NULL) { + fprintf(stream, "%s", buf); + memset(buf, 0, sizeof(buf)); + } + fprintf(stream, "\n"); + return 0; +} + +int mp_pool_kill(struct MultiProcessingPool *pool, int signum) { + printf("Sending signal %d to pool '%s'\n", signum, pool->ident); + //for (size_t i = 0; i < pool->num_alloc; i++) { + for (size_t i = 0; i < pool->num_used; i++) { + struct MultiProcessingTask *slot = &pool->task[i]; + if (!slot) { + return -1; + } + if (slot->pid > 0) { + int status; + printf("Sending signal %d to task '%s' (pid: %d)\n", signum, slot->ident, slot->pid); + status = kill(slot->pid, signum); + if (status && errno != ESRCH) { + fprintf(stderr, "Task '%s' (pid: %d) did not respond: %s\n", slot->ident, slot->pid, strerror(errno)); + } + } + if 
(!access(slot->log_file, F_OK)) { + remove(slot->log_file); + } + if (!access(slot->parent_script, F_OK)) { + remove(slot->parent_script); + } + } + return 0; +} + +int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags) { + int status = 0; + int watcher = 0; + int failures = 0; + size_t tasks_complete = 0; + size_t lower_i = 0; + size_t upper_i = jobs; + + do { + //if (upper_i >= pool->num_alloc) { + if (upper_i >= pool->num_used) { + size_t x = upper_i - pool->num_used; + //size_t x = upper_i - pool->num_alloc; + upper_i -= (size_t) x; + } + for (size_t i = lower_i; i < upper_i; i++) { + struct MultiProcessingTask *slot = &pool->task[i]; + // Unlock the semaphore to allow the queued processes to start up + if (sem_post(slot->gate) < 0) { + perror("sem_post failed"); + exit(1); + } + + if (slot->pid == MP_POOL_PID_UNUSED) { + // PID is already used up, skip it + continue; + } + + // Is the process finished? + pid_t pid = waitpid(slot->pid, &status, WNOHANG | WUNTRACED | WCONTINUED); + + // Update status + slot->status = status; + + char progress[1024] = {0}; + if (pid > 0) { + double percent = ((double) (tasks_complete) / (double) pool->num_used) * 100; + //double percent = ((double) (tasks_complete) / (double) pool->num_alloc) * 100; + sprintf(progress, "[%s:%s] [%3.1f%%]", pool->ident, slot->ident, percent); + + // The process ended in one of the following ways + // Note: neither SIGSTOP nor SIGCONT will increment the tasks_complete counter + if (WIFEXITED(status)) { + printf("%s Task finished (status: %d)\n", progress, WEXITSTATUS(status)); + tasks_complete++; + } else if (WIFSIGNALED(status)) { + printf("%s Task ended by signal %d (%s)\n", progress, WTERMSIG(status), strsignal(WTERMSIG(status))); + tasks_complete++; + } else if (WIFSTOPPED(status)) { + printf("%s Task was suspended (%d)\n", progress, WSTOPSIG(status)); + continue; + } else if (WIFCONTINUED(status)) { + printf("%s Task was resumed\n", progress); + continue; + } else { + 
fprintf(stderr, "%s Task state is unknown (0x%04X)\n", progress, status); + } + + // Show the log (always) + if (show_log_contents(stdout, slot)) { + perror(slot->log_file); + } + if (status >> 8 != 0 || (status & 0xff) != 0) { + fprintf(stderr, "%s Task failed\n", progress); + if (flags & MP_POOL_FAIL_FAST) { + mp_pool_kill(pool, SIGTERM); + return -2; + } + } else { + printf("%s Task finished\n", progress); + } + + // Clean up logs and scripts left behind by the task + if (remove(slot->log_file)) { + fprintf(stderr, "%s Unable to remove log file: '%s': %s\n", progress, slot->parent_script, strerror(errno)); + } + if (remove(slot->parent_script)) { + fprintf(stderr, "%s Unable to remove temporary script '%s': %s\n", progress, slot->parent_script, strerror(errno)); + } + + // Update progress and tell the poller to ignore the PID. The process is gone. + slot->pid = MP_POOL_PID_UNUSED; + failures += status; + } else if (pid < 0) { + fprintf(stderr, "waitpid failed: %s\n", strerror(errno)); + return -1; + } else { + if (watcher > 9) { + printf("[%s:%s] Task is running (pid: %d)\n", pool->ident, slot->ident, slot->parent_pid); + watcher = 0; + } else { + watcher++; + } + } + } + + if (tasks_complete == pool->num_used) { + break; + } + + if (tasks_complete == upper_i) { + lower_i += jobs; + upper_i += jobs; + } + + // Poll again after a short delay + sleep(1); + } while (1); + + puts(""); + return failures; +} + + +struct MultiProcessingPool *mp_pool_init(const char *ident, const char *log_root) { + struct MultiProcessingPool *pool; + + if (!log_root) { + // log_root must be set + return NULL; + } + + //pool = malloc(1 * sizeof(*pool)); + pool = mmap(NULL, sizeof(*pool), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS | MAP_POPULATE, -1, 0); + pool->ident = ident; + pool->log_root = log_root; + pool->num_used = 0; + pool->num_alloc = MP_POOL_TASK_MAX; + + if (mkdirs(log_root, 0700) < 0) { + if (errno != EEXIST) { + perror(log_root); + mp_pool_free(&pool); + return 
NULL; + } + } + + //pool->task = calloc(pool->num_alloc + 1, sizeof(*pool->task)); + pool->task = mmap(NULL, (pool->num_alloc + 1) * sizeof(*pool->task), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS | MAP_POPULATE, -1, 0); + if (pool->task == MAP_FAILED) { + perror("mmap"); + mp_pool_free(&pool); + return NULL; + } + + for (size_t i = 0; i < pool->num_alloc; i++) { + struct MultiProcessingTask *slot = &pool->task[i]; + slot->gate = mmap(NULL, sizeof(*slot->gate), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS | MAP_POPULATE, -1, 0); + if (slot->gate == MAP_FAILED) { + perror("mmap failed"); + exit(1); + } + + if (sem_init(slot->gate, 1, 0) < 0) { + perror("sem_init failed"); + exit(1); + } + } + + + return pool; +} + +void mp_pool_free(struct MultiProcessingPool **pool) { + //for (size_t i = 0; i < (*pool)->num_used; i++) { + for (size_t i = 0; i < (*pool)->num_alloc; i++) { + if ((*pool)->task[i].gate) { + if (sem_destroy((*pool)->task[i].gate) < 0) { + perror("sem_destroy failed"); + exit(1); + } + if (munmap((*pool)->task[i].gate, sizeof(*(*pool)->task[i].gate)) < 0) { + perror("munmap"); + exit(1); + } + } + } + if ((*pool)->task) { + if (munmap((*pool)->task, sizeof(*(*pool)->task) * (*pool)->num_alloc) < 0) { + perror("munmap"); + } + } + if ((*pool)) { + if (munmap((*pool), sizeof(*(*pool))) < 0) { + perror("munmap"); + } + (*pool) = NULL; + } +} + +int exmain(int argc, char *argv[]) { + size_t i; + struct MultiProcessingPool *pool = mp_pool_init("generic", "logs"); + if (!pool) { + perror("pool init"); + exit(1); + } + + char *commands[] = { + "sleep 2; dd if=/dev/zero of=file.dat bs=1M count=1", + "/bin/echo hello world; sleep 5", + "python -c 'print(1+1)'", + "(for x in {1..10}; do echo $x; sleep 0.5; done)", + "echo stdout >&1; echo stderr >&2; exit 1" + }; + + for (i = 0; i < sizeof(commands) / sizeof(*commands); i++) { + if (mp_task(pool, "commands array", commands[i]) == NULL) { + printf("Too many tasks queued (max: %d)\n", 
MP_POOL_TASK_MAX); + break; + } + } + mp_pool_join(pool, get_cpu_count() - 1, MP_POOL_FAIL_FAST); + mp_pool_free(&pool); + return 0; +} |