Add apiInvokerId filtering for eventservice
[nonrtric/plt/sme.git] / capifcore / internal / eventservice / eventservice.go
index ae35cc0..a4fa079 100644 (file)
@@ -30,6 +30,7 @@ import (
 
        "github.com/labstack/echo/v4"
        log "github.com/sirupsen/logrus"
+       "k8s.io/utils/strings/slices"
        "oransc.org/nonrtric/capifcore/internal/common29122"
        "oransc.org/nonrtric/capifcore/internal/eventsapi"
        "oransc.org/nonrtric/capifcore/internal/restclient"
@@ -108,13 +109,77 @@ func getEventSubscriptionFromRequest(ctx echo.Context) (eventsapi.EventSubscript
 }
 
 func (es *EventService) handleEvent(event eventsapi.EventNotification) {
-       subscription := es.getSubscription(event.SubscriptionId)
-       if subscription != nil {
-               e, _ := json.Marshal(event)
-               if error := restclient.Put(string(subscription.NotificationDestination), []byte(e), es.client); error != nil {
-                       log.Error("Unable to send event")
+       subsIds := es.getMatchingSubs(event)
+       for _, subId := range subsIds {
+               go es.sendEvent(event, subId)
+       }
+}
+
+func (es *EventService) sendEvent(event eventsapi.EventNotification, subscriptionId string) {
+       event.SubscriptionId = subscriptionId
+       e, _ := json.Marshal(event)
+       if error := restclient.Put(string(es.subscriptions[subscriptionId].NotificationDestination), []byte(e), es.client); error != nil {
+               log.Error("Unable to send event")
+       }
+}
+
+func (es *EventService) getMatchingSubs(event eventsapi.EventNotification) []string {
+       es.lock.Lock()
+       defer es.lock.Unlock()
+       matchingTypeSubs := es.filterOnEventType(event)
+       matchingSubs := []string{}
+       for _, subId := range matchingTypeSubs {
+               subscription := es.subscriptions[subId]
+               if subscription.EventFilters == nil || event.EventDetail == nil {
+                       matchingSubs = append(matchingSubs, subId)
+                       break
+               }
+               if matchesFilters(*event.EventDetail.ApiIds, *subscription.EventFilters, getApiIdsFromFilter) &&
+                       matchesFilters(*event.EventDetail.ApiInvokerIds, *subscription.EventFilters, getInvokerIdsFromFilter) {
+                       matchingSubs = append(matchingSubs, subId)
                }
        }
+
+       return matchingSubs
+}
+
+func (es *EventService) filterOnEventType(event eventsapi.EventNotification) []string {
+       matchingSubs := []string{}
+       for subId, subInfo := range es.subscriptions {
+               if slices.Contains(asStrings(subInfo.Events), string(event.Events)) {
+                       matchingSubs = append(matchingSubs, subId)
+               }
+       }
+       return matchingSubs
+}
+
+func matchesFilters(eventIds []string, filters []eventsapi.CAPIFEventFilter, getIds func(eventsapi.CAPIFEventFilter) *[]string) bool {
+       if len(filters) == 0 {
+               return true
+       }
+       for _, id := range eventIds {
+               filter := filters[0]
+               if getIds(filter) == nil {
+                       return matchesFilters(eventIds, filters[1:], getIds)
+               }
+               return slices.Contains(*getIds(filter), id) && matchesFilters(eventIds, filters[1:], getIds)
+       }
+       return true
+}
+
+func getApiIdsFromFilter(filter eventsapi.CAPIFEventFilter) *[]string {
+       return filter.ApiIds
+}
+func getInvokerIdsFromFilter(filter eventsapi.CAPIFEventFilter) *[]string {
+       return filter.ApiInvokerIds
+}
+
+func asStrings(events []eventsapi.CAPIFEvent) []string {
+       asStrings := make([]string, len(events))
+       for i, event := range events {
+               asStrings[i] = string(event)
+       }
+       return asStrings
 }
 
 func (es *EventService) getSubscriptionId(subscriberId string) string {