From: ori@eigenstate.org
To: 9front@9front.org
Subject: Re: [9front] upas/runq: parallel sending
Date: Sat, 16 Jan 2021 16:42:42 -0800 [thread overview]
Message-ID: <59390BC4E9DA865564F648841C225EB9@eigenstate.org> (raw)
In-Reply-To: <60563470E4A517681CAFF1552E11CD8E@eigenstate.org>
I've been running the previous patch for the
last week without problems: has anyone else
had a chance to test that it doesn't break
their systems?
Quoth ori@eigenstate.org:
> I'd also like to float the idea of removing the '-a'
> flag, and reclaiming the '-n' flag for this purpose.
> If anyone is running upas/runq with -a, I propose
> this alternative:
>
Here's a more aggressive version that removes
the multi-user queue sweeping code. I doubt
that anyone is using it. If they are, I'd be
interested in knowing if there's an issue with
migrating to xargs or a shell loop that runs
each job in parallel by directory.
If there are no objections, I'm going to let
this version bake for a week on my own system,
and then commit next weekend.
diff -r 24a38dd884b0 sys/src/cmd/upas/q/runq.c
--- a/sys/src/cmd/upas/q/runq.c Sat Jan 16 16:17:27 2021 -0800
+++ b/sys/src/cmd/upas/q/runq.c Sat Jan 16 16:40:41 2021 -0800
@@ -1,9 +1,25 @@
#include "common.h"
#include <ctype.h>
+typedef struct Job Job;
+
+/* one queued message whose delivery was started by dofile */
+struct Job {
+ Job *next; /* next live job in rundir's list */
+ int pid; /* pid returned by fork for the delivery process */
+ int ac; /* entries in av before its nil terminator */
+ int dfd; /* data (D.) file, the child's stdin; -1 when not open */
+ char **av; /* argv for cmd: av[0] is cmd, rest parsed from buf */
+ char *buf; /* backing for av */
+ Dir *dp; /* not owned: points into rundir's dirbuf */
+ Mlock *l; /* keeplockalive handle for the control file */
+ Biobuf *b; /* control (C.) file, opened locked by sysopen */
+};
+
-void doalldirs(void);
void dodir(char*);
-void dofile(Dir*);
+Job* dofile(Dir*); /* start delivery of one C. file; nil if nothing forked */
+Job* donefile(Job*, Waitmsg*); /* record a job's result; returns its successor */
+void freejob(Job*); /* release a job's files and memory */
void rundir(char*);
char* file(char*, char);
void warning(char*, void*);
@@ -17,7 +33,6 @@
char *root;
int debug;
int giveup = 2*24*60*60;
-int load;
int limit;
/* the current directory */
@@ -28,67 +43,48 @@
char *runqlog = "runq";
-int *pidlist;
char **badsys; /* array of recalcitrant systems */
int nbad;
-int npid = 50;
-int sflag; /* single thread per directory */
-int aflag; /* all directories */
+int njob = 1; /* max deliveries running at once in rundir (set by -n) */
int Eflag; /* ignore E.xxxxxx dates */
int Rflag; /* no giving up, ever */
void
usage(void)
{
- fprint(2, "usage: runq [-adsE] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] q-root cmd\n");
-exits("");
+ /* only the flags main still accepts; -n now sets concurrent jobs */
+ fprint(2, "usage: runq [-dER] [-q dir] [-t time] [-r nfiles] [-n njobs] q-root cmd\n");
+ exits("usage");
}
void
main(int argc, char **argv)
{
- char *qdir, *x;
+ char *qdir;
qdir = 0;
ARGBEGIN{
- case 'l':
- x = ARGF();
- if(x == 0)
- usage();
- load = atoi(x);
- if(load < 0)
- load = 0;
- break;
case 'E':
Eflag++;
break;
case 'R': /* no giving up -- just leave stuff in the queue */
Rflag++;
break;
- case 'a':
- aflag++;
- break;
case 'd':
debug++;
break;
case 'r':
- limit = atoi(ARGF());
- break;
- case 's':
- sflag++;
+ limit = atoi(EARGF(usage()));
break;
case 't':
- giveup = 60*60*atoi(ARGF());
+ giveup = 60*60*atoi(EARGF(usage()));
break;
case 'q':
- qdir = ARGF();
- if(qdir == 0)
- usage();
+ qdir = EARGF(usage());
break;
case 'n':
- npid = atoi(ARGF());
- if(npid == 0)
+ njob = atoi(EARGF(usage()));
+ if(njob <= 0) /* rundir needs at least one job slot */
usage();
break;
+ default:
+ /* reject unknown flags, including the removed -a, -s and -l */
+ usage();
}ARGEND;
@@ -96,27 +92,17 @@
if(argc != 2)
usage();
- pidlist = malloc(npid*sizeof(*pidlist));
- if(pidlist == 0)
- error("can't malloc", 0);
-
- if(aflag == 0 && qdir == 0) {
+ if(qdir == nil)
qdir = getuser();
- if(qdir == 0)
- error("unknown user", 0);
- }
+ if(qdir == nil)
+ error("unknown user", 0);
root = argv[0];
cmd = argv[1];
if(chdir(root) < 0)
error("can't cd to %s", root);
- doload(1);
- if(aflag)
- doalldirs();
- else
- dodir(qdir);
- doload(0);
+ dodir(qdir);
exits(0);
}
@@ -142,74 +128,6 @@
return 0;
}
-int
-forkltd(void)
-{
- int i;
- int pid;
-
- for(i = 0; i < npid; i++){
- if(pidlist[i] <= 0)
- break;
- }
-
- while(i >= npid){
- pid = waitpid();
- if(pid < 0){
- syslog(0, runqlog, "forkltd confused");
- exits(0);
- }
-
- for(i = 0; i < npid; i++)
- if(pidlist[i] == pid)
- break;
- }
- pidlist[i] = fork();
- return pidlist[i];
-}
-
-/*
- * run all user directories, must be bootes (or root on unix) to do this
- */
-void
-doalldirs(void)
-{
- Dir *db;
- int fd;
- long i, n;
-
-
- fd = open(".", OREAD);
- if(fd == -1){
- warning("reading %s", root);
- return;
- }
- n = dirreadall(fd, &db);
- if(n > 0){
- for(i=0; i<n; i++){
- if(db[i].qid.type & QTDIR){
- if(emptydir(db[i].name))
- continue;
- switch(forkltd()){
- case -1:
- syslog(0, runqlog, "out of procs");
- doload(0);
- exits(0);
- case 0:
- if(sysdetach() < 0)
- error("%r", 0);
- dodir(db[i].name);
- exits(0);
- default:
- break;
- }
- }
- }
- free(db);
- }
- close(fd);
-}
-
/*
* cd to a user directory and run it
*/
@@ -234,30 +152,57 @@
void
rundir(char *name)
{
- int fd;
- long i;
+ Job *hd, *j, **p;
+ int nlive, fidx, fd;
+ Waitmsg *w;
- if(aflag && sflag)
- fd = sysopenlocked(".", OREAD);
- else
- fd = open(".", OREAD);
+ fd = open(".", OREAD);
if(fd == -1){
warning("reading %s", name);
return;
}
+ fidx = 0;
+ hd = nil; /* live jobs, most recently started first */
+ nlive = 0;
nfiles = dirreadall(fd, &dirbuf);
- if(nfiles > 0){
- for(i=0; i<nfiles; i++){
- if(dirbuf[i].name[0]!='C' || dirbuf[i].name[1]!='.')
+ while(nlive > 0 || fidx < nfiles){
+ /* start jobs until all njob slots are busy or no files remain */
+ while(fidx < nfiles && nlive < njob){
+ if(strncmp(dirbuf[fidx].name, "C.", 2) != 0){
+ fidx++;
continue;
- dofile(&dirbuf[i]);
+ }
+ if((j = dofile(&dirbuf[fidx])) != nil){
+ nlive++;
+ j->next = hd;
+ hd = j;
+ }
+ fidx++;
}
- free(dirbuf);
+ /* the remaining files were all skipped: nothing to wait for */
+ if(nlive == 0)
+ break;
+ if((w = wait()) == nil){
+ syslog(0, runqlog, "wait error: %r");
+ /* our jobs can no longer be reaped; release what we hold */
+ while(hd != nil){
+ j = hd->next;
+ freejob(hd);
+ hd = j;
+ }
+ break;
+ }
+ /*
+ * reap the matching job. a pid that is not one of ours leaves
+ * nlive unchanged, so the next pass starts nothing and simply
+ * waits again.
+ */
+ for(p = &hd; *p != nil; p = &(*p)->next){
+ if(w->pid == (*p)->pid){
+ *p = donefile(*p, w);
+ nlive--;
+ break;
+ }
+ }
+ free(w);
}
- if(aflag && sflag)
- sysunlockfile(fd);
- else
- close(fd);
+ assert(hd == nil);
+ free(dirbuf);
+ close(fd);
}
/*
@@ -316,15 +261,14 @@
/*
* try a message
*/
-void
+Job*
dofile(Dir *dp)
{
+ int dtime, efd, i, etime;
+ Job *j;
+// Mlock *l;
Dir *d;
- int dfd, ac, dtime, efd, pid, i, etime;
- char *buf, *cp, **av;
- Waitmsg *wm;
- Biobuf *b;
- Mlock *l = nil;
+ char *cp;
if(debug)
fprint(2, "dofile %s\n", dp->name);
@@ -337,14 +281,14 @@
if(d == nil){
syslog(0, runqlog, "no data file for %s", dp->name);
remmatch(dp->name);
- return;
+ return nil;
}
if(dp->length == 0){
if(time(0)-dp->mtime > 15*60){
syslog(0, runqlog, "empty ctl file for %s", dp->name);
remmatch(dp->name);
}
- return;
+ return nil;
}
dtime = d->mtime;
free(d);
@@ -358,31 +302,35 @@
if(etime - dtime < 60*60){
/* up to the first hour, try every 15 minutes */
if(time(0) - etime < 15*60)
- return;
+ return nil;
} else {
/* after the first hour, try once an hour */
if(time(0) - etime < 60*60)
- return;
+ return nil;
}
-
}
/*
* open control and data
*/
- b = sysopen(file(dp->name, 'C'), "rl", 0660);
- if(b == 0) {
+ j = malloc(sizeof(Job));
+ if(j == nil)
+ return nil;
+ memset(j, 0, sizeof(Job));
+ j->dp = dp;
+ j->dfd = -1;
+ j->b = sysopen(file(dp->name, 'C'), "rl", 0660);
+ if(j->b == 0) {
if(debug)
fprint(2, "can't open %s: %r\n", file(dp->name, 'C'));
- return;
+ return nil;
}
- dfd = open(file(dp->name, 'D'), OREAD);
- if(dfd < 0){
+ j->dfd = open(file(dp->name, 'D'), OREAD);
+ if(j->dfd < 0){
if(debug)
fprint(2, "can't open %s: %r\n", file(dp->name, 'D'));
- Bterm(b);
- sysunlockfile(Bfildes(b));
- return;
+ freejob(j);
+ return nil;
}
/*
@@ -390,48 +338,36 @@
* - read args into (malloc'd) buffer
* - malloc a vector and copy pointers to args into it
*/
- buf = malloc(dp->length+1);
- if(buf == 0){
+
+ j->buf = malloc(dp->length+1);
+ if(j->buf == nil){
warning("buffer allocation", 0);
- Bterm(b);
- sysunlockfile(Bfildes(b));
- close(dfd);
- return;
+ freejob(j);
+ return nil;
}
- if(Bread(b, buf, dp->length) != dp->length){
+ if(Bread(j->b, j->buf, dp->length) != dp->length){
warning("reading control file %s\n", dp->name);
- Bterm(b);
- sysunlockfile(Bfildes(b));
- close(dfd);
- free(buf);
- return;
+ freejob(j);
+ return nil;
}
- buf[dp->length] = 0;
- av = malloc(2*sizeof(char*));
- if(av == 0){
+ j->buf[dp->length] = 0;
+ j->av = malloc(2*sizeof(char*));
+ if(j->av == 0){
warning("argv allocation", 0);
- close(dfd);
- free(buf);
- Bterm(b);
- sysunlockfile(Bfildes(b));
- return;
+ freejob(j);
+ return nil;
}
- for(ac = 1, cp = buf; *cp; ac++){
+ for(j->ac = 1, cp = j->buf; *cp; j->ac++){
while(isspace(*cp))
*cp++ = 0;
if(*cp == 0)
break;
- av = realloc(av, (ac+2)*sizeof(char*));
- if(av == 0){
+ j->av = realloc(j->av, (j->ac+2)*sizeof(char*));
+ if(j->av == 0){
warning("argv allocation", 0);
- close(dfd);
- free(buf);
- Bterm(b);
- sysunlockfile(Bfildes(b));
- return;
}
- av[ac] = cp;
+ j->av[j->ac] = cp;
while(*cp && !isspace(*cp)){
if(*cp++ == '"'){
while(*cp && *cp != '"')
@@ -441,18 +377,18 @@
}
}
}
- av[0] = cmd;
- av[ac] = 0;
+ j->av[0] = cmd;
+ j->av[j->ac] = 0;
if(!Eflag &&time(0) - dtime > giveup){
- if(returnmail(av, dp->name, "Giveup") != 0)
- logit("returnmail failed", dp->name, av);
+ if(returnmail(j->av, dp->name, "Giveup") != 0)
+ logit("returnmail failed", dp->name, j->av);
remmatch(dp->name);
goto done;
}
for(i = 0; i < nbad; i++){
- if(strcmp(av[3], badsys[i]) == 0)
+ if(j->ac > 3 && strcmp(j->av[3], badsys[i]) == 0)
goto done;
}
@@ -460,33 +396,34 @@
* Ken's fs, for example, gives us 5 minutes of inactivity before
* the lock goes stale, so we have to keep reading it.
*/
- l = keeplockalive(file(dp->name, 'C'), Bfildes(b));
+ j->l = keeplockalive(file(dp->name, 'C'), Bfildes(j->b));
/*
* transfer
*/
- pid = fork();
- switch(pid){
+ j->pid = fork();
+ switch(j->pid){
case -1:
- sysunlock(l);
- sysunlockfile(Bfildes(b));
+ sysunlock(j->l);
+ sysunlockfile(Bfildes(j->b));
syslog(0, runqlog, "out of procs");
exits(0);
case 0:
if(debug) {
- fprint(2, "Starting %s", cmd);
- for(ac = 0; av[ac]; ac++)
- fprint(2, " %s", av[ac]);
- fprint(2, "\n");
+ fprint(2, "Starting %s\n", cmd);
+// for(i = 0; j->av[i]; i++)
+// fprint(2, " %s", j->av[i]);
+// fprint(2, "\n");
}
- logit("execing", dp->name, av);
+ logit("execing", dp->name, j->av);
close(0);
- dup(dfd, 0);
- close(dfd);
+ dup(j->dfd, 0);
+ close(j->dfd);
close(2);
efd = open(file(dp->name, 'E'), OWRITE);
if(efd < 0){
- if(debug) syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser());
+ if(debug)
+ syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser());
efd = create(file(dp->name, 'E'), OWRITE, 0666);
if(efd < 0){
if(debug) syslog(0, "runq", "create %s as %s: %r", file(dp->name, 'E'), getuser());
@@ -494,50 +431,62 @@
}
}
seek(efd, 0, 2);
- exec(cmd, av);
+ exec(cmd, j->av);
error("can't exec %s", cmd);
break;
default:
- for(;;){
- wm = wait();
- if(wm == nil)
- error("wait failed: %r", "");
- if(wm->pid == pid)
- break;
- free(wm);
- }
- if(debug)
- fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg);
-
- if(wm->msg[0]){
- if(debug)
- fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg);
- if(!Rflag && strstr(wm->msg, "Retry")==0){
- /* return the message and remove it */
- if(returnmail(av, dp->name, wm->msg) != 0)
- logit("returnmail failed", dp->name, av);
- remmatch(dp->name);
- } else {
- /* add sys to bad list and try again later */
- nbad++;
- badsys = realloc(badsys, nbad*sizeof(char*));
- badsys[nbad-1] = strdup(av[3]);
- }
- } else {
- /* it worked remove the message */
- remmatch(dp->name);
- }
- free(wm);
-
+ return j;
}
done:
- if (l)
- sysunlock(l);
- Bterm(b);
- sysunlockfile(Bfildes(b));
- free(buf);
- free(av);
- close(dfd);
+ freejob(j);
+ return nil;
+}
+
+/*
+ * record the outcome of j, whose process exited with wm, then
+ * release j. returns the job that followed j in the live list so
+ * the caller can unlink j in place.
+ */
+Job*
+donefile(Job *j, Waitmsg *wm)
+{
+ Job *n;
+ char **nb;
+
+ if(debug)
+ fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg);
+
+ if(wm->msg[0]){
+ if(debug)
+ fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg);
+ if(!Rflag && strstr(wm->msg, "Retry")==0){
+ /* return the message and remove it */
+ if(returnmail(j->av, j->dp->name, wm->msg) != 0)
+ logit("returnmail failed", j->dp->name, j->av);
+ remmatch(j->dp->name);
+ } else if(j->ac > 3){
+ /*
+ * add sys to bad list and try again later. av[3] only
+ * exists when ac > 3, the same guard dofile uses before
+ * comparing against badsys. if allocation fails the
+ * system is just not listed; the message stays queued.
+ */
+ nb = realloc(badsys, (nbad+1)*sizeof(char*));
+ if(nb != nil){
+ badsys = nb;
+ if((badsys[nbad] = strdup(j->av[3])) != nil)
+ nbad++;
+ }
+ }
+ } else {
+ /* it worked remove the message */
+ remmatch(j->dp->name);
+ }
+ n = j->next;
+ freejob(j);
+ return n;
+}
+
+/*
+ * release everything j holds: the lock keep-alive, the control
+ * file, the data file, the argument buffers, and j itself.
+ * j->dp points into the caller's directory buffer and is not freed.
+ * safe on a partially built job, since dofile zeroes the Job and
+ * sets dfd to -1 before opening anything.
+ */
+void
+freejob(Job *j)
+{
+ if(j == nil)
+ return;
+ /* stop the keep-alive first: it was handed Bfildes(j->b) */
+ if(j->l != nil)
+ sysunlock(j->l);
+ if(j->b != nil){
+ Bterm(j->b);
+ sysunlockfile(Bfildes(j->b));
+ }
+ if(j->dfd >= 0)
+ close(j->dfd);
+ free(j->buf);
+ free(j->av);
+ free(j);
}
@@ -691,71 +640,3 @@
}
syslog(0, runqlog, "%s", buf);
}
-
-char *loadfile = ".runqload";
-
-/*
- * load balancing
- */
-void
-doload(int start)
-{
- int fd;
- char buf[32];
- int i, n;
- Mlock *l;
- Dir *d;
-
- if(load <= 0)
- return;
-
- if(chdir(root) < 0){
- load = 0;
- return;
- }
-
- l = syslock(loadfile);
- fd = open(loadfile, ORDWR);
- if(fd < 0){
- fd = create(loadfile, 0666, ORDWR);
- if(fd < 0){
- load = 0;
- sysunlock(l);
- return;
- }
- }
-
- /* get current load */
- i = 0;
- n = read(fd, buf, sizeof(buf)-1);
- if(n >= 0){
- buf[n] = 0;
- i = atoi(buf);
- }
- if(i < 0)
- i = 0;
-
- /* ignore load if file hasn't been changed in 30 minutes */
- d = dirfstat(fd);
- if(d != nil){
- if(d->mtime + 30*60 < time(0))
- i = 0;
- free(d);
- }
-
- /* if load already too high, give up */
- if(start && i >= load){
- sysunlock(l);
- exits(0);
- }
-
- /* increment/decrement load */
- if(start)
- i++;
- else
- i--;
- seek(fd, 0, 0);
- fprint(fd, "%d\n", i);
- sysunlock(l);
- close(fd);
-}
next prev parent reply other threads:[~2021-01-17 1:09 UTC|newest]
Thread overview: 4+ messages / expand[flat|nested] mbox.gz Atom feed top
2021-01-10 2:27 ori
2021-01-10 8:04 ` hiro
2021-01-17 0:42 ` ori [this message]
2021-01-17 1:06 ` ori
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=59390BC4E9DA865564F648841C225EB9@eigenstate.org \
--to=ori@eigenstate.org \
--cc=9front@9front.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).