59ec39a8cd7b825f17ab8612165573b455361ca6
[smo/teiv.git] /
1 /*
2  *  ============LICENSE_START=======================================================
3  *  Modifications Copyright (C) 2025 OpenInfra Foundation Europe
4  *  ================================================================================
5  *  Licensed under the Apache License, Version 2.0 (the "License");
6  *  you may not use this file except in compliance with the License.
7  *  You may obtain a copy of the License at
8  *
9  *        http://www.apache.org/licenses/LICENSE-2.0
10  *
11  *  Unless required by applicable law or agreed to in writing, software
12  *  distributed under the License is distributed on an "AS IS" BASIS,
13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  *  See the License for the specific language governing permissions and
15  *  limitations under the License.
16  *
17  *  SPDX-License-Identifier: Apache-2.0
18  *  ============LICENSE_END=========================================================
19  */
20 package org.oran.smo.ncmp_to_teiv_adapter;
21
22 import com.fasterxml.jackson.core.JsonProcessingException;
23 import com.fasterxml.jackson.databind.ObjectMapper;
24 import io.cloudevents.CloudEvent;
25 import lombok.RequiredArgsConstructor;
26 import lombok.extern.slf4j.Slf4j;
27 import org.oran.smo.ncmp_to_teiv_adapter.models.ManagedElementWrapper;
28 import org.springframework.beans.factory.annotation.Value;
29 import org.springframework.scheduling.annotation.Scheduled;
30 import org.springframework.stereotype.Component;
31
32 import java.io.IOException;
33 import java.util.HashMap;
34 import java.util.List;
35 import java.util.Map;
36
37 import static org.oran.smo.ncmp_to_teiv_adapter.ResourceReader.readResourceFile;
38
39 @Slf4j
40 @Component
41 @RequiredArgsConstructor
42 public class NcmpToTeivIngestion {
43
44     @Value("${send-sample-ocucp-event}")
45     private boolean sendSampleOCUCPEvent;
46
47     private static final ObjectMapper objectMapper = new ObjectMapper();
48     private final KafkaEventProducer kafkaEventProducer;
49     private final NcmpPollingClient ncmpPollingClient;
50
51     private Map<String, Object> addedCmHandles = new HashMap<>();
52
53     @Scheduled(fixedRateString = "${polling.interval}")
54     public void pollExternalApi() throws IOException {
55         List<String> cmHandlesToAdd = ncmpPollingClient.getAllCmHandlesFromNcmp();
56         List<String> cmHandlesToDelete = addedCmHandles.keySet().stream().filter(v -> !cmHandlesToAdd.contains(v)).toList();
57         addSmo();
58
59         for (String cmHandle : cmHandlesToAdd) {
60             addCmHandle(cmHandle);
61         }
62         for (String cmHandle : cmHandlesToDelete) {
63             removeCmHandle(cmHandle);
64         }
65         if (sendSampleOCUCPEvent) {
66             sendSampleOCUCPEvent();
67         }
68     }
69
70     private void addSmo() {
71         String payload = SmoPayloadBuilder.build();
72         CloudEvent event = CloudEventFactory.createEvent(payload, "merge");
73         log.info("Sending CloudEvent with payload: {}", payload);
74         kafkaEventProducer.sendCloudEvent(event);
75     }
76
77     private void addCmHandle(String cmHandle) {
78         ManagedElementWrapper wrapper = ncmpPollingClient.getAllManagedElementsFromNcmp(cmHandle);
79         Map<String, Object> json = wrapper.toTeivCloudEventPayload();
80         try {
81             String payload = objectMapper.writeValueAsString(json);
82             CloudEvent event = CloudEventFactory.createEvent(payload, "merge");
83             log.info("Sending CloudEvent with payload: {}", payload);
84             kafkaEventProducer.sendCloudEvent(event);
85             addedCmHandles.put(cmHandle, json);
86         } catch (JsonProcessingException e) {
87             log.error("Error processing data from cmHandle {}. Event not sent. Error message: {}", cmHandle, e
88                     .getMessage());
89         }
90     }
91
92     private void removeCmHandle(String cmHandle) {
93         try {
94             Object json = addedCmHandles.get(cmHandle);
95             String payload = objectMapper.writeValueAsString(json);
96             CloudEvent event = CloudEventFactory.createEvent(payload, "delete");
97             log.info("Sending CloudEvent with payload: {}", payload);
98             kafkaEventProducer.sendCloudEvent(event);
99             addedCmHandles.remove(cmHandle);
100         } catch (JsonProcessingException e) {
101             log.error("Error processing data from cmHandle {}. Event not sent. Error message: {}", cmHandle, e
102                     .getMessage());
103         }
104     }
105
106     private void sendSampleOCUCPEvent() throws IOException {
107         String content = readResourceFile("sample-responses/_3gpp-common-managed-element-ocucp.json");
108         ManagedElementWrapper wrapper = objectMapper.readValue(content, ManagedElementWrapper.class);
109         Map<String, Object> toJson = wrapper.toTeivCloudEventPayload();
110         try {
111             String payload = objectMapper.writeValueAsString(toJson);
112             CloudEvent event = CloudEventFactory.createEvent(payload, "merge");
113             log.info("Sending CloudEvent with payload: {}", payload);
114             kafkaEventProducer.sendCloudEvent(event);
115             addedCmHandles.put("pynts-o-cu-cp-1", toJson);
116         } catch (JsonProcessingException e) {
117             log.error("Error processing ocucp data. Event not sent. Error message: {}", e.getMessage());
118         }
119     }
120 }