1 /*************************************************************************
3 * Copyright 2020 highstreet technologies GmbH and others
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 ***************************************************************************/
20 #include "ves_heartbeat.h"
21 #include "utils/log_utils.h"
22 #include "utils/sys_utils.h"
23 #include "utils/http_client.h"
24 #include "utils/nts_utils.h"
33 #include "core/session.h"
34 #include "core/framework.h"
35 #include "core/xpath.h"
37 static volatile int ves_heartbeat_period;
38 static int ves_sequence_number;
40 static pthread_t ves_heartbeat_thread;
41 static pthread_mutex_t ves_heartbeat_lock;
44 //mutex-guarded access
45 static int ves_heartbeat_period_get(void);
46 static void ves_heartbeat_period_set(int new_period);
48 static int ves_heartbeat_send_ves_message(int port);
49 static void *ves_heartbeat_thread_routine(void *arg);
50 static cJSON* ves_create_heartbeat_fields(int heartbeat_period);
51 static int heartbeat_change_cb(sr_session_ctx_t *session, const char *module_name, const char *xpath, sr_event_t event, uint32_t request_id, void *private_data);
53 static volatile sig_atomic_t ves_heartbeat_stopsig;
54 static sr_subscription_ctx_t *ves_heartbeat_subscription = 0;
56 int ves_heartbeat_feature_get_status(void) {
57 return (ves_heartbeat_subscription != 0);
60 int ves_heartbeat_feature_start(sr_session_ctx_t *current_session) {
62 assert(current_session);
64 if(ves_heartbeat_subscription == 0) {
66 if(pthread_mutex_init(&ves_heartbeat_lock, NULL) != 0) {
67 log_error("mutex init has failed\n");
68 return NTS_ERR_FAILED;
71 ves_heartbeat_stopsig = 0;
72 ves_heartbeat_period = 0;
73 ves_sequence_number = 0;
75 int rc = sr_get_item(current_session, NTS_NF_VES_HEARTBEAT_SCHEMA_XPATH, 0, &value);
77 ves_heartbeat_period_set(value->data.uint16_val);
80 else if(rc != SR_ERR_NOT_FOUND) {
81 log_error("sr_get_item failed\n");
84 rc = sr_module_change_subscribe(current_session, NTS_NETWORK_FUNCTION_MODULE, NTS_NF_VES_HEARTBEAT_SCHEMA_XPATH, heartbeat_change_cb, NULL, 0, SR_SUBSCR_CTX_REUSE, &ves_heartbeat_subscription);
86 log_error("could not subscribe to heartbeat\n");
87 return NTS_ERR_FAILED;
90 if(pthread_create(&ves_heartbeat_thread, 0, ves_heartbeat_thread_routine, 0)) {
91 log_error("could not create thread for heartbeat\n");
92 return NTS_ERR_FAILED;
99 int ves_heartbeat_feature_stop(void) {
102 if(ves_heartbeat_subscription) {
103 int rc = sr_unsubscribe(ves_heartbeat_subscription);
104 if(rc != SR_ERR_OK) {
105 log_error("could not subscribe to heartbeat\n");
106 return NTS_ERR_FAILED;
110 ves_heartbeat_stopsig = 1;
111 pthread_join(ves_heartbeat_thread, &status);
112 pthread_mutex_destroy(&ves_heartbeat_lock);
114 ves_heartbeat_subscription = 0;
121 static int ves_heartbeat_period_get(void) {
122 pthread_mutex_lock(&ves_heartbeat_lock);
123 int ret = ves_heartbeat_period;
124 pthread_mutex_unlock(&ves_heartbeat_lock);
128 static void ves_heartbeat_period_set(int new_period) {
129 pthread_mutex_lock(&ves_heartbeat_lock);
130 ves_heartbeat_period = new_period;
131 pthread_mutex_unlock(&ves_heartbeat_lock);
134 static int ves_heartbeat_send_ves_message(int port) {
135 char *hostname_string = framework_environment.settings.hostname;
136 cJSON *post_data_json = cJSON_CreateObject();
137 if(post_data_json == 0) {
138 log_error("cJSON_CreateObject failed\n");
139 return NTS_ERR_FAILED;
142 cJSON *event = cJSON_CreateObject();
144 log_error("cJSON_CreateObject failed\n");
145 cJSON_Delete(post_data_json);
146 return NTS_ERR_FAILED;
149 if(cJSON_AddItemToObject(post_data_json, "event", event) == 0) {
150 log_error("cJOSN_AddItemToObject failed\n");
151 cJSON_Delete(post_data_json);
152 return NTS_ERR_FAILED;
155 cJSON *common_event_header = ves_create_common_event_header("heartbeat", "Controller", hostname_string, port, "Low", ves_sequence_number++);
156 if(common_event_header == 0) {
157 log_error("ves_create_common_event_header failed\n");
158 cJSON_Delete(post_data_json);
159 return NTS_ERR_FAILED;
162 if(cJSON_AddItemToObject(event, "commonEventHeader", common_event_header) == 0) {
163 log_error("cJOSN_AddItemToObject failed\n");
164 cJSON_Delete(post_data_json);
165 return NTS_ERR_FAILED;
168 cJSON *heartbeat_fields = ves_create_heartbeat_fields(ves_heartbeat_period_get());
169 if(heartbeat_fields == 0) {
170 log_error("ves_create_heartbeat_fields failed\n");
171 cJSON_Delete(post_data_json);
172 return NTS_ERR_FAILED;
175 if(cJSON_AddItemToObject(event, "heartbeatFields", heartbeat_fields) == 0) {
176 log_error("cJOSN_AddItemToObject failed\n");
177 cJSON_Delete(post_data_json);
178 return NTS_ERR_FAILED;
181 char *post_data = cJSON_PrintUnformatted(post_data_json);
182 cJSON_Delete(post_data_json);
184 log_error("cJSON_PrintUnformatted failed\n");
185 return NTS_ERR_FAILED;
188 ves_details_t *ves_details = ves_endpoint_details_get(0);
190 log_error("ves_endpoint_details_get failed\n");
192 return NTS_ERR_FAILED;
195 int rc = http_request(ves_details->url, ves_details->username, ves_details->password, "POST", post_data, 0, 0);
196 ves_details_free(ves_details);
199 if(rc != NTS_ERR_OK) {
200 log_error("http_request failed\n");
201 return NTS_ERR_FAILED;
207 static void *ves_heartbeat_thread_routine(void *arg) {
210 int current_heartbeat_period = 0;
211 unsigned int timer_counter = 0;
213 int ssh_base_port = 0;
214 int tls_base_port = 0;
215 nts_mount_point_addressing_method_t mp = nts_mount_point_addressing_method_get(0);
216 if(mp == UNKNOWN_MAPPING) {
217 log_error("mount-point-addressing-method failed, assuming DOCKER_MAPPING\n");
219 ssh_base_port = STANDARD_NETCONF_PORT;
220 tls_base_port = ssh_base_port + framework_environment.settings.ssh_connections;
222 else if(mp == DOCKER_MAPPING) {
223 ssh_base_port = STANDARD_NETCONF_PORT;
224 tls_base_port = ssh_base_port + framework_environment.settings.ssh_connections;
227 ssh_base_port = framework_environment.host.ssh_base_port;
228 tls_base_port = framework_environment.host.tls_base_port;
232 while((!framework_sigint) && (!ves_heartbeat_stopsig)) {
233 current_heartbeat_period = ves_heartbeat_period_get();
236 if((timer_counter >= current_heartbeat_period) && (current_heartbeat_period > 0)) {
239 if((framework_environment.settings.ssh_connections + framework_environment.settings.tls_connections) > 1) {
240 for(int port = ssh_base_port; port < ssh_base_port + framework_environment.settings.ssh_connections; port++) {
241 int rc = ves_heartbeat_send_ves_message(port);
242 if(rc != NTS_ERR_FAILED) {
243 log_error("could not send VES heartbeat\n");
247 for(int port = tls_base_port; port < tls_base_port + framework_environment.settings.tls_connections; port++) {
248 int rc = ves_heartbeat_send_ves_message(port);
249 if(rc != NTS_ERR_FAILED) {
250 log_error("could not send VES heartbeat\n");
255 int rc = ves_heartbeat_send_ves_message(0);
256 if(rc != NTS_ERR_FAILED) {
257 log_error("could not send VES heartbeat\n");
269 static cJSON* ves_create_heartbeat_fields(int heartbeat_period) {
270 cJSON *heartbeat_fields = cJSON_CreateObject();
271 if(heartbeat_fields == 0) {
272 log_error("could not create JSON object\n");
276 if(cJSON_AddStringToObject(heartbeat_fields, "heartbeatFieldsVersion", "3.0") == 0) {
277 log_error("cJSON_Add*ToObject failed\n");
278 cJSON_Delete(heartbeat_fields);
282 if(cJSON_AddNumberToObject(heartbeat_fields, "heartbeatInterval", (double)(heartbeat_period)) == 0) {
283 log_error("cJSON_Add*ToObject failed\n");
284 cJSON_Delete(heartbeat_fields);
288 cJSON *additional_fields = cJSON_CreateObject();
289 if(additional_fields == 0) {
290 log_error("could not create JSON object\n");
291 log_error("cJSON_Add*ToObject failed\n");
292 cJSON_Delete(heartbeat_fields);
296 if(cJSON_AddItemToObject(heartbeat_fields, "additionalFields", additional_fields) == 0) {
297 log_error("cJSON_Add*ToObject failed\n");
298 cJSON_Delete(heartbeat_fields);
302 char *current_date_and_time = get_current_date_and_time();
303 if(current_date_and_time == 0) {
304 log_error("get_current_date_and_time failed\n");
305 cJSON_Delete(heartbeat_fields);
309 if(cJSON_AddStringToObject(additional_fields, "eventTime", current_date_and_time) == 0) {
310 log_error("cJSON_Add*ToObject failed\n");
311 cJSON_Delete(heartbeat_fields);
312 free(current_date_and_time);
315 free(current_date_and_time);
317 return heartbeat_fields;
320 static int heartbeat_change_cb(sr_session_ctx_t *session, const char *module_name, const char *xpath, sr_event_t event, uint32_t request_id, void *private_data) {
321 sr_change_iter_t *it = 0;
323 sr_change_oper_t oper;
324 sr_val_t *old_value = 0;
325 sr_val_t *new_value = 0;
327 if(event == SR_EV_DONE) {
328 rc = sr_get_changes_iter(session, NTS_NF_VES_HEARTBEAT_SCHEMA_XPATH, &it);
329 if(rc != SR_ERR_OK) {
330 log_error("sr_get_changes_iter failed\n");
331 return SR_ERR_VALIDATION_FAILED;
334 while((rc = sr_get_change_next(session, it, &oper, &old_value, &new_value)) == SR_ERR_OK) {
335 ves_heartbeat_period_set(new_value->data.uint16_val);
336 sr_free_val(old_value);
337 sr_free_val(new_value);
340 sr_free_change_iter(it);