Plan 9 from Bell Labs’s /usr/web/sources/contrib/maht/limbo/pg/pgbase.b

Copyright © 2021 Plan 9 Foundation.
Distributed under the MIT License.
Download the Plan 9 distribution.



implement PgBase;

include "sys.m";
	sys: Sys;
include "bufio.m";
	bufio: Bufio;
	Iobuf: import bufio;

include "keyring.m";
	keyring: Keyring;

include "pgbase.m";




# Compile-time diagnostic switch: when non-zero, the if(debug) branches in this
# file print the messages sent and received.
debug : con 0;

# bytes_to_int: decode a big-endian two's-complement integer of 1 to 4 bytes.
#
# Returns (err, value).  err is 1, and value 0, when b is empty or longer
# than 4 bytes; otherwise err is 0 and value is the decoded number,
# sign-extended from the top bit of b[0].
bytes_to_int(b : array of byte) : (int, int)
{
	n := len b;
	# An empty array has no sign byte to inspect; report it like any other
	# unusable length instead of faulting on b[0].
	if(n == 0 || n > 4)
		return (1, 0);

	# Accumulate most-significant byte first.
	v := 0;
	for(k := 0; k < n; k++)
		v = (v << 8) | int b[k];

	# With fewer than 4 bytes the sign bit is not in bit 31 yet, so
	# sign-extend by subtracting 2^(8n).  With 4 bytes the shifts above
	# already placed the sign bit.
	if(n < 4 && (int b[0] & 16r80) != 0)
		v -= 1 << (8 * n);

	return (0, v);
}

# bytes_to_big: decode a big-endian two's-complement integer of 1 to 8 bytes.
#
# Returns (err, value).  err is 1, and value 0, when b is empty or longer
# than 8 bytes; otherwise err is 0 and value is the decoded number,
# sign-extended from the top bit of b[0].
bytes_to_big(b : array of byte) : (int, big)
{
	n := len b;
	# An empty array has no sign byte to inspect; report it like any other
	# unusable length instead of faulting on b[0].
	if(n == 0 || n > 8)
		return (1, big 0);

	# Accumulate most-significant byte first.
	v := big 0;
	for(k := 0; k < n; k++)
		v = (v << 8) | big b[k];

	# With fewer than 8 bytes the sign bit is not in bit 63 yet, so
	# sign-extend by subtracting 2^(8n).
	if(n < 8 && (int b[0] & 16r80) != 0)
		v -= (big 1) << (8 * n);

	return (0, v);
}

# big_to_bytes: encode i as a big-endian two's-complement integer occupying
# exactly `length` bytes.
#
# The bytes are taken straight from the two's-complement value, so negative
# numbers whose low byte is zero (e.g. -256) encode correctly; forming ~|i|
# and then adding one to the last byte only would lose the carry.
# When length exceeds 8 the extra leading bytes are sign fill (00 or FF).
# When length is too small for the value the high-order bytes are dropped.
big_to_bytes(i : big, length : int) : array of byte
{
	b := array[length] of byte;

	fill := byte 0;
	if(i < big 0)
		fill = byte 16rFF;

	shift := 0;
	for(k := length-1; k >= 0; k--) {
		if(shift < 64)
			b[k] = byte ((i >> shift) & big 16rFF);
		else
			b[k] = fill;	# beyond the width of a big: pure sign extension
		shift += 8;
	}

	return b;
}

# int_to_bytes: encode i as a big-endian two's-complement integer occupying
# exactly `length` bytes.
#
# The bytes are taken straight from the two's-complement value, so negative
# numbers whose low byte is zero (e.g. -256) encode correctly; forming ~|i|
# and then adding one to the last byte only would lose the carry.
# When length exceeds 4 the extra leading bytes are sign fill (00 or FF).
# When length is too small for the value the high-order bytes are dropped.
int_to_bytes(i, length : int) : array of byte
{
	b := array[length] of byte;

	fill := byte 0;
	if(i < 0)
		fill = byte 16rFF;

	shift := 0;
	for(k := length-1; k >= 0; k--) {
		if(shift < 32)
			b[k] = byte ((i >> shift) & 16rFF);
		else
			b[k] = fill;	# beyond the width of an int: pure sign extension
		shift += 8;
	}

	return b;
}

# int32_read: read 4 bytes from io and decode them with bytes_to_int.
# The count returned by io.read and the error flag from bytes_to_int are
# both ignored.
int32_read(io : ref Iobuf) : int
{
	raw := array[4] of byte;
	io.read(raw, len raw);
	(nil, n) := bytes_to_int(raw);
	return n;
}

# int32_write: encode n with int_to_bytes as 4 bytes and write them to io.
# Returns whatever io.write returns.
int32_write(io : ref Iobuf, n : int) : int
{
	encoded := int_to_bytes(n, 4);
	return io.write(encoded, len encoded);
}

# int16_read: read 2 bytes from io and decode them with bytes_to_int.
# The count returned by io.read and the error flag from bytes_to_int are
# both ignored.
int16_read(io : ref Iobuf) : int
{
	raw := array[2] of byte;
	io.read(raw, len raw);
	(nil, n) := bytes_to_int(raw);
	return n;
}

# int16_write: encode n with int_to_bytes as 2 bytes and write them to io.
# Returns whatever io.write returns.
int16_write(io : ref Iobuf, n : int) : int
{
	encoded := int_to_bytes(n, 2);
	return io.write(encoded, len encoded);
}

# chomp: read from io with gets(0), i.e. up to a zero character, and return
# the text with its final character removed.  An empty result from gets is
# returned unchanged.
chomp(io : ref Iobuf) : string
{
	line := io.gets(0);
	n := len line;
	if(n == 0)
		return line;
	return line[0:n-1];
}

# Data.extend_bytes: enlarge the buffer by n bytes, keeping existing content.
# With no buffer yet, a fresh one of n bytes is allocated and n is returned;
# otherwise the new total buffer length is returned.
Data.extend_bytes(data : self ref Data, n : int) : int
{
	old := data.bytes;
	if(old == nil) {
		data.bytes = array[n] of byte;
		return n;
	}

	grown := array[len(old) + n] of byte;
	grown[0:] = old;
	data.bytes = grown;
	return len(grown);
}

# Data.write: append txt to the buffer as UTF bytes (via sys->char2byte),
# growing the buffer when needed.  Returns the number of bytes appended and
# advances data.ptr by the same amount.  No terminator is added.
Data.write(data : self ref Data, txt : string) : int
{
	if(sys == nil)
		sys = load Sys Sys->PATH;
	if(data.bytes == nil)
		data.extend_bytes(3 * len(txt));

	bytes_written := 0;
	for(i := 0; i < len(txt); i++) {
		# Only attempt the conversion when at least 3 bytes are free;
		# otherwise treat it like a char2byte failure and grow below.
		bytes_consumed := 0;
		if(len(data.bytes) - data.ptr > 2)
			bytes_consumed = sys->char2byte(txt[i], data.bytes, data.ptr);

		if(bytes_consumed == 0)
			break;
		data.ptr += bytes_consumed;
		bytes_written += bytes_consumed;
	}

	# Retry the remainder only if the loop stopped early.  Testing the index
	# rather than the last conversion result keeps an empty txt from reading
	# an unset variable and recursing on itself.
	if(i < len(txt)) {
		data.extend_bytes(3 * (len(txt) - i));
		bytes_written += data.write(txt[i:]);
	}

	return bytes_written;
}

# Data.append: copy bytes into the buffer at data.ptr, extending the buffer
# first when the free space is too small.  Advances data.ptr and returns
# the number of bytes appended.
Data.append(data : self ref Data, bytes : array of byte) : int
{
	need := len(bytes);

	if(data.bytes == nil)
		data.extend_bytes(need);
	else {
		room := len(data.bytes) - data.ptr;
		if(room < need)
			data.extend_bytes(need - room);
	}

	data.bytes[data.ptr:] = bytes;
	data.ptr += need;
	return need;
}

# Data.md5: MD5 digest of the first data.ptr bytes of the buffer, returned
# as the text "md5" followed by 32 lower-case hex digits.
Data.md5(data : self ref Data) : string
{
	# Load the modules on first use only; sys->sprint below needs Sys even
	# if nothing else has loaded it yet.
	if(sys == nil)
		sys = load Sys Sys->PATH;
	if(keyring == nil)
		keyring = load Keyring Keyring->PATH;

	# A Data that was never written to has no buffer; hash an empty one.
	input := data.bytes;
	if(input == nil)
		input = array[0] of byte;

	digest := array[16] of byte;
	keyring->md5(input[0:data.ptr], data.ptr, digest, nil);

	digest_text := "md5";
	for(i:=0; i<16; i++)
		digest_text += sys->sprint("%02x", int(digest[i]));

	return digest_text;
}

# Data.puts: append txt followed by a zero character.  A nil txt appends
# nothing at all, not even the terminator, and returns 0.  Otherwise the
# total number of bytes appended is returned.
Data.puts(data : self ref Data, txt : string) : int
{
	if(txt == nil)
		return 0;

	n := data.write(txt);
	n += data.write("\0");
	return n;
}

# Data.put_notnil_s: append key and value with puts, but only when value is
# not nil.  Returns the number of bytes appended, 0 when the pair is skipped.
Data.put_notnil_s(data : self ref Data, key, value : string) : int
{
	if(value == nil)
		return 0;
	return data.puts(key) + data.puts(value);
}

# Data.send: write one message to io: the tag character (only when tag > 0),
# then a 4-byte length equal to data.ptr + 4, then the first data.ptr bytes
# of the buffer.  Returns the sum of the values returned by the io calls.
# The payload is skipped when there is no buffer or data.ptr exceeds it.
Data.send(data : self ref Data, io : ref Iobuf, tag : int) : int
{
	total := 0;

	if(tag > 0)
		total += io.putc(tag);

	total += int32_write(io, 4 + data.ptr);

	if(data.bytes == nil || data.ptr > len(data.bytes))
		return total;

	total += io.write(data.bytes, data.ptr);
	return total;
}

# Data.to_string: diagnostic dump of the first data.ptr bytes.
# Output: data.ptr as 4 hex bytes, "[ptr]", then rows of up to 16 bytes, each
# row prefixed by its starting offset in hex.  Bytes 32..126 are shown as the
# character itself, all others as two hex digits; each is preceded by '.'.
Data.to_string(data : self ref Data) : string
{
	header := int_to_bytes(data.ptr, 4);

	out := "";
	for(h := 0; h < 4; h++)
		out += sys->sprint("%02x ", int(header[h]));
	out += sys->sprint("[%d]", data.ptr);

	for(off := 0; off < data.ptr; off++) {
		# start a new row every 16 bytes
		if(off % 16 == 0)
			out += sys->sprint("\n%02X : ", off);

		c := int(data.bytes[off]);
		if(c >= 32 && c <= 126)
			out += sys->sprint(".%c", c);
		else
			out += sys->sprint(".%02X", c);
	}
	return out;
}

# Parameter.read: read a key and then a value from io with chomp.
# Returns 0, without reading a value, when the key is empty; 1 otherwise.
Parameter.read(p : self ref Parameter, io : ref Iobuf) : int
{
	p.key = chomp(io);
	if(len p.key != 0) {
		p.value = chomp(io);
		return 1;
	}
	return 0;
}

# Response.read: read one response field from io: a one-character code
# followed by a zero-terminated string.
#
# Returns the code, or 0 when the field list ends.  The list ends on the zero
# code byte and also on any non-positive getc result (end of file or read
# error); without the latter, read_responses would keep looping on a closed
# stream because a negative code counts as true.
Response.read(r : self ref Response, io : ref Iobuf) : int
{
	r.code = io.getc();
	if(r.code <= 0)
		return 0;

	r.data = chomp(io);
	return r.code;
}

# Recordset.to_string: multi-line diagnostic dump of a recordset: the field
# descriptions, the row count, the column count of the first row, then one
# line per row with every column value.
Recordset.to_string(r : self ref Recordset) : string
{
	str := "Recordset\n";
	str += "	Fields: " + string len r.fields + "\n";
	for(i:=0; i < len(r.fields); i++)
		str += "	" + string i + "	" + r.fields[i].to_string() + "\n";
	numrows := len r.rows;
	str += "	Rows: " + string numrows + "\n";

	# Take the column count from the first row, if there is one.  Indexing
	# with the leftover loop variable from the fields loop would pick an
	# unrelated row or run off the end of r.rows.
	numcols := 0;
	if(numrows > 0)
		numcols = len r.rows[0];
	str += "	Cols: " + string numcols + "\n";

	for(i=0; i < numrows; i++) {
		row := r.rows[i];
		str += "	" + string i + "	";
		for(j:=0; j < len row; j++) {
			str += "	";
			# Columns without a field description are hex-dumped.
			# NOTE(review): format_code 0 is labelled "ascii" by
			# Field.to_string yet is the one shown in hex here; this
			# mapping is kept as found - confirm it is intended.
			hex := 1;
			if(j < len r.fields)
				hex = r.fields[j].format_code == 0;
			if(hex)
				for(k:=0; k < len(row[j]); k++)
					str += sys->sprint(".%02X", int row[j][k]);
			else
				str += "X" + string row[j];
		}
		str += "\n";
	}
	return str;
}

# read_responses: read Response records from io until Response.read returns
# 0.  Each record is pushed on the front of the list, so the returned list
# is in reverse order of arrival.
read_responses(io : ref Iobuf) : list of ref Response
{
	collected : list of ref Response;

	for(;;) {
		r := ref Response;
		if(!r.read(io))
			break;
		collected = r :: collected;
	}
	return collected;
}

# Field.read: fill f from io by reading, in this order: a zero-terminated
# name, a 32-bit table oid, a 16-bit column attribute number, a 32-bit data
# type oid, a 16-bit data type size, a 32-bit type modifier and a 16-bit
# format code.  The order of the reads is the wire layout and must not change.
Field.read(f : self ref Field, io : ref Iobuf)
{
	f.name = chomp(io);
	f.table_oid = int32_read(io);
	f.column_attribute_number = int16_read(io);
	f.data_type_oid = int32_read(io);
	f.data_type_size = int16_read(io);
	f.type_modifier = int32_read(io);
	f.format_code = int16_read(io);
}

# Field.to_string: one-line description of a field: quoted name, table oid
# and column number, type oid with its size ("?" when the size is -1), the
# type modifier in parentheses (omitted when -1), and "ascii" for format
# code 0 or "bin" for anything else.
Field.to_string(f : self ref Field) : string
{
	fcode := "bin";
	if(f.format_code == 0)
		fcode = "ascii";

	fmod := "";
	if(f.type_modifier != -1)
		fmod = "(" + string f.type_modifier + ")";

	flen := "?";
	if(f.data_type_size != -1)
		flen = string f.data_type_size;

	return sys->sprint("\"%s\" oid:%d.%d type %d[%s]%s %s", f.name, f.table_oid, f.column_attribute_number, f.data_type_oid, flen, fmod, fcode);
}

# read_fields: read a 16-bit field count from io, then that many Field
# records, and return them as an array in arrival order.
read_fields(io : ref Iobuf) : array of ref Field
{
	n := int16_read(io);
	fields := array[n] of ref Field;
	for(i := 0; i < n; i++) {
		fields[i] = ref Field;
		fields[i].read(io);
	}
	return fields;
}

# Response.to_string: the field text, prefixed according to its code:
# 'P' or 'p' -> "position ", 'F' -> "in file ", 'L' -> ":"; any other code
# yields the text unchanged.
Response.to_string(r : self ref Response) : string
{
	if(r.code == 'P' || r.code == 'p')
		return "position " + r.data;
	if(r.code == 'F')
		return "in file " + r.data;
	if(r.code == 'L')
		return ":" + r.data;
	return r.data;
}

# Parameter.to_string: the parameter rendered as "key=value".
Parameter.to_string(p : self ref Parameter) : string
{
	text := p.key + "=";
	text += string p.value;
	return text;
}

# response_by_code: to_string() of the first response in the list whose code
# equals `code`, or nil when there is none.
response_by_code(responses : list of ref Response, code : int) : string
{
	while(responses != nil) {
		r := hd responses;
		if(r.code == code)
			return r.to_string();
		responses = tl responses;
	}
	return nil;
}

# Backend_Message.to_string: human-readable rendering of a backend message,
# chosen by the message's pick variant.
# NOTE(review): there is no arm for NoticeResponse (which deal_with_incoming
# constructs for tag 'N'), so for that variant str is returned as declared,
# without any assignment.
Backend_Message.to_string(b_msg : self ref Backend_Message) : string
{
	str : string;
	pick msg := b_msg {
	Error =>
		# Fixed-order summary of the S, C, M, F, L and R response fields ...
		str = sys->sprint("%s (%s) %s %s%s %s", response_by_code(msg.responses, 'S'), response_by_code(msg.responses, 'C'), response_by_code(msg.responses, 'M'), response_by_code(msg.responses, 'F'), response_by_code(msg.responses, 'L'), response_by_code(msg.responses, 'R') );
		h : int;
		# ... followed by each of the D, H, P, p, q, W fields that is present.
		for(t:=list of {'D', 'H', 'P', 'p', 'q', 'W'}; t != nil; t =  tl t) {
			h = hd t;
			s := response_by_code(msg.responses, h);
			if(s != nil) {
				str += sys->sprint(" (%c) %s", h, s);
			}
		}
	Authentication =>
		# auth_type selects the text; types 4 and 5 also print the salt bytes.
		case msg.auth_type {
		0 => 
			str = "AuthenticationOK";
		2 => 
			str = "Kerberos V5 requested";
		3 => 
			str = "Clear Text  requested";
		4 => 
			str = sys->sprint("Crypt requested salt %02X%02X", int(msg.salt[0]), int(msg.salt[1]));
		5 => 
			str = sys->sprint("MD5 requested salt %02X%02X%02X%02X", int(msg.salt[0]), int(msg.salt[1]), int(msg.salt[2]), int(msg.salt[3]));
		6 => 
			str = "SCM Credential requested";
		* =>
			str = "Unknown Auth Type";
		}
	ReadyForQuery =>
		str = "ReadyForQuery";
	RowDescription =>
		# One line per field, using Field.to_string.
		str = sys->sprint("Row Description : %d columns\n", len(msg.fields));
		for(i:=0; i < len(msg.fields); i++) {
			str += sys->sprint("	%s\n", msg.fields[i].to_string());
		}
	DataRow =>
		# Each column is either "NULL" or a Data.to_string dump of its bytes.
		str = sys->sprint("Data Row column(s):%d \n", len(msg.columns));
		for(i:=0; i<len(msg.columns); i++) {
			str += sys->sprint("Column %02d\n", i);
			if(msg.columns[i] == nil) {
				str += "NULL\n";
			} else {
				d := ref Data;
				d.bytes = msg.columns[i];
				d.ptr = len(d.bytes);
				str += sys->sprint("%s\n", d.to_string());
			}
		}
	CopyData =>
		str = "Copy Data\n";
		if(msg.data == nil)
			str += "NULL";
		else {
			d := ref Data(msg.data, len msg.data);
			str += d.to_string();
		}
	CopyDone =>
		str = "Copy Done";
	CopyInResponse => # G
		str = sys->sprint("Copy In Response copy format:%d columns:%d\n", int msg.copy_format, len msg.format_codes);
		for(i:=0; i<len(msg.format_codes); i++)
			str += sys->sprint("Column %02d: %d\n", i, msg.format_codes[i]);
	CopyOutResponse => # H
		str = sys->sprint("Copy Out Response copy format:%d columns:%d\n", int msg.copy_format, len msg.format_codes);
		for(i:=0; i<len(msg.format_codes); i++)
			str += sys->sprint("Column %02d: %d\n", i, msg.format_codes[i]);
	CommandComplete => # C
		str = msg.cmd + " oid:" + string msg.oid + " " + string msg.rows + " row(s)";
	ParseComplete => # 1
		str = "Parse Complete";
	BindComplete => # 2
		str = "Bind Complete";
	CloseComplete => # 3
		str = "Close Complete";
	PortalSuspended => # s
		str = "Portal Suspended";
	EmptyQueryResponse => # l
		str = "Empty Query Response";
	NoData =>
		str = "No Data";
	ParameterDescription =>
		str = sys->sprint("Parameters %d\n", len(msg.oids));
		for(i:=0; i<len(msg.oids); i++)
			str += sys->sprint("	%d oid: %d\n", i, msg.oids[i]);
	Unknown =>
		# Unrecognised tag: show the tag character and a dump of the raw body.
		str = sys->sprint("Unknown TAG %c:\n", msg.tag);
		d := ref Data(nil, 0);
		d.append(msg.data);
		str += sys->sprint("%s\n", d.to_string());
	}

	return str;
}

# Connection.connect: dial tcp!ip!port, create the rx, tx and notices
# channels, start the three worker threads, send a StartupMessage, and wait
# until a message tagged 'Z' (or nil) arrives on rx.
# Returns 0 when the dial fails, 1 otherwise.
Connection.connect(connection: self ref Connection, ip, port, options, parameters : string) : int
{
	sys = load Sys Sys->PATH;
	bufio = load Bufio Bufio->PATH;

	(ok, conn) := sys->dial(sys->sprint("tcp!%s!%s", ip, port), nil);
	if(ok == -1)
		return 0;

	connection.status = 1;
	connection.fd = conn.dfd;

	connection.rx = chan of ref Backend_Message;
	connection.tx = chan of ref Frontend_Message;
	connection.notices = chan of ref Backend_Message;

	# the workers share the channels created above
	spawn deal_with_notices(connection);
	spawn deal_with_incoming(connection);
	spawn deal_with_outgoing(connection);

	connection.tx <-= ref Frontend_Message.StartupMessage(0, 196608, connection.user, connection.database, options, parameters);

	# discard everything up to the first 'Z'
	msg := <- connection.rx;
	while(msg != nil && msg.tag != 'Z')
		msg = <- connection.rx;
	return 1;
}

# Connection.set_parameter: record a parameter on the connection.  If an
# entry with the same key is already in c.parameters its value is replaced;
# otherwise p is pushed on the front of the list.
Connection.set_parameter(c: self ref Connection, p : ref Parameter)
{
	for(t := c.parameters; t != nil; t = tl t) {
		existing := hd t;
		if(existing.key == p.key) {
			existing.value = p.value;
			return;
		}
	}
	c.parameters = p :: c.parameters;
}

# Connection.query: send sql as a 'Q' message and collect the reply into a
# Recordset.  Messages are read from rx until one tagged 'Z': a
# RowDescription supplies the fields and every DataRow adds a row.  Rows are
# returned in arrival order.
Connection.query(c: self ref Connection, sql : string) : ref Recordset
{
	c.tx <-= ref Frontend_Message.Query('Q', sql);
	result := ref Recordset(nil, nil);

	# newest row first while collecting
	collected : list of array of array of byte = nil;

	msg := <- c.rx;
	while(msg.tag != 'Z') {
		pick m := msg {
		RowDescription =>
			result.fields = m.fields;
		DataRow =>
			collected = m.columns :: collected;
		}
		msg = <- c.rx;
	}

	# fill the array from the back to restore arrival order
	n := len(collected);
	result.rows = array[n] of array of array of byte;
	while(n > 0) {
		n--;
		result.rows[n] = hd collected;
		collected = tl collected;
	}

	return result;
}

# Connection.parse: send a Parse message for the named statement followed by
# Sync, then consume the reply.
#
# Returns 1 when a ParseComplete ('1') was seen and no Error ('E') arrived
# before the closing ReadyForQuery ('Z'); 0 otherwise.
#
# The reply is always read through to 'Z'.  Returning as soon as an Error
# arrives would leave the 'Z' that answers the Sync queued on rx, where the
# next operation would mistake it for the end of its own reply.  The one
# exception is an Error whose severity is FATAL or PANIC: the function
# returns at once rather than wait for a 'Z' (presumably none will follow
# because the session ends - TODO confirm against the server behaviour).
# Messages other than '1', 'E' and 'Z' are passed to c.notices.
Connection.parse(c: self ref Connection, name, query : string, data_type_oids : array of int) : int
{
	c.tx <-= ref Frontend_Message.Parse('P', name, query, data_type_oids);
	c.tx <-= ref Frontend_Message.Sync('S');

	parsed := 0;
	failed := 0;
	for(m := <- c.rx; m.tag != 'Z'; m = <- c.rx) {
		if(m.tag == 'E') {
			failed = 1;
			pick e := m {
			Error =>
				severity := response_by_code(e.responses, 'S');
				if(severity == "FATAL" || severity == "PANIC")
					return 0;
			}
		} else if(m.tag == '1')
			parsed = 1;
		else
			c.notices <-= m;
	}

	return parsed && !failed;
}

# Connection.describe: queue a Describe ('D') message for the given item type
# and name on the tx channel.  No reply is read here.
Connection.describe(c: self ref Connection, item_type : byte, name : string)
{
	request := ref Frontend_Message.Describe('D', item_type, name);
	c.tx <-= request;
}

# Connection.execute: bind the named statement to `portal`, describe it, run
# it, and return the rows as a Recordset (in arrival order).
#
# Steps, each waiting for its reply on rx:
#   1. Bind + Flush, read until BindComplete ('2') or Error ('E').
#   2. Describe(statement) + Flush, read 't'/'T' messages; the 'T'
#      (RowDescription) supplies the fields.
#   3. Execute + Sync, read DataRow messages until 'C', 'E' or 'Z'.
# Returns nil when step 2 does not end with a 'T' message.
#
# NOTE(review): an 'E' in step 1 is not acted on, nil is returned from step 2
# without sending Sync, and step 3 may stop on 'E' before the closing 'Z';
# each of these can leave replies queued for the next caller.
Connection.execute(c: self ref Connection, portal, name : string, parameter_format_codes : array of int, parameters : array of array of byte, result_format_codes : array of int, rows_to_return : int) : ref Recordset
{
	recordset := ref Recordset(nil, nil);

	# Pass the caller's format codes through; only substitute an empty array
	# when none were given.  (Leaving pfc unset in the non-nil case would
	# silently drop the caller's codes from the Bind message.)
	pfc := parameter_format_codes;
	if(pfc == nil)
		pfc = array[0] of int;

	c.tx <-= ref Frontend_Message.Bind('B', portal, name, pfc, parameters, result_format_codes);
	c.tx <-= ref Frontend_Message.Flush('H');

	for(msg := <- c.rx; msg.tag != '2' && msg.tag != 'E'; msg = <-c.rx) {
		pick m := msg {
		RowDescription =>
			recordset.fields = m.fields;
		}
	}

	c.tx <-= ref Frontend_Message.Describe('D', byte 'S', name);
	c.tx <-= ref Frontend_Message.Flush('H');

	# a 't' followed by a 'T'
	for(msg = <- c.rx; msg.tag == 't' || msg.tag == 'T'; msg = <-c.rx) {
		pick m := msg {
		RowDescription =>
			recordset.fields = m.fields;
		}
		if(msg.tag == 'T')
			break;
	}

	if(msg.tag != 'T') return nil;

	c.tx <-= ref Frontend_Message.Execute('E', portal, rows_to_return);
	c.tx <-= ref Frontend_Message.Sync('S');

	# rows are collected newest-first ...
	rows : list of array of array of byte = nil;
	for(msg = <- c.rx; msg.tag != 'C' && msg.tag != 'E' && msg.tag != 'Z'; msg = <-c.rx) {
		pick m := msg {
		DataRow =>
			rows = m.columns :: rows;
		}
	}
	recordset.rows = array[len(rows)] of array of array of byte;

	# ... and stored from the back to restore arrival order
	i := len(rows) -1;
	while(rows != nil) {
		recordset.rows[i--] = hd rows;
		rows = tl rows;
	}

	return recordset;
}

# Connection.disconnect: mark the connection closed, send a Terminate
# message, then send nil on tx and on notices, which ends the loops in
# deal_with_outgoing and deal_with_notices.
#
# The Terminate message is built with its 'X' tag, like every other message
# in this file that is constructed with an explicit tag; Data.send only
# emits a type byte when the tag is greater than zero, so an unset tag would
# put a malformed message on the wire.
# Must not be called from the thread that reads c.notices: the final send
# would then have no receiver.
Connection.disconnect(c: self ref Connection)
{
	c.status = 0;
	c.tx <-= ref Frontend_Message.Terminate('X');
	c.tx <-= nil;
	c.notices <-= nil;
}

# peek_at_data: debugging aid.  Seeks back 5 bytes to re-read the tag, skips
# 4 bytes, reads up to `size` bytes, seeks back over what was read, and
# prints the tag with a dump of those bytes.
peek_at_data(io : ref Iobuf, size : int)
{
	dump := ref Data(array[size] of byte, size);

	# back over the 4-byte length and the tag, then read the tag again
	io.seek(big (-5), bufio->SEEKRELA);
	tag := io.getc();

	# skip the length, look at the body, then rewind over it
	io.seek(big 4, bufio->SEEKRELA);
	dump.ptr = io.read(dump.bytes, size);
	io.seek(big (-dump.ptr), bufio->SEEKRELA);

	sys->print("RX '%c' %s\n", tag, dump.to_string());
}

# deal_with_outgoing: writer thread of a connection.
#
# Opens c.fd for writing, then serialises every Frontend_Message received on
# c.tx into a Data buffer and writes it with Data.send, flushing after each
# message.  A nil message ends the loop.
#
# The payload and tag are reset for every message, so a variant without an
# arm below is skipped instead of re-sending the previous message's bytes.
# String fields are always written with their zero terminator, including
# when the string is nil (Data.puts writes nothing at all for nil).
deal_with_outgoing(c : ref Connection)
{
	Iobuf : import bufio;
	d : ref Data;
	tag : int;

	io := bufio->fopen(c.fd, Bufio->OWRITE);

	for(f_msg := <- c.tx; f_msg != nil; f_msg = <- c.tx) {
		d = nil;
		tag = 0;

		pick msg := f_msg {
		StartupMessage =>
			tag = msg.tag;
			# without a user name nothing is sent and this thread ends
			if(msg.user == nil)
				return;
			# payload starts with the 4-byte value 16r00030000
			d = ref Data(int_to_bytes(16r00030000, 4), 4);
			d.put_notnil_s("user", msg.user);
			d.put_notnil_s("database", msg.database);
			d.put_notnil_s("options", msg.options);
			d.puts(msg.parameters);
			d.write("\0");
		PasswordMessage =>
			tag = msg.tag;
			d = ref Data(nil, 0);
			if(msg.password == nil)
				d.write("\0");
			else
				d.puts(msg.password);
		Query =>
			tag = msg.tag;
			d = ref Data(nil, 0);
			if(msg.sql == nil)
				d.write("\0");
			else
				d.puts(msg.sql);
		CopyData =>
			tag = msg.tag;
			if(msg.data == nil)
				d = ref Data(nil, 0);
			else
				d = ref Data(msg.data, len msg.data);
		CopyFail =>
			tag = msg.tag;
			d = ref Data(nil, 0);
		CopyDone =>
			tag = msg.tag;
			d = ref Data(nil, 0);
		Parse =>
			tag = msg.tag;
			d = ref Data(nil, 0);
			if(msg.name == nil)
				d.write("\0");
			else
				d.puts(msg.name);
			if(msg.query == nil)
				d.write("\0");
			else
				d.puts(msg.query);
			# 16-bit count, then one 32-bit oid per parameter type
			d.append(int_to_bytes(len msg.data_type_oids, 2));
			for(i:= 0; i < len(msg.data_type_oids); i++)
				d.append(int_to_bytes(msg.data_type_oids[i], 4));
		Bind =>
			tag = msg.tag;
			d = ref Data(nil, 0);
			if(msg.portal == "")
				d.write("\0");
			else
				d.puts(msg.portal);

			if(msg.name == "")
				d.write("\0");
			else
				d.puts(msg.name);
			# parameter format codes: 16-bit count + 16-bit codes
			d.append(int_to_bytes(len msg.parameter_format_codes, 2));
			i : int;
			for(i = 0; i < len msg.parameter_format_codes; i++)
				d.append(int_to_bytes(msg.parameter_format_codes[i], 2));
			# parameter values: 16-bit count, then for each a 32-bit
			# length (-1 for a nil value) followed by the bytes
			d.append(int_to_bytes(len msg.parameters, 2));
			for(i = 0; i < len msg.parameters; i++) {
				if(msg.parameters[i] == nil)
					d.append(int_to_bytes(-1, 4));
				else {
					d.append(int_to_bytes(len msg.parameters[i], 4));
					d.append(msg.parameters[i]);
				}
			}
			# result format codes: 16-bit count + 16-bit codes
			d.append(int_to_bytes(len msg.result_format_codes, 2));
			for(i = 0; i < len msg.result_format_codes; i++)
				d.append(int_to_bytes(msg.result_format_codes[i], 2));
		Execute =>
			tag = msg.tag;
			d = ref Data(nil, 0);
			if(msg.portal == nil)
				d.write("\0");
			else
				d.puts(msg.portal);
			d.append(int_to_bytes(msg.rows_to_return, 4));
		Sync =>
			tag = msg.tag;
			d = ref Data(nil, 0);
		Describe =>
			tag = msg.tag;
			# first payload byte is the item type, then the name
			d = ref Data(array[1] of byte, 1);
			d.bytes[0] = msg.item_type;
			if(msg.name == nil)
				d.write("\0");
			else
				d.puts(msg.name);
		Close =>
			tag = msg.tag;
			# first payload byte is the item type, then the name
			d = ref Data(array[1] of byte, 1);
			d.bytes[0] = msg.item_type;
			if(msg.name == nil)
				d.write("\0");
			else
				d.puts(msg.name);
		Flush =>
			tag = msg.tag;
			d = ref Data(nil, 0);
		Terminate =>
			tag = msg.tag;
			d = ref Data(nil, 0);
		}

		if(d != nil) {
			written := d.send(io, tag);
			io.flush();

			if(debug)
				sys->print("TX %d '%c': %s\n\n", written, tag, d.to_string());
		} else {
			if(debug)
				sys->print("Outgoing data was nil, not sent\n");
		}
	}
}

# deal_with_incoming: reader thread of a connection.
#
# Opens c.fd for reading and loops while getc yields a positive tag and
# c.status is positive.  For each message it reads the 4-byte length, decodes
# the body according to the tag, and forwards the resulting Backend_Message:
#   - 'E' and 'N' go to c.notices and, like every non-nil msg, to c.rx;
#   - 'S' updates c.parameters and 'K' stores process_id/key; neither
#     produces a message;
#   - 'C' is sent to c.notices only (msg is reset to nil afterwards);
#   - 'R' answers auth types 3 and 5 itself by queuing a PasswordMessage on
#     c.tx before forwarding the Authentication message.
# NOTE(review): the results of io.read are not checked, and `size` is used
# only by the 'd' and default arms, so bytes an arm does not consume remain
# in the stream.
deal_with_incoming(c : ref Connection) 
{
	Iobuf : import bufio;
	msg : ref Backend_Message;
	size : int;

	io := bufio->fopen(c.fd, Bufio->OREAD);
	for(tag := io.getc(); tag > 0 && c.status > 0; tag = io.getc()) {
		size = int32_read(io) - 4; # includes self

		if(debug)
			peek_at_data(io, size);

		msg = nil;
		case tag  {
		'E' =>
			msg = ref Backend_Message.Error(tag, read_responses(io));
			c.notices <-= msg;
		'N' =>
			msg = ref Backend_Message.NoticeResponse(tag, read_responses(io));
			c.notices <-= msg;
		'R' =>
			# authentication request: 32-bit type, optionally followed by a salt
			auth_type := int32_read(io);
			salt : array of byte = nil;
			case auth_type {
			3 =>
				# type 3: reply with the password as-is
				c.tx <-= ref Frontend_Message.PasswordMessage('p', c.password);
			4 =>
				# type 4: 2-byte salt is read but no reply is sent here
				salt = array[2] of byte;
				io.read(salt, 2);
			5 => # 'md5' || md5(md5(password || username) || salt)
				salt = array[4] of byte;
				io.read(salt, 4);
				d := ref Data(nil, 0);
				d.write(c.password);
				d.write(c.user);
				# [3:] drops the "md5" prefix that Data.md5 adds
				un_pw_digest := d.md5()[3:];
				d = ref Data(nil, 0);
				d.write(un_pw_digest);
				d.append(salt);
				c.tx <-= ref Frontend_Message.PasswordMessage('p', d.md5());
			}
			msg = ref Backend_Message.Authentication(tag, auth_type, salt);
		'S' =>
			# key/value pair; stored on the connection, nothing forwarded
			p := ref Parameter;
			if(p.read(io) > 0)
				c.set_parameter(p);
		'K' =>
			# two 32-bit values stored as process_id and key
			c.process_id = int32_read(io);
			c.key = int32_read(io);
		'Z' =>
			msg = ref Backend_Message.ReadyForQuery(tag,io.getc());
		'T' =>
			msg = ref Backend_Message.RowDescription(tag,read_fields(io));
		'D' =>
			# 16-bit column count, then per column a 32-bit size and that
			# many bytes; a negative size yields a nil column
			num_cols := int16_read(io);
			columns := array[num_cols] of array of byte;
			for(i:= 0; i<num_cols; i++) {
				data_size := int32_read(io);
				if(data_size >= 0) {
					columns[i] = array[data_size] of byte;
					if(data_size > 0)
						io.read(columns[i], data_size);
				} else {
					columns[i] = nil;
				}
			}
			msg = ref Backend_Message.DataRow(tag, columns);
		'd' =>
			
			# whole body (size bytes) is the copy data
			data := array[size] of byte;
			io.read(data, size);
			msg = ref Backend_Message.CopyData(tag, data);
		'c' =>
			msg = ref Backend_Message.CopyDone(tag);
		'G' =>
			# one-byte copy format, 16-bit column count, 16-bit code per column
			copy_format := byte io.getc();
			column_count := int16_read(io);
			format_codes := array[column_count] of int;
			for(i := 0; i < column_count; i++)
				format_codes[i] = int16_read(io);
			msg = ref Backend_Message.CopyInResponse(tag, copy_format, format_codes);
		'H' =>
			# same layout as 'G'
			copy_format := byte io.getc();
			column_count := int16_read(io);
			format_codes := array[column_count] of int;
			for(i := 0; i < column_count; i++)
				format_codes[i] = int16_read(io);
			msg = ref Backend_Message.CopyOutResponse(tag, copy_format, format_codes);
			
		'C' =>
			oid, rows : int;
			
			# The body is a space-separated string.  With 3 words they are
			# taken as command, oid, rows; with 2 as command, rows.
			# NOTE(review): hd words faults if the string has no words.
			(numwords, words) := sys->tokenize(chomp(io), " ");
			cmd := hd words;
			words = tl words;
			case numwords {
			3 =>
				oid = int hd words;
				rows = int hd tl words;
			2 =>
				oid = 0;
				rows = int hd words;
			* =>
				oid = 0;
				rows = 0;
			}

			# delivered on notices only; clearing msg keeps it off rx
			msg = ref Backend_Message.CommandComplete(tag, cmd, oid, rows);
			c.notices <-= msg;
			msg = nil;
		'1' =>
			msg = ref Backend_Message.ParseComplete(tag);
		'2' =>
			msg = ref Backend_Message.BindComplete(tag);
		'3' =>
			msg = ref Backend_Message.CloseComplete(tag);
		'I' =>
			msg = ref Backend_Message.EmptyQueryResponse(tag);
		's' =>
			msg = ref Backend_Message.PortalSuspended(tag);
		'n' =>
			msg = ref Backend_Message.NoData(tag);
		't' =>
			# 16-bit count followed by that many 32-bit oids
			oid_count := int16_read(io);
			oids := array[oid_count] of int;
			for(i := 0; i < oid_count; i++)
				oids[i] = int32_read(io);
			msg = ref Backend_Message.ParameterDescription(tag, oids);
		* =>
			# unrecognised tag: keep the raw body
			data := array[size] of byte;
			io.read(data, size);
			msg = ref Backend_Message.Unknown(tag, data);
		}

		if(msg != nil) {
			if(debug)
				sys->print("%s\n\n", msg.to_string());
			c.rx <-= msg;
		}
	}
}

# disconnect_in_background: run Connection.disconnect in a thread of its own.
# Used by deal_with_notices, which must stay free to receive the nil that
# disconnect sends on c.notices.
disconnect_in_background(c : ref Connection)
{
	c.disconnect();
}

# deal_with_notices: thread that drains c.notices until it receives nil.
#
# An Error whose severity field ('S') is "FATAL" triggers a disconnect.  The
# disconnect is spawned rather than called: Connection.disconnect finishes by
# sending nil on c.notices, and this thread is the receiver of that channel,
# so calling it directly would block this thread on its own channel.  The
# nil sent by the spawned disconnect then ends the loop below.
# With debug set, every message is also printed to standard error.
deal_with_notices(c : ref Connection)
{
	stderr := sys->fildes(2);

	for(msg := <-c.notices; msg != nil; msg = <-c.notices) {
		pick m:= msg {
		Error =>
			if(response_by_code(m.responses, 'S') == "FATAL")
				spawn disconnect_in_background(c);
		}
		if(debug)
			sys->fprint(stderr, "%s\n", msg.to_string());
	}
}

Bell Labs OSI certified Powered by Plan 9

(Return to Plan 9 Home Page)

Copyright © 2021 Plan 9 Foundation. All Rights Reserved.
Comments to [email protected].