diff --git a/ext/Makefile b/ext/Makefile index fa67f2db..8f0e1eab 100644 --- a/ext/Makefile +++ b/ext/Makefile @@ -12,6 +12,7 @@ CXXFLAGS:=$(CXXFLAGS) -Wall -Wextra -fno-exceptions -fno-rtti LIBS:=$(LIBS) ifeq ($(HOST_IS_SORTIX),0) + PTHREAD_OPTION:=-pthread LIBS:=$(LIBS) -lfuse CPPFLAGS:=$(CPPFLAGS) -D_FILE_OFFSET_BITS=64 endif @@ -27,7 +28,7 @@ install: all install $(BINARIES) $(DESTDIR)$(BINDIR) extfs: *.cpp *.h - $(CXX) -std=gnu++11 $(CPPFLAGS) $(CXXFLAGS) *.cpp -o $@ $(LIBS) + $(CXX) $(PTHREAD_OPTION) -std=gnu++11 $(CPPFLAGS) $(CXXFLAGS) *.cpp -o $@ $(LIBS) clean: rm -f $(BINARIES) *.o diff --git a/ext/block.cpp b/ext/block.cpp index b75dcc64..b8400dfd 100644 --- a/ext/block.cpp +++ b/ext/block.cpp @@ -22,6 +22,8 @@ #include +#include +#include #include #include @@ -31,6 +33,8 @@ Block::Block(Device* device, uint32_t block_id) { + this->modify_lock = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP; + this->transit_done_cond = PTHREAD_COND_INITIALIZER; this->prev_block = NULL; this->next_block = NULL; this->prev_hashed = NULL; @@ -41,7 +45,8 @@ Block::Block(Device* device, uint32_t block_id) this->reference_count = 1; this->block_id = block_id; this->dirty = false; - this->block_data = 0; + this->is_in_transit = false; + this->block_data = NULL; } Block::~Block() @@ -68,6 +73,15 @@ void Block::Unref() void Block::Sync() { + if ( device->has_sync_thread ) + { + pthread_mutex_lock(&device->sync_thread_lock); + while ( dirty || is_in_transit ) + pthread_cond_wait(&transit_done_cond, &device->sync_thread_lock); + pthread_mutex_unlock(&device->sync_thread_lock); + return; + } + if ( !dirty ) return; dirty = false; @@ -84,10 +98,14 @@ void Block::Sync() void Block::BeginWrite() { + assert(device->write); + pthread_mutex_lock(&modify_lock); } void Block::FinishWrite() { + pthread_mutex_unlock(&modify_lock); + pthread_mutex_lock(&device->sync_thread_lock); if ( !dirty ) { dirty = true; @@ -96,7 +114,9 @@ void Block::FinishWrite() if ( next_dirty ) next_dirty->prev_dirty = this; device->dirty_block = this; + pthread_cond_signal(&device->sync_thread_cond); } + pthread_mutex_unlock(&device->sync_thread_lock); Use(); } diff --git a/ext/block.h b/ext/block.h index 77c32f03..2b1be84e 100644 --- a/ext/block.h +++ b/ext/block.h @@ -32,6 +32,8 @@ public: ~Block(); public: + pthread_mutex_t modify_lock; + pthread_cond_t transit_done_cond; Block* prev_block; Block* next_block; Block* prev_hashed; @@ -42,6 +44,7 @@ public: size_t reference_count; uint32_t block_id; bool dirty; + bool is_in_transit; uint8_t* block_data; public: diff --git a/ext/device.cpp b/ext/device.cpp index 09c777e3..ed7426ef 100644 --- a/ext/device.cpp +++ b/ext/device.cpp @@ -24,6 +24,7 @@ #include #include +#include #include #include #include @@ -33,6 +34,12 @@ #include "device.h" #include "ioleast.h" +void* Device__SyncThread(void* ctx) +{ + ((Device*) ctx)->SyncThread(); + return NULL; +} + Device::Device(int fd, uint32_t block_size, bool write) { this->write = write; @@ -46,16 +53,38 @@ Device::Device(int fd, uint32_t block_size, bool write) this->dirty_block = NULL; for ( size_t i = 0; i < DEVICE_HASH_LENGTH; i++ ) hash_blocks[i] = NULL; + this->sync_thread_cond = PTHREAD_COND_INITIALIZER; + this->sync_thread_idle_cond = PTHREAD_COND_INITIALIZER; + this->sync_thread_lock = PTHREAD_MUTEX_INITIALIZER; + this->sync_in_transit = false; + this->has_sync_thread = false; } Device::~Device() { + if ( has_sync_thread ) + { + pthread_mutex_lock(&sync_thread_lock); + sync_thread_should_exit = true; + pthread_cond_signal(&sync_thread_cond); + pthread_mutex_unlock(&sync_thread_lock); + pthread_join(sync_thread, NULL); + has_sync_thread = false; + } Sync(); while ( mru_block ) delete mru_block; close(fd); } +void Device::SpawnSyncThread() +{ + if ( this->has_sync_thread ) + return; + this->has_sync_thread = write && + pthread_create(&this->sync_thread, NULL, Device__SyncThread, this) == 0; +} + Block* Device::GetBlock(uint32_t block_id) { if ( Block* block = GetCachedBlock(block_id) ) @@ -97,6 +126,56 @@ Block* Device::GetCachedBlock(uint32_t block_id) void Device::Sync() { + if ( has_sync_thread ) + { + pthread_mutex_lock(&sync_thread_lock); + while ( dirty_block || sync_in_transit ) + pthread_cond_wait(&sync_thread_cond, &sync_thread_lock); + pthread_mutex_unlock(&sync_thread_lock); + return; + } + while ( dirty_block ) dirty_block->Sync(); } + +void Device::SyncThread() +{ + uint8_t transit_block_data[block_size]; + pthread_mutex_lock(&sync_thread_lock); + while ( true ) + { + while ( !(dirty_block || sync_thread_should_exit) ) + pthread_cond_wait(&sync_thread_cond, &sync_thread_lock); + if ( sync_thread_should_exit ) + break; + + Block* block = dirty_block; + + if ( block->next_dirty ) + block->next_dirty->prev_dirty = NULL; + dirty_block = block->next_dirty; + block->next_dirty = NULL; + + block->dirty = false; + block->is_in_transit = true; + sync_in_transit = true; + + pthread_mutex_unlock(&sync_thread_lock); + + pthread_mutex_lock(&block->modify_lock); + memcpy(transit_block_data, block->block_data, block_size); + pthread_mutex_unlock(&block->modify_lock); + + off_t offset = (off_t) block_size * (off_t) block->block_id; + pwriteall(fd, transit_block_data, block_size, offset); + + pthread_mutex_lock(&sync_thread_lock); + block->is_in_transit = false; + sync_in_transit = false; + pthread_cond_signal(&block->transit_done_cond); + if ( !dirty_block ) + pthread_cond_signal(&sync_thread_idle_cond); + } + pthread_mutex_unlock(&sync_thread_lock); +} diff --git a/ext/device.h b/ext/device.h index 29a01a26..69a21856 100644 --- a/ext/device.h +++ b/ext/device.h @@ -34,6 +34,10 @@ public: ~Device(); public: + pthread_t sync_thread; + pthread_cond_t sync_thread_cond; + pthread_cond_t sync_thread_idle_cond; + pthread_mutex_t sync_thread_lock; Block* mru_block; Block* lru_block; Block* dirty_block; @@ -42,12 +46,17 @@ public: uint32_t block_size; int fd; bool write; + bool has_sync_thread; + bool sync_thread_should_exit; + bool sync_in_transit; public: + void SpawnSyncThread(); Block* GetBlock(uint32_t block_id); Block* GetBlockZeroed(uint32_t block_id); Block* GetCachedBlock(uint32_t block_id); void Sync(); + void SyncThread(); }; diff --git a/ext/fsmarshall.cpp b/ext/fsmarshall.cpp index 01e45b1a..9afc30c0 100644 --- a/ext/fsmarshall.cpp +++ b/ext/fsmarshall.cpp @@ -668,6 +668,8 @@ int fsmarshall_main(const char* argv0, setpgid(0, 0); } + dev->SpawnSyncThread(); + // Listen for filesystem messages and sync the filesystem every few seconds. struct timespec last_sync_at; clock_gettime(CLOCK_MONOTONIC, &last_sync_at); @@ -690,7 +692,8 @@ int fsmarshall_main(const char* argv0, struct timespec now; clock_gettime(CLOCK_MONOTONIC, &now); - if ( dev->write && 5 <= timespec_sub(now, last_sync_at).tv_sec ) + if ( dev->write && !dev->has_sync_thread && + 5 <= timespec_sub(now, last_sync_at).tv_sec ) { fs->Sync(); last_sync_at = now;