## diffname port/stream.c 1990/0227
## diff -e /dev/null /n/bootesdump/1990/0227/sys/src/9/mips/stream.c
0a
#include "u.h"
#include "lib.h"
#include "mem.h"
#include "dat.h"
#include "fns.h"
#include "io.h"
#include "errno.h"
#include "devtab.h"
/*
 * Forward declaration: stputq is defined near the end of this file but
 * is needed here for the procinfo initializer.
 */
static void stputq(Queue*, Block*);
/*
 * Queue info for the process end of every stream (streamnew allocates
 * the process queues with it).  The two put routines are stputq and
 * nullput; there is no open or close routine.
 * NOTE(review): the initializer order is presumably
 * { iput, oput, open, close, name } -- confirm against the Qinfo
 * declaration.
 */
Qinfo procinfo = { stputq, nullput, 0, 0, "process" } ;
extern Qinfo noetherinfo;
/*
 * Null-terminated table of the line disciplines that can be pushed by
 * name; qinfofind searches it.
 */
static Qinfo *lds[] = {
&noetherinfo,
0
};
enum {
Nclass=4,	/* number of block size classes, i.e. entries in bclass */
};
/*
* All stream structures are ialloc'd at boot time
*/
Stream *slist;	/* array of conf.nstream Streams, set up by streaminit */
Queue *qlist;	/* array of conf.nqueue Queues; allocq hands them out in adjacent pairs */
Block *blist;	/* array of conf.nblock Block headers, set up by streaminit */
static Lock garbagelock;	/* NOTE(review): not referenced by any code visible here */
/*
* The block classes. There are Nclass block sizes, each with its own free list.
* All are ialloced at qinit() time.
*/
/*
 * One block class: the data size of its blocks plus an embedded Queue.
 * allocb and freeb use the embedded first/last as the class's free
 * list, sleep/wakeup on its r when the list is empty, and lock the
 * class while changing the list.
 */
typedef struct {
int size;	/* bytes of data area per block; streaminit allocates none when 0 */
Queue;
} Bclass;
/*
 * Data sizes per class, smallest first.  allocb picks the first class
 * whose size is not less than the request.
 */
Bclass bclass[Nclass]={
{ 0 },
{ 64 },
{ 512 },
{ 4096 },
};
/*
* Allocate streams, queues, and blocks. Allocate n block classes, giving
* 1/2^(m+1) of the blocks to class m < n-1
* 1/2^(n-1) of the blocks to class n-1
*/
void
streaminit(void)
{
int class, i, n;
Block *bp;
Bclass *bcp;
slist = (Stream *)ialloc(conf.nstream * sizeof(Stream), 0);
qlist = (Queue *)ialloc(conf.nqueue * sizeof(Queue), 0);
blist = (Block *)ialloc(conf.nblock * sizeof(Block), 0);
bp = blist;
n = conf.nblock;
for(class = 0; class < Nclass; class++){
if(class < Nclass-1)
n = n/2;
bcp = &bclass[class];
for(i = 0; i < n; i++) {
if(bcp->size)
bp->base = (uchar *)ialloc(bcp->size, 0);
bp->lim = bp->base + bcp->size;
bp->flags = class;
freeb(bp);
bp++;
}
}
}
/*
 *  sleep condition used by allocb: true once the block class passed
 *  as arg has something on its free list
 */
static int
isblock(void *arg)
{
	return ((Bclass *)arg)->first != 0;
}
/*
 *  Allocate an empty block with room for at least size bytes of data.
 *  A request larger than the largest class is given a block of the
 *  largest class, so callers must check bp->lim - bp->wptr.
 *  Sleeps until a block of the chosen class is free.
 */
Block *
allocb(ulong size)
{
	Bclass *cl;
	Block *b;

	/* map size to the first class big enough, stopping at the last class */
	cl = bclass;
	while(cl->size < size && cl < &bclass[Nclass-1])
		cl++;

	/* take the head of the free list, waiting while the list is empty */
	lock(cl);
	for(;;){
		b = cl->first;
		if(b != 0)
			break;
		unlock(cl);
		print("waiting for blocks\n");
		sleep(&cl->r, isblock, (void *)cl);
		lock(cl);
	}
	cl->first = b->next;
	if(cl->first == 0)
		cl->last = 0;
	unlock(cl);

	/* return an empty data block; only the class bits of flags survive */
	b->next = 0;
	b->type = M_DATA;
	b->flags &= S_CLASS;
	b->rptr = b->wptr = b->base;
	return b;
}
/*
* Free a block. Poison its pointers so that someone trying to access
* it after freeing will cause a dump.
*/
void
freeb(Block *bp)
{
	Bclass *cl;

	/* the class index lives in the low bits of flags */
	cl = &bclass[bp->flags & S_CLASS];

	/* poison the data pointers */
	bp->wptr = 0;
	bp->rptr = 0;

	/* append to the class's free list and wake anyone waiting in allocb */
	lock(cl);
	if(cl->first == 0)
		cl->first = bp;
	else
		cl->last->next = bp;
	cl->last = bp;
	bp->next = 0;
	wakeup(&cl->r);
	unlock(cl);
}
/*
 *  Allocate a pair of queues and flavor them with the put routines
 *  from qi.  Queues are handed out as adjacent pairs of qlist: the
 *  first of the pair gets qi->iput, the second (q+1) gets qi->oput.
 *  The QINUSE flag on the first queue is the only in-use marker.
 *
 *  Returns the first queue of the pair.  Raises Enoqueue when no pair
 *  is free.
 */
static Queue *
allocq(Qinfo *qi)
{
	Queue *q, *wq, *end;

	end = &qlist[conf.nqueue];

	/*
	 *  Step by pairs and insist that both members fit in the array.
	 *  Testing `q == end' after stepping by two would miss the end
	 *  whenever conf.nqueue is odd, and the last lone queue would be
	 *  handed out with its partner beyond the array.
	 */
	for(q = qlist; end - q >= 2; q += 2){
		if(q->flag == 0 && canlock(q)){
			/* recheck under the lock; another process may have won */
			if(q->flag == 0)
				break;
			unlock(q);
		}
	}
	if(end - q < 2){
		print("no more queues\n");
		error(0, Enoqueue);
	}

	/* q is locked here */
	q->flag = QINUSE;
	q->r.p = 0;
	q->info = qi;
	q->put = qi->iput;
	wq = q->other = q + 1;
	/*
	 *  freeq only clears the first queue's flag, so bits such as
	 *  QHUNGUP (set on both queues by stputq) would otherwise survive
	 *  into the next user of this pair.
	 */
	wq->flag = 0;
	wq->r.p = 0;
	wq->info = qi;
	wq->put = qi->oput;
	wq->other = q;
	unlock(q);
	return q;
}
/*
* free a queue
*/
static void
freeq(Queue *q)
{
	Block *bp;
	Queue *rq, *wq;

	/* drain the read side */
	rq = RD(q);
	for(bp = getq(rq); bp != 0; bp = getq(rq))
		freeb(bp);

	/* drain the write side */
	wq = WR(rq);
	for(bp = getq(wq); bp != 0; bp = getq(wq))
		freeb(bp);

	/* clearing the read side's flag makes the pair available to allocq */
	RD(wq)->flag = 0;
}
/*
* push a queue onto a stream referenced by the proc side write q
*/
Queue *
pushq(Stream* s, Qinfo *qi)
{
	Queue *procrd, *procwr;		/* process end, read and write side */
	Queue *nq, *newrd, *newwr;	/* the module being pushed */
	Queue *below;			/* write queue currently under the process end */

	procrd = RD(s->procq);

	/* make the new queue pair (may raise an error; nothing is linked yet) */
	nq = allocq(qi);
	newrd = RD(nq);
	newwr = WR(nq);
	procwr = WR(procrd);
	below = procwr->next;

	/* splice the new pair in directly under the process end */
	newrd->next = procrd;
	RD(below)->next = newrd;
	newwr->next = below;
	procwr->next = newwr;

	if(qi->open)
		(*qi->open)(newrd, s);
	return newwr->next;
}
/*
* pop off the top line discipline
*/
static void
popq(Stream *s)
{
	Queue *pw, *top;

	pw = s->procq;
	top = pw->next;

	/* nothing but the device under the process end: nothing to pop */
	if(top == WR(s->devq))
		error(0, Ebadld);

	/* let the module clean up, then unlink it in both directions */
	if(top->info->close)
		(*top->info->close)(RD(top));
	pw->next = top->next;
	RD(top->next)->next = RD(pw);
	freeq(top);
}
/*
* add a block (or list of blocks) to the end of a queue. return true
* if one of the blocks contained a delimiter.
*/
int
putq(Queue *q, Block *bp)
{
	int delim;
	Block *tail;

	lock(q);

	/* hook the list on after the current last block */
	if(q->first == 0)
		q->first = bp;
	else
		q->last->next = bp;

	/* walk to the tail, accounting for each block on the way */
	delim = 0;
	for(tail = bp; ; tail = tail->next){
		q->len += tail->wptr - tail->rptr;
		delim |= tail->flags & S_DELIM;
		if(tail->next == 0)
			break;
	}
	q->last = tail;

	if(q->len >= Streamhi)
		q->flag |= QHIWAT;
	unlock(q);
	return delim;
}
/*
 *  add a block (or list of blocks) to the end of a block list.  like
 *  putq but takes no lock and does no high water accounting.  returns
 *  non-zero if one of the blocks carried a delimiter.
 */
int
putb(Blist *q, Block *bp)
{
	int delim;
	Block *tail;

	if(q->first == 0)
		q->first = bp;
	else
		q->last->next = bp;

	delim = 0;
	for(tail = bp; ; tail = tail->next){
		q->len += tail->wptr - tail->rptr;
		delim |= tail->flags & S_DELIM;
		if(tail->next == 0)
			break;
	}
	/* tail->next is already 0 here */
	q->last = tail;
	return delim;
}
/*
* add a block to the start of a queue.  streamread uses this to put
* back the unread remainder of a block it only partly consumed.
*
* the queue's len grows by the block's data length; the QHIWAT flag
* is not touched here.
* NOTE(review): q is declared Blist* but is handed to lock(), and
* streamread calls this with a Queue*.  confirm the intended parameter
* type.
*/
static void
putbq(Blist *q, Block *bp)
{
lock(q);
if(q->first)
bp->next = q->first;
else
q->last = bp;	/* list was empty: bp becomes the tail as well */
q->first = bp;
q->len += bp->wptr - bp->rptr;
unlock(q);
}
/*
* remove the first block from a queue
*/
Block *
getq(Queue *q)
{
	Block *bp;

	lock(q);
	bp = q->first;
	if(bp == 0){
		unlock(q);
		return 0;
	}

	/* unlink the head */
	q->first = bp->next;
	if(q->first == 0)
		q->last = 0;
	q->len -= bp->wptr - bp->rptr;

	/*
	 *  once a queue that hit the high water mark has drained to half
	 *  of it, wake the queue that feeds this one (flowctl sleeps on
	 *  the feeder's r)
	 */
	if((q->flag & QHIWAT) != 0 && q->len < Streamhi/2){
		wakeup(&q->other->next->other->r);
		q->flag &= ~QHIWAT;
	}
	bp->next = 0;
	unlock(q);
	return bp;
}
/*
 *  remove the first block from a block list.  like getq but takes no
 *  lock and does no flow control.  returns 0 if the list is empty.
 */
Block *
getb(Blist *q)
{
	Block *bp;

	bp = q->first;
	if(bp == 0)
		return 0;
	q->first = bp->next;
	if(q->first == 0)
		q->last = 0;
	q->len -= bp->wptr - bp->rptr;
	bp->next = 0;
	return bp;
}
/*
 * put a block into the bit bucket: free it, then raise Ehungup.
 * q is not used.  this is one of the two put routines named in the
 * procinfo initializer.
 */
void
nullput(Queue *q, Block *bp)
{
freeb(bp);
error(0, Ehungup);
}
/*
* find the info structure for line discipline 'name'
*/
static Qinfo *
qinfofind(char *name)
{
	int i;

	if(name == 0)
		error(0, Ebadld);

	/* lds is terminated by a 0 entry */
	for(i = 0; lds[i] != 0; i++)
		if(strcmp(lds[i]->name, name) == 0)
			return lds[i];

	/* no such line discipline */
	error(0, Ebadld);
}
/*
* send a hangup up a stream
*/
static void
hangup(Stream *s)
{
	Block *msg;
	Queue *dq;

	/* an empty block of type M_HANGUP, fed to the device end's put routine */
	msg = allocb(0);
	msg->type = M_HANGUP;
	dq = s->devq;
	(*dq->put)(dq, msg);
}
/*
 *  Test whether the data in bp starts with the word name.
 *
 *  name matches if it is followed by a single space or by the
 *  terminating null.  on a match, bp->rptr is advanced past name (and
 *  past the space, if any) so it points to the second element, and 1
 *  is returned.
 *
 *  returns 0, leaving bp untouched, if there is no match.
 *
 *  it is assumed that the block data is null terminated.  streamwrite
 *  guarantees this.
 */
int
streamparse(char *name, Block *bp)
{
	int len;
	uchar *after;

	len = strlen(name);
	if(bp->wptr - bp->rptr < len)
		return 0;
	if(strncmp(name, (char *)bp->rptr, len) != 0)
		return 0;

	/* the character following the candidate word decides */
	after = bp->rptr + len;
	if(*after == ' '){
		bp->rptr = after + 1;
		return 1;
	}
	if(*after != 0)
		return 0;	/* name is only a prefix of a longer word */
	bp->rptr = after;
	return 1;
}
/*
 * the per stream directory structure: the entries streamgen supplies
 * for every stream device in addition to the device's own table.
 * streamgen reads the qid, name, length and perm members of each entry.
 */
Dirtab streamdir[]={
"data", Sdataqid, 0, 0600,
"ctl", Sctlqid, 0, 0600,
};
/*
 *  Directory generator for a stream device.  A stream device consists
 *  of any directory supplied by the actual device (tab, ntab entries)
 *  followed by the contents of streamdir.
 *
 *  values of s:
 *	0 to ntab-1			apply to the auxiliary directory.
 *	ntab to ntab+Shighqid-Slowqid	apply to streamdir.
 *
 *  Fills in *dp for entry s and returns 1; returns -1 if s is outside
 *  both ranges.
 */
int
streamgen(Chan *c, Dirtab *tab, int ntab, int s, Dir *dp)
{
	/* a negative index would otherwise pass the `s < ntab' test */
	if(s < 0)
		return -1;

	if(s < ntab)
		tab = &tab[s];
	else if(s < ntab + Shighqid - Slowqid + 1)
		tab = &streamdir[s - ntab];
	else
		return -1;

	/* the entry's qid carries the stream id of the channel being read */
	devdir(c, STREAMQID(STREAMID(c->qid),tab->qid), tab->name, tab->length,
		tab->perm, dp);
	return 1;
}
/*
 *  Create a new stream whose device end is described by qi.
 *
 *  c may be 0, in which case the stream is not tied to a channel and
 *  its type is set to -1.  Otherwise the stream is recorded in
 *  c->stream, takes its type, dev and id from c, and c is marked COPEN.
 *
 *  Returns the stream with a reference count of 1.  Raises Enostream
 *  if every stream structure is in use; errors from allocq or from the
 *  device's open routine are passed on.
 *
 *  NOTE(review): on such an error after s->inuse has been set, the
 *  stream and any queues already allocated are not released here
 *  (streamclose ignores channels without COPEN) -- confirm whether
 *  callers clean up.
 */
Stream *
streamnew(Chan *c, Qinfo *qi)
{
	Stream *s;
	Queue *q;

	/*
	 *  find a free stream struct; the loop is left holding its lock
	 */
	for(s = slist; s < &slist[conf.nstream]; s++) {
		if(s->inuse == 0){
			if(canlock(s)){
				if(s->inuse == 0)
					break;
				unlock(s);
			}
		}
	}
	if(s == &slist[conf.nstream]){
		print("no more streams\n");
		error(0, Enostream);
	}
	if(waserror()){
		unlock(s);
		/* there is no channel to close when c is 0 */
		if(c)
			streamclose(c);
		nexterror();
	}

	/*
	 *  marry a stream and a channel
	 */
	if(c){
		c->stream = s;
		s->type = c->type;
		s->dev = c->dev;
		s->id = STREAMID(c->qid);
	} else
		s->type = -1;

	/*
	 *  hang a device and process q off the stream
	 */
	s->inuse = 1;
	s->tag[0] = 0;
	q = allocq(&procinfo);
	s->procq = WR(q);
	q = allocq(qi);
	s->devq = RD(q);
	WR(s->procq)->next = WR(s->devq);
	RD(s->procq)->next = 0;
	RD(s->devq)->next = RD(s->procq);
	WR(s->devq)->next = 0;

	if(qi->open)
		(*qi->open)(RD(s->devq), s);

	/* c was allowed to be 0 above, so it must not be dereferenced blindly */
	if(c)
		c->flag |= COPEN;
	unlock(s);
	poperror();
	return s;
}
/*
* (Re)open a stream. If this is the first open, create a stream.
*/
void
streamopen(Chan *c, Qinfo *qi)
{
	Stream *s;

	/*
	 *  if the stream already exists, just up the reference count.
	 */
	for(s = slist; s < &slist[conf.nstream]; s++){
		/* cheap test without the lock to skip streams that cannot match */
		if(!s->inuse || s->type != c->type || s->dev != c->dev
		|| s->id != STREAMID(c->qid))
			continue;

		/* repeat the test under the lock before trusting it */
		lock(s);
		if(s->inuse && s->type == c->type
		&& s->dev == c->dev
		&& s->id == STREAMID(c->qid)){
			s->inuse++;
			c->stream = s;
			unlock(s);
			return;
		}
		unlock(s);
	}

	/*
	 *  no such stream yet, create one
	 */
	streamnew(c, qi);
}
/*
* On the last close of a stream, for each queue on the
* stream release its blocks and call its close routine.
*/
void
streamclose(Chan *c)
{
	Stream *s;
	Queue *q, *up;

	/*
	 *  if not open, ignore it
	 */
	if((c->flag & COPEN) == 0)
		return;
	s = c->stream;

	/*
	 *  decrement the reference count; only the last close goes on
	 */
	lock(s);
	if(--s->inuse != 0){
		unlock(s);
		return;
	}

	/*
	 *  descend the stream from the process end, closing the queues,
	 *  stopping after the device end's write queue
	 */
	q = s->procq;
	while(q != 0){
		if(q->info->close)
			(*q->info->close)(q->other);
		if(q == s->devq->other)
			break;
		q = q->next;
	}

	/*
	 *  ascend the stream from the device end, freeing the queues.
	 *  the next pointer is saved first since freeq releases the pair.
	 */
	q = s->devq;
	while(q != 0){
		up = q->next;
		freeq(q);
		q = up;
	}

	s->id = s->dev = s->type = 0;
	unlock(s);
}
/*
 *  Put routine for the read side of the process end of a stream.
 *
 *  An M_HANGUP block is freed and marks both queues of the pair
 *  QHUNGUP.  Anything else (a block or a list of blocks) is appended
 *  to the queue to be read.  Either way any waiting reader is woken.
 */
void
stputq(Queue *q, Block *bp)
{
	if(bp->type == M_HANGUP){
		freeb(bp);
		q->flag |= QHUNGUP;
		q->other->flag |= QHUNGUP;
	} else {
		lock(q);
		if(q->first)
			q->last->next = bp;
		else
			q->first = bp;
		/*
		 *  bp may head a list, as in putq: count every block and
		 *  leave q->last on the real tail so a later append does
		 *  not cut the rest of the list off
		 */
		q->len += bp->wptr - bp->rptr;
		while(bp->next){
			bp = bp->next;
			q->len += bp->wptr - bp->rptr;
		}
		q->last = bp;
		if(q->len >= Streamhi)
			q->flag |= QHIWAT;
		unlock(q);
	}
	wakeup(&q->r);
}
/*
 *  Read from the string str as though it were a file, starting at the
 *  channel's current offset.  Copies at most n bytes to buf, advances
 *  c->offset by the number of bytes copied and returns that number.
 *  Returns 0 once the offset is at or beyond the end of the string.
 */
long
stringread(Chan *c, uchar *buf, long n, char *str)
{
	long left;

	/* bytes of str not yet read */
	left = strlen(str);
	left -= c->offset;
	if(left < n)
		n = left;
	if(n <= 0)
		return 0;

	/*
	 *  the offset is a position in str; buf is the caller's buffer
	 *  for this read and is always filled from its start
	 */
	memcpy(buf, str + c->offset, n);
	c->offset += n;
	return n;
}
/*
* return true if there is an output buffer available
*/
static int
isinput(void *x)
{
return ((Queue *)x)->first != 0;
}
/*
* read until we fill the buffer or until a DELIM is encountered
*/
/*
 * Read up to n bytes from the stream attached to c into vbuf.
 *
 * The qid selects what is read: the data file reads from the stream,
 * the ctl file returns the stream id as a decimal string, and anything
 * else must be a directory.
 *
 * A data read returns when n bytes have been copied, when a block
 * flagged S_DELIM has been consumed, or when the stream is hung up and
 * no data is queued.  Returns the number of bytes copied.
 */
long
streamread(Chan *c, void *vbuf, long n)
{
Block *bp;
Stream *s;
Queue *q;
long rv = 0;	/* not used below */
int left, i, x;	/* x is not used below */
uchar *buf = vbuf;
char num[32];
s = c->stream;
switch(STREAMTYPE(c->qid)){
case Sdataqid:
break;
case Sctlqid:
sprint(num, "%d", s->id);
return stringread(c, buf, n, num);
default:
if(CHDIR & c->qid)
return devdirread(c, vbuf, n, 0, 0, streamgen);
else
panic("streamread");	/* NOTE(review): presumably does not return */
}
/*
* one reader at a time
*/
qlock(&s->rdlock);
/* release rdlock if anything below raises an error (sleep, for one) */
if(waserror()){
qunlock(&s->rdlock);
nexterror();
}
/*
* sleep till data is available
*/
q = RD(s->procq);
left = n;	/* bytes still wanted */
while(left){
bp = getq(q);
if(bp == 0){
/* nothing queued: give up if hung up, else wait for stputq's wakeup */
if(q->flag & QHUNGUP)
break;
sleep(&q->r, &isinput, (void *)q);
continue;
}
i = bp->wptr - bp->rptr;
if(i <= left){
/* the whole block fits: copy it out and free it */
memcpy(buf, bp->rptr, i);
left -= i;
buf += i;
if(bp->flags & S_DELIM){
/* a delimiter ends the read even if the buffer is not full */
freeb(bp);
break;
} else
freeb(bp);
} else {
/* only part fits: copy what is wanted, put the rest back at the head */
memcpy(buf, bp->rptr, left);
bp->rptr += left;
putbq(q, bp);
left = 0;
}
};
qunlock(&s->rdlock);
poperror();
return n - left;
}
/*
 *  Handle a ctl request.  Streamwide requests are:
 *
 *	hangup		-- send an M_HANGUP up the stream
 *	push ldname	-- push the line discipline named ldname
 *	pop		-- pop a line discipline
 *
 *  Anything else is passed down the stream as a delimited M_CTL block.
 *
 *  The request is copied into a block and null terminated so that
 *  streamparse can be used on it.  This routine does not touch
 *  s->wrlock; streamwrite takes and releases it around the call.
 *
 *  Returns n.  Raises Ebadld for a request that cannot be handled
 *  (negative or oversized n, unknown line discipline, nothing to pop).
 *  NOTE(review): Ebadld is used for the size errors only because it is
 *  the error the other ctl failures raise; substitute a more specific
 *  code if one exists.
 */
static long
streamctlwrite(Stream *s, void *a, long n)
{
	Qinfo *qi;
	Block *bp;

	if(n < 0)
		error(0, Ebadld);

	/*
	 *  package.  allocb hands out a block of the largest class when
	 *  the request exceeds every class, so check that the request and
	 *  its terminating null really fit before copying.
	 */
	bp = allocb(n+1);
	if(bp->lim - bp->wptr < n + 1){
		freeb(bp);
		error(0, Ebadld);
	}
	memcpy(bp->wptr, a, n);
	bp->wptr[n] = 0;
	bp->wptr += n + 1;

	/*
	 *  check for standard requests.  the request block is freed
	 *  before calling anything that may raise an error, so that it is
	 *  not lost when the error unwinds past this routine.
	 */
	if(streamparse("hangup", bp)){
		freeb(bp);
		hangup(s);
	} else if(streamparse("push", bp)){
		/* the name being looked up lives in bp, so guard the lookup */
		if(waserror()){
			freeb(bp);
			nexterror();
		}
		qi = qinfofind((char *)bp->rptr);
		poperror();
		/* qi points at the discipline's Qinfo, not into bp */
		freeb(bp);
		pushq(s, qi);
	} else if(streamparse("pop", bp)){
		freeb(bp);
		popq(s);
	} else {
		/* not ours: the block now belongs to the next queue down */
		bp->type = M_CTL;
		bp->flags |= S_DELIM;
		PUTNEXT(s->procq, bp);
	}
	return n;
}
/*
 *  sleep condition used by flowctl: true once the queue passed as arg
 *  holds fewer than Streamhi bytes
 */
static int
notfull(void *arg)
{
	return ((Queue *)arg)->len < Streamhi;
}
/*
 *  wait till there's room in the next queue down from q.  the sleep is
 *  on q's own rendezvous; getq wakes it when the next queue drains.
 */
void
flowctl(Queue *q)
{
	Queue *down;

	down = q->next;
	if(down->len < Streamhi)
		return;
	sleep(&q->r, notfull, down);
}
/*
* send the request as a single delimited block
*/
/*
 * Write n bytes from a to the stream attached to c.
 *
 * A write to the ctl file is handed to streamctlwrite.  A write to the
 * data file is sent down from the process end as one or more M_DATA
 * blocks, the last of which is flagged S_DELIM.  Raises Ehungup if the
 * stream has been hung up.  Returns n.
 */
long
streamwrite(Chan *c, void *a, long n)
{
Stream *s;
Block *bp;
Queue *q;
long rem;
int i;
/*
* one writer at a time
*/
s = c->stream;
qlock(&s->wrlock);
/* release wrlock if anything below raises an error */
if(waserror()){
qunlock(&s->wrlock);
nexterror();
}
/*
* decode the qid
*/
switch(STREAMTYPE(c->qid)){
case Sdataqid:
break;
case Sctlqid:
n = streamctlwrite(s, a, n);
qunlock(&s->wrlock);
poperror();
return n;
default:
panic("bad stream qid\n");	/* NOTE(review): presumably does not return */
}
/*
* No writes allowed on hungup channels
*/
q = s->procq;
/* stputq sets QHUNGUP on both queues of the process end; test the other one */
if(q->other->flag & QHUNGUP)
error(0, Ehungup);
if(GLOBAL(a) || n==0){
/*
* `a' is global to the whole system, just create a
* pointer to it and pass it on.
*/
flowctl(q);
bp = allocb(0);
/* the block's own base and lim are overwritten to describe a[0..n) */
bp->rptr = bp->base = (uchar *)a;
bp->wptr = bp->lim = (uchar *)a+n;
bp->flags |= S_DELIM;
bp->type = M_DATA;
PUTNEXT(q, bp);
} else {
/*
* `a' is in the user's address space, copy it into
* system buffers and pass the buffers on.
*/
for(rem = n; ; rem -= i) {
flowctl(q);
/* allocb may return less room than asked for; i is what we really got */
bp = allocb(rem);
i = bp->lim - bp->wptr;
if(i >= rem){
/* the rest fits: this is the last, delimited, block */
memcpy(bp->wptr, a, rem);
bp->flags |= S_DELIM;
bp->wptr += rem;
bp->type = M_DATA;
PUTNEXT(q, bp);
break;
} else {
/* fill this block completely and go round again */
memcpy(bp->wptr, a, i);
bp->wptr += i;
bp->type = M_DATA;
PUTNEXT(q, bp);
a = ((char*)a) + i;
}
}
}
qunlock(&s->wrlock);
poperror();
return n;
}
.
## diffname port/stream.c 1990/03013
## diff -e /n/bootesdump/1990/0227/sys/src/9/mips/stream.c /n/bootesdump/1990/03013/sys/src/9/mips/stream.c
606,607c
s->id = s->dev = s->type = 0;
s->inuse--;
unlock(s);
.
602c
for(q = s->devq; q; q = nq){
.
596c
if(q == s->devq->other)
.
593c
for(q = s->procq; q; q = q->next){
.
584,585c
lock(s);
if(s->inuse != 1){
s->inuse--;
.
573a
Stream *s = c->stream;
.
## diffname port/stream.c 1990/0312
## diff -e /n/bootesdump/1990/03013/sys/src/9/mips/stream.c /n/bootesdump/1990/0312/sys/src/9/mips/stream.c
888a
}
/*
* like andrew's getmfields but no hidden state
*/
int
getfields(char *lp, /* to be parsed */
char **fields, /* where to put pointers */
int n, /* number of pointers */
char sep /* separator */
)
{
int i;
for(i=0; lp && *lp && i<n; i++){
while(*lp == sep)
*lp++=0;
if(*lp == 0)
break;
fields[i]=lp;
while(*lp && *lp != sep)
lp++;
}
return i;
.
849c
if((GLOBAL(a) && !docopy) || n==0){
.
809c
streamwrite(Chan *c, void *a, long n, int docopy)
.
719c
i = BLEN(bp);
.
654,655c
memcpy(buf, str + c->offset, n);
.
632d
630a
q->len += BLEN(bp);
while(bp->next) {
bp = bp->next;
q->len += BLEN(bp);
}
.
624a
wakeup(&q->other->r);
.
418c
if(BLEN(bp) < len)
.
363a
* make sure the first block has n bytes
*/
/*
 *  Arrange for the first block of the list bp to hold at least n bytes
 *  of data, copying from (and freeing) the blocks that follow as
 *  needed.  Returns the head of the resulting list, which may be a
 *  newly allocated block.  If the whole list holds fewer than n bytes
 *  it is freed and 0 is returned.
 */
Block *
pullup(Block *bp, int n)
{
	Block *nbp;
	int i;

	/*
	 *  this should almost always be true, the rest is
	 *  just to avoid every caller checking.
	 */
	if(BLEN(bp) >= n)
		return bp;

	/*
	 *  if not enough room in the first block,
	 *  add another to the front of the list.
	 */
	if(bp->lim - bp->rptr < n){
		nbp = allocb(n);
		nbp->next = bp;
		bp = nbp;
	}

	/*
	 *  copy bytes from the trailing blocks into the first.
	 *  n is the number of bytes still missing.
	 */
	n -= BLEN(bp);
	while(nbp = bp->next){
		i = BLEN(nbp);
		if(i > n) {
			/* this block has more than enough; take only what is missing */
			memcpy(bp->wptr, nbp->rptr, n);
			bp->wptr += n;
			nbp->rptr += n;
			return bp;
		}
		/* take the whole block and drop it from the list */
		memcpy(bp->wptr, nbp->rptr, i);
		bp->wptr += i;
		bp->next = nbp->next;
		nbp->next = 0;
		freeb(nbp);
		n -= i;
		if(n == 0)
			return bp;
	}

	/* the list ran out before n bytes were collected */
	freeb(bp);
	return 0;
}
/*
* grow the front of a list of blocks by n bytes
*/
/*
 *  Make room for n more bytes in front of the data of the list bp.
 *  If the first block has n unused bytes before rptr, rptr is simply
 *  moved back.  Otherwise a new block with n bytes at its end is put
 *  in front of the list.  Returns the head of the resulting list; the
 *  caller fills in the n bytes at its rptr.
 */
Block *
prepend(Block *bp, int n)
{
	Block *nbp;

	if(bp->base && (bp->rptr - bp->base)>=n){
		/*
		 *  room in first block of message
		 */
		bp->rptr -= n;
		return bp;
	}

	/*
	 *  make new block with the n bytes at its very end.  ask allocb
	 *  for n bytes (not a fixed 2) so that rptr cannot end up before
	 *  the start of the new block's data area.
	 */
	nbp = allocb(n);
	nbp->next = bp;
	nbp->wptr = nbp->lim;
	nbp->rptr = nbp->wptr - n;
	return nbp;
}
/*
.
357c
q->len -= BLEN(bp);
.
346a
/*
* remove the first block from a list of blocks
*/
.
337c
q->len -= BLEN(bp);
.
324c
* remove the first block from a queue
.
319c
q->len += BLEN(bp);
.
310c
void
.
303d
299c
q->len += BLEN(bp);
.
295c
q->len += BLEN(bp);
.
276c
q->len += BLEN(bp);
.
272c
q->len += BLEN(bp);
.
149a
wakeup(&bcp->r);
.
147,148d
145a
tries = 0;
while(bp->next){
if(++tries > 10){
dumpstack();
panic("freeb");
}
bp = bp->next;
}
.
141a
bp->rptr = bp->wptr = 0;
.
140d
137a
int tries;
.
131,132c
* Free a block (or list of blocks). Poison its pointers so that
* someone trying to access it after freeing will cause a dump.
.
111a
qunlock(bcp);
.
110a
qlock(bcp);
.
37c
Blist;
QLock; /* qlock for sleepers on r */
Rendez r; /* sleep here waiting for blocks */
.
15a
&dkmuxinfo,
&urpinfo,
.
13a
/*
* line disciplines that can be pushed
*
* WARNING: this table should be the result of configuration
*/
extern Qinfo noetherinfo;
extern Qinfo dkmuxinfo;
extern Qinfo urpinfo;
.
11,12c
Qinfo procinfo = { stputq, nullput, 0, 0, "process" };
.
9a
/*
* process end line discipline
*/
.
## diffname port/stream.c 1990/0321
## diff -e /n/bootesdump/1990/0312/sys/src/9/mips/stream.c /n/bootesdump/1990/0321/sys/src/9/mips/stream.c
743c
if(delim)
wakeup(&q->r);
.
740a
delim = 1;
}
.
739c
if(q->len >= Streamhi){
.
736a
delim |= bp->flags & S_DELIM;
.
733a
delim = bp->flags & S_DELIM;
.
727a
delim = 0;
.
726a
delim = 1;
.
720c
int delim;
.
173c
if(bcp->r.p)
wakeup(&bcp->r);
.
92d
55,56c
{ 68 },
{ 260 },
.
34a
for(i=0; i<Nlds && lds[i]; i++)
if(lds[i] == qi)
return;
if(i == Nlds)
panic("pushable");
lds[i] = qi;
}
.
31,33c
void
newqinfo(Qinfo *qi)
{
int i;
.
21,29c
static Qinfo *lds[Nlds+1];
.
18,19d
9a
enum {
Nclass=4, /* number of block classes */
Nlds=32, /* max number of pushable line disciplines */
};
.
## diffname port/stream.c 1990/0322
## diff -e /n/bootesdump/1990/0321/sys/src/9/mips/stream.c /n/bootesdump/1990/0322/sys/src/9/mips/stream.c
127,130c
print("waiting for block %d\n", size);
if(loop++ > 10)
panic("waiting for blocks");
qlock(&bcp->q);
tsleep(&bcp->r, isblock, (void *)bcp, 250);
qunlock(&bcp->q);
.
114d
112c
int loop=0;
.
54c
QLock q; /* qlock for sleepers on r */
.
## diffname port/stream.c 1990/03292
## diff -e /n/bootesdump/1990/0322/sys/src/9/mips/stream.c /n/bootesdump/1990/03292/sys/src/9/mips/stream.c
685c
if(!c->stream)
.
631d
131c
qunlock(bcp);
.
128,129c
panic("waiting for blocks\n");
qlock(bcp);
.
126d
54c
QLock; /* qlock for sleepers on r */
.
## diffname port/stream.c 1990/0331
## diff -e /n/bootesdump/1990/03292/sys/src/9/mips/stream.c /n/bootesdump/1990/0331/sys/src/9/mips/stream.c
828,829c
if(q->flag & QHUNGUP){
if(s->hread++ < 3)
break;
else
error(0, Ehungup);
}
.
616a
s->hread = 0;
.
127a
}
.
126c
if(loop++ > 10){
dumpqueues();
dumpstack();
.
64a
* Dump all block information of how many blocks are in which queues
*/
void
dumpqueues(void)
{
Queue *q;
int count;
Block *bp;
for(q = qlist; q < qlist + conf.nqueue; q++, q++){
if(!(q->flag & QINUSE))
continue;
for(count = 0, bp = q->first; bp; bp = bp->next)
count++;
print("%s %ux RD count %d len %d", q->info->name, q, count, q->len);
for(count = 0, bp = WR(q)->first; bp; bp = bp->next)
count++;
print(" WR count %d len %d\n", count, WR(q)->len);
}
}
/*
.
## diffname port/stream.c 1990/0403
## diff -e /n/bootesdump/1990/0331/sys/src/9/mips/stream.c /n/bootesdump/1990/0403/sys/src/9/mips/stream.c
1012c
FLOWCTL(q);
.
999c
FLOWCTL(q);
.
946,947c
sleep(&q->r, notfull, q->next);
.
941c
return !QFULL(q->next);
.
772c
if(q->len >= Streamhi || q->nb >= Streambhi){
.
768a
q->nb++;
.
764a
q->nb++;
.
390c
q->nb--;
if((q->flag&QHIWAT) && q->len < Streamhi/2 && q->nb < Streambhi){
.
371a
q->nb++;
.
333c
if(q->len >= Streamhi || q->nb >= Streambhi)
.
329a
q->nb++;
.
325a
q->nb++;
.
238a
wq->len = wq->nb = 0;
.
232a
q->len = q->nb = 0;
.
150,151c
print("waiting for blocks\n");
.
148c
if(loop++ == 10){
.
61c
{ 1024 },
.
## diffname port/stream.c 1990/0406
## diff -e /n/bootesdump/1990/0403/sys/src/9/mips/stream.c /n/bootesdump/1990/0406/sys/src/9/mips/stream.c
945,948c
return !QFULL((Queue *)arg);
.
395c
if((q->flag&QHIWAT) && q->len<Streamhi/2 && q->nb<Streambhi/2){
.
## diffname port/stream.c 1990/0409
## diff -e /n/bootesdump/1990/0406/sys/src/9/mips/stream.c /n/bootesdump/1990/0409/sys/src/9/mips/stream.c
61c
{ 4096 },
.
## diffname port/stream.c 1990/0509
## diff -e /n/bootesdump/1990/0409/sys/src/9/mips/stream.c /n/bootesdump/1990/0509/sys/src/9/mips/stream.c
83a
print("\n");
for(bcp=bclass; bcp<&bclass[Nclass-1]; bcp++){
lock(bcp);
for(count = 0, bp = bcp->first; bp; count++, bp = bp->next)
;
unlock(bcp);
print("%d blocks of size %d\n", count, bcp->size);
}
print("\n");
.
73a
print("\n");
.
72a
Bclass *bcp;
.
## diffname port/stream.c 1990/0511
## diff -e /n/bootesdump/1990/0509/sys/src/9/mips/stream.c /n/bootesdump/1990/0511/sys/src/9/mips/stream.c
87c
for(bcp=bclass; bcp<&bclass[Nclass]; bcp++){
.
84c
print(" W c %d l %d f %ux\n", count, WR(q)->len, WR(q)->flag);
.
81c
print("%s %ux R c %d l %d f %ux", q->info->name, q, count,
q->len, q->flag);
.
## diffname port/stream.c 1990/0513
## diff -e /n/bootesdump/1990/0511/sys/src/9/mips/stream.c /n/bootesdump/1990/0513/sys/src/9/mips/stream.c
194a
if((bp->flags&S_CLASS) >= Nclass)
panic("freeb class");
.
## diffname port/stream.c 1990/0629
## diff -e /n/bootesdump/1990/0513/sys/src/9/mips/stream.c /n/bootesdump/1990/0629/sys/src/9/mips/stream.c
744,758c
streamexit(s, 1);
.
742c
* leave it and free it
.
739a
s->opens--;
.
735,738c
if(s->opens == 1){
/*
* descend the stream closing the queues
*/
for(q = s->procq; q; q = q->next){
if(q->info->close)
(*q->info->close)(q->other);
/* this may be 2 streams joined device end to device end */
if(q == s->devq->other)
break;
}
/*
* ascend the stream flushing the queues
*/
for(q = s->devq; q; q = nq){
nq = q->next;
flushq(q);
}
.
732c
* decrement the reference count
.
714a
* Enter a stream. Increment the reference count so it can't disappear
* under foot.
*/
int
streamenter(Stream *s)
{
	int rv;

	lock(s);
	if(s->opens != 0){
		/* still open: take a reference so it cannot be freed under us */
		s->inuse++;
		rv = 0;
	} else
		rv = -1;
	unlock(s);
	return rv;
}
/*
* Decrement the reference count on a stream. If the count is
* zero, free the stream.
*/
void
streamexit(Stream *s, int locked)
{
	Queue *q, *up;

	/* the caller says whether it already holds the stream's lock */
	if(!locked)
		lock(s);

	if(s->inuse == 1){
		/*
		 *  last reference: ascend the stream freeing the queues.
		 *  next is saved first since freeq releases the pair.
		 */
		q = s->devq;
		while(q != 0){
			up = q->next;
			freeq(q);
			q = up;
		}
		s->id = s->dev = s->type = 0;
	}
	s->inuse--;

	if(!locked)
		unlock(s);
}
/*
.
699a
s->opens++;
.
662d
660a
s->opens = 1;
.
518,519c
if(bp->type == M_HANGUP)
freeb(bp);
else {
freeb(bp);
error(0, Ehungup);
}
.
260a
* flush a queue
*/
static void
flushq(Queue *q)
{
	Block *bp;
	Queue *rq, *wq;

	/* throw away everything queued on the read side ... */
	rq = RD(q);
	for(bp = getq(rq); bp != 0; bp = getq(rq))
		freeb(bp);

	/* ... and on the write side; the pair itself stays allocated */
	wq = WR(rq);
	for(bp = getq(wq); bp != 0; bp = getq(wq))
		freeb(bp);
}
/*
.
## diffname port/stream.c 1990/0702
## diff -e /n/bootesdump/1990/0629/sys/src/9/mips/stream.c /n/bootesdump/1990/0702/sys/src/9/mips/stream.c
248a
wq->flag = QINUSE;
.
## diffname port/stream.c 1990/0707
## diff -e /n/bootesdump/1990/0702/sys/src/9/mips/stream.c /n/bootesdump/1990/0707/sys/src/9/mips/stream.c
79,85c
print("%s %ux R n %d l %d f %ux r %ux", q->info->name, q, q->nb,
q->len, q->flag, &(q->r));
print(" W n %d l %d f %ux r %ux\n", WR(q)->nb, WR(q)->len, WR(q)->flag,
&(WR(q)->r));
dumpblocks(q, 'R');
dumpblocks(WR(q), 'W');
.
67a
dumpblocks(Queue *q, char c)
{
Block *bp;
uchar *cp;
lock(q);
for(bp = q->first; bp; bp = bp->next){
print("%c%d%c", c, bp->wptr-bp->rptr, (bp->flags&S_DELIM)?'D':' ');
for(cp = bp->rptr; cp<bp->wptr && cp<bp->rptr+10; cp++)
print(" %uo", *cp);
print("\n");
}
unlock(q);
}
void
.
## diffname port/stream.c 1990/0801
## diff -e /n/bootesdump/1990/0707/sys/src/9/mips/stream.c /n/bootesdump/1990/0801/sys/src/9/mips/stream.c
1157a
}
/*
* stat a stream. the length is the number of bytes up to the
* first delimiter.
*/
void
streamstat(Chan *c, char *db, char *name)
{
Dir dir;
Stream *s;
Queue *q;
Block *bp;
long n;
s = c->stream;
if(s == 0)
panic("streamstat");
q = RD(s->procq);
lock(q);
for(n=0, bp=q->first; bp; bp = bp->next){
n += BLEN(bp);
if(bp->flags&S_DELIM)
break;
}
unlock(q);
devdir(c, c->qid, name, n, 0, &dir);
convD2M(&dir, db);
.
75c
print("%c%d%c", c, bp->wptr-bp->rptr, (bp->flags&S_DELIM));
.
8a
#include "fcall.h"
.
## diffname port/stream.c 1990/08272
## diff -e /n/bootesdump/1990/0801/sys/src/9/mips/stream.c /n/bootesdump/1990/08272/sys/src/9/mips/stream.c
137c
bp->base = (uchar *)ialloc(bcp->size, i == 0);
.
135a
/*
* The i == 0 means that each allocation range
* starts on a page boundary. This makes sure
* no block crosses a page boundary.
*/
.
60,61c
{ 64 },
{ 256 },
.
50a
*
* NOTE: to help the mappings on the IO2 and IO3 boards, the data pointed
* to by a block must not cross a 4k boundary. Therefore:
* 1) all the following block sizes divide evenly into 4k
* 2) all the blocks are ialloc'd to not cross 4k boundaries
.
## diffname port/stream.c 1990/0905
## diff -e /n/bootesdump/1990/08272/sys/src/9/mips/stream.c /n/bootesdump/1990/0905/sys/src/9/mips/stream.c
1199a
/*
* announce a line discipline that can be pushed
*/
void
newqinfo(Qinfo *qi)
{
int i;
for(i=0; i<Nlds && lds[i]; i++)
if(lds[i] == qi)
return;
if(i == Nlds)
panic("pushable");
lds[i] = qi;
}
.
629,636d
482,529d
146,147c
if(bcp->size){
if(bcp->size > left){
left = bcp->size>4096 ? bcp->size : 4096;
ptr = (uchar *)ialloc(left, 1);
}
bp->base = ptr;
ptr += bcp->size;
left -= bcp->size;
}
.
135a
left = 0;
.
129a
uchar *ptr;
.
127c
int class, i, n, left;
.
122a
*
* All data areas are aligned to their size.
*
* No data area crosses a 4k boundary. This allows us to use the
* VME/SCSI/LANCE to MP bus maps on the SGI power series machines.
.
70a
* the per stream directory structure
*/
Dirtab streamdir[]={
"data", Sdataqid, 0, 0600,
"ctl", Sctlqid, 0, 0600,
};
/*
.
27,39d
## diffname port/stream.c 1990/09051
## diff -e /n/bootesdump/1990/0905/sys/src/9/mips/stream.c /n/bootesdump/1990/09051/sys/src/9/mips/stream.c
53a
{ 2048 },
.
12c
Nclass=5, /* number of block classes */
.
## diffname port/stream.c 1990/0907
## diff -e /n/bootesdump/1990/09051/sys/src/9/mips/stream.c /n/bootesdump/1990/0907/sys/src/9/mips/stream.c
1154,1170d
590a
* the per stream directory structure
*/
Dirtab streamdir[]={
"data", Sdataqid, 0, 0600,
"ctl", Sctlqid, 0, 0600,
};
/*
.
491a
* make sure the first block has n bytes
*/
/*
 *  Arrange for the first block of the list bp to hold at least n bytes
 *  of data, copying from (and freeing) the blocks that follow as
 *  needed.  Returns the head of the resulting list, which may be a
 *  newly allocated block.  If the whole list holds fewer than n bytes
 *  it is freed and 0 is returned.
 */
Block *
pullup(Block *bp, int n)
{
	Block *nbp;
	int i;

	/*
	 *  this should almost always be true, the rest is
	 *  just to avoid every caller checking.
	 */
	if(BLEN(bp) >= n)
		return bp;

	/*
	 *  if not enough room in the first block,
	 *  add another to the front of the list.
	 */
	if(bp->lim - bp->rptr < n){
		nbp = allocb(n);
		nbp->next = bp;
		bp = nbp;
	}

	/*
	 *  copy bytes from the trailing blocks into the first.
	 *  n is the number of bytes still missing.
	 */
	n -= BLEN(bp);
	while(nbp = bp->next){
		i = BLEN(nbp);
		if(i > n) {
			/* this block has more than enough; take only what is missing */
			memcpy(bp->wptr, nbp->rptr, n);
			bp->wptr += n;
			nbp->rptr += n;
			return bp;
		}
		/* take the whole block and drop it from the list */
		memcpy(bp->wptr, nbp->rptr, i);
		bp->wptr += i;
		bp->next = nbp->next;
		nbp->next = 0;
		freeb(nbp);
		n -= i;
		if(n == 0)
			return bp;
	}

	/* the list ran out before n bytes were collected */
	freeb(bp);
	return 0;
}
/*
.
149,157c
if(bcp->size)
bp->base = (uchar *)ialloc(bcp->size, i == 0);
.
138d
131d
128c
int class, i, n;
.
119,123d
59,66d
54d
26a
void
newqinfo(Qinfo *qi)
{
int i;
for(i=0; i<Nlds && lds[i]; i++)
if(lds[i] == qi)
return;
if(i == Nlds)
panic("pushable");
lds[i] = qi;
}
.
12c
Nclass=4, /* number of block classes */
.
## diffname port/stream.c 1990/0911
## diff -e /n/bootesdump/1990/0907/sys/src/9/mips/stream.c /n/bootesdump/1990/0911/sys/src/9/mips/stream.c
1198a
}
/*
* Dump all block information of how many blocks are in which queues
*/
void
dumpblocks(Queue *q, char c)
{
Block *bp;
uchar *cp;
lock(q);
for(bp = q->first; bp; bp = bp->next){
print("%c%d%c", c, bp->wptr-bp->rptr, (bp->flags&S_DELIM));
for(cp = bp->rptr; cp<bp->wptr && cp<bp->rptr+10; cp++)
print(" %uo", *cp);
print("\n");
}
unlock(q);
}
void
dumpqueues(void)
{
Queue *q;
int count;
Block *bp;
Bclass *bcp;
print("\n");
for(q = qlist; q < qlist + conf.nqueue; q++, q++){
if(!(q->flag & QINUSE))
continue;
print("%s %ux R n %d l %d f %ux r %ux", q->info->name, q, q->nb,
q->len, q->flag, &(q->r));
print(" W n %d l %d f %ux r %ux\n", WR(q)->nb, WR(q)->len, WR(q)->flag,
&(WR(q)->r));
dumpblocks(q, 'R');
dumpblocks(WR(q), 'W');
}
print("\n");
for(bcp=bclass; bcp<&bclass[Nclass]; bcp++){
lock(bcp);
for(count = 0, bp = bcp->first; bp; count++, bp = bp->next)
;
unlock(bcp);
print("%d blocks of size %d\n", count, bcp->size);
}
print("\n");
.
1142,1143c
out:
/* qunlock(&s->wrlock);
poperror(); /**/
.
1091,1093c
goto out;
.
1081a
*/
.
1075,1076d
1072a
s = c->stream;
.
579,581c
for(qi = lds; qi; qi = qi->next)
if(strcmp(qi->name, name)==0)
return qi;
.
575c
Qinfo *qi;
.
244a
* pad a block to the front with n bytes
*/
Block *
padb(Block *bp, int n)
{
	Block *nbp;

	/* enough unused space in front of the data: just back rptr up */
	if(bp->base != 0 && bp->rptr - bp->base >= n){
		bp->rptr -= n;
		return bp;
	}

	/* otherwise put a new block, with the n bytes at its end, in front */
	nbp = allocb(n);
	nbp->next = bp;
	nbp->wptr = nbp->lim;
	nbp->rptr = nbp->wptr - n;
	return nbp;
}
/*
.
186,189d
172d
156a
* make known a stream module and call its initialization routine, if
* it has one.
*/
void
newqinfo(Qinfo *qi)
{
qi->next = lds;
lds = qi;
if(qi->reset)
(*qi->reset)();
}
/*
.
153a
/*
* make stream modules available
*/
streaminit0();
.
147c
bp->base = (uchar *)ialloc(bcp->size, 0);
.
141,145d
130a
/*
* allocate blocks, queues, and streams
*/
.
79,118d
70,77c
#include "stream.h"
.
65,66c
{ 68 },
{ 260 },
.
51,55d
27,39d
25c
static Qinfo *lds;
.
13d
## diffname port/stream.c 1990/0914
## diff -e /n/bootesdump/1990/0911/sys/src/9/mips/stream.c /n/bootesdump/1990/0914/sys/src/9/mips/stream.c
1160d
1151,1158c
n = 0;
else {
q = RD(s->procq);
lock(q);
for(n=0, bp=q->first; bp; bp = bp->next){
n += BLEN(bp);
if(bp->flags&S_DELIM)
break;
}
unlock(q);
.
## diffname port/stream.c 1990/0930
## diff -e /n/bootesdump/1990/0914/sys/src/9/mips/stream.c /n/bootesdump/1990/0930/sys/src/9/mips/stream.c
1106,1108d
1069c
if(!docopy && GLOBAL(a)){
.
1052,1060c
if(STREAMTYPE(c->qid) != Sdataqid)
return streamctlwrite(c, a, n);
.
1041,1049d
981a
if(STREAMTYPE(c->qid) != Sctlqid)
panic("streamctlwrite %lux", c->qid);
s = c->stream;
.
980a
Stream *s;
.
977c
streamctlwrite(Chan *c, void *a, long n)
.
919a
s = c->stream;
.
903,915c
if(STREAMTYPE(c->qid) != Sdataqid)
return streamctlread(c, vbuf, n);
.
901d
898,899c
int left, i;
.
886c
Queue *q;
q = (Queue *)x;
return (q->flag&QHUNGUP) || q->first!=0;
.
880a
* return the stream id
*/
long
streamctlread(Chan *c, void *vbuf, long n)
{
uchar *buf = vbuf;
char num[32];
Stream *s;
s = c->stream;
if(STREAMTYPE(c->qid) == Sctlqid){
sprint(num, "%d", s->id);
return stringread(c, buf, n, num);
} else {
if(CHDIR & c->qid)
return devdirread(c, vbuf, n, 0, 0, streamgen);
else
panic("streamctlread");
}
}
/*
.
130c
* look for a free block
.
## diffname port/stream.c 1990/1009
## diff -e /n/bootesdump/1990/0930/sys/src/9/mips/stream.c /n/bootesdump/1990/1009/sys/src/9/mips/stream.c
1177c
print("%c%d%c", c, bp->wptr-bp->rptr, (bp->flags&S_DELIM)?'D':' ');
.
819a
void
streamclose(Chan *c)
{
/*
* if no stream, ignore it
*/
if(!c->stream)
return;
streamclose1(c->stream);
}
.
783,788d
780d
776c
streamclose1(Stream *s)
.
724c
c->stream = streamnew(c->type, c->dev, STREAMID(c->qid), qi, 0);
.
702c
* if the stream already exists, just increment the reference counts.
.
673c
if(noopen)
s->opens = 0;
else
s->opens = 1;
.
661,667c
s->type = type;
s->dev = dev;
s->id = id;
.
659c
* identify the stream
.
654c
streamclose1(s);
.
631c
streamnew(ushort type, ushort dev, ushort id, Qinfo *qi, int noopen)
.
628c
* create a new stream, if noopen is non-zero, don't increment the open count
.
244a
wq->ptr = 0;
.
237a
q->ptr = 0;
.
## diffname port/stream.c 1990/1011
## diff -e /n/bootesdump/1990/1009/sys/src/9/mips/stream.c /n/bootesdump/1990/1011/sys/src/9/mips/stream.c
787,795c
if(!waserror()){
/*
* descend the stream closing the queues
*/
for(q = s->procq; q; q = q->next){
if(q->info->close)
(*q->info->close)(q->other);
/*
* this may be 2 streams joined device end to device end
*/
if(q == s->devq->other)
break;
}
poperror();
.
## diffname port/stream.c 1990/1018
## diff -e /n/bootesdump/1990/1011/sys/src/9/mips/stream.c /n/bootesdump/1990/1018/sys/src/9/mips/stream.c
1050a
qunlock(&q->rlock);
.
1049a
qlock(&q->rlock);
.
## diffname port/stream.c 1990/1101
## diff -e /n/bootesdump/1990/1018/sys/src/9/mips/stream.c /n/bootesdump/1990/1101/sys/src/9/mips/stream.c
793a
WR(q)->put = nullput;
.
769a
return rv;
.
767a
rv = s->inuse;
.
753a
int rv;
.
749c
int
.
## diffname port/stream.c 1990/1104
## diff -e /n/bootesdump/1990/1101/sys/src/9/mips/stream.c /n/bootesdump/1990/1104/sys/src/9/mips/stream.c
758a
if(s->opens != 0)
print("streamexit %d %s\n", s->opens, s->devq->info->name);
.
754a
char *name;
.
## diffname port/stream.c 1990/1113
## diff -e /n/bootesdump/1990/1104/sys/src/9/mips/stream.c /n/bootesdump/1990/1113/sys/src/9/mips/stream.c
1061a
poperror();
.
1059a
if(waserror()){
qunlock(&q->rlock);
nexterror();
}
.
137a
poperror();
.
135a
if(waserror()){
qunlock(bcp);
nexterror();
}
.
## diffname port/stream.c 1990/11151
## diff -e /n/bootesdump/1990/1113/sys/src/9/mips/stream.c /n/bootesdump/1990/11151/sys/src/9/mips/stream.c
766c
panic("streamexit %d %s\n", s->opens, s->devq->info->name);
.
19c
Qinfo procinfo =
{
stputq,
nullput,
0,
0,
"process"
};
.
## diffname port/stream.c 1990/11161
## diff -e /n/bootesdump/1990/11151/sys/src/9/mips/stream.c /n/bootesdump/1990/11161/sys/src/9/mips/stream.c
838c
qunlock(s);
.
821c
WR(q)->put = nullput;
/*
* this may be 2 streams joined device end to device end
*/
if(q == s->devq->other)
break;
.
813,819c
poperror();
.
806,810c
/*
* descend the stream closing the queues
*/
for(q = s->procq; q; q = q->next){
if(!waserror()){
.
804c
qlock(s);
.
787c
qunlock(s);
.
770c
qlock(s);
.
753c
qunlock(s);
.
749c
qunlock(s);
.
747c
qlock(s);
.
730c
qunlock(s);
.
727c
qunlock(s);
.
720c
qlock(s);
.
700c
qunlock(s);
.
667c
qunlock(s);
.
658c
qunlock(s);
.
655c
if(canqlock(s)){
.
## diffname port/stream.c 1990/11211
## diff -e /n/bootesdump/1990/11161/sys/src/9/mips/stream.c /n/bootesdump/1990/11211/sys/src/9/mips/stream.c
1106c
error(Ehungup);
.
1098c
if(STREAMTYPE(c->qid.path) != Sdataqid)
.
1026c
if(STREAMTYPE(c->qid.path) != Sctlqid)
.
981c
error(Ehungup);
.
956c
if(STREAMTYPE(c->qid.path) != Sdataqid)
.
925c
if(CHDIR & c->qid.path)
.
921c
if(STREAMTYPE(c->qid.path) == Sctlqid){
.
847,848c
return 1;
return streamclose1(c->stream);
.
840c
int
.
838a
return rv;
.
832c
rv = --(s->opens);
.
799a
int rv;
.
795c
int
.
737c
c->stream = streamnew(c->type, c->dev, STREAMID(c->qid.path), qi, 0);
.
723c
&& s->id == STREAMID(c->qid.path)){
.
719c
&& s->id == STREAMID(c->qid.path)){
.
664c
error(Enostream);
.
636c
devdir(c, (Qid){STREAMQID(STREAMID(c->qid.path),tab->qid.path), 0}, tab->name, tab->length,
.
611,612c
"data", {Sdataqid}, 0, 0600,
"ctl", {Sctlqid}, 0, 0600,
.
561c
error(Ebadld);
.
557c
error(Ebadld);
.
544c
error(Ehungup);
.
338c
error(Ebadld);
.
242c
error(Enoqueue);
.
## diffname port/stream.c 1990/1127
## diff -e /n/bootesdump/1990/11211/sys/src/9/mips/stream.c /n/bootesdump/1990/1127/sys/src/9/mips/stream.c
849c
return;
.
140a
if(newblock(bcp) == 0)
continue;
.
113a
* upgrade a block 0 block to another class (called with bcp qlocked)
*/
newblock(Bclass *bcp)	/* add one page worth of blocks to the free list of class bcp */
{
Page *page;
int n;
Block *bp;
uchar *cp;
if(bcp->made > bcp->lim)	/* class is already over its limit: make nothing */
return;
if(bcp == bclass){	/* bclass itself is class 0 */
/*
* create some level zero blocks and return
*/
page = newpage(1, 0, 0);
page->va = VA(kmap(page));
n = BY2PG/sizeof(Block);	/* Block headers that fit in one page */
bp = (Block *)(page->va);
while(n-- > 0){
bp->flags = 0;
bp->base = bp->lim = bp->rptr = bp->wptr = 0;	/* no data area */
if(bcp->first)	/* append bp to the tail of the class free list */
bcp->last->next = bp;
else
bcp->first = bp;
bcp->last = bp;
bcp->made++;
bp++;
}
} else {
/*
* create a page worth of new blocks
*/
page = newpage(1, 0, 0);
page->va = VA(kmap(page));
n = BY2PG/bcp->size;	/* data areas of this class that fit in one page */
cp = (uchar *)(page->va);
while(n-- > 0){
/*
* upgrade a level 0 block
*/
bp = allocb(0);	/* the header comes from class 0 */
qlock(bclass);
bclass->made--;	/* move the header's count from class 0 ... */
bcp->made++;	/* ... to this class */
bp->flags = bcp - bclass;	/* flags now holds the index of the new class */
qunlock(bclass);
/*
* tack on the data area
*/
bp->base = bp->rptr = bp->wptr = cp;
cp += bcp->size;
bp->lim = cp;
if(bcp->first)	/* append bp to the tail of the class free list */
bcp->last->next = bp;
else
bcp->first = bp;
bcp->last = bp;
}
}
return;
}
/*
.
84,91c
bcp->lim = n;
bcp->made = 0;
.
77,78c
/*
* set limits on blocks
*/
.
73c
* allocate queues, streams
.
69d
46a
int lim;
int made;
.
38d
## diffname port/stream.c 1990/1202
## diff -e /n/bootesdump/1990/1127/sys/src/9/mips/stream.c /n/bootesdump/1990/1202/sys/src/9/mips/stream.c
1117a
freeb(bp);
} else if(streamparse("look", bp)){
qlook(s, (char *)bp->rptr);
.
1083a
* look ldname -- look for a line discipline
.
1078a
* look for an instance of the line discipline `name' on
* the stream `s'
*/
void
qlook(Stream *s, char *name)	/* returns quietly if found, else raises "not found" */
{
Queue *q;
for(q = s->procq; q; q = q->next){	/* walk from the process end */
if(strcmp(q->info->name, name) == 0)
return;
/*
* this may be 2 streams joined device end to device end
*/
if(q == s->devq->other)	/* stop at this stream's own device end */
break;
}
errors("not found");
}
/*
.
## diffname port/stream.c 1990/1212
## diff -e /n/bootesdump/1990/1210/sys/src/9/mips/stream.c /n/bootesdump/1990/1212/sys/src/9/port/stream.c
1170c
q->rp = &q->r;
sleep(q->rp, notfull, q->next);
.
1051c
q->rp = &q->r;
sleep(q->rp, &isinput, (void *)q);
.
957c
wakeup(q->rp);
.
931c
wakeup(q->other->rp);
.
498c
wakeup(q->other->next->other->rp);
.
325a
wq->rp = &wq->r;
.
316a
q->rp = &q->r;
.
## diffname port/stream.c 1990/1214
## diff -e /n/bootesdump/1990/1212/sys/src/9/port/stream.c /n/bootesdump/1990/1214/sys/src/9/port/stream.c
936d
917c
return 0;
.
450d
424d
174c
return 0;
.
120c
return -1;
.
111a
int
.
## diffname port/stream.c 1990/1219
## diff -e /n/bootesdump/1990/1214/sys/src/9/port/stream.c /n/bootesdump/1990/1219/sys/src/9/port/stream.c
216a
splhi();
.
## diffname port/stream.c 1990/1229
## diff -e /n/bootesdump/1990/1219/sys/src/9/port/stream.c /n/bootesdump/1990/1229/sys/src/9/port/stream.c
244a
int x;
.
217d
## diffname port/stream.c 1991/0115
## diff -e /n/bootesdump/1990/1229/sys/src/9/port/stream.c /n/bootesdump/1991/0115/sys/src/9/port/stream.c
59,60d
## diffname port/stream.c 1991/0316
## diff -e /n/bootesdump/1991/0115/sys/src/9/port/stream.c /n/bootesdump/1991/0316/sys/src/9/port/stream.c
598a
.
444a
blen(Block *bp)
{
int len;
len = 0;
while(bp) {
len += BLEN(bp);
bp = bp->next;
}
return len;
}
/*
* bround - round a block chain up to some 2^n number of bytes
*/
int
bround(Block *bp, int amount)	/* returns the padded length of the chain */
{
Block *last;
int len, pad;
len = 0;
SET(last);
while(bp) {	/* total the chain and remember its last block */
len += BLEN(bp);
last = bp;
bp = bp->next;
}
pad = ((len + amount) & ~amount) - len;	/* amount is used as a mask; presumably 2^n-1 -- confirm */
if(pad) {
last->next = allocb(pad);
memset(last->next->rptr, 0, pad);	/* NOTE(review): wptr of the pad block is not advanced here */
last->next->flags |= S_DELIM;	/* the delimiter moves to the pad block */
last->flags &= ~S_DELIM;
}
return len + pad;
}
int
.
443a
.
## diffname port/stream.c 1991/0317
## diff -e /n/bootesdump/1991/0316/sys/src/9/port/stream.c /n/bootesdump/1991/0317/sys/src/9/port/stream.c
1380c
print("%d queues\n", qcount);
.
1372a
qcount++;
.
1369a
qcount = 0;
.
1365c
int count, qcount;
.
481a
last = last->next;
memset(last->wptr, 0, pad);
last->wptr += pad;
last->flags |= S_DELIM;
.
479,480d
469c
SET(last); /* Ken's magic */
.
460c
* bround - round a block chain to some 2^n number of bytes
.
## diffname port/stream.c 1991/0318
## diff -e /n/bootesdump/1991/0317/sys/src/9/port/stream.c /n/bootesdump/1991/0318/sys/src/9/port/stream.c
1280c
memmove(bp->wptr, a, i);
.
1273c
memmove(bp->wptr, a, rem);
.
1173c
memmove(bp->wptr, a, n);
.
1114c
memmove(buf, bp->rptr, left);
.
1105c
memmove(buf, bp->rptr, i);
.
1021c
memmove(buf, str + c->offset, n);
.
610c
memmove(bp->wptr, nbp->rptr, i);
.
605c
memmove(bp->wptr, nbp->rptr, n);
.
## diffname port/stream.c 1991/0320
## diff -e /n/bootesdump/1991/0318/sys/src/9/port/stream.c /n/bootesdump/1991/0320/sys/src/9/port/stream.c
1381,1382c
print(" W n %d l %d f %ux r %ux next %lux put %lux Rz %lux\n",
WR(q)->nb, WR(q)->len,
WR(q)->flag, &(WR(q)->r), q->next, q->put, q->rp);
.
## diffname port/stream.c 1991/0323
## diff -e /n/bootesdump/1991/0320/sys/src/9/port/stream.c /n/bootesdump/1991/0323/sys/src/9/port/stream.c
1379,1380c
print("%10s %ux R n %d l %d f %ux r %ux", q->info->name, q,
q->nb, q->len, q->flag, &(q->r));
.
## diffname port/stream.c 1991/0328
## diff -e /n/bootesdump/1991/0323/sys/src/9/port/stream.c /n/bootesdump/1991/0328/sys/src/9/port/stream.c
261,264d
246,259c
for(; bp; bp = nbp){
bcp = &bclass[bp->flags & S_CLASS];
lock(bcp);
bp->rptr = bp->wptr = 0;
if(bcp->first)
bcp->last->next = bp;
else
bcp->first = bp;
bcp->last = bp;
nbp = bp->next;
bp->next = 0;
unlock(bcp);
if(bcp->r.p)
wakeup(&bcp->r);
.
241d
239a
Block *nbp;
.
229a
if(bp->lim-bp->rptr<size && size<4096)
panic("allocb %lux %lux %d %ux %d", bp->lim, bp->rptr,
size, bp->flags, bcp-bclass);
.
## diffname port/stream.c 1991/0401
## diff -e /n/bootesdump/1991/0328/sys/src/9/port/stream.c /n/bootesdump/1991/0401/sys/src/9/port/stream.c
714a
while(*bp->rptr==' ' && bp->wptr>bp->rptr)
bp->rptr++;
.
## diffname port/stream.c 1991/0404
## diff -e /n/bootesdump/1991/0401/sys/src/9/port/stream.c /n/bootesdump/1991/0404/sys/src/9/port/stream.c
1252c
if(!docopy && isphys(a)){
.
## diffname port/stream.c 1991/0411
## diff -e /n/bootesdump/1991/0404/sys/src/9/port/stream.c /n/bootesdump/1991/0411/sys/src/9/port/stream.c
1249,1250c
if(q->other->flag & QHUNGUP){
if(s->err)
errors((char*)(s->err->rptr));
else
error(Ehungup);
}
.
1094c
if(s->err)
errors((char*)s->err->rptr);
else if(s->hread++<3)
.
1039c
return stringread(c, buf, n, num, c->offset);
.
1022c
memmove(buf, str + offset, n);
.
1017c
i -= offset;
.
1012c
stringread(Chan *c, uchar *buf, long n, char *str, ulong offset)
.
977c
s = q->ptr;
if(bp->rptr<bp->wptr && s->err==0)
s->err = bp;
else
freeb(bp);
.
974a
Stream *s;
.
897a
if(s->err)
freeb(s->err);
.
803a
WR(q)->ptr = s;
RD(q)->ptr = s;
.
792a
s->err = 0;
.
## diffname port/stream.c 1991/0413
## diff -e /n/bootesdump/1991/0411/sys/src/9/port/stream.c /n/bootesdump/1991/0413/sys/src/9/port/stream.c
959a
if(*err)
errors(err);
.
931c
if(waserror()){
if(*err == 0)
strncpy(err, u->error, ERRLEN-1);
} else {
.
925a
*err = 0;
.
920a
char err[ERRLEN];
.
410a
qunlock(s);
.
403a
if(waserror()){
qunlock(s);
nexterror();
}
qlock(s);
.
388a
qunlock(s);
.
384a
qlock(s);
.
## diffname port/stream.c 1991/0419
## diff -e /n/bootesdump/1991/0413/sys/src/9/port/stream.c /n/bootesdump/1991/0419/sys/src/9/port/stream.c
598a
*/
.
250a
bp->flags = S_CLASS; /* Check for doulbe free */
.
225a
bp->flags = bcp - bclass;
.
## diffname port/stream.c 1991/0420
## diff -e /n/bootesdump/1991/0419/sys/src/9/port/stream.c /n/bootesdump/1991/0420/sys/src/9/port/stream.c
252c
bp->flags = bp->flags|S_CLASS; /* Check for doulbe free */
.
249a
.
243a
ulong mark[1];
.
## diffname port/stream.c 1991/0421
## diff -e /n/bootesdump/1991/0420/sys/src/9/port/stream.c /n/bootesdump/1991/0421/sys/src/9/port/stream.c
55c
{ 268 },
.
## diffname port/stream.c 1991/0501
## diff -e /n/bootesdump/1991/0421/sys/src/9/port/stream.c /n/bootesdump/1991/0501/sys/src/9/port/stream.c
1152c
}
.
## diffname port/stream.c 1991/0502
## diff -e /n/bootesdump/1991/0501/sys/src/9/port/stream.c /n/bootesdump/1991/0502/sys/src/9/port/stream.c
1324a
poperror();
.
1288c
/*
* if an error occurs during write n,
* force a delim before write n+1
*/
if(waserror()){
s->forcedelim = 1;
nexterror();
}
if(s->forcedelim){
FLOWCTL(q);
bp = allocb(0);
bp->flags |= S_DELIM;
bp->type = M_DATA;
PUTNEXT(q, bp);
s->forcedelim = 0;
}
if(0 && !docopy && isphys(a)){
.
1270a
if(n == 1 && *((char*)a) == 1)
print("u->p->pid %d %s\n", u->p->pid, u->p->text);
.
1012a
if(BLEN(bp) == 1 && *(bp->rptr) == 1)
print("stputq u->p->pid %d %s\n", u->p->pid, u->p->text);
.
## diffname port/stream.c 1991/0504
## diff -e /n/bootesdump/1991/0502/sys/src/9/port/stream.c /n/bootesdump/1991/0504/sys/src/9/port/stream.c
810a
s->forcedelim = 0;
.
## diffname port/stream.c 1991/0507
## diff -e /n/bootesdump/1991/0504/sys/src/9/port/stream.c /n/bootesdump/1991/0507/sys/src/9/port/stream.c
1014,1016d
## diffname port/stream.c 1991/0511
## diff -e /n/bootesdump/1991/0507/sys/src/9/port/stream.c /n/bootesdump/1991/0511/sys/src/9/port/stream.c
1272,1274d
## diffname port/stream.c 1991/0516
## diff -e /n/bootesdump/1991/0511/sys/src/9/port/stream.c /n/bootesdump/1991/0516/sys/src/9/port/stream.c
254c
bp->flags = bp->flags|S_CLASS; /* Check for double free */
.
## diffname port/stream.c 1991/0614
## diff -e /n/bootesdump/1991/0516/sys/src/9/port/stream.c /n/bootesdump/1991/0614/sys/src/9/port/stream.c
421a
poperror();
.
## diffname port/stream.c 1991/0705
## diff -e /n/bootesdump/1991/0614/sys/src/9/port/stream.c /n/bootesdump/1991/0705/sys/src/9/port/stream.c
422d
144,145d
125,126d
120a
page = newpage(1, 0, 0);
page->va = VA(kmap(page));
.
## diffname port/stream.c 1991/0809
## diff -e /n/bootesdump/1991/0705/sys/src/9/port/stream.c /n/bootesdump/1991/0809/sys/src/9/port/stream.c
1131a
continue;
}
if(s->flushmsg){
if(bp->flags & S_DELIM)
s->flushmsg = 0;
freeb(bp);
.
1118d
1109a
/*
* notes will flush the rest of any partially
* read message.
*/
if(n != left)
s->flushmsg = 1;
.
1107a
left = n;
.
825a
s->flushmsg = 0;
.
## diffname port/stream.c 1991/0811
## diff -e /n/bootesdump/1991/0809/sys/src/9/port/stream.c /n/bootesdump/1991/0811/sys/src/9/port/stream.c
1165a
/*
* free completely read blocks
*/
if(tofree)
freeb(tofree);
.
1157,1158d
1154,1155c
bp->next = tofree;
tofree = bp;
if(bp->flags & S_DELIM)
.
1142,1148d
1116,1117c
while(tofree){
bp = tofree;
tofree = bp->next;
bp->next = 0;
putbq(q, bp);
}
.
1113,1114c
* put any partially read message back into the
* queue
.
1110a
tofree = 0;
.
1096a
Block *tofree;
.
## diffname port/stream.c 1991/0831
## diff -e /n/bootesdump/1991/0811/sys/src/9/port/stream.c /n/bootesdump/1991/0831/sys/src/9/port/stream.c
1465c
print("%d byte blocks: %d made %d free\n", bcp->size,
bcp->made, count);
.
213d
210,211c
newblock(bcp);
.
207c
unlock(bcp);
.
202,205d
156c
unlock(bclass);
.
152c
lock(bclass);
.
118,120d
84d
77c
* set block classes
.
49,50d
46d
## diffname port/stream.c 1991/0901
## diff -e /n/bootesdump/1991/0831/sys/src/9/port/stream.c /n/bootesdump/1991/0901/sys/src/9/port/stream.c
250,251d
## diffname port/stream.c 1991/0904
## diff -e /n/bootesdump/1991/0901/sys/src/9/port/stream.c /n/bootesdump/1991/0904/sys/src/9/port/stream.c
1097a
q = 0;
.
## diffname port/stream.c 1991/0926
## diff -e /n/bootesdump/1991/0904/sys/src/9/port/stream.c /n/bootesdump/1991/0926/sys/src/9/port/stream.c
1308,1345c
/*
* send it down stream
*/
last->flags |= S_DELIM;
FLOWCTL(q);
PUTNEXT(q, first);
.
1304,1305c
a = ((char*)a) + i;
if(first == 0)
first = bp;
else
last->next = bp;
last = bp;
if(i == rem)
break;
.
1295,1302c
first = last = 0;
for(rem = n; ; rem -= i) {
bp = allocb(rem);
i = bp->lim - bp->wptr;
if(i >= rem)
i = rem;
memmove(bp->wptr, a, i);
bp->wptr += i;
.
1292,1293c
* copy the whole write into kernel space
.
1270a
Block *bp, *first, *last;
.
1267d
811d
794d
## diffname port/stream.c 1991/1012
## diff -e /n/bootesdump/1991/0926/sys/src/9/port/stream.c /n/bootesdump/1991/1012/sys/src/9/port/stream.c
96a
if(qi->next)
panic("newqinfo: already configured");
.
## diffname port/stream.c 1991/1027
## diff -e /n/bootesdump/1991/1012/sys/src/9/port/stream.c /n/bootesdump/1991/1027/sys/src/9/port/stream.c
1327,1331c
getfields(char *lp, char **fields, int n, char sep)
.
## diffname port/stream.c 1991/1105
## diff -e /n/bootesdump/1991/1027/sys/src/9/port/stream.c /n/bootesdump/1991/1105/sys/src/9/port/stream.c
1193c
* This routine is entered with s->wrlock'ed and must unlock.
.
## diffname port/stream.c 1991/1107
## diff -e /n/bootesdump/1991/1105/sys/src/9/port/stream.c /n/bootesdump/1991/1107/sys/src/9/port/stream.c
1420c
qunlock(bcp);
.
1417c
qlock(bcp);
.
619a
* expand a block list to be one block, len bytes long
*/
Block*
expandb(Block *bp, int len)	/* returns one new block of len bytes; the list bp is freed */
{
Block *nbp, *new;
int i;
new = allocb(len);
if(new == 0){
freeb(bp);
return 0;
}
/*
* copy bytes into new block
*/
for(nbp = bp; len>0 && nbp; nbp = nbp->next){
i = BLEN(nbp);	/* length of the block being copied, not of the list head */
if(i > len) {
memmove(new->wptr, nbp->rptr, len);	/* truncate: only len bytes still fit */
new->wptr += len;
len = 0;	/* block is full: ends the loop and skips the zero fill below */
} else {
memmove(new->wptr, nbp->rptr, i);
new->wptr += i;
len -= i;
}
}
if(len){	/* list was shorter than requested: zero fill the rest */
memset(new->wptr, 0, len);
new->wptr += len;
}
freeb(bp);
return new;
}
/*
.
252c
qunlock(bcp);
.
243c
qlock(bcp);
.
209c
qunlock(bcp);
.
199c
qunlock(bcp);
.
196c
qlock(bcp);
.
152c
qunlock(bclass);
.
148c
qlock(bclass);
.
46a
QLock;
.
## diffname port/stream.c 1991/1109
## diff -e /n/bootesdump/1991/1107/sys/src/9/port/stream.c /n/bootesdump/1991/1109/sys/src/9/port/stream.c
1410c
devdir(c, c->qid, name, n, eve, 0, &dir);
.
790,791c
devdir(c, (Qid){STREAMQID(STREAMID(c->qid.path),tab->qid.path), 0},
tab->name, tab->length, eve, tab->perm, dp);
.
## diffname port/stream.c 1991/1115
## diff -e /n/bootesdump/1991/1109/sys/src/9/port/stream.c /n/bootesdump/1991/1115/sys/src/9/port/stream.c
1095c
return stringread(buf, n, num, c->offset);
.
1068c
stringread(uchar *buf, long n, char *str, ulong offset)
.
691a
USED(q);
.
## diffname port/stream.c 1991/1121
## diff -e /n/bootesdump/1991/1115/sys/src/9/port/stream.c /n/bootesdump/1991/1121/sys/src/9/port/stream.c
653a
new->flags |= delim;
.
639c
delim = nbp->flags & S_DELIM;
i = BLEN(nbp);
.
627a
ulong delim = 0;
.
## diffname port/stream.c 1991/1122
## diff -e /n/bootesdump/1991/1121/sys/src/9/port/stream.c /n/bootesdump/1991/1122/sys/src/9/port/stream.c
217a
bp->list = 0;
.
## diffname port/stream.c 1991/1126
## diff -e /n/bootesdump/1991/1122/sys/src/9/port/stream.c /n/bootesdump/1991/1126/sys/src/9/port/stream.c
543c
if((q->flag&QHIWAT) && q->len<Streamhi/2 && q->nb<Streambhi/2 &&q->other){
.
## diffname port/stream.c 1991/1227
## diff -e /n/bootesdump/1991/1126/sys/src/9/port/stream.c /n/bootesdump/1991/1227/sys/src/9/port/stream.c
1009,1010d
977,980c
if(!waserror()){
.
971d
965d
## diffname port/stream.c 1992/0101
## diff -e /n/bootesdump/1991/1227/sys/src/9/port/stream.c /n/bootesdump/1992/0101/sys/src/9/port/stream.c
119c
page->va = VA(kmapperm(page));
.
## diffname port/stream.c 1992/0111
## diff -e /n/bootesdump/1992/0101/sys/src/9/port/stream.c /n/bootesdump/1992/0111/sys/src/9/port/stream.c
7c
#include "../port/error.h"
.
## diffname port/stream.c 1992/0114
## diff -e /n/bootesdump/1992/0111/sys/src/9/port/stream.c /n/bootesdump/1992/0114/sys/src/9/port/stream.c
1325c
error((char*)(s->err->rptr));
.
1220c
error(Ebadarg);
.
1162c
error((char*)s->err->rptr);
.
823c
exhausted("streams");
.
299c
exhausted("queues");
.
## diffname port/stream.c 1992/0207
## diff -e /n/bootesdump/1992/0114/sys/src/9/port/stream.c /n/bootesdump/1992/0207/sys/src/9/port/stream.c
1169c
sleep(q->rp, isinput, (void *)q);
.
## diffname port/stream.c 1992/0222
## diff -e /n/bootesdump/1992/0207/sys/src/9/port/stream.c /n/bootesdump/1992/0222/sys/src/9/port/stream.c
239,240c
pc = getcallerpc(((uchar*)&bp) - sizeof(bp));
if((bp->flags&S_CLASS) >= Nclass) /* Check for double free */
panic("freeb class last(%lux) this(%lux)", bp->pc, pc);
bp->pc = pc;
.
237a
ulong pc;
.
## diffname port/stream.c 1992/0305
## diff -e /n/bootesdump/1992/0222/sys/src/9/port/stream.c /n/bootesdump/1992/0305/sys/src/9/port/stream.c
1359,1360c
FLOWCTL(q, first);
.
1296,1299c
PUTNEXT(q, bp);
.
1294c
poperror();
.
1291,1292c
if(bp->type != M_HANGUP){
qlock(&q->rlock);
if(waserror()){
qunlock(&q->rlock);
freeb(bp);
nexterror();
}
q->rp = &q->r;
sleep(q->rp, notfull, q->next);
.
1289c
flowctl(Queue *q, Block *bp)
.
## diffname port/stream.c 1992/0318
## diff -e /n/bootesdump/1992/0305/sys/src/9/port/stream.c /n/bootesdump/1992/0318/sys/src/9/port/stream.c
1415a
}
Block *
copyb(Block *bp, int count)
{
Block *nb, *head, **p;
int l;
p = &head;
while(count) {
l = BLEN(bp);
if(count < l)
l = count;
nb = allocb(l);
if(nb == 0)
panic("copyb.1");
memmove(nb->wptr, bp->rptr, l);
nb->wptr += l;
count -= l;
if(bp->flags & S_DELIM)
nb->flags |= S_DELIM;
*p = nb;
p = &nb->next;
bp = bp->next;
if(bp == 0)
break;
}
if(count) {
nb = allocb(count);
if(nb == 0)
panic("copyb.2");
memset(nb->wptr, 0, count);
nb->wptr += count;
nb->flags |= S_DELIM;
*p = nb;
}
if(blen(head) == 0)
print("copyb: zero length\n");
return head;
.
1262a
if(qi == 0)
error(Ebadld);
.
721c
return 0;
.
717c
return 0;
.
711c
Qinfo *
.
## diffname port/stream.c 1992/0319
## diff -e /n/bootesdump/1992/0318/sys/src/9/port/stream.c /n/bootesdump/1992/0319/sys/src/9/port/stream.c
1510a
for(i = 0; i < 100; i++) {
if(refsa[i] != refsf[i])
print("%d alloc %lux %d free %lux\n", refsa[i], apcs[i], refsf[i], fpcs[i]);
}
.
1497a
if(q->pg)
print("get %d %s ", q->pg->pid, q->pg->text);
if(q->pp)
print("put %d %s ", q->pp->pid, q->pp->text);
print("\n");
.
1495c
if(q->pg)
print("get %d %s ", q->pg->pid, q->pg->text);
if(q->pp)
print("put %d %s ", q->pp->pid, q->pp->text);
print(" W n %d l %d f %ux r %ux next %lux put %lux Rz %lux",
.
1493c
print("%10s %ux R n %d l %d f %ux r %ux ",
q->info->name, q,
.
1486c
int i;
.
1471,1473c
print("%c %c%d%c", c, bp->type == M_DATA ? 'd' : 'c',
bp->wptr-bp->rptr, (bp->flags&S_DELIM)?'D':' ');
for(cp = bp->rptr; cp<bp->wptr && cp<bp->rptr+30; cp++)
print(" %.2x", *cp);
.
1040a
q->pp = u->p;
.
538a
q->pg = u->p;
.
426a
q->pp = u->p;
.
242a
bcp = &bclass[bp->flags & S_CLASS];
if(bcp->size == 68) {
for(i = 0; i < 100; i++)
if(apcs[i] == bp->pc) {
refsf[i]++;
fpcs[i] = pc;
}
}
.
238a
int i;
.
223a
bp->pc = pc;
.
209a
if(bcp->size == 68) {
int i, hole, fnd;
fnd = 0;
hole = -1;
for(i = 0; i < 100; i++) {
if(apcs[i] == pc) {
refsa[i]++;
fnd = 1;
break;
}
if(refsa[i] == 0 && hole<0)
hole = i;
}
if(fnd == 0 && hole>=0) {
refsa[hole] = 1;
apcs[hole] = pc;
}
}
.
187a
pc = getcallerpc(((uchar*)&size) - sizeof(size));
.
186a
ulong pc;
.
10a
ulong fpcs[100], refsa[100], refsf[100], apcs[100];
.
## diffname port/stream.c 1992/0321
## diff -e /n/bootesdump/1992/0319/sys/src/9/port/stream.c /n/bootesdump/1992/0321/sys/src/9/port/stream.c
2c
#include "../port/lib.h"
.
## diffname port/stream.c 1992/0326
## diff -e /n/bootesdump/1992/0321/sys/src/9/port/stream.c /n/bootesdump/1992/0326/sys/src/9/port/stream.c
1561,1564d
1543,1546d
1536,1539d
1526c
.
1079d
576d
463d
269,278d
264d
248d
215,233d
191,192d
189d
11,12d
## diffname port/stream.c 1992/0509
## diff -e /n/bootesdump/1992/0326/sys/src/9/port/stream.c /n/bootesdump/1992/0509/sys/src/9/port/stream.c
1509d
1506d
## diffname port/stream.c 1992/0520
## diff -e /n/bootesdump/1992/0509/sys/src/9/port/stream.c /n/bootesdump/1992/0520/sys/src/9/port/stream.c
1102a
return 0; /* not reached */
.
## diffname port/stream.c 1992/0529
## diff -e /n/bootesdump/1992/0520/sys/src/9/port/stream.c /n/bootesdump/1992/0529/sys/src/9/port/stream.c
625c
* expand a block list to be one block, len bytes long
.
607c
if(i >= n) {
.
## diffname port/stream.c 1992/0603
## diff -e /n/bootesdump/1992/0529/sys/src/9/port/stream.c /n/bootesdump/1992/0603/sys/src/9/port/stream.c
244c
#endif asdf
.
239a
#ifdef asdf
.
220,223d
174,181d
## diffname port/stream.c 1992/0609
## diff -e /n/bootesdump/1992/0603/sys/src/9/port/stream.c /n/bootesdump/1992/0609/sys/src/9/port/stream.c
545a
* grab all the blocks in a queue
*/
Block *
grabq(Queue *q)	/* returns the head of the detached chain, or 0 if the queue was empty */
{
Block *bp;
lock(q);
bp = q->first;
if(bp){
q->first = 0;	/* the queue is left empty ... */
q->last = 0;
q->len = 0;	/* ... with its byte and block counts reset */
q->nb = 0;
if(q->flag&QHIWAT){
wakeup(q->other->next->other->rp);	/* presumably whoever is blocked by flow control -- confirm */
q->flag &= ~QHIWAT;
}
}
unlock(q);
return bp;
}
/*
.
## diffname port/stream.c 1992/0619
## diff -e /n/bootesdump/1992/0609/sys/src/9/port/stream.c /n/bootesdump/1992/0619/sys/src/9/port/stream.c
1519,1525d
1500d
228,246c
while(bp){
bp->rptr = 0;
bp->wptr = 0;
next = bp->next;
free(bp);
bp = next;
.
222,226c
Block *next;
.
186,208c
data = (uchar*)bp + sizeof(Block);
bp->rptr = data;
bp->wptr = data;
bp->base = data;
bp->lim = data+size;
bp->flags = 0;
.
180,184c
bp = smalloc(sizeof(Block)+size);
.
178c
uchar *data;
.
108,171d
75,85d
71,72c
slist = (Stream *)xalloc(conf.nstream * sizeof(Stream));
qlist = (Queue *)xalloc(conf.nqueue * sizeof(Queue));
.
65,66d
41,57d
33,35d
9d
## diffname port/stream.c 1992/0623
## diff -e /n/bootesdump/1992/0619/sys/src/9/port/stream.c /n/bootesdump/1992/0623/sys/src/9/port/stream.c
1361,1381d
1355c
return i;
.
1347,1353c
for(i=0; lp && *lp && i<n; i++){
while(*lp == sep)
*lp++=0;
if(*lp == 0)
break;
fields[i]=lp;
while(*lp && *lp != sep)
lp++;
.
1344,1345c
int i;
.
1341,1342c
int
getfields(char *lp, char **fields, int n, char sep)
.
1339c
* like andrew's getmfields but no hidden state
.
1335c
/*
* parse a string and return a pointer to the second element if the
* first matches name. bp->rptr will be updated to point to the
* second element.
*
* return 0 if no match.
*
* it is assumed that the block data is null terminated. streamwrite
* guarantees this.
*/
int
streamparse(char *name, Block *bp)
{
int len;
len = strlen(name);
if(BLEN(bp) < len)
return 0;
if(strncmp(name, (char *)bp->rptr, len)==0){
if(bp->rptr[len] == ' ')
bp->rptr += len+1;
else if(bp->rptr[len])
return 0;
else
bp->rptr += len;
while(*bp->rptr==' ' && bp->wptr>bp->rptr)
bp->rptr++;
return 1;
}
return 0;
.
1304,1333c
bp = allocb(0);
bp->type = M_HANGUP;
(*s->devq->put)(s->devq, bp);
}
.
1301,1302c
Block *bp;
.
1298,1299c
/*
* send a hangup up a stream
*/
static void
hangup(Stream *s)
.
1248,1267d
973c
return readstr(c->offset, buf, n, num);
.
966c
char *buf = vbuf;
.
943,960d
883d
877c
rv = s->opens;
qunlock(s);
.
850c
if(s->opens-- == 1){
.
847c
* decrement the open count
.
836,837c
* Decrement the open count. When it goes to zero, call the close
* routines for each queue in the stream.
.
828,832c
qunlock(hb);
.
826a
/*
* unchain it from the hash bucket and free
*/
l = &hb->s;
for(ns = hb->s; ns; ns = ns->next){
if(s == ns){
*l = s->next;
break;
}
l = &ns->next;
}
free(s);
.
824d
811,813c
hb = hash(s->type, s->dev, s->id);
qlock(hb);
if(s->inuse-- == 1){
.
809a
Sthash *hb;
Stream **l, *ns;
.
808d
803c
void
.
789,796c
Sthash *hb;
Stream *ns;
hb = hash(s->type, s->dev, s->id);
qlock(hb);
for(ns = hb->s; ns; ns = ns->next)
if(s->type == ns->type && s->dev == ns->dev && s->id == ns->id){
s->inuse++;
qunlock(hb);
if(s->opens == 0){
streamexit(s, 1);
return -1;
}
return 0;
}
qunlock(hb);
return -1;
.
783,784c
* Enter a stream only if the stream exists and is open. Increment the
* reference count so it can't disappear under foot.
*
* Return -1 if the stream no longer exists or is not opened.
.
753,778d
748c
* Associate a stream with a channel
.
727d
722d
719a
* The ordering of these 2 instructions is very important.
* It makes sure we finish the stream initialization before
* anyone else can access it.
*/
qlock(s);
qunlock(hb);
if(waserror()){
qunlock(s);
streamclose1(s);
nexterror();
}
/*
.
717a
s->hread = 0;
s->next = hb->s;
hb->s = s;
.
713a
s = smalloc(sizeof(Stream));
s->inuse = 1;
.
712c
* create and init a new stream
.
701,709d
698a
return s;
.
692,696c
qlock(hb);
for(s = hb->s; s; s = s->next) {
if(s->type == type && s->dev == dev && s->id == id){
s->inuse++;
qunlock(hb);
if(noopen == 0){
qlock(s);
s->opens++;
.
690c
* if the stream already exists, just increment the reference counts.
.
688a
hb = hash(type, dev, id);
.
687a
Sthash *hb;
.
680a
* return a hash bucket for a stream
*/
static Sthash*
hash(int type, int dev, int id)	/* bucket of ht[] for the stream (type, dev, id) */
{
return &ht[(type*7*7 + dev*7 + id) & Nmask];	/* Nmask is Nhash-1, so the index stays inside ht[] */
}
/*
.
647,654d
629,643c
static void hangup(Stream*);
void
streaminit(void)
{
/*
* make stream modules available
*/
streaminit0();
.
627c
Nbits= 5,
Nhash= 1<<Nbits,
Nmask= Nhash-1,
};
typedef struct Sthash Sthash;
struct Sthash
{
QLock;
Stream *s;
};
static Sthash ht[Nhash];
.
624,625c
enum
.
615,622c
* hash buckets containing all streams
.
609,613d
604,607c
Dirtab streamdir[]={
"data", {Sdataqid}, 0, 0600,
"ctl", {Sctlqid}, 0, 0600,
};
.
602c
* the per stream directory structure
.
593,600d
588,591d
586c
* Part 3) Streams
.
502,570d
461,500d
452,459d
301,346d
211d
201c
flushq(Queue *q)
.
198c
* flush a queue
.
194a
free(RD(q));
.
185c
freeq(Queue *q)
.
182c
* free a queue
.
176,177d
153,157d
143,151c
q = smalloc(2*sizeof(Queue));
.
134a
* make sure the first block has n bytes
*/
Block *
pullup(Block *bp, int n)	/* returns the new head, or 0 (list freed) if it holds fewer than n bytes */
{
Block *nbp;
int i;
/*
* this should almost always be true, the rest is
* just to avoid every caller checking.
*/
if(BLEN(bp) >= n)
return bp;
/*
* if not enough room in the first block,
* add another to the front of the list.
*/
if(bp->lim - bp->rptr < n){
nbp = allocb(n);
nbp->next = bp;
bp = nbp;
}
/*
* copy bytes from the trailing blocks into the first
*/
n -= BLEN(bp);	/* n is now the number of bytes still missing */
while(nbp = bp->next){
i = BLEN(nbp);
if(i >= n) {	/* this block can supply all that is missing */
memmove(bp->wptr, nbp->rptr, n);
bp->wptr += n;
nbp->rptr += n;	/* what is left of nbp stays in the list */
return bp;
} else {	/* drain nbp completely, then unlink and free it */
memmove(bp->wptr, nbp->rptr, i);
bp->wptr += i;
bp->next = nbp->next;
nbp->next = 0;	/* detach nbp before freeing it */
freeb(nbp);
n -= i;
}
}
freeb(bp);	/* ran out of blocks before collecting n bytes */
return 0;
}
/*
* return the number of data bytes in a list of blocks (0 for an empty list)
*/
int
blen(Block *bp)
{
int len;
len = 0;
while(bp) {	/* follow the next pointers to the end of the list */
len += BLEN(bp);	/* BLEN is the data length of a single block */
bp = bp->next;
}
return len;
}
/*
* round a block chain to some even number of bytes. Used
* by devip.c because all IP packets must have an even number
* of bytes.
*
* The last block in the returned chain will have S_DELIM set.
*/
int
bround(Block *bp, int amount)	/* returns the padded length of the chain */
{
Block *last;
int len, pad;
len = 0;
SET(last); /* Ken's magic */
while(bp) {	/* total the chain and remember its last block */
len += BLEN(bp);
last = bp;
bp = bp->next;
}
pad = ((len + amount) & ~amount) - len;	/* amount is used as a mask; presumably 2^n-1 -- confirm */
if(pad) {
if(last->lim - last->wptr >= pad){	/* the padding fits in the last block */
memset(last->wptr, 0, pad);
last->wptr += pad;
} else {	/* otherwise append a pad block and move the delimiter to it */
last->next = allocb(pad);
last->flags &= ~S_DELIM;
last = last->next;
last->wptr += pad;	/* NOTE(review): pad bytes are not cleared here -- presumably allocb zeroes; confirm */
last->flags |= S_DELIM;
}
}
return len + pad;
}
/*
* expand a block list to be one block, len bytes long (truncated or
* zero padded to fit); the old list is freed. used by ethernet routines.
*/
Block*
expandb(Block *bp, int len)	/* returns the new block, or 0 if none could be allocated */
{
Block *nbp, *new;
int i;
ulong delim = 0;
new = allocb(len);
if(new == 0){
freeb(bp);
return 0;
}
/*
* copy bytes into new block
*/
for(nbp = bp; len>0 && nbp; nbp = nbp->next){
delim = nbp->flags & S_DELIM;	/* keeps the delimiter state of the last block copied */
i = BLEN(nbp);
if(i > len) {
memmove(new->wptr, nbp->rptr, len);	/* truncate: only len bytes still fit */
new->wptr += len;
len = 0;	/* block is full: ends the loop and skips the zero fill below */
} else {
memmove(new->wptr, nbp->rptr, i);
new->wptr += i;
len -= i;
}
}
if(len){	/* list was shorter than requested: zero fill the rest */
memset(new->wptr, 0, len);
new->wptr += len;
}
new->flags |= delim;
freeb(bp);
return new;
}
/*
* make a copy of the first 'count' bytes of a block chain; a short
* chain is zero padded to 'count'. Used by transport protocols.
*/
Block *
copyb(Block *bp, int count)	/* returns the head of the copy; bp itself is left untouched */
{
Block *nb, *head = 0, **p;	/* head starts nil so that a zero count yields an empty chain */
int l;
p = &head;	/* p always points at the link that receives the next copy */
while(count && bp) {	/* a nil source chain is treated as an empty one */
l = BLEN(bp);
if(count < l)
l = count;
nb = allocb(l);
if(nb == 0)
panic("copyb.1");
memmove(nb->wptr, bp->rptr, l);
nb->wptr += l;
count -= l;
if(bp->flags & S_DELIM)	/* carry the delimiter over to the copy */
nb->flags |= S_DELIM;
*p = nb;
p = &nb->next;
bp = bp->next;
if(bp == 0)
break;
}
if(count) {	/* source ran out early: finish with a zero filled, delimited block */
nb = allocb(count);
if(nb == 0)
panic("copyb.2");
memset(nb->wptr, 0, count);
nb->wptr += count;
nb->flags |= S_DELIM;
*p = nb;
}
if(blen(head) == 0)
print("copyb: zero length\n");
return head;
}
/*
* Part 2) Queues
*/
/*
* process end line discipline
*/
static void stputq(Queue*, Block*);	/* defined later in this file */
Qinfo procinfo =
{
stputq,	/* presumably the put routine for one direction -- confirm against Qinfo */
nullput,	/* presumably the put routine for the other direction -- confirm */
0,	/* the two zero slots are left unset; their roles are not visible here */
0,
"process"	/* presumably the name that qinfofind and qlook compare -- confirm */
};
/*
* line disciplines that can be pushed
*/
static Qinfo *lds;
/*
* make known a stream module and call its initialization routine, if
* it has one. modules are kept on the list lds, newest first.
*/
void
newqinfo(Qinfo *qi)
{
if(qi->next)	/* NOTE(review): the first module ever added keeps next == 0, so a repeat of it is not caught */
panic("newqinfo: already configured");
qi->next = lds;	/* push qi on the front of the list */
lds = qi;
if(qi->reset)	/* the initialization routine is optional */
(*qi->reset)();
}
/*
* find the info structure for line discipline 'name'; 0 if name is 0 or unknown
*/
Qinfo *
qinfofind(char *name)
{
Qinfo *qi;
if(name == 0)
return 0;
for(qi = lds; qi; qi = qi->next)	/* lds is the list built by newqinfo */
if(strcmp(qi->name, name)==0)
return qi;
return 0;
}
/*
.
115c
* Pad a block to the front with n bytes. This is used to add protocol
* headers to the front of blocks.
.
98c
* someone trying to access it after freeing will cause a panic.
.
84,88c
base = (uchar*)bp + sizeof(Block);
lim = (uchar*)bp + msize(bp);
bp->wptr = bp->rptr = lim - size;
bp->base = base;
bp->lim = lim;
.
80c
uchar *base, *lim;
.
30,75d
28c
* Allocate a block. Put the data portion at the end of the smalloc'd
* chunk so that it can easily grow from the front to add protocol
* headers. Thank Larry Peterson for the suggestion.
.
17,25d
15c
* Part 1) Blocks
.
10,13d
## diffname port/stream.c 1992/0625
## diff -e /n/bootesdump/1992/0623/sys/src/9/port/stream.c /n/bootesdump/1992/0625/sys/src/9/port/stream.c
1325,1329d
1222,1227d
1220a
}
.
1212,1219c
va += i;
rem -= i;
if(rem > 0){
FLOWCTL(q, bp);
} else {
bp->flags |= S_DELIM;
FLOWCTL(q, bp);
.
1210c
bp = allocb(i);
memmove(bp->wptr, va, i);
.
1204,1208c
va = a;
rem = n;
for(;;){
if(rem > Streamhi)
i = Streamhi;
else
.
1202c
* Write the message using blocks <= Streamhi bytes long
.
1180c
Block *bp;
char *va;
.
940c
if(awaken)
.
936c
awaken = 1;
.
931c
awaken |= bp->flags & S_DELIM;
.
926c
awaken = bp->flags & S_DELIM;
.
917c
awaken = 1;
.
905c
int awaken;
.
837a
* nail down a stream so that it can't be closed
*/
void
naildownstream(Stream *s)	/* NOTE(review): no lock is taken here -- confirm callers serialize access */
{
s->opens++;	/* one extra open ... */
s->inuse++;	/* ... and one extra reference, neither released here */
}
/*
.
629,637d
278c
static Streamput stputq;
.
## diffname port/stream.c 1992/0711
## diff -e /n/bootesdump/1992/0625/sys/src/9/port/stream.c /n/bootesdump/1992/0711/sys/src/9/port/stream.c
1182a
/*
* docopy will get used if I ever figure out when to avoid copying
* data. -- presotto
*/
USED(docopy);
.
886c
streamexit(s);
.
846d
792d
788c
streamexit(Stream *s)
.
774c
streamexit(s);
.
640,642d
## diffname port/stream.c 1992/0826
## diff -e /n/bootesdump/1992/0711/sys/src/9/port/stream.c /n/bootesdump/1992/0826/sys/src/9/port/stream.c
1257c
devdir(c, c->qid, name, n, eve, perm, &dir);
.
1235c
streamstat(Chan *c, char *db, char *name, long perm)
.
## diffname port/stream.c 1993/0511
## diff -e /n/bootesdump/1992/0826/sys/src/9/port/stream.c /n/fornaxdump/1993/0511/sys/src/brazil/port/stream.c
955,1323c
memmove(p, b->rp, len);
.
936,953c
n = BLEN(b);
if(n < len){
memmove(p, b->rp, n);
.
934a
return -1;
.
844,933c
lock(q);
b = q->first;
if(b == 0){
q->state |= Qcsleep;
.
841,842c
Block *b;
int n;
.
839c
consume(Queue *q, uchar *p, int len, int drop)
.
827,837d
825c
* copy out of a queue, returns # bytes copied
.
780,821c
return b;
.
764,778c
b->base = (uchar*)(b+1);
b->rp = b->wp = b->base;
b->lim = b->base + size;
.
752,762c
b = alloc(sizeof(Block) + size);
if(b == 0)
exhausted("blocks");
.
749,750c
Block *b;
.
743,747c
Block*
allocb(int size)
.
671,740c
q = smalloc(sizeof(Queue));
q->limit = limit;
.
669d
615,667d
601,613c
Queue*
allocq(int limit)
.
599c
* allocate queues and blocks
.
589,595c
/* start garbage collector */
kproc("buffer", bgc, 0);
.
587c
blockinit(void)
.
582,585d
569,579c
lock(&freed);
b->next = freed->first;
freed->first = b;
unlock(&freed);
.
563,567c
void
freeb(Block *b)
.
559,560d
382,556c
for(; b; b = nb){
nb = b->next;
free(b);
.
374,380c
x = slphi();
lock(&freed);
b = freed->first;
freed->first = freed->last = 0;;
unlock(&freed);
spllo();
.
365,372c
USED(arg);
for(;;){
tsleep(&freed->r, return0, 0, 500);
if(freed->first == 0)
continue;
.
363c
Block *b, *nb;
.
361c
bgc(void *arg)
.
19,359d
15,17c
* Interrupt handlers use freeb() to release blocks. They are
* garbage collected by the kproc running bgc().
.
10,12c
static Queue *freed;
.
8d
6d
1d
## diffname port/stream.c 1993/0512
## diff -e /n/fornaxdump/1993/0511/sys/src/brazil/port/stream.c /n/fornaxdump/1993/0512/sys/src/brazil/port/stream.c
99,102c
if(n < len)
len = n;
memmove(p, b->rp, len);
if(len == n || drop){
q->first = b->next;
ifree(b);
} else
b->rp += len;
unlock(q);
return len;
}
int
produce(Queue *q, uchar *p, int len)
{
Block *b;
b = ialloc(sizeof(Block)
.
94c
q->state |= Qstarve;
.
83c
* Interrupt level copy out of a queue, return # bytes copied. If drop is
* set, any bytes left in a block afer a consume are discarded.
.
65a
/*
* allocate queues and blocks
*/
.
62,63c
cl = &arena.freed;
p = a;
lock(cl);
p->next = cl->first;
cl->first = p;
unlock(cl);
.
60c
Chunk *p;
Chunkl *cl;
.
54,58c
void
ifree(void *a)
.
50,51c
int pow;
Chunkl *cl;
Chunk *p;
for(pow = Min; pow <= Maxpow; pow++)
if(size <= (1<<pow)){
cl = &arena.alloc[pow];
lock(cl);
p = cl->first;
if(p){
cl->have--;
cl->first = p->next;
}
unlock(cl);
return (void*)p;
}
panic("ialloc %d\n", size);
.
47,48c
void*
ialloc(int size)
.
41,44c
int pow;
Chunkl *cl;
for(pow = Minpow; pow <= Maxpow; pow++){
cl = &arena.alloc[pow];
cl->goal = Maxpow-pow + 4;
}
/* start garbage collector */
kproc("iallockproc", iallockproc, 0);
.
39c
iallocinit(void)
.
31,33c
/* make sure we have blocks available for interrupt level */
for(pow = Minpow; pow <= Maxpow; pow++){
cl = &arena.alloc[pow];
if(cl->have >= cl->goal){
cl->had = cl->have;
continue;
}
/* increase goal if we've been drained twice in a row */
if(cl->have == 0 && cl->had == 0)
cl->goal += cl->goal>>2;
cl->had = cl->have;
l = &first;
for(i = x = cl->goal - cl->have; x > 0; x--){
p = alloc(1<<pow);
if(p == 0)
break;
*l = p;
l = &p->next;
}
if(first){
x = splhi();
lock(cl);
*l = cl->first;
cl->first = first;
cl->have += i;
unlock(cl);
spllo(x);
}
.
24,29c
/* really free what was freed at interrupt level */
cl = &arena.freed;
if(cl->first){
x = slphi();
lock(cl);
first = cl->first;
cl->first = 0;
unlock(cl);
spllo();
for(; first; first = p){
p = first->next;
free(first);
}
}
.
21,22d
16c
Chunk *p, *first, **l;
Chunkl *cl;
int pow, x, i;
.
14c
iallockproc(void *arg)
.
10,11c
* Manage interrupt level memory allocation.
.
8a
enum
{
Minpow= 7,
Maxpow= 12,
};
struct Chunk
{
Chunk *next;
};
struct Alloc
{
Lock;
Chunk *first;
int had;
int goal;
int last;
};
struct Arena
{
Chunkl alloc[Maxpow-Minpow+1];
Chunkl freed;
};
static Arena arena;
.
7c
typedef struct Chunk Chunk;
typedef struct Chunkl Chunkl;
typedef struct Arena Arena;
.
## diffname port/stream.c 1993/0513
## diff -e /n/fornaxdump/1993/0512/sys/src/brazil/port/stream.c /n/fornaxdump/1993/0513/sys/src/brazil/port/stream.c
207c
lock(q);
b = q->rfirst;
if(b){
/* hand to waiting receiver */
n = b->lim - b->wp;
if(n < len)
len = n;
memmove(b->wp, p, len);
b->wp += len;
q->rfirst = b->next;
wakeup(&b->r);
unlock(q);
return len;
}
/* no waiting receivers, buffer */
if(q->len >= q->limit)
return -1;
b = ialloc(sizeof(Block)+len);
if(b == 0)
return -1;
b->base = (uchar*)(b+1);
b->rp = b->base;
b->wp = b->lim = b->base + len;
memmove(b->rp, p, len);
if(q->bfirst)
q->blast->next = b;
else
q->bfirst = b;
q->last = b;
q->len += len;
unlock(q);
return len;
}
/*
* called by non-interrupt code
*/
Queue*
qopen(int limit)
{
Queue *q;
q = malloc(sizeof(Queue));
if(q == 0)
exhausted("Queues");
q->limit = limit;
}
static int
bfilled(void *a)
{
Block *b = a;
return b->wp - b->rp;
}
long
qread(Queue *q, char *p, int len, int drop)
{
Block *b, *bb;
int x, n;
/* ... to be replaced by a mapping */
b = allocb(len);
x = splhi();
lock(q);
bb = q->bfirst;
if(bb == 0){
/* wait for our block to be filled */
if(q->rfirst)
q->rlast->next = b;
else
q->rfirst = b;
q->rlast = b;
unlock(q);
splx(x);
sleep(&b->r, bfilled, b);
n = BLEN(b);
memmove(p, b->rp, n);
return n;
}
/* grab a block from the buffer */
n = BLEN(b);
if(drop || n <= len){
q->bfirst = b->next;
q->len -= n;
unlock(q);
slpx(x);
memmove(p, b->rp, n);
} else {
n = len;
q->len -= n;
memmove(p, b->rp, n);
b->rp += n;
unlock(q);
slpx(x);
}
free(b);
return n;
}
static int
qnotfull(void *a)
{
Queue *q = a;
return q->len < q->limit;
}
long
qwrite(Queue *q, char *p, int len)
{
Block *b;
int x, n;
b = allocb(len);
memmove(b->rp, p, len);
b->wp += len;
/* flow control */
if(!qnotfull(q)){
qlock(&q->wlock);
sleep(&q->r, qnotfull, q);
qunlock(&q->wlock);
}
x = splhi();
lock(q);
if(q->bfirst)
q->blast->next = b;
else
q->bfirst = b;
q->blast = b;
q->len += len;
unlock(q);
splx(x);
return len;
.
198a
if(drop || len == n)
ifree(b);
.
197a
q->len -= len;
/* wakeup flow controlled writers */
if(q->len+len >= q->limit && q->len < q->limit)
wakeup(&q->r);
.
193,196c
if(drop || len == n)
q->bfirst = b->next;
else
.
183c
b = q->bfirst;
.
163c
exhausted("Blocks");
.
161c
b = malloc(sizeof(Block) + size);
.
83c
p = malloc(1<<pow);
.
79a
else {
x = cl->goal/2;
if(cl->goal > 4 && cl->had > x && cl->have > x)
cl->goal--;
}
.
77,78c
/*
* increase goal if we've been drained, decrease
* goal if we've had lots of blocks twice in a row.
*/
if(cl->have == 0)
.
## diffname port/stream.c 1993/0515
## diff -e /n/fornaxdump/1993/0513/sys/src/brazil/port/stream.c /n/fornaxdump/1993/0515/sys/src/brazil/port/stream.c
243,254c
b = q->first;
if(append && b && b->lim-b->wp <= len){
memmove(b->wp, p, len);
b->wp += len;
} else {
b = ialloc(sizeof(Block)+len);
if(b == 0)
return -1;
b->base = (uchar*)(b+1);
b->rp = b->base;
b->wp = b->lim = b->base + len;
memmove(b->rp, p, len);
if(q->bfirst)
q->blast->next = b;
else
q->bfirst = b;
q->last = b;
}
.
221c
produce(Queue *q, uchar *p, int len, int append)
.
## diffname port/stream.c 1993/0522
## diff -e /n/fornaxdump/1993/0515/sys/src/brazil/port/stream.c /n/fornaxdump/1993/0522/sys/src/brazil/port/stream.c
367a
if((q->state & Qstarve) && q->kick){
q->stat &= ~Qstarve;
(*q->kick)(q->arg);
}
.
277a
q->kick = kick;
q->arg = arg;
.
270c
qopen(int limit, void (*kick)(void*), void *arg)
.
## diffname port/stream.c 1993/0525
## diff -e /n/fornaxdump/1993/0522/sys/src/brazil/port/stream.c /n/fornaxdump/1993/0525/sys/src/brazil/port/stream.c
371c
q->state &= ~Qstarve;
.
349c
int x;
.
331c
splx(x);
.
327,329c
q->len -= n;
unlock(q);
splx(x);
memmove(p, bb->rp, n);
bb->rp += n;
/* free it or put it back */
if(drop || bb->rp == bb->wp)
free(bb);
else {
x = splhi();
lock(q);
bb->next = q->bfirst;
q->bfirst = bb;
.
317,325c
/* copy from a buffered block */
q->bfirst = bb->next;
n = BLEN(bb);
if(n > len)
.
313a
free(b);
.
296c
/* ... to be replaced by a kmapping if need be */
.
291c
qread(Queue *q, char *p, int len)
.
279a
q->state = Qmsg;
return q;
.
259c
q->blast = b;
.
250a
}
.
249c
if(b == 0){
unlock(q);
.
243,244c
}
/* save in buffer */
b = q->bfirst;
if((q->state&Qmsg)==0 && b && b->lim-b->wp <= len){
.
240,241c
/* no waiting receivers, room in buffer? */
if(q->len >= q->limit){
unlock(q);
.
223a
int n;
.
221c
qproduce(Queue *q, uchar *p, int len)
.
214c
if((q->state&Qmsg) || len == n)
.
208,209c
/* wakeup flow controlled writers (with a bit of histeresis) */
if(q->len+len >= q->limit && q->len < q->limit/2)
.
202c
if((q->state&Qmsg) || len == n)
.
186c
qconsume(Queue *q, uchar *p, int len)
.
145a
return 0; /* not reached */
.
133c
for(pow = Minpow; pow <= Maxpow; pow++)
.
105c
splx(x);
.
89c
first = 0;
.
83,87d
75c
} else
cl->hist <<= 1;
.
73c
cl->hist = ((cl->hist<<1) | 1) & 0xff;
if(cl->hist == 0xff && cl->goal > 8)
cl->goal--;
.
71a
/*
* if we've been ahead of the game for a while
* start giving blocks back to the general pool
*/
.
61c
splx(x);
.
56c
x = splhi();
.
51c
tsleep(&arena.r, return0, 0, 500);
.
34a
Rendez r;
.
33c
Chunkl alloc[Maxpow+1];
.
28c
int hist;
.
26c
int have;
.
22c
struct Chunkl
.
0a
#include "u.h"
.
## diffname port/stream.c 1993/0526 # deleted
## diff -e /n/fornaxdump/1993/0525/sys/src/brazil/port/stream.c /n/fornaxdump/1993/0526/sys/src/brazil/port/stream.c
1,401d
|