e562b267ed0cbb95c3f6c2b5fdc20aec5d678a94
[nonrtric.git] / test / kafka-procon / main.go
1 // -
2 //   ========================LICENSE_START=================================
3 //   O-RAN-SC
4 //   %%
5 //   Copyright (C) 2020-2022: Nordix Foundation
6 //   %%
7 //   Licensed under the Apache License, Version 2.0 (the "License");
8 //   you may not use this file except in compliance with the License.
9 //   You may obtain a copy of the License at
10 //
11 //        http://www.apache.org/licenses/LICENSE-2.0
12 //
13 //   Unless required by applicable law or agreed to in writing, software
14 //   distributed under the License is distributed on an "AS IS" BASIS,
15 //   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 //   See the License for the specific language governing permissions and
17 //   limitations under the License.
18 //   ========================LICENSE_END===================================
19 //
20
21 // Writing a basic HTTP server is easy using the
22 // `net/http` package.
23 package main
24
25 import (
26         "context"
27         "encoding/json"
28         "fmt"
29         "io/ioutil"
30         "net/http"
31         "os"
32         "strconv"
33         "sync/atomic"
34         "time"
35
36         "github.com/confluentinc/confluent-kafka-go/kafka"
37         "github.com/enriquebris/goconcurrentqueue"
38         "github.com/gorilla/mux"
39 )
40
41 // Note: consumer 'group' and consumer 'user' both uses hardcoded values specific to this interface
42 //    globalCounters      var holding the "global counters"
43 //      recieved          number of received messages from all topics                             (int)
44 //      sent              number of sent messages to all topics                                   (int)
45 //    topics              var holding all topic related info
46 //      <topic-name>      name of a topic (present after topic is created)
47 //        content-type    data type of the topic                                                  (string)
48 //        counters
49 //          recieved      number of received messages from the topic                              (int)
50 //          sent          number of sent messages to the topic                                    (int)
51 //        messages
52 //          send          messages waiting to be sent (set when sending is started)               (fifo)
53 //          received      received messages waiting to be fetched (set when reception is started) (fifo)
54
55 type counter struct {
56         c uint64
57 }
58
59 func (c *counter) step() {
60         atomic.AddUint64(&c.c, 1)
61 }
62
63 func (c counter) get() uint64 {
64         return atomic.LoadUint64(&c.c)
65 }
66
67 type counters struct {
68         received counter
69         sent     counter
70 }
71
72 func newCounters() counters {
73         return counters{
74                 received: counter{},
75                 sent:     counter{},
76         }
77 }
78
79 type messages struct {
80         send     *goconcurrentqueue.FIFO
81         received *goconcurrentqueue.FIFO
82 }
83
84 func (m *messages) startSend() bool {
85         if m.send == nil {
86                 m.send = goconcurrentqueue.NewFIFO()
87                 return true
88         }
89         return false
90 }
91
92 func (m *messages) stopSend() {
93         m.send = nil
94 }
95
96 func (m *messages) addToSend(msg string) error {
97         if m.send == nil {
98                 return fmt.Errorf("sending not started")
99         }
100         m.send.Lock()
101         defer m.send.Unlock()
102         return m.send.Enqueue(msg)
103 }
104
105 func (m *messages) getToSend() (interface{}, error) {
106         if m.send == nil {
107                 return "", fmt.Errorf("sending not started")
108         }
109         m.send.Lock()
110         defer m.send.Unlock()
111         return m.send.Dequeue()
112 }
113
114 func (m *messages) startReceive() bool {
115         if m.received == nil {
116                 m.received = goconcurrentqueue.NewFIFO()
117                 return true
118         }
119         return false
120 }
121
122 func (m *messages) stopReceive() {
123         m.send = nil
124 }
125
126 type topic struct {
127         contentType string
128         counters    counters
129         messages    messages
130 }
131
132 func newTopic(ct string) *topic {
133         return &topic{
134                 contentType: ct,
135                 counters:    counters{},
136                 messages:    messages{},
137         }
138 }
139
140 var globalCounters counters
141 var topics map[string]*topic = make(map[string]*topic)
142
143 var bootstrapserver = ""
144
145 func initApp() {
146         bootstrapserver = os.Getenv("KAFKA_BOOTSTRAP_SERVER")
147         if len(bootstrapserver) == 0 {
148                 fmt.Println("Fatal error: env var KAFKA_BOOTSTRAP_SERVER not set")
149                 fmt.Println("Exiting... ")
150                 os.Exit(1)
151         }
152         fmt.Println("Using KAFKA_BOOTSTRAP_SERVER=" + bootstrapserver)
153 }
154
155 //Helper function to get a created topic, if it exists
156 func getTopicFromRequest(w http.ResponseWriter, req *http.Request) (*topic, string, bool) {
157         topicId := mux.Vars(req)["topic"]
158         t, exist := topics[topicId]
159         if exist == false {
160                 w.WriteHeader(http.StatusNotFound)
161                 fmt.Fprintf(w, "Topic %v does not exist", topicId)
162                 return nil, "", false
163         }
164         return t, topicId, true
165 }
166
167 // Alive check
168 // GET on /
169 func healthCheck(w http.ResponseWriter, req *http.Request) {
170         fmt.Fprintf(w, "OK")
171 }
172
173 // Deep reset of this interface stub - no removal of msgs or topics in kafka
174 // POST on /reset
175 func allreset(w http.ResponseWriter, req *http.Request) {
176         for _, v := range topics {
177                 v.messages.stopSend()
178                 v.messages.stopReceive()
179         }
180         time.Sleep(5 * time.Second) //Allow producers/consumers to shut down
181         globalCounters = newCounters()
182         topics = make(map[string]*topic)
183         fmt.Fprintf(w, "OK")
184 }
185
186 // Get topics, return json array of strings of topics created by this interface stub
187 // Returns json array of strings, array is empty if no topics exist
188 // GET on /topics
189
190 func getTopics(w http.ResponseWriter, req *http.Request) {
191         topicKeys := make([]string, 0, len(topics))
192         fmt.Printf("len topics: %v\n", len(topics))
193         for k := range topics {
194                 topicKeys = append(topicKeys, k)
195         }
196         var j, err = json.Marshal(topicKeys)
197         if err != nil {
198                 w.WriteHeader(http.StatusInternalServerError)
199                 fmt.Fprintf(w, "Cannot convert list of topics to json, error details: %v", err)
200                 return
201         } else {
202                 w.Header().Set("Content-Type", "application/json")
203                 w.WriteHeader(http.StatusOK)
204                 w.Write(j)
205         }
206 }
207
208 func writeOkRepsonse(w http.ResponseWriter, httpStatus int, msg string) {
209         w.WriteHeader(httpStatus)
210         w.Header().Set("Content-Type", "text/plain")
211         fmt.Fprintf(w, msg)
212 }
213
214 // Get a counter value
215 // GET /topics/counters/{counter}
216 func getCounter(w http.ResponseWriter, req *http.Request) {
217         ctr := mux.Vars(req)["counter"]
218         var ctrvalue = -1
219         if ctr == "received" {
220                 ctrvalue = int(globalCounters.received.get())
221         } else if ctr == "sent" {
222                 ctrvalue = int(globalCounters.sent.get())
223         }
224
225         if ctrvalue == -1 {
226                 w.WriteHeader(http.StatusBadRequest)
227                 fmt.Fprintf(w, "Counter %v does not exist", ctr)
228                 return
229         }
230         writeOkRepsonse(w, http.StatusOK, strconv.Itoa(ctrvalue))
231         return
232
233 }
234
235 // Create a topic
236 // PUT on /topics/<topic>?type=<type>    type shall be 'application/json' or 'text/plain'
237 func createTopic(w http.ResponseWriter, req *http.Request) {
238         topicId := mux.Vars(req)["topic"]
239         topicType := mux.Vars(req)["type"]
240
241         fmt.Printf("Creating topic: %v, content type: %v\n", topicId, topicType)
242
243         if len(topicType) == 0 {
244                 w.WriteHeader(http.StatusBadRequest)
245                 fmt.Fprintf(w, "Type not specified")
246                 return
247         }
248
249         tid, exist := topics[topicId]
250         if exist == true {
251                 if tid.contentType != topicType {
252                         w.WriteHeader(http.StatusBadRequest)
253                         fmt.Fprintf(w, "Topic type exist but type is different, queue content type: %v, requested content type: %v", tid.contentType, topicType)
254                         return
255                 }
256                 writeOkRepsonse(w, http.StatusOK, "Topic exist")
257                 return
258         }
259
260         t := newTopic(topicType)
261
262         a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": bootstrapserver})
263         defer func() { a.Close() }()
264         if err != nil {
265                 w.WriteHeader(http.StatusInternalServerError)
266                 fmt.Fprintf(w, "Cannot create client to bootstrap server: "+bootstrapserver+", error details: %v", err)
267                 return
268         }
269
270         ctx, cancel := context.WithCancel(context.Background())
271         defer cancel()
272
273         maxDur := 10 * time.Second
274
275         _, err = a.CreateTopics(
276                 ctx,
277                 []kafka.TopicSpecification{{
278                         Topic:             topicId,
279                         NumPartitions:     1,
280                         ReplicationFactor: 1}},
281                 kafka.SetAdminOperationTimeout(maxDur))
282
283         if err != nil {
284                 w.WriteHeader(http.StatusInternalServerError)
285                 fmt.Fprintf(w, "Failed to create topic: %v, error details: %v", topicId, err)
286                 return
287         }
288         topics[topicId] = t
289         w.WriteHeader(http.StatusCreated)
290         fmt.Fprintf(w, "Topic created")
291 }
292
293 // Get topic type
294 // GET on /topic/<topic>
295 func getTopic(w http.ResponseWriter, req *http.Request) {
296         t, _, exist := getTopicFromRequest(w, req)
297         if !exist {
298                 return
299         }
300         w.WriteHeader(http.StatusOK)
301         fmt.Fprintf(w, t.contentType)
302 }
303
304 // Get a topics counter value
305 // GET /topics/{topic}/counters/{counter}
306 func getTopicCounter(w http.ResponseWriter, req *http.Request) {
307         t, _, exist := getTopicFromRequest(w, req)
308         if !exist {
309                 return
310         }
311         ctr := mux.Vars(req)["counter"]
312
313         var ctrvalue = -1
314         if ctr == "received" {
315                 ctrvalue = int(t.counters.received.get())
316         } else if ctr == "sent" {
317                 ctrvalue = int(t.counters.sent.get())
318         }
319
320         if ctrvalue == -1 {
321                 w.WriteHeader(http.StatusBadRequest)
322                 fmt.Fprintf(w, "Counter %v does not exist", ctr)
323                 return
324         }
325         w.WriteHeader(http.StatusOK)
326         fmt.Fprintf(w, strconv.Itoa(ctrvalue))
327         return
328 }
329
330 func startToSend(w http.ResponseWriter, req *http.Request) {
331         t, topicId, exist := getTopicFromRequest(w, req)
332         fmt.Printf("Start to send to topic: %v\n", topicId)
333         if !exist {
334                 return
335         }
336
337         if !t.messages.startSend() {
338                 w.WriteHeader(http.StatusOK)
339                 fmt.Fprintf(w, "Already started sending")
340                 return
341         }
342         go func() {
343                 p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": bootstrapserver})
344                 if err != nil {
345                         fmt.Printf("Cannot create producer for topic: %v, error details: %v\n", topicId, err)
346                         return
347                 }
348                 defer func() {
349                         fmt.Printf("Closing producer for topic: %v\n", topicId)
350                         p.Close()
351                 }()
352                 for {
353                         q := t.messages.send
354                         if q == nil {
355                                 return
356                         }
357                         m, err := q.Get(0)
358                         if err == nil {
359                                 err = p.Produce(&kafka.Message{
360                                         TopicPartition: kafka.TopicPartition{Topic: &topicId, Partition: kafka.PartitionAny},
361                                         Value:          []byte(fmt.Sprintf("%v", m)),
362                                 }, nil)
363                                 if err == nil {
364                                         q.Remove(0)
365                                         t.counters.sent.step()
366                                         globalCounters.sent.step()
367                                         msg := fmt.Sprintf("%v", m)
368                                         if len(msg) < 500 {
369                                                 fmt.Printf("Message sent on topic: %v, len: %v, msg: %v\n", topicId, len(msg), msg)
370                                         } else {
371                                                 fmt.Printf("Message sent on topic: %v, len: %v, is larger than 500...not printed\n", topicId, len(msg))
372                                         }
373                                 } else {
374                                         fmt.Printf("Failed to send message on topic: %v. Discarded. Error details: %v\n", topicId, err)
375                                         q.Remove(0)
376                                 }
377                         } else {
378                                 time.Sleep(10 * time.Millisecond)
379                         }
380                 }
381         }()
382
383         w.WriteHeader(http.StatusOK)
384         fmt.Fprintf(w, "Sending started")
385 }
386
387 func startToReceive(w http.ResponseWriter, req *http.Request) {
388         t, topicId, exist := getTopicFromRequest(w, req)
389         if !exist {
390                 return
391         }
392
393         if !t.messages.startReceive() {
394                 w.WriteHeader(http.StatusOK)
395                 fmt.Fprintf(w, "Already started receiving")
396                 return
397         }
398
399         go func() {
400
401                 defer func() { t.messages.stopReceive() }()
402
403                 groudId := "kafkaprocon"
404
405                 c, err := kafka.NewConsumer(&kafka.ConfigMap{
406                         "bootstrap.servers":       bootstrapserver,
407                         "group.id":                groudId,
408                         "auto.offset.reset":       "earliest",
409                         "enable.auto.commit":      true,
410                         "auto.commit.interval.ms": 5000,
411                 })
412                 if err != nil {
413                         fmt.Printf("Cannot create consumer for topic: %v, error details: %v\n", topicId, err)
414                         t.messages.stopReceive()
415                         return
416                 }
417                 c.Commit()
418                 defer func() { c.Close() }()
419                 for {
420                         que := t.messages.received
421                         if que == nil {
422                                 fmt.Printf("Cannot start receiving on topic: %v, queue does not exist\n", topicId)
423                                 return
424                         }
425                         fmt.Printf("Start subscribing on topic: %v\n", topicId)
426                         err = c.SubscribeTopics([]string{topicId}, nil)
427                         if err != nil {
428                                 fmt.Printf("Cannot start subscribing on topic: %v, error details: %v\n", topicId, err)
429                                 return
430                         }
431                         maxDur := 1 * time.Second
432                         for {
433                                 msg, err := c.ReadMessage(maxDur)
434                                 if err == nil {
435                                         if len(msg.Value) < 500 {
436                                                 fmt.Printf("Message received on topic: %v, partion: %v, len: %v, msg: %v\n", topicId, msg.TopicPartition, len(msg.Value), string(msg.Value))
437                                         } else {
438                                                 fmt.Printf("Message received on topic: %v, partion: %v, len: %v is larger than 500...not printed\n", topicId, msg.TopicPartition, len(msg.Value))
439                                         }
440                                         err = t.messages.received.Enqueue(string(msg.Value))
441                                         if err != nil {
442                                                 w.WriteHeader(http.StatusInternalServerError)
443                                                 fmt.Fprintf(w, "Received message topic: %v, cannot be put in queue, %v", topicId, err)
444                                                 return
445                                         }
446                                         t.counters.received.step()
447                                         globalCounters.received.step()
448                                 } else {
449                                         fmt.Printf("Nothing to consume on topic: %v, reason: %v\n", topicId, err)
450                                 }
451                         }
452                 }
453         }()
454
455         w.WriteHeader(http.StatusOK)
456         fmt.Fprintf(w, "Receiving started")
457 }
458
459 // Post a message to a topic
460 // POST /send    content type is specified in content type
461 func sendToTopic(w http.ResponseWriter, req *http.Request) {
462
463         t, topicId, exist := getTopicFromRequest(w, req)
464         fmt.Printf("Send to topic: %v\n", topicId)
465         if !exist {
466                 return
467         }
468         q := t.messages.send
469         if q == nil {
470                 w.WriteHeader(http.StatusBadRequest)
471                 fmt.Fprintf(w, "Sending not initiated on topic: %v", topicId)
472                 return
473         }
474         ct := req.Header.Get("Content-Type")
475         if ct != t.contentType {
476                 w.WriteHeader(http.StatusBadRequest)
477                 fmt.Fprintf(w, "Message to send content type: %v on topic: %v does not match queue content type: %v", ct, topicId, t.contentType)
478                 return
479         }
480
481         if ct == "application/json" {
482                 // decoder := json.NewDecoder(req.Body)
483                 // var j :=
484                 // err := decoder.Decode(&j)
485                 // if err != nil {
486                 //      w.WriteHeader(http.StatusBadRequest)
487                 //      w.Header().Set("Content-Type", "text/plain")
488                 //      fmt.Fprintf(w, "Json payload cannot be decoded, error details %v\n", err)
489                 //      return
490                 // }
491                 //m = mux.Vars(req)[""]
492                 if err := req.ParseForm(); err != nil {
493                         w.WriteHeader(http.StatusBadRequest)
494                         fmt.Fprintf(w, "Json payload cannot be decoded on topic: %v, error details %v", topicId, err)
495                         return
496                 }
497                 b, err := ioutil.ReadAll(req.Body)
498                 if err == nil {
499                         if len(b) < 500 {
500                                 fmt.Printf("Json payload to send on topic: %v, msg: %v\n", topicId, string(b))
501                         } else {
502                                 fmt.Printf("Json payload to send on topic: %v larger than 500 bytes, not printed...\n", topicId)
503                         }
504                 } else {
505                         fmt.Printf("Json payload to send on topic: %v cannnot be decoded, err: %v\n", topicId, err)
506                 }
507                 err = q.Enqueue(string(b))
508                 if err != nil {
509                         w.WriteHeader(http.StatusInternalServerError)
510                         fmt.Fprintf(w, "Json message to send cannot be put in queue")
511                         return
512                 }
513         } else if ct == "text/plain" {
514                 if err := req.ParseForm(); err != nil {
515                         w.WriteHeader(http.StatusBadRequest)
516                         fmt.Fprintf(w, "Text payload to send on topic: %v cannot be decoded, error details %v\n", topicId, err)
517                         return
518                 }
519                 b, err := ioutil.ReadAll(req.Body)
520                 if err == nil {
521                         if len(b) < 500 {
522                                 fmt.Printf("Text payload to send on topic: %v, msg: %v\n", topicId, string(b))
523                         } else {
524                                 fmt.Printf("Text payload to send on topic: %v larger than 500 bytes, not printed...\n", topicId)
525                         }
526                 } else {
527                         fmt.Printf("Text payload to send on topic: %v cannnot be decoded, err: %v\n", topicId, err)
528                 }
529                 err = q.Enqueue(string(b))
530                 if err != nil {
531                         w.WriteHeader(http.StatusInternalServerError)
532                         fmt.Fprintf(w, "Text message to send cannot be put in queue")
533                         return
534                 }
535         } else {
536                 w.WriteHeader(http.StatusBadRequest)
537                 fmt.Fprintf(w, "Message to send, unknown content type %v", ct)
538                 return
539         }
540
541         w.WriteHeader(http.StatusOK)
542         w.Header().Set("Content-Type", "text/plain")
543         fmt.Fprintf(w, "Message to send put in queue")
544 }
545
546 // Get zero or one message from a topic
547 // GET /receive
548 func receiveFromTopic(w http.ResponseWriter, req *http.Request) {
549         t, topicId, exist := getTopicFromRequest(w, req)
550         if !exist {
551                 return
552         }
553         if t.messages.received == nil {
554                 w.WriteHeader(http.StatusBadRequest)
555                 fmt.Fprintf(w, "Receiving not initiated on topic %v", topicId)
556                 return
557         }
558
559         m, err := t.messages.received.Dequeue()
560         if err != nil {
561                 w.WriteHeader(http.StatusNoContent)
562                 return
563         }
564
565         w.Header().Set("Content-Type", t.contentType)
566         w.WriteHeader(http.StatusOK)
567         fmt.Fprintf(w, "%v", m)
568 }
569
570 // Remove the send queue to stop sending
571 func stopToSend(w http.ResponseWriter, req *http.Request) {
572         fmt.Printf("Stop sending\n")
573         t, _, exist := getTopicFromRequest(w, req)
574         if !exist {
575                 return
576         }
577         t.messages.stopSend()
578         w.WriteHeader(http.StatusNoContent)
579 }
580
581 // Remove the receive queue to stop receiving
582 func stopToReceive(w http.ResponseWriter, req *http.Request) {
583         fmt.Printf("Stop receiving\n")
584         t, _, exist := getTopicFromRequest(w, req)
585         if !exist {
586                 return
587         }
588         t.messages.stopReceive()
589         w.WriteHeader(http.StatusNoContent)
590 }
591
592 func HelloServer(w http.ResponseWriter, r *http.Request) {
593         fmt.Fprintf(w, "Hello, %s!", r.URL.Path[1:])
594 }
595
596 func main() {
597
598         initApp()
599
600         r := mux.NewRouter()
601
602         r.HandleFunc("/", healthCheck).Methods("GET")
603         r.HandleFunc("/reset", allreset).Methods("POST")
604         r.HandleFunc("/counters/{counter}", getCounter).Methods("GET")
605         r.HandleFunc("/topics", getTopics).Methods("GET")
606         r.HandleFunc("/topics/{topic}/counters/{counter}", getTopicCounter).Methods("GET")
607         r.HandleFunc("/topics/{topic}", createTopic).Methods("PUT").Queries("type", "{type}")
608         r.HandleFunc("/topics/{topic}", getTopic).Methods("GET")
609         r.HandleFunc("/topics/{topic}/startsend", startToSend).Methods("POST")
610         r.HandleFunc("/topics/{topic}/startreceive", startToReceive).Methods("POST")
611         r.HandleFunc("/topics/{topic}/stopsend", stopToSend).Methods("POST")
612         r.HandleFunc("/topics/{topic}/stopreceive", stopToReceive).Methods("POST")
613         r.HandleFunc("/topics/{topic}/msg", sendToTopic).Methods("POST")
614         r.HandleFunc("/topics/{topic}/msg", receiveFromTopic).Methods("GET")
615
616         port := "8090"
617         srv := &http.Server{
618                 Handler:      r,
619                 Addr:         ":" + port,
620                 WriteTimeout: 15 * time.Second,
621                 ReadTimeout:  15 * time.Second,
622         }
623         fmt.Println("Running on port: " + port)
624         fmt.Printf(srv.ListenAndServe().Error())
625 }