1cdce827e201d7b70d078a6a7e7dd6a6dd274cef
[ric-plt/xapp-frame.git] / pkg / xapp / subscription.go
1 /*
2 ==================================================================================
3   Copyright (c) 2019 AT&T Intellectual Property.
4   Copyright (c) 2019 Nokia
5
6    Licensed under the Apache License, Version 2.0 (the "License");
7    you may not use this file except in compliance with the License.
8    You may obtain a copy of the License at
9
10        http://www.apache.org/licenses/LICENSE-2.0
11
12    Unless required by applicable law or agreed to in writing, software
13    distributed under the License is distributed on an "AS IS" BASIS,
14    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15    See the License for the specific language governing permissions and
16    limitations under the License.
17 ==================================================================================
18 */
19
20 package xapp
21
22 import (
23         "github.com/go-openapi/loads"
24         httptransport "github.com/go-openapi/runtime/client"
25         "github.com/go-openapi/runtime/middleware"
26         "github.com/go-openapi/strfmt"
27         "time"
28
29         apiclient "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi"
30         apicontrol "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/control"
31         apipolicy "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/policy"
32         apireport "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/report"
33         apimodel "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientmodel"
34
35         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
36         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi"
37         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations"
38         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/control"
39         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/policy"
40         "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/report"
41 )
42
// SubscriptionReportHandler is the callback that Subscriber.Listen invokes for
// every incoming subscription request. It receives the subscription type of the
// endpoint that was hit and that request's parameters, and returns the
// subscription result to send back, or an error if the request could not be
// handled.
type SubscriptionReportHandler func(models.SubscriptionType, interface{}) (models.SubscriptionResult, error)
44
// Subscriber holds the configuration for both sides of the subscription REST
// interface: the local server started by Listen and the remote endpoint the
// Subscribe* client calls are sent to.
type Subscriber struct {
	localAddr  string        // address the local subscription server binds to
	localPort  int           // port the local subscription server listens on
	remoteHost string        // host[:port] of the remote subscription endpoint
	remoteUrl  string        // base path of the remote REST API
	remoteProt []string      // URL schemes used towards the remote endpoint
	timeout    time.Duration // timeout applied to outgoing subscription requests
}

// NewSubscriber returns a Subscriber whose remote endpoint is host and whose
// request timeout is timo seconds.
//
// An empty host selects "service-ricplt-submgr-http:8088". A zero or negative
// timo selects the default of 20 seconds. The remaining settings are fixed:
// base path "/ric/v1", scheme "http" and a local listener on 0.0.0.0:8088.
func NewSubscriber(host string, timo int) *Subscriber {
	const (
		defaultHost        = "service-ricplt-submgr-http:8088"
		defaultTimeoutSecs = 20
	)

	if host == "" {
		host = defaultHost
	}

	// A negative value would turn into a negative time.Duration, which is
	// never a usable request timeout, so it is treated the same as "unset".
	if timo <= 0 {
		timo = defaultTimeoutSecs
	}

	return &Subscriber{
		remoteHost: host,
		remoteUrl:  "/ric/v1",
		remoteProt: []string{"http"},
		timeout:    time.Duration(timo) * time.Second,
		localAddr:  "0.0.0.0",
		localPort:  8088,
	}
}
72
73 // Server interface: listen and receive subscription requests
74 func (r *Subscriber) Listen(handler SubscriptionReportHandler) error {
75         swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
76         if err != nil {
77                 return err
78         }
79
80         api := operations.NewXappFrameworkAPI(swaggerSpec)
81
82         // SubscriptionType: Report
83         api.ReportSubscribeReportHandler = report.SubscribeReportHandlerFunc(
84                 func(p report.SubscribeReportParams) middleware.Responder {
85                         if resp, err := handler(models.SubscriptionTypeReport, p.ReportParams); err == nil {
86                                 return report.NewSubscribeReportCreated().WithPayload(resp)
87                         }
88                         return report.NewSubscribeReportInternalServerError()
89                 })
90
91         // SubscriptionType: Control
92         api.ControlSubscribeControlHandler = control.SubscribeControlHandlerFunc(
93                 func(p control.SubscribeControlParams) middleware.Responder {
94                         if resp, err := handler(models.SubscriptionTypeControl, p.ControlParams); err == nil {
95                                 return control.NewSubscribeControlCreated().WithPayload(resp)
96                         }
97                         return control.NewSubscribeControlInternalServerError()
98                 })
99
100         // SubscriptionType: policy
101         api.PolicySubscribePolicyHandler = policy.SubscribePolicyHandlerFunc(
102                 func(p policy.SubscribePolicyParams) middleware.Responder {
103                         if resp, err := handler(models.SubscriptionTypePolicy, p.PolicyParams); err == nil {
104                                 return policy.NewSubscribePolicyCreated().WithPayload(resp)
105                         }
106                         return policy.NewSubscribePolicyInternalServerError()
107                 })
108
109         server := restapi.NewServer(api)
110         defer server.Shutdown()
111         server.Host = r.localAddr
112         server.Port = r.localPort
113
114         Logger.Info("Serving subscriptions on %s:%d\n", server.Host, server.Port)
115         if err := server.Serve(); err != nil {
116                 return err
117         }
118         return nil
119 }
120
121 // Subscription interface for xApp: REPORT
122 func (r *Subscriber) SubscribeReport(p *apimodel.ReportParams) (apimodel.SubscriptionResult, error) {
123         params := apireport.NewSubscribeReportParamsWithTimeout(r.timeout).WithReportParams(p)
124         result, err := r.CreateTransport().Report.SubscribeReport(params)
125         if err != nil {
126                 return apimodel.SubscriptionResult{}, err
127         }
128
129         return result.Payload, err
130 }
131
132 // Subscription interface for xApp: CONTROL
133 func (r *Subscriber) SubscribeControl(p *apimodel.ControlParams) (apimodel.SubscriptionResult, error) {
134         params := apicontrol.NewSubscribeControlParamsWithTimeout(r.timeout).WithControlParams(p)
135         result, err := r.CreateTransport().Control.SubscribeControl(params)
136         if err != nil {
137                 return apimodel.SubscriptionResult{}, err
138         }
139
140         return result.Payload, err
141 }
142
143 // Subscription interface for xApp: POLICY
144 func (r *Subscriber) SubscribePolicy(p *apimodel.PolicyParams) (apimodel.SubscriptionResult, error) {
145         params := apipolicy.NewSubscribePolicyParamsWithTimeout(r.timeout).WithPolicyParams(p)
146         result, err := r.CreateTransport().Policy.SubscribePolicy(params)
147         if err != nil {
148                 return apimodel.SubscriptionResult{}, err
149         }
150
151         return result.Payload, err
152 }
153
154 func (s *Subscriber) CreateTransport() *apiclient.RICSubscription {
155         return apiclient.New(httptransport.New(s.remoteHost, s.remoteUrl, s.remoteProt), strfmt.Default)
156 }