Poll MR and send messages to consumers 25/6725/1
authorelinuxhenrik <henrik.b.andersson@est.tech>
Tue, 21 Sep 2021 13:43:11 +0000 (15:43 +0200)
committerelinuxhenrik <henrik.b.andersson@est.tech>
Tue, 21 Sep 2021 13:54:54 +0000 (15:54 +0200)
Issue-ID: NONRTRIC-586
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
Change-Id: I7e261aedb1a528c193390f6a3e99a49d7783d35e

.gitignore
dmaap-mediator-producer/configs/STD_Fault_Messages.json
dmaap-mediator-producer/internal/config/config.go
dmaap-mediator-producer/internal/config/config_test.go
dmaap-mediator-producer/internal/jobs/jobs.go
dmaap-mediator-producer/internal/jobs/jobs_test.go
dmaap-mediator-producer/internal/restclient/HTTPClient.go
dmaap-mediator-producer/internal/server/server_test.go
dmaap-mediator-producer/main.go
dmaap-mediator-producer/mocks/jobhandler/JobHandler.go [moved from dmaap-mediator-producer/mocks/JobHandler.go with 86% similarity]
dmaap-mediator-producer/simulator/consumersimulator.go [new file with mode: 0644]

index df309a1..5915080 100644 (file)
@@ -18,3 +18,5 @@ infer-out/
 
 .vscode
 .factorypath
+
+coverage.*
index b944802..258d743 100644 (file)
@@ -1,7 +1,12 @@
 {
-  "$schema": "https://json-schema.org/draft/2019-09/schema",
-  "title": "STD_Fault_Messages",
-  "description": "Schema for job delivering fault messages from DMaaP Message Router",
-  "type": "object",
-  "properties": {}
+  "id": "STD_Fault_Messages",
+  "dmaapTopic": "unauthenticated.SEC_FAULT_OUTPUT",
+  "schema": {
+    "$schema": "https://json-schema.org/draft/2019-09/schema",
+    "title": "STD_Fault_Messages",
+    "description": "Schema for job delivering fault messages from DMaaP Message Router",
+    "type": "object",
+    "properties": {},
+    "additionalProperties": false
+  }
 }
\ No newline at end of file
index 3616c58..8a2784a 100644 (file)
@@ -34,6 +34,8 @@ type Config struct {
        InfoJobCallbackHost                 string
        InfoJobCallbackPort                 int
        InfoCoordinatorAddress              string
+       MRHost                              string
+       MRPort                              int
 }
 
 type ProducerRegistrationInfo struct {
@@ -50,6 +52,8 @@ func New() *Config {
                InfoJobCallbackHost:                 getEnv("INFO_JOB_CALLBACK_HOST", ""),
                InfoJobCallbackPort:                 getEnvAsInt("INFO_JOB_CALLBACK_PORT", 8086),
                InfoCoordinatorAddress:              getEnv("INFO_COORD_ADDR", "http://enrichmentservice:8083"),
+               MRHost:                              getEnv("MR_HOST", "http://message-router.onap"),
+               MRPort:                              getEnvAsInt("MR_PORT", 3904),
        }
 }
 
index 4a65dc0..1043027 100644 (file)
@@ -37,6 +37,8 @@ func TestNew_envVarsSetConfigContainSetValues(t *testing.T) {
        os.Setenv("INFO_JOB_CALLBACK_HOST", "jobCallbackHost")
        os.Setenv("INFO_JOB_CALLBACK_PORT", "8096")
        os.Setenv("INFO_COORD_ADDR", "infoCoordAddr")
+       os.Setenv("MR_HOST", "mrHost")
+       os.Setenv("MR_PORT", "3908")
        t.Cleanup(func() {
                os.Clearenv()
        })
@@ -47,6 +49,8 @@ func TestNew_envVarsSetConfigContainSetValues(t *testing.T) {
                InfoJobCallbackHost:                 "jobCallbackHost",
                InfoJobCallbackPort:                 8096,
                InfoCoordinatorAddress:              "infoCoordAddr",
+               MRHost:                              "mrHost",
+               MRPort:                              3908,
        }
        if got := New(); !reflect.DeepEqual(got, &wantConfig) {
                t.Errorf("New() = %v, want %v", got, &wantConfig)
@@ -70,6 +74,8 @@ func TestNew_faultyIntValueSetConfigContainDefaultValueAndWarnInLog(t *testing.T
                InfoJobCallbackHost:                 "",
                InfoJobCallbackPort:                 8086,
                InfoCoordinatorAddress:              "http://enrichmentservice:8083",
+               MRHost:                              "http://message-router.onap",
+               MRPort:                              3904,
        }
        if got := New(); !reflect.DeepEqual(got, &wantConfig) {
                t.Errorf("New() = %v, want %v", got, &wantConfig)
@@ -86,6 +92,8 @@ func TestNew_envVarsNotSetConfigContainDefaultValues(t *testing.T) {
                InfoJobCallbackHost:                 "",
                InfoJobCallbackPort:                 8086,
                InfoCoordinatorAddress:              "http://enrichmentservice:8083",
+               MRHost:                              "http://message-router.onap",
+               MRPort:                              3904,
        }
        if got := New(); !reflect.DeepEqual(got, &wantConfig) {
                t.Errorf("New() = %v, want %v", got, &wantConfig)
index 7347178..eec59c3 100644 (file)
 package jobs
 
 import (
+       "encoding/json"
        "fmt"
        "os"
        "path/filepath"
        "strings"
+       "sync"
+
+       log "github.com/sirupsen/logrus"
+       "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
 )
 
 type Type struct {
-       TypeId string
-       Schema string
+       TypeId     string `json:"id"`
+       DMaaPTopic string `json:"dmaapTopic"`
+       Schema     string `json:"schema"`
+       Jobs       map[string]JobInfo
 }
 
 type JobInfo struct {
@@ -46,9 +53,10 @@ type JobHandler interface {
 }
 
 var (
+       mu      sync.Mutex
        typeDir = "configs"
        Handler JobHandler
-       allJobs = make(map[string]map[string]JobInfo)
+       allJobs = make(map[string]Type)
 )
 
 func init() {
@@ -62,8 +70,10 @@ func newJobHandlerImpl() *jobHandlerImpl {
 }
 
 func (jh *jobHandlerImpl) AddJob(ji JobInfo) error {
+       mu.Lock()
+       defer mu.Unlock()
        if err := validateJobInfo(ji); err == nil {
-               jobs := allJobs[ji.InfoTypeIdentity]
+               jobs := allJobs[ji.InfoTypeIdentity].Jobs
                jobs[ji.InfoJobIdentity] = ji
                return nil
        } else {
@@ -86,6 +96,8 @@ func validateJobInfo(ji JobInfo) error {
 }
 
 func GetTypes() ([]*Type, error) {
+       mu.Lock()
+       defer mu.Unlock()
        types := make([]*Type, 0, 1)
        err := filepath.Walk(typeDir,
                func(path string, info os.FileInfo, err error) error {
@@ -106,6 +118,8 @@ func GetTypes() ([]*Type, error) {
 }
 
 func GetSupportedTypes() []string {
+       mu.Lock()
+       defer mu.Unlock()
        supportedTypes := []string{}
        for k := range allJobs {
                supportedTypes = append(supportedTypes, k)
@@ -118,23 +132,63 @@ func AddJob(job JobInfo) error {
 }
 
 func getType(path string) (*Type, error) {
-       fileName := filepath.Base(path)
-       typeName := strings.TrimSuffix(fileName, filepath.Ext(fileName))
-
-       if typeSchema, err := os.ReadFile(path); err == nil {
-               typeInfo := Type{
-                       TypeId: typeName,
-                       Schema: string(typeSchema),
-               }
-               if _, ok := allJobs[typeName]; !ok {
-                       allJobs[typeName] = make(map[string]JobInfo)
+       if typeDefinition, err := os.ReadFile(path); err == nil {
+               var dat map[string]interface{}
+               if marshalError := json.Unmarshal(typeDefinition, &dat); marshalError == nil {
+                       schema, _ := json.Marshal(dat["schema"])
+                       typeInfo := Type{
+                               TypeId:     dat["id"].(string),
+                               DMaaPTopic: dat["dmaapTopic"].(string),
+                               Schema:     string(schema),
+                               Jobs:       make(map[string]JobInfo),
+                       }
+                       if _, ok := allJobs[typeInfo.TypeId]; !ok {
+                               allJobs[typeInfo.TypeId] = typeInfo
+                       }
+                       return &typeInfo, nil
+               } else {
+                       return nil, marshalError
                }
-               return &typeInfo, nil
        } else {
                return nil, err
        }
 }
 
+func RunJobs(mRAddress string) {
+       for {
+               pollAndDistributeMessages(mRAddress)
+       }
+}
+
+func pollAndDistributeMessages(mRAddress string) {
+       for typeId, typeInfo := range allJobs {
+               log.Debugf("Processing jobs for type: %v", typeId)
+               messagesBody, error := restclient.Get(fmt.Sprintf("%v/events/%v/users/dmaapmediatorproducer", mRAddress, typeInfo.DMaaPTopic))
+               if error != nil {
+                       log.Warnf("Error getting data from MR. Cause: %v", error)
+                       continue
+               }
+               distributeMessages(messagesBody, typeInfo)
+       }
+}
+
+func distributeMessages(messages []byte, typeInfo Type) {
+       if len(messages) > 2 {
+               mu.Lock()
+               for _, jobInfo := range typeInfo.Jobs {
+                       go sendMessagesToConsumer(messages, jobInfo)
+               }
+               mu.Unlock()
+       }
+}
+
+func sendMessagesToConsumer(messages []byte, jobInfo JobInfo) {
+       log.Debugf("Processing job: %v", jobInfo.InfoJobIdentity)
+       if postErr := restclient.Post(jobInfo.TargetUri, messages); postErr != nil {
+               log.Warnf("Error posting data for job: %v. Cause: %v", jobInfo, postErr)
+       }
+}
+
 func clearAll() {
-       allJobs = make(map[string]map[string]JobInfo)
+       allJobs = make(map[string]Type)
 }
index 0941033..b53d85e 100644 (file)
 package jobs
 
 import (
+       "bytes"
+       "io/ioutil"
+       "net/http"
        "os"
        "path/filepath"
        "testing"
+       "time"
 
+       "github.com/stretchr/testify/mock"
        "github.com/stretchr/testify/require"
+       "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
+       "oransc.org/nonrtric/dmaapmediatorproducer/mocks"
 )
 
-const type1Schema = `{"title": "Type 1"}`
+const typeDefinition = `{"id": "type1", "dmaapTopic": "unauthenticated.SEC_FAULT_OUTPUT", "schema": {"title": "Type 1"}}`
 
 func TestGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) {
        assertions := require.New(t)
@@ -42,13 +49,15 @@ func TestGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *tes
        })
        typeDir = typesDir
        fname := filepath.Join(typesDir, "type1.json")
-       if err = os.WriteFile(fname, []byte(type1Schema), 0666); err != nil {
+       if err = os.WriteFile(fname, []byte(typeDefinition), 0666); err != nil {
                t.Errorf("Unable to create temporary files for types due to: %v", err)
        }
        types, err := GetTypes()
        wantedType := Type{
-               TypeId: "type1",
-               Schema: type1Schema,
+               TypeId:     "type1",
+               DMaaPTopic: "unauthenticated.SEC_FAULT_OUTPUT",
+               Schema:     `{"title":"Type 1"}`,
+               Jobs:       make(map[string]JobInfo),
        }
        wantedTypes := []*Type{&wantedType}
        assertions.EqualValues(wantedTypes, types)
@@ -60,11 +69,7 @@ func TestGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *tes
 
 func TestAddJobWhenTypeIsSupported_shouldAddJobToAllJobsMap(t *testing.T) {
        assertions := require.New(t)
-       allJobs["type1"] = make(map[string]JobInfo)
-       t.Cleanup(func() {
-               clearAll()
-       })
-       jobInfo := JobInfo{
+       wantedJob := JobInfo{
                Owner:            "owner",
                LastUpdated:      "now",
                InfoJobIdentity:  "job1",
@@ -72,11 +77,18 @@ func TestAddJobWhenTypeIsSupported_shouldAddJobToAllJobsMap(t *testing.T) {
                InfoJobData:      "{}",
                InfoTypeIdentity: "type1",
        }
+       allJobs["type1"] = Type{
+               TypeId: "type1",
+               Jobs:   map[string]JobInfo{"job1": wantedJob},
+       }
+       t.Cleanup(func() {
+               clearAll()
+       })
 
-       err := AddJob(jobInfo)
+       err := AddJob(wantedJob)
        assertions.Nil(err)
-       assertions.Equal(1, len(allJobs["type1"]))
-       assertions.Equal(jobInfo, allJobs["type1"]["job1"])
+       assertions.Equal(1, len(allJobs["type1"].Jobs))
+       assertions.Equal(wantedJob, allJobs["type1"].Jobs["job1"])
 }
 
 func TestAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) {
@@ -92,7 +104,9 @@ func TestAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) {
 
 func TestAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) {
        assertions := require.New(t)
-       allJobs["type1"] = make(map[string]JobInfo)
+       allJobs["type1"] = Type{
+               TypeId: "type1",
+       }
        t.Cleanup(func() {
                clearAll()
        })
@@ -107,7 +121,9 @@ func TestAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) {
 
 func TestAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
        assertions := require.New(t)
-       allJobs["type1"] = make(map[string]JobInfo)
+       allJobs["type1"] = Type{
+               TypeId: "type1",
+       }
        jobInfo := JobInfo{
                InfoTypeIdentity: "type1",
                InfoJobIdentity:  "job1",
@@ -118,3 +134,53 @@ func TestAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
        assertions.Equal("missing required target URI: {  job1   type1}", err.Error())
        clearAll()
 }
+
+func TestPollAndDistributeMessages(t *testing.T) {
+       assertions := require.New(t)
+       jobInfo := JobInfo{
+               InfoTypeIdentity: "type1",
+               InfoJobIdentity:  "job1",
+               TargetUri:        "http://consumerHost/target",
+       }
+       allJobs["type1"] = Type{
+               TypeId:     "type1",
+               DMaaPTopic: "topic",
+               Jobs:       map[string]JobInfo{"job1": jobInfo},
+       }
+       t.Cleanup(func() {
+               clearAll()
+       })
+
+       body := ioutil.NopCloser(bytes.NewReader([]byte(`[{"message": {"data": "data"}}]`)))
+       clientMock := mocks.HTTPClient{}
+       clientMock.On("Get", mock.Anything).Return(&http.Response{
+               StatusCode: http.StatusOK,
+               Body:       body,
+       }, nil)
+
+       clientMock.On("Do", mock.Anything).Return(&http.Response{
+               StatusCode: http.StatusOK,
+       }, nil)
+
+       restclient.Client = &clientMock
+
+       pollAndDistributeMessages("http://mrAddr")
+
+       time.Sleep(100 * time.Millisecond)
+
+       var actualRequest *http.Request
+       clientMock.AssertCalled(t, "Get", "http://mrAddr/events/topic/users/dmaapmediatorproducer")
+       clientMock.AssertNumberOfCalls(t, "Get", 1)
+
+       clientMock.AssertCalled(t, "Do", mock.MatchedBy(func(req *http.Request) bool {
+               actualRequest = req
+               return true
+       }))
+       assertions.Equal(http.MethodPost, actualRequest.Method)
+       assertions.Equal("consumerHost", actualRequest.URL.Host)
+       assertions.Equal("/target", actualRequest.URL.Path)
+       assertions.Equal("application/json; charset=utf-8", actualRequest.Header.Get("Content-Type"))
+       actualBody, _ := ioutil.ReadAll(actualRequest.Body)
+       assertions.Equal([]byte(`[{"message": {"data": "data"}}]`), actualBody)
+       clientMock.AssertNumberOfCalls(t, "Do", 1)
+}
index 78a02b6..a783f7e 100644 (file)
@@ -73,7 +73,15 @@ func Get(url string) ([]byte, error) {
 }
 
 func Put(url string, body []byte) error {
-       if req, reqErr := http.NewRequest(http.MethodPut, url, bytes.NewBuffer(body)); reqErr == nil {
+       return do(http.MethodPut, url, body)
+}
+
+func Post(url string, body []byte) error {
+       return do(http.MethodPost, url, body)
+}
+
+func do(method string, url string, body []byte) error {
+       if req, reqErr := http.NewRequest(method, url, bytes.NewBuffer(body)); reqErr == nil {
                req.Header.Set("Content-Type", "application/json; charset=utf-8")
                if response, respErr := Client.Do(req); respErr == nil {
                        if isResponseSuccess(response.StatusCode) {
index d221c93..444deba 100644 (file)
@@ -32,7 +32,7 @@ import (
 
        "github.com/stretchr/testify/require"
        "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
-       "oransc.org/nonrtric/dmaapmediatorproducer/mocks"
+       "oransc.org/nonrtric/dmaapmediatorproducer/mocks/jobhandler"
 )
 
 func TestStatusHandler(t *testing.T) {
@@ -88,7 +88,7 @@ func TestStatusHandler(t *testing.T) {
 
 func TestCreateInfoJobHandler(t *testing.T) {
        assertions := require.New(t)
-       jobHandlerMock := mocks.JobHandler{}
+       jobHandlerMock := jobhandler.JobHandler{}
 
        goodJobInfo := jobs.JobInfo{
                Owner:            "owner",
index 3fe92dc..47e12e9 100644 (file)
@@ -76,7 +76,7 @@ func main() {
        wg := new(sync.WaitGroup)
 
        // add two goroutines to `wg` WaitGroup, one for each avilable server
-       wg.Add(2)
+       wg.Add(3)
 
        log.Debugf("Starting status callback server at port %v", configuration.InfoProducerSupervisionCallbackPort)
        go func() {
@@ -91,6 +91,11 @@ func main() {
                wg.Done()
        }()
 
+       go func() {
+               jobs.RunJobs(fmt.Sprintf("%v:%v", configuration.MRHost, configuration.MRPort))
+               wg.Done()
+       }()
+
        // wait until WaitGroup is done
        wg.Wait()
        log.Debug("Stopping DMaaP Mediator Producer")
@@ -1,10 +1,10 @@
 // Code generated by mockery v2.9.3. DO NOT EDIT.
 
-package mocks
+package jobhandler
 
 import (
        mock "github.com/stretchr/testify/mock"
-       jobs "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
+       "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
 )
 
 // JobHandler is an autogenerated mock type for the JobHandler type
diff --git a/dmaap-mediator-producer/simulator/consumersimulator.go b/dmaap-mediator-producer/simulator/consumersimulator.go
new file mode 100644 (file)
index 0000000..25421ae
--- /dev/null
@@ -0,0 +1,40 @@
+// -
+//   ========================LICENSE_START=================================
+//   O-RAN-SC
+//   %%
+//   Copyright (C) 2021: Nordix Foundation
+//   %%
+//   Licensed under the Apache License, Version 2.0 (the "License");
+//   you may not use this file except in compliance with the License.
+//   You may obtain a copy of the License at
+//
+//        http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+//   distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//   See the License for the specific language governing permissions and
+//   limitations under the License.
+//   ========================LICENSE_END===================================
+//
+
+package main
+
+import (
+       "fmt"
+       "io"
+       http "net/http"
+)
+
+func handleData(w http.ResponseWriter, req *http.Request) {
+       defer req.Body.Close()
+       if reqData, err := io.ReadAll(req.Body); err == nil {
+               fmt.Printf("Consumer received body: %v\n", string(reqData))
+       }
+}
+
+func main() {
+       http.HandleFunc("/jobs", handleData)
+
+       http.ListenAndServe(":40935", nil)
+}