2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
26 "github.com/go-openapi/loads"
27 httptransport "github.com/go-openapi/runtime/client"
28 "github.com/go-openapi/runtime/middleware"
29 "github.com/go-openapi/strfmt"
30 "github.com/spf13/viper"
38 apiclient "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi"
39 apicommon "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/common"
40 apipolicy "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/policy"
41 apireport "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/report"
42 apimodel "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientmodel"
44 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
45 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi"
46 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations"
47 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
48 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/policy"
49 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/query"
50 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/report"
51 //"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/xapp"
// SubscriptionHandler is the callback Listen invokes to create a subscription.
// It receives the subscription type and the request parameters as an untyped
// value, and returns the subscription response or an error.
54 type SubscriptionHandler func(models.SubscriptionType, interface{}) (*models.SubscriptionResponse, error)
// SubscriptionQueryHandler is the callback Listen invokes to list subscriptions.
// It takes no arguments and returns the subscription list or an error.
55 type SubscriptionQueryHandler func() (models.SubscriptionList, error)
// SubscriptionDeleteHandler is the callback Listen invokes to delete a
// subscription. Listen passes it the subscription ID from the unsubscribe
// request; a nil return means the deletion succeeded.
56 type SubscriptionDeleteHandler func(string) error
// SubscriptionResponseCallback is the client-side callback type stored in
// Subscriber.clientCB and accepted by SetResponseCB. It takes a pointer to a
// client-model subscription response.
// NOTE(review): presumably called by ResponseHandler with the decoded
// response; the call itself is not visible in this view.
57 type SubscriptionResponseCallback func(*apimodel.SubscriptionResponse)
// Subscriber carries the state used by the subscription methods in this file.
// Only the clientCB field is visible here; the methods also read remoteHost,
// remoteUrl, remoteProt, timeout, localAddr, localPort and clientUrl.
59 type Subscriber struct {
// clientCB is the response callback; ResponseHandler checks it for nil
// after decoding an incoming response.
67 clientCB SubscriptionResponseCallback
// NewSubscriber builds a Subscriber and registers its ResponseHandler as the
// POST route for the client response URL.
//
// host is the subscription manager address; timo is the request timeout in
// seconds. A default host is derived from the PLT_NAMESPACE environment
// variable.
// NOTE(review): the conditions guarding the default host (presumably
// host == "") and any default for timo are not visible in this view — confirm.
70 func NewSubscriber(host string, timo int) *Subscriber {
// Platform namespace comes from the environment, falling back to "ricplt".
72 pltnamespace := os.Getenv("PLT_NAMESPACE")
73 if pltnamespace == "" {
74 pltnamespace = "ricplt"
// Default subscription manager service address on port 8088.
76 host = fmt.Sprintf("service-%s-submgr-http.%s:8088", pltnamespace, pltnamespace)
86 remoteProt: []string{"http"},
// timo is interpreted as whole seconds.
87 timeout: time.Duration(timo) * time.Second,
90 clientUrl: "/ric/v1/subscriptions/response",
// Expose ResponseHandler on the client URL for POST requests.
92 Resource.InjectRoute(r.clientUrl, r.ResponseHandler, "POST")
// ResponseHandler is the HTTP handler registered on the client response URL.
// It decodes the request body as a JSON apimodel.SubscriptionResponse and, on
// success, checks whether a client callback is set.
// NOTE(review): the callback invocation, the handling of decode errors and
// whatever is written to w are not visible in this view.
97 func (r *Subscriber) ResponseHandler(w http.ResponseWriter, req *http.Request) {
99 var resp apimodel.SubscriptionResponse
// Only a successfully decoded body proceeds to the callback check.
100 if err := json.NewDecoder(req.Body).Decode(&resp); err == nil {
101 if r.clientCB != nil {
109 // Server interface: listen and receive subscription requests
// Listen loads the embedded swagger spec, wires the three callbacks into the
// generated REST API handlers, and serves the API on r.localAddr:r.localPort.
//
// createSubscription is called for report and policy subscribe requests,
// getSubscription for the subscription query, and delSubscription for
// unsubscribe requests. In each handler a nil error yields the success
// responder; the InternalServerError responder is the other return.
// NOTE(review): the error returns for a failed spec load and a failed Serve
// are not visible in this view.
110 func (r *Subscriber) Listen(createSubscription SubscriptionHandler, getSubscription SubscriptionQueryHandler, delSubscription SubscriptionDeleteHandler) error {
111 swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
116 api := operations.NewXappFrameworkAPI(swaggerSpec)
118 // Subscription: Query
119 api.QueryGetAllSubscriptionsHandler = query.GetAllSubscriptionsHandlerFunc(
120 func(p query.GetAllSubscriptionsParams) middleware.Responder {
// The request parameters are not passed to the query callback.
121 if resp, err := getSubscription(); err == nil {
122 return query.NewGetAllSubscriptionsOK().WithPayload(resp)
124 return query.NewGetAllSubscriptionsInternalServerError()
127 // SubscriptionType: Report
128 api.ReportSubscribeReportHandler = report.SubscribeReportHandlerFunc(
129 func(p report.SubscribeReportParams) middleware.Responder {
// Report and policy requests share createSubscription; the first argument
// tells the callback which kind of parameters it received.
130 if resp, err := createSubscription(models.SubscriptionTypeReport, p.ReportParams); err == nil {
131 return report.NewSubscribeReportCreated().WithPayload(resp)
133 return report.NewSubscribeReportInternalServerError()
136 // SubscriptionType: Policy
137 api.PolicySubscribePolicyHandler = policy.SubscribePolicyHandlerFunc(
138 func(p policy.SubscribePolicyParams) middleware.Responder {
139 if resp, err := createSubscription(models.SubscriptionTypePolicy, p.PolicyParams); err == nil {
140 return policy.NewSubscribePolicyCreated().WithPayload(resp)
142 return policy.NewSubscribePolicyInternalServerError()
145 // SubscriptionType: Delete
146 api.CommonUnsubscribeHandler = common.UnsubscribeHandlerFunc(
147 func(p common.UnsubscribeParams) middleware.Responder {
148 if err := delSubscription(p.SubscriptionID); err == nil {
149 return common.NewUnsubscribeNoContent()
151 return common.NewUnsubscribeInternalServerError()
// The xApp config list handler below is commented out, as is its import.
155 /*api.XappGetXappConfigListHandler = xapp.GetXappConfigListHandlerFunc(
156 func(p xapp.GetXappConfigListParams) middleware.Responder {
157 Logger.Info("Hitting xapp config")
158 if resp,err := r.getXappConfig(); err == nil {
159 return xapp.NewGetXappConfigListOK().WithPayload(resp)
161 return xapp.NewGetXappConfigListInternalServerError()
// Build the server from the configured API; Shutdown is deferred so it runs
// when Listen returns.
164 server := restapi.NewServer(api)
165 defer server.Shutdown()
166 server.Host = r.localAddr
167 server.Port = r.localPort
169 Logger.Info("Serving subscriptions on %s:%d\n", server.Host, server.Port)
170 if err := server.Serve(); err != nil {
176 // Server interface: send notification to client
// Notify marshals resp to JSON and POSTs it to the client's response URL,
// trying up to subscription.retryCount times and sleeping
// subscription.retryDelay seconds after each failed attempt.
//
// clientEndpoint is a host:port string of which only the host part is used;
// the port comes from the configured "http" port data. An attempt succeeds
// when the POST returns no error and status 200.
// NOTE(review): defaults for the two viper settings and the final return
// value are not visible in this view.
177 func (r *Subscriber) Notify(resp *models.SubscriptionResponse, clientEndpoint string) (err error) {
178 respData, err := json.Marshal(resp)
180 Logger.Error("json.Marshal failed: %v", err)
// Keep the host of the client endpoint; its port and any parse error are
// discarded.
184 ep, _, _ := net.SplitHostPort(clientEndpoint)
// Format the configured "http" port and split it back out to get it as a
// string; the parse error is discarded.
185 _, port, _ := net.SplitHostPort(fmt.Sprintf(":%d", GetPortData("http").Port))
186 clientUrl := fmt.Sprintf("http://%s:%s%s", ep, port, r.clientUrl)
188 retries := viper.GetInt("subscription.retryCount")
193 delay := viper.GetInt("subscription.retryDelay")
198 for i := 0; i < retries; i++ {
// This r is the HTTP response and shadows the receiver r inside the loop.
// NOTE(review): no r.Body.Close() is visible in this view — confirm the
// response body is closed on every attempt.
199 r, err := http.Post(clientUrl, "application/json", bytes.NewBuffer(respData))
200 if err == nil && (r != nil && r.StatusCode == http.StatusOK) {
205 Logger.Error("%v", err)
207 if r != nil && r.StatusCode != http.StatusOK {
208 Logger.Error("clientUrl=%s statusCode=%d", clientUrl, r.StatusCode)
// Wait before the next attempt.
210 time.Sleep(time.Duration(delay) * time.Second)
216 // Subscription interface for xApp: Response callback
// SetResponseCB takes the callback c for subscription responses.
// NOTE(review): presumably stores c in r.clientCB; the body is not visible
// in this view.
217 func (r *Subscriber) SetResponseCB(c SubscriptionResponseCallback) {
221 // Subscription interface for xApp: REPORT
// SubscribeReport sends a REPORT subscription request carrying p, using
// r.timeout and a newly created transport. It returns the payload of the
// result together with the error from the call.
222 func (r *Subscriber) SubscribeReport(p *apimodel.ReportParams) (*apimodel.SubscriptionResponse, error) {
223 params := apireport.NewSubscribeReportParamsWithTimeout(r.timeout).WithReportParams(p)
224 result, err := r.CreateTransport().Report.SubscribeReport(params)
// Returns an empty, non-nil response alongside the error.
// NOTE(review): the guarding condition (presumably err != nil) is not
// visible in this view.
226 return &apimodel.SubscriptionResponse{}, err
229 return result.Payload, err
232 // Subscription interface for xApp: POLICY
// SubscribePolicy sends a POLICY subscription request carrying p, using
// r.timeout and a newly created transport. It returns the payload of the
// result together with the error from the call.
233 func (r *Subscriber) SubscribePolicy(p *apimodel.PolicyParams) (*apimodel.SubscriptionResponse, error) {
234 params := apipolicy.NewSubscribePolicyParamsWithTimeout(r.timeout).WithPolicyParams(p)
235 result, err := r.CreateTransport().Policy.SubscribePolicy(params)
// Returns an empty, non-nil response alongside the error.
// NOTE(review): the guarding condition (presumably err != nil) is not
// visible in this view.
237 return &apimodel.SubscriptionResponse{}, err
240 return result.Payload, err
243 // Subscription interface for xApp: DELETE
// UnSubscribe sends an unsubscribe request for the subscription ID subId,
// using r.timeout and a newly created transport. The result of the call is
// discarded and only the error is kept.
// NOTE(review): the return statement is not visible in this view;
// presumably err is returned.
244 func (r *Subscriber) UnSubscribe(subId string) error {
245 params := apicommon.NewUnsubscribeParamsWithTimeout(r.timeout).WithSubscriptionID(subId)
246 _, err := r.CreateTransport().Common.Unsubscribe(params)
251 // Subscription interface for xApp: QUERY
// QuerySubscriptions fetches the subscription list with a plain HTTP GET to
// http://<remoteHost>/<remoteUrl>/subscriptions and decodes the JSON body
// into a models.SubscriptionList. On the failure paths it returns an empty
// list together with the error.
// NOTE(review): this uses http.Get rather than the generated client, so
// r.timeout is not applied, and no status-code check is visible in this view.
252 func (r *Subscriber) QuerySubscriptions() (models.SubscriptionList, error) {
253 resp, err := http.Get(fmt.Sprintf("http://%s/%s/subscriptions", r.remoteHost, r.remoteUrl))
255 return models.SubscriptionList{}, err
258 defer resp.Body.Close()
260 contents, err := ioutil.ReadAll(resp.Body)
262 return models.SubscriptionList{}, err
265 subscriptions := models.SubscriptionList{}
// contents is already a []byte, so the []byte(string(...)) round trip only
// copies the data.
266 err = json.Unmarshal([]byte(string(contents)), &subscriptions)
268 return models.SubscriptionList{}, err
271 return subscriptions, nil
// CreateTransport returns a new RICSubscription API client over an HTTP
// transport configured with r.remoteHost, r.remoteUrl and r.remoteProt,
// using the default strfmt registry. SubscribeReport, SubscribePolicy and
// UnSubscribe each call it once per request.
274 func (r *Subscriber) CreateTransport() *apiclient.RICSubscription {
275 return apiclient.New(httptransport.New(r.remoteHost, r.remoteUrl, r.remoteProt), strfmt.Default)
278 /*func (r *Subscriber) getXappConfig() (appconfig models.XappConfigList, err error) {
280 Logger.Error("Inside getXappConfig")
282 var metadata models.ConfigMetadata
283 var xappconfig models.XAppConfig
284 name := viper.GetString("name")
286 metadata.XappName = &name
287 metadata.ConfigType = &configtype
289 configFile, err := os.Open("/opt/ric/config/config-file.json")
291 Logger.Error("Cannot open config file: %v", err)
292 return nil,errors.New("Could Not parse the config file")
295 body, err := ioutil.ReadAll(configFile)
297 defer configFile.Close()
299 xappconfig.Metadata = &metadata
300 xappconfig.Config = body
302 appconfig = append(appconfig,&xappconfig)