Remove unused code
[nonrtric/plt/ranpm.git] / pm-file-converter / components / miniocollector / miniocollector.go
diff --git a/pm-file-converter/components/miniocollector/miniocollector.go b/pm-file-converter/components/miniocollector/miniocollector.go
new file mode 100644 (file)
index 0000000..1663194
--- /dev/null
@@ -0,0 +1,151 @@
+// -
+//
+//     ========================LICENSE_START=================================
+//     O-RAN-SC
+//     %%
+//     Copyright (C) 2023: Nordix Foundation
+//     %%
+//     Licensed under the Apache License, Version 2.0 (the "License");
+//     you may not use this file except in compliance with the License.
+//     You may obtain a copy of the License at
+//
+//          http://www.apache.org/licenses/LICENSE-2.0
+//
+//     Unless required by applicable law or agreed to in writing, software
+//     distributed under the License is distributed on an "AS IS" BASIS,
+//     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//     See the License for the specific language governing permissions and
+//     limitations under the License.
+//     ========================LICENSE_END===================================
+package miniocollector
+
+import (
+       "bytes"
+       "compress/gzip"
+       "context"
+       "fmt"
+       jsoniter "github.com/json-iterator/go"
+       "github.com/minio/minio-go/v7"
+       "github.com/minio/minio-go/v7/pkg/credentials"
+       log "github.com/sirupsen/logrus"
+       "io"
+       "main/common/dataTypes"
+       "main/components/xmltransform"
+       "net/url"
+       "os"
+       "strings"
+       "time"
+)
+
+func Xml_to_json_conv(evt_data *dataTypes.XmlFileEventHeader) string {
+       filestoreUser := os.Getenv("FILESTORE_USER")
+       filestorePwd := os.Getenv("FILESTORE_PWD")
+       filestoreServer := os.Getenv("FILESTORE_SERVER")
+
+       s3Client, err := minio.New(filestoreServer, &minio.Options{
+               Creds:  credentials.NewStaticV4(filestoreUser, filestorePwd, ""),
+               Secure: false,
+       })
+       if err != nil {
+               log.Fatalln(err)
+       }
+       expiry := time.Second * 24 * 60 * 60 // 1 day.
+       objectName := evt_data.Name
+       bucketName := evt_data.ObjectStoreBucket
+       compresion := evt_data.Compression
+       reqParams := make(url.Values)
+
+       xmlh, err := jsoniter.Marshal(evt_data)
+       if err != nil {
+               fmt.Printf("Error: %s", err)
+               return ""
+       }
+
+       // Generate presigned GET url with lambda function
+       presignedURL, err := s3Client.PresignedGetObject(context.Background(), bucketName, objectName, expiry, reqParams)
+       if err != nil {
+               log.Fatalln(err)
+       }
+       file_bytes := xmltransform.Convert(presignedURL.String(), compresion, string(xmlh))
+       newObjectName := objectName + "kafka-producer-pm-xml2json-0.json.gz"
+       var buf bytes.Buffer
+       err = gzipWrite(&buf, &file_bytes)
+       upload_object(s3Client, buf.Bytes(), newObjectName, "pm-files-json")
+       fmt.Println("")
+
+       return newObjectName
+}
+
+func upload_object(mc *minio.Client, b []byte, objectName string, fsbucket string) {
+       contentType := "application/json"
+       if strings.HasSuffix(objectName, ".gz") {
+               contentType = "application/gzip"
+       }
+
+       // Upload the xml file with PutObject
+       r := bytes.NewReader(b)
+       tctx := context.Background()
+       if check_minio_bucket(mc, fsbucket) == false {
+               err := create_minio_bucket(mc, fsbucket)
+               if err != nil {
+                       log.Error("Cannot create bucket: ", fsbucket, ", ", err)
+                       return
+               }
+       }
+       ok := false
+       for i := 1; i < 64 && ok == false; i = i * 2 {
+               info, err := mc.PutObject(tctx, fsbucket, objectName, r, int64(len(b)), minio.PutObjectOptions{ContentType: contentType})
+               if err != nil {
+
+                       if i == 1 {
+                               log.Warn("Cannot upload (first attempt): ", objectName, ", ", err)
+                       } else {
+                               log.Warn("Cannot upload (retry): ", objectName, ", ", err)
+                       }
+                       time.Sleep(time.Duration(i) * time.Second)
+               } else {
+                       log.Debug("Successfully uploaded: ", objectName, " of size:", info.Size)
+               }
+       }
+}
+
+func create_minio_bucket(mc *minio.Client, bucket string) error {
+       tctx := context.Background()
+       err := mc.MakeBucket(tctx, bucket, minio.MakeBucketOptions{})
+       if err != nil {
+               // Check to see if we already own this bucket (which happens if you run this twice)
+               exists, errBucketExists := mc.BucketExists(tctx, bucket)
+               if errBucketExists == nil && exists {
+                       log.Debug("Already own bucket:", bucket)
+                       return nil
+               } else {
+                       log.Error("Cannot create or check bucket ", bucket, " in minio client", err)
+                       return err
+               }
+       }
+       log.Debug("Successfully created bucket: ", bucket)
+       return nil
+}
+
+func check_minio_bucket(mc *minio.Client, bucket string) bool {
+       tctx := context.Background()
+       exists, err := mc.BucketExists(tctx, bucket)
+       if err == nil && exists {
+               log.Debug("Already own bucket:", bucket)
+               return true
+       }
+       log.Error("Bucket does not exist, bucket ", bucket, " in minio client", err)
+       return false
+}
+
+// Write gzipped data to a Writer
+func gzipWrite(w io.Writer, data *[]byte) error {
+       gw, err1 := gzip.NewWriterLevel(w, gzip.BestSpeed)
+
+       if err1 != nil {
+               return err1
+       }
+       defer gw.Close()
+       _, err2 := gw.Write(*data)
+       return err2
+}