Add kafka jobs to DMaaP Mediator Producer 70/7470/6
authorelinuxhenrik <henrik.b.andersson@est.tech>
Thu, 23 Dec 2021 15:36:31 +0000 (16:36 +0100)
committerelinuxhenrik <henrik.b.andersson@est.tech>
Tue, 4 Jan 2022 09:33:26 +0000 (10:33 +0100)
Issue-ID: NONRTRIC-702
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
Change-Id: Ia7123240ad91449209afc4769b850cb0c8d9598f

17 files changed:
dmaap-mediator-producer/configs/typeSchemaDmaap.json [new file with mode: 0644]
dmaap-mediator-producer/configs/typeSchemaKafka.json [new file with mode: 0644]
dmaap-mediator-producer/configs/type_config.json
dmaap-mediator-producer/go.mod
dmaap-mediator-producer/go.sum
dmaap-mediator-producer/internal/config/config.go
dmaap-mediator-producer/internal/config/config_test.go
dmaap-mediator-producer/internal/config/registrator.go
dmaap-mediator-producer/internal/config/registrator_test.go
dmaap-mediator-producer/internal/jobs/jobs.go
dmaap-mediator-producer/internal/jobs/jobs_test.go
dmaap-mediator-producer/internal/kafkaclient/kafkaclient.go [new file with mode: 0644]
dmaap-mediator-producer/internal/server/server_test.go
dmaap-mediator-producer/main.go
dmaap-mediator-producer/mocks/KafkaConsumer.go [new file with mode: 0644]
dmaap-mediator-producer/mocks/KafkaFactory.go [new file with mode: 0644]
dmaap-mediator-producer/mocks/jobshandler/JobsHandler.go [moved from dmaap-mediator-producer/mocks/jobhandler/JobHandler.go with 66% similarity]

diff --git a/dmaap-mediator-producer/configs/typeSchemaDmaap.json b/dmaap-mediator-producer/configs/typeSchemaDmaap.json
new file mode 100644 (file)
index 0000000..a50b236
--- /dev/null
@@ -0,0 +1,10 @@
+{
+  "$schema": "http://json-schema.org/draft-04/schema#",
+  "type": "object",
+  "properties": {
+    "filter": {
+       "type": "string"
+     }
+  },
+  "additionalProperties": false
+}
diff --git a/dmaap-mediator-producer/configs/typeSchemaKafka.json b/dmaap-mediator-producer/configs/typeSchemaKafka.json
new file mode 100644 (file)
index 0000000..dcd40f9
--- /dev/null
@@ -0,0 +1,26 @@
+{
+  "$schema": "http://json-schema.org/draft-04/schema#",
+  "type": "object",
+  "properties": {
+    "filter": {
+      "type": "string"
+    },
+    "bufferTimeout": {
+      "type": "object",
+      "properties": {
+        "maxSize": {
+          "type": "integer"
+        },
+        "maxTimeMiliseconds": {
+          "type": "integer"
+        }
+      },
+      "additionalProperties": false,
+      "required": [
+        "maxSize",
+        "maxTimeMiliseconds"
+      ]
+    }
+  },
+  "additionalProperties": false
+}
\ No newline at end of file
index f75d0e4..1149669 100644 (file)
@@ -4,6 +4,10 @@
       {
         "id": "STD_Fault_Messages",
         "dmaapTopicUrl": "/events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/STD_Fault_Messages"
+      },
+      {
+        "id": "Kafka_TestTopic",
+        "kafkaInputTopic": "TestTopic"
       }
   ]
 }
\ No newline at end of file
index eaaecf7..e701c01 100644 (file)
@@ -10,6 +10,7 @@ require (
 )
 
 require (
+       github.com/confluentinc/confluent-kafka-go v1.8.2 // indirect
        github.com/davecgh/go-spew v1.1.1 // indirect
        github.com/hashicorp/go-cleanhttp v0.5.1 // indirect
        github.com/pmezard/go-difflib v1.0.0 // indirect
index 4b3557b..cf2c90f 100644 (file)
@@ -1,3 +1,5 @@
+github.com/confluentinc/confluent-kafka-go v1.8.2 h1:PBdbvYpyOdFLehj8j+9ba7FL4c4Moxn79gy9cYKxG5E=
+github.com/confluentinc/confluent-kafka-go v1.8.2/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
index e03c40a..7582e9c 100644 (file)
@@ -24,6 +24,7 @@ import (
        "encoding/json"
        "fmt"
        "os"
+       "path/filepath"
        "strconv"
 
        log "github.com/sirupsen/logrus"
@@ -35,6 +36,7 @@ type Config struct {
        InfoProducerPort       int
        InfoCoordinatorAddress string
        DMaaPMRAddress         string
+       KafkaBootstrapServers  string
        ProducerCertPath       string
        ProducerKeyPath        string
 }
@@ -45,6 +47,7 @@ func New() *Config {
                InfoProducerPort:       getEnvAsInt("INFO_PRODUCER_PORT", 8085),
                InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "https://informationservice:8434"),
                DMaaPMRAddress:         getEnv("DMAAP_MR_ADDR", "https://message-router.onap:3905"),
+               KafkaBootstrapServers:  getEnv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"),
                ProducerCertPath:       getEnv("PRODUCER_CERT_PATH", "security/producer.crt"),
                ProducerKeyPath:        getEnv("PRODUCER_KEY_PATH", "security/producer.key"),
                LogLevel:               getLogLevel(),
@@ -83,8 +86,8 @@ func getLogLevel() log.Level {
        }
 }
 
-func GetJobTypesFromConfiguration(configFile string) ([]TypeDefinition, error) {
-       typeDefsByte, err := os.ReadFile(configFile)
+func GetJobTypesFromConfiguration(configFolder string) ([]TypeDefinition, error) {
+       typeDefsByte, err := os.ReadFile(filepath.Join(configFolder, "type_config.json"))
        if err != nil {
                return nil, err
        }
@@ -96,5 +99,35 @@ func GetJobTypesFromConfiguration(configFile string) ([]TypeDefinition, error) {
                return nil, err
        }
 
+       kafkaTypeSchema, err := getTypeSchema(filepath.Join(configFolder, "typeSchemaKafka.json"))
+       if err != nil {
+               return nil, err
+       }
+
+       dMaaPTypeSchema, err := getTypeSchema(filepath.Join(configFolder, "typeSchemaDmaap.json"))
+       if err != nil {
+               return nil, err
+       }
+
+       for i, typeDef := range typeDefs.Types {
+               if typeDef.IsKafkaType() {
+                       typeDefs.Types[i].TypeSchema = kafkaTypeSchema
+               } else {
+                       typeDefs.Types[i].TypeSchema = dMaaPTypeSchema
+               }
+       }
        return typeDefs.Types, nil
 }
+
+func getTypeSchema(schemaFile string) (interface{}, error) {
+       typeDefsByte, err := os.ReadFile(schemaFile)
+       if err != nil {
+               return nil, err
+       }
+       var schema interface{}
+       err = json.Unmarshal(typeDefsByte, &schema)
+       if err != nil {
+               return nil, err
+       }
+       return schema, nil
+}
index faf5900..e66a818 100644 (file)
@@ -22,6 +22,7 @@ package config
 
 import (
        "bytes"
+       "encoding/json"
        "os"
        "path/filepath"
        "testing"
@@ -37,6 +38,7 @@ func TestNew_envVarsSetConfigContainSetValues(t *testing.T) {
        os.Setenv("INFO_PRODUCER_PORT", "8095")
        os.Setenv("INFO_COORD_ADDR", "infoCoordAddr")
        os.Setenv("DMAAP_MR_ADDR", "mrHost:3908")
+       os.Setenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9093")
        os.Setenv("PRODUCER_CERT_PATH", "cert")
        os.Setenv("PRODUCER_KEY_PATH", "key")
        t.Cleanup(func() {
@@ -48,6 +50,7 @@ func TestNew_envVarsSetConfigContainSetValues(t *testing.T) {
                InfoProducerPort:       8095,
                InfoCoordinatorAddress: "infoCoordAddr",
                DMaaPMRAddress:         "mrHost:3908",
+               KafkaBootstrapServers:  "localhost:9093",
                ProducerCertPath:       "cert",
                ProducerKeyPath:        "key",
        }
@@ -72,6 +75,7 @@ func TestNew_faultyIntValueSetConfigContainDefaultValueAndWarnInLog(t *testing.T
                InfoProducerPort:       8085,
                InfoCoordinatorAddress: "https://informationservice:8434",
                DMaaPMRAddress:         "https://message-router.onap:3905",
+               KafkaBootstrapServers:  "localhost:9092",
                ProducerCertPath:       "security/producer.crt",
                ProducerKeyPath:        "security/producer.key",
        }
@@ -98,6 +102,7 @@ func TestNew_envFaultyLogLevelConfigContainDefaultValues(t *testing.T) {
                InfoProducerPort:       8085,
                InfoCoordinatorAddress: "https://informationservice:8434",
                DMaaPMRAddress:         "https://message-router.onap:3905",
+               KafkaBootstrapServers:  "localhost:9092",
                ProducerCertPath:       "security/producer.crt",
                ProducerKeyPath:        "security/producer.key",
        }
@@ -109,7 +114,17 @@ func TestNew_envFaultyLogLevelConfigContainDefaultValues(t *testing.T) {
        assertions.Contains(logString, "Invalid log level: wrong. Log level will be Info!")
 }
 
-const typeDefinition = `{"types": [{"id": "type1", "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1"}]}`
+const typeDefinition = `{"types": [{"id": "type1", "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1"}, {"id": "type2", "kafkaInputTopic": "TestTopic"}]}`
+const typeSchemaFileContent = `{
+       "$schema": "http://json-schema.org/draft-04/schema#",
+       "type": "object",
+       "properties": {
+         "filter": {
+                "type": "string"
+          }
+       },
+       "additionalProperties": false
+  }`
 
 func TestGetTypesFromConfiguration_fileOkShouldReturnSliceOfTypeDefinitions(t *testing.T) {
        assertions := require.New(t)
@@ -124,14 +139,30 @@ func TestGetTypesFromConfiguration_fileOkShouldReturnSliceOfTypeDefinitions(t *t
        if err = os.WriteFile(fname, []byte(typeDefinition), 0666); err != nil {
                t.Errorf("Unable to create temporary config file for types due to: %v", err)
        }
+       fname = filepath.Join(typesDir, "typeSchemaDmaap.json")
+       if err = os.WriteFile(fname, []byte(typeSchemaFileContent), 0666); err != nil {
+               t.Errorf("Unable to create temporary schema file for DMaaP type due to: %v", err)
+       }
+       fname = filepath.Join(typesDir, "typeSchemaKafka.json")
+       if err = os.WriteFile(fname, []byte(typeSchemaFileContent), 0666); err != nil {
+               t.Errorf("Unable to create temporary schema file for Kafka type due to: %v", err)
+       }
+       var typeSchemaObj interface{}
+       json.Unmarshal([]byte(typeSchemaFileContent), &typeSchemaObj)
 
-       types, err := GetJobTypesFromConfiguration(fname)
+       types, err := GetJobTypesFromConfiguration(typesDir)
 
-       wantedType := TypeDefinition{
-               Id:            "type1",
-               DmaapTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1",
+       wantedDMaaPType := TypeDefinition{
+               Identity:      "type1",
+               DMaaPTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1",
+               TypeSchema:    typeSchemaObj,
+       }
+       wantedKafkaType := TypeDefinition{
+               Identity:        "type2",
+               KafkaInputTopic: "TestTopic",
+               TypeSchema:      typeSchemaObj,
        }
-       wantedTypes := []TypeDefinition{wantedType}
+       wantedTypes := []TypeDefinition{wantedDMaaPType, wantedKafkaType}
        assertions.EqualValues(wantedTypes, types)
        assertions.Nil(err)
 }
index 83ed43f..bac14e6 100644 (file)
@@ -32,11 +32,20 @@ import (
 
 const registerTypePath = "/data-producer/v1/info-types/"
 const registerProducerPath = "/data-producer/v1/info-producers/"
-const typeSchema = `{"type": "object","properties": {},"additionalProperties": false}`
 
 type TypeDefinition struct {
-       Id            string `json:"id"`
-       DmaapTopicURL string `json:"dmaapTopicUrl"`
+       Identity        string `json:"id"`
+       DMaaPTopicURL   string `json:"dmaapTopicUrl"`
+       KafkaInputTopic string `json:"kafkaInputTopic"`
+       TypeSchema      interface{}
+}
+
+func (td TypeDefinition) IsKafkaType() bool {
+       return td.KafkaInputTopic != ""
+}
+
+func (td TypeDefinition) IsDMaaPType() bool {
+       return td.DMaaPTopicURL != ""
 }
 
 type ProducerRegistrationInfo struct {
@@ -64,8 +73,8 @@ func NewRegistratorImpl(infoCoordAddr string, client restclient.HTTPClient) *Reg
 
 func (r RegistratorImpl) RegisterTypes(jobTypes []TypeDefinition) error {
        for _, jobType := range jobTypes {
-               body := fmt.Sprintf(`{"info_job_data_schema": %v}`, typeSchema)
-               if error := restclient.Put(r.infoCoordinatorAddress+registerTypePath+url.PathEscape(jobType.Id), []byte(body), r.httpClient); error != nil {
+               body := fmt.Sprintf(`{"info_job_data_schema": %v}`, jobType.TypeSchema)
+               if error := restclient.Put(r.infoCoordinatorAddress+registerTypePath+url.PathEscape(jobType.Identity), []byte(body), r.httpClient); error != nil {
                        return error
                }
                log.Debugf("Registered type: %v", jobType)
index 324aed0..d1b61d8 100644 (file)
@@ -40,7 +40,8 @@ func TestRegisterTypes(t *testing.T) {
        }, nil)
 
        type1 := TypeDefinition{
-               Id: "Type1",
+               Identity:   "Type1",
+               TypeSchema: `{"type": "object","properties": {},"additionalProperties": false}`,
        }
        types := []TypeDefinition{type1}
 
index 867894f..3ef5ca3 100644 (file)
@@ -25,24 +25,25 @@ import (
        "sync"
        "time"
 
+       "github.com/confluentinc/confluent-kafka-go/kafka"
        log "github.com/sirupsen/logrus"
        "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
+       "oransc.org/nonrtric/dmaapmediatorproducer/internal/kafkaclient"
        "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
 )
 
 type TypeData struct {
-       TypeId        string `json:"id"`
-       DMaaPTopicURL string `json:"dmaapTopicUrl"`
-       jobsHandler   *jobsHandler
+       Identity    string `json:"id"`
+       jobsHandler *jobsHandler
 }
 
 type JobInfo struct {
-       Owner            string      `json:"owner"`
-       LastUpdated      string      `json:"last_updated"`
-       InfoJobIdentity  string      `json:"info_job_identity"`
-       TargetUri        string      `json:"target_uri"`
-       InfoJobData      interface{} `json:"info_job_data"`
-       InfoTypeIdentity string      `json:"info_type_identity"`
+       Owner            string     `json:"owner"`
+       LastUpdated      string     `json:"last_updated"`
+       InfoJobIdentity  string     `json:"info_job_identity"`
+       TargetUri        string     `json:"target_uri"`
+       InfoJobData      Parameters `json:"info_job_data"`
+       InfoTypeIdentity string     `json:"info_type_identity"`
 }
 
 type JobTypesManager interface {
@@ -59,14 +60,16 @@ type JobsManagerImpl struct {
        allTypes         map[string]TypeData
        pollClient       restclient.HTTPClient
        mrAddress        string
+       kafkaFactory     kafkaclient.KafkaFactory
        distributeClient restclient.HTTPClient
 }
 
-func NewJobsManagerImpl(pollClient restclient.HTTPClient, mrAddr string, distributeClient restclient.HTTPClient) *JobsManagerImpl {
+func NewJobsManagerImpl(pollClient restclient.HTTPClient, mrAddr string, kafkaFactory kafkaclient.KafkaFactory, distributeClient restclient.HTTPClient) *JobsManagerImpl {
        return &JobsManagerImpl{
                allTypes:         make(map[string]TypeData),
                pollClient:       pollClient,
                mrAddress:        mrAddr,
+               kafkaFactory:     kafkaFactory,
                distributeClient: distributeClient,
        }
 }
@@ -84,7 +87,7 @@ func (jm *JobsManagerImpl) AddJobFromRESTCall(ji JobInfo) error {
 
 func (jm *JobsManagerImpl) DeleteJobFromRESTCall(jobId string) {
        for _, typeData := range jm.allTypes {
-               log.Debugf("Deleting job %v from type %v", jobId, typeData.TypeId)
+               log.Debugf("Deleting job %v from type %v", jobId, typeData.Identity)
                typeData.jobsHandler.deleteJobCh <- jobId
        }
        log.Debug("Deleted job: ", jobId)
@@ -106,10 +109,12 @@ func (jm *JobsManagerImpl) validateJobInfo(ji JobInfo) error {
 
 func (jm *JobsManagerImpl) LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition {
        for _, typeDef := range types {
-               jm.allTypes[typeDef.Id] = TypeData{
-                       TypeId:        typeDef.Id,
-                       DMaaPTopicURL: typeDef.DmaapTopicURL,
-                       jobsHandler:   newJobsHandler(typeDef.Id, typeDef.DmaapTopicURL, jm.pollClient, jm.distributeClient),
+               if typeDef.DMaaPTopicURL == "" && typeDef.KafkaInputTopic == "" {
+                       log.Fatal("DMaaPTopicURL or KafkaInputTopic must be defined for type: ", typeDef.Identity)
+               }
+               jm.allTypes[typeDef.Identity] = TypeData{
+                       Identity:    typeDef.Identity,
+                       jobsHandler: newJobsHandler(typeDef, jm.mrAddress, jm.kafkaFactory, jm.pollClient, jm.distributeClient),
                }
        }
        return types
@@ -126,7 +131,7 @@ func (jm *JobsManagerImpl) GetSupportedTypes() []string {
 func (jm *JobsManagerImpl) StartJobsForAllTypes() {
        for _, jobType := range jm.allTypes {
 
-               go jobType.jobsHandler.startPollingAndDistribution(jm.mrAddress)
+               go jobType.jobsHandler.startPollingAndDistribution()
 
        }
 }
@@ -134,30 +139,29 @@ func (jm *JobsManagerImpl) StartJobsForAllTypes() {
 type jobsHandler struct {
        mu               sync.Mutex
        typeId           string
-       topicUrl         string
+       pollingAgent     pollingAgent
        jobs             map[string]job
        addJobCh         chan JobInfo
        deleteJobCh      chan string
-       pollClient       restclient.HTTPClient
        distributeClient restclient.HTTPClient
 }
 
-func newJobsHandler(typeId string, topicURL string, pollClient restclient.HTTPClient, distributeClient restclient.HTTPClient) *jobsHandler {
+func newJobsHandler(typeDef config.TypeDefinition, mRAddress string, kafkaFactory kafkaclient.KafkaFactory, pollClient restclient.HTTPClient, distributeClient restclient.HTTPClient) *jobsHandler {
+       pollingAgent := createPollingAgent(typeDef, mRAddress, pollClient, kafkaFactory, typeDef.KafkaInputTopic)
        return &jobsHandler{
-               typeId:           typeId,
-               topicUrl:         topicURL,
+               typeId:           typeDef.Identity,
+               pollingAgent:     pollingAgent,
                jobs:             make(map[string]job),
                addJobCh:         make(chan JobInfo),
                deleteJobCh:      make(chan string),
-               pollClient:       pollClient,
                distributeClient: distributeClient,
        }
 }
 
-func (jh *jobsHandler) startPollingAndDistribution(mRAddress string) {
+func (jh *jobsHandler) startPollingAndDistribution() {
        go func() {
                for {
-                       jh.pollAndDistributeMessages(mRAddress)
+                       jh.pollAndDistributeMessages()
                }
        }()
 
@@ -168,19 +172,20 @@ func (jh *jobsHandler) startPollingAndDistribution(mRAddress string) {
        }()
 }
 
-func (jh *jobsHandler) pollAndDistributeMessages(mRAddress string) {
+func (jh *jobsHandler) pollAndDistributeMessages() {
        log.Debugf("Processing jobs for type: %v", jh.typeId)
-       messagesBody, error := restclient.Get(mRAddress+jh.topicUrl, jh.pollClient)
+       messagesBody, error := jh.pollingAgent.pollMessages()
        if error != nil {
-               log.Warn("Error getting data from MR. Cause: ", error)
-               time.Sleep(time.Minute) // Must wait before trying to call MR again
+               log.Warn("Error getting data from source. Cause: ", error)
+               time.Sleep(time.Minute) // Must wait before trying to call data source again
+               return
        }
-       log.Debug("Received messages: ", string(messagesBody))
        jh.distributeMessages(messagesBody)
 }
 
 func (jh *jobsHandler) distributeMessages(messages []byte) {
-       if len(messages) > 2 {
+       if string(messages) != "[]" && len(messages) > 0 { // MR returns an ampty array if there are no messages.
+               log.Debug("Distributing messages: ", string(messages))
                jh.mu.Lock()
                defer jh.mu.Unlock()
                for _, job := range jh.jobs {
@@ -234,6 +239,61 @@ func (jh *jobsHandler) deleteJob(deletedJob string) {
        jh.mu.Unlock()
 }
 
+type pollingAgent interface {
+       pollMessages() ([]byte, error)
+}
+
+func createPollingAgent(typeDef config.TypeDefinition, mRAddress string, pollClient restclient.HTTPClient, kafkaFactory kafkaclient.KafkaFactory, topicID string) pollingAgent {
+       if typeDef.DMaaPTopicURL != "" {
+               return dMaaPPollingAgent{
+                       messageRouterURL: mRAddress + typeDef.DMaaPTopicURL,
+                       pollClient:       pollClient,
+               }
+       } else {
+               return newKafkaPollingAgent(kafkaFactory, typeDef.KafkaInputTopic)
+       }
+}
+
+type dMaaPPollingAgent struct {
+       messageRouterURL string
+       pollClient       restclient.HTTPClient
+}
+
+func (pa dMaaPPollingAgent) pollMessages() ([]byte, error) {
+       return restclient.Get(pa.messageRouterURL, pa.pollClient)
+}
+
+type kafkaPollingAgent struct {
+       kafkaClient kafkaclient.KafkaClient
+}
+
+func newKafkaPollingAgent(kafkaFactory kafkaclient.KafkaFactory, topicID string) kafkaPollingAgent {
+       c, err := kafkaclient.NewKafkaClient(kafkaFactory, topicID)
+       if err != nil {
+               log.Fatalf("Cannot create Kafka client for topic: %v, error details: %v\n", topicID, err)
+       }
+       return kafkaPollingAgent{
+               kafkaClient: c,
+       }
+}
+
+func (pa kafkaPollingAgent) pollMessages() ([]byte, error) {
+       msg, err := pa.kafkaClient.ReadMessage()
+       if err == nil {
+               return msg, nil
+       } else {
+               if isKafkaTimedOutError(err) {
+                       return []byte(""), nil
+               }
+               return nil, err
+       }
+}
+
+func isKafkaTimedOutError(err error) bool {
+       kafkaErr, ok := err.(kafka.Error)
+       return ok && kafkaErr.Code() == kafka.ErrTimedOut
+}
+
 type job struct {
        jobInfo         JobInfo
        client          restclient.HTTPClient
@@ -242,6 +302,7 @@ type job struct {
 }
 
 func newJob(j JobInfo, c restclient.HTTPClient) job {
+
        return job{
                jobInfo:         j,
                client:          c,
@@ -250,7 +311,24 @@ func newJob(j JobInfo, c restclient.HTTPClient) job {
        }
 }
 
+type Parameters struct {
+       BufferTimeout BufferTimeout `json:"bufferTimeout"`
+}
+
+type BufferTimeout struct {
+       MaxSize            int   `json:"maxSize"`
+       MaxTimeMiliseconds int64 `json:"maxTimeMiliseconds"`
+}
+
 func (j *job) start() {
+       if j.jobInfo.InfoJobData.BufferTimeout.MaxSize == 0 {
+               j.startReadingSingleMessages()
+       } else {
+               j.startReadingMessagesBuffered()
+       }
+}
+
+func (j *job) startReadingSingleMessages() {
 out:
        for {
                select {
@@ -263,10 +341,68 @@ out:
        }
 }
 
+func (j *job) startReadingMessagesBuffered() {
+out:
+       for {
+               select {
+               case <-j.controlChannel:
+                       log.Debug("Stop distribution for job: ", j.jobInfo.InfoJobIdentity)
+                       break out
+               default:
+                       msgs := j.read(j.jobInfo.InfoJobData.BufferTimeout)
+                       if len(msgs) > 0 {
+                               j.sendMessagesToConsumer(msgs)
+                       }
+               }
+       }
+}
+
+func (j *job) read(bufferParams BufferTimeout) []byte {
+       wg := sync.WaitGroup{}
+       wg.Add(bufferParams.MaxSize)
+       var msgs []byte
+       c := make(chan struct{})
+       go func() {
+               i := 0
+       out:
+               for {
+                       select {
+                       case <-c:
+                               break out
+                       case msg := <-j.messagesChannel:
+                               i++
+                               msgs = append(msgs, msg...)
+                               wg.Done()
+                               if i == bufferParams.MaxSize {
+                                       break out
+                               }
+                       }
+               }
+       }()
+       j.waitTimeout(&wg, time.Duration(bufferParams.MaxTimeMiliseconds)*time.Millisecond)
+       close(c)
+       return msgs
+}
+
+func (j *job) waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
+       c := make(chan struct{})
+       go func() {
+               defer close(c)
+               wg.Wait()
+       }()
+       select {
+       case <-c:
+               return false // completed normally
+       case <-time.After(timeout):
+               return true // timed out
+       }
+}
+
 func (j *job) sendMessagesToConsumer(messages []byte) {
        log.Debug("Processing job: ", j.jobInfo.InfoJobIdentity)
        if postErr := restclient.Post(j.jobInfo.TargetUri, messages, j.client); postErr != nil {
                log.Warnf("Error posting data for job: %v. Cause: %v", j.jobInfo, postErr)
+               return
        }
        log.Debugf("Messages for job: %v distributed to consumer: %v", j.jobInfo.InfoJobIdentity, j.jobInfo.Owner)
 }
index 30b4ffd..ab1165c 100644 (file)
@@ -22,52 +22,60 @@ package jobs
 
 import (
        "bytes"
+       "fmt"
        "io/ioutil"
        "net/http"
+       "strconv"
        "sync"
        "testing"
        "time"
 
+       "github.com/confluentinc/confluent-kafka-go/kafka"
+       "github.com/stretchr/testify/mock"
        "github.com/stretchr/testify/require"
        "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
+       "oransc.org/nonrtric/dmaapmediatorproducer/internal/kafkaclient"
+       "oransc.org/nonrtric/dmaapmediatorproducer/mocks"
 )
 
-const typeDefinition = `{"types": [{"id": "type1", "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1"}]}`
-
-func TestJobsManagerGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) {
+func TestJobsManagerGetTypes_shouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) {
        assertions := require.New(t)
 
-       managerUnderTest := NewJobsManagerImpl(nil, "", nil)
+       managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil)
 
-       wantedType := config.TypeDefinition{
-               Id:            "type1",
-               DmaapTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1",
+       wantedDMaaPType := config.TypeDefinition{
+               Identity:      "type1",
+               DMaaPTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1",
+       }
+       wantedKafkaType := config.TypeDefinition{
+               Identity:        "type2",
+               KafkaInputTopic: "topic",
        }
-       wantedTypes := []config.TypeDefinition{wantedType}
+       wantedTypes := []config.TypeDefinition{wantedDMaaPType, wantedKafkaType}
 
        types := managerUnderTest.LoadTypesFromConfiguration(wantedTypes)
 
        assertions.EqualValues(wantedTypes, types)
 
        supportedTypes := managerUnderTest.GetSupportedTypes()
-       assertions.EqualValues([]string{"type1"}, supportedTypes)
+       assertions.ElementsMatch([]string{"type1", "type2"}, supportedTypes)
 }
 
 func TestJobsManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T) {
        assertions := require.New(t)
-       managerUnderTest := NewJobsManagerImpl(nil, "", nil)
+       managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil)
        wantedJob := JobInfo{
                Owner:            "owner",
                LastUpdated:      "now",
                InfoJobIdentity:  "job1",
                TargetUri:        "target",
-               InfoJobData:      "{}",
+               InfoJobData:      Parameters{},
                InfoTypeIdentity: "type1",
        }
        jobsHandler := jobsHandler{
                addJobCh: make(chan JobInfo)}
        managerUnderTest.allTypes["type1"] = TypeData{
-               TypeId:      "type1",
+               Identity:    "type1",
                jobsHandler: &jobsHandler,
        }
 
@@ -83,7 +91,7 @@ func TestJobsManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T
 
 func TestJobsManagerAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) {
        assertions := require.New(t)
-       managerUnderTest := NewJobsManagerImpl(nil, "", nil)
+       managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil)
        jobInfo := JobInfo{
                InfoTypeIdentity: "type1",
        }
@@ -95,9 +103,9 @@ func TestJobsManagerAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T)
 
 func TestJobsManagerAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) {
        assertions := require.New(t)
-       managerUnderTest := NewJobsManagerImpl(nil, "", nil)
+       managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil)
        managerUnderTest.allTypes["type1"] = TypeData{
-               TypeId: "type1",
+               Identity: "type1",
        }
 
        jobInfo := JobInfo{
@@ -105,14 +113,14 @@ func TestJobsManagerAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) {
        }
        err := managerUnderTest.AddJobFromRESTCall(jobInfo)
        assertions.NotNil(err)
-       assertions.Equal("missing required job identity: {    <nil> type1}", err.Error())
+       assertions.Equal("missing required job identity: {    {{0 0}} type1}", err.Error())
 }
 
 func TestJobsManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
        assertions := require.New(t)
-       managerUnderTest := NewJobsManagerImpl(nil, "", nil)
+       managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil)
        managerUnderTest.allTypes["type1"] = TypeData{
-               TypeId: "type1",
+               Identity: "type1",
        }
 
        jobInfo := JobInfo{
@@ -121,16 +129,16 @@ func TestJobsManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
        }
        err := managerUnderTest.AddJobFromRESTCall(jobInfo)
        assertions.NotNil(err)
-       assertions.Equal("missing required target URI: {  job1  <nil> type1}", err.Error())
+       assertions.Equal("missing required target URI: {  job1  {{0 0}} type1}", err.Error())
 }
 
 func TestJobsManagerDeleteJob_shouldSendDeleteToChannel(t *testing.T) {
        assertions := require.New(t)
-       managerUnderTest := NewJobsManagerImpl(nil, "", nil)
+       managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil)
        jobsHandler := jobsHandler{
                deleteJobCh: make(chan string)}
        managerUnderTest.allTypes["type1"] = TypeData{
-               TypeId:      "type1",
+               Identity:    "type1",
                jobsHandler: &jobsHandler,
        }
 
@@ -139,21 +147,21 @@ func TestJobsManagerDeleteJob_shouldSendDeleteToChannel(t *testing.T) {
        assertions.Equal("job2", <-jobsHandler.deleteJobCh)
 }
 
-func TestAddJobToJobsManager_shouldStartPollAndDistributeMessages(t *testing.T) {
+func TestStartJobsManagerAddDMaaPJob_shouldStartPollAndDistributeMessages(t *testing.T) {
        assertions := require.New(t)
 
        called := false
-       messages := `[{"message": {"data": "data"}}]`
+       dMaaPMessages := `[{"message": {"data": "dmaap"}}]`
        pollClientMock := NewTestClient(func(req *http.Request) *http.Response {
                if req.URL.String() == "http://mrAddr/topicUrl" {
                        assertions.Equal(req.Method, "GET")
                        body := "[]"
                        if !called {
                                called = true
-                               body = messages
+                               body = dMaaPMessages
                        }
                        return &http.Response{
-                               StatusCode: 200,
+                               StatusCode: http.StatusOK,
                                Body:       ioutil.NopCloser(bytes.NewReader([]byte(body))),
                                Header:     make(http.Header), // Must be set to non-nil value or it panics
                        }
@@ -165,9 +173,9 @@ func TestAddJobToJobsManager_shouldStartPollAndDistributeMessages(t *testing.T)
 
        wg := sync.WaitGroup{}
        distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
-               if req.URL.String() == "http://consumerHost/target" {
+               if req.URL.String() == "http://consumerHost/dmaaptarget" {
                        assertions.Equal(req.Method, "POST")
-                       assertions.Equal(messages, getBodyAsString(req, t))
+                       assertions.Equal(dMaaPMessages, getBodyAsString(req, t))
                        assertions.Equal("application/json", req.Header.Get("Content-Type"))
                        wg.Done()
                        return &http.Response{
@@ -180,25 +188,88 @@ func TestAddJobToJobsManager_shouldStartPollAndDistributeMessages(t *testing.T)
                t.Fail()
                return nil
        })
-       jobsHandler := newJobsHandler("type1", "/topicUrl", pollClientMock, distributeClientMock)
-
-       jobsManager := NewJobsManagerImpl(pollClientMock, "http://mrAddr", distributeClientMock)
-       jobsManager.allTypes["type1"] = TypeData{
+       dMaaPTypeDef := config.TypeDefinition{
+               Identity:      "type1",
                DMaaPTopicURL: "/topicUrl",
-               TypeId:        "type1",
-               jobsHandler:   jobsHandler,
        }
+       dMaaPJobsHandler := newJobsHandler(dMaaPTypeDef, "http://mrAddr", nil, pollClientMock, distributeClientMock)
 
+       jobsManager := NewJobsManagerImpl(pollClientMock, "http://mrAddr", kafkaclient.KafkaFactoryImpl{}, distributeClientMock)
+       jobsManager.allTypes["type1"] = TypeData{
+               Identity:    "type1",
+               jobsHandler: dMaaPJobsHandler,
+       }
        jobsManager.StartJobsForAllTypes()
 
-       jobInfo := JobInfo{
+       dMaaPJobInfo := JobInfo{
                InfoTypeIdentity: "type1",
                InfoJobIdentity:  "job1",
-               TargetUri:        "http://consumerHost/target",
+               TargetUri:        "http://consumerHost/dmaaptarget",
        }
 
        wg.Add(1) // Wait till the distribution has happened
-       err := jobsManager.AddJobFromRESTCall(jobInfo)
+       err := jobsManager.AddJobFromRESTCall(dMaaPJobInfo)
+       assertions.Nil(err)
+
+       if waitTimeout(&wg, 2*time.Second) {
+               t.Error("Not all calls to server were made")
+               t.Fail()
+       }
+}
+
+func TestStartJobsManagerAddKafkaJob_shouldStartPollAndDistributeMessages(t *testing.T) {
+       assertions := require.New(t)
+
+       kafkaMessages := `1`
+       wg := sync.WaitGroup{}
+       distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
+               if req.URL.String() == "http://consumerHost/kafkatarget" {
+                       assertions.Equal(req.Method, "POST")
+                       assertions.Equal(kafkaMessages, getBodyAsString(req, t))
+                       assertions.Equal("application/json", req.Header.Get("Content-Type"))
+                       wg.Done()
+                       return &http.Response{
+                               StatusCode: 200,
+                               Body:       ioutil.NopCloser(bytes.NewBufferString(`OK`)),
+                               Header:     make(http.Header), // Must be set to non-nil value or it panics
+                       }
+               }
+               t.Error("Wrong call to client: ", req)
+               t.Fail()
+               return nil
+       })
+
+       kafkaTypeDef := config.TypeDefinition{
+               Identity:        "type2",
+               KafkaInputTopic: "topic",
+       }
+       kafkaFactoryMock := mocks.KafkaFactory{}
+       kafkaConsumerMock := mocks.KafkaConsumer{}
+       kafkaConsumerMock.On("Commit").Return([]kafka.TopicPartition{}, error(nil))
+       kafkaConsumerMock.On("Subscribe", mock.Anything).Return(error(nil))
+       kafkaConsumerMock.On("ReadMessage", mock.Anything).Return(&kafka.Message{
+               Value: []byte(kafkaMessages),
+       }, error(nil)).Once()
+       kafkaConsumerMock.On("ReadMessage", mock.Anything).Return(nil, fmt.Errorf("Just to stop"))
+       kafkaFactoryMock.On("NewKafkaConsumer", mock.Anything).Return(kafkaConsumerMock, nil)
+       kafkaJobsHandler := newJobsHandler(kafkaTypeDef, "", kafkaFactoryMock, nil, distributeClientMock)
+
+       jobsManager := NewJobsManagerImpl(nil, "", kafkaFactoryMock, distributeClientMock)
+       jobsManager.allTypes["type2"] = TypeData{
+               Identity:    "type2",
+               jobsHandler: kafkaJobsHandler,
+       }
+
+       jobsManager.StartJobsForAllTypes()
+
+       kafkaJobInfo := JobInfo{
+               InfoTypeIdentity: "type2",
+               InfoJobIdentity:  "job2",
+               TargetUri:        "http://consumerHost/kafkatarget",
+       }
+
+       wg.Add(1) // Wait till the distribution has happened
+       err := jobsManager.AddJobFromRESTCall(kafkaJobInfo)
        assertions.Nil(err)
 
        if waitTimeout(&wg, 2*time.Second) {
@@ -210,7 +281,11 @@ func TestAddJobToJobsManager_shouldStartPollAndDistributeMessages(t *testing.T)
 func TestJobsHandlerDeleteJob_shouldDeleteJobFromJobsMap(t *testing.T) {
        jobToDelete := newJob(JobInfo{}, nil)
        go jobToDelete.start()
-       jobsHandler := newJobsHandler("type1", "/topicUrl", nil, nil)
+       typeDef := config.TypeDefinition{
+               Identity:      "type1",
+               DMaaPTopicURL: "/topicUrl",
+       }
+       jobsHandler := newJobsHandler(typeDef, "http://mrAddr", kafkaclient.KafkaFactoryImpl{}, nil, nil)
        jobsHandler.jobs["job1"] = jobToDelete
 
        go jobsHandler.monitorManagementChannels()
@@ -233,7 +308,11 @@ func TestJobsHandlerEmptyJobMessageBufferWhenItIsFull(t *testing.T) {
                InfoJobIdentity: "job",
        }, nil)
 
-       jobsHandler := newJobsHandler("type1", "/topicUrl", nil, nil)
+       typeDef := config.TypeDefinition{
+               Identity:      "type1",
+               DMaaPTopicURL: "/topicUrl",
+       }
+       jobsHandler := newJobsHandler(typeDef, "http://mrAddr", kafkaclient.KafkaFactoryImpl{}, nil, nil)
        jobsHandler.jobs["job1"] = job
 
        fillMessagesBuffer(job.messagesChannel)
@@ -243,6 +322,143 @@ func TestJobsHandlerEmptyJobMessageBufferWhenItIsFull(t *testing.T) {
        require.New(t).Len(job.messagesChannel, 0)
 }
 
+func TestKafkaPollingAgentTimedOut_shouldResultInEMptyMessages(t *testing.T) {
+       assertions := require.New(t)
+
+       kafkaFactoryMock := mocks.KafkaFactory{}
+       kafkaConsumerMock := mocks.KafkaConsumer{}
+       kafkaConsumerMock.On("Commit").Return([]kafka.TopicPartition{}, error(nil))
+       kafkaConsumerMock.On("Subscribe", mock.Anything).Return(error(nil))
+       kafkaConsumerMock.On("ReadMessage", mock.Anything).Return(nil, kafka.NewError(kafka.ErrTimedOut, "", false))
+       kafkaFactoryMock.On("NewKafkaConsumer", mock.Anything).Return(kafkaConsumerMock, nil)
+
+       pollingAgentUnderTest := newKafkaPollingAgent(kafkaFactoryMock, "")
+       messages, err := pollingAgentUnderTest.pollMessages()
+
+       assertions.Equal([]byte(""), messages)
+       assertions.Nil(err)
+}
+
+func TestJobWithoutParameters_shouldSendOneMessageAtATime(t *testing.T) {
+       assertions := require.New(t)
+
+       wg := sync.WaitGroup{}
+       messageNo := 1
+       distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
+               if req.URL.String() == "http://consumerHost/target" {
+                       assertions.Equal(req.Method, "POST")
+                       assertions.Equal(fmt.Sprint("message", messageNo), getBodyAsString(req, t))
+                       messageNo++
+                       assertions.Equal("application/json", req.Header.Get("Content-Type"))
+                       wg.Done()
+                       return &http.Response{
+                               StatusCode: 200,
+                               Body:       ioutil.NopCloser(bytes.NewBufferString(`OK`)),
+                               Header:     make(http.Header), // Must be set to non-nil value or it panics
+                       }
+               }
+               t.Error("Wrong call to client: ", req)
+               t.Fail()
+               return nil
+       })
+
+       jobUnderTest := newJob(JobInfo{TargetUri: "http://consumerHost/target"}, distributeClientMock)
+
+       wg.Add(2)
+       go jobUnderTest.start()
+
+       jobUnderTest.messagesChannel <- []byte("message1")
+       jobUnderTest.messagesChannel <- []byte("message2")
+
+       if waitTimeout(&wg, 2*time.Second) {
+               t.Error("Not all calls to server were made")
+               t.Fail()
+       }
+}
+
+func TestJobWithBufferedParameters_shouldSendMessagesTogether(t *testing.T) {
+       assertions := require.New(t)
+
+       wg := sync.WaitGroup{}
+       distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
+               if req.URL.String() == "http://consumerHost/target" {
+                       assertions.Equal(req.Method, "POST")
+                       assertions.Equal("12", getBodyAsString(req, t))
+                       assertions.Equal("application/json", req.Header.Get("Content-Type"))
+                       wg.Done()
+                       return &http.Response{
+                               StatusCode: 200,
+                               Body:       ioutil.NopCloser(bytes.NewBufferString(`OK`)),
+                               Header:     make(http.Header), // Must be set to non-nil value or it panics
+                       }
+               }
+               t.Error("Wrong call to client: ", req)
+               t.Fail()
+               return nil
+       })
+
+       jobUnderTest := newJob(JobInfo{
+               TargetUri: "http://consumerHost/target",
+               InfoJobData: Parameters{
+                       BufferTimeout: BufferTimeout{
+                               MaxSize:            5,
+                               MaxTimeMiliseconds: 200,
+                       },
+               },
+       }, distributeClientMock)
+
+       wg.Add(1)
+       go jobUnderTest.start()
+
+       go func() {
+               jobUnderTest.messagesChannel <- []byte("1")
+               jobUnderTest.messagesChannel <- []byte("2")
+       }()
+
+       if waitTimeout(&wg, 2*time.Second) {
+               t.Error("Not all calls to server were made")
+               t.Fail()
+       }
+}
+
+func TestJobReadMoreThanBufferSizeMessages_shouldOnlyReturnMaxSizeNoOfMessages(t *testing.T) {
+       assertions := require.New(t)
+
+       jobUnderTest := newJob(JobInfo{}, nil)
+
+       go func() {
+               for i := 0; i < 4; i++ {
+                       jobUnderTest.messagesChannel <- []byte(strconv.Itoa(i))
+               }
+       }()
+
+       msgs := jobUnderTest.read(BufferTimeout{
+               MaxSize:            2,
+               MaxTimeMiliseconds: 200,
+       })
+
+       assertions.Equal([]byte("01"), msgs)
+}
+func TestJobReadBufferedWhenTimeout_shouldOnlyReturnMessagesSentBeforeTimeout(t *testing.T) {
+       assertions := require.New(t)
+
+       jobUnderTest := newJob(JobInfo{}, nil)
+
+       go func() {
+               for i := 0; i < 4; i++ {
+                       time.Sleep(10 * time.Millisecond)
+                       jobUnderTest.messagesChannel <- []byte(strconv.Itoa(i))
+               }
+       }()
+
+       msgs := jobUnderTest.read(BufferTimeout{
+               MaxSize:            2,
+               MaxTimeMiliseconds: 30,
+       })
+
+       assertions.Equal([]byte("01"), msgs)
+}
+
 func fillMessagesBuffer(mc chan []byte) {
        for i := 0; i < cap(mc); i++ {
                mc <- []byte("msg")
diff --git a/dmaap-mediator-producer/internal/kafkaclient/kafkaclient.go b/dmaap-mediator-producer/internal/kafkaclient/kafkaclient.go
new file mode 100644 (file)
index 0000000..16abcb4
--- /dev/null
@@ -0,0 +1,94 @@
+// -
+//   ========================LICENSE_START=================================
+//   O-RAN-SC
+//   %%
+//   Copyright (C) 2021: Nordix Foundation
+//   %%
+//   Licensed under the Apache License, Version 2.0 (the "License");
+//   you may not use this file except in compliance with the License.
+//   You may obtain a copy of the License at
+//
+//        http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+//   distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//   See the License for the specific language governing permissions and
+//   limitations under the License.
+//   ========================LICENSE_END===================================
+//
+
+package kafkaclient
+
+import (
+       "time"
+
+       "github.com/confluentinc/confluent-kafka-go/kafka"
+)
+
+type KafkaFactory interface {
+       NewKafkaConsumer(topicID string) (KafkaConsumer, error)
+}
+
+type KafkaFactoryImpl struct {
+       BootstrapServer string
+}
+
+func (kf KafkaFactoryImpl) NewKafkaConsumer(topicID string) (KafkaConsumer, error) {
+       consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
+               "bootstrap.servers": kf.BootstrapServer,
+               "group.id":          "dmaap-mediator-producer",
+               "auto.offset.reset": "earliest",
+       })
+       if err != nil {
+               return nil, err
+       }
+       return KafkaConsumerImpl{consumer: consumer}, nil
+}
+
+func NewKafkaClient(factory KafkaFactory, topicID string) (KafkaClient, error) {
+       consumer, err := factory.NewKafkaConsumer(topicID)
+       if err != nil {
+               return KafkaClient{}, err
+       }
+       consumer.Commit()
+       err = consumer.Subscribe(topicID)
+       if err != nil {
+               return KafkaClient{}, err
+       }
+       return KafkaClient{consumer: consumer}, nil
+}
+
+type KafkaClient struct {
+       consumer KafkaConsumer
+}
+
+func (kc KafkaClient) ReadMessage() ([]byte, error) {
+       msg, err := kc.consumer.ReadMessage(time.Second)
+       if err != nil {
+               return nil, err
+       }
+       return msg.Value, nil
+}
+
+type KafkaConsumer interface {
+       Commit() ([]kafka.TopicPartition, error)
+       Subscribe(topic string) (err error)
+       ReadMessage(timeout time.Duration) (*kafka.Message, error)
+}
+
+type KafkaConsumerImpl struct {
+       consumer *kafka.Consumer
+}
+
+func (kc KafkaConsumerImpl) Commit() ([]kafka.TopicPartition, error) {
+       return kc.consumer.Commit()
+}
+
+func (kc KafkaConsumerImpl) Subscribe(topic string) error {
+       return kc.consumer.Subscribe(topic, nil)
+}
+
+func (kc KafkaConsumerImpl) ReadMessage(timeout time.Duration) (*kafka.Message, error) {
+       return kc.consumer.ReadMessage(timeout)
+}
index 6248c22..6fe4d7a 100644 (file)
@@ -34,7 +34,7 @@ import (
        "github.com/stretchr/testify/mock"
        "github.com/stretchr/testify/require"
        "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
-       "oransc.org/nonrtric/dmaapmediatorproducer/mocks/jobhandler"
+       "oransc.org/nonrtric/dmaapmediatorproducer/mocks/jobshandler"
 )
 
 func TestNewRouter(t *testing.T) {
@@ -88,7 +88,7 @@ func TestNewRouter(t *testing.T) {
        assertions.Equal("/admin/log", path)
 }
 
-func TestAddInfoJobHandler(t *testing.T) {
+func TestAddInfoJobToJobsHandler(t *testing.T) {
        assertions := require.New(t)
 
        type args struct {
@@ -102,21 +102,21 @@ func TestAddInfoJobHandler(t *testing.T) {
                wantedBody   string
        }{
                {
-                       name: "AddInfoJobHandler with correct job, should return OK",
+                       name: "AddInfoJobToJobsHandler with correct job, should return OK",
                        args: args{
                                job: jobs.JobInfo{
                                        Owner:            "owner",
                                        LastUpdated:      "now",
                                        InfoJobIdentity:  "jobId",
                                        TargetUri:        "target",
-                                       InfoJobData:      "{}",
+                                       InfoJobData:      jobs.Parameters{},
                                        InfoTypeIdentity: "type",
                                },
                        },
                        wantedStatus: http.StatusOK,
                },
                {
-                       name: "AddInfoJobHandler with incorrect job info, should return BadRequest",
+                       name: "AddInfoJobToJobsHandler with incorrect job info, should return BadRequest",
                        args: args{
                                job: jobs.JobInfo{
                                        Owner: "bad",
@@ -129,10 +129,10 @@ func TestAddInfoJobHandler(t *testing.T) {
        }
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
-                       jobHandlerMock := jobhandler.JobHandler{}
-                       jobHandlerMock.On("AddJobFromRESTCall", tt.args.job).Return(tt.args.mockReturn)
+                       jobsHandlerMock := jobshandler.JobsHandler{}
+                       jobsHandlerMock.On("AddJobFromRESTCall", tt.args.job).Return(tt.args.mockReturn)
 
-                       callbackHandlerUnderTest := NewProducerCallbackHandler(&jobHandlerMock)
+                       callbackHandlerUnderTest := NewProducerCallbackHandler(&jobsHandlerMock)
 
                        handler := http.HandlerFunc(callbackHandlerUnderTest.addInfoJobHandler)
                        responseRecorder := httptest.NewRecorder()
@@ -142,17 +142,17 @@ func TestAddInfoJobHandler(t *testing.T) {
 
                        assertions.Equal(tt.wantedStatus, responseRecorder.Code, tt.name)
                        assertions.Contains(responseRecorder.Body.String(), tt.wantedBody, tt.name)
-                       jobHandlerMock.AssertCalled(t, "AddJobFromRESTCall", tt.args.job)
+                       jobsHandlerMock.AssertCalled(t, "AddJobFromRESTCall", tt.args.job)
                })
        }
 }
 
 func TestDeleteJob(t *testing.T) {
        assertions := require.New(t)
-       jobHandlerMock := jobhandler.JobHandler{}
-       jobHandlerMock.On("DeleteJobFromRESTCall", mock.Anything).Return(nil)
+       jobsHandlerMock := jobshandler.JobsHandler{}
+       jobsHandlerMock.On("DeleteJobFromRESTCall", mock.Anything).Return(nil)
 
-       callbackHandlerUnderTest := NewProducerCallbackHandler(&jobHandlerMock)
+       callbackHandlerUnderTest := NewProducerCallbackHandler(&jobsHandlerMock)
 
        responseRecorder := httptest.NewRecorder()
        r := mux.SetURLVars(newRequest(http.MethodDelete, "/jobs/", nil, t), map[string]string{"infoJobId": "job1"})
@@ -162,7 +162,7 @@ func TestDeleteJob(t *testing.T) {
 
        assertions.Equal("", responseRecorder.Body.String())
 
-       jobHandlerMock.AssertCalled(t, "DeleteJobFromRESTCall", "job1")
+       jobsHandlerMock.AssertCalled(t, "DeleteJobFromRESTCall", "job1")
 }
 
 func TestSetLogLevel(t *testing.T) {
index 1aabdda..819ffa9 100644 (file)
@@ -29,6 +29,7 @@ import (
        log "github.com/sirupsen/logrus"
        "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
        "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
+       "oransc.org/nonrtric/dmaapmediatorproducer/internal/kafkaclient"
        "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
        "oransc.org/nonrtric/dmaapmediatorproducer/internal/server"
 )
@@ -55,9 +56,12 @@ func main() {
        } else {
                log.Fatalf("Stopping producer due to error: %v", err)
        }
+
        retryClient := restclient.CreateRetryClient(cert)
+       kafkaFactory := kafkaclient.KafkaFactoryImpl{BootstrapServer: configuration.KafkaBootstrapServers}
+       distributionClient := restclient.CreateClientWithoutRetry(cert, 10*time.Second)
 
-       jobsManager := jobs.NewJobsManagerImpl(retryClient, configuration.DMaaPMRAddress, restclient.CreateClientWithoutRetry(cert, 10*time.Second))
+       jobsManager := jobs.NewJobsManagerImpl(retryClient, configuration.DMaaPMRAddress, kafkaFactory, distributionClient)
        go startCallbackServer(jobsManager, callbackAddress)
 
        if err := registerTypesAndProducer(jobsManager, configuration.InfoCoordinatorAddress, callbackAddress, retryClient); err != nil {
@@ -78,11 +82,14 @@ func validateConfiguration(configuration *config.Config) error {
        if configuration.ProducerCertPath == "" || configuration.ProducerKeyPath == "" {
                return fmt.Errorf("missing PRODUCER_CERT and/or PRODUCER_KEY")
        }
+       if configuration.DMaaPMRAddress == "" && configuration.KafkaBootstrapServers == "" {
+               return fmt.Errorf("at least one of DMAAP_MR_ADDR or KAFKA_BOOTSRAP_SERVERS must be provided")
+       }
        return nil
 }
 func registerTypesAndProducer(jobTypesHandler jobs.JobTypesManager, infoCoordinatorAddress string, callbackAddress string, client restclient.HTTPClient) error {
        registrator := config.NewRegistratorImpl(infoCoordinatorAddress, client)
-       configTypes, err := config.GetJobTypesFromConfiguration("configs/type_config.json")
+       configTypes, err := config.GetJobTypesFromConfiguration("configs")
        if err != nil {
                return fmt.Errorf("unable to register all types due to: %v", err)
        }
diff --git a/dmaap-mediator-producer/mocks/KafkaConsumer.go b/dmaap-mediator-producer/mocks/KafkaConsumer.go
new file mode 100644 (file)
index 0000000..8ae0893
--- /dev/null
@@ -0,0 +1,76 @@
+// Code generated by mockery v1.0.0. DO NOT EDIT.
+
+package mocks
+
+import (
+       kafka "github.com/confluentinc/confluent-kafka-go/kafka"
+
+       mock "github.com/stretchr/testify/mock"
+
+       time "time"
+)
+
+// KafkaConsumer is an autogenerated mock type for the KafkaConsumer type
+type KafkaConsumer struct {
+       mock.Mock
+}
+
+// Commit provides a mock function with given fields:
+func (_m KafkaConsumer) Commit() ([]kafka.TopicPartition, error) {
+       ret := _m.Called()
+
+       var r0 []kafka.TopicPartition
+       if rf, ok := ret.Get(0).(func() []kafka.TopicPartition); ok {
+               r0 = rf()
+       } else {
+               if ret.Get(0) != nil {
+                       r0 = ret.Get(0).([]kafka.TopicPartition)
+               }
+       }
+
+       var r1 error
+       if rf, ok := ret.Get(1).(func() error); ok {
+               r1 = rf()
+       } else {
+               r1 = ret.Error(1)
+       }
+
+       return r0, r1
+}
+
+// ReadMessage provides a mock function with given fields: timeout
+func (_m KafkaConsumer) ReadMessage(timeout time.Duration) (*kafka.Message, error) {
+       ret := _m.Called(timeout)
+
+       var r0 *kafka.Message
+       if rf, ok := ret.Get(0).(func(time.Duration) *kafka.Message); ok {
+               r0 = rf(timeout)
+       } else {
+               if ret.Get(0) != nil {
+                       r0 = ret.Get(0).(*kafka.Message)
+               }
+       }
+
+       var r1 error
+       if rf, ok := ret.Get(1).(func(time.Duration) error); ok {
+               r1 = rf(timeout)
+       } else {
+               r1 = ret.Error(1)
+       }
+
+       return r0, r1
+}
+
+// Subscribe provides a mock function with given fields: topic
+func (_m KafkaConsumer) Subscribe(topic string) error {
+       ret := _m.Called(topic)
+
+       var r0 error
+       if rf, ok := ret.Get(0).(func(string) error); ok {
+               r0 = rf(topic)
+       } else {
+               r0 = ret.Error(0)
+       }
+
+       return r0
+}
diff --git a/dmaap-mediator-producer/mocks/KafkaFactory.go b/dmaap-mediator-producer/mocks/KafkaFactory.go
new file mode 100644 (file)
index 0000000..f05457a
--- /dev/null
@@ -0,0 +1,36 @@
+// Code generated by mockery v1.0.0. DO NOT EDIT.
+
+package mocks
+
+import (
+       mock "github.com/stretchr/testify/mock"
+       "oransc.org/nonrtric/dmaapmediatorproducer/internal/kafkaclient"
+)
+
+// KafkaFactory is an autogenerated mock type for the KafkaFactory type
+type KafkaFactory struct {
+       mock.Mock
+}
+
+// NewKafkaConsumer provides a mock function with given fields: topicID
+func (_m KafkaFactory) NewKafkaConsumer(topicID string) (kafkaclient.KafkaConsumer, error) {
+       ret := _m.Called(topicID)
+
+       var r0 kafkaclient.KafkaConsumer
+       if rf, ok := ret.Get(0).(func(string) kafkaclient.KafkaConsumer); ok {
+               r0 = rf(topicID)
+       } else {
+               if ret.Get(0) != nil {
+                       r0 = ret.Get(0).(kafkaclient.KafkaConsumer)
+               }
+       }
+
+       var r1 error
+       if rf, ok := ret.Get(1).(func(string) error); ok {
+               r1 = rf(topicID)
+       } else {
+               r1 = ret.Error(1)
+       }
+
+       return r0, r1
+}
@@ -1,19 +1,19 @@
 // Code generated by mockery v2.9.3. DO NOT EDIT.
 
-package jobhandler
+package jobshandler
 
 import (
        mock "github.com/stretchr/testify/mock"
        jobs "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
 )
 
-// JobHandler is an autogenerated mock type for the JobHandler type
-type JobHandler struct {
+// JobsHandler is an autogenerated mock type for the JobsHandler type
+type JobsHandler struct {
        mock.Mock
 }
 
 // AddJob provides a mock function with given fields: _a0
-func (_m *JobHandler) AddJobFromRESTCall(_a0 jobs.JobInfo) error {
+func (_m *JobsHandler) AddJobFromRESTCall(_a0 jobs.JobInfo) error {
        ret := _m.Called(_a0)
 
        var r0 error
@@ -27,6 +27,6 @@ func (_m *JobHandler) AddJobFromRESTCall(_a0 jobs.JobInfo) error {
 }
 
 // DeleteJob provides a mock function with given fields: jobId
-func (_m *JobHandler) DeleteJobFromRESTCall(jobId string) {
+func (_m *JobsHandler) DeleteJobFromRESTCall(jobId string) {
        _m.Called(jobId)
 }