Add an extra line stating that the src files are part of the RIC platform project
[ric-plt/sdl.git] / src / redis / asyncsentineldatabasediscovery.cpp
1 /*
2    Copyright (c) 2018-2019 Nokia.
3
4    Licensed under the Apache License, Version 2.0 (the "License");
5    you may not use this file except in compliance with the License.
6    You may obtain a copy of the License at
7
8        http://www.apache.org/licenses/LICENSE-2.0
9
10    Unless required by applicable law or agreed to in writing, software
11    distributed under the License is distributed on an "AS IS" BASIS,
12    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13    See the License for the specific language governing permissions and
14    limitations under the License.
15 */
16
17 /*
18  * This source code is part of the near-RT RIC (RAN Intelligent Controller)
19  * platform project (RICP).
20 */
21
22 #include <arpa/inet.h>
23 #include <boost/algorithm/string.hpp>
24 #include <iostream>
25 #include <string>
26 #include <vector>
27 #include <sdl/asyncstorage.hpp>
28 #include "private/abort.hpp"
29 #include "private/hostandport.hpp"
30 #include "private/redis/asyncsentineldatabasediscovery.hpp"
31 #include "private/redis/asynccommanddispatcher.hpp"
32 #include "private/redis/contents.hpp"
33 #include "private/redis/contentsbuilder.hpp"
34 #include "private/redis/reply.hpp"
35
36 using namespace shareddatalayer;
37 using namespace shareddatalayer::redis;
38
39 namespace
40 {
41     std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine& engine,
42                                                                           const DatabaseInfo& databaseInfo,
43                                                                           std::shared_ptr<ContentsBuilder> contentsBuilder,
44                                                                           std::shared_ptr<Logger> logger,
45                                                                           bool usePermanentCommandCallbacks);
46
47     struct SubscribeReply
48     {
49         enum class Type { UNKNOWN, SUBSCRIBE_REPLY, NOTIFICATION };
50         Type type;
51         std::string message;
52
53         SubscribeReply(): type(Type::UNKNOWN) { }
54     };
55
56     std::unique_ptr<SubscribeReply> parseSubscribeReply(const Reply& reply, Logger& logger);
57
58     std::unique_ptr<HostAndPort> parseMasterInquiryReply(const Reply& reply, Logger& logger);
59
60     std::unique_ptr<HostAndPort> parseNotificationMessage(const std::string& message, Logger& logger);
61 }
62
63 AsyncSentinelDatabaseDiscovery::AsyncSentinelDatabaseDiscovery(std::shared_ptr<Engine> engine,
64                                                                std::shared_ptr<Logger> logger,
65                                                                const HostAndPort& sentinelAddress,
66                                                                const std::string& sentinelMasterName):
67         AsyncSentinelDatabaseDiscovery(engine,
68                                        logger,
69                                        sentinelAddress,
70                                        sentinelMasterName,
71                                        ::asyncCommandDispatcherCreator,
72                                        std::make_shared<redis::ContentsBuilder>(AsyncStorage::SEPARATOR))
73 {
74 }
75
76 AsyncSentinelDatabaseDiscovery::AsyncSentinelDatabaseDiscovery(std::shared_ptr<Engine> engine,
77                                                                std::shared_ptr<Logger> logger,
78                                                                const HostAndPort& sentinelAddress,
79                                                                const std::string& sentinelMasterName,
80                                                                const AsyncCommandDispatcherCreator& asyncCommandDispatcherCreator,
81                                                                std::shared_ptr<redis::ContentsBuilder> contentsBuilder):
82         engine(engine),
83         logger(logger),
84         databaseInfo(DatabaseInfo({DatabaseConfiguration::Addresses({sentinelAddress}),
85                      DatabaseInfo::Type::SINGLE,
86                      boost::none,
87                      DatabaseInfo::Discovery::SENTINEL})),
88         sentinelMasterName(sentinelMasterName),
89         contentsBuilder(contentsBuilder),
90         subscribeRetryTimer(*engine),
91         subscribeRetryTimerDuration(std::chrono::seconds(1)),
92         masterInquiryRetryTimer(*engine),
93         masterInquiryRetryTimerDuration(std::chrono::seconds(1))
94 {
95     subscriber = asyncCommandDispatcherCreator(*engine,
96                                                databaseInfo,
97                                                contentsBuilder,
98                                                logger,
99                                                true);
100     dispatcher = asyncCommandDispatcherCreator(*engine,
101                                                databaseInfo,
102                                                contentsBuilder,
103                                                logger,
104                                                false);
105 }
106
107 AsyncSentinelDatabaseDiscovery::~AsyncSentinelDatabaseDiscovery()
108 {
109     if (subscriber)
110         subscriber->disableCommandCallbacks();
111     if (dispatcher)
112         dispatcher->disableCommandCallbacks();
113     stateChangedCb = nullptr;
114 }
115
116 void AsyncSentinelDatabaseDiscovery::setStateChangedCb(const StateChangedCb& cb)
117 {
118     stateChangedCb = cb;
119     subscriber->registerDisconnectCb([this]()
120             {
121                 subscriber->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::subscribeNotifications, this));
122             });
123     subscriber->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::subscribeNotifications, this));
124 }
125
126 void AsyncSentinelDatabaseDiscovery::clearStateChangedCb()
127 {
128     stateChangedCb = nullptr;
129 }
130
131 void AsyncSentinelDatabaseDiscovery::subscribeNotifications()
132 {
133     subscriber->dispatchAsync(std::bind(&AsyncSentinelDatabaseDiscovery::subscribeAck,
134                                         this,
135                                         std::placeholders::_1,
136                                         std::placeholders::_2),
137                               "dummyNamespace", // Not meaningful for Sentinel
138                               contentsBuilder->build("SUBSCRIBE", "+switch-master"));
139 }
140
141 void AsyncSentinelDatabaseDiscovery::subscribeAck(const std::error_code& error,
142                                                   const Reply& reply)
143 {
144     if (!error)
145     {
146         auto subscribeReply = parseSubscribeReply(reply, *logger);
147         if (subscribeReply)
148         {
149             switch (subscribeReply->type)
150             {
151                 case (SubscribeReply::Type::SUBSCRIBE_REPLY):
152                 {
153                     dispatcher->waitConnectedAsync(std::bind(&AsyncSentinelDatabaseDiscovery::sendMasterInquiry, this));
154                     break;
155                 }
156                 case (SubscribeReply::Type::NOTIFICATION):
157                 {
158                     auto hostAndPort = parseNotificationMessage(subscribeReply->message, *logger);
159                     if (hostAndPort)
160                     {
161                         auto databaseInfo(DatabaseInfo({DatabaseConfiguration::Addresses({*hostAndPort}),
162                                                        DatabaseInfo::Type::SINGLE,
163                                                        boost::none,
164                                                        DatabaseInfo::Discovery::SENTINEL}));
165                         if (stateChangedCb)
166                             stateChangedCb(databaseInfo);
167                     }
168                     else
169                         SHAREDDATALAYER_ABORT("Notification message parsing error.");
170                     break;
171                 }
172                 case (SubscribeReply::Type::UNKNOWN):
173                 {
174                     logger->debug() << "Invalid SUBSCRIBE reply type." << std::endl;
175                     SHAREDDATALAYER_ABORT("Invalid SUBSCRIBE command reply type.");
176                 }
177             }
178         }
179         else
180             SHAREDDATALAYER_ABORT("SUBSCRIBE command reply parsing error.");
181     }
182     else
183         subscribeRetryTimer.arm(
184                 subscribeRetryTimerDuration,
185                 std::bind(&AsyncSentinelDatabaseDiscovery::subscribeNotifications, this));
186 }
187
188 void AsyncSentinelDatabaseDiscovery::sendMasterInquiry()
189 {
190     dispatcher->dispatchAsync(std::bind(&AsyncSentinelDatabaseDiscovery::masterInquiryAck,
191                                         this,
192                                         std::placeholders::_1,
193                                         std::placeholders::_2),
194                               "dummyNamespace", // Not meaningful for Sentinel
195                               contentsBuilder->build("SENTINEL", "get-master-addr-by-name", sentinelMasterName));
196 }
197
198 void AsyncSentinelDatabaseDiscovery::masterInquiryAck(const std::error_code& error,
199                                                       const Reply& reply)
200 {
201     if (!error)
202     {
203         auto hostAndPort = parseMasterInquiryReply(reply, *logger);
204         if (hostAndPort)
205         {
206             auto databaseInfo(DatabaseInfo({DatabaseConfiguration::Addresses({*hostAndPort}),
207                                            DatabaseInfo::Type::SINGLE,
208                                            boost::none,
209                                            DatabaseInfo::Discovery::SENTINEL}));
210             if (stateChangedCb)
211                 stateChangedCb(databaseInfo);
212         }
213         else
214             SHAREDDATALAYER_ABORT("Master inquiry reply parsing error.");
215     }
216     else
217     {
218         masterInquiryRetryTimer.arm(
219                 masterInquiryRetryTimerDuration,
220                 std::bind(&AsyncSentinelDatabaseDiscovery::sendMasterInquiry, this));
221     }
222 }
223
224 namespace
225 {
226     std::shared_ptr<AsyncCommandDispatcher> asyncCommandDispatcherCreator(Engine& engine,
227                                                                           const DatabaseInfo& databaseInfo,
228                                                                           std::shared_ptr<ContentsBuilder> contentsBuilder,
229                                                                           std::shared_ptr<Logger> logger,
230                                                                           bool usePermanentCommandCallbacks)
231     {
232         return AsyncCommandDispatcher::create(engine,
233                                               databaseInfo,
234                                               contentsBuilder,
235                                               usePermanentCommandCallbacks,
236                                               logger,
237                                               true);
238     }
239
240     std::unique_ptr<SubscribeReply> parseSubscribeReply(const Reply& reply, Logger& logger)
241     {
242         // refer to: https://redis.io/topics/pubsub#format-of-pushed-messages
243         auto replyType = reply.getType();
244         if (replyType == Reply::Type::ARRAY)
245         {
246             auto& replyVector(*reply.getArray());
247             auto firstElementType = replyVector[0]->getType();
248             if (firstElementType == Reply::Type::STRING)
249             {
250                 auto subscribeReply = std::unique_ptr<SubscribeReply>(new SubscribeReply());
251                 auto kind(replyVector[0]->getString()->str);
252                 if (kind == "subscribe")
253                 {
254                     subscribeReply->type = SubscribeReply::Type::SUBSCRIBE_REPLY;
255                     return subscribeReply;
256                 }
257                 else if (kind == "message")
258                 {
259                     subscribeReply->type = SubscribeReply::Type::NOTIFICATION;
260                     auto thirdElementType = replyVector[2]->getType();
261                     if (thirdElementType == Reply::Type::STRING)
262                     {
263                         subscribeReply->message = replyVector[2]->getString()->str;
264                         return subscribeReply;
265                     }
266                     else
267                         logger.debug() << "Invalid message field type in SUBSCRIBE reply: " << kind << std::endl;
268                 }
269                 else
270                     logger.debug() << "Invalid kind field in SUBSCRIBE reply: " << kind << std::endl;
271             }
272             else
273                 logger.debug() << "Invalid first element type in SUBSCRIBE reply: "
274                                << static_cast<int>(firstElementType) << std::endl;
275         }
276         else
277             logger.debug() << "Invalid SUBSCRIBE reply type: "
278                            << static_cast<int>(replyType) << std::endl;
279         return nullptr;
280     }
281
282     std::unique_ptr<HostAndPort> parseMasterInquiryReply(const Reply& reply, Logger& logger)
283     {
284         auto replyType = reply.getType();
285         if (replyType == Reply::Type::ARRAY)
286         {
287             auto& replyVector(*reply.getArray());
288             auto hostElementType = replyVector[0]->getType();
289             if (hostElementType == Reply::Type::STRING)
290             {
291                 auto host(replyVector[0]->getString()->str);
292                 auto portElementType = replyVector[1]->getType();
293                 if (portElementType == Reply::Type::STRING)
294                 {
295                     auto port(replyVector[1]->getString()->str);
296                     try
297                     {
298                         return std::unique_ptr<HostAndPort>(new HostAndPort(host+":"+port, 0));;
299                     }
300                     catch (const std::exception& e)
301                     {
302                         logger.debug() << "Invalid host or port in master inquiry reply, host: "
303                                        << host << ", port: " << port
304                                        << ", exception: " << e.what() << std::endl;
305                     }
306                 }
307                 else
308                     logger.debug() << "Invalid port element type in master inquiry reply: "
309                                    << static_cast<int>(portElementType) << std::endl;
310             }
311             else
312                 logger.debug() << "Invalid host element type in master inquiry reply: "
313                                << static_cast<int>(hostElementType) << std::endl;
314         }
315         else
316             logger.debug() << "Invalid master inquiry reply type: "
317                            << static_cast<int>(replyType) << std::endl;
318         return nullptr;
319     }
320
321     std::unique_ptr<HostAndPort> parseNotificationMessage(const std::string& message, Logger& logger)
322     {
323         std::vector<std::string> splittedMessage;
324         boost::split(splittedMessage, message, boost::is_any_of(" "));
325         if (splittedMessage.size() == 5)
326         {
327             auto host = splittedMessage[3];
328             auto port = splittedMessage[4];
329             try
330             {
331                 return std::unique_ptr<HostAndPort>(new HostAndPort(host+":"+port, 0));;
332             }
333             catch (const std::exception& e)
334             {
335                 logger.debug() << "Invalid host or port in notification message, host: "
336                                << host << ", port: " << port
337                                << ", exception: " << e.what() << std::endl;
338             }
339         }
340         else
341             logger.debug() << "Invalid structure in notification message, size: " << splittedMessage.size() << std::endl;
342         return nullptr;
343     }
344 }