#include "stdinc.h"
#include "dat.h"
#include "fns.h"
typedef struct ASum ASum;
struct ASum
{
Arena *arena;
ASum *next;
};
static void sealarena(Arena *arena);
static int okarena(Arena *arena);
static int loadarena(Arena *arena);
static CIBlock *getcib(Arena *arena, int clump, int writing, CIBlock *rock);
static void putcib(Arena *arena, CIBlock *cib);
static void sumproc(void *);
static QLock sumlock;
static Rendez sumwait;
static ASum *sumq;
static ASum *sumqtail;
static uchar zero[8192];
int arenasumsleeptime;
int
initarenasum(void)
{
needzeroscore(); /* OS X */
sumwait.l = &sumlock;
if(vtproc(sumproc, nil) < 0){
seterr(EOk, "can't start arena checksum slave: %r");
return -1;
}
return 0;
}
/*
* make an Arena, and initialize it based upon the disk header and trailer.
*/
Arena*
initarena(Part *part, u64int base, u64int size, u32int blocksize)
{
Arena *arena;
arena = MKZ(Arena);
arena->part = part;
arena->blocksize = blocksize;
arena->clumpmax = arena->blocksize / ClumpInfoSize;
arena->base = base + blocksize;
arena->size = size - 2 * blocksize;
if(loadarena(arena) < 0){
seterr(ECorrupt, "arena header or trailer corrupted");
freearena(arena);
return nil;
}
if(okarena(arena) < 0){
freearena(arena);
return nil;
}
if(arena->diskstats.sealed && scorecmp(zeroscore, arena->score)==0)
backsumarena(arena);
return arena;
}
void
freearena(Arena *arena)
{
if(arena == nil)
return;
free(arena);
}
Arena*
newarena(Part *part, u32int vers, char *name, u64int base, u64int size, u32int blocksize)
{
int bsize;
Arena *arena;
if(nameok(name) < 0){
seterr(EOk, "illegal arena name", name);
return nil;
}
arena = MKZ(Arena);
arena->part = part;
arena->version = vers;
if(vers == ArenaVersion4)
arena->clumpmagic = _ClumpMagic;
else{
do
arena->clumpmagic = fastrand();
while(arena->clumpmagic==_ClumpMagic || arena->clumpmagic==0);
}
arena->blocksize = blocksize;
arena->clumpmax = arena->blocksize / ClumpInfoSize;
arena->base = base + blocksize;
arena->size = size - 2 * blocksize;
namecp(arena->name, name);
bsize = sizeof zero;
if(bsize > arena->blocksize)
bsize = arena->blocksize;
if(wbarena(arena)<0 || wbarenahead(arena)<0
|| writepart(arena->part, arena->base, zero, bsize)<0){
freearena(arena);
return nil;
}
return arena;
}
int
readclumpinfo(Arena *arena, int clump, ClumpInfo *ci)
{
CIBlock *cib, r;
cib = getcib(arena, clump, 0, &r);
if(cib == nil)
return -1;
unpackclumpinfo(ci, &cib->data->data[cib->offset]);
putcib(arena, cib);
return 0;
}
int
readclumpinfos(Arena *arena, int clump, ClumpInfo *cis, int n)
{
CIBlock *cib, r;
int i;
for(i = 0; i < n; i++){
cib = getcib(arena, clump + i, 0, &r);
if(cib == nil)
break;
unpackclumpinfo(&cis[i], &cib->data->data[cib->offset]);
putcib(arena, cib);
}
return i;
}
/*
* write directory information for one clump
* must be called the arena locked
*/
int
writeclumpinfo(Arena *arena, int clump, ClumpInfo *ci)
{
CIBlock *cib, r;
cib = getcib(arena, clump, 1, &r);
if(cib == nil)
return -1;
dirtydblock(cib->data, DirtyArenaCib);
packclumpinfo(ci, &cib->data->data[cib->offset]);
putcib(arena, cib);
return 0;
}
u64int
arenadirsize(Arena *arena, u32int clumps)
{
return ((clumps / arena->clumpmax) + 1) * arena->blocksize;
}
/*
* read a clump of data
* n is a hint of the size of the data, not including the header
* make sure it won't run off the end, then return the number of bytes actually read
*/
u32int
readarena(Arena *arena, u64int aa, u8int *buf, long n)
{
DBlock *b;
u64int a;
u32int blocksize, off, m;
long nn;
if(n == 0)
return -1;
qlock(&arena->lock);
a = arena->size - arenadirsize(arena, arena->memstats.clumps);
qunlock(&arena->lock);
if(aa >= a){
seterr(EOk, "reading beyond arena clump storage: clumps=%d aa=%lld a=%lld -1 clumps=%lld\n", arena->memstats.clumps, aa, a, arena->size - arenadirsize(arena, arena->memstats.clumps - 1));
return -1;
}
if(aa + n > a)
n = a - aa;
blocksize = arena->blocksize;
a = arena->base + aa;
off = a & (blocksize - 1);
a -= off;
nn = 0;
for(;;){
b = getdblock(arena->part, a, OREAD);
if(b == nil)
return -1;
m = blocksize - off;
if(m > n - nn)
m = n - nn;
memmove(&buf[nn], &b->data[off], m);
putdblock(b);
nn += m;
if(nn == n)
break;
off = 0;
a += blocksize;
}
return n;
}
/*
* write some data to the clump section at a given offset
* used to fix up corrupted arenas.
*/
u32int
writearena(Arena *arena, u64int aa, u8int *clbuf, u32int n)
{
DBlock *b;
u64int a;
u32int blocksize, off, m;
long nn;
int ok;
if(n == 0)
return -1;
qlock(&arena->lock);
a = arena->size - arenadirsize(arena, arena->memstats.clumps);
if(aa >= a || aa + n > a){
qunlock(&arena->lock);
seterr(EOk, "writing beyond arena clump storage");
return -1;
}
blocksize = arena->blocksize;
a = arena->base + aa;
off = a & (blocksize - 1);
a -= off;
nn = 0;
for(;;){
b = getdblock(arena->part, a, off != 0 || off + n < blocksize ? ORDWR : OWRITE);
if(b == nil){
qunlock(&arena->lock);
return -1;
}
dirtydblock(b, DirtyArena);
m = blocksize - off;
if(m > n - nn)
m = n - nn;
memmove(&b->data[off], &clbuf[nn], m);
ok = 0;
putdblock(b);
if(ok < 0){
qunlock(&arena->lock);
return -1;
}
nn += m;
if(nn == n)
break;
off = 0;
a += blocksize;
}
qunlock(&arena->lock);
return n;
}
/*
* allocate space for the clump and write it,
* updating the arena directory
ZZZ question: should this distinguish between an arena
filling up and real errors writing the clump?
*/
u64int
writeaclump(Arena *arena, Clump *c, u8int *clbuf, u64int start, u64int *pa)
{
DBlock *b;
u64int a, aa;
u32int clump, n, nn, m, off, blocksize;
int ok;
AState as;
n = c->info.size + ClumpSize + U32Size;
qlock(&arena->lock);
aa = arena->memstats.used;
if(arena->memstats.sealed
|| aa + n + U32Size + arenadirsize(arena, arena->memstats.clumps + 1) > arena->size){
if(!arena->memstats.sealed){
logerr(EOk, "seal memstats %s", arena->name);
arena->memstats.sealed = 1;
as.arena = arena;
as.aa = start+aa;
as.stats = arena->memstats;
setdcachestate(&as);
}
qunlock(&arena->lock);
return TWID64;
}
if(packclump(c, &clbuf[0], arena->clumpmagic) < 0){
qunlock(&arena->lock);
return TWID64;
}
/*
* write the data out one block at a time
*/
blocksize = arena->blocksize;
a = arena->base + aa;
off = a & (blocksize - 1);
a -= off;
nn = 0;
for(;;){
b = getdblock(arena->part, a, off != 0 ? ORDWR : OWRITE);
if(b == nil){
qunlock(&arena->lock);
return TWID64;
}
dirtydblock(b, DirtyArena);
m = blocksize - off;
if(m > n - nn)
m = n - nn;
memmove(&b->data[off], &clbuf[nn], m);
ok = 0;
putdblock(b);
if(ok < 0){
qunlock(&arena->lock);
return TWID64;
}
nn += m;
if(nn == n)
break;
off = 0;
a += blocksize;
}
arena->memstats.used += c->info.size + ClumpSize;
arena->memstats.uncsize += c->info.uncsize;
if(c->info.size < c->info.uncsize)
arena->memstats.cclumps++;
clump = arena->memstats.clumps++;
if(arena->memstats.clumps == 0)
sysfatal("clumps wrapped");
arena->wtime = now();
if(arena->ctime == 0)
arena->ctime = arena->wtime;
writeclumpinfo(arena, clump, &c->info);
wbarena(arena);
/* set up for call to setdcachestate */
as.arena = arena;
as.aa = start+arena->memstats.used;
as.stats = arena->memstats;
/* update this before calling setdcachestate so it cannot be behind dcache.diskstate */
*pa = start+aa;
setdcachestate(&as);
qunlock(&arena->lock);
return aa;
}
int
atailcmp(ATailStats *a, ATailStats *b)
{
/* good test */
if(a->used < b->used)
return -1;
if(a->used > b->used)
return 1;
/* suspect tests - why order this way? (no one cares) */
if(a->clumps < b->clumps)
return -1;
if(a->clumps > b->clumps)
return 1;
if(a->cclumps < b->cclumps)
return -1;
if(a->cclumps > b->cclumps)
return 1;
if(a->uncsize < b->uncsize)
return -1;
if(a->uncsize > b->uncsize)
return 1;
if(a->sealed < b->sealed)
return -1;
if(a->sealed > b->sealed)
return 1;
/* everything matches */
return 0;
}
void
setatailstate(AState *as)
{
int i, j, osealed;
Arena *a;
Index *ix;
trace(0, "setatailstate %s 0x%llux clumps %d", as->arena->name, as->aa, as->stats.clumps);
/*
* Look up as->arena to find index.
*/
ix = mainindex;
for(i=0; i<ix->narenas; i++)
if(ix->arenas[i] == as->arena)
break;
if(i==ix->narenas || as->aa < ix->amap[i].start || as->aa >= ix->amap[i].stop || as->arena != ix->arenas[i]){
fprint(2, "funny settailstate 0x%llux\n", as->aa);
return;
}
for(j=0; j<=i; j++){
a = ix->arenas[j];
if(atailcmp(&a->diskstats, &a->memstats) == 0)
continue;
qlock(&a->lock);
osealed = a->diskstats.sealed;
if(j == i)
a->diskstats = as->stats;
else
a->diskstats = a->memstats;
wbarena(a);
if(a->diskstats.sealed != osealed && !a->inqueue)
sealarena(a);
qunlock(&a->lock);
}
}
/*
* once sealed, an arena never has any data added to it.
* it should only be changed to fix errors.
* this also syncs the clump directory.
*/
static void
sealarena(Arena *arena)
{
arena->inqueue = 1;
backsumarena(arena);
}
void
backsumarena(Arena *arena)
{
ASum *as;
if(sumwait.l == nil)
return;
as = MK(ASum);
if(as == nil)
return;
qlock(&sumlock);
as->arena = arena;
as->next = nil;
if(sumq)
sumqtail->next = as;
else
sumq = as;
sumqtail = as;
rwakeup(&sumwait);
qunlock(&sumlock);
}
static void
sumproc(void *unused)
{
ASum *as;
Arena *arena;
USED(unused);
for(;;){
qlock(&sumlock);
while(sumq == nil)
rsleep(&sumwait);
as = sumq;
sumq = as->next;
qunlock(&sumlock);
arena = as->arena;
free(as);
sumarena(arena);
}
}
void
sumarena(Arena *arena)
{
ZBlock *b;
DigestState s;
u64int a, e;
u32int bs;
int t;
u8int score[VtScoreSize];
bs = MaxIoSize;
if(bs < arena->blocksize)
bs = arena->blocksize;
/*
* read & sum all blocks except the last one
*/
memset(&s, 0, sizeof s);
b = alloczblock(bs, 0, arena->part->blocksize);
e = arena->base + arena->size;
for(a = arena->base - arena->blocksize; a + arena->blocksize <= e; a += bs){
disksched();
while((t=arenasumsleeptime) == SleepForever){
sleep(1000);
disksched();
}
sleep(t);
if(a + bs > e)
bs = arena->blocksize;
if(readpart(arena->part, a, b->data, bs) < 0)
goto ReadErr;
addstat(StatSumRead, 1);
addstat(StatSumReadBytes, bs);
sha1(b->data, bs, nil, &s);
}
/*
* the last one is special, since it may already have the checksum included
*/
bs = arena->blocksize;
if(readpart(arena->part, e, b->data, bs) < 0){
ReadErr:
logerr(EOk, "sumarena can't sum %s, read at %lld failed: %r", arena->name, a);
freezblock(b);
return;
}
addstat(StatSumRead, 1);
addstat(StatSumReadBytes, bs);
sha1(b->data, bs-VtScoreSize, nil, &s);
sha1(zeroscore, VtScoreSize, nil, &s);
sha1(nil, 0, score, &s);
/*
* check for no checksum or the same
*
* the writepart is okay because we flushed the dcache in sealarena
*/
if(scorecmp(score, &b->data[bs - VtScoreSize]) != 0){
if(scorecmp(zeroscore, &b->data[bs - VtScoreSize]) != 0)
logerr(EOk, "overwriting mismatched checksums for arena=%s, found=%V calculated=%V",
arena->name, &b->data[bs - VtScoreSize], score);
scorecp(&b->data[bs - VtScoreSize], score);
if(writepart(arena->part, e, b->data, bs) < 0)
logerr(EOk, "sumarena can't write sum for %s: %r", arena->name);
}
freezblock(b);
qlock(&arena->lock);
scorecp(arena->score, score);
qunlock(&arena->lock);
}
/*
* write the arena trailer block to the partition
*/
int
wbarena(Arena *arena)
{
DBlock *b;
int bad;
if((b = getdblock(arena->part, arena->base + arena->size, OWRITE)) == nil){
logerr(EAdmin, "can't write arena trailer: %r");
return -1;
}
dirtydblock(b, DirtyArenaTrailer);
bad = okarena(arena)<0 || packarena(arena, b->data)<0;
putdblock(b);
if(bad)
return -1;
return 0;
}
int
wbarenahead(Arena *arena)
{
ZBlock *b;
ArenaHead head;
int bad;
namecp(head.name, arena->name);
head.version = arena->version;
head.size = arena->size + 2 * arena->blocksize;
head.blocksize = arena->blocksize;
head.clumpmagic = arena->clumpmagic;
b = alloczblock(arena->blocksize, 1, arena->part->blocksize);
if(b == nil){
logerr(EAdmin, "can't write arena header: %r");
/* ZZZ add error message? */
return -1;
}
/*
* this writepart is okay because it only happens
* during initialization.
*/
bad = packarenahead(&head, b->data)<0 ||
writepart(arena->part, arena->base - arena->blocksize, b->data, arena->blocksize)<0 ||
flushpart(arena->part)<0;
freezblock(b);
if(bad)
return -1;
return 0;
}
/*
* read the arena header and trailer blocks from disk
*/
static int
loadarena(Arena *arena)
{
ArenaHead head;
ZBlock *b;
b = alloczblock(arena->blocksize, 0, arena->part->blocksize);
if(b == nil)
return -1;
if(readpart(arena->part, arena->base + arena->size, b->data, arena->blocksize) < 0){
freezblock(b);
return -1;
}
if(unpackarena(arena, b->data) < 0){
freezblock(b);
return -1;
}
if(arena->version != ArenaVersion4 && arena->version != ArenaVersion5){
seterr(EAdmin, "unknown arena version %d", arena->version);
freezblock(b);
return -1;
}
scorecp(arena->score, &b->data[arena->blocksize - VtScoreSize]);
if(readpart(arena->part, arena->base - arena->blocksize, b->data, arena->blocksize) < 0){
logerr(EAdmin, "can't read arena header: %r");
freezblock(b);
return 0;
}
if(unpackarenahead(&head, b->data) < 0)
logerr(ECorrupt, "corrupted arena header: %r");
else if(namecmp(arena->name, head.name)!=0
|| arena->clumpmagic != head.clumpmagic
|| arena->version != head.version
|| arena->blocksize != head.blocksize
|| arena->size + 2 * arena->blocksize != head.size){
if(namecmp(arena->name, head.name)!=0)
logerr(ECorrupt, "arena tail name %s head %s",
arena->name, head.name);
else if(arena->clumpmagic != head.clumpmagic)
logerr(ECorrupt, "arena tail clumpmagic 0x%lux head 0x%lux",
(ulong)arena->clumpmagic, (ulong)head.clumpmagic);
else if(arena->version != head.version)
logerr(ECorrupt, "arena tail version %d head version %d",
arena->version, head.version);
else if(arena->blocksize != head.blocksize)
logerr(ECorrupt, "arena tail block size %d head %d",
arena->blocksize, head.blocksize);
else if(arena->size+2*arena->blocksize != head.size)
logerr(ECorrupt, "arena tail size %lud head %lud",
(ulong)arena->size+2*arena->blocksize, head.size);
else
logerr(ECorrupt, "arena header inconsistent with arena data");
}
freezblock(b);
return 0;
}
static int
okarena(Arena *arena)
{
u64int dsize;
int ok;
ok = 0;
dsize = arenadirsize(arena, arena->diskstats.clumps);
if(arena->diskstats.used + dsize > arena->size){
seterr(ECorrupt, "arena %s used > size", arena->name);
ok = -1;
}
if(arena->diskstats.cclumps > arena->diskstats.clumps)
logerr(ECorrupt, "arena %s has more compressed clumps than total clumps", arena->name);
/*
* This need not be true if some of the disk is corrupted.
*
if(arena->diskstats.uncsize + arena->diskstats.clumps * ClumpSize + arena->blocksize < arena->diskstats.used)
logerr(ECorrupt, "arena %s uncompressed size inconsistent with used space %lld %d %lld", arena->name, arena->diskstats.uncsize, arena->diskstats.clumps, arena->diskstats.used);
*/
if(arena->ctime > arena->wtime)
logerr(ECorrupt, "arena %s creation time after last write time", arena->name);
return ok;
}
static CIBlock*
getcib(Arena *arena, int clump, int writing, CIBlock *rock)
{
int mode;
CIBlock *cib;
u32int block, off;
if(clump >= arena->memstats.clumps){
seterr(EOk, "clump directory access out of range");
return nil;
}
block = clump / arena->clumpmax;
off = (clump - block * arena->clumpmax) * ClumpInfoSize;
cib = rock;
cib->block = block;
cib->offset = off;
if(writing){
if(off == 0 && clump == arena->memstats.clumps-1)
mode = OWRITE;
else
mode = ORDWR;
}else
mode = OREAD;
cib->data = getdblock(arena->part,
arena->base + arena->size - (block + 1) * arena->blocksize, mode);
if(cib->data == nil)
return nil;
return cib;
}
static void
putcib(Arena *arena, CIBlock *cib)
{
USED(arena);
putdblock(cib->data);
cib->data = nil;
}
|