diff options
| author | Joseph Hunkeler <jhunkeler@users.noreply.github.com> | 2024-10-04 08:40:39 -0400 | 
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-10-04 08:40:39 -0400 | 
| commit | d7e3deba72703ad36c497f5becf6772ca00a0d6d (patch) | |
| tree | eff3b2ec3dcc31126041529c8e00a714997f2d7b /include | |
| parent | 9691ccf51b3efd8113e9620c4afa8b5382d7f161 (diff) | |
| parent | f0ba8cd378a460f927644e41f49be95d0e956f81 (diff) | |
| download | stasis-d7e3deba72703ad36c497f5becf6772ca00a0d6d.tar.gz | |
Merge pull request #46 from jhunkeler/split-delivery-code
Add multiprocessing / Split delivery code
Diffstat (limited to 'include')
| -rw-r--r-- | include/core.h | 16 | ||||
| -rw-r--r-- | include/core_mem.h | 18 | ||||
| -rw-r--r-- | include/delivery.h | 28 | ||||
| -rw-r--r-- | include/multiprocessing.h | 131 | ||||
| -rw-r--r-- | include/template_func_proto.h | 1 | 
5 files changed, 181 insertions, 13 deletions
| diff --git a/include/core.h b/include/core.h index ef90e96..e09b212 100644 --- a/include/core.h +++ b/include/core.h @@ -21,6 +21,8 @@  #define HTTP_ERROR(X) X >= 400  #include "config.h" +#include "core_mem.h" +#include "multiprocessing.h"  #include "envctl.h"  #include "template.h"  #include "utils.h" @@ -42,16 +44,6 @@  #include "github.h"  #include "template_func_proto.h" -#define guard_runtime_free(X) do { if (X) { runtime_free(X); X = NULL; } } while (0) -#define guard_strlist_free(X) do { if ((*X)) { strlist_free(X); (*X) = NULL; } } while (0) -#define guard_free(X) do { if (X) { free(X); X = NULL; } } while (0) -#define GENERIC_ARRAY_FREE(ARR) do { \ -    for (size_t ARR_I = 0; ARR && ARR[ARR_I] != NULL; ARR_I++) { \ -        guard_free(ARR[ARR_I]); \ -    } \ -    guard_free(ARR); \ -} while (0) -  #define COE_CHECK_ABORT(COND, MSG) \      do {\          if (!globals.continue_on_error && COND) { \ @@ -71,6 +63,10 @@ struct STASIS_GLOBAL {      bool enable_testing; //!< Enable package testing      bool enable_overwrite; //!< Enable release file clobbering      bool enable_rewrite_spec_stage_2; //!< Enable automatic @STR@ replacement in output files +    bool enable_parallel; //!< Enable testing in parallel +    long cpu_limit; //!< Limit parallel processing to n cores (default: max - 1) +    long parallel_fail_fast; //!< Fail immediately on error +    int pool_status_interval; //!< Report "Task is running" every n seconds      struct StrList *conda_packages; //!< Conda packages to install after initial activation      struct StrList *pip_packages; //!< Pip packages to install after initial activation      char *tmpdir; //!< Path to temporary storage directory diff --git a/include/core_mem.h b/include/core_mem.h new file mode 100644 index 0000000..ef07a00 --- /dev/null +++ b/include/core_mem.h @@ -0,0 +1,18 @@ +// +// Created by jhunk on 9/13/24. +// + +#ifndef STASIS_CORE_MEM_H +#define STASIS_CORE_MEM_H + +#define guard_runtime_free(X) do { if (X) { runtime_free(X); X = NULL; } } while (0) +#define guard_strlist_free(X) do { if ((*X)) { strlist_free(X); (*X) = NULL; } } while (0) +#define guard_free(X) do { if (X) { free(X); X = NULL; } } while (0) +#define GENERIC_ARRAY_FREE(ARR) do { \ +    for (size_t ARR_I = 0; ARR && ARR[ARR_I] != NULL; ARR_I++) { \ +        guard_free(ARR[ARR_I]); \ +    } \ +    guard_free(ARR); \ +} while (0) + +#endif //STASIS_CORE_MEM_H diff --git a/include/delivery.h b/include/delivery.h index 067cd0b..6da890a 100644 --- a/include/delivery.h +++ b/include/delivery.h @@ -7,6 +7,8 @@  #include <stdbool.h>  #include <unistd.h>  #include <sys/utsname.h> +#include <fnmatch.h> +#include <sys/statvfs.h>  #include "core.h"  #define DELIVERY_PLATFORM_MAX 4 @@ -149,7 +151,10 @@ struct Delivery {          char *name;                     ///< Name of package          char *version;                  ///< Version of package          char *repository;               ///< Git repository of package +        char *script_setup;             ///< Commands to execute before the main script          char *script;                   ///< Commands to execute +        bool disable;                   ///< Toggle a test block +        bool parallel;                  ///< Toggle parallel or serial execution          char *build_recipe;             ///< Conda recipe to build (optional)          char *repository_info_ref;      ///< Git commit hash          char *repository_info_tag;      ///< Git tag (first parent) @@ -286,7 +291,7 @@ int delivery_copy_conda_artifacts(struct Delivery *ctx);   * Retrieve Conda installer   * @param installer_url URL to installation script   */ -int delivery_get_installer(struct Delivery *ctx, char *installer_url); +int delivery_get_conda_installer(struct Delivery *ctx, char *installer_url);  /**   * Generate URL based on Delivery context @@ -294,7 +299,7 @@ int delivery_get_installer(struct Delivery *ctx, char *installer_url);   * @param result pointer to char   * @return in result   */ -void delivery_get_installer_url(struct Delivery *ctx, char *result); +void delivery_get_conda_installer_url(struct Delivery *ctx, char *result);  /**   * Install packages based on Delivery context @@ -376,6 +381,12 @@ void delivery_gather_tool_versions(struct Delivery *ctx);  // helper function  int delivery_init_tmpdir(struct Delivery *ctx); +void delivery_init_dirs_stage1(struct Delivery *ctx); + +void delivery_init_dirs_stage2(struct Delivery *ctx); + +int delivery_init_platform(struct Delivery *ctx); +  int delivery_init_artifactory(struct Delivery *ctx);  int delivery_artifact_upload(struct Delivery *ctx); @@ -386,10 +397,21 @@ int delivery_docker(struct Delivery *ctx);  int delivery_fixup_test_results(struct Delivery *ctx); -int *bootstrap_build_info(struct Delivery *ctx); +int bootstrap_build_info(struct Delivery *ctx);  int delivery_dump_metadata(struct Delivery *ctx); +int populate_info(struct Delivery *ctx); + +int populate_delivery_cfg(struct Delivery *ctx, int render_mode); + +int populate_delivery_ini(struct Delivery *ctx, int render_mode); + +int populate_mission_ini(struct Delivery **ctx, int render_mode); + +void validate_delivery_ini(struct INIFILE *ini); + +int filter_repo_tags(char *repo, struct StrList *patterns);  /**   * Determine whether a release on-disk matches the release name in use   * @param ctx Delivery context diff --git a/include/multiprocessing.h b/include/multiprocessing.h new file mode 100644 index 0000000..5919462 --- /dev/null +++ b/include/multiprocessing.h @@ -0,0 +1,131 @@ +/// @file multiprocessing.h +#ifndef STASIS_MULTIPROCESSING_H +#define STASIS_MULTIPROCESSING_H + +#include "core.h" +#include <signal.h> +#include <sys/wait.h> +#include <semaphore.h> +#include <sys/mman.h> +#include <fcntl.h> +#include <sys/stat.h> + +struct MultiProcessingTask { +    pid_t pid; ///< Program PID +    pid_t parent_pid; ///< Program PID (parent process) +    int status; ///< Child process exit status +    int signaled_by; ///< Last signal received, if any +    time_t _now; ///< Current time +    time_t _seconds; ///< Time elapsed (used by MultiprocessingPool.status_interval) +    char ident[255]; ///< Identity of the pool task +    char *cmd; ///< Shell command(s) to be executed +    size_t cmd_len; ///< Length of command string (for mmap/munmap) +    char working_dir[PATH_MAX]; ///< Path to directory `cmd` should be executed in +    char log_file[PATH_MAX]; ///< Full path to stdout/stderr log file +    char parent_script[PATH_MAX]; ///< Path to temporary script executing the task +    struct { +        struct timespec t_start; +        struct timespec t_stop; +    } time_data; ///< Wall-time counters +}; + +struct MultiProcessingPool { +    struct MultiProcessingTask *task; ///< Array of tasks to execute +    size_t num_used; ///< Number of tasks populated in the task array +    size_t num_alloc; ///< Number of tasks allocated by the task array +    char ident[255]; ///< Identity of task pool +    char log_root[PATH_MAX]; ///< Base directory to store stderr/stdout log files +    int status_interval; ///< Report a pooled task is "running" every n seconds +}; + +/// Maximum number of multiprocessing tasks STASIS can execute +#define MP_POOL_TASK_MAX 1000 + +/// Value signifies a process is unused or finished executing +#define MP_POOL_PID_UNUSED 0 + +/// Option flags for mp_pool_join() +#define MP_POOL_FAIL_FAST 1 << 1 + +/** + * Create a multiprocessing pool + * + * ```c + * #include "multiprocessing.h" + * #include "utils.h" // for get_cpu_count() + * + * int main(int argc, char *argv[]) { + *     struct MultiProcessingPool *mp; + *     mp = mp_pool_init("mypool", "/tmp/mypool_logs"); + *     if (mp) { + *         char *commands[] = { + *             "/bin/echo hello world", + *             "/bin/echo world hello", + *             NULL + *         } + *         for (size_t i = 0; commands[i] != NULL); i++) { + *             struct MultiProcessingTask *task; + *             char task_name[100]; + * + *             sprintf(task_name, "mytask%zu", i); + *             task = mp_task(mp, task_name, commands[i]); + *             if (!task) { + *                 // handle task creation error + *             } + *         } + *         if (mp_pool_join(mp, get_cpu_count(), MP_POOL_FAIL_FAST)) { + *             // handle pool execution error + *         } + *         mp_pool_free(&mp); + *     } else { + *         // handle pool initialization error + *     } + * } + * ``` + * + * @param ident a name to identify the pool + * @param log_root the path to store program output + * @return pointer to initialized MultiProcessingPool + * @return NULL on error + */ +struct MultiProcessingPool *mp_pool_init(const char *ident, const char *log_root); + +/** + * Create a multiprocessing pool task + * + * @param pool a pointer to MultiProcessingPool + * @param ident a name to identify the task + * @param cmd a command to execute + * @return pointer to MultiProcessingTask structure + * @return NULL on error + */ +struct MultiProcessingTask *mp_pool_task(struct MultiProcessingPool *pool, const char *ident, char *working_dir, char *cmd); + +/** + * Execute all tasks in a pool + * + * @param pool a pointer to MultiProcessingPool + * @param jobs the number of processes to spawn at once (for serial execution use `1`) + * @param flags option to be OR'd (MP_POOL_FAIL_FAST) + * @return 0 on success + * @return >0 on failure + * @return <0 on error + */ +int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags); + +/** + * Show summary of pool tasks + * + * @param pool a pointer to MultiProcessingPool + */ +void mp_pool_show_summary(struct MultiProcessingPool *pool); + +/** + * Release resources allocated by mp_pool_init() + * + * @param a pointer to MultiProcessingPool + */ +void mp_pool_free(struct MultiProcessingPool **pool); + + +#endif //STASIS_MULTIPROCESSING_H diff --git a/include/template_func_proto.h b/include/template_func_proto.h index 7778a11..0c8bbb7 100644 --- a/include/template_func_proto.h +++ b/include/template_func_proto.h @@ -7,5 +7,6 @@ int get_github_release_notes_tplfunc_entrypoint(void *frame, void *data_out);  int get_github_release_notes_auto_tplfunc_entrypoint(void *frame, void *data_out);  int get_junitxml_file_entrypoint(void *frame, void *data_out);  int get_basetemp_dir_entrypoint(void *frame, void *data_out); +int tox_run_entrypoint(void *frame, void *data_out);  #endif //TEMPLATE_FUNC_PROTO_H
\ No newline at end of file | 
