2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
23 #include <rmr/RIC_message_types.h>
27 #cgo LDFLAGS: -lrmr_nng -lnng
33 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34 "github.com/spf13/viper"
35 "github.com/go-openapi/strfmt"
36 httptransport "github.com/go-openapi/runtime/client"
37 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
38 rtmgrhandle "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client/handle"
// rtmgrClient is the routing-manager client; the handlers reach it as
// c.rtmgrClient (SplitSource, SubscriptionRequestUpdate).
47 rtmgrClient *RtmgrClient
// RMRParams is a message-parameter struct declared locally in this package.
// NOTE(review): the handlers in this file take *xapp.RMRParams rather than
// this type — confirm whether this declaration is still needed.
56 type RMRParams struct {
// SubscriptionReqChan carries subRouteInfo records produced by
// handleSubscriptionRequest. It is buffered with room for 10 entries, so a
// send blocks only while 10 records are already pending.
// NOTE(review): presumably drained by RtmgrClient.SubscriptionRequestUpdate,
// which is started in a goroutine right before each send — confirm.
68 var SubscriptionReqChan = make(chan subRouteInfo, 10)
// Environment lookups done through viper use the "submgr" prefix, and an
// environment variable that is set but empty is still treated as set.
78 viper.SetEnvPrefix("submgr")
79 viper.AllowEmptyEnv(true)
// The configured seed is narrowed to uint16: an integer outside 0..65535
// wraps around in the conversion instead of being rejected.
80 SEEDSN = uint16(viper.GetInt("seed_sn"))
// Random seed: rand.Intn(65535) yields a value in [0, 65535), so 65535
// itself is never picked.
// NOTE(review): presumably used only when no seed_sn was configured — confirm.
82 rand.Seed(time.Now().UnixNano())
83 SEEDSN = uint16(rand.Intn(65535))
// Log the sequence number that the registry will start from.
88 xapp.Logger.Info("SUBMGR: Initial Sequence Number: %v", SEEDSN)
// NewControl assembles a Control value from a new E2ap, a Registry
// initialized with SEEDSN, a routing-manager client and a Tracker.
// The routing-manager endpoint is read from the viper keys
// "rtmgr.HostAddr", "rtmgr.port" and "rtmgr.baseUrl".
91 func NewControl() Control {
92 registry := new(Registry)
93 registry.Initialize(SEEDSN)
95 tracker := new(Tracker)
// REST transport towards the routing manager: "<HostAddr>:<port>" as host,
// baseUrl as base path, plain "http" as the only scheme.
98 transport := httptransport.New(viper.GetString("rtmgr.HostAddr") + ":" + viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
99 client := rtmgrclient.New(transport, strfmt.Default)
// Request parameters are created with a 10 second timeout.
100 handle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
101 rtmgrClient := RtmgrClient{client, handle}
// Unkeyed literal: the values must stay in the field order of the Control
// struct declaration.
103 return Control{new(E2ap), registry, &rtmgrClient, tracker}
// Run takes no arguments and returns nothing.
// NOTE(review): presumably hands this Control to the xapp framework so that
// Consume is invoked for incoming messages — confirm.
106 func (c *Control) Run() {
// Consume routes an incoming RMR message to the subscription handler that
// matches its message type and returns that handler's error.
// A message type without a handler produces the error
// "Message Type <n> is discarded".
110 func (c *Control) Consume(rp *xapp.RMRParams) (err error) {
// Subscription request.
113 err = c.handleSubscriptionRequest(rp)
// Subscription response.
115 err = c.handleSubscriptionResponse(rp)
116 case C.RIC_SUB_DEL_REQ:
117 err = c.handleSubscriptionDeleteRequest(rp)
// Any other message type is reported back as an error, not processed.
119 err = errors.New("Message Type " + strconv.Itoa(rp.Mtype) + " is discarded")
// rmrSend passes params to xapp.Rmr.Send and turns a false result into the
// error "rmr.Send() failed".
// NOTE(review): the second argument to Send is always false; presumably the
// return-to-sender flag — confirm against xapp-frame.
124 func (c *Control) rmrSend(params *xapp.RMRParams) (err error) {
125 if !xapp.Rmr.Send(params, false) {
126 err = errors.New("rmr.Send() failed")
// handleSubscriptionRequest processes a subscription request message.
//
// It reads the sequence number from the payload, reserves a new sequence
// number from the registry and writes it into the payload, records a CREATE
// transaction for the requestor, queues a route update on
// SubscriptionReqChan, and stores the new number in params.SubId.
// Failures to read or write the sequence number are returned as wrapped
// errors; failures in SplitSource and Track_transaction are logged.
131 func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) (err error) {
// err is the named result; := only introduces payload_seq_num here.
132 payload_seq_num, err := c.e2ap.GetSubscriptionRequestSequenceNumber(params.Payload)
134 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
137 xapp.Logger.Info("Subscription Request Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
139 /* Reserve a sequence number and set it in the payload */
140 new_sub_id := c.registry.ReserveSequenceNumber()
// The first result is discarded.
// NOTE(review): presumably params.Payload is rewritten in place — confirm.
142 _, err = c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, new_sub_id)
144 err = errors.New("Unable to set Subscription Sequence Number in Payload due to: " + err.Error())
// Split the RMR source of the request into address and port.
148 src_addr, src_port, err := c.rtmgrClient.SplitSource(params.Src)
// NOTE(review): this message is logged for a SplitSource failure, before any
// routing-manager update has been attempted; the wording is misleading.
150 xapp.Logger.Error("Failed to update routing-manager about the subscription request with reason: %s", err)
154 /* Create transaction records for every subscription request */
155 xact_key := Transaction_key{new_sub_id, CREATE}
// src_addr and src_port are dereferenced, so both must be non-nil here.
156 xact_value := Transaction{*src_addr, *src_port, params.Payload}
157 err = c.tracker.Track_transaction(xact_key, xact_value)
159 xapp.Logger.Error("Failed to create a transaction record due to %v", err)
163 /* Update routing manager about the new subscription */
164 sub_route_action := subRouteInfo{CREATE, *src_addr, *src_port, new_sub_id }
// A new goroutine is started for every request before the record is queued;
// the send blocks while the channel buffer is full.
165 go c.rtmgrClient.SubscriptionRequestUpdate()
166 SubscriptionReqChan <- sub_route_action
168 // Setting new subscription ID in the RMR header
169 params.SubId = int(new_sub_id)
171 xapp.Logger.Info("Generated ID: %v. Forwarding to E2 Termination...", int(new_sub_id))
// handleSubscriptionResponse processes a subscription response message.
//
// It reads the sequence number from the payload, rejects the message with an
// error when the registry does not know that number, and otherwise marks the
// subscription as confirmed in the registry.
176 func (c *Control) handleSubscriptionResponse(params *xapp.RMRParams) (err error) {
177 payload_seq_num, err := c.e2ap.GetSubscriptionResponseSequenceNumber(params.Payload)
179 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
182 xapp.Logger.Info("Subscription Response Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
// Responses for sequence numbers the registry does not hold are discarded.
183 if !c.registry.IsValidSequenceNumber(payload_seq_num) {
184 err = errors.New("Unknown Subscription ID: " + strconv.Itoa(int(payload_seq_num)) + " in Subscritpion Response. Message discarded.")
187 c.registry.setSubscriptionToConfirmed(payload_seq_num)
188 xapp.Logger.Info("Subscription Response Registered. Forwarding to Requestor...")
// String returns the textual name of act, looked up in a fixed table of
// action names. Values outside the range CREATE..DELETE are handled
// separately from the table lookup.
// NOTE(review): the range check protects the lookup only if every value in
// CREATE..DELETE is a valid index into actions — confirm the constants are
// contiguous and match the table length.
193 func (act Action) String() string {
194 actions := [...]string{
200 if act < CREATE || act > DELETE {
// valid reports whether act is one of the known actions CREATE, MERGE or
// DELETE.
206 func (act Action) valid() bool {
208 case CREATE, MERGE, DELETE:
// handleSubscriptionDeleteRequest processes a subscription delete request.
//
// It reads the sequence number from the payload and, when the registry knows
// that number, removes the subscription from the registry. A failure to read
// the sequence number is returned as a wrapped error.
215 func (c *Control) handleSubscriptionDeleteRequest(params *xapp.RMRParams) (err error) {
216 payload_seq_num, err := c.e2ap.GetSubscriptionDeleteRequestSequenceNumber(params.Payload)
218 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
221 xapp.Logger.Info("Subscription Delete Request Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
// Only subscriptions the registry currently holds are deleted.
222 if c.registry.IsValidSequenceNumber(payload_seq_num) {
223 c.registry.deleteSubscription(payload_seq_num)
225 xapp.Logger.Info("Subscription ID: %v. Forwarding to E2 Termination...", int(payload_seq_num))