2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
31 "github.com/go-openapi/loads"
32 httptransport "github.com/go-openapi/runtime/client"
33 "github.com/go-openapi/runtime/middleware"
34 "github.com/go-openapi/strfmt"
35 "github.com/spf13/viper"
37 apiclient "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi"
38 apicommon "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/common"
39 apimodel "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientmodel"
41 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
42 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi"
43 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations"
44 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
47 type SubscriptionHandler func(interface{}) (*models.SubscriptionResponse, error)
48 type SubscriptionQueryHandler func() (models.SubscriptionList, error)
49 type SubscriptionDeleteHandler func(string) error
50 type SubscriptionResponseCallback func(*apimodel.SubscriptionResponse)
52 type Subscriber struct {
60 clientCB SubscriptionResponseCallback
63 func NewSubscriber(host string, timo int) *Subscriber {
65 pltnamespace := os.Getenv("PLT_NAMESPACE")
66 if pltnamespace == "" {
67 pltnamespace = "ricplt"
69 host = fmt.Sprintf("service-%s-submgr-http.%s:8088", pltnamespace, pltnamespace)
79 remoteProt: []string{"http"},
80 timeout: time.Duration(timo) * time.Second,
83 clientUrl: "/ric/v1/subscriptions/response",
85 Resource.InjectRoute(r.clientUrl, r.ResponseHandler, "POST")
90 func (r *Subscriber) ResponseHandler(w http.ResponseWriter, req *http.Request) {
92 var resp apimodel.SubscriptionResponse
93 if err := json.NewDecoder(req.Body).Decode(&resp); err == nil {
94 if r.clientCB != nil {
102 // Server interface: listen and receive subscription requests
103 func (r *Subscriber) Listen(createSubscription SubscriptionHandler, getSubscription SubscriptionQueryHandler, delSubscription SubscriptionDeleteHandler) error {
104 swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
109 api := operations.NewXappFrameworkAPI(swaggerSpec)
111 // Subscription: Query
112 api.CommonGetAllSubscriptionsHandler = common.GetAllSubscriptionsHandlerFunc(
113 func(p common.GetAllSubscriptionsParams) middleware.Responder {
114 if resp, err := getSubscription(); err == nil {
115 return common.NewGetAllSubscriptionsOK().WithPayload(resp)
117 return common.NewGetAllSubscriptionsInternalServerError()
120 // Subscription: Subscribe
121 api.CommonSubscribeHandler = common.SubscribeHandlerFunc(
122 func(params common.SubscribeParams) middleware.Responder {
123 Logger.Error("Subscribe: Params=%+v", params.SubscriptionParams)
124 if resp, err := createSubscription(params.SubscriptionParams); err == nil {
125 return common.NewSubscribeCreated().WithPayload(resp)
127 return common.NewSubscribeInternalServerError()
130 // Subscription: Unsubscribe
131 api.CommonUnsubscribeHandler = common.UnsubscribeHandlerFunc(
132 func(p common.UnsubscribeParams) middleware.Responder {
133 Logger.Error("Unsubscribe: SubscriptionID=%+v", p.SubscriptionID)
134 if err := delSubscription(p.SubscriptionID); err == nil {
135 return common.NewUnsubscribeNoContent()
137 return common.NewUnsubscribeInternalServerError()
140 server := restapi.NewServer(api)
141 defer server.Shutdown()
142 server.Host = r.localAddr
143 server.Port = r.localPort
145 Logger.Info("Serving subscriptions on %s:%d\n", server.Host, server.Port)
146 if err := server.Serve(); err != nil {
152 // Server interface: send notification to client
153 func (r *Subscriber) Notify(resp *models.SubscriptionResponse, ep models.SubscriptionParamsClientEndpoint) (err error) {
154 respData, err := json.Marshal(resp)
156 Logger.Error("json.Marshal failed: %v", err)
160 clientUrl := fmt.Sprintf("http://%s:%d%s", ep.Host, *ep.HTTPPort, r.clientUrl)
162 retries := viper.GetInt("subscription.retryCount")
167 delay := viper.GetInt("subscription.retryDelay")
172 for i := 0; i < retries; i++ {
173 r, err := http.Post(clientUrl, "application/json", bytes.NewBuffer(respData))
174 if err == nil && (r != nil && r.StatusCode == http.StatusOK) {
179 Logger.Error("%v", err)
181 if r != nil && r.StatusCode != http.StatusOK {
182 Logger.Error("clientUrl=%s statusCode=%d", clientUrl, r.StatusCode)
184 time.Sleep(time.Duration(delay) * time.Second)
190 // Subscription interface for xApp: Response callback
191 func (r *Subscriber) SetResponseCB(c SubscriptionResponseCallback) {
195 // Subscription interface for xApp
196 func (r *Subscriber) Subscribe(p *apimodel.SubscriptionParams) (*apimodel.SubscriptionResponse, error) {
197 params := apicommon.NewSubscribeParamsWithTimeout(r.timeout).WithSubscriptionParams(p)
198 result, err := r.CreateTransport().Common.Subscribe(params)
200 return &apimodel.SubscriptionResponse{}, err
203 return result.Payload, err
206 // Subscription interface for xApp: DELETE
207 func (r *Subscriber) Unsubscribe(subId string) error {
208 params := apicommon.NewUnsubscribeParamsWithTimeout(r.timeout).WithSubscriptionID(subId)
209 _, err := r.CreateTransport().Common.Unsubscribe(params)
214 // Subscription interface for xApp: QUERY
215 func (r *Subscriber) QuerySubscriptions() (models.SubscriptionList, error) {
216 resp, err := http.Get(fmt.Sprintf("http://%s/%s/subscriptions", r.remoteHost, r.remoteUrl))
218 return models.SubscriptionList{}, err
221 defer resp.Body.Close()
223 contents, err := ioutil.ReadAll(resp.Body)
225 return models.SubscriptionList{}, err
228 subscriptions := models.SubscriptionList{}
229 err = json.Unmarshal([]byte(string(contents)), &subscriptions)
231 return models.SubscriptionList{}, err
234 return subscriptions, nil
237 func (r *Subscriber) CreateTransport() *apiclient.RICSubscription {
238 return apiclient.New(httptransport.New(r.remoteHost, r.remoteUrl, r.remoteProt), strfmt.Default)