From mboxrd@z Thu Jan 1 00:00:00 1970 Message-ID: <4672ddba9b22be0ab00effce6cae068f@quanstro.net> From: erik quanstrom Date: Sun, 13 Sep 2009 10:44:22 -0400 To: 9fans@9fans.net In-Reply-To: <140e7ec30909121108l411080bam3a34bac1168f911f@mail.gmail.com> MIME-Version: 1.0 Content-Type: multipart/mixed; boundary="upas-vwhndabyjoubrwpucedplftmin" Subject: Re: [9fans] sd(3) and concurrent readers/writers? Topicbox-Message-UUID: 6e04d962-ead5-11e9-9d60-3106f5b1d025 This is a multi-part message in MIME format. --upas-vwhndabyjoubrwpucedplftmin Content-Disposition: inline Content-Type: text/plain; charset="US-ASCII" Content-Transfer-Encoding: 7bit > dd -if /dev/sdE0/data -of /dev/sdF0/data -bs 1048576 i was thinking about your technique, and it occurred to me that this command is equivalent to for(;;) read(E0, buf, 1mb) write(F0, buf, 1mb) but if you wrote it like this dd -if data -bs 64k -count 20000 |dd -bs 64k -of ../sda1/data the read and write could be run in parallel, at the expense of a buffer copy. i didn't have anything except for some very fast (120mb/s) sas disks to test with, but even they showed 10% performance improvement. even at the expense of copies 0.01u 0.62s 11.63r rc -c dd -if data -bs 64k -count 20000 -of ../sda1/data 0.02u 0.97s 10.72r rc -c dd -if data -bs 64k -count 20000|dd -bs 64k -of ../sda1/data not all that impressive with my disks, perhaps this would show more improvement on normal disks. > fn chk { > for(i in sdE0 sdF0) dd -if /dev/$i/data -bs 1048576 -iseek $1 > -count 1 |md5sum > } i found this interesting, too. i wrote a short program using the threads library to do the reads and compares in parallel. in the process of writing that i realized that the md5sum is not necessary. a memcmp would do. i finished the program up (attached) and found that it performed pretty well. giving me ~123mb/s. that's about what these drives will do. but i was wondering why cmp would just work. 
it occurred to me that i could run the dds in parallel with a command like this cmp <{dd -if data -bs 64k -count 20000} <{dd -if ../sda1/data -bs 64k -count 20000} surprisingly, this was just as fast on my setup as the specialized program 0.07u 0.04s 10.65r 8.out -n 20000 data ../sda1/data ... 0.32u 0.26s 10.65r cmp /fd/7 /fd/6 clearly if the compare is more involved, like sha1sum, it would be more fruitful to use a modified version of the threaded program. (unless you see a way of parallelizing the cmp part of that command without byzantine contortions.) i ran this test and found a surprising speedup: 0.06u 0.03s 13.65r 8.out -sn 20000 data ../sda1/data ... i suspect there is something a bit amiss with time(1)'s accounting. i suppose that a motivated person could write a book on parallel programming with the shell. tony hoare would be proud. - erik --upas-vwhndabyjoubrwpucedplftmin Content-Disposition: attachment; filename=main.c Content-Type: text/plain; charset="UTF-8" Content-Transfer-Encoding: 8bit /* * cf. 
cmp <{dd -if data -bs 64k -count 20000} <{dd -if ../sda1/data -bs 64k -count 20000} * copyright © 2009 erik quanstrom */ #include #include #include #include enum { Stack = 64*1024, Block = 64*1024, Buffer = 3, Memcmp= 1<<0, Sha1 = 1<<1, Ferror = 1<<1, Fcmp = 1<<2, Fend = 1<<3, }; typedef struct Ddargs Ddargs; struct Ddargs { int fd; Channel *c; ulong bs; uvlong start; uvlong end; }; typedef struct Bargs Bargs; struct Bargs { uvlong nblocks; ulong bs; int nend; }; typedef struct Msgbuf Msgbuf; struct Msgbuf { uint flags; uvlong lba; char status[ERRMAX]; uchar data[Block]; }; Channel *blockfree; Channel *blockalloc; static Alt alts[3]; void blockproc(void *a) { uint h, t, f, e, c, m; uvlong i; Bargs *args; Msgbuf *s, *r, **tab; threadsetname("blockproc"); alts[0].c = blockalloc; alts[0].v = &s; alts[0].op = CHANSND; alts[1].c = blockfree; alts[1].v = &r; alts[1].op = CHANRCV; alts[2].op = CHANEND; args = (Bargs*)a; tab = malloc(args->nblocks * sizeof tab[0]); m = args->nblocks - 1; if(tab == nil) sysfatal("malloc: %r"); for(i = 0; i < args->nblocks; i++){ tab[i] = malloc(sizeof(Msgbuf)); if(tab[i] == nil) sysfatal("malloc: %r"); } h = t = 0; e = c = 0; s = nil; for(f = args->nend; f > 0;){ if(s == nil){ s = tab[h % m]; if(s != nil){ tab[h++ % m] = nil; alts[0].op = CHANSND; }else alts[0].op = CHANNOP; } switch(alt(alts)){ case 0: s = nil; break; case 1: assert(r != nil && tab[t % m] == nil); tab[t++ % m] = r; if(r->flags & Fend) f--; if(r->flags & Fcmp) c++; if(r->flags & Ferror) e++; r = nil; break; } } for(i = 0; i < args->nblocks; i++) free(tab[i]); free(tab); if(e > 0) threadexitsall("errors"); if(c > 0) threadexitsall("cmp"); threadexitsall(""); } Msgbuf* bufalloc(void) { Msgbuf *b; b = recvp(blockalloc); if(b == nil) sysfatal("recvp: %r"); b->flags = 0; b->lba = 0; b->status[0] = 0; return b; } static int preadn(int fd, void *av, long n, vlong o) { char *a; long m, t; a = av; t = 0; while(t < n){ m = pread(fd, a+t, n-t, o+t); if(m <= 0){ if(t == 0) return m; 
break; } t += m; } return t; } void ddproc(void *a) { int rv; uvlong i; Ddargs *d; Msgbuf *b; threadsetname("ddproc"); d = (Ddargs*)a; for(i = d->start; i < d->end; i++){ b = bufalloc(); b->lba = i; rv = preadn(d->fd, b->data, d->bs, b->lba * d->bs); if(rv != d->bs){ errstr(b->status, sizeof b->status); b->flags |= Ferror; } sendp(d->c, b); } close(d->fd); b = bufalloc(); b->flags |= Fend; sendp(d->c, b); threadexits(""); } uint bs = Block; uint cmptype = Memcmp; Channel *dev[2]; QLock cmplock; uint diffat(uchar *a, uchar *b, uint l) { uint i; for(i = 0; i < l; i++) if(a[i] != b[i]) return i; abort(); return ~0; } int docmp(uchar *a, uchar *b, int l) { uchar suma[SHA1dlen], sumb[SHA1dlen]; if(cmptype == Memcmp) return memcmp(a, b, bs) != 0; sha1(a, l, suma, nil); sha1(b, l, sumb, nil); // Bprint(&out, "%A %A\n", suma, sumb); return memcmp(suma, sumb, sizeof suma) != 0; } void cmpproc(void*) { uchar *x, *y; int i; Msgbuf *b[2]; threadsetname("cmpproc"); for(;;){ qlock(&cmplock); for(i = 0; i < 2; i++) b[i] = recvp(dev[i]); qunlock(&cmplock); assert(b[0] != nil && b[1] != nil); assert(b[0]->lba == b[1]->lba); x = b[0]->data; y = b[1]->data; if(b[0]->flags & Ferror) print("cmp error: %llud: device 0 error: %s\n", b[0]->lba, b[0]->status); else if(b[0]->flags & Ferror) print("cmp error: %llud: device 1 error: %s\n", b[1]->lba, b[1]->status); else if(b[0]->flags & Fend){ }else if(docmp(x, y, bs)){ b[0]->flags |= Fcmp; print("%llud + %ud\n", b[0]->lba, diffat(x, y, bs)); } sendp(blockfree, b[0]); sendp(blockfree, b[1]); } } void usage(void) { fprint(2, "usage: disk/cmp [-n nblocks] [-b blocksz] dev0 dev1\n"); threadexitsall("usage"); } Ddargs d[2]; Bargs a; void threadmain(int argc, char **argv) { int i; uvlong nblocks; Dir *e; nblocks = 0; ARGBEGIN{ case 'n': nblocks = atoi(EARGF(usage())); break; case 'b': bs = atoi(EARGF(usage())); break; case 's': cmptype = Sha1; break; default: usage(); }ARGEND if(argc != 2) usage(); for(i = 0; i < 2; i++){ d[i].fd = open(argv[i], 
OREAD); if(d[i].fd == -1) sysfatal("open: %r"); d[i].bs = bs; d[i].start = 0; if(nblocks != 0) d[i].end = nblocks; else{ e = dirfstat(d[i].fd); if(e == nil) sysfatal("dirfstat: %r"); d[i].end = e->length / d[i].bs; free(e); } d[i].c = dev[i] = chancreate(sizeof(Msgbuf*), Buffer); if(d[i].c == nil) sysfatal("chancreate: %r"); } blockfree = chancreate(sizeof(Msgbuf*), 1); blockalloc = chancreate(sizeof(Msgbuf*), 1); if(blockalloc == nil || blockfree == nil) sysfatal("chancreate: %r"); a.nblocks = 2*Buffer; a.bs = bs; a.nend = 2; proccreate(ddproc, d + 0, Stack); proccreate(ddproc, d + 1, Stack); for(i = 0; i < 4; i++) proccreate(cmpproc, nil, Stack); blockproc(&a); threadexitsall(""); } --upas-vwhndabyjoubrwpucedplftmin--