Merge "Functional Test Updates for I Release"
[nonrtric.git] / test / kafka-procon / main.go
index 6f8bad2..75f3163 100644 (file)
@@ -1,3 +1,23 @@
+// -
+//   ========================LICENSE_START=================================
+//   O-RAN-SC
+//   %%
+//   Copyright (C) 2020-2022: Nordix Foundation
+//   %%
+//   Licensed under the Apache License, Version 2.0 (the "License");
+//   you may not use this file except in compliance with the License.
+//   You may obtain a copy of the License at
+//
+//        http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+//   distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//   See the License for the specific language governing permissions and
+//   limitations under the License.
+//   ========================LICENSE_END===================================
+//
+
 // Writing a basic HTTP server is easy using the
 // `net/http` package.
 package main
@@ -20,13 +40,13 @@ import (
 
 // Note: consumer 'group' and consumer 'user' both uses hardcoded values specific to this interface
 //    globalCounters      var holding the "global counters"
-//      recieved          number of received messages from all topics                             (int)
+//      received          number of received messages from all topics                             (int)
 //      sent              number of sent messages to all topics                                   (int)
 //    topics              var holding all topic related info
 //      <topic-name>      name of a topic (present after topic is created)
 //        content-type    data type of the topic                                                  (string)
 //        counters
-//          recieved      number of received messages from the topic                              (int)
+//          received      number of received messages from the topic                              (int)
 //          sent          number of sent messages to the topic                                    (int)
 //        messages
 //          send          messages waiting to be sent (set when sending is started)               (fifo)
@@ -309,6 +329,7 @@ func getTopicCounter(w http.ResponseWriter, req *http.Request) {
 
 func startToSend(w http.ResponseWriter, req *http.Request) {
        t, topicId, exist := getTopicFromRequest(w, req)
+       fmt.Printf("Start to send to topic: %v\n", topicId)
        if !exist {
                return
        }
@@ -324,7 +345,10 @@ func startToSend(w http.ResponseWriter, req *http.Request) {
                        fmt.Printf("Cannot create producer for topic: %v, error details: %v\n", topicId, err)
                        return
                }
-               defer func() { p.Close() }()
+               defer func() {
+                       fmt.Printf("Closing producer for topic: %v\n", topicId)
+                       p.Close()
+               }()
                for {
                        q := t.messages.send
                        if q == nil {
@@ -342,12 +366,12 @@ func startToSend(w http.ResponseWriter, req *http.Request) {
                                        globalCounters.sent.step()
                                        msg := fmt.Sprintf("%v", m)
                                        if len(msg) < 500 {
-                                               fmt.Printf("Message sent on topic: %v, len: %v, msg: %v", topicId, len(msg), msg)
+                                               fmt.Printf("Message sent on topic: %v, len: %v, msg: %v\n", topicId, len(msg), msg)
                                        } else {
-                                               fmt.Printf("Message sent on topic: %v, len: %v, is larger than 500...not printed", topicId, len(msg))
+                                               fmt.Printf("Message sent on topic: %v, len: %v, is larger than 500...not printed\n", topicId, len(msg))
                                        }
                                } else {
-                                       fmt.Printf("Failed to send message on topic: %v. Discarded. Error details: %v", topicId, err)
+                                       fmt.Printf("Failed to send message on topic: %v. Discarded. Error details: %v\n", topicId, err)
                                        q.Remove(0)
                                }
                        } else {
@@ -409,9 +433,9 @@ func startToReceive(w http.ResponseWriter, req *http.Request) {
                                msg, err := c.ReadMessage(maxDur)
                                if err == nil {
                                        if len(msg.Value) < 500 {
-                                               fmt.Printf("Message received on topic: %v, partion: %v, len: %v, msg: %v", topicId, msg.TopicPartition, len(msg.Value), string(msg.Value))
+                                               fmt.Printf("Message received on topic: %v, partion: %v, len: %v, msg: %v\n", topicId, msg.TopicPartition, len(msg.Value), string(msg.Value))
                                        } else {
-                                               fmt.Printf("Message received on topic: %v, partion: %v, len: %v is larger than 500...not printed", topicId, msg.TopicPartition, len(msg.Value))
+                                               fmt.Printf("Message received on topic: %v, partion: %v, len: %v is larger than 500...not printed\n", topicId, msg.TopicPartition, len(msg.Value))
                                        }
                                        err = t.messages.received.Enqueue(string(msg.Value))
                                        if err != nil {
@@ -435,7 +459,9 @@ func startToReceive(w http.ResponseWriter, req *http.Request) {
 // Post a message to a topic
 // POST /send    content type is specified in content type
 func sendToTopic(w http.ResponseWriter, req *http.Request) {
+
        t, topicId, exist := getTopicFromRequest(w, req)
+       fmt.Printf("Send to topic: %v\n", topicId)
        if !exist {
                return
        }
@@ -471,10 +497,12 @@ func sendToTopic(w http.ResponseWriter, req *http.Request) {
                b, err := ioutil.ReadAll(req.Body)
                if err == nil {
                        if len(b) < 500 {
-                               fmt.Printf("Json payload to send on topic: %v, msg: %v", topicId, string(b))
+                               fmt.Printf("Json payload to send on topic: %v, msg: %v\n", topicId, string(b))
                        } else {
-                               fmt.Printf("Json payload to send on topic: %v larger than 500 bytes, not printed...", topicId)
+                               fmt.Printf("Json payload to send on topic: %v larger than 500 bytes, not printed...\n", topicId)
                        }
+               } else {
+                       fmt.Printf("Json payload to send on topic: %v cannnot be decoded, err: %v\n", topicId, err)
                }
                err = q.Enqueue(string(b))
                if err != nil {
@@ -491,10 +519,12 @@ func sendToTopic(w http.ResponseWriter, req *http.Request) {
                b, err := ioutil.ReadAll(req.Body)
                if err == nil {
                        if len(b) < 500 {
-                               fmt.Printf("Text payload to send on topic: %v, msg: %v", topicId, string(b))
+                               fmt.Printf("Text payload to send on topic: %v, msg: %v\n", topicId, string(b))
                        } else {
-                               fmt.Printf("Text payload to send on topic: %v larger than 500 bytes, not printed...", topicId)
+                               fmt.Printf("Text payload to send on topic: %v larger than 500 bytes, not printed...\n", topicId)
                        }
+               } else {
+                       fmt.Printf("Text payload to send on topic: %v cannnot be decoded, err: %v\n", topicId, err)
                }
                err = q.Enqueue(string(b))
                if err != nil {