X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=dmaap-mediator-producer%2Finternal%2Fjobs%2Fjobs.go;fp=dmaap-mediator-producer%2Finternal%2Fjobs%2Fjobs.go;h=0000000000000000000000000000000000000000;hb=5e92b2132b2300080d8a928b7d9c44906724fd78;hp=86bfe05c334963d13fda4744aacd59d7923dbefe;hpb=df5eeb6e3fe42f87ac399f624edef20c87d1e475;p=nonrtric.git diff --git a/dmaap-mediator-producer/internal/jobs/jobs.go b/dmaap-mediator-producer/internal/jobs/jobs.go deleted file mode 100644 index 86bfe05c..00000000 --- a/dmaap-mediator-producer/internal/jobs/jobs.go +++ /dev/null @@ -1,461 +0,0 @@ -// - -// ========================LICENSE_START================================= -// O-RAN-SC -// %% -// Copyright (C) 2021: Nordix Foundation -// %% -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// ========================LICENSE_END=================================== -// - -package jobs - -import ( - "fmt" - "strings" - "sync" - "time" - - "github.com/confluentinc/confluent-kafka-go/kafka" - log "github.com/sirupsen/logrus" - "oransc.org/nonrtric/dmaapmediatorproducer/internal/config" - "oransc.org/nonrtric/dmaapmediatorproducer/internal/kafkaclient" - "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient" -) - -type TypeData struct { - Identity string `json:"id"` - jobsHandler *jobsHandler -} - -type sourceType string - -const dMaaPSource = sourceType("dmaap") -const kafkaSource = sourceType("kafka") - -type JobInfo struct { - Owner string `json:"owner"` - LastUpdated string `json:"last_updated"` - InfoJobIdentity string `json:"info_job_identity"` - TargetUri string `json:"target_uri"` - InfoJobData Parameters `json:"info_job_data"` - InfoTypeIdentity string `json:"info_type_identity"` - sourceType sourceType -} // @name JobInfo - -type JobTypesManager interface { - LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition - GetSupportedTypes() []string -} - -type JobsManager interface { - AddJobFromRESTCall(JobInfo) error - DeleteJobFromRESTCall(jobId string) -} - -type JobsManagerImpl struct { - allTypes map[string]TypeData - pollClient restclient.HTTPClient - mrAddress string - kafkaFactory kafkaclient.KafkaFactory - distributeClient restclient.HTTPClient -} - -func NewJobsManagerImpl(pollClient restclient.HTTPClient, mrAddr string, kafkaFactory kafkaclient.KafkaFactory, distributeClient restclient.HTTPClient) *JobsManagerImpl { - return &JobsManagerImpl{ - allTypes: make(map[string]TypeData), - pollClient: pollClient, - mrAddress: mrAddr, - kafkaFactory: kafkaFactory, - distributeClient: distributeClient, - } -} - -func (jm *JobsManagerImpl) AddJobFromRESTCall(ji JobInfo) error { - if err := jm.validateJobInfo(ji); err == nil { - typeData := jm.allTypes[ji.InfoTypeIdentity] - ji.sourceType = typeData.jobsHandler.sourceType - typeData.jobsHandler.addJobCh <- ji - log.Debug("Added job: ", ji) - return nil - } else { - return err - } -} - -func (jm *JobsManagerImpl) DeleteJobFromRESTCall(jobId string) { - for _, typeData := range jm.allTypes { - log.Debugf("Deleting job %v from type %v", jobId, typeData.Identity) - typeData.jobsHandler.deleteJobCh <- jobId - } - log.Debug("Deleted job: ", jobId) -} - -func (jm *JobsManagerImpl) validateJobInfo(ji JobInfo) error { - if _, ok := jm.allTypes[ji.InfoTypeIdentity]; !ok { - return fmt.Errorf("type not supported: %v", ji.InfoTypeIdentity) - } - if ji.InfoJobIdentity == "" { - return fmt.Errorf("missing required job identity: %v", ji) - } - // Temporary for when there are only REST callbacks needed - if ji.TargetUri == "" { - return fmt.Errorf("missing required target URI: %v", ji) - } - return nil -} - -func (jm *JobsManagerImpl) LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition { - for _, typeDef := range types { - if typeDef.DMaaPTopicURL == "" && typeDef.KafkaInputTopic == "" { - log.Fatal("DMaaPTopicURL or KafkaInputTopic must be defined for type: ", typeDef.Identity) - } - jm.allTypes[typeDef.Identity] = TypeData{ - Identity: typeDef.Identity, - jobsHandler: newJobsHandler(typeDef, jm.mrAddress, jm.kafkaFactory, jm.pollClient, jm.distributeClient), - } - } - return types -} - -func (jm *JobsManagerImpl) GetSupportedTypes() []string { - supportedTypes := []string{} - for k := range jm.allTypes { - supportedTypes = append(supportedTypes, k) - } - return supportedTypes -} - -func (jm *JobsManagerImpl) StartJobsForAllTypes() { - for _, jobType := range jm.allTypes { - - go jobType.jobsHandler.startPollingAndDistribution() - - } -} - -type jobsHandler struct { - mu sync.Mutex - typeId string - sourceType sourceType - pollingAgent pollingAgent - jobs map[string]job - addJobCh chan JobInfo - deleteJobCh chan string - distributeClient restclient.HTTPClient -} - -func newJobsHandler(typeDef config.TypeDefinition, mRAddress string, kafkaFactory kafkaclient.KafkaFactory, pollClient restclient.HTTPClient, distributeClient restclient.HTTPClient) *jobsHandler { - pollingAgent := createPollingAgent(typeDef, mRAddress, pollClient, kafkaFactory, typeDef.KafkaInputTopic) - sourceType := kafkaSource - if typeDef.DMaaPTopicURL != "" { - sourceType = dMaaPSource - } - return &jobsHandler{ - typeId: typeDef.Identity, - sourceType: sourceType, - pollingAgent: pollingAgent, - jobs: make(map[string]job), - addJobCh: make(chan JobInfo), - deleteJobCh: make(chan string), - distributeClient: distributeClient, - } -} - -func (jh *jobsHandler) startPollingAndDistribution() { - go func() { - for { - jh.pollAndDistributeMessages() - } - }() - - go func() { - for { - jh.monitorManagementChannels() - } - }() -} - -func (jh *jobsHandler) pollAndDistributeMessages() { - log.Debugf("Processing jobs for type: %v", jh.typeId) - messagesBody, error := jh.pollingAgent.pollMessages() - if error != nil { - log.Warn("Error getting data from source. Cause: ", error) - time.Sleep(time.Minute) // Must wait before trying to call data source again - return - } - jh.distributeMessages(messagesBody) -} - -func (jh *jobsHandler) distributeMessages(messages []byte) { - if string(messages) != "[]" && len(messages) > 0 { // MR returns an ampty array if there are no messages. - log.Debug("Distributing messages: ", string(messages)) - jh.mu.Lock() - defer jh.mu.Unlock() - for _, job := range jh.jobs { - if len(job.messagesChannel) < cap(job.messagesChannel) { - job.messagesChannel <- messages - } else { - jh.emptyMessagesBuffer(job) - } - } - } -} - -func (jh *jobsHandler) emptyMessagesBuffer(job job) { - log.Debug("Emptying message queue for job: ", job.jobInfo.InfoJobIdentity) -out: - for { - select { - case <-job.messagesChannel: - default: - break out - } - } -} - -func (jh *jobsHandler) monitorManagementChannels() { - select { - case addedJob := <-jh.addJobCh: - jh.addJob(addedJob) - case deletedJob := <-jh.deleteJobCh: - jh.deleteJob(deletedJob) - } -} - -func (jh *jobsHandler) addJob(addedJob JobInfo) { - jh.mu.Lock() - log.Debug("Add job: ", addedJob) - newJob := newJob(addedJob, jh.distributeClient) - go newJob.start() - jh.jobs[addedJob.InfoJobIdentity] = newJob - jh.mu.Unlock() -} - -func (jh *jobsHandler) deleteJob(deletedJob string) { - jh.mu.Lock() - log.Debug("Delete job: ", deletedJob) - j, exist := jh.jobs[deletedJob] - if exist { - j.controlChannel <- struct{}{} - delete(jh.jobs, deletedJob) - } - jh.mu.Unlock() -} - -type pollingAgent interface { - pollMessages() ([]byte, error) -} - -func createPollingAgent(typeDef config.TypeDefinition, mRAddress string, pollClient restclient.HTTPClient, kafkaFactory kafkaclient.KafkaFactory, topicID string) pollingAgent { - if typeDef.DMaaPTopicURL != "" { - return dMaaPPollingAgent{ - messageRouterURL: mRAddress + typeDef.DMaaPTopicURL, - pollClient: pollClient, - } - } else { - return newKafkaPollingAgent(kafkaFactory, typeDef.KafkaInputTopic) - } -} - -type dMaaPPollingAgent struct { - messageRouterURL string - pollClient restclient.HTTPClient -} - -func (pa dMaaPPollingAgent) pollMessages() ([]byte, error) { - return restclient.Get(pa.messageRouterURL, pa.pollClient) -} - -type kafkaPollingAgent struct { - kafkaClient kafkaclient.KafkaClient -} - -func newKafkaPollingAgent(kafkaFactory kafkaclient.KafkaFactory, topicID string) kafkaPollingAgent { - c, err := kafkaclient.NewKafkaClient(kafkaFactory, topicID) - if err != nil { - log.Fatalf("Cannot create Kafka client for topic: %v, error details: %v\n", topicID, err) - } - return kafkaPollingAgent{ - kafkaClient: c, - } -} - -func (pa kafkaPollingAgent) pollMessages() ([]byte, error) { - msg, err := pa.kafkaClient.ReadMessage() - if err == nil { - return msg, nil - } else { - if isKafkaTimedOutError(err) { - return []byte(""), nil - } - return nil, err - } -} - -func isKafkaTimedOutError(err error) bool { - kafkaErr, ok := err.(kafka.Error) - return ok && kafkaErr.Code() == kafka.ErrTimedOut -} - -type job struct { - jobInfo JobInfo - client restclient.HTTPClient - messagesChannel chan []byte - controlChannel chan struct{} -} - -func newJob(j JobInfo, c restclient.HTTPClient) job { - - return job{ - jobInfo: j, - client: c, - messagesChannel: make(chan []byte, 10), - controlChannel: make(chan struct{}), - } -} - -type Parameters struct { - BufferTimeout BufferTimeout `json:"bufferTimeout"` -} // @name Parameters - -type BufferTimeout struct { - MaxSize int `json:"maxSize"` - MaxTimeMiliseconds int64 `json:"maxTimeMiliseconds"` -} // @name BufferTimeout - -func (j *job) start() { - if j.isJobBuffered() { - j.startReadingMessagesBuffered() - } else { - j.startReadingSingleMessages() - } -} - -func (j *job) startReadingSingleMessages() { -out: - for { - select { - case <-j.controlChannel: - log.Debug("Stop distribution for job: ", j.jobInfo.InfoJobIdentity) - break out - case msg := <-j.messagesChannel: - j.sendMessagesToConsumer(msg) - } - } -} - -func (j *job) startReadingMessagesBuffered() { -out: - for { - select { - case <-j.controlChannel: - log.Debug("Stop distribution for job: ", j.jobInfo.InfoJobIdentity) - break out - default: - msgs := j.read(j.jobInfo.InfoJobData.BufferTimeout) - if len(msgs) > 0 { - j.sendMessagesToConsumer(msgs) - } - } - } -} - -func (j *job) read(bufferParams BufferTimeout) []byte { - wg := sync.WaitGroup{} - wg.Add(bufferParams.MaxSize) - rawMsgs := make([][]byte, 0, bufferParams.MaxSize) - c := make(chan struct{}) - go func() { - i := 0 - out: - for { - select { - case <-c: - break out - case msg := <-j.messagesChannel: - rawMsgs = append(rawMsgs, msg) - i++ - wg.Done() - if i == bufferParams.MaxSize { - break out - } - } - } - }() - j.waitTimeout(&wg, time.Duration(bufferParams.MaxTimeMiliseconds)*time.Millisecond) - close(c) - return getAsJSONArray(rawMsgs) -} - -func getAsJSONArray(rawMsgs [][]byte) []byte { - if len(rawMsgs) == 0 { - return []byte("") - } - strings := "" - for i := 0; i < len(rawMsgs); i++ { - strings = strings + makeIntoString(rawMsgs[i]) - strings = addSeparatorIfNeeded(strings, i, len(rawMsgs)) - } - return []byte(wrapInJSONArray(strings)) -} - -func makeIntoString(rawMsg []byte) string { - return `"` + strings.ReplaceAll(string(rawMsg), "\"", "\\\"") + `"` -} - -func addSeparatorIfNeeded(strings string, position, length int) string { - if position < length-1 { - strings = strings + "," - } - return strings -} - -func wrapInJSONArray(strings string) string { - return "[" + strings + "]" -} - -func (j *job) waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool { - c := make(chan struct{}) - go func() { - defer close(c) - wg.Wait() - }() - select { - case <-c: - return false // completed normally - case <-time.After(timeout): - return true // timed out - } -} - -func (j *job) sendMessagesToConsumer(messages []byte) { - log.Debug("Processing job: ", j.jobInfo.InfoJobIdentity) - contentType := restclient.ContentTypeJSON - if j.isJobKafka() && !j.isJobBuffered() { - contentType = restclient.ContentTypePlain - } - if postErr := restclient.Post(j.jobInfo.TargetUri, messages, contentType, j.client); postErr != nil { - log.Warnf("Error posting data for job: %v. Cause: %v", j.jobInfo, postErr) - return - } - log.Debugf("Messages for job: %v distributed to consumer: %v", j.jobInfo.InfoJobIdentity, j.jobInfo.Owner) -} - -func (j *job) isJobBuffered() bool { - return j.jobInfo.InfoJobData.BufferTimeout.MaxSize > 0 && j.jobInfo.InfoJobData.BufferTimeout.MaxTimeMiliseconds > 0 -} - -func (j *job) isJobKafka() bool { - return j.jobInfo.sourceType == kafkaSource -}