/*
* Copyright (c) 2013, Coraid, Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Coraid nor the
* names of its contributors may be used to endorse or promote products
* derived from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL CORAID BE LIABLE FOR ANY
* DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include <u.h>
#include <libc.h>
#include <thread.h>
#include <fcall.h>
#include <9p.h>
#include "dat.h"
enum {
	/* CBlock.flags bits */
	CDirty = 1,	/* modified; set by _cbwrite, which also puts the block on the write-back list */
	CFlushing = 2,	/* wbthread is writing the block to the device */
	CFree = 4,	/* block is on the free list (set by cbfree, cleared by cballoc) */
	/* Cachereq.req codes dispatched by handler() */
	CBrelease = 1,
	CBclean,
	CBread,
	CBwrite,
	CCanfree,
	CReset,
	Ncht = 4001,	/* number of hash buckets in ht[] */
	Nreaders = 4,	/* number of reader threads started by initcache */
	/* Reader.state values */
	Ridle = 0,
	Rloading = 1,
};
typedef struct CBlock CBlock;
typedef struct Cachereq Cachereq;
typedef struct Cacheresp Cacheresp;
typedef struct Reader Reader;
/* One cached disk block. */
struct CBlock {
	Ref ref;	/* references taken by cbread/cbclean/reader, dropped by brelease */
	uvlong blkno;	/* disk block number; device offset is blkno * BlkSize */
	uchar buf[BlkSize];	/* block contents */
	int flags;	/* CDirty, CFlushing, CFree */
	CBlock *next, *prev;	/* LRU list (chd/ctl); next also links the free list */
	CBlock *htnext, *htprev;	/* hash chain in ht[] */
	CBlock *wnext, *wprev;	/* write-back list (whd/wtl) */
};
/* A request to handler() on cachechan, possibly forwarded to a reader. */
struct Cachereq {
	int req;	/* one of CBrelease .. CReset */
	uvlong blk;	/* block number the request refers to */
	Channel *resp;	/* Cacheresp reply channel; handler tolerates nil only for CBrelease */
};
/* Reply to a Cachereq. */
struct Cacheresp {
	int res;	/* integer result (CBrelease, CCanfree) */
	void *p;	/* block buffer, or nil on failure (CBclean, CBread) */
};
/* State of one reader thread that loads missed blocks from the device. */
struct Reader {
	char *dev;	/* device path the reader opens */
	Channel *rdchan;	/* Cachereqs queued for this reader */
	int state;	/* Ridle or Rloading */
	uvlong loading;	/* block being read while state == Rloading */
};
static int mypid;	/* pid of the proc running handler(); set at handler start */
static Channel *wbtrigger;	/* wakes wbthread to run a write-back pass */
static CBlock *ht[Ncht];	/* hash table of cached blocks, bucket = blkno % Ncht */
static CBlock *chd, *ctl;	/* LRU list head and tail; dolru evicts from the head */
static CBlock *whd, *wtl;	/* write-back list of dirty blocks */
static CBlock *freehd;	/* free list of CBlocks, linked through next */
static int maxcache;	/* cache size at which dolru starts evicting */
static Ref ncache;	/* number of blocks on the LRU list */
static Ref ndirty;	/* number of blocks flagged CDirty */
static Ref nwlist;	/* number of blocks on the write-back list */
static int timertid;	/* thread id of wbtimer */
static uvlong nmiss;	/* lookups that missed (see updatestats) */
static uvlong nread;	/* blocks read from the device by readers */
static uvlong nwrite;	/* blocks written by wbthread */
static ulong hrate;	/* decaying hit-rate average; prcstat prints (hrate+5000)/10000 as a percent */
static int syncing;	/* 0 idle, 1 wbthread flushing, 2 flush requested by csync */
static Reader rds[Nreaders];	/* per-reader state */
static Lock calock;	/* held while the free list is updated */
Channel *cachechan;	/* Cachereq channel served by handler() */
/*
 * All allocations and frees are expected to happen in threads of the
 * same process, so strictly no locks should be needed; calock is
 * nevertheless held while the free list is updated.
 */
/*
 * cballoc returns a CBlock for the caller to fill in.  It reuses a block
 * from the free list when one is available and otherwise allocates a
 * new one with θmalloc.  A reused block has CFree cleared and next nil.
 */
static CBlock *
cballoc(void)
{
	CBlock *p;

	/*
	 * Test and pop freehd during a single hold of calock.  Checking it
	 * before taking the lock left a window in which the list could
	 * change between the nil test and the pop.
	 */
	lock(&calock);
	p = freehd;
	if(p == nil) {
		unlock(&calock);
		return θmalloc(sizeof(CBlock));
	}
	freehd = p->next;
	if(!(p->flags & CFree))
		fprint(2, "Internal error: non-free block on free list\n");
	p->flags &= ~CFree;
	p->next = nil;
	unlock(&calock);
	return p;
}
/*
 * cbfree pushes p onto the free list.  It refuses, with a diagnostic,
 * a block that is already free, still referenced, or still linked into
 * any of the LRU, hash or write-back lists; such a block is left alone.
 */
static void
cbfree(CBlock *p)
{
	if(p->flags & CFree) {
		fprint(2, "Freeing already free block?!?!?\n");
		return;
	}
	if(p->ref.ref != 0) {
		fprint(2, "Freeing in use block\n");
		return;
	}
	if(p->next != nil || p->prev != nil
	|| p->htnext != nil || p->htprev != nil
	|| p->wnext != nil || p->wprev != nil) {
		fprint(2, "Freeing block in data structures\n");
		return;
	}
	lock(&calock);
	p->flags |= CFree;
	p->next = freehd;
	freehd = p;
	unlock(&calock);
}
/*
 * lookup returns the cached block for disk block blk, or nil if it is
 * not in the hash table.
 */
static CBlock *
lookup(uvlong blk)
{
	CBlock *cb;

	cb = ht[blk % Ncht];
	while(cb != nil && cb->blkno != blk)
		cb = cb->htnext;
	return cb;
}
/*
 * updatestats records one lookup outcome.  hrate decays by a factor of
 * 0.999 (rounded to nearest) on every call and gains 1000 on a hit;
 * misses are also counted in nmiss.
 */
static void
updatestats(int hit)
{
	hrate = (999 * hrate + 500) / 1000;
	if(hit)
		hrate += 1000;
	else
		++nmiss;
}
/*
 * insht links p at the head of its hash bucket (blkno % Ncht).
 * Both chain pointers of p are set explicitly, as inslru and insw do for
 * their lists, so rmht never follows a stale htprev.
 */
static void
insht(CBlock *p)
{
	CBlock **bucket;

	bucket = &ht[p->blkno % Ncht];
	p->htprev = nil;
	p->htnext = *bucket;
	if(*bucket != nil)
		(*bucket)->htprev = p;
	*bucket = p;
}
/*
 * rmht unlinks p from its hash chain, updating the bucket head if p
 * was first, and clears p's chain pointers.
 */
static void
rmht(CBlock *p)
{
	CBlock **bucket;
	CBlock *after, *before;

	bucket = &ht[p->blkno % Ncht];
	after = p->htnext;
	before = p->htprev;
	if(after != nil)
		after->htprev = before;
	if(before != nil)
		before->htnext = after;
	if(*bucket == p)
		*bucket = after;
	p->htnext = p->htprev = nil;
}
/*
 * inslru appends p at the tail (most recently used end) of the LRU list
 * and counts it in ncache.
 */
static void
inslru(CBlock *p)
{
	if(chd != nil)
		ctl->next = p;
	else
		chd = p;
	p->prev = ctl;
	p->next = nil;
	ctl = p;
	incref(&ncache);
}
/*
 * rmlru unlinks p from the LRU list, fixing chd/ctl when p is at either
 * end, clears p's links and decrements ncache.
 */
static void
rmlru(CBlock *p)
{
	CBlock *after, *before;

	after = p->next;
	before = p->prev;
	if(after == nil)
		ctl = before;
	else
		after->prev = before;
	if(before == nil)
		chd = after;
	else
		before->next = after;
	p->next = p->prev = nil;
	decref(&ncache);
}
/*
 * insw appends p to the tail of the write-back list and counts it in
 * nwlist.
 */
static void
insw(CBlock *p)
{
	if(whd != nil)
		wtl->wnext = p;
	else
		whd = p;
	p->wprev = wtl;
	p->wnext = nil;
	wtl = p;
	incref(&nwlist);
}
/*
 * rmw unlinks p from the write-back list, fixing whd/wtl when p is at
 * either end, clears p's links and decrements nwlist.
 */
static void
rmw(CBlock *p)
{
	CBlock *after, *before;

	after = p->wnext;
	before = p->wprev;
	if(after == nil)
		wtl = before;
	else
		after->wprev = before;
	if(before == nil)
		whd = after;
	else
		before->wnext = after;
	p->wnext = p->wprev = nil;
	decref(&nwlist);
}
/*
 * mvlru marks p as most recently used by moving it to the LRU tail.
 * A block already at the tail is left in place.
 */
static void
mvlru(CBlock *p)
{
	if(p == ctl)
		return;
	rmlru(p);
	inslru(p);
}
/*
 * dolru evicts at most one block once the cache holds maxcache or more
 * blocks.  It scans the LRU list from the head (least recently used) for
 * the first block with no references that is neither CDirty nor
 * CFlushing, removes it from the hash table and LRU list, and frees it.
 * If every block is busy nothing is evicted, so callers that allocate
 * afterwards can grow the cache past maxcache.
 */
static void
dolru(void)
{
	CBlock *p;
	int i;
	if(ncache.ref < maxcache)
		return;
	/*
	 * i counts visited blocks; visiting more than ncache.ref of them
	 * means the list is cyclic.  In that case clear chd->prev and
	 * ctl->next as a best-effort repair and give up without evicting.
	 */
	for(p = chd, i = 0; p && (p->ref.ref > 0 || (p->flags & (CDirty | CFlushing))); p = p->next, ++i)
		if(i > ncache.ref) {
			fprint(2, "cycle in LRU list? n:%ld d:%ld\n", ncache.ref, ndirty.ref);
			chd->prev = nil;
			ctl->next = nil;
			return;
		}
	if(p) {
		rmht(p);
		rmlru(p);
		/*
		 * The scan above skips CDirty blocks, so as written this branch
		 * does not fire; it keeps ndirty and the write-back list
		 * consistent should the selection rule change.
		 */
		if(p->flags & CDirty) {
			decref(&ndirty);
			p->flags &= ~CDirty;
			rmw(p);
		}
		cbfree(p);
	}
}
/*
 * _iopread is the iocall trampoline for pread.  The variadic arguments
 * are (int fd, void *buf, long n, vlong offset) and must be consumed in
 * exactly that order.
 */
static long
_iopread(va_list *arg)
{
	int fd = va_arg(*arg, int);
	void *buf = va_arg(*arg, void*);
	long len = va_arg(*arg, long);
	vlong offset = va_arg(*arg, vlong);

	return pread(fd, buf, len, offset);
}
/*
 * _iopwrite is the iocall trampoline for pwrite.  The variadic arguments
 * are (int fd, void *buf, long n, vlong offset) and must be consumed in
 * exactly that order.
 */
static long
_iopwrite(va_list *arg)
{
	int fd = va_arg(*arg, int);
	void *buf = va_arg(*arg, void*);
	long len = va_arg(*arg, long);
	vlong offset = va_arg(*arg, vlong);

	return pwrite(fd, buf, len, offset);
}
/*
 * wbtimer (started with proccreate in initcache) triggers a write-back
 * pass every 15 seconds unless wbthread is already flushing (syncing == 1).
 * csync calls threadint on this thread to cut the sleep short.  Once
 * shutdown is set it sends one final trigger and returns.
 */
static void
wbtimer(void *)
{
	for(; !shutdown; ) {
		sleep(15000);
		if(syncing == 1)
			continue;
		sendul(wbtrigger, 1);
	}
	sendul(wbtrigger, 1);
}
static void
wbthread(void *d)
{
Ioproc *wbio;
CBlock *p;
char *dev;
int fd;
dev = d;
wbio = ioproc();
fd = ioopen(wbio, dev, ORDWR);
if(fd < 0)
sysfatal("wb open: %r");
while(!shutdown) {
recvul(wbtrigger);
syncing = 1;
do {
for(p = whd; p && p->ref.ref > 0 && p->blkno >= super.firstdat; p = p->wnext) ;
if(p) {
p->flags |= CFlushing;
p->flags &= ~CDirty;
decref(&ndirty);
rmw(p);
++nwrite;
iocall(wbio, _iopwrite, fd, p->buf, BlkSize, p->blkno * BlkSize);
p->flags &= ~CFlushing;
}
} while(p);
syncing = 0;
}
ioclose(wbio, fd);
closeioproc(wbio);
}
/*
 * _brelease drops one reference on cached block blk and then gives dolru
 * a chance to evict.  Returns 0 on success, or -1 if blk is not cached
 * or its reference count is already 0.
 */
static int
_brelease(uvlong blk)
{
	CBlock *cb;
	int status;

	status = -1;
	cb = lookup(blk);
	if(cb != nil) {
		if(cb->ref.ref != 0) {
			decref(&cb->ref);
			status = 0;
		} else
			fprint(2, "trying to decrement below 0: blk %ulld\n", blk);
	}
	dolru();
	return status;
}
/*
 * _cbclean returns a zero-filled buffer for block blk with one reference
 * held for the caller.  No device read is done: an existing cached copy
 * is cleared in place, otherwise a fresh block is allocated and inserted
 * into the hash table and LRU list.
 */
static void *
_cbclean(uvlong blk)
{
	CBlock *cb;

	cb = lookup(blk);
	if(cb == nil) {
		updatestats(0);
		dolru();
		cb = cballoc();
		cb->blkno = blk;
		memset(cb->buf, 0, BlkSize);
		incref(&cb->ref);
		insht(cb);
		inslru(cb);
		return cb->buf;
	}
	memset(cb->buf, 0, BlkSize);
	mvlru(cb);
	incref(&cb->ref);
	updatestats(1);
	return cb->buf;
}
/*
 * reader is the body of one reader thread (a is its Reader).  It serves
 * CBread requests that missed in the cache.  It reads the block from
 * rp->dev, inserts it into the hash table and LRU list with one reference
 * held for the requester, and replies on r.resp with the buffer, or nil
 * on a short read.  A request for a block that another reader is already
 * loading is forwarded to that reader.  The thread exits when recv on its
 * channel returns 0 and shutdown is set.
 */
static void
reader(void *a)
{
	Cachereq r;
	Cacheresp rsp;
	Ioproc *cio;
	Reader *rp;
	CBlock *p;
	int cfd, i;

	rp = a;
	cio = ioproc();
	cfd = ioopen(cio, rp->dev, ORDWR);
	if(cfd < 0)
		sysfatal("Couldn't open device: %r");
	while(1) {
		if(recv(rp->rdchan, &r) == 0) {
			if(shutdown) {
				/* close the device fd before the ioproc, as wbthread does */
				ioclose(cio, cfd);
				closeioproc(cio);
				threadexits(nil);
			}
			continue;
		}
		/*
		 * See if it got loaded while it was in the channel queue
		 */
		p = lookup(r.blk);
		if(p) {
			mvlru(p);
			incref(&p->ref);
			updatestats(1);
			rsp.p = p->buf;
			send(r.resp, &rsp);
			continue;
		}
		/*
		 * If another reader is already loading this block, pass off the request
		 * to that reader. That way, by the time this request gets looked at
		 * again, the block will already be loaded.
		 */
		for(i = 0; i < Nreaders && (rds[i].state != Rloading || rds[i].loading != r.blk); ++i)
			;
		if(i < Nreaders) {
			send(rds[i].rdchan, &r);
			continue;
		}
		rp->state = Rloading;
		rp->loading = r.blk;
		dolru();
		p = cballoc();
		p->blkno = r.blk;
		incref(&p->ref);
		++nread;
		if(iocall(cio, _iopread, cfd, p->buf, BlkSize, r.blk * BlkSize) != BlkSize) {
			rp->state = Ridle;
			/*
			 * Drop the reference taken above first: cbfree refuses any
			 * block with a nonzero ref, which would leak this block.
			 */
			decref(&p->ref);
			cbfree(p);
			rsp.p = nil;
			send(r.resp, &rsp);
			continue;
		}
		insht(p);
		inslru(p);
		rp->state = Ridle;
		rsp.p = p->buf;
		send(r.resp, &rsp);
	}
}
/*
 * _cbread serves a CBread request in the handler.  On a hit it replies
 * immediately with the buffer and one reference held.  On a miss the
 * request is handed to the reader threads in round-robin order, and that
 * reader sends the reply.
 */
static void
_cbread(Cachereq *r)
{
	static int next;
	Cacheresp rsp;
	CBlock *cb;

	cb = lookup(r->blk);
	if(cb == nil) {
		updatestats(0);
		send(rds[next].rdchan, r);
		next = (next + 1) % Nreaders;
		return;
	}
	mvlru(cb);
	incref(&cb->ref);
	updatestats(1);
	rsp.p = cb->buf;
	send(r->resp, &rsp);
}
/*
 * _cbwrite marks cached block blk as most recently used and dirty,
 * queueing it for write-back the first time it becomes dirty.  An uncached
 * blk is ignored.  When more than a tenth of the cache is dirty and no
 * flush is running, wbthread is nudged without blocking.
 */
static void
_cbwrite(uvlong blk)
{
	CBlock *cb;

	cb = lookup(blk);
	if(cb != nil) {
		mvlru(cb);
		if((cb->flags & CDirty) == 0) {
			cb->flags |= CDirty;
			incref(&ndirty);
			insw(cb);
		}
	}
	if(!syncing && ndirty.ref > ncache.ref / 10)
		nbsendul(wbtrigger, 1);
}
/*
 * _ccanfree drops block blk from the cache so the disk block can be
 * reused.  Returns 0, leaving the block alone, if it is still referenced.
 * Otherwise returns 1, discarding any pending dirty state.  Dirty or
 * flushing blocks are deliberately not refused.
 */
static int
_ccanfree(uvlong blk)
{
	CBlock *cb;

	cb = lookup(blk);
	if(cb == nil)
		return 1;
	if(cb->ref.ref > 0) {
		fprint(2, "Wanting to free block %ulld with ref %ld and flags %x\n", blk, cb->ref.ref, cb->flags);
		return 0;
	}
	if(cb->flags & CDirty) {
		decref(&ndirty);
		rmw(cb);
		cb->flags &= ~CDirty;
	}
	rmht(cb);
	rmlru(cb);
	cbfree(cb);
	return 1;
}
static void
_resetcache(void)
{
CBlock *p;
while(1) {
for(p = chd; p && p->ref.ref > 0; p = p->next) ;
if(p == nil)
break;
rmht(p);
rmlru(p);
cbfree(p);
}
if(chd)
fprint(2, "warning: active blocks during reset\n");
}
/*
 * handler is the cache's request loop.  It records its proc's pid in
 * mypid so the public entry points can act directly when called from this
 * proc.  It then serves Cachereqs from cachechan until recv returns 0
 * with shutdown set.
 */
static void
handler(void *)
{
	Cachereq req;
	Cacheresp rsp;

	mypid = threadpid(threadid());
	for(;;) {
		if(recv(cachechan, &req) == 0) {
			if(shutdown)
				threadexits(nil);
			continue;
		}
		switch(req.req) {
		case CBread:
			/* _cbread or a reader thread sends the reply */
			_cbread(&req);
			break;
		case CBclean:
			rsp.p = _cbclean(req.blk);
			send(req.resp, &rsp);
			break;
		case CBwrite:
			_cbwrite(req.blk);
			send(req.resp, &rsp);
			break;
		case CBrelease:
			rsp.res = _brelease(req.blk);
			if(req.resp != nil)
				send(req.resp, &rsp);
			break;
		case CCanfree:
			rsp.res = _ccanfree(req.blk);
			send(req.resp, &rsp);
			break;
		case CReset:
			_resetcache();
			send(req.resp, &rsp);
			break;
		}
	}
}
void
initcache(char *dev, int m)
{
int i;
maxcache = m;
for(i = 0; i < Nreaders; ++i) {
rds[i].dev = dev;
rds[i].rdchan = chancreate(sizeof(Cachereq), 10);
threadcreate(reader, &rds[i], 8192);
}
cachechan = chancreate(sizeof(Cachereq), 2);
threadcreate(handler, nil, 8192);
wbtrigger = chancreate(sizeof(ulong), 2);
threadcreate(wbthread, dev, 8192);
timertid = proccreate(wbtimer, nil, 1024);
}
void
haltcache(void)
{
int i;
for(i = 0; i < Nreaders; ++i)
chanclose(rds[i].rdchan);
chanclose(cachechan);
threadkill(timertid);
sendul(wbtrigger, 1);
for(i = 0; i < 30 && whd; ++i) {
fprint(2, ".");
sleep(1000);
}
}
/*
 * brelease drops one reference on cached block blk.  Called from the
 * handler's proc, it acts directly and returns _brelease's result,
 * logging the caller's pc on failure.  From any other proc, the release
 * is queued to the handler without waiting for a reply, and 0 is
 * returned.
 */
int
brelease(uvlong blk)
{
	Cachereq r;
	int n;

	if(mypid == threadpid(threadid())) {
		n = _brelease(blk);
		if(n == -1)
			fprint(2, "brelease error called from %p\n", getcallerpc(&blk));
		return n;
	}
	r.req = CBrelease;
	r.blk = blk;
	r.resp = nil;	/* handler sends no reply for a nil resp */
	send(cachechan, &r);
	return 0;
}
/*
 * cbclean returns a zeroed buffer for block blk with one reference held;
 * the caller must brelease it.  Requests from outside the handler's proc
 * go through cachechan and wait for the reply.
 */
void *
cbclean(uvlong blk)
{
	Cachereq req;
	Cacheresp rsp;

	if(mypid != threadpid(threadid())) {
		req.req = CBclean;
		req.blk = blk;
		req.resp = chancreate(sizeof(Cacheresp), 0);
		send(cachechan, &req);
		recv(req.resp, &rsp);
		chanfree(req.resp);
		return rsp.p;
	}
	return _cbclean(blk);
}
/*
 * cbread returns the buffer for block blk with one reference held, or nil
 * if it could not be read; the caller must brelease it.  A cache hit from
 * the handler's proc is served directly.  Everything else is sent to the
 * handler and waits for the reply.
 */
void *
cbread(uvlong blk)
{
	Cachereq req;
	Cacheresp rsp;
	CBlock *cb;

	if(mypid == threadpid(threadid()) && (cb = lookup(blk)) != nil) {
		mvlru(cb);
		incref(&cb->ref);
		updatestats(1);
		return cb->buf;
	}
	req.req = CBread;
	req.blk = blk;
	req.resp = chancreate(sizeof(Cacheresp), 0);
	send(cachechan, &req);
	recv(req.resp, &rsp);
	chanfree(req.resp);
	return rsp.p;
}
/*
 * cbwrite marks cached block blk dirty so it will be written back.
 * Outside the handler's proc it waits until the handler has done so.
 */
void
cbwrite(uvlong blk)
{
	Cachereq req;
	Cacheresp rsp;

	if(mypid != threadpid(threadid())) {
		req.req = CBwrite;
		req.blk = blk;
		req.resp = chancreate(sizeof(Cacheresp), 0);
		send(cachechan, &req);
		recv(req.resp, &rsp);
		chanfree(req.resp);
		return;
	}
	_cbwrite(blk);
}
/*
 * ccanfree drops block blk from the cache.  Returns 1 on success, or 0
 * if the block is still referenced.  Outside the handler's proc it waits
 * for the handler's answer.
 */
int
ccanfree(uvlong blk)
{
	Cachereq req;
	Cacheresp rsp;

	if(mypid != threadpid(threadid())) {
		req.req = CCanfree;
		req.blk = blk;
		req.resp = chancreate(sizeof(Cacheresp), 0);
		send(cachechan, &req);
		recv(req.resp, &rsp);
		chanfree(req.resp);
		return rsp.res;
	}
	return _ccanfree(blk);
}
/*
 * cread copies n bytes at byte offset off from the cache into a and
 * returns n.  The range must lie within a single block.  Returns -1 for a
 * negative count, a block-crossing range, or a block that cannot be read.
 */
int
cread(void *a, int n, uvlong off)
{
	uchar *p;
	uvlong blk;
	ulong boff;

	blk = off / BlkSize;
	boff = off % BlkSize;
	/*
	 * Reject negative counts explicitly: converted to ulong, a negative n
	 * wraps boff + n back below BlkSize whenever boff > 0, and memmove
	 * would then receive a huge length.
	 */
	if(n < 0) {
		fprint(2, "invalid read count %d\n", n);
		return -1;
	}
	if(boff + n > BlkSize) {
		fprint(2, "invalid block crossing\n");
		return -1;
	}
	p = cbread(blk);
	if(p == nil)
		return -1;
	memmove(a, p + boff, n);
	brelease(blk);
	return n;
}
/*
 * cwrite copies n bytes from a into the cache at byte offset off, marks
 * the block dirty, and returns n.  The block is read first, so bytes
 * outside the range are preserved, and the range must lie within one
 * block.  Returns -1 for block 0 (refused outright; presumably to protect
 * the superblock — confirm), a negative count, a block-crossing range, or
 * a block that cannot be read.
 */
int
cwrite(void *a, int n, uvlong off)
{
	uchar *p;
	uvlong blk;
	ulong boff;

	blk = off / BlkSize;
	if(blk == 0)
		return -1;
	boff = off % BlkSize;
	/*
	 * Reject negative counts explicitly: converted to ulong, a negative n
	 * wraps boff + n back below BlkSize whenever boff > 0, and memmove
	 * would then receive a huge length.
	 */
	if(n < 0) {
		fprint(2, "invalid write count %d\n", n);
		return -1;
	}
	if(boff + n > BlkSize) {
		fprint(2, "invalid block crossing\n");
		return -1;
	}
	p = cbread(blk);
	if(p == nil)
		return -1;
	memmove(p + boff, a, n);
	cbwrite(blk);
	brelease(blk);
	return n;
}
/*
 * csync requests an immediate write-back pass and yields until syncing
 * returns to 0.  It sets syncing to 2 and interrupts wbtimer, which then
 * triggers wbthread.
 * NOTE(review): if wbthread is already mid-pass, that pass resets syncing
 * to 0 when it ends, so csync may return before the pass it requested
 * has run.
 */
void
csync(void)
{
	syncing = 2;
	threadint(timertid);
	for(; syncing != 0; )
		yield();
}
static char cstatbuf[1024];
char *
prcstat(void)
{
CBlock *cb;
char *p, *e;
int ldirty, i, nhash;
int refhist[10];
int saidit = 0;
ldirty = 0;
p = cstatbuf;
e = p + nelem(cstatbuf);
memset(refhist, 0, 10 * sizeof(int));
p = seprint(p, e, "Cache stats:\n");
p = seprint(p, e, "ncache: %ld\n", ncache.ref);
p = seprint(p, e, "nwlist: %ld\n", nwlist.ref);
p = seprint(p, e, "ndirty: %ld\n", ndirty.ref);
for(cb = chd; cb; cb = cb->next) {
if(cb->flags & CDirty)
{
if(!saidit) {p = seprint(p, e, "dirty block ref:%ld blk:%ulld\n", cb->ref.ref, cb->blkno); ++saidit;}
++ldirty;
}
if(cb->ref.ref < 0) {
p = seprint(p, e, "bad ref count: %ld on block %ulld; setting to 0\n", cb->ref.ref, cb->blkno);
cb->ref.ref = 0;
}
else if(cb->ref.ref >= 9)
++refhist[9];
else
++refhist[cb->ref.ref];
if(cb->ref.ref > 0)
p = seprint(p, e, "In use block: %ulld flags %ux\n", cb->blkno, cb->flags);
}
nhash = 0;
for(i = 0; i < Ncht; ++i) {
for(cb = ht[i]; cb; cb = cb->htnext)
++nhash;
}
p = seprint(p, e, "nhash: %d\n", nhash);
p = seprint(p, e, "ldirty: %d\n", ldirty);
p = seprint(p, e, "nread: %ulld\n", nread);
p = seprint(p, e, "nwrite: %ulld\n", nwrite);
p = seprint(p, e, "nmiss: %ulld\n", nmiss);
p = seprint(p, e, "hit rate: %uld%%\n", (hrate + 5000) / 10000);
p = seprint(p, e, "ref count histogram:\n");
p = seprint(p, e, " 0 1 2 3 4 5 6 7 8 >8\n");
for(i = 0; i < 10; ++i)
p = seprint(p, e, "%4d ", refhist[i]);
seprint(p, e, "\n");
return cstatbuf;
}
/*
 * resetcache drops all unreferenced blocks from the cache.  Outside the
 * handler's proc it asks the handler and waits for completion.
 */
void
resetcache(void)
{
	Cachereq req;
	Cacheresp rsp;

	if(mypid != threadpid(threadid())) {
		req.req = CReset;
		req.resp = chancreate(sizeof(Cacheresp), 0);
		send(cachechan, &req);
		recv(req.resp, &rsp);
		chanfree(req.resp);
		return;
	}
	_resetcache();
}
|