/* network i/o */
#include "all.h"
#include "io.h"
#include <fcall.h> /* 9p2000 */
#include <thread.h>
/*
 * Maxfdata bounds the data payload of a 9P message:
 * readalloc9pmsg rejects messages longer than IOHDRSZ+Maxfdata.
 */
enum {
Maxfdata = 8192,
Nqueue = 200, /* size of the network reply queue, netoq (tunable) */
Netclosed = 0, /* Connection state (Netconn.state) */
Netopen,
};
/*
* the kernel file server read packets directly from
* its ethernet(s) and did all the protocol processing.
* if the incoming packets were 9p (over il/ip), they
* were queued for the server processes to operate upon.
*
* in user mode, we have one process per incoming connection
* instead, and those processes get just the data, minus
* tcp and ip headers, so they just see a stream of 9p messages,
* which they then queue for the server processes.
*
* there used to be more queueing (in the kernel), with separate
* processes for ethernet input, il input, 9p processing, il output
* and ethernet output, and queues connecting them. we now let
* the kernel's network queues, protocol stacks and processes do
* much of this work.
*
* partly as a result of this, we can now process 9p messages
* transported via tcp and exploit multiple x86 processors, and we
* were able to shed 70% of the file server's source, by line count.
*
* the upshot is that Ether (now Network) is no longer a perfect fit for
* the way network i/o is done now. the notion of `connection'
* is being introduced to complement it.
*/
/* typedefs for the network structures defined below */
typedef struct Network Network;
typedef struct Netconn Netconn;
typedef struct Conn9p Conn9p;
/*
 * a network, not necessarily an ethernet: one announced address
 * (dialstr) served by its own listener (neti) process.
 */
struct Network {
int ctlrno; /* network number; used in the neti process name */
char iname[NAMELEN]; /* name of this network's input (neti) process */
char oname[NAMELEN]; /* name given to the shared output (neto) process */
char *dialstr; /* address to announce, from annstrs[]; nil = unused */
char anndir[40]; /* directory filled in by announce */
char lisdir[40]; /* directory of the latest call, filled in by listen */
int annfd; /* fd from announce */
};
/*
 * an open tcp (or other transport) connection; this is the
 * private data (Chan.pdata) of each network Chan.
 */
struct Netconn {
Queue* reply; /* network output (netoq) */
char* raddr; /* remote caller's addr (malloced copy); nil when free */
Chan* chan; /* next Chan in the netchans list */
int alloc; /* flag: allocated */
int state; /* Netopen or Netclosed */
Conn9p* conn9p; /* not reference-counted */
Lock;
};
/*
 * incoming 9P network connection from a given machine.
 * typically will multiplex 9P sessions for multiple users.
 * the QLock serialises use of fd between the reader (connection)
 * and the writer (neto); the Ref counts the reader plus each
 * request still in flight to the 9P server processes.
 */
struct Conn9p {
QLock;
Ref;
int fd; /* data fd of the call; -1 once poisoned */
char* dir; /* malloced copy of the call's directory */
Netconn*netconn; /* cross-connection */
char* raddr; /* remote address; points into the reader's NetConnInfo */
};
/* one Network per possible announce string */
static Network netif[Maxnets];
/* all network Chans, linked through Netconn.chan; guarded by the Lock */
static struct {
Lock;
Chan* chan;
} netchans;
static Queue *netoq; /* only one network output queue is needed */
/* addresses to announce, one per netif entry; nil entries are unused */
char *annstrs[Maxnets] = {
"tcp!*!9fs",
};
/*
 * return the Chan serving conn9p's remote address.  an existing
 * allocated Chan with the same raddr is returned as is; otherwise a
 * free Chan is recycled (or a new one allocated and linked onto
 * netchans) and (re)initialised for conn9p.
 * never returns nil.
 */
static Chan*
getchan(Conn9p *conn9p)
{
	Netconn *netconn;
	Chan *cp, *xcp;

	lock(&netchans);

	/* look for conn9p's Chan */
	xcp = nil;
	for(cp = netchans.chan; cp; cp = netconn->chan) {
		netconn = cp->pdata;
		if(!netconn->alloc)
			xcp = cp;		/* remember free Chan */
		else if(netconn->raddr != nil &&
		    strcmp(conn9p->raddr, netconn->raddr) == 0) {
			unlock(&netchans);
			return cp;		/* found conn9p's Chan */
		}
	}

	/* conn9p's Chan not found; if no free Chan, allocate & link in one */
	cp = xcp;
	if(cp == nil) {
		cp = fs_chaninit(Devnet, 1, sizeof(Netconn));
		netconn = cp->pdata;
		netconn->chan = netchans.chan;
		netchans.chan = cp;
	}

	/*
	 * fill in Chan's netconn.  this must happen for recycled Chans
	 * too: nethangup left them Netclosed and their conn9p pointer
	 * may refer to a Conn9p that has since been freed.
	 */
	netconn = cp->pdata;
	netconn->state = Netopen;	/* a guess */
	/* cross-connect netconn and conn9p */
	netconn->conn9p = conn9p;	/* not reference-counted */
	conn9p->netconn = netconn;
	netconn->raddr = strdup(conn9p->raddr);

	/* fill in Chan */
	cp->send = serveq;
	if (cp->reply == nil)
		cp->reply = netoq;
	netconn->reply = netoq;
	cp->protocol = nil;
	cp->msize = 0;
	cp->whotime = 0;
	/* snprint always NUL-terminates; strncpy need not */
	snprint(cp->whochan, sizeof cp->whochan, "%s", conn9p->raddr);
	netconn->alloc = 1;

	unlock(&netchans);
	return cp;
}
/*
 * return a malloced copy of the path name open on fd,
 * or of "/GOK" when fd2path cannot determine it.
 */
static char *
fd2name(int fd)
{
	char path[128];

	return strdup(fd2path(fd, path, sizeof path) >= 0? path: "/GOK");
}
/*
 * close the network data fd dfd and hang up its conversation by
 * writing a hangup to the sibling ctl file (".../data" -> ".../ctl").
 * if dfd's name can't be found or isn't a ".../data" file, dfd is
 * only closed: never write a hangup to some unrelated file.
 */
static void
hangupdfd(int dfd)
{
	int ctlfd;
	char *path, *last;

	path = fd2name(dfd);
	close(dfd);
	if (path == nil)
		return;

	last = strrchr(path, '/');
	if (last != nil && strcmp(last, "/data") == 0) {
		strcpy(last, "/ctl");	/* "/ctl" is shorter than "/data" */
		ctlfd = open(path, OWRITE);
		if (ctlfd >= 0) {
			hangup(ctlfd);
			close(ctlfd);
		}
	}
	free(path);
}
void
closechan(int n)
{
Chan *cp;
for(cp = chans; cp; cp = cp->next)
if(cp->whotime != 0 && cp->chan == n)
fileinit(cp);
}
/*
 * mark network Chan cp closed, reset its file state and owner,
 * and return its Netconn to the free pool for getchan to recycle.
 * msg, if non-nil, is printed with the remote address.
 * dolock says whether netchans must be locked here (callers that
 * already hold it pass 0).
 */
void
nethangup(Chan *cp, char *msg, int dolock)
{
	Netconn *nc;

	nc = cp->pdata;
	nc->state = Netclosed;
	if(msg != nil)
		print("hangup! %s %s\n", msg, nc->raddr);

	fileinit(cp);
	cp->whotime = 0;
	strcpy(cp->whoname, "<none>");

	/* release the Netconn */
	if(dolock)
		lock(&netchans);
	free(nc->raddr);
	nc->raddr = nil;
	nc->alloc = 0;
	if(dolock)
		unlock(&netchans);
}
/*
 * forcibly drop the network connection behind Chan cp, then
 * hang up the Chan itself (see nethangup for msg and dolock).
 *
 * conn9p->fd is poisoned *before* the fd is closed, so that neither
 * the reader (connection) nor the writer (neto) later closes or
 * writes to that fd number after it has been reused by a new call.
 * NOTE(review): conn9p's QLock is not taken here because the
 * callers' locking context isn't visible from this file.
 */
void
chanhangup(Chan *cp, char *msg, int dolock)
{
	int fd;
	Netconn *netconn = cp->pdata;
	Conn9p *conn9p = netconn->conn9p;

	if (conn9p != nil && conn9p->fd > 0) {
		fd = conn9p->fd;
		conn9p->fd = -1;	/* poison conn9p */
		hangupdfd(fd);		/* drop it */
	}
	nethangup(cp, msg, dolock);
}
/*
 * read the 4-byte size field of the next 9P message from fd into
 * abuf (which must hold at least n >= BIT32SZ bytes) and return it;
 * the size includes the size field itself.
 * returns -1 on a read error or if n is too small, 0 on a short read.
 */
static long
size9pmsg(int fd, void *abuf, uint n)
{
	uchar *p;
	int got;

	if (n < BIT32SZ)
		return -1;	/* caller screwed up */
	p = abuf;
	got = readn(fd, p, BIT32SZ);
	if (got == BIT32SZ)
		return GBIT32(p);
	return got < 0? -1: 0;
}
/*
 * read one complete 9P message from fd into a freshly allocated
 * Msgbuf, stored in *mbp.
 * returns the message length (> 0) on success; 0 on end of file or
 * a truncated message; -1 on a read error or a bad size field.
 * *mbp is non-nil only when the return value is positive, so a
 * truncated message never leaks its buffer.
 */
static int
readalloc9pmsg(int fd, Msgbuf **mbp)
{
	int m, len;
	uchar lenbuf[BIT32SZ];
	Msgbuf *mb;

	*mbp = nil;
	len = size9pmsg(fd, lenbuf, BIT32SZ);
	if (len <= 0)
		return len;
	if(len <= BIT32SZ || len > IOHDRSZ+Maxfdata){
		werrstr("bad length in 9P2000 message header");
		return -1;
	}
	if ((mb = mballoc(len, nil, Mbeth1)) == nil)
		panic("readalloc9pmsg: mballoc failed");
	memmove(mb->data, lenbuf, BIT32SZ);
	len -= BIT32SZ;
	m = readn(fd, mb->data+BIT32SZ, len);
	if(m < len){
		mbfree(mb);
		/* report errors like size9pmsg does; short read is eof */
		return m < 0? -1: 0;
	}
	*mbp = mb;
	return BIT32SZ+m;
}
/*
 * per-call reader process, started by neti with a new Conn9p (v).
 * looks up the call's remote address, binds it to a Chan (getchan),
 * then reads 9P messages from conn9p->fd and sends each to serveq
 * for the 9P server processes, whose replies reach neto via netoq.
 * when reading fails (or conn9p->fd is poisoned), it hangs up the
 * fd and Chan, poisons conn9p, and drops its reference; conn9p is
 * freed here only if no requests are still in flight.
 */
static void
connection(void *v)
{
int n;
char buf[64];
Chan *chan9p;
Conn9p *conn9p = v;
Msgbuf *mb;
NetConnInfo *nci;
incref(conn9p); /* count connections */
nci = getnetconninfo(conn9p->dir, conn9p->fd);
if (nci == nil)
panic("connection: getnetconninfo(%s, %d) failed",
conn9p->dir, conn9p->fd);
/* NOTE(review): raddr points into nci, which is freed below even if conn9p outlives this proc */
conn9p->raddr = nci->raddr;
chan9p = getchan(conn9p);
print("new connection on %s pid %d from %s\n",
conn9p->dir, getpid(), conn9p->raddr);
/*
* reading from a pipe or a network device
* will give an error after a few eof reads.
* however, we cannot tell the difference
* between a zero-length read and an interrupt
* on the processes writing to us,
* so we wait for the error.
*/
while (conn9p->fd > 0 && (n = readalloc9pmsg(conn9p->fd, &mb)) >= 0) {
if(n == 0)
continue;
mb->param = (uintptr)conn9p; /* has fd for replies */
mb->chan = chan9p;
assert(mb->magic == Mbmagic);
incref(conn9p); /* & count packets in flight */
fs_send(serveq, mb); /* to 9P server processes */
/* mb will be freed by receiving process */
}
/* save the error that ended the loop before anything below overwrites it */
rerrstr(buf, sizeof buf);
qlock(conn9p);
print("connection hung up from %s\n", conn9p->dir);
if (conn9p->fd > 0) /* not poisoned yet? */
hangupdfd(conn9p->fd); /* poison the fd */
nethangup(chan9p, "remote hung up", 1);
closechan(chan9p->chan);
conn9p->fd = -1; /* poison conn9p */
if (decref(conn9p) == 0) { /* last conn.? turn the lights off */
free(conn9p->dir);
qunlock(conn9p);
free(conn9p);
} else
qunlock(conn9p);
freenetconninfo(nci);
/* a clean hangup (or no error text) exits quietly; anything else is reported */
if(buf[0] == '\0' || strstr(buf, "hungup") != nil)
exits("");
sysfatal("mount read, pid %d", getpid());
}
static void
neti(void *v)
{
int lisfd, accfd;
Network *net;
Conn9p *conn9p;
net = v;
print("net%di\n", net->ctlrno);
for(;;) {
lisfd = listen(net->anndir, net->lisdir);
if (lisfd < 0) {
print("listen %s failed: %r\n", net->anndir);
continue;
}
/* got new call on lisfd */
accfd = accept(lisfd, net->lisdir);
if (accfd < 0) {
print("accept %d (from %s) failed: %r\n",
lisfd, net->lisdir);
continue;
}
/* accepted that call */
conn9p = malloc(sizeof *conn9p);
conn9p->dir = strdup(net->lisdir);
conn9p->fd = accfd;
newproc(connection, conn9p, smprint("9P read %s", conn9p->dir));
close(lisfd);
}
}
/*
 * network output process; only need one of these for all network
 * connections, thus all interfaces.  it takes 9P replies from netoq
 * and writes each to the data fd of the Conn9p in mb->param, then
 * drops the reference that connection() took for the request.
 */
static void
neto(void *)
{
	int len, datafd;
	Msgbuf *mb;
	Conn9p *conn9p;

	print("neto\n");
	for(;;) {
		/* receive 9P answer from 9P server processes */
		while((mb = fs_recv(netoq, 0)) == nil)
			continue;
		if(mb->data == nil) {
			print("neto: pkt nil cat=%d free=%d\n",
				mb->category, mb->flags&FREE);
			if(!(mb->flags & FREE))
				mbfree(mb);
			continue;
		}

		/* send answer back over the network connection in the reply */
		len = mb->count;
		conn9p = (Conn9p *)mb->param;
		assert(conn9p);
		assert(len >= 0);
		qlock(conn9p);
		datafd = conn9p->fd;
		/*
		 * datafd < 0 means conn9p was already poisoned (by the
		 * reader, or by an earlier write error here), and its Chan
		 * already hung up; just drop the reply.  calling getchan
		 * then would read conn9p->raddr, which may already be freed.
		 */
		if (datafd >= 0 && write(datafd, mb->data, len) != len) {
			print("network write error (%r);");
			print(" closing connection for %s\n", conn9p->dir);
			nethangup(getchan(conn9p), "network write error", 1);
			if (datafd > 0)
				hangupdfd(datafd);	/* drop it */
			conn9p->fd = -1;		/* poison conn9p */
		}
		mbfree(mb);
		if (decref(conn9p) == 0) {
			/*
			 * the reader has exited and this was the last reply
			 * in flight: free conn9p, as connection() does.
			 */
			free(conn9p->dir);
			qunlock(conn9p);
			free(conn9p);
		} else
			qunlock(conn9p);
	}
}
/*
 * create the network reply queue if needed, then start the single
 * shared output process (neto) and one listener process (neti) for
 * each Network that has an announce string.
 */
void
netstart(void)
{
	Network *net;
	int started;

	if(netoq == nil)
		netoq = newqueue(Nqueue, "network reply");
	started = 0;
	for(net = netif; net < netif + Maxnets; net++){
		if(net->dialstr == nil)
			continue;
		sprint(net->oname, "neto");
		if(!started){
			newproc(neto, nil, net->oname);
			started = 1;
		}
		sprint(net->iname, "net%di", net->ctlrno);
		newproc(neti, net, net->iname);
	}
}
void
netinit(void)
{
Network *net;
for (net = netif; net < netif + Maxnets; net++) {
net->dialstr = annstrs[net - netif];
if (net->dialstr == nil)
continue;
net->annfd = announce(net->dialstr, net->anndir);
/* /bin/service/tcp564 may already have grabbed the port */
if (net->annfd < 0)
sysfatal("can't announce %s: %r", net->dialstr);
print("netinit: announced on %s\n", net->dialstr);
}
}
|