First version ov eventservice
[nonrtric/plt/sme.git] / capifcore / internal / eventservice / eventservice.go
1 // -
2 //   ========================LICENSE_START=================================
3 //   O-RAN-SC
4 //   %%
5 //   Copyright (C) 2022: Nordix Foundation
6 //   %%
7 //   Licensed under the Apache License, Version 2.0 (the "License");
8 //   you may not use this file except in compliance with the License.
9 //   You may obtain a copy of the License at
10 //
11 //        http://www.apache.org/licenses/LICENSE-2.0
12 //
13 //   Unless required by applicable law or agreed to in writing, software
14 //   distributed under the License is distributed on an "AS IS" BASIS,
15 //   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 //   See the License for the specific language governing permissions and
17 //   limitations under the License.
18 //   ========================LICENSE_END===================================
19 //
20
21 package eventservice
22
23 import (
24         "encoding/json"
25         "fmt"
26         "net/http"
27         "path"
28         "strconv"
29         "sync"
30
31         "github.com/labstack/echo/v4"
32         log "github.com/sirupsen/logrus"
33         "oransc.org/nonrtric/capifcore/internal/common29122"
34         "oransc.org/nonrtric/capifcore/internal/eventsapi"
35         "oransc.org/nonrtric/capifcore/internal/restclient"
36 )
37
38 type EventService struct {
39         notificationChannel chan eventsapi.EventNotification
40         client              restclient.HTTPClient
41         subscriptions       map[string]eventsapi.EventSubscription
42         idCounter           uint
43         lock                sync.Mutex
44 }
45
46 func NewEventService(c restclient.HTTPClient) *EventService {
47         es := EventService{
48                 notificationChannel: make(chan eventsapi.EventNotification),
49                 client:              c,
50                 subscriptions:       make(map[string]eventsapi.EventSubscription),
51         }
52         es.start()
53         return &es
54 }
55
56 func (es *EventService) start() {
57         go es.handleIncomingEvents()
58 }
59
60 func (es *EventService) handleIncomingEvents() {
61         for event := range es.notificationChannel {
62                 es.handleEvent(event)
63         }
64 }
65 func (es *EventService) GetNotificationChannel() chan<- eventsapi.EventNotification {
66         return es.notificationChannel
67 }
68
69 func (es *EventService) PostSubscriberIdSubscriptions(ctx echo.Context, subscriberId string) error {
70         newSubscription, err := getEventSubscriptionFromRequest(ctx)
71         errMsg := "Unable to register subscription due to %s."
72         if err != nil {
73                 return sendCoreError(ctx, http.StatusBadRequest, fmt.Sprintf(errMsg, err))
74         }
75         uri := ctx.Request().Host + ctx.Request().URL.String()
76         subId := es.getSubscriptionId(subscriberId)
77         es.addSubscription(subId, newSubscription)
78         ctx.Response().Header().Set(echo.HeaderLocation, ctx.Scheme()+`://`+path.Join(uri, subId))
79         err = ctx.JSON(http.StatusCreated, newSubscription)
80         if err != nil {
81                 // Something really bad happened, tell Echo that our handler failed
82                 return err
83         }
84
85         return nil
86 }
87
88 func (es *EventService) DeleteSubscriberIdSubscriptionsSubscriptionId(ctx echo.Context, subscriberId string, subscriptionId string) error {
89         es.lock.Lock()
90         defer es.lock.Unlock()
91
92         log.Debug(es.subscriptions)
93         if _, ok := es.subscriptions[subscriptionId]; ok {
94                 log.Debug("Deleting subscription", subscriptionId)
95                 delete(es.subscriptions, subscriptionId)
96         }
97
98         return ctx.NoContent(http.StatusNoContent)
99 }
100
101 func getEventSubscriptionFromRequest(ctx echo.Context) (eventsapi.EventSubscription, error) {
102         var subscription eventsapi.EventSubscription
103         err := ctx.Bind(&subscription)
104         if err != nil {
105                 return eventsapi.EventSubscription{}, fmt.Errorf("invalid format for subscription")
106         }
107         return subscription, nil
108 }
109
110 func (es *EventService) handleEvent(event eventsapi.EventNotification) {
111         subscription := es.getSubscription(event.SubscriptionId)
112         if subscription != nil {
113                 e, _ := json.Marshal(event)
114                 if error := restclient.Put(string(subscription.NotificationDestination), []byte(e), es.client); error != nil {
115                         log.Error("Unable to send event")
116                 }
117         }
118 }
119
120 func (es *EventService) getSubscriptionId(subscriberId string) string {
121         es.idCounter++
122         return subscriberId + strconv.FormatUint(uint64(es.idCounter), 10)
123 }
124
125 func (es *EventService) addSubscription(subId string, subscription eventsapi.EventSubscription) {
126         es.lock.Lock()
127         es.subscriptions[subId] = subscription
128         es.lock.Unlock()
129 }
130
131 func (es *EventService) getSubscription(subId string) *eventsapi.EventSubscription {
132         es.lock.Lock()
133         defer es.lock.Unlock()
134         if sub, ok := es.subscriptions[subId]; ok {
135                 return &sub
136         } else {
137                 return nil
138         }
139 }
140
141 // This function wraps sending of an error in the Error format, and
142 // handling the failure to marshal that.
143 func sendCoreError(ctx echo.Context, code int, message string) error {
144         pd := common29122.ProblemDetails{
145                 Cause:  &message,
146                 Status: &code,
147         }
148         err := ctx.JSON(code, pd)
149         return err
150 }