)
type Config struct {
- LogLevel string
+ LogLevel log.Level
InfoProducerHost string
InfoProducerPort int
InfoCoordinatorAddress string
MRPort int
}
-type ProducerRegistrationInfo struct {
- InfoProducerSupervisionCallbackUrl string `json:"info_producer_supervision_callback_url"`
- SupportedInfoTypes []string `json:"supported_info_types"`
- InfoJobCallbackUrl string `json:"info_job_callback_url"`
-}
-
func New() *Config {
return &Config{
- LogLevel: getEnv("LOG_LEVEL", "Info"),
+ LogLevel: getLogLevel(),
InfoProducerHost: getEnv("INFO_PRODUCER_HOST", ""),
InfoProducerPort: getEnvAsInt("INFO_PRODUCER_PORT", 8085),
InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "http://enrichmentservice:8083"),
return defaultVal
}
+
+func getLogLevel() log.Level {
+ logLevelStr := getEnv("LOG_LEVEL", "Info")
+ if loglevel, err := log.ParseLevel(logLevelStr); err == nil {
+ return loglevel
+ } else {
+ log.Warnf("Invalid log level: %v. Log level will be Info!", logLevelStr)
+ return log.InfoLevel
+ }
+}
)
func TestNew_envVarsSetConfigContainSetValues(t *testing.T) {
+ assertions := require.New(t)
os.Setenv("LOG_LEVEL", "Debug")
os.Setenv("INFO_PRODUCER_HOST", "producerHost")
os.Setenv("INFO_PRODUCER_PORT", "8095")
os.Clearenv()
})
wantConfig := Config{
- LogLevel: "Debug",
+ LogLevel: log.DebugLevel,
InfoProducerHost: "producerHost",
InfoProducerPort: 8095,
InfoCoordinatorAddress: "infoCoordAddr",
MRHost: "mrHost",
MRPort: 3908,
}
- if got := New(); !reflect.DeepEqual(got, &wantConfig) {
- t.Errorf("New() = %v, want %v", got, &wantConfig)
- }
+ got := New()
+
+ assertions.Equal(&wantConfig, got)
}
func TestNew_faultyIntValueSetConfigContainDefaultValueAndWarnInLog(t *testing.T) {
os.Clearenv()
})
wantConfig := Config{
- LogLevel: "Info",
+ LogLevel: log.InfoLevel,
InfoProducerHost: "",
InfoProducerPort: 8085,
InfoCoordinatorAddress: "http://enrichmentservice:8083",
assertions.Contains(logString, "Invalid int value: wrong for variable: INFO_PRODUCER_PORT. Default value: 8085 will be used")
}
-func TestNew_envVarsNotSetConfigContainDefaultValues(t *testing.T) {
+func TestNew_envFaultyLogLevelConfigContainDefaultValues(t *testing.T) {
+ assertions := require.New(t)
+ var buf bytes.Buffer
+ log.SetOutput(&buf)
+
+ os.Setenv("LOG_LEVEL", "wrong")
+ t.Cleanup(func() {
+ log.SetOutput(os.Stderr)
+ os.Clearenv()
+ })
+
wantConfig := Config{
- LogLevel: "Info",
+ LogLevel: log.InfoLevel,
InfoProducerHost: "",
InfoProducerPort: 8085,
InfoCoordinatorAddress: "http://enrichmentservice:8083",
MRHost: "http://message-router.onap",
MRPort: 3904,
}
- if got := New(); !reflect.DeepEqual(got, &wantConfig) {
- t.Errorf("New() = %v, want %v", got, &wantConfig)
- }
+
+ got := New()
+
+ assertions.Equal(&wantConfig, got)
+ logString := buf.String()
+ assertions.Contains(logString, "Invalid log level: wrong. Log level will be Info!")
}
log "github.com/sirupsen/logrus"
- "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
"oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
)
const registerProducerPath = "/data-producer/v1/info-producers/"
const typeSchema = `{"type": "object","properties": {},"additionalProperties": false}`
+type TypeDefinition struct {
+ Id string `json:"id"`
+ DmaapTopicURL string `json:"dmaapTopicUrl"`
+}
+
+type ProducerRegistrationInfo struct {
+ InfoProducerSupervisionCallbackUrl string `json:"info_producer_supervision_callback_url"`
+ SupportedInfoTypes []string `json:"supported_info_types"`
+ InfoJobCallbackUrl string `json:"info_job_callback_url"`
+}
+
type Registrator interface {
- RegisterTypes(types []*jobs.TypeData) error
+ RegisterTypes(types []TypeDefinition) error
RegisterProducer(producerId string, producerInfo *ProducerRegistrationInfo)
}
type RegistratorImpl struct {
infoCoordinatorAddress string
+ httpClient restclient.HTTPClient
}
-func NewRegistratorImpl(infoCoordAddr string) *RegistratorImpl {
+func NewRegistratorImpl(infoCoordAddr string, client restclient.HTTPClient) *RegistratorImpl {
return &RegistratorImpl{
infoCoordinatorAddress: infoCoordAddr,
+ httpClient: client,
}
}
-func (r RegistratorImpl) RegisterTypes(jobTypes []jobs.TypeData) error {
+func (r RegistratorImpl) RegisterTypes(jobTypes []TypeDefinition) error {
for _, jobType := range jobTypes {
body := fmt.Sprintf(`{"info_job_data_schema": %v}`, typeSchema)
- if error := restclient.Put(r.infoCoordinatorAddress+registerTypePath+url.PathEscape(jobType.TypeId), []byte(body)); error != nil {
+ if error := restclient.Put(r.infoCoordinatorAddress+registerTypePath+url.PathEscape(jobType.Id), []byte(body), r.httpClient); error != nil {
return error
}
log.Debugf("Registered type: %v", jobType)
func (r RegistratorImpl) RegisterProducer(producerId string, producerInfo *ProducerRegistrationInfo) error {
if body, marshalErr := json.Marshal(producerInfo); marshalErr == nil {
- if putErr := restclient.Put(r.infoCoordinatorAddress+registerProducerPath+url.PathEscape(producerId), []byte(body)); putErr != nil {
+ if putErr := restclient.Put(r.infoCoordinatorAddress+registerProducerPath+url.PathEscape(producerId), []byte(body), r.httpClient); putErr != nil {
return putErr
}
log.Debugf("Registered producer: %v", producerId)
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
- "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
- "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
- "oransc.org/nonrtric/dmaapmediatorproducer/mocks"
+ "oransc.org/nonrtric/dmaapmediatorproducer/mocks/httpclient"
)
func TestRegisterTypes(t *testing.T) {
assertions := require.New(t)
- clientMock := mocks.HTTPClient{}
+ clientMock := httpclient.HTTPClient{}
clientMock.On("Do", mock.Anything).Return(&http.Response{
StatusCode: http.StatusCreated,
}, nil)
- restclient.Client = &clientMock
-
- type1 := jobs.TypeData{
- TypeId: "Type1",
+ type1 := TypeDefinition{
+ Id: "Type1",
}
- types := []jobs.TypeData{type1}
+ types := []TypeDefinition{type1}
- r := NewRegistratorImpl("http://localhost:9990")
+ r := NewRegistratorImpl("http://localhost:9990", &clientMock)
err := r.RegisterTypes(types)
assertions.Nil(err)
func TestRegisterProducer(t *testing.T) {
assertions := require.New(t)
- clientMock := mocks.HTTPClient{}
+ clientMock := httpclient.HTTPClient{}
clientMock.On("Do", mock.Anything).Return(&http.Response{
StatusCode: http.StatusCreated,
}, nil)
- restclient.Client = &clientMock
-
producer := ProducerRegistrationInfo{
InfoProducerSupervisionCallbackUrl: "supervisionCallbackUrl",
SupportedInfoTypes: []string{"type1"},
InfoJobCallbackUrl: "jobCallbackUrl",
}
- r := NewRegistratorImpl("http://localhost:9990")
+ r := NewRegistratorImpl("http://localhost:9990", &clientMock)
err := r.RegisterProducer("Producer1", &producer)
assertions.Nil(err)
"sync"
log "github.com/sirupsen/logrus"
+ "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
"oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
)
-type TypeDefinitions struct {
- Types []TypeDefinition `json:"types"`
-}
-type TypeDefinition struct {
- Id string `json:"id"`
- DmaapTopicURL string `json:"dmaapTopicUrl"`
-}
-
type TypeData struct {
TypeId string `json:"id"`
DMaaPTopicURL string `json:"dmaapTopicUrl"`
InfoTypeIdentity string `json:"info_type_identity"`
}
+type JobTypeHandler interface {
+ GetTypes() ([]config.TypeDefinition, error)
+ GetSupportedTypes() []string
+}
+
type JobHandler interface {
AddJob(JobInfo) error
DeleteJob(jobId string)
}
-var (
- mu sync.Mutex
- configFile = "configs/type_config.json"
- Handler JobHandler
- allTypes = make(map[string]TypeData)
-)
-
-func init() {
- Handler = newJobHandlerImpl()
+type JobHandlerImpl struct {
+ mu sync.Mutex
+ configFile string
+ allTypes map[string]TypeData
+ pollClient restclient.HTTPClient
+ distributeClient restclient.HTTPClient
}
-type jobHandlerImpl struct{}
-
-func newJobHandlerImpl() *jobHandlerImpl {
- return &jobHandlerImpl{}
+func NewJobHandlerImpl(typeConfigFilePath string, pollClient restclient.HTTPClient, distributeClient restclient.HTTPClient) *JobHandlerImpl {
+ return &JobHandlerImpl{
+ configFile: typeConfigFilePath,
+ allTypes: make(map[string]TypeData),
+ pollClient: pollClient,
+ distributeClient: distributeClient,
+ }
}
-func (jh *jobHandlerImpl) AddJob(ji JobInfo) error {
- mu.Lock()
- defer mu.Unlock()
- if err := validateJobInfo(ji); err == nil {
- jobs := allTypes[ji.InfoTypeIdentity].Jobs
+func (jh *JobHandlerImpl) AddJob(ji JobInfo) error {
+ jh.mu.Lock()
+ defer jh.mu.Unlock()
+ if err := jh.validateJobInfo(ji); err == nil {
+ jobs := jh.allTypes[ji.InfoTypeIdentity].Jobs
jobs[ji.InfoJobIdentity] = ji
log.Debug("Added job: ", ji)
return nil
}
}
-func (jh *jobHandlerImpl) DeleteJob(jobId string) {
- mu.Lock()
- defer mu.Unlock()
- for _, typeData := range allTypes {
+func (jh *JobHandlerImpl) DeleteJob(jobId string) {
+ jh.mu.Lock()
+ defer jh.mu.Unlock()
+ for _, typeData := range jh.allTypes {
delete(typeData.Jobs, jobId)
}
log.Debug("Deleted job: ", jobId)
}
-func validateJobInfo(ji JobInfo) error {
- if _, ok := allTypes[ji.InfoTypeIdentity]; !ok {
+func (jh *JobHandlerImpl) validateJobInfo(ji JobInfo) error {
+ if _, ok := jh.allTypes[ji.InfoTypeIdentity]; !ok {
return fmt.Errorf("type not supported: %v", ji.InfoTypeIdentity)
}
if ji.InfoJobIdentity == "" {
return nil
}
-func GetTypes() ([]TypeData, error) {
- mu.Lock()
- defer mu.Unlock()
- types := make([]TypeData, 0, 1)
- typeDefsByte, err := os.ReadFile(configFile)
+func (jh *JobHandlerImpl) GetTypes() ([]config.TypeDefinition, error) {
+ jh.mu.Lock()
+ defer jh.mu.Unlock()
+ typeDefsByte, err := os.ReadFile(jh.configFile)
if err != nil {
return nil, err
}
- typeDefs := TypeDefinitions{}
+ typeDefs := struct {
+ Types []config.TypeDefinition `json:"types"`
+ }{}
err = json.Unmarshal(typeDefsByte, &typeDefs)
if err != nil {
return nil, err
}
for _, typeDef := range typeDefs.Types {
- typeInfo := TypeData{
+ jh.allTypes[typeDef.Id] = TypeData{
TypeId: typeDef.Id,
DMaaPTopicURL: typeDef.DmaapTopicURL,
Jobs: make(map[string]JobInfo),
}
- if _, ok := allTypes[typeInfo.TypeId]; !ok {
- allTypes[typeInfo.TypeId] = typeInfo
- }
- types = append(types, typeInfo)
}
- return types, nil
+ return typeDefs.Types, nil
}
-func GetSupportedTypes() []string {
- mu.Lock()
- defer mu.Unlock()
+func (jh *JobHandlerImpl) GetSupportedTypes() []string {
+ jh.mu.Lock()
+ defer jh.mu.Unlock()
supportedTypes := []string{}
- for k := range allTypes {
+ for k := range jh.allTypes {
supportedTypes = append(supportedTypes, k)
}
return supportedTypes
}
-func AddJob(job JobInfo) error {
- return Handler.AddJob(job)
-}
-
-func DeleteJob(jobId string) {
- Handler.DeleteJob(jobId)
-}
-
-func RunJobs(mRAddress string) {
+func (jh *JobHandlerImpl) RunJobs(mRAddress string) {
for {
- pollAndDistributeMessages(mRAddress)
+ jh.pollAndDistributeMessages(mRAddress)
}
}
-func pollAndDistributeMessages(mRAddress string) {
- for typeId, typeInfo := range allTypes {
+func (jh *JobHandlerImpl) pollAndDistributeMessages(mRAddress string) {
+ jh.mu.Lock()
+ defer jh.mu.Unlock()
+ for typeId, typeInfo := range jh.allTypes {
log.Debugf("Processing jobs for type: %v", typeId)
- messagesBody, error := restclient.Get(fmt.Sprintf("%v/%v", mRAddress, typeInfo.DMaaPTopicURL))
+ messagesBody, error := restclient.Get(fmt.Sprintf("%v/%v", mRAddress, typeInfo.DMaaPTopicURL), jh.pollClient)
if error != nil {
log.Warnf("Error getting data from MR. Cause: %v", error)
continue
}
- distributeMessages(messagesBody, typeInfo)
+ jh.distributeMessages(messagesBody, typeInfo)
}
}
-func distributeMessages(messages []byte, typeInfo TypeData) {
+func (jh *JobHandlerImpl) distributeMessages(messages []byte, typeInfo TypeData) {
if len(messages) > 2 {
- mu.Lock()
for _, jobInfo := range typeInfo.Jobs {
- go sendMessagesToConsumer(messages, jobInfo)
+ go jh.sendMessagesToConsumer(messages, jobInfo)
}
- mu.Unlock()
}
}
-func sendMessagesToConsumer(messages []byte, jobInfo JobInfo) {
+func (jh *JobHandlerImpl) sendMessagesToConsumer(messages []byte, jobInfo JobInfo) {
log.Debugf("Processing job: %v", jobInfo.InfoJobIdentity)
- if postErr := restclient.Post(jobInfo.TargetUri, messages); postErr != nil {
+ if postErr := restclient.Post(jobInfo.TargetUri, messages, jh.distributeClient); postErr != nil {
log.Warnf("Error posting data for job: %v. Cause: %v", jobInfo, postErr)
}
}
-func clearAll() {
- allTypes = make(map[string]TypeData)
+func (jh *JobHandlerImpl) clearAll() {
+ jh.allTypes = make(map[string]TypeData)
}
"time"
"github.com/stretchr/testify/require"
- "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
+ "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
)
const typeDefinition = `{"types": [{"id": "type1", "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1"}]}`
if err != nil {
t.Errorf("Unable to create temporary directory for types due to: %v", err)
}
+ fname := filepath.Join(typesDir, "type_config.json")
+ handlerUnderTest := NewJobHandlerImpl(fname, nil, nil)
t.Cleanup(func() {
os.RemoveAll(typesDir)
- clearAll()
+ handlerUnderTest.clearAll()
})
- fname := filepath.Join(typesDir, "type_config.json")
- configFile = fname
if err = os.WriteFile(fname, []byte(typeDefinition), 0666); err != nil {
t.Errorf("Unable to create temporary config file for types due to: %v", err)
}
- types, err := GetTypes()
- wantedType := TypeData{
- TypeId: "type1",
- DMaaPTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1",
- Jobs: make(map[string]JobInfo),
+ types, err := handlerUnderTest.GetTypes()
+ wantedType := config.TypeDefinition{
+ Id: "type1",
+ DmaapTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1",
}
- wantedTypes := []TypeData{wantedType}
+ wantedTypes := []config.TypeDefinition{wantedType}
assertions.EqualValues(wantedTypes, types)
assertions.Nil(err)
- supportedTypes := GetSupportedTypes()
+ supportedTypes := handlerUnderTest.GetSupportedTypes()
assertions.EqualValues([]string{"type1"}, supportedTypes)
}
func TestAddJobWhenTypeIsSupported_shouldAddJobToAllJobsMap(t *testing.T) {
assertions := require.New(t)
+ handlerUnderTest := NewJobHandlerImpl("", nil, nil)
wantedJob := JobInfo{
Owner: "owner",
LastUpdated: "now",
InfoJobData: "{}",
InfoTypeIdentity: "type1",
}
- allTypes["type1"] = TypeData{
+ handlerUnderTest.allTypes["type1"] = TypeData{
TypeId: "type1",
Jobs: map[string]JobInfo{"job1": wantedJob},
}
t.Cleanup(func() {
- clearAll()
+ handlerUnderTest.clearAll()
})
- err := AddJob(wantedJob)
+ err := handlerUnderTest.AddJob(wantedJob)
assertions.Nil(err)
- assertions.Equal(1, len(allTypes["type1"].Jobs))
- assertions.Equal(wantedJob, allTypes["type1"].Jobs["job1"])
+ assertions.Equal(1, len(handlerUnderTest.allTypes["type1"].Jobs))
+ assertions.Equal(wantedJob, handlerUnderTest.allTypes["type1"].Jobs["job1"])
}
func TestAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) {
assertions := require.New(t)
+ handlerUnderTest := NewJobHandlerImpl("", nil, nil)
jobInfo := JobInfo{
InfoTypeIdentity: "type1",
}
- err := AddJob(jobInfo)
+ err := handlerUnderTest.AddJob(jobInfo)
assertions.NotNil(err)
assertions.Equal("type not supported: type1", err.Error())
}
func TestAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) {
assertions := require.New(t)
- allTypes["type1"] = TypeData{
+ handlerUnderTest := NewJobHandlerImpl("", nil, nil)
+ handlerUnderTest.allTypes["type1"] = TypeData{
TypeId: "type1",
}
t.Cleanup(func() {
- clearAll()
+ handlerUnderTest.clearAll()
})
+
jobInfo := JobInfo{
InfoTypeIdentity: "type1",
}
-
- err := AddJob(jobInfo)
+ err := handlerUnderTest.AddJob(jobInfo)
assertions.NotNil(err)
assertions.Equal("missing required job identity: { <nil> type1}", err.Error())
}
func TestAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
assertions := require.New(t)
- allTypes["type1"] = TypeData{
+ handlerUnderTest := NewJobHandlerImpl("", nil, nil)
+ handlerUnderTest.allTypes["type1"] = TypeData{
TypeId: "type1",
}
+ t.Cleanup(func() {
+ handlerUnderTest.clearAll()
+ })
+
jobInfo := JobInfo{
InfoTypeIdentity: "type1",
InfoJobIdentity: "job1",
}
-
- err := AddJob(jobInfo)
+ err := handlerUnderTest.AddJob(jobInfo)
assertions.NotNil(err)
assertions.Equal("missing required target URI: { job1 <nil> type1}", err.Error())
- clearAll()
}
func TestDeleteJob(t *testing.T) {
assertions := require.New(t)
+ handlerUnderTest := NewJobHandlerImpl("", nil, nil)
jobToKeep := JobInfo{
InfoJobIdentity: "job1",
InfoTypeIdentity: "type1",
InfoJobIdentity: "job2",
InfoTypeIdentity: "type1",
}
- allTypes["type1"] = TypeData{
+ handlerUnderTest.allTypes["type1"] = TypeData{
TypeId: "type1",
Jobs: map[string]JobInfo{"job1": jobToKeep, "job2": jobToDelete},
}
t.Cleanup(func() {
- clearAll()
+ handlerUnderTest.clearAll()
})
- DeleteJob("job2")
- assertions.Equal(1, len(allTypes["type1"].Jobs))
- assertions.Equal(jobToKeep, allTypes["type1"].Jobs["job1"])
+ handlerUnderTest.DeleteJob("job2")
+ assertions.Equal(1, len(handlerUnderTest.allTypes["type1"].Jobs))
+ assertions.Equal(jobToKeep, handlerUnderTest.allTypes["type1"].Jobs["job1"])
}
func TestPollAndDistributeMessages(t *testing.T) {
assertions := require.New(t)
- jobInfo := JobInfo{
- InfoTypeIdentity: "type1",
- InfoJobIdentity: "job1",
- TargetUri: "http://consumerHost/target",
- }
- allTypes["type1"] = TypeData{
- TypeId: "type1",
- DMaaPTopicURL: "topicUrl",
- Jobs: map[string]JobInfo{"job1": jobInfo},
- }
- t.Cleanup(func() {
- clearAll()
- })
wg := sync.WaitGroup{}
- wg.Add(2) // Two calls should be made to the server, one to poll and one to distribute
messages := `[{"message": {"data": "data"}}]`
- clientMock := NewTestClient(func(req *http.Request) *http.Response {
+ pollClientMock := NewTestClient(func(req *http.Request) *http.Response {
if req.URL.String() == "http://mrAddr/topicUrl" {
assertions.Equal(req.Method, "GET")
- wg.Done()
+ wg.Done() // Signal that the poll call has been made
return &http.Response{
StatusCode: 200,
Body: ioutil.NopCloser(bytes.NewReader([]byte(messages))),
Header: make(http.Header), // Must be set to non-nil value or it panics
}
- } else if req.URL.String() == "http://consumerHost/target" {
+ }
+ t.Error("Wrong call to client: ", req)
+ t.Fail()
+ return nil
+ })
+ distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
+ if req.URL.String() == "http://consumerHost/target" {
assertions.Equal(req.Method, "POST")
assertions.Equal(messages, getBodyAsString(req))
assertions.Equal("application/json; charset=utf-8", req.Header.Get("Content-Type"))
- wg.Done()
+ wg.Done() // Signal that the distribution call has been made
return &http.Response{
StatusCode: 200,
Body: ioutil.NopCloser(bytes.NewBufferString(`OK`)),
t.Fail()
return nil
})
+ handlerUnderTest := NewJobHandlerImpl("", pollClientMock, distributeClientMock)
+ jobInfo := JobInfo{
+ InfoTypeIdentity: "type1",
+ InfoJobIdentity: "job1",
+ TargetUri: "http://consumerHost/target",
+ }
+ handlerUnderTest.allTypes["type1"] = TypeData{
+ TypeId: "type1",
+ DMaaPTopicURL: "topicUrl",
+ Jobs: map[string]JobInfo{"job1": jobInfo},
+ }
+ t.Cleanup(func() {
+ handlerUnderTest.clearAll()
+ })
- restclient.Client = clientMock
-
- pollAndDistributeMessages("http://mrAddr")
+ wg.Add(2) // Two calls should be made to the server, one to poll and one to distribute
+ handlerUnderTest.pollAndDistributeMessages("http://mrAddr")
if waitTimeout(&wg, 100*time.Millisecond) {
t.Error("Not all calls to server were made")
return fmt.Sprintf("Request failed due to error response with status: %v and body: %v", pe.StatusCode, string(pe.Body))
}
-var (
- Client HTTPClient
-)
-
-func init() {
- Client = &http.Client{}
-}
-
-func Get(url string) ([]byte, error) {
- if response, err := Client.Get(url); err == nil {
- defer response.Body.Close()
- if responseData, err := io.ReadAll(response.Body); err == nil {
- if isResponseSuccess(response.StatusCode) {
+func Get(url string, client HTTPClient) ([]byte, error) {
+ if response, err := client.Get(url); err == nil {
+ if isResponseSuccess(response.StatusCode) {
+ defer response.Body.Close()
+ if responseData, err := io.ReadAll(response.Body); err == nil {
return responseData, nil
} else {
- requestError := RequestError{
- StatusCode: response.StatusCode,
- Body: responseData,
- }
- return nil, requestError
+ return nil, err
}
} else {
- return nil, err
+ return nil, getRequestError(response)
}
} else {
return nil, err
}
}
-func Put(url string, body []byte) error {
- return do(http.MethodPut, url, body)
+func Put(url string, body []byte, client HTTPClient) error {
+ return do(http.MethodPut, url, body, client)
}
-func Post(url string, body []byte) error {
- return do(http.MethodPost, url, body)
+func Post(url string, body []byte, client HTTPClient) error {
+ return do(http.MethodPost, url, body, client)
}
-func do(method string, url string, body []byte) error {
+func do(method string, url string, body []byte, client HTTPClient) error {
if req, reqErr := http.NewRequest(method, url, bytes.NewBuffer(body)); reqErr == nil {
req.Header.Set("Content-Type", "application/json; charset=utf-8")
- if response, respErr := Client.Do(req); respErr == nil {
+ if response, respErr := client.Do(req); respErr == nil {
if isResponseSuccess(response.StatusCode) {
return nil
} else {
import (
"bytes"
"errors"
+ "fmt"
"io/ioutil"
"net/http"
- "reflect"
"testing"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
- "oransc.org/nonrtric/dmaapmediatorproducer/mocks"
+ "oransc.org/nonrtric/dmaapmediatorproducer/mocks/httpclient"
)
-func TestGet(t *testing.T) {
- clientMock := mocks.HTTPClient{}
-
- clientMock.On("Get", "http://testOk").Return(&http.Response{
- StatusCode: http.StatusOK,
- Body: ioutil.NopCloser(bytes.NewReader([]byte("Response"))),
- }, nil)
-
- clientMock.On("Get", "http://testNotOk").Return(&http.Response{
+func TestRequestError_Error(t *testing.T) {
+ assertions := require.New(t)
+ actualError := RequestError{
StatusCode: http.StatusBadRequest,
- Body: ioutil.NopCloser(bytes.NewReader([]byte("Bad Response"))),
- }, nil)
-
- clientMock.On("Get", "http://testError").Return(nil, errors.New("Failed Request"))
-
- Client = &clientMock
-
+ Body: []byte("error"),
+ }
+ assertions.Equal("Request failed due to error response with status: 400 and body: error", actualError.Error())
+}
+func TestGet(t *testing.T) {
+ assertions := require.New(t)
type args struct {
- url string
+ url string
+ mockReturnStatus int
+ mockReturnBody string
+ mockReturnError error
}
tests := []struct {
name string
args args
want []byte
- wantErr bool
wantedError error
}{
{
name: "Test Get with OK response",
args: args{
- url: "http://testOk",
+ url: "http://testOk",
+ mockReturnStatus: http.StatusOK,
+ mockReturnBody: "Response",
},
- want: []byte("Response"),
- wantErr: false,
+ want: []byte("Response"),
},
{
name: "Test Get with Not OK response",
args: args{
- url: "http://testNotOk",
+ url: "http://testNotOk",
+ mockReturnStatus: http.StatusBadRequest,
+ mockReturnBody: "Bad Response",
},
- want: nil,
- wantErr: true,
+ want: nil,
wantedError: RequestError{
StatusCode: http.StatusBadRequest,
Body: []byte("Bad Response"),
{
name: "Test Get with error",
args: args{
- url: "http://testError",
+ url: "http://testError",
+ mockReturnError: errors.New("Failed Request"),
},
want: nil,
- wantErr: true,
wantedError: errors.New("Failed Request"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- got, err := Get(tt.args.url)
- if (err != nil) != tt.wantErr {
- t.Errorf("Get() error = %v, wantErr %v", err, tt.wantErr)
- return
- }
- if !reflect.DeepEqual(got, tt.want) {
- t.Errorf("Get() = %v, want %v", got, tt.want)
- }
- if tt.wantErr && err.Error() != tt.wantedError.Error() {
- t.Errorf("Get() error = %v, wantedError % v", err, tt.wantedError.Error())
- }
+ clientMock := httpclient.HTTPClient{}
+ clientMock.On("Get", tt.args.url).Return(&http.Response{
+ StatusCode: tt.args.mockReturnStatus,
+ Body: ioutil.NopCloser(bytes.NewReader([]byte(tt.args.mockReturnBody))),
+ }, tt.args.mockReturnError)
+
+ got, err := Get(tt.args.url, &clientMock)
+ assertions.Equal(tt.wantedError, err, tt.name)
+ assertions.Equal(tt.want, got, tt.name)
+ clientMock.AssertCalled(t, "Get", tt.args.url)
})
}
}
func TestPutOk(t *testing.T) {
assertions := require.New(t)
- clientMock := mocks.HTTPClient{}
+ clientMock := httpclient.HTTPClient{}
clientMock.On("Do", mock.Anything).Return(&http.Response{
StatusCode: http.StatusOK,
}, nil)
- Client = &clientMock
- if err := Put("http://localhost:9990", []byte("body")); err != nil {
+ if err := Put("http://localhost:9990", []byte("body"), &clientMock); err != nil {
t.Errorf("Put() error = %v, did not want error", err)
}
var actualRequest *http.Request
clientMock.AssertNumberOfCalls(t, "Do", 1)
}
-func TestPutBadResponse(t *testing.T) {
+func TestPostOk(t *testing.T) {
assertions := require.New(t)
- clientMock := mocks.HTTPClient{}
+ clientMock := httpclient.HTTPClient{}
clientMock.On("Do", mock.Anything).Return(&http.Response{
- StatusCode: http.StatusBadRequest,
- Body: ioutil.NopCloser(bytes.NewReader([]byte("Bad Request"))),
+ StatusCode: http.StatusOK,
}, nil)
- Client = &clientMock
- err := Put("url", []byte("body"))
- assertions.NotNil("Put() error = %v, wanted error", err)
- expectedErrorMessage := "Request failed due to error response with status: 400 and body: Bad Request"
- assertions.Equal(expectedErrorMessage, err.Error())
+ if err := Post("http://localhost:9990", []byte("body"), &clientMock); err != nil {
+ t.Errorf("Put() error = %v, did not want error", err)
+ }
+ var actualRequest *http.Request
+ clientMock.AssertCalled(t, "Do", mock.MatchedBy(func(req *http.Request) bool {
+ actualRequest = req
+ return true
+ }))
+ assertions.Equal(http.MethodPost, actualRequest.Method)
+ assertions.Equal("http", actualRequest.URL.Scheme)
+ assertions.Equal("localhost:9990", actualRequest.URL.Host)
+ assertions.Equal("application/json; charset=utf-8", actualRequest.Header.Get("Content-Type"))
+ body, _ := ioutil.ReadAll(actualRequest.Body)
+ expectedBody := []byte("body")
+ assertions.Equal(expectedBody, body)
+ clientMock.AssertNumberOfCalls(t, "Do", 1)
}
-func TestPutError(t *testing.T) {
+func Test_doErrorCases(t *testing.T) {
assertions := require.New(t)
- clientMock := mocks.HTTPClient{}
-
- clientMock.On("Do", mock.Anything).Return(nil, errors.New("Failed Request"))
-
- Client = &clientMock
- err := Put("url", []byte("body"))
- assertions.NotNil("Put() error = %v, wanted error", err)
- expectedErrorMessage := "Failed Request"
- assertions.Equal(expectedErrorMessage, err.Error())
+ type args struct {
+ url string
+ mockReturnStatus int
+ mockReturnBody []byte
+ mockReturnError error
+ }
+ tests := []struct {
+ name string
+ args args
+ wantErr error
+ }{
+ {
+ name: "Bad request should get RequestError",
+ args: args{
+ url: "badRequest",
+ mockReturnStatus: http.StatusBadRequest,
+ mockReturnBody: []byte("bad request"),
+ mockReturnError: nil,
+ },
+ wantErr: RequestError{
+ StatusCode: http.StatusBadRequest,
+ Body: []byte("bad request"),
+ },
+ },
+ {
+ name: "Server unavailable should get error",
+ args: args{
+ url: "serverUnavailable",
+ mockReturnError: fmt.Errorf("Server unavailable"),
+ },
+ wantErr: fmt.Errorf("Server unavailable"),
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ clientMock := httpclient.HTTPClient{}
+ clientMock.On("Do", mock.Anything).Return(&http.Response{
+ StatusCode: tt.args.mockReturnStatus,
+ Body: ioutil.NopCloser(bytes.NewReader(tt.args.mockReturnBody)),
+ }, tt.args.mockReturnError)
+ err := do("PUT", tt.args.url, nil, &clientMock)
+ assertions.Equal(tt.wantErr, err, tt.name)
+ })
+ }
}
const jobIdToken = "infoJobId"
const deleteJobPath = AddJobPath + "/{" + jobIdToken + "}"
-func NewRouter() *mux.Router {
+type ProducerCallbackHandler struct {
+ jobHandler jobs.JobHandler
+}
+
+func NewProducerCallbackHandler(jh jobs.JobHandler) *ProducerCallbackHandler {
+ return &ProducerCallbackHandler{
+ jobHandler: jh,
+ }
+}
+
+func NewRouter(jh jobs.JobHandler) *mux.Router {
+ callbackHandler := NewProducerCallbackHandler(jh)
r := mux.NewRouter()
r.HandleFunc(StatusPath, statusHandler).Methods(http.MethodGet).Name("status")
- r.HandleFunc(AddJobPath, addInfoJobHandler).Methods(http.MethodPost).Name("add")
- r.HandleFunc(deleteJobPath, deleteInfoJobHandler).Methods(http.MethodDelete).Name("delete")
+ r.HandleFunc(AddJobPath, callbackHandler.addInfoJobHandler).Methods(http.MethodPost).Name("add")
+ r.HandleFunc(deleteJobPath, callbackHandler.deleteInfoJobHandler).Methods(http.MethodDelete).Name("delete")
r.NotFoundHandler = ¬FoundHandler{}
r.MethodNotAllowedHandler = &methodNotAllowedHandler{}
return r
}
-func statusHandler(w http.ResponseWriter, r *http.Request) {}
+func statusHandler(w http.ResponseWriter, r *http.Request) {
+ // Just respond OK to show the server is alive for now. Might be extended later.
+}
-func addInfoJobHandler(w http.ResponseWriter, r *http.Request) {
+func (h *ProducerCallbackHandler) addInfoJobHandler(w http.ResponseWriter, r *http.Request) {
b, readErr := ioutil.ReadAll(r.Body)
if readErr != nil {
http.Error(w, fmt.Sprintf("Unable to read body due to: %v", readErr), http.StatusBadRequest)
http.Error(w, fmt.Sprintf("Invalid json body. Cause: %v", unmarshalErr), http.StatusBadRequest)
return
}
- if err := jobs.AddJob(jobInfo); err != nil {
+ if err := h.jobHandler.AddJob(jobInfo); err != nil {
http.Error(w, fmt.Sprintf("Invalid job info. Cause: %v", err), http.StatusBadRequest)
}
}
-func deleteInfoJobHandler(w http.ResponseWriter, r *http.Request) {
+func (h *ProducerCallbackHandler) deleteInfoJobHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
id, ok := vars[jobIdToken]
if !ok {
return
}
- jobs.DeleteJob(id)
+ h.jobHandler.DeleteJob(id)
}
type notFoundHandler struct{}
func TestNewRouter(t *testing.T) {
assertions := require.New(t)
- r := NewRouter()
+ r := NewRouter(nil)
statusRoute := r.Get("status")
assertions.NotNil(statusRoute)
supportedMethods, err := statusRoute.GetMethods()
responseRecorder := httptest.NewRecorder()
handler.ServeHTTP(responseRecorder, newRequest("GET", "/wrong", nil, t))
assertions.Equal(http.StatusNotFound, responseRecorder.Code)
-
assertions.Contains(responseRecorder.Body.String(), "404 not found.")
methodNotAllowedHandler := r.MethodNotAllowedHandler
responseRecorder = httptest.NewRecorder()
handler.ServeHTTP(responseRecorder, newRequest(http.MethodPut, "/status", nil, t))
assertions.Equal(http.StatusMethodNotAllowed, responseRecorder.Code)
-
assertions.Contains(responseRecorder.Body.String(), "Method is not supported.")
}
func TestAddInfoJobHandler(t *testing.T) {
assertions := require.New(t)
- jobHandlerMock := jobhandler.JobHandler{}
-
- goodJobInfo := jobs.JobInfo{
- Owner: "owner",
- LastUpdated: "now",
- InfoJobIdentity: "jobId",
- TargetUri: "target",
- InfoJobData: "{}",
- InfoTypeIdentity: "type",
- }
- badJobInfo := jobs.JobInfo{
- Owner: "bad",
- }
- jobHandlerMock.On("AddJob", goodJobInfo).Return(nil)
- jobHandlerMock.On("AddJob", badJobInfo).Return(errors.New("error"))
- jobs.Handler = &jobHandlerMock
type args struct {
- responseRecorder *httptest.ResponseRecorder
- r *http.Request
+ job jobs.JobInfo
+ mockReturn error
}
tests := []struct {
name string
args args
wantedStatus int
wantedBody string
- assertFunc assertMockFunk
}{
{
name: "AddInfoJobHandler with correct path and method, should return OK",
args: args{
- responseRecorder: httptest.NewRecorder(),
- r: newRequest(http.MethodPost, "/jobs", &goodJobInfo, t),
+ job: jobs.JobInfo{
+ Owner: "owner",
+ LastUpdated: "now",
+ InfoJobIdentity: "jobId",
+ TargetUri: "target",
+ InfoJobData: "{}",
+ InfoTypeIdentity: "type",
+ },
},
wantedStatus: http.StatusOK,
wantedBody: "",
- assertFunc: func(mock *jobhandler.JobHandler) {
- mock.AssertCalled(t, "AddJob", goodJobInfo)
- },
},
{
name: "AddInfoJobHandler with incorrect job info, should return BadRequest",
args: args{
- responseRecorder: httptest.NewRecorder(),
- r: newRequest(http.MethodPost, "/jobs", &badJobInfo, t),
+ job: jobs.JobInfo{
+ Owner: "bad",
+ },
+ mockReturn: errors.New("error"),
},
wantedStatus: http.StatusBadRequest,
wantedBody: "Invalid job info. Cause: error",
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- handler := http.HandlerFunc(addInfoJobHandler)
- handler.ServeHTTP(tt.args.responseRecorder, tt.args.r)
- assertions.Equal(tt.wantedStatus, tt.args.responseRecorder.Code, tt.name)
+ jobHandlerMock := jobhandler.JobHandler{}
+ jobHandlerMock.On("AddJob", tt.args.job).Return(tt.args.mockReturn)
+ callbackHandlerUnderTest := NewProducerCallbackHandler(&jobHandlerMock)
- assertions.Contains(tt.args.responseRecorder.Body.String(), tt.wantedBody, tt.name)
+ handler := http.HandlerFunc(callbackHandlerUnderTest.addInfoJobHandler)
+ responseRecorder := httptest.NewRecorder()
+ r := newRequest(http.MethodPost, "/jobs", &tt.args.job, t)
- if tt.assertFunc != nil {
- tt.assertFunc(&jobHandlerMock)
- }
+ handler.ServeHTTP(responseRecorder, r)
+
+ assertions.Equal(tt.wantedStatus, responseRecorder.Code, tt.name)
+ assertions.Contains(responseRecorder.Body.String(), tt.wantedBody, tt.name)
+ jobHandlerMock.AssertCalled(t, "AddJob", tt.args.job)
})
}
}
jobHandlerMock := jobhandler.JobHandler{}
jobHandlerMock.On("DeleteJob", mock.Anything).Return(nil)
- jobs.Handler = &jobHandlerMock
+ callbackHandlerUnderTest := NewProducerCallbackHandler(&jobHandlerMock)
responseRecorder := httptest.NewRecorder()
r := mux.SetURLVars(newRequest(http.MethodDelete, "/jobs/", nil, t), map[string]string{"infoJobId": "job1"})
- handler := http.HandlerFunc(deleteInfoJobHandler)
+ handler := http.HandlerFunc(callbackHandlerUnderTest.deleteInfoJobHandler)
handler.ServeHTTP(responseRecorder, r)
assertions.Equal(http.StatusOK, responseRecorder.Result().StatusCode)
jobHandlerMock.AssertCalled(t, "DeleteJob", "job1")
}
-type assertMockFunk func(mock *jobhandler.JobHandler)
-
func newRequest(method string, url string, jobInfo *jobs.JobInfo, t *testing.T) *http.Request {
var body io.Reader
if jobInfo != nil {
"fmt"
"net/http"
"sync"
+ "time"
log "github.com/sirupsen/logrus"
"oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
"oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
+ "oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
"oransc.org/nonrtric/dmaapmediatorproducer/internal/server"
)
+const timeoutHTTPClient = time.Second * 5
+const timeoutPollClient = time.Second * 15
+
var configuration *config.Config
-var callbackAddress string
+var httpClient restclient.HTTPClient
+var jobHandler *jobs.JobHandlerImpl
func init() {
configuration = config.New()
- if loglevel, err := log.ParseLevel(configuration.LogLevel); err == nil {
- log.SetLevel(loglevel)
- } else {
- log.Warnf("Invalid log level: %v. Log level will be Info!", configuration.LogLevel)
- }
+}
+func main() {
+ log.SetLevel(configuration.LogLevel)
log.Debug("Initializing DMaaP Mediator Producer")
- if configuration.InfoProducerHost == "" {
- log.Fatal("Missing INFO_PRODUCER_SUPERVISION_CALLBACK_HOST")
+ if err := validateConfiguration(configuration); err != nil {
+ log.Fatalf("Stopping producer due to error: %v", err)
}
- callbackAddress = fmt.Sprintf("%v:%v", configuration.InfoProducerHost, configuration.InfoProducerPort)
+ callbackAddress := fmt.Sprintf("%v:%v", configuration.InfoProducerHost, configuration.InfoProducerPort)
- registrator := config.NewRegistratorImpl(configuration.InfoCoordinatorAddress)
- if types, err := jobs.GetTypes(); err == nil {
- if regErr := registrator.RegisterTypes(types); regErr != nil {
- log.Fatalf("Unable to register all types due to: %v", regErr)
- }
- } else {
- log.Fatalf("Unable to get types to register due to: %v", err)
+ httpClient = &http.Client{
+ Timeout: timeoutHTTPClient,
}
- producer := config.ProducerRegistrationInfo{
- InfoProducerSupervisionCallbackUrl: callbackAddress + server.StatusPath,
- SupportedInfoTypes: jobs.GetSupportedTypes(),
- InfoJobCallbackUrl: callbackAddress + server.AddJobPath,
+ pollClient := &http.Client{
+ Timeout: timeoutPollClient,
}
- if err := registrator.RegisterProducer("DMaaP_Mediator_Producer", &producer); err != nil {
- log.Fatalf("Unable to register producer due to: %v", err)
+ jobHandler = jobs.NewJobHandlerImpl("configs/type_config.json", pollClient, httpClient)
+ if err := registerTypesAndProducer(jobHandler, configuration.InfoCoordinatorAddress, callbackAddress); err != nil {
+ log.Fatalf("Stopping producer due to: %v", err)
}
-}
-func main() {
log.Debug("Starting DMaaP Mediator Producer")
wg := new(sync.WaitGroup)
log.Debugf("Starting callback server at port %v", configuration.InfoProducerPort)
go func() {
- r := server.NewRouter()
+ r := server.NewRouter(jobHandler)
log.Warn(http.ListenAndServe(fmt.Sprintf(":%v", configuration.InfoProducerPort), r))
wg.Done()
}()
go func() {
- jobs.RunJobs(fmt.Sprintf("%v:%v", configuration.MRHost, configuration.MRPort))
+ jobHandler.RunJobs(fmt.Sprintf("%v:%v", configuration.MRHost, configuration.MRPort))
wg.Done()
}()
wg.Wait()
log.Debug("Stopping DMaaP Mediator Producer")
}
+
+func validateConfiguration(configuration *config.Config) error {
+ if configuration.InfoProducerHost == "" {
+ return fmt.Errorf("missing INFO_PRODUCER_HOST")
+ }
+ return nil
+}
+
+func registerTypesAndProducer(jobHandler jobs.JobTypeHandler, infoCoordinatorAddress string, callbackAddress string) error {
+ registrator := config.NewRegistratorImpl(infoCoordinatorAddress, httpClient)
+ if types, err := jobHandler.GetTypes(); err == nil {
+ if regErr := registrator.RegisterTypes(types); regErr != nil {
+ return fmt.Errorf("unable to register all types due to: %v", regErr)
+ }
+ } else {
+ return fmt.Errorf("unable to get types to register due to: %v", err)
+ }
+ producer := config.ProducerRegistrationInfo{
+ InfoProducerSupervisionCallbackUrl: callbackAddress + server.StatusPath,
+ SupportedInfoTypes: jobHandler.GetSupportedTypes(),
+ InfoJobCallbackUrl: callbackAddress + server.AddJobPath,
+ }
+ if err := registrator.RegisterProducer("DMaaP_Mediator_Producer", &producer); err != nil {
+ return fmt.Errorf("unable to register producer due to: %v", err)
+ }
+ return nil
+}
-// Code generated by mockery v1.0.0. DO NOT EDIT.
+// Code generated by mockery v2.9.3. DO NOT EDIT.
-package mocks
+package httpclient
import (
http "net/http"
"fmt"
"io"
http "net/http"
+ "time"
"oransc.org/nonrtric/dmaapmediatorproducer/internal/restclient"
)
+var httpClient http.Client
+
func main() {
+ httpClient = http.Client{
+ Timeout: time.Second * 5,
+ }
port := flag.Int("port", 40935, "The port this consumer will listen on")
flag.Parse()
http.HandleFunc("/jobs", handleData)
+ registerJob(*port)
+
fmt.Print("Starting consumer on port: ", *port)
http.ListenAndServe(fmt.Sprintf(":%v", *port), nil)
- registerJob(*port)
}
func registerJob(port int) {
}{fmt.Sprintf("test%v", port), fmt.Sprintf("http://localhost:%v/jobs", port), "STD_Fault_Messages", "{}"}
fmt.Print("Registering consumer: ", jobInfo)
body, _ := json.Marshal(jobInfo)
- putErr := restclient.Put(fmt.Sprintf("http://localhost:8083/data-consumer/v1/info-jobs/job%v", port), body)
+ putErr := restclient.Put(fmt.Sprintf("http://localhost:8083/data-consumer/v1/info-jobs/job%v", port), body, &httpClient)
if putErr != nil {
fmt.Printf("Unable to register consumer: %v", putErr)
}
-Subproject commit 8efef0513821e7a586de4831a982833050be03e8
+Subproject commit 3b916e4dc5777863cb4ee873b41ee460fb9aec27
# Policy Agent image and tags
POLICY_AGENT_IMAGE_BASE="onap/ccsdk-oran-a1policymanagementservice"
-POLICY_AGENT_IMAGE_TAG_LOCAL="1.2.2-SNAPSHOT"
-POLICY_AGENT_IMAGE_TAG_REMOTE_SNAPSHOT="1.2-SNAPSHOT"
-POLICY_AGENT_IMAGE_TAG_REMOTE="1.2.2-STAGING-latest" #Will use snapshot repo
-POLICY_AGENT_IMAGE_TAG_REMOTE_RELEASE="1.2.1"
+POLICY_AGENT_IMAGE_TAG_LOCAL="1.3.0-SNAPSHOT"
+POLICY_AGENT_IMAGE_TAG_REMOTE_SNAPSHOT="1.3.0-SNAPSHOT"
+POLICY_AGENT_IMAGE_TAG_REMOTE="1.3.0-STAGING-latest" #Will use snapshot repo
+POLICY_AGENT_IMAGE_TAG_REMOTE_RELEASE="1.3.0"
# SDNC A1 Controller remote image and tag
SDNC_A1_CONTROLLER_IMAGE_BASE="onap/sdnc-image"
-SDNC_A1_CONTROLLER_IMAGE_TAG_LOCAL="2.2.0-SNAPSHOT" ###CHECK THIS
-SDNC_A1_CONTROLLER_IMAGE_TAG_REMOTE_SNAPSHOT="2.2.0-STAGING-latest"
-SDNC_A1_CONTROLLER_IMAGE_TAG_REMOTE="2.2.0-STAGING-latest" #Will use snapshot repo
-SDNC_A1_CONTROLLER_IMAGE_TAG_REMOTE_RELEASE="2.2.0"
+SDNC_A1_CONTROLLER_IMAGE_TAG_LOCAL="2.2.1-SNAPSHOT" ###CHECK THIS
+SDNC_A1_CONTROLLER_IMAGE_TAG_REMOTE_SNAPSHOT="2.2.1-STAGING-latest"
+SDNC_A1_CONTROLLER_IMAGE_TAG_REMOTE="2.2.1-STAGING-latest" #Will use snapshot repo
+SDNC_A1_CONTROLLER_IMAGE_TAG_REMOTE_RELEASE="2.2.1"
#SDNC DB remote image and tag
#The DB is part of SDNC so handled in the same way as SDNC
# Near RT RIC Simulator image and tags - same version as cherry
RIC_SIM_IMAGE_BASE="o-ran-sc/a1-simulator"
RIC_SIM_IMAGE_TAG_LOCAL="latest"
-RIC_SIM_IMAGE_TAG_REMOTE_SNAPSHOT="2.1.0-SNAPSHOT"
-RIC_SIM_IMAGE_TAG_REMOTE="2.1.0"
-RIC_SIM_IMAGE_TAG_REMOTE_RELEASE="2.1.0"
+RIC_SIM_IMAGE_TAG_REMOTE_SNAPSHOT="2.2.0-SNAPSHOT"
+RIC_SIM_IMAGE_TAG_REMOTE="2.2.0"
+RIC_SIM_IMAGE_TAG_REMOTE_RELEASE="2.2.0"
#Consul remote image and tag
filepath: /opt/app/policy-agent/data/application_configuration.json
# path where the service can store data
vardata-directory: /var/policy-management-service
+ # path to json schema for config validation
+ config-file-schema-path: /application_configuration_schema.json
webclient:
# Configuration of the trust store used for the HTTP client (outgoing requests)
# The file location and the password for the truststore is only relevant if trust-store-used == true
value: "1"
- name: ALLOW_HTTP
value: "true"
+ - name: DUPLICATE_CHECK
+ value: "1"
imagePullPolicy: $KUBE_IMAGE_PULL_POLICY
ports:
- name: http
- A1_VERSION=${G1_A1_VERSION}
- REMOTE_HOSTS_LOGGING=1
- ALLOW_HTTP=true
+ - DUPLICATE_CHECK=1
volumes:
- ${RIC_SIM_CERT_MOUNT_DIR}:/usr/src/app/cert:ro
labels:
- A1_VERSION=${G2_A1_VERSION}
- REMOTE_HOSTS_LOGGING=1
- ALLOW_HTTP=true
+ - DUPLICATE_CHECK=1
volumes:
- ${RIC_SIM_CERT_MOUNT_DIR}:/usr/src/app/cert:ro
labels:
- A1_VERSION=${G3_A1_VERSION}
- REMOTE_HOSTS_LOGGING=1
- ALLOW_HTTP=true
+ - DUPLICATE_CHECK=1
volumes:
- ${RIC_SIM_CERT_MOUNT_DIR}:/usr/src/app/cert:ro
labels:
import (
"bytes"
"os"
- "reflect"
"testing"
log "github.com/sirupsen/logrus"
)
func TestNew_envVarsSetConfigContainSetValues(t *testing.T) {
+ assertions := require.New(t)
os.Setenv("LOG_LEVEL", "Debug")
os.Setenv("CONSUMER_HOST", "consumerHost")
os.Setenv("CONSUMER_PORT", "8095")
SDNPassword: "pwd",
ORUToODUMapFile: "file",
}
- if got := New(); !reflect.DeepEqual(got, &wantConfig) {
- t.Errorf("New() = %v, want %v", got, &wantConfig)
- }
+
+ got := New()
+ assertions.Equal(&wantConfig, got)
}
func TestNew_faultyIntValueSetConfigContainDefaultValueAndWarnInLog(t *testing.T) {
SDNPassword: "Kp8bJ4SXszM0WXlhak3eHlcse2gAw84vaoGGmJvUy2U",
ORUToODUMapFile: "o-ru-to-o-du-map.csv",
}
- if got := New(); !reflect.DeepEqual(got, &wantConfig) {
- t.Errorf("New() = %v, want %v", got, &wantConfig)
- }
+
+ got := New()
+ assertions.Equal(&wantConfig, got)
+
logString := buf.String()
assertions.Contains(logString, "Invalid int value: wrong for variable: CONSUMER_PORT. Default value: 0 will be used")
}
SDNPassword: "Kp8bJ4SXszM0WXlhak3eHlcse2gAw84vaoGGmJvUy2U",
ORUToODUMapFile: "o-ru-to-o-du-map.csv",
}
- if got := New(); !reflect.DeepEqual(got, &wantConfig) {
- t.Errorf("New() = %v, want %v", got, &wantConfig)
- }
- if got := New(); !reflect.DeepEqual(got, &wantConfig) {
- t.Errorf("New() = %v, want %v", got, &wantConfig)
- }
+ got := New()
+ assertions.Equal(&wantConfig, got)
logString := buf.String()
assertions.Contains(logString, "Invalid log level: wrong. Log level will be Info!")
}
)
type Configuration struct {
- ConsumerAddress string
InfoCoordAddress string
SDNRAddress string
SDNRUser string
}
const rawSdnrPath = "/rests/data/network-topology:network-topology/topology=topology-netconf/node=[O-DU-ID]/yang-ext:mount/o-ran-sc-du-hello-world:network-function/du-to-ru-connection=[O-RU-ID]"
-
const unlockMessage = `{"o-ran-sc-du-hello-world:du-to-ru-connection": [{"name":"[O-RU-ID]","administrative-state":"UNLOCKED"}]}`
type LinkFailureHandler struct {
lookupService repository.LookupService
config Configuration
+ client restclient.HTTPClient
}
-func NewLinkFailureHandler(ls repository.LookupService, conf Configuration) *LinkFailureHandler {
+func NewLinkFailureHandler(ls repository.LookupService, conf Configuration, client restclient.HTTPClient) *LinkFailureHandler {
return &LinkFailureHandler{
lookupService: ls,
config: conf,
+ client: client,
}
}
if oDuId, err := lfh.lookupService.GetODuID(oRuId); err == nil {
sdnrPath := getSdnrPath(oRuId, oDuId)
unlockMessage := lfh.getUnlockMessage(oRuId)
- if error := restclient.Put(lfh.config.SDNRAddress+sdnrPath, unlockMessage, lfh.config.SDNRUser, lfh.config.SDNRPassword); error == nil {
+ if error := restclient.Put(lfh.config.SDNRAddress+sdnrPath, unlockMessage, lfh.client, lfh.config.SDNRUser, lfh.config.SDNRPassword); error == nil {
log.Debugf("Sent unlock message for O-RU: %v to O-DU: %v.", oRuId, oDuId)
} else {
log.Warn(error)
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"oransc.org/usecase/oruclosedloop/internal/repository"
- "oransc.org/usecase/oruclosedloop/internal/restclient"
"oransc.org/usecase/oruclosedloop/internal/ves"
"oransc.org/usecase/oruclosedloop/mocks"
)
StatusCode: http.StatusOK,
}, nil)
- restclient.Client = &clientMock
-
lookupServiceMock := mocks.LookupService{}
lookupServiceMock.On("GetODuID", mock.Anything).Return("HCL-O-DU-1122", nil)
SDNRAddress: "http://localhost:9990",
SDNRUser: "admin",
SDNRPassword: "pwd",
- })
+ }, &clientMock)
responseRecorder := httptest.NewRecorder()
r := newRequest(http.MethodPost, "/", getFaultMessage("ERICSSON-O-RU-11220", "CRITICAL"), t)
lookupServiceMock.On("GetODuID", mock.Anything).Return("HCL-O-DU-1122", nil)
- handlerUnderTest := NewLinkFailureHandler(&lookupServiceMock, Configuration{})
+ handlerUnderTest := NewLinkFailureHandler(&lookupServiceMock, Configuration{}, nil)
responseRecorder := httptest.NewRecorder()
r := newRequest(http.MethodPost, "/", getFaultMessage("ERICSSON-O-RU-11220", "NORMAL"), t)
Id: "ERICSSON-O-RU-11220",
})
- handlerUnderTest := NewLinkFailureHandler(&lookupServiceMock, Configuration{})
+ handlerUnderTest := NewLinkFailureHandler(&lookupServiceMock, Configuration{}, nil)
responseRecorder := httptest.NewRecorder()
r := newRequest(http.MethodPost, "/", getFaultMessage("ERICSSON-O-RU-11220", "CRITICAL"), t)
type CsvFileHelperImpl struct{}
-func NewCsvFileHelper() CsvFileHelperImpl {
+func NewCsvFileHelperImpl() CsvFileHelperImpl {
return CsvFileHelperImpl{}
}
-func (h *CsvFileHelperImpl) GetCsvFromFile(name string) ([][]string, error) {
+func (h CsvFileHelperImpl) GetCsvFromFile(name string) ([][]string, error) {
if csvFile, err := os.Open(name); err == nil {
defer csvFile.Close()
reader := csv.NewReader(csvFile)
import (
"os"
- "reflect"
"testing"
+
+ "github.com/stretchr/testify/require"
)
func TestCsvFileHelperImpl_GetCsvFromFile(t *testing.T) {
+ assertions := require.New(t)
filePath := createTempCsvFile()
defer os.Remove(filePath)
type args struct {
name string
}
tests := []struct {
- name string
- fileHelper *CsvFileHelperImpl
- args args
- want [][]string
- wantErr bool
+ name string
+ args args
+ want [][]string
+ wantErrString string
}{
{
- name: "Read from file should return array of content",
- fileHelper: &CsvFileHelperImpl{},
+ name: "Read from file should return array of content",
args: args{
name: filePath,
},
- want: [][]string{{"O-RU-ID", "O-DU-ID"}},
- wantErr: false,
+ want: [][]string{{"O-RU-ID", "O-DU-ID"}},
},
{
- name: "File missing should return error",
- fileHelper: &CsvFileHelperImpl{},
+ name: "File missing should return error",
args: args{
name: "nofile.csv",
},
- want: nil,
- wantErr: true,
+ wantErrString: "open nofile.csv: no such file or directory",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- h := &CsvFileHelperImpl{}
+ h := NewCsvFileHelperImpl()
got, err := h.GetCsvFromFile(tt.args.name)
- if (err != nil) != tt.wantErr {
- t.Errorf("CsvFileHelperImpl.GetCsvFromFile() error = %v, wantErr %v", err, tt.wantErr)
- return
- }
- if !reflect.DeepEqual(got, tt.want) {
- t.Errorf("CsvFileHelperImpl.GetCsvFromFile() = %v, want %v", got, tt.want)
+ assertions.Equal(tt.want, got)
+ if tt.wantErrString != "" {
+ assertions.Contains(err.Error(), tt.wantErrString)
}
})
}
import (
"errors"
- "reflect"
"testing"
+ "github.com/stretchr/testify/require"
"oransc.org/usecase/oruclosedloop/mocks"
)
+func TestIdNotMappedError(t *testing.T) {
+ assertions := require.New(t)
+
+ actualError := IdNotMappedError{
+ Id: "1",
+ }
+ assertions.Equal("O-RU-ID: 1 not mapped.", actualError.Error())
+}
+
func TestNewLookupServiceImpl(t *testing.T) {
+ assertions := require.New(t)
mockCsvFileHelper := &mocks.CsvFileHelper{}
type args struct {
fileHelper CsvFileHelper
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- if got := NewLookupServiceImpl(tt.args.fileHelper, tt.args.fileName); !reflect.DeepEqual(got, tt.want) {
- t.Errorf("NewLookupServiceImpl() = %v, want %v", got, tt.want)
- }
+ got := NewLookupServiceImpl(tt.args.fileHelper, tt.args.fileName)
+ assertions.Equal(tt.want, got)
})
}
}
func TestLookupServiceImpl_Init(t *testing.T) {
- mockCsvFileHelper := &mocks.CsvFileHelper{}
- mockCsvFileHelper.On("GetCsvFromFile", "./map.csv").Return([][]string{{"O-RU-ID", "O-DU-ID"}}, nil).Once()
- mockCsvFileHelper.On("GetCsvFromFile", "foo.csv").Return(nil, errors.New("Error")).Once()
- type fields struct {
- csvFileHelper CsvFileHelper
+ assertions := require.New(t)
+ type args struct {
csvFileName string
- oRuIdToODuIdMap map[string]string
+ mockReturn [][]string
+ mockReturnError error
}
tests := []struct {
- name string
- fields fields
- wantErr bool
+ name string
+ args args
+ wantedORuIdToODuIdMap map[string]string
+ wantErr error
}{
{
- name: "Init with proper csv file should not return error",
- fields: fields{
- csvFileHelper: mockCsvFileHelper,
- csvFileName: "./map.csv",
- oRuIdToODuIdMap: map[string]string{}},
- wantErr: false,
+ name: "Init with proper csv file should not return error and map should be initialized",
+ args: args{
+ csvFileName: "./map.csv",
+ mockReturn: [][]string{{"O-RU-ID", "O-DU-ID"}},
+ },
+ wantedORuIdToODuIdMap: map[string]string{"O-RU-ID": "O-DU-ID"},
},
{
- name: "Init with missing file should return error",
- fields: fields{
- csvFileHelper: mockCsvFileHelper,
+ name: "Init with missing file should return error and map should not be initialized",
+ args: args{
csvFileName: "foo.csv",
- oRuIdToODuIdMap: map[string]string{},
+ mockReturnError: errors.New("Error"),
},
- wantErr: true,
+ wantedORuIdToODuIdMap: map[string]string{},
+ wantErr: errors.New("Error"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- s := LookupServiceImpl{
- csvFileHelper: tt.fields.csvFileHelper,
- csvFileName: tt.fields.csvFileName,
- oRuIdToODuIdMap: tt.fields.oRuIdToODuIdMap,
- }
- if err := s.Init(); (err != nil) != tt.wantErr {
- t.Errorf("LookupServiceImpl.Init() error = %v, wantErr %v", err, tt.wantErr)
- } else if !tt.wantErr {
- wantedMap := map[string]string{"O-RU-ID": "O-DU-ID"}
- if !reflect.DeepEqual(wantedMap, s.oRuIdToODuIdMap) {
- t.Errorf("LookupServiceImpl.Init() map not initialized, wanted map: %v, got map: %v", wantedMap, s.oRuIdToODuIdMap)
- }
- }
+ mockCsvFileHelper := &mocks.CsvFileHelper{}
+ mockCsvFileHelper.On("GetCsvFromFile", tt.args.csvFileName).Return(tt.args.mockReturn, tt.args.mockReturnError)
+
+ s := NewLookupServiceImpl(mockCsvFileHelper, tt.args.csvFileName)
+
+ err := s.Init()
+
+ assertions.Equal(tt.wantErr, err, tt.name)
+ assertions.Equal(tt.wantedORuIdToODuIdMap, s.oRuIdToODuIdMap)
+ mockCsvFileHelper.AssertNumberOfCalls(t, "GetCsvFromFile", 1)
})
}
- mockCsvFileHelper.AssertNumberOfCalls(t, "GetCsvFromFile", 2)
}
func TestLookupServiceImpl_GetODuID(t *testing.T) {
+ assertions := require.New(t)
type fields struct {
csvFileHelper CsvFileHelper
csvFileName string
args: args{
oRuId: "O-RU-ID",
},
- want: "O-DU-ID",
- wantErr: nil,
+ want: "O-DU-ID",
},
{
name: "Id not mapped should return IdNotMappedError",
args: args{
oRuId: "O-RU-ID",
},
- want: "",
wantErr: IdNotMappedError{Id: "O-RU-ID"},
},
}
csvFileName: tt.fields.csvFileName,
oRuIdToODuIdMap: tt.fields.oRuIdToODuIdMap,
}
+
got, err := s.GetODuID(tt.args.oRuId)
- if err != tt.wantErr {
- t.Errorf("LookupServiceImpl.GetODuID() error = %v, wantErr %v", err, tt.wantErr)
- return
- }
- if got != tt.want {
- t.Errorf("LookupServiceImpl.GetODuID() = %v, want %v", got, tt.want)
- }
+
+ assertions.Equal(tt.wantErr, err, tt.name)
+ assertions.Equal(tt.want, got, tt.name)
})
}
}
"fmt"
"io"
"net/http"
- "time"
)
type RequestError struct {
Body []byte
}
-func (pe RequestError) Error() string {
- return fmt.Sprintf("Request failed due to error response with status: %v and body: %v", pe.StatusCode, string(pe.Body))
-}
-
// HTTPClient interface
type HTTPClient interface {
Get(url string) (*http.Response, error)
Do(*http.Request) (*http.Response, error)
}
-var (
- Client HTTPClient
-)
-
-func init() {
- Client = &http.Client{
- Timeout: time.Second * 5,
- }
-}
-
-func Get(url string) ([]byte, error) {
- if response, err := Client.Get(url); err == nil {
- if isResponseSuccess(response.StatusCode) {
- defer response.Body.Close()
- if responseData, err := io.ReadAll(response.Body); err == nil {
- return responseData, nil
- } else {
- return nil, err
- }
- } else {
- return nil, getResponseError(response)
- }
- } else {
- return nil, err
- }
+func (pe RequestError) Error() string {
+ return fmt.Sprintf("Request failed due to error response with status: %v and body: %v", pe.StatusCode, string(pe.Body))
}
-func PutWithoutAuth(url string, body []byte) error {
- return do(http.MethodPut, url, body)
+func PutWithoutAuth(url string, body []byte, client HTTPClient) error {
+ return do(http.MethodPut, url, body, client)
}
-func Put(url string, body string, userName string, password string) error {
- return do(http.MethodPut, url, []byte(body), userName, password)
+func Put(url string, body string, client HTTPClient, userName string, password string) error {
+ return do(http.MethodPut, url, []byte(body), client, userName, password)
}
-func Delete(url string) error {
- return do(http.MethodDelete, url, nil)
+func Delete(url string, client HTTPClient) error {
+ return do(http.MethodDelete, url, nil, client)
}
-func do(method string, url string, body []byte, userInfo ...string) error {
+func do(method string, url string, body []byte, client HTTPClient, userInfo ...string) error {
if req, reqErr := http.NewRequest(method, url, bytes.NewBuffer(body)); reqErr == nil {
if body != nil {
req.Header.Set("Content-Type", "application/json; charset=utf-8")
if len(userInfo) > 0 {
req.SetBasicAuth(userInfo[0], userInfo[1])
}
- if response, respErr := Client.Do(req); respErr == nil {
+ if response, respErr := client.Do(req); respErr == nil {
if isResponseSuccess(response.StatusCode) {
return nil
} else {
assertions.Equal("Request failed due to error response with status: 400 and body: error", actualError.Error())
}
-func TestGet(t *testing.T) {
- assertions := require.New(t)
-
- type args struct {
- url string
- mockReturnStatus int
- mockReturnBody []byte
- mockReturnError error
- }
- tests := []struct {
- name string
- args args
- want []byte
- wantErr error
- }{
- {
- name: "Ok response",
- args: args{
- url: "ok",
- mockReturnStatus: http.StatusOK,
- mockReturnBody: []byte("body"),
- mockReturnError: nil,
- },
- want: []byte("body"),
- wantErr: nil,
- },
- {
- name: "Bad request should get RequestError",
- args: args{
- url: "badRequest",
- mockReturnStatus: http.StatusBadRequest,
- mockReturnBody: []byte("bad request"),
- mockReturnError: nil,
- },
- want: nil,
- wantErr: RequestError{
- StatusCode: http.StatusBadRequest,
- Body: []byte("bad request"),
- },
- },
- {
- name: "Server unavailable should get error",
- args: args{
- url: "serverUnavailable",
- mockReturnError: fmt.Errorf("Server unavailable"),
- },
- want: nil,
- wantErr: fmt.Errorf("Server unavailable"),
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- clientMock := mocks.HTTPClient{}
- clientMock.On("Get", tt.args.url).Return(&http.Response{
- StatusCode: tt.args.mockReturnStatus,
- Body: ioutil.NopCloser(bytes.NewReader(tt.args.mockReturnBody)),
- }, tt.args.mockReturnError)
- Client = &clientMock
-
- got, err := Get(tt.args.url)
- assertions.Equal(tt.wantErr, err, tt.name)
- assertions.Equal(tt.want, got, tt.name)
- clientMock.AssertCalled(t, "Get", tt.args.url)
- })
- }
-}
-
func TestPutWithoutAuth(t *testing.T) {
assertions := require.New(t)
clientMock.On("Do", mock.Anything).Return(&http.Response{
StatusCode: http.StatusOK,
}, nil)
- Client = &clientMock
- error := PutWithoutAuth("url", []byte("body"))
+ error := PutWithoutAuth("url", []byte("body"), &clientMock)
assertions.Nil(error)
var actualRequest *http.Request
clientMock.On("Do", mock.Anything).Return(&http.Response{
StatusCode: http.StatusOK,
}, nil)
- Client = &clientMock
- error := Put("url", "body", "admin", "pwd")
+ error := Put("url", "body", &clientMock, "admin", "pwd")
assertions.Nil(error)
var actualRequest *http.Request
clientMock.On("Do", mock.Anything).Return(&http.Response{
StatusCode: http.StatusOK,
}, nil)
- Client = &clientMock
- error := Delete("url")
+ error := Delete("url", &clientMock)
assertions.Nil(error)
var actualRequest *http.Request
StatusCode: tt.args.mockReturnStatus,
Body: ioutil.NopCloser(bytes.NewReader(tt.args.mockReturnBody)),
}, tt.args.mockReturnError)
- Client = &clientMock
- err := do("PUT", tt.args.url, nil)
+ err := do("PUT", tt.args.url, nil, &clientMock)
assertions.Equal(tt.wantErr, err, tt.name)
})
}
"encoding/json"
"fmt"
"net/http"
+ "time"
"github.com/gorilla/mux"
log "github.com/sirupsen/logrus"
"oransc.org/usecase/oruclosedloop/internal/restclient"
)
-var consumerConfig linkfailure.Configuration
+const timeoutHTTPClient = time.Second * 5
+const jobId = "14e7bb84-a44d-44c1-90b7-6995a92ad43c"
+
+var infoCoordAddress string
+var linkfailureConfig linkfailure.Configuration
var lookupService repository.LookupService
var host string
var port string
-
-const jobId = "14e7bb84-a44d-44c1-90b7-6995a92ad43c"
+var client restclient.HTTPClient
func init() {
configuration := config.New()
+ client = &http.Client{
+ Timeout: timeoutHTTPClient,
+ }
+
log.SetLevel(configuration.LogLevel)
- if configuration.ConsumerHost == "" || configuration.ConsumerPort == 0 {
- log.Fatal("Consumer host and port must be provided!")
+ if err := validateConfiguration(configuration); err != nil {
+ log.Fatalf("Unable to start consumer due to: %v", err)
}
host = configuration.ConsumerHost
port = fmt.Sprint(configuration.ConsumerPort)
- csvFileHelper := repository.NewCsvFileHelper()
- lookupService = repository.NewLookupServiceImpl(&csvFileHelper, configuration.ORUToODUMapFile)
- if initErr := lookupService.Init(); initErr != nil {
+ csvFileHelper := repository.NewCsvFileHelperImpl()
+ if initErr := initializeLookupService(csvFileHelper, configuration); initErr != nil {
log.Fatalf("Unable to create LookupService due to inability to get O-RU-ID to O-DU-ID map. Cause: %v", initErr)
}
- consumerConfig = linkfailure.Configuration{
- InfoCoordAddress: configuration.InfoCoordinatorAddress,
- SDNRAddress: configuration.SDNRHost + ":" + fmt.Sprint(configuration.SDNRPort),
- SDNRUser: configuration.SDNRUser,
- SDNRPassword: configuration.SDNPassword,
+
+ infoCoordAddress = configuration.InfoCoordinatorAddress
+
+ linkfailureConfig = linkfailure.Configuration{
+ SDNRAddress: configuration.SDNRHost + ":" + fmt.Sprint(configuration.SDNRPort),
+ SDNRUser: configuration.SDNRUser,
+ SDNRPassword: configuration.SDNPassword,
+ }
+}
+
+func validateConfiguration(configuration *config.Config) error {
+ if configuration.ConsumerHost == "" || configuration.ConsumerPort == 0 {
+ return fmt.Errorf("consumer host and port must be provided")
+ }
+ return nil
+}
+
+func initializeLookupService(csvFileHelper repository.CsvFileHelper, configuration *config.Config) error {
+ lookupService = repository.NewLookupServiceImpl(csvFileHelper, configuration.ORUToODUMapFile)
+ if initErr := lookupService.Init(); initErr != nil {
+ return initErr
}
+ return nil
}
func main() {
defer deleteJob()
- messageHandler := linkfailure.NewLinkFailureHandler(lookupService, consumerConfig)
+ messageHandler := linkfailure.NewLinkFailureHandler(lookupService, linkfailureConfig, client)
r := mux.NewRouter()
r.HandleFunc("/", messageHandler.MessagesHandler).Methods(http.MethodPost)
r.HandleFunc("/admin/start", startHandler).Methods(http.MethodPost)
JobDefinition: "{}",
}
body, _ := json.Marshal(jobRegistrationInfo)
- putErr := restclient.PutWithoutAuth(consumerConfig.InfoCoordAddress+"/data-consumer/v1/info-jobs/"+jobId, body)
+ putErr := restclient.PutWithoutAuth(infoCoordAddress+"/data-consumer/v1/info-jobs/"+jobId, body, client)
if putErr != nil {
http.Error(w, fmt.Sprintf("Unable to register consumer job: %v", putErr), http.StatusBadRequest)
return
}
func deleteJob() error {
- return restclient.Delete(consumerConfig.InfoCoordAddress + "/data-consumer/v1/info-jobs/" + jobId)
+ return restclient.Delete(infoCoordAddress+"/data-consumer/v1/info-jobs/"+jobId, client)
}