diff options
| author | Joseph Hunkeler <jhunkeler@gmail.com> | 2024-09-17 09:32:12 -0400 | 
|---|---|---|
| committer | Joseph Hunkeler <jhunkeler@gmail.com> | 2024-09-18 23:07:14 -0400 | 
| commit | daf8c81c35f590f2ac145801933792a070ccbee2 (patch) | |
| tree | c9570402e27990ddee331d2943fa9d319c74ffb4 /src | |
| parent | 6921cb44f5464b59a2a169167cc63c66bcc95a9f (diff) | |
| download | stasis-daf8c81c35f590f2ac145801933792a070ccbee2.tar.gz | |
Add comments, remove dead code
Diffstat (limited to 'src')
| -rw-r--r-- | src/multiprocessing.c | 26 | 
1 files changed, 24 insertions, 2 deletions
| diff --git a/src/multiprocessing.c b/src/multiprocessing.c index a1d961e..25b667c 100644 --- a/src/multiprocessing.c +++ b/src/multiprocessing.c @@ -1,6 +1,8 @@  #include "core.h" +/// The sum of all tasks queued by mp_task()  size_t mp_global_task_count = 0; +  static struct MultiProcessingTask *mp_pool_next_available(struct MultiProcessingPool *pool) {      return &pool->task[pool->num_used];  } @@ -107,12 +109,15 @@ struct MultiProcessingTask *mp_task(struct MultiProcessingPool *pool, const char          return NULL;      } +    // Set task identifier string      memset(slot->ident, 0, sizeof(slot->ident));      strncpy(slot->ident, ident, sizeof(slot->ident) - 1); +    // Set log file path      strcat(slot->log_file, pool->log_root);      strcat(slot->log_file, "/"); +    // Create a temporary file to act as our intermediate command script      FILE *tp = NULL;      char *t_name;      t_name = xmkstemp(&tp, "w"); @@ -125,14 +130,18 @@ struct MultiProcessingTask *mp_task(struct MultiProcessingPool *pool, const char      // somewhere.      chmod(t_name, 0700); +    // Record the script path      memset(slot->parent_script, 0, sizeof(slot->parent_script));      strncpy(slot->parent_script, t_name, PATH_MAX - 1);      guard_free(t_name); +    // Populate the script      fprintf(tp, "#!/bin/bash\n%s\n", cmd);      fflush(tp);      fclose(tp); +    // Create a uniquely named semaphore. +    // This is used by the child process to prevent task execution until mp_pool_join is called      char sema_name[PATH_MAX] = {0};      sprintf(sema_name, "/sem-%zu-%s-%s", mp_global_task_count, pool->ident, slot->ident);      sem_unlink(sema_name); @@ -142,6 +151,7 @@ struct MultiProcessingTask *mp_task(struct MultiProcessingPool *pool, const char          exit(1);      } +    // Execute task      if (mp_task_fork(pool, slot, cmd)) {          return NULL;      } @@ -164,12 +174,12 @@ static int show_log_contents(FILE *stream, struct MultiProcessingTask *task) {  int mp_pool_kill(struct MultiProcessingPool *pool, int signum) {      printf("Sending signal %d to pool '%s'\n", signum, pool->ident); -    //for (size_t i = 0; i < pool->num_alloc; i++) {      for (size_t i = 0; i < pool->num_used; i++) {          struct MultiProcessingTask *slot = &pool->task[i];          if (!slot) {              return -1;          } +        // Kill tasks in progress          if (slot->pid > 0) {              int status;              printf("Sending signal %d to task '%s' (pid: %d)\n", signum, slot->ident, slot->pid); @@ -209,10 +219,13 @@ int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags) {                  exit(1);              } +            // Has the child been processed already?              if (slot->pid == MP_POOL_PID_UNUSED) { -                // PID is already used up, skip it +                // Child is already used up, skip it                  hang_check++;                  if (hang_check >= pool->num_used) { +                    // Unlikely to happen when called correctly, but if you join a pool that's already finished +                    // it will spin forever. This protects the program from entering an infinite loop.                      fprintf(stderr, "%s is deadlocked\n", pool->ident);                      failures++;                      goto pool_deadlocked; @@ -314,15 +327,20 @@ struct MultiProcessingPool *mp_pool_init(const char *ident, const char *log_root          return NULL;      } +    // The pool is shared with children      pool = mmap(NULL, sizeof(*pool), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0); + +    // Set pool identity string      memset(pool->ident, 0, sizeof(pool->ident));      strncpy(pool->ident, ident, sizeof(pool->ident) - 1); +    // Set logging base directory      memset(pool->log_root, 0, sizeof(pool->log_root));      strncpy(pool->log_root, log_root, sizeof(pool->log_root) - 1);      pool->num_used = 0;      pool->num_alloc = MP_POOL_TASK_MAX; +    // Create the log directory      if (mkdirs(log_root, 0700) < 0) {          if (errno != EEXIST) {              perror(log_root); @@ -331,6 +349,7 @@ struct MultiProcessingPool *mp_pool_init(const char *ident, const char *log_root          }      } +    // Task array is shared with children      pool->task = mmap(NULL, (pool->num_alloc + 1) * sizeof(*pool->task), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0);      if (pool->task == MAP_FAILED) {          perror("mmap"); @@ -343,6 +362,7 @@ struct MultiProcessingPool *mp_pool_init(const char *ident, const char *log_root  void mp_pool_free(struct MultiProcessingPool **pool) {      for (size_t i = 0; i < (*pool)->num_alloc; i++) { +        // Close all semaphores          if ((*pool)->task[i].gate) {              if (sem_close((*pool)->task[i].gate) < 0) {                  perror("sem_close failed"); @@ -350,11 +370,13 @@ void mp_pool_free(struct MultiProcessingPool **pool) {              }          }      } +    // Unmap all pool tasks      if ((*pool)->task) {          if (munmap((*pool)->task, sizeof(*(*pool)->task) * (*pool)->num_alloc) < 0) {              perror("munmap");          }      } +    // Unmap the pool      if ((*pool)) {          if (munmap((*pool), sizeof(*(*pool))) < 0) {              perror("munmap"); | 
