/*
* Client for 9P file servers.
*
* Services calls that operate on a remote file tree.
*
* Requests are simply forwarded to the fs for processing.
* The fs should NOT receive version/auth/attach requests. Those are
* handled by the fs itself while being created.
*
* Attach, and recovery, are performed by the reader proc,
* because this operation might block.
*
* This does NOT rewrite fids, and does NOT fix up qids. That's the
* work of volfs. fs only services the 9P RPC
*
* From outside, a dead fs can be seen because it has a -1 fd and
* always responds with Eio.
*
* BUG: Once created, a fs stays around. If there's an I/O error, the file
* descriptor is closed, but the processes are kept running.
*/
#include <u.h>
#include <libc.h>
#include <fcall.h>
#include <thread.h>
#include <auth.h>
#include <ip.h>
#include "names.h"
#include "vols.h"
enum {
Request = 0,
Reply = 1,
Tick = 2,
};
static QLock fslck;
static Fs** fss;
static int nfss;
static uvlong qidgen = 1;
static Lock qidgenlck;
uvlong Mvolqid = 0x00000b4900000000ULL; // B49
char Eopen[] = "Fid already open";
char Enotopen[] = "Fid not open";
char Eio[] = "io error";
char Eperm[] = "permission denied";
char Enotexist[]="file does not exist";
char Eauth[] = "authentication not required";
char Euser[] = "permission denied for that user";
char Espec[] = "bad volume specifier";
char Edupfid[]="fid already in use";
char Ebadfid[]="bad fid";
static ulong ndead, ncame, ntmout;
static ulong nsrvrpcs[Tmax];
static ulong nclirpcs[Tmax];
static char* names[Tmax] = {
[Tattach] "attach",
[Twalk] "walk",
[Topen] "open",
[Tclunk] "clunk",
[Tremove] "remove",
[Tread] "read",
[Twrite] "write",
[Tstat] "stat",
[Twstat] "wstat",
[Tflush] "flush",
};
void
dumpfss(void)
{
int i;
fprint(2, "fss:\n");
qlock(&fslck);
for (i = 0; i < nfss; i++){
fprint(2, " %p %s '%s' t=%ld ref=%ld fd=%d pid=%d e=%ld %ldms\n",
fss[i], fss[i]->addr, fss[i]->spec,fss[i]->time,
fss[i]->ref, fss[i]->fd, fss[i]->pid, fss[i]->epoch,
fss[i]->lat);
if (fss[i]->fid)
fprint(2, "\tQ=%llx %X\n", fss[i]->qid.path, fss[i]->fid);
}
qunlock(&fslck);
}
void
fsstats(void)
{
int i;
ulong tsrv, tcli;
fprint(2, "fs stats:\n");
fprint(2, "%ld deaths %ld recovers %ld tmouts\n", ndead, ncame, ntmout);
tsrv = tcli = 0;
for (i = 0; i < Tmax; i++){
tsrv += nsrvrpcs[i];
tcli += nsrvrpcs[i];
if (names[i])
fprint(2, "%s:\t%5ld cli\t%5ld srv\n",
names[i], nclirpcs[i], nsrvrpcs[i]);
}
fprint(2,"total:\t%5ld cli\t%5ld srv\n", tcli, tsrv);
}
/* All calls to particular file systems must come through this.
* This also dispatches to the empty volume and to the ctl fs.
*/
int
fsop(Fs* fs, Frpc* op)
{
ulong r;
if (fs == nil){
// Fake, empty Fs used to present an empty dir
// when a vol has a dead fs.
return nilfsops[op->f.type](nil, op);
}
if (fs == &ctlfs){
// fake fs to service the ctl file
return ctlfsops[op->f.type](nil, op);
}
sendp(fs->reqc, op);
r = recvul(op->sc);
if (r == ~0)
r = -1;
return r;
}
void
fstimer(ulong now)
{
int i;
Fs* fs;
qlock(&fslck);
for (i = 0; i < nfss; i++){
fs = fss[i];
nbsendul(fs->tmrc, now);
}
qunlock(&fslck);
}
int
putfcall(int fd, Frpc* m)
{
int n;
nsrvrpcs[m->f.type]++;
Dbgprint(m->fid, " <= %F\n", &m->f);
n = convS2M(&m->f, m->sbuf, sizeof(m->sbuf));
if (n > 0)
n = write(fd, m->sbuf, n);
else
n = -1;
return n;
}
int
putfrep(int fd, Frpc* m)
{
int n;
if (m->r.type == Rstat && m->d){
m->r.nstat = convD2M(m->d, m->buf, sizeof(m->buf));
m->r.stat = m->buf;
free(m->d);
m->d = nil;
}
dbgprint(m->fid, "-> %F\n", &m->r);
n = convS2M(&m->r, m->sbuf, sizeof(m->sbuf));
if (n > 0)
n = write(fd, m->sbuf, n);
return n;
}
int
getfcall(int fd, Frpc* m)
{
int n;
n = read9pmsg(fd, m->buf, sizeof(m->buf));
if (n <= 0)
return n;
if (convM2S(m->buf, n, &m->f) == 0)
return -1;
nclirpcs[m->f.type]++;
return n;
}
int
getfrep(int fd, Frpc* m)
{
int n;
char* p;
n = read9pmsg(fd, m->buf, sizeof(m->buf));
if (n <= 0)
return n;
if (convM2S(m->buf, n, &m->r) == 0)
return -1;
switch(m->r.type){
case Rstat:
free(m->d);
m->d = emalloc(sizeof(Dir)+128);
p = (char*)(m->d + 1);
convM2D(m->r.stat, m->r.nstat, m->d, p);
/* lib9p may insist that name is "/", which is not
* a valid file name
*/
if (m->d->name[0] == '/' && m->d->name[1] == 0)
m->d->name = "dev";
break;
case Rerror:
if (!strncmp(m->r.ename, "io ", 3))
n = -1;
break;
}
return n;
}
void
rpcerr(Frpc* fop, char* err)
{
fop->r.type = Rerror;
fop->r.tag = fop->f.tag;
fop->r.ename = fop->err;
strecpy(fop->err, fop->err+sizeof(fop->err), err);
}
static void
fsmuxrep(Fs* fs, Frpc* rep)
{
Frpc* fop;
Frpc** fl;
int e;
for (fl = &fs->rpcs; fop = *fl; fl = &fop->next)
if (fop->f.tag == rep->r.tag)
break;
if (fop == nil){
rpcfree(rep);
return; // no tag. request was aborted.
}
*fl = fop->next;
fop->next = nil;
if (rep->r.type == Rread)
memmove(fop->buf, rep->buf, rep->r.count);
fop->r = rep->r;
free(fop->d);
fop->d = rep->d;
rep->d = nil;
if (fop->rep)
rpcfree(fop->rep);
fop->rep = rep;
Dbgprint(fop->fid, " => %F\n", &rep->r);
e = 0;
if (fop->r.type == Rerror){
strecpy(fop->err, fop->err+sizeof(fop->err), fop->r.ename);
fop->r.ename = fop->err;
e = ~0;
}
sendul(fop->sc, e); // awake the thread doing the rpc.
}
static int
maytimeout(Frpc* fop)
{
/* This is here to experiment with removal of
* timeouts. Use with care. It may block the whole thing
* if a broken/faulty fs is not being timed out.
*
* As it is now, we timeout all requests but for those in
* volumes mounted with -T in spec that might block on
* devices like portfs: Tread.
*/
if (fop->flushed)
return 1;
if (fop->fid != nil && fop->fid->notimeout && fop->f.type == Tread)
if (fop->fid->qid.path != 0LL)
return (fop->fid->qid.type&QTDIR);
else
return 0;
return 1;
}
static void
fstmouts(Fs* fs)
{
Frpc* fop;
Frpc** fl;
int some;
some = 0;
for (fl = &fs->rpcs; fop = *fl; ){
if (maytimeout(fop) && fs->time - fop->time > Rpctmout){
dprint(2, "timeout req tag %d type=%d ot=%uld ft=%uld\n",
fop->f.tag, fop->f.type, fop->time, fs->time);
ntmout++;
*fl = fop->next;
fop->next = nil;
if (fop->flushed){
rpcerr(fop, "interrupt");
} else {
if (fs->time - fop->time > 3 * Rpctmout){
// don't abort the fs in this case.
// It's likely that RPCs were very
// old ones but the fs has recover.
dprint(2, "%s: preposterous PRC tag %d type=%d\n", fs->addr, fop->f.tag, fop->f.type);
} else
some++;
rpcerr(fop, Eio);
}
sendul(fop->sc, ~0);
} else
fl = &fop->next;
}
if (some){
close(fs->fd); // Shut this down to clean things up.
fs->fd = -1;
incref(&epoch);
}
}
static void
abortrpcs(Fs* fs)
{
Frpc* fop;
Frpc** fl;
for (fl = &fs->rpcs; fop = *fl; ){
*fl = fop->next;
fop->next = nil;
rpcerr(fop, Eio);
sendul(fop->sc, ~0);
}
}
static void
fsreadproc(void*a)
{
Fs* fs = a;
Frpc* rep;
int r;
vlong t0, t1;
long tmout;
threadsetname("fsread");
assert(fs->fd < 0);
fs->lat = Biglatency;
t0 = nsec();
if (!amountfs(fs))
vdprint(2, "failed to mount %s %s\n", fs->addr, fs->spec);
t1 = nsec();
fs->lat = (t1 - t0)/1000000;
tmout = 3000;
for(;;){
if (fs->fd < 0){
vdprint(2, "trying %s %s\n", fs->addr, fs->spec);
if (!amountfs(fs)){
vdprint(2, "amount failed: %r\n");
if (!fs->musthave && tmout < Trytmout * 1000)
tmout += 1000;
sleep(tmout);
continue;
}
ncame++;
vdprint(2, "got fs: %s %s\n", fs->addr, fs->spec);
incref(&epoch);
fs->epoch = epoch.ref;
}
tmout = 1000;
rep = rpcalloc();
r = getfrep(fs->fd, rep);
if (r < 0){ // we're dying.
vdprint(2, "readproc: dying\n");
ndead++;
close(fs->fd);
fs->fd = -1;
rpcfree(rep);
rep = nil; // Eio to all pending rpcs
incref(&epoch);
fs->epoch = epoch.ref;
}
sendp(fs->repc, rep);
}
dprint(2, "fsreadproc exiting\n");
threadexits(nil);
}
void
fsmuxthread(void*arg)
{
Fs* fs = arg;
Frpc* fop;
Frpc* rep;
ulong now;
Alt a[] = { { nil, &fop, CHANRCV},
{ nil, &rep, CHANRCV},
{ nil, &now, CHANRCV},
{ nil, 0, CHANEND }};
threadsetname("fsmux");
a[0].c = fs->reqc;
a[1].c = fs->repc;
a[2].c = fs->tmrc;
now = fs->time = time(nil);
for(;;){
rep = nil;
switch(alt(a)){
case Request:
if (fs->fd < 0){
rpcerr(fop, Eio);
sendul(fop->sc, ~0);
} else {
fop->next = fs->rpcs;
fs->rpcs = fop;
fop->time = fs->time;
putfcall(fs->fd, fop);
}
break;
case Reply:
if (rep != nil)
fsmuxrep(fs, rep);
else
abortrpcs(fs); // Eio to them all.
break;
case Tick:
fs->time = now;
fstmouts(fs);
break;
default:
sysfatal("fsmux: alt failed\n");
}
}
fprint(2, "fsmux exiting\n");
threadexits(nil);
}
void
newfsqid(Qid* q)
{
lock(&qidgenlck);
qidgen++;
q->path = (Mvolqid|(qidgen<<44));
q->type = QTDIR;
q->vers = 0;
unlock(&qidgenlck);
}
static Fs*
addfs(char* addr, char* spec)
{
int i;
Fs* fs;
vdprint(2, "adding fs for addr %s spec '%s'\n", addr, spec);
if ((nfss%16) == 0)
fss = erealloc(fss, sizeof(Fs*)*(nfss+16));
i = nfss;
fs = fss[i] = emalloc(sizeof(Fs));
memset(fs, 0, sizeof(Fs));
fs->addr = estrdup(addr);
fs->spec = estrdup(spec);
fs->reqc = chancreate(sizeof(Frpc*), 0);
fs->repc = chancreate(sizeof(Frpc*), 0);
fs->tmrc = chancreate(sizeof(ulong), 0);
fs->fid = fidalloc(0);
fs->fid->fs = fs; // sic
fs->fd = -1;
fs->fdqid.path = -1; // something weird for a qid.
newfsqid(&fs->qid);
/* A call to getfs made by the user may lead to this.
* The caller will already count its ref
*/
fs->ref = 0;
fs->epoch = epoch.ref;
fs->fid->epoch = fs->epoch;
threadcreate(fsmuxthread, fs, Stack);
fs->lat = Biglatency;
fs->pid = proccreate(fsreadproc, fs, Stack);
nfss++;
/* We do not give fsreadproc much chance of mounting the fs.
* We don't do that ourselves and we dont synchronize
* because that might block all the program.
* For volume mounts that must be there for the system to
* work, mustmount should be specified.
* This is ugly, but waiting for all known volumes to be
* mounted may slow booting a lot. Also, after the mount
* request, the next 9p call is likely to find working vols
* already mounted.
*/
sleep(1); // gives a chance.
return fs;
}
Fs*
getfs(char* addr, char* spec)
{
int i;
Fs* c;
qlock(&fslck);
c = nil;
for (i = 0; i < nfss; i++){
if (!strcmp(addr, fss[i]->addr) && !strcmp(spec, fss[i]->spec)){
c = fss[i];
break;
}
}
if (c == nil)
c = addfs(addr, spec);
if (c)
incref(c);
qunlock(&fslck);
return c;
}
void
putfs(Fs* c)
{
// The fs always stay there, once mounted.
// It's likely that we will use them again and we keep them
// already attached. To change this, unmount when no more refs.
if (c)
decref(c);
}
|