diff --git a/.github/workflows/v11-deployment-pipeline.yml b/.github/workflows/v11-deployment-pipeline.yml index cf15b2677..b53d5527f 100644 --- a/.github/workflows/v11-deployment-pipeline.yml +++ b/.github/workflows/v11-deployment-pipeline.yml @@ -453,6 +453,7 @@ jobs: cd ${{ github.workspace }}/plugins/modules-config; go build -o com.utmstack.modules-config.plugin -v . cd ${{ github.workspace }}/plugins/crowdstrike; go build -o com.utmstack.crowdstrike.plugin -v . cd ${{ github.workspace }}/plugins/feeds; go build -o com.utmstack.feeds.plugin -v . + cd ${{ github.workspace }}/plugins/compliance-orchestrator; go build -o com.utmstack.compliance-orchestrator.plugin -v . - name: Prepare Dependencies for Event Processor Image run: | diff --git a/agent/dependency/dependency.go b/agent/dependency/dependency.go index 2140def1a..6070fa23c 100644 --- a/agent/dependency/dependency.go +++ b/agent/dependency/dependency.go @@ -21,6 +21,29 @@ const ( MacosCollectorVersion = "11.2.3" ) +// getUpdaterVersion reads the desired updater version from version.json +// (the updater_version field). Falls back to UpdaterVersion if the +// file is missing, unreadable, or the field is empty. +// +// The source of truth for this value is version.json shipped by the server, +// which the updater promotes on each successful agent update. +func getUpdaterVersion() string { + var v struct { + UpdaterVersion string `json:"updater_version"` + } + versionPath := filepath.Join(fs.GetExecutablePath(), "version.json") + if !fs.Exists(versionPath) { + return UpdaterVersion + } + if err := fs.ReadJSON(versionPath, &v); err != nil { + return UpdaterVersion + } + if v.UpdaterVersion == "" { + return UpdaterVersion + } + return v.UpdaterVersion +} + // UpdaterFile returns the updater binary name with OS and architecture suffix. // Format: utmstack_updater_service__[.exe] // Examples: @@ -43,6 +66,7 @@ type Dependency struct { DownloadURL func(server string) string // URL template to download from DownloadName string // Filename to save as (if different from BinaryPath basename) Critical bool // If true, failure blocks agent startup + PreDownload func() (cleanup func(), err error) // Called before download, returns cleanup for rollback PostDownload func() error // Run after download (e.g., unzip). Can be nil. Configure func() error // Run on first install (can be nil) Update func() error // Run on version change (can be nil, uses Configure) @@ -188,6 +212,22 @@ func Reconcile(server string, skipCertValidation bool) error { } else if inst.Version != dep.Version { // VERSION CHANGED: Download (if needed) and update utils.Logger.Info("Updating dependency: %s (%s -> %s)", dep.Name, inst.Version, dep.Version) + + // Call PreDownload hook if defined + var cleanup func() + if dep.PreDownload != nil { + var err error + cleanup, err = dep.PreDownload() + if err != nil { + errMsg := fmt.Errorf("failed to run PreDownload for %s: %v", dep.Name, err) + utils.Logger.ErrorF("%v", errMsg) + if dep.Critical { + criticalErrors = append(criticalErrors, errMsg) + } + continue + } + } + if dep.DownloadURL != nil { if err := downloadDependency(dep, server, skipCertValidation); err != nil { errMsg := fmt.Errorf("failed to download dependency update %s: %v", dep.Name, err) @@ -195,6 +235,11 @@ func Reconcile(server string, skipCertValidation bool) error { if dep.Critical { criticalErrors = append(criticalErrors, errMsg) } + // Rollback: call cleanup if PreDownload succeeded + if cleanup != nil { + utils.Logger.Info("Rolling back PreDownload changes for %s", dep.Name) + cleanup() + } continue } } @@ -210,6 +255,11 @@ func Reconcile(server string, skipCertValidation bool) error { if dep.Critical { criticalErrors = append(criticalErrors, errMsg) } + // Rollback: call cleanup if PreDownload succeeded + if cleanup != nil { + utils.Logger.Info("Rolling back PreDownload changes for %s", dep.Name) + cleanup() + } continue } } diff --git a/agent/dependency/deps_darwin.go b/agent/dependency/deps_darwin.go index 02244bf9b..33cd3d532 100644 --- a/agent/dependency/deps_darwin.go +++ b/agent/dependency/deps_darwin.go @@ -10,6 +10,7 @@ import ( "github.com/utmstack/UTMStack/agent/config" "github.com/utmstack/UTMStack/shared/exec" "github.com/utmstack/UTMStack/shared/fs" + "github.com/utmstack/UTMStack/shared/svc" ) const macosCollectorBinary = "utmstack-collector-mac" @@ -20,15 +21,16 @@ func GetDependencies() []Dependency { return []Dependency{ { - Name: "updater", - Version: UpdaterVersion, - BinaryPath: filepath.Join(basePath, UpdaterFile("")), + Name: "updater", + Version: getUpdaterVersion(), + BinaryPath: filepath.Join(basePath, UpdaterFile("")), DownloadURL: func(server string) string { return fmt.Sprintf(config.DependUrl, server, config.DependenciesPort, UpdaterFile("")) }, - Critical: false, - Configure: configureUpdater, - Uninstall: uninstallUpdater, + Critical: false, + PreDownload: preDownloadUpdater, + Configure: configureUpdater, + Uninstall: uninstallUpdater, }, { Name: "macos-collector", @@ -66,3 +68,19 @@ func uninstallUpdater() error { } return exec.Run(updaterPath, fs.GetExecutablePath(), "uninstall") } + +func preDownloadUpdater() (func(), error) { + // Stop the updater service before download + if err := svc.Stop(config.SERVICE_UPDATER_NAME); err != nil { + // Service might not be running or installed yet - that's OK + // Return cleanup function anyway (safe to start) + return func() { + _ = svc.Start(config.SERVICE_UPDATER_NAME) + }, nil + } + + // Return cleanup function that restarts the service + return func() { + _ = svc.Start(config.SERVICE_UPDATER_NAME) + }, nil +} diff --git a/agent/dependency/deps_linux_amd64.go b/agent/dependency/deps_linux_amd64.go index 424403353..97875c905 100644 --- a/agent/dependency/deps_linux_amd64.go +++ b/agent/dependency/deps_linux_amd64.go @@ -11,6 +11,7 @@ import ( "github.com/utmstack/UTMStack/agent/config" "github.com/utmstack/UTMStack/shared/exec" "github.com/utmstack/UTMStack/shared/fs" + "github.com/utmstack/UTMStack/shared/svc" ) // GetDependencies returns the list of dependencies for Linux amd64. @@ -19,15 +20,16 @@ func GetDependencies() []Dependency { return []Dependency{ { - Name: "updater", - Version: UpdaterVersion, - BinaryPath: filepath.Join(basePath, UpdaterFile("")), + Name: "updater", + Version: getUpdaterVersion(), + BinaryPath: filepath.Join(basePath, UpdaterFile("")), DownloadURL: func(server string) string { return fmt.Sprintf(config.DependUrl, server, config.DependenciesPort, UpdaterFile("")) }, - Critical: false, - Configure: configureUpdater, - Uninstall: uninstallUpdater, + Critical: false, + PreDownload: preDownloadUpdater, + Configure: configureUpdater, + Uninstall: uninstallUpdater, }, // New beats dependency - only for uninstalling existing filebeat/winlogbeat @@ -75,3 +77,19 @@ func uninstallUpdater() error { func uninstallBeats() error { return collector.UninstallAll() } + +func preDownloadUpdater() (func(), error) { + // Stop the updater service before download + if err := svc.Stop(config.SERVICE_UPDATER_NAME); err != nil { + // Service might not be running or installed yet - that's OK + // Return cleanup function anyway (safe to start) + return func() { + _ = svc.Start(config.SERVICE_UPDATER_NAME) + }, nil + } + + // Return cleanup function that restarts the service + return func() { + _ = svc.Start(config.SERVICE_UPDATER_NAME) + }, nil +} diff --git a/agent/dependency/deps_linux_arm64.go b/agent/dependency/deps_linux_arm64.go index 18119c2a9..384d10f1e 100644 --- a/agent/dependency/deps_linux_arm64.go +++ b/agent/dependency/deps_linux_arm64.go @@ -10,6 +10,7 @@ import ( "github.com/utmstack/UTMStack/agent/config" "github.com/utmstack/UTMStack/shared/exec" "github.com/utmstack/UTMStack/shared/fs" + "github.com/utmstack/UTMStack/shared/svc" ) // GetDependencies returns the list of dependencies for Linux arm64. @@ -19,15 +20,16 @@ func GetDependencies() []Dependency { return []Dependency{ { - Name: "updater", - Version: UpdaterVersion, - BinaryPath: filepath.Join(basePath, UpdaterFile("")), + Name: "updater", + Version: getUpdaterVersion(), + BinaryPath: filepath.Join(basePath, UpdaterFile("")), DownloadURL: func(server string) string { return fmt.Sprintf(config.DependUrl, server, config.DependenciesPort, UpdaterFile("")) }, - Critical: false, - Configure: configureUpdater, - Uninstall: uninstallUpdater, + Critical: false, + PreDownload: preDownloadUpdater, + Configure: configureUpdater, + Uninstall: uninstallUpdater, }, // Auditd dependency - auto-configures Linux audit daemon @@ -61,3 +63,19 @@ func uninstallUpdater() error { } return exec.Run(updaterPath, fs.GetExecutablePath(), "uninstall") } + +func preDownloadUpdater() (func(), error) { + // Stop the updater service before download + if err := svc.Stop(config.SERVICE_UPDATER_NAME); err != nil { + // Service might not be running or installed yet - that's OK + // Return cleanup function anyway (safe to start) + return func() { + _ = svc.Start(config.SERVICE_UPDATER_NAME) + }, nil + } + + // Return cleanup function that restarts the service + return func() { + _ = svc.Start(config.SERVICE_UPDATER_NAME) + }, nil +} diff --git a/agent/dependency/deps_windows_amd64.go b/agent/dependency/deps_windows_amd64.go index 144fb4bbe..0cc658485 100644 --- a/agent/dependency/deps_windows_amd64.go +++ b/agent/dependency/deps_windows_amd64.go @@ -11,6 +11,7 @@ import ( "github.com/utmstack/UTMStack/agent/config" "github.com/utmstack/UTMStack/shared/exec" "github.com/utmstack/UTMStack/shared/fs" + "github.com/utmstack/UTMStack/shared/svc" ) // GetDependencies returns the list of dependencies for Windows amd64. @@ -19,15 +20,16 @@ func GetDependencies() []Dependency { return []Dependency{ { - Name: "updater", - Version: UpdaterVersion, - BinaryPath: filepath.Join(basePath, UpdaterFile("")), + Name: "updater", + Version: getUpdaterVersion(), + BinaryPath: filepath.Join(basePath, UpdaterFile("")), DownloadURL: func(server string) string { return fmt.Sprintf(config.DependUrl, server, config.DependenciesPort, UpdaterFile("")) }, - Critical: false, // Agent can run without updater - Configure: configureUpdater, - Uninstall: uninstallUpdater, + Critical: false, // Agent can run without updater + PreDownload: preDownloadUpdater, + Configure: configureUpdater, + Uninstall: uninstallUpdater, }, // New beats dependency - only for uninstalling existing filebeat/winlogbeat @@ -57,3 +59,19 @@ func uninstallUpdater() error { func uninstallBeats() error { return collector.UninstallAll() } + +func preDownloadUpdater() (func(), error) { + // Stop the updater service before download + if err := svc.Stop(config.SERVICE_UPDATER_NAME); err != nil { + // Service might not be running or installed yet - that's OK + // Return cleanup function anyway (safe to start) + return func() { + _ = svc.Start(config.SERVICE_UPDATER_NAME) + }, nil + } + + // Return cleanup function that restarts the service + return func() { + _ = svc.Start(config.SERVICE_UPDATER_NAME) + }, nil +} diff --git a/agent/dependency/deps_windows_arm64.go b/agent/dependency/deps_windows_arm64.go index a45d4e8f4..e6c0bfb1b 100644 --- a/agent/dependency/deps_windows_arm64.go +++ b/agent/dependency/deps_windows_arm64.go @@ -10,6 +10,7 @@ import ( "github.com/utmstack/UTMStack/agent/config" "github.com/utmstack/UTMStack/shared/exec" "github.com/utmstack/UTMStack/shared/fs" + "github.com/utmstack/UTMStack/shared/svc" ) // GetDependencies returns the list of dependencies for Windows arm64. @@ -19,15 +20,16 @@ func GetDependencies() []Dependency { return []Dependency{ { - Name: "updater", - Version: UpdaterVersion, - BinaryPath: filepath.Join(basePath, UpdaterFile("")), + Name: "updater", + Version: getUpdaterVersion(), + BinaryPath: filepath.Join(basePath, UpdaterFile("")), DownloadURL: func(server string) string { return fmt.Sprintf(config.DependUrl, server, config.DependenciesPort, UpdaterFile("")) }, - Critical: false, - Configure: configureUpdater, - Uninstall: uninstallUpdater, + Critical: false, + PreDownload: preDownloadUpdater, + Configure: configureUpdater, + Uninstall: uninstallUpdater, }, } } @@ -44,3 +46,19 @@ func uninstallUpdater() error { } return exec.Run(updaterPath, fs.GetExecutablePath(), "uninstall") } + +func preDownloadUpdater() (func(), error) { + // Stop the updater service before download + if err := svc.Stop(config.SERVICE_UPDATER_NAME); err != nil { + // Service might not be running or installed yet - that's OK + // Return cleanup function anyway (safe to start) + return func() { + _ = svc.Start(config.SERVICE_UPDATER_NAME) + }, nil + } + + // Return cleanup function that restarts the service + return func() { + _ = svc.Start(config.SERVICE_UPDATER_NAME) + }, nil +} diff --git a/agent/updater/updates/update.go b/agent/updater/updates/update.go index fecaad400..d95437cec 100644 --- a/agent/updater/updates/update.go +++ b/agent/updater/updates/update.go @@ -106,8 +106,6 @@ func runUpdateProcess(basePath string) error { return fmt.Errorf("error stopping agent: %v", err) } - time.Sleep(10 * time.Second) - // Migration: check if old naming convention exists and migrate to new naming oldBinPath := filepath.Join(basePath, oldBin) if !fs.Exists(oldBinPath) { @@ -138,6 +136,32 @@ func runUpdateProcess(basePath string) error { return fmt.Errorf("error renaming new binary: %v", err) } + // Promote version.json BEFORE starting the agent so that the new agent's + // dependency.Reconcile() sees the new updater_version on first boot. + // If anything fails after this, rollback restores the previous version.json. + versionNewPath := filepath.Join(basePath, "version_new.json") + versionPath := filepath.Join(basePath, "version.json") + versionBackupPath := filepath.Join(basePath, "version.json.old") + + os.Remove(versionBackupPath) + versionBackedUp := false + if fs.Exists(versionPath) { + if err := os.Rename(versionPath, versionBackupPath); err != nil { + os.Rename(filepath.Join(basePath, oldBin), filepath.Join(basePath, newBin)) + os.Rename(backupPath, filepath.Join(basePath, oldBin)) + return fmt.Errorf("error backing up version.json: %v", err) + } + versionBackedUp = true + } + if err := os.Rename(versionNewPath, versionPath); err != nil { + if versionBackedUp { + os.Rename(versionBackupPath, versionPath) + } + os.Rename(filepath.Join(basePath, oldBin), filepath.Join(basePath, newBin)) + os.Rename(backupPath, filepath.Join(basePath, oldBin)) + return fmt.Errorf("error promoting version.json: %v", err) + } + if err := svc.Start(config.SERV_AGENT_NAME); err != nil { rollbackAgent(oldBin, backupBin, basePath) return fmt.Errorf("error starting agent: %v", err) @@ -153,16 +177,7 @@ func runUpdateProcess(basePath string) error { } logger.Info("Health check passed for agent") - - versionNewPath := filepath.Join(basePath, "version_new.json") - versionPath := filepath.Join(basePath, "version.json") - if fs.Exists(versionNewPath) { - if err := os.Rename(versionNewPath, versionPath); err != nil { - logger.Error("error updating version file: %v", err) - } else { - logger.Info("Version file updated successfully") - } - } + os.Remove(versionBackupPath) return nil } @@ -171,11 +186,18 @@ func rollbackAgent(currentBin, backupBin, basePath string) { logger.Info("Rolling back agent to previous version...") svc.Stop(config.SERV_AGENT_NAME) - time.Sleep(5 * time.Second) os.Remove(filepath.Join(basePath, currentBin)) os.Rename(filepath.Join(basePath, backupBin), filepath.Join(basePath, currentBin)) + // Restore previous version.json if it was backed up during promotion. + versionPath := filepath.Join(basePath, "version.json") + versionBackupPath := filepath.Join(basePath, "version.json.old") + if fs.Exists(versionBackupPath) { + os.Remove(versionPath) + os.Rename(versionBackupPath, versionPath) + } + svc.Start(config.SERV_AGENT_NAME) os.Remove(filepath.Join(basePath, "version_new.json")) diff --git a/agent/version.json b/agent/version.json index 8981e0f87..904906ff4 100644 --- a/agent/version.json +++ b/agent/version.json @@ -1,4 +1,4 @@ { "version": "11.1.5", - "updater_version": "1.0.4" + "updater_version": "1.0.5" } diff --git a/event_processor.Dockerfile b/event_processor.Dockerfile index 950291fb2..72d7cce60 100644 --- a/event_processor.Dockerfile +++ b/event_processor.Dockerfile @@ -18,4 +18,5 @@ COPY ./plugins/stats/com.utmstack.stats.plugin /workdir/plugins/utmstack/ COPY ./plugins/soc-ai/com.utmstack.soc-ai.plugin /workdir/plugins/utmstack/ COPY ./plugins/modules-config/com.utmstack.modules-config.plugin /workdir/plugins/utmstack/ COPY ./plugins/crowdstrike/com.utmstack.crowdstrike.plugin /workdir/plugins/utmstack/ -COPY ./plugins/feeds/com.utmstack.feeds.plugin /workdir/plugins/utmstack/ \ No newline at end of file +COPY ./plugins/feeds/com.utmstack.feeds.plugin /workdir/plugins/utmstack/ +COPY ./plugins/compliance-orchestrator/com.utmstack.compliance-orchestrator.plugin /workdir/plugins/utmstack/ \ No newline at end of file diff --git a/filters/aws/aws.yml b/filters/aws/aws.yml index e1fbdaec5..b25deca79 100644 --- a/filters/aws/aws.yml +++ b/filters/aws/aws.yml @@ -130,7 +130,7 @@ pipeline: - rename: from: - log.requestParameters.bucketName - to: log.userIdentityAccesrequestParametersBucketNamesKeyId + to: log.userIdentityAccessrequestParametersBucketNamesKeyId - rename: from: diff --git a/plugins/compliance-orchestrator/bootstrap.go b/plugins/compliance-orchestrator/bootstrap.go index 273d8521d..8ae301670 100644 --- a/plugins/compliance-orchestrator/bootstrap.go +++ b/plugins/compliance-orchestrator/bootstrap.go @@ -16,13 +16,14 @@ func waitForBackend(bc *client.BackendClient) error { err := bc.HealthCheck(context.Background()) if err == nil { catcher.Info("Connected to Backend", map[string]any{ - "process": "compliance-orchestrator", + "process": "plugin_com.utmstack.compliance-orchestrator", }) return nil } catcher.Error("Cannot connect to Backend, retrying", err, map[string]any{ - "retry": retry + 1, + "retry": retry + 1, + "process": "plugin_com.utmstack.compliance-orchestrator", }) if retry < maxRetries-1 { @@ -47,7 +48,8 @@ func waitForOpenSearch() error { } catcher.Error("Cannot connect to OpenSearch, retrying", err, map[string]any{ - "retry": retry + 1, + "retry": retry + 1, + "process": "plugin_com.utmstack.compliance-orchestrator", }) if retry < maxRetries-1 { diff --git a/plugins/compliance-orchestrator/main.go b/plugins/compliance-orchestrator/main.go index 88a85cd1c..9a3489cd8 100644 --- a/plugins/compliance-orchestrator/main.go +++ b/plugins/compliance-orchestrator/main.go @@ -2,25 +2,37 @@ package main import ( "context" + "os" "time" "github.com/threatwinds/go-sdk/catcher" + "github.com/threatwinds/go-sdk/plugins" "github.com/utmstack/UTMStack/plugins/compliance-orchestrator/scheduler" "github.com/utmstack/UTMStack/plugins/compliance-orchestrator/workers" ) func main() { + mode := plugins.GetCfg("plugin_com.utmstack.compliance-orchestrator").Env.Mode + if mode != "manager" { + return + } + catcher.Info("Starting Compliance Orchestrator", map[string]any{ - "process": "compliance-orchestrator", + "process": "plugin_com.utmstack.compliance-orchestrator", }) backend, err := bootstrap() if err != nil { - catcher.Info("Compliance Orchestrator bootstrap failed", map[string]any{}) - return + _ = catcher.Error("Compliance Orchestrator bootstrap failed", err, map[string]any{ + "process": "plugin_com.utmstack.compliance-orchestrator", + }) + time.Sleep(5 * time.Second) + os.Exit(1) } - catcher.Info("Compliance Orchestrator bootstrapped successfully", nil) + catcher.Info("Compliance Orchestrator bootstrapped successfully", map[string]any{ + "process": "plugin_com.utmstack.compliance-orchestrator", + }) ctx := context.Background() diff --git a/shared/svc/svc_windows.go b/shared/svc/svc_windows.go index 52d24b70a..cfdc64c75 100644 --- a/shared/svc/svc_windows.go +++ b/shared/svc/svc_windows.go @@ -7,24 +7,59 @@ import ( "fmt" "os/exec" "strings" + "time" ) -// Start starts a Windows service by name. +const ( + pollInterval = 500 * time.Millisecond + stopTimeout = 60 * time.Second + startTimeout = 60 * time.Second +) + +// Start starts a Windows service by name and waits until it's running. func Start(serviceName string) error { + // Already running? Nothing to do + if running, _ := IsActive(serviceName); running { + return nil + } + cmd := exec.Command("sc", "start", serviceName) - if err := cmd.Run(); err != nil { - return fmt.Errorf("failed to start service %s: %w", serviceName, err) + cmd.Run() // Ignore error, we'll check actual state + + // Poll until running or timeout + deadline := time.Now().Add(startTimeout) + for time.Now().Before(deadline) { + if running, _ := IsActive(serviceName); running { + return nil + } + time.Sleep(pollInterval) } - return nil + + return fmt.Errorf("timeout waiting for service %s to start", serviceName) } -// Stop stops a Windows service by name. +// Stop stops a Windows service by name and waits until it's stopped. func Stop(serviceName string) error { + // Already stopped? Nothing to do + status, _ := Status(serviceName) + if status == StatusStopped { + return nil + } + cmd := exec.Command("sc", "stop", serviceName) - if err := cmd.Run(); err != nil { - return fmt.Errorf("failed to stop service %s: %w", serviceName, err) + cmd.Run() // Ignore error, we'll check actual state + + // Poll until stopped or timeout + deadline := time.Now().Add(stopTimeout) + for time.Now().Before(deadline) { + status, _ := Status(serviceName) + if status == StatusStopped { + return nil + } + time.Sleep(pollInterval) } - return nil + + return fmt.Errorf("timeout waiting for service %s to stop", serviceName) } // Restart restarts a Windows service by stopping and starting it. @@ -37,12 +72,11 @@ func Restart(serviceName string) error { // IsActive checks if a Windows service is running. func IsActive(serviceName string) (bool, error) { - cmd := exec.Command("sc", "query", serviceName) - output, err := cmd.Output() + status, err := Status(serviceName) if err != nil { - return false, fmt.Errorf("failed to query service %s: %w", serviceName, err) + return false, err } - return strings.Contains(string(output), "RUNNING"), nil + return status == StatusRunning, nil } // Status returns the status of a Windows service.