9fans - fans of the OS Plan 9 from Bell Labs
 help / color / mirror / Atom feed
From: Sape Mullender <sape@plan9.bell-labs.com>
To: 9fans@cse.psu.edu
Subject: Re: [9fans] silent limit in thread(2) Channels which can corrupt the stack?
Date: Tue, 20 Nov 2001 16:15:09 -0500	[thread overview]
Message-ID: <20011120211511.146EE19A4F@mail.cse.psu.edu> (raw)

[-- Attachment #1: Type: text/plain, Size: 334 bytes --]

Here are some fixes for channel.c.  You can now have up to 64 waiting
threads.  Overflow is now flagged.  Let me know how you fare.
You probably need to integrate my thread.c with yours; we use a new
version of 9p here with lots of changed interfaces.  The data structures
that matter are the Channel and Alt structs.

	Sape


[-- Attachment #2: channel.c --]
[-- Type: text/plain, Size: 10077 bytes --]

#include <u.h>
#include <libc.h>
#include "assert.h"
#include "threadimpl.h"

static Lock chanlock;		// Central channel access lock

/*
 * Claim the first free slot in c's wait queue.
 *
 * Scans the qused bitmap for a clear bit, sets it and returns the slot
 * index (0 .. Nqbits-1); the caller then stores its Alt in c->qentry[index].
 * Every caller in this file holds chanlock around the call.
 * When all Nqbits slots are taken sysfatal is called; the trailing
 * return is only reached if sysfatal comes back.
 */
static int
emptyentry(Channel *c)
{
	int i, bitno, byteno;

	for (i = 0; i < Nqbits; i++) {
		bitno = i & Nqmask;		// bit within the bitmap word
		byteno = i >> Nqshift;		// index of the bitmap word
		// The mask is built as an unsigned long: shifting a plain
		// int 1 into bit 31 overflows a 32-bit signed int.
		if ((c->qused[byteno] & (1UL << bitno)) == 0){
			c->qused[byteno] |= 1UL << bitno;
			return i;
		}
	}
	sysfatal("alt: too many waiting threads on channel 0x%p", c);
	return -1;
}

/*
 * Release channel c.
 *
 * If no wait-queue slot is claimed the channel is freed immediately.
 * Otherwise it is only marked freed; recv, send and alt call chanfree
 * again after dequeuing when they see the mark.
 */
void
chanfree(Channel *c)
{
	int w;

	lock(&chanlock);
	// Stop at the first bitmap word that still has a slot claimed
	for (w = 0; w < Nqwds && c->qused[w] == 0; w++)
		;
	if (w == Nqwds)
		free(c);
	else
		c->freed = 1;
	unlock(&chanlock);
}

/*
 * Initialise a channel whose storage the caller supplies.
 *
 * elemsize is the size of one element (must be > 0), elemcnt the number
 * of buffered elements (0 gives an unbuffered channel, negative is an
 * error).  Returns 1 on success, -1 on a bad argument.
 * Only the scalar fields and the qused bitmap are reset; qentry and v
 * are left untouched.
 * NOTE(review): nothing is allocated here, so the room behind c->v is
 * presumably the caller's responsibility -- confirm against callers.
 */
int
chaninit(Channel *c, int elemsize, int elemcnt)
{
	int w;

	if(c == nil || elemsize <= 0 || elemcnt < 0)
		return -1;

	c->s = elemcnt;
	c->e = elemsize;
	c->f = 0;
	c->n = 0;
	c->freed = 0;
	// No slot of the wait queue is claimed yet
	w = Nqwds;
	while (w-- > 0)
		c->qused[w] = 0;

	_threaddebug(DBGCHAN, "chaninit %lux", c);
	return 1;
}

/*
 * Allocate a channel holding up to elemcnt elements of elemsize bytes.
 *
 * elemcnt == 0 gives an unbuffered channel.  Returns nil when elemsize
 * is not positive, elemcnt is negative, the total byte count would not
 * fit in a 32-bit signed int, or the allocation yields nil.
 * Only s and e are set explicitly; the remaining fields rely on the
 * allocator (second argument 1 presumably requests zeroed memory --
 * TODO confirm against _threadmalloc).
 */
Channel *
chancreate(int elemsize, int elemcnt) {
	Channel *c;

	if(elemcnt < 0 || elemsize <= 0)
		return nil;
	// elemcnt * elemsize is computed in int arithmetic; refuse sizes
	// that would overflow and silently under-allocate the buffer.
	if(elemcnt > (0x7fffffff - (int)sizeof(Channel)) / elemsize)
		return nil;
	c = _threadmalloc(sizeof(Channel) + elemcnt * elemsize, 1);
	if(c == nil)
		return nil;
	c->s = elemcnt;
	c->e = elemsize;
	_threaddebug(DBGCHAN, "chancreate %lux", c);
	return c;
}

/*
 * alt - perform exactly one of the channel operations listed in alts.
 *
 * alts is an array of Alt terminated by an entry whose op is CHANEND (0).
 * CHANNOP entries are skipped.  A CHANNOBLK entry ends the scan: if no
 * earlier entry is ready its index is returned at once, so the call
 * does not block.
 *
 * When several entries can proceed, one is picked at random: the k-th
 * candidate found replaces the current pick when nrand(k) == 0.
 * When none can proceed the thread queues itself on every listed channel
 * and waits in _threadrendezvous until a peer wakes it.
 *
 * Returns the index of the entry that was performed (or of the
 * CHANNOBLK entry), or -1 when a rendezvous fails.
 * chanlock is held while channel state is examined or changed and is
 * released around every rendezvous.
 */
int
alt(Alt *alts) {
	Alt *a, *xa;
	Channel *c;
	uchar *v;
	int i, n, entry, bitno, byteno;
	Proc *p;
	Thread *t;

	lock(&chanlock);

	(*procp)->curthread->alt = alts;
	(*procp)->curthread->call = Callalt;
repeat:

	// Test which channels can proceed
	n = 1;		// 1 + number of candidates seen so far
	a = nil;	// chosen entry
	entry = -1;	// queue slot of the blocked peer, if a was chosen that way
	for (xa = alts; xa->op; xa++) {
		if (xa->op == CHANNOP) continue;
		if (xa->op == CHANNOBLK) {
			if (a == nil) {
				// Nothing before this entry is ready: do not block
				(*procp)->curthread->call = Callnil;
				unlock(&chanlock);
				return xa - alts;
			} else
				break;
		}

		c = xa->c;
		if (c == nil)
			sysfatal("alt: nil channel in entry %ld\n", xa - alts);
		if ((xa->op == CHANSND && c->n < c->s) ||
			(xa->op == CHANRCV && c->n)) {
				// The buffer has room (send) or data (receive)
				if (nrand(n) == 0) {
					a = xa;
					entry = -1;
				}
				n++;
		} else {
			// Test for blocked senders or receivers
			for (i = 0; i < Nqbits; i++) {
				bitno = i & Nqmask;
				byteno = i >> Nqshift;
				// Usable peer: slot in use, complementary
				// operation, and not yet claimed by someone
				// else (its tag is still nil)
				if (
					(c->qused[byteno] & (1UL << bitno))
					&& xa->op == (CHANSND+CHANRCV) - c->qentry[i]->op
							// complementary op
					&& *c->qentry[i]->tag == nil
				) {
					if (nrand(n) == 0) {
						a = xa;
						entry = i;
					}
					n++;
					break;
				}
			}
		}
	}

	if (a == nil) {
		// Nothing can proceed, enqueue on all channels
		c = nil;	// rendezvous tag; a peer stores its channel here
		for (a = alts; a->op; a++) {
			Channel *cp;

			if (a->op == CHANNOP || a->op == CHANNOBLK) continue;
			cp = a->c;
			a->tag = &c;
			// Clear every word of q first: the dequeue loops apply
			// all Nqwds words to qused, so a stale or uninitialised
			// word would release slots owned by other threads.
			for (i = 0; i < Nqwds; i++)
				a->q[i] = 0;
			i = emptyentry(cp);
			cp->qentry[i] = a;
			a->q[i >> Nqshift] = 1UL << (i & Nqmask);
		}

		// And wait for the rendez vous
		unlock(&chanlock);
		if (_threadrendezvous((ulong)&c, 0) == ~0) {
			// Take our entries back off every channel so that none
			// keeps a pointer to alts or to the tag &c, which dies
			// with this stack frame.
			lock(&chanlock);
			for (xa = alts; xa->op; xa++) {
				if (xa->op == CHANNOP || xa->op == CHANNOBLK) continue;
				for (i = 0; i < Nqwds; i++)
					xa->c->qused[i] &= ~xa->q[i];
			}
			unlock(&chanlock);
			(*procp)->curthread->call = Callnil;
			return -1;
		}

		lock(&chanlock);

		/* We rendezed-vous on channel c, dequeue from all channels
		 * and find the Alt struct to which c belongs
		 */
		a = nil;
		for (xa = alts; xa->op; xa++) {
			Channel *xc;

			if (xa->op == CHANNOP || xa->op == CHANNOBLK) continue;
			xc = xa->c;
			for (i = 0; i < Nqwds; i++)
				xc->qused[i] &= ~xa->q[i];
			if (xc == c)
				a = xa;

		}

		if (c->s) {
			// Buffered channel, try again
			// (chanlock is still held here, as repeat expects)
			sleep(0);
			goto repeat;
		}

		unlock(&chanlock);

		// chanfree was called while we were queued; retry it now
		if (c->freed) chanfree(c);

		p = *procp;
		t = p->curthread;
		if (t->exiting)
			threadexits(nil);
		(*procp)->curthread->call = Callnil;
		return a - alts;
	}

	c = a->c;
	// Channel c can proceed

	if (c->s) {
		// Send or receive via the buffered channel
		if (a->op == CHANSND) {
			v = c->v + ((c->f + c->n) % c->s) * c->e;
			if (a->v)
				memmove(v, a->v, c->e);
			else
				memset(v, 0, c->e);
			c->n++;
		} else {
			if (a->v) {
				v = c->v + (c->f % c->s) * c->e;
				memmove(a->v, v, c->e);
			}
			c->n--;
			c->f++;
		}
	}
	if (entry < 0)
		// We went through the buffer; wake one blocked peer, if any
		for (i = 0; i < Nqbits; i++) {
			bitno = i & Nqmask;
			byteno = i >> Nqshift;
			if (
				(c->qused[byteno] & (1UL << bitno))
				&& c->qentry[i]->op == (CHANSND+CHANRCV) - a->op
				&& *c->qentry[i]->tag == nil
			) {
				// Unblock peer process
				xa = c->qentry[i];
				*xa->tag = c;

				unlock(&chanlock);
				if (_threadrendezvous((ulong)xa->tag, 0) == ~0) {
					(*procp)->curthread->call = Callnil;
					return -1;
				}
				(*procp)->curthread->call = Callnil;
				return a - alts;
			}
		}
	if (entry >= 0) {
		// Copy directly between our value and the blocked peer's,
		// then wake the peer
		xa = c->qentry[entry];
		if (a->op == CHANSND) {
			if (xa->v) {
				if (a->v)
					memmove(xa->v, a->v, c->e);
				else
					memset(xa->v, 0, c->e);
			}
		} else {
			if (a->v) {
				if (xa->v)
					memmove(a->v, xa->v, c->e);
				else
					memset(a->v, 0, c->e);
			}
		}
		*xa->tag = c;

		unlock(&chanlock);
		if (_threadrendezvous((ulong)xa->tag, 0) == ~0) {
			(*procp)->curthread->call = Callnil;
			return -1;
		}
		(*procp)->curthread->call = Callnil;
		return a - alts;
	}
	// Buffered operation done and nobody was waiting
	unlock(&chanlock);
	yield();
	(*procp)->curthread->call = Callnil;
	return a - alts;
}

/*
 * Non-blocking receive from channel c into v (v may be nil to discard).
 *
 * A blocked sender whose tag is still nil is served first: it is claimed
 * by storing c in its tag, the value is taken from the buffer if it
 * holds any, otherwise straight from the sender, and the sender is then
 * woken by rendezvous.  With no such sender a buffered value is taken
 * if there is one.
 *
 * Returns 1 when a value was received, 0 when nothing was available,
 * -1 when the rendezvous with the sender fails.
 */
int
nbrecv(Channel *c, void *v) {
	Alt *a;
	int i, bitno, byteno;

	lock(&chanlock);
	for (i = 0; i < Nqbits; i++) {
		bitno = i & Nqmask;
		byteno = i >> Nqshift;
		// Look at qentry[i] only once the slot is known to be in use.
		// The mask is unsigned long: 1 << 31 overflows a signed int.
		if ((c->qused[byteno] & (1UL << bitno)) == 0)
			continue;
		a = c->qentry[i];
		if (a->op != CHANSND || *a->tag != nil)
			continue;

		*a->tag = c;
		if (c->n) {
			// There's an item to receive in the buffered channel
			if (v)
				memmove(v, c->v + (c->f % c->s) * c->e, c->e);
			c->n--;
			c->f++;
		} else {
			// Take the value directly from the blocked sender
			if (v) {
				if (a->v)
					memmove(v, a->v, c->e);
				else
					memset(v, 0, c->e);
			}
		}

		unlock(&chanlock);
		if (_threadrendezvous((ulong)a->tag, 0) == ~0)
			return -1;
		return 1;
	}
	if (c->n) {
		// There's an item to receive in the buffered channel
		if (v)
			memmove(v, c->v + (c->f % c->s) * c->e, c->e);
		c->n--;
		c->f++;
		unlock(&chanlock);
		return 1;
	}
	unlock(&chanlock);
	return 0;
}

/*
 * Blocking receive from channel c into v (v may be nil to discard).
 *
 * nbrecv is tried first.  If it returns 0 the thread claims a wait-queue
 * slot with an Alt on its own stack and sleeps in _threadrendezvous
 * until a sender wakes it.  On an unbuffered channel the sender has
 * already copied the value; on a buffered channel the receive is
 * retried through nbrecv.
 *
 * Returns the non-zero result of nbrecv, or 1 after a direct transfer,
 * or -1 when the rendezvous fails.
 */
int
recv(Channel *c, void *v) {
	Alt a;
	Channel *tag;
	int i, byteno;
	Proc *p;
	Thread *t;

	t = nil;
retry:
	i = nbrecv(c, v);
	if (i != 0) {
		// After a wake-up and retry t->call is still Callrcv;
		// reset it as every other exit path does.
		if (t != nil)
			t->call = Callnil;
		return i;
	}
	lock(&chanlock);
	// Unbuffered, or buffered but empty
	tag = nil;
	a.c = c;
	a.v = v;
	a.tag = &tag;
	a.op = CHANRCV;
	p = *procp;
	t = p->curthread;
	t->alt = &a;
	t->call = Callrcv;

	// enqueue on the channel
	i = emptyentry(c);
	byteno = i >> Nqshift;
	c->qentry[i] = &a;
	a.q[byteno] = 1UL << (i & Nqmask);
	unlock(&chanlock);
	if (_threadrendezvous((ulong)&tag, 0) == ~0) {
		// Give the slot back: it points at a and tag, which live
		// in this stack frame and die when we return.
		lock(&chanlock);
		c->qused[byteno] &= ~a.q[byteno];
		unlock(&chanlock);
		t->call = Callnil;
		return -1;
	}
	lock(&chanlock);

	// dequeue from the channel
	c->qused[byteno] &= ~a.q[byteno];
	unlock(&chanlock);
	if (c->s) goto retry;	// Buffered channels: try the queue again
	// chanfree was called while we were queued; retry it now
	if (c->freed) chanfree(c);
	t->call = Callnil;
	if (t->exiting)
		threadexits(nil);
	return 1;
}

/*
 * Non-blocking send of the value at v on channel c (nil v sends zeros).
 *
 * A blocked receiver whose tag is still nil is served first: it is
 * claimed by storing c in its tag, the value goes into the buffer if
 * there is room, otherwise straight to the receiver, and the receiver
 * is then woken by rendezvous.  With no such receiver the value is
 * buffered if there is room, followed by a yield.
 *
 * Returns 1 when the value was sent, 0 when it could not be sent
 * without blocking, -1 when the rendezvous with the receiver fails.
 */
int
nbsend(Channel *c, void *v) {
	Alt *a;
	int i, bitno, byteno;

	lock(&chanlock);
	for (i = 0; i < Nqbits; i++) {
		bitno = i & Nqmask;
		byteno = i >> Nqshift;
		// Look at qentry[i] only once the slot is known to be in use.
		// The mask is unsigned long: 1 << 31 overflows a signed int.
		if ((c->qused[byteno] & (1UL << bitno)) == 0)
			continue;
		a = c->qentry[i];
		if (a->op != CHANRCV || *a->tag != nil)
			continue;

		*a->tag = c;
		if (c->n < c->s) {
			// There's room to send in the buffered channel
			if (v)
				memmove(c->v + ((c->f + c->n) % c->s) * c->e, v, c->e);
			else
				memset(c->v + ((c->f + c->n) % c->s) * c->e, 0, c->e);
			c->n++;
		} else {
			// Hand the value directly to the blocked receiver
			if (a->v) {
				if (v)
					memmove(a->v, v, c->e);
				else
					memset(a->v, 0, c->e);
			}
		}

		unlock(&chanlock);
		if (_threadrendezvous((ulong)a->tag, 0) == ~0)
			return -1;
		return 1;
	}
	if (c->n < c->s) {
		// There's room to send in the buffered channel
		if (v)
			memmove(c->v + ((c->f + c->n) % c->s) * c->e, v, c->e);
		else
			memset(c->v + ((c->f + c->n) % c->s) * c->e, 0, c->e);
		c->n++;
		unlock(&chanlock);
		yield();
		return 1;
	}
	unlock(&chanlock);
	return 0;
}

/*
 * Blocking send of the value at v on channel c (nil v sends zeros).
 *
 * nbsend is tried first.  If it returns 0 the thread claims a wait-queue
 * slot with an Alt on its own stack and sleeps in _threadrendezvous
 * until a receiver wakes it.  On an unbuffered channel the receiver has
 * already taken the value; on a buffered channel the send is retried
 * through nbsend.
 *
 * Returns the non-zero result of nbsend, or 1 after a direct transfer,
 * or -1 when the rendezvous fails.
 */
int
send(Channel *c, void *v) {
	Alt a;
	Channel *tag;
	int i, byteno;
	Proc *p;
	Thread *t;

	t = nil;
retry:
	i = nbsend(c, v);
	if (i != 0) {
		// After a wake-up and retry t->call is still Callsnd;
		// reset it as every other exit path does.
		if (t != nil)
			t->call = Callnil;
		return i;
	}
	lock(&chanlock);
	tag = nil;
	a.c = c;
	a.v = v;
	a.tag = &tag;
	a.op = CHANSND;
	p = *procp;
	t = p->curthread;
	t->alt = &a;
	t->call = Callsnd;

	// enqueue on the channel
	i = emptyentry(c);
	byteno = i >> Nqshift;
	c->qentry[i] = &a;
	a.q[byteno] = 1UL << (i & Nqmask);
	unlock(&chanlock);
	if (_threadrendezvous((ulong)&tag, 0) == ~0) {
		// Give the slot back: it points at a and tag, which live
		// in this stack frame and die when we return.
		lock(&chanlock);
		c->qused[byteno] &= ~a.q[byteno];
		unlock(&chanlock);
		t->call = Callnil;
		return -1;
	}
	lock(&chanlock);
	// dequeue from the channel
	c->qused[byteno] &= ~a.q[byteno];
	unlock(&chanlock);
	if (c->s)
		goto retry;	// Buffered channels: try the queue again
	// Unbuffered channels: data is already transferred
	// chanfree was called while we were queued; retry it now
	if (c->freed) chanfree(c);
	t->call = Callnil;
	if (t->exiting)
		threadexits(nil);
	return 1;
}

/*
 * Send the ulong v on c, blocking as send does.
 * c must carry ulong-sized elements (checked with threadassert).
 * Returns whatever send returns.
 */
int
sendul(Channel *c, ulong v)
{
	int r;

	threadassert(c->e == sizeof(ulong));
	r = send(c, &v);
	return r;
}

/*
 * Receive a ulong from c, blocking as recv does.
 * c must carry ulong-sized elements (checked with threadassert).
 * Returns the value, or ~0 when recv reports failure.
 */
ulong
recvul(Channel *c)
{
	ulong v;

	threadassert(c->e == sizeof(ulong));
	if (recv(c, &v) >= 0)
		return v;
	return ~0;
}

/*
 * Send the pointer v on c, blocking as send does.
 * c must carry pointer-sized elements (checked with threadassert).
 * Returns whatever send returns.
 */
int
sendp(Channel *c, void *v)
{
	int r;

	threadassert(c->e == sizeof(void *));
	r = send(c, &v);
	return r;
}

/*
 * Receive a pointer from c, blocking as recv does.
 * c must carry pointer-sized elements (checked with threadassert).
 * Returns the pointer, or nil when recv reports failure.
 */
void *
recvp(Channel *c)
{
	void *v;

	threadassert(c->e == sizeof(void *));
	if (recv(c, &v) >= 0)
		return v;
	return nil;
}

/*
 * Non-blocking send of the ulong v on c.
 * c must carry ulong-sized elements (checked with threadassert).
 * Returns whatever nbsend returns.
 */
int
nbsendul(Channel *c, ulong v)
{
	int r;

	threadassert(c->e == sizeof(ulong));
	r = nbsend(c, &v);
	return r;
}

/*
 * Non-blocking receive of a ulong from c.
 * c must carry ulong-sized elements (checked with threadassert).
 * Returns the value received, or 0 when nothing was received -- either
 * because nothing was available or because nbrecv failed.  A received
 * 0 cannot be told apart from "nothing".
 */
ulong
nbrecvul(Channel *c) {
	ulong v;

	threadassert(c->e == sizeof(ulong));
	// nbrecv returns -1 on failure without storing into v; treat that
	// like "nothing received" instead of returning an uninitialised v.
	if (nbrecv(c, &v) <= 0)
		return 0;
	return v;
}

/*
 * Non-blocking send of the pointer v on c.
 * c must carry pointer-sized elements (checked with threadassert).
 * Returns whatever nbsend returns.
 */
int
nbsendp(Channel *c, void *v)
{
	int r;

	threadassert(c->e == sizeof(void *));
	r = nbsend(c, &v);
	return r;
}

/*
 * Non-blocking receive of a pointer from c.
 * c must carry pointer-sized elements (checked with threadassert).
 * Returns the pointer received, or nil when nothing was received --
 * either because nothing was available or because nbrecv failed.
 */
void *
nbrecvp(Channel *c) {
	void * v;

	threadassert(c->e == sizeof(void *));
	// nbrecv returns -1 on failure without storing into v; treat that
	// like "nothing received" instead of returning an uninitialised v.
	if (nbrecv(c, &v) <= 0)
		return nil;
	return v;
}

[-- Attachment #3: thread.h --]
[-- Type: text/plain, Size: 3196 bytes --]

#pragma src "/sys/src/libthread"
#pragma lib "libthread.a"

#pragma	varargck	argpos	threadprint	2
#pragma	varargck	argpos	chanprint	2

/* Short names for the library's structures.  Channel, Alt and Ref are
 * defined below; Proc, Thread and Tqueue are only declared here.
 */
typedef struct Proc	Proc;
typedef struct Thread	Thread;
typedef struct Tqueue	Tqueue;
typedef struct Alt	Alt;
typedef struct Channel	Channel;
typedef struct Ref	Ref;

/* Channel structure.  s is the size of the buffer.  For unbuffered channels
 * s is zero.  v is an array of s values.  If s is zero, v is unused.
 * f and n represent the state of the queue pointed to by v.
 * qused and qentry describe the threads waiting on the channel; qused
 * must start out all zero (chaninit does this) and neither should be
 * touched by code outside channel.c
 */

/* Geometry of a channel's wait queue: Nqwds bitmap words, with
 * 1 << Nqshift slots accounted for in each word.
 */
enum {
	Nqwds	= 2,			/* words in the qused bitmap */
	Nqshift	= 5,			/* slot >> Nqshift selects the word */
	Nqmask	= (1 << Nqshift) - 1,	/* slot & Nqmask selects the bit */
	Nqbits	= Nqwds << Nqshift,	/* total number of slots */
};

/* A channel.  The wait queue is the pair qused/qentry: bit i of the
 * qused bitmap says whether qentry[i] currently holds a waiting Alt.
 */
struct Channel {
	int		s;				// Size of the channel (may be zero)
	uint		f;				// Extraction point (insertion pt: (f + n) % s)
	uint		n;				// Number of values in the channel
	int		e;				// Element size
	int		freed;			// Set by chanfree while threads are still queued
	ulong	qused[Nqwds];		// Bitmap of used entries in qentry
	Alt	*	qentry[Nqbits];		// Receivers/senders waiting
	uchar	v[1];				// Array of max(1, s) values in the channel
};


/* Channel operations for alt: */
/* alt computes the complement of a send or receive as
 * (CHANSND + CHANRCV) - op, so those two values belong together.
 */
enum{
	CHANEND		= 0,	/* terminates an Alt array */
	CHANSND		= 1,	/* send on the channel */
	CHANRCV		= 2,	/* receive from the channel */
	CHANNOP		= 3,	/* entry is skipped by alt */
	CHANNOBLK	= 4,	/* alt returns here instead of blocking */
};

/* One entry of the array passed to alt; recv and send also build a
 * single Alt on their own stack while they wait.
 */
struct Alt {
	Channel	*c;		/* channel */
	void	*v;		/* pointer to value */
	int	op;		/* operation: CHANSND, CHANRCV, CHANNOP or CHANNOBLK; CHANEND ends the array */

	/* the next variables are used internally to alt
	 * they need not be initialized
	 */
	Channel	**tag;	/* pointer to rendez-vous tag */
	ulong	q[Nqwds];	/* bit of the wait-queue slot this entry occupies in c->qused */
};

/* Counter operated on by incref and decref (declared below). */
struct Ref {
	long ref;
};

/* Library interface.  The comments on the channel routines describe the
 * behaviour of channel.c; routines defined in other files are left
 * as they were.
 */
int		alt(Alt alts[]);	/* index of the operation performed, -1 on failure */
Channel*	chancreate(int elemsize, int bufsize);	/* bufsize is the element count; nil on bad arguments */
int		chaninit(Channel *c, int elemsize, int elemcnt);	/* 1 on success, -1 on bad arguments */
void		chanfree(Channel *c);	/* freeing is deferred while threads are queued */
int		chanprint(Channel *, char *, ...);
long		decref(Ref *r);		/* returns 0 iff value is now zero */
void		incref(Ref *r);
int		nbrecv(Channel *c, void *v);	/* 1 received, 0 nothing available, -1 failure */
void*		nbrecvp(Channel *c);	/* nil when nothing was received */
ulong		nbrecvul(Channel *c);	/* 0 when nothing was received */
int		nbsend(Channel *c, void *v);	/* 1 sent, 0 would block, -1 failure */
int		nbsendp(Channel *c, void *v);
int		nbsendul(Channel *c, ulong v);
int		proccreate(void (*f)(void *arg), void *arg, uint stacksize);
int		procrfork(void (*f)(void *arg), void *arg, uint stacksize, int flag);
ulong *		procdata(void);
void		procexec(Channel *, char *, char *[]);
void		procexecl(Channel *, char *, ...);
int		recv(Channel *c, void *v);	/* blocks; -1 on failure */
void*		recvp(Channel *c);	/* nil on failure */
ulong		recvul(Channel *c);	/* ~0 on failure */
int		send(Channel *c, void *v);	/* blocks; -1 on failure */
int		sendp(Channel *c, void *v);
int		sendul(Channel *c, ulong v);
int		threadcreate(void (*f)(void *arg), void *arg, uint stacksize);
ulong*		threaddata(void);
void		threadexits(char *);
void		threadexitsall(char *);
int		threadgetgrp(void);	/* return thread group of current thread */
char*		threadgetname(void);
void		threadkill(int);	/* kill thread */
void		threadkillgrp(int);	/* kill threads in group */
void		threadmain(int argc, char *argv[]);
void		threadnonotes(void);
int		threadid(void);
int		threadpid(int);
int		threadprint(int, char*, ...);
int		threadsetgrp(int);	/* set thread group, return old */
void		threadsetname(char *name);
Channel*	threadwaitchan(void);
void		yield(void);

             reply	other threads:[~2001-11-20 21:15 UTC|newest]

Thread overview: 7+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2001-11-20 21:15 Sape Mullender [this message]
  -- strict thread matches above, loose matches on Subject: below --
2001-11-21 14:34 Fco.J.Ballesteros
2001-11-21 13:24 rog
2001-11-21  9:08 Fco.J.Ballesteros
2001-11-20 20:51 David Gordon Hogan
2001-11-20 14:03 Sape Mullender
2001-11-20  9:35 Fco.J.Ballesteros

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20011120211511.146EE19A4F@mail.cse.psu.edu \
    --to=sape@plan9.bell-labs.com \
    --cc=9fans@cse.psu.edu \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Be sure your reply has a Subject: header at the top and a blank line before the message body.
This is a public inbox, see mirroring instructions
for how to clone and mirror all data and code used for this inbox;
as well as URLs for NNTP newsgroup(s).