-func getType(path string) (*Type, error) {
- if typeDefinition, err := os.ReadFile(path); err == nil {
- var dat map[string]interface{}
- if marshalError := json.Unmarshal(typeDefinition, &dat); marshalError == nil {
- schema, _ := json.Marshal(dat["schema"])
- typeInfo := Type{
- TypeId: dat["id"].(string),
- DMaaPTopic: dat["dmaapTopic"].(string),
- Schema: string(schema),
- Jobs: make(map[string]JobInfo),
- }
- if _, ok := allJobs[typeInfo.TypeId]; !ok {
- allJobs[typeInfo.TypeId] = typeInfo
+func newJobsHandler(typeId string, topicURL string, pollClient restclient.HTTPClient, distributeClient restclient.HTTPClient) *jobsHandler {
+ return &jobsHandler{
+ typeId: typeId,
+ topicUrl: topicURL,
+ jobs: make(map[string]job),
+ addJobCh: make(chan JobInfo),
+ deleteJobCh: make(chan string),
+ pollClient: pollClient,
+ distributeClient: distributeClient,
+ }
+}
+
+func (jh *jobsHandler) startPollingAndDistribution(mRAddress string) {
+ go func() {
+ for {
+ jh.pollAndDistributeMessages(mRAddress)
+ }
+ }()
+
+ go func() {
+ for {
+ jh.monitorManagementChannels()
+ }
+ }()
+}
+
+func (jh *jobsHandler) pollAndDistributeMessages(mRAddress string) {
+ log.Debugf("Processing jobs for type: %v", jh.typeId)
+ messagesBody, error := restclient.Get(mRAddress+jh.topicUrl, jh.pollClient)
+ if error != nil {
+ log.Warn("Error getting data from MR. Cause: ", error)
+ time.Sleep(time.Minute) // Must wait before trying to call MR again
+ }
+ log.Debug("Received messages: ", string(messagesBody))
+ jh.distributeMessages(messagesBody)
+}
+
+func (jh *jobsHandler) distributeMessages(messages []byte) {
+ if len(messages) > 2 {
+ jh.mu.Lock()
+ defer jh.mu.Unlock()
+ for _, job := range jh.jobs {
+ if len(job.messagesChannel) < cap(job.messagesChannel) {
+ job.messagesChannel <- messages
+ } else {
+ jh.emptyMessagesBuffer(job)