Status handler for DMaaP mediator producer 96/6696/1
authorelinuxhenrik <henrik.b.andersson@est.tech>
Mon, 6 Sep 2021 14:05:01 +0000 (16:05 +0200)
committerelinuxhenrik <henrik.b.andersson@est.tech>
Mon, 6 Sep 2021 14:05:07 +0000 (16:05 +0200)
Issue-ID: NONRTRIC-595
Signed-off-by: elinuxhenrik <henrik.b.andersson@est.tech>
Change-Id: Ia97075ba6532c040659e7781ee6e753511d7b91a

dmaap-mediator-producer/internal/config/config.go
dmaap-mediator-producer/internal/config/config_test.go
dmaap-mediator-producer/internal/server/server.go [new file with mode: 0644]
dmaap-mediator-producer/internal/server/server_test.go [new file with mode: 0644]
dmaap-mediator-producer/main.go

index a3e3a11..764e89c 100644 (file)
@@ -25,10 +25,12 @@ import (
 )
 
 type Config struct {
-       LogLevel                           string
-       InfoJobCallbackUrl                 string
-       InfoCoordinatorAddress             string
-       InfoProducerSupervisionCallbackUrl string
+       LogLevel                            string
+       InfoProducerSupervisionCallbackHost string
+       InfoProducerSupervisionCallbackPort string
+       InfoJobCallbackHost                 string
+       InfoJobCallbackPort                 string
+       InfoCoordinatorAddress              string
 }
 
 type ProducerRegistrationInfo struct {
@@ -39,10 +41,12 @@ type ProducerRegistrationInfo struct {
 
 func New() *Config {
        return &Config{
-               LogLevel:                           getEnv("LOG_LEVEL", "Info"),
-               InfoJobCallbackUrl:                 getEnv("INFO_JOB_CALLBACK_URL", ""),
-               InfoCoordinatorAddress:             getEnv("INFO_COORD_ADDR", "http://enrichmentservice:8083"),
-               InfoProducerSupervisionCallbackUrl: getEnv("INFO_PRODUCER_SUPERVISION_CALLBACK_URL", ""),
+               LogLevel:                            getEnv("LOG_LEVEL", "Info"),
+               InfoProducerSupervisionCallbackHost: getEnv("INFO_PRODUCER_SUPERVISION_CALLBACK_HOST", ""),
+               InfoProducerSupervisionCallbackPort: getEnv("INFO_PRODUCER_SUPERVISION_CALLBACK_PORT", "8085"),
+               InfoJobCallbackHost:                 getEnv("INFO_JOB_CALLBACK_HOST", ""),
+               InfoJobCallbackPort:                 getEnv("INFO_JOB_CALLBACK_PORT", "8086"),
+               InfoCoordinatorAddress:              getEnv("INFO_COORD_ADDR", "http://enrichmentservice:8083"),
        }
 }
 
index 88ba1d8..6b02e42 100644 (file)
@@ -28,15 +28,19 @@ import (
 
 func TestNew_envVarsSetConfigContainSetValues(t *testing.T) {
        os.Setenv("LOG_LEVEL", "Debug")
-       os.Setenv("INFO_JOB_CALLBACK_URL", "jobCallbackUrl")
+       os.Setenv("INFO_PRODUCER_SUPERVISION_CALLBACK_HOST", "supervisionCallbackHost")
+       os.Setenv("INFO_PRODUCER_SUPERVISION_CALLBACK_PORT", "supervisionCallbackPort")
+       os.Setenv("INFO_JOB_CALLBACK_HOST", "jobCallbackHost")
+       os.Setenv("INFO_JOB_CALLBACK_PORT", "jobCallbackPort")
        os.Setenv("INFO_COORD_ADDR", "infoCoordAddr")
-       os.Setenv("INFO_PRODUCER_SUPERVISION_CALLBACK_URL", "supervisionCallbackUrl")
        defer os.Clearenv()
        wantConfig := Config{
-               LogLevel:                           "Debug",
-               InfoJobCallbackUrl:                 "jobCallbackUrl",
-               InfoCoordinatorAddress:             "infoCoordAddr",
-               InfoProducerSupervisionCallbackUrl: "supervisionCallbackUrl",
+               LogLevel:                            "Debug",
+               InfoProducerSupervisionCallbackHost: "supervisionCallbackHost",
+               InfoProducerSupervisionCallbackPort: "supervisionCallbackPort",
+               InfoJobCallbackHost:                 "jobCallbackHost",
+               InfoJobCallbackPort:                 "jobCallbackPort",
+               InfoCoordinatorAddress:              "infoCoordAddr",
        }
        if got := New(); !reflect.DeepEqual(got, &wantConfig) {
                t.Errorf("New() = %v, want %v", got, &wantConfig)
@@ -45,10 +49,12 @@ func TestNew_envVarsSetConfigContainSetValues(t *testing.T) {
 
 func TestNew_envVarsNotSetConfigContainDefaultValues(t *testing.T) {
        wantConfig := Config{
-               LogLevel:                           "Info",
-               InfoJobCallbackUrl:                 "",
-               InfoCoordinatorAddress:             "http://enrichmentservice:8083",
-               InfoProducerSupervisionCallbackUrl: "",
+               LogLevel:                            "Info",
+               InfoProducerSupervisionCallbackHost: "",
+               InfoProducerSupervisionCallbackPort: "8085",
+               InfoJobCallbackHost:                 "",
+               InfoJobCallbackPort:                 "8086",
+               InfoCoordinatorAddress:              "http://enrichmentservice:8083",
        }
        if got := New(); !reflect.DeepEqual(got, &wantConfig) {
                t.Errorf("New() = %v, want %v", got, &wantConfig)
diff --git a/dmaap-mediator-producer/internal/server/server.go b/dmaap-mediator-producer/internal/server/server.go
new file mode 100644 (file)
index 0000000..ca30d73
--- /dev/null
@@ -0,0 +1,40 @@
+// -
+//   ========================LICENSE_START=================================
+//   O-RAN-SC
+//   %%
+//   Copyright (C) 2021: Nordix Foundation
+//   %%
+//   Licensed under the Apache License, Version 2.0 (the "License");
+//   you may not use this file except in compliance with the License.
+//   You may obtain a copy of the License at
+//
+//        http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+//   distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//   See the License for the specific language governing permissions and
+//   limitations under the License.
+//   ========================LICENSE_END===================================
+//
+
+package server
+
+import (
+       "fmt"
+       "net/http"
+)
+
+func StatusHandler(w http.ResponseWriter, r *http.Request) {
+       if r.URL.Path != "/" {
+               http.Error(w, "404 not found.", http.StatusNotFound)
+               return
+       }
+
+       if r.Method != "GET" {
+               http.Error(w, "Method is not supported.", http.StatusNotFound)
+               return
+       }
+
+       fmt.Fprintf(w, "All is well!")
+}
diff --git a/dmaap-mediator-producer/internal/server/server_test.go b/dmaap-mediator-producer/internal/server/server_test.go
new file mode 100644 (file)
index 0000000..9213216
--- /dev/null
@@ -0,0 +1,90 @@
+// -
+//   ========================LICENSE_START=================================
+//   O-RAN-SC
+//   %%
+//   Copyright (C) 2021: Nordix Foundation
+//   %%
+//   Licensed under the Apache License, Version 2.0 (the "License");
+//   you may not use this file except in compliance with the License.
+//   You may obtain a copy of the License at
+//
+//        http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+//   distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//   See the License for the specific language governing permissions and
+//   limitations under the License.
+//   ========================LICENSE_END===================================
+//
+
+package server
+
+import (
+       "io"
+       "net/http"
+       "net/http/httptest"
+       "testing"
+
+       "github.com/stretchr/testify/require"
+)
+
+func TestStatusHandler(t *testing.T) {
+       assertions := require.New(t)
+       type args struct {
+               responseRecorder *httptest.ResponseRecorder
+               r                *http.Request
+       }
+       tests := []struct {
+               name         string
+               args         args
+               wantedStatus int
+               wantedBody   string
+       }{
+               {
+                       name: "StatusHandler with correct path and method, should return OK",
+                       args: args{
+                               responseRecorder: httptest.NewRecorder(),
+                               r:                newRequest("GET", "/", nil, t),
+                       },
+                       wantedStatus: http.StatusOK,
+                       wantedBody:   "All is well!",
+               },
+               {
+                       name: "StatusHandler with incorrect path, should return NotFound",
+                       args: args{
+                               responseRecorder: httptest.NewRecorder(),
+                               r:                newRequest("GET", "/wrong", nil, t),
+                       },
+                       wantedStatus: http.StatusNotFound,
+                       wantedBody:   "404 not found.\n",
+               },
+               {
+                       name: "StatusHandler with incorrect method, should return NotFound",
+                       args: args{
+                               responseRecorder: httptest.NewRecorder(),
+                               r:                newRequest("PUT", "/", nil, t),
+                       },
+                       wantedStatus: http.StatusNotFound,
+                       wantedBody:   "Method is not supported.\n",
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       handler := http.HandlerFunc(StatusHandler)
+                       handler.ServeHTTP(tt.args.responseRecorder, tt.args.r)
+                       assertions.Equal(tt.wantedStatus, tt.args.responseRecorder.Code)
+
+                       assertions.Equal(tt.wantedBody, tt.args.responseRecorder.Body.String())
+               })
+       }
+}
+
+func newRequest(method string, url string, body io.Reader, t *testing.T) *http.Request {
+       if req, err := http.NewRequest(method, url, body); err == nil {
+               return req
+       } else {
+               t.Fatalf("Could not create request due to: %v", err)
+               return nil
+       }
+}
index d38496f..b357f69 100644 (file)
 package main
 
 import (
-       "time"
+       "fmt"
+       "net/http"
 
        log "github.com/sirupsen/logrus"
        "oransc.org/nonrtric/dmaapmediatorproducer/internal/config"
        "oransc.org/nonrtric/dmaapmediatorproducer/internal/jobtypes"
+       "oransc.org/nonrtric/dmaapmediatorproducer/internal/server"
 )
 
 var configuration *config.Config
+var supervisionCallbackAddress string
+var jobInfoCallbackAddress string
 
 func init() {
        configuration = config.New()
@@ -39,12 +43,15 @@ func init() {
        }
 
        log.Debug("Initializing DMaaP Mediator Producer")
-       if configuration.InfoJobCallbackUrl == "" {
-               log.Fatal("Missing INFO_JOB_CALLBACK_URL")
+       if configuration.InfoProducerSupervisionCallbackHost == "" {
+               log.Fatal("Missing INFO_PRODUCER_SUPERVISION_CALLBACK_HOST")
        }
-       if configuration.InfoProducerSupervisionCallbackUrl == "" {
-               log.Fatal("Missing INFO_PRODUCER_SUPERVISION_CALLBACK_URL")
+       supervisionCallbackAddress = fmt.Sprintf("%v:%v", configuration.InfoProducerSupervisionCallbackHost, configuration.InfoProducerSupervisionCallbackPort)
+
+       if configuration.InfoJobCallbackHost == "" {
+               log.Fatal("Missing INFO_JOB_CALLBACK_HOST")
        }
+       jobInfoCallbackAddress = fmt.Sprintf("%v:%v", configuration.InfoJobCallbackHost, configuration.InfoJobCallbackPort)
 
        registrator := config.NewRegistratorImpl(configuration.InfoCoordinatorAddress)
        if types, err := jobtypes.GetTypes(); err == nil {
@@ -55,9 +62,9 @@ func init() {
                log.Fatalf("Unable to get types to register due to: %v", err)
        }
        producer := config.ProducerRegistrationInfo{
-               InfoProducerSupervisionCallbackUrl: configuration.InfoProducerSupervisionCallbackUrl,
+               InfoProducerSupervisionCallbackUrl: supervisionCallbackAddress,
                SupportedInfoTypes:                 jobtypes.GetSupportedTypes(),
-               InfoJobCallbackUrl:                 configuration.InfoJobCallbackUrl,
+               InfoJobCallbackUrl:                 jobInfoCallbackAddress,
        }
        if err := registrator.RegisterProducer("DMaaP_Mediator_Producer", &producer); err != nil {
                log.Fatalf("Unable to register producer due to: %v", err)
@@ -66,5 +73,11 @@ func init() {
 
 func main() {
        log.Debug("Starting DMaaP Mediator Producer")
-       time.Sleep(1000 * time.Millisecond)
+       log.Debugf("Starting status callback server at port %v", configuration.InfoProducerSupervisionCallbackPort)
+       http.HandleFunc("/", server.StatusHandler)
+
+       if err := http.ListenAndServe(":"+configuration.InfoProducerSupervisionCallbackPort, nil); err != nil {
+               log.Fatal(err)
+       }
+       log.Debug("Stopping DMaaP Mediator Producer")
 }