Remove unused code
[nonrtric/plt/ranpm.git] / pm-file-converter / components / miniocollector / miniocollector.go
1 // -
2 //
3 //      ========================LICENSE_START=================================
4 //      O-RAN-SC
5 //      %%
6 //      Copyright (C) 2023: Nordix Foundation
7 //      %%
8 //      Licensed under the Apache License, Version 2.0 (the "License");
9 //      you may not use this file except in compliance with the License.
10 //      You may obtain a copy of the License at
11 //
12 //           http://www.apache.org/licenses/LICENSE-2.0
13 //
14 //      Unless required by applicable law or agreed to in writing, software
15 //      distributed under the License is distributed on an "AS IS" BASIS,
16 //      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 //      See the License for the specific language governing permissions and
18 //      limitations under the License.
19 //      ========================LICENSE_END===================================
20 package miniocollector
21
22 import (
23         "bytes"
24         "compress/gzip"
25         "context"
26         "fmt"
27         jsoniter "github.com/json-iterator/go"
28         "github.com/minio/minio-go/v7"
29         "github.com/minio/minio-go/v7/pkg/credentials"
30         log "github.com/sirupsen/logrus"
31         "io"
32         "main/common/dataTypes"
33         "main/components/xmltransform"
34         "net/url"
35         "os"
36         "strings"
37         "time"
38 )
39
40 func Xml_to_json_conv(evt_data *dataTypes.XmlFileEventHeader) string {
41         filestoreUser := os.Getenv("FILESTORE_USER")
42         filestorePwd := os.Getenv("FILESTORE_PWD")
43         filestoreServer := os.Getenv("FILESTORE_SERVER")
44
45         s3Client, err := minio.New(filestoreServer, &minio.Options{
46                 Creds:  credentials.NewStaticV4(filestoreUser, filestorePwd, ""),
47                 Secure: false,
48         })
49         if err != nil {
50                 log.Fatalln(err)
51         }
52         expiry := time.Second * 24 * 60 * 60 // 1 day.
53         objectName := evt_data.Name
54         bucketName := evt_data.ObjectStoreBucket
55         compresion := evt_data.Compression
56         reqParams := make(url.Values)
57
58         xmlh, err := jsoniter.Marshal(evt_data)
59         if err != nil {
60                 fmt.Printf("Error: %s", err)
61                 return ""
62         }
63
64         // Generate presigned GET url with lambda function
65         presignedURL, err := s3Client.PresignedGetObject(context.Background(), bucketName, objectName, expiry, reqParams)
66         if err != nil {
67                 log.Fatalln(err)
68         }
69         file_bytes := xmltransform.Convert(presignedURL.String(), compresion, string(xmlh))
70         newObjectName := objectName + "kafka-producer-pm-xml2json-0.json.gz"
71         var buf bytes.Buffer
72         err = gzipWrite(&buf, &file_bytes)
73         upload_object(s3Client, buf.Bytes(), newObjectName, "pm-files-json")
74         fmt.Println("")
75
76         return newObjectName
77 }
78
79 func upload_object(mc *minio.Client, b []byte, objectName string, fsbucket string) {
80         contentType := "application/json"
81         if strings.HasSuffix(objectName, ".gz") {
82                 contentType = "application/gzip"
83         }
84
85         // Upload the xml file with PutObject
86         r := bytes.NewReader(b)
87         tctx := context.Background()
88         if check_minio_bucket(mc, fsbucket) == false {
89                 err := create_minio_bucket(mc, fsbucket)
90                 if err != nil {
91                         log.Error("Cannot create bucket: ", fsbucket, ", ", err)
92                         return
93                 }
94         }
95         ok := false
96         for i := 1; i < 64 && ok == false; i = i * 2 {
97                 info, err := mc.PutObject(tctx, fsbucket, objectName, r, int64(len(b)), minio.PutObjectOptions{ContentType: contentType})
98                 if err != nil {
99
100                         if i == 1 {
101                                 log.Warn("Cannot upload (first attempt): ", objectName, ", ", err)
102                         } else {
103                                 log.Warn("Cannot upload (retry): ", objectName, ", ", err)
104                         }
105                         time.Sleep(time.Duration(i) * time.Second)
106                 } else {
107                         log.Debug("Successfully uploaded: ", objectName, " of size:", info.Size)
108                 }
109         }
110 }
111
112 func create_minio_bucket(mc *minio.Client, bucket string) error {
113         tctx := context.Background()
114         err := mc.MakeBucket(tctx, bucket, minio.MakeBucketOptions{})
115         if err != nil {
116                 // Check to see if we already own this bucket (which happens if you run this twice)
117                 exists, errBucketExists := mc.BucketExists(tctx, bucket)
118                 if errBucketExists == nil && exists {
119                         log.Debug("Already own bucket:", bucket)
120                         return nil
121                 } else {
122                         log.Error("Cannot create or check bucket ", bucket, " in minio client", err)
123                         return err
124                 }
125         }
126         log.Debug("Successfully created bucket: ", bucket)
127         return nil
128 }
129
130 func check_minio_bucket(mc *minio.Client, bucket string) bool {
131         tctx := context.Background()
132         exists, err := mc.BucketExists(tctx, bucket)
133         if err == nil && exists {
134                 log.Debug("Already own bucket:", bucket)
135                 return true
136         }
137         log.Error("Bucket does not exist, bucket ", bucket, " in minio client", err)
138         return false
139 }
140
141 // Write gzipped data to a Writer
142 func gzipWrite(w io.Writer, data *[]byte) error {
143         gw, err1 := gzip.NewWriterLevel(w, gzip.BestSpeed)
144
145         if err1 != nil {
146                 return err1
147         }
148         defer gw.Close()
149         _, err2 := gw.Write(*data)
150         return err2
151 }