Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 0 additions & 84 deletions .github/k8s/cert-sync-cronjob-template.yaml

This file was deleted.

20 changes: 0 additions & 20 deletions .github/k8s/google-public-ca-issuer-template.yaml

This file was deleted.

68 changes: 68 additions & 0 deletions .github/k8s/sam-hub-template.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ metadata:
spec:
clusterIP: None
ports:
- port: 9090
targetPort: 9090
protocol: TCP
name: http
- port: 4501
targetPort: 4501
protocol: TCP
Expand Down Expand Up @@ -79,6 +83,7 @@ spec:
- "--external-multiaddr=/ip4/$(HOST_IP)/tcp/4501"
- "--external-multiaddr=/ip4/$(HOST_IP)/udp/4501/quic-v1"
- "--external-multiaddr=/ip4/$(HOST_IP)/tcp/4501/wss"
- "--external-multiaddr=/dnsaddr/bootstrap.${ENV_NAME}.sam-mesh.dev"
# 1. Trust Dex for humans/Google/GitHub. 2. Trust GKE for internal Canaries.
- "--issuer=https://auth.sam-mesh.dev,https://container.googleapis.com/v1/projects/${GCP_PROJECT_ID}/locations/${CLUSTER_REGION}/clusters/${CLUSTER_NAME}"
# 1. Dex CLI client ID. 2. Canary projected volume audience.
Expand Down Expand Up @@ -132,3 +137,66 @@ spec:
interval: 30s
path: /metrics
scheme: http
---
apiVersion: networking.gke.io/v1
kind: HealthCheckPolicy
metadata:
name: sam-hub-healthcheck-${ENV_NAME}
namespace: ${NAMESPACE}
spec:
default:
checkIntervalSec: 15
timeoutSec: 5
healthyThreshold: 1
unhealthyThreshold: 2
config:
type: HTTP
httpHealthCheck:
requestPath: /healthz
targetRef:
group: ""
kind: Service
name: sam-hub-${ENV_NAME}
---
apiVersion: gateway.networking.k8s.io/v1
kind: Gateway
metadata:
name: sam-hub-gateway-${ENV_NAME}
namespace: ${NAMESPACE}
annotations:
networking.gke.io/certmap: sam-hub-cert-map-${ENV_NAME}
spec:
gatewayClassName: gke-l7-global-external-managed
addresses:
- type: NamedAddress
value: sam-hub-ip-${ENV_NAME}
listeners:
- name: https
protocol: HTTPS
port: 443
allowedRoutes:
namespaces:
from: Same
---
apiVersion: gateway.networking.k8s.io/v1
kind: HTTPRoute
metadata:
name: sam-hub-route-${ENV_NAME}
namespace: ${NAMESPACE}
spec:
parentRefs:
- name: sam-hub-gateway-${ENV_NAME}
hostnames:
- "${ENV_NAME}.sam-mesh.dev"
rules:
- matches:
- path:
type: Exact
value: /register
- path:
type: Exact
value: /info
backendRefs:
- name: sam-hub-${ENV_NAME}
port: 9090

14 changes: 0 additions & 14 deletions .github/k8s/wildcard-cert-template.yaml

This file was deleted.

8 changes: 8 additions & 0 deletions cmd/sam-hub/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ func handleRegisterHTTP(h *Hub) http.HandlerFunc {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
if r.URL.Path != "/register" {
http.NotFound(w, r)
return
}

body, err := io.ReadAll(r.Body)
if err != nil {
Expand Down Expand Up @@ -111,6 +115,10 @@ func handleInfoHTTP(h *Hub) http.HandlerFunc {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
if r.URL.Path != "/info" {
http.NotFound(w, r)
return
}

// Find the OIDC issuer deterministically by sorting the map keys
issuer := ""
Expand Down
96 changes: 77 additions & 19 deletions cmd/sam-node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ func NewSamNode(ctx context.Context, privKey crypto.PrivKey, hubPubKey ed25519.P
logger.Warnf("[AuthN] Fallback auth failed with %s: %v", addr, err)
} else {
logger.Infof("[AuthN] Fallback connection successful!")
authenticated = true
break
}
}
Expand All @@ -319,6 +320,10 @@ func NewSamNode(ctx context.Context, privKey crypto.PrivKey, hubPubKey ed25519.P
logger.Warnf("[AuthN] Failed to fetch updated addresses via HTTP: %v", err)
}
}

if !authenticated {
return nil, fmt.Errorf("failed to authenticate with any hub: all connection attempts failed")
}
}

// Initialize Gossipsub for Hub Events
Expand Down Expand Up @@ -374,9 +379,47 @@ func NewSamNode(ctx context.Context, privKey crypto.PrivKey, hubPubKey ed25519.P
return nil, fmt.Errorf("failed to start ingress server: %w", err)
}

// Start connection monitor
node.startConnectionMonitor(ctx, 2*time.Minute, 1*time.Minute, 3)

return node, nil
}

func (n *SamNode) startConnectionMonitor(ctx context.Context, bootstrapDuration, checkInterval time.Duration, maxFailures int) {
go func() {
// Wait for initial bootstrap to complete
select {
case <-ctx.Done():
return
case <-time.After(bootstrapDuration):
}

ticker := time.NewTicker(checkInterval)
defer ticker.Stop()

consecutiveFailures := 0
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if len(n.Host.Network().Peers()) == 0 {
consecutiveFailures++
logger.Warnf("Not connected to the mesh (0 peers). Consecutive failures: %d/%d", consecutiveFailures, maxFailures)
if consecutiveFailures >= maxFailures {
logger.Fatalf("Not connected to the mesh (0 peers) for %d consecutive checks. Exiting to avoid network partition.", maxFailures)
}
} else {
if consecutiveFailures > 0 {
logger.Infof("Reconnected to the mesh. Resetting failure count.")
}
consecutiveFailures = 0
}
}
}
}()
}

func (n *SamNode) RegisterStaticServices(ctx context.Context, services []api.ServiceConfig) error {
// Wait for DHT to be ready (size > 0)
// This avoids failure if we try to register immediately after enrollment
Expand Down Expand Up @@ -502,8 +545,11 @@ func (n *SamNode) StartRenewalLoop(ctx context.Context, issuerURL, clientID, cli
renewAfter = duration - RenewalBuffer
} else if duration > 0 {
renewAfter = duration / 2
if renewAfter < 2*time.Second {
renewAfter = 2 * time.Second
}
} else {
renewAfter = 1 * time.Minute
renewAfter = 1 * time.Second
}
Comment on lines 546 to 553

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

If duration is positive but very small (e.g., less than a few seconds), renewAfter = duration / 2 will result in extremely small durations (milliseconds or microseconds). This causes the renewal loop to busy-spin and spam the OIDC provider or hub with a rapid succession of requests as the expiration time approaches.

To prevent this, enforce a minimum floor for renewAfter (e.g., 2 seconds) when duration is small.

Suggested change
} else if duration > 0 {
renewAfter = duration / 2
} else {
renewAfter = 1 * time.Minute
renewAfter = 1 * time.Second
}
} else if duration > 4*time.Second {
renewAfter = duration / 2
} else {
renewAfter = 2 * time.Second
}

}

Expand All @@ -517,33 +563,45 @@ func (n *SamNode) StartRenewalLoop(ctx context.Context, issuerURL, clientID, cli
case <-timer.C:
fmt.Println("Renewing enrollment...")
var newJWT string
var fetchErr error

if issuerURL != "" {
tokenURL, err := n.DiscoverTokenURL(ctx, issuerURL)
if err != nil {
fmt.Printf("Failed to discover OIDC endpoints for renewal: %v\n", err)
continue
}
newJWT, err = n.FetchJWT(ctx, tokenURL, clientID, clientSecret)
if err != nil {
fmt.Printf("Failed to fetch JWT for renewal: %v\n", err)
continue
fetchErr = fmt.Errorf("failed to discover OIDC endpoints for renewal: %w", err)
} else {
newJWT, fetchErr = n.FetchJWT(ctx, tokenURL, clientID, clientSecret)
if fetchErr != nil {
fetchErr = fmt.Errorf("failed to fetch JWT for renewal: %w", fetchErr)
}
}
} else if jwtPath != "" {
data, err := os.ReadFile(jwtPath)
if err != nil {
fmt.Printf("Failed to read JWT file for renewal: %v\n", err)
continue
fetchErr = fmt.Errorf("failed to read JWT file for renewal: %w", err)
} else {
newJWT = strings.TrimSpace(string(data))
}
newJWT = strings.TrimSpace(string(data))
} else {
fmt.Println("No credentials available for renewal.")
continue
fetchErr = fmt.Errorf("no credentials available for renewal")
}

if fetchErr == nil {
fetchErr = n.Enroll(ctx, newJWT)
}

if err := n.Enroll(ctx, newJWT); err != nil {
fmt.Printf("Renewal enrollment failed: %v\n", err)
if fetchErr != nil {
logger.Errorf("Renewal failed: %v", fetchErr)

// Check if we are already expired and if so, die to avoid a split brain
exp, loadErr := n.Store.LoadIdentityExpiration()
if loadErr == nil && exp > 0 {
if time.Now().After(time.Unix(exp, 0)) {
logger.Fatalf("Identity expired and renewal failed. Exiting to avoid network partition.")
}
}
} else {
fmt.Println("Enrollment renewed successfully.")
logger.Infof("Enrollment renewed successfully.")
}
}
}
Expand Down Expand Up @@ -757,7 +815,7 @@ func (n *SamNode) startDiscovery(ctx context.Context, meshID string, interval ti

if n.Host.Network().Connectedness(p.ID) != network.Connected {
logger.Infof("[Discovery] Found peer not connected via DHT: %s", p.ID)

// Log the addresses returned by DHT to confirm they include p2p-circuit paths
for _, addr := range p.Addrs {
logger.Infof("[Discovery] Peer %s advertised address: %s", p.ID, addr)
Expand Down Expand Up @@ -939,11 +997,11 @@ func (n *SamNode) findProvidersByCID(ctx context.Context, c cid.Cid) ([]peer.Add
for _, addr := range p.Addrs {
logger.Infof("[Discovery] Provider %s advertised address: %s", p.ID, addr)
}

if len(p.Addrs) > 0 {
n.Host.Peerstore().AddAddrs(p.ID, p.Addrs, peerstore.TempAddrTTL)
}

providers = append(providers, p)
}
return providers, nil
Expand Down
Loading
Loading