diff options
| author | Joseph Hunkeler <jhunkeler@gmail.com> | 2024-09-13 09:35:34 -0400 | 
|---|---|---|
| committer | Joseph Hunkeler <jhunkeler@gmail.com> | 2024-09-18 23:06:08 -0400 | 
| commit | b7251ce3bf65bcbec7ecbb98a0eb0b3c9abde507 (patch) | |
| tree | 120eac93d20476ed344b2b188f262ec86f1cc38a /src | |
| parent | 9691ccf51b3efd8113e9620c4afa8b5382d7f161 (diff) | |
| download | stasis-b7251ce3bf65bcbec7ecbb98a0eb0b3c9abde507.tar.gz | |
Move guard_ macros to core_mem.h
* Move core_mem.h below config.h
Diffstat (limited to 'src')
| -rw-r--r-- | src/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | src/multiprocessing.c | 373 | 
2 files changed, 374 insertions, 0 deletions
| diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 2399dc5..5d1c8ff 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -25,6 +25,7 @@ add_library(stasis_core STATIC          github.c          template_func_proto.c          envctl.c +        multiprocessing.c  )  add_executable(stasis diff --git a/src/multiprocessing.c b/src/multiprocessing.c new file mode 100644 index 0000000..09190d3 --- /dev/null +++ b/src/multiprocessing.c @@ -0,0 +1,373 @@ +#include "core_mem.h" +#include "multiprocessing.h" +#include "utils.h" + +size_t mp_global_task_count = 0; +static struct MultiProcessingTask *mp_pool_next_available(struct MultiProcessingPool *pool) { +    return &pool->task[pool->num_used]; +} + +struct MultiProcessingTask *mp_task(struct MultiProcessingPool *pool, const char *ident, char *cmd) { +    struct MultiProcessingTask *slot = mp_pool_next_available(pool); +    //struct MultiProcessingTask *slot = mp_pool_any_available(pool); +    pool->num_used++; + +    memset(slot->ident, 0, sizeof(slot->ident)); +    strncpy(slot->ident, ident, sizeof(slot->ident) - 1); + +    strcat(slot->log_file, pool->log_root); +    strcat(slot->log_file, "/"); + +    FILE *tp = NULL; +    char *t_name; +    t_name = xmkstemp(&tp, "w"); +    if (!t_name || !tp) { +        return NULL; +    } + +    // Set the script's permissions so that only the calling user can use it +    // This should help prevent eavesdropping if keys are applied in plain-text +    // somewhere. +    chmod(t_name, 0700); + +    memset(slot->parent_script, 0, sizeof(slot->parent_script)); +    strncpy(slot->parent_script, t_name, PATH_MAX - 1); + +    fprintf(tp, "#!/bin/bash\n%s\n", cmd); +    fflush(tp); +    fclose(tp); + +    pid_t pid = fork(); +    int child_status = 0; +    if (pid == -1) { +        return NULL; +    } else if (pid == 0) { // child process +        char *cwd = NULL; +        FILE *fp_log = NULL; + +        // Synchronize sub-process startup +        // Stop here until summoned by mp_pool_join() +        if (sem_wait(slot->gate) < 0) { +            perror("sem_wait failed"); +            exit(1); +        } + + +        // Redirect stdout and stderr to the log file +        fflush(stdout); +        fflush(stderr); +        printf("[%s:%s] Task started (pid: %d)\n", pool->ident, slot->ident, slot->parent_pid); +        fp_log = freopen(slot->log_file, "w+", stdout); +        if (!fp_log) { +            fprintf(stderr, "unable to open '%s' for writing: %s\n", slot->log_file, strerror(errno)); +            return NULL; +        } +        dup2(fileno(stdout), fileno(stderr)); + +        // Generate timestamp for log header +        time_t t = time(NULL); +        char *timebuf = ctime(&t); +        if (timebuf) { +            // strip line feed from timestamp +            timebuf[strlen(timebuf) ? strlen(timebuf) - 1 : 0] = 0; +        } + +        cwd = getcwd(NULL, PATH_MAX); +        if (!cwd) { +            perror(cwd); +            exit(1); +        } + +        // Generate log header +        fprintf(fp_log, "# STARTED: %s\n", timebuf ? timebuf : "unknown"); +        fprintf(fp_log, "# PID: %d\n", slot->parent_pid); +        fprintf(fp_log, "# WORKDIR: %s\n", cwd); +        fprintf(fp_log, "# COMMAND:\n%s\n", cmd); +        fprintf(fp_log, "# OUTPUT:\n"); +        // Commit header to log file / clean up +        fflush(fp_log); +        guard_free(cwd); + +        // Execute task +        fflush(stdout); +        fflush(stderr); +        char *args[] = {"bash", "--norc", t_name, (char *) NULL}; +        slot->status = execvp("/bin/bash", args); +    } else { // parent process +        printf("[%s:%s] Task queued (pid: %d)\n", pool->ident, slot->ident, pid); + +        // Give the child process access to our PID value +        slot->pid = pid; +        slot->parent_pid = pid; + +        // Set log file name +        sprintf(slot->log_file + strlen(slot->log_file), "task-%zu-%d.log", mp_global_task_count, slot->parent_pid); +        mp_global_task_count++; + +        // Check child's status +        pid_t code = waitpid(pid, &child_status, WUNTRACED | WCONTINUED | WNOHANG); +        if (code < 0) { +            perror("waitpid failed"); +            return NULL; +        } +        return slot; +    } +    return NULL; +} + +static int show_log_contents(FILE *stream, struct MultiProcessingTask *task) { +    FILE *fp = fopen(task->log_file, "r"); +    if (!fp) { +        return -1; +    } +    char buf[BUFSIZ] = {0}; +    while ((fgets(buf, sizeof(buf) - 1, fp)) != NULL) { +        fprintf(stream, "%s", buf); +        memset(buf, 0, sizeof(buf)); +    } +    fprintf(stream, "\n"); +    return 0; +} + +int mp_pool_kill(struct MultiProcessingPool *pool, int signum) { +    printf("Sending signal %d to pool '%s'\n", signum, pool->ident); +    //for (size_t i = 0; i < pool->num_alloc; i++) { +    for (size_t i = 0; i < pool->num_used; i++) { +        struct MultiProcessingTask *slot = &pool->task[i]; +        if (!slot) { +            return -1; +        } +        if (slot->pid > 0) { +            int status; +            printf("Sending signal %d to task '%s' (pid: %d)\n", signum, slot->ident, slot->pid); +            status = kill(slot->pid, signum); +            if (status && errno != ESRCH) { +                fprintf(stderr, "Task '%s' (pid: %d) did not respond: %s\n", slot->ident, slot->pid, strerror(errno)); +            } +        } +        if (!access(slot->log_file, F_OK)) { +            remove(slot->log_file); +        } +        if (!access(slot->parent_script, F_OK)) { +            remove(slot->parent_script); +        } +    } +    return 0; +} + +int mp_pool_join(struct MultiProcessingPool *pool, size_t jobs, size_t flags) { +    int status = 0; +    int watcher = 0; +    int failures = 0; +    size_t tasks_complete = 0; +    size_t lower_i = 0; +    size_t upper_i = jobs; + +    do { +        //if (upper_i >= pool->num_alloc) { +        if (upper_i >= pool->num_used) { +            size_t x = upper_i - pool->num_used; +            //size_t x = upper_i - pool->num_alloc; +            upper_i -= (size_t) x; +        } +        for (size_t i = lower_i; i < upper_i; i++) { +            struct MultiProcessingTask *slot = &pool->task[i]; +            // Unlock the semaphore to allow the queued processes to start up +            if (sem_post(slot->gate) < 0) { +                perror("sem_post failed"); +                exit(1); +            } + +            if (slot->pid == MP_POOL_PID_UNUSED) { +                // PID is already used up, skip it +                continue; +            } + +            // Is the process finished? +            pid_t pid = waitpid(slot->pid, &status, WNOHANG | WUNTRACED | WCONTINUED); + +            // Update status +            slot->status = status; + +            char progress[1024] = {0}; +            if (pid > 0) { +                double percent = ((double) (tasks_complete) / (double) pool->num_used) * 100; +                //double percent = ((double) (tasks_complete) / (double) pool->num_alloc) * 100; +                sprintf(progress, "[%s:%s] [%3.1f%%]", pool->ident, slot->ident, percent); + +                // The process ended in one the following ways +                // Note: SIGSTOP nor SIGCONT will not increment the tasks_complete counter +                if (WIFEXITED(status)) { +                    printf("%s Task finished (status: %d)\n", progress, WEXITSTATUS(status)); +                    tasks_complete++; +                } else if (WIFSIGNALED(status)) { +                    printf("%s Task ended by signal %d (%s)\n", progress, WTERMSIG(status), strsignal(WTERMSIG(status))); +                    tasks_complete++; +                } else if (WIFSTOPPED(status)) { +                    printf("%s Task was suspended (%d)\n", progress, WSTOPSIG(status)); +                    continue; +                } else if (WIFCONTINUED(status)) { +                    printf("%s Task was resumed\n", progress); +                    continue; +                } else { +                    fprintf(stderr, "%s Task state is unknown (0x%04X)\n", progress, status); +                } + +                // Show the log (always) +                if (show_log_contents(stdout, slot)) { +                    perror(slot->log_file); +                } +                if (status >> 8 != 0 || (status & 0xff) != 0) { +                    fprintf(stderr, "%s Task failed\n", progress); +                    if (flags & MP_POOL_FAIL_FAST) { +                        mp_pool_kill(pool, SIGTERM); +                        return -2; +                    } +                } else { +                    printf("%s Task finished\n", progress); +                } + +                // Clean up logs and scripts left behind by the task +                if (remove(slot->log_file)) { +                    fprintf(stderr, "%s Unable to remove log file: '%s': %s\n", progress, slot->parent_script, strerror(errno)); +                } +                if (remove(slot->parent_script)) { +                    fprintf(stderr, "%s Unable to remove temporary script '%s': %s\n", progress, slot->parent_script, strerror(errno)); +                } + +                // Update progress and tell the poller to ignore the PID. The process is gone. +                slot->pid = MP_POOL_PID_UNUSED; +                failures += status; +            } else if (pid < 0) { +                fprintf(stderr, "waitpid failed: %s\n", strerror(errno)); +                return -1; +            } else { +                if (watcher > 9) { +                    printf("[%s:%s] Task is running (pid: %d)\n", pool->ident, slot->ident, slot->parent_pid); +                    watcher = 0; +                } else { +                    watcher++; +                } +            } +        } + +        if (tasks_complete == pool->num_used) { +            break; +        } + +        if (tasks_complete == upper_i) { +            lower_i += jobs; +            upper_i += jobs; +        } + +        // Poll again after a short delay +        sleep(1); +    } while (1); + +    puts(""); +    return failures; +} + + +struct MultiProcessingPool *mp_pool_init(const char *ident, const char *log_root) { +    struct MultiProcessingPool *pool; + +    if (!log_root) { +        // log_root must be set +        return NULL; +    } + +    //pool = malloc(1 * sizeof(*pool)); +    pool = mmap(NULL, sizeof(*pool), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS | MAP_POPULATE, -1, 0); +    pool->ident = ident; +    pool->log_root = log_root; +    pool->num_used = 0; +    pool->num_alloc = MP_POOL_TASK_MAX; + +    if (mkdirs(log_root, 0700) < 0) { +        if (errno != EEXIST) { +            perror(log_root); +            mp_pool_free(&pool); +            return NULL; +        } +    } + +    //pool->task = calloc(pool->num_alloc + 1, sizeof(*pool->task)); +    pool->task = mmap(NULL, (pool->num_alloc + 1) * sizeof(*pool->task), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS | MAP_POPULATE, -1, 0); +    if (pool->task == MAP_FAILED) { +        perror("mmap"); +        mp_pool_free(&pool); +        return NULL; +    } + +    for (size_t i = 0; i < pool->num_alloc; i++) { +        struct MultiProcessingTask *slot = &pool->task[i]; +        slot->gate = mmap(NULL, sizeof(*slot->gate), PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS | MAP_POPULATE, -1, 0); +        if (slot->gate == MAP_FAILED) { +            perror("mmap failed"); +            exit(1); +        } + +        if (sem_init(slot->gate, 1, 0) < 0) { +            perror("sem_init failed"); +            exit(1); +        } +    } + + +    return pool; +} + +void mp_pool_free(struct MultiProcessingPool **pool) { +    //for (size_t i = 0; i < (*pool)->num_used; i++) { +    for (size_t i = 0; i < (*pool)->num_alloc; i++) { +        if ((*pool)->task[i].gate) { +            if (sem_destroy((*pool)->task[i].gate) < 0) { +                perror("sem_destroy failed"); +                exit(1); +            } +            if (munmap((*pool)->task[i].gate, sizeof(*(*pool)->task[i].gate)) < 0) { +                perror("munmap"); +                exit(1); +            } +        } +    } +    if ((*pool)->task) { +        if (munmap((*pool)->task, sizeof(*(*pool)->task) * (*pool)->num_alloc) < 0) { +            perror("munmap"); +        } +    } +    if ((*pool)) { +        if (munmap((*pool), sizeof(*(*pool))) < 0) { +            perror("munmap"); +        } +        (*pool) = NULL; +    } +} + +int exmain(int argc, char *argv[]) { +    size_t i; +    struct MultiProcessingPool *pool = mp_pool_init("generic", "logs"); +    if (!pool) { +        perror("pool init"); +        exit(1); +    } + +    char *commands[] = { +        "sleep 2; dd if=/dev/zero of=file.dat bs=1M count=1", +        "/bin/echo hello world; sleep 5", +        "python -c 'print(1+1)'", +        "(for x in {1..10}; do echo $x; sleep 0.5; done)", +        "echo stdout >&1; echo stderr >&2; exit 1" +    }; + +    for (i = 0; i < sizeof(commands) / sizeof(*commands); i++) { +        if (mp_task(pool, "commands array", commands[i]) == NULL) { +            printf("Too many tasks queued (max: %d)\n", MP_POOL_TASK_MAX); +            break; +        } +    } +    mp_pool_join(pool, get_cpu_count() - 1, MP_POOL_FAIL_FAST); +    mp_pool_free(&pool); +    return 0; +} | 
