From: Juha Hyttinen Date: Mon, 13 Jan 2020 11:02:26 +0000 (+0200) Subject: RICPLT-2962 Preparation for subs merge X-Git-Tag: 0.4.0~49 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=e406a34d5547107533e65ddfbb2074e96d77b4b3;p=ric-plt%2Fsubmgr.git RICPLT-2962 Preparation for subs merge Change-Id: I2b2f4ddf6887a83decb0a042e5215152bcd3d555 Signed-off-by: Juha Hyttinen --- diff --git a/Dockerfile b/Dockerfile index b3ad890..1fa3ad7 100644 --- a/Dockerfile +++ b/Dockerfile @@ -26,10 +26,11 @@ RUN apt update && apt install -y iputils-ping net-tools curl tcpdump gdb WORKDIR /tmp +ARG RMRVERSION=1.13.1 # Install RMr shared library -RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr_1.10.0_amd64.deb/download.deb && dpkg -i rmr_1.10.0_amd64.deb && rm -rf rmr_1.10.0_amd64.deb +RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr_${RMRVERSION}_amd64.deb/download.deb && dpkg -i rmr_${RMRVERSION}_amd64.deb && rm -rf rmr_${RMRVERSION}_amd64.deb # Install RMr development header files -RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr-dev_1.10.0_amd64.deb/download.deb && dpkg -i rmr-dev_1.10.0_amd64.deb && rm -rf rmr-dev_1.10.0_amd64.deb +RUN wget --content-disposition https://packagecloud.io/o-ran-sc/staging/packages/debian/stretch/rmr-dev_${RMRVERSION}_amd64.deb/download.deb && dpkg -i rmr-dev_${RMRVERSION}_amd64.deb && rm -rf rmr-dev_${RMRVERSION}_amd64.deb # "PULLING LOG and COMPILING LOG" #RUN git clone "https://gerrit.o-ran-sc.org/r/com/log" /opt/log && cd /opt/log && \ @@ -115,7 +116,10 @@ RUN /usr/local/go/bin/go mod tidy # unittest COPY test/config-file.json test/config-file.json ENV CFG_FILE=/opt/submgr/test/config-file.json -RUN /usr/local/go/bin/go test -count=1 -v ./pkg/control + +RUN /usr/local/go/bin/go test -test.coverprofile /tmp/submgr_cover.out -count=1 -v ./pkg/control + +RUN /usr/local/go/bin/go tool cover -html=/tmp/submgr_cover.out -o /tmp/submgr_cover.html # test formating (not important) RUN test -z "$(/usr/local/go/bin/gofmt -l pkg/control/*.go)" diff --git a/pkg/control/control.go b/pkg/control/control.go index a1b2328..18eeb4c 100755 --- a/pkg/control/control.go +++ b/pkg/control/control.go @@ -20,7 +20,8 @@ package control import ( - "errors" + "fmt" + //"gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/packer" rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client" rtmgrhandle "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client/handle" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" @@ -32,6 +33,10 @@ import ( "time" ) +//----------------------------------------------------------------------------- +// +//----------------------------------------------------------------------------- + var subReqTime time.Duration = 5 * time.Second var subDelReqTime time.Duration = 5 * time.Second var maxSubReqTryCount uint64 = 2 // Initial try + retry @@ -113,32 +118,64 @@ func (c *Control) Run() { xapp.Run(c) } -func (c *Control) rmrSend(params *xapp.RMRParams) (err error) { +func (c *Control) rmrSendRaw(desc string, params *RMRParams) (err error) { + + xapp.Logger.Info("%s: %s", desc, params.String()) status := false i := 1 for ; i <= 10 && status == false; i++ { c.rmrSendMutex.Lock() - status = xapp.Rmr.Send(params, false) + status = xapp.Rmr.Send(params.RMRParams, false) c.rmrSendMutex.Unlock() if status == false { - xapp.Logger.Info("rmr.Send() failed. Retry count %v, Mtype: %v, SubId: %v, Xid %s", i, params.Mtype, params.SubId, params.Xid) + xapp.Logger.Info("rmr.Send() failed. Retry count %d, %s", i, params.String()) time.Sleep(500 * time.Millisecond) } } if status == false { - err = errors.New("rmr.Send() failed") + err = fmt.Errorf("rmr.Send() failed. Retry count %d, %s", i, params.String()) + xapp.Logger.Error("%s: %s", desc, err.Error()) xapp.Rmr.Free(params.Mbuf) } return } -func (c *Control) rmrReplyToSender(params *xapp.RMRParams) (err error) { - c.rmrSend(params) - return +func (c *Control) rmrSend(desc string, subs *Subscription, trans *Transaction, payload []byte, payloadLen int) (err error) { + params := &RMRParams{&xapp.RMRParams{}} + params.Mtype = trans.GetMtype() + params.SubId = int(subs.GetSubId()) + params.Xid = trans.GetXid() + params.Meid = subs.GetMeid() + params.Src = "" + params.PayloadLen = payloadLen + params.Payload = payload + params.Mbuf = nil + + return c.rmrSendRaw(desc, params) } -func (c *Control) Consume(msg *xapp.RMRParams) (err error) { +func (c *Control) rmrReplyToSender(desc string, subs *Subscription, trans *Transaction, mType int, payload []byte, payloadLen int) (err error) { + params := &RMRParams{&xapp.RMRParams{}} + params.Mtype = mType + params.SubId = int(subs.GetSubId()) + params.Xid = trans.GetXid() + params.Meid = subs.GetMeid() + params.Src = "" + params.PayloadLen = payloadLen + params.Payload = payload + params.Mbuf = nil + + return c.rmrSendRaw(desc, params) +} + +func (c *Control) Consume(params *xapp.RMRParams) (err error) { + xapp.Rmr.Free(params.Mbuf) + params.Mbuf = nil + + msg := &RMRParams{params} + c.msgCounter++ + switch msg.Mtype { case xapp.RICMessageTypes["RIC_SUB_REQ"]: go c.handleSubscriptionRequest(msg) @@ -158,10 +195,8 @@ func (c *Control) Consume(msg *xapp.RMRParams) (err error) { return nil } -func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) { - xapp.Logger.Info("SubReq received from Src: %s, Mtype: %v, SubId: %v, Xid: %s, Meid: %v", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid) - xapp.Rmr.Free(params.Mbuf) - params.Mbuf = nil +func (c *Control) handleSubscriptionRequest(params *RMRParams) { + xapp.Logger.Info("SubReq from xapp: %s", params.String()) srcAddr, srcPort, err := c.rtmgrClient.SplitSource(params.Src) if err != nil { @@ -175,10 +210,47 @@ func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) { return } - params.SubId = int(subs.Seq) - err = c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, subs.Seq) + // + // WIP RICPLT-2979 + // + /* + e2SubReq := packerif.NewPackerSubscriptionRequest() + packedData := &packer.PackedData{} + packedData.Buf = params.Payload + err = e2SubReq.UnPack(packedData) + if err != nil { + xapp.Logger.Error("SubReq: UnPack() failed: %s", err.Error()) + } + getErr, subReq := e2SubReq.Get() + if getErr != nil { + xapp.Logger.Error("SubReq: Get() failed: %s", err.Error()) + } + + + subReq.RequestId.Seq = uint32(subs.GetSubId()) + + err = e2SubReq.Set(subReq) + if err != nil { + xapp.Logger.Error("SubReq: Set() failed: %s", err.Error()) + return + } + err, packedData = e2SubReq.Pack(nil) + if err != nil { + xapp.Logger.Error("SubReq: Pack() failed: %s", err.Error()) + return + } + + params.PayloadLen = len(packedData.Buf) + params.Payload = packedData.Buf + */ + // + // + // + + params.SubId = int(subs.GetSubId()) + err = c.e2ap.SetSubscriptionRequestSequenceNumber(params.Payload, subs.GetSubId()) if err != nil { - xapp.Logger.Error("SubReq: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s, Payload %X", err, params.SubId, params.Xid, params.Payload) + xapp.Logger.Error("SubReq: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, %s", err, params.String()) c.registry.DelSubscription(subs.Seq) return } @@ -186,28 +258,30 @@ func (c *Control) handleSubscriptionRequest(params *xapp.RMRParams) { // Create transatcion record for every subscription request var forwardRespToXapp bool = true var responseReceived bool = false - _, err = c.tracker.TrackTransaction(subs, RmrEndpoint{*srcAddr, *srcPort}, params, responseReceived, forwardRespToXapp) + trans, err := c.tracker.TrackTransaction(RmrEndpoint{*srcAddr, *srcPort}, params, responseReceived, forwardRespToXapp) if err != nil { xapp.Logger.Error("SubReq: %s, Dropping this msg.", err.Error()) c.registry.DelSubscription(subs.Seq) return } - // Setting new subscription ID in the RMR header - xapp.Logger.Info("SubReq: Forwarding SubReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v", params.Mtype, params.SubId, params.Xid, params.Meid) - err = c.rmrSend(params) + err = subs.SetTransaction(trans) if err != nil { - xapp.Logger.Error("SubReq: Failed to send request to E2T %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) + xapp.Logger.Error("SubReq: %s, Dropping this msg.", err.Error()) + c.registry.DelSubscription(subs.Seq) + trans.Release() + return } + + c.rmrSend("SubReq to E2T", subs, trans, params.Payload, params.PayloadLen) + c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.Seq), subReqTime, FirstTry, c.handleSubscriptionRequestTimer) - xapp.Logger.Debug("SubReq: Debugging transaction table = %v", c.tracker.transactionXappTable) + xapp.Logger.Debug("SubReq: Debugging trans table = %v", c.tracker.transactionXappTable) return } -func (c *Control) handleSubscriptionResponse(params *xapp.RMRParams) { - xapp.Logger.Info("SubResp received from Src: %s, Mtype: %v, SubId: %v, Meid: %v", params.Src, params.Mtype, params.SubId, params.Meid) - xapp.Rmr.Free(params.Mbuf) - params.Mbuf = nil +func (c *Control) handleSubscriptionResponse(params *RMRParams) { + xapp.Logger.Info("SubResp from E2T: %s", params.String()) payloadSeqNum, err := c.e2ap.GetSubscriptionResponseSequenceNumber(params.Payload) if err != nil { @@ -222,37 +296,25 @@ func (c *Control) handleSubscriptionResponse(params *xapp.RMRParams) { return } - transaction := subs.GetTransaction() + trans := subs.GetTransaction() c.timerMap.StopTimer("RIC_SUB_REQ", int(payloadSeqNum)) - responseReceived := transaction.CheckResponseReceived() + responseReceived := trans.CheckResponseReceived() if responseReceived == true { // Subscription timer already received return } - xapp.Logger.Info("SubResp: SubId: %v, from address: %s.", payloadSeqNum, transaction.RmrEndpoint) subs.Confirmed() - transaction.Release() - - params.SubId = int(payloadSeqNum) - params.Xid = transaction.OrigParams.Xid - - xapp.Logger.Info("SubResp: Forwarding Subscription Response to xApp Mtype: %v, SubId: %v, Meid: %v", params.Mtype, params.SubId, params.Meid) - err = c.rmrReplyToSender(params) - if err != nil { - xapp.Logger.Error("SubResp: Failed to send response to xApp. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) - } - - xapp.Logger.Info("SubResp: SubId: %v, from address: %s. Deleting transaction record", payloadSeqNum, transaction.RmrEndpoint) + trans.Release() + c.rmrReplyToSender("SubResp to xapp", subs, trans, params.Mtype, params.Payload, params.PayloadLen) + xapp.Logger.Info("SubResp: SubId: %v, from address: %s. Deleting trans record", payloadSeqNum, trans.RmrEndpoint) return } -func (c *Control) handleSubscriptionFailure(params *xapp.RMRParams) { - xapp.Logger.Info("SubFail received from Src: %s, Mtype: %v, SubId: %v, Meid: %v", params.Src, params.Mtype, params.SubId, params.Meid) - xapp.Rmr.Free(params.Mbuf) - params.Mbuf = nil +func (c *Control) handleSubscriptionFailure(params *RMRParams) { + xapp.Logger.Info("SubFail from E2T: %s", params.String()) payloadSeqNum, err := c.e2ap.GetSubscriptionFailureSequenceNumber(params.Payload) if err != nil { @@ -267,15 +329,15 @@ func (c *Control) handleSubscriptionFailure(params *xapp.RMRParams) { return } - transaction := subs.GetTransaction() - if transaction == nil { - xapp.Logger.Error("SubFail: Unknown transaction. Dropping this msg. PayloadSeqNum: %v, SubId: %v", payloadSeqNum, params.SubId) + trans := subs.GetTransaction() + if trans == nil { + xapp.Logger.Error("SubFail: Unknown trans. Dropping this msg. PayloadSeqNum: %v, SubId: %v", payloadSeqNum, params.SubId) return } c.timerMap.StopTimer("RIC_SUB_REQ", int(payloadSeqNum)) - responseReceived := transaction.CheckResponseReceived() + responseReceived := trans.CheckResponseReceived() if err != nil { xapp.Logger.Info("SubFail: Dropping this msg. Err: %v SubId: %v", err, payloadSeqNum) return @@ -285,21 +347,14 @@ func (c *Control) handleSubscriptionFailure(params *xapp.RMRParams) { // Subscription timer already received return } - xapp.Logger.Info("SubFail: SubId: %v, from address: %s. Forwarding response to xApp", payloadSeqNum, transaction.RmrEndpoint) - - params.SubId = int(payloadSeqNum) - params.Xid = transaction.OrigParams.Xid + xapp.Logger.Info("SubFail: SubId: %v, from address: %s. Forwarding response to xApp", payloadSeqNum, trans.RmrEndpoint) - xapp.Logger.Info("SubFail: Forwarding SubFail to xApp: Mtype: %v, SubId: %v, Xid: %v, Meid: %v", params.Mtype, params.SubId, params.Xid, params.Meid) - err = c.rmrReplyToSender(params) - if err != nil { - xapp.Logger.Error("SubFail: Failed to send response to xApp. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) - } + c.rmrReplyToSender("SubFail to xapp", subs, trans, params.Mtype, params.Payload, params.PayloadLen) time.Sleep(3 * time.Second) - xapp.Logger.Info("SubFail: Deleting transaction record. SubId: %v, Xid: %s", params.SubId, params.Xid) - transaction.Release() + xapp.Logger.Info("SubFail: Deleting trans record. SubId: %v, Xid: %s", params.SubId, params.Xid) + trans.Release() if !c.registry.DelSubscription(payloadSeqNum) { xapp.Logger.Error("SubFail: Failed to release sequency number. SubId: %v, Xid: %s", params.SubId, params.Xid) } @@ -307,22 +362,21 @@ func (c *Control) handleSubscriptionFailure(params *xapp.RMRParams) { } func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCount uint64) { - subId := uint16(nbrId) - xapp.Logger.Info("handleSubTimer: SubReq timer expired. subId: %v, tryCount: %v", subId, tryCount) + xapp.Logger.Info("SubReq timeout: subId: %v, tryCount: %v", nbrId, tryCount) - subs := c.registry.GetSubscription(subId) + subs := c.registry.GetSubscription(uint16(nbrId)) if subs == nil { - xapp.Logger.Error("SubFail: Unknown payloadSeqNum. Dropping this msg. SubId: %v", subId) + xapp.Logger.Error("SubReq timeout: Unknown payloadSeqNum. Dropping this msg. SubId: %v", nbrId) return } - transaction := subs.GetTransaction() - if transaction == nil { - xapp.Logger.Error("SubFail: Unknown transaction. Dropping this msg. SubId: %v", subId) + trans := subs.GetTransaction() + if trans == nil { + xapp.Logger.Error("SubReq timeout: Unknown trans. Dropping this msg. SubId: %v", subs.GetSubId()) return } - responseReceived := transaction.CheckResponseReceived() + responseReceived := trans.CheckResponseReceived() if responseReceived == true { // Subscription Response or Failure already received @@ -330,84 +384,68 @@ func (c *Control) handleSubscriptionRequestTimer(strId string, nbrId int, tryCou } if tryCount < maxSubReqTryCount { - xapp.Logger.Info("handleSubTimer: Resending SubReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v", transaction.OrigParams.Mtype, transaction.OrigParams.SubId, transaction.OrigParams.Xid, transaction.OrigParams.Meid) + xapp.Logger.Info("SubReq timeout: Resending SubReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v", trans.OrigParams.Mtype, subs.GetSubId(), trans.GetXid(), subs.GetMeid()) - transaction.RetryTransaction() + trans.RetryTransaction() - err := c.rmrSend(transaction.OrigParams) - if err != nil { - xapp.Logger.Error("handleSubTimer: Failed to send request to E2T %v, SubId: %v, Xid: %s", err, transaction.OrigParams.SubId, transaction.OrigParams.Xid) - } + c.rmrSend("SubReq(SubReq timer) to E2T", subs, trans, trans.OrigParams.Payload, trans.OrigParams.PayloadLen) tryCount++ - c.timerMap.StartTimer("RIC_SUB_REQ", int(subId), subReqTime, tryCount, c.handleSubscriptionRequestTimer) + c.timerMap.StartTimer("RIC_SUB_REQ", int(subs.GetSubId()), subReqTime, tryCount, c.handleSubscriptionRequestTimer) return } var subDelReqPayload []byte - subDelReqPayload, err := c.e2ap.PackSubscriptionDeleteRequest(transaction.OrigParams.Payload, subId) + subDelReqPayload, err := c.e2ap.PackSubscriptionDeleteRequest(trans.OrigParams.Payload, subs.GetSubId()) if err != nil { - xapp.Logger.Error("handleSubTimer: Packing SubDelReq failed. Err: %v", err) + xapp.Logger.Error("SubReq timeout: Packing SubDelReq failed. Err: %v", err) return } // Cancel failed subscription - var params xapp.RMRParams + params := &RMRParams{&xapp.RMRParams{}} params.Mtype = 12020 // RIC SUBSCRIPTION DELETE - params.SubId = int(subId) - params.Xid = transaction.OrigParams.Xid - params.Meid = transaction.OrigParams.Meid - params.Src = transaction.OrigParams.Src + params.SubId = int(subs.GetSubId()) + params.Xid = trans.GetXid() + params.Meid = subs.GetMeid() + params.Src = trans.OrigParams.Src params.PayloadLen = len(subDelReqPayload) params.Payload = subDelReqPayload params.Mbuf = nil // Delete CREATE transaction - transaction.Release() + trans.Release() - // Create DELETE transaction - _, err = c.trackDeleteTransaction(subs, ¶ms, subId, false) + // Create DELETE transaction (internal and no messages toward xapp) + var forwardRespToXapp bool = false + var respReceived bool = false + deltrans, err := c.tracker.TrackTransaction(trans.RmrEndpoint, params, respReceived, forwardRespToXapp) if err != nil { - xapp.Logger.Error("handleSubTimer: %s, Dropping this msg.", err.Error()) + xapp.Logger.Error("SubReq timeout: %s, Dropping this msg.", err.Error()) return } - xapp.Logger.Info("handleSubTimer: Sending SubDelReq to E2T: Mtype: %v, SubId: %v, Meid: %v", params.Mtype, params.SubId, params.Meid) - c.rmrSend(¶ms) + err = subs.SetTransaction(deltrans) if err != nil { - xapp.Logger.Error("handleSubTimer: Failed to send request to E2T %v. SubId: %v, Xid: %s", err, params.SubId, params.Xid) + xapp.Logger.Error("SubReq timeout: %s, Dropping this msg.", err.Error()) + deltrans.Release() + return } - c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subId), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer) - return -} -func (act Action) String() string { - actions := [...]string{ - "CREATE", - "MERGE", - "NONE", - "DELETE", - } + c.rmrSend("SubDelReq(SubReq timer) to E2T", subs, deltrans, deltrans.OrigParams.Payload, deltrans.OrigParams.PayloadLen) - if act < CREATE || act > DELETE { - return "Unknown" - } - return actions[act] + c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer) + return } -func (act Action) valid() bool { - switch act { - case CREATE, MERGE, DELETE: - return true - default: - return false - } -} +func (c *Control) handleSubscriptionDeleteRequest(params *RMRParams) { + xapp.Logger.Info("SubDelReq from xapp: %s", params.String()) -func (c *Control) handleSubscriptionDeleteRequest(params *xapp.RMRParams) { - xapp.Logger.Info("SubDelReq received from Src: %s, Mtype: %v, SubId: %v, Xid: %s, Meid: %v", params.Src, params.Mtype, params.SubId, params.Xid, params.Meid) - xapp.Rmr.Free(params.Mbuf) - params.Mbuf = nil + srcAddr, srcPort, err := c.rtmgrClient.SplitSource(params.Src) + if err != nil { + xapp.Logger.Error("SubDelReq: Failed to update routing-manager. Dropping this msg. Err: %s, SubId: %v, Xid: %s", err, params.SubId, params.Xid) + return + } payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteRequestSequenceNumber(params.Payload) if err != nil { @@ -417,42 +455,36 @@ func (c *Control) handleSubscriptionDeleteRequest(params *xapp.RMRParams) { xapp.Logger.Info("SubDelReq: Received payloadSeqNum: %v", payloadSeqNum) subs := c.registry.GetSubscription(payloadSeqNum) - if subs != nil { - var forwardRespToXapp bool = true - _, err = c.trackDeleteTransaction(subs, params, payloadSeqNum, forwardRespToXapp) - if err != nil { - xapp.Logger.Error("SubDelReq: %s, Dropping this msg.", err.Error()) - return - } - subs.UnConfirmed() - } else { + if subs == nil { xapp.Logger.Error("SubDelReq: Not valid sequence number. Dropping this msg. SubId: %v, Xid: %s", params.SubId, params.Xid) return } - xapp.Logger.Info("SubDelReq: Forwarding Request to E2T. Mtype: %v, SubId: %v, Xid: %s, Meid: %v", params.Mtype, params.SubId, params.Xid, params.Meid) - c.rmrSend(params) + var forwardRespToXapp bool = true + var respReceived bool = false + trans, err := c.tracker.TrackTransaction(RmrEndpoint{*srcAddr, *srcPort}, params, respReceived, forwardRespToXapp) if err != nil { - xapp.Logger.Error("SubDelReq: Failed to send request to E2T. Err %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) + xapp.Logger.Error("SubDelReq: %s, Dropping this msg.", err.Error()) + return } - c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(payloadSeqNum), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer) - return -} -func (c *Control) trackDeleteTransaction(subs *Subscription, params *xapp.RMRParams, payloadSeqNum uint16, forwardRespToXapp bool) (transaction *Transaction, err error) { - srcAddr, srcPort, err := c.rtmgrClient.SplitSource(params.Src) + err = subs.SetTransaction(trans) if err != nil { - xapp.Logger.Error("Failed to split source address. Err: %s, SubId: %v, Xid: %s", err, payloadSeqNum, params.Xid) + xapp.Logger.Error("SubDelReq: %s, Dropping this msg.", err.Error()) + trans.Release() + return } - var respReceived bool = false - transaction, err = c.tracker.TrackTransaction(subs, RmrEndpoint{*srcAddr, *srcPort}, params, respReceived, forwardRespToXapp) + + subs.UnConfirmed() + + c.rmrSend("SubDelReq to E2T", subs, trans, trans.OrigParams.Payload, trans.OrigParams.PayloadLen) + + c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subDelReqTime, FirstTry, c.handleSubscriptionDeleteRequestTimer) return } -func (c *Control) handleSubscriptionDeleteResponse(params *xapp.RMRParams) (err error) { - xapp.Logger.Info("SubDelResp received from Src: %s, Mtype: %v, SubId: %v, Meid: %v", params.Src, params.Mtype, params.SubId, params.Meid) - xapp.Rmr.Free(params.Mbuf) - params.Mbuf = nil +func (c *Control) handleSubscriptionDeleteResponse(params *RMRParams) (err error) { + xapp.Logger.Info("SubDelResp from E2T:%s", params.String()) payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteResponseSequenceNumber(params.Payload) if err != nil { @@ -467,51 +499,41 @@ func (c *Control) handleSubscriptionDeleteResponse(params *xapp.RMRParams) (err return } - transaction := subs.GetTransaction() - if transaction == nil { - xapp.Logger.Error("SubDelResp: Unknown transaction. Dropping this msg. PayloadSeqNum: %v, SubId: %v", payloadSeqNum, params.SubId) + trans := subs.GetTransaction() + if trans == nil { + xapp.Logger.Error("SubDelResp: Unknown trans. Dropping this msg. PayloadSeqNum: %v, SubId: %v", subs.GetSubId(), params.SubId) return } - c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(payloadSeqNum)) + c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId())) - responseReceived := transaction.CheckResponseReceived() + responseReceived := trans.CheckResponseReceived() if responseReceived == true { // Subscription Delete timer already received return } - transaction.Release() - - xapp.Logger.Info("SubDelResp: SubId: %v, from address: %s. Forwarding response to xApp", payloadSeqNum, transaction.RmrEndpoint) - if transaction.ForwardRespToXapp == true { - params.SubId = int(payloadSeqNum) - params.Xid = transaction.OrigParams.Xid - xapp.Logger.Info("Forwarding SubDelResp to xApp: Mtype: %v, SubId: %v, Xid: %v, Meid: %v", params.Mtype, params.SubId, params.Xid, params.Meid) - err = c.rmrReplyToSender(params) - if err != nil { - xapp.Logger.Error("SubDelResp: Failed to send response to xApp. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) - } + trans.Release() + if trans.ForwardRespToXapp == true { + c.rmrReplyToSender("SubDelResp to xapp", subs, trans, params.Mtype, params.Payload, params.PayloadLen) time.Sleep(3 * time.Second) } - xapp.Logger.Info("SubDelResp: Deleting transaction record. SubId: %v, Xid: %s", params.SubId, params.Xid) - if !c.registry.DelSubscription(payloadSeqNum) { - xapp.Logger.Error("SubDelResp: Failed to release sequency number. SubId: %v, Xid: %s", params.SubId, params.Xid) + xapp.Logger.Info("SubDelResp: Deleting trans record. SubId: %v, Xid: %s", subs.GetSubId(), trans.GetXid()) + if !c.registry.DelSubscription(subs.GetSubId()) { + xapp.Logger.Error("SubDelResp: Failed to release sequency number. SubId: %v, Xid: %s", subs.GetSubId(), trans.GetXid()) return } return } -func (c *Control) handleSubscriptionDeleteFailure(params *xapp.RMRParams) { - xapp.Logger.Info("SubDelFail received from Src: %s, Mtype: %v, SubId: %v, Meid: %v", params.Src, params.Mtype, params.SubId, params.Meid) - xapp.Rmr.Free(params.Mbuf) - params.Mbuf = nil +func (c *Control) handleSubscriptionDeleteFailure(params *RMRParams) { + xapp.Logger.Info("SubDelFail from E2T:%s", params.String()) payloadSeqNum, err := c.e2ap.GetSubscriptionDeleteFailureSequenceNumber(params.Payload) if err != nil { - xapp.Logger.Error("SubDelFail: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, SubId: %v, Xid: %s, Payload %X", err, params.SubId, params.Xid, params.Payload) + xapp.Logger.Error("SubDelFail: Unable to get Sequence Number from Payload. Dropping this msg. Err: %v, %s", err, params.String()) return } xapp.Logger.Info("SubDelFail: Received payloadSeqNum: %v", payloadSeqNum) @@ -522,124 +544,94 @@ func (c *Control) handleSubscriptionDeleteFailure(params *xapp.RMRParams) { return } - transaction := subs.GetTransaction() - if transaction == nil { - xapp.Logger.Error("SubDelFail: Unknown transaction. Dropping this msg. PayloadSeqNum: %v, SubId: %v", payloadSeqNum, params.SubId) + trans := subs.GetTransaction() + if trans == nil { + xapp.Logger.Error("SubDelFail: Unknown trans. Dropping this msg. PayloadSeqNum: %v, SubId: %v", subs.GetSubId(), params.SubId) return } - c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(payloadSeqNum)) + c.timerMap.StopTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId())) - responseReceived := transaction.CheckResponseReceived() + responseReceived := trans.CheckResponseReceived() if responseReceived == true { // Subscription Delete timer already received return } - xapp.Logger.Info("SubDelFail: SubId: %v, from address: %s. Forwarding response to xApp", payloadSeqNum, transaction.RmrEndpoint) - - if transaction.ForwardRespToXapp == true { + if trans.ForwardRespToXapp == true { var subDelRespPayload []byte - subDelRespPayload, err = c.e2ap.PackSubscriptionDeleteResponse(transaction.OrigParams.Payload, payloadSeqNum) + subDelRespPayload, err = c.e2ap.PackSubscriptionDeleteResponse(trans.OrigParams.Payload, subs.GetSubId()) if err != nil { xapp.Logger.Error("SubDelFail:Packing SubDelResp failed. Err: %v", err) return } - params.Mtype = 12021 // RIC SUBSCRIPTION DELETE RESPONSE - params.SubId = int(payloadSeqNum) - params.Xid = transaction.OrigParams.Xid - params.Meid = transaction.OrigParams.Meid - params.Src = transaction.OrigParams.Src - params.PayloadLen = len(subDelRespPayload) - params.Payload = subDelRespPayload - params.Mbuf = nil - xapp.Logger.Info("SubDelFail: Forwarding SubDelResp to xApp: Mtype: %v, SubId: %v, Xid: %v, Meid: %v", params.Mtype, params.SubId, params.Xid, params.Meid) - err = c.rmrReplyToSender(params) - if err != nil { - xapp.Logger.Error("SubDelFail: Failed to send SubDelResp to xApp. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) - } - + // RIC SUBSCRIPTION DELETE RESPONSE + c.rmrReplyToSender("SubDelFail to xapp", subs, trans, 12021, subDelRespPayload, len(subDelRespPayload)) time.Sleep(3 * time.Second) } - xapp.Logger.Info("SubDelFail: Deleting transaction record. SubId: %v, Xid: %s", params.SubId, params.Xid) - transaction.Release() - if !c.registry.DelSubscription(payloadSeqNum) { - xapp.Logger.Error("SubDelFail: Failed to release sequency number. Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) + xapp.Logger.Info("SubDelFail: Deleting trans record. SubId: %v, Xid: %s", subs.GetSubId(), trans.GetXid()) + trans.Release() + if !c.registry.DelSubscription(subs.GetSubId()) { + xapp.Logger.Error("SubDelFail: Failed to release sequency number. Err: %v, SubId: %v, Xid: %s", err, subs.GetSubId(), trans.GetXid()) return } return } func (c *Control) handleSubscriptionDeleteRequestTimer(strId string, nbrId int, tryCount uint64) { - subId := uint16(nbrId) - xapp.Logger.Info("handleSubDelTimer: SubDelReq timer expired. subId: %v, tryCount: %v", subId, tryCount) + xapp.Logger.Info("SubDelReq timeout: subId: %v, tryCount: %v", nbrId, tryCount) - subs := c.registry.GetSubscription(subId) + subs := c.registry.GetSubscription(uint16(nbrId)) if subs == nil { - xapp.Logger.Error("handleSubDelTimer: Unknown payloadSeqNum. Dropping this msg. SubId: %v", subId) + xapp.Logger.Error("SubDelReq timeout: Unknown payloadSeqNum. Dropping this msg. SubId: %v", nbrId) return } - transaction := subs.GetTransaction() - if transaction == nil { - xapp.Logger.Error("handleSubDelTimer: Unknown transaction. Dropping this msg. SubId: %v", subId) + trans := subs.GetTransaction() + if trans == nil { + xapp.Logger.Error("SubDelReq timeout: Unknown trans. Dropping this msg. SubId: %v", subs.GetSubId()) return } - responseReceived := transaction.CheckResponseReceived() + responseReceived := trans.CheckResponseReceived() if responseReceived == true { // Subscription Delete Response or Failure already received return } if tryCount < maxSubDelReqTryCount { - xapp.Logger.Info("handleSubDelTimer: Resending SubDelReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v", transaction.OrigParams.Mtype, transaction.OrigParams.SubId, transaction.OrigParams.Xid, transaction.OrigParams.Meid) + xapp.Logger.Info("SubDelReq timeout: Resending SubDelReq to E2T: Mtype: %v, SubId: %v, Xid %s, Meid %v", trans.OrigParams.Mtype, subs.GetSubId(), trans.GetXid(), subs.GetMeid()) // Set possible to handle new response for the subId - transaction.RetryTransaction() + trans.RetryTransaction() - err := c.rmrSend(transaction.OrigParams) - if err != nil { - xapp.Logger.Error("handleSubDelTimer: Failed to send request to E2T %v, SubId: %v, Xid: %s", err, transaction.OrigParams.SubId, transaction.OrigParams.Xid) - } + c.rmrSend("SubDelReq(SubDelReq timer) to E2T", subs, trans, trans.OrigParams.Payload, trans.OrigParams.PayloadLen) tryCount++ - c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subId), subReqTime, tryCount, c.handleSubscriptionDeleteRequestTimer) + c.timerMap.StartTimer("RIC_SUB_DEL_REQ", int(subs.GetSubId()), subReqTime, tryCount, c.handleSubscriptionDeleteRequestTimer) return } - var params xapp.RMRParams - if transaction.ForwardRespToXapp == true { + if trans.ForwardRespToXapp == true { var subDelRespPayload []byte - subDelRespPayload, err := c.e2ap.PackSubscriptionDeleteResponse(transaction.OrigParams.Payload, subId) + subDelRespPayload, err := c.e2ap.PackSubscriptionDeleteResponse(trans.OrigParams.Payload, subs.GetSubId()) if err != nil { - xapp.Logger.Error("handleSubDelTimer: Unable to pack payload. Dropping this this msg. Err: %v, SubId: %v, Xid: %s, Payload %x", err, subId, transaction.OrigParams.Xid, transaction.OrigParams.Payload) + xapp.Logger.Error("SubDelReq timeout: Unable to pack payload. Dropping this this msg. Err: %v, SubId: %v, Xid: %s, Payload %x", err, subs.GetSubId(), trans.GetXid(), trans.OrigParams.Payload) return } - params.Mtype = 12021 // RIC SUBSCRIPTION DELETE RESPONSE - params.SubId = int(subId) - params.Meid = transaction.OrigParams.Meid - params.Xid = transaction.OrigParams.Xid - params.Src = transaction.OrigParams.Src - params.PayloadLen = len(subDelRespPayload) - params.Payload = subDelRespPayload - params.Mbuf = nil - - xapp.Logger.Info("handleSubDelTimer: Sending SubDelResp to xApp: Mtype: %v, SubId: %v, Xid: %s, Meid: %v", params.Mtype, params.SubId, params.Xid, params.Meid) - err = c.rmrReplyToSender(¶ms) - if err != nil { - xapp.Logger.Error("handleSubDelTimer: Failed to send response to xApp: Err: %v, SubId: %v, Xid: %s", err, params.SubId, params.Xid) - } + // RIC SUBSCRIPTION DELETE RESPONSE + c.rmrReplyToSender("SubDelResp(SubDelReq timer) to xapp", subs, trans, 12021, subDelRespPayload, len(subDelRespPayload)) time.Sleep(3 * time.Second) + } - xapp.Logger.Info("handleSubDelTimer: Deleting transaction record. SubId: %v, Xid: %s", subId, params.Xid) - transaction.Release() - if !c.registry.DelSubscription(subId) { - xapp.Logger.Error("handleSubDelTimer: Failed to release sequency number. SubId: %v, Xid: %s", subId, params.Xid) + xapp.Logger.Info("SubDelReq timeout: Deleting trans record. SubId: %v, Xid: %s", subs.GetSubId(), trans.GetXid()) + trans.Release() + if !c.registry.DelSubscription(subs.GetSubId()) { + xapp.Logger.Error("SubDelReq timeout: Failed to release sequency number. SubId: %v, Xid: %s", subs.GetSubId(), trans.GetXid()) } return } diff --git a/pkg/control/main_test.go b/pkg/control/main_test.go index 155f92c..dffff77 100644 --- a/pkg/control/main_test.go +++ b/pkg/control/main_test.go @@ -21,7 +21,6 @@ package control import ( "encoding/json" - "errors" "fmt" "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_models" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" @@ -71,22 +70,23 @@ type testingRmrControl struct { rmrClientTest *xapp.RMRClient } -func (tc *testingRmrControl) RmrSend(params *xapp.RMRParams) (err error) { +func (tc *testingRmrControl) RmrSend(params *RMRParams) (err error) { // //NOTE: Do this way until xapp-frame sending is improved // + xapp.Logger.Info("(%s) RmrSend %s", tc.desc, params.String()) status := false i := 1 for ; i <= 10 && status == false; i++ { - status = tc.rmrClientTest.SendMsg(params) + status = tc.rmrClientTest.SendMsg(params.RMRParams) if status == false { - xapp.Logger.Info("rmr.Send() failed. Retry count %v, Mtype: %v, SubId: %v, Xid %s", i, params.Mtype, params.SubId, params.Xid) + xapp.Logger.Info("(%s) RmrSend failed. Retry count %v, %s", tc.desc, i, params.String()) time.Sleep(500 * time.Millisecond) } } if status == false { - err = errors.New("rmr.Send() failed") - tc.rmrClientTest.Free(params.Mbuf) + err = fmt.Errorf("(%s) RmrSend failed. Retry count %v, %s", tc.desc, i, params.String()) + xapp.Rmr.Free(params.Mbuf) } return } @@ -105,12 +105,12 @@ func initTestingRmrControl(desc string, rtfile string, port string, stat string, // //----------------------------------------------------------------------------- type testingMessageChannel struct { - rmrConChan chan *xapp.RMRParams + rmrConChan chan *RMRParams } func initTestingMessageChannel() testingMessageChannel { mc := testingMessageChannel{} - mc.rmrConChan = make(chan *xapp.RMRParams) + mc.rmrConChan = make(chan *RMRParams) return mc } @@ -148,13 +148,16 @@ func (tc *testingXappControl) newXappTransaction(xid *string, ranname string) *x return trans } -func (tc *testingXappControl) Consume(msg *xapp.RMRParams) (err error) { +func (tc *testingXappControl) Consume(params *xapp.RMRParams) (err error) { + xapp.Rmr.Free(params.Mbuf) + params.Mbuf = nil + msg := &RMRParams{params} if strings.Contains(msg.Xid, tc.desc) { - xapp.Logger.Info("(%s) Consume mtype=%s subid=%d xid=%s", tc.desc, xapp.RicMessageTypeToName[msg.Mtype], msg.SubId, msg.Xid) + xapp.Logger.Info("(%s) Consume %s", tc.desc, msg.String()) tc.rmrConChan <- msg } else { - xapp.Logger.Info("(%s) Ignore mtype=%s subid=%d xid=%s, Expected xid to contain %s", tc.desc, xapp.RicMessageTypeToName[msg.Mtype], msg.SubId, msg.Xid, tc.desc) + xapp.Logger.Info("(%s) Ignore %s", tc.desc, msg.String()) } return } @@ -175,8 +178,11 @@ type testingE2termControl struct { testingMessageChannel } -func (tc *testingE2termControl) Consume(msg *xapp.RMRParams) (err error) { - xapp.Logger.Info("(%s) Consume mtype=%s subid=%d xid=%s", tc.desc, xapp.RicMessageTypeToName[msg.Mtype], msg.SubId, msg.Xid) +func (tc *testingE2termControl) Consume(params *xapp.RMRParams) (err error) { + xapp.Rmr.Free(params.Mbuf) + params.Mbuf = nil + msg := &RMRParams{params} + xapp.Logger.Info("(%s) Consume %s", tc.desc, msg.String()) tc.rmrConChan <- msg return } @@ -245,6 +251,29 @@ var mainCtrl *testingMainControl func TestMain(m *testing.M) { xapp.Logger.Info("TestMain start") + //--------------------------------- + // + //--------------------------------- + http_handler := func(w http.ResponseWriter, r *http.Request) { + var req rtmgr_models.XappSubscriptionData + err := json.NewDecoder(r.Body).Decode(&req) + if err != nil { + xapp.Logger.Error("%s", err.Error()) + } + xapp.Logger.Info("(http handler) handling Address=%s Port=%d SubscriptionID=%d", *req.Address, *req.Port, *req.SubscriptionID) + + w.WriteHeader(200) + } + + go func() { + http.HandleFunc("/", http_handler) + http.ListenAndServe("localhost:8989", nil) + }() + + //--------------------------------- + // + //--------------------------------- + // //Cfg creation won't work like this as xapp-frame reads it during init. // @@ -282,28 +311,60 @@ func TestMain(m *testing.M) { xapp.Logger.Info("Using cfg file %s", os.Getenv("CFG_FILE")) //--------------------------------- + // Static routetable for rmr + // // NOTE: Routing table is configured so, that responses // are duplicated to xapp1 and xapp2 instances. // If XID is not matching xapp stub will just // drop message. (Messages 12011, 12012, 12021, 12022) + // + // 14560 submgr + // 15560 e2term stub + // 13560 xapp1 stub + // 13660 xapp2 stub + // //--------------------------------- - xapp.Logger.Info("### submgr main run ###") - subsrt := `newrt|start + allrt := `newrt|start mse|12010|-1|localhost:14560 mse|12010,localhost:14560|-1|localhost:15560 mse|12011,localhost:15560|-1|localhost:14560 -mse|12011|-1|localhost:13560;localhost:13660 mse|12012,localhost:15560|-1|localhost:14560 -mse|12012|-1|localhost:13560;localhost:13660 +mse|12011,localhost:14560|-1|localhost:13660;localhost:13560 +mse|12012,localhost:14560|-1|localhost:13660;localhost:13560 mse|12020|-1|localhost:14560 mse|12020,localhost:14560|-1|localhost:15560 mse|12021,localhost:15560|-1|localhost:14560 -mse|12021|-1|localhost:13560;localhost:13660 mse|12022,localhost:15560|-1|localhost:14560 -mse|12022|-1|localhost:13560;localhost:13660 +mse|12021,localhost:14560|-1|localhost:13660;localhost:13560 +mse|12022,localhost:14560|-1|localhost:13660;localhost:13560 newrt|end ` + + //--------------------------------- + // + //--------------------------------- + xapp.Logger.Info("### submgr main run ###") + + subsrt := allrt + /* + subsrt := `newrt|start + mse|12010|-1|localhost:14560 + mse|12010,localhost:14560|-1|localhost:15560 + mse|12011,localhost:15560|-1|localhost:14560 + mse|12011|-1|localhost:13560;localhost:13660 + mse|12012,localhost:15560|-1|localhost:14560 + mse|12012|-1|localhost:13560;localhost:13660 + mse|12020|-1|localhost:14560 + mse|12020,localhost:14560|-1|localhost:15560 + mse|12021,localhost:15560|-1|localhost:14560 + mse|12021|-1|localhost:13560;localhost:13660 + mse|12022,localhost:15560|-1|localhost:14560 + mse|12022|-1|localhost:13560;localhost:13660 + newrt|end + ` + */ + subrtfilename, _ := testCreateTmpFile(subsrt) defer os.Remove(subrtfilename) mainCtrl = createNewMainControl("main", subrtfilename, "14560") @@ -313,15 +374,18 @@ newrt|end //--------------------------------- xapp.Logger.Info("### xapp1 rmr run ###") - xapprt1 := `newrt|start -mse|12010|-1|localhost:14560 -mse|12011|-1|localhost:13560 -mse|12012|-1|localhost:13560 -mse|12020|-1|localhost:14560 -mse|12021|-1|localhost:13560 -mse|12022|-1|localhost:13560 -newrt|end -` + xapprt1 := allrt + /* + xapprt1 := `newrt|start + mse|12010|-1|localhost:14560 + mse|12011|-1|localhost:13560 + mse|12012|-1|localhost:13560 + mse|12020|-1|localhost:14560 + mse|12021|-1|localhost:13560 + mse|12022|-1|localhost:13560 + newrt|end + ` + */ xapprtfilename1, _ := testCreateTmpFile(xapprt1) defer os.Remove(xapprtfilename1) @@ -333,15 +397,18 @@ newrt|end xapp.Logger.Info("### xapp2 rmr run ###") - xapprt2 := `newrt|start -mse|12010|-1|localhost:14560 -mse|12011|-1|localhost:13660 -mse|12012|-1|localhost:13660 -mse|12020|-1|localhost:14560 -mse|12021|-1|localhost:13660 -mse|12022|-1|localhost:13660 -newrt|end -` + xapprt2 := allrt + /* + xapprt2 := `newrt|start + mse|12010|-1|localhost:14560 + mse|12011|-1|localhost:13660 + mse|12012|-1|localhost:13660 + mse|12020|-1|localhost:14560 + mse|12021|-1|localhost:13660 + mse|12022|-1|localhost:13660 + newrt|end + ` + */ xapprtfilename2, _ := testCreateTmpFile(xapprt2) defer os.Remove(xapprtfilename2) @@ -352,38 +419,28 @@ newrt|end //--------------------------------- xapp.Logger.Info("### e2term rmr run ###") - e2termrt := `newrt|start -mse|12010|-1|localhost:15560 -mse|12011|-1|localhost:14560 -mse|12012|-1|localhost:14560 -mse|12020|-1|localhost:15560 -mse|12021|-1|localhost:14560 -mse|12022|-1|localhost:14560 -newrt|end -` + e2termrt := allrt + /* + e2termrt := `newrt|start + mse|12010|-1|localhost:15560 + mse|12011|-1|localhost:14560 + mse|12012|-1|localhost:14560 + mse|12020|-1|localhost:15560 + mse|12021|-1|localhost:14560 + mse|12022|-1|localhost:14560 + newrt|end + ` + */ e2termrtfilename, _ := testCreateTmpFile(e2termrt) defer os.Remove(e2termrtfilename) e2termConn = createNewE2termControl("e2termstub", e2termrtfilename, "15560", "RMRE2TERMSTUB") //--------------------------------- - // + // Stupid sleep to try improve robustness + // due: http handler and rmr routes init delays //--------------------------------- - http_handler := func(w http.ResponseWriter, r *http.Request) { - var req rtmgr_models.XappSubscriptionData - err := json.NewDecoder(r.Body).Decode(&req) - if err != nil { - xapp.Logger.Error("%s", err.Error()) - } - xapp.Logger.Info("(http handler) handling Address=%s Port=%d SubscriptionID=%d", *req.Address, *req.Port, *req.SubscriptionID) - - w.WriteHeader(200) - } - - go func() { - http.HandleFunc("/", http_handler) - http.ListenAndServe("localhost:8989", nil) - }() + <-time.After(2 * time.Second) //--------------------------------- // diff --git a/pkg/control/messaging_test.go b/pkg/control/messaging_test.go index 2becce3..1b50c8b 100644 --- a/pkg/control/messaging_test.go +++ b/pkg/control/messaging_test.go @@ -37,7 +37,7 @@ var e2asnpacker e2ap.E2APPackerIf = e2ap_wrapper.NewAsn1E2Packer() // //----------------------------------------------------------------------------- func (xappConn *testingXappControl) handle_xapp_subs_req(t *testing.T, oldTrans *xappTransaction) *xappTransaction { - xapp.Logger.Info("handle_xapp_subs_req") + xapp.Logger.Info("(%s) handle_xapp_subs_req", xappConn.desc) e2SubsReq := e2asnpacker.NewPackerSubscriptionRequest() //--------------------------------- @@ -86,7 +86,7 @@ func (xappConn *testingXappControl) handle_xapp_subs_req(t *testing.T, oldTrans trans = xappConn.newXappTransaction(nil, "RAN_NAME_1") } - params := &xapp.RMRParams{} + params := &RMRParams{&xapp.RMRParams{}} params.Mtype = xapp.RIC_SUB_REQ params.SubId = -1 params.Payload = packedMsg.Buf @@ -106,7 +106,7 @@ func (xappConn *testingXappControl) handle_xapp_subs_req(t *testing.T, oldTrans // //----------------------------------------------------------------------------- func (xappConn *testingXappControl) handle_xapp_subs_resp(t *testing.T, trans *xappTransaction) int { - xapp.Logger.Info("handle_xapp_subs_resp") + xapp.Logger.Info("(%s) handle_xapp_subs_resp", xappConn.desc) e2SubsResp := e2asnpacker.NewPackerSubscriptionResponse() var e2SubsId int @@ -149,7 +149,7 @@ func (xappConn *testingXappControl) handle_xapp_subs_resp(t *testing.T, trans *x // //----------------------------------------------------------------------------- func (xappConn *testingXappControl) handle_xapp_subs_fail(t *testing.T, trans *xappTransaction) int { - xapp.Logger.Info("handle_xapp_subs_fail") + xapp.Logger.Info("(%s) handle_xapp_subs_fail", xappConn.desc) e2SubsFail := e2asnpacker.NewPackerSubscriptionFailure() var e2SubsId int @@ -192,7 +192,7 @@ func (xappConn *testingXappControl) handle_xapp_subs_fail(t *testing.T, trans *x // //----------------------------------------------------------------------------- func (xappConn *testingXappControl) handle_xapp_subs_del_req(t *testing.T, oldTrans *xappTransaction, e2SubsId int) *xappTransaction { - xapp.Logger.Info("handle_xapp_subs_del_req") + xapp.Logger.Info("(%s) handle_xapp_subs_del_req", xappConn.desc) e2SubsDelReq := e2asnpacker.NewPackerSubscriptionDeleteRequest() //--------------------------------- @@ -218,7 +218,7 @@ func (xappConn *testingXappControl) handle_xapp_subs_del_req(t *testing.T, oldTr trans = xappConn.newXappTransaction(nil, "RAN_NAME_1") } - params := &xapp.RMRParams{} + params := &RMRParams{&xapp.RMRParams{}} params.Mtype = xapp.RIC_SUB_DEL_REQ params.SubId = e2SubsId params.Payload = packedMsg.Buf @@ -238,7 +238,7 @@ func (xappConn *testingXappControl) handle_xapp_subs_del_req(t *testing.T, oldTr // //----------------------------------------------------------------------------- func (xappConn *testingXappControl) handle_xapp_subs_del_resp(t *testing.T, trans *xappTransaction) { - xapp.Logger.Info("handle_xapp_subs_del_resp") + xapp.Logger.Info("(%s) handle_xapp_subs_del_resp", xappConn.desc) e2SubsDelResp := e2asnpacker.NewPackerSubscriptionDeleteResponse() //--------------------------------- @@ -274,8 +274,8 @@ func (xappConn *testingXappControl) handle_xapp_subs_del_resp(t *testing.T, tran //----------------------------------------------------------------------------- // //----------------------------------------------------------------------------- -func (e2termConn *testingE2termControl) handle_e2term_subs_req(t *testing.T) (*e2ap.E2APSubscriptionRequest, *xapp.RMRParams) { - xapp.Logger.Info("handle_e2term_subs_req") +func (e2termConn *testingE2termControl) handle_e2term_subs_req(t *testing.T) (*e2ap.E2APSubscriptionRequest, *RMRParams) { + xapp.Logger.Info("(%s) handle_e2term_subs_req", e2termConn.desc) e2SubsReq := e2asnpacker.NewPackerSubscriptionRequest() //--------------------------------- @@ -305,8 +305,8 @@ func (e2termConn *testingE2termControl) handle_e2term_subs_req(t *testing.T) (*e return nil, nil } -func (e2termConn *testingE2termControl) handle_e2term_subs_resp(t *testing.T, req *e2ap.E2APSubscriptionRequest, msg *xapp.RMRParams) { - xapp.Logger.Info("handle_e2term_subs_resp") +func (e2termConn *testingE2termControl) handle_e2term_subs_resp(t *testing.T, req *e2ap.E2APSubscriptionRequest, msg *RMRParams) { + xapp.Logger.Info("(%s) handle_e2term_subs_resp", e2termConn.desc) e2SubsResp := e2asnpacker.NewPackerSubscriptionResponse() //--------------------------------- @@ -340,7 +340,7 @@ func (e2termConn *testingE2termControl) handle_e2term_subs_resp(t *testing.T, re testError(t, "(%s) pack NOK %s", e2termConn.desc, packerr.Error()) } - params := &xapp.RMRParams{} + params := &RMRParams{&xapp.RMRParams{}} params.Mtype = xapp.RIC_SUB_RESP params.SubId = msg.SubId params.Payload = packedMsg.Buf @@ -357,8 +357,8 @@ func (e2termConn *testingE2termControl) handle_e2term_subs_resp(t *testing.T, re //----------------------------------------------------------------------------- // //----------------------------------------------------------------------------- -func (e2termConn *testingE2termControl) handle_e2term_subs_fail(t *testing.T, req *e2ap.E2APSubscriptionRequest, msg *xapp.RMRParams) { - xapp.Logger.Info("handle_e2term_subs_fail") +func (e2termConn *testingE2termControl) handle_e2term_subs_fail(t *testing.T, req *e2ap.E2APSubscriptionRequest, msg *RMRParams) { + xapp.Logger.Info("(%s) handle_e2term_subs_fail", e2termConn.desc) e2SubsFail := e2asnpacker.NewPackerSubscriptionFailure() //--------------------------------- @@ -385,7 +385,7 @@ func (e2termConn *testingE2termControl) handle_e2term_subs_fail(t *testing.T, re testError(t, "(%s) pack NOK %s", e2termConn.desc, packerr.Error()) } - params := &xapp.RMRParams{} + params := &RMRParams{&xapp.RMRParams{}} params.Mtype = xapp.RIC_SUB_FAILURE params.SubId = msg.SubId params.Payload = packedMsg.Buf @@ -402,8 +402,8 @@ func (e2termConn *testingE2termControl) handle_e2term_subs_fail(t *testing.T, re //----------------------------------------------------------------------------- // //----------------------------------------------------------------------------- -func (e2termConn *testingE2termControl) handle_e2term_subs_del_req(t *testing.T) (*e2ap.E2APSubscriptionDeleteRequest, *xapp.RMRParams) { - xapp.Logger.Info("handle_e2term_subs_del_req") +func (e2termConn *testingE2termControl) handle_e2term_subs_del_req(t *testing.T) (*e2ap.E2APSubscriptionDeleteRequest, *RMRParams) { + xapp.Logger.Info("(%s) handle_e2term_subs_del_req", e2termConn.desc) e2SubsDelReq := e2asnpacker.NewPackerSubscriptionDeleteRequest() //--------------------------------- @@ -441,8 +441,8 @@ func handle_e2term_recv_empty() bool { return true } -func (e2termConn *testingE2termControl) handle_e2term_subs_del_resp(t *testing.T, req *e2ap.E2APSubscriptionDeleteRequest, msg *xapp.RMRParams) { - xapp.Logger.Info("handle_e2term_subs_del_resp") +func (e2termConn *testingE2termControl) handle_e2term_subs_del_resp(t *testing.T, req *e2ap.E2APSubscriptionDeleteRequest, msg *RMRParams) { + xapp.Logger.Info("(%s) handle_e2term_subs_del_resp", e2termConn.desc) e2SubsDelResp := e2asnpacker.NewPackerSubscriptionDeleteResponse() //--------------------------------- @@ -462,7 +462,7 @@ func (e2termConn *testingE2termControl) handle_e2term_subs_del_resp(t *testing.T testError(t, "(%s) pack NOK %s", e2termConn.desc, packerr.Error()) } - params := &xapp.RMRParams{} + params := &RMRParams{&xapp.RMRParams{}} params.Mtype = xapp.RIC_SUB_DEL_RESP params.SubId = msg.SubId params.Payload = packedMsg.Buf @@ -479,8 +479,8 @@ func (e2termConn *testingE2termControl) handle_e2term_subs_del_resp(t *testing.T //----------------------------------------------------------------------------- // //----------------------------------------------------------------------------- -func (e2termConn *testingE2termControl) handle_e2term_subs_del_fail(t *testing.T, req *e2ap.E2APSubscriptionDeleteRequest, msg *xapp.RMRParams) { - xapp.Logger.Info("handle_e2term_del_subs_fail") +func (e2termConn *testingE2termControl) handle_e2term_subs_del_fail(t *testing.T, req *e2ap.E2APSubscriptionDeleteRequest, msg *RMRParams) { + xapp.Logger.Info("(%s) handle_e2term_del_subs_fail", e2termConn.desc) e2SubsDelFail := e2asnpacker.NewPackerSubscriptionDeleteFailure() //--------------------------------- @@ -502,7 +502,7 @@ func (e2termConn *testingE2termControl) handle_e2term_subs_del_fail(t *testing.T testError(t, "(%s) pack NOK %s", e2termConn.desc, packerr.Error()) } - params := &xapp.RMRParams{} + params := &RMRParams{&xapp.RMRParams{}} params.Mtype = xapp.RIC_SUB_DEL_FAILURE params.SubId = msg.SubId params.Payload = packedMsg.Buf diff --git a/pkg/control/subscription.go b/pkg/control/subscription.go index 9bbe3d4..6ed3d32 100644 --- a/pkg/control/subscription.go +++ b/pkg/control/subscription.go @@ -45,6 +45,21 @@ func (s *Subscription) String() string { return strconv.FormatUint(uint64(s.Seq), 10) + "/" + s.RmrEndpoint.String() + "/" + s.Meid.RanName } +func (s *Subscription) GetSubId() uint16 { + s.mutex.Lock() + defer s.mutex.Unlock() + return s.Seq +} + +func (s *Subscription) GetMeid() *xapp.RMRMeid { + s.mutex.Lock() + defer s.mutex.Unlock() + if s.Meid != nil { + return s.Meid + } + return nil +} + func (s *Subscription) Confirmed() { s.mutex.Lock() defer s.mutex.Unlock() diff --git a/pkg/control/tracker.go b/pkg/control/tracker.go index 087b781..869e8ca 100644 --- a/pkg/control/tracker.go +++ b/pkg/control/tracker.go @@ -21,7 +21,6 @@ package control import ( "fmt" - "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" "sync" ) @@ -37,12 +36,13 @@ func (t *Tracker) Init() { t.transactionXappTable = make(map[TransactionXappKey]*Transaction) } -func (t *Tracker) TrackTransaction(subs *Subscription, endpoint RmrEndpoint, params *xapp.RMRParams, respReceived bool, forwardRespToXapp bool) (*Transaction, error) { +func (t *Tracker) TrackTransaction(endpoint RmrEndpoint, params *RMRParams, respReceived bool, forwardRespToXapp bool) (*Transaction, error) { trans := &Transaction{ tracker: nil, Subs: nil, RmrEndpoint: endpoint, + Mtype: params.Mtype, Xid: params.Xid, OrigParams: params, RespReceived: respReceived, @@ -58,10 +58,6 @@ func (t *Tracker) TrackTransaction(subs *Subscription, endpoint RmrEndpoint, par return nil, err } - err := subs.SetTransaction(trans) - if err != nil { - return nil, err - } trans.tracker = t t.transactionXappTable[xappkey] = trans return trans, nil diff --git a/pkg/control/transaction.go b/pkg/control/transaction.go index f686b44..2f4acab 100644 --- a/pkg/control/transaction.go +++ b/pkg/control/transaction.go @@ -20,7 +20,6 @@ package control import ( - "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" "strconv" "sync" ) @@ -45,8 +44,9 @@ type Transaction struct { tracker *Tracker // tracker instance Subs *Subscription RmrEndpoint RmrEndpoint - Xid string // xapp xid in req - OrigParams *xapp.RMRParams // request orginal params + Mtype int + Xid string // xapp xid in req + OrigParams *RMRParams // request orginal params RespReceived bool ForwardRespToXapp bool } @@ -61,6 +61,24 @@ func (t *Transaction) String() string { return subId + "/" + t.RmrEndpoint.String() + "/" + t.Xid } +func (t *Transaction) GetXid() string { + t.mutex.Lock() + defer t.mutex.Unlock() + return t.Xid +} + +func (t *Transaction) GetMtype() int { + t.mutex.Lock() + defer t.mutex.Unlock() + return t.Mtype +} + +func (t *Transaction) GetSrc() string { + t.mutex.Lock() + defer t.mutex.Unlock() + return t.RmrEndpoint.String() +} + func (t *Transaction) CheckResponseReceived() bool { t.mutex.Lock() defer t.mutex.Unlock() diff --git a/pkg/control/types.go b/pkg/control/types.go index 64fd15f..22c44b3 100644 --- a/pkg/control/types.go +++ b/pkg/control/types.go @@ -20,15 +20,24 @@ package control import ( + "bytes" + "fmt" + "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" "strconv" ) +//----------------------------------------------------------------------------- +// +//----------------------------------------------------------------------------- type RmrDatagram struct { MessageType int SubscriptionId uint16 Payload []byte } +//----------------------------------------------------------------------------- +// +//----------------------------------------------------------------------------- type SubRouteInfo struct { Command Action Address string @@ -36,6 +45,9 @@ type SubRouteInfo struct { SubID uint16 } +//----------------------------------------------------------------------------- +// +//----------------------------------------------------------------------------- type RmrEndpoint struct { Addr string // xapp addr Port uint16 // xapp port @@ -45,4 +57,34 @@ func (endpoint RmrEndpoint) String() string { return endpoint.Addr + ":" + strconv.FormatUint(uint64(endpoint.Port), 10) } +//----------------------------------------------------------------------------- +// +//----------------------------------------------------------------------------- type Action int + +func (act Action) String() string { + actions := [...]string{ + "CREATE", + "MERGE", + "NONE", + "DELETE", + } + + if act < CREATE || act > DELETE { + return "UNKNOWN" + } + return actions[act] +} + +//----------------------------------------------------------------------------- +// To add own method for rmrparams +//----------------------------------------------------------------------------- +type RMRParams struct { + *xapp.RMRParams +} + +func (params *RMRParams) String() string { + var b bytes.Buffer + fmt.Fprintf(&b, "Src: %s, Mtype: %s(%d), SubId: %v, Xid: %s, Meid: %v", params.Src, xapp.RicMessageTypeToName[params.Mtype], params.Mtype, params.SubId, params.Xid, params.Meid) + return b.String() +} diff --git a/pkg/control/types_test.go b/pkg/control/types_test.go new file mode 100644 index 0000000..f29b3db --- /dev/null +++ b/pkg/control/types_test.go @@ -0,0 +1,42 @@ +/* +================================================================================== + Copyright (c) 2019 AT&T Intellectual Property. + Copyright (c) 2019 Nokia + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +================================================================================== +*/ + +package control + +import ( + "testing" +) + +func TestAction(t *testing.T) { + + testActionString := func(t *testing.T, val int, str string) { + if Action(val).String() != str { + testError(t, "String for value %d expected %s got %s", val, str, Action(val).String()) + } + } + + testActionString(t, 0, "CREATE") + testActionString(t, 1, "MERGE") + testActionString(t, 2, "NONE") + testActionString(t, 3, "DELETE") + testActionString(t, 5, "UNKNOWN") + testActionString(t, 6, "UNKNOWN") + testActionString(t, 7, "UNKNOWN") + testActionString(t, 10, "UNKNOWN") +}