Skip to content

Commit

Permalink
Proxy connect get cluster impersonator secret use secretLister
Browse files Browse the repository at this point in the history
Signed-off-by: huangyanfeng <huangyanfeng1992@gmail.com>
  • Loading branch information
yanfeng1992 committed Aug 16, 2023
1 parent 1953a0e commit 8e7d11a
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 6 deletions.
3 changes: 2 additions & 1 deletion cmd/aggregated-apiserver/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,9 @@ func (o *Options) Run(ctx context.Context) error {
restConfig := config.GenericConfig.ClientConfig
restConfig.QPS, restConfig.Burst = o.KubeAPIQPS, o.KubeAPIBurst
kubeClientSet := kubernetes.NewForConfigOrDie(restConfig)
secretLister := config.GenericConfig.SharedInformerFactory.Core().V1().Secrets().Lister()

server, err := config.Complete().New(kubeClientSet)
server, err := config.Complete().New(kubeClientSet, secretLister)
if err != nil {
return err
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/aggregatedapiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"k8s.io/apiserver/pkg/registry/rest"
genericapiserver "k8s.io/apiserver/pkg/server"
"k8s.io/client-go/kubernetes"
listcorev1 "k8s.io/client-go/listers/core/v1"
"k8s.io/klog/v2"

clusterapis "github.com/karmada-io/karmada/pkg/apis/cluster"
Expand Down Expand Up @@ -53,7 +54,7 @@ func (cfg *Config) Complete() CompletedConfig {
return CompletedConfig{&c}
}

func (c completedConfig) New(kubeClient kubernetes.Interface) (*APIServer, error) {
func (c completedConfig) New(kubeClient kubernetes.Interface, secretLister listcorev1.SecretLister) (*APIServer, error) {
genericServer, err := c.GenericConfig.New("aggregated-apiserver", genericapiserver.NewEmptyDelegate())
if err != nil {
return nil, err
Expand All @@ -65,7 +66,7 @@ func (c completedConfig) New(kubeClient kubernetes.Interface) (*APIServer, error

apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(clusterapis.GroupName, clusterscheme.Scheme, clusterscheme.ParameterCodec, clusterscheme.Codecs)

clusterStorage, err := clusterstorage.NewStorage(clusterscheme.Scheme, kubeClient, c.GenericConfig.RESTOptionsGetter)
clusterStorage, err := clusterstorage.NewStorage(clusterscheme.Scheme, kubeClient, secretLister, c.GenericConfig.RESTOptionsGetter)
if err != nil {
klog.Errorf("Unable to create REST storage for a resource due to %v, will die", err)
return nil, err
Expand Down
12 changes: 11 additions & 1 deletion pkg/registry/cluster/storage/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/client-go/kubernetes"
listcorev1 "k8s.io/client-go/listers/core/v1"

clusterapis "github.com/karmada-io/karmada/pkg/apis/cluster"
"github.com/karmada-io/karmada/pkg/util/proxy"
Expand All @@ -18,6 +19,7 @@ import (
// ProxyREST implements the proxy subresource for a Cluster.
type ProxyREST struct {
kubeClient kubernetes.Interface
secretLister listcorev1.SecretLister
clusterGetter func(ctx context.Context, name string) (*clusterapis.Cluster, error)
}

Expand Down Expand Up @@ -54,7 +56,15 @@ func (r *ProxyREST) Connect(ctx context.Context, id string, options runtime.Obje
}

secretGetter := func(ctx context.Context, namespace string, name string) (*corev1.Secret, error) {
return r.kubeClient.CoreV1().Secrets(namespace).Get(ctx, name, metav1.GetOptions{})
secret, err := r.secretLister.Secrets(namespace).Get(name)
if err != nil {
// fall back to call api server in case the cache has not been synchronized yet
secret, err = r.kubeClient.CoreV1().Secrets(namespace).Get(ctx, name, metav1.GetOptions{})
if err != nil {
return nil, err
}
}
return secret, nil
}
return proxy.ConnectCluster(ctx, cluster, proxyOpts.Path, secretGetter, responder)
}
Expand Down
47 changes: 46 additions & 1 deletion pkg/registry/cluster/storage/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"

Expand All @@ -31,6 +32,7 @@ func TestProxyREST_Connect(t *testing.T) {
defer s.Close()

type fields struct {
secret *corev1.Secret
kubeClient kubernetes.Interface
clusterGetter func(ctx context.Context, name string) (*clusterapis.Cluster, error)
}
Expand All @@ -48,6 +50,10 @@ func TestProxyREST_Connect(t *testing.T) {
{
name: "options is invalid",
fields: fields{
secret: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{Name: "secret", Namespace: "ns"},
Data: map[string][]byte{clusterapis.SecretTokenKey: []byte("token")},
},
kubeClient: fake.NewSimpleClientset(&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{Name: "secret", Namespace: "ns"},
Data: map[string][]byte{clusterapis.SecretTokenKey: []byte("token")},
Expand All @@ -72,6 +78,10 @@ func TestProxyREST_Connect(t *testing.T) {
{
name: "cluster not found",
fields: fields{
secret: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{Name: "secret", Namespace: "ns"},
Data: map[string][]byte{clusterapis.SecretTokenKey: []byte("token")},
},
kubeClient: fake.NewSimpleClientset(&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{Name: "secret", Namespace: "ns"},
Data: map[string][]byte{clusterapis.SecretTokenKey: []byte("token")},
Expand All @@ -88,8 +98,9 @@ func TestProxyREST_Connect(t *testing.T) {
want: "",
},
{
name: "proxy success",
name: "proxy success without secret cache",
fields: fields{
secret: &corev1.Secret{},
kubeClient: fake.NewSimpleClientset(&corev1.Secret{
ObjectMeta: metav1.ObjectMeta{Name: "secret", Namespace: "ns"},
Data: map[string][]byte{clusterapis.SecretTokenKey: []byte("token")},
Expand All @@ -111,19 +122,53 @@ func TestProxyREST_Connect(t *testing.T) {
wantErr: false,
want: "ok",
},
{
name: "proxy success",
fields: fields{
secret: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{Name: "secret", Namespace: "ns"},
Data: map[string][]byte{clusterapis.SecretTokenKey: []byte("token")},
},
kubeClient: fake.NewSimpleClientset(),
clusterGetter: func(_ context.Context, name string) (*clusterapis.Cluster, error) {
return &clusterapis.Cluster{
ObjectMeta: metav1.ObjectMeta{Name: name},
Spec: clusterapis.ClusterSpec{
APIEndpoint: s.URL,
ImpersonatorSecretRef: &clusterapis.LocalSecretReference{Namespace: "ns", Name: "secret"},
},
}, nil
},
},
args: args{
id: "cluster",
options: &clusterapis.ClusterProxyOptions{Path: "/proxy"},
},
wantErr: false,
want: "ok",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
stopCh := make(chan struct{})
defer close(stopCh)

req, err := http.NewRequestWithContext(request.WithUser(request.NewContext(), &user.DefaultInfo{}), http.MethodGet, "http://127.0.0.1/xxx", nil)
if err != nil {
t.Fatal(err)
}
resp := httptest.NewRecorder()

kubeFactory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(tt.fields.secret), 0)
r := &ProxyREST{
secretLister: kubeFactory.Core().V1().Secrets().Lister(),
kubeClient: tt.fields.kubeClient,
clusterGetter: tt.fields.clusterGetter,
}

kubeFactory.Start(stopCh)
kubeFactory.WaitForCacheSync(stopCh)

h, err := r.Connect(req.Context(), tt.args.id, tt.args.options, utiltest.NewResponder(resp))
if (err != nil) != tt.wantErr {
t.Errorf("Connect() error = %v, wantErr %v", err, tt.wantErr)
Expand Down
4 changes: 3 additions & 1 deletion pkg/registry/cluster/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/client-go/kubernetes"
listcorev1 "k8s.io/client-go/listers/core/v1"
"sigs.k8s.io/structured-merge-diff/v4/fieldpath"

clusterapis "github.com/karmada-io/karmada/pkg/apis/cluster"
Expand All @@ -30,7 +31,7 @@ type ClusterStorage struct {
}

// NewStorage returns a ClusterStorage object that will work against clusters.
func NewStorage(scheme *runtime.Scheme, kubeClient kubernetes.Interface, optsGetter generic.RESTOptionsGetter) (*ClusterStorage, error) {
func NewStorage(scheme *runtime.Scheme, kubeClient kubernetes.Interface, secretLister listcorev1.SecretLister, optsGetter generic.RESTOptionsGetter) (*ClusterStorage, error) {
strategy := clusterregistry.NewStrategy(scheme)

store := &genericregistry.Store{
Expand Down Expand Up @@ -63,6 +64,7 @@ func NewStorage(scheme *runtime.Scheme, kubeClient kubernetes.Interface, optsGet
Status: &StatusREST{&statusStore},
Proxy: &ProxyREST{
kubeClient: kubeClient,
secretLister: secretLister,
clusterGetter: clusterRest.getCluster,
},
}, nil
Expand Down

0 comments on commit 8e7d11a

Please sign in to comment.