X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=capifcore%2Finternal%2Feventservice%2Feventservice.go;h=1d63a453ab63608e4888374e3082ae65f4203ec8;hb=refs%2Fchanges%2F19%2F10319%2F1;hp=d82b63d92fb30589fc3e8936daa572ef1ee6db06;hpb=fccbf5b13fcede6ec48796a04f41ab9f06e119b3;p=nonrtric%2Fplt%2Fsme.git diff --git a/capifcore/internal/eventservice/eventservice.go b/capifcore/internal/eventservice/eventservice.go index d82b63d..1d63a45 100644 --- a/capifcore/internal/eventservice/eventservice.go +++ b/capifcore/internal/eventservice/eventservice.go @@ -33,6 +33,7 @@ import ( "k8s.io/utils/strings/slices" "oransc.org/nonrtric/capifcore/internal/common29122" "oransc.org/nonrtric/capifcore/internal/eventsapi" + "oransc.org/nonrtric/capifcore/internal/publishserviceapi" "oransc.org/nonrtric/capifcore/internal/restclient" ) @@ -73,6 +74,11 @@ func (es *EventService) PostSubscriberIdSubscriptions(ctx echo.Context, subscrib if err != nil { return sendCoreError(ctx, http.StatusBadRequest, fmt.Sprintf(errMsg, err)) } + + if err := newSubscription.Validate(); err != nil { + return sendCoreError(ctx, http.StatusBadRequest, fmt.Sprintf(errMsg, err)) + } + uri := ctx.Request().Host + ctx.Request().URL.String() subId := es.getSubscriptionId(subscriberId) es.addSubscription(subId, newSubscription) @@ -87,18 +93,22 @@ func (es *EventService) PostSubscriberIdSubscriptions(ctx echo.Context, subscrib } func (es *EventService) DeleteSubscriberIdSubscriptionsSubscriptionId(ctx echo.Context, subscriberId string, subscriptionId string) error { - es.lock.Lock() - defer es.lock.Unlock() log.Debug(es.subscriptions) if _, ok := es.subscriptions[subscriptionId]; ok { - log.Debug("Deleting subscription", subscriptionId) - delete(es.subscriptions, subscriptionId) + es.deleteSubscription(subscriptionId) } return ctx.NoContent(http.StatusNoContent) } +func (es *EventService) deleteSubscription(subscriptionId string) { + log.Debug("Deleting subscription", subscriptionId) + es.lock.Lock() + defer es.lock.Unlock() + delete(es.subscriptions, subscriptionId) +} + func getEventSubscriptionFromRequest(ctx echo.Context) (eventsapi.EventSubscription, error) { var subscription eventsapi.EventSubscription err := ctx.Bind(&subscription) @@ -124,9 +134,26 @@ func (es *EventService) sendEvent(event eventsapi.EventNotification, subscriptio } func (es *EventService) getMatchingSubs(event eventsapi.EventNotification) []string { - matchingSubs := []string{} es.lock.Lock() defer es.lock.Unlock() + matchingTypeSubs := es.filterOnEventType(event) + matchingSubs := []string{} + for _, subId := range matchingTypeSubs { + subscription := es.subscriptions[subId] + if subscription.EventFilters == nil || event.EventDetail == nil { + matchingSubs = append(matchingSubs, subId) + } else if matchesFilters(event.EventDetail.ApiIds, *subscription.EventFilters, getApiIdsFromFilter) && + matchesFilters(event.EventDetail.ApiInvokerIds, *subscription.EventFilters, getInvokerIdsFromFilter) && + matchesFilters(getAefIdsFromEvent(event.EventDetail.ServiceAPIDescriptions), *subscription.EventFilters, getAefIdsFromFilter) { + matchingSubs = append(matchingSubs, subId) + } + } + + return matchingSubs +} + +func (es *EventService) filterOnEventType(event eventsapi.EventNotification) []string { + matchingSubs := []string{} for subId, subInfo := range es.subscriptions { if slices.Contains(asStrings(subInfo.Events), string(event.Events)) { matchingSubs = append(matchingSubs, subId) @@ -135,6 +162,50 @@ func (es *EventService) getMatchingSubs(event eventsapi.EventNotification) []str return matchingSubs } +func matchesFilters(eventIds *[]string, filters []eventsapi.CAPIFEventFilter, getIds func(eventsapi.CAPIFEventFilter) *[]string) bool { + if len(filters) == 0 || eventIds == nil { + return true + } + for _, id := range *eventIds { + filter := filters[0] + filterIds := getIds(filter) + if filterIds == nil || len(*filterIds) == 0 { + return matchesFilters(eventIds, filters[1:], getIds) + } else { + return slices.Contains(*getIds(filter), id) && matchesFilters(eventIds, filters[1:], getIds) + } + } + return true +} + +func getApiIdsFromFilter(filter eventsapi.CAPIFEventFilter) *[]string { + return filter.ApiIds +} + +func getInvokerIdsFromFilter(filter eventsapi.CAPIFEventFilter) *[]string { + return filter.ApiInvokerIds +} + +func getAefIdsFromEvent(serviceAPIDescriptions *[]publishserviceapi.ServiceAPIDescription) *[]string { + aefIds := []string{} + if serviceAPIDescriptions == nil { + return &aefIds + } + for _, serviceDescription := range *serviceAPIDescriptions { + if serviceDescription.AefProfiles == nil { + return &aefIds + } + for _, profile := range *serviceDescription.AefProfiles { + aefIds = append(aefIds, profile.AefId) + } + } + return &aefIds +} + +func getAefIdsFromFilter(filter eventsapi.CAPIFEventFilter) *[]string { + return filter.AefIds +} + func asStrings(events []eventsapi.CAPIFEvent) []string { asStrings := make([]string, len(events)) for i, event := range events { @@ -150,8 +221,8 @@ func (es *EventService) getSubscriptionId(subscriberId string) string { func (es *EventService) addSubscription(subId string, subscription eventsapi.EventSubscription) { es.lock.Lock() + defer es.lock.Unlock() es.subscriptions[subId] = subscription - es.lock.Unlock() } func (es *EventService) getSubscription(subId string) *eventsapi.EventSubscription {