From: minhac.lee Date: Wed, 14 Dec 2022 08:20:26 +0000 (+0900) Subject: - Added function to handle msg from qp-driver X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=0217b5adcaac41eb493a111c0a61007707d844ef;p=ric-app%2Fqp-aimlfw.git - Added function to handle msg from qp-driver - Added test code Issue-Id: AIMLFW-10 Signed-off-by: minhac.lee Change-Id: Ifb744cc005800879611c26e7645c0377a09c8227 --- diff --git a/control/control.go b/control/control.go index 08fa737..64b6025 100644 --- a/control/control.go +++ b/control/control.go @@ -49,6 +49,7 @@ const ( ) type RmrCommand interface { + Send(*xapp.RMRParams, bool) bool SendRts(*xapp.RMRParams) bool GetRicMessageName(int) string } @@ -56,6 +57,10 @@ type RmrCommand interface { type RmrWrapper struct { } +func (rw RmrWrapper) Send(msg *xapp.RMRParams, isRts bool) bool { + return xapp.Rmr.Send(msg, isRts) +} + func (rw RmrWrapper) SendRts(msg *xapp.RMRParams) bool { return xapp.Rmr.SendRts(msg) } @@ -134,6 +139,79 @@ func (c *Control) Run() { xapp.RunWithParams(c, waitForSdl) } +func (c *Control) sendRequestToMLxApp(jsonMsg []byte) []byte { + client := resty.New() + resp, err := client.R(). + SetHeader("Content-Type", "application/x-www-form-urlencoded"). + SetHeader("Host", c.mlxAppConfigs.HeaderHost). + EnableTrace(). + SetBody(jsonMsg). + Post(fmt.Sprintf("%s:%s/%s", c.mlxAppConfigs.Host, c.mlxAppConfigs.Port, c.mlxAppConfigs.ReqUrl)) + + if err != nil || resp == nil || resp.StatusCode() != http.StatusOK { + xapp.Logger.Error("failed to POST : err = %s, resp = %s, code = %d", err, resp, resp.StatusCode()) + return nil + } + + xapp.Logger.Info("Response from MLxApp : %s", resp) + return resp.Body() +} + +func (c *Control) handleRequestQoEPrediction(msg *xapp.RMRParams) { + var qoePredictionRequest data.QoePredictionRequest + err := json.Unmarshal(msg.Payload, &qoePredictionRequest) + if err != nil { + xapp.Logger.Error("filed to unmarshal msg : %s", err) + return + } + xapp.Logger.Info("Received Msg : %+v", qoePredictionRequest) + + var qoePredictionInput data.QoePredictionInput + qoePredictionInput.SignatureName = SIGNITURE_NAME + + for i := 0; i < len(qoePredictionRequest.CellMeasurements); i++ { + qoePredictionInput.Instances = append(qoePredictionInput.Instances, [][]float32{}) + qoePredictionInput.Instances[i] = append(qoePredictionInput.Instances[i], []float32{float32(qoePredictionRequest.CellMeasurements[i].PDCPBytesUL), float32(qoePredictionRequest.CellMeasurements[i].PDCPBytesDL)}) + } + xapp.Logger.Info("Qoe Prediction Request = %v", qoePredictionInput) + + jsonbytes, err := json.Marshal(qoePredictionInput) + if err != nil { + xapp.Logger.Error("fail to marshal : %s", err) + return + } + + response := c.sendRequestToMLxApp(jsonbytes) + if response == nil { + xapp.Logger.Error("fail to request prediction to MLxApp") + return + } + + var qoePredictionResult data.QoePredictionResult + err = json.Unmarshal(response, &qoePredictionResult) + if err != nil { + xapp.Logger.Error("filed to unmarshal msg : %s", err) + return + } + xapp.Logger.Debug("Unmarshaled response: %+v", qoePredictionResult) + + predictions := make(map[string][]float32) + for i := 0; i < len(qoePredictionRequest.CellMeasurements); i++ { + predictions[qoePredictionRequest.CellMeasurements[i].CellID] = []float32{qoePredictionResult.Predictions[i][0], qoePredictionResult.Predictions[i][1]} + } + predResultMsg := make(map[string]map[string][]float32) + predResultMsg[qoePredictionRequest.PredictionUE] = predictions + + jsonBytes, err := json.Marshal(predResultMsg) + if err != nil { + xapp.Logger.Error("filed to marshal msg : %s", err) + return + } + xapp.Logger.Info("QoE Prediction Result Msg : " + string(jsonBytes)) + + c.sendPredictionResult(msg, jsonBytes) +} + func (c *Control) handleRequestPrediction(msg *xapp.RMRParams) { var predictRequest data.PredictRequest err := json.Unmarshal(msg.Payload, &predictRequest) @@ -163,21 +241,13 @@ func (c *Control) handleRequestPrediction(msg *xapp.RMRParams) { return } - client := resty.New() - resp, err := client.R(). - SetHeader("Content-Type", "application/x-www-form-urlencoded"). - SetHeader("Host", c.mlxAppConfigs.HeaderHost). - EnableTrace(). - SetBody(jsonbytes). - Post(fmt.Sprintf("%s:%s/%s", c.mlxAppConfigs.Host, c.mlxAppConfigs.Port, c.mlxAppConfigs.ReqUrl)) - - if err != nil || resp == nil || resp.StatusCode() != http.StatusOK { - xapp.Logger.Error("failed to POST : err = %s, resp = %s, code = %d, sendmsg = %v", err, resp, resp.StatusCode(), qoePrectionInput) + response := c.sendRequestToMLxApp(jsonbytes) + if response == nil { + xapp.Logger.Error("fail to request prediction to MLxApp") return } - xapp.Logger.Info("Response from MLxApp : %s", resp) - c.sendPredictionResult(msg, resp.Body()) + c.sendPredictionResult(msg, response) } func (c *Control) makeRequestPredictionMsg(cellMetricsEntries []data.CellMetricsEntry) data.QoePredictionInput { @@ -198,6 +268,8 @@ func (c *Control) controlLoop() { switch msg.Mtype { case xapp.TS_UE_LIST: go c.handleRequestPrediction(msg) + case xapp.TS_QOE_PRED_REQ: + go c.handleRequestQoEPrediction(msg) default: xapp.Logger.Info("Unknown message type '%d', discarding", msg.Mtype) } @@ -208,7 +280,7 @@ func (c *Control) sendPredictionResult(msg *xapp.RMRParams, respBody []byte) { msg.Mtype = xapp.TS_QOE_PREDICTION msg.PayloadLen = len(respBody) msg.Payload = respBody - ret := c.SendRts(msg) + ret := c.Send(msg, false) xapp.Logger.Info("result of SendPredictionResult = %t", ret) } diff --git a/control/control_test.go b/control/control_test.go index 1e60428..c114976 100644 --- a/control/control_test.go +++ b/control/control_test.go @@ -17,11 +17,13 @@ import ( "github.com/stretchr/testify/assert" ) -func createPostTestServer() *httptest.Server { +func createPostTestServer(expectedHttpStatus int, expectedBody []byte) *httptest.Server { return httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { if req.Method == http.MethodPost { if req.URL.Path == "/v1/models/qoe-model:predict" { rw.Header().Set("Content-Type", "application/x-www-form-urlencoded") + rw.WriteHeader(expectedHttpStatus) + rw.Write(expectedBody) return } } @@ -41,7 +43,7 @@ func TestNewControl_ExpectSuccess(t *testing.T) { } func TestHandleRequestPrediction_ExpectSuccess(t *testing.T) { - server := createPostTestServer() + server := createPostTestServer(http.StatusOK, nil) defer server.Close() ctrl := gomock.NewController(t) @@ -91,7 +93,7 @@ func TestHandleRequestPrediction_ExpectSuccess(t *testing.T) { } func TestNegativeHandleRequestPrediction_WhenRequestUnmarshalFailed_ExpectReturn(t *testing.T) { - server := createPostTestServer() + server := createPostTestServer(http.StatusOK, nil) defer server.Close() ctrl := gomock.NewController(t) @@ -150,15 +152,7 @@ func TestNegativeHandleRequestPrediction_WhenInfluxQueryResultEmpty_ExpectReturn } func TestNegativeHandleRequestPrediction_WhenResponseStatusBadRequest_ExpectReturn(t *testing.T) { - server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { - if req.Method == http.MethodPost { - if req.URL.Path == "/v1/models/qoe-model:predict" { - rw.Header().Set("Content-Type", "application/json; charset=utf-8") - rw.WriteHeader(http.StatusBadRequest) - return - } - } - })) + server := createPostTestServer(http.StatusBadRequest, nil) defer server.Close() ctrl := gomock.NewController(t) @@ -207,3 +201,183 @@ func TestConsume_ExpectSuccess(t *testing.T) { assert.NotNil(t, ret) } + +func TestHandleRequestQoEPrediction_ExpectSuccess(t *testing.T) { + pr, _ := json.Marshal(data.QoePredictionRequest{ + PredictionUE: "12345", + UEMeasurement: data.UEMeasurementType{ + ServingCellID: "310-680-200-555002", + MeasTimestampUEPDCPBytes: "2020-03-18 02:23:18.220", + MeasPeriodUEPDCPBytes: 20, + UEPDCPBytesDL: 2500000, + UEPDCPBytesUL: 1000000, + MeasTimestampUEPRBUsage: "2020-03-18 02:23:18.220", + MeasPeriodUEPRBUsage: 20, + UEPRBUsageDL: 10, + UEPRBUsageUL: 30, + }, + CellMeasurements: []data.CellMeasurement{ + data.CellMeasurement{ + CellID: "310-680-220-555001", + MeasTimestampPDCPBytes: "2020-03-18 02:23:18.220", + MeasPeriodPDCPBytes: 20, + PDCPBytesDL: 250000, + PDCPBytesUL: 100000, + MeasTimestampAvailPRB: "2020-03-18 02:23:18.220", + MeasPeriodAvailPRB: 20, + AvailPRBDL: 30, + AvailPRBUL: 50, + MeasTimestampRF: "2020-03-18 02:23:18.220", + MeasPeriodRF: 40, + RFMeasurements: data.RFMeasurement{RSRP: -90, RSRQ: -13, RSSINR: -2.5}, + }, + data.CellMeasurement{ + CellID: "310-680-220-555003", + MeasTimestampPDCPBytes: "2020-03-18 02:23:18.220", + MeasPeriodPDCPBytes: 20, + PDCPBytesDL: 200000, + PDCPBytesUL: 120000, + MeasTimestampAvailPRB: "2020-03-18 02:23:18.220", + MeasPeriodAvailPRB: 20, + AvailPRBDL: 60, + AvailPRBUL: 80, + MeasTimestampRF: "2020-03-18 02:23:18.220", + MeasPeriodRF: 40, + RFMeasurements: data.RFMeasurement{RSRP: -140, RSRQ: -17, RSSINR: -6}, + }, + data.CellMeasurement{ + CellID: "310-680-220-555002", + MeasTimestampPDCPBytes: "2020-03-18 02:23:18.220", + MeasPeriodPDCPBytes: 20, + PDCPBytesDL: 190000, + PDCPBytesUL: 100000, + MeasTimestampAvailPRB: "2020-03-18 02:23:18.220", + MeasPeriodAvailPRB: 20, + AvailPRBDL: 30, + AvailPRBUL: 45, + MeasTimestampRF: "2020-03-18 02:23:18.220", + MeasPeriodRF: 40, + RFMeasurements: data.RFMeasurement{RSRP: -115, RSRQ: -16, RSSINR: -5}, + }, + }, + }) + + qpr, _ := json.Marshal(data.QoePredictionResult{ + Predictions: [][]float32{{0.20054793, 0.2541615}, {0.19025849, 0.24147518}, {0.17816328, 0.2247211}}, + }) + + server := createPostTestServer(http.StatusOK, qpr) + defer server.Close() + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + t.Setenv("RIC_MSG_BUF_CHAN_LEN", "256") + t.Setenv("MLXAPP_HEADERHOST", "qoe-model.kserve-test.example.com") + t.Setenv("MLXAPP_HOST", strings.Join(strings.Split(server.URL, ":")[:2], ":")) + t.Setenv("MLXAPP_PORT", strings.Split(server.URL, ":")[2]) + t.Setenv("MLXAPP_REQURL", "v1/models/qoe-model:predict") + + msg := &xapp.RMRParams{ + Payload: pr, + } + + control := NewControl() + control.RmrCommand = mocks_control.NewFakeRMRClient() + + control.handleRequestQoEPrediction(msg) +} + +func TestNegativeHandleQoERequestPrediction_WhenRequestUnmarshalFailed_ExpectReturn(t *testing.T) { + server := createPostTestServer(http.StatusOK, nil) + defer server.Close() + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + msg := &xapp.RMRParams{} + + control := NewControl() + control.RmrCommand = mocks_control.NewFakeRMRClient() + + control.handleRequestQoEPrediction(msg) +} + +func TestNegativeHandleRequestQoEPrediction_WhenResponseStatusBadRequest_ExpectReturn(t *testing.T) { + server := createPostTestServer(http.StatusBadRequest, nil) + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + t.Setenv("RIC_MSG_BUF_CHAN_LEN", "256") + t.Setenv("MLXAPP_HEADERHOST", "qoe-model.kserve-test.example.com") + t.Setenv("MLXAPP_HOST", strings.Join(strings.Split(server.URL, ":")[:2], ":")) + t.Setenv("MLXAPP_PORT", strings.Split(server.URL, ":")[2]) + t.Setenv("MLXAPP_REQURL", "v1/models/qoe-model:predict") + + pr, _ := json.Marshal(data.QoePredictionRequest{ + PredictionUE: "12345", + UEMeasurement: data.UEMeasurementType{ + ServingCellID: "310-680-200-555002", + MeasTimestampUEPDCPBytes: "2020-03-18 02:23:18.220", + MeasPeriodUEPDCPBytes: 20, + UEPDCPBytesDL: 2500000, + UEPDCPBytesUL: 1000000, + MeasTimestampUEPRBUsage: "2020-03-18 02:23:18.220", + MeasPeriodUEPRBUsage: 20, + UEPRBUsageDL: 10, + UEPRBUsageUL: 30, + }, + CellMeasurements: []data.CellMeasurement{ + data.CellMeasurement{ + CellID: "310-680-220-555001", + MeasTimestampPDCPBytes: "2020-03-18 02:23:18.220", + MeasPeriodPDCPBytes: 20, + PDCPBytesDL: 250000, + PDCPBytesUL: 100000, + MeasTimestampAvailPRB: "2020-03-18 02:23:18.220", + MeasPeriodAvailPRB: 20, + AvailPRBDL: 30, + AvailPRBUL: 50, + MeasTimestampRF: "2020-03-18 02:23:18.220", + MeasPeriodRF: 40, + RFMeasurements: data.RFMeasurement{RSRP: -90, RSRQ: -13, RSSINR: -2.5}, + }, + data.CellMeasurement{ + CellID: "310-680-220-555003", + MeasTimestampPDCPBytes: "2020-03-18 02:23:18.220", + MeasPeriodPDCPBytes: 20, + PDCPBytesDL: 200000, + PDCPBytesUL: 120000, + MeasTimestampAvailPRB: "2020-03-18 02:23:18.220", + MeasPeriodAvailPRB: 20, + AvailPRBDL: 60, + AvailPRBUL: 80, + MeasTimestampRF: "2020-03-18 02:23:18.220", + MeasPeriodRF: 40, + RFMeasurements: data.RFMeasurement{RSRP: -140, RSRQ: -17, RSSINR: -6}, + }, + data.CellMeasurement{ + CellID: "310-680-220-555002", + MeasTimestampPDCPBytes: "2020-03-18 02:23:18.220", + MeasPeriodPDCPBytes: 20, + PDCPBytesDL: 190000, + PDCPBytesUL: 100000, + MeasTimestampAvailPRB: "2020-03-18 02:23:18.220", + MeasPeriodAvailPRB: 20, + AvailPRBDL: 30, + AvailPRBUL: 45, + MeasTimestampRF: "2020-03-18 02:23:18.220", + MeasPeriodRF: 40, + RFMeasurements: data.RFMeasurement{RSRP: -115, RSRQ: -16, RSSINR: -5}, + }, + }, + }) + + msg := &xapp.RMRParams{ + Payload: pr, + } + + control := NewControl() + control.RmrCommand = mocks_control.NewFakeRMRClient() + + control.handleRequestQoEPrediction(msg) +} diff --git a/control/mocks/fake_rmr_client.go b/control/mocks/fake_rmr_client.go index 225ac6f..81e09ec 100644 --- a/control/mocks/fake_rmr_client.go +++ b/control/mocks/fake_rmr_client.go @@ -15,3 +15,7 @@ func (frw FakeRMRClient) SendRts(msg *xapp.RMRParams) bool { func (rw FakeRMRClient) GetRicMessageName(id int) string { return "ricMessageName" } + +func (frw FakeRMRClient) Send(msg *xapp.RMRParams, isRts bool) bool { + return true +} diff --git a/data/types.go b/data/types.go index f511912..f591d66 100644 --- a/data/types.go +++ b/data/types.go @@ -44,3 +44,46 @@ type QoePredictionInput struct { type PredictRequest struct { UEPredictionSet []string `json:"UEPredictionSet"` // {"UEPredictionSet": ["Car-1"]} } + +type QoePredictionResult struct { + Predictions [][]float32 `json:"predictions"` +} + +type QoePredictionRequest struct { + PredictionUE string `json:"PredictionUE"` + UEMeasurement UEMeasurementType `json:"UEMeasurements"` + CellMeasurements []CellMeasurement `json:"CellMeasurements"` +} + +type UEMeasurementType struct { + ServingCellID string `json:"ServingCellID"` + MeasTimestampUEPDCPBytes string `json:"MeasTimestampUEPDCPBytes"` + MeasPeriodUEPDCPBytes int64 `json:"MeasPeriodUEPDCPBytes"` + UEPDCPBytesDL int64 `json:"UEPDCPBytesDL"` + UEPDCPBytesUL int64 `json:"UEPDCPBytesUL"` + MeasTimestampUEPRBUsage string `json:"MeasTimestampUEPRBUsage"` + MeasPeriodUEPRBUsage int64 `json:"MeasPeriodUEPRBUsage"` + UEPRBUsageDL int64 `json:"UEPRBUsageDL"` + UEPRBUsageUL int64 `json:"UEPRBUsageUL"` +} + +type CellMeasurement struct { + CellID string `json:"CellID"` + MeasTimestampPDCPBytes string `json:"MeasTimestampPDCPBytes"` + MeasPeriodPDCPBytes int64 `json:"MeasPeriodPDCPBytes"` + PDCPBytesDL int64 `json:"PDCPBytesDL"` + PDCPBytesUL int64 `json:"PDCPBytesUL"` + MeasTimestampAvailPRB string `json:"MeasTimestampAvailPRB"` + MeasPeriodAvailPRB int64 `json:"MeasPeriodAvailPRB"` + AvailPRBDL int64 `json:"AvailPRBDL"` + AvailPRBUL int64 `json:"AvailPRBUL"` + MeasTimestampRF string `json:"MeasTimestampRF"` + MeasPeriodRF int64 `json:"MeasPeriodRF"` + RFMeasurements RFMeasurement `json:"RFMeasurements"` +} + +type RFMeasurement struct { + RSRP float32 `json:"RSRP"` + RSRQ float32 `json:"RSRQ"` + RSSINR float32 `json:"RSSINR"` +}