From 37b5fcbb802f06442a9c08c94e709080a747f7ad Mon Sep 17 00:00:00 2001 From: elinuxhenrik Date: Thu, 8 Dec 2022 15:53:52 +0100 Subject: [PATCH] Add event sending to publishservice Issue-ID: NONRTRIC-814 Signed-off-by: elinuxhenrik Change-Id: Ic708cfcbe50d28d49c58dc36536a7cfbfdba6211 --- .../internal/publishservice/publishservice.go | 30 ++++++++++--- .../internal/publishservice/publishservice_test.go | 51 ++++++++++++++++++---- capifcore/main.go | 18 +++++++- capifcore/main_test.go | 15 +++++++ 4 files changed, 97 insertions(+), 17 deletions(-) diff --git a/capifcore/internal/publishservice/publishservice.go b/capifcore/internal/publishservice/publishservice.go index a4b84f1..17f4b28 100644 --- a/capifcore/internal/publishservice/publishservice.go +++ b/capifcore/internal/publishservice/publishservice.go @@ -31,6 +31,7 @@ import ( "k8s.io/utils/strings/slices" "oransc.org/nonrtric/capifcore/internal/common29122" + "oransc.org/nonrtric/capifcore/internal/eventsapi" publishapi "oransc.org/nonrtric/capifcore/internal/publishserviceapi" "oransc.org/nonrtric/capifcore/internal/helmmanagement" @@ -56,15 +57,17 @@ type PublishService struct { publishedServices map[string][]publishapi.ServiceAPIDescription serviceRegister providermanagement.ServiceRegister helmManager helmmanagement.HelmManager + eventChannel chan<- eventsapi.EventNotification lock sync.Mutex } // Creates a service that implements both the PublishRegister and the publishserviceapi.ServerInterface interfaces. -func NewPublishService(serviceRegister providermanagement.ServiceRegister, hm helmmanagement.HelmManager) *PublishService { +func NewPublishService(serviceRegister providermanagement.ServiceRegister, hm helmmanagement.HelmManager, eventChannel chan<- eventsapi.EventNotification) *PublishService { return &PublishService{ helmManager: hm, publishedServices: make(map[string][]publishapi.ServiceAPIDescription), serviceRegister: serviceRegister, + eventChannel: eventChannel, } } @@ -177,6 +180,7 @@ func (ps *PublishService) PostApfIdServiceApis(ctx echo.Context, apfId string) e if shouldReturn { return returnValue } + go ps.sendEvent(newServiceAPIDescription, eventsapi.CAPIFEventSERVICEAPIAVAILABLE) _, ok := ps.publishedServices[apfId] if ok { @@ -222,6 +226,7 @@ func (ps *PublishService) DeleteApfIdServiceApisServiceApiId(ctx echo.Context, a ps.lock.Lock() defer ps.lock.Unlock() ps.publishedServices[string(apfId)] = removeServiceDescription(pos, serviceDescriptions) + go ps.sendEvent(*description, eventsapi.CAPIFEventSERVICEAPIUNAVAILABLE) } } return ctx.NoContent(http.StatusNoContent) @@ -272,9 +277,6 @@ func (ps *PublishService) ModifyIndAPFPubAPI(ctx echo.Context, apfId string, ser // Update a published service API. func (ps *PublishService) PutApfIdServiceApisServiceApiId(ctx echo.Context, apfId string, serviceApiId string) error { - ps.lock.Lock() - defer ps.lock.Unlock() - pos, publishedService, shouldReturn, returnValue := ps.checkIfServiceIsPublished(apfId, serviceApiId, ctx) if shouldReturn { return returnValue @@ -286,8 +288,12 @@ func (ps *PublishService) PutApfIdServiceApisServiceApiId(ctx echo.Context, apfI } if updatedServiceDescription.Description != nil { + ps.lock.Lock() + defer ps.lock.Unlock() + publishedService.Description = updatedServiceDescription.Description ps.publishedServices[apfId][pos] = publishedService + go ps.sendEvent(publishedService, eventsapi.CAPIFEventSERVICEAPIUPDATE) } err := ctx.JSON(http.StatusOK, ps.publishedServices[apfId][pos]) @@ -295,12 +301,10 @@ func (ps *PublishService) PutApfIdServiceApisServiceApiId(ctx echo.Context, apfI // Something really bad happened, tell Echo that our handler failed return err } - return nil } func (ps *PublishService) checkIfServiceIsPublished(apfId string, serviceApiId string, ctx echo.Context) (int, publishapi.ServiceAPIDescription, bool, error) { - publishedServices, ok := ps.publishedServices[apfId] if !ok { return 0, publishapi.ServiceAPIDescription{}, true, sendCoreError(ctx, http.StatusBadRequest, "Service must be published before updating it") @@ -315,7 +319,6 @@ func (ps *PublishService) checkIfServiceIsPublished(apfId string, serviceApiId s } return 0, publishapi.ServiceAPIDescription{}, true, sendCoreError(ctx, http.StatusBadRequest, "Service must be published before updating it") - } func getServiceFromRequest(ctx echo.Context) (publishapi.ServiceAPIDescription, bool, error) { @@ -327,6 +330,19 @@ func getServiceFromRequest(ctx echo.Context) (publishapi.ServiceAPIDescription, return updatedServiceDescription, false, nil } +func (ps *PublishService) sendEvent(service publishapi.ServiceAPIDescription, eventType eventsapi.CAPIFEvent) { + apiIds := []string{*service.ApiId} + apis := []publishapi.ServiceAPIDescription{service} + event := eventsapi.EventNotification{ + EventDetail: &eventsapi.CAPIFEventDetail{ + ApiIds: &apiIds, + ServiceAPIDescriptions: &apis, + }, + Events: eventType, + } + ps.eventChannel <- event +} + // This function wraps sending of an error in the Error format, and // handling the failure to marshal that. func sendCoreError(ctx echo.Context, code int, message string) error { diff --git a/capifcore/internal/publishservice/publishservice_test.go b/capifcore/internal/publishservice/publishservice_test.go index 7bb0152..cad151c 100644 --- a/capifcore/internal/publishservice/publishservice_test.go +++ b/capifcore/internal/publishservice/publishservice_test.go @@ -25,8 +25,10 @@ import ( "net/http" "os" "testing" + "time" "oransc.org/nonrtric/capifcore/internal/common29122" + "oransc.org/nonrtric/capifcore/internal/eventsapi" "oransc.org/nonrtric/capifcore/internal/providermanagement" "github.com/labstack/echo/v4" @@ -51,7 +53,7 @@ func TestPublishUnpublishService(t *testing.T) { serviceRegisterMock.On("GetAefsForPublisher", apfId).Return([]string{aefId, "otherAefId"}) helmManagerMock := helmMocks.HelmManager{} helmManagerMock.On("InstallHelmChart", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) - serviceUnderTest, requestHandler := getEcho(&serviceRegisterMock, &helmManagerMock) + serviceUnderTest, eventChannel, requestHandler := getEcho(&serviceRegisterMock, &helmManagerMock) // Check no services published for provider result := testutil.NewRequest().Get("/"+apfId+"/service-apis").Go(t, requestHandler) @@ -83,6 +85,12 @@ func TestPublishUnpublishService(t *testing.T) { serviceRegisterMock.AssertCalled(t, "GetAefsForPublisher", apfId) helmManagerMock.AssertCalled(t, "InstallHelmChart", namespace, repoName, chartName, releaseName) assert.ElementsMatch(t, []string{aefId}, serviceUnderTest.getAllAefIds()) + if publishEvent, ok := waitForEvent(eventChannel, 1*time.Second); ok { + assert.Fail(t, "No event sent") + } else { + assert.Equal(t, *resultService.ApiId, (*publishEvent.EventDetail.ApiIds)[0]) + assert.Equal(t, eventsapi.CAPIFEventSERVICEAPIAVAILABLE, publishEvent.Events) + } // Check that the service is published for the provider result = testutil.NewRequest().Get("/"+apfId+"/service-apis/"+newApiId).Go(t, requestHandler) @@ -103,6 +111,13 @@ func TestPublishUnpublishService(t *testing.T) { // Check no services published result = testutil.NewRequest().Get("/"+apfId+"/service-apis/"+newApiId).Go(t, requestHandler) + if publishEvent, ok := waitForEvent(eventChannel, 1*time.Second); ok { + assert.Fail(t, "No event sent") + } else { + assert.Equal(t, *resultService.ApiId, (*publishEvent.EventDetail.ApiIds)[0]) + assert.Equal(t, eventsapi.CAPIFEventSERVICEAPIUNAVAILABLE, publishEvent.Events) + } + assert.Equal(t, http.StatusNotFound, result.Code()) } @@ -111,9 +126,9 @@ func TestPostUnpublishedServiceWithUnregisteredFunction(t *testing.T) { aefId := "aefId" serviceRegisterMock := serviceMocks.ServiceRegister{} serviceRegisterMock.On("GetAefsForPublisher", apfId).Return([]string{"otherAefId"}) - _, requestHandler := getEcho(&serviceRegisterMock, nil) + _, _, requestHandler := getEcho(&serviceRegisterMock, nil) - newServiceDescription := getServiceAPIDescription(aefId, "apiname", "description") + newServiceDescription := getServiceAPIDescription(aefId, "apiName", "description") // Publish a service result := testutil.NewRequest().Post("/"+apfId+"/service-apis").WithJsonBody(newServiceDescription).Go(t, requestHandler) @@ -133,7 +148,7 @@ func TestGetServices(t *testing.T) { aefId := "aefId" serviceRegisterMock := serviceMocks.ServiceRegister{} serviceRegisterMock.On("GetAefsForPublisher", apfId).Return([]string{aefId}) - _, requestHandler := getEcho(&serviceRegisterMock, nil) + _, _, requestHandler := getEcho(&serviceRegisterMock, nil) // Check no services published for provider result := testutil.NewRequest().Get("/"+apfId+"/service-apis").Go(t, requestHandler) @@ -163,7 +178,7 @@ func TestGetServices(t *testing.T) { } func TestGetPublishedServices(t *testing.T) { - serviceUnderTest := NewPublishService(nil, nil) + serviceUnderTest := NewPublishService(nil, nil, nil) profiles := make([]publishapi.AefProfile, 1) serviceDescription := publishapi.ServiceAPIDescription{ @@ -189,7 +204,7 @@ func TestUpdateDescription(t *testing.T) { serviceRegisterMock.On("GetAefsForPublisher", apfId).Return([]string{aefId, "otherAefId"}) helmManagerMock := helmMocks.HelmManager{} helmManagerMock.On("InstallHelmChart", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil) - serviceUnderTest, requestHandler := getEcho(&serviceRegisterMock, &helmManagerMock) + serviceUnderTest, eventChannel, requestHandler := getEcho(&serviceRegisterMock, &helmManagerMock) serviceDescription := getServiceAPIDescription(aefId, apiName, description) serviceDescription.ApiId = &serviceApiId @@ -208,9 +223,15 @@ func TestUpdateDescription(t *testing.T) { assert.NoError(t, err, "error unmarshaling response") assert.Equal(t, resultService.Description, &newDescription) + if publishEvent, ok := waitForEvent(eventChannel, 1*time.Second); ok { + assert.Fail(t, "No event sent") + } else { + assert.Equal(t, *resultService.ApiId, (*publishEvent.EventDetail.ApiIds)[0]) + assert.Equal(t, eventsapi.CAPIFEventSERVICEAPIUPDATE, publishEvent.Events) + } } -func getEcho(serviceRegister providermanagement.ServiceRegister, helmManager helmmanagement.HelmManager) (*PublishService, *echo.Echo) { +func getEcho(serviceRegister providermanagement.ServiceRegister, helmManager helmmanagement.HelmManager) (*PublishService, chan eventsapi.EventNotification, *echo.Echo) { swagger, err := publishapi.GetSwagger() if err != nil { fmt.Fprintf(os.Stderr, "Error loading swagger spec\n: %s", err) @@ -219,14 +240,15 @@ func getEcho(serviceRegister providermanagement.ServiceRegister, helmManager hel swagger.Servers = nil - ps := NewPublishService(serviceRegister, helmManager) + eventChannel := make(chan eventsapi.EventNotification) + ps := NewPublishService(serviceRegister, helmManager, eventChannel) e := echo.New() e.Use(echomiddleware.Logger()) e.Use(middleware.OapiRequestValidator(swagger)) publishapi.RegisterHandlers(e, ps) - return ps, e + return ps, eventChannel, e } func getServiceAPIDescription(aefId, apiName, description string) publishapi.ServiceAPIDescription { @@ -259,3 +281,14 @@ func getServiceAPIDescription(aefId, apiName, description string) publishapi.Ser Description: &description, } } + +// waitForEvent waits for the channel to receive an event for the specified max timeout. +// Returns true if waiting timed out. +func waitForEvent(ch chan eventsapi.EventNotification, timeout time.Duration) (*eventsapi.EventNotification, bool) { + select { + case event := <-ch: + return &event, false // completed normally + case <-time.After(timeout): + return nil, true // timed out + } +} diff --git a/capifcore/main.go b/capifcore/main.go index ddba0dd..ef15462 100644 --- a/capifcore/main.go +++ b/capifcore/main.go @@ -30,6 +30,7 @@ import ( "helm.sh/helm/v3/pkg/cli" "oransc.org/nonrtric/capifcore/internal/common29122" "oransc.org/nonrtric/capifcore/internal/discoverserviceapi" + "oransc.org/nonrtric/capifcore/internal/eventsapi" "oransc.org/nonrtric/capifcore/internal/invokermanagementapi" "oransc.org/nonrtric/capifcore/internal/providermanagementapi" "oransc.org/nonrtric/capifcore/internal/securityapi" @@ -38,6 +39,7 @@ import ( echomiddleware "github.com/labstack/echo/v4/middleware" log "github.com/sirupsen/logrus" "oransc.org/nonrtric/capifcore/internal/discoverservice" + "oransc.org/nonrtric/capifcore/internal/eventservice" "oransc.org/nonrtric/capifcore/internal/helmmanagement" "oransc.org/nonrtric/capifcore/internal/invokermanagement" "oransc.org/nonrtric/capifcore/internal/providermanagement" @@ -93,13 +95,25 @@ func getEcho() *echo.Echo { group.Use(middleware.OapiRequestValidator(providerManagerSwagger)) providermanagementapi.RegisterHandlersWithBaseURL(e, providerManager, "/api-provider-management/v1") + // Register EventService + eventServiceSwagger, err := eventsapi.GetSwagger() + if err != nil { + log.Fatalf("Error loading EventService swagger spec\n: %s", err) + } + eventServiceSwagger.Servers = nil + eventService := eventservice.NewEventService(&http.Client{}) + group = e.Group("/capif-events/v1") + group.Use(middleware.OapiRequestValidator(eventServiceSwagger)) + eventsapi.RegisterHandlersWithBaseURL(e, eventService, "/capif-events/v1") + eventChannel := eventService.GetNotificationChannel() + // Register PublishService publishServiceSwagger, err := publishserviceapi.GetSwagger() if err != nil { log.Fatalf("Error loading PublishService swagger spec\n: %s", err) } publishServiceSwagger.Servers = nil - publishService := publishservice.NewPublishService(providerManager, helmManager) + publishService := publishservice.NewPublishService(providerManager, helmManager, eventChannel) group = e.Group("/published-apis/v1") group.Use(middleware.OapiRequestValidator(publishServiceSwagger)) publishserviceapi.RegisterHandlersWithBaseURL(e, publishService, "/published-apis/v1") @@ -169,6 +183,8 @@ func getSwagger(c echo.Context) error { swagger, err = invokermanagementapi.GetSwagger() case "discover": swagger, err = discoverserviceapi.GetSwagger() + case "events": + swagger, err = eventsapi.GetSwagger() case "security": swagger, err = securityapi.GetSwagger() default: diff --git a/capifcore/main_test.go b/capifcore/main_test.go index 12092c1..f3a5f15 100644 --- a/capifcore/main_test.go +++ b/capifcore/main_test.go @@ -85,6 +85,14 @@ func Test_routing(t *testing.T) { method: "DELETE", }, }, + { + name: "Event path", + args: args{ + url: "/capif-events/v1/subscriberId/subscriptions/subId", + returnStatus: http.StatusNoContent, + method: "DELETE", + }, + }, { name: "Security path", args: args{ @@ -140,6 +148,13 @@ func TestGetSwagger(t *testing.T) { apiName: "Invoker", }, }, + { + name: "Events api", + args: args{ + apiPath: "events", + apiName: "Events", + }, + }, { name: "Discover api", args: args{ -- 2.16.6