#
# Multiplexed octopus connections.
# Provides a file tree that switches from one
# implementor to another without breaking the client name space.
#
# BUGS:
# This requires OR, AND, and MIN expressions, so that a single mount
# of mux would provide the mount point desired. Otherwise, we have to
# o/mux .... ; o/mux -a ... ; o/mux -a ....
# and that is a waste.
# Also, a way is needed to know which particular fs is the one being used.
# Probably a ndb file in the root of each fs muxed.
implement Mux;
include "sys.m";
sys: Sys;
fildes, fprint, FD, OTRUNC, DMDIR, ORCLOSE, OREAD, ORDWR,
Qid, print, pctl, create, mount, QTDIR, pread, open, sprint, NEWPGRP,
FORKNS, NEWFD, MREPL, MBEFORE, MCREATE, MAFTER, pipe,
pwrite, remove, nulldir, fwstat, wstat, fstat, stat, Dir, write, sleep, millisec: import sys;
include "draw.m";
include "styx.m";
styx: Styx;
Tmsg, Rmsg, NOFID, IOHDRSZ, VERSION, MAXRPC,
unpackdir, compatible, packdir, NOTAG: import styx;
include "error.m";
err: Error;
checkload, stderr, panic, kill, error: import err;
include "arg.m";
arg: Arg;
usage: import arg;
include "names.m";
names: Names;
dirname, cleanname, relative, isprefix, rooted: import names;
Mux: module {
init: fn(nil: ref Draw->Context, argv: list of string);
};
include "muxdat.m";
dat: Muxdat;
Qroot, NOQID, Fid, rebind, rootdir, brokenfs,
bindrootdir, addfid, delfid, getfid, maybebroken,
addqid, getqid, fixqid, delqid, renametree, debug, qgen: import dat;
slash: ref Fid;
msize: int;
terminate()
{
if (debug)
fprint(stderr, "mux: terminating\n");
kill(pctl(0,nil), "killgrp"); # kill tmsgreader and any other
exit;
}
fsversion(m: ref Tmsg.Version): ref Rmsg
{
if (msize <= 0)
msize = MAXRPC;
(sz, version) := compatible(m, msize, VERSION);
if(sz < 256)
return ref Rmsg.Error(m.tag, "message size too small");
msize = sz;
return ref Rmsg.Version(m.tag, msize, version);
}
fsattach(m: ref Tmsg.Attach): ref Rmsg
{
slash = ref Fid(m.fid, nil, 0, -1, Qroot, rootdir);
if (addfid(slash) < 0){
slash = nil;
return ref Rmsg.Error(m.tag, "fid already exists");
}
slash.qid = Qroot;
slash.path = rootdir;
q := addqid(rootdir);
if (q != Qroot.path)
panic("bug: first qid != Qroot");
return ref Rmsg.Attach(m.tag, Qroot);
}
fsopen(m: ref Tmsg.Open): ref Rmsg
{
fid := getfid(m.fid);
if (fid == nil)
return ref Rmsg.Error(m.tag, "bad fid");
if (fid.broken)
return ref Rmsg.Error(m.tag, "i/o error");
if (fid.fd != nil)
return ref Rmsg.Error(m.tag, "fid already open");
fid.fd = open(fid.path, m.mode);
if (fid.fd == nil){
e := sprint("%r");
maybebroken(e);
return ref Rmsg.Error(m.tag, e);
}
fid.omode = m.mode;
return ref Rmsg.Open(m.tag, fid.qid, msize - IOHDRSZ);
}
fixdirqids(fid: ref Fid, b: array of byte)
{
d: Dir;
n := 0;
for(o := 0; o < len b; o += n){
(n, d) = unpackdir(b[o:]);
if (n <= 0){
if (n < 0 && debug)
fprint(stderr, "fixdirqids: unpack: %r\n");
break;
}
path := rooted(fid.path, d.name);
d.qid = fixqid(path, d.qid);
b[o:] = packdir(d);
o += n;
}
}
fsread(m: ref Tmsg.Read): ref Rmsg
{
fid := getfid(m.fid);
if (fid == nil)
return ref Rmsg.Error(m.tag, "bad fid");
if (fid.broken)
return ref Rmsg.Error(m.tag, "i/o error");
if (fid.fd == nil)
return ref Rmsg.Error(m.tag, "fid not open");
if (m.count > msize)
return ref Rmsg.Error(m.tag, "count too big");
b := array[m.count] of byte;
nr := pread(fid.fd, b, m.count, m.offset);
if (nr < 0){
e := sprint("%r");
maybebroken(e);
return ref Rmsg.Error(m.tag, e);
}
nb := b[0:nr];
b = nil;
if (fid.qid.qtype&QTDIR)
fixdirqids(fid, nb);
return ref Rmsg.Read(m.tag, nb);
}
fswrite(m: ref Tmsg.Write): ref Rmsg
{
fid := getfid(m.fid);
if (fid == nil)
return ref Rmsg.Error(m.tag, "bad fid");
if (fid.broken)
return ref Rmsg.Error(m.tag, "i/o error");
if (fid.fd == nil)
return ref Rmsg.Error(m.tag, "fid not open");
if (len m.data > msize)
return ref Rmsg.Error(m.tag, "count too big");
nw := pwrite(fid.fd, m.data, len m.data, m.offset);
if (nw < 0){
e := sprint("%r");
maybebroken(e);
return ref Rmsg.Error(m.tag, e);
} else
return ref Rmsg.Write(m.tag, nw);
}
fsclunk(m: ref Tmsg.Clunk): ref Rmsg
{
fid := getfid(m.fid);
if (fid == nil)
return ref Rmsg.Error(m.tag, "bad fid");
delfid(fid);
fid.fd = nil;
return ref Rmsg.Clunk(m.tag);
}
fscreate(m: ref Tmsg.Create): ref Rmsg
{
fid := getfid(m.fid);
if (fid == nil)
return ref Rmsg.Error(m.tag, "bad fid");
if (fid.broken)
return ref Rmsg.Error(m.tag, "i/o error");
if (fid.fd != nil)
return ref Rmsg.Error(m.tag, "fid already open");
npath := rooted(fid.path, m.name);
if (npath == nil)
return ref Rmsg.Error(m.tag, "bad name");
fd := create(fid.path, m.mode, m.perm);
if (fd == nil){
e := sprint("%r");
maybebroken(e);
return ref Rmsg.Error(m.tag, e);
}
fid.fd = fd;
fid.path = npath;
fid.qid = Qid(big 0, 0, 0);
if (m.mode&DMDIR)
fid.qid.qtype = QTDIR;
fid.qid = fixqid(fid.path, fid.qid);
fid.omode = m.mode;
return ref Rmsg.Create(m.tag, fid.qid, msize - IOHDRSZ);
}
fsremove(m: ref Tmsg.Remove): ref Rmsg
{
fid := getfid(m.fid);
if (fid == nil)
return ref Rmsg.Error(m.tag, "bad fid");
if (fid.broken)
return ref Rmsg.Error(m.tag, "i/o error");
path := fid.path;
delfid(fid);
if (remove(path) < 0){
e := sprint("%r");
maybebroken(e);
return ref Rmsg.Error(m.tag, e);
} else {
delqid(fid.path);
return ref Rmsg.Remove(m.tag);
}
}
fswstat(m: ref Tmsg.Wstat): ref Rmsg
{
fid := getfid(m.fid);
if (fid == nil)
return ref Rmsg.Error(m.tag, "bad fid");
if (fid.broken)
return ref Rmsg.Error(m.tag, "i/o error");
m.stat.qid = nulldir.qid;
ec: int;
if (fid.fd != nil)
ec = fwstat(fid.fd, m.stat);
else
ec = wstat(fid.path, m.stat);
if (ec < 0){
e := sprint("%r");
maybebroken(e);
return ref Rmsg.Error(m.tag, e);
} else {
if (m.stat.name != nil)
renametree(fid.path, m.stat.name);
return ref Rmsg.Wstat(m.tag);
}
}
fsstat(m: ref Tmsg.Stat): ref Rmsg
{
fid := getfid(m.fid);
if (fid == nil)
return ref Rmsg.Error(m.tag, "bad fid");
if (fid.broken)
return ref Rmsg.Error(m.tag, "i/o error");
ec: int;
d: Dir;
if (fid.fd != nil)
(ec,d) = fstat(fid.fd);
else
(ec,d) = stat(fid.path);
if (ec < 0){
if (debug)
fprint(stderr, "stat: %s: %r\n", fid.path);
e := sprint("%r");
maybebroken(e);
return ref Rmsg.Error(m.tag, sprint("%r"));
}
if (fid.path == rootdir)
d.name = "/";
d.qid = fid.qid = fixqid(fid.path, d.qid);
return ref Rmsg.Stat(m.tag, d);
}
fswalk(m: ref Tmsg.Walk): ref Rmsg
{
fid := getfid(m.fid);
if (fid == nil)
return ref Rmsg.Error(m.tag, "bad fid");
if (fid.broken)
return ref Rmsg.Error(m.tag, "i/o error");
if (fid.fd != nil)
return ref Rmsg.Error(m.tag, "walk on a open fid");
# Clone if needed
oqid := fid.qid;
opath:= fid.path;
if (m.newfid != m.fid){
nfid := ref Fid(m.newfid, nil, 0, -1, fid.qid, fid.path);
if (addfid(nfid) < 0)
return ref Rmsg.Error(m.tag, "fid already in use");
fid = nfid;
}
# Walk to m.names
nqids := len m.names;
if (nqids == 0)
return ref Rmsg.Walk(m.tag, array[0] of Qid);
wpaths := array[nqids] of string;
wqids := array[nqids] of Qid;
for (i := 0; i < nqids; i++){
fid.path = rooted(fid.path, m.names[i]);
fid.path = cleanname(fid.path);
if (!isprefix(rootdir, fid.path))
fid.path = rootdir;
wpaths[i] = fid.path;
}
(ec, d) := stat(fid.path);
if (ec < 0){
if (debug)
fprint(stderr, "walk: stat: %s: %r\n", fid.path);
e := sprint("%r");
maybebroken(e);
}
if (ec >= 0){ # walk worked. invent everything else.
for (i = 0; i < nqids - 1; i++){
q := getqid(wpaths[i]);
if (q == NOQID)
q = addqid(wpaths[i]);
wqids[i] = Qid(q, 0, QTDIR);
}
fid.qid = wqids[nqids-1] = fixqid(fid.path, d.qid);
return ref Rmsg.Walk(m.tag, wqids);
}
# Couldn't walk. restore fid and respond
fid.qid = oqid;
fid.path = opath;
for (i = 0; i < nqids; i++){
(ec, d) = stat(wpaths[i]);
if (ec < 0){
e := sprint("%r");
maybebroken(e);
if (i == 0)
return ref Rmsg.Error(m.tag, "file does not exist");
else
return ref Rmsg.Walk(m.tag, wqids[0:i]);
}
wqids[i] = fixqid(wpaths[i], d.qid);
}
panic("can or can't walk?");
return nil;
}
reqrdproc(fd: ref FD, reqc: chan of ref Tmsg)
{
pctl(Sys->NEWFD, 0::1::2::fd.fd :: nil);
m: ref Tmsg;
do {
m = Tmsg.read(fd, msize);
if (m != nil)
pick mm := m {
Readerror =>
fprint(stderr, "mux: %s\n", mm.error);
m = nil;
}
reqc <-= m;
} while(m != nil);
}
fsreq(req: ref Tmsg) : ref Rmsg
{
pick m := req {
Version => return fsversion(m);
Auth => return ref Rmsg.Error(m.tag, "no auth required");
Attach => return fsattach(m);
Flush => return ref Rmsg.Flush(m.tag);
Walk => return fswalk(m);
Open => return fsopen(m);
Create => return fscreate(m);
Read => return fsread(m);
Write => return fswrite(m);
Clunk => return fsclunk(m);
Stat => return fsstat(m);
Remove => return fsremove(m);
Wstat => return fswstat(m);
* => panic("bug"); return nil;
}
}
canretry(req: ref Tmsg): int
{
r := tagof(req);
if (r == tagof(Tmsg.Auth) || r == tagof(Tmsg.Read) || r == tagof(Tmsg.Write))
return 0;
return 1;
}
srvproc(reqc: chan of ref Tmsg, cfd: ref FD)
{
for(;;){
req := <-reqc;
if (req == nil)
break;
if (debug)
fprint(stderr, "<- %s\n", req.text());
rebind();
wasbroken := brokenfs;
rep := fsreq(req);
if (rep == nil)
error("nil reply");
if (!wasbroken && brokenfs){
# BUG: Here, if we were not lucky and this thing did break,
# we might call rebind and retry the rpc again, if we see that
# there's no problem in doing so.
if (canretry(req)){
rebind();
if (!brokenfs)
rep = fsreq(req);
}
}
if (debug)
fprint(stderr, "-> %s\n", rep.text());
b := rep.pack();
nw := write(cfd, b, len b);
if (nw != len b){
fprint(stderr, "write: %r\n");
break;
}
}
terminate();
}
mux(pidc: chan of int, fd: ref FD)
{
if (pidc != nil)
pidc <-= pctl(FORKNS|NEWPGRP|NEWFD, list of {0,1,2,fd.fd});
else
pctl(NEWPGRP, nil);
reqc := chan of ref Tmsg;
bindrootdir();
spawn reqrdproc(fd, reqc);
spawn srvproc(reqc, fd);
}
init(nil: ref Draw->Context, args: list of string)
{
sys = load Sys Sys->PATH;
err = load Error Error->PATH;
err->init();
styx = checkload(load Styx Styx->PATH, Styx->PATH);
names = checkload(load Names Names->PATH, Names->PATH);
dat = checkload(load Muxdat Muxdat->PATH, Muxdat->PATH);
arg = checkload(load Arg Arg->PATH, Arg->PATH);
arg->init(args);
arg->setusage("mux [-abcd] [-m mnt] name val [name val]...");
mnt: string;
flag := MREPL|MCREATE;
while((opt := arg->opt()) != 0) {
case opt{
'b' =>
flag = MBEFORE;
'a' =>
flag = MAFTER;
'c' =>
flag |= MCREATE;
'm' =>
mnt = arg->earg();
'd' =>
debug = 1;
* =>
usage();
}
}
args = arg->argv();
argc := len args;
if (argc == 0 || (argc%2) != 0)
usage();
styx->init();
dat->init(sys, err, names, args);
if (mnt == nil)
mux(nil, fildes(0));
else {
pfds := array[2] of ref FD;
if (pipe(pfds) < 0)
error(sprint("mux: pipe: %r"));
pidc := chan of int;
spawn mux(pidc, pfds[0]);
<-pidc;
if (mount(pfds[1], nil, mnt, flag, nil) < 0)
error(sprint("mux: mount: %r"));
pfds[0] = nil;
}
}
|