2 // ========================LICENSE_START=================================
5 // Copyright (C) 2022: Nordix Foundation
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 // ========================LICENSE_END===================================
31 "github.com/labstack/echo/v4"
32 log "github.com/sirupsen/logrus"
33 "oransc.org/nonrtric/capifcore/internal/common29122"
34 "oransc.org/nonrtric/capifcore/internal/eventsapi"
35 "oransc.org/nonrtric/capifcore/internal/restclient"
38 type EventService struct {
39 notificationChannel chan eventsapi.EventNotification
40 client restclient.HTTPClient
41 subscriptions map[string]eventsapi.EventSubscription
46 func NewEventService(c restclient.HTTPClient) *EventService {
48 notificationChannel: make(chan eventsapi.EventNotification),
50 subscriptions: make(map[string]eventsapi.EventSubscription),
56 func (es *EventService) start() {
57 go es.handleIncomingEvents()
60 func (es *EventService) handleIncomingEvents() {
61 for event := range es.notificationChannel {
65 func (es *EventService) GetNotificationChannel() chan<- eventsapi.EventNotification {
66 return es.notificationChannel
69 func (es *EventService) PostSubscriberIdSubscriptions(ctx echo.Context, subscriberId string) error {
70 newSubscription, err := getEventSubscriptionFromRequest(ctx)
71 errMsg := "Unable to register subscription due to %s."
73 return sendCoreError(ctx, http.StatusBadRequest, fmt.Sprintf(errMsg, err))
75 uri := ctx.Request().Host + ctx.Request().URL.String()
76 subId := es.getSubscriptionId(subscriberId)
77 es.addSubscription(subId, newSubscription)
78 ctx.Response().Header().Set(echo.HeaderLocation, ctx.Scheme()+`://`+path.Join(uri, subId))
79 err = ctx.JSON(http.StatusCreated, newSubscription)
81 // Something really bad happened, tell Echo that our handler failed
88 func (es *EventService) DeleteSubscriberIdSubscriptionsSubscriptionId(ctx echo.Context, subscriberId string, subscriptionId string) error {
90 defer es.lock.Unlock()
92 log.Debug(es.subscriptions)
93 if _, ok := es.subscriptions[subscriptionId]; ok {
94 log.Debug("Deleting subscription", subscriptionId)
95 delete(es.subscriptions, subscriptionId)
98 return ctx.NoContent(http.StatusNoContent)
101 func getEventSubscriptionFromRequest(ctx echo.Context) (eventsapi.EventSubscription, error) {
102 var subscription eventsapi.EventSubscription
103 err := ctx.Bind(&subscription)
105 return eventsapi.EventSubscription{}, fmt.Errorf("invalid format for subscription")
107 return subscription, nil
110 func (es *EventService) handleEvent(event eventsapi.EventNotification) {
111 subscription := es.getSubscription(event.SubscriptionId)
112 if subscription != nil {
113 e, _ := json.Marshal(event)
114 if error := restclient.Put(string(subscription.NotificationDestination), []byte(e), es.client); error != nil {
115 log.Error("Unable to send event")
120 func (es *EventService) getSubscriptionId(subscriberId string) string {
122 return subscriberId + strconv.FormatUint(uint64(es.idCounter), 10)
125 func (es *EventService) addSubscription(subId string, subscription eventsapi.EventSubscription) {
127 es.subscriptions[subId] = subscription
131 func (es *EventService) getSubscription(subId string) *eventsapi.EventSubscription {
133 defer es.lock.Unlock()
134 if sub, ok := es.subscriptions[subId]; ok {
141 // This function wraps sending of an error in the Error format, and
142 // handling the failure to marshal that.
143 func sendCoreError(ctx echo.Context, code int, message string) error {
144 pd := common29122.ProblemDetails{
148 err := ctx.JSON(code, pd)