From: hiro <23hiro@gmail.com>
To: 9front@9front.org
Subject: Re: [9front] upas/runq: parallel sending
Date: Sun, 10 Jan 2021 09:04:54 +0100
Message-ID: <CAFSF3XPJmKOjnfWnSXbWLw+WqVwKoZdbFtbOOCEVqaq+XcpF3Q@mail.gmail.com>
In-Reply-To: <60563470E4A517681CAFF1552E11CD8E@eigenstate.org>
well, it took less than 600 seconds for all 3 mails from the mailing
list to reach me.
but meanwhile it seems i've been missing 2 mails for hours, if not days...
that no longer feels like just a delay; it looks like an error.
On 1/10/21, ori@eigenstate.org <ori@eigenstate.org> wrote:
> As you may have noticed, you've been getting a
> bunch of test messages from the 9front list. Sorry.
> We've been shaking out some performance issues on a
> new host.
>
> The mailing list was spawning a runq process for every
> subscriber, and those processes contended heavily on the
> queue directory, causing high system load. On the new host,
> the disks were slow enough that this became a real problem.
>
> This was fixed by paring down to a single runq process,
> but that means that sending out emails for our hundreds
> of subscribers can take a while.
>
> This change can probably use some work, but it adds
> a '-p njob' flag, which allows us to send the jobs in
> a message queue with limited parallelism. That should
> bring down the amount of time it takes for a message
> to work its way through our mailing lists.
>
> I'd also like to float the idea of removing the '-a'
> flag, and reclaiming the '-n' flag for this purpose.
> If anyone is running upas/runq with -a, I propose
> this alternative:
>
> fn runq-a {
> dirs=(/mail/queue/*)
> for(d in $dirs) upas/runq ... $d
> }
>
> and if you were also using -n to process directories in parallel:
>
> fn runq-a {
> dirs=(/mail/queue/*)
> for(d in $dirs) upas/runq ... $d &
> for(d in $dirs) wait
> }
>
>
> diff -r 808eb735fc45 sys/src/cmd/upas/q/runq.c
> --- a/sys/src/cmd/upas/q/runq.c Sat Jan 09 12:20:49 2021 -0800
> +++ b/sys/src/cmd/upas/q/runq.c Sat Jan 09 18:15:41 2021 -0800
> @@ -1,9 +1,25 @@
> #include "common.h"
> #include <ctype.h>
>
> +typedef struct Job Job;
> +
> +struct Job {
> + Job *next;
> + int pid;
> + int ac;
> + int dfd;
> + char **av;
> + char *buf; /* backing for av */
> + Dir *dp; /* not owned */
> + Mlock *l;
> + Biobuf *b;
> +};
> +
> void doalldirs(void);
> void dodir(char*);
> -void dofile(Dir*);
> +Job* dofile(Dir*);
> +Job* donefile(Job*, Waitmsg*);
> +void freejob(Job*);
> void rundir(char*);
> char* file(char*, char);
> void warning(char*, void*);
> @@ -32,6 +48,7 @@
> char **badsys; /* array of recalcitrant systems */
> int nbad;
> int npid = 50;
> +int njob = 1; /* number of concurrent jobs to invoke */
> int sflag; /* single thread per directory */
> int aflag; /* all directories */
> int Eflag; /* ignore E.xxxxxx dates */
> @@ -40,7 +57,7 @@
> void
> usage(void)
> {
> - fprint(2, "usage: runq [-adsE] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] q-root cmd\n");
> + fprint(2, "usage: runq [-adsE] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] [-p njobs] q-root cmd\n");
> exits("");
> }
>
> @@ -86,6 +103,11 @@
> if(qdir == 0)
> usage();
> break;
> + case 'p':
> + njob = atoi(ARGF());
> + if(njob == 0)
> + usage();
> + break;
> case 'n':
> npid = atoi(ARGF());
> if(npid == 0)
> @@ -234,8 +256,9 @@
> void
> rundir(char *name)
> {
> - int fd;
> - long i;
> + Job *hd, *j, **p;
> + int nlive, fidx, fd, found;
> + Waitmsg *w;
>
> if(aflag && sflag)
> fd = sysopenlocked(".", OREAD);
> @@ -245,15 +268,47 @@
> warning("reading %s", name);
> return;
> }
> + fidx = 0;
> + hd = nil;
> + nlive = 0;
> nfiles = dirreadall(fd, &dirbuf);
> - if(nfiles > 0){
> - for(i=0; i<nfiles; i++){
> - if(dirbuf[i].name[0]!='C' || dirbuf[i].name[1]!='.')
> + while(nlive > 0 || fidx < nfiles){
> + while(fidx < nfiles && nlive < njob){
> + if(strncmp(dirbuf[fidx].name, "C.", 2) != 0){
> + fidx++;
> continue;
> - dofile(&dirbuf[i]);
> + }
> + if((j = dofile(&dirbuf[fidx])) != nil){
> + nlive++;
> + j->next = hd;
> + hd = j;
> + }
> + fidx++;
> }
> - free(dirbuf);
> + if(nlive == 0){
> + fprint(2, "nothing live\n");
> + break;
> + }
> +rescan:
> + if((w = wait()) == nil){
> + syslog(0, "runq", "wait error: %r");
> + break;
> + }
> + found = 0;
> + for(p = &hd; *p != nil; p = &(*p)->next){
> + if(w->pid == (*p)->pid){
> + *p = donefile(*p, w);
> + found++;
> + nlive--;
> + break;
> + }
> + }
> + free(w);
> + if(!found)
> + goto rescan;
> }
> + assert(hd == nil);
> + free(dirbuf);
> if(aflag && sflag)
> sysunlockfile(fd);
> else
> @@ -316,15 +371,14 @@
> /*
> * try a message
> */
> -void
> +Job*
> dofile(Dir *dp)
> {
> + int dtime, efd, i, etime;
> + Job *j;
> +// Mlock *l;
> Dir *d;
> - int dfd, ac, dtime, efd, pid, i, etime;
> - char *buf, *cp, **av;
> - Waitmsg *wm;
> - Biobuf *b;
> - Mlock *l = nil;
> + char *cp;
>
> if(debug)
> fprint(2, "dofile %s\n", dp->name);
> @@ -337,14 +391,14 @@
> if(d == nil){
> syslog(0, runqlog, "no data file for %s", dp->name);
> remmatch(dp->name);
> - return;
> + return nil;
> }
> if(dp->length == 0){
> if(time(0)-dp->mtime > 15*60){
> syslog(0, runqlog, "empty ctl file for %s", dp->name);
> remmatch(dp->name);
> }
> - return;
> + return nil;
> }
> dtime = d->mtime;
> free(d);
> @@ -358,31 +412,35 @@
> if(etime - dtime < 60*60){
> /* up to the first hour, try every 15 minutes */
> if(time(0) - etime < 15*60)
> - return;
> + return nil;
> } else {
> /* after the first hour, try once an hour */
> if(time(0) - etime < 60*60)
> - return;
> + return nil;
> }
> -
> }
>
> /*
> * open control and data
> */
> - b = sysopen(file(dp->name, 'C'), "rl", 0660);
> - if(b == 0) {
> + j = malloc(sizeof(Job));
> + if(j == nil)
> + return nil;
> + memset(j, 0, sizeof(Job));
> + j->dp = dp;
> + j->dfd = -1;
> + j->b = sysopen(file(dp->name, 'C'), "rl", 0660);
> + if(j->b == 0) {
> if(debug)
> fprint(2, "can't open %s: %r\n", file(dp->name, 'C'));
> - return;
> + freejob(j);
> + return nil;
> }
> - dfd = open(file(dp->name, 'D'), OREAD);
> - if(dfd < 0){
> + j->dfd = open(file(dp->name, 'D'), OREAD);
> + if(j->dfd < 0){
> if(debug)
> fprint(2, "can't open %s: %r\n", file(dp->name, 'D'));
> - Bterm(b);
> - sysunlockfile(Bfildes(b));
> - return;
> + freejob(j);
> + return nil;
> }
>
> /*
> @@ -390,48 +448,36 @@
> * - read args into (malloc'd) buffer
> * - malloc a vector and copy pointers to args into it
> */
> - buf = malloc(dp->length+1);
> - if(buf == 0){
> +
> + j->buf = malloc(dp->length+1);
> + if(j->buf == nil){
> warning("buffer allocation", 0);
> - Bterm(b);
> - sysunlockfile(Bfildes(b));
> - close(dfd);
> - return;
> + freejob(j);
> + return nil;
> }
> - if(Bread(b, buf, dp->length) != dp->length){
> + if(Bread(j->b, j->buf, dp->length) != dp->length){
> warning("reading control file %s\n", dp->name);
> - Bterm(b);
> - sysunlockfile(Bfildes(b));
> - close(dfd);
> - free(buf);
> - return;
> + freejob(j);
> + return nil;
> }
> - buf[dp->length] = 0;
> - av = malloc(2*sizeof(char*));
> - if(av == 0){
> + j->buf[dp->length] = 0;
> + j->av = malloc(2*sizeof(char*));
> + if(j->av == 0){
> warning("argv allocation", 0);
> - close(dfd);
> - free(buf);
> - Bterm(b);
> - sysunlockfile(Bfildes(b));
> - return;
> + freejob(j);
> + return nil;
> }
> - for(ac = 1, cp = buf; *cp; ac++){
> + for(j->ac = 1, cp = j->buf; *cp; j->ac++){
> while(isspace(*cp))
> *cp++ = 0;
> if(*cp == 0)
> break;
>
> - av = realloc(av, (ac+2)*sizeof(char*));
> - if(av == 0){
> + j->av = realloc(j->av, (j->ac+2)*sizeof(char*));
> + if(j->av == 0){
> warning("argv allocation", 0);
> - close(dfd);
> - free(buf);
> - Bterm(b);
> - sysunlockfile(Bfildes(b));
> - return;
> + freejob(j);
> + return nil;
> }
> - av[ac] = cp;
> + j->av[j->ac] = cp;
> while(*cp && !isspace(*cp)){
> if(*cp++ == '"'){
> while(*cp && *cp != '"')
> @@ -441,18 +487,18 @@
> }
> }
> }
> - av[0] = cmd;
> - av[ac] = 0;
> + j->av[0] = cmd;
> + j->av[j->ac] = 0;
>
> if(!Eflag &&time(0) - dtime > giveup){
> - if(returnmail(av, dp->name, "Giveup") != 0)
> - logit("returnmail failed", dp->name, av);
> + if(returnmail(j->av, dp->name, "Giveup") != 0)
> + logit("returnmail failed", dp->name, j->av);
> remmatch(dp->name);
> goto done;
> }
>
> for(i = 0; i < nbad; i++){
> - if(strcmp(av[3], badsys[i]) == 0)
> + if(j->ac > 3 && strcmp(j->av[3], badsys[i]) == 0)
> goto done;
> }
>
> @@ -460,33 +506,34 @@
> * Ken's fs, for example, gives us 5 minutes of inactivity before
> * the lock goes stale, so we have to keep reading it.
> */
> - l = keeplockalive(file(dp->name, 'C'), Bfildes(b));
> + j->l = keeplockalive(file(dp->name, 'C'), Bfildes(j->b));
>
> /*
> * transfer
> */
> - pid = fork();
> - switch(pid){
> + j->pid = fork();
> + switch(j->pid){
> case -1:
> - sysunlock(l);
> - sysunlockfile(Bfildes(b));
> + sysunlock(j->l);
> + sysunlockfile(Bfildes(j->b));
> syslog(0, runqlog, "out of procs");
> exits(0);
> case 0:
> if(debug) {
> - fprint(2, "Starting %s", cmd);
> - for(ac = 0; av[ac]; ac++)
> - fprint(2, " %s", av[ac]);
> - fprint(2, "\n");
> + fprint(2, "Starting %s\n", cmd);
> +// for(i = 0; j->av[i]; i++)
> +// fprint(2, " %s", j->av[i]);
> +// fprint(2, "\n");
> }
> - logit("execing", dp->name, av);
> + logit("execing", dp->name, j->av);
> close(0);
> - dup(dfd, 0);
> - close(dfd);
> + dup(j->dfd, 0);
> + close(j->dfd);
> close(2);
> efd = open(file(dp->name, 'E'), OWRITE);
> if(efd < 0){
> - if(debug) syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser());
> + if(debug)
> + syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser());
> efd = create(file(dp->name, 'E'), OWRITE, 0666);
> if(efd < 0){
> if(debug) syslog(0, "runq", "create %s as %s: %r", file(dp->name, 'E'), getuser());
> @@ -494,50 +541,62 @@
> }
> }
> seek(efd, 0, 2);
> - exec(cmd, av);
> + exec(cmd, j->av);
> error("can't exec %s", cmd);
> break;
> default:
> - for(;;){
> - wm = wait();
> - if(wm == nil)
> - error("wait failed: %r", "");
> - if(wm->pid == pid)
> - break;
> - free(wm);
> - }
> - if(debug)
> - fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg);
> -
> - if(wm->msg[0]){
> - if(debug)
> - fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg);
> - if(!Rflag && strstr(wm->msg, "Retry")==0){
> - /* return the message and remove it */
> - if(returnmail(av, dp->name, wm->msg) != 0)
> - logit("returnmail failed", dp->name, av);
> - remmatch(dp->name);
> - } else {
> - /* add sys to bad list and try again later */
> - nbad++;
> - badsys = realloc(badsys, nbad*sizeof(char*));
> - badsys[nbad-1] = strdup(av[3]);
> - }
> - } else {
> - /* it worked remove the message */
> - remmatch(dp->name);
> - }
> - free(wm);
> -
> + return j;
> }
> done:
> - if (l)
> - sysunlock(l);
> - Bterm(b);
> - sysunlockfile(Bfildes(b));
> - free(buf);
> - free(av);
> - close(dfd);
> + freejob(j);
> + return nil;
> +}
> +
> +Job*
> +donefile(Job *j, Waitmsg *wm)
> +{
> + Job *n;
> +
> + if(debug)
> + fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg);
> +
> + if(wm->msg[0]){
> + if(debug)
> + fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg);
> + if(!Rflag && strstr(wm->msg, "Retry")==0){
> + /* return the message and remove it */
> + if(returnmail(j->av, j->dp->name, wm->msg) != 0)
> + logit("returnmail failed", j->dp->name, j->av);
> + remmatch(j->dp->name);
> + } else {
> + /* add sys to bad list and try again later */
> + nbad++;
> + badsys = realloc(badsys, nbad*sizeof(char*));
> + badsys[nbad-1] = strdup(j->av[3]);
> + }
> + } else {
> + /* it worked remove the message */
> + remmatch(j->dp->name);
> + }
> + n = j->next;
> + freejob(j);
> + return n;
> +}
> +
> +void
> +freejob(Job *j)
> +{
> + if(j->b != nil){
> + sysunlockfile(Bfildes(j->b));
> + Bterm(j->b);
> + }
> + if(j->dfd != -1)
> + close(j->dfd);
> + if(j->l != nil)
> + sysunlock(j->l);
> + free(j->buf);
> + free(j->av);
> + free(j);
> }
>
>
>
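For anyone who wants to try the scheduling idea outside Plan 9, here is a minimal sketch of the loop the patch adds to rundir(), written in portable POSIX C rather than Plan 9 C. It is not the patch itself: run_bounded() and fake_delivery() are hypothetical names, and a callback's return value stands in for the mailer's exit status (on Plan 9, wait() instead returns a Waitmsg whose msg string plays that role). Like the patch, it keeps at most njob children alive, reaps whichever one finishes first, unlinks the finished job from a linked list through a pointer-to-pointer, and simply waits again when wait() returns a pid that is not on the list.

```c
#include <assert.h>
#include <errno.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* One in-flight child, kept on a singly linked list like the patch's Job. */
struct job {
	struct job *next;
	pid_t pid;
};

/* Hypothetical stand-in for delivering item i: every third item "succeeds". */
int
fake_delivery(int i)
{
	return i % 3 == 0 ? 0 : 1;
}

/*
 * Run work(0) .. work(n-1), each in its own child process, with at most
 * njob children alive at once.  A child's exit status is work()'s return
 * value and 0 counts as success.  Returns the number of successes, or -1
 * if a fork or wait failed.  *peak gets the most children alive at once.
 */
int
run_bounded(int n, int njob, int (*work)(int), int *peak)
{
	struct job *hd = NULL, *j, **p;
	int next = 0, nlive = 0, ok = 0, failed = 0, status;
	pid_t pid;

	assert(njob > 0);	/* like the patch's check that -p is nonzero */
	*peak = 0;
	while(nlive > 0 || next < n){
		/* Top up the pool until njob children are live or no work is left. */
		while(next < n && nlive < njob){
			j = malloc(sizeof *j);
			if(j == NULL || (j->pid = fork()) == -1){
				free(j);
				failed = 1;
				next = n;	/* stop launching, but still reap live children */
				break;
			}
			if(j->pid == 0)
				_exit(work(next) & 0xff);
			next++;
			j->next = hd;
			hd = j;
			if(++nlive > *peak)
				*peak = nlive;
		}
		if(nlive == 0)
			break;
		/* Reap whichever child finishes first, retrying if a signal interrupts. */
		do
			pid = wait(&status);
		while(pid == -1 && errno == EINTR);
		if(pid == -1){
			failed = 1;
			break;
		}
		/* Unlink its job; a pid not on the list is not ours, so loop and wait again. */
		for(p = &hd; *p != NULL; p = &(*p)->next){
			if((*p)->pid == pid){
				j = *p;
				*p = j->next;
				free(j);
				nlive--;
				if(WIFEXITED(status) && WEXITSTATUS(status) == 0)
					ok++;
				break;
			}
		}
	}
	while(hd != NULL){	/* non-empty only after a wait() failure */
		j = hd;
		hd = j->next;
		free(j);
	}
	return failed ? -1 : ok;
}
```

For example, run_bounded(7, 2, fake_delivery, &peak) never has more than two children alive at once, returns 3 (items 0, 3 and 6 succeed) and sets peak to 2. Starting new work only after a wait() returns is what bounds the parallelism, so the queue directory sees at most njob concurrent senders rather than one per subscriber.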
Thread overview: 4+ messages
2021-01-10 2:27 ori
2021-01-10 8:04 ` hiro [this message]
2021-01-17 0:42 ` ori
2021-01-17 1:06 ` ori