From 6f5d3d1eccb8a1857c645ba6bd0b5e1b89ca7088 Mon Sep 17 00:00:00 2001 From: elinuxhenrik Date: Thu, 23 Dec 2021 16:36:31 +0100 Subject: [PATCH] Add kafka jobs to DMaaP Mediator Producer Issue-ID: NONRTRIC-702 Signed-off-by: elinuxhenrik Change-Id: Ia7123240ad91449209afc4769b850cb0c8d9598f --- .../configs/typeSchemaDmaap.json | 10 + .../configs/typeSchemaKafka.json | 26 ++ dmaap-mediator-producer/configs/type_config.json | 4 + dmaap-mediator-producer/go.mod | 1 + dmaap-mediator-producer/go.sum | 2 + dmaap-mediator-producer/internal/config/config.go | 37 ++- .../internal/config/config_test.go | 43 ++- .../internal/config/registrator.go | 19 +- .../internal/config/registrator_test.go | 3 +- dmaap-mediator-producer/internal/jobs/jobs.go | 196 +++++++++++--- dmaap-mediator-producer/internal/jobs/jobs_test.go | 292 ++++++++++++++++++--- .../internal/kafkaclient/kafkaclient.go | 94 +++++++ .../internal/server/server_test.go | 26 +- dmaap-mediator-producer/main.go | 11 +- dmaap-mediator-producer/mocks/KafkaConsumer.go | 76 ++++++ dmaap-mediator-producer/mocks/KafkaFactory.go | 36 +++ .../JobHandler.go => jobshandler/JobsHandler.go} | 10 +- 17 files changed, 784 insertions(+), 102 deletions(-) create mode 100644 dmaap-mediator-producer/configs/typeSchemaDmaap.json create mode 100644 dmaap-mediator-producer/configs/typeSchemaKafka.json create mode 100644 dmaap-mediator-producer/internal/kafkaclient/kafkaclient.go create mode 100644 dmaap-mediator-producer/mocks/KafkaConsumer.go create mode 100644 dmaap-mediator-producer/mocks/KafkaFactory.go rename dmaap-mediator-producer/mocks/{jobhandler/JobHandler.go => jobshandler/JobsHandler.go} (66%) diff --git a/dmaap-mediator-producer/configs/typeSchemaDmaap.json b/dmaap-mediator-producer/configs/typeSchemaDmaap.json new file mode 100644 index 00000000..a50b236f --- /dev/null +++ b/dmaap-mediator-producer/configs/typeSchemaDmaap.json @@ -0,0 +1,10 @@ +{ + "$schema": "http://json-schema.org/draft-04/schema#", + "type": "object", + "properties": { + "filter": { + "type": "string" + } + }, + "additionalProperties": false +} diff --git a/dmaap-mediator-producer/configs/typeSchemaKafka.json b/dmaap-mediator-producer/configs/typeSchemaKafka.json new file mode 100644 index 00000000..dcd40f9b --- /dev/null +++ b/dmaap-mediator-producer/configs/typeSchemaKafka.json @@ -0,0 +1,26 @@ +{ + "$schema": "http://json-schema.org/draft-04/schema#", + "type": "object", + "properties": { + "filter": { + "type": "string" + }, + "bufferTimeout": { + "type": "object", + "properties": { + "maxSize": { + "type": "integer" + }, + "maxTimeMiliseconds": { + "type": "integer" + } + }, + "additionalProperties": false, + "required": [ + "maxSize", + "maxTimeMiliseconds" + ] + } + }, + "additionalProperties": false +} \ No newline at end of file diff --git a/dmaap-mediator-producer/configs/type_config.json b/dmaap-mediator-producer/configs/type_config.json index f75d0e4b..11496693 100644 --- a/dmaap-mediator-producer/configs/type_config.json +++ b/dmaap-mediator-producer/configs/type_config.json @@ -4,6 +4,10 @@ { "id": "STD_Fault_Messages", "dmaapTopicUrl": "/events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/STD_Fault_Messages" + }, + { + "id": "Kafka_TestTopic", + "kafkaInputTopic": "TestTopic" } ] } \ No newline at end of file diff --git a/dmaap-mediator-producer/go.mod b/dmaap-mediator-producer/go.mod index eaaecf7f..e701c011 100644 --- a/dmaap-mediator-producer/go.mod +++ b/dmaap-mediator-producer/go.mod @@ -10,6 +10,7 @@ require ( ) require ( + github.com/confluentinc/confluent-kafka-go v1.8.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/hashicorp/go-cleanhttp v0.5.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect diff --git a/dmaap-mediator-producer/go.sum b/dmaap-mediator-producer/go.sum index 4b3557bb..cf2c90fb 100644 --- a/dmaap-mediator-producer/go.sum +++ b/dmaap-mediator-producer/go.sum @@ -1,3 +1,5 @@ +github.com/confluentinc/confluent-kafka-go v1.8.2 h1:PBdbvYpyOdFLehj8j+9ba7FL4c4Moxn79gy9cYKxG5E= +github.com/confluentinc/confluent-kafka-go v1.8.2/go.mod h1:u2zNLny2xq+5rWeTQjFHbDzzNuba4P1vo31r9r4uAdg= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/dmaap-mediator-producer/internal/config/config.go b/dmaap-mediator-producer/internal/config/config.go index e03c40ac..7582e9cc 100644 --- a/dmaap-mediator-producer/internal/config/config.go +++ b/dmaap-mediator-producer/internal/config/config.go @@ -24,6 +24,7 @@ import ( "encoding/json" "fmt" "os" + "path/filepath" "strconv" log "github.com/sirupsen/logrus" @@ -35,6 +36,7 @@ type Config struct { InfoProducerPort int InfoCoordinatorAddress string DMaaPMRAddress string + KafkaBootstrapServers string ProducerCertPath string ProducerKeyPath string } @@ -45,6 +47,7 @@ func New() *Config { InfoProducerPort: getEnvAsInt("INFO_PRODUCER_PORT", 8085), InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "https://informationservice:8434"), DMaaPMRAddress: getEnv("DMAAP_MR_ADDR", "https://message-router.onap:3905"), + KafkaBootstrapServers: getEnv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092"), ProducerCertPath: getEnv("PRODUCER_CERT_PATH", "security/producer.crt"), ProducerKeyPath: getEnv("PRODUCER_KEY_PATH", "security/producer.key"), LogLevel: getLogLevel(), @@ -83,8 +86,8 @@ func getLogLevel() log.Level { } } -func GetJobTypesFromConfiguration(configFile string) ([]TypeDefinition, error) { - typeDefsByte, err := os.ReadFile(configFile) +func GetJobTypesFromConfiguration(configFolder string) ([]TypeDefinition, error) { + typeDefsByte, err := os.ReadFile(filepath.Join(configFolder, "type_config.json")) if err != nil { return nil, err } @@ -96,5 +99,35 @@ func GetJobTypesFromConfiguration(configFile string) ([]TypeDefinition, error) { return nil, err } + kafkaTypeSchema, err := getTypeSchema(filepath.Join(configFolder, "typeSchemaKafka.json")) + if err != nil { + return nil, err + } + + dMaaPTypeSchema, err := getTypeSchema(filepath.Join(configFolder, "typeSchemaDmaap.json")) + if err != nil { + return nil, err + } + + for i, typeDef := range typeDefs.Types { + if typeDef.IsKafkaType() { + typeDefs.Types[i].TypeSchema = kafkaTypeSchema + } else { + typeDefs.Types[i].TypeSchema = dMaaPTypeSchema + } + } return typeDefs.Types, nil } + +func getTypeSchema(schemaFile string) (interface{}, error) { + typeDefsByte, err := os.ReadFile(schemaFile) + if err != nil { + return nil, err + } + var schema interface{} + err = json.Unmarshal(typeDefsByte, &schema) + if err != nil { + return nil, err + } + return schema, nil +} diff --git a/dmaap-mediator-producer/internal/config/config_test.go b/dmaap-mediator-producer/internal/config/config_test.go index faf5900d..e66a8182 100644 --- a/dmaap-mediator-producer/internal/config/config_test.go +++ b/dmaap-mediator-producer/internal/config/config_test.go @@ -22,6 +22,7 @@ package config import ( "bytes" + "encoding/json" "os" "path/filepath" "testing" @@ -37,6 +38,7 @@ func TestNew_envVarsSetConfigContainSetValues(t *testing.T) { os.Setenv("INFO_PRODUCER_PORT", "8095") os.Setenv("INFO_COORD_ADDR", "infoCoordAddr") os.Setenv("DMAAP_MR_ADDR", "mrHost:3908") + os.Setenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:9093") os.Setenv("PRODUCER_CERT_PATH", "cert") os.Setenv("PRODUCER_KEY_PATH", "key") t.Cleanup(func() { @@ -48,6 +50,7 @@ func TestNew_envVarsSetConfigContainSetValues(t *testing.T) { InfoProducerPort: 8095, InfoCoordinatorAddress: "infoCoordAddr", DMaaPMRAddress: "mrHost:3908", + KafkaBootstrapServers: "localhost:9093", ProducerCertPath: "cert", ProducerKeyPath: "key", } @@ -72,6 +75,7 @@ func TestNew_faultyIntValueSetConfigContainDefaultValueAndWarnInLog(t *testing.T InfoProducerPort: 8085, InfoCoordinatorAddress: "https://informationservice:8434", DMaaPMRAddress: "https://message-router.onap:3905", + KafkaBootstrapServers: "localhost:9092", ProducerCertPath: "security/producer.crt", ProducerKeyPath: "security/producer.key", } @@ -98,6 +102,7 @@ func TestNew_envFaultyLogLevelConfigContainDefaultValues(t *testing.T) { InfoProducerPort: 8085, InfoCoordinatorAddress: "https://informationservice:8434", DMaaPMRAddress: "https://message-router.onap:3905", + KafkaBootstrapServers: "localhost:9092", ProducerCertPath: "security/producer.crt", ProducerKeyPath: "security/producer.key", } @@ -109,7 +114,17 @@ func TestNew_envFaultyLogLevelConfigContainDefaultValues(t *testing.T) { assertions.Contains(logString, "Invalid log level: wrong. Log level will be Info!") } -const typeDefinition = `{"types": [{"id": "type1", "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1"}]}` +const typeDefinition = `{"types": [{"id": "type1", "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1"}, {"id": "type2", "kafkaInputTopic": "TestTopic"}]}` +const typeSchemaFileContent = `{ + "$schema": "http://json-schema.org/draft-04/schema#", + "type": "object", + "properties": { + "filter": { + "type": "string" + } + }, + "additionalProperties": false + }` func TestGetTypesFromConfiguration_fileOkShouldReturnSliceOfTypeDefinitions(t *testing.T) { assertions := require.New(t) @@ -124,14 +139,30 @@ func TestGetTypesFromConfiguration_fileOkShouldReturnSliceOfTypeDefinitions(t *t if err = os.WriteFile(fname, []byte(typeDefinition), 0666); err != nil { t.Errorf("Unable to create temporary config file for types due to: %v", err) } + fname = filepath.Join(typesDir, "typeSchemaDmaap.json") + if err = os.WriteFile(fname, []byte(typeSchemaFileContent), 0666); err != nil { + t.Errorf("Unable to create temporary schema file for DMaaP type due to: %v", err) + } + fname = filepath.Join(typesDir, "typeSchemaKafka.json") + if err = os.WriteFile(fname, []byte(typeSchemaFileContent), 0666); err != nil { + t.Errorf("Unable to create temporary schema file for Kafka type due to: %v", err) + } + var typeSchemaObj interface{} + json.Unmarshal([]byte(typeSchemaFileContent), &typeSchemaObj) - types, err := GetJobTypesFromConfiguration(fname) + types, err := GetJobTypesFromConfiguration(typesDir) - wantedType := TypeDefinition{ - Id: "type1", - DmaapTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1", + wantedDMaaPType := TypeDefinition{ + Identity: "type1", + DMaaPTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1", + TypeSchema: typeSchemaObj, + } + wantedKafkaType := TypeDefinition{ + Identity: "type2", + KafkaInputTopic: "TestTopic", + TypeSchema: typeSchemaObj, } - wantedTypes := []TypeDefinition{wantedType} + wantedTypes := []TypeDefinition{wantedDMaaPType, wantedKafkaType} assertions.EqualValues(wantedTypes, types) assertions.Nil(err) } diff --git a/dmaap-mediator-producer/internal/config/registrator.go b/dmaap-mediator-producer/internal/config/registrator.go index 83ed43f2..bac14e63 100644 --- a/dmaap-mediator-producer/internal/config/registrator.go +++ b/dmaap-mediator-producer/internal/config/registrator.go @@ -32,11 +32,20 @@ import ( const registerTypePath = "/data-producer/v1/info-types/" const registerProducerPath = "/data-producer/v1/info-producers/" -const typeSchema = `{"type": "object","properties": {},"additionalProperties": false}` type TypeDefinition struct { - Id string `json:"id"` - DmaapTopicURL string `json:"dmaapTopicUrl"` + Identity string `json:"id"` + DMaaPTopicURL string `json:"dmaapTopicUrl"` + KafkaInputTopic string `json:"kafkaInputTopic"` + TypeSchema interface{} +} + +func (td TypeDefinition) IsKafkaType() bool { + return td.KafkaInputTopic != "" +} + +func (td TypeDefinition) IsDMaaPType() bool { + return td.DMaaPTopicURL != "" } type ProducerRegistrationInfo struct { @@ -64,8 +73,8 @@ func NewRegistratorImpl(infoCoordAddr string, client restclient.HTTPClient) *Reg func (r RegistratorImpl) RegisterTypes(jobTypes []TypeDefinition) error { for _, jobType := range jobTypes { - body := fmt.Sprintf(`{"info_job_data_schema": %v}`, typeSchema) - if error := restclient.Put(r.infoCoordinatorAddress+registerTypePath+url.PathEscape(jobType.Id), []byte(body), r.httpClient); error != nil { + body := fmt.Sprintf(`{"info_job_data_schema": %v}`, jobType.TypeSchema) + if error := restclient.Put(r.infoCoordinatorAddress+registerTypePath+url.PathEscape(jobType.Identity), []byte(body), r.httpClient); error != nil { return error } log.Debugf("Registered type: %v", jobType) diff --git a/dmaap-mediator-producer/internal/config/registrator_test.go b/dmaap-mediator-producer/internal/config/registrator_test.go index 324aed0c..d1b61d87 100644 --- a/dmaap-mediator-producer/internal/config/registrator_test.go +++ b/dmaap-mediator-producer/internal/config/registrator_test.go @@ -40,7 +40,8 @@ func TestRegisterTypes(t *testing.T) { }, nil) type1 := TypeDefinition{ - Id: "Type1", + Identity: "Type1", + TypeSchema: `{"type": "object","properties": {},"additionalProperties": false}`, } types := []TypeDefinition{type1} diff --git a/dmaap-mediator-producer/internal/jobs/jobs.go b/dmaap-mediator-producer/internal/jobs/jobs.go index 867894f7..3ef5ca3e 100644 --- a/dmaap-mediator-producer/internal/jobs/jobs.go +++ b/dmaap-mediator-producer/internal/jobs/jobs.go @@ -25,24 +25,25 @@ import ( "sync" "time" + "github.com/confluentinc/confluent-kafka-go/kafka" log "github.com/sirupsen/logrus" "oransc.org/nonrtric/dmaapmediatorproducer/internal/config" + "oransc.org/nonrtric/dmaapmediatorproducer/internal/kafkaclient" "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient" ) type TypeData struct { - TypeId string `json:"id"` - DMaaPTopicURL string `json:"dmaapTopicUrl"` - jobsHandler *jobsHandler + Identity string `json:"id"` + jobsHandler *jobsHandler } type JobInfo struct { - Owner string `json:"owner"` - LastUpdated string `json:"last_updated"` - InfoJobIdentity string `json:"info_job_identity"` - TargetUri string `json:"target_uri"` - InfoJobData interface{} `json:"info_job_data"` - InfoTypeIdentity string `json:"info_type_identity"` + Owner string `json:"owner"` + LastUpdated string `json:"last_updated"` + InfoJobIdentity string `json:"info_job_identity"` + TargetUri string `json:"target_uri"` + InfoJobData Parameters `json:"info_job_data"` + InfoTypeIdentity string `json:"info_type_identity"` } type JobTypesManager interface { @@ -59,14 +60,16 @@ type JobsManagerImpl struct { allTypes map[string]TypeData pollClient restclient.HTTPClient mrAddress string + kafkaFactory kafkaclient.KafkaFactory distributeClient restclient.HTTPClient } -func NewJobsManagerImpl(pollClient restclient.HTTPClient, mrAddr string, distributeClient restclient.HTTPClient) *JobsManagerImpl { +func NewJobsManagerImpl(pollClient restclient.HTTPClient, mrAddr string, kafkaFactory kafkaclient.KafkaFactory, distributeClient restclient.HTTPClient) *JobsManagerImpl { return &JobsManagerImpl{ allTypes: make(map[string]TypeData), pollClient: pollClient, mrAddress: mrAddr, + kafkaFactory: kafkaFactory, distributeClient: distributeClient, } } @@ -84,7 +87,7 @@ func (jm *JobsManagerImpl) AddJobFromRESTCall(ji JobInfo) error { func (jm *JobsManagerImpl) DeleteJobFromRESTCall(jobId string) { for _, typeData := range jm.allTypes { - log.Debugf("Deleting job %v from type %v", jobId, typeData.TypeId) + log.Debugf("Deleting job %v from type %v", jobId, typeData.Identity) typeData.jobsHandler.deleteJobCh <- jobId } log.Debug("Deleted job: ", jobId) @@ -106,10 +109,12 @@ func (jm *JobsManagerImpl) validateJobInfo(ji JobInfo) error { func (jm *JobsManagerImpl) LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition { for _, typeDef := range types { - jm.allTypes[typeDef.Id] = TypeData{ - TypeId: typeDef.Id, - DMaaPTopicURL: typeDef.DmaapTopicURL, - jobsHandler: newJobsHandler(typeDef.Id, typeDef.DmaapTopicURL, jm.pollClient, jm.distributeClient), + if typeDef.DMaaPTopicURL == "" && typeDef.KafkaInputTopic == "" { + log.Fatal("DMaaPTopicURL or KafkaInputTopic must be defined for type: ", typeDef.Identity) + } + jm.allTypes[typeDef.Identity] = TypeData{ + Identity: typeDef.Identity, + jobsHandler: newJobsHandler(typeDef, jm.mrAddress, jm.kafkaFactory, jm.pollClient, jm.distributeClient), } } return types @@ -126,7 +131,7 @@ func (jm *JobsManagerImpl) GetSupportedTypes() []string { func (jm *JobsManagerImpl) StartJobsForAllTypes() { for _, jobType := range jm.allTypes { - go jobType.jobsHandler.startPollingAndDistribution(jm.mrAddress) + go jobType.jobsHandler.startPollingAndDistribution() } } @@ -134,30 +139,29 @@ func (jm *JobsManagerImpl) StartJobsForAllTypes() { type jobsHandler struct { mu sync.Mutex typeId string - topicUrl string + pollingAgent pollingAgent jobs map[string]job addJobCh chan JobInfo deleteJobCh chan string - pollClient restclient.HTTPClient distributeClient restclient.HTTPClient } -func newJobsHandler(typeId string, topicURL string, pollClient restclient.HTTPClient, distributeClient restclient.HTTPClient) *jobsHandler { +func newJobsHandler(typeDef config.TypeDefinition, mRAddress string, kafkaFactory kafkaclient.KafkaFactory, pollClient restclient.HTTPClient, distributeClient restclient.HTTPClient) *jobsHandler { + pollingAgent := createPollingAgent(typeDef, mRAddress, pollClient, kafkaFactory, typeDef.KafkaInputTopic) return &jobsHandler{ - typeId: typeId, - topicUrl: topicURL, + typeId: typeDef.Identity, + pollingAgent: pollingAgent, jobs: make(map[string]job), addJobCh: make(chan JobInfo), deleteJobCh: make(chan string), - pollClient: pollClient, distributeClient: distributeClient, } } -func (jh *jobsHandler) startPollingAndDistribution(mRAddress string) { +func (jh *jobsHandler) startPollingAndDistribution() { go func() { for { - jh.pollAndDistributeMessages(mRAddress) + jh.pollAndDistributeMessages() } }() @@ -168,19 +172,20 @@ func (jh *jobsHandler) startPollingAndDistribution(mRAddress string) { }() } -func (jh *jobsHandler) pollAndDistributeMessages(mRAddress string) { +func (jh *jobsHandler) pollAndDistributeMessages() { log.Debugf("Processing jobs for type: %v", jh.typeId) - messagesBody, error := restclient.Get(mRAddress+jh.topicUrl, jh.pollClient) + messagesBody, error := jh.pollingAgent.pollMessages() if error != nil { - log.Warn("Error getting data from MR. Cause: ", error) - time.Sleep(time.Minute) // Must wait before trying to call MR again + log.Warn("Error getting data from source. Cause: ", error) + time.Sleep(time.Minute) // Must wait before trying to call data source again + return } - log.Debug("Received messages: ", string(messagesBody)) jh.distributeMessages(messagesBody) } func (jh *jobsHandler) distributeMessages(messages []byte) { - if len(messages) > 2 { + if string(messages) != "[]" && len(messages) > 0 { // MR returns an ampty array if there are no messages. + log.Debug("Distributing messages: ", string(messages)) jh.mu.Lock() defer jh.mu.Unlock() for _, job := range jh.jobs { @@ -234,6 +239,61 @@ func (jh *jobsHandler) deleteJob(deletedJob string) { jh.mu.Unlock() } +type pollingAgent interface { + pollMessages() ([]byte, error) +} + +func createPollingAgent(typeDef config.TypeDefinition, mRAddress string, pollClient restclient.HTTPClient, kafkaFactory kafkaclient.KafkaFactory, topicID string) pollingAgent { + if typeDef.DMaaPTopicURL != "" { + return dMaaPPollingAgent{ + messageRouterURL: mRAddress + typeDef.DMaaPTopicURL, + pollClient: pollClient, + } + } else { + return newKafkaPollingAgent(kafkaFactory, typeDef.KafkaInputTopic) + } +} + +type dMaaPPollingAgent struct { + messageRouterURL string + pollClient restclient.HTTPClient +} + +func (pa dMaaPPollingAgent) pollMessages() ([]byte, error) { + return restclient.Get(pa.messageRouterURL, pa.pollClient) +} + +type kafkaPollingAgent struct { + kafkaClient kafkaclient.KafkaClient +} + +func newKafkaPollingAgent(kafkaFactory kafkaclient.KafkaFactory, topicID string) kafkaPollingAgent { + c, err := kafkaclient.NewKafkaClient(kafkaFactory, topicID) + if err != nil { + log.Fatalf("Cannot create Kafka client for topic: %v, error details: %v\n", topicID, err) + } + return kafkaPollingAgent{ + kafkaClient: c, + } +} + +func (pa kafkaPollingAgent) pollMessages() ([]byte, error) { + msg, err := pa.kafkaClient.ReadMessage() + if err == nil { + return msg, nil + } else { + if isKafkaTimedOutError(err) { + return []byte(""), nil + } + return nil, err + } +} + +func isKafkaTimedOutError(err error) bool { + kafkaErr, ok := err.(kafka.Error) + return ok && kafkaErr.Code() == kafka.ErrTimedOut +} + type job struct { jobInfo JobInfo client restclient.HTTPClient @@ -242,6 +302,7 @@ type job struct { } func newJob(j JobInfo, c restclient.HTTPClient) job { + return job{ jobInfo: j, client: c, @@ -250,7 +311,24 @@ func newJob(j JobInfo, c restclient.HTTPClient) job { } } +type Parameters struct { + BufferTimeout BufferTimeout `json:"bufferTimeout"` +} + +type BufferTimeout struct { + MaxSize int `json:"maxSize"` + MaxTimeMiliseconds int64 `json:"maxTimeMiliseconds"` +} + func (j *job) start() { + if j.jobInfo.InfoJobData.BufferTimeout.MaxSize == 0 { + j.startReadingSingleMessages() + } else { + j.startReadingMessagesBuffered() + } +} + +func (j *job) startReadingSingleMessages() { out: for { select { @@ -263,10 +341,68 @@ out: } } +func (j *job) startReadingMessagesBuffered() { +out: + for { + select { + case <-j.controlChannel: + log.Debug("Stop distribution for job: ", j.jobInfo.InfoJobIdentity) + break out + default: + msgs := j.read(j.jobInfo.InfoJobData.BufferTimeout) + if len(msgs) > 0 { + j.sendMessagesToConsumer(msgs) + } + } + } +} + +func (j *job) read(bufferParams BufferTimeout) []byte { + wg := sync.WaitGroup{} + wg.Add(bufferParams.MaxSize) + var msgs []byte + c := make(chan struct{}) + go func() { + i := 0 + out: + for { + select { + case <-c: + break out + case msg := <-j.messagesChannel: + i++ + msgs = append(msgs, msg...) + wg.Done() + if i == bufferParams.MaxSize { + break out + } + } + } + }() + j.waitTimeout(&wg, time.Duration(bufferParams.MaxTimeMiliseconds)*time.Millisecond) + close(c) + return msgs +} + +func (j *job) waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool { + c := make(chan struct{}) + go func() { + defer close(c) + wg.Wait() + }() + select { + case <-c: + return false // completed normally + case <-time.After(timeout): + return true // timed out + } +} + func (j *job) sendMessagesToConsumer(messages []byte) { log.Debug("Processing job: ", j.jobInfo.InfoJobIdentity) if postErr := restclient.Post(j.jobInfo.TargetUri, messages, j.client); postErr != nil { log.Warnf("Error posting data for job: %v. Cause: %v", j.jobInfo, postErr) + return } log.Debugf("Messages for job: %v distributed to consumer: %v", j.jobInfo.InfoJobIdentity, j.jobInfo.Owner) } diff --git a/dmaap-mediator-producer/internal/jobs/jobs_test.go b/dmaap-mediator-producer/internal/jobs/jobs_test.go index 30b4ffd9..ab1165c9 100644 --- a/dmaap-mediator-producer/internal/jobs/jobs_test.go +++ b/dmaap-mediator-producer/internal/jobs/jobs_test.go @@ -22,52 +22,60 @@ package jobs import ( "bytes" + "fmt" "io/ioutil" "net/http" + "strconv" "sync" "testing" "time" + "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "oransc.org/nonrtric/dmaapmediatorproducer/internal/config" + "oransc.org/nonrtric/dmaapmediatorproducer/internal/kafkaclient" + "oransc.org/nonrtric/dmaapmediatorproducer/mocks" ) -const typeDefinition = `{"types": [{"id": "type1", "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1"}]}` - -func TestJobsManagerGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) { +func TestJobsManagerGetTypes_shouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) { assertions := require.New(t) - managerUnderTest := NewJobsManagerImpl(nil, "", nil) + managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil) - wantedType := config.TypeDefinition{ - Id: "type1", - DmaapTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1", + wantedDMaaPType := config.TypeDefinition{ + Identity: "type1", + DMaaPTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1", + } + wantedKafkaType := config.TypeDefinition{ + Identity: "type2", + KafkaInputTopic: "topic", } - wantedTypes := []config.TypeDefinition{wantedType} + wantedTypes := []config.TypeDefinition{wantedDMaaPType, wantedKafkaType} types := managerUnderTest.LoadTypesFromConfiguration(wantedTypes) assertions.EqualValues(wantedTypes, types) supportedTypes := managerUnderTest.GetSupportedTypes() - assertions.EqualValues([]string{"type1"}, supportedTypes) + assertions.ElementsMatch([]string{"type1", "type2"}, supportedTypes) } func TestJobsManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T) { assertions := require.New(t) - managerUnderTest := NewJobsManagerImpl(nil, "", nil) + managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil) wantedJob := JobInfo{ Owner: "owner", LastUpdated: "now", InfoJobIdentity: "job1", TargetUri: "target", - InfoJobData: "{}", + InfoJobData: Parameters{}, InfoTypeIdentity: "type1", } jobsHandler := jobsHandler{ addJobCh: make(chan JobInfo)} managerUnderTest.allTypes["type1"] = TypeData{ - TypeId: "type1", + Identity: "type1", jobsHandler: &jobsHandler, } @@ -83,7 +91,7 @@ func TestJobsManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T func TestJobsManagerAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) { assertions := require.New(t) - managerUnderTest := NewJobsManagerImpl(nil, "", nil) + managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil) jobInfo := JobInfo{ InfoTypeIdentity: "type1", } @@ -95,9 +103,9 @@ func TestJobsManagerAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) func TestJobsManagerAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) { assertions := require.New(t) - managerUnderTest := NewJobsManagerImpl(nil, "", nil) + managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil) managerUnderTest.allTypes["type1"] = TypeData{ - TypeId: "type1", + Identity: "type1", } jobInfo := JobInfo{ @@ -105,14 +113,14 @@ func TestJobsManagerAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) { } err := managerUnderTest.AddJobFromRESTCall(jobInfo) assertions.NotNil(err) - assertions.Equal("missing required job identity: { type1}", err.Error()) + assertions.Equal("missing required job identity: { {{0 0}} type1}", err.Error()) } func TestJobsManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) { assertions := require.New(t) - managerUnderTest := NewJobsManagerImpl(nil, "", nil) + managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil) managerUnderTest.allTypes["type1"] = TypeData{ - TypeId: "type1", + Identity: "type1", } jobInfo := JobInfo{ @@ -121,16 +129,16 @@ func TestJobsManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) { } err := managerUnderTest.AddJobFromRESTCall(jobInfo) assertions.NotNil(err) - assertions.Equal("missing required target URI: { job1 type1}", err.Error()) + assertions.Equal("missing required target URI: { job1 {{0 0}} type1}", err.Error()) } func TestJobsManagerDeleteJob_shouldSendDeleteToChannel(t *testing.T) { assertions := require.New(t) - managerUnderTest := NewJobsManagerImpl(nil, "", nil) + managerUnderTest := NewJobsManagerImpl(nil, "", kafkaclient.KafkaFactoryImpl{}, nil) jobsHandler := jobsHandler{ deleteJobCh: make(chan string)} managerUnderTest.allTypes["type1"] = TypeData{ - TypeId: "type1", + Identity: "type1", jobsHandler: &jobsHandler, } @@ -139,21 +147,21 @@ func TestJobsManagerDeleteJob_shouldSendDeleteToChannel(t *testing.T) { assertions.Equal("job2", <-jobsHandler.deleteJobCh) } -func TestAddJobToJobsManager_shouldStartPollAndDistributeMessages(t *testing.T) { +func TestStartJobsManagerAddDMaaPJob_shouldStartPollAndDistributeMessages(t *testing.T) { assertions := require.New(t) called := false - messages := `[{"message": {"data": "data"}}]` + dMaaPMessages := `[{"message": {"data": "dmaap"}}]` pollClientMock := NewTestClient(func(req *http.Request) *http.Response { if req.URL.String() == "http://mrAddr/topicUrl" { assertions.Equal(req.Method, "GET") body := "[]" if !called { called = true - body = messages + body = dMaaPMessages } return &http.Response{ - StatusCode: 200, + StatusCode: http.StatusOK, Body: ioutil.NopCloser(bytes.NewReader([]byte(body))), Header: make(http.Header), // Must be set to non-nil value or it panics } @@ -165,9 +173,9 @@ func TestAddJobToJobsManager_shouldStartPollAndDistributeMessages(t *testing.T) wg := sync.WaitGroup{} distributeClientMock := NewTestClient(func(req *http.Request) *http.Response { - if req.URL.String() == "http://consumerHost/target" { + if req.URL.String() == "http://consumerHost/dmaaptarget" { assertions.Equal(req.Method, "POST") - assertions.Equal(messages, getBodyAsString(req, t)) + assertions.Equal(dMaaPMessages, getBodyAsString(req, t)) assertions.Equal("application/json", req.Header.Get("Content-Type")) wg.Done() return &http.Response{ @@ -180,25 +188,88 @@ func TestAddJobToJobsManager_shouldStartPollAndDistributeMessages(t *testing.T) t.Fail() return nil }) - jobsHandler := newJobsHandler("type1", "/topicUrl", pollClientMock, distributeClientMock) - - jobsManager := NewJobsManagerImpl(pollClientMock, "http://mrAddr", distributeClientMock) - jobsManager.allTypes["type1"] = TypeData{ + dMaaPTypeDef := config.TypeDefinition{ + Identity: "type1", DMaaPTopicURL: "/topicUrl", - TypeId: "type1", - jobsHandler: jobsHandler, } + dMaaPJobsHandler := newJobsHandler(dMaaPTypeDef, "http://mrAddr", nil, pollClientMock, distributeClientMock) + jobsManager := NewJobsManagerImpl(pollClientMock, "http://mrAddr", kafkaclient.KafkaFactoryImpl{}, distributeClientMock) + jobsManager.allTypes["type1"] = TypeData{ + Identity: "type1", + jobsHandler: dMaaPJobsHandler, + } jobsManager.StartJobsForAllTypes() - jobInfo := JobInfo{ + dMaaPJobInfo := JobInfo{ InfoTypeIdentity: "type1", InfoJobIdentity: "job1", - TargetUri: "http://consumerHost/target", + TargetUri: "http://consumerHost/dmaaptarget", } wg.Add(1) // Wait till the distribution has happened - err := jobsManager.AddJobFromRESTCall(jobInfo) + err := jobsManager.AddJobFromRESTCall(dMaaPJobInfo) + assertions.Nil(err) + + if waitTimeout(&wg, 2*time.Second) { + t.Error("Not all calls to server were made") + t.Fail() + } +} + +func TestStartJobsManagerAddKafkaJob_shouldStartPollAndDistributeMessages(t *testing.T) { + assertions := require.New(t) + + kafkaMessages := `1` + wg := sync.WaitGroup{} + distributeClientMock := NewTestClient(func(req *http.Request) *http.Response { + if req.URL.String() == "http://consumerHost/kafkatarget" { + assertions.Equal(req.Method, "POST") + assertions.Equal(kafkaMessages, getBodyAsString(req, t)) + assertions.Equal("application/json", req.Header.Get("Content-Type")) + wg.Done() + return &http.Response{ + StatusCode: 200, + Body: ioutil.NopCloser(bytes.NewBufferString(`OK`)), + Header: make(http.Header), // Must be set to non-nil value or it panics + } + } + t.Error("Wrong call to client: ", req) + t.Fail() + return nil + }) + + kafkaTypeDef := config.TypeDefinition{ + Identity: "type2", + KafkaInputTopic: "topic", + } + kafkaFactoryMock := mocks.KafkaFactory{} + kafkaConsumerMock := mocks.KafkaConsumer{} + kafkaConsumerMock.On("Commit").Return([]kafka.TopicPartition{}, error(nil)) + kafkaConsumerMock.On("Subscribe", mock.Anything).Return(error(nil)) + kafkaConsumerMock.On("ReadMessage", mock.Anything).Return(&kafka.Message{ + Value: []byte(kafkaMessages), + }, error(nil)).Once() + kafkaConsumerMock.On("ReadMessage", mock.Anything).Return(nil, fmt.Errorf("Just to stop")) + kafkaFactoryMock.On("NewKafkaConsumer", mock.Anything).Return(kafkaConsumerMock, nil) + kafkaJobsHandler := newJobsHandler(kafkaTypeDef, "", kafkaFactoryMock, nil, distributeClientMock) + + jobsManager := NewJobsManagerImpl(nil, "", kafkaFactoryMock, distributeClientMock) + jobsManager.allTypes["type2"] = TypeData{ + Identity: "type2", + jobsHandler: kafkaJobsHandler, + } + + jobsManager.StartJobsForAllTypes() + + kafkaJobInfo := JobInfo{ + InfoTypeIdentity: "type2", + InfoJobIdentity: "job2", + TargetUri: "http://consumerHost/kafkatarget", + } + + wg.Add(1) // Wait till the distribution has happened + err := jobsManager.AddJobFromRESTCall(kafkaJobInfo) assertions.Nil(err) if waitTimeout(&wg, 2*time.Second) { @@ -210,7 +281,11 @@ func TestAddJobToJobsManager_shouldStartPollAndDistributeMessages(t *testing.T) func TestJobsHandlerDeleteJob_shouldDeleteJobFromJobsMap(t *testing.T) { jobToDelete := newJob(JobInfo{}, nil) go jobToDelete.start() - jobsHandler := newJobsHandler("type1", "/topicUrl", nil, nil) + typeDef := config.TypeDefinition{ + Identity: "type1", + DMaaPTopicURL: "/topicUrl", + } + jobsHandler := newJobsHandler(typeDef, "http://mrAddr", kafkaclient.KafkaFactoryImpl{}, nil, nil) jobsHandler.jobs["job1"] = jobToDelete go jobsHandler.monitorManagementChannels() @@ -233,7 +308,11 @@ func TestJobsHandlerEmptyJobMessageBufferWhenItIsFull(t *testing.T) { InfoJobIdentity: "job", }, nil) - jobsHandler := newJobsHandler("type1", "/topicUrl", nil, nil) + typeDef := config.TypeDefinition{ + Identity: "type1", + DMaaPTopicURL: "/topicUrl", + } + jobsHandler := newJobsHandler(typeDef, "http://mrAddr", kafkaclient.KafkaFactoryImpl{}, nil, nil) jobsHandler.jobs["job1"] = job fillMessagesBuffer(job.messagesChannel) @@ -243,6 +322,143 @@ func TestJobsHandlerEmptyJobMessageBufferWhenItIsFull(t *testing.T) { require.New(t).Len(job.messagesChannel, 0) } +func TestKafkaPollingAgentTimedOut_shouldResultInEMptyMessages(t *testing.T) { + assertions := require.New(t) + + kafkaFactoryMock := mocks.KafkaFactory{} + kafkaConsumerMock := mocks.KafkaConsumer{} + kafkaConsumerMock.On("Commit").Return([]kafka.TopicPartition{}, error(nil)) + kafkaConsumerMock.On("Subscribe", mock.Anything).Return(error(nil)) + kafkaConsumerMock.On("ReadMessage", mock.Anything).Return(nil, kafka.NewError(kafka.ErrTimedOut, "", false)) + kafkaFactoryMock.On("NewKafkaConsumer", mock.Anything).Return(kafkaConsumerMock, nil) + + pollingAgentUnderTest := newKafkaPollingAgent(kafkaFactoryMock, "") + messages, err := pollingAgentUnderTest.pollMessages() + + assertions.Equal([]byte(""), messages) + assertions.Nil(err) +} + +func TestJobWithoutParameters_shouldSendOneMessageAtATime(t *testing.T) { + assertions := require.New(t) + + wg := sync.WaitGroup{} + messageNo := 1 + distributeClientMock := NewTestClient(func(req *http.Request) *http.Response { + if req.URL.String() == "http://consumerHost/target" { + assertions.Equal(req.Method, "POST") + assertions.Equal(fmt.Sprint("message", messageNo), getBodyAsString(req, t)) + messageNo++ + assertions.Equal("application/json", req.Header.Get("Content-Type")) + wg.Done() + return &http.Response{ + StatusCode: 200, + Body: ioutil.NopCloser(bytes.NewBufferString(`OK`)), + Header: make(http.Header), // Must be set to non-nil value or it panics + } + } + t.Error("Wrong call to client: ", req) + t.Fail() + return nil + }) + + jobUnderTest := newJob(JobInfo{TargetUri: "http://consumerHost/target"}, distributeClientMock) + + wg.Add(2) + go jobUnderTest.start() + + jobUnderTest.messagesChannel <- []byte("message1") + jobUnderTest.messagesChannel <- []byte("message2") + + if waitTimeout(&wg, 2*time.Second) { + t.Error("Not all calls to server were made") + t.Fail() + } +} + +func TestJobWithBufferedParameters_shouldSendMessagesTogether(t *testing.T) { + assertions := require.New(t) + + wg := sync.WaitGroup{} + distributeClientMock := NewTestClient(func(req *http.Request) *http.Response { + if req.URL.String() == "http://consumerHost/target" { + assertions.Equal(req.Method, "POST") + assertions.Equal("12", getBodyAsString(req, t)) + assertions.Equal("application/json", req.Header.Get("Content-Type")) + wg.Done() + return &http.Response{ + StatusCode: 200, + Body: ioutil.NopCloser(bytes.NewBufferString(`OK`)), + Header: make(http.Header), // Must be set to non-nil value or it panics + } + } + t.Error("Wrong call to client: ", req) + t.Fail() + return nil + }) + + jobUnderTest := newJob(JobInfo{ + TargetUri: "http://consumerHost/target", + InfoJobData: Parameters{ + BufferTimeout: BufferTimeout{ + MaxSize: 5, + MaxTimeMiliseconds: 200, + }, + }, + }, distributeClientMock) + + wg.Add(1) + go jobUnderTest.start() + + go func() { + jobUnderTest.messagesChannel <- []byte("1") + jobUnderTest.messagesChannel <- []byte("2") + }() + + if waitTimeout(&wg, 2*time.Second) { + t.Error("Not all calls to server were made") + t.Fail() + } +} + +func TestJobReadMoreThanBufferSizeMessages_shouldOnlyReturnMaxSizeNoOfMessages(t *testing.T) { + assertions := require.New(t) + + jobUnderTest := newJob(JobInfo{}, nil) + + go func() { + for i := 0; i < 4; i++ { + jobUnderTest.messagesChannel <- []byte(strconv.Itoa(i)) + } + }() + + msgs := jobUnderTest.read(BufferTimeout{ + MaxSize: 2, + MaxTimeMiliseconds: 200, + }) + + assertions.Equal([]byte("01"), msgs) +} +func TestJobReadBufferedWhenTimeout_shouldOnlyReturnMessagesSentBeforeTimeout(t *testing.T) { + assertions := require.New(t) + + jobUnderTest := newJob(JobInfo{}, nil) + + go func() { + for i := 0; i < 4; i++ { + time.Sleep(10 * time.Millisecond) + jobUnderTest.messagesChannel <- []byte(strconv.Itoa(i)) + } + }() + + msgs := jobUnderTest.read(BufferTimeout{ + MaxSize: 2, + MaxTimeMiliseconds: 30, + }) + + assertions.Equal([]byte("01"), msgs) +} + func fillMessagesBuffer(mc chan []byte) { for i := 0; i < cap(mc); i++ { mc <- []byte("msg") diff --git a/dmaap-mediator-producer/internal/kafkaclient/kafkaclient.go b/dmaap-mediator-producer/internal/kafkaclient/kafkaclient.go new file mode 100644 index 00000000..16abcb4e --- /dev/null +++ b/dmaap-mediator-producer/internal/kafkaclient/kafkaclient.go @@ -0,0 +1,94 @@ +// - +// ========================LICENSE_START================================= +// O-RAN-SC +// %% +// Copyright (C) 2021: Nordix Foundation +// %% +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ========================LICENSE_END=================================== +// + +package kafkaclient + +import ( + "time" + + "github.com/confluentinc/confluent-kafka-go/kafka" +) + +type KafkaFactory interface { + NewKafkaConsumer(topicID string) (KafkaConsumer, error) +} + +type KafkaFactoryImpl struct { + BootstrapServer string +} + +func (kf KafkaFactoryImpl) NewKafkaConsumer(topicID string) (KafkaConsumer, error) { + consumer, err := kafka.NewConsumer(&kafka.ConfigMap{ + "bootstrap.servers": kf.BootstrapServer, + "group.id": "dmaap-mediator-producer", + "auto.offset.reset": "earliest", + }) + if err != nil { + return nil, err + } + return KafkaConsumerImpl{consumer: consumer}, nil +} + +func NewKafkaClient(factory KafkaFactory, topicID string) (KafkaClient, error) { + consumer, err := factory.NewKafkaConsumer(topicID) + if err != nil { + return KafkaClient{}, err + } + consumer.Commit() + err = consumer.Subscribe(topicID) + if err != nil { + return KafkaClient{}, err + } + return KafkaClient{consumer: consumer}, nil +} + +type KafkaClient struct { + consumer KafkaConsumer +} + +func (kc KafkaClient) ReadMessage() ([]byte, error) { + msg, err := kc.consumer.ReadMessage(time.Second) + if err != nil { + return nil, err + } + return msg.Value, nil +} + +type KafkaConsumer interface { + Commit() ([]kafka.TopicPartition, error) + Subscribe(topic string) (err error) + ReadMessage(timeout time.Duration) (*kafka.Message, error) +} + +type KafkaConsumerImpl struct { + consumer *kafka.Consumer +} + +func (kc KafkaConsumerImpl) Commit() ([]kafka.TopicPartition, error) { + return kc.consumer.Commit() +} + +func (kc KafkaConsumerImpl) Subscribe(topic string) error { + return kc.consumer.Subscribe(topic, nil) +} + +func (kc KafkaConsumerImpl) ReadMessage(timeout time.Duration) (*kafka.Message, error) { + return kc.consumer.ReadMessage(timeout) +} diff --git a/dmaap-mediator-producer/internal/server/server_test.go b/dmaap-mediator-producer/internal/server/server_test.go index 6248c228..6fe4d7a7 100644 --- a/dmaap-mediator-producer/internal/server/server_test.go +++ b/dmaap-mediator-producer/internal/server/server_test.go @@ -34,7 +34,7 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs" - "oransc.org/nonrtric/dmaapmediatorproducer/mocks/jobhandler" + "oransc.org/nonrtric/dmaapmediatorproducer/mocks/jobshandler" ) func TestNewRouter(t *testing.T) { @@ -88,7 +88,7 @@ func TestNewRouter(t *testing.T) { assertions.Equal("/admin/log", path) } -func TestAddInfoJobHandler(t *testing.T) { +func TestAddInfoJobToJobsHandler(t *testing.T) { assertions := require.New(t) type args struct { @@ -102,21 +102,21 @@ func TestAddInfoJobHandler(t *testing.T) { wantedBody string }{ { - name: "AddInfoJobHandler with correct job, should return OK", + name: "AddInfoJobToJobsHandler with correct job, should return OK", args: args{ job: jobs.JobInfo{ Owner: "owner", LastUpdated: "now", InfoJobIdentity: "jobId", TargetUri: "target", - InfoJobData: "{}", + InfoJobData: jobs.Parameters{}, InfoTypeIdentity: "type", }, }, wantedStatus: http.StatusOK, }, { - name: "AddInfoJobHandler with incorrect job info, should return BadRequest", + name: "AddInfoJobToJobsHandler with incorrect job info, should return BadRequest", args: args{ job: jobs.JobInfo{ Owner: "bad", @@ -129,10 +129,10 @@ func TestAddInfoJobHandler(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - jobHandlerMock := jobhandler.JobHandler{} - jobHandlerMock.On("AddJobFromRESTCall", tt.args.job).Return(tt.args.mockReturn) + jobsHandlerMock := jobshandler.JobsHandler{} + jobsHandlerMock.On("AddJobFromRESTCall", tt.args.job).Return(tt.args.mockReturn) - callbackHandlerUnderTest := NewProducerCallbackHandler(&jobHandlerMock) + callbackHandlerUnderTest := NewProducerCallbackHandler(&jobsHandlerMock) handler := http.HandlerFunc(callbackHandlerUnderTest.addInfoJobHandler) responseRecorder := httptest.NewRecorder() @@ -142,17 +142,17 @@ func TestAddInfoJobHandler(t *testing.T) { assertions.Equal(tt.wantedStatus, responseRecorder.Code, tt.name) assertions.Contains(responseRecorder.Body.String(), tt.wantedBody, tt.name) - jobHandlerMock.AssertCalled(t, "AddJobFromRESTCall", tt.args.job) + jobsHandlerMock.AssertCalled(t, "AddJobFromRESTCall", tt.args.job) }) } } func TestDeleteJob(t *testing.T) { assertions := require.New(t) - jobHandlerMock := jobhandler.JobHandler{} - jobHandlerMock.On("DeleteJobFromRESTCall", mock.Anything).Return(nil) + jobsHandlerMock := jobshandler.JobsHandler{} + jobsHandlerMock.On("DeleteJobFromRESTCall", mock.Anything).Return(nil) - callbackHandlerUnderTest := NewProducerCallbackHandler(&jobHandlerMock) + callbackHandlerUnderTest := NewProducerCallbackHandler(&jobsHandlerMock) responseRecorder := httptest.NewRecorder() r := mux.SetURLVars(newRequest(http.MethodDelete, "/jobs/", nil, t), map[string]string{"infoJobId": "job1"}) @@ -162,7 +162,7 @@ func TestDeleteJob(t *testing.T) { assertions.Equal("", responseRecorder.Body.String()) - jobHandlerMock.AssertCalled(t, "DeleteJobFromRESTCall", "job1") + jobsHandlerMock.AssertCalled(t, "DeleteJobFromRESTCall", "job1") } func TestSetLogLevel(t *testing.T) { diff --git a/dmaap-mediator-producer/main.go b/dmaap-mediator-producer/main.go index 1aabddaa..819ffa9d 100644 --- a/dmaap-mediator-producer/main.go +++ b/dmaap-mediator-producer/main.go @@ -29,6 +29,7 @@ import ( log "github.com/sirupsen/logrus" "oransc.org/nonrtric/dmaapmediatorproducer/internal/config" "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs" + "oransc.org/nonrtric/dmaapmediatorproducer/internal/kafkaclient" "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient" "oransc.org/nonrtric/dmaapmediatorproducer/internal/server" ) @@ -55,9 +56,12 @@ func main() { } else { log.Fatalf("Stopping producer due to error: %v", err) } + retryClient := restclient.CreateRetryClient(cert) + kafkaFactory := kafkaclient.KafkaFactoryImpl{BootstrapServer: configuration.KafkaBootstrapServers} + distributionClient := restclient.CreateClientWithoutRetry(cert, 10*time.Second) - jobsManager := jobs.NewJobsManagerImpl(retryClient, configuration.DMaaPMRAddress, restclient.CreateClientWithoutRetry(cert, 10*time.Second)) + jobsManager := jobs.NewJobsManagerImpl(retryClient, configuration.DMaaPMRAddress, kafkaFactory, distributionClient) go startCallbackServer(jobsManager, callbackAddress) if err := registerTypesAndProducer(jobsManager, configuration.InfoCoordinatorAddress, callbackAddress, retryClient); err != nil { @@ -78,11 +82,14 @@ func validateConfiguration(configuration *config.Config) error { if configuration.ProducerCertPath == "" || configuration.ProducerKeyPath == "" { return fmt.Errorf("missing PRODUCER_CERT and/or PRODUCER_KEY") } + if configuration.DMaaPMRAddress == "" && configuration.KafkaBootstrapServers == "" { + return fmt.Errorf("at least one of DMAAP_MR_ADDR or KAFKA_BOOTSRAP_SERVERS must be provided") + } return nil } func registerTypesAndProducer(jobTypesHandler jobs.JobTypesManager, infoCoordinatorAddress string, callbackAddress string, client restclient.HTTPClient) error { registrator := config.NewRegistratorImpl(infoCoordinatorAddress, client) - configTypes, err := config.GetJobTypesFromConfiguration("configs/type_config.json") + configTypes, err := config.GetJobTypesFromConfiguration("configs") if err != nil { return fmt.Errorf("unable to register all types due to: %v", err) } diff --git a/dmaap-mediator-producer/mocks/KafkaConsumer.go b/dmaap-mediator-producer/mocks/KafkaConsumer.go new file mode 100644 index 00000000..8ae0893e --- /dev/null +++ b/dmaap-mediator-producer/mocks/KafkaConsumer.go @@ -0,0 +1,76 @@ +// Code generated by mockery v1.0.0. DO NOT EDIT. + +package mocks + +import ( + kafka "github.com/confluentinc/confluent-kafka-go/kafka" + + mock "github.com/stretchr/testify/mock" + + time "time" +) + +// KafkaConsumer is an autogenerated mock type for the KafkaConsumer type +type KafkaConsumer struct { + mock.Mock +} + +// Commit provides a mock function with given fields: +func (_m KafkaConsumer) Commit() ([]kafka.TopicPartition, error) { + ret := _m.Called() + + var r0 []kafka.TopicPartition + if rf, ok := ret.Get(0).(func() []kafka.TopicPartition); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]kafka.TopicPartition) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func() error); ok { + r1 = rf() + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ReadMessage provides a mock function with given fields: timeout +func (_m KafkaConsumer) ReadMessage(timeout time.Duration) (*kafka.Message, error) { + ret := _m.Called(timeout) + + var r0 *kafka.Message + if rf, ok := ret.Get(0).(func(time.Duration) *kafka.Message); ok { + r0 = rf(timeout) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*kafka.Message) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(time.Duration) error); ok { + r1 = rf(timeout) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// Subscribe provides a mock function with given fields: topic +func (_m KafkaConsumer) Subscribe(topic string) error { + ret := _m.Called(topic) + + var r0 error + if rf, ok := ret.Get(0).(func(string) error); ok { + r0 = rf(topic) + } else { + r0 = ret.Error(0) + } + + return r0 +} diff --git a/dmaap-mediator-producer/mocks/KafkaFactory.go b/dmaap-mediator-producer/mocks/KafkaFactory.go new file mode 100644 index 00000000..f05457a5 --- /dev/null +++ b/dmaap-mediator-producer/mocks/KafkaFactory.go @@ -0,0 +1,36 @@ +// Code generated by mockery v1.0.0. DO NOT EDIT. + +package mocks + +import ( + mock "github.com/stretchr/testify/mock" + "oransc.org/nonrtric/dmaapmediatorproducer/internal/kafkaclient" +) + +// KafkaFactory is an autogenerated mock type for the KafkaFactory type +type KafkaFactory struct { + mock.Mock +} + +// NewKafkaConsumer provides a mock function with given fields: topicID +func (_m KafkaFactory) NewKafkaConsumer(topicID string) (kafkaclient.KafkaConsumer, error) { + ret := _m.Called(topicID) + + var r0 kafkaclient.KafkaConsumer + if rf, ok := ret.Get(0).(func(string) kafkaclient.KafkaConsumer); ok { + r0 = rf(topicID) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(kafkaclient.KafkaConsumer) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(string) error); ok { + r1 = rf(topicID) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} diff --git a/dmaap-mediator-producer/mocks/jobhandler/JobHandler.go b/dmaap-mediator-producer/mocks/jobshandler/JobsHandler.go similarity index 66% rename from dmaap-mediator-producer/mocks/jobhandler/JobHandler.go rename to dmaap-mediator-producer/mocks/jobshandler/JobsHandler.go index ad20752c..271590fa 100644 --- a/dmaap-mediator-producer/mocks/jobhandler/JobHandler.go +++ b/dmaap-mediator-producer/mocks/jobshandler/JobsHandler.go @@ -1,19 +1,19 @@ // Code generated by mockery v2.9.3. DO NOT EDIT. -package jobhandler +package jobshandler import ( mock "github.com/stretchr/testify/mock" jobs "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs" ) -// JobHandler is an autogenerated mock type for the JobHandler type -type JobHandler struct { +// JobsHandler is an autogenerated mock type for the JobsHandler type +type JobsHandler struct { mock.Mock } // AddJob provides a mock function with given fields: _a0 -func (_m *JobHandler) AddJobFromRESTCall(_a0 jobs.JobInfo) error { +func (_m *JobsHandler) AddJobFromRESTCall(_a0 jobs.JobInfo) error { ret := _m.Called(_a0) var r0 error @@ -27,6 +27,6 @@ func (_m *JobHandler) AddJobFromRESTCall(_a0 jobs.JobInfo) error { } // DeleteJob provides a mock function with given fields: jobId -func (_m *JobHandler) DeleteJobFromRESTCall(jobId string) { +func (_m *JobsHandler) DeleteJobFromRESTCall(jobId string) { _m.Called(jobId) } -- 2.16.6