// ========================LICENSE_START=================================
// Copyright (C) 2021: Nordix Foundation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ========================LICENSE_END===================================
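
// Package jobs maps info types to jobs, polls messages from DMaaP or Kafka
// for each type, and distributes them to the consumers that have registered
// jobs for that type.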
28 "github.com/confluentinc/confluent-kafka-go/kafka"
29 log "github.com/sirupsen/logrus"
30 "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
31 "oransc.org/nonrtric/dmaapmediatorproducer/internal/kafkaclient"
32 "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"

type TypeData struct {
    Identity    string `json:"id"`
    jobsHandler *jobsHandler
}

type JobInfo struct {
    Owner            string     `json:"owner"`
    LastUpdated      string     `json:"last_updated"`
    InfoJobIdentity  string     `json:"info_job_identity"`
    TargetUri        string     `json:"target_uri"`
    InfoJobData      Parameters `json:"info_job_data"`
    InfoTypeIdentity string     `json:"info_type_identity"`
}
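
// JobTypesManager loads the configured info types and reports which ones the
// producer supports.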
type JobTypesManager interface {
    LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition
    GetSupportedTypes() []string
}
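
// JobsManager is the interface through which the REST API adds and deletes
// jobs.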
type JobsManager interface {
    AddJobFromRESTCall(JobInfo) error
    DeleteJobFromRESTCall(jobId string)
}

type JobsManagerImpl struct {
    allTypes         map[string]TypeData
    pollClient       restclient.HTTPClient
    mrAddress        string
    kafkaFactory     kafkaclient.KafkaFactory
    distributeClient restclient.HTTPClient
}
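
// NewJobsManagerImpl creates a manager with no types loaded; call
// LoadTypesFromConfiguration and StartJobsForAllTypes before accepting jobs.
// A typical wiring at startup might look like the sketch below, where
// pollClient, mrHost, kafkaFactory, distributeClient, and typeDefs are
// assumed to come from the application's setup code, not from this file:
//
//	jm := NewJobsManagerImpl(pollClient, mrHost, kafkaFactory, distributeClient)
//	jm.LoadTypesFromConfiguration(typeDefs)
//	jm.StartJobsForAllTypes()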
func NewJobsManagerImpl(pollClient restclient.HTTPClient, mrAddr string, kafkaFactory kafkaclient.KafkaFactory, distributeClient restclient.HTTPClient) *JobsManagerImpl {
    return &JobsManagerImpl{
        allTypes:         make(map[string]TypeData),
        pollClient:       pollClient,
        mrAddress:        mrAddr,
        kafkaFactory:     kafkaFactory,
        distributeClient: distributeClient,
    }
}

func (jm *JobsManagerImpl) AddJobFromRESTCall(ji JobInfo) error {
    if err := jm.validateJobInfo(ji); err != nil {
        return err
    }
    typeData := jm.allTypes[ji.InfoTypeIdentity]
    typeData.jobsHandler.addJobCh <- ji
    log.Debug("Added job: ", ji)
    return nil
}

func (jm *JobsManagerImpl) DeleteJobFromRESTCall(jobId string) {
    for _, typeData := range jm.allTypes {
        log.Debugf("Deleting job %v from type %v", jobId, typeData.Identity)
        typeData.jobsHandler.deleteJobCh <- jobId
    }
    log.Debug("Deleted job: ", jobId)
}

func (jm *JobsManagerImpl) validateJobInfo(ji JobInfo) error {
    if _, ok := jm.allTypes[ji.InfoTypeIdentity]; !ok {
        return fmt.Errorf("type not supported: %v", ji.InfoTypeIdentity)
    }
    if ji.InfoJobIdentity == "" {
        return fmt.Errorf("missing required job identity: %v", ji)
    }
    // Temporary check while only REST callbacks are supported.
    if ji.TargetUri == "" {
        return fmt.Errorf("missing required target URI: %v", ji)
    }
    return nil
}

func (jm *JobsManagerImpl) LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition {
    for _, typeDef := range types {
        if typeDef.DMaaPTopicURL == "" && typeDef.KafkaInputTopic == "" {
            log.Fatal("DMaaPTopicURL or KafkaInputTopic must be defined for type: ", typeDef.Identity)
        }
        jm.allTypes[typeDef.Identity] = TypeData{
            Identity:    typeDef.Identity,
            jobsHandler: newJobsHandler(typeDef, jm.mrAddress, jm.kafkaFactory, jm.pollClient, jm.distributeClient),
        }
    }
    return types
}

func (jm *JobsManagerImpl) GetSupportedTypes() []string {
    supportedTypes := []string{}
    for k := range jm.allTypes {
        supportedTypes = append(supportedTypes, k)
    }
    return supportedTypes
}

func (jm *JobsManagerImpl) StartJobsForAllTypes() {
    for _, jobType := range jm.allTypes {
        go jobType.jobsHandler.startPollingAndDistribution()
    }
}
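
// jobsHandler owns the jobs for one info type. One goroutine polls the
// source and distributes messages while another serves the add/delete
// channels, so access to the jobs map is guarded by mu.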
type jobsHandler struct {
    mu               sync.Mutex
    typeId           string
    pollingAgent     pollingAgent
    jobs             map[string]job
    addJobCh         chan JobInfo
    deleteJobCh      chan string
    distributeClient restclient.HTTPClient
}

func newJobsHandler(typeDef config.TypeDefinition, mRAddress string, kafkaFactory kafkaclient.KafkaFactory, pollClient restclient.HTTPClient, distributeClient restclient.HTTPClient) *jobsHandler {
    pollingAgent := createPollingAgent(typeDef, mRAddress, pollClient, kafkaFactory, typeDef.KafkaInputTopic)
    return &jobsHandler{
        typeId:           typeDef.Identity,
        pollingAgent:     pollingAgent,
        jobs:             make(map[string]job),
        addJobCh:         make(chan JobInfo),
        deleteJobCh:      make(chan string),
        distributeClient: distributeClient,
    }
}
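
// startPollingAndDistribution starts the polling loop and the management
// loop in separate goroutines; both run for the lifetime of the process.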
func (jh *jobsHandler) startPollingAndDistribution() {
    go func() {
        for {
            jh.pollAndDistributeMessages()
        }
    }()

    go func() {
        for {
            jh.monitorManagementChannels()
        }
    }()
}

func (jh *jobsHandler) pollAndDistributeMessages() {
    log.Debugf("Processing jobs for type: %v", jh.typeId)
    messagesBody, err := jh.pollingAgent.pollMessages()
    if err != nil {
        log.Warn("Error getting data from source. Cause: ", err)
        time.Sleep(time.Minute) // Wait before calling the data source again.
        return
    }
    jh.distributeMessages(messagesBody)
}

func (jh *jobsHandler) distributeMessages(messages []byte) {
    if len(messages) == 0 || string(messages) == "[]" { // MR returns an empty array when there are no messages.
        return
    }
    log.Debug("Distributing messages: ", string(messages))
    jh.mu.Lock()
    defer jh.mu.Unlock()
    for _, job := range jh.jobs {
        if len(job.messagesChannel) < cap(job.messagesChannel) {
            job.messagesChannel <- messages
        } else {
            // The job is not keeping up; drop its backlog so fresh data can be buffered.
            jh.emptyMessagesBuffer(job)
        }
    }
}

func (jh *jobsHandler) emptyMessagesBuffer(job job) {
    log.Debug("Emptying message queue for job: ", job.jobInfo.InfoJobIdentity)
    for {
        select {
        case <-job.messagesChannel:
        default:
            return
        }
    }
}
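
// monitorManagementChannels serves a single add or delete request; the
// caller invokes it in a loop.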
func (jh *jobsHandler) monitorManagementChannels() {
    select {
    case addedJob := <-jh.addJobCh:
        jh.addJob(addedJob)
    case deletedJob := <-jh.deleteJobCh:
        jh.deleteJob(deletedJob)
    }
}

func (jh *jobsHandler) addJob(addedJob JobInfo) {
    jh.mu.Lock()
    defer jh.mu.Unlock()
    log.Debug("Add job: ", addedJob)
    newJob := newJob(addedJob, jh.distributeClient)
    go newJob.start()
    jh.jobs[addedJob.InfoJobIdentity] = newJob
}

func (jh *jobsHandler) deleteJob(deletedJob string) {
    jh.mu.Lock()
    defer jh.mu.Unlock()
    log.Debug("Delete job: ", deletedJob)
    if j, exist := jh.jobs[deletedJob]; exist {
        j.controlChannel <- struct{}{}
        delete(jh.jobs, deletedJob)
    }
}
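
// pollingAgent abstracts the message source: one implementation polls DMaaP
// Message Router over REST, the other reads from a Kafka topic.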
type pollingAgent interface {
    pollMessages() ([]byte, error)
}

func createPollingAgent(typeDef config.TypeDefinition, mRAddress string, pollClient restclient.HTTPClient, kafkaFactory kafkaclient.KafkaFactory, topicID string) pollingAgent {
    if typeDef.DMaaPTopicURL != "" {
        return dMaaPPollingAgent{
            messageRouterURL: mRAddress + typeDef.DMaaPTopicURL,
            pollClient:       pollClient,
        }
    }
    return newKafkaPollingAgent(kafkaFactory, topicID)
}

type dMaaPPollingAgent struct {
    messageRouterURL string
    pollClient       restclient.HTTPClient
}

func (pa dMaaPPollingAgent) pollMessages() ([]byte, error) {
    return restclient.Get(pa.messageRouterURL, pa.pollClient)
}

type kafkaPollingAgent struct {
    kafkaClient kafkaclient.KafkaClient
}

func newKafkaPollingAgent(kafkaFactory kafkaclient.KafkaFactory, topicID string) kafkaPollingAgent {
    c, err := kafkaclient.NewKafkaClient(kafkaFactory, topicID)
    if err != nil {
        log.Fatalf("Cannot create Kafka client for topic: %v, error details: %v\n", topicID, err)
    }
    return kafkaPollingAgent{
        kafkaClient: c,
    }
}

func (pa kafkaPollingAgent) pollMessages() ([]byte, error) {
    msg, err := pa.kafkaClient.ReadMessage()
    if err == nil {
        return msg, nil
    }
    if isKafkaTimedOutError(err) {
        // A read timeout only means no messages arrived; report an empty poll.
        return []byte(""), nil
    }
    return nil, err
}

func isKafkaTimedOutError(err error) bool {
    kafkaErr, ok := err.(kafka.Error)
    return ok && kafkaErr.Code() == kafka.ErrTimedOut
}
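
// job holds one consumer registration together with the channels used to
// feed it messages and to stop its distribution goroutine.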
type job struct {
    jobInfo         JobInfo
    client          restclient.HTTPClient
    messagesChannel chan []byte
    controlChannel  chan struct{}
}

func newJob(j JobInfo, c restclient.HTTPClient) job {
    return job{
        jobInfo:         j,
        client:          c,
        messagesChannel: make(chan []byte, 10), // buffer a few polls so a slow consumer does not block distribution
        controlChannel:  make(chan struct{}),
    }
}

type Parameters struct {
    BufferTimeout BufferTimeout `json:"bufferTimeout"`
}
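
// BufferTimeout controls buffered distribution: messages are accumulated
// until MaxSize messages have arrived or MaxTimeMiliseconds has passed,
// whichever comes first. In a REST call the parameters arrive inside
// info_job_data, e.g. (values illustrative only):
//
//	{"bufferTimeout": {"maxSize": 10, "maxTimeMiliseconds": 1000}}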
type BufferTimeout struct {
    MaxSize            int   `json:"maxSize"`
    MaxTimeMiliseconds int64 `json:"maxTimeMiliseconds"` // spelling kept to match the JSON field name
}

func (j *job) start() {
    if j.jobInfo.InfoJobData.BufferTimeout.MaxSize == 0 {
        j.startReadingSingleMessages()
    } else {
        j.startReadingMessagesBuffered()
    }
}

func (j *job) startReadingSingleMessages() {
out:
    for {
        select {
        case <-j.controlChannel:
            log.Debug("Stop distribution for job: ", j.jobInfo.InfoJobIdentity)
            break out
        case msg := <-j.messagesChannel:
            j.sendMessagesToConsumer(msg)
        }
    }
}

func (j *job) startReadingMessagesBuffered() {
out:
    for {
        select {
        case <-j.controlChannel:
            log.Debug("Stop distribution for job: ", j.jobInfo.InfoJobIdentity)
            break out
        default:
            msgs := j.read(j.jobInfo.InfoJobData.BufferTimeout)
            if len(msgs) > 0 {
                j.sendMessagesToConsumer(msgs)
            }
        }
    }
}
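
// read blocks until MaxSize messages have been collected or
// MaxTimeMiliseconds has passed, whichever comes first, and returns the
// concatenated payloads received so far.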
func (j *job) read(bufferParams BufferTimeout) []byte {
    wg := sync.WaitGroup{}
    wg.Add(bufferParams.MaxSize)
    var msgs []byte
    c := make(chan struct{})
    collectorDone := make(chan struct{}) // closed when the collector goroutine exits
    go func() {
        defer close(collectorDone)
        i := 0
        defer func() {
            // Release any slots still held in the WaitGroup so the helper
            // goroutine in waitTimeout does not leak after a timeout.
            for ; i < bufferParams.MaxSize; i++ {
                wg.Done()
            }
        }()
        for {
            select {
            case msg := <-j.messagesChannel:
                wg.Done()
                msgs = append(msgs, msg...)
                i++
                if i == bufferParams.MaxSize {
                    return
                }
            case <-c:
                return
            }
        }
    }()
    j.waitTimeout(&wg, time.Duration(bufferParams.MaxTimeMiliseconds)*time.Millisecond)
    close(c)
    <-collectorDone // make sure the collector has stopped before msgs is read
    return msgs
}

func (j *job) waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
    c := make(chan struct{})
    go func() {
        defer close(c)
        wg.Wait()
    }()
    select {
    case <-c:
        return false // completed normally
    case <-time.After(timeout):
        return true // timed out
    }
}

func (j *job) sendMessagesToConsumer(messages []byte) {
    log.Debug("Processing job: ", j.jobInfo.InfoJobIdentity)
    if postErr := restclient.Post(j.jobInfo.TargetUri, messages, j.client); postErr != nil {
        log.Warnf("Error posting data for job: %v. Cause: %v", j.jobInfo, postErr)
        return
    }
    log.Debugf("Messages for job: %v distributed to consumer: %v", j.jobInfo.InfoJobIdentity, j.jobInfo.Owner)
}