#ifdef PLAN9PORT
#include <u.h>
#include <signal.h>
#endif
#include "stdinc.h"
#include <bio.h>
#include "dat.h"
#include "fns.h"
#include "whack.h"
typedef struct Allocs Allocs;
struct Allocs {
u32int mem;
u32int bcmem;
u32int icmem;
u32int stfree; /* free memory at start */
uint mempcnt;
};
int debug;
int nofork;
int mainstacksize = 256*1024;
VtSrv *ventisrv;
static void ventiserver(void*);
static ulong
freemem(void)
{
int nf, pgsize = 0;
uvlong size, userpgs = 0, userused = 0;
char *ln, *sl;
char *fields[2];
Biobuf *bp;
size = 64*1024*1024;
bp = Bopen("#c/swap", OREAD);
if (bp != nil) {
while ((ln = Brdline(bp, '\n')) != nil) {
ln[Blinelen(bp)-1] = '\0';
nf = tokenize(ln, fields, nelem(fields));
if (nf != 2)
continue;
if (strcmp(fields[1], "pagesize") == 0)
pgsize = atoi(fields[0]);
else if (strcmp(fields[1], "user") == 0) {
sl = strchr(fields[0], '/');
if (sl == nil)
continue;
userpgs = atoll(sl+1);
userused = atoll(fields[0]);
}
}
Bterm(bp);
if (pgsize > 0 && userpgs > 0 && userused > 0)
size = (userpgs - userused) * pgsize;
}
/* cap it to keep the size within 32 bits */
if (size >= 3840UL * 1024 * 1024)
size = 3840UL * 1024 * 1024;
return size;
}
static void
allocminima(Allocs *all) /* enforce minima for sanity */
{
if (all->icmem < 6 * 1024 * 1024)
all->icmem = 6 * 1024 * 1024;
if (all->mem < 1024 * 1024 || all->mem == Unspecified) /* lumps */
all->mem = 1024 * 1024;
if (all->bcmem < 2 * 1024 * 1024)
all->bcmem = 2 * 1024 * 1024;
}
/* automatic memory allocations sizing per venti(8) guidelines */
static Allocs
allocbypcnt(u32int mempcnt, u32int stfree)
{
u32int avail;
vlong blmsize;
Allocs all;
static u32int free;
all.mem = Unspecified;
all.bcmem = all.icmem = 0;
all.mempcnt = mempcnt;
all.stfree = stfree;
if (free == 0)
free = freemem();
blmsize = stfree - free;
if (blmsize <= 0)
blmsize = 0;
avail = ((vlong)stfree * mempcnt) / 100;
if (blmsize >= avail || (avail -= blmsize) <= (1 + 2 + 6) * 1024 * 1024)
fprint(2, "%s: bloom filter bigger than mem pcnt; "
"resorting to minimum values (9MB total)\n", argv0);
else {
if (avail >= 3840UL * 1024 * 1024)
avail = 3840UL * 1024 * 1024; /* sanity */
avail /= 2;
all.icmem = avail;
avail /= 3;
all.mem = avail;
all.bcmem = 2 * avail;
}
return all;
}
/*
* we compute default values for allocations,
* which can be overridden by (in order):
* configuration file parameters,
* command-line options other than -m, and -m.
*/
static Allocs
sizeallocs(Allocs opt, Config *cfg)
{
Allocs all;
/* work out sane defaults */
all = allocbypcnt(20, opt.stfree);
/* config file parameters override */
if (cfg->mem && cfg->mem != Unspecified)
all.mem = cfg->mem;
if (cfg->bcmem)
all.bcmem = cfg->bcmem;
if (cfg->icmem)
all.icmem = cfg->icmem;
/* command-line options override */
if (opt.mem && opt.mem != Unspecified)
all.mem = opt.mem;
if (opt.bcmem)
all.bcmem = opt.bcmem;
if (opt.icmem)
all.icmem = opt.icmem;
/* automatic memory sizing? */
if(opt.mempcnt > 0)
all = allocbypcnt(opt.mempcnt, opt.stfree);
allocminima(&all);
return all;
}
void
usage(void)
{
fprint(2, "usage: venti [-Ldrsw] [-a ventiaddr] [-c config] "
"[-h httpaddr] [-m %%mem] [-B blockcachesize] [-C cachesize] [-I icachesize] "
"[-W webroot]\n");
threadexitsall("usage");
}
void
threadmain(int argc, char *argv[])
{
char *configfile, *haddr, *vaddr, *webroot;
u32int mem, icmem, bcmem, minbcmem, mempcnt, stfree;
Allocs allocs;
Config config;
traceinit();
threadsetname("main");
mempcnt = 0;
vaddr = nil;
haddr = nil;
configfile = nil;
webroot = nil;
mem = Unspecified;
icmem = 0;
bcmem = 0;
ARGBEGIN{
case 'a':
vaddr = EARGF(usage());
break;
case 'B':
bcmem = unittoull(EARGF(usage()));
break;
case 'c':
configfile = EARGF(usage());
break;
case 'C':
mem = unittoull(EARGF(usage()));
break;
case 'D':
settrace(EARGF(usage()));
break;
case 'd':
debug = 1;
nofork = 1;
break;
case 'h':
haddr = EARGF(usage());
break;
case 'm':
mempcnt = atoi(EARGF(usage()));
if (mempcnt <= 0 || mempcnt >= 100)
usage();
break;
case 'I':
icmem = unittoull(EARGF(usage()));
break;
case 'L':
ventilogging = 1;
break;
case 'r':
readonly = 1;
break;
case 's':
nofork = 1;
break;
case 'w': /* compatibility with old venti */
queuewrites = 1;
break;
case 'W':
webroot = EARGF(usage());
break;
default:
usage();
}ARGEND
if(argc)
usage();
if(!nofork)
rfork(RFNOTEG);
#ifdef PLAN9PORT
{
/* sigh - needed to avoid signals when writing to hungup networks */
struct sigaction sa;
memset(&sa, 0, sizeof sa);
sa.sa_handler = SIG_IGN;
sigaction(SIGPIPE, &sa, nil);
}
#endif
ventifmtinstall();
trace(TraceQuiet, "venti started");
fprint(2, "%T venti: ");
if(configfile == nil)
configfile = "venti.conf";
/* remember free memory before initventi & loadbloom, for auto-sizing */
stfree = freemem();
fprint(2, "conf...");
if(initventi(configfile, &config) < 0)
sysfatal("can't init server: %r");
/*
* load bloom filter
*/
if(mainindex->bloom && loadbloom(mainindex->bloom) < 0)
sysfatal("can't load bloom filter: %r");
/*
* size memory allocations; assumes bloom filter is loaded
*/
allocs = sizeallocs((Allocs){mem, bcmem, icmem, stfree, mempcnt},
&config);
mem = allocs.mem;
bcmem = allocs.bcmem;
icmem = allocs.icmem;
fprint(2, "%s: mem %,ud bcmem %,ud icmem %,ud...",
argv0, mem, bcmem, icmem);
/*
* default other configuration-file parameters
*/
if(haddr == nil)
haddr = config.haddr;
if(vaddr == nil)
vaddr = config.vaddr;
if(vaddr == nil)
vaddr = "tcp!*!venti";
if(webroot == nil)
webroot = config.webroot;
if(queuewrites == 0)
queuewrites = config.queuewrites;
if(haddr){
fprint(2, "httpd %s...", haddr);
if(httpdinit(haddr, webroot) < 0)
fprint(2, "warning: can't start http server: %r");
}
fprint(2, "init...");
/*
* lump cache
*/
if(0) fprint(2, "initialize %d bytes of lump cache for %d lumps\n",
mem, mem / (8 * 1024));
initlumpcache(mem, mem / (8 * 1024));
/*
* index cache
*/
initicache(icmem);
initicachewrite();
/*
* block cache: need a block for every arena and every process
*/
minbcmem = maxblocksize *
(mainindex->narenas + mainindex->nsects*4 + 16);
if(bcmem < minbcmem)
bcmem = minbcmem;
if(0) fprint(2, "initialize %d bytes of disk block cache\n", bcmem);
initdcache(bcmem);
if(mainindex->bloom)
startbloomproc(mainindex->bloom);
fprint(2, "sync...");
if(!readonly && syncindex(mainindex) < 0)
sysfatal("can't sync server: %r");
if(!readonly && queuewrites){
fprint(2, "queue...");
if(initlumpqueues(mainindex->nsects) < 0){
fprint(2, "can't initialize lump queues,"
" disabling write queueing: %r");
queuewrites = 0;
}
}
if(initarenasum() < 0)
fprint(2, "warning: can't initialize arena summing process: %r");
fprint(2, "announce %s...", vaddr);
ventisrv = vtlisten(vaddr);
if(ventisrv == nil)
sysfatal("can't announce %s: %r", vaddr);
fprint(2, "serving.\n");
if(nofork)
ventiserver(nil);
else
vtproc(ventiserver, nil);
threadexits(nil);
}
static void
vtrerror(VtReq *r, char *error)
{
r->rx.msgtype = VtRerror;
r->rx.error = estrdup(error);
}
static void
ventiserver(void *v)
{
Packet *p;
VtReq *r;
char err[ERRMAX];
uint ms;
int cached, ok;
USED(v);
threadsetname("ventiserver");
trace(TraceWork, "start");
while((r = vtgetreq(ventisrv)) != nil){
trace(TraceWork, "finish");
trace(TraceWork, "start request %F", &r->tx);
trace(TraceRpc, "<- %F", &r->tx);
r->rx.msgtype = r->tx.msgtype+1;
addstat(StatRpcTotal, 1);
if(0) print("req (arenas[0]=%p sects[0]=%p) %F\n",
mainindex->arenas[0], mainindex->sects[0], &r->tx);
switch(r->tx.msgtype){
default:
vtrerror(r, "unknown request");
break;
case VtTread:
ms = msec();
r->rx.data = readlump(r->tx.score, r->tx.blocktype, r->tx.count, &cached);
ms = msec() - ms;
addstat2(StatRpcRead, 1, StatRpcReadTime, ms);
if(r->rx.data == nil){
addstat(StatRpcReadFail, 1);
rerrstr(err, sizeof err);
vtrerror(r, err);
}else{
addstat(StatRpcReadBytes, packetsize(r->rx.data));
addstat(StatRpcReadOk, 1);
if(cached)
addstat2(StatRpcReadCached, 1, StatRpcReadCachedTime, ms);
else
addstat2(StatRpcReadUncached, 1, StatRpcReadUncachedTime, ms);
}
break;
case VtTwrite:
if(readonly){
vtrerror(r, "read only");
break;
}
p = r->tx.data;
r->tx.data = nil;
addstat(StatRpcWriteBytes, packetsize(p));
ms = msec();
ok = writelump(p, r->rx.score, r->tx.blocktype, 0, ms);
ms = msec() - ms;
addstat2(StatRpcWrite, 1, StatRpcWriteTime, ms);
if(ok < 0){
addstat(StatRpcWriteFail, 1);
rerrstr(err, sizeof err);
vtrerror(r, err);
}
break;
case VtTsync:
flushqueue();
flushdcache();
break;
}
trace(TraceRpc, "-> %F", &r->rx);
vtrespond(r);
trace(TraceWork, "start");
}
flushdcache();
flushicache();
threadexitsall(0);
}
|