Plan 9 from Bell Labs’s /usr/web/sources/contrib/maht/limbo/pg/pgcfs.b

Copyright © 2021 Plan 9 Foundation.
Distributed under the MIT License.
Download the Plan 9 distribution.


implement Pgcfs;


# The Dbfs code I started with as a template said : 
# Copyright © 1999 Vita Nuova Limited.  All rights reserved.
# Revisions copyright © 2002 Vita Nuova Holdings Limited.  All rights reserved.
# Ok well, I raise you Postgresql catalogue tables as the datastore [email protected] is this GPL now ? Tis a bit confusing. I'm sure you don't mind.

#pgcfs && cat /mnt/db/1		mk && ./pgcfs && ls -l /mnt/db/proc/ | grep www

#	mk && ./pgcfs && ls /mnt/db/user/www/proc 

# Compile-time trace switch: when non-zero, logger() prints each message it receives on err_chan.
debug : con 0;


include "sys.m";
	sys: Sys;
	Qid: import Sys;

include "draw.m";

include "arg.m";

include "styx.m";
	styx: Styx;
	Tmsg, Rmsg: import styx;

include "styxservers.m";
	styxservers: Styxservers;
	Fid, Styxserver, Navigator, Navop: import styxservers;
	Enotfound, Eperm, Ebadarg: import styxservers;

include "bufio.m";
	bufio: Bufio;
	Iobuf: import bufio;

include "keyring.m";
	keyring: Keyring;

include "pgbase.m";
	pgbase :PgBase;
	Connection : import pgbase;

include "pg_catalogue.m";
	pgcatalogue : PgCatalogue;
	Catalogue : import pgcatalogue;

# XXX
# One in-memory record of the dbfs-style store kept in the global `database`.
# Records are built positionally (ref Record(id, x, dirty, vers, data)), so the
# order of the data fields below must not change.
Record: adt {
	id:		int;		# file number in directory
	x:		int;		# index in file
	dirty:	int;		# modified but not written
	vers:		int;		# version
	data:		array of byte;	# record contents; nil marks a removed record

	# append a new record holding x to database.records and return it
	new:		fn(x: array of byte): ref Record;
	# write the record's data (if any) to fd
	print:	fn(r: self ref Record, fd: ref Sys->FD);
	# build the Qdata qid for this record from r.x and r.vers
	qid:		fn(r: self ref Record): Sys->Qid;
};

# XXX
# The flat-file record store inherited from the dbfs template.
Database: adt {
	name:	string;			# file name handed to bufio->create by dbsync
	file:	ref Iobuf;		# buffered handle read by dbread/getrec, rewritten by dbsync
	records:	array of ref Record;	# all records, indexed by file number
	dirty:	int;			# non-zero: dbsync must rewrite the file
	vers:		int;			# incremented by Record.new
	nextid:	int;			# set by dbread to the number of records read

	# return the first non-nil record whose id field equals id, or nil
	findrec:	fn(db: self ref Database, id: int): ref Record;
};

# Module interface: a single command-style entry point.
Pgcfs: module
{
	# loads the support modules, opens the catalogue and mounts the file tree
	init: fn(nil: ref Draw->Context, nil: list of string);
};

# File types, numbered from 0 by iota; QPATH stores one in the low 8 bits of a
# qid path and TYPE extracts it again. Qnew and Qdata are only used by the
# dbfs-derived serveloop; the rest name nodes of the catalogue tree.
Qdir, Qnew, Qdata, Qproc, Qprocdir, Quserdir, Quser, Qsysid, Quserprocdir, Quserproc: con iota;

# Not referenced anywhere in the visible part of this file.
clockfd: ref Sys->FD;
# File descriptor 2, set in init(); all diagnostics are printed here.
stderr: ref Sys->FD;
# Record store used by serveloop(), dirslot() and Record.new().
# NOTE(review): no visible code assigns it — confirm it is initialised before use.
database: ref Database;
# Trace messages consumed by logger(); a nil message makes logger() stop.
err_chan : chan of string;

# Contents of /dev/user, or "inferno" when that cannot be read (set in init()).
user: string;
# Error string replied when the addressed record no longer exists.
Eremoved: con "file removed";

# Catalogue handle created in init() by pgcatalogue->new_catalogue().
pg_cat : ref Catalogue;

# Print the command synopsis on stderr and abort with "fail:usage".
usage()
{
	synopsis := "Usage: dbfs [-a|-b|-ac|-bc] [-D] file mountpoint\n";
	sys->fprint(stderr, "%s", synopsis);
	raise "fail:usage";
}

# Report that module s could not be loaded and abort with "fail:load".
# The message is formatted first so that %r still reflects the failed load.
nomod(s: string)
{
	complaint := sys->sprint("dbfs: can't load %s: %r\n", s);
	sys->fprint(stderr, "%s", complaint);
	raise "fail:load";
}

# Open file for buffered reading.
# When the open fails, empty is non-zero and the file cannot be stat'ed,
# the file is created (mode 8r600) instead.
# Raises "fail:open" after printing a diagnostic if no handle was obtained.
open_file(file : string, empty : int) : ref Iobuf
{
	iob := bufio->open(file, Sys->OREAD);
	if(empty && iob == nil){
		# only create the file when it really does not exist
		(status, nil) := sys->stat(file);
		if(status < 0)
			iob = bufio->create(file, Sys->OREAD, 8r600);
	}
	if(iob == nil){
		sys->fprint(stderr, "dbfs: can't open %s: %r\n", file);
		raise "fail:open";
	}
	return iob;
}


# Create a pipe and return its two file descriptors.
# Raises "fail:pipe" after printing a diagnostic when sys->pipe fails.
create_pipes() : array of ref Sys->FD
{
	ends := array[2] of ref Sys->FD;
	status := sys->pipe(ends);
	if(status < 0){
		sys->fprint(stderr, "dbfs: can't create pipe: %r\n");
		raise "fail:pipe";
	}
	return ends;
}

# Program entry point.
# Loads the support modules, starts the logger, opens the PostgreSQL catalogue,
# then starts the navigator and the Styx serve loop on one end of a pipe and
# mounts the other end on /mnt/db.
# Raises "fail:load" (via nomod) when a module cannot be loaded,
# "catalogue creation failed" when no catalogue is returned, and
# "fail:pipe" / "fail:mount" when the pipe or the mount fails.
init(nil: ref Draw->Context, args: list of string)
{
	sys = load Sys Sys->PATH;
	sys->pctl(Sys->FORKFD|Sys->NEWPGRP, nil);
	stderr = sys->fildes(2);
	styx = load Styx Styx->PATH;
	if(styx == nil)
		nomod(Styx->PATH);
	styx->init();
	styxservers = load Styxservers Styxservers->PATH;
	if(styxservers == nil)
		nomod(Styxservers->PATH);
	styxservers->init(styx);
	bufio = load Bufio Bufio->PATH;
	if(bufio == nil)
		nomod(Bufio->PATH);

	arg := load Arg Arg->PATH;
	if(arg == nil)
		nomod(Arg->PATH);
	# the argument list is handed to Arg but no options are parsed yet
	arg->init(args);

	err_chan = chan of string;

	# logger must be running before anything sends on err_chan
	spawn logger();

	pgcatalogue = load PgCatalogue "pg_catalogue.dis";
	# a failed load leaves the module nil; report it like every other module
	# instead of faulting on the first call through it
	if(pgcatalogue == nil)
		nomod("pg_catalogue.dis");
	# connection parameters are hard-wired for now
	conn := pgcatalogue->new_connection("127.0.0.1", "5432", "www", "", "study", nil, nil);
	pg_cat = pgcatalogue->new_catalogue(conn);
	if(pg_cat == nil)
		raise "catalogue creation failed";

	pg_cat.err_chan = err_chan;
	pg_cat.sync();

	flags := Sys->MREPL;
	flags |= Sys->MCREATE;

	mountpt := "/mnt/db";

	sys->pctl(Sys->FORKFD, nil);

	user = rf("/dev/user");
	if(user == nil)
		user = "inferno";

	fds := create_pipes();

	navops := chan of ref Navop;
	spawn navigator(navops);

	(tchan, srv) := Styxserver.new(fds[0], Navigator.new(navops), big Qdir);
	# drop our reference to the server end of the pipe
	fds[0] = nil;

	# NOTE(review): the dbfs-derived serveloop is started here, not serveloopX;
	# confirm which of the two is meant to serve the catalogue tree.
	pidc := chan of int;
	spawn serveloop(tchan, srv, pidc, navops);
	# wait until the serve loop has set up its own name space and fds
	<-pidc;

	if(sys->mount(fds[1], nil, mountpt, flags, nil) < 0) {
		sys->fprint(stderr, "dbfs: mount failed: %r\n");
		raise "fail:mount";
	}
}

# Drain err_chan until a nil message arrives.
# Messages are printed only when the debug constant is non-zero.
logger()
{
	for(;;){
		msg := <-err_chan;
		if(msg == nil)
			break;
		if(debug)
			sys->print("%s\n", msg);
	}
	if(debug)
		sys->print("Closing\n");
}

# Read up to Sys->NAMEMAX bytes from file f with a single read and return
# them as a string; returns nil when the file cannot be opened or read.
rf(f: string): string
{
	fd := sys->open(f, Sys->OREAD);
	if(fd == nil)
		return nil;
	buf := array[Sys->NAMEMAX] of byte;
	nread := sys->read(fd, buf, len buf);
	if(nread >= 0)
		return string buf[0:nread];
	return nil;
}

#XXX
# Load every record of db.file into db.records.
# Records are numbered in file order: both id and x are set to the record's
# position. Returns (db, nil) on success or (nil, error) as soon as getrec
# reports an error.
dbread(db: ref Database): (ref Database, string)
{
	db.file.seek(big 0, Sys->SEEKSTART);

	# read phase: push records on a list, so the list ends up in reverse order
	stack: list of ref Record;
	count := 0;
	for(;;){
		(rec, e) := getrec(db);
		if(e != nil)
			return (nil, e);		# could press on without it, or make it the `file' contents
		if(rec == nil)
			break;
		stack = rec :: stack;
		count++;
	}

	# fill phase: pop from the list while counting the slot number down
	db.nextid = count;
	db.records = array[count] of ref Record;
	for(slot := count - 1; stack != nil; slot--){
		rec := hd stack;
		stack = tl stack;
		rec.id = slot;
		rec.x = slot;
		db.records[slot] = rec;
	}
	return (db, nil);
}

#XXX
# Read the next record from db.file: all lines up to (not including) an empty
# line or end of input. Returns (nil, nil) when the input ends before any
# line was collected; the error string is currently always nil.
getrec(db: ref Database): (ref Record, string)
{
	text := "";
	for(;;){
		line := db.file.gets('\n');
		if(line == nil){
			# BUG: an i/o error is not distinguished from EOF
			if(text == nil)
				return (nil, nil);
			break;
		}
		# a final line without a newline (possibly truncated) is completed
		# rather than rejected
		if(line[len line - 1] != '\n')
			line += "\n";
		if(line == "\n")
			break;
		text += line;
	}
	return (ref Record(-1, -1, 0, 0, array of byte text), nil);
}

#XXX
# Rewrite the database file when db.dirty is set: every record that still has
# data is written followed by one newline. Returns 0 on success (or when
# there was nothing to do) and -1 when the create, a write or the flush fails.
dbsync(db: ref Database): int
{
	if(!db.dirty)
		return 0;

	out := bufio->create(db.name, Sys->OWRITE, 8r666);
	db.file = out;
	if(out == nil)
		return -1;

	for(i := 0; i < len db.records; i++){
		rec := db.records[i];
		# removed records (nil slot or nil data) are not written
		if(rec == nil || rec.data == nil)
			continue;
		nbytes := len rec.data;
		if(out.write(rec.data, nbytes) != nbytes)
			return -1;
		out.putc('\n');
	}
	if(out.flush())
		return -1;

	db.file = nil;
	db.dirty = 0;
	return 0;
}


# Dump every record of db to standard output, each followed by a newline.
# Empty slots are skipped, matching the nil checks made by findrec, dirslot
# and dbsync, so a nil entry no longer faults the dump.
dbprint(db: ref Database)
{
	stdout := sys->fildes(1);
	for(i := 0; i < len db.records; i++){
		r := db.records[i];
		if(r == nil)
			continue;
		r.print(stdout);
		sys->print("\n");
	}
}

# Linear search for the first non-nil record whose id field equals id.
# Returns nil when there is none.
Database.findrec(db: self ref Database, id: int): ref Record
{
	for(k := 0; k < len db.records; k++){
		rec := db.records[k];
		if(rec == nil)
			continue;
		if(rec.id == id)
			return rec;
	}
	return nil;
}

# Append a new record holding fields to the global database.
# The record's id and x are both its index in database.records; the array
# is replaced by a copy one slot longer and database.vers is incremented.
Record.new(fields: array of byte): ref Record
{
	old := database.records;
	count := len old;
	grown := array[count+1] of ref Record;
	if(count)
		grown[0:] = old[0:];
	rec := ref Record(count, count, 0, 0, fields);
	grown[count] = rec;
	database.records = grown;
	database.vers++;
	return rec;
}

# Write the record's data to fd; a record without data writes nothing.
Record.print(r: self ref Record, fd: ref Sys->FD)
{
	if(r.data == nil)
		return;
	sys->write(fd, r.data, len r.data);
}

# Qid of the record's file: a plain file of type Qdata whose path carries
# r.x and whose version is r.vers.
Record.qid(r: self ref Record): Sys->Qid
{
	path := QPATH(r.x, Qdata);
	return Sys->Qid(path, r.vers, Sys->QTFILE);
}


# Return the record addressed by a qid path, or nil when the path is not a
# Qdata file, no database is loaded, or the file number is out of range.
datarec(path: big): ref Record
{
	if(database == nil || TYPE(path) != Qdata)
		return nil;
	fno := FILENO(path);
	if(fno >= len database.records)
		return nil;
	return database.records[fno];
}

# Styx request loop inherited from dbfs; it serves records of the global
# `database`.
#   tchan   requests from the Styxserver
#   srv     the server used for replies and default actions
#   pidc    receives the result of the initial pctl call once it has been made
#   navops  navigator channel; sent nil on exit to stop the navigator
# Record lookups go through datarec(), so a read, write or remove aimed at a
# file that is not a stored record gets an Eremoved error instead of an
# exception in this thread.
serveloop(tchan: chan of ref Tmsg, srv: ref Styxserver, pidc: chan of int, navops: chan of ref Navop)
{
	pidc <-= sys->pctl(Sys->FORKNS|Sys->NEWFD, 1::2::srv.fd.fd::nil);
Serve:
	while((gm := <-tchan) != nil){
		pick m := gm {
		Readerror =>
			sys->fprint(stderr, "dbfs: fatal read error: %s\n", m.error);
			break Serve;
		Open =>
			c := srv.getfid(m.fid);
			if(c == nil || TYPE(c.path) != Qnew){
				srv.open(m);		# default action
				break;
			}
			if(c.uname != user) {
				srv.reply(ref Rmsg.Error(m.tag, Eperm));
				break;
			}
			mode := styxservers->openmode(m.mode);
			if(mode < 0) {
				srv.reply(ref Rmsg.Error(m.tag, Ebadarg));
				break;
			}
			# generate new file, change Fid's qid to match
			r := Record.new(array[0] of byte);
			qid := r.qid();
			c.open(mode, qid);
			srv.reply(ref Rmsg.Open(m.tag, qid, srv.iounit()));
		Read =>
			(c, err) := srv.canread(m);
			if(c == nil){
				srv.reply(ref Rmsg.Error(m.tag, err));
				break;
			}
			if(c.qtype & Sys->QTDIR){
				srv.read(m);	# does readdir
				break;
			}
			r := datarec(c.path);
			if(r == nil)
				srv.reply(ref Rmsg.Error(m.tag, Eremoved));
			else
				srv.reply(styxservers->readbytes(m, r.data));
		Write =>
			(c, merr) := srv.canwrite(m);
			if(c == nil){
				srv.reply(ref Rmsg.Error(m.tag, merr));
				break;
			}
			# validate the new contents before touching the record
			(value, err) := data2rec(m.data);
			if(err != nil){
				srv.reply(ref Rmsg.Error(m.tag, err));
				break;
			}
			r := datarec(c.path);
			if(r == nil){
				srv.reply(ref Rmsg.Error(m.tag, Eremoved));
				break;
			}
			# datarec returned a record, so database is known to be non-nil here
			r.data = value;
			r.vers++;
			database.dirty++;
			if(dbsync(database) == 0)
				srv.reply(ref Rmsg.Write(m.tag, len m.data));
			else
				srv.reply(ref Rmsg.Error(m.tag, sys->sprint("%r")));
		Clunk =>
			# a transaction-oriented dbfs could delay updating the record until clunk
			srv.clunk(m);
		Remove =>
			c := srv.getfid(m.fid);
			if(c == nil || c.qtype & Sys->QTDIR || TYPE(c.path) != Qdata){
				# let it diagnose all the errors
				srv.remove(m);
				break;
			}
			r := datarec(c.path);
			# the fid is deleted whether or not the record still exists
			srv.delfid(c);
			if(database == nil){
				srv.reply(ref Rmsg.Error(m.tag, Eremoved));
				break;
			}
			# a record is removed by dropping its data; dbsync then omits it
			if(r != nil)
				r.data = nil;
			database.dirty++;
			if(dbsync(database) == 0)
				srv.reply(ref Rmsg.Remove(m.tag));
			else
				srv.reply(ref Rmsg.Error(m.tag, sys->sprint("%r")));
		Wstat =>
			srv.default(gm);	# TO DO?
		* =>
			srv.default(gm);
		}
	}
	navops <-= nil;		# shut down navigator
}

# Map a directory position to an index in database.records: returns the index
# of the n-th (counting from 0) record that still has data, or -1 when there
# are not that many.
dirslot(n: int): int
{
	live := 0;	# records with data seen so far
	for(i := 0; i < len database.records; i++){
		rec := database.records[i];
		if(rec == nil || rec.data == nil)
			continue;
		if(live == n)
			return i;
		live++;
	}
	return -1;
}

#
# a record is (.+\n)*, without final empty line
#
# Validate data line by line with getline(). Returns (data, nil) when every
# line is non-empty and newline-terminated, otherwise (nil, error).
data2rec(data: array of byte): (array of byte, string)
{
	rest := data;
	line: string;
	while(len rest > 0){
		(rest, line) = getline(rest);
		if(line == nil || line == "\n" || line[len line - 1] != '\n')
			return (nil, "partial or malformed record");	# possibly truncated
	}
	return (data, nil);
}

# Split the first line off b.
# Returns (remaining bytes, line) where line includes its terminating
# newline if one was found. Scanning stops early, without consuming anything
# more, when byte2char reports a length of 0 for the next character.
#
# The index is advanced only by the length byte2char reports. The previous
# for-loop added its own i++ on top of that, which skipped every other
# character (so newlines could be missed) and could leave i beyond len b,
# making b[i:] fault.
getline(b: array of byte): (array of byte, string)
{
	n := len b;
	i := 0;
	while(i < n){
		(ch, l, nil) := sys->byte2char(b, i);
		if(l == 0)
			break;
		i += l;
		if(ch == '\n')
			break;
	}
	return (b[i:], string b[0:i]);
}


# Styx request loop for the catalogue tree (alternative to serveloop).
#   tchan   requests from the Styxserver
#   srv     the server used for replies and default actions
#   pidc    receives the result of the initial pctl call once it has been made
#   navops  navigator channel; sent nil on exit to stop the navigator
# Every step is traced on err_chan. On exit the logger and the navigator are
# told to stop and the catalogue connection is closed.
#
# Every request gets exactly one reply: Open always takes the server's default
# action, file types without a handler are answered with Eperm rather than a
# nil reply, and Remove deletes the fid before replying.
serveloopX(tchan: chan of ref Tmsg, srv: ref Styxserver, pidc: chan of int, navops: chan of ref Navop)
{
	pidc <-= sys->pctl(Sys->FORKNS|Sys->NEWFD, 1::2::srv.fd.fd::nil);
Serve:
	while((gm := <-tchan) != nil){
		err_chan <- = "Serve";
		pick m := gm {
		Readerror =>
			err_chan <- = "Readerror";
			sys->fprint(stderr, "dbfs: fatal read error: %s\n", m.error);
			break Serve;
		Open =>
			err_chan <- = "Open";
			# default action; it replies for valid and invalid fids alike
			srv.open(m);
			err_chan <- = "Opened";
		Read =>
			err_chan <- = "Read";
			(c, err) := srv.canread(m);
			if(c == nil){
				srv.reply(ref Rmsg.Error(m.tag, err));
				break;
			}
			if(c.qtype & Sys->QTDIR){
				srv.read(m);	# does readdir
				break;
			}
			case TYPE(c.path) {
			Qproc =>
				err_chan <- = "Qproc";
				pid := FILENO(c.path);
				if(pid < len pg_cat.procs) {
					srv.reply(styxservers->readbytes(m, array of byte pg_cat.proc_sql(pid)));
				} else {
					srv.reply(ref Rmsg.Error(m.tag, Eremoved));
				}
			* =>
				err_chan <- = "*";
				# no read handler for this file type
				srv.reply(ref Rmsg.Error(m.tag, Eperm));
			}
		Write =>
			err_chan <- = "Write";
			(c, merr) := srv.canwrite(m);
			if(c == nil){
				srv.reply(ref Rmsg.Error(m.tag, merr));
				break;
			}

			case TYPE(c.path) {
			Qproc =>
				err_chan <- = "Qproc";
				pid := FILENO(c.path);
				# NOTE(review): the written data is acknowledged but not stored anywhere
				if(pid < len pg_cat.procs) {
					srv.reply(ref Rmsg.Write(m.tag, len m.data));
				} else {
					srv.reply(ref Rmsg.Error(m.tag, Eremoved));
				}
			* =>
				err_chan <- = "*";
				# no write handler for this file type
				srv.reply(ref Rmsg.Error(m.tag, Eperm));
			}

		Clunk =>
			err_chan <- = "Clunk";
			# a transaction-oriented dbfs could delay updating the record until clunk
			srv.clunk(m);
		Remove =>
			err_chan <- = "Remove";
			c := srv.getfid(m.fid);
			if(c == nil || c.qtype & Sys->QTDIR){
				srv.remove(m);
				break;
			}
			ftype := TYPE(c.path);
			pid := FILENO(c.path);
			# a remove request gives up the fid whether or not the removal succeeds
			srv.delfid(c);
			case ftype {
			Qproc =>
				err_chan <- = "Qproc";
				if(pg_cat.drop_proc(pid)) {
					srv.reply(ref Rmsg.Remove(m.tag));
				} else {
					srv.reply(ref Rmsg.Error(m.tag, "Drop failed"));
				}
			* =>
				err_chan <- = "*";
				# this file type cannot be removed
				srv.reply(ref Rmsg.Error(m.tag, Eperm));
			}
		Wstat =>
			err_chan <- = "Wstat";
			srv.default(gm);	# TO DO?
		* =>
			err_chan <- = "Default";
			srv.default(gm);
		}
		err_chan <- = "Served";
	}
	err_chan <- = nil;
	navops <-= nil;		# shut down navigator
	pgcatalogue->disconnect(pg_cat.conn);
}


# Build a directory entry starting from sys->zerodir.
# uid is used for both owner and group, and DMDIR is added to the mode when
# the qid is a directory.
dir(qid: Sys->Qid, name: string, length: big, uid: string, perm, mtime: int): ref Sys->Dir
{
	mode := perm;
	if(qid.qtype & Sys->QTDIR)
		mode |= Sys->DMDIR;

	entry := ref sys->zerodir;
	entry.name = name;
	entry.uid = uid;
	entry.gid = uid;
	entry.qid = qid;
	entry.mode = mode;
	entry.length = length;
	entry.mtime = mtime;
	return entry;
}

# File name for procedure number id: "name+" followed by one
# "argname-argtype" item per argument, separated by commas. The argument
# name or type is left out when the corresponding array has no entry for it.
proc_dirname(id : int) : string
{
	proc := pg_cat.procs[id];
	label := proc.name + "+";

	for(k := 0; k < proc.nargs; k++) {
		if(proc.argnames != nil && k < len proc.argnames)
			label += proc.argnames[k] + "-";

		if(proc.argtypes != nil && k < len proc.argtypes)
			label += pg_cat.type_name(proc.argtypes[k]);

		# separator between arguments, none after the last
		if(k + 1 < proc.nargs)
			label += ",";
	}
	return label;
}

# Produce the directory entry for qid path p.
# Returns (entry, nil), or (nil, Enotfound) for an unknown file type or a
# procedure number beyond pg_cat.procs.
# All entries take their mtime from pg_cat.last_sync; entries not owned by a
# particular catalogue user are attributed to pg_cat.users[0].
# NOTE(review): this assumes pg_cat.users is non-empty — confirm that
# pg_cat.sync() guarantees it.
dirgen(p: big): (ref Sys->Dir, string)
{
	case TYPE(p) {
	Qdir =>
		return (dir(Qid(QPATH(0, Qdir), 0, Sys->QTDIR), "/", big 0, pg_cat.users[0].name, 8r555, pg_cat.last_sync), nil);
	Qprocdir =>
		return (dir(Qid(QPATH(0, Qprocdir), 0, Sys->QTDIR), "proc", big 0, pg_cat.users[0].name, 8r555, pg_cat.last_sync), nil);
	Qproc =>
		id := FILENO(p);
		# same bounds test serveloopX applies before touching a procedure
		if(id >= len pg_cat.procs)
			return (nil, Enotfound);
		src := pg_cat.proc_sql(id);
		return (dir(Qid(QPATH(id, Qproc), 0, Sys->QTFILE), proc_dirname(id), big len src, pg_cat.user_name(pg_cat.procs[id].owner), 8r444, pg_cat.last_sync), nil);
	Quserprocdir =>
		sysid := FILENO(p);
		return (dir(Qid(QPATH(sysid, Quserprocdir), 0, Sys->QTDIR), "proc", big 0, pg_cat.user_name(sysid), 8r555, pg_cat.last_sync), nil);
	Quserdir =>
		return (dir(Qid(QPATH(0, Quserdir), 0, Sys->QTDIR), "user", big 0, pg_cat.users[0].name, 8r555, pg_cat.last_sync), nil);
	Quser =>
		sysid := FILENO(p);
		return (dir(Qid(QPATH(sysid, Quser), 0, Sys->QTDIR), pg_cat.user_name(sysid), big 0, pg_cat.users[0].name, 8r555, pg_cat.last_sync), nil);

	Qsysid =>
		sysid := FILENO(p);
		# the qid must carry type Qsysid; using Quser here gave the file the
		# same qid path as the user directory that contains it
		return (dir(Qid(QPATH(sysid, Qsysid), 0, Sys->QTFILE), "sysid", big len string sysid, pg_cat.users[0].name, 8r444, pg_cat.last_sync), nil);
	* =>
		err_chan <- = "dirgen *";
		return (nil, Enotfound);
	}
}

# Name-space navigator for the Styx server: answers Stat, Walk and Readdir
# requests received on navops until a nil request arrives. Each step is
# traced on err_chan.
#
# Tree served:
#	/			Qdir
#	/proc			Qprocdir	one file per catalogue procedure
#	/user			Quserdir	one directory per catalogue user
#	/user/NAME		Quser
#	/user/NAME/sysid	Qsysid
#	/user/NAME/proc		Quserprocdir	procedures owned by that user
#
# Every Stat and Walk request gets exactly one reply. A Readdir gets its
# entries followed by exactly one terminating reply with a nil entry.
navigator(navops: chan of ref Navop)
{
	while((m := <-navops) != nil){
		err_chan <-= "Navops";
		pick n := m {
		Stat =>
			err_chan <-= "Stat";
			n.reply <-= dirgen(n.path);
		Walk =>
			err_chan <-= "Walk to " + n.name;
			case TYPE(n.path) {
			Qdir =>
				err_chan <-= " From Qdir";
				case n.name {
				".." =>
					# the root is its own parent; the walk still needs a reply
					n.reply <-= dirgen(n.path);
				"proc" =>
					n.path = QPATH(0, Qprocdir);
					n.reply <-= dirgen(n.path);
				"user" =>
					n.path = QPATH(0, Quserdir);
					n.reply <-= dirgen(n.path);
				* =>
					n.reply <-= (nil, Enotfound);
				}
			Qprocdir =>
				err_chan <-= " From Qprocdir";
				case n.name {
				".." =>
					n.path = QPATH(0, Qdir);
					n.reply <-= dirgen(n.path);
				* =>
					# trace how the name splits into "name+arg,arg,..."; the
					# lists are checked first because a name without '+'
					# (or made only of '+') yields fewer than two parts
					(numbits, bits) := sys->tokenize(n.name, "+");
					if(bits != nil){
						err_chan <- = "numbits " + string numbits + " bits[0] " + hd bits;
						if(tl bits != nil){
							(numargs, args) := sys->tokenize(hd tl bits, ",");
							err_chan <- = "numargs " + string numargs;
							if(numargs > 0)
								err_chan <- = "args[0] " + hd args;
						}
					}

					# TODO: resolve the name to a procedure; for now no
					# name can be walked to from this directory
					n.reply <-= (nil, Enotfound);
				}
			Quserdir =>
				err_chan <-= " From Quserdir";
				case n.name {
				".." =>
					n.path = QPATH(0, Qdir);
					n.reply <-= dirgen(n.path);
				* =>
					(nil, sysid) := pg_cat.user_sysid(n.name);
					if(sysid > 0) {
						n.path = QPATH(sysid, Quser);
						n.reply <-= dirgen(n.path);
					} else {
						n.reply <-= (nil, Enotfound);
					}
				}
			Quser =>
				err_chan <-= " From Quser";
				case n.name {
				".." =>
					n.path = QPATH(0, Quserdir);
					n.reply <-= dirgen(n.path);
				"sysid" =>
					n.path = QPATH(FILENO(n.path), Qsysid);
					n.reply <-= dirgen(n.path);
				"proc" =>
					n.path = QPATH(FILENO(n.path), Quserprocdir);
					n.reply <-= dirgen(n.path);
				* =>
					n.reply <-= (nil, Enotfound);
				}
			Quserprocdir =>
				err_chan <-= " From Quserprocdir";
				case n.name {
				".." =>
					# back to the owning user's directory
					n.path = QPATH(FILENO(n.path), Quser);
					n.reply <-= dirgen(n.path);
				* =>
					n.reply <-= (nil, Enotfound);
				}

			* =>
				err_chan <-= " From *";
				n.reply <-= (nil, "not a directory");
			}
		Readdir =>
			err_chan <-= "Readdir";
			i := n.offset;
			notdir := 0;	# set when the path is not a directory we can list
			case TYPE(m.path) {
			Qprocdir =>
				err_chan <-= "Qprocdir";
				if(pg_cat.procs == nil)
					pg_cat.fill_procs();

				if(pg_cat.procs != nil) {
					for(; --n.count >= 0 && i < len pg_cat.procs; i++) {
						n.reply <-= dirgen(QPATH(i, Qproc));	# n² but the file will be small
					}
				}
			Quserprocdir =>
				if(pg_cat.procs == nil)
					pg_cat.fill_procs();

				if(pg_cat.procs != nil) {
					sysid := FILENO(m.path);
					# skip the first `offset' procedures owned by this user
					offset := i;
					for(k := 0 ; n.count > 0 && k < len pg_cat.procs; k++) {
						if(pg_cat.procs[k].owner == sysid) {
							if(offset == 0) {
								n.reply <-= dirgen(QPATH(k, Qproc));	# n² but the file will be small
								n.count--;
							} else {
								offset--;
							}
						}
					}
				}
			Quserdir =>
				err_chan <-= "Quserdir";
				if(pg_cat.users == nil)
					pg_cat.fill_users();

				if(pg_cat.users != nil) {
					for(; --n.count >= 0 && i < len pg_cat.users; i++) {
						n.reply <-= dirgen(QPATH(pg_cat.users[i].sysid, Quser));
					}
				}
			Quser =>
				err_chan <-= "Quser";
				# list this user's own entries, i.e. the same qid paths a
				# walk to "sysid" and "proc" from this directory arrives at
				sysid := FILENO(m.path);
				if(i == 0 && n.count > 0) {
					n.reply <-= dirgen(QPATH(sysid, Qsysid));
					i++;
					--n.count;
				}
				if(i == 1 && n.count-- > 0)
					n.reply <-= dirgen(QPATH(sysid, Quserprocdir));
			Qdir =>
				err_chan <-= "Qdir";
				if(i == 0 && n.count > 0) {
					n.reply <-= dirgen(QPATH(0,Qprocdir));
					i++;
					--n.count;
				}
				if(i == 1 && n.count-- > 0)
					n.reply <-= dirgen(QPATH(0,Quserdir));
			* =>
				err_chan <-= "Not Dir";
				notdir = 1;
			}
			# exactly one reply with a nil entry ends the listing; sending
			# the error and then a second terminator would leave this
			# thread blocked on a reply nobody is waiting for
			if(notdir)
				n.reply <-= (nil, "not a directory");
			else
				n.reply <-= (nil, nil);
		}
		err_chan <-= "Navigated";
	}
}

# Pack file number w and file type q into a qid path:
# q occupies the low 8 bits, w the bits above them.
QPATH(w, q: int): big
{
	packed := (w << 8) | q;
	return big packed;
}

# Extract the file type (low 8 bits) from a qid path.
TYPE(path: big): int
{
	low := int path;
	return low & 16rFF;
}

# Extract the file number (the 24 bits above the type byte) from a qid path.
FILENO(path: big) : int
{
	shifted := (int path) >> 8;
	return shifted & 16rFFFFFF;
}

Bell Labs OSI certified Powered by Plan 9

(Return to Plan 9 Home Page)

Copyright © 2021 Plan 9 Foundation. All Rights Reserved.
Comments to [email protected].