2 // ========================LICENSE_START=================================
5 // Copyright (C) 2020-2022: Nordix Foundation
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 // ========================LICENSE_END===================================
21 // Writing a basic HTTP server is easy using the
22 // `net/http` package.
36 "github.com/confluentinc/confluent-kafka-go/kafka"
37 "github.com/enriquebris/goconcurrentqueue"
38 "github.com/gorilla/mux"
41 // Note: consumer 'group' and consumer 'user' both uses hardcoded values specific to this interface
42 // globalCounters var holding the "global counters"
43 // received number of received messages from all topics (int)
44 // sent number of sent messages to all topics (int)
45 // topics var holding all topic related info
46 // <topic-name> name of a topic (present after topic is created)
47 // content-type data type of the topic (string)
49 // received number of received messages from the topic (int)
50 // sent number of sent messages to the topic (int)
52 // send messages waiting to be sent (set when sending is started) (fifo)
53 // received received messages waiting to be fetched (set when reception is started) (fifo)
59 func (c *counter) step() {
60 atomic.AddUint64(&c.c, 1)
63 func (c counter) get() uint64 {
64 return atomic.LoadUint64(&c.c)
67 type counters struct {
72 func newCounters() counters {
79 type messages struct {
80 send *goconcurrentqueue.FIFO
81 received *goconcurrentqueue.FIFO
84 func (m *messages) startSend() bool {
86 m.send = goconcurrentqueue.NewFIFO()
92 func (m *messages) stopSend() {
96 func (m *messages) addToSend(msg string) error {
98 return fmt.Errorf("sending not started")
101 defer m.send.Unlock()
102 return m.send.Enqueue(msg)
105 func (m *messages) getToSend() (interface{}, error) {
107 return "", fmt.Errorf("sending not started")
110 defer m.send.Unlock()
111 return m.send.Dequeue()
114 func (m *messages) startReceive() bool {
115 if m.received == nil {
116 m.received = goconcurrentqueue.NewFIFO()
122 func (m *messages) stopReceive() {
132 func newTopic(ct string) *topic {
135 counters: counters{},
136 messages: messages{},
140 var globalCounters counters
141 var topics map[string]*topic = make(map[string]*topic)
143 var bootstrapserver = ""
146 bootstrapserver = os.Getenv("KAFKA_BOOTSTRAP_SERVER")
147 if len(bootstrapserver) == 0 {
148 fmt.Println("Fatal error: env var KAFKA_BOOTSTRAP_SERVER not set")
149 fmt.Println("Exiting... ")
152 fmt.Println("Using KAFKA_BOOTSTRAP_SERVER=" + bootstrapserver)
155 //Helper function to get a created topic, if it exists
156 func getTopicFromRequest(w http.ResponseWriter, req *http.Request) (*topic, string, bool) {
157 topicId := mux.Vars(req)["topic"]
158 t, exist := topics[topicId]
160 w.WriteHeader(http.StatusNotFound)
161 fmt.Fprintf(w, "Topic %v does not exist", topicId)
162 return nil, "", false
164 return t, topicId, true
169 func healthCheck(w http.ResponseWriter, req *http.Request) {
173 // Deep reset of this interface stub - no removal of msgs or topics in kafka
175 func allreset(w http.ResponseWriter, req *http.Request) {
176 for _, v := range topics {
177 v.messages.stopSend()
178 v.messages.stopReceive()
180 time.Sleep(5 * time.Second) //Allow producers/consumers to shut down
181 globalCounters = newCounters()
182 topics = make(map[string]*topic)
186 // Get topics, return json array of strings of topics created by this interface stub
187 // Returns json array of strings, array is empty if no topics exist
190 func getTopics(w http.ResponseWriter, req *http.Request) {
191 topicKeys := make([]string, 0, len(topics))
192 fmt.Printf("len topics: %v\n", len(topics))
193 for k := range topics {
194 topicKeys = append(topicKeys, k)
196 var j, err = json.Marshal(topicKeys)
198 w.WriteHeader(http.StatusInternalServerError)
199 fmt.Fprintf(w, "Cannot convert list of topics to json, error details: %v", err)
202 w.Header().Set("Content-Type", "application/json")
203 w.WriteHeader(http.StatusOK)
208 func writeOkRepsonse(w http.ResponseWriter, httpStatus int, msg string) {
209 w.WriteHeader(httpStatus)
210 w.Header().Set("Content-Type", "text/plain")
214 // Get a counter value
215 // GET /topics/counters/{counter}
216 func getCounter(w http.ResponseWriter, req *http.Request) {
217 ctr := mux.Vars(req)["counter"]
219 if ctr == "received" {
220 ctrvalue = int(globalCounters.received.get())
221 } else if ctr == "sent" {
222 ctrvalue = int(globalCounters.sent.get())
226 w.WriteHeader(http.StatusBadRequest)
227 fmt.Fprintf(w, "Counter %v does not exist", ctr)
230 writeOkRepsonse(w, http.StatusOK, strconv.Itoa(ctrvalue))
236 // PUT on /topics/<topic>?type=<type> type shall be 'application/json' or 'text/plain'
237 func createTopic(w http.ResponseWriter, req *http.Request) {
238 topicId := mux.Vars(req)["topic"]
239 topicType := mux.Vars(req)["type"]
241 fmt.Printf("Creating topic: %v, content type: %v\n", topicId, topicType)
243 if len(topicType) == 0 {
244 w.WriteHeader(http.StatusBadRequest)
245 fmt.Fprintf(w, "Type not specified")
249 tid, exist := topics[topicId]
251 if tid.contentType != topicType {
252 w.WriteHeader(http.StatusBadRequest)
253 fmt.Fprintf(w, "Topic type exist but type is different, queue content type: %v, requested content type: %v", tid.contentType, topicType)
256 writeOkRepsonse(w, http.StatusOK, "Topic exist")
260 t := newTopic(topicType)
262 a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": bootstrapserver})
263 defer func() { a.Close() }()
265 w.WriteHeader(http.StatusInternalServerError)
266 fmt.Fprintf(w, "Cannot create client to bootstrap server: "+bootstrapserver+", error details: %v", err)
270 ctx, cancel := context.WithCancel(context.Background())
273 maxDur := 10 * time.Second
275 _, err = a.CreateTopics(
277 []kafka.TopicSpecification{{
280 ReplicationFactor: 1}},
281 kafka.SetAdminOperationTimeout(maxDur))
284 w.WriteHeader(http.StatusInternalServerError)
285 fmt.Fprintf(w, "Failed to create topic: %v, error details: %v", topicId, err)
289 w.WriteHeader(http.StatusCreated)
290 fmt.Fprintf(w, "Topic created")
294 // GET on /topic/<topic>
295 func getTopic(w http.ResponseWriter, req *http.Request) {
296 t, _, exist := getTopicFromRequest(w, req)
300 w.WriteHeader(http.StatusOK)
301 fmt.Fprintf(w, t.contentType)
304 // Get a topics counter value
305 // GET /topics/{topic}/counters/{counter}
306 func getTopicCounter(w http.ResponseWriter, req *http.Request) {
307 t, _, exist := getTopicFromRequest(w, req)
311 ctr := mux.Vars(req)["counter"]
314 if ctr == "received" {
315 ctrvalue = int(t.counters.received.get())
316 } else if ctr == "sent" {
317 ctrvalue = int(t.counters.sent.get())
321 w.WriteHeader(http.StatusBadRequest)
322 fmt.Fprintf(w, "Counter %v does not exist", ctr)
325 w.WriteHeader(http.StatusOK)
326 fmt.Fprintf(w, strconv.Itoa(ctrvalue))
330 func startToSend(w http.ResponseWriter, req *http.Request) {
331 t, topicId, exist := getTopicFromRequest(w, req)
332 fmt.Printf("Start to send to topic: %v\n", topicId)
337 if !t.messages.startSend() {
338 w.WriteHeader(http.StatusOK)
339 fmt.Fprintf(w, "Already started sending")
343 p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": bootstrapserver})
345 fmt.Printf("Cannot create producer for topic: %v, error details: %v\n", topicId, err)
349 fmt.Printf("Closing producer for topic: %v\n", topicId)
359 err = p.Produce(&kafka.Message{
360 TopicPartition: kafka.TopicPartition{Topic: &topicId, Partition: kafka.PartitionAny},
361 Value: []byte(fmt.Sprintf("%v", m)),
365 t.counters.sent.step()
366 globalCounters.sent.step()
367 msg := fmt.Sprintf("%v", m)
369 fmt.Printf("Message sent on topic: %v, len: %v, msg: %v\n", topicId, len(msg), msg)
371 fmt.Printf("Message sent on topic: %v, len: %v, is larger than 500...not printed\n", topicId, len(msg))
374 fmt.Printf("Failed to send message on topic: %v. Discarded. Error details: %v\n", topicId, err)
378 time.Sleep(10 * time.Millisecond)
383 w.WriteHeader(http.StatusOK)
384 fmt.Fprintf(w, "Sending started")
387 func startToReceive(w http.ResponseWriter, req *http.Request) {
388 t, topicId, exist := getTopicFromRequest(w, req)
393 if !t.messages.startReceive() {
394 w.WriteHeader(http.StatusOK)
395 fmt.Fprintf(w, "Already started receiving")
401 defer func() { t.messages.stopReceive() }()
403 groudId := "kafkaprocon"
405 c, err := kafka.NewConsumer(&kafka.ConfigMap{
406 "bootstrap.servers": bootstrapserver,
408 "auto.offset.reset": "earliest",
409 "enable.auto.commit": true,
410 "auto.commit.interval.ms": 5000,
413 fmt.Printf("Cannot create consumer for topic: %v, error details: %v\n", topicId, err)
414 t.messages.stopReceive()
418 defer func() { c.Close() }()
420 que := t.messages.received
422 fmt.Printf("Cannot start receiving on topic: %v, queue does not exist\n", topicId)
425 fmt.Printf("Start subscribing on topic: %v\n", topicId)
426 err = c.SubscribeTopics([]string{topicId}, nil)
428 fmt.Printf("Cannot start subscribing on topic: %v, error details: %v\n", topicId, err)
431 maxDur := 1 * time.Second
433 msg, err := c.ReadMessage(maxDur)
435 if len(msg.Value) < 500 {
436 fmt.Printf("Message received on topic: %v, partion: %v, len: %v, msg: %v\n", topicId, msg.TopicPartition, len(msg.Value), string(msg.Value))
438 fmt.Printf("Message received on topic: %v, partion: %v, len: %v is larger than 500...not printed\n", topicId, msg.TopicPartition, len(msg.Value))
440 err = t.messages.received.Enqueue(string(msg.Value))
442 w.WriteHeader(http.StatusInternalServerError)
443 fmt.Fprintf(w, "Received message topic: %v, cannot be put in queue, %v", topicId, err)
446 t.counters.received.step()
447 globalCounters.received.step()
449 fmt.Printf("Nothing to consume on topic: %v, reason: %v\n", topicId, err)
455 w.WriteHeader(http.StatusOK)
456 fmt.Fprintf(w, "Receiving started")
459 // Post a message to a topic
460 // POST /send content type is specified in content type
461 func sendToTopic(w http.ResponseWriter, req *http.Request) {
463 t, topicId, exist := getTopicFromRequest(w, req)
464 fmt.Printf("Send to topic: %v\n", topicId)
470 w.WriteHeader(http.StatusBadRequest)
471 fmt.Fprintf(w, "Sending not initiated on topic: %v", topicId)
474 ct := req.Header.Get("Content-Type")
475 if ct != t.contentType {
476 w.WriteHeader(http.StatusBadRequest)
477 fmt.Fprintf(w, "Message to send content type: %v on topic: %v does not match queue content type: %v", ct, topicId, t.contentType)
481 if ct == "application/json" {
482 // decoder := json.NewDecoder(req.Body)
484 // err := decoder.Decode(&j)
486 // w.WriteHeader(http.StatusBadRequest)
487 // w.Header().Set("Content-Type", "text/plain")
488 // fmt.Fprintf(w, "Json payload cannot be decoded, error details %v\n", err)
491 //m = mux.Vars(req)[""]
492 if err := req.ParseForm(); err != nil {
493 w.WriteHeader(http.StatusBadRequest)
494 fmt.Fprintf(w, "Json payload cannot be decoded on topic: %v, error details %v", topicId, err)
497 b, err := ioutil.ReadAll(req.Body)
500 fmt.Printf("Json payload to send on topic: %v, msg: %v\n", topicId, string(b))
502 fmt.Printf("Json payload to send on topic: %v larger than 500 bytes, not printed...\n", topicId)
505 fmt.Printf("Json payload to send on topic: %v cannnot be decoded, err: %v\n", topicId, err)
507 err = q.Enqueue(string(b))
509 w.WriteHeader(http.StatusInternalServerError)
510 fmt.Fprintf(w, "Json message to send cannot be put in queue")
513 } else if ct == "text/plain" {
514 if err := req.ParseForm(); err != nil {
515 w.WriteHeader(http.StatusBadRequest)
516 fmt.Fprintf(w, "Text payload to send on topic: %v cannot be decoded, error details %v\n", topicId, err)
519 b, err := ioutil.ReadAll(req.Body)
522 fmt.Printf("Text payload to send on topic: %v, msg: %v\n", topicId, string(b))
524 fmt.Printf("Text payload to send on topic: %v larger than 500 bytes, not printed...\n", topicId)
527 fmt.Printf("Text payload to send on topic: %v cannnot be decoded, err: %v\n", topicId, err)
529 err = q.Enqueue(string(b))
531 w.WriteHeader(http.StatusInternalServerError)
532 fmt.Fprintf(w, "Text message to send cannot be put in queue")
536 w.WriteHeader(http.StatusBadRequest)
537 fmt.Fprintf(w, "Message to send, unknown content type %v", ct)
541 w.WriteHeader(http.StatusOK)
542 w.Header().Set("Content-Type", "text/plain")
543 fmt.Fprintf(w, "Message to send put in queue")
546 // Get zero or one message from a topic
548 func receiveFromTopic(w http.ResponseWriter, req *http.Request) {
549 t, topicId, exist := getTopicFromRequest(w, req)
553 if t.messages.received == nil {
554 w.WriteHeader(http.StatusBadRequest)
555 fmt.Fprintf(w, "Receiving not initiated on topic %v", topicId)
559 m, err := t.messages.received.Dequeue()
561 w.WriteHeader(http.StatusNoContent)
565 w.Header().Set("Content-Type", t.contentType)
566 w.WriteHeader(http.StatusOK)
567 fmt.Fprintf(w, "%v", m)
570 // Remove the send queue to stop sending
571 func stopToSend(w http.ResponseWriter, req *http.Request) {
572 fmt.Printf("Stop sending\n")
573 t, _, exist := getTopicFromRequest(w, req)
577 t.messages.stopSend()
578 w.WriteHeader(http.StatusNoContent)
581 // Remove the receive queue to stop receiving
582 func stopToReceive(w http.ResponseWriter, req *http.Request) {
583 fmt.Printf("Stop receiving\n")
584 t, _, exist := getTopicFromRequest(w, req)
588 t.messages.stopReceive()
589 w.WriteHeader(http.StatusNoContent)
592 func HelloServer(w http.ResponseWriter, r *http.Request) {
593 fmt.Fprintf(w, "Hello, %s!", r.URL.Path[1:])
602 r.HandleFunc("/", healthCheck).Methods("GET")
603 r.HandleFunc("/reset", allreset).Methods("POST")
604 r.HandleFunc("/counters/{counter}", getCounter).Methods("GET")
605 r.HandleFunc("/topics", getTopics).Methods("GET")
606 r.HandleFunc("/topics/{topic}/counters/{counter}", getTopicCounter).Methods("GET")
607 r.HandleFunc("/topics/{topic}", createTopic).Methods("PUT").Queries("type", "{type}")
608 r.HandleFunc("/topics/{topic}", getTopic).Methods("GET")
609 r.HandleFunc("/topics/{topic}/startsend", startToSend).Methods("POST")
610 r.HandleFunc("/topics/{topic}/startreceive", startToReceive).Methods("POST")
611 r.HandleFunc("/topics/{topic}/stopsend", stopToSend).Methods("POST")
612 r.HandleFunc("/topics/{topic}/stopreceive", stopToReceive).Methods("POST")
613 r.HandleFunc("/topics/{topic}/msg", sendToTopic).Methods("POST")
614 r.HandleFunc("/topics/{topic}/msg", receiveFromTopic).Methods("GET")
620 WriteTimeout: 15 * time.Second,
621 ReadTimeout: 15 * time.Second,
623 fmt.Println("Running on port: " + port)
624 fmt.Printf(srv.ListenAndServe().Error())