From: ori@eigenstate.org
To: 9front@9front.org
Subject: [9front] upas/runq: parallel sending
Date: Sat, 09 Jan 2021 18:27:06 -0800 [thread overview]
Message-ID: <60563470E4A517681CAFF1552E11CD8E@eigenstate.org> (raw)
As you may have noticed, you've been getting a
bunch of test messages from the 9front list. Sorry.
We've been shaking out some performance issues on a
new host.
The mailing list was spawning a runq process for every
subscriber, which were contending heavily on the queue
directory, causing high system load. On the new host,
disks were slow enough that this was causing issues.
This was fixed by paring down to a single runq process,
but that means that sending out emails for our hundreds
of subscribers can take a while.
This change can probably use some work, but it adds
a '-p njob' flag, which allows us to send the jobs in
a message queue with limited parallelism. That should
bring down the amount of time it takes for a message
to work its way through our mailing lists.
I'd also like to float the idea of removing the '-a'
flag, and reclaiming the '-n' flag for this purpose.
If anyone is running upas/runq with -a, I propose
this alternative:
fn runq-a {
dirs=(/mail/queue/*)
for(d in $dirs) upas/runq ... $d
}
and, to process the directories in parallel as the current -n does:
fn runq-a {
dirs=(/mail/queue/*)
for(d in $dirs) upas/runq ... $d &
for(d in $dirs) wait
}
diff -r 808eb735fc45 sys/src/cmd/upas/q/runq.c
--- a/sys/src/cmd/upas/q/runq.c Sat Jan 09 12:20:49 2021 -0800
+++ b/sys/src/cmd/upas/q/runq.c Sat Jan 09 18:15:41 2021 -0800
@@ -1,9 +1,25 @@
#include "common.h"
#include <ctype.h>
+typedef struct Job Job;
+
+struct Job {
+ Job *next;
+ int pid;
+ int ac;
+ int dfd;
+ char **av;
+ char *buf; /* backing for av */
+ Dir *dp; /* not owned */
+ Mlock *l;
+ Biobuf *b;
+};
+
void doalldirs(void);
void dodir(char*);
-void dofile(Dir*);
+Job* dofile(Dir*);
+Job* donefile(Job*, Waitmsg*);
+void freejob(Job*);
void rundir(char*);
char* file(char*, char);
void warning(char*, void*);
@@ -32,6 +48,7 @@
char **badsys; /* array of recalcitrant systems */
int nbad;
int npid = 50;
+int njob = 1; /* number of concurrent jobs to invoke */
int sflag; /* single thread per directory */
int aflag; /* all directories */
int Eflag; /* ignore E.xxxxxx dates */
@@ -40,7 +57,7 @@
void
usage(void)
{
- fprint(2, "usage: runq [-adsE] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] q-root cmd\n");
+ fprint(2, "usage: runq [-adsE] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] [-p njobs] q-root cmd\n");
exits("");
}
@@ -86,6 +103,11 @@
if(qdir == 0)
usage();
break;
+ case 'p':
+ njob = atoi(ARGF());
+ if(njob == 0)
+ usage();
+ break;
case 'n':
npid = atoi(ARGF());
if(npid == 0)
@@ -234,8 +256,9 @@
void
rundir(char *name)
{
- int fd;
- long i;
+ Job *hd, *j, **p;
+ int nlive, fidx, fd, found;
+ Waitmsg *w;
if(aflag && sflag)
fd = sysopenlocked(".", OREAD);
@@ -245,15 +268,47 @@
warning("reading %s", name);
return;
}
+ fidx= 0;
+ hd = nil;
+ nlive = 0;
nfiles = dirreadall(fd, &dirbuf);
- if(nfiles > 0){
- for(i=0; i<nfiles; i++){
- if(dirbuf[i].name[0]!='C' || dirbuf[i].name[1]!='.')
+ while(nlive > 0 || fidx< nfiles){
+ while(fidx< nfiles && nlive < njob){
+ if(strncmp(dirbuf[fidx].name, "C.", 2) != 0){
+ fidx++;
continue;
- dofile(&dirbuf[i]);
+ }
+ if((j = dofile(&dirbuf[fidx])) != nil){
+ nlive++;
+ j->next = hd;
+ hd = j;
+ }
+ fidx++;
}
- free(dirbuf);
+ if(nlive == 0){
+ fprint(2, "nothing live\n");
+ break;
+ }
+rescan:
+ if((w = wait()) == nil){
+ syslog(0, "runq", "wait error: %r");
+ break;
+ }
+ found = 0;
+ for(p = &hd; *p != nil; p = &(*p)->next){
+ if(w->pid == (*p)->pid){
+ *p = donefile(*p, w);
+ found++;
+ nlive--;
+ break;
+ }
+ }
+ free(w);
+ if(!found)
+ goto rescan;
}
+ assert(hd == nil);
+ free(dirbuf);
if(aflag && sflag)
sysunlockfile(fd);
else
@@ -316,15 +371,14 @@
/*
* try a message
*/
-void
+Job*
dofile(Dir *dp)
{
+ int dtime, efd, i, etime;
+ Job *j;
+// Mlock *l;
Dir *d;
- int dfd, ac, dtime, efd, pid, i, etime;
- char *buf, *cp, **av;
- Waitmsg *wm;
- Biobuf *b;
- Mlock *l = nil;
+ char *cp;
if(debug)
fprint(2, "dofile %s\n", dp->name);
@@ -337,14 +391,14 @@
if(d == nil){
syslog(0, runqlog, "no data file for %s", dp->name);
remmatch(dp->name);
- return;
+ return nil;
}
if(dp->length == 0){
if(time(0)-dp->mtime > 15*60){
syslog(0, runqlog, "empty ctl file for %s", dp->name);
remmatch(dp->name);
}
- return;
+ return nil;
}
dtime = d->mtime;
free(d);
@@ -358,31 +412,35 @@
if(etime - dtime < 60*60){
/* up to the first hour, try every 15 minutes */
if(time(0) - etime < 15*60)
- return;
+ return nil;
} else {
/* after the first hour, try once an hour */
if(time(0) - etime < 60*60)
- return;
+ return nil;
}
-
}
/*
* open control and data
*/
- b = sysopen(file(dp->name, 'C'), "rl", 0660);
- if(b == 0) {
+ j = malloc(sizeof(Job));
+ if(j == nil)
+ return nil;
+ memset(j, 0, sizeof(Job));
+ j->dp = dp;
+ j->dfd = -1;
+ j->b = sysopen(file(dp->name, 'C'), "rl", 0660);
+ if(j->b == 0) {
if(debug)
fprint(2, "can't open %s: %r\n", file(dp->name, 'C'));
- return;
+ return nil;
}
- dfd = open(file(dp->name, 'D'), OREAD);
- if(dfd < 0){
+ j->dfd = open(file(dp->name, 'D'), OREAD);
+ if(j->dfd < 0){
if(debug)
fprint(2, "can't open %s: %r\n", file(dp->name, 'D'));
- Bterm(b);
- sysunlockfile(Bfildes(b));
- return;
+ freejob(j);
+ return nil;
}
/*
@@ -390,48 +448,36 @@
* - read args into (malloc'd) buffer
* - malloc a vector and copy pointers to args into it
*/
- buf = malloc(dp->length+1);
- if(buf == 0){
+
+ j->buf = malloc(dp->length+1);
+ if(j->buf == nil){
warning("buffer allocation", 0);
- Bterm(b);
- sysunlockfile(Bfildes(b));
- close(dfd);
- return;
+ freejob(j);
+ return nil;
}
- if(Bread(b, buf, dp->length) != dp->length){
+ if(Bread(j->b, j->buf, dp->length) != dp->length){
warning("reading control file %s\n", dp->name);
- Bterm(b);
- sysunlockfile(Bfildes(b));
- close(dfd);
- free(buf);
- return;
+ freejob(j);
+ return nil;
}
- buf[dp->length] = 0;
- av = malloc(2*sizeof(char*));
- if(av == 0){
+ j->buf[dp->length] = 0;
+ j->av = malloc(2*sizeof(char*));
+ if(j->av == 0){
warning("argv allocation", 0);
- close(dfd);
- free(buf);
- Bterm(b);
- sysunlockfile(Bfildes(b));
- return;
+ freejob(j);
+ return nil;
}
- for(ac = 1, cp = buf; *cp; ac++){
+ for(j->ac = 1, cp = j->buf; *cp; j->ac++){
while(isspace(*cp))
*cp++ = 0;
if(*cp == 0)
break;
- av = realloc(av, (ac+2)*sizeof(char*));
- if(av == 0){
+ j->av = realloc(j->av, (j->ac+2)*sizeof(char*));
+ if(j->av == 0){
warning("argv allocation", 0);
- close(dfd);
- free(buf);
- Bterm(b);
- sysunlockfile(Bfildes(b));
- return;
}
- av[ac] = cp;
+ j->av[j->ac] = cp;
while(*cp && !isspace(*cp)){
if(*cp++ == '"'){
while(*cp && *cp != '"')
@@ -441,18 +487,18 @@
}
}
}
- av[0] = cmd;
- av[ac] = 0;
+ j->av[0] = cmd;
+ j->av[j->ac] = 0;
if(!Eflag &&time(0) - dtime > giveup){
- if(returnmail(av, dp->name, "Giveup") != 0)
- logit("returnmail failed", dp->name, av);
+ if(returnmail(j->av, dp->name, "Giveup") != 0)
+ logit("returnmail failed", dp->name, j->av);
remmatch(dp->name);
goto done;
}
for(i = 0; i < nbad; i++){
- if(strcmp(av[3], badsys[i]) == 0)
+ if(j->ac > 3 && strcmp(j->av[3], badsys[i]) == 0)
goto done;
}
@@ -460,33 +506,34 @@
* Ken's fs, for example, gives us 5 minutes of inactivity before
* the lock goes stale, so we have to keep reading it.
*/
- l = keeplockalive(file(dp->name, 'C'), Bfildes(b));
+ j->l = keeplockalive(file(dp->name, 'C'), Bfildes(j->b));
/*
* transfer
*/
- pid = fork();
- switch(pid){
+ j->pid = fork();
+ switch(j->pid){
case -1:
- sysunlock(l);
- sysunlockfile(Bfildes(b));
+ sysunlock(j->l);
+ sysunlockfile(Bfildes(j->b));
syslog(0, runqlog, "out of procs");
exits(0);
case 0:
if(debug) {
- fprint(2, "Starting %s", cmd);
- for(ac = 0; av[ac]; ac++)
- fprint(2, " %s", av[ac]);
- fprint(2, "\n");
+ fprint(2, "Starting %s\n", cmd);
+// for(i = 0; j->av[i]; i++)
+// fprint(2, " %s", j->av[i]);
+// fprint(2, "\n");
}
- logit("execing", dp->name, av);
+ logit("execing", dp->name, j->av);
close(0);
- dup(dfd, 0);
- close(dfd);
+ dup(j->dfd, 0);
+ close(j->dfd);
close(2);
efd = open(file(dp->name, 'E'), OWRITE);
if(efd < 0){
- if(debug) syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser());
+ if(debug)
+ syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser());
efd = create(file(dp->name, 'E'), OWRITE, 0666);
if(efd < 0){
if(debug) syslog(0, "runq", "create %s as %s: %r", file(dp->name, 'E'), getuser());
@@ -494,50 +541,62 @@
}
}
seek(efd, 0, 2);
- exec(cmd, av);
+ exec(cmd, j->av);
error("can't exec %s", cmd);
break;
default:
- for(;;){
- wm = wait();
- if(wm == nil)
- error("wait failed: %r", "");
- if(wm->pid == pid)
- break;
- free(wm);
- }
- if(debug)
- fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg);
-
- if(wm->msg[0]){
- if(debug)
- fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg);
- if(!Rflag && strstr(wm->msg, "Retry")==0){
- /* return the message and remove it */
- if(returnmail(av, dp->name, wm->msg) != 0)
- logit("returnmail failed", dp->name, av);
- remmatch(dp->name);
- } else {
- /* add sys to bad list and try again later */
- nbad++;
- badsys = realloc(badsys, nbad*sizeof(char*));
- badsys[nbad-1] = strdup(av[3]);
- }
- } else {
- /* it worked remove the message */
- remmatch(dp->name);
- }
- free(wm);
-
+ return j;
}
done:
- if (l)
- sysunlock(l);
- Bterm(b);
- sysunlockfile(Bfildes(b));
- free(buf);
- free(av);
- close(dfd);
+ freejob(j);
+ return nil;
+}
+
+Job*
+donefile(Job *j, Waitmsg *wm)
+{
+ Job *n;
+
+ if(debug)
+ fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg);
+
+ if(wm->msg[0]){
+ if(debug)
+ fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg);
+ if(!Rflag && strstr(wm->msg, "Retry")==0){
+ /* return the message and remove it */
+ if(returnmail(j->av, j->dp->name, wm->msg) != 0)
+ logit("returnmail failed", j->dp->name, j->av);
+ remmatch(j->dp->name);
+ } else {
+ /* add sys to bad list and try again later */
+ nbad++;
+ badsys = realloc(badsys, nbad*sizeof(char*));
+ badsys[nbad-1] = strdup(j->av[3]);
+ }
+ } else {
+ /* it worked remove the message */
+ remmatch(j->dp->name);
+ }
+ n = j->next;
+ freejob(j);
+ return n;
+}
+
+void
+freejob(Job *j)
+{
+ if(j->b != nil){
+ sysunlockfile(Bfildes(j->b));
+ Bterm(j->b);
+ }
+ if(j->dfd != -1)
+ close(j->dfd);
+ if(j->l != nil)
+ sysunlock(j->l);
+ free(j->buf);
+ free(j->av);
+ free(j);
}
next reply other threads:[~2021-01-10 2:57 UTC|newest]
Thread overview: 4+ messages / expand[flat|nested] mbox.gz Atom feed top
2021-01-10 2:27 ori [this message]
2021-01-10 8:04 ` hiro
2021-01-17 0:42 ` ori
2021-01-17 1:06 ` ori
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=60563470E4A517681CAFF1552E11CD8E@eigenstate.org \
--to=ori@eigenstate.org \
--cc=9front@9front.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).