Plan 9 from Bell Labs’s /usr/web/sources/contrib/nemo/octopus/port/lib/opmux.b

Copyright © 2021 Plan 9 Foundation.
Distributed under the MIT License.
Download the Plan 9 distribution.


implement Opmux;
include "sys.m";
	sys: Sys;
	write, millisec, pctl, fprint, fildes, QTDIR, FD: import sys;
include "op.m";
	op: Op;
	Tmsg, Rmsg, ODATA, OSTAT, OMORE: import op;
include "error.m";
	err: Error;
	stderr, kill: import err;

include "opmux.m";

# One request travelling through the mux: the T-message handed in by
# a client, and the channel on which the mux sends back the reply
# (or replies) for it.
Opcall: adt {
	req:	ref Tmsg;		# request as submitted through rpc()
	repc:	chan of ref Rmsg;	# where replies for req are delivered
};

# Counters maintained by the mux. Field order matters: init() builds
# the value with a positional constructor.
Stats: adt {
	nput:		int;	# Put requests seen
	nget:		int;	# Get requests seen
	nremove:	int;	# Remove requests seen
	nflush:		int;	# Flush requests seen
	nconc:		int;	# largest number of outstanding requests seen
	nrecover:	int;	# times recover() has run

	dump:	fn(s: self ref Stats);
};

#
# Tags must be assigned by clients, so that the mux knows
# how to flush a Tag
#

# Requests from rpc() (and the nil sent by term()) reach opmuxproc here.
opc : chan of ref Opcall;
# Pid of the current reader process; set by oprdproc itself on start-up
# and used to kill it on hangup or recovery.
oprdprocpid: int;
# Module-wide counters, allocated in init().
stats: ref Stats;

# Set the module up and start its two processes.
#	ofd:	descriptor the Op messages are read from and written to
#	o:	already-loaded Op module to use for packing/reading messages
#	endc:	handed to opmuxproc, which reports on it why it terminated
init(ofd: ref Sys->FD, o: Op, endc: chan of string)
{
	# module handles first: both processes below rely on them
	op = o;
	sys = load Sys Sys->PATH;
	err = load Error Error->PATH;
	err->init();

	opc = chan of ref Opcall;
	stats = ref Stats(0, 0, 0, 0, 0, 0);

	replies := chan of ref Rmsg;
	spawn oprdproc(ofd, replies);
	# the reader sends nil once it has recorded its pid in oprdprocpid;
	# wait for that before anybody may try to kill it
	<-replies;
	spawn opmuxproc(ofd, opc, replies, endc);
}

# Ask the mux to shut down: opmuxproc treats a nil Opcall as a hangup
# request, aborts what is outstanding and exits.
term()
{
	hangup: ref Opcall;	# deliberately left nil
	opc <-= hangup;
}
# Restart the reader on a (new) descriptor and send every outstanding
# request again.
#	ofd:	descriptor to read from and write to from now on
#	oprc:	channel the new reader delivers replies on
#	reqs:	requests that were outstanding when the connection failed
# Returns the requests that were written again in full; a request whose
# write falls short is answered with an "i/o error" reply and dropped.
# The returned list is in the reverse order of reqs.
recover(ofd: ref FD, oprc: chan of ref Rmsg, reqs: list of ref Opcall): list of ref Opcall
{
	# replace the reader process and wait for its start-up handshake
	kill(oprdprocpid, "kill");
	spawn oprdproc(ofd, oprc);
	<-oprc;

	resent: list of ref Opcall;
	while(reqs != nil){
		call := hd reqs;
		reqs = tl reqs;
		buf := call.req.pack();
		if (write(ofd, buf, len buf) == len buf){
			resent = call :: resent;
			continue;
		}
		fprint(stderr, "ofs: recover: failed: %s\n", call.req.text());
		call.repc <-= ref Rmsg.Error(call.req.tag, "i/o error");
	}
	stats.nrecover++;
	return resent;
}

# Print the counters on stderr, one per line. The final "total" line
# adds up put, get and remove only; flushes and recoveries are listed
# but not part of the total.
Stats.dump(s: self ref Stats)
{
	rows := array[] of {
		("put", s.nput),
		("get", s.nget),
		("remove", s.nremove),
		("flush", s.nflush),
		("recover", s.nrecover)
	};

	fprint(stderr,"op:\n");
	for(i := 0; i < len rows; i++){
		(what, count) := rows[i];
		fprint(stderr, "\t%s\t%d\n", what, count);
	}
	fprint(stderr, "\tconc.\t%d\n", s.nconc);
	fprint(stderr,"\ttotal\t%d\n", s.nput + s.nget + s.nremove);
}

# Reader process: reads R-messages from ofd and forwards each one,
# including the terminating nil or Readerror, to oprc.
#	ofd:	descriptor to read Op replies from
#	oprc:	channel replies are forwarded on
# On start-up it records its own pid in oprdprocpid and sends one nil
# on oprc as a handshake, so that whoever spawned it (init, recover)
# knows the pid is valid before going on.
oprdproc(ofd: ref FD, oprc: chan of ref Rmsg)
{
	oprdprocpid = pctl(0, nil);
	oprc <-= nil;

	# announce ourselves under our own name (this used to say "opmuxproc")
	if (debug)
		fprint(stderr, "oprdproc\n");
	for(;;){
		# NOTE(review): second argument presumably a message size limit,
		# 0 meaning "default" -- confirm against op.m
		m := op->Rmsg.read(ofd, 0);
		if (debug && m == nil)
			fprint(stderr, "oprdproc: eof\n");
		# forward before testing: the mux must see the eof/error as well
		oprc <-= m;
		if (m == nil || tagof(m) == tagof(Rmsg.Readerror))
			break;
	}
	if (debug)
		fprint(stderr, "oprdproc: exit\n");
}

# Multiplexer process. Serves two channels forever:
#	opcc:	requests from clients (rpc) or nil from term();
#	oprc:	replies, nil (eof) or Readerror from the reader process.
# Each request is written to ofd and remembered in reqs until muxreply
# says its reply has arrived. On a write or read failure it tries
# recoverfn (declared outside this file, presumably in opmux.m) to get
# a new descriptor and resend what is outstanding; if that is not
# possible every outstanding request is answered with an error, the
# reason is sent on endc, and the process exits.
opmuxproc(ofd: ref FD, opcc: chan of ref Opcall, oprc: chan of ref Rmsg, endc: chan of string)
{
	reqs: list of ref Opcall;	# outstanding ones.

	for(;;){
		alt {
		m := <- opcc =>
			now := millisec();
			if (debug && reqs != nil)
				rdump(reqs);
			if (m == nil){
				# term() was called
				if (debug) fprint(stderr, "opmux: hangup: eof\n");
				abortall(reqs);
				kill(oprdprocpid, "kill");
				endc <-= "hangup: eof";
				exit;
			}
			if (debug)
				fprint(stderr, "\n%d\t<-op- %s\n", now, m.req.text());
			pick x := m.req {
			Put => stats.nput++;
			Get => stats.nget++;
			Remove => stats.nremove++;
			Flush => stats.nflush++;
			}
			tmsg := m.req.pack();
			nw := write(ofd, tmsg, len tmsg);
			# queue the request even when the write failed, so that
			# recover() resends it and abortall() answers it
			reqs = m :: reqs;
			if (len reqs > stats.nconc)
				stats.nconc = len reqs;
			if (nw != len tmsg){
				fprint(stderr, "opmux: hangup: write: %r\n");
				if (recoverfn != nil && (ofd = recoverfn()) != nil){
					reqs = recover(ofd, oprc, reqs);
					continue;
				}
				# m is already in reqs: abortall sends its (single)
				# error reply together with everybody else's
				abortall(reqs);
				kill(oprdprocpid, "kill");
				endc <-= "hangup: write";
				exit;
			}
		rmsg := <- oprc =>
			m: ref Opcall;
			if (rmsg == nil || tagof(rmsg) == tagof(Rmsg.Readerror)){
				fprint(stderr, "opmux: hangup: read\n");
				# a non-nil rmsg in this branch is a Readerror; never
				# apply tagof/text to the nil that signals eof
				if (debug && rmsg != nil)
					fprint(stderr, "%s\n", rmsg.text());
				if (recoverfn != nil && (ofd = recoverfn()) != nil){
					reqs = recover(ofd, oprc, reqs);
					continue;
				}
				abortall(reqs);
				kill(oprdprocpid, "kill");
				endc <-= "hangup: read";
				exit;
			}
			# the third result (request completed or not) is not needed here
			(m, reqs, nil) = muxreply(reqs, rmsg);
			if (m == nil || m.repc == nil){
				fprint(stderr, "nil reply o no reply chan\n");
				continue;
			}
			if (debug)
				fprint(stderr, "%d\t-op-> %s\n\n", millisec(), rmsg.text());
			m.repc <-= rmsg;
		}
	}
}

# Print the module-wide counters on stderr.
dump()
{
	counters := stats;
	counters.dump();
}

# Debugging aid: print the counters followed by one line for each
# request in reqs.
rdump(reqs: list of ref Opcall)
{
	stats.dump();
	for(l := reqs; l != nil; l = tl l)
		fprint(stderr, "\t- %s\n", (hd l).req.text());
}

# Match a reply against the outstanding requests.
#	reqs:	requests currently outstanding
#	rmsg:	reply just read (never nil; the caller filters eof/Readerror)
# Returns (call, nreqs, done):
#	call:	the request whose tag equals rmsg.tag, or nil if there is none;
#	nreqs:	the requests still outstanding afterwards (order reversed);
#	done:	0 if call stays outstanding because more replies are
#		expected for it, 1 otherwise.
# When rmsg answers a Flush request, the request named by that flush's
# oldtag, if still outstanding, is answered with a "flushed" error and
# removed as well.
muxreply(reqs: list of ref Opcall, rmsg: ref Rmsg): (ref Opcall, list of ref Opcall, int)
{
	nreqs : list of ref Opcall;
	call: ref Opcall;
	done := 1;
	flushed := ~0;		# ~0: nothing to flush (yet)
	pick fmsg := rmsg {
	Flush =>
		# find the Flush request being answered to learn which tag it
		# flushes; stop at the first one found
		for(l := reqs; l != nil && flushed == ~0; l = tl l){
			m := hd l;
			if (m.req.tag == rmsg.tag)
			pick freq := m.req {
			Flush =>
				flushed = freq.oldtag;
			}
		}
	}
	for(; reqs != nil; reqs = tl reqs){
		m := hd reqs;
		if (m.req.tag == flushed){
			# victim of the flush: answer it and drop it
			m.repc <-= ref Rmsg.Error(m.req.tag, "flushed");
		} else if (m.req.tag != rmsg.tag)
			nreqs = m :: nreqs;
		else {
			call = m;
			pick req := m.req {
			Get =>
				# nmsgs == 0 places no limit on the number of replies;
				# otherwise this reply uses up one of them
				if (req.nmsgs == 0 || --req.nmsgs > 0){
					pick rm := rmsg {
					Get =>
						if ((rm.mode&OSTAT) && (rm.stat.qid.qtype&QTDIR) && (req.mode&ODATA))
							req.nmsgs = 0; # dirs accept any number of Rget.
						if (rm.mode&OMORE)
							done = 0;
					}
				}
			}
			if (!done)
				nreqs = m :: nreqs;
		}
	}
	if (call == nil)
		fprint(stderr, "opmux: no request for %s\n", rmsg.text());
	else if (tagof(rmsg) != tagof(Rmsg.Error) && rmsg.mtype() != call.req.mtype()+1){
		fprint(stderr, "opmux: type mismatch:\n");
		fprint(stderr, "\tcall: %s\n\treply:%s\n", call.req.text(), rmsg.text());
	}
	return (call, nreqs, done);
}

# Answer every request in reqs with an "i/o error" reply carrying the
# request's own tag. Each send waits until the reply channel accepts it.
abortall(reqs: list of ref Opcall)
{
	if (debug)
		fprint(stderr, "opmux: aborting\n");
	for(; reqs != nil; reqs = tl reqs){
		call := hd reqs;
		call.repc <-= ref Rmsg.Error(call.req.tag, "i/o error");
	}
}

# Submit request t to the mux and return the channel its reply (or
# replies) will arrive on. The call returns as soon as the mux has
# accepted the request; it does not wait for the reply.
# The reply channel buffers one message.
rpc(t: ref Op->Tmsg) : chan of ref Op->Rmsg
{
	if (opc == nil)
		fprint(stderr, "opmux: nil opc");
	replyc := chan[1] of ref Rmsg;
	call := ref Opcall(t, replyc);
	opc <-= call;
	return replyc;
}


Bell Labs OSI certified Powered by Plan 9

(Return to Plan 9 Home Page)

Copyright © 2021 Plan 9 Foundation. All Rights Reserved.
Comments to [email protected].