Add VES stndDefined PM and subscription for O-DU.
[sim/o1-interface.git] / ntsimulator / ntsim-ng / core / app / nf_oran_du.c
1 /*************************************************************************
2 *
3 * Copyright 2021 highstreet technologies GmbH and others
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 ***************************************************************************/
17
18 #define _GNU_SOURCE
19
20 #include "nf_oran_du.h"
21 #include "utils/log_utils.h"
22 #include "utils/sys_utils.h"
23 #include "utils/nts_utils.h"
24 #include "utils/rand_utils.h"
25 #include "utils/http_client.h"
26 #include <stdio.h>
27 #include <pthread.h>
28 #include <assert.h>
29
30 #include <sysrepo.h>
31 #include <sysrepo/values.h>
32 #include <libnetconf2/netconf.h>
33
34 #include "core/framework.h"
35 #include "core/context.h"
36 #include "core/session.h"
37 #include "core/xpath.h"
38
39 typedef struct {
40     char *id;
41     uint32_t counter;
42 } subscription_stream_t;
43
44 static subscription_stream_t **subscription_streams;
45 static int subscription_streams_count;
46
47 static int subscription_streams_add(const char *id);
48 static int subscription_streams_free(const char *id);
49 static subscription_stream_t *subscription_streams_get(const char *id);
50 static int subscription_streams_change_cb(sr_session_ctx_t *session, const char *module_name, const char *xpath, sr_event_t event, uint32_t request_id, void *private_data);
51
52 typedef struct {
53     pthread_t thread;
54     sig_atomic_t wait;
55     sig_atomic_t terminate;
56
57     char *id;
58     char *administrative_state;
59     char *user_label;
60     char *job_tag;
61     uint32_t granularity_period;
62     char **performance_metrics;
63     int performance_metrics_count;
64     char *stream_target;
65
66     subscription_stream_t *stream;
67 } pm_job_t;
68
69 static pm_job_t **pm_jobs;
70 static int pm_jobs_count;
71
72
73 static int pm_jobs_change_cb(sr_session_ctx_t *session, const char *module_name, const char *xpath, sr_event_t event, uint32_t request_id, void *private_data);
74 static void *pm_job_thread_routine(void *arg);
75
76 typedef struct {
77     //list of all fields, including ves mandatory
78     char **field_name;
79     char **field_value;
80     int field_count;
81
82     pm_job_t *job;
83 } nf_du_template_details_t;
84
85 static void nf_du_template_free(nf_du_template_details_t *details);
86 static char *nf_du_template_process_vars(const char *template, const nf_du_template_details_t *details);
87 static char *nf_du_template_process_function(const char *function, const nf_du_template_details_t *details);
88
89 static char *ves_template = 0;
90
91 int nf_oran_du_init(void) {
92     log_add_verbose(1, LOG_COLOR_BOLD_MAGENTA"NTS_FUNCTION_TYPE_O_RAN_O_DU"LOG_COLOR_RESET " mode initializing...\n");
93
94     pm_jobs = 0;
95     pm_jobs_count = 0;
96
97     subscription_streams = 0;
98     subscription_streams_count = 0;
99
100     ves_template = file_read_content("config/ves_template.json");
101     if(ves_template == 0) {
102         log_error("could not read config/ves_template.json");
103         return NTS_ERR_FAILED;
104     }
105
106     //check whether everything is already populated, read and update (if previously ran)
107     sr_val_t *values = 0;
108     size_t value_count = 0;
109     int rc = sr_get_items(session_running, NTS_NF_ORAN_DU_PM_JOBS_SCHEMA_XPATH, 0, 0, &values, &value_count);
110     if(rc != SR_ERR_OK) {
111         log_error("get items failed\n");
112         return NTS_ERR_FAILED;
113     }
114
115     //delete everything
116     if(value_count) {
117         log_add_verbose(2, "pm jobs already found (%d). cleaning up for fresh start...\n", value_count);
118
119         for(int i = 0; i < value_count; i++) {           
120             rc = sr_delete_item(session_running, values[i].xpath, 0);
121             if(rc != SR_ERR_OK) {
122                 log_error("sr_delete_item failed\n");
123                 return NTS_ERR_FAILED;
124             }
125         }
126         rc = sr_apply_changes(session_running, 0, 0);
127         if(rc != SR_ERR_OK) {
128             log_error("sr_apply_changes failed\n");
129             return NTS_ERR_FAILED;
130         }
131
132         sr_free_values(values, value_count);
133     }
134
135     //check subscription-streams
136     rc = sr_get_items(session_running, NTS_NF_ORAN_DU_SUBSCRIPTION_STREAMS_SCHEMA_XPATH, 0, 0, &values, &value_count);
137     if(rc != SR_ERR_OK) {
138         log_error("get items failed\n");
139         return NTS_ERR_FAILED;
140     }
141
142     if(value_count) {
143         for(int i = 0; i < value_count; i++) {
144             char *id = strdup(strstr(values[i].xpath, "[id='") + 5);
145             *strstr(id, "'") = 0;
146
147             rc = subscription_streams_add(id);
148              if(rc != NTS_ERR_OK) {
149                 log_error("subscription_streams_add failed\n");
150                 return NTS_ERR_FAILED;
151             }
152             free(id);
153         }
154
155         sr_free_values(values, value_count);
156     }
157
158     log_add_verbose(1, "subscribing to changes on %s...\n", NTS_NF_ORAN_DU_SUBSCRIPTION_STREAMS_SCHEMA_XPATH);
159     rc = sr_module_change_subscribe(session_running, NTS_NF_ORAN_DU_MODULE, NTS_NF_ORAN_DU_SUBSCRIPTION_STREAMS_SCHEMA_XPATH, subscription_streams_change_cb, NULL, 0, SR_SUBSCR_CTX_REUSE, &session_subscription);
160     if(rc != SR_ERR_OK) {
161         log_error("could not subscribe to module changes: %s\n", sr_strerror(rc));
162         return NTS_ERR_FAILED;
163     }
164
165     log_add_verbose(1, "subscribing to changes on %s...\n", NTS_NF_ORAN_DU_PM_JOBS_SCHEMA_XPATH);
166     rc = sr_module_change_subscribe(session_running, NTS_NF_ORAN_DU_MODULE, NTS_NF_ORAN_DU_PM_JOBS_SCHEMA_XPATH, pm_jobs_change_cb, NULL, 0, SR_SUBSCR_CTX_REUSE, &session_subscription);
167     if(rc != SR_ERR_OK) {
168         log_error("could not subscribe to module changes: %s\n", sr_strerror(rc));
169         return NTS_ERR_FAILED;
170     }
171
172     return NTS_ERR_OK;
173 }
174
175 void nf_oran_du_free(void) {
176     free(ves_template);
177 }
178
179 static int subscription_streams_add(const char *id) {
180     assert(id);
181
182     subscription_streams_count++;
183     subscription_streams = (subscription_stream_t **)realloc(subscription_streams, sizeof(subscription_stream_t *) * (subscription_streams_count));
184     if(subscription_streams == 0) {
185         log_error("realloc failed\n");
186         return NTS_ERR_FAILED;
187     }
188
189     subscription_streams[subscription_streams_count - 1] = (subscription_stream_t *)malloc(sizeof(subscription_stream_t));
190     if(subscription_streams[subscription_streams_count - 1] == 0) {
191         log_error("malloc failed\n");
192         return NTS_ERR_FAILED;
193     }
194     subscription_streams[subscription_streams_count - 1]->id = strdup(id);
195     if(subscription_streams[subscription_streams_count - 1]->id == 0) {
196         log_error("strdup failed\n");
197         return NTS_ERR_FAILED;
198     }
199     subscription_streams[subscription_streams_count - 1]->counter = 0;
200
201     log_add_verbose(1, "added stream target %s\n", id); //checkAL
202
203     return NTS_ERR_OK;
204 }
205
206 static int subscription_streams_free(const char *id) {
207     assert(id);
208
209     subscription_stream_t *found = 0;
210     for(int i = 0; i < subscription_streams_count; i++) {
211         if(strcmp(id, subscription_streams[i]->id) == 0) {
212             found = subscription_streams[i];
213
214             for(int j = i; j < subscription_streams_count - 1; j++) {
215                 subscription_streams[j] = subscription_streams[j + 1];
216             }
217
218             subscription_streams_count--;
219             subscription_streams = (subscription_stream_t **)realloc(subscription_streams, sizeof(subscription_stream_t *) * (subscription_streams_count));
220             if(subscription_streams_count && (subscription_streams == 0)) {
221                 log_error("realloc failed\n");
222                 return NTS_ERR_FAILED;
223             }
224             break;
225         }
226     }
227
228     if(found == 0) {
229         log_error("could not find subscription stream %s\n", id);
230         return NTS_ERR_FAILED;
231     }
232
233     log_add_verbose(1, "removed stream target %s\n", id);    //checkAL
234
235     free(found->id);
236     free(found);
237
238     return NTS_ERR_OK;
239 }
240
241 static subscription_stream_t *subscription_streams_get(const char *id) {
242     assert(id);
243
244     subscription_stream_t *found = 0;
245     for(int i = 0; i < subscription_streams_count; i++) {
246         if(strcmp(id, subscription_streams[i]->id) == 0) {
247             found = subscription_streams[i];
248             break;
249         }
250     }
251
252     if(found == 0) {
253         log_error("could not find subscription stream %s\n", id);
254     }
255     return found;
256 }
257
258 static int subscription_streams_change_cb(sr_session_ctx_t *session, const char *module_name, const char *xpath, sr_event_t event, uint32_t request_id, void *private_data) {
259     sr_change_iter_t *it = 0;
260     int rc = SR_ERR_OK;
261     sr_change_oper_t oper;
262     sr_val_t *old_value = 0;
263     sr_val_t *new_value = 0;
264
265     if(event == SR_EV_CHANGE) {
266         rc = sr_get_changes_iter(session, NTS_NF_ORAN_DU_SUBSCRIPTION_STREAMS_SCHEMA_XPATH"/id", &it);
267         if(rc != SR_ERR_OK) {
268             log_error("sr_get_changes_iter failed\n");
269             return SR_ERR_VALIDATION_FAILED;
270         }
271
272         while((rc = sr_get_change_next(session, it, &oper, &old_value, &new_value)) == SR_ERR_OK) {
273             if(oper == SR_OP_CREATED) {
274                 if(subscription_streams_add(new_value->data.string_val) != NTS_ERR_OK) {
275                     log_error("could not create subscription stream\n");
276                     return SR_ERR_OPERATION_FAILED;
277                 }
278             }
279             else if(oper == SR_OP_DELETED) {
280                 if(subscription_streams_free(old_value->data.string_val) != NTS_ERR_OK) {
281                     log_error("could not delete subscription stream\n");
282                     return SR_ERR_OPERATION_FAILED;
283                 }
284             }
285
286             sr_free_val(old_value);
287             sr_free_val(new_value);
288         }
289
290         sr_free_change_iter(it);
291     }
292
293     return SR_ERR_OK;
294 }
295
296
297
298 static int pm_jobs_change_cb(sr_session_ctx_t *session, const char *module_name, const char *xpath, sr_event_t event, uint32_t request_id, void *private_data) {
299     sr_change_iter_t *it = 0;
300     int rc = SR_ERR_OK;
301     sr_change_oper_t oper;
302     sr_val_t *old_value = 0;
303     sr_val_t *new_value = 0;
304
305     if(event == SR_EV_CHANGE) {
306         rc = sr_get_changes_iter(session, NTS_NF_ORAN_DU_PM_JOBS_SCHEMA_XPATH"/id", &it);
307         if(rc != SR_ERR_OK) {
308             log_error("sr_get_changes_iter failed\n");
309             return SR_ERR_VALIDATION_FAILED;
310         }
311
312         while((rc = sr_get_change_next(session, it, &oper, &old_value, &new_value)) == SR_ERR_OK) {
313             if(oper == SR_OP_CREATED) {
314                 pm_job_t *job = (pm_job_t *)malloc(sizeof(pm_job_t));
315                 if(job == 0) {
316                     log_error("bad realloc\n");
317                     return SR_ERR_OPERATION_FAILED;
318                 }
319
320                 pm_jobs_count++;
321                 pm_jobs = (pm_job_t **)realloc(pm_jobs, sizeof(pm_job_t *) * pm_jobs_count);
322                 if(pm_jobs == 0) {
323                     log_error("bad realloc\n");
324                     return SR_ERR_OPERATION_FAILED;
325                 }
326
327                 pm_jobs[pm_jobs_count - 1] = job;
328
329                 job->id = strdup(new_value->data.string_val);
330                 job->wait = 1; //thread will wait on this flag until 0 in the begining to make sure it has data
331                 job->terminate = 0;
332
333                 if(pthread_create(&job->thread, 0, pm_job_thread_routine, job)) {
334                     log_error("could not create thread for pm job\n");
335                     return SR_ERR_OPERATION_FAILED;
336                 }
337             }
338             else if(oper == SR_OP_DELETED) {
339                 int job_id = -1; 
340                 for(int i = 0; i < pm_jobs_count; i++) {
341                     if(strcmp(pm_jobs[i]->id, old_value->data.string_val) == 0) {
342                         job_id = i;
343                         break;
344                     }
345                 }
346                 if(job_id == -1) {
347                     log_error("could not find corresponding job\n");
348                     return SR_ERR_OPERATION_FAILED;
349                 }
350
351                 pm_job_t *job = pm_jobs[job_id];
352                 job->terminate = 1;
353                 
354                 //remove from list
355                 for(int i = job_id; i < pm_jobs_count - 1; i++) {
356                     pm_jobs[i] = pm_jobs[i + 1];
357                 }
358
359                 pm_jobs_count--;
360                 pm_jobs = (pm_job_t **)realloc(pm_jobs, sizeof(pm_job_t *) * pm_jobs_count);
361                 if(pm_jobs_count && (pm_jobs == 0)) {
362                     log_error("bad realloc\n");
363                     return SR_ERR_OPERATION_FAILED;
364                 }
365             }
366
367             sr_free_val(old_value);
368             sr_free_val(new_value);
369         }
370
371         sr_free_change_iter(it);
372     }
373     else if(event == SR_EV_DONE) {
374         for(int i = 0; i < pm_jobs_count; i++) {
375             pm_jobs[i]->wait = 0;
376         }
377     }
378
379     return SR_ERR_OK;
380 }
381
382 static void *pm_job_thread_routine(void *arg) {
383     pm_job_t *job = (pm_job_t *)arg;
384
385     job->administrative_state = strdup("");
386     job->user_label = strdup("");
387     job->job_tag = strdup("");
388     job->granularity_period = 1;
389     job->performance_metrics = 0;
390     job->performance_metrics_count = 0;
391     job->stream_target = strdup("");
392
393     log_add_verbose(1, "pm_job_thread_routine started for job id %s\n", job->id);
394     while(job->wait) {
395         sleep(1);
396     }
397     log_add_verbose(1, "pm_job_thread_routine[%s] finished waiting...\n", job->id);
398     
399     char *xpath_to_get = 0;
400     asprintf(&xpath_to_get, "%s[id='%s']", NTS_NF_ORAN_DU_PM_JOBS_SCHEMA_XPATH, job->id);
401     if(xpath_to_get == 0) {
402         log_error("pm_job_thread_routine[%s] asprintf failed\n", job->id);
403         return (void*)NTS_ERR_FAILED;
404     }
405
406     int rc;
407     sr_session_ctx_t *current_session;
408     rc = sr_session_start(session_connection, SR_DS_RUNNING, &current_session);
409     if(rc != SR_ERR_OK) {
410         log_error("pm_job_thread_routine[%s] could not start sysrepo session\n", job->id);
411         return (void*)NTS_ERR_FAILED;
412     }
413
414     struct lyd_node *data = 0;
415     rc = sr_get_subtree(current_session, xpath_to_get, 0, &data);
416     free(xpath_to_get);
417     if(rc != SR_ERR_OK) {
418         log_error("pm_job_thread_routine[%s] could not get value for xPath=%s from the running datastore\n", job->id, xpath_to_get);
419         sr_session_stop(current_session);
420         return (void*)NTS_ERR_FAILED;
421     }
422
423     struct lyd_node *chd = 0;
424     LY_TREE_FOR(data->child, chd) {
425         if(strcmp(chd->schema->name, "administrative-state") == 0) {
426             const char *val = ((const struct lyd_node_leaf_list *)chd)->value_str;
427             free(job->administrative_state);
428             job->administrative_state = strdup(val);
429             if(job->administrative_state == 0) {
430                 log_error("pm_job_thread_routine[%s] strdup failed\n", job->id);
431                 sr_session_stop(current_session);
432                 return (void*)NTS_ERR_FAILED;
433             }
434         }
435
436         if(strcmp(chd->schema->name, "user-label") == 0) {
437             const char *val = ((const struct lyd_node_leaf_list *)chd)->value_str;
438             free(job->user_label);
439             job->user_label = strdup(val);
440             if(job->user_label == 0) {
441                 log_error("pm_job_thread_routine[%s] strdup failed\n", job->id);
442                 sr_session_stop(current_session);
443                 return (void*)NTS_ERR_FAILED;
444             }
445         }
446
447         if(strcmp(chd->schema->name, "job-tag") == 0) {
448             const char *val = ((const struct lyd_node_leaf_list *)chd)->value_str;
449             free(job->job_tag);
450             job->job_tag = strdup(val);
451             if(job->job_tag == 0) {
452                 log_error("pm_job_thread_routine[%s] strdup failed\n", job->id);
453                 sr_session_stop(current_session);
454                 return (void*)NTS_ERR_FAILED;
455             }
456         }
457
458         if(strcmp(chd->schema->name, "granularity-period") == 0) {
459             job->granularity_period = ((const struct lyd_node_leaf_list *)chd)->value.uint32;
460         }
461
462         if(strcmp(chd->schema->name, "performance-metrics") == 0) {
463             const char *val = ((const struct lyd_node_leaf_list *)chd)->value_str;
464             
465             job->performance_metrics_count++;
466             job->performance_metrics = (char **)realloc(job->performance_metrics, sizeof(char **) * job->performance_metrics_count);
467             if(job->performance_metrics == 0) {
468                 log_error("pm_job_thread_routine[%s] realloc failed\n", job->id);
469                 sr_session_stop(current_session);
470                 return (void*)NTS_ERR_FAILED;
471             }
472             job->performance_metrics[job->performance_metrics_count - 1] = strdup(val);
473             if(job->performance_metrics[job->performance_metrics_count - 1] == 0) {
474                 log_error("pm_job_thread_routine[%s] strdup failed\n", job->id);
475                 sr_session_stop(current_session);
476                 return (void*)NTS_ERR_FAILED;
477             }
478         }
479
480         if(strcmp(chd->schema->name, "stream-target") == 0) {
481             const char *val = ((const struct lyd_node_leaf_list *)chd)->value_str;
482             free(job->stream_target);
483             job->stream_target = strdup(val);
484             if(job->stream_target == 0) {
485                 log_error("pm_job_thread_routine[%s] strdup failed\n", job->id);
486                 sr_session_stop(current_session);
487                 return (void*)NTS_ERR_FAILED;
488             }
489
490             job->stream = subscription_streams_get(job->stream_target);
491             if(job->stream == 0) {
492                 log_error("subscription_streams_get error")
493                 return 0;
494             }
495         }
496     }
497
498     if(job->granularity_period == 0) {
499         job->granularity_period = 1;
500     }
501
502     //get ves details
503     char *ves_details_xpath;
504     asprintf(&ves_details_xpath, "%s[id='%s']", NTS_NF_ORAN_DU_SUBSCRIPTION_STREAMS_SCHEMA_XPATH, job->stream_target);
505     if(ves_details_xpath == 0) {
506         log_error("pm_job_thread_routine[%s] asprintf failed\n", job->id);
507         sr_session_stop(current_session);
508         return (void*)NTS_ERR_FAILED;
509     }
510
511     ves_details_t *ves_details = ves_endpoint_details_get(current_session, ves_details_xpath);
512     free(ves_details_xpath);
513     if(ves_details == 0) {
514         log_error("pm_job_thread_routine[%s] ves_endpoint_details_get failed\n", job->id);
515         sr_session_stop(current_session);
516         return (void*)NTS_ERR_FAILED;
517     }
518
519     sr_session_stop(current_session);
520
521     nf_du_template_details_t details;
522     details.job = job;
523     details.field_count = 8;
524     details.field_name = (char **)malloc(sizeof(char *) * details.field_count);
525     details.field_value = (char **)malloc(sizeof(char *) * details.field_count);
526
527     details.field_name[0] = strdup("%%administrative-state%%");
528     details.field_value[0] = strdup(job->administrative_state);
529     details.field_name[1] = strdup("%%user-label%%");
530     details.field_value[1] = strdup(job->user_label);
531     details.field_name[2] = strdup("%%job-tag%%");
532     details.field_value[2] = strdup(job->job_tag);
533     details.field_name[3] = strdup("%%granularity-period%%");
534     asprintf(&details.field_value[3], "%d", job->granularity_period);
535     details.field_name[4] = strdup("%%job-id%%");
536     details.field_value[4] = strdup(job->id);
537     
538     long int now = get_microseconds_since_epoch() / 1000000;
539     long int start_time = now - (now % job->granularity_period);
540     long int end_time = start_time + job->granularity_period;
541     int granularity_period = end_time - now;
542
543     details.field_name[5] = strdup("%%starttime%%");
544     asprintf(&details.field_value[5], "%lu", now);  //first intervall is probably less than granularity-period
545     details.field_name[6] = strdup("%%endtime%%");
546     asprintf(&details.field_value[6], "%lu", end_time);
547
548     details.field_name[7] = strdup("%%starttime-literal%%");
549     
550     struct tm tm = *localtime(&now);
551     asprintf(&details.field_value[7], "%04d-%02d-%02dT%02d:%02d:%02d.%01dZ",
552                 tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday,
553                 tm.tm_hour, tm.tm_min, tm.tm_sec, 0);
554
555     while(job->terminate == 0) {
556         sleep(granularity_period);
557
558         char *content = nf_du_template_process_vars(ves_template, &details);
559         if(content == 0) {
560             log_error("nf_du_template_process_vars failed\n");
561             return (void*)NTS_ERR_FAILED;
562         }
563
564         rc = http_request(ves_details->url, ves_details->username, ves_details->password, "POST", content, 0, 0);
565         if(rc != NTS_ERR_OK) {
566             log_error("pm_job_thread_routine[%s] http_request failed\n", job->id);
567         }
568
569         free(content);
570
571
572         start_time = end_time;
573         end_time = start_time + job->granularity_period;
574         granularity_period = job->granularity_period;
575
576         free(details.field_value[5]);
577         free(details.field_value[6]);
578         free(details.field_value[7]);
579         asprintf(&details.field_value[5], "%lu", start_time);
580         asprintf(&details.field_value[6], "%lu", end_time);
581         struct tm tm = *localtime(&start_time);
582         asprintf(&details.field_value[7], "%04d-%02d-%02dT%02d:%02d:%02d.%01dZ",
583             tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday,
584             tm.tm_hour, tm.tm_min, tm.tm_sec, 0);
585     }
586
587     log_add_verbose(1, "pm_job_thread_routine[%s] ending...\n", job->id);
588
589     nf_du_template_free(&details);
590     free(ves_details);
591     free(job->id);
592     free(job->administrative_state);
593     free(job->user_label);
594     free(job->job_tag);
595     for(int i = 0; i < job->performance_metrics_count; i++) {
596         free(job->performance_metrics[i]);
597     }
598     free(job->performance_metrics);
599     free(job->stream_target);
600     free(job);
601
602     return (void*)NTS_ERR_OK;
603 }
604
605 static void nf_du_template_free(nf_du_template_details_t *details) {
606     assert(details);
607     
608     for(int j = 0; j < details->field_count; j++) {
609         free(details->field_name[j]);
610         free(details->field_value[j]);
611     }
612
613     free(details->field_name);
614     free(details->field_value);
615 }
616
617 static char *nf_du_template_process_vars(const char *template, const nf_du_template_details_t *details) {
618     assert(template);
619     assert(details);
620
621     char *ret = strdup(template);
622     if(ret == 0) {
623         log_error("strdup error\n");
624         return 0;
625     }
626
627     //if template is blank, do not process anything, means nc notif disabled
628     if(ret[0] == 0) {
629         return ret;
630     }
631
632     char **vars = 0;
633     int vars_count = 0;
634     
635     char **funcs = 0;
636     int funcs_count = 0;
637
638     char *var = 0;
639     char *func = 0;
640
641     //do replacements until no replacement is done
642     int replaced = 1;
643     while(replaced) {
644         replaced = 0;
645
646         var = 0;
647         vars = 0;
648         vars_count = 0;
649         func = 0;
650         funcs = 0;
651         funcs_count = 0;
652
653         char *pos_start;
654
655         //look for vars
656         pos_start = strstr(ret, "%%");
657         while(pos_start) {
658             char *pos_end = strstr(pos_start + 2, "%%");
659             int var_size = pos_end - pos_start + 2;
660             var = (char *)malloc(sizeof(char) * (var_size + 1));
661             if(var == 0) {
662                 log_error("bad malloc\n");
663                 goto nf_du_template_process_vars_failed;
664             }
665
666             for(int i = 0; i < var_size; i++) {
667                 var[i] = pos_start[i];
668             }
669             var[var_size] = 0;
670
671             // found var
672             vars_count++;
673             vars = (char **)realloc(vars, sizeof(char *) * vars_count);
674             if(!vars) {
675                 vars_count = 0;
676                 log_error("bad malloc\n");
677                 goto nf_du_template_process_vars_failed;
678             }
679
680             vars[vars_count - 1] = strdup(var);
681             if(!vars[vars_count - 1]) {
682                 vars_count--;
683                 log_error("bad malloc\n");
684                 goto nf_du_template_process_vars_failed;
685             }
686             free(var);
687             var = 0;
688
689             pos_start = strstr(pos_end + 2, "%%");
690         }
691
692         //look for functions
693         pos_start = strstr(ret, "$$");
694         while(pos_start) {
695             char *pos_end = strstr(pos_start + 2, "$$");
696             int func_size = pos_end - pos_start + 2;
697             func = (char *)malloc(sizeof(char) * (func_size + 1));
698             if(func == 0) {
699                 log_error("bad malloc\n");
700                 goto nf_du_template_process_vars_failed;
701             }
702
703             for(int i = 0; i < func_size; i++) {
704                 func[i] = pos_start[i];
705             }
706             func[func_size] = 0;
707
708             // found func
709             funcs_count++;
710             funcs = (char **)realloc(funcs, sizeof(char *) * funcs_count);
711             if(!funcs) {
712                 funcs_count = 0;
713                 log_error("bad malloc\n");
714                 goto nf_du_template_process_vars_failed;
715             }
716
717             funcs[funcs_count - 1] = strdup(func);
718             if(!funcs[funcs_count - 1]) {
719                 funcs_count--;
720                 log_error("bad malloc\n");
721                 goto nf_du_template_process_vars_failed;
722             }
723             free(func);
724             func = 0;
725
726             pos_start = strstr(pos_end + 2, "$$");
727         }
728
729         //replace vars
730         for(int i = 0; i < vars_count; i++) {
731             char *var_value = 0;
732             for(int j = 0; j < details->field_count; j++) {
733                 if(strcmp(details->field_name[j], vars[i]) == 0) {
734                     var_value = strdup(details->field_value[j]);
735                 }
736             }
737
738             if(var_value == 0) {
739                 log_error("value %s not found\n", vars[i]);
740                 goto nf_du_template_process_vars_failed;
741             }
742
743             ret = str_replace(ret, vars[i], var_value);
744             if(ret == 0) {
745                 free(var_value);
746                 var_value = 0;
747                 goto nf_du_template_process_vars_failed;
748             }
749
750             free(var_value);
751             var_value = 0;
752             replaced++;
753         }
754
755         //replace functions
756         for(int i = 0; i < funcs_count; i++) {
757             char *func_value = nf_du_template_process_function(funcs[i], details);
758             if(func_value == 0) {
759                 log_error("function %s not found\n", vars[i]);
760                 goto nf_du_template_process_vars_failed;
761             }
762
763             ret = str_replace(ret, funcs[i], func_value);
764             if(ret == 0) {
765                 free(func_value);
766                 goto nf_du_template_process_vars_failed;
767             }
768
769             free(func_value);
770             func_value = 0;
771             replaced++;
772         }
773
774         for(int i = 0; i < vars_count; i++) {
775             free(vars[i]);
776         }
777         free(vars);
778         vars = 0;
779         vars_count = 0;
780
781         for(int i = 0; i < funcs_count; i++) {
782             free(funcs[i]);
783         }
784         free(funcs);
785         funcs = 0;
786         funcs_count = 0;
787     }
788
789
790     free(var);
791     free(func);
792     for(int i = 0; i < vars_count; i++) {
793         free(vars[i]);
794     }
795     free(vars);
796
797     for(int i = 0; i < funcs_count; i++) {
798         free(funcs[i]);
799     }
800     free(funcs);
801     return ret;
802
803 nf_du_template_process_vars_failed:
804     free(var);
805     free(func);
806
807     for(int i = 0; i < vars_count; i++) {
808         free(vars[i]);
809     }
810     free(vars);
811
812     for(int i = 0; i < funcs_count; i++) {
813         free(funcs[i]);
814     }
815     free(funcs);
816     return 0;
817 }
818
819 static char *nf_du_template_process_function(const char *function, const nf_du_template_details_t *details) {
820     assert(function);
821     assert(details);
822
823     const char *measurement_template = "{\"measurement-type-instance-reference\": \"%%instance_ref%%\",\"value\": $$uint16_rand$$,\"unit\": \"kbit/s\"}\n";
824
825     if(strcmp(function, "$$du_ves_measurements$$") == 0) {
826         char *ret = strdup("");
827
828         for(int i = 0; i < details->job->performance_metrics_count; i++) {
829             char *ci = str_replace(measurement_template, "%%instance_ref%%", details->job->performance_metrics[i]);
830             if(ci == 0) {
831                 log_error("str_replace failed\n");
832                 return 0;
833             }
834             
835             int nl = strlen(ci) + 2;   //\0 and perhaps comma
836             char *ret2 = (char *)malloc(sizeof(char) * (strlen(ret) + nl));
837             if(ret2 == 0) {
838                 log_error("malloc failed\n");
839                 return 0;
840             }
841             strcpy(ret2, ret);
842             free(ret);
843
844             strcat(ret2, ci);
845             free(ci);
846
847             if(i < (details->job->performance_metrics_count - 1)) {
848                 strcat(ret2, ",");
849             }
850
851             ret = ret2;
852         }
853
854         return ret;
855     }
856     else if(strcmp(function, "$$uint16_rand$$") == 0) {
857         char *ret = 0;
858         asprintf(&ret, "%d", rand_uint16());
859         return ret;
860     }
861     else if(strcmp(function, "$$uint32_counter$$") == 0) {
862         char *ret = 0;
863         
864         asprintf(&ret, "%d", details->job->stream->counter);
865         details->job->stream->counter++;
866         return ret;
867     }
868     else if(strcmp(function, "$$hostname$$") == 0) {
869         char *ret = 0;
870         asprintf(&ret, "%s", framework_environment.settings.hostname);
871         return ret;
872     }
873
874     return 0;
875 }