43be1b5ab1f09c04933b1ff84e976de8df9be71b
[nonrtric.git] / test / kafka-procon / main.go
1 // Writing a basic HTTP server is easy using the
2 // `net/http` package.
3 package main
4
5 import (
6         "context"
7         "encoding/json"
8         "fmt"
9         "io/ioutil"
10         "net/http"
11         "os"
12         "strconv"
13         "sync/atomic"
14         "time"
15
16         "github.com/confluentinc/confluent-kafka-go/kafka"
17         "github.com/enriquebris/goconcurrentqueue"
18         "github.com/gorilla/mux"
19 )
20
21 // Note: consumer 'group' and consumer 'user' both uses hardcoded values specific to this interface
22 //    globalCounters      var holding the "global counters"
23 //      recieved          number of received messages from all topics                             (int)
24 //      sent              number of sent messages to all topics                                   (int)
25 //    topics              var holding all topic related info
26 //      <topic-name>      name of a topic (present after topic is created)
27 //        content-type    data type of the topic                                                  (string)
28 //        counters
29 //          recieved      number of received messages from the topic                              (int)
30 //          sent          number of sent messages to the topic                                    (int)
31 //        messages
32 //          send          messages waiting to be sent (set when sending is started)               (fifo)
33 //          received      received messages waiting to be fetched (set when reception is started) (fifo)
34
35 type counter struct {
36         c uint64
37 }
38
39 func (c *counter) step() {
40         atomic.AddUint64(&c.c, 1)
41 }
42
43 func (c counter) get() uint64 {
44         return atomic.LoadUint64(&c.c)
45 }
46
47 type counters struct {
48         received counter
49         sent     counter
50 }
51
52 func newCounters() counters {
53         return counters{
54                 received: counter{},
55                 sent:     counter{},
56         }
57 }
58
59 type messages struct {
60         send     *goconcurrentqueue.FIFO
61         received *goconcurrentqueue.FIFO
62 }
63
64 func (m *messages) startSend() bool {
65         if m.send == nil {
66                 m.send = goconcurrentqueue.NewFIFO()
67                 return true
68         }
69         return false
70 }
71
72 func (m *messages) stopSend() {
73         m.send = nil
74 }
75
76 func (m *messages) addToSend(msg string) error {
77         if m.send == nil {
78                 return fmt.Errorf("sending not started")
79         }
80         m.send.Lock()
81         defer m.send.Unlock()
82         return m.send.Enqueue(msg)
83 }
84
85 func (m *messages) getToSend() (interface{}, error) {
86         if m.send == nil {
87                 return "", fmt.Errorf("sending not started")
88         }
89         m.send.Lock()
90         defer m.send.Unlock()
91         return m.send.Dequeue()
92 }
93
94 func (m *messages) startReceive() bool {
95         if m.received == nil {
96                 m.received = goconcurrentqueue.NewFIFO()
97                 return true
98         }
99         return false
100 }
101
102 func (m *messages) stopReceive() {
103         m.send = nil
104 }
105
106 type topic struct {
107         contentType string
108         counters    counters
109         messages    messages
110 }
111
112 func newTopic(ct string) *topic {
113         return &topic{
114                 contentType: ct,
115                 counters:    counters{},
116                 messages:    messages{},
117         }
118 }
119
120 var globalCounters counters
121 var topics map[string]*topic = make(map[string]*topic)
122
123 var bootstrapserver = ""
124
125 func initApp() {
126         bootstrapserver = os.Getenv("KAFKA_BOOTSTRAP_SERVER")
127         if len(bootstrapserver) == 0 {
128                 fmt.Println("Fatal error: env var KAFKA_BOOTSTRAP_SERVER not set")
129                 fmt.Println("Exiting... ")
130                 os.Exit(1)
131         }
132         fmt.Println("Using KAFKA_BOOTSTRAP_SERVER=" + bootstrapserver)
133 }
134
135 //Helper function to get a created topic, if it exists
136 func getTopicFromRequest(w http.ResponseWriter, req *http.Request) (*topic, string, bool) {
137         topicId := mux.Vars(req)["topic"]
138         t, exist := topics[topicId]
139         if exist == false {
140                 w.WriteHeader(http.StatusNotFound)
141                 fmt.Fprintf(w, "Topic %v does not exist", topicId)
142                 return nil, "", false
143         }
144         return t, topicId, true
145 }
146
147 // Alive check
148 // GET on /
149 func healthCheck(w http.ResponseWriter, req *http.Request) {
150         fmt.Fprintf(w, "OK")
151 }
152
153 // Deep reset of this interface stub - no removal of msgs or topics in kafka
154 // POST on /reset
155 func allreset(w http.ResponseWriter, req *http.Request) {
156         for _, v := range topics {
157                 v.messages.stopSend()
158                 v.messages.stopReceive()
159         }
160         time.Sleep(5 * time.Second) //Allow producers/consumers to shut down
161         globalCounters = newCounters()
162         topics = make(map[string]*topic)
163         fmt.Fprintf(w, "OK")
164 }
165
166 // Get topics, return json array of strings of topics created by this interface stub
167 // Returns json array of strings, array is empty if no topics exist
168 // GET on /topics
169
170 func getTopics(w http.ResponseWriter, req *http.Request) {
171         topicKeys := make([]string, 0, len(topics))
172         fmt.Printf("len topics: %v\n", len(topics))
173         for k := range topics {
174                 topicKeys = append(topicKeys, k)
175         }
176         var j, err = json.Marshal(topicKeys)
177         if err != nil {
178                 w.WriteHeader(http.StatusInternalServerError)
179                 fmt.Fprintf(w, "Cannot convert list of topics to json, error details: %v", err)
180                 return
181         } else {
182                 w.Header().Set("Content-Type", "application/json")
183                 w.WriteHeader(http.StatusOK)
184                 w.Write(j)
185         }
186 }
187
188 func writeOkRepsonse(w http.ResponseWriter, httpStatus int, msg string) {
189         w.WriteHeader(httpStatus)
190         w.Header().Set("Content-Type", "text/plain")
191         fmt.Fprintf(w, msg)
192 }
193
194 // Get a counter value
195 // GET /topics/counters/{counter}
196 func getCounter(w http.ResponseWriter, req *http.Request) {
197         ctr := mux.Vars(req)["counter"]
198         var ctrvalue = -1
199         if ctr == "received" {
200                 ctrvalue = int(globalCounters.received.get())
201         } else if ctr == "sent" {
202                 ctrvalue = int(globalCounters.sent.get())
203         }
204
205         if ctrvalue == -1 {
206                 w.WriteHeader(http.StatusBadRequest)
207                 fmt.Fprintf(w, "Counter %v does not exist", ctr)
208                 return
209         }
210         writeOkRepsonse(w, http.StatusOK, strconv.Itoa(ctrvalue))
211         return
212
213 }
214
215 // Create a topic
216 // PUT on /topics/<topic>?type=<type>    type shall be 'application/json' or 'text/plain'
217 func createTopic(w http.ResponseWriter, req *http.Request) {
218         topicId := mux.Vars(req)["topic"]
219         topicType := mux.Vars(req)["type"]
220
221         fmt.Printf("Creating topic: %v, content type: %v\n", topicId, topicType)
222
223         if len(topicType) == 0 {
224                 w.WriteHeader(http.StatusBadRequest)
225                 fmt.Fprintf(w, "Type not specified")
226                 return
227         }
228
229         tid, exist := topics[topicId]
230         if exist == true {
231                 if tid.contentType != topicType {
232                         w.WriteHeader(http.StatusBadRequest)
233                         fmt.Fprintf(w, "Topic type exist but type is different, queue content type: %v, requested content type: %v", tid.contentType, topicType)
234                         return
235                 }
236                 writeOkRepsonse(w, http.StatusOK, "Topic exist")
237                 return
238         }
239
240         t := newTopic(topicType)
241
242         a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": bootstrapserver})
243         defer func() { a.Close() }()
244         if err != nil {
245                 w.WriteHeader(http.StatusInternalServerError)
246                 fmt.Fprintf(w, "Cannot create client to bootstrap server: "+bootstrapserver+", error details: %v", err)
247                 return
248         }
249
250         ctx, cancel := context.WithCancel(context.Background())
251         defer cancel()
252
253         maxDur := 10 * time.Second
254
255         _, err = a.CreateTopics(
256                 ctx,
257                 []kafka.TopicSpecification{{
258                         Topic:             topicId,
259                         NumPartitions:     1,
260                         ReplicationFactor: 1}},
261                 kafka.SetAdminOperationTimeout(maxDur))
262
263         if err != nil {
264                 w.WriteHeader(http.StatusInternalServerError)
265                 fmt.Fprintf(w, "Failed to create topic: %v, error details: %v", topicId, err)
266                 return
267         }
268         topics[topicId] = t
269         w.WriteHeader(http.StatusCreated)
270         fmt.Fprintf(w, "Topic created")
271 }
272
273 // Get topic type
274 // GET on /topic/<topic>
275 func getTopic(w http.ResponseWriter, req *http.Request) {
276         t, _, exist := getTopicFromRequest(w, req)
277         if !exist {
278                 return
279         }
280         w.WriteHeader(http.StatusOK)
281         fmt.Fprintf(w, t.contentType)
282 }
283
284 // Get a topics counter value
285 // GET /topics/{topic}/counters/{counter}
286 func getTopicCounter(w http.ResponseWriter, req *http.Request) {
287         t, _, exist := getTopicFromRequest(w, req)
288         if !exist {
289                 return
290         }
291         ctr := mux.Vars(req)["counter"]
292
293         var ctrvalue = -1
294         if ctr == "received" {
295                 ctrvalue = int(t.counters.received.get())
296         } else if ctr == "sent" {
297                 ctrvalue = int(t.counters.sent.get())
298         }
299
300         if ctrvalue == -1 {
301                 w.WriteHeader(http.StatusBadRequest)
302                 fmt.Fprintf(w, "Counter %v does not exist", ctr)
303                 return
304         }
305         w.WriteHeader(http.StatusOK)
306         fmt.Fprintf(w, strconv.Itoa(ctrvalue))
307         return
308 }
309
310 func startToSend(w http.ResponseWriter, req *http.Request) {
311         t, topicId, exist := getTopicFromRequest(w, req)
312         fmt.Printf("Start to send to topic: %v\n", topicId)
313         if !exist {
314                 return
315         }
316
317         if !t.messages.startSend() {
318                 w.WriteHeader(http.StatusOK)
319                 fmt.Fprintf(w, "Already started sending")
320                 return
321         }
322         go func() {
323                 p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": bootstrapserver})
324                 if err != nil {
325                         fmt.Printf("Cannot create producer for topic: %v, error details: %v\n", topicId, err)
326                         return
327                 }
328                 defer func() {
329                         fmt.Printf("Closing producer for topic: %v\n", topicId)
330                         p.Close()
331                 }()
332                 for {
333                         q := t.messages.send
334                         if q == nil {
335                                 return
336                         }
337                         m, err := q.Get(0)
338                         if err == nil {
339                                 err = p.Produce(&kafka.Message{
340                                         TopicPartition: kafka.TopicPartition{Topic: &topicId, Partition: kafka.PartitionAny},
341                                         Value:          []byte(fmt.Sprintf("%v", m)),
342                                 }, nil)
343                                 if err == nil {
344                                         q.Remove(0)
345                                         t.counters.sent.step()
346                                         globalCounters.sent.step()
347                                         msg := fmt.Sprintf("%v", m)
348                                         if len(msg) < 500 {
349                                                 fmt.Printf("Message sent on topic: %v, len: %v, msg: %v\n", topicId, len(msg), msg)
350                                         } else {
351                                                 fmt.Printf("Message sent on topic: %v, len: %v, is larger than 500...not printed\n", topicId, len(msg))
352                                         }
353                                 } else {
354                                         fmt.Printf("Failed to send message on topic: %v. Discarded. Error details: %v\n", topicId, err)
355                                         q.Remove(0)
356                                 }
357                         } else {
358                                 time.Sleep(10 * time.Millisecond)
359                         }
360                 }
361         }()
362
363         w.WriteHeader(http.StatusOK)
364         fmt.Fprintf(w, "Sending started")
365 }
366
367 func startToReceive(w http.ResponseWriter, req *http.Request) {
368         t, topicId, exist := getTopicFromRequest(w, req)
369         if !exist {
370                 return
371         }
372
373         if !t.messages.startReceive() {
374                 w.WriteHeader(http.StatusOK)
375                 fmt.Fprintf(w, "Already started receiving")
376                 return
377         }
378
379         go func() {
380
381                 defer func() { t.messages.stopReceive() }()
382
383                 groudId := "kafkaprocon"
384
385                 c, err := kafka.NewConsumer(&kafka.ConfigMap{
386                         "bootstrap.servers":       bootstrapserver,
387                         "group.id":                groudId,
388                         "auto.offset.reset":       "earliest",
389                         "enable.auto.commit":      true,
390                         "auto.commit.interval.ms": 5000,
391                 })
392                 if err != nil {
393                         fmt.Printf("Cannot create consumer for topic: %v, error details: %v\n", topicId, err)
394                         t.messages.stopReceive()
395                         return
396                 }
397                 c.Commit()
398                 defer func() { c.Close() }()
399                 for {
400                         que := t.messages.received
401                         if que == nil {
402                                 fmt.Printf("Cannot start receiving on topic: %v, queue does not exist\n", topicId)
403                                 return
404                         }
405                         fmt.Printf("Start subscribing on topic: %v\n", topicId)
406                         err = c.SubscribeTopics([]string{topicId}, nil)
407                         if err != nil {
408                                 fmt.Printf("Cannot start subscribing on topic: %v, error details: %v\n", topicId, err)
409                                 return
410                         }
411                         maxDur := 1 * time.Second
412                         for {
413                                 msg, err := c.ReadMessage(maxDur)
414                                 if err == nil {
415                                         if len(msg.Value) < 500 {
416                                                 fmt.Printf("Message received on topic: %v, partion: %v, len: %v, msg: %v\n", topicId, msg.TopicPartition, len(msg.Value), string(msg.Value))
417                                         } else {
418                                                 fmt.Printf("Message received on topic: %v, partion: %v, len: %v is larger than 500...not printed\n", topicId, msg.TopicPartition, len(msg.Value))
419                                         }
420                                         err = t.messages.received.Enqueue(string(msg.Value))
421                                         if err != nil {
422                                                 w.WriteHeader(http.StatusInternalServerError)
423                                                 fmt.Fprintf(w, "Received message topic: %v, cannot be put in queue, %v", topicId, err)
424                                                 return
425                                         }
426                                         t.counters.received.step()
427                                         globalCounters.received.step()
428                                 } else {
429                                         fmt.Printf("Nothing to consume on topic: %v, reason: %v\n", topicId, err)
430                                 }
431                         }
432                 }
433         }()
434
435         w.WriteHeader(http.StatusOK)
436         fmt.Fprintf(w, "Receiving started")
437 }
438
439 // Post a message to a topic
440 // POST /send    content type is specified in content type
441 func sendToTopic(w http.ResponseWriter, req *http.Request) {
442
443         t, topicId, exist := getTopicFromRequest(w, req)
444         fmt.Printf("Send to topic: %v\n", topicId)
445         if !exist {
446                 return
447         }
448         q := t.messages.send
449         if q == nil {
450                 w.WriteHeader(http.StatusBadRequest)
451                 fmt.Fprintf(w, "Sending not initiated on topic: %v", topicId)
452                 return
453         }
454         ct := req.Header.Get("Content-Type")
455         if ct != t.contentType {
456                 w.WriteHeader(http.StatusBadRequest)
457                 fmt.Fprintf(w, "Message to send content type: %v on topic: %v does not match queue content type: %v", ct, topicId, t.contentType)
458                 return
459         }
460
461         if ct == "application/json" {
462                 // decoder := json.NewDecoder(req.Body)
463                 // var j :=
464                 // err := decoder.Decode(&j)
465                 // if err != nil {
466                 //      w.WriteHeader(http.StatusBadRequest)
467                 //      w.Header().Set("Content-Type", "text/plain")
468                 //      fmt.Fprintf(w, "Json payload cannot be decoded, error details %v\n", err)
469                 //      return
470                 // }
471                 //m = mux.Vars(req)[""]
472                 if err := req.ParseForm(); err != nil {
473                         w.WriteHeader(http.StatusBadRequest)
474                         fmt.Fprintf(w, "Json payload cannot be decoded on topic: %v, error details %v", topicId, err)
475                         return
476                 }
477                 b, err := ioutil.ReadAll(req.Body)
478                 if err == nil {
479                         if len(b) < 500 {
480                                 fmt.Printf("Json payload to send on topic: %v, msg: %v\n", topicId, string(b))
481                         } else {
482                                 fmt.Printf("Json payload to send on topic: %v larger than 500 bytes, not printed...\n", topicId)
483                         }
484                 } else {
485                         fmt.Printf("Json payload to send on topic: %v cannnot be decoded, err: %v\n", topicId, err)
486                 }
487                 err = q.Enqueue(string(b))
488                 if err != nil {
489                         w.WriteHeader(http.StatusInternalServerError)
490                         fmt.Fprintf(w, "Json message to send cannot be put in queue")
491                         return
492                 }
493         } else if ct == "text/plain" {
494                 if err := req.ParseForm(); err != nil {
495                         w.WriteHeader(http.StatusBadRequest)
496                         fmt.Fprintf(w, "Text payload to send on topic: %v cannot be decoded, error details %v\n", topicId, err)
497                         return
498                 }
499                 b, err := ioutil.ReadAll(req.Body)
500                 if err == nil {
501                         if len(b) < 500 {
502                                 fmt.Printf("Text payload to send on topic: %v, msg: %v\n", topicId, string(b))
503                         } else {
504                                 fmt.Printf("Text payload to send on topic: %v larger than 500 bytes, not printed...\n", topicId)
505                         }
506                 } else {
507                         fmt.Printf("Text payload to send on topic: %v cannnot be decoded, err: %v\n", topicId, err)
508                 }
509                 err = q.Enqueue(string(b))
510                 if err != nil {
511                         w.WriteHeader(http.StatusInternalServerError)
512                         fmt.Fprintf(w, "Text message to send cannot be put in queue")
513                         return
514                 }
515         } else {
516                 w.WriteHeader(http.StatusBadRequest)
517                 fmt.Fprintf(w, "Message to send, unknown content type %v", ct)
518                 return
519         }
520
521         w.WriteHeader(http.StatusOK)
522         w.Header().Set("Content-Type", "text/plain")
523         fmt.Fprintf(w, "Message to send put in queue")
524 }
525
526 // Get zero or one message from a topic
527 // GET /receive
528 func receiveFromTopic(w http.ResponseWriter, req *http.Request) {
529         t, topicId, exist := getTopicFromRequest(w, req)
530         if !exist {
531                 return
532         }
533         if t.messages.received == nil {
534                 w.WriteHeader(http.StatusBadRequest)
535                 fmt.Fprintf(w, "Receiving not initiated on topic %v", topicId)
536                 return
537         }
538
539         m, err := t.messages.received.Dequeue()
540         if err != nil {
541                 w.WriteHeader(http.StatusNoContent)
542                 return
543         }
544
545         w.Header().Set("Content-Type", t.contentType)
546         w.WriteHeader(http.StatusOK)
547         fmt.Fprintf(w, "%v", m)
548 }
549
550 // Remove the send queue to stop sending
551 func stopToSend(w http.ResponseWriter, req *http.Request) {
552         fmt.Printf("Stop sending\n")
553         t, _, exist := getTopicFromRequest(w, req)
554         if !exist {
555                 return
556         }
557         t.messages.stopSend()
558         w.WriteHeader(http.StatusNoContent)
559 }
560
561 // Remove the receive queue to stop receiving
562 func stopToReceive(w http.ResponseWriter, req *http.Request) {
563         fmt.Printf("Stop receiving\n")
564         t, _, exist := getTopicFromRequest(w, req)
565         if !exist {
566                 return
567         }
568         t.messages.stopReceive()
569         w.WriteHeader(http.StatusNoContent)
570 }
571
572 func HelloServer(w http.ResponseWriter, r *http.Request) {
573         fmt.Fprintf(w, "Hello, %s!", r.URL.Path[1:])
574 }
575
576 func main() {
577
578         initApp()
579
580         r := mux.NewRouter()
581
582         r.HandleFunc("/", healthCheck).Methods("GET")
583         r.HandleFunc("/reset", allreset).Methods("POST")
584         r.HandleFunc("/counters/{counter}", getCounter).Methods("GET")
585         r.HandleFunc("/topics", getTopics).Methods("GET")
586         r.HandleFunc("/topics/{topic}/counters/{counter}", getTopicCounter).Methods("GET")
587         r.HandleFunc("/topics/{topic}", createTopic).Methods("PUT").Queries("type", "{type}")
588         r.HandleFunc("/topics/{topic}", getTopic).Methods("GET")
589         r.HandleFunc("/topics/{topic}/startsend", startToSend).Methods("POST")
590         r.HandleFunc("/topics/{topic}/startreceive", startToReceive).Methods("POST")
591         r.HandleFunc("/topics/{topic}/stopsend", stopToSend).Methods("POST")
592         r.HandleFunc("/topics/{topic}/stopreceive", stopToReceive).Methods("POST")
593         r.HandleFunc("/topics/{topic}/msg", sendToTopic).Methods("POST")
594         r.HandleFunc("/topics/{topic}/msg", receiveFromTopic).Methods("GET")
595
596         port := "8090"
597         srv := &http.Server{
598                 Handler:      r,
599                 Addr:         ":" + port,
600                 WriteTimeout: 15 * time.Second,
601                 ReadTimeout:  15 * time.Second,
602         }
603         fmt.Println("Running on port: " + port)
604         fmt.Printf(srv.ListenAndServe().Error())
605 }