2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
26 "github.com/go-openapi/loads"
27 httptransport "github.com/go-openapi/runtime/client"
28 "github.com/go-openapi/runtime/middleware"
29 "github.com/go-openapi/strfmt"
30 "github.com/spf13/viper"
36 apiclient "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi"
37 apicommon "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/common"
38 apimodel "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientmodel"
40 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
41 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi"
42 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations"
43 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
46 type SubscriptionHandler func(interface{}) (*models.SubscriptionResponse, error)
47 type SubscriptionQueryHandler func() (models.SubscriptionList, error)
48 type SubscriptionDeleteHandler func(string) error
49 type SubscriptionResponseCallback func(*apimodel.SubscriptionResponse)
51 type Subscriber struct {
59 clientCB SubscriptionResponseCallback
62 func NewSubscriber(host string, timo int) *Subscriber {
64 pltnamespace := os.Getenv("PLT_NAMESPACE")
65 if pltnamespace == "" {
66 pltnamespace = "ricplt"
68 host = fmt.Sprintf("service-%s-submgr-http.%s:8088", pltnamespace, pltnamespace)
78 remoteProt: []string{"http"},
79 timeout: time.Duration(timo) * time.Second,
82 clientUrl: "/ric/v1/subscriptions/response",
84 Resource.InjectRoute(r.clientUrl, r.ResponseHandler, "POST")
89 func (r *Subscriber) ResponseHandler(w http.ResponseWriter, req *http.Request) {
91 var resp apimodel.SubscriptionResponse
92 if err := json.NewDecoder(req.Body).Decode(&resp); err == nil {
93 if r.clientCB != nil {
101 // Server interface: listen and receive subscription requests
102 func (r *Subscriber) Listen(createSubscription SubscriptionHandler, getSubscription SubscriptionQueryHandler, delSubscription SubscriptionDeleteHandler) error {
103 swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
108 api := operations.NewXappFrameworkAPI(swaggerSpec)
110 // Subscription: Query
111 api.CommonGetAllSubscriptionsHandler = common.GetAllSubscriptionsHandlerFunc(
112 func(p common.GetAllSubscriptionsParams) middleware.Responder {
113 if resp, err := getSubscription(); err == nil {
114 return common.NewGetAllSubscriptionsOK().WithPayload(resp)
116 return common.NewGetAllSubscriptionsInternalServerError()
119 // Subscription: Subscribe
120 api.CommonSubscribeHandler = common.SubscribeHandlerFunc(
121 func(params common.SubscribeParams) middleware.Responder {
122 Logger.Error("Subscribe: Params=%+v", params.SubscriptionParams)
123 if resp, err := createSubscription(params.SubscriptionParams); err == nil {
124 return common.NewSubscribeCreated().WithPayload(resp)
126 return common.NewSubscribeInternalServerError()
129 // Subscription: Unsubscribe
130 api.CommonUnsubscribeHandler = common.UnsubscribeHandlerFunc(
131 func(p common.UnsubscribeParams) middleware.Responder {
132 Logger.Error("Unsubscribe: SubscriptionID=%+v", p.SubscriptionID)
133 if err := delSubscription(p.SubscriptionID); err == nil {
134 return common.NewUnsubscribeNoContent()
136 return common.NewUnsubscribeInternalServerError()
139 server := restapi.NewServer(api)
140 defer server.Shutdown()
141 server.Host = r.localAddr
142 server.Port = r.localPort
144 Logger.Info("Serving subscriptions on %s:%d\n", server.Host, server.Port)
145 if err := server.Serve(); err != nil {
151 // Server interface: send notification to client
152 func (r *Subscriber) Notify(resp *models.SubscriptionResponse, ep models.SubscriptionParamsClientEndpoint) (err error) {
153 respData, err := json.Marshal(resp)
155 Logger.Error("json.Marshal failed: %v", err)
159 clientUrl := fmt.Sprintf("http://%s:%d%s", ep.Host, *ep.HTTPPort, r.clientUrl)
161 retries := viper.GetInt("subscription.retryCount")
166 delay := viper.GetInt("subscription.retryDelay")
171 for i := 0; i < retries; i++ {
172 r, err := http.Post(clientUrl, "application/json", bytes.NewBuffer(respData))
173 if err == nil && (r != nil && r.StatusCode == http.StatusOK) {
178 Logger.Error("%v", err)
180 if r != nil && r.StatusCode != http.StatusOK {
181 Logger.Error("clientUrl=%s statusCode=%d", clientUrl, r.StatusCode)
183 time.Sleep(time.Duration(delay) * time.Second)
189 // Subscription interface for xApp: Response callback
190 func (r *Subscriber) SetResponseCB(c SubscriptionResponseCallback) {
194 // Subscription interface for xApp
195 func (r *Subscriber) Subscribe(p *apimodel.SubscriptionParams) (*apimodel.SubscriptionResponse, error) {
196 params := apicommon.NewSubscribeParamsWithTimeout(r.timeout).WithSubscriptionParams(p)
197 result, err := r.CreateTransport().Common.Subscribe(params)
199 return &apimodel.SubscriptionResponse{}, err
202 return result.Payload, err
205 // Subscription interface for xApp: DELETE
206 func (r *Subscriber) Unsubscribe(subId string) error {
207 params := apicommon.NewUnsubscribeParamsWithTimeout(r.timeout).WithSubscriptionID(subId)
208 _, err := r.CreateTransport().Common.Unsubscribe(params)
213 // Subscription interface for xApp: QUERY
214 func (r *Subscriber) QuerySubscriptions() (models.SubscriptionList, error) {
215 resp, err := http.Get(fmt.Sprintf("http://%s/%s/subscriptions", r.remoteHost, r.remoteUrl))
217 return models.SubscriptionList{}, err
220 defer resp.Body.Close()
222 contents, err := ioutil.ReadAll(resp.Body)
224 return models.SubscriptionList{}, err
227 subscriptions := models.SubscriptionList{}
228 err = json.Unmarshal([]byte(string(contents)), &subscriptions)
230 return models.SubscriptionList{}, err
233 return subscriptions, nil
236 func (r *Subscriber) CreateTransport() *apiclient.RICSubscription {
237 return apiclient.New(httptransport.New(r.remoteHost, r.remoteUrl, r.remoteProt), strfmt.Default)
240 /*func (r *Subscriber) getXappConfig() (appconfig models.XappConfigList, err error) {
242 Logger.Error("Inside getXappConfig")
244 var metadata models.ConfigMetadata
245 var xappconfig models.XAppConfig
246 name := viper.GetString("name")
248 metadata.XappName = &name
249 metadata.ConfigType = &configtype
251 configFile, err := os.Open("/opt/ric/config/config-file.json")
253 Logger.Error("Cannot open config file: %v", err)
254 return nil,errors.New("Could Not parse the config file")
257 body, err := ioutil.ReadAll(configFile)
259 defer configFile.Close()
261 xappconfig.Metadata = &metadata
262 xappconfig.Config = body
264 appconfig = append(appconfig,&xappconfig)