From mboxrd@z Thu Jan 1 00:00:00 1970 To: 9fans@cse.psu.edu Subject: Re: [9fans] silent limit in thread(2) Channels which can corrupt the stack? From: Sape Mullender MIME-Version: 1.0 Content-Type: multipart/mixed; boundary="upas-hnwcvhnatzzyjznphwlbtnbwim" Message-Id: <20011120211511.146EE19A4F@mail.cse.psu.edu> Date: Tue, 20 Nov 2001 16:15:09 -0500 Topicbox-Message-UUID: 2594e6be-eaca-11e9-9e20-41e7f4b1d025 This is a multi-part message in MIME format. --upas-hnwcvhnatzzyjznphwlbtnbwim Content-Disposition: inline Content-Type: text/plain; charset="US-ASCII" Content-Transfer-Encoding: 7bit Here are some fixes for channel.c. You can now have up to 64 waiting threads. Overflow is now flagged. Let me know how you fare. You probably need to integrate my thread.c with yours; we use a new version of 9p here with lots of changed interfaces. The data structures that matter are the Channel and Alt structs. Sape --upas-hnwcvhnatzzyjznphwlbtnbwim Content-Disposition: attachment; filename=channel.c Content-Type: text/plain; charset="US-ASCII" Content-Transfer-Encoding: 7bit #include #include #include "assert.h" #include "threadimpl.h" static Lock chanlock; // Central channel access lock static int emptyentry(Channel *c) { int i, bitno, byteno; for (i = 0; i < Nqbits; i++) { bitno = i & Nqmask; byteno = i >> Nqshift; if ((c->qused[byteno] & 1 << bitno) == 0){ c->qused[byteno] |= 1 << bitno; return i; } } sysfatal("alt: too many waiting threads on channel 0x%p", c); return -1; } void chanfree(Channel *c) { int i, inuse; lock(&chanlock); inuse = 0; for (i = 0; i < Nqwds; i++) if (c->qused[i]) inuse = 1; if (inuse) c->freed = 1; else free(c); unlock(&chanlock); } int chaninit(Channel *c, int elemsize, int elemcnt) { int i; if(elemcnt < 0 || elemsize <= 0 || c == nil) return -1; c->f = 0; c->n = 0; c->freed = 0; for (i = 0; i < Nqwds; i++) c->qused[i] = 0; c->s = elemcnt; c->e = elemsize; _threaddebug(DBGCHAN, "chaninit %lux", c); return 1; } Channel * chancreate(int elemsize, int elemcnt) { Channel *c; if(elemcnt < 0 || elemsize <= 0) return nil; c = _threadmalloc(sizeof(Channel) + elemcnt * elemsize, 1); c->s = elemcnt; c->e = elemsize; _threaddebug(DBGCHAN, "chancreate %lux", c); return c; } int alt(Alt *alts) { Alt *a, *xa; Channel *c; uchar *v; int i, n, entry, bitno, byteno; Proc *p; Thread *t; lock(&chanlock); (*procp)->curthread->alt = alts; (*procp)->curthread->call = Callalt; repeat: // Test which channels can proceed n = 1; a = nil; entry = -1; for (xa = alts; xa->op; xa++) { if (xa->op == CHANNOP) continue; if (xa->op == CHANNOBLK) { if (a == nil) { (*procp)->curthread->call = Callnil; unlock(&chanlock); return xa - alts; } else break; } c = xa->c; if (c == nil) sysfatal("alt: nil channel in entry %ld\n", xa - alts); if ((xa->op == CHANSND && c->n < c->s) || (xa->op == CHANRCV && c->n)) { // There's room to send in the channel if (nrand(n) == 0) { a = xa; entry = -1; } n++; } else { // Test for blocked senders or receivers for (i = 0; i < Nqbits; i++) { bitno = i & Nqmask; byteno = i >> Nqshift; // Is it claimed? if ( (c->qused[byteno] & (1 << bitno)) && xa->op == (CHANSND+CHANRCV) - c->qentry[i]->op // complementary op && *c->qentry[i]->tag == nil ) { // No if (nrand(n) == 0) { a = xa; entry = i; } n++; break; } } } } if (a == nil) { // Nothing can proceed, enqueue on all channels c = nil; for (a = alts; a->op; a++) { Channel *cp; if (a->op == CHANNOP || a->op == CHANNOBLK) continue; cp = a->c; a->tag = &c; i = emptyentry(cp); cp->qentry[i] = a; a->q[i >> Nqshift] = 1 << (i & Nqmask); } // And wait for the rendez vous unlock(&chanlock); if (_threadrendezvous((ulong)&c, 0) == ~0) { (*procp)->curthread->call = Callnil; return -1; } lock(&chanlock); /* We rendezed-vous on channel c, dequeue from all channels * and find the Alt struct to which c belongs */ a = nil; for (xa = alts; xa->op; xa++) { Channel *xc; if (xa->op == CHANNOP || xa->op == CHANNOBLK) continue; xc = xa->c; for (i = 0; i < Nqwds; i++) xc->qused[i] &= ~xa->q[i]; if (xc == c) a = xa; } if (c->s) { // Buffered channel, try again sleep(0); goto repeat; } unlock(&chanlock); if (c->freed) chanfree(c); p = *procp; t = p->curthread; if (t->exiting) threadexits(nil); (*procp)->curthread->call = Callnil; return a - alts; } c = a->c; // Channel c can proceed if (c->s) { // Send or receive via the buffered channel if (a->op == CHANSND) { v = c->v + ((c->f + c->n) % c->s) * c->e; if (a->v) memmove(v, a->v, c->e); else memset(v, 0, c->e); c->n++; } else { if (a->v) { v = c->v + (c->f % c->s) * c->e; memmove(a->v, v, c->e); } c->n--; c->f++; } } if (entry < 0) for (i = 0; i < Nqbits; i++) { bitno = i & Nqmask; byteno = i >> Nqshift; if ( (c->qused[byteno] & (1 << bitno)) && c->qentry[i]->op == (CHANSND+CHANRCV) - a->op && *c->qentry[i]->tag == nil ) { // Unblock peer process xa = c->qentry[i]; *xa->tag = c; unlock(&chanlock); if (_threadrendezvous((ulong)xa->tag, 0) == ~0) { (*procp)->curthread->call = Callnil; return -1; } (*procp)->curthread->call = Callnil; return a - alts; } } if (entry >= 0) { xa = c->qentry[entry]; if (a->op == CHANSND) { if (xa->v) { if (a->v) memmove(xa->v, a->v, c->e); else memset(xa->v, 0, c->e); } } else { if (a->v) { if (xa->v) memmove(a->v, xa->v, c->e); else memset(a->v, 0, c->e); } } *xa->tag = c; unlock(&chanlock); if (_threadrendezvous((ulong)xa->tag, 0) == ~0) { (*procp)->curthread->call = Callnil; return -1; } (*procp)->curthread->call = Callnil; return a - alts; } unlock(&chanlock); yield(); (*procp)->curthread->call = Callnil; return a - alts; } int nbrecv(Channel *c, void *v) { Alt *a; int i, bitno, byteno; lock(&chanlock); for (i = 0; i < Nqbits; i++) { bitno = i & Nqmask; byteno = i >> Nqshift; a = c->qentry[i]; if ( (c->qused[byteno] & (1 << bitno)) && a->op == CHANSND && *a->tag == nil ) { *a->tag = c; if (c->n) { // There's an item to receive in the buffered channel if (v) memmove(v, c->v + (c->f % c->s) * c->e, c->e); c->n--; c->f++; } else { if (v) { if (a->v) memmove(v, a->v, c->e); else memset(v, 0, c->e); } } unlock(&chanlock); if (_threadrendezvous((ulong)a->tag, 0) == ~0) return -1; return 1; } } if (c->n) { // There's an item to receive in the buffered channel if (v) memmove(v, c->v + (c->f % c->s) * c->e, c->e); c->n--; c->f++; unlock(&chanlock); return 1; } unlock(&chanlock); return 0; } int recv(Channel *c, void *v) { Alt a; Channel *tag; int i, byteno; Proc *p; Thread *t; retry: if (i = nbrecv(c, v)) return i; lock(&chanlock); // Unbuffered, or buffered but empty tag = nil; a.c = c; a.v = v; a.tag = &tag; a.op = CHANRCV; p = *procp; t = p->curthread; t->alt = &a; t->call = Callrcv; // enqueue on the channel i = emptyentry(c); byteno = i >> Nqshift; c->qentry[i] = &a; a.q[byteno] = 1 << (i & Nqmask); unlock(&chanlock); if (_threadrendezvous((ulong)&tag, 0) == ~0) { t->call = Callnil; return -1; } lock(&chanlock); // dequeue from the channel c->qused[byteno] &= ~a.q[byteno]; unlock(&chanlock); if (c->s) goto retry; // Buffered channels: try the queue again if (c->freed) chanfree(c); t->call = Callnil; if (t->exiting) threadexits(nil); return 1; } int nbsend(Channel *c, void *v) { Alt *a; int i, bitno, byteno; lock(&chanlock); for (i = 0; i < Nqbits; i++) { bitno = i & Nqmask; byteno = i >> Nqshift; a = c->qentry[i]; if ( (c->qused[byteno] & 1 << bitno) && a->op == CHANRCV && *a->tag == nil ) { *a->tag = c; if (c->n < c->s) { // There's room to send in the buffered channel if (v) memmove(c->v + ((c->f + c->n) % c->s) * c->e, v, c->e); else memset(c->v + ((c->f + c->n) % c->s) * c->e, 0, c->e); c->n++; } else { if (a->v) { if (v) memmove(a->v, v, c->e); else memset(a->v, 0, c->e); } } unlock(&chanlock); if (_threadrendezvous((ulong)a->tag, 0) == ~0) return -1; return 1; } } if (c->n < c->s) { // There's room to send in the buffered channel if (v) memmove(c->v + ((c->f + c->n) % c->s) * c->e, v, c->e); else memset(c->v + ((c->f + c->n) % c->s) * c->e, 0, c->e); c->n++; unlock(&chanlock); yield(); return 1; } unlock(&chanlock); return 0; } int send(Channel *c, void *v) { Alt a; Channel *tag; int i, byteno; Proc *p; Thread *t; retry: if (i = nbsend(c, v)) return i; lock(&chanlock); tag = nil; a.c = c; a.v = v; a.tag = &tag; a.op = CHANSND; p = *procp; t = p->curthread; t->alt = &a; t->call = Callsnd; // enqueue on the channel i = emptyentry(c); byteno = i >> Nqshift; c->qentry[i] = &a; a.q[byteno] = 1 << (i & Nqmask); unlock(&chanlock); if (_threadrendezvous((ulong)&tag, 0) == ~0) { t->call = Callnil; return -1; } lock(&chanlock); // dequeue from the channel c->qused[byteno] &= ~a.q[byteno]; unlock(&chanlock); if (c->s) goto retry; // Buffered channels: try the queue again // Unbuffered channels: data is already transferred if (c->freed) chanfree(c); t->call = Callnil; if (t->exiting) threadexits(nil); return 1; } int sendul(Channel *c, ulong v) { threadassert(c->e == sizeof(ulong)); return send(c, &v); } ulong recvul(Channel *c) { ulong v; threadassert(c->e == sizeof(ulong)); if (recv(c, &v) < 0) return ~0; return v; } int sendp(Channel *c, void *v) { threadassert(c->e == sizeof(void *)); return send(c, &v); } void * recvp(Channel *c) { void * v; threadassert(c->e == sizeof(void *)); if (recv(c, &v) < 0) return nil; return v; } int nbsendul(Channel *c, ulong v) { threadassert(c->e == sizeof(ulong)); return nbsend(c, &v); } ulong nbrecvul(Channel *c) { ulong v; threadassert(c->e == sizeof(ulong)); if (nbrecv(c, &v) == 0) return 0; return v; } int nbsendp(Channel *c, void *v) { threadassert(c->e == sizeof(void *)); return nbsend(c, &v); } void * nbrecvp(Channel *c) { void * v; threadassert(c->e == sizeof(void *)); if (nbrecv(c, &v) == 0) return nil; return v; } --upas-hnwcvhnatzzyjznphwlbtnbwim Content-Disposition: attachment; filename=thread.h Content-Type: text/plain; charset="US-ASCII" Content-Transfer-Encoding: 7bit #pragma src "/sys/src/libthread" #pragma lib "libthread.a" #pragma varargck argpos threadprint 2 #pragma varargck argpos chanprint 2 typedef struct Proc Proc; typedef struct Thread Thread; typedef struct Tqueue Tqueue; typedef struct Alt Alt; typedef struct Channel Channel; typedef struct Ref Ref; /* Channel structure. S is the size of the buffer. For unbuffered channels * s is zero. v is an array of s values. If s is zero, v is unused. * f and n represent the state of the queue pointed to by v. * rcvrs and sndrs must be initialized to nil and should not be touched * by code outside channel.c */ enum { Nqwds = 2, Nqshift = 5, Nqbits = (1 << Nqshift) * Nqwds, Nqmask = (1 << Nqshift) - 1, }; struct Channel { int s; // Size of the channel (may be zero) uint f; // Extraction point (insertion pt: (f + n) % s) uint n; // Number of values in the channel int e; // Element size int freed; // Set when channel is being deleted ulong qused[Nqwds]; // Bitmap of used entries in rcvrs Alt * qentry[Nqbits]; // Receivers/senders waiting uchar v[1]; // Array of max(1, s) values in the channel }; /* Channel operations for alt: */ enum{ CHANEND, CHANSND, CHANRCV, CHANNOP, CHANNOBLK, }; struct Alt { Channel *c; /* channel */ void *v; /* pointer to value */ int op; /* operation 0 == send, 1 == receive */ /* the next variables are used internally to alt * they need not be initialized */ Channel **tag; /* pointer to rendez-vous tag */ ulong q[Nqwds]; }; struct Ref { long ref; }; int alt(Alt alts[]); Channel* chancreate(int elemsize, int bufsize); int chaninit(Channel *c, int elemsize, int elemcnt); void chanfree(Channel *c); int chanprint(Channel *, char *, ...); long decref(Ref *r); /* returns 0 iff value is now zero */ void incref(Ref *r); int nbrecv(Channel *c, void *v); void* nbrecvp(Channel *c); ulong nbrecvul(Channel *c); int nbsend(Channel *c, void *v); int nbsendp(Channel *c, void *v); int nbsendul(Channel *c, ulong v); int proccreate(void (*f)(void *arg), void *arg, uint stacksize); int procrfork(void (*f)(void *arg), void *arg, uint stacksize, int flag); ulong * procdata(void); void procexec(Channel *, char *, char *[]); void procexecl(Channel *, char *, ...); int recv(Channel *c, void *v); void* recvp(Channel *c); ulong recvul(Channel *c); int send(Channel *c, void *v); int sendp(Channel *c, void *v); int sendul(Channel *c, ulong v); int threadcreate(void (*f)(void *arg), void *arg, uint stacksize); ulong* threaddata(void); void threadexits(char *); void threadexitsall(char *); int threadgetgrp(void); /* return thread group of current thread */ char* threadgetname(void); void threadkill(int); /* kill thread */ void threadkillgrp(int); /* kill threads in group */ void threadmain(int argc, char *argv[]); void threadnonotes(void); int threadid(void); int threadpid(int); int threadprint(int, char*, ...); int threadsetgrp(int); /* set thread group, return old */ void threadsetname(char *name); Channel* threadwaitchan(void); void yield(void); --upas-hnwcvhnatzzyjznphwlbtnbwim--