diff --git a/server/pkg/auth.go b/server/pkg/auth.go index c9410df..4a5ee0d 100644 --- a/server/pkg/auth.go +++ b/server/pkg/auth.go @@ -84,8 +84,16 @@ func (s *authServer) findTaskByRequest(host string, headers map[string]string) ( defer s.mx.RUnlock() var hash string - if routerHeaderValue, ok := headers[routerHeaderName]; ok { + if routerHeaderValue, ok := headers[idRouterHeaderName]; ok { hash = routerHeaderValue + } else if operationID, ok := headers[operationIDRouterHeaderName]; ok { + hash = taskHash(operationID, headers[taskNameRouterHeaderName], headers[serviceRouteHeaderName]) + } else if operationAlias, ok := headers[operationAliasRouterHeaderName]; ok { + operationID, ok := s.operationAliasToID[operationAlias] + if !ok { + return nil, fmt.Errorf("operation by alias %q from header was not found", operationAlias) + } + hash = taskHash(operationID, headers[taskNameRouterHeaderName], headers[serviceRouteHeaderName]) } else if host != "" { subdomain := strings.Split(host, ".")[0] if operationAlias, taskName, service, ok := tryParseAliasSubdomain(subdomain); ok { @@ -93,12 +101,12 @@ func (s *authServer) findTaskByRequest(host string, headers map[string]string) ( if !ok { return nil, fmt.Errorf("operation by alias %q from subdomain was not found", operationAlias) } - hash = (&Task{operationID: operationID, taskName: taskName, service: service}).Hash() + hash = taskHash(operationID, taskName, service) } else { hash = subdomain } } else { - return nil, fmt.Errorf("authority (host) or %s headers are missing in request", routerHeaderName) + return nil, fmt.Errorf("authority (host) or %s headers are missing in request", idRouterHeaderName) } if task, ok := s.hashToTasks[hash]; !ok { @@ -215,3 +223,7 @@ var ( }, } ) + +func taskHash(operationID, taskName, service string) string { + return (&Task{operationID: operationID, taskName: taskName, service: service}).Hash() +} diff --git a/server/pkg/auth_test.go b/server/pkg/auth_test.go index 1cf149b..91f9dc1 100644 --- a/server/pkg/auth_test.go +++ b/server/pkg/auth_test.go @@ -86,7 +86,93 @@ func TestFindTaskByRequest(t *testing.T) { expectedID: task1.operationID, }, - // Source 2: Alias-based subdomain (format: alias-taskname-service) + // Source 2: (operation-id, task-name, service) headers + { + name: "operation-id headers - valid task", + host: "ignored.example.com", + headers: map[string]string{ + "x-yt-taskproxy-operation-id": task1.operationID, + "x-yt-taskproxy-task-name": task1.taskName, + "x-yt-taskproxy-service": task1.service, + }, + expectedID: task1.operationID, + }, + { + name: "operation-id headers - unknown task", + host: "ignored.example.com", + headers: map[string]string{ + "x-yt-taskproxy-operation-id": "op-unknown", + "x-yt-taskproxy-task-name": "worker", + "x-yt-taskproxy-service": "api", + }, + errorMsg: "no entry for hash", + }, + { + name: "operation-id headers - takes precedence over host", + host: task3Hash + ".example.com", + headers: map[string]string{ + "x-yt-taskproxy-operation-id": task1.operationID, + "x-yt-taskproxy-task-name": task1.taskName, + "x-yt-taskproxy-service": task1.service, + }, + expectedID: task1.operationID, + }, + { + name: "operation-id headers - id header takes precedence over operation-id headers", + host: "ignored.example.com", + headers: map[string]string{ + "x-yt-taskproxy-id": task1Hash, + "x-yt-taskproxy-operation-id": task3.operationID, + "x-yt-taskproxy-task-name": task3.taskName, + "x-yt-taskproxy-service": task3.service, + }, + expectedID: task1.operationID, + }, + + // Source 3: (operation-alias, task-name, service) headers + { + name: "operation-alias headers - valid alias", + host: "ignored.example.com", + headers: map[string]string{ + "x-yt-taskproxy-operation-alias": task2.operationAlias, + "x-yt-taskproxy-task-name": task2.taskName, + "x-yt-taskproxy-service": task2.service, + }, + expectedID: task2.operationID, + }, + { + name: "operation-alias headers - unknown alias", + host: "ignored.example.com", + headers: map[string]string{ + "x-yt-taskproxy-operation-alias": "unknownalias", + "x-yt-taskproxy-task-name": "worker", + "x-yt-taskproxy-service": "api", + }, + errorMsg: "operation by alias \"unknownalias\" from header was not found", + }, + { + name: "operation-alias headers - takes precedence over host", + host: task3Hash + ".example.com", + headers: map[string]string{ + "x-yt-taskproxy-operation-alias": task2.operationAlias, + "x-yt-taskproxy-task-name": task2.taskName, + "x-yt-taskproxy-service": task2.service, + }, + expectedID: task2.operationID, + }, + { + name: "operation-alias headers - operation-id headers take precedence", + host: "ignored.example.com", + headers: map[string]string{ + "x-yt-taskproxy-operation-id": task1.operationID, + "x-yt-taskproxy-operation-alias": task2.operationAlias, + "x-yt-taskproxy-task-name": task1.taskName, + "x-yt-taskproxy-service": task1.service, + }, + expectedID: task1.operationID, + }, + + // Source 4: Alias-based subdomain (format: alias-taskname-service) { name: "alias subdomain - valid alias", host: "myalias-master-ui.example.com", @@ -111,7 +197,7 @@ func TestFindTaskByRequest(t *testing.T) { expectedID: task2.operationID, }, - // Source 3: Direct hash from subdomain (fallback) + // Source 5: Direct hash from subdomain (fallback) { name: "direct hash subdomain - valid hash", host: task1Hash + ".example.com", diff --git a/server/pkg/xds.go b/server/pkg/xds.go index e367bcf..0b5176f 100644 --- a/server/pkg/xds.go +++ b/server/pkg/xds.go @@ -4,6 +4,7 @@ import ( "fmt" "log" "net" + "sort" "time" "google.golang.org/grpc" @@ -37,7 +38,13 @@ import ( const ( extAuthClusterName = "extAuthz" - routerHeaderName = "x-yt-taskproxy-id" + + idRouterHeaderName = "x-yt-taskproxy-id" // hash + + operationIDRouterHeaderName = "x-yt-taskproxy-operation-id" + operationAliasRouterHeaderName = "x-yt-taskproxy-operation-alias" + taskNameRouterHeaderName = "x-yt-taskproxy-task-name" + serviceRouteHeaderName = "x-yt-taskproxy-service" ) func ServeGRPC(s serverv3.Server, authServer *authServer) error { @@ -99,25 +106,38 @@ func makeSnapshot(hashToTask map[string]Task, version string, baseDomain string, Action: action, }}, }) - // ... or by custom header + // ... or by custom header(-s) defaultVhostRoutes = append(defaultVhostRoutes, &routev3.Route{ Match: &routev3.RouteMatch{ PathSpecifier: &routev3.RouteMatch_Prefix{Prefix: "/"}, - Headers: []*routev3.HeaderMatcher{ - { - Name: routerHeaderName, - HeaderMatchSpecifier: &routev3.HeaderMatcher_StringMatch{ - StringMatch: &matcherv3.StringMatcher{ - MatchPattern: &matcherv3.StringMatcher_Exact{ - Exact: hash, - }, - }, - }, - }, - }, + Headers: makeHeaderMatchers(map[string]string{idRouterHeaderName: hash}), }, Action: action, }) + defaultVhostRoutes = append(defaultVhostRoutes, &routev3.Route{ + Match: &routev3.RouteMatch{ + PathSpecifier: &routev3.RouteMatch_Prefix{Prefix: "/"}, + Headers: makeHeaderMatchers(map[string]string{ + operationIDRouterHeaderName: task.operationID, + taskNameRouterHeaderName: task.taskName, + serviceRouteHeaderName: task.service, + }), + }, + Action: action, + }) + if task.operationAlias != "" { + defaultVhostRoutes = append(defaultVhostRoutes, &routev3.Route{ + Match: &routev3.RouteMatch{ + PathSpecifier: &routev3.RouteMatch_Prefix{Prefix: "/"}, + Headers: makeHeaderMatchers(map[string]string{ + operationAliasRouterHeaderName: task.operationAlias, + taskNameRouterHeaderName: task.taskName, + serviceRouteHeaderName: task.service, + }), + }, + Action: action, + }) + } } defaultVhostRoutes = append(defaultVhostRoutes, &routev3.Route{ @@ -320,3 +340,28 @@ func mustAny(m proto.Message) *anypb.Any { } return a } + +func makeHeaderMatchers(headers map[string]string) []*routev3.HeaderMatcher { + // Sort keys for deterministic order + keys := make([]string, 0, len(headers)) + for name := range headers { + keys = append(keys, name) + } + sort.Strings(keys) + + matchers := make([]*routev3.HeaderMatcher, 0, len(headers)) + for _, name := range keys { + value := headers[name] + matchers = append(matchers, &routev3.HeaderMatcher{ + Name: name, + HeaderMatchSpecifier: &routev3.HeaderMatcher_StringMatch{ + StringMatch: &matcherv3.StringMatcher{ + MatchPattern: &matcherv3.StringMatcher_Exact{ + Exact: value, + }, + }, + }, + }) + } + return matchers +} diff --git a/server/pkg/xds_test.go b/server/pkg/xds_test.go new file mode 100644 index 0000000..2ecb9d5 --- /dev/null +++ b/server/pkg/xds_test.go @@ -0,0 +1,235 @@ +package pkg + +import ( + "sort" + "testing" + + resourcev3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/encoding/protojson" + "gopkg.in/yaml.v3" +) + +func TestMakeSnapshot(t *testing.T) { + hashToTask := map[string]Task{ + "abc12345": { + operationID: "op123", + operationAlias: "myalias", + taskName: "worker", + service: "api", + protocol: GRPC, + jobs: []HostPort{ + {host: "10.0.0.1", port: 8080}, + {host: "10.0.0.2", port: 8080}, + }, + }, + } + + snapshot, err := makeSnapshot(hashToTask, "v1", "example.com", false, true) + require.NoError(t, err) + + // Convert snapshot to a structured map for YAML comparison + snapshotMap := make(map[string]any) + + // Get clusters + clusterResources := snapshot.GetResources(resourcev3.ClusterType) + clusterNames := make([]string, 0, len(clusterResources)) + for name := range clusterResources { + clusterNames = append(clusterNames, name) + } + sort.Strings(clusterNames) // Sort for deterministic output + + clusters := make([]map[string]any, 0, len(clusterResources)) + for _, name := range clusterNames { + res := clusterResources[name] + jsonBytes, err := protojson.Marshal(res) + require.NoError(t, err) + + var clusterMap map[string]any + err = yaml.Unmarshal(jsonBytes, &clusterMap) + require.NoError(t, err) + clusters = append(clusters, clusterMap) + } + snapshotMap["clusters"] = clusters + + // Get listener + listenerResources := snapshot.GetResources(resourcev3.ListenerType) + require.Len(t, listenerResources, 1) + + for _, res := range listenerResources { + jsonBytes, err := protojson.Marshal(res) + require.NoError(t, err) + + var listenerMap map[string]any + err = yaml.Unmarshal(jsonBytes, &listenerMap) + require.NoError(t, err) + snapshotMap["listener"] = listenerMap + break + } + + resultYAML, err := yaml.Marshal(snapshotMap) + require.NoError(t, err) + + expectedYAML := `clusters: +- connectTimeout: 2s + loadAssignment: + clusterName: extAuthz + endpoints: + - lbEndpoints: + - endpoint: + address: + socketAddress: + address: 127.0.0.1 + portValue: 9090 + name: extAuthz + type: STATIC + typedExtensionProtocolOptions: + envoy.extensions.upstreams.http.v3.HttpProtocolOptions: + '@type': type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions + explicitHttpConfig: + http2ProtocolOptions: {} +- connectTimeout: 2s + loadAssignment: + clusterName: op123-worker-api-0 + endpoints: + - lbEndpoints: + - endpoint: + address: + socketAddress: + address: 10.0.0.1 + portValue: 8080 + name: op123-worker-api-0 + type: STRICT_DNS + typedExtensionProtocolOptions: + envoy.extensions.upstreams.http.v3.HttpProtocolOptions: + '@type': type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions + explicitHttpConfig: + http2ProtocolOptions: {} +- connectTimeout: 2s + loadAssignment: + clusterName: op123-worker-api-1 + endpoints: + - lbEndpoints: + - endpoint: + address: + socketAddress: + address: 10.0.0.2 + portValue: 8080 + name: op123-worker-api-1 + type: STRICT_DNS + typedExtensionProtocolOptions: + envoy.extensions.upstreams.http.v3.HttpProtocolOptions: + '@type': type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions + explicitHttpConfig: + http2ProtocolOptions: {} +listener: + accessLog: + - name: envoy.access_loggers.stderr + typedConfig: + '@type': type.googleapis.com/envoy.extensions.access_loggers.stream.v3.StderrAccessLog + address: + socketAddress: + address: 0.0.0.0 + portValue: 8080 + filterChains: + - filters: + - name: envoy.filters.network.http_connection_manager + typedConfig: + '@type': type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager + http2ProtocolOptions: {} + httpFilters: + - name: envoy.filters.http.ext_authz + typedConfig: + '@type': type.googleapis.com/envoy.extensions.filters.http.ext_authz.v3.ExtAuthz + grpcService: + envoyGrpc: + clusterName: extAuthz + timeout: 0.800s + - name: envoy.filters.http.router + typedConfig: + '@type': type.googleapis.com/envoy.extensions.filters.http.router.v3.Router + routeConfig: + name: local_routes + virtualHosts: + - domains: + - abc12345.example.com + - myalias-worker-api.example.com + name: op123-worker-api + routes: + - match: + prefix: / + route: + weightedClusters: + clusters: + - name: op123-worker-api-0 + weight: 1 + - name: op123-worker-api-1 + weight: 1 + - domains: + - '*' + name: vhost_default + routes: + - match: + headers: + - name: x-yt-taskproxy-id + stringMatch: + exact: abc12345 + prefix: / + route: + weightedClusters: + clusters: + - name: op123-worker-api-0 + weight: 1 + - name: op123-worker-api-1 + weight: 1 + - match: + headers: + - name: x-yt-taskproxy-operation-id + stringMatch: + exact: op123 + - name: x-yt-taskproxy-service + stringMatch: + exact: api + - name: x-yt-taskproxy-task-name + stringMatch: + exact: worker + prefix: / + route: + weightedClusters: + clusters: + - name: op123-worker-api-0 + weight: 1 + - name: op123-worker-api-1 + weight: 1 + - match: + headers: + - name: x-yt-taskproxy-operation-alias + stringMatch: + exact: myalias + - name: x-yt-taskproxy-service + stringMatch: + exact: api + - name: x-yt-taskproxy-task-name + stringMatch: + exact: worker + prefix: / + route: + weightedClusters: + clusters: + - name: op123-worker-api-0 + weight: 1 + - name: op123-worker-api-1 + weight: 1 + - directResponse: + body: + inlineString: no such task + status: 404 + match: + prefix: / + statPrefix: ingress_http + name: listener_0 +` + + assert.YAMLEq(t, expectedYAML, string(resultYAML)) +}