Add VES stndDefined PM and subscription for O-DU.
[sim/o1-interface.git] / ntsimulator / ntsim-ng / features / ves_heartbeat / ves_heartbeat.c
1 /*************************************************************************
2 *
3 * Copyright 2020 highstreet technologies GmbH and others
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 ***************************************************************************/
17
18 #define _GNU_SOURCE
19
20 #include "ves_heartbeat.h"
21 #include "utils/log_utils.h"
22 #include "utils/sys_utils.h"
23 #include "utils/http_client.h"
24 #include "utils/nts_utils.h"
25 #include <stdio.h>
26 #include <time.h>
27 #include <inttypes.h>
28 #include <assert.h>
29 #include <unistd.h>
30 #include <pthread.h>
31 #include <signal.h>
32
33 #include "core/session.h"
34 #include "core/framework.h"
35 #include "core/xpath.h"
36
37 static volatile int ves_heartbeat_period;
38 static int ves_sequence_number;
39
40 static pthread_t ves_heartbeat_thread;
41 static pthread_mutex_t ves_heartbeat_lock;
42
43
44 //mutex-guarded access
45 static int ves_heartbeat_period_get(void);
46 static void ves_heartbeat_period_set(int new_period);
47
48 static int ves_heartbeat_send_ves_message(int port);
49 static void *ves_heartbeat_thread_routine(void *arg);
50 static cJSON* ves_create_heartbeat_fields(int heartbeat_period);
51 static int heartbeat_change_cb(sr_session_ctx_t *session, const char *module_name, const char *xpath, sr_event_t event, uint32_t request_id, void *private_data);
52
53 static volatile sig_atomic_t ves_heartbeat_stopsig;
54 static sr_subscription_ctx_t *ves_heartbeat_subscription = 0;
55
56 int ves_heartbeat_feature_get_status(void) {
57     return (ves_heartbeat_subscription != 0);
58 }
59
60 int ves_heartbeat_feature_start(sr_session_ctx_t *current_session) {
61     assert_session();
62     assert(current_session);
63
64     if(ves_heartbeat_subscription == 0) {
65         sr_val_t *value = 0;
66         if(pthread_mutex_init(&ves_heartbeat_lock, NULL) != 0) { 
67             log_error("mutex init has failed\n");
68             return NTS_ERR_FAILED; 
69         }
70
71         ves_heartbeat_stopsig = 0;
72         ves_heartbeat_period = 0;
73         ves_sequence_number = 0;
74
75         int rc = sr_get_item(current_session, NTS_NF_VES_HEARTBEAT_SCHEMA_XPATH, 0, &value);
76         if(rc == SR_ERR_OK) {
77             ves_heartbeat_period_set(value->data.uint16_val);
78             sr_free_val(value);
79         }
80         else if(rc != SR_ERR_NOT_FOUND) {
81             log_error("sr_get_item failed\n");
82         }
83
84         rc = sr_module_change_subscribe(current_session, NTS_NETWORK_FUNCTION_MODULE, NTS_NF_VES_HEARTBEAT_SCHEMA_XPATH, heartbeat_change_cb, NULL, 0, SR_SUBSCR_CTX_REUSE, &ves_heartbeat_subscription);
85         if(rc != SR_ERR_OK) {
86             log_error("could not subscribe to heartbeat\n");
87             return NTS_ERR_FAILED;
88         }
89
90         if(pthread_create(&ves_heartbeat_thread, 0, ves_heartbeat_thread_routine, 0)) {
91             log_error("could not create thread for heartbeat\n");
92             return NTS_ERR_FAILED;
93         }
94     }
95
96     return NTS_ERR_OK;
97 }
98
99 int ves_heartbeat_feature_stop(void) {
100     assert_session();
101
102     if(ves_heartbeat_subscription) {
103         int rc = sr_unsubscribe(ves_heartbeat_subscription);
104         if(rc != SR_ERR_OK) {
105             log_error("could not subscribe to heartbeat\n");
106             return NTS_ERR_FAILED;
107         }
108
109         void *status;
110         ves_heartbeat_stopsig = 1;
111         pthread_join(ves_heartbeat_thread, &status);
112         pthread_mutex_destroy(&ves_heartbeat_lock);
113
114         ves_heartbeat_subscription = 0;
115     }
116
117     return NTS_ERR_OK;
118 }
119
120
121 static int ves_heartbeat_period_get(void) {
122     pthread_mutex_lock(&ves_heartbeat_lock);
123     int ret = ves_heartbeat_period;
124     pthread_mutex_unlock(&ves_heartbeat_lock);
125     return ret;
126 }
127
128 static void ves_heartbeat_period_set(int new_period) {
129     pthread_mutex_lock(&ves_heartbeat_lock);
130     ves_heartbeat_period = new_period;
131     pthread_mutex_unlock(&ves_heartbeat_lock);
132 }
133
134 static int ves_heartbeat_send_ves_message(int port) {
135     char *hostname_string = framework_environment.settings.hostname;
136     cJSON *post_data_json = cJSON_CreateObject();
137     if(post_data_json == 0) {
138         log_error("cJSON_CreateObject failed\n");
139         return NTS_ERR_FAILED;
140     }
141
142     cJSON *event = cJSON_CreateObject();
143     if(event == 0) {
144         log_error("cJSON_CreateObject failed\n");
145         cJSON_Delete(post_data_json);
146         return NTS_ERR_FAILED;
147     }
148     
149     if(cJSON_AddItemToObject(post_data_json, "event", event) == 0) {
150         log_error("cJOSN_AddItemToObject failed\n");
151         cJSON_Delete(post_data_json);
152         return NTS_ERR_FAILED;
153     }
154
155     cJSON *common_event_header = ves_create_common_event_header("heartbeat", "Controller", hostname_string, port, "Low", ves_sequence_number++);
156     if(common_event_header == 0) {
157         log_error("ves_create_common_event_header failed\n");
158         cJSON_Delete(post_data_json);
159         return NTS_ERR_FAILED;
160     }
161     
162     if(cJSON_AddItemToObject(event, "commonEventHeader", common_event_header) == 0) {
163         log_error("cJOSN_AddItemToObject failed\n");
164         cJSON_Delete(post_data_json);
165         return NTS_ERR_FAILED;
166     }
167
168     cJSON *heartbeat_fields = ves_create_heartbeat_fields(ves_heartbeat_period_get());
169     if(heartbeat_fields == 0) {
170         log_error("ves_create_heartbeat_fields failed\n");
171         cJSON_Delete(post_data_json);
172         return NTS_ERR_FAILED;
173     }
174     
175     if(cJSON_AddItemToObject(event, "heartbeatFields", heartbeat_fields) == 0) {
176         log_error("cJOSN_AddItemToObject failed\n");
177         cJSON_Delete(post_data_json);
178         return NTS_ERR_FAILED;
179     }
180
181     char *post_data = cJSON_PrintUnformatted(post_data_json);
182     cJSON_Delete(post_data_json);
183     if(post_data == 0) {
184         log_error("cJSON_PrintUnformatted failed\n");
185         return NTS_ERR_FAILED;
186     }
187
188     ves_details_t *ves_details = ves_endpoint_details_get(0, 0);
189     if(!ves_details) {
190         log_error("ves_endpoint_details_get failed\n");
191         free(post_data);
192         return NTS_ERR_FAILED;
193     }
194     
195     int rc = http_request(ves_details->url, ves_details->username, ves_details->password, "POST", post_data, 0, 0);
196     ves_details_free(ves_details);
197     free(post_data);
198     
199     if(rc != NTS_ERR_OK) {
200         log_error("http_request failed\n");
201         return NTS_ERR_FAILED;
202     }
203
204     return NTS_ERR_OK;
205 }
206
207 static void *ves_heartbeat_thread_routine(void *arg) {
208     assert_session();
209
210     int current_heartbeat_period = 0;
211     unsigned int timer_counter = 0;
212
213     int ssh_base_port = 0;
214     int tls_base_port = 0;
215     nts_mount_point_addressing_method_t mp = nts_mount_point_addressing_method_get(0);
216     if(mp == UNKNOWN_MAPPING) {
217         log_error("mount-point-addressing-method failed, assuming DOCKER_MAPPING\n");
218         mp = DOCKER_MAPPING;
219         ssh_base_port = STANDARD_NETCONF_PORT;
220         tls_base_port = ssh_base_port + framework_environment.settings.ssh_connections;
221     }
222     else if(mp == DOCKER_MAPPING) {
223         ssh_base_port = STANDARD_NETCONF_PORT;
224         tls_base_port = ssh_base_port + framework_environment.settings.ssh_connections;
225     }
226     else {
227         ssh_base_port = framework_environment.host.ssh_base_port;
228         tls_base_port = framework_environment.host.tls_base_port;       
229     }
230
231
232     while((!framework_sigint) && (!ves_heartbeat_stopsig)) {
233         current_heartbeat_period = ves_heartbeat_period_get();
234         timer_counter++;
235
236         if((timer_counter >= current_heartbeat_period) && (current_heartbeat_period > 0)) {
237             timer_counter = 0;
238
239             if((framework_environment.settings.ssh_connections + framework_environment.settings.tls_connections) > 1) {
240                 for(int port = ssh_base_port; port < ssh_base_port + framework_environment.settings.ssh_connections; port++) {
241                     int rc = ves_heartbeat_send_ves_message(port);
242                     if(rc != NTS_ERR_FAILED) {
243                         log_error("could not send VES heartbeat\n");
244                     }
245                 }
246
247                 for(int port = tls_base_port; port < tls_base_port + framework_environment.settings.tls_connections; port++) {
248                     int rc = ves_heartbeat_send_ves_message(port);
249                     if(rc != NTS_ERR_FAILED) {
250                         log_error("could not send VES heartbeat\n");
251                     }
252                 }
253             }
254             else {
255                 int rc = ves_heartbeat_send_ves_message(0);
256                 if(rc != NTS_ERR_FAILED) {
257                     log_error("could not send VES heartbeat\n");
258                 }
259             }
260             
261         }
262
263         sleep(1);
264     }
265
266     return 0;
267 }
268
269 static cJSON* ves_create_heartbeat_fields(int heartbeat_period) {
270     cJSON *heartbeat_fields = cJSON_CreateObject();
271     if(heartbeat_fields == 0) {
272         log_error("could not create JSON object\n");
273         return 0;
274     }
275
276     if(cJSON_AddStringToObject(heartbeat_fields, "heartbeatFieldsVersion", "3.0") == 0) {
277         log_error("cJSON_Add*ToObject failed\n");
278         cJSON_Delete(heartbeat_fields);
279         return 0;
280     }
281
282     if(cJSON_AddNumberToObject(heartbeat_fields, "heartbeatInterval", (double)(heartbeat_period)) == 0) {
283         log_error("cJSON_Add*ToObject failed\n");
284         cJSON_Delete(heartbeat_fields);
285         return 0;
286     }
287
288     cJSON *additional_fields = cJSON_CreateObject();
289     if(additional_fields == 0) {
290         log_error("could not create JSON object\n");
291         log_error("cJSON_Add*ToObject failed\n");
292         cJSON_Delete(heartbeat_fields);
293         return 0;
294     }
295     
296     if(cJSON_AddItemToObject(heartbeat_fields, "additionalFields", additional_fields) == 0) {
297         log_error("cJSON_Add*ToObject failed\n");
298         cJSON_Delete(heartbeat_fields);
299         return 0;
300     }
301
302     char *current_date_and_time = get_current_date_and_time();
303     if(current_date_and_time == 0) {
304         log_error("get_current_date_and_time failed\n");
305         cJSON_Delete(heartbeat_fields);
306         return 0;
307     }
308
309     if(cJSON_AddStringToObject(additional_fields, "eventTime", current_date_and_time) == 0) {
310         log_error("cJSON_Add*ToObject failed\n");
311         cJSON_Delete(heartbeat_fields);
312         free(current_date_and_time);
313         return 0;
314     }
315     free(current_date_and_time);
316
317     return heartbeat_fields;
318 }
319
320 static int heartbeat_change_cb(sr_session_ctx_t *session, const char *module_name, const char *xpath, sr_event_t event, uint32_t request_id, void *private_data) {
321     sr_change_iter_t *it = 0;
322     int rc = SR_ERR_OK;
323     sr_change_oper_t oper;
324     sr_val_t *old_value = 0;
325     sr_val_t *new_value = 0;
326
327     if(event == SR_EV_DONE) {
328         rc = sr_get_changes_iter(session, NTS_NF_VES_HEARTBEAT_SCHEMA_XPATH, &it);
329         if(rc != SR_ERR_OK) {
330             log_error("sr_get_changes_iter failed\n");
331             return SR_ERR_VALIDATION_FAILED;
332         }
333
334         while((rc = sr_get_change_next(session, it, &oper, &old_value, &new_value)) == SR_ERR_OK) {
335             ves_heartbeat_period_set(new_value->data.uint16_val);
336             sr_free_val(old_value);
337             sr_free_val(new_value);
338         }
339
340         sr_free_change_iter(it);
341     }
342
343     return SR_ERR_OK;
344 }