Fix polling Unix sockets in both incoming and outgoing directions.

This commit is contained in:
Jonas 'Sortie' Termansen 2014-04-29 22:16:07 +02:00
parent b30878e816
commit 6774c79ba6
6 changed files with 96 additions and 41 deletions

View File

@ -1,6 +1,6 @@
/*******************************************************************************
Copyright(C) Jonas 'Sortie' Termansen 2012.
Copyright(C) Jonas 'Sortie' Termansen 2012, 2014.
This file is part of Sortix.
@ -57,7 +57,8 @@ class PollNode
friend class PollChannel;
public:
PollNode() { next = NULL; prev = NULL; channel = NULL; }
PollNode() { next = NULL; prev = NULL; channel = NULL; master = this; slave = NULL; }
~PollNode() { delete slave; }
private:
PollNode* next;
@ -65,6 +66,8 @@ private:
public:
PollChannel* channel;
PollNode* master;
PollNode* slave;
public:
kthread_mutex_t* wake_mutex;
@ -75,6 +78,7 @@ public:
public:
void Cancel();
PollNode* CreateSlave();
};

View File

@ -332,8 +332,9 @@ int AbstractInode::poll(ioctx_t* /*ctx*/, PollNode* /*node*/)
if ( inode_type == INODE_TYPE_FILE )
{
// TODO: Correct bits?
node->revents |= (POLLIN | POLLOUT) & node->events;
// TODO: What if not listening on events (POLLIN | POLLOUT)?
if ( !((POLLIN | POLLOUT) & node->events) )
return errno = EAGAIN, -1;
node->master->revents |= (POLLIN | POLLOUT) & node->events;
return 0;
}
#endif

View File

@ -445,7 +445,7 @@ int LogTerminal::poll(ioctx_t* /*ctx*/, PollNode* node)
short ret_status = PollEventStatus() & node->events;
if ( ret_status )
{
node->revents |= ret_status;
node->master->revents |= ret_status;
return 0;
}
poll_channel.Register(node);

View File

@ -289,9 +289,14 @@ ssize_t StreamSocket::write(ioctx_t* ctx, const uint8_t* buf, size_t count)
int StreamSocket::poll(ioctx_t* ctx, PollNode* node)
{
if ( is_connected )
// TODO: The poll API is broken, can't provide multiple sources on a poll
// node. For now, polling the read channel should be most useful.
return incoming.poll(ctx, node);
{
PollNode* slave = node->CreateSlave();
if ( !slave )
return -1;
int incoming_result = incoming.poll(ctx, node);
int outgoing_result = outgoing.poll(ctx, slave);
return incoming_result == 0 || outgoing_result == 0 ? 0 : -1;
}
if ( is_listening )
return manager->AcceptPoll(this, ctx, node);
return errno = ENOTCONN, -1;
@ -360,8 +365,9 @@ void Manager::Unlisten(StreamSocket* socket)
int Manager::AcceptPoll(StreamSocket* socket, ioctx_t* /*ctx*/, PollNode* node)
{
ScopedLock lock(&manager_lock);
if ( socket->first_pending )
return (node->revents |= POLLIN | POLLRDNORM), 0;
if ( socket->first_pending &&
((POLLIN | POLLRDNORM) & node->events) )
return node->master->revents |= ((POLLIN | POLLRDNORM) & node->events), 0;
socket->accept_poll_channel.Register(node);
return errno = EAGAIN, -1;
}

View File

@ -1,6 +1,6 @@
/*******************************************************************************
Copyright(C) Jonas 'Sortie' Termansen 2011, 2012, 2013.
Copyright(C) Jonas 'Sortie' Termansen 2011, 2012, 2013, 2014.
This file is part of Sortix.
@ -62,13 +62,16 @@ public:
void PerhapsShutdown();
ssize_t read(ioctx_t* ctx, uint8_t* buf, size_t count);
ssize_t write(ioctx_t* ctx, const uint8_t* buf, size_t count);
int poll(ioctx_t* ctx, PollNode* node);
int read_poll(ioctx_t* ctx, PollNode* node);
int write_poll(ioctx_t* ctx, PollNode* node);
private:
short PollEventStatus();
short ReadPollEventStatus();
short WritePollEventStatus();
private:
PollChannel poll_channel;
PollChannel read_poll_channel;
PollChannel write_poll_channel;
kthread_mutex_t pipelock;
kthread_cond_t readcond;
kthread_cond_t writecond;
@ -114,7 +117,8 @@ void PipeChannel::CloseWriting()
void PipeChannel::PerhapsShutdown()
{
kthread_mutex_lock(&pipelock);
poll_channel.Signal(PollEventStatus());
read_poll_channel.Signal(ReadPollEventStatus());
write_poll_channel.Signal(WritePollEventStatus());
bool deleteme = !anyreading & !anywriting;
kthread_mutex_unlock(&pipelock);
if ( deleteme )
@ -145,7 +149,8 @@ ssize_t PipeChannel::read(ioctx_t* ctx, uint8_t* buf, size_t count)
bufferoffset = (bufferoffset + amount) % buffersize;
bufferused -= amount;
kthread_cond_broadcast(&writecond);
poll_channel.Signal(PollEventStatus());
read_poll_channel.Signal(ReadPollEventStatus());
write_poll_channel.Signal(WritePollEventStatus());
return amount;
}
@ -178,34 +183,48 @@ ssize_t PipeChannel::write(ioctx_t* ctx, const uint8_t* buf, size_t count)
ctx->copy_from_src(buffer + writeoffset, buf, amount);
bufferused += amount;
kthread_cond_broadcast(&readcond);
poll_channel.Signal(PollEventStatus());
read_poll_channel.Signal(ReadPollEventStatus());
write_poll_channel.Signal(WritePollEventStatus());
return amount;
}
short PipeChannel::PollEventStatus()
short PipeChannel::ReadPollEventStatus()
{
short status = 0;
if ( !anywriting )
if ( !anywriting && !bufferused )
status |= POLLHUP;
if ( !anyreading )
status |= POLLERR;
if ( bufferused )
status |= POLLIN | POLLRDNORM;
if ( bufferused != buffersize )
return status;
}
short PipeChannel::WritePollEventStatus()
{
short status = 0;
if ( !anyreading )
status |= POLLERR;
if ( anyreading && bufferused != buffersize )
status |= POLLOUT | POLLWRNORM;
return status;
}
int PipeChannel::poll(ioctx_t* /*ctx*/, PollNode* node)
int PipeChannel::read_poll(ioctx_t* /*ctx*/, PollNode* node)
{
ScopedLockSignal lock(&pipelock);
short ret_status = PollEventStatus() & node->events;
short ret_status = ReadPollEventStatus() & node->events;
if ( ret_status )
{
node->revents |= ret_status;
return 0;
}
poll_channel.Register(node);
return node->master->revents |= ret_status, 0;
read_poll_channel.Register(node);
return errno = EAGAIN, -1;
}
int PipeChannel::write_poll(ioctx_t* /*ctx*/, PollNode* node)
{
ScopedLockSignal lock(&pipelock);
short ret_status = WritePollEventStatus() & node->events;
if ( ret_status )
return node->master->revents |= ret_status, 0;
write_poll_channel.Register(node);
return errno = EAGAIN, -1;
}
@ -263,7 +282,8 @@ ssize_t PipeEndpoint::write(ioctx_t* ctx, const uint8_t* buf, size_t count)
int PipeEndpoint::poll(ioctx_t* ctx, PollNode* node)
{
return channel->poll(ctx, node);
return reading ? channel->read_poll(ctx, node)
: channel->write_poll(ctx, node);
}
class PipeNode : public AbstractInode

View File

@ -1,6 +1,6 @@
/*******************************************************************************
Copyright(C) Jonas 'Sortie' Termansen 2012.
Copyright(C) Jonas 'Sortie' Termansen 2012, 2014.
This file is part of Sortix.
@ -79,20 +79,24 @@ void PollChannel::Signal(short events)
void PollChannel::SignalUnlocked(short events)
{
for ( PollNode* node = first; node; node = node->next )
if ( node->revents |= events & (node->events | POLL__ONLY_REVENTS) )
{
PollNode* target = node->master;
if ( target->revents |= events & (target->events | POLL__ONLY_REVENTS) )
{
ScopedLock node_lock(node->wake_mutex);
if ( !*node->woken )
ScopedLock target_lock(target->wake_mutex);
if ( !*target->woken )
{
*node->woken = true;
kthread_cond_signal(node->wake_cond);
*target->woken = true;
kthread_cond_signal(target->wake_cond);
}
}
}
}
void PollChannel::Register(PollNode* node)
{
ScopedLock lock(&channel_lock);
assert(!node->channel);
node->channel = this;
if ( !first )
first = last = node,
@ -124,6 +128,23 @@ void PollNode::Cancel()
{
if ( channel )
channel->Unregister(this);
if ( slave )
slave->Cancel();
}
PollNode* PollNode::CreateSlave()
{
PollNode* new_slave = new PollNode();
if ( !new_slave )
return NULL;
new_slave->wake_mutex = wake_mutex;
new_slave->wake_cond = wake_cond;
new_slave->events = events;
new_slave->revents = revents;
new_slave->woken = woken;
new_slave->master = master;
new_slave->slave = slave;
return slave = new_slave;
}
namespace Poll {
@ -191,7 +212,7 @@ static int sys_ppoll(struct pollfd* user_fds, nfds_t nfds,
bool unexpected_error = false;
nfds_t reqs = nfds;
for ( reqs = 0; !unexpected_error && reqs < nfds; reqs++ )
for ( reqs = 0; !unexpected_error && reqs < nfds; )
{
PollNode* node = nodes + reqs;
if ( fds[reqs].fd < 0 )
@ -202,16 +223,18 @@ static int sys_ppoll(struct pollfd* user_fds, nfds_t nfds,
// user-space immediately? What if conditions are already true on
// some of the file descriptors (those we have processed so far?)?
node->revents = 0;
reqs++;
continue;
}
Ref<Descriptor> desc = process->GetDescriptor(fds[reqs].fd);
if ( !desc ) { unexpected_error = true; break; }
node->events = fds[reqs].events;
if ( !desc ) { self_woken = unexpected_error = true; break; }
node->events = fds[reqs].events | POLL__ONLY_REVENTS;
node->revents = 0;
node->wake_mutex = &wakeup_mutex;
node->wake_cond = &wakeup_cond;
node->woken = (bool*) &remote_woken;
// TODO: How should erors be handled?
reqs++;
// TODO: How should errors be handled?
if ( desc->poll(&ctx, node) == 0 )
self_woken = true;
else if ( errno != EAGAIN )
@ -231,7 +254,8 @@ static int sys_ppoll(struct pollfd* user_fds, nfds_t nfds,
kthread_mutex_unlock(&wakeup_mutex);
for ( nfds_t i = 0; i < reqs; i++ )
nodes[i].Cancel();
if ( 0 <= fds[i].fd )
nodes[i].Cancel();
if ( !unexpected_error )
{