9front - general discussion about 9front
 help / color / mirror / Atom feed
From: ori@eigenstate.org
To: 9front@9front.org
Subject: [9front] upas/runq: parallel sending
Date: Sat, 09 Jan 2021 18:27:06 -0800	[thread overview]
Message-ID: <60563470E4A517681CAFF1552E11CD8E@eigenstate.org> (raw)

As you may have noticed, you've been getting a
bunch of test messages from the 9front list. Sorry.
We've been shaking out some performance issues on a
new host.

The mailing list was spawning a runq process for every
subscriber, which were contending heavily on the queue
directory, causing high system load. On the new host,
disks were slow enough that this was causing issues.

This was fixed by paring down to a single runq process,
but that means that sending out emails for our hundreds
of subscribers can take a while.

This change can probably use some work, but it adds
a '-p njob' flag, which allows us to send the jobs in
a message queue with limited parallelism. That should
bring down the amount of time it takes for a message
to work its way through our mailing lists.

I'd also like to float the idea of removing the '-a'
flag, and reclaiming the '-n' flag for this purpose.
If anyone is runnign upas/runq with -a, I propose
this alternative:

fn runq-a {
	dirs=(/mail/queue/*)
	for(d in $dirs) upas/runq ... $d
}

with -n:

fn runq-a {
	dirs=(/mail/queue/*)
	for(d in $dirs) upas/runq ... $d &
	for(d in $dirs) wait
}


diff -r 808eb735fc45 sys/src/cmd/upas/q/runq.c
--- a/sys/src/cmd/upas/q/runq.c	Sat Jan 09 12:20:49 2021 -0800
+++ b/sys/src/cmd/upas/q/runq.c	Sat Jan 09 18:15:41 2021 -0800
@@ -1,9 +1,25 @@
 #include "common.h"
 #include <ctype.h>
 
+typedef struct Job Job;
+
+struct Job {
+	Job	*next;
+	int	pid;
+	int	ac;
+	int	dfd;
+	char	**av;
+	char	*buf;	/* backing for av */
+	Dir	*dp;	/* not owned */
+	Mlock	*l;
+	Biobuf	*b;
+};
+
 void	doalldirs(void);
 void	dodir(char*);
-void	dofile(Dir*);
+Job*	dofile(Dir*);
+Job*	donefile(Job*, Waitmsg*);
+void	freejob(Job*);
 void	rundir(char*);
 char*	file(char*, char);
 void	warning(char*, void*);
@@ -32,6 +48,7 @@
 char	**badsys;		/* array of recalcitrant systems */
 int	nbad;
 int	npid = 50;
+int	njob = 1;		/* number of concurrent jobs to invoke */
 int	sflag;			/* single thread per directory */
 int	aflag;			/* all directories */
 int	Eflag;			/* ignore E.xxxxxx dates */
@@ -40,7 +57,7 @@
 void
 usage(void)
 {
-	fprint(2, "usage: runq [-adsE] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] q-root cmd\n");
+	fprint(2, "usage: runq [-adsE] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] [-p njobs] q-root cmd\n");
 	exits("");
 }
 
@@ -86,6 +103,11 @@
 		if(qdir == 0)
 			usage();
 		break;
+	case 'p':
+		njob = atoi(ARGF());
+		if(njob == 0)
+			usage();
+		break;
 	case 'n':
 		npid = atoi(ARGF());
 		if(npid == 0)
@@ -234,8 +256,9 @@
 void
 rundir(char *name)
 {
-	int fd;
-	long i;
+	Job *hd, *j, **p;
+	int nlive, fidx, fd, found;
+	Waitmsg *w;
 
 	if(aflag && sflag)
 		fd = sysopenlocked(".", OREAD);
@@ -245,15 +268,47 @@
 		warning("reading %s", name);
 		return;
 	}
+	fidx= 0;
+	hd = nil;
+	nlive = 0;
 	nfiles = dirreadall(fd, &dirbuf);
-	if(nfiles > 0){
-		for(i=0; i<nfiles; i++){
-			if(dirbuf[i].name[0]!='C' || dirbuf[i].name[1]!='.')
+	while(nlive > 0 ||  fidx< nfiles){
+		while(fidx< nfiles && nlive < njob){
+			if(strncmp(dirbuf[fidx].name, "C.", 2) != 0){
+				fidx++;
 				continue;
-			dofile(&dirbuf[i]);
+			}
+			if((j = dofile(&dirbuf[fidx])) != nil){
+				nlive++;
+				j->next = hd;
+				hd = j;
+			}
+			fidx++;
 		}
-		free(dirbuf);
+		if(nlive == 0){
+			fprint(2, "nothing live\n");
+			break;
+		}
+rescan:
+		if((w = wait()) == nil){
+			syslog(0, "runq", "wait error: %r");
+			break;
+		}
+		found = 0;
+		for(p = &hd; *p != nil; p = &(*p)->next){
+			if(w->pid == (*p)->pid){
+				*p = donefile(*p, w);
+				found++;
+				nlive--;
+				break;
+			}
+		}
+		free(w);
+		if(!found)
+			goto rescan;
 	}
+	assert(hd == nil);
+	free(dirbuf);
 	if(aflag && sflag)
 		sysunlockfile(fd);
 	else
@@ -316,15 +371,14 @@
 /*
  *  try a message
  */
-void
+Job*
 dofile(Dir *dp)
 {
+	int dtime, efd, i, etime;
+	Job *j;
+//	Mlock *l;
 	Dir *d;
-	int dfd, ac, dtime, efd, pid, i, etime;
-	char *buf, *cp, **av;
-	Waitmsg *wm;
-	Biobuf *b;
-	Mlock *l = nil;
+	char *cp;
 
 	if(debug)
 		fprint(2, "dofile %s\n", dp->name);
@@ -337,14 +391,14 @@
 	if(d == nil){
 		syslog(0, runqlog, "no data file for %s", dp->name);
 		remmatch(dp->name);
-		return;
+		return nil;
 	}
 	if(dp->length == 0){
 		if(time(0)-dp->mtime > 15*60){
 			syslog(0, runqlog, "empty ctl file for %s", dp->name);
 			remmatch(dp->name);
 		}
-		return;
+		return nil;
 	}
 	dtime = d->mtime;
 	free(d);
@@ -358,31 +412,35 @@
 		if(etime - dtime < 60*60){
 			/* up to the first hour, try every 15 minutes */
 			if(time(0) - etime < 15*60)
-				return;
+				return nil;
 		} else {
 			/* after the first hour, try once an hour */
 			if(time(0) - etime < 60*60)
-				return;
+				return nil;
 		}
-
 	}
 
 	/*
 	 *  open control and data
 	 */
-	b = sysopen(file(dp->name, 'C'), "rl", 0660);
-	if(b == 0) {
+	j = malloc(sizeof(Job));
+	if(j == nil)
+		return nil;
+	memset(j, 0, sizeof(Job));
+	j->dp = dp;
+	j->dfd = -1;
+	j->b = sysopen(file(dp->name, 'C'), "rl", 0660);
+	if(j->b == 0) {
 		if(debug)
 			fprint(2, "can't open %s: %r\n", file(dp->name, 'C'));
-		return;
+		return nil;
 	}
-	dfd = open(file(dp->name, 'D'), OREAD);
-	if(dfd < 0){
+	j->dfd = open(file(dp->name, 'D'), OREAD);
+	if(j->dfd < 0){
 		if(debug)
 			fprint(2, "can't open %s: %r\n", file(dp->name, 'D'));
-		Bterm(b);
-		sysunlockfile(Bfildes(b));
-		return;
+		freejob(j);
+		return nil;
 	}
 
 	/*
@@ -390,48 +448,36 @@
 	 *	- read args into (malloc'd) buffer
 	 *	- malloc a vector and copy pointers to args into it
 	 */
-	buf = malloc(dp->length+1);
-	if(buf == 0){
+
+	j->buf = malloc(dp->length+1);
+	if(j->buf == nil){
 		warning("buffer allocation", 0);
-		Bterm(b);
-		sysunlockfile(Bfildes(b));
-		close(dfd);
-		return;
+		freejob(j);
+		return nil;
 	}
-	if(Bread(b, buf, dp->length) != dp->length){
+	if(Bread(j->b, j->buf, dp->length) != dp->length){
 		warning("reading control file %s\n", dp->name);
-		Bterm(b);
-		sysunlockfile(Bfildes(b));
-		close(dfd);
-		free(buf);
-		return;
+		freejob(j);
+		return nil;
 	}
-	buf[dp->length] = 0;
-	av = malloc(2*sizeof(char*));
-	if(av == 0){
+	j->buf[dp->length] = 0;
+	j->av = malloc(2*sizeof(char*));
+	if(j->av == 0){
 		warning("argv allocation", 0);
-		close(dfd);
-		free(buf);
-		Bterm(b);
-		sysunlockfile(Bfildes(b));
-		return;
+		freejob(j);
+		return nil;
 	}
-	for(ac = 1, cp = buf; *cp; ac++){
+	for(j->ac = 1, cp = j->buf; *cp; j->ac++){
 		while(isspace(*cp))
 			*cp++ = 0;
 		if(*cp == 0)
 			break;
 
-		av = realloc(av, (ac+2)*sizeof(char*));
-		if(av == 0){
+		j->av = realloc(j->av, (j->ac+2)*sizeof(char*));
+		if(j->av == 0){
 			warning("argv allocation", 0);
-			close(dfd);
-			free(buf);
-			Bterm(b);
-			sysunlockfile(Bfildes(b));
-			return;
 		}
-		av[ac] = cp;
+		j->av[j->ac] = cp;
 		while(*cp && !isspace(*cp)){
 			if(*cp++ == '"'){
 				while(*cp && *cp != '"')
@@ -441,18 +487,18 @@
 			}
 		}
 	}
-	av[0] = cmd;
-	av[ac] = 0;
+	j->av[0] = cmd;
+	j->av[j->ac] = 0;
 
 	if(!Eflag &&time(0) - dtime > giveup){
-		if(returnmail(av, dp->name, "Giveup") != 0)
-			logit("returnmail failed", dp->name, av);
+		if(returnmail(j->av, dp->name, "Giveup") != 0)
+			logit("returnmail failed", dp->name, j->av);
 		remmatch(dp->name);
 		goto done;
 	}
 
 	for(i = 0; i < nbad; i++){
-		if(strcmp(av[3], badsys[i]) == 0)
+		if(j->ac > 3 && strcmp(j->av[3], badsys[i]) == 0)
 			goto done;
 	}
 
@@ -460,33 +506,34 @@
 	 * Ken's fs, for example, gives us 5 minutes of inactivity before
 	 * the lock goes stale, so we have to keep reading it.
  	 */
-	l = keeplockalive(file(dp->name, 'C'), Bfildes(b));
+	j->l = keeplockalive(file(dp->name, 'C'), Bfildes(j->b));
 
 	/*
 	 *  transfer
 	 */
-	pid = fork();
-	switch(pid){
+	j->pid = fork();
+	switch(j->pid){
 	case -1:
-		sysunlock(l);
-		sysunlockfile(Bfildes(b));
+		sysunlock(j->l);
+		sysunlockfile(Bfildes(j->b));
 		syslog(0, runqlog, "out of procs");
 		exits(0);
 	case 0:
 		if(debug) {
-			fprint(2, "Starting %s", cmd);
-			for(ac = 0; av[ac]; ac++)
-				fprint(2, " %s", av[ac]);
-			fprint(2, "\n");
+			fprint(2, "Starting %s\n", cmd);
+//			for(i = 0; j->av[i]; i++)
+//				fprint(2, " %s", j->av[i]);
+//			fprint(2, "\n");
 		}
-		logit("execing", dp->name, av);
+		logit("execing", dp->name, j->av);
 		close(0);
-		dup(dfd, 0);
-		close(dfd);
+		dup(j->dfd, 0);
+		close(j->dfd);
 		close(2);
 		efd = open(file(dp->name, 'E'), OWRITE);
 		if(efd < 0){
-			if(debug) syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser());
+			if(debug)
+				syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser());
 			efd = create(file(dp->name, 'E'), OWRITE, 0666);
 			if(efd < 0){
 				if(debug) syslog(0, "runq", "create %s as %s: %r", file(dp->name, 'E'), getuser());
@@ -494,50 +541,62 @@
 			}
 		}
 		seek(efd, 0, 2);
-		exec(cmd, av);
+		exec(cmd, j->av);
 		error("can't exec %s", cmd);
 		break;
 	default:
-		for(;;){
-			wm = wait();
-			if(wm == nil)
-				error("wait failed: %r", "");
-			if(wm->pid == pid)
-				break;
-			free(wm);
-		}
-		if(debug)
-			fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg);
-
-		if(wm->msg[0]){
-			if(debug)
-				fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg);
-			if(!Rflag && strstr(wm->msg, "Retry")==0){
-				/* return the message and remove it */
-				if(returnmail(av, dp->name, wm->msg) != 0)
-					logit("returnmail failed", dp->name, av);
-				remmatch(dp->name);
-			} else {
-				/* add sys to bad list and try again later */
-				nbad++;
-				badsys = realloc(badsys, nbad*sizeof(char*));
-				badsys[nbad-1] = strdup(av[3]);
-			}
-		} else {
-			/* it worked remove the message */
-			remmatch(dp->name);
-		}
-		free(wm);
-
+		return j;
 	}
 done:
-	if (l)
-		sysunlock(l);
-	Bterm(b);
-	sysunlockfile(Bfildes(b));
-	free(buf);
-	free(av);
-	close(dfd);
+	freejob(j);
+	return nil;
+}
+
+Job*
+donefile(Job *j, Waitmsg *wm)
+{
+	Job *n;
+
+	if(debug)
+		fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg);
+
+	if(wm->msg[0]){
+		if(debug)
+			fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg);
+		if(!Rflag && strstr(wm->msg, "Retry")==0){
+			/* return the message and remove it */
+			if(returnmail(j->av, j->dp->name, wm->msg) != 0)
+				logit("returnmail failed", j->dp->name, j->av);
+			remmatch(j->dp->name);
+		} else {
+			/* add sys to bad list and try again later */
+			nbad++;
+			badsys = realloc(badsys, nbad*sizeof(char*));
+			badsys[nbad-1] = strdup(j->av[3]);
+		}
+	} else {
+		/* it worked remove the message */
+		remmatch(j->dp->name);
+	}
+	n = j->next;
+	freejob(j);
+	return n;
+}
+
+void
+freejob(Job *j)
+{
+	if(j->b != nil){
+		sysunlockfile(Bfildes(j->b));
+		Bterm(j->b);
+	}
+	if(j->dfd != -1)
+		close(j->dfd);
+	if(j->l != nil)
+		sysunlock(j->l);
+	free(j->buf);
+	free(j->av);
+	free(j);
 }
 
 

             reply	other threads:[~2021-01-10  2:57 UTC|newest]

Thread overview: 4+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-01-10  2:27 ori [this message]
2021-01-10  8:04 ` hiro
2021-01-17  0:42 ` ori
2021-01-17  1:06   ` ori

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=60563470E4A517681CAFF1552E11CD8E@eigenstate.org \
    --to=ori@eigenstate.org \
    --cc=9front@9front.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).