Development of NETCONF RPCs for tr-069 adapter to
[oam/tr069-adapter.git] / acs / requestprocessor / src / main / java / org / commscope / tr069adapter / acs / requestprocessor / impl / TR069RequestProcessEngine.java
1 /*
2  * ============LICENSE_START========================================================================
3  * ONAP : tr-069-adapter
4  * =================================================================================================
5  * Copyright (C) 2020 CommScope Inc Intellectual Property.
6  * =================================================================================================
7  * This tr-069-adapter software file is distributed by CommScope Inc under the Apache License,
8  * Version 2.0 (the "License"); you may not use this file except in compliance with the License. You
9  * may obtain a copy of the License at
10  *
11  * http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
14  * either express or implied. See the License for the specific language governing permissions and
15  * limitations under the License.
16  * ===============LICENSE_END=======================================================================
17  */
18
19 package org.commscope.tr069adapter.acs.requestprocessor.impl;
20
21 import static org.commscope.tr069adapter.acs.common.utils.AcsConstants.SESSION_ID;
22
23 import java.util.Date;
24 import java.util.List;
25
26 import org.commscope.tr069adapter.acs.common.DeviceDetails;
27 import org.commscope.tr069adapter.acs.common.DeviceInform;
28 import org.commscope.tr069adapter.acs.common.DeviceRPCRequest;
29 import org.commscope.tr069adapter.acs.common.DeviceRPCResponse;
30 import org.commscope.tr069adapter.acs.common.OperationCode;
31 import org.commscope.tr069adapter.acs.common.OperationDetails;
32 import org.commscope.tr069adapter.acs.common.OperationResponse;
33 import org.commscope.tr069adapter.acs.common.dto.CustomOperationCode;
34 import org.commscope.tr069adapter.acs.common.dto.DeviceOperationRequestDetails;
35 import org.commscope.tr069adapter.acs.common.dto.TR069DeviceDetails;
36 import org.commscope.tr069adapter.acs.common.dto.TR069InformType;
37 import org.commscope.tr069adapter.acs.common.dto.TR069OperationCode;
38 import org.commscope.tr069adapter.acs.common.dto.TR069OperationDetails;
39 import org.commscope.tr069adapter.acs.common.exception.DeviceOperationException;
40 import org.commscope.tr069adapter.acs.common.exception.SessionConcurrentAccessException;
41 import org.commscope.tr069adapter.acs.common.exception.SessionManagerException;
42 import org.commscope.tr069adapter.acs.common.exception.TR069EventProcessingException;
43 import org.commscope.tr069adapter.acs.common.faults.AcsFaultCode;
44 import org.commscope.tr069adapter.acs.common.inform.BootInform;
45 import org.commscope.tr069adapter.acs.common.inform.BootstrapInform;
46 import org.commscope.tr069adapter.acs.common.response.DeviceInformResponse;
47 import org.commscope.tr069adapter.acs.common.utils.ErrorCode;
48 import org.commscope.tr069adapter.acs.requestprocessor.DeviceOperationInterface;
49 import org.commscope.tr069adapter.acs.requestprocessor.dao.DeviceRPCRequestRepositoryHelper;
50 import org.commscope.tr069adapter.acs.requestprocessor.dao.DeviceRepository;
51 import org.commscope.tr069adapter.acs.requestprocessor.dto.CustomOperationData;
52 import org.commscope.tr069adapter.acs.requestprocessor.dto.SessionDTO;
53 import org.commscope.tr069adapter.acs.requestprocessor.dto.SessionState;
54 import org.commscope.tr069adapter.acs.requestprocessor.dto.TR069RequestProcessorData;
55 import org.commscope.tr069adapter.acs.requestprocessor.entity.TR069DeviceEntity;
56 import org.commscope.tr069adapter.acs.requestprocessor.entity.TR069DeviceRPCRequestEntity;
57 import org.commscope.tr069adapter.acs.requestprocessor.helper.TR069RequestProcessEngineHelper;
58 import org.commscope.tr069adapter.acs.requestprocessor.util.TR069RequestProcessorUtility;
59 import org.slf4j.Logger;
60 import org.slf4j.LoggerFactory;
61 import org.springframework.beans.factory.annotation.Autowired;
62 import org.springframework.stereotype.Component;
63
64 @Component
65 public class TR069RequestProcessEngine extends TR069RequestProcessEngineHelper {
66
67   private static final String PENDING_RPC_CHECK =
68       "Checking if any pending Device RPC requests exists for the device";
69
70   private static final Logger logger = LoggerFactory.getLogger(TR069RequestProcessEngine.class);
71
72   @Autowired
73   TR069EventNotificationService tr069EventNotificationService;
74
75   @Autowired
76   DeviceOperationInterface deviceOperationInterface;
77
78   @Autowired
79   protected DeviceRPCRequestRepositoryHelper deviceRPCRequestRepositoryHelper;
80
81   @Autowired
82   private DeviceRepository deviceRepository;
83
84   /**
85    * @param deviceRPCRequest
86    * @throws TR069EventProcessingException
87    * @throws SessionConcurrentAccessException
88    */
89   public void processDeviceRPCRequest(DeviceRPCRequest deviceRPCRequest)
90       throws TR069EventProcessingException, SessionConcurrentAccessException {
91
92     DeviceRPCResponse deviceRPCResponse = null;
93     String deviceId = null;
94
95     try {
96       if (deviceRPCRequest == null) {
97         TR069EventProcessingException ex =
98             new TR069EventProcessingException(ErrorCode.INVALID_NBI_REQUEST);
99         logger.error(ex.getMessage());
100         throw ex;
101       }
102
103       Long operationId = deviceRPCRequest.getOperationId();
104       logger.info("A Mapper request is received with operationID: {}", operationId);
105       TR069DeviceDetails tr069DeviceDetails = null;
106       deviceId = deviceRPCRequest.getDeviceDetails().getDeviceId();
107       try {
108         tr069DeviceDetails = deviceOperationInterface.getDeviceDetails(deviceId);
109         deviceRPCResponse = checkForDeviceAvailabilityRequest(deviceRPCRequest, tr069DeviceDetails);
110         if (null != deviceRPCResponse) {
111           return;
112         }
113       } catch (DeviceOperationException | SessionManagerException deo) {
114         logger.error(deo.getMessage());
115         deviceRPCResponse = tr069RequestProcessEngineUtility.buildAbortedOperationresult(
116             tr069DeviceDetails, deviceRPCRequest, AcsFaultCode.FAULT_CODE_8000);
117         return;
118       }
119
120       try {
121         SessionDTO sessionDTO = acquireSessionLockWithRetryOnFailure(deviceId, operationId);
122
123         logger.debug("Persisting the Device RPC request, with operation ID: {}", operationId);
124         List<TR069DeviceRPCRequestEntity> tr069DeviceRPCRequestEntities =
125             TR069RequestProcessorUtility.convertToEntity(deviceRPCRequest);
126         deviceRPCRequestRepositoryHelper.saveAll(tr069DeviceRPCRequestEntities);
127         logger.info("Successfully persisted the Device RPC request");
128
129         if (sessionDTO != null) {
130           if (SessionState.TERMINATED.equals(sessionDTO.getSessionState())) {
131             logger.debug("No active session exists, hence requesting for Connection request");
132             requestForConnectionRequestInform(tr069DeviceDetails);
133
134             // Start Request Timer
135             startDeviceRPCRequestTimer(deviceId, deviceRPCRequest.getOperationId(),
136                 deviceRPCRequest.getOptions().getExecutionTimeout());
137           } else {
138             logger.debug(
139                 "Session is in processing state, Will be notified to session manager to pick the request on session availability");
140           }
141         } else {
142           logger.warn(
143               "The device is not activated yet, hence the NBI Operation result cannot be processed!");
144           deviceRPCResponse = tr069RequestProcessEngineUtility.buildAbortedOperationresult(
145               tr069DeviceDetails, deviceRPCRequest, AcsFaultCode.FAULT_CODE_8001);
146         }
147       } catch (SessionConcurrentAccessException scae) {
148         throw scae;
149       } catch (Exception e) {
150         logger.error("An unknown exception occurred while processing the NBI request, Reason: {}",
151             e.getMessage());
152         deviceRPCResponse = tr069RequestProcessEngineUtility.buildAbortedOperationresult(
153             tr069DeviceDetails, deviceRPCRequest, AcsFaultCode.FAULT_CODE_8004);
154       }
155     } finally {
156       if (deviceRPCResponse != null) {
157         logger.debug("Sending failed operation result for this NBI request");
158         tr069EventNotificationService.sendOperationResultToNBI(deviceRPCResponse);
159         // Marking the NBI Request as processed
160         deviceRPCRequestRepositoryHelper.markDeviceRPCRequestAsProcessed(deviceId,
161             deviceRPCRequest.getOperationId());
162       }
163     }
164   }
165
166   private DeviceRPCResponse checkForDeviceAvailabilityRequest(DeviceRPCRequest deviceRPCRequest,
167       TR069DeviceDetails tr069DeviceDetails) throws SessionManagerException {
168     DeviceRPCResponse deviceRPCResponse = null;
169
170     if (!deviceRPCRequest.getOpDetails().getOpCode().equals(CustomOperationCode.CONNECT)) {
171       return deviceRPCResponse;
172     }
173
174     SessionDTO sessionDTO = getSession(tr069DeviceDetails.getDeviceId());
175     tr069DeviceDetails.setCrRetryCount(1);
176     if (null != sessionDTO && !SessionState.TERMINATED.equals(sessionDTO.getSessionState())) {
177       logger.debug("Device is reachable as device tr069 session is in {} state.",
178           sessionDTO.getSessionState());
179
180       deviceRPCResponse = new DeviceRPCResponse();
181       deviceRPCResponse.setDeviceDetails(tr069DeviceDetails);
182       deviceRPCResponse.setOperationId(deviceRPCRequest.getOperationId());
183
184       OperationResponse operationResponse = new OperationResponse();
185       // device reachable...change value 1 to some constant or enum
186       operationResponse.setStatus(TR069RequestProcessorUtility.DEVICE_REACHABLE_STATUS_CODE);
187       operationResponse.setOperationCode(deviceRPCRequest.getOpDetails().getOpCode());
188
189       deviceRPCResponse.setOperationResponse(operationResponse);
190     }
191
192     return deviceRPCResponse;
193   }
194
195   /**
196    * Common Step 1. Since there can exist only one Inform from any device, which will be the
197    * initiator of the session, following steps to be followed a. Stop the session timer for this
198    * device if any running. b. Get the lock by reading the session record for this device from DB
199    * and load the session object in Thread Local Cache c. Create a new session id from
200    * SessionManager, and update the session object with new Session id and change the state to
201    * PROCESSING. 2. Read the Device record from the DB, and load the device DTO also in the
202    * ThreadLocalCache
203    * 
204    * Common Notification Specific Step 1. Take the connection request URL from the
205    * deviceNotification object 2. Update the Device DTO object with connection request URL,
206    * swVersion, hwVersion if there is a difference. Update the last updated time if updated 3. Send
207    * this Inform to the NBI, by calling the ProcessDeviceInform method on TR069NBIService module. 4.
208    * Create the Inform response Object and update the sessionID used for this transaction in the
209    * response which will be used in the cookie of the HTTP response 5. Post the message into
210    * Response Queue 6. Change the session state to LOCKED 7. Save the Session and device records
211    * with the values in ThreadLocalCache 8. Start the session timer and default request timer (As
212    * configured for all the requests from device) 8. All the above steps to be executed in a single
213    * transaction.
214    * 
215    * @param deviceNotification
216    * @return
217    * @throws SessionConcurrentAccessException
218    */
219   public DeviceInformResponse processDeviceInform(DeviceInform deviceNotification)
220       throws SessionConcurrentAccessException {
221
222     String deviceId = deviceNotification.getDeviceDetails().getDeviceId();
223     TR069InformType notificationType = (TR069InformType) deviceNotification.getInformType();
224     logger.info("Processing the Device Inform Event: '{}'", notificationType.getNotificationCode());
225     String newSessionId = null;
226     SessionDTO session = null;
227     TR069DeviceDetails deviceDetails = null;
228     DeviceInformResponse informResponse = null;
229
230     try {
231
232       SessionDTO sessionDTO = getSession(deviceId);
233       if (sessionDTO != null && !SessionState.TERMINATED.equals(sessionDTO.getSessionState())) {
234         String sessionId = sessionDTO.getSessionId();
235         logger.debug(
236             "The session with session id {} is not terminated, hence stopping the associated timer",
237             sessionId);
238         stopSessionTimer(sessionDTO.getSessionId());
239       }
240
241       // To stop the request timer if any running for this device, and send failed operation
242       // result for any such pending cases. Requests pending in DB should not be cleared
243
244       /*
245        * Read any pending records in TR069_NBI_REQUEST table for this device. Send abort operation
246        * result for all the pending requests Delete the records from the TR069_NBI_Request table for
247        * this device.
248        */
249       if (deviceNotification instanceof BootstrapInform
250           || deviceNotification instanceof BootInform) {
251         sendAbortedOperationResultForPendingRPCRequests(deviceNotification.getDeviceDetails(),
252             notificationType);
253       }
254
255       session = acquireSessionLock(deviceId, notificationType, true);
256       deviceDetails = getPersistedDeviceDetails(deviceId, deviceNotification);
257
258       newSessionId = session.getSessionId();
259       logger.debug("The session id generated to process the device notification request is: {} ",
260           newSessionId);
261
262       initThreadLocalCache(deviceDetails, session);
263       TR069RequestProcessorData tr069RequestProcessorData = getTR069RequestProcessorData();
264       updateSessionOnDeviceNotification(tr069RequestProcessorData, newSessionId);
265
266       logger.debug("Sending the Device Inform Event to the Mapper");
267       tr069EventNotificationService.sendDeviceInformToNBI(deviceNotification);
268
269       updateDeviceDetailsFromInform(tr069RequestProcessorData, deviceNotification);
270
271       logger.debug("Updating the session for the device with newly generated session id");
272       changeSessionState(tr069RequestProcessorData, SessionState.LOCKED);
273       updateSession(session);
274
275       // Start session timer - Get a default timeout for session.
276       startSessionTimer(session.getSessionId());
277
278       informResponse =
279           new DeviceInformResponse(newSessionId, deviceNotification.getDeviceDetails());
280     } catch (
281
282     DeviceOperationException doe) {
283       logger.error(doe.getMessage());
284     } catch (SessionConcurrentAccessException scae) {
285       throw scae;
286     } catch (Exception e) {
287       throw new SessionConcurrentAccessException(ErrorCode.UNKNOWN_ERROR, e.getMessage());
288     }
289     return informResponse;
290   }
291
292   /**
293    * 
294    * 1. Stop the request timer for this device using the session ID received in the cookie of the
295    * request 2. Read the session record from the session table into ThreadLocalCache 3. Move the
296    * session state to PROCESSING and update the OPERATION_ID as null. 4a. Session Manager to check
297    * if any pending request being notified for the device using in memory cache - Not planned for
298    * this Drop 4. As an interim solution for drop1, Check the TR069_NBI_REQUEST table if any request
299    * is pending.
300    * 
301    * if (anyPendingRequestExists) { 1. Pick the request with the least created_time 2. Create the
302    * response object and update the sessionID used for this transaction in the response which will
303    * be used in the cookie of the HTTP response 3. Post the response object into Response Queue 4.
304    * Change the session state to LOCKED and update the OPERATION_ID in the session table with the
305    * NBI requests OPERATION_ID. 5. Save the Session record with the values in ThreadLocalCache 6.
306    * Start the session timer and request timer (available in the request) } else { 1. Move the
307    * session state to PROCESSING and update the OPERATION_ID as null. 2. Save the Session record
308    * with the values in ThreadLocalCache. 3. Start the session timer. }
309    * 
310    * 6. All the above steps to be executed in a single transaction
311    * 
312    * @param deviceRPCResponse
313    * @return
314    * @throws SessionConcurrentAccessException
315    */
316   public DeviceRPCRequest processDeviceRPCResponse(DeviceRPCResponse deviceRPCResponse)
317       throws SessionConcurrentAccessException {
318     TR069DeviceDetails deviceDetails = (TR069DeviceDetails) deviceRPCResponse.getDeviceDetails();
319     String deviceId = deviceDetails.getDeviceId();
320     logger.info("Processing the operation response from device");
321
322     SessionDTO session;
323     DeviceRPCRequest deviceRPCRequest = null;
324     try {
325       session = acquireSessionLock(deviceId, null, false);
326       String newSessionId = session.getSessionId();
327       logger.debug("The session id used to process the device RPC response is: {}", newSessionId);
328
329       initThreadLocalCache(deviceDetails, session);
330       TR069RequestProcessorData tr069RequestProcessorData = getTR069RequestProcessorData();
331       updateSessionOnDeviceNotification(tr069RequestProcessorData, newSessionId);
332
333       Long operationId = deviceRPCResponse.getOperationId();
334
335       deviceRPCRequest = getNextRPCRequest(deviceId, operationId);
336
337       if (deviceRPCRequest != null) {
338         OperationDetails opDetails = deviceRPCRequest.getOpDetails();
339         if (opDetails != null) {
340           OperationCode opCode = opDetails.getOpCode();
341           if (opCode instanceof CustomOperationCode) {
342             CustomOperationCode customOperationCode = (CustomOperationCode) opCode;
343             String customOperationCodeName = customOperationCode.name();
344             logger.info(
345                 "The Device RPC request is of custom type, the custom operation to be performed is: {}",
346                 customOperationCodeName);
347             String jndiName = customOperationCode.getJndiName();
348             CustomOperationData customOperationData =
349                 new CustomOperationData(deviceDetails, deviceRPCResponse, deviceRPCRequest);
350             customOperationData = executeCustomOperation(jndiName, customOperationData);
351
352             DeviceRPCRequest operationRequest = customOperationData.getDeviceRPCRequest();
353             deviceRPCResponse = customOperationData.getDeviceRPCResponse();
354             if (operationRequest != null) {
355               return handleOperationRequest(deviceRPCResponse, session, deviceRPCRequest,
356                   newSessionId, tr069RequestProcessorData, operationRequest);
357             } else {
358               logger.debug(PENDING_RPC_CHECK);
359               deviceRPCRequest =
360                   deviceRPCRequestRepositoryHelper.findOldestDeviceRPCRequest(deviceId);
361             }
362           }
363         }
364       }
365
366       if (deviceRPCRequest != null) {
367         logger.info("A pending Device RPC request exists for the device with operation ID: {}",
368             deviceRPCRequest.getOperationId());
369         updateSessionCurOpId(tr069RequestProcessorData, deviceRPCRequest.getOperationId());
370         changeSessionState(tr069RequestProcessorData, SessionState.LOCKED);
371
372         deviceRPCRequest.addContextParam(SESSION_ID, newSessionId);
373
374       } else {
375         logger.info(
376             "No pending Device RPC request exists for the device, hence empty response will be sent to the device");
377         logger.debug("Updating the session to terminated state");
378         // To stop the session timer if any running for this device.
379         stopSessionTimer(newSessionId);
380         changeSessionState(tr069RequestProcessorData, SessionState.TERMINATED);
381       }
382
383       // To stop the request timer if any running for this operation.
384       stopDeviceRPCRequestTimer(deviceId, deviceRPCResponse.getOperationId());
385
386       // Sending the operation response to NBI
387       logger.debug("Sending the Device RPC Response to the Mapper");
388       tr069EventNotificationService.sendOperationResultToNBI(deviceRPCResponse);
389
390       updateSession(session);
391     } catch (DeviceOperationException doe) {
392       logger.error(doe.getMessage());
393     } catch (SessionConcurrentAccessException scae) {
394       throw scae;
395     }
396
397     return deviceRPCRequest;
398   }
399
400   private DeviceRPCRequest handleOperationRequest(DeviceRPCResponse deviceRPCResponse,
401       SessionDTO session, DeviceRPCRequest deviceRPCRequest, String newSessionId,
402       TR069RequestProcessorData tr069RequestProcessorData, DeviceRPCRequest operationRequest) {
403     operationRequest.addContextParam(SESSION_ID, newSessionId);
404     updateSessionCurOpId(tr069RequestProcessorData, deviceRPCRequest.getOperationId());
405     changeSessionState(tr069RequestProcessorData, SessionState.LOCKED);
406     updateSession(session);
407     if (deviceRPCResponse != null && operationRequest.getOperationId() != null
408         && !operationRequest.getOperationId().equals(deviceRPCResponse.getOperationId())) {
409       logger
410           .debug("Sending the Device RPC response for a configure Multiple object prior operation");
411       // Sending the operation response to NBI
412       tr069EventNotificationService.sendOperationResultToNBI(deviceRPCResponse);
413     }
414     return operationRequest;
415   }
416
417   /**
418    * 
419    * 1. Stop the request timer for this device using the session ID received in the cookie of the
420    * request 2. Read the session record from the session table into ThreadLocalCache 3. Move the
421    * session state to PROCESSING and update the OPERATION_ID as null. 4a. Session Manager to check
422    * if any pending request being notified for the device using in memory cache - Not planned for
423    * this Drop 4. As an interim solution for drop1, Check the TR069_NBI_REQUEST table if any request
424    * is pending.
425    * 
426    * if (anyPendingRequestExists) { 1. Pick the request with the least created_time 2. Create the
427    * response object and update the sessionID used for this transaction in the response which will
428    * be used in the cookie of the HTTP response 3. Post the response object into Response Queue 4.
429    * Change the session state to LOCKED and update the OPERATION_ID in the session table with the
430    * NBI requests OPERATION_ID. 5. Save the Session record with the values in ThreadLocalCache 6.
431    * Start the session timer and request timer (available in the request) } else { 1. Move the
432    * session state to PROCESSING and update the OPERATION_ID as null. 2. Save the Session record
433    * with the values in ThreadLocalCache. 3. Start the session timer. }
434    * 
435    * 6. All the above steps to be executed in a single transaction
436    * 
437    * @param deviceDetails
438    * @return
439    * @throws SessionConcurrentAccessException
440    */
441   public DeviceRPCRequest processEmptyDeviceRequest(TR069DeviceDetails deviceDetails)
442       throws SessionConcurrentAccessException {
443     String deviceId = deviceDetails.getDeviceId();
444     logger.info("Processing the empty request from device");
445
446     SessionDTO session;
447     DeviceRPCRequest nbiDeviceOperationRequest = null;
448     try {
449       session = acquireSessionLock(deviceId, null, false);
450       String newSessionId = session.getSessionId();
451       logger.debug("The session id used to process the empty device request is: {}", newSessionId);
452
453       initThreadLocalCache(deviceDetails, session);
454       TR069RequestProcessorData tr069RequestProcessorData = getTR069RequestProcessorData();
455       updateSessionOnDeviceNotification(tr069RequestProcessorData, newSessionId);
456
457       logger.debug(PENDING_RPC_CHECK);
458       nbiDeviceOperationRequest =
459           deviceRPCRequestRepositoryHelper.findOldestDeviceRPCRequest(deviceId);
460       if (nbiDeviceOperationRequest != null) {
461         Long operationId = nbiDeviceOperationRequest.getOperationId();
462         OperationDetails opDetails = nbiDeviceOperationRequest.getOpDetails();
463         if (opDetails != null) {
464           OperationCode opCode = opDetails.getOpCode();
465           if (opCode instanceof CustomOperationCode) {
466             CustomOperationCode customOperationCode = (CustomOperationCode) opCode;
467             String customOperationCodeName = customOperationCode.name();
468             logger.info(
469                 "The Device RPC operation request is of custom type, the custom operation to be performed is: {}",
470                 customOperationCodeName);
471             String jndiName = customOperationCode.getJndiName();
472             CustomOperationData customOperationData =
473                 new CustomOperationData(deviceDetails, null, nbiDeviceOperationRequest);
474             customOperationData = executeCustomOperation(jndiName, customOperationData);
475
476             DeviceRPCRequest operationRequest = customOperationData.getDeviceRPCRequest();
477             if (operationRequest != null) {
478               operationRequest.addContextParam(SESSION_ID, newSessionId);
479               updateSessionCurOpId(tr069RequestProcessorData, operationId);
480               changeSessionState(tr069RequestProcessorData, SessionState.LOCKED);
481               updateSession(session);
482               return operationRequest;
483             } else {
484               logger.debug(PENDING_RPC_CHECK);
485               nbiDeviceOperationRequest =
486                   deviceRPCRequestRepositoryHelper.findOldestDeviceRPCRequest(deviceId);
487             }
488           }
489         }
490       }
491
492       if (nbiDeviceOperationRequest != null) {
493         Long operationId = nbiDeviceOperationRequest.getOperationId();
494         logger.info("A pending Device RPC request exists for the device with operation ID: {}",
495             operationId);
496         updateSessionCurOpId(tr069RequestProcessorData, operationId);
497         changeSessionState(tr069RequestProcessorData, SessionState.LOCKED);
498
499         nbiDeviceOperationRequest.addContextParam(SESSION_ID, newSessionId);
500
501       } else {
502         logger.info(
503             "No pending Device RPC request exists for the device, hence empty device response will be sent to the device");
504         logger.debug("Updating the session to terminated state");
505         // To stop the session timer if any running for this device.
506         stopSessionTimer(newSessionId);
507         changeSessionState(tr069RequestProcessorData, SessionState.TERMINATED);
508       }
509
510       updateSession(session);
511     } catch (DeviceOperationException doe) {
512       logger.error(doe.getMessage());
513     } catch (SessionConcurrentAccessException scae) {
514       throw scae;
515     }
516
517     return nbiDeviceOperationRequest;
518   }
519
520   /**
521    * @param sessionId
522    * @return
523    * @throws SessionManagerException
524    */
525   public DeviceOperationRequestDetails getOpRequestDetailsBySessionId(String sessionId)
526       throws SessionManagerException {
527     DeviceOperationRequestDetails deviceOperationRequestDetails =
528         new DeviceOperationRequestDetails();
529
530     logger.debug("Fetching Operation request details for session: {}", sessionId);
531     SessionDTO session = getSessionBySessionId(sessionId);
532     String deviceId = session.getDeviceId();
533     TR069DeviceDetails deviceDetails = null;
534     try {
535       deviceDetails = deviceOperationInterface.getDeviceDetails(deviceId);
536       if (session.getCurrentOperationId() == null) {
537         logger.debug("There exists no pending operation request for the session: {}", sessionId);
538       } else {
539         sessionId = sessionId.replaceAll("[\n|\r|\t]", "_");
540         logger.debug("There exists pending operation request for the session: {}", sessionId);
541         List<TR069DeviceRPCRequestEntity> tr069DeviceRPCRequestEntityList =
542             deviceRPCRequestRepositoryHelper.findByDeviceIdAndOperationId(deviceId,
543                 session.getCurrentOperationId());
544         if (tr069DeviceRPCRequestEntityList == null) {
545           SessionManagerException ex =
546               new SessionManagerException(ErrorCode.SESSION_EXPIRED, sessionId);
547           logger.error(ex.getMessage());
548           throw ex;
549         }
550         DeviceRPCRequest deviceRPCRequest =
551             TR069RequestProcessorUtility.convertToDTO(tr069DeviceRPCRequestEntityList);
552         OperationCode opCode = deviceRPCRequest.getOpDetails().getOpCode();
553
554         String operationName = null;
555
556         if (opCode instanceof TR069OperationCode) {
557           operationName = ((TR069OperationCode) opCode).name();
558         } else {
559           operationName = ((CustomOperationCode) opCode).name();
560           TR069OperationDetails tr069OperationDetails =
561               (TR069OperationDetails) deviceRPCRequest.getOpDetails();
562           opCode = getCustomOperationCode(tr069OperationDetails);
563         }
564         logger.info("The pending operation request for the session is of operation {}",
565             operationName);
566         deviceOperationRequestDetails.setOpCode(opCode);
567         deviceOperationRequestDetails.setOperationId(deviceRPCRequest.getOperationId());
568       }
569       deviceOperationRequestDetails.setDeviceDetails(deviceDetails);
570     } catch (DeviceOperationException e) {
571       SessionManagerException ex =
572           new SessionManagerException(ErrorCode.DEVICE_NOT_EXISTS, deviceId);
573       String exceptionMessage = ex.getMessage().replaceAll("[\n|\r|\t]", "_");
574       logger.error(exceptionMessage);
575       throw ex;
576     } catch (Exception e) {
577       logger.error(e.getMessage());
578       SessionManagerException ex =
579           new SessionManagerException(ErrorCode.SESSION_EXPIRED, sessionId);
580       logger.error(ex.getMessage());
581       throw ex;
582     }
583     return deviceOperationRequestDetails;
584   }
585
586   /**
587    * @param deviceId
588    * @return
589    * @throws DeviceOperationException
590    */
591   public TR069DeviceDetails getDeviceDetails(String deviceId) throws DeviceOperationException {
592     return deviceOperationInterface.getDeviceDetails(deviceId);
593   }
594
595   /**
596    * @param deviceId
597    * @param operationId
598    * @return
599    * @throws DeviceOperationException
600    * @throws SessionConcurrentAccessException
601    * @throws InterruptedException
602    */
603   private SessionDTO acquireSessionLockWithRetryOnFailure(String deviceId, Long operationId)
604       throws DeviceOperationException, SessionConcurrentAccessException, InterruptedException {
605     int sessionLockAcquireRetryCount = 0;
606     SessionDTO sessionDTO = null;
607     do {
608       try {
609         sessionDTO = acquireSessionLock(deviceId, null, false);
610         logger.info(
611             "Successfully acquired the session lock for processing NBI request with operation ID: {}",
612             operationId);
613         break;
614       } catch (SessionConcurrentAccessException ex) {
615         sessionLockAcquireRetryCount++;
616         if (sessionLockAcquireRetryCount == 3) {
617           logger.error("Failed acquiring the lock after retry, rolling back the transaction");
618           throw ex;
619         }
620         logger.warn(
621             "Session lock acquiring failed with SessionConcurrentAccessException, hence retrying");
622         Thread.sleep(1000L);
623       }
624     } while (sessionLockAcquireRetryCount < 3);
625
626     return sessionDTO;
627   }
628
629   /**
630    * @param deviceId
631    * @param deviceNotification
632    * @return
633    * @throws DeviceOperationException
634    */
635   private TR069DeviceDetails getPersistedDeviceDetails(String deviceId,
636       DeviceInform deviceNotification) throws DeviceOperationException {
637     TR069DeviceDetails deviceDetails = null;
638     try {
639       deviceDetails = deviceOperationInterface.getDeviceDetails(deviceId);
640     } catch (DeviceOperationException doe) {
641       if (ErrorCode.DEVICE_NOT_EXISTS.equals(doe.getErrorCode())) {
642         logger.info(
643             "Creating the device record in TR069_DEVICE_ table, as the device is authenticated successfully.");
644         createDevice(deviceNotification.getDeviceDetails());
645         deviceDetails = deviceOperationInterface.getDeviceDetails(deviceId);
646       }
647     }
648
649     return deviceDetails;
650   }
651
652   /**
653    * @param deviceDetails
654    * @param notificationType
655    * @throws TR069EventProcessingException
656    */
657   private void sendAbortedOperationResultForPendingRPCRequests(DeviceDetails deviceDetails,
658       TR069InformType notificationType) throws TR069EventProcessingException {
659     String deviceId = deviceDetails.getDeviceId();
660     String notificationName = notificationType.name();
661     logger.debug(
662         "Device Inform event received is {}, hence aborting all the pending operations if any exists",
663         notificationName);
664
665     List<DeviceRPCRequest> deviceRPCRequestList =
666         deviceRPCRequestRepositoryHelper.findAllDeviceRPCRequests(deviceId);
667
668     for (DeviceRPCRequest pendingDeviceRPCRequest : deviceRPCRequestList) {
669       DeviceRPCResponse deviceOpResult =
670           tr069RequestProcessEngineUtility.buildAbortedOperationresult(deviceDetails,
671               pendingDeviceRPCRequest, AcsFaultCode.FAULT_CODE_8002);
672       String operationName = null;
673       if (pendingDeviceRPCRequest.getOpDetails().getOpCode() instanceof CustomOperationCode) {
674         CustomOperationCode operationCode =
675             (CustomOperationCode) pendingDeviceRPCRequest.getOpDetails().getOpCode();
676         operationName = operationCode.name();
677       } else {
678         TR069OperationCode operationCode =
679             (TR069OperationCode) pendingDeviceRPCRequest.getOpDetails().getOpCode();
680         operationName = operationCode.name();
681       }
682       Long operationId = pendingDeviceRPCRequest.getOperationId();
683       logger.debug("Aborting the NBI Operation request with operation Id : {} for operation: {}",
684           operationId, operationName);
685       tr069EventNotificationService.sendOperationResultToNBI(deviceOpResult);
686       // Marking the NBI Request as processed
687       deviceRPCRequestRepositoryHelper.markDeviceRPCRequestAsProcessed(deviceId, operationId);
688       stopDeviceRPCRequestTimer(deviceId, operationId);
689     }
690   }
691
692   /**
693    * @param tr069RequestProcessorData
694    * @param deviceNotification
695    */
696   private void updateDeviceDetailsFromInform(TR069RequestProcessorData tr069RequestProcessorData,
697       DeviceInform deviceNotification) {
698     Boolean isDeviceDataChanged =
699         isDeviceUpdateExists(tr069RequestProcessorData, deviceNotification);
700     if (isDeviceDataChanged.booleanValue()) {
701       updateDeviceDetails(tr069RequestProcessorData, deviceNotification);
702       try {
703         logger.info(
704             "The device data like connection request URL/ Device SW/HW version has changed, hence updating the device details");
705         deviceOperationInterface
706             .updateDeviceDetails(tr069RequestProcessorData.getTr069DeviceDetails());
707       } catch (DeviceOperationException e) {
708         logger.error("Updating the device details with the notification details failed, Reason: {}",
709             e.getMessage());
710         logger.error(e.getMessage());
711       }
712     } else {
713       TR069DeviceDetails tr069DeviceDetails =
714           (TR069DeviceDetails) tr069RequestProcessorData.getTr069DeviceDetails();
715       TR069DeviceEntity tr069DeviceEntity =
716           deviceRepository.findByDeviceId(tr069DeviceDetails.getDeviceId());
717       logger.info("Setting connection status as true for device: {}",
718           tr069DeviceDetails.getDeviceId());
719       tr069DeviceEntity.setConnStatus(true);
720       tr069DeviceEntity.setLastUpdatedTime(new Date());
721       tr069DeviceEntity.setErrorMsg(null);
722       deviceRepository.save(tr069DeviceEntity);
723     }
724   }
725
726   /**
727    * @param deviceId
728    * @param operationId
729    * @return
730    */
731   private DeviceRPCRequest getNextRPCRequest(String deviceId, Long operationId) {
732     DeviceRPCRequest deviceRPCRequest = null;
733     try {
734       List<TR069DeviceRPCRequestEntity> entityList =
735           deviceRPCRequestRepositoryHelper.findByDeviceIdAndOperationId(deviceId, operationId);
736       Integer operationCode = entityList.get(0).getOpCode();
737       if (CustomOperationCode.getByOperationCode(operationCode) == null) {
738         logger.info("Marking the Device RPC request with operation id - {} as processed.",
739             operationId);
740         deviceRPCRequestRepositoryHelper.markDeviceRPCRequestAsProcessed(deviceId, operationId);
741       }
742       logger.debug(PENDING_RPC_CHECK);
743       deviceRPCRequest = deviceRPCRequestRepositoryHelper.findOldestDeviceRPCRequest(deviceId);
744     } catch (TR069EventProcessingException e) {
745       logger.error("An unknown exception occurred while fetching the NBI request, Reason: {}",
746           e.getMessage());
747     }
748
749     return deviceRPCRequest;
750   }
751
752   /**
753    * Creates the device in the DM module if factory imported already
754    * 
755    * @param deviceDetails
756    * @throws DeviceOperationException
757    */
758   private void createDevice(DeviceDetails deviceDetails) throws DeviceOperationException {
759     deviceOperationInterface.updateDeviceDetails(deviceDetails);
760   }
761 }