diff options
| author | Joseph Hunkeler <jhunkeler@gmail.com> | 2024-09-13 09:58:17 -0400 | 
|---|---|---|
| committer | Joseph Hunkeler <jhunkeler@gmail.com> | 2024-09-18 23:06:08 -0400 | 
| commit | 8f17199d16bcdb29516d34514f95d1a117f6bd26 (patch) | |
| tree | 15b96d2d2ab577472fcf392aea7cc4f15f8de360 /src | |
| parent | b7251ce3bf65bcbec7ecbb98a0eb0b3c9abde507 (diff) | |
| download | stasis-8f17199d16bcdb29516d34514f95d1a117f6bd26.tar.gz | |
Implement multiprocessing pool(s)
* Adds --cpu-limit and --parallel-fail-fast arguments
* Adds disable, parallel, and setup_script keys to [test] blocks
Diffstat (limited to 'src')
| -rw-r--r-- | src/delivery.c | 187 | ||||
| -rw-r--r-- | src/globals.c | 28 | ||||
| -rw-r--r-- | src/stasis_main.c | 27 | ||||
| -rw-r--r-- | src/template_func_proto.c | 37 | 
4 files changed, 216 insertions, 63 deletions
| diff --git a/src/delivery.c b/src/delivery.c index 3e99aad..2718c08 100644 --- a/src/delivery.c +++ b/src/delivery.c @@ -2,6 +2,7 @@  #include <fnmatch.h>  #include "core.h" +#include "multiprocessing.h"  extern struct STASIS_GLOBAL globals; @@ -560,7 +561,13 @@ static int populate_delivery_ini(struct Delivery *ctx, int render_mode) {              test->version = ini_getval_str(ini, section_name, "version", render_mode, &err);              test->repository = ini_getval_str(ini, section_name, "repository", render_mode, &err); +            test->script_setup = ini_getval_str(ini, section_name, "script_setup", INI_READ_RAW, &err);              test->script = ini_getval_str(ini, section_name, "script", INI_READ_RAW, &err); +            test->disable = ini_getval_bool(ini, section_name, "disable", render_mode, &err); +            test->parallel = ini_getval_bool(ini, section_name, "parallel", render_mode, &err); +            if (err) { +                test->parallel = true; +            }              test->repository_remove_tags = ini_getval_strlist(ini, section_name, "repository_remove_tags", LINE_SEP, render_mode, &err);              test->build_recipe = ini_getval_str(ini, section_name, "build_recipe", render_mode, &err);              test->runtime.environ = ini_getval_strlist(ini, section_name, "runtime", LINE_SEP, render_mode, &err); @@ -1702,6 +1709,9 @@ int delivery_index_conda_artifacts(struct Delivery *ctx) {  }  void delivery_tests_run(struct Delivery *ctx) { +    struct MultiProcessingPool *pool_parallel; +    struct MultiProcessingPool *pool_serial; +    struct MultiProcessingPool *pool_setup;      struct Process proc;      memset(&proc, 0, sizeof(proc)); @@ -1715,20 +1725,40 @@ void delivery_tests_run(struct Delivery *ctx) {      if (!ctx->tests[0].name) {          msg(STASIS_MSG_WARN | STASIS_MSG_L2, "no tests are defined!\n");      } else { +        pool_parallel = mp_pool_init("parallel", ctx->storage.tmpdir); +        if (!pool_parallel) { +            perror("mp_pool_init/parallel"); +            exit(1); +        } +         +        pool_serial = mp_pool_init("serial", ctx->storage.tmpdir); +        if (!pool_serial) { +            perror("mp_pool_init/serial"); +            exit(1); +        } + +        pool_setup = mp_pool_init("setup", ctx->storage.tmpdir); +        if (!pool_setup) { +            perror("mp_pool_init/setup"); +            exit(1); +        } + +        const char *runner_cmd_fmt = "set -e -x\n%s\n";          for (size_t i = 0; i < sizeof(ctx->tests) / sizeof(ctx->tests[0]); i++) { -            if (!ctx->tests[i].name && !ctx->tests[i].repository && !ctx->tests[i].script) { +            struct Test *test = &ctx->tests[i]; +            if (!test->name && !test->repository && !test->script) {                  // skip unused test records                  continue;              } -            msg(STASIS_MSG_L2, "Executing tests for %s %s\n", ctx->tests[i].name, ctx->tests[i].version); -            if (!ctx->tests[i].script || !strlen(ctx->tests[i].script)) { +            msg(STASIS_MSG_L2, "Executing tests for %s %s\n", test->name, test->version); +            if (!test->script || !strlen(test->script)) {                  msg(STASIS_MSG_WARN | STASIS_MSG_L3, "Nothing to do. To fix, declare a 'script' in section: [test:%s]\n", -                    ctx->tests[i].name); +                    test->name);                  continue;              }              char destdir[PATH_MAX]; -            sprintf(destdir, "%s/%s", ctx->storage.build_sources_dir, path_basename(ctx->tests[i].repository)); +            sprintf(destdir, "%s/%s", ctx->storage.build_sources_dir, path_basename(test->repository));              if (!access(destdir, F_OK)) {                  msg(STASIS_MSG_L3, "Purging repository %s\n", destdir); @@ -1736,44 +1766,31 @@ void delivery_tests_run(struct Delivery *ctx) {                      COE_CHECK_ABORT(1, "Unable to remove repository\n");                  }              } -            msg(STASIS_MSG_L3, "Cloning repository %s\n", ctx->tests[i].repository); -            if (!git_clone(&proc, ctx->tests[i].repository, destdir, ctx->tests[i].version)) { -                ctx->tests[i].repository_info_tag = strdup(git_describe(destdir)); -                ctx->tests[i].repository_info_ref = strdup(git_rev_parse(destdir, "HEAD")); +            msg(STASIS_MSG_L3, "Cloning repository %s\n", test->repository); +            if (!git_clone(&proc, test->repository, destdir, test->version)) { +                test->repository_info_tag = strdup(git_describe(destdir)); +                test->repository_info_ref = strdup(git_rev_parse(destdir, "HEAD"));              } else {                  COE_CHECK_ABORT(1, "Unable to clone repository\n");              } -            if (ctx->tests[i].repository_remove_tags && strlist_count(ctx->tests[i].repository_remove_tags)) { -                filter_repo_tags(destdir, ctx->tests[i].repository_remove_tags); +            if (test->repository_remove_tags && strlist_count(test->repository_remove_tags)) { +                filter_repo_tags(destdir, test->repository_remove_tags);              }              if (pushd(destdir)) {                  COE_CHECK_ABORT(1, "Unable to enter repository directory\n");              } else { -#if 1 -                int status; -                char *cmd = calloc(strlen(ctx->tests[i].script) + STASIS_BUFSIZ, sizeof(*cmd)); +                char *cmd = calloc(strlen(test->script) + STASIS_BUFSIZ, sizeof(*cmd)); +                if (!cmd) { +                    SYSERROR("Unable to allocate test script buffer: %s", strerror(errno)); +                    exit(1); +                } -                msg(STASIS_MSG_L3, "Testing %s\n", ctx->tests[i].name); +                msg(STASIS_MSG_L3, "Testing %s\n", test->name);                  memset(&proc, 0, sizeof(proc)); -                // Apply workaround for tox positional arguments -                char *toxconf = NULL; -                if (!access("tox.ini", F_OK)) { -                    if (!fix_tox_conf("tox.ini", &toxconf)) { -                        msg(STASIS_MSG_L3, "Fixing tox positional arguments\n"); -                        if (!globals.workaround.tox_posargs) { -                            globals.workaround.tox_posargs = calloc(PATH_MAX, sizeof(*globals.workaround.tox_posargs)); -                        } else { -                            memset(globals.workaround.tox_posargs, 0, PATH_MAX); -                        } -                        snprintf(globals.workaround.tox_posargs, PATH_MAX - 1, "-c %s --root .", toxconf); -                    } -                } - -                // enable trace mode before executing each test script -                strcpy(cmd, ctx->tests[i].script); +                strcpy(cmd, test->script);                  char *cmd_rendered = tpl_render(cmd);                  if (cmd_rendered) {                      if (strcmp(cmd_rendered, cmd) != 0) { @@ -1786,36 +1803,110 @@ void delivery_tests_run(struct Delivery *ctx) {                      exit(1);                  } -                puts(cmd); +                if (test->disable) { +                    msg(STASIS_MSG_L2, "Script execution disabled by configuration\n", test->name); +                    guard_free(cmd); +                    continue; +                } +                  char runner_cmd[0xFFFF] = {0}; -                sprintf(runner_cmd, "set +x\nsource %s/etc/profile.d/conda.sh\nsource %s/etc/profile.d/mamba.sh\n\nmamba activate ${CONDA_DEFAULT_ENV}\n\n%s\n", -                    ctx->storage.conda_install_prefix, -		    ctx->storage.conda_install_prefix, -		    cmd); -                status = shell(&proc, runner_cmd); -                if (status) { -                    msg(STASIS_MSG_ERROR, "Script failure: %s\n%s\n\nExit code: %d\n", ctx->tests[i].name, ctx->tests[i].script, status); +                char pool_name[100] = "parallel"; +                struct MultiProcessingTask *task = NULL; +                struct MultiProcessingPool *pool = pool_parallel; +                if (!test->parallel) { +                    pool = pool_serial; +                    memset(pool_name, 0, sizeof(pool_name)); +                    strcpy(pool_name, "serial"); +                } + +                sprintf(runner_cmd, runner_cmd_fmt, cmd); +                task = mp_task(pool, test->name, runner_cmd); +                if (!task) { +                    SYSERROR("Failed to add task to %s pool: %s", pool_name, runner_cmd);                      popd(); -                    guard_free(cmd);                      if (!globals.continue_on_error) {                          tpl_free();                          delivery_free(ctx);                          globals_free();                      } -                    COE_CHECK_ABORT(1, "Test failure"); +                    exit(1);                  }                  guard_free(cmd); +                popd(); + +            } +        } -                if (toxconf) { -                    remove(toxconf); -                    guard_free(toxconf); +        // Configure "script_setup" tasks +        // Directories should exist now, so no need to go through initializing everything all over again. +        for (size_t i = 0; i < sizeof(ctx->tests) / sizeof(ctx->tests[0]); i++) { +            struct Test *test = &ctx->tests[i]; +            if (test->script_setup) { +                char destdir[PATH_MAX]; +                sprintf(destdir, "%s/%s", ctx->storage.build_sources_dir, path_basename(test->repository)); +                if (access(destdir, F_OK)) { +                    SYSERROR("%s: %s", destdir, strerror(errno)); +                    exit(1); +                } +                if (!pushd(destdir)) { +                    char *cmd = calloc(strlen(test->script_setup) + STASIS_BUFSIZ, sizeof(*cmd)); +                    if (!cmd) { +                        SYSERROR("Unable to allocate test script_setup buffer: %s", strerror(errno)); +                        exit(1); +                    } + +                    strcpy(cmd, test->script_setup); +                    char *cmd_rendered = tpl_render(cmd); +                    if (cmd_rendered) { +                        if (strcmp(cmd_rendered, cmd) != 0) { +                            strcpy(cmd, cmd_rendered); +                            cmd[strlen(cmd_rendered) ? strlen(cmd_rendered) - 1 : 0] = 0; +                        } +                        guard_free(cmd_rendered); +                    } else { +                        SYSERROR("An error occurred while rendering the following:\n%s", cmd); +                        exit(1); +                    } + +                    struct MultiProcessingPool *pool = pool_setup; +                    struct MultiProcessingTask *task = NULL; +                    char runner_cmd[0xFFFF] = {0}; +                    sprintf(runner_cmd, runner_cmd_fmt, cmd); + +                    task = mp_task(pool, test->name, runner_cmd); +                    if (!task) { +                        SYSERROR("Failed to add task %s to setup pool: %s", test->name, runner_cmd); +                        popd(); +                        if (!globals.continue_on_error) { +                            tpl_free(); +                            delivery_free(ctx); +                            globals_free(); +                        } +                        exit(1); +                    } +                    guard_free(cmd); +                    popd();                  } -                popd(); -#else -                msg(STASIS_MSG_WARNING | STASIS_MSG_L3, "TESTING DISABLED BY CODE!\n"); -#endif              }          } + +        size_t opt_flags = 0; +        opt_flags |= globals.parallel_fail_fast; + +        if (pool_setup->num_used) { +            COE_CHECK_ABORT(mp_pool_join(pool_setup, 1, opt_flags) != 0, "Failure in setup task pool"); +            mp_pool_free(&pool_setup); +        } + +        if (pool_parallel->num_used) { +            COE_CHECK_ABORT(mp_pool_join(pool_parallel, globals.cpu_limit, opt_flags) != 0, "Failure in parallel task pool"); +            mp_pool_free(&pool_parallel); +        } + +        if (pool_serial->num_used) { +            COE_CHECK_ABORT(mp_pool_join(pool_serial, 1, opt_flags) != 0, "Failure in serial task pool"); +            mp_pool_free(&pool_serial); +        }      }  } diff --git a/src/globals.c b/src/globals.c index 1e27959..667809b 100644 --- a/src/globals.c +++ b/src/globals.c @@ -25,19 +25,20 @@ const char *BANNER =          "Association of Universities for Research in Astronomy (AURA)\n";  struct STASIS_GLOBAL globals = { -        .verbose = false, -        .continue_on_error = false, -        .always_update_base_environment = false, -        .conda_fresh_start = true, -        .conda_install_prefix = NULL, -        .conda_packages = NULL, -        .pip_packages = NULL, -        .tmpdir = NULL, -        .enable_docker = true, -        .enable_artifactory = true, -        .enable_artifactory_build_info = true, -        .enable_testing = true, -        .enable_rewrite_spec_stage_2 = true, +        .verbose = false, ///< Toggle verbose mode +        .continue_on_error = false, ///< Do not stop program on error +        .always_update_base_environment = false, ///< Run "conda update --all" after installing Conda +        .conda_fresh_start = true, ///< Remove/reinstall Conda at startup +        .conda_install_prefix = NULL, ///< Path to install Conda +        .conda_packages = NULL, ///< Conda packages to install +        .pip_packages = NULL, ///< Python packages to install +        .tmpdir = NULL, ///< Path to store temporary data +        .enable_docker = true, ///< Toggle docker usage +        .enable_artifactory = true, ///< Toggle artifactory server usage +        .enable_artifactory_build_info = true, ///< Toggle build-info uploads +        .enable_testing = true, ///< Toggle [test] block "script" execution. "script_setup" always executes. +        .enable_rewrite_spec_stage_2 = true, ///< Leave template stings in output files +        .parallel_fail_fast = false, ///< Kill ALL multiprocessing tasks immediately on error  };  void globals_free() { @@ -55,7 +56,6 @@ void globals_free() {      guard_free(globals.jfrog.jfrog_artifactory_base_url);      guard_free(globals.jfrog.jfrog_artifactory_product);      guard_free(globals.jfrog.remote_filename); -    guard_free(globals.workaround.tox_posargs);      guard_free(globals.workaround.conda_reactivate);      if (globals.envctl) {          envctl_free(&globals.envctl); diff --git a/src/stasis_main.c b/src/stasis_main.c index 7ea465c..164a9ca 100644 --- a/src/stasis_main.c +++ b/src/stasis_main.c @@ -12,15 +12,18 @@  #define OPT_NO_TESTING 1004  #define OPT_OVERWRITE 1005  #define OPT_NO_REWRITE_SPEC_STAGE_2 1006 +#define OPT_PARALLEL_FAIL_FAST 1007  static struct option long_options[] = {          {"help", no_argument, 0, 'h'},          {"version", no_argument, 0, 'V'},          {"continue-on-error", no_argument, 0, 'C'},          {"config", required_argument, 0, 'c'}, +        {"cpu-limit", required_argument, 0, 'l'},          {"python", required_argument, 0, 'p'},          {"verbose", no_argument, 0, 'v'},          {"unbuffered", no_argument, 0, 'U'},          {"update-base", no_argument, 0, OPT_ALWAYS_UPDATE_BASE}, +        {"parallel-fail-fast", no_argument, 0, OPT_PARALLEL_FAIL_FAST},          {"overwrite", no_argument, 0, OPT_OVERWRITE},          {"no-docker", no_argument, 0, OPT_NO_DOCKER},          {"no-artifactory", no_argument, 0, OPT_NO_ARTIFACTORY}, @@ -35,10 +38,12 @@ const char *long_options_help[] = {          "Display program version",          "Allow tests to fail",          "Read configuration file", +        "Number of processes to spawn concurrently (default: cpus - 1)",          "Override version of Python in configuration",          "Increase output verbosity",          "Disable line buffering",          "Update conda installation prior to STASIS environment creation", +        "On test error, terminate all concurrent tasks",          "Overwrite an existing release",          "Do not build docker images",          "Do not upload artifacts to Artifactory", @@ -201,6 +206,13 @@ static void check_requirements(struct Delivery *ctx) {  }  int main(int argc, char *argv[]) { +    /* +    extern int exmain(int argc, char *argv[]); +    exmain(argc, argv); +    printf("ending program\n"); +    exit(0); +     */ +      struct Delivery ctx;      struct Process proc = {              .f_stdout = "", @@ -214,6 +226,10 @@ int main(int argc, char *argv[]) {      char installer_url[PATH_MAX];      char python_override_version[STASIS_NAME_MAX];      int user_disabled_docker = false; +    globals.cpu_limit = get_cpu_count(); +    if (globals.cpu_limit > 1) { +        globals.cpu_limit--; +    }      memset(env_name, 0, sizeof(env_name));      memset(env_name_testing, 0, sizeof(env_name_testing)); @@ -241,9 +257,18 @@ int main(int argc, char *argv[]) {              case 'p':                  strcpy(python_override_version, optarg);                  break; +            case 'l': +                globals.cpu_limit = strtol(optarg, NULL, 10); +                if (globals.cpu_limit < 1) { +                    globals.cpu_limit = 1; +                } +                break;              case OPT_ALWAYS_UPDATE_BASE:                  globals.always_update_base_environment = true;                  break; +            case OPT_PARALLEL_FAIL_FAST: +                globals.parallel_fail_fast = true; +                break;              case 'U':                  setenv("PYTHONUNBUFFERED", "1", 1);                  fflush(stdout); @@ -327,7 +352,6 @@ int main(int argc, char *argv[]) {      tpl_register("deploy.jfrog.repo", &globals.jfrog.repo);      tpl_register("deploy.jfrog.url", &globals.jfrog.url);      tpl_register("deploy.docker.registry", &ctx.deploy.docker.registry); -    tpl_register("workaround.tox_posargs", &globals.workaround.tox_posargs);      tpl_register("workaround.conda_reactivate", &globals.workaround.conda_reactivate);      // Expose function(s) to the template engine @@ -336,6 +360,7 @@ int main(int argc, char *argv[]) {      tpl_register_func("get_github_release_notes_auto", &get_github_release_notes_auto_tplfunc_entrypoint, 1, &ctx);      tpl_register_func("junitxml_file", &get_junitxml_file_entrypoint, 1, &ctx);      tpl_register_func("basetemp_dir", &get_basetemp_dir_entrypoint, 1, &ctx); +    tpl_register_func("tox_run", &tox_run_entrypoint, 2, &ctx);      // Set up PREFIX/etc directory information      // The user may manipulate the base directory path with STASIS_SYSCONFDIR diff --git a/src/template_func_proto.c b/src/template_func_proto.c index 3cf66e4..ebb595e 100644 --- a/src/template_func_proto.c +++ b/src/template_func_proto.c @@ -109,4 +109,41 @@ int get_basetemp_dir_entrypoint(void *frame, void *data_out) {      sprintf(*output, "%s/truth-%s-%s", ctx->storage.tmpdir, name, ctx->info.release_name);      return result; +} + +int tox_run_entrypoint(void *frame, void *data_out) { +    char **output = (char **) data_out; +    struct tplfunc_frame *f = (struct tplfunc_frame *) frame; +    const struct Delivery *ctx = (const struct Delivery *) f->data_in; + +    // Apply workaround for tox positional arguments +    char *toxconf = NULL; +    if (!access("tox.ini", F_OK)) { +        if (!fix_tox_conf("tox.ini", &toxconf)) { +            msg(STASIS_MSG_L3, "Fixing tox positional arguments\n"); +            *output = calloc(STASIS_BUFSIZ, sizeof(**output)); +            if (!*output) { +                return -1; +            } +            char *basetemp_path = NULL; +            if (get_basetemp_dir_entrypoint(f, &basetemp_path)) { +                return -2; +            } +            char *jxml_path = NULL; +            if (get_junitxml_file_entrypoint(f, &jxml_path)) { +                return -3; +            } +            const char *tox_target = f->argv[0].t_char_ptr; +            const char *pytest_args = f->argv[1].t_char_ptr; +            if (isempty(toxconf) || !strcmp(toxconf, "/")) { +                SYSERROR("Unsafe toxconf path: '%s'", toxconf); +                return -4; +            } +            snprintf(*output, STASIS_BUFSIZ - 1, "\npip install tox && (tox -e py%s%s -c %s --root . -- --basetemp=\"%s\" --junitxml=\"%s\" %s ; rm -f '%s')\n", ctx->meta.python_compact, tox_target, toxconf, basetemp_path, jxml_path, pytest_args ? pytest_args : "", toxconf); + +            guard_free(jxml_path); +            guard_free(basetemp_path); +        } +    } +    return 0;  }
\ No newline at end of file | 
