2 // ========================LICENSE_START=================================
5 // Copyright (C) 2022: Nordix Foundation
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 // ========================LICENSE_END===================================
36 "github.com/gorilla/mux"
37 "oransc.org/usecase/oduclosedloop/icsversion/messages"
39 log "github.com/sirupsen/logrus"
// THRESHOLD_TPUT is the throughput threshold used by the simulator:
// findAndUpdatePolicy replaces a metric value above it with a random value
// below it, and generateStdMessage draws its random throughput values from
// THRESHOLD_TPUT up to THRESHOLD_TPUT+100.
42 const THRESHOLD_TPUT int = 7000
// SliceAssuranceInformation is one record of the simulator's in-memory data
// set. Records are built from the rows of test-data.csv and stored by pointer
// in the package-level data slice, so handlers mutate them in place.
44 type SliceAssuranceInformation struct {
// policyDedicatedRatio is overwritten by findAndUpdatePolicy when an update
// carries a policy whose Id equals this record's policyRatioId.
54 policyDedicatedRatio int
// data holds the slice-assurance records loaded from test-data.csv. Elements
// are pointers, so a change made through a ranged element is seen by every
// other reader of the slice.
57 var data []*SliceAssuranceInformation
// messagesToSend accumulates the measurements that generateStdMessage embeds
// in each outgoing message; findAndUpdatePolicy and generateStdMessage both
// append to it.
// NOTE(review): it is appended to from an HTTP handler path and from the
// goroutine started by createJobHandler with no synchronization in sight —
// confirm whether a mutex is needed.
58 var messagesToSend []messages.Measurement
62 lines, err := GetCsvFromFile("test-data.csv")
66 for _, line := range lines {
67 sai := SliceAssuranceInformation{
73 metricValue: toInt(line[5]),
74 policyRatioId: line[6],
75 policyMaxRatio: toInt(line[7]),
76 policyMinRatio: toInt(line[8]),
77 policyDedicatedRatio: toInt(line[9]),
79 data = append(data, &sai)
// GetCsvFromFile reads the CSV file at path name and returns all of its
// records.
//
// Rows may have differing numbers of fields (FieldsPerRecord is -1), so
// callers must check the length of a record before indexing into it. The file
// is always closed before the function returns.
//
// A failure to open the file is returned unchanged (it already names the
// path). A parse failure is wrapped with the file name and can still be
// inspected with errors.As against *csv.ParseError. On any error the returned
// records are nil.
func GetCsvFromFile(name string) ([][]string, error) {
	csvFile, err := os.Open(name)
	if err != nil {
		return nil, err
	}
	defer csvFile.Close()

	reader := csv.NewReader(csvFile)
	reader.FieldsPerRecord = -1 // accept rows of varying width

	records, err := reader.ReadAll()
	if err != nil {
		return nil, fmt.Errorf("reading csv file %q: %w", name, err)
	}
	return records, nil
}
// toInt parses num as a base-10 integer using strconv.Atoi.
// NOTE(review): the value returned when parsing fails is decided by the error
// branch that follows the Atoi call — confirm callers can tell that value
// apart from real data.
98 func toInt(num string) int {
99 res, err := strconv.Atoi(num)
107 rand.Seed(time.Now().UnixNano())
109 portSdnr := flag.Int("sdnr-port", 3904, "The port this SDNR stub will listen on")
110 producerPort := flag.Int("prod-port", 3905, "The port this Producer mediator will listen on")
115 wg := new(sync.WaitGroup)
121 r.HandleFunc("/rests/data/network-topology:network-topology/topology=topology-netconf/node={NODE-ID}/yang-ext:mount/o-ran-sc-du-hello-world:network-function/distributed-unit-functions={O-DU-ID}", getSdnrResponseMessage).Methods(http.MethodGet)
122 r.HandleFunc("/rests/data/network-topology:network-topology/topology=topology-netconf/node={NODE-ID}/yang-ext:mount/o-ran-sc-du-hello-world:network-function/distributed-unit-functions={O-DU-ID}/radio-resource-management-policy-ratio={POLICY-ID}", updateRRMPolicyDedicatedRatio).Methods(http.MethodPut)
124 fmt.Println("Starting SDNR stub on port: ", *portSdnr)
126 log.Fatal(http.ListenAndServe(fmt.Sprintf(":%v", *portSdnr), r))
133 r.HandleFunc("/create/{jobId}", createJobHandler).Methods(http.MethodPut)
134 r.HandleFunc("/delete/{jobId}", deleteJobHandler).Methods(http.MethodDelete)
136 fmt.Println("Producer listening on port: ", *producerPort)
138 log.Fatal(http.ListenAndServe(fmt.Sprintf(":%v", *producerPort), r))
// createJobHandler serves the /create/{jobId} route (registered for PUT). It
// reads the jobId route variable, answers 400 "No job ID provided" on the
// failure path, and otherwise starts sendDmaapMessages in a new goroutine.
// NOTE(review): a goroutine is started on every accepted request and
// sendDmaapMessages has no stop condition, so repeated creates presumably run
// several senders at once — confirm this is intended.
145 func createJobHandler(w http.ResponseWriter, r *http.Request) {
147 id, ok := vars["jobId"]
149 http.Error(w, "No job ID provided", http.StatusBadRequest)
154 fmt.Println("Start pushing messages for job: ", id)
155 go sendDmaapMessages()
// deleteJobHandler serves the /delete/{jobId} route (registered for DELETE).
// It reads the jobId route variable, answers 400 "No job ID provided" on the
// failure path, and otherwise prints that pushing has stopped for that job.
// NOTE(review): only a message is printed; nothing here signals the goroutine
// started by createJobHandler — confirm whether it is really meant to stop.
158 func deleteJobHandler(w http.ResponseWriter, r *http.Request) {
160 id, ok := vars["jobId"]
162 http.Error(w, "No job ID provided", http.StatusBadRequest)
166 fmt.Println("Stop pushing messages for job: ", id)
// getSdnrResponseMessage serves the GET route for a distributed-unit
// function: it logs the O-DU-ID route variable, builds the response with
// getDistributedUnitFunctionMessage and writes it as JSON with status 200.
170 func getSdnrResponseMessage(w http.ResponseWriter, r *http.Request) {
172 log.Info("Get messages for RRM Policy Ratio information for O-Du ID ", vars["O-DU-ID"])
174 distUnitFunctions := getDistributedUnitFunctionMessage(vars["O-DU-ID"])
176 respondWithJSON(w, http.StatusOK, distUnitFunctions)
// getDistributedUnitFunctionMessage builds the RESTCONF-style response body
// describing a distributed-unit function from the in-memory data set.
//
// It makes three passes over data, producing one RRM policy ratio, one PLMN
// entry and one supported S-NSSAI sub-counter instance per record, then embeds
// them in a single cell and a single distributed-unit function.
// NOTE(review): all three lists are built from every record in data without
// filtering — confirm that oduId is meant to influence only the identifier of
// the returned unit and not which records are included.
179 func getDistributedUnitFunctionMessage(oduId string) messages.ORanDuRestConf {
// Pass 1: one policy per record; the record's policyRatioId serves as both Id
// and UserLabel.
181 var policies []messages.RRMPolicyRatio
182 for _, entry := range data {
183 message := messages.RRMPolicyRatio{
184 Id: entry.policyRatioId,
186 UserLabel: entry.policyRatioId,
187 RRMPolicyMaxRatio: entry.policyMaxRatio,
188 RRMPolicyMinRatio: entry.policyMinRatio,
189 RRMPolicyDedicatedRatio: entry.policyDedicatedRatio,
191 RRMPolicyMembers: []messages.RRMPolicyMember{
// The mobile country/network codes are fixed literals; only the slice
// identifiers come from the record.
193 MobileCountryCode: "310",
194 MobileNetworkCode: "150",
195 SliceDifferentiator: entry.sd,
196 SliceServiceType: entry.sst,
200 policies = append(policies, message)
// Pass 2: one PLMN entry per record, with the same fixed codes as above.
203 var publicLandMobileNetworks []messages.PublicLandMobileNetworks
204 for _, entry := range data {
205 publicLandMobileNetwork := messages.PublicLandMobileNetworks{
206 MobileCountryCode: "310",
207 MobileNetworkCode: "150",
208 SliceDifferentiator: entry.sd,
209 SliceServiceType: entry.sst,
211 publicLandMobileNetworks = append(publicLandMobileNetworks, publicLandMobileNetwork)
// Pass 3: one sub-counter instance per record; the resulting slice is shared
// by the uplink and downlink supported-measurement entries below.
214 var supportedSnssaiSubcounterInstances []messages.SupportedSnssaiSubcounterInstances
215 for _, entry := range data {
216 supportedSnssaiSubcounterInstance := messages.SupportedSnssaiSubcounterInstances{
217 SliceDifferentiator: entry.sd,
218 SliceServiceType: entry.sst,
220 supportedSnssaiSubcounterInstances = append(supportedSnssaiSubcounterInstances, supportedSnssaiSubcounterInstance)
// The cell is described with fixed literal values apart from the two lists
// built above.
223 cell := messages.Cell{
227 BaseStationChannelBandwidth: messages.BaseStationChannelBandwidth{
230 SupplementaryUplink: 84000,
232 OperationalState: "enabled",
233 TrackingAreaCode: 10,
234 AdmState: "unlocked",
235 PublicLandMobileNetworks: publicLandMobileNetworks,
236 SupportedMeasurements: []messages.SupportedMeasurements{
238 PerformanceMeasurementType: "o-ran-sc-du-hello-world:user-equipment-average-throughput-uplink",
239 SupportedSnssaiSubcounterInstances: supportedSnssaiSubcounterInstances,
242 PerformanceMeasurementType: "o-ran-sc-du-hello-world:user-equipment-average-throughput-downlink",
243 SupportedSnssaiSubcounterInstances: supportedSnssaiSubcounterInstances,
246 TrafficState: "active",
247 AbsoluteRadioFrequencyChannelNumber: messages.AbsoluteRadioFrequencyChannelNumber{
250 SupplementaryUplink: 14500,
253 SynchronizationSignalBlock: messages.SynchronizationSignalBlock{
255 FrequencyChannelNumber: 12,
257 SubcarrierSpacing: 30,
// The distributed-unit function carries the policies built in pass 1.
262 distUnitFunction := messages.DistributedUnitFunction{
264 OperationalState: "enabled",
265 AdmState: "unlocked",
267 Cell: []messages.Cell{
270 RRMPolicyRatio: policies,
// Wrap the distributed-unit function in the top-level response envelope.
273 duRRMPolicyRatio := messages.ORanDuRestConf{
274 DistributedUnitFunction: []messages.DistributedUnitFunction{
279 return duRRMPolicyRatio
// updateRRMPolicyDedicatedRatio serves the policy-ratio update route. It
// decodes a JSON body whose "radio-resource-management-policy-ratio" member is
// a list of policies, answers 400 "Invalid request payload" if decoding fails,
// and otherwise applies the list with findAndUpdatePolicy and answers 200 with
// {"status": "200"}.
// NOTE(review): the log line says "Post request" although the route is
// registered with http.MethodPut — confirm which wording is intended.
// NOTE(review): 200 is returned even when no policy id matches any record,
// since findAndUpdatePolicy reports nothing back.
282 func updateRRMPolicyDedicatedRatio(w http.ResponseWriter, r *http.Request) {
// Anonymous wrapper matching the request envelope; only the policy list is used.
283 var policies struct {
284 RRMPolicies []messages.RRMPolicyRatio `json:"radio-resource-management-policy-ratio"`
286 decoder := json.NewDecoder(r.Body)
288 if err := decoder.Decode(&policies); err != nil {
289 respondWithError(w, http.StatusBadRequest, "Invalid request payload")
294 prMessages := policies.RRMPolicies
295 log.Infof("Post request to update RRMPolicyDedicatedRatio %+v", prMessages)
296 findAndUpdatePolicy(prMessages)
297 respondWithJSON(w, http.StatusOK, map[string]string{"status": "200"})
300 func findAndUpdatePolicy(rRMPolicyRatio []messages.RRMPolicyRatio) {
301 for _, policy := range rRMPolicyRatio {
302 for _, entry := range data {
303 if entry.policyRatioId == policy.Id {
304 log.Infof("update Policy Dedicated Ratio: value for policy %+v\n Old value: %v New value: %v ", policy, entry.policyDedicatedRatio, policy.RRMPolicyDedicatedRatio)
305 entry.policyDedicatedRatio = policy.RRMPolicyDedicatedRatio
306 if entry.metricValue > THRESHOLD_TPUT {
307 entry.metricValue = rand.Intn(THRESHOLD_TPUT)
309 messagesToSend = append(messagesToSend, generateMeasurementEntry(entry))
315 func respondWithError(w http.ResponseWriter, code int, message string) {
316 respondWithJSON(w, code, map[string]string{"error": message})
// respondWithJSON serializes payload as JSON and writes it to w with the given
// status code and a Content-Type of application/json.
//
// If payload cannot be marshalled, the caller's status code is discarded and a
// 500 response with a fixed JSON error body is sent instead, so a client never
// receives a success status together with an empty body.
func respondWithJSON(w http.ResponseWriter, code int, payload interface{}) {
	response, err := json.Marshal(payload)
	if err != nil {
		code = http.StatusInternalServerError
		response = []byte(`{"error":"failed to encode response"}`)
	}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(code)
	// A failed write means the client has gone away; the status line is
	// already committed, so there is nothing further to report to it.
	_, _ = w.Write(response)
}
// sendDmaapMessages pushes a freshly generated message to the consumer on
// every tick of a 10-second ticker, and never returns.
//
// Each round marshals the result of generateStdMessage to JSON, wraps that
// JSON text as the single element of a JSON string array, sleeps a random
// 0-2 seconds, and POSTs the array to the address in the ODU_ADDR environment
// variable (default "http://consumer-sa:8095"). The outcome is printed to
// standard output.
// NOTE(review): the *http.Response returned by client.Do is discarded, so its
// Body is never closed; per net/http this leaks the connection — confirm and
// close it.
// NOTE(review): the errors from both json.Marshal calls and from
// http.NewRequest are ignored; a nil req would make req.Header.Set panic.
// NOTE(review): time.Tick provides no way to stop the ticker, and the loop has
// no exit, so the goroutine running this function lives until the process ends.
327 func sendDmaapMessages() {
// One client, with a 10-second overall timeout, is reused for every request.
329 client := &http.Client{
330 Timeout: 10 * time.Second,
333 log.Info("Send Dmaap messages")
334 for range time.Tick(10 * time.Second) {
338 m, _ := json.Marshal(generateStdMessage())
// The consumer receives a JSON array holding the message as one JSON string.
339 msgToSend, _ := json.Marshal([]string{string(m)})
// Random 0, 1 or 2 second jitter before sending.
341 time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
343 odu_addr := getEnv("ODU_ADDR", "http://consumer-sa:8095")
344 req, _ := http.NewRequest(http.MethodPost, odu_addr, bytes.NewBuffer(msgToSend))
345 req.Header.Set("Content-Type", "application/json; charset=utf-8")
347 _, err := client.Do(req)
349 fmt.Println("Error sending to consumer: ", err)
351 fmt.Println("Sent message to consumer!")
// getEnv returns the value of the environment variable key, or defaultVal when
// the variable is not set. A variable that is set to the empty string counts
// as set, so "" is returned for it rather than defaultVal.
func getEnv(key string, defaultVal string) string {
	value, found := os.LookupEnv(key)
	if !found {
		return defaultVal
	}
	return value
}
// generateStdMessage builds the standard-defined performance-measurement
// message that sendDmaapMessages pushes to the consumer.
//
// It picks a random record from data, occasionally raises that record's
// metric value to a random throughput in [THRESHOLD_TPUT, THRESHOLD_TPUT+100],
// appends a measurement for the record to messagesToSend, and returns a
// message whose measurement list is the whole of messagesToSend. Header
// identifiers and timestamps are fixed literals. The message is also printed
// to standard output.
// NOTE(review): the measurement list keeps growing with every call unless
// messagesToSend is reset elsewhere — confirm.
363 func generateStdMessage() messages.StdDefinedMessage {
// NOTE(review): the index is drawn from a hard-coded range of 5, so only the
// first five records are ever chosen and the expression panics when data holds
// fewer than five — rand.Intn(len(data)) with an empty-slice guard looks like
// the intent; confirm.
364 entry := data[rand.Intn(5)]
366 maxTput := THRESHOLD_TPUT + 100
367 randomTput := rand.Intn(maxTput-THRESHOLD_TPUT+1) + THRESHOLD_TPUT
// Only draws divisible by 3 are applied. entry is a pointer into data, so the
// new value persists in the shared record.
368 if randomTput%3 == 0 {
369 log.Info("Using tput value higher than THRESHOLD_TPUT ", randomTput)
370 entry.metricValue = randomTput
373 messagesToSend = append(messagesToSend, generateMeasurementEntry(entry))
375 message := messages.StdDefinedMessage{
376 Event: messages.Event{
377 CommonEventHeader: messages.CommonEventHeader{
378 Domain: "stndDefined",
379 EventId: "pm-1_1644252450",
380 EventName: "stndDefined_performanceMeasurementStreaming",
381 EventType: "performanceMeasurementStreaming",
384 ReportingEntityId: "",
385 ReportingEntityName: "O-DU-1122",
387 SourceName: "O-DU-1122",
388 StartEpochMicrosec: 1644252450000000,
389 LastEpochMicrosec: 1644252480000000,
390 NfNamingCode: "SIM-O-DU",
391 NfVendorName: "O-RAN-SC SIM Project",
392 StndDefinedNamespace: "o-ran-sc-du-hello-world-pm-streaming-oas3",
393 TimeZoneOffset: "+00:00",
395 VesEventListenerVersion: "7.2.1",
397 StndDefinedFields: messages.StndDefinedFields{
398 StndDefinedFieldsVersion: "1.0",
399 SchemaReference: "https://gerrit.o-ran-sc.org/r/gitweb?p=scp/oam/modeling.git;a=blob_plain;f=data-model/oas3/experimental/o-ran-sc-du-hello-world-oas3.json;hb=refs/heads/master",
401 DataId: "pm-1_1644252450",
402 StartTime: "2022-02-07T16:47:30.0Z",
403 AdministrativeState: "unlocked",
404 OperationalState: "enabled",
406 JobTag: "my-job-tag",
407 GranularityPeriod: 30,
// Every measurement queued so far is sent, not just the one appended above.
408 Measurements: messagesToSend,
414 fmt.Printf("Sending Dmaap message:\n %+v\n", message)
// generateMeasurementEntry converts one data record into a
// messages.Measurement. The measurement's instance reference is a path-style
// string built from the record's DU id, cell id, metric name, slice
// differentiator and slice service type; its Value is the record's current
// metricValue.
// NOTE(review): the ids and metric name are concatenated into the bracketed
// path without escaping — presumably safe for the bundled test data; confirm
// if values may ever contain a single quote.
418 func generateMeasurementEntry(entry *SliceAssuranceInformation) messages.Measurement {
420 measurementTypeInstanceReference := "/o-ran-sc-du-hello-world:network-function/distributed-unit-functions[id='" + entry.duId + "']/cell[id='" + entry.cellId + "']/supported-measurements[performance-measurement-type='(urn:o-ran-sc:yang:o-ran-sc-du-hello-world?revision=2021-11-23)" + entry.metricName + "']/supported-snssai-subcounter-instances[slice-differentiator='" + strconv.Itoa(entry.sd) + "'][slice-service-type='" + strconv.Itoa(entry.sst) + "']"
421 meas := messages.Measurement{
423 MeasurementTypeInstanceReference: measurementTypeInstanceReference,
424 Value: entry.metricValue,