# By default this file is in the docker build directory,
# but the location can configured in the JJB template.
---
-tag: "4.0.0.3"
+tag: "4.0.0.4"
"gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client"
"gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/xapptweaks"
+ "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
httptransport "github.com/go-openapi/runtime/client"
"github.com/go-openapi/strfmt"
e2ap *E2ap
registry *Registry
tracker *Tracker
- timerMap *TimerMap
+ //subscriber *xapp.Subscriber
}
type RMRMeid struct {
tracker := new(Tracker)
tracker.Init()
- timerMap := new(TimerMap)
- timerMap.Init()
+ //subscriber := xapp.NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout"))
c := &Control{e2ap: new(E2ap),
registry: registry,
tracker: tracker,
- timerMap: timerMap,
+ //subscriber: subscriber,
}
c.XappWrapper.Init("")
+ go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler)
+ //go c.subscriber.Listen(c.SubscriptionHandler, c.QueryHandler)
return c
}
xapp.Run(c)
}
+//-------------------------------------------------------------------
+//
+//-------------------------------------------------------------------
+func (c *Control) SubscriptionHandler(stype models.SubscriptionType, params interface{}) (models.SubscriptionResult, error) {
+ /*
+ switch p := params.(type) {
+ case *models.ReportParams:
+ trans := c.tracker.NewXappTransaction(NewRmrEndpoint(p.ClientEndpoint),"" , 0, &xapp.RMRMeid{RanName: p.Meid})
+ if trans == nil {
+ xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params))
+ return
+ }
+ defer trans.Release()
+ case *models.ControlParams:
+ case *models.PolicyParams:
+ }
+ */
+ return models.SubscriptionResult{}, fmt.Errorf("Subscription rest interface not implemented")
+}
+
+func (c *Control) QueryHandler() (models.SubscriptionList, error) {
+ return c.registry.QueryHandler()
+}
+
+//-------------------------------------------------------------------
+//
+//-------------------------------------------------------------------
+
func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) {
params := xapptweaks.NewParams(nil)
params.Mtype = trans.GetMtype()
defer c.Rmr.Free(msg.Mbuf)
switch msg.Mtype {
- case xapp.RICMessageTypes["RIC_SUB_REQ"]:
+ case xapp.RIC_SUB_REQ:
go c.handleXAPPSubscriptionRequest(msg)
- case xapp.RICMessageTypes["RIC_SUB_RESP"]:
+ case xapp.RIC_SUB_RESP:
go c.handleE2TSubscriptionResponse(msg)
- case xapp.RICMessageTypes["RIC_SUB_FAILURE"]:
+ case xapp.RIC_SUB_FAILURE:
go c.handleE2TSubscriptionFailure(msg)
- case xapp.RICMessageTypes["RIC_SUB_DEL_REQ"]:
+ case xapp.RIC_SUB_DEL_REQ:
go c.handleXAPPSubscriptionDeleteRequest(msg)
- case xapp.RICMessageTypes["RIC_SUB_DEL_RESP"]:
+ case xapp.RIC_SUB_DEL_RESP:
go c.handleE2TSubscriptionDeleteResponse(msg)
- case xapp.RICMessageTypes["RIC_SUB_DEL_FAILURE"]:
+ case xapp.RIC_SUB_DEL_FAILURE:
go c.handleE2TSubscriptionDeleteFailure(msg)
default:
xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype)
import (
"fmt"
"gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
+ "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models"
"gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
"sync"
"time"
}
}
+func (r *Registry) QueryHandler() (models.SubscriptionList, error) {
+ r.mutex.Lock()
+ defer r.mutex.Unlock()
+
+ resp := models.SubscriptionList{}
+ for _, subs := range r.register {
+ subs.mutex.Lock()
+ resp = append(resp, &models.SubscriptionData{SubscriptionID: int64(subs.ReqId.Seq), Meid: subs.Meid.RanName, Endpoint: subs.EpList.StringList()})
+ subs.mutex.Unlock()
+ }
+ return resp, nil
+}
+
func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) {
if len(r.subIds) > 0 {
sequenceNumber := r.subIds[0]
}
func (eplist *RmrEndpointList) String() string {
+ valuesText := eplist.StringList()
+ return strings.Join(valuesText, ",")
+}
+
+func (eplist *RmrEndpointList) StringList() []string {
tmpList := eplist.Endpoints
valuesText := []string{}
for i := range tmpList {
valuesText = append(valuesText, tmpList[i].String())
}
- return strings.Join(valuesText, ",")
+ return valuesText
}
func (eplist *RmrEndpointList) Size() int {
import (
"gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap"
"gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/teststube2ap"
+ "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp"
+ "github.com/stretchr/testify/assert"
"testing"
)
func TestSubReqAndSubDelOk(t *testing.T) {
CaseBegin("TestSubReqAndSubDelOk")
- waiter := rtmgrHttp.AllocNextEvent(true)
cretrans := xappConn1.SendSubsReq(t, nil, nil)
- waiter.WaitResult(t)
crereq, cremsg := e2termConn1.RecvSubsReq(t)
e2termConn1.SendSubsResp(t, crereq, cremsg)
e2SubsId := xappConn1.RecvSubsResp(t, cretrans)
+
+ resp, _ := xapp.Subscription.QuerySubscriptions()
+ assert.Equal(t, resp[0].SubscriptionID, int64(e2SubsId))
+ assert.Equal(t, resp[0].Meid, "RAN_NAME_1")
+ assert.Equal(t, resp[0].Endpoint, []string{"localhost:13560"})
+
deltrans := xappConn1.SendSubsDelReq(t, nil, e2SubsId)
delreq, delmsg := e2termConn1.RecvSubsDelReq(t)
- waiter = rtmgrHttp.AllocNextEvent(true)
e2termConn1.SendSubsDelResp(t, delreq, delmsg)
xappConn1.RecvSubsDelResp(t, deltrans)
- waiter.WaitResult(t)
//Wait that subs is cleaned
mainCtrl.wait_subs_clean(t, e2SubsId, 10)
//e2termConn1.SendSubsResp(t, crereq2, cremsg2)
e2SubsId2 := xappConn2.RecvSubsResp(t, cretrans2)
+ resp, _ := xapp.Subscription.QuerySubscriptions()
+ assert.Equal(t, resp[0].SubscriptionID, int64(e2SubsId1))
+ assert.Equal(t, resp[0].Meid, "RAN_NAME_1")
+ assert.Equal(t, resp[0].Endpoint, []string{"localhost:13560", "localhost:13660"})
+
//Del1
deltrans1 := xappConn1.SendSubsDelReq(t, nil, e2SubsId1)
//e2termConn1.RecvSubsDelReq(t)
"HostAddr" : "localhost",
"port" : "8989",
"baseUrl" : "/"
+ },
+ "subscription": {
+ "host": "localhost:8088",
+ "timeout": 2
}
+
}