import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
-
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
ManagedElementWrapper wrapper = ncmpPollingClient.getAllManagedElementsFromNcmp(cmHandle);
Map<String, Object> json = wrapper.toTeivCloudEventPayload();
try {
- String payload = objectMapper.writeValueAsString(json);
- CloudEvent event = CloudEventFactory.createEvent(payload, "merge");
- log.info("Sending CloudEvent with payload: {}", payload);
- kafkaEventProducer.sendCloudEvent(event);
+ sendCloudEvent(json, "merge");
+
+ if (addedCmHandles.containsKey(cmHandle)) {
+ Map<String, Object> previousJson = (Map<String, Object>) addedCmHandles.get(cmHandle);
+
+ Map<String, Object> toDeleteJson = new HashMap<>();
+ toDeleteJson.put("entities", List.of(findMissingItems(extractEntities(previousJson), extractEntities(json),
+ "entities")));
+ toDeleteJson.put("relationships", List.of(findMissingItems(extractRelationships(previousJson),
+ extractRelationships(json), "relationships")));
+
+ sendCloudEvent(toDeleteJson, "delete");
+ }
+
addedCmHandles.put(cmHandle, json);
} catch (JsonProcessingException e) {
log.error("Error processing data from cmHandle {}. Event not sent. Error message: {}", cmHandle, e
}
}
+ private Map<String, List<Map<String, Object>>> extractEntities(Map<String, Object> json) {
+ return (Map<String, List<Map<String, Object>>>) ((List<Object>) json.get("entities")).get(0);
+ }
+
+ private Map<String, List<Map<String, Object>>> extractRelationships(Map<String, Object> json) {
+ return (Map<String, List<Map<String, Object>>>) ((List<Object>) json.get("relationships")).get(0);
+ }
+
+ private Map<String, Object> findMissingItems(Map<String, List<Map<String, Object>>> previousItems,
+ Map<String, List<Map<String, Object>>> currentItems, String itemType) {
+ Map<String, Object> missingItems = new HashMap<>();
+ for (String key : previousItems.keySet()) {
+ List<Map<String, Object>> currentList = currentItems.getOrDefault(key, List.of());
+ List<Map<String, Object>> previousList = previousItems.get(key);
+
+ List<Map<String, Object>> missing = previousList.stream().filter(item -> !currentList.contains(item)).toList();
+ if (!missing.isEmpty()) {
+ missingItems.put(key, missing);
+ log.info("Missing {} for key {}: {}", itemType, key, missing);
+ }
+ }
+ return missingItems;
+ }
+
private void removeCmHandle(String cmHandle) {
try {
Object json = addedCmHandles.get(cmHandle);
- String payload = objectMapper.writeValueAsString(json);
- CloudEvent event = CloudEventFactory.createEvent(payload, "delete");
- log.info("Sending CloudEvent with payload: {}", payload);
- kafkaEventProducer.sendCloudEvent(event);
+ sendCloudEvent((Map<String, Object>) json, "delete");
addedCmHandles.remove(cmHandle);
} catch (JsonProcessingException e) {
log.error("Error processing data from cmHandle {}. Event not sent. Error message: {}", cmHandle, e
}
}
+ private void sendCloudEvent(Map<String, Object> json, String eventType) throws JsonProcessingException {
+ String payload = objectMapper.writeValueAsString(json);
+ CloudEvent event = CloudEventFactory.createEvent(payload, eventType);
+ log.info("Sending CloudEvent with payload: {}", payload);
+ kafkaEventProducer.sendCloudEvent(event);
+ }
+
private void sendSampleOCUCPEvent() throws IOException {
String content = readResourceFile("sample-responses/_3gpp-common-managed-element-ocucp.json");
ManagedElementWrapper wrapper = objectMapper.readValue(content, ManagedElementWrapper.class);
Map<String, Object> toJson = wrapper.toTeivCloudEventPayload();
try {
- String payload = objectMapper.writeValueAsString(toJson);
- CloudEvent event = CloudEventFactory.createEvent(payload, "merge");
- log.info("Sending CloudEvent with payload: {}", payload);
- kafkaEventProducer.sendCloudEvent(event);
- addedCmHandles.put("pynts-o-cu-cp-1", toJson);
+ sendCloudEvent(toJson, "merge");
} catch (JsonProcessingException e) {
log.error("Error processing ocucp data. Event not sent. Error message: {}", e.getMessage());
}