NONRTRIC-946: Capifcore - join on ApiName
[nonrtric/plt/sme.git] / capifcore / internal / publishservice / publishservice.go
1 // -
2 //   ========================LICENSE_START=================================
3 //   O-RAN-SC
4 //   %%
5 //   Copyright (C) 2022-2023: Nordix Foundation
6 //   Copyright (C) 2024: OpenInfra Foundation Europe
7 //   %%
8 //   Licensed under the Apache License, Version 2.0 (the "License");
9 //   you may not use this file except in compliance with the License.
10 //   You may obtain a copy of the License at
11 //
12 //        http://www.apache.org/licenses/LICENSE-2.0
13 //
14 //   Unless required by applicable law or agreed to in writing, software
15 //   distributed under the License is distributed on an "AS IS" BASIS,
16 //   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 //   See the License for the specific language governing permissions and
18 //   limitations under the License.
19 //   ========================LICENSE_END===================================
20 //
21
22 package publishservice
23
24 import (
25         "fmt"
26         "net/http"
27         "path"
28         "strings"
29         "sync"
30
31         echo "github.com/labstack/echo/v4"
32         "k8s.io/utils/strings/slices"
33
34         "oransc.org/nonrtric/capifcore/internal/common29122"
35         "oransc.org/nonrtric/capifcore/internal/eventsapi"
36         publishapi "oransc.org/nonrtric/capifcore/internal/publishserviceapi"
37
38         "oransc.org/nonrtric/capifcore/internal/helmmanagement"
39         "oransc.org/nonrtric/capifcore/internal/providermanagement"
40
41         log "github.com/sirupsen/logrus"
42 )
43
// PublishRegister gives read access to the service APIs that have been published.
//
//go:generate mockery --name PublishRegister
type PublishRegister interface {
	// Checks if the provided API is published.
	// Returns true if the provided API has been published, false otherwise.
	// NOTE(review): the PublishService implementation in this file matches on aefId only and ignores path.
	IsAPIPublished(aefId, path string) bool
	// Gets all published APIs.
	// Returns a list of all APIs that have been published.
	GetAllPublishedServices() []publishapi.ServiceAPIDescription
	// Gets the published APIs that are also present in invokerApiList.
	// The PublishService implementation in this file matches entries on ApiName.
	GetAllowedPublishedServices(invokerApiList []publishapi.ServiceAPIDescription) []publishapi.ServiceAPIDescription
}
54
// PublishService stores the published service APIs and serves the publish REST handlers.
type PublishService struct {
	// publishedServices holds the published API descriptions, keyed by apfId.
	publishedServices map[string][]publishapi.ServiceAPIDescription
	// serviceRegister is asked which publishing functions and AEFs are registered.
	serviceRegister providermanagement.ServiceRegister
	// helmManager installs/uninstalls Helm charts for services whose description carries chart info.
	helmManager helmmanagement.HelmManager
	// eventChannel receives a notification when a service is published, updated or unpublished.
	eventChannel chan<- eventsapi.EventNotification
	// lock guards publishedServices.
	lock sync.Mutex
}
62
63 // Creates a service that implements both the PublishRegister and the publishserviceapi.ServerInterface interfaces.
64 func NewPublishService(serviceRegister providermanagement.ServiceRegister, hm helmmanagement.HelmManager, eventChannel chan<- eventsapi.EventNotification) *PublishService {
65         return &PublishService{
66                 helmManager:       hm,
67                 publishedServices: make(map[string][]publishapi.ServiceAPIDescription),
68                 serviceRegister:   serviceRegister,
69                 eventChannel:      eventChannel,
70         }
71 }
72
73 func (ps *PublishService) getAllAefIds() []string {
74         ps.lock.Lock()
75         defer ps.lock.Unlock()
76
77         allIds := []string{}
78         for _, descriptions := range ps.publishedServices {
79                 for _, description := range descriptions {
80                         allIds = append(allIds, description.GetAefIds()...)
81                 }
82         }
83         return allIds
84 }
85
86 func (ps *PublishService) IsAPIPublished(aefId, path string) bool {
87         return slices.Contains(ps.getAllAefIds(), aefId)
88 }
89
90 func (ps *PublishService) GetAllPublishedServices() []publishapi.ServiceAPIDescription {
91         publishedDescriptions := []publishapi.ServiceAPIDescription{}
92         for _, descriptions := range ps.publishedServices {
93                 publishedDescriptions = append(publishedDescriptions, descriptions...)
94         }
95         return publishedDescriptions
96 }
97
98 func (ps *PublishService) GetAllowedPublishedServices(apiListRequestedServices []publishapi.ServiceAPIDescription) []publishapi.ServiceAPIDescription {
99         apiListAllPublished := ps.GetAllPublishedServices()
100         allowedPublishedServices := join(apiListAllPublished, apiListRequestedServices)
101         return allowedPublishedServices
102 }
103
104 func join(a, b []publishapi.ServiceAPIDescription) []publishapi.ServiceAPIDescription {
105         var result []publishapi.ServiceAPIDescription
106
107         if (a == nil) || (b == nil) || (len(a) == 0) || (len(b) == 0) {
108                 return result
109         }
110
111         for _, itemA := range a {
112                 for _, itemB := range b {
113                         if itemA.ApiName == itemB.ApiName {
114                                 result = append(result, itemA)
115                                 break
116                         }
117                 }
118         }
119         return result
120 }
121
122 // Retrieve all published APIs.
123 func (ps *PublishService) GetApfIdServiceApis(ctx echo.Context, apfId string) error {
124         if !ps.serviceRegister.IsPublishingFunctionRegistered(apfId) {
125                 errorMsg := fmt.Sprintf("Unable to get the service due to %s api is only available for publishers", apfId)
126                 return sendCoreError(ctx, http.StatusNotFound, errorMsg)
127         }
128
129         serviceDescriptions := ps.publishedServices[apfId]
130         err := ctx.JSON(http.StatusOK, serviceDescriptions)
131         if err != nil {
132                 // Something really bad happened, tell Echo that our handler failed
133                 return err
134         }
135         return nil
136 }
137
138 // Publish a new API.
139 func (ps *PublishService) PostApfIdServiceApis(ctx echo.Context, apfId string) error {
140         var newServiceAPIDescription publishapi.ServiceAPIDescription
141         errorMsg := "Unable to publish the service due to %s "
142         err := ctx.Bind(&newServiceAPIDescription)
143         if err != nil {
144                 return sendCoreError(ctx, http.StatusBadRequest, fmt.Sprintf(errorMsg, "invalid format for service "+apfId))
145         }
146
147         if !ps.serviceRegister.IsPublishingFunctionRegistered(apfId) {
148                 return sendCoreError(ctx, http.StatusForbidden, fmt.Sprintf(errorMsg, "api is only available for publishers "+apfId))
149         }
150
151         if err := ps.isServicePublished(newServiceAPIDescription); err != nil {
152                 return sendCoreError(ctx, http.StatusForbidden, fmt.Sprintf(errorMsg, err))
153         }
154
155         if err := newServiceAPIDescription.Validate(); err != nil {
156                 return sendCoreError(ctx, http.StatusBadRequest, fmt.Sprintf(errorMsg, err))
157         }
158         ps.lock.Lock()
159         defer ps.lock.Unlock()
160
161         registeredFuncs := ps.serviceRegister.GetAefsForPublisher(apfId)
162         for _, profile := range *newServiceAPIDescription.AefProfiles {
163                 if !slices.Contains(registeredFuncs, profile.AefId) {
164                         return sendCoreError(ctx, http.StatusNotFound, fmt.Sprintf(errorMsg, fmt.Sprintf("function %s not registered", profile.AefId)))
165                 }
166         }
167
168         newServiceAPIDescription.PrepareNewService()
169
170         shouldReturn, returnValue := ps.installHelmChart(newServiceAPIDescription, ctx)
171         if shouldReturn {
172                 return returnValue
173         }
174         go ps.sendEvent(newServiceAPIDescription, eventsapi.CAPIFEventSERVICEAPIAVAILABLE)
175
176         _, ok := ps.publishedServices[apfId]
177         if ok {
178                 ps.publishedServices[apfId] = append(ps.publishedServices[apfId], newServiceAPIDescription)
179         } else {
180                 ps.publishedServices[apfId] = append([]publishapi.ServiceAPIDescription{}, newServiceAPIDescription)
181         }
182
183         uri := ctx.Request().Host + ctx.Request().URL.String()
184         ctx.Response().Header().Set(echo.HeaderLocation, ctx.Scheme()+`://`+path.Join(uri, *newServiceAPIDescription.ApiId))
185         err = ctx.JSON(http.StatusCreated, newServiceAPIDescription)
186         if err != nil {
187                 // Something really bad happened, tell Echo that our handler failed
188                 return err
189         }
190
191         return nil
192 }
193
194 func (ps *PublishService) isServicePublished(newService publishapi.ServiceAPIDescription) error {
195         for _, services := range ps.publishedServices {
196                 for _, service := range services {
197                         if err := service.ValidateAlreadyPublished(newService); err != nil {
198                                 return err
199                         }
200                 }
201         }
202         return nil
203 }
204
205 func (ps *PublishService) installHelmChart(newServiceAPIDescription publishapi.ServiceAPIDescription, ctx echo.Context) (bool, error) {
206         info := strings.Split(*newServiceAPIDescription.Description, ",")
207         if len(info) == 5 {
208                 err := ps.helmManager.InstallHelmChart(info[1], info[2], info[3], info[4])
209                 if err != nil {
210                         return true, sendCoreError(ctx, http.StatusBadRequest, fmt.Sprintf("Unable to install Helm chart %s due to: %s", info[3], err.Error()))
211                 }
212                 log.Debug("Installed service: ", newServiceAPIDescription.ApiId)
213         }
214         return false, nil
215 }
216
217 // Unpublish a published service API.
218 func (ps *PublishService) DeleteApfIdServiceApisServiceApiId(ctx echo.Context, apfId string, serviceApiId string) error {
219         serviceDescriptions, ok := ps.publishedServices[string(apfId)]
220         if ok {
221                 pos, description := getServiceDescription(serviceApiId, serviceDescriptions)
222                 if description != nil {
223                         info := strings.Split(*description.Description, ",")
224                         if len(info) == 5 {
225                                 ps.helmManager.UninstallHelmChart(info[1], info[3])
226                                 log.Debug("Deleted service: ", serviceApiId)
227                         }
228                         ps.lock.Lock()
229                         ps.publishedServices[string(apfId)] = removeServiceDescription(pos, serviceDescriptions)
230                         ps.lock.Unlock()
231                         go ps.sendEvent(*description, eventsapi.CAPIFEventSERVICEAPIUNAVAILABLE)
232                 }
233         }
234         return ctx.NoContent(http.StatusNoContent)
235 }
236
237 // Retrieve a published service API.
238 func (ps *PublishService) GetApfIdServiceApisServiceApiId(ctx echo.Context, apfId string, serviceApiId string) error {
239         ps.lock.Lock()
240         serviceDescriptions, ok := ps.publishedServices[apfId]
241         ps.lock.Unlock()
242
243         if ok {
244                 _, serviceDescription := getServiceDescription(serviceApiId, serviceDescriptions)
245                 if serviceDescription == nil {
246                         return ctx.NoContent(http.StatusNotFound)
247                 }
248                 err := ctx.JSON(http.StatusOK, serviceDescription)
249                 if err != nil {
250                         // Something really bad happened, tell Echo that our handler failed
251                         return err
252                 }
253
254                 return nil
255         }
256         return ctx.NoContent(http.StatusNotFound)
257 }
258
259 func getServiceDescription(serviceApiId string, descriptions []publishapi.ServiceAPIDescription) (int, *publishapi.ServiceAPIDescription) {
260         for pos, description := range descriptions {
261                 // Check for nil as we had a failure here when running unit tests in parallel against a single Capifcore instance
262                 if (description.ApiId != nil) && (serviceApiId == *description.ApiId) {
263                         return pos, &description
264                 }
265         }
266         return -1, nil
267 }
268
269 func removeServiceDescription(i int, a []publishapi.ServiceAPIDescription) []publishapi.ServiceAPIDescription {
270         a[i] = a[len(a)-1]                               // Copy last element to index i.
271         a[len(a)-1] = publishapi.ServiceAPIDescription{} // Erase last element (write zero value).
272         a = a[:len(a)-1]                                 // Truncate slice.
273         return a
274 }
275
276 // Modify an existing published service API.
277 func (ps *PublishService) ModifyIndAPFPubAPI(ctx echo.Context, apfId string, serviceApiId string) error {
278         return ctx.NoContent(http.StatusNotImplemented)
279 }
280
281 // Update a published service API.
282 func (ps *PublishService) PutApfIdServiceApisServiceApiId(ctx echo.Context, apfId string, serviceApiId string) error {
283         ps.lock.Lock()
284         defer ps.lock.Unlock()
285         errMsg := "Unable to update service due to %s."
286
287         pos, publishedService, err := ps.checkIfServiceIsPublished(apfId, serviceApiId)
288         if err != nil {
289                 return sendCoreError(ctx, http.StatusBadRequest, fmt.Sprintf(errMsg, err))
290         }
291
292         updatedServiceDescription, err := getServiceFromRequest(ctx)
293         if err != nil {
294                 return sendCoreError(ctx, http.StatusBadRequest, fmt.Sprintf(errMsg, err))
295         }
296
297         // Additional validation for PUT
298         if (updatedServiceDescription.ApiId == nil) || (*updatedServiceDescription.ApiId != serviceApiId) {
299                 errDetail := "ServiceAPIDescription ApiId doesn't match path parameter"
300                 return sendCoreError(ctx, http.StatusBadRequest, fmt.Sprintf(errMsg, errDetail))
301         }
302
303         err = ps.checkProfilesRegistered(apfId, *updatedServiceDescription.AefProfiles)
304         if err != nil {
305                 return sendCoreError(ctx, http.StatusBadRequest, fmt.Sprintf(errMsg, err))
306         }
307
308         ps.updateDescription(&updatedServiceDescription, &publishedService)
309
310         publishedService.AefProfiles = updatedServiceDescription.AefProfiles
311         ps.publishedServices[apfId][pos] = publishedService
312
313         err = ctx.JSON(http.StatusOK, publishedService)
314         if err != nil {
315                 // Something really bad happened, tell Echo that our handler failed
316                 return err
317         }
318         return nil
319 }
320
321 func (ps *PublishService) checkIfServiceIsPublished(apfId string, serviceApiId string) (int, publishapi.ServiceAPIDescription, error) {
322         publishedServices, ok := ps.publishedServices[apfId]
323         if !ok {
324                 return 0, publishapi.ServiceAPIDescription{}, fmt.Errorf("service must be published before updating it")
325         } else {
326                 for pos, description := range publishedServices {
327                         if *description.ApiId == serviceApiId {
328                                 return pos, description, nil
329                         }
330                 }
331         }
332         return 0, publishapi.ServiceAPIDescription{}, fmt.Errorf("service must be published before updating it")
333 }
334
335 func getServiceFromRequest(ctx echo.Context) (publishapi.ServiceAPIDescription, error) {
336         var updatedServiceDescription publishapi.ServiceAPIDescription
337         err := ctx.Bind(&updatedServiceDescription)
338         if err != nil {
339                 return publishapi.ServiceAPIDescription{}, fmt.Errorf("invalid format for service")
340         }
341         return updatedServiceDescription, nil
342 }
343
344 func (ps *PublishService) updateDescription(updatedServiceDescription, publishedService *publishapi.ServiceAPIDescription) {
345         if updatedServiceDescription.Description != nil {
346                 publishedService.Description = updatedServiceDescription.Description
347                 go ps.sendEvent(*publishedService, eventsapi.CAPIFEventSERVICEAPIUPDATE)
348         }
349 }
350
351 func (ps *PublishService) sendEvent(service publishapi.ServiceAPIDescription, eventType eventsapi.CAPIFEvent) {
352         apiIds := []string{*service.ApiId}
353         apis := []publishapi.ServiceAPIDescription{service}
354         event := eventsapi.EventNotification{
355                 EventDetail: &eventsapi.CAPIFEventDetail{
356                         ApiIds:                 &apiIds,
357                         ServiceAPIDescriptions: &apis,
358                 },
359                 Events: eventType,
360         }
361         ps.eventChannel <- event
362 }
363
364 func (ps *PublishService) checkProfilesRegistered(apfId string, updatedProfiles []publishapi.AefProfile) error {
365         registeredFuncs := ps.serviceRegister.GetAefsForPublisher(apfId)
366         for _, profile := range updatedProfiles {
367                 if !slices.Contains(registeredFuncs, profile.AefId) {
368                         return fmt.Errorf("function %s not registered", profile.AefId)
369                 }
370         }
371         return nil
372 }
373
374 // This function wraps sending of an error in the Error format, and
375 // handling the failure to marshal that.
376 func sendCoreError(ctx echo.Context, code int, message string) error {
377         pd := common29122.ProblemDetails{
378                 Cause:  &message,
379                 Status: &code,
380         }
381         err := ctx.JSON(code, pd)
382         return err
383 }