diff --git a/cmd/aggregated-apiserver/app/options/options.go b/cmd/aggregated-apiserver/app/options/options.go index 0f80327a6768..9af56845add1 100644 --- a/cmd/aggregated-apiserver/app/options/options.go +++ b/cmd/aggregated-apiserver/app/options/options.go @@ -92,8 +92,9 @@ func (o *Options) Run(ctx context.Context) error { restConfig := config.GenericConfig.ClientConfig restConfig.QPS, restConfig.Burst = o.KubeAPIQPS, o.KubeAPIBurst kubeClientSet := kubernetes.NewForConfigOrDie(restConfig) + secretLister := config.GenericConfig.SharedInformerFactory.Core().V1().Secrets().Lister() - server, err := config.Complete().New(kubeClientSet) + server, err := config.Complete().New(kubeClientSet, secretLister) if err != nil { return err } diff --git a/pkg/aggregatedapiserver/apiserver.go b/pkg/aggregatedapiserver/apiserver.go index db4db15e96ee..9772d0150e02 100644 --- a/pkg/aggregatedapiserver/apiserver.go +++ b/pkg/aggregatedapiserver/apiserver.go @@ -5,6 +5,7 @@ import ( "k8s.io/apiserver/pkg/registry/rest" genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/client-go/kubernetes" + listcorev1 "k8s.io/client-go/listers/core/v1" "k8s.io/klog/v2" clusterapis "github.com/karmada-io/karmada/pkg/apis/cluster" @@ -53,7 +54,7 @@ func (cfg *Config) Complete() CompletedConfig { return CompletedConfig{&c} } -func (c completedConfig) New(kubeClient kubernetes.Interface) (*APIServer, error) { +func (c completedConfig) New(kubeClient kubernetes.Interface, secretLister listcorev1.SecretLister) (*APIServer, error) { genericServer, err := c.GenericConfig.New("aggregated-apiserver", genericapiserver.NewEmptyDelegate()) if err != nil { return nil, err @@ -65,7 +66,7 @@ func (c completedConfig) New(kubeClient kubernetes.Interface) (*APIServer, error apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(clusterapis.GroupName, clusterscheme.Scheme, clusterscheme.ParameterCodec, clusterscheme.Codecs) - clusterStorage, err := clusterstorage.NewStorage(clusterscheme.Scheme, kubeClient, c.GenericConfig.RESTOptionsGetter) + clusterStorage, err := clusterstorage.NewStorage(clusterscheme.Scheme, kubeClient, secretLister, c.GenericConfig.RESTOptionsGetter) if err != nil { klog.Errorf("Unable to create REST storage for a resource due to %v, will die", err) return nil, err diff --git a/pkg/registry/cluster/storage/proxy.go b/pkg/registry/cluster/storage/proxy.go index 33c1d66fda84..a42ae5c21585 100644 --- a/pkg/registry/cluster/storage/proxy.go +++ b/pkg/registry/cluster/storage/proxy.go @@ -10,6 +10,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apiserver/pkg/registry/rest" "k8s.io/client-go/kubernetes" + listcorev1 "k8s.io/client-go/listers/core/v1" clusterapis "github.com/karmada-io/karmada/pkg/apis/cluster" "github.com/karmada-io/karmada/pkg/util/proxy" @@ -18,6 +19,7 @@ import ( // ProxyREST implements the proxy subresource for a Cluster. type ProxyREST struct { kubeClient kubernetes.Interface + secretLister listcorev1.SecretLister clusterGetter func(ctx context.Context, name string) (*clusterapis.Cluster, error) } @@ -54,7 +56,15 @@ func (r *ProxyREST) Connect(ctx context.Context, id string, options runtime.Obje } secretGetter := func(ctx context.Context, namespace string, name string) (*corev1.Secret, error) { - return r.kubeClient.CoreV1().Secrets(namespace).Get(ctx, name, metav1.GetOptions{}) + secret, err := r.secretLister.Secrets(namespace).Get(name) + if err != nil { + // fall back to call api server in case the cache has not been synchronized yet + secret, err = r.kubeClient.CoreV1().Secrets(namespace).Get(ctx, name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + } + return secret, nil } return proxy.ConnectCluster(ctx, cluster, proxyOpts.Path, secretGetter, responder) } diff --git a/pkg/registry/cluster/storage/proxy_test.go b/pkg/registry/cluster/storage/proxy_test.go index f35eccd76cb9..02cde71c5eb2 100644 --- a/pkg/registry/cluster/storage/proxy_test.go +++ b/pkg/registry/cluster/storage/proxy_test.go @@ -13,6 +13,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apiserver/pkg/authentication/user" "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" @@ -31,6 +32,7 @@ func TestProxyREST_Connect(t *testing.T) { defer s.Close() type fields struct { + secret *corev1.Secret kubeClient kubernetes.Interface clusterGetter func(ctx context.Context, name string) (*clusterapis.Cluster, error) } @@ -48,6 +50,10 @@ func TestProxyREST_Connect(t *testing.T) { { name: "options is invalid", fields: fields{ + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{Name: "secret", Namespace: "ns"}, + Data: map[string][]byte{clusterapis.SecretTokenKey: []byte("token")}, + }, kubeClient: fake.NewSimpleClientset(&corev1.Secret{ ObjectMeta: metav1.ObjectMeta{Name: "secret", Namespace: "ns"}, Data: map[string][]byte{clusterapis.SecretTokenKey: []byte("token")}, @@ -72,6 +78,10 @@ func TestProxyREST_Connect(t *testing.T) { { name: "cluster not found", fields: fields{ + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{Name: "secret", Namespace: "ns"}, + Data: map[string][]byte{clusterapis.SecretTokenKey: []byte("token")}, + }, kubeClient: fake.NewSimpleClientset(&corev1.Secret{ ObjectMeta: metav1.ObjectMeta{Name: "secret", Namespace: "ns"}, Data: map[string][]byte{clusterapis.SecretTokenKey: []byte("token")}, @@ -88,8 +98,9 @@ func TestProxyREST_Connect(t *testing.T) { want: "", }, { - name: "proxy success", + name: "proxy success without secret cache", fields: fields{ + secret: &corev1.Secret{}, kubeClient: fake.NewSimpleClientset(&corev1.Secret{ ObjectMeta: metav1.ObjectMeta{Name: "secret", Namespace: "ns"}, Data: map[string][]byte{clusterapis.SecretTokenKey: []byte("token")}, @@ -111,19 +122,53 @@ func TestProxyREST_Connect(t *testing.T) { wantErr: false, want: "ok", }, + { + name: "proxy success", + fields: fields{ + secret: &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{Name: "secret", Namespace: "ns"}, + Data: map[string][]byte{clusterapis.SecretTokenKey: []byte("token")}, + }, + kubeClient: fake.NewSimpleClientset(), + clusterGetter: func(_ context.Context, name string) (*clusterapis.Cluster, error) { + return &clusterapis.Cluster{ + ObjectMeta: metav1.ObjectMeta{Name: name}, + Spec: clusterapis.ClusterSpec{ + APIEndpoint: s.URL, + ImpersonatorSecretRef: &clusterapis.LocalSecretReference{Namespace: "ns", Name: "secret"}, + }, + }, nil + }, + }, + args: args{ + id: "cluster", + options: &clusterapis.ClusterProxyOptions{Path: "/proxy"}, + }, + wantErr: false, + want: "ok", + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { + stopCh := make(chan struct{}) + defer close(stopCh) + req, err := http.NewRequestWithContext(request.WithUser(request.NewContext(), &user.DefaultInfo{}), http.MethodGet, "http://127.0.0.1/xxx", nil) if err != nil { t.Fatal(err) } resp := httptest.NewRecorder() + kubeFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(tt.fields.secret), 0) r := &ProxyREST{ + secretLister: kubeFactory.Core().V1().Secrets().Lister(), kubeClient: tt.fields.kubeClient, clusterGetter: tt.fields.clusterGetter, } + + kubeFactory.Start(stopCh) + kubeFactory.WaitForCacheSync(stopCh) + h, err := r.Connect(req.Context(), tt.args.id, tt.args.options, utiltest.NewResponder(resp)) if (err != nil) != tt.wantErr { t.Errorf("Connect() error = %v, wantErr %v", err, tt.wantErr) diff --git a/pkg/registry/cluster/storage/storage.go b/pkg/registry/cluster/storage/storage.go index 237ab6fc82b4..b6af646b05ac 100644 --- a/pkg/registry/cluster/storage/storage.go +++ b/pkg/registry/cluster/storage/storage.go @@ -12,6 +12,7 @@ import ( genericregistry "k8s.io/apiserver/pkg/registry/generic/registry" "k8s.io/apiserver/pkg/registry/rest" "k8s.io/client-go/kubernetes" + listcorev1 "k8s.io/client-go/listers/core/v1" "sigs.k8s.io/structured-merge-diff/v4/fieldpath" clusterapis "github.com/karmada-io/karmada/pkg/apis/cluster" @@ -30,7 +31,7 @@ type ClusterStorage struct { } // NewStorage returns a ClusterStorage object that will work against clusters. -func NewStorage(scheme *runtime.Scheme, kubeClient kubernetes.Interface, optsGetter generic.RESTOptionsGetter) (*ClusterStorage, error) { +func NewStorage(scheme *runtime.Scheme, kubeClient kubernetes.Interface, secretLister listcorev1.SecretLister, optsGetter generic.RESTOptionsGetter) (*ClusterStorage, error) { strategy := clusterregistry.NewStrategy(scheme) store := &genericregistry.Store{ @@ -63,6 +64,7 @@ func NewStorage(scheme *runtime.Scheme, kubeClient kubernetes.Interface, optsGet Status: &StatusREST{&statusStore}, Proxy: &ProxyREST{ kubeClient: kubeClient, + secretLister: secretLister, clusterGetter: clusterRest.getCluster, }, }, nil