* [9front] upas/runq: parallel sending @ 2021-01-10 2:27 ori 2021-01-10 8:04 ` hiro 2021-01-17 0:42 ` ori 0 siblings, 2 replies; 4+ messages in thread From: ori @ 2021-01-10 2:27 UTC (permalink / raw) To: 9front As you may have noticed, you've been getting a bunch of test messages from the 9front list. Sorry. We've been shaking out some performance issues on a new host. The mailing list was spawning a runq process for every subscriber, which were contending heavily on the queue directory, causing high system load. On the new host, disks were slow enough that this was causing issues. This was fixed by paring down to a single runq process, but that means that sending out emails for our hundreds of subscribers can take a while. This change can probably use some work, but it adds a '-p njob' flag, which allows us to send the jobs in a message queue with limited parallelism. That should bring down the amount of time it takes for a message to work its way through our mailing lists. I'd also like to float the idea of removing the '-a' flag, and reclaiming the '-n' flag for this purpose. If anyone is running upas/runq with -a, I propose this alternative: fn runq-a { dirs=(/mail/queue/*) for(d in $dirs) upas/runq ... 
$d & for(d in $dirs) wait } diff -r 808eb735fc45 sys/src/cmd/upas/q/runq.c --- a/sys/src/cmd/upas/q/runq.c Sat Jan 09 12:20:49 2021 -0800 +++ b/sys/src/cmd/upas/q/runq.c Sat Jan 09 18:15:41 2021 -0800 @@ -1,9 +1,25 @@ #include "common.h" #include <ctype.h> +typedef struct Job Job; + +struct Job { + Job *next; + int pid; + int ac; + int dfd; + char **av; + char *buf; /* backing for av */ + Dir *dp; /* not owned */ + Mlock *l; + Biobuf *b; +}; + void doalldirs(void); void dodir(char*); -void dofile(Dir*); +Job* dofile(Dir*); +Job* donefile(Job*, Waitmsg*); +void freejob(Job*); void rundir(char*); char* file(char*, char); void warning(char*, void*); @@ -32,6 +48,7 @@ char **badsys; /* array of recalcitrant systems */ int nbad; int npid = 50; +int njob = 1; /* number of concurrent jobs to invoke */ int sflag; /* single thread per directory */ int aflag; /* all directories */ int Eflag; /* ignore E.xxxxxx dates */ @@ -40,7 +57,7 @@ void usage(void) { - fprint(2, "usage: runq [-adsE] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] q-root cmd\n"); + fprint(2, "usage: runq [-adsE] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] [-p njobs] q-root cmd\n"); exits(""); } @@ -86,6 +103,11 @@ if(qdir == 0) usage(); break; + case 'p': + njob = atoi(ARGF()); + if(njob == 0) + usage(); + break; case 'n': npid = atoi(ARGF()); if(npid == 0) @@ -234,8 +256,9 @@ void rundir(char *name) { - int fd; - long i; + Job *hd, *j, **p; + int nlive, fidx, fd, found; + Waitmsg *w; if(aflag && sflag) fd = sysopenlocked(".", OREAD); @@ -245,15 +268,47 @@ warning("reading %s", name); return; } + fidx= 0; + hd = nil; + nlive = 0; nfiles = dirreadall(fd, &dirbuf); - if(nfiles > 0){ - for(i=0; i<nfiles; i++){ - if(dirbuf[i].name[0]!='C' || dirbuf[i].name[1]!='.') + while(nlive > 0 || fidx< nfiles){ + while(fidx< nfiles && nlive < njob){ + if(strncmp(dirbuf[fidx].name, "C.", 2) != 0){ + fidx++; continue; - dofile(&dirbuf[i]); + } + if((j = dofile(&dirbuf[fidx])) != nil){ + nlive++; + j->next = 
hd; + hd = j; + } + fidx++; } - free(dirbuf); + if(nlive == 0){ + fprint(2, "nothing live\n"); + break; + } +rescan: + if((w = wait()) == nil){ + syslog(0, "runq", "wait error: %r"); + break; + } + found = 0; + for(p = &hd; *p != nil; p = &(*p)->next){ + if(w->pid == (*p)->pid){ + *p = donefile(*p, w); + found++; + nlive--; + break; + } + } + free(w); + if(!found) + goto rescan; } + assert(hd == nil); + free(dirbuf); if(aflag && sflag) sysunlockfile(fd); else @@ -316,15 +371,14 @@ /* * try a message */ -void +Job* dofile(Dir *dp) { + int dtime, efd, i, etime; + Job *j; +// Mlock *l; Dir *d; - int dfd, ac, dtime, efd, pid, i, etime; - char *buf, *cp, **av; - Waitmsg *wm; - Biobuf *b; - Mlock *l = nil; + char *cp; if(debug) fprint(2, "dofile %s\n", dp->name); @@ -337,14 +391,14 @@ if(d == nil){ syslog(0, runqlog, "no data file for %s", dp->name); remmatch(dp->name); - return; + return nil; } if(dp->length == 0){ if(time(0)-dp->mtime > 15*60){ syslog(0, runqlog, "empty ctl file for %s", dp->name); remmatch(dp->name); } - return; + return nil; } dtime = d->mtime; free(d); @@ -358,31 +412,35 @@ if(etime - dtime < 60*60){ /* up to the first hour, try every 15 minutes */ if(time(0) - etime < 15*60) - return; + return nil; } else { /* after the first hour, try once an hour */ if(time(0) - etime < 60*60) - return; + return nil; } - } /* * open control and data */ - b = sysopen(file(dp->name, 'C'), "rl", 0660); - if(b == 0) { + j = malloc(sizeof(Job)); + if(j == nil) + return nil; + memset(j, 0, sizeof(Job)); + j->dp = dp; + j->dfd = -1; + j->b = sysopen(file(dp->name, 'C'), "rl", 0660); + if(j->b == 0) { if(debug) fprint(2, "can't open %s: %r\n", file(dp->name, 'C')); - return; + return nil; } - dfd = open(file(dp->name, 'D'), OREAD); - if(dfd < 0){ + j->dfd = open(file(dp->name, 'D'), OREAD); + if(j->dfd < 0){ if(debug) fprint(2, "can't open %s: %r\n", file(dp->name, 'D')); - Bterm(b); - sysunlockfile(Bfildes(b)); - return; + freejob(j); + return nil; } /* @@ -390,48 
+448,36 @@ * - read args into (malloc'd) buffer * - malloc a vector and copy pointers to args into it */ - buf = malloc(dp->length+1); - if(buf == 0){ + + j->buf = malloc(dp->length+1); + if(j->buf == nil){ warning("buffer allocation", 0); - Bterm(b); - sysunlockfile(Bfildes(b)); - close(dfd); - return; + freejob(j); + return nil; } - if(Bread(b, buf, dp->length) != dp->length){ + if(Bread(j->b, j->buf, dp->length) != dp->length){ warning("reading control file %s\n", dp->name); - Bterm(b); - sysunlockfile(Bfildes(b)); - close(dfd); - free(buf); - return; + freejob(j); + return nil; } - buf[dp->length] = 0; - av = malloc(2*sizeof(char*)); - if(av == 0){ + j->buf[dp->length] = 0; + j->av = malloc(2*sizeof(char*)); + if(j->av == 0){ warning("argv allocation", 0); - close(dfd); - free(buf); - Bterm(b); - sysunlockfile(Bfildes(b)); - return; + freejob(j); + return nil; } - for(ac = 1, cp = buf; *cp; ac++){ + for(j->ac = 1, cp = j->buf; *cp; j->ac++){ while(isspace(*cp)) *cp++ = 0; if(*cp == 0) break; - av = realloc(av, (ac+2)*sizeof(char*)); - if(av == 0){ + j->av = realloc(j->av, (j->ac+2)*sizeof(char*)); + if(j->av == 0){ warning("argv allocation", 0); - close(dfd); - free(buf); - Bterm(b); - sysunlockfile(Bfildes(b)); - return; } - av[ac] = cp; + j->av[j->ac] = cp; while(*cp && !isspace(*cp)){ if(*cp++ == '"'){ while(*cp && *cp != '"') @@ -441,18 +487,18 @@ } } } - av[0] = cmd; - av[ac] = 0; + j->av[0] = cmd; + j->av[j->ac] = 0; if(!Eflag &&time(0) - dtime > giveup){ - if(returnmail(av, dp->name, "Giveup") != 0) - logit("returnmail failed", dp->name, av); + if(returnmail(j->av, dp->name, "Giveup") != 0) + logit("returnmail failed", dp->name, j->av); remmatch(dp->name); goto done; } for(i = 0; i < nbad; i++){ - if(strcmp(av[3], badsys[i]) == 0) + if(j->ac > 3 && strcmp(j->av[3], badsys[i]) == 0) goto done; } @@ -460,33 +506,34 @@ * Ken's fs, for example, gives us 5 minutes of inactivity before * the lock goes stale, so we have to keep reading it. 
*/ - l = keeplockalive(file(dp->name, 'C'), Bfildes(b)); + j->l = keeplockalive(file(dp->name, 'C'), Bfildes(j->b)); /* * transfer */ - pid = fork(); - switch(pid){ + j->pid = fork(); + switch(j->pid){ case -1: - sysunlock(l); - sysunlockfile(Bfildes(b)); + sysunlock(j->l); + sysunlockfile(Bfildes(j->b)); syslog(0, runqlog, "out of procs"); exits(0); case 0: if(debug) { - fprint(2, "Starting %s", cmd); - for(ac = 0; av[ac]; ac++) - fprint(2, " %s", av[ac]); - fprint(2, "\n"); + fprint(2, "Starting %s\n", cmd); +// for(i = 0; j->av[i]; i++) +// fprint(2, " %s", j->av[i]); +// fprint(2, "\n"); } - logit("execing", dp->name, av); + logit("execing", dp->name, j->av); close(0); - dup(dfd, 0); - close(dfd); + dup(j->dfd, 0); + close(j->dfd); close(2); efd = open(file(dp->name, 'E'), OWRITE); if(efd < 0){ - if(debug) syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser()); + if(debug) + syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser()); efd = create(file(dp->name, 'E'), OWRITE, 0666); if(efd < 0){ if(debug) syslog(0, "runq", "create %s as %s: %r", file(dp->name, 'E'), getuser()); @@ -494,50 +541,62 @@ } } seek(efd, 0, 2); - exec(cmd, av); + exec(cmd, j->av); error("can't exec %s", cmd); break; default: - for(;;){ - wm = wait(); - if(wm == nil) - error("wait failed: %r", ""); - if(wm->pid == pid) - break; - free(wm); - } - if(debug) - fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg); - - if(wm->msg[0]){ - if(debug) - fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg); - if(!Rflag && strstr(wm->msg, "Retry")==0){ - /* return the message and remove it */ - if(returnmail(av, dp->name, wm->msg) != 0) - logit("returnmail failed", dp->name, av); - remmatch(dp->name); - } else { - /* add sys to bad list and try again later */ - nbad++; - badsys = realloc(badsys, nbad*sizeof(char*)); - badsys[nbad-1] = strdup(av[3]); - } - } else { - /* it worked remove the message */ - remmatch(dp->name); - } - free(wm); - + return j; } done: - if (l) 
- sysunlock(l); - Bterm(b); - sysunlockfile(Bfildes(b)); - free(buf); - free(av); - close(dfd); + freejob(j); + return nil; +} + +Job* +donefile(Job *j, Waitmsg *wm) +{ + Job *n; + + if(debug) + fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg); + + if(wm->msg[0]){ + if(debug) + fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg); + if(!Rflag && strstr(wm->msg, "Retry")==0){ + /* return the message and remove it */ + if(returnmail(j->av, j->dp->name, wm->msg) != 0) + logit("returnmail failed", j->dp->name, j->av); + remmatch(j->dp->name); + } else { + /* add sys to bad list and try again later */ + nbad++; + badsys = realloc(badsys, nbad*sizeof(char*)); + badsys[nbad-1] = strdup(j->av[3]); + } + } else { + /* it worked remove the message */ + remmatch(j->dp->name); + } + n = j->next; + freejob(j); + return n; +} + +void +freejob(Job *j) +{ + if(j->b != nil){ + sysunlockfile(Bfildes(j->b)); + Bterm(j->b); + } + if(j->dfd != -1) + close(j->dfd); + if(j->l != nil) + sysunlock(j->l); + free(j->buf); + free(j->av); + free(j); } ^ permalink raw reply [flat|nested] 4+ messages in thread
* Re: [9front] upas/runq: parallel sending 2021-01-10 2:27 [9front] upas/runq: parallel sending ori @ 2021-01-10 8:04 ` hiro 2021-01-17 0:42 ` ori 1 sibling, 0 replies; 4+ messages in thread From: hiro @ 2021-01-10 8:04 UTC (permalink / raw) To: 9front well it took less than 600 seconds to receive all 3 mails on the mailing list for me. but it seems i'm missing 2 mails for rather hours or days in the meantime... it feels like that is not just a delay but an error any more. On 1/10/21, ori@eigenstate.org <ori@eigenstate.org> wrote: > As you may have noticed, you've been getting a > bunch of test messages from the 9front list. Sorry. > We've been shaking out some performance issues on a > new host. > > The mailing list was spawning a runq process for every > subscriber, which were contending heavily on the queue > directory, causing high system load. On the new host, > disks were slow enough that this was causing issues. > > This was fixed by paring down to a single runq process, > but that means that sending out emails for our hundreds > of subscribers can take a while. > > This change can probably use some work, but it adds > a '-p njob' flag, which allows us to send the jobs in > a message queue with limited parallelism. That should > bring down the amount of time it takes for a message > to work its way through our mailing lists. > > I'd also like to float the idea of removing the '-a' > flag, and reclaiming the '-n' flag for this purpose. > If anyone is runnign upas/runq with -a, I propose > this alternative: > > fn runq-a { > dirs=(/mail/queue/*) > for(d in $dirs) upas/runq ... $d > } > > with -n: > > fn runq-a { > dirs=(/mail/queue/*) > for(d in $dirs) upas/runq ... 
$d & > for(d in $dirs) wait > } > > > diff -r 808eb735fc45 sys/src/cmd/upas/q/runq.c > --- a/sys/src/cmd/upas/q/runq.c Sat Jan 09 12:20:49 2021 -0800 > +++ b/sys/src/cmd/upas/q/runq.c Sat Jan 09 18:15:41 2021 -0800 > @@ -1,9 +1,25 @@ > #include "common.h" > #include <ctype.h> > > +typedef struct Job Job; > + > +struct Job { > + Job *next; > + int pid; > + int ac; > + int dfd; > + char **av; > + char *buf; /* backing for av */ > + Dir *dp; /* not owned */ > + Mlock *l; > + Biobuf *b; > +}; > + > void doalldirs(void); > void dodir(char*); > -void dofile(Dir*); > +Job* dofile(Dir*); > +Job* donefile(Job*, Waitmsg*); > +void freejob(Job*); > void rundir(char*); > char* file(char*, char); > void warning(char*, void*); > @@ -32,6 +48,7 @@ > char **badsys; /* array of recalcitrant systems */ > int nbad; > int npid = 50; > +int njob = 1; /* number of concurrent jobs to invoke */ > int sflag; /* single thread per directory */ > int aflag; /* all directories */ > int Eflag; /* ignore E.xxxxxx dates */ > @@ -40,7 +57,7 @@ > void > usage(void) > { > - fprint(2, "usage: runq [-adsE] [-q dir] [-l load] [-t time] [-r nfiles] > [-n nprocs] q-root cmd\n"); > + fprint(2, "usage: runq [-adsE] [-q dir] [-l load] [-t time] [-r nfiles] > [-n nprocs] [-p njobs] q-root cmd\n"); > exits(""); > } > > @@ -86,6 +103,11 @@ > if(qdir == 0) > usage(); > break; > + case 'p': > + njob = atoi(ARGF()); > + if(njob == 0) > + usage(); > + break; > case 'n': > npid = atoi(ARGF()); > if(npid == 0) > @@ -234,8 +256,9 @@ > void > rundir(char *name) > { > - int fd; > - long i; > + Job *hd, *j, **p; > + int nlive, fidx, fd, found; > + Waitmsg *w; > > if(aflag && sflag) > fd = sysopenlocked(".", OREAD); > @@ -245,15 +268,47 @@ > warning("reading %s", name); > return; > } > + fidx= 0; > + hd = nil; > + nlive = 0; > nfiles = dirreadall(fd, &dirbuf); > - if(nfiles > 0){ > - for(i=0; i<nfiles; i++){ > - if(dirbuf[i].name[0]!='C' || dirbuf[i].name[1]!='.') > + while(nlive > 0 || fidx< nfiles){ > + while(fidx< 
nfiles && nlive < njob){ > + if(strncmp(dirbuf[fidx].name, "C.", 2) != 0){ > + fidx++; > continue; > - dofile(&dirbuf[i]); > + } > + if((j = dofile(&dirbuf[fidx])) != nil){ > + nlive++; > + j->next = hd; > + hd = j; > + } > + fidx++; > } > - free(dirbuf); > + if(nlive == 0){ > + fprint(2, "nothing live\n"); > + break; > + } > +rescan: > + if((w = wait()) == nil){ > + syslog(0, "runq", "wait error: %r"); > + break; > + } > + found = 0; > + for(p = &hd; *p != nil; p = &(*p)->next){ > + if(w->pid == (*p)->pid){ > + *p = donefile(*p, w); > + found++; > + nlive--; > + break; > + } > + } > + free(w); > + if(!found) > + goto rescan; > } > + assert(hd == nil); > + free(dirbuf); > if(aflag && sflag) > sysunlockfile(fd); > else > @@ -316,15 +371,14 @@ > /* > * try a message > */ > -void > +Job* > dofile(Dir *dp) > { > + int dtime, efd, i, etime; > + Job *j; > +// Mlock *l; > Dir *d; > - int dfd, ac, dtime, efd, pid, i, etime; > - char *buf, *cp, **av; > - Waitmsg *wm; > - Biobuf *b; > - Mlock *l = nil; > + char *cp; > > if(debug) > fprint(2, "dofile %s\n", dp->name); > @@ -337,14 +391,14 @@ > if(d == nil){ > syslog(0, runqlog, "no data file for %s", dp->name); > remmatch(dp->name); > - return; > + return nil; > } > if(dp->length == 0){ > if(time(0)-dp->mtime > 15*60){ > syslog(0, runqlog, "empty ctl file for %s", dp->name); > remmatch(dp->name); > } > - return; > + return nil; > } > dtime = d->mtime; > free(d); > @@ -358,31 +412,35 @@ > if(etime - dtime < 60*60){ > /* up to the first hour, try every 15 minutes */ > if(time(0) - etime < 15*60) > - return; > + return nil; > } else { > /* after the first hour, try once an hour */ > if(time(0) - etime < 60*60) > - return; > + return nil; > } > - > } > > /* > * open control and data > */ > - b = sysopen(file(dp->name, 'C'), "rl", 0660); > - if(b == 0) { > + j = malloc(sizeof(Job)); > + if(j == nil) > + return nil; > + memset(j, 0, sizeof(Job)); > + j->dp = dp; > + j->dfd = -1; > + j->b = sysopen(file(dp->name, 'C'), "rl", 0660); 
> + if(j->b == 0) { > if(debug) > fprint(2, "can't open %s: %r\n", file(dp->name, 'C')); > - return; > + return nil; > } > - dfd = open(file(dp->name, 'D'), OREAD); > - if(dfd < 0){ > + j->dfd = open(file(dp->name, 'D'), OREAD); > + if(j->dfd < 0){ > if(debug) > fprint(2, "can't open %s: %r\n", file(dp->name, 'D')); > - Bterm(b); > - sysunlockfile(Bfildes(b)); > - return; > + freejob(j); > + return nil; > } > > /* > @@ -390,48 +448,36 @@ > * - read args into (malloc'd) buffer > * - malloc a vector and copy pointers to args into it > */ > - buf = malloc(dp->length+1); > - if(buf == 0){ > + > + j->buf = malloc(dp->length+1); > + if(j->buf == nil){ > warning("buffer allocation", 0); > - Bterm(b); > - sysunlockfile(Bfildes(b)); > - close(dfd); > - return; > + freejob(j); > + return nil; > } > - if(Bread(b, buf, dp->length) != dp->length){ > + if(Bread(j->b, j->buf, dp->length) != dp->length){ > warning("reading control file %s\n", dp->name); > - Bterm(b); > - sysunlockfile(Bfildes(b)); > - close(dfd); > - free(buf); > - return; > + freejob(j); > + return nil; > } > - buf[dp->length] = 0; > - av = malloc(2*sizeof(char*)); > - if(av == 0){ > + j->buf[dp->length] = 0; > + j->av = malloc(2*sizeof(char*)); > + if(j->av == 0){ > warning("argv allocation", 0); > - close(dfd); > - free(buf); > - Bterm(b); > - sysunlockfile(Bfildes(b)); > - return; > + freejob(j); > + return nil; > } > - for(ac = 1, cp = buf; *cp; ac++){ > + for(j->ac = 1, cp = j->buf; *cp; j->ac++){ > while(isspace(*cp)) > *cp++ = 0; > if(*cp == 0) > break; > > - av = realloc(av, (ac+2)*sizeof(char*)); > - if(av == 0){ > + j->av = realloc(j->av, (j->ac+2)*sizeof(char*)); > + if(j->av == 0){ > warning("argv allocation", 0); > - close(dfd); > - free(buf); > - Bterm(b); > - sysunlockfile(Bfildes(b)); > - return; > } > - av[ac] = cp; > + j->av[j->ac] = cp; > while(*cp && !isspace(*cp)){ > if(*cp++ == '"'){ > while(*cp && *cp != '"') > @@ -441,18 +487,18 @@ > } > } > } > - av[0] = cmd; > - av[ac] = 0; > + j->av[0] 
= cmd; > + j->av[j->ac] = 0; > > if(!Eflag &&time(0) - dtime > giveup){ > - if(returnmail(av, dp->name, "Giveup") != 0) > - logit("returnmail failed", dp->name, av); > + if(returnmail(j->av, dp->name, "Giveup") != 0) > + logit("returnmail failed", dp->name, j->av); > remmatch(dp->name); > goto done; > } > > for(i = 0; i < nbad; i++){ > - if(strcmp(av[3], badsys[i]) == 0) > + if(j->ac > 3 && strcmp(j->av[3], badsys[i]) == 0) > goto done; > } > > @@ -460,33 +506,34 @@ > * Ken's fs, for example, gives us 5 minutes of inactivity before > * the lock goes stale, so we have to keep reading it. > */ > - l = keeplockalive(file(dp->name, 'C'), Bfildes(b)); > + j->l = keeplockalive(file(dp->name, 'C'), Bfildes(j->b)); > > /* > * transfer > */ > - pid = fork(); > - switch(pid){ > + j->pid = fork(); > + switch(j->pid){ > case -1: > - sysunlock(l); > - sysunlockfile(Bfildes(b)); > + sysunlock(j->l); > + sysunlockfile(Bfildes(j->b)); > syslog(0, runqlog, "out of procs"); > exits(0); > case 0: > if(debug) { > - fprint(2, "Starting %s", cmd); > - for(ac = 0; av[ac]; ac++) > - fprint(2, " %s", av[ac]); > - fprint(2, "\n"); > + fprint(2, "Starting %s\n", cmd); > +// for(i = 0; j->av[i]; i++) > +// fprint(2, " %s", j->av[i]); > +// fprint(2, "\n"); > } > - logit("execing", dp->name, av); > + logit("execing", dp->name, j->av); > close(0); > - dup(dfd, 0); > - close(dfd); > + dup(j->dfd, 0); > + close(j->dfd); > close(2); > efd = open(file(dp->name, 'E'), OWRITE); > if(efd < 0){ > - if(debug) syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), > getuser()); > + if(debug) > + syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser()); > efd = create(file(dp->name, 'E'), OWRITE, 0666); > if(efd < 0){ > if(debug) syslog(0, "runq", "create %s as %s: %r", file(dp->name, 'E'), > getuser()); > @@ -494,50 +541,62 @@ > } > } > seek(efd, 0, 2); > - exec(cmd, av); > + exec(cmd, j->av); > error("can't exec %s", cmd); > break; > default: > - for(;;){ > - wm = wait(); > - if(wm == 
nil) > - error("wait failed: %r", ""); > - if(wm->pid == pid) > - break; > - free(wm); > - } > - if(debug) > - fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg); > - > - if(wm->msg[0]){ > - if(debug) > - fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg); > - if(!Rflag && strstr(wm->msg, "Retry")==0){ > - /* return the message and remove it */ > - if(returnmail(av, dp->name, wm->msg) != 0) > - logit("returnmail failed", dp->name, av); > - remmatch(dp->name); > - } else { > - /* add sys to bad list and try again later */ > - nbad++; > - badsys = realloc(badsys, nbad*sizeof(char*)); > - badsys[nbad-1] = strdup(av[3]); > - } > - } else { > - /* it worked remove the message */ > - remmatch(dp->name); > - } > - free(wm); > - > + return j; > } > done: > - if (l) > - sysunlock(l); > - Bterm(b); > - sysunlockfile(Bfildes(b)); > - free(buf); > - free(av); > - close(dfd); > + freejob(j); > + return nil; > +} > + > +Job* > +donefile(Job *j, Waitmsg *wm) > +{ > + Job *n; > + > + if(debug) > + fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg); > + > + if(wm->msg[0]){ > + if(debug) > + fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg); > + if(!Rflag && strstr(wm->msg, "Retry")==0){ > + /* return the message and remove it */ > + if(returnmail(j->av, j->dp->name, wm->msg) != 0) > + logit("returnmail failed", j->dp->name, j->av); > + remmatch(j->dp->name); > + } else { > + /* add sys to bad list and try again later */ > + nbad++; > + badsys = realloc(badsys, nbad*sizeof(char*)); > + badsys[nbad-1] = strdup(j->av[3]); > + } > + } else { > + /* it worked remove the message */ > + remmatch(j->dp->name); > + } > + n = j->next; > + freejob(j); > + return n; > +} > + > +void > +freejob(Job *j) > +{ > + if(j->b != nil){ > + sysunlockfile(Bfildes(j->b)); > + Bterm(j->b); > + } > + if(j->dfd != -1) > + close(j->dfd); > + if(j->l != nil) > + sysunlock(j->l); > + free(j->buf); > + free(j->av); > + free(j); > } > > > ^ permalink raw reply [flat|nested] 4+ messages in thread
* Re: [9front] upas/runq: parallel sending 2021-01-10 2:27 [9front] upas/runq: parallel sending ori 2021-01-10 8:04 ` hiro @ 2021-01-17 0:42 ` ori 2021-01-17 1:06 ` ori 1 sibling, 1 reply; 4+ messages in thread From: ori @ 2021-01-17 0:42 UTC (permalink / raw) To: 9front I've been running the previous patch for the last week without problems: has anyone else had a chance to test that it doesn't break their systems? Quoth ori@eigenstate.org: > I'd also like to float the idea of removing the '-a' > flag, and reclaiming the '-n' flag for this purpose. > If anyone is running upas/runq with -a, I propose > this alternative: > Here's a more aggressive version that removes the multi-user queue sweeping code. I doubt that anyone is using it. If they are, I'd be interested in knowing if there's an issue with migrating to xargs or a shell loop that runs each job in parallel by directory. If there are no objections, I'm going to let this version bake for a week on my own system, and then commit next weekend. 
diff -r 24a38dd884b0 sys/src/cmd/upas/q/runq.c --- a/sys/src/cmd/upas/q/runq.c Sat Jan 16 16:17:27 2021 -0800 +++ b/sys/src/cmd/upas/q/runq.c Sat Jan 16 16:40:41 2021 -0800 @@ -1,9 +1,25 @@ #include "common.h" #include <ctype.h> +typedef struct Job Job; + +struct Job { + Job *next; + int pid; + int ac; + int dfd; + char **av; + char *buf; /* backing for av */ + Dir *dp; /* not owned */ + Mlock *l; + Biobuf *b; +}; + void doalldirs(void); void dodir(char*); -void dofile(Dir*); +Job* dofile(Dir*); +Job* donefile(Job*, Waitmsg*); +void freejob(Job*); void rundir(char*); char* file(char*, char); void warning(char*, void*); @@ -17,7 +33,6 @@ char *root; int debug; int giveup = 2*24*60*60; -int load; int limit; /* the current directory */ @@ -28,67 +43,48 @@ char *runqlog = "runq"; -int *pidlist; char **badsys; /* array of recalcitrant systems */ int nbad; -int npid = 50; -int sflag; /* single thread per directory */ -int aflag; /* all directories */ +int njob = 1; /* number of concurrent jobs to invoke */ int Eflag; /* ignore E.xxxxxx dates */ int Rflag; /* no giving up, ever */ void usage(void) { - fprint(2, "usage: runq [-adsE] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] q-root cmd\n"); + fprint(2, "usage: runq [-adsE] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] [-p njobs] q-root cmd\n"); exits(""); } void main(int argc, char **argv) { - char *qdir, *x; + char *qdir; qdir = 0; ARGBEGIN{ - case 'l': - x = ARGF(); - if(x == 0) - usage(); - load = atoi(x); - if(load < 0) - load = 0; - break; case 'E': Eflag++; break; case 'R': /* no giving up -- just leave stuff in the queue */ Rflag++; break; - case 'a': - aflag++; - break; case 'd': debug++; break; case 'r': - limit = atoi(ARGF()); - break; - case 's': - sflag++; + limit = atoi(EARGF(usage())); break; case 't': - giveup = 60*60*atoi(ARGF()); + giveup = 60*60*atoi(EARGF(usage())); break; case 'q': - qdir = ARGF(); - if(qdir == 0) - usage(); + qdir = EARGF(usage()); break; case 'n': - npid = 
atoi(ARGF()); - if(npid == 0) + njob = atoi(EARGF(usage())); + if(njob == 0) usage(); break; }ARGEND; @@ -96,27 +92,17 @@ if(argc != 2) usage(); - pidlist = malloc(npid*sizeof(*pidlist)); - if(pidlist == 0) - error("can't malloc", 0); - - if(aflag == 0 && qdir == 0) { + if(qdir == nil) qdir = getuser(); - if(qdir == 0) - error("unknown user", 0); - } + if(qdir == nil) + error("unknown user", 0); root = argv[0]; cmd = argv[1]; if(chdir(root) < 0) error("can't cd to %s", root); - doload(1); - if(aflag) - doalldirs(); - else - dodir(qdir); - doload(0); + dodir(qdir); exits(0); } @@ -142,74 +128,6 @@ return 0; } -int -forkltd(void) -{ - int i; - int pid; - - for(i = 0; i < npid; i++){ - if(pidlist[i] <= 0) - break; - } - - while(i >= npid){ - pid = waitpid(); - if(pid < 0){ - syslog(0, runqlog, "forkltd confused"); - exits(0); - } - - for(i = 0; i < npid; i++) - if(pidlist[i] == pid) - break; - } - pidlist[i] = fork(); - return pidlist[i]; -} - -/* - * run all user directories, must be bootes (or root on unix) to do this - */ -void -doalldirs(void) -{ - Dir *db; - int fd; - long i, n; - - - fd = open(".", OREAD); - if(fd == -1){ - warning("reading %s", root); - return; - } - n = dirreadall(fd, &db); - if(n > 0){ - for(i=0; i<n; i++){ - if(db[i].qid.type & QTDIR){ - if(emptydir(db[i].name)) - continue; - switch(forkltd()){ - case -1: - syslog(0, runqlog, "out of procs"); - doload(0); - exits(0); - case 0: - if(sysdetach() < 0) - error("%r", 0); - dodir(db[i].name); - exits(0); - default: - break; - } - } - } - free(db); - } - close(fd); -} - /* * cd to a user directory and run it */ @@ -234,30 +152,57 @@ void rundir(char *name) { - int fd; - long i; + Job *hd, *j, **p; + int nlive, fidx, fd, found; + Waitmsg *w; - if(aflag && sflag) - fd = sysopenlocked(".", OREAD); - else - fd = open(".", OREAD); + fd = open(".", OREAD); if(fd == -1){ warning("reading %s", name); return; } + fidx= 0; + hd = nil; + nlive = 0; nfiles = dirreadall(fd, &dirbuf); - if(nfiles > 0){ - 
for(i=0; i<nfiles; i++){ - if(dirbuf[i].name[0]!='C' || dirbuf[i].name[1]!='.') + while(nlive > 0 || fidx< nfiles){ + while(fidx< nfiles && nlive < njob){ + if(strncmp(dirbuf[fidx].name, "C.", 2) != 0){ + fidx++; continue; - dofile(&dirbuf[i]); + } + if((j = dofile(&dirbuf[fidx])) != nil){ + nlive++; + j->next = hd; + hd = j; + } + fidx++; } - free(dirbuf); + if(nlive == 0){ + fprint(2, "nothing live\n"); + break; + } +rescan: + if((w = wait()) == nil){ + syslog(0, "runq", "wait error: %r"); + break; + } + found = 0; + for(p = &hd; *p != nil; p = &(*p)->next){ + if(w->pid == (*p)->pid){ + *p = donefile(*p, w); + found++; + nlive--; + break; + } + } + free(w); + if(!found) + goto rescan; } - if(aflag && sflag) - sysunlockfile(fd); - else - close(fd); + assert(hd == nil); + free(dirbuf); + close(fd); } /* @@ -316,15 +261,14 @@ /* * try a message */ -void +Job* dofile(Dir *dp) { + int dtime, efd, i, etime; + Job *j; +// Mlock *l; Dir *d; - int dfd, ac, dtime, efd, pid, i, etime; - char *buf, *cp, **av; - Waitmsg *wm; - Biobuf *b; - Mlock *l = nil; + char *cp; if(debug) fprint(2, "dofile %s\n", dp->name); @@ -337,14 +281,14 @@ if(d == nil){ syslog(0, runqlog, "no data file for %s", dp->name); remmatch(dp->name); - return; + return nil; } if(dp->length == 0){ if(time(0)-dp->mtime > 15*60){ syslog(0, runqlog, "empty ctl file for %s", dp->name); remmatch(dp->name); } - return; + return nil; } dtime = d->mtime; free(d); @@ -358,31 +302,35 @@ if(etime - dtime < 60*60){ /* up to the first hour, try every 15 minutes */ if(time(0) - etime < 15*60) - return; + return nil; } else { /* after the first hour, try once an hour */ if(time(0) - etime < 60*60) - return; + return nil; } - } /* * open control and data */ - b = sysopen(file(dp->name, 'C'), "rl", 0660); - if(b == 0) { + j = malloc(sizeof(Job)); + if(j == nil) + return nil; + memset(j, 0, sizeof(Job)); + j->dp = dp; + j->dfd = -1; + j->b = sysopen(file(dp->name, 'C'), "rl", 0660); + if(j->b == 0) { if(debug) fprint(2, 
"can't open %s: %r\n", file(dp->name, 'C')); - return; + return nil; } - dfd = open(file(dp->name, 'D'), OREAD); - if(dfd < 0){ + j->dfd = open(file(dp->name, 'D'), OREAD); + if(j->dfd < 0){ if(debug) fprint(2, "can't open %s: %r\n", file(dp->name, 'D')); - Bterm(b); - sysunlockfile(Bfildes(b)); - return; + freejob(j); + return nil; } /* @@ -390,48 +338,36 @@ * - read args into (malloc'd) buffer * - malloc a vector and copy pointers to args into it */ - buf = malloc(dp->length+1); - if(buf == 0){ + + j->buf = malloc(dp->length+1); + if(j->buf == nil){ warning("buffer allocation", 0); - Bterm(b); - sysunlockfile(Bfildes(b)); - close(dfd); - return; + freejob(j); + return nil; } - if(Bread(b, buf, dp->length) != dp->length){ + if(Bread(j->b, j->buf, dp->length) != dp->length){ warning("reading control file %s\n", dp->name); - Bterm(b); - sysunlockfile(Bfildes(b)); - close(dfd); - free(buf); - return; + freejob(j); + return nil; } - buf[dp->length] = 0; - av = malloc(2*sizeof(char*)); - if(av == 0){ + j->buf[dp->length] = 0; + j->av = malloc(2*sizeof(char*)); + if(j->av == 0){ warning("argv allocation", 0); - close(dfd); - free(buf); - Bterm(b); - sysunlockfile(Bfildes(b)); - return; + freejob(j); + return nil; } - for(ac = 1, cp = buf; *cp; ac++){ + for(j->ac = 1, cp = j->buf; *cp; j->ac++){ while(isspace(*cp)) *cp++ = 0; if(*cp == 0) break; - av = realloc(av, (ac+2)*sizeof(char*)); - if(av == 0){ + j->av = realloc(j->av, (j->ac+2)*sizeof(char*)); + if(j->av == 0){ warning("argv allocation", 0); - close(dfd); - free(buf); - Bterm(b); - sysunlockfile(Bfildes(b)); - return; } - av[ac] = cp; + j->av[j->ac] = cp; while(*cp && !isspace(*cp)){ if(*cp++ == '"'){ while(*cp && *cp != '"') @@ -441,18 +377,18 @@ } } } - av[0] = cmd; - av[ac] = 0; + j->av[0] = cmd; + j->av[j->ac] = 0; if(!Eflag &&time(0) - dtime > giveup){ - if(returnmail(av, dp->name, "Giveup") != 0) - logit("returnmail failed", dp->name, av); + if(returnmail(j->av, dp->name, "Giveup") != 0) + logit("returnmail 
failed", dp->name, j->av); remmatch(dp->name); goto done; } for(i = 0; i < nbad; i++){ - if(strcmp(av[3], badsys[i]) == 0) + if(j->ac > 3 && strcmp(j->av[3], badsys[i]) == 0) goto done; } @@ -460,33 +396,34 @@ * Ken's fs, for example, gives us 5 minutes of inactivity before * the lock goes stale, so we have to keep reading it. */ - l = keeplockalive(file(dp->name, 'C'), Bfildes(b)); + j->l = keeplockalive(file(dp->name, 'C'), Bfildes(j->b)); /* * transfer */ - pid = fork(); - switch(pid){ + j->pid = fork(); + switch(j->pid){ case -1: - sysunlock(l); - sysunlockfile(Bfildes(b)); + sysunlock(j->l); + sysunlockfile(Bfildes(j->b)); syslog(0, runqlog, "out of procs"); exits(0); case 0: if(debug) { - fprint(2, "Starting %s", cmd); - for(ac = 0; av[ac]; ac++) - fprint(2, " %s", av[ac]); - fprint(2, "\n"); + fprint(2, "Starting %s\n", cmd); +// for(i = 0; j->av[i]; i++) +// fprint(2, " %s", j->av[i]); +// fprint(2, "\n"); } - logit("execing", dp->name, av); + logit("execing", dp->name, j->av); close(0); - dup(dfd, 0); - close(dfd); + dup(j->dfd, 0); + close(j->dfd); close(2); efd = open(file(dp->name, 'E'), OWRITE); if(efd < 0){ - if(debug) syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser()); + if(debug) + syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser()); efd = create(file(dp->name, 'E'), OWRITE, 0666); if(efd < 0){ if(debug) syslog(0, "runq", "create %s as %s: %r", file(dp->name, 'E'), getuser()); @@ -494,50 +431,62 @@ } } seek(efd, 0, 2); - exec(cmd, av); + exec(cmd, j->av); error("can't exec %s", cmd); break; default: - for(;;){ - wm = wait(); - if(wm == nil) - error("wait failed: %r", ""); - if(wm->pid == pid) - break; - free(wm); - } - if(debug) - fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg); - - if(wm->msg[0]){ - if(debug) - fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg); - if(!Rflag && strstr(wm->msg, "Retry")==0){ - /* return the message and remove it */ - if(returnmail(av, dp->name, wm->msg) != 0) - 
logit("returnmail failed", dp->name, av); - remmatch(dp->name); - } else { - /* add sys to bad list and try again later */ - nbad++; - badsys = realloc(badsys, nbad*sizeof(char*)); - badsys[nbad-1] = strdup(av[3]); - } - } else { - /* it worked remove the message */ - remmatch(dp->name); - } - free(wm); - + return j; } done: - if (l) - sysunlock(l); - Bterm(b); - sysunlockfile(Bfildes(b)); - free(buf); - free(av); - close(dfd); + freejob(j); + return nil; +} + +Job* +donefile(Job *j, Waitmsg *wm) +{ + Job *n; + + if(debug) + fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg); + + if(wm->msg[0]){ + if(debug) + fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg); + if(!Rflag && strstr(wm->msg, "Retry")==0){ + /* return the message and remove it */ + if(returnmail(j->av, j->dp->name, wm->msg) != 0) + logit("returnmail failed", j->dp->name, j->av); + remmatch(j->dp->name); + } else { + /* add sys to bad list and try again later */ + nbad++; + badsys = realloc(badsys, nbad*sizeof(char*)); + badsys[nbad-1] = strdup(j->av[3]); + } + } else { + /* it worked remove the message */ + remmatch(j->dp->name); + } + n = j->next; + freejob(j); + return n; +} + +void +freejob(Job *j) +{ + if(j->b != nil){ + sysunlockfile(Bfildes(j->b)); + Bterm(j->b); + } + if(j->dfd != -1) + close(j->dfd); + if(j->l != nil) + sysunlock(j->l); + free(j->buf); + free(j->av); + free(j); } @@ -691,71 +640,3 @@ } syslog(0, runqlog, "%s", buf); } - -char *loadfile = ".runqload"; - -/* - * load balancing - */ -void -doload(int start) -{ - int fd; - char buf[32]; - int i, n; - Mlock *l; - Dir *d; - - if(load <= 0) - return; - - if(chdir(root) < 0){ - load = 0; - return; - } - - l = syslock(loadfile); - fd = open(loadfile, ORDWR); - if(fd < 0){ - fd = create(loadfile, 0666, ORDWR); - if(fd < 0){ - load = 0; - sysunlock(l); - return; - } - } - - /* get current load */ - i = 0; - n = read(fd, buf, sizeof(buf)-1); - if(n >= 0){ - buf[n] = 0; - i = atoi(buf); - } - if(i < 0) - i = 0; - - /* ignore load 
if file hasn't been changed in 30 minutes */ - d = dirfstat(fd); - if(d != nil){ - if(d->mtime + 30*60 < time(0)) - i = 0; - free(d); - } - - /* if load already too high, give up */ - if(start && i >= load){ - sysunlock(l); - exits(0); - } - - /* increment/decrement load */ - if(start) - i++; - else - i--; - seek(fd, 0, 0); - fprint(fd, "%d\n", i); - sysunlock(l); - close(fd); -} ^ permalink raw reply [flat|nested] 4+ messages in thread
* Re: [9front] upas/runq: parallel sending 2021-01-17 0:42 ` ori @ 2021-01-17 1:06 ` ori 0 siblings, 0 replies; 4+ messages in thread From: ori @ 2021-01-17 1:06 UTC (permalink / raw) To: 9front Quoth ori@eigenstate.org: > I've been running the previous patch for the > last week without problems: has anyone else > had a chance to test that it doesn't break > their systems? > > Quoth ori@eigenstate.org: > > I'd also like to float the idea of removing the '-a' > > flag, and reclaiming the '-n' flag for this purpose. > > If anyone is running upas/runq with -a, I propose > > this alternative: > > > > Here's a more aggressive version that removes > the multi-user queue sweeping code. I doubt > that anyone is using it. If they are, I'd be > interested in knowing if there's an issue with > migrating to xargs or a shell loop that runs > each job in parallel by directory. > > If there are no objections, I'm going to let > this version bake for a week on my own system, > and then commit next weekend. And, another update with some commented code removed and docs updated. diff -r 24a38dd884b0 sys/man/8/qer --- a/sys/man/8/qer Sat Jan 16 16:17:27 2021 -0800 +++ b/sys/man/8/qer Sat Jan 16 17:05:12 2021 -0800 @@ -15,7 +15,7 @@ .br .B runq [ -.B -adsER +.B -dER ] [ .B -f @@ -26,10 +26,6 @@ .I subdir ] [ -.B -l -.I load -] -[ .B -t .I time ] @@ -39,7 +35,7 @@ ] [ .B -n -.I nprocs +.I njobs ] .I root cmd .SH DESCRIPTION @@ -84,10 +80,7 @@ .I Runq processes the files queued by .IR qer . -Without the -.B -a -option, -.I runq +.I Runq processes all requests in the directory .IR root / subdir , where @@ -96,9 +89,6 @@ .B -q if present, else the contents of .BR /dev/user . -With the -.B -a -it processes all requests. Each request is processed by executing the command .I cmd with the contents of the control file as its arguments, @@ -172,31 +162,12 @@ .I -q flag. .P -The -.BR -s , -.BR -n , -and -.B -l -flags are only meaningful with the -.B -a -flag. 
They control amount of parallelism that -is used when sweeping all of the queues. The argument following the +The argument following the .B -n -flag specifies the number of queues that are swept -in parallel; the default is 50. The argument following the -.B -l -flag specifies the total number of queues that are being swept. -By default, there is no limit. The number of active sweeps -is cumulative over all active executions of -.IR runq . -The -.B -s -flag forces each queue directory to be processed by exactly -one instance of -.IR runq . -This is useful on systems that connect to slow -external systems and prevents all the queue sweeps from -piling up trying to process a few slow systems. +flag specifies the number of queued jobs that are processed +in parallel from the queue; the default is 1. +This is useful for a large queue to be processed with a bounded +amount of parallelism. .PP .I Runq is often called from diff -r 24a38dd884b0 sys/src/cmd/upas/q/runq.c --- a/sys/src/cmd/upas/q/runq.c Sat Jan 16 16:17:27 2021 -0800 +++ b/sys/src/cmd/upas/q/runq.c Sat Jan 16 17:05:12 2021 -0800 @@ -1,9 +1,25 @@ #include "common.h" #include <ctype.h> +typedef struct Job Job; + +struct Job { + Job *next; + int pid; + int ac; + int dfd; + char **av; + char *buf; /* backing for av */ + Dir *dp; /* not owned */ + Mlock *l; + Biobuf *b; +}; + void doalldirs(void); void dodir(char*); -void dofile(Dir*); +Job* dofile(Dir*); +Job* donefile(Job*, Waitmsg*); +void freejob(Job*); void rundir(char*); char* file(char*, char); void warning(char*, void*); @@ -17,7 +33,6 @@ char *root; int debug; int giveup = 2*24*60*60; -int load; int limit; /* the current directory */ @@ -28,67 +43,48 @@ char *runqlog = "runq"; -int *pidlist; char **badsys; /* array of recalcitrant systems */ int nbad; -int npid = 50; -int sflag; /* single thread per directory */ -int aflag; /* all directories */ +int njob = 1; /* number of concurrent jobs to invoke */ int Eflag; /* ignore E.xxxxxx dates */ int Rflag; /* 
no giving up, ever */ void usage(void) { - fprint(2, "usage: runq [-adsE] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] q-root cmd\n"); - exits(""); + fprint(2, "usage: runq [-dE] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] q-root cmd\n"); + exits("usage"); } void main(int argc, char **argv) { - char *qdir, *x; + char *qdir; qdir = 0; ARGBEGIN{ - case 'l': - x = ARGF(); - if(x == 0) - usage(); - load = atoi(x); - if(load < 0) - load = 0; - break; case 'E': Eflag++; break; case 'R': /* no giving up -- just leave stuff in the queue */ Rflag++; break; - case 'a': - aflag++; - break; case 'd': debug++; break; case 'r': - limit = atoi(ARGF()); - break; - case 's': - sflag++; + limit = atoi(EARGF(usage())); break; case 't': - giveup = 60*60*atoi(ARGF()); + giveup = 60*60*atoi(EARGF(usage())); break; case 'q': - qdir = ARGF(); - if(qdir == 0) - usage(); + qdir = EARGF(usage()); break; case 'n': - npid = atoi(ARGF()); - if(npid == 0) + njob = atoi(EARGF(usage())); + if(njob == 0) usage(); break; }ARGEND; @@ -96,27 +92,17 @@ if(argc != 2) usage(); - pidlist = malloc(npid*sizeof(*pidlist)); - if(pidlist == 0) - error("can't malloc", 0); - - if(aflag == 0 && qdir == 0) { + if(qdir == nil) qdir = getuser(); - if(qdir == 0) - error("unknown user", 0); - } + if(qdir == nil) + error("unknown user", 0); root = argv[0]; cmd = argv[1]; if(chdir(root) < 0) error("can't cd to %s", root); - doload(1); - if(aflag) - doalldirs(); - else - dodir(qdir); - doload(0); + dodir(qdir); exits(0); } @@ -142,74 +128,6 @@ return 0; } -int -forkltd(void) -{ - int i; - int pid; - - for(i = 0; i < npid; i++){ - if(pidlist[i] <= 0) - break; - } - - while(i >= npid){ - pid = waitpid(); - if(pid < 0){ - syslog(0, runqlog, "forkltd confused"); - exits(0); - } - - for(i = 0; i < npid; i++) - if(pidlist[i] == pid) - break; - } - pidlist[i] = fork(); - return pidlist[i]; -} - -/* - * run all user directories, must be bootes (or root on unix) to do this - */ -void -doalldirs(void) -{ - Dir 
*db; - int fd; - long i, n; - - - fd = open(".", OREAD); - if(fd == -1){ - warning("reading %s", root); - return; - } - n = dirreadall(fd, &db); - if(n > 0){ - for(i=0; i<n; i++){ - if(db[i].qid.type & QTDIR){ - if(emptydir(db[i].name)) - continue; - switch(forkltd()){ - case -1: - syslog(0, runqlog, "out of procs"); - doload(0); - exits(0); - case 0: - if(sysdetach() < 0) - error("%r", 0); - dodir(db[i].name); - exits(0); - default: - break; - } - } - } - free(db); - } - close(fd); -} - /* * cd to a user directory and run it */ @@ -234,30 +152,57 @@ void rundir(char *name) { - int fd; - long i; + Job *hd, *j, **p; + int nlive, fidx, fd, found; + Waitmsg *w; - if(aflag && sflag) - fd = sysopenlocked(".", OREAD); - else - fd = open(".", OREAD); + fd = open(".", OREAD); if(fd == -1){ warning("reading %s", name); return; } + fidx= 0; + hd = nil; + nlive = 0; nfiles = dirreadall(fd, &dirbuf); - if(nfiles > 0){ - for(i=0; i<nfiles; i++){ - if(dirbuf[i].name[0]!='C' || dirbuf[i].name[1]!='.') + while(nlive > 0 || fidx< nfiles){ + while(fidx< nfiles && nlive < njob){ + if(strncmp(dirbuf[fidx].name, "C.", 2) != 0){ + fidx++; continue; - dofile(&dirbuf[i]); + } + if((j = dofile(&dirbuf[fidx])) != nil){ + nlive++; + j->next = hd; + hd = j; + } + fidx++; } - free(dirbuf); + if(nlive == 0){ + fprint(2, "nothing live\n"); + break; + } +rescan: + if((w = wait()) == nil){ + syslog(0, "runq", "wait error: %r"); + break; + } + found = 0; + for(p = &hd; *p != nil; p = &(*p)->next){ + if(w->pid == (*p)->pid){ + *p = donefile(*p, w); + found++; + nlive--; + break; + } + } + free(w); + if(!found) + goto rescan; } - if(aflag && sflag) - sysunlockfile(fd); - else - close(fd); + assert(hd == nil); + free(dirbuf); + close(fd); } /* @@ -316,15 +261,13 @@ /* * try a message */ -void +Job* dofile(Dir *dp) { + int dtime, efd, i, etime; + Job *j; Dir *d; - int dfd, ac, dtime, efd, pid, i, etime; - char *buf, *cp, **av; - Waitmsg *wm; - Biobuf *b; - Mlock *l = nil; + char *cp; if(debug) 
fprint(2, "dofile %s\n", dp->name); @@ -337,14 +280,14 @@ if(d == nil){ syslog(0, runqlog, "no data file for %s", dp->name); remmatch(dp->name); - return; + return nil; } if(dp->length == 0){ if(time(0)-dp->mtime > 15*60){ syslog(0, runqlog, "empty ctl file for %s", dp->name); remmatch(dp->name); } - return; + return nil; } dtime = d->mtime; free(d); @@ -358,31 +301,35 @@ if(etime - dtime < 60*60){ /* up to the first hour, try every 15 minutes */ if(time(0) - etime < 15*60) - return; + return nil; } else { /* after the first hour, try once an hour */ if(time(0) - etime < 60*60) - return; + return nil; } - } /* * open control and data */ - b = sysopen(file(dp->name, 'C'), "rl", 0660); - if(b == 0) { + j = malloc(sizeof(Job)); + if(j == nil) + return nil; + memset(j, 0, sizeof(Job)); + j->dp = dp; + j->dfd = -1; + j->b = sysopen(file(dp->name, 'C'), "rl", 0660); + if(j->b == 0) { if(debug) fprint(2, "can't open %s: %r\n", file(dp->name, 'C')); - return; + return nil; } - dfd = open(file(dp->name, 'D'), OREAD); - if(dfd < 0){ + j->dfd = open(file(dp->name, 'D'), OREAD); + if(j->dfd < 0){ if(debug) fprint(2, "can't open %s: %r\n", file(dp->name, 'D')); - Bterm(b); - sysunlockfile(Bfildes(b)); - return; + freejob(j); + return nil; } /* @@ -390,48 +337,36 @@ * - read args into (malloc'd) buffer * - malloc a vector and copy pointers to args into it */ - buf = malloc(dp->length+1); - if(buf == 0){ + + j->buf = malloc(dp->length+1); + if(j->buf == nil){ warning("buffer allocation", 0); - Bterm(b); - sysunlockfile(Bfildes(b)); - close(dfd); - return; + freejob(j); + return nil; } - if(Bread(b, buf, dp->length) != dp->length){ + if(Bread(j->b, j->buf, dp->length) != dp->length){ warning("reading control file %s\n", dp->name); - Bterm(b); - sysunlockfile(Bfildes(b)); - close(dfd); - free(buf); - return; + freejob(j); + return nil; } - buf[dp->length] = 0; - av = malloc(2*sizeof(char*)); - if(av == 0){ + j->buf[dp->length] = 0; + j->av = malloc(2*sizeof(char*)); + if(j->av == 
0){ warning("argv allocation", 0); - close(dfd); - free(buf); - Bterm(b); - sysunlockfile(Bfildes(b)); - return; + freejob(j); + return nil; } - for(ac = 1, cp = buf; *cp; ac++){ + for(j->ac = 1, cp = j->buf; *cp; j->ac++){ while(isspace(*cp)) *cp++ = 0; if(*cp == 0) break; - av = realloc(av, (ac+2)*sizeof(char*)); - if(av == 0){ + j->av = realloc(j->av, (j->ac+2)*sizeof(char*)); + if(j->av == 0){ warning("argv allocation", 0); - close(dfd); - free(buf); - Bterm(b); - sysunlockfile(Bfildes(b)); - return; } - av[ac] = cp; + j->av[j->ac] = cp; while(*cp && !isspace(*cp)){ if(*cp++ == '"'){ while(*cp && *cp != '"') @@ -441,18 +376,18 @@ } } } - av[0] = cmd; - av[ac] = 0; + j->av[0] = cmd; + j->av[j->ac] = 0; if(!Eflag &&time(0) - dtime > giveup){ - if(returnmail(av, dp->name, "Giveup") != 0) - logit("returnmail failed", dp->name, av); + if(returnmail(j->av, dp->name, "Giveup") != 0) + logit("returnmail failed", dp->name, j->av); remmatch(dp->name); goto done; } for(i = 0; i < nbad; i++){ - if(strcmp(av[3], badsys[i]) == 0) + if(j->ac > 3 && strcmp(j->av[3], badsys[i]) == 0) goto done; } @@ -460,33 +395,34 @@ * Ken's fs, for example, gives us 5 minutes of inactivity before * the lock goes stale, so we have to keep reading it. 
*/ - l = keeplockalive(file(dp->name, 'C'), Bfildes(b)); + j->l = keeplockalive(file(dp->name, 'C'), Bfildes(j->b)); /* * transfer */ - pid = fork(); - switch(pid){ + j->pid = fork(); + switch(j->pid){ case -1: - sysunlock(l); - sysunlockfile(Bfildes(b)); + sysunlock(j->l); + sysunlockfile(Bfildes(j->b)); syslog(0, runqlog, "out of procs"); exits(0); case 0: if(debug) { - fprint(2, "Starting %s", cmd); - for(ac = 0; av[ac]; ac++) - fprint(2, " %s", av[ac]); + fprint(2, "Starting %s\n", cmd); + for(i = 0; j->av[i]; i++) + fprint(2, " %s", j->av[i]); fprint(2, "\n"); } - logit("execing", dp->name, av); + logit("execing", dp->name, j->av); close(0); - dup(dfd, 0); - close(dfd); + dup(j->dfd, 0); + close(j->dfd); close(2); efd = open(file(dp->name, 'E'), OWRITE); if(efd < 0){ - if(debug) syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser()); + if(debug) + syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser()); efd = create(file(dp->name, 'E'), OWRITE, 0666); if(efd < 0){ if(debug) syslog(0, "runq", "create %s as %s: %r", file(dp->name, 'E'), getuser()); @@ -494,50 +430,62 @@ } } seek(efd, 0, 2); - exec(cmd, av); + exec(cmd, j->av); error("can't exec %s", cmd); break; default: - for(;;){ - wm = wait(); - if(wm == nil) - error("wait failed: %r", ""); - if(wm->pid == pid) - break; - free(wm); - } - if(debug) - fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg); - - if(wm->msg[0]){ - if(debug) - fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg); - if(!Rflag && strstr(wm->msg, "Retry")==0){ - /* return the message and remove it */ - if(returnmail(av, dp->name, wm->msg) != 0) - logit("returnmail failed", dp->name, av); - remmatch(dp->name); - } else { - /* add sys to bad list and try again later */ - nbad++; - badsys = realloc(badsys, nbad*sizeof(char*)); - badsys[nbad-1] = strdup(av[3]); - } - } else { - /* it worked remove the message */ - remmatch(dp->name); - } - free(wm); - + return j; } done: - if (l) - sysunlock(l); - Bterm(b); 
- sysunlockfile(Bfildes(b)); - free(buf); - free(av); - close(dfd); + freejob(j); + return nil; +} + +Job* +donefile(Job *j, Waitmsg *wm) +{ + Job *n; + + if(debug) + fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg); + + if(wm->msg[0]){ + if(debug) + fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg); + if(!Rflag && strstr(wm->msg, "Retry")==0){ + /* return the message and remove it */ + if(returnmail(j->av, j->dp->name, wm->msg) != 0) + logit("returnmail failed", j->dp->name, j->av); + remmatch(j->dp->name); + } else { + /* add sys to bad list and try again later */ + nbad++; + badsys = realloc(badsys, nbad*sizeof(char*)); + badsys[nbad-1] = strdup(j->av[3]); + } + } else { + /* it worked remove the message */ + remmatch(j->dp->name); + } + n = j->next; + freejob(j); + return n; +} + +void +freejob(Job *j) +{ + if(j->b != nil){ + sysunlockfile(Bfildes(j->b)); + Bterm(j->b); + } + if(j->dfd != -1) + close(j->dfd); + if(j->l != nil) + sysunlock(j->l); + free(j->buf); + free(j->av); + free(j); } @@ -691,71 +639,3 @@ } syslog(0, runqlog, "%s", buf); } - -char *loadfile = ".runqload"; - -/* - * load balancing - */ -void -doload(int start) -{ - int fd; - char buf[32]; - int i, n; - Mlock *l; - Dir *d; - - if(load <= 0) - return; - - if(chdir(root) < 0){ - load = 0; - return; - } - - l = syslock(loadfile); - fd = open(loadfile, ORDWR); - if(fd < 0){ - fd = create(loadfile, 0666, ORDWR); - if(fd < 0){ - load = 0; - sysunlock(l); - return; - } - } - - /* get current load */ - i = 0; - n = read(fd, buf, sizeof(buf)-1); - if(n >= 0){ - buf[n] = 0; - i = atoi(buf); - } - if(i < 0) - i = 0; - - /* ignore load if file hasn't been changed in 30 minutes */ - d = dirfstat(fd); - if(d != nil){ - if(d->mtime + 30*60 < time(0)) - i = 0; - free(d); - } - - /* if load already too high, give up */ - if(start && i >= load){ - sysunlock(l); - exits(0); - } - - /* increment/decrement load */ - if(start) - i++; - else - i--; - seek(fd, 0, 0); - fprint(fd, "%d\n", i); - 
sysunlock(l); - close(fd); -} ^ permalink raw reply [flat|nested] 4+ messages in thread
end of thread, other threads:[~2021-01-17 1:36 UTC | newest] Thread overview: 4+ messages (download: mbox.gz / follow: Atom feed) -- links below jump to the message on this page -- 2021-01-10 2:27 [9front] upas/runq: parallel sending ori 2021-01-10 8:04 ` hiro 2021-01-17 0:42 ` ori 2021-01-17 1:06 ` ori
This is a public inbox, see mirroring instructions for how to clone and mirror all data and code used for this inbox; as well as URLs for NNTP newsgroup(s).