From: hiro <23hiro@gmail.com>
To: 9front@9front.org
Date: Sun, 10 Jan 2021 09:04:54 +0100
Subject: Re: [9front] upas/runq: parallel sending
In-Reply-To: <60563470E4A517681CAFF1552E11CD8E@eigenstate.org>

Well, it took less than 600 seconds for all 3 mails to reach me through the
mailing list. But it seems I've been missing 2 mails for hours, if not days,
in the meantime... It feels like that is no longer just a delay but an error.

On 1/10/21, ori@eigenstate.org wrote:
> As you may have noticed, you've been getting a
> bunch of test messages from the 9front list. Sorry.
> We've been shaking out some performance issues on a
> new host.
>
> The mailing list was spawning a runq process for every
> subscriber, which were contending heavily on the queue
> directory, causing high system load. On the new host,
> disks were slow enough that this was causing issues.
>
> This was fixed by paring down to a single runq process,
> but that means that sending out emails for our hundreds
> of subscribers can take a while.
>
> This change can probably use some work, but it adds
> a '-p njob' flag, which allows us to send the jobs in
> a message queue with limited parallelism. That should
> bring down the amount of time it takes for a message
> to work its way through our mailing lists.
>
> I'd also like to float the idea of removing the '-a'
> flag, and reclaiming the '-n' flag for this purpose.
> If anyone is running upas/runq with -a, I propose
> this alternative:
>
> fn runq-a {
> 	dirs=(/mail/queue/*)
> 	for(d in $dirs) upas/runq ... $d
> }
>
> with -n:
>
> fn runq-a {
> 	dirs=(/mail/queue/*)
> 	for(d in $dirs) upas/runq ... $d &
> 	for(d in $dirs) wait
> }
>
>
> diff -r 808eb735fc45 sys/src/cmd/upas/q/runq.c
> --- a/sys/src/cmd/upas/q/runq.c Sat Jan 09 12:20:49 2021 -0800
> +++ b/sys/src/cmd/upas/q/runq.c Sat Jan 09 18:15:41 2021 -0800
> @@ -1,9 +1,25 @@
>  #include "common.h"
>  #include
>
> +typedef struct Job Job;
> +
> +struct Job {
> +	Job *next;
> +	int pid;
> +	int ac;
> +	int dfd;
> +	char **av;
> +	char *buf; /* backing for av */
> +	Dir *dp; /* not owned */
> +	Mlock *l;
> +	Biobuf *b;
> +};
> +
>  void doalldirs(void);
>  void dodir(char*);
> -void dofile(Dir*);
> +Job* dofile(Dir*);
> +Job* donefile(Job*, Waitmsg*);
> +void freejob(Job*);
>  void rundir(char*);
>  char* file(char*, char);
>  void warning(char*, void*);
> @@ -32,6 +48,7 @@
>  char **badsys; /* array of recalcitrant systems */
>  int nbad;
>  int npid = 50;
> +int njob = 1; /* number of concurrent jobs to invoke */
>  int sflag; /* single thread per directory */
>  int aflag; /* all directories */
>  int Eflag; /* ignore E.xxxxxx dates */
> @@ -40,7 +57,7 @@
>  void
>  usage(void)
>  {
> -	fprint(2, "usage: runq [-adsE] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] q-root cmd\n");
> +	fprint(2, "usage: runq [-adsE] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] [-p njobs] q-root cmd\n");
>  	exits("");
>  }
>
> @@ -86,6 +103,11 @@
>  		if(qdir == 0)
>  			usage();
>  		break;
> +	case 'p':
> +		njob = atoi(ARGF());
> +		if(njob == 0)
> +			usage();
> +		break;
>  	case 'n':
>  		npid = atoi(ARGF());
>  		if(npid == 0)
> @@ -234,8 +256,9 @@
>  void
>  rundir(char *name)
>  {
> -	int fd;
> -	long i;
> +	Job *hd, *j, **p;
> +	int nlive, fidx, fd, found;
> +	Waitmsg *w;
>
>  	if(aflag && sflag)
>  		fd = sysopenlocked(".", OREAD);
> @@ -245,15 +268,47 @@
>  		warning("reading %s", name);
>  		return;
>  	}
> +	fidx= 0;
> +	hd = nil;
> +	nlive = 0;
>  	nfiles = dirreadall(fd, &dirbuf);
> -	if(nfiles > 0){
> -		for(i=0; i<nfiles; i++){
> -			if(dirbuf[i].name[0]!='C' || dirbuf[i].name[1]!='.')
> +	while(nlive > 0 || fidx< nfiles){
> +		while(fidx< nfiles && nlive < njob){
> +			if(strncmp(dirbuf[fidx].name, "C.", 2) != 0){
> +				fidx++;
>  				continue;
> -			dofile(&dirbuf[i]);
> +			}
> +			if((j = dofile(&dirbuf[fidx])) != nil){
> +				nlive++;
> +				j->next = hd;
> +				hd = j;
> +			}
> +			fidx++;
>  		}
> -		free(dirbuf);
> +		if(nlive == 0){
> +			fprint(2, "nothing live\n");
> +			break;
> +		}
> +rescan:
> +		if((w = wait()) == nil){
> +			syslog(0, "runq", "wait error: %r");
> +			break;
> +		}
> +		found = 0;
> +		for(p = &hd; *p != nil; p = &(*p)->next){
> +			if(w->pid == (*p)->pid){
> +				*p = donefile(*p, w);
> +				found++;
> +				nlive--;
> +				break;
> +			}
> +		}
> +		free(w);
> +		if(!found)
> +			goto rescan;
>  	}
> +	assert(hd == nil);
> +	free(dirbuf);
>  	if(aflag && sflag)
>  		sysunlockfile(fd);
>  	else
> @@ -316,15 +371,14 @@
>  /*
>   * try a message
>   */
> -void
> +Job*
>  dofile(Dir *dp)
>  {
> +	int dtime, efd, i, etime;
> +	Job *j;
> +// Mlock *l;
>  	Dir *d;
> -	int dfd, ac, dtime, efd, pid, i, etime;
> -	char *buf, *cp, **av;
> -	Waitmsg *wm;
> -	Biobuf *b;
> -	Mlock *l = nil;
> +	char *cp;
>
>  	if(debug)
>  		fprint(2, "dofile %s\n", dp->name);
> @@ -337,14 +391,14 @@
>  	if(d == nil){
>  		syslog(0, runqlog, "no data file for %s", dp->name);
>  		remmatch(dp->name);
> -		return;
> +		return nil;
>  	}
>  	if(dp->length == 0){
>  		if(time(0)-dp->mtime > 15*60){
>  			syslog(0, runqlog, "empty ctl file for %s", dp->name);
>  			remmatch(dp->name);
>  		}
> -		return;
> +		return nil;
>  	}
>  	dtime = d->mtime;
>  	free(d);
> @@ -358,31 +412,35 @@
>  		if(etime - dtime < 60*60){
>  			/* up to the first hour, try every 15 minutes */
>  			if(time(0) - etime < 15*60)
> -				return;
> +				return nil;
>  		} else {
>  			/* after the first hour, try once an hour */
>  			if(time(0) - etime < 60*60)
> -				return;
> +				return nil;
>  		}
> -
>  	}
>
>  	/*
>  	 * open control and data
>  	 */
> -	b = sysopen(file(dp->name, 'C'), "rl", 0660);
> -	if(b == 0) {
> +	j = malloc(sizeof(Job));
> +	if(j == nil)
> +		return nil;
> +	memset(j, 0, sizeof(Job));
> +	j->dp = dp;
> +	j->dfd = -1;
> +	j->b = sysopen(file(dp->name, 'C'), "rl", 0660);
> +	if(j->b == 0) {
>  		if(debug)
>  			fprint(2, "can't open %s: %r\n", file(dp->name, 'C'));
> -		return;
> +		return nil;
>  	}
> -	dfd = open(file(dp->name, 'D'), OREAD);
> -	if(dfd < 0){
> +	j->dfd = open(file(dp->name, 'D'), OREAD);
> +	if(j->dfd < 0){
>  		if(debug)
>  			fprint(2, "can't open %s: %r\n", file(dp->name, 'D'));
> -		Bterm(b);
> -		sysunlockfile(Bfildes(b));
> -		return;
> +		freejob(j);
> +		return nil;
>  	}
>
>  	/*
> @@ -390,48 +448,36 @@
>  	 * - read args into (malloc'd) buffer
>  	 * - malloc a vector and copy pointers to args into it
>  	 */
> -	buf = malloc(dp->length+1);
> -	if(buf == 0){
> +
> +	j->buf = malloc(dp->length+1);
> +	if(j->buf == nil){
>  		warning("buffer allocation", 0);
> -		Bterm(b);
> -		sysunlockfile(Bfildes(b));
> -		close(dfd);
> -		return;
> +		freejob(j);
> +		return nil;
>  	}
> -	if(Bread(b, buf, dp->length) != dp->length){
> +	if(Bread(j->b, j->buf, dp->length) != dp->length){
>  		warning("reading control file %s\n", dp->name);
> -		Bterm(b);
> -		sysunlockfile(Bfildes(b));
> -		close(dfd);
> -		free(buf);
> -		return;
> +		freejob(j);
> +		return nil;
>  	}
> -	buf[dp->length] = 0;
> -	av = malloc(2*sizeof(char*));
> -	if(av == 0){
> +	j->buf[dp->length] = 0;
> +	j->av = malloc(2*sizeof(char*));
> +	if(j->av == 0){
>  		warning("argv allocation", 0);
> -		close(dfd);
> -		free(buf);
> -		Bterm(b);
> -		sysunlockfile(Bfildes(b));
> -		return;
> +		freejob(j);
> +		return nil;
>  	}
> -	for(ac = 1, cp = buf; *cp; ac++){
> +	for(j->ac = 1, cp = j->buf; *cp; j->ac++){
>  		while(isspace(*cp))
>  			*cp++ = 0;
>  		if(*cp == 0)
>  			break;
>
> -		av = realloc(av, (ac+2)*sizeof(char*));
> -		if(av == 0){
> +		j->av = realloc(j->av, (j->ac+2)*sizeof(char*));
> +		if(j->av == 0){
>  			warning("argv allocation", 0);
> -			close(dfd);
> -			free(buf);
> -			Bterm(b);
> -			sysunlockfile(Bfildes(b));
> -			return;
>  		}
> -		av[ac] = cp;
> +		j->av[j->ac] = cp;
>  		while(*cp && !isspace(*cp)){
>  			if(*cp++ == '"'){
>  				while(*cp && *cp != '"')
> @@ -441,18 +487,18 @@
>  			}
>  		}
>  	}
> -	av[0] = cmd;
> -	av[ac] = 0;
> +	j->av[0] = cmd;
> +	j->av[j->ac] = 0;
>
>  	if(!Eflag &&time(0) - dtime > giveup){
> -		if(returnmail(av, dp->name, "Giveup") != 0)
> -			logit("returnmail failed", dp->name, av);
> +		if(returnmail(j->av, dp->name, "Giveup") != 0)
> +			logit("returnmail failed", dp->name, j->av);
>  		remmatch(dp->name);
>  		goto done;
>  	}
>
>  	for(i = 0; i < nbad; i++){
> -		if(strcmp(av[3], badsys[i]) == 0)
> +		if(j->ac > 3 && strcmp(j->av[3], badsys[i]) == 0)
>  			goto done;
>  	}
>
> @@ -460,33 +506,34 @@
>  	 * Ken's fs, for example, gives us 5 minutes of inactivity before
>  	 * the lock goes stale, so we have to keep reading it.
>  	 */
> -	l = keeplockalive(file(dp->name, 'C'), Bfildes(b));
> +	j->l = keeplockalive(file(dp->name, 'C'), Bfildes(j->b));
>
>  	/*
>  	 * transfer
>  	 */
> -	pid = fork();
> -	switch(pid){
> +	j->pid = fork();
> +	switch(j->pid){
>  	case -1:
> -		sysunlock(l);
> -		sysunlockfile(Bfildes(b));
> +		sysunlock(j->l);
> +		sysunlockfile(Bfildes(j->b));
>  		syslog(0, runqlog, "out of procs");
>  		exits(0);
>  	case 0:
>  		if(debug) {
> -			fprint(2, "Starting %s", cmd);
> -			for(ac = 0; av[ac]; ac++)
> -				fprint(2, " %s", av[ac]);
> -			fprint(2, "\n");
> +			fprint(2, "Starting %s\n", cmd);
> +//			for(i = 0; j->av[i]; i++)
> +//				fprint(2, " %s", j->av[i]);
> +//			fprint(2, "\n");
>  		}
> -		logit("execing", dp->name, av);
> +		logit("execing", dp->name, j->av);
>  		close(0);
> -		dup(dfd, 0);
> -		close(dfd);
> +		dup(j->dfd, 0);
> +		close(j->dfd);
>  		close(2);
>  		efd = open(file(dp->name, 'E'), OWRITE);
>  		if(efd < 0){
> -			if(debug) syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser());
> +			if(debug)
> +				syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser());
>  			efd = create(file(dp->name, 'E'), OWRITE, 0666);
>  			if(efd < 0){
>  				if(debug) syslog(0, "runq", "create %s as %s: %r", file(dp->name, 'E'), getuser());
> @@ -494,50 +541,62 @@
>  			}
>  		}
>  		seek(efd, 0, 2);
> -		exec(cmd, av);
> +		exec(cmd, j->av);
>  		error("can't exec %s", cmd);
>  		break;
>  	default:
> -		for(;;){
> -			wm = wait();
> -			if(wm == nil)
> -				error("wait failed: %r", "");
> -			if(wm->pid == pid)
> -				break;
> -			free(wm);
> -		}
> -		if(debug)
> -			fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg);
> -
> -		if(wm->msg[0]){
> -			if(debug)
> -				fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg);
> -			if(!Rflag && strstr(wm->msg, "Retry")==0){
> -				/* return the message and remove it */
> -				if(returnmail(av, dp->name, wm->msg) != 0)
> -					logit("returnmail failed", dp->name, av);
> -				remmatch(dp->name);
> -			} else {
> -				/* add sys to bad list and try again later */
> -				nbad++;
> -				badsys = realloc(badsys, nbad*sizeof(char*));
> -				badsys[nbad-1] = strdup(av[3]);
> -			}
> -		} else {
> -			/* it worked remove the message */
> -			remmatch(dp->name);
> -		}
> -		free(wm);
> -
> +		return j;
>  	}
>  done:
> -	if (l)
> -		sysunlock(l);
> -	Bterm(b);
> -	sysunlockfile(Bfildes(b));
> -	free(buf);
> -	free(av);
> -	close(dfd);
> +	freejob(j);
> +	return nil;
> +}
> +
> +Job*
> +donefile(Job *j, Waitmsg *wm)
> +{
> +	Job *n;
> +
> +	if(debug)
> +		fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg);
> +
> +	if(wm->msg[0]){
> +		if(debug)
> +			fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg);
> +		if(!Rflag && strstr(wm->msg, "Retry")==0){
> +			/* return the message and remove it */
> +			if(returnmail(j->av, j->dp->name, wm->msg) != 0)
> +				logit("returnmail failed", j->dp->name, j->av);
> +			remmatch(j->dp->name);
> +		} else {
> +			/* add sys to bad list and try again later */
> +			nbad++;
> +			badsys = realloc(badsys, nbad*sizeof(char*));
> +			badsys[nbad-1] = strdup(j->av[3]);
> +		}
> +	} else {
> +		/* it worked remove the message */
> +		remmatch(j->dp->name);
> +	}
> +	n = j->next;
> +	freejob(j);
> +	return n;
> +}
> +
> +void
> +freejob(Job *j)
> +{
> +	if(j->b != nil){
> +		sysunlockfile(Bfildes(j->b));
> +		Bterm(j->b);
> +	}
> +	if(j->dfd != -1)
> +		close(j->dfd);
> +	if(j->l != nil)
> +		sysunlock(j->l);
> +	free(j->buf);
> +	free(j->av);
> +	free(j);
>  }
>
>
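The core of the patch is the new rundir() loop. It starts jobs until njob children are alive, blocks in wait(), retires whichever job the returned pid belongs to, and then refills the pool. Below is a minimal sketch of that bounded-parallelism pattern in portable POSIX C. It is not the upas code, and it makes a few substitutions. POSIX fork()/waitpid() and an exit status replace Plan 9's fork()/wait() and Waitmsg. Live children are tracked in a fixed pid table instead of the patch's linked list of Job structs. The names runpool, jobfn, demojob and maxlive are hypothetical.

```c
#include <assert.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical job body: runs in the child; returning 0 means success. */
typedef int (*jobfn)(int idx);

/* Hypothetical: highest number of children alive at once in the last runpool(). */
int maxlive;

/* Hypothetical demo job: indices divisible by 3 "fail". */
int
demojob(int idx)
{
	return idx % 3 == 0;
}

/*
 * Hypothetical runpool(): run fn(0) .. fn(n-1), each in its own child,
 * keeping at most njob children alive at once. Returns how many
 * children exited with status 0, or -1 if the pid table can't be allocated.
 */
int
runpool(jobfn fn, int n, int njob)
{
	pid_t *live;	/* pid per slot; 0 marks a free slot */
	pid_t pid;
	int next, nlive, nok, i, status;

	assert(njob > 0);
	live = calloc(njob, sizeof *live);
	if(live == NULL)
		return -1;
	next = nlive = nok = 0;
	maxlive = 0;
	while(nlive > 0 || next < n){
		/* fill the pool while work remains and a slot is free */
		while(next < n && nlive < njob){
			pid = fork();
			if(pid < 0)
				break;	/* out of processes: reap one, then retry */
			if(pid == 0)
				_exit(fn(next) == 0 ? 0 : 1);
			for(i = 0; live[i] != 0; i++)
				;	/* a free slot exists because nlive < njob */
			live[i] = pid;
			if(++nlive > maxlive)
				maxlive = nlive;
			next++;
		}
		if(nlive == 0)
			break;	/* fork failed with nothing left to reap */
		/* block until any child exits, then free its slot */
		pid = waitpid(-1, &status, 0);
		if(pid < 0)
			break;
		for(i = 0; i < njob; i++){
			if(live[i] == pid){
				live[i] = 0;
				nlive--;
				if(WIFEXITED(status) && WEXITSTATUS(status) == 0)
					nok++;
				break;
			}
		}
		/* a pid that isn't in the table is ignored; wait again */
	}
	free(live);
	return nok;
}
```

For example, runpool(demojob, 10, 3) runs ten jobs with at most three at a time, and six of them succeed. In runq the job body would instead be the exec of the mailer, with success signalled by an empty Waitmsg msg. A fixed table keeps the lookup trivial when njob is small. The patch's list instead lets each live Job carry its own control-file lock, argv buffer and data fd until donefile() runs.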
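Inside its wait loop, the patch unlinks a finished job with a pointer-to-pointer walk, `for(p = &hd; *p != nil; p = &(*p)->next)`. At each step p addresses the link that refers to the current job: either hd itself or some job's next field. Because donefile() frees the job and returns its successor, `*p = donefile(*p, w)` splices the job out with no special case for the head. Below is a self-contained sketch of just that idiom. Node is a hypothetical stand-in for Job, and the hypothetical helpers retire(), push() and unlinkpid() stand in for donefile(), the list insertion in rundir(), and the search loop.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-in for the patch's Job: just the link and the pid. */
typedef struct Node Node;
struct Node {
	Node *next;
	int pid;
};

/* Hypothetical helper, like donefile(): release n and hand back its successor. */
static Node*
retire(Node *n)
{
	Node *next = n->next;

	free(n);
	return next;
}

/* Hypothetical helper: prepend a node, as rundir() does with each started job. */
Node*
push(Node *hd, int pid)
{
	Node *n = malloc(sizeof *n);

	assert(n != NULL);
	n->pid = pid;
	n->next = hd;
	return n;
}

/*
 * Hypothetical unlinkpid(): remove the node with the given pid.
 * p points at whichever link (the head pointer or a next field)
 * refers to the node being inspected, so the head needs no special
 * case. Returns 1 if a node was removed, 0 if the pid isn't listed.
 */
int
unlinkpid(Node **hd, int pid)
{
	Node **p;

	for(p = hd; *p != NULL; p = &(*p)->next){
		if((*p)->pid == pid){
			*p = retire(*p);
			return 1;
		}
	}
	return 0;
}
```

A return of 0 here corresponds to the patch's `if(!found) goto rescan;`. The pid belonged to some other child, so the caller simply waits again.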