import (
"os"
+ "strconv"
+
+ log "github.com/sirupsen/logrus"
)
type Config struct {
LogLevel string
InfoProducerSupervisionCallbackHost string
- InfoProducerSupervisionCallbackPort string
+ InfoProducerSupervisionCallbackPort int
InfoJobCallbackHost string
- InfoJobCallbackPort string
+ InfoJobCallbackPort int
InfoCoordinatorAddress string
}
return &Config{
LogLevel: getEnv("LOG_LEVEL", "Info"),
InfoProducerSupervisionCallbackHost: getEnv("INFO_PRODUCER_SUPERVISION_CALLBACK_HOST", ""),
- InfoProducerSupervisionCallbackPort: getEnv("INFO_PRODUCER_SUPERVISION_CALLBACK_PORT", "8085"),
+ InfoProducerSupervisionCallbackPort: getEnvAsInt("INFO_PRODUCER_SUPERVISION_CALLBACK_PORT", 8085),
InfoJobCallbackHost: getEnv("INFO_JOB_CALLBACK_HOST", ""),
- InfoJobCallbackPort: getEnv("INFO_JOB_CALLBACK_PORT", "8086"),
+ InfoJobCallbackPort: getEnvAsInt("INFO_JOB_CALLBACK_PORT", 8086),
InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "http://enrichmentservice:8083"),
}
}
return defaultVal
}
+
+func getEnvAsInt(name string, defaultVal int) int {
+ valueStr := getEnv(name, "")
+ if value, err := strconv.Atoi(valueStr); err == nil {
+ return value
+ } else if valueStr != "" {
+ log.Warnf("Invalid int value: %v for variable: %v. Default value: %v will be used", valueStr, name, defaultVal)
+ }
+
+ return defaultVal
+}
package config
import (
+ "bytes"
"os"
"reflect"
"testing"
+
+ log "github.com/sirupsen/logrus"
+ "github.com/stretchr/testify/require"
)
func TestNew_envVarsSetConfigContainSetValues(t *testing.T) {
os.Setenv("LOG_LEVEL", "Debug")
os.Setenv("INFO_PRODUCER_SUPERVISION_CALLBACK_HOST", "supervisionCallbackHost")
- os.Setenv("INFO_PRODUCER_SUPERVISION_CALLBACK_PORT", "supervisionCallbackPort")
+ os.Setenv("INFO_PRODUCER_SUPERVISION_CALLBACK_PORT", "8095")
os.Setenv("INFO_JOB_CALLBACK_HOST", "jobCallbackHost")
- os.Setenv("INFO_JOB_CALLBACK_PORT", "jobCallbackPort")
+ os.Setenv("INFO_JOB_CALLBACK_PORT", "8096")
os.Setenv("INFO_COORD_ADDR", "infoCoordAddr")
defer os.Clearenv()
wantConfig := Config{
LogLevel: "Debug",
InfoProducerSupervisionCallbackHost: "supervisionCallbackHost",
- InfoProducerSupervisionCallbackPort: "supervisionCallbackPort",
+ InfoProducerSupervisionCallbackPort: 8095,
InfoJobCallbackHost: "jobCallbackHost",
- InfoJobCallbackPort: "jobCallbackPort",
+ InfoJobCallbackPort: 8096,
InfoCoordinatorAddress: "infoCoordAddr",
}
if got := New(); !reflect.DeepEqual(got, &wantConfig) {
}
}
+func TestNew_faultyIntValueSetConfigContainDefaultValueAndWarnInLog(t *testing.T) {
+ os.Clearenv()
+ assertions := require.New(t)
+ var buf bytes.Buffer
+ log.SetOutput(&buf)
+ defer func() {
+ log.SetOutput(os.Stderr)
+ }()
+
+ os.Setenv("INFO_PRODUCER_SUPERVISION_CALLBACK_PORT", "wrong")
+ defer os.Clearenv()
+ wantConfig := Config{
+ LogLevel: "Info",
+ InfoProducerSupervisionCallbackHost: "",
+ InfoProducerSupervisionCallbackPort: 8085,
+ InfoJobCallbackHost: "",
+ InfoJobCallbackPort: 8086,
+ InfoCoordinatorAddress: "http://enrichmentservice:8083",
+ }
+ if got := New(); !reflect.DeepEqual(got, &wantConfig) {
+ t.Errorf("New() = %v, want %v", got, &wantConfig)
+ }
+ logString := buf.String()
+ assertions.Contains(logString, "Invalid int value: wrong for variable: INFO_PRODUCER_SUPERVISION_CALLBACK_PORT. Default value: 8085 will be used")
+}
+
func TestNew_envVarsNotSetConfigContainDefaultValues(t *testing.T) {
+ os.Clearenv()
wantConfig := Config{
LogLevel: "Info",
InfoProducerSupervisionCallbackHost: "",
- InfoProducerSupervisionCallbackPort: "8085",
+ InfoProducerSupervisionCallbackPort: 8085,
InfoJobCallbackHost: "",
- InfoJobCallbackPort: "8086",
+ InfoJobCallbackPort: 8086,
InfoCoordinatorAddress: "http://enrichmentservice:8083",
}
if got := New(); !reflect.DeepEqual(got, &wantConfig) {
log "github.com/sirupsen/logrus"
- "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobtypes"
+ "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
"oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
)
const registerProducerPath = "/data-producer/v1/info-producers/"
type Registrator interface {
- RegisterTypes(types []*jobtypes.Type) error
+ RegisterTypes(types []*jobs.Type) error
RegisterProducer(producerId string, producerInfo *ProducerRegistrationInfo)
}
}
}
-func (r RegistratorImpl) RegisterTypes(jobTypes []*jobtypes.Type) error {
+func (r RegistratorImpl) RegisterTypes(jobTypes []*jobs.Type) error {
for _, jobType := range jobTypes {
body := fmt.Sprintf(`{"info_job_data_schema": %v}`, jobType.Schema)
if error := restclient.Put(r.infoCoordinatorAddress+registerTypePath+url.PathEscape(jobType.TypeId), []byte(body)); error != nil {
// ========================LICENSE_END===================================
//
-package jobtypes
+package jobs
import (
"os"
Schema string
}
-var typeDir = "configs"
-var supportedTypes = make([]string, 0)
+type JobInfo struct {
+ Owner string `json:"owner"`
+ LastUpdated string `json:"last_updated"`
+ InfoJobIdentity string `json:"info_job_identity"`
+ TargetUri string `json:"target_uri"`
+ InfoJobData string `json:"info_job_data"`
+ InfoTypeIdentity string `json:"info_type_identity"`
+}
+
+type JobHandler interface {
+ AddJob(JobInfo) error
+}
+
+var (
+ typeDir = "configs"
+ Handler JobHandler
+ allJobs = make(map[string]map[string]JobInfo)
+)
+
+func init() {
+ Handler = newJobHandlerImpl()
+}
+
+type jobHandlerImpl struct{}
+
+func newJobHandlerImpl() *jobHandlerImpl {
+ return &jobHandlerImpl{}
+}
+
+func (jh *jobHandlerImpl) AddJob(ji JobInfo) error {
+ if jobs, ok := allJobs[ji.InfoTypeIdentity]; ok {
+ if _, ok := jobs[ji.InfoJobIdentity]; ok {
+ // TODO: Update job
+ } else {
+ jobs[ji.InfoJobIdentity] = ji
+ }
+ }
+ return nil
+}
func GetTypes() ([]*Type, error) {
types := make([]*Type, 0, 1)
}
func GetSupportedTypes() []string {
+ supportedTypes := []string{}
+ for k := range allJobs {
+ supportedTypes = append(supportedTypes, k)
+ }
return supportedTypes
}
+func AddJob(job JobInfo) error {
+ return Handler.AddJob(job)
+}
+
func getType(path string) (*Type, error) {
fileName := filepath.Base(path)
typeName := strings.TrimSuffix(fileName, filepath.Ext(fileName))
TypeId: typeName,
Schema: string(typeSchema),
}
- supportedTypes = append(supportedTypes, typeName)
+ if _, ok := allJobs[typeName]; !ok {
+ allJobs[typeName] = make(map[string]JobInfo)
+ }
return &typeInfo, nil
} else {
return nil, err
// ========================LICENSE_END===================================
//
-package jobtypes
+package jobs
import (
"os"
supportedTypes := GetSupportedTypes()
assertions.EqualValues([]string{"type1"}, supportedTypes)
}
+
+func TestAddJob_shouldAddJobToAllJobsMap(t *testing.T) {
+ assertions := require.New(t)
+ allJobs["type1"] = make(map[string]JobInfo)
+ jobInfo := JobInfo{
+ Owner: "owner",
+ LastUpdated: "now",
+ InfoJobIdentity: "job1",
+ TargetUri: "target",
+ InfoJobData: "{}",
+ InfoTypeIdentity: "type1",
+ }
+
+ err := AddJob(jobInfo)
+ assertions.Nil(err)
+ assertions.Equal(1, len(allJobs["type1"]))
+}
package server
import (
+ "encoding/json"
"fmt"
+ "io/ioutil"
"net/http"
+
+ "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
)
func StatusHandler(w http.ResponseWriter, r *http.Request) {
}
if r.Method != "GET" {
- http.Error(w, "Method is not supported.", http.StatusNotFound)
+ http.Error(w, "Method is not supported.", http.StatusMethodNotAllowed)
return
}
fmt.Fprintf(w, "All is well!")
}
+
+func CreateInfoJobHandler(w http.ResponseWriter, r *http.Request) {
+ if r.URL.Path != "/producer_simulator/info_job" {
+ http.Error(w, "404 not found.", http.StatusNotFound)
+ return
+ }
+
+ if r.Method != "POST" {
+ http.Error(w, "Method is not supported.", http.StatusMethodNotAllowed)
+ return
+ }
+
+ b, readErr := ioutil.ReadAll(r.Body)
+ if readErr != nil {
+ http.Error(w, fmt.Sprintf("Unable to read body due to: %v", readErr), http.StatusBadRequest)
+ return
+ }
+ jobInfo := jobs.JobInfo{}
+ if unmarshalErr := json.Unmarshal(b, &jobInfo); unmarshalErr != nil {
+ http.Error(w, fmt.Sprintf("Invalid json body. Cause: %v", unmarshalErr), http.StatusBadRequest)
+ return
+ }
+ if err := jobs.AddJob(jobInfo); err != nil {
+ http.Error(w, fmt.Sprintf("Invalid job info. Cause: %v", err), http.StatusBadRequest)
+ }
+}
+
+func CreateServer(port int, handlerFunc func(http.ResponseWriter, *http.Request)) *http.Server {
+
+ mux := http.NewServeMux()
+ mux.HandleFunc("/", handlerFunc)
+ server := http.Server{
+ Addr: fmt.Sprintf(":%v", port), // :{port}
+ Handler: mux,
+ }
+ return &server
+}
package server
import (
+ "bytes"
+ "encoding/json"
+ "errors"
"io"
+ "io/ioutil"
"net/http"
"net/http/httptest"
"testing"
"github.com/stretchr/testify/require"
+ "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
+ "oransc.org/nonrtric/dmaapmediatorproducer/mocks"
)
func TestStatusHandler(t *testing.T) {
wantedBody: "404 not found.\n",
},
{
- name: "StatusHandler with incorrect method, should return NotFound",
+ name: "StatusHandler with incorrect method, should return MethodNotAllowed",
args: args{
responseRecorder: httptest.NewRecorder(),
r: newRequest("PUT", "/", nil, t),
},
- wantedStatus: http.StatusNotFound,
+ wantedStatus: http.StatusMethodNotAllowed,
wantedBody: "Method is not supported.\n",
},
}
}
}
-func newRequest(method string, url string, body io.Reader, t *testing.T) *http.Request {
+func TestCreateInfoJobHandler(t *testing.T) {
+ assertions := require.New(t)
+ jobHandlerMock := mocks.JobHandler{}
+
+ goodJobInfo := jobs.JobInfo{
+ Owner: "owner",
+ LastUpdated: "now",
+ InfoJobIdentity: "jobId",
+ TargetUri: "target",
+ InfoJobData: "{}",
+ InfoTypeIdentity: "type",
+ }
+ badJobInfo := jobs.JobInfo{
+ Owner: "bad",
+ }
+ jobHandlerMock.On("AddJob", goodJobInfo).Return(nil)
+ jobHandlerMock.On("AddJob", badJobInfo).Return(errors.New("error"))
+ jobs.Handler = &jobHandlerMock
+
+ type args struct {
+ responseRecorder *httptest.ResponseRecorder
+ r *http.Request
+ }
+ tests := []struct {
+ name string
+ args args
+ wantedStatus int
+ wantedBody string
+ }{
+ {
+ name: "CreateInfoJobHandler with correct path and method, should return OK",
+ args: args{
+ responseRecorder: httptest.NewRecorder(),
+ r: newRequest("POST", "/producer_simulator/info_job", &goodJobInfo, t),
+ },
+ wantedStatus: http.StatusOK,
+ wantedBody: "",
+ },
+ {
+ name: "CreateInfoJobHandler with incorrect job info, should return BadRequest",
+ args: args{
+ responseRecorder: httptest.NewRecorder(),
+ r: newRequest("POST", "/producer_simulator/info_job", &badJobInfo, t),
+ },
+ wantedStatus: http.StatusBadRequest,
+ wantedBody: "Invalid job info. Cause: error",
+ },
+ {
+ name: "CreateInfoJobHandler with incorrect path, should return NotFound",
+ args: args{
+ responseRecorder: httptest.NewRecorder(),
+ r: newRequest("GET", "/wrong", nil, t),
+ },
+ wantedStatus: http.StatusNotFound,
+ wantedBody: "404 not found.",
+ },
+ {
+ name: "CreateInfoJobHandler with incorrect method, should return MethodNotAllowed",
+ args: args{
+ responseRecorder: httptest.NewRecorder(),
+ r: newRequest("PUT", "/producer_simulator/info_job", nil, t),
+ },
+ wantedStatus: http.StatusMethodNotAllowed,
+ wantedBody: "Method is not supported.",
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ handler := http.HandlerFunc(CreateInfoJobHandler)
+ handler.ServeHTTP(tt.args.responseRecorder, tt.args.r)
+ assertions.Equal(tt.wantedStatus, tt.args.responseRecorder.Code)
+
+ assertions.Contains(tt.args.responseRecorder.Body.String(), tt.wantedBody)
+ })
+ }
+}
+
+func newRequest(method string, url string, jobInfo *jobs.JobInfo, t *testing.T) *http.Request {
+ var body io.Reader
+ if jobInfo != nil {
+ bodyAsBytes, _ := json.Marshal(jobInfo)
+ body = ioutil.NopCloser(bytes.NewReader(bodyAsBytes))
+ }
if req, err := http.NewRequest(method, url, body); err == nil {
return req
} else {
import (
"fmt"
- "net/http"
+ "sync"
log "github.com/sirupsen/logrus"
"oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
- "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobtypes"
+ "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
"oransc.org/nonrtric/dmaapmediatorproducer/internal/server"
)
jobInfoCallbackAddress = fmt.Sprintf("%v:%v", configuration.InfoJobCallbackHost, configuration.InfoJobCallbackPort)
registrator := config.NewRegistratorImpl(configuration.InfoCoordinatorAddress)
- if types, err := jobtypes.GetTypes(); err == nil {
+ if types, err := jobs.GetTypes(); err == nil {
if regErr := registrator.RegisterTypes(types); regErr != nil {
log.Fatalf("Unable to register all types due to: %v", regErr)
}
}
producer := config.ProducerRegistrationInfo{
InfoProducerSupervisionCallbackUrl: supervisionCallbackAddress,
- SupportedInfoTypes: jobtypes.GetSupportedTypes(),
+ SupportedInfoTypes: jobs.GetSupportedTypes(),
InfoJobCallbackUrl: jobInfoCallbackAddress,
}
if err := registrator.RegisterProducer("DMaaP_Mediator_Producer", &producer); err != nil {
func main() {
log.Debug("Starting DMaaP Mediator Producer")
+ wg := new(sync.WaitGroup)
+
+ // add two goroutines to `wg` WaitGroup, one for each avilable server
+ wg.Add(2)
+
log.Debugf("Starting status callback server at port %v", configuration.InfoProducerSupervisionCallbackPort)
- http.HandleFunc("/", server.StatusHandler)
+ go func() {
+ server := server.CreateServer(configuration.InfoProducerSupervisionCallbackPort, server.StatusHandler)
+ log.Warn(server.ListenAndServe())
+ wg.Done()
+ }()
- if err := http.ListenAndServe(":"+configuration.InfoProducerSupervisionCallbackPort, nil); err != nil {
- log.Fatal(err)
- }
+ go func() {
+ server := server.CreateServer(configuration.InfoJobCallbackPort, server.CreateInfoJobHandler)
+ log.Warn(server.ListenAndServe())
+ wg.Done()
+ }()
+
+ // wait until WaitGroup is done
+ wg.Wait()
log.Debug("Stopping DMaaP Mediator Producer")
}
--- /dev/null
+// Code generated by mockery v2.9.3. DO NOT EDIT.
+
+package mocks
+
+import (
+ mock "github.com/stretchr/testify/mock"
+ jobs "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
+)
+
+// JobHandler is an autogenerated mock type for the JobHandler type
+type JobHandler struct {
+ mock.Mock
+}
+
+// AddJob provides a mock function with given fields: _a0
+func (_m *JobHandler) AddJob(_a0 jobs.JobInfo) error {
+ ret := _m.Called(_a0)
+
+ var r0 error
+ if rf, ok := ret.Get(0).(func(jobs.JobInfo) error); ok {
+ r0 = rf(_a0)
+ } else {
+ r0 = ret.Error(0)
+ }
+
+ return r0
+}