vici: Add a fully asynchronous IPC socket segmenting messages on/from stream

This commit is contained in:
Martin Willi 2014-01-21 17:53:15 +01:00
parent 1e39454214
commit 8457da7528
8 changed files with 1538 additions and 0 deletions

View File

@ -14,6 +14,8 @@ plugin_LTLIBRARIES = libstrongswan-vici.la
endif
libstrongswan_vici_la_SOURCES = \
vici_socket.h vici_socket.c \
vici_message.h vici_message.c \
vici_plugin.h vici_plugin.c
libstrongswan_vici_la_LDFLAGS = -module -avoid-version
@ -23,6 +25,10 @@ TESTS = vici_tests
check_PROGRAMS = $(TESTS)
vici_tests_SOURCES = \
suites/test_socket.c \
suites/test_message.c \
vici_socket.c \
vici_message.c \
vici_tests.h vici_tests.c
vici_tests_CFLAGS = \

View File

@ -0,0 +1,266 @@
/*
* Copyright (C) 2014 Martin Willi
* Copyright (C) 2014 revosec AG
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Free Software Foundation; either version 2 of the License, or (at your
* option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* for more details.
*/
#include <test_suite.h>
#include "../vici_message.h"
#include <unistd.h>
static char blob[] = {
0xd3,0xe5,0xee,0x37,0x7b,0x96,0x2f,0x3e,0x5f,0x3e,0x91,0xea,0x38,0x44,0xba,0x6c,
0x75,0xc8,0x42,0x32,0xaf,0x7a,0x66,0x43,0x33,0x92,0xd2,0xef,0x7d,0x91,0x7b,0x59,
0x9f,0x9f,0xd1,0x44,0xb6,0x1e,0x8c,0xd1,0xc5,0xa0,0xd9,0xe4,0xf2,0x31,0xfd,0x7b,
0x5b,0x56,0xa7,0xfe,0x63,0x0d,0xcb,0x31,0x74,0xd8,0xd6,0x4a,0x42,0x3a,0x88,0xf3,
0x79,0xf9,0x41,0xa6,0xc0,0x64,0x53,0x31,0x42,0xe2,0xd4,0x4a,0x22,0x5f,0x3f,0x99,
0xe0,0x1a,0xcb,0x93,0x26,0xd0,0xec,0xac,0x90,0x97,0x0a,0x5f,0x69,0x86,0xf1,0xda,
0xfc,0xa7,0xac,0xd0,0xd8,0x81,0xcf,0x7d,0x47,0x22,0xbe,0xbf,0x00,0x9b,0x6b,0x86,
0x92,0x89,0xbe,0x7f,0x74,0x13,0x53,0xf1,0x4c,0x2b,0xc9,0xe1,0x39,0xd6,0xfc,0x50,
0x3f,0x00,0xfb,0x76,0x42,0xa6,0xa4,0x70,0xfc,0x93,0x17,0x4a,0x35,0xce,0x5e,0x78,
0x41,0x88,0x24,0x50,0x78,0xf2,0x38,0x08,0xff,0x40,0xef,0x61,0xbb,0xbf,0x16,0xff,
0x0b,0xf6,0x33,0x21,0xcb,0x48,0xbd,0x7d,0xd1,0x73,0xfa,0x6d,0xd6,0xab,0xde,0x69,
0x63,0x17,0xdb,0x52,0xe2,0x75,0x4b,0xb7,0x1e,0xf0,0x8a,0x55,0x4f,0x70,0x8d,0x18,
0xe5,0x38,0x6a,0x9f,0xb8,0x06,0xb5,0x91,0x90,0x2b,0xc5,0x67,0xa9,0x12,0xe5,0xf3,
0x48,0x2f,0x80,0x03,0xa1,0xa0,0xfc,0x43,0xe9,0x0f,0x83,0x2b,0xbc,0x7c,0xa8,0x3b,
0x6c,0xc1,0xc8,0x72,0x5f,0x87,0x63,0x77,0x93,0x9b,0xe2,0xd7,0x4e,0xe6,0x65,0xa1,
0x69,0x00,0xda,0xf8,0xb4,0x61,0xee,0xb7,0x20,0xe7,0x2a,0x35,0x23,0xf0,0x37,0x4b,
0x67,0xcf,0x8d,0x85,0x72,0x22,0x6d,0x7a,0xb2,0x96,0xff,0x49,0xf4,0x94,0x3e,0x7e,
0x87,0x26,0x5d,0x34,0x05,0x26,0x60,0x9b,0x89,0xfe,0xf9,0x91,0xd3,0x03,0xe7,0x8a,
0x03,0xf6,0x4e,0xbf,0x68,0x13,0xc6,0xf2,0x7b,0x9c,0xe6,0x36,0x1b,0xe2,0x22,0x44,
0xb1,0x19,0x34,0x5f,0xe8,0x44,0x48,0x3a,0x19,0xe4,0xbd,0xb0,0x4e,0xb5,0x2c,0x40,
0x55,0x39,0xe6,0x4c,0xd5,0x68,0x34,0x72,0x6b,0x6d,0x88,0xce,0x7e,0x77,0x95,0x17,
0x2e,0x68,0x3f,0x0e,0x9d,0x70,0x9a,0x22,0xfa,0x19,0xcc,0x15,0x9d,0xba,0xaa,0xec,
0xb1,0x67,0x19,0x51,0xce,0x60,0x9a,0x38,0xf8,0xa7,0x4e,0xe3,0x25,0x47,0x1e,0x1d,
0x30,0x76,0x91,0x8f,0x4d,0x13,0x59,0x06,0x2f,0x01,0x10,0x95,0xdb,0x08,0x7c,0x46,
0xed,0x47,0xa1,0x19,0x4c,0x46,0xd1,0x3a,0x3f,0x88,0x7a,0x63,0xae,0x29,0x13,0x42,
0xe9,0x17,0xe8,0xa9,0x95,0xfc,0xd1,0xea,0xfa,0x59,0x90,0xfe,0xb7,0xbb,0x7f,0x61,
0x1b,0xcb,0x3d,0x12,0x99,0x96,0x3e,0x23,0x23,0xec,0x3a,0x4d,0x86,0x86,0x74,0xef,
0x38,0xa6,0xdc,0x3a,0x83,0x85,0xf8,0xb8,0xad,0x5b,0x33,0x94,0x4d,0x0e,0x68,0xbc,
0xf2,0xc7,0x6f,0x84,0x18,0x1e,0x5a,0x66,0x1f,0x6c,0x98,0x33,0xda,0xde,0x9e,0xda,
0x82,0xd0,0x56,0x44,0x47,0x08,0x0c,0x07,0x81,0x9d,0x8b,0x64,0x16,0x73,0x9d,0x80,
0x54,0x9c,0x4c,0x42,0xde,0x27,0x4e,0x97,0xb2,0xcf,0x48,0xaf,0x7e,0x85,0xc1,0xcd,
0x6a,0x4d,0x04,0x40,0x89,0xa3,0x9d,0x4e,0x89,0x56,0x60,0x31,0x1f,0x3f,0x49,0x16,
};
typedef struct {
vici_type_t type;
char *name;
chunk_t data;
} endecode_test_t;
static endecode_test_t endecode_test_simple[] = {
{ VICI_SECTION_START, "section1", {} },
{ VICI_KEY_VALUE, "key1", { "value1", 6 } },
{ VICI_KEY_VALUE, "key2", { "value2", 6 } },
{ VICI_SECTION_END, NULL, {} },
{ VICI_END, NULL, {} },
};
static endecode_test_t endecode_test_nested[] = {
{ VICI_SECTION_START, "section1", {} },
{ VICI_SECTION_START, "section2", {} },
{ VICI_SECTION_START, "section3", {} },
{ VICI_KEY_VALUE, "key1", { "value1", 6 } },
{ VICI_SECTION_START, "section4", {} },
{ VICI_KEY_VALUE, "key2", { "value2", 6 } },
{ VICI_SECTION_END, NULL, {} },
{ VICI_SECTION_END, NULL, {} },
{ VICI_SECTION_END, NULL, {} },
{ VICI_KEY_VALUE, "key3", { "value3", 6 } },
{ VICI_SECTION_END, NULL, {} },
{ VICI_END, NULL, {} },
};
static endecode_test_t endecode_test_list[] = {
{ VICI_SECTION_START, "section1", {} },
{ VICI_LIST_START, "list1", {} },
{ VICI_LIST_ITEM, NULL, { "item1", 5 } },
{ VICI_LIST_ITEM, NULL, { "item2", 5 } },
{ VICI_LIST_END, NULL, {} },
{ VICI_KEY_VALUE, "key1", { "value1", 6 } },
{ VICI_SECTION_END, NULL, {} },
{ VICI_END, NULL, {} },
};
static endecode_test_t endecode_test_blobs[] = {
{ VICI_KEY_VALUE, "key1", { blob, countof(blob) } },
{ VICI_SECTION_START, "section1", {} },
{ VICI_LIST_START, "list1", {} },
{ VICI_LIST_ITEM, NULL, { blob, countof(blob) } },
{ VICI_LIST_ITEM, NULL, { blob, countof(blob) } },
{ VICI_LIST_END, NULL, {} },
{ VICI_KEY_VALUE, "key2", { blob, countof(blob) } },
{ VICI_SECTION_END, NULL, {} },
{ VICI_END, NULL, {} },
};
static endecode_test_t *endecode_tests[] = {
endecode_test_simple,
endecode_test_nested,
endecode_test_list,
endecode_test_blobs,
};
typedef struct {
enumerator_t public;
endecode_test_t *next;
} endecode_enum_t;
static bool endecode_enumerate(endecode_enum_t *this, vici_type_t *type,
char **name, chunk_t *data)
{
if (this->next)
{
*type = this->next->type;
*name = this->next->name;
*data = this->next->data;
if (this->next->type == VICI_END)
{
this->next = NULL;
}
else
{
this->next++;
}
return TRUE;
}
return FALSE;
}
static enumerator_t *endecode_create_enumerator(endecode_test_t *test)
{
endecode_enum_t *enumerator;
INIT(enumerator,
.public = {
.enumerate = (void*)endecode_enumerate,
.destroy = (void*)free,
},
.next = test,
);
return &enumerator->public;
}
static void compare_vici(enumerator_t *parse, enumerator_t *tmpl)
{
vici_type_t type, ttype;
char *name, *tname;
chunk_t data, tdata;;
while (TRUE)
{
ck_assert(parse->enumerate(parse, &type, &name, &data));
ck_assert(tmpl->enumerate(tmpl, &ttype, &tname, &tdata));
ck_assert_int_eq(type, ttype);
switch (type)
{
case VICI_END:
return;
case VICI_SECTION_START:
case VICI_LIST_START:
ck_assert(streq(name, tname));
break;
case VICI_LIST_ITEM:
ck_assert(chunk_equals(data, tdata));
break;
case VICI_KEY_VALUE:
ck_assert(streq(name, tname));
ck_assert(chunk_equals(data, tdata));
break;
case VICI_SECTION_END:
case VICI_LIST_END:
break;
default:
ck_assert(FALSE);
break;
}
}
}
START_TEST(test_endecode)
{
enumerator_t *parse, *tmpl;
vici_message_t *m;
chunk_t data;
tmpl = endecode_create_enumerator(endecode_tests[_i]);
m = vici_message_create_from_enumerator(tmpl);
ck_assert(m);
data = chunk_clone(m->get_encoding(m));
tmpl = endecode_create_enumerator(endecode_tests[_i]);
parse = m->create_enumerator(m);
ck_assert(parse);
compare_vici(parse, tmpl);
tmpl->destroy(tmpl);
parse->destroy(parse);
m->destroy(m);
m = vici_message_create_from_data(data, TRUE);
ck_assert(m);
tmpl = endecode_create_enumerator(endecode_tests[_i]);
parse = m->create_enumerator(m);
ck_assert(parse);
compare_vici(parse, tmpl);
tmpl->destroy(tmpl);
parse->destroy(parse);
m->destroy(m);
}
END_TEST
START_TEST(test_vararg)
{
enumerator_t *parse, *tmpl;
vici_message_t *m;
m = vici_message_create_from_args(
VICI_SECTION_START, "section1",
VICI_LIST_START, "list1",
VICI_LIST_ITEM, chunk_from_str("item1"),
VICI_LIST_ITEM, chunk_from_str("item2"),
VICI_LIST_END,
VICI_KEY_VALUE, "key1", chunk_from_str("value1"),
VICI_SECTION_END,
VICI_END);
ck_assert(m);
tmpl = endecode_create_enumerator(endecode_test_list);
parse = m->create_enumerator(m);
ck_assert(parse);
compare_vici(parse, tmpl);
m->destroy(m);
tmpl->destroy(tmpl);
parse->destroy(parse);
}
END_TEST
Suite *message_suite_create()
{
Suite *s;
TCase *tc;
s = suite_create("vici message");
tc = tcase_create("enumerator en/decode");
tcase_add_loop_test(tc, test_endecode, 0, countof(endecode_tests));
suite_add_tcase(s, tc);
tc = tcase_create("vararg encode");
tcase_add_test(tc, test_vararg);
suite_add_tcase(s, tc);
return s;
}

View File

@ -0,0 +1,131 @@
/*
* Copyright (C) 2014 Martin Willi
* Copyright (C) 2014 revosec AG
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Free Software Foundation; either version 2 of the License, or (at your
* option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* for more details.
*/
#include <test_suite.h>
#include "../vici_socket.h"
#include <unistd.h>
typedef struct {
vici_socket_t *s;
int disconnect;
int bytes;
u_int id;
} test_data_t;
static void echo_inbound(void *user, u_int id, chunk_t buf)
{
test_data_t *data = user;
ck_assert_int_eq(data->id, id);
/* count number of bytes, including the header */
data->bytes += buf.len + sizeof(u_int16_t);
/* echo back data chunk */
data->s->send(data->s, id, chunk_clone(buf));
}
static void echo_connect(void *user, u_int id)
{
test_data_t *data = user;
data->id = id;
}
static void echo_disconnect(void *user, u_int id)
{
test_data_t *data = user;
ck_assert(id == data->id);
data->disconnect++;
}
static struct {
char *uri;
u_int chunksize;
} echo_tests[] = {
{ "tcp://127.0.0.1:6543", ~0 },
{ "tcp://127.0.0.1:6543", 1 },
{ "tcp://127.0.0.1:6543", 2 },
{ "tcp://127.0.0.1:6543", 3 },
{ "tcp://127.0.0.1:6543", 7 },
{ "unix:///tmp/strongswan-tests-vici-socket", ~0 },
{ "unix:///tmp/strongswan-tests-vici-socket", 1 },
{ "unix:///tmp/strongswan-tests-vici-socket", 2 },
{ "unix:///tmp/strongswan-tests-vici-socket", 3 },
{ "unix:///tmp/strongswan-tests-vici-socket", 7 },
};
START_TEST(test_echo)
{
stream_t *c;
test_data_t data = {};
chunk_t x, m = chunk_from_chars(
0x00,0x00,
0x00,0x01, 0x01,
0x00,0x05, 0x11,0x12,0x13,0x14,0x15,
0x00,0x0A, 0x21,0x22,0x23,0x24,0x25,0x26,0x27,0x28,0x29,0x02A,
);
char buf[m.len];
u_int16_t len;
lib->processor->set_threads(lib->processor, 4);
/* create socket, connect with stream */
data.s = vici_socket_create(echo_tests[_i].uri, echo_inbound, echo_connect,
echo_disconnect, &data);
ck_assert(data.s != NULL);
c = lib->streams->connect(lib->streams, echo_tests[_i].uri);
ck_assert(c != NULL);
/* write arbitrary chunks of messages blob depending on test */
x = m;
while (x.len)
{
len = min(x.len, echo_tests[_i].chunksize);
ck_assert(c->write_all(c, x.ptr, len));
x = chunk_skip(x, len);
}
/* verify echo */
ck_assert(c->read_all(c, buf, sizeof(buf)));
ck_assert(chunk_equals(m, chunk_from_thing(buf)));
/* wait for completion */
c->destroy(c);
while (data.disconnect != 1)
{
usleep(1000);
}
/* check that we got correct number of bytes/invocations */
ck_assert_int_eq(data.bytes, m.len);
data.s->destroy(data.s);
}
END_TEST
Suite *socket_suite_create()
{
Suite *s;
TCase *tc;
s = suite_create("vici socket");
tc = tcase_create("echo");
tcase_add_loop_test(tc, test_echo, 0, countof(echo_tests));
suite_add_tcase(s, tc);
return s;
}

View File

@ -0,0 +1,404 @@
/*
* Copyright (C) 2014 Martin Willi
* Copyright (C) 2014 revosec AG
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Free Software Foundation; either version 2 of the License, or (at your
* option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* for more details.
*/
#include "vici_message.h"
#include <bio/bio_reader.h>
#include <bio/bio_writer.h>
typedef struct private_vici_message_t private_vici_message_t;
/**
* Private data of an vici_message_t object.
*/
struct private_vici_message_t {
/**
* Public vici_message_t interface.
*/
vici_message_t public;
/**
* Message encoding
*/
chunk_t encoding;
/**
* Free encoding during destruction?
*/
bool cleanup;
};
ENUM(vici_type_names, VICI_SECTION_START, VICI_END,
"section-start",
"section-end",
"key-value",
"list-start",
"list-item",
"list-end",
"end"
);
/**
* See header.
*/
bool vici_stringify(chunk_t chunk, char *buf, size_t size)
{
if (!chunk_printable(chunk, NULL, 0))
{
return FALSE;
}
snprintf(buf, size, "%.*s", (int)chunk.len, chunk.ptr);
return TRUE;
}
/**
* Verify the occurence of a given type for given section/list nesting
*/
static bool verify_type(vici_type_t type, int section, bool list)
{
if (list)
{
if (type != VICI_LIST_END && type != VICI_LIST_ITEM)
{
DBG1(DBG_ENC, "'%N' within list", vici_type_names, type);
return FALSE;
}
}
else
{
if (type == VICI_LIST_ITEM || type == VICI_LIST_END)
{
DBG1(DBG_ENC, "'%N' outside list", vici_type_names, type);
return FALSE;
}
}
if (type == VICI_SECTION_END && section == 0)
{
DBG1(DBG_ENC, "'%N' outside of section", vici_type_names, type);
return FALSE;
}
if (type == VICI_END)
{
if (section)
{
DBG1(DBG_ENC, "'%N' within section", vici_type_names, type);
return FALSE;
}
if (list)
{
DBG1(DBG_ENC, "'%N' within list", vici_type_names, type);
return FALSE;
}
}
return TRUE;
}
/**
* Enumerator parsing message
*/
typedef struct {
/* implements enumerator */
enumerator_t public;
/** reader to parse from */
bio_reader_t *reader;
/** section nesting level */
int section;
/** currently parsing list? */
bool list;
/** string currently enumerating */
char name[257];
} parse_enumerator_t;
METHOD(enumerator_t, parse_enumerate, bool,
parse_enumerator_t *this, vici_type_t *out, char **name, chunk_t *value)
{
u_int8_t type;
chunk_t data;
if (!this->reader->read_uint8(this->reader, &type))
{
*out = VICI_END;
return TRUE;
}
if (!verify_type(type, this->section, this->list))
{
return FALSE;
}
switch (type)
{
case VICI_SECTION_START:
if (!this->reader->read_data8(this->reader, &data) ||
!vici_stringify(data, this->name, sizeof(this->name)))
{
DBG1(DBG_ENC, "invalid '%N' encoding", vici_type_names, type);
return FALSE;
}
*name = this->name;
this->section++;
break;
case VICI_SECTION_END:
this->section--;
break;
case VICI_KEY_VALUE:
if (!this->reader->read_data8(this->reader, &data) ||
!vici_stringify(data, this->name, sizeof(this->name)) ||
!this->reader->read_data16(this->reader, value))
{
DBG1(DBG_ENC, "invalid '%N' encoding", vici_type_names, type);
return FALSE;
}
*name = this->name;
break;
case VICI_LIST_START:
if (!this->reader->read_data8(this->reader, &data) ||
!vici_stringify(data, this->name, sizeof(this->name)))
{
DBG1(DBG_ENC, "invalid '%N' encoding", vici_type_names, type);
return FALSE;
}
*name = this->name;
this->list = TRUE;
break;
case VICI_LIST_ITEM:
this->reader->read_data16(this->reader, value);
break;
case VICI_LIST_END:
this->list = FALSE;
break;
case VICI_END:
return TRUE;
default:
DBG1(DBG_ENC, "unknown encoding type: %u", type);
return FALSE;
}
*out = type;
return TRUE;
}
METHOD(enumerator_t, parse_destroy, void,
parse_enumerator_t *this)
{
this->reader->destroy(this->reader);
free(this);
}
METHOD(vici_message_t, create_enumerator, enumerator_t*,
private_vici_message_t *this)
{
parse_enumerator_t *enumerator;
INIT(enumerator,
.public = {
.enumerate = (void*)_parse_enumerate,
.destroy = _parse_destroy,
},
.reader = bio_reader_create(this->encoding),
);
return &enumerator->public;
}
METHOD(vici_message_t, get_encoding, chunk_t,
private_vici_message_t *this)
{
return this->encoding;
}
METHOD(vici_message_t, destroy, void,
private_vici_message_t *this)
{
if (this->cleanup)
{
chunk_clear(&this->encoding);
}
free(this);
}
/**
* See header
*/
vici_message_t *vici_message_create_from_data(chunk_t data, bool cleanup)
{
private_vici_message_t *this;
INIT(this,
.public = {
.create_enumerator = _create_enumerator,
.get_encoding = _get_encoding,
.destroy = _destroy,
},
.encoding = data,
.cleanup = cleanup,
);
return &this->public;
}
/**
* Write from enumerator to writer
*/
static bool write_from_enumerator(bio_writer_t *writer,
enumerator_t *enumerator)
{
vici_type_t type;
char *name;
chunk_t value;
int section = 0;
bool list = FALSE;
while (enumerator->enumerate(enumerator, &type, &name, &value))
{
if (!verify_type(type, section, list))
{
return FALSE;
}
if (type != VICI_END)
{
writer->write_uint8(writer, type);
}
switch (type)
{
case VICI_SECTION_START:
writer->write_data8(writer, chunk_from_str(name));
section++;
break;
case VICI_SECTION_END:
section--;
break;
case VICI_KEY_VALUE:
writer->write_data8(writer, chunk_from_str(name));
writer->write_data16(writer, value);
break;
case VICI_LIST_START:
writer->write_data8(writer, chunk_from_str(name));
list = TRUE;
break;
case VICI_LIST_ITEM:
writer->write_data16(writer, value);
break;
case VICI_LIST_END:
list = FALSE;
break;
case VICI_END:
return TRUE;
default:
return FALSE;
}
}
return FALSE;
}
/**
* See header
*/
vici_message_t *vici_message_create_from_enumerator(enumerator_t *enumerator)
{
vici_message_t *message = NULL;
bio_writer_t *writer;
chunk_t data;
writer = bio_writer_create(0);
if (write_from_enumerator(writer, enumerator))
{
data = chunk_clone(writer->get_buf(writer));
message = vici_message_create_from_data(data, TRUE);
}
enumerator->destroy(enumerator);
writer->destroy(writer);
return message;
}
/**
* Enumerator for va_list arguments
*/
typedef struct {
/* implements enumerator */
enumerator_t public;
/** arguments to enumerate */
va_list args;
/** first type, if not yet processed */
vici_type_t *first;
} va_enumerator_t;
METHOD(enumerator_t, va_enumerate, bool,
va_enumerator_t *this, vici_type_t *out, char **name, chunk_t *value)
{
vici_type_t type;
if (this->first)
{
type = *this->first;
this->first = NULL;
}
else
{
type = va_arg(this->args, vici_type_t);
}
switch (type)
{
case VICI_SECTION_END:
case VICI_LIST_END:
case VICI_END:
break;
case VICI_LIST_START:
case VICI_SECTION_START:
*name = va_arg(this->args, char*);
break;
case VICI_KEY_VALUE:
*name = va_arg(this->args, char*);
*value = va_arg(this->args, chunk_t);
break;
case VICI_LIST_ITEM:
*value = va_arg(this->args, chunk_t);
break;
default:
return FALSE;
}
*out = type;
return TRUE;
}
METHOD(enumerator_t, va_destroy, void,
va_enumerator_t *this)
{
va_end(this->args);
free(this);
}
/**
* See header
*/
vici_message_t *vici_message_create_from_args(vici_type_t type, ...)
{
va_enumerator_t *enumerator;
INIT(enumerator,
.public = {
.enumerate = (void*)_va_enumerate,
.destroy = _va_destroy,
},
.first = &type,
);
va_start(enumerator->args, type);
return vici_message_create_from_enumerator(&enumerator->public);
}

View File

@ -0,0 +1,125 @@
/*
* Copyright (C) 2014 Martin Willi
* Copyright (C) 2014 revosec AG
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Free Software Foundation; either version 2 of the License, or (at your
* option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* for more details.
*/
/**
* @defgroup vici_message vici_message
* @{ @ingroup vici_dispatcher
*/
#ifndef VICI_MESSAGE_H_
#define VICI_MESSAGE_H_
#include <library.h>
typedef struct vici_message_t vici_message_t;
typedef enum vici_type_t vici_type_t;
/**
* Vici message encoding types
*/
enum vici_type_t {
/** begin of new section, argument is section name as char* */
VICI_SECTION_START = 0,
/** end of current section, no arguments */
VICI_SECTION_END,
/** key/value, arguments are key as char*, value as chunk_t */
VICI_KEY_VALUE,
/** list start, argument is list name as char* */
VICI_LIST_START,
/** list item, argument is item value as chunk_t */
VICI_LIST_ITEM,
/** end of list, no arguments */
VICI_LIST_END,
/** end of argument list, no arguments (never encoded) */
VICI_END
};
/**
* Names for vici encoding types
*/
extern enum_name_t *vici_type_names;
/**
* Vici message representation, encoding/decoding routines.
*/
struct vici_message_t {
/**
* Create an enumerator over message contents.
*
* The enumerator takes a fixed list of arguments, but depending on the
* type may set not all of them. It returns VICI_END as last argument
* to indicate the message end, and returns FALSE if parsing the message
* failed.
*
* @return enumerator over (vici_type_t, char*, chunk_t)
*/
enumerator_t* (*create_enumerator)(vici_message_t *this);
/**
* Get encoded message.
*
* @return message data, points to internal data
*/
chunk_t (*get_encoding)(vici_message_t *this);
/**
* Destroy a vici_message_t.
*/
void (*destroy)(vici_message_t *this);
};
/**
* Create a vici_message from encoded data.
*
* @param data message encoding
* @param cleanup TRUE to free data during
* @return message representation
*/
vici_message_t *vici_message_create_from_data(chunk_t data, bool cleanup);
/**
* Create a vici_message from an enumerator.
*
* The enumerator uses the same signature as the enumerator returned
* by create_enumerator(), and gets destroyed by this function. It should
* return VICI_END to close the message, return FALSE to indicate a failure.
*
* @param enumerator enumerator over (vici_type_t, char*, chunk_t)
* @return message representation, NULL on error
*/
vici_message_t *vici_message_create_from_enumerator(enumerator_t *enumerator);
/**
* Create vici message from a variable argument list.
*
* @param first first type beginning message
* @param ... vici_type_t and args, terminated by VICI_END
* @return message representation, NULL on error
*/
vici_message_t *vici_message_create_from_args(vici_type_t type, ...);
/**
* Check if a chunk has a printable string, and print it to buf.
*
* @param chunkt chunk containing potential string
* @param buf buffer to write string to
* @param size size of buf
* @return TRUE if printable and string written to buf
*/
bool vici_stringify(chunk_t chunk, char *buf, size_t size);
#endif /** VICI_MESSAGE_H_ @}*/

View File

@ -0,0 +1,513 @@
/*
* Copyright (C) 2014 Martin Willi
* Copyright (C) 2014 revosec AG
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Free Software Foundation; either version 2 of the License, or (at your
* option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* for more details.
*/
#include "vici_socket.h"
#include <daemon.h>
#include <threading/mutex.h>
#include <threading/rwlock.h>
#include <collections/array.h>
#include <collections/linked_list.h>
#include <processing/jobs/callback_job.h>
#include <errno.h>
#include <string.h>
typedef struct private_vici_socket_t private_vici_socket_t;
/**
* Private members of vici_socket_t
*/
struct private_vici_socket_t {
/**
* public functions
*/
vici_socket_t public;
/**
* Inbound message callback
*/
vici_inbound_cb_t inbound;
/**
* Client connect callback
*/
vici_connect_cb_t connect;
/**
* Client disconnect callback
*/
vici_disconnect_cb_t disconnect;
/**
* Next client connection identifier
*/
u_int nextid;
/**
* User data for callbacks
*/
void *user;
/**
* Service accepting vici connections
*/
stream_service_t *service;
/**
* Client connections, as entry_t
*/
linked_list_t *connections;
/**
* rwlock for client connection list
*/
rwlock_t *lock;
};
/**
* Data to securely reference an entry
*/
typedef struct {
/* reference to socket instance */
private_vici_socket_t *this;
/** connection identifier to disconnect */
u_int id;
} entry_data_t;
/**
* Partially processed message
*/
typedef struct {
/** bytes of length header sent/received */
u_char hdrlen;
/** bytes of length header */
char hdr[sizeof(u_int16_t)];
/** send/receive buffer on heap */
chunk_t buf;
/** bytes sent/received in buffer */
u_int16_t done;
} msg_buf_t;
/**
* Client connection entry
*/
typedef struct {
/** reference to socket */
private_vici_socket_t *this;
/** mutex to lock this entry in/out buffers */
mutex_t *mutex;
/** associated stream */
stream_t *stream;
/** queued messages to send, as msg_buf_t pointers */
array_t *out;
/** input message buffer */
msg_buf_t in;
/** client connection identifier */
u_int id;
} entry_t;
/**
* Destroy an connection entry
*/
CALLBACK(destroy_entry, void,
entry_t *entry)
{
msg_buf_t *out;
entry->stream->destroy(entry->stream);
entry->this->disconnect(entry->this->user, entry->id);
entry->mutex->destroy(entry->mutex);
while (array_remove(entry->out, ARRAY_TAIL, &out))
{
chunk_clear(&out->buf);
free(out);
}
array_destroy(entry->out);
chunk_clear(&entry->in.buf);
free(entry);
}
/**
* Find/remove entry by id, requires proper locking
*/
static entry_t* find_entry(private_vici_socket_t *this, u_int id, bool remove)
{
enumerator_t *enumerator;
entry_t *entry, *found = NULL;
enumerator = this->connections->create_enumerator(this->connections);
while (enumerator->enumerate(enumerator, &entry))
{
if (entry->id == id)
{
if (remove)
{
this->connections->remove_at(this->connections, enumerator);
}
found = entry;
break;
}
}
enumerator->destroy(enumerator);
return found;
}
/**
* Asynchronous callback to disconnect client
*/
CALLBACK(disconnect_async, job_requeue_t,
entry_data_t *data)
{
entry_t *entry;
data->this->lock->write_lock(data->this->lock);
entry = find_entry(data->this, data->id, TRUE);
data->this->lock->unlock(data->this->lock);
if (entry)
{
destroy_entry(entry);
}
return JOB_REQUEUE_NONE;
}
/**
* Disconnect a connected client
*/
static void disconnect(private_vici_socket_t *this, u_int id)
{
entry_data_t *data;
INIT(data,
.this = this,
.id = id,
);
lib->processor->queue_job(lib->processor,
(job_t*)callback_job_create(disconnect_async, data, free, NULL));
}
/**
* Write queued output data
*/
static bool do_write(private_vici_socket_t *this, entry_t *entry,
stream_t *stream)
{
msg_buf_t *out;
ssize_t len;
while (array_get(entry->out, ARRAY_HEAD, &out))
{
/* write header */
while (out->hdrlen < sizeof(out->hdr))
{
len = stream->write(stream, out->hdr + out->hdrlen,
sizeof(out->hdr) - out->hdrlen, FALSE);
if (len == 0)
{
return FALSE;
}
if (len < 0)
{
if (errno == EWOULDBLOCK)
{
return TRUE;
}
DBG1(DBG_CFG, "vici header write error: %s", strerror(errno));
return FALSE;
}
out->hdrlen += len;
}
/* write buffer buffer */
while (out->buf.len > out->done)
{
len = stream->write(stream, out->buf.ptr + out->done,
out->buf.len - out->done, FALSE);
if (len == 0)
{
DBG1(DBG_CFG, "premature vici disconnect");
return FALSE;
}
if (len < 0)
{
if (errno == EWOULDBLOCK)
{
return TRUE;
}
DBG1(DBG_CFG, "vici write error: %s", strerror(errno));
return FALSE;
}
out->done += len;
}
if (array_remove(entry->out, ARRAY_HEAD, &out))
{
chunk_clear(&out->buf);
free(out);
}
}
return TRUE;
}
/**
* Send pending messages
*/
CALLBACK(on_write, bool,
entry_t *entry, stream_t *stream)
{
bool ret;
entry->mutex->lock(entry->mutex);
ret = do_write(entry->this, entry, stream);
if (ret)
{
/* unregister if we have no more messages to send */
ret = array_count(entry->out) != 0;
}
else
{
disconnect(entry->this, entry->id);
}
entry->mutex->unlock(entry->mutex);
return ret;
}
/**
* Read in available header with data, non-blocking cumulating to buffer
*/
static bool do_read(private_vici_socket_t *this, entry_t *entry,
stream_t *stream)
{
ssize_t len;
/* assemble the length header first */
while (entry->in.hdrlen < sizeof(entry->in.hdr))
{
len = stream->read(stream, entry->in.hdr + entry->in.hdrlen,
sizeof(entry->in.hdr) - entry->in.hdrlen, FALSE);
if (len == 0)
{
return FALSE;
}
if (len < 0)
{
if (errno == EWOULDBLOCK)
{
return TRUE;
}
DBG1(DBG_CFG, "vici header read error: %s", strerror(errno));
return FALSE;
}
entry->in.hdrlen += len;
if (entry->in.hdrlen == sizeof(entry->in.hdr))
{
/* header complete, continue with data */
entry->in.buf = chunk_alloc(untoh16(entry->in.hdr));
}
}
/* assemble buffer */
while (entry->in.buf.len > entry->in.done)
{
len = stream->read(stream, entry->in.buf.ptr + entry->in.done,
entry->in.buf.len - entry->in.done, FALSE);
if (len == 0)
{
DBG1(DBG_CFG, "premature vici disconnect");
return FALSE;
}
if (len < 0)
{
if (errno == EWOULDBLOCK)
{
return TRUE;
}
DBG1(DBG_CFG, "vici read error: %s", strerror(errno));
return FALSE;
}
entry->in.done += len;
}
this->inbound(this->user, entry->id, entry->in.buf);
chunk_clear(&entry->in.buf);
entry->in.hdrlen = entry->in.done = 0;
return TRUE;
}
/**
* Process incoming messages
*/
CALLBACK(on_read, bool,
entry_t *entry, stream_t *stream)
{
bool ret;
entry->mutex->lock(entry->mutex);
ret = do_read(entry->this, entry, stream);
if (!ret)
{
disconnect(entry->this, entry->id);
}
entry->mutex->unlock(entry->mutex);
return ret;
}
/**
* Process connection request
*/
static bool on_accept(private_vici_socket_t *this, stream_t *stream)
{
entry_t *entry;
u_int id;
id = ref_get(&this->nextid);
INIT(entry,
.this = this,
.stream = stream,
.id = id,
.out = array_create(0, 0),
.mutex = mutex_create(MUTEX_TYPE_RECURSIVE),
);
this->lock->write_lock(this->lock);
this->connections->insert_last(this->connections, entry);
stream->on_read(stream, on_read, entry);
this->lock->unlock(this->lock);
this->connect(this->user, id);
return TRUE;
}
/**
* Enable on_write callback to send data
*/
CALLBACK(on_write_async, job_requeue_t,
entry_data_t *data)
{
private_vici_socket_t *this = data->this;
entry_t *entry;
this->lock->read_lock(this->lock);
entry = find_entry(this, data->id, FALSE);
if (entry)
{
entry->stream->on_write(entry->stream, on_write, entry);
}
this->lock->unlock(this->lock);
return JOB_REQUEUE_NONE;
}
METHOD(vici_socket_t, send_, void,
private_vici_socket_t *this, u_int id, chunk_t msg)
{
if (msg.len <= (u_int16_t)~0)
{
entry_data_t *data;
msg_buf_t *out;
entry_t *entry;
this->lock->read_lock(this->lock);
entry = find_entry(this, id, FALSE);
if (entry)
{
INIT(out,
.buf = msg,
);
htoun16(out->hdr, msg.len);
entry->mutex->lock(entry->mutex);
array_insert(entry->out, ARRAY_TAIL, out);
entry->mutex->unlock(entry->mutex);
if (array_count(entry->out) == 1)
{
INIT(data,
.this = this,
.id = entry->id,
);
/* asynchronously enable writing, as this might be called
* from the on_read() callback. */
lib->processor->queue_job(lib->processor,
(job_t*)callback_job_create(on_write_async,
data, free, NULL));
}
}
else
{
DBG1(DBG_CFG, "vici connection %u unknown", id);
}
this->lock->unlock(this->lock);
}
else
{
DBG1(DBG_CFG, "vici message exceeds maximum size, discarded");
chunk_clear(&msg);
}
}
METHOD(vici_socket_t, destroy, void,
private_vici_socket_t *this)
{
DESTROY_IF(this->service);
this->connections->destroy_function(this->connections, destroy_entry);
this->lock->destroy(this->lock);
free(this);
}
/*
* see header file
*/
vici_socket_t *vici_socket_create(char *uri, vici_inbound_cb_t inbound,
vici_connect_cb_t connect,
vici_disconnect_cb_t disconnect, void *user)
{
private_vici_socket_t *this;
INIT(this,
.public = {
.send = _send_,
.destroy = _destroy,
},
.lock = rwlock_create(RWLOCK_TYPE_DEFAULT),
.connections = linked_list_create(),
.inbound = inbound,
.connect = connect,
.disconnect = disconnect,
.user = user,
);
this->service = lib->streams->create_service(lib->streams, uri, 3);
if (!this->service)
{
DBG1(DBG_CFG, "creating vici socket failed");
destroy(this);
return NULL;
}
this->service->on_accept(this->service, (stream_service_cb_t)on_accept,
this, JOB_PRIO_CRITICAL, 0);
return &this->public;
}

View File

@ -0,0 +1,90 @@
/*
* Copyright (C) 2014 Martin Willi
* Copyright (C) 2014 revosec AG
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License as published by the
* Free Software Foundation; either version 2 of the License, or (at your
* option) any later version. See <http://www.fsf.org/copyleft/gpl.txt>.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* for more details.
*/
/**
* @defgroup vici_socket vici_socket
* @{ @ingroup vici
*/
#ifndef VICI_SOCKET_H_
#define VICI_SOCKET_H_
#include <library.h>
typedef struct vici_socket_t vici_socket_t;
/**
* Callback function for dispatching inbound client messages.
*
* @param user user data, as passed during registration
* @param id unique client connection identifier
* @param data incoming message data
*/
typedef void (*vici_inbound_cb_t)(void *user, u_int id, chunk_t data);
/**
* Callback function invoked when new clients connect
*
* @param user user data, as passed during registration
* @param id unique client connection identifier
* @return client connection context
*/
typedef void (*vici_connect_cb_t)(void *user, u_int id);
/**
* Callback function invoked when connected clients disconnect
*
* @param user user data, as passed during registration
* @param id unique client connection identifier
*/
typedef void (*vici_disconnect_cb_t)(void *user, u_int id);
/**
* Vici socket, low level socket input/output handling.
*
* On the socket, we pass raw chunks having a 2 byte network order length
* prefix. The length field does not count the length header itself, and
* is not included in the data passed over this interface.
*/
struct vici_socket_t {
/**
* Send a message to a client identified by connection identifier.
*
* @param id unique client connection identifier
* @param data data to send to client, gets owned
*/
void (*send)(vici_socket_t *this, u_int id, chunk_t data);
/**
* Destroy socket.
*/
void (*destroy)(vici_socket_t *this);
};
/**
* Create a vici_socket instance.
*
* @param uri socket URI to listen on
* @param inbound inbound message callback
* @param connect connect callback
* @param disconnect disconnect callback
* @param user user data to pass to callbacks
*/
vici_socket_t *vici_socket_create(char *uri, vici_inbound_cb_t inbound,
vici_connect_cb_t connect,
vici_disconnect_cb_t disconnect, void *user);
#endif /** VICI_SOCKET_H_ @}*/

View File

@ -12,3 +12,6 @@
* or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* for more details.
*/
TEST_SUITE(socket_suite_create)
TEST_SUITE(message_suite_create)