From: Sape Mullender <sape@plan9.bell-labs.com>
To: 9fans@cse.psu.edu
Subject: Re: [9fans] silent limit in thread(2) Channels which can corrupt the stack?
Date: Tue, 20 Nov 2001 16:15:09 -0500 [thread overview]
Message-ID: <20011120211511.146EE19A4F@mail.cse.psu.edu> (raw)
[-- Attachment #1: Type: text/plain, Size: 334 bytes --]
Here are some fixes for channel.c. You can now have up to 64 waiting
threads. Overflow is now flagged. Let me know how you fare.
You probably need to integrate my thread.h with yours; we use a new
version of 9p here with lots of changed interfaces. The data structures
that matter are the Channel and Alt structs.
Sape
[-- Attachment #2: channel.c --]
[-- Type: text/plain, Size: 10077 bytes --]
#include <u.h>
#include <libc.h>
#include "assert.h"
#include "threadimpl.h"
// chanfree, alt, nbrecv, recv, nbsend and send all take this single
// lock around their accesses to channel queue state.
static Lock chanlock; // Central channel access lock
/*
 * Claim the lowest-numbered free slot in c's wait queue.
 *
 * Scans the qused bitmap for the first clear bit, sets it and returns
 * its index (0 .. Nqbits-1); the caller then stores its Alt in
 * c->qentry[] at that index.  If every slot is taken, sysfatal is
 * called.  Every caller in this file holds chanlock.
 */
static int
emptyentry(Channel *c)
{
	int i, bitno, byteno;

	for (i = 0; i < Nqbits; i++) {
		bitno = i & Nqmask;
		byteno = i >> Nqshift;
		// 1UL rather than 1: for bitno == 31 a signed int would be
		// shifted into its sign bit
		if ((c->qused[byteno] & (1UL << bitno)) == 0) {
			c->qused[byteno] |= 1UL << bitno;
			return i;
		}
	}
	sysfatal("alt: too many waiting threads on channel 0x%p", c);
	return -1;
}
/*
 * Release channel c.
 *
 * With chanlock held, look for any wait-queue slot still marked used.
 * If one exists, only set c->freed; recv, send and alt call chanfree
 * again when they see that flag after leaving the queue.  Otherwise
 * the channel's memory is freed immediately.
 */
void
chanfree(Channel *c)
{
	int w;

	lock(&chanlock);
	// stop at the first bitmap word that still has a bit set
	for (w = 0; w < Nqwds && c->qused[w] == 0; w++)
		;
	if (w == Nqwds)
		free(c);
	else
		c->freed = 1;
	unlock(&chanlock);
}
/*
 * Initialize caller-supplied channel storage c for elemcnt buffered
 * elements of elemsize bytes each (elemcnt == 0 gives an unbuffered
 * channel).  Resets the buffer indices, the freed flag and the
 * wait-queue bitmap.
 *
 * Returns 1 on success, -1 if c is nil, elemsize <= 0 or elemcnt < 0.
 */
int
chaninit(Channel *c, int elemsize, int elemcnt)
{
	int w;

	if (c == nil || elemsize <= 0 || elemcnt < 0)
		return -1;

	c->s = elemcnt;
	c->e = elemsize;
	c->f = 0;
	c->n = 0;
	c->freed = 0;
	for (w = Nqwds; w-- > 0; )
		c->qused[w] = 0;

	_threaddebug(DBGCHAN, "chaninit %lux", c);
	return 1;
}
/*
 * Allocate a channel holding up to elemcnt buffered elements of
 * elemsize bytes each; the element buffer is allocated together with
 * the Channel header.
 *
 * Only s and e are assigned here; the other fields keep whatever
 * _threadmalloc returned (presumably zeroed memory, given the second
 * argument of 1 -- confirm against _threadmalloc).
 *
 * Returns nil if elemsize <= 0 or elemcnt < 0.
 */
Channel *
chancreate(int elemsize, int elemcnt)
{
	Channel *ch;
	int bufbytes;

	if (elemsize <= 0 || elemcnt < 0)
		return nil;

	bufbytes = elemcnt * elemsize;
	ch = _threadmalloc(sizeof(Channel) + bufbytes, 1);
	ch->e = elemsize;
	ch->s = elemcnt;
	_threaddebug(DBGCHAN, "chancreate %lux", ch);
	return ch;
}
int
alt(Alt *alts) {
Alt *a, *xa;
Channel *c;
uchar *v;
int i, n, entry, bitno, byteno;
Proc *p;
Thread *t;
lock(&chanlock);
(*procp)->curthread->alt = alts;
(*procp)->curthread->call = Callalt;
repeat:
// Test which channels can proceed
n = 1;
a = nil;
entry = -1;
for (xa = alts; xa->op; xa++) {
if (xa->op == CHANNOP) continue;
if (xa->op == CHANNOBLK) {
if (a == nil) {
(*procp)->curthread->call = Callnil;
unlock(&chanlock);
return xa - alts;
} else
break;
}
c = xa->c;
if (c == nil)
sysfatal("alt: nil channel in entry %ld\n", xa - alts);
if ((xa->op == CHANSND && c->n < c->s) ||
(xa->op == CHANRCV && c->n)) {
// There's room to send in the channel
if (nrand(n) == 0) {
a = xa;
entry = -1;
}
n++;
} else {
// Test for blocked senders or receivers
for (i = 0; i < Nqbits; i++) {
bitno = i & Nqmask;
byteno = i >> Nqshift;
// Is it claimed?
if (
(c->qused[byteno] & (1 << bitno))
&& xa->op == (CHANSND+CHANRCV) - c->qentry[i]->op
// complementary op
&& *c->qentry[i]->tag == nil
) {
// No
if (nrand(n) == 0) {
a = xa;
entry = i;
}
n++;
break;
}
}
}
}
if (a == nil) {
// Nothing can proceed, enqueue on all channels
c = nil;
for (a = alts; a->op; a++) {
Channel *cp;
if (a->op == CHANNOP || a->op == CHANNOBLK) continue;
cp = a->c;
a->tag = &c;
i = emptyentry(cp);
cp->qentry[i] = a;
a->q[i >> Nqshift] = 1 << (i & Nqmask);
}
// And wait for the rendez vous
unlock(&chanlock);
if (_threadrendezvous((ulong)&c, 0) == ~0) {
(*procp)->curthread->call = Callnil;
return -1;
}
lock(&chanlock);
/* We rendezed-vous on channel c, dequeue from all channels
* and find the Alt struct to which c belongs
*/
a = nil;
for (xa = alts; xa->op; xa++) {
Channel *xc;
if (xa->op == CHANNOP || xa->op == CHANNOBLK) continue;
xc = xa->c;
for (i = 0; i < Nqwds; i++)
xc->qused[i] &= ~xa->q[i];
if (xc == c)
a = xa;
}
if (c->s) {
// Buffered channel, try again
sleep(0);
goto repeat;
}
unlock(&chanlock);
if (c->freed) chanfree(c);
p = *procp;
t = p->curthread;
if (t->exiting)
threadexits(nil);
(*procp)->curthread->call = Callnil;
return a - alts;
}
c = a->c;
// Channel c can proceed
if (c->s) {
// Send or receive via the buffered channel
if (a->op == CHANSND) {
v = c->v + ((c->f + c->n) % c->s) * c->e;
if (a->v)
memmove(v, a->v, c->e);
else
memset(v, 0, c->e);
c->n++;
} else {
if (a->v) {
v = c->v + (c->f % c->s) * c->e;
memmove(a->v, v, c->e);
}
c->n--;
c->f++;
}
}
if (entry < 0)
for (i = 0; i < Nqbits; i++) {
bitno = i & Nqmask;
byteno = i >> Nqshift;
if (
(c->qused[byteno] & (1 << bitno))
&& c->qentry[i]->op == (CHANSND+CHANRCV) - a->op
&& *c->qentry[i]->tag == nil
) {
// Unblock peer process
xa = c->qentry[i];
*xa->tag = c;
unlock(&chanlock);
if (_threadrendezvous((ulong)xa->tag, 0) == ~0) {
(*procp)->curthread->call = Callnil;
return -1;
}
(*procp)->curthread->call = Callnil;
return a - alts;
}
}
if (entry >= 0) {
xa = c->qentry[entry];
if (a->op == CHANSND) {
if (xa->v) {
if (a->v)
memmove(xa->v, a->v, c->e);
else
memset(xa->v, 0, c->e);
}
} else {
if (a->v) {
if (xa->v)
memmove(a->v, xa->v, c->e);
else
memset(a->v, 0, c->e);
}
}
*xa->tag = c;
unlock(&chanlock);
if (_threadrendezvous((ulong)xa->tag, 0) == ~0) {
(*procp)->curthread->call = Callnil;
return -1;
}
(*procp)->curthread->call = Callnil;
return a - alts;
}
unlock(&chanlock);
yield();
(*procp)->curthread->call = Callnil;
return a - alts;
}
/*
 * Receive from c without blocking.
 *
 * If a queued sender that nobody has claimed yet is found, it is
 * claimed (its tag is set to c); the value comes from the buffer when
 * the buffer is non-empty, otherwise straight from the sender (zeros
 * if the sender's value pointer is nil); the sender is then met in
 * _threadrendezvous.  With no such sender, a buffered value is taken
 * if one exists.  A nil v discards the value.
 *
 * Returns 1 if a value was received, 0 if nothing was available, and
 * -1 if the rendezvous returns ~0.
 */
int
nbrecv(Channel *c, void *v)
{
	Alt *sender;
	void *src;
	int slot;

	lock(&chanlock);
	for (slot = 0; slot < Nqbits; slot++) {
		if ((c->qused[slot >> Nqshift] & (1 << (slot & Nqmask))) == 0)
			continue;	// slot not in use
		sender = c->qentry[slot];
		if (sender->op != CHANSND || *sender->tag != nil)
			continue;	// not a sender, or already claimed

		*sender->tag = c;
		if (c->n == 0) {
			// nothing buffered: take the value from the sender itself
			if (v != nil) {
				if (sender->v != nil)
					memmove(v, sender->v, c->e);
				else
					memset(v, 0, c->e);
			}
		} else {
			// There's an item to receive in the buffered channel
			if (v != nil) {
				src = c->v + (c->f % c->s) * c->e;
				memmove(v, src, c->e);
			}
			c->n--;
			c->f++;
		}
		unlock(&chanlock);
		return _threadrendezvous((ulong)sender->tag, 0) == ~0 ? -1 : 1;
	}

	if (c->n == 0) {
		unlock(&chanlock);
		return 0;
	}
	// There's an item to receive in the buffered channel
	if (v != nil) {
		src = c->v + (c->f % c->s) * c->e;
		memmove(v, src, c->e);
	}
	c->n--;
	c->f++;
	unlock(&chanlock);
	return 1;
}
/*
 * Receive one value from c into v, blocking until one is available.
 *
 * First tries nbrecv.  If that yields nothing, a stack-allocated Alt
 * is queued on the channel and the thread waits in _threadrendezvous
 * for a sender to claim it.  After waking it leaves the queue; on a
 * buffered channel it then retries through nbrecv, on an unbuffered
 * one the sender has already copied the value into v.
 *
 * Returns 1 on success (or whatever non-zero value nbrecv returned),
 * -1 if the rendezvous returns ~0.
 *
 * NOTE(review): when a retry succeeds through nbrecv, t->call is still
 * Callrcv on return; only the other exits reset it to Callnil.
 */
int
recv(Channel *c, void *v)
{
	Alt a;
	Channel *tag;
	int i, byteno, failed;
	Proc *p;
	Thread *t;

retry:
	i = nbrecv(c, v);
	if (i != 0)
		return i;
	lock(&chanlock);
	// Unbuffered, or buffered but empty
	tag = nil;
	a.c = c;
	a.v = v;
	a.tag = &tag;
	a.op = CHANRCV;
	p = *procp;
	t = p->curthread;
	t->alt = &a;
	t->call = Callrcv;
	// enqueue on the channel
	i = emptyentry(c);
	byteno = i >> Nqshift;
	c->qentry[i] = &a;
	a.q[byteno] = 1UL << (i & Nqmask);
	unlock(&chanlock);
	failed = _threadrendezvous((ulong)&tag, 0) == ~0;
	// Dequeue from the channel on every path, including failure:
	// the queued entry points at a and tag, which live in this
	// stack frame and die when we return.
	lock(&chanlock);
	c->qused[byteno] &= ~a.q[byteno];
	unlock(&chanlock);
	if (failed) {
		t->call = Callnil;
		return -1;
	}
	if (c->s) goto retry;	// Buffered channels: try the queue again
	if (c->freed) chanfree(c);
	t->call = Callnil;
	if (t->exiting)
		threadexits(nil);
	return 1;
}
/*
 * Send the value at v on c without blocking; a nil v sends zeros.
 *
 * If a queued receiver that nobody has claimed yet is found, it is
 * claimed (its tag is set to c); the value goes into the buffer when
 * the buffer has room, otherwise straight into the receiver's value
 * (skipped if the receiver's value pointer is nil); the receiver is
 * then met in _threadrendezvous.  With no such receiver, the value is
 * buffered if there is room, followed by a yield.
 *
 * Returns 1 if the value was sent, 0 if it could not be sent without
 * blocking, and -1 if the rendezvous returns ~0.
 */
int
nbsend(Channel *c, void *v)
{
	Alt *rcvr;
	void *dst;
	int slot;

	lock(&chanlock);
	for (slot = 0; slot < Nqbits; slot++) {
		if ((c->qused[slot >> Nqshift] & 1 << (slot & Nqmask)) == 0)
			continue;	// slot not in use
		rcvr = c->qentry[slot];
		if (rcvr->op != CHANRCV || *rcvr->tag != nil)
			continue;	// not a receiver, or already claimed

		*rcvr->tag = c;
		if (c->n < c->s) {
			// There's room to send in the buffered channel
			dst = c->v + ((c->f + c->n) % c->s) * c->e;
			if (v != nil)
				memmove(dst, v, c->e);
			else
				memset(dst, 0, c->e);
			c->n++;
		} else if (rcvr->v != nil) {
			// no buffer space: hand the value to the receiver directly
			if (v != nil)
				memmove(rcvr->v, v, c->e);
			else
				memset(rcvr->v, 0, c->e);
		}
		unlock(&chanlock);
		return _threadrendezvous((ulong)rcvr->tag, 0) == ~0 ? -1 : 1;
	}

	if (c->n >= c->s) {
		unlock(&chanlock);
		return 0;
	}
	// There's room to send in the buffered channel
	dst = c->v + ((c->f + c->n) % c->s) * c->e;
	if (v != nil)
		memmove(dst, v, c->e);
	else
		memset(dst, 0, c->e);
	c->n++;
	unlock(&chanlock);
	yield();
	return 1;
}
/*
 * Send the value at v on c, blocking until it can be delivered.
 *
 * First tries nbsend.  If that cannot send, a stack-allocated Alt is
 * queued on the channel and the thread waits in _threadrendezvous for
 * a receiver to claim it.  After waking it leaves the queue; on a
 * buffered channel it then retries through nbsend, on an unbuffered
 * one the receiver has already taken the value.
 *
 * Returns 1 on success (or whatever non-zero value nbsend returned),
 * -1 if the rendezvous returns ~0.
 *
 * NOTE(review): when a retry succeeds through nbsend, t->call is still
 * Callsnd on return; only the other exits reset it to Callnil.
 */
int
send(Channel *c, void *v)
{
	Alt a;
	Channel *tag;
	int i, byteno, failed;
	Proc *p;
	Thread *t;

retry:
	i = nbsend(c, v);
	if (i != 0)
		return i;
	lock(&chanlock);
	tag = nil;
	a.c = c;
	a.v = v;
	a.tag = &tag;
	a.op = CHANSND;
	p = *procp;
	t = p->curthread;
	t->alt = &a;
	t->call = Callsnd;
	// enqueue on the channel
	i = emptyentry(c);
	byteno = i >> Nqshift;
	c->qentry[i] = &a;
	a.q[byteno] = 1UL << (i & Nqmask);
	unlock(&chanlock);
	failed = _threadrendezvous((ulong)&tag, 0) == ~0;
	// Dequeue from the channel on every path, including failure:
	// the queued entry points at a and tag, which live in this
	// stack frame and die when we return.
	lock(&chanlock);
	c->qused[byteno] &= ~a.q[byteno];
	unlock(&chanlock);
	if (failed) {
		t->call = Callnil;
		return -1;
	}
	if (c->s)
		goto retry;	// Buffered channels: try the queue again
	// Unbuffered channels: data is already transferred
	if (c->freed) chanfree(c);
	t->call = Callnil;
	if (t->exiting)
		threadexits(nil);
	return 1;
}
/*
 * Blocking send of the ulong v on c.  Asserts that c's element size
 * is sizeof(ulong); returns send's result.
 */
int
sendul(Channel *c, ulong v)
{
	int r;

	threadassert(c->e == sizeof(ulong));
	r = send(c, &v);
	return r;
}
/*
 * Blocking receive of a ulong from c.  Asserts that c's element size
 * is sizeof(ulong).  Returns the value, or ~0 when recv reports
 * failure (so a received ~0 cannot be told apart from an error).
 */
ulong
recvul(Channel *c)
{
	ulong val;

	threadassert(c->e == sizeof(ulong));
	return recv(c, &val) < 0 ? ~0 : val;
}
/*
 * Blocking send of the pointer v on c.  Asserts that c's element size
 * is sizeof(void *); returns send's result.
 */
int
sendp(Channel *c, void *v)
{
	int r;

	threadassert(c->e == sizeof(void *));
	r = send(c, &v);
	return r;
}
/*
 * Blocking receive of a pointer from c.  Asserts that c's element
 * size is sizeof(void *).  Returns the pointer, or nil when recv
 * reports failure.
 */
void *
recvp(Channel *c)
{
	void *ptr;

	threadassert(c->e == sizeof(void *));
	return recv(c, &ptr) < 0 ? nil : ptr;
}
/*
 * Non-blocking send of the ulong v on c.  Asserts that c's element
 * size is sizeof(ulong); returns nbsend's result.
 */
int
nbsendul(Channel *c, ulong v)
{
	int r;

	threadassert(c->e == sizeof(ulong));
	r = nbsend(c, &v);
	return r;
}
/*
 * Non-blocking receive of a ulong from c.  Asserts that c's element
 * size is sizeof(ulong).  Returns 0 when nbrecv returns 0 (nothing
 * received), otherwise the value stored by nbrecv.
 */
ulong
nbrecvul(Channel *c)
{
	ulong val;

	threadassert(c->e == sizeof(ulong));
	return nbrecv(c, &val) != 0 ? val : 0;
}
/*
 * Non-blocking send of the pointer v on c.  Asserts that c's element
 * size is sizeof(void *); returns nbsend's result.
 */
int
nbsendp(Channel *c, void *v)
{
	int r;

	threadassert(c->e == sizeof(void *));
	r = nbsend(c, &v);
	return r;
}
/*
 * Non-blocking receive of a pointer from c.  Asserts that c's element
 * size is sizeof(void *).  Returns nil when nbrecv returns 0 (nothing
 * received), otherwise the pointer stored by nbrecv.
 */
void *
nbrecvp(Channel *c)
{
	void *ptr;

	threadassert(c->e == sizeof(void *));
	return nbrecv(c, &ptr) != 0 ? ptr : nil;
}
[-- Attachment #3: thread.h --]
[-- Type: text/plain, Size: 3196 bytes --]
#pragma src "/sys/src/libthread"
#pragma lib "libthread.a"
#pragma varargck argpos threadprint 2
#pragma varargck argpos chanprint 2
/* Short names for the library's structures.  Alt, Channel and Ref are
 * defined further down; Proc, Thread and Tqueue are only named here
 * and are not defined in this header as shown.
 */
typedef struct Proc Proc;
typedef struct Thread Thread;
typedef struct Tqueue Tqueue;
typedef struct Alt Alt;
typedef struct Channel Channel;
typedef struct Ref Ref;
/* Channel structure. s is the size of the buffer. For unbuffered channels
 * s is zero. v is an array of s values. If s is zero, v is unused.
 * f and n represent the state of the queue pointed to by v.
 * qused and qentry hold the queue of waiting senders and receivers;
 * chaninit clears qused, and neither should be touched
 * by code outside channel.c
 */
/* Dimensions of the per-channel wait queue. */
enum {
Nqwds = 2, /* number of words in the qused bitmap */
Nqshift = 5, /* log2 of the bits used per word: slot >> Nqshift is the word index */
Nqbits = (1 << Nqshift) * Nqwds, /* total wait-queue slots (64) */
Nqmask = (1 << Nqshift) - 1, /* slot & Nqmask is the bit number within the word */
};
struct Channel {
int s; // Size of the channel (may be zero)
uint f; // Extraction point (insertion pt: (f + n) % s)
uint n; // Number of values in the channel
int e; // Element size
int freed; // Set by chanfree when waiters remain; the last one out frees
ulong qused[Nqwds]; // Bitmap of used entries in qentry
Alt * qentry[Nqbits]; // Receivers/senders waiting
uchar v[1]; // Start of the buffer; chancreate allocates s * e extra bytes
};
/* Channel operations for alt: */
enum{
CHANEND, /* 0: terminates the Alt array */
CHANSND, /* send; alt computes the complementary op as (CHANSND+CHANRCV) - op */
CHANRCV, /* receive */
CHANNOP, /* entry is skipped by alt */
CHANNOBLK, /* alt returns this entry's index when no earlier entry can proceed */
};
/* One entry of the array passed to alt; recv and send also build a
 * single Alt on their stack to queue themselves on a channel.
 */
struct Alt {
Channel *c; /* channel */
void *v; /* pointer to value */
int op; /* operation: CHANSND, CHANRCV, CHANNOP or CHANNOBLK; CHANEND (0) ends the array */
/* the next variables are used internally to alt
* they need not be initialized
*/
Channel **tag; /* pointer to rendez-vous tag */
ulong q[Nqwds]; /* bit of the channel's qused bitmap held while queued */
};
/* Reference count; passed to incref and decref, declared below. */
struct Ref {
long ref;
};
/* Library entry points.  Return-value notes are given only for the
 * functions whose definitions appear in channel.c.
 */
int alt(Alt alts[]); /* index of the entry performed, -1 if the rendezvous fails */
Channel* chancreate(int elemsize, int bufsize); /* nil if elemsize <= 0 or bufsize < 0 */
int chaninit(Channel *c, int elemsize, int elemcnt); /* 1 on success, -1 on bad arguments */
void chanfree(Channel *c); /* frees c, or marks it freed while threads are still queued */
int chanprint(Channel *, char *, ...);
long decref(Ref *r); /* returns 0 iff value is now zero */
void incref(Ref *r);
int nbrecv(Channel *c, void *v); /* 1 received, 0 nothing available, -1 if the rendezvous fails */
void* nbrecvp(Channel *c); /* nil if nothing was received */
ulong nbrecvul(Channel *c); /* 0 if nothing was received */
int nbsend(Channel *c, void *v); /* 1 sent, 0 would block, -1 if the rendezvous fails */
int nbsendp(Channel *c, void *v);
int nbsendul(Channel *c, ulong v);
int proccreate(void (*f)(void *arg), void *arg, uint stacksize);
int procrfork(void (*f)(void *arg), void *arg, uint stacksize, int flag);
ulong * procdata(void);
void procexec(Channel *, char *, char *[]);
void procexecl(Channel *, char *, ...);
int recv(Channel *c, void *v); /* 1 on success, -1 if the rendezvous fails */
void* recvp(Channel *c); /* nil on failure */
ulong recvul(Channel *c); /* ~0 on failure */
int send(Channel *c, void *v); /* 1 on success, -1 if the rendezvous fails */
int sendp(Channel *c, void *v);
int sendul(Channel *c, ulong v);
int threadcreate(void (*f)(void *arg), void *arg, uint stacksize);
ulong* threaddata(void);
void threadexits(char *);
void threadexitsall(char *);
int threadgetgrp(void); /* return thread group of current thread */
char* threadgetname(void);
void threadkill(int); /* kill thread */
void threadkillgrp(int); /* kill threads in group */
void threadmain(int argc, char *argv[]);
void threadnonotes(void);
int threadid(void);
int threadpid(int);
int threadprint(int, char*, ...);
int threadsetgrp(int); /* set thread group, return old */
void threadsetname(char *name);
Channel* threadwaitchan(void);
void yield(void);
next reply other threads:[~2001-11-20 21:15 UTC|newest]
Thread overview: 7+ messages / expand[flat|nested] mbox.gz Atom feed top
2001-11-20 21:15 Sape Mullender [this message]
-- strict thread matches above, loose matches on Subject: below --
2001-11-21 14:34 Fco.J.Ballesteros
2001-11-21 13:24 rog
2001-11-21 9:08 Fco.J.Ballesteros
2001-11-20 20:51 David Gordon Hogan
2001-11-20 14:03 Sape Mullender
2001-11-20 9:35 Fco.J.Ballesteros
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20011120211511.146EE19A4F@mail.cse.psu.edu \
--to=sape@plan9.bell-labs.com \
--cc=9fans@cse.psu.edu \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line
before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).