Plan 9 from Bell Labs’s /usr/web/sources/contrib/cinap_lenrek/old/linuxemu.old/buffd.c

Copyright © 2021 Plan 9 Foundation.
Distributed under the MIT License.
Download the Plan 9 distribution.


#include <u.h>
#include <libc.h>
#include <ureg.h>
#include "linuxsys.h"
#include "linux.h"

typedef struct Buf Buf;
typedef struct Bufq Bufq;
typedef struct Buffd Buffd;

struct Buf
{
	Buf		*next;
	uchar	*bp;
	uchar	*ep;
	uchar	data[];
};

struct Bufq
{
	Buf		*head;
	Buf		**tail;
};

struct Buffd
{
	QLock	lock;

	int		fd;
	int		pid;
	int		shutdown;
	int		eof;
	int		error;

	Bufq	rq;

	Rendez	queuewait;
	Rendez	readwait;
	Rendez	killwait;
};

enum
{
	MAXREADSIZE = 1024*8,
	MAXQUEUESIZE = 1024*64,
};

static int
queuesize(Bufq *q)
{
	Buf *b;
	int n;

	n = 0;
	for(b=q->head; b; b=b->next)
		n += (b->ep - b->bp);
	return n;
}

static void
flushqueue(Bufq *q)
{
	Buf *b, *x;

	for(b = q->head; b; b = x){
		x = b->next;
		free(b);
	}
	q->head = nil;
	q->tail = &q->head;
}


static void
killbuffd(Buffd *bfd)
{
	if(bfd->fd != -1){
		bfd->fd = -1;
		if(bfd->pid != -1){
			int pid;

			pid = bfd->pid;
			rwakeup(&bfd->queuewait);
			rwakeup(&bfd->readwait);
			if(!postnote(PNPROC, pid, "rocktimer"))
				rsleep(&bfd->killwait);
		}
	}
}

static void 
destroybuffdtag(void *tag)
{
	Buffd *bfd;

	bfd = *fdtagp(tag);
	*fdtagp(tag) = nil;
	qlock(&bfd->lock);
	killbuffd(bfd);
	flushqueue(&bfd->rq);
	qunlock(&bfd->lock);
	free(bfd);
}

static void
forkbuffdtag(void *tag)
{
	Buffd *bfd;

	bfd = *fdtagp(tag);
	*fdtagp(tag) = nil;
	flushqueue(&bfd->rq);
	free(bfd);
	unlinkfdtag(tag);
}

static int
readprocnote(void *, char *msg)
{
	if(threadp->pid != 0)
		return 0;
	if(strcmp(msg, "rocktimer")==0)
		return 1;
	return 0;
}

static void
readproc(void *aux)
{
	Buffd *bfd;
	Buf *b;
	int n;
	int z;
	int	fd;

	bfd = aux;

	z = 0;
	b = nil;

	qlock(&bfd->lock);
	fd = bfd->fd;
	if(bfd->fd < 0)
		goto die;
	qunlock(&bfd->lock);

	for(;;){
		if(b==nil){
			b = malloc(sizeof(*b) + MAXREADSIZE);
			if(b == nil){
				qlock(&bfd->lock);
				bfd->error = -ENOMEM;
				goto die;
			}
		}

		n = read(fd, b->data, MAXREADSIZE);

		qlock(&bfd->lock);
		if(bfd->fd < 0)
			goto die;

		if(n == 0){
			if(++z <= 3){
				qunlock(&bfd->lock);
				continue;
			}
			bfd->eof = 1;
			epollevent(fd, POLLIN, 0);
			goto die;
		}

		z = 0;
		if(n < 0){
			int e;
			switch(e = mkerror()){
			case -ESHUTDOWN:
				bfd->eof = 1;
				break;
			default:
				bfd->error  = e;
			}
			epollevent(fd, POLLIN, 0);
			goto die;
		}

		if(realloc(b, sizeof(*b) + n) == nil){
			bfd->error = -ENOMEM;
			goto die;
		}

		b->ep = b->bp = b->data;
		b->ep += n;

		b->next = nil;
		*bfd->rq.tail = b;
		bfd->rq.tail = &b->next;
		b = nil;

		epollevent(fd, POLLIN, 0);
		rwakeup(&bfd->readwait);

		if(queuesize(&bfd->rq) > MAXQUEUESIZE)
			rsleep(&bfd->queuewait);

		if(bfd->fd < 0)
			goto die;
		qunlock(&bfd->lock);
	}

die:
	bfd->pid = -1;
	rwakeup(&bfd->killwait);
	rwakeup(&bfd->readwait);
	qunlock(&bfd->lock);

	if(b)
		free(b);
}

static void
buffdproc(void *aux)
{
	Buffd *bfd;

	bfd = aux;
	threadp->pid = 0;
	atnotify(readprocnote, 1);
	readproc(bfd);
}

void
buffd(int fd)
{
	void *tag;
	Buffd *bfd;

	if((tag = openfdtag(fd, TAG_BUFFD, 1))==nil)
		return;
	if(*fdtagp(tag) != nil){
		closefdtag(tag);
		return;
	}

	bfd = malloc(sizeof(*bfd));
	memset(bfd, 0, sizeof(*bfd));
	qlock(&bfd->lock);

	bfd->fd = fd;
	bfd->pid = -1;
	bfd->shutdown = 0;
	bfd->eof = 0;
	bfd->error = 0;
	bfd->rq.head = nil;
	bfd->rq.tail = &bfd->rq.head;
	bfd->queuewait.l = &bfd->lock;
	bfd->readwait.l = &bfd->lock;
	bfd->killwait.l = &bfd->lock;
	bfd->pid = 
		createxproc(buffdproc, bfd, RFMEM|RFPROC, 16 * 1024);
	*fdtagp(tag) = bfd;
	atdestroyfdtag(tag, destroybuffdtag);
	atforkfdtag(tag, forkbuffdtag);
	qunlock(&bfd->lock);
	closefdtag(tag);
}

void
buffdpoll(int fd)
{
	void *tag;
	Buffd *bfd;
	ulong e;

	if((tag = openfdtag(fd, TAG_BUFFD, 0))==nil)
		return;
	if((bfd = *fdtagp(tag)) == nil)
		return;
	qlock(&bfd->lock);
	closefdtag(tag);
	e = 0;
	if(!((bfd->shutdown&(1<<SHUT_WR)) || (bfd->shutdown&(1<<SHUT_RDWR))))
		e |= POLLOUT;
	if(bfd->error || bfd->eof || bfd->rq.head)
		e |= POLLIN;
	if(bfd->error)
		e |= POLLRDHUP;
	epollevent(fd, e, 0);	
	qunlock(&bfd->lock);
}


int
buffdionread(int fd)
{
	void *tag;
	int n;
	Buffd *bfd;

	tag = openfdtag(fd, TAG_BUFFD, 0);
	if(tag == nil)
		return -1;
	bfd = (Buffd*)*fdtagp(tag);
	qlock(&bfd->lock);
	closefdtag(tag);
	if((bfd->shutdown&(1<<SHUT_RDWR)) || (bfd->shutdown&(1<<SHUT_RD))){
		n = 0;
	} else {
		n = queuesize(&bfd->rq);
	}
	qunlock(&bfd->lock);
	return n;
}

void
shutdownbuffd(int fd, int how)
{
	void *tag;
	Buffd *bfd;

	tag = openfdtag(fd, TAG_BUFFD, 0);
	if(tag == nil)
		return;
	bfd = (Buffd*)*fdtagp(tag);
	qlock(&bfd->lock);
	closefdtag(tag);

	bfd->shutdown = (1<<how);
	if(how==SHUT_RD || how==SHUT_RDWR)
		flushqueue(&bfd->rq);
	qunlock(&bfd->lock);
}

int
peekbuffd(int fd, void *data, int len, int noblock)
{
	int t;
	void *tag;
	Buffd *bfd;
	Buf *b;

	if((tag = openfdtag(fd, TAG_BUFFD, 0))==nil)
		return -EBADF;
	if((bfd = *fdtagp(tag)) == nil)
		return -EBADF;
	t = 0;
	qlock(&bfd->lock);
	closefdtag(tag);
	if((bfd->shutdown&(1<<SHUT_RD)) || (bfd->shutdown&(1<<SHUT_RDWR))){
		t = -EBADF;
		goto out;
	}
	for(;;){
		for(b=bfd->rq.head; (t < len) && b; b=b->next){
			int x;
			x = b->ep - b->bp;
			if(x > len-t)
				x = len-t;
			memcpy(((uchar*)data) + t, b->bp, x);
			t += x;
		}
		if(t > 0){
			break;
		}
		if(bfd->eof){
			t = 0;
			break;
		}
		if(bfd->error || (bfd->fd < 0)){
			t = bfd->error ? bfd->error : -1;
			break;
		}
		if(noblock){
			t = -EAGAIN;
			break;
		}
		rsleep(&bfd->readwait);
	}
out:
	qunlock(&bfd->lock);
	return t;
}

int
readbuffd(int fd, void *data, int len, int noblock)
{
	int t;
	void *tag;
	Buffd *bfd;
	Buf *b;

	if((tag = openfdtag(fd, TAG_BUFFD, 0))==nil)
		return -EBADF;
	if((bfd = *fdtagp(tag)) == nil)
		return -EBADF;
	t = 0;
	qlock(&bfd->lock);
	closefdtag(tag);
	if((bfd->shutdown&(1<<SHUT_RD)) || (bfd->shutdown&(1<<SHUT_RDWR))){
		t = -EBADF;
		goto out;
	}
	for(;;){
		while((b=bfd->rq.head) && (t < len)){
			int x;
			x = b->ep - b->bp;
			if(x > len-t)
				x = len-t;
			memcpy(((uchar*)data) + t, b->bp, x);
			t		+= x;
			b->bp	+= x;
			if(b->bp == b->ep){
				if((bfd->rq.head = b->next) == nil){
					bfd->rq.tail = &bfd->rq.head;
				}
				free(b);
			}
		}
		if(t > 0){
			rwakeup(&bfd->queuewait);
			break;
		}
		if(bfd->eof){
			t = 0;
			break;
		}
		if(bfd->error || (bfd->fd < 0)){
			t = bfd->error ? bfd->error : -1;
			break;
		}
		if(noblock){
			t = -EAGAIN;
			break;
		}
		rsleep(&bfd->readwait);
	}
	if(bfd->rq.head == nil)
		epollevent(fd, 0, POLLIN);
out:
	qunlock(&bfd->lock);
	return t;
}

int
writebuffd(int fd, void *data, int len, int noblock)
{
	int ret;
	Buffd *bfd;
	void *tag;

	USED(noblock);
	tag = openfdtag(fd, TAG_BUFFD, 0);
	if(tag == nil)
		return -EBADF;
	bfd = (Buffd*)*fdtagp(tag);
	qlock(&bfd->lock);
	closefdtag(tag);
	if((bfd->shutdown&(1<<SHUT_WR)) || (bfd->shutdown&(1<<SHUT_RDWR))){
		ret = -EBADF;
		goto out;
	}
	ret = write(fd, data, len);
	if(ret != len){
		epollevent(fd, POLLERR|POLLHUP, 0);
		ret = mkerror();
	}
out:
	qunlock(&bfd->lock);
	return ret;
}

Bell Labs OSI certified Powered by Plan 9

(Return to Plan 9 Home Page)

Copyright © 2021 Plan 9 Foundation. All Rights Reserved.
Comments to [email protected].