-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
208 lines (167 loc) · 4.66 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
package main
import (
"encoding/json"
"flag"
"fmt"
"net/http"
"os"
"os/signal"
"sort"
"strings"
"syscall"
"time"
"github.com/gleez/peer-watch/peerwatch"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog"
)
// Package-level configuration (build metadata and command-line flags) and the
// mutable peer state shared by main, webHandler and monitorPodsFn.
var (
// Version and Revision are build metadata printed by the --version flag.
// LDFLAGS should overwrite these variables at build time; when they are not
// set they stay empty strings.
Version string
Revision string
// labelSelector is the pod label selector handed to peerwatch.Init; main
// overrides it with POD_CACHE_LABEL_SELECTOR when that variable is non-empty.
labelSelector = flag.String("label-selector", "app=peer-watch", "The label to watch against pods")
// namespace is the namespace handed to peerwatch.Init; main overrides it with
// POD_CACHE_NAMESPACE when that variable is non-empty.
namespace = flag.String("namespace", apiv1.NamespaceDefault, "The Kubernetes namespace for the pods")
// inCluster and kubeconfig select how makeClient builds its REST config;
// validateFlags requires at least one of them to be set.
inCluster = flag.Bool("use-cluster-credentials", false, "Should this request use cluster credentials?")
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
// addr is the listen address of the HTTP status endpoint; when empty, main
// starts no server and blocks forever instead.
addr = flag.String("http", ":4040", "If non-empty, stand up a simple webserver that reports the peer state")
// debug is passed through to peerwatch.Init and enables one extra log line in main.
debug = flag.Bool("debug", false, "Should enable debug?")
// versionFlag makes main print Version/Revision and exit immediately.
versionFlag = flag.Bool("version", false, "display version and exit")
// NOTE(review): selfUrl is not referenced anywhere in the visible part of this
// file — confirm whether it is still needed.
selfUrl string
// urlSet holds the currently known peers; it is written by main and
// monitorPodsFn and read by webHandler.
// NOTE(review): it is a plain map with no locking — confirm how concurrent
// access from the HTTP handler and the watch callback is meant to be guarded.
urlSet = make(UrlSet)
// initialized is set to true by main once the initial peer set has been
// stored; monitorPodsFn polls it before applying any change.
initialized bool = false
)
// makeClient builds the Kubernetes clientset used to talk to the API server.
//
// The REST config comes from the in-cluster environment when
// --use-cluster-credentials is set; otherwise it is loaded from the file named
// by --kubeconfig, if one was given. When neither source yields a config an
// error is returned. The client identifies itself with the "peer-watch" user
// agent.
func makeClient() (*kubernetes.Clientset, error) {
	var (
		restCfg *rest.Config
		cfgErr  error
	)

	// In-cluster credentials win over an explicit kubeconfig path.
	switch {
	case *inCluster:
		restCfg, cfgErr = rest.InClusterConfig()
	case *kubeconfig != "":
		restCfg, cfgErr = clientcmd.BuildConfigFromFlags("", *kubeconfig)
	}

	if cfgErr != nil {
		return nil, cfgErr
	}
	if restCfg == nil {
		return nil, fmt.Errorf("peer-watch: config is not set")
	}

	return kubernetes.NewForConfig(rest.AddUserAgent(restCfg, "peer-watch"))
}
// webHandler reports the current peer set as a JSON array of strings, sorted
// as returned by urlSet.Keys().
//
// On success it answers 200 with Content-Type application/json. If the peer
// list cannot be encoded it answers 500 with the error text as the body.
//
// NOTE(review): net/http runs handlers on their own goroutines while urlSet is
// a plain map that monitorPodsFn also mutates — confirm how that access is
// meant to be synchronized.
func webHandler(res http.ResponseWriter, req *http.Request) {
	data, err := json.Marshal(urlSet.Keys())
	if err != nil {
		res.WriteHeader(http.StatusInternalServerError)
		res.Write([]byte(err.Error()))
		return
	}

	// Declare the media type explicitly: without it net/http sniffs the body
	// and labels the JSON document as text/plain. Headers must be set before
	// WriteHeader is called.
	res.Header().Set("Content-Type", "application/json")
	res.WriteHeader(http.StatusOK)
	// A failed write means the client went away; there is nothing to recover.
	res.Write(data)
}
func validateFlags() {
if *kubeconfig == "" && *inCluster == false {
klog.Fatal("peer-watch: both --kubeconfig and --use-cluster-credentials cannot be empty")
}
if n := os.Getenv("POD_IP"); n == "" {
klog.Fatal("peer-watch: pod ip env value cannot be empty")
}
}
// main parses flags, builds the Kubernetes client, seeds the peer set from
// peerwatch.Init and then serves the peer list over HTTP (or blocks forever
// when --http is empty).
//
// Environment:
//   - POD_IP (required): this pod's address, always usable as a fallback peer.
//   - POD_CACHE_NAMESPACE: overrides --namespace when non-empty.
//   - POD_CACHE_LABEL_SELECTOR: overrides --label-selector when non-empty.
//
// SIGINT and SIGTERM terminate the process with exit status 0 after the log
// buffers have been flushed.
func main() {
	flag.Parse()

	if *versionFlag {
		fmt.Printf("peer-watch: version=%s revision=%s\n", Version, Revision)
		os.Exit(0)
	}

	validateFlags()
	selfIP := os.Getenv("POD_IP")

	// Environment variables take precedence over the command-line flags.
	if n := os.Getenv("POD_CACHE_NAMESPACE"); n != "" {
		*namespace = n
	}
	if ls := os.Getenv("POD_CACHE_LABEL_SELECTOR"); ls != "" {
		*labelSelector = ls
	}

	client, err := makeClient()
	if err != nil {
		klog.Fatalf("peer-watch: error connecting to the client: %v", err)
	}

	// signal.Notify takes over delivery of these signals, so the default
	// "terminate the process" action no longer happens on its own. Exit
	// explicitly; otherwise the process would log the request and keep
	// running until it is forcibly killed.
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
	go func() {
		<-ch
		klog.Info("peer-watch: Received termination, signaling shutdown")
		klog.Flush()
		os.Exit(0)
	}()

	opts := metav1.ListOptions{LabelSelector: *labelSelector}
	// monitorPodsFn is handed to peerwatch as the change callback; it waits
	// for `initialized` before touching urlSet.
	initialPeers, err := peerwatch.Init(selfIP, *namespace, opts, monitorPodsFn, client, *debug)
	if err != nil {
		// Fall back to a peer set containing only this pod and keep going, so
		// the status endpoint is still served instead of the process exiting.
		klog.Infof("peer-watch: WARNING error getting initial pods: %v", err)
		urlSet[podUrl(selfIP)] = true
	} else {
		for _, ip := range initialPeers {
			if ip != "" {
				urlSet[podUrl(ip)] = true
			}
		}
	}
	if *debug {
		klog.Infof("peer-watch: init %s", podUrl(selfIP))
	}
	// Release any monitorPodsFn calls waiting for the initial peer set.
	initialized = true

	if len(*addr) == 0 {
		// No HTTP endpoint requested: stay alive until a signal arrives.
		select {}
	}

	klog.Infof("peer-watch: http server starting at %s", *addr)
	http.HandleFunc("/", webHandler)
	klog.Fatal(http.ListenAndServe(*addr, nil))
}
// monitorPodsFn is the change callback passed to peerwatch.Init. It applies a
// single peer notification to urlSet: Added inserts the peer, Removed deletes
// it, and any other state is logged and otherwise ignored.
//
// The call blocks, polling every 100ms, until main has stored the initial peer
// set and flipped `initialized`.
//
// NOTE(review): `initialized` and urlSet are accessed here without any
// synchronization; if peerwatch invokes this callback from its own goroutine
// this needs a lock or atomic — confirm against the peerwatch package.
func monitorPodsFn(ip string, state peerwatch.NotifyState) {
	// Hold back changes until the initial peers are in place.
	for {
		if initialized {
			break
		}
		time.Sleep(100 * time.Millisecond)
	}

	klog.Infof("peer-watch: Got notify: %s [%d]", ip, state)

	if state == peerwatch.Added {
		urlSet[podUrl(ip)] = true
	} else if state == peerwatch.Removed {
		delete(urlSet, podUrl(ip))
	} else {
		// Unknown state: nothing changed, so skip the summary log below.
		return
	}

	klog.Infof("peer-watch: New pod list = %v", urlSet.Keys())
}
// podUrl turns a pod IP into the key stored in the peer set. At present the
// key is simply the IP with leading and trailing whitespace removed; no
// scheme or port is added.
func podUrl(podIp string) string {
	trimmed := strings.TrimSpace(podIp)
	return trimmed
}
// UrlSet is a set of peer URLs, represented as a map keyed by URL. Membership
// is determined by key presence alone; the boolean value is not consulted.
type UrlSet map[string]bool

// Keys returns every member of the set in ascending lexical order. The result
// is a freshly allocated, non-nil slice even when the set is empty.
func (s UrlSet) Keys() []string {
	members := make([]string, 0, len(s))
	for member := range s {
		members = append(members, member)
	}
	sort.Strings(members)
	return members
}

// String renders the sorted members in fmt's default slice form, e.g. "[a b]".
func (s UrlSet) String() string {
	return fmt.Sprint(s.Keys())
}