1 // Writing a basic HTTP server is easy using the
16 "github.com/confluentinc/confluent-kafka-go/kafka"
17 "github.com/enriquebris/goconcurrentqueue"
18 "github.com/gorilla/mux"
21 // Note: consumer 'group' and consumer 'user' both use hardcoded values specific to this interface
22 // globalCounters var holding the "global counters"
23 // received number of received messages from all topics (int)
24 // sent number of sent messages to all topics (int)
25 // topics var holding all topic related info
26 // <topic-name> name of a topic (present after topic is created)
27 // content-type data type of the topic (string)
29 // received number of received messages from the topic (int)
30 // sent number of sent messages to the topic (int)
32 // send messages waiting to be sent (set when sending is started) (fifo)
33 // received received messages waiting to be fetched (set when reception is started) (fifo)
39 func (c *counter) step() {
40 atomic.AddUint64(&c.c, 1)
43 func (c counter) get() uint64 {
44 return atomic.LoadUint64(&c.c)
// counters is the set of message counters kept both globally (globalCounters)
// and per topic.
// NOTE(review): the field lines are not visible in this excerpt; other code
// reads `.received` and `.sent` from values of this type and calls step/get
// on them.
47 type counters struct {
// newCounters returns a counters value; allreset assigns the result to
// globalCounters.
// NOTE(review): the function body is not visible in this excerpt.
52 func newCounters() counters {
// messages holds the two FIFO queues of a topic.
59 type messages struct {
// send holds messages waiting to be sent; startSend assigns a new FIFO to it.
60 send *goconcurrentqueue.FIFO
// received holds received messages waiting to be fetched; startReceive
// assigns a new FIFO to it when it is nil.
61 received *goconcurrentqueue.FIFO
64 func (m *messages) startSend() bool {
66 m.send = goconcurrentqueue.NewFIFO()
// stopSend is called by stopToSend and allreset to stop sending on a topic.
// NOTE(review): the body is not visible in this excerpt — presumably it
// clears m.send; confirm.
72 func (m *messages) stopSend() {
// addToSend puts msg on the send queue and returns the error from Enqueue.
// It can also return the error "sending not started".
// NOTE(review): the condition guarding that error is not visible in this
// excerpt — presumably m.send == nil; confirm.
76 func (m *messages) addToSend(msg string) error {
78 return fmt.Errorf("sending not started")
82 return m.send.Enqueue(msg)
// getToSend takes the next element from the send queue and returns the
// result of Dequeue. It can also return "" together with the error
// "sending not started".
// NOTE(review): the condition guarding that error is not visible in this
// excerpt — presumably m.send == nil; confirm.
85 func (m *messages) getToSend() (interface{}, error) {
87 return "", fmt.Errorf("sending not started")
91 return m.send.Dequeue()
94 func (m *messages) startReceive() bool {
95 if m.received == nil {
96 m.received = goconcurrentqueue.NewFIFO()
// stopReceive is called by stopToReceive, startToReceive and allreset to stop
// receiving on a topic.
// NOTE(review): the body is not visible in this excerpt — confirm that it
// clears m.received (and not m.send).
102 func (m *messages) stopReceive() {
112 func newTopic(ct string) *topic {
115 counters: counters{},
116 messages: messages{},
120 var globalCounters counters
121 var topics map[string]*topic = make(map[string]*topic)
123 var bootstrapserver = ""
// Startup configuration: load the Kafka bootstrap server address from the
// KAFKA_BOOTSTRAP_SERVER environment variable into bootstrapserver.
// NOTE(review): the header of the enclosing function is not visible in this
// excerpt.
126 bootstrapserver = os.Getenv("KAFKA_BOOTSTRAP_SERVER")
// An unset or empty variable is reported as a fatal error.
127 if len(bootstrapserver) == 0 {
128 fmt.Println("Fatal error: env var KAFKA_BOOTSTRAP_SERVER not set")
129 fmt.Println("Exiting... ")
// NOTE(review): presumably the process exits before reaching the next line
// when the variable is missing (statement not visible) — confirm.
132 fmt.Println("Using KAFKA_BOOTSTRAP_SERVER=" + bootstrapserver)
135 //Helper function to get a created topic, if it exists
136 func getTopicFromRequest(w http.ResponseWriter, req *http.Request) (*topic, string, bool) {
137 topicId := mux.Vars(req)["topic"]
138 t, exist := topics[topicId]
140 w.WriteHeader(http.StatusNotFound)
141 fmt.Fprintf(w, "Topic %v does not exist", topicId)
142 return nil, "", false
144 return t, topicId, true
// healthCheck is the handler registered for GET /.
// NOTE(review): the body is not visible in this excerpt.
149 func healthCheck(w http.ResponseWriter, req *http.Request) {
153 // Deep reset of this interface stub - no removal of msgs or topics in kafka
// allreset is the handler registered for POST /reset.
155 func allreset(w http.ResponseWriter, req *http.Request) {
// Stop sending and receiving on every topic known to this stub.
156 for _, v := range topics {
157 v.messages.stopSend()
158 v.messages.stopReceive()
// The request blocks for five seconds here, before the state is replaced.
160 time.Sleep(5 * time.Second) //Allow producers/consumers to shut down
// Start over with fresh global counters and an empty topic map.
161 globalCounters = newCounters()
162 topics = make(map[string]*topic)
// NOTE(review): the end of the function (including any response written to w)
// is not visible in this excerpt.
166 // Get topics, return json array of strings of topics created by this interface stub
167 // Returns json array of strings, array is empty if no topics exist
170 func getTopics(w http.ResponseWriter, req *http.Request) {
171 topicKeys := make([]string, 0, len(topics))
172 fmt.Printf("len topics: %v\n", len(topics))
173 for k := range topics {
174 topicKeys = append(topicKeys, k)
176 var j, err = json.Marshal(topicKeys)
178 w.WriteHeader(http.StatusInternalServerError)
179 fmt.Fprintf(w, "Cannot convert list of topics to json, error details: %v", err)
182 w.Header().Set("Content-Type", "application/json")
183 w.WriteHeader(http.StatusOK)
// writeOkRepsonse writes a plain-text response with status code httpStatus
// and body msg.
func writeOkRepsonse(w http.ResponseWriter, httpStatus int, msg string) {
	// Headers must be set before WriteHeader: changes to the header map made
	// after the status line has been written are not sent to the client.
	w.Header().Set("Content-Type", "text/plain")
	w.WriteHeader(httpStatus)
	// Fprint (not Fprintf) so that a '%' in msg is written verbatim.
	fmt.Fprint(w, msg)
}
194 // Get a counter value
195 // GET /topics/counters/{counter}
196 func getCounter(w http.ResponseWriter, req *http.Request) {
197 ctr := mux.Vars(req)["counter"]
199 if ctr == "received" {
200 ctrvalue = int(globalCounters.received.get())
201 } else if ctr == "sent" {
202 ctrvalue = int(globalCounters.sent.get())
206 w.WriteHeader(http.StatusBadRequest)
207 fmt.Fprintf(w, "Counter %v does not exist", ctr)
210 writeOkRepsonse(w, http.StatusOK, strconv.Itoa(ctrvalue))
216 // PUT on /topics/<topic>?type=<type> type shall be 'application/json' or 'text/plain'
217 func createTopic(w http.ResponseWriter, req *http.Request) {
218 topicId := mux.Vars(req)["topic"]
219 topicType := mux.Vars(req)["type"]
221 fmt.Printf("Creating topic: %v, content type: %v\n", topicId, topicType)
223 if len(topicType) == 0 {
224 w.WriteHeader(http.StatusBadRequest)
225 fmt.Fprintf(w, "Type not specified")
229 tid, exist := topics[topicId]
231 if tid.contentType != topicType {
232 w.WriteHeader(http.StatusBadRequest)
233 fmt.Fprintf(w, "Topic type exist but type is different, queue content type: %v, requested content type: %v", tid.contentType, topicType)
236 writeOkRepsonse(w, http.StatusOK, "Topic exist")
240 t := newTopic(topicType)
242 a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": bootstrapserver})
243 defer func() { a.Close() }()
245 w.WriteHeader(http.StatusInternalServerError)
246 fmt.Fprintf(w, "Cannot create client to bootstrap server: "+bootstrapserver+", error details: %v", err)
250 ctx, cancel := context.WithCancel(context.Background())
253 maxDur := 10 * time.Second
255 _, err = a.CreateTopics(
257 []kafka.TopicSpecification{{
260 ReplicationFactor: 1}},
261 kafka.SetAdminOperationTimeout(maxDur))
264 w.WriteHeader(http.StatusInternalServerError)
265 fmt.Fprintf(w, "Failed to create topic: %v, error details: %v", topicId, err)
269 w.WriteHeader(http.StatusCreated)
270 fmt.Fprintf(w, "Topic created")
274 // GET on /topic/<topic>
275 func getTopic(w http.ResponseWriter, req *http.Request) {
276 t, _, exist := getTopicFromRequest(w, req)
280 w.WriteHeader(http.StatusOK)
281 fmt.Fprintf(w, t.contentType)
284 // Get a topics counter value
285 // GET /topics/{topic}/counters/{counter}
286 func getTopicCounter(w http.ResponseWriter, req *http.Request) {
287 t, _, exist := getTopicFromRequest(w, req)
291 ctr := mux.Vars(req)["counter"]
294 if ctr == "received" {
295 ctrvalue = int(t.counters.received.get())
296 } else if ctr == "sent" {
297 ctrvalue = int(t.counters.sent.get())
301 w.WriteHeader(http.StatusBadRequest)
302 fmt.Fprintf(w, "Counter %v does not exist", ctr)
305 w.WriteHeader(http.StatusOK)
306 fmt.Fprintf(w, strconv.Itoa(ctrvalue))
// startToSend is the handler registered for POST /topics/{topic}/startsend.
// NOTE(review): several lines of this function (error checks, loop and any
// goroutine structure) are not visible in this excerpt; the comments below
// describe only the visible statements.
310 func startToSend(w http.ResponseWriter, req *http.Request) {
311 t, topicId, exist := getTopicFromRequest(w, req)
312 fmt.Printf("Start to send to topic: %v\n", topicId)
// startSend returns false when the send queue already exists; the request is
// then answered with 200 "Already started sending".
317 if !t.messages.startSend() {
318 w.WriteHeader(http.StatusOK)
319 fmt.Fprintf(w, "Already started sending")
// A Kafka producer is created against the configured bootstrap server.
323 p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": bootstrapserver})
325 fmt.Printf("Cannot create producer for topic: %v, error details: %v\n", topicId, err)
329 fmt.Printf("Closing producer for topic: %v\n", topicId)
// The message m is produced on the topic, on any partition, with
// fmt.Sprintf("%v", m) as its value.
// NOTE(review): where m comes from is not visible — presumably the send queue.
339 err = p.Produce(&kafka.Message{
340 TopicPartition: kafka.TopicPartition{Topic: &topicId, Partition: kafka.PartitionAny},
341 Value: []byte(fmt.Sprintf("%v", m)),
// Both the per-topic and the global sent counters are stepped.
345 t.counters.sent.step()
346 globalCounters.sent.step()
347 msg := fmt.Sprintf("%v", m)
// Either the message or only its length is logged.
// NOTE(review): the condition is not visible — presumably len(msg) < 500.
349 fmt.Printf("Message sent on topic: %v, len: %v, msg: %v\n", topicId, len(msg), msg)
351 fmt.Printf("Message sent on topic: %v, len: %v, is larger than 500...not printed\n", topicId, len(msg))
// According to the log text, a message that fails to send is discarded.
354 fmt.Printf("Failed to send message on topic: %v. Discarded. Error details: %v\n", topicId, err)
// NOTE(review): presumably a short pause when there is nothing to send — confirm.
358 time.Sleep(10 * time.Millisecond)
363 w.WriteHeader(http.StatusOK)
364 fmt.Fprintf(w, "Sending started")
// startToReceive is the handler registered for POST /topics/{topic}/startreceive.
// NOTE(review): several lines of this function (error checks, loop and any
// goroutine structure) are not visible in this excerpt; the comments below
// describe only the visible statements.
367 func startToReceive(w http.ResponseWriter, req *http.Request) {
368 t, topicId, exist := getTopicFromRequest(w, req)
// startReceive returns false when the received queue already exists; the
// request is then answered with 200 "Already started receiving".
373 if !t.messages.startReceive() {
374 w.WriteHeader(http.StatusOK)
375 fmt.Fprintf(w, "Already started receiving")
// stopReceive is deferred, so it runs when the enclosing function returns.
381 defer func() { t.messages.stopReceive() }()
// Hardcoded consumer group name.
// NOTE(review): the config line that uses groudId is not visible here.
383 groudId := "kafkaprocon"
// Consumer configuration: start from the earliest offset when no committed
// offset exists and auto-commit offsets every 5000 ms.
385 c, err := kafka.NewConsumer(&kafka.ConfigMap{
386 "bootstrap.servers": bootstrapserver,
388 "auto.offset.reset": "earliest",
389 "enable.auto.commit": true,
390 "auto.commit.interval.ms": 5000,
393 fmt.Printf("Cannot create consumer for topic: %v, error details: %v\n", topicId, err)
394 t.messages.stopReceive()
398 defer func() { c.Close() }()
400 que := t.messages.received
402 fmt.Printf("Cannot start receiving on topic: %v, queue does not exist\n", topicId)
405 fmt.Printf("Start subscribing on topic: %v\n", topicId)
406 err = c.SubscribeTopics([]string{topicId}, nil)
408 fmt.Printf("Cannot start subscribing on topic: %v, error details: %v\n", topicId, err)
// Each read waits at most one second for a message.
411 maxDur := 1 * time.Second
413 msg, err := c.ReadMessage(maxDur)
// Payloads shorter than 500 bytes are logged in full, larger ones only by length.
415 if len(msg.Value) < 500 {
416 fmt.Printf("Message received on topic: %v, partion: %v, len: %v, msg: %v\n", topicId, msg.TopicPartition, len(msg.Value), string(msg.Value))
418 fmt.Printf("Message received on topic: %v, partion: %v, len: %v is larger than 500...not printed\n", topicId, msg.TopicPartition, len(msg.Value))
// The payload is stored as a string in the received queue.
420 err = t.messages.received.Enqueue(string(msg.Value))
// NOTE(review): if this statement runs in a goroutine that outlives the
// handler, writing to w here happens after the response has completed —
// confirm and consider logging instead.
422 w.WriteHeader(http.StatusInternalServerError)
423 fmt.Fprintf(w, "Received message topic: %v, cannot be put in queue, %v", topicId, err)
// Both the per-topic and the global received counters are stepped.
426 t.counters.received.step()
427 globalCounters.received.step()
429 fmt.Printf("Nothing to consume on topic: %v, reason: %v\n", topicId, err)
435 w.WriteHeader(http.StatusOK)
436 fmt.Fprintf(w, "Receiving started")
439 // Post a message to a topic
440 // POST /send content type is specified in content type
441 func sendToTopic(w http.ResponseWriter, req *http.Request) {
443 t, topicId, exist := getTopicFromRequest(w, req)
444 fmt.Printf("Send to topic: %v\n", topicId)
450 w.WriteHeader(http.StatusBadRequest)
451 fmt.Fprintf(w, "Sending not initiated on topic: %v", topicId)
454 ct := req.Header.Get("Content-Type")
455 if ct != t.contentType {
456 w.WriteHeader(http.StatusBadRequest)
457 fmt.Fprintf(w, "Message to send content type: %v on topic: %v does not match queue content type: %v", ct, topicId, t.contentType)
461 if ct == "application/json" {
462 // decoder := json.NewDecoder(req.Body)
464 // err := decoder.Decode(&j)
466 // w.WriteHeader(http.StatusBadRequest)
467 // w.Header().Set("Content-Type", "text/plain")
468 // fmt.Fprintf(w, "Json payload cannot be decoded, error details %v\n", err)
471 //m = mux.Vars(req)[""]
472 if err := req.ParseForm(); err != nil {
473 w.WriteHeader(http.StatusBadRequest)
474 fmt.Fprintf(w, "Json payload cannot be decoded on topic: %v, error details %v", topicId, err)
477 b, err := ioutil.ReadAll(req.Body)
480 fmt.Printf("Json payload to send on topic: %v, msg: %v\n", topicId, string(b))
482 fmt.Printf("Json payload to send on topic: %v larger than 500 bytes, not printed...\n", topicId)
485 fmt.Printf("Json payload to send on topic: %v cannnot be decoded, err: %v\n", topicId, err)
487 err = q.Enqueue(string(b))
489 w.WriteHeader(http.StatusInternalServerError)
490 fmt.Fprintf(w, "Json message to send cannot be put in queue")
493 } else if ct == "text/plain" {
494 if err := req.ParseForm(); err != nil {
495 w.WriteHeader(http.StatusBadRequest)
496 fmt.Fprintf(w, "Text payload to send on topic: %v cannot be decoded, error details %v\n", topicId, err)
499 b, err := ioutil.ReadAll(req.Body)
502 fmt.Printf("Text payload to send on topic: %v, msg: %v\n", topicId, string(b))
504 fmt.Printf("Text payload to send on topic: %v larger than 500 bytes, not printed...\n", topicId)
507 fmt.Printf("Text payload to send on topic: %v cannnot be decoded, err: %v\n", topicId, err)
509 err = q.Enqueue(string(b))
511 w.WriteHeader(http.StatusInternalServerError)
512 fmt.Fprintf(w, "Text message to send cannot be put in queue")
516 w.WriteHeader(http.StatusBadRequest)
517 fmt.Fprintf(w, "Message to send, unknown content type %v", ct)
521 w.WriteHeader(http.StatusOK)
522 w.Header().Set("Content-Type", "text/plain")
523 fmt.Fprintf(w, "Message to send put in queue")
526 // Get zero or one message from a topic
528 func receiveFromTopic(w http.ResponseWriter, req *http.Request) {
529 t, topicId, exist := getTopicFromRequest(w, req)
533 if t.messages.received == nil {
534 w.WriteHeader(http.StatusBadRequest)
535 fmt.Fprintf(w, "Receiving not initiated on topic %v", topicId)
539 m, err := t.messages.received.Dequeue()
541 w.WriteHeader(http.StatusNoContent)
545 w.Header().Set("Content-Type", t.contentType)
546 w.WriteHeader(http.StatusOK)
547 fmt.Fprintf(w, "%v", m)
550 // Remove the send queue to stop sending
551 func stopToSend(w http.ResponseWriter, req *http.Request) {
552 fmt.Printf("Stop sending\n")
553 t, _, exist := getTopicFromRequest(w, req)
557 t.messages.stopSend()
558 w.WriteHeader(http.StatusNoContent)
561 // Remove the receive queue to stop receiving
562 func stopToReceive(w http.ResponseWriter, req *http.Request) {
563 fmt.Printf("Stop receiving\n")
564 t, _, exist := getTopicFromRequest(w, req)
568 t.messages.stopReceive()
569 w.WriteHeader(http.StatusNoContent)
// HelloServer greets the caller with the request path minus its first
// character, e.g. "/world" yields "Hello, world!".
func HelloServer(w http.ResponseWriter, r *http.Request) {
	name := r.URL.Path
	// Slicing an empty path with [1:] would panic, so only strip the first
	// character when there is one.
	if len(name) > 0 {
		name = name[1:]
	}
	fmt.Fprintf(w, "Hello, %s!", name)
}
// Route table: binds each HTTP method and path to its handler on router r.
// NOTE(review): the header of the enclosing function and the creation of r
// are not visible in this excerpt.
582 r.HandleFunc("/", healthCheck).Methods("GET")
583 r.HandleFunc("/reset", allreset).Methods("POST")
584 r.HandleFunc("/counters/{counter}", getCounter).Methods("GET")
585 r.HandleFunc("/topics", getTopics).Methods("GET")
586 r.HandleFunc("/topics/{topic}/counters/{counter}", getTopicCounter).Methods("GET")
// The PUT route additionally requires a "type" query parameter, which is
// captured as the {type} route variable.
587 r.HandleFunc("/topics/{topic}", createTopic).Methods("PUT").Queries("type", "{type}")
588 r.HandleFunc("/topics/{topic}", getTopic).Methods("GET")
589 r.HandleFunc("/topics/{topic}/startsend", startToSend).Methods("POST")
590 r.HandleFunc("/topics/{topic}/startreceive", startToReceive).Methods("POST")
591 r.HandleFunc("/topics/{topic}/stopsend", stopToSend).Methods("POST")
592 r.HandleFunc("/topics/{topic}/stopreceive", stopToReceive).Methods("POST")
// POST on .../msg queues a message for sending; GET on the same path fetches
// a received message.
593 r.HandleFunc("/topics/{topic}/msg", sendToTopic).Methods("POST")
594 r.HandleFunc("/topics/{topic}/msg", receiveFromTopic).Methods("GET")
// Server settings: 15 second write and read timeouts.
// NOTE(review): the remaining server fields and the definition of port are
// not visible in this excerpt.
600 WriteTimeout: 15 * time.Second,
601 ReadTimeout: 15 * time.Second,
603 fmt.Println("Running on port: " + port)
// ListenAndServe blocks until the server stops; the text of the error it
// returns is then printed.
// NOTE(review): the error text is passed as the Printf format string, so a
// '%' in it would be misformatted — consider fmt.Println.
604 fmt.Printf(srv.ListenAndServe().Error())