+// Writing a basic HTTP server is easy using the
+// `net/http` package.
+package main
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "os"
+ "strconv"
+ "sync/atomic"
+ "time"
+
+ "github.com/confluentinc/confluent-kafka-go/kafka"
+ "github.com/enriquebris/goconcurrentqueue"
+ "github.com/gorilla/mux"
+)
+
+// Note: consumer 'group' and consumer 'user' both uses hardcoded values specific to this interface
+// globalCounters var holding the "global counters"
+// recieved number of received messages from all topics (int)
+// sent number of sent messages to all topics (int)
+// topics var holding all topic related info
+// <topic-name> name of a topic (present after topic is created)
+// content-type data type of the topic (string)
+// counters
+// recieved number of received messages from the topic (int)
+// sent number of sent messages to the topic (int)
+// messages
+// send messages waiting to be sent (set when sending is started) (fifo)
+// received received messages waiting to be fetched (set when reception is started) (fifo)
+
+type counter struct {
+ c uint64
+}
+
+func (c *counter) step() {
+ atomic.AddUint64(&c.c, 1)
+}
+
+func (c counter) get() uint64 {
+ return atomic.LoadUint64(&c.c)
+}
+
+type counters struct {
+ received counter
+ sent counter
+}
+
+func newCounters() counters {
+ return counters{
+ received: counter{},
+ sent: counter{},
+ }
+}
+
+type messages struct {
+ send *goconcurrentqueue.FIFO
+ received *goconcurrentqueue.FIFO
+}
+
+func (m *messages) startSend() bool {
+ if m.send == nil {
+ m.send = goconcurrentqueue.NewFIFO()
+ return true
+ }
+ return false
+}
+
+func (m *messages) stopSend() {
+ m.send = nil
+}
+
+func (m *messages) addToSend(msg string) error {
+ if m.send == nil {
+ return fmt.Errorf("sending not started")
+ }
+ m.send.Lock()
+ defer m.send.Unlock()
+ return m.send.Enqueue(msg)
+}
+
+func (m *messages) getToSend() (interface{}, error) {
+ if m.send == nil {
+ return "", fmt.Errorf("sending not started")
+ }
+ m.send.Lock()
+ defer m.send.Unlock()
+ return m.send.Dequeue()
+}
+
+func (m *messages) startReceive() bool {
+ if m.received == nil {
+ m.received = goconcurrentqueue.NewFIFO()
+ return true
+ }
+ return false
+}
+
+func (m *messages) stopReceive() {
+ m.send = nil
+}
+
+type topic struct {
+ contentType string
+ counters counters
+ messages messages
+}
+
+func newTopic(ct string) *topic {
+ return &topic{
+ contentType: ct,
+ counters: counters{},
+ messages: messages{},
+ }
+}
+
+var globalCounters counters
+var topics map[string]*topic = make(map[string]*topic)
+
+var bootstrapserver = ""
+
+func initApp() {
+ bootstrapserver = os.Getenv("KAFKA_BOOTSTRAP_SERVER")
+ if len(bootstrapserver) == 0 {
+ fmt.Println("Fatal error: env var KAFKA_BOOTSTRAP_SERVER not set")
+ fmt.Println("Exiting... ")
+ os.Exit(1)
+ }
+ fmt.Println("Using KAFKA_BOOTSTRAP_SERVER=" + bootstrapserver)
+}
+
+//Helper function to get a created topic, if it exists
+func getTopicFromRequest(w http.ResponseWriter, req *http.Request) (*topic, string, bool) {
+ topicId := mux.Vars(req)["topic"]
+ t, exist := topics[topicId]
+ if exist == false {
+ w.WriteHeader(http.StatusNotFound)
+ fmt.Fprintf(w, "Topic %v does not exist", topicId)
+ return nil, "", false
+ }
+ return t, topicId, true
+}
+
+// Alive check
+// GET on /
+func healthCheck(w http.ResponseWriter, req *http.Request) {
+ fmt.Fprintf(w, "OK")
+}
+
+// Deep reset of this interface stub - no removal of msgs or topics in kafka
+// POST on /reset
+func allreset(w http.ResponseWriter, req *http.Request) {
+ for _, v := range topics {
+ v.messages.stopSend()
+ v.messages.stopReceive()
+ }
+ time.Sleep(5 * time.Second) //Allow producers/consumers to shut down
+ globalCounters = newCounters()
+ topics = make(map[string]*topic)
+ fmt.Fprintf(w, "OK")
+}
+
+// Get topics, return json array of strings of topics created by this interface stub
+// Returns json array of strings, array is empty if no topics exist
+// GET on /topics
+
+func getTopics(w http.ResponseWriter, req *http.Request) {
+ topicKeys := make([]string, 0, len(topics))
+ fmt.Printf("len topics: %v\n", len(topics))
+ for k := range topics {
+ topicKeys = append(topicKeys, k)
+ }
+ var j, err = json.Marshal(topicKeys)
+ if err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ fmt.Fprintf(w, "Cannot convert list of topics to json, error details: %v", err)
+ return
+ } else {
+ w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(http.StatusOK)
+ w.Write(j)
+ }
+}
+
+func writeOkRepsonse(w http.ResponseWriter, httpStatus int, msg string) {
+ w.WriteHeader(httpStatus)
+ w.Header().Set("Content-Type", "text/plain")
+ fmt.Fprintf(w, msg)
+}
+
+// Get a counter value
+// GET /topics/counters/{counter}
+func getCounter(w http.ResponseWriter, req *http.Request) {
+ ctr := mux.Vars(req)["counter"]
+ var ctrvalue = -1
+ if ctr == "received" {
+ ctrvalue = int(globalCounters.received.get())
+ } else if ctr == "sent" {
+ ctrvalue = int(globalCounters.sent.get())
+ }
+
+ if ctrvalue == -1 {
+ w.WriteHeader(http.StatusBadRequest)
+ fmt.Fprintf(w, "Counter %v does not exist", ctr)
+ return
+ }
+ writeOkRepsonse(w, http.StatusOK, strconv.Itoa(ctrvalue))
+ return
+
+}
+
+// Create a topic
+// PUT on /topics/<topic>?type=<type> type shall be 'application/json' or 'text/plain'
+func createTopic(w http.ResponseWriter, req *http.Request) {
+ topicId := mux.Vars(req)["topic"]
+ topicType := mux.Vars(req)["type"]
+
+ fmt.Printf("Creating topic: %v, content type: %v\n", topicId, topicType)
+
+ if len(topicType) == 0 {
+ w.WriteHeader(http.StatusBadRequest)
+ fmt.Fprintf(w, "Type not specified")
+ return
+ }
+
+ tid, exist := topics[topicId]
+ if exist == true {
+ if tid.contentType != topicType {
+ w.WriteHeader(http.StatusBadRequest)
+ fmt.Fprintf(w, "Topic type exist but type is different, queue content type: %v, requested content type: %v", tid.contentType, topicType)
+ return
+ }
+ writeOkRepsonse(w, http.StatusOK, "Topic exist")
+ return
+ }
+
+ t := newTopic(topicType)
+
+ a, err := kafka.NewAdminClient(&kafka.ConfigMap{"bootstrap.servers": bootstrapserver})
+ defer func() { a.Close() }()
+ if err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ fmt.Fprintf(w, "Cannot create client to bootstrap server: "+bootstrapserver+", error details: %v", err)
+ return
+ }
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ maxDur := 10 * time.Second
+
+ _, err = a.CreateTopics(
+ ctx,
+ []kafka.TopicSpecification{{
+ Topic: topicId,
+ NumPartitions: 1,
+ ReplicationFactor: 1}},
+ kafka.SetAdminOperationTimeout(maxDur))
+
+ if err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ fmt.Fprintf(w, "Failed to create topic: %v, error details: %v", topicId, err)
+ return
+ }
+ topics[topicId] = t
+ w.WriteHeader(http.StatusCreated)
+ fmt.Fprintf(w, "Topic created")
+}
+
+// Get topic type
+// GET on /topic/<topic>
+func getTopic(w http.ResponseWriter, req *http.Request) {
+ t, _, exist := getTopicFromRequest(w, req)
+ if !exist {
+ return
+ }
+ w.WriteHeader(http.StatusOK)
+ fmt.Fprintf(w, t.contentType)
+}
+
+// Get a topics counter value
+// GET /topics/{topic}/counters/{counter}
+func getTopicCounter(w http.ResponseWriter, req *http.Request) {
+ t, _, exist := getTopicFromRequest(w, req)
+ if !exist {
+ return
+ }
+ ctr := mux.Vars(req)["counter"]
+
+ var ctrvalue = -1
+ if ctr == "received" {
+ ctrvalue = int(t.counters.received.get())
+ } else if ctr == "sent" {
+ ctrvalue = int(t.counters.sent.get())
+ }
+
+ if ctrvalue == -1 {
+ w.WriteHeader(http.StatusBadRequest)
+ fmt.Fprintf(w, "Counter %v does not exist", ctr)
+ return
+ }
+ w.WriteHeader(http.StatusOK)
+ fmt.Fprintf(w, strconv.Itoa(ctrvalue))
+ return
+}
+
+func startToSend(w http.ResponseWriter, req *http.Request) {
+ t, topicId, exist := getTopicFromRequest(w, req)
+ if !exist {
+ return
+ }
+
+ if !t.messages.startSend() {
+ w.WriteHeader(http.StatusOK)
+ fmt.Fprintf(w, "Already started sending")
+ return
+ }
+ go func() {
+ p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": bootstrapserver})
+ if err != nil {
+ fmt.Printf("Cannot create producer for topic: %v, error details: %v\n", topicId, err)
+ return
+ }
+ defer func() { p.Close() }()
+ for {
+ q := t.messages.send
+ if q == nil {
+ return
+ }
+ m, err := q.Get(0)
+ if err == nil {
+ err = p.Produce(&kafka.Message{
+ TopicPartition: kafka.TopicPartition{Topic: &topicId, Partition: kafka.PartitionAny},
+ Value: []byte(fmt.Sprintf("%v", m)),
+ }, nil)
+ if err == nil {
+ q.Remove(0)
+ t.counters.sent.step()
+ globalCounters.sent.step()
+ msg := fmt.Sprintf("%v", m)
+ if len(msg) < 500 {
+ fmt.Printf("Message sent on topic: %v, len: %v, msg: %v", topicId, len(msg), msg)
+ } else {
+ fmt.Printf("Message sent on topic: %v, len: %v, is larger than 500...not printed", topicId, len(msg))
+ }
+ } else {
+ fmt.Printf("Failed to send message on topic: %v. Discarded. Error details: %v", topicId, err)
+ q.Remove(0)
+ }
+ } else {
+ time.Sleep(10 * time.Millisecond)
+ }
+ }
+ }()
+
+ w.WriteHeader(http.StatusOK)
+ fmt.Fprintf(w, "Sending started")
+}
+
+func startToReceive(w http.ResponseWriter, req *http.Request) {
+ t, topicId, exist := getTopicFromRequest(w, req)
+ if !exist {
+ return
+ }
+
+ if !t.messages.startReceive() {
+ w.WriteHeader(http.StatusOK)
+ fmt.Fprintf(w, "Already started receiving")
+ return
+ }
+
+ go func() {
+
+ defer func() { t.messages.stopReceive() }()
+
+ groudId := "kafkaprocon"
+
+ c, err := kafka.NewConsumer(&kafka.ConfigMap{
+ "bootstrap.servers": bootstrapserver,
+ "group.id": groudId,
+ "auto.offset.reset": "earliest",
+ "enable.auto.commit": true,
+ "auto.commit.interval.ms": 5000,
+ })
+ if err != nil {
+ fmt.Printf("Cannot create consumer for topic: %v, error details: %v\n", topicId, err)
+ t.messages.stopReceive()
+ return
+ }
+ c.Commit()
+ defer func() { c.Close() }()
+ for {
+ que := t.messages.received
+ if que == nil {
+ fmt.Printf("Cannot start receiving on topic: %v, queue does not exist\n", topicId)
+ return
+ }
+ fmt.Printf("Start subscribing on topic: %v\n", topicId)
+ err = c.SubscribeTopics([]string{topicId}, nil)
+ if err != nil {
+ fmt.Printf("Cannot start subscribing on topic: %v, error details: %v\n", topicId, err)
+ return
+ }
+ maxDur := 1 * time.Second
+ for {
+ msg, err := c.ReadMessage(maxDur)
+ if err == nil {
+ if len(msg.Value) < 500 {
+ fmt.Printf("Message received on topic: %v, partion: %v, len: %v, msg: %v", topicId, msg.TopicPartition, len(msg.Value), string(msg.Value))
+ } else {
+ fmt.Printf("Message received on topic: %v, partion: %v, len: %v is larger than 500...not printed", topicId, msg.TopicPartition, len(msg.Value))
+ }
+ err = t.messages.received.Enqueue(string(msg.Value))
+ if err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ fmt.Fprintf(w, "Received message topic: %v, cannot be put in queue, %v", topicId, err)
+ return
+ }
+ t.counters.received.step()
+ globalCounters.received.step()
+ } else {
+ fmt.Printf("Nothing to consume on topic: %v, reason: %v\n", topicId, err)
+ }
+ }
+ }
+ }()
+
+ w.WriteHeader(http.StatusOK)
+ fmt.Fprintf(w, "Receiving started")
+}
+
+// Post a message to a topic
+// POST /send content type is specified in content type
+func sendToTopic(w http.ResponseWriter, req *http.Request) {
+ t, topicId, exist := getTopicFromRequest(w, req)
+ if !exist {
+ return
+ }
+ q := t.messages.send
+ if q == nil {
+ w.WriteHeader(http.StatusBadRequest)
+ fmt.Fprintf(w, "Sending not initiated on topic: %v", topicId)
+ return
+ }
+ ct := req.Header.Get("Content-Type")
+ if ct != t.contentType {
+ w.WriteHeader(http.StatusBadRequest)
+ fmt.Fprintf(w, "Message to send content type: %v on topic: %v does not match queue content type: %v", ct, topicId, t.contentType)
+ return
+ }
+
+ if ct == "application/json" {
+ // decoder := json.NewDecoder(req.Body)
+ // var j :=
+ // err := decoder.Decode(&j)
+ // if err != nil {
+ // w.WriteHeader(http.StatusBadRequest)
+ // w.Header().Set("Content-Type", "text/plain")
+ // fmt.Fprintf(w, "Json payload cannot be decoded, error details %v\n", err)
+ // return
+ // }
+ //m = mux.Vars(req)[""]
+ if err := req.ParseForm(); err != nil {
+ w.WriteHeader(http.StatusBadRequest)
+ fmt.Fprintf(w, "Json payload cannot be decoded on topic: %v, error details %v", topicId, err)
+ return
+ }
+ b, err := ioutil.ReadAll(req.Body)
+ if err == nil {
+ if len(b) < 500 {
+ fmt.Printf("Json payload to send on topic: %v, msg: %v", topicId, string(b))
+ } else {
+ fmt.Printf("Json payload to send on topic: %v larger than 500 bytes, not printed...", topicId)
+ }
+ }
+ err = q.Enqueue(string(b))
+ if err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ fmt.Fprintf(w, "Json message to send cannot be put in queue")
+ return
+ }
+ } else if ct == "text/plain" {
+ if err := req.ParseForm(); err != nil {
+ w.WriteHeader(http.StatusBadRequest)
+ fmt.Fprintf(w, "Text payload to send on topic: %v cannot be decoded, error details %v\n", topicId, err)
+ return
+ }
+ b, err := ioutil.ReadAll(req.Body)
+ if err == nil {
+ if len(b) < 500 {
+ fmt.Printf("Text payload to send on topic: %v, msg: %v", topicId, string(b))
+ } else {
+ fmt.Printf("Text payload to send on topic: %v larger than 500 bytes, not printed...", topicId)
+ }
+ }
+ err = q.Enqueue(string(b))
+ if err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ fmt.Fprintf(w, "Text message to send cannot be put in queue")
+ return
+ }
+ } else {
+ w.WriteHeader(http.StatusBadRequest)
+ fmt.Fprintf(w, "Message to send, unknown content type %v", ct)
+ return
+ }
+
+ w.WriteHeader(http.StatusOK)
+ w.Header().Set("Content-Type", "text/plain")
+ fmt.Fprintf(w, "Message to send put in queue")
+}
+
+// Get zero or one message from a topic
+// GET /receive
+func receiveFromTopic(w http.ResponseWriter, req *http.Request) {
+ t, topicId, exist := getTopicFromRequest(w, req)
+ if !exist {
+ return
+ }
+ if t.messages.received == nil {
+ w.WriteHeader(http.StatusBadRequest)
+ fmt.Fprintf(w, "Receiving not initiated on topic %v", topicId)
+ return
+ }
+
+ m, err := t.messages.received.Dequeue()
+ if err != nil {
+ w.WriteHeader(http.StatusNoContent)
+ return
+ }
+
+ w.Header().Set("Content-Type", t.contentType)
+ w.WriteHeader(http.StatusOK)
+ fmt.Fprintf(w, "%v", m)
+}
+
+// Remove the send queue to stop sending
+func stopToSend(w http.ResponseWriter, req *http.Request) {
+ fmt.Printf("Stop sending\n")
+ t, _, exist := getTopicFromRequest(w, req)
+ if !exist {
+ return
+ }
+ t.messages.stopSend()
+ w.WriteHeader(http.StatusNoContent)
+}
+
+// Remove the receive queue to stop receiving
+func stopToReceive(w http.ResponseWriter, req *http.Request) {
+ fmt.Printf("Stop receiving\n")
+ t, _, exist := getTopicFromRequest(w, req)
+ if !exist {
+ return
+ }
+ t.messages.stopReceive()
+ w.WriteHeader(http.StatusNoContent)
+}
+
+func HelloServer(w http.ResponseWriter, r *http.Request) {
+ fmt.Fprintf(w, "Hello, %s!", r.URL.Path[1:])
+}
+
+func main() {
+
+ initApp()
+
+ r := mux.NewRouter()
+
+ r.HandleFunc("/", healthCheck).Methods("GET")
+ r.HandleFunc("/reset", allreset).Methods("POST")
+ r.HandleFunc("/counters/{counter}", getCounter).Methods("GET")
+ r.HandleFunc("/topics", getTopics).Methods("GET")
+ r.HandleFunc("/topics/{topic}/counters/{counter}", getTopicCounter).Methods("GET")
+ r.HandleFunc("/topics/{topic}", createTopic).Methods("PUT").Queries("type", "{type}")
+ r.HandleFunc("/topics/{topic}", getTopic).Methods("GET")
+ r.HandleFunc("/topics/{topic}/startsend", startToSend).Methods("POST")
+ r.HandleFunc("/topics/{topic}/startreceive", startToReceive).Methods("POST")
+ r.HandleFunc("/topics/{topic}/stopsend", stopToSend).Methods("POST")
+ r.HandleFunc("/topics/{topic}/stopreceive", stopToReceive).Methods("POST")
+ r.HandleFunc("/topics/{topic}/msg", sendToTopic).Methods("POST")
+ r.HandleFunc("/topics/{topic}/msg", receiveFromTopic).Methods("GET")
+
+ port := "8090"
+ srv := &http.Server{
+ Handler: r,
+ Addr: ":" + port,
+ WriteTimeout: 15 * time.Second,
+ ReadTimeout: 15 * time.Second,
+ }
+ fmt.Println("Running on port: " + port)
+ fmt.Printf(srv.ListenAndServe().Error())
+}