2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
31 "github.com/go-openapi/loads"
32 httptransport "github.com/go-openapi/runtime/client"
33 "github.com/go-openapi/runtime/middleware"
34 "github.com/go-openapi/strfmt"
35 "github.com/spf13/viper"
37 apiclient "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi"
38 apicommon "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/common"
39 apimodel "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientmodel"
41 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
42 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi"
43 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations"
44 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
47 type SubscriptionHandler func(interface{}) (*models.SubscriptionResponse, int)
48 type SubscriptionQueryHandler func() (models.SubscriptionList, error)
49 type SubscriptionDeleteHandler func(string) int
50 type SubscriptionResponseCallback func(*apimodel.SubscriptionResponse)
// Subscriber carries the state shared by the subscription client and server
// methods in this file.
// NOTE(review): only the clientCB field is visible in this excerpt; the other
// fields (remoteHost, remoteUrl, remoteProt, timeout, localAddr, localPort and
// clientUrl are referenced elsewhere in the file) are elided here.
52 type Subscriber struct {
// clientCB is checked for nil by ResponseHandler after a response has been
// decoded; presumably it is then invoked with that response - confirm.
60 clientCB SubscriptionResponseCallback
// NewSubscriber builds a Subscriber for the subscription service at host and
// registers its ResponseHandler as a POST route.
//
// timo is a timeout in seconds; it is converted to a time.Duration below.
// NOTE(review): several lines of this function are elided in this excerpt
// (guard conditions and most of the struct literal), so defaults applied to
// host and timo cannot be confirmed from here.
63 func NewSubscriber(host string, timo int) *Subscriber {
// Derive the service address from the platform namespace, falling back to
// "ricplt" when PLT_NAMESPACE is unset or empty.
// NOTE(review): presumably this only happens when host is empty - confirm.
65 pltnamespace := os.Getenv("PLT_NAMESPACE")
66 if pltnamespace == "" {
67 pltnamespace = "ricplt"
69 host = fmt.Sprintf("service-%s-submgr-http.%s:8088", pltnamespace, pltnamespace)
79 remoteProt: []string{"http"},
80 timeout: time.Duration(timo) * time.Second,
// Path on which subscription responses are received (see InjectRoute below)
// and to which Notify posts them.
83 clientUrl: "/ric/v1/subscriptions/response",
// Route incoming POSTs on clientUrl to ResponseHandler.
85 Resource.InjectRoute(r.clientUrl, r.ResponseHandler, "POST")
// ResponseHandler is the HTTP handler registered by NewSubscriber for
// subscription responses. It decodes the request body as JSON into an
// apimodel.SubscriptionResponse; decode errors are not reported to the caller.
// NOTE(review): the lines that follow the clientCB nil check are elided in this
// excerpt; presumably the callback is invoked with the decoded response.
90 func (r *Subscriber) ResponseHandler(w http.ResponseWriter, req *http.Request) {
92 var resp apimodel.SubscriptionResponse
// Only a successfully decoded response is considered.
93 if err := json.NewDecoder(req.Body).Decode(&resp); err == nil {
// A callback is optional; skip when none has been set.
94 if r.clientCB != nil {
102 // Server interface: listen and receive subscription requests
103 func (r *Subscriber) Listen(createSubscription SubscriptionHandler, getSubscription SubscriptionQueryHandler, delSubscription SubscriptionDeleteHandler) error {
104 swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
109 api := operations.NewXappFrameworkAPI(swaggerSpec)
111 // Subscription: Query
112 api.CommonGetAllSubscriptionsHandler = common.GetAllSubscriptionsHandlerFunc(
113 func(p common.GetAllSubscriptionsParams) middleware.Responder {
114 if resp, err := getSubscription(); err == nil {
115 return common.NewGetAllSubscriptionsOK().WithPayload(resp)
117 return common.NewGetAllSubscriptionsInternalServerError()
120 // Subscription: Subscribe
121 api.CommonSubscribeHandler = common.SubscribeHandlerFunc(
122 func(params common.SubscribeParams) middleware.Responder {
123 Logger.Error("Subscribe: Params=%+v", params.SubscriptionParams)
124 resp, retCode := createSubscription(params.SubscriptionParams)
125 if retCode != common.SubscribeCreatedCode {
126 if retCode == common.SubscribeBadRequestCode {
127 return common.NewSubscribeBadRequest()
129 return common.NewSubscribeInternalServerError()
132 return common.NewSubscribeCreated().WithPayload(resp)
135 // Subscription: Unsubscribe
136 api.CommonUnsubscribeHandler = common.UnsubscribeHandlerFunc(
137 func(p common.UnsubscribeParams) middleware.Responder {
138 Logger.Error("Unsubscribe: SubscriptionID=%+v", p.SubscriptionID)
139 retCode := delSubscription(p.SubscriptionID)
140 if retCode != common.UnsubscribeNoContentCode {
141 if retCode == common.UnsubscribeBadRequestCode {
142 return common.NewUnsubscribeBadRequest()
144 return common.NewUnsubscribeInternalServerError()
147 return common.NewUnsubscribeNoContent()
150 server := restapi.NewServer(api)
151 defer server.Shutdown()
152 server.Host = r.localAddr
153 server.Port = r.localPort
155 Logger.Info("Serving subscriptions on %s:%d\n", server.Host, server.Port)
156 if err := server.Serve(); err != nil {
162 // Server interface: send notification to client
// Notify JSON-encodes resp and POSTs it to the client endpoint ep, retrying up
// to the configured number of attempts with a fixed delay between them.
// NOTE(review): some lines of this function are elided in this excerpt,
// including whatever fallback is applied when the configuration values are
// unset and the final return statement.
163 func (r *Subscriber) Notify(resp *models.SubscriptionResponse, ep models.SubscriptionParamsClientEndpoint) (err error) {
164 respData, err := json.Marshal(resp)
166 Logger.Error("json.Marshal failed: %v", err)
// Target URL: the endpoint's host and HTTP port followed by the response path
// held in r.clientUrl. ep.HTTPPort is dereferenced here without a nil check.
170 clientUrl := fmt.Sprintf("http://%s:%d%s", ep.Host, *ep.HTTPPort, r.clientUrl)
// Maximum number of POST attempts, read from configuration.
172 retries := viper.GetInt("subscription.retryCount")
// Pause between attempts, read from configuration; used as seconds below.
177 delay := viper.GetInt("subscription.retryDelay")
182 for i := 0; i < retries; i++ {
// NOTE(review): ":=" declares a new r and err in the loop scope. They shadow
// the receiver and the named result err, so a failure recorded here is not
// visible in the outer err. The response body is also not closed anywhere in
// the visible lines - confirm against the full file.
183 r, err := http.Post(clientUrl, "application/json", bytes.NewBuffer(respData))
// An attempt succeeds only with no transport error and an HTTP 200 reply.
184 if err == nil && (r != nil && r.StatusCode == http.StatusOK) {
189 Logger.Error("%v", err)
191 if r != nil && r.StatusCode != http.StatusOK {
192 Logger.Error("clientUrl=%s statusCode=%d", clientUrl, r.StatusCode)
// Wait before the next attempt.
194 time.Sleep(time.Duration(delay) * time.Second)
200 // Subscription interface for xApp: Response callback
// SetResponseCB takes the response callback c supplied by the xApp.
// NOTE(review): the body is elided in this excerpt; presumably it stores c in
// r.clientCB, the field ResponseHandler checks - confirm.
201 func (r *Subscriber) SetResponseCB(c SubscriptionResponseCallback) {
205 // Subscription interface for xApp
206 func (r *Subscriber) Subscribe(p *apimodel.SubscriptionParams) (*apimodel.SubscriptionResponse, error) {
207 params := apicommon.NewSubscribeParamsWithTimeout(r.timeout).WithSubscriptionParams(p)
208 result, err := r.CreateTransport().Common.Subscribe(params)
210 return &apimodel.SubscriptionResponse{}, err
213 return result.Payload, err
216 // Subscription interface for xApp: DELETE
217 func (r *Subscriber) Unsubscribe(subId string) error {
218 params := apicommon.NewUnsubscribeParamsWithTimeout(r.timeout).WithSubscriptionID(subId)
219 _, err := r.CreateTransport().Common.Unsubscribe(params)
224 // Subscription interface for xApp: QUERY
225 func (r *Subscriber) QuerySubscriptions() (models.SubscriptionList, error) {
226 resp, err := http.Get(fmt.Sprintf("http://%s/%s/subscriptions", r.remoteHost, r.remoteUrl))
228 return models.SubscriptionList{}, err
231 defer resp.Body.Close()
233 contents, err := ioutil.ReadAll(resp.Body)
235 return models.SubscriptionList{}, err
238 subscriptions := models.SubscriptionList{}
239 err = json.Unmarshal([]byte(string(contents)), &subscriptions)
241 return models.SubscriptionList{}, err
244 return subscriptions, nil
247 func (r *Subscriber) CreateTransport() *apiclient.RICSubscription {
248 return apiclient.New(httptransport.New(r.remoteHost, r.remoteUrl, r.remoteProt), strfmt.Default)