X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=capifcore%2Finternal%2Fpublishservice%2Fpublishservice.go;h=670d2ed53c9c5e69712579913f3287eb005bc16d;hb=refs%2Fchanges%2F85%2F12585%2F1;hp=51a07117c94a8928bdfb40051e3de422fed2c901;hpb=c9e08b2a2f647f9f870040570c5e71305f0fb5d2;p=nonrtric%2Fplt%2Fsme.git diff --git a/capifcore/internal/publishservice/publishservice.go b/capifcore/internal/publishservice/publishservice.go index 51a0711..670d2ed 100644 --- a/capifcore/internal/publishservice/publishservice.go +++ b/capifcore/internal/publishservice/publishservice.go @@ -2,7 +2,8 @@ // ========================LICENSE_START================================= // O-RAN-SC // %% -// Copyright (C) 2022: Nordix Foundation +// Copyright (C) 2022-2023: Nordix Foundation +// Copyright (C) 2024: OpenInfra Foundation Europe // %% // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -21,15 +22,18 @@ package publishservice import ( + "fmt" "net/http" "path" "strings" "sync" - "github.com/labstack/echo/v4" + echo "github.com/labstack/echo/v4" + "k8s.io/utils/strings/slices" "oransc.org/nonrtric/capifcore/internal/common29122" - "oransc.org/nonrtric/capifcore/internal/publishserviceapi" + "oransc.org/nonrtric/capifcore/internal/eventsapi" + publishapi "oransc.org/nonrtric/capifcore/internal/publishserviceapi" "oransc.org/nonrtric/capifcore/internal/helmmanagement" "oransc.org/nonrtric/capifcore/internal/providermanagement" @@ -37,131 +41,144 @@ import ( log "github.com/sirupsen/logrus" ) -//go:generate mockery --name APIRegister -type APIRegister interface { - AreAPIsRegistered(serviceDescriptions *[]publishserviceapi.ServiceAPIDescription) bool - GetAPIs() *[]publishserviceapi.ServiceAPIDescription - IsAPIRegistered(aefId, path string) bool +//go:generate mockery --name PublishRegister +type PublishRegister interface { + // Checks if the provided API is published. + // Returns true if the provided API has been published, false otherwise. + IsAPIPublished(aefId, path string) bool + // Gets all published APIs. + // Returns a list of all APIs that has been published. + GetAllPublishedServices() []publishapi.ServiceAPIDescription + GetAllowedPublishedServices(invokerApiList []publishapi.ServiceAPIDescription) []publishapi.ServiceAPIDescription } type PublishService struct { - publishedServices map[string]publishserviceapi.ServiceAPIDescription + publishedServices map[string][]publishapi.ServiceAPIDescription serviceRegister providermanagement.ServiceRegister helmManager helmmanagement.HelmManager + eventChannel chan<- eventsapi.EventNotification lock sync.Mutex } -func NewPublishService(serviceRegister providermanagement.ServiceRegister, hm helmmanagement.HelmManager) *PublishService { +// Creates a service that implements both the PublishRegister and the publishserviceapi.ServerInterface interfaces. +func NewPublishService(serviceRegister providermanagement.ServiceRegister, hm helmmanagement.HelmManager, eventChannel chan<- eventsapi.EventNotification) *PublishService { return &PublishService{ helmManager: hm, - publishedServices: make(map[string]publishserviceapi.ServiceAPIDescription), + publishedServices: make(map[string][]publishapi.ServiceAPIDescription), serviceRegister: serviceRegister, + eventChannel: eventChannel, } } -func (ps *PublishService) AreAPIsRegistered(serviceDescriptions *[]publishserviceapi.ServiceAPIDescription) bool { +func (ps *PublishService) getAllAefIds() []string { ps.lock.Lock() defer ps.lock.Unlock() - allRegistered := true - if serviceDescriptions != nil { - out: - for _, newApi := range *serviceDescriptions { - registeredApi, ok := ps.publishedServices[*newApi.ApiId] - if ok { - if !ps.areProfilesRegistered(newApi.AefProfiles, registeredApi.AefProfiles) { - allRegistered = false - break out - } - } else { - allRegistered = false - break out - } + allIds := []string{} + for _, descriptions := range ps.publishedServices { + for _, description := range descriptions { + allIds = append(allIds, description.GetAefIds()...) } } - return allRegistered + return allIds } -func (ps *PublishService) areProfilesRegistered(newProfiles *[]publishserviceapi.AefProfile, registeredProfiles *[]publishserviceapi.AefProfile) bool { - allRegistered := true - if newProfiles != nil && registeredProfiles != nil { - out: - for _, newProfile := range *newProfiles { - for _, registeredProfile := range *registeredProfiles { - if newProfile.AefId == registeredProfile.AefId { - break - } - allRegistered = false - break out - } - } - } else if registeredProfiles == nil { - allRegistered = false - } - return allRegistered +func (ps *PublishService) IsAPIPublished(aefId, path string) bool { + return slices.Contains(ps.getAllAefIds(), aefId) } -func (ps *PublishService) GetAPIs() *[]publishserviceapi.ServiceAPIDescription { - ps.lock.Lock() - defer ps.lock.Unlock() - - apis := []publishserviceapi.ServiceAPIDescription{} - for _, service := range ps.publishedServices { - apis = append(apis, service) +func (ps *PublishService) GetAllPublishedServices() []publishapi.ServiceAPIDescription { + publishedDescriptions := []publishapi.ServiceAPIDescription{} + for _, descriptions := range ps.publishedServices { + publishedDescriptions = append(publishedDescriptions, descriptions...) } - return &apis + return publishedDescriptions } -func (ps *PublishService) IsAPIRegistered(aefId, path string) bool { - ps.lock.Lock() - defer ps.lock.Unlock() +func (ps *PublishService) GetAllowedPublishedServices(apiListRequestedServices []publishapi.ServiceAPIDescription) []publishapi.ServiceAPIDescription { + apiListAllPublished := ps.GetAllPublishedServices() + allowedPublishedServices := intersection(apiListAllPublished, apiListRequestedServices) + return allowedPublishedServices +} + +func intersection(a, b []publishapi.ServiceAPIDescription) []publishapi.ServiceAPIDescription { + var result []publishapi.ServiceAPIDescription + + if (a == nil) || (b == nil) || (len(a) == 0) || (len(b) == 0) { + return result + } - registered := false -out: - for _, service := range ps.publishedServices { - if service.ApiName == path { - for _, profile := range *service.AefProfiles { - if profile.AefId == aefId { - registered = true - break out - } + for _, itemA := range a { + for _, itemB := range b { + if (itemA.ApiId != nil) && (itemB.ApiId != nil) && (*itemA.ApiId == *itemB.ApiId) { + result = append(result, itemA) + break } } } - return registered + return result } -func (ps *PublishService) GetApfIdServiceApis(ctx echo.Context, apfId publishserviceapi.ApfId) error { - return ctx.NoContent(http.StatusNotImplemented) +// Retrieve all published APIs. +func (ps *PublishService) GetApfIdServiceApis(ctx echo.Context, apfId string) error { + if !ps.serviceRegister.IsPublishingFunctionRegistered(apfId) { + errorMsg := fmt.Sprintf("Unable to get the service due to %s api is only available for publishers", apfId) + return sendCoreError(ctx, http.StatusNotFound, errorMsg) + } + + serviceDescriptions := ps.publishedServices[apfId] + err := ctx.JSON(http.StatusOK, serviceDescriptions) + if err != nil { + // Something really bad happened, tell Echo that our handler failed + return err + } + return nil } -func (ps *PublishService) PostApfIdServiceApis(ctx echo.Context, apfId publishserviceapi.ApfId) error { - var newServiceAPIDescription publishserviceapi.ServiceAPIDescription +// Publish a new API. +func (ps *PublishService) PostApfIdServiceApis(ctx echo.Context, apfId string) error { + var newServiceAPIDescription publishapi.ServiceAPIDescription + errorMsg := "Unable to publish the service due to %s " err := ctx.Bind(&newServiceAPIDescription) if err != nil { - return sendCoreError(ctx, http.StatusBadRequest, "Invalid format for service") + return sendCoreError(ctx, http.StatusBadRequest, fmt.Sprintf(errorMsg, "invalid format for service "+apfId)) + } + + if !ps.serviceRegister.IsPublishingFunctionRegistered(apfId) { + return sendCoreError(ctx, http.StatusForbidden, fmt.Sprintf(errorMsg, "api is only available for publishers "+apfId)) + } + + if err := ps.isServicePublished(newServiceAPIDescription); err != nil { + return sendCoreError(ctx, http.StatusForbidden, fmt.Sprintf(errorMsg, err)) } + if err := newServiceAPIDescription.Validate(); err != nil { + return sendCoreError(ctx, http.StatusBadRequest, fmt.Sprintf(errorMsg, err)) + } ps.lock.Lock() defer ps.lock.Unlock() + registeredFuncs := ps.serviceRegister.GetAefsForPublisher(apfId) for _, profile := range *newServiceAPIDescription.AefProfiles { - if !ps.serviceRegister.IsFunctionRegistered(profile.AefId) { - return sendCoreError(ctx, http.StatusNotFound, "Function not registered, "+profile.AefId) + if !slices.Contains(registeredFuncs, profile.AefId) { + return sendCoreError(ctx, http.StatusNotFound, fmt.Sprintf(errorMsg, fmt.Sprintf("function %s not registered", profile.AefId))) } } - newId := "api_id_" + newServiceAPIDescription.ApiName - newServiceAPIDescription.ApiId = &newId - info := strings.Split(*newServiceAPIDescription.Description, ",") - if len(info) == 5 { - err = ps.helmManager.InstallHelmChart(info[1], info[2], info[3], info[4]) - if err != nil { - return sendCoreError(ctx, http.StatusBadRequest, "Unable to install Helm chart due to: "+err.Error()) - } - log.Info("Installed service: ", newId) + newServiceAPIDescription.PrepareNewService() + + shouldReturn, returnValue := ps.installHelmChart(newServiceAPIDescription, ctx) + if shouldReturn { + return returnValue + } + go ps.sendEvent(newServiceAPIDescription, eventsapi.CAPIFEventSERVICEAPIAVAILABLE) + + _, ok := ps.publishedServices[apfId] + if ok { + ps.publishedServices[apfId] = append(ps.publishedServices[apfId], newServiceAPIDescription) + } else { + ps.publishedServices[apfId] = append([]publishapi.ServiceAPIDescription{}, newServiceAPIDescription) } - ps.publishedServices[*newServiceAPIDescription.ApiId] = newServiceAPIDescription uri := ctx.Request().Host + ctx.Request().URL.String() ctx.Response().Header().Set(echo.HeaderLocation, ctx.Scheme()+`://`+path.Join(uri, *newServiceAPIDescription.ApiId)) @@ -174,22 +191,60 @@ func (ps *PublishService) PostApfIdServiceApis(ctx echo.Context, apfId publishse return nil } -func (ps *PublishService) DeleteApfIdServiceApisServiceApiId(ctx echo.Context, apfId publishserviceapi.ApfId, serviceApiId publishserviceapi.ServiceApiId) error { - serviceDescription, ok := ps.publishedServices[string(serviceApiId)] +func (ps *PublishService) isServicePublished(newService publishapi.ServiceAPIDescription) error { + for _, services := range ps.publishedServices { + for _, service := range services { + if err := service.ValidateAlreadyPublished(newService); err != nil { + return err + } + } + } + return nil +} + +func (ps *PublishService) installHelmChart(newServiceAPIDescription publishapi.ServiceAPIDescription, ctx echo.Context) (bool, error) { + info := strings.Split(*newServiceAPIDescription.Description, ",") + if len(info) == 5 { + err := ps.helmManager.InstallHelmChart(info[1], info[2], info[3], info[4]) + if err != nil { + return true, sendCoreError(ctx, http.StatusBadRequest, fmt.Sprintf("Unable to install Helm chart %s due to: %s", info[3], err.Error())) + } + log.Debug("Installed service: ", newServiceAPIDescription.ApiId) + } + return false, nil +} + +// Unpublish a published service API. +func (ps *PublishService) DeleteApfIdServiceApisServiceApiId(ctx echo.Context, apfId string, serviceApiId string) error { + serviceDescriptions, ok := ps.publishedServices[string(apfId)] if ok { - info := strings.Split(*serviceDescription.Description, ",") - if len(info) == 5 { - ps.helmManager.UninstallHelmChart(info[1], info[3]) - log.Info("Deleted service: ", serviceApiId) + pos, description := getServiceDescription(serviceApiId, serviceDescriptions) + if description != nil { + info := strings.Split(*description.Description, ",") + if len(info) == 5 { + ps.helmManager.UninstallHelmChart(info[1], info[3]) + log.Debug("Deleted service: ", serviceApiId) + } + ps.lock.Lock() + ps.publishedServices[string(apfId)] = removeServiceDescription(pos, serviceDescriptions) + ps.lock.Unlock() + go ps.sendEvent(*description, eventsapi.CAPIFEventSERVICEAPIUNAVAILABLE) } - delete(ps.publishedServices, string(serviceApiId)) } return ctx.NoContent(http.StatusNoContent) } -func (ps *PublishService) GetApfIdServiceApisServiceApiId(ctx echo.Context, apfId publishserviceapi.ApfId, serviceApiId publishserviceapi.ServiceApiId) error { - serviceDescription, ok := ps.publishedServices[string(serviceApiId)] +// Retrieve a published service API. +func (ps *PublishService) GetApfIdServiceApisServiceApiId(ctx echo.Context, apfId string, serviceApiId string) error { + ps.lock.Lock() + serviceDescriptions, ok := ps.publishedServices[apfId] + ps.lock.Unlock() + if ok { + _, serviceDescription := getServiceDescription(serviceApiId, serviceDescriptions) + if serviceDescription == nil { + return ctx.NoContent(http.StatusNotFound) + } err := ctx.JSON(http.StatusOK, serviceDescription) if err != nil { // Something really bad happened, tell Echo that our handler failed @@ -201,14 +256,121 @@ func (ps *PublishService) GetApfIdServiceApisServiceApiId(ctx echo.Context, apfI return ctx.NoContent(http.StatusNotFound) } -func (ps *PublishService) ModifyIndAPFPubAPI(ctx echo.Context, apfId publishserviceapi.ApfId, serviceApiId publishserviceapi.ServiceApiId) error { - return ctx.NoContent(http.StatusNotImplemented) +func getServiceDescription(serviceApiId string, descriptions []publishapi.ServiceAPIDescription) (int, *publishapi.ServiceAPIDescription) { + for pos, description := range descriptions { + // Check for nil as we had a failure here when running unit tests in parallel against a single Capifcore instance + if (description.ApiId != nil) && (serviceApiId == *description.ApiId) { + return pos, &description + } + } + return -1, nil } -func (ps *PublishService) PutApfIdServiceApisServiceApiId(ctx echo.Context, apfId publishserviceapi.ApfId, serviceApiId publishserviceapi.ServiceApiId) error { +func removeServiceDescription(i int, a []publishapi.ServiceAPIDescription) []publishapi.ServiceAPIDescription { + a[i] = a[len(a)-1] // Copy last element to index i. + a[len(a)-1] = publishapi.ServiceAPIDescription{} // Erase last element (write zero value). + a = a[:len(a)-1] // Truncate slice. + return a +} + +// Modify an existing published service API. +func (ps *PublishService) ModifyIndAPFPubAPI(ctx echo.Context, apfId string, serviceApiId string) error { return ctx.NoContent(http.StatusNotImplemented) } +// Update a published service API. +func (ps *PublishService) PutApfIdServiceApisServiceApiId(ctx echo.Context, apfId string, serviceApiId string) error { + ps.lock.Lock() + defer ps.lock.Unlock() + errMsg := "Unable to update service due to %s." + + pos, publishedService, err := ps.checkIfServiceIsPublished(apfId, serviceApiId) + if err != nil { + return sendCoreError(ctx, http.StatusBadRequest, fmt.Sprintf(errMsg, err)) + } + + updatedServiceDescription, err := getServiceFromRequest(ctx) + if err != nil { + return sendCoreError(ctx, http.StatusBadRequest, fmt.Sprintf(errMsg, err)) + } + + // Additional validation for PUT + if (updatedServiceDescription.ApiId == nil) || (*updatedServiceDescription.ApiId != serviceApiId) { + errDetail := "ServiceAPIDescription ApiId doesn't match path parameter" + return sendCoreError(ctx, http.StatusBadRequest, fmt.Sprintf(errMsg, errDetail)) + } + + err = ps.checkProfilesRegistered(apfId, *updatedServiceDescription.AefProfiles) + if err != nil { + return sendCoreError(ctx, http.StatusBadRequest, fmt.Sprintf(errMsg, err)) + } + + ps.updateDescription(&updatedServiceDescription, &publishedService) + + publishedService.AefProfiles = updatedServiceDescription.AefProfiles + ps.publishedServices[apfId][pos] = publishedService + + err = ctx.JSON(http.StatusOK, publishedService) + if err != nil { + // Something really bad happened, tell Echo that our handler failed + return err + } + return nil +} + +func (ps *PublishService) checkIfServiceIsPublished(apfId string, serviceApiId string) (int, publishapi.ServiceAPIDescription, error) { + publishedServices, ok := ps.publishedServices[apfId] + if !ok { + return 0, publishapi.ServiceAPIDescription{}, fmt.Errorf("service must be published before updating it") + } else { + for pos, description := range publishedServices { + if *description.ApiId == serviceApiId { + return pos, description, nil + } + } + } + return 0, publishapi.ServiceAPIDescription{}, fmt.Errorf("service must be published before updating it") +} + +func getServiceFromRequest(ctx echo.Context) (publishapi.ServiceAPIDescription, error) { + var updatedServiceDescription publishapi.ServiceAPIDescription + err := ctx.Bind(&updatedServiceDescription) + if err != nil { + return publishapi.ServiceAPIDescription{}, fmt.Errorf("invalid format for service") + } + return updatedServiceDescription, nil +} + +func (ps *PublishService) updateDescription(updatedServiceDescription, publishedService *publishapi.ServiceAPIDescription) { + if updatedServiceDescription.Description != nil { + publishedService.Description = updatedServiceDescription.Description + go ps.sendEvent(*publishedService, eventsapi.CAPIFEventSERVICEAPIUPDATE) + } +} + +func (ps *PublishService) sendEvent(service publishapi.ServiceAPIDescription, eventType eventsapi.CAPIFEvent) { + apiIds := []string{*service.ApiId} + apis := []publishapi.ServiceAPIDescription{service} + event := eventsapi.EventNotification{ + EventDetail: &eventsapi.CAPIFEventDetail{ + ApiIds: &apiIds, + ServiceAPIDescriptions: &apis, + }, + Events: eventType, + } + ps.eventChannel <- event +} + +func (ps *PublishService) checkProfilesRegistered(apfId string, updatedProfiles []publishapi.AefProfile) error { + registeredFuncs := ps.serviceRegister.GetAefsForPublisher(apfId) + for _, profile := range updatedProfiles { + if !slices.Contains(registeredFuncs, profile.AefId) { + return fmt.Errorf("function %s not registered", profile.AefId) + } + } + return nil +} + // This function wraps sending of an error in the Error format, and // handling the failure to marshal that. func sendCoreError(ctx echo.Context, code int, message string) error {