X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=pmproducer%2Fsrc%2Fmain%2Fjava%2Forg%2Foran%2Fpmproducer%2Ftasks%2FTopicListener.java;h=c85c52ed0147ec2de7eb9729d6eca044bb05a882;hb=08e640604edf8056bfe35155c04e88653e1872c4;hp=351fcc6d2163aa64d168b6dc7b943d7943758d4d;hpb=6dfbff6834c3a9da2d8f06b15eb94048cbad2d88;p=nonrtric%2Fplt%2Franpm.git diff --git a/pmproducer/src/main/java/org/oran/pmproducer/tasks/TopicListener.java b/pmproducer/src/main/java/org/oran/pmproducer/tasks/TopicListener.java index 351fcc6..c85c52e 100644 --- a/pmproducer/src/main/java/org/oran/pmproducer/tasks/TopicListener.java +++ b/pmproducer/src/main/java/org/oran/pmproducer/tasks/TopicListener.java @@ -83,6 +83,7 @@ public class TopicListener { public static final String ZIPPED_PROPERTY = "gzip"; public static final String TYPE_ID_PROPERTY = "type-id"; + public static final String SOURCE_NAME_PROPERTY = "source-name"; public boolean isZipped() { if (headers == null) { @@ -97,16 +98,25 @@ public class TopicListener { } public String getTypeIdFromHeaders() { + return this.getStringProperty(TYPE_ID_PROPERTY); + } + + public String getSourceNameFromHeaders() { + return this.getStringProperty(SOURCE_NAME_PROPERTY); + } + + private String getStringProperty(String propertyName) { if (headers == null) { return ""; } for (Header h : headers) { - if (h.key().equals(TYPE_ID_PROPERTY)) { + if (h.key().equals(propertyName)) { return new String(h.value()); } } return ""; } + } private static final Logger logger = LoggerFactory.getLogger(TopicListener.class); @@ -155,21 +165,22 @@ public class TopicListener { } private ReceiverOptions kafkaInputProperties(String clientId) { - Map consumerProps = new HashMap<>(); + Map props = new HashMap<>(); if (this.applicationConfig.getKafkaBootStrapServers().isEmpty()) { logger.error("No kafka boostrap server is setup"); } - consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); - consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.applicationConfig.getKafkaBootStrapServers()); - consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId); - consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); - consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); - consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.applicationConfig.getKafkaBootStrapServers()); + props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId + "_" + kafkaGroupId); - consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId + "_" + kafkaGroupId); + this.applicationConfig.addKafkaSecurityProps(props); - return ReceiverOptions.create(consumerProps) + return ReceiverOptions.create(props) .subscription(Collections.singleton(this.type.getKafkaInputTopic())); }