9front - general discussion about 9front
 help / color / mirror / Atom feed
From: hiro <23hiro@gmail.com>
To: 9front@9front.org
Subject: Re: [9front] upas/runq: parallel sending
Date: Sun, 10 Jan 2021 09:04:54 +0100	[thread overview]
Message-ID: <CAFSF3XPJmKOjnfWnSXbWLw+WqVwKoZdbFtbOOCEVqaq+XcpF3Q@mail.gmail.com> (raw)
In-Reply-To: <60563470E4A517681CAFF1552E11CD8E@eigenstate.org>

Well, it took less than 600 seconds for me to receive all 3 mails on the
mailing list.
But it seems I've been missing 2 mails for hours, or even days, in the meantime...
That no longer feels like just a delay; it feels like an error.

On 1/10/21, ori@eigenstate.org <ori@eigenstate.org> wrote:
> As you may have noticed, you've been getting a
> bunch of test messages from the 9front list. Sorry.
> We've been shaking out some performance issues on a
> new host.
>
> The mailing list was spawning a runq process for every
> subscriber, which were contending heavily on the queue
> directory, causing high system load. On the new host,
> disks were slow enough that this was causing issues.
>
> This was fixed by paring down to a single runq process,
> but that means that sending out emails for our hundreds
> of subscribers can take a while.
>
> This change can probably use some work, but it adds
> a '-p njob' flag, which allows us to send the jobs in
> a message queue with limited parallelism. That should
> bring down the amount of time it takes for a message
> to work its way through our mailing lists.
>
> I'd also like to float the idea of removing the '-a'
> flag, and reclaiming the '-n' flag for this purpose.
> If anyone is running upas/runq with -a, I propose
> this alternative:
>
> fn runq-a {
> 	# run one upas/runq per queue directory, one after another
> 	dirs=(/mail/queue/*)
> 	for(d in $dirs){
> 		upas/runq ... $d
> 	}
> }
>
> with -n:
>
> fn runq-a {
> 	# start one upas/runq per queue directory in the background
> 	dirs=(/mail/queue/*)
> 	for(d in $dirs) upas/runq ... $d &
> 	wait	# a bare wait reaps every outstanding child; no loop needed
> }
>
>
> diff -r 808eb735fc45 sys/src/cmd/upas/q/runq.c
> --- a/sys/src/cmd/upas/q/runq.c	Sat Jan 09 12:20:49 2021 -0800
> +++ b/sys/src/cmd/upas/q/runq.c	Sat Jan 09 18:15:41 2021 -0800
> @@ -1,9 +1,25 @@
>  #include "common.h"
>  #include <ctype.h>
>
> +typedef struct Job Job;
> +
> +struct Job {
> +	Job	*next;	/* next live job in rundir's list */
> +	int	pid;	/* forked child running cmd; matched against wait() */
> +	int	ac;	/* number of entries in av; av[ac] == nil */
> +	int	dfd;	/* data ('D') file, child's stdin; -1 if not open */
> +	char	**av;	/* cmd followed by args parsed from the control file */
> +	char	*buf;	/* backing for av */
> +	Dir	*dp;	/* entry in rundir's dirbuf; not owned */
> +	Mlock	*l;	/* from keeplockalive; nil until set just before fork */
> +	Biobuf	*b;	/* control ('C') file, opened via sysopen mode "rl" */
> +};
> +
>  void	doalldirs(void);
>  void	dodir(char*);
> -void	dofile(Dir*);
> +Job*	dofile(Dir*);
> +Job*	donefile(Job*, Waitmsg*);
> +void	freejob(Job*);
>  void	rundir(char*);
>  char*	file(char*, char);
>  void	warning(char*, void*);
> @@ -32,6 +48,7 @@
>  char	**badsys;		/* array of recalcitrant systems */
>  int	nbad;
>  int	npid = 50;
> +int	njob = 1;		/* number of concurrent jobs to invoke */
>  int	sflag;			/* single thread per directory */
>  int	aflag;			/* all directories */
>  int	Eflag;			/* ignore E.xxxxxx dates */
> @@ -40,7 +57,7 @@
>  void
>  usage(void)
>  {
> -	fprint(2, "usage: runq [-adsE] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] q-root cmd\n");
> +	fprint(2, "usage: runq [-adsE] [-q dir] [-l load] [-t time] [-r nfiles] [-n nprocs] [-p njobs] q-root cmd\n");
>  	exits("");
>  }
>
> @@ -86,6 +103,11 @@
>  		if(qdir == 0)
>  			usage();
>  		break;
> +	case 'p':
> +		njob = atoi(ARGF());
> +		if(njob == 0)
> +			usage();
> +		break;
>  	case 'n':
>  		npid = atoi(ARGF());
>  		if(npid == 0)
> @@ -234,8 +256,9 @@
>  void
>  rundir(char *name)
>  {
> -	int fd;
> -	long i;
> +	Job *hd, *j, **p;
> +	int nlive, fidx, fd, found;
> +	Waitmsg *w;
>
>  	if(aflag && sflag)
>  		fd = sysopenlocked(".", OREAD);
> @@ -245,15 +268,47 @@
>  		warning("reading %s", name);
>  		return;
>  	}
> +	fidx= 0;
> +	hd = nil;
> +	nlive = 0;
>  	nfiles = dirreadall(fd, &dirbuf);
> -	if(nfiles > 0){
> -		for(i=0; i<nfiles; i++){
> -			if(dirbuf[i].name[0]!='C' || dirbuf[i].name[1]!='.')
> +	while(nlive > 0 ||  fidx< nfiles){
> +		while(fidx< nfiles && nlive < njob){
> +			if(strncmp(dirbuf[fidx].name, "C.", 2) != 0){
> +				fidx++;
>  				continue;
> -			dofile(&dirbuf[i]);
> +			}
> +			if((j = dofile(&dirbuf[fidx])) != nil){
> +				nlive++;
> +				j->next = hd;
> +				hd = j;
> +			}
> +			fidx++;
>  		}
> -		free(dirbuf);
> +		if(nlive == 0){
> +			fprint(2, "nothing live\n");
> +			break;
> +		}
> +rescan:
> +		if((w = wait()) == nil){
> +			syslog(0, "runq", "wait error: %r");
> +			break;
> +		}
> +		found = 0;
> +		for(p = &hd; *p != nil; p = &(*p)->next){
> +			if(w->pid == (*p)->pid){
> +				*p = donefile(*p, w);
> +				found++;
> +				nlive--;
> +				break;
> +			}
> +		}
> +		free(w);
> +		if(!found)
> +			goto rescan;
>  	}
> +	assert(hd == nil);
> +	free(dirbuf);
>  	if(aflag && sflag)
>  		sysunlockfile(fd);
>  	else
> @@ -316,15 +371,14 @@
>  /*
>   *  try a message
>   */
> -void
> +Job*
>  dofile(Dir *dp)
>  {
> +	int dtime, efd, i, etime;
> +	Job *j;
> +//	Mlock *l;
>  	Dir *d;
> -	int dfd, ac, dtime, efd, pid, i, etime;
> -	char *buf, *cp, **av;
> -	Waitmsg *wm;
> -	Biobuf *b;
> -	Mlock *l = nil;
> +	char *cp;
>
>  	if(debug)
>  		fprint(2, "dofile %s\n", dp->name);
> @@ -337,14 +391,14 @@
>  	if(d == nil){
>  		syslog(0, runqlog, "no data file for %s", dp->name);
>  		remmatch(dp->name);
> -		return;
> +		return nil;
>  	}
>  	if(dp->length == 0){
>  		if(time(0)-dp->mtime > 15*60){
>  			syslog(0, runqlog, "empty ctl file for %s", dp->name);
>  			remmatch(dp->name);
>  		}
> -		return;
> +		return nil;
>  	}
>  	dtime = d->mtime;
>  	free(d);
> @@ -358,31 +412,35 @@
>  		if(etime - dtime < 60*60){
>  			/* up to the first hour, try every 15 minutes */
>  			if(time(0) - etime < 15*60)
> -				return;
> +				return nil;
>  		} else {
>  			/* after the first hour, try once an hour */
>  			if(time(0) - etime < 60*60)
> -				return;
> +				return nil;
>  		}
> -
>  	}
>
>  	/*
>  	 *  open control and data
>  	 */
> -	b = sysopen(file(dp->name, 'C'), "rl", 0660);
> -	if(b == 0) {
> +	j = malloc(sizeof(Job));
> +	if(j == nil)
> +		return nil;
> +	memset(j, 0, sizeof(Job));
> +	j->dp = dp;
> +	j->dfd = -1;
> +	j->b = sysopen(file(dp->name, 'C'), "rl", 0660);
> +	if(j->b == 0) {
>  		if(debug)
>  			fprint(2, "can't open %s: %r\n", file(dp->name, 'C'));
> -		return;
> +		return nil;
>  	}
> -	dfd = open(file(dp->name, 'D'), OREAD);
> -	if(dfd < 0){
> +	j->dfd = open(file(dp->name, 'D'), OREAD);
> +	if(j->dfd < 0){
>  		if(debug)
>  			fprint(2, "can't open %s: %r\n", file(dp->name, 'D'));
> -		Bterm(b);
> -		sysunlockfile(Bfildes(b));
> -		return;
> +		freejob(j);
> +		return nil;
>  	}
>
>  	/*
> @@ -390,48 +448,36 @@
>  	 *	- read args into (malloc'd) buffer
>  	 *	- malloc a vector and copy pointers to args into it
>  	 */
> -	buf = malloc(dp->length+1);
> -	if(buf == 0){
> +
> +	j->buf = malloc(dp->length+1);
> +	if(j->buf == nil){
>  		warning("buffer allocation", 0);
> -		Bterm(b);
> -		sysunlockfile(Bfildes(b));
> -		close(dfd);
> -		return;
> +		freejob(j);
> +		return nil;
>  	}
> -	if(Bread(b, buf, dp->length) != dp->length){
> +	if(Bread(j->b, j->buf, dp->length) != dp->length){
>  		warning("reading control file %s\n", dp->name);
> -		Bterm(b);
> -		sysunlockfile(Bfildes(b));
> -		close(dfd);
> -		free(buf);
> -		return;
> +		freejob(j);
> +		return nil;
>  	}
> -	buf[dp->length] = 0;
> -	av = malloc(2*sizeof(char*));
> -	if(av == 0){
> +	j->buf[dp->length] = 0;
> +	j->av = malloc(2*sizeof(char*));
> +	if(j->av == 0){
>  		warning("argv allocation", 0);
> -		close(dfd);
> -		free(buf);
> -		Bterm(b);
> -		sysunlockfile(Bfildes(b));
> -		return;
> +		freejob(j);
> +		return nil;
>  	}
> -	for(ac = 1, cp = buf; *cp; ac++){
> +	for(j->ac = 1, cp = j->buf; *cp; j->ac++){
>  		while(isspace(*cp))
>  			*cp++ = 0;
>  		if(*cp == 0)
>  			break;
>
> -		av = realloc(av, (ac+2)*sizeof(char*));
> -		if(av == 0){
> +		j->av = realloc(j->av, (j->ac+2)*sizeof(char*));
> +		if(j->av == 0){
>  			warning("argv allocation", 0);
> -			close(dfd);
> -			free(buf);
> -			Bterm(b);
> -			sysunlockfile(Bfildes(b));
> -			return;
>  		}
> -		av[ac] = cp;
> +		j->av[j->ac] = cp;
>  		while(*cp && !isspace(*cp)){
>  			if(*cp++ == '"'){
>  				while(*cp && *cp != '"')
> @@ -441,18 +487,18 @@
>  			}
>  		}
>  	}
> -	av[0] = cmd;
> -	av[ac] = 0;
> +	j->av[0] = cmd;
> +	j->av[j->ac] = 0;
>
>  	if(!Eflag &&time(0) - dtime > giveup){
> -		if(returnmail(av, dp->name, "Giveup") != 0)
> -			logit("returnmail failed", dp->name, av);
> +		if(returnmail(j->av, dp->name, "Giveup") != 0)
> +			logit("returnmail failed", dp->name, j->av);
>  		remmatch(dp->name);
>  		goto done;
>  	}
>
>  	for(i = 0; i < nbad; i++){
> -		if(strcmp(av[3], badsys[i]) == 0)
> +		if(j->ac > 3 && strcmp(j->av[3], badsys[i]) == 0)
>  			goto done;
>  	}
>
> @@ -460,33 +506,34 @@
>  	 * Ken's fs, for example, gives us 5 minutes of inactivity before
>  	 * the lock goes stale, so we have to keep reading it.
>   	 */
> -	l = keeplockalive(file(dp->name, 'C'), Bfildes(b));
> +	j->l = keeplockalive(file(dp->name, 'C'), Bfildes(j->b));
>
>  	/*
>  	 *  transfer
>  	 */
> -	pid = fork();
> -	switch(pid){
> +	j->pid = fork();
> +	switch(j->pid){
>  	case -1:
> -		sysunlock(l);
> -		sysunlockfile(Bfildes(b));
> +		sysunlock(j->l);
> +		sysunlockfile(Bfildes(j->b));
>  		syslog(0, runqlog, "out of procs");
>  		exits(0);
>  	case 0:
>  		if(debug) {
> -			fprint(2, "Starting %s", cmd);
> -			for(ac = 0; av[ac]; ac++)
> -				fprint(2, " %s", av[ac]);
> -			fprint(2, "\n");
> +			fprint(2, "Starting %s\n", cmd);
> +//			for(i = 0; j->av[i]; i++)
> +//				fprint(2, " %s", j->av[i]);
> +//			fprint(2, "\n");
>  		}
> -		logit("execing", dp->name, av);
> +		logit("execing", dp->name, j->av);
>  		close(0);
> -		dup(dfd, 0);
> -		close(dfd);
> +		dup(j->dfd, 0);
> +		close(j->dfd);
>  		close(2);
>  		efd = open(file(dp->name, 'E'), OWRITE);
>  		if(efd < 0){
> -			if(debug) syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'),
> getuser());
> +			if(debug)
> +				syslog(0, "runq", "open %s as %s: %r", file(dp->name,'E'), getuser());
>  			efd = create(file(dp->name, 'E'), OWRITE, 0666);
>  			if(efd < 0){
>  				if(debug) syslog(0, "runq", "create %s as %s: %r", file(dp->name, 'E'),
> getuser());
> @@ -494,50 +541,62 @@
>  			}
>  		}
>  		seek(efd, 0, 2);
> -		exec(cmd, av);
> +		exec(cmd, j->av);
>  		error("can't exec %s", cmd);
>  		break;
>  	default:
> -		for(;;){
> -			wm = wait();
> -			if(wm == nil)
> -				error("wait failed: %r", "");
> -			if(wm->pid == pid)
> -				break;
> -			free(wm);
> -		}
> -		if(debug)
> -			fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg);
> -
> -		if(wm->msg[0]){
> -			if(debug)
> -				fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg);
> -			if(!Rflag && strstr(wm->msg, "Retry")==0){
> -				/* return the message and remove it */
> -				if(returnmail(av, dp->name, wm->msg) != 0)
> -					logit("returnmail failed", dp->name, av);
> -				remmatch(dp->name);
> -			} else {
> -				/* add sys to bad list and try again later */
> -				nbad++;
> -				badsys = realloc(badsys, nbad*sizeof(char*));
> -				badsys[nbad-1] = strdup(av[3]);
> -			}
> -		} else {
> -			/* it worked remove the message */
> -			remmatch(dp->name);
> -		}
> -		free(wm);
> -
> +		return j;
>  	}
>  done:
> -	if (l)
> -		sysunlock(l);
> -	Bterm(b);
> -	sysunlockfile(Bfildes(b));
> -	free(buf);
> -	free(av);
> -	close(dfd);
> +	freejob(j);
> +	return nil;
> +}
> +
> +Job*
> +donefile(Job *j, Waitmsg *wm)	/* reap j per its wait status; frees j, returns j->next */
> +{
> +	Job *n;
> +
> +	if(debug)
> +		fprint(2, "wm->pid %d wm->msg == %s\n", wm->pid, wm->msg);
> +
> +	if(wm->msg[0]){
> +		if(debug)
> +			fprint(2, "[%d] wm->msg == %s\n", getpid(), wm->msg);
> +		if(!Rflag && strstr(wm->msg, "Retry")==0){
> +			/* failed without "Retry": return the message and remove it */
> +			if(returnmail(j->av, j->dp->name, wm->msg) != 0)
> +				logit("returnmail failed", j->dp->name, j->av);
> +			remmatch(j->dp->name);
> +		} else if(j->ac > 3){
> +			/* add sys (av[3]) to bad list, retry later; same ac guard as dofile */
> +			nbad++;
> +			badsys = realloc(badsys, nbad*sizeof(char*));
> +			badsys[nbad-1] = strdup(j->av[3]);
> +		}
> +	} else {
> +		/* empty exit status: it worked, remove the message */
> +		remmatch(j->dp->name);
> +	}
> +	n = j->next;
> +	freejob(j);
> +	return n;
> +}
> +
> +void
> +freejob(Job *j)	/* release j's locks, fds and buffers, then j itself */
> +{
> +	if(j->l != nil)	/* drop the keepalive lock first, as the old cleanup did */
> +		sysunlock(j->l);
> +	if(j->b != nil){
> +		Bterm(j->b);
> +		sysunlockfile(Bfildes(j->b));
> +	}
> +	if(j->dfd != -1)
> +		close(j->dfd);
> +	free(j->buf);
> +	free(j->av);
> +	free(j);
> +}
>
>
>

  reply	other threads:[~2021-01-10  8:36 UTC|newest]

Thread overview: 4+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2021-01-10  2:27 ori
2021-01-10  8:04 ` hiro [this message]
2021-01-17  0:42 ` ori
2021-01-17  1:06   ` ori

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=CAFSF3XPJmKOjnfWnSXbWLw+WqVwKoZdbFtbOOCEVqaq+XcpF3Q@mail.gmail.com \
    --to=23hiro@gmail.com \
    --cc=9front@9front.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).