X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pkg%2Fcontrol%2Fcontrol.go;h=9a5022fc885319623947d6cf8f75bd31676eb51b;hb=fbc56f981b7bd3387f7326401ceccbf31893a2ed;hp=d213c65e1c1179a8261bb4abf43f924e4f9c7cf3;hpb=303e57cbddf5d0d5e77e2bf7ec60643fcf525419;p=ric-plt%2Fsubmgr.git diff --git a/pkg/control/control.go b/pkg/control/control.go index d213c65..9a5022f 100644 --- a/pkg/control/control.go +++ b/pkg/control/control.go @@ -27,69 +27,97 @@ package control */ import "C" - import ( - "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" - "errors" - "strconv" + "errors" + "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" + "github.com/spf13/viper" + "math/rand" + "strconv" + "time" ) type Control struct { - e2ap *E2ap - registry *Registry + e2ap *E2ap + registry *Registry +} + +var SEEDSN uint16 + +func init() { + viper.AutomaticEnv() + viper.SetEnvPrefix("submgr") + viper.AllowEmptyEnv(true) + SEEDSN = uint16(viper.GetInt("seed_sn")) + if SEEDSN == 0 { + rand.Seed(time.Now().UnixNano()) + SEEDSN = uint16(rand.Intn(65535)) + } + if SEEDSN > 65535 { + SEEDSN = 0 + } + xapp.Logger.Info("SUBMGR: Initial Sequence Number: %v", SEEDSN) } func NewControl() Control { - return Control{new(E2ap),new(Registry)} + registry := new(Registry) + registry.Initialize(SEEDSN) + return Control{new(E2ap), registry} } func (c *Control) Run() { - xapp.Run(c) + xapp.Run(c) } func (c *Control) Consume(mtype, sub_id int, len int, payload []byte) (err error) { - switch mtype { - case C.RIC_SUB_REQ: - err = c.handleSubscriptionRequest(&RmrDatagram{mtype, sub_id, payload}) - case C.RIC_SUB_RESP: - err = c.handleSubscriptionResponse(&RmrDatagram{mtype, sub_id, payload}) - default: - err = errors.New("Message Type "+strconv.Itoa(mtype)+" discarded") - } - return + switch mtype { + case C.RIC_SUB_REQ: + err = c.handleSubscriptionRequest(&RmrDatagram{mtype, uint16(sub_id), payload}) + case C.RIC_SUB_RESP: + err = c.handleSubscriptionResponse(&RmrDatagram{mtype, uint16(sub_id), payload}) + default: + err = errors.New("Message Type " + strconv.Itoa(mtype) + " is discarded") + } + return } func (c *Control) rmrSend(datagram *RmrDatagram) (err error) { - if !xapp.Rmr.Send(datagram.MessageType, datagram.SubscriptionId, len(datagram.Payload), datagram.Payload) { - err = errors.New("rmr.Send() failed") - } - return + if !xapp.Rmr.Send(datagram.MessageType, int(datagram.SubscriptionId), len(datagram.Payload), datagram.Payload) { + err = errors.New("rmr.Send() failed") + } + return } -func (c *Control) handleSubscriptionRequest(datagram *RmrDatagram) ( err error) { - /* TODO: removed to being able to integrate with UEMGR - content, err := c.e2ap.GetPayloadContent(datagram.Payload) - */ - xapp.Logger.Info("Subscription Request Message received with ID: %v", datagram.SubscriptionId) - new_sub_id := c.registry.GetSubscriptionId() - /* TODO: removed to being able to integrate with UEMGR - payload, err := c.e2ap.SetSubscriptionSequenceNumber(datagram.Payload, new_sub_id) - if err != nil { - xapp.Logger.Error("Unable to set Subscription Sequence Number in Payload due to: "+ err.Error()) - return - } - */ - xapp.Logger.Info("New Subscription Registered, forwarding to E2T") - c.rmrSend(&RmrDatagram{C.RIC_SUB_REQ , new_sub_id, datagram.Payload}) - return +func (c *Control) handleSubscriptionRequest(datagram *RmrDatagram) (err error) { + payload_seq_num, err := c.e2ap.GetSubscriptionRequestSequenceNumber(datagram.Payload) + if err != nil { + err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error()) + return + } + xapp.Logger.Info("Subscription Request Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", datagram.SubscriptionId, payload_seq_num) + new_sub_id := c.registry.ReserveSequenceNumber() + payload, err := c.e2ap.SetSubscriptionRequestSequenceNumber(datagram.Payload, new_sub_id) + if err != nil { + err = errors.New("Unable to set Subscription Sequence Number in Payload due to: " + err.Error()) + return + } + xapp.Logger.Info("Generated ID: %v. Forwarding to E2 Termination...", int(new_sub_id)) + c.rmrSend(&RmrDatagram{C.RIC_SUB_REQ, new_sub_id, payload}) + return } -func (c *Control) handleSubscriptionResponse(datagram *RmrDatagram) ( err error) { - /* TODO: removed to being able to integrate with UEMGR - content, err := c.e2ap.GetPayloadContent(datagram.Payload) - */ - xapp.Logger.Info("Subscription Response Message received with ID: %v", datagram.SubscriptionId) - xapp.Logger.Info("Subscription Response Registered, forwarding to Requestor") - c.rmrSend(&RmrDatagram{C.RIC_SUB_RESP , datagram.SubscriptionId, datagram.Payload}) - return -} \ No newline at end of file +func (c *Control) handleSubscriptionResponse(datagram *RmrDatagram) (err error) { + payload_seq_num, err := c.e2ap.GetSubscriptionResponseSequenceNumber(datagram.Payload) + if err != nil { + err = errors.New("Unable to get Subscription Sequence Number from Payload due to: " + err.Error()) + return + } + xapp.Logger.Info("Subscription Response Received. RMR SUBSCRIPTION_ID: %v | PAYLOAD SEQUENCE_NUMBER: %v", datagram.SubscriptionId, payload_seq_num) + if !c.registry.IsValidSequenceNumber(payload_seq_num) { + err = errors.New("Unknown Subscription ID: " + strconv.Itoa(int(payload_seq_num)) + " in Subscritpion Response. Message discarded.") + return + } + c.registry.setSubscriptionToConfirmed(payload_seq_num) + xapp.Logger.Info("Subscription Response Registered. Forwarding to Requestor...") + c.rmrSend(&RmrDatagram{C.RIC_SUB_RESP, payload_seq_num, datagram.Payload}) + return +}