Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .gimps.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ sets:
- 'k8s.io/**'
- 'sigs.k8s.io/controller-runtime/**'
- 'sigs.k8s.io/controller-tools/**'
- 'sigs.k8s.io/multicluster-runtime/**'
- 'sigs.k8s.io/yaml/**'
- 'github.com/kcp-dev/client-go/**'
- 'github.com/kcp-dev/kubernetes/**'
- name: kcp
patterns:
- 'github.com/kcp-dev/kcp/**'
- 'github.com/kcp-dev/multicluster-provider/**'
- 'github.com/kcp-dev/sdk/**'
- 'github.com/kcp-dev/logicalcluster/**'
- 'github.com/kcp-dev/code-generator/**'
- 'sigs.k8s.io/multicluster-runtime/**'
- 'github.com/kcp-dev/multicluster-provider/**'
3 changes: 3 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ linters:
# Controller Runtime
- pkg: sigs.k8s.io/controller-runtime/pkg/client
alias: ctrlruntimeclient
# kcp APIs
- pkg: github.com/kcp-dev/sdk/apis/(\w+)/(v[\w\d]+)
alias: kcp$1$2
no-unaliased: true
exclusions:
generated: lax
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ install-yq:
@UNCOMPRESSED=true hack/uget.sh https://github.com/mikefarah/yq/releases/download/v{VERSION}/yq_{GOOS}_{GOARCH} yq $(YQ_VERSION) yq_*

.PHONY: install-kcp
install-kcp: UGET_CHECKSUMS=false # do not checksum because the version regularly gets overwritten in CI jobs
install-kcp: UGET_CHECKSUMS= # do not checksum because the version regularly gets overwritten in CI jobs
install-kcp:
@hack/uget.sh https://github.com/kcp-dev/kcp/releases/download/v{VERSION}/kcp_{VERSION}_{GOOS}_{GOARCH}.tar.gz kcp $(KCP_VERSION)

Expand Down
144 changes: 54 additions & 90 deletions cmd/api-syncagent/kcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@ import (
"fmt"
"regexp"

"github.com/kcp-dev/logicalcluster/v3"

"github.com/kcp-dev/api-syncagent/internal/kcp"

kcpdevv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/apis/v1alpha1"
kcpdevcore "github.com/kcp-dev/kcp/sdk/apis/core"
kcpdevcorev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1"
"github.com/kcp-dev/logicalcluster/v3"
kcpapisv1alpha1 "github.com/kcp-dev/sdk/apis/apis/v1alpha1"
kcpcore "github.com/kcp-dev/sdk/apis/core"
kcpcorev1alpha1 "github.com/kcp-dev/sdk/apis/core/v1alpha1"

"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -39,44 +38,29 @@ import (
"sigs.k8s.io/controller-runtime/pkg/cluster"
)

// The agent has two potentially different kcp clusters:
//
// endpointCluster - this is where the source of the virtual workspace URLs
// live, i.e. where the APIExport/EndpointSlice.
// managedCluster - this is where the APIExport and APIResourceSchemas
// exist that are meant to be reconciled.
//
// The managedCluster always exists, the endpointCluster only if the workspace
// for the virtual workspace source is different from the managed cluster.

// setupEndpointKcpCluster sets up a plain, non-kcp-aware ctrl-runtime Cluster object
// that is solvely used to watch whichever object holds the virtual workspace URLs,
// either the APIExport or the APIExportEndpointSlice.
func setupEndpointKcpCluster(endpoint *syncEndpoint) (cluster.Cluster, error) {
// no need for a dedicated endpoint cluster
if endpoint.EndpointSlice == nil || endpoint.EndpointSlice.Cluster == endpoint.APIExport.Cluster {
return nil, nil
}

func setupEndpointKcpCluster(endpointSlice qualifiedAPIExportEndpointSlice) (cluster.Cluster, error) {
scheme := runtime.NewScheme()

if err := kcpdevv1alpha1.AddToScheme(scheme); err != nil {
return nil, fmt.Errorf("failed to register scheme %s: %w", kcpdevv1alpha1.SchemeGroupVersion, err)
if err := kcpapisv1alpha1.AddToScheme(scheme); err != nil {
return nil, fmt.Errorf("failed to register scheme %s: %w", kcpapisv1alpha1.SchemeGroupVersion, err)
}

if err := kcpdevcorev1alpha1.AddToScheme(scheme); err != nil {
return nil, fmt.Errorf("failed to register scheme %s: %w", kcpdevcorev1alpha1.SchemeGroupVersion, err)
if err := kcpcorev1alpha1.AddToScheme(scheme); err != nil {
return nil, fmt.Errorf("failed to register scheme %s: %w", kcpcorev1alpha1.SchemeGroupVersion, err)
}

// RBAC in kcp might be very tight and might not allow to list/watch all objects;
// restrict the cache's selectors accordingly so we can still make use of caching.
byObject := map[ctrlruntimeclient.Object]cache.ByObject{
&kcpdevv1alpha1.APIExportEndpointSlice{}: {
Field: fields.SelectorFromSet(fields.Set{"metadata.name": endpoint.EndpointSlice.Name}),
&kcpapisv1alpha1.APIExportEndpointSlice{}: {
Field: fields.SelectorFromSet(fields.Set{"metadata.name": endpointSlice.Name}),
},
}

return cluster.New(endpoint.EndpointSlice.Config, func(o *cluster.Options) {
return cluster.New(endpointSlice.Config, func(o *cluster.Options) {
o.Scheme = scheme
o.Cache = cache.Options{
Scheme: scheme,
Expand All @@ -87,26 +71,26 @@ func setupEndpointKcpCluster(endpoint *syncEndpoint) (cluster.Cluster, error) {

// setupManagedKcpCluster sets up a plain, non-kcp-aware ctrl-runtime Cluster object
// that is solvely used to manage the APIExport and APIResourceSchemas.
func setupManagedKcpCluster(endpoint *syncEndpoint) (cluster.Cluster, error) {
func setupManagedKcpCluster(apiExport qualifiedAPIExport) (cluster.Cluster, error) {
scheme := runtime.NewScheme()

if err := kcpdevv1alpha1.AddToScheme(scheme); err != nil {
return nil, fmt.Errorf("failed to register scheme %s: %w", kcpdevv1alpha1.SchemeGroupVersion, err)
if err := kcpapisv1alpha1.AddToScheme(scheme); err != nil {
return nil, fmt.Errorf("failed to register scheme %s: %w", kcpapisv1alpha1.SchemeGroupVersion, err)
}

if err := kcpdevcorev1alpha1.AddToScheme(scheme); err != nil {
return nil, fmt.Errorf("failed to register scheme %s: %w", kcpdevcorev1alpha1.SchemeGroupVersion, err)
if err := kcpcorev1alpha1.AddToScheme(scheme); err != nil {
return nil, fmt.Errorf("failed to register scheme %s: %w", kcpcorev1alpha1.SchemeGroupVersion, err)
}

// RBAC in kcp might be very tight and might not allow to list/watch all objects;
// restrict the cache's selectors accordingly so we can still make use of caching.
byObject := map[ctrlruntimeclient.Object]cache.ByObject{
&kcpdevv1alpha1.APIExport{}: {
Field: fields.SelectorFromSet(fields.Set{"metadata.name": endpoint.APIExport.Name}),
&kcpapisv1alpha1.APIExport{}: {
Field: fields.SelectorFromSet(fields.Set{"metadata.name": apiExport.Name}),
},
}

return cluster.New(endpoint.APIExport.Config, func(o *cluster.Options) {
return cluster.New(apiExport.Config, func(o *cluster.Options) {
o.Scheme = scheme
o.Cache = cache.Options{
Scheme: scheme,
Expand All @@ -122,18 +106,18 @@ type qualifiedCluster struct {
}

type qualifiedAPIExport struct {
*kcpdevv1alpha1.APIExport
*kcpapisv1alpha1.APIExport
qualifiedCluster
}

type qualifiedAPIExportEndpointSlice struct {
*kcpdevv1alpha1.APIExportEndpointSlice
*kcpapisv1alpha1.APIExportEndpointSlice
qualifiedCluster
}

type syncEndpoint struct {
APIExport qualifiedAPIExport
EndpointSlice *qualifiedAPIExportEndpointSlice
EndpointSlice qualifiedAPIExportEndpointSlice
}

// resolveSyncEndpoint takes the user provided (usually via CLI flags) APIExportEndpointSliceRef and
Expand All @@ -142,14 +126,14 @@ type syncEndpoint struct {
// must point to the cluster where the APIExport lives, and vice versa for the endpoint slice;
// however the endpoint slice references an APIExport in potentially another cluster, and for this
// case the initialRestConfig will be rewritten accordingly).
func resolveSyncEndpoint(ctx context.Context, initialRestConfig *rest.Config, endpointSliceRef string, apiExportRef string) (*syncEndpoint, error) {
func resolveSyncEndpoint(ctx context.Context, initialRestConfig *rest.Config, endpointSliceRef string) (*syncEndpoint, error) {
// construct temporary, uncached client
scheme := runtime.NewScheme()
if err := kcpdevcorev1alpha1.AddToScheme(scheme); err != nil {
return nil, fmt.Errorf("failed to register scheme %s: %w", kcpdevcorev1alpha1.SchemeGroupVersion, err)
if err := kcpcorev1alpha1.AddToScheme(scheme); err != nil {
return nil, fmt.Errorf("failed to register scheme %s: %w", kcpcorev1alpha1.SchemeGroupVersion, err)
}
if err := kcpdevv1alpha1.AddToScheme(scheme); err != nil {
return nil, fmt.Errorf("failed to register scheme %s: %w", kcpdevv1alpha1.SchemeGroupVersion, err)
if err := kcpapisv1alpha1.AddToScheme(scheme); err != nil {
return nil, fmt.Errorf("failed to register scheme %s: %w", kcpapisv1alpha1.SchemeGroupVersion, err)
}

clientOpts := ctrlruntimeclient.Options{Scheme: scheme}
Expand All @@ -160,58 +144,38 @@ func resolveSyncEndpoint(ctx context.Context, initialRestConfig *rest.Config, en

se := &syncEndpoint{}

// When an endpoint ref is given, both the APIExportEndpointSlice and the APIExport must exist.
if endpointSliceRef != "" {
endpointSlice, err := resolveAPIExportEndpointSlice(ctx, client, endpointSliceRef)
if err != nil {
return nil, fmt.Errorf("failed to resolve APIExportEndpointSlice: %w", err)
}
endpointSlice.Config = initialRestConfig

// find the APIExport referenced not by the user (can't: both ref parameters to this function
// are mutually exclusive), but in the APIExportEndpointSlice.
restConfig, err := retargetRestConfig(initialRestConfig, endpointSlice.Spec.APIExport.Path)
if err != nil {
return nil, fmt.Errorf("failed to re-target the given kubeconfig to cluster %q: %w", endpointSlice.Spec.APIExport.Path, err)
}

client, err := ctrlruntimeclient.New(restConfig, clientOpts)
if err != nil {
return nil, fmt.Errorf("failed to create service reader: %w", err)
}
// First we find the APIExportEndpointSlice.
endpointSlice, err := resolveAPIExportEndpointSlice(ctx, client, endpointSliceRef)
if err != nil {
return nil, fmt.Errorf("failed to resolve APIExportEndpointSlice: %w", err)
}
endpointSlice.Config = initialRestConfig

apiExport, err := resolveAPIExport(ctx, client, endpointSlice.Spec.APIExport.Name)
if err != nil {
return nil, fmt.Errorf("failed to resolve APIExport: %w", err)
}
apiExport.Config = restConfig

se.APIExport = apiExport
se.EndpointSlice = &endpointSlice
} else { // if an export ref is given, the endpoint slice is optional (for compat with kcp <0.28)
apiExport, err := resolveAPIExport(ctx, client, apiExportRef)
if err != nil {
return nil, fmt.Errorf("failed to resolve APIExport: %w", err)
}
apiExport.Config = initialRestConfig
// Now we find the APIExport referenced in the APIExportEndpointSlice.
restConfig, err := retargetRestConfig(initialRestConfig, endpointSlice.Spec.APIExport.Path)
if err != nil {
return nil, fmt.Errorf("failed to re-target the given kubeconfig to cluster %q: %w", endpointSlice.Spec.APIExport.Path, err)
}

se.APIExport = apiExport
client, err = ctrlruntimeclient.New(restConfig, clientOpts)
if err != nil {
return nil, fmt.Errorf("failed to create service reader: %w", err)
}

// try to find an endpoint slice in the same workspace with the same name as the APIExport
endpointSlice, err := resolveAPIExportEndpointSlice(ctx, client, apiExportRef)
if ctrlruntimeclient.IgnoreNotFound(err) != nil {
return nil, fmt.Errorf("failed to resolve APIExportEndpointSlice: %w", err)
} else if err == nil {
apiExport.Config = initialRestConfig
se.EndpointSlice = &endpointSlice
}
apiExport, err := resolveAPIExport(ctx, client, endpointSlice.Spec.APIExport.Name)
if err != nil {
return nil, fmt.Errorf("failed to resolve APIExport: %w", err)
}
apiExport.Config = restConfig

se.APIExport = apiExport
se.EndpointSlice = endpointSlice

return se, nil
}

func resolveAPIExportEndpointSlice(ctx context.Context, client ctrlruntimeclient.Client, ref string) (qualifiedAPIExportEndpointSlice, error) {
endpointSlice := &kcpdevv1alpha1.APIExportEndpointSlice{}
endpointSlice := &kcpapisv1alpha1.APIExportEndpointSlice{}
key := types.NamespacedName{Name: ref}
if err := client.Get(ctx, key, endpointSlice); err != nil {
return qualifiedAPIExportEndpointSlice{}, fmt.Errorf("failed to get APIExportEndpointSlice %q: %w", ref, err)
Expand All @@ -232,7 +196,7 @@ func resolveAPIExportEndpointSlice(ctx context.Context, client ctrlruntimeclient
}

func resolveAPIExport(ctx context.Context, client ctrlruntimeclient.Client, ref string) (qualifiedAPIExport, error) {
apiExport := &kcpdevv1alpha1.APIExport{}
apiExport := &kcpapisv1alpha1.APIExport{}
key := types.NamespacedName{Name: ref}
if err := client.Get(ctx, key, apiExport); err != nil {
return qualifiedAPIExport{}, fmt.Errorf("failed to get APIExport %q: %w", ref, err)
Expand All @@ -253,13 +217,13 @@ func resolveAPIExport(ctx context.Context, client ctrlruntimeclient.Client, ref
}

func resolveCurrentCluster(ctx context.Context, client ctrlruntimeclient.Client) (logicalcluster.Name, logicalcluster.Path, error) {
lc := &kcpdevcorev1alpha1.LogicalCluster{}
lc := &kcpcorev1alpha1.LogicalCluster{}
if err := client.Get(ctx, types.NamespacedName{Name: kcp.IdentityClusterName}, lc); err != nil {
return "", logicalcluster.None, fmt.Errorf("failed to resolve current workspace: %w", err)
}

lcName := logicalcluster.From(lc)
lcPath := logicalcluster.NewPath(lc.Annotations[kcpdevcore.LogicalClusterPathAnnotationKey])
lcPath := logicalcluster.NewPath(lc.Annotations[kcpcore.LogicalClusterPathAnnotationKey])

return lcName, lcPath, nil
}
Expand Down
Loading