X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=test%2Fkafka-procon%2Fmain.go;h=75f3163ed35ce03a7bf3d3121c77f88f19954d91;hb=927c17e5aa6123f2c9a5fbd8758d79b8281e7476;hp=6f8bad2f2cf5515eadb5a81c2b3f71b8b35494a7;hpb=aa73209488503ee51db068c6143c7d4ec298a036;p=nonrtric.git diff --git a/test/kafka-procon/main.go b/test/kafka-procon/main.go index 6f8bad2f..75f3163e 100644 --- a/test/kafka-procon/main.go +++ b/test/kafka-procon/main.go @@ -1,3 +1,23 @@ +// - +// ========================LICENSE_START================================= +// O-RAN-SC +// %% +// Copyright (C) 2020-2022: Nordix Foundation +// %% +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ========================LICENSE_END=================================== +// + // Writing a basic HTTP server is easy using the // `net/http` package. package main @@ -20,13 +40,13 @@ import ( // Note: consumer 'group' and consumer 'user' both uses hardcoded values specific to this interface // globalCounters var holding the "global counters" -// recieved number of received messages from all topics (int) +// received number of received messages from all topics (int) // sent number of sent messages to all topics (int) // topics var holding all topic related info // name of a topic (present after topic is created) // content-type data type of the topic (string) // counters -// recieved number of received messages from the topic (int) +// received number of received messages from the topic (int) // sent number of sent messages to the topic (int) // messages // send messages waiting to be sent (set when sending is started) (fifo) @@ -309,6 +329,7 @@ func getTopicCounter(w http.ResponseWriter, req *http.Request) { func startToSend(w http.ResponseWriter, req *http.Request) { t, topicId, exist := getTopicFromRequest(w, req) + fmt.Printf("Start to send to topic: %v\n", topicId) if !exist { return } @@ -324,7 +345,10 @@ func startToSend(w http.ResponseWriter, req *http.Request) { fmt.Printf("Cannot create producer for topic: %v, error details: %v\n", topicId, err) return } - defer func() { p.Close() }() + defer func() { + fmt.Printf("Closing producer for topic: %v\n", topicId) + p.Close() + }() for { q := t.messages.send if q == nil { @@ -342,12 +366,12 @@ func startToSend(w http.ResponseWriter, req *http.Request) { globalCounters.sent.step() msg := fmt.Sprintf("%v", m) if len(msg) < 500 { - fmt.Printf("Message sent on topic: %v, len: %v, msg: %v", topicId, len(msg), msg) + fmt.Printf("Message sent on topic: %v, len: %v, msg: %v\n", topicId, len(msg), msg) } else { - fmt.Printf("Message sent on topic: %v, len: %v, is larger than 500...not printed", topicId, len(msg)) + fmt.Printf("Message sent on topic: %v, len: %v, is larger than 500...not printed\n", topicId, len(msg)) } } else { - fmt.Printf("Failed to send message on topic: %v. Discarded. Error details: %v", topicId, err) + fmt.Printf("Failed to send message on topic: %v. Discarded. Error details: %v\n", topicId, err) q.Remove(0) } } else { @@ -409,9 +433,9 @@ func startToReceive(w http.ResponseWriter, req *http.Request) { msg, err := c.ReadMessage(maxDur) if err == nil { if len(msg.Value) < 500 { - fmt.Printf("Message received on topic: %v, partion: %v, len: %v, msg: %v", topicId, msg.TopicPartition, len(msg.Value), string(msg.Value)) + fmt.Printf("Message received on topic: %v, partion: %v, len: %v, msg: %v\n", topicId, msg.TopicPartition, len(msg.Value), string(msg.Value)) } else { - fmt.Printf("Message received on topic: %v, partion: %v, len: %v is larger than 500...not printed", topicId, msg.TopicPartition, len(msg.Value)) + fmt.Printf("Message received on topic: %v, partion: %v, len: %v is larger than 500...not printed\n", topicId, msg.TopicPartition, len(msg.Value)) } err = t.messages.received.Enqueue(string(msg.Value)) if err != nil { @@ -435,7 +459,9 @@ func startToReceive(w http.ResponseWriter, req *http.Request) { // Post a message to a topic // POST /send content type is specified in content type func sendToTopic(w http.ResponseWriter, req *http.Request) { + t, topicId, exist := getTopicFromRequest(w, req) + fmt.Printf("Send to topic: %v\n", topicId) if !exist { return } @@ -471,10 +497,12 @@ func sendToTopic(w http.ResponseWriter, req *http.Request) { b, err := ioutil.ReadAll(req.Body) if err == nil { if len(b) < 500 { - fmt.Printf("Json payload to send on topic: %v, msg: %v", topicId, string(b)) + fmt.Printf("Json payload to send on topic: %v, msg: %v\n", topicId, string(b)) } else { - fmt.Printf("Json payload to send on topic: %v larger than 500 bytes, not printed...", topicId) + fmt.Printf("Json payload to send on topic: %v larger than 500 bytes, not printed...\n", topicId) } + } else { + fmt.Printf("Json payload to send on topic: %v cannnot be decoded, err: %v\n", topicId, err) } err = q.Enqueue(string(b)) if err != nil { @@ -491,10 +519,12 @@ func sendToTopic(w http.ResponseWriter, req *http.Request) { b, err := ioutil.ReadAll(req.Body) if err == nil { if len(b) < 500 { - fmt.Printf("Text payload to send on topic: %v, msg: %v", topicId, string(b)) + fmt.Printf("Text payload to send on topic: %v, msg: %v\n", topicId, string(b)) } else { - fmt.Printf("Text payload to send on topic: %v larger than 500 bytes, not printed...", topicId) + fmt.Printf("Text payload to send on topic: %v larger than 500 bytes, not printed...\n", topicId) } + } else { + fmt.Printf("Text payload to send on topic: %v cannnot be decoded, err: %v\n", topicId, err) } err = q.Enqueue(string(b)) if err != nil {