* Re: [9fans] silent limit in thread(2) Channels which can corrupt the stack?
@ 2001-11-20 20:51 David Gordon Hogan
0 siblings, 0 replies; 7+ messages in thread
From: David Gordon Hogan @ 2001-11-20 20:51 UTC (permalink / raw)
To: 9fans
> You are correct. It is possible to get into trouble when there are more
> than 32 threads reading on a channel. I'll increase the limit and make sure
> that going over the limit is flagged.
I'm sorry, but why is there a limit?
* Re: [9fans] silent limit in thread(2) Channels which can corrupt the stack?
@ 2001-11-21 14:34 Fco.J.Ballesteros
0 siblings, 0 replies; 7+ messages in thread
From: Fco.J.Ballesteros @ 2001-11-21 14:34 UTC (permalink / raw)
To: 9fans
I also use them to serialize calls, but more than 64 waiting
might mean that the server is overloaded, and perhaps should do some
handover through other chans. But probably all this is a matter of taste.
* Re: [9fans] silent limit in thread(2) Channels which can corrupt the stack?
@ 2001-11-21 13:24 rog
0 siblings, 0 replies; 7+ messages in thread
From: rog @ 2001-11-21 13:24 UTC (permalink / raw)
To: 9fans
> IMHO, the limit is not bad, since it warns you of high contention on a
> channel, which could mean that you'd better split the work somehow.
you might want this... it's often useful to use channels as a
serialisation mechanism. in Limbo, i've often had potentially more
than 64 processes blocking on a channel.
i haven't had more than a brief look at the code, but what's the
reason threads blocked on a channel can't be linked together directly
rather than placed in a limited size array?
rog.
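
[Editor's sketch] One way to read the question above is as an intrusive wait list: each blocked thread supplies its own list node (for example on its stack), so the channel needs no fixed-size array and no malloc/realloc while waiters come and go. The following is a minimal sketch in portable C, not libthread code. Waiter, WaitQueue and the wq_* functions are hypothetical names, and the caller is assumed to hold whatever lock protects the channel. A thread blocked in alt on several channels would need one node per channel.

```c
#include <assert.h>
#include <stddef.h>

typedef struct Waiter Waiter;
struct Waiter {
    Waiter *next;   /* next blocked thread on the same channel */
    int     op;     /* hypothetical: 1 = send, 2 = receive */
    void   *v;      /* where the value is read from or written to */
};

typedef struct {
    Waiter  *head;  /* oldest waiter, or NULL when nobody waits */
    Waiter **tail;  /* address of the link to fill on the next append */
} WaitQueue;

static void
wq_init(WaitQueue *q)
{
    q->head = NULL;
    q->tail = &q->head;
}

/* Append w in O(1); w is storage owned by the blocked thread. */
static void
wq_append(WaitQueue *q, Waiter *w)
{
    w->next = NULL;
    *q->tail = w;
    q->tail = &w->next;
}

/* Unlink w wherever it sits; returns 1 if it was queued, else 0. */
static int
wq_remove(WaitQueue *q, Waiter *w)
{
    Waiter **pp;

    for (pp = &q->head; *pp != NULL; pp = &(*pp)->next) {
        if (*pp == w) {
            *pp = w->next;
            if (q->tail == &w->next)   /* removed the last node */
                q->tail = pp;
            w->next = NULL;
            return 1;
        }
    }
    return 0;
}

/* Count the queued waiters. */
static size_t
wq_len(const WaitQueue *q)
{
    const Waiter *w;
    size_t n = 0;

    for (w = q->head; w != NULL; w = w->next)
        n++;
    return n;
}
```

With this layout the number of waiters is bounded only by the number of threads, at the cost of a linear scan when a waiter in the middle of the list has to be removed.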
* Re: [9fans] silent limit in thread(2) Channels which can corrupt the stack?
@ 2001-11-21 9:08 Fco.J.Ballesteros
0 siblings, 0 replies; 7+ messages in thread
From: Fco.J.Ballesteros @ 2001-11-21 9:08 UTC (permalink / raw)
To: 9fans
: We use threaded programs very heavily here but we've never exceeded that
: particular limit. One problem is that allocating (malloc/realloc) the wait list
: causes incredible race conditions.
IMHO, the limit is not bad, since it warns you of high contention on a
channel, which could mean that you'd better split the work somehow.
thanks, I'll merge your source with mine.
* Re: [9fans] silent limit in thread(2) Channels which can corrupt the stack?
@ 2001-11-20 21:15 Sape Mullender
0 siblings, 0 replies; 7+ messages in thread
From: Sape Mullender @ 2001-11-20 21:15 UTC (permalink / raw)
To: 9fans
[-- Attachment #1: Type: text/plain, Size: 334 bytes --]
Here are some fixes for channel.c. You can now have up to 64 waiting
threads. Overflow is now flagged. Let me know how you fare.
You probably need to integrate my thread.c with yours; we use a new
version of 9p here with lots of changed interfaces. The data structures
that matter are the Channel and Alt structs.
Sape
[-- Attachment #2: channel.c --]
[-- Type: text/plain, Size: 10077 bytes --]
#include <u.h>
#include <libc.h>
#include "assert.h"
#include "threadimpl.h"
static Lock chanlock; // Central channel access lock
static int
emptyentry(Channel *c)
{
int i, bitno, byteno;
for (i = 0; i < Nqbits; i++) {
bitno = i & Nqmask;
byteno = i >> Nqshift;
if ((c->qused[byteno] & 1 << bitno) == 0){
c->qused[byteno] |= 1 << bitno;
return i;
}
}
sysfatal("alt: too many waiting threads on channel 0x%p", c);
return -1;
}
void
chanfree(Channel *c) {
int i, inuse;
lock(&chanlock);
inuse = 0;
for (i = 0; i < Nqwds; i++)
if (c->qused[i]) inuse = 1;
if (inuse)
c->freed = 1;
else
free(c);
unlock(&chanlock);
}
int
chaninit(Channel *c, int elemsize, int elemcnt) {
int i;
if(elemcnt < 0 || elemsize <= 0 || c == nil)
return -1;
c->f = 0;
c->n = 0;
c->freed = 0;
for (i = 0; i < Nqwds; i++)
c->qused[i] = 0;
c->s = elemcnt;
c->e = elemsize;
_threaddebug(DBGCHAN, "chaninit %lux", c);
return 1;
}
Channel *
chancreate(int elemsize, int elemcnt) {
Channel *c;
if(elemcnt < 0 || elemsize <= 0)
return nil;
c = _threadmalloc(sizeof(Channel) + elemcnt * elemsize, 1);
c->s = elemcnt;
c->e = elemsize;
_threaddebug(DBGCHAN, "chancreate %lux", c);
return c;
}
int
alt(Alt *alts) {
Alt *a, *xa;
Channel *c;
uchar *v;
int i, n, entry, bitno, byteno;
Proc *p;
Thread *t;
lock(&chanlock);
(*procp)->curthread->alt = alts;
(*procp)->curthread->call = Callalt;
repeat:
// Test which channels can proceed
n = 1;
a = nil;
entry = -1;
for (xa = alts; xa->op; xa++) {
if (xa->op == CHANNOP) continue;
if (xa->op == CHANNOBLK) {
if (a == nil) {
(*procp)->curthread->call = Callnil;
unlock(&chanlock);
return xa - alts;
} else
break;
}
c = xa->c;
if (c == nil)
sysfatal("alt: nil channel in entry %ld\n", xa - alts);
if ((xa->op == CHANSND && c->n < c->s) ||
(xa->op == CHANRCV && c->n)) {
// There's room to send in the channel
if (nrand(n) == 0) {
a = xa;
entry = -1;
}
n++;
} else {
// Test for blocked senders or receivers
for (i = 0; i < Nqbits; i++) {
bitno = i & Nqmask;
byteno = i >> Nqshift;
// Is it claimed?
if (
(c->qused[byteno] & (1 << bitno))
&& xa->op == (CHANSND+CHANRCV) - c->qentry[i]->op
// complementary op
&& *c->qentry[i]->tag == nil
) {
// No
if (nrand(n) == 0) {
a = xa;
entry = i;
}
n++;
break;
}
}
}
}
if (a == nil) {
// Nothing can proceed, enqueue on all channels
c = nil;
for (a = alts; a->op; a++) {
Channel *cp;
if (a->op == CHANNOP || a->op == CHANNOBLK) continue;
cp = a->c;
a->tag = &c;
i = emptyentry(cp);
cp->qentry[i] = a;
a->q[i >> Nqshift] = 1 << (i & Nqmask);
}
// And wait for the rendez vous
unlock(&chanlock);
if (_threadrendezvous((ulong)&c, 0) == ~0) {
(*procp)->curthread->call = Callnil;
return -1;
}
lock(&chanlock);
/* We rendezed-vous on channel c, dequeue from all channels
* and find the Alt struct to which c belongs
*/
a = nil;
for (xa = alts; xa->op; xa++) {
Channel *xc;
if (xa->op == CHANNOP || xa->op == CHANNOBLK) continue;
xc = xa->c;
for (i = 0; i < Nqwds; i++)
xc->qused[i] &= ~xa->q[i];
if (xc == c)
a = xa;
}
if (c->s) {
// Buffered channel, try again
sleep(0);
goto repeat;
}
unlock(&chanlock);
if (c->freed) chanfree(c);
p = *procp;
t = p->curthread;
if (t->exiting)
threadexits(nil);
(*procp)->curthread->call = Callnil;
return a - alts;
}
c = a->c;
// Channel c can proceed
if (c->s) {
// Send or receive via the buffered channel
if (a->op == CHANSND) {
v = c->v + ((c->f + c->n) % c->s) * c->e;
if (a->v)
memmove(v, a->v, c->e);
else
memset(v, 0, c->e);
c->n++;
} else {
if (a->v) {
v = c->v + (c->f % c->s) * c->e;
memmove(a->v, v, c->e);
}
c->n--;
c->f++;
}
}
if (entry < 0)
for (i = 0; i < Nqbits; i++) {
bitno = i & Nqmask;
byteno = i >> Nqshift;
if (
(c->qused[byteno] & (1 << bitno))
&& c->qentry[i]->op == (CHANSND+CHANRCV) - a->op
&& *c->qentry[i]->tag == nil
) {
// Unblock peer process
xa = c->qentry[i];
*xa->tag = c;
unlock(&chanlock);
if (_threadrendezvous((ulong)xa->tag, 0) == ~0) {
(*procp)->curthread->call = Callnil;
return -1;
}
(*procp)->curthread->call = Callnil;
return a - alts;
}
}
if (entry >= 0) {
xa = c->qentry[entry];
if (a->op == CHANSND) {
if (xa->v) {
if (a->v)
memmove(xa->v, a->v, c->e);
else
memset(xa->v, 0, c->e);
}
} else {
if (a->v) {
if (xa->v)
memmove(a->v, xa->v, c->e);
else
memset(a->v, 0, c->e);
}
}
*xa->tag = c;
unlock(&chanlock);
if (_threadrendezvous((ulong)xa->tag, 0) == ~0) {
(*procp)->curthread->call = Callnil;
return -1;
}
(*procp)->curthread->call = Callnil;
return a - alts;
}
unlock(&chanlock);
yield();
(*procp)->curthread->call = Callnil;
return a - alts;
}
int
nbrecv(Channel *c, void *v) {
Alt *a;
int i, bitno, byteno;
lock(&chanlock);
for (i = 0; i < Nqbits; i++) {
bitno = i & Nqmask;
byteno = i >> Nqshift;
a = c->qentry[i];
if (
(c->qused[byteno] & (1 << bitno))
&& a->op == CHANSND
&& *a->tag == nil
) {
*a->tag = c;
if (c->n) {
// There's an item to receive in the buffered channel
if (v)
memmove(v, c->v + (c->f % c->s) * c->e, c->e);
c->n--;
c->f++;
} else {
if (v) {
if (a->v)
memmove(v, a->v, c->e);
else
memset(v, 0, c->e);
}
}
unlock(&chanlock);
if (_threadrendezvous((ulong)a->tag, 0) == ~0)
return -1;
return 1;
}
}
if (c->n) {
// There's an item to receive in the buffered channel
if (v)
memmove(v, c->v + (c->f % c->s) * c->e, c->e);
c->n--;
c->f++;
unlock(&chanlock);
return 1;
}
unlock(&chanlock);
return 0;
}
int
recv(Channel *c, void *v) {
Alt a;
Channel *tag;
int i, byteno;
Proc *p;
Thread *t;
retry:
if (i = nbrecv(c, v))
return i;
lock(&chanlock);
// Unbuffered, or buffered but empty
tag = nil;
a.c = c;
a.v = v;
a.tag = &tag;
a.op = CHANRCV;
p = *procp;
t = p->curthread;
t->alt = &a;
t->call = Callrcv;
// enqueue on the channel
i = emptyentry(c);
byteno = i >> Nqshift;
c->qentry[i] = &a;
a.q[byteno] = 1 << (i & Nqmask);
unlock(&chanlock);
if (_threadrendezvous((ulong)&tag, 0) == ~0) {
t->call = Callnil;
return -1;
}
lock(&chanlock);
// dequeue from the channel
c->qused[byteno] &= ~a.q[byteno];
unlock(&chanlock);
if (c->s) goto retry; // Buffered channels: try the queue again
if (c->freed) chanfree(c);
t->call = Callnil;
if (t->exiting)
threadexits(nil);
return 1;
}
int
nbsend(Channel *c, void *v) {
Alt *a;
int i, bitno, byteno;
lock(&chanlock);
for (i = 0; i < Nqbits; i++) {
bitno = i & Nqmask;
byteno = i >> Nqshift;
a = c->qentry[i];
if (
(c->qused[byteno] & 1 << bitno)
&& a->op == CHANRCV
&& *a->tag == nil
) {
*a->tag = c;
if (c->n < c->s) {
// There's room to send in the buffered channel
if (v)
memmove(c->v + ((c->f + c->n) % c->s) * c->e, v, c->e);
else
memset(c->v + ((c->f + c->n) % c->s) * c->e, 0, c->e);
c->n++;
} else {
if (a->v) {
if (v)
memmove(a->v, v, c->e);
else
memset(a->v, 0, c->e);
}
}
unlock(&chanlock);
if (_threadrendezvous((ulong)a->tag, 0) == ~0)
return -1;
return 1;
}
}
if (c->n < c->s) {
// There's room to send in the buffered channel
if (v)
memmove(c->v + ((c->f + c->n) % c->s) * c->e, v, c->e);
else
memset(c->v + ((c->f + c->n) % c->s) * c->e, 0, c->e);
c->n++;
unlock(&chanlock);
yield();
return 1;
}
unlock(&chanlock);
return 0;
}
int
send(Channel *c, void *v) {
Alt a;
Channel *tag;
int i, byteno;
Proc *p;
Thread *t;
retry:
if (i = nbsend(c, v))
return i;
lock(&chanlock);
tag = nil;
a.c = c;
a.v = v;
a.tag = &tag;
a.op = CHANSND;
p = *procp;
t = p->curthread;
t->alt = &a;
t->call = Callsnd;
// enqueue on the channel
i = emptyentry(c);
byteno = i >> Nqshift;
c->qentry[i] = &a;
a.q[byteno] = 1 << (i & Nqmask);
unlock(&chanlock);
if (_threadrendezvous((ulong)&tag, 0) == ~0) {
t->call = Callnil;
return -1;
}
lock(&chanlock);
// dequeue from the channel
c->qused[byteno] &= ~a.q[byteno];
unlock(&chanlock);
if (c->s)
goto retry; // Buffered channels: try the queue again
// Unbuffered channels: data is already transferred
if (c->freed) chanfree(c);
t->call = Callnil;
if (t->exiting)
threadexits(nil);
return 1;
}
int
sendul(Channel *c, ulong v) {
threadassert(c->e == sizeof(ulong));
return send(c, &v);
}
ulong
recvul(Channel *c) {
ulong v;
threadassert(c->e == sizeof(ulong));
if (recv(c, &v) < 0)
return ~0;
return v;
}
int
sendp(Channel *c, void *v) {
threadassert(c->e == sizeof(void *));
return send(c, &v);
}
void *
recvp(Channel *c) {
void * v;
threadassert(c->e == sizeof(void *));
if (recv(c, &v) < 0)
return nil;
return v;
}
int
nbsendul(Channel *c, ulong v) {
threadassert(c->e == sizeof(ulong));
return nbsend(c, &v);
}
ulong
nbrecvul(Channel *c) {
ulong v;
threadassert(c->e == sizeof(ulong));
if (nbrecv(c, &v) == 0)
return 0;
return v;
}
int
nbsendp(Channel *c, void *v) {
threadassert(c->e == sizeof(void *));
return nbsend(c, &v);
}
void *
nbrecvp(Channel *c) {
void * v;
threadassert(c->e == sizeof(void *));
if (nbrecv(c, &v) == 0)
return nil;
return v;
}
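
[Editor's sketch] In alt above, the pattern "if (nrand(n) == 0) a = xa; n++" picks one of the alternatives that can proceed, uniformly at random, in a single pass and without first counting them. The k-th candidate seen replaces the current choice with probability 1/k. Below is a minimal portable C sketch of that selection step alone. pick_ready is a hypothetical name, and rand() % seen stands in for Plan 9's nrand, so it carries a slight modulo bias.

```c
#include <assert.h>
#include <stdlib.h>

/* Return the index of one nonzero entry of ready[0..n-1], chosen
 * (approximately) uniformly at random in one pass, or -1 if every
 * entry is zero. */
static int
pick_ready(const int *ready, int n)
{
    int i, seen = 0, chosen = -1;

    for (i = 0; i < n; i++) {
        if (!ready[i])
            continue;
        seen++;
        /* The seen-th candidate wins with probability 1/seen;
         * the first candidate always wins because x % 1 == 0. */
        if (rand() % seen == 0)
            chosen = i;
    }
    return chosen;
}
```

Choosing at random rather than always taking the first ready entry keeps one busy channel from starving the others listed after it.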
[-- Attachment #3: thread.h --]
[-- Type: text/plain, Size: 3196 bytes --]
#pragma src "/sys/src/libthread"
#pragma lib "libthread.a"
#pragma varargck argpos threadprint 2
#pragma varargck argpos chanprint 2
typedef struct Proc Proc;
typedef struct Thread Thread;
typedef struct Tqueue Tqueue;
typedef struct Alt Alt;
typedef struct Channel Channel;
typedef struct Ref Ref;
/* Channel structure.  s is the size of the buffer.  For unbuffered channels
 * s is zero.  v is an array of s values.  If s is zero, v is unused.
 * f and n represent the state of the queue pointed to by v.
 * qused and qentry track the waiting threads and should not be touched
 * by code outside channel.c
 */
enum {
	Nqwds = 2,
	Nqshift = 5,
	Nqbits = (1 << Nqshift) * Nqwds,
	Nqmask = (1 << Nqshift) - 1,
};
struct Channel {
	int s;			// Size of the channel (may be zero)
	uint f;			// Extraction point (insertion pt: (f + n) % s)
	uint n;			// Number of values in the channel
	int e;			// Element size
	int freed;		// Set when channel is being deleted
	ulong qused[Nqwds];	// Bitmap of used entries in qentry
	Alt * qentry[Nqbits];	// Receivers/senders waiting
	uchar v[1];		// Array of max(1, s) values in the channel
};
/* Channel operations for alt: */
enum{
	CHANEND,
	CHANSND,
	CHANRCV,
	CHANNOP,
	CHANNOBLK,
};
struct Alt {
	Channel *c;	/* channel */
	void *v;	/* pointer to value */
	int op;		/* operation: CHANSND, CHANRCV, CHANNOP or CHANNOBLK */
	/* the next variables are used internally to alt
	 * they need not be initialized
	 */
	Channel **tag;	/* pointer to rendez-vous tag */
	ulong q[Nqwds];
};
struct Ref {
long ref;
};
int alt(Alt alts[]);
Channel* chancreate(int elemsize, int bufsize);
int chaninit(Channel *c, int elemsize, int elemcnt);
void chanfree(Channel *c);
int chanprint(Channel *, char *, ...);
long decref(Ref *r); /* returns 0 iff value is now zero */
void incref(Ref *r);
int nbrecv(Channel *c, void *v);
void* nbrecvp(Channel *c);
ulong nbrecvul(Channel *c);
int nbsend(Channel *c, void *v);
int nbsendp(Channel *c, void *v);
int nbsendul(Channel *c, ulong v);
int proccreate(void (*f)(void *arg), void *arg, uint stacksize);
int procrfork(void (*f)(void *arg), void *arg, uint stacksize, int flag);
ulong * procdata(void);
void procexec(Channel *, char *, char *[]);
void procexecl(Channel *, char *, ...);
int recv(Channel *c, void *v);
void* recvp(Channel *c);
ulong recvul(Channel *c);
int send(Channel *c, void *v);
int sendp(Channel *c, void *v);
int sendul(Channel *c, ulong v);
int threadcreate(void (*f)(void *arg), void *arg, uint stacksize);
ulong* threaddata(void);
void threadexits(char *);
void threadexitsall(char *);
int threadgetgrp(void); /* return thread group of current thread */
char* threadgetname(void);
void threadkill(int); /* kill thread */
void threadkillgrp(int); /* kill threads in group */
void threadmain(int argc, char *argv[]);
void threadnonotes(void);
int threadid(void);
int threadpid(int);
int threadprint(int, char*, ...);
int threadsetgrp(int); /* set thread group, return old */
void threadsetname(char *name);
Channel* threadwaitchan(void);
void yield(void);
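
[Editor's sketch] The Channel comments above describe a circular buffer: f is the extraction point, n the number of stored values, and the insertion point is (f + n) % s. The following portable C sketch shows that indexing on its own. Ring, ring_put, ring_get and the fixed capacity CAP are hypothetical. Unlike the attached channel.c, which lets f grow and reduces it modulo s on use, this sketch keeps f inside [0, s).

```c
#include <assert.h>
#include <string.h>

enum { CAP = 4 };                 /* hypothetical fixed capacity */

typedef struct {
    unsigned s;                   /* capacity in elements */
    unsigned f;                   /* extraction point, always < s */
    unsigned n;                   /* number of values stored */
    unsigned e;                   /* element size in bytes */
    unsigned char v[CAP * sizeof(int)];
} Ring;

/* Store one element at the insertion point; returns 0 if full. */
static int
ring_put(Ring *r, const void *elem)
{
    if (r->n == r->s)
        return 0;
    memmove(r->v + ((r->f + r->n) % r->s) * r->e, elem, r->e);
    r->n++;
    return 1;
}

/* Remove the oldest element into elem; returns 0 if empty. */
static int
ring_get(Ring *r, void *elem)
{
    if (r->n == 0)
        return 0;
    memmove(elem, r->v + r->f * r->e, r->e);
    r->f = (r->f + 1) % r->s;
    r->n--;
    return 1;
}
```

A put on a full ring and a get on an empty ring both fail without touching the buffer, which corresponds to the cases where nbsend and nbrecv return 0.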
* Re: [9fans] silent limit in thread(2) Channels which can corrupt the stack?
@ 2001-11-20 14:03 Sape Mullender
0 siblings, 0 replies; 7+ messages in thread
From: Sape Mullender @ 2001-11-20 14:03 UTC (permalink / raw)
To: 9fans
You are correct. It is possible to get into trouble when there are more
than 32 threads reading on a channel. I'll increase the limit and make sure
that going over the limit is flagged.
We use threaded programs very heavily here but we've never exceeded that
particular limit. One problem is that allocating (malloc/realloc) the wait list
causes incredible race conditions.
Sape
* [9fans] silent limit in thread(2) Channels which can corrupt the stack?
@ 2001-11-20 9:35 Fco.J.Ballesteros
0 siblings, 0 replies; 7+ messages in thread
From: Fco.J.Ballesteros @ 2001-11-20 9:35 UTC (permalink / raw)
To: 9fans
Hi,
while debugging a program which uses thread(2), I've come to a point
where an apparently correct program causes stack corruption on one of
the threads. Tracking down the problem, I've found that the program I
show below seems to exceed a silent limit in the implementation of
channels. I think that if enough pressure is put on a channel it can
corrupt your stack without a warning. Please, correct me if I'm wrong,
I'd be happy to get any info regarding what's going on.
The point where I think the channel has problems is (libthread/channel.c):
	// enqueue on the channel
	for (i = 0; i < 32; i++)
		if ((c->qused & (1 << i)) == 0) {
			c->qentry[i] = &a;
			c->qused |= a.q = 1 << i;
			break;
		}
In case I'm right, I'd suggest calling
abort() when the limit is exceeded.
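
[Editor's sketch] The loop quoted above simply falls through when all 32 bits are already set, so the waiter is never recorded and nothing reports it. Here is a minimal portable C sketch of the same bitmap scheme with the overflow made visible to the caller. WaitSet, claim_slot and release_slot are hypothetical names, not libthread's.

```c
#include <assert.h>
#include <stdint.h>

enum { NSLOTS = 32 };             /* one bit per waiter, as in the loop above */

typedef struct {
    uint32_t used;                /* bit i set means slot i is occupied */
    void    *entry[NSLOTS];       /* waiter stored in slot i */
} WaitSet;

/* Claim the first free slot for waiter w.  Returns the slot index,
 * or -1 when all NSLOTS slots are taken, so that the caller can
 * abort or report the overflow instead of carrying on. */
static int
claim_slot(WaitSet *ws, void *w)
{
    int i;

    for (i = 0; i < NSLOTS; i++) {
        uint32_t bit = (uint32_t)1 << i;

        if ((ws->used & bit) == 0) {
            ws->used |= bit;
            ws->entry[i] = w;
            return i;
        }
    }
    return -1;
}

/* Give slot i back once the waiter has been woken. */
static void
release_slot(WaitSet *ws, int i)
{
    assert(i >= 0 && i < NSLOTS);
    ws->used &= ~((uint32_t)1 << i);
    ws->entry[i] = 0;
}
```

A caller would check for -1 and stop there, which is the point at which the suggested abort() would go.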
This is the stack I get:
acid: At pc:0x00002eb2:sleep+0x7 /sys/src/libc/9syscall/sleep.s:5
sleep() /sys/src/libc/9syscall/sleep.s:3
called from main+0x197 /sys/src/libthread/thread.c:500
main(argc=0x00000001,argv=0x7fffefec) /sys/src/libthread/thread.c:459
called from _main+0x31 /sys/src/libc/386/main9.s:16
acid:
and this is the smallest program I found that shows the problem:
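#include <u.h>
#include <libc.h>
#include <thread.h>

Channel c;
int t;	/* set to 1 to ask every proc to stop */

/* despite its name, s() is the receiver */
void
s(void *p)
{
	for(;;){
		if (t)
			break;
		recv(&c, nil);
	}
}

/* despite its name, r() is the sender */
void
r(void *p)
{
	int i = 1;

	for(;;){
		send(&c, &i);
		if (t)
			break;
	}
}

void
threadmain(int argc, char *argv[])
{
	int i;

	chaninit(&c, sizeof(int), 0);
	for (i = 0; i < 50; i++)
		proccreate(r, (void*)i, 16*1024);
	for (i = 0; i < 50; i++)
		proccreate(s, (void*)i, 16*1024);
	sleep(5*1000);
	t = 1;
}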