Conversation
…house Signed-off-by: Dom Del Nano <ddelnano@gmail.com>
Signed-off-by: Dom Del Nano <ddelnano@gmail.com>
…ntext Signed-off-by: Dom Del Nano <ddelnano@gmail.com>
Signed-off-by: Dom Del Nano <ddelnano@gmail.com>
Signed-off-by: Dom Del Nano <ddelnano@gmail.com>
Signed-off-by: Dom Del Nano <ddelnano@gmail.com>
…e context prometheus backends and test out the write clickhouse experiment Signed-off-by: Dom Del Nano <ddelnano@gmail.com>
Signed-off-by: Dom Del Nano <ddelnano@gmail.com>
Signed-off-by: entlein <einentlein@gmail.com>
…config # Conflicts: # k8s/vizier/bootstrap/adaptive_export_deployment.yaml # src/vizier/services/adaptive_export/cmd/main.go # src/vizier/services/adaptive_export/internal/config/config.go # src/vizier/services/adaptive_export/internal/pixie/pixie.go
Signed-off-by: entlein <einentlein@gmail.com>
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Organization UI Review profile: ASSERTIVE Plan: Pro Plus Run ID: 📒 Files selected for processing (2)
📝 WalkthroughWalkthroughAdds an adaptive-export operator (trigger/controller/sink/clickhouse) with ClickHouse schema/DDL/insert helpers and tests; introduces perf-tool exporter abstraction (BigQuery and Parquet→GCS), CLI wiring, new PxL scripts and suites, a DuckDB/Plotly web UI, multiple Bazel/build and Go dependency updates, and various supporting utilities and UDFs. ChangesAdaptive Export + Perf Tool (single cohesive cohort)
Sequence Diagram(s) sequenceDiagram
participant Trigger
participant Controller
participant Sink
participant ClickHouse
Trigger->>Controller: emits Event (from ClickHouse JSONEachRow)
Controller->>Sink: Write(updated attribution rows)
Sink->>ClickHouse: HTTP INSERT (JSONEachRow)
Controller->>Sink: QueryActive (on Rehydrate)
Sink->>ClickHouse: HTTP SELECT ... FINAL
Estimated code review effort: ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
|
There was a problem hiding this comment.
Actionable comments posted: 31
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@go.mod`:
- Line 324: The go.mod file contains a replace directive forcing
google.golang.org/protobuf to v1.29.1 (the vulnerable downgrade) while the
require already specifies a safe v1.34.2; remove the replace directive
referencing google.golang.org/protobuf v1.29.1 so the module can use the patched
version and then run module cleanup (e.g., go mod tidy) to update the dependency
graph and ensure go.mod/go.sum reflect the removal.
In `@src/carnot/exec/clickhouse_export_sink_node.cc`:
- Around line 38-39: The sink currently dereferences mapped indices without
guarding for missing columns (see mapping.input_column_index() being passed to
rb.ColumnAt), so update the ClickhouseExportSinkNode code that iterates mappings
to check that the mapped input column actually exists before calling
rb.ColumnAt(mapping.input_column_index()); e.g., verify
mapping.input_column_index() is within rb.NumColumns() (or use rb.HasColumn /
equivalent) and skip/continue for mappings that are out-of-range or otherwise
invalid so the node ignores missing columns instead of crashing.
In `@src/e2e_test/perf_tool/pkg/cluster/context.go`:
- Around line 75-77: The code currently overwrites an empty kubeconfigPath with
clientcmd.RecommendedHomeFile, which can differ from what
NewDefaultClientConfigLoadingRules() actually used (including KUBECONFIG);
instead, when kubeconfigPath is empty, query the loading rules for the real
filename (e.g., call loadingRules.GetDefaultFilename() on the
ClientConfigLoadingRules returned by NewDefaultClientConfigLoadingRules()) and
assign that to kubeconfigPath so the value passed to AddEnv matches the actual
kubeconfig source used by the client config.
In `@src/e2e_test/perf_tool/pkg/exporter/bq_exporter.go`:
- Around line 86-107: ExportResults currently starts BatchInserter.Run(bqCh) in
a goroutine and defers close(bqCh) but returns without waiting, losing buffered
rows and swallowing errors; modify ExportResults to (1) create a done/error
channel or sync.WaitGroup and start inserter.Run(bqCh, ctx) (or wrap Run to
accept ctx) so the inserter can observe context cancellation, (2) close bqCh
only after draining resultCh, then wait for the inserter goroutine to finish
before returning, and (3) surface any insertion errors from
uploadBatch/BatchInserter back to ExportResults (via an error channel or
returned error) instead of only logging them; reference
BQExporter.ExportResults, bq.BatchInserter.Run and uploadBatch to locate where
to add waiting, context handling, and error propagation.
In `@src/e2e_test/perf_tool/pkg/exporter/parquet_exporter_test.go`:
- Around line 304-314: The test opens files in a loop and uses defer f.Close(),
which delays closing until the test function exits; change this so each file is
closed immediately after use: in the loop that iterates over uploadedFiles,
replace defer f.Close() with an explicit f.Close() call (checking/ignoring the
error) once you're done with parquet.OpenFile and the pf.NumRows() assertion, or
alternatively wrap the loop body in an inner anonymous func so the defer
f.Close() is scoped and runs at the end of each iteration; update the code paths
around os.Open, parquet.OpenFile, and pf.NumRows() accordingly.
In `@src/e2e_test/perf_tool/pkg/exporter/parquet_exporter.go`:
- Around line 141-144: The Close method on ParquetGCSExporter can panic if
gcsClient is nil (e.g., when an upload override is used in tests); update
ParquetGCSExporter's Close to check if e.gcsClient != nil before calling
e.gcsClient.Close() and return nil when it's nil, ensuring safe shutdown. Locate
the Close method on type ParquetGCSExporter and add the nil guard so Close only
calls gcsClient.Close() when the client exists.
- Around line 167-173: The loop writes each row with
writer.WriteRows([]parquet.Row{parquetRow}) causing per-call overhead; instead,
build a slice of parquet.Row using buildResultRow for each entry in rows (or
accumulate in fixed-size batches if memory is a concern) and call
writer.WriteRows once (or once per batch) to write them; preserve the existing
error handling (close tmpFile on error and return fmt.Errorf(...)) and update
the error message to reference the batch write failure where writer.WriteRows is
called.
In `@src/e2e_test/perf_tool/pkg/metrics/prometheus_recorder.go`:
- Around line 46-49: The struct field block in prometheus_recorder.go (fields
clusterCtx, ownsClusterCtx, spec, eg on the Prometheus recorder type) is not
gofmt-formatted; run the repository's Go formatter (e.g., gofmt -s -w or
goimports -w) on src/e2e_test/perf_tool/pkg/metrics/prometheus_recorder.go to
fix spacing/indentation so the struct declaration for the Prometheus recorder
(clusterCtx, ownsClusterCtx, spec, eg) conforms to the project's lint/format
rules and CI passes.
In `@src/e2e_test/perf_tool/pkg/suites/suites.go`:
- Around line 101-102: The clickhouseDSN variable currently contains hardcoded
credentials; remove the plaintext DSN and instead read it from a secure source
(e.g., an environment variable or a CLI flag). Replace references to
clickhouseDSN with a value loaded from os.Getenv("CLICKHOUSE_DSN") or a new flag
(e.g., add a flag similar to --prom_recorder_override) and validate it is
non-empty before use; keep clickhouseTable as-is or make it configurable via an
env var/flag if needed, and document placeholder usage so no secrets are
committed.
- Around line 89-91: Remove the leftover commented-out debug duration lines so
only the intended duration assignment remains: delete the "// preDur := 2 *
time.Minute" and "// dur := 5 * time.Minute" comment lines and keep the active
"dur := 30 * time.Minute" assignment in the suites setup (look for the dur
variable in suites.go).
In `@src/e2e_test/perf_tool/pkg/suites/workloads.go`:
- Around line 229-231: The workload currently sets a hardcoded placeholder
SkaffoldPath ("src/e2e_test/clickhouse_read_load/skaffold.yaml") which will
break deployment; update the code in workloads.go where SkaffoldPath is set (the
workload struct initialiser) to either: 1) gate inclusion by checking that the
real skaffold file exists (os.Stat or a helper like fileExists) before adding
this workload to active experiments, or 2) guard with a feature flag/config
(e.g., enableClickHouseReadLoad) and only set SkaffoldPath and wire the workload
when that flag is true; ensure the placeholder path is removed from default
active wiring until the real app and path land.
In `@src/e2e_test/perf_tool/ui/index.html`:
- Around line 428-429: The suite and workload filter values (variables
suiteFilter and workloadFilter) are interpolated directly into SQL and allow
injection; sanitize these inputs before building the SQL by escaping single
quotes and other SQL-breaking characters (e.g., replace each ' with ''), or run
them through a small helper like escapeSqlString(s) and use the escaped result
in the template strings for suiteFilter and workloadFilter; update any code that
constructs those filters to use the escaped values (references: suiteFilter,
workloadFilter) so no raw user/tag value is injected into the SQL.
- Line 107: The button element with id "gcs-load-btn" currently lacks an
explicit type and will default to type="submit"; change it to include
type="button" to prevent unintended form submission. Apply the same change (add
type="button") to the other plain <button class="btn"> elements in this file —
specifically the other buttons near the referenced lines (the other .btn buttons
around lines 120, 121, 131, and 150) so each has type="button".
In `@src/utils/shared/k8s/delete.go`:
- Around line 126-127: The method ObjectDeleter.getAggregatedGroupVersions
currently uses the deprecated non-generic sets API; update its signature return
type from sets.String to sets.Set[string], replace the initialization
sets.NewString() with sets.New[string](), and update the creation that uses
resource.Verbs from sets.NewString(resource.Verbs...) to
sets.New[string](resource.Verbs...); make these changes inside
getAggregatedGroupVersions so all set variables and returns use sets.Set[string]
and the generic sets.New[string](...) constructors.
- Around line 128-133: The code in getDeletableResourceTypes calls
o.dynamicClient.Resource(apiServiceGVR).List(...) and treats any error as fatal,
causing a permissions regression; update the error handling around that List
call (the apiServiceGVR dynamicClient call in delete.go /
getDeletableResourceTypes) to treat authorization failures like
NotFound/NoMatch: if errors.IsNotFound(err) || meta.IsNoMatchError(err) ||
errors.IsForbidden(err) || errors.IsUnauthorized(err) then return the current
out, nil (i.e., continue without the aggregated-resource skip) and emit a debug
log indicating APIService discovery was skipped due to lack of permission;
otherwise, keep returning the error as before.
In `@src/vizier/funcs/md_udtfs/md_udtfs_impl.h`:
- Around line 1158-1167: The code stores cluster_name_ without validation and
later interpolates it into DROP/CREATE SQL, enabling injection; in the
Init/constructor where cluster_name is assigned (the block that sets
cluster_name_ = std::string(cluster_name)), add strict identifier validation
(e.g. reject empty values, reject quotes/semicolons/whitespace and require a
conservative regex like ^[A-Za-z0-9_]+$), and if validation fails return an
error status instead of proceeding; apply the same validation check at the other
initialization sites referenced (the blocks around the DROP/CREATE usage noted
in the comment, e.g., the sections at ~1234-1237 and ~1315-1317) so
cluster_name_ is never interpolated into SQL unless it passes validation.
In `@src/vizier/services/adaptive_export/cmd/main.go`:
- Around line 176-179: The durEnv helper can return non-positive durations which
will cause time.NewTicker(pruneInterval) to panic and can invert attribution
windows; validate the duration returned by durEnv for pruneInterval and the
before/after window values (the variables named pruneInterval, beforeWindow,
afterWindow) and if the value is <= 0, replace it with the safe default used
previously (e.g., the second durEnv argument) or return an error; update the
code around the time.NewTicker(...) call and the logic handling the before/after
windows (the callers that use durEnv at lines creating those variables) to
perform this check before using the duration so negative or zero overrides are
rejected and the default is used instead.
- Around line 177-213: The shutdown path currently cancels the context then
unconditionally sleeps 500ms, which can cut off background work (the ticker
goroutine that calls ctl.PruneExpired and the controller started with ctl.Run)
that may take seconds; replace the naive sleep with coordinated waiting: start
those goroutines under a sync.WaitGroup or an errgroup (use the same ctx passed
to ctl.Run and the ticker), increment the group for the ticker goroutine and the
ctl.Run goroutine, ensure each decrements Done() when finished, and after
calling cancel() wait for the group to complete (and capture/log errors returned
by ctl.Run) instead of sleeping; keep signal handling (sigCh, cancel) but remove
the fixed time.Sleep so shutdown blocks until both the ticker goroutine and
ctl.Run finish.
- Around line 88-89: The code currently uses resolveHostname() which falls back
to os.Hostname(), causing incorrect pod hostnames for node-scoped queries;
change the logic so NODE_NAME (env var) is required: read NODE_NAME and if empty
log an error and exit instead of falling back, update the place where hostname
is assigned and logged (resolveHostname / hostname variable and the
log.WithField("hostname", hostname).Info call) and apply the same mandatory
check/exit behavior to the other block around lines 227-236 that also uses node
hostname resolution.
In `@src/vizier/services/adaptive_export/internal/clickhouse/apply.go`:
- Around line 86-96: Apply currently iterates OperatorOwnedTables and never runs
the database CREATE statement from schema.sql, causing table creation to fail on
a fresh cluster; update Applier.Apply to first obtain and execute the database
DDL (use DDL(...) for the database name or a dedicated function that returns the
"CREATE DATABASE IF NOT EXISTS forensic_db" statement) via a.execute(ctx, ddl)
before looping OperatorOwnedTables, and preserve the existing error-wrapping
style (fmt.Errorf) used in the loop for both fetching the database DDL and
executing it.
In `@src/vizier/services/adaptive_export/internal/clickhouse/ddl_test.go`:
- Around line 19-23: The import block in the test file is not formatted to
satisfy gci; run the project's import organizer (gci) or goimports/gofmt on the
file and fix the import grouping/order and spacing (ensure standard,
third-party, and local imports are grouped correctly), remove any unused imports
(e.g., check "errors" or "strings"), and re-save so the file passes the CI
formatting check referenced at Line 97.
- Around line 68-70: The test calls DDL(name) but ignores the returned error;
change the test to capture both ddl and err from DDL(name), check if err != nil
and fail immediately (e.g., t.Fatalf or t.Fatalf with the error) before
asserting that ddl does not contain "anomaly_hash" or "anomaly_hashes"; locate
the DDL(name) call and the strings.Contains checks in the test and insert the
err check so the test reports extraction failures rather than misleading
hash-column assertions.
In `@src/vizier/services/adaptive_export/internal/clickhouse/insert_test.go`:
- Line 110: The file ends with an extra trailing blank line after the final
closing brace in insert_test.go; remove that blank line (or run gofmt -w on
src/vizier/services/adaptive_export/internal/clickhouse/insert_test.go) so the
file ends immediately after the final closing brace of the test(s) in
insert_test.go, ensuring no extra newline is present to satisfy formatting
checks.
In `@src/vizier/services/adaptive_export/internal/controller/controller_test.go`:
- Around line 319-345: The test TestController_RestartMidStream_Aborts uses
fixed time.Sleep calls after trig.push(canonicalEvent()) which makes it flaky;
replace those sleeps with condition-based waits that poll or wait on
deterministic signals: after trig.push(...) wait until c.Active() == 1 (with a
short timeout) or wait on a channel/notification from the fakeTrigger/fakeSink
indicating the event was received/processed, and in the goroutine that runs
c.Run(ctx) use the existing done channel with a select+timeout to assert abort;
update the two occurrences around trig.push(...) (the Sleep at the first push
and the Sleep before cancel) to use these condition-based waits so the test no
longer depends on wall-clock sleeps.
In `@src/vizier/services/adaptive_export/internal/e2e/e2e_test.go`:
- Line 82: Replace the numeric literal 405 in the HTTP error response with the
standard constant http.StatusMethodNotAllowed; locate the call to http.Error(w,
"method", 405) (inside the request handler) and change the status argument to
http.StatusMethodNotAllowed so the code uses the named HTTP status constant for
clarity and maintainability.
In `@src/vizier/services/adaptive_export/internal/kubescape/extract_test.go`:
- Around line 94-95: The test currently ignores errors from Extract when calling
Extract(canonicalRow()) and Extract(r), which can mask parsing failures; change
both calls to capture the error (e.g., a, err := Extract(canonicalRow()) and b,
err := Extract(r)) and fail the test immediately on error (use t.Fatalf or
t.Fatalf with a clear message) so the reorder-stability test cannot pass with
zero-values when parsing fails; ensure you reference the Extract calls around
canonicalRow() and r in the reorder-stability test and include the error in the
failure message for debugging.
In `@src/vizier/services/adaptive_export/internal/kubescape/extract.go`:
- Around line 30-36: The import block in extract.go is not formatted per gci
rules: keep the standard library imports (encoding/json, errors, fmt) sorted
together, add a separating blank line, then the local package import
(px.dev/pixie/.../anomaly); update the imports block around those symbols so
stdlib and local imports are split by a blank line and re-run gci or gofmt to
satisfy the linter.
In `@src/vizier/services/adaptive_export/internal/pixie/pixie.go`:
- Around line 68-72: The isInternal check in Client.init currently uses
strings.ContainsAny(c.cloudAddr, "cluster.local") which incorrectly matches many
hostnames and disables TLS verification; update Client.init to determine
internal addresses correctly by parsing c.cloudAddr to extract the hostname
(e.g., via net.SplitHostPort or url/hostname parsing) and then use
strings.HasSuffix(hostname, "cluster.local") or an exact match to set
isInternal, so tls.Config{InsecureSkipVerify: isInternal} is only true for
genuine internal cluster addresses; keep the rest of the flow
(credentials.NewTLS, grpc.Dial) unchanged.
In `@src/vizier/services/adaptive_export/internal/pxl/tables_test.go`:
- Around line 23-32: Update the test comment in TestBuiltinTables_Has13 to
clearly state that BuiltinTables currently contains 12 table entries even though
the user-specified protocol list contains 13 protocol identifiers (because
http2_messages.beta is a separate protocol entry but does not add an extra
table), and change the wording to explicitly say "BuiltinTables should have 12
entries; the protocol list contains 13 identifiers but one maps to an existing
table (http2_messages.beta is listed separately from http_events)". Keep the
guard logic unchanged and reference BuiltinTables and TestBuiltinTables_Has13 so
future maintainers understand the difference between protocol identifiers and
table entries.
In `@src/vizier/services/adaptive_export/internal/sink/clickhouse_test.go`:
- Line 55: This test file is failing gofumpt formatting around the top-level var
block ('var ('); run gofumpt to reformat the file (e.g., gofumpt -w <that test
file>) or apply the gofumpt style changes to the 'var (' declaration and
surrounding code, then save and commit the resulting formatting changes so CI
will pass; locate the offending block by searching for the 'var (' token in the
test file (clickhouse_test.go) and ensure the formatted output is committed.
In `@src/vizier/services/adaptive_export/internal/trigger/clickhouse.go`:
- Around line 103-121: The cursor using only event_time (watermark) is unsafe
because advancing watermark to maxSeen can permanently skip later-arriving rows
with the same event_time; update the logic around t.fetchSince, watermark, and
maxSeen to use a stable secondary key (e.g., primary key or id) or switch to an
inclusive query plus downstream dedupe: modify t.fetchSince to accept/return a
composite cursor (event_time + id) or return rows ordered by (event_time, id)
and a (maxTime,maxID) pair, change the poll loop that calls t.fetchSince and the
watermark update to advance to the composite cursor (or use event_time >=
watermark and maintain a dedupe set of last-seen IDs before sending to out), and
ensure kubescape.Extract/out sending still respects deduplication so no event
with equal event_time but later id is lost.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro Plus
Run ID: 0355ed64-49e6-4d9d-a791-ada36b3daea6
⛔ Files ignored due to path filters (2)
go.sumis excluded by!**/*.sumsrc/e2e_test/perf_tool/experimentpb/experiment.pb.gois excluded by!**/*.pb.go
📒 Files selected for processing (64)
go.modgo_deps.bzlsrc/carnot/exec/clickhouse_export_sink_node.ccsrc/carnot/funcs/metadata/metadata_ops.ccsrc/carnot/funcs/metadata/metadata_ops.hsrc/e2e_test/perf_tool/cmd/BUILD.bazelsrc/e2e_test/perf_tool/cmd/run.gosrc/e2e_test/perf_tool/experimentpb/experiment.protosrc/e2e_test/perf_tool/pkg/cluster/context.gosrc/e2e_test/perf_tool/pkg/exporter/BUILD.bazelsrc/e2e_test/perf_tool/pkg/exporter/bq_exporter.gosrc/e2e_test/perf_tool/pkg/exporter/exporter.gosrc/e2e_test/perf_tool/pkg/exporter/parquet_exporter.gosrc/e2e_test/perf_tool/pkg/exporter/parquet_exporter_test.gosrc/e2e_test/perf_tool/pkg/metrics/prometheus_recorder.gosrc/e2e_test/perf_tool/pkg/metrics/recorder.gosrc/e2e_test/perf_tool/pkg/run/BUILD.bazelsrc/e2e_test/perf_tool/pkg/run/run.gosrc/e2e_test/perf_tool/pkg/suites/BUILD.bazelsrc/e2e_test/perf_tool/pkg/suites/experiments.gosrc/e2e_test/perf_tool/pkg/suites/metrics.gosrc/e2e_test/perf_tool/pkg/suites/scripts/clickhouse_export.pxlsrc/e2e_test/perf_tool/pkg/suites/scripts/clickhouse_read.pxlsrc/e2e_test/perf_tool/pkg/suites/suites.gosrc/e2e_test/perf_tool/pkg/suites/workloads.gosrc/e2e_test/perf_tool/ui/index.htmlsrc/utils/shared/k8s/delete.gosrc/vizier/funcs/md_udtfs/md_udtfs_impl.hsrc/vizier/services/adaptive_export/cmd/BUILD.bazelsrc/vizier/services/adaptive_export/cmd/main.gosrc/vizier/services/adaptive_export/internal/anomaly/BUILD.bazelsrc/vizier/services/adaptive_export/internal/anomaly/hash.gosrc/vizier/services/adaptive_export/internal/anomaly/hash_test.gosrc/vizier/services/adaptive_export/internal/clickhouse/BUILD.bazelsrc/vizier/services/adaptive_export/internal/clickhouse/apply.gosrc/vizier/services/adaptive_export/internal/clickhouse/apply_test.gosrc/vizier/services/adaptive_export/internal/clickhouse/ddl.gosrc/vizier/services/adaptive_export/internal/clickhouse/ddl_test.gosrc/vizier/services/adaptive_export/internal/clickhouse/insert.gosrc/vizier/services/adaptive_export/internal/clickhouse/insert_test.gosrc/vizier/services/adaptive_export/internal/clickhouse/schema.sqlsrc/vizier/services/adaptive_export/internal/config/BUILD.bazelsrc/vizier/services/adaptive_export/internal/config/definition.gosrc/vizier/services/adaptive_export/internal/controller/BUILD.bazelsrc/vizier/services/adaptive_export/internal/controller/controller.gosrc/vizier/services/adaptive_export/internal/controller/controller_test.gosrc/vizier/services/adaptive_export/internal/e2e/BUILD.bazelsrc/vizier/services/adaptive_export/internal/e2e/e2e_test.gosrc/vizier/services/adaptive_export/internal/kubescape/BUILD.bazelsrc/vizier/services/adaptive_export/internal/kubescape/extract.gosrc/vizier/services/adaptive_export/internal/kubescape/extract_test.gosrc/vizier/services/adaptive_export/internal/pixie/BUILD.bazelsrc/vizier/services/adaptive_export/internal/pixie/pixie.gosrc/vizier/services/adaptive_export/internal/pxl/BUILD.bazelsrc/vizier/services/adaptive_export/internal/pxl/pxl.gosrc/vizier/services/adaptive_export/internal/pxl/tables.gosrc/vizier/services/adaptive_export/internal/pxl/tables_test.gosrc/vizier/services/adaptive_export/internal/script/script.gosrc/vizier/services/adaptive_export/internal/sink/BUILD.bazelsrc/vizier/services/adaptive_export/internal/sink/clickhouse.gosrc/vizier/services/adaptive_export/internal/sink/clickhouse_test.gosrc/vizier/services/adaptive_export/internal/trigger/BUILD.bazelsrc/vizier/services/adaptive_export/internal/trigger/clickhouse.gosrc/vizier/services/adaptive_export/internal/trigger/clickhouse_test.go
💤 Files with no reviewable changes (4)
- src/vizier/services/adaptive_export/internal/pixie/BUILD.bazel
- src/vizier/services/adaptive_export/internal/pxl/pxl.go
- src/vizier/services/adaptive_export/internal/config/definition.go
- src/vizier/services/adaptive_export/internal/script/script.go
| gopkg.in/yaml.v2 => gopkg.in/yaml.v2 v2.4.0 | ||
| ) | ||
|
|
||
| replace google.golang.org/protobuf => google.golang.org/protobuf v1.29.1 |
There was a problem hiding this comment.
🧩 Analysis chain
🌐 Web query:
What is the first patched (non-vulnerable) version of google.golang.org/protobuf for GHSA-8r3f-844c-mc37 / GO-2024-2611?
💡 Result:
The first patched (non-vulnerable) version of google.golang.org/protobuf for GHSA-8r3f-844c-mc37 (also known as GO-2024-2611 and CVE-2024-24786) is v1.33.0.[1][2][3] This version fixes the infinite loop in protojson.Unmarshal when handling certain invalid JSON inputs, particularly with google.protobuf.Any or UnmarshalOptions.DiscardUnknown set.[4] Official advisories confirm affected versions are before v1.33.0, with v1.33.0 as the patched version.[1][5][2]
Citations:
- 1: GHSA-8r3f-844c-mc37
- 2: https://pkg.go.dev/vuln/GO-2024-2611
- 3: https://osv.dev/vulnerability/GO-2024-2611
- 4: https://github.com/protocolbuffers/protobuf-go/releases/tag/v1.33.0
- 5: https://osv.dev/vulnerability/GHSA-8r3f-844c-mc37
🏁 Script executed:
# Read the go.mod file around line 324 to see context and any comments
head -n 330 go.mod | tail -n 20Repository: k8sstormcenter/pixie
Length of output: 1339
🏁 Script executed:
# Search for all protobuf-related entries in go.mod
rg "protobuf" go.mod -n -A 1 -B 1Repository: k8sstormcenter/pixie
Length of output: 1000
🏁 Script executed:
# Check the require section to see what version of protobuf would be used without the replace
rg "google.golang.org/protobuf" go.mod -nRepository: k8sstormcenter/pixie
Length of output: 193
Remove the forced protobuf downgrade to a vulnerable version.
Line 324 pins google.golang.org/protobuf to v1.29.1, which contains the high-severity infinite-loop vulnerability (GHSA-8r3f-844c-mc37 / GO-2024-2611). This replace directive downgrades from the safe v1.34.2 already declared in the require section. Removing this replace allows the patched version to be used.
Proposed fix
-replace google.golang.org/protobuf => google.golang.org/protobuf v1.29.1📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| replace google.golang.org/protobuf => google.golang.org/protobuf v1.29.1 |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@go.mod` at line 324, The go.mod file contains a replace directive forcing
google.golang.org/protobuf to v1.29.1 (the vulnerable downgrade) while the
require already specifies a safe v1.34.2; remove the replace directive
referencing google.golang.org/protobuf v1.29.1 so the module can use the patched
version and then run module cleanup (e.g., go mod tidy) to update the dependency
graph and ensure go.mod/go.sum reflect the removal.
| // TODO(ddelnano): Defend against columns that don't exist. These should be | ||
| // ignored by the Node. |
There was a problem hiding this comment.
Implement the missing-column guard before dereferencing mapped indices.
Line 38 states missing columns should be ignored, but Line 107 still unconditionally calls rb.ColumnAt(mapping.input_column_index()). A stale or partial mapping can crash the sink instead of skipping the column.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/carnot/exec/clickhouse_export_sink_node.cc` around lines 38 - 39, The
sink currently dereferences mapped indices without guarding for missing columns
(see mapping.input_column_index() being passed to rb.ColumnAt), so update the
ClickhouseExportSinkNode code that iterates mappings to check that the mapped
input column actually exists before calling
rb.ColumnAt(mapping.input_column_index()); e.g., verify
mapping.input_column_index() is within rb.NumColumns() (or use rb.HasColumn /
equivalent) and skip/continue for mappings that are out-of-range or otherwise
invalid so the node ignores missing columns instead of crashing.
| if kubeconfigPath == "" { | ||
| kubeconfigPath = clientcmd.RecommendedHomeFile | ||
| } |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
find . -path "*src/e2e_test/perf_tool/pkg/cluster/context.go" -type fRepository: k8sstormcenter/pixie
Length of output: 112
🏁 Script executed:
cat -n src/e2e_test/perf_tool/pkg/cluster/context.go | head -100Repository: k8sstormcenter/pixie
Length of output: 3768
🏁 Script executed:
grep -n "NewNonInteractiveDeferredLoadingClientConfig" src/e2e_test/perf_tool/pkg/cluster/context.goRepository: k8sstormcenter/pixie
Length of output: 158
🏁 Script executed:
grep -n "AddEnv\|configPath" src/e2e_test/perf_tool/pkg/cluster/context.goRepository: k8sstormcenter/pixie
Length of output: 821
🏁 Script executed:
cat -n src/e2e_test/perf_tool/pkg/cluster/context.go | tail -50Repository: k8sstormcenter/pixie
Length of output: 1797
🏁 Script executed:
rg -A 5 -B 5 "NewDefaultClientConfigLoadingRules" --type goRepository: k8sstormcenter/pixie
Length of output: 975
🏁 Script executed:
rg "RecommendedConfigPathEnvVar|RecommendedHomeFile" --type goRepository: k8sstormcenter/pixie
Length of output: 159
🌐 Web query:
clientcmd.NewDefaultClientConfigLoadingRules KUBECONFIG environment variable
💡 Result:
clientcmd.NewDefaultClientConfigLoadingRules creates a ClientConfigLoadingRules struct that defines the default precedence for loading Kubernetes configuration files, with the KUBECONFIG environment variable taking highest priority [1][2][3]. Specifically, it first checks os.Getenv("KUBECONFIG") (const RecommendedConfigPathEnvVar = "KUBECONFIG"); if set and non-empty, it splits the value by path delimiters (e.g., colon on Unix) into a deduplicated list of files and sets those as the Precedence slice, also enabling WarnIfAllMissing=true to warn if all listed files are absent [2]. If KUBECONFIG is unset or empty, Precedence is set to just the default home file (~/.kube/config, const RecommendedHomeFile) [2][3]. It also sets default MigrationRules for legacy ~/.kube/.kubeconfig migration when using the home file [2]. The ExplicitPath field remains empty unless set separately. This matches kubectl's documented loading order: --kubeconfig flag first (explicit), then $KUBECONFIG files (merged), then ~/.kube/config [4]. Example usage: loadingRules:= clientcmd.NewDefaultClientConfigLoadingRules; config, err:= loadingRules.GetStartingConfig [5].
Citations:
- 1: https://godoc.org/k8s.io/client-go/tools/clientcmd
- 2: https://github.com/kubernetes/client-go/blob/master/tools/clientcmd/loader.go
- 3: https://pkg.go.dev/k8s.io/client-go/tools/clientcmd
- 4: https://kubernetes.io/docs/reference/kubectl/generated/kubectl_config/
- 5: https://kubernetes.io/blog/2026/01/19/clientcmd-apiserver-access/
🏁 Script executed:
# Verify that the constant names are correct in the codebase
grep -r "RecommendedConfigPathEnvVar" . --include="*.go" | head -5Repository: k8sstormcenter/pixie
Length of output: 46
🏁 Script executed:
# Check if os package is imported
grep "^import" -A 10 src/e2e_test/perf_tool/pkg/cluster/context.go | head -15Repository: k8sstormcenter/pixie
Length of output: 230
Preserve the actual kubeconfig source when kubeconfigPath is empty.
When kubeconfigPath is empty, NewDefaultClientConfigLoadingRules() (line 64) respects the KUBECONFIG environment variable, potentially loading config from a non-default location. However, lines 75-77 unconditionally set kubeconfigPath to clientcmd.RecommendedHomeFile, causing AddEnv() to expose a different kubeconfig path to spawned commands than what was actually used to load the context.
Proposed fix
@@
if kubeconfigPath == "" {
- kubeconfigPath = clientcmd.RecommendedHomeFile
+ kubeconfigPath = os.Getenv(clientcmd.RecommendedConfigPathEnvVar)
+ if kubeconfigPath == "" {
+ kubeconfigPath = clientcmd.RecommendedHomeFile
+ }
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| if kubeconfigPath == "" { | |
| kubeconfigPath = clientcmd.RecommendedHomeFile | |
| } | |
| if kubeconfigPath == "" { | |
| kubeconfigPath = os.Getenv(clientcmd.RecommendedConfigPathEnvVar) | |
| if kubeconfigPath == "" { | |
| kubeconfigPath = clientcmd.RecommendedHomeFile | |
| } | |
| } |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/e2e_test/perf_tool/pkg/cluster/context.go` around lines 75 - 77, The code
currently overwrites an empty kubeconfigPath with clientcmd.RecommendedHomeFile,
which can differ from what NewDefaultClientConfigLoadingRules() actually used
(including KUBECONFIG); instead, when kubeconfigPath is empty, query the loading
rules for the real filename (e.g., call loadingRules.GetDefaultFilename() on the
ClientConfigLoadingRules returned by NewDefaultClientConfigLoadingRules()) and
assign that to kubeconfigPath so the value passed to AddEnv matches the actual
kubeconfig source used by the client config.
| // ExportResults consumes metrics from resultCh and inserts them into BigQuery in batches. | ||
| func (e *BQExporter) ExportResults(ctx context.Context, expID uuid.UUID, resultCh <-chan *metrics.ResultRow) error { | ||
| bqCh := make(chan interface{}) | ||
| defer close(bqCh) | ||
|
|
||
| inserter := &bq.BatchInserter{ | ||
| Table: e.resultTable, | ||
| BatchSize: 512, | ||
| PushTimeout: 2 * time.Minute, | ||
| } | ||
| go inserter.Run(bqCh) | ||
|
|
||
| for row := range resultCh { | ||
| bqRow, err := MetricsRowToResultRow(expID, row) | ||
| if err != nil { | ||
| log.WithError(err).Error("Failed to convert result row") | ||
| continue | ||
| } | ||
| bqCh <- bqRow | ||
| } | ||
| return nil | ||
| } |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Search for BatchInserter implementation to understand its behavior
ast-grep --pattern $'func ($_ *BatchInserter) Run($_) {
$$$
}'Repository: k8sstormcenter/pixie
Length of output: 1455
🏁 Script executed:
# Find the uploadBatch method to understand what it does
rg -A 15 "func.*\buploadBatch" src/shared/bq/batch_inserter.goRepository: k8sstormcenter/pixie
Length of output: 378
🏁 Script executed:
# Check the full BatchInserter.Run method context including error handling
cat -n src/shared/bq/batch_inserter.go | head -70Repository: k8sstormcenter/pixie
Length of output: 2366
🏁 Script executed:
# Find callers of ExportResults to see if they expect synchronization or error handling
rg -B 3 -A 3 "ExportResults" src/e2e_test/perf_tool/Repository: k8sstormcenter/pixie
Length of output: 6476
🏁 Script executed:
# Verify the bq.Table.Inserter().Put() behavior - does it actually wait for BigQuery?
rg -A 5 "func.*Put" src/shared/bq/ || echo "Checking BigQuery API docs..."Repository: k8sstormcenter/pixie
Length of output: 94
Race condition: ExportResults returns before the BatchInserter goroutine completes.
The function starts inserter.Run(bqCh) as a goroutine but returns immediately after closing bqCh (line 89 defer), without waiting for the inserter to finish. Although uploadBatch() is called after the channel closes, it executes within the goroutine. If the function returns and the process exits before the inserter completes, buffered rows may not be inserted.
Additionally, uploadBatch() logs insertion errors but does not propagate them—errors are silently lost. The ctx parameter is unused; context cancellation has no effect on the inserter.
🛠️ Proposed fix to wait for inserter completion
func (e *BQExporter) ExportResults(ctx context.Context, expID uuid.UUID, resultCh <-chan *metrics.ResultRow) error {
bqCh := make(chan interface{})
- defer close(bqCh)
inserter := &bq.BatchInserter{
Table: e.resultTable,
BatchSize: 512,
PushTimeout: 2 * time.Minute,
}
- go inserter.Run(bqCh)
+ done := make(chan struct{})
+ go func() {
+ inserter.Run(bqCh)
+ close(done)
+ }()
for row := range resultCh {
bqRow, err := MetricsRowToResultRow(expID, row)
if err != nil {
log.WithError(err).Error("Failed to convert result row")
continue
}
- bqCh <- bqRow
+ select {
+ case bqCh <- bqRow:
+ case <-ctx.Done():
+ close(bqCh)
+ <-done
+ return ctx.Err()
+ }
}
+ close(bqCh)
+ <-done
return nil
}🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/e2e_test/perf_tool/pkg/exporter/bq_exporter.go` around lines 86 - 107,
ExportResults currently starts BatchInserter.Run(bqCh) in a goroutine and defers
close(bqCh) but returns without waiting, losing buffered rows and swallowing
errors; modify ExportResults to (1) create a done/error channel or
sync.WaitGroup and start inserter.Run(bqCh, ctx) (or wrap Run to accept ctx) so
the inserter can observe context cancellation, (2) close bqCh only after
draining resultCh, then wait for the inserter goroutine to finish before
returning, and (3) surface any insertion errors from uploadBatch/BatchInserter
back to ExportResults (via an error channel or returned error) instead of only
logging them; reference BQExporter.ExportResults, bq.BatchInserter.Run and
uploadBatch to locate where to add waiting, context handling, and error
propagation.
| for _, localPath := range uploadedFiles { | ||
| f, err := os.Open(localPath) | ||
| require.NoError(t, err) | ||
| defer f.Close() | ||
|
|
||
| stat, err := f.Stat() | ||
| require.NoError(t, err) | ||
|
|
||
| pf, err := parquet.OpenFile(f, stat.Size()) | ||
| require.NoError(t, err) | ||
| assert.Equal(t, int64(3), pf.NumRows()) |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial | 💤 Low value
Defer in loop may delay file handle cleanup.
Using defer f.Close() inside a loop means all file handles remain open until the function returns. For this test with only one iteration, it's benign, but it's a pattern to avoid.
♻️ Suggested fix
for _, localPath := range uploadedFiles {
f, err := os.Open(localPath)
require.NoError(t, err)
- defer f.Close()
stat, err := f.Stat()
require.NoError(t, err)
pf, err := parquet.OpenFile(f, stat.Size())
require.NoError(t, err)
assert.Equal(t, int64(3), pf.NumRows())
+ f.Close()
// Verify schema has tag columns from the union of all rows.📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| for _, localPath := range uploadedFiles { | |
| f, err := os.Open(localPath) | |
| require.NoError(t, err) | |
| defer f.Close() | |
| stat, err := f.Stat() | |
| require.NoError(t, err) | |
| pf, err := parquet.OpenFile(f, stat.Size()) | |
| require.NoError(t, err) | |
| assert.Equal(t, int64(3), pf.NumRows()) | |
| for _, localPath := range uploadedFiles { | |
| f, err := os.Open(localPath) | |
| require.NoError(t, err) | |
| stat, err := f.Stat() | |
| require.NoError(t, err) | |
| pf, err := parquet.OpenFile(f, stat.Size()) | |
| require.NoError(t, err) | |
| assert.Equal(t, int64(3), pf.NumRows()) | |
| f.Close() |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/e2e_test/perf_tool/pkg/exporter/parquet_exporter_test.go` around lines
304 - 314, The test opens files in a loop and uses defer f.Close(), which delays
closing until the test function exits; change this so each file is closed
immediately after use: in the loop that iterates over uploadedFiles, replace
defer f.Close() with an explicit f.Close() call (checking/ignoring the error)
once you're done with parquet.OpenFile and the pf.NumRows() assertion, or
alternatively wrap the loop body in an inner anonymous func so the defer
f.Close() is scoped and runs at the end of each iteration; update the code paths
around os.Open, parquet.OpenFile, and pf.NumRows() accordingly.
| // TestSink_Write_PostsCorrectQueryAndBody — INSERT targets the right | ||
| // table; body is one JSON object per line with all attribution fields. | ||
| func TestSink_Write_PostsCorrectQueryAndBody(t *testing.T) { | ||
| var ( |
There was a problem hiding this comment.
Run gofumpt on this test file to unblock CI.
Pipeline reports this file as not properly formatted (gofumpt) at Line 55.
🧰 Tools
🪛 GitHub Actions: pr-linter / 0_run-container-lint.txt
[error] 55-55: golangci-lint failed (gofumpt): File is not properly formatted (gofumpt)
🪛 golangci-lint (2.12.1)
[error] 55-55: File is not properly formatted
(gofumpt)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/vizier/services/adaptive_export/internal/sink/clickhouse_test.go` at line
55, This test file is failing gofumpt formatting around the top-level var block
('var ('); run gofumpt to reformat the file (e.g., gofumpt -w <that test file>)
or apply the gofumpt style changes to the 'var (' declaration and surrounding
code, then save and commit the resulting formatting changes so CI will pass;
locate the offending block by searching for the 'var (' token in the test file
(clickhouse_test.go) and ensure the formatted output is committed.
Signed-off-by: entlein <einentlein@gmail.com>
There was a problem hiding this comment.
Actionable comments posted: 3
♻️ Duplicate comments (1)
src/vizier/services/adaptive_export/internal/pixie/pixie.go (1)
70-70: 🧹 Nitpick | 🔵 Trivial | ⚡ Quick winSet TLS
MinVersionto enforce minimum protocol security.The static analysis correctly flags the missing
MinVersion. Adding this ensures the client won't negotiate deprecated TLS versions.🔒 Proposed fix to add MinVersion
- tlsConfig := &tls.Config{InsecureSkipVerify: isInternal} + tlsConfig := &tls.Config{ + InsecureSkipVerify: isInternal, + MinVersion: tls.VersionTLS12, + }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/vizier/services/adaptive_export/internal/pixie/pixie.go` at line 70, The tls.Config construction for the Pixie client sets InsecureSkipVerify but omits MinVersion; update the tlsConfig initialization (variable tlsConfig) to include MinVersion: tls.VersionTLS12 (or tls.VersionTLS13 if you require only TLS1.3) so the client will refuse to negotiate deprecated TLS protocols; keep InsecureSkipVerify: isInternal as-is and ensure the change is applied where tlsConfig is used (e.g., the Pixie client TLS setup).
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/vizier/services/adaptive_export/cmd/main.go`:
- Line 140: The lifecycle step comments in main (around the bootstrap sequence
that builds trigger+sink+controller and the subsequent lifecycle blocks) have
duplicated/incorrect numbering: update the comment preceding the
trigger/sink/controller construction (currently "2.") to "4.", change the later
comments that read "3.", "4.", and "5." to "5.", "6.", and "7." respectively so
the sequence is monotonically increasing; locate these comments near the main
bootstrap function that constructs the trigger/sink/controller and the following
lifecycle blocks and correct their numeric prefixes accordingly.
- Line 282: Remove the dead-code placeholder "var _ = fmt.Sprintf" — delete the
unused fmt.Sprintf reference so the code no longer has an unnecessary blank
identifier assignment; keep the fmt import intact because fmt.Errorf is used
elsewhere (so no other changes to imports are required).
In `@src/vizier/services/adaptive_export/internal/pixie/pixie.go`:
- Around line 68-72: The internal-hostname check in Client.init incorrectly uses
strings.Contains on c.cloudAddr which can match substrings (e.g.,
"cluster.local.attacker.com") and incorrectly set tls.Config.InsecureSkipVerify;
instead parse the host portion (use net.SplitHostPort or net/url/ net.ParseIP
depending on format) from c.cloudAddr, then perform a suffix check against
"cluster.local" (e.g., strings.HasSuffix(host, ".cluster.local") or host ==
"cluster.local") to set isInternal; update Client.init to derive host, compute
isInternal from the suffix-safe check, and then create
tls.Config{InsecureSkipVerify: isInternal} and proceed with credentials.NewTLS
and grpc.Dial as before.
---
Duplicate comments:
In `@src/vizier/services/adaptive_export/internal/pixie/pixie.go`:
- Line 70: The tls.Config construction for the Pixie client sets
InsecureSkipVerify but omits MinVersion; update the tlsConfig initialization
(variable tlsConfig) to include MinVersion: tls.VersionTLS12 (or
tls.VersionTLS13 if you require only TLS1.3) so the client will refuse to
negotiate deprecated TLS protocols; keep InsecureSkipVerify: isInternal as-is
and ensure the change is applied where tlsConfig is used (e.g., the Pixie client
TLS setup).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro Plus
Run ID: 0a5ad53f-7732-4ae6-a44a-8927b4ecc794
📒 Files selected for processing (12)
src/vizier/services/adaptive_export/cmd/main.gosrc/vizier/services/adaptive_export/internal/clickhouse/apply.gosrc/vizier/services/adaptive_export/internal/clickhouse/apply_test.gosrc/vizier/services/adaptive_export/internal/clickhouse/ddl_test.gosrc/vizier/services/adaptive_export/internal/clickhouse/insert_test.gosrc/vizier/services/adaptive_export/internal/controller/controller.gosrc/vizier/services/adaptive_export/internal/controller/controller_test.gosrc/vizier/services/adaptive_export/internal/e2e/e2e_test.gosrc/vizier/services/adaptive_export/internal/kubescape/extract.gosrc/vizier/services/adaptive_export/internal/kubescape/extract_test.gosrc/vizier/services/adaptive_export/internal/pixie/pixie.gosrc/vizier/services/adaptive_export/internal/pxl/tables_test.go
|
|
||
| // Tear down the per-cluster ch-* retention scripts so the demo can be re-run cleanly. | ||
| current, err := client.GetClusterScripts(clusterID, clusterName) | ||
| // 2. Build trigger + sink + controller. |
There was a problem hiding this comment.
🧹 Nitpick | 🔵 Trivial | 💤 Low value
Fix comment numbering: step 2 is duplicated.
The lifecycle step comments restart numbering at line 140 (says "2." but should be "4."). This creates confusion about the actual boot sequence. The subsequent comments at lines 172, 179, and 202 are also off by 2.
📝 Proposed fix
- // 2. Build trigger + sink + controller.
+ // 4. Build trigger + sink + controller.And similarly update:
- Line 172:
// 3.→// 5. - Line 179:
// 4.→// 6. - Line 202:
// 5.→// 7.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/vizier/services/adaptive_export/cmd/main.go` at line 140, The lifecycle
step comments in main (around the bootstrap sequence that builds
trigger+sink+controller and the subsequent lifecycle blocks) have
duplicated/incorrect numbering: update the comment preceding the
trigger/sink/controller construction (currently "2.") to "4.", change the later
comments that read "3.", "4.", and "5." to "5.", "6.", and "7." respectively so
the sequence is monotonically increasing; locate these comments near the main
bootstrap function that constructs the trigger/sink/controller and the following
lifecycle blocks and correct their numeric prefixes accordingly.
| return time.Duration(n) * unit | ||
| } | ||
|
|
||
| var _ = fmt.Sprintf |
There was a problem hiding this comment.
🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win
Remove dead code: unused fmt.Sprintf reference.
This blank identifier assignment is unnecessary since fmt.Errorf is used at line 255, which already justifies the fmt import.
🧹 Proposed fix
-var _ = fmt.Sprintf📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| var _ = fmt.Sprintf |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/vizier/services/adaptive_export/cmd/main.go` at line 282, Remove the
dead-code placeholder "var _ = fmt.Sprintf" — delete the unused fmt.Sprintf
reference so the code no longer has an unnecessary blank identifier assignment;
keep the fmt import intact because fmt.Errorf is used elsewhere (so no other
changes to imports are required).
| func (c *Client) init() error { | ||
| isInternal := strings.ContainsAny(c.cloudAddr, "cluster.local") | ||
|
|
||
| isInternal := strings.Contains(c.cloudAddr, "cluster.local") | ||
| tlsConfig := &tls.Config{InsecureSkipVerify: isInternal} | ||
| creds := credentials.NewTLS(tlsConfig) | ||
|
|
||
| conn, err := grpc.Dial(c.cloudAddr, grpc.WithTransportCredentials(creds)) |
There was a problem hiding this comment.
strings.Contains still allows unsafe matches for internal hostname check.
While strings.Contains is safer than the previous ContainsAny, it still matches hostnames like cluster.local.attacker.com:443 or evil.cluster.local.example.com:443, which would incorrectly disable TLS verification.
Use suffix matching after extracting the hostname:
🔒 Proposed fix for hostname validation
+import "net"
+
func (c *Client) init() error {
- isInternal := strings.Contains(c.cloudAddr, "cluster.local")
+ host := c.cloudAddr
+ if h, _, err := net.SplitHostPort(c.cloudAddr); err == nil {
+ host = h
+ }
+ isInternal := host == "cluster.local" || strings.HasSuffix(host, ".cluster.local")
tlsConfig := &tls.Config{InsecureSkipVerify: isInternal}🧰 Tools
🪛 ast-grep (0.42.1)
[warning] 69-69: MinVersionis missing from this TLS configuration. By default, TLS 1.2 is currently used as the minimum when acting as a client, and TLS 1.0 when acting as a server. General purpose web applications should default to TLS 1.3 with all other protocols disabled. Only where it is known that a web server must support legacy clients with unsupported an insecure browsers (such as Internet Explorer 10), it may be necessary to enable TLS 1.0 to provide support. AddMinVersion: tls.VersionTLS13' to the TLS configuration to bump the minimum version to TLS 1.3.
Context: tls.Config{InsecureSkipVerify: isInternal}
Note: [CWE-327]: Use of a Broken or Risky Cryptographic Algorithm [OWASP A03:2017]: Sensitive Data Exposure [OWASP A02:2021]: Cryptographic Failures [REFERENCES]
https://owasp.org/Top10/A02_2021-Cryptographic_Failures
(missing-ssl-minversion-go)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/vizier/services/adaptive_export/internal/pixie/pixie.go` around lines 68
- 72, The internal-hostname check in Client.init incorrectly uses
strings.Contains on c.cloudAddr which can match substrings (e.g.,
"cluster.local.attacker.com") and incorrectly set tls.Config.InsecureSkipVerify;
instead parse the host portion (use net.SplitHostPort or net/url/ net.ParseIP
depending on format) from c.cloudAddr, then perform a suffix check against
"cluster.local" (e.g., strings.HasSuffix(host, ".cluster.local") or host ==
"cluster.local") to set isInternal; update Client.init to derive host, compute
isInternal from the suffix-safe check, and then create
tls.Config{InsecureSkipVerify: isInternal} and proceed with credentials.NewTLS
and grpc.Dial as before.
Switch the kubescape_logs poll from `event_time > watermark` to `event_time >= watermark` and track a per-boundary fingerprint set (sha256 of EventTime + RuleID + Hostname + JSON content). Rows that land with the boundary timestamp after a poll completed are no longer silently dropped — they are deduped against the fingerprint set on the next poll. Addresses CodeRabbit comment on PR #37 src/vizier/services/adaptive_export/internal/trigger/clickhouse.go:121.
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/vizier/services/adaptive_export/internal/trigger/clickhouse_test.go`:
- Around line 127-146: Add a unit test in clickhouse_test.go that verifies
fingerprint-based deduplication at the watermark boundary: create an
httptest.Server handler that on consecutive polls first returns the same
canonical row twice (to simulate a late duplicate) and then returns a different
row with the same event_time; instantiate the trigger via New(Config{Endpoint:
srv.URL, Hostname: "node-1", PollInterval: 30*time.Millisecond}), Subscribe(ctx)
and assert that the duplicate row is emitted only once while the distinct row
with the same event_time is also emitted; reference the existing TestTrigger_*
tests and use the same canonicalRowJSON, New, Config and Subscribe symbols to
locate where to add the test.
In `@src/vizier/services/adaptive_export/internal/trigger/clickhouse.go`:
- Around line 168-175: The SQL built in ClickHouseHTTP.fetchSince interpolates
t.cfg.Database and t.cfg.Table directly, allowing identifier injection; update
the ClickHouseHTTP constructor (New) to validate or safely quote these
identifiers: validate cfg.Database and cfg.Table against
/^[A-Za-z_][A-Za-z0-9_]*$/ and return an error on mismatch, or normalize and
wrap them in ClickHouse identifier quotes (backticks) so fetchSince uses only
validated/quoted values; keep quoteCH for hostname literal escaping and ensure
any change is applied to the stored t.cfg fields used by fetchSince.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro Plus
Run ID: 834318a0-cbd7-43be-828a-f0b703a6837c
📒 Files selected for processing (2)
src/vizier/services/adaptive_export/internal/trigger/clickhouse.gosrc/vizier/services/adaptive_export/internal/trigger/clickhouse_test.go
| func (t *ClickHouseHTTP) fetchSince(ctx context.Context, watermark uint64) ([]kubescape.Row, uint64, error) { | ||
| q := url.Values{} | ||
| q.Set("query", fmt.Sprintf( | ||
| "SELECT RuleID, RuntimeK8sDetails, RuntimeProcessDetails, event_time, hostname "+ | ||
| "FROM %s.%s "+ | ||
| "WHERE hostname = %s AND event_time >= %d "+ | ||
| "ORDER BY event_time FORMAT JSONEachRow", | ||
| t.cfg.Database, t.cfg.Table, quoteCH(t.cfg.Hostname), watermark)) |
There was a problem hiding this comment.
Validate or quote Database and Table identifiers to prevent SQL injection.
While quoteCH properly escapes the Hostname string literal, cfg.Database and cfg.Table are interpolated directly into the query without validation. If these values ever come from untrusted sources (e.g., environment variables, config files), an attacker could inject arbitrary SQL.
Consider validating that identifiers match ^[a-zA-Z_][a-zA-Z0-9_]*$ in New(), or use ClickHouse's backtick quoting for identifiers.
🛡️ Proposed fix: add identifier validation in New()
+import "regexp"
+
+var validIdentifier = regexp.MustCompile(`^[a-zA-Z_][a-zA-Z0-9_]*$`)
+
func New(cfg Config) (*ClickHouseHTTP, error) {
if cfg.Endpoint == "" {
return nil, fmt.Errorf("trigger: empty Endpoint")
}
if cfg.Hostname == "" {
return nil, fmt.Errorf("trigger: empty Hostname (operator must run node-local)")
}
if _, err := url.Parse(cfg.Endpoint); err != nil {
return nil, fmt.Errorf("trigger: invalid Endpoint %q: %w", cfg.Endpoint, err)
}
if cfg.Database == "" {
cfg.Database = "forensic_db"
}
+ if !validIdentifier.MatchString(cfg.Database) {
+ return nil, fmt.Errorf("trigger: invalid Database identifier %q", cfg.Database)
+ }
if cfg.Table == "" {
cfg.Table = "kubescape_logs"
}
+ if !validIdentifier.MatchString(cfg.Table) {
+ return nil, fmt.Errorf("trigger: invalid Table identifier %q", cfg.Table)
+ }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/vizier/services/adaptive_export/internal/trigger/clickhouse.go` around
lines 168 - 175, The SQL built in ClickHouseHTTP.fetchSince interpolates
t.cfg.Database and t.cfg.Table directly, allowing identifier injection; update
the ClickHouseHTTP constructor (New) to validate or safely quote these
identifiers: validate cfg.Database and cfg.Table against
/^[A-Za-z_][A-Za-z0-9_]*$/ and return an error on mismatch, or normalize and
wrap them in ClickHouse identifier quotes (backticks) so fetchSince uses only
validated/quoted values; keep quoteCH for hostname literal escaping and ensure
any change is applied to the stored t.cfg fields used by fetchSince.
- Reject Database / Table config values that don't match the plain ClickHouse identifier shape `[A-Za-z_][A-Za-z0-9_]*`. SQL identifiers cannot be parameterised, so the only safe path is validation at Config construction time. Closes the SELECT injection surface flagged by review. - Add a unit test that drives the watermark-boundary dedup path: stub server returns the same canonical row twice across consecutive polls AND a distinct row at the same event_time; assert the duplicate is emitted once and the distinct row is emitted once. Addresses CodeRabbit review on PR #37 src/vizier/services/adaptive_export/internal/trigger/clickhouse.go:168-175 src/vizier/services/adaptive_export/internal/trigger/clickhouse_test.go:127-146.
There was a problem hiding this comment.
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/vizier/services/adaptive_export/internal/trigger/clickhouse_test.go`:
- Around line 33-43: The test is racy because lastQuery is written in the server
goroutine and read later after a fixed sleep; change the httptest.NewServer
handler in clickhouse_test.go to send each received query string into a channel
(instead of writing to lastQuery directly) and have the test receive from that
channel to wait for the second poll before asserting; update references to
calls/lastQuery by reading the second value from the channel (use the existing
atomic calls to keep counting if needed) so the assertion waits
deterministically for the server's second request rather than sleeping.
In `@src/vizier/services/adaptive_export/internal/trigger/clickhouse.go`:
- Around line 214-218: The code currently calls io.ReadAll(resp.Body) and passes
the whole byte slice to parseJSONEachRow, which risks OOM for large ClickHouse
result sets; change to stream and parse directly from the response body
(resp.Body) instead of buffering: add or refactor parseJSONEachRow to accept an
io.Reader (e.g., parseJSONEachRowFromReader or change signature to
parseJSONEachRow(r io.Reader) returning the same values), call it with
resp.Body, and ensure resp.Body is closed and errors are propagated; apply the
same change to the other occurrence referenced (the block around lines 221-237)
so both paths parse JSONEachRow from the reader rather than using io.ReadAll.
- Around line 244-250: The current loop aborts the whole batch on json.Unmarshal
or parseUint64Loose errors, returning an error that includes the raw line and
leaving the watermark unchanged so the bad row is retried forever; change the
handling in the block that uses json.Unmarshal(line, &rr) and
parseUint64Loose(rr.EventTime) to skip malformed rows instead of returning: log
the parse error (do not include the raw line payload in the log returned to
callers), continue to the next row, and ensure normal processing continues so
the watermark can advance for subsequent good rows; do not change the method
signature—just replace the fmt.Errorf returns in that parse path with a
non-fatal log/skip and continue.
- Around line 73-75: The constructor New() currently only calls
url.Parse(cfg.Endpoint) which accepts relative URLs; change the validation to
parse cfg.Endpoint (using url.Parse or url.ParseRequestURI) and then ensure the
parsed URL has a non-empty Scheme of "http" or "https" and a non-empty Host; if
those checks fail return an error like the existing fmt.Errorf("trigger: invalid
Endpoint %q: %w", cfg.Endpoint, err) or a similar descriptive error. Update the
validation near the url.Parse call and keep the error handling path consistent
with the function so callers get a clear failure immediately instead of failing
later in http.NewRequestWithContext.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro Plus
Run ID: 6559ef54-31d4-4cdc-a627-bd9d7c4ea30e
📒 Files selected for processing (2)
src/vizier/services/adaptive_export/internal/trigger/clickhouse.gosrc/vizier/services/adaptive_export/internal/trigger/clickhouse_test.go
…e, skip-on-malformed, race-free test - Endpoint validation: require http or https scheme + non-empty Host in New() so a config with a bare path or relative URL fails at construction, not later inside http.NewRequestWithContext. - parseJSONEachRow now takes io.Reader and streams from resp.Body with bufio.Scanner. Memory is bounded at one row regardless of result-set size; the previous io.ReadAll path could OOM on large ClickHouse responses. - Malformed JSON rows and bad event_time values are now logged at Debug and skipped, not fatal. The previous return-on-error caused the watermark to never advance, so a single bad row would block every subsequent poll forever. - Test TestTrigger_Polls_HostnameAndWatermark replaces the shared-variable-plus-fixed-sleep pattern with a channel that the server pushes each query into; the assertion waits deterministically for the second request instead of racing. Addresses CodeRabbit review on PR #37 (4 actionable items at trigger/clickhouse.go lines 73-75, 214-218, 244-250 and trigger/clickhouse_test.go lines 33-43).
Dom, dont review this, I need to have it as proper PR to trigger the AI-review