Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions agent/internal/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ type Agent struct {
statusReportRequested chan string
refreshMutex sync.Mutex
pendingExpectedStateRefresh bool
workMutex sync.Mutex
activeWorkItem *agenthttp.WorkQueueItem
pendingWorkResults []agenthttp.CompletedWorkItem
Client *agenthttp.Client
Reconciler *reconcile.Reconciler
Config *Config
Expand Down
35 changes: 20 additions & 15 deletions agent/internal/agent/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ func (a *Agent) Run(ctx context.Context) {
}

go a.StatusReportLoop(ctx)
go a.WorkQueueLoop(ctx)

a.Tick()

Expand Down Expand Up @@ -74,17 +73,25 @@ func (a *Agent) Run(ctx context.Context) {
// StatusReportLoop sends a "startup" status report and then keeps reporting
// until ctx is cancelled: a "periodic" report each time the timer fires, and
// an on-demand report for every reason received on a.statusReportRequested.
// After each report the timer is re-armed with a fresh nextStatusReportDelay().
//
// NOTE(review): both a ticker and a timer are created below, and the
// ticker.C case has no body of its own — confirm which of the two is
// intended to drive the periodic report.
func (a *Agent) StatusReportLoop(ctx context.Context) {
a.reportStatus("startup")

ticker := time.NewTicker(WorkQueueStatusInterval)
defer ticker.Stop()
timer := time.NewTimer(nextStatusReportDelay())
defer timer.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
case <-timer.C:
a.reportStatus("periodic")
// The tick was just received, so the channel is empty and Reset is safe.
timer.Reset(nextStatusReportDelay())
case reason := <-a.statusReportRequested:
a.reportStatus(reason)
// Stop returns false when the timer has already fired. In that case a
// tick may still be sitting in timer.C; take it (without blocking) so the
// Reset below does not leave a stale tick to trigger an immediate report.
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
timer.Reset(nextStatusReportDelay())
}
}
}
Expand All @@ -100,21 +107,19 @@ func (a *Agent) RequestStatusReport(reason string) {

// reportStatus builds a status report and sends it together with a snapshot
// of the agent's work state (results awaiting acknowledgement and the item
// currently being processed). On success the response is applied in order:
// acknowledged results are removed from the pending list, rejected
// active-item renewals are logged, and any leased work items are handed to
// AcceptLeasedWorkItems. On a send error it only logs and returns, so the
// pending results are left untouched. reason is used in the log lines only.
func (a *Agent) reportStatus(reason string) {
report := a.BuildStatusReport(true)
if err := a.Client.ReportStatus(report); err != nil {
completed, active := a.SnapshotWorkStatus()
response, err := a.Client.ReportStatus(report, completed, active)
if err != nil {
log.Printf("[status] failed to report (%s): %v", reason, err)
return
}
a.AcknowledgeWorkResults(response.AcceptedWorkItemResults, response.RejectedWorkItemResults)
a.LogRejectedActiveWorkItems(response.RejectedActiveWorkItems)
a.AcceptLeasedWorkItems(response.WorkItems)
log.Printf("[status] reported (%s)", reason)
}

func (a *Agent) WorkQueueLoop(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
default:
}

a.ProcessWorkQueue()
}
// nextStatusReportDelay returns how long to wait before the next status
// report: StatusReportInterval plus a jitter in the range [0, 5s). The jitter
// is taken from the current wall-clock nanoseconds rather than a random
// source.
func nextStatusReportDelay() time.Duration {
	const maxJitter = 5 * time.Second

	offset := time.Now().UnixNano() % int64(maxJitter)
	return StatusReportInterval + time.Duration(offset)
}
161 changes: 117 additions & 44 deletions agent/internal/agent/workqueue.go
Original file line number Diff line number Diff line change
@@ -1,61 +1,134 @@
package agent

import (
"fmt"
"log"
"time"

agenthttp "techulus/cloud-agent/internal/http"
)

const (
// LongPollTimeout is the timeout passed to Client.GetWorkQueue by
// ProcessWorkQueue.
LongPollTimeout = 30 * time.Second
// WorkQueueStatusInterval is the period of the ticker created in
// StatusReportLoop.
WorkQueueStatusInterval = 60 * time.Second
// StatusReportInterval is the base delay between status reports;
// nextStatusReportDelay adds jitter on top of it.
StatusReportInterval = 15 * time.Second
)

func (a *Agent) ProcessWorkQueue() {
items, err := a.Client.GetWorkQueue(LongPollTimeout)
if err != nil {
log.Printf("[work-queue] failed to get work queue: %v", err)
time.Sleep(5 * time.Second)
// SnapshotWorkStatus returns the agent's current work state, taken under
// workMutex: a copy of the results still waiting to be acknowledged, and the
// ID/attempt of the item being processed, if any.
//
// The active list is always non-nil and holds at most one entry. The
// completed list is a fresh slice (nil when nothing is pending), so the
// caller may keep it after the lock is released.
func (a *Agent) SnapshotWorkStatus() ([]agenthttp.CompletedWorkItem, []agenthttp.ActiveWorkItem) {
	a.workMutex.Lock()
	defer a.workMutex.Unlock()

	inFlight := []agenthttp.ActiveWorkItem{}
	if current := a.activeWorkItem; current != nil {
		inFlight = append(inFlight, agenthttp.ActiveWorkItem{ID: current.ID, Attempt: current.Attempt})
	}

	// Appending to a nil slice copies the elements into new storage.
	var finished []agenthttp.CompletedWorkItem
	finished = append(finished, a.pendingWorkResults...)

	return finished, inFlight
}

// AcknowledgeWorkResults removes from a.pendingWorkResults every result whose
// ID appears in accepted or in rejected; each rejected entry is also logged
// with its reason. It returns immediately when both lists are empty.
func (a *Agent) AcknowledgeWorkResults(accepted []string, rejected []agenthttp.RejectedWorkItemResult) {
if len(accepted) == 0 && len(rejected) == 0 {
return
}

for _, item := range items {
log.Printf("[work-queue] processing item %s (type=%s)", Truncate(item.ID, 8), item.Type)

var processErr error
switch item.Type {
case "restart":
processErr = a.ProcessRestart(item)
case "stop":
processErr = a.ProcessStop(item)
case "deploy":
a.RequestReconcile("deploy work item " + Truncate(item.ID, 8))
case "force_cleanup":
processErr = a.ProcessForceCleanup(item)
case "cleanup_volumes":
processErr = a.ProcessCleanupVolumes(item)
case "build":
processErr = a.ProcessBuild(item)
case "backup_volume":
processErr = a.ProcessBackupVolume(item)
case "restore_volume":
processErr = a.ProcessRestoreVolume(item)
case "create_manifest":
processErr = a.ProcessCreateManifest(item)
default:
log.Printf("[work-queue] unknown work item type: %s", item.Type)
continue
}
// Set of result IDs to drop. Accepted and rejected results are both treated
// as acknowledged: neither kind is kept in the pending list.
acknowledged := map[string]struct{}{}
for _, id := range accepted {
acknowledged[id] = struct{}{}
}
for _, item := range rejected {
acknowledged[item.ID] = struct{}{}
log.Printf("[work-queue] completion rejected for %s: %s", Truncate(item.ID, 8), item.Reason)
}

a.workMutex.Lock()
defer a.workMutex.Unlock()

if processErr != nil {
log.Printf("[work-queue] item %s failed: %v", Truncate(item.ID, 8), processErr)
if err := a.Client.CompleteWorkItem(item.ID, "failed", processErr.Error()); err != nil {
log.Printf("[work-queue] failed to mark item as failed: %v", err)
}
} else {
log.Printf("[work-queue] item %s completed", Truncate(item.ID, 8))
if err := a.Client.CompleteWorkItem(item.ID, "completed", ""); err != nil {
log.Printf("[work-queue] failed to mark item as completed: %v", err)
}
// Filter in place, reusing the existing backing array. Matching is by ID
// only; the result's Attempt is not compared.
pending := a.pendingWorkResults[:0]
for _, result := range a.pendingWorkResults {
if _, ok := acknowledged[result.ID]; !ok {
pending = append(pending, result)
}
}
a.pendingWorkResults = pending
}

// LogRejectedActiveWorkItems writes one log line per rejected active-item
// renewal, giving the shortened item ID and the rejection reason. It changes
// no agent state.
func (a *Agent) LogRejectedActiveWorkItems(rejected []agenthttp.RejectedWorkItemResult) {
	for i := range rejected {
		shortID := Truncate(rejected[i].ID, 8)
		log.Printf("[work-queue] active item renewal rejected for %s: %s", shortID, rejected[i].Reason)
	}
}

// AcceptLeasedWorkItems starts processing of a work item leased to this agent.
//
// The agent runs one work item at a time: only items[0] is considered, and it
// is started on its own goroutine only when no item is currently active.
// Every leased item that is not started is logged, so a dropped lease is
// visible in the agent log. Empty input is a no-op.
func (a *Agent) AcceptLeasedWorkItems(items []agenthttp.WorkQueueItem) {
	if len(items) == 0 {
		return
	}

	item := items[0]

	// Items past the first are never started by this call; report them
	// instead of discarding them silently.
	for i := 1; i < len(items); i++ {
		log.Printf("[work-queue] ignoring extra leased item %s: only one item is processed at a time", Truncate(items[i].ID, 8))
	}

	a.workMutex.Lock()
	if a.activeWorkItem != nil {
		log.Printf("[work-queue] ignoring leased item %s while %s is active", Truncate(item.ID, 8), Truncate(a.activeWorkItem.ID, 8))
		a.workMutex.Unlock()
		return
	}
	// item is a local copy, so the stored pointer does not alias the caller's
	// slice.
	a.activeWorkItem = &item
	a.workMutex.Unlock()

	// Started only after the lock is released; the goroutine takes workMutex
	// itself when it finishes.
	go a.processLeasedWorkItem(item)
}

// processLeasedWorkItem runs a single leased work item to completion and
// records its outcome. It is started on its own goroutine by
// AcceptLeasedWorkItems.
//
// The outcome ("completed", or "failed" with the error text) is appended to
// a.pendingWorkResults, the active slot is cleared if it still refers to this
// item and attempt, and a status report is requested.
func (a *Agent) processLeasedWorkItem(item agenthttp.WorkQueueItem) {
	log.Printf("[work-queue] processing item %s (type=%s attempt=%d)", Truncate(item.ID, 8), item.Type, item.Attempt)

	outcome := agenthttp.CompletedWorkItem{
		ID:      item.ID,
		Attempt: item.Attempt,
		Status:  "completed",
		Error:   "",
	}

	err := a.ProcessWorkItem(item)
	if err == nil {
		log.Printf("[work-queue] item %s completed", Truncate(item.ID, 8))
	} else {
		outcome.Status = "failed"
		outcome.Error = err.Error()
		log.Printf("[work-queue] item %s failed: %v", Truncate(item.ID, 8), err)
	}

	a.workMutex.Lock()
	// Release the active slot only if it still holds this exact item/attempt.
	if current := a.activeWorkItem; current != nil && current.ID == item.ID && current.Attempt == item.Attempt {
		a.activeWorkItem = nil
	}
	a.pendingWorkResults = append(a.pendingWorkResults, outcome)
	a.workMutex.Unlock()

	a.RequestStatusReport("work item " + outcome.Status)
}

// ProcessWorkItem dispatches item to the handler for its Type and returns
// that handler's error.
//
// A "deploy" item only requests a reconcile and returns nil without waiting
// for anything further. An unrecognised Type yields an error naming the type.
func (a *Agent) ProcessWorkItem(item agenthttp.WorkQueueItem) error {
	var err error

	switch kind := item.Type; kind {
	case "restart":
		err = a.ProcessRestart(item)
	case "stop":
		err = a.ProcessStop(item)
	case "deploy":
		// No handler error to report: the reconcile is only requested here.
		a.RequestReconcile("deploy work item " + Truncate(item.ID, 8))
	case "force_cleanup":
		err = a.ProcessForceCleanup(item)
	case "cleanup_volumes":
		err = a.ProcessCleanupVolumes(item)
	case "build":
		err = a.ProcessBuild(item)
	case "backup_volume":
		err = a.ProcessBackupVolume(item)
	case "restore_volume":
		err = a.ProcessRestoreVolume(item)
	case "create_manifest":
		err = a.ProcessCreateManifest(item)
	default:
		err = fmt.Errorf("unknown work item type: %s", kind)
	}

	return err
}
1 change: 1 addition & 0 deletions agent/internal/container/runtime_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func Deploy(config *DeployConfig) (*DeployResult, error) {
"--cap-add", "CHOWN",
"--cap-add", "DAC_OVERRIDE",
"--cap-add", "FOWNER",
"--cap-add", "SETPCAP",
"--cap-add", "SETUID",
"--cap-add", "SETGID",
"--cap-add", "NET_BIND_SERVICE",
Expand Down
1 change: 1 addition & 0 deletions agent/internal/container/runtime_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func Deploy(config *DeployConfig) (*DeployResult, error) {
"--cap-add", "CHOWN",
"--cap-add", "DAC_OVERRIDE",
"--cap-add", "FOWNER",
"--cap-add", "SETPCAP",
"--cap-add", "SETUID",
"--cap-add", "SETGID",
"--cap-add", "NET_BIND_SERVICE",
Expand Down
Loading
Loading