1 /*************************************************************************
3 * Copyright 2020 highstreet technologies GmbH and others
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
9 * http://www.apache.org/licenses/LICENSE-2.0
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 ***************************************************************************/
21 #include "utils/log_utils.h"
22 #include "utils/sys_utils.h"
28 #include "core/framework.h"
29 #include "core/session.h"
//head of the list of pending operations; nodes are chained through their ->next field
31 static manager_operation_t *manager_operations;
//locked by manager_operations_begin() and by the worker loop while the list is accessed
32 static pthread_mutex_t manager_operations_mutex;
//posted when a batch of operations is finished; the worker loop waits on it
33 static sem_t manager_operations_sem;
//one state entry per port value (0..65535), filled in by manager_operations_init()
35 manager_protocol_type_t manager_port[65536];
//forward declaration: applies a single queued operation (defined at the end of this file)
37 static int manager_operations_execute(manager_operation_t *oper);
39 int manager_operations_init(void) {
40 manager_operations = 0;
41 if(pthread_mutex_init(&manager_operations_mutex, NULL) != 0) {
42 log_error("mutex init has failed\n");
43 return NTS_ERR_FAILED;
46 if(sem_init(&manager_operations_sem, 0, 0) != 0) {
47 log_error("sem init has failed\n");
48 return NTS_ERR_FAILED;
51 //checkAL ar fi misto sa stim ce porturi sunt si ce porturi nu sunt available...
52 for(int i = 0; i < 1000; i++) {
53 manager_port[i] = MANAGER_PROTOCOL_UNAVAILABLE;
56 for(int i = 1000; i < 65536; i++) {
57 manager_port[i] = MANAGER_PROTOCOL_UNUSED;
//One pass of the operations worker: waits (with a deadline) on the operations semaphore and,
//when it is signalled, executes every queued operation, reconfigures any started instance
//that is not configured, and publishes the resulting status through
//manager_sr_on_last_operation_status().
63 void manager_operations_loop(void) {
//the deadline handed to sem_timedwait is built from the current CLOCK_REALTIME time
67 clock_gettime(CLOCK_REALTIME, &ts);
//a zero return means the semaphore was taken, i.e. a batch was finished by a producer
70 if(sem_timedwait(&manager_operations_sem, &ts) == 0) {
//lock the running datastore for NTS_MANAGER_MODULE before applying changes
73 rc = sr_lock(session_running, NTS_MANAGER_MODULE);
84 log_error("sr_lock failed\n");
85 //checkAL what do we do now?
//keep producers away from the list while it is drained
88 pthread_mutex_lock(&manager_operations_mutex);
//status reported at the end; overwritten when something fails
90 const char *status = "SUCCESS";
94 while(manager_operations) {
95 //pop operation from list
96 manager_operation_t *oper = manager_operations;
97 manager_operations = manager_operations->next;
99 //if operation is RPC first update any *other* fields
101 rc = manager_operations_execute(oper);
102 if(rc != NTS_ERR_OK) {
103 log_error("manager_operations_execute failed\n");
//keep the failure text before the operation (and its errmsg) is released
//NOTE(review): strcpy is unbounded and oper->errmsg is not checked for NULL here -
//confirm errmsg is large enough and that every failing path of execute sets oper->errmsg
105 strcpy(errmsg, oper->errmsg);
106 manager_operations_free_oper(oper);
//the operation has been handled; release it
110 manager_operations_free_oper(oper);
113 for(int i = 0; i < docker_context_count; i++) {
114 //do any reconfig necessary
115 for(int j = 0; j < manager_context[i].started_instances; j++) {
116 if(manager_context[i].instance[j].is_configured == false) {
117 rc = manager_actions_config_instance(&manager_context[i], &manager_context[i].instance[j]);
118 if(rc != NTS_ERR_OK) {
119 status = "reconfig FAILED";
//NOTE(review): sprintf is unbounded - confirm the container name always fits in errmsg
120 sprintf(errmsg, "reconfig FAILED - instance %s", manager_context[i].instance[j].container.name);
121 log_error("%s\n", errmsg);
//publish the outcome of this pass (status text plus error message)
127 rc = manager_sr_on_last_operation_status(status, errmsg);
128 if(rc != NTS_ERR_OK) {
129 log_error("manager_sr_on_last_operation_status failed\n");
//release in reverse order of acquisition: list mutex first, then the datastore lock
132 pthread_mutex_unlock(&manager_operations_mutex);
133 rc = sr_unlock(session_running, NTS_MANAGER_MODULE); //release datastore
134 if(rc != SR_ERR_OK) {
135 log_error("sr_unlock failed\n");
140 void manager_operations_free(void) {
141 //terminate all containers
142 for(int i = 0; i < docker_context_count; i++) {
143 while(manager_context[i].started_instances) {
144 manager_actions_stop(&manager_context[i]);
148 sem_destroy(&manager_operations_sem);
149 pthread_mutex_destroy(&manager_operations_mutex);
152 manager_operation_t *manager_operations_new_oper(manager_operation_type_t type) {
153 manager_operation_t *new_oper = malloc(sizeof(manager_operation_t));
155 log_error("malloc failed\n");
159 new_oper->type = type;
161 new_oper->ft_index = -1;
162 new_oper->function_type = 0;
164 new_oper->started_instances = -1;
165 new_oper->mounted_instances = -1;
167 new_oper->docker_instance_name = 0;
168 new_oper->docker_version_tag = 0;
169 new_oper->docker_repository = 0;
171 new_oper->mount_point_addressing_method = 0;
173 new_oper->fault_generation.delay_period = 0;
174 new_oper->fault_generation.delay_period_count = -1;
176 new_oper->netconf.faults_enabled = -1;
177 new_oper->netconf.call_home = -1;
179 new_oper->ves.faults_enabled = -1;
180 new_oper->ves.pnf_registration = -1;
181 new_oper->ves.heartbeat_period = -1;
183 new_oper->errmsg = 0;
189 int manager_operations_free_oper(manager_operation_t *oper) {
192 free(oper->function_type);
193 free(oper->docker_instance_name);
194 free(oper->docker_repository);
195 free(oper->docker_version_tag);
196 free(oper->mount_point_addressing_method);
203 int manager_operations_begin(void) {
204 return pthread_mutex_lock(&manager_operations_mutex);
207 int manager_operations_add(manager_operation_t *oper) {
210 if(manager_operations == 0) {
211 manager_operations = oper;
214 manager_operation_t *h = manager_operations;
224 void manager_operations_finish_and_execute(void) {
225 pthread_mutex_unlock(&manager_operations_mutex);
226 sem_post(&manager_operations_sem);
229 void manager_operations_finish_with_error(void) {
230 while(manager_operations) {
231 manager_operation_t *h = manager_operations->next;
232 manager_operations_free_oper(manager_operations);
233 manager_operations = h;
235 pthread_mutex_unlock(&manager_operations_mutex);
236 sem_post(&manager_operations_sem);
241 int manager_operations_validate(manager_operation_t *oper) {
244 //prepopulate unset values
245 if(oper->docker_instance_name == 0) {
246 oper->docker_instance_name = strdup(manager_context[oper->ft_index].docker_instance_name);
249 if(oper->docker_repository == 0) {
250 oper->docker_repository = strdup(manager_context[oper->ft_index].docker_repository);
253 if(oper->docker_version_tag == 0) {
254 oper->docker_version_tag = strdup(manager_context[oper->ft_index].docker_version_tag);
257 if(oper->started_instances == -1) {
258 oper->started_instances = manager_context[oper->ft_index].started_instances;
261 if(oper->mounted_instances == -1) {
262 oper->mounted_instances = manager_context[oper->ft_index].mounted_instances;
265 //check docker image if exists
267 for(int i = 0; i < docker_context[oper->ft_index].available_images_count; i++) {
268 if(strcmp(docker_context[oper->ft_index].available_images[i].repo, oper->docker_repository) == 0) {
269 if(strcmp(docker_context[oper->ft_index].available_images[i].tag, oper->docker_version_tag) == 0) {
277 log_error("could not find image: %s/%s:%s\n", oper->docker_repository, docker_context[oper->ft_index].image, oper->docker_version_tag);
278 return NTS_ERR_FAILED;
//Applies one operation to the context selected by oper->ft_index: instances are stopped or
//started until the context's started_instances matches the requested count, then unmounted
//or mounted until mounted_instances matches. On the failure paths visible here, a
//heap-allocated description is stored in oper->errmsg and NTS_ERR_FAILED is returned.
284 static int manager_operations_execute(manager_operation_t *oper) {
//k selects the context this operation applies to
287 int k = oper->ft_index;
290 //operation --> actions
//more instances running than requested: stop the surplus
291 if(manager_context[k].started_instances > oper->started_instances) {
293 while(manager_context[k].started_instances > oper->started_instances) {
295 rc = manager_actions_stop(&manager_context[k]);
296 if(rc != NTS_ERR_OK) {
//NOTE(review): the asprintf result is not checked (here and below) - confirm errmsg is valid on allocation failure
297 asprintf(&oper->errmsg, "stop FAILED - function-type %s", manager_context[k].function_type);
298 log_error("%s\n", oper->errmsg);
300 return NTS_ERR_FAILED;
//push the updated context after each successful stop; a failure here is only logged
303 rc = manager_sr_update_context(&manager_context[k]);
304 if(rc != NTS_ERR_OK) {
305 log_error("manager_sr_update_context failed\n");
//fewer instances running than requested: start the missing ones
310 else if(manager_context[k].started_instances < oper->started_instances) {
312 while(manager_context[k].started_instances < oper->started_instances) {
314 rc = manager_actions_start(&manager_context[k]);
315 if(rc != NTS_ERR_OK) {
316 asprintf(&oper->errmsg, "start FAILED - function-type %s", manager_context[k].function_type);
317 log_error("%s\n", oper->errmsg);
318 return NTS_ERR_FAILED;
321 rc = manager_sr_update_context(&manager_context[k]);
322 if(rc != NTS_ERR_OK) {
323 log_error("manager_sr_update_context failed\n");
//configure the instance at index started_instances - 1 (presumably the one just started - confirm)
326 rc = manager_actions_config_instance(&manager_context[k], &manager_context[k].instance[manager_context[k].started_instances - 1]);
327 if(rc != NTS_ERR_OK) {
//NOTE(review): unlike the other failure paths, no return is visible after this message -
//confirm whether a later asprintf can overwrite (and leak) this errmsg
328 asprintf(&oper->errmsg, "config FAILED - instance %s", manager_context[k].instance[manager_context[k].started_instances - 1].container.name);
329 log_error("%s\n", oper->errmsg);
//more instances mounted than requested: unmount the surplus
334 if(manager_context[k].mounted_instances > oper->mounted_instances) {
336 while(manager_context[k].mounted_instances > oper->mounted_instances) {
337 rc = manager_actions_unmount(&manager_context[k]);
338 if(rc != NTS_ERR_OK) {
//the instance named in the message is the one at index mounted_instances - 1
339 asprintf(&oper->errmsg, "unmount FAILED - instance %s", manager_context[k].instance[manager_context[k].mounted_instances - 1].container.name);
340 log_error("%s\n", oper->errmsg);
341 return NTS_ERR_FAILED;
344 rc = manager_sr_update_context(&manager_context[k]);
345 if(rc != NTS_ERR_OK) {
346 log_error("manager_sr_update_context failed\n");
//fewer instances mounted than requested: mount the missing ones
351 else if(manager_context[k].mounted_instances < oper->mounted_instances) {
353 while(manager_context[k].mounted_instances < oper->mounted_instances) {
354 rc = manager_actions_mount(&manager_context[k]);
355 if(rc != NTS_ERR_OK) {
//the instance named in the message is the one at index mounted_instances
356 asprintf(&oper->errmsg, "mount FAILED - instance %s", manager_context[k].instance[manager_context[k].mounted_instances].container.name);
357 log_error("%s\n", oper->errmsg);
358 return NTS_ERR_FAILED;
361 rc = manager_sr_update_context(&manager_context[k]);
362 if(rc != NTS_ERR_OK) {
363 log_error("manager_sr_update_context failed\n");