2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
31 "github.com/go-openapi/loads"
32 httptransport "github.com/go-openapi/runtime/client"
33 "github.com/go-openapi/runtime/middleware"
34 "github.com/go-openapi/strfmt"
35 "github.com/spf13/viper"
37 apiclient "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi"
38 apicommon "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/common"
39 apimodel "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientmodel"
41 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
42 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi"
43 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations"
44 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
47 type SubscriptionHandler func(interface{}) (*models.SubscriptionResponse, int)
48 type SubscriptionQueryHandler func() (models.SubscriptionList, error)
49 type SubscriptionDeleteHandler func(string) int
50 type SubscriptionResponseCallback func(*apimodel.SubscriptionResponse)
52 type Subscriber struct {
60 clientCB SubscriptionResponseCallback
63 func NewSubscriber(host string, timo int) *Subscriber {
65 pltnamespace := os.Getenv("PLT_NAMESPACE")
66 if pltnamespace == "" {
67 pltnamespace = "ricplt"
69 host = fmt.Sprintf("service-%s-submgr-http.%s:8088", pltnamespace, pltnamespace)
79 remoteProt: []string{"http"},
80 timeout: time.Duration(timo) * time.Second,
83 clientUrl: "/ric/v1/subscriptions/response",
85 Resource.InjectRoute(r.clientUrl, r.ResponseHandler, "POST")
90 func (r *Subscriber) ResponseHandler(w http.ResponseWriter, req *http.Request) {
92 var resp apimodel.SubscriptionResponse
93 if err := json.NewDecoder(req.Body).Decode(&resp); err == nil {
94 if r.clientCB != nil {
102 // Server interface: listen and receive subscription requests
103 func (r *Subscriber) Listen(createSubscription SubscriptionHandler, getSubscription SubscriptionQueryHandler, delSubscription SubscriptionDeleteHandler) error {
104 swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
109 api := operations.NewXappFrameworkAPI(swaggerSpec)
111 // Subscription: Query
112 api.CommonGetAllSubscriptionsHandler = common.GetAllSubscriptionsHandlerFunc(
113 func(p common.GetAllSubscriptionsParams) middleware.Responder {
114 if resp, err := getSubscription(); err == nil {
115 return common.NewGetAllSubscriptionsOK().WithPayload(resp)
117 return common.NewGetAllSubscriptionsInternalServerError()
120 // Subscription: Subscribe
121 api.CommonSubscribeHandler = common.SubscribeHandlerFunc(
122 func(params common.SubscribeParams) middleware.Responder {
123 resp, retCode := createSubscription(params.SubscriptionParams)
124 if retCode != common.SubscribeCreatedCode {
125 if retCode == common.SubscribeBadRequestCode {
126 return common.NewSubscribeBadRequest()
127 } else if retCode == common.SubscribeNotFoundCode {
128 return common.NewSubscribeNotFound()
129 } else if retCode == common.SubscribeServiceUnavailableCode {
130 return common.NewSubscribeServiceUnavailable()
132 return common.NewSubscribeInternalServerError()
135 return common.NewSubscribeCreated().WithPayload(resp)
138 // Subscription: Unsubscribe
139 api.CommonUnsubscribeHandler = common.UnsubscribeHandlerFunc(
140 func(p common.UnsubscribeParams) middleware.Responder {
141 retCode := delSubscription(p.SubscriptionID)
142 if retCode != common.UnsubscribeNoContentCode {
143 if retCode == common.UnsubscribeBadRequestCode {
144 return common.NewUnsubscribeBadRequest()
146 return common.NewUnsubscribeInternalServerError()
149 return common.NewUnsubscribeNoContent()
152 server := restapi.NewServer(api)
153 defer server.Shutdown()
154 server.Host = r.localAddr
155 server.Port = r.localPort
157 Logger.Info("Serving subscriptions on %s:%d\n", server.Host, server.Port)
158 if err := server.Serve(); err != nil {
164 // Server interface: send notification to client
165 func (r *Subscriber) Notify(resp *models.SubscriptionResponse, ep models.SubscriptionParamsClientEndpoint) (err error) {
166 respData, err := json.Marshal(resp)
168 Logger.Error("json.Marshal failed: %v", err)
172 clientUrl := fmt.Sprintf("http://%s:%d%s", ep.Host, *ep.HTTPPort, r.clientUrl)
174 retries := viper.GetInt("subscription.retryCount")
179 delay := viper.GetInt("subscription.retryDelay")
184 for i := 0; i < retries; i++ {
185 r, err := http.Post(clientUrl, "application/json", bytes.NewBuffer(respData))
186 if err == nil && (r != nil && r.StatusCode == http.StatusOK) {
191 Logger.Error("%v", err)
193 if r != nil && r.StatusCode != http.StatusOK {
194 Logger.Error("clientUrl=%s statusCode=%d", clientUrl, r.StatusCode)
196 time.Sleep(time.Duration(delay) * time.Second)
202 // Subscription interface for xApp: Response callback
203 func (r *Subscriber) SetResponseCB(c SubscriptionResponseCallback) {
207 // Subscription interface for xApp
208 func (r *Subscriber) Subscribe(p *apimodel.SubscriptionParams) (*apimodel.SubscriptionResponse, error) {
209 params := apicommon.NewSubscribeParamsWithTimeout(r.timeout).WithSubscriptionParams(p)
210 result, err := r.CreateTransport().Common.Subscribe(params)
212 return &apimodel.SubscriptionResponse{}, err
214 return result.Payload, err
217 // Subscription interface for xApp: DELETE
218 func (r *Subscriber) Unsubscribe(subId string) error {
219 params := apicommon.NewUnsubscribeParamsWithTimeout(r.timeout).WithSubscriptionID(subId)
220 _, err := r.CreateTransport().Common.Unsubscribe(params)
225 // Subscription interface for xApp: QUERY
226 func (r *Subscriber) QuerySubscriptions() (models.SubscriptionList, error) {
227 resp, err := http.Get(fmt.Sprintf("http://%s/%s/subscriptions", r.remoteHost, r.remoteUrl))
229 return models.SubscriptionList{}, err
232 defer resp.Body.Close()
234 contents, err := ioutil.ReadAll(resp.Body)
236 return models.SubscriptionList{}, err
239 subscriptions := models.SubscriptionList{}
240 err = json.Unmarshal([]byte(string(contents)), &subscriptions)
242 return models.SubscriptionList{}, err
245 return subscriptions, nil
248 func (r *Subscriber) CreateTransport() *apiclient.RICSubscription {
249 return apiclient.New(httptransport.New(r.remoteHost, r.remoteUrl, r.remoteProt), strfmt.Default)