Add support for D release use-case.
[sim/o1-interface.git] / ntsimulator / ntsim-ng / core / app / manager_operations.c
1 /*************************************************************************
2 *
3 * Copyright 2020 highstreet technologies GmbH and others
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 ***************************************************************************/
17
18 #define _GNU_SOURCE
19
20 #include "manager.h"
21 #include "utils/log_utils.h"
22 #include "utils/sys_utils.h"
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <assert.h>
26 #include <pthread.h>
27
28 #include "core/framework.h"
29 #include "core/session.h"
30
31 static manager_operation_t *manager_operations;
32 static pthread_mutex_t manager_operations_mutex;
33 static sem_t manager_operations_sem;
34
35 manager_protocol_type_t manager_port[65536];
36
37 static int manager_operations_execute(manager_operation_t *oper);
38
39 int manager_operations_init(void) {
40     manager_operations = 0;
41     if(pthread_mutex_init(&manager_operations_mutex, NULL) != 0) { 
42         log_error("mutex init has failed\n"); 
43         return NTS_ERR_FAILED; 
44     }
45
46     if(sem_init(&manager_operations_sem, 0, 0) != 0) {
47         log_error("sem init has failed\n"); 
48         return NTS_ERR_FAILED; 
49     }
50
51     //checkAL ar fi misto sa stim ce porturi sunt si ce porturi nu sunt available...
52     for(int i = 0; i < 1000; i++) {
53         manager_port[i] = MANAGER_PROTOCOL_UNAVAILABLE;
54     }
55
56     for(int i = 1000; i < 65536; i++) {
57         manager_port[i] = MANAGER_PROTOCOL_UNUSED;
58     }
59
60     return NTS_ERR_OK;
61 }
62
63 void manager_operations_loop(void) {
64     int rc;
65
66     struct timespec ts;
67     clock_gettime(CLOCK_REALTIME, &ts);
68     ts.tv_sec += 1;
69
70     if(sem_timedwait(&manager_operations_sem, &ts) == 0) {
71         int retries = 10;
72         while(retries) {
73             rc = sr_lock(session_running, NTS_MANAGER_MODULE);
74             if(rc == SR_ERR_OK) {
75                 break;
76             }
77             else {
78                 sleep(1);
79             }
80             retries--;
81         }
82
83         if(retries == 0) {
84             log_error("sr_lock failed\n");
85             //checkAL ce facem acum ?
86         }
87
88         pthread_mutex_lock(&manager_operations_mutex);
89
90         const char *status = "SUCCESS";
91         char errmsg[256];
92         errmsg[0] = 0;
93
94         while(manager_operations) {
95             //pop operation from list
96             manager_operation_t *oper = manager_operations;
97             manager_operations = manager_operations->next;       
98             
99             //if operation is RPC first update any *other* fields
100
101             rc = manager_operations_execute(oper);
102             if(rc != NTS_ERR_OK) {
103                 log_error("manager_operations_execute failed\n");
104                 status = "FAILED";
105                 strcpy(errmsg, oper->errmsg);
106                 manager_operations_free_oper(oper);
107                 break;
108             }
109
110             manager_operations_free_oper(oper);
111         }
112
113         for(int i = 0; i < docker_context_count; i++) {
114             //do any reconfig necesarry
115             for(int j = 0; j < manager_context[i].started_instances; j++) {
116                 if(manager_context[i].instance[j].is_configured == false) {
117                     rc = manager_actions_config_instance(&manager_context[i], &manager_context[i].instance[j]);
118                     if(rc != NTS_ERR_OK) {
119                         status = "reconfig FAILED";
120                         sprintf(errmsg, "reconfig FAILED - instance %s", manager_context[i].instance[j].container.name);
121                         log_error("%s\n", errmsg);
122                     }
123                 }
124             }
125         }
126
127         rc = manager_sr_on_last_operation_status(status, errmsg);
128         if(rc != NTS_ERR_OK) {
129             log_error("manager_sr_on_last_operation_status failed\n");
130         }
131
132         pthread_mutex_unlock(&manager_operations_mutex);
133         rc = sr_unlock(session_running, NTS_MANAGER_MODULE);    //release datastore
134         if(rc != SR_ERR_OK) {
135             log_error("sr_unlock failed\n");
136         }
137     }
138 }
139
140 void manager_operations_free(void) {
141     //terminate all containers
142     for(int i = 0; i < docker_context_count; i++) {
143         while(manager_context[i].started_instances) {
144             manager_actions_stop(&manager_context[i]);
145         }
146     }
147
148     sem_destroy(&manager_operations_sem);
149     pthread_mutex_destroy(&manager_operations_mutex);
150 }
151
152 manager_operation_t *manager_operations_new_oper(manager_operation_type_t type) {
153     manager_operation_t *new_oper = malloc(sizeof(manager_operation_t));
154     if(new_oper == 0) {
155         log_error("malloc failed\n");
156         return 0;
157     }
158
159     new_oper->type = type;
160
161     new_oper->ft_index = -1;
162     new_oper->function_type = 0;
163
164     new_oper->started_instances = -1;
165     new_oper->mounted_instances = -1;
166     
167     new_oper->docker_instance_name = 0;
168     new_oper->docker_version_tag = 0;
169     new_oper->docker_repository = 0;
170
171     new_oper->mount_point_addressing_method = 0;
172
173     new_oper->fault_generation.delay_period = 0;
174     new_oper->fault_generation.delay_period_count = -1;
175
176     new_oper->netconf.faults_enabled = -1;
177     new_oper->netconf.call_home = -1;
178
179     new_oper->ves.faults_enabled = -1;
180     new_oper->ves.pnf_registration = -1;
181     new_oper->ves.heartbeat_period = -1;
182
183     new_oper->errmsg = 0;
184     new_oper->next = 0;
185     
186     return new_oper;
187 }
188
189 int manager_operations_free_oper(manager_operation_t *oper) {
190     assert(oper);
191
192     free(oper->function_type);
193     free(oper->docker_instance_name);
194     free(oper->docker_repository);
195     free(oper->docker_version_tag);
196     free(oper->mount_point_addressing_method);
197     free(oper->errmsg);
198
199     free(oper);
200     return NTS_ERR_OK;
201 }
202
203 int manager_operations_begin(void) {
204     return pthread_mutex_lock(&manager_operations_mutex);
205 }
206
207 int manager_operations_add(manager_operation_t *oper) {
208     assert(oper);
209
210     if(manager_operations == 0) {
211         manager_operations = oper;
212     }
213     else {
214         manager_operation_t *h = manager_operations;
215         while(h->next) {
216             h = h->next;
217         }
218         h->next = oper;
219     }
220
221     return NTS_ERR_OK;
222 }
223
224 void manager_operations_finish_and_execute(void) {
225     pthread_mutex_unlock(&manager_operations_mutex);
226     sem_post(&manager_operations_sem);
227 }
228
229 void manager_operations_finish_with_error(void) {
230     while(manager_operations) {
231         manager_operation_t *h = manager_operations->next;
232         manager_operations_free_oper(manager_operations);
233         manager_operations = h;
234     }
235     pthread_mutex_unlock(&manager_operations_mutex);
236     sem_post(&manager_operations_sem);
237 }
238
239
240
241 int manager_operations_validate(manager_operation_t *oper) {
242     assert(oper);
243     
244     //prepopulate unset values
245     if(oper->docker_instance_name == 0) {
246         oper->docker_instance_name = strdup(manager_context[oper->ft_index].docker_instance_name);
247     }
248
249     if(oper->docker_repository == 0) {
250         oper->docker_repository = strdup(manager_context[oper->ft_index].docker_repository);
251     }
252
253     if(oper->docker_version_tag == 0) {
254         oper->docker_version_tag = strdup(manager_context[oper->ft_index].docker_version_tag);
255     }
256
257     if(oper->started_instances == -1) {
258         oper->started_instances = manager_context[oper->ft_index].started_instances;
259     }
260
261     if(oper->mounted_instances == -1) {
262         oper->mounted_instances = manager_context[oper->ft_index].mounted_instances;
263     }
264
265     //check docker image if exists
266     bool found = false;
267     for(int i = 0; i < docker_context[oper->ft_index].available_images_count; i++) {
268         if(strcmp(docker_context[oper->ft_index].available_images[i].repo, oper->docker_repository) == 0) {
269             if(strcmp(docker_context[oper->ft_index].available_images[i].tag, oper->docker_version_tag) == 0) {
270                 found = true;
271                 break;
272             }
273         }
274     }
275
276     if(found == false) {
277         log_error("could not find image: %s/%s:%s\n", oper->docker_repository, docker_context[oper->ft_index].image, oper->docker_version_tag);
278         return NTS_ERR_FAILED;
279     }
280
281     return NTS_ERR_OK;
282 }
283
284 static int manager_operations_execute(manager_operation_t *oper) {
285     assert(oper);
286
287     int k = oper->ft_index;
288     int rc;
289
290     //operation --> actions
291     if(manager_context[k].started_instances > oper->started_instances) {
292         //stop instances
293         while(manager_context[k].started_instances > oper->started_instances) {
294
295             rc = manager_actions_stop(&manager_context[k]);
296             if(rc != NTS_ERR_OK) {
297                 asprintf(&oper->errmsg, "stop FAILED - function-type %s", manager_context[k].function_type);
298                 log_error("%s\n", oper->errmsg);
299                 
300                 return NTS_ERR_FAILED;
301             }
302
303             rc = manager_sr_update_context(&manager_context[k]);
304             if(rc != NTS_ERR_OK) {
305                 log_error("manager_sr_update_context failed\n");
306             }
307         }
308
309     }
310     else if(manager_context[k].started_instances < oper->started_instances) {
311         //start instances     
312         while(manager_context[k].started_instances < oper->started_instances) {
313
314             rc = manager_actions_start(&manager_context[k]);
315             if(rc != NTS_ERR_OK) {
316                 asprintf(&oper->errmsg, "start FAILED - function-type %s", manager_context[k].function_type);
317                 log_error("%s\n", oper->errmsg);
318                 return NTS_ERR_FAILED;
319             }
320           
321             rc = manager_sr_update_context(&manager_context[k]);
322             if(rc != NTS_ERR_OK) {
323                 log_error("manager_sr_update_context failed\n");
324             }
325
326             rc = manager_actions_config_instance(&manager_context[k], &manager_context[k].instance[manager_context[k].started_instances - 1]);
327             if(rc != NTS_ERR_OK) {
328                 asprintf(&oper->errmsg, "config FAILED - instance %s", manager_context[k].instance[manager_context[k].started_instances - 1].container.name);
329                 log_error("%s\n", oper->errmsg);
330             }
331         }
332     }
333
334     if(manager_context[k].mounted_instances > oper->mounted_instances) {
335         //unmount instances
336         while(manager_context[k].mounted_instances > oper->mounted_instances) {
337             rc = manager_actions_unmount(&manager_context[k]);
338             if(rc != NTS_ERR_OK) {
339                 asprintf(&oper->errmsg, "unmount FAILED - instance %s", manager_context[k].instance[manager_context[k].mounted_instances - 1].container.name);
340                 log_error("%s\n", oper->errmsg);
341                 return NTS_ERR_FAILED;
342             }
343
344             rc = manager_sr_update_context(&manager_context[k]);
345             if(rc != NTS_ERR_OK) {
346                 log_error("manager_sr_update_context failed\n");
347             }
348         }
349
350     }
351     else if(manager_context[k].mounted_instances < oper->mounted_instances) {
352         //mount instances     
353         while(manager_context[k].mounted_instances < oper->mounted_instances) {
354             rc = manager_actions_mount(&manager_context[k]);
355             if(rc != NTS_ERR_OK) {
356                 asprintf(&oper->errmsg, "mount FAILED - instance %s", manager_context[k].instance[manager_context[k].mounted_instances].container.name);
357                 log_error("%s\n", oper->errmsg);
358                 return NTS_ERR_FAILED;
359             }
360             
361             rc = manager_sr_update_context(&manager_context[k]);
362             if(rc != NTS_ERR_OK) {
363                 log_error("manager_sr_update_context failed\n");
364             }
365         }
366     }
367
368     return NTS_ERR_OK;
369 }