2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
23 #include <rmr/RIC_message_types.h>
27 #cgo LDFLAGS: -lrmr_nng -lnng
33 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34 "github.com/spf13/viper"
35 "github.com/go-openapi/strfmt"
36 httptransport "github.com/go-openapi/runtime/client"
37 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
38 rtmgrhandle "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client/handle"
47 rtmgrClient *RtmgrClient
49 rc_chan chan *xapp.RMRParams
// SubscriptionReqChan is a package-level channel of subRouteInfo values with a
// buffer of 10. The subscription request and delete-response handlers in this
// file send a route action on it right after starting
// c.rtmgrClient.SubscriptionRequestUpdate in a goroutine (presumably the
// receiver of the action - confirm in the routing-manager client code).
58 var SubscriptionReqChan = make(chan subRouteInfo, 10)
// Use "submgr" as viper's environment-variable prefix and let empty
// environment variables count as set values.
68 viper.SetEnvPrefix("submgr")
69 viper.AllowEmptyEnv(true)
// Initial sequence number from the "seed_sn" setting, converted to uint16.
70 SEEDSN = uint16(viper.GetInt("seed_sn"))
// Time-seeded pseudo-random seed; rand.Intn(65535) yields values in [0, 65535).
// NOTE(review): the condition guarding these two lines is not visible in this
// excerpt - presumably they only run when no seed was configured; confirm.
72 rand.Seed(time.Now().UnixNano())
73 SEEDSN = uint16(rand.Intn(65535))
// Log the sequence number the registry will be initialized with.
78 xapp.Logger.Info("SUBMGR: Initial Sequence Number: %v", SEEDSN)
// NewControl builds a Control value. It creates a Registry initialized with
// SEEDSN, a Tracker, and an RtmgrClient for the routing manager, and combines
// them with a new E2ap and an unbuffered channel of *xapp.RMRParams.
81 func NewControl() Control {
82 registry := new(Registry)
83 registry.Initialize(SEEDSN)
85 tracker := new(Tracker)
// Routing-manager endpoint: host and port come from the "rtmgr.HostAddr" and
// "rtmgr.port" settings, the base path from "rtmgr.baseUrl"; the only scheme
// offered is plain "http".
88 transport := httptransport.New(viper.GetString("rtmgr.HostAddr") + ":" + viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
89 client := rtmgrclient.New(transport, strfmt.Default)
// Parameter sets for providing and deleting a subscription, each created with
// a 10 second timeout.
90 handle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
91 delete_handle := rtmgrhandle.NewDeleteXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
92 rtmgrClient := RtmgrClient{client, handle, delete_handle}
// Positional struct literal: the values must stay in the declaration order of
// Control's fields.
94 return Control{new(E2ap), registry, &rtmgrClient, tracker, make(chan *xapp.RMRParams)}
97 func (c *Control) Run() {
102 func (c *Control) Consume(rp *xapp.RMRParams) (err error) {
// rmrSend hands params to xapp.Rmr.Send with the second argument set to false.
// When Send reports false, err is set to "rmr.Send() failed".
107 func (c *Control) rmrSend(params *xapp.RMRParams) (err error) {
108 if !xapp.Rmr.Send(params, false) {
109 err = errors.New("rmr.Send() failed")
// rmrReplyToSender hands params to xapp.Rmr.Send with the second argument set
// to true (presumably the return-to-sender flag - confirm against the
// xapp-frame API). When Send reports false, err is set to "rmr.Send() failed".
114 func (c *Control) rmrReplyToSender(params *xapp.RMRParams) (err error) {
115 if !xapp.Rmr.Send(params, true) {
116 err = errors.New("rmr.Send() failed")
// controlLoop routes each message to the subscription handler that matches its
// message type. The errors returned by the handlers are not examined here.
// NOTE(review): the lines that obtain msg and open the switch are not visible
// in this excerpt; msg presumably arrives on c.rc_chan - confirm.
121 func (c *Control) controlLoop() {
126 c.handleSubscriptionRequest(msg)
128 c.handleSubscriptionResponse(msg)
129 case C.RIC_SUB_DEL_REQ:
130 c.handleSubscriptionDeleteRequest(msg)
131 case C.RIC_SUB_DEL_RESP:
132 c.handleSubscriptionDeleteResponse(msg)
// Unrecognized message: build an error carrying the numeric Mtype and log it
// (presumably the default case of the switch).
134 err := errors.New("Message Type " + strconv.Itoa(msg.Mtype) + " is discarded")
135 xapp.Logger.Error("Unknown message type: %v", err)
// handleSubscriptionRequest processes an incoming subscription request. It
// reads the sequence number from the payload, reserves a new one from the
// registry and writes it into the payload, records a CREATE transaction for
// the sender, queues a CREATE route action for the routing manager, and sets
// the new ID as the RMR subscription ID of params.
// NOTE(review): the error checks, the returns and the statement that forwards
// the message are not visible in this excerpt.
140 func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) (err error) {
141 payload_seq_num, err := c.e2ap.GetSubscriptionRequestSequenceNumber(params.Payload)
143 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
146 xapp.Logger.Info("Subscription Request Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
148 /* Reserve a sequence number and set it in the payload */
149 new_sub_id := c.registry.ReserveSequenceNumber()
151 _, err = c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, new_sub_id)
153 err = errors.New("Unable to set Subscription Sequence Number in Payload due to: " + err.Error())
// Split the RMR source into address and port; both come back as pointers and
// are dereferenced below.
157 src_addr, src_port, err := c.rtmgrClient.SplitSource(params.Src)
159 xapp.Logger.Error("Failed to update routing-manager about the subscription request with reason: %s", err)
163 /* Create transaction records for every subscription request */
// The record is keyed by (new_sub_id, CREATE) and keeps the sender's address
// and port together with the payload and Mbuf of this request.
164 xact_key := Transaction_key{new_sub_id, CREATE}
165 xact_value := Transaction{*src_addr, *src_port, params.Payload, params.Mbuf}
166 err = c.tracker.Track_transaction(xact_key, xact_value)
168 xapp.Logger.Error("Failed to create a transaction record due to %v", err)
172 /* Update routing manager about the new subscription */
// SubscriptionRequestUpdate is started in its own goroutine before the action
// is sent on SubscriptionReqChan.
173 sub_route_action := subRouteInfo{CREATE, *src_addr, *src_port, new_sub_id }
174 go c.rtmgrClient.SubscriptionRequestUpdate()
175 SubscriptionReqChan <- sub_route_action
177 // Setting new subscription ID in the RMR header
178 params.SubId = int(new_sub_id)
180 xapp.Logger.Info("Generated ID: %v. Forwarding to E2 Termination...", int(new_sub_id))
182 xapp.Logger.Info("--- Debugging transaction table = %v", c.tracker.transaction_table)
// handleSubscriptionResponse processes a subscription response. It reads the
// sequence number from the payload, checks it against the registry, marks the
// subscription as confirmed, completes the transaction that was recorded for
// the original request, and replies to the requester using the Mbuf stored in
// that transaction.
// NOTE(review): the error checks and returns are not visible in this excerpt.
186 func (c *Control) handleSubscriptionResponse(params *xapp.RMRParams) (err error) {
187 payload_seq_num, err := c.e2ap.GetSubscriptionResponseSequenceNumber(params.Payload)
189 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
192 xapp.Logger.Info("Subscription Response Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
193 if !c.registry.IsValidSequenceNumber(payload_seq_num) {
194 err = errors.New("Unknown Subscription ID: " + strconv.Itoa(int(payload_seq_num)) + " in Subscritpion Response. Message discarded.")
197 c.registry.setSubscriptionToConfirmed(payload_seq_num)
198 xapp.Logger.Info("Subscription Response Registered. Forwarding to Requestor...")
// handleSubscriptionRequest tracks the request under
// Transaction_key{id, CREATE}, so the response must complete the CREATE
// transaction; the DELETE action belongs to the delete request/response pair.
199 transaction, err := c.tracker.complete_transaction(payload_seq_num, CREATE)
// NOTE(review): this message and the "Forwarding to E2 Termination" text below
// look copied from other handlers - here a transaction is completed and the
// reply goes back to the sender; the wording is left unchanged.
201 xapp.Logger.Error("Failed to create a transaction record due to %v", err)
204 xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Forwarding to E2 Termination...", int(payload_seq_num), transaction.Xapp_instance_address, transaction.Xapp_port)
// Reply through the message buffer saved when the request was tracked.
205 params.Mbuf = transaction.Mbuf
206 c.rmrReplyToSender(params)
// String returns the textual form of an Action.
// NOTE(review): the array contents and the return statements are not visible
// in this excerpt; presumably actions is indexed by act - confirm.
210 func (act Action) String() string {
211 actions := [...]string{
// Values outside the CREATE..DELETE range get separate treatment.
217 if act < CREATE || act > DELETE {
// valid reports a bool for act; CREATE, MERGE and DELETE share a single case.
// NOTE(review): the returned values are not visible in this excerpt -
// presumably true for the listed actions and false otherwise; confirm.
223 func (act Action) valid() bool {
225 case CREATE, MERGE, DELETE:
// handleSubscriptionDeleteRequest processes a subscription delete request. It
// reads the sequence number from the payload and, when the registry knows that
// number, deletes the subscription from the registry and records a DELETE
// transaction for the sender via trackDeleteTransaction.
// NOTE(review): the error checks, the returns and the statement that forwards
// the message are not visible in this excerpt.
232 func (c *Control) handleSubscriptionDeleteRequest(params *xapp.RMRParams) (err error) {
233 payload_seq_num, err := c.e2ap.GetSubscriptionDeleteRequestSequenceNumber(params.Payload)
235 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
238 xapp.Logger.Info("Subscription Delete Request Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
239 if c.registry.IsValidSequenceNumber(payload_seq_num) {
240 c.registry.deleteSubscription(payload_seq_num)
241 trackErr := c.trackDeleteTransaction(params, payload_seq_num)
// Report the tracking failure itself: the error lives in trackErr, while err
// still holds the result of the earlier sequence-number extraction.
243 xapp.Logger.Error("Failed to create a transaction record due to %v", trackErr)
247 xapp.Logger.Info("Subscription ID: %v. Forwarding to E2 Termination...", int(payload_seq_num))
// trackDeleteTransaction records a DELETE transaction for payload_seq_num. The
// record keeps the sender's address and port (split out of params.Src) together
// with the payload and Mbuf of the delete request. It returns the error from
// splitting the source or from the tracker.
252 func (c *Control) trackDeleteTransaction(params *xapp.RMRParams, payload_seq_num uint16) (err error) {
253 src_addr, src_port, err := c.rtmgrClient.SplitSource(params.Src)
// src_addr and src_port are pointers that are dereferenced below, so stop when
// the source could not be split instead of overwriting the error and
// dereferencing them; handleSubscriptionRequest checks this error as well.
if err != nil {
return
}
254 xact_key := Transaction_key{payload_seq_num, DELETE}
255 xact_value := Transaction{*src_addr, *src_port, params.Payload, params.Mbuf}
256 err = c.tracker.Track_transaction(xact_key, xact_value)
// handleSubscriptionDeleteResponse processes a subscription delete response.
// It reads the sequence number from the payload, looks up the DELETE
// transaction for it, queues a DELETE route action for the routing manager,
// releases the sequence number in the registry, completes the transaction, and
// replies to the sender using the Mbuf stored in that transaction.
// NOTE(review): the error checks and returns are not visible in this excerpt.
260 func (c *Control) handleSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) {
261 payload_seq_num, err := c.e2ap.GetSubscriptionDeleteResponseSequenceNumber(params.Payload)
263 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
// NOTE(review): the second result of Retrive_transaction is discarded, so when
// the lookup fails the route action below is built from whatever transaction
// value was returned - confirm this is intended.
266 var transaction , _= c.tracker.Retrive_transaction(payload_seq_num, DELETE)
267 sub_route_action := subRouteInfo{DELETE, transaction.Xapp_instance_address, transaction.Xapp_port, payload_seq_num }
// SubscriptionRequestUpdate is started in its own goroutine before the action
// is sent on SubscriptionReqChan.
268 go c.rtmgrClient.SubscriptionRequestUpdate()
269 SubscriptionReqChan <- sub_route_action
271 xapp.Logger.Info("Subscription Delete Response Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
272 if c.registry.releaseSequenceNumber(payload_seq_num) {
273 transaction, err = c.tracker.complete_transaction(payload_seq_num, DELETE)
// NOTE(review): this message and the "Forwarding to E2 Termination" text below
// look copied from the request handlers - here a transaction is completed and
// the reply goes back to the sender.
275 xapp.Logger.Error("Failed to create a transaction record due to %v", err)
278 xapp.Logger.Info("Subscription ID: %v, from address: %v:%v. Forwarding to E2 Termination...", int(payload_seq_num), transaction.Xapp_instance_address, transaction.Xapp_port)
279 //params.Src = xAddress + ":" + strconv.Itoa(int(xPort))
// Reply through the message buffer saved when the delete request was tracked.
280 params.Mbuf = transaction.Mbuf
281 c.rmrReplyToSender(params)