diff options
| -rw-r--r-- | src/multiprocessing.c | 137 | 
1 files changed, 74 insertions, 63 deletions
| diff --git a/src/multiprocessing.c b/src/multiprocessing.c index 70f86be..7b3f997 100644 --- a/src/multiprocessing.c +++ b/src/multiprocessing.c @@ -5,73 +5,30 @@ static struct MultiProcessingTask *mp_pool_next_available(struct MultiProcessing      return &pool->task[pool->num_used];  } -struct MultiProcessingTask *mp_task(struct MultiProcessingPool *pool, const char *ident, char *cmd) { -    struct MultiProcessingTask *slot = mp_pool_next_available(pool); -    if (pool->num_used != pool->num_alloc) { -        pool->num_used++; -    } else { -        fprintf(stderr, "Maximum number of tasks reached\n"); -        return NULL; -    } - -    memset(slot->ident, 0, sizeof(slot->ident)); -    strncpy(slot->ident, ident, sizeof(slot->ident) - 1); - -    strcat(slot->log_file, pool->log_root); -    strcat(slot->log_file, "/"); - -    FILE *tp = NULL; -    char *t_name; -    t_name = xmkstemp(&tp, "w"); -    if (!t_name || !tp) { -        return NULL; -    } - -    // Set the script's permissions so that only the calling user can use it -    // This should help prevent eavesdropping if keys are applied in plain-text -    // somewhere. -    chmod(t_name, 0700); - -    memset(slot->parent_script, 0, sizeof(slot->parent_script)); -    strncpy(slot->parent_script, t_name, PATH_MAX - 1); - -    fprintf(tp, "#!/bin/bash\n%s\n", cmd); -    fflush(tp); -    fclose(tp); - -    char sema_name[PATH_MAX] = {0}; -    sprintf(sema_name, "/sem-%zu-%s-%s", mp_global_task_count, pool->ident, slot->ident); -    sem_unlink(sema_name); -    slot->gate = sem_open(sema_name, O_CREAT, 0644, 0); -    if (slot->gate == SEM_FAILED) { -        perror("sem_open failed"); -        exit(1); -    } - +static int mp_task_fork(struct MultiProcessingPool *pool, struct MultiProcessingTask *task, const char *cmd) {      pid_t pid = fork();      int child_status = 0;      if (pid == -1) { -        return NULL; +        return -1;      } else if (pid == 0) { // child process          char *cwd = NULL;          FILE *fp_log = NULL;          // Synchronize sub-process startup          // Stop here until summoned by mp_pool_join() -        if (sem_wait(slot->gate) < 0) { +        if (sem_wait(task->gate) < 0) {              perror("sem_wait failed");              exit(1);          } -          // Redirect stdout and stderr to the log file          fflush(stdout);          fflush(stderr); -        printf("[%s:%s] Task started (pid: %d)\n", pool->ident, slot->ident, slot->parent_pid); -        fp_log = freopen(slot->log_file, "w+", stdout); +        printf("[%s:%s] Task started (pid: %d)\n", pool->ident, task->ident, task->parent_pid); +        fp_log = freopen(task->log_file, "w+", stdout);          if (!fp_log) { -            fprintf(stderr, "unable to open '%s' for writing: %s\n", slot->log_file, strerror(errno)); -            return NULL; +            fprintf(stderr, "unable to open '%s' for writing: %s\n", task->log_file, strerror(errno)); +            return -1;          }          dup2(fileno(stdout), fileno(stderr)); @@ -91,7 +48,7 @@ struct MultiProcessingTask *mp_task(struct MultiProcessingPool *pool, const char          // Generate log header          fprintf(fp_log, "# STARTED: %s\n", timebuf ? timebuf : "unknown"); -        fprintf(fp_log, "# PID: %d\n", slot->parent_pid); +        fprintf(fp_log, "# PID: %d\n", task->parent_pid);          fprintf(fp_log, "# WORKDIR: %s\n", cwd);          fprintf(fp_log, "# COMMAND:\n%s\n", cmd);          fprintf(fp_log, "# OUTPUT:\n"); @@ -102,28 +59,77 @@ struct MultiProcessingTask *mp_task(struct MultiProcessingPool *pool, const char          // Execute task          fflush(stdout);          fflush(stderr); -        char *args[] = {"bash", "--norc", t_name, (char *) NULL}; -        slot->status = execvp("/bin/bash", args); +        char *args[] = {"bash", "--norc", task->parent_script, (char *) NULL}; +        task->status = execvp("/bin/bash", args);      } else { // parent process -        printf("[%s:%s] Task queued (pid: %d)\n", pool->ident, slot->ident, pid); +        printf("[%s:%s] Task queued (pid: %d)\n", pool->ident, task->ident, pid);          // Give the child process access to our PID value -        slot->pid = pid; -        slot->parent_pid = pid; +        task->pid = pid; +        task->parent_pid = pid;          // Set log file name -        sprintf(slot->log_file + strlen(slot->log_file), "task-%zu-%d.log", mp_global_task_count, slot->parent_pid); +        sprintf(task->log_file + strlen(task->log_file), "task-%zu-%d.log", mp_global_task_count, task->parent_pid);          mp_global_task_count++;          // Check child's status          pid_t code = waitpid(pid, &child_status, WUNTRACED | WCONTINUED | WNOHANG);          if (code < 0) {              perror("waitpid failed"); -            return NULL; +            return -1;          } -        return slot;      } -    return NULL; +    return 0; +} + +struct MultiProcessingTask *mp_task(struct MultiProcessingPool *pool, const char *ident, char *cmd) { +    struct MultiProcessingTask *slot = mp_pool_next_available(pool); +    if (pool->num_used != pool->num_alloc) { +        pool->num_used++; +    } else { +        fprintf(stderr, "Maximum number of tasks reached\n"); +        return NULL; +    } + +    memset(slot->ident, 0, sizeof(slot->ident)); +    strncpy(slot->ident, ident, sizeof(slot->ident) - 1); + +    strcat(slot->log_file, pool->log_root); +    strcat(slot->log_file, "/"); + +    FILE *tp = NULL; +    char *t_name; +    t_name = xmkstemp(&tp, "w"); +    if (!t_name || !tp) { +        return NULL; +    } + +    // Set the script's permissions so that only the calling user can use it +    // This should help prevent eavesdropping if keys are applied in plain-text +    // somewhere. +    chmod(t_name, 0700); + +    memset(slot->parent_script, 0, sizeof(slot->parent_script)); +    strncpy(slot->parent_script, t_name, PATH_MAX - 1); +    guard_free(t_name); + +    fprintf(tp, "#!/bin/bash\n%s\n", cmd); +    fflush(tp); +    fclose(tp); + +    char sema_name[PATH_MAX] = {0}; +    sprintf(sema_name, "/sem-%zu-%s-%s", mp_global_task_count, pool->ident, slot->ident); +    sem_unlink(sema_name); +    slot->gate = sem_open(sema_name, O_CREAT, 0644, 0); +    if (slot->gate == SEM_FAILED) { +        perror("sem_open failed"); +        exit(1); +    } + +    if (mp_task_fork(pool, slot, cmd)) { +        return NULL; +    } +    return slot;  }  static int show_log_contents(FILE *stream, struct MultiProcessingTask *task) { @@ -173,12 +179,10 @@ int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags) {      size_t tasks_complete = 0;      size_t lower_i = 0;      size_t upper_i = jobs; -      do { -        //if (upper_i >= pool->num_alloc) { +        size_t hang_check = 0;          if (upper_i >= pool->num_used) {              size_t x = upper_i - pool->num_used; -            //size_t x = upper_i - pool->num_alloc;              upper_i -= (size_t) x;          }          for (size_t i = lower_i; i < upper_i; i++) { @@ -191,6 +195,12 @@ int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags) {              if (slot->pid == MP_POOL_PID_UNUSED) {                  // PID is already used up, skip it +                hang_check++; +                if (hang_check >= pool->num_used) { +                    fprintf(stderr, "%s is deadlocked\n", pool->ident); +                    failures++; +                    goto pool_deadlocked; +                }                  continue;              } @@ -275,6 +285,7 @@ int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags) {          sleep(1);      } while (1); +    pool_deadlocked:      puts("");      return failures;  } | 
