|
|
|
package kube
|
|
|
|
|
|
|
|
import (
|
|
|
|
"sync"
|
|
|
|
|
|
|
|
"github.com/zeromicro/go-zero/core/lang"
|
|
|
|
"github.com/zeromicro/go-zero/core/logx"
|
|
|
|
v1 "k8s.io/api/core/v1"
|
|
|
|
)
|
|
|
|
|
|
|
|
// EventHandler is ResourceEventHandler implementation.
|
|
|
|
type EventHandler struct {
|
|
|
|
update func([]string)
|
|
|
|
endpoints map[string]lang.PlaceholderType
|
|
|
|
lock sync.Mutex
|
|
|
|
}
|
|
|
|
|
|
|
|
// NewEventHandler returns an EventHandler.
|
|
|
|
func NewEventHandler(update func([]string)) *EventHandler {
|
|
|
|
return &EventHandler{
|
|
|
|
update: update,
|
|
|
|
endpoints: make(map[string]lang.PlaceholderType),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// OnAdd handles the endpoints add events.
|
|
|
|
func (h *EventHandler) OnAdd(obj interface{}) {
|
|
|
|
endpoints, ok := obj.(*v1.Endpoints)
|
|
|
|
if !ok {
|
|
|
|
logx.Errorf("%v is not an object with type *v1.Endpoints", obj)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
h.lock.Lock()
|
|
|
|
defer h.lock.Unlock()
|
|
|
|
|
|
|
|
var changed bool
|
|
|
|
for _, sub := range endpoints.Subsets {
|
|
|
|
for _, point := range sub.Addresses {
|
|
|
|
if _, ok := h.endpoints[point.IP]; !ok {
|
|
|
|
h.endpoints[point.IP] = lang.Placeholder
|
|
|
|
changed = true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if changed {
|
|
|
|
h.notify()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// OnDelete handles the endpoints delete events.
|
|
|
|
func (h *EventHandler) OnDelete(obj interface{}) {
|
|
|
|
endpoints, ok := obj.(*v1.Endpoints)
|
|
|
|
if !ok {
|
|
|
|
logx.Errorf("%v is not an object with type *v1.Endpoints", obj)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
h.lock.Lock()
|
|
|
|
defer h.lock.Unlock()
|
|
|
|
|
|
|
|
var changed bool
|
|
|
|
for _, sub := range endpoints.Subsets {
|
|
|
|
for _, point := range sub.Addresses {
|
|
|
|
if _, ok := h.endpoints[point.IP]; ok {
|
|
|
|
delete(h.endpoints, point.IP)
|
|
|
|
changed = true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if changed {
|
|
|
|
h.notify()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// OnUpdate handles the endpoints update events.
|
|
|
|
func (h *EventHandler) OnUpdate(oldObj, newObj interface{}) {
|
|
|
|
oldEndpoints, ok := oldObj.(*v1.Endpoints)
|
|
|
|
if !ok {
|
|
|
|
logx.Errorf("%v is not an object with type *v1.Endpoints", oldObj)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
newEndpoints, ok := newObj.(*v1.Endpoints)
|
|
|
|
if !ok {
|
|
|
|
logx.Errorf("%v is not an object with type *v1.Endpoints", newObj)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
if oldEndpoints.ResourceVersion == newEndpoints.ResourceVersion {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
h.Update(newEndpoints)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Update updates the endpoints.
|
|
|
|
func (h *EventHandler) Update(endpoints *v1.Endpoints) {
|
|
|
|
h.lock.Lock()
|
|
|
|
defer h.lock.Unlock()
|
|
|
|
|
|
|
|
old := h.endpoints
|
|
|
|
h.endpoints = make(map[string]lang.PlaceholderType)
|
|
|
|
for _, sub := range endpoints.Subsets {
|
|
|
|
for _, point := range sub.Addresses {
|
|
|
|
h.endpoints[point.IP] = lang.Placeholder
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if diff(old, h.endpoints) {
|
|
|
|
h.notify()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (h *EventHandler) notify() {
|
|
|
|
var targets []string
|
|
|
|
|
|
|
|
for k := range h.endpoints {
|
|
|
|
targets = append(targets, k)
|
|
|
|
}
|
|
|
|
|
|
|
|
h.update(targets)
|
|
|
|
}
|
|
|
|
|
|
|
|
func diff(o, n map[string]lang.PlaceholderType) bool {
|
|
|
|
if len(o) != len(n) {
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
|
|
|
|
for k := range o {
|
|
|
|
if _, ok := n[k]; !ok {
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return false
|
|
|
|
}
|