// ========================LICENSE_START=================================
// Copyright (C) 2021: Nordix Foundation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ========================LICENSE_END===================================

29 "github.com/confluentinc/confluent-kafka-go/kafka"
30 log "github.com/sirupsen/logrus"
31 "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
32 "oransc.org/nonrtric/dmaapmediatorproducer/internal/kafkaclient"
33 "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
type TypeData struct {
	Identity    string `json:"id"`
	jobsHandler *jobsHandler
}

type sourceType string

const dMaaPSource = sourceType("dmaap")
const kafkaSource = sourceType("kafka")

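// JobInfo holds the data of a consumer job as registered through the REST
// API. The unexported sourceType is resolved from the job's info type when
// the job is added.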
type JobInfo struct {
	Owner            string     `json:"owner"`
	LastUpdated      string     `json:"last_updated"`
	InfoJobIdentity  string     `json:"info_job_identity"`
	TargetUri        string     `json:"target_uri"`
	InfoJobData      Parameters `json:"info_job_data"`
	InfoTypeIdentity string     `json:"info_type_identity"`
	sourceType       sourceType
} // @name JobInfo

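// JobTypesManager loads the configured type definitions and reports which
// info types the producer supports.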
type JobTypesManager interface {
	LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition
	GetSupportedTypes() []string
}

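// JobsManager adds and deletes consumer jobs based on REST calls.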
type JobsManager interface {
	AddJobFromRESTCall(JobInfo) error
	DeleteJobFromRESTCall(jobId string)
}

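// JobsManagerImpl implements both JobTypesManager and JobsManager.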
type JobsManagerImpl struct {
	allTypes         map[string]TypeData
	pollClient       restclient.HTTPClient
	mrAddress        string
	kafkaFactory     kafkaclient.KafkaFactory
	distributeClient restclient.HTTPClient
}

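// NewJobsManagerImpl creates a JobsManagerImpl with the given clients and the
// address of the DMaaP Message Router.
//
// A minimal wiring sketch; the clients, factory, and type definitions are
// assumed to be created elsewhere, e.g. from the application configuration:
//
//	jm := NewJobsManagerImpl(pollClient, "http://mr:3904", kafkaFactory, distributeClient)
//	jm.LoadTypesFromConfiguration(typeDefs)
//	jm.StartJobsForAllTypes()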
func NewJobsManagerImpl(pollClient restclient.HTTPClient, mrAddr string, kafkaFactory kafkaclient.KafkaFactory, distributeClient restclient.HTTPClient) *JobsManagerImpl {
	return &JobsManagerImpl{
		allTypes:         make(map[string]TypeData),
		pollClient:       pollClient,
		mrAddress:        mrAddr,
		kafkaFactory:     kafkaFactory,
		distributeClient: distributeClient,
	}
}

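// AddJobFromRESTCall validates the job and hands it over to the jobsHandler
// of the job's info type. An error is returned if the job is invalid or its
// type is not supported.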
func (jm *JobsManagerImpl) AddJobFromRESTCall(ji JobInfo) error {
	if err := jm.validateJobInfo(ji); err != nil {
		return err
	}
	typeData := jm.allTypes[ji.InfoTypeIdentity]
	ji.sourceType = typeData.jobsHandler.sourceType
	typeData.jobsHandler.addJobCh <- ji
	log.Debug("Added job: ", ji)
	return nil
}

func (jm *JobsManagerImpl) DeleteJobFromRESTCall(jobId string) {
	for _, typeData := range jm.allTypes {
		log.Debugf("Deleting job %v from type %v", jobId, typeData.Identity)
		typeData.jobsHandler.deleteJobCh <- jobId
	}
	log.Debug("Deleted job: ", jobId)
}

func (jm *JobsManagerImpl) validateJobInfo(ji JobInfo) error {
	if _, ok := jm.allTypes[ji.InfoTypeIdentity]; !ok {
		return fmt.Errorf("type not supported: %v", ji.InfoTypeIdentity)
	}
	if ji.InfoJobIdentity == "" {
		return fmt.Errorf("missing required job identity: %v", ji)
	}
	// Temporary check needed as long as only REST callbacks are supported.
	if ji.TargetUri == "" {
		return fmt.Errorf("missing required target URI: %v", ji)
	}
	return nil
}

func (jm *JobsManagerImpl) LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition {
	for _, typeDef := range types {
		if typeDef.DMaaPTopicURL == "" && typeDef.KafkaInputTopic == "" {
			log.Fatal("DMaaPTopicURL or KafkaInputTopic must be defined for type: ", typeDef.Identity)
		}
		jm.allTypes[typeDef.Identity] = TypeData{
			Identity:    typeDef.Identity,
			jobsHandler: newJobsHandler(typeDef, jm.mrAddress, jm.kafkaFactory, jm.pollClient, jm.distributeClient),
		}
	}
	return types
}

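// GetSupportedTypes returns the identities of all registered info types.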
func (jm *JobsManagerImpl) GetSupportedTypes() []string {
	supportedTypes := []string{}
	for k := range jm.allTypes {
		supportedTypes = append(supportedTypes, k)
	}
	return supportedTypes
}

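// StartJobsForAllTypes starts the polling and distribution loop of every
// registered type, each in its own goroutine.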
func (jm *JobsManagerImpl) StartJobsForAllTypes() {
	for _, jobType := range jm.allTypes {
		go jobType.jobsHandler.startPollingAndDistribution()
	}
}

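// jobsHandler polls the source of one info type and distributes the polled
// messages to all jobs registered for that type. The mutex guards the jobs
// map, which is accessed from several goroutines.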
type jobsHandler struct {
	mu               sync.Mutex
	typeId           string
	sourceType       sourceType
	pollingAgent     pollingAgent
	jobs             map[string]job
	addJobCh         chan JobInfo
	deleteJobCh      chan string
	distributeClient restclient.HTTPClient
}

func newJobsHandler(typeDef config.TypeDefinition, mRAddress string, kafkaFactory kafkaclient.KafkaFactory, pollClient restclient.HTTPClient, distributeClient restclient.HTTPClient) *jobsHandler {
	pollingAgent := createPollingAgent(typeDef, mRAddress, pollClient, kafkaFactory, typeDef.KafkaInputTopic)
	sourceType := kafkaSource
	if typeDef.DMaaPTopicURL != "" {
		sourceType = dMaaPSource
	}
	return &jobsHandler{
		typeId:           typeDef.Identity,
		sourceType:       sourceType,
		pollingAgent:     pollingAgent,
		jobs:             make(map[string]job),
		addJobCh:         make(chan JobInfo),
		deleteJobCh:      make(chan string),
		distributeClient: distributeClient,
	}
}

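// startPollingAndDistribution runs two loops concurrently: one polling the
// source and distributing the messages, and one serving the add and delete
// job channels.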
func (jh *jobsHandler) startPollingAndDistribution() {
	go func() {
		for {
			jh.pollAndDistributeMessages()
		}
	}()
	go func() {
		for {
			jh.monitorManagementChannels()
		}
	}()
}

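// pollAndDistributeMessages performs one poll of the source and distributes
// the result to the jobs. On error it backs off for a minute before the next
// attempt.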
func (jh *jobsHandler) pollAndDistributeMessages() {
	log.Debugf("Processing jobs for type: %v", jh.typeId)
	messagesBody, err := jh.pollingAgent.pollMessages()
	if err != nil {
		log.Warn("Error getting data from source. Cause: ", err)
		time.Sleep(time.Minute) // Must wait before calling the data source again.
		return
	}
	jh.distributeMessages(messagesBody)
}

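// distributeMessages fans the polled messages out to the buffered channel of
// every job. If a job's channel is full, the job is not keeping up, so its
// queue is emptied to avoid blocking the poll loop.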
func (jh *jobsHandler) distributeMessages(messages []byte) {
	if string(messages) != "[]" && len(messages) > 0 { // MR returns an empty array if there are no messages.
		log.Debug("Distributing messages: ", string(messages))
		jh.mu.Lock()
		defer jh.mu.Unlock()
		for _, job := range jh.jobs {
			if len(job.messagesChannel) < cap(job.messagesChannel) {
				job.messagesChannel <- messages
			} else {
				jh.emptyMessagesBuffer(job)
			}
		}
	}
}

func (jh *jobsHandler) emptyMessagesBuffer(job job) {
	log.Debug("Emptying message queue for job: ", job.jobInfo.InfoJobIdentity)
	for {
		select {
		case <-job.messagesChannel:
		default:
			return
		}
	}
}

func (jh *jobsHandler) monitorManagementChannels() {
	select {
	case addedJob := <-jh.addJobCh:
		jh.addJob(addedJob)
	case deletedJob := <-jh.deleteJobCh:
		jh.deleteJob(deletedJob)
	}
}

func (jh *jobsHandler) addJob(addedJob JobInfo) {
	jh.mu.Lock()
	defer jh.mu.Unlock()
	log.Debug("Add job: ", addedJob)
	newJob := newJob(addedJob, jh.distributeClient)
	go newJob.start()
	jh.jobs[addedJob.InfoJobIdentity] = newJob
}

func (jh *jobsHandler) deleteJob(deletedJob string) {
	jh.mu.Lock()
	defer jh.mu.Unlock()
	log.Debug("Delete job: ", deletedJob)
	if j, exist := jh.jobs[deletedJob]; exist {
		j.controlChannel <- struct{}{}
		delete(jh.jobs, deletedJob)
	}
}

type pollingAgent interface {
	pollMessages() ([]byte, error)
}

func createPollingAgent(typeDef config.TypeDefinition, mRAddress string, pollClient restclient.HTTPClient, kafkaFactory kafkaclient.KafkaFactory, topicID string) pollingAgent {
	if typeDef.DMaaPTopicURL != "" {
		return dMaaPPollingAgent{
			messageRouterURL: mRAddress + typeDef.DMaaPTopicURL,
			pollClient:       pollClient,
		}
	}
	return newKafkaPollingAgent(kafkaFactory, typeDef.KafkaInputTopic)
}

type dMaaPPollingAgent struct {
	messageRouterURL string
	pollClient       restclient.HTTPClient
}

func (pa dMaaPPollingAgent) pollMessages() ([]byte, error) {
	return restclient.Get(pa.messageRouterURL, pa.pollClient)
}

type kafkaPollingAgent struct {
	kafkaClient kafkaclient.KafkaClient
}

func newKafkaPollingAgent(kafkaFactory kafkaclient.KafkaFactory, topicID string) kafkaPollingAgent {
	c, err := kafkaclient.NewKafkaClient(kafkaFactory, topicID)
	if err != nil {
		log.Fatalf("Cannot create Kafka client for topic: %v, error details: %v\n", topicID, err)
	}
	return kafkaPollingAgent{
		kafkaClient: c,
	}
}

func (pa kafkaPollingAgent) pollMessages() ([]byte, error) {
	msg, err := pa.kafkaClient.ReadMessage()
	if err == nil {
		return msg, nil
	}
	if isKafkaTimedOutError(err) {
		// A read timeout just means there was nothing to consume.
		return []byte(""), nil
	}
	return nil, err
}

func isKafkaTimedOutError(err error) bool {
	kafkaErr, ok := err.(kafka.Error)
	return ok && kafkaErr.Code() == kafka.ErrTimedOut
}

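// job represents one consumer job. Messages are delivered on messagesChannel
// and the job is stopped by a signal on controlChannel.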
type job struct {
	jobInfo         JobInfo
	client          restclient.HTTPClient
	messagesChannel chan []byte
	controlChannel  chan struct{}
}

func newJob(j JobInfo, c restclient.HTTPClient) job {
	return job{
		jobInfo:         j,
		client:          c,
		messagesChannel: make(chan []byte, 10),
		controlChannel:  make(chan struct{}),
	}
}

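// Parameters holds the job parameters given by the consumer. Buffering is
// enabled when both maxSize and maxTimeMiliseconds are positive, e.g. with a
// registration payload like this (illustrative values only):
//
//	{"bufferTimeout": {"maxSize": 10, "maxTimeMiliseconds": 1000}}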
type Parameters struct {
	BufferTimeout BufferTimeout `json:"bufferTimeout"`
} // @name Parameters

type BufferTimeout struct {
	MaxSize            int   `json:"maxSize"`
	MaxTimeMiliseconds int64 `json:"maxTimeMiliseconds"`
} // @name BufferTimeout

func (j *job) start() {
	if j.isJobBuffered() {
		j.startReadingMessagesBuffered()
	} else {
		j.startReadingSingleMessages()
	}
}

func (j *job) startReadingSingleMessages() {
	for {
		select {
		case <-j.controlChannel:
			log.Debug("Stop distribution for job: ", j.jobInfo.InfoJobIdentity)
			return
		case msg := <-j.messagesChannel:
			j.sendMessagesToConsumer(msg)
		}
	}
}

func (j *job) startReadingMessagesBuffered() {
	for {
		select {
		case <-j.controlChannel:
			log.Debug("Stop distribution for job: ", j.jobInfo.InfoJobIdentity)
			return
		default:
			msgs := j.read(j.jobInfo.InfoJobData.BufferTimeout)
			if len(msgs) > 0 {
				j.sendMessagesToConsumer(msgs)
			}
		}
	}
}

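// read collects messages until MaxSize messages have arrived or
// MaxTimeMiliseconds has passed, whichever happens first, and returns them as
// a JSON array.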
func (j *job) read(bufferParams BufferTimeout) []byte {
	wg := sync.WaitGroup{}
	wg.Add(bufferParams.MaxSize)
	rawMsgs := make([][]byte, 0, bufferParams.MaxSize)
	c := make(chan struct{})
	go func() {
		i := 0
		for {
			select {
			case msg := <-j.messagesChannel:
				rawMsgs = append(rawMsgs, msg)
				i++
				wg.Done()
				if i == bufferParams.MaxSize {
					return
				}
			case <-c:
				return
			}
		}
	}()
	j.waitTimeout(&wg, time.Duration(bufferParams.MaxTimeMiliseconds)*time.Millisecond)
	close(c)
	return getAsJSONArray(rawMsgs)
}

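// getAsJSONArray renders the raw messages as a JSON array of strings,
// escaping any embedded double quotes.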
func getAsJSONArray(rawMsgs [][]byte) []byte {
	if len(rawMsgs) == 0 {
		return []byte("")
	}
	quoted := ""
	for i := 0; i < len(rawMsgs); i++ {
		quoted = quoted + makeIntoString(rawMsgs[i])
		quoted = addSeparatorIfNeeded(quoted, i, len(rawMsgs))
	}
	return []byte(wrapInJSONArray(quoted))
}

func makeIntoString(rawMsg []byte) string {
	return `"` + strings.ReplaceAll(string(rawMsg), "\"", "\\\"") + `"`
}

func addSeparatorIfNeeded(msgString string, position, length int) string {
	if position < length-1 {
		msgString = msgString + ","
	}
	return msgString
}

func wrapInJSONArray(msgString string) string {
	return "[" + msgString + "]"
}

func (j *job) waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
	c := make(chan struct{})
	go func() {
		defer close(c)
		wg.Wait()
	}()
	select {
	case <-c:
		return false // completed normally
	case <-time.After(timeout):
		return true // timed out
	}
}

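// sendMessagesToConsumer posts the messages to the job's target URI. Single
// messages from Kafka are posted as plain text; everything else is posted as
// JSON.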
func (j *job) sendMessagesToConsumer(messages []byte) {
	log.Debug("Processing job: ", j.jobInfo.InfoJobIdentity)
	contentType := restclient.ContentTypeJSON
	if j.isJobKafka() && !j.isJobBuffered() {
		contentType = restclient.ContentTypePlain
	}
	if postErr := restclient.Post(j.jobInfo.TargetUri, messages, contentType, j.client); postErr != nil {
		log.Warnf("Error posting data for job: %v. Cause: %v", j.jobInfo, postErr)
		return
	}
	log.Debugf("Messages for job: %v distributed to consumer: %v", j.jobInfo.InfoJobIdentity, j.jobInfo.Owner)
}

func (j *job) isJobBuffered() bool {
	return j.jobInfo.InfoJobData.BufferTimeout.MaxSize > 0 && j.jobInfo.InfoJobData.BufferTimeout.MaxTimeMiliseconds > 0
}

func (j *job) isJobKafka() bool {
	return j.jobInfo.sourceType == kafkaSource
}