1 /*************************************************************************
3 * Copyright 2021 highstreet technologies GmbH and others
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 ***************************************************************************/
20 #include "nf_oran_du.h"
21 #include "utils/log_utils.h"
22 #include "utils/sys_utils.h"
23 #include "utils/nts_utils.h"
24 #include "utils/rand_utils.h"
25 #include "utils/http_client.h"
31 #include <sysrepo/values.h>
32 #include <libnetconf2/netconf.h>
34 #include "core/framework.h"
35 #include "core/context.h"
36 #include "core/session.h"
37 #include "core/xpath.h"
42 } subscription_stream_t;
44 static subscription_stream_t **subscription_streams;
45 static int subscription_streams_count;
47 static int subscription_streams_add(const char *id);
48 static int subscription_streams_free(const char *id);
49 static subscription_stream_t *subscription_streams_get(const char *id);
50 static int subscription_streams_change_cb(sr_session_ctx_t *session, const char *module_name, const char *xpath, sr_event_t event, uint32_t request_id, void *private_data);
55 sig_atomic_t terminate;
58 char *administrative_state;
61 uint32_t granularity_period;
62 char **performance_metrics;
63 int performance_metrics_count;
66 subscription_stream_t *stream;
69 static pm_job_t **pm_jobs;
70 static int pm_jobs_count;
73 static int pm_jobs_change_cb(sr_session_ctx_t *session, const char *module_name, const char *xpath, sr_event_t event, uint32_t request_id, void *private_data);
74 static void *pm_job_thread_routine(void *arg);
77 //list of all fields, including ves mandatory
83 } nf_du_template_details_t;
85 static void nf_du_template_free(nf_du_template_details_t *details);
86 static char *nf_du_template_process_vars(const char *template, const nf_du_template_details_t *details);
87 static char *nf_du_template_process_function(const char *function, const nf_du_template_details_t *details);
89 static char *ves_template = 0;
91 int nf_oran_du_init(void) {
92 log_add_verbose(1, LOG_COLOR_BOLD_MAGENTA"NTS_FUNCTION_TYPE_O_RAN_O_DU"LOG_COLOR_RESET " mode initializing...\n");
97 subscription_streams = 0;
98 subscription_streams_count = 0;
100 ves_template = file_read_content("config/ves_template.json");
101 if(ves_template == 0) {
102 log_error("could not read config/ves_template.json");
103 return NTS_ERR_FAILED;
106 //check whether everything is already populated, read and update (if previously ran)
107 sr_val_t *values = 0;
108 size_t value_count = 0;
109 int rc = sr_get_items(session_running, NTS_NF_ORAN_DU_PM_JOBS_SCHEMA_XPATH, 0, 0, &values, &value_count);
110 if(rc != SR_ERR_OK) {
111 log_error("get items failed\n");
112 return NTS_ERR_FAILED;
117 log_add_verbose(2, "pm jobs already found (%d). cleaning up for fresh start...\n", value_count);
119 for(int i = 0; i < value_count; i++) {
120 rc = sr_delete_item(session_running, values[i].xpath, 0);
121 if(rc != SR_ERR_OK) {
122 log_error("sr_delete_item failed\n");
123 return NTS_ERR_FAILED;
126 rc = sr_apply_changes(session_running, 0, 0);
127 if(rc != SR_ERR_OK) {
128 log_error("sr_apply_changes failed\n");
129 return NTS_ERR_FAILED;
132 sr_free_values(values, value_count);
135 //check subscription-streams
136 rc = sr_get_items(session_running, NTS_NF_ORAN_DU_SUBSCRIPTION_STREAMS_SCHEMA_XPATH, 0, 0, &values, &value_count);
137 if(rc != SR_ERR_OK) {
138 log_error("get items failed\n");
139 return NTS_ERR_FAILED;
143 for(int i = 0; i < value_count; i++) {
144 char *id = strdup(strstr(values[i].xpath, "[id='") + 5);
145 *strstr(id, "'") = 0;
147 rc = subscription_streams_add(id);
148 if(rc != NTS_ERR_OK) {
149 log_error("subscription_streams_add failed\n");
150 return NTS_ERR_FAILED;
155 sr_free_values(values, value_count);
158 log_add_verbose(1, "subscribing to changes on %s...\n", NTS_NF_ORAN_DU_SUBSCRIPTION_STREAMS_SCHEMA_XPATH);
159 rc = sr_module_change_subscribe(session_running, NTS_NF_ORAN_DU_MODULE, NTS_NF_ORAN_DU_SUBSCRIPTION_STREAMS_SCHEMA_XPATH, subscription_streams_change_cb, NULL, 0, SR_SUBSCR_CTX_REUSE, &session_subscription);
160 if(rc != SR_ERR_OK) {
161 log_error("could not subscribe to module changes: %s\n", sr_strerror(rc));
162 return NTS_ERR_FAILED;
165 log_add_verbose(1, "subscribing to changes on %s...\n", NTS_NF_ORAN_DU_PM_JOBS_SCHEMA_XPATH);
166 rc = sr_module_change_subscribe(session_running, NTS_NF_ORAN_DU_MODULE, NTS_NF_ORAN_DU_PM_JOBS_SCHEMA_XPATH, pm_jobs_change_cb, NULL, 0, SR_SUBSCR_CTX_REUSE, &session_subscription);
167 if(rc != SR_ERR_OK) {
168 log_error("could not subscribe to module changes: %s\n", sr_strerror(rc));
169 return NTS_ERR_FAILED;
175 void nf_oran_du_free(void) {
179 static int subscription_streams_add(const char *id) {
182 subscription_streams_count++;
183 subscription_streams = (subscription_stream_t **)realloc(subscription_streams, sizeof(subscription_stream_t *) * (subscription_streams_count));
184 if(subscription_streams == 0) {
185 log_error("realloc failed\n");
186 return NTS_ERR_FAILED;
189 subscription_streams[subscription_streams_count - 1] = (subscription_stream_t *)malloc(sizeof(subscription_stream_t));
190 if(subscription_streams[subscription_streams_count - 1] == 0) {
191 log_error("malloc failed\n");
192 return NTS_ERR_FAILED;
194 subscription_streams[subscription_streams_count - 1]->id = strdup(id);
195 if(subscription_streams[subscription_streams_count - 1]->id == 0) {
196 log_error("strdup failed\n");
197 return NTS_ERR_FAILED;
199 subscription_streams[subscription_streams_count - 1]->counter = 0;
201 log_add_verbose(1, "added stream target %s\n", id); //checkAL
206 static int subscription_streams_free(const char *id) {
209 subscription_stream_t *found = 0;
210 for(int i = 0; i < subscription_streams_count; i++) {
211 if(strcmp(id, subscription_streams[i]->id) == 0) {
212 found = subscription_streams[i];
214 for(int j = i; j < subscription_streams_count - 1; j++) {
215 subscription_streams[j] = subscription_streams[j + 1];
218 subscription_streams_count--;
219 subscription_streams = (subscription_stream_t **)realloc(subscription_streams, sizeof(subscription_stream_t *) * (subscription_streams_count));
220 if(subscription_streams_count && (subscription_streams == 0)) {
221 log_error("realloc failed\n");
222 return NTS_ERR_FAILED;
229 log_error("could not find subscription stream %s\n", id);
230 return NTS_ERR_FAILED;
233 log_add_verbose(1, "removed stream target %s\n", id); //checkAL
241 static subscription_stream_t *subscription_streams_get(const char *id) {
244 subscription_stream_t *found = 0;
245 for(int i = 0; i < subscription_streams_count; i++) {
246 if(strcmp(id, subscription_streams[i]->id) == 0) {
247 found = subscription_streams[i];
253 log_error("could not find subscription stream %s\n", id);
258 static int subscription_streams_change_cb(sr_session_ctx_t *session, const char *module_name, const char *xpath, sr_event_t event, uint32_t request_id, void *private_data) {
259 sr_change_iter_t *it = 0;
261 sr_change_oper_t oper;
262 sr_val_t *old_value = 0;
263 sr_val_t *new_value = 0;
265 if(event == SR_EV_CHANGE) {
266 rc = sr_get_changes_iter(session, NTS_NF_ORAN_DU_SUBSCRIPTION_STREAMS_SCHEMA_XPATH"/id", &it);
267 if(rc != SR_ERR_OK) {
268 log_error("sr_get_changes_iter failed\n");
269 return SR_ERR_VALIDATION_FAILED;
272 while((rc = sr_get_change_next(session, it, &oper, &old_value, &new_value)) == SR_ERR_OK) {
273 if(oper == SR_OP_CREATED) {
274 if(subscription_streams_add(new_value->data.string_val) != NTS_ERR_OK) {
275 log_error("could not create subscription stream\n");
276 return SR_ERR_OPERATION_FAILED;
279 else if(oper == SR_OP_DELETED) {
280 if(subscription_streams_free(old_value->data.string_val) != NTS_ERR_OK) {
281 log_error("could not delete subscription stream\n");
282 return SR_ERR_OPERATION_FAILED;
286 sr_free_val(old_value);
287 sr_free_val(new_value);
290 sr_free_change_iter(it);
298 static int pm_jobs_change_cb(sr_session_ctx_t *session, const char *module_name, const char *xpath, sr_event_t event, uint32_t request_id, void *private_data) {
299 sr_change_iter_t *it = 0;
301 sr_change_oper_t oper;
302 sr_val_t *old_value = 0;
303 sr_val_t *new_value = 0;
305 if(event == SR_EV_CHANGE) {
306 rc = sr_get_changes_iter(session, NTS_NF_ORAN_DU_PM_JOBS_SCHEMA_XPATH"/id", &it);
307 if(rc != SR_ERR_OK) {
308 log_error("sr_get_changes_iter failed\n");
309 return SR_ERR_VALIDATION_FAILED;
312 while((rc = sr_get_change_next(session, it, &oper, &old_value, &new_value)) == SR_ERR_OK) {
313 if(oper == SR_OP_CREATED) {
314 pm_job_t *job = (pm_job_t *)malloc(sizeof(pm_job_t));
316 log_error("bad realloc\n");
317 return SR_ERR_OPERATION_FAILED;
321 pm_jobs = (pm_job_t **)realloc(pm_jobs, sizeof(pm_job_t *) * pm_jobs_count);
323 log_error("bad realloc\n");
324 return SR_ERR_OPERATION_FAILED;
327 pm_jobs[pm_jobs_count - 1] = job;
329 job->id = strdup(new_value->data.string_val);
330 job->wait = 1; //thread will wait on this flag until 0 in the begining to make sure it has data
333 if(pthread_create(&job->thread, 0, pm_job_thread_routine, job)) {
334 log_error("could not create thread for pm job\n");
335 return SR_ERR_OPERATION_FAILED;
338 else if(oper == SR_OP_DELETED) {
340 for(int i = 0; i < pm_jobs_count; i++) {
341 if(strcmp(pm_jobs[i]->id, old_value->data.string_val) == 0) {
347 log_error("could not find corresponding job\n");
348 return SR_ERR_OPERATION_FAILED;
351 pm_job_t *job = pm_jobs[job_id];
355 for(int i = job_id; i < pm_jobs_count - 1; i++) {
356 pm_jobs[i] = pm_jobs[i + 1];
360 pm_jobs = (pm_job_t **)realloc(pm_jobs, sizeof(pm_job_t *) * pm_jobs_count);
361 if(pm_jobs_count && (pm_jobs == 0)) {
362 log_error("bad realloc\n");
363 return SR_ERR_OPERATION_FAILED;
367 sr_free_val(old_value);
368 sr_free_val(new_value);
371 sr_free_change_iter(it);
373 else if(event == SR_EV_DONE) {
374 for(int i = 0; i < pm_jobs_count; i++) {
375 pm_jobs[i]->wait = 0;
382 static void *pm_job_thread_routine(void *arg) {
383 pm_job_t *job = (pm_job_t *)arg;
385 job->administrative_state = strdup("");
386 job->user_label = strdup("");
387 job->job_tag = strdup("");
388 job->granularity_period = 1;
389 job->performance_metrics = 0;
390 job->performance_metrics_count = 0;
391 job->stream_target = strdup("");
393 log_add_verbose(1, "pm_job_thread_routine started for job id %s\n", job->id);
397 log_add_verbose(1, "pm_job_thread_routine[%s] finished waiting...\n", job->id);
399 char *xpath_to_get = 0;
400 asprintf(&xpath_to_get, "%s[id='%s']", NTS_NF_ORAN_DU_PM_JOBS_SCHEMA_XPATH, job->id);
401 if(xpath_to_get == 0) {
402 log_error("pm_job_thread_routine[%s] asprintf failed\n", job->id);
403 return (void*)NTS_ERR_FAILED;
407 sr_session_ctx_t *current_session;
408 rc = sr_session_start(session_connection, SR_DS_RUNNING, ¤t_session);
409 if(rc != SR_ERR_OK) {
410 log_error("pm_job_thread_routine[%s] could not start sysrepo session\n", job->id);
411 return (void*)NTS_ERR_FAILED;
414 struct lyd_node *data = 0;
415 rc = sr_get_subtree(current_session, xpath_to_get, 0, &data);
417 if(rc != SR_ERR_OK) {
418 log_error("pm_job_thread_routine[%s] could not get value for xPath=%s from the running datastore\n", job->id, xpath_to_get);
419 sr_session_stop(current_session);
420 return (void*)NTS_ERR_FAILED;
423 struct lyd_node *chd = 0;
424 LY_TREE_FOR(data->child, chd) {
425 if(strcmp(chd->schema->name, "administrative-state") == 0) {
426 const char *val = ((const struct lyd_node_leaf_list *)chd)->value_str;
427 free(job->administrative_state);
428 job->administrative_state = strdup(val);
429 if(job->administrative_state == 0) {
430 log_error("pm_job_thread_routine[%s] strdup failed\n", job->id);
431 sr_session_stop(current_session);
432 return (void*)NTS_ERR_FAILED;
436 if(strcmp(chd->schema->name, "user-label") == 0) {
437 const char *val = ((const struct lyd_node_leaf_list *)chd)->value_str;
438 free(job->user_label);
439 job->user_label = strdup(val);
440 if(job->user_label == 0) {
441 log_error("pm_job_thread_routine[%s] strdup failed\n", job->id);
442 sr_session_stop(current_session);
443 return (void*)NTS_ERR_FAILED;
447 if(strcmp(chd->schema->name, "job-tag") == 0) {
448 const char *val = ((const struct lyd_node_leaf_list *)chd)->value_str;
450 job->job_tag = strdup(val);
451 if(job->job_tag == 0) {
452 log_error("pm_job_thread_routine[%s] strdup failed\n", job->id);
453 sr_session_stop(current_session);
454 return (void*)NTS_ERR_FAILED;
458 if(strcmp(chd->schema->name, "granularity-period") == 0) {
459 job->granularity_period = ((const struct lyd_node_leaf_list *)chd)->value.uint32;
462 if(strcmp(chd->schema->name, "performance-metrics") == 0) {
463 const char *val = ((const struct lyd_node_leaf_list *)chd)->value_str;
465 job->performance_metrics_count++;
466 job->performance_metrics = (char **)realloc(job->performance_metrics, sizeof(char **) * job->performance_metrics_count);
467 if(job->performance_metrics == 0) {
468 log_error("pm_job_thread_routine[%s] realloc failed\n", job->id);
469 sr_session_stop(current_session);
470 return (void*)NTS_ERR_FAILED;
472 job->performance_metrics[job->performance_metrics_count - 1] = strdup(val);
473 if(job->performance_metrics[job->performance_metrics_count - 1] == 0) {
474 log_error("pm_job_thread_routine[%s] strdup failed\n", job->id);
475 sr_session_stop(current_session);
476 return (void*)NTS_ERR_FAILED;
480 if(strcmp(chd->schema->name, "stream-target") == 0) {
481 const char *val = ((const struct lyd_node_leaf_list *)chd)->value_str;
482 free(job->stream_target);
483 job->stream_target = strdup(val);
484 if(job->stream_target == 0) {
485 log_error("pm_job_thread_routine[%s] strdup failed\n", job->id);
486 sr_session_stop(current_session);
487 return (void*)NTS_ERR_FAILED;
490 job->stream = subscription_streams_get(job->stream_target);
491 if(job->stream == 0) {
492 log_error("subscription_streams_get error")
498 if(job->granularity_period == 0) {
499 job->granularity_period = 1;
503 char *ves_details_xpath;
504 asprintf(&ves_details_xpath, "%s[id='%s']", NTS_NF_ORAN_DU_SUBSCRIPTION_STREAMS_SCHEMA_XPATH, job->stream_target);
505 if(ves_details_xpath == 0) {
506 log_error("pm_job_thread_routine[%s] asprintf failed\n", job->id);
507 sr_session_stop(current_session);
508 return (void*)NTS_ERR_FAILED;
511 ves_details_t *ves_details = ves_endpoint_details_get(current_session, ves_details_xpath);
512 free(ves_details_xpath);
513 if(ves_details == 0) {
514 log_error("pm_job_thread_routine[%s] ves_endpoint_details_get failed\n", job->id);
515 sr_session_stop(current_session);
516 return (void*)NTS_ERR_FAILED;
519 sr_session_stop(current_session);
521 nf_du_template_details_t details;
523 details.field_count = 8;
524 details.field_name = (char **)malloc(sizeof(char *) * details.field_count);
525 details.field_value = (char **)malloc(sizeof(char *) * details.field_count);
527 details.field_name[0] = strdup("%%administrative-state%%");
528 details.field_value[0] = strdup(job->administrative_state);
529 details.field_name[1] = strdup("%%user-label%%");
530 details.field_value[1] = strdup(job->user_label);
531 details.field_name[2] = strdup("%%job-tag%%");
532 details.field_value[2] = strdup(job->job_tag);
533 details.field_name[3] = strdup("%%granularity-period%%");
534 asprintf(&details.field_value[3], "%d", job->granularity_period);
535 details.field_name[4] = strdup("%%job-id%%");
536 details.field_value[4] = strdup(job->id);
538 long int now = get_microseconds_since_epoch() / 1000000;
539 long int start_time = now - (now % job->granularity_period);
540 long int end_time = start_time + job->granularity_period;
541 int granularity_period = end_time - now;
543 details.field_name[5] = strdup("%%starttime%%");
544 asprintf(&details.field_value[5], "%lu", now); //first intervall is probably less than granularity-period
545 details.field_name[6] = strdup("%%endtime%%");
546 asprintf(&details.field_value[6], "%lu", end_time);
548 details.field_name[7] = strdup("%%starttime-literal%%");
550 struct tm tm = *localtime(&now);
551 asprintf(&details.field_value[7], "%04d-%02d-%02dT%02d:%02d:%02d.%01dZ",
552 tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday,
553 tm.tm_hour, tm.tm_min, tm.tm_sec, 0);
555 while(job->terminate == 0) {
556 sleep(granularity_period);
558 char *content = nf_du_template_process_vars(ves_template, &details);
560 log_error("nf_du_template_process_vars failed\n");
561 return (void*)NTS_ERR_FAILED;
564 rc = http_request(ves_details->url, ves_details->username, ves_details->password, "POST", content, 0, 0);
565 if(rc != NTS_ERR_OK) {
566 log_error("pm_job_thread_routine[%s] http_request failed\n", job->id);
572 start_time = end_time;
573 end_time = start_time + job->granularity_period;
574 granularity_period = job->granularity_period;
576 free(details.field_value[5]);
577 free(details.field_value[6]);
578 free(details.field_value[7]);
579 asprintf(&details.field_value[5], "%lu", start_time);
580 asprintf(&details.field_value[6], "%lu", end_time);
581 struct tm tm = *localtime(&start_time);
582 asprintf(&details.field_value[7], "%04d-%02d-%02dT%02d:%02d:%02d.%01dZ",
583 tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday,
584 tm.tm_hour, tm.tm_min, tm.tm_sec, 0);
587 log_add_verbose(1, "pm_job_thread_routine[%s] ending...\n", job->id);
589 nf_du_template_free(&details);
592 free(job->administrative_state);
593 free(job->user_label);
595 for(int i = 0; i < job->performance_metrics_count; i++) {
596 free(job->performance_metrics[i]);
598 free(job->performance_metrics);
599 free(job->stream_target);
602 return (void*)NTS_ERR_OK;
605 static void nf_du_template_free(nf_du_template_details_t *details) {
608 for(int j = 0; j < details->field_count; j++) {
609 free(details->field_name[j]);
610 free(details->field_value[j]);
613 free(details->field_name);
614 free(details->field_value);
617 static char *nf_du_template_process_vars(const char *template, const nf_du_template_details_t *details) {
621 char *ret = strdup(template);
623 log_error("strdup error\n");
627 //if template is blank, do not process anything, means nc notif disabled
641 //do replacements until no replacement is done
656 pos_start = strstr(ret, "%%");
658 char *pos_end = strstr(pos_start + 2, "%%");
659 int var_size = pos_end - pos_start + 2;
660 var = (char *)malloc(sizeof(char) * (var_size + 1));
662 log_error("bad malloc\n");
663 goto nf_du_template_process_vars_failed;
666 for(int i = 0; i < var_size; i++) {
667 var[i] = pos_start[i];
673 vars = (char **)realloc(vars, sizeof(char *) * vars_count);
676 log_error("bad malloc\n");
677 goto nf_du_template_process_vars_failed;
680 vars[vars_count - 1] = strdup(var);
681 if(!vars[vars_count - 1]) {
683 log_error("bad malloc\n");
684 goto nf_du_template_process_vars_failed;
689 pos_start = strstr(pos_end + 2, "%%");
693 pos_start = strstr(ret, "$$");
695 char *pos_end = strstr(pos_start + 2, "$$");
696 int func_size = pos_end - pos_start + 2;
697 func = (char *)malloc(sizeof(char) * (func_size + 1));
699 log_error("bad malloc\n");
700 goto nf_du_template_process_vars_failed;
703 for(int i = 0; i < func_size; i++) {
704 func[i] = pos_start[i];
710 funcs = (char **)realloc(funcs, sizeof(char *) * funcs_count);
713 log_error("bad malloc\n");
714 goto nf_du_template_process_vars_failed;
717 funcs[funcs_count - 1] = strdup(func);
718 if(!funcs[funcs_count - 1]) {
720 log_error("bad malloc\n");
721 goto nf_du_template_process_vars_failed;
726 pos_start = strstr(pos_end + 2, "$$");
730 for(int i = 0; i < vars_count; i++) {
732 for(int j = 0; j < details->field_count; j++) {
733 if(strcmp(details->field_name[j], vars[i]) == 0) {
734 var_value = strdup(details->field_value[j]);
739 log_error("value %s not found\n", vars[i]);
740 goto nf_du_template_process_vars_failed;
743 ret = str_replace(ret, vars[i], var_value);
747 goto nf_du_template_process_vars_failed;
756 for(int i = 0; i < funcs_count; i++) {
757 char *func_value = nf_du_template_process_function(funcs[i], details);
758 if(func_value == 0) {
759 log_error("function %s not found\n", vars[i]);
760 goto nf_du_template_process_vars_failed;
763 ret = str_replace(ret, funcs[i], func_value);
766 goto nf_du_template_process_vars_failed;
774 for(int i = 0; i < vars_count; i++) {
781 for(int i = 0; i < funcs_count; i++) {
792 for(int i = 0; i < vars_count; i++) {
797 for(int i = 0; i < funcs_count; i++) {
803 nf_du_template_process_vars_failed:
807 for(int i = 0; i < vars_count; i++) {
812 for(int i = 0; i < funcs_count; i++) {
819 static char *nf_du_template_process_function(const char *function, const nf_du_template_details_t *details) {
823 const char *measurement_template = "{\"measurement-type-instance-reference\": \"%%instance_ref%%\",\"value\": $$uint16_rand$$,\"unit\": \"kbit/s\"}\n";
825 if(strcmp(function, "$$du_ves_measurements$$") == 0) {
826 char *ret = strdup("");
828 for(int i = 0; i < details->job->performance_metrics_count; i++) {
829 char *ci = str_replace(measurement_template, "%%instance_ref%%", details->job->performance_metrics[i]);
831 log_error("str_replace failed\n");
835 int nl = strlen(ci) + 2; //\0 and perhaps comma
836 char *ret2 = (char *)malloc(sizeof(char) * (strlen(ret) + nl));
838 log_error("malloc failed\n");
847 if(i < (details->job->performance_metrics_count - 1)) {
856 else if(strcmp(function, "$$uint16_rand$$") == 0) {
858 asprintf(&ret, "%d", rand_uint16());
861 else if(strcmp(function, "$$uint32_counter$$") == 0) {
864 asprintf(&ret, "%d", details->job->stream->counter);
865 details->job->stream->counter++;
868 else if(strcmp(function, "$$hostname$$") == 0) {
870 asprintf(&ret, "%s", framework_environment.settings.hostname);