Consumer O-DU slice assurance rApp
[nonrtric/rapp/ransliceassurance.git] / icsversion / stub / DmaapSdnc / dmaapSdncStub.go
1 // -
2 //   ========================LICENSE_START=================================
3 //   O-RAN-SC
4 //   %%
5 //   Copyright (C) 2022: Nordix Foundation
6 //   %%
7 //   Licensed under the Apache License, Version 2.0 (the "License");
8 //   you may not use this file except in compliance with the License.
9 //   You may obtain a copy of the License at
10 //
11 //        http://www.apache.org/licenses/LICENSE-2.0
12 //
13 //   Unless required by applicable law or agreed to in writing, software
14 //   distributed under the License is distributed on an "AS IS" BASIS,
15 //   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 //   See the License for the specific language governing permissions and
17 //   limitations under the License.
18 //   ========================LICENSE_END===================================
19 //
20
21 package main
22
23 import (
24         "bytes"
25         "encoding/csv"
26         "encoding/json"
27         "flag"
28         "fmt"
29         "math/rand"
30         "net/http"
31         "os"
32         "strconv"
33         "sync"
34         "time"
35
36         "github.com/gorilla/mux"
37         "oransc.org/usecase/oduclosedloop/icsversion/messages"
38
39         log "github.com/sirupsen/logrus"
40 )
41
const (
	// THRESHOLD_TPUT is the throughput limit the stub works around:
	// generateStdMessage produces random values starting at this limit, and
	// findAndUpdatePolicy replaces stored metric values above it with a
	// random value below it. Presumably expressed in kbit/s, the unit that
	// generateMeasurementEntry attaches to metric values.
	THRESHOLD_TPUT int = 7000
)
43
// SliceAssuranceInformation is one record of the stub's test data. loadData
// fills it from a CSV row, in field order (columns 0 through 9).
type SliceAssuranceInformation struct {
	// Identifiers of the distributed unit and the cell the record refers to.
	duId, cellId string
	// Slice differentiator and slice service type of the network slice.
	sd, sst int
	// Name of the reported performance measurement type.
	metricName string
	// Current value reported for metricName.
	metricValue int
	// Id of the RRM policy ratio associated with this record.
	policyRatioId string
	// Max, min and dedicated ratio values of that RRM policy.
	policyMaxRatio, policyMinRatio, policyDedicatedRatio int
}
56
57 var data []*SliceAssuranceInformation
58 var messagesToSend []messages.Measurement
59 var started bool
60
61 func loadData() {
62         lines, err := GetCsvFromFile("test-data.csv")
63         if err != nil {
64                 panic(err)
65         }
66         for _, line := range lines {
67                 sai := SliceAssuranceInformation{
68                         duId:                 line[0],
69                         cellId:               line[1],
70                         sd:                   toInt(line[2]),
71                         sst:                  toInt(line[3]),
72                         metricName:           line[4],
73                         metricValue:          toInt(line[5]),
74                         policyRatioId:        line[6],
75                         policyMaxRatio:       toInt(line[7]),
76                         policyMinRatio:       toInt(line[8]),
77                         policyDedicatedRatio: toInt(line[9]),
78                 }
79                 data = append(data, &sai)
80         }
81 }
82
// GetCsvFromFile opens the file called name and parses its whole content as
// CSV. It returns every record, or a nil slice and the error when the file
// cannot be opened or parsed.
func GetCsvFromFile(name string) ([][]string, error) {
	csvFile, err := os.Open(name)
	if err != nil {
		return nil, err
	}
	defer csvFile.Close()

	reader := csv.NewReader(csvFile)
	// A negative value disables the field-count check, so records may have
	// differing numbers of fields.
	reader.FieldsPerRecord = -1

	csvData, err := reader.ReadAll()
	if err != nil {
		return nil, err
	}
	return csvData, nil
}
97
// toInt converts a decimal string to an int. Input that strconv.Atoi rejects
// (empty, non-numeric, out of range) yields -1 instead of an error.
func toInt(num string) int {
	if parsed, err := strconv.Atoi(num); err == nil {
		return parsed
	}
	return -1
}
105
106 func main() {
107         rand.Seed(time.Now().UnixNano())
108
109         portSdnr := flag.Int("sdnr-port", 3904, "The port this SDNR stub will listen on")
110         dmaapProducerPort := flag.Int("dmaap-port", 3905, "The port this Dmaap mediator will listen on")
111         flag.Parse()
112
113         loadData()
114
115         wg := new(sync.WaitGroup)
116         wg.Add(2)
117
118         go func() {
119
120                 r := mux.NewRouter()
121                 r.HandleFunc("/rests/data/network-topology:network-topology/topology=topology-netconf/node={NODE-ID}/yang-ext:mount/o-ran-sc-du-hello-world:network-function/distributed-unit-functions={O-DU-ID}", getSdnrResponseMessage).Methods(http.MethodGet)
122                 r.HandleFunc("/rests/data/network-topology:network-topology/topology=topology-netconf/node={NODE-ID}/yang-ext:mount/o-ran-sc-du-hello-world:network-function/distributed-unit-functions={O-DU-ID}/radio-resource-management-policy-ratio={POLICY-ID}", updateRRMPolicyDedicatedRatio).Methods(http.MethodPut)
123
124                 fmt.Println("Starting SDNR stub on port: ", *portSdnr)
125
126                 log.Fatal(http.ListenAndServe(fmt.Sprintf(":%v", *portSdnr), r))
127                 wg.Done()
128         }()
129
130         go func() {
131
132                 r := mux.NewRouter()
133                 r.HandleFunc("/create/{jobId}", createJobHandler).Methods(http.MethodPut)
134                 r.HandleFunc("/delete/{jobId}", deleteJobHandler).Methods(http.MethodDelete)
135
136                 fmt.Println("Producer listening on port: ", *dmaapProducerPort)
137
138                 log.Fatal(http.ListenAndServe(fmt.Sprintf(":%v", *dmaapProducerPort), r))
139                 wg.Done()
140         }()
141
142         wg.Wait()
143 }
144
145 func createJobHandler(w http.ResponseWriter, r *http.Request) {
146         fmt.Println("createJobHandler::  ", r)
147         vars := mux.Vars(r)
148         id, ok := vars["jobId"]
149         if !ok {
150                 http.Error(w, "No job ID provided", http.StatusBadRequest)
151                 return
152         }
153
154         started = true
155         fmt.Println("Start pushing messages for job: ", id)
156         go sendDmaapMessages()
157 }
158
159 func deleteJobHandler(w http.ResponseWriter, r *http.Request) {
160         vars := mux.Vars(r)
161         id, ok := vars["jobId"]
162         if !ok {
163                 http.Error(w, "No job ID provided", http.StatusBadRequest)
164                 return
165         }
166
167         fmt.Println("Stop pushing messages for job: ", id)
168         started = false
169 }
170
171 func getSdnrResponseMessage(w http.ResponseWriter, r *http.Request) {
172         vars := mux.Vars(r)
173         log.Info("Get messages for RRM Policy Ratio information for O-Du ID ", vars["O-DU-ID"])
174
175         distUnitFunctions := getDistributedUnitFunctionMessage(vars["O-DU-ID"])
176
177         respondWithJSON(w, http.StatusOK, distUnitFunctions)
178 }
179
180 func getDistributedUnitFunctionMessage(oduId string) messages.ORanDuRestConf {
181
182         var policies []messages.RRMPolicyRatio
183         for _, entry := range data {
184                 message := messages.RRMPolicyRatio{
185                         Id:                      entry.policyRatioId,
186                         AdmState:                "locked",
187                         UserLabel:               entry.policyRatioId,
188                         RRMPolicyMaxRatio:       entry.policyMaxRatio,
189                         RRMPolicyMinRatio:       entry.policyMinRatio,
190                         RRMPolicyDedicatedRatio: entry.policyDedicatedRatio,
191                         ResourceType:            "prb",
192                         RRMPolicyMembers: []messages.RRMPolicyMember{
193                                 {
194                                         MobileCountryCode:   "310",
195                                         MobileNetworkCode:   "150",
196                                         SliceDifferentiator: entry.sd,
197                                         SliceServiceType:    entry.sst,
198                                 },
199                         },
200                 }
201                 policies = append(policies, message)
202         }
203
204         var publicLandMobileNetworks []messages.PublicLandMobileNetworks
205         for _, entry := range data {
206                 publicLandMobileNetwork := messages.PublicLandMobileNetworks{
207                         MobileCountryCode:   "310",
208                         MobileNetworkCode:   "150",
209                         SliceDifferentiator: entry.sd,
210                         SliceServiceType:    entry.sst,
211                 }
212                 publicLandMobileNetworks = append(publicLandMobileNetworks, publicLandMobileNetwork)
213         }
214
215         var supportedSnssaiSubcounterInstances []messages.SupportedSnssaiSubcounterInstances
216         for _, entry := range data {
217                 supportedSnssaiSubcounterInstance := messages.SupportedSnssaiSubcounterInstances{
218                         SliceDifferentiator: entry.sd,
219                         SliceServiceType:    entry.sst,
220                 }
221                 supportedSnssaiSubcounterInstances = append(supportedSnssaiSubcounterInstances, supportedSnssaiSubcounterInstance)
222         }
223
224         cell := messages.Cell{
225                 Id:             "cell-1",
226                 LocalId:        1,
227                 PhysicalCellId: 1,
228                 BaseStationChannelBandwidth: messages.BaseStationChannelBandwidth{
229                         Uplink:              83000,
230                         Downlink:            80000,
231                         SupplementaryUplink: 84000,
232                 },
233                 OperationalState:         "enabled",
234                 TrackingAreaCode:         10,
235                 AdmState:                 "unlocked",
236                 PublicLandMobileNetworks: publicLandMobileNetworks,
237                 SupportedMeasurements: []messages.SupportedMeasurements{
238                         {
239                                 PerformanceMeasurementType:         "o-ran-sc-du-hello-world:user-equipment-average-throughput-uplink",
240                                 SupportedSnssaiSubcounterInstances: supportedSnssaiSubcounterInstances,
241                         },
242                         {
243                                 PerformanceMeasurementType:         "o-ran-sc-du-hello-world:user-equipment-average-throughput-downlink",
244                                 SupportedSnssaiSubcounterInstances: supportedSnssaiSubcounterInstances,
245                         },
246                 },
247                 TrafficState: "active",
248                 AbsoluteRadioFrequencyChannelNumber: messages.AbsoluteRadioFrequencyChannelNumber{
249                         Uplink:              14000,
250                         Downlink:            15000,
251                         SupplementaryUplink: 14500,
252                 },
253                 UserLabel: "cell-1",
254                 SynchronizationSignalBlock: messages.SynchronizationSignalBlock{
255                         Duration:               2,
256                         FrequencyChannelNumber: 12,
257                         Periodicity:            10,
258                         SubcarrierSpacing:      30,
259                         Offset:                 3,
260                 },
261         }
262
263         distUnitFunction := messages.DistributedUnitFunction{
264                 Id:               oduId,
265                 OperationalState: "enabled",
266                 AdmState:         "unlocked",
267                 UserLabel:        oduId,
268                 Cell: []messages.Cell{
269                         cell,
270                 },
271                 RRMPolicyRatio: policies,
272         }
273
274         duRRMPolicyRatio := messages.ORanDuRestConf{
275                 DistributedUnitFunction: []messages.DistributedUnitFunction{
276                         distUnitFunction,
277                 },
278         }
279
280         return duRRMPolicyRatio
281 }
282
283 func updateRRMPolicyDedicatedRatio(w http.ResponseWriter, r *http.Request) {
284         var policies struct {
285                 RRMPolicies []messages.RRMPolicyRatio `json:"radio-resource-management-policy-ratio"`
286         }
287         decoder := json.NewDecoder(r.Body)
288
289         if err := decoder.Decode(&policies); err != nil {
290                 respondWithError(w, http.StatusBadRequest, "Invalid request payload")
291                 return
292         }
293         defer r.Body.Close()
294
295         prMessages := policies.RRMPolicies
296         log.Infof("Post request to update RRMPolicyDedicatedRatio %+v", prMessages)
297         findAndUpdatePolicy(prMessages)
298         respondWithJSON(w, http.StatusOK, map[string]string{"status": "200"})
299 }
300
301 func findAndUpdatePolicy(rRMPolicyRatio []messages.RRMPolicyRatio) {
302         for _, policy := range rRMPolicyRatio {
303                 for _, entry := range data {
304                         if entry.policyRatioId == policy.Id {
305                                 log.Infof("update Policy Dedicated Ratio: value for policy %+v\n Old value: %v New value: %v ", policy, entry.policyDedicatedRatio, policy.RRMPolicyDedicatedRatio)
306                                 entry.policyDedicatedRatio = policy.RRMPolicyDedicatedRatio
307                                 if entry.metricValue > THRESHOLD_TPUT {
308                                         entry.metricValue = rand.Intn(THRESHOLD_TPUT)
309                                 }
310                                 messagesToSend = append(messagesToSend, generateMeasurementEntry(entry))
311                         }
312                 }
313         }
314 }
315
316 func respondWithError(w http.ResponseWriter, code int, message string) {
317         respondWithJSON(w, code, map[string]string{"error": message})
318 }
319
// respondWithJSON encodes payload as JSON and writes it to w with the given
// status code and a Content-Type of application/json.
//
// When payload cannot be encoded, the response is a 500 with a JSON error
// document instead of the requested status code with an empty body.
func respondWithJSON(w http.ResponseWriter, code int, payload interface{}) {
	response, err := json.Marshal(payload)
	if err != nil {
		code = http.StatusInternalServerError
		// A map of strings always encodes, so this error can be ignored.
		response, _ = json.Marshal(map[string]string{"error": err.Error()})
	}

	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(code)
	// A failed write means the client is gone; there is nothing left to report to.
	_, _ = w.Write(response)
}
327
328 func sendDmaapMessages() {
329
330         client := &http.Client{
331                 Timeout: 10 * time.Second,
332         }
333
334         log.Info("Send Dmaap messages")
335         for range time.Tick(10 * time.Second) {
336                 if !started {
337                         break
338                 }
339                 m, _ := json.Marshal(generateStdMessage())
340                 msgToSend, _ := json.Marshal([]string{string(m)})
341
342                 time.Sleep(time.Duration(rand.Intn(3)) * time.Second)
343
344                 oru_addr := getEnv("ORU_ADDR", "http://localhost:8095")
345                 req, _ := http.NewRequest(http.MethodPost, oru_addr, bytes.NewBuffer(msgToSend))
346                 req.Header.Set("Content-Type", "application/json; charset=utf-8")
347
348                 _, err := client.Do(req)
349                 if err != nil {
350                         fmt.Println("Error sending to consumer: ", err)
351                 }
352                 fmt.Println("Sent message to consumer!")
353         }
354 }
355
// getEnv returns the value of the environment variable key, or defaultVal when
// the variable is not set. A variable that is set to the empty string counts
// as set and yields "".
func getEnv(key string, defaultVal string) string {
	value, exists := os.LookupEnv(key)
	if !exists {
		return defaultVal
	}
	return value
}
363
364 func generateStdMessage() messages.StdDefinedMessage {
365         entry := data[rand.Intn(5)]
366
367         maxTput := THRESHOLD_TPUT + 100
368         randomTput := rand.Intn(maxTput-THRESHOLD_TPUT+1) + THRESHOLD_TPUT
369         if randomTput%3 == 0 {
370                 log.Info("Using tput value higher than THRESHOLD_TPUT ", randomTput)
371                 entry.metricValue = randomTput
372         }
373
374         messagesToSend = append(messagesToSend, generateMeasurementEntry(entry))
375
376         message := messages.StdDefinedMessage{
377                 Event: messages.Event{
378                         CommonEventHeader: messages.CommonEventHeader{
379                                 Domain:                  "stndDefined",
380                                 EventId:                 "pm-1_1644252450",
381                                 EventName:               "stndDefined_performanceMeasurementStreaming",
382                                 EventType:               "performanceMeasurementStreaming",
383                                 Sequence:                825,
384                                 Priority:                "Low",
385                                 ReportingEntityId:       "",
386                                 ReportingEntityName:     "O-DU-1122",
387                                 SourceId:                "",
388                                 SourceName:              "O-DU-1122",
389                                 StartEpochMicrosec:      1644252450000000,
390                                 LastEpochMicrosec:       1644252480000000,
391                                 NfNamingCode:            "SIM-O-DU",
392                                 NfVendorName:            "O-RAN-SC SIM Project",
393                                 StndDefinedNamespace:    "o-ran-sc-du-hello-world-pm-streaming-oas3",
394                                 TimeZoneOffset:          "+00:00",
395                                 Version:                 "4.1",
396                                 VesEventListenerVersion: "7.2.1",
397                         },
398                         StndDefinedFields: messages.StndDefinedFields{
399                                 StndDefinedFieldsVersion: "1.0",
400                                 SchemaReference:          "https://gerrit.o-ran-sc.org/r/gitweb?p=scp/oam/modeling.git;a=blob_plain;f=data-model/oas3/experimental/o-ran-sc-du-hello-world-oas3.json;hb=refs/heads/master",
401                                 Data: messages.Data{
402                                         DataId:              "pm-1_1644252450",
403                                         StartTime:           "2022-02-07T16:47:30.0Z",
404                                         AdministrativeState: "unlocked",
405                                         OperationalState:    "enabled",
406                                         UserLabel:           "pm",
407                                         JobTag:              "my-job-tag",
408                                         GranularityPeriod:   30,
409                                         Measurements:        messagesToSend,
410                                 },
411                         },
412                 },
413         }
414         messagesToSend = nil
415         fmt.Printf("Sending Dmaap message:\n %+v\n", message)
416         return message
417 }
418
419 func generateMeasurementEntry(entry *SliceAssuranceInformation) messages.Measurement {
420
421         measurementTypeInstanceReference := "/o-ran-sc-du-hello-world:network-function/distributed-unit-functions[id='" + entry.duId + "']/cell[id='" + entry.cellId + "']/supported-measurements[performance-measurement-type='(urn:o-ran-sc:yang:o-ran-sc-du-hello-world?revision=2021-11-23)" + entry.metricName + "']/supported-snssai-subcounter-instances[slice-differentiator='" + strconv.Itoa(entry.sd) + "'][slice-service-type='" + strconv.Itoa(entry.sst) + "']"
422         meas := messages.Measurement{
423
424                 MeasurementTypeInstanceReference: measurementTypeInstanceReference,
425                 Value:                            entry.metricValue,
426                 Unit:                             "kbit/s",
427         }
428         return meas
429 }