## Development
-To make it easy to test during development of the consumer, two stubs are provided in the `stub` folder.
+To make it easy to test during development of the consumer, there is a stub provided in the `stub` folder.
-One, under the `mrstub` folder, called `mrstub` that stubs the VES message received from Dmaap and pushes messages with information about performance measurements for the slices in a determinated DU. To build and start the stub, do the following:
->1. cd stub/producer
->2. go build
->3. ./mrstub
+This stub is used to simulate both received VES messages from Dmaap MR with information about performance measurements for the slices in a determinated DU and also SDNR, that sends information about Radio Resource Management Policy Ratio and allows to modify value for RRM Policy Dedicated Ratio from default to higher value.
-One, under the `sdnr` folder, called `sdnr` that at startup will listen for REST calls and print the body of them. By default, it listens to the port `3904`, but his can be overridden by passing a `-port [PORT]` flag when starting the stub. To build and start the stub, do the following:
->1. cd stub/sdnr
->2. go build
->3. ./sdnr
+By default, SDNR stub listens to the port `3904`, but his can be overridden by passing a `--sdnr-port [PORT]` flag when starting the stub. For Dmaap MR stub default port is `3905` but it can be overriden by passing a `--dmaap-port [PORT]` flag when starting the stub.
+To build and start the stub, do the following:
+
+>1. cd stub
+>2. go build
+>3. ./stub [--sdnr-port <portNo>] [--dmaap-port <portNo>]
## License
SDNRAddress: getEnv("SDNR_ADDR", "http://localhost:3904"),
SDNRUser: getEnv("SDNR_USER", "admin"),
SDNPassword: getEnv("SDNR_PASSWORD", "Kp8bJ4SXszM0WXlhak3eHlcse2gAw84vaoGGmJvUy2U"),
- Polltime: getEnvAsInt("Polltime", 10),
+ Polltime: getEnvAsInt("Polltime", 30),
LogLevel: getLogLevel(),
}
}
"fmt"
"io"
"net/http"
+
+ log "github.com/sirupsen/logrus"
)
type Client struct {
}
func (c *Client) Post(path string, payload interface{}, v interface{}) error {
+
+ s, _ := json.MarshalIndent(payload, "", "\t")
+ log.Debugf("Post request payload: " + string(s))
+
req, err := c.newRequest(http.MethodPost, path, payload)
if err != nil {
return fmt.Errorf("failed to create POST request: %w", err)
if reqBody != nil {
req.Header.Set("Content-Type", "application/json; charset=utf-8")
}
- fmt.Printf("Http Client Request: [%s:%s]\n", req.Method, req.URL)
+ log.Debugf("Http Client Request: [%s:%s]\n", req.Method, req.URL)
return req, nil
}
if err := dec.Decode(v); err != nil {
return fmt.Errorf("could not parse response body: %w [%s:%s]", err, r.Method, r.URL.String())
}
- fmt.Printf("Http Client Response: %+v\n", v)
+ log.Debugf("Http Client Response: %v\n", v)
return nil
}
package sliceassurance
import (
- "fmt"
"net/http"
- "strconv"
"time"
"oransc.org/usecase/oduclosedloop/internal/restclient"
"oransc.org/usecase/oduclosedloop/internal/structures"
"oransc.org/usecase/oduclosedloop/messages"
+
+ log "github.com/sirupsen/logrus"
)
const (
)
type App struct {
- Client restclient.HTTPClient
- MetricsPolicies *structures.SliceAssuranceMeas
+ client restclient.HTTPClient
+ metricsPolicies *structures.SliceAssuranceMeas
}
var dmaapMRUrl string
-var SDNRUrl string
+var sDNRUrl string
func (a *App) Initialize(dmaapUrl string, sdnrUrl string) {
dmaapMRUrl = dmaapUrl
- SDNRUrl = sdnrUrl
+ sDNRUrl = sdnrUrl
- a.Client = restclient.New(&http.Client{})
- a.MetricsPolicies = structures.NewSliceAssuranceMeas()
+ a.client = restclient.New(&http.Client{})
+ a.metricsPolicies = structures.NewSliceAssuranceMeas()
}
func (a *App) Run(topic string, pollTime int) {
for {
- fmt.Printf("Polling new messages from DmaapMR\n")
- var stdMessage messages.StdDefinedMessage
-
- a.Client.Get(dmaapMRUrl+topic, &stdMessage)
-
- a.processMessages(stdMessage)
+ a.getMessagesFromDmaap(dmaapMRUrl + topic)
- exceedsThMetrics := a.checkIfThresholdIsExceed()
- if len(exceedsThMetrics) > 0 {
- a.updateDedicatedRatio(exceedsThMetrics)
+ for key := range a.metricsPolicies.Metrics {
+ a.getRRMInformation(key.Duid)
}
+ a.updateDedicatedRatio()
time.Sleep(time.Second * time.Duration(pollTime))
}
}
-func (a *App) processMessages(stdMessage messages.StdDefinedMessage) {
+func (a *App) getMessagesFromDmaap(url string) {
+ var stdMessage messages.StdDefinedMessage
+ a.client.Get(url, &stdMessage)
+ log.Infof("Polling new messages from DmaapMR: %v", stdMessage)
for _, meas := range stdMessage.GetMeasurements() {
-
- fmt.Printf("New measurement: %+v\n", meas)
//Create sliceMetric and check if metric exist and update existing one or create new one
- tmpSm := meas.CreateSliceMetric()
- a.MetricsPolicies.AddOrUpdateMetric(tmpSm)
-
- //Fetch policy ratio metrics from SDNR
- var duRRMPolicyRatio messages.ORanDuRestConf
- a.Client.Get(getUrlForDistributedUnitFunctions(SDNRUrl, tmpSm.DUId), &duRRMPolicyRatio)
-
- //Get DuId and check if we have metrics for it
- policyRatioDuId := duRRMPolicyRatio.DistributedUnitFunction.Id
- policies := duRRMPolicyRatio.DistributedUnitFunction.RRMPolicyRatio
- for _, policy := range policies {
- members:
- for _, member := range policy.RRMPolicyMembers {
- metric := a.MetricsPolicies.GetSliceMetric(policyRatioDuId, member.SliceDifferentiator, member.SliceServiceType)
- if metric != nil {
- a.MetricsPolicies.AddNewPolicy(addOrUpdatePolicyRatio(metric, policy))
- break members
- }
- }
+ if _, err := a.metricsPolicies.AddOrUpdateMetric(meas); err != nil {
+ log.Error("Metric could not be added ", err)
}
}
}
-func (a *App) checkIfThresholdIsExceed() []*structures.SliceMetric {
- exceedsThMetrics := make([]*structures.SliceMetric, 0)
- for _, metric := range a.MetricsPolicies.Metrics {
- for key, value := range metric.PM {
+func (a *App) getRRMInformation(duid string) {
+ var duRRMPolicyRatio messages.ORanDuRestConf
+ a.client.Get(getUrlForDistributedUnitFunctions(sDNRUrl, duid), &duRRMPolicyRatio)
- if (value) > THRESHOLD_TPUT {
- fmt.Printf("PM: [%v, %v] exceeds threshold value!\n", key, value)
- exceedsThMetrics = append(exceedsThMetrics, metric)
- }
- }
+ policies := duRRMPolicyRatio.DistributedUnitFunction.RRMPolicyRatio
+ for _, policy := range policies {
+ a.metricsPolicies.AddNewPolicy(duid, policy)
}
- return exceedsThMetrics
}
-func (a *App) updateDedicatedRatio(exceedsThMetrics []*structures.SliceMetric) {
- for _, m := range exceedsThMetrics {
- //Check if RRMPolicyDedicatedRatio is higher than default value
- policy := a.MetricsPolicies.Policies[m.RRMPolicyRatioId]
+func (a *App) updateDedicatedRatio() {
- if policy != nil && policy.PolicyDedicatedRatio <= DEFAULT_DEDICATED_RATIO {
- //Send PostRequest to update DedicatedRatio
- url := getUrlUpdatePolicyDedicatedRatio(SDNRUrl, m.DUId, policy.PolicyRatioId)
- a.Client.Post(url, messages.GetDedicatedRatioUpdateMessage(*m, *policy, NEW_DEDICATED_RATIO), nil)
+ for _, metric := range a.metricsPolicies.Metrics {
+ policy, check := a.metricsPolicies.Policies[metric.RRMPolicyRatioId]
+ //TODO What happened if dedicated ratio is already higher that default and threshold is exceed?
+ if check && policy.PolicyDedicatedRatio <= DEFAULT_DEDICATED_RATIO {
+ log.Infof("Send Post Request to update DedicatedRatio for DU id: %v Policy id: %v", metric.DUId, policy.PolicyRatioId)
+ url := getUrlUpdatePolicyDedicatedRatio(sDNRUrl, metric.DUId, policy.PolicyRatioId)
+ a.client.Post(url, policy.GetUpdateDedicatedRatioMessage(metric.SliceDiff, metric.SliceServiceType, NEW_DEDICATED_RATIO), nil)
}
}
}
-func addOrUpdatePolicyRatio(metric *structures.SliceMetric, policy messages.RRMPolicyRatio) *structures.PolicyRatio {
- if metric.RRMPolicyRatioId == "" {
- metric.RRMPolicyRatioId = policy.Id
- }
- return &structures.PolicyRatio{
- PolicyRatioId: policy.Id,
- PolicyMaxRatio: policy.RRMPolicyMaxRatio,
- PolicyMinRatio: policy.RRMPolicyMinRatio,
- PolicyDedicatedRatio: toInt(policy.RRMPolicyDedicatedRatio),
- }
-}
-
-func toInt(num string) int {
- res, err := strconv.Atoi(num)
- if err != nil {
- return -1
- }
- return res
-}
-
func getUrlForDistributedUnitFunctions(host string, duid string) string {
return host + "/rests/data/network-topology:network-topology/topology=topology-netconf/node=" + NODE_ID + "/yang-ext:mount/o-ran-sc-du-hello-world:network-function/distributed-unit-functions=" + duid
}
package structures
+import "oransc.org/usecase/oduclosedloop/messages"
+
type SliceMetric struct {
DUId string
CellId string
type PolicyRatio struct {
PolicyRatioId string
PolicyMaxRatio int
- PolicyMinRatio string
+ PolicyMinRatio int
PolicyDedicatedRatio int
}
-func NewPolicyRatioEntry(id string, max_ratio int, min_ratio string, ded_ratio int) *PolicyRatio {
+func NewPolicyRatio(id string, max_ratio int, min_ratio int, ded_ratio int) *PolicyRatio {
pr := PolicyRatio{
PolicyRatioId: id,
PolicyMaxRatio: max_ratio,
}
return &pr
}
+
+func (pr *PolicyRatio) GetUpdateDedicatedRatioMessage(sd int, sst int, dedicatedRatio int) []messages.RRMPolicyRatio {
+ message := messages.RRMPolicyRatio{
+ Id: pr.PolicyRatioId,
+ AdmState: "Locked",
+ UserLabel: "Some user label",
+ RRMPolicyMaxRatio: pr.PolicyMaxRatio,
+ RRMPolicyMinRatio: pr.PolicyMinRatio,
+ RRMPolicyDedicatedRatio: dedicatedRatio,
+ ResourceType: "prb",
+ RRMPolicyMembers: []messages.RRMPolicyMember{
+ {
+ MobileCountryCode: "046",
+ MobileNetworkCode: "651",
+ SliceDifferentiator: sd,
+ SliceServiceType: sst,
+ },
+ },
+ }
+ return []messages.RRMPolicyRatio{message}
+}
package structures
+import (
+ "fmt"
+ "regexp"
+ "strconv"
+
+ "oransc.org/usecase/oduclosedloop/messages"
+)
+
+type MapKey struct {
+ Duid string
+ sd int
+ sst int
+}
+
type SliceAssuranceMeas struct {
- Metrics []*SliceMetric
+ Metrics map[MapKey]*SliceMetric
Policies map[string]*PolicyRatio
}
func NewSliceAssuranceMeas() *SliceAssuranceMeas {
s := SliceAssuranceMeas{}
- s.Metrics = make([]*SliceMetric, 0)
+ s.Metrics = make(map[MapKey]*SliceMetric)
s.Policies = make(map[string]*PolicyRatio)
return &s
}
-func (sa *SliceAssuranceMeas) AddNewPolicy(pr *PolicyRatio) {
- sa.Policies[pr.PolicyRatioId] = pr
+func (sa *SliceAssuranceMeas) AddNewPolicy(duid string, rrmPolicyRatio messages.RRMPolicyRatio) {
+ for _, policyMember := range rrmPolicyRatio.RRMPolicyMembers {
+ metric := sa.GetSliceMetric(duid, policyMember.SliceDifferentiator, policyMember.SliceServiceType)
+ if metric != nil {
+ pr := NewPolicyRatio(rrmPolicyRatio.Id, rrmPolicyRatio.RRMPolicyMaxRatio, rrmPolicyRatio.RRMPolicyMinRatio, rrmPolicyRatio.RRMPolicyDedicatedRatio)
+ sa.Policies[pr.PolicyRatioId] = pr
+ metric.RRMPolicyRatioId = rrmPolicyRatio.Id
+ }
+ }
}
func (sa *SliceAssuranceMeas) GetSliceMetric(duid string, sd int, sst int) *SliceMetric {
- for _, metric := range sa.Metrics {
- if metric.DUId == duid && metric.SliceDiff == sd && metric.SliceServiceType == sst {
- return metric
- }
+ key := MapKey{duid, sd, sst}
+ value, check := sa.Metrics[key]
+
+ if check {
+ return value
}
+
return nil
}
-func (sa *SliceAssuranceMeas) AddOrUpdateMetric(sm *SliceMetric) {
- metric := sa.GetSliceMetric(sm.DUId, sm.SliceDiff, sm.SliceServiceType)
- if metric != nil {
- for key, value := range sm.PM {
- metric.PM[key] = value
+func (sa *SliceAssuranceMeas) AddOrUpdateMetric(meas messages.Measurement) (string, error) {
+
+ var duid string
+ var sd, sst int
+
+ regex := *regexp.MustCompile(`\/network-function\/distributed-unit-functions\[id=\'(.*)\'\]/cell\[id=\'(.*)\'\]/supported-measurements\/performance-measurement-type\[\.=\'(.*)\'\]\/supported-snssai-subcounter-instances\/slice-differentiator\[\.=(\d)\]\[slice-service-type=(\d+)\]`)
+ res := regex.FindAllStringSubmatch(meas.MeasurementTypeInstanceReference, -1)
+
+ if res != nil && len(res[0]) == 6 {
+ duid = res[0][1]
+ sd = toInt(res[0][4])
+ sst = toInt(res[0][5])
+
+ key := MapKey{duid, sd, sst}
+ value, check := sa.Metrics[key]
+
+ if check {
+ sa.updateMetric(key, value, res[0][3], meas.Value)
+ } else {
+ // Only add new one if value exceeds threshold
+ sa.addMetric(res, meas.Value)
}
} else {
- sa.Metrics = append(sa.Metrics, sm)
+ return duid, fmt.Errorf(" wrong format for MeasurementTypeInstanceReference")
+ }
+ return duid, nil
+}
+
+func (sa *SliceAssuranceMeas) addMetric(res [][]string, metricValue int) {
+ if metricValue > 700 {
+ metric := NewSliceMetric(res[0][1], res[0][2], toInt(res[0][4]), toInt(res[0][5]))
+ metric.PM[res[0][3]] = metricValue
+ key := MapKey{res[0][1], toInt(res[0][4]), toInt(res[0][5])}
+ sa.Metrics[key] = metric
+ }
+}
+
+func (sa *SliceAssuranceMeas) updateMetric(key MapKey, value *SliceMetric, metricName string, metricValue int) {
+ if metricValue < 700 {
+ delete(sa.Metrics, key)
+ } else {
+ value.PM[metricName] = metricValue
+ }
+}
+
+func toInt(num string) int {
+ res, err := strconv.Atoi(num)
+ if err != nil {
+ return -1
+ }
+ return res
+}
+
+func (sa *SliceAssuranceMeas) PrintStructures() {
+ fmt.Printf("SliceAssurance Metrics: \n")
+ for key, metric := range sa.Metrics {
+ fmt.Printf("Key: %+v\n", key)
+ fmt.Printf("Metric: %+v\n", metric)
+ }
+ fmt.Printf("SliceAssurance Policies: \n")
+ for key, metric := range sa.Policies {
+ fmt.Printf("Key: %+v\n", key)
+ fmt.Printf("Metric: %+v\n", metric)
}
}
configuration = config.New()
log.SetLevel(configuration.LogLevel)
+ log.SetFormatter(&log.JSONFormatter{})
+
log.Debug("Using configuration: ", configuration)
dmaapUrl := configuration.MRHost + ":" + configuration.MRPort
package messages
-import (
- "strconv"
-
- "oransc.org/usecase/oduclosedloop/internal/structures"
-)
-
type ORanDuRestConf struct {
DistributedUnitFunction DistributedUnitFunction `json:"distributed-unit-functions"`
}
type DistributedUnitFunction struct {
Id string `json:"id"`
- Cell []Cell `json:"cell"`
RRMPolicyRatio []RRMPolicyRatio `json:"radio-resource-management-policy-ratio"`
}
-type Cell struct {
- Id string `json:"id"`
- AdmState string `json:"administrative-state"`
- OpState string `json:"operational-state"`
- UserLabel string `json:"user-label"`
-}
-
type RRMPolicyRatio struct {
Id string `json:"id"`
AdmState string `json:"administrative-state"`
UserLabel string `json:"user-label"`
RRMPolicyMaxRatio int `json:"radio-resource-management-policy-max-ratio"`
- RRMPolicyMinRatio string `json:"radio-resource-management-policy-min-ratio"`
- RRMPolicyDedicatedRatio string `json:"radio-resource-management-policy-dedicated-ratio"`
+ RRMPolicyMinRatio int `json:"radio-resource-management-policy-min-ratio"`
+ RRMPolicyDedicatedRatio int `json:"radio-resource-management-policy-dedicated-ratio"`
ResourceType string `json:"resource-type"`
RRMPolicyMembers []RRMPolicyMember `json:"radio-resource-management-policy-members"`
}
SliceDifferentiator int `json:"slice-differentiator"`
SliceServiceType int `json:"slice-service-type"`
}
-
-func GetDedicatedRatioUpdateMessage(metric structures.SliceMetric, policy structures.PolicyRatio, dedicatedRatio int) RRMPolicyRatio {
- return RRMPolicyRatio{
- Id: policy.PolicyRatioId,
- AdmState: "Locked",
- UserLabel: "Some user label",
- RRMPolicyMaxRatio: policy.PolicyMaxRatio,
- RRMPolicyMinRatio: policy.PolicyMinRatio,
- RRMPolicyDedicatedRatio: strconv.Itoa(dedicatedRatio),
- ResourceType: "prb",
- RRMPolicyMembers: []RRMPolicyMember{
- {
- MobileCountryCode: "046",
- MobileNetworkCode: "651",
- SliceDifferentiator: metric.SliceDiff,
- SliceServiceType: metric.SliceServiceType,
- },
- },
- }
-}
package messages
-import (
- "fmt"
- "regexp"
- "strconv"
- "strings"
-
- "oransc.org/usecase/oduclosedloop/internal/structures"
-)
-
type StdDefinedMessage struct {
Event Event `json:"event"`
}
func (message StdDefinedMessage) GetMeasurements() []Measurement {
return message.Event.StndDefinedFields.Data.Measurements
}
-
-func (meas Measurement) CreateSliceMetric() *structures.SliceMetric {
- var pmName string
- var duid, cellid string
- var sd, sst int
-
- typeParts := strings.Split(meas.MeasurementTypeInstanceReference, "/")
- for _, part := range typeParts {
- if strings.Contains(part, "distributed-unit-functions") {
- duid = getValueInsideQuotes(part)
-
- } else if strings.Contains(part, "cell[") {
- cellid = getValueInsideQuotes(part)
-
- } else if strings.Contains(part, "performance-measurement-type") {
- pmName = getValueInsideQuotes(part)
-
- } else if strings.Contains(part, "slice-differentiator") {
- sd = getPropertyNumber(part)
-
- } else if strings.Contains(part, "slice-differentiator") {
- res, err := strconv.Atoi(getValueInsideQuotes(part))
- if err != nil {
- sst = -1
- }
- sst = res
- }
- }
-
- sm := structures.NewSliceMetric(duid, cellid, sd, sst)
- sm.PM[pmName] = meas.Value
- return sm
-}
-
-func getValueInsideQuotes(text string) string {
- re := regexp.MustCompile(`\'(.*?)\'`)
-
- match := re.FindAllString(text, -1)
- var res string
- if len(match) == 1 {
- res = strings.Trim(match[0], "'")
- }
- return res
-}
-
-func getPropertyNumber(text string) int {
- re := regexp.MustCompile("[0-9]+")
- match := re.FindAllString(text, -1)
- var res int
- var err error
- if len(match) == 1 {
- res, err = strconv.Atoi(match[0])
- if err != nil {
- fmt.Println(err)
- return -1
- }
- }
- return res
-}
--- /dev/null
+// -
+// ========================LICENSE_START=================================
+// O-RAN-SC
+// %%
+// Copyright (C) 2021: Nordix Foundation
+// %%
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ========================LICENSE_END===================================
+//
+
+package main
+
+import (
+ "encoding/csv"
+ "encoding/json"
+ "flag"
+ "fmt"
+ "math/rand"
+ "net/http"
+ "os"
+ "strconv"
+ "sync"
+ "time"
+
+ "github.com/gorilla/mux"
+ "oransc.org/usecase/oduclosedloop/messages"
+
+ log "github.com/sirupsen/logrus"
+)
+
+const THRESHOLD_TPUT int = 700
+
+type SliceAssuranceInformation struct {
+ duId string
+ cellId string
+ sd int
+ sst int
+ metricName string
+ metricValue int
+ policyRatioId string
+ policyMaxRatio int
+ policyMinRatio int
+ policyDedicatedRatio int
+}
+
+var data []*SliceAssuranceInformation
+var messagesToSend []messages.Measurement
+
+func loadData() {
+ lines, err := GetCsvFromFile("test-data.csv")
+ if err != nil {
+ panic(err)
+ }
+ for _, line := range lines {
+ sai := SliceAssuranceInformation{
+ duId: line[0],
+ cellId: line[1],
+ sd: toInt(line[2]),
+ sst: toInt(line[3]),
+ metricName: line[4],
+ metricValue: toInt(line[5]),
+ policyRatioId: line[6],
+ policyMaxRatio: toInt(line[7]),
+ policyMinRatio: toInt(line[8]),
+ policyDedicatedRatio: toInt(line[9]),
+ }
+ data = append(data, &sai)
+ }
+}
+
+func GetCsvFromFile(name string) ([][]string, error) {
+ if csvFile, err := os.Open(name); err == nil {
+ defer csvFile.Close()
+ reader := csv.NewReader(csvFile)
+ reader.FieldsPerRecord = -1
+ if csvData, err := reader.ReadAll(); err == nil {
+ return csvData, nil
+ } else {
+ return nil, err
+ }
+ } else {
+ return nil, err
+ }
+}
+
+func toInt(num string) int {
+ res, err := strconv.Atoi(num)
+ if err != nil {
+ return -1
+ }
+ return res
+}
+
+func main() {
+ rand.Seed(time.Now().UnixNano())
+
+ portSdnr := flag.Int("sdnr-port", 3904, "The port this SDNR stub will listen on")
+ portDmaapMR := flag.Int("dmaap-port", 3905, "The port this Dmaap message router will listen on")
+ flag.Parse()
+
+ loadData()
+
+ wg := new(sync.WaitGroup)
+ wg.Add(2)
+
+ go func() {
+
+ r := mux.NewRouter()
+ r.HandleFunc("/rests/data/network-topology:network-topology/topology=topology-netconf/node={NODE-ID}/yang-ext:mount/o-ran-sc-du-hello-world:network-function/distributed-unit-functions={O-DU-ID}", getSdnrResponseMessage).Methods(http.MethodGet)
+ r.HandleFunc("/rests/data/network-topology:network-topology/topology=topology-netconf/node={NODE-ID}/yang-ext:mount/o-ran-sc-du-hello-world:network-function/distributed-unit-functions={O-DU-ID}/radio-resource-management-policy-ratio={POLICY-ID}", updateRRMPolicyDedicatedRatio).Methods(http.MethodPost)
+ r.HandleFunc("/events/unauthenticated.PERFORMANCE_MEASUREMENTS", sendDmaapMRMessages).Methods(http.MethodGet)
+
+ fmt.Println("Starting SDNR stub on port: ", *portSdnr)
+
+ log.Fatal(http.ListenAndServe(fmt.Sprintf(":%v", *portSdnr), r))
+ wg.Done()
+ }()
+
+ go func() {
+
+ r := mux.NewRouter()
+ r.HandleFunc("/events/unauthenticated.PERFORMANCE_MEASUREMENTS", sendDmaapMRMessages).Methods(http.MethodGet)
+
+ fmt.Println("Starting DmaapMR stub on port: ", *portDmaapMR)
+
+ log.Fatal(http.ListenAndServe(fmt.Sprintf(":%v", *portDmaapMR), r))
+ wg.Done()
+ }()
+
+ wg.Wait()
+}
+
+func getSdnrResponseMessage(w http.ResponseWriter, r *http.Request) {
+ vars := mux.Vars(r)
+ log.Info("Get messages for RRM Policy Ratio information for O-Du ID ", vars["O-DU-ID"])
+
+ message := messages.ORanDuRestConf{
+ DistributedUnitFunction: messages.DistributedUnitFunction{
+ Id: vars["O-DU-ID"],
+ RRMPolicyRatio: getPolicyRatioMessage(),
+ },
+ }
+ respondWithJSON(w, http.StatusOK, message)
+}
+
+func getPolicyRatioMessage() []messages.RRMPolicyRatio {
+ var policies []messages.RRMPolicyRatio
+
+ for _, entry := range data {
+
+ message := messages.RRMPolicyRatio{
+ Id: entry.policyRatioId,
+ AdmState: "locked",
+ UserLabel: entry.policyRatioId,
+ RRMPolicyMaxRatio: entry.policyMaxRatio,
+ RRMPolicyMinRatio: entry.policyMinRatio,
+ RRMPolicyDedicatedRatio: entry.policyDedicatedRatio,
+ ResourceType: "prb",
+ RRMPolicyMembers: []messages.RRMPolicyMember{
+ {
+ MobileCountryCode: "046",
+ MobileNetworkCode: "651",
+ SliceDifferentiator: entry.sd,
+ SliceServiceType: entry.sst,
+ },
+ },
+ }
+ policies = append(policies, message)
+ }
+ return policies
+}
+
+func updateRRMPolicyDedicatedRatio(w http.ResponseWriter, r *http.Request) {
+ log.Info("Post request to update RRMPolicyDedicatedRatio")
+
+ var prMessages []messages.RRMPolicyRatio
+ decoder := json.NewDecoder(r.Body)
+
+ if err := decoder.Decode(&prMessages); err != nil {
+ respondWithError(w, http.StatusBadRequest, "Invalid request payload")
+ return
+ }
+ defer r.Body.Close()
+
+ findAndUpdatePolicy(prMessages)
+ respondWithJSON(w, http.StatusOK, map[string]string{"status": "200"})
+}
+
+func findAndUpdatePolicy(rRMPolicyRatio []messages.RRMPolicyRatio) {
+ for _, policy := range rRMPolicyRatio {
+ for _, entry := range data {
+ if entry.policyRatioId == policy.Id {
+ entry.policyDedicatedRatio = policy.RRMPolicyDedicatedRatio
+ log.Info("New value for Policy dedicated ratio: ", entry.policyDedicatedRatio)
+ if entry.metricValue > THRESHOLD_TPUT {
+ entry.metricValue = rand.Intn(THRESHOLD_TPUT)
+ }
+ messagesToSend = append(messagesToSend, generateMeasurementEntry(entry))
+ }
+ }
+ }
+}
+
+func respondWithError(w http.ResponseWriter, code int, message string) {
+ respondWithJSON(w, code, map[string]string{"error": message})
+}
+
+func respondWithJSON(w http.ResponseWriter, code int, payload interface{}) {
+ response, _ := json.Marshal(payload)
+
+ w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(code)
+ w.Write(response)
+}
+
+func sendDmaapMRMessages(w http.ResponseWriter, r *http.Request) {
+ log.Info("Send Dmaap messages")
+ entry := data[rand.Intn(5)]
+
+ maxTput := THRESHOLD_TPUT + 100
+ randomTput := rand.Intn(maxTput-THRESHOLD_TPUT+1) + THRESHOLD_TPUT
+ if randomTput%3 == 0 {
+ log.Info("Using tput value higher than THRESHOLD_TPUT ", randomTput)
+ entry.metricValue = randomTput
+ }
+
+ messagesToSend = append(messagesToSend, generateMeasurementEntry(entry))
+
+ message := messages.StdDefinedMessage{
+ Event: messages.Event{
+ CommonEventHeader: messages.CommonEventHeader{
+ Domain: "stndDefined",
+ StndDefinedNamespace: "o-ran-sc-du-hello-world-pm-streaming-oas3",
+ },
+ StndDefinedFields: messages.StndDefinedFields{
+ StndDefinedFieldsVersion: "1.0",
+ SchemaReference: "https://gerrit.o-ran-sc.org/r/gitweb?p=scp/oam/modeling.git;a=blob_plain;f=data-model/oas3/experimental/o-ran-sc-du-hello-world-oas3.json;hb=refs/heads/master",
+ Data: messages.Data{
+ DataId: "id",
+ Measurements: messagesToSend,
+ },
+ },
+ },
+ }
+
+ time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
+ respondWithJSON(w, http.StatusOK, message)
+
+ messagesToSend = nil
+}
+
+func generateMeasurementEntry(entry *SliceAssuranceInformation) messages.Measurement {
+
+ measurementTypeInstanceReference := "/network-function/distributed-unit-functions[id='" + entry.duId + "']/cell[id='" + entry.cellId + "']/supported-measurements/performance-measurement-type[.='" + entry.metricName + "']/supported-snssai-subcounter-instances/slice-differentiator[.=" + strconv.Itoa(entry.sd) + "][slice-service-type=" + strconv.Itoa(entry.sst) + "]"
+ meas := messages.Measurement{
+
+ MeasurementTypeInstanceReference: measurementTypeInstanceReference,
+ Value: entry.metricValue,
+ Unit: "kbit/s",
+ }
+ return meas
+}
--- /dev/null
+O-DU-1211,cell-1,1,1,user-equipment-average-throughput-downlink,300,rrm-pol-1,20,10,15
+O-DU-1211,cell-1,1,1,user-equipment-average-throughput-uplink,500,rrm-pol-1,20,10,15
+O-DU-1211,cell-1,1,2,user-equipment-average-throughput-downlink,700,rrm-pol-2,20,10,15
+O-DU-1211,cell-1,1,2,user-equipment-average-throughput-uplink,400,rrm-pol-2,20,10,15
+O-DU-1211,cell-1,2,1,user-equipment-average-throughput-downlink,800,rrm-pol-3,20,10,15
+O-DU-1211,cell-1,2,1,user-equipment-average-throughput-uplink,100,rrm-pol-3,20,10,15
+O-DU-1211,cell-1,2,2,user-equipment-average-throughput-downlink,900,rrm-pol-4,20,10,15
+O-DU-1211,cell-1,2,2,user-equipment-average-throughput-uplink,500,rrm-pol-4,20,10,15
+O-DU-1211,cell-1,3,1,user-equipment-average-throughput-downlink,800,rrm-pol-5,20,10,15
+O-DU-1211,cell-1,3,1,user-equipment-average-throughput-uplink,100,rrm-pol-5,20,10,15
\ No newline at end of file