Add extfs background sync thread.

This commit is contained in:
Jonas 'Sortie' Termansen 2015-01-31 00:19:54 +01:00
parent 26336de7ff
commit 6e8f17b5df
6 changed files with 118 additions and 3 deletions

View File

@ -12,6 +12,7 @@ CXXFLAGS:=$(CXXFLAGS) -Wall -Wextra -fno-exceptions -fno-rtti
LIBS:=$(LIBS)
ifeq ($(HOST_IS_SORTIX),0)
PTHREAD_OPTION:=-pthread
LIBS:=$(LIBS) -lfuse
CPPFLAGS:=$(CPPFLAGS) -D_FILE_OFFSET_BITS=64
endif
@ -27,7 +28,7 @@ install: all
install $(BINARIES) $(DESTDIR)$(BINDIR)
extfs: *.cpp *.h
$(CXX) -std=gnu++11 $(CPPFLAGS) $(CXXFLAGS) *.cpp -o $@ $(LIBS)
$(CXX) $(PTHREAD_OPTION) -std=gnu++11 $(CPPFLAGS) $(CXXFLAGS) *.cpp -o $@ $(LIBS)
clean:
rm -f $(BINARIES) *.o

View File

@ -22,6 +22,8 @@
#include <sys/types.h>
#include <assert.h>
#include <pthread.h>
#include <stddef.h>
#include <stdint.h>
@ -31,6 +33,8 @@
Block::Block(Device* device, uint32_t block_id)
{
this->modify_lock = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
this->transit_done_cond = PTHREAD_COND_INITIALIZER;
this->prev_block = NULL;
this->next_block = NULL;
this->prev_hashed = NULL;
@ -41,7 +45,8 @@ Block::Block(Device* device, uint32_t block_id)
this->reference_count = 1;
this->block_id = block_id;
this->dirty = false;
this->block_data = 0;
this->is_in_transit = false;
this->block_data = NULL;
}
Block::~Block()
@ -68,6 +73,15 @@ void Block::Unref()
void Block::Sync()
{
if ( device->has_sync_thread )
{
pthread_mutex_lock(&device->sync_thread_lock);
while ( dirty || is_in_transit )
pthread_cond_wait(&transit_done_cond, &device->sync_thread_lock);
pthread_mutex_unlock(&device->sync_thread_lock);
return;
}
if ( !dirty )
return;
dirty = false;
@ -84,10 +98,14 @@ void Block::Sync()
void Block::BeginWrite()
{
assert(device->write);
pthread_mutex_lock(&modify_lock);
}
void Block::FinishWrite()
{
pthread_mutex_unlock(&modify_lock);
pthread_mutex_lock(&device->sync_thread_lock);
if ( !dirty )
{
dirty = true;
@ -96,7 +114,9 @@ void Block::FinishWrite()
if ( next_dirty )
next_dirty->prev_dirty = this;
device->dirty_block = this;
pthread_cond_signal(&device->sync_thread_cond);
}
pthread_mutex_unlock(&device->sync_thread_lock);
Use();
}

View File

@ -32,6 +32,8 @@ public:
~Block();
public:
pthread_mutex_t modify_lock;
pthread_cond_t transit_done_cond;
Block* prev_block;
Block* next_block;
Block* prev_hashed;
@ -42,6 +44,7 @@ public:
size_t reference_count;
uint32_t block_id;
bool dirty;
bool is_in_transit;
uint8_t* block_data;
public:

View File

@ -24,6 +24,7 @@
#include <sys/types.h>
#include <assert.h>
#include <pthread.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>
@ -33,6 +34,12 @@
#include "device.h"
#include "ioleast.h"
void* Device__SyncThread(void* ctx)
{
((Device*) ctx)->SyncThread();
return NULL;
}
Device::Device(int fd, uint32_t block_size, bool write)
{
this->write = write;
@ -46,16 +53,38 @@ Device::Device(int fd, uint32_t block_size, bool write)
this->dirty_block = NULL;
for ( size_t i = 0; i < DEVICE_HASH_LENGTH; i++ )
hash_blocks[i] = NULL;
this->sync_thread_cond = PTHREAD_COND_INITIALIZER;
this->sync_thread_idle_cond = PTHREAD_COND_INITIALIZER;
this->sync_thread_lock = PTHREAD_MUTEX_INITIALIZER;
this->sync_in_transit = false;
this->has_sync_thread = false;
}
Device::~Device()
{
if ( has_sync_thread )
{
pthread_mutex_lock(&sync_thread_lock);
sync_thread_should_exit = true;
pthread_cond_signal(&sync_thread_cond);
pthread_mutex_unlock(&sync_thread_lock);
pthread_join(sync_thread, NULL);
has_sync_thread = false;
}
Sync();
while ( mru_block )
delete mru_block;
close(fd);
}
void Device::SpawnSyncThread()
{
if ( this->has_sync_thread )
return;
this->has_sync_thread = write &&
pthread_create(&this->sync_thread, NULL, Device__SyncThread, this) == 0;
}
Block* Device::GetBlock(uint32_t block_id)
{
if ( Block* block = GetCachedBlock(block_id) )
@ -97,6 +126,56 @@ Block* Device::GetCachedBlock(uint32_t block_id)
void Device::Sync()
{
if ( has_sync_thread )
{
pthread_mutex_lock(&sync_thread_lock);
while ( dirty_block || sync_in_transit )
pthread_cond_wait(&sync_thread_cond, &sync_thread_lock);
pthread_mutex_unlock(&sync_thread_lock);
return;
}
while ( dirty_block )
dirty_block->Sync();
}
void Device::SyncThread()
{
uint8_t transit_block_data[block_size];
pthread_mutex_lock(&sync_thread_lock);
while ( true )
{
while ( !(dirty_block || sync_thread_should_exit) )
pthread_cond_wait(&sync_thread_cond, &sync_thread_lock);
if ( sync_thread_should_exit )
break;
Block* block = dirty_block;
if ( block->next_dirty )
block->next_dirty->prev_dirty = NULL;
dirty_block = block->next_dirty;
block->next_dirty = NULL;
block->dirty = false;
block->is_in_transit = true;
sync_in_transit = true;
pthread_mutex_unlock(&sync_thread_lock);
pthread_mutex_lock(&block->modify_lock);
memcpy(transit_block_data, block->block_data, block_size);
pthread_mutex_unlock(&block->modify_lock);
off_t offset = (off_t) block_size * (off_t) block->block_id;
pwriteall(fd, transit_block_data, block_size, offset);
pthread_mutex_lock(&sync_thread_lock);
block->is_in_transit = false;
sync_in_transit = false;
pthread_cond_signal(&block->transit_done_cond);
if ( !dirty_block )
pthread_cond_signal(&sync_thread_idle_cond);
}
pthread_mutex_unlock(&sync_thread_lock);
}

View File

@ -34,6 +34,10 @@ public:
~Device();
public:
pthread_t sync_thread;
pthread_cond_t sync_thread_cond;
pthread_cond_t sync_thread_idle_cond;
pthread_mutex_t sync_thread_lock;
Block* mru_block;
Block* lru_block;
Block* dirty_block;
@ -42,12 +46,17 @@ public:
uint32_t block_size;
int fd;
bool write;
bool has_sync_thread;
bool sync_thread_should_exit;
bool sync_in_transit;
public:
void SpawnSyncThread();
Block* GetBlock(uint32_t block_id);
Block* GetBlockZeroed(uint32_t block_id);
Block* GetCachedBlock(uint32_t block_id);
void Sync();
void SyncThread();
};

View File

@ -668,6 +668,8 @@ int fsmarshall_main(const char* argv0,
setpgid(0, 0);
}
dev->SpawnSyncThread();
// Listen for filesystem messages and sync the filesystem every few seconds.
struct timespec last_sync_at;
clock_gettime(CLOCK_MONOTONIC, &last_sync_at);
@ -690,7 +692,8 @@ int fsmarshall_main(const char* argv0,
struct timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
if ( dev->write && 5 <= timespec_sub(now, last_sync_at).tv_sec )
if ( dev->write && !dev->has_sync_thread &&
5 <= timespec_sub(now, last_sync_at).tv_sec )
{
fs->Sync();
last_sync_at = now;