Fixing Sonar Code Smell for PMConverter
[nonrtric/plt/ranpm.git] / pm-file-converter / components / miniocollector / miniocollector.go
1 // -
2 //
3 //      ========================LICENSE_START=================================
4 //      O-RAN-SC
5 //      %%
6 //      Copyright (C) 2023: Nordix Foundation
7 //      %%
8 //      Licensed under the Apache License, Version 2.0 (the "License");
9 //      you may not use this file except in compliance with the License.
10 //      You may obtain a copy of the License at
11 //
12 //           http://www.apache.org/licenses/LICENSE-2.0
13 //
14 //      Unless required by applicable law or agreed to in writing, software
15 //      distributed under the License is distributed on an "AS IS" BASIS,
16 //      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 //      See the License for the specific language governing permissions and
18 //      limitations under the License.
19 //      ========================LICENSE_END===================================
20 package miniocollector
21
22 import (
23         "bytes"
24         "compress/gzip"
25         "context"
26         "fmt"
27         "io"
28         "main/common/dataTypes"
29         "main/components/xmltransform"
30         "net/url"
31         "os"
32         "strings"
33         "time"
34
35         jsoniter "github.com/json-iterator/go"
36         "github.com/minio/minio-go/v7"
37         "github.com/minio/minio-go/v7/pkg/credentials"
38         log "github.com/sirupsen/logrus"
39 )
40
41 // nolint
42 func XmlToJsonConv(evtData *dataTypes.XmlFileEventHeader) string {
43         filestoreUser := os.Getenv("FILESTORE_USER")
44         filestorePwd := os.Getenv("FILESTORE_PWD")
45         filestoreServer := os.Getenv("FILESTORE_SERVER")
46
47         s3Client, err := minio.New(filestoreServer, &minio.Options{
48                 Creds:  credentials.NewStaticV4(filestoreUser, filestorePwd, ""),
49                 Secure: false,
50         })
51         if err != nil {
52                 log.Fatalln(err)
53         }
54         expiry := time.Second * 24 * 60 * 60 // 1 day.
55         objectName := evtData.Name
56         bucketName := evtData.ObjectStoreBucket
57         compresion := evtData.Compression
58         reqParams := make(url.Values)
59
60         xmlh, err := jsoniter.Marshal(evtData)
61         if err != nil {
62                 fmt.Printf("Error: %s", err)
63                 return ""
64         }
65
66         // Generate presigned GET url with lambda function
67         presignedURL, err := s3Client.PresignedGetObject(context.Background(), bucketName, objectName, expiry, reqParams)
68         if err != nil {
69                 log.Fatalln(err)
70         }
71         fileBytes := xmltransform.Convert(presignedURL.String(), compresion, string(xmlh))
72         newObjectName := objectName + "kafka-producer-pm-xml2json-0.json.gz"
73         var buf bytes.Buffer
74         err = gzipWrite(&buf, &fileBytes)
75         uploadObject(s3Client, buf.Bytes(), newObjectName, "pm-files-json")
76         fmt.Println("")
77
78         return newObjectName
79 }
80
81 // nolint
82 func uploadObject(mc *minio.Client, b []byte, objectName string, fsbucket string) {
83         contentType := "application/json"
84         if strings.HasSuffix(objectName, ".gz") {
85                 contentType = "application/gzip"
86         }
87
88         // Upload the xml file with PutObject
89         r := bytes.NewReader(b)
90         tctx := context.Background()
91         if checkMinioBucket(mc, fsbucket) == false {
92                 err := createMinioBucket(mc, fsbucket)
93                 if err != nil {
94                         log.Error("Cannot create bucket: ", fsbucket, ", ", err)
95                         return
96                 }
97         }
98         ok := false
99         for i := 1; i < 64 && ok == false; i = i * 2 {
100                 info, err := mc.PutObject(tctx, fsbucket, objectName, r, int64(len(b)), minio.PutObjectOptions{ContentType: contentType})
101                 if err != nil {
102
103                         if i == 1 {
104                                 log.Warn("Cannot upload (first attempt): ", objectName, ", ", err)
105                         } else {
106                                 log.Warn("Cannot upload (retry): ", objectName, ", ", err)
107                         }
108                         time.Sleep(time.Duration(i) * time.Second)
109                 } else {
110                         log.Debug("Successfully uploaded: ", objectName, " of size:", info.Size)
111                 }
112         }
113 }
114
115 // nolint
116 func createMinioBucket(mc *minio.Client, bucket string) error {
117         tctx := context.Background()
118         err := mc.MakeBucket(tctx, bucket, minio.MakeBucketOptions{})
119         if err != nil {
120                 // Check to see if we already own this bucket (which happens if you run this twice)
121                 exists, errBucketExists := mc.BucketExists(tctx, bucket)
122                 if errBucketExists == nil && exists {
123                         log.Debug("Already own bucket:", bucket)
124                         return nil
125                 } else {
126                         log.Error("Cannot create or check bucket ", bucket, " in minio client", err)
127                         return err
128                 }
129         }
130         log.Debug("Successfully created bucket: ", bucket)
131         return nil
132 }
133
134 // nolint
135 func checkMinioBucket(mc *minio.Client, bucket string) bool {
136         tctx := context.Background()
137         exists, err := mc.BucketExists(tctx, bucket)
138         if err == nil && exists {
139                 log.Debug("Already own bucket:", bucket)
140                 return true
141         }
142         log.Error("Bucket does not exist, bucket ", bucket, " in minio client", err)
143         return false
144 }
145
146 // Write gzipped data to a Writer
147 // nolint
148 func gzipWrite(w io.Writer, data *[]byte) error {
149         gw, err1 := gzip.NewWriterLevel(w, gzip.BestSpeed)
150
151         if err1 != nil {
152                 return err1
153         }
154         defer gw.Close()
155         _, err2 := gw.Write(*data)
156         return err2
157 }