#include <u.h>
#include <libc.h>
#include <ctype.h>
#include "dat.h"
#include "fns.h"
/* listens on ctlfd for messages - hub commands are sent to parser */
void
ctlsrv(int ctlfd)
{
char ctlbuf[SMBUF];
char msgbuild[SMBUF];
int cn;
memset(msgbuild, 0, SMBUF);
while(masterswitch != STOP){
memset(ctlbuf, 0, SMBUF);
cn = read(ctlfd, ctlbuf, SMBUF);
if(cn == 0){
print("\nzero length read on ctlfd\n");
}
if(cn < 0){
print("\n\nctlfd read error!");
sysfatal("ctl problems!");
}
if(strncmp(ctlbuf, "h", 1) == 0){
if(rfork(RFPROC|RFMEM|RFNOWAIT) == 0){
if(verbosity==UP){
print("-hub command detected forking command parser-");
}
parsecmd(ctlbuf);
if(verbosity==UP){
print("-command handled successfully\n");
}
exits(nil);
}
continue;
}
if(strncmp(ctlbuf, "debug", 5) == 0){
print("\ndebugging output on\n");
verbosity = UP;
continue;
}
if(strncmp(ctlbuf, "shut", 4) == 0){
print("\ndebugging output off\n");
verbosity = DOWN;
continue;
}
if(strncmp(ctlbuf, "fear", 4) == 0){
print("\nPARANOIA ACTIVATED READER SPEED LOCKED to WRITERS\n");
paranoia = UP;
continue;
}
if(strncmp(ctlbuf, "calm" ,4) == 0){
print("\nreaders may proceed at will and the writers will follow\n");
paranoia = DOWN;
continue;
}
if(strncmp(ctlbuf, "quit", 4) == 0){
print("\nQUITTING\n");
for(int i=0; i < HUBNUM; i++){
sprint(msgbuild, "h%dx", i);
if(rfork(RFPROC|RFMEM|RFNOWAIT) == 0){
print("\nsending command %s", msgbuild);
parsecmd(msgbuild);
exits(nil);
}
sleep(100);
}
masterswitch = STOP;
continue;
}
print("\nunsupported control message\n");
}
print("\nexiting control server\n");
return;
}
/*
 * zerohub puts a hub back into its idle starting state:
 * name cleared, hubstate STOP, default sleeptime, all counters at zero,
 * bucket and queue sizes cleared, the input pointer at the start of the
 * bucket, and every per-fd slot reset (fds -1, pids 0, status GO).
 * Locks are only inspected, never modified: a message is printed for
 * any lock whose locked field is nonzero.
 */
void
zerohub(Hub *h)
{
	int slot;

	if(verbosity == UP)
		print("-zerohub target Hub is at %p-", h);

	/* scalar state and buffers */
	memset(h->name, 0, SMBUF);
	h->hubstate = STOP;
	h->sleeptime = sleepdefault;
	h->infdcount = 0;
	h->outfdcount = 0;
	h->inqcount = 0;
	memset(h->bucket, 0, BUCKSIZE);
	memset(h->inqsize, 0, (MAXQ * sizeof(int)));
	h->inbuckp = &h->bucket[0];

	/* per-descriptor slots */
	slot = 0;
	while(slot < MAXFDS){
		h->msgptr[slot] = 0;
		h->outq[slot] = 0;
		h->infds[slot] = -1;
		h->outfds[slot] = -1;
		h->infdpids[slot] = 0;
		h->outfdpids[slot] = 0;
		h->infdstat[slot] = GO;
		h->outfdstat[slot] = GO;
		/* name was cleared above, so it prints as an empty string here */
		if(h->wroblck[slot].locked)
			print("-%s wroblock[%d] not 0\n", h->name, slot);
		slot++;
	}

	if(h->iblck.locked || h->entwrlck.locked)
		print("-%s locks are not zero!\n", h->name);

	if(verbosity == UP)
		print("-zeroed hub-");
}
/* sets up a hub with given name and input and forks initial hubsrv */
Hub*
starthub(int hnum, char *name, char *firstin)
{
if((hnum < 0) || (hnum > HUBNUM - 1)){
print("bad hub number!\n");
return nil;
}
Hubs[hnum] = (Hub*)malloc(sizeof(Hub));
if(verbosity == UP){
print("\nmalloced Hub %d at %p\n", hnum, Hubs[hnum]);
}
zerohub(Hubs[hnum]);
strcat(Hubs[hnum]->name, name);
newin(Hubs[hnum], firstin);
Hubs[hnum]->hubstate = GO;
hubmon[hnum] = GO;
if(rfork(RFPROC|RFMEM|RFNOWAIT|RFNOTEG) == 0){
hubsrv(Hubs[hnum]);
exits(nil);
}
return Hubs[hnum];
}
/* parsecmd is what controls almost everything after the initial startup */
/* the most important actions are starting new hubs and attaching new outputs and inputs to existing ones */
/*command format h(hub#)[ceiostfkxvyz](parameter #) */
void
parsecmd(char *ctlbuf)
{
int h;
int procid;
int tmpsleep;
int tmpnum;
int offset;
int cmd;
int para;
offset = 0;
char cmdtmp[SMBUF];
char hubname[SMBUF];
memset(hubname, 0, SMBUF);
memset(cmdtmp, 0, SMBUF);
if(isdigit(*(ctlbuf+1))){
h=atoi(ctlbuf+1);
} else {
print("hub specification needs to be numeric!\n");
return;
}
if(h>9){
offset++;
}
cmd = 2 + offset;
para = 3 + offset;
if(strncmp(ctlbuf+cmd, "t", 1) == 0){
if(hubmon[h] != GO){
print("-hub inactive, skipping-");
return;
}
if(isdigit(*(ctlbuf+para))){
tmpsleep = atoi(ctlbuf+para);
} else {
print("command error: sleeptime parameter should be a number.\n");
return;
}
print("\nsetting sleeptime on hub %d to %d\n", h, tmpsleep);
Hubs[h]->sleeptime = tmpsleep;
return;
}
if(strncmp(ctlbuf+cmd, "c", 1) ==0){
if(hubmon[h] != GO){
print("-hub inactive, skipping-");
return;
}
print("\nclearing Hub %d message queue and log\n", h);
qlock(&Hubs[h]->iblck);
sleep(500);
print("-locking entwrlck-");
canqlock(&Hubs[h]->entwrlck);
Hubs[h]->inqcount = 0;
for(int i=0; i < MAXFDS; i++){
Hubs[h]->outq[i] = 0;
Hubs[h]->msgptr[i] = 0;
}
Hubs[h]->inbuckp = &Hubs[h]->bucket[0];
qunlock(&Hubs[h]->entwrlck);
qunlock(&Hubs[h]->iblck);
return;
}
if(strncmp(ctlbuf+cmd, "s", 1) == 0){
if(isdigit(*(ctlbuf+para))){
tmpnum = atoi(ctlbuf+para);
} else {
print("command error: start hub parameter should be a number.\n");
return;
}
sprint(cmdtmp, "%s/H%din%d", dirname, h, tmpnum);
sprint(hubname, "H%d", h);
print("\nmaking Hub %d %s first input %s\n", h, hubname, cmdtmp);
starthub(h, hubname, cmdtmp);
return;
}
if(strncmp(ctlbuf+cmd, "o", 1) == 0){
if(hubmon[h] != GO){
print("-hub inactive, skipping-");
return;
}
if(isdigit(*(ctlbuf+para))){
tmpnum = atoi(ctlbuf+para);
} else {
print("command error: parameter should be a number.\n");
return;
}
print("\nadding new output to hub %d\n", h);
sprint(cmdtmp, "%s/H%dout%d", dirname, h, tmpnum);
newout(Hubs[h], cmdtmp);
if((procid=rfork(RFPROC|RFMEM|RFNOWAIT|RFNOTEG)) == 0){
print("-%s forked wrproc-", Hubs[h]->name);
wrproc(Hubs[h], (Hubs[h]->outfdcount - 1), Hubs[h]->outfds[(Hubs[h]->outfdcount -1)]);
}
Hubs[h]->outfdpids[(Hubs[h]->outfdcount -1)] = procid;
return;
}
if(strncmp(ctlbuf+cmd, "e", 1) == 0){
if(hubmon[h] != GO){
print("-hub inactive, skipping-");
return;
}
if(isdigit(*(ctlbuf+para))){
tmpnum = atoi(ctlbuf+para);
} else {
print("command error: parameter should be a number.\n");
return;
}
print("\nadding new err output to hub %d\n", h);
sprint(cmdtmp, "%s/H%derr%d", dirname, h, tmpnum);
newout(Hubs[h], cmdtmp);
if((procid=rfork(RFPROC|RFMEM|RFNOWAIT|RFNOTEG)) == 0){
print("-%s forked wrproc-", Hubs[h]->name);
wrproc(Hubs[h], (Hubs[h]->outfdcount - 1), Hubs[h]->outfds[(Hubs[h]->outfdcount -1)]);
}
Hubs[h]->outfdpids[(Hubs[h]->outfdcount -1)] = procid;
return;
}
if(strncmp(ctlbuf+cmd, "i", 1) == 0){
if(hubmon[h] != GO){
print("-hub inactive, skipping-");
return;
}
if(isdigit(*(ctlbuf+para))){
tmpnum = atoi(ctlbuf+para);
} else {
print("command error: add input parameter should be a number.\n");
return;
}
print("\nadding new input to hub %d\n", h);
sprint(cmdtmp, "%s/H%din%d", dirname, h, tmpnum);
newin(Hubs[h], cmdtmp);
if((procid=rfork(RFPROC|RFMEM|RFNOWAIT|RFNOTEG)) == 0){
print("-%s forked rdproc-",Hubs[h]->name);
rdproc(Hubs[h], (Hubs[h]->infdcount - 1), Hubs[h]->infds[(Hubs[h]->infdcount - 1)]);
}
Hubs[h]->infdpids[(Hubs[h]->infdcount - 1)] = procid;
return;
}
if(strncmp(ctlbuf+cmd, "f", 1) == 0){
if(isdigit(*(ctlbuf+para))){
tmpnum=atoi(ctlbuf+para);
} else {
print("command error: extra fd release parameter should be a number\n");
return;
}
close(extrafds[tmpnum]);
print("extrafd[%d] closed \n", tmpnum);
return;
}
if(strncmp(ctlbuf+cmd, "k", 1) == 0){
if(isdigit(*(ctlbuf+para))){
tmpnum=atoi(ctlbuf+para);
} else {
print("command error: fd kill parameter should be a number\n");
return;
}
Hubs[h]->outfdstat[tmpnum] = STOP;
close(Hubs[h]->outfds[tmpnum]);
if(Hubs[h]->wroblck[tmpnum].locked ==1){
qunlock(&Hubs[h]->wroblck[tmpnum]);
}
print("H%d outfd[%d] closed\n", h, tmpnum);
return;
}
if(strncmp(ctlbuf+cmd, "x", 1) == 0){
if(hubmon[h] != GO){
print("-hub inactive, skipping-");
return;
}
print("\n\nresetting hub %d %d infds %d outfds\n", h, Hubs[h]->infdcount, Hubs[h]->outfdcount);
for(int i=0; i < Hubs[h]->infdcount; i++){
if(rfork(RFPROC|RFMEM|RFNOWAIT) == 0){
Hubs[h]->infdstat[i] = STOP;
close(Hubs[h]->infds[i]);
sleep(100);
sprint(cmdtmp, "h%dy%d", h, i);
parsecmd(cmdtmp);
sleep(100);
exits(nil);
}
}
for(int i=0; i < Hubs[h]->outfdcount; i++){
if(rfork(RFPROC|RFMEM|RFNOWAIT) == 0){
Hubs[h]->outfdstat[i] = STOP;
close(Hubs[h]->outfds[i]);
if(Hubs[h]->wroblck[i].locked == 1){
qunlock(&Hubs[h]->wroblck[i]);
}
sleep(100);
sprint(cmdtmp, "h%dz%d", h, i);
parsecmd(cmdtmp);
sleep(100);
exits(nil);
}
}
Hubs[h]->hubstate = STOP;
hubmon[h] = STOP;
if(Hubs[h]->entwrlck.locked == 1){
qunlock(&Hubs[h]->entwrlck);
}
return;
}
if(strncmp(ctlbuf+cmd, "v", 1) == 0){
print("\nH%d status:\n", h);
print("sleeptime %d infdcount %d outfdcount %d inbuckp %d inqcount %d\n", Hubs[h]->sleeptime, Hubs[h]->infdcount, Hubs[h]->outfdcount, (int)Hubs[h]->inbuckp, Hubs[h]->inqcount);
return;
}
if(strncmp(ctlbuf+cmd, "y", 1) == 0){
if(isdigit(*(ctlbuf+para))){
tmpnum=atoi(ctlbuf+para);
} else {
print("command error: fd kill parameter should be a number\n");
return;
}
sprint(cmdtmp, "/proc/%d/note", Hubs[h]->infdpids[tmpnum]);
tmpnum=open(cmdtmp, OWRITE);
if(tmpnum < 0){
print("cant open proc\n");
return;
}
fprint(tmpnum, "kill\n");
close(tmpnum);
print("sent kill to rdproc %s\n", cmdtmp);
return;
}
if(strncmp(ctlbuf+cmd, "z", 1) == 0){
if(isdigit(*(ctlbuf+para))){
tmpnum=atoi(ctlbuf+para);
} else {
print("command error: fd kill parameter should be a number\n");
return;
}
sprint(cmdtmp, "/proc/%d/note", Hubs[h]->outfdpids[tmpnum]);
tmpnum=open(cmdtmp, OWRITE);
if(tmpnum < 0){
print("cant open proc\n");
return;
}
fprint(tmpnum, "kill\n");
close(tmpnum);
print("sent kill to wrproc %s\n", cmdtmp);
return;
}
print("\nno command match found\n");
return;
}
/* Hub command mnemonics:
* t - time
* c - clear
* s - start
* o - output
* e - err
* i - input
* f - free held fd
* k - kill fd
* x - xit
* v - view
* y - kill rdproc
* z - kill wrproc
* all commands except c, x, v take a numeric parameter directly after the command letter
* SAMPLE COMMAND: h3o3 "on Hub[3] create output file descriptor pipe labeled H3out3"
*/
|