Add apiInvokerId filtering for eventservice
[nonrtric/plt/sme.git] / capifcore / internal / eventservice / eventservice.go
1 // -
2 //   ========================LICENSE_START=================================
3 //   O-RAN-SC
4 //   %%
5 //   Copyright (C) 2022: Nordix Foundation
6 //   %%
7 //   Licensed under the Apache License, Version 2.0 (the "License");
8 //   you may not use this file except in compliance with the License.
9 //   You may obtain a copy of the License at
10 //
11 //        http://www.apache.org/licenses/LICENSE-2.0
12 //
13 //   Unless required by applicable law or agreed to in writing, software
14 //   distributed under the License is distributed on an "AS IS" BASIS,
15 //   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 //   See the License for the specific language governing permissions and
17 //   limitations under the License.
18 //   ========================LICENSE_END===================================
19 //
20
21 package eventservice
22
23 import (
24         "encoding/json"
25         "fmt"
26         "net/http"
27         "path"
28         "strconv"
29         "sync"
30
31         "github.com/labstack/echo/v4"
32         log "github.com/sirupsen/logrus"
33         "k8s.io/utils/strings/slices"
34         "oransc.org/nonrtric/capifcore/internal/common29122"
35         "oransc.org/nonrtric/capifcore/internal/eventsapi"
36         "oransc.org/nonrtric/capifcore/internal/restclient"
37 )
38
39 type EventService struct {
40         notificationChannel chan eventsapi.EventNotification
41         client              restclient.HTTPClient
42         subscriptions       map[string]eventsapi.EventSubscription
43         idCounter           uint
44         lock                sync.Mutex
45 }
46
47 func NewEventService(c restclient.HTTPClient) *EventService {
48         es := EventService{
49                 notificationChannel: make(chan eventsapi.EventNotification),
50                 client:              c,
51                 subscriptions:       make(map[string]eventsapi.EventSubscription),
52         }
53         es.start()
54         return &es
55 }
56
57 func (es *EventService) start() {
58         go es.handleIncomingEvents()
59 }
60
61 func (es *EventService) handleIncomingEvents() {
62         for event := range es.notificationChannel {
63                 es.handleEvent(event)
64         }
65 }
66 func (es *EventService) GetNotificationChannel() chan<- eventsapi.EventNotification {
67         return es.notificationChannel
68 }
69
70 func (es *EventService) PostSubscriberIdSubscriptions(ctx echo.Context, subscriberId string) error {
71         newSubscription, err := getEventSubscriptionFromRequest(ctx)
72         errMsg := "Unable to register subscription due to %s."
73         if err != nil {
74                 return sendCoreError(ctx, http.StatusBadRequest, fmt.Sprintf(errMsg, err))
75         }
76         uri := ctx.Request().Host + ctx.Request().URL.String()
77         subId := es.getSubscriptionId(subscriberId)
78         es.addSubscription(subId, newSubscription)
79         ctx.Response().Header().Set(echo.HeaderLocation, ctx.Scheme()+`://`+path.Join(uri, subId))
80         err = ctx.JSON(http.StatusCreated, newSubscription)
81         if err != nil {
82                 // Something really bad happened, tell Echo that our handler failed
83                 return err
84         }
85
86         return nil
87 }
88
89 func (es *EventService) DeleteSubscriberIdSubscriptionsSubscriptionId(ctx echo.Context, subscriberId string, subscriptionId string) error {
90         es.lock.Lock()
91         defer es.lock.Unlock()
92
93         log.Debug(es.subscriptions)
94         if _, ok := es.subscriptions[subscriptionId]; ok {
95                 log.Debug("Deleting subscription", subscriptionId)
96                 delete(es.subscriptions, subscriptionId)
97         }
98
99         return ctx.NoContent(http.StatusNoContent)
100 }
101
102 func getEventSubscriptionFromRequest(ctx echo.Context) (eventsapi.EventSubscription, error) {
103         var subscription eventsapi.EventSubscription
104         err := ctx.Bind(&subscription)
105         if err != nil {
106                 return eventsapi.EventSubscription{}, fmt.Errorf("invalid format for subscription")
107         }
108         return subscription, nil
109 }
110
111 func (es *EventService) handleEvent(event eventsapi.EventNotification) {
112         subsIds := es.getMatchingSubs(event)
113         for _, subId := range subsIds {
114                 go es.sendEvent(event, subId)
115         }
116 }
117
118 func (es *EventService) sendEvent(event eventsapi.EventNotification, subscriptionId string) {
119         event.SubscriptionId = subscriptionId
120         e, _ := json.Marshal(event)
121         if error := restclient.Put(string(es.subscriptions[subscriptionId].NotificationDestination), []byte(e), es.client); error != nil {
122                 log.Error("Unable to send event")
123         }
124 }
125
126 func (es *EventService) getMatchingSubs(event eventsapi.EventNotification) []string {
127         es.lock.Lock()
128         defer es.lock.Unlock()
129         matchingTypeSubs := es.filterOnEventType(event)
130         matchingSubs := []string{}
131         for _, subId := range matchingTypeSubs {
132                 subscription := es.subscriptions[subId]
133                 if subscription.EventFilters == nil || event.EventDetail == nil {
134                         matchingSubs = append(matchingSubs, subId)
135                         break
136                 }
137                 if matchesFilters(*event.EventDetail.ApiIds, *subscription.EventFilters, getApiIdsFromFilter) &&
138                         matchesFilters(*event.EventDetail.ApiInvokerIds, *subscription.EventFilters, getInvokerIdsFromFilter) {
139                         matchingSubs = append(matchingSubs, subId)
140                 }
141         }
142
143         return matchingSubs
144 }
145
146 func (es *EventService) filterOnEventType(event eventsapi.EventNotification) []string {
147         matchingSubs := []string{}
148         for subId, subInfo := range es.subscriptions {
149                 if slices.Contains(asStrings(subInfo.Events), string(event.Events)) {
150                         matchingSubs = append(matchingSubs, subId)
151                 }
152         }
153         return matchingSubs
154 }
155
156 func matchesFilters(eventIds []string, filters []eventsapi.CAPIFEventFilter, getIds func(eventsapi.CAPIFEventFilter) *[]string) bool {
157         if len(filters) == 0 {
158                 return true
159         }
160         for _, id := range eventIds {
161                 filter := filters[0]
162                 if getIds(filter) == nil {
163                         return matchesFilters(eventIds, filters[1:], getIds)
164                 }
165                 return slices.Contains(*getIds(filter), id) && matchesFilters(eventIds, filters[1:], getIds)
166         }
167         return true
168 }
169
170 func getApiIdsFromFilter(filter eventsapi.CAPIFEventFilter) *[]string {
171         return filter.ApiIds
172 }
173 func getInvokerIdsFromFilter(filter eventsapi.CAPIFEventFilter) *[]string {
174         return filter.ApiInvokerIds
175 }
176
177 func asStrings(events []eventsapi.CAPIFEvent) []string {
178         asStrings := make([]string, len(events))
179         for i, event := range events {
180                 asStrings[i] = string(event)
181         }
182         return asStrings
183 }
184
185 func (es *EventService) getSubscriptionId(subscriberId string) string {
186         es.idCounter++
187         return subscriberId + strconv.FormatUint(uint64(es.idCounter), 10)
188 }
189
190 func (es *EventService) addSubscription(subId string, subscription eventsapi.EventSubscription) {
191         es.lock.Lock()
192         es.subscriptions[subId] = subscription
193         es.lock.Unlock()
194 }
195
196 func (es *EventService) getSubscription(subId string) *eventsapi.EventSubscription {
197         es.lock.Lock()
198         defer es.lock.Unlock()
199         if sub, ok := es.subscriptions[subId]; ok {
200                 return &sub
201         } else {
202                 return nil
203         }
204 }
205
206 // This function wraps sending of an error in the Error format, and
207 // handling the failure to marshal that.
208 func sendCoreError(ctx echo.Context, code int, message string) error {
209         pd := common29122.ProblemDetails{
210                 Cause:  &message,
211                 Status: &code,
212         }
213         err := ctx.JSON(code, pd)
214         return err
215 }