Consumer O-DU slice assurance rApp
[nonrtric/rapp/ransliceassurance.git] / icsversion / stub / prodSdnc / prodSdncStub.go
1 // -
2 //   ========================LICENSE_START=================================
3 //   O-RAN-SC
4 //   %%
5 //   Copyright (C) 2022: Nordix Foundation
6 //   %%
7 //   Licensed under the Apache License, Version 2.0 (the "License");
8 //   you may not use this file except in compliance with the License.
9 //   You may obtain a copy of the License at
10 //
11 //        http://www.apache.org/licenses/LICENSE-2.0
12 //
13 //   Unless required by applicable law or agreed to in writing, software
14 //   distributed under the License is distributed on an "AS IS" BASIS,
15 //   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 //   See the License for the specific language governing permissions and
17 //   limitations under the License.
18 //   ========================LICENSE_END===================================
19 //
20
21 package main
22
23 import (
24         "bytes"
25         "encoding/csv"
26         "encoding/json"
27         "flag"
28         "fmt"
29         "math/rand"
30         "net/http"
31         "os"
32         "strconv"
33         "sync"
34         "time"
35
36         "github.com/gorilla/mux"
37         "oransc.org/usecase/oduclosedloop/icsversion/messages"
38
39         log "github.com/sirupsen/logrus"
40 )
41
// THRESHOLD_TPUT is the throughput limit used by this stub: stored metric
// values above it are replaced with a random lower value when a policy is
// updated, and generated "high" throughput samples start at this value.
const (
	THRESHOLD_TPUT int = 7000
)
43
// SliceAssuranceInformation holds one record of the stub's test data: the
// identifiers of a slice measurement, its current metric value and the RRM
// policy ratio values associated with it.
type SliceAssuranceInformation struct {
	// Identifiers of the distributed unit and cell, used when building the
	// measurement instance reference.
	duId, cellId string
	// Slice differentiator and slice service type.
	sd, sst int
	// Name of the reported performance measurement.
	metricName string
	// Current value reported for the measurement.
	metricValue int
	// Id of the RRM policy ratio this record belongs to.
	policyRatioId string
	// Max, min and dedicated ratio values of that policy.
	policyMaxRatio, policyMinRatio, policyDedicatedRatio int
}
56
57 var data []*SliceAssuranceInformation
58 var messagesToSend []messages.Measurement
59 var started bool
60
61 func loadData() {
62         lines, err := GetCsvFromFile("test-data.csv")
63         if err != nil {
64                 panic(err)
65         }
66         for _, line := range lines {
67                 sai := SliceAssuranceInformation{
68                         duId:                 line[0],
69                         cellId:               line[1],
70                         sd:                   toInt(line[2]),
71                         sst:                  toInt(line[3]),
72                         metricName:           line[4],
73                         metricValue:          toInt(line[5]),
74                         policyRatioId:        line[6],
75                         policyMaxRatio:       toInt(line[7]),
76                         policyMinRatio:       toInt(line[8]),
77                         policyDedicatedRatio: toInt(line[9]),
78                 }
79                 data = append(data, &sai)
80         }
81 }
82
// GetCsvFromFile opens the named file and parses its whole content as CSV.
// Records may have differing numbers of fields. It returns the parsed
// records, or nil and the error if the file cannot be opened or parsed.
func GetCsvFromFile(name string) ([][]string, error) {
	csvFile, err := os.Open(name)
	if err != nil {
		return nil, err
	}
	defer csvFile.Close()

	reader := csv.NewReader(csvFile)
	// A negative value disables the per-record field count check.
	reader.FieldsPerRecord = -1

	records, err := reader.ReadAll()
	if err != nil {
		return nil, err
	}
	return records, nil
}
97
// toInt parses num as a base-10 integer and returns -1 when it cannot be
// parsed.
func toInt(num string) int {
	if res, err := strconv.Atoi(num); err == nil {
		return res
	}
	return -1
}
105
106 func main() {
107         rand.Seed(time.Now().UnixNano())
108
109         portSdnr := flag.Int("sdnr-port", 3904, "The port this SDNR stub will listen on")
110         producerPort := flag.Int("prod-port", 3905, "The port this Producer mediator will listen on")
111         flag.Parse()
112
113         loadData()
114
115         wg := new(sync.WaitGroup)
116         wg.Add(2)
117
118         go func() {
119
120                 r := mux.NewRouter()
121                 r.HandleFunc("/rests/data/network-topology:network-topology/topology=topology-netconf/node={NODE-ID}/yang-ext:mount/o-ran-sc-du-hello-world:network-function/distributed-unit-functions={O-DU-ID}", getSdnrResponseMessage).Methods(http.MethodGet)
122                 r.HandleFunc("/rests/data/network-topology:network-topology/topology=topology-netconf/node={NODE-ID}/yang-ext:mount/o-ran-sc-du-hello-world:network-function/distributed-unit-functions={O-DU-ID}/radio-resource-management-policy-ratio={POLICY-ID}", updateRRMPolicyDedicatedRatio).Methods(http.MethodPut)
123
124                 fmt.Println("Starting SDNR stub on port: ", *portSdnr)
125
126                 log.Fatal(http.ListenAndServe(fmt.Sprintf(":%v", *portSdnr), r))
127                 wg.Done()
128         }()
129
130         go func() {
131
132                 r := mux.NewRouter()
133                 r.HandleFunc("/create/{jobId}", createJobHandler).Methods(http.MethodPut)
134                 r.HandleFunc("/delete/{jobId}", deleteJobHandler).Methods(http.MethodDelete)
135
136                 fmt.Println("Producer listening on port: ", *producerPort)
137
138                 log.Fatal(http.ListenAndServe(fmt.Sprintf(":%v", *producerPort), r))
139                 wg.Done()
140         }()
141
142         wg.Wait()
143 }
144
145 func createJobHandler(w http.ResponseWriter, r *http.Request) {
146         vars := mux.Vars(r)
147         id, ok := vars["jobId"]
148         if !ok {
149                 http.Error(w, "No job ID provided", http.StatusBadRequest)
150                 return
151         }
152
153         started = true
154         fmt.Println("Start pushing messages for job: ", id)
155         go sendDmaapMessages()
156 }
157
158 func deleteJobHandler(w http.ResponseWriter, r *http.Request) {
159         vars := mux.Vars(r)
160         id, ok := vars["jobId"]
161         if !ok {
162                 http.Error(w, "No job ID provided", http.StatusBadRequest)
163                 return
164         }
165
166         fmt.Println("Stop pushing messages for job: ", id)
167         started = false
168 }
169
170 func getSdnrResponseMessage(w http.ResponseWriter, r *http.Request) {
171         vars := mux.Vars(r)
172         log.Info("Get messages for RRM Policy Ratio information for O-Du ID ", vars["O-DU-ID"])
173
174         distUnitFunctions := getDistributedUnitFunctionMessage(vars["O-DU-ID"])
175
176         respondWithJSON(w, http.StatusOK, distUnitFunctions)
177 }
178
179 func getDistributedUnitFunctionMessage(oduId string) messages.ORanDuRestConf {
180
181         var policies []messages.RRMPolicyRatio
182         for _, entry := range data {
183                 message := messages.RRMPolicyRatio{
184                         Id:                      entry.policyRatioId,
185                         AdmState:                "locked",
186                         UserLabel:               entry.policyRatioId,
187                         RRMPolicyMaxRatio:       entry.policyMaxRatio,
188                         RRMPolicyMinRatio:       entry.policyMinRatio,
189                         RRMPolicyDedicatedRatio: entry.policyDedicatedRatio,
190                         ResourceType:            "prb",
191                         RRMPolicyMembers: []messages.RRMPolicyMember{
192                                 {
193                                         MobileCountryCode:   "310",
194                                         MobileNetworkCode:   "150",
195                                         SliceDifferentiator: entry.sd,
196                                         SliceServiceType:    entry.sst,
197                                 },
198                         },
199                 }
200                 policies = append(policies, message)
201         }
202
203         var publicLandMobileNetworks []messages.PublicLandMobileNetworks
204         for _, entry := range data {
205                 publicLandMobileNetwork := messages.PublicLandMobileNetworks{
206                         MobileCountryCode:   "310",
207                         MobileNetworkCode:   "150",
208                         SliceDifferentiator: entry.sd,
209                         SliceServiceType:    entry.sst,
210                 }
211                 publicLandMobileNetworks = append(publicLandMobileNetworks, publicLandMobileNetwork)
212         }
213
214         var supportedSnssaiSubcounterInstances []messages.SupportedSnssaiSubcounterInstances
215         for _, entry := range data {
216                 supportedSnssaiSubcounterInstance := messages.SupportedSnssaiSubcounterInstances{
217                         SliceDifferentiator: entry.sd,
218                         SliceServiceType:    entry.sst,
219                 }
220                 supportedSnssaiSubcounterInstances = append(supportedSnssaiSubcounterInstances, supportedSnssaiSubcounterInstance)
221         }
222
223         cell := messages.Cell{
224                 Id:             "cell-1",
225                 LocalId:        1,
226                 PhysicalCellId: 1,
227                 BaseStationChannelBandwidth: messages.BaseStationChannelBandwidth{
228                         Uplink:              83000,
229                         Downlink:            80000,
230                         SupplementaryUplink: 84000,
231                 },
232                 OperationalState:         "enabled",
233                 TrackingAreaCode:         10,
234                 AdmState:                 "unlocked",
235                 PublicLandMobileNetworks: publicLandMobileNetworks,
236                 SupportedMeasurements: []messages.SupportedMeasurements{
237                         {
238                                 PerformanceMeasurementType:         "o-ran-sc-du-hello-world:user-equipment-average-throughput-uplink",
239                                 SupportedSnssaiSubcounterInstances: supportedSnssaiSubcounterInstances,
240                         },
241                         {
242                                 PerformanceMeasurementType:         "o-ran-sc-du-hello-world:user-equipment-average-throughput-downlink",
243                                 SupportedSnssaiSubcounterInstances: supportedSnssaiSubcounterInstances,
244                         },
245                 },
246                 TrafficState: "active",
247                 AbsoluteRadioFrequencyChannelNumber: messages.AbsoluteRadioFrequencyChannelNumber{
248                         Uplink:              14000,
249                         Downlink:            15000,
250                         SupplementaryUplink: 14500,
251                 },
252                 UserLabel: "cell-1",
253                 SynchronizationSignalBlock: messages.SynchronizationSignalBlock{
254                         Duration:               2,
255                         FrequencyChannelNumber: 12,
256                         Periodicity:            10,
257                         SubcarrierSpacing:      30,
258                         Offset:                 3,
259                 },
260         }
261
262         distUnitFunction := messages.DistributedUnitFunction{
263                 Id:               oduId,
264                 OperationalState: "enabled",
265                 AdmState:         "unlocked",
266                 UserLabel:        oduId,
267                 Cell: []messages.Cell{
268                         cell,
269                 },
270                 RRMPolicyRatio: policies,
271         }
272
273         duRRMPolicyRatio := messages.ORanDuRestConf{
274                 DistributedUnitFunction: []messages.DistributedUnitFunction{
275                         distUnitFunction,
276                 },
277         }
278
279         return duRRMPolicyRatio
280 }
281
282 func updateRRMPolicyDedicatedRatio(w http.ResponseWriter, r *http.Request) {
283         var policies struct {
284                 RRMPolicies []messages.RRMPolicyRatio `json:"radio-resource-management-policy-ratio"`
285         }
286         decoder := json.NewDecoder(r.Body)
287
288         if err := decoder.Decode(&policies); err != nil {
289                 respondWithError(w, http.StatusBadRequest, "Invalid request payload")
290                 return
291         }
292         defer r.Body.Close()
293
294         prMessages := policies.RRMPolicies
295         log.Infof("Post request to update RRMPolicyDedicatedRatio %+v", prMessages)
296         findAndUpdatePolicy(prMessages)
297         respondWithJSON(w, http.StatusOK, map[string]string{"status": "200"})
298 }
299
300 func findAndUpdatePolicy(rRMPolicyRatio []messages.RRMPolicyRatio) {
301         for _, policy := range rRMPolicyRatio {
302                 for _, entry := range data {
303                         if entry.policyRatioId == policy.Id {
304                                 log.Infof("update Policy Dedicated Ratio: value for policy %+v\n Old value: %v New value: %v ", policy, entry.policyDedicatedRatio, policy.RRMPolicyDedicatedRatio)
305                                 entry.policyDedicatedRatio = policy.RRMPolicyDedicatedRatio
306                                 if entry.metricValue > THRESHOLD_TPUT {
307                                         entry.metricValue = rand.Intn(THRESHOLD_TPUT)
308                                 }
309                                 messagesToSend = append(messagesToSend, generateMeasurementEntry(entry))
310                         }
311                 }
312         }
313 }
314
315 func respondWithError(w http.ResponseWriter, code int, message string) {
316         respondWithJSON(w, code, map[string]string{"error": message})
317 }
318
// respondWithJSON encodes payload as JSON and writes it to w with the given
// status code and a JSON content type.
//
// If payload cannot be encoded, the response is a 500 with a fixed JSON error
// body instead of the requested status with an empty body.
func respondWithJSON(w http.ResponseWriter, code int, payload interface{}) {
	w.Header().Set("Content-Type", "application/json")

	response, err := json.Marshal(payload)
	if err != nil {
		w.WriteHeader(http.StatusInternalServerError)
		// Write errors are ignored on purpose: the client is gone and
		// nothing more can be reported to it.
		_, _ = w.Write([]byte(`{"error":"failed to encode response"}`))
		return
	}

	w.WriteHeader(code)
	// Write errors are ignored on purpose, see above.
	_, _ = w.Write(response)
}
326
327 func sendDmaapMessages() {
328
329         client := &http.Client{
330                 Timeout: 10 * time.Second,
331         }
332
333         log.Info("Send Dmaap messages")
334         for range time.Tick(10 * time.Second) {
335                 if !started {
336                         break
337                 }
338                 m, _ := json.Marshal(generateStdMessage())
339                 msgToSend, _ := json.Marshal([]string{string(m)})
340
341                 time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
342
343                 odu_addr := getEnv("ODU_ADDR", "http://consumer-sa:8095")
344                 req, _ := http.NewRequest(http.MethodPost, odu_addr, bytes.NewBuffer(msgToSend))
345                 req.Header.Set("Content-Type", "application/json; charset=utf-8")
346
347                 _, err := client.Do(req)
348                 if err != nil {
349                         fmt.Println("Error sending to consumer: ", err)
350                 }
351                 fmt.Println("Sent message to consumer!")
352         }
353 }
354
// getEnv returns the value of the environment variable key, or defaultVal
// when the variable is not set. A variable that is set to the empty string
// counts as set.
func getEnv(key string, defaultVal string) string {
	value, exists := os.LookupEnv(key)
	if !exists {
		return defaultVal
	}
	return value
}
362
363 func generateStdMessage() messages.StdDefinedMessage {
364         entry := data[rand.Intn(5)]
365
366         maxTput := THRESHOLD_TPUT + 100
367         randomTput := rand.Intn(maxTput-THRESHOLD_TPUT+1) + THRESHOLD_TPUT
368         if randomTput%3 == 0 {
369                 log.Info("Using tput value higher than THRESHOLD_TPUT ", randomTput)
370                 entry.metricValue = randomTput
371         }
372
373         messagesToSend = append(messagesToSend, generateMeasurementEntry(entry))
374
375         message := messages.StdDefinedMessage{
376                 Event: messages.Event{
377                         CommonEventHeader: messages.CommonEventHeader{
378                                 Domain:                  "stndDefined",
379                                 EventId:                 "pm-1_1644252450",
380                                 EventName:               "stndDefined_performanceMeasurementStreaming",
381                                 EventType:               "performanceMeasurementStreaming",
382                                 Sequence:                825,
383                                 Priority:                "Low",
384                                 ReportingEntityId:       "",
385                                 ReportingEntityName:     "O-DU-1122",
386                                 SourceId:                "",
387                                 SourceName:              "O-DU-1122",
388                                 StartEpochMicrosec:      1644252450000000,
389                                 LastEpochMicrosec:       1644252480000000,
390                                 NfNamingCode:            "SIM-O-DU",
391                                 NfVendorName:            "O-RAN-SC SIM Project",
392                                 StndDefinedNamespace:    "o-ran-sc-du-hello-world-pm-streaming-oas3",
393                                 TimeZoneOffset:          "+00:00",
394                                 Version:                 "4.1",
395                                 VesEventListenerVersion: "7.2.1",
396                         },
397                         StndDefinedFields: messages.StndDefinedFields{
398                                 StndDefinedFieldsVersion: "1.0",
399                                 SchemaReference:          "https://gerrit.o-ran-sc.org/r/gitweb?p=scp/oam/modeling.git;a=blob_plain;f=data-model/oas3/experimental/o-ran-sc-du-hello-world-oas3.json;hb=refs/heads/master",
400                                 Data: messages.Data{
401                                         DataId:              "pm-1_1644252450",
402                                         StartTime:           "2022-02-07T16:47:30.0Z",
403                                         AdministrativeState: "unlocked",
404                                         OperationalState:    "enabled",
405                                         UserLabel:           "pm",
406                                         JobTag:              "my-job-tag",
407                                         GranularityPeriod:   30,
408                                         Measurements:        messagesToSend,
409                                 },
410                         },
411                 },
412         }
413         messagesToSend = nil
414         fmt.Printf("Sending Dmaap message:\n %+v\n", message)
415         return message
416 }
417
418 func generateMeasurementEntry(entry *SliceAssuranceInformation) messages.Measurement {
419
420         measurementTypeInstanceReference := "/o-ran-sc-du-hello-world:network-function/distributed-unit-functions[id='" + entry.duId + "']/cell[id='" + entry.cellId + "']/supported-measurements[performance-measurement-type='(urn:o-ran-sc:yang:o-ran-sc-du-hello-world?revision=2021-11-23)" + entry.metricName + "']/supported-snssai-subcounter-instances[slice-differentiator='" + strconv.Itoa(entry.sd) + "'][slice-service-type='" + strconv.Itoa(entry.sst) + "']"
421         meas := messages.Measurement{
422
423                 MeasurementTypeInstanceReference: measurementTypeInstanceReference,
424                 Value:                            entry.metricValue,
425                 Unit:                             "kbit/s",
426         }
427         return meas
428 }