2 ==================================================================================
3 Copyright (c) 2019 AT&T Intellectual Property.
4 Copyright (c) 2019 Nokia
6 Licensed under the Apache License, Version 2.0 (the "License");
7 you may not use this file except in compliance with the License.
8 You may obtain a copy of the License at
10 http://www.apache.org/licenses/LICENSE-2.0
12 Unless required by applicable law or agreed to in writing, software
13 distributed under the License is distributed on an "AS IS" BASIS,
14 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 See the License for the specific language governing permissions and
16 limitations under the License.
17 ==================================================================================
23 #include <rmr/RIC_message_types.h>
27 #cgo LDFLAGS: -lrmr_nng -lnng
33 "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
34 "github.com/spf13/viper"
35 "github.com/go-openapi/strfmt"
36 httptransport "github.com/go-openapi/runtime/client"
37 rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
38 rtmgrhandle "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client/handle"
// rtmgrClient is the routing-manager client. handleSubscriptionRequest uses it
// to split the request source into address and port (SplitSource) and to run
// SubscriptionRequestUpdate in a goroutine.
47 rtmgrClient *RtmgrClient
// RMRParams is a package-local parameter struct.
// NOTE(review): every handler visible in this file takes *xapp.RMRParams
// rather than this type — confirm whether this declaration is still needed.
56 type RMRParams struct {
// SubscriptionReqChan carries subRouteInfo entries describing subscription
// route changes. It is buffered with capacity 10, so a send blocks only once
// ten entries are pending. handleSubscriptionRequest sends one entry per
// request right after starting rtmgrClient.SubscriptionRequestUpdate in a
// goroutine; presumably that goroutine is the receiver — verify in the
// routing-manager client code.
68 var SubscriptionReqChan = make(chan subRouteInfo, 10)
// Environment variables read through viper use the "submgr" prefix.
78 viper.SetEnvPrefix("submgr")
// Treat environment variables that are set but empty as valid values.
79 viper.AllowEmptyEnv(true)
// Take the initial sequence number from the "seed_sn" setting. The int is
// converted to uint16, so values outside 0..65535 wrap around.
80 SEEDSN = uint16(viper.GetInt("seed_sn"))
// Random seed: rand.Intn(65535) yields a value in [0, 65535), so 65535 itself
// is never picked.
// NOTE(review): presumably this only applies when no seed was configured —
// confirm the surrounding condition.
82 rand.Seed(time.Now().UnixNano())
83 SEEDSN = uint16(rand.Intn(65535))
// NOTE(review): the message relies on a %v verb — confirm that
// xapp.Logger.Info formats its arguments printf-style.
88 xapp.Logger.Info("SUBMGR: Initial Sequence Number: %v", SEEDSN)
// NewControl builds a Control value. It creates a Registry initialized with
// SEEDSN, sets up an HTTP client for the routing manager from the viper
// settings rtmgr.HostAddr, rtmgr.port and rtmgr.baseUrl, and bundles these with
// a new E2ap and a new Tracker.
91 func NewControl() Control {
92 registry := new(Registry)
93 registry.Initialize(SEEDSN)
// The routing-manager endpoint is "<rtmgr.HostAddr>:<rtmgr.port>" with
// rtmgr.baseUrl as base path; only the plain "http" scheme is offered.
95 transport := httptransport.New(viper.GetString("rtmgr.HostAddr") + ":" + viper.GetString("rtmgr.port"), viper.GetString("rtmgr.baseUrl"), []string{"http"})
96 client := rtmgrclient.New(transport, strfmt.Default)
// Request parameters for the xApp subscription handle call, created with a
// 10-second timeout.
97 handle := rtmgrhandle.NewProvideXappSubscriptionHandleParamsWithTimeout(10 * time.Second)
98 rtmgrClient := RtmgrClient{client, handle}
// Control is returned by value and holds a pointer to the local rtmgrClient.
100 return Control{new(E2ap), registry, &rtmgrClient, new(Tracker)}
// Run is a method on *Control that takes no arguments and returns nothing.
// NOTE(review): presumably it starts message processing for this Control —
// confirm against the method body.
103 func (c *Control) Run() {
// Consume hands a received RMR message to the matching subscription handler
// (request, response or delete request) and returns that handler's error.
// A message whose type has no handler is rejected with an error of the form
// "Message Type <n> is discarded", where <n> is rp.Mtype.
107 func (c *Control) Consume(rp *xapp.RMRParams) (err error) {
// Subscription request.
110 err = c.handleSubscriptionRequest(rp)
// Subscription response.
112 err = c.handleSubscriptionResponse(rp)
// Subscription delete request.
113 case C.RIC_SUB_DEL_REQ:
114 err = c.handleSubscriptionDeleteRequest(rp)
// No handler for this message type: report it as discarded.
116 err = errors.New("Message Type " + strconv.Itoa(rp.Mtype) + " is discarded")
// rmrSend passes params to xapp.Rmr.Send (with false as the second argument)
// and turns a false result into an "rmr.Send() failed" error.
121 func (c *Control) rmrSend(params *xapp.RMRParams) (err error) {
122 if !xapp.Rmr.Send(params, false) {
123 err = errors.New("rmr.Send() failed")
// handleSubscriptionRequest processes a subscription request message.
// It reads the sequence number from the payload, reserves a new one from the
// registry and writes it into the payload, records a CREATE transaction keyed
// by the new number, queues a CREATE route entry on SubscriptionReqChan, and
// overwrites params.SubId with the new number.
128 func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) (err error) {
129 payload_seq_num, err := c.e2ap.GetSubscriptionRequestSequenceNumber(params.Payload)
// Prefix the decoder's error with context for the caller.
131 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
134 xapp.Logger.Info("Subscription Request Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
136 /* Reserve a sequence number and set it in the payload */
137 new_sub_id := c.registry.ReserveSequenceNumber()
// NOTE(review): the first return value is discarded; this presumably relies on
// params.Payload being updated in place — confirm in the E2ap implementation.
139 _, err = c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, new_sub_id)
141 err = errors.New("Unable to set Subscription Sequence Number in Payload due to: " + err.Error())
// Split the sender (params.Src) into address and port; both come back as
// pointers and are dereferenced below.
145 src_addr, src_port, err := c.rtmgrClient.SplitSource(params.Src)
147 xapp.Logger.Error("Failed to update routing-manager about the subscription request with reason: %s", err)
151 /* Create transaction records for every subscription request */
152 xact_key := Transaction_key{new_sub_id, CREATE}
153 xact_value := Transaction{*src_addr, *src_port, params.Payload}
154 err = c.tracker.Track_transaction(xact_key, &xact_value)
156 xapp.Logger.Error("Failed to create a transaction record due to %v", err)
160 /* Update routing manager about the new subscription */
161 sub_route_action := subRouteInfo{CREATE, *src_addr, *src_port, new_sub_id }
// A SubscriptionRequestUpdate goroutine is started for every request before the
// entry is queued on the channel.
162 go c.rtmgrClient.SubscriptionRequestUpdate()
163 SubscriptionReqChan <- sub_route_action
165 // Setting new subscription ID in the RMR header
166 params.SubId = int(new_sub_id)
168 xapp.Logger.Info("Generated ID: %v. Forwarding to E2 Termination...", int(new_sub_id))
// handleSubscriptionResponse processes a subscription response message.
// It reads the sequence number from the payload, checks it against the
// registry, and marks the matching subscription as confirmed.
173 func (c *Control) handleSubscriptionResponse(params *xapp.RMRParams) (err error) {
174 payload_seq_num, err := c.e2ap.GetSubscriptionResponseSequenceNumber(params.Payload)
// Prefix the decoder's error with context for the caller.
176 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
179 xapp.Logger.Info("Subscription Response Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
// A sequence number the registry does not consider valid yields an
// "Unknown Subscription ID" error.
180 if !c.registry.IsValidSequenceNumber(payload_seq_num) {
181 err = errors.New("Unknown Subscription ID: " + strconv.Itoa(int(payload_seq_num)) + " in Subscritpion Response. Message discarded.")
184 c.registry.setSubscriptionToConfirmed(payload_seq_num)
185 xapp.Logger.Info("Subscription Response Registered. Forwarding to Requestor...")
// String returns a textual form of act, using the local actions array of
// names. Values below CREATE or above DELETE are caught by a range check.
// NOTE(review): confirm what is returned for out-of-range values.
190 func (act Action) String() string {
191 actions := [...]string{
// Range guard for values outside CREATE..DELETE.
197 if act < CREATE || act > DELETE {
// valid returns a bool for act; CREATE, MERGE and DELETE share one case.
// NOTE(review): presumably these three are the valid actions and anything else
// is not — confirm against the branch bodies.
203 func (act Action) valid() bool {
205 case CREATE, MERGE, DELETE:
// handleSubscriptionDeleteRequest processes a subscription delete request.
// It reads the sequence number from the payload and, when the registry
// considers that number valid, deletes the matching subscription.
212 func (c *Control) handleSubscriptionDeleteRequest(params *xapp.RMRParams) (err error) {
213 payload_seq_num, err := c.e2ap.GetSubscriptionDeleteRequestSequenceNumber(params.Payload)
// Prefix the decoder's error with context for the caller.
215 err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error())
218 xapp.Logger.Info("Subscription Delete Request Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", params.SubId, payload_seq_num)
// Only a sequence number known to the registry triggers a deletion.
219 if c.registry.IsValidSequenceNumber(payload_seq_num) {
220 c.registry.deleteSubscription(payload_seq_num)
// NOTE(review): the log announces forwarding to E2 Termination — confirm that
// the send follows and whether it should also happen for unknown IDs.
222 xapp.Logger.Info("Subscription ID: %v. Forwarding to E2 Termination...", int(payload_seq_num))