diff --git a/go.mod b/go.mod index 7da7a5177..2f2f40f68 100644 --- a/go.mod +++ b/go.mod @@ -75,6 +75,7 @@ require ( github.com/muesli/termenv v0.11.0 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/projectcalico/api v0.0.0-20230602153125-fb7148692637 // indirect github.com/prometheus/client_model v0.4.0 // indirect github.com/prometheus/common v0.42.0 // indirect github.com/prometheus/procfs v0.10.1 // indirect diff --git a/go.sum b/go.sum index 036617f54..1305c2a62 100644 --- a/go.sum +++ b/go.sum @@ -418,6 +418,8 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pquerna/cachecontrol v0.0.0-20171018203845-0dec1b30a021/go.mod h1:prYjPmNq4d1NPVmpShWobRqXY3q7Vp+80DqgxxUrUIA= +github.com/projectcalico/api v0.0.0-20230602153125-fb7148692637 h1:F48and+6vKJsRMl95Y/XKVik0Kwhos8YShTH9Fsdqlw= +github.com/projectcalico/api v0.0.0-20230602153125-fb7148692637/go.mod h1:d3yVTVhVHDawgeKrru/ZZD8QLEtiKQciUaAwnua47Qg= github.com/prometheus-community/pro-bing v0.3.0 h1:SFT6gHqXwbItEDJhTkzPWVqU6CLEtqEfNAPp47RUON4= github.com/prometheus-community/pro-bing v0.3.0/go.mod h1:p9dLb9zdmv+eLxWfCT6jESWuDrS+YzpPkQBgysQF8a0= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= diff --git a/pkg/routeagent_driver/handlers/calico/ippool_handler.go b/pkg/routeagent_driver/handlers/calico/ippool_handler.go new file mode 100644 index 000000000..a3e354c4d --- /dev/null +++ b/pkg/routeagent_driver/handlers/calico/ippool_handler.go @@ -0,0 +1,211 @@ +/* +SPDX-License-Identifier: Apache-2.0 + +Copyright Contributors to the Submariner project. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package calico + +import ( + "context" + "fmt" + "strings" + "sync/atomic" + + "github.com/pkg/errors" + calicoapi "github.com/projectcalico/api/pkg/apis/projectcalico/v3" + calicocs "github.com/projectcalico/api/pkg/client/clientset_generated/clientset" + "github.com/submariner-io/admiral/pkg/log" + submV1 "github.com/submariner-io/submariner/pkg/apis/submariner.io/v1" + "github.com/submariner-io/submariner/pkg/cidr" + "github.com/submariner-io/submariner/pkg/cni" + "github.com/submariner-io/submariner/pkg/event" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + errorutils "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/client-go/rest" + logf "sigs.k8s.io/controller-runtime/pkg/log" +) + +const ( + submarinerIPPool = "submariner.io/ippool" +) + +type calicoIPPoolHandler struct { + event.HandlerBase + restConfig *rest.Config + client *calicocs.Clientset + remoteEndpoints map[string]*submV1.Endpoint + isGateway atomic.Bool +} + +var logger = log.Logger{Logger: logf.Log.WithName("CalicoIPPool")} + +func NewCalicoIPPoolHandler(restConfig *rest.Config) event.Handler { + return &calicoIPPoolHandler{ + restConfig: restConfig, + remoteEndpoints: map[string]*submV1.Endpoint{}, + } +} + +func (h *calicoIPPoolHandler) GetNetworkPlugins() []string { + return []string{cni.Calico} +} + +func (h *calicoIPPoolHandler) GetName() string { + return "Calico IPPool handler" +} + +func (h *calicoIPPoolHandler) Init() error { + var err error + + h.client, err = calicocs.NewForConfig(h.restConfig) + + return errors.Wrap(err, "error initializing Calico clientset") +} + +func (h *calicoIPPoolHandler) RemoteEndpointCreated(endpoint *submV1.Endpoint) error { + h.remoteEndpoints[endpoint.Name] = endpoint + if !h.isGateway.Load() { + logger.V(log.TRACE).Info("Ignore RemoteEndpointCreated event (node isn't Gateway)") + return nil + } + + err := h.createIPPool(endpoint) + + return errors.Wrap(err, "failed to handle RemoteEndpointCreated event") +} + +func (h *calicoIPPoolHandler) RemoteEndpointRemoved(endpoint *submV1.Endpoint) error { + delete(h.remoteEndpoints, endpoint.Name) + + if !h.isGateway.Load() { + logger.V(log.TRACE).Info("Ignore RemoteEndpointRemoved event (node isn't Gateway)") + return nil + } + + err := h.deleteIPPool(endpoint) + + return errors.Wrap(err, "failed to handle RemoteEndpointRemoved event") +} + +func (h *calicoIPPoolHandler) TransitionToNonGateway() error { + logger.Info("TransitionToNonGateway") + + h.isGateway.Swap(false) + + return nil +} + +func (h *calicoIPPoolHandler) TransitionToGateway() error { + var retErrors []error + logger.Info("TransitionToGateway") + + h.isGateway.Swap(true) + + for _, endpoint := range h.remoteEndpoints { + err := h.createIPPool(endpoint) + if err != nil { + logger.Warningf("Failed to create ippool %s", endpoint.GetName()) + retErrors = append(retErrors, + errors.Wrapf(err, "error creating Calico IPPool for endpoint %q ", endpoint.GetName())) + } + } + + return errorutils.NewAggregate(retErrors) +} + +func (h *calicoIPPoolHandler) Stop(uninstall bool) error { + if !uninstall || !h.isGateway.Load() { + return nil + } + + logger.Info("Uninstalling Calico IPPools used for Submariner") + + labelSelector := labels.SelectorFromSet(map[string]string{submarinerIPPool: "true"}).String() + err := h.client.ProjectcalicoV3().IPPools().DeleteCollection(context.TODO(), metav1.DeleteOptions{}, + metav1.ListOptions{LabelSelector: labelSelector}) + + if err != nil && !apierrors.IsNotFound(err) { + return errors.Wrapf(err, "Failed to delete Calico IPPools using labelSelector %q", labelSelector) + } + + logger.Infof("Successfully delete Calico IPPools using labelSelector %q", labelSelector) + + return nil +} + +func (h *calicoIPPoolHandler) createIPPool(endpoint *submV1.Endpoint) error { + subnets := cidr.ExtractIPv4Subnets(endpoint.Spec.Subnets) + var retErrors []error + + for _, subnet := range subnets { + iPPoolObj := &calicoapi.IPPool{ + ObjectMeta: metav1.ObjectMeta{ + Name: getEndpointSubnetIPPoolName(endpoint, subnet), + Labels: map[string]string{submarinerIPPool: "true"}, + }, + Spec: calicoapi.IPPoolSpec{ + CIDR: subnet, + NATOutgoing: false, + Disabled: true, + }, + } + _, err := h.client.ProjectcalicoV3().IPPools().Create(context.TODO(), iPPoolObj, metav1.CreateOptions{}) + + if err == nil { + logger.Infof("Successfully created Calico IPPool %q", iPPoolObj.GetName()) + continue + } + + if !apierrors.IsAlreadyExists(err) { + retErrors = append(retErrors, + errors.Wrapf(err, "error creating Calico IPPool for ClusterID %q subnet %q (is Calico API server running?)", + endpoint.Spec.ClusterID, subnet)) + } + } + + return errorutils.NewAggregate(retErrors) +} + +func (h *calicoIPPoolHandler) deleteIPPool(endpoint *submV1.Endpoint) error { + subnets := cidr.ExtractIPv4Subnets(endpoint.Spec.Subnets) + var retErrors []error + + for _, subnet := range subnets { + poolName := getEndpointSubnetIPPoolName(endpoint, subnet) + + err := h.client.ProjectcalicoV3().IPPools().Delete(context.TODO(), + poolName, metav1.DeleteOptions{}) + + if err == nil { + logger.Infof("Successfully deleted Calico IPPool %q", poolName) + continue + } + + if !apierrors.IsNotFound(err) { + retErrors = append(retErrors, + errors.Wrapf(err, "error deleting Calico IPPool for ClusterID %q subnet %q (is Calico API server running?)", + endpoint.Spec.ClusterID, subnet)) + } + } + + return errorutils.NewAggregate(retErrors) +} + +func getEndpointSubnetIPPoolName(endpoint *submV1.Endpoint, subnet string) string { + return fmt.Sprintf("submariner-%s-%s", endpoint.Spec.ClusterID, strings.ReplaceAll(subnet, "/", "-")) +} diff --git a/pkg/routeagent_driver/main.go b/pkg/routeagent_driver/main.go index b5b01dd7d..9947138ac 100644 --- a/pkg/routeagent_driver/main.go +++ b/pkg/routeagent_driver/main.go @@ -39,6 +39,7 @@ import ( cniapi "github.com/submariner-io/submariner/pkg/routeagent_driver/cni" "github.com/submariner-io/submariner/pkg/routeagent_driver/constants" "github.com/submariner-io/submariner/pkg/routeagent_driver/environment" + "github.com/submariner-io/submariner/pkg/routeagent_driver/handlers/calico" "github.com/submariner-io/submariner/pkg/routeagent_driver/handlers/kubeproxy" "github.com/submariner-io/submariner/pkg/routeagent_driver/handlers/mtu" "github.com/submariner-io/submariner/pkg/routeagent_driver/handlers/ovn" @@ -102,6 +103,7 @@ func main() { cabledriver.NewXRFMCleanupHandler(), cabledriver.NewVXLANCleanup(), mtu.NewMTUHandler(env.ClusterCidr, len(env.GlobalCidr) != 0, getTCPMssValue(k8sClientSet)), + calico.NewCalicoIPPoolHandler(cfg), ); err != nil { logger.Fatalf("Error registering the handlers: %s", err.Error()) }