6a6983ce4bd208a1cb6e0caf0d24ceb5a8c61643
[ric-plt/xapp-frame.git] / pkg / xapp / subscription.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 import (
23         "bytes"
24         "encoding/json"
25         "fmt"
26         "github.com/go-openapi/loads"
27         httptransport "github.com/go-openapi/runtime/client"
28         "github.com/go-openapi/runtime/middleware"
29         "github.com/go-openapi/strfmt"
30         "github.com/spf13/viper"
31         "io/ioutil"
32         "net/http"
33         "os"
34         "time"
35
36         apiclient "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi"
37         apicommon "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/common"
38         apimodel "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientmodel"
39
40         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
41         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi"
42         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations"
43         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
44 )
45
46 type SubscriptionHandler func(interface{}) (*models.SubscriptionResponse, error)
47 type SubscriptionQueryHandler func() (models.SubscriptionList, error)
48 type SubscriptionDeleteHandler func(string) error
49 type SubscriptionResponseCallback func(*apimodel.SubscriptionResponse)
50
51 type Subscriber struct {
52         localAddr  string
53         localPort  int
54         remoteHost string
55         remoteUrl  string
56         remoteProt []string
57         timeout    time.Duration
58         clientUrl  string
59         clientCB   SubscriptionResponseCallback
60 }
61
62 func NewSubscriber(host string, timo int) *Subscriber {
63         if host == "" {
64                 pltnamespace := os.Getenv("PLT_NAMESPACE")
65                 if pltnamespace == "" {
66                         pltnamespace = "ricplt"
67                 }
68                 host = fmt.Sprintf("service-%s-submgr-http.%s:8088", pltnamespace, pltnamespace)
69         }
70
71         if timo == 0 {
72                 timo = 20
73         }
74
75         r := &Subscriber{
76                 remoteHost: host,
77                 remoteUrl:  "/ric/v1",
78                 remoteProt: []string{"http"},
79                 timeout:    time.Duration(timo) * time.Second,
80                 localAddr:  "0.0.0.0",
81                 localPort:  8088,
82                 clientUrl:  "/ric/v1/subscriptions/response",
83         }
84         Resource.InjectRoute(r.clientUrl, r.ResponseHandler, "POST")
85
86         return r
87 }
88
89 func (r *Subscriber) ResponseHandler(w http.ResponseWriter, req *http.Request) {
90         if req.Body != nil {
91                 var resp apimodel.SubscriptionResponse
92                 if err := json.NewDecoder(req.Body).Decode(&resp); err == nil {
93                         if r.clientCB != nil {
94                                 r.clientCB(&resp)
95                         }
96                 }
97                 req.Body.Close()
98         }
99 }
100
101 // Server interface: listen and receive subscription requests
102 func (r *Subscriber) Listen(createSubscription SubscriptionHandler, getSubscription SubscriptionQueryHandler, delSubscription SubscriptionDeleteHandler) error {
103         swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
104         if err != nil {
105                 return err
106         }
107
108         api := operations.NewXappFrameworkAPI(swaggerSpec)
109
110         // Subscription: Query
111         api.CommonGetAllSubscriptionsHandler = common.GetAllSubscriptionsHandlerFunc(
112                 func(p common.GetAllSubscriptionsParams) middleware.Responder {
113                         if resp, err := getSubscription(); err == nil {
114                                 return common.NewGetAllSubscriptionsOK().WithPayload(resp)
115                         }
116                         return common.NewGetAllSubscriptionsInternalServerError()
117                 })
118
119         // Subscription: Subscribe
120         api.CommonSubscribeHandler = common.SubscribeHandlerFunc(
121                 func(params common.SubscribeParams) middleware.Responder {
122                         Logger.Error("Subscribe: Params=%+v", params.SubscriptionParams)
123                         if resp, err := createSubscription(params.SubscriptionParams); err == nil {
124                                 return common.NewSubscribeCreated().WithPayload(resp)
125                         }
126                         return common.NewSubscribeInternalServerError()
127                 })
128
129         // Subscription: Unsubscribe
130         api.CommonUnsubscribeHandler = common.UnsubscribeHandlerFunc(
131                 func(p common.UnsubscribeParams) middleware.Responder {
132                         Logger.Error("Unsubscribe: SubscriptionID=%+v", p.SubscriptionID)
133                         if err := delSubscription(p.SubscriptionID); err == nil {
134                                 return common.NewUnsubscribeNoContent()
135                         }
136                         return common.NewUnsubscribeInternalServerError()
137                 })
138
139         server := restapi.NewServer(api)
140         defer server.Shutdown()
141         server.Host = r.localAddr
142         server.Port = r.localPort
143
144         Logger.Info("Serving subscriptions on %s:%d\n", server.Host, server.Port)
145         if err := server.Serve(); err != nil {
146                 return err
147         }
148         return nil
149 }
150
151 // Server interface: send notification to client
152 func (r *Subscriber) Notify(resp *models.SubscriptionResponse, ep models.SubscriptionParamsClientEndpoint) (err error) {
153         respData, err := json.Marshal(resp)
154         if err != nil {
155                 Logger.Error("json.Marshal failed: %v", err)
156                 return err
157         }
158
159         clientUrl := fmt.Sprintf("http://%s:%d%s", ep.Host, *ep.HTTPPort, r.clientUrl)
160
161         retries := viper.GetInt("subscription.retryCount")
162         if retries == 0 {
163                 retries = 10
164         }
165
166         delay := viper.GetInt("subscription.retryDelay")
167         if delay == 0 {
168                 delay = 5
169         }
170
171         for i := 0; i < retries; i++ {
172                 r, err := http.Post(clientUrl, "application/json", bytes.NewBuffer(respData))
173                 if err == nil && (r != nil && r.StatusCode == http.StatusOK) {
174                         break
175                 }
176
177                 if err != nil {
178                         Logger.Error("%v", err)
179                 }
180                 if r != nil && r.StatusCode != http.StatusOK {
181                         Logger.Error("clientUrl=%s statusCode=%d", clientUrl, r.StatusCode)
182                 }
183                 time.Sleep(time.Duration(delay) * time.Second)
184         }
185
186         return err
187 }
188
189 // Subscription interface for xApp: Response callback
190 func (r *Subscriber) SetResponseCB(c SubscriptionResponseCallback) {
191         r.clientCB = c
192 }
193
194 // Subscription interface for xApp
195 func (r *Subscriber) Subscribe(p *apimodel.SubscriptionParams) (*apimodel.SubscriptionResponse, error) {
196         params := apicommon.NewSubscribeParamsWithTimeout(r.timeout).WithSubscriptionParams(p)
197         result, err := r.CreateTransport().Common.Subscribe(params)
198         if err != nil {
199                 return &apimodel.SubscriptionResponse{}, err
200         }
201
202         return result.Payload, err
203 }
204
205 // Subscription interface for xApp: DELETE
206 func (r *Subscriber) Unsubscribe(subId string) error {
207         params := apicommon.NewUnsubscribeParamsWithTimeout(r.timeout).WithSubscriptionID(subId)
208         _, err := r.CreateTransport().Common.Unsubscribe(params)
209
210         return err
211 }
212
213 // Subscription interface for xApp: QUERY
214 func (r *Subscriber) QuerySubscriptions() (models.SubscriptionList, error) {
215         resp, err := http.Get(fmt.Sprintf("http://%s/%s/subscriptions", r.remoteHost, r.remoteUrl))
216         if err != nil {
217                 return models.SubscriptionList{}, err
218         }
219
220         defer resp.Body.Close()
221
222         contents, err := ioutil.ReadAll(resp.Body)
223         if err != nil {
224                 return models.SubscriptionList{}, err
225         }
226
227         subscriptions := models.SubscriptionList{}
228         err = json.Unmarshal([]byte(string(contents)), &subscriptions)
229         if err != nil {
230                 return models.SubscriptionList{}, err
231         }
232
233         return subscriptions, nil
234 }
235
236 func (r *Subscriber) CreateTransport() *apiclient.RICSubscription {
237         return apiclient.New(httptransport.New(r.remoteHost, r.remoteUrl, r.remoteProt), strfmt.Default)
238 }
239
240 /*func (r *Subscriber) getXappConfig() (appconfig models.XappConfigList, err error) {
241
242     Logger.Error("Inside getXappConfig")
243
244                 var metadata models.ConfigMetadata
245         var xappconfig models.XAppConfig
246         name := viper.GetString("name")
247         configtype := "json"
248                 metadata.XappName = &name
249                 metadata.ConfigType = &configtype
250
251         configFile, err := os.Open("/opt/ric/config/config-file.json")
252         if err != nil {
253                 Logger.Error("Cannot open config file: %v", err)
254                 return nil,errors.New("Could Not parse the config file")
255         }
256
257         body, err := ioutil.ReadAll(configFile)
258
259         defer configFile.Close()
260
261                 xappconfig.Metadata = &metadata
262                 xappconfig.Config = body
263
264         appconfig = append(appconfig,&xappconfig)
265
266                 return appconfig,nil
267 }*/