2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
26 "github.com/go-openapi/loads"
27 httptransport "github.com/go-openapi/runtime/client"
28 "github.com/go-openapi/runtime/middleware"
29 "github.com/go-openapi/strfmt"
30 "github.com/spf13/viper"
37 apiclient "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi"
38 apicommon "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/common"
39 apipolicy "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/policy"
40 apireport "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientapi/report"
41 apimodel "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/clientmodel"
43 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
44 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi"
45 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations"
46 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/common"
47 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/policy"
48 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/query"
49 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/restapi/operations/report"
52 type SubscriptionHandler func(models.SubscriptionType, interface{}) (*models.SubscriptionResponse, error)
53 type SubscriptionQueryHandler func() (models.SubscriptionList, error)
54 type SubscriptionDeleteHandler func(string) error
55 type SubscriptionResponseCallback func(*apimodel.SubscriptionResponse)
57 type Subscriber struct {
65 clientCB SubscriptionResponseCallback
68 func NewSubscriber(host string, timo int) *Subscriber {
70 pltnamespace := os.Getenv("PLT_NAMESPACE")
71 if pltnamespace == "" {
72 pltnamespace = "ricplt"
74 host = fmt.Sprintf("service-%s-submgr-http.%s:8088", pltnamespace, pltnamespace)
84 remoteProt: []string{"http"},
85 timeout: time.Duration(timo) * time.Second,
88 clientUrl: "/ric/v1/subscriptions/response",
90 Resource.InjectRoute(r.clientUrl, r.ResponseHandler, "POST")
95 func (r *Subscriber) ResponseHandler(w http.ResponseWriter, req *http.Request) {
97 var resp apimodel.SubscriptionResponse
98 if err := json.NewDecoder(req.Body).Decode(&resp); err == nil {
99 if r.clientCB != nil {
107 // Server interface: listen and receive subscription requests
108 func (r *Subscriber) Listen(createSubscription SubscriptionHandler, getSubscription SubscriptionQueryHandler, delSubscription SubscriptionDeleteHandler) error {
109 swaggerSpec, err := loads.Embedded(restapi.SwaggerJSON, restapi.FlatSwaggerJSON)
114 api := operations.NewXappFrameworkAPI(swaggerSpec)
116 // Subscription: Query
117 api.QueryGetAllSubscriptionsHandler = query.GetAllSubscriptionsHandlerFunc(
118 func(p query.GetAllSubscriptionsParams) middleware.Responder {
119 if resp, err := getSubscription(); err == nil {
120 return query.NewGetAllSubscriptionsOK().WithPayload(resp)
122 return query.NewGetAllSubscriptionsInternalServerError()
125 // SubscriptionType: Report
126 api.ReportSubscribeReportHandler = report.SubscribeReportHandlerFunc(
127 func(p report.SubscribeReportParams) middleware.Responder {
128 if resp, err := createSubscription(models.SubscriptionTypeReport, p.ReportParams); err == nil {
129 return report.NewSubscribeReportCreated().WithPayload(resp)
131 return report.NewSubscribeReportInternalServerError()
134 // SubscriptionType: Policy
135 api.PolicySubscribePolicyHandler = policy.SubscribePolicyHandlerFunc(
136 func(p policy.SubscribePolicyParams) middleware.Responder {
137 if resp, err := createSubscription(models.SubscriptionTypePolicy, p.PolicyParams); err == nil {
138 return policy.NewSubscribePolicyCreated().WithPayload(resp)
140 return policy.NewSubscribePolicyInternalServerError()
143 // SubscriptionType: Delete
144 api.CommonUnsubscribeHandler = common.UnsubscribeHandlerFunc(
145 func(p common.UnsubscribeParams) middleware.Responder {
146 if err := delSubscription(p.SubscriptionID); err == nil {
147 return common.NewUnsubscribeNoContent()
149 return common.NewUnsubscribeInternalServerError()
152 server := restapi.NewServer(api)
153 defer server.Shutdown()
154 server.Host = r.localAddr
155 server.Port = r.localPort
157 Logger.Info("Serving subscriptions on %s:%d\n", server.Host, server.Port)
158 if err := server.Serve(); err != nil {
164 // Server interface: send notification to client
165 func (r *Subscriber) Notify(resp *models.SubscriptionResponse, clientEndpoint string) (err error) {
166 respData, err := json.Marshal(resp)
168 Logger.Error("json.Marshal failed: %v", err)
172 ep, _, _ := net.SplitHostPort(clientEndpoint)
173 _, port, _ := net.SplitHostPort(fmt.Sprintf(":%d", GetPortData("http").Port))
174 clientUrl := fmt.Sprintf("http://%s:%s%s", ep, port, r.clientUrl)
176 retries := viper.GetInt("subscription.retryCount")
181 delay := viper.GetInt("subscription.retryDelay")
186 for i := 0; i < retries; i++ {
187 r, err := http.Post(clientUrl, "application/json", bytes.NewBuffer(respData))
188 if err == nil && (r != nil && r.StatusCode == http.StatusOK) {
193 Logger.Error("%v", err)
195 if r != nil && r.StatusCode != http.StatusOK {
196 Logger.Error("clientUrl=%s statusCode=%d", clientUrl, r.StatusCode)
198 time.Sleep(time.Duration(delay) * time.Second)
204 // Subscription interface for xApp: Response callback
205 func (r *Subscriber) SetResponseCB(c SubscriptionResponseCallback) {
209 // Subscription interface for xApp: REPORT
210 func (r *Subscriber) SubscribeReport(p *apimodel.ReportParams) (*apimodel.SubscriptionResponse, error) {
211 params := apireport.NewSubscribeReportParamsWithTimeout(r.timeout).WithReportParams(p)
212 result, err := r.CreateTransport().Report.SubscribeReport(params)
214 return &apimodel.SubscriptionResponse{}, err
217 return result.Payload, err
220 // Subscription interface for xApp: POLICY
221 func (r *Subscriber) SubscribePolicy(p *apimodel.PolicyParams) (*apimodel.SubscriptionResponse, error) {
222 params := apipolicy.NewSubscribePolicyParamsWithTimeout(r.timeout).WithPolicyParams(p)
223 result, err := r.CreateTransport().Policy.SubscribePolicy(params)
225 return &apimodel.SubscriptionResponse{}, err
228 return result.Payload, err
231 // Subscription interface for xApp: DELETE
232 func (r *Subscriber) UnSubscribe(subId string) error {
233 params := apicommon.NewUnsubscribeParamsWithTimeout(r.timeout).WithSubscriptionID(subId)
234 _, err := r.CreateTransport().Common.Unsubscribe(params)
239 // Subscription interface for xApp: QUERY
240 func (r *Subscriber) QuerySubscriptions() (models.SubscriptionList, error) {
241 resp, err := http.Get(fmt.Sprintf("http://%s/%s/subscriptions", r.remoteHost, r.remoteUrl))
243 return models.SubscriptionList{}, err
246 defer resp.Body.Close()
248 contents, err := ioutil.ReadAll(resp.Body)
250 return models.SubscriptionList{}, err
253 subscriptions := models.SubscriptionList{}
254 err = json.Unmarshal([]byte(string(contents)), &subscriptions)
256 return models.SubscriptionList{}, err
259 return subscriptions, nil
262 func (r *Subscriber) CreateTransport() *apiclient.RICSubscription {
263 return apiclient.New(httptransport.New(r.remoteHost, r.remoteUrl, r.remoteProt), strfmt.Default)