public static final String ZIPPED_PROPERTY = "gzip";
public static final String TYPE_ID_PROPERTY = "type-id";
+ public static final String SOURCE_NAME_PROPERTY = "source-name";
public boolean isZipped() {
if (headers == null) {
}
public String getTypeIdFromHeaders() {
+ return this.getStringProperty(TYPE_ID_PROPERTY);
+ }
+
+ public String getSourceNameFromHeaders() {
+ return this.getStringProperty(SOURCE_NAME_PROPERTY);
+ }
+
+ private String getStringProperty(String propertyName) {
if (headers == null) {
return "";
}
for (Header h : headers) {
- if (h.key().equals(TYPE_ID_PROPERTY)) {
+ if (h.key().equals(propertyName)) {
return new String(h.value());
}
}
return "";
}
+
}
private static final Logger logger = LoggerFactory.getLogger(TopicListener.class);
}
private ReceiverOptions<byte[], byte[]> kafkaInputProperties(String clientId) {
- Map<String, Object> consumerProps = new HashMap<>();
+ Map<String, Object> props = new HashMap<>();
if (this.applicationConfig.getKafkaBootStrapServers().isEmpty()) {
logger.error("No kafka boostrap server is setup");
}
- consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
- consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.applicationConfig.getKafkaBootStrapServers());
- consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
- consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
- consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
- consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.applicationConfig.getKafkaBootStrapServers());
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+ props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId + "_" + kafkaGroupId);
- consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId + "_" + kafkaGroupId);
+ this.applicationConfig.addKafkaSecurityProps(props);
- return ReceiverOptions.<byte[], byte[]>create(consumerProps)
+ return ReceiverOptions.<byte[], byte[]>create(props)
.subscription(Collections.singleton(this.type.getKafkaInputTopic()));
}