Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ a.out
/test_clientid
/test_stamp
/test_map
/test_map_abort
/test_elementid
/test_element

Expand Down
25 changes: 16 additions & 9 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ test-string: string.c test_string.c test_util.h
./test_string

.PHONY: test-counter
test-counter: arena.c string.c hashtable.c clientid.c host_posix.c counter.c test_counter.c test_util.h
$(CC) $(CFLAGS) -o test_counter arena.c string.c hashtable.c clientid.c host_posix.c counter.c test_counter.c
test-counter: arena.c string.c hashtable.c clientid.c elementid.c host_posix.c counter.c test_counter.c test_util.h
$(CC) $(CFLAGS) -o test_counter arena.c string.c hashtable.c clientid.c elementid.c host_posix.c counter.c test_counter.c
./test_counter

.PHONY: test-scalar
Expand All @@ -47,18 +47,25 @@ test-scalar: arena.c string.c host_posix.c scalar.c test_scalar.c test_util.h
./test_scalar

.PHONY: test-register
test-register: arena.c string.c clientid.c host_posix.c stamp.c scalar.c register.c test_register.c test_util.h
$(CC) $(CFLAGS) -o test_register arena.c string.c clientid.c host_posix.c stamp.c scalar.c register.c test_register.c
test-register: arena.c string.c clientid.c elementid.c host_posix.c stamp.c scalar.c register.c test_register.c test_util.h
$(CC) $(CFLAGS) -o test_register arena.c string.c clientid.c elementid.c host_posix.c stamp.c scalar.c register.c test_register.c
./test_register

.PHONY: test-map
test-map: arena.c string.c hashtable.c clientid.c host_posix.c stamp.c scalar.c map.c test_map.c test_util.h
$(CC) $(CFLAGS) -o test_map arena.c string.c hashtable.c clientid.c host_posix.c stamp.c scalar.c map.c test_map.c
test-map: arena.c string.c hashtable.c clientid.c elementid.c host_posix.c stamp.c scalar.c register.c counter.c element.c map.c test_map.c test_util.h
$(CC) $(CFLAGS) -o test_map arena.c string.c hashtable.c clientid.c elementid.c host_posix.c stamp.c scalar.c register.c counter.c element.c map.c test_map.c
./test_map

# Death test: binary must abort. Exit status is inverted so success means
# the process died via host_abort.
.PHONY: test-map-abort
test-map-abort: arena.c string.c hashtable.c clientid.c elementid.c host_posix.c stamp.c scalar.c register.c counter.c element.c map.c test_map_abort.c
$(CC) $(CFLAGS) -o test_map_abort arena.c string.c hashtable.c clientid.c elementid.c host_posix.c stamp.c scalar.c register.c counter.c element.c map.c test_map_abort.c
! ./test_map_abort 2>/dev/null

.PHONY: test-element
test-element: arena.c string.c hashtable.c clientid.c host_posix.c stamp.c scalar.c register.c counter.c map.c element.c test_element.c test_util.h
$(CC) $(CFLAGS) -o test_element arena.c string.c hashtable.c clientid.c host_posix.c stamp.c scalar.c register.c counter.c map.c element.c test_element.c
test-element: arena.c string.c hashtable.c clientid.c elementid.c host_posix.c stamp.c scalar.c register.c counter.c map.c element.c test_element.c test_util.h
$(CC) $(CFLAGS) -o test_element arena.c string.c hashtable.c clientid.c elementid.c host_posix.c stamp.c scalar.c register.c counter.c map.c element.c test_element.c
./test_element

.PHONY: test-clientid
Expand All @@ -77,4 +84,4 @@ test-elementid: string.c clientid.c host_posix.c elementid.c test_elementid.c te
./test_elementid

.PHONY: test
test: test-arena test-hashtable test-string test-counter test-scalar test-register test-clientid test-stamp test-map test-elementid test-element
test: test-arena test-hashtable test-string test-counter test-scalar test-register test-clientid test-stamp test-map test-map-abort test-elementid test-element
11 changes: 10 additions & 1 deletion counter.c
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@
#include "hashtable.h"
#include "host.h"

struct Counter {
ElementId id;
Arena *arena;
HashTable *entries; // client_id (uint32_t) -> CounterEntry
};

static inline uint32_t max_u32(uint32_t a, uint32_t b) {
if (a > b) {
return a;
Expand All @@ -11,13 +17,14 @@ static inline uint32_t max_u32(uint32_t a, uint32_t b) {
return b;
}

Counter *counter_create(Arena *arena) {
Counter *counter_create(Arena *arena, ElementId id) {
Counter *counter = arena_alloc(arena, sizeof(Counter));
if (!counter) {
host_abortf(
"counter_create: arena OOM (requested %zu bytes for Counter)",
sizeof(Counter));
}
counter->id = id;
counter->arena = arena;
counter->entries = hashtable_create(arena);
if (!counter->entries) {
Expand All @@ -26,6 +33,8 @@ Counter *counter_create(Arena *arena) {
return counter;
}

ElementId counter_id(const Counter *counter) { return counter->id; }

int64_t counter_read(const Counter *counter) {
int64_t total = 0;
HashTableIter it = hashtable_iter(counter->entries);
Expand Down
32 changes: 27 additions & 5 deletions counter.h
Original file line number Diff line number Diff line change
@@ -1,8 +1,31 @@
#ifndef _CRDT_COUNTER_H
#define _CRDT_COUNTER_H

// PN-Counter: integer counter with concurrent increments and decrements.
// Carries an ElementId set at create, exposed via counter_id — that's how
// parent containers identify "same logical Counter across replicas".
//
// Semantics:
// - Per-client (inc, dec) tallies, one CounterEntry per ClientId that
// ever wrote to this Counter. counter_inc / counter_dec add into the
// calling client's own tallies.
// - counter_read returns sum over all clients of (inc - dec).
// - counter_merge unions src into dst per-direction: dst's entry for
// each ClientId becomes (max(dst.inc, src.inc), max(dst.dec, src.dec)).
// Merge is NOT addition — replicas may have observed the same writes
// concurrently, so max is what makes the merge idempotent / commutative
// / associative.
// - Increments and decrements use uint32_t to keep per-direction max
// well-defined; counter_read widens to int64_t for the signed total.
//
// Ownership:
// - Per-client entries live in the Counter's arena.
//
// Lifetime: Counter must not outlive its arena.

#include "arena.h"
#include "clientid.h"
#include "elementid.h"
#include "hashtable.h"
#include <stdint.h>

Expand All @@ -12,12 +35,11 @@ typedef struct CounterEntry {
uint32_t dec;
} CounterEntry;

typedef struct Counter {
Arena *arena;
HashTable *entries; // client_id (uint32_t) -> CounterEntry
} Counter;
typedef struct Counter Counter;

Counter *counter_create(Arena *arena, ElementId id);

Counter *counter_create(Arena *arena);
ElementId counter_id(const Counter *counter);

int64_t counter_read(const Counter *counter);

Expand Down
1 change: 1 addition & 0 deletions element.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "element.h"
#include "host.h"
#include "map.h"

Element element_scalar(Scalar s) {
Element e = {.kind = ELEMENT_SCALAR, .as.scalar = s};
Expand Down
26 changes: 25 additions & 1 deletion element.h
Original file line number Diff line number Diff line change
@@ -1,10 +1,34 @@
#ifndef _CRDT_ELEMENT_H
#define _CRDT_ELEMENT_H

// Element: tagged union over the four value kinds a Map slot can hold —
// SCALAR (inline value), or one of REGISTER / COUNTER / MAP (pointer to
// a separately-allocated composite).
//
// Constructors (element_scalar / _register / _counter / _map) tag the
// kind and stash the payload. element_kind reads the tag back.
//
// element_merge dispatches on dst's kind:
// - REGISTER → register_merge(dst, src)
// - COUNTER → counter_merge(dst, src)
// - MAP → map_merge(dst, src)
// - SCALAR → host_abort. Scalars do not merge as elements; their LWW
// lives at the slot level (in Map). Reaching this branch
// is a programmer error.
//
// Ownership: composites are referenced by pointer; element_merge mutates
// dst's composite in place and never touches src's. Callers are
// responsible for keeping pointed-to composites alive (typically by
// putting them in the same arena as the containing Map).

#include "counter.h"
#include "map.h"
#include "register.h"
#include "scalar.h"

typedef struct Map Map;
typedef struct Register Register;
typedef struct Counter Counter;

typedef enum ElementKind {
ELEMENT_SCALAR,
ELEMENT_REGISTER,
Expand Down
19 changes: 18 additions & 1 deletion elementid.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,22 @@
#include "clientid.h"
#ifndef _CRDT_ELEMENTID_H
#define _CRDT_ELEMENTID_H

// ElementId: identity of a composite element (Register / Counter / Map),
// shared across replicas. Two replicas creating "the same logical element"
// must give it the same ElementId; that's the hook map_merge uses to know
// "these two slots are the same object, recurse" vs "these are different
// objects, LWW the slot".
//
// Shape: { ClientId origin, uint64 seq }. Pass by value (~24 bytes), like
// Stamp / ClientId / Scalar. Fields are public.
//
// elementid_new builds one from (origin, seq). elementid_root is a fixed
// sentinel for the top-level Map of a document; it does not collide with
// any id derived from a real ClientId. elementid_eq is the equality used
// by map_merge's recursive path; elementid_cmp gives a total order
// (origin first via clientid_cmp, then seq).

#include "clientid.h"

typedef struct ElementId {
ClientId origin;
Expand Down
98 changes: 84 additions & 14 deletions map.c
Original file line number Diff line number Diff line change
@@ -1,25 +1,28 @@
#include "map.h"
#include "element.h"
#include "hashtable.h"
#include "host.h"
#include "string.h"

typedef struct MapEntry {
Stamp stamp;
Scalar value;
Element value;
bool is_tombstone;
} Entry;

struct Map {
ElementId id;
Arena *arena;
HashTable *entries;
};

Map *map_create(Arena *arena) {
Map *map_create(Arena *arena, ElementId id) {
Map *map = arena_alloc(arena, sizeof(Map));
if (!map) {
host_abortf("map_create: arena OOM (requested %zu bytes for Map)",
sizeof(Map));
}
map->id = id;
map->arena = arena;
map->entries = hashtable_create(arena);
if (!map->entries) {
Expand All @@ -28,7 +31,9 @@ Map *map_create(Arena *arena) {
return map;
}

bool map_get(const Map *map, const void *key, size_t key_len, Scalar *out) {
ElementId map_id(const Map *map) { return map->id; }

bool map_get(const Map *map, const void *key, size_t key_len, Element *out) {
void *entry;
bool present = hashtable_get(map->entries, key, key_len, &entry);
if (!present) {
Expand All @@ -44,7 +49,7 @@ bool map_get(const Map *map, const void *key, size_t key_len, Scalar *out) {
return true;
}

void map_set(Map *map, const void *key, size_t key_len, Scalar value,
void map_set(Map *map, const void *key, size_t key_len, Element value,
Stamp stamp) {
Entry *entry;
bool present = hashtable_get(map->entries, key, key_len, (void **)&entry);
Expand All @@ -65,11 +70,23 @@ void map_set(Map *map, const void *key, size_t key_len, Scalar value,
}
}

Scalar copy = scalar_dup(map->arena, value);
switch (value.kind) {
case ELEMENT_SCALAR: {
Scalar copy = scalar_dup(map->arena, value.as.scalar);
value.as.scalar = copy;
break;
}
case ELEMENT_REGISTER:
case ELEMENT_COUNTER:
case ELEMENT_MAP:
// Composite values are pointers to separately-allocated heap
// objects; no dup needed.
break;
}

entry->value = value;
entry->stamp = stamp;
entry->is_tombstone = false;
entry->value = copy;
}
}

Expand Down Expand Up @@ -102,16 +119,69 @@ void map_delete(Map *map, const void *key, size_t key_len, Stamp stamp) {

void map_merge(Map *dst, const Map *src) {
HashTableIter it = hashtable_iter(src->entries);
const void *k = NULL;
size_t klen = 0;
void *v = NULL;
const void *k;
size_t klen;
void *v;
while (hashtable_iter_next(&it, &k, &klen, &v)) {
Entry *src_entry = v;
if (src_entry->is_tombstone) {
map_delete(dst, k, klen, src_entry->stamp);
} else {
map_set(dst, k, klen, src_entry->value, src_entry->stamp);
Entry *se = v;

Entry *de;
bool dst_has = hashtable_get(dst->entries, k, klen, (void **)&de);

// Recursive: both alive, same composite kind and same id then
// element_merge. This wins over slot LWW.
if (dst_has && !de->is_tombstone && !se->is_tombstone &&
de->value.kind == se->value.kind &&
de->value.kind != ELEMENT_SCALAR) {
ElementId did, sid;
switch (de->value.kind) {
case ELEMENT_SCALAR:
did = sid = elementid_root(); // unused, won't compare equal,
// assign to silence warning
break;
case ELEMENT_REGISTER:
did = register_id(de->value.as.reg);
sid = register_id(se->value.as.reg);
break;
case ELEMENT_COUNTER:
did = counter_id(de->value.as.counter);
sid = counter_id(se->value.as.counter);
break;
case ELEMENT_MAP:
did = map_id(de->value.as.map);
sid = map_id(se->value.as.map);
break;
}
if (elementid_eq(did, sid)) {
element_merge(de->value, se->value);
// Advance slot stamp to max(dst, src) so future slot-level
// ops on this key are LWW-deterministic across replicas.
if (stamp_gt(se->stamp, de->stamp)) {
de->stamp = se->stamp;
}
continue;
Comment thread
cubic-dev-ai[bot] marked this conversation as resolved.
}
}

// LWW fallthrough.
if (se->is_tombstone) {
map_delete(dst, k, klen, se->stamp);
continue;
}

// Reaching the LWW path with a composite means id derivation is
// broken or types diverged at this key — programmer error
// regardless of which side's stamp wins. Abort unconditionally so
// the failure is not stamp-dependent (which would let the same
// broken state crash one replica and silently pass on another).
if (se->value.kind != ELEMENT_SCALAR) {
host_abortf("map_merge: composite at LWW path — "
"src kind %s, dst id != src id (or dst empty / "
"tombstoned / different kind). "
"Use deterministic id derivation for composite slots.",
element_kind_name(se->value.kind));
}
map_set(dst, k, klen, se->value, se->stamp);
}
}

Expand Down
Loading