9fans - fans of the OS Plan 9 from Bell Labs
From: erik quanstrom <quanstro@quanstro.net>
To: 9fans@9fans.net
Subject: Re: [9fans] sd(3) and concurrent readers/writers?
Date: Sun, 13 Sep 2009 10:44:22 -0400
Message-ID: <4672ddba9b22be0ab00effce6cae068f@quanstro.net>
In-Reply-To: <140e7ec30909121108l411080bam3a34bac1168f911f@mail.gmail.com>

[-- Attachment #1: Type: text/plain, Size: 2235 bytes --]

> dd -if /dev/sdE0/data -of /dev/sdF0/data -bs 1048576

i was thinking about your technique, and it occurred to me
that this command is equivalent to
	for(;;)
		read(E0, buf, 1mb)
		write(F0, buf, 1mb)
but if you wrote it like this
	dd -if data -bs 64k -count 20000 |dd -bs 64k -of ../sda1/data
the read and write could run in parallel, at the expense
of a buffer copy.  i didn't have anything except some
very fast (120mb/s) sas disks to test with, but even they
showed a 10% performance improvement, despite the extra
copies:
	0.01u 0.62s 11.63r 	 rc -c dd -if data -bs 64k -count 20000 -of ../sda1/data
	0.02u 0.97s 10.72r 	 rc -c dd -if data -bs 64k -count 20000|dd -bs 64k -of ../sda1/data
not all that impressive with my disks; perhaps this would
show more improvement on normal disks.
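
if you do this often, it wraps up into a function easily
enough.  a sketch (pcopy is just a name i made up, and i
haven't tested this form):
	fn pcopy {	# pcopy src dst bs count
		dd -if $1 -bs $3 -count $4 | dd -of $2 -bs $3
	}
	pcopy /dev/sdE0/data /dev/sdF0/data 64k 20000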

> fn chk {
>     for(i in sdE0 sdF0) dd -if /dev/$i/data -bs 1048576 -iseek $1
> -count 1 |md5sum
> }

i found this interesting, too.  i wrote a short program using the
thread library to do the reads and compares in parallel.  in
the process of writing that i realized that the md5sum is not
necessary; a memcmp would do.  i finished the program up
(attached) and found that it performed pretty well, giving me
~123mb/s.  that's about what these drives will do.  but i was
wondering whether cmp wouldn't just work.  it occurred to me that
i could run the dds in parallel with a command like this
	cmp <{dd -if data -bs 64k -count 20000} <{dd -if ../sda1/data -bs 64k -count 20000}
surprisingly, this was just as fast on my setup as the specialized
program
	0.07u 0.04s 10.65r 	 8.out -n 20000 data ../sda1/data ...
	0.32u 0.26s 10.65r 	 cmp /fd/7 /fd/6
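
the same trick wraps up, too; another untested sketch (pcmp
is a made-up name):
	fn pcmp {	# pcmp dev0 dev1 bs count
		cmp <{dd -if $1 -bs $3 -count $4} <{dd -if $2 -bs $3 -count $4}
	}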

clearly if the compare is more involved, like sha1sum, it would
be more fruitful to use a modified version of the threaded program.
(unless you see a way of parallelizing the cmp part of that command
without byzantine contortions.)  i ran this test and found it
surprisingly fast for something that hashes every block:
	0.06u 0.03s 13.65r 	 8.out -sn 20000 data ../sda1/data ...
0.06 seconds of user time is far too little for that much sha1,
so i suspect there is something a bit amiss with time(1)'s
accounting.
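
for a whole-stream hash, though, the shell can parallelize the
sha1 as well, since each sha1sum runs in its own process.  an
untested sketch, which gives up the per-block diff location the
threaded program prints:
	cmp <{dd -if data -bs 64k -count 20000 | sha1sum} <{dd -if ../sda1/data -bs 64k -count 20000 | sha1sum}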

i suppose that a motivated person could write a book on parallel
programming with the shell.  tony hoare would be proud.

- erik

[-- Attachment #2: main.c --]
[-- Type: text/plain, Size: 5441 bytes --]

/*
 * cf. cmp <{dd -if data -bs 64k -count 20000} <{dd -if ../sda1/data -bs 64k -count 20000}
 * copyright © 2009 erik quanstrom
 */
#include <u.h>
#include <libc.h>
#include <thread.h>
#include <libsec.h>

enum {
	Stack	= 64*1024,
	Block	= 64*1024,
	Buffer	= 3,		/* per-device channel depth */

	/* cmptype */
	Memcmp	= 1<<0,
	Sha1	= 1<<1,

	/* Msgbuf flags */
	Ferror	= 1<<1,
	Fcmp	= 1<<2,
	Fend	= 1<<3,
};

typedef struct Ddargs Ddargs;
struct Ddargs {
	int	fd;
	Channel	*c;
	ulong	bs;
	uvlong	start;
	uvlong	end;
};

typedef struct Bargs Bargs;
struct Bargs {
	uvlong	nblocks;
	ulong	bs;
	int	nend;
};

typedef struct Msgbuf Msgbuf;
struct Msgbuf {
	uint	flags;
	uvlong	lba;
	char	status[ERRMAX];
	uchar	data[Block];
};

Channel	*blockfree;	/* used buffers returned to blockproc */
Channel	*blockalloc;	/* fresh buffers handed to readers */
static Alt alts[3];

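/*
 * blockproc owns the buffer pool: it hands free Msgbufs out on
 * blockalloc and takes used ones back on blockfree, keeping a
 * fifo ring in tab and tallying errors, miscompares, and end
 * markers as buffers come home.
 */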
void
blockproc(void *a)
{
	uint h, t, f, e, c, m;
	uvlong i;
	Bargs *args;
	Msgbuf *s, *r, **tab;

	threadsetname("blockproc");

	alts[0].c = blockalloc;
	alts[0].v = &s;
	alts[0].op = CHANSND;
	alts[1].c = blockfree;
	alts[1].v = &r;
	alts[1].op = CHANRCV;
	alts[2].op = CHANEND;

	args = (Bargs*)a;
	tab = malloc(args->nblocks * sizeof tab[0]);
	if(tab == nil)
		sysfatal("malloc: %r");
	m = args->nblocks;	/* ring size; nblocks-1 would strand one buffer */
	for(i = 0; i < args->nblocks; i++){
		tab[i] = malloc(sizeof(Msgbuf));
		if(tab[i] == nil)
			sysfatal("malloc: %r");
	}
	h = t = 0;
	e = c = 0;
	s = nil;
	for(f = args->nend; f > 0;){
		if(s == nil){
			s = tab[h % m];
			if(s != nil){
				tab[h++ % m] = nil;
				alts[0].op = CHANSND;
			}else
				alts[0].op = CHANNOP;
		}
		switch(alt(alts)){
		case 0:
			s = nil;
			break;
		case 1:
			assert(r != nil && tab[t % m] == nil);
			tab[t++ % m] = r;
			if(r->flags & Fend)
				f--;
			if(r->flags & Fcmp)
				c++;
			if(r->flags & Ferror)
				e++;
			r = nil;
			break;
		}
	}
	for(i = 0; i < args->nblocks; i++)
		free(tab[i]);
	free(tab);
	if(e > 0)
		threadexitsall("errors");
	if(c > 0)
		threadexitsall("cmp");
	threadexitsall("");
}

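/* fetch a buffer from blockproc and reset its header fields */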
Msgbuf*
bufalloc(void)
{
	Msgbuf *b;

	b = recvp(blockalloc);
	if(b == nil)
		sysfatal("recvp: %r");
	b->flags = 0;
	b->lba = 0;
	b->status[0] = 0;
	return b;
}

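/* read exactly n bytes at offset o, retrying short reads */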
static int
preadn(int fd, void *av, long n, vlong o)
{
	char *a;
	long m, t;

	a = av;
	t = 0;
	while(t < n){
		m = pread(fd, a+t, n-t, o+t);
		if(m <= 0){
			if(t == 0)
				return m;
			break;
		}
		t += m;
	}
	return t;
}

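/*
 * ddproc reads its device sequentially, one bs-sized block per
 * buffer, and sends each buffer down its channel; a final
 * buffer flagged Fend marks end of stream.
 */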
void
ddproc(void *a)
{
	int rv;
	uvlong i;
	Ddargs *d;
	Msgbuf *b;

	threadsetname("ddproc");
	d = (Ddargs*)a;
	for(i = d->start; i < d->end; i++){
		b = bufalloc();
		b->lba = i;
		rv = preadn(d->fd, b->data, d->bs, b->lba * d->bs);
		if(rv != d->bs){
			errstr(b->status, sizeof b->status);
			b->flags |= Ferror;
		}
		sendp(d->c, b);
	}
	close(d->fd);

	b = bufalloc();
	b->flags |= Fend;
	sendp(d->c, b);
	threadexits("");
}

uint	bs		= Block;
uint	cmptype		= Memcmp;
Channel *dev[2];
QLock	cmplock;

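/* index of first differing byte; caller guarantees a difference */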
uint
diffat(uchar *a, uchar *b, uint l)
{
	uint i;

	for(i = 0; i < l; i++)
		if(a[i] != b[i])
			return i;
	abort();
	return ~0;
}

int
docmp(uchar *a, uchar *b, int l)
{
	uchar suma[SHA1dlen], sumb[SHA1dlen];

	if(cmptype == Memcmp)
		return memcmp(a, b, l) != 0;
	sha1(a, l, suma, nil);
	sha1(b, l, sumb, nil);
//	Bprint(&out, "%A %A\n", suma, sumb);
	return memcmp(suma, sumb, sizeof suma) != 0;
}

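/*
 * cmpproc pairs same-lba buffers from the two devices, compares
 * them, and recycles both via blockfree.  cmplock keeps the two
 * receives atomic so pairs stay matched across cmpprocs.
 */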
void
cmpproc(void*)
{
	uchar *x, *y;
	int i;
	Msgbuf *b[2];

	threadsetname("cmpproc");
	for(;;){
		qlock(&cmplock);
		for(i = 0; i < 2; i++)
			b[i] = recvp(dev[i]);
		qunlock(&cmplock);
		assert(b[0] != nil && b[1] != nil);
		assert(b[0]->lba == b[1]->lba);

		x = b[0]->data;
		y = b[1]->data;
		if(b[0]->flags & Ferror)
			print("cmp error: %llud: device 0 error: %s\n",
				b[0]->lba, b[0]->status);
		else if(b[1]->flags & Ferror)
			print("cmp error: %llud: device 1 error: %s\n",
				b[1]->lba, b[1]->status);
		else if(b[0]->flags & Fend){
			/* end-of-stream markers; nothing to compare */
		}else if(docmp(x, y, bs)){
			b[0]->flags |= Fcmp;
			print("%llud + %ud\n", b[0]->lba, diffat(x, y, bs));
		}
		sendp(blockfree, b[0]);
		sendp(blockfree, b[1]);
	}
}

void
usage(void)
{
	fprint(2, "usage: disk/cmp [-s] [-n nblocks] [-b blocksz] dev0 dev1\n");
	threadexitsall("usage");
}

Ddargs d[2];
Bargs a;

void
threadmain(int argc, char **argv)
{
	int i;
	uvlong nblocks;
	Dir *e;

	nblocks = 0;
	ARGBEGIN{
	case 'n':
		nblocks = atoll(EARGF(usage()));
		break;
	case 'b':
		bs = atoi(EARGF(usage()));
		break;
	case 's':
		cmptype = Sha1;
		break;
	default:
		usage();
	}ARGEND
	if(argc != 2)
		usage();
	for(i = 0; i < 2; i++){
		d[i].fd = open(argv[i], OREAD);
		if(d[i].fd == -1)
			sysfatal("open: %r");
		d[i].bs = bs;
		d[i].start = 0;
		if(nblocks != 0)
			d[i].end = nblocks;
		else{
			e = dirfstat(d[i].fd);
			if(e == nil)
				sysfatal("dirfstat: %r");
			d[i].end = e->length / d[i].bs;
			free(e);
		}
		d[i].c = dev[i] = chancreate(sizeof(Msgbuf*), Buffer);
		if(d[i].c == nil)
			sysfatal("chancreate: %r");
	}
	blockfree = chancreate(sizeof(Msgbuf*), 1);
	blockalloc = chancreate(sizeof(Msgbuf*), 1);
	if(blockalloc == nil || blockfree == nil)
		sysfatal("chancreate: %r");
	a.nblocks = 2*Buffer;
	a.bs = bs;
	a.nend = 2;
	proccreate(ddproc, d + 0, Stack);
	proccreate(ddproc, d + 1, Stack);
	for(i = 0; i < 4; i++)
		proccreate(cmpproc, nil, Stack);
	blockproc(&a);
	threadexitsall("");
}

