Plan 9 from Bell Labs’s /usr/web/sources/plan9/sys/src/cmd/cwfs/net.c

Copyright © 2021 Plan 9 Foundation.
Distributed under the MIT License.
Download the Plan 9 distribution.


/* network i/o */

#include "all.h"
#include "io.h"
#include <fcall.h>		/* 9p2000 */
#include <thread.h>

enum {
	Maxfdata	= 8192,
	Nqueue		= 200,		/* queue size (tunable) */

	Netclosed	= 0,		/* Connection state */
	Netopen,
};

/*
 * the kernel file server read packets directly from
 * its ethernet(s) and did all the protocol processing.
 * if the incoming packets were 9p (over il/ip), they
 * were queued for the server processes to operate upon.
 *
 * in user mode, we have one process per incoming connection
 * instead, and those processes get just the data, minus
 * tcp and ip headers, so they just see a stream of 9p messages,
 * which they then queue for the server processes.
 *
 * there used to be more queueing (in the kernel), with separate
 * processes for ethernet input, il input, 9p processing, il output
 * and ethernet output, and queues connecting them.  we now let
 * the kernel's network queues, protocol stacks and processes do
 * much of this work.
 *
 * partly as a result of this, we can now process 9p messages
 * transported via tcp, exploit multiple x86 processors, and
 * were able to shed 70% of the file server's source, by line count.
 *
 * the upshot is that Ether (now Network) is no longer a perfect fit for
 * the way network i/o is done now.  the notion of `connection'
 * is being introduced to complement it.
 */

typedef struct Network Network;
typedef struct Netconn Netconn;
typedef struct Conn9p Conn9p;

/* a network, not necessarily an ethernet */
struct Network {
	int	ctlrno;
	char	iname[NAMELEN];
	char	oname[NAMELEN];

	char	*dialstr;
	char	anndir[40];
	char	lisdir[40];
	int	annfd;			/* fd from announce */
};

/* an open tcp (or other transport) connection */
struct Netconn {
	Queue*	reply;		/* network output */
	char*	raddr;		/* remote caller's addr */
	Chan*	chan;		/* list of tcp channels */

	int	alloc;		/* flag: allocated */

	int	state;
	Conn9p*	conn9p;		/* not reference-counted */

	Lock;
};

/*
 * incoming 9P network connection from a given machine.
 * typically will multiplex 9P sessions for multiple users.
 */
struct Conn9p {
	QLock;
	Ref;
	int	fd;
	char*	dir;
	Netconn*netconn;		/* cross-connection */
	char*	raddr;
};

static Network netif[Maxnets];
static struct {
	Lock;
	Chan*	chan;
} netchans;
static Queue *netoq;		/* only one network output queue is needed */

char *annstrs[Maxnets] = {
	"tcp!*!9fs",
};

/* never returns nil */
static Chan*
getchan(Conn9p *conn9p)
{
	Netconn *netconn;
	Chan *cp, *xcp;

	lock(&netchans);

	/* look for conn9p's Chan */
	xcp = nil;
	for(cp = netchans.chan; cp; cp = netconn->chan) {
		netconn = cp->pdata;
		if(!netconn->alloc)
			xcp = cp;		/* remember free Chan */
		else if(netconn->raddr != nil &&
		    strcmp(conn9p->raddr, netconn->raddr) == 0) {
			unlock(&netchans);
			return cp;		/* found conn9p's Chan */
		}
	}

	/* conn9p's Chan not found; if no free Chan, allocate & fill in one */
	cp = xcp;
	if(cp == nil) {
		cp = fs_chaninit(Devnet, 1, sizeof(Netconn));
		netconn = cp->pdata;
		netconn->chan = netchans.chan;
		netconn->state = Netopen;	/* a guess */
		/* cross-connect netconn and conn9p */
		netconn->conn9p = conn9p;	/* not reference-counted */
		conn9p->netconn = netconn;
		netchans.chan = cp;
	}

	/* fill in Chan's netconn */
	netconn = cp->pdata;
	netconn->raddr = strdup(conn9p->raddr);

	/* fill in Chan */
	cp->send = serveq;
	if (cp->reply == nil)
		cp->reply = netoq;
	netconn->reply = netoq;
	cp->protocol = nil;
	cp->msize = 0;
	cp->whotime = 0;
	strncpy(cp->whochan, conn9p->raddr, sizeof cp->whochan);
//	cp->whoprint = tcpwhoprint;
	netconn->alloc = 1;

	unlock(&netchans);
	return cp;
}

static char *
fd2name(int fd)
{
	char data[128];

	if (fd2path(fd, data, sizeof data) < 0)
		return strdup("/GOK");
	return strdup(data);
}

static void
hangupdfd(int dfd)
{
	int ctlfd;
	char *end, *data;

	data = fd2name(dfd);
	close(dfd);

	end = strstr(data, "/data");
	if (end != nil)
		strcpy(end, "/ctl");
	ctlfd = open(data, OWRITE);
	if (ctlfd >= 0) {
		hangup(ctlfd);
		close(ctlfd);
	}
	free(data);
}

void
closechan(int n)
{
	Chan *cp;

	for(cp = chans; cp; cp = cp->next)
		if(cp->whotime != 0 && cp->chan == n)
			fileinit(cp);
}

void
nethangup(Chan *cp, char *msg, int dolock)
{
	Netconn *netconn;

	netconn = cp->pdata;
	netconn->state = Netclosed;

	if(msg != nil)
		print("hangup! %s %s\n", msg, netconn->raddr);

	fileinit(cp);
	cp->whotime = 0;
	strcpy(cp->whoname, "<none>");

	if(dolock)
		lock(&netchans);
	netconn->alloc = 0;
	free(netconn->raddr);
	netconn->raddr = nil;
	if(dolock)
		unlock(&netchans);
}

void
chanhangup(Chan *cp, char *msg, int dolock)
{
	Netconn *netconn = cp->pdata;
	Conn9p *conn9p = netconn->conn9p;

	if (conn9p->fd > 0)
		hangupdfd(conn9p->fd);	/* drop it */
	nethangup(cp, msg, dolock);
}

/*
 * returns length of next 9p message (including the length) and
 * leaves it in the first few bytes of abuf.
 */
static long
size9pmsg(int fd, void *abuf, uint n)
{
	int m;
	uchar *buf = abuf;

	if (n < BIT32SZ)
		return -1;	/* caller screwed up */

	/* read count */
	m = readn(fd, buf, BIT32SZ);
	if(m != BIT32SZ){
		if(m < 0)
			return -1;
		return 0;
	}
	return GBIT32(buf);
}

static int
readalloc9pmsg(int fd, Msgbuf **mbp)
{
	int m, len;
	uchar lenbuf[BIT32SZ];
	Msgbuf *mb;

	*mbp = nil;
	len = size9pmsg(fd, lenbuf, BIT32SZ);
	if (len <= 0)
		return len;
	if(len <= BIT32SZ || len > IOHDRSZ+Maxfdata){
		werrstr("bad length in 9P2000 message header");
		return -1;
	}
	if ((mb = mballoc(len, nil, Mbeth1)) == nil)
		panic("readalloc9pmsg: mballoc failed");
	*mbp = mb;
	memmove(mb->data, lenbuf, BIT32SZ);
	len -= BIT32SZ;
	m = readn(fd, mb->data+BIT32SZ, len);
	if(m < len)
		return 0;
	return BIT32SZ+m;
}

static void
connection(void *v)
{
	int n;
	char buf[64];
	Chan *chan9p;
	Conn9p *conn9p = v;
	Msgbuf *mb;
	NetConnInfo *nci;

	incref(conn9p);			/* count connections */
	nci = getnetconninfo(conn9p->dir, conn9p->fd);
	if (nci == nil)
		panic("connection: getnetconninfo(%s, %d) failed",
			conn9p->dir, conn9p->fd);
	conn9p->raddr = nci->raddr;

	chan9p = getchan(conn9p);
	print("new connection on %s pid %d from %s\n",
		conn9p->dir, getpid(), conn9p->raddr);

	/*
	 * reading from a pipe or a network device
	 * will give an error after a few eof reads.
	 * however, we cannot tell the difference
	 * between a zero-length read and an interrupt
	 * on the processes writing to us,
	 * so we wait for the error.
	 */
	while (conn9p->fd > 0 && (n = readalloc9pmsg(conn9p->fd, &mb)) >= 0) {
		if(n == 0)
			continue;
		mb->param = (uintptr)conn9p;	/* has fd for replies */
		mb->chan = chan9p;

		assert(mb->magic == Mbmagic);
		incref(conn9p);			/* & count packets in flight */
		fs_send(serveq, mb);		/* to 9P server processes */
		/* mb will be freed by receiving process */
	}

	rerrstr(buf, sizeof buf);

	qlock(conn9p);
	print("connection hung up from %s\n", conn9p->dir);
	if (conn9p->fd > 0)		/* not poisoned yet? */
		hangupdfd(conn9p->fd);	/* poison the fd */

	nethangup(chan9p, "remote hung up", 1);
	closechan(chan9p->chan);

	conn9p->fd = -1;		/* poison conn9p */
	if (decref(conn9p) == 0) {	/* last conn.? turn the lights off */
		free(conn9p->dir);
		qunlock(conn9p);
		free(conn9p);
	} else
		qunlock(conn9p);

	freenetconninfo(nci);

	if(buf[0] == '\0' || strstr(buf, "hungup") != nil)
		exits("");
	sysfatal("mount read, pid %d", getpid());
}

static void
neti(void *v)
{
	int lisfd, accfd;
	Network *net;
	Conn9p *conn9p;

	net = v;
	print("net%di\n", net->ctlrno);
	for(;;) {
		lisfd = listen(net->anndir, net->lisdir);
		if (lisfd < 0) {
			print("listen %s failed: %r\n", net->anndir);
			continue;
		}

		/* got new call on lisfd */
		accfd = accept(lisfd, net->lisdir);
		if (accfd < 0) {
			print("accept %d (from %s) failed: %r\n",
				lisfd, net->lisdir);
			continue;
		}

		/* accepted that call */
		conn9p = malloc(sizeof *conn9p);
		conn9p->dir = strdup(net->lisdir);
		conn9p->fd = accfd;
		newproc(connection, conn9p, smprint("9P read %s", conn9p->dir));
		close(lisfd);
	}
}

/* only need one of these for all network connections, thus all interfaces */
static void
neto(void *)
{
	int len, datafd;
	Msgbuf *mb;
	Conn9p *conn9p;

	print("neto\n");
	for(;;) {
		/* receive 9P answer from 9P server processes */
		while((mb = fs_recv(netoq, 0)) == nil)
			continue;

		if(mb->data == nil) {
			print("neto: pkt nil cat=%d free=%d\n",
				mb->category, mb->flags&FREE);
			if(!(mb->flags & FREE))
				mbfree(mb);
			continue;
		}

		/* send answer back over the network connection in the reply */
		len = mb->count;
		conn9p = (Conn9p *)mb->param;
		assert(conn9p);

		qlock(conn9p);
		datafd = conn9p->fd;
		assert(len >= 0);
		/* datafd < 0 probably indicates poisoning by the read side */
		if (datafd < 0 || write(datafd, mb->data, len) != len) {
			print( "network write error (%r);");
			print(" closing connection for %s\n", conn9p->dir);
			nethangup(getchan(conn9p), "network write error", 1);
			if (datafd > 0)
				hangupdfd(datafd);	/* drop it */
			conn9p->fd = -1;		/* poison conn9p */
		}
		mbfree(mb);
		if (decref(conn9p) == 0)
			panic("neto: zero ref count");
		qunlock(conn9p);
	}
}

void
netstart(void)
{
	int netorun = 0;
	Network *net;

	if(netoq == nil)
		netoq = newqueue(Nqueue, "network reply");
	for(net = &netif[0]; net < &netif[Maxnets]; net++){
		if(net->dialstr == nil)
			continue;
		sprint(net->oname, "neto");
		if (netorun++ == 0)
			newproc(neto, nil, net->oname);
		sprint(net->iname, "net%di", net->ctlrno);
		newproc(neti, net, net->iname);
	}
}

void
netinit(void)
{
	Network *net;

	for (net = netif; net < netif + Maxnets; net++) {
		net->dialstr = annstrs[net - netif];
		if (net->dialstr == nil)
			continue;
		net->annfd = announce(net->dialstr, net->anndir);
		/* /bin/service/tcp564 may already have grabbed the port */
		if (net->annfd < 0)
			sysfatal("can't announce %s: %r", net->dialstr);
		print("netinit: announced on %s\n", net->dialstr);
	}
}

Bell Labs OSI certified Powered by Plan 9

(Return to Plan 9 Home Page)

Copyright © 2021 Plan 9 Foundation. All Rights Reserved.
Comments to [email protected].