Add event sending to publishservice 31/10031/1
authorelinuxhenrik <henrik.b.andersson@est.tech>
Thu, 8 Dec 2022 14:53:52 +0000 (15:53 +0100)
committerelinuxhenrik <henrik.b.andersson@est.tech>
Thu, 8 Dec 2022 14:57:19 +0000 (15:57 +0100)
Issue-ID: NONRTRIC-814
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
Change-Id: Ic708cfcbe50d28d49c58dc36536a7cfbfdba6211

capifcore/internal/publishservice/publishservice.go
capifcore/internal/publishservice/publishservice_test.go
capifcore/main.go
capifcore/main_test.go

index a4b84f1..17f4b28 100644 (file)
@@ -31,6 +31,7 @@ import (
        "k8s.io/utils/strings/slices"
 
        "oransc.org/nonrtric/capifcore/internal/common29122"
+       "oransc.org/nonrtric/capifcore/internal/eventsapi"
        publishapi "oransc.org/nonrtric/capifcore/internal/publishserviceapi"
 
        "oransc.org/nonrtric/capifcore/internal/helmmanagement"
@@ -56,15 +57,17 @@ type PublishService struct {
        publishedServices map[string][]publishapi.ServiceAPIDescription
        serviceRegister   providermanagement.ServiceRegister
        helmManager       helmmanagement.HelmManager
+       eventChannel      chan<- eventsapi.EventNotification
        lock              sync.Mutex
 }
 
 // Creates a service that implements both the PublishRegister and the publishserviceapi.ServerInterface interfaces.
-func NewPublishService(serviceRegister providermanagement.ServiceRegister, hm helmmanagement.HelmManager) *PublishService {
+func NewPublishService(serviceRegister providermanagement.ServiceRegister, hm helmmanagement.HelmManager, eventChannel chan<- eventsapi.EventNotification) *PublishService {
        return &PublishService{
                helmManager:       hm,
                publishedServices: make(map[string][]publishapi.ServiceAPIDescription),
                serviceRegister:   serviceRegister,
+               eventChannel:      eventChannel,
        }
 }
 
@@ -177,6 +180,7 @@ func (ps *PublishService) PostApfIdServiceApis(ctx echo.Context, apfId string) e
        if shouldReturn {
                return returnValue
        }
+       go ps.sendEvent(newServiceAPIDescription, eventsapi.CAPIFEventSERVICEAPIAVAILABLE)
 
        _, ok := ps.publishedServices[apfId]
        if ok {
@@ -222,6 +226,7 @@ func (ps *PublishService) DeleteApfIdServiceApisServiceApiId(ctx echo.Context, a
                        ps.lock.Lock()
                        defer ps.lock.Unlock()
                        ps.publishedServices[string(apfId)] = removeServiceDescription(pos, serviceDescriptions)
+                       go ps.sendEvent(*description, eventsapi.CAPIFEventSERVICEAPIUNAVAILABLE)
                }
        }
        return ctx.NoContent(http.StatusNoContent)
@@ -272,9 +277,6 @@ func (ps *PublishService) ModifyIndAPFPubAPI(ctx echo.Context, apfId string, ser
 
 // Update a published service API.
 func (ps *PublishService) PutApfIdServiceApisServiceApiId(ctx echo.Context, apfId string, serviceApiId string) error {
-       ps.lock.Lock()
-       defer ps.lock.Unlock()
-
        pos, publishedService, shouldReturn, returnValue := ps.checkIfServiceIsPublished(apfId, serviceApiId, ctx)
        if shouldReturn {
                return returnValue
@@ -286,8 +288,12 @@ func (ps *PublishService) PutApfIdServiceApisServiceApiId(ctx echo.Context, apfI
        }
 
        if updatedServiceDescription.Description != nil {
+               ps.lock.Lock()
+               defer ps.lock.Unlock()
+
                publishedService.Description = updatedServiceDescription.Description
                ps.publishedServices[apfId][pos] = publishedService
+               go ps.sendEvent(publishedService, eventsapi.CAPIFEventSERVICEAPIUPDATE)
        }
 
        err := ctx.JSON(http.StatusOK, ps.publishedServices[apfId][pos])
@@ -295,12 +301,10 @@ func (ps *PublishService) PutApfIdServiceApisServiceApiId(ctx echo.Context, apfI
                // Something really bad happened, tell Echo that our handler failed
                return err
        }
-
        return nil
 }
 
 func (ps *PublishService) checkIfServiceIsPublished(apfId string, serviceApiId string, ctx echo.Context) (int, publishapi.ServiceAPIDescription, bool, error) {
-
        publishedServices, ok := ps.publishedServices[apfId]
        if !ok {
                return 0, publishapi.ServiceAPIDescription{}, true, sendCoreError(ctx, http.StatusBadRequest, "Service must be published before updating it")
@@ -315,7 +319,6 @@ func (ps *PublishService) checkIfServiceIsPublished(apfId string, serviceApiId s
 
        }
        return 0, publishapi.ServiceAPIDescription{}, true, sendCoreError(ctx, http.StatusBadRequest, "Service must be published before updating it")
-
 }
 
 func getServiceFromRequest(ctx echo.Context) (publishapi.ServiceAPIDescription, bool, error) {
@@ -327,6 +330,19 @@ func getServiceFromRequest(ctx echo.Context) (publishapi.ServiceAPIDescription,
        return updatedServiceDescription, false, nil
 }
 
+func (ps *PublishService) sendEvent(service publishapi.ServiceAPIDescription, eventType eventsapi.CAPIFEvent) {
+       apiIds := []string{*service.ApiId}
+       apis := []publishapi.ServiceAPIDescription{service}
+       event := eventsapi.EventNotification{
+               EventDetail: &eventsapi.CAPIFEventDetail{
+                       ApiIds:                 &apiIds,
+                       ServiceAPIDescriptions: &apis,
+               },
+               Events: eventType,
+       }
+       ps.eventChannel <- event
+}
+
 // This function wraps sending of an error in the Error format, and
 // handling the failure to marshal that.
 func sendCoreError(ctx echo.Context, code int, message string) error {
index 7bb0152..cad151c 100644 (file)
@@ -25,8 +25,10 @@ import (
        "net/http"
        "os"
        "testing"
+       "time"
 
        "oransc.org/nonrtric/capifcore/internal/common29122"
+       "oransc.org/nonrtric/capifcore/internal/eventsapi"
        "oransc.org/nonrtric/capifcore/internal/providermanagement"
 
        "github.com/labstack/echo/v4"
@@ -51,7 +53,7 @@ func TestPublishUnpublishService(t *testing.T) {
        serviceRegisterMock.On("GetAefsForPublisher", apfId).Return([]string{aefId, "otherAefId"})
        helmManagerMock := helmMocks.HelmManager{}
        helmManagerMock.On("InstallHelmChart", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
-       serviceUnderTest, requestHandler := getEcho(&serviceRegisterMock, &helmManagerMock)
+       serviceUnderTest, eventChannel, requestHandler := getEcho(&serviceRegisterMock, &helmManagerMock)
 
        // Check no services published for provider
        result := testutil.NewRequest().Get("/"+apfId+"/service-apis").Go(t, requestHandler)
@@ -83,6 +85,12 @@ func TestPublishUnpublishService(t *testing.T) {
        serviceRegisterMock.AssertCalled(t, "GetAefsForPublisher", apfId)
        helmManagerMock.AssertCalled(t, "InstallHelmChart", namespace, repoName, chartName, releaseName)
        assert.ElementsMatch(t, []string{aefId}, serviceUnderTest.getAllAefIds())
+       if publishEvent, ok := waitForEvent(eventChannel, 1*time.Second); ok {
+               assert.Fail(t, "No event sent")
+       } else {
+               assert.Equal(t, *resultService.ApiId, (*publishEvent.EventDetail.ApiIds)[0])
+               assert.Equal(t, eventsapi.CAPIFEventSERVICEAPIAVAILABLE, publishEvent.Events)
+       }
 
        // Check that the service is published for the provider
        result = testutil.NewRequest().Get("/"+apfId+"/service-apis/"+newApiId).Go(t, requestHandler)
@@ -103,6 +111,13 @@ func TestPublishUnpublishService(t *testing.T) {
        // Check no services published
        result = testutil.NewRequest().Get("/"+apfId+"/service-apis/"+newApiId).Go(t, requestHandler)
 
+       if publishEvent, ok := waitForEvent(eventChannel, 1*time.Second); ok {
+               assert.Fail(t, "No event sent")
+       } else {
+               assert.Equal(t, *resultService.ApiId, (*publishEvent.EventDetail.ApiIds)[0])
+               assert.Equal(t, eventsapi.CAPIFEventSERVICEAPIUNAVAILABLE, publishEvent.Events)
+       }
+
        assert.Equal(t, http.StatusNotFound, result.Code())
 }
 
@@ -111,9 +126,9 @@ func TestPostUnpublishedServiceWithUnregisteredFunction(t *testing.T) {
        aefId := "aefId"
        serviceRegisterMock := serviceMocks.ServiceRegister{}
        serviceRegisterMock.On("GetAefsForPublisher", apfId).Return([]string{"otherAefId"})
-       _, requestHandler := getEcho(&serviceRegisterMock, nil)
+       _, _, requestHandler := getEcho(&serviceRegisterMock, nil)
 
-       newServiceDescription := getServiceAPIDescription(aefId, "apiname", "description")
+       newServiceDescription := getServiceAPIDescription(aefId, "apiName", "description")
 
        // Publish a service
        result := testutil.NewRequest().Post("/"+apfId+"/service-apis").WithJsonBody(newServiceDescription).Go(t, requestHandler)
@@ -133,7 +148,7 @@ func TestGetServices(t *testing.T) {
        aefId := "aefId"
        serviceRegisterMock := serviceMocks.ServiceRegister{}
        serviceRegisterMock.On("GetAefsForPublisher", apfId).Return([]string{aefId})
-       _, requestHandler := getEcho(&serviceRegisterMock, nil)
+       _, _, requestHandler := getEcho(&serviceRegisterMock, nil)
 
        // Check no services published for provider
        result := testutil.NewRequest().Get("/"+apfId+"/service-apis").Go(t, requestHandler)
@@ -163,7 +178,7 @@ func TestGetServices(t *testing.T) {
 }
 
 func TestGetPublishedServices(t *testing.T) {
-       serviceUnderTest := NewPublishService(nil, nil)
+       serviceUnderTest := NewPublishService(nil, nil, nil)
 
        profiles := make([]publishapi.AefProfile, 1)
        serviceDescription := publishapi.ServiceAPIDescription{
@@ -189,7 +204,7 @@ func TestUpdateDescription(t *testing.T) {
        serviceRegisterMock.On("GetAefsForPublisher", apfId).Return([]string{aefId, "otherAefId"})
        helmManagerMock := helmMocks.HelmManager{}
        helmManagerMock.On("InstallHelmChart", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
-       serviceUnderTest, requestHandler := getEcho(&serviceRegisterMock, &helmManagerMock)
+       serviceUnderTest, eventChannel, requestHandler := getEcho(&serviceRegisterMock, &helmManagerMock)
 
        serviceDescription := getServiceAPIDescription(aefId, apiName, description)
        serviceDescription.ApiId = &serviceApiId
@@ -208,9 +223,15 @@ func TestUpdateDescription(t *testing.T) {
        assert.NoError(t, err, "error unmarshaling response")
        assert.Equal(t, resultService.Description, &newDescription)
 
+       if publishEvent, ok := waitForEvent(eventChannel, 1*time.Second); ok {
+               assert.Fail(t, "No event sent")
+       } else {
+               assert.Equal(t, *resultService.ApiId, (*publishEvent.EventDetail.ApiIds)[0])
+               assert.Equal(t, eventsapi.CAPIFEventSERVICEAPIUPDATE, publishEvent.Events)
+       }
 }
 
-func getEcho(serviceRegister providermanagement.ServiceRegister, helmManager helmmanagement.HelmManager) (*PublishService, *echo.Echo) {
+func getEcho(serviceRegister providermanagement.ServiceRegister, helmManager helmmanagement.HelmManager) (*PublishService, chan eventsapi.EventNotification, *echo.Echo) {
        swagger, err := publishapi.GetSwagger()
        if err != nil {
                fmt.Fprintf(os.Stderr, "Error loading swagger spec\n: %s", err)
@@ -219,14 +240,15 @@ func getEcho(serviceRegister providermanagement.ServiceRegister, helmManager hel
 
        swagger.Servers = nil
 
-       ps := NewPublishService(serviceRegister, helmManager)
+       eventChannel := make(chan eventsapi.EventNotification)
+       ps := NewPublishService(serviceRegister, helmManager, eventChannel)
 
        e := echo.New()
        e.Use(echomiddleware.Logger())
        e.Use(middleware.OapiRequestValidator(swagger))
 
        publishapi.RegisterHandlers(e, ps)
-       return ps, e
+       return ps, eventChannel, e
 }
 
 func getServiceAPIDescription(aefId, apiName, description string) publishapi.ServiceAPIDescription {
@@ -259,3 +281,14 @@ func getServiceAPIDescription(aefId, apiName, description string) publishapi.Ser
                Description: &description,
        }
 }
+
+// waitForEvent waits for the channel to receive an event for the specified max timeout.
+// Returns true if waiting timed out.
+func waitForEvent(ch chan eventsapi.EventNotification, timeout time.Duration) (*eventsapi.EventNotification, bool) {
+       select {
+       case event := <-ch:
+               return &event, false // completed normally
+       case <-time.After(timeout):
+               return nil, true // timed out
+       }
+}
index ddba0dd..ef15462 100644 (file)
@@ -30,6 +30,7 @@ import (
        "helm.sh/helm/v3/pkg/cli"
        "oransc.org/nonrtric/capifcore/internal/common29122"
        "oransc.org/nonrtric/capifcore/internal/discoverserviceapi"
+       "oransc.org/nonrtric/capifcore/internal/eventsapi"
        "oransc.org/nonrtric/capifcore/internal/invokermanagementapi"
        "oransc.org/nonrtric/capifcore/internal/providermanagementapi"
        "oransc.org/nonrtric/capifcore/internal/securityapi"
@@ -38,6 +39,7 @@ import (
        echomiddleware "github.com/labstack/echo/v4/middleware"
        log "github.com/sirupsen/logrus"
        "oransc.org/nonrtric/capifcore/internal/discoverservice"
+       "oransc.org/nonrtric/capifcore/internal/eventservice"
        "oransc.org/nonrtric/capifcore/internal/helmmanagement"
        "oransc.org/nonrtric/capifcore/internal/invokermanagement"
        "oransc.org/nonrtric/capifcore/internal/providermanagement"
@@ -93,13 +95,25 @@ func getEcho() *echo.Echo {
        group.Use(middleware.OapiRequestValidator(providerManagerSwagger))
        providermanagementapi.RegisterHandlersWithBaseURL(e, providerManager, "/api-provider-management/v1")
 
+       // Register EventService
+       eventServiceSwagger, err := eventsapi.GetSwagger()
+       if err != nil {
+               log.Fatalf("Error loading EventService swagger spec\n: %s", err)
+       }
+       eventServiceSwagger.Servers = nil
+       eventService := eventservice.NewEventService(&http.Client{})
+       group = e.Group("/capif-events/v1")
+       group.Use(middleware.OapiRequestValidator(eventServiceSwagger))
+       eventsapi.RegisterHandlersWithBaseURL(e, eventService, "/capif-events/v1")
+       eventChannel := eventService.GetNotificationChannel()
+
        // Register PublishService
        publishServiceSwagger, err := publishserviceapi.GetSwagger()
        if err != nil {
                log.Fatalf("Error loading PublishService swagger spec\n: %s", err)
        }
        publishServiceSwagger.Servers = nil
-       publishService := publishservice.NewPublishService(providerManager, helmManager)
+       publishService := publishservice.NewPublishService(providerManager, helmManager, eventChannel)
        group = e.Group("/published-apis/v1")
        group.Use(middleware.OapiRequestValidator(publishServiceSwagger))
        publishserviceapi.RegisterHandlersWithBaseURL(e, publishService, "/published-apis/v1")
@@ -169,6 +183,8 @@ func getSwagger(c echo.Context) error {
                swagger, err = invokermanagementapi.GetSwagger()
        case "discover":
                swagger, err = discoverserviceapi.GetSwagger()
+       case "events":
+               swagger, err = eventsapi.GetSwagger()
        case "security":
                swagger, err = securityapi.GetSwagger()
        default:
index 12092c1..f3a5f15 100644 (file)
@@ -85,6 +85,14 @@ func Test_routing(t *testing.T) {
                                method:       "DELETE",
                        },
                },
+               {
+                       name: "Event path",
+                       args: args{
+                               url:          "/capif-events/v1/subscriberId/subscriptions/subId",
+                               returnStatus: http.StatusNoContent,
+                               method:       "DELETE",
+                       },
+               },
                {
                        name: "Security path",
                        args: args{
@@ -140,6 +148,13 @@ func TestGetSwagger(t *testing.T) {
                                apiName: "Invoker",
                        },
                },
+               {
+                       name: "Events api",
+                       args: args{
+                               apiPath: "events",
+                               apiName: "Events",
+                       },
+               },
                {
                        name: "Discover api",
                        args: args{