Uplift from master branch
[nonrtric.git] / dmaap-mediator-producer / internal / kafkaclient / kafkaclient.go
diff --git a/dmaap-mediator-producer/internal/kafkaclient/kafkaclient.go b/dmaap-mediator-producer/internal/kafkaclient/kafkaclient.go
new file mode 100644 (file)
index 0000000..16abcb4
--- /dev/null
@@ -0,0 +1,94 @@
+// -
+//   ========================LICENSE_START=================================
+//   O-RAN-SC
+//   %%
+//   Copyright (C) 2021: Nordix Foundation
+//   %%
+//   Licensed under the Apache License, Version 2.0 (the "License");
+//   you may not use this file except in compliance with the License.
+//   You may obtain a copy of the License at
+//
+//        http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+//   distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//   See the License for the specific language governing permissions and
+//   limitations under the License.
+//   ========================LICENSE_END===================================
+//
+
+package kafkaclient
+
+import (
+       "time"
+
+       "github.com/confluentinc/confluent-kafka-go/kafka"
+)
+
+type KafkaFactory interface {
+       NewKafkaConsumer(topicID string) (KafkaConsumer, error)
+}
+
+type KafkaFactoryImpl struct {
+       BootstrapServer string
+}
+
+func (kf KafkaFactoryImpl) NewKafkaConsumer(topicID string) (KafkaConsumer, error) {
+       consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
+               "bootstrap.servers": kf.BootstrapServer,
+               "group.id":          "dmaap-mediator-producer",
+               "auto.offset.reset": "earliest",
+       })
+       if err != nil {
+               return nil, err
+       }
+       return KafkaConsumerImpl{consumer: consumer}, nil
+}
+
+func NewKafkaClient(factory KafkaFactory, topicID string) (KafkaClient, error) {
+       consumer, err := factory.NewKafkaConsumer(topicID)
+       if err != nil {
+               return KafkaClient{}, err
+       }
+       consumer.Commit()
+       err = consumer.Subscribe(topicID)
+       if err != nil {
+               return KafkaClient{}, err
+       }
+       return KafkaClient{consumer: consumer}, nil
+}
+
+type KafkaClient struct {
+       consumer KafkaConsumer
+}
+
+func (kc KafkaClient) ReadMessage() ([]byte, error) {
+       msg, err := kc.consumer.ReadMessage(time.Second)
+       if err != nil {
+               return nil, err
+       }
+       return msg.Value, nil
+}
+
+type KafkaConsumer interface {
+       Commit() ([]kafka.TopicPartition, error)
+       Subscribe(topic string) (err error)
+       ReadMessage(timeout time.Duration) (*kafka.Message, error)
+}
+
+type KafkaConsumerImpl struct {
+       consumer *kafka.Consumer
+}
+
+func (kc KafkaConsumerImpl) Commit() ([]kafka.TopicPartition, error) {
+       return kc.consumer.Commit()
+}
+
+func (kc KafkaConsumerImpl) Subscribe(topic string) error {
+       return kc.consumer.Subscribe(topic, nil)
+}
+
+func (kc KafkaConsumerImpl) ReadMessage(timeout time.Duration) (*kafka.Message, error) {
+       return kc.consumer.ReadMessage(timeout)
+}