1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
#include "testing.h"
#include "multiprocessing.h"
// Pool shared across test functions through file scope: test_mp_task creates
// it, test_mp_pool_join runs it and test_mp_pool_free releases it.
// test_mp_pool_init also uses this variable but frees the pool before returning.
static struct MultiProcessingPool *pool;
// Command strings queued by test_mp_task, one pool task per entry.
// Every entry starts with a progressively longer `sleep` (1 to 6 seconds)
// followed by a short command.
// NOTE(review): the `;` separators imply these are run through a shell --
// confirm against mp_pool_task.
char *commands[] = {
"sleep 1; true",
"sleep 2; uname -a",
"sleep 3; /bin/echo hello world",
"sleep 4; true",
"sleep 5; uname -a",
"sleep 6; /bin/echo hello world",
};
void test_mp_pool_init() {
STASIS_ASSERT((pool = mp_pool_init("mypool", "mplogs")) != NULL, "Pool initialization failed");
STASIS_ASSERT_FATAL(pool != NULL, "Should not be NULL");
STASIS_ASSERT(pool->num_alloc == MP_POOL_TASK_MAX, "Wrong number of default records");
STASIS_ASSERT(pool->num_used == 0, "Wrong number of used records");
STASIS_ASSERT(strcmp(pool->log_root, "mplogs") == 0, "Wrong log root directory");
STASIS_ASSERT(strcmp(pool->ident, "mypool") == 0, "Wrong identity");
int data_bad_total = 0;
for (size_t i = 0; i < pool->num_alloc; i++) {
int data_bad = 0;
struct MultiProcessingTask *task = &pool->task[i];
data_bad += task->status == 0 ? 0 : 1;
data_bad += task->pid == 0 ? 0 : 1;
data_bad += task->parent_pid == 0 ? 0 : 1;
data_bad += task->signaled_by == 0 ? 0 : 1;
data_bad += task->time_data.t_start.tv_nsec == 0 ? 0 : 1;
data_bad += task->time_data.t_start.tv_sec == 0 ? 0 : 1;
data_bad += task->time_data.t_stop.tv_nsec == 0 ? 0 : 1;
data_bad += task->time_data.t_stop.tv_sec == 0 ? 0 : 1;
data_bad += (int) strlen(task->ident) == 0 ? 0 : 1;
data_bad += (int) strlen(task->parent_script) == 0 ? 0 : 1;
if (data_bad) {
SYSERROR("%s.task[%zu] has garbage values!", pool->ident, i);
SYSERROR(" ident: %s", task->ident);
SYSERROR(" status: %d", task->status);
SYSERROR(" pid: %d", task->pid);
SYSERROR(" parent_pid: %d", task->parent_pid);
SYSERROR(" signaled_by: %d", task->signaled_by);
SYSERROR(" t_start.tv_nsec: %ld", task->time_data.t_start.tv_nsec);
SYSERROR(" t_start.tv_sec: %ld", task->time_data.t_start.tv_sec);
SYSERROR(" t_stop.tv_nsec: %ld", task->time_data.t_stop.tv_nsec);
SYSERROR(" t_stop.tv_sec: %ld", task->time_data.t_stop.tv_sec);
data_bad_total++;
}
}
STASIS_ASSERT(data_bad_total == 0, "Task array is not pristine");
mp_pool_free(&pool);
}
void test_mp_task() {
pool = mp_pool_init("mypool", "mplogs");
if (pool) {
for (size_t i = 0; i < sizeof(commands) / sizeof(*commands); i++) {
struct MultiProcessingTask *task;
char task_name[100] = {0};
sprintf(task_name, "mytask%zu", i);
STASIS_ASSERT_FATAL((task = mp_pool_task(pool, task_name, NULL, commands[i])) != NULL, "Task should not be NULL");
STASIS_ASSERT(task->pid == MP_POOL_PID_UNUSED, "PID should be non-zero at this point");
STASIS_ASSERT(task->parent_pid == MP_POOL_PID_UNUSED, "Parent PID should be non-zero");
STASIS_ASSERT(task->status == -1, "Status should be -1 (not started yet)");
STASIS_ASSERT(strcmp(task->ident, task_name) == 0, "Wrong task identity");
STASIS_ASSERT(strstr(task->log_file, pool->log_root) != NULL, "Log file path must be in log_root");
}
}
}
void test_mp_pool_join() {
STASIS_ASSERT(mp_pool_join(pool, get_cpu_count(), 0) == 0, "Pool tasks should have not have failed");
for (size_t i = 0; i < pool->num_used; i++) {
struct MultiProcessingTask *task = &pool->task[i];
STASIS_ASSERT(task->pid == MP_POOL_PID_UNUSED, "Task should be marked as unused");
STASIS_ASSERT(task->status == 0, "Task status should be zero (success)");
}
}
/**
 * Release the shared pool through mp_pool_free() and verify that the
 * file-level `pool` pointer is NULL afterwards.
 */
void test_mp_pool_free() {
    struct MultiProcessingPool **shared_pool = &pool;

    mp_pool_free(shared_pool);
    STASIS_ASSERT(pool == NULL, "Should be NULL");
}
/**
 * Table-driven end-to-end check: for each case a fresh pool is created, a
 * single task running `input_cmd` is queued and joined, and the join result,
 * task exit status and terminating signal are compared with the expectations.
 *
 * Each iteration uses its own local pool, independent of the file-level `pool`.
 */
void test_mp_pool_workflow() {
    struct testcase {
        const char *input_cmd;   // command string handed to mp_pool_task
        int input_join_flags;    // flags argument passed to mp_pool_join
        int expected_result;     // expected return value of mp_pool_join
        int expected_status;     // expected task->status after the join
        int expected_signal;     // expected task->signaled_by after the join
    };
    // The first two cases run `kill $$`; the expectation is that the task is
    // reported as signaled by SIGTERM with status 0 and a join result of 1.
    const struct testcase tc[] = {
        {.input_cmd = "true && kill $$", .input_join_flags = 0, .expected_result = 1, .expected_status = 0, .expected_signal = SIGTERM},
        {.input_cmd = "false || kill $$", .input_join_flags = 0, .expected_result = 1, .expected_status = 0, .expected_signal = SIGTERM},
        {.input_cmd = "true", .input_join_flags = 0, .expected_result = 0, .expected_status = 0, .expected_signal = 0},
        {.input_cmd = "false", .input_join_flags = 0, .expected_result = 1, .expected_status = 1, .expected_signal = 0},
    };

    for (size_t i = 0; i < sizeof(tc) / sizeof(*tc); i++) {
        const struct testcase *test = &tc[i];
        struct MultiProcessingPool *p;
        struct MultiProcessingTask *task;

        // Both pointers are dereferenced or passed on below; a NULL here must
        // stop the run instead of falling through to a NULL dereference.
        STASIS_ASSERT_FATAL((p = mp_pool_init("workflow", "mplogs")) != NULL, "Failed to initialize pool");
        // The cast drops const because mp_pool_task is called with a plain
        // char * elsewhere in this file; the string is not modified here.
        STASIS_ASSERT_FATAL((task = mp_pool_task(p, "task", NULL, (char *) test->input_cmd)) != NULL, "Failed to queue task");

        STASIS_ASSERT(mp_pool_join(p, get_cpu_count(), test->input_join_flags) == test->expected_result, "Unexpected result");
        STASIS_ASSERT(task->status == test->expected_status, "Unexpected status");
        STASIS_ASSERT(task->signaled_by == test->expected_signal, "Unexpected signal");
        STASIS_ASSERT(task->pid == MP_POOL_PID_UNUSED, "Unexpected PID. Should be marked UNUSED.");

        mp_pool_show_summary(p);
        mp_pool_free(&p);
    }
}
// Test program entry point: registers the test functions of this file and
// hands them to the STASIS test macros.
int main(int argc, char *argv[]) {
STASIS_TEST_BEGIN_MAIN();
// test_mp_task, test_mp_pool_join and test_mp_pool_free share the file-level
// `pool` (created, joined, then freed), so they must stay in this relative order.
// NOTE(review): presumably STASIS_TEST_RUN executes the array front to back --
// confirm in testing.h.
STASIS_TEST_FUNC *tests[] = {
test_mp_pool_init,
test_mp_task,
test_mp_pool_join,
test_mp_pool_free,
test_mp_pool_workflow,
};
STASIS_TEST_RUN(tests);
STASIS_TEST_END_MAIN();
}
|