2 // ========================LICENSE_START=================================
5 // Copyright (C) 2022: Nordix Foundation
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 // ========================LICENSE_END===================================
36 "github.com/gorilla/mux"
37 "oransc.org/usecase/oduclosedloop/icsversion/messages"
39 log "github.com/sirupsen/logrus"
42 const THRESHOLD_TPUT int = 7000
44 type SliceAssuranceInformation struct {
54 policyDedicatedRatio int
57 var data []*SliceAssuranceInformation
58 var messagesToSend []messages.Measurement
62 lines, err := GetCsvFromFile("test-data.csv")
66 for _, line := range lines {
67 sai := SliceAssuranceInformation{
73 metricValue: toInt(line[5]),
74 policyRatioId: line[6],
75 policyMaxRatio: toInt(line[7]),
76 policyMinRatio: toInt(line[8]),
77 policyDedicatedRatio: toInt(line[9]),
79 data = append(data, &sai)
83 func GetCsvFromFile(name string) ([][]string, error) {
84 if csvFile, err := os.Open(name); err == nil {
86 reader := csv.NewReader(csvFile)
87 reader.FieldsPerRecord = -1
88 if csvData, err := reader.ReadAll(); err == nil {
98 func toInt(num string) int {
99 res, err := strconv.Atoi(num)
107 rand.Seed(time.Now().UnixNano())
109 portSdnr := flag.Int("sdnr-port", 3904, "The port this SDNR stub will listen on")
110 dmaapProducerPort := flag.Int("dmaap-port", 3905, "The port this Dmaap mediator will listen on")
115 wg := new(sync.WaitGroup)
121 r.HandleFunc("/rests/data/network-topology:network-topology/topology=topology-netconf/node={NODE-ID}/yang-ext:mount/o-ran-sc-du-hello-world:network-function/distributed-unit-functions={O-DU-ID}", getSdnrResponseMessage).Methods(http.MethodGet)
122 r.HandleFunc("/rests/data/network-topology:network-topology/topology=topology-netconf/node={NODE-ID}/yang-ext:mount/o-ran-sc-du-hello-world:network-function/distributed-unit-functions={O-DU-ID}/radio-resource-management-policy-ratio={POLICY-ID}", updateRRMPolicyDedicatedRatio).Methods(http.MethodPut)
124 fmt.Println("Starting SDNR stub on port: ", *portSdnr)
126 log.Fatal(http.ListenAndServe(fmt.Sprintf(":%v", *portSdnr), r))
133 r.HandleFunc("/create/{jobId}", createJobHandler).Methods(http.MethodPut)
134 r.HandleFunc("/delete/{jobId}", deleteJobHandler).Methods(http.MethodDelete)
136 fmt.Println("Producer listening on port: ", *dmaapProducerPort)
138 log.Fatal(http.ListenAndServe(fmt.Sprintf(":%v", *dmaapProducerPort), r))
145 func createJobHandler(w http.ResponseWriter, r *http.Request) {
146 fmt.Println("createJobHandler:: ", r)
148 id, ok := vars["jobId"]
150 http.Error(w, "No job ID provided", http.StatusBadRequest)
155 fmt.Println("Start pushing messages for job: ", id)
156 go sendDmaapMessages()
159 func deleteJobHandler(w http.ResponseWriter, r *http.Request) {
161 id, ok := vars["jobId"]
163 http.Error(w, "No job ID provided", http.StatusBadRequest)
167 fmt.Println("Stop pushing messages for job: ", id)
171 func getSdnrResponseMessage(w http.ResponseWriter, r *http.Request) {
173 log.Info("Get messages for RRM Policy Ratio information for O-Du ID ", vars["O-DU-ID"])
175 distUnitFunctions := getDistributedUnitFunctionMessage(vars["O-DU-ID"])
177 respondWithJSON(w, http.StatusOK, distUnitFunctions)
180 func getDistributedUnitFunctionMessage(oduId string) messages.ORanDuRestConf {
182 var policies []messages.RRMPolicyRatio
183 for _, entry := range data {
184 message := messages.RRMPolicyRatio{
185 Id: entry.policyRatioId,
187 UserLabel: entry.policyRatioId,
188 RRMPolicyMaxRatio: entry.policyMaxRatio,
189 RRMPolicyMinRatio: entry.policyMinRatio,
190 RRMPolicyDedicatedRatio: entry.policyDedicatedRatio,
192 RRMPolicyMembers: []messages.RRMPolicyMember{
194 MobileCountryCode: "310",
195 MobileNetworkCode: "150",
196 SliceDifferentiator: entry.sd,
197 SliceServiceType: entry.sst,
201 policies = append(policies, message)
204 var publicLandMobileNetworks []messages.PublicLandMobileNetworks
205 for _, entry := range data {
206 publicLandMobileNetwork := messages.PublicLandMobileNetworks{
207 MobileCountryCode: "310",
208 MobileNetworkCode: "150",
209 SliceDifferentiator: entry.sd,
210 SliceServiceType: entry.sst,
212 publicLandMobileNetworks = append(publicLandMobileNetworks, publicLandMobileNetwork)
215 var supportedSnssaiSubcounterInstances []messages.SupportedSnssaiSubcounterInstances
216 for _, entry := range data {
217 supportedSnssaiSubcounterInstance := messages.SupportedSnssaiSubcounterInstances{
218 SliceDifferentiator: entry.sd,
219 SliceServiceType: entry.sst,
221 supportedSnssaiSubcounterInstances = append(supportedSnssaiSubcounterInstances, supportedSnssaiSubcounterInstance)
224 cell := messages.Cell{
228 BaseStationChannelBandwidth: messages.BaseStationChannelBandwidth{
231 SupplementaryUplink: 84000,
233 OperationalState: "enabled",
234 TrackingAreaCode: 10,
235 AdmState: "unlocked",
236 PublicLandMobileNetworks: publicLandMobileNetworks,
237 SupportedMeasurements: []messages.SupportedMeasurements{
239 PerformanceMeasurementType: "o-ran-sc-du-hello-world:user-equipment-average-throughput-uplink",
240 SupportedSnssaiSubcounterInstances: supportedSnssaiSubcounterInstances,
243 PerformanceMeasurementType: "o-ran-sc-du-hello-world:user-equipment-average-throughput-downlink",
244 SupportedSnssaiSubcounterInstances: supportedSnssaiSubcounterInstances,
247 TrafficState: "active",
248 AbsoluteRadioFrequencyChannelNumber: messages.AbsoluteRadioFrequencyChannelNumber{
251 SupplementaryUplink: 14500,
254 SynchronizationSignalBlock: messages.SynchronizationSignalBlock{
256 FrequencyChannelNumber: 12,
258 SubcarrierSpacing: 30,
263 distUnitFunction := messages.DistributedUnitFunction{
265 OperationalState: "enabled",
266 AdmState: "unlocked",
268 Cell: []messages.Cell{
271 RRMPolicyRatio: policies,
274 duRRMPolicyRatio := messages.ORanDuRestConf{
275 DistributedUnitFunction: []messages.DistributedUnitFunction{
280 return duRRMPolicyRatio
283 func updateRRMPolicyDedicatedRatio(w http.ResponseWriter, r *http.Request) {
284 var policies struct {
285 RRMPolicies []messages.RRMPolicyRatio `json:"radio-resource-management-policy-ratio"`
287 decoder := json.NewDecoder(r.Body)
289 if err := decoder.Decode(&policies); err != nil {
290 respondWithError(w, http.StatusBadRequest, "Invalid request payload")
295 prMessages := policies.RRMPolicies
296 log.Infof("Post request to update RRMPolicyDedicatedRatio %+v", prMessages)
297 findAndUpdatePolicy(prMessages)
298 respondWithJSON(w, http.StatusOK, map[string]string{"status": "200"})
301 func findAndUpdatePolicy(rRMPolicyRatio []messages.RRMPolicyRatio) {
302 for _, policy := range rRMPolicyRatio {
303 for _, entry := range data {
304 if entry.policyRatioId == policy.Id {
305 log.Infof("update Policy Dedicated Ratio: value for policy %+v\n Old value: %v New value: %v ", policy, entry.policyDedicatedRatio, policy.RRMPolicyDedicatedRatio)
306 entry.policyDedicatedRatio = policy.RRMPolicyDedicatedRatio
307 if entry.metricValue > THRESHOLD_TPUT {
308 entry.metricValue = rand.Intn(THRESHOLD_TPUT)
310 messagesToSend = append(messagesToSend, generateMeasurementEntry(entry))
316 func respondWithError(w http.ResponseWriter, code int, message string) {
317 respondWithJSON(w, code, map[string]string{"error": message})
320 func respondWithJSON(w http.ResponseWriter, code int, payload interface{}) {
321 response, _ := json.Marshal(payload)
323 w.Header().Set("Content-Type", "application/json")
328 func sendDmaapMessages() {
330 client := &http.Client{
331 Timeout: 10 * time.Second,
334 log.Info("Send Dmaap messages")
335 for range time.Tick(10 * time.Second) {
339 m, _ := json.Marshal(generateStdMessage())
340 msgToSend, _ := json.Marshal([]string{string(m)})
342 time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
344 oru_addr := getEnv("ORU_ADDR", "http://localhost:8095")
345 req, _ := http.NewRequest(http.MethodPost, oru_addr, bytes.NewBuffer(msgToSend))
346 req.Header.Set("Content-Type", "application/json; charset=utf-8")
348 _, err := client.Do(req)
350 fmt.Println("Error sending to consumer: ", err)
352 fmt.Println("Sent message to consumer!")
356 func getEnv(key string, defaultVal string) string {
357 if value, exists := os.LookupEnv(key); exists {
364 func generateStdMessage() messages.StdDefinedMessage {
365 entry := data[rand.Intn(5)]
367 maxTput := THRESHOLD_TPUT + 100
368 randomTput := rand.Intn(maxTput-THRESHOLD_TPUT+1) + THRESHOLD_TPUT
369 if randomTput%3 == 0 {
370 log.Info("Using tput value higher than THRESHOLD_TPUT ", randomTput)
371 entry.metricValue = randomTput
374 messagesToSend = append(messagesToSend, generateMeasurementEntry(entry))
376 message := messages.StdDefinedMessage{
377 Event: messages.Event{
378 CommonEventHeader: messages.CommonEventHeader{
379 Domain: "stndDefined",
380 EventId: "pm-1_1644252450",
381 EventName: "stndDefined_performanceMeasurementStreaming",
382 EventType: "performanceMeasurementStreaming",
385 ReportingEntityId: "",
386 ReportingEntityName: "O-DU-1122",
388 SourceName: "O-DU-1122",
389 StartEpochMicrosec: 1644252450000000,
390 LastEpochMicrosec: 1644252480000000,
391 NfNamingCode: "SIM-O-DU",
392 NfVendorName: "O-RAN-SC SIM Project",
393 StndDefinedNamespace: "o-ran-sc-du-hello-world-pm-streaming-oas3",
394 TimeZoneOffset: "+00:00",
396 VesEventListenerVersion: "7.2.1",
398 StndDefinedFields: messages.StndDefinedFields{
399 StndDefinedFieldsVersion: "1.0",
400 SchemaReference: "https://gerrit.o-ran-sc.org/r/gitweb?p=scp/oam/modeling.git;a=blob_plain;f=data-model/oas3/experimental/o-ran-sc-du-hello-world-oas3.json;hb=refs/heads/master",
402 DataId: "pm-1_1644252450",
403 StartTime: "2022-02-07T16:47:30.0Z",
404 AdministrativeState: "unlocked",
405 OperationalState: "enabled",
407 JobTag: "my-job-tag",
408 GranularityPeriod: 30,
409 Measurements: messagesToSend,
415 fmt.Printf("Sending Dmaap message:\n %+v\n", message)
419 func generateMeasurementEntry(entry *SliceAssuranceInformation) messages.Measurement {
421 measurementTypeInstanceReference := "/o-ran-sc-du-hello-world:network-function/distributed-unit-functions[id='" + entry.duId + "']/cell[id='" + entry.cellId + "']/supported-measurements[performance-measurement-type='(urn:o-ran-sc:yang:o-ran-sc-du-hello-world?revision=2021-11-23)" + entry.metricName + "']/supported-snssai-subcounter-instances[slice-differentiator='" + strconv.Itoa(entry.sd) + "'][slice-service-type='" + strconv.Itoa(entry.sst) + "']"
422 meas := messages.Measurement{
424 MeasurementTypeInstanceReference: measurementTypeInstanceReference,
425 Value: entry.metricValue,