--- /dev/null
+// -
+// ========================LICENSE_START=================================
+// O-RAN-SC
+// %%
+// Copyright (C) 2021: Nordix Foundation
+// %%
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ========================LICENSE_END===================================
+//
+
+package kafkaclient
+
+import (
+ "time"
+
+ "github.com/confluentinc/confluent-kafka-go/kafka"
+)
+
+type KafkaFactory interface {
+ NewKafkaConsumer(topicID string) (KafkaConsumer, error)
+}
+
+type KafkaFactoryImpl struct {
+ BootstrapServer string
+}
+
+func (kf KafkaFactoryImpl) NewKafkaConsumer(topicID string) (KafkaConsumer, error) {
+ consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
+ "bootstrap.servers": kf.BootstrapServer,
+ "group.id": "dmaap-mediator-producer",
+ "auto.offset.reset": "earliest",
+ })
+ if err != nil {
+ return nil, err
+ }
+ return KafkaConsumerImpl{consumer: consumer}, nil
+}
+
+func NewKafkaClient(factory KafkaFactory, topicID string) (KafkaClient, error) {
+ consumer, err := factory.NewKafkaConsumer(topicID)
+ if err != nil {
+ return KafkaClient{}, err
+ }
+ consumer.Commit()
+ err = consumer.Subscribe(topicID)
+ if err != nil {
+ return KafkaClient{}, err
+ }
+ return KafkaClient{consumer: consumer}, nil
+}
+
+type KafkaClient struct {
+ consumer KafkaConsumer
+}
+
+func (kc KafkaClient) ReadMessage() ([]byte, error) {
+ msg, err := kc.consumer.ReadMessage(time.Second)
+ if err != nil {
+ return nil, err
+ }
+ return msg.Value, nil
+}
+
+type KafkaConsumer interface {
+ Commit() ([]kafka.TopicPartition, error)
+ Subscribe(topic string) (err error)
+ ReadMessage(timeout time.Duration) (*kafka.Message, error)
+}
+
+type KafkaConsumerImpl struct {
+ consumer *kafka.Consumer
+}
+
+func (kc KafkaConsumerImpl) Commit() ([]kafka.TopicPartition, error) {
+ return kc.consumer.Commit()
+}
+
+func (kc KafkaConsumerImpl) Subscribe(topic string) error {
+ return kc.consumer.Subscribe(topic, nil)
+}
+
+func (kc KafkaConsumerImpl) ReadMessage(timeout time.Duration) (*kafka.Message, error) {
+ return kc.consumer.ReadMessage(timeout)
+}