9fans - fans of the OS Plan 9 from Bell Labs
* Re: [9fans] silent limit in thread(2) Channels which can corrupt the stack?
@ 2001-11-20 14:03 Sape Mullender
  0 siblings, 0 replies; 7+ messages in thread
From: Sape Mullender @ 2001-11-20 14:03 UTC (permalink / raw)
  To: 9fans

You are correct.  It is possible to get into trouble when there are more
than 32 threads reading on a channel.  I'll increase the limit and make sure
that going over the limit is flagged.

We use threaded programs very heavily here but we've never exceeded that
particular limit.  One problem is that allocating (malloc/realloc) the wait list
causes incredible race conditions.

	Sape




* Re: [9fans] silent limit in thread(2) Channels which can corrupt the stack?
@ 2001-11-21 14:34 Fco.J.Ballesteros
  0 siblings, 0 replies; 7+ messages in thread
From: Fco.J.Ballesteros @ 2001-11-21 14:34 UTC (permalink / raw)
  To: 9fans

I also use them to serialize calls, but more than 64 waiting threads
might mean that the server is overloaded and should perhaps hand work
over through other chans. But this is probably a matter of taste.



* Re: [9fans] silent limit in thread(2) Channels which can corrupt the stack?
@ 2001-11-21 13:24 rog
  0 siblings, 0 replies; 7+ messages in thread
From: rog @ 2001-11-21 13:24 UTC (permalink / raw)
  To: 9fans

> IMHO, the limit is not bad, since it warns you of high contention on a
> channel, which could mean that you'd better split the work somehow.

you might want this...  it's often useful to use channels as a
serialisation mechanism.  in Limbo, i've often had potentially more
than 64 processes blocking on a channel.

i haven't had more than a brief look at the code, but what's the
reason threads blocked on a channel can't be linked together directly
rather than placed in a limited size array?

  rog.
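
To make the question concrete, here is a minimal sketch of what linking
waiters together directly could look like. It is portable C, not libthread
code: Waiter, Waitq, waitqinit, waitqadd and waitqdel are hypothetical
names, and a real version would have to run under the channel lock, which
the sketch leaves out.

```c
#include <stddef.h>

/* Hypothetical waiter record.  The assumption is that it lives in the
 * blocked thread's own frame, so enqueueing needs no allocation. */
typedef struct Waiter Waiter;
struct Waiter {
	int	id;		/* illustrative payload only */
	Waiter	*next;
};

typedef struct Waitq Waitq;
struct Waitq {
	Waiter	*head;
	Waiter	**tail;		/* address of the link to fill in next */
};

void
waitqinit(Waitq *q)
{
	q->head = NULL;
	q->tail = &q->head;
}

/* Append w in FIFO order; there is no fixed capacity to exceed. */
void
waitqadd(Waitq *q, Waiter *w)
{
	w->next = NULL;
	*q->tail = w;
	q->tail = &w->next;
}

/* Unlink w wherever it sits.  Returns 1 if it was queued, 0 otherwise. */
int
waitqdel(Waitq *q, Waiter *w)
{
	Waiter **l;

	for (l = &q->head; *l != NULL; l = &(*l)->next) {
		if (*l == w) {
			*l = w->next;
			if (q->tail == &w->next)
				q->tail = l;	/* w was last: tail moves back */
			return 1;
		}
	}
	return 0;
}
```

The trade-off is that removal walks the list, whereas the bitmap clears a
bit in constant time; a doubly linked list would restore constant-time
removal at the cost of a second pointer per waiter.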




* Re: [9fans] silent limit in thread(2) Channels which can corrupt the stack?
@ 2001-11-21  9:08 Fco.J.Ballesteros
  0 siblings, 0 replies; 7+ messages in thread
From: Fco.J.Ballesteros @ 2001-11-21  9:08 UTC (permalink / raw)
  To: 9fans

:  We use threaded programs very heavily here but we've never exceeded that
:  particular limit.  One problem is that allocating (malloc/realloc) the wait list
:  causes incredible race conditions.

IMHO, the limit is not bad, since it warns you of high contention on a
channel, which could mean that you'd better split the work somehow.

thanks, I'll merge your source with mine.




* Re: [9fans] silent limit in thread(2) Channels which can corrupt the stack?
@ 2001-11-20 21:15 Sape Mullender
  0 siblings, 0 replies; 7+ messages in thread
From: Sape Mullender @ 2001-11-20 21:15 UTC (permalink / raw)
  To: 9fans

[-- Attachment #1: Type: text/plain, Size: 334 bytes --]

Here are some fixes for channel.c.  You can now have up to 64 waiting
threads.  Overflow is now flagged.  Let me know how you fare.
You probably need to integrate my thread.c with yours; we use a new
version of 9p here with lots of changed interfaces.  The data structures
that matter are the Channel and Alt structs.

	Sape
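
For readers who want the idea without the rest of libthread, the
following is a rough, portable sketch of a two-word bitmap that hands out
up to 64 slots and reports overflow. The enum values match thread.h in
this message; Slots, claimslot and releaseslot are hypothetical names, and
uint32_t stands in for the ulong words on the assumption that they are 32
bits wide. The sketch returns -1 on overflow where channel.c calls
sysfatal.

```c
#include <stdint.h>

enum {
	Nqwds	= 2,				/* words in the bitmap */
	Nqshift	= 5,				/* log2(bits per word) */
	Nqbits	= (1 << Nqshift) * Nqwds,	/* 64 slots in total */
	Nqmask	= (1 << Nqshift) - 1		/* 31: bit index within a word */
};

/* Hypothetical stand-in for the qused field of Channel. */
typedef struct Slots Slots;
struct Slots {
	uint32_t used[Nqwds];
};

/* Claim the lowest free slot.  Returns its index, or -1 when all
 * Nqbits slots are taken. */
int
claimslot(Slots *s)
{
	int i, word;
	uint32_t bit;

	for (i = 0; i < Nqbits; i++) {
		word = i >> Nqshift;			/* which word */
		bit = (uint32_t)1 << (i & Nqmask);	/* which bit in it */
		if ((s->used[word] & bit) == 0) {
			s->used[word] |= bit;
			return i;
		}
	}
	return -1;	/* overflow is reported rather than ignored */
}

/* Release a slot previously returned by claimslot. */
void
releaseslot(Slots *s, int i)
{
	s->used[i >> Nqshift] &= ~((uint32_t)1 << (i & Nqmask));
}
```

Slot 33, for example, maps to word 33 >> 5 = 1 and bit 33 & 31 = 1.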


[-- Attachment #2: channel.c --]
[-- Type: text/plain, Size: 10077 bytes --]

#include <u.h>
#include <libc.h>
#include "assert.h"
#include "threadimpl.h"

static Lock chanlock;		// Central channel access lock

static int
emptyentry(Channel *c)
{
	int i, bitno, byteno;

	for (i = 0; i < Nqbits; i++) {
		bitno = i & Nqmask;
		byteno = i >> Nqshift;
		if ((c->qused[byteno] & 1 << bitno) == 0){
			c->qused[byteno] |= 1 << bitno;
			return i;
		}
	}
	sysfatal("alt: too many waiting threads on channel 0x%p", c);
	return -1;
}

void
chanfree(Channel *c) {
	int i, inuse;

	lock(&chanlock);
	inuse = 0;
	for (i = 0; i < Nqwds; i++)
		if (c->qused[i]) inuse = 1;
	if (inuse)
		c->freed = 1;
	else
		free(c);
	unlock(&chanlock);
}

int
chaninit(Channel *c, int elemsize, int elemcnt) {
	int i;

	if(elemcnt < 0 || elemsize <= 0 || c == nil)
		return -1;
	c->f = 0;
	c->n = 0;
	c->freed = 0;
	for (i = 0; i < Nqwds; i++)
		c->qused[i] = 0;
	c->s = elemcnt;
	c->e = elemsize;
	_threaddebug(DBGCHAN, "chaninit %lux", c);
	return 1;
}

Channel *
chancreate(int elemsize, int elemcnt) {
	Channel *c;

	if(elemcnt < 0 || elemsize <= 0)
		return nil;
	c = _threadmalloc(sizeof(Channel) + elemcnt * elemsize, 1);
	c->s = elemcnt;
	c->e = elemsize;
	_threaddebug(DBGCHAN, "chancreate %lux", c);
	return c;
}

int
alt(Alt *alts) {
	Alt *a, *xa;
	Channel *c;
	uchar *v;
	int i, n, entry, bitno, byteno;
	Proc *p;
	Thread *t;

	lock(&chanlock);

	(*procp)->curthread->alt = alts;
	(*procp)->curthread->call = Callalt;
repeat:

	// Test which channels can proceed
	n = 1;
	a = nil;
	entry = -1;
	for (xa = alts; xa->op; xa++) {
		if (xa->op == CHANNOP) continue;
		if (xa->op == CHANNOBLK) {
			if (a == nil) {
				(*procp)->curthread->call = Callnil;
				unlock(&chanlock);
				return xa - alts;
			} else
				break;
		}

		c = xa->c;
		if (c == nil)
			sysfatal("alt: nil channel in entry %ld\n", xa - alts);
		if ((xa->op == CHANSND && c->n < c->s) ||
			(xa->op == CHANRCV && c->n)) {
				// There's room to send, or an item to receive, in the channel
				if (nrand(n) == 0) {
					a = xa;
					entry = -1;
				}
				n++;
		} else {
			// Test for blocked senders or receivers
			for (i = 0; i < Nqbits; i++) {
				bitno = i & Nqmask;
				byteno = i >> Nqshift;
				// Is it claimed?
				if (
					(c->qused[byteno] & (1 << bitno))
					&& xa->op == (CHANSND+CHANRCV) - c->qentry[i]->op
							// complementary op
					&& *c->qentry[i]->tag == nil
				) {
					// No
					if (nrand(n) == 0) {
						a = xa;
						entry = i;
					}
					n++;
					break;
				}
			}
		}
	}

	if (a == nil) {
		// Nothing can proceed, enqueue on all channels
		c = nil;
		for (a = alts; a->op; a++) {
			Channel *cp;

			if (a->op == CHANNOP || a->op == CHANNOBLK) continue;
			cp = a->c;
			a->tag = &c;
			i = emptyentry(cp);
			cp->qentry[i] = a;
			// Clear stale bits first: every word of q is used when dequeuing
			memset(a->q, 0, sizeof a->q);
			a->q[i >> Nqshift] = 1 << (i & Nqmask);
		}

		// And wait for the rendez vous
		unlock(&chanlock);
		if (_threadrendezvous((ulong)&c, 0) == ~0) {
			(*procp)->curthread->call = Callnil;
			return -1;
		}

		lock(&chanlock);

		/* We rendezed-vous on channel c, dequeue from all channels
		 * and find the Alt struct to which c belongs
		 */
		a = nil;
		for (xa = alts; xa->op; xa++) {
			Channel *xc;

			if (xa->op == CHANNOP || xa->op == CHANNOBLK) continue;
			xc = xa->c;
			for (i = 0; i < Nqwds; i++)
				xc->qused[i] &= ~xa->q[i];
			if (xc == c)
				a = xa;

		}

		if (c->s) {
			// Buffered channel, try again
			sleep(0);
			goto repeat;
		}

		unlock(&chanlock);

		if (c->freed) chanfree(c);

		p = *procp;
		t = p->curthread;
		if (t->exiting)
			threadexits(nil);
		(*procp)->curthread->call = Callnil;
		return a - alts;
	}

	c = a->c;
	// Channel c can proceed

	if (c->s) {
		// Send or receive via the buffered channel
		if (a->op == CHANSND) {
			v = c->v + ((c->f + c->n) % c->s) * c->e;
			if (a->v)
				memmove(v, a->v, c->e);
			else
				memset(v, 0, c->e);
			c->n++;
		} else {
			if (a->v) {
				v = c->v + (c->f % c->s) * c->e;
				memmove(a->v, v, c->e);
			}
			c->n--;
			c->f++;
		}
	}
	if (entry < 0)
		for (i = 0; i < Nqbits; i++) {
			bitno = i & Nqmask;
			byteno = i >> Nqshift;
			if (
				(c->qused[byteno] & (1 << bitno))
				&& c->qentry[i]->op == (CHANSND+CHANRCV) - a->op
				&& *c->qentry[i]->tag == nil
			) {
				// Unblock peer process
				xa = c->qentry[i];
				*xa->tag = c;

				unlock(&chanlock);
				if (_threadrendezvous((ulong)xa->tag, 0) == ~0) {
					(*procp)->curthread->call = Callnil;
					return -1;
				}
				(*procp)->curthread->call = Callnil;
				return a - alts;
			}
		}
	if (entry >= 0) {
		xa = c->qentry[entry];
		if (a->op == CHANSND) {
			if (xa->v) {
				if (a->v)
					memmove(xa->v, a->v, c->e);
				else
					memset(xa->v, 0, c->e);
			}
		} else {
			if (a->v) {
				if (xa->v)
					memmove(a->v, xa->v, c->e);
				else
					memset(a->v, 0, c->e);
			}
		}
		*xa->tag = c;

		unlock(&chanlock);
		if (_threadrendezvous((ulong)xa->tag, 0) == ~0) {
			(*procp)->curthread->call = Callnil;
			return -1;
		}
		(*procp)->curthread->call = Callnil;
		return a - alts;
	}
	unlock(&chanlock);
	yield();
	(*procp)->curthread->call = Callnil;
	return a - alts;
}

int
nbrecv(Channel *c, void *v) {
	Alt *a;
	int i, bitno, byteno;

	lock(&chanlock);
	for (i = 0; i < Nqbits; i++) {
		bitno = i & Nqmask;
		byteno = i >> Nqshift;
		a = c->qentry[i];
		if (
			(c->qused[byteno] & (1 << bitno))
			&& a->op == CHANSND
			&& *a->tag == nil
		) {
			*a->tag = c;
			if (c->n) {
				// There's an item to receive in the buffered channel
				if (v)
					memmove(v, c->v + (c->f % c->s) * c->e, c->e);
				c->n--;
				c->f++;
			} else {
				if (v) {
					if (a->v)
						memmove(v, a->v, c->e);
					else
						memset(v, 0, c->e);
				}
			}

			unlock(&chanlock);
			if (_threadrendezvous((ulong)a->tag, 0) == ~0)
				return -1;
			return 1;
		}
	}
	if (c->n) {
		// There's an item to receive in the buffered channel
		if (v)
			memmove(v, c->v + (c->f % c->s) * c->e, c->e);
		c->n--;
		c->f++;
		unlock(&chanlock);
		return 1;
	}
	unlock(&chanlock);
	return 0;
}

int
recv(Channel *c, void *v) {
	Alt a;
	Channel *tag;
	int i, byteno;
	Proc *p;
	Thread *t;

retry:
	if (i = nbrecv(c, v))
		return i;
	lock(&chanlock);
	// Unbuffered, or buffered but empty
	tag = nil;
	a.c = c;
	a.v = v;
	a.tag = &tag;
	a.op = CHANRCV;
	p = *procp;
	t = p->curthread;
	t->alt = &a;
	t->call = Callrcv;

	// enqueue on the channel
	i = emptyentry(c);
	byteno = i >> Nqshift;
	c->qentry[i] = &a;
	a.q[byteno] = 1 << (i & Nqmask);
	unlock(&chanlock);
	if (_threadrendezvous((ulong)&tag, 0) == ~0) {
		t->call = Callnil;
		return -1;
	}
	lock(&chanlock);

	// dequeue from the channel
	c->qused[byteno] &= ~a.q[byteno];
	unlock(&chanlock);
	if (c->s) goto retry;	// Buffered channels: try the queue again
	if (c->freed) chanfree(c);
	t->call = Callnil;
	if (t->exiting)
		threadexits(nil);
	return 1;
}

int
nbsend(Channel *c, void *v) {
	Alt *a;
	int i, bitno, byteno;

	lock(&chanlock);
	for (i = 0; i < Nqbits; i++) {
		bitno = i & Nqmask;
		byteno = i >> Nqshift;
		a = c->qentry[i];
		if (
			(c->qused[byteno] & 1 << bitno)
			&& a->op == CHANRCV
			&& *a->tag == nil
		) {
			*a->tag = c;
			if (c->n < c->s) {
				// There's room to send in the buffered channel
				if (v)
					memmove(c->v + ((c->f + c->n) % c->s) * c->e, v, c->e);
				else
					memset(c->v + ((c->f + c->n) % c->s) * c->e, 0, c->e);
				c->n++;
			} else {
				if (a->v) {
					if (v)
						memmove(a->v, v, c->e);
					else
						memset(a->v, 0, c->e);
				}
			}

			unlock(&chanlock);
			if (_threadrendezvous((ulong)a->tag, 0) == ~0)
				return -1;
			return 1;
		}
	}
	if (c->n < c->s) {
		// There's room to send in the buffered channel
		if (v)
			memmove(c->v + ((c->f + c->n) % c->s) * c->e, v, c->e);
		else
			memset(c->v + ((c->f + c->n) % c->s) * c->e, 0, c->e);
		c->n++;
		unlock(&chanlock);
		yield();
		return 1;
	}
	unlock(&chanlock);
	return 0;
}

int
send(Channel *c, void *v) {
	Alt a;
	Channel *tag;
	int i, byteno;
	Proc *p;
	Thread *t;

retry:
	if (i = nbsend(c, v))
		return i;
	lock(&chanlock);
	tag = nil;
	a.c = c;
	a.v = v;
	a.tag = &tag;
	a.op = CHANSND;
	p = *procp;
	t = p->curthread;
	t->alt = &a;
	t->call = Callsnd;

	// enqueue on the channel
	i = emptyentry(c);
	byteno = i >> Nqshift;
	c->qentry[i] = &a;
	a.q[byteno] = 1 << (i & Nqmask);
	unlock(&chanlock);
	if (_threadrendezvous((ulong)&tag, 0) == ~0) {
		t->call = Callnil;
		return -1;
	}
	lock(&chanlock);
	// dequeue from the channel
	c->qused[byteno] &= ~a.q[byteno];
	unlock(&chanlock);
	if (c->s)
		goto retry;	// Buffered channels: try the queue again
	// Unbuffered channels: data is already transferred
	if (c->freed) chanfree(c);
	t->call = Callnil;
	if (t->exiting)
		threadexits(nil);
	return 1;
}

int
sendul(Channel *c, ulong v) {
	threadassert(c->e == sizeof(ulong));
	return send(c, &v);
}

ulong
recvul(Channel *c) {
	ulong v;

	threadassert(c->e == sizeof(ulong));
	if (recv(c, &v) < 0)
		return ~0;
	return v;
}

int
sendp(Channel *c, void *v) {
	threadassert(c->e == sizeof(void *));
	return send(c, &v);
}

void *
recvp(Channel *c) {
	void * v;

	threadassert(c->e == sizeof(void *));
	if (recv(c, &v) < 0)
		return nil;
	return v;
}

int
nbsendul(Channel *c, ulong v) {
	threadassert(c->e == sizeof(ulong));
	return nbsend(c, &v);
}

ulong
nbrecvul(Channel *c) {
	ulong v;

	threadassert(c->e == sizeof(ulong));
	if (nbrecv(c, &v) == 0)
		return 0;
	return v;
}

int
nbsendp(Channel *c, void *v) {
	threadassert(c->e == sizeof(void *));
	return nbsend(c, &v);
}

void *
nbrecvp(Channel *c) {
	void * v;

	threadassert(c->e == sizeof(void *));
	if (nbrecv(c, &v) == 0)
		return nil;
	return v;
}

[-- Attachment #3: thread.h --]
[-- Type: text/plain, Size: 3196 bytes --]

#pragma src "/sys/src/libthread"
#pragma lib "libthread.a"

#pragma	varargck	argpos	threadprint	2
#pragma	varargck	argpos	chanprint	2

typedef struct Proc	Proc;
typedef struct Thread	Thread;
typedef struct Tqueue	Tqueue;
typedef struct Alt	Alt;
typedef struct Channel	Channel;
typedef struct Ref	Ref;

/* Channel structure.  s is the size of the buffer.  For unbuffered channels
 * s is zero.  v is an array of s values.  If s is zero, v is unused.
 * f and n represent the state of the queue pointed to by v.
 * qused and qentry are managed by channel.c and should not be touched
 * by code outside it.
 */

enum {
	Nqwds = 2,
	Nqshift = 5,
	Nqbits = (1 << Nqshift) * Nqwds,
	Nqmask = (1 << Nqshift) - 1,
};

struct Channel {
	int		s;				// Size of the channel (may be zero)
	uint		f;				// Extraction point (insertion pt: (f + n) % s)
	uint		n;				// Number of values in the channel
	int		e;				// Element size
	int		freed;			// Set when channel is being deleted
	ulong	qused[Nqwds];		// Bitmap of used entries in qentry
	Alt	*	qentry[Nqbits];		// Receivers/senders waiting
	uchar	v[1];				// Array of max(1, s) values in the channel
};


/* Channel operations for alt: */
enum{
	CHANEND,
	CHANSND,
	CHANRCV,
	CHANNOP,
	CHANNOBLK,
};

struct Alt {
	Channel	*c;		/* channel */
	void	*v;		/* pointer to value */
	int	op;		/* operation: CHANSND, CHANRCV, CHANNOP or CHANNOBLK; CHANEND ends the array */

	/* the next variables are used internally to alt
	 * they need not be initialized
	 */
	Channel	**tag;	/* pointer to rendez-vous tag */
	ulong	q[Nqwds];
};

struct Ref {
	long ref;
};

int		alt(Alt alts[]);
Channel*	chancreate(int elemsize, int bufsize);
int		chaninit(Channel *c, int elemsize, int elemcnt);
void		chanfree(Channel *c);
int		chanprint(Channel *, char *, ...);
long		decref(Ref *r);		/* returns 0 iff value is now zero */
void		incref(Ref *r);
int		nbrecv(Channel *c, void *v);
void*		nbrecvp(Channel *c);
ulong		nbrecvul(Channel *c);
int		nbsend(Channel *c, void *v);
int		nbsendp(Channel *c, void *v);
int		nbsendul(Channel *c, ulong v);
int		proccreate(void (*f)(void *arg), void *arg, uint stacksize);
int		procrfork(void (*f)(void *arg), void *arg, uint stacksize, int flag);
ulong *		procdata(void);
void		procexec(Channel *, char *, char *[]);
void		procexecl(Channel *, char *, ...);
int		recv(Channel *c, void *v);
void*		recvp(Channel *c);
ulong		recvul(Channel *c);
int		send(Channel *c, void *v);
int		sendp(Channel *c, void *v);
int		sendul(Channel *c, ulong v);
int		threadcreate(void (*f)(void *arg), void *arg, uint stacksize);
ulong*		threaddata(void);
void		threadexits(char *);
void		threadexitsall(char *);
int		threadgetgrp(void);	/* return thread group of current thread */
char*		threadgetname(void);
void		threadkill(int);	/* kill thread */
void		threadkillgrp(int);	/* kill threads in group */
void		threadmain(int argc, char *argv[]);
void		threadnonotes(void);
int		threadid(void);
int		threadpid(int);
int		threadprint(int, char*, ...);
int		threadsetgrp(int);	/* set thread group, return old */
void		threadsetname(char *name);
Channel*	threadwaitchan(void);
void		yield(void);


* Re: [9fans] silent limit in thread(2) Channels which can corrupt the stack?
@ 2001-11-20 20:51 David Gordon Hogan
  0 siblings, 0 replies; 7+ messages in thread
From: David Gordon Hogan @ 2001-11-20 20:51 UTC (permalink / raw)
  To: 9fans

> You are correct.  It is possible to get into trouble when there are more
> than 32 threads reading on a channel.  I'll increase the limit and make sure
> that going over the limit is flagged.

I'm sorry, but why is there a limit?




* [9fans] silent limit in thread(2) Channels which can corrupt the stack?
@ 2001-11-20  9:35 Fco.J.Ballesteros
  0 siblings, 0 replies; 7+ messages in thread
From: Fco.J.Ballesteros @ 2001-11-20  9:35 UTC (permalink / raw)
  To: 9fans

Hi,

while debugging a program which uses thread(2), I've come to a point
where an apparently correct program causes stack corruption on one of
the threads. Tracking down the problem, I've found that the program I
show below seems to exceed a silent limit in the implementation of
channels.  I think that if enough pressure is put on a channel it can
corrupt your stack without a warning. Please, correct me if I'm wrong,
I'd be happy to get any info regarding what's going on.

The point where I think the channel has problems is (libthread/channel.c):

	// enqueue on the channel
	for (i = 0; i < 32; i++)
		if ((c->qused & (1 << i)) == 0) {
			c->qentry[i] = &a;
			c->qused |= a.q = 1 << i;
			break;
		}

In case I'm right, I'd suggest calling
abort() when the limit is exceeded.
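
As a rough illustration of that suggestion: when all 32 bits are already
set, the loop above finishes without ever reaching the break, so &a is not
stored and no bit is claimed, yet the caller carries on as though it had
been queued. The sketch below is portable C rather than libthread code;
Waitset, tryenqueue and enqueue are hypothetical names for a simplified
stand-in for the qused/qentry pair.

```c
#include <stdint.h>
#include <stdlib.h>

enum { Nslots = 32 };	/* one bit per waiter in a single 32-bit word */

/* Hypothetical, simplified stand-in for the qused/qentry pair. */
typedef struct Waitset Waitset;
struct Waitset {
	uint32_t used;
	void	*entry[Nslots];
};

/* Record p in the first free slot.  Returns the slot index, or -1 if
 * none is free; the loop quoted above has no equivalent of the -1 case. */
int
tryenqueue(Waitset *w, void *p)
{
	int i;

	for (i = 0; i < Nslots; i++)
		if ((w->used & ((uint32_t)1 << i)) == 0) {
			w->entry[i] = p;
			w->used |= (uint32_t)1 << i;
			return i;
		}
	return -1;
}

/* Variant that follows the suggestion: fail loudly instead of going on. */
int
enqueue(Waitset *w, void *p)
{
	int i;

	i = tryenqueue(w, p);
	if (i < 0)
		abort();
	return i;
}
```

With this shape, the 33rd waiter stops the program at the point of the
overflow instead of letting it continue in an inconsistent state.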


This is the stack I get:

  acid: At pc:0x00002eb2:sleep+0x7 /sys/src/libc/9syscall/sleep.s:5
  sleep() /sys/src/libc/9syscall/sleep.s:3
	called from main+0x197 /sys/src/libthread/thread.c:500
  main(argc=0x00000001,argv=0x7fffefec) /sys/src/libthread/thread.c:459
	called from _main+0x31 /sys/src/libc/386/main9.s:16
  acid:

and this is the smallest program I got that presents the problem:

#include <u.h>
#include <libc.h>
#include <thread.h>

Channel c;
int	t;

void s(void *p)
{
	for(;;){
		if (t)
			break;
		recv(&c, nil);
	}
}

void r(void *p)
{
	int i = 1;
	for(;;){
		send(&c, &i);
		if (t)
			break;
	}
}

void
threadmain(int argc, char *argv[])
{
	int i;
	chaninit(&c, sizeof(int), 0);
	for (i = 0; i < 50; i++)
		proccreate(r, (void*)i, 16*1024);
	for (i = 0; i < 50; i++)
		proccreate(s, (void*)i, 16*1024);
	sleep(5*1000);
	t = 1;
}



