#include #include #include "assert.h" #include "threadimpl.h" static Lock chanlock; // Central channel access lock static int emptyentry(Channel *c) { int i, bitno, byteno; for (i = 0; i < Nqbits; i++) { bitno = i & Nqmask; byteno = i >> Nqshift; if ((c->qused[byteno] & 1 << bitno) == 0){ c->qused[byteno] |= 1 << bitno; return i; } } sysfatal("alt: too many waiting threads on channel 0x%p", c); return -1; } void chanfree(Channel *c) { int i, inuse; lock(&chanlock); inuse = 0; for (i = 0; i < Nqwds; i++) if (c->qused[i]) inuse = 1; if (inuse) c->freed = 1; else free(c); unlock(&chanlock); } int chaninit(Channel *c, int elemsize, int elemcnt) { int i; if(elemcnt < 0 || elemsize <= 0 || c == nil) return -1; c->f = 0; c->n = 0; c->freed = 0; for (i = 0; i < Nqwds; i++) c->qused[i] = 0; c->s = elemcnt; c->e = elemsize; _threaddebug(DBGCHAN, "chaninit %lux", c); return 1; } Channel * chancreate(int elemsize, int elemcnt) { Channel *c; if(elemcnt < 0 || elemsize <= 0) return nil; c = _threadmalloc(sizeof(Channel) + elemcnt * elemsize, 1); c->s = elemcnt; c->e = elemsize; _threaddebug(DBGCHAN, "chancreate %lux", c); return c; } int alt(Alt *alts) { Alt *a, *xa; Channel *c; uchar *v; int i, n, entry, bitno, byteno; Proc *p; Thread *t; lock(&chanlock); (*procp)->curthread->alt = alts; (*procp)->curthread->call = Callalt; repeat: // Test which channels can proceed n = 1; a = nil; entry = -1; for (xa = alts; xa->op; xa++) { if (xa->op == CHANNOP) continue; if (xa->op == CHANNOBLK) { if (a == nil) { (*procp)->curthread->call = Callnil; unlock(&chanlock); return xa - alts; } else break; } c = xa->c; if (c == nil) sysfatal("alt: nil channel in entry %ld\n", xa - alts); if ((xa->op == CHANSND && c->n < c->s) || (xa->op == CHANRCV && c->n)) { // There's room to send in the channel if (nrand(n) == 0) { a = xa; entry = -1; } n++; } else { // Test for blocked senders or receivers for (i = 0; i < Nqbits; i++) { bitno = i & Nqmask; byteno = i >> Nqshift; // Is it claimed? if ( (c->qused[byteno] & (1 << bitno)) && xa->op == (CHANSND+CHANRCV) - c->qentry[i]->op // complementary op && *c->qentry[i]->tag == nil ) { // No if (nrand(n) == 0) { a = xa; entry = i; } n++; break; } } } } if (a == nil) { // Nothing can proceed, enqueue on all channels c = nil; for (a = alts; a->op; a++) { Channel *cp; if (a->op == CHANNOP || a->op == CHANNOBLK) continue; cp = a->c; a->tag = &c; i = emptyentry(cp); cp->qentry[i] = a; a->q[i >> Nqshift] = 1 << (i & Nqmask); } // And wait for the rendez vous unlock(&chanlock); if (_threadrendezvous((ulong)&c, 0) == ~0) { (*procp)->curthread->call = Callnil; return -1; } lock(&chanlock); /* We rendezed-vous on channel c, dequeue from all channels * and find the Alt struct to which c belongs */ a = nil; for (xa = alts; xa->op; xa++) { Channel *xc; if (xa->op == CHANNOP || xa->op == CHANNOBLK) continue; xc = xa->c; for (i = 0; i < Nqwds; i++) xc->qused[i] &= ~xa->q[i]; if (xc == c) a = xa; } if (c->s) { // Buffered channel, try again sleep(0); goto repeat; } unlock(&chanlock); if (c->freed) chanfree(c); p = *procp; t = p->curthread; if (t->exiting) threadexits(nil); (*procp)->curthread->call = Callnil; return a - alts; } c = a->c; // Channel c can proceed if (c->s) { // Send or receive via the buffered channel if (a->op == CHANSND) { v = c->v + ((c->f + c->n) % c->s) * c->e; if (a->v) memmove(v, a->v, c->e); else memset(v, 0, c->e); c->n++; } else { if (a->v) { v = c->v + (c->f % c->s) * c->e; memmove(a->v, v, c->e); } c->n--; c->f++; } } if (entry < 0) for (i = 0; i < Nqbits; i++) { bitno = i & Nqmask; byteno = i >> Nqshift; if ( (c->qused[byteno] & (1 << bitno)) && c->qentry[i]->op == (CHANSND+CHANRCV) - a->op && *c->qentry[i]->tag == nil ) { // Unblock peer process xa = c->qentry[i]; *xa->tag = c; unlock(&chanlock); if (_threadrendezvous((ulong)xa->tag, 0) == ~0) { (*procp)->curthread->call = Callnil; return -1; } (*procp)->curthread->call = Callnil; return a - alts; } } if (entry >= 0) { xa = c->qentry[entry]; if (a->op == CHANSND) { if (xa->v) { if (a->v) memmove(xa->v, a->v, c->e); else memset(xa->v, 0, c->e); } } else { if (a->v) { if (xa->v) memmove(a->v, xa->v, c->e); else memset(a->v, 0, c->e); } } *xa->tag = c; unlock(&chanlock); if (_threadrendezvous((ulong)xa->tag, 0) == ~0) { (*procp)->curthread->call = Callnil; return -1; } (*procp)->curthread->call = Callnil; return a - alts; } unlock(&chanlock); yield(); (*procp)->curthread->call = Callnil; return a - alts; } int nbrecv(Channel *c, void *v) { Alt *a; int i, bitno, byteno; lock(&chanlock); for (i = 0; i < Nqbits; i++) { bitno = i & Nqmask; byteno = i >> Nqshift; a = c->qentry[i]; if ( (c->qused[byteno] & (1 << bitno)) && a->op == CHANSND && *a->tag == nil ) { *a->tag = c; if (c->n) { // There's an item to receive in the buffered channel if (v) memmove(v, c->v + (c->f % c->s) * c->e, c->e); c->n--; c->f++; } else { if (v) { if (a->v) memmove(v, a->v, c->e); else memset(v, 0, c->e); } } unlock(&chanlock); if (_threadrendezvous((ulong)a->tag, 0) == ~0) return -1; return 1; } } if (c->n) { // There's an item to receive in the buffered channel if (v) memmove(v, c->v + (c->f % c->s) * c->e, c->e); c->n--; c->f++; unlock(&chanlock); return 1; } unlock(&chanlock); return 0; } int recv(Channel *c, void *v) { Alt a; Channel *tag; int i, byteno; Proc *p; Thread *t; retry: if (i = nbrecv(c, v)) return i; lock(&chanlock); // Unbuffered, or buffered but empty tag = nil; a.c = c; a.v = v; a.tag = &tag; a.op = CHANRCV; p = *procp; t = p->curthread; t->alt = &a; t->call = Callrcv; // enqueue on the channel i = emptyentry(c); byteno = i >> Nqshift; c->qentry[i] = &a; a.q[byteno] = 1 << (i & Nqmask); unlock(&chanlock); if (_threadrendezvous((ulong)&tag, 0) == ~0) { t->call = Callnil; return -1; } lock(&chanlock); // dequeue from the channel c->qused[byteno] &= ~a.q[byteno]; unlock(&chanlock); if (c->s) goto retry; // Buffered channels: try the queue again if (c->freed) chanfree(c); t->call = Callnil; if (t->exiting) threadexits(nil); return 1; } int nbsend(Channel *c, void *v) { Alt *a; int i, bitno, byteno; lock(&chanlock); for (i = 0; i < Nqbits; i++) { bitno = i & Nqmask; byteno = i >> Nqshift; a = c->qentry[i]; if ( (c->qused[byteno] & 1 << bitno) && a->op == CHANRCV && *a->tag == nil ) { *a->tag = c; if (c->n < c->s) { // There's room to send in the buffered channel if (v) memmove(c->v + ((c->f + c->n) % c->s) * c->e, v, c->e); else memset(c->v + ((c->f + c->n) % c->s) * c->e, 0, c->e); c->n++; } else { if (a->v) { if (v) memmove(a->v, v, c->e); else memset(a->v, 0, c->e); } } unlock(&chanlock); if (_threadrendezvous((ulong)a->tag, 0) == ~0) return -1; return 1; } } if (c->n < c->s) { // There's room to send in the buffered channel if (v) memmove(c->v + ((c->f + c->n) % c->s) * c->e, v, c->e); else memset(c->v + ((c->f + c->n) % c->s) * c->e, 0, c->e); c->n++; unlock(&chanlock); yield(); return 1; } unlock(&chanlock); return 0; } int send(Channel *c, void *v) { Alt a; Channel *tag; int i, byteno; Proc *p; Thread *t; retry: if (i = nbsend(c, v)) return i; lock(&chanlock); tag = nil; a.c = c; a.v = v; a.tag = &tag; a.op = CHANSND; p = *procp; t = p->curthread; t->alt = &a; t->call = Callsnd; // enqueue on the channel i = emptyentry(c); byteno = i >> Nqshift; c->qentry[i] = &a; a.q[byteno] = 1 << (i & Nqmask); unlock(&chanlock); if (_threadrendezvous((ulong)&tag, 0) == ~0) { t->call = Callnil; return -1; } lock(&chanlock); // dequeue from the channel c->qused[byteno] &= ~a.q[byteno]; unlock(&chanlock); if (c->s) goto retry; // Buffered channels: try the queue again // Unbuffered channels: data is already transferred if (c->freed) chanfree(c); t->call = Callnil; if (t->exiting) threadexits(nil); return 1; } int sendul(Channel *c, ulong v) { threadassert(c->e == sizeof(ulong)); return send(c, &v); } ulong recvul(Channel *c) { ulong v; threadassert(c->e == sizeof(ulong)); if (recv(c, &v) < 0) return ~0; return v; } int sendp(Channel *c, void *v) { threadassert(c->e == sizeof(void *)); return send(c, &v); } void * recvp(Channel *c) { void * v; threadassert(c->e == sizeof(void *)); if (recv(c, &v) < 0) return nil; return v; } int nbsendul(Channel *c, ulong v) { threadassert(c->e == sizeof(ulong)); return nbsend(c, &v); } ulong nbrecvul(Channel *c) { ulong v; threadassert(c->e == sizeof(ulong)); if (nbrecv(c, &v) == 0) return 0; return v; } int nbsendp(Channel *c, void *v) { threadassert(c->e == sizeof(void *)); return nbsend(c, &v); } void * nbrecvp(Channel *c) { void * v; threadassert(c->e == sizeof(void *)); if (nbrecv(c, &v) == 0) return nil; return v; }