X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=ntsimulator%2Fntsim-ng%2Fcore%2Fapp%2Fnf_oran_du.c;fp=ntsimulator%2Fntsim-ng%2Fcore%2Fapp%2Fnf_oran_du.c;h=2bdd63c847abd921a9d8c5d8ae1e4d5e0d8ace6a;hb=caec2fcb18e829420672509fe5e356b48d0c3840;hp=0000000000000000000000000000000000000000;hpb=f379349d310d61b27d2c0bf7334d2268cc8f42e2;p=sim%2Fo1-interface.git diff --git a/ntsimulator/ntsim-ng/core/app/nf_oran_du.c b/ntsimulator/ntsim-ng/core/app/nf_oran_du.c new file mode 100644 index 0000000..2bdd63c --- /dev/null +++ b/ntsimulator/ntsim-ng/core/app/nf_oran_du.c @@ -0,0 +1,875 @@ +/************************************************************************* +* +* Copyright 2021 highstreet technologies GmbH and others +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +***************************************************************************/ + +#define _GNU_SOURCE + +#include "nf_oran_du.h" +#include "utils/log_utils.h" +#include "utils/sys_utils.h" +#include "utils/nts_utils.h" +#include "utils/rand_utils.h" +#include "utils/http_client.h" +#include +#include +#include + +#include +#include +#include + +#include "core/framework.h" +#include "core/context.h" +#include "core/session.h" +#include "core/xpath.h" + +typedef struct { + char *id; + uint32_t counter; +} subscription_stream_t; + +static subscription_stream_t **subscription_streams; +static int subscription_streams_count; + +static int subscription_streams_add(const char *id); +static int subscription_streams_free(const char *id); +static subscription_stream_t *subscription_streams_get(const char *id); +static int subscription_streams_change_cb(sr_session_ctx_t *session, const char *module_name, const char *xpath, sr_event_t event, uint32_t request_id, void *private_data); + +typedef struct { + pthread_t thread; + sig_atomic_t wait; + sig_atomic_t terminate; + + char *id; + char *administrative_state; + char *user_label; + char *job_tag; + uint32_t granularity_period; + char **performance_metrics; + int performance_metrics_count; + char *stream_target; + + subscription_stream_t *stream; +} pm_job_t; + +static pm_job_t **pm_jobs; +static int pm_jobs_count; + + +static int pm_jobs_change_cb(sr_session_ctx_t *session, const char *module_name, const char *xpath, sr_event_t event, uint32_t request_id, void *private_data); +static void *pm_job_thread_routine(void *arg); + +typedef struct { + //list of all fields, including ves mandatory + char **field_name; + char **field_value; + int field_count; + + pm_job_t *job; +} nf_du_template_details_t; + +static void nf_du_template_free(nf_du_template_details_t *details); +static char *nf_du_template_process_vars(const char *template, const nf_du_template_details_t *details); +static char *nf_du_template_process_function(const char *function, const nf_du_template_details_t *details); + +static char *ves_template = 0; + +int nf_oran_du_init(void) { + log_add_verbose(1, LOG_COLOR_BOLD_MAGENTA"NTS_FUNCTION_TYPE_O_RAN_O_DU"LOG_COLOR_RESET " mode initializing...\n"); + + pm_jobs = 0; + pm_jobs_count = 0; + + subscription_streams = 0; + subscription_streams_count = 0; + + ves_template = file_read_content("config/ves_template.json"); + if(ves_template == 0) { + log_error("could not read config/ves_template.json"); + return NTS_ERR_FAILED; + } + + //check whether everything is already populated, read and update (if previously ran) + sr_val_t *values = 0; + size_t value_count = 0; + int rc = sr_get_items(session_running, NTS_NF_ORAN_DU_PM_JOBS_SCHEMA_XPATH, 0, 0, &values, &value_count); + if(rc != SR_ERR_OK) { + log_error("get items failed\n"); + return NTS_ERR_FAILED; + } + + //delete everything + if(value_count) { + log_add_verbose(2, "pm jobs already found (%d). cleaning up for fresh start...\n", value_count); + + for(int i = 0; i < value_count; i++) { + rc = sr_delete_item(session_running, values[i].xpath, 0); + if(rc != SR_ERR_OK) { + log_error("sr_delete_item failed\n"); + return NTS_ERR_FAILED; + } + } + rc = sr_apply_changes(session_running, 0, 0); + if(rc != SR_ERR_OK) { + log_error("sr_apply_changes failed\n"); + return NTS_ERR_FAILED; + } + + sr_free_values(values, value_count); + } + + //check subscription-streams + rc = sr_get_items(session_running, NTS_NF_ORAN_DU_SUBSCRIPTION_STREAMS_SCHEMA_XPATH, 0, 0, &values, &value_count); + if(rc != SR_ERR_OK) { + log_error("get items failed\n"); + return NTS_ERR_FAILED; + } + + if(value_count) { + for(int i = 0; i < value_count; i++) { + char *id = strdup(strstr(values[i].xpath, "[id='") + 5); + *strstr(id, "'") = 0; + + rc = subscription_streams_add(id); + if(rc != NTS_ERR_OK) { + log_error("subscription_streams_add failed\n"); + return NTS_ERR_FAILED; + } + free(id); + } + + sr_free_values(values, value_count); + } + + log_add_verbose(1, "subscribing to changes on %s...\n", NTS_NF_ORAN_DU_SUBSCRIPTION_STREAMS_SCHEMA_XPATH); + rc = sr_module_change_subscribe(session_running, NTS_NF_ORAN_DU_MODULE, NTS_NF_ORAN_DU_SUBSCRIPTION_STREAMS_SCHEMA_XPATH, subscription_streams_change_cb, NULL, 0, SR_SUBSCR_CTX_REUSE, &session_subscription); + if(rc != SR_ERR_OK) { + log_error("could not subscribe to module changes: %s\n", sr_strerror(rc)); + return NTS_ERR_FAILED; + } + + log_add_verbose(1, "subscribing to changes on %s...\n", NTS_NF_ORAN_DU_PM_JOBS_SCHEMA_XPATH); + rc = sr_module_change_subscribe(session_running, NTS_NF_ORAN_DU_MODULE, NTS_NF_ORAN_DU_PM_JOBS_SCHEMA_XPATH, pm_jobs_change_cb, NULL, 0, SR_SUBSCR_CTX_REUSE, &session_subscription); + if(rc != SR_ERR_OK) { + log_error("could not subscribe to module changes: %s\n", sr_strerror(rc)); + return NTS_ERR_FAILED; + } + + return NTS_ERR_OK; +} + +void nf_oran_du_free(void) { + free(ves_template); +} + +static int subscription_streams_add(const char *id) { + assert(id); + + subscription_streams_count++; + subscription_streams = (subscription_stream_t **)realloc(subscription_streams, sizeof(subscription_stream_t *) * (subscription_streams_count)); + if(subscription_streams == 0) { + log_error("realloc failed\n"); + return NTS_ERR_FAILED; + } + + subscription_streams[subscription_streams_count - 1] = (subscription_stream_t *)malloc(sizeof(subscription_stream_t)); + if(subscription_streams[subscription_streams_count - 1] == 0) { + log_error("malloc failed\n"); + return NTS_ERR_FAILED; + } + subscription_streams[subscription_streams_count - 1]->id = strdup(id); + if(subscription_streams[subscription_streams_count - 1]->id == 0) { + log_error("strdup failed\n"); + return NTS_ERR_FAILED; + } + subscription_streams[subscription_streams_count - 1]->counter = 0; + + log_add_verbose(1, "added stream target %s\n", id); //checkAL + + return NTS_ERR_OK; +} + +static int subscription_streams_free(const char *id) { + assert(id); + + subscription_stream_t *found = 0; + for(int i = 0; i < subscription_streams_count; i++) { + if(strcmp(id, subscription_streams[i]->id) == 0) { + found = subscription_streams[i]; + + for(int j = i; j < subscription_streams_count - 1; j++) { + subscription_streams[j] = subscription_streams[j + 1]; + } + + subscription_streams_count--; + subscription_streams = (subscription_stream_t **)realloc(subscription_streams, sizeof(subscription_stream_t *) * (subscription_streams_count)); + if(subscription_streams_count && (subscription_streams == 0)) { + log_error("realloc failed\n"); + return NTS_ERR_FAILED; + } + break; + } + } + + if(found == 0) { + log_error("could not find subscription stream %s\n", id); + return NTS_ERR_FAILED; + } + + log_add_verbose(1, "removed stream target %s\n", id); //checkAL + + free(found->id); + free(found); + + return NTS_ERR_OK; +} + +static subscription_stream_t *subscription_streams_get(const char *id) { + assert(id); + + subscription_stream_t *found = 0; + for(int i = 0; i < subscription_streams_count; i++) { + if(strcmp(id, subscription_streams[i]->id) == 0) { + found = subscription_streams[i]; + break; + } + } + + if(found == 0) { + log_error("could not find subscription stream %s\n", id); + } + return found; +} + +static int subscription_streams_change_cb(sr_session_ctx_t *session, const char *module_name, const char *xpath, sr_event_t event, uint32_t request_id, void *private_data) { + sr_change_iter_t *it = 0; + int rc = SR_ERR_OK; + sr_change_oper_t oper; + sr_val_t *old_value = 0; + sr_val_t *new_value = 0; + + if(event == SR_EV_CHANGE) { + rc = sr_get_changes_iter(session, NTS_NF_ORAN_DU_SUBSCRIPTION_STREAMS_SCHEMA_XPATH"/id", &it); + if(rc != SR_ERR_OK) { + log_error("sr_get_changes_iter failed\n"); + return SR_ERR_VALIDATION_FAILED; + } + + while((rc = sr_get_change_next(session, it, &oper, &old_value, &new_value)) == SR_ERR_OK) { + if(oper == SR_OP_CREATED) { + if(subscription_streams_add(new_value->data.string_val) != NTS_ERR_OK) { + log_error("could not create subscription stream\n"); + return SR_ERR_OPERATION_FAILED; + } + } + else if(oper == SR_OP_DELETED) { + if(subscription_streams_free(old_value->data.string_val) != NTS_ERR_OK) { + log_error("could not delete subscription stream\n"); + return SR_ERR_OPERATION_FAILED; + } + } + + sr_free_val(old_value); + sr_free_val(new_value); + } + + sr_free_change_iter(it); + } + + return SR_ERR_OK; +} + + + +static int pm_jobs_change_cb(sr_session_ctx_t *session, const char *module_name, const char *xpath, sr_event_t event, uint32_t request_id, void *private_data) { + sr_change_iter_t *it = 0; + int rc = SR_ERR_OK; + sr_change_oper_t oper; + sr_val_t *old_value = 0; + sr_val_t *new_value = 0; + + if(event == SR_EV_CHANGE) { + rc = sr_get_changes_iter(session, NTS_NF_ORAN_DU_PM_JOBS_SCHEMA_XPATH"/id", &it); + if(rc != SR_ERR_OK) { + log_error("sr_get_changes_iter failed\n"); + return SR_ERR_VALIDATION_FAILED; + } + + while((rc = sr_get_change_next(session, it, &oper, &old_value, &new_value)) == SR_ERR_OK) { + if(oper == SR_OP_CREATED) { + pm_job_t *job = (pm_job_t *)malloc(sizeof(pm_job_t)); + if(job == 0) { + log_error("bad realloc\n"); + return SR_ERR_OPERATION_FAILED; + } + + pm_jobs_count++; + pm_jobs = (pm_job_t **)realloc(pm_jobs, sizeof(pm_job_t *) * pm_jobs_count); + if(pm_jobs == 0) { + log_error("bad realloc\n"); + return SR_ERR_OPERATION_FAILED; + } + + pm_jobs[pm_jobs_count - 1] = job; + + job->id = strdup(new_value->data.string_val); + job->wait = 1; //thread will wait on this flag until 0 in the begining to make sure it has data + job->terminate = 0; + + if(pthread_create(&job->thread, 0, pm_job_thread_routine, job)) { + log_error("could not create thread for pm job\n"); + return SR_ERR_OPERATION_FAILED; + } + } + else if(oper == SR_OP_DELETED) { + int job_id = -1; + for(int i = 0; i < pm_jobs_count; i++) { + if(strcmp(pm_jobs[i]->id, old_value->data.string_val) == 0) { + job_id = i; + break; + } + } + if(job_id == -1) { + log_error("could not find corresponding job\n"); + return SR_ERR_OPERATION_FAILED; + } + + pm_job_t *job = pm_jobs[job_id]; + job->terminate = 1; + + //remove from list + for(int i = job_id; i < pm_jobs_count - 1; i++) { + pm_jobs[i] = pm_jobs[i + 1]; + } + + pm_jobs_count--; + pm_jobs = (pm_job_t **)realloc(pm_jobs, sizeof(pm_job_t *) * pm_jobs_count); + if(pm_jobs_count && (pm_jobs == 0)) { + log_error("bad realloc\n"); + return SR_ERR_OPERATION_FAILED; + } + } + + sr_free_val(old_value); + sr_free_val(new_value); + } + + sr_free_change_iter(it); + } + else if(event == SR_EV_DONE) { + for(int i = 0; i < pm_jobs_count; i++) { + pm_jobs[i]->wait = 0; + } + } + + return SR_ERR_OK; +} + +static void *pm_job_thread_routine(void *arg) { + pm_job_t *job = (pm_job_t *)arg; + + job->administrative_state = strdup(""); + job->user_label = strdup(""); + job->job_tag = strdup(""); + job->granularity_period = 1; + job->performance_metrics = 0; + job->performance_metrics_count = 0; + job->stream_target = strdup(""); + + log_add_verbose(1, "pm_job_thread_routine started for job id %s\n", job->id); + while(job->wait) { + sleep(1); + } + log_add_verbose(1, "pm_job_thread_routine[%s] finished waiting...\n", job->id); + + char *xpath_to_get = 0; + asprintf(&xpath_to_get, "%s[id='%s']", NTS_NF_ORAN_DU_PM_JOBS_SCHEMA_XPATH, job->id); + if(xpath_to_get == 0) { + log_error("pm_job_thread_routine[%s] asprintf failed\n", job->id); + return (void*)NTS_ERR_FAILED; + } + + int rc; + sr_session_ctx_t *current_session; + rc = sr_session_start(session_connection, SR_DS_RUNNING, ¤t_session); + if(rc != SR_ERR_OK) { + log_error("pm_job_thread_routine[%s] could not start sysrepo session\n", job->id); + return (void*)NTS_ERR_FAILED; + } + + struct lyd_node *data = 0; + rc = sr_get_subtree(current_session, xpath_to_get, 0, &data); + free(xpath_to_get); + if(rc != SR_ERR_OK) { + log_error("pm_job_thread_routine[%s] could not get value for xPath=%s from the running datastore\n", job->id, xpath_to_get); + sr_session_stop(current_session); + return (void*)NTS_ERR_FAILED; + } + + struct lyd_node *chd = 0; + LY_TREE_FOR(data->child, chd) { + if(strcmp(chd->schema->name, "administrative-state") == 0) { + const char *val = ((const struct lyd_node_leaf_list *)chd)->value_str; + free(job->administrative_state); + job->administrative_state = strdup(val); + if(job->administrative_state == 0) { + log_error("pm_job_thread_routine[%s] strdup failed\n", job->id); + sr_session_stop(current_session); + return (void*)NTS_ERR_FAILED; + } + } + + if(strcmp(chd->schema->name, "user-label") == 0) { + const char *val = ((const struct lyd_node_leaf_list *)chd)->value_str; + free(job->user_label); + job->user_label = strdup(val); + if(job->user_label == 0) { + log_error("pm_job_thread_routine[%s] strdup failed\n", job->id); + sr_session_stop(current_session); + return (void*)NTS_ERR_FAILED; + } + } + + if(strcmp(chd->schema->name, "job-tag") == 0) { + const char *val = ((const struct lyd_node_leaf_list *)chd)->value_str; + free(job->job_tag); + job->job_tag = strdup(val); + if(job->job_tag == 0) { + log_error("pm_job_thread_routine[%s] strdup failed\n", job->id); + sr_session_stop(current_session); + return (void*)NTS_ERR_FAILED; + } + } + + if(strcmp(chd->schema->name, "granularity-period") == 0) { + job->granularity_period = ((const struct lyd_node_leaf_list *)chd)->value.uint32; + } + + if(strcmp(chd->schema->name, "performance-metrics") == 0) { + const char *val = ((const struct lyd_node_leaf_list *)chd)->value_str; + + job->performance_metrics_count++; + job->performance_metrics = (char **)realloc(job->performance_metrics, sizeof(char **) * job->performance_metrics_count); + if(job->performance_metrics == 0) { + log_error("pm_job_thread_routine[%s] realloc failed\n", job->id); + sr_session_stop(current_session); + return (void*)NTS_ERR_FAILED; + } + job->performance_metrics[job->performance_metrics_count - 1] = strdup(val); + if(job->performance_metrics[job->performance_metrics_count - 1] == 0) { + log_error("pm_job_thread_routine[%s] strdup failed\n", job->id); + sr_session_stop(current_session); + return (void*)NTS_ERR_FAILED; + } + } + + if(strcmp(chd->schema->name, "stream-target") == 0) { + const char *val = ((const struct lyd_node_leaf_list *)chd)->value_str; + free(job->stream_target); + job->stream_target = strdup(val); + if(job->stream_target == 0) { + log_error("pm_job_thread_routine[%s] strdup failed\n", job->id); + sr_session_stop(current_session); + return (void*)NTS_ERR_FAILED; + } + + job->stream = subscription_streams_get(job->stream_target); + if(job->stream == 0) { + log_error("subscription_streams_get error") + return 0; + } + } + } + + if(job->granularity_period == 0) { + job->granularity_period = 1; + } + + //get ves details + char *ves_details_xpath; + asprintf(&ves_details_xpath, "%s[id='%s']", NTS_NF_ORAN_DU_SUBSCRIPTION_STREAMS_SCHEMA_XPATH, job->stream_target); + if(ves_details_xpath == 0) { + log_error("pm_job_thread_routine[%s] asprintf failed\n", job->id); + sr_session_stop(current_session); + return (void*)NTS_ERR_FAILED; + } + + ves_details_t *ves_details = ves_endpoint_details_get(current_session, ves_details_xpath); + free(ves_details_xpath); + if(ves_details == 0) { + log_error("pm_job_thread_routine[%s] ves_endpoint_details_get failed\n", job->id); + sr_session_stop(current_session); + return (void*)NTS_ERR_FAILED; + } + + sr_session_stop(current_session); + + nf_du_template_details_t details; + details.job = job; + details.field_count = 8; + details.field_name = (char **)malloc(sizeof(char *) * details.field_count); + details.field_value = (char **)malloc(sizeof(char *) * details.field_count); + + details.field_name[0] = strdup("%%administrative-state%%"); + details.field_value[0] = strdup(job->administrative_state); + details.field_name[1] = strdup("%%user-label%%"); + details.field_value[1] = strdup(job->user_label); + details.field_name[2] = strdup("%%job-tag%%"); + details.field_value[2] = strdup(job->job_tag); + details.field_name[3] = strdup("%%granularity-period%%"); + asprintf(&details.field_value[3], "%d", job->granularity_period); + details.field_name[4] = strdup("%%job-id%%"); + details.field_value[4] = strdup(job->id); + + long int now = get_microseconds_since_epoch() / 1000000; + long int start_time = now - (now % job->granularity_period); + long int end_time = start_time + job->granularity_period; + int granularity_period = end_time - now; + + details.field_name[5] = strdup("%%starttime%%"); + asprintf(&details.field_value[5], "%lu", now); //first intervall is probably less than granularity-period + details.field_name[6] = strdup("%%endtime%%"); + asprintf(&details.field_value[6], "%lu", end_time); + + details.field_name[7] = strdup("%%starttime-literal%%"); + + struct tm tm = *localtime(&now); + asprintf(&details.field_value[7], "%04d-%02d-%02dT%02d:%02d:%02d.%01dZ", + tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, + tm.tm_hour, tm.tm_min, tm.tm_sec, 0); + + while(job->terminate == 0) { + sleep(granularity_period); + + char *content = nf_du_template_process_vars(ves_template, &details); + if(content == 0) { + log_error("nf_du_template_process_vars failed\n"); + return (void*)NTS_ERR_FAILED; + } + + rc = http_request(ves_details->url, ves_details->username, ves_details->password, "POST", content, 0, 0); + if(rc != NTS_ERR_OK) { + log_error("pm_job_thread_routine[%s] http_request failed\n", job->id); + } + + free(content); + + + start_time = end_time; + end_time = start_time + job->granularity_period; + granularity_period = job->granularity_period; + + free(details.field_value[5]); + free(details.field_value[6]); + free(details.field_value[7]); + asprintf(&details.field_value[5], "%lu", start_time); + asprintf(&details.field_value[6], "%lu", end_time); + struct tm tm = *localtime(&start_time); + asprintf(&details.field_value[7], "%04d-%02d-%02dT%02d:%02d:%02d.%01dZ", + tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, + tm.tm_hour, tm.tm_min, tm.tm_sec, 0); + } + + log_add_verbose(1, "pm_job_thread_routine[%s] ending...\n", job->id); + + nf_du_template_free(&details); + free(ves_details); + free(job->id); + free(job->administrative_state); + free(job->user_label); + free(job->job_tag); + for(int i = 0; i < job->performance_metrics_count; i++) { + free(job->performance_metrics[i]); + } + free(job->performance_metrics); + free(job->stream_target); + free(job); + + return (void*)NTS_ERR_OK; +} + +static void nf_du_template_free(nf_du_template_details_t *details) { + assert(details); + + for(int j = 0; j < details->field_count; j++) { + free(details->field_name[j]); + free(details->field_value[j]); + } + + free(details->field_name); + free(details->field_value); +} + +static char *nf_du_template_process_vars(const char *template, const nf_du_template_details_t *details) { + assert(template); + assert(details); + + char *ret = strdup(template); + if(ret == 0) { + log_error("strdup error\n"); + return 0; + } + + //if template is blank, do not process anything, means nc notif disabled + if(ret[0] == 0) { + return ret; + } + + char **vars = 0; + int vars_count = 0; + + char **funcs = 0; + int funcs_count = 0; + + char *var = 0; + char *func = 0; + + //do replacements until no replacement is done + int replaced = 1; + while(replaced) { + replaced = 0; + + var = 0; + vars = 0; + vars_count = 0; + func = 0; + funcs = 0; + funcs_count = 0; + + char *pos_start; + + //look for vars + pos_start = strstr(ret, "%%"); + while(pos_start) { + char *pos_end = strstr(pos_start + 2, "%%"); + int var_size = pos_end - pos_start + 2; + var = (char *)malloc(sizeof(char) * (var_size + 1)); + if(var == 0) { + log_error("bad malloc\n"); + goto nf_du_template_process_vars_failed; + } + + for(int i = 0; i < var_size; i++) { + var[i] = pos_start[i]; + } + var[var_size] = 0; + + // found var + vars_count++; + vars = (char **)realloc(vars, sizeof(char *) * vars_count); + if(!vars) { + vars_count = 0; + log_error("bad malloc\n"); + goto nf_du_template_process_vars_failed; + } + + vars[vars_count - 1] = strdup(var); + if(!vars[vars_count - 1]) { + vars_count--; + log_error("bad malloc\n"); + goto nf_du_template_process_vars_failed; + } + free(var); + var = 0; + + pos_start = strstr(pos_end + 2, "%%"); + } + + //look for functions + pos_start = strstr(ret, "$$"); + while(pos_start) { + char *pos_end = strstr(pos_start + 2, "$$"); + int func_size = pos_end - pos_start + 2; + func = (char *)malloc(sizeof(char) * (func_size + 1)); + if(func == 0) { + log_error("bad malloc\n"); + goto nf_du_template_process_vars_failed; + } + + for(int i = 0; i < func_size; i++) { + func[i] = pos_start[i]; + } + func[func_size] = 0; + + // found func + funcs_count++; + funcs = (char **)realloc(funcs, sizeof(char *) * funcs_count); + if(!funcs) { + funcs_count = 0; + log_error("bad malloc\n"); + goto nf_du_template_process_vars_failed; + } + + funcs[funcs_count - 1] = strdup(func); + if(!funcs[funcs_count - 1]) { + funcs_count--; + log_error("bad malloc\n"); + goto nf_du_template_process_vars_failed; + } + free(func); + func = 0; + + pos_start = strstr(pos_end + 2, "$$"); + } + + //replace vars + for(int i = 0; i < vars_count; i++) { + char *var_value = 0; + for(int j = 0; j < details->field_count; j++) { + if(strcmp(details->field_name[j], vars[i]) == 0) { + var_value = strdup(details->field_value[j]); + } + } + + if(var_value == 0) { + log_error("value %s not found\n", vars[i]); + goto nf_du_template_process_vars_failed; + } + + ret = str_replace(ret, vars[i], var_value); + if(ret == 0) { + free(var_value); + var_value = 0; + goto nf_du_template_process_vars_failed; + } + + free(var_value); + var_value = 0; + replaced++; + } + + //replace functions + for(int i = 0; i < funcs_count; i++) { + char *func_value = nf_du_template_process_function(funcs[i], details); + if(func_value == 0) { + log_error("function %s not found\n", vars[i]); + goto nf_du_template_process_vars_failed; + } + + ret = str_replace(ret, funcs[i], func_value); + if(ret == 0) { + free(func_value); + goto nf_du_template_process_vars_failed; + } + + free(func_value); + func_value = 0; + replaced++; + } + + for(int i = 0; i < vars_count; i++) { + free(vars[i]); + } + free(vars); + vars = 0; + vars_count = 0; + + for(int i = 0; i < funcs_count; i++) { + free(funcs[i]); + } + free(funcs); + funcs = 0; + funcs_count = 0; + } + + + free(var); + free(func); + for(int i = 0; i < vars_count; i++) { + free(vars[i]); + } + free(vars); + + for(int i = 0; i < funcs_count; i++) { + free(funcs[i]); + } + free(funcs); + return ret; + +nf_du_template_process_vars_failed: + free(var); + free(func); + + for(int i = 0; i < vars_count; i++) { + free(vars[i]); + } + free(vars); + + for(int i = 0; i < funcs_count; i++) { + free(funcs[i]); + } + free(funcs); + return 0; +} + +static char *nf_du_template_process_function(const char *function, const nf_du_template_details_t *details) { + assert(function); + assert(details); + + const char *measurement_template = "{\"measurement-type-instance-reference\": \"%%instance_ref%%\",\"value\": $$uint16_rand$$,\"unit\": \"kbit/s\"}\n"; + + if(strcmp(function, "$$du_ves_measurements$$") == 0) { + char *ret = strdup(""); + + for(int i = 0; i < details->job->performance_metrics_count; i++) { + char *ci = str_replace(measurement_template, "%%instance_ref%%", details->job->performance_metrics[i]); + if(ci == 0) { + log_error("str_replace failed\n"); + return 0; + } + + int nl = strlen(ci) + 2; //\0 and perhaps comma + char *ret2 = (char *)malloc(sizeof(char) * (strlen(ret) + nl)); + if(ret2 == 0) { + log_error("malloc failed\n"); + return 0; + } + strcpy(ret2, ret); + free(ret); + + strcat(ret2, ci); + free(ci); + + if(i < (details->job->performance_metrics_count - 1)) { + strcat(ret2, ","); + } + + ret = ret2; + } + + return ret; + } + else if(strcmp(function, "$$uint16_rand$$") == 0) { + char *ret = 0; + asprintf(&ret, "%d", rand_uint16()); + return ret; + } + else if(strcmp(function, "$$uint32_counter$$") == 0) { + char *ret = 0; + + asprintf(&ret, "%d", details->job->stream->counter); + details->job->stream->counter++; + return ret; + } + else if(strcmp(function, "$$hostname$$") == 0) { + char *ret = 0; + asprintf(&ret, "%s", framework_environment.settings.hostname); + return ret; + } + + return 0; +}