)
type RmrCommand interface {
+ Send(*xapp.RMRParams, bool) bool
SendRts(*xapp.RMRParams) bool
GetRicMessageName(int) string
}
type RmrWrapper struct {
}
+func (rw RmrWrapper) Send(msg *xapp.RMRParams, isRts bool) bool {
+ return xapp.Rmr.Send(msg, isRts)
+}
+
func (rw RmrWrapper) SendRts(msg *xapp.RMRParams) bool {
return xapp.Rmr.SendRts(msg)
}
xapp.RunWithParams(c, waitForSdl)
}
+func (c *Control) sendRequestToMLxApp(jsonMsg []byte) []byte {
+ client := resty.New()
+ resp, err := client.R().
+ SetHeader("Content-Type", "application/x-www-form-urlencoded").
+ SetHeader("Host", c.mlxAppConfigs.HeaderHost).
+ EnableTrace().
+ SetBody(jsonMsg).
+ Post(fmt.Sprintf("%s:%s/%s", c.mlxAppConfigs.Host, c.mlxAppConfigs.Port, c.mlxAppConfigs.ReqUrl))
+
+ if err != nil || resp == nil || resp.StatusCode() != http.StatusOK {
+ xapp.Logger.Error("failed to POST : err = %s, resp = %s, code = %d", err, resp, resp.StatusCode())
+ return nil
+ }
+
+ xapp.Logger.Info("Response from MLxApp : %s", resp)
+ return resp.Body()
+}
+
+func (c *Control) handleRequestQoEPrediction(msg *xapp.RMRParams) {
+ var qoePredictionRequest data.QoePredictionRequest
+ err := json.Unmarshal(msg.Payload, &qoePredictionRequest)
+ if err != nil {
+ xapp.Logger.Error("filed to unmarshal msg : %s", err)
+ return
+ }
+ xapp.Logger.Info("Received Msg : %+v", qoePredictionRequest)
+
+ var qoePredictionInput data.QoePredictionInput
+ qoePredictionInput.SignatureName = SIGNITURE_NAME
+
+ for i := 0; i < len(qoePredictionRequest.CellMeasurements); i++ {
+ qoePredictionInput.Instances = append(qoePredictionInput.Instances, [][]float32{})
+ qoePredictionInput.Instances[i] = append(qoePredictionInput.Instances[i], []float32{float32(qoePredictionRequest.CellMeasurements[i].PDCPBytesUL), float32(qoePredictionRequest.CellMeasurements[i].PDCPBytesDL)})
+ }
+ xapp.Logger.Info("Qoe Prediction Request = %v", qoePredictionInput)
+
+ jsonbytes, err := json.Marshal(qoePredictionInput)
+ if err != nil {
+ xapp.Logger.Error("fail to marshal : %s", err)
+ return
+ }
+
+ response := c.sendRequestToMLxApp(jsonbytes)
+ if response == nil {
+ xapp.Logger.Error("fail to request prediction to MLxApp")
+ return
+ }
+
+ var qoePredictionResult data.QoePredictionResult
+ err = json.Unmarshal(response, &qoePredictionResult)
+ if err != nil {
+ xapp.Logger.Error("filed to unmarshal msg : %s", err)
+ return
+ }
+ xapp.Logger.Debug("Unmarshaled response: %+v", qoePredictionResult)
+
+ predictions := make(map[string][]float32)
+ for i := 0; i < len(qoePredictionRequest.CellMeasurements); i++ {
+ predictions[qoePredictionRequest.CellMeasurements[i].CellID] = []float32{qoePredictionResult.Predictions[i][0], qoePredictionResult.Predictions[i][1]}
+ }
+ predResultMsg := make(map[string]map[string][]float32)
+ predResultMsg[qoePredictionRequest.PredictionUE] = predictions
+
+ jsonBytes, err := json.Marshal(predResultMsg)
+ if err != nil {
+ xapp.Logger.Error("filed to marshal msg : %s", err)
+ return
+ }
+ xapp.Logger.Info("QoE Prediction Result Msg : " + string(jsonBytes))
+
+ c.sendPredictionResult(msg, jsonBytes)
+}
+
func (c *Control) handleRequestPrediction(msg *xapp.RMRParams) {
var predictRequest data.PredictRequest
err := json.Unmarshal(msg.Payload, &predictRequest)
return
}
- client := resty.New()
- resp, err := client.R().
- SetHeader("Content-Type", "application/x-www-form-urlencoded").
- SetHeader("Host", c.mlxAppConfigs.HeaderHost).
- EnableTrace().
- SetBody(jsonbytes).
- Post(fmt.Sprintf("%s:%s/%s", c.mlxAppConfigs.Host, c.mlxAppConfigs.Port, c.mlxAppConfigs.ReqUrl))
-
- if err != nil || resp == nil || resp.StatusCode() != http.StatusOK {
- xapp.Logger.Error("failed to POST : err = %s, resp = %s, code = %d, sendmsg = %v", err, resp, resp.StatusCode(), qoePrectionInput)
+ response := c.sendRequestToMLxApp(jsonbytes)
+ if response == nil {
+ xapp.Logger.Error("fail to request prediction to MLxApp")
return
}
- xapp.Logger.Info("Response from MLxApp : %s", resp)
- c.sendPredictionResult(msg, resp.Body())
+ c.sendPredictionResult(msg, response)
}
func (c *Control) makeRequestPredictionMsg(cellMetricsEntries []data.CellMetricsEntry) data.QoePredictionInput {
switch msg.Mtype {
case xapp.TS_UE_LIST:
go c.handleRequestPrediction(msg)
+ case xapp.TS_QOE_PRED_REQ:
+ go c.handleRequestQoEPrediction(msg)
default:
xapp.Logger.Info("Unknown message type '%d', discarding", msg.Mtype)
}
msg.Mtype = xapp.TS_QOE_PREDICTION
msg.PayloadLen = len(respBody)
msg.Payload = respBody
- ret := c.SendRts(msg)
+ ret := c.Send(msg, false)
xapp.Logger.Info("result of SendPredictionResult = %t", ret)
}
"github.com/stretchr/testify/assert"
)
-func createPostTestServer() *httptest.Server {
+func createPostTestServer(expectedHttpStatus int, expectedBody []byte) *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
if req.Method == http.MethodPost {
if req.URL.Path == "/v1/models/qoe-model:predict" {
rw.Header().Set("Content-Type", "application/x-www-form-urlencoded")
+ rw.WriteHeader(expectedHttpStatus)
+ rw.Write(expectedBody)
return
}
}
}
func TestHandleRequestPrediction_ExpectSuccess(t *testing.T) {
- server := createPostTestServer()
+ server := createPostTestServer(http.StatusOK, nil)
defer server.Close()
ctrl := gomock.NewController(t)
}
func TestNegativeHandleRequestPrediction_WhenRequestUnmarshalFailed_ExpectReturn(t *testing.T) {
- server := createPostTestServer()
+ server := createPostTestServer(http.StatusOK, nil)
defer server.Close()
ctrl := gomock.NewController(t)
}
func TestNegativeHandleRequestPrediction_WhenResponseStatusBadRequest_ExpectReturn(t *testing.T) {
- server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
- if req.Method == http.MethodPost {
- if req.URL.Path == "/v1/models/qoe-model:predict" {
- rw.Header().Set("Content-Type", "application/json; charset=utf-8")
- rw.WriteHeader(http.StatusBadRequest)
- return
- }
- }
- }))
+ server := createPostTestServer(http.StatusBadRequest, nil)
defer server.Close()
ctrl := gomock.NewController(t)
assert.NotNil(t, ret)
}
+
+func TestHandleRequestQoEPrediction_ExpectSuccess(t *testing.T) {
+ pr, _ := json.Marshal(data.QoePredictionRequest{
+ PredictionUE: "12345",
+ UEMeasurement: data.UEMeasurementType{
+ ServingCellID: "310-680-200-555002",
+ MeasTimestampUEPDCPBytes: "2020-03-18 02:23:18.220",
+ MeasPeriodUEPDCPBytes: 20,
+ UEPDCPBytesDL: 2500000,
+ UEPDCPBytesUL: 1000000,
+ MeasTimestampUEPRBUsage: "2020-03-18 02:23:18.220",
+ MeasPeriodUEPRBUsage: 20,
+ UEPRBUsageDL: 10,
+ UEPRBUsageUL: 30,
+ },
+ CellMeasurements: []data.CellMeasurement{
+ data.CellMeasurement{
+ CellID: "310-680-220-555001",
+ MeasTimestampPDCPBytes: "2020-03-18 02:23:18.220",
+ MeasPeriodPDCPBytes: 20,
+ PDCPBytesDL: 250000,
+ PDCPBytesUL: 100000,
+ MeasTimestampAvailPRB: "2020-03-18 02:23:18.220",
+ MeasPeriodAvailPRB: 20,
+ AvailPRBDL: 30,
+ AvailPRBUL: 50,
+ MeasTimestampRF: "2020-03-18 02:23:18.220",
+ MeasPeriodRF: 40,
+ RFMeasurements: data.RFMeasurement{RSRP: -90, RSRQ: -13, RSSINR: -2.5},
+ },
+ data.CellMeasurement{
+ CellID: "310-680-220-555003",
+ MeasTimestampPDCPBytes: "2020-03-18 02:23:18.220",
+ MeasPeriodPDCPBytes: 20,
+ PDCPBytesDL: 200000,
+ PDCPBytesUL: 120000,
+ MeasTimestampAvailPRB: "2020-03-18 02:23:18.220",
+ MeasPeriodAvailPRB: 20,
+ AvailPRBDL: 60,
+ AvailPRBUL: 80,
+ MeasTimestampRF: "2020-03-18 02:23:18.220",
+ MeasPeriodRF: 40,
+ RFMeasurements: data.RFMeasurement{RSRP: -140, RSRQ: -17, RSSINR: -6},
+ },
+ data.CellMeasurement{
+ CellID: "310-680-220-555002",
+ MeasTimestampPDCPBytes: "2020-03-18 02:23:18.220",
+ MeasPeriodPDCPBytes: 20,
+ PDCPBytesDL: 190000,
+ PDCPBytesUL: 100000,
+ MeasTimestampAvailPRB: "2020-03-18 02:23:18.220",
+ MeasPeriodAvailPRB: 20,
+ AvailPRBDL: 30,
+ AvailPRBUL: 45,
+ MeasTimestampRF: "2020-03-18 02:23:18.220",
+ MeasPeriodRF: 40,
+ RFMeasurements: data.RFMeasurement{RSRP: -115, RSRQ: -16, RSSINR: -5},
+ },
+ },
+ })
+
+ qpr, _ := json.Marshal(data.QoePredictionResult{
+ Predictions: [][]float32{{0.20054793, 0.2541615}, {0.19025849, 0.24147518}, {0.17816328, 0.2247211}},
+ })
+
+ server := createPostTestServer(http.StatusOK, qpr)
+ defer server.Close()
+
+ ctrl := gomock.NewController(t)
+ defer ctrl.Finish()
+ t.Setenv("RIC_MSG_BUF_CHAN_LEN", "256")
+ t.Setenv("MLXAPP_HEADERHOST", "qoe-model.kserve-test.example.com")
+ t.Setenv("MLXAPP_HOST", strings.Join(strings.Split(server.URL, ":")[:2], ":"))
+ t.Setenv("MLXAPP_PORT", strings.Split(server.URL, ":")[2])
+ t.Setenv("MLXAPP_REQURL", "v1/models/qoe-model:predict")
+
+ msg := &xapp.RMRParams{
+ Payload: pr,
+ }
+
+ control := NewControl()
+ control.RmrCommand = mocks_control.NewFakeRMRClient()
+
+ control.handleRequestQoEPrediction(msg)
+}
+
+func TestNegativeHandleQoERequestPrediction_WhenRequestUnmarshalFailed_ExpectReturn(t *testing.T) {
+ server := createPostTestServer(http.StatusOK, nil)
+ defer server.Close()
+
+ ctrl := gomock.NewController(t)
+ defer ctrl.Finish()
+
+ msg := &xapp.RMRParams{}
+
+ control := NewControl()
+ control.RmrCommand = mocks_control.NewFakeRMRClient()
+
+ control.handleRequestQoEPrediction(msg)
+}
+
+func TestNegativeHandleRequestQoEPrediction_WhenResponseStatusBadRequest_ExpectReturn(t *testing.T) {
+ server := createPostTestServer(http.StatusBadRequest, nil)
+
+ ctrl := gomock.NewController(t)
+ defer ctrl.Finish()
+ t.Setenv("RIC_MSG_BUF_CHAN_LEN", "256")
+ t.Setenv("MLXAPP_HEADERHOST", "qoe-model.kserve-test.example.com")
+ t.Setenv("MLXAPP_HOST", strings.Join(strings.Split(server.URL, ":")[:2], ":"))
+ t.Setenv("MLXAPP_PORT", strings.Split(server.URL, ":")[2])
+ t.Setenv("MLXAPP_REQURL", "v1/models/qoe-model:predict")
+
+ pr, _ := json.Marshal(data.QoePredictionRequest{
+ PredictionUE: "12345",
+ UEMeasurement: data.UEMeasurementType{
+ ServingCellID: "310-680-200-555002",
+ MeasTimestampUEPDCPBytes: "2020-03-18 02:23:18.220",
+ MeasPeriodUEPDCPBytes: 20,
+ UEPDCPBytesDL: 2500000,
+ UEPDCPBytesUL: 1000000,
+ MeasTimestampUEPRBUsage: "2020-03-18 02:23:18.220",
+ MeasPeriodUEPRBUsage: 20,
+ UEPRBUsageDL: 10,
+ UEPRBUsageUL: 30,
+ },
+ CellMeasurements: []data.CellMeasurement{
+ data.CellMeasurement{
+ CellID: "310-680-220-555001",
+ MeasTimestampPDCPBytes: "2020-03-18 02:23:18.220",
+ MeasPeriodPDCPBytes: 20,
+ PDCPBytesDL: 250000,
+ PDCPBytesUL: 100000,
+ MeasTimestampAvailPRB: "2020-03-18 02:23:18.220",
+ MeasPeriodAvailPRB: 20,
+ AvailPRBDL: 30,
+ AvailPRBUL: 50,
+ MeasTimestampRF: "2020-03-18 02:23:18.220",
+ MeasPeriodRF: 40,
+ RFMeasurements: data.RFMeasurement{RSRP: -90, RSRQ: -13, RSSINR: -2.5},
+ },
+ data.CellMeasurement{
+ CellID: "310-680-220-555003",
+ MeasTimestampPDCPBytes: "2020-03-18 02:23:18.220",
+ MeasPeriodPDCPBytes: 20,
+ PDCPBytesDL: 200000,
+ PDCPBytesUL: 120000,
+ MeasTimestampAvailPRB: "2020-03-18 02:23:18.220",
+ MeasPeriodAvailPRB: 20,
+ AvailPRBDL: 60,
+ AvailPRBUL: 80,
+ MeasTimestampRF: "2020-03-18 02:23:18.220",
+ MeasPeriodRF: 40,
+ RFMeasurements: data.RFMeasurement{RSRP: -140, RSRQ: -17, RSSINR: -6},
+ },
+ data.CellMeasurement{
+ CellID: "310-680-220-555002",
+ MeasTimestampPDCPBytes: "2020-03-18 02:23:18.220",
+ MeasPeriodPDCPBytes: 20,
+ PDCPBytesDL: 190000,
+ PDCPBytesUL: 100000,
+ MeasTimestampAvailPRB: "2020-03-18 02:23:18.220",
+ MeasPeriodAvailPRB: 20,
+ AvailPRBDL: 30,
+ AvailPRBUL: 45,
+ MeasTimestampRF: "2020-03-18 02:23:18.220",
+ MeasPeriodRF: 40,
+ RFMeasurements: data.RFMeasurement{RSRP: -115, RSRQ: -16, RSSINR: -5},
+ },
+ },
+ })
+
+ msg := &xapp.RMRParams{
+ Payload: pr,
+ }
+
+ control := NewControl()
+ control.RmrCommand = mocks_control.NewFakeRMRClient()
+
+ control.handleRequestQoEPrediction(msg)
+}
type PredictRequest struct {
UEPredictionSet []string `json:"UEPredictionSet"` // {"UEPredictionSet": ["Car-1"]}
}
+
+type QoePredictionResult struct {
+ Predictions [][]float32 `json:"predictions"`
+}
+
+type QoePredictionRequest struct {
+ PredictionUE string `json:"PredictionUE"`
+ UEMeasurement UEMeasurementType `json:"UEMeasurements"`
+ CellMeasurements []CellMeasurement `json:"CellMeasurements"`
+}
+
+type UEMeasurementType struct {
+ ServingCellID string `json:"ServingCellID"`
+ MeasTimestampUEPDCPBytes string `json:"MeasTimestampUEPDCPBytes"`
+ MeasPeriodUEPDCPBytes int64 `json:"MeasPeriodUEPDCPBytes"`
+ UEPDCPBytesDL int64 `json:"UEPDCPBytesDL"`
+ UEPDCPBytesUL int64 `json:"UEPDCPBytesUL"`
+ MeasTimestampUEPRBUsage string `json:"MeasTimestampUEPRBUsage"`
+ MeasPeriodUEPRBUsage int64 `json:"MeasPeriodUEPRBUsage"`
+ UEPRBUsageDL int64 `json:"UEPRBUsageDL"`
+ UEPRBUsageUL int64 `json:"UEPRBUsageUL"`
+}
+
+type CellMeasurement struct {
+ CellID string `json:"CellID"`
+ MeasTimestampPDCPBytes string `json:"MeasTimestampPDCPBytes"`
+ MeasPeriodPDCPBytes int64 `json:"MeasPeriodPDCPBytes"`
+ PDCPBytesDL int64 `json:"PDCPBytesDL"`
+ PDCPBytesUL int64 `json:"PDCPBytesUL"`
+ MeasTimestampAvailPRB string `json:"MeasTimestampAvailPRB"`
+ MeasPeriodAvailPRB int64 `json:"MeasPeriodAvailPRB"`
+ AvailPRBDL int64 `json:"AvailPRBDL"`
+ AvailPRBUL int64 `json:"AvailPRBUL"`
+ MeasTimestampRF string `json:"MeasTimestampRF"`
+ MeasPeriodRF int64 `json:"MeasPeriodRF"`
+ RFMeasurements RFMeasurement `json:"RFMeasurements"`
+}
+
+type RFMeasurement struct {
+ RSRP float32 `json:"RSRP"`
+ RSRQ float32 `json:"RSRQ"`
+ RSSINR float32 `json:"RSSINR"`
+}