aboutsummaryrefslogtreecommitdiff
path: root/src/multiprocessing.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/multiprocessing.c')
-rw-r--r--src/multiprocessing.c137
1 files changed, 74 insertions, 63 deletions
diff --git a/src/multiprocessing.c b/src/multiprocessing.c
index 70f86be..7b3f997 100644
--- a/src/multiprocessing.c
+++ b/src/multiprocessing.c
@@ -5,73 +5,30 @@ static struct MultiProcessingTask *mp_pool_next_available(struct MultiProcessing
return &pool->task[pool->num_used];
}
-struct MultiProcessingTask *mp_task(struct MultiProcessingPool *pool, const char *ident, char *cmd) {
- struct MultiProcessingTask *slot = mp_pool_next_available(pool);
- if (pool->num_used != pool->num_alloc) {
- pool->num_used++;
- } else {
- fprintf(stderr, "Maximum number of tasks reached\n");
- return NULL;
- }
-
- memset(slot->ident, 0, sizeof(slot->ident));
- strncpy(slot->ident, ident, sizeof(slot->ident) - 1);
-
- strcat(slot->log_file, pool->log_root);
- strcat(slot->log_file, "/");
-
- FILE *tp = NULL;
- char *t_name;
- t_name = xmkstemp(&tp, "w");
- if (!t_name || !tp) {
- return NULL;
- }
-
- // Set the script's permissions so that only the calling user can use it
- // This should help prevent eavesdropping if keys are applied in plain-text
- // somewhere.
- chmod(t_name, 0700);
-
- memset(slot->parent_script, 0, sizeof(slot->parent_script));
- strncpy(slot->parent_script, t_name, PATH_MAX - 1);
-
- fprintf(tp, "#!/bin/bash\n%s\n", cmd);
- fflush(tp);
- fclose(tp);
-
- char sema_name[PATH_MAX] = {0};
- sprintf(sema_name, "/sem-%zu-%s-%s", mp_global_task_count, pool->ident, slot->ident);
- sem_unlink(sema_name);
- slot->gate = sem_open(sema_name, O_CREAT, 0644, 0);
- if (slot->gate == SEM_FAILED) {
- perror("sem_open failed");
- exit(1);
- }
-
+static int mp_task_fork(struct MultiProcessingPool *pool, struct MultiProcessingTask *task, const char *cmd) {
pid_t pid = fork();
int child_status = 0;
if (pid == -1) {
- return NULL;
+ return -1;
} else if (pid == 0) { // child process
char *cwd = NULL;
FILE *fp_log = NULL;
// Synchronize sub-process startup
// Stop here until summoned by mp_pool_join()
- if (sem_wait(slot->gate) < 0) {
+ if (sem_wait(task->gate) < 0) {
perror("sem_wait failed");
exit(1);
}
-
// Redirect stdout and stderr to the log file
fflush(stdout);
fflush(stderr);
- printf("[%s:%s] Task started (pid: %d)\n", pool->ident, slot->ident, slot->parent_pid);
- fp_log = freopen(slot->log_file, "w+", stdout);
+ printf("[%s:%s] Task started (pid: %d)\n", pool->ident, task->ident, task->parent_pid);
+ fp_log = freopen(task->log_file, "w+", stdout);
if (!fp_log) {
- fprintf(stderr, "unable to open '%s' for writing: %s\n", slot->log_file, strerror(errno));
- return NULL;
+ fprintf(stderr, "unable to open '%s' for writing: %s\n", task->log_file, strerror(errno));
+ return -1;
}
dup2(fileno(stdout), fileno(stderr));
@@ -91,7 +48,7 @@ struct MultiProcessingTask *mp_task(struct MultiProcessingPool *pool, const char
// Generate log header
fprintf(fp_log, "# STARTED: %s\n", timebuf ? timebuf : "unknown");
- fprintf(fp_log, "# PID: %d\n", slot->parent_pid);
+ fprintf(fp_log, "# PID: %d\n", task->parent_pid);
fprintf(fp_log, "# WORKDIR: %s\n", cwd);
fprintf(fp_log, "# COMMAND:\n%s\n", cmd);
fprintf(fp_log, "# OUTPUT:\n");
@@ -102,28 +59,77 @@ struct MultiProcessingTask *mp_task(struct MultiProcessingPool *pool, const char
// Execute task
fflush(stdout);
fflush(stderr);
- char *args[] = {"bash", "--norc", t_name, (char *) NULL};
- slot->status = execvp("/bin/bash", args);
+ char *args[] = {"bash", "--norc", task->parent_script, (char *) NULL};
+ task->status = execvp("/bin/bash", args);
} else { // parent process
- printf("[%s:%s] Task queued (pid: %d)\n", pool->ident, slot->ident, pid);
+ printf("[%s:%s] Task queued (pid: %d)\n", pool->ident, task->ident, pid);
// Give the child process access to our PID value
- slot->pid = pid;
- slot->parent_pid = pid;
+ task->pid = pid;
+ task->parent_pid = pid;
// Set log file name
- sprintf(slot->log_file + strlen(slot->log_file), "task-%zu-%d.log", mp_global_task_count, slot->parent_pid);
+ sprintf(task->log_file + strlen(task->log_file), "task-%zu-%d.log", mp_global_task_count, task->parent_pid);
mp_global_task_count++;
// Check child's status
pid_t code = waitpid(pid, &child_status, WUNTRACED | WCONTINUED | WNOHANG);
if (code < 0) {
perror("waitpid failed");
- return NULL;
+ return -1;
}
- return slot;
}
- return NULL;
+ return 0;
+}
+
+struct MultiProcessingTask *mp_task(struct MultiProcessingPool *pool, const char *ident, char *cmd) {
+ struct MultiProcessingTask *slot = mp_pool_next_available(pool);
+ if (pool->num_used != pool->num_alloc) {
+ pool->num_used++;
+ } else {
+ fprintf(stderr, "Maximum number of tasks reached\n");
+ return NULL;
+ }
+
+ memset(slot->ident, 0, sizeof(slot->ident));
+ strncpy(slot->ident, ident, sizeof(slot->ident) - 1);
+
+ strcat(slot->log_file, pool->log_root);
+ strcat(slot->log_file, "/");
+
+ FILE *tp = NULL;
+ char *t_name;
+ t_name = xmkstemp(&tp, "w");
+ if (!t_name || !tp) {
+ return NULL;
+ }
+
+ // Set the script's permissions so that only the calling user can use it
+ // This should help prevent eavesdropping if keys are applied in plain-text
+ // somewhere.
+ chmod(t_name, 0700);
+
+ memset(slot->parent_script, 0, sizeof(slot->parent_script));
+ strncpy(slot->parent_script, t_name, PATH_MAX - 1);
+ guard_free(t_name);
+
+ fprintf(tp, "#!/bin/bash\n%s\n", cmd);
+ fflush(tp);
+ fclose(tp);
+
+ char sema_name[PATH_MAX] = {0};
+ sprintf(sema_name, "/sem-%zu-%s-%s", mp_global_task_count, pool->ident, slot->ident);
+ sem_unlink(sema_name);
+ slot->gate = sem_open(sema_name, O_CREAT, 0644, 0);
+ if (slot->gate == SEM_FAILED) {
+ perror("sem_open failed");
+ exit(1);
+ }
+
+ if (mp_task_fork(pool, slot, cmd)) {
+ return NULL;
+ }
+ return slot;
}
static int show_log_contents(FILE *stream, struct MultiProcessingTask *task) {
@@ -173,12 +179,10 @@ int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags) {
size_t tasks_complete = 0;
size_t lower_i = 0;
size_t upper_i = jobs;
-
do {
- //if (upper_i >= pool->num_alloc) {
+ size_t hang_check = 0;
if (upper_i >= pool->num_used) {
size_t x = upper_i - pool->num_used;
- //size_t x = upper_i - pool->num_alloc;
upper_i -= (size_t) x;
}
for (size_t i = lower_i; i < upper_i; i++) {
@@ -191,6 +195,12 @@ int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags) {
if (slot->pid == MP_POOL_PID_UNUSED) {
// PID is already used up, skip it
+ hang_check++;
+ if (hang_check >= pool->num_used) {
+ fprintf(stderr, "%s is deadlocked\n", pool->ident);
+ failures++;
+ goto pool_deadlocked;
+ }
continue;
}
@@ -275,6 +285,7 @@ int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags) {
sleep(1);
} while (1);
+ pool_deadlocked:
puts("");
return failures;
}