diff --git a/pkg/api/api.go b/pkg/api/api.go index dc8254dc1ea1e3b8f21223f8b9ba6d588486874b..c903f9572abf5622129acce0847a78d2165a07d5 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -52,6 +52,5 @@ type EventHandler interface { // Informer is an interface for external pod data sources to interact with rubik type Informer interface { Publisher - Start(ctx context.Context) - WaitReady() + Start(ctx context.Context) error } diff --git a/pkg/informer/apiserverinformer.go b/pkg/informer/apiserverinformer.go index 58ae00a6e3ac1a92758008844f8f1271235e65ce..d5917f354ec923522e3b4a4fa97ff4eb3962dbe5 100644 --- a/pkg/informer/apiserverinformer.go +++ b/pkg/informer/apiserverinformer.go @@ -78,16 +78,14 @@ func InitKubeClient() (*kubernetes.Clientset, error) { return kubeClient, nil } -func (informer *APIServerInformer) WaitReady() { -} - // Start starts and enables PIServerInformer -func (informer *APIServerInformer) Start(ctx context.Context) { +func (informer *APIServerInformer) Start(ctx context.Context) error { const specNodeNameField = "spec.nodeName" // set options to return only pods on the current node. var fieldSelector = fields.OneTermEqualSelector(specNodeNameField, informer.nodeName).String() informer.listFunc(fieldSelector) informer.watchFunc(ctx, fieldSelector) + return nil } func (informer *APIServerInformer) listFunc(fieldSelector string) { diff --git a/pkg/informer/nriinformer.go b/pkg/informer/nriinformer.go index fb4744dbb908c1fe093cae73d402c0c9f31f774d..e3af51d493fe2d5d87fe524e2799d10c43b6a2d7 100644 --- a/pkg/informer/nriinformer.go +++ b/pkg/informer/nriinformer.go @@ -16,10 +16,12 @@ package informer import ( "context" + "fmt" "os" "github.com/containerd/nri/pkg/api" "github.com/containerd/nri/pkg/stub" + rubikapi "isula.org/rubik/pkg/api" "isula.org/rubik/pkg/common/constant" "isula.org/rubik/pkg/core/typedef" @@ -52,13 +54,17 @@ func NewNRIInformer(publisher rubikapi.Publisher) (rubikapi.Informer, error) { } // start nriplugin -func (plugin NRIInformer) Start(ctx context.Context) { - go plugin.stub.Run(ctx) -} - -// wait sync event finish -func (plugin NRIInformer) WaitReady() { +func (plugin NRIInformer) Start(ctx context.Context) error { + if err := plugin.stub.Start(ctx); err != nil { + return fmt.Errorf("failed to start nri informer: %v", err) + } <-plugin.finishedSync + + go func() { + plugin.stub.Wait() + plugin.stub.Stop() + }() + return nil } // nri sync event diff --git a/pkg/rubik/rubik.go b/pkg/rubik/rubik.go index a0dfa89e2e4847519163edc9e58069fade2ac358..c4026998a3c5b5e1f0475471c377df44567609e0 100644 --- a/pkg/rubik/rubik.go +++ b/pkg/rubik/rubik.go @@ -71,7 +71,6 @@ func (a *Agent) Run(ctx context.Context) error { if err := a.startInformer(ctx, informerName); err != nil { return err } - a.informer.WaitReady() if err := a.startServiceHandler(ctx); err != nil { return err } @@ -98,8 +97,7 @@ func (a *Agent) startInformer(ctx context.Context, informerName string) error { return fmt.Errorf("failed to subscribe informer: %v", err) } a.informer = i - i.Start(ctx) - return nil + return i.Start(ctx) } // stopInformer stops the informer