Merge "Add docker build to mediator pom"
[nonrtric.git] / test / kafka-procon / main.go
1 // Writing a basic HTTP server is easy using the
2 // `net/http` package.
3 package main
4
5 import (
6         "context"
7         "encoding/json"
8         "fmt"
9         "io/ioutil"
10         "net/http"
11         "os"
12         "strconv"
13         "sync/atomic"
14         "time"
15
16         "github.com/confluentinc/confluent-kafka-go/kafka"
17         "github.com/enriquebris/goconcurrentqueue"
18         "github.com/gorilla/mux"
19 )
20
21 // Note: consumer 'group' and consumer 'user' both uses hardcoded values specific to this interface
22 //    globalCounters      var holding the "global counters"
23 //      recieved          number of received messages from all topics                             (int)
24 //      sent              number of sent messages to all topics                                   (int)
25 //    topics              var holding all topic related info
26 //      <topic-name>      name of a topic (present after topic is created)
27 //        content-type    data type of the topic                                                  (string)
28 //        counters
29 //          recieved      number of received messages from the topic                              (int)
30 //          sent          number of sent messages to the topic                                    (int)
31 //        messages
32 //          send          messages waiting to be sent (set when sending is started)               (fifo)
33 //          received      received messages waiting to be fetched (set when reception is started) (fifo)
34
35 type counter struct {
36         c uint64
37 }
38
39 func (c *counter) step() {
40         atomic.AddUint64(&c.c, 1)
41 }
42
43 func (c counter) get() uint64 {
44         return atomic.LoadUint64(&c.c)
45 }
46
47 type counters struct {
48         received counter
49         sent     counter
50 }
51
52 func newCounters() counters {
53         return counters{
54                 received: counter{},
55                 sent:     counter{},
56         }
57 }
58
59 type messages struct {
60         send     *goconcurrentqueue.FIFO
61         received *goconcurrentqueue.FIFO
62 }
63
64 func (m *messages) startSend() bool {
65         if m.send == nil {
66                 m.send = goconcurrentqueue.NewFIFO()
67                 return true
68         }
69         return false
70 }
71
72 func (m *messages) stopSend() {
73         m.send = nil
74 }
75
76 func (m *messages) addToSend(msg string) error {
77         if m.send == nil {
78                 return fmt.Errorf("sending not started")
79         }
80         m.send.Lock()
81         defer m.send.Unlock()
82         return m.send.Enqueue(msg)
83 }
84
85 func (m *messages) getToSend() (interface{}, error) {
86         if m.send == nil {
87                 return "", fmt.Errorf("sending not started")
88         }
89         m.send.Lock()
90         defer m.send.Unlock()
91         return m.send.Dequeue()
92 }
93
94 func (m *messages) startReceive() bool {
95         if m.received == nil {
96                 m.received = goconcurrentqueue.NewFIFO()
97                 return true
98         }
99         return false
100 }
101
102 func (m *messages) stopReceive() {
103         m.send = nil
104 }
105
106 type topic struct {
107         contentType string
108         counters    counters
109         messages    messages
110 }
111
112 func newTopic(ct string) *topic {
113         return &topic{
114                 contentType: ct,
115                 counters:    counters{},
116                 messages:    messages{},
117         }
118 }
119
120 var globalCounters counters
121 var topics map[string]*topic = make(map[string]*topic)
122
123 var bootstrapserver = ""
124
125 func initApp() {
126         bootstrapserver = os.Getenv("KAFKA_BOOTSTRAP_SERVER")
127         if len(bootstrapserver) == 0 {
128                 fmt.Println("Fatal error: env var KAFKA_BOOTSTRAP_SERVER not set")
129                 fmt.Println("Exiting... ")
130                 os.Exit(1)
131         }
132         fmt.Println("Using KAFKA_BOOTSTRAP_SERVER=" + bootstrapserver)
133 }
134
135 //Helper function to get a created topic, if it exists
136 func getTopicFromRequest(w http.ResponseWriter, req *http.Request) (*topic, string, bool) {
137         topicId := mux.Vars(req)["topic"]
138         t, exist := topics[topicId]
139         if exist == false {
140                 w.WriteHeader(http.StatusNotFound)
141                 fmt.Fprintf(w, "Topic %v does not exist", topicId)
142                 return nil, "", false
143         }
144         return t, topicId, true
145 }
146
147 // Alive check
148 // GET on /
149 func healthCheck(w http.ResponseWriter, req *http.Request) {
150         fmt.Fprintf(w, "OK")
151 }
152
153 // Deep reset of this interface stub - no removal of msgs or topics in kafka
154 // POST on /reset
155 func allreset(w http.ResponseWriter, req *http.Request) {
156         for _, v := range topics {
157                 v.messages.stopSend()
158                 v.messages.stopReceive()
159         }
160         time.Sleep(5 * time.Second) //Allow producers/consumers to shut down
161         globalCounters = newCounters()
162         topics = make(map[string]*topic)
163         fmt.Fprintf(w, "OK")
164 }
165
166 // Get topics, return json array of strings of topics created by this interface stub
167 // Returns json array of strings, array is empty if no topics exist
168 // GET on /topics
169
170 func getTopics(w http.ResponseWriter, req *http.Request) {
171         topicKeys := make([]string, 0, len(topics))
172         fmt.Printf("len topics: %v\n", len(topics))
173         for k := range topics {
174                 topicKeys = append(topicKeys, k)
175         }
176         var j, err = json.Marshal(topicKeys)
177         if err != nil {
178                 w.WriteHeader(http.StatusInternalServerError)
179                 fmt.Fprintf(w, "Cannot convert list of topics to json, error details: %v", err)
180                 return
181         } else {
182                 w.Header().Set("Content-Type", "application/json")
183                 w.WriteHeader(http.StatusOK)
184                 w.Write(j)
185         }
186 }
187
188 func writeOkRepsonse(w http.ResponseWriter, httpStatus int, msg string) {
189         w.WriteHeader(httpStatus)
190         w.Header().Set("Content-Type", "text/plain")
191         fmt.Fprintf(w, msg)
192 }
193
194 // Get a counter value
195 // GET /topics/counters/{counter}
196 func getCounter(w http.ResponseWriter, req *http.Request) {
197         ctr := mux.Vars(req)["counter"]
198         var ctrvalue = -1
199         if ctr == "received" {
200                 ctrvalue = int(globalCounters.received.get())
201         } else if ctr == "sent" {
202                 ctrvalue = int(globalCounters.sent.get())
203         }
204
205         if ctrvalue == -1 {
206                 w.WriteHeader(http.StatusBadRequest)
207                 fmt.Fprintf(w, "Counter %v does not exist", ctr)
208                 return
209         }
210         writeOkRepsonse(w, http.StatusOK, strconv.Itoa(ctrvalue))
211         return
212
213 }
214
215 // Create a topic
216 // PUT on /topics/<topic>?type=<type>    type shall be 'application/json' or 'text/plain'
217 func createTopic(w http.ResponseWriter, req *http.Request) {
218         topicId := mux.Vars(req)["topic"]
219         topicType := mux.Vars(req)["type"]
220
221         fmt.Printf("Creating topic: %v, content type: %v\n", topicId, topicType)
222
223         if len(topicType) == 0 {
224                 w.WriteHeader(http.StatusBadRequest)
225                 fmt.Fprintf(w, "Type not specified")
226                 return
227         }
228
229         tid, exist := topics[topicId]
230         if exist == true {
231                 if tid.contentType != topicType {
232                         w.WriteHeader(http.StatusBadRequest)
233                         fmt.Fprintf(w, "Topic type exist but type is different, queue content type: %v, requested content type: %v", tid.contentType, topicType)
234                         return
235                 }
236                 writeOkRepsonse(w, http.StatusOK, "Topic exist")
237                 return
238         }
239
240         t := newTopic(topicType)
241
242         a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": bootstrapserver})
243         defer func() { a.Close() }()
244         if err != nil {
245                 w.WriteHeader(http.StatusInternalServerError)
246                 fmt.Fprintf(w, "Cannot create client to bootstrap server: "+bootstrapserver+", error details: %v", err)
247                 return
248         }
249
250         ctx, cancel := context.WithCancel(context.Background())
251         defer cancel()
252
253         maxDur := 10 * time.Second
254
255         _, err = a.CreateTopics(
256                 ctx,
257                 []kafka.TopicSpecification{{
258                         Topic:             topicId,
259                         NumPartitions:     1,
260                         ReplicationFactor: 1}},
261                 kafka.SetAdminOperationTimeout(maxDur))
262
263         if err != nil {
264                 w.WriteHeader(http.StatusInternalServerError)
265                 fmt.Fprintf(w, "Failed to create topic: %v, error details: %v", topicId, err)
266                 return
267         }
268         topics[topicId] = t
269         w.WriteHeader(http.StatusCreated)
270         fmt.Fprintf(w, "Topic created")
271 }
272
273 // Get topic type
274 // GET on /topic/<topic>
275 func getTopic(w http.ResponseWriter, req *http.Request) {
276         t, _, exist := getTopicFromRequest(w, req)
277         if !exist {
278                 return
279         }
280         w.WriteHeader(http.StatusOK)
281         fmt.Fprintf(w, t.contentType)
282 }
283
284 // Get a topics counter value
285 // GET /topics/{topic}/counters/{counter}
286 func getTopicCounter(w http.ResponseWriter, req *http.Request) {
287         t, _, exist := getTopicFromRequest(w, req)
288         if !exist {
289                 return
290         }
291         ctr := mux.Vars(req)["counter"]
292
293         var ctrvalue = -1
294         if ctr == "received" {
295                 ctrvalue = int(t.counters.received.get())
296         } else if ctr == "sent" {
297                 ctrvalue = int(t.counters.sent.get())
298         }
299
300         if ctrvalue == -1 {
301                 w.WriteHeader(http.StatusBadRequest)
302                 fmt.Fprintf(w, "Counter %v does not exist", ctr)
303                 return
304         }
305         w.WriteHeader(http.StatusOK)
306         fmt.Fprintf(w, strconv.Itoa(ctrvalue))
307         return
308 }
309
310 func startToSend(w http.ResponseWriter, req *http.Request) {
311         t, topicId, exist := getTopicFromRequest(w, req)
312         if !exist {
313                 return
314         }
315
316         if !t.messages.startSend() {
317                 w.WriteHeader(http.StatusOK)
318                 fmt.Fprintf(w, "Already started sending")
319                 return
320         }
321         go func() {
322                 p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": bootstrapserver})
323                 if err != nil {
324                         fmt.Printf("Cannot create producer for topic: %v, error details: %v\n", topicId, err)
325                         return
326                 }
327                 defer func() { p.Close() }()
328                 for {
329                         q := t.messages.send
330                         if q == nil {
331                                 return
332                         }
333                         m, err := q.Get(0)
334                         if err == nil {
335                                 err = p.Produce(&kafka.Message{
336                                         TopicPartition: kafka.TopicPartition{Topic: &topicId, Partition: kafka.PartitionAny},
337                                         Value:          []byte(fmt.Sprintf("%v", m)),
338                                 }, nil)
339                                 if err == nil {
340                                         q.Remove(0)
341                                         t.counters.sent.step()
342                                         globalCounters.sent.step()
343                                         msg := fmt.Sprintf("%v", m)
344                                         if len(msg) < 500 {
345                                                 fmt.Printf("Message sent on topic: %v, len: %v, msg: %v", topicId, len(msg), msg)
346                                         } else {
347                                                 fmt.Printf("Message sent on topic: %v, len: %v, is larger than 500...not printed", topicId, len(msg))
348                                         }
349                                 } else {
350                                         fmt.Printf("Failed to send message on topic: %v. Discarded. Error details: %v", topicId, err)
351                                         q.Remove(0)
352                                 }
353                         } else {
354                                 time.Sleep(10 * time.Millisecond)
355                         }
356                 }
357         }()
358
359         w.WriteHeader(http.StatusOK)
360         fmt.Fprintf(w, "Sending started")
361 }
362
363 func startToReceive(w http.ResponseWriter, req *http.Request) {
364         t, topicId, exist := getTopicFromRequest(w, req)
365         if !exist {
366                 return
367         }
368
369         if !t.messages.startReceive() {
370                 w.WriteHeader(http.StatusOK)
371                 fmt.Fprintf(w, "Already started receiving")
372                 return
373         }
374
375         go func() {
376
377                 defer func() { t.messages.stopReceive() }()
378
379                 groudId := "kafkaprocon"
380
381                 c, err := kafka.NewConsumer(&kafka.ConfigMap{
382                         "bootstrap.servers":       bootstrapserver,
383                         "group.id":                groudId,
384                         "auto.offset.reset":       "earliest",
385                         "enable.auto.commit":      true,
386                         "auto.commit.interval.ms": 5000,
387                 })
388                 if err != nil {
389                         fmt.Printf("Cannot create consumer for topic: %v, error details: %v\n", topicId, err)
390                         t.messages.stopReceive()
391                         return
392                 }
393                 c.Commit()
394                 defer func() { c.Close() }()
395                 for {
396                         que := t.messages.received
397                         if que == nil {
398                                 fmt.Printf("Cannot start receiving on topic: %v, queue does not exist\n", topicId)
399                                 return
400                         }
401                         fmt.Printf("Start subscribing on topic: %v\n", topicId)
402                         err = c.SubscribeTopics([]string{topicId}, nil)
403                         if err != nil {
404                                 fmt.Printf("Cannot start subscribing on topic: %v, error details: %v\n", topicId, err)
405                                 return
406                         }
407                         maxDur := 1 * time.Second
408                         for {
409                                 msg, err := c.ReadMessage(maxDur)
410                                 if err == nil {
411                                         if len(msg.Value) < 500 {
412                                                 fmt.Printf("Message received on topic: %v, partion: %v, len: %v, msg: %v", topicId, msg.TopicPartition, len(msg.Value), string(msg.Value))
413                                         } else {
414                                                 fmt.Printf("Message received on topic: %v, partion: %v, len: %v is larger than 500...not printed", topicId, msg.TopicPartition, len(msg.Value))
415                                         }
416                                         err = t.messages.received.Enqueue(string(msg.Value))
417                                         if err != nil {
418                                                 w.WriteHeader(http.StatusInternalServerError)
419                                                 fmt.Fprintf(w, "Received message topic: %v, cannot be put in queue, %v", topicId, err)
420                                                 return
421                                         }
422                                         t.counters.received.step()
423                                         globalCounters.received.step()
424                                 } else {
425                                         fmt.Printf("Nothing to consume on topic: %v, reason: %v\n", topicId, err)
426                                 }
427                         }
428                 }
429         }()
430
431         w.WriteHeader(http.StatusOK)
432         fmt.Fprintf(w, "Receiving started")
433 }
434
435 // Post a message to a topic
436 // POST /send    content type is specified in content type
437 func sendToTopic(w http.ResponseWriter, req *http.Request) {
438         t, topicId, exist := getTopicFromRequest(w, req)
439         if !exist {
440                 return
441         }
442         q := t.messages.send
443         if q == nil {
444                 w.WriteHeader(http.StatusBadRequest)
445                 fmt.Fprintf(w, "Sending not initiated on topic: %v", topicId)
446                 return
447         }
448         ct := req.Header.Get("Content-Type")
449         if ct != t.contentType {
450                 w.WriteHeader(http.StatusBadRequest)
451                 fmt.Fprintf(w, "Message to send content type: %v on topic: %v does not match queue content type: %v", ct, topicId, t.contentType)
452                 return
453         }
454
455         if ct == "application/json" {
456                 // decoder := json.NewDecoder(req.Body)
457                 // var j :=
458                 // err := decoder.Decode(&j)
459                 // if err != nil {
460                 //      w.WriteHeader(http.StatusBadRequest)
461                 //      w.Header().Set("Content-Type", "text/plain")
462                 //      fmt.Fprintf(w, "Json payload cannot be decoded, error details %v\n", err)
463                 //      return
464                 // }
465                 //m = mux.Vars(req)[""]
466                 if err := req.ParseForm(); err != nil {
467                         w.WriteHeader(http.StatusBadRequest)
468                         fmt.Fprintf(w, "Json payload cannot be decoded on topic: %v, error details %v", topicId, err)
469                         return
470                 }
471                 b, err := ioutil.ReadAll(req.Body)
472                 if err == nil {
473                         if len(b) < 500 {
474                                 fmt.Printf("Json payload to send on topic: %v, msg: %v", topicId, string(b))
475                         } else {
476                                 fmt.Printf("Json payload to send on topic: %v larger than 500 bytes, not printed...", topicId)
477                         }
478                 }
479                 err = q.Enqueue(string(b))
480                 if err != nil {
481                         w.WriteHeader(http.StatusInternalServerError)
482                         fmt.Fprintf(w, "Json message to send cannot be put in queue")
483                         return
484                 }
485         } else if ct == "text/plain" {
486                 if err := req.ParseForm(); err != nil {
487                         w.WriteHeader(http.StatusBadRequest)
488                         fmt.Fprintf(w, "Text payload to send on topic: %v cannot be decoded, error details %v\n", topicId, err)
489                         return
490                 }
491                 b, err := ioutil.ReadAll(req.Body)
492                 if err == nil {
493                         if len(b) < 500 {
494                                 fmt.Printf("Text payload to send on topic: %v, msg: %v", topicId, string(b))
495                         } else {
496                                 fmt.Printf("Text payload to send on topic: %v larger than 500 bytes, not printed...", topicId)
497                         }
498                 }
499                 err = q.Enqueue(string(b))
500                 if err != nil {
501                         w.WriteHeader(http.StatusInternalServerError)
502                         fmt.Fprintf(w, "Text message to send cannot be put in queue")
503                         return
504                 }
505         } else {
506                 w.WriteHeader(http.StatusBadRequest)
507                 fmt.Fprintf(w, "Message to send, unknown content type %v", ct)
508                 return
509         }
510
511         w.WriteHeader(http.StatusOK)
512         w.Header().Set("Content-Type", "text/plain")
513         fmt.Fprintf(w, "Message to send put in queue")
514 }
515
516 // Get zero or one message from a topic
517 // GET /receive
518 func receiveFromTopic(w http.ResponseWriter, req *http.Request) {
519         t, topicId, exist := getTopicFromRequest(w, req)
520         if !exist {
521                 return
522         }
523         if t.messages.received == nil {
524                 w.WriteHeader(http.StatusBadRequest)
525                 fmt.Fprintf(w, "Receiving not initiated on topic %v", topicId)
526                 return
527         }
528
529         m, err := t.messages.received.Dequeue()
530         if err != nil {
531                 w.WriteHeader(http.StatusNoContent)
532                 return
533         }
534
535         w.Header().Set("Content-Type", t.contentType)
536         w.WriteHeader(http.StatusOK)
537         fmt.Fprintf(w, "%v", m)
538 }
539
540 // Remove the send queue to stop sending
541 func stopToSend(w http.ResponseWriter, req *http.Request) {
542         fmt.Printf("Stop sending\n")
543         t, _, exist := getTopicFromRequest(w, req)
544         if !exist {
545                 return
546         }
547         t.messages.stopSend()
548         w.WriteHeader(http.StatusNoContent)
549 }
550
551 // Remove the receive queue to stop receiving
552 func stopToReceive(w http.ResponseWriter, req *http.Request) {
553         fmt.Printf("Stop receiving\n")
554         t, _, exist := getTopicFromRequest(w, req)
555         if !exist {
556                 return
557         }
558         t.messages.stopReceive()
559         w.WriteHeader(http.StatusNoContent)
560 }
561
562 func HelloServer(w http.ResponseWriter, r *http.Request) {
563         fmt.Fprintf(w, "Hello, %s!", r.URL.Path[1:])
564 }
565
566 func main() {
567
568         initApp()
569
570         r := mux.NewRouter()
571
572         r.HandleFunc("/", healthCheck).Methods("GET")
573         r.HandleFunc("/reset", allreset).Methods("POST")
574         r.HandleFunc("/counters/{counter}", getCounter).Methods("GET")
575         r.HandleFunc("/topics", getTopics).Methods("GET")
576         r.HandleFunc("/topics/{topic}/counters/{counter}", getTopicCounter).Methods("GET")
577         r.HandleFunc("/topics/{topic}", createTopic).Methods("PUT").Queries("type", "{type}")
578         r.HandleFunc("/topics/{topic}", getTopic).Methods("GET")
579         r.HandleFunc("/topics/{topic}/startsend", startToSend).Methods("POST")
580         r.HandleFunc("/topics/{topic}/startreceive", startToReceive).Methods("POST")
581         r.HandleFunc("/topics/{topic}/stopsend", stopToSend).Methods("POST")
582         r.HandleFunc("/topics/{topic}/stopreceive", stopToReceive).Methods("POST")
583         r.HandleFunc("/topics/{topic}/msg", sendToTopic).Methods("POST")
584         r.HandleFunc("/topics/{topic}/msg", receiveFromTopic).Methods("GET")
585
586         port := "8090"
587         srv := &http.Server{
588                 Handler:      r,
589                 Addr:         ":" + port,
590                 WriteTimeout: 15 * time.Second,
591                 ReadTimeout:  15 * time.Second,
592         }
593         fmt.Println("Running on port: " + port)
594         fmt.Printf(srv.ListenAndServe().Error())
595 }