Documentation updates
[nonrtric/plt/ranpm.git] / pmproducer / src / main / java / org / oran / pmproducer / tasks / TopicListener.java
index 351fcc6..c85c52e 100644 (file)
@@ -83,6 +83,7 @@ public class TopicListener {
 
         public static final String ZIPPED_PROPERTY = "gzip";
         public static final String TYPE_ID_PROPERTY = "type-id";
+        public static final String SOURCE_NAME_PROPERTY = "source-name";
 
         public boolean isZipped() {
             if (headers == null) {
@@ -97,16 +98,25 @@ public class TopicListener {
         }
 
         public String getTypeIdFromHeaders() {
+            return this.getStringProperty(TYPE_ID_PROPERTY);
+        }
+
+        public String getSourceNameFromHeaders() {
+            return this.getStringProperty(SOURCE_NAME_PROPERTY);
+        }
+
+        private String getStringProperty(String propertyName) {
             if (headers == null) {
                 return "";
             }
             for (Header h : headers) {
-                if (h.key().equals(TYPE_ID_PROPERTY)) {
+                if (h.key().equals(propertyName)) {
                     return new String(h.value());
                 }
             }
             return "";
         }
+
     }
 
     private static final Logger logger = LoggerFactory.getLogger(TopicListener.class);
@@ -155,21 +165,22 @@ public class TopicListener {
     }
 
     private ReceiverOptions<byte[], byte[]> kafkaInputProperties(String clientId) {
-        Map<String, Object> consumerProps = new HashMap<>();
+        Map<String, Object> props = new HashMap<>();
         if (this.applicationConfig.getKafkaBootStrapServers().isEmpty()) {
             logger.error("No kafka boostrap server is setup");
         }
 
-        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.applicationConfig.getKafkaBootStrapServers());
-        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
-        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
-        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
-        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.applicationConfig.getKafkaBootStrapServers());
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+        props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId + "_" + kafkaGroupId);
 
-        consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId + "_" + kafkaGroupId);
+        this.applicationConfig.addKafkaSecurityProps(props);
 
-        return ReceiverOptions.<byte[], byte[]>create(consumerProps)
+        return ReceiverOptions.<byte[], byte[]>create(props)
                 .subscription(Collections.singleton(this.type.getKafkaInputTopic()));
     }