aboutsummaryrefslogtreecommitdiff
path: root/tests/test_multiprocessing.c
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_multiprocessing.c')
-rw-r--r--tests/test_multiprocessing.c92
1 files changed, 92 insertions, 0 deletions
diff --git a/tests/test_multiprocessing.c b/tests/test_multiprocessing.c
index b9cd309..7c9d695 100644
--- a/tests/test_multiprocessing.c
+++ b/tests/test_multiprocessing.c
@@ -1,5 +1,6 @@
#include "testing.h"
#include "multiprocessing.h"
+#include <pthread.h>
static struct MultiProcessingPool *pool;
char *commands[] = {
@@ -12,6 +13,10 @@ char *commands[] = {
};
void test_mp_pool_init() {
+ STASIS_ASSERT((pool = mp_pool_init(NULL, "mplogs")) == NULL, "Pool should not be initialized with invalid ident");
+ STASIS_ASSERT((pool = mp_pool_init("mypool", NULL)) == NULL, "Pool should not be initialized with invalid logname");
+ STASIS_ASSERT((pool = mp_pool_init(NULL, NULL)) == NULL, "Pool should not be initialized with invalid arguments");
+ pool = NULL;
STASIS_ASSERT((pool = mp_pool_init("mypool", "mplogs")) != NULL, "Pool initialization failed");
STASIS_ASSERT_FATAL(pool != NULL, "Should not be NULL");
STASIS_ASSERT(pool->num_alloc == MP_POOL_TASK_MAX, "Wrong number of default records");
@@ -56,6 +61,7 @@ void test_mp_task() {
pool = mp_pool_init("mypool", "mplogs");
if (pool) {
+ pool->status_interval = 3;
for (size_t i = 0; i < sizeof(commands) / sizeof(*commands); i++) {
struct MultiProcessingTask *task;
char task_name[100] = {0};
@@ -113,6 +119,90 @@ void test_mp_pool_workflow() {
}
}
+void test_mp_fail_fast() {
+ char *commands_ff[128] = {
+ "sleep 3; true",
+ "sleep 5; false",
+ };
+
+ // Pad the array with tasks. None of these should execute when
+ // the "fail fast" conditions are met
+ char *nopcmd = "sleep 30; true";
+ for (size_t i = 2; i < sizeof(commands_ff) / sizeof(*commands_ff); i++) {
+ commands_ff[i] = nopcmd;
+ }
+
+ struct MultiProcessingPool *p;
+ STASIS_ASSERT((p = mp_pool_init("failfast", "failfastlogs")) != NULL, "Failed to initialize pool");
+ for (size_t i = 0; i < sizeof(commands_ff) / sizeof(*commands_ff); i++) {
+ char *command = commands_ff[i];
+ char taskname[100] = {0};
+ snprintf(taskname, sizeof(taskname) - 1, "task_%03zu", i);
+ STASIS_ASSERT(mp_pool_task(p, taskname, NULL, (char *) command) != NULL, "Failed to queue task");
+ }
+
+ STASIS_ASSERT(mp_pool_join(p, get_cpu_count(), MP_POOL_FAIL_FAST) < 0, "Unexpected result");
+
+ struct result {
+ int total_signaled;
+ int total_status_fail;
+ int total_status_success;
+ int total_unused;
+ } result = {
+ .total_signaled = 0,
+ .total_status_fail = 0,
+ .total_status_success = 0,
+ .total_unused = 0,
+ };
+ for (size_t i = 0; i < p->num_used; i++) {
+ struct MultiProcessingTask *task = &p->task[i];
+ if (task->signaled_by) result.total_signaled++;
+ if (task->status > 0) result.total_status_fail++;
+ if (task->status == 0) result.total_status_success++;
+ if (task->pid == MP_POOL_PID_UNUSED && task->status == MP_POOL_TASK_STATUS_INITIAL) result.total_unused++;
+ }
+ fprintf(stderr, "total_status_fail = %d\ntotal_status_success = %d\ntotal_signaled = %d\ntotal_unused = %d\n",
+ result.total_status_fail, result.total_status_success, result.total_signaled, result.total_unused);
+ STASIS_ASSERT(result.total_status_fail, "Should have failures");
+ STASIS_ASSERT(result.total_status_success, "Should have successes");
+ STASIS_ASSERT(result.total_signaled, "Should have signaled PIDs");
+ STASIS_ASSERT(result.total_unused, "Should have PIDs marked UNUSED.");
+ mp_pool_show_summary(p);
+ mp_pool_free(&p);
+}
+
+pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
+static void *pool_container(void *data) {
+ char *commands_sc[] = {
+ "sleep 10; echo done sleeping"
+ };
+ struct MultiProcessingPool **x = (struct MultiProcessingPool **) data;
+ struct MultiProcessingPool *p = (*x);
+ pthread_mutex_lock(&mutex);
+ mp_pool_task(p, "stop_resume_test", NULL, commands_sc[0]);
+ mp_pool_join(p, 1, 0);
+ mp_pool_show_summary(p);
+ mp_pool_free(&p);
+ pthread_mutex_unlock(&mutex);
+ return NULL;
+}
+
+void test_mp_stop_continue() {
+ struct MultiProcessingPool *p = NULL;
+ STASIS_ASSERT((p = mp_pool_init("stopcontinue", "stopcontinuelogs")) != NULL, "Failed to initialize pool");
+ pthread_t th;
+ pthread_create(&th, NULL, pool_container, &p);
+ sleep(2);
+ if (p->task[0].pid != MP_POOL_PID_UNUSED) {
+ STASIS_ASSERT(kill(p->task[0].pid, SIGSTOP) == 0, "SIGSTOP failed");
+ sleep(2);
+ STASIS_ASSERT(kill(p->task[0].pid, SIGCONT) == 0, "SIGCONT failed");
+ } else {
+ STASIS_ASSERT(false, "Task was marked as unused when it shouldn't have been");
+ }
+ pthread_join(th, NULL);
+}
+
int main(int argc, char *argv[]) {
STASIS_TEST_BEGIN_MAIN();
STASIS_TEST_FUNC *tests[] = {
@@ -121,6 +211,8 @@ int main(int argc, char *argv[]) {
test_mp_pool_join,
test_mp_pool_free,
test_mp_pool_workflow,
+ test_mp_fail_fast,
+ test_mp_stop_continue
};
STASIS_TEST_RUN(tests);
STASIS_TEST_END_MAIN();