2 // ========================LICENSE_START=================================
5 // Copyright (C) 2021: Nordix Foundation
7 // Licensed under the Apache License, Version 2.0 (the "License");
8 // you may not use this file except in compliance with the License.
9 // You may obtain a copy of the License at
11 // http://www.apache.org/licenses/LICENSE-2.0
13 // Unless required by applicable law or agreed to in writing, software
14 // distributed under the License is distributed on an "AS IS" BASIS,
15 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 // See the License for the specific language governing permissions and
17 // limitations under the License.
18 // ========================LICENSE_END===================================
26 "github.com/confluentinc/confluent-kafka-go/kafka"
29 type KafkaFactory interface {
30 NewKafkaConsumer(topicID string) (KafkaConsumer, error)
// KafkaFactoryImpl is the KafkaFactory that builds consumers backed by the
// confluent-kafka-go client.
type KafkaFactoryImpl struct {
	// BootstrapServer is passed as the "bootstrap.servers" setting of every
	// consumer created by this factory.
	BootstrapServer string
}
37 func (kf KafkaFactoryImpl) NewKafkaConsumer(topicID string) (KafkaConsumer, error) {
38 consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
39 "bootstrap.servers": kf.BootstrapServer,
40 "group.id": "dmaap-mediator-producer",
41 "auto.offset.reset": "earliest",
46 return KafkaConsumerImpl{consumer: consumer}, nil
49 func NewKafkaClient(factory KafkaFactory, topicID string) (KafkaClient, error) {
50 consumer, err := factory.NewKafkaConsumer(topicID)
52 return KafkaClient{}, err
55 err = consumer.Subscribe(topicID)
57 return KafkaClient{}, err
59 return KafkaClient{consumer: consumer}, nil
62 type KafkaClient struct {
63 consumer KafkaConsumer
66 func (kc KafkaClient) ReadMessage() ([]byte, error) {
67 msg, err := kc.consumer.ReadMessage(time.Second)
74 type KafkaConsumer interface {
75 Commit() ([]kafka.TopicPartition, error)
76 Subscribe(topic string) (err error)
77 ReadMessage(timeout time.Duration) (*kafka.Message, error)
80 type KafkaConsumerImpl struct {
81 consumer *kafka.Consumer
84 func (kc KafkaConsumerImpl) Commit() ([]kafka.TopicPartition, error) {
85 return kc.consumer.Commit()
88 func (kc KafkaConsumerImpl) Subscribe(topic string) error {
89 return kc.consumer.Subscribe(topic, nil)
92 func (kc KafkaConsumerImpl) ReadMessage(timeout time.Duration) (*kafka.Message, error) {
93 return kc.consumer.ReadMessage(timeout)