9front - general discussion about 9front
 help / color / mirror / Atom feed
From: ori@eigenstate.org
To: 9front@9front.org
Subject: Re: [9front] upas/runq: parallel sending
Date: Sat, 16 Jan 2021 16:42:42 -0800	[thread overview]
Message-ID: <59390BC4E9DA865564F648841C225EB9@eigenstate.org> (raw)
In-Reply-To: <60563470E4A517681CAFF1552E11CD8E@eigenstate.org>

I've been running the previous patch for the
last week without problems: has anyone else
had a chance to test that it doesn't break
their systems?

Quoth ori@eigenstate.org:
> I'd also like to float the idea of removing the '-a'
> flag, and reclaiming the '-n' flag for this purpose.
> If anyone is running upas/runq with -a, I propose
> this alternative:
> 

Here's a more aggressive version that removes
the multi-user queue sweeping code. I doubt
that anyone is using it. If they are, I'd be
interested in knowing if there's an issue with
migrating to xargs or a shell loop that runs
each job in parallel by directory.

If there are no objections, I'm going to let
this version bake for a week on my own system,
and then commit next weekend.

diff -r 24a38dd884b0 sys/src/cmd/upas/q/runq.c
--- a/sys/src/cmd/upas/q/runq.c	Sat Jan 16 16:17:27 2021 -0800
+++ b/sys/src/cmd/upas/q/runq.c	Sat Jan 16 16:40:41 2021 -0800
@@ -1,9 +1,28 @@
 #include "common.h"
 #include <ctype.h>
 
+typedef struct Job Job;
+
+/*
+ *  a queued message being delivered: the child running cmd
+ *  and the files and lock it holds until the child is reaped.
+ */
+struct Job {
+	Job	*next;	/* next live job in rundir's list */
+	int	pid;	/* child running cmd */
+	int	ac;	/* argument count; av[ac] is nil */
+	int	dfd;	/* data file, the child's stdin; -1 if not open */
+	char	**av;	/* argv for cmd; av[0] is cmd */
+	char	*buf;	/* backing for av */
+	Dir	*dp;	/* not owned: points into rundir's dirbuf */
+	Mlock	*l;	/* keeps the control file's lock from going stale */
+	Biobuf	*b;	/* locked control file */
+};
+
-void	doalldirs(void);
 void	dodir(char*);
-void	dofile(Dir*);
+Job*	dofile(Dir*);
+Job*	donefile(Job*, Waitmsg*);
+void	freejob(Job*);
 void	rundir(char*);
 char*	file(char*, char);
 void	warning(char*, void*);
@@ -17,7 +36,6 @@
 char	*root;
 int	debug;
 int	giveup = 2*24*60*60;
-int	load;
 int	limit;
 
 /* the current directory */
@@ -28,67 +43,48 @@
 
 char *runqlog = "runq";
 
-int	*pidlist;
 char	**badsys;		/* array of recalcitrant systems */
 int	nbad;
-int	npid = 50;
-int	sflag;			/* single thread per directory */
-int	aflag;			/* all directories */
+int	njob = 1;		/* number of concurrent jobs to invoke */
 int	Eflag;			/* ignore E.xxxxxx dates */
 int	Rflag;			/* no giving up, ever */
 
 void
 usage(void)
 {
-	fprint(2, "usage: runq [-adsE] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] q-root cmd\n");
+	fprint(2, "usage: runq [-dER] [-q dir] [-t time] [-r nfiles] [-n njobs] q-root cmd\n");
 	exits("");
 }
 
 void
 main(int argc, char **argv)
 {
-	char *qdir, *x;
+	char *qdir;
 
 	qdir = 0;
 
 	ARGBEGIN{
-	case 'l':
-		x = ARGF();
-		if(x == 0)
-			usage();
-		load = atoi(x);
-		if(load < 0)
-			load = 0;
-		break;
 	case 'E':
 		Eflag++;
 		break;
 	case 'R':	/* no giving up -- just leave stuff in the queue */
 		Rflag++;
 		break;
-	case 'a':
-		aflag++;
-		break;
 	case 'd':
 		debug++;
 		break;
 	case 'r':
-		limit = atoi(ARGF());
-		break;
-	case 's':
-		sflag++;
+		limit = atoi(EARGF(usage()));
 		break;
 	case 't':
-		giveup = 60*60*atoi(ARGF());
+		giveup = 60*60*atoi(EARGF(usage()));
 		break;
 	case 'q':
-		qdir = ARGF();
-		if(qdir == 0)
-			usage();
+		qdir = EARGF(usage());
 		break;
 	case 'n':
-		npid = atoi(ARGF());
-		if(npid == 0)
+		njob = atoi(EARGF(usage()));
+		if(njob <= 0)
 			usage();
 		break;
 	}ARGEND;
@@ -96,27 +92,17 @@
 	if(argc != 2)
 		usage();
 
-	pidlist = malloc(npid*sizeof(*pidlist));
-	if(pidlist == 0)
-		error("can't malloc", 0);
-
-	if(aflag == 0 && qdir == 0) {
+	if(qdir == nil)
 		qdir = getuser();
-		if(qdir == 0)
-			error("unknown user", 0);
-	}
+	if(qdir == nil)
+		error("unknown user", 0);
 	root = argv[0];
 	cmd = argv[1];
 
 	if(chdir(root) < 0)
 		error("can't cd to %s", root);
 
-	doload(1);
-	if(aflag)
-		doalldirs();
-	else
-		dodir(qdir);
-	doload(0);
+	dodir(qdir);
 	exits(0);
 }
 
@@ -142,74 +128,6 @@
 	return 0;
 }
 
-int
-forkltd(void)
-{
-	int i;
-	int pid;
-
-	for(i = 0; i < npid; i++){
-		if(pidlist[i] <= 0)
-			break;
-	}
-
-	while(i >= npid){
-		pid = waitpid();
-		if(pid < 0){
-			syslog(0, runqlog, "forkltd confused");
-			exits(0);
-		}
-
-		for(i = 0; i < npid; i++)
-			if(pidlist[i] == pid)
-				break;
-	}
-	pidlist[i] = fork();
-	return pidlist[i];
-}
-
-/*
- *  run all user directories, must be bootes (or root on unix) to do this
- */
-void
-doalldirs(void)
-{
-	Dir *db;
-	int fd;
-	long i, n;
-
-
-	fd = open(".", OREAD);
-	if(fd == -1){
-		warning("reading %s", root);
-		return;
-	}
-	n = dirreadall(fd, &db);
-	if(n > 0){
-		for(i=0; i<n; i++){
-			if(db[i].qid.type & QTDIR){
-				if(emptydir(db[i].name))
-					continue;
-				switch(forkltd()){
-				case -1:
-					syslog(0, runqlog, "out of procs");
-					doload(0);
-					exits(0);
-				case 0:
-					if(sysdetach() < 0)
-						error("%r", 0);
-					dodir(db[i].name);
-					exits(0);
-				default:
-					break;
-				}
-			}
-		}
-		free(db);
-	}
-	close(fd);
-}
-
 /*
  *  cd to a user directory and run it
  */
@@ -234,30 +152,60 @@
 void
 rundir(char *name)
 {
-	int fd;
-	long i;
+	Job *hd, *j, **p;
+	int nlive, fidx, fd;
+	Waitmsg *w;
 
-	if(aflag && sflag)
-		fd = sysopenlocked(".", OREAD);
-	else
-		fd = open(".", OREAD);
+	fd = open(".", OREAD);
 	if(fd == -1){
 		warning("reading %s", name);
 		return;
 	}
+	fidx = 0;
+	hd = nil;
+	nlive = 0;
 	nfiles = dirreadall(fd, &dirbuf);
-	if(nfiles > 0){
-		for(i=0; i<nfiles; i++){
-			if(dirbuf[i].name[0]!='C' || dirbuf[i].name[1]!='.')
+	while(nlive > 0 || fidx < nfiles){
+		/* start a job per C. file until njob children are running */
+		while(fidx < nfiles && nlive < njob){
+			if(strncmp(dirbuf[fidx].name, "C.", 2) != 0){
+				fidx++;
 				continue;
-			dofile(&dirbuf[i]);
+			}
+			if((j = dofile(&dirbuf[fidx])) != nil){
+				nlive++;
+				j->next = hd;
+				hd = j;
+			}
+			fidx++;
 		}
-		free(dirbuf);
+		/* every file has been tried and nothing is running */
+		if(nlive == 0)
+			break;
+		if((w = wait()) == nil){
+			syslog(0, runqlog, "wait error: %r");
+			break;
+		}
+		/* reap the job that exited; exits of other children are ignored */
+		for(p = &hd; *p != nil; p = &(*p)->next){
+			if((*p)->pid == w->pid){
+				*p = donefile(*p, w);
+				nlive--;
+				break;
+			}
+		}
+		free(w);
 	}
-	if(aflag && sflag)
-		sysunlockfile(fd);
-	else
-		close(fd);
+	/* jobs remain only if wait failed: release them, leave their files queued */
+	while(hd != nil){
+		j = hd->next;
+		freejob(hd);
+		hd = j;
+	}
+	free(dirbuf);
+	dirbuf = nil;
+	nfiles = 0;
+	close(fd);
 }
 
 /*
@@ -316,15 +261,14 @@
-/*
- *  try a message
- */
-void
+/*
+ *  try a message: returns the Job for the child started to
+ *  deliver it, or nil if no child was started.
+ */
+Job*
 dofile(Dir *dp)
 {
+	int dtime, efd, i, etime;
+	Job *j;
 	Dir *d;
-	int dfd, ac, dtime, efd, pid, i, etime;
-	char *buf, *cp, **av;
-	Waitmsg *wm;
-	Biobuf *b;
-	Mlock *l = nil;
+	char *cp;
 
 	if(debug)
 		fprint(2, "dofile %s\n", dp->name);
@@ -337,14 +281,14 @@
 	if(d == nil){
 		syslog(0, runqlog, "no data file for %s", dp->name);
 		remmatch(dp->name);
-		return;
+		return nil;
 	}
 	if(dp->length == 0){
 		if(time(0)-dp->mtime > 15*60){
 			syslog(0, runqlog, "empty ctl file for %s", dp->name);
 			remmatch(dp->name);
 		}
-		return;
+		return nil;
 	}
 	dtime = d->mtime;
 	free(d);
@@ -358,31 +302,37 @@
 		if(etime - dtime < 60*60){
 			/* up to the first hour, try every 15 minutes */
 			if(time(0) - etime < 15*60)
-				return;
+				return nil;
 		} else {
 			/* after the first hour, try once an hour */
 			if(time(0) - etime < 60*60)
-				return;
+				return nil;
 		}
-
 	}
 
 	/*
 	 *  open control and data
 	 */
-	b = sysopen(file(dp->name, 'C'), "rl", 0660);
-	if(b == 0) {
+	j = mallocz(sizeof(Job), 1);
+	if(j == nil){
+		warning("job allocation", 0);
+		return nil;
+	}
+	j->dp = dp;
+	j->dfd = -1;	/* so freejob won't close fd 0 */
+	j->b = sysopen(file(dp->name, 'C'), "rl", 0660);
+	if(j->b == nil) {
 		if(debug)
 			fprint(2, "can't open %s: %r\n", file(dp->name, 'C'));
-		return;
+		freejob(j);
+		return nil;
 	}
-	dfd = open(file(dp->name, 'D'), OREAD);
-	if(dfd < 0){
+	j->dfd = open(file(dp->name, 'D'), OREAD);
+	if(j->dfd < 0){
 		if(debug)
 			fprint(2, "can't open %s: %r\n", file(dp->name, 'D'));
-		Bterm(b);
-		sysunlockfile(Bfildes(b));
-		return;
+		freejob(j);
+		return nil;
 	}
 
 	/*
@@ -390,48 +338,41 @@
 	 *	- read args into (malloc'd) buffer
 	 *	- malloc a vector and copy pointers to args into it
 	 */
-	buf = malloc(dp->length+1);
-	if(buf == 0){
+	j->buf = malloc(dp->length+1);
+	if(j->buf == nil){
 		warning("buffer allocation", 0);
-		Bterm(b);
-		sysunlockfile(Bfildes(b));
-		close(dfd);
-		return;
+		freejob(j);
+		return nil;
 	}
-	if(Bread(b, buf, dp->length) != dp->length){
+	if(Bread(j->b, j->buf, dp->length) != dp->length){
 		warning("reading control file %s\n", dp->name);
-		Bterm(b);
-		sysunlockfile(Bfildes(b));
-		close(dfd);
-		free(buf);
-		return;
+		freejob(j);
+		return nil;
 	}
-	buf[dp->length] = 0;
-	av = malloc(2*sizeof(char*));
-	if(av == 0){
+	j->buf[dp->length] = 0;
+	j->av = malloc(2*sizeof(char*));
+	if(j->av == 0){
 		warning("argv allocation", 0);
-		close(dfd);
-		free(buf);
-		Bterm(b);
-		sysunlockfile(Bfildes(b));
-		return;
+		freejob(j);
+		return nil;
 	}
-	for(ac = 1, cp = buf; *cp; ac++){
+	for(j->ac = 1, cp = j->buf; *cp; j->ac++){
+		char **nav;
+
 		while(isspace(*cp))
 			*cp++ = 0;
 		if(*cp == 0)
 			break;
 
-		av = realloc(av, (ac+2)*sizeof(char*));
-		if(av == 0){
+		/* don't lose j->av if realloc fails: freejob still frees it */
+		nav = realloc(j->av, (j->ac+2)*sizeof(char*));
+		if(nav == nil){
 			warning("argv allocation", 0);
-			close(dfd);
-			free(buf);
-			Bterm(b);
-			sysunlockfile(Bfildes(b));
-			return;
+			freejob(j);
+			return nil;
 		}
-		av[ac] = cp;
+		j->av = nav;
+		j->av[j->ac] = cp;
 		while(*cp && !isspace(*cp)){
 			if(*cp++ == '"'){
 				while(*cp && *cp != '"')
@@ -441,18 +377,18 @@
 			}
 		}
 	}
-	av[0] = cmd;
-	av[ac] = 0;
+	j->av[0] = cmd;
+	j->av[j->ac] = 0;
 
 	if(!Eflag &&time(0) - dtime > giveup){
-		if(returnmail(av, dp->name, "Giveup") != 0)
-			logit("returnmail failed", dp->name, av);
+		if(returnmail(j->av, dp->name, "Giveup") != 0)
+			logit("returnmail failed", dp->name, j->av);
 		remmatch(dp->name);
 		goto done;
 	}
 
 	for(i = 0; i < nbad; i++){
-		if(strcmp(av[3], badsys[i]) == 0)
+		if(j->ac > 3 && strcmp(j->av[3], badsys[i]) == 0)
 			goto done;
 	}
 
@@ -460,33 +396,33 @@
	 * Ken's fs, for example, gives us 5 minutes of inactivity before
	 * the lock goes stale, so we have to keep reading it.
  	 */
-	l = keeplockalive(file(dp->name, 'C'), Bfildes(b));
+	j->l = keeplockalive(file(dp->name, 'C'), Bfildes(j->b));
 
 	/*
 	 *  transfer
 	 */
-	pid = fork();
-	switch(pid){
+	j->pid = fork();
+	switch(j->pid){
 	case -1:
-		sysunlock(l);
-		sysunlockfile(Bfildes(b));
 		syslog(0, runqlog, "out of procs");
-		exits(0);
+		/* don't exit: running jobs must still be reaped; done: frees this one */
+		break;
 	case 0:
 		if(debug) {
-			fprint(2, "Starting %s", cmd);
-			for(ac = 0; av[ac]; ac++)
-				fprint(2, " %s", av[ac]);
-			fprint(2, "\n");
+			fprint(2, "Starting %s", cmd);
+			for(i = 0; j->av[i]; i++)
+				fprint(2, " %s", j->av[i]);
+			fprint(2, "\n");
 		}
-		logit("execing", dp->name, av);
+		logit("execing", dp->name, j->av);
 		close(0);
-		dup(dfd, 0);
-		close(dfd);
+		dup(j->dfd, 0);
+		close(j->dfd);
 		close(2);
 		efd = open(file(dp->name, 'E'), OWRITE);
 		if(efd < 0){
-			if(debug) syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser());
+			if(debug)
+				syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser());
 			efd = create(file(dp->name, 'E'), OWRITE, 0666);
 			if(efd < 0){
 				if(debug) syslog(0, "runq", "create %s as %s: %r", file(dp->name, 'E'), getuser());
@@ -494,50 +431,77 @@
 			}
 		}
 		seek(efd, 0, 2);
-		exec(cmd, av);
+		exec(cmd, j->av);
 		error("can't exec %s", cmd);
 		break;
 	default:
-		for(;;){
-			wm = wait();
-			if(wm == nil)
-				error("wait failed: %r", "");
-			if(wm->pid == pid)
-				break;
-			free(wm);
-		}
-		if(debug)
-			fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg);
-
-		if(wm->msg[0]){
-			if(debug)
-				fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg);
-			if(!Rflag && strstr(wm->msg, "Retry")==0){
-				/* return the message and remove it */
-				if(returnmail(av, dp->name, wm->msg) != 0)
-					logit("returnmail failed", dp->name, av);
-				remmatch(dp->name);
-			} else {
-				/* add sys to bad list and try again later */
-				nbad++;
-				badsys = realloc(badsys, nbad*sizeof(char*));
-				badsys[nbad-1] = strdup(av[3]);
-			}
-		} else {
-			/* it worked remove the message */
-			remmatch(dp->name);
-		}
-		free(wm);
-
+		/* parent: the caller waits for the child and calls donefile */
+		return j;
 	}
 done:
-	if (l)
-		sysunlock(l);
-	Bterm(b);
-	sysunlockfile(Bfildes(b));
-	free(buf);
-	free(av);
-	close(dfd);
+	freejob(j);
+	return nil;
+}
+
+/*
+ *  handle a finished job's exit status: on success remove the
+ *  message; on failure either returnmail and remove it, or (for
+ *  "Retry" errors or -R) mark the job's system bad for this run.
+ *  frees j and returns j->next.
+ */
+Job*
+donefile(Job *j, Waitmsg *wm)
+{
+	Job *n;
+	char **nb;
+
+	if(debug)
+		fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg);
+
+	if(wm->msg[0]){
+		if(debug)
+			fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg);
+		if(!Rflag && strstr(wm->msg, "Retry")==0){
+			/* return the message and remove it */
+			if(returnmail(j->av, j->dp->name, wm->msg) != 0)
+				logit("returnmail failed", j->dp->name, j->av);
+			remmatch(j->dp->name);
+		} else if(j->ac > 3){
+			/* add sys (av[3]) to bad list and try again later */
+			nb = realloc(badsys, (nbad+1)*sizeof(char*));
+			if(nb != nil){
+				badsys = nb;
+				if((badsys[nbad] = strdup(j->av[3])) != nil)
+					nbad++;
+			}
+		}
+	} else {
+		/* it worked remove the message */
+		remmatch(j->dp->name);
+	}
+	n = j->next;
+	freejob(j);
+	return n;
+}
+
+/*
+ *  release everything a job holds; safe on a partially built job.
+ */
+void
+freejob(Job *j)
+{
+	/* stop the lock keep-alive before closing the fd it was given */
+	if(j->l != nil)
+		sysunlock(j->l);
+	if(j->b != nil){
+		sysunlockfile(Bfildes(j->b));
+		Bterm(j->b);
+	}
+	if(j->dfd >= 0)
+		close(j->dfd);
+	free(j->buf);
+	free(j->av);
+	free(j);
 }
 
 
@@ -691,71 +640,3 @@
 	}
 	syslog(0, runqlog, "%s", buf);
 }
-
-char *loadfile = ".runqload";
-
-/*
- *  load balancing
- */
-void
-doload(int start)
-{
-	int fd;
-	char buf[32];
-	int i, n;
-	Mlock *l;
-	Dir *d;
-
-	if(load <= 0)
-		return;
-
-	if(chdir(root) < 0){
-		load = 0;
-		return;
-	}
-
-	l = syslock(loadfile);
-	fd = open(loadfile, ORDWR);
-	if(fd < 0){
-		fd = create(loadfile, 0666, ORDWR);
-		if(fd < 0){
-			load = 0;
-			sysunlock(l);
-			return;
-		}
-	}
-
-	/* get current load */
-	i = 0;
-	n = read(fd, buf, sizeof(buf)-1);
-	if(n >= 0){
-		buf[n] = 0;
-		i = atoi(buf);
-	}
-	if(i < 0)
-		i = 0;
-
-	/* ignore load if file hasn't been changed in 30 minutes */
-	d = dirfstat(fd);
-	if(d != nil){
-		if(d->mtime + 30*60 < time(0))
-			i = 0;
-		free(d);
-	}
-
-	/* if load already too high, give up */
-	if(start && i >= load){
-		sysunlock(l);
-		exits(0);
-	}
-
-	/* increment/decrement load */
-	if(start)
-		i++;
-	else
-		i--;
-	seek(fd, 0, 0);
-	fprint(fd, "%d\n", i);
-	sysunlock(l);
-	close(fd);
-}

  parent reply	other threads:[~2021-01-17  1:09 UTC|newest]

Thread overview: 4+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-01-10  2:27 ori
2021-01-10  8:04 ` hiro
2021-01-17  0:42 ` ori [this message]
2021-01-17  1:06   ` ori

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=59390BC4E9DA865564F648841C225EB9@eigenstate.org \
    --to=ori@eigenstate.org \
    --cc=9front@9front.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).