From mboxrd@z Thu Jan 1 00:00:00 1970 X-Spam-Checker-Version: SpamAssassin 3.4.4 (2020-01-24) on inbox.vuxu.org X-Spam-Level: X-Spam-Status: No, score=0.0 required=5.0 tests=none autolearn=ham autolearn_force=no version=3.4.4 Received: (qmail 15284 invoked from network); 10 Jan 2021 02:57:20 -0000 Received: from 1ess.inri.net (216.126.196.35) by inbox.vuxu.org with ESMTPUTF8; 10 Jan 2021 02:57:20 -0000 Received: from mimir.eigenstate.org ([206.124.132.107]) by 1ess; Sat Jan 9 21:27:19 -0500 2021 Received: from abbatoir.fios-router.home (pool-74-101-2-6.nycmny.fios.verizon.net [74.101.2.6]) by mimir.eigenstate.org (OpenSMTPD) with ESMTPSA id c29e28a8 (TLSv1.2:ECDHE-RSA-AES256-SHA:256:NO) for <9front@9front.org>; Sat, 9 Jan 2021 18:27:07 -0800 (PST) Message-ID: <60563470E4A517681CAFF1552E11CD8E@eigenstate.org> To: 9front@9front.org Date: Sat, 09 Jan 2021 18:27:06 -0800 From: ori@eigenstate.org MIME-Version: 1.0 Content-Type: text/plain; charset="US-ASCII" Content-Transfer-Encoding: 7bit List-ID: <9front.9front.org> List-Help: X-Glyph: ➈ X-Bullshit: NoSQL out-scaling frontend Subject: [9front] upas/runq: parallel sending Reply-To: 9front@9front.org Precedence: bulk As you may have noticed, you've been getting a bunch of test messages from the 9front list. Sorry. We've been shaking out some performance issues on a new host. The mailing list was spawning a runq process for every subscriber, and those processes were contending heavily on the queue directory, causing high system load. On the new host, the disks were slow enough that this caused real problems. This was fixed by paring down to a single runq process, but that means that sending out emails for our hundreds of subscribers can take a while. This change can probably use some work, but it adds a '-p njob' flag, which lets runq work through the jobs in a queue directory with limited parallelism: up to njob deliveries run at once. That should bring down the amount of time it takes for a message to work its way through our mailing lists. 
I'd also like to float the idea of removing the '-a' flag, and reclaiming the '-n' flag for this purpose. If anyone is running upas/runq with -a, I propose this alternative: fn runq-a { dirs=(/mail/queue/*) for(d in $dirs) upas/runq ... $d } with -n: fn runq-a { dirs=(/mail/queue/*) for(d in $dirs) upas/runq ... $d & for(d in $dirs) wait } diff -r 808eb735fc45 sys/src/cmd/upas/q/runq.c --- a/sys/src/cmd/upas/q/runq.c Sat Jan 09 12:20:49 2021 -0800 +++ b/sys/src/cmd/upas/q/runq.c Sat Jan 09 18:15:41 2021 -0800 @@ -1,9 +1,25 @@ #include "common.h" #include +typedef struct Job Job; + +struct Job { + Job *next; + int pid; + int ac; + int dfd; + char **av; + char *buf; /* backing for av */ + Dir *dp; /* not owned */ + Mlock *l; + Biobuf *b; +}; + void doalldirs(void); void dodir(char*); -void dofile(Dir*); +Job* dofile(Dir*); +Job* donefile(Job*, Waitmsg*); +void freejob(Job*); void rundir(char*); char* file(char*, char); void warning(char*, void*); @@ -32,6 +48,7 @@ char **badsys; /* array of recalcitrant systems */ int nbad; int npid = 50; +int njob = 1; /* number of concurrent jobs to invoke */ int sflag; /* single thread per directory */ int aflag; /* all directories */ int Eflag; /* ignore E.xxxxxx dates */ @@ -40,7 +57,7 @@ void usage(void) { - fprint(2, "usage: runq [-adsE] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] q-root cmd\n"); + fprint(2, "usage: runq [-adsE] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] [-p njobs] q-root cmd\n"); exits(""); } @@ -86,6 +103,11 @@ if(qdir == 0) usage(); break; + case 'p': + njob = atoi(ARGF()); + if(njob == 0) + usage(); + break; case 'n': npid = atoi(ARGF()); if(npid == 0) @@ -234,8 +256,9 @@ void rundir(char *name) { - int fd; - long i; + Job *hd, *j, **p; + int nlive, fidx, fd, found; + Waitmsg *w; if(aflag && sflag) fd = sysopenlocked(".", OREAD); @@ -245,15 +268,47 @@ warning("reading %s", name); return; } + fidx= 0; + hd = nil; + nlive = 0; nfiles = dirreadall(fd, &dirbuf); - if(nfiles > 0){ - for(i=0; 
i 0 || fidx< nfiles){ + while(fidx< nfiles && nlive < njob){ + if(strncmp(dirbuf[fidx].name, "C.", 2) != 0){ + fidx++; continue; - dofile(&dirbuf[i]); + } + if((j = dofile(&dirbuf[fidx])) != nil){ + nlive++; + j->next = hd; + hd = j; + } + fidx++; } - free(dirbuf); + if(nlive == 0){ + fprint(2, "nothing live\n"); + break; + } +rescan: + if((w = wait()) == nil){ + syslog(0, "runq", "wait error: %r"); + break; + } + found = 0; + for(p = &hd; *p != nil; p = &(*p)->next){ + if(w->pid == (*p)->pid){ + *p = donefile(*p, w); + found++; + nlive--; + break; + } + } + free(w); + if(!found) + goto rescan; } + assert(hd == nil); + free(dirbuf); if(aflag && sflag) sysunlockfile(fd); else @@ -316,15 +371,14 @@ /* * try a message */ -void +Job* dofile(Dir *dp) { + int dtime, efd, i, etime; + Job *j; +// Mlock *l; Dir *d; - int dfd, ac, dtime, efd, pid, i, etime; - char *buf, *cp, **av; - Waitmsg *wm; - Biobuf *b; - Mlock *l = nil; + char *cp; if(debug) fprint(2, "dofile %s\n", dp->name); @@ -337,14 +391,14 @@ if(d == nil){ syslog(0, runqlog, "no data file for %s", dp->name); remmatch(dp->name); - return; + return nil; } if(dp->length == 0){ if(time(0)-dp->mtime > 15*60){ syslog(0, runqlog, "empty ctl file for %s", dp->name); remmatch(dp->name); } - return; + return nil; } dtime = d->mtime; free(d); @@ -358,31 +412,35 @@ if(etime - dtime < 60*60){ /* up to the first hour, try every 15 minutes */ if(time(0) - etime < 15*60) - return; + return nil; } else { /* after the first hour, try once an hour */ if(time(0) - etime < 60*60) - return; + return nil; } - } /* * open control and data */ - b = sysopen(file(dp->name, 'C'), "rl", 0660); - if(b == 0) { + j = malloc(sizeof(Job)); + if(j == nil) + return nil; + memset(j, 0, sizeof(Job)); + j->dp = dp; + j->dfd = -1; + j->b = sysopen(file(dp->name, 'C'), "rl", 0660); + if(j->b == 0) { if(debug) fprint(2, "can't open %s: %r\n", file(dp->name, 'C')); - return; + return nil; } - dfd = open(file(dp->name, 'D'), OREAD); - if(dfd < 0){ + j->dfd 
= open(file(dp->name, 'D'), OREAD); + if(j->dfd < 0){ if(debug) fprint(2, "can't open %s: %r\n", file(dp->name, 'D')); - Bterm(b); - sysunlockfile(Bfildes(b)); - return; + freejob(j); + return nil; } /* @@ -390,48 +448,36 @@ * - read args into (malloc'd) buffer * - malloc a vector and copy pointers to args into it */ - buf = malloc(dp->length+1); - if(buf == 0){ + + j->buf = malloc(dp->length+1); + if(j->buf == nil){ warning("buffer allocation", 0); - Bterm(b); - sysunlockfile(Bfildes(b)); - close(dfd); - return; + freejob(j); + return nil; } - if(Bread(b, buf, dp->length) != dp->length){ + if(Bread(j->b, j->buf, dp->length) != dp->length){ warning("reading control file %s\n", dp->name); - Bterm(b); - sysunlockfile(Bfildes(b)); - close(dfd); - free(buf); - return; + freejob(j); + return nil; } - buf[dp->length] = 0; - av = malloc(2*sizeof(char*)); - if(av == 0){ + j->buf[dp->length] = 0; + j->av = malloc(2*sizeof(char*)); + if(j->av == 0){ warning("argv allocation", 0); - close(dfd); - free(buf); - Bterm(b); - sysunlockfile(Bfildes(b)); - return; + freejob(j); + return nil; } - for(ac = 1, cp = buf; *cp; ac++){ + for(j->ac = 1, cp = j->buf; *cp; j->ac++){ while(isspace(*cp)) *cp++ = 0; if(*cp == 0) break; - av = realloc(av, (ac+2)*sizeof(char*)); - if(av == 0){ + j->av = realloc(j->av, (j->ac+2)*sizeof(char*)); + if(j->av == 0){ warning("argv allocation", 0); - close(dfd); - free(buf); - Bterm(b); - sysunlockfile(Bfildes(b)); - return; } - av[ac] = cp; + j->av[j->ac] = cp; while(*cp && !isspace(*cp)){ if(*cp++ == '"'){ while(*cp && *cp != '"') @@ -441,18 +487,18 @@ } } } - av[0] = cmd; - av[ac] = 0; + j->av[0] = cmd; + j->av[j->ac] = 0; if(!Eflag &&time(0) - dtime > giveup){ - if(returnmail(av, dp->name, "Giveup") != 0) - logit("returnmail failed", dp->name, av); + if(returnmail(j->av, dp->name, "Giveup") != 0) + logit("returnmail failed", dp->name, j->av); remmatch(dp->name); goto done; } for(i = 0; i < nbad; i++){ - if(strcmp(av[3], badsys[i]) == 0) + if(j->ac > 
3 && strcmp(j->av[3], badsys[i]) == 0) goto done; } @@ -460,33 +506,34 @@ * Ken's fs, for example, gives us 5 minutes of inactivity before * the lock goes stale, so we have to keep reading it. */ - l = keeplockalive(file(dp->name, 'C'), Bfildes(b)); + j->l = keeplockalive(file(dp->name, 'C'), Bfildes(j->b)); /* * transfer */ - pid = fork(); - switch(pid){ + j->pid = fork(); + switch(j->pid){ case -1: - sysunlock(l); - sysunlockfile(Bfildes(b)); + sysunlock(j->l); + sysunlockfile(Bfildes(j->b)); syslog(0, runqlog, "out of procs"); exits(0); case 0: if(debug) { - fprint(2, "Starting %s", cmd); - for(ac = 0; av[ac]; ac++) - fprint(2, " %s", av[ac]); - fprint(2, "\n"); + fprint(2, "Starting %s\n", cmd); +// for(i = 0; j->av[i]; i++) +// fprint(2, " %s", j->av[i]); +// fprint(2, "\n"); } - logit("execing", dp->name, av); + logit("execing", dp->name, j->av); close(0); - dup(dfd, 0); - close(dfd); + dup(j->dfd, 0); + close(j->dfd); close(2); efd = open(file(dp->name, 'E'), OWRITE); if(efd < 0){ - if(debug) syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser()); + if(debug) + syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser()); efd = create(file(dp->name, 'E'), OWRITE, 0666); if(efd < 0){ if(debug) syslog(0, "runq", "create %s as %s: %r", file(dp->name, 'E'), getuser()); @@ -494,50 +541,62 @@ } } seek(efd, 0, 2); - exec(cmd, av); + exec(cmd, j->av); error("can't exec %s", cmd); break; default: - for(;;){ - wm = wait(); - if(wm == nil) - error("wait failed: %r", ""); - if(wm->pid == pid) - break; - free(wm); - } - if(debug) - fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg); - - if(wm->msg[0]){ - if(debug) - fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg); - if(!Rflag && strstr(wm->msg, "Retry")==0){ - /* return the message and remove it */ - if(returnmail(av, dp->name, wm->msg) != 0) - logit("returnmail failed", dp->name, av); - remmatch(dp->name); - } else { - /* add sys to bad list and try again later */ - nbad++; - badsys = 
realloc(badsys, nbad*sizeof(char*)); - badsys[nbad-1] = strdup(av[3]); - } - } else { - /* it worked remove the message */ - remmatch(dp->name); - } - free(wm); - + return j; } done: - if (l) - sysunlock(l); - Bterm(b); - sysunlockfile(Bfildes(b)); - free(buf); - free(av); - close(dfd); + freejob(j); + return nil; +} + +Job* +donefile(Job *j, Waitmsg *wm) +{ + Job *n; + + if(debug) + fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg); + + if(wm->msg[0]){ + if(debug) + fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg); + if(!Rflag && strstr(wm->msg, "Retry")==0){ + /* return the message and remove it */ + if(returnmail(j->av, j->dp->name, wm->msg) != 0) + logit("returnmail failed", j->dp->name, j->av); + remmatch(j->dp->name); + } else { + /* add sys to bad list and try again later */ + nbad++; + badsys = realloc(badsys, nbad*sizeof(char*)); + badsys[nbad-1] = strdup(j->av[3]); + } + } else { + /* it worked remove the message */ + remmatch(j->dp->name); + } + n = j->next; + freejob(j); + return n; +} + +void +freejob(Job *j) +{ + if(j->b != nil){ + sysunlockfile(Bfildes(j->b)); + Bterm(j->b); + } + if(j->dfd != -1) + close(j->dfd); + if(j->l != nil) + sysunlock(j->l); + free(j->buf); + free(j->av); + free(j); }