5e7db906504b0adff4607669838e021e41e48024
[sim/o1-interface.git] / ntsimulator / ntsim-ng / features / ves_heartbeat / ves_heartbeat.c
1 /*************************************************************************
2 *
3 * Copyright 2020 highstreet technologies GmbH and others
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 ***************************************************************************/
17
18 #define _GNU_SOURCE
19
20 #include "ves_heartbeat.h"
21 #include "utils/log_utils.h"
22 #include "utils/sys_utils.h"
23 #include "utils/http_client.h"
24 #include "utils/nts_utils.h"
25 #include <stdio.h>
26 #include <time.h>
27 #include <inttypes.h>
28 #include <assert.h>
29 #include <unistd.h>
30 #include <pthread.h>
31 #include <signal.h>
32
33 #include "core/session.h"
34 #include "core/framework.h"
35
36 #define HEARTBEAT_SCHEMA_XPATH      "/nts-network-function:simulation/network-function/ves/heartbeat-period" 
37
38 static volatile int ves_heartbeat_period;
39 static int ves_sequence_number;
40
41 static pthread_t ves_heartbeat_thread;
42 static pthread_mutex_t ves_heartbeat_lock;
43
44
45 //mutex-guarded access
46 static int ves_heartbeat_period_get(void);
47 static void ves_heartbeat_period_set(int new_period);
48
49 static int ves_heartbeat_send_ves_message(int port);
50 static void *ves_heartbeat_thread_routine(void *arg);
51 static cJSON* ves_create_heartbeat_fields(int heartbeat_period);
52 static int heartbeat_change_cb(sr_session_ctx_t *session, const char *module_name, const char *xpath, sr_event_t event, uint32_t request_id, void *private_data);
53
54 static volatile sig_atomic_t ves_heartbeat_stopsig;
55 static sr_subscription_ctx_t *ves_heartbeat_subscription = 0;
56
57 int ves_heartbeat_feature_get_status(void) {
58     return (ves_heartbeat_subscription != 0);
59 }
60
61 int ves_heartbeat_feature_start(sr_session_ctx_t *current_session) {
62     assert_session();
63     assert(current_session);
64
65     if(ves_heartbeat_subscription == 0) {
66         sr_val_t *value = 0;
67         if(pthread_mutex_init(&ves_heartbeat_lock, NULL) != 0) { 
68             log_error("mutex init has failed\n");
69             return NTS_ERR_FAILED; 
70         }
71
72         ves_heartbeat_stopsig = 0;
73         ves_heartbeat_period = 0;
74         ves_sequence_number = 0;
75
76         int rc = sr_get_item(current_session, HEARTBEAT_SCHEMA_XPATH, 0, &value);
77         if(rc == SR_ERR_OK) {
78             ves_heartbeat_period_set(value->data.uint16_val);
79             sr_free_val(value);
80         }
81         else if(rc != SR_ERR_NOT_FOUND) {
82             log_error("sr_get_item failed\n");
83         }
84
85         rc = sr_module_change_subscribe(current_session, "nts-network-function", HEARTBEAT_SCHEMA_XPATH, heartbeat_change_cb, NULL, 0, SR_SUBSCR_CTX_REUSE, &ves_heartbeat_subscription);
86         if(rc != SR_ERR_OK) {
87             log_error("could not subscribe to heartbeat\n");
88             return NTS_ERR_FAILED;
89         }
90
91         if(pthread_create(&ves_heartbeat_thread, 0, ves_heartbeat_thread_routine, 0)) {
92             log_error("could not create thread for heartbeat\n");
93             return NTS_ERR_FAILED;
94         }
95     }
96
97     return NTS_ERR_OK;
98 }
99
100 int ves_heartbeat_feature_stop(void) {
101     assert_session();
102
103     if(ves_heartbeat_subscription) {
104         int rc = sr_unsubscribe(ves_heartbeat_subscription);
105         if(rc != SR_ERR_OK) {
106             log_error("could not subscribe to heartbeat\n");
107             return NTS_ERR_FAILED;
108         }
109
110         void *status;
111         ves_heartbeat_stopsig = 1;
112         pthread_join(ves_heartbeat_thread, &status);
113         pthread_mutex_destroy(&ves_heartbeat_lock);
114
115         ves_heartbeat_subscription = 0;
116     }
117
118     return NTS_ERR_OK;
119 }
120
121
122 static int ves_heartbeat_period_get(void) {
123     pthread_mutex_lock(&ves_heartbeat_lock);
124     int ret = ves_heartbeat_period;
125     pthread_mutex_unlock(&ves_heartbeat_lock);
126     return ret;
127 }
128
129 static void ves_heartbeat_period_set(int new_period) {
130     pthread_mutex_lock(&ves_heartbeat_lock);
131     ves_heartbeat_period = new_period;
132     pthread_mutex_unlock(&ves_heartbeat_lock);
133 }
134
135 static int ves_heartbeat_send_ves_message(int port) {
136     char *hostname_string = framework_environment.settings.hostname;
137     cJSON *post_data_json = cJSON_CreateObject();
138     if(post_data_json == 0) {
139         log_error("cJSON_CreateObject failed\n");
140         return NTS_ERR_FAILED;
141     }
142
143     cJSON *event = cJSON_CreateObject();
144     if(event == 0) {
145         log_error("cJSON_CreateObject failed\n");
146         cJSON_Delete(post_data_json);
147         return NTS_ERR_FAILED;
148     }
149     
150     if(cJSON_AddItemToObject(post_data_json, "event", event) == 0) {
151         log_error("cJOSN_AddItemToObject failed\n");
152         cJSON_Delete(post_data_json);
153         return NTS_ERR_FAILED;
154     }
155
156     cJSON *common_event_header = ves_create_common_event_header("heartbeat", "Controller", hostname_string, port, "Low", ves_sequence_number++);
157     if(common_event_header == 0) {
158         log_error("ves_create_common_event_header failed\n");
159         cJSON_Delete(post_data_json);
160         return NTS_ERR_FAILED;
161     }
162     
163     if(cJSON_AddItemToObject(event, "commonEventHeader", common_event_header) == 0) {
164         log_error("cJOSN_AddItemToObject failed\n");
165         cJSON_Delete(post_data_json);
166         return NTS_ERR_FAILED;
167     }
168
169     cJSON *heartbeat_fields = ves_create_heartbeat_fields(ves_heartbeat_period_get());
170     if(heartbeat_fields == 0) {
171         log_error("ves_create_heartbeat_fields failed\n");
172         cJSON_Delete(post_data_json);
173         return NTS_ERR_FAILED;
174     }
175     
176     if(cJSON_AddItemToObject(event, "heartbeatFields", heartbeat_fields) == 0) {
177         log_error("cJOSN_AddItemToObject failed\n");
178         cJSON_Delete(post_data_json);
179         return NTS_ERR_FAILED;
180     }
181
182     char *post_data = cJSON_PrintUnformatted(post_data_json);
183     cJSON_Delete(post_data_json);
184     if(post_data == 0) {
185         log_error("cJSON_PrintUnformatted failed\n");
186         return NTS_ERR_FAILED;
187     }
188
189     ves_details_t *ves_details = ves_endpoint_details_get(0);
190     if(!ves_details) {
191         log_error("ves_endpoint_details_get failed\n");
192         free(post_data);
193         return NTS_ERR_FAILED;
194     }
195     
196     int rc = http_request(ves_details->url, ves_details->username, ves_details->password, "POST", post_data, 0, 0);
197     ves_details_free(ves_details);
198     free(post_data);
199     
200     if(rc != NTS_ERR_OK) {
201         log_error("http_request failed\n");
202         return NTS_ERR_FAILED;
203     }
204
205     return NTS_ERR_OK;
206 }
207
208 static void *ves_heartbeat_thread_routine(void *arg) {
209     assert_session();
210
211     int current_heartbeat_period = 0;
212     unsigned int timer_counter = 0;
213
214     int ssh_base_port = 0;
215     int tls_base_port = 0;
216     nts_mount_point_addressing_method_t mp = nts_mount_point_addressing_method_get(0);
217     if(mp == UNKNOWN_MAPPING) {
218         log_error("mount-point-addressing-method failed, assuming DOCKER_MAPPING\n");
219         mp = DOCKER_MAPPING;
220         ssh_base_port = STANDARD_NETCONF_PORT;
221         tls_base_port = ssh_base_port + framework_environment.settings.ssh_connections;
222     }
223     else if(mp == DOCKER_MAPPING) {
224         ssh_base_port = STANDARD_NETCONF_PORT;
225         tls_base_port = ssh_base_port + framework_environment.settings.ssh_connections;
226     }
227     else {
228         ssh_base_port = framework_environment.host.ssh_base_port;
229         tls_base_port = framework_environment.host.tls_base_port;       
230     }
231
232
233     while((!framework_sigint) && (!ves_heartbeat_stopsig)) {
234         current_heartbeat_period = ves_heartbeat_period_get();
235         timer_counter++;
236
237         if((timer_counter >= current_heartbeat_period) && (current_heartbeat_period > 0)) {
238             timer_counter = 0;
239
240             if((framework_environment.settings.ssh_connections + framework_environment.settings.tls_connections) > 1) {
241                 for(int port = ssh_base_port; port < ssh_base_port + framework_environment.settings.ssh_connections; port++) {
242                     int rc = ves_heartbeat_send_ves_message(port);
243                     if(rc != NTS_ERR_FAILED) {
244                         log_error("could not send VES heartbeat\n");
245                     }
246                 }
247
248                 for(int port = tls_base_port; port < tls_base_port + framework_environment.settings.tls_connections; port++) {
249                     int rc = ves_heartbeat_send_ves_message(port);
250                     if(rc != NTS_ERR_FAILED) {
251                         log_error("could not send VES heartbeat\n");
252                     }
253                 }
254             }
255             else {
256                 int rc = ves_heartbeat_send_ves_message(0);
257                 if(rc != NTS_ERR_FAILED) {
258                     log_error("could not send VES heartbeat\n");
259                 }
260             }
261             
262         }
263
264         sleep(1);
265     }
266
267     return 0;
268 }
269
270 static cJSON* ves_create_heartbeat_fields(int heartbeat_period) {
271     cJSON *heartbeat_fields = cJSON_CreateObject();
272     if(heartbeat_fields == 0) {
273         log_error("could not create JSON object\n");
274         return 0;
275     }
276
277     if(cJSON_AddStringToObject(heartbeat_fields, "heartbeatFieldsVersion", "3.0") == 0) {
278         log_error("cJSON_Add*ToObject failed\n");
279         cJSON_Delete(heartbeat_fields);
280         return 0;
281     }
282
283     if(cJSON_AddNumberToObject(heartbeat_fields, "heartbeatInterval", (double)(heartbeat_period)) == 0) {
284         log_error("cJSON_Add*ToObject failed\n");
285         cJSON_Delete(heartbeat_fields);
286         return 0;
287     }
288
289     cJSON *additional_fields = cJSON_CreateObject();
290     if(additional_fields == 0) {
291         log_error("could not create JSON object\n");
292         log_error("cJSON_Add*ToObject failed\n");
293         cJSON_Delete(heartbeat_fields);
294         return 0;
295     }
296     
297     if(cJSON_AddItemToObject(heartbeat_fields, "additionalFields", additional_fields) == 0) {
298         log_error("cJSON_Add*ToObject failed\n");
299         cJSON_Delete(heartbeat_fields);
300         return 0;
301     }
302
303     char *current_date_and_time = get_current_date_and_time();
304     if(current_date_and_time == 0) {
305         log_error("get_current_date_and_time failed\n");
306         cJSON_Delete(heartbeat_fields);
307         return 0;
308     }
309
310     if(cJSON_AddStringToObject(additional_fields, "eventTime", current_date_and_time) == 0) {
311         log_error("cJSON_Add*ToObject failed\n");
312         cJSON_Delete(heartbeat_fields);
313         free(current_date_and_time);
314         return 0;
315     }
316     free(current_date_and_time);
317
318     return heartbeat_fields;
319 }
320
321 static int heartbeat_change_cb(sr_session_ctx_t *session, const char *module_name, const char *xpath, sr_event_t event, uint32_t request_id, void *private_data) {
322     sr_change_iter_t *it = 0;
323     int rc = SR_ERR_OK;
324     sr_change_oper_t oper;
325     sr_val_t *old_value = 0;
326     sr_val_t *new_value = 0;
327
328     if(event == SR_EV_DONE) {
329         rc = sr_get_changes_iter(session, HEARTBEAT_SCHEMA_XPATH, &it);
330         if(rc != SR_ERR_OK) {
331             log_error("sr_get_changes_iter failed\n");
332             return SR_ERR_VALIDATION_FAILED;
333         }
334
335         while((rc = sr_get_change_next(session, it, &oper, &old_value, &new_value)) == SR_ERR_OK) {
336             ves_heartbeat_period_set(new_value->data.uint16_val);
337             sr_free_val(old_value);
338             sr_free_val(new_value);
339         }
340
341         sr_free_change_iter(it);
342     }
343
344     return SR_ERR_OK;
345 }