Skip to content

Commit 18cbe7e

Browse files
committed
Reimplement controller with informers and workqueue
1 parent 99992fc commit 18cbe7e

File tree

5 files changed

+254
-141
lines changed

5 files changed

+254
-141
lines changed

cmd/k8s-ec2-srcdst/main.go

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,25 @@ import (
33
"flag"
44
"fmt"
55
"os"
6+
"time"
7+
8+
"k8s.io/client-go/informers"
9+
10+
"github.com/ottoyiu/k8s-ec2-srcdst/pkg/signals"
611

712
"github.com/aws/aws-sdk-go/aws"
813
"github.com/aws/aws-sdk-go/aws/session"
914
"github.com/aws/aws-sdk-go/service/ec2"
1015
"github.com/golang/glog"
11-
srcdst "github.com/ottoyiu/k8s-ec2-srcdst"
16+
"github.com/ottoyiu/k8s-ec2-srcdst"
1217
"github.com/ottoyiu/k8s-ec2-srcdst/pkg/common"
1318
"github.com/ottoyiu/k8s-ec2-srcdst/pkg/controller"
14-
"k8s.io/apimachinery/pkg/util/wait"
1519
"k8s.io/client-go/kubernetes"
1620
)
1721

1822
func main() {
1923
kubeconfig := flag.String("kubeconfig", "", "Path to a kubeconfig file")
24+
numWorkers := flag.Int("numWorkers", 1, "Number of workers to run in the controller")
2025
version := flag.Bool("version", false, "Prints current k8s-ec2-srcdst version")
2126

2227
flag.Set("logtostderr", "true")
@@ -30,20 +35,35 @@ func main() {
3035
// Build the client config - optionally using a provided kubeconfig file.
3136
config, err := common.GetClientConfig(*kubeconfig)
3237
if err != nil {
33-
glog.Fatalf("Failed to load client config: %v", err)
38+
glog.Fatalf("Failed to load client config: %v", err.Error())
3439
}
3540

3641
// Construct the Kubernetes client
3742
client, err := kubernetes.NewForConfig(config)
3843
if err != nil {
39-
glog.Fatalf("Failed to create kubernetes client: %v", err)
44+
glog.Fatalf("Failed to create kubernetes client: %v", err.Error())
4045
}
4146

4247
glog.Infof("k8s-ec2-srcdst: %v", srcdst.Version)
4348

44-
awsSession := session.New()
4549
awsConfig := &aws.Config{}
50+
awsSession, err := session.NewSession(awsConfig)
51+
if err != nil {
52+
glog.Fatalf("Failed to create AWS session: %v", err.Error())
53+
}
54+
4655
ec2Client := ec2.New(awsSession, awsConfig)
4756

48-
controller.NewSrcDstController(client, ec2Client).Controller.Run(wait.NeverStop)
57+
stopCh := signals.SetupSignalHandler()
58+
59+
nodeInformerFactory := informers.NewSharedInformerFactory(client, time.Second*30)
60+
61+
srcDstController := controller.NewSrcDstController(client, nodeInformerFactory.Core().V1().Nodes(),
62+
ec2Client)
63+
64+
go nodeInformerFactory.Start(stopCh)
65+
66+
if err = srcDstController.Run(*numWorkers, stopCh); err != nil {
67+
glog.Fatalf("Error running controller: %v", err.Error())
68+
}
4969
}

pkg/common/util.go

100644100755
Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package common
22

33
import (
4-
"k8s.io/api/core/v1"
5-
"k8s.io/apimachinery/pkg/runtime"
64
"k8s.io/client-go/rest"
75
"k8s.io/client-go/tools/clientcmd"
86
)
@@ -16,18 +14,3 @@ func GetClientConfig(kubeconfig string) (*rest.Config, error) {
1614
}
1715
return rest.InClusterConfig()
1816
}
19-
20-
// CopyObjToNode copies a Node object, so that no changes would be done
21-
// to the original Node which is part of the cache
22-
func CopyObjToNode(obj interface{}) (*v1.Node, error) {
23-
objCopy, err := runtime.NewScheme().Copy(obj.(*v1.Node))
24-
if err != nil {
25-
return nil, err
26-
}
27-
28-
node := objCopy.(*v1.Node)
29-
if node.Annotations == nil {
30-
node.Annotations = make(map[string]string)
31-
}
32-
return node, nil
33-
}

pkg/controller/srcdst_controller.go

100644100755
Lines changed: 143 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -4,110 +4,158 @@ import (
44
"fmt"
55
"net/url"
66
"strings"
7+
"sync"
78
"time"
89

910
"github.com/golang/glog"
11+
"k8s.io/apimachinery/pkg/util/runtime"
12+
"k8s.io/apimachinery/pkg/util/wait"
13+
"k8s.io/client-go/util/retry"
1014

1115
"github.com/aws/aws-sdk-go/aws"
1216
"github.com/aws/aws-sdk-go/service/ec2"
13-
"github.com/aws/aws-sdk-go/service/ec2/ec2iface"
14-
"github.com/ottoyiu/k8s-ec2-srcdst/pkg/common"
15-
1617
"k8s.io/api/core/v1"
17-
"k8s.io/apimachinery/pkg/fields"
18+
informer "k8s.io/client-go/informers/core/v1"
1819
"k8s.io/client-go/kubernetes"
1920
"k8s.io/client-go/tools/cache"
21+
"k8s.io/client-go/tools/record"
22+
"k8s.io/client-go/util/workqueue"
2023
)
2124

2225
type Controller struct {
23-
client kubernetes.Interface
24-
Controller cache.Controller
25-
ec2Client ec2iface.EC2API
26+
client kubernetes.Interface
27+
nodeInformer cache.SharedIndexInformer
28+
nodeSynced cache.InformerSynced
29+
workqueue workqueue.RateLimitingInterface
30+
recorder record.EventRecorder
31+
ec2Client *ec2.EC2
32+
wg sync.WaitGroup
2633
}
2734

2835
const (
29-
SrcDstCheckDisabledAnnotation = "kubernetes-ec2-srcdst-controller.ottoyiu.com/srcdst-check-disabled" // used as the Node Annotation key
36+
// used as the Node Annotation key
37+
SrcDstCheckDisabledAnnotation = "kubernetes-ec2-srcdst-controller.ottoyiu.com/srcdst-check-disabled"
3038
)
3139

32-
// NewSrcDstController creates a new Kubernetes controller using client-go's Informer
33-
func NewSrcDstController(client kubernetes.Interface, ec2Client *ec2.EC2) *Controller {
40+
// NewSrcDstController creates a new Kubernetes controller to monitor Kubernetes nodes and disable src-dst
41+
// check on EC2 instances.
42+
func NewSrcDstController(client kubernetes.Interface,
43+
nodeInformer informer.NodeInformer,
44+
ec2Client *ec2.EC2) *Controller {
45+
3446
c := &Controller{
35-
client: client,
36-
ec2Client: ec2Client,
47+
client: client,
48+
nodeInformer: nodeInformer.Informer(),
49+
nodeSynced: nodeInformer.Informer().HasSynced,
50+
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Nodes"),
51+
ec2Client: ec2Client,
3752
}
3853

39-
nodeListWatcher := cache.NewListWatchFromClient(
40-
client.Core().RESTClient(),
41-
"nodes",
42-
v1.NamespaceAll,
43-
fields.Everything())
44-
45-
_, controller := cache.NewInformer(
46-
nodeListWatcher,
47-
&v1.Node{},
48-
60*time.Second,
49-
// Callback Functions to trigger on add/update/delete
50-
cache.ResourceEventHandlerFuncs{
51-
AddFunc: c.handler,
52-
UpdateFunc: func(old, new interface{}) { c.handler(new) },
54+
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
55+
AddFunc: c.handler,
56+
UpdateFunc: func(old, new interface{}) {
57+
c.handler(new)
5358
},
54-
)
55-
56-
c.Controller = controller
59+
})
5760

5861
return c
5962
}
6063

64+
func (c *Controller) Run(numWorkers int, stopCh <-chan struct{}) error {
65+
defer runtime.HandleCrash()
66+
defer c.workqueue.ShutDown()
67+
68+
if ok := cache.WaitForCacheSync(stopCh, c.nodeSynced); !ok {
69+
return fmt.Errorf("caches have failed to sync")
70+
}
71+
72+
var wg sync.WaitGroup
73+
for i := 0; i < numWorkers; i++ {
74+
c.wg.Add(1)
75+
go func() {
76+
defer wg.Done()
77+
go wait.Until(c.runWorker, time.Second, stopCh)
78+
}()
79+
}
80+
81+
c.wg.Wait()
82+
<-stopCh
83+
return nil
84+
}
85+
86+
func (c *Controller) runWorker() {
87+
for c.processNextWorkItem() {
88+
}
89+
}
90+
91+
func (c *Controller) processNextWorkItem() bool {
92+
key, quit := c.workqueue.Get()
93+
if quit {
94+
return false
95+
}
96+
glog.Infof("Get key %s", key.(string))
97+
98+
defer c.workqueue.Forget(key)
99+
if srcDstEnabled, err := c.checkSrcDstAttributeEnabled(key.(string)); err != nil {
100+
glog.Error(err)
101+
c.workqueue.AddRateLimited(key)
102+
} else if srcDstEnabled {
103+
return true
104+
}
105+
106+
if err := c.disableSrcDstCheck(key.(string)); err != nil {
107+
c.workqueue.AddRateLimited(key)
108+
}
109+
110+
return true
111+
}
112+
61113
func (c *Controller) handler(obj interface{}) {
62-
// this handler makes sure that all nodes within a cluster has its src/dst check disabled in EC2
63-
node, ok := obj.(*v1.Node)
64-
if !ok {
65-
glog.Errorf("Expected Node but handler received: %+v", obj)
114+
115+
key, err := cache.MetaNamespaceKeyFunc(obj)
116+
if err != nil {
66117
return
67118
}
68-
glog.V(4).Infof("Received update of node: %s", node.Name)
69-
c.disableSrcDstIfEnabled(node)
119+
glog.Infof("Adding key %s", key)
120+
c.workqueue.Add(key)
121+
return
70122
}
71123

72-
func (c *Controller) disableSrcDstIfEnabled(node *v1.Node) {
73-
srcDstCheckEnabled := true
74-
if node.Annotations != nil {
75-
if _, ok := node.Annotations[SrcDstCheckDisabledAnnotation]; ok {
76-
srcDstCheckEnabled = false
124+
func (c *Controller) disableSrcDstCheck(key string) error {
125+
defer c.workqueue.Done(key)
126+
127+
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
128+
nodeObj, err := c.getNodeObjByKey(key)
129+
if err != nil {
130+
return err
77131
}
78-
}
79132

80-
if srcDstCheckEnabled {
133+
nodeCopy := nodeObj.DeepCopy()
81134
// src dst check disabled annotation does not exist
82135
// call AWS ec2 api to disable
83-
instanceID, err := GetInstanceIDFromProviderID(node.Spec.ProviderID)
84-
if err != nil {
85-
glog.Errorf("Fail to retrieve Instance ID from Provider ID: %v", node.Spec.ProviderID)
86-
return
87-
}
88-
err = c.disableSrcDstCheck(*instanceID)
136+
instanceID, err := GetInstanceIDFromProviderID(nodeCopy.Spec.ProviderID)
89137
if err != nil {
90-
glog.Errorf("Fail to disable src dst check for EC2 instance: %v; %v", *instanceID, err)
91-
return
138+
glog.Errorf("Failed to retrieve Instance ID from Provider ID: %v", nodeCopy.Spec.ProviderID)
139+
return err
92140
}
93-
// We should not modify the cache object directly, so we make a copy first
94-
nodeCopy, err := common.CopyObjToNode(node)
95-
if err != nil {
96-
glog.Errorf("Failed to make copy of node: %v", err)
97-
return
141+
if err = c.modifySrcDstCheckAttribute(*instanceID); err != nil {
142+
glog.Errorf("Failed to disable src dst check for EC2 instance: %v; %v", *instanceID, err)
143+
return err
98144
}
99-
glog.Infof("Marking node %s with SrcDstCheckDisabledAnnotation", node.Name)
145+
146+
glog.Infof("Marking node %s with SrcDstCheckDisabledAnnotation", nodeCopy.Name)
100147
nodeCopy.Annotations[SrcDstCheckDisabledAnnotation] = "true"
101-
if _, err := c.client.Core().Nodes().Update(nodeCopy); err != nil {
148+
149+
if _, err := c.client.CoreV1().Nodes().Update(nodeCopy); err != nil {
102150
glog.Errorf("Failed to set %s annotation: %v", SrcDstCheckDisabledAnnotation, err)
151+
return err
103152
}
104-
} else {
105-
glog.V(4).Infof("Skipping node %s because it already has the SrcDstCheckDisabledAnnotation", node.Name)
106153

107-
}
154+
return nil
155+
})
108156
}
109157

110-
func (c *Controller) disableSrcDstCheck(instanceID string) error {
158+
func (c *Controller) modifySrcDstCheckAttribute(instanceID string) error {
111159
_, err := c.ec2Client.ModifyInstanceAttribute(
112160
&ec2.ModifyInstanceAttributeInput{
113161
InstanceId: aws.String(instanceID),
@@ -123,14 +171,13 @@ func (c *Controller) disableSrcDstCheck(instanceID string) error {
123171
// GetInstanceIDFromProviderID will only retrieve the InstanceID from AWS
124172
func GetInstanceIDFromProviderID(providerID string) (*string, error) {
125173
// providerID is in this format: aws:///availability-zone/instanceID
126-
// TODO: why the extra slash in the provider ID of kubernetes anyways?
127174
if !strings.HasPrefix(providerID, "aws") {
128-
return nil, fmt.Errorf("Node is not in AWS EC2, skipping...")
175+
return nil, fmt.Errorf("node is not in AWS EC2, skipping!")
129176
}
130177
providerID = strings.Replace(providerID, "///", "//", 1)
131178
url, err := url.Parse(providerID)
132179
if err != nil {
133-
return nil, fmt.Errorf("Invalid providerID (%s): %v", providerID, err)
180+
return nil, fmt.Errorf("invalid providerID (%s): %v", providerID, err)
134181
}
135182
instanceID := url.Path
136183
instanceID = strings.Trim(instanceID, "/")
@@ -139,8 +186,39 @@ func GetInstanceIDFromProviderID(providerID string) (*string, error) {
139186
// i-12345678 and i-12345678abcdef01
140187
// TODO: Regex match?
141188
if strings.Contains(instanceID, "/") || !strings.HasPrefix(instanceID, "i-") {
142-
return nil, fmt.Errorf("Invalid format for AWS instanceID (%s)", instanceID)
189+
return nil, fmt.Errorf("invalid format for AWS instanceID (%s)", instanceID)
143190
}
144191

145192
return &instanceID, nil
146193
}
194+
195+
func (c *Controller) getNodeObjByKey(key string) (nodeObj *v1.Node, err error) {
196+
nodeItem, exists, err := c.nodeInformer.GetIndexer().GetByKey(key)
197+
if err != nil {
198+
return nil, err
199+
}
200+
201+
if !exists {
202+
return nil, fmt.Errorf("node object %s doesn't exist", key)
203+
}
204+
205+
return nodeItem.(*v1.Node), nil
206+
}
207+
208+
func (c *Controller) checkSrcDstAttributeEnabled(key string) (enabled bool, err error) {
209+
defer c.workqueue.Done(key)
210+
211+
nodeObj, err := c.getNodeObjByKey(key)
212+
if err != nil {
213+
return false, err
214+
}
215+
216+
if nodeObj.Annotations != nil {
217+
if _, ok := nodeObj.Annotations[SrcDstCheckDisabledAnnotation]; ok {
218+
glog.Info("The node has the annotation")
219+
return true, nil
220+
}
221+
}
222+
223+
return false, nil
224+
}

0 commit comments

Comments
 (0)