From 6e0d5846a1f6d938605a4afa7f392d97ac2ba8bc Mon Sep 17 00:00:00 2001 From: elinuxhenrik Date: Thu, 4 Nov 2021 10:29:02 +0100 Subject: [PATCH] Refactor for more efficient job handling Issue-ID: NONRTRIC-631 Signed-off-by: elinuxhenrik Change-Id: I8c338ba66f64285d7696197d58dd4038047cff3c --- dmaap-mediator-producer/Dockerfile | 2 +- dmaap-mediator-producer/internal/jobs/jobs.go | 138 ++++++++++++------- dmaap-mediator-producer/internal/jobs/jobs_test.go | 152 ++++++++++++--------- dmaap-mediator-producer/internal/server/server.go | 14 +- dmaap-mediator-producer/main.go | 13 +- .../consumer/consumerstub.go} | 0 .../dmaap/mrsimulator.go => stub/dmaap/mrstub.go} | 0 7 files changed, 194 insertions(+), 125 deletions(-) rename dmaap-mediator-producer/{simulator/consumer/consumersimulator.go => stub/consumer/consumerstub.go} (100%) rename dmaap-mediator-producer/{simulator/dmaap/mrsimulator.go => stub/dmaap/mrstub.go} (100%) diff --git a/dmaap-mediator-producer/Dockerfile b/dmaap-mediator-producer/Dockerfile index 95bcbae3..78981bd2 100644 --- a/dmaap-mediator-producer/Dockerfile +++ b/dmaap-mediator-producer/Dockerfile @@ -34,6 +34,6 @@ FROM gcr.io/distroless/base-debian10 WORKDIR / ## Copy from "build" stage COPY --from=build /dmaapmediatorproducer . -COPY --from=build /app/configs/ configs +COPY --from=build /app/configs/* /configs/ USER nonroot:nonroot ENTRYPOINT ["/dmaapmediatorproducer"] diff --git a/dmaap-mediator-producer/internal/jobs/jobs.go b/dmaap-mediator-producer/internal/jobs/jobs.go index 854372ad..b856e296 100644 --- a/dmaap-mediator-producer/internal/jobs/jobs.go +++ b/dmaap-mediator-producer/internal/jobs/jobs.go @@ -34,7 +34,7 @@ import ( type TypeData struct { TypeId string `json:"id"` DMaaPTopicURL string `json:"dmaapTopicUrl"` - Jobs map[string]JobInfo + jobHandler *jobHandler } type JobInfo struct { @@ -46,39 +46,49 @@ type JobInfo struct { InfoTypeIdentity string `json:"info_type_identity"` } -type JobTypeHandler interface { - GetTypes() ([]config.TypeDefinition, error) +type JobTypesManager interface { + LoadTypesFromConfiguration() ([]config.TypeDefinition, error) GetSupportedTypes() []string } -type JobHandler interface { +type JobsManager interface { AddJob(JobInfo) error DeleteJob(jobId string) } -type JobHandlerImpl struct { - mu sync.Mutex +type JobsManagerImpl struct { configFile string allTypes map[string]TypeData pollClient restclient.HTTPClient + mrAddress string + distributeClient restclient.HTTPClient +} + +type jobHandler struct { + mu sync.Mutex + typeId string + topicUrl string + jobs map[string]JobInfo + addJobCh chan JobInfo + deleteJobCh chan string + pollClient restclient.HTTPClient distributeClient restclient.HTTPClient } -func NewJobHandlerImpl(typeConfigFilePath string, pollClient restclient.HTTPClient, distributeClient restclient.HTTPClient) *JobHandlerImpl { - return &JobHandlerImpl{ +func NewJobsManagerImpl(typeConfigFilePath string, pollClient restclient.HTTPClient, mrAddr string, distributeClient restclient.HTTPClient) *JobsManagerImpl { + return &JobsManagerImpl{ configFile: typeConfigFilePath, allTypes: make(map[string]TypeData), pollClient: pollClient, + mrAddress: mrAddr, distributeClient: distributeClient, } } -func (jh *JobHandlerImpl) AddJob(ji JobInfo) error { - jh.mu.Lock() - defer jh.mu.Unlock() - if err := jh.validateJobInfo(ji); err == nil { - jobs := jh.allTypes[ji.InfoTypeIdentity].Jobs - jobs[ji.InfoJobIdentity] = ji +func (jm *JobsManagerImpl) AddJob(ji JobInfo) error { + if err := jm.validateJobInfo(ji); err == nil { + typeData := jm.allTypes[ji.InfoTypeIdentity] + typeData.jobHandler.addJobCh <- ji log.Debug("Added job: ", ji) return nil } else { @@ -86,17 +96,16 @@ func (jh *JobHandlerImpl) AddJob(ji JobInfo) error { } } -func (jh *JobHandlerImpl) DeleteJob(jobId string) { - jh.mu.Lock() - defer jh.mu.Unlock() - for _, typeData := range jh.allTypes { - delete(typeData.Jobs, jobId) +func (jm *JobsManagerImpl) DeleteJob(jobId string) { + for _, typeData := range jm.allTypes { + log.Debugf("Deleting job %v from type %v", jobId, typeData.TypeId) + typeData.jobHandler.deleteJobCh <- jobId } log.Debug("Deleted job: ", jobId) } -func (jh *JobHandlerImpl) validateJobInfo(ji JobInfo) error { - if _, ok := jh.allTypes[ji.InfoTypeIdentity]; !ok { +func (jm *JobsManagerImpl) validateJobInfo(ji JobInfo) error { + if _, ok := jm.allTypes[ji.InfoTypeIdentity]; !ok { return fmt.Errorf("type not supported: %v", ji.InfoTypeIdentity) } if ji.InfoJobIdentity == "" { @@ -109,10 +118,8 @@ func (jh *JobHandlerImpl) validateJobInfo(ji JobInfo) error { return nil } -func (jh *JobHandlerImpl) GetTypes() ([]config.TypeDefinition, error) { - jh.mu.Lock() - defer jh.mu.Unlock() - typeDefsByte, err := os.ReadFile(jh.configFile) +func (jm *JobsManagerImpl) LoadTypesFromConfiguration() ([]config.TypeDefinition, error) { + typeDefsByte, err := os.ReadFile(jm.configFile) if err != nil { return nil, err } @@ -124,55 +131,77 @@ func (jh *JobHandlerImpl) GetTypes() ([]config.TypeDefinition, error) { return nil, err } for _, typeDef := range typeDefs.Types { - jh.allTypes[typeDef.Id] = TypeData{ + addCh := make(chan JobInfo) + deleteCh := make(chan string) + jh := jobHandler{ + typeId: typeDef.Id, + topicUrl: typeDef.DmaapTopicURL, + jobs: make(map[string]JobInfo), + addJobCh: addCh, + deleteJobCh: deleteCh, + pollClient: jm.pollClient, + distributeClient: jm.distributeClient, + } + jm.allTypes[typeDef.Id] = TypeData{ TypeId: typeDef.Id, DMaaPTopicURL: typeDef.DmaapTopicURL, - Jobs: make(map[string]JobInfo), + jobHandler: &jh, } } return typeDefs.Types, nil } -func (jh *JobHandlerImpl) GetSupportedTypes() []string { - jh.mu.Lock() - defer jh.mu.Unlock() +func (jm *JobsManagerImpl) GetSupportedTypes() []string { supportedTypes := []string{} - for k := range jh.allTypes { + for k := range jm.allTypes { supportedTypes = append(supportedTypes, k) } return supportedTypes } -func (jh *JobHandlerImpl) RunJobs(mRAddress string) { - for { - jh.pollAndDistributeMessages(mRAddress) +func (jm *JobsManagerImpl) StartJobs() { + for _, jobType := range jm.allTypes { + + go jobType.jobHandler.start(jm.mrAddress) + } } -func (jh *JobHandlerImpl) pollAndDistributeMessages(mRAddress string) { +func (jh *jobHandler) start(mRAddress string) { + go func() { + for { + jh.pollAndDistributeMessages(mRAddress) + } + }() + + go func() { + for { + jh.monitorManagementChannels() + } + }() +} + +func (jh *jobHandler) pollAndDistributeMessages(mRAddress string) { jh.mu.Lock() defer jh.mu.Unlock() - for typeId, typeInfo := range jh.allTypes { - log.Debugf("Processing jobs for type: %v", typeId) - messagesBody, error := restclient.Get(mRAddress+typeInfo.DMaaPTopicURL, jh.pollClient) - if error != nil { - log.Warnf("Error getting data from MR. Cause: %v", error) - continue - } - log.Debugf("Received messages: %v", string(messagesBody)) - jh.distributeMessages(messagesBody, typeInfo) + log.Debugf("Processing jobs for type: %v", jh.typeId) + messagesBody, error := restclient.Get(mRAddress+jh.topicUrl, jh.pollClient) + if error != nil { + log.Warnf("Error getting data from MR. Cause: %v", error) } + log.Debugf("Received messages: %v", string(messagesBody)) + jh.distributeMessages(messagesBody) } -func (jh *JobHandlerImpl) distributeMessages(messages []byte, typeInfo TypeData) { +func (jh *jobHandler) distributeMessages(messages []byte) { if len(messages) > 2 { - for _, jobInfo := range typeInfo.Jobs { + for _, jobInfo := range jh.jobs { go jh.sendMessagesToConsumer(messages, jobInfo) } } } -func (jh *JobHandlerImpl) sendMessagesToConsumer(messages []byte, jobInfo JobInfo) { +func (jh *jobHandler) sendMessagesToConsumer(messages []byte, jobInfo JobInfo) { log.Debugf("Processing job: %v", jobInfo.InfoJobIdentity) if postErr := restclient.Post(jobInfo.TargetUri, messages, jh.distributeClient); postErr != nil { log.Warnf("Error posting data for job: %v. Cause: %v", jobInfo, postErr) @@ -180,6 +209,17 @@ func (jh *JobHandlerImpl) sendMessagesToConsumer(messages []byte, jobInfo JobInf log.Debugf("Messages distributed to consumer: %v.", jobInfo.Owner) } -func (jh *JobHandlerImpl) clearAll() { - jh.allTypes = make(map[string]TypeData) +func (jh *jobHandler) monitorManagementChannels() { + select { + case addedJob := <-jh.addJobCh: + jh.mu.Lock() + log.Debugf("received %v from addJobCh\n", addedJob) + jh.jobs[addedJob.InfoJobIdentity] = addedJob + jh.mu.Unlock() + case deletedJob := <-jh.deleteJobCh: + jh.mu.Lock() + log.Debugf("received %v from deleteJobCh\n", deletedJob) + delete(jh.jobs, deletedJob) + jh.mu.Unlock() + } } diff --git a/dmaap-mediator-producer/internal/jobs/jobs_test.go b/dmaap-mediator-producer/internal/jobs/jobs_test.go index 555285c5..3651a136 100644 --- a/dmaap-mediator-producer/internal/jobs/jobs_test.go +++ b/dmaap-mediator-producer/internal/jobs/jobs_test.go @@ -43,15 +43,14 @@ func TestGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *tes t.Errorf("Unable to create temporary directory for types due to: %v", err) } fname := filepath.Join(typesDir, "type_config.json") - handlerUnderTest := NewJobHandlerImpl(fname, nil, nil) + managerUnderTest := NewJobsManagerImpl(fname, nil, "", nil) t.Cleanup(func() { os.RemoveAll(typesDir) - handlerUnderTest.clearAll() }) if err = os.WriteFile(fname, []byte(typeDefinition), 0666); err != nil { t.Errorf("Unable to create temporary config file for types due to: %v", err) } - types, err := handlerUnderTest.GetTypes() + types, err := managerUnderTest.LoadTypesFromConfiguration() wantedType := config.TypeDefinition{ Id: "type1", DmaapTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1", @@ -60,13 +59,13 @@ func TestGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *tes assertions.EqualValues(wantedTypes, types) assertions.Nil(err) - supportedTypes := handlerUnderTest.GetSupportedTypes() + supportedTypes := managerUnderTest.GetSupportedTypes() assertions.EqualValues([]string{"type1"}, supportedTypes) } -func TestAddJobWhenTypeIsSupported_shouldAddJobToAllJobsMap(t *testing.T) { +func TestManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T) { assertions := require.New(t) - handlerUnderTest := NewJobHandlerImpl("", nil, nil) + managerUnderTest := NewJobsManagerImpl("", nil, "", nil) wantedJob := JobInfo{ Owner: "owner", LastUpdated: "now", @@ -75,94 +74,82 @@ func TestAddJobWhenTypeIsSupported_shouldAddJobToAllJobsMap(t *testing.T) { InfoJobData: "{}", InfoTypeIdentity: "type1", } - handlerUnderTest.allTypes["type1"] = TypeData{ - TypeId: "type1", - Jobs: map[string]JobInfo{"job1": wantedJob}, + jobHandler := jobHandler{ + addJobCh: make(chan JobInfo)} + managerUnderTest.allTypes["type1"] = TypeData{ + TypeId: "type1", + jobHandler: &jobHandler, } - t.Cleanup(func() { - handlerUnderTest.clearAll() - }) - err := handlerUnderTest.AddJob(wantedJob) + var err error + go func() { + err = managerUnderTest.AddJob(wantedJob) + }() + assertions.Nil(err) - assertions.Equal(1, len(handlerUnderTest.allTypes["type1"].Jobs)) - assertions.Equal(wantedJob, handlerUnderTest.allTypes["type1"].Jobs["job1"]) + addedJob := <-jobHandler.addJobCh + assertions.Equal(wantedJob, addedJob) } -func TestAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) { +func TestManagerAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) { assertions := require.New(t) - handlerUnderTest := NewJobHandlerImpl("", nil, nil) + managerUnderTest := NewJobsManagerImpl("", nil, "", nil) jobInfo := JobInfo{ InfoTypeIdentity: "type1", } - err := handlerUnderTest.AddJob(jobInfo) + err := managerUnderTest.AddJob(jobInfo) assertions.NotNil(err) assertions.Equal("type not supported: type1", err.Error()) } -func TestAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) { +func TestManagerAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) { assertions := require.New(t) - handlerUnderTest := NewJobHandlerImpl("", nil, nil) - handlerUnderTest.allTypes["type1"] = TypeData{ + managerUnderTest := NewJobsManagerImpl("", nil, "", nil) + managerUnderTest.allTypes["type1"] = TypeData{ TypeId: "type1", } - t.Cleanup(func() { - handlerUnderTest.clearAll() - }) jobInfo := JobInfo{ InfoTypeIdentity: "type1", } - err := handlerUnderTest.AddJob(jobInfo) + err := managerUnderTest.AddJob(jobInfo) assertions.NotNil(err) assertions.Equal("missing required job identity: { type1}", err.Error()) } -func TestAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) { +func TestManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) { assertions := require.New(t) - handlerUnderTest := NewJobHandlerImpl("", nil, nil) - handlerUnderTest.allTypes["type1"] = TypeData{ + managerUnderTest := NewJobsManagerImpl("", nil, "", nil) + managerUnderTest.allTypes["type1"] = TypeData{ TypeId: "type1", } - t.Cleanup(func() { - handlerUnderTest.clearAll() - }) jobInfo := JobInfo{ InfoTypeIdentity: "type1", InfoJobIdentity: "job1", } - err := handlerUnderTest.AddJob(jobInfo) + err := managerUnderTest.AddJob(jobInfo) assertions.NotNil(err) assertions.Equal("missing required target URI: { job1 type1}", err.Error()) } -func TestDeleteJob(t *testing.T) { +func TestManagerDeleteJob(t *testing.T) { assertions := require.New(t) - handlerUnderTest := NewJobHandlerImpl("", nil, nil) - jobToKeep := JobInfo{ - InfoJobIdentity: "job1", - InfoTypeIdentity: "type1", + managerUnderTest := NewJobsManagerImpl("", nil, "", nil) + jobHandler := jobHandler{ + deleteJobCh: make(chan string)} + managerUnderTest.allTypes["type1"] = TypeData{ + TypeId: "type1", + jobHandler: &jobHandler, } - jobToDelete := JobInfo{ - InfoJobIdentity: "job2", - InfoTypeIdentity: "type1", - } - handlerUnderTest.allTypes["type1"] = TypeData{ - TypeId: "type1", - Jobs: map[string]JobInfo{"job1": jobToKeep, "job2": jobToDelete}, - } - t.Cleanup(func() { - handlerUnderTest.clearAll() - }) - handlerUnderTest.DeleteJob("job2") - assertions.Equal(1, len(handlerUnderTest.allTypes["type1"].Jobs)) - assertions.Equal(jobToKeep, handlerUnderTest.allTypes["type1"].Jobs["job1"]) + go managerUnderTest.DeleteJob("job2") + + assertions.Equal("job2", <-jobHandler.deleteJobCh) } -func TestPollAndDistributeMessages(t *testing.T) { +func TestHandlerPollAndDistributeMessages(t *testing.T) { assertions := require.New(t) wg := sync.WaitGroup{} @@ -198,21 +185,17 @@ func TestPollAndDistributeMessages(t *testing.T) { return nil }) - handlerUnderTest := NewJobHandlerImpl("", pollClientMock, distributeClientMock) - jobInfo := JobInfo{ InfoTypeIdentity: "type1", InfoJobIdentity: "job1", TargetUri: "http://consumerHost/target", } - handlerUnderTest.allTypes["type1"] = TypeData{ - TypeId: "type1", - DMaaPTopicURL: "/topicUrl", - Jobs: map[string]JobInfo{"job1": jobInfo}, + handlerUnderTest := jobHandler{ + topicUrl: "/topicUrl", + jobs: map[string]JobInfo{jobInfo.InfoJobIdentity: jobInfo}, + pollClient: pollClientMock, + distributeClient: distributeClientMock, } - t.Cleanup(func() { - handlerUnderTest.clearAll() - }) wg.Add(2) // Two calls should be made to the server, one to poll and one to distribute handlerUnderTest.pollAndDistributeMessages("http://mrAddr") @@ -223,6 +206,53 @@ func TestPollAndDistributeMessages(t *testing.T) { } } +func TestHandlerAddJob_shouldAddJobToJobsMap(t *testing.T) { + assertions := require.New(t) + + jobInfo := JobInfo{ + InfoTypeIdentity: "type1", + InfoJobIdentity: "job1", + TargetUri: "http://consumerHost/target", + } + + addCh := make(chan JobInfo) + handlerUnderTest := jobHandler{ + mu: sync.Mutex{}, + jobs: map[string]JobInfo{}, + addJobCh: addCh, + } + + go func() { + addCh <- jobInfo + }() + + handlerUnderTest.monitorManagementChannels() + + assertions.Len(handlerUnderTest.jobs, 1) + assertions.Equal(jobInfo, handlerUnderTest.jobs["job1"]) +} + +func TestHandlerDeleteJob_shouldDeleteJobFromJobsMap(t *testing.T) { + assertions := require.New(t) + + deleteCh := make(chan string) + handlerUnderTest := jobHandler{ + mu: sync.Mutex{}, + jobs: map[string]JobInfo{"job1": { + InfoJobIdentity: "job1", + }}, + deleteJobCh: deleteCh, + } + + go func() { + deleteCh <- "job1" + }() + + handlerUnderTest.monitorManagementChannels() + + assertions.Len(handlerUnderTest.jobs, 0) +} + type RoundTripFunc func(req *http.Request) *http.Response func (f RoundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) { diff --git a/dmaap-mediator-producer/internal/server/server.go b/dmaap-mediator-producer/internal/server/server.go index 5861cbe7..8bed1f91 100644 --- a/dmaap-mediator-producer/internal/server/server.go +++ b/dmaap-mediator-producer/internal/server/server.go @@ -36,17 +36,17 @@ const jobIdToken = "infoJobId" const deleteJobPath = AddJobPath + "/{" + jobIdToken + "}" type ProducerCallbackHandler struct { - jobHandler jobs.JobHandler + jobsManager jobs.JobsManager } -func NewProducerCallbackHandler(jh jobs.JobHandler) *ProducerCallbackHandler { +func NewProducerCallbackHandler(jm jobs.JobsManager) *ProducerCallbackHandler { return &ProducerCallbackHandler{ - jobHandler: jh, + jobsManager: jm, } } -func NewRouter(jh jobs.JobHandler) *mux.Router { - callbackHandler := NewProducerCallbackHandler(jh) +func NewRouter(jm jobs.JobsManager) *mux.Router { + callbackHandler := NewProducerCallbackHandler(jm) r := mux.NewRouter() r.HandleFunc(StatusPath, statusHandler).Methods(http.MethodGet).Name("status") r.HandleFunc(AddJobPath, callbackHandler.addInfoJobHandler).Methods(http.MethodPost).Name("add") @@ -71,7 +71,7 @@ func (h *ProducerCallbackHandler) addInfoJobHandler(w http.ResponseWriter, r *ht http.Error(w, fmt.Sprintf("Invalid json body. Cause: %v", unmarshalErr), http.StatusBadRequest) return } - if err := h.jobHandler.AddJob(jobInfo); err != nil { + if err := h.jobsManager.AddJob(jobInfo); err != nil { http.Error(w, fmt.Sprintf("Invalid job info. Cause: %v", err), http.StatusBadRequest) } } @@ -84,7 +84,7 @@ func (h *ProducerCallbackHandler) deleteInfoJobHandler(w http.ResponseWriter, r return } - h.jobHandler.DeleteJob(id) + h.jobsManager.DeleteJob(id) } type notFoundHandler struct{} diff --git a/dmaap-mediator-producer/main.go b/dmaap-mediator-producer/main.go index e8689bd4..380087f3 100644 --- a/dmaap-mediator-producer/main.go +++ b/dmaap-mediator-producer/main.go @@ -55,22 +55,21 @@ func main() { log.Fatalf("Stopping producer due to error: %v", err) } - jobHandler := jobs.NewJobHandlerImpl("configs/type_config.json", retryClient, &http.Client{ + jobsManager := jobs.NewJobsManagerImpl("configs/type_config.json", retryClient, configuration.DMaaPMRAddress, &http.Client{ Timeout: time.Second * 5, }) - if err := registerTypesAndProducer(jobHandler, configuration.InfoCoordinatorAddress, callbackAddress, retryClient); err != nil { + if err := registerTypesAndProducer(jobsManager, configuration.InfoCoordinatorAddress, callbackAddress, retryClient); err != nil { log.Fatalf("Stopping producer due to: %v", err) } + jobsManager.StartJobs() log.Debug("Starting DMaaP Mediator Producer") go func() { log.Debugf("Starting callback server at port %v", configuration.InfoProducerPort) - r := server.NewRouter(jobHandler) + r := server.NewRouter(jobsManager) log.Fatalf("Server stopped: %v", http.ListenAndServeTLS(fmt.Sprintf(":%v", configuration.InfoProducerPort), configuration.ProducerCertPath, configuration.ProducerKeyPath, r)) }() - go jobHandler.RunJobs(configuration.DMaaPMRAddress) - keepProducerAlive() } @@ -108,9 +107,9 @@ func createRetryClient(cert *tls.Certificate) *http.Client { return rawRetryClient.StandardClient() } -func registerTypesAndProducer(jobHandler jobs.JobTypeHandler, infoCoordinatorAddress string, callbackAddress string, client restclient.HTTPClient) error { +func registerTypesAndProducer(jobHandler jobs.JobTypesManager, infoCoordinatorAddress string, callbackAddress string, client restclient.HTTPClient) error { registrator := config.NewRegistratorImpl(infoCoordinatorAddress, client) - if types, err := jobHandler.GetTypes(); err == nil { + if types, err := jobHandler.LoadTypesFromConfiguration(); err == nil { if regErr := registrator.RegisterTypes(types); regErr != nil { return fmt.Errorf("unable to register all types due to: %v", regErr) } diff --git a/dmaap-mediator-producer/simulator/consumer/consumersimulator.go b/dmaap-mediator-producer/stub/consumer/consumerstub.go similarity index 100% rename from dmaap-mediator-producer/simulator/consumer/consumersimulator.go rename to dmaap-mediator-producer/stub/consumer/consumerstub.go diff --git a/dmaap-mediator-producer/simulator/dmaap/mrsimulator.go b/dmaap-mediator-producer/stub/dmaap/mrstub.go similarity index 100% rename from dmaap-mediator-producer/simulator/dmaap/mrsimulator.go rename to dmaap-mediator-producer/stub/dmaap/mrstub.go -- 2.16.6