d82b63d92fb30589fc3e8936daa572ef1ee6db06
[nonrtric/plt/sme.git] / capifcore / internal / eventservice / eventservice.go
1 // -
2 //   ========================LICENSE_START=================================
3 //   O-RAN-SC
4 //   %%
5 //   Copyright (C) 2022: Nordix Foundation
6 //   %%
7 //   Licensed under the Apache License, Version 2.0 (the "License");
8 //   you may not use this file except in compliance with the License.
9 //   You may obtain a copy of the License at
10 //
11 //        http://www.apache.org/licenses/LICENSE-2.0
12 //
13 //   Unless required by applicable law or agreed to in writing, software
14 //   distributed under the License is distributed on an "AS IS" BASIS,
15 //   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 //   See the License for the specific language governing permissions and
17 //   limitations under the License.
18 //   ========================LICENSE_END===================================
19 //
20
21 package eventservice
22
23 import (
24         "encoding/json"
25         "fmt"
26         "net/http"
27         "path"
28         "strconv"
29         "sync"
30
31         "github.com/labstack/echo/v4"
32         log "github.com/sirupsen/logrus"
33         "k8s.io/utils/strings/slices"
34         "oransc.org/nonrtric/capifcore/internal/common29122"
35         "oransc.org/nonrtric/capifcore/internal/eventsapi"
36         "oransc.org/nonrtric/capifcore/internal/restclient"
37 )
38
39 type EventService struct {
40         notificationChannel chan eventsapi.EventNotification
41         client              restclient.HTTPClient
42         subscriptions       map[string]eventsapi.EventSubscription
43         idCounter           uint
44         lock                sync.Mutex
45 }
46
47 func NewEventService(c restclient.HTTPClient) *EventService {
48         es := EventService{
49                 notificationChannel: make(chan eventsapi.EventNotification),
50                 client:              c,
51                 subscriptions:       make(map[string]eventsapi.EventSubscription),
52         }
53         es.start()
54         return &es
55 }
56
57 func (es *EventService) start() {
58         go es.handleIncomingEvents()
59 }
60
61 func (es *EventService) handleIncomingEvents() {
62         for event := range es.notificationChannel {
63                 es.handleEvent(event)
64         }
65 }
66 func (es *EventService) GetNotificationChannel() chan<- eventsapi.EventNotification {
67         return es.notificationChannel
68 }
69
70 func (es *EventService) PostSubscriberIdSubscriptions(ctx echo.Context, subscriberId string) error {
71         newSubscription, err := getEventSubscriptionFromRequest(ctx)
72         errMsg := "Unable to register subscription due to %s."
73         if err != nil {
74                 return sendCoreError(ctx, http.StatusBadRequest, fmt.Sprintf(errMsg, err))
75         }
76         uri := ctx.Request().Host + ctx.Request().URL.String()
77         subId := es.getSubscriptionId(subscriberId)
78         es.addSubscription(subId, newSubscription)
79         ctx.Response().Header().Set(echo.HeaderLocation, ctx.Scheme()+`://`+path.Join(uri, subId))
80         err = ctx.JSON(http.StatusCreated, newSubscription)
81         if err != nil {
82                 // Something really bad happened, tell Echo that our handler failed
83                 return err
84         }
85
86         return nil
87 }
88
89 func (es *EventService) DeleteSubscriberIdSubscriptionsSubscriptionId(ctx echo.Context, subscriberId string, subscriptionId string) error {
90         es.lock.Lock()
91         defer es.lock.Unlock()
92
93         log.Debug(es.subscriptions)
94         if _, ok := es.subscriptions[subscriptionId]; ok {
95                 log.Debug("Deleting subscription", subscriptionId)
96                 delete(es.subscriptions, subscriptionId)
97         }
98
99         return ctx.NoContent(http.StatusNoContent)
100 }
101
102 func getEventSubscriptionFromRequest(ctx echo.Context) (eventsapi.EventSubscription, error) {
103         var subscription eventsapi.EventSubscription
104         err := ctx.Bind(&subscription)
105         if err != nil {
106                 return eventsapi.EventSubscription{}, fmt.Errorf("invalid format for subscription")
107         }
108         return subscription, nil
109 }
110
111 func (es *EventService) handleEvent(event eventsapi.EventNotification) {
112         subsIds := es.getMatchingSubs(event)
113         for _, subId := range subsIds {
114                 go es.sendEvent(event, subId)
115         }
116 }
117
118 func (es *EventService) sendEvent(event eventsapi.EventNotification, subscriptionId string) {
119         event.SubscriptionId = subscriptionId
120         e, _ := json.Marshal(event)
121         if error := restclient.Put(string(es.subscriptions[subscriptionId].NotificationDestination), []byte(e), es.client); error != nil {
122                 log.Error("Unable to send event")
123         }
124 }
125
126 func (es *EventService) getMatchingSubs(event eventsapi.EventNotification) []string {
127         matchingSubs := []string{}
128         es.lock.Lock()
129         defer es.lock.Unlock()
130         for subId, subInfo := range es.subscriptions {
131                 if slices.Contains(asStrings(subInfo.Events), string(event.Events)) {
132                         matchingSubs = append(matchingSubs, subId)
133                 }
134         }
135         return matchingSubs
136 }
137
138 func asStrings(events []eventsapi.CAPIFEvent) []string {
139         asStrings := make([]string, len(events))
140         for i, event := range events {
141                 asStrings[i] = string(event)
142         }
143         return asStrings
144 }
145
146 func (es *EventService) getSubscriptionId(subscriberId string) string {
147         es.idCounter++
148         return subscriberId + strconv.FormatUint(uint64(es.idCounter), 10)
149 }
150
151 func (es *EventService) addSubscription(subId string, subscription eventsapi.EventSubscription) {
152         es.lock.Lock()
153         es.subscriptions[subId] = subscription
154         es.lock.Unlock()
155 }
156
157 func (es *EventService) getSubscription(subId string) *eventsapi.EventSubscription {
158         es.lock.Lock()
159         defer es.lock.Unlock()
160         if sub, ok := es.subscriptions[subId]; ok {
161                 return &sub
162         } else {
163                 return nil
164         }
165 }
166
167 // This function wraps sending of an error in the Error format, and
168 // handling the failure to marshal that.
169 func sendCoreError(ctx echo.Context, code int, message string) error {
170         pd := common29122.ProblemDetails{
171                 Cause:  &message,
172                 Status: &code,
173         }
174         err := ctx.JSON(code, pd)
175         return err
176 }