1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
|
/// @file multiprocessing.h
#ifndef STASIS_MULTIPROCESSING_H
#define STASIS_MULTIPROCESSING_H
#include "core.h"
#include <signal.h>
#include <sys/wait.h>
#include <semaphore.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <sys/stat.h>
struct MultiProcessingTask {
pid_t pid; ///< Program PID
pid_t parent_pid; ///< Program PID (parent process)
int status; ///< Child process exit status
int signaled_by; ///< Last signal received, if any
time_t _now; ///< Current time
time_t _seconds; ///< Time elapsed (used by MultiprocessingPool.status_interval)
char ident[255]; ///< Identity of the pool task
char *cmd; ///< Shell command(s) to be executed
size_t cmd_len; ///< Length of command string (for mmap/munmap)
char working_dir[PATH_MAX]; ///< Path to directory `cmd` should be executed in
char log_file[PATH_MAX]; ///< Full path to stdout/stderr log file
char parent_script[PATH_MAX]; ///< Path to temporary script executing the task
struct {
struct timespec t_start;
struct timespec t_stop;
} time_data; ///< Wall-time counters
};
struct MultiProcessingPool {
struct MultiProcessingTask *task; ///< Array of tasks to execute
size_t num_used; ///< Number of tasks populated in the task array
size_t num_alloc; ///< Number of tasks allocated by the task array
char ident[255]; ///< Identity of task pool
char log_root[PATH_MAX]; ///< Base directory to store stderr/stdout log files
int status_interval; ///< Report a pooled task is "running" every n seconds
};
/// A multiprocessing task's initial state (i.e. "FAIL")
#define MP_POOL_TASK_STATUS_INITIAL (-1)
/// Maximum number of multiprocessing tasks STASIS can execute
#define MP_POOL_TASK_MAX 1000
/// Value signifies a process is unused or finished executing
#define MP_POOL_PID_UNUSED 0
/// Option flags for mp_pool_join()
#define MP_POOL_FAIL_FAST 1 << 1
/**
* Create a multiprocessing pool
*
* ```c
* #include "multiprocessing.h"
* #include "utils.h" // for get_cpu_count()
*
* int main(int argc, char *argv[]) {
* struct MultiProcessingPool *mp;
* mp = mp_pool_init("mypool", "/tmp/mypool_logs");
* if (mp) {
* char *commands[] = {
* "/bin/echo hello world",
* "/bin/echo world hello",
* NULL
* }
* for (size_t i = 0; commands[i] != NULL); i++) {
* struct MultiProcessingTask *task;
* char task_name[100];
*
* sprintf(task_name, "mytask%zu", i);
* task = mp_task(mp, task_name, commands[i]);
* if (!task) {
* // handle task creation error
* }
* }
* if (mp_pool_join(mp, get_cpu_count(), MP_POOL_FAIL_FAST)) {
* // handle pool execution error
* }
* mp_pool_free(&mp);
* } else {
* // handle pool initialization error
* }
* }
* ```
*
* @param ident a name to identify the pool
* @param log_root the path to store program output
* @return pointer to initialized MultiProcessingPool
* @return NULL on error
*/
struct MultiProcessingPool *mp_pool_init(const char *ident, const char *log_root);
/**
* Create a multiprocessing pool task
*
* @param pool a pointer to MultiProcessingPool
* @param ident a name to identify the task
* @param cmd a command to execute
* @return pointer to MultiProcessingTask structure
* @return NULL on error
*/
struct MultiProcessingTask *mp_pool_task(struct MultiProcessingPool *pool, const char *ident, char *working_dir, char *cmd);
/**
* Execute all tasks in a pool
*
* @param pool a pointer to MultiProcessingPool
* @param jobs the number of processes to spawn at once (for serial execution use `1`)
* @param flags option to be OR'd (MP_POOL_FAIL_FAST)
* @return 0 on success
* @return >0 on failure
* @return <0 on error
*/
int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags);
/**
* Show summary of pool tasks
*
* @param pool a pointer to MultiProcessingPool
*/
void mp_pool_show_summary(struct MultiProcessingPool *pool);
/**
* Release resources allocated by mp_pool_init()
*
* @param a pointer to MultiProcessingPool
*/
void mp_pool_free(struct MultiProcessingPool **pool);
#endif //STASIS_MULTIPROCESSING_H
|