Commit 54ce6a6

Add checkpointStore

1 parent 4b7312a commit 54ce6a6

File tree

9 files changed: +110 -45 lines changed

docs/development.md

Lines changed: 0 additions & 1 deletion

````diff
@@ -139,7 +139,6 @@ make manifests
 
 This command generates:
 - CRD manifests in `config/crd/bases/`
-- RBAC manifests in `config/rbac/`
 
 ### Generate DeepCopy Methods
 
````

docs/en/architecture.md

Lines changed: 26 additions & 13 deletions

````diff
@@ -25,6 +25,7 @@ High-level flow:
 - **Errors**: optional sink for messages that fail to be written to the main sink.
 - **Resources**: optional CPU/memory for the processor pod.
 - **Scheduling**: optional `nodeSelector`, `affinity`, `tolerations`.
+- **CheckpointPersistence**: optional; defaults to `true`. When enabled, polling sources (PostgreSQL, ClickHouse, Trino) persist read position to a ConfigMap, reducing duplicates on restart. Set to `false` to disable.
 
 Secrets can be referenced via `SecretRef` in the spec; the operator resolves them before writing the spec into the ConfigMap.
 
@@ -46,7 +47,9 @@ For each DataFlow `<name>` in a namespace:
 | Resource | Name | Purpose |
 |----------|------|---------|
 | ConfigMap | `dataflow-<name>-spec` | Holds `spec.json` (resolved spec with secrets inlined). |
+| ConfigMap | `dataflow-<name>-checkpoint` | Stores read position for polling sources (default). Omitted when `checkpointPersistence: false`. |
 | Deployment | `dataflow-<name>` | One replica; pod runs the **processor** container. |
+| ServiceAccount, Role, RoleBinding | `dataflow-<name>-processor` | RBAC for processor to read/write checkpoint ConfigMap (default). Omitted when `checkpointPersistence: false`. |
 
 The processor container:
 
@@ -65,8 +68,9 @@ The operator uses a **ClusterRole** (and **ClusterRoleBinding** to its ServiceAc
 - Create/patch **events**.
 - Read **secrets** (for resolution).
 - Create/update/delete **ConfigMaps** and **Deployments** in the same namespaces as DataFlow resources.
+- When checkpoint persistence is enabled: create **ServiceAccounts**, **Roles**, and **RoleBindings** for processor pods to access the checkpoint ConfigMap.
 
-See the Helm templates (e.g. `clusterrole.yaml`, `clusterrolebinding.yaml`) and the manifest under `config/rbac/` for the exact rules.
+See the Helm templates (e.g. `clusterrole.yaml`, `clusterrolebinding.yaml`) for the exact rules.
 
 ### Optional: GUI
 
@@ -92,18 +96,21 @@ flowchart LR
     API["API Server"]
     CRD["DataFlow CRD"]
     Operator["Operator Pod"]
-    CM["ConfigMap"]
+    CMSpec["ConfigMap spec"]
+    CMCheckpoint["ConfigMap checkpoint"]
     Dep["Deployment"]
     Proc["Processor Pod"]
    Ext["Kafka / PostgreSQL / Trino / Nessie"]
 
     User -->|"apply DataFlow"| API
     API --> CRD
     Operator -->|watch| CRD
-    Operator -->|create/update| CM
+    Operator -->|create/update| CMSpec
+    Operator -->|create/update| CMCheckpoint
     Operator -->|create/update| Dep
     Dep --> Proc
-    Proc -->|mount spec| CM
+    Proc -->|mount spec| CMSpec
+    Proc -->|read/write checkpoint| CMCheckpoint
     Proc -->|connect| Ext
 ```
 
@@ -114,33 +121,39 @@ flowchart LR
 For each DataFlow, the controller runs the following steps (on create, update, or when owned resources change):
 
 1. **Get DataFlow**
-   If not found, return. If **DeletionTimestamp** is set: delete the Deployment and ConfigMap (cleanup), update status to `Stopped`, then return.
+   If not found, return. If **DeletionTimestamp** is set: delete the Deployment, ConfigMaps (spec and checkpoint), and processor RBAC (cleanup), update status to `Stopped`, then return.
 
 2. **Resolve secrets**
    Use **SecretResolver** to substitute all `SecretRef` fields in the spec with values from Kubernetes Secrets. Result: **resolved spec**.
 
 3. **ConfigMap**
    Create or update the ConfigMap `dataflow-<name>-spec` with key `spec.json` = JSON of the resolved spec. Set controller reference to the DataFlow.
 
-4. **Deployment**
-   Create or update the Deployment `dataflow-<name>`: processor image, volume from that ConfigMap, args and env as above. Use resources/affinity from DataFlow spec if set. Set controller reference to the DataFlow.
+4. **Checkpoint ConfigMap and RBAC** (when `checkpointPersistence` is not `false`, default: enabled)
+   Create ConfigMap `dataflow-<name>-checkpoint` and RBAC (ServiceAccount, Role, RoleBinding) so the processor pod can read/write the checkpoint. The processor persists source read position (lastReadID, lastReadChangeTime) there, reducing duplicates on restart.
 
-5. **Deployment status**
+5. **Deployment**
+   Create or update the Deployment `dataflow-<name>`: processor image, volume from the spec ConfigMap, args and env as above. When checkpoint persistence is enabled, set `serviceAccountName` so the pod uses the dedicated ServiceAccount. Use resources/affinity from DataFlow spec if set. Set controller reference to the DataFlow.
+
+6. **Deployment status**
   Read the Deployment; set DataFlow status **Phase** and **Message** from it (e.g. `Running` when `ReadyReplicas > 0`, `Pending` when replicas are starting, `Error` when no replicas).
 
-6. **Update DataFlow status**
+7. **Update DataFlow status**
   Write Phase, Message, and other status fields back to the DataFlow resource (with retry on conflict).
 
 ### Reconcile Loop Diagram
 
 ```mermaid
 flowchart TD
     A[Get DataFlow] --> B{Deleted?}
-    B -->|Yes| C[Cleanup Deployment and ConfigMap]
+    B -->|Yes| C[Cleanup Deployment, ConfigMaps, RBAC]
     C --> D[Update Status Stopped]
     B -->|No| E[Resolve Secrets]
     E --> F[Create or Update ConfigMap]
-    F --> G[Create or Update Deployment]
+    F --> F2{CheckpointPersistence?}
+    F2 -->|Yes| F3[Create Checkpoint ConfigMap and RBAC]
+    F2 -->|No| G
+    F3 --> G[Create or Update Deployment]
     G --> H[Read Deployment Status]
     H --> I[Update DataFlow Status]
 ```
@@ -164,7 +177,7 @@ It reads the spec from the file, builds a **Processor** from it, and runs `Proce
 
 The **Processor** (in `internal/processor/processor.go`) is built from the spec and contains:
 
-- **Source**: a **SourceConnector** (Kafka, PostgreSQL, Trino, or Nessie) — `Connect`, `Read`, `Close`.
+- **Source**: a **SourceConnector** (Kafka, PostgreSQL, Trino, or Nessie) — `Connect`, `Read`, `Close`. By default, polling sources load initial checkpoint from ConfigMap and save it after each successful sink write (debounced). Disable with `checkpointPersistence: false`.
 - **Sink**: a **SinkConnector** for the main destination — `Connect`, `Write`, `Close`.
 - **Error sink** (optional): another SinkConnector for failed writes.
 - **Transformations**: an ordered list of **Transformer** implementations (timestamp, flatten, filter, mask, router, select, remove, snakeCase, camelCase).
@@ -220,6 +233,6 @@ flowchart LR
 
 ## Summary
 
-- **Kubernetes**: You declare a **DataFlow** CR; the **operator** reconciles it into a **ConfigMap** (spec) and a **Deployment** (processor pod). RBAC and optional GUI complete the picture.
+- **Kubernetes**: You declare a **DataFlow** CR; the **operator** reconciles it into a **ConfigMap** (spec) and a **Deployment** (processor pod). By default, a second ConfigMap and RBAC are created for checkpoint storage (set `checkpointPersistence: false` to disable). RBAC and optional GUI complete the picture.
 - **Reconciliation**: Get DataFlow → resolve secrets → update ConfigMap → update Deployment → reflect Deployment status in DataFlow status.
 - **Runtime**: Each **processor** pod runs a single pipeline: source → read channel → transformations → write to main (and optionally error and router) sinks, using pluggable connectors and a fixed set of transformations.
````
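The commit is titled "Add checkpointStore", and the architecture changes above describe loading a checkpoint at startup and saving it after successful sink writes. Below is a minimal Go sketch of what such a store abstraction could look like; the `Checkpoint` fields follow the field names mentioned in the docs (lastReadID, lastReadChangeTime), while `CheckpointStore` and `memoryStore` are hypothetical names, not the operator's actual types. A real implementation would back the store with the `dataflow-<name>-checkpoint` ConfigMap via the Kubernetes API.

```go
package main

import (
	"fmt"
	"sync"
)

// Checkpoint captures the read position of a polling source.
// Field names follow the docs; the operator's real struct may differ.
type Checkpoint struct {
	LastReadID         int64  `json:"lastReadID"`
	LastReadChangeTime string `json:"lastReadChangeTime"`
}

// CheckpointStore is a hypothetical interface for persisting the
// checkpoint. Load reports whether a checkpoint exists yet.
type CheckpointStore interface {
	Load() (Checkpoint, bool, error)
	Save(Checkpoint) error
}

// memoryStore models checkpointPersistence: false — state lives only
// in process memory and is lost on restart.
type memoryStore struct {
	mu sync.Mutex
	cp *Checkpoint
}

func (m *memoryStore) Load() (Checkpoint, bool, error) {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.cp == nil {
		return Checkpoint{}, false, nil
	}
	return *m.cp, true, nil
}

func (m *memoryStore) Save(cp Checkpoint) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.cp = &cp
	return nil
}

func main() {
	var store CheckpointStore = &memoryStore{}
	if _, ok, _ := store.Load(); !ok {
		fmt.Println("no checkpoint: read from beginning")
	}
	store.Save(Checkpoint{LastReadID: 42, LastReadChangeTime: "2024-01-01T00:00:00Z"})
	cp, _, _ := store.Load()
	fmt.Println("resume after", cp.LastReadID)
}
```

A ConfigMap-backed implementation of the same interface is what the persistence path would use; the in-memory variant keeps state only as long as the process runs.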

docs/en/connectors.md

Lines changed: 1 addition & 1 deletion

````diff
@@ -414,7 +414,7 @@ source:
 - **Change Tracking**: By default tracks changes via `updated_at` column (or `changeTrackingColumn`), captures both INSERTs and UPDATEs
 - **Auto-create Table**: When `autoCreateTable: true`, creates the table with CDC-friendly schema (`id SERIAL PRIMARY KEY`, `created_at`, `updated_at`) if it doesn't exist. Creation happens at Connect time.
 - **Schema notation**: Table name supports `schema.table` format (e.g. `public.products`)
-- **In-memory state**: Read position (lastReadChangeTime) is stored only in memory. On pod/connector restart, the table is fully re-read. For pg→pg flows, enable `upsertMode: true` in sink to update duplicates instead of inserting them again.
+- **Checkpoint persistence**: By default, read position (lastReadChangeTime) is persisted to ConfigMap; on restart, reading resumes from the last position. Set `checkpointPersistence: false` in spec to store only in memory. For pg→pg flows, enable `upsertMode: true` in sink to update duplicates instead of inserting them again.
 
 ### Sink
 
````
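The connectors change above says the PostgreSQL source resumes from the persisted `lastReadChangeTime` after restart. A hedged sketch of how a polling query might incorporate a restored checkpoint; the function name, table, and column are illustrative (not the connector's actual code), and real code should use parameterized queries rather than string interpolation.

```go
package main

import "fmt"

// buildPollQuery sketches checkpoint-aware polling: when a checkpoint
// exists, filter on the change-tracking column so only rows modified
// after the last read position are returned. Illustrative only.
func buildPollQuery(table, changeCol, lastReadChangeTime string, batchSize int) string {
	q := fmt.Sprintf("SELECT * FROM %s", table)
	if lastReadChangeTime != "" {
		// Checkpoint restored: resume instead of re-reading everything.
		// NOTE: a real connector would bind this value as a parameter.
		q += fmt.Sprintf(" WHERE %s > '%s'", changeCol, lastReadChangeTime)
	}
	return q + fmt.Sprintf(" ORDER BY %s LIMIT %d", changeCol, batchSize)
}

func main() {
	// First run: no checkpoint, full read.
	fmt.Println(buildPollQuery("public.products", "updated_at", "", 500))
	// After restart with a persisted checkpoint: resume from last position.
	fmt.Println(buildPollQuery("public.products", "updated_at", "2024-06-01T12:00:00Z", 500))
}
```

With `checkpointPersistence: false` the second case never occurs after a restart, which is why the docs recommend an idempotent sink for polling sources.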

docs/en/development.md

Lines changed: 0 additions & 1 deletion

````diff
@@ -105,7 +105,6 @@ make manifests
 
 This command generates:
 - CRD manifests in `config/crd/bases/`
-- RBAC manifests in `config/rbac/`
 
 ### Generate DeepCopy Methods
 
````

docs/en/fault-tolerance.md

Lines changed: 28 additions & 7 deletions

````diff
@@ -12,9 +12,9 @@ DataFlow Operator processes messages with **at-least-once** delivery semantics.
 | Source | State storage | On restart |
 |--------|---------------|------------|
 | **Kafka** | Consumer group (Kafka) | Resumes from last committed offset. No duplicates if offset was committed after sink write. |
-| **PostgreSQL** | In-memory (lastReadChangeTime) | State lost. Re-reads from beginning. Duplicates or gaps possible. |
-| **ClickHouse** | In-memory (lastReadID, lastReadTime) | State lost. Re-reads from beginning. Duplicates possible. |
-| **Trino** | In-memory (lastReadID) | State lost. Re-reads from beginning. Duplicates possible. |
+| **PostgreSQL** | ConfigMap (default); in-memory when `checkpointPersistence: false` | By default resumes from last position. Without persistence: re-reads from beginning. |
+| **ClickHouse** | ConfigMap (default); in-memory when `checkpointPersistence: false` | By default resumes from last position. Without persistence: re-reads from beginning. |
+| **Trino** | ConfigMap (default); in-memory when `checkpointPersistence: false` | By default resumes from last position. Without persistence: re-reads from beginning. |
 
 ### Kafka Source
 
@@ -26,12 +26,14 @@ The Kafka consumer commits offset **only after** the message is successfully wri
 
 ### Polling Sources (PostgreSQL, ClickHouse, Trino)
 
-Read position (lastReadID, lastReadChangeTime) is stored **only in memory**. On pod crash:
+When checkpoint persistence is disabled, read position (lastReadID, lastReadChangeTime) is stored **only in memory**. On pod crash:
 
 - State is lost.
 - On restart, the source re-reads from the beginning (or from a wrong position).
 - **Duplicates** or **gaps** are possible depending on when the crash occurred.
 
+**Checkpoint persistence** is enabled by default. The read position is persisted to a ConfigMap. On restart, the source resumes from the last committed position, reducing duplicates. Set `checkpointPersistence: false` in spec to disable.
+
 !!! warning "Idempotent sink required"
     For polling sources, always configure an **idempotent sink** (UPSERT, ReplacingMergeTree) to handle duplicates safely.
 
@@ -107,9 +109,28 @@ On SIGTERM (e.g., pod eviction, node drain):
 
 Ensure `terminationGracePeriodSeconds` is sufficient for large batches to flush (default: 600 seconds).
 
-## Checkpoint Persistence (Future)
+## Checkpoint Persistence
+
+!!! note "Enabled by default"
+    The `checkpointPersistence` field in the DataFlow spec defaults to `true`. You do not need to set it explicitly — checkpoint persistence is enabled for all DataFlows with polling sources.
+
+Checkpoint persistence is **enabled by default**. The read position (lastReadID, lastReadChangeTime) is persisted to ConfigMap `dataflow-<name>-checkpoint`. On processor restart, polling sources (PostgreSQL, ClickHouse, Trino) resume from the last committed position, reducing duplicates.
+
+To disable, set `checkpointPersistence: false`:
+
+```yaml
+apiVersion: dataflow.dataflow.io/v1
+kind: DataFlow
+metadata:
+  name: my-dataflow
+spec:
+  checkpointPersistence: false # Disable (default: true)
+  source:
+    type: postgresql
+    # ...
+```
 
-Persisting source checkpoint (lastReadID, lastReadChangeTime) to external storage (ConfigMap or sink table) would allow polling sources to resume from the last committed position after a processor restart, reducing duplicates. This is planned for a future release. Until then, use idempotent sinks to handle duplicates safely.
+The controller creates the ConfigMap and RBAC (ServiceAccount, Role, RoleBinding) for the processor. Checkpoint is saved with debounce (every 30 seconds) and on graceful shutdown.
 
 ## Summary Checklist
 
@@ -118,5 +139,5 @@ Persisting source checkpoint (lastReadID, lastReadChangeTime) to external storag
 | PostgreSQL sink | Enable `upsertMode: true` with PRIMARY KEY or `conflictKey` |
 | ClickHouse sink | Use `ReplacingMergeTree` with `ORDER BY` on deduplication key |
 | Kafka source | Consumer group persists offset; idempotent sink recommended |
-| Polling sources | **Always** use idempotent sink; state is lost on crash |
+| Polling sources | **Always** use idempotent sink; checkpoint persistence enabled by default |
 | batchSize | Consider smaller values to reduce duplicate window |
````
