Refactor init(8) communications.

This commit is contained in:
Jonas 'Sortie' Termansen 2024-05-06 00:30:16 +02:00
parent 4187705c9f
commit 6f70708ff5
1 changed files with 132 additions and 103 deletions

View File

@ -1,5 +1,5 @@
/*
* Copyright (c) 2011-2023 Jonas 'Sortie' Termansen.
* Copyright (c) 2011-2024 Jonas 'Sortie' Termansen.
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
@ -255,6 +255,24 @@ struct daemon_config
mode_t log_file_mode;
};
enum communication_type
{
COMMUNICATION_TYPE_OUTPUT,
COMMUNICATION_TYPE_READY,
};
struct communication
{
enum communication_type type;
size_t* index_ptr;
union
{
struct daemon* daemon;
struct connection* connection;
struct server* server;
};
};
static pid_t main_pid;
static pid_t forward_signal_pid = -1;
@ -297,8 +315,8 @@ static struct pollfd* pfds = NULL;
static size_t pfds_used = 0;
static size_t pfds_length = 0;
static struct daemon** pfds_daemon = NULL;
static size_t pfds_daemon_length = 0;
static struct communication* communications = NULL;
static size_t communications_length = 0;
static bool chain_location_made = false;
static char chain_location[] = "/tmp/fs.XXXXXX";
@ -1557,6 +1575,68 @@ static struct daemon_config* daemon_config_load(const char* name)
return daemon_config;
}
static bool communication_reserve(size_t required)
{
if ( pfds_length - pfds_used < required )
{
size_t old_length = pfds_length ? pfds_length : required;
struct pollfd* new_pfds =
reallocarray(pfds, old_length, 2 * sizeof(struct pollfd));
if ( !new_pfds )
return false;
pfds = new_pfds;
pfds_length = old_length * 2;
}
if ( communications_length - pfds_used < required )
{
size_t old_length =
communications_length ? communications_length : required;
struct communication* new_communications =
reallocarray(communications, old_length,
2 * sizeof(struct communication));
if ( !new_communications )
return false;
communications = new_communications;
communications_length = old_length * 2;
}
return true;
}
static void communication_register(struct communication* comm,
int fd,
short events)
{
assert(pfds_used < pfds_length);
assert(pfds_used < communications_length);
size_t index = pfds_used++;
struct pollfd* pfd = pfds + index;
memset(pfd, 0, sizeof(*pfd));
pfd->fd = fd;
pfd->events = events;
communications[index] = *comm;
*communications[index].index_ptr = index;
}
static void communication_unregister(size_t index)
{
assert(pfds_used <= pfds_length);
assert(pfds_used <= communications_length);
assert(index < pfds_used);
assert(index < communications_length);
// This function is relied on to not mess with any pollfds prior to the
// index, so it doesn't break a forward iteration on the pollfds.
size_t last_index = pfds_used - 1;
if ( index != last_index )
{
memcpy(pfds + index, pfds + last_index, sizeof(*pfds));
communications[index] = communications[last_index];
*communications[index].index_ptr = index;
}
pfds_used--;
memset(pfds + last_index, 0, sizeof(*pfds));
communications[last_index].daemon = NULL;
}
// TODO: Replace with better data structure.
static struct daemon* add_daemon(void)
{
@ -1953,46 +2033,6 @@ static void daemon_on_startup_error(struct daemon* daemon)
daemon_change_state_list(daemon, DAEMON_STATE_FINISHING);
}
static void daemon_register_pollfd(struct daemon* daemon,
int fd,
size_t* out_index,
short events)
{
assert(pfds_used < pfds_length);
assert(pfds_used < pfds_daemon_length);
size_t index = pfds_used++;
struct pollfd* pfd = pfds + index;
memset(pfd, 0, sizeof(*pfd));
pfd->fd = fd;
pfd->events = events;
pfds_daemon[index] = daemon;
*out_index = index;
}
static void daemon_unregister_pollfd(struct daemon* daemon, size_t index)
{
assert(pfds_used <= pfds_length);
assert(index < pfds_used);
assert(pfds_daemon[index] == daemon);
// This function is relied on to not mess with any pollfds prior to the
// index, so it doesn't break a forward iteration on the pollfds.
size_t last_index = pfds_used - 1;
if ( index != last_index )
{
memcpy(pfds + index, pfds + last_index, sizeof(*pfds));
pfds_daemon[index] = pfds_daemon[last_index];
if ( 0 <= pfds_daemon[index]->readyfd &&
pfds_daemon[index]->pfd_readyfd_index == last_index )
pfds_daemon[index]->pfd_readyfd_index = index;
if ( 0 <= pfds_daemon[index]->outputfd &&
pfds_daemon[index]->pfd_outputfd_index == last_index )
pfds_daemon[index]->pfd_outputfd_index = index;
}
pfds_used--;
memset(pfds + last_index, 0, sizeof(*pfds));
pfds_daemon[last_index] = NULL;
}
static void daemon_wait(struct daemon* daemon)
{
assert(daemon->state == DAEMON_STATE_SCHEDULED);
@ -2104,29 +2144,8 @@ static void daemon_start(struct daemon* daemon)
int readyfds[2];
if ( !daemon->need_tty )
{
size_t required_fds = 2;
if ( pfds_length - pfds_used < required_fds )
{
size_t old_length = pfds_length ? pfds_length : required_fds;
struct pollfd* new_pfds =
reallocarray(pfds, old_length, 2 * sizeof(struct pollfd));
if ( !new_pfds )
fatal("malloc");
pfds = new_pfds;
pfds_length = old_length * 2;
}
if ( pfds_daemon_length - pfds_used < required_fds )
{
size_t old_length =
pfds_daemon_length ? pfds_daemon_length : required_fds;
struct daemon** new_pfds_daemon =
reallocarray(pfds_daemon, old_length,
2 * sizeof(struct daemon*));
if ( !new_pfds_daemon )
fatal("malloc");
pfds_daemon = new_pfds_daemon;
pfds_daemon_length = old_length * 2;
}
if ( !communication_reserve(2) )
fatal("malloc");
if ( !log_begin(&daemon->log) )
{
// TODO: Mode where daemons are stopped if logging fails.
@ -2136,16 +2155,22 @@ static void daemon_start(struct daemon* daemon)
daemon->outputfd = outputfds[0];
fcntl(daemon->outputfd, F_SETFL, O_NONBLOCK);
// Setup the pollfd for the outputfd.
daemon_register_pollfd(daemon, daemon->outputfd,
&daemon->pfd_outputfd_index, POLLIN);
struct communication output_comm;
output_comm.type = COMMUNICATION_TYPE_OUTPUT;
output_comm.index_ptr = &daemon->pfd_outputfd_index;
output_comm.daemon = daemon;
communication_register(&output_comm, daemon->outputfd, POLLIN);
// Create the readyfd.
if ( pipe(readyfds) < 0 )
fatal("pipe");
daemon->readyfd = readyfds[0];
fcntl(daemon->readyfd, F_SETFL, O_NONBLOCK);
// Setup the pollfd for the readyfd.
daemon_register_pollfd(daemon, daemon->readyfd,
&daemon->pfd_readyfd_index, POLLIN);
struct communication ready_comm;
ready_comm.type = COMMUNICATION_TYPE_READY;
ready_comm.index_ptr = &daemon->pfd_readyfd_index;
ready_comm.daemon = daemon;
communication_register(&ready_comm, daemon->readyfd, POLLIN);
}
// TODO: This is not concurrency safe, build a environment array just for
// this daemon.
@ -2277,6 +2302,18 @@ static bool daemon_process_ready(struct daemon* daemon)
return true;
}
static bool daemon_on_ready_event(struct daemon* daemon, int revents)
{
if ( (revents & (POLLIN | POLLHUP)) && !daemon_process_ready(daemon) )
{
communication_unregister(daemon->pfd_readyfd_index);
close(daemon->readyfd);
daemon->readyfd = -1;
return false;
}
return true;
}
static bool daemon_process_output(struct daemon* daemon)
{
char data[4096];
@ -2291,6 +2328,18 @@ static bool daemon_process_output(struct daemon* daemon)
return true;
}
static bool daemon_on_output_event(struct daemon* daemon, int revents)
{
if ( (revents & (POLLIN | POLLHUP)) && !daemon_process_output(daemon) )
{
communication_unregister(daemon->pfd_outputfd_index);
close(daemon->outputfd);
daemon->outputfd = -1;
return false;
}
return true;
}
static void daemon_on_exit(struct daemon* daemon, int exit_code)
{
assert(daemon->state != DAEMON_STATE_FINISHING);
@ -2298,14 +2347,14 @@ static void daemon_on_exit(struct daemon* daemon, int exit_code)
daemon->exit_code = exit_code;
if ( 0 <= daemon->readyfd )
{
daemon_unregister_pollfd(daemon, daemon->pfd_readyfd_index);
communication_unregister(daemon->pfd_readyfd_index);
close(daemon->readyfd);
daemon->readyfd = -1;
}
if ( 0 <= daemon->outputfd )
{
daemon_process_output(daemon);
daemon_unregister_pollfd(daemon, daemon->pfd_outputfd_index);
communication_unregister(daemon->pfd_outputfd_index);
close(daemon->outputfd);
daemon->outputfd = -1;
}
@ -2441,39 +2490,19 @@ static void init(void)
if ( !pfd->revents )
continue;
nevents--;
struct daemon* daemon = pfds_daemon[i];
if ( 0 <= daemon->readyfd && pfd->fd == daemon->readyfd )
struct communication* comm = &communications[i];
bool closed = false;
switch ( comm->type )
{
if ( pfd->revents & (POLLIN | POLLHUP) )
{
if ( !daemon_process_ready(daemon) )
{
daemon_unregister_pollfd(daemon,
daemon->pfd_readyfd_index);
close(daemon->readyfd);
daemon->readyfd = -1;
i--; // Process this index again (something new there).
}
}
}
else if ( 0 <= daemon->outputfd && pfd->fd == daemon->outputfd )
{
if ( pfd->revents & (POLLIN | POLLHUP) )
{
if ( !daemon_process_output(daemon) )
{
daemon_unregister_pollfd(daemon,
daemon->pfd_outputfd_index);
close(daemon->outputfd);
daemon->outputfd = -1;
i--; // Process this index again (something new there).
}
}
}
else
{
assert(false);
case COMMUNICATION_TYPE_OUTPUT:
closed = daemon_on_output_event(comm->daemon, pfd->revents);
break;
case COMMUNICATION_TYPE_READY:
closed = daemon_on_ready_event(comm->daemon, pfd->revents);
break;
}
if ( closed )
i--; // Process this index again (something new there).
}
}