#
# Watches over the registry for the octopus.
# 1. Old entries that have not been refreshed are removed, and their Ofs processes are killed.
# 2. New ofs entries are announced via plumb messages.
# 3. Gone ofs entries are announced as gone via plumb messages.
implement Watcher;
include "sys.m";
sys: Sys;
FD, fprint, sprint, remove, read, sleep, unmount,
pctl, NEWPGRP, write, OWRITE, OREAD, dirread, open: import sys;
include "error.m";
err: Error;
stderr, checkload, panic, error, kill: import err;
include "arg.m";
arg: Arg;
usage: import arg;
include "registries.m";
regs: Registries;
Service, Registered, Attributes, Registry: import regs;
include "draw.m";
include "plumbmsg.m";
plumbmsg: Plumbmsg;
Msg: import plumbmsg;
include "daytime.m";
daytime: Daytime;
# Module interface for the watcher command; init is the only entry declared here.
Watcher: module
{
# Entry point. The draw context is ignored; the list carries the command-line arguments.
init: fn(nil: ref Draw->Context, nil: list of string);
};
# Non-zero when announcements and timed-out entries should be reported on stderr (-v, -d).
verbose := 0;
# Non-zero when the terms list should be dumped on each watcher iteration (-d).
debug := 0;
# Terminals currently known to the watcher, as (sys name, pid) pairs; maintained by scan.
terms: list of (string, int);
# Debugging aid: write every known terminal, as name:pid, on one stderr line.
dump()
{
	line := "terms: ";
	rest := terms;
	while(rest != nil){
		(tname, tpid) := hd rest;
		line += sprint("%s:%d ", tname, tpid);
		rest = tl rest;
	}
	fprint(stderr, "%s\n", line);
}
# Announce the arrival of terminal `name`.
# The announcement "arrived: /devs/<name>" is sent as a plumb message and is
# also written to the ports post file, trying /devs/ports/post first and
# /mnt/ports/post second. If neither can be opened the write is skipped.
newterm(name: string)
{
	announce := "arrived: " + "/devs/" + name + "\n";
	payload := array of byte announce;
	if(verbose)
		fprint(stderr, "watcher: %s\n", announce);

	plumb := ref Msg("ofs", "", "/", "text", nil, payload);
	plumb.send();

	post := open("/devs/ports/post", OWRITE);
	if(post == nil){
		post = open("/mnt/ports/post", OWRITE);
		if(post == nil)
			return;
	}
	write(post, payload, len payload);
}
# Announce that terminal `name` is gone and tear it down.
# The announcement "gone: /devs/<name>" is sent as a plumb message; then the
# process group of `pid` is killed and /devs/<name> is unmounted. Finally the
# same announcement is written to the ports post file, trying /devs/ports/post
# first and /mnt/ports/post second; if neither opens the write is skipped.
goneterm(name: string, pid: int)
{
	announce := "gone: " + "/devs/" + name + "\n";
	payload := array of byte announce;
	if(verbose)
		fprint(stderr, "watcher: %s\n", announce);

	plumb := ref Msg("ofs", "", "/", "text", nil, payload);
	plumb.send();

	kill(pid, "killgrp");
	unmount(nil, "/devs/" + name);

	post := open("/devs/ports/post", OWRITE);
	if(post == nil){
		post = open("/mnt/ports/post", OWRITE);
		if(post == nil)
			return;
	}
	write(post, payload, len payload);
}
# Reconcile the `terms` list with the services registered under name "ofs".
#
# Pass 1: every known terminal whose name no longer matches the "sys"
#         attribute of any registered service is reported through goneterm
#         and dropped from `terms`.
# Pass 2: every registered service whose "sys" attribute is not yet in
#         `terms`, and which carries a "pid" attribute, is reported through
#         newterm and added to `terms`.
#
# If the registry query fails, the error is reported on stderr and `terms`
# is left untouched.
scan(reg: ref Registry)
{
	attrs := list of {("name", "ofs")};
	(svcs, e) := reg.find(attrs);
	if (e != nil){
		# find returns its diagnostic as a string; print that, and end the line.
		fprint(stderr, "watcher: scan: %s\n", e);
		return;
	}

	# Pass 1: keep the terminals that are still registered, announce the rest as gone.
	nl : list of (string, int);
	for(l := terms; l != nil; l = tl l){
		(tname, pid) := hd l;
		for(sl := svcs; sl != nil; sl = tl sl){
			s := hd sl;
			if (tname == s.attrs.get("sys"))
				break;
		}
		# sl is nil only when the inner loop ran off the end without a match.
		if (sl == nil)
			goneterm(tname, pid);
		else
			nl = (tname, pid)::nl;
	}
	terms = nl;

	# Pass 2: announce and record services that are not yet known.
	for(; svcs != nil; svcs = tl svcs){
		s := hd svcs;
		sname := s.attrs.get("sys");
		pid := s.attrs.get("pid");
		for(l = terms; l != nil; l = tl l){
			(tname, nil) := hd l;
			if (sname == tname)
				break;
		}
		# Entries without a pid attribute are ignored: there would be nothing to kill later.
		if (l == nil && pid != nil){
			newterm(sname);
			terms = (sname, int pid)::terms;
		}
	}
}
# Report whether registry entry `n` is a candidate for garbage collection.
# Returns 1 when the name is longer than two characters and begins with "o!",
# and 0 otherwise.
collectable(n: string): int
{
	if(len n <= 2)
		return 0;
	return n[0] == 'o' && n[1] == '!';
}
# Remove stale collectable entries from the registry directory.
#
# reg:   registry whose directory (reg.dir) is scanned.
# tmout: maximum allowed age; an entry is removed when the difference between
#        the current time and its mtime exceeds this value.
#
# Only names accepted by collectable() are considered, so the registry's own
# files (event, find, index, new, dot files) are never touched.
# Panics if the registry directory cannot be opened.
dropold(reg: ref Registry, tmout: int)
{
	fd := open(reg.dir, OREAD);
	if (fd == nil)
		panic(sprint("dropold: %s: %r\n", reg.dir));
	now := daytime->now();
	for(;;){
		(n, dirs) := dirread(fd);
		if (n <= 0)
			break;
		for(i := 0; i < n; i++){
			name := dirs[i].name;
			if (!collectable(name))
				continue;
			age := now - dirs[i].mtime;
			if (age > tmout){
				# Report the entry's actual age, not the result of the comparison.
				if (verbose)
					fprint(stderr, "timed out: %s (%d secs)\n", name, age);
				remove(reg.dir + "/" + name);
			}
		}
	}
}
# Main loop of the watcher process.
# Scans the registry once at start, then after every value received on `c`.
# Before each of those later scans, stale entries are dropped, unless
# `tmout` is 0. With debug set, the terms list is dumped after every scan.
watcher(reg: ref Registry, c: chan of int, tmout: int)
{
	for(;;){
		scan(reg);
		if(debug)
			dump();
		# Block until the timer or the event reader asks for another pass.
		<-c;
		if(tmout != 0)
			dropold(reg, tmout);
	}
}
# Event reader: every successful read from `cfd` triggers one send on `c`.
# The data read is discarded; only the fact that something arrived matters.
# A read returning 0 or less is reported through panic.
events(cfd: ref FD, c: chan of int)
{
	buf := array[30] of byte;
	for(;;){
		if(read(cfd, buf, len buf) <= 0)
			panic("watcher: event eof");
		c <-= 0;
	}
}
# Periodic trigger: forever, sleep for `t` and then send a value on `c`.
timer(t: int, c: chan of int)
{
	do{
		sleep(t);
		c <-= 0;
	}while(1);
}
# Command entry point: load the required modules, parse the options, open the
# registry and its event file, then start the timer, event and watcher processes.
#
# Options (see the usage string below):
#   -d         debug and verbose output
#   -v         verbose output
#   -i ms      interval between timer-driven scans; values below 5000 are raised to 5000
#   -t tmout   age limit handed to dropold; 0 disables dropping old entries
#   -r regdir  registry directory (default /mnt/registry)
init(nil: ref Draw->Context, args: list of string)
{
regdir := "/mnt/registry";
# Default scan interval: 120 * 1000, in the unit the -i option documents as ms.
tick := 120 * 1000;
# NOTE(review): the default timeout reuses tick's millisecond value, while dropold
# labels the quantity it compares against tmout as "secs" — confirm the intended unit.
tmout := tick;
sys = load Sys Sys->PATH;
err = load Error Error->PATH;
err->init();
regs = checkload(load Registries Registries->PATH, Registries->PATH);
regs->init();
plumbmsg = checkload(load Plumbmsg Plumbmsg->PATH, Plumbmsg->PATH);
if (plumbmsg->init(1, nil, 0) < 0)
error(sprint("plumbmsg: %r"));
daytime = checkload(load Daytime Daytime->PATH, Daytime->PATH);
arg = checkload(load Arg Arg->PATH, Arg->PATH);
arg->init(args);
arg->setusage("watcher [-dv] [-i ms] [-t tmout] [-r regdir]");
while((opt := arg->opt()) != 0) {
case opt{
'd' =>
debug = verbose = 1;
'i' =>
tick = int arg->earg();
# Enforce a lower bound of 5000 on the scan interval.
if (tick < 5000)
tick = 5000;
't' =>
tmout = int arg->earg();
'r' =>
regdir = arg->earg();
'v' =>
verbose = 1;
* =>
usage();
}
}
# A negative timeout is clamped to 1; exactly 0 is kept and disables dropold in watcher.
if (tmout != 0 && tmout < 1)
tmout = 1;
# No positional arguments are accepted.
args = arg->argv();
if (len args != 0)
usage();
reg := Registry.new(regdir);
if (reg == nil)
error(sprint("registry: %r"));
efd := open(regdir + "/event", OREAD); # open here to abort in parent.
if (efd == nil)
error(sprint("no registr event: %r (old inferno?)"));
# Both the timer and the event reader send on c; the watcher receives from it.
c := chan of int;
# Start a new process group before spawning the three processes.
pctl(NEWPGRP, nil);
spawn timer(tick, c);
spawn events(efd, c);
spawn watcher(reg, c, tmout);
}
|