diff options
author | Joseph Hunkeler <jhunkeler@gmail.com> | 2024-09-13 09:35:34 -0400 |
---|---|---|
committer | Joseph Hunkeler <jhunkeler@gmail.com> | 2024-09-18 23:06:08 -0400 |
commit | b7251ce3bf65bcbec7ecbb98a0eb0b3c9abde507 (patch) | |
tree | 120eac93d20476ed344b2b188f262ec86f1cc38a /include | |
parent | 9691ccf51b3efd8113e9620c4afa8b5382d7f161 (diff) | |
download | stasis-b7251ce3bf65bcbec7ecbb98a0eb0b3c9abde507.tar.gz |
Move guard_ macros to core_mem.h
* Move core_mem.h below config.h
Diffstat (limited to 'include')
-rw-r--r-- | include/core.h | 13 | ||||
-rw-r--r-- | include/core_mem.h | 18 | ||||
-rw-r--r-- | include/multiprocessing.h | 120 |
3 files changed, 141 insertions, 10 deletions
diff --git a/include/core.h b/include/core.h index ef90e96..614a894 100644 --- a/include/core.h +++ b/include/core.h @@ -21,6 +21,7 @@ #define HTTP_ERROR(X) X >= 400 #include "config.h" +#include "core_mem.h" #include "envctl.h" #include "template.h" #include "utils.h" @@ -42,16 +43,6 @@ #include "github.h" #include "template_func_proto.h" -#define guard_runtime_free(X) do { if (X) { runtime_free(X); X = NULL; } } while (0) -#define guard_strlist_free(X) do { if ((*X)) { strlist_free(X); (*X) = NULL; } } while (0) -#define guard_free(X) do { if (X) { free(X); X = NULL; } } while (0) -#define GENERIC_ARRAY_FREE(ARR) do { \ - for (size_t ARR_I = 0; ARR && ARR[ARR_I] != NULL; ARR_I++) { \ - guard_free(ARR[ARR_I]); \ - } \ - guard_free(ARR); \ -} while (0) - #define COE_CHECK_ABORT(COND, MSG) \ do {\ if (!globals.continue_on_error && COND) { \ @@ -71,6 +62,8 @@ struct STASIS_GLOBAL { bool enable_testing; //!< Enable package testing bool enable_overwrite; //!< Enable release file clobbering bool enable_rewrite_spec_stage_2; //!< Enable automatic @STR@ replacement in output files + long cpu_limit; + long parallel_fail_fast; struct StrList *conda_packages; //!< Conda packages to install after initial activation struct StrList *pip_packages; //!< Pip packages to install after initial activation char *tmpdir; //!< Path to temporary storage directory diff --git a/include/core_mem.h b/include/core_mem.h new file mode 100644 index 0000000..ef07a00 --- /dev/null +++ b/include/core_mem.h @@ -0,0 +1,18 @@ +// +// Created by jhunk on 9/13/24. +// + +#ifndef STASIS_CORE_MEM_H +#define STASIS_CORE_MEM_H + +#define guard_runtime_free(X) do { if (X) { runtime_free(X); X = NULL; } } while (0) +#define guard_strlist_free(X) do { if ((*X)) { strlist_free(X); (*X) = NULL; } } while (0) +#define guard_free(X) do { if (X) { free(X); X = NULL; } } while (0) +#define GENERIC_ARRAY_FREE(ARR) do { \ + for (size_t ARR_I = 0; ARR && ARR[ARR_I] != NULL; ARR_I++) { \ + guard_free(ARR[ARR_I]); \ + } \ + guard_free(ARR); \ +} while (0) + +#endif //STASIS_CORE_MEM_H diff --git a/include/multiprocessing.h b/include/multiprocessing.h new file mode 100644 index 0000000..6bcf18e --- /dev/null +++ b/include/multiprocessing.h @@ -0,0 +1,120 @@ +/// @file multiprocessing.h +#ifndef STASIS_MULTIPROCESSING_H +#define STASIS_MULTIPROCESSING_H + +#include <stdio.h> +#include <unistd.h> +#include <sys/wait.h> +#include <errno.h> +#include <stdlib.h> +#include <string.h> +#include <semaphore.h> +#include <sys/mman.h> +#include <time.h> +#include <fcntl.h> +#include <linux/limits.h> +#include <sys/stat.h> +#include <unistd.h> + +struct MultiProcessingTask { + sem_t *gate; ///< Child process startup lock + pid_t pid; ///< Program PID + pid_t parent_pid; ///< Program PID (parent process) + int status; ///< Child process exit status + char ident[NAME_MAX]; ///< Identity of the pool task + char log_file[NAME_MAX]; ///< Path to stdout/stderr log file + char parent_script[PATH_MAX]; ///< Path to temporary script executing the task +}; + +struct MultiProcessingPool { + struct MultiProcessingTask *task; ///< Array of tasks to execute + size_t num_used; ///< Number of tasks populated in the task array + size_t num_alloc; ///< Number of tasks allocated by the task array + const char *ident; ///< Identity of task pool + const char *log_root; ///< Base directory to store stderr/stdout log files +}; + +///!< Maximum number of multiprocessing tasks STASIS can execute +#define MP_POOL_TASK_MAX 1000 + +///!< Value signifies a process is unused or finished executing +#define MP_POOL_PID_UNUSED 0 + +// Option flags for mp_pool_join() +#define MP_POOL_FAIL_FAST 1 << 1 + +/** + * Create a multiprocessing pool + * + * ```c + * #include "multiprocessing.h" + * #include "utils.h" // for get_cpu_count() + * + * int main(int argc, char *argv[]) { + * struct MultiProcessingPool *mp; + * mp = mp_pool_init("mypool", "/tmp/mypool_logs"); + * if (mp) { + * char *commands[] = { + * "/bin/echo hello world", + * "/bin/echo world hello", + * NULL + * } + * for (size_t i = 0; commands[i] != NULL); i++) { + * struct MultiProcessingTask *task; + * char task_name[100]; + * + * sprintf(task_name, "mytask%zu", i); + * task = mp_task(mp, task_name, commands[i]); + * if (!task) { + * // handle task creation error + * } + * } + * if (mp_pool_join(mp, get_cpu_count())) { + * // handle pool execution error + * } + * mp_pool_free(&mp); + * } else { + * // handle pool initialization error + * } + * } + * ``` + * + * @param ident a name to identify the pool + * @param log_root the path to store program output + * @return pointer to initialized MultiProcessingPool + * @return NULL on error + */ +struct MultiProcessingPool *mp_pool_init(const char *ident, const char *log_root); + +/** + * Create a multiprocessing pool task + * + * @param pool a pointer to MultiProcessingPool + * @param ident a name to identify the task + * @param cmd a command to execute + * @return pointer to MultiProcessingTask structure + * @return NULL on error + */ +struct MultiProcessingTask *mp_task(struct MultiProcessingPool *pool, const char *ident, char *cmd); + +/** + * Execute all tasks in a pool + * + * @param pool a pointer to MultiProcessingPool + * @param jobs the number of processes to spawn at once (for serial execution use `1`) + * @param flags option to be OR'd (MP_POOL_FAIL_FAST) + * @return 0 on success + * @return >0 on failure + * @return <0 on error + */ +int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags); + +/** + * Release resources allocated by mp_pool_init() + * + * @param a pointer to MultiProcessingPool + */ +void mp_pool_free(struct MultiProcessingPool **pool); + + +#endif //STASIS_MULTIPROCESSING_H |