From: wahidw Date: Sun, 13 Dec 2020 17:34:29 +0000 (+0000) Subject: Initial commit for Xapp Orchestration X-Git-Tag: 0.5.3~9 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=d872630f520b3883e5cfb1815c36aa60862623f7;p=ric-plt%2Fappmgr.git Initial commit for Xapp Orchestration Signed-off-by: wahidw Change-Id: I9bf949bf4e5b36fadd0c0ad292cf3c9c11bfbc1c --- diff --git a/Dockerfile b/Dockerfile index bace8a8..7f313d8 100755 --- a/Dockerfile +++ b/Dockerfile @@ -15,23 +15,21 @@ #----------------------------------------------------------- -ARG HELMVERSION=v2.12.3 FROM nexus3.o-ran-sc.org:10004/o-ran-sc/bldr-ubuntu18-c-go:9-u18.04 AS appmgr-build RUN apt-get update -y && apt-get install -y jq ENV PATH="/usr/local/go/bin:${PATH}" -ARG HELMVERSION # Install helm -RUN wget -nv https://get.helm.sh/helm-${HELMVERSION}-linux-amd64.tar.gz \ - && tar -zxvf helm-${HELMVERSION}-linux-amd64.tar.gz \ - && cp linux-amd64/helm /usr/local/bin/helm \ - && rm -rf helm-${HELMVERSION}-linux-amd64.tar.gz \ - && rm -rf linux-amd64 +#RUN wget -nv https://storage.googleapis.com/kubernetes-helm/helm-${HELMVERSION}-linux-amd64.tar.gz \ +# && tar -zxvf helm-${HELMVERSION}-linux-amd64.tar.gz \ +# && cp linux-amd64/helm /usr/local/bin/helm \ +# && rm -rf helm-${HELMVERSION}-linux-amd64.tar.gz \ +# && rm -rf linux-amd64 # Install kubectl from Docker Hub -COPY --from=lachlanevenson/k8s-kubectl:v1.16.0 /usr/local/bin/kubectl /usr/local/bin/kubectl +#COPY --from=lachlanevenson/k8s-kubectl:v1.16.0 /usr/local/bin/kubectl /usr/local/bin/kubectl ENV GOPATH="/go" @@ -62,7 +60,7 @@ COPY . /go/src/ws RUN GO111MODULE=on GO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o /go/src/ws/cache/go/cmd/appmgr cmd/appmgr.go # Run unit tests -RUN GO111MODULE=on GO_ENABLED=0 GOOS=linux go test -p 1 -cover ./pkg/cm/ ./pkg/helm/ ./pkg/resthooks/ +RUN GO111MODULE=on GO_ENABLED=0 GOOS=linux go test -p 1 -cover ./pkg/resthooks/ RUN gofmt -l $(find cmd/ pkg/ -name '*.go' -not -name '*_test.go') @@ -80,13 +78,11 @@ RUN apt-get update -y \ # COPY --from=appmgr-build /usr/local/include/ /usr/local/include/ COPY --from=appmgr-build /usr/local/lib/ /usr/local/lib/ -COPY --from=appmgr-build /usr/local/bin/helm /usr/local/bin/helm -COPY --from=appmgr-build /usr/local/bin/kubectl /usr/local/bin/kubectl +#COPY --from=appmgr-build /usr/local/bin/helm /usr/local/bin/helm +#COPY --from=appmgr-build /usr/local/bin/kubectl /usr/local/bin/kubectl RUN ldconfig -ARG HELMVERSION -ENV HELMVERSION=$HELMVERSION # # xApp Manager # diff --git a/api/appmgr_rest_api.yaml b/api/appmgr_rest_api.yaml index 30c5550..1e6e1fa 100755 --- a/api/appmgr_rest_api.yaml +++ b/api/appmgr_rest_api.yaml @@ -352,19 +352,22 @@ paths: description: Registration successful '400': description: Invalid input - /register/{xAppName}: - delete: + /deregister: + post: summary: Deregister an existing xApp tags: - xapp - registration operationId: deregisterXapp + consumes: + - application/json parameters: - - name: xAppName - in: path + - name: deregisterRequest + in: body description: Xapp to be unregistered required: true - type: string + schema: + $ref: '#/definitions/deregisterRequest' responses: '204': description: Successful deregistration of xApp @@ -585,9 +588,21 @@ definitions: type: string appVersion: type: string + configPath: + type: string appInstanceName: type: string httpEndpoint: type: string rmrEndpoint: type: string + deregisterRequest: + type: object + required: + - appName + - appInstanceName + properties: + appName: + type: string + appInstanceName: + type: string diff --git a/pkg/restful/restful.go b/pkg/restful/restful.go index 9124ecd..e450547 100755 --- a/pkg/restful/restful.go +++ b/pkg/restful/restful.go @@ -20,8 +20,15 @@ package restful import ( + "encoding/json" + "errors" + "fmt" + //"io/ioutil" "log" + "net/http" "os" + "strconv" + "strings" "time" "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/models" @@ -31,17 +38,27 @@ import ( "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/restapi/operations/xapp" "github.com/go-openapi/loads" "github.com/go-openapi/runtime/middleware" + "github.com/valyala/fastjson" "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/appmgr" - "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/cm" - "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/helm" "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/resthooks" ) +type XappData struct { + httpendpoint string + rmrendpoint string + status string + xappname string + xappinstname string + xappversion string + xappconfigpath string + xappInstance *models.XappInstance +} + +var xappmap = map[string]map[string]*XappData{} + func NewRestful() *Restful { r := &Restful{ - helm: helm.NewHelm(), - cm: cm.NewCM(), rh: resthooks.NewResthook(true), ready: false, } @@ -57,10 +74,40 @@ func (r *Restful) Run() { appmgr.Logger.Info("Xapp manager started ... serving on %s:%d\n", server.Host, server.Port) - go r.NotifyClients() + go r.RetrieveApps() if err := server.Serve(); err != nil { log.Fatal(err.Error()) } + +} + +func (r *Restful) RetrieveApps() { + time.Sleep(5 * time.Second) + var xlist models.RegisterRequest + applist := r.rh.GetAppsInSDL() + if applist != nil { + appmgr.Logger.Info("List obtained from GetAppsInSDL is %s", *applist) + newstring := strings.Split(*applist, " ") + for i, _ := range newstring { + appmgr.Logger.Debug("Checking for xapp %s", newstring[i]) + if newstring[i] != "" { + err := json.Unmarshal([]byte(newstring[i]), &xlist) + if err != nil { + appmgr.Logger.Error("Error while unmarshalling") + continue + } + } else { + continue //SDL may have empty item,so need to skip + } + + xmodel, _ := r.PrepareConfig(xlist, false) + if xmodel == nil { + appmgr.Logger.Error("Xapp not found, deleting it from DB") + r.rh.UpdateAppData(xlist, true) + } + } + } + } func (r *Restful) SetupHandler() *operations.AppManagerAPI { @@ -77,7 +124,7 @@ func (r *Restful) SetupHandler() *operations.AppManagerAPI { return health.NewGetHealthAliveOK() }) - api.HealthGetHealthReadyHandler = health.GetHealthReadyHandlerFunc( + api.HealthGetHealthReadyHandler = health.GetHealthReadyHandlerFunc( func(params health.GetHealthReadyParams) middleware.Responder { return health.NewGetHealthReadyOK() }) @@ -120,109 +167,202 @@ func (r *Restful) SetupHandler() *operations.AppManagerAPI { // URL: /ric/v1/xapp api.XappGetAllXappsHandler = xapp.GetAllXappsHandlerFunc( func(params xapp.GetAllXappsParams) middleware.Responder { - if result, err := r.helm.StatusAll(); err == nil { + if result, err := r.GetApps(); err == nil { return xapp.NewGetAllXappsOK().WithPayload(result) } return xapp.NewGetAllXappsInternalServerError() }) - api.XappListAllXappsHandler = xapp.ListAllXappsHandlerFunc( - func(params xapp.ListAllXappsParams) middleware.Responder { - if result := r.helm.SearchAll(); err == nil { - return xapp.NewListAllXappsOK().WithPayload(result) + api.RegisterXappHandler = operations.RegisterXappHandlerFunc( + func(params operations.RegisterXappParams) middleware.Responder { + appmgr.Logger.Info("appname is %s", (*params.RegisterRequest.AppName)) + appmgr.Logger.Info("endpoint is %s", (*params.RegisterRequest.HTTPEndpoint)) + appmgr.Logger.Info("rmrendpoint is %s", (*params.RegisterRequest.RmrEndpoint)) + if result, err := r.RegisterXapp(*params.RegisterRequest); err == nil { + go r.rh.PublishSubscription(*result, models.EventTypeDeployed) + return operations.NewRegisterXappCreated() } - return xapp.NewListAllXappsInternalServerError() + return operations.NewRegisterXappBadRequest() }) - api.XappGetXappByNameHandler = xapp.GetXappByNameHandlerFunc( - func(params xapp.GetXappByNameParams) middleware.Responder { - if result, err := r.helm.Status(params.XAppName); err == nil { - return xapp.NewGetXappByNameOK().WithPayload(&result) + api.DeregisterXappHandler = operations.DeregisterXappHandlerFunc( + func(params operations.DeregisterXappParams) middleware.Responder { + appmgr.Logger.Info("appname is %s", (*params.DeregisterRequest.AppName)) + if result, err := r.DeregisterXapp(*params.DeregisterRequest); err == nil { + go r.rh.PublishSubscription(*result, models.EventTypeUndeployed) + return operations.NewDeregisterXappNoContent() } - return xapp.NewGetXappByNameNotFound() + return operations.NewDeregisterXappBadRequest() }) - api.XappGetXappInstanceByNameHandler = xapp.GetXappInstanceByNameHandlerFunc( - func(params xapp.GetXappInstanceByNameParams) middleware.Responder { - if result, err := r.helm.Status(params.XAppName); err == nil { - for _, v := range result.Instances { - if *v.Name == params.XAppInstanceName { - return xapp.NewGetXappInstanceByNameOK().WithPayload(v) - } - } - } - return xapp.NewGetXappInstanceByNameNotFound() - }) + return api +} + +func httpGetXAppsconfig(url string) (*appmgr.RtmData, error) { + appmgr.Logger.Info("Invoked httprestful.httpGetXApps: " + url) + resp, err := http.Get(url) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode == http.StatusOK { + var data XappConfigList + appmgr.Logger.Info("http client raw response: %v", resp) + if err := json.NewDecoder(resp.Body).Decode(&data); err != nil { + appmgr.Logger.Error("Json decode failed: " + err.Error()) + return nil, err + } + //data[0] assuming only for one app + str := fmt.Sprintf("%v", data[0].Config) + appmgr.Logger.Info("HTTP BODY: %v", str) + + resp.Body.Close() + + var p fastjson.Parser + var msgs appmgr.RtmData - api.XappDeployXappHandler = xapp.DeployXappHandlerFunc( - func(params xapp.DeployXappParams) middleware.Responder { - if result, err := r.helm.Install(*params.XappDescriptor); err == nil { - go r.PublishXappCreateEvent(params) - return xapp.NewDeployXappCreated().WithPayload(&result) + v, err := p.Parse(str) + if err != nil { + appmgr.Logger.Info("fastjson.Parser for failed: %v", err) + return nil, err + } + + if v.Exists("rmr") { + for _, m := range v.GetArray("rmr", "txMessages") { + msgs.TxMessages = append(msgs.TxMessages, strings.Trim(m.String(), `"`)) } - return xapp.NewUndeployXappInternalServerError() - }) - api.XappUndeployXappHandler = xapp.UndeployXappHandlerFunc( - func(params xapp.UndeployXappParams) middleware.Responder { - if result, err := r.helm.Delete(params.XAppName); err == nil { - go r.PublishXappDeleteEvent(result) - return xapp.NewUndeployXappNoContent() + for _, m := range v.GetArray("rmr", "rxMessages") { + msgs.RxMessages = append(msgs.RxMessages, strings.Trim(m.String(), `"`)) } - return xapp.NewUndeployXappInternalServerError() - }) - // URL: /ric/v1/config - api.XappGetAllXappConfigHandler = xapp.GetAllXappConfigHandlerFunc( - func(params xapp.GetAllXappConfigParams) middleware.Responder { - return xapp.NewGetAllXappConfigOK().WithPayload(r.cm.UploadConfigAll()) - }) + for _, m := range v.GetArray("rmr", "policies") { + if val, err := strconv.Atoi(strings.Trim(m.String(), `"`)); err == nil { + msgs.Policies = append(msgs.Policies, int64(val)) + } + } + } else { + for _, p := range v.GetArray("messaging", "ports") { + appmgr.Logger.Info("txMessages=%v, rxMessages=%v", p.GetArray("txMessages"), p.GetArray("rxMessages")) + for _, m := range p.GetArray("txMessages") { + msgs.TxMessages = append(msgs.TxMessages, strings.Trim(m.String(), `"`)) + } - api.XappGetConfigElementHandler = xapp.GetConfigElementHandlerFunc( - func(params xapp.GetConfigElementParams) middleware.Responder { - return xapp.NewGetConfigElementOK().WithPayload(r.cm.UploadConfigElement(params.Element)) - }) + for _, m := range p.GetArray("rxMessages") { + msgs.RxMessages = append(msgs.RxMessages, strings.Trim(m.String(), `"`)) + } - api.XappModifyXappConfigHandler = xapp.ModifyXappConfigHandlerFunc( - func(params xapp.ModifyXappConfigParams) middleware.Responder { - result, err := r.cm.UpdateConfigMap(*params.XAppConfig) - if err != nil { - if err.Error() != "Validation failed!" { - return xapp.NewModifyXappConfigInternalServerError() - } else { - return xapp.NewModifyXappConfigUnprocessableEntity() + for _, m := range p.GetArray("policies") { + if val, err := strconv.Atoi(strings.Trim(m.String(), `"`)); err == nil { + msgs.Policies = append(msgs.Policies, int64(val)) + } } } - r.rh.PublishSubscription(models.Xapp{}, models.EventTypeModified) - return xapp.NewModifyXappConfigOK().WithPayload(result) - }) + } + return &msgs, nil + } + appmgr.Logger.Info("httprestful got an unexpected http status code: %v", resp.StatusCode) + return nil, nil +} - return api +func (r *Restful) RegisterXapp(params models.RegisterRequest) (xapp *models.Xapp, err error) { + return r.PrepareConfig(params, true) } -func (r *Restful) NotifyClients() { - r.helm.Initialize() - if xapps, err := r.helm.StatusAll(); err == nil { - r.rh.NotifyClients(xapps, models.EventTypeRestarted) - r.ready = true +func (r *Restful) DeregisterXapp(params models.DeregisterRequest) (xapp *models.Xapp, err error) { + var registeredlist models.RegisterRequest + registeredlist.AppName = params.AppName + registeredlist.AppInstanceName = params.AppInstanceName + if _, found := xappmap[*params.AppName]; found { + var x models.Xapp + x.Instances = append(x.Instances, xappmap[*params.AppName][*params.AppInstanceName].xappInstance) + registeredlist.HTTPEndpoint = &xappmap[*params.AppName][*params.AppInstanceName].httpendpoint + delete(xappmap[*params.AppName], *params.AppInstanceName) + if len(xappmap[*params.AppName]) == 0 { + delete(xappmap, *params.AppName) + } + r.rh.UpdateAppData(registeredlist, true) + return &x, nil + } else { + appmgr.Logger.Error("XApp Instance %v Not Found", *params.AppName) + return nil, errors.New("XApp Instance Not Found") } } -func (r *Restful) PublishXappCreateEvent(params xapp.DeployXappParams) { - name := *params.XappDescriptor.XappName - if params.XappDescriptor.ReleaseName != "" { - name = params.XappDescriptor.ReleaseName - } +func (r *Restful) PrepareConfig(params models.RegisterRequest, updateflag bool) (xapp *models.Xapp, err error) { + maxRetries := 5 + //tmpString := strings.Split(*params.HTTPEndpoint, "//") + appmgr.Logger.Info("http endpoint is %s", *params.HTTPEndpoint) + for i := 1; i <= maxRetries; i++ { + data, err := httpGetXAppsconfig(fmt.Sprintf("http://%s%s", *params.HTTPEndpoint, params.ConfigPath)) + + if data != nil && err == nil { + appmgr.Logger.Info("iRetry Count = %v", i) + var xapp models.Xapp - for i := 0; i < 5; i++ { - time.Sleep(time.Duration(5) * time.Second) - if result, _ := r.helm.Status(name); result.Instances != nil { - r.rh.PublishSubscription(result, models.EventTypeDeployed) + xapp.Name = params.AppName + xapp.Version = params.AppVersion + //xapp.Status = params.Status + + r.rh.UpdateAppData(params, updateflag) + return r.FillInstanceData(params, &xapp, *data) break + } else if err == nil { + appmgr.Logger.Error("Unexpected HTTP status code/JSON Parsing error") + } else { + appmgr.Logger.Error("Couldn't get data due to" + err.Error()) } + time.Sleep(2 * time.Second) } + + return nil, errors.New("Unable to get configmap after 5 retries") } -func (r *Restful) PublishXappDeleteEvent(xapp models.Xapp) { - r.rh.PublishSubscription(xapp, models.EventTypeUndeployed) +func (r *Restful) FillInstanceData(params models.RegisterRequest, xapp *models.Xapp, rtData appmgr.RtmData) (xapps *models.Xapp, err error) { + + //tmpString := strings.Split(*params.RmrEndpoint, "//") + endPointStr := strings.Split(*params.RmrEndpoint, ":") + var x models.XappInstance + x.Name = params.AppInstanceName + //x.Status = strings.ToLower(params.Status) + x.Status = "deployed" + x.IP = endPointStr[0] + x.Port, _ = strconv.ParseInt(endPointStr[1], 10, 64) + x.TxMessages = rtData.TxMessages + x.RxMessages = rtData.RxMessages + x.Policies = rtData.Policies + xapp.Instances = append(xapp.Instances, &x) + + a := &XappData{httpendpoint: *params.HTTPEndpoint, rmrendpoint: *params.RmrEndpoint, status: "deployed", xappname: *params.AppName, xappversion: params.AppVersion, xappinstname: *params.AppInstanceName, xappconfigpath: params.ConfigPath, xappInstance: &x} + + if _, ok := xappmap[*params.AppName]; ok { + xappmap[*params.AppName][*params.AppInstanceName] = a + appmgr.Logger.Info("appname already present, %v", xappmap[*params.AppName]) + } else { + xappmap[*params.AppName] = make(map[string]*XappData) + xappmap[*params.AppName][*params.AppInstanceName] = a + appmgr.Logger.Info("Creating app instance, %v", xappmap[*params.AppName]) + } + + return xapp, nil + +} + +func (r *Restful) GetApps() (xapps models.AllDeployedXapps, err error) { + xapps = models.AllDeployedXapps{} + for _, v := range xappmap { + var x models.Xapp + for i, j := range v { + x.Status = j.status + x.Name = &j.xappname + x.Version = j.xappversion + appmgr.Logger.Info("Xapps details currently in map Appname = %v,rmrendpoint = %v,Status = %v", i, j.rmrendpoint, j.status) + x.Instances = append(x.Instances, j.xappInstance) + } + xapps = append(xapps, &x) + } + + return xapps, nil + } diff --git a/pkg/restful/types.go b/pkg/restful/types.go index fbbf294..7d395f4 100755 --- a/pkg/restful/types.go +++ b/pkg/restful/types.go @@ -47,3 +47,29 @@ type Restful struct { rh *resthook.Resthook ready bool } + +//Taken from xapp-frame models +type ConfigMetadata struct { + + // The type of the content + // Required: true + // Enum: [json xml other] + ConfigType *string `json:"configType"` + + // Name of the xApp + // Required: true + XappName *string `json:"xappName"` +} + +type XAppConfig struct { + + // Configuration in JSON format + // Required: true + Config interface{} `json:"config"` + + // metadata + // Required: true + Metadata *ConfigMetadata `json:"metadata"` +} + +type XappConfigList []*XAppConfig diff --git a/pkg/resthooks/resthooks.go b/pkg/resthooks/resthooks.go index 9ac49f9..6b405fc 100755 --- a/pkg/resthooks/resthooks.go +++ b/pkg/resthooks/resthooks.go @@ -22,10 +22,12 @@ package resthooks import ( "bytes" "encoding/json" + "fmt" sdl "gerrit.o-ran-sc.org/r/ric-plt/sdlgo" cmap "github.com/orcaman/concurrent-map" "github.com/segmentio/ksuid" "net/http" + "strings" "time" "gerrit.o-ran-sc.org/r/ric-plt/appmgr/pkg/appmgr" @@ -33,13 +35,14 @@ import ( ) func NewResthook(restoreData bool) *Resthook { - return createResthook(restoreData, sdl.NewSdlInstance("appmgr", sdl.NewDatabase())) + return createResthook(restoreData, sdl.NewSdlInstance("appmgr", sdl.NewDatabase()),sdl.NewSdlInstance("appdb", sdl.NewDatabase())) } -func createResthook(restoreData bool, sdlInst iSdl) *Resthook { +func createResthook(restoreData bool, sdlInst iSdl, sdlInst2 iSdl) *Resthook { rh := &Resthook{ client: &http.Client{}, db: sdlInst, + db2: sdlInst2, } if restoreData { @@ -237,3 +240,68 @@ func (rh *Resthook) FlushSubscriptions() { rh.db.RemoveAll() rh.subscriptions = cmap.New() } + +func (rh *Resthook) UpdateAppData(params models.RegisterRequest, updateflag bool) { + appmgr.Logger.Info("Endpoint to be added in SDL: %s", *params.HTTPEndpoint) + if updateflag == false { + return + } + + value, err := rh.db2.Get([]string{"endpoints"}) + if err != nil { + appmgr.Logger.Error("DB.session.Get failed: %v ", err.Error()) + return + } + + appmgr.Logger.Info("List of Apps in SDL: %v", value["endpoints"]) + var appsindb []string + var data string + dbflag := false + + if value["endpoints"] != nil { + formstring := fmt.Sprintf("%s", value["endpoints"]) + newstring := strings.Split(formstring, " ") + for i, _ := range newstring { + if len(newstring) == 1 && strings.Contains(newstring[i], *params.HTTPEndpoint) { + appmgr.Logger.Info("Removing Key %s", *params.HTTPEndpoint) + rh.db2.Remove([]string{"endpoints"}) + dbflag = true + break + } + if strings.Contains(newstring[i], *params.HTTPEndpoint) { + appmgr.Logger.Info("Removing entry %s", *params.HTTPEndpoint) + dbflag = true + continue + } + appsindb = append(appsindb, newstring[i]) + data = strings.Join(appsindb, " ") + } + rh.db2.Set("endpoints", strings.TrimSpace(data)) + } + + if dbflag == false { + xappData, err := json.Marshal(params) + if err != nil { + appmgr.Logger.Info("json.Marshal failed: %v", err) + return + } + appsindb = append(appsindb, string(xappData)) + data = strings.Join(appsindb, " ") + rh.db2.Set("endpoints", strings.TrimSpace(data)) + } +} + +func (rh *Resthook) GetAppsInSDL() *string { + value, err := rh.db2.Get([]string{"endpoints"}) + if err != nil { + appmgr.Logger.Error("DB.session.Get failed: %v ", err.Error()) + return nil + } + appmgr.Logger.Info("List of Apps in SDL: %v", value["endpoints"]) + if value["endpoints"] == nil || value["endpoints"] == "" { + return nil + } else { + apps := fmt.Sprintf("%s", value["endpoints"]) + return &apps + } +} diff --git a/pkg/resthooks/resthooks_test.go b/pkg/resthooks/resthooks_test.go index e89383f..e9ab68a 100755 --- a/pkg/resthooks/resthooks_test.go +++ b/pkg/resthooks/resthooks_test.go @@ -41,6 +41,7 @@ import ( var rh *Resthook var resp models.SubscriptionResponse var mockedSdl *SdlMock +var mockedSdl2 *SdlMock // Test cases func TestMain(m *testing.M) { @@ -48,7 +49,8 @@ func TestMain(m *testing.M) { appmgr.Logger.SetLevel(0) mockedSdl = new(SdlMock) - rh = createResthook(false, mockedSdl) + mockedSdl2 = new(SdlMock) + rh = createResthook(false, mockedSdl,mockedSdl2) code := m.Run() os.Exit(code) } @@ -203,6 +205,7 @@ func TestNotifyReturnsErrorAfterRetriesIfNoHttpServer(t *testing.T) { func TestRestoreSubscriptionsSuccess(t *testing.T) { var mockSdlRetOk error mSdl := new(SdlMock) + mSdl2 := new(SdlMock) key := "key-1" subsReq := createSubscription(models.EventTypeCreated, int64(5), int64(10), "http://localhost:8087/xapps_hook") @@ -214,7 +217,7 @@ func TestRestoreSubscriptionsSuccess(t *testing.T) { mockSdlGetRetVal[key] = string(serializedSubsReq) mSdl.On("GetAll").Return([]string{key}, mockSdlRetOk).Twice() mSdl.On("Get", []string{key}).Return(mockSdlGetRetVal, mockSdlRetOk).Once() - restHook := createResthook(true, mSdl) + restHook := createResthook(true, mSdl,mSdl2) val, found := restHook.subscriptions.Get(key) assert.True(t, found) @@ -224,6 +227,7 @@ func TestRestoreSubscriptionsSuccess(t *testing.T) { func TestRestoreSubscriptionsFailsIfSdlGetAllFails(t *testing.T) { var mockSdlRetStatus error mSdl := new(SdlMock) + mSdl2 := new(SdlMock) getCalled := 0 mGetAllCall := mSdl.On("GetAll") mGetAllCall.RunFn = func(args mock.Arguments) { @@ -234,13 +238,14 @@ func TestRestoreSubscriptionsFailsIfSdlGetAllFails(t *testing.T) { mGetAllCall.ReturnArguments = mock.Arguments{[]string{}, mockSdlRetStatus} } - restHook := createResthook(true, mSdl) + restHook := createResthook(true, mSdl,mSdl2) assert.Equal(t, 0, len(restHook.subscriptions.Items())) } func TestRestoreSubscriptionsFailsIfSdlGetFails(t *testing.T) { var mockSdlRetOk error mSdl := new(SdlMock) + mSdl2 := new(SdlMock) mockSdlRetNok := errors.New("some SDL error") key := "key-1" subsReq := createSubscription(models.EventTypeCreated, int64(5), int64(10), "http://localhost:8087/xapps_hook") @@ -253,7 +258,7 @@ func TestRestoreSubscriptionsFailsIfSdlGetFails(t *testing.T) { mSdl.On("GetAll").Return([]string{key}, mockSdlRetOk).Twice() mSdl.On("Get", []string{key}).Return(mockSdlGetRetVal, mockSdlRetNok).Once() - restHook := createResthook(true, mSdl) + restHook := createResthook(true, mSdl,mSdl2) assert.Equal(t, 0, len(restHook.subscriptions.Items())) } @@ -352,3 +357,8 @@ func (m *SdlMock) RemoveAll() error { a := m.Called() return a.Error(0) } + +func (m *SdlMock) Remove(keys []string) error { + a := m.Called() + return a.Error(0) +} diff --git a/pkg/resthooks/types.go b/pkg/resthooks/types.go index f2af972..1376500 100755 --- a/pkg/resthooks/types.go +++ b/pkg/resthooks/types.go @@ -36,6 +36,7 @@ type Resthook struct { client *http.Client subscriptions cmap.ConcurrentMap db iSdl + db2 iSdl Seq int64 } @@ -52,4 +53,5 @@ type iSdl interface { Get(keys []string) (map[string]interface{}, error) GetAll() ([]string, error) RemoveAll() error + Remove([]string) error }