From: elinuxhenrik Date: Fri, 9 Dec 2022 10:20:26 +0000 (+0100) Subject: Add event type matching for sending events X-Git-Tag: 1.0.0~4 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=fccbf5b13fcede6ec48796a04f41ab9f06e119b3;p=nonrtric%2Fplt%2Fsme.git Add event type matching for sending events Issue-ID: NONRTRIC-814 Signed-off-by: elinuxhenrik Change-Id: Ie796b3bdd77f5f566cd3dea1f8463e7ba7f213d3 --- diff --git a/capifcore/internal/eventservice/eventservice.go b/capifcore/internal/eventservice/eventservice.go index ae35cc0..d82b63d 100644 --- a/capifcore/internal/eventservice/eventservice.go +++ b/capifcore/internal/eventservice/eventservice.go @@ -30,6 +30,7 @@ import ( "github.com/labstack/echo/v4" log "github.com/sirupsen/logrus" + "k8s.io/utils/strings/slices" "oransc.org/nonrtric/capifcore/internal/common29122" "oransc.org/nonrtric/capifcore/internal/eventsapi" "oransc.org/nonrtric/capifcore/internal/restclient" @@ -108,13 +109,38 @@ func getEventSubscriptionFromRequest(ctx echo.Context) (eventsapi.EventSubscript } func (es *EventService) handleEvent(event eventsapi.EventNotification) { - subscription := es.getSubscription(event.SubscriptionId) - if subscription != nil { - e, _ := json.Marshal(event) - if error := restclient.Put(string(subscription.NotificationDestination), []byte(e), es.client); error != nil { - log.Error("Unable to send event") + subsIds := es.getMatchingSubs(event) + for _, subId := range subsIds { + go es.sendEvent(event, subId) + } +} + +func (es *EventService) sendEvent(event eventsapi.EventNotification, subscriptionId string) { + event.SubscriptionId = subscriptionId + e, _ := json.Marshal(event) + if error := restclient.Put(string(es.subscriptions[subscriptionId].NotificationDestination), []byte(e), es.client); error != nil { + log.Error("Unable to send event") + } +} + +func (es *EventService) getMatchingSubs(event eventsapi.EventNotification) []string { + matchingSubs := []string{} + es.lock.Lock() + defer es.lock.Unlock() + for subId, subInfo := range es.subscriptions { + if slices.Contains(asStrings(subInfo.Events), string(event.Events)) { + matchingSubs = append(matchingSubs, subId) } } + return matchingSubs +} + +func asStrings(events []eventsapi.CAPIFEvent) []string { + asStrings := make([]string, len(events)) + for i, event := range events { + asStrings[i] = string(event) + } + return asStrings } func (es *EventService) getSubscriptionId(subscriberId string) string { diff --git a/capifcore/internal/eventservice/eventservice_test.go b/capifcore/internal/eventservice/eventservice_test.go index ec49ec0..cd03ba7 100644 --- a/capifcore/internal/eventservice/eventservice_test.go +++ b/capifcore/internal/eventservice/eventservice_test.go @@ -97,7 +97,6 @@ func TestSendEvent(t *testing.T) { apiIds := []string{"apiId"} subId := "sub1" newEvent := eventsapi.EventNotification{ - SubscriptionId: subId, EventDetail: &eventsapi.CAPIFEventDetail{ ApiIds: &apiIds, }, @@ -108,6 +107,7 @@ func TestSendEvent(t *testing.T) { if req.URL.String() == notificationUrl { assert.Equal(t, req.Method, "PUT") assert.Equal(t, "application/json", req.Header.Get("Content-Type")) + newEvent.SubscriptionId = subId assert.Equal(t, newEvent, getBodyAsEvent(req, t)) wg.Done() return &http.Response{ @@ -129,6 +129,13 @@ func TestSendEvent(t *testing.T) { NotificationDestination: common29122.Uri(notificationUrl), } serviceUnderTest.addSubscription(subId, subscription) + sub2 := eventsapi.EventSubscription{ + Events: []eventsapi.CAPIFEvent{ + eventsapi.CAPIFEventACCESSCONTROLPOLICYUNAVAILABLE, + }, + NotificationDestination: common29122.Uri(notificationUrl), + } + serviceUnderTest.addSubscription("other", sub2) wg.Add(1) go func() { @@ -139,7 +146,35 @@ func TestSendEvent(t *testing.T) { t.Error("Not all calls to server were made") t.Fail() } +} + +func TestMatchEventType(t *testing.T) { + notificationUrl := "url" + subId := "sub1" + serviceUnderTest := NewEventService(nil) + subscription := eventsapi.EventSubscription{ + Events: []eventsapi.CAPIFEvent{ + eventsapi.CAPIFEventSERVICEAPIAVAILABLE, + }, + NotificationDestination: common29122.Uri(notificationUrl), + } + serviceUnderTest.addSubscription(subId, subscription) + sub2 := eventsapi.EventSubscription{ + Events: []eventsapi.CAPIFEvent{ + eventsapi.CAPIFEventACCESSCONTROLPOLICYUNAVAILABLE, + }, + NotificationDestination: common29122.Uri(notificationUrl), + } + serviceUnderTest.addSubscription("other", sub2) + + event := eventsapi.EventNotification{ + SubscriptionId: subId, + Events: eventsapi.CAPIFEventSERVICEAPIAVAILABLE, + } + matchingSubs := serviceUnderTest.getMatchingSubs(event) + assert.Len(t, matchingSubs, 1) + assert.Equal(t, subId, matchingSubs[0]) } func getEcho(client restclient.HTTPClient) (*EventService, *echo.Echo) {