Add NF addressing method as ENV.
[sim/o1-interface.git] / ntsimulator / ntsim-ng / core / app / manager_operations.c
1 /*************************************************************************
2 *
3 * Copyright 2020 highstreet technologies GmbH and others
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 ***************************************************************************/
17
18 #define _GNU_SOURCE
19
20 #include "manager.h"
21 #include "utils/log_utils.h"
22 #include "utils/sys_utils.h"
23 #include <stdio.h>
24 #include <stdlib.h>
25 #include <assert.h>
26 #include <pthread.h>
27
28 #include "core/framework.h"
29 #include "core/session.h"
30 #include "core/xpath.h"
31
32 static manager_operation_t *manager_operations;
33 static pthread_mutex_t manager_operations_mutex;
34 static sem_t manager_operations_sem;
35
36 manager_protocol_type_t manager_port[65536];
37
38 static int manager_operations_execute(manager_operation_t *oper);
39
40 int manager_operations_init(void) {
41     manager_operations = 0;
42     if(pthread_mutex_init(&manager_operations_mutex, NULL) != 0) { 
43         log_error("mutex init has failed\n"); 
44         return NTS_ERR_FAILED; 
45     }
46
47     if(sem_init(&manager_operations_sem, 0, 0) != 0) {
48         log_error("sem init has failed\n"); 
49         return NTS_ERR_FAILED; 
50     }
51
52     //checkAL ar fi misto sa stim ce porturi sunt si ce porturi nu sunt available...
53     for(int i = 0; i < 1000; i++) {
54         manager_port[i] = MANAGER_PROTOCOL_UNAVAILABLE;
55     }
56
57     for(int i = 1000; i < 65536; i++) {
58         manager_port[i] = MANAGER_PROTOCOL_UNUSED;
59     }
60
61     return NTS_ERR_OK;
62 }
63
64 void manager_operations_loop(void) {
65     int rc;
66
67     struct timespec ts;
68     clock_gettime(CLOCK_REALTIME, &ts);
69     ts.tv_sec += 1;
70
71     if(sem_timedwait(&manager_operations_sem, &ts) == 0) {
72         int retries = 10;
73         while(retries) {
74             rc = sr_lock(session_running, NTS_MANAGER_MODULE);
75             if(rc == SR_ERR_OK) {
76                 break;
77             }
78             else {
79                 sleep(1);
80             }
81             retries--;
82         }
83
84         if(retries == 0) {
85             log_error("sr_lock failed\n");
86             //checkAL ce facem acum ?
87         }
88
89         pthread_mutex_lock(&manager_operations_mutex);
90
91         const char *status = "SUCCESS";
92         char errmsg[256];
93         errmsg[0] = 0;
94
95         while(manager_operations) {
96             //pop operation from list
97             manager_operation_t *oper = manager_operations;
98             manager_operations = manager_operations->next;       
99             
100             //if operation is RPC first update any *other* fields
101
102             rc = manager_operations_execute(oper);
103             if(rc != NTS_ERR_OK) {
104                 log_error("manager_operations_execute failed\n");
105                 status = "FAILED";
106                 strcpy(errmsg, oper->errmsg);
107                 manager_operations_free_oper(oper);
108                 break;
109             }
110
111             manager_operations_free_oper(oper);
112         }
113
114         for(int i = 0; i < docker_context_count; i++) {
115             //do any reconfig necesarry
116             for(int j = 0; j < manager_context[i].started_instances; j++) {
117                 if(manager_context[i].instance[j].is_configured == false) {
118                     rc = manager_actions_config_instance(&manager_context[i], &manager_context[i].instance[j]);
119                     if(rc != NTS_ERR_OK) {
120                         status = "reconfig FAILED";
121                         sprintf(errmsg, "reconfig FAILED - instance %s", manager_context[i].instance[j].container.name);
122                         log_error("%s\n", errmsg);
123                     }
124                 }
125             }
126         }
127
128         rc = manager_sr_on_last_operation_status(status, errmsg);
129         if(rc != NTS_ERR_OK) {
130             log_error("manager_sr_on_last_operation_status failed\n");
131         }
132
133         pthread_mutex_unlock(&manager_operations_mutex);
134         rc = sr_unlock(session_running, NTS_MANAGER_MODULE);    //release datastore
135         if(rc != SR_ERR_OK) {
136             log_error("sr_unlock failed\n");
137         }
138     }
139 }
140
141 void manager_operations_free(void) {
142     //terminate all containers
143     for(int i = 0; i < docker_context_count; i++) {
144         while(manager_context[i].started_instances) {
145             manager_actions_stop(&manager_context[i]);
146         }
147     }
148
149     sem_destroy(&manager_operations_sem);
150     pthread_mutex_destroy(&manager_operations_mutex);
151 }
152
153 manager_operation_t *manager_operations_new_oper(manager_operation_type_t type) {
154     manager_operation_t *new_oper = malloc(sizeof(manager_operation_t));
155     if(new_oper == 0) {
156         log_error("malloc failed\n");
157         return 0;
158     }
159
160     new_oper->type = type;
161
162     new_oper->ft_index = -1;
163     new_oper->function_type = 0;
164
165     new_oper->started_instances = -1;
166     new_oper->mounted_instances = -1;
167     
168     new_oper->docker_instance_name = 0;
169     new_oper->docker_version_tag = 0;
170     new_oper->docker_repository = 0;
171
172     new_oper->mount_point_addressing_method = 0;
173
174     new_oper->fault_generation.delay_period = 0;
175     new_oper->fault_generation.delay_period_count = -1;
176
177     new_oper->netconf.faults_enabled = -1;
178     new_oper->netconf.call_home = -1;
179
180     new_oper->ves.faults_enabled = -1;
181     new_oper->ves.pnf_registration = -1;
182     new_oper->ves.heartbeat_period = -1;
183
184     new_oper->errmsg = 0;
185     new_oper->next = 0;
186     
187     return new_oper;
188 }
189
190 int manager_operations_free_oper(manager_operation_t *oper) {
191     assert(oper);
192
193     free(oper->function_type);
194     free(oper->docker_instance_name);
195     free(oper->docker_repository);
196     free(oper->docker_version_tag);
197     free(oper->mount_point_addressing_method);
198     free(oper->errmsg);
199
200     free(oper);
201     return NTS_ERR_OK;
202 }
203
204 int manager_operations_begin(void) {
205     return pthread_mutex_lock(&manager_operations_mutex);
206 }
207
208 int manager_operations_add(manager_operation_t *oper) {
209     assert(oper);
210
211     if(manager_operations == 0) {
212         manager_operations = oper;
213     }
214     else {
215         manager_operation_t *h = manager_operations;
216         while(h->next) {
217             h = h->next;
218         }
219         h->next = oper;
220     }
221
222     return NTS_ERR_OK;
223 }
224
225 void manager_operations_finish_and_execute(void) {
226     pthread_mutex_unlock(&manager_operations_mutex);
227     sem_post(&manager_operations_sem);
228 }
229
230 void manager_operations_finish_with_error(void) {
231     while(manager_operations) {
232         manager_operation_t *h = manager_operations->next;
233         manager_operations_free_oper(manager_operations);
234         manager_operations = h;
235     }
236     pthread_mutex_unlock(&manager_operations_mutex);
237     sem_post(&manager_operations_sem);
238 }
239
240
241
242 int manager_operations_validate(manager_operation_t *oper) {
243     assert(oper);
244     
245     //prepopulate unset values
246     if(oper->docker_instance_name == 0) {
247         oper->docker_instance_name = strdup(manager_context[oper->ft_index].docker_instance_name);
248     }
249
250     if(oper->docker_repository == 0) {
251         oper->docker_repository = strdup(manager_context[oper->ft_index].docker_repository);
252     }
253
254     if(oper->docker_version_tag == 0) {
255         oper->docker_version_tag = strdup(manager_context[oper->ft_index].docker_version_tag);
256     }
257
258     if(oper->started_instances == -1) {
259         oper->started_instances = manager_context[oper->ft_index].started_instances;
260     }
261
262     if(oper->mounted_instances == -1) {
263         oper->mounted_instances = manager_context[oper->ft_index].mounted_instances;
264     }
265
266     //check docker image if exists
267     bool found = false;
268     for(int i = 0; i < docker_context[oper->ft_index].available_images_count; i++) {
269         if(strcmp(docker_context[oper->ft_index].available_images[i].repo, oper->docker_repository) == 0) {
270             if(strcmp(docker_context[oper->ft_index].available_images[i].tag, oper->docker_version_tag) == 0) {
271                 found = true;
272                 break;
273             }
274         }
275     }
276
277     if(found == false) {
278         log_error("could not find image: %s/%s:%s\n", oper->docker_repository, docker_context[oper->ft_index].image, oper->docker_version_tag);
279         return NTS_ERR_FAILED;
280     }
281
282     return NTS_ERR_OK;
283 }
284
285 static int manager_operations_execute(manager_operation_t *oper) {
286     assert(oper);
287
288     int k = oper->ft_index;
289     int rc;
290
291     //operation --> actions
292     if(manager_context[k].started_instances > oper->started_instances) {
293         //stop instances
294         while(manager_context[k].started_instances > oper->started_instances) {
295
296             rc = manager_actions_stop(&manager_context[k]);
297             if(rc != NTS_ERR_OK) {
298                 asprintf(&oper->errmsg, "stop FAILED - function-type %s", manager_context[k].function_type);
299                 log_error("%s\n", oper->errmsg);
300                 
301                 return NTS_ERR_FAILED;
302             }
303
304             rc = manager_sr_update_context(&manager_context[k]);
305             if(rc != NTS_ERR_OK) {
306                 log_error("manager_sr_update_context failed\n");
307             }
308         }
309
310     }
311     else if(manager_context[k].started_instances < oper->started_instances) {
312         //start instances     
313         while(manager_context[k].started_instances < oper->started_instances) {
314
315             rc = manager_actions_start(&manager_context[k]);
316             if(rc != NTS_ERR_OK) {
317                 asprintf(&oper->errmsg, "start FAILED - function-type %s", manager_context[k].function_type);
318                 log_error("%s\n", oper->errmsg);
319                 return NTS_ERR_FAILED;
320             }
321           
322             rc = manager_sr_update_context(&manager_context[k]);
323             if(rc != NTS_ERR_OK) {
324                 log_error("manager_sr_update_context failed\n");
325             }
326
327             rc = manager_actions_config_instance(&manager_context[k], &manager_context[k].instance[manager_context[k].started_instances - 1]);
328             if(rc != NTS_ERR_OK) {
329                 asprintf(&oper->errmsg, "config FAILED - instance %s", manager_context[k].instance[manager_context[k].started_instances - 1].container.name);
330                 log_error("%s\n", oper->errmsg);
331             }
332         }
333     }
334
335     if(manager_context[k].mounted_instances > oper->mounted_instances) {
336         //unmount instances
337         while(manager_context[k].mounted_instances > oper->mounted_instances) {
338             rc = manager_actions_unmount(&manager_context[k]);
339             if(rc != NTS_ERR_OK) {
340                 asprintf(&oper->errmsg, "unmount FAILED - instance %s", manager_context[k].instance[manager_context[k].mounted_instances - 1].container.name);
341                 log_error("%s\n", oper->errmsg);
342                 return NTS_ERR_FAILED;
343             }
344
345             rc = manager_sr_update_context(&manager_context[k]);
346             if(rc != NTS_ERR_OK) {
347                 log_error("manager_sr_update_context failed\n");
348             }
349         }
350
351     }
352     else if(manager_context[k].mounted_instances < oper->mounted_instances) {
353         //mount instances     
354         while(manager_context[k].mounted_instances < oper->mounted_instances) {
355             rc = manager_actions_mount(&manager_context[k]);
356             if(rc != NTS_ERR_OK) {
357                 asprintf(&oper->errmsg, "mount FAILED - instance %s", manager_context[k].instance[manager_context[k].mounted_instances].container.name);
358                 log_error("%s\n", oper->errmsg);
359                 return NTS_ERR_FAILED;
360             }
361             
362             rc = manager_sr_update_context(&manager_context[k]);
363             if(rc != NTS_ERR_OK) {
364                 log_error("manager_sr_update_context failed\n");
365             }
366         }
367     }
368
369     return NTS_ERR_OK;
370 }