2 // ========================LICENSE_START=================================
5 // Copyright (C) 2021: Nordix Foundation
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 // ========================LICENSE_END===================================
35 "github.com/prometheus/client_golang/prometheus"
36 "github.com/prometheus/client_golang/prometheus/promhttp"
38 "github.com/gorilla/mux"
39 "oransc.org/usecase/oduclosedloop/messages"
41 log "github.com/sirupsen/logrus"
44 const THRESHOLD_TPUT int = 7000
46 type SliceAssuranceInformation struct {
56 policyDedicatedRatio int
60 data []*SliceAssuranceInformation
61 messagesToSend []messages.Measurement
62 metric = prometheus.NewGaugeVec(prometheus.GaugeOpts{
63 Name: "rapp_slice_assurance_metric",
64 Help: "Duration of the last request call.",
65 }, []string{"MeasurementTypeInstanceReference", "unit"})
69 lines, err := GetCsvFromFile("test-data.csv")
73 for _, line := range lines {
74 sai := SliceAssuranceInformation{
80 metricValue: toInt(line[5]),
81 policyRatioId: line[6],
82 policyMaxRatio: toInt(line[7]),
83 policyMinRatio: toInt(line[8]),
84 policyDedicatedRatio: toInt(line[9]),
86 data = append(data, &sai)
87 metric.WithLabelValues(getMeasTypeInstanceRef(&sai), "kbit/s").Set(float64(THRESHOLD_TPUT))
91 func GetCsvFromFile(name string) ([][]string, error) {
92 if csvFile, err := os.Open(name); err == nil {
94 reader := csv.NewReader(csvFile)
95 reader.FieldsPerRecord = -1
96 if csvData, err := reader.ReadAll(); err == nil {
106 func toInt(num string) int {
107 res, err := strconv.Atoi(num)
115 rand.Seed(time.Now().UnixNano())
117 portSdnr := flag.Int("sdnr-port", 3904, "The port this SDNR stub will listen on")
118 portDmaapMR := flag.Int("dmaap-port", 3905, "The port this Dmaap message router will listen on")
123 wg := new(sync.WaitGroup)
126 prometheus.Register(metric)
131 r.HandleFunc("/rests/data/network-topology:network-topology/topology=topology-netconf/node={NODE-ID}/yang-ext:mount/o-ran-sc-du-hello-world:network-function/distributed-unit-functions={O-DU-ID}", getSdnrResponseMessage).Methods(http.MethodGet)
132 r.HandleFunc("/rests/data/network-topology:network-topology/topology=topology-netconf/node={NODE-ID}/yang-ext:mount/o-ran-sc-du-hello-world:network-function/distributed-unit-functions={O-DU-ID}/radio-resource-management-policy-ratio={POLICY-ID}", updateRRMPolicyDedicatedRatio).Methods(http.MethodPut)
134 fmt.Println("Starting SDNR stub on port: ", *portSdnr)
136 log.Fatal(http.ListenAndServe(fmt.Sprintf(":%v", *portSdnr), r))
143 r.Handle("/metrics", promhttp.Handler())
144 r.HandleFunc("/events/unauthenticated.VES_O_RAN_SC_HELLO_WORLD_PM_STREAMING_OUTPUT/myG/C1", sendDmaapMRMessages).Methods(http.MethodGet)
146 fmt.Println("Starting DmaapMR stub on port: ", *portDmaapMR)
148 log.Fatal(http.ListenAndServe(fmt.Sprintf(":%v", *portDmaapMR), r))
155 func getSdnrResponseMessage(w http.ResponseWriter, r *http.Request) {
157 log.Info("Get messages for RRM Policy Ratio information for O-Du ID ", vars["O-DU-ID"])
159 distUnitFunctions := getDistributedUnitFunctionMessage(vars["O-DU-ID"])
161 respondWithJSON(w, http.StatusOK, distUnitFunctions)
164 func getDistributedUnitFunctionMessage(oduId string) messages.ORanDuRestConf {
166 var policies []messages.RRMPolicyRatio
167 keys := make(map[string]bool)
168 for _, entry := range data {
169 if _, value := keys[entry.policyRatioId]; !value {
170 keys[entry.policyRatioId] = true
171 message := messages.RRMPolicyRatio{
173 Id: entry.policyRatioId,
175 UserLabel: entry.policyRatioId,
176 RRMPolicyMaxRatio: entry.policyMaxRatio,
177 RRMPolicyMinRatio: entry.policyMinRatio,
178 RRMPolicyDedicatedRatio: entry.policyDedicatedRatio,
180 RRMPolicyMembers: []messages.RRMPolicyMember{
182 MobileCountryCode: "310",
183 MobileNetworkCode: "150",
184 SliceDifferentiator: entry.sd,
185 SliceServiceType: entry.sst,
189 policies = append(policies, message)
193 var publicLandMobileNetworks []messages.PublicLandMobileNetworks
194 for _, entry := range data {
195 publicLandMobileNetwork := messages.PublicLandMobileNetworks{
196 MobileCountryCode: "310",
197 MobileNetworkCode: "150",
198 SliceDifferentiator: entry.sd,
199 SliceServiceType: entry.sst,
201 publicLandMobileNetworks = append(publicLandMobileNetworks, publicLandMobileNetwork)
204 var supportedSnssaiSubcounterInstances []messages.SupportedSnssaiSubcounterInstances
205 for _, entry := range data {
206 supportedSnssaiSubcounterInstance := messages.SupportedSnssaiSubcounterInstances{
207 SliceDifferentiator: entry.sd,
208 SliceServiceType: entry.sst,
210 supportedSnssaiSubcounterInstances = append(supportedSnssaiSubcounterInstances, supportedSnssaiSubcounterInstance)
213 cell := messages.Cell{
217 BaseStationChannelBandwidth: messages.BaseStationChannelBandwidth{
220 SupplementaryUplink: 84000,
222 OperationalState: "enabled",
223 TrackingAreaCode: 10,
224 AdmState: "unlocked",
225 PublicLandMobileNetworks: publicLandMobileNetworks,
226 SupportedMeasurements: []messages.SupportedMeasurements{
228 PerformanceMeasurementType: "o-ran-sc-du-hello-world:user-equipment-average-throughput-uplink",
229 SupportedSnssaiSubcounterInstances: supportedSnssaiSubcounterInstances,
232 PerformanceMeasurementType: "o-ran-sc-du-hello-world:user-equipment-average-throughput-downlink",
233 SupportedSnssaiSubcounterInstances: supportedSnssaiSubcounterInstances,
236 TrafficState: "active",
237 AbsoluteRadioFrequencyChannelNumber: messages.AbsoluteRadioFrequencyChannelNumber{
240 SupplementaryUplink: 14500,
243 SynchronizationSignalBlock: messages.SynchronizationSignalBlock{
245 FrequencyChannelNumber: 12,
247 SubcarrierSpacing: 30,
252 distUnitFunction := messages.DistributedUnitFunction{
254 OperationalState: "enabled",
255 AdmState: "unlocked",
257 Cell: []messages.Cell{
260 RRMPolicyRatio: policies,
263 duRRMPolicyRatio := messages.ORanDuRestConf{
264 DistributedUnitFunction: []messages.DistributedUnitFunction{
269 return duRRMPolicyRatio
272 func updateRRMPolicyDedicatedRatio(w http.ResponseWriter, r *http.Request) {
273 var policies struct {
274 RRMPolicies []messages.RRMPolicyRatio `json:"radio-resource-management-policy-ratio"`
276 decoder := json.NewDecoder(r.Body)
278 if err := decoder.Decode(&policies); err != nil {
279 respondWithError(w, http.StatusBadRequest, "Invalid request payload")
284 prMessages := policies.RRMPolicies
285 log.Infof("Post request to update RRMPolicyDedicatedRatio %+v", prMessages)
286 findAndUpdatePolicy(prMessages)
287 respondWithJSON(w, http.StatusOK, map[string]string{"status": "200"})
290 func findAndUpdatePolicy(rRMPolicyRatio []messages.RRMPolicyRatio) {
291 for _, policy := range rRMPolicyRatio {
292 for _, entry := range data {
293 if entry.policyRatioId == policy.Id {
294 log.Infof("update Policy Dedicated Ratio: value for policy %+v\n Old value: %v New value: %v ", policy, entry.policyDedicatedRatio, policy.RRMPolicyDedicatedRatio)
295 entry.policyDedicatedRatio = policy.RRMPolicyDedicatedRatio
296 if entry.metricValue > THRESHOLD_TPUT {
297 entry.metricValue = rand.Intn(THRESHOLD_TPUT)
299 messagesToSend = append(messagesToSend, generateMeasurementEntry(entry))
305 func respondWithError(w http.ResponseWriter, code int, message string) {
306 respondWithJSON(w, code, map[string]string{"error": message})
309 func respondWithJSON(w http.ResponseWriter, code int, payload interface{}) {
310 response, _ := json.Marshal(payload)
312 w.Header().Set("Content-Type", "application/json")
317 func sendDmaapMRMessages(w http.ResponseWriter, r *http.Request) {
318 log.Info("Send Dmaap messages")
319 entry := data[rand.Intn(5)]
323 maxTput := THRESHOLD_TPUT + 100
324 randomTput := rand.Intn(maxTput-THRESHOLD_TPUT+1) + THRESHOLD_TPUT
325 if randomTput%3 == 0 {
326 log.Info("Using tput value higher than THRESHOLD_TPUT ", randomTput)
327 entry.metricValue = randomTput
329 randomEventId := rand.Intn(10000)
330 messagesToSend = append(messagesToSend, generateMeasurementEntry(entry))
332 message := messages.StdDefinedMessage{
333 Event: messages.Event{
334 CommonEventHeader: messages.CommonEventHeader{
335 Domain: "stndDefined",
336 EventId: "pm-1_16442" + strconv.Itoa(randomEventId),
337 EventName: "stndDefined_performanceMeasurementStreaming",
338 EventType: "performanceMeasurementStreaming",
341 ReportingEntityId: "",
342 ReportingEntityName: "O-DU-1122",
344 SourceName: "O-DU-1122",
345 StartEpochMicrosec: 1644252450000000,
346 LastEpochMicrosec: 1644252480000000,
347 NfNamingCode: "SIM-O-DU",
348 NfVendorName: "O-RAN-SC SIM Project",
349 StndDefinedNamespace: "o-ran-sc-du-hello-world-pm-streaming-oas3",
350 TimeZoneOffset: "+00:00",
352 VesEventListenerVersion: "7.2.1",
354 StndDefinedFields: messages.StndDefinedFields{
355 StndDefinedFieldsVersion: "1.0",
356 SchemaReference: "https://gerrit.o-ran-sc.org/r/gitweb?p=scp/oam/modeling.git;a=blob_plain;f=data-model/oas3/experimental/o-ran-sc-du-hello-world-oas3.json;hb=refs/heads/master",
358 DataId: "pm-1_1644252450",
359 StartTime: "2022-02-07T16:47:30.0Z",
360 AdministrativeState: "unlocked",
361 OperationalState: "enabled",
363 JobTag: "my-job-tag",
364 GranularityPeriod: 30,
365 Measurements: messagesToSend,
371 fmt.Printf("Sending Dmaap message:\n %+v\n", message)
373 messageAsByteArray, _ := json.Marshal(message)
374 response := [1]string{string(messageAsByteArray)}
376 time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
377 respondWithJSON(w, http.StatusOK, response)
382 func generateMeasurementEntry(entry *SliceAssuranceInformation) messages.Measurement {
384 measurementTypeInstanceReference := getMeasTypeInstanceRef(entry)
385 meas := messages.Measurement{
387 MeasurementTypeInstanceReference: measurementTypeInstanceReference,
388 Value: entry.metricValue,
394 func getMeasTypeInstanceRef(entry *SliceAssuranceInformation) string {
395 return "/o-ran-sc-du-hello-world:network-function/distributed-unit-functions[id='" + entry.duId + "']/cell[id='" + entry.cellId + "']/supported-measurements[performance-measurement-type='(urn:o-ran-sc:yang:o-ran-sc-du-hello-world?revision=2021-11-23)" + entry.metricName + "']/supported-snssai-subcounter-instances[slice-differentiator='" + strconv.Itoa(entry.sd) + "'][slice-service-type='" + strconv.Itoa(entry.sst) + "']"
398 func changeMetric(entry *SliceAssuranceInformation) {
401 tmpVal := rand.Intn(max-min) + min
402 log.Infof("Adding %v to cell: %v, diff: %v ", tmpVal, entry.cellId, strconv.Itoa(entry.sd))
403 metric.WithLabelValues(getMeasTypeInstanceRef(entry), "kbit/s").Add(float64(tmpVal))