Merge "Remove rApp Catalogue"
[nonrtric.git] / dmaap-mediator-producer / internal / kafkaclient / kafkaclient.go
1 // -
2 //   ========================LICENSE_START=================================
3 //   O-RAN-SC
4 //   %%
5 //   Copyright (C) 2021: Nordix Foundation
6 //   %%
7 //   Licensed under the Apache License, Version 2.0 (the "License");
8 //   you may not use this file except in compliance with the License.
9 //   You may obtain a copy of the License at
10 //
11 //        http://www.apache.org/licenses/LICENSE-2.0
12 //
13 //   Unless required by applicable law or agreed to in writing, software
14 //   distributed under the License is distributed on an "AS IS" BASIS,
15 //   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 //   See the License for the specific language governing permissions and
17 //   limitations under the License.
18 //   ========================LICENSE_END===================================
19 //
20
21 package kafkaclient
22
23 import (
24         "time"
25
26         "github.com/confluentinc/confluent-kafka-go/kafka"
27 )
28
29 type KafkaFactory interface {
30         NewKafkaConsumer(topicID string) (KafkaConsumer, error)
31 }
32
33 type KafkaFactoryImpl struct {
34         BootstrapServer string
35 }
36
37 func (kf KafkaFactoryImpl) NewKafkaConsumer(topicID string) (KafkaConsumer, error) {
38         consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
39                 "bootstrap.servers": kf.BootstrapServer,
40                 "group.id":          "dmaap-mediator-producer",
41                 "auto.offset.reset": "earliest",
42         })
43         if err != nil {
44                 return nil, err
45         }
46         return KafkaConsumerImpl{consumer: consumer}, nil
47 }
48
49 func NewKafkaClient(factory KafkaFactory, topicID string) (KafkaClient, error) {
50         consumer, err := factory.NewKafkaConsumer(topicID)
51         if err != nil {
52                 return KafkaClient{}, err
53         }
54         consumer.Commit()
55         err = consumer.Subscribe(topicID)
56         if err != nil {
57                 return KafkaClient{}, err
58         }
59         return KafkaClient{consumer: consumer}, nil
60 }
61
62 type KafkaClient struct {
63         consumer KafkaConsumer
64 }
65
66 func (kc KafkaClient) ReadMessage() ([]byte, error) {
67         msg, err := kc.consumer.ReadMessage(time.Second)
68         if err != nil {
69                 return nil, err
70         }
71         return msg.Value, nil
72 }
73
74 type KafkaConsumer interface {
75         Commit() ([]kafka.TopicPartition, error)
76         Subscribe(topic string) (err error)
77         ReadMessage(timeout time.Duration) (*kafka.Message, error)
78 }
79
80 type KafkaConsumerImpl struct {
81         consumer *kafka.Consumer
82 }
83
84 func (kc KafkaConsumerImpl) Commit() ([]kafka.TopicPartition, error) {
85         return kc.consumer.Commit()
86 }
87
88 func (kc KafkaConsumerImpl) Subscribe(topic string) error {
89         return kc.consumer.Subscribe(topic, nil)
90 }
91
92 func (kc KafkaConsumerImpl) ReadMessage(timeout time.Duration) (*kafka.Message, error) {
93         return kc.consumer.ReadMessage(timeout)
94 }