9front - general discussion about 9front
 help / color / mirror / Atom feed
* [9front] upas/runq: parallel sending
@ 2021-01-10  2:27 ori
  2021-01-10  8:04 ` hiro
  2021-01-17  0:42 ` ori
  0 siblings, 2 replies; 4+ messages in thread
From: ori @ 2021-01-10  2:27 UTC (permalink / raw)
  To: 9front

As you may have noticed, you've been getting a
bunch of test messages from the 9front list. Sorry.
We've been shaking out some performance issues on a
new host.

The mailing list was spawning a runq process for every
subscriber, which were contending heavily on the queue
directory, causing high system load. On the new host,
disks were slow enough that this was causing issues.

This was fixed by paring down to a single runq process,
but that means that sending out emails for our hundreds
of subscribers can take a while.

This change can probably use some work, but it adds
a '-p njob' flag, which allows us to send the jobs in
a message queue with limited parallelism. That should
bring down the amount of time it takes for a message
to work its way through our mailing lists.

I'd also like to float the idea of removing the '-a'
flag, and reclaiming the '-n' flag for this purpose.
If anyone is runnign upas/runq with -a, I propose
this alternative:

fn runq-a {
	dirs=(/mail/queue/*)
	for(d in $dirs) upas/runq ... $d
}

with -n:

fn runq-a {
	dirs=(/mail/queue/*)
	for(d in $dirs) upas/runq ... $d &
	for(d in $dirs) wait
}


diff -r 808eb735fc45 sys/src/cmd/upas/q/runq.c
--- a/sys/src/cmd/upas/q/runq.c	Sat Jan 09 12:20:49 2021 -0800
+++ b/sys/src/cmd/upas/q/runq.c	Sat Jan 09 18:15:41 2021 -0800
@@ -1,9 +1,25 @@
 #include "common.h"
 #include <ctype.h>
 
+typedef struct Job Job;
+
+struct Job {
+	Job	*next;
+	int	pid;
+	int	ac;
+	int	dfd;
+	char	**av;
+	char	*buf;	/* backing for av */
+	Dir	*dp;	/* not owned */
+	Mlock	*l;
+	Biobuf	*b;
+};
+
 void	doalldirs(void);
 void	dodir(char*);
-void	dofile(Dir*);
+Job*	dofile(Dir*);
+Job*	donefile(Job*, Waitmsg*);
+void	freejob(Job*);
 void	rundir(char*);
 char*	file(char*, char);
 void	warning(char*, void*);
@@ -32,6 +48,7 @@
 char	**badsys;		/* array of recalcitrant systems */
 int	nbad;
 int	npid = 50;
+int	njob = 1;		/* number of concurrent jobs to invoke */
 int	sflag;			/* single thread per directory */
 int	aflag;			/* all directories */
 int	Eflag;			/* ignore E.xxxxxx dates */
@@ -40,7 +57,7 @@
 void
 usage(void)
 {
-	fprint(2, "usage: runq [-adsE] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] q-root cmd\n");
+	fprint(2, "usage: runq [-adsE] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] [-p njobs] q-root cmd\n");
 	exits("");
 }
 
@@ -86,6 +103,11 @@
 		if(qdir == 0)
 			usage();
 		break;
+	case 'p':
+		njob = atoi(ARGF());
+		if(njob == 0)
+			usage();
+		break;
 	case 'n':
 		npid = atoi(ARGF());
 		if(npid == 0)
@@ -234,8 +256,9 @@
 void
 rundir(char *name)
 {
-	int fd;
-	long i;
+	Job *hd, *j, **p;
+	int nlive, fidx, fd, found;
+	Waitmsg *w;
 
 	if(aflag && sflag)
 		fd = sysopenlocked(".", OREAD);
@@ -245,15 +268,47 @@
 		warning("reading %s", name);
 		return;
 	}
+	fidx= 0;
+	hd = nil;
+	nlive = 0;
 	nfiles = dirreadall(fd, &dirbuf);
-	if(nfiles > 0){
-		for(i=0; i<nfiles; i++){
-			if(dirbuf[i].name[0]!='C' || dirbuf[i].name[1]!='.')
+	while(nlive > 0 ||  fidx< nfiles){
+		while(fidx< nfiles && nlive < njob){
+			if(strncmp(dirbuf[fidx].name, "C.", 2) != 0){
+				fidx++;
 				continue;
-			dofile(&dirbuf[i]);
+			}
+			if((j = dofile(&dirbuf[fidx])) != nil){
+				nlive++;
+				j->next = hd;
+				hd = j;
+			}
+			fidx++;
 		}
-		free(dirbuf);
+		if(nlive == 0){
+			fprint(2, "nothing live\n");
+			break;
+		}
+rescan:
+		if((w = wait()) == nil){
+			syslog(0, "runq", "wait error: %r");
+			break;
+		}
+		found = 0;
+		for(p = &hd; *p != nil; p = &(*p)->next){
+			if(w->pid == (*p)->pid){
+				*p = donefile(*p, w);
+				found++;
+				nlive--;
+				break;
+			}
+		}
+		free(w);
+		if(!found)
+			goto rescan;
 	}
+	assert(hd == nil);
+	free(dirbuf);
 	if(aflag && sflag)
 		sysunlockfile(fd);
 	else
@@ -316,15 +371,14 @@
 /*
  *  try a message
  */
-void
+Job*
 dofile(Dir *dp)
 {
+	int dtime, efd, i, etime;
+	Job *j;
+//	Mlock *l;
 	Dir *d;
-	int dfd, ac, dtime, efd, pid, i, etime;
-	char *buf, *cp, **av;
-	Waitmsg *wm;
-	Biobuf *b;
-	Mlock *l = nil;
+	char *cp;
 
 	if(debug)
 		fprint(2, "dofile %s\n", dp->name);
@@ -337,14 +391,14 @@
 	if(d == nil){
 		syslog(0, runqlog, "no data file for %s", dp->name);
 		remmatch(dp->name);
-		return;
+		return nil;
 	}
 	if(dp->length == 0){
 		if(time(0)-dp->mtime > 15*60){
 			syslog(0, runqlog, "empty ctl file for %s", dp->name);
 			remmatch(dp->name);
 		}
-		return;
+		return nil;
 	}
 	dtime = d->mtime;
 	free(d);
@@ -358,31 +412,35 @@
 		if(etime - dtime < 60*60){
 			/* up to the first hour, try every 15 minutes */
 			if(time(0) - etime < 15*60)
-				return;
+				return nil;
 		} else {
 			/* after the first hour, try once an hour */
 			if(time(0) - etime < 60*60)
-				return;
+				return nil;
 		}
-
 	}
 
 	/*
 	 *  open control and data
 	 */
-	b = sysopen(file(dp->name, 'C'), "rl", 0660);
-	if(b == 0) {
+	j = malloc(sizeof(Job));
+	if(j == nil)
+		return nil;
+	memset(j, 0, sizeof(Job));
+	j->dp = dp;
+	j->dfd = -1;
+	j->b = sysopen(file(dp->name, 'C'), "rl", 0660);
+	if(j->b == 0) {
 		if(debug)
 			fprint(2, "can't open %s: %r\n", file(dp->name, 'C'));
-		return;
+		return nil;
 	}
-	dfd = open(file(dp->name, 'D'), OREAD);
-	if(dfd < 0){
+	j->dfd = open(file(dp->name, 'D'), OREAD);
+	if(j->dfd < 0){
 		if(debug)
 			fprint(2, "can't open %s: %r\n", file(dp->name, 'D'));
-		Bterm(b);
-		sysunlockfile(Bfildes(b));
-		return;
+		freejob(j);
+		return nil;
 	}
 
 	/*
@@ -390,48 +448,36 @@
 	 *	- read args into (malloc'd) buffer
 	 *	- malloc a vector and copy pointers to args into it
 	 */
-	buf = malloc(dp->length+1);
-	if(buf == 0){
+
+	j->buf = malloc(dp->length+1);
+	if(j->buf == nil){
 		warning("buffer allocation", 0);
-		Bterm(b);
-		sysunlockfile(Bfildes(b));
-		close(dfd);
-		return;
+		freejob(j);
+		return nil;
 	}
-	if(Bread(b, buf, dp->length) != dp->length){
+	if(Bread(j->b, j->buf, dp->length) != dp->length){
 		warning("reading control file %s\n", dp->name);
-		Bterm(b);
-		sysunlockfile(Bfildes(b));
-		close(dfd);
-		free(buf);
-		return;
+		freejob(j);
+		return nil;
 	}
-	buf[dp->length] = 0;
-	av = malloc(2*sizeof(char*));
-	if(av == 0){
+	j->buf[dp->length] = 0;
+	j->av = malloc(2*sizeof(char*));
+	if(j->av == 0){
 		warning("argv allocation", 0);
-		close(dfd);
-		free(buf);
-		Bterm(b);
-		sysunlockfile(Bfildes(b));
-		return;
+		freejob(j);
+		return nil;
 	}
-	for(ac = 1, cp = buf; *cp; ac++){
+	for(j->ac = 1, cp = j->buf; *cp; j->ac++){
 		while(isspace(*cp))
 			*cp++ = 0;
 		if(*cp == 0)
 			break;
 
-		av = realloc(av, (ac+2)*sizeof(char*));
-		if(av == 0){
+		j->av = realloc(j->av, (j->ac+2)*sizeof(char*));
+		if(j->av == 0){
 			warning("argv allocation", 0);
-			close(dfd);
-			free(buf);
-			Bterm(b);
-			sysunlockfile(Bfildes(b));
-			return;
 		}
-		av[ac] = cp;
+		j->av[j->ac] = cp;
 		while(*cp && !isspace(*cp)){
 			if(*cp++ == '"'){
 				while(*cp && *cp != '"')
@@ -441,18 +487,18 @@
 			}
 		}
 	}
-	av[0] = cmd;
-	av[ac] = 0;
+	j->av[0] = cmd;
+	j->av[j->ac] = 0;
 
 	if(!Eflag &&time(0) - dtime > giveup){
-		if(returnmail(av, dp->name, "Giveup") != 0)
-			logit("returnmail failed", dp->name, av);
+		if(returnmail(j->av, dp->name, "Giveup") != 0)
+			logit("returnmail failed", dp->name, j->av);
 		remmatch(dp->name);
 		goto done;
 	}
 
 	for(i = 0; i < nbad; i++){
-		if(strcmp(av[3], badsys[i]) == 0)
+		if(j->ac > 3 && strcmp(j->av[3], badsys[i]) == 0)
 			goto done;
 	}
 
@@ -460,33 +506,34 @@
 	 * Ken's fs, for example, gives us 5 minutes of inactivity before
 	 * the lock goes stale, so we have to keep reading it.
  	 */
-	l = keeplockalive(file(dp->name, 'C'), Bfildes(b));
+	j->l = keeplockalive(file(dp->name, 'C'), Bfildes(j->b));
 
 	/*
 	 *  transfer
 	 */
-	pid = fork();
-	switch(pid){
+	j->pid = fork();
+	switch(j->pid){
 	case -1:
-		sysunlock(l);
-		sysunlockfile(Bfildes(b));
+		sysunlock(j->l);
+		sysunlockfile(Bfildes(j->b));
 		syslog(0, runqlog, "out of procs");
 		exits(0);
 	case 0:
 		if(debug) {
-			fprint(2, "Starting %s", cmd);
-			for(ac = 0; av[ac]; ac++)
-				fprint(2, " %s", av[ac]);
-			fprint(2, "\n");
+			fprint(2, "Starting %s\n", cmd);
+//			for(i = 0; j->av[i]; i++)
+//				fprint(2, " %s", j->av[i]);
+//			fprint(2, "\n");
 		}
-		logit("execing", dp->name, av);
+		logit("execing", dp->name, j->av);
 		close(0);
-		dup(dfd, 0);
-		close(dfd);
+		dup(j->dfd, 0);
+		close(j->dfd);
 		close(2);
 		efd = open(file(dp->name, 'E'), OWRITE);
 		if(efd < 0){
-			if(debug) syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser());
+			if(debug)
+				syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser());
 			efd = create(file(dp->name, 'E'), OWRITE, 0666);
 			if(efd < 0){
 				if(debug) syslog(0, "runq", "create %s as %s: %r", file(dp->name, 'E'), getuser());
@@ -494,50 +541,62 @@
 			}
 		}
 		seek(efd, 0, 2);
-		exec(cmd, av);
+		exec(cmd, j->av);
 		error("can't exec %s", cmd);
 		break;
 	default:
-		for(;;){
-			wm = wait();
-			if(wm == nil)
-				error("wait failed: %r", "");
-			if(wm->pid == pid)
-				break;
-			free(wm);
-		}
-		if(debug)
-			fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg);
-
-		if(wm->msg[0]){
-			if(debug)
-				fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg);
-			if(!Rflag && strstr(wm->msg, "Retry")==0){
-				/* return the message and remove it */
-				if(returnmail(av, dp->name, wm->msg) != 0)
-					logit("returnmail failed", dp->name, av);
-				remmatch(dp->name);
-			} else {
-				/* add sys to bad list and try again later */
-				nbad++;
-				badsys = realloc(badsys, nbad*sizeof(char*));
-				badsys[nbad-1] = strdup(av[3]);
-			}
-		} else {
-			/* it worked remove the message */
-			remmatch(dp->name);
-		}
-		free(wm);
-
+		return j;
 	}
 done:
-	if (l)
-		sysunlock(l);
-	Bterm(b);
-	sysunlockfile(Bfildes(b));
-	free(buf);
-	free(av);
-	close(dfd);
+	freejob(j);
+	return nil;
+}
+
+Job*
+donefile(Job *j, Waitmsg *wm)
+{
+	Job *n;
+
+	if(debug)
+		fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg);
+
+	if(wm->msg[0]){
+		if(debug)
+			fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg);
+		if(!Rflag && strstr(wm->msg, "Retry")==0){
+			/* return the message and remove it */
+			if(returnmail(j->av, j->dp->name, wm->msg) != 0)
+				logit("returnmail failed", j->dp->name, j->av);
+			remmatch(j->dp->name);
+		} else {
+			/* add sys to bad list and try again later */
+			nbad++;
+			badsys = realloc(badsys, nbad*sizeof(char*));
+			badsys[nbad-1] = strdup(j->av[3]);
+		}
+	} else {
+		/* it worked remove the message */
+		remmatch(j->dp->name);
+	}
+	n = j->next;
+	freejob(j);
+	return n;
+}
+
+void
+freejob(Job *j)
+{
+	if(j->b != nil){
+		sysunlockfile(Bfildes(j->b));
+		Bterm(j->b);
+	}
+	if(j->dfd != -1)
+		close(j->dfd);
+	if(j->l != nil)
+		sysunlock(j->l);
+	free(j->buf);
+	free(j->av);
+	free(j);
 }
 
 

^ permalink raw reply	[flat|nested] 4+ messages in thread

* Re: [9front] upas/runq: parallel sending
  2021-01-10  2:27 [9front] upas/runq: parallel sending ori
@ 2021-01-10  8:04 ` hiro
  2021-01-17  0:42 ` ori
  1 sibling, 0 replies; 4+ messages in thread
From: hiro @ 2021-01-10  8:04 UTC (permalink / raw)
  To: 9front

well it took less than 600 seconds to receive all 3 mails on the
mailing list for me.
but it seems i'm missing 2 mails for rather hours or days in the meantime...
it feels like that is not just a delay but an error any more.

On 1/10/21, ori@eigenstate.org <ori@eigenstate.org> wrote:
> As you may have noticed, you've been getting a
> bunch of test messages from the 9front list. Sorry.
> We've been shaking out some performance issues on a
> new host.
>
> The mailing list was spawning a runq process for every
> subscriber, which were contending heavily on the queue
> directory, causing high system load. On the new host,
> disks were slow enough that this was causing issues.
>
> This was fixed by paring down to a single runq process,
> but that means that sending out emails for our hundreds
> of subscribers can take a while.
>
> This change can probably use some work, but it adds
> a '-p njob' flag, which allows us to send the jobs in
> a message queue with limited parallelism. That should
> bring down the amount of time it takes for a message
> to work its way through our mailing lists.
>
> I'd also like to float the idea of removing the '-a'
> flag, and reclaiming the '-n' flag for this purpose.
> If anyone is runnign upas/runq with -a, I propose
> this alternative:
>
> fn runq-a {
> 	dirs=(/mail/queue/*)
> 	for(d in $dirs) upas/runq ... $d
> }
>
> with -n:
>
> fn runq-a {
> 	dirs=(/mail/queue/*)
> 	for(d in $dirs) upas/runq ... $d &
> 	for(d in $dirs) wait
> }
>
>
> diff -r 808eb735fc45 sys/src/cmd/upas/q/runq.c
> --- a/sys/src/cmd/upas/q/runq.c	Sat Jan 09 12:20:49 2021 -0800
> +++ b/sys/src/cmd/upas/q/runq.c	Sat Jan 09 18:15:41 2021 -0800
> @@ -1,9 +1,25 @@
>  #include "common.h"
>  #include <ctype.h>
>
> +typedef struct Job Job;
> +
> +struct Job {
> +	Job	*next;
> +	int	pid;
> +	int	ac;
> +	int	dfd;
> +	char	**av;
> +	char	*buf;	/* backing for av */
> +	Dir	*dp;	/* not owned */
> +	Mlock	*l;
> +	Biobuf	*b;
> +};
> +
>  void	doalldirs(void);
>  void	dodir(char*);
> -void	dofile(Dir*);
> +Job*	dofile(Dir*);
> +Job*	donefile(Job*, Waitmsg*);
> +void	freejob(Job*);
>  void	rundir(char*);
>  char*	file(char*, char);
>  void	warning(char*, void*);
> @@ -32,6 +48,7 @@
>  char	**badsys;		/* array of recalcitrant systems */
>  int	nbad;
>  int	npid = 50;
> +int	njob = 1;		/* number of concurrent jobs to invoke */
>  int	sflag;			/* single thread per directory */
>  int	aflag;			/* all directories */
>  int	Eflag;			/* ignore E.xxxxxx dates */
> @@ -40,7 +57,7 @@
>  void
>  usage(void)
>  {
> -	fprint(2, "usage: runq [-adsE] [-q dir] [-l load] [-t time] [-r nfiles]
> [-n nprocs] q-root cmd\n");
> +	fprint(2, "usage: runq [-adsE] [-q dir] [-l load] [-t time] [-r nfiles]
> [-n nprocs] [-p njobs] q-root cmd\n");
>  	exits("");
>  }
>
> @@ -86,6 +103,11 @@
>  		if(qdir == 0)
>  			usage();
>  		break;
> +	case 'p':
> +		njob = atoi(ARGF());
> +		if(njob == 0)
> +			usage();
> +		break;
>  	case 'n':
>  		npid = atoi(ARGF());
>  		if(npid == 0)
> @@ -234,8 +256,9 @@
>  void
>  rundir(char *name)
>  {
> -	int fd;
> -	long i;
> +	Job *hd, *j, **p;
> +	int nlive, fidx, fd, found;
> +	Waitmsg *w;
>
>  	if(aflag && sflag)
>  		fd = sysopenlocked(".", OREAD);
> @@ -245,15 +268,47 @@
>  		warning("reading %s", name);
>  		return;
>  	}
> +	fidx= 0;
> +	hd = nil;
> +	nlive = 0;
>  	nfiles = dirreadall(fd, &dirbuf);
> -	if(nfiles > 0){
> -		for(i=0; i<nfiles; i++){
> -			if(dirbuf[i].name[0]!='C' || dirbuf[i].name[1]!='.')
> +	while(nlive > 0 ||  fidx< nfiles){
> +		while(fidx< nfiles && nlive < njob){
> +			if(strncmp(dirbuf[fidx].name, "C.", 2) != 0){
> +				fidx++;
>  				continue;
> -			dofile(&dirbuf[i]);
> +			}
> +			if((j = dofile(&dirbuf[fidx])) != nil){
> +				nlive++;
> +				j->next = hd;
> +				hd = j;
> +			}
> +			fidx++;
>  		}
> -		free(dirbuf);
> +		if(nlive == 0){
> +			fprint(2, "nothing live\n");
> +			break;
> +		}
> +rescan:
> +		if((w = wait()) == nil){
> +			syslog(0, "runq", "wait error: %r");
> +			break;
> +		}
> +		found = 0;
> +		for(p = &hd; *p != nil; p = &(*p)->next){
> +			if(w->pid == (*p)->pid){
> +				*p = donefile(*p, w);
> +				found++;
> +				nlive--;
> +				break;
> +			}
> +		}
> +		free(w);
> +		if(!found)
> +			goto rescan;
>  	}
> +	assert(hd == nil);
> +	free(dirbuf);
>  	if(aflag && sflag)
>  		sysunlockfile(fd);
>  	else
> @@ -316,15 +371,14 @@
>  /*
>   *  try a message
>   */
> -void
> +Job*
>  dofile(Dir *dp)
>  {
> +	int dtime, efd, i, etime;
> +	Job *j;
> +//	Mlock *l;
>  	Dir *d;
> -	int dfd, ac, dtime, efd, pid, i, etime;
> -	char *buf, *cp, **av;
> -	Waitmsg *wm;
> -	Biobuf *b;
> -	Mlock *l = nil;
> +	char *cp;
>
>  	if(debug)
>  		fprint(2, "dofile %s\n", dp->name);
> @@ -337,14 +391,14 @@
>  	if(d == nil){
>  		syslog(0, runqlog, "no data file for %s", dp->name);
>  		remmatch(dp->name);
> -		return;
> +		return nil;
>  	}
>  	if(dp->length == 0){
>  		if(time(0)-dp->mtime > 15*60){
>  			syslog(0, runqlog, "empty ctl file for %s", dp->name);
>  			remmatch(dp->name);
>  		}
> -		return;
> +		return nil;
>  	}
>  	dtime = d->mtime;
>  	free(d);
> @@ -358,31 +412,35 @@
>  		if(etime - dtime < 60*60){
>  			/* up to the first hour, try every 15 minutes */
>  			if(time(0) - etime < 15*60)
> -				return;
> +				return nil;
>  		} else {
>  			/* after the first hour, try once an hour */
>  			if(time(0) - etime < 60*60)
> -				return;
> +				return nil;
>  		}
> -
>  	}
>
>  	/*
>  	 *  open control and data
>  	 */
> -	b = sysopen(file(dp->name, 'C'), "rl", 0660);
> -	if(b == 0) {
> +	j = malloc(sizeof(Job));
> +	if(j == nil)
> +		return nil;
> +	memset(j, 0, sizeof(Job));
> +	j->dp = dp;
> +	j->dfd = -1;
> +	j->b = sysopen(file(dp->name, 'C'), "rl", 0660);
> +	if(j->b == 0) {
>  		if(debug)
>  			fprint(2, "can't open %s: %r\n", file(dp->name, 'C'));
> -		return;
> +		return nil;
>  	}
> -	dfd = open(file(dp->name, 'D'), OREAD);
> -	if(dfd < 0){
> +	j->dfd = open(file(dp->name, 'D'), OREAD);
> +	if(j->dfd < 0){
>  		if(debug)
>  			fprint(2, "can't open %s: %r\n", file(dp->name, 'D'));
> -		Bterm(b);
> -		sysunlockfile(Bfildes(b));
> -		return;
> +		freejob(j);
> +		return nil;
>  	}
>
>  	/*
> @@ -390,48 +448,36 @@
>  	 *	- read args into (malloc'd) buffer
>  	 *	- malloc a vector and copy pointers to args into it
>  	 */
> -	buf = malloc(dp->length+1);
> -	if(buf == 0){
> +
> +	j->buf = malloc(dp->length+1);
> +	if(j->buf == nil){
>  		warning("buffer allocation", 0);
> -		Bterm(b);
> -		sysunlockfile(Bfildes(b));
> -		close(dfd);
> -		return;
> +		freejob(j);
> +		return nil;
>  	}
> -	if(Bread(b, buf, dp->length) != dp->length){
> +	if(Bread(j->b, j->buf, dp->length) != dp->length){
>  		warning("reading control file %s\n", dp->name);
> -		Bterm(b);
> -		sysunlockfile(Bfildes(b));
> -		close(dfd);
> -		free(buf);
> -		return;
> +		freejob(j);
> +		return nil;
>  	}
> -	buf[dp->length] = 0;
> -	av = malloc(2*sizeof(char*));
> -	if(av == 0){
> +	j->buf[dp->length] = 0;
> +	j->av = malloc(2*sizeof(char*));
> +	if(j->av == 0){
>  		warning("argv allocation", 0);
> -		close(dfd);
> -		free(buf);
> -		Bterm(b);
> -		sysunlockfile(Bfildes(b));
> -		return;
> +		freejob(j);
> +		return nil;
>  	}
> -	for(ac = 1, cp = buf; *cp; ac++){
> +	for(j->ac = 1, cp = j->buf; *cp; j->ac++){
>  		while(isspace(*cp))
>  			*cp++ = 0;
>  		if(*cp == 0)
>  			break;
>
> -		av = realloc(av, (ac+2)*sizeof(char*));
> -		if(av == 0){
> +		j->av = realloc(j->av, (j->ac+2)*sizeof(char*));
> +		if(j->av == 0){
>  			warning("argv allocation", 0);
> -			close(dfd);
> -			free(buf);
> -			Bterm(b);
> -			sysunlockfile(Bfildes(b));
> -			return;
>  		}
> -		av[ac] = cp;
> +		j->av[j->ac] = cp;
>  		while(*cp && !isspace(*cp)){
>  			if(*cp++ == '"'){
>  				while(*cp && *cp != '"')
> @@ -441,18 +487,18 @@
>  			}
>  		}
>  	}
> -	av[0] = cmd;
> -	av[ac] = 0;
> +	j->av[0] = cmd;
> +	j->av[j->ac] = 0;
>
>  	if(!Eflag &&time(0) - dtime > giveup){
> -		if(returnmail(av, dp->name, "Giveup") != 0)
> -			logit("returnmail failed", dp->name, av);
> +		if(returnmail(j->av, dp->name, "Giveup") != 0)
> +			logit("returnmail failed", dp->name, j->av);
>  		remmatch(dp->name);
>  		goto done;
>  	}
>
>  	for(i = 0; i < nbad; i++){
> -		if(strcmp(av[3], badsys[i]) == 0)
> +		if(j->ac > 3 && strcmp(j->av[3], badsys[i]) == 0)
>  			goto done;
>  	}
>
> @@ -460,33 +506,34 @@
>  	 * Ken's fs, for example, gives us 5 minutes of inactivity before
>  	 * the lock goes stale, so we have to keep reading it.
>   	 */
> -	l = keeplockalive(file(dp->name, 'C'), Bfildes(b));
> +	j->l = keeplockalive(file(dp->name, 'C'), Bfildes(j->b));
>
>  	/*
>  	 *  transfer
>  	 */
> -	pid = fork();
> -	switch(pid){
> +	j->pid = fork();
> +	switch(j->pid){
>  	case -1:
> -		sysunlock(l);
> -		sysunlockfile(Bfildes(b));
> +		sysunlock(j->l);
> +		sysunlockfile(Bfildes(j->b));
>  		syslog(0, runqlog, "out of procs");
>  		exits(0);
>  	case 0:
>  		if(debug) {
> -			fprint(2, "Starting %s", cmd);
> -			for(ac = 0; av[ac]; ac++)
> -				fprint(2, " %s", av[ac]);
> -			fprint(2, "\n");
> +			fprint(2, "Starting %s\n", cmd);
> +//			for(i = 0; j->av[i]; i++)
> +//				fprint(2, " %s", j->av[i]);
> +//			fprint(2, "\n");
>  		}
> -		logit("execing", dp->name, av);
> +		logit("execing", dp->name, j->av);
>  		close(0);
> -		dup(dfd, 0);
> -		close(dfd);
> +		dup(j->dfd, 0);
> +		close(j->dfd);
>  		close(2);
>  		efd = open(file(dp->name, 'E'), OWRITE);
>  		if(efd < 0){
> -			if(debug) syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'),
> getuser());
> +			if(debug)
> +				syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser());
>  			efd = create(file(dp->name, 'E'), OWRITE, 0666);
>  			if(efd < 0){
>  				if(debug) syslog(0, "runq", "create %s as %s: %r", file(dp->name, 'E'),
> getuser());
> @@ -494,50 +541,62 @@
>  			}
>  		}
>  		seek(efd, 0, 2);
> -		exec(cmd, av);
> +		exec(cmd, j->av);
>  		error("can't exec %s", cmd);
>  		break;
>  	default:
> -		for(;;){
> -			wm = wait();
> -			if(wm == nil)
> -				error("wait failed: %r", "");
> -			if(wm->pid == pid)
> -				break;
> -			free(wm);
> -		}
> -		if(debug)
> -			fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg);
> -
> -		if(wm->msg[0]){
> -			if(debug)
> -				fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg);
> -			if(!Rflag && strstr(wm->msg, "Retry")==0){
> -				/* return the message and remove it */
> -				if(returnmail(av, dp->name, wm->msg) != 0)
> -					logit("returnmail failed", dp->name, av);
> -				remmatch(dp->name);
> -			} else {
> -				/* add sys to bad list and try again later */
> -				nbad++;
> -				badsys = realloc(badsys, nbad*sizeof(char*));
> -				badsys[nbad-1] = strdup(av[3]);
> -			}
> -		} else {
> -			/* it worked remove the message */
> -			remmatch(dp->name);
> -		}
> -		free(wm);
> -
> +		return j;
>  	}
>  done:
> -	if (l)
> -		sysunlock(l);
> -	Bterm(b);
> -	sysunlockfile(Bfildes(b));
> -	free(buf);
> -	free(av);
> -	close(dfd);
> +	freejob(j);
> +	return nil;
> +}
> +
> +Job*
> +donefile(Job *j, Waitmsg *wm)
> +{
> +	Job *n;
> +
> +	if(debug)
> +		fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg);
> +
> +	if(wm->msg[0]){
> +		if(debug)
> +			fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg);
> +		if(!Rflag && strstr(wm->msg, "Retry")==0){
> +			/* return the message and remove it */
> +			if(returnmail(j->av, j->dp->name, wm->msg) != 0)
> +				logit("returnmail failed", j->dp->name, j->av);
> +			remmatch(j->dp->name);
> +		} else {
> +			/* add sys to bad list and try again later */
> +			nbad++;
> +			badsys = realloc(badsys, nbad*sizeof(char*));
> +			badsys[nbad-1] = strdup(j->av[3]);
> +		}
> +	} else {
> +		/* it worked remove the message */
> +		remmatch(j->dp->name);
> +	}
> +	n = j->next;
> +	freejob(j);
> +	return n;
> +}
> +
> +void
> +freejob(Job *j)
> +{
> +	if(j->b != nil){
> +		sysunlockfile(Bfildes(j->b));
> +		Bterm(j->b);
> +	}
> +	if(j->dfd != -1)
> +		close(j->dfd);
> +	if(j->l != nil)
> +		sysunlock(j->l);
> +	free(j->buf);
> +	free(j->av);
> +	free(j);
>  }
>
>
>

^ permalink raw reply	[flat|nested] 4+ messages in thread

* Re: [9front] upas/runq: parallel sending
  2021-01-10  2:27 [9front] upas/runq: parallel sending ori
  2021-01-10  8:04 ` hiro
@ 2021-01-17  0:42 ` ori
  2021-01-17  1:06   ` ori
  1 sibling, 1 reply; 4+ messages in thread
From: ori @ 2021-01-17  0:42 UTC (permalink / raw)
  To: 9front

I've been running the previous patch for the
last week without problems: has anyone else
had a chance to test that it doesn't break
their systems?

Quoth ori@eigenstate.org:
> I'd also like to float the idea of removing the '-a'
> flag, and reclaiming the '-n' flag for this purpose.
> If anyone is runnign upas/runq with -a, I propose
> this alternative:
> 

Here's a more agressive version that removes
the multi-user queue sweeping code. I doubt
that anyone is using it. If they are, I'd be
interested in knowing if there's an issue with
migrating to xargs or a shell loop that runs
each job in parallel by directory.

If there are no objections, I'm going to let
this version bake for a week on my own system,
and then commit next weekend.

diff -r 24a38dd884b0 sys/src/cmd/upas/q/runq.c
--- a/sys/src/cmd/upas/q/runq.c	Sat Jan 16 16:17:27 2021 -0800
+++ b/sys/src/cmd/upas/q/runq.c	Sat Jan 16 16:40:41 2021 -0800
@@ -1,9 +1,25 @@
 #include "common.h"
 #include <ctype.h>
 
+typedef struct Job Job;
+
+struct Job {
+	Job	*next;
+	int	pid;
+	int	ac;
+	int	dfd;
+	char	**av;
+	char	*buf;	/* backing for av */
+	Dir	*dp;	/* not owned */
+	Mlock	*l;
+	Biobuf	*b;
+};
+
 void	doalldirs(void);
 void	dodir(char*);
-void	dofile(Dir*);
+Job*	dofile(Dir*);
+Job*	donefile(Job*, Waitmsg*);
+void	freejob(Job*);
 void	rundir(char*);
 char*	file(char*, char);
 void	warning(char*, void*);
@@ -17,7 +33,6 @@
 char	*root;
 int	debug;
 int	giveup = 2*24*60*60;
-int	load;
 int	limit;
 
 /* the current directory */
@@ -28,67 +43,48 @@
 
 char *runqlog = "runq";
 
-int	*pidlist;
 char	**badsys;		/* array of recalcitrant systems */
 int	nbad;
-int	npid = 50;
-int	sflag;			/* single thread per directory */
-int	aflag;			/* all directories */
+int	njob = 1;		/* number of concurrent jobs to invoke */
 int	Eflag;			/* ignore E.xxxxxx dates */
 int	Rflag;			/* no giving up, ever */
 
 void
 usage(void)
 {
-	fprint(2, "usage: runq [-adsE] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] q-root cmd\n");
+	fprint(2, "usage: runq [-adsE] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] [-p njobs] q-root cmd\n");
 	exits("");
 }
 
 void
 main(int argc, char **argv)
 {
-	char *qdir, *x;
+	char *qdir;
 
 	qdir = 0;
 
 	ARGBEGIN{
-	case 'l':
-		x = ARGF();
-		if(x == 0)
-			usage();
-		load = atoi(x);
-		if(load < 0)
-			load = 0;
-		break;
 	case 'E':
 		Eflag++;
 		break;
 	case 'R':	/* no giving up -- just leave stuff in the queue */
 		Rflag++;
 		break;
-	case 'a':
-		aflag++;
-		break;
 	case 'd':
 		debug++;
 		break;
 	case 'r':
-		limit = atoi(ARGF());
-		break;
-	case 's':
-		sflag++;
+		limit = atoi(EARGF(usage()));
 		break;
 	case 't':
-		giveup = 60*60*atoi(ARGF());
+		giveup = 60*60*atoi(EARGF(usage()));
 		break;
 	case 'q':
-		qdir = ARGF();
-		if(qdir == 0)
-			usage();
+		qdir = EARGF(usage());
 		break;
 	case 'n':
-		npid = atoi(ARGF());
-		if(npid == 0)
+		njob = atoi(EARGF(usage()));
+		if(njob == 0)
 			usage();
 		break;
 	}ARGEND;
@@ -96,27 +92,17 @@
 	if(argc != 2)
 		usage();
 
-	pidlist = malloc(npid*sizeof(*pidlist));
-	if(pidlist == 0)
-		error("can't malloc", 0);
-
-	if(aflag == 0 && qdir == 0) {
+	if(qdir == nil) 
 		qdir = getuser();
-		if(qdir == 0)
-			error("unknown user", 0);
-	}
+	if(qdir == nil)
+		error("unknown user", 0);
 	root = argv[0];
 	cmd = argv[1];
 
 	if(chdir(root) < 0)
 		error("can't cd to %s", root);
 
-	doload(1);
-	if(aflag)
-		doalldirs();
-	else
-		dodir(qdir);
-	doload(0);
+	dodir(qdir);
 	exits(0);
 }
 
@@ -142,74 +128,6 @@
 	return 0;
 }
 
-int
-forkltd(void)
-{
-	int i;
-	int pid;
-
-	for(i = 0; i < npid; i++){
-		if(pidlist[i] <= 0)
-			break;
-	}
-
-	while(i >= npid){
-		pid = waitpid();
-		if(pid < 0){
-			syslog(0, runqlog, "forkltd confused");
-			exits(0);
-		}
-
-		for(i = 0; i < npid; i++)
-			if(pidlist[i] == pid)
-				break;
-	}
-	pidlist[i] = fork();
-	return pidlist[i];
-}
-
-/*
- *  run all user directories, must be bootes (or root on unix) to do this
- */
-void
-doalldirs(void)
-{
-	Dir *db;
-	int fd;
-	long i, n;
-
-
-	fd = open(".", OREAD);
-	if(fd == -1){
-		warning("reading %s", root);
-		return;
-	}
-	n = dirreadall(fd, &db);
-	if(n > 0){
-		for(i=0; i<n; i++){
-			if(db[i].qid.type & QTDIR){
-				if(emptydir(db[i].name))
-					continue;
-				switch(forkltd()){
-				case -1:
-					syslog(0, runqlog, "out of procs");
-					doload(0);
-					exits(0);
-				case 0:
-					if(sysdetach() < 0)
-						error("%r", 0);
-					dodir(db[i].name);
-					exits(0);
-				default:
-					break;
-				}
-			}
-		}
-		free(db);
-	}
-	close(fd);
-}
-
 /*
  *  cd to a user directory and run it
  */
@@ -234,30 +152,57 @@
 void
 rundir(char *name)
 {
-	int fd;
-	long i;
+	Job *hd, *j, **p;
+	int nlive, fidx, fd, found;
+	Waitmsg *w;
 
-	if(aflag && sflag)
-		fd = sysopenlocked(".", OREAD);
-	else
-		fd = open(".", OREAD);
+	fd = open(".", OREAD);
 	if(fd == -1){
 		warning("reading %s", name);
 		return;
 	}
+	fidx= 0;
+	hd = nil;
+	nlive = 0;
 	nfiles = dirreadall(fd, &dirbuf);
-	if(nfiles > 0){
-		for(i=0; i<nfiles; i++){
-			if(dirbuf[i].name[0]!='C' || dirbuf[i].name[1]!='.')
+	while(nlive > 0 ||  fidx< nfiles){
+		while(fidx< nfiles && nlive < njob){
+			if(strncmp(dirbuf[fidx].name, "C.", 2) != 0){
+				fidx++;
 				continue;
-			dofile(&dirbuf[i]);
+			}
+			if((j = dofile(&dirbuf[fidx])) != nil){
+				nlive++;
+				j->next = hd;
+				hd = j;
+			}
+			fidx++;
 		}
-		free(dirbuf);
+		if(nlive == 0){
+			fprint(2, "nothing live\n");
+			break;
+		}
+rescan:
+		if((w = wait()) == nil){
+			syslog(0, "runq", "wait error: %r");
+			break;
+		}
+		found = 0;
+		for(p = &hd; *p != nil; p = &(*p)->next){
+			if(w->pid == (*p)->pid){
+				*p = donefile(*p, w);
+				found++;
+				nlive--;
+				break;
+			}
+		}
+		free(w);
+		if(!found)
+			goto rescan;
 	}
-	if(aflag && sflag)
-		sysunlockfile(fd);
-	else
-		close(fd);
+	assert(hd == nil);
+	free(dirbuf);
+	close(fd);
 }
 
 /*
@@ -316,15 +261,14 @@
 /*
  *  try a message
  */
-void
+Job*
 dofile(Dir *dp)
 {
+	int dtime, efd, i, etime;
+	Job *j;
+//	Mlock *l;
 	Dir *d;
-	int dfd, ac, dtime, efd, pid, i, etime;
-	char *buf, *cp, **av;
-	Waitmsg *wm;
-	Biobuf *b;
-	Mlock *l = nil;
+	char *cp;
 
 	if(debug)
 		fprint(2, "dofile %s\n", dp->name);
@@ -337,14 +281,14 @@
 	if(d == nil){
 		syslog(0, runqlog, "no data file for %s", dp->name);
 		remmatch(dp->name);
-		return;
+		return nil;
 	}
 	if(dp->length == 0){
 		if(time(0)-dp->mtime > 15*60){
 			syslog(0, runqlog, "empty ctl file for %s", dp->name);
 			remmatch(dp->name);
 		}
-		return;
+		return nil;
 	}
 	dtime = d->mtime;
 	free(d);
@@ -358,31 +302,35 @@
 		if(etime - dtime < 60*60){
 			/* up to the first hour, try every 15 minutes */
 			if(time(0) - etime < 15*60)
-				return;
+				return nil;
 		} else {
 			/* after the first hour, try once an hour */
 			if(time(0) - etime < 60*60)
-				return;
+				return nil;
 		}
-
 	}
 
 	/*
 	 *  open control and data
 	 */
-	b = sysopen(file(dp->name, 'C'), "rl", 0660);
-	if(b == 0) {
+	j = malloc(sizeof(Job));
+	if(j == nil)
+		return nil;
+	memset(j, 0, sizeof(Job));
+	j->dp = dp;
+	j->dfd = -1;
+	j->b = sysopen(file(dp->name, 'C'), "rl", 0660);
+	if(j->b == 0) {
 		if(debug)
 			fprint(2, "can't open %s: %r\n", file(dp->name, 'C'));
-		return;
+		return nil;
 	}
-	dfd = open(file(dp->name, 'D'), OREAD);
-	if(dfd < 0){
+	j->dfd = open(file(dp->name, 'D'), OREAD);
+	if(j->dfd < 0){
 		if(debug)
 			fprint(2, "can't open %s: %r\n", file(dp->name, 'D'));
-		Bterm(b);
-		sysunlockfile(Bfildes(b));
-		return;
+		freejob(j);
+		return nil;
 	}
 
 	/*
@@ -390,48 +338,36 @@
 	 *	- read args into (malloc'd) buffer
 	 *	- malloc a vector and copy pointers to args into it
 	 */
-	buf = malloc(dp->length+1);
-	if(buf == 0){
+
+	j->buf = malloc(dp->length+1);
+	if(j->buf == nil){
 		warning("buffer allocation", 0);
-		Bterm(b);
-		sysunlockfile(Bfildes(b));
-		close(dfd);
-		return;
+		freejob(j);
+		return nil;
 	}
-	if(Bread(b, buf, dp->length) != dp->length){
+	if(Bread(j->b, j->buf, dp->length) != dp->length){
 		warning("reading control file %s\n", dp->name);
-		Bterm(b);
-		sysunlockfile(Bfildes(b));
-		close(dfd);
-		free(buf);
-		return;
+		freejob(j);
+		return nil;
 	}
-	buf[dp->length] = 0;
-	av = malloc(2*sizeof(char*));
-	if(av == 0){
+	j->buf[dp->length] = 0;
+	j->av = malloc(2*sizeof(char*));
+	if(j->av == 0){
 		warning("argv allocation", 0);
-		close(dfd);
-		free(buf);
-		Bterm(b);
-		sysunlockfile(Bfildes(b));
-		return;
+		freejob(j);
+		return nil;
 	}
-	for(ac = 1, cp = buf; *cp; ac++){
+	for(j->ac = 1, cp = j->buf; *cp; j->ac++){
 		while(isspace(*cp))
 			*cp++ = 0;
 		if(*cp == 0)
 			break;
 
-		av = realloc(av, (ac+2)*sizeof(char*));
-		if(av == 0){
+		j->av = realloc(j->av, (j->ac+2)*sizeof(char*));
+		if(j->av == 0){
 			warning("argv allocation", 0);
-			close(dfd);
-			free(buf);
-			Bterm(b);
-			sysunlockfile(Bfildes(b));
-			return;
 		}
-		av[ac] = cp;
+		j->av[j->ac] = cp;
 		while(*cp && !isspace(*cp)){
 			if(*cp++ == '"'){
 				while(*cp && *cp != '"')
@@ -441,18 +377,18 @@
 			}
 		}
 	}
-	av[0] = cmd;
-	av[ac] = 0;
+	j->av[0] = cmd;
+	j->av[j->ac] = 0;
 
 	if(!Eflag &&time(0) - dtime > giveup){
-		if(returnmail(av, dp->name, "Giveup") != 0)
-			logit("returnmail failed", dp->name, av);
+		if(returnmail(j->av, dp->name, "Giveup") != 0)
+			logit("returnmail failed", dp->name, j->av);
 		remmatch(dp->name);
 		goto done;
 	}
 
 	for(i = 0; i < nbad; i++){
-		if(strcmp(av[3], badsys[i]) == 0)
+		if(j->ac > 3 && strcmp(j->av[3], badsys[i]) == 0)
 			goto done;
 	}
 
@@ -460,33 +396,34 @@
 	 * Ken's fs, for example, gives us 5 minutes of inactivity before
 	 * the lock goes stale, so we have to keep reading it.
  	 */
-	l = keeplockalive(file(dp->name, 'C'), Bfildes(b));
+	j->l = keeplockalive(file(dp->name, 'C'), Bfildes(j->b));
 
 	/*
 	 *  transfer
 	 */
-	pid = fork();
-	switch(pid){
+	j->pid = fork();
+	switch(j->pid){
 	case -1:
-		sysunlock(l);
-		sysunlockfile(Bfildes(b));
+		sysunlock(j->l);
+		sysunlockfile(Bfildes(j->b));
 		syslog(0, runqlog, "out of procs");
 		exits(0);
 	case 0:
 		if(debug) {
-			fprint(2, "Starting %s", cmd);
-			for(ac = 0; av[ac]; ac++)
-				fprint(2, " %s", av[ac]);
-			fprint(2, "\n");
+			fprint(2, "Starting %s\n", cmd);
+//			for(i = 0; j->av[i]; i++)
+//				fprint(2, " %s", j->av[i]);
+//			fprint(2, "\n");
 		}
-		logit("execing", dp->name, av);
+		logit("execing", dp->name, j->av);
 		close(0);
-		dup(dfd, 0);
-		close(dfd);
+		dup(j->dfd, 0);
+		close(j->dfd);
 		close(2);
 		efd = open(file(dp->name, 'E'), OWRITE);
 		if(efd < 0){
-			if(debug) syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser());
+			if(debug)
+				syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser());
 			efd = create(file(dp->name, 'E'), OWRITE, 0666);
 			if(efd < 0){
 				if(debug) syslog(0, "runq", "create %s as %s: %r", file(dp->name, 'E'), getuser());
@@ -494,50 +431,62 @@
 			}
 		}
 		seek(efd, 0, 2);
-		exec(cmd, av);
+		exec(cmd, j->av);
 		error("can't exec %s", cmd);
 		break;
 	default:
-		for(;;){
-			wm = wait();
-			if(wm == nil)
-				error("wait failed: %r", "");
-			if(wm->pid == pid)
-				break;
-			free(wm);
-		}
-		if(debug)
-			fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg);
-
-		if(wm->msg[0]){
-			if(debug)
-				fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg);
-			if(!Rflag && strstr(wm->msg, "Retry")==0){
-				/* return the message and remove it */
-				if(returnmail(av, dp->name, wm->msg) != 0)
-					logit("returnmail failed", dp->name, av);
-				remmatch(dp->name);
-			} else {
-				/* add sys to bad list and try again later */
-				nbad++;
-				badsys = realloc(badsys, nbad*sizeof(char*));
-				badsys[nbad-1] = strdup(av[3]);
-			}
-		} else {
-			/* it worked remove the message */
-			remmatch(dp->name);
-		}
-		free(wm);
-
+		return j;
 	}
 done:
-	if (l)
-		sysunlock(l);
-	Bterm(b);
-	sysunlockfile(Bfildes(b));
-	free(buf);
-	free(av);
-	close(dfd);
+	freejob(j);
+	return nil;
+}
+
+Job*
+donefile(Job *j, Waitmsg *wm)
+{
+	Job *n;
+
+	if(debug)
+		fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg);
+
+	if(wm->msg[0]){
+		if(debug)
+			fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg);
+		if(!Rflag && strstr(wm->msg, "Retry")==0){
+			/* return the message and remove it */
+			if(returnmail(j->av, j->dp->name, wm->msg) != 0)
+				logit("returnmail failed", j->dp->name, j->av);
+			remmatch(j->dp->name);
+		} else {
+			/* add sys to bad list and try again later */
+			nbad++;
+			badsys = realloc(badsys, nbad*sizeof(char*));
+			badsys[nbad-1] = strdup(j->av[3]);
+		}
+	} else {
+		/* it worked remove the message */
+		remmatch(j->dp->name);
+	}
+	n = j->next;
+	freejob(j);
+	return n;
+}
+
+void
+freejob(Job *j)
+{
+	if(j->b != nil){
+		sysunlockfile(Bfildes(j->b));
+		Bterm(j->b);
+	}
+	if(j->dfd != -1)
+		close(j->dfd);
+	if(j->l != nil)
+		sysunlock(j->l);
+	free(j->buf);
+	free(j->av);
+	free(j);
 }
 
 
@@ -691,71 +640,3 @@
 	}
 	syslog(0, runqlog, "%s", buf);
 }
-
-char *loadfile = ".runqload";
-
-/*
- *  load balancing
- */
-void
-doload(int start)
-{
-	int fd;
-	char buf[32];
-	int i, n;
-	Mlock *l;
-	Dir *d;
-
-	if(load <= 0)
-		return;
-
-	if(chdir(root) < 0){
-		load = 0;
-		return;
-	}
-
-	l = syslock(loadfile);
-	fd = open(loadfile, ORDWR);
-	if(fd < 0){
-		fd = create(loadfile, 0666, ORDWR);
-		if(fd < 0){
-			load = 0;
-			sysunlock(l);
-			return;
-		}
-	}
-
-	/* get current load */
-	i = 0;
-	n = read(fd, buf, sizeof(buf)-1);
-	if(n >= 0){
-		buf[n] = 0;
-		i = atoi(buf);
-	}
-	if(i < 0)
-		i = 0;
-
-	/* ignore load if file hasn't been changed in 30 minutes */
-	d = dirfstat(fd);
-	if(d != nil){
-		if(d->mtime + 30*60 < time(0))
-			i = 0;
-		free(d);
-	}
-
-	/* if load already too high, give up */
-	if(start && i >= load){
-		sysunlock(l);
-		exits(0);
-	}
-
-	/* increment/decrement load */
-	if(start)
-		i++;
-	else
-		i--;
-	seek(fd, 0, 0);
-	fprint(fd, "%d\n", i);
-	sysunlock(l);
-	close(fd);
-}

^ permalink raw reply	[flat|nested] 4+ messages in thread

* Re: [9front] upas/runq: parallel sending
  2021-01-17  0:42 ` ori
@ 2021-01-17  1:06   ` ori
  0 siblings, 0 replies; 4+ messages in thread
From: ori @ 2021-01-17  1:06 UTC (permalink / raw)
  To: 9front

Quoth ori@eigenstate.org:
> I've been running the previous patch for the
> last week without problems: has anyone else
> had a chance to test that it doesn't break
> their systems?
> 
> Quoth ori@eigenstate.org:
> > I'd also like to float the idea of removing the '-a'
> > flag, and reclaiming the '-n' flag for this purpose.
> > If anyone is runnign upas/runq with -a, I propose
> > this alternative:
> > 
> 
> Here's a more agressive version that removes
> the multi-user queue sweeping code. I doubt
> that anyone is using it. If they are, I'd be
> interested in knowing if there's an issue with
> migrating to xargs or a shell loop that runs
> each job in parallel by directory.
> 
> If there are no objections, I'm going to let
> this version bake for a week on my own system,
> and then commit next weekend.

And, another update with some commented
code removed and docs updated.


diff -r 24a38dd884b0 sys/man/8/qer
--- a/sys/man/8/qer	Sat Jan 16 16:17:27 2021 -0800
+++ b/sys/man/8/qer	Sat Jan 16 17:05:12 2021 -0800
@@ -15,7 +15,7 @@
 .br
 .B runq
 [
-.B -adsER
+.B -dER
 ]
 [
 .B -f
@@ -26,10 +26,6 @@
 .I subdir
 ]
 [
-.B -l
-.I load
-]
-[
 .B -t
 .I time
 ]
@@ -39,7 +35,7 @@
 ]
 [
 .B -n
-.I nprocs
+.I njobs
 ]
 .I root cmd
 .SH DESCRIPTION
@@ -84,10 +80,7 @@
 .I Runq
 processes the files queued by
 .IR qer .
-Without the
-.B -a
-option,
-.I runq
+.I Runq
 processes all requests in the directory
 .IR root / subdir ,
 where
@@ -96,9 +89,6 @@
 .B -q
 if present, else the contents of
 .BR /dev/user .
-With the
-.B -a
-it processes all requests.
 Each request is processed by executing the command
 .I cmd
 with the contents of the control file as its arguments,
@@ -172,31 +162,12 @@
 .I -q
 flag.
 .P
-The
-.BR -s ,
-.BR -n ,
-and
-.B -l
-flags are only meaningful with the
-.B -a
-flag.  They control amount of parallelism that
-is used when sweeping all of the queues.  The argument following the
+The argument following the
 .B -n
-flag specifies the number of queues that are swept
-in parallel; the default is 50.  The argument following the
-.B -l
-flag specifies the total number of queues that are being swept.
-By default, there is no limit.  The number of active sweeps
-is cumulative over all active executions of
-.IR runq .
-The
-.B -s
-flag forces each queue directory to be processed by exactly
-one instance of
-.IR runq .
-This is useful on systems that connect to slow
-external systems and prevents all the queue sweeps from
-piling up trying to process a few slow systems.
+flag specifies the number of queued jobs that are processed
+in parallel from the queue; the default is 1.
+This is useful for a large queue to be processed with a bounded
+amount of parallelism.
 .PP
 .I Runq
 is often called from
diff -r 24a38dd884b0 sys/src/cmd/upas/q/runq.c
--- a/sys/src/cmd/upas/q/runq.c	Sat Jan 16 16:17:27 2021 -0800
+++ b/sys/src/cmd/upas/q/runq.c	Sat Jan 16 17:05:12 2021 -0800
@@ -1,9 +1,25 @@
 #include "common.h"
 #include <ctype.h>
 
+typedef struct Job Job;
+
+struct Job {
+	Job	*next;
+	int	pid;
+	int	ac;
+	int	dfd;
+	char	**av;
+	char	*buf;	/* backing for av */
+	Dir	*dp;	/* not owned */
+	Mlock	*l;
+	Biobuf	*b;
+};
+
 void	doalldirs(void);
 void	dodir(char*);
-void	dofile(Dir*);
+Job*	dofile(Dir*);
+Job*	donefile(Job*, Waitmsg*);
+void	freejob(Job*);
 void	rundir(char*);
 char*	file(char*, char);
 void	warning(char*, void*);
@@ -17,7 +33,6 @@
 char	*root;
 int	debug;
 int	giveup = 2*24*60*60;
-int	load;
 int	limit;
 
 /* the current directory */
@@ -28,67 +43,48 @@
 
 char *runqlog = "runq";
 
-int	*pidlist;
 char	**badsys;		/* array of recalcitrant systems */
 int	nbad;
-int	npid = 50;
-int	sflag;			/* single thread per directory */
-int	aflag;			/* all directories */
+int	njob = 1;		/* number of concurrent jobs to invoke */
 int	Eflag;			/* ignore E.xxxxxx dates */
 int	Rflag;			/* no giving up, ever */
 
 void
 usage(void)
 {
-	fprint(2, "usage: runq [-adsE] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] q-root cmd\n");
-	exits("");
+	fprint(2, "usage: runq [-dE] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] q-root cmd\n");
+	exits("usage");
 }
 
 void
 main(int argc, char **argv)
 {
-	char *qdir, *x;
+	char *qdir;
 
 	qdir = 0;
 
 	ARGBEGIN{
-	case 'l':
-		x = ARGF();
-		if(x == 0)
-			usage();
-		load = atoi(x);
-		if(load < 0)
-			load = 0;
-		break;
 	case 'E':
 		Eflag++;
 		break;
 	case 'R':	/* no giving up -- just leave stuff in the queue */
 		Rflag++;
 		break;
-	case 'a':
-		aflag++;
-		break;
 	case 'd':
 		debug++;
 		break;
 	case 'r':
-		limit = atoi(ARGF());
-		break;
-	case 's':
-		sflag++;
+		limit = atoi(EARGF(usage()));
 		break;
 	case 't':
-		giveup = 60*60*atoi(ARGF());
+		giveup = 60*60*atoi(EARGF(usage()));
 		break;
 	case 'q':
-		qdir = ARGF();
-		if(qdir == 0)
-			usage();
+		qdir = EARGF(usage());
 		break;
 	case 'n':
-		npid = atoi(ARGF());
-		if(npid == 0)
+		njob = atoi(EARGF(usage()));
+		if(njob == 0)
 			usage();
 		break;
 	}ARGEND;
@@ -96,27 +92,17 @@
 	if(argc != 2)
 		usage();
 
-	pidlist = malloc(npid*sizeof(*pidlist));
-	if(pidlist == 0)
-		error("can't malloc", 0);
-
-	if(aflag == 0 && qdir == 0) {
+	if(qdir == nil) 
 		qdir = getuser();
-		if(qdir == 0)
-			error("unknown user", 0);
-	}
+	if(qdir == nil)
+		error("unknown user", 0);
 	root = argv[0];
 	cmd = argv[1];
 
 	if(chdir(root) < 0)
 		error("can't cd to %s", root);
 
-	doload(1);
-	if(aflag)
-		doalldirs();
-	else
-		dodir(qdir);
-	doload(0);
+	dodir(qdir);
 	exits(0);
 }
 
@@ -142,74 +128,6 @@
 	return 0;
 }
 
-int
-forkltd(void)
-{
-	int i;
-	int pid;
-
-	for(i = 0; i < npid; i++){
-		if(pidlist[i] <= 0)
-			break;
-	}
-
-	while(i >= npid){
-		pid = waitpid();
-		if(pid < 0){
-			syslog(0, runqlog, "forkltd confused");
-			exits(0);
-		}
-
-		for(i = 0; i < npid; i++)
-			if(pidlist[i] == pid)
-				break;
-	}
-	pidlist[i] = fork();
-	return pidlist[i];
-}
-
-/*
- *  run all user directories, must be bootes (or root on unix) to do this
- */
-void
-doalldirs(void)
-{
-	Dir *db;
-	int fd;
-	long i, n;
-
-
-	fd = open(".", OREAD);
-	if(fd == -1){
-		warning("reading %s", root);
-		return;
-	}
-	n = dirreadall(fd, &db);
-	if(n > 0){
-		for(i=0; i<n; i++){
-			if(db[i].qid.type & QTDIR){
-				if(emptydir(db[i].name))
-					continue;
-				switch(forkltd()){
-				case -1:
-					syslog(0, runqlog, "out of procs");
-					doload(0);
-					exits(0);
-				case 0:
-					if(sysdetach() < 0)
-						error("%r", 0);
-					dodir(db[i].name);
-					exits(0);
-				default:
-					break;
-				}
-			}
-		}
-		free(db);
-	}
-	close(fd);
-}
-
 /*
  *  cd to a user directory and run it
  */
@@ -234,30 +152,57 @@
 void
 rundir(char *name)
 {
-	int fd;
-	long i;
+	Job *hd, *j, **p;
+	int nlive, fidx, fd, found;
+	Waitmsg *w;
 
-	if(aflag && sflag)
-		fd = sysopenlocked(".", OREAD);
-	else
-		fd = open(".", OREAD);
+	fd = open(".", OREAD);
 	if(fd == -1){
 		warning("reading %s", name);
 		return;
 	}
+	fidx= 0;
+	hd = nil;
+	nlive = 0;
 	nfiles = dirreadall(fd, &dirbuf);
-	if(nfiles > 0){
-		for(i=0; i<nfiles; i++){
-			if(dirbuf[i].name[0]!='C' || dirbuf[i].name[1]!='.')
+	while(nlive > 0 ||  fidx< nfiles){
+		while(fidx< nfiles && nlive < njob){
+			if(strncmp(dirbuf[fidx].name, "C.", 2) != 0){
+				fidx++;
 				continue;
-			dofile(&dirbuf[i]);
+			}
+			if((j = dofile(&dirbuf[fidx])) != nil){
+				nlive++;
+				j->next = hd;
+				hd = j;
+			}
+			fidx++;
 		}
-		free(dirbuf);
+		if(nlive == 0){
+			fprint(2, "nothing live\n");
+			break;
+		}
+rescan:
+		if((w = wait()) == nil){
+			syslog(0, "runq", "wait error: %r");
+			break;
+		}
+		found = 0;
+		for(p = &hd; *p != nil; p = &(*p)->next){
+			if(w->pid == (*p)->pid){
+				*p = donefile(*p, w);
+				found++;
+				nlive--;
+				break;
+			}
+		}
+		free(w);
+		if(!found)
+			goto rescan;
 	}
-	if(aflag && sflag)
-		sysunlockfile(fd);
-	else
-		close(fd);
+	assert(hd == nil);
+	free(dirbuf);
+	close(fd);
 }
 
 /*
@@ -316,15 +261,13 @@
 /*
  *  try a message
  */
-void
+Job*
 dofile(Dir *dp)
 {
+	int dtime, efd, i, etime;
+	Job *j;
 	Dir *d;
-	int dfd, ac, dtime, efd, pid, i, etime;
-	char *buf, *cp, **av;
-	Waitmsg *wm;
-	Biobuf *b;
-	Mlock *l = nil;
+	char *cp;
 
 	if(debug)
 		fprint(2, "dofile %s\n", dp->name);
@@ -337,14 +280,14 @@
 	if(d == nil){
 		syslog(0, runqlog, "no data file for %s", dp->name);
 		remmatch(dp->name);
-		return;
+		return nil;
 	}
 	if(dp->length == 0){
 		if(time(0)-dp->mtime > 15*60){
 			syslog(0, runqlog, "empty ctl file for %s", dp->name);
 			remmatch(dp->name);
 		}
-		return;
+		return nil;
 	}
 	dtime = d->mtime;
 	free(d);
@@ -358,31 +301,35 @@
 		if(etime - dtime < 60*60){
 			/* up to the first hour, try every 15 minutes */
 			if(time(0) - etime < 15*60)
-				return;
+				return nil;
 		} else {
 			/* after the first hour, try once an hour */
 			if(time(0) - etime < 60*60)
-				return;
+				return nil;
 		}
-
 	}
 
 	/*
 	 *  open control and data
 	 */
-	b = sysopen(file(dp->name, 'C'), "rl", 0660);
-	if(b == 0) {
+	j = malloc(sizeof(Job));
+	if(j == nil)
+		return nil;
+	memset(j, 0, sizeof(Job));
+	j->dp = dp;
+	j->dfd = -1;
+	j->b = sysopen(file(dp->name, 'C'), "rl", 0660);
+	if(j->b == 0) {
 		if(debug)
 			fprint(2, "can't open %s: %r\n", file(dp->name, 'C'));
-		return;
+		return nil;
 	}
-	dfd = open(file(dp->name, 'D'), OREAD);
-	if(dfd < 0){
+	j->dfd = open(file(dp->name, 'D'), OREAD);
+	if(j->dfd < 0){
 		if(debug)
 			fprint(2, "can't open %s: %r\n", file(dp->name, 'D'));
-		Bterm(b);
-		sysunlockfile(Bfildes(b));
-		return;
+		freejob(j);
+		return nil;
 	}
 
 	/*
@@ -390,48 +337,36 @@
 	 *	- read args into (malloc'd) buffer
 	 *	- malloc a vector and copy pointers to args into it
 	 */
-	buf = malloc(dp->length+1);
-	if(buf == 0){
+
+	j->buf = malloc(dp->length+1);
+	if(j->buf == nil){
 		warning("buffer allocation", 0);
-		Bterm(b);
-		sysunlockfile(Bfildes(b));
-		close(dfd);
-		return;
+		freejob(j);
+		return nil;
 	}
-	if(Bread(b, buf, dp->length) != dp->length){
+	if(Bread(j->b, j->buf, dp->length) != dp->length){
 		warning("reading control file %s\n", dp->name);
-		Bterm(b);
-		sysunlockfile(Bfildes(b));
-		close(dfd);
-		free(buf);
-		return;
+		freejob(j);
+		return nil;
 	}
-	buf[dp->length] = 0;
-	av = malloc(2*sizeof(char*));
-	if(av == 0){
+	j->buf[dp->length] = 0;
+	j->av = malloc(2*sizeof(char*));
+	if(j->av == 0){
 		warning("argv allocation", 0);
-		close(dfd);
-		free(buf);
-		Bterm(b);
-		sysunlockfile(Bfildes(b));
-		return;
+		freejob(j);
+		return nil;
 	}
-	for(ac = 1, cp = buf; *cp; ac++){
+	for(j->ac = 1, cp = j->buf; *cp; j->ac++){
 		while(isspace(*cp))
 			*cp++ = 0;
 		if(*cp == 0)
 			break;
 
-		av = realloc(av, (ac+2)*sizeof(char*));
-		if(av == 0){
+		j->av = realloc(j->av, (j->ac+2)*sizeof(char*));
+		if(j->av == 0){
 			warning("argv allocation", 0);
-			close(dfd);
-			free(buf);
-			Bterm(b);
-			sysunlockfile(Bfildes(b));
-			return;
 		}
-		av[ac] = cp;
+		j->av[j->ac] = cp;
 		while(*cp && !isspace(*cp)){
 			if(*cp++ == '"'){
 				while(*cp && *cp != '"')
@@ -441,18 +376,18 @@
 			}
 		}
 	}
-	av[0] = cmd;
-	av[ac] = 0;
+	j->av[0] = cmd;
+	j->av[j->ac] = 0;
 
 	if(!Eflag &&time(0) - dtime > giveup){
-		if(returnmail(av, dp->name, "Giveup") != 0)
-			logit("returnmail failed", dp->name, av);
+		if(returnmail(j->av, dp->name, "Giveup") != 0)
+			logit("returnmail failed", dp->name, j->av);
 		remmatch(dp->name);
 		goto done;
 	}
 
 	for(i = 0; i < nbad; i++){
-		if(strcmp(av[3], badsys[i]) == 0)
+		if(j->ac > 3 && strcmp(j->av[3], badsys[i]) == 0)
 			goto done;
 	}
 
@@ -460,33 +395,34 @@
 	 * Ken's fs, for example, gives us 5 minutes of inactivity before
 	 * the lock goes stale, so we have to keep reading it.
  	 */
-	l = keeplockalive(file(dp->name, 'C'), Bfildes(b));
+	j->l = keeplockalive(file(dp->name, 'C'), Bfildes(j->b));
 
 	/*
 	 *  transfer
 	 */
-	pid = fork();
-	switch(pid){
+	j->pid = fork();
+	switch(j->pid){
 	case -1:
-		sysunlock(l);
-		sysunlockfile(Bfildes(b));
+		sysunlock(j->l);
+		sysunlockfile(Bfildes(j->b));
 		syslog(0, runqlog, "out of procs");
 		exits(0);
 	case 0:
 		if(debug) {
-			fprint(2, "Starting %s", cmd);
-			for(ac = 0; av[ac]; ac++)
-				fprint(2, " %s", av[ac]);
+			fprint(2, "Starting %s\n", cmd);
+			for(i = 0; j->av[i]; i++)
+				fprint(2, " %s", j->av[i]);
 			fprint(2, "\n");
 		}
-		logit("execing", dp->name, av);
+		logit("execing", dp->name, j->av);
 		close(0);
-		dup(dfd, 0);
-		close(dfd);
+		dup(j->dfd, 0);
+		close(j->dfd);
 		close(2);
 		efd = open(file(dp->name, 'E'), OWRITE);
 		if(efd < 0){
-			if(debug) syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser());
+			if(debug)
+				syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser());
 			efd = create(file(dp->name, 'E'), OWRITE, 0666);
 			if(efd < 0){
 				if(debug) syslog(0, "runq", "create %s as %s: %r", file(dp->name, 'E'), getuser());
@@ -494,50 +430,62 @@
 			}
 		}
 		seek(efd, 0, 2);
-		exec(cmd, av);
+		exec(cmd, j->av);
 		error("can't exec %s", cmd);
 		break;
 	default:
-		for(;;){
-			wm = wait();
-			if(wm == nil)
-				error("wait failed: %r", "");
-			if(wm->pid == pid)
-				break;
-			free(wm);
-		}
-		if(debug)
-			fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg);
-
-		if(wm->msg[0]){
-			if(debug)
-				fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg);
-			if(!Rflag && strstr(wm->msg, "Retry")==0){
-				/* return the message and remove it */
-				if(returnmail(av, dp->name, wm->msg) != 0)
-					logit("returnmail failed", dp->name, av);
-				remmatch(dp->name);
-			} else {
-				/* add sys to bad list and try again later */
-				nbad++;
-				badsys = realloc(badsys, nbad*sizeof(char*));
-				badsys[nbad-1] = strdup(av[3]);
-			}
-		} else {
-			/* it worked remove the message */
-			remmatch(dp->name);
-		}
-		free(wm);
-
+		return j;
 	}
 done:
-	if (l)
-		sysunlock(l);
-	Bterm(b);
-	sysunlockfile(Bfildes(b));
-	free(buf);
-	free(av);
-	close(dfd);
+	freejob(j);
+	return nil;
+}
+
+Job*
+donefile(Job *j, Waitmsg *wm)
+{
+	Job *n;
+
+	if(debug)
+		fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg);
+
+	if(wm->msg[0]){
+		if(debug)
+			fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg);
+		if(!Rflag && strstr(wm->msg, "Retry")==0){
+			/* return the message and remove it */
+			if(returnmail(j->av, j->dp->name, wm->msg) != 0)
+				logit("returnmail failed", j->dp->name, j->av);
+			remmatch(j->dp->name);
+		} else {
+			/* add sys to bad list and try again later */
+			nbad++;
+			badsys = realloc(badsys, nbad*sizeof(char*));
+			badsys[nbad-1] = strdup(j->av[3]);
+		}
+	} else {
+		/* it worked remove the message */
+		remmatch(j->dp->name);
+	}
+	n = j->next;
+	freejob(j);
+	return n;
+}
+
+void
+freejob(Job *j)
+{
+	if(j->b != nil){
+		sysunlockfile(Bfildes(j->b));
+		Bterm(j->b);
+	}
+	if(j->dfd != -1)
+		close(j->dfd);
+	if(j->l != nil)
+		sysunlock(j->l);
+	free(j->buf);
+	free(j->av);
+	free(j);
 }
 
 
@@ -691,71 +639,3 @@
 	}
 	syslog(0, runqlog, "%s", buf);
 }
-
-char *loadfile = ".runqload";
-
-/*
- *  load balancing
- */
-void
-doload(int start)
-{
-	int fd;
-	char buf[32];
-	int i, n;
-	Mlock *l;
-	Dir *d;
-
-	if(load <= 0)
-		return;
-
-	if(chdir(root) < 0){
-		load = 0;
-		return;
-	}
-
-	l = syslock(loadfile);
-	fd = open(loadfile, ORDWR);
-	if(fd < 0){
-		fd = create(loadfile, 0666, ORDWR);
-		if(fd < 0){
-			load = 0;
-			sysunlock(l);
-			return;
-		}
-	}
-
-	/* get current load */
-	i = 0;
-	n = read(fd, buf, sizeof(buf)-1);
-	if(n >= 0){
-		buf[n] = 0;
-		i = atoi(buf);
-	}
-	if(i < 0)
-		i = 0;
-
-	/* ignore load if file hasn't been changed in 30 minutes */
-	d = dirfstat(fd);
-	if(d != nil){
-		if(d->mtime + 30*60 < time(0))
-			i = 0;
-		free(d);
-	}
-
-	/* if load already too high, give up */
-	if(start && i >= load){
-		sysunlock(l);
-		exits(0);
-	}
-
-	/* increment/decrement load */
-	if(start)
-		i++;
-	else
-		i--;
-	seek(fd, 0, 0);
-	fprint(fd, "%d\n", i);
-	sysunlock(l);
-	close(fd);
-}

^ permalink raw reply	[flat|nested] 4+ messages in thread

end of thread, other threads:[~2021-01-17  1:36 UTC | newest]

Thread overview: 4+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2021-01-10  2:27 [9front] upas/runq: parallel sending ori
2021-01-10  8:04 ` hiro
2021-01-17  0:42 ` ori
2021-01-17  1:06   ` ori

This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).