#include "stdinc.h"
#include "dat.h"
#include "fns.h"
#include "error.h"
static void diskThread(void *a);
enum {
/*
* disable measurement since it gets alignment faults on BG
* and the guts used to be commented out.
*/
Timing = 0, /* flag */
QueueSize = 100, /* maximum block to queue */
};
struct Disk {
VtLock *lk;
int ref;
int fd;
Header h;
VtRendez *flow;
VtRendez *starve;
VtRendez *flush;
VtRendez *die;
int nqueue;
Block *cur; /* block to do on current scan */
Block *next; /* blocks to do next scan */
};
/* keep in sync with Part* enum in dat.h */
static char *partname[] = {
[PartError] "error",
[PartSuper] "super",
[PartLabel] "label",
[PartData] "data",
[PartVenti] "venti",
};
Disk *
diskAlloc(int fd)
{
u8int buf[HeaderSize];
Header h;
Disk *disk;
if(pread(fd, buf, HeaderSize, HeaderOffset) < HeaderSize){
vtSetError("short read: %r");
vtOSError();
return nil;
}
if(!headerUnpack(&h, buf)){
vtSetError("bad disk header");
return nil;
}
disk = vtMemAllocZ(sizeof(Disk));
disk->lk = vtLockAlloc();
disk->starve = vtRendezAlloc(disk->lk);
disk->flow = vtRendezAlloc(disk->lk);
disk->flush = vtRendezAlloc(disk->lk);
disk->fd = fd;
disk->h = h;
disk->ref = 2;
vtThread(diskThread, disk);
return disk;
}
void
diskFree(Disk *disk)
{
diskFlush(disk);
/* kill slave */
vtLock(disk->lk);
disk->die = vtRendezAlloc(disk->lk);
vtWakeup(disk->starve);
while(disk->ref > 1)
vtSleep(disk->die);
vtUnlock(disk->lk);
vtRendezFree(disk->flow);
vtRendezFree(disk->starve);
vtRendezFree(disk->die);
vtLockFree(disk->lk);
close(disk->fd);
vtMemFree(disk);
}
static u32int
partStart(Disk *disk, int part)
{
switch(part){
default:
assert(0);
case PartSuper:
return disk->h.super;
case PartLabel:
return disk->h.label;
case PartData:
return disk->h.data;
}
}
static u32int
partEnd(Disk *disk, int part)
{
switch(part){
default:
assert(0);
case PartSuper:
return disk->h.super+1;
case PartLabel:
return disk->h.data;
case PartData:
return disk->h.end;
}
}
int
diskReadRaw(Disk *disk, int part, u32int addr, uchar *buf)
{
ulong start, end;
u64int offset;
int n, nn;
start = partStart(disk, part);
end = partEnd(disk, part);
if(addr >= end-start){
vtSetError(EBadAddr);
return 0;
}
offset = ((u64int)(addr + start))*disk->h.blockSize;
n = disk->h.blockSize;
while(n > 0){
nn = pread(disk->fd, buf, n, offset);
if(nn < 0){
vtOSError();
return 0;
}
if(nn == 0){
vtSetError("eof reading disk");
return 0;
}
n -= nn;
offset += nn;
buf += nn;
}
return 1;
}
int
diskWriteRaw(Disk *disk, int part, u32int addr, uchar *buf)
{
ulong start, end;
u64int offset;
int n;
start = partStart(disk, part);
end = partEnd(disk, part);
if(addr >= end - start){
vtSetError(EBadAddr);
return 0;
}
offset = ((u64int)(addr + start))*disk->h.blockSize;
n = pwrite(disk->fd, buf, disk->h.blockSize, offset);
if(n < 0){
vtOSError();
return 0;
}
if(n < disk->h.blockSize) {
vtSetError("short write");
return 0;
}
return 1;
}
static void
diskQueue(Disk *disk, Block *b)
{
Block **bp, *bb;
vtLock(disk->lk);
while(disk->nqueue >= QueueSize)
vtSleep(disk->flow);
if(disk->cur == nil || b->addr > disk->cur->addr)
bp = &disk->cur;
else
bp = &disk->next;
for(bb=*bp; bb; bb=*bp){
if(b->addr < bb->addr)
break;
bp = &bb->ionext;
}
b->ionext = bb;
*bp = b;
if(disk->nqueue == 0)
vtWakeup(disk->starve);
disk->nqueue++;
vtUnlock(disk->lk);
}
void
diskRead(Disk *disk, Block *b)
{
assert(b->iostate == BioEmpty || b->iostate == BioLabel);
blockSetIOState(b, BioReading);
diskQueue(disk, b);
}
void
diskWrite(Disk *disk, Block *b)
{
assert(b->nlock == 1);
assert(b->iostate == BioDirty);
blockSetIOState(b, BioWriting);
diskQueue(disk, b);
}
void
diskWriteAndWait(Disk *disk, Block *b)
{
int nlock;
/*
* If b->nlock > 1, the block is aliased within
* a single thread. That thread is us.
* DiskWrite does some funny stuff with VtLock
* and blockPut that basically assumes b->nlock==1.
* We humor diskWrite by temporarily setting
* nlock to 1. This needs to be revisited.
*/
nlock = b->nlock;
if(nlock > 1)
b->nlock = 1;
diskWrite(disk, b);
while(b->iostate != BioClean)
vtSleep(b->ioready);
b->nlock = nlock;
}
int
diskBlockSize(Disk *disk)
{
return disk->h.blockSize; /* immuttable */
}
int
diskFlush(Disk *disk)
{
Dir dir;
vtLock(disk->lk);
while(disk->nqueue > 0)
vtSleep(disk->flush);
vtUnlock(disk->lk);
/* there really should be a cleaner interface to flush an fd */
nulldir(&dir);
if(dirfwstat(disk->fd, &dir) < 0){
vtOSError();
return 0;
}
return 1;
}
u32int
diskSize(Disk *disk, int part)
{
return partEnd(disk, part) - partStart(disk, part);
}
static uintptr
mypc(int x)
{
return getcallerpc(&x);
}
static char *
disk2file(Disk *disk)
{
static char buf[256];
if (fd2path(disk->fd, buf, sizeof buf) < 0)
strncpy(buf, "GOK", sizeof buf);
return buf;
}
static void
diskThread(void *a)
{
Disk *disk = a;
Block *b;
uchar *buf, *p;
double t;
int nio;
vtThreadSetName("disk");
//fprint(2, "diskThread %d\n", getpid());
buf = vtMemAlloc(disk->h.blockSize);
vtLock(disk->lk);
if (Timing) {
nio = 0;
t = -nsec();
}
for(;;){
while(disk->nqueue == 0){
if (Timing) {
t += nsec();
if(nio >= 10000){
fprint(2, "disk: io=%d at %.3fms\n",
nio, t*1e-6/nio);
nio = 0;
t = 0;
}
}
if(disk->die != nil)
goto Done;
vtSleep(disk->starve);
if (Timing)
t -= nsec();
}
assert(disk->cur != nil || disk->next != nil);
if(disk->cur == nil){
disk->cur = disk->next;
disk->next = nil;
}
b = disk->cur;
disk->cur = b->ionext;
vtUnlock(disk->lk);
/*
* no one should hold onto blocking in the
* reading or writing state, so this lock should
* not cause deadlock.
*/
if(0)fprint(2, "fossil: diskThread: %d:%d %x\n", getpid(), b->part, b->addr);
bwatchLock(b);
vtLock(b->lk);
b->pc = mypc(0);
assert(b->nlock == 1);
switch(b->iostate){
default:
abort();
case BioReading:
if(!diskReadRaw(disk, b->part, b->addr, b->data)){
fprint(2, "fossil: diskReadRaw failed: %s: "
"score %V: part=%s block %ud: %r\n",
disk2file(disk), b->score,
partname[b->part], b->addr);
blockSetIOState(b, BioReadError);
}else
blockSetIOState(b, BioClean);
break;
case BioWriting:
p = blockRollback(b, buf);
/* NB: ctime result ends with a newline */
if(!diskWriteRaw(disk, b->part, b->addr, p)){
fprint(2, "fossil: diskWriteRaw failed: %s: "
"score %V: date %s part=%s block %ud: %r\n",
disk2file(disk), b->score,
ctime(time(0)),
partname[b->part], b->addr);
break;
}
if(p != buf)
blockSetIOState(b, BioClean);
else
blockSetIOState(b, BioDirty);
break;
}
blockPut(b); /* remove extra reference, unlock */
vtLock(disk->lk);
disk->nqueue--;
if(disk->nqueue == QueueSize-1)
vtWakeup(disk->flow);
if(disk->nqueue == 0)
vtWakeup(disk->flush);
if(Timing)
nio++;
}
Done:
//fprint(2, "diskThread done\n");
disk->ref--;
vtWakeup(disk->die);
vtUnlock(disk->lk);
vtMemFree(buf);
}
|