9front - general discussion about 9front
 help / color / mirror / Atom feed
* [9front] upas/runq: parallel sending
@ 2021-01-10  2:27 ori
  2021-01-10  8:04 ` hiro
  2021-01-17  0:42 ` ori
  0 siblings, 2 replies; 4+ messages in thread
From: ori @ 2021-01-10  2:27 UTC (permalink / raw)
  To: 9front

As you may have noticed, you've been getting a
bunch of test messages from the 9front list. Sorry.
We've been shaking out some performance issues on a
new host.

The mailing list was spawning a runq process for every
subscriber, and these processes were contending heavily on
the queue directory, causing high system load. On the new
host, disks were slow enough that this was causing issues.

This was fixed by paring down to a single runq process,
but that means that sending out emails for our hundreds
of subscribers can take a while.

This change can probably use some work, but it adds
a '-p njob' flag, which allows us to send the jobs in
a message queue with limited parallelism. That should
bring down the amount of time it takes for a message
to work its way through our mailing lists.

I'd also like to float the idea of removing the '-a'
flag, and reclaiming the '-n' flag for this purpose.
If anyone is running upas/runq with -a, I propose
this alternative:

fn runq-a {
	dirs=(/mail/queue/*)
	for(d in $dirs) upas/runq ... $d
}

with -n:

fn runq-a {
	dirs=(/mail/queue/*)
	for(d in $dirs) upas/runq ... $d &
	for(d in $dirs) wait
}


diff -r 808eb735fc45 sys/src/cmd/upas/q/runq.c
--- a/sys/src/cmd/upas/q/runq.c	Sat Jan 09 12:20:49 2021 -0800
+++ b/sys/src/cmd/upas/q/runq.c	Sat Jan 09 18:15:41 2021 -0800
@@ -1,9 +1,25 @@
 #include "common.h"
 #include <ctype.h>
 
+typedef struct Job Job;
+
+struct Job {
+	Job	*next;
+	int	pid;
+	int	ac;
+	int	dfd;
+	char	**av;
+	char	*buf;	/* backing for av */
+	Dir	*dp;	/* not owned */
+	Mlock	*l;
+	Biobuf	*b;
+};
+
 void	doalldirs(void);
 void	dodir(char*);
-void	dofile(Dir*);
+Job*	dofile(Dir*);
+Job*	donefile(Job*, Waitmsg*);
+void	freejob(Job*);
 void	rundir(char*);
 char*	file(char*, char);
 void	warning(char*, void*);
@@ -32,6 +48,7 @@
 char	**badsys;		/* array of recalcitrant systems */
 int	nbad;
 int	npid = 50;
+int	njob = 1;		/* number of concurrent jobs to invoke */
 int	sflag;			/* single thread per directory */
 int	aflag;			/* all directories */
 int	Eflag;			/* ignore E.xxxxxx dates */
@@ -40,7 +57,7 @@
 void
 usage(void)
 {
-	fprint(2, "usage: runq [-adsE] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] q-root cmd\n");
+	fprint(2, "usage: runq [-adsE] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] [-p njobs] q-root cmd\n");
 	exits("");
 }
 
@@ -86,6 +103,11 @@
 		if(qdir == 0)
 			usage();
 		break;
+	case 'p':
+		njob = atoi(ARGF());
+		if(njob == 0)
+			usage();
+		break;
 	case 'n':
 		npid = atoi(ARGF());
 		if(npid == 0)
@@ -234,8 +256,9 @@
 void
 rundir(char *name)
 {
-	int fd;
-	long i;
+	Job *hd, *j, **p;
+	int nlive, fidx, fd, found;
+	Waitmsg *w;
 
 	if(aflag && sflag)
 		fd = sysopenlocked(".", OREAD);
@@ -245,15 +268,47 @@
 		warning("reading %s", name);
 		return;
 	}
+	fidx= 0;
+	hd = nil;
+	nlive = 0;
 	nfiles = dirreadall(fd, &dirbuf);
-	if(nfiles > 0){
-		for(i=0; i<nfiles; i++){
-			if(dirbuf[i].name[0]!='C' || dirbuf[i].name[1]!='.')
+	while(nlive > 0 ||  fidx< nfiles){
+		while(fidx< nfiles && nlive < njob){
+			if(strncmp(dirbuf[fidx].name, "C.", 2) != 0){
+				fidx++;
 				continue;
-			dofile(&dirbuf[i]);
+			}
+			if((j = dofile(&dirbuf[fidx])) != nil){
+				nlive++;
+				j->next = hd;
+				hd = j;
+			}
+			fidx++;
 		}
-		free(dirbuf);
+		if(nlive == 0){
+			fprint(2, "nothing live\n");
+			break;
+		}
+rescan:
+		if((w = wait()) == nil){
+			syslog(0, "runq", "wait error: %r");
+			break;
+		}
+		found = 0;
+		for(p = &hd; *p != nil; p = &(*p)->next){
+			if(w->pid == (*p)->pid){
+				*p = donefile(*p, w);
+				found++;
+				nlive--;
+				break;
+			}
+		}
+		free(w);
+		if(!found)
+			goto rescan;
 	}
+	assert(hd == nil);
+	free(dirbuf);
 	if(aflag && sflag)
 		sysunlockfile(fd);
 	else
@@ -316,15 +371,14 @@
 /*
  *  try a message
  */
-void
+Job*
 dofile(Dir *dp)
 {
+	int dtime, efd, i, etime;
+	Job *j;
+//	Mlock *l;
 	Dir *d;
-	int dfd, ac, dtime, efd, pid, i, etime;
-	char *buf, *cp, **av;
-	Waitmsg *wm;
-	Biobuf *b;
-	Mlock *l = nil;
+	char *cp;
 
 	if(debug)
 		fprint(2, "dofile %s\n", dp->name);
@@ -337,14 +391,14 @@
 	if(d == nil){
 		syslog(0, runqlog, "no data file for %s", dp->name);
 		remmatch(dp->name);
-		return;
+		return nil;
 	}
 	if(dp->length == 0){
 		if(time(0)-dp->mtime > 15*60){
 			syslog(0, runqlog, "empty ctl file for %s", dp->name);
 			remmatch(dp->name);
 		}
-		return;
+		return nil;
 	}
 	dtime = d->mtime;
 	free(d);
@@ -358,31 +412,35 @@
 		if(etime - dtime < 60*60){
 			/* up to the first hour, try every 15 minutes */
 			if(time(0) - etime < 15*60)
-				return;
+				return nil;
 		} else {
 			/* after the first hour, try once an hour */
 			if(time(0) - etime < 60*60)
-				return;
+				return nil;
 		}
-
 	}
 
 	/*
 	 *  open control and data
 	 */
-	b = sysopen(file(dp->name, 'C'), "rl", 0660);
-	if(b == 0) {
+	j = malloc(sizeof(Job));
+	if(j == nil)
+		return nil;
+	memset(j, 0, sizeof(Job));
+	j->dp = dp;
+	j->dfd = -1;
+	j->b = sysopen(file(dp->name, 'C'), "rl", 0660);
+	if(j->b == 0) {
 		if(debug)
 			fprint(2, "can't open %s: %r\n", file(dp->name, 'C'));
-		return;
+		return nil;
 	}
-	dfd = open(file(dp->name, 'D'), OREAD);
-	if(dfd < 0){
+	j->dfd = open(file(dp->name, 'D'), OREAD);
+	if(j->dfd < 0){
 		if(debug)
 			fprint(2, "can't open %s: %r\n", file(dp->name, 'D'));
-		Bterm(b);
-		sysunlockfile(Bfildes(b));
-		return;
+		freejob(j);
+		return nil;
 	}
 
 	/*
@@ -390,48 +448,36 @@
 	 *	- read args into (malloc'd) buffer
 	 *	- malloc a vector and copy pointers to args into it
 	 */
-	buf = malloc(dp->length+1);
-	if(buf == 0){
+
+	j->buf = malloc(dp->length+1);
+	if(j->buf == nil){
 		warning("buffer allocation", 0);
-		Bterm(b);
-		sysunlockfile(Bfildes(b));
-		close(dfd);
-		return;
+		freejob(j);
+		return nil;
 	}
-	if(Bread(b, buf, dp->length) != dp->length){
+	if(Bread(j->b, j->buf, dp->length) != dp->length){
 		warning("reading control file %s\n", dp->name);
-		Bterm(b);
-		sysunlockfile(Bfildes(b));
-		close(dfd);
-		free(buf);
-		return;
+		freejob(j);
+		return nil;
 	}
-	buf[dp->length] = 0;
-	av = malloc(2*sizeof(char*));
-	if(av == 0){
+	j->buf[dp->length] = 0;
+	j->av = malloc(2*sizeof(char*));
+	if(j->av == 0){
 		warning("argv allocation", 0);
-		close(dfd);
-		free(buf);
-		Bterm(b);
-		sysunlockfile(Bfildes(b));
-		return;
+		freejob(j);
+		return nil;
 	}
-	for(ac = 1, cp = buf; *cp; ac++){
+	for(j->ac = 1, cp = j->buf; *cp; j->ac++){
 		while(isspace(*cp))
 			*cp++ = 0;
 		if(*cp == 0)
 			break;
 
-		av = realloc(av, (ac+2)*sizeof(char*));
-		if(av == 0){
+		j->av = realloc(j->av, (j->ac+2)*sizeof(char*));
+		if(j->av == 0){
 			warning("argv allocation", 0);
-			close(dfd);
-			free(buf);
-			Bterm(b);
-			sysunlockfile(Bfildes(b));
-			return;
 		}
-		av[ac] = cp;
+		j->av[j->ac] = cp;
 		while(*cp && !isspace(*cp)){
 			if(*cp++ == '"'){
 				while(*cp && *cp != '"')
@@ -441,18 +487,18 @@
 			}
 		}
 	}
-	av[0] = cmd;
-	av[ac] = 0;
+	j->av[0] = cmd;
+	j->av[j->ac] = 0;
 
 	if(!Eflag &&time(0) - dtime > giveup){
-		if(returnmail(av, dp->name, "Giveup") != 0)
-			logit("returnmail failed", dp->name, av);
+		if(returnmail(j->av, dp->name, "Giveup") != 0)
+			logit("returnmail failed", dp->name, j->av);
 		remmatch(dp->name);
 		goto done;
 	}
 
 	for(i = 0; i < nbad; i++){
-		if(strcmp(av[3], badsys[i]) == 0)
+		if(j->ac > 3 && strcmp(j->av[3], badsys[i]) == 0)
 			goto done;
 	}
 
@@ -460,33 +506,34 @@
 	 * Ken's fs, for example, gives us 5 minutes of inactivity before
 	 * the lock goes stale, so we have to keep reading it.
  	 */
-	l = keeplockalive(file(dp->name, 'C'), Bfildes(b));
+	j->l = keeplockalive(file(dp->name, 'C'), Bfildes(j->b));
 
 	/*
 	 *  transfer
 	 */
-	pid = fork();
-	switch(pid){
+	j->pid = fork();
+	switch(j->pid){
 	case -1:
-		sysunlock(l);
-		sysunlockfile(Bfildes(b));
+		sysunlock(j->l);
+		sysunlockfile(Bfildes(j->b));
 		syslog(0, runqlog, "out of procs");
 		exits(0);
 	case 0:
 		if(debug) {
-			fprint(2, "Starting %s", cmd);
-			for(ac = 0; av[ac]; ac++)
-				fprint(2, " %s", av[ac]);
-			fprint(2, "\n");
+			fprint(2, "Starting %s\n", cmd);
+//			for(i = 0; j->av[i]; i++)
+//				fprint(2, " %s", j->av[i]);
+//			fprint(2, "\n");
 		}
-		logit("execing", dp->name, av);
+		logit("execing", dp->name, j->av);
 		close(0);
-		dup(dfd, 0);
-		close(dfd);
+		dup(j->dfd, 0);
+		close(j->dfd);
 		close(2);
 		efd = open(file(dp->name, 'E'), OWRITE);
 		if(efd < 0){
-			if(debug) syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser());
+			if(debug)
+				syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser());
 			efd = create(file(dp->name, 'E'), OWRITE, 0666);
 			if(efd < 0){
 				if(debug) syslog(0, "runq", "create %s as %s: %r", file(dp->name, 'E'), getuser());
@@ -494,50 +541,62 @@
 			}
 		}
 		seek(efd, 0, 2);
-		exec(cmd, av);
+		exec(cmd, j->av);
 		error("can't exec %s", cmd);
 		break;
 	default:
-		for(;;){
-			wm = wait();
-			if(wm == nil)
-				error("wait failed: %r", "");
-			if(wm->pid == pid)
-				break;
-			free(wm);
-		}
-		if(debug)
-			fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg);
-
-		if(wm->msg[0]){
-			if(debug)
-				fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg);
-			if(!Rflag && strstr(wm->msg, "Retry")==0){
-				/* return the message and remove it */
-				if(returnmail(av, dp->name, wm->msg) != 0)
-					logit("returnmail failed", dp->name, av);
-				remmatch(dp->name);
-			} else {
-				/* add sys to bad list and try again later */
-				nbad++;
-				badsys = realloc(badsys, nbad*sizeof(char*));
-				badsys[nbad-1] = strdup(av[3]);
-			}
-		} else {
-			/* it worked remove the message */
-			remmatch(dp->name);
-		}
-		free(wm);
-
+		return j;
 	}
 done:
-	if (l)
-		sysunlock(l);
-	Bterm(b);
-	sysunlockfile(Bfildes(b));
-	free(buf);
-	free(av);
-	close(dfd);
+	freejob(j);
+	return nil;
+}
+
+Job*
+donefile(Job *j, Waitmsg *wm)
+{
+	Job *n;
+
+	if(debug)
+		fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg);
+
+	if(wm->msg[0]){
+		if(debug)
+			fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg);
+		if(!Rflag && strstr(wm->msg, "Retry")==0){
+			/* return the message and remove it */
+			if(returnmail(j->av, j->dp->name, wm->msg) != 0)
+				logit("returnmail failed", j->dp->name, j->av);
+			remmatch(j->dp->name);
+		} else {
+			/* add sys to bad list and try again later */
+			nbad++;
+			badsys = realloc(badsys, nbad*sizeof(char*));
+			badsys[nbad-1] = strdup(j->av[3]);
+		}
+	} else {
+		/* it worked remove the message */
+		remmatch(j->dp->name);
+	}
+	n = j->next;
+	freejob(j);
+	return n;
+}
+
+void
+freejob(Job *j)
+{
+	if(j->b != nil){
+		sysunlockfile(Bfildes(j->b));
+		Bterm(j->b);
+	}
+	if(j->dfd != -1)
+		close(j->dfd);
+	if(j->l != nil)
+		sysunlock(j->l);
+	free(j->buf);
+	free(j->av);
+	free(j);
 }
 
 

^ permalink raw reply	[flat|nested] 4+ messages in thread

end of thread, other threads:[~2021-01-17  1:36 UTC | newest]

Thread overview: 4+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2021-01-10  2:27 [9front] upas/runq: parallel sending ori
2021-01-10  8:04 ` hiro
2021-01-17  0:42 ` ori
2021-01-17  1:06   ` ori

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).