2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
23 #include <rmr/RIC_message_types.h>
27 #cgo LDFLAGS: -lrmr_nng -lnng
33 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34 "github.com/spf13/viper"
35 "github.com/go-openapi/strfmt"
36 httptransport "github.com/go-openapi/runtime/client"
37 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
38 rtmgrhandle "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client/handle"
47 rtmgrClient *RtmgrClient
57 var SubscriptionReqChan = make(chan subRouteInfo, 10)
67 viper.SetEnvPrefix("submgr")
68 viper.AllowEmptyEnv(true)
69 SEEDSN = uint16(viper.GetInt("seed_sn"))
71 rand.Seed(time.Now().UnixNano())
72 SEEDSN = uint16(rand.Intn(65535))
77 xapp.Logger.Info("SUBMGR: Initial Sequence Number: %v", SEEDSN)
80 func NewControl() Control {
81 registry := new(Registry)
82 registry.Initialize(SEEDSN)
84 tracker := new(Tracker)
87 transport := httptransport.New(viper.GetString("rtmgr.HostAddr") + ":" + viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
88 client := rtmgrclient.New(transport, strfmt.Default)
89 handle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
90 delete_handle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
91 rtmgrClient := RtmgrClient{client, handle, delete_handle}
93 return Control{new(E2ap), registry, &rtmgrClient, tracker}
96 func (c *Control) Run() {
100 func (c *Control) Consume(rp *xapp.RMRParams) (err error) {
103 err = c.handleSubscriptionRequest(rp)
105 err = c.handleSubscriptionResponse(rp)
106 case C.RIC_SUB_DEL_REQ:
107 err = c.handleSubscriptionDeleteRequest(rp)
108 case C.RIC_SUB_DEL_RESP:
109 err = c.handleSubscriptionDeleteResponse(rp)
111 err = errors.New("Message Type " + strconv.Itoa(rp.Mtype) + " is discarded")
116 func (c *Control) rmrSend(params *xapp.RMRParams) (err error) {
117 if !xapp.Rmr.Send(params, false) {
118 err = errors.New("rmr.Send() failed")
123 func (c *Control) rmrReplyToSender(params *xapp.RMRParams) (err error) {
124 if !xapp.Rmr.Send(params, true) {
125 err = errors.New("rmr.Send() failed")
130 func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) (err error) {
131 payload_seq_num, err := c.e2ap.GetSubscriptionRequestSequenceNumber(params.Payload)
133 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
136 xapp.Logger.Info("Subscription Request Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
138 /* Reserve a sequence number and set it in the payload */
139 new_sub_id := c.registry.ReserveSequenceNumber()
141 _, err = c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, new_sub_id)
143 err = errors.New("Unable to set Subscription Sequence Number in Payload due to: " + err.Error())
147 src_addr, src_port, err := c.rtmgrClient.SplitSource(params.Src)
149 xapp.Logger.Error("Failed to update routing-manager about the subscription request with reason: %s", err)
153 /* Create transatcion records for every subscription request */
154 xact_key := Transaction_key{new_sub_id, CREATE}
155 xact_value := Transaction{*src_addr, *src_port, params.Payload, params.Mbuf}
156 err = c.tracker.Track_transaction(xact_key, xact_value)
158 xapp.Logger.Error("Failed to create a transaction record due to %v", err)
162 /* Update routing manager about the new subscription*/
163 sub_route_action := subRouteInfo{CREATE, *src_addr, *src_port, new_sub_id }
164 go c.rtmgrClient.SubscriptionRequestUpdate()
165 SubscriptionReqChan <- sub_route_action
167 // Setting new subscription ID in the RMR header
168 params.SubId = int(new_sub_id)
170 xapp.Logger.Info("Generated ID: %v. Forwarding to E2 Termination...", int(new_sub_id))
172 xapp.Logger.Info("--- Debugging transaction table = %v", c.tracker.transaction_table)
176 func (c *Control) handleSubscriptionResponse(params *xapp.RMRParams) (err error) {
177 payload_seq_num, err := c.e2ap.GetSubscriptionResponseSequenceNumber(params.Payload)
179 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
182 xapp.Logger.Info("Subscription Response Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
183 if !c.registry.IsValidSequenceNumber(payload_seq_num) {
184 err = errors.New("Unknown Subscription ID: " + strconv.Itoa(int(payload_seq_num)) + " in Subscritpion Response. Message discarded.")
187 c.registry.setSubscriptionToConfirmed(payload_seq_num)
188 xapp.Logger.Info("Subscription Response Registered. Forwarding to Requestor...")
189 transaction, err := c.tracker.complete_transaction(payload_seq_num, DELETE)
191 xapp.Logger.Error("Failed to create a transaction record due to %v", err)
194 xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Forwarding to E2 Termination...", int(payload_seq_num), transaction.Xapp_instance_address, transaction.Xapp_port)
195 params.Mbuf = transaction.Mbuf
196 c.rmrReplyToSender(params)
200 func (act Action) String() string {
201 actions := [...]string{
207 if act < CREATE || act > DELETE {
213 func (act Action) valid() bool {
215 case CREATE, MERGE, DELETE:
222 func (c *Control) handleSubscriptionDeleteRequest(params *xapp.RMRParams) (err error) {
223 payload_seq_num, err := c.e2ap.GetSubscriptionDeleteRequestSequenceNumber(params.Payload)
225 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
228 xapp.Logger.Info("Subscription Delete Request Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
229 if c.registry.IsValidSequenceNumber(payload_seq_num) {
230 c.registry.deleteSubscription(payload_seq_num)
231 trackErr := c.trackDeleteTransaction(params, payload_seq_num)
233 xapp.Logger.Error("Failed to create a transaction record due to %v", err)
237 xapp.Logger.Info("Subscription ID: %v. Forwarding to E2 Termination...", int(payload_seq_num))
242 func (c *Control) trackDeleteTransaction(params *xapp.RMRParams, payload_seq_num uint16) (err error) {
243 src_addr, src_port, err := c.rtmgrClient.SplitSource(params.Src)
244 xact_key := Transaction_key{payload_seq_num, DELETE}
245 xact_value := Transaction{*src_addr, *src_port, params.Payload, params.Mbuf}
246 err = c.tracker.Track_transaction(xact_key, xact_value)
250 func (c *Control) handleSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
251 payload_seq_num, err := c.e2ap.GetSubscriptionDeleteResponseSequenceNumber(params.Payload)
253 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
256 var transaction , _= c.tracker.Retrive_transaction(payload_seq_num, DELETE)
257 sub_route_action := subRouteInfo{DELETE, transaction.Xapp_instance_address, transaction.Xapp_port, payload_seq_num }
258 go c.rtmgrClient.SubscriptionRequestUpdate()
259 SubscriptionReqChan <- sub_route_action
261 xapp.Logger.Info("Subscription Delete Response Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
262 if c.registry.releaseSequenceNumber(payload_seq_num) {
263 transaction, err = c.tracker.complete_transaction(payload_seq_num, DELETE)
265 xapp.Logger.Error("Failed to create a transaction record due to %v", err)
268 xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Forwarding to E2 Termination...", int(payload_seq_num), transaction.Xapp_instance_address, transaction.Xapp_port)
269 //params.Src = xAddress + ":" + strconv.Itoa(int(xPort))
270 params.Mbuf = transaction.Mbuf
271 c.rmrReplyToSender(params)