aboutsummaryrefslogtreecommitdiff
path: root/include/multiprocessing.h
blob: 2db33df5a7e03bb6a75ae94aa1ef43bc400e79b1 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
/// @file multiprocessing.h
#ifndef STASIS_MULTIPROCESSING_H
#define STASIS_MULTIPROCESSING_H

#include "core.h"
#include <signal.h>
#include <sys/wait.h>
#include <semaphore.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <sys/stat.h>

struct MultiProcessingTask {
    sem_t *gate; ///< Child process startup lock
    pid_t pid; ///< Program PID
    pid_t parent_pid; ///< Program PID (parent process)
    int status; ///< Child process exit status
    int signaled_by; ///< Last signal received, if any
    char ident[255]; ///< Identity of the pool task
    char log_file[255]; ///< Path to stdout/stderr log file
    char parent_script[PATH_MAX]; ///< Path to temporary script executing the task
    struct {
        struct timespec t_start;
        struct timespec t_stop;
    } time_data; ///< Wall-time counters
};

struct MultiProcessingPool {
    struct MultiProcessingTask *task; ///< Array of tasks to execute
    size_t num_used; ///< Number of tasks populated in the task array
    size_t num_alloc; ///< Number of tasks allocated by the task array
    char ident[255]; ///< Identity of task pool
    char log_root[PATH_MAX]; ///< Base directory to store stderr/stdout log files
};

/// Maximum number of multiprocessing tasks STASIS can execute
#define MP_POOL_TASK_MAX 1000

/// Value signifies a process is unused or finished executing
#define MP_POOL_PID_UNUSED 0

/// Option flags for mp_pool_join()
#define MP_POOL_FAIL_FAST 1 << 1

/**
 * Create a multiprocessing pool
 *
 * ```c
 * #include "multiprocessing.h"
 * #include "utils.h" // for get_cpu_count()
 *
 * int main(int argc, char *argv[]) {
 *     struct MultiProcessingPool *mp;
 *     mp = mp_pool_init("mypool", "/tmp/mypool_logs");
 *     if (mp) {
 *         char *commands[] = {
 *             "/bin/echo hello world",
 *             "/bin/echo world hello",
 *             NULL
 *         }
 *         for (size_t i = 0; commands[i] != NULL); i++) {
 *             struct MultiProcessingTask *task;
 *             char task_name[100];
 *
 *             sprintf(task_name, "mytask%zu", i);
 *             task = mp_task(mp, task_name, commands[i]);
 *             if (!task) {
 *                 // handle task creation error
 *             }
 *         }
 *         if (mp_pool_join(mp, get_cpu_count(), MP_POOL_FAIL_FAST)) {
 *             // handle pool execution error
 *         }
 *         mp_pool_free(&mp);
 *     } else {
 *         // handle pool initialization error
 *     }
 * }
 * ```
 *
 * @param ident a name to identify the pool
 * @param log_root the path to store program output
 * @return pointer to initialized MultiProcessingPool
 * @return NULL on error
 */
struct MultiProcessingPool *mp_pool_init(const char *ident, const char *log_root);

/**
 * Create a multiprocessing pool task
 *
 * @param pool a pointer to MultiProcessingPool
 * @param ident a name to identify the task
 * @param cmd a command to execute
 * @return pointer to MultiProcessingTask structure
 * @return NULL on error
 */
struct MultiProcessingTask *mp_pool_task(struct MultiProcessingPool *pool, const char *ident, char *cmd);

/**
 * Execute all tasks in a pool
 *
 * @param pool a pointer to MultiProcessingPool
 * @param jobs the number of processes to spawn at once (for serial execution use `1`)
 * @param flags option to be OR'd (MP_POOL_FAIL_FAST)
 * @return 0 on success
 * @return >0 on failure
 * @return <0 on error
 */
int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags);

/**
 * Show summary of pool tasks
 *
 * @pararm pool a pointer to MultiProcessingPool
 */
void mp_pool_show_summary(struct MultiProcessingPool *pool);

/**
 * Release resources allocated by mp_pool_init()
 *
 * @param a pointer to MultiProcessingPool
 */
void mp_pool_free(struct MultiProcessingPool **pool);


#endif //STASIS_MULTIPROCESSING_H