X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=capifcore%2Finternal%2Feventservice%2Feventservice.go;h=a4fa0795e9fe4fd6382056d675168698cea154c9;hb=refs%2Fchanges%2F18%2F10118%2F1;hp=ae35cc0d7833fcb69a9cf483fa01544fd73aa06a;hpb=2effa31635f1b8349c02b4c1c546bfc67e8797d3;p=nonrtric%2Fplt%2Fsme.git diff --git a/capifcore/internal/eventservice/eventservice.go b/capifcore/internal/eventservice/eventservice.go index ae35cc0..a4fa079 100644 --- a/capifcore/internal/eventservice/eventservice.go +++ b/capifcore/internal/eventservice/eventservice.go @@ -30,6 +30,7 @@ import ( "github.com/labstack/echo/v4" log "github.com/sirupsen/logrus" + "k8s.io/utils/strings/slices" "oransc.org/nonrtric/capifcore/internal/common29122" "oransc.org/nonrtric/capifcore/internal/eventsapi" "oransc.org/nonrtric/capifcore/internal/restclient" @@ -108,13 +109,77 @@ func getEventSubscriptionFromRequest(ctx echo.Context) (eventsapi.EventSubscript } func (es *EventService) handleEvent(event eventsapi.EventNotification) { - subscription := es.getSubscription(event.SubscriptionId) - if subscription != nil { - e, _ := json.Marshal(event) - if error := restclient.Put(string(subscription.NotificationDestination), []byte(e), es.client); error != nil { - log.Error("Unable to send event") + subsIds := es.getMatchingSubs(event) + for _, subId := range subsIds { + go es.sendEvent(event, subId) + } +} + +func (es *EventService) sendEvent(event eventsapi.EventNotification, subscriptionId string) { + event.SubscriptionId = subscriptionId + e, _ := json.Marshal(event) + if error := restclient.Put(string(es.subscriptions[subscriptionId].NotificationDestination), []byte(e), es.client); error != nil { + log.Error("Unable to send event") + } +} + +func (es *EventService) getMatchingSubs(event eventsapi.EventNotification) []string { + es.lock.Lock() + defer es.lock.Unlock() + matchingTypeSubs := es.filterOnEventType(event) + matchingSubs := []string{} + for _, subId := range matchingTypeSubs { + subscription := es.subscriptions[subId] + if subscription.EventFilters == nil || event.EventDetail == nil { + matchingSubs = append(matchingSubs, subId) + break + } + if matchesFilters(*event.EventDetail.ApiIds, *subscription.EventFilters, getApiIdsFromFilter) && + matchesFilters(*event.EventDetail.ApiInvokerIds, *subscription.EventFilters, getInvokerIdsFromFilter) { + matchingSubs = append(matchingSubs, subId) } } + + return matchingSubs +} + +func (es *EventService) filterOnEventType(event eventsapi.EventNotification) []string { + matchingSubs := []string{} + for subId, subInfo := range es.subscriptions { + if slices.Contains(asStrings(subInfo.Events), string(event.Events)) { + matchingSubs = append(matchingSubs, subId) + } + } + return matchingSubs +} + +func matchesFilters(eventIds []string, filters []eventsapi.CAPIFEventFilter, getIds func(eventsapi.CAPIFEventFilter) *[]string) bool { + if len(filters) == 0 { + return true + } + for _, id := range eventIds { + filter := filters[0] + if getIds(filter) == nil { + return matchesFilters(eventIds, filters[1:], getIds) + } + return slices.Contains(*getIds(filter), id) && matchesFilters(eventIds, filters[1:], getIds) + } + return true +} + +func getApiIdsFromFilter(filter eventsapi.CAPIFEventFilter) *[]string { + return filter.ApiIds +} +func getInvokerIdsFromFilter(filter eventsapi.CAPIFEventFilter) *[]string { + return filter.ApiInvokerIds +} + +func asStrings(events []eventsapi.CAPIFEvent) []string { + asStrings := make([]string, len(events)) + for i, event := range events { + asStrings[i] = string(event) + } + return asStrings } func (es *EventService) getSubscriptionId(subscriberId string) string {