Muninn/muninn.c
Ava Affine 4f1311d2fe WIP: request and connection machine
Signed-off-by: Ava Affine <ava@sunnypup.io>
2026-05-13 14:46:55 +00:00

338 lines
9.3 KiB
C

#include "ngx_buf.h"
#pragma clang diagnostic ignored "-Wdangling-else"
#include <ngx_event.h>
#include <ngx_config.h>
#include <ngx_core.h>
#include <sys/socket.h>
#include <netinet/in.h>
// Size, in bytes, of the temp buffer allocated for inbound request data in
// mun_handle_conn_readable (presumably the classic 512-byte DNS-over-UDP
// message limit -- confirm).
#define MUN_DNS_LEGACY_UDP_SZ 512
// Value stored in cf->module_type while the muninn{} block is being parsed.
#define NGX_MUNIN_MOD 0x42000000
// Value stored in cf->cmd_type inside the muninn{} block, and OR-ed into the
// type of every directive that is allowed inside that block.
// NOTE(review): 0x86 sits in the low bits of the directive type word, which
// nginx appears to reserve for argument-count flags -- confirm it does not
// collide with NGX_CONF_TAKE* values.
#define NGX_MUNIN_CONF 0x00000086
// Default applied to connection_pool_ct when dns_connection_pool_count is
// not configured (see mun_init_conf).
#define MUN_DEFAULT_CONN_POOL_SZ 1024
// Listening endpoint described by the dns_listen directive.
typedef struct {
// Filled in by mun_conf_listen: url.url is the directive argument, then
// ngx_parse_url() resolves it with listen = 1 and default_port = 53.
ngx_url_t url;
} mun_dns_listener;
// Module configuration, allocated zero-filled by mun_create_conf.
typedef struct {
// NULL until a dns_listen directive is parsed (mun_conf_listen allocates it
// and rejects a second directive when it is already set).
mun_dns_listener *listener;
// Set by dns_connection_pool_count; NGX_CONF_UNSET until then, and defaulted
// to MUN_DEFAULT_CONN_POOL_SZ by mun_init_conf. Copied into the listening
// socket's pool_size and backlog by mun_conf_mk_listener.
ngx_int_t connection_pool_ct;
} mun_ngx_conf;
// Per-connection request state. Allocated zero-filled from the connection
// pool in mun_init_connection and stored in c->data.
typedef struct {
// Current state of the read machine: used as the index into
// mun_read_handler_list and as the row of mun_read_handler_adjacency.
ngx_uint_t read_state;
// NOTE(review): presumably the owning connection's log -- confirm where it
// is assigned.
ngx_log_t *log;
// in:  chain of buffers for inbound data; its first link and buffer are
//      created lazily in mun_handle_conn_readable.
// out: response chain; so far only referenced by TODOs in
//      mun_handle_conn_writeable.
ngx_chain_t *in, *out;
//ngx_pool_t *pool; use conn->pool;
// Connection this request belongs to. NOTE(review): expected to be the
// connection whose c->data points at this struct -- confirm it is assigned
// when the request is created, since the struct starts zero-filled.
ngx_connection_t *conn;
} mun_dns_request;
// Configuration functions: directive handlers, core-module create/init hooks,
// and the two post-handlers referenced from ngx_muninn_commands.
static char *mun_conf_server(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
static char *mun_conf_listen(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
static void *mun_create_conf(ngx_cycle_t *cycle);
static char *mun_init_conf(ngx_cycle_t *cycle, void *conf);
static char *mun_conf_chk_connection_ct(ngx_conf_t *cf, void *post, void *data);
static char *mun_conf_mk_listener(ngx_conf_t *cf, void *post, void *data);
// Connection lifecycle: accept handler, teardown, and the read/write event
// handlers installed on each connection by mun_init_connection.
static void mun_init_connection(ngx_connection_t *c);
static void mun_close_connection(ngx_connection_t *c);
static void mun_handle_conn_readable(ngx_event_t *ev);
static void mun_handle_conn_writeable(ngx_event_t *ev);
// Request lifecycle functions
// NOTE(review): mun_read_request is declared here but has no definition in
// the visible source.
static ngx_int_t mun_read_request(mun_dns_request *r);
// Post-handler wrappers attached to the dns_listen and
// dns_connection_pool_count directives respectively.
static ngx_conf_post_t mun_conf_listen_post = { mun_conf_mk_listener };
static ngx_conf_post_t mun_conf_conn_ct_post = { mun_conf_chk_connection_ct };
// Directive table for the module.
//   muninn { ... }              top-level block, no arguments
//   dns_listen <addr> ...       listening address, handled by mun_conf_listen
//   dns_connection_pool_count N stored into mun_ngx_conf.connection_pool_ct
static ngx_command_t ngx_muninn_commands[] = {
// Block opener: mun_conf_server switches the parser into muninn context.
{ ngx_string("muninn"),
NGX_MAIN_CONF|NGX_CONF_BLOCK|NGX_CONF_NOARGS,
mun_conf_server,
0, 0, NULL },
// Custom setter, so the field offset is unused (left at 0).
// NOTE(review): mun_conf_listen_post is attached here, but a custom setter
// has to call cmd->post itself -- confirm mun_conf_mk_listener actually runs.
{ ngx_string("dns_listen"),
NGX_MUNIN_CONF|NGX_DIRECT_CONF|NGX_CONF_1MORE,
mun_conf_listen,
0, 0,
//offsetof(mun_ngx_conf, listener),
&mun_conf_listen_post },
// Numeric slot; mun_conf_chk_connection_ct rejects values <= 0.
// NOTE(review): declared NGX_CONF_1MORE although a single number is stored
// -- confirm extra arguments should be accepted.
{ ngx_string("dns_connection_pool_count"),
NGX_MUNIN_CONF|NGX_DIRECT_CONF|NGX_CONF_1MORE,
ngx_conf_set_num_slot,
0,
offsetof(mun_ngx_conf, connection_pool_ct),
&mun_conf_conn_ct_post },
ngx_null_command
};
// Core-module context: module name plus the hooks that allocate the
// configuration (mun_create_conf) and fill in defaults (mun_init_conf).
static ngx_core_module_t ngx_muninn_module_ctx = {
ngx_string("muninn"),
mun_create_conf,
mun_init_conf,
};
// Module descriptor exported to nginx. Registered as an NGX_CORE_MODULE with
// the context and directive table above; no process/thread lifecycle hooks
// are installed.
// NOTE(review): mun_conf_server sets cf->module_type to NGX_MUNIN_MOD while
// this module's type is NGX_CORE_MODULE -- confirm the directives inside
// muninn{} are still matched by nginx's directive dispatch.
ngx_module_t ngx_muninn_core_module = {
NGX_MODULE_V1,
&ngx_muninn_module_ctx, /* module context */
ngx_muninn_commands, /* module directives */
NGX_CORE_MODULE, /* module type */
NULL, /* init master */
NULL, /* init module */
NULL, /* init process */
NULL, /* init thread */
NULL, /* exit thread */
NULL, /* exit process */
NULL, /* exit master */
NGX_MODULE_V1_PADDING
};
// Read-state handlers, indexed by mun_dns_request.read_state and invoked from
// mun_handle_conn_readable. Currently empty (WIP): any index into it is out
// of bounds until handlers are added.
static ngx_int_t (*mun_read_handler_list[])(mun_dns_request *) = {
};
// Per-handler tag strings. Currently empty and not referenced anywhere in the
// visible source; presumably meant to parallel mun_read_handler_list --
// confirm intended use.
static const char *mun_handler_action_tags[] = {
};
// State-transition table for the read machine. Currently empty (WIP).
static ngx_uint_t mun_read_handler_adjacency[][7] = {
/* One row per read state; row N belongs to mun_read_handler_list[N], so the
 * two tables MUST have the same number of entries in the same order.
 *
 * Each row holds the next state for every handler return code, in this
 * column order (mun_handle_conn_readable computes the column as 0 - ret):
 * { NGX_OK next handler
 * NGX_ERROR next handler
 * NGX_AGAIN next handler
 * NGX_BUSY next handler
 * NGX_DONE next handler
 * NGX_DECLINED next handler
 * NGX_ABORT next handler }
 *
 * next handler = index into mun_read_handler_list
 */
};
/*
 * create_conf hook of the core module.
 *
 * Allocates a zero-filled mun_ngx_conf from the cycle pool and marks
 * connection_pool_ct as unset so a default can be applied later.
 *
 * Returns the new configuration, or NULL when the allocation fails.
 */
static void *mun_create_conf(ngx_cycle_t *cycle) {
    mun_ngx_conf *mcf;

    mcf = ngx_pcalloc(cycle->pool, sizeof(mun_ngx_conf));
    if (mcf == NULL) {
        return NULL;
    }

    /* listener stays NULL from the zero fill; only the counter needs the
     * explicit "unset" sentinel. */
    mcf->connection_pool_ct = NGX_CONF_UNSET;

    return mcf;
}
/*
 * init_conf hook of the core module.
 *
 * Applies MUN_DEFAULT_CONN_POOL_SZ to connection_pool_ct when no
 * dns_connection_pool_count directive set it. Always returns NGX_CONF_OK.
 *
 * cycle is unused; in is the mun_ngx_conf created by mun_create_conf.
 */
static char *mun_init_conf(ngx_cycle_t *cycle, void *in) {
    mun_ngx_conf *mcf;

    mcf = in;

    ngx_conf_init_value(mcf->connection_pool_ct, MUN_DEFAULT_CONN_POOL_SZ);

    return NGX_CONF_OK;
}
/*
 * Post-handler for dns_connection_pool_count.
 *
 * data points at the ngx_int_t that was just parsed. Values of zero or
 * below are rejected with a configuration error; anything positive is
 * accepted. post is unused.
 */
static char *mun_conf_chk_connection_ct(
    ngx_conf_t *cf,
    void *post,
    void *data
) {
    ngx_int_t *count = data;

    if (*count <= 0) {
        ngx_conf_log_error(NGX_LOG_ERR, cf, 0,
            "dns_connection_pool_count must be a positive whole number");
        return NGX_CONF_ERROR;
    }

    return NGX_CONF_OK;
}
/*
 * Handler for the top-level "muninn { ... }" block.
 *
 * Switches the parser into muninn context (module_type / cmd_type), parses
 * the block body, then restores the caller's parser state.
 *
 * The restore matters: cf is shared with the enclosing parse, so leaving
 * module_type and cmd_type overwritten would make every directive that
 * follows the closing brace be matched against the muninn context instead
 * of the main one.
 *
 * cmd and conf are unused. Returns whatever ngx_conf_parse() returns.
 *
 * NOTE(review): the module in this file is declared NGX_CORE_MODULE while
 * module_type is set to NGX_MUNIN_MOD here -- confirm nginx still matches
 * the dns_* directives inside the block.
 */
static char *mun_conf_server(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) {
    char *rv;
    ngx_conf_t saved;

    saved = *cf;

    cf->module_type = NGX_MUNIN_MOD;
    cf->cmd_type = NGX_MUNIN_CONF;

    rv = ngx_conf_parse(cf, NULL);

    *cf = saved;

    return rv;
}
/*
 * Handler for "dns_listen <address> ...".
 *
 * Allocates the single mun_dns_listener, parses the first argument as a
 * listening URL (default port 53), and then runs the directive's post
 * handler, if one is attached to the command.
 *
 * cf   parser state; cf->args holds the directive name followed by its
 *      arguments.
 * cmd  the matching command entry; cmd->post is invoked on success.
 * in   the module's mun_ngx_conf.
 *
 * Returns NGX_CONF_OK on success, NGX_CONF_ERROR on a duplicate directive,
 * a missing argument, allocation failure or an unparsable address, or
 * otherwise the post handler's result.
 */
static char *mun_conf_listen(ngx_conf_t *cf, ngx_command_t *cmd, void *in) {
    ngx_url_t *u;
    ngx_uint_t argc;
    ngx_str_t *argv;
    ngx_conf_post_t *post;
    mun_ngx_conf *conf = in;

    argc = cf->args->nelts;
    argv = cf->args->elts;

    if (conf->listener) {
        ngx_conf_log_error(NGX_LOG_ERR, cf, 0,
                           "only one dns_listen allowed");
        return NGX_CONF_ERROR;
    }

    /* argv[0] is the directive name itself, so the address is argv[1] and
     * at least two elements are required before it may be read. */
    if (argc < 2) {
        ngx_conf_log_error(NGX_LOG_ERR, cf, 0,
                           "dns_listen needs at least one argument");
        return NGX_CONF_ERROR;
    }

    if (!(conf->listener = ngx_pcalloc(cf->pool, sizeof(mun_dns_listener)))) {
        return NGX_CONF_ERROR;
    }

    u = &conf->listener->url;
    u->url = argv[1];
    u->listen = 1;
    u->default_port = 53;

    if (ngx_parse_url(cf->pool, u) != NGX_OK) {
        if (u->err) {
            ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                               "%s in \"%V\" of the \"dns_listen\" directive",
                               u->err, &u->url);
        }
        return NGX_CONF_ERROR;
    }

    /* The command table attaches a post handler to this directive, but only
     * nginx's stock slot setters call cmd->post on their own; a custom
     * setter has to do it. The handler receives the whole configuration.
     * It reads connection_pool_ct, which at this point reflects only the
     * directives parsed before this one. */
    if (cmd->post) {
        post = cmd->post;
        return post->post_handler(cf, post, conf);
    }

    return NGX_CONF_OK;
}
/*
 * Post-handler for dns_listen: registers a UDP listening socket for the
 * parsed address with the cycle.
 *
 * cf    parser state.
 * post  unused.
 * data  the module's mun_ngx_conf (not a pointer to a single field).
 *
 * Returns NGX_CONF_OK on success; NGX_CONF_ERROR when no listener address
 * is available or the listening entry cannot be created.
 */
static char *mun_conf_mk_listener(
    ngx_conf_t *cf,
    void *post,
    void *data
) {
    mun_ngx_conf *conf = data;
    mun_dns_listener *lis;
    ngx_listening_t *listening;
    ngx_int_t pool_ct;

    if (conf == NULL || conf->listener == NULL) {
        ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
                           "dns_listen: no listener address has been parsed");
        return NGX_CONF_ERROR;
    }
    lis = conf->listener;

    /* mun_init_conf applies the default only after the whole configuration
     * has been parsed, so the count can still be NGX_CONF_UNSET (-1) while
     * directives are being handled. Copying -1 into the size/backlog fields
     * below would produce nonsense values, so fall back to the default. */
    pool_ct = conf->connection_pool_ct;
    if (pool_ct == NGX_CONF_UNSET) {
        pool_ct = MUN_DEFAULT_CONN_POOL_SZ;
    }

    listening = ngx_create_listening(cf, (struct sockaddr *) &lis->url.sockaddr,
                                     lis->url.socklen);
    if (!listening) return NGX_CONF_ERROR;

    listening->addr_ntop = 1;
    listening->pool_size = pool_ct;
    listening->handler = mun_init_connection;

    listening->logp = cf->log;
    listening->log.data = &listening->addr_text;
    listening->log.handler = ngx_accept_log_error;

    /* NOTE(review): a value of 1 asks for 1-byte socket buffers -- confirm
     * this is intended rather than leaving the system defaults. */
    listening->sndbuf = 1;
    listening->rcvbuf = 1;
    listening->backlog = pool_ct;

    listening->protocol = IPPROTO_UDP;
    listening->type = SOCK_DGRAM;

    // listener added to cycle at creation
    return NGX_CONF_OK;
}
/*
 * Connection handler installed on the listening socket.
 *
 * Allocates the per-connection mun_dns_request from the connection pool,
 * links request and connection to each other, installs the read/write
 * event handlers, and posts the read event when data is already waiting.
 * The connection is closed if the request cannot be allocated.
 */
static void mun_init_connection(ngx_connection_t *c) {
    mun_dns_request *r;

    if (!(r = ngx_pcalloc(c->pool, sizeof(mun_dns_request)))) {
        mun_close_connection(c);
        return;
    }

    /* The request starts zero-filled; the read handler reaches the pool and
     * log through r->conn, so the back-pointer has to be set here. */
    r->conn = c;
    r->log = c->log;

    c->data = r;
    c->log->connection = c->number;
    c->log_error = NGX_ERROR_INFO;

    c->read->handler = mun_handle_conn_readable;
    c->write->handler = mun_handle_conn_writeable;

    /* Braces are required: ngx_post_event expands to an if/else. */
    if (c->read->ready) {
        ngx_post_event(c->read, &ngx_posted_events);
    }
}
/*
 * Tears down a connection and releases everything allocated from its pool
 * (including the mun_dns_request stored in c->data).
 *
 * The pool pointer is captured before ngx_close_connection() because the
 * connection object is handed back to nginx by that call and must not be
 * read afterwards.
 */
static void mun_close_connection(ngx_connection_t *c) {
    ngx_pool_t *pool;

    c->destroyed = 1;

    pool = c->pool;

    ngx_close_connection(c);

    if (pool) {
        ngx_destroy_pool(pool);
    }
}
/*
 * Read-event handler: prepares the inbound buffer chain, runs the handler
 * for the request's current read state, and advances the state machine.
 *
 * Flow:
 *   1. Make sure r->in has a buffer to read into (created on first call).
 *   2. Unless step 1 failed, call mun_read_handler_list[read_state].
 *   3. Look up the next state in mun_read_handler_adjacency using the
 *      return code as the column (0 - ret).
 *   4. NGX_OK / NGX_DONE: re-post this event so the next state runs.
 *      NGX_AGAIN: return without posting anything.
 *      Any other code: post the write event and return.
 *
 * The connection is closed when the current state has no entry in the
 * tables, because nothing can be dispatched in that case.
 */
static void mun_handle_conn_readable(ngx_event_t *ev) {
    ngx_connection_t *c;
    mun_dns_request *r;
    ngx_chain_t *cur, *cl;
    ngx_int_t ret;
    ngx_uint_t nhandlers, nrows;

    c = ev->data;
    r = c->data;
    ret = NGX_OK;

    /* The connection's own pool and log are used directly; this does not
     * depend on r->conn having been filled in. */
    if (!r->in) {
        if (!(cl = ngx_alloc_chain_link(c->pool))) {
            ret = NGX_ERROR;
            goto content;
        }

        /* A fresh chain link is not initialised: terminate it explicitly
         * and publish it only once its buffer exists, so a failed
         * allocation never leaves a half-built link in r->in. */
        cl->next = NULL;
        if (!(cl->buf = ngx_create_temp_buf(c->pool,
                                            MUN_DNS_LEGACY_UDP_SZ))) {
            ret = NGX_ERROR;
            goto content;
        }
        cl->buf->last_buf = 1;

        r->in = cl;
        cur = cl;

    } else {
        for (cur = r->in; cur && !cur->buf->last_buf; cur = cur->next) {}

        if (!cur) {
            ngx_log_error(NGX_LOG_ERR, c->log, 0,
                          "muninn: failed to find next buffer to read into");
            /* No buffer to read into: take the error transition instead
             * of running the state handler. */
            ret = NGX_ERROR;
            goto content;
        }
    }

    // TODO: make sure at least MUN_DNS_LEGACY_UDP_SZ space allocated and ready
    // TODO: call r->recv_chain(c, r->in, 0)
    // TODO: figure out how to handle the number of bytes returned
    // TODO: handle c->read->error;

content:
    /* Both tables are indexed by read_state; refuse to index past either. */
    nhandlers = sizeof(mun_read_handler_list)
                / sizeof(mun_read_handler_list[0]);
    nrows = sizeof(mun_read_handler_adjacency)
            / sizeof(mun_read_handler_adjacency[0]);

    if (r->read_state >= nhandlers || r->read_state >= nrows) {
        ngx_log_error(NGX_LOG_ALERT, c->log, 0,
                      "muninn: read state %ui has no handler",
                      r->read_state);
        mun_close_connection(c);
        return;
    }

    if (ret == NGX_OK) ret = mun_read_handler_list[r->read_state](r);

    /* The adjacency row has exactly seven columns, NGX_OK (0) through
     * NGX_ABORT (-6). Any other return code is treated as an error rather
     * than used as an index. */
    if (ret > NGX_OK || ret < NGX_ABORT) {
        ngx_log_error(NGX_LOG_ERR, c->log, 0,
                      "muninn: read handler returned unexpected code %i",
                      ret);
        ret = NGX_ERROR;
    }

    r->read_state = mun_read_handler_adjacency[r->read_state][0 - ret];

    switch (ret) {
    case NGX_OK:
    case NGX_DONE:
        // we could still have read data to process in next stage
        break;
    case NGX_ERROR:
    case NGX_BUSY:
    case NGX_DECLINED:
    case NGX_ABORT:
        ngx_post_event(c->write, &ngx_posted_events);
        /* fallthrough */
    case NGX_AGAIN:
    default:
        // TODO: maybe handle r->eof if set ?
        return;
    }

    ngx_post_event(ev, &ngx_posted_events);
}
/*
 * Write-event handler (stub).
 *
 * At present it only resolves the connection and its request from the
 * event; the actual response writing is still to be implemented.
 */
static void mun_handle_conn_writeable(ngx_event_t *ev) {
    ngx_connection_t *c = ev->data;
    mun_dns_request *r = c->data;

    /* This event is posted for one of two reasons:
     *   1. mun_handle_conn_readable has reached the end of request
     *      processing
     *   2. the socket became available again after an incomplete write
     */

    // TODO: write r->out data across connection
    // TODO: delete fully written bufs
    // TODO: if (r->out && write FULL) return without posting
    // TODO: if (write not FULL && !r->out) delete req and close connection
}