Rework inbox envelope

master
teknomunk 1 year ago
parent 391049c46f
commit df589eb9e0

@ -1 +1 @@
Subproject commit da39f4eda56b559e54c4f50364aa28efed7699f1
Subproject commit 1e91a6c2f2d504e74fe1cb325b49b1b0f5383bf0

@ -10,70 +10,28 @@
#include "model/account.h"
#include "model/ap/activity.h"
#include "model/ap/inbox_envelope.h"
#include "model/crypto/http_sign.h"
// Stdlib
#include <stdio.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
extern bool terminate;
static void io_copy( FILE* in, FILE* out )
{
char buffer[512];
for(;;) {
int count = fread( buffer, 1, 512, in );
if( count == 0 ) { return; }
fwrite( buffer, 1, count, out );
}
}
bool route_inbox( struct http_request* req )
{
// No subroutes
if( !http_request_route_term( req, "" ) ) { return false; }
if( !http_request_route_method( req, "POST" ) ) { return false; }
printf( "Queue inbox items for later processing\n" );
const char* signature = http_request_get_header(req,"Signature");
const char* date = http_request_get_header(req,"Date");
printf( "Signature: %s\nDate: %s", signature, date );
FILE* body = http_request_get_request_data(req);
uint64_t time_ns;
struct timespec ts;
clock_gettime( CLOCK_REALTIME, &ts );
time_ns = (uint64_t)ts.tv_sec * 1000000000 + (uint64_t)ts.tv_nsec;
int head = fs_list_get( "data/inbox/HEAD" );
head += 1;
fs_list_set( "data/inbox/HEAD", head );
// TODO: change to envelope_save()
char filename[512];
snprintf( filename, 512, "data/inbox/%d.json", head );
char tmp_filename[512+32];
snprintf( tmp_filename, 512+32, "%s.tmp-%d", filename, rand() );
FILE* f = fopen( tmp_filename, "w" );
if( !f ) {
printf( "Failed to open %s\n", tmp_filename );
return false;
if( envelope_create_from_request( req ) ) {
http_request_send_headers( req, 200, "text/plain", true );
} else {
http_request_send_headers( req, 400, "text/plain", true );
}
#define RENDER
#include "src/view/inbox_envelope.json.inc"
#undef RENDER
fclose(f);
rename( tmp_filename, filename );
http_request_send_headers( req, 200, "text/plain", true );
return true;
}
@ -184,44 +142,63 @@ bool validate_signature( struct ap_envelope* env )
bool process_one()
{
// Items requiring cleanup
struct ap_activity* act = NULL;
struct ap_envelope* env = NULL;
bool result = false;
int tail_pos = fs_list_get("data/inbox/TAIL");
int head_pos = fs_list_get("data/inbox/HEAD");
if( head_pos > tail_pos ) {
// We have data to process
printf( "Inbox has %d items pending processing...\n", (head_pos - tail_pos) );
if( head_pos <= tail_pos ) { return false; }
int id = tail_pos + 1;
// We have data to process
printf( "Inbox has %d items pending processing...\n", (head_pos - tail_pos) );
struct ap_envelope* env = ap_envelope_from_id(id);
bool step_tail = false;
int id = tail_pos + 1;
if( !env ) {
printf( "Failed to parse envelope+activity for data/inbox/%d.json\n", id );
return false;
}
validate_signature(env);
env = ap_envelope_from_id(id);
bool step_tail = false;
// Discard delete requests
if( env->activity.type == apat_delete ) {
step_tail = true;
goto step;
}
if( !env ) {
printf( "Failed to parse envelope+activity for data/inbox/%d.json\n", id );
return false;
}
validate_signature(env);
if( !env->validated ) { return false; }
// Load activity
FILE* f = fmemopen( env->body, strlen(env->body), "r" );
act = ap_activity_from_FILE(f);
fclose(f);
printf( "Processing %d\n", id );
step_tail = route_activity( &env->activity );
ap_envelope_free(env);
if( !act ) { goto failed; }
step:
printf( "handled: %c\n", step_tail ? 'T' : 'F' );
if( step_tail ) {
fs_list_set( "data/inbox/TAIL", id );
return true;
}
// Discard delete requests
if( act->type == apat_delete ) {
step_tail = true;
goto step;
}
return false;
if( !env->validated ) { goto failed; }
printf( "Processing %d\n", id );
step_tail = route_activity( act );
finished:
printf( "handled: %c\n", step_tail ? 'T' : 'F' );
if( step_tail ) {
fs_list_set( "data/inbox/TAIL", id );
result = true;
}
ap_activity_free(act);
ap_envelope_free(env);
return result;
failed:
result = false;
goto finished;
step:
result = true;
goto finished;
}
bool cleanup_inbox()

@ -1 +1 @@
Subproject commit daf2a866fe238c6f916bff85e776518e7ae2656f
Subproject commit e13505a72597bcaff2cba02726a90488bd0700e1

@ -1 +1 @@
Subproject commit 3b055f3b60cd0dde22c84e3bf5092d815f63ec13
Subproject commit 84141b7260faf667d733d571ea0e81ac42fb0907

@ -33,8 +33,8 @@ static bool context_reader( struct json_pull_parser* jpp, void* field_data, stru
struct collection c = {
.ptr = &ctx->extra,
.vtable = &array_iface,
.itable = &itv_string,
.vtable = &array_vtable,
.itable = &string_itable,
};
while( !json_pull_parser_end_array( jpp, &save ) ) {
@ -69,6 +69,7 @@ static bool context_reader( struct json_pull_parser* jpp, void* field_data, stru
printf( "unknown language: %s\n", lang );
return false;
}
free(lang);
if( !json_pull_parser_end_object(jpp,&save2) ) {
printf( "Object failed\n" );

@ -1,18 +1,40 @@
#define _GNU_SOURCE
#include "inbox_envelope.h"
// Submodules
#include "json/json.h"
#include "json/layout.h"
#include "http_server/http_request.h"
#include "ffdb/fs_list.h"
#include "collections/collection.h"
#include "collections/array.h"
#include <stdio.h>
#include <stdlib.h>
#include <stddef.h>
#include <string.h>
#include <stdint.h>
#include <time.h>
static struct json_object_field http_header_layout[] = {
{ "key", offsetof(struct http_header, key), true, &json_field_string },
{ "value", offsetof(struct http_header, value), true, &json_field_string },
{ NULL },
};
static struct json_field_type http_header_type = {
.reader = json_field_object_composite_reader,
.writer = json_field_object_composite_writer,
.size = sizeof( struct http_header ),
.layout = http_header_layout,
.alloc = NULL,
.free = NULL,
};
static struct json_object_field envelope_layout[] = {
{ "signature", offsetof(struct ap_envelope, signature), true, &json_field_string },
{ "date", offsetof(struct ap_envelope, date), true, &json_field_string },
{ "when", offsetof(struct ap_envelope, when), true, &json_field_string },
{ "body", offsetof(struct ap_envelope, activity), true, &json_field_object_composite, ap_activity_layout },
{ "headers", offsetof(struct ap_envelope, headers), true, &json_field_array_of, &http_header_type },
{ "body", offsetof(struct ap_envelope, body), true, &json_field_string },
{ "validated", offsetof(struct ap_envelope, validated), false, &json_field_bool },
{ NULL },
};
@ -33,12 +55,71 @@ struct ap_envelope* ap_envelope_from_id( int id )
return env;
}
void ap_envelope_free_composite( struct ap_envelope* env )
{
free(env->body);
free(env->headers.items);
free(env->when);
}
void ap_envelope_free( struct ap_envelope* env )
{
if( !env ) { return; }
free(env->signature);
free(env->date);
free(env->when);
ap_envelope_free_composite(env);
free(env);
}
static void io_copy( FILE* in, FILE* out )
{
char buffer[512];
for(;;) {
int count = fread( buffer, 1, 512, in );
if( count == 0 ) { return; }
fwrite( buffer, 1, count, out );
}
}
bool envelope_create_from_request( struct http_request* req )
{
struct ap_envelope env;
memset( &env, 0, sizeof(env) );
// Read body in
FILE* body = http_request_get_request_data(req);
size_t s;
FILE* mem = open_memstream(&env.body, &s);
io_copy(body,mem);
fclose(mem);
env.body = realloc(env.body,s+1);
env.body[s] = '\0';
// Create timestamp
uint64_t time_ns;
struct timespec ts;
clock_gettime( CLOCK_REALTIME, &ts );
time_ns = (uint64_t)ts.tv_sec * 1000000000 + (uint64_t)ts.tv_nsec;
asprintf( &env.when, "%llu", time_ns );
// Get request headers
struct collection c = {
.ptr = &env.headers,
.vtable = &array_vtable,
.itable = &http_header_itable,
};
http_request_copy_headers( req, c );
// Get a space in the inbox
int head = fs_list_get( "data/inbox/HEAD" );
head += 1;
fs_list_set( "data/inbox/HEAD", head );
// Setup filenames
char filename[512];
snprintf( filename, 512, "data/inbox/%d.json", head );
json_write_object_layout_to_file( filename, "\t", envelope_layout, &env );
ap_envelope_free_composite(&env);
return true;
}

@ -2,17 +2,28 @@
#include "activity.h"
#include "http_server/header.h"
#include <stdbool.h>
struct ap_envelope
{
char* signature;
char* date;
char* when;
struct {
struct http_header* items;
int count;
} headers;
bool validated;
struct ap_activity activity;
//struct ap_activity activity;
char* body;
};
struct ap_envelope* ap_envelope_from_id( int id );
void ap_envelope_free( struct ap_envelope* env );
struct http_request;
bool envelope_create_from_request( struct http_request* req );

@ -13,4 +13,5 @@ struct http_signature
bool http_signature_make( const char* inbox, struct crypto_keys* keys, struct http_signature* sign );
void http_signature_free( struct http_signature* sign );
bool http_signature_validate( struct http_signature* sign );

@ -1 +1 @@
Subproject commit 06e2b87da59115047d62e248f0c79d14ae6ec912
Subproject commit ebdd3f20466ffc6e37235982cf67136ee4d0e6be

@ -1,6 +0,0 @@
{
"signature": %( json_write_string( f, signature ); ),
"date": %( json_write_string( f, date ); ),
"when": "%llu{time_ns}",
"body": %( io_copy( body, f ); )
}
Loading…
Cancel
Save