aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoseph Hunkeler <jhunkeler@gmail.com>2024-09-13 09:35:34 -0400
committerJoseph Hunkeler <jhunkeler@gmail.com>2024-09-18 23:06:08 -0400
commitb7251ce3bf65bcbec7ecbb98a0eb0b3c9abde507 (patch)
tree120eac93d20476ed344b2b188f262ec86f1cc38a
parent9691ccf51b3efd8113e9620c4afa8b5382d7f161 (diff)
downloadstasis-b7251ce3bf65bcbec7ecbb98a0eb0b3c9abde507.tar.gz
Move guard_ macros to core_mem.h
* Move core_mem.h below config.h
-rw-r--r--include/core.h13
-rw-r--r--include/core_mem.h18
-rw-r--r--include/multiprocessing.h120
-rw-r--r--src/CMakeLists.txt1
-rw-r--r--src/multiprocessing.c373
5 files changed, 515 insertions, 10 deletions
diff --git a/include/core.h b/include/core.h
index ef90e96..614a894 100644
--- a/include/core.h
+++ b/include/core.h
@@ -21,6 +21,7 @@
#define HTTP_ERROR(X) X >= 400
#include "config.h"
+#include "core_mem.h"
#include "envctl.h"
#include "template.h"
#include "utils.h"
@@ -42,16 +43,6 @@
#include "github.h"
#include "template_func_proto.h"
-#define guard_runtime_free(X) do { if (X) { runtime_free(X); X = NULL; } } while (0)
-#define guard_strlist_free(X) do { if ((*X)) { strlist_free(X); (*X) = NULL; } } while (0)
-#define guard_free(X) do { if (X) { free(X); X = NULL; } } while (0)
-#define GENERIC_ARRAY_FREE(ARR) do { \
- for (size_t ARR_I = 0; ARR && ARR[ARR_I] != NULL; ARR_I++) { \
- guard_free(ARR[ARR_I]); \
- } \
- guard_free(ARR); \
-} while (0)
-
#define COE_CHECK_ABORT(COND, MSG) \
do {\
if (!globals.continue_on_error && COND) { \
@@ -71,6 +62,8 @@ struct STASIS_GLOBAL {
bool enable_testing; //!< Enable package testing
bool enable_overwrite; //!< Enable release file clobbering
bool enable_rewrite_spec_stage_2; //!< Enable automatic @STR@ replacement in output files
+ long cpu_limit;
+ long parallel_fail_fast;
struct StrList *conda_packages; //!< Conda packages to install after initial activation
struct StrList *pip_packages; //!< Pip packages to install after initial activation
char *tmpdir; //!< Path to temporary storage directory
diff --git a/include/core_mem.h b/include/core_mem.h
new file mode 100644
index 0000000..ef07a00
--- /dev/null
+++ b/include/core_mem.h
@@ -0,0 +1,18 @@
+//
+// Created by jhunk on 9/13/24.
+//
+
+#ifndef STASIS_CORE_MEM_H
+#define STASIS_CORE_MEM_H
+
+#define guard_runtime_free(X) do { if (X) { runtime_free(X); X = NULL; } } while (0)
+#define guard_strlist_free(X) do { if ((*X)) { strlist_free(X); (*X) = NULL; } } while (0)
+#define guard_free(X) do { if (X) { free(X); X = NULL; } } while (0)
+#define GENERIC_ARRAY_FREE(ARR) do { \
+ for (size_t ARR_I = 0; ARR && ARR[ARR_I] != NULL; ARR_I++) { \
+ guard_free(ARR[ARR_I]); \
+ } \
+ guard_free(ARR); \
+} while (0)
+
+#endif //STASIS_CORE_MEM_H
diff --git a/include/multiprocessing.h b/include/multiprocessing.h
new file mode 100644
index 0000000..6bcf18e
--- /dev/null
+++ b/include/multiprocessing.h
@@ -0,0 +1,120 @@
+/// @file multiprocessing.h
+#ifndef STASIS_MULTIPROCESSING_H
+#define STASIS_MULTIPROCESSING_H
+
+#include <stdio.h>
+#include <unistd.h>
+#include <sys/wait.h>
+#include <errno.h>
+#include <stdlib.h>
+#include <string.h>
+#include <semaphore.h>
+#include <sys/mman.h>
+#include <time.h>
+#include <fcntl.h>
+#include <linux/limits.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
+struct MultiProcessingTask {
+ sem_t *gate; ///< Child process startup lock
+ pid_t pid; ///< Program PID
+ pid_t parent_pid; ///< Program PID (parent process)
+ int status; ///< Child process exit status
+ char ident[NAME_MAX]; ///< Identity of the pool task
+ char log_file[NAME_MAX]; ///< Path to stdout/stderr log file
+ char parent_script[PATH_MAX]; ///< Path to temporary script executing the task
+};
+
+struct MultiProcessingPool {
+ struct MultiProcessingTask *task; ///< Array of tasks to execute
+ size_t num_used; ///< Number of tasks populated in the task array
+ size_t num_alloc; ///< Number of tasks allocated by the task array
+ const char *ident; ///< Identity of task pool
+ const char *log_root; ///< Base directory to store stderr/stdout log files
+};
+
+///!< Maximum number of multiprocessing tasks STASIS can execute
+#define MP_POOL_TASK_MAX 1000
+
+///!< Value signifies a process is unused or finished executing
+#define MP_POOL_PID_UNUSED 0
+
+// Option flags for mp_pool_join()
+#define MP_POOL_FAIL_FAST 1 << 1
+
+/**
+ * Create a multiprocessing pool
+ *
+ * ```c
+ * #include "multiprocessing.h"
+ * #include "utils.h" // for get_cpu_count()
+ *
+ * int main(int argc, char *argv[]) {
+ * struct MultiProcessingPool *mp;
+ * mp = mp_pool_init("mypool", "/tmp/mypool_logs");
+ * if (mp) {
+ * char *commands[] = {
+ * "/bin/echo hello world",
+ * "/bin/echo world hello",
+ * NULL
+ * }
+ * for (size_t i = 0; commands[i] != NULL); i++) {
+ * struct MultiProcessingTask *task;
+ * char task_name[100];
+ *
+ * sprintf(task_name, "mytask%zu", i);
+ * task = mp_task(mp, task_name, commands[i]);
+ * if (!task) {
+ * // handle task creation error
+ * }
+ * }
+ * if (mp_pool_join(mp, get_cpu_count())) {
+ * // handle pool execution error
+ * }
+ * mp_pool_free(&mp);
+ * } else {
+ * // handle pool initialization error
+ * }
+ * }
+ * ```
+ *
+ * @param ident a name to identify the pool
+ * @param log_root the path to store program output
+ * @return pointer to initialized MultiProcessingPool
+ * @return NULL on error
+ */
+struct MultiProcessingPool *mp_pool_init(const char *ident, const char *log_root);
+
+/**
+ * Create a multiprocessing pool task
+ *
+ * @param pool a pointer to MultiProcessingPool
+ * @param ident a name to identify the task
+ * @param cmd a command to execute
+ * @return pointer to MultiProcessingTask structure
+ * @return NULL on error
+ */
+struct MultiProcessingTask *mp_task(struct MultiProcessingPool *pool, const char *ident, char *cmd);
+
+/**
+ * Execute all tasks in a pool
+ *
+ * @param pool a pointer to MultiProcessingPool
+ * @param jobs the number of processes to spawn at once (for serial execution use `1`)
+ * @param flags option to be OR'd (MP_POOL_FAIL_FAST)
+ * @return 0 on success
+ * @return >0 on failure
+ * @return <0 on error
+ */
+int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags);
+
+/**
+ * Release resources allocated by mp_pool_init()
+ *
+ * @param a pointer to MultiProcessingPool
+ */
+void mp_pool_free(struct MultiProcessingPool **pool);
+
+
+#endif //STASIS_MULTIPROCESSING_H
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 2399dc5..5d1c8ff 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -25,6 +25,7 @@ add_library(stasis_core STATIC
github.c
template_func_proto.c
envctl.c
+ multiprocessing.c
)
add_executable(stasis
diff --git a/src/multiprocessing.c b/src/multiprocessing.c
new file mode 100644
index 0000000..09190d3
--- /dev/null
+++ b/src/multiprocessing.c
@@ -0,0 +1,373 @@
+#include "core_mem.h"
+#include "multiprocessing.h"
+#include "utils.h"
+
+size_t mp_global_task_count = 0;
+static struct MultiProcessingTask *mp_pool_next_available(struct MultiProcessingPool *pool) {
+ return &pool->task[pool->num_used];
+}
+
+struct MultiProcessingTask *mp_task(struct MultiProcessingPool *pool, const char *ident, char *cmd) {
+ struct MultiProcessingTask *slot = mp_pool_next_available(pool);
+ //struct MultiProcessingTask *slot = mp_pool_any_available(pool);
+ pool->num_used++;
+
+ memset(slot->ident, 0, sizeof(slot->ident));
+ strncpy(slot->ident, ident, sizeof(slot->ident) - 1);
+
+ strcat(slot->log_file, pool->log_root);
+ strcat(slot->log_file, "/");
+
+ FILE *tp = NULL;
+ char *t_name;
+ t_name = xmkstemp(&tp, "w");
+ if (!t_name || !tp) {
+ return NULL;
+ }
+
+ // Set the script's permissions so that only the calling user can use it
+ // This should help prevent eavesdropping if keys are applied in plain-text
+ // somewhere.
+ chmod(t_name, 0700);
+
+ memset(slot->parent_script, 0, sizeof(slot->parent_script));
+ strncpy(slot->parent_script, t_name, PATH_MAX - 1);
+
+ fprintf(tp, "#!/bin/bash\n%s\n", cmd);
+ fflush(tp);
+ fclose(tp);
+
+ pid_t pid = fork();
+ int child_status = 0;
+ if (pid == -1) {
+ return NULL;
+ } else if (pid == 0) { // child process
+ char *cwd = NULL;
+ FILE *fp_log = NULL;
+
+ // Synchronize sub-process startup
+ // Stop here until summoned by mp_pool_join()
+ if (sem_wait(slot->gate) < 0) {
+ perror("sem_wait failed");
+ exit(1);
+ }
+
+
+ // Redirect stdout and stderr to the log file
+ fflush(stdout);
+ fflush(stderr);
+ printf("[%s:%s] Task started (pid: %d)\n", pool->ident, slot->ident, slot->parent_pid);
+ fp_log = freopen(slot->log_file, "w+", stdout);
+ if (!fp_log) {
+ fprintf(stderr, "unable to open '%s' for writing: %s\n", slot->log_file, strerror(errno));
+ return NULL;
+ }
+ dup2(fileno(stdout), fileno(stderr));
+
+ // Generate timestamp for log header
+ time_t t = time(NULL);
+ char *timebuf = ctime(&t);
+ if (timebuf) {
+ // strip line feed from timestamp
+ timebuf[strlen(timebuf) ? strlen(timebuf) - 1 : 0] = 0;
+ }
+
+ cwd = getcwd(NULL, PATH_MAX);
+ if (!cwd) {
+ perror(cwd);
+ exit(1);
+ }
+
+ // Generate log header
+ fprintf(fp_log, "# STARTED: %s\n", timebuf ? timebuf : "unknown");
+ fprintf(fp_log, "# PID: %d\n", slot->parent_pid);
+ fprintf(fp_log, "# WORKDIR: %s\n", cwd);
+ fprintf(fp_log, "# COMMAND:\n%s\n", cmd);
+ fprintf(fp_log, "# OUTPUT:\n");
+ // Commit header to log file / clean up
+ fflush(fp_log);
+ guard_free(cwd);
+
+ // Execute task
+ fflush(stdout);
+ fflush(stderr);
+ char *args[] = {"bash", "--norc", t_name, (char *) NULL};
+ slot->status = execvp("/bin/bash", args);
+ } else { // parent process
+ printf("[%s:%s] Task queued (pid: %d)\n", pool->ident, slot->ident, pid);
+
+ // Give the child process access to our PID value
+ slot->pid = pid;
+ slot->parent_pid = pid;
+
+ // Set log file name
+ sprintf(slot->log_file + strlen(slot->log_file), "task-%zu-%d.log", mp_global_task_count, slot->parent_pid);
+ mp_global_task_count++;
+
+ // Check child's status
+ pid_t code = waitpid(pid, &child_status, WUNTRACED | WCONTINUED | WNOHANG);
+ if (code < 0) {
+ perror("waitpid failed");
+ return NULL;
+ }
+ return slot;
+ }
+ return NULL;
+}
+
+static int show_log_contents(FILE *stream, struct MultiProcessingTask *task) {
+ FILE *fp = fopen(task->log_file, "r");
+ if (!fp) {
+ return -1;
+ }
+ char buf[BUFSIZ] = {0};
+ while ((fgets(buf, sizeof(buf) - 1, fp)) != NULL) {
+ fprintf(stream, "%s", buf);
+ memset(buf, 0, sizeof(buf));
+ }
+ fprintf(stream, "\n");
+ return 0;
+}
+
+int mp_pool_kill(struct MultiProcessingPool *pool, int signum) {
+ printf("Sending signal %d to pool '%s'\n", signum, pool->ident);
+ //for (size_t i = 0; i < pool->num_alloc; i++) {
+ for (size_t i = 0; i < pool->num_used; i++) {
+ struct MultiProcessingTask *slot = &pool->task[i];
+ if (!slot) {
+ return -1;
+ }
+ if (slot->pid > 0) {
+ int status;
+ printf("Sending signal %d to task '%s' (pid: %d)\n", signum, slot->ident, slot->pid);
+ status = kill(slot->pid, signum);
+ if (status && errno != ESRCH) {
+ fprintf(stderr, "Task '%s' (pid: %d) did not respond: %s\n", slot->ident, slot->pid, strerror(errno));
+ }
+ }
+ if (!access(slot->log_file, F_OK)) {
+ remove(slot->log_file);
+ }
+ if (!access(slot->parent_script, F_OK)) {
+ remove(slot->parent_script);
+ }
+ }
+ return 0;
+}
+
+int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags) {
+ int status = 0;
+ int watcher = 0;
+ int failures = 0;
+ size_t tasks_complete = 0;
+ size_t lower_i = 0;
+ size_t upper_i = jobs;
+
+ do {
+ //if (upper_i >= pool->num_alloc) {
+ if (upper_i >= pool->num_used) {
+ size_t x = upper_i - pool->num_used;
+ //size_t x = upper_i - pool->num_alloc;
+ upper_i -= (size_t) x;
+ }
+ for (size_t i = lower_i; i < upper_i; i++) {
+ struct MultiProcessingTask *slot = &pool->task[i];
+ // Unlock the semaphore to allow the queued processes to start up
+ if (sem_post(slot->gate) < 0) {
+ perror("sem_post failed");
+ exit(1);
+ }
+
+ if (slot->pid == MP_POOL_PID_UNUSED) {
+ // PID is already used up, skip it
+ continue;
+ }
+
+ // Is the process finished?
+ pid_t pid = waitpid(slot->pid, &status, WNOHANG | WUNTRACED | WCONTINUED);
+
+ // Update status
+ slot->status = status;
+
+ char progress[1024] = {0};
+ if (pid > 0) {
+ double percent = ((double) (tasks_complete) / (double) pool->num_used) * 100;
+ //double percent = ((double) (tasks_complete) / (double) pool->num_alloc) * 100;
+ sprintf(progress, "[%s:%s] [%3.1f%%]", pool->ident, slot->ident, percent);
+
+ // The process ended in one the following ways
+ // Note: SIGSTOP nor SIGCONT will not increment the tasks_complete counter
+ if (WIFEXITED(status)) {
+ printf("%s Task finished (status: %d)\n", progress, WEXITSTATUS(status));
+ tasks_complete++;
+ } else if (WIFSIGNALED(status)) {
+ printf("%s Task ended by signal %d (%s)\n", progress, WTERMSIG(status), strsignal(WTERMSIG(status)));
+ tasks_complete++;
+ } else if (WIFSTOPPED(status)) {
+ printf("%s Task was suspended (%d)\n", progress, WSTOPSIG(status));
+ continue;
+ } else if (WIFCONTINUED(status)) {
+ printf("%s Task was resumed\n", progress);
+ continue;
+ } else {
+ fprintf(stderr, "%s Task state is unknown (0x%04X)\n", progress, status);
+ }
+
+ // Show the log (always)
+ if (show_log_contents(stdout, slot)) {
+ perror(slot->log_file);
+ }
+ if (status >> 8 != 0 || (status & 0xff) != 0) {
+ fprintf(stderr, "%s Task failed\n", progress);
+ if (flags & MP_POOL_FAIL_FAST) {
+ mp_pool_kill(pool, SIGTERM);
+ return -2;
+ }
+ } else {
+ printf("%s Task finished\n", progress);
+ }
+
+ // Clean up logs and scripts left behind by the task
+ if (remove(slot->log_file)) {
+ fprintf(stderr, "%s Unable to remove log file: '%s': %s\n", progress, slot->parent_script, strerror(errno));
+ }
+ if (remove(slot->parent_script)) {
+ fprintf(stderr, "%s Unable to remove temporary script '%s': %s\n", progress, slot->parent_script, strerror(errno));
+ }
+
+ // Update progress and tell the poller to ignore the PID. The process is gone.
+ slot->pid = MP_POOL_PID_UNUSED;
+ failures += status;
+ } else if (pid < 0) {
+ fprintf(stderr, "waitpid failed: %s\n", strerror(errno));
+ return -1;
+ } else {
+ if (watcher > 9) {
+ printf("[%s:%s] Task is running (pid: %d)\n", pool->ident, slot->ident, slot->parent_pid);
+ watcher = 0;
+ } else {
+ watcher++;
+ }
+ }
+ }
+
+ if (tasks_complete == pool->num_used) {
+ break;
+ }
+
+ if (tasks_complete == upper_i) {
+ lower_i += jobs;
+ upper_i += jobs;
+ }
+
+ // Poll again after a short delay
+ sleep(1);
+ } while (1);
+
+ puts("");
+ return failures;
+}
+
+
+struct MultiProcessingPool *mp_pool_init(const char *ident, const char *log_root) {
+ struct MultiProcessingPool *pool;
+
+ if (!log_root) {
+ // log_root must be set
+ return NULL;
+ }
+
+ //pool = malloc(1 * sizeof(*pool));
+ pool = mmap(NULL, sizeof(*pool), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS | MAP_POPULATE, -1, 0);
+ pool->ident = ident;
+ pool->log_root = log_root;
+ pool->num_used = 0;
+ pool->num_alloc = MP_POOL_TASK_MAX;
+
+ if (mkdirs(log_root, 0700) < 0) {
+ if (errno != EEXIST) {
+ perror(log_root);
+ mp_pool_free(&pool);
+ return NULL;
+ }
+ }
+
+ //pool->task = calloc(pool->num_alloc + 1, sizeof(*pool->task));
+ pool->task = mmap(NULL, (pool->num_alloc + 1) * sizeof(*pool->task), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS | MAP_POPULATE, -1, 0);
+ if (pool->task == MAP_FAILED) {
+ perror("mmap");
+ mp_pool_free(&pool);
+ return NULL;
+ }
+
+ for (size_t i = 0; i < pool->num_alloc; i++) {
+ struct MultiProcessingTask *slot = &pool->task[i];
+ slot->gate = mmap(NULL, sizeof(*slot->gate), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS | MAP_POPULATE, -1, 0);
+ if (slot->gate == MAP_FAILED) {
+ perror("mmap failed");
+ exit(1);
+ }
+
+ if (sem_init(slot->gate, 1, 0) < 0) {
+ perror("sem_init failed");
+ exit(1);
+ }
+ }
+
+
+ return pool;
+}
+
+void mp_pool_free(struct MultiProcessingPool **pool) {
+ //for (size_t i = 0; i < (*pool)->num_used; i++) {
+ for (size_t i = 0; i < (*pool)->num_alloc; i++) {
+ if ((*pool)->task[i].gate) {
+ if (sem_destroy((*pool)->task[i].gate) < 0) {
+ perror("sem_destroy failed");
+ exit(1);
+ }
+ if (munmap((*pool)->task[i].gate, sizeof(*(*pool)->task[i].gate)) < 0) {
+ perror("munmap");
+ exit(1);
+ }
+ }
+ }
+ if ((*pool)->task) {
+ if (munmap((*pool)->task, sizeof(*(*pool)->task) * (*pool)->num_alloc) < 0) {
+ perror("munmap");
+ }
+ }
+ if ((*pool)) {
+ if (munmap((*pool), sizeof(*(*pool))) < 0) {
+ perror("munmap");
+ }
+ (*pool) = NULL;
+ }
+}
+
+int exmain(int argc, char *argv[]) {
+ size_t i;
+ struct MultiProcessingPool *pool = mp_pool_init("generic", "logs");
+ if (!pool) {
+ perror("pool init");
+ exit(1);
+ }
+
+ char *commands[] = {
+ "sleep 2; dd if=/dev/zero of=file.dat bs=1M count=1",
+ "/bin/echo hello world; sleep 5",
+ "python -c 'print(1+1)'",
+ "(for x in {1..10}; do echo $x; sleep 0.5; done)",
+ "echo stdout >&1; echo stderr >&2; exit 1"
+ };
+
+ for (i = 0; i < sizeof(commands) / sizeof(*commands); i++) {
+ if (mp_task(pool, "commands array", commands[i]) == NULL) {
+ printf("Too many tasks queued (max: %d)\n", MP_POOL_TASK_MAX);
+ break;
+ }
+ }
+ mp_pool_join(pool, get_cpu_count() - 1, MP_POOL_FAIL_FAST);
+ mp_pool_free(&pool);
+ return 0;
+}