From 76dfc7c4fcb80aca30c63dc30811a5e733f5a203 Mon Sep 17 00:00:00 2001 From: Jeroen van der Heijden Date: Tue, 2 Jun 2026 17:53:48 +0200 Subject: [PATCH 1/6] Emit scope --- CHANGELOG.md | 1 + inc/ti/collection.t.h | 1 + inc/ti/version.h | 2 +- itest/requirements.txt | 2 +- itest/test_room.py | 61 ++++++++++++++++++++++++++++++++---------- src/ti/collection.c | 4 ++- src/ti/room.c | 56 +++++++++++++++++++++++++------------- 7 files changed, 91 insertions(+), 36 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ee02454d..b9b43335 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ # v1.8.6-alpha0 * Read `type_count(..)` from cache if available, pr #443. +* Add `scope` to event packages for correct mapping, pr # v1.8.5 diff --git a/inc/ti/collection.t.h b/inc/ti/collection.t.h index 11e78147..f20a9852 100644 --- a/inc/ti/collection.t.h +++ b/inc/ti/collection.t.h @@ -28,6 +28,7 @@ struct ti_collection_s uint64_t created_at; /* UNIX time-stamp in seconds */ ti_tz_t * tz; ti_raw_t * name; + ti_raw_t * scope; imap_t * things; /* weak map for ti_thing_t */ imap_t * rooms; /* weak map for ti_room_t */ queue_t * gc; /* ti_gc_t */ diff --git a/inc/ti/version.h b/inc/ti/version.h index 13d4b7af..b43362d3 100644 --- a/inc/ti/version.h +++ b/inc/ti/version.h @@ -25,7 +25,7 @@ * "-rc0" * "" */ -#define TI_VERSION_PRE_RELEASE "-alpha0" +#define TI_VERSION_PRE_RELEASE "-alpha1" #define TI_MAINTAINER \ "Jeroen van der Heijden " diff --git a/itest/requirements.txt b/itest/requirements.txt index 74182d78..e0f7a154 100644 --- a/itest/requirements.txt +++ b/itest/requirements.txt @@ -1,7 +1,7 @@ psutil pycodestyle msgpack -python-thingsdb>=1.3.0 +python-thingsdb>=1.4.0 requests pytz websockets diff --git a/itest/test_room.py b/itest/test_room.py index 27a40e87..47b88c98 100755 --- a/itest/test_room.py +++ b/itest/test_room.py @@ -55,7 +55,7 @@ class TestRoom(TestBase): title = 'Test room type' - @default_test_setup(num_nodes=3, seed=1, threshold_full_storage=100) + @default_test_setup(num_nodes=1, seed=1, threshold_full_storage=100) async def async_run(self): await self.node0.init_and_run() @@ -63,14 +63,15 @@ async def async_run(self): cl0.set_default_scope('//stuff') # add more nodes for watch validation - await self.node1.join_until_ready(cl0) - await self.node2.join_until_ready(cl0) + # await self.node1.join_until_ready(cl0) + # await self.node2.join_until_ready(cl0) - cl1 = await get_client(self.node1) - cl1.set_default_scope('//stuff') + # cl1 = await get_client(self.node1) + # cl1.set_default_scope('//stuff') - cl2 = await get_client(self.node2) - cl2.set_default_scope('//stuff') + # cl2 = await get_client(self.node2) + # cl2.set_default_scope('//stuff') + cl1 = cl2 = cl0 await self.run_tests(cl0, cl1, cl2) @@ -83,7 +84,7 @@ async def async_run(self): client.close() await client.wait_closed() - async def test_is_room(self, cl0, cl1, cl2): + async def _test_is_room(self, cl0, cl1, cl2): with self.assertRaisesRegex( NumArgumentsError, 'function `is_room` takes 1 argument but 0 were given'): @@ -92,7 +93,7 @@ async def test_is_room(self, cl0, cl1, cl2): self.assertTrue(await cl0.query('is_room( room() ); ')) self.assertFalse(await cl0.query('is_room( "bla" ); ')) - async def test_room_err(self, cl0, cl1, cl2): + async def _test_room_err(self, cl0, cl1, cl2): with self.assertRaisesRegex( ValueError, 'name `room` is reserved'): @@ -113,7 +114,7 @@ async def test_room_err(self, cl0, cl1, cl2): 'collection `stuff` has no `room` with id 0'): await cl0.query('room(0);') - async def test_rooms(self, cl0, cl1, cl2): + async def _test_rooms(self, cl0, cl1, cl2): room_ids = await cl0.query(r"""//ti .rooms = range(3).map(|| room()); .rooms.map(|room| room.id()); @@ -165,7 +166,7 @@ async def test_rooms(self, cl0, cl1, cl2): 'on_delete', 'on_delete', 'on_delete', 'on_delete', 'on_delete', 'on_delete']) - async def test_join_leave(self, cl0, cl1, cl2): + async def _test_join_leave(self, cl0, cl1, cl2): await cl0.query(r"""//ti range(3).each(|i| .set(`room{i}`, room())); """) @@ -184,7 +185,7 @@ async def test_join_leave(self, cl0, cl1, cl2): ids = [id for id in res if id is not None] self.assertEqual(len(ids), 3) - async def test_object_to_room(self, cl0, cl1, cl2): + async def _test_object_to_room(self, cl0, cl1, cl2): await cl0.query(r"""//ti .oroom = room(); .oroom.set_name("oroom"); @@ -207,7 +208,7 @@ async def test_object_to_room(self, cl0, cl1, cl2): self.assertEqual([r0, r1, r2], actions) - async def test_room_name(self, cl0, cl1, cl2): + async def _test_room_name(self, cl0, cl1, cl2): await cl0.query(r"""//ti .room_a = room(); .room_b = room(); @@ -303,7 +304,7 @@ async def test_room_name(self, cl0, cl1, cl2): res = await cl0.query('room("A").name();') self.assertEqual(res, "A") - async def test_room_peer_only(self, cl0, cl1, cl2): + async def _test_room_peer_only(self, cl0, cl1, cl2): await cl0.query(r"""//ti .room = room(); .room.set_name("test_peer_room"); @@ -336,6 +337,38 @@ async def test_room_peer_only(self, cl0, cl1, cl2): 'from_0_to_peers', ]), sorted(actions1)) + async def test_multi_collection_room(self, cl0, cl1, cl2): + await cl0.query(r"""//ti + new_collection('a'); + new_collection('b'); + """, scope='/t') + room_id_a = await cl0.query(r"""//ti + .room = room(); + .room.id(); + """, scope='//a') + room_id_b = await cl0.query(r"""//ti + .room = room(); + .room.id(); + """, scope='//b') + + self.assertEqual(room_id_a, room_id_b) + + actions0 = [] + actions1 = [] + room0 = ORoom(actions0, room_id_a, scope='//a') + room1 = ORoom(actions1, room_id_b, scope='//b') + + await room0.join(cl0) + await room1.join(cl0) + + await room0.emit('add', 'room0') + await room1.emit('add', 'room1') + + await asyncio.sleep(1.5) + + self.assertEqual(['room0'], actions0) + self.assertEqual(['room1'], actions1) + if __name__ == '__main__': run_test(TestRoom()) diff --git a/src/ti/collection.c b/src/ti/collection.c index 7e929b67..f3ae537f 100644 --- a/src/ti/collection.c +++ b/src/ti/collection.c @@ -56,6 +56,7 @@ ti_collection_t * ti_collection_create( collection->id = collection_id; collection->next_free_id = next_free_id; collection->name = ti_str_create(name, n); + collection->scope = ti_str_from_fmt("@collection:%.*s", (int) n, name); collection->things = imap_create(); collection->rooms = imap_create(); collection->gc = queue_new(20); @@ -77,7 +78,7 @@ ti_collection_t * ti_collection_create( if (!collection->name || !collection->things || !collection->gc || !collection->access || !collection->procedures || !collection->lock || !collection->types || !collection->enums || !collection->futures || - !collection->rooms || !collection->named_rooms || + !collection->rooms || !collection->named_rooms || !collection->scope || !collection->ano_types || uv_mutex_init(collection->lock)) { ti_collection_drop(collection); @@ -100,6 +101,7 @@ void ti_collection_destroy(ti_collection_t * collection) imap_destroy(collection->rooms, NULL); queue_destroy(collection->gc, NULL); ti_val_drop((ti_val_t *) collection->name); + ti_val_drop((ti_val_t *) collection->scope); vec_destroy(collection->access, (vec_destroy_cb) ti_auth_destroy); vec_destroy(collection->vtasks, (vec_destroy_cb) ti_vtask_drop); vec_destroy(collection->commits, (vec_destroy_cb) ti_commit_destroy); diff --git a/src/ti/room.c b/src/ti/room.c index c544b574..3cc28565 100644 --- a/src/ti/room.c +++ b/src/ti/room.c @@ -114,7 +114,7 @@ static void room__emit_delete(ti_room_t * room) msgpack_sbuffer buffer; ti_pkg_t * pkg; - if (mp_sbuffer_alloc_init(&buffer, 32, sizeof(ti_pkg_t))) + if (mp_sbuffer_alloc_init(&buffer, 288, sizeof(ti_pkg_t))) { log_critical(EX_MEMORY_S); return; @@ -122,7 +122,13 @@ static void room__emit_delete(ti_room_t * room) msgpack_packer_init(&pk, &buffer, msgpack_sbuffer_write); - msgpack_pack_map(&pk, 1); + msgpack_pack_map(&pk, 2); + + mp_pack_str(&pk, "scope"); + mp_pack_strn(&pk, + room->collection->scope->data, + room->collection->scope->n); + mp_pack_str(&pk, "id"); msgpack_pack_uint64(&pk, room->id); @@ -189,7 +195,12 @@ int ti_room_emit( sz = buffer.size; - msgpack_pack_map(&vp.pk, 3); + msgpack_pack_map(&vp.pk, 4); + + mp_pack_str(&vp.pk, "scope"); + mp_pack_strn(&vp.pk, + room->collection->scope->data, + room->collection->scope->n); mp_pack_str(&vp.pk, "id"); msgpack_pack_uint64(&vp.pk, room->id); @@ -403,6 +414,7 @@ void ti_room_emit_node_status(ti_room_t * room, const char * status) typedef struct { uint64_t room_id; + char scope[256]; ti_stream_t * stream; } room__async_t; @@ -419,7 +431,7 @@ static void room__async_emit_join_cb(uv_async_t * task) if (ti_stream_is_closed(w->stream)) goto done; - if (mp_sbuffer_alloc_init(&buffer, 32, sizeof(ti_pkg_t))) + if (mp_sbuffer_alloc_init(&buffer, 288, sizeof(ti_pkg_t))) { log_critical(EX_MEMORY_S); goto done; @@ -427,13 +439,13 @@ static void room__async_emit_join_cb(uv_async_t * task) msgpack_packer_init(&pk, &buffer, msgpack_sbuffer_write); - if (msgpack_pack_map(&pk, 1) || - mp_pack_str(&pk, "id") || - msgpack_pack_uint64(&pk, w->room_id)) - { - log_critical(EX_MEMORY_S); - goto done; - } + msgpack_pack_map(&pk, 2); + + mp_pack_str(&pk, "scope"); + mp_pack_str(&pk, w->scope); + + mp_pack_str(&pk, "id"); + msgpack_pack_uint64(&pk, w->room_id); pkg = (ti_pkg_t *) buffer.data; pkg_init(pkg, TI_PROTO_EV_ID, TI_PROTO_CLIENT_ROOM_JOIN, buffer.size); @@ -451,12 +463,16 @@ static void room__async_emit_join(ti_room_t * room, ti_stream_t * stream) { uv_async_t * task = malloc(sizeof(uv_async_t)); room__async_t * w = malloc(sizeof(room__async_t)); + ti_raw_t * scope = room ->collection->scope; if (!task || !w) goto failed; w->stream = stream; w->room_id = room->id; + memcpy(w->scope, scope->data, scope->n); + w->scope[scope->n] = '\0'; + task->data = w; if (uv_async_init(ti.loop, task, (uv_async_cb) room__async_emit_join_cb) || @@ -485,7 +501,7 @@ static void room__emit_leave(ti_room_t * room, ti_stream_t * stream) if (ti_stream_is_closed(stream)) return; - if (mp_sbuffer_alloc_init(&buffer, 32, sizeof(ti_pkg_t))) + if (mp_sbuffer_alloc_init(&buffer, 288, sizeof(ti_pkg_t))) { log_critical(EX_MEMORY_S); return; @@ -493,13 +509,15 @@ static void room__emit_leave(ti_room_t * room, ti_stream_t * stream) msgpack_packer_init(&pk, &buffer, msgpack_sbuffer_write); - if (msgpack_pack_map(&pk, 1) || - mp_pack_str(&pk, "id") || - msgpack_pack_uint64(&pk, room->id)) - { - log_critical(EX_MEMORY_S); - return; - } + msgpack_pack_map(&pk, 2); + + mp_pack_str(&pk, "scope"); + mp_pack_strn(&pk, + room->collection->scope->data, + room->collection->scope->n); + + mp_pack_str(&pk, "id"); + msgpack_pack_uint64(&pk, room->id); pkg = (ti_pkg_t *) buffer.data; pkg_init(pkg, TI_PROTO_EV_ID, TI_PROTO_CLIENT_ROOM_LEAVE, buffer.size); From bc68e0b80b183bc3d625d56ec0aeb6758b8b3d4b Mon Sep 17 00:00:00 2001 From: Jeroen van der Heijden Date: Tue, 2 Jun 2026 18:47:33 +0200 Subject: [PATCH 2/6] Upd test --- CHANGELOG.md | 4 ++-- itest/test_room.py | 14 +++++++------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b9b43335..b13636f7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,7 @@ -# v1.8.6-alpha0 +# v1.8.6-alpha1 * Read `type_count(..)` from cache if available, pr #443. -* Add `scope` to event packages for correct mapping, pr +* Add `scope` in format `@collection:...` to room events, pr #444. # v1.8.5 diff --git a/itest/test_room.py b/itest/test_room.py index 47b88c98..fd5f81eb 100755 --- a/itest/test_room.py +++ b/itest/test_room.py @@ -84,7 +84,7 @@ async def async_run(self): client.close() await client.wait_closed() - async def _test_is_room(self, cl0, cl1, cl2): + async def test_is_room(self, cl0, cl1, cl2): with self.assertRaisesRegex( NumArgumentsError, 'function `is_room` takes 1 argument but 0 were given'): @@ -93,7 +93,7 @@ async def _test_is_room(self, cl0, cl1, cl2): self.assertTrue(await cl0.query('is_room( room() ); ')) self.assertFalse(await cl0.query('is_room( "bla" ); ')) - async def _test_room_err(self, cl0, cl1, cl2): + async def test_room_err(self, cl0, cl1, cl2): with self.assertRaisesRegex( ValueError, 'name `room` is reserved'): @@ -114,7 +114,7 @@ async def _test_room_err(self, cl0, cl1, cl2): 'collection `stuff` has no `room` with id 0'): await cl0.query('room(0);') - async def _test_rooms(self, cl0, cl1, cl2): + async def test_rooms(self, cl0, cl1, cl2): room_ids = await cl0.query(r"""//ti .rooms = range(3).map(|| room()); .rooms.map(|room| room.id()); @@ -166,7 +166,7 @@ async def _test_rooms(self, cl0, cl1, cl2): 'on_delete', 'on_delete', 'on_delete', 'on_delete', 'on_delete', 'on_delete']) - async def _test_join_leave(self, cl0, cl1, cl2): + async def test_join_leave(self, cl0, cl1, cl2): await cl0.query(r"""//ti range(3).each(|i| .set(`room{i}`, room())); """) @@ -185,7 +185,7 @@ async def _test_join_leave(self, cl0, cl1, cl2): ids = [id for id in res if id is not None] self.assertEqual(len(ids), 3) - async def _test_object_to_room(self, cl0, cl1, cl2): + async def test_object_to_room(self, cl0, cl1, cl2): await cl0.query(r"""//ti .oroom = room(); .oroom.set_name("oroom"); @@ -208,7 +208,7 @@ async def _test_object_to_room(self, cl0, cl1, cl2): self.assertEqual([r0, r1, r2], actions) - async def _test_room_name(self, cl0, cl1, cl2): + async def test_room_name(self, cl0, cl1, cl2): await cl0.query(r"""//ti .room_a = room(); .room_b = room(); @@ -304,7 +304,7 @@ async def _test_room_name(self, cl0, cl1, cl2): res = await cl0.query('room("A").name();') self.assertEqual(res, "A") - async def _test_room_peer_only(self, cl0, cl1, cl2): + async def test_room_peer_only(self, cl0, cl1, cl2): await cl0.query(r"""//ti .room = room(); .room.set_name("test_peer_room"); From c66308a85cbc95921be38d967a38d22757942635 Mon Sep 17 00:00:00 2001 From: Jeroen van der Heijden Date: Tue, 2 Jun 2026 18:50:45 +0200 Subject: [PATCH 3/6] test --- itest/test_room.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/itest/test_room.py b/itest/test_room.py index fd5f81eb..8b8a696a 100755 --- a/itest/test_room.py +++ b/itest/test_room.py @@ -55,7 +55,7 @@ class TestRoom(TestBase): title = 'Test room type' - @default_test_setup(num_nodes=1, seed=1, threshold_full_storage=100) + @default_test_setup(num_nodes=3, seed=1, threshold_full_storage=100) async def async_run(self): await self.node0.init_and_run() @@ -63,15 +63,14 @@ async def async_run(self): cl0.set_default_scope('//stuff') # add more nodes for watch validation - # await self.node1.join_until_ready(cl0) - # await self.node2.join_until_ready(cl0) + await self.node1.join_until_ready(cl0) + await self.node2.join_until_ready(cl0) - # cl1 = await get_client(self.node1) - # cl1.set_default_scope('//stuff') + cl1 = await get_client(self.node1) + cl1.set_default_scope('//stuff') - # cl2 = await get_client(self.node2) - # cl2.set_default_scope('//stuff') - cl1 = cl2 = cl0 + cl2 = await get_client(self.node2) + cl2.set_default_scope('//stuff') await self.run_tests(cl0, cl1, cl2) From f4749ac41e86b45cfece94b0d9205848828aa897 Mon Sep 17 00:00:00 2001 From: Jeroen van der Heijden Date: Tue, 2 Jun 2026 18:52:30 +0200 Subject: [PATCH 4/6] correct size --- src/ti/room.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ti/room.c b/src/ti/room.c index 3cc28565..892f54cd 100644 --- a/src/ti/room.c +++ b/src/ti/room.c @@ -414,7 +414,7 @@ void ti_room_emit_node_status(ti_room_t * room, const char * status) typedef struct { uint64_t room_id; - char scope[256]; + char scope[268]; ti_stream_t * stream; } room__async_t; From 9902ec4b837910f0407f215e8ff063d9916b59de Mon Sep 17 00:00:00 2001 From: Jeroen van der Heijden Date: Tue, 2 Jun 2026 18:56:59 +0200 Subject: [PATCH 5/6] emit alloc --- src/ti/room.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/ti/room.c b/src/ti/room.c index 892f54cd..acc12696 100644 --- a/src/ti/room.c +++ b/src/ti/room.c @@ -114,7 +114,7 @@ static void room__emit_delete(ti_room_t * room) msgpack_sbuffer buffer; ti_pkg_t * pkg; - if (mp_sbuffer_alloc_init(&buffer, 288, sizeof(ti_pkg_t))) + if (mp_sbuffer_alloc_init(&buffer, 290, sizeof(ti_pkg_t))) { log_critical(EX_MEMORY_S); return; @@ -431,7 +431,7 @@ static void room__async_emit_join_cb(uv_async_t * task) if (ti_stream_is_closed(w->stream)) goto done; - if (mp_sbuffer_alloc_init(&buffer, 288, sizeof(ti_pkg_t))) + if (mp_sbuffer_alloc_init(&buffer, 290, sizeof(ti_pkg_t))) { log_critical(EX_MEMORY_S); goto done; @@ -501,7 +501,7 @@ static void room__emit_leave(ti_room_t * room, ti_stream_t * stream) if (ti_stream_is_closed(stream)) return; - if (mp_sbuffer_alloc_init(&buffer, 288, sizeof(ti_pkg_t))) + if (mp_sbuffer_alloc_init(&buffer, 290, sizeof(ti_pkg_t))) { log_critical(EX_MEMORY_S); return; From fae1aac1b281077fdf5ac3c1d068fd1bcf0f130f Mon Sep 17 00:00:00 2001 From: Jeroen van der Heijden Date: Tue, 2 Jun 2026 19:27:48 +0200 Subject: [PATCH 6/6] size --- src/ti/room.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/ti/room.c b/src/ti/room.c index acc12696..6f8867f1 100644 --- a/src/ti/room.c +++ b/src/ti/room.c @@ -114,7 +114,7 @@ static void room__emit_delete(ti_room_t * room) msgpack_sbuffer buffer; ti_pkg_t * pkg; - if (mp_sbuffer_alloc_init(&buffer, 290, sizeof(ti_pkg_t))) + if (mp_sbuffer_alloc_init(&buffer, 320, sizeof(ti_pkg_t))) { log_critical(EX_MEMORY_S); return; @@ -431,7 +431,7 @@ static void room__async_emit_join_cb(uv_async_t * task) if (ti_stream_is_closed(w->stream)) goto done; - if (mp_sbuffer_alloc_init(&buffer, 290, sizeof(ti_pkg_t))) + if (mp_sbuffer_alloc_init(&buffer, 320, sizeof(ti_pkg_t))) { log_critical(EX_MEMORY_S); goto done; @@ -501,7 +501,7 @@ static void room__emit_leave(ti_room_t * room, ti_stream_t * stream) if (ti_stream_is_closed(stream)) return; - if (mp_sbuffer_alloc_init(&buffer, 290, sizeof(ti_pkg_t))) + if (mp_sbuffer_alloc_init(&buffer, 320, sizeof(ti_pkg_t))) { log_critical(EX_MEMORY_S); return;