From: Juha Hyttinen Date: Fri, 28 Feb 2020 06:53:33 +0000 (+0200) Subject: RIC-79 intial implementation to fetch subscriptions via rest X-Git-Tag: 0.4.0~7 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?p=ric-plt%2Fsubmgr.git;a=commitdiff_plain;h=c9eb08a02821033f198fcde1ea60279b19c5c617 RIC-79 intial implementation to fetch subscriptions via rest Change-Id: Iff2915998c130cb93fa3b1767b2499d8ef3713e5 Signed-off-by: Juha Hyttinen --- diff --git a/container-tag.yaml b/container-tag.yaml index 9309d4e..31d99a0 100644 --- a/container-tag.yaml +++ b/container-tag.yaml @@ -2,4 +2,4 @@ # By default this file is in the docker build directory, # but the location can configured in the JJB template. --- -tag: "4.0.0.3" +tag: "4.0.0.4" diff --git a/pkg/control/control.go b/pkg/control/control.go index 2d7d535..7b0760c 100755 --- a/pkg/control/control.go +++ b/pkg/control/control.go @@ -24,6 +24,7 @@ import ( "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap" rtmgrclient "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/rtmgr_client" "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/xapptweaks" + "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" httptransport "github.com/go-openapi/runtime/client" "github.com/go-openapi/strfmt" @@ -66,7 +67,7 @@ type Control struct { e2ap *E2ap registry *Registry tracker *Tracker - timerMap *TimerMap + //subscriber *xapp.Subscriber } type RMRMeid struct { @@ -94,15 +95,16 @@ func NewControl() *Control { tracker := new(Tracker) tracker.Init() - timerMap := new(TimerMap) - timerMap.Init() + //subscriber := xapp.NewSubscriber(viper.GetString("subscription.host"), viper.GetInt("subscription.timeout")) c := &Control{e2ap: new(E2ap), registry: registry, tracker: tracker, - timerMap: timerMap, + //subscriber: subscriber, } c.XappWrapper.Init("") + go xapp.Subscription.Listen(c.SubscriptionHandler, c.QueryHandler) + //go c.subscriber.Listen(c.SubscriptionHandler, c.QueryHandler) return c } @@ -117,6 +119,34 @@ func (c *Control) Run() { xapp.Run(c) } +//------------------------------------------------------------------- +// +//------------------------------------------------------------------- +func (c *Control) SubscriptionHandler(stype models.SubscriptionType, params interface{}) (models.SubscriptionResult, error) { + /* + switch p := params.(type) { + case *models.ReportParams: + trans := c.tracker.NewXappTransaction(NewRmrEndpoint(p.ClientEndpoint),"" , 0, &xapp.RMRMeid{RanName: p.Meid}) + if trans == nil { + xapp.Logger.Error("XAPP-SubReq: %s", idstring(fmt.Errorf("transaction not created"), params)) + return + } + defer trans.Release() + case *models.ControlParams: + case *models.PolicyParams: + } + */ + return models.SubscriptionResult{}, fmt.Errorf("Subscription rest interface not implemented") +} + +func (c *Control) QueryHandler() (models.SubscriptionList, error) { + return c.registry.QueryHandler() +} + +//------------------------------------------------------------------- +// +//------------------------------------------------------------------- + func (c *Control) rmrSendToE2T(desc string, subs *Subscription, trans *TransactionSubs) (err error) { params := xapptweaks.NewParams(nil) params.Mtype = trans.GetMtype() @@ -158,17 +188,17 @@ func (c *Control) Consume(params *xapp.RMRParams) (err error) { defer c.Rmr.Free(msg.Mbuf) switch msg.Mtype { - case xapp.RICMessageTypes["RIC_SUB_REQ"]: + case xapp.RIC_SUB_REQ: go c.handleXAPPSubscriptionRequest(msg) - case xapp.RICMessageTypes["RIC_SUB_RESP"]: + case xapp.RIC_SUB_RESP: go c.handleE2TSubscriptionResponse(msg) - case xapp.RICMessageTypes["RIC_SUB_FAILURE"]: + case xapp.RIC_SUB_FAILURE: go c.handleE2TSubscriptionFailure(msg) - case xapp.RICMessageTypes["RIC_SUB_DEL_REQ"]: + case xapp.RIC_SUB_DEL_REQ: go c.handleXAPPSubscriptionDeleteRequest(msg) - case xapp.RICMessageTypes["RIC_SUB_DEL_RESP"]: + case xapp.RIC_SUB_DEL_RESP: go c.handleE2TSubscriptionDeleteResponse(msg) - case xapp.RICMessageTypes["RIC_SUB_DEL_FAILURE"]: + case xapp.RIC_SUB_DEL_FAILURE: go c.handleE2TSubscriptionDeleteFailure(msg) default: xapp.Logger.Info("Unknown Message Type '%d', discarding", msg.Mtype) diff --git a/pkg/control/registry.go b/pkg/control/registry.go index f24dccd..b816b61 100644 --- a/pkg/control/registry.go +++ b/pkg/control/registry.go @@ -22,6 +22,7 @@ package control import ( "fmt" "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap" + "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/models" "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" "sync" "time" @@ -46,6 +47,19 @@ func (r *Registry) Initialize() { } } +func (r *Registry) QueryHandler() (models.SubscriptionList, error) { + r.mutex.Lock() + defer r.mutex.Unlock() + + resp := models.SubscriptionList{} + for _, subs := range r.register { + subs.mutex.Lock() + resp = append(resp, &models.SubscriptionData{SubscriptionID: int64(subs.ReqId.Seq), Meid: subs.Meid.RanName, Endpoint: subs.EpList.StringList()}) + subs.mutex.Unlock() + } + return resp, nil +} + func (r *Registry) allocateSubs(trans *TransactionXapp, subReqMsg *e2ap.E2APSubscriptionRequest) (*Subscription, error) { if len(r.subIds) > 0 { sequenceNumber := r.subIds[0] diff --git a/pkg/control/types.go b/pkg/control/types.go index ab62153..5408f48 100644 --- a/pkg/control/types.go +++ b/pkg/control/types.go @@ -86,12 +86,17 @@ type RmrEndpointList struct { } func (eplist *RmrEndpointList) String() string { + valuesText := eplist.StringList() + return strings.Join(valuesText, ",") +} + +func (eplist *RmrEndpointList) StringList() []string { tmpList := eplist.Endpoints valuesText := []string{} for i := range tmpList { valuesText = append(valuesText, tmpList[i].String()) } - return strings.Join(valuesText, ",") + return valuesText } func (eplist *RmrEndpointList) Size() int { diff --git a/pkg/control/ut_messaging_test.go b/pkg/control/ut_messaging_test.go index 53d8a85..c0d76e2 100644 --- a/pkg/control/ut_messaging_test.go +++ b/pkg/control/ut_messaging_test.go @@ -22,6 +22,8 @@ package control import ( "gerrit.o-ran-sc.org/r/ric-plt/e2ap/pkg/e2ap" "gerrit.o-ran-sc.org/r/ric-plt/submgr/pkg/teststube2ap" + "gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/xapp" + "github.com/stretchr/testify/assert" "testing" ) @@ -102,20 +104,22 @@ func TestSubReqAndRouteNok(t *testing.T) { func TestSubReqAndSubDelOk(t *testing.T) { CaseBegin("TestSubReqAndSubDelOk") - waiter := rtmgrHttp.AllocNextEvent(true) cretrans := xappConn1.SendSubsReq(t, nil, nil) - waiter.WaitResult(t) crereq, cremsg := e2termConn1.RecvSubsReq(t) e2termConn1.SendSubsResp(t, crereq, cremsg) e2SubsId := xappConn1.RecvSubsResp(t, cretrans) + + resp, _ := xapp.Subscription.QuerySubscriptions() + assert.Equal(t, resp[0].SubscriptionID, int64(e2SubsId)) + assert.Equal(t, resp[0].Meid, "RAN_NAME_1") + assert.Equal(t, resp[0].Endpoint, []string{"localhost:13560"}) + deltrans := xappConn1.SendSubsDelReq(t, nil, e2SubsId) delreq, delmsg := e2termConn1.RecvSubsDelReq(t) - waiter = rtmgrHttp.AllocNextEvent(true) e2termConn1.SendSubsDelResp(t, delreq, delmsg) xappConn1.RecvSubsDelResp(t, deltrans) - waiter.WaitResult(t) //Wait that subs is cleaned mainCtrl.wait_subs_clean(t, e2SubsId, 10) @@ -937,6 +941,11 @@ func TestSubReqAndSubDelOkSameAction(t *testing.T) { //e2termConn1.SendSubsResp(t, crereq2, cremsg2) e2SubsId2 := xappConn2.RecvSubsResp(t, cretrans2) + resp, _ := xapp.Subscription.QuerySubscriptions() + assert.Equal(t, resp[0].SubscriptionID, int64(e2SubsId1)) + assert.Equal(t, resp[0].Meid, "RAN_NAME_1") + assert.Equal(t, resp[0].Endpoint, []string{"localhost:13560", "localhost:13660"}) + //Del1 deltrans1 := xappConn1.SendSubsDelReq(t, nil, e2SubsId1) //e2termConn1.RecvSubsDelReq(t) diff --git a/test/config-file.json b/test/config-file.json index 1ff0070..a98d169 100644 --- a/test/config-file.json +++ b/test/config-file.json @@ -21,5 +21,10 @@ "HostAddr" : "localhost", "port" : "8989", "baseUrl" : "/" + }, + "subscription": { + "host": "localhost:8088", + "timeout": 2 } + }