WIP: request and connection machine

Signed-off-by: Ava Affine <ava@sunnypup.io>
This commit is contained in:
Ava Apples Affine 2026-05-04 20:29:35 +00:00
parent 19fafc6a8c
commit ebeea248f1
2 changed files with 276 additions and 4 deletions

1
.gitignore vendored
View file

@ -1,3 +1,4 @@
libmuninn.so
libmuninn.dylib
nginx/.cache
.cache

277
muninn.c
View file

@ -1,12 +1,82 @@
#include <ngx_event.h>
#include <ngx_config.h>
#include <ngx_core.h>
#include <sys/socket.h>
#include <netinet/in.h>
/* removed: stale empty definition — ngx_muninn_commands[] is defined once,
 * with its directive entries, later in this file; keeping both is a
 * duplicate-definition error (and `= {}` is a pre-C23 extension). */
#define NGX_MUNIN_MOD 0x42000000
#define NGX_MUNIN_CONF 0x00000086
#define MUN_DEFAULT_CONN_POOL_SZ 1024
/* Listener settings: the parsed URL/sockaddr the DNS server binds to
 * (filled in by mun_conf_listen via ngx_parse_url). */
typedef struct {
ngx_url_t url;
} mun_dns_listener;
/* Module configuration, created by mun_create_conf and populated while
 * parsing the "muninn" block. */
typedef struct {
mun_dns_listener *listener;          /* single listener; NULL until dns_listen seen */
ngx_int_t connection_pool_ct;        /* dns_connection_pool_count; defaulted in mun_init_conf */
} mun_ngx_conf;
/* Per-connection DNS request state. */
typedef struct {
ngx_uint_t read_state;               /* index into mun_read_handler_list (state machine) */
ngx_log_t *log;
ngx_chain_t *in, *out;               /* buffered input / pending output chains */
//ngx_pool_t *pool; use conn->pool;
ngx_connection_t *conn;
} mun_dns_request;
// Configuration functions
static char *mun_conf_server(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
static char *mun_conf_listen(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
static void *mun_create_conf(ngx_cycle_t *cycle);
static char *mun_init_conf(ngx_cycle_t *cycle, void *conf);
static char *mun_conf_chk_connection_ct(ngx_conf_t *cf, void *post, void *data);
static char *mun_conf_mk_listener(ngx_conf_t *cf, void *post, void *data);
// Connection lifecycle
static void mun_init_connection(ngx_connection_t *c);
static void mun_close_connection(ngx_connection_t *c);
static void mun_handle_conn_readable(ngx_event_t *ev);
static void mun_handle_conn_writeable(ngx_event_t *ev);
// Request lifecycle functions
static ngx_int_t mun_read_request(mun_dns_request *r);
// Post-handlers run by ngx_conf after the directive's slot handler.
static ngx_conf_post_t mun_conf_listen_post = { mun_conf_mk_listener };
static ngx_conf_post_t mun_conf_conn_ct_post = { mun_conf_chk_connection_ct };
static ngx_command_t ngx_muninn_commands[] = {
// "muninn { ... }" top-level block: enters this module's directive namespace.
{ ngx_string("muninn"),
NGX_MAIN_CONF|NGX_CONF_BLOCK|NGX_CONF_NOARGS,
mun_conf_server,
0, 0, NULL },
// "dns_listen <addr>": parses the listen URL, then mun_conf_listen_post
// creates the listening socket.
{ ngx_string("dns_listen"),
NGX_MUNIN_CONF|NGX_DIRECT_CONF|NGX_CONF_1MORE,
mun_conf_listen,
0, 0,
//offsetof(mun_ngx_conf, listener),
&mun_conf_listen_post },
// "dns_connection_pool_count <n>": stored by the generic num slot, then
// validated (> 0) by mun_conf_conn_ct_post.
{ ngx_string("dns_connection_pool_count"),
NGX_MUNIN_CONF|NGX_DIRECT_CONF|NGX_CONF_1MORE,
ngx_conf_set_num_slot,
0,
offsetof(mun_ngx_conf, connection_pool_ct),
&mun_conf_conn_ct_post },
ngx_null_command
};
static ngx_core_module_t ngx_muninn_module_ctx = {
ngx_string("muninn"),
NULL, // create conf
NULL // init conf
mun_create_conf,
mun_init_conf,
};
ngx_module_t ngx_muninn_core_module = {
@ -24,4 +94,205 @@ ngx_module_t ngx_muninn_core_module = {
NGX_MODULE_V1_PADDING
};
// State-machine tables for request reading. All three arrays are indexed
// by the same handler index (mun_dns_request.read_state).
// NOTE(review): all tables are still empty (WIP) — mun_handle_conn_readable
// dereferences them, so invoking it before they are populated reads out of
// bounds; confirm the tables are filled before the listener goes live.
static ngx_int_t (*mun_read_handler_list[])(mun_dns_request *) = {
};
// Human-readable tag per handler, for logging.
static const char *mun_handler_action_tags[] = {
};
static ngx_uint_t mun_read_handler_adjacency[][7] = {
/* Entries:
 * { NGX_OK next handler
 *   NGX_ERROR next handler
 *   NGX_AGAIN next handler
 *   NGX_BUSY next handler
 *   NGX_DONE next handler
 *   NGX_DECLINED next handler
 *   NGX_ABORT next handler }
 *
 * next handler = index into mun_read_handler_list
 * ALL INDEXES BETWEEN THIS AND mun_read_handler_list MUST MATCH
 */
};
/*
 * Allocate the module's configuration on the cycle pool.
 * ngx_pcalloc zeroes the struct, so listener starts NULL; the pool count
 * is marked unset so mun_init_conf can apply the default afterwards.
 * Returns NULL on allocation failure.
 */
static void *mun_create_conf(ngx_cycle_t *cycle) {
    mun_ngx_conf *mc = ngx_pcalloc(cycle->pool, sizeof(mun_ngx_conf));
    if (mc == NULL) {
        return NULL;
    }
    mc->connection_pool_ct = NGX_CONF_UNSET;
    return mc;
}
/*
 * Finalize configuration after parsing: fill in defaults for any value
 * the config file left unset. Always succeeds.
 */
static char *mun_init_conf(ngx_cycle_t *cycle, void *in) {
    mun_ngx_conf *mc = in;
    /* no-op when dns_connection_pool_count was given explicitly */
    ngx_conf_init_value(mc->connection_pool_ct, MUN_DEFAULT_CONN_POOL_SZ);
    return NGX_CONF_OK;
}
/*
 * Post-handler for "dns_connection_pool_count": reject zero or negative
 * values after ngx_conf_set_num_slot has stored the number.
 */
static char *mun_conf_chk_connection_ct(
    ngx_conf_t *cf,
    void *post,
    void *data
) {
    ngx_int_t *count = data;
    if (*count <= 0) {
        ngx_conf_log_error(NGX_LOG_ERR, cf, 0,
            "dns_connection_pool_count must be a positive whole number");
        return NGX_CONF_ERROR;
    }
    return NGX_CONF_OK;
}
/*
 * Handler for the top-level "muninn" block: switch the parser into this
 * module's directive namespace, then recurse into the enclosed block.
 * Directives inside the braces are matched against NGX_MUNIN_CONF entries
 * in ngx_muninn_commands.
 */
static char *mun_conf_server(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
    cf->cmd_type = NGX_MUNIN_CONF;
    cf->module_type = NGX_MUNIN_MOD;
    return ngx_conf_parse(cf, NULL);
}
/*
 * "dns_listen <addr>" handler: parse the address into the single listener
 * slot. cf->args->elts[0] is the directive name itself; the address is
 * argv[1]. Returns NGX_CONF_ERROR on duplicate directive, missing
 * argument, allocation failure, or unparseable URL.
 */
static char *mun_conf_listen(ngx_conf_t *cf, ngx_command_t *cmd, void *in) {
    ngx_url_t *u;
    ngx_int_t argc;
    ngx_str_t *argv;
    mun_ngx_conf *conf = in;
    argc = cf->args->nelts;
    argv = cf->args->elts;
    if (conf->listener) {
        ngx_conf_log_error(NGX_LOG_ERR, cf, 0,
            "only one dns_listen allowed");
        return NGX_CONF_ERROR;
    }
    /* BUG FIX: nelts counts the directive name, so the old "argc < 1"
     * test could never fire while argv[1] was read below; argv[1] only
     * exists when nelts >= 2. (NGX_CONF_1MORE also enforces this at the
     * parser level; this keeps the function safe on its own.) */
    if (argc < 2) {
        ngx_conf_log_error(NGX_LOG_ERR, cf, 0,
            "dns_listen needs at least one argument");
        return NGX_CONF_ERROR;
    }
    if (!(conf->listener = ngx_pcalloc(cf->pool, sizeof(mun_dns_listener)))) {
        return NGX_CONF_ERROR;
    }
    u = &conf->listener->url;
    u->url = argv[1];
    u->listen = 1;
    u->default_port = 53;   /* DNS */
    if (ngx_parse_url(cf->pool, u) != NGX_OK) {
        if (u->err) {
            ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                "%s in \"%V\" of the \"dns_listen\" directive",
                u->err, &u->url);
        }
        return NGX_CONF_ERROR;
    }
    return NGX_CONF_OK;
}
/*
 * Post-handler for "dns_listen": create the UDP listening socket from the
 * parsed URL. ngx_create_listening registers the socket with the cycle, so
 * nothing further is stored here.
 */
static char *mun_conf_mk_listener(
ngx_conf_t *cf,
void *post,
void *data
) {
mun_ngx_conf *conf = data;
mun_dns_listener *lis = conf->listener;
ngx_listening_t *listening;
listening = ngx_create_listening(cf, (struct sockaddr *) &lis->url.sockaddr,
lis->url.socklen);
if (!listening) return NGX_CONF_ERROR;
listening->addr_ntop = 1;
/* NOTE(review): ngx_listening_t.pool_size is a size in BYTES for the
 * per-connection pool, but connection_pool_ct is documented here as a
 * connection COUNT (also reused as backlog below) — confirm which
 * meaning is intended before relying on either. */
listening->pool_size = conf->connection_pool_ct;
listening->handler = mun_init_connection;
listening->logp = cf->log;
listening->log.data = &listening->addr_text;
listening->log.handler = ngx_accept_log_error;
/* NOTE(review): 1-byte socket buffers look like WIP placeholders — a DNS
 * datagram will not fit; verify intended sizes. */
listening->sndbuf = 1;
listening->rcvbuf = 1;
listening->backlog = conf->connection_pool_ct;
listening->protocol = IPPROTO_UDP;
listening->type = SOCK_DGRAM;
// listener added to cycle at creation
return NGX_CONF_OK;
}
/*
 * Accept handler for the DNS listening socket: allocate per-connection
 * request state from the connection pool and arm the event handlers.
 * On allocation failure the connection is torn down immediately.
 */
static void mun_init_connection(ngx_connection_t *c) {
    mun_dns_request *req = ngx_pcalloc(c->pool, sizeof(mun_dns_request));
    if (req == NULL) {
        mun_close_connection(c);
        return;
    }
    c->log->connection = c->number;
    c->log_error = NGX_ERROR_INFO;
    c->data = req;
    c->write->handler = mun_handle_conn_writeable;
    c->read->handler = mun_handle_conn_readable;
    /* a datagram may already be pending: queue the read instead of
     * handling it inline */
    if (c->read->ready) {
        ngx_post_event(c->read, &ngx_posted_events);
    }
}
/*
 * Tear down a connection and release everything allocated from its pool.
 *
 * BUG FIX: ngx_close_connection() returns the ngx_connection_t to the
 * cycle's free-connection list, so c must not be dereferenced afterwards;
 * the old code read c->pool after the close. Save the pool pointer first
 * and destroy it last — the same ordering nginx uses in
 * ngx_http_close_connection().
 */
static void mun_close_connection(ngx_connection_t *c) {
    ngx_pool_t *pool;
    c->destroyed = 1;
    pool = c->pool;
    ngx_close_connection(c);
    ngx_destroy_pool(pool);
}
/*
 * Read-event handler: advance the request read state machine one step.
 * Runs the current phase handler, then uses its ngx_int_t return code to
 * pick the next state from mun_read_handler_adjacency.
 * NOTE(review): both dispatch tables are still empty — calling this before
 * they are populated indexes out of bounds (see table definitions above).
 */
static void mun_handle_conn_readable(ngx_event_t *ev) {
ngx_connection_t *c;
mun_dns_request *r;
ngx_int_t ret, idx;
c = ev->data;
r = c->data;
// TODO: read data into r->in
ret = mun_read_handler_list[r->read_state](r);
// nginx return codes run NGX_OK(0) down to NGX_ABORT(-6), so 0 - ret maps
// them onto adjacency columns 0..6 in the order listed in the table comment.
idx = mun_read_handler_adjacency[r->read_state][0 - ret];
r->read_state = idx;
switch (ret) {
// failure codes: hand off to the write side (presumably to emit an error
// response — TODO confirm once the write path is implemented)
case NGX_ERROR:
case NGX_BUSY:
case NGX_DECLINED:
case NGX_ABORT:
ngx_post_event(c->write, &ngx_posted_events);
/* fallthrough */
case NGX_AGAIN:
// wait for the next readable event; nothing more to do now
return;
}
// case NGX_OK or NGX_DONE (may still have data to process in next phase)
ngx_post_event(ev, &ngx_posted_events);
}
/*
 * Write-event handler (WIP skeleton): will flush r->out to the socket.
 * Currently only recovers the connection and request pointers; the
 * variables are unused until the TODOs below are implemented.
 */
static void mun_handle_conn_writeable(ngx_event_t *ev) {
ngx_connection_t *c;
mun_dns_request *r;
c = ev->data;
r = c->data;
/* this event posted for one of two reasons:
 * 1. mun_handle_request_read has reached end of request processing
 * 2. socket newly available after incomplete write
 */
// TODO: write r->out data across connection
// TODO: delete fully written bufs
// TODO: if (r->out && write FULL) return without posting
// TODO: if (write not FULL && !r->out) delete req and close connection
}