diff options
| author | Joseph Hunkeler <jhunkeler@users.noreply.github.com> | 2024-10-22 11:04:17 -0400 | 
|---|---|---|
| committer | GitHub <noreply@github.com> | 2024-10-22 11:04:17 -0400 | 
| commit | 7729d546d2dbda85ca1d86a913e97b51487355ba (patch) | |
| tree | e9a0e7f9f2069ecd9e718dd66d3e11fa7a80722d /tests/test_multiprocessing.c | |
| parent | 8edc87d51900ccf7d1d67ad3647a4b8fa2d9b7ae (diff) | |
| parent | 30f48145d1a1c747c40f94e2a7314d4bdf61cf55 (diff) | |
| download | stasis-7729d546d2dbda85ca1d86a913e97b51487355ba.tar.gz | |
Merge pull request #63 from jhunkeler/update-tests
Update tests / Bug fixes
Diffstat (limited to 'tests/test_multiprocessing.c')
| -rw-r--r-- | tests/test_multiprocessing.c | 92 | 
1 files changed, 92 insertions, 0 deletions
| diff --git a/tests/test_multiprocessing.c b/tests/test_multiprocessing.c index b9cd309..7c9d695 100644 --- a/tests/test_multiprocessing.c +++ b/tests/test_multiprocessing.c @@ -1,5 +1,6 @@  #include "testing.h"  #include "multiprocessing.h" +#include <pthread.h>  static struct MultiProcessingPool *pool;  char *commands[] = { @@ -12,6 +13,10 @@ char *commands[] = {  };  void test_mp_pool_init() { +    STASIS_ASSERT((pool = mp_pool_init(NULL, "mplogs")) == NULL, "Pool should not be initialized with invalid ident"); +    STASIS_ASSERT((pool = mp_pool_init("mypool", NULL)) == NULL, "Pool should not be initialized with invalid logname"); +    STASIS_ASSERT((pool = mp_pool_init(NULL, NULL)) == NULL, "Pool should not be initialized with invalid arguments"); +    pool = NULL;      STASIS_ASSERT((pool = mp_pool_init("mypool", "mplogs")) != NULL, "Pool initialization failed");      STASIS_ASSERT_FATAL(pool != NULL, "Should not be NULL");      STASIS_ASSERT(pool->num_alloc == MP_POOL_TASK_MAX, "Wrong number of default records"); @@ -56,6 +61,7 @@ void test_mp_task() {      pool = mp_pool_init("mypool", "mplogs");      if (pool) { +        pool->status_interval = 3;          for (size_t i = 0; i < sizeof(commands) / sizeof(*commands); i++) {              struct MultiProcessingTask *task;              char task_name[100] = {0}; @@ -113,6 +119,90 @@ void test_mp_pool_workflow() {      }  } +void test_mp_fail_fast() { +    char *commands_ff[128] = { +        "sleep 3; true", +        "sleep 5; false", +    }; + +    // Pad the array with tasks. None of these should execute when +    // the "fail fast" conditions are met +    char *nopcmd = "sleep 30; true"; +    for (size_t i = 2; i < sizeof(commands_ff) / sizeof(*commands_ff); i++) { +        commands_ff[i] = nopcmd; +    } + +    struct MultiProcessingPool *p; +    STASIS_ASSERT((p = mp_pool_init("failfast", "failfastlogs")) != NULL, "Failed to initialize pool"); +    for (size_t i = 0; i < sizeof(commands_ff) / sizeof(*commands_ff); i++) { +        char *command = commands_ff[i]; +        char taskname[100] = {0}; +        snprintf(taskname, sizeof(taskname) - 1, "task_%03zu", i); +        STASIS_ASSERT(mp_pool_task(p, taskname, NULL, (char *) command) != NULL, "Failed to queue task"); +    } + +    STASIS_ASSERT(mp_pool_join(p, get_cpu_count(), MP_POOL_FAIL_FAST) < 0, "Unexpected result"); + +    struct result { +        int total_signaled; +        int total_status_fail; +        int total_status_success; +        int total_unused; +    } result = { +        .total_signaled = 0, +        .total_status_fail = 0, +        .total_status_success = 0, +        .total_unused = 0, +    }; +    for (size_t i = 0; i < p->num_used; i++) { +        struct MultiProcessingTask *task = &p->task[i]; +        if (task->signaled_by) result.total_signaled++; +        if (task->status > 0) result.total_status_fail++; +        if (task->status == 0) result.total_status_success++; +        if (task->pid == MP_POOL_PID_UNUSED && task->status == MP_POOL_TASK_STATUS_INITIAL) result.total_unused++; +    } +    fprintf(stderr, "total_status_fail = %d\ntotal_status_success = %d\ntotal_signaled = %d\ntotal_unused = %d\n", +            result.total_status_fail, result.total_status_success, result.total_signaled, result.total_unused); +    STASIS_ASSERT(result.total_status_fail, "Should have failures"); +    STASIS_ASSERT(result.total_status_success, "Should have successes"); +    STASIS_ASSERT(result.total_signaled, "Should have signaled PIDs"); +    STASIS_ASSERT(result.total_unused, "Should have PIDs marked UNUSED."); +    mp_pool_show_summary(p); +    mp_pool_free(&p); +} + +pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; +static void *pool_container(void *data) { +    char *commands_sc[] = { +        "sleep 10; echo done sleeping" +    }; +    struct MultiProcessingPool **x = (struct MultiProcessingPool **) data; +    struct MultiProcessingPool *p = (*x); +    pthread_mutex_lock(&mutex); +    mp_pool_task(p, "stop_resume_test", NULL, commands_sc[0]); +    mp_pool_join(p, 1, 0); +    mp_pool_show_summary(p); +    mp_pool_free(&p); +    pthread_mutex_unlock(&mutex); +    return NULL; +} + +void test_mp_stop_continue() { +    struct MultiProcessingPool *p = NULL; +    STASIS_ASSERT((p = mp_pool_init("stopcontinue", "stopcontinuelogs")) != NULL, "Failed to initialize pool"); +    pthread_t th; +    pthread_create(&th, NULL, pool_container, &p); +    sleep(2); +    if (p->task[0].pid != MP_POOL_PID_UNUSED) { +        STASIS_ASSERT(kill(p->task[0].pid, SIGSTOP) == 0, "SIGSTOP failed"); +        sleep(2); +        STASIS_ASSERT(kill(p->task[0].pid, SIGCONT) == 0, "SIGCONT failed"); +    } else { +        STASIS_ASSERT(false, "Task was marked as unused when it shouldn't have been"); +    } +    pthread_join(th, NULL); +} +  int main(int argc, char *argv[]) {      STASIS_TEST_BEGIN_MAIN();      STASIS_TEST_FUNC *tests[] = { @@ -121,6 +211,8 @@ int main(int argc, char *argv[]) {          test_mp_pool_join,          test_mp_pool_free,          test_mp_pool_workflow, +        test_mp_fail_fast, +        test_mp_stop_continue      };      STASIS_TEST_RUN(tests);      STASIS_TEST_END_MAIN(); | 
