Adding source name to kafka headers
[nonrtric/plt/ranpm.git] / pmproducer / src / main / java / org / oran / pmproducer / filter / FilteredData.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  * %%
5  * Copyright (C) 2023 Nordix Foundation
6  * %%
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.oran.pmproducer.filter;
22
23 import java.util.ArrayList;
24
25 import lombok.Getter;
26 import lombok.ToString;
27
28 import org.apache.kafka.common.header.Header;
29 import org.apache.kafka.common.header.internals.RecordHeader;
30 import org.oran.pmproducer.tasks.TopicListener.DataFromTopic;
31
32 @ToString
33 public class FilteredData {
34     public final byte[] key;
35     public final byte[] value;
36     public final String infoTypeId;
37
38     @Getter
39     private final boolean isZipped;
40
41     @Getter
42     private final String sourceName;
43
44     private static final FilteredData emptyData = new FilteredData(null, null, null, null);
45
46     public boolean isEmpty() {
47         return (key == null || key.length == 0) && (value == null || value.length == 0);
48     }
49
50     public FilteredData(String sourceName, String typeId, byte[] key, byte[] value) {
51         this(sourceName, typeId, key, value, false);
52     }
53
54     public FilteredData(String nodeName, String typeId, byte[] key, byte[] value, boolean isZipped) {
55         this.key = key;
56         this.value = value;
57         this.isZipped = isZipped;
58         this.infoTypeId = typeId;
59         this.sourceName = nodeName;
60     }
61
62     public String getValueAString() {
63         return value == null ? "" : new String(this.value);
64     }
65
66     public static FilteredData empty() {
67         return emptyData;
68     }
69
70     public Iterable<Header> headers() {
71         ArrayList<Header> result = new ArrayList<>();
72         if (isZipped()) {
73             result.add(new RecordHeader(DataFromTopic.ZIPPED_PROPERTY, null));
74         }
75         result.add(new RecordHeader(DataFromTopic.TYPE_ID_PROPERTY, infoTypeId.getBytes()));
76         if (this.sourceName != null && !this.sourceName.isEmpty()) {
77             result.add(new RecordHeader(DataFromTopic.SOURCE_NAME_PROPERTY, this.sourceName.getBytes()));
78         }
79
80         return result;
81     }
82
83 }