3 // ========================LICENSE_START=================================
6 // Copyright (C) 2023: Nordix Foundation
8 // Licensed under the Apache License, Version 2.0 (the "License");
9 // you may not use this file except in compliance with the License.
10 // You may obtain a copy of the License at
12 // http://www.apache.org/licenses/LICENSE-2.0
14 // Unless required by applicable law or agreed to in writing, software
15 // distributed under the License is distributed on an "AS IS" BASIS,
16 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 // See the License for the specific language governing permissions and
18 // limitations under the License.
19 // ========================LICENSE_END===================================
20 package miniocollector
27 jsoniter "github.com/json-iterator/go"
28 "github.com/minio/minio-go/v7"
29 "github.com/minio/minio-go/v7/pkg/credentials"
30 log "github.com/sirupsen/logrus"
32 "main/common/dataTypes"
33 "main/components/xmltransform"
40 func Xml_to_json_conv(evt_data *dataTypes.XmlFileEventHeader) string {
41 filestoreUser := os.Getenv("FILESTORE_USER")
42 filestorePwd := os.Getenv("FILESTORE_PWD")
43 filestoreServer := os.Getenv("FILESTORE_SERVER")
45 s3Client, err := minio.New(filestoreServer, &minio.Options{
46 Creds: credentials.NewStaticV4(filestoreUser, filestorePwd, ""),
52 expiry := time.Second * 24 * 60 * 60 // 1 day.
53 objectName := evt_data.Name
54 bucketName := evt_data.ObjectStoreBucket
55 compresion := evt_data.Compression
56 reqParams := make(url.Values)
58 xmlh, err := jsoniter.Marshal(evt_data)
60 fmt.Printf("Error: %s", err)
64 // Generate presigned GET url with lambda function
65 presignedURL, err := s3Client.PresignedGetObject(context.Background(), bucketName, objectName, expiry, reqParams)
69 file_bytes := xmltransform.Convert(presignedURL.String(), compresion, string(xmlh))
70 newObjectName := objectName + "kafka-producer-pm-xml2json-0.json.gz"
72 err = gzipWrite(&buf, &file_bytes)
73 upload_object(s3Client, buf.Bytes(), newObjectName, "pm-files-json")
79 func upload_object(mc *minio.Client, b []byte, objectName string, fsbucket string) {
80 contentType := "application/json"
81 if strings.HasSuffix(objectName, ".gz") {
82 contentType = "application/gzip"
85 // Upload the xml file with PutObject
86 r := bytes.NewReader(b)
87 tctx := context.Background()
88 if check_minio_bucket(mc, fsbucket) == false {
89 err := create_minio_bucket(mc, fsbucket)
91 log.Error("Cannot create bucket: ", fsbucket, ", ", err)
96 for i := 1; i < 64 && ok == false; i = i * 2 {
97 info, err := mc.PutObject(tctx, fsbucket, objectName, r, int64(len(b)), minio.PutObjectOptions{ContentType: contentType})
101 log.Warn("Cannot upload (first attempt): ", objectName, ", ", err)
103 log.Warn("Cannot upload (retry): ", objectName, ", ", err)
105 time.Sleep(time.Duration(i) * time.Second)
107 log.Debug("Successfully uploaded: ", objectName, " of size:", info.Size)
112 func create_minio_bucket(mc *minio.Client, bucket string) error {
113 tctx := context.Background()
114 err := mc.MakeBucket(tctx, bucket, minio.MakeBucketOptions{})
116 // Check to see if we already own this bucket (which happens if you run this twice)
117 exists, errBucketExists := mc.BucketExists(tctx, bucket)
118 if errBucketExists == nil && exists {
119 log.Debug("Already own bucket:", bucket)
122 log.Error("Cannot create or check bucket ", bucket, " in minio client", err)
126 log.Debug("Successfully created bucket: ", bucket)
130 func check_minio_bucket(mc *minio.Client, bucket string) bool {
131 tctx := context.Background()
132 exists, err := mc.BucketExists(tctx, bucket)
133 if err == nil && exists {
134 log.Debug("Already own bucket:", bucket)
137 log.Error("Bucket does not exist, bucket ", bucket, " in minio client", err)
141 // Write gzipped data to a Writer
142 func gzipWrite(w io.Writer, data *[]byte) error {
143 gw, err1 := gzip.NewWriterLevel(w, gzip.BestSpeed)
149 _, err2 := gw.Write(*data)