Updated test for helm chart recipe
[nonrtric.git] / test / kafka-procon / main.go
index 6f8bad2..43be1b5 100644 (file)
@@ -309,6 +309,7 @@ func getTopicCounter(w http.ResponseWriter, req *http.Request) {
 
 func startToSend(w http.ResponseWriter, req *http.Request) {
        t, topicId, exist := getTopicFromRequest(w, req)
+       fmt.Printf("Start to send to topic: %v\n", topicId)
        if !exist {
                return
        }
@@ -324,7 +325,10 @@ func startToSend(w http.ResponseWriter, req *http.Request) {
                        fmt.Printf("Cannot create producer for topic: %v, error details: %v\n", topicId, err)
                        return
                }
-               defer func() { p.Close() }()
+               defer func() {
+                       fmt.Printf("Closing producer for topic: %v\n", topicId)
+                       p.Close()
+               }()
                for {
                        q := t.messages.send
                        if q == nil {
@@ -342,12 +346,12 @@ func startToSend(w http.ResponseWriter, req *http.Request) {
                                        globalCounters.sent.step()
                                        msg := fmt.Sprintf("%v", m)
                                        if len(msg) < 500 {
-                                               fmt.Printf("Message sent on topic: %v, len: %v, msg: %v", topicId, len(msg), msg)
+                                               fmt.Printf("Message sent on topic: %v, len: %v, msg: %v\n", topicId, len(msg), msg)
                                        } else {
-                                               fmt.Printf("Message sent on topic: %v, len: %v, is larger than 500...not printed", topicId, len(msg))
+                                               fmt.Printf("Message sent on topic: %v, len: %v, is larger than 500...not printed\n", topicId, len(msg))
                                        }
                                } else {
-                                       fmt.Printf("Failed to send message on topic: %v. Discarded. Error details: %v", topicId, err)
+                                       fmt.Printf("Failed to send message on topic: %v. Discarded. Error details: %v\n", topicId, err)
                                        q.Remove(0)
                                }
                        } else {
@@ -409,9 +413,9 @@ func startToReceive(w http.ResponseWriter, req *http.Request) {
                                msg, err := c.ReadMessage(maxDur)
                                if err == nil {
                                        if len(msg.Value) < 500 {
-                                               fmt.Printf("Message received on topic: %v, partion: %v, len: %v, msg: %v", topicId, msg.TopicPartition, len(msg.Value), string(msg.Value))
+                                               fmt.Printf("Message received on topic: %v, partion: %v, len: %v, msg: %v\n", topicId, msg.TopicPartition, len(msg.Value), string(msg.Value))
                                        } else {
-                                               fmt.Printf("Message received on topic: %v, partion: %v, len: %v is larger than 500...not printed", topicId, msg.TopicPartition, len(msg.Value))
+                                               fmt.Printf("Message received on topic: %v, partion: %v, len: %v is larger than 500...not printed\n", topicId, msg.TopicPartition, len(msg.Value))
                                        }
                                        err = t.messages.received.Enqueue(string(msg.Value))
                                        if err != nil {
@@ -435,7 +439,9 @@ func startToReceive(w http.ResponseWriter, req *http.Request) {
 // Post a message to a topic
 // POST /send    content type is specified in content type
 func sendToTopic(w http.ResponseWriter, req *http.Request) {
+
        t, topicId, exist := getTopicFromRequest(w, req)
+       fmt.Printf("Send to topic: %v\n", topicId)
        if !exist {
                return
        }
@@ -471,10 +477,12 @@ func sendToTopic(w http.ResponseWriter, req *http.Request) {
                b, err := ioutil.ReadAll(req.Body)
                if err == nil {
                        if len(b) < 500 {
-                               fmt.Printf("Json payload to send on topic: %v, msg: %v", topicId, string(b))
+                               fmt.Printf("Json payload to send on topic: %v, msg: %v\n", topicId, string(b))
                        } else {
-                               fmt.Printf("Json payload to send on topic: %v larger than 500 bytes, not printed...", topicId)
+                               fmt.Printf("Json payload to send on topic: %v larger than 500 bytes, not printed...\n", topicId)
                        }
+               } else {
+                       fmt.Printf("Json payload to send on topic: %v cannnot be decoded, err: %v\n", topicId, err)
                }
                err = q.Enqueue(string(b))
                if err != nil {
@@ -491,10 +499,12 @@ func sendToTopic(w http.ResponseWriter, req *http.Request) {
                b, err := ioutil.ReadAll(req.Body)
                if err == nil {
                        if len(b) < 500 {
-                               fmt.Printf("Text payload to send on topic: %v, msg: %v", topicId, string(b))
+                               fmt.Printf("Text payload to send on topic: %v, msg: %v\n", topicId, string(b))
                        } else {
-                               fmt.Printf("Text payload to send on topic: %v larger than 500 bytes, not printed...", topicId)
+                               fmt.Printf("Text payload to send on topic: %v larger than 500 bytes, not printed...\n", topicId)
                        }
+               } else {
+                       fmt.Printf("Text payload to send on topic: %v cannnot be decoded, err: %v\n", topicId, err)
                }
                err = q.Enqueue(string(b))
                if err != nil {