2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
26 "github.com/go-openapi/loads"
27 httptransport "github.com/go-openapi/runtime/client"
28 "github.com/go-openapi/runtime/middleware"
29 "github.com/go-openapi/strfmt"
30 "github.com/spf13/viper"
36 apiclient "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi"
37 apicommon "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/common"
38 apimodel "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientmodel"
40 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
41 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi"
42 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations"
43 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
46 type SubscriptionHandler func(interface{}) (*models.SubscriptionResponse, error)
47 type SubscriptionQueryHandler func() (models.SubscriptionList, error)
48 type SubscriptionDeleteHandler func(string) error
49 type SubscriptionResponseCallback func(*apimodel.SubscriptionResponse)
51 type Subscriber struct {
59 clientCB SubscriptionResponseCallback
62 func NewSubscriber(host string, timo int) *Subscriber {
64 pltnamespace := os.Getenv("PLT_NAMESPACE")
65 if pltnamespace == "" {
66 pltnamespace = "ricplt"
68 host = fmt.Sprintf("service-%s-submgr-http.%s:8088", pltnamespace, pltnamespace)
78 remoteProt: []string{"http"},
79 timeout: time.Duration(timo) * time.Second,
82 clientUrl: "/ric/v1/subscriptions/response",
84 Resource.InjectRoute(r.clientUrl, r.ResponseHandler, "POST")
89 func (r *Subscriber) ResponseHandler(w http.ResponseWriter, req *http.Request) {
91 var resp apimodel.SubscriptionResponse
92 if err := json.NewDecoder(req.Body).Decode(&resp); err == nil {
93 if r.clientCB != nil {
101 // Server interface: listen and receive subscription requests
102 func (r *Subscriber) Listen(createSubscription SubscriptionHandler, getSubscription SubscriptionQueryHandler, delSubscription SubscriptionDeleteHandler) error {
103 swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
108 api := operations.NewXappFrameworkAPI(swaggerSpec)
110 // Subscription: Query
111 api.CommonGetAllSubscriptionsHandler = common.GetAllSubscriptionsHandlerFunc(
112 func(p common.GetAllSubscriptionsParams) middleware.Responder {
113 if resp, err := getSubscription(); err == nil {
114 return common.NewGetAllSubscriptionsOK().WithPayload(resp)
116 return common.NewGetAllSubscriptionsInternalServerError()
119 // Subscription: Subscribe
120 api.CommonSubscribeHandler = common.SubscribeHandlerFunc(
121 func(params common.SubscribeParams) middleware.Responder {
122 if resp, err := createSubscription(params.SubscriptionParams); err == nil {
123 return common.NewSubscribeCreated().WithPayload(resp)
125 return common.NewSubscribeInternalServerError()
128 // Subscription: Unsubscribe
129 api.CommonUnsubscribeHandler = common.UnsubscribeHandlerFunc(
130 func(p common.UnsubscribeParams) middleware.Responder {
131 if err := delSubscription(p.SubscriptionID); err == nil {
132 return common.NewUnsubscribeNoContent()
134 return common.NewUnsubscribeInternalServerError()
137 server := restapi.NewServer(api)
138 defer server.Shutdown()
139 server.Host = r.localAddr
140 server.Port = r.localPort
142 Logger.Info("Serving subscriptions on %s:%d\n", server.Host, server.Port)
143 if err := server.Serve(); err != nil {
149 // Server interface: send notification to client
150 func (r *Subscriber) Notify(resp *models.SubscriptionResponse, ep models.SubscriptionParamsClientEndpoint) (err error) {
151 respData, err := json.Marshal(resp)
153 Logger.Error("json.Marshal failed: %v", err)
157 clientUrl := fmt.Sprintf("http://%s:%d%s", ep.Host, *ep.HTTPPort, r.clientUrl)
159 retries := viper.GetInt("subscription.retryCount")
164 delay := viper.GetInt("subscription.retryDelay")
169 for i := 0; i < retries; i++ {
170 r, err := http.Post(clientUrl, "application/json", bytes.NewBuffer(respData))
171 if err == nil && (r != nil && r.StatusCode == http.StatusOK) {
176 Logger.Error("%v", err)
178 if r != nil && r.StatusCode != http.StatusOK {
179 Logger.Error("clientUrl=%s statusCode=%d", clientUrl, r.StatusCode)
181 time.Sleep(time.Duration(delay) * time.Second)
187 // Subscription interface for xApp: Response callback
188 func (r *Subscriber) SetResponseCB(c SubscriptionResponseCallback) {
192 // Subscription interface for xApp
193 func (r *Subscriber) Subscribe(p *apimodel.SubscriptionParams) (*apimodel.SubscriptionResponse, error) {
194 params := apicommon.NewSubscribeParamsWithTimeout(r.timeout).WithSubscriptionParams(p)
195 result, err := r.CreateTransport().Common.Subscribe(params)
197 return &apimodel.SubscriptionResponse{}, err
200 return result.Payload, err
203 // Subscription interface for xApp: DELETE
204 func (r *Subscriber) Unsubscribe(subId string) error {
205 params := apicommon.NewUnsubscribeParamsWithTimeout(r.timeout).WithSubscriptionID(subId)
206 _, err := r.CreateTransport().Common.Unsubscribe(params)
211 // Subscription interface for xApp: QUERY
212 func (r *Subscriber) QuerySubscriptions() (models.SubscriptionList, error) {
213 resp, err := http.Get(fmt.Sprintf("http://%s/%s/subscriptions", r.remoteHost, r.remoteUrl))
215 return models.SubscriptionList{}, err
218 defer resp.Body.Close()
220 contents, err := ioutil.ReadAll(resp.Body)
222 return models.SubscriptionList{}, err
225 subscriptions := models.SubscriptionList{}
226 err = json.Unmarshal([]byte(string(contents)), &subscriptions)
228 return models.SubscriptionList{}, err
231 return subscriptions, nil
234 func (r *Subscriber) CreateTransport() *apiclient.RICSubscription {
235 return apiclient.New(httptransport.New(r.remoteHost, r.remoteUrl, r.remoteProt), strfmt.Default)
238 /*func (r *Subscriber) getXappConfig() (appconfig models.XappConfigList, err error) {
240 Logger.Error("Inside getXappConfig")
242 var metadata models.ConfigMetadata
243 var xappconfig models.XAppConfig
244 name := viper.GetString("name")
246 metadata.XappName = &name
247 metadata.ConfigType = &configtype
249 configFile, err := os.Open("/opt/ric/config/config-file.json")
251 Logger.Error("Cannot open config file: %v", err)
252 return nil,errors.New("Could Not parse the config file")
255 body, err := ioutil.ReadAll(configFile)
257 defer configFile.Close()
259 xappconfig.Metadata = &metadata
260 xappconfig.Config = body
262 appconfig = append(appconfig,&xappconfig)