X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=test%2Fkafka-procon%2Fmain.go;h=43be1b5ab1f09c04933b1ff84e976de8df9be71b;hb=50ba737a87f792087740cd13a19c30d22086c5db;hp=6f8bad2f2cf5515eadb5a81c2b3f71b8b35494a7;hpb=27b11820d489c7fc2fc1511dcb6c4552a1939c80;p=nonrtric.git diff --git a/test/kafka-procon/main.go b/test/kafka-procon/main.go index 6f8bad2f..43be1b5a 100644 --- a/test/kafka-procon/main.go +++ b/test/kafka-procon/main.go @@ -309,6 +309,7 @@ func getTopicCounter(w http.ResponseWriter, req *http.Request) { func startToSend(w http.ResponseWriter, req *http.Request) { t, topicId, exist := getTopicFromRequest(w, req) + fmt.Printf("Start to send to topic: %v\n", topicId) if !exist { return } @@ -324,7 +325,10 @@ func startToSend(w http.ResponseWriter, req *http.Request) { fmt.Printf("Cannot create producer for topic: %v, error details: %v\n", topicId, err) return } - defer func() { p.Close() }() + defer func() { + fmt.Printf("Closing producer for topic: %v\n", topicId) + p.Close() + }() for { q := t.messages.send if q == nil { @@ -342,12 +346,12 @@ func startToSend(w http.ResponseWriter, req *http.Request) { globalCounters.sent.step() msg := fmt.Sprintf("%v", m) if len(msg) < 500 { - fmt.Printf("Message sent on topic: %v, len: %v, msg: %v", topicId, len(msg), msg) + fmt.Printf("Message sent on topic: %v, len: %v, msg: %v\n", topicId, len(msg), msg) } else { - fmt.Printf("Message sent on topic: %v, len: %v, is larger than 500...not printed", topicId, len(msg)) + fmt.Printf("Message sent on topic: %v, len: %v, is larger than 500...not printed\n", topicId, len(msg)) } } else { - fmt.Printf("Failed to send message on topic: %v. Discarded. Error details: %v", topicId, err) + fmt.Printf("Failed to send message on topic: %v. Discarded. Error details: %v\n", topicId, err) q.Remove(0) } } else { @@ -409,9 +413,9 @@ func startToReceive(w http.ResponseWriter, req *http.Request) { msg, err := c.ReadMessage(maxDur) if err == nil { if len(msg.Value) < 500 { - fmt.Printf("Message received on topic: %v, partion: %v, len: %v, msg: %v", topicId, msg.TopicPartition, len(msg.Value), string(msg.Value)) + fmt.Printf("Message received on topic: %v, partion: %v, len: %v, msg: %v\n", topicId, msg.TopicPartition, len(msg.Value), string(msg.Value)) } else { - fmt.Printf("Message received on topic: %v, partion: %v, len: %v is larger than 500...not printed", topicId, msg.TopicPartition, len(msg.Value)) + fmt.Printf("Message received on topic: %v, partion: %v, len: %v is larger than 500...not printed\n", topicId, msg.TopicPartition, len(msg.Value)) } err = t.messages.received.Enqueue(string(msg.Value)) if err != nil { @@ -435,7 +439,9 @@ func startToReceive(w http.ResponseWriter, req *http.Request) { // Post a message to a topic // POST /send content type is specified in content type func sendToTopic(w http.ResponseWriter, req *http.Request) { + t, topicId, exist := getTopicFromRequest(w, req) + fmt.Printf("Send to topic: %v\n", topicId) if !exist { return } @@ -471,10 +477,12 @@ func sendToTopic(w http.ResponseWriter, req *http.Request) { b, err := ioutil.ReadAll(req.Body) if err == nil { if len(b) < 500 { - fmt.Printf("Json payload to send on topic: %v, msg: %v", topicId, string(b)) + fmt.Printf("Json payload to send on topic: %v, msg: %v\n", topicId, string(b)) } else { - fmt.Printf("Json payload to send on topic: %v larger than 500 bytes, not printed...", topicId) + fmt.Printf("Json payload to send on topic: %v larger than 500 bytes, not printed...\n", topicId) } + } else { + fmt.Printf("Json payload to send on topic: %v cannnot be decoded, err: %v\n", topicId, err) } err = q.Enqueue(string(b)) if err != nil { @@ -491,10 +499,12 @@ func sendToTopic(w http.ResponseWriter, req *http.Request) { b, err := ioutil.ReadAll(req.Body) if err == nil { if len(b) < 500 { - fmt.Printf("Text payload to send on topic: %v, msg: %v", topicId, string(b)) + fmt.Printf("Text payload to send on topic: %v, msg: %v\n", topicId, string(b)) } else { - fmt.Printf("Text payload to send on topic: %v larger than 500 bytes, not printed...", topicId) + fmt.Printf("Text payload to send on topic: %v larger than 500 bytes, not printed...\n", topicId) } + } else { + fmt.Printf("Text payload to send on topic: %v cannnot be decoded, err: %v\n", topicId, err) } err = q.Enqueue(string(b)) if err != nil {