Refactor pipe implementation.

This commit is contained in:
Jonas 'Sortie' Termansen 2013-04-24 17:32:36 +02:00
parent 7f543dc910
commit 658defdbc3
2 changed files with 139 additions and 63 deletions

View File

@ -0,0 +1,51 @@
/*******************************************************************************
Copyright(C) Jonas 'Sortie' Termansen 2011, 2012, 2013.
This file is part of Sortix.
Sortix is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software
Foundation, either version 3 of the License, or (at your option) any later
version.
Sortix is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
details.
You should have received a copy of the GNU General Public License along with
Sortix. If not, see <http://www.gnu.org/licenses/>.
sortix/kernel/pipe.h
Embeddedable one-way data stream.
*******************************************************************************/
#ifndef INCLUDE_SORTIX_KERNEL_PIPE_H
#define INCLUDE_SORTIX_KERNEL_PIPE_H
namespace Sortix {
class PipeChannel;
class PipeEndpoint
{
public:
PipeEndpoint();
~PipeEndpoint();
bool Connect(PipeEndpoint* destination);
void Disconnect();
ssize_t read(ioctx_t* ctx, uint8_t* buf, size_t count);
ssize_t write(ioctx_t* ctx, const uint8_t* buf, size_t count);
int poll(ioctx_t* ctx, PollNode* node);
private:
PipeChannel* channel;
bool reading;
};
} // namespace Sortix
#endif

View File

@ -41,8 +41,9 @@
#include <sortix/kernel/vnode.h>
#include <sortix/kernel/descriptor.h>
#include <sortix/kernel/dtable.h>
#include <sortix/kernel/poll.h>
#include <sortix/kernel/syscall.h>
#include <sortix/kernel/pipe.h>
#include <sortix/kernel/poll.h>
#include <sortix/signal.h>
#include <sortix/stat.h>
@ -63,8 +64,6 @@ class PipeChannel
public:
PipeChannel(uint8_t* buffer, size_t buffersize);
~PipeChannel();
void StartReading();
void StartWriting();
void CloseReading();
void CloseWriting();
void PerhapsShutdown();
@ -97,7 +96,7 @@ PipeChannel::PipeChannel(uint8_t* buffer, size_t buffersize)
this->buffer = buffer;
this->buffersize = buffersize;
bufferoffset = bufferused = 0;
anyreading = anywriting = false;
anyreading = anywriting = true;
}
PipeChannel::~PipeChannel()
@ -105,20 +104,6 @@ PipeChannel::~PipeChannel()
delete[] buffer;
}
void PipeChannel::StartReading()
{
ScopedLock lock(&pipelock);
assert(!anyreading);
anyreading = true;
}
void PipeChannel::StartWriting()
{
ScopedLock lock(&pipelock);
assert(!anywriting);
anywriting = true;
}
void PipeChannel::CloseReading()
{
anyreading = false;
@ -231,48 +216,44 @@ int PipeChannel::poll(ioctx_t* /*ctx*/, PollNode* node)
return errno = EAGAIN, -1;
}
class PipeEndpoint : public AbstractInode
PipeEndpoint::PipeEndpoint()
{
public:
PipeEndpoint(dev_t dev, uid_t owner, gid_t group, mode_t mode,
PipeChannel* channel, bool reading);
~PipeEndpoint();
virtual ssize_t read(ioctx_t* ctx, uint8_t* buf, size_t count);
virtual ssize_t write(ioctx_t* ctx, const uint8_t* buf, size_t count);
virtual int poll(ioctx_t* ctx, PollNode* node);
private:
kthread_mutex_t pipelock;
PipeChannel* channel;
bool reading;
};
PipeEndpoint::PipeEndpoint(dev_t dev, uid_t owner, gid_t group, mode_t mode,
PipeChannel* channel, bool reading)
{
inode_type = INODE_TYPE_STREAM;
this->dev = dev;
this->ino = (ino_t) this;
this->channel = channel;
this->reading = reading;
if ( reading )
channel->StartReading();
else
channel->StartWriting();
pipelock = KTHREAD_MUTEX_INITIALIZER;
this->stat_uid = owner;
this->stat_gid = group;
this->type = S_IFCHR;
this->stat_mode = (mode & S_SETABLE) | this->type;
channel = NULL;
reading = false;
}
PipeEndpoint::~PipeEndpoint()
{
if ( channel )
Disconnect();
}
bool PipeEndpoint::Connect(PipeEndpoint* destination)
{
assert(!channel);
assert(!destination->channel);
const size_t BUFFER_SIZE = 4096;
size_t size = BUFFER_SIZE;
uint8_t* buffer = new uint8_t[size];
if ( !buffer )
return false;
destination->reading = !(reading = false);
if ( !(destination->channel = channel = new PipeChannel(buffer, size)) )
{
delete[] buffer;
return false;
}
return true;
}
void PipeEndpoint::Disconnect()
{
assert(channel);
if ( reading )
channel->CloseReading();
else
channel->CloseWriting();
reading = false;
}
ssize_t PipeEndpoint::read(ioctx_t* ctx, uint8_t* buf, size_t count)
@ -292,9 +273,57 @@ int PipeEndpoint::poll(ioctx_t* ctx, PollNode* node)
return channel->poll(ctx, node);
}
namespace Pipe {
class PipeNode : public AbstractInode
{
public:
PipeNode(dev_t dev, uid_t owner, gid_t group, mode_t mode);
~PipeNode();
bool Connect(PipeNode* destination);
virtual ssize_t read(ioctx_t* ctx, uint8_t* buf, size_t count);
virtual ssize_t write(ioctx_t* ctx, const uint8_t* buf, size_t count);
virtual int poll(ioctx_t* ctx, PollNode* node);
const size_t BUFFER_SIZE = 4096UL;
private:
PipeEndpoint endpoint;
};
bool PipeNode::Connect(PipeNode* destination)
{
return endpoint.Connect(&destination->endpoint);
}
PipeNode::PipeNode(dev_t dev, uid_t owner, gid_t group, mode_t mode)
{
inode_type = INODE_TYPE_STREAM;
this->dev = dev;
this->ino = (ino_t) this;
this->stat_uid = owner;
this->stat_gid = group;
this->type = S_IFCHR;
this->stat_mode = (mode & S_SETABLE) | this->type;
}
PipeNode::~PipeNode()
{
}
ssize_t PipeNode::read(ioctx_t* ctx, uint8_t* buf, size_t count)
{
return endpoint.read(ctx, buf, count);
}
ssize_t PipeNode::write(ioctx_t* ctx, const uint8_t* buf, size_t count)
{
return endpoint.write(ctx, buf, count);
}
int PipeNode::poll(ioctx_t* ctx, PollNode* node)
{
return endpoint.poll(ctx, node);
}
namespace Pipe {
static int sys_pipe(int pipefd[2])
{
@ -303,18 +332,14 @@ static int sys_pipe(int pipefd[2])
uid_t gid = process->gid;
mode_t mode = 0600;
size_t buffersize = BUFFER_SIZE;
uint8_t* buffer = new uint8_t[buffersize];
if ( !buffer ) return -1;
PipeChannel* channel = new PipeChannel(buffer, buffersize);
if ( !channel ) { delete[] buffer; return -1; }
Ref<Inode> recv_inode(new PipeEndpoint(0, uid, gid, mode, channel, true));
if ( !recv_inode ) { delete channel; return -1; }
Ref<Inode> send_inode(new PipeEndpoint(0, uid, gid, mode, channel, false));
Ref<PipeNode> recv_inode(new PipeNode(0, uid, gid, mode));
if ( !recv_inode ) return -1;
Ref<PipeNode> send_inode(new PipeNode(0, uid, gid, mode));
if ( !send_inode ) return -1;
if ( !send_inode->Connect(recv_inode.Get()) )
return -1;
Ref<Vnode> recv_vnode(new Vnode(recv_inode, Ref<Vnode>(NULL), 0, 0));
Ref<Vnode> send_vnode(new Vnode(send_inode, Ref<Vnode>(NULL), 0, 0));
if ( !recv_vnode || !send_vnode ) return -1;