Initial source code
[oam/tr069-adapter.git] / acs / requestprocessor / src / main / java / org / commscope / tr069adapter / acs / requestprocessor / impl / TR069RequestProcessEngine.java
diff --git a/acs/requestprocessor/src/main/java/org/commscope/tr069adapter/acs/requestprocessor/impl/TR069RequestProcessEngine.java b/acs/requestprocessor/src/main/java/org/commscope/tr069adapter/acs/requestprocessor/impl/TR069RequestProcessEngine.java
new file mode 100644 (file)
index 0000000..8ad7f2b
--- /dev/null
@@ -0,0 +1,702 @@
+/*\r
+ * ============LICENSE_START========================================================================\r
+ * ONAP : tr-069-adapter\r
+ * =================================================================================================\r
+ * Copyright (C) 2020 CommScope Inc Intellectual Property.\r
+ * =================================================================================================\r
+ * This tr-069-adapter software file is distributed by CommScope Inc under the Apache License,\r
+ * Version 2.0 (the "License"); you may not use this file except in compliance with the License. You\r
+ * may obtain a copy of the License at\r
+ *\r
+ * http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,\r
+ * either express or implied. See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ * ===============LICENSE_END=======================================================================\r
+ */\r
+\r
+package org.commscope.tr069adapter.acs.requestprocessor.impl;\r
+\r
+import static org.commscope.tr069adapter.acs.common.utils.AcsConstants.SESSION_ID;\r
+\r
+import java.util.List;\r
+\r
+import org.commscope.tr069adapter.acs.common.DeviceDetails;\r
+import org.commscope.tr069adapter.acs.common.DeviceInform;\r
+import org.commscope.tr069adapter.acs.common.DeviceRPCRequest;\r
+import org.commscope.tr069adapter.acs.common.DeviceRPCResponse;\r
+import org.commscope.tr069adapter.acs.common.OperationCode;\r
+import org.commscope.tr069adapter.acs.common.OperationDetails;\r
+import org.commscope.tr069adapter.acs.common.dto.CustomOperationCode;\r
+import org.commscope.tr069adapter.acs.common.dto.DeviceOperationRequestDetails;\r
+import org.commscope.tr069adapter.acs.common.dto.TR069DeviceDetails;\r
+import org.commscope.tr069adapter.acs.common.dto.TR069InformType;\r
+import org.commscope.tr069adapter.acs.common.dto.TR069OperationCode;\r
+import org.commscope.tr069adapter.acs.common.dto.TR069OperationDetails;\r
+import org.commscope.tr069adapter.acs.common.exception.DeviceOperationException;\r
+import org.commscope.tr069adapter.acs.common.exception.SessionConcurrentAccessException;\r
+import org.commscope.tr069adapter.acs.common.exception.SessionManagerException;\r
+import org.commscope.tr069adapter.acs.common.exception.TR069EventProcessingException;\r
+import org.commscope.tr069adapter.acs.common.faults.AcsFaultCode;\r
+import org.commscope.tr069adapter.acs.common.inform.BootInform;\r
+import org.commscope.tr069adapter.acs.common.inform.BootstrapInform;\r
+import org.commscope.tr069adapter.acs.common.response.DeviceInformResponse;\r
+import org.commscope.tr069adapter.acs.common.utils.ErrorCode;\r
+import org.commscope.tr069adapter.acs.requestprocessor.DeviceOperationInterface;\r
+import org.commscope.tr069adapter.acs.requestprocessor.dao.DeviceRPCRequestRepositoryHelper;\r
+import org.commscope.tr069adapter.acs.requestprocessor.dto.CustomOperationData;\r
+import org.commscope.tr069adapter.acs.requestprocessor.dto.SessionDTO;\r
+import org.commscope.tr069adapter.acs.requestprocessor.dto.SessionState;\r
+import org.commscope.tr069adapter.acs.requestprocessor.dto.TR069RequestProcessorData;\r
+import org.commscope.tr069adapter.acs.requestprocessor.entity.TR069DeviceRPCRequestEntity;\r
+import org.commscope.tr069adapter.acs.requestprocessor.helper.TR069RequestProcessEngineHelper;\r
+import org.commscope.tr069adapter.acs.requestprocessor.util.TR069RequestProcessorUtility;\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+import org.springframework.beans.factory.annotation.Autowired;\r
+import org.springframework.stereotype.Component;\r
+\r
+@Component\r
+public class TR069RequestProcessEngine extends TR069RequestProcessEngineHelper {\r
+\r
+  private static final String PENDING_RPC_CHECK =\r
+      "Checking if any pending Device RPC requests exists for the device";\r
+\r
+  private static final Logger logger = LoggerFactory.getLogger(TR069RequestProcessEngine.class);\r
+\r
+  @Autowired\r
+  TR069EventNotificationService tr069EventNotificationService;\r
+\r
+  @Autowired\r
+  DeviceOperationInterface deviceOperationInterface;\r
+\r
+  @Autowired\r
+  protected DeviceRPCRequestRepositoryHelper deviceRPCRequestRepositoryHelper;\r
+\r
+  /**\r
+   * @param deviceRPCRequest\r
+   * @throws TR069EventProcessingException\r
+   * @throws SessionConcurrentAccessException\r
+   */\r
+  public void processDeviceRPCRequest(DeviceRPCRequest deviceRPCRequest)\r
+      throws TR069EventProcessingException, SessionConcurrentAccessException {\r
+\r
+    DeviceRPCResponse deviceRPCResponse = null;\r
+    String deviceId = null;\r
+\r
+    try {\r
+      if (deviceRPCRequest == null) {\r
+        TR069EventProcessingException ex =\r
+            new TR069EventProcessingException(ErrorCode.INVALID_NBI_REQUEST);\r
+        logger.error(ex.getMessage());\r
+        throw ex;\r
+      }\r
+\r
+      Long operationId = deviceRPCRequest.getOperationId();\r
+      logger.info("A Mapper request is received with operationID: {}", operationId);\r
+      TR069DeviceDetails tr069DeviceDetails = null;\r
+      deviceId = deviceRPCRequest.getDeviceDetails().getDeviceId();\r
+      try {\r
+        tr069DeviceDetails = deviceOperationInterface.getDeviceDetails(deviceId);\r
+      } catch (DeviceOperationException deo) {\r
+        logger.error(deo.getMessage());\r
+        deviceRPCResponse = tr069RequestProcessEngineUtility.buildAbortedOperationresult(\r
+            tr069DeviceDetails, deviceRPCRequest, AcsFaultCode.FAULT_CODE_8000);\r
+        return;\r
+      }\r
+\r
+      try {\r
+        SessionDTO sessionDTO = acquireSessionLockWithRetryOnFailure(deviceId, operationId);\r
+\r
+        logger.debug("Persisting the Device RPC request, with operation ID: {}", operationId);\r
+        List<TR069DeviceRPCRequestEntity> tr069DeviceRPCRequestEntities =\r
+            TR069RequestProcessorUtility.convertToEntity(deviceRPCRequest);\r
+        deviceRPCRequestRepositoryHelper.saveAll(tr069DeviceRPCRequestEntities);\r
+        logger.info("Successfully persisted the Device RPC request");\r
+\r
+        if (sessionDTO != null) {\r
+          if (SessionState.TERMINATED.equals(sessionDTO.getSessionState())) {\r
+            logger.debug("No active session exists, hence requesting for Connection request");\r
+            requestForConnectionRequestInform(tr069DeviceDetails);\r
+\r
+            // Start Request Timer\r
+            startDeviceRPCRequestTimer(deviceId, deviceRPCRequest.getOperationId(),\r
+                deviceRPCRequest.getOptions().getExecutionTimeout());\r
+          } else {\r
+            logger.debug(\r
+                "Session is in processing state, Will be notified to session manager to pick the request on session availability");\r
+          }\r
+        } else {\r
+          logger.warn(\r
+              "The device is not activated yet, hence the NBI Operation result cannot be processed!");\r
+          deviceRPCResponse = tr069RequestProcessEngineUtility.buildAbortedOperationresult(\r
+              tr069DeviceDetails, deviceRPCRequest, AcsFaultCode.FAULT_CODE_8001);\r
+        }\r
+      } catch (SessionConcurrentAccessException scae) {\r
+        throw scae;\r
+      } catch (Exception e) {\r
+        logger.error("An unknown exception occurred while processing the NBI request, Reason: {}",\r
+            e.getMessage());\r
+        deviceRPCResponse = tr069RequestProcessEngineUtility.buildAbortedOperationresult(\r
+            tr069DeviceDetails, deviceRPCRequest, AcsFaultCode.FAULT_CODE_8004);\r
+      }\r
+    } finally {\r
+      if (deviceRPCResponse != null) {\r
+        logger.debug("Sending failed operation result for this NBI request");\r
+        tr069EventNotificationService.sendOperationResultToNBI(deviceRPCResponse);\r
+        // Marking the NBI Request as processed\r
+        deviceRPCRequestRepositoryHelper.markDeviceRPCRequestAsProcessed(deviceId,\r
+            deviceRPCRequest.getOperationId());\r
+      }\r
+    }\r
+  }\r
+\r
+  /**\r
+   * Common Step 1. Since there can exist only one Inform from any device, which will be the\r
+   * initiator of the session, following steps to be followed a. Stop the session timer for this\r
+   * device if any running. b. Get the lock by reading the session record for this device from DB\r
+   * and load the session object in Thread Local Cache c. Create a new session id from\r
+   * SessionManager, and update the session object with new Session id and change the state to\r
+   * PROCESSING. 2. Read the Device record from the DB, and load the device DTO also in the\r
+   * ThreadLocalCache\r
+   * \r
+   * Common Notification Specific Step 1. Take the connection request URL from the\r
+   * deviceNotification object 2. Update the Device DTO object with connection request URL,\r
+   * swVersion, hwVersion if there is a difference. Update the last updated time if updated 3. Send\r
+   * this Inform to the NBI, by calling the ProcessDeviceInform method on TR069NBIService module. 4.\r
+   * Create the Inform response Object and update the sessionID used for this transaction in the\r
+   * response which will be used in the cookie of the HTTP response 5. Post the message into\r
+   * Response Queue 6. Change the session state to LOCKED 7. Save the Session and device records\r
+   * with the values in ThreadLocalCache 8. Start the session timer and default request timer (As\r
+   * configured for all the requests from device) 8. All the above steps to be executed in a single\r
+   * transaction.\r
+   * \r
+   * @param deviceNotification\r
+   * @return\r
+   * @throws SessionConcurrentAccessException\r
+   */\r
+  public DeviceInformResponse processDeviceInform(DeviceInform deviceNotification)\r
+      throws SessionConcurrentAccessException {\r
+\r
+    String deviceId = deviceNotification.getDeviceDetails().getDeviceId();\r
+    TR069InformType notificationType = (TR069InformType) deviceNotification.getInformType();\r
+    logger.info("Processing the Device Inform Event: '{}'", notificationType.getNotificationCode());\r
+    String newSessionId = null;\r
+    SessionDTO session = null;\r
+    TR069DeviceDetails deviceDetails = null;\r
+    DeviceInformResponse informResponse = null;\r
+\r
+    try {\r
+\r
+      SessionDTO sessionDTO = getSession(deviceId);\r
+      if (sessionDTO != null && !SessionState.TERMINATED.equals(sessionDTO.getSessionState())) {\r
+        String sessionId = sessionDTO.getSessionId();\r
+        logger.debug(\r
+            "The session with session id {} is not terminated, hence stopping the associated timer",\r
+            sessionId);\r
+        stopSessionTimer(sessionDTO.getSessionId());\r
+      }\r
+\r
+      // To stop the request timer if any running for this device, and send failed operation\r
+      // result for any such pending cases. Requests pending in DB should not be cleared\r
+\r
+      /*\r
+       * Read any pending records in TR069_NBI_REQUEST table for this device. Send abort operation\r
+       * result for all the pending requests Delete the records from the TR069_NBI_Request table for\r
+       * this device.\r
+       */\r
+      if (deviceNotification instanceof BootstrapInform\r
+          || deviceNotification instanceof BootInform) {\r
+        sendAbortedOperationResultForPendingRPCRequests(deviceNotification.getDeviceDetails(),\r
+            notificationType);\r
+      }\r
+\r
+      session = acquireSessionLock(deviceId, notificationType, true);\r
+      deviceDetails = getPersistedDeviceDetails(deviceId, deviceNotification);\r
+\r
+      newSessionId = session.getSessionId();\r
+      logger.debug("The session id generated to process the device notification request is: {} ",\r
+          newSessionId);\r
+\r
+      initThreadLocalCache(deviceDetails, session);\r
+      TR069RequestProcessorData tr069RequestProcessorData = getTR069RequestProcessorData();\r
+      updateSessionOnDeviceNotification(tr069RequestProcessorData, newSessionId);\r
+\r
+      logger.debug("Sending the Device Inform Event to the Mapper");\r
+      tr069EventNotificationService.sendDeviceInformToNBI(deviceNotification);\r
+\r
+      updateDeviceDetailsFromInform(tr069RequestProcessorData, deviceNotification);\r
+\r
+      logger.debug("Updating the session for the device with newly generated session id");\r
+      changeSessionState(tr069RequestProcessorData, SessionState.LOCKED);\r
+      updateSession(session);\r
+\r
+      // Start session timer - Get a default timeout for session.\r
+      startSessionTimer(session.getSessionId());\r
+\r
+      informResponse =\r
+          new DeviceInformResponse(newSessionId, deviceNotification.getDeviceDetails());\r
+    } catch (\r
+\r
+    DeviceOperationException doe) {\r
+      logger.error(doe.getMessage());\r
+    } catch (SessionConcurrentAccessException scae) {\r
+      throw scae;\r
+    } catch (Exception e) {\r
+      throw new SessionConcurrentAccessException(ErrorCode.UNKNOWN_ERROR, e.getMessage());\r
+    }\r
+    return informResponse;\r
+  }\r
+\r
+  /**\r
+   * \r
+   * 1. Stop the request timer for this device using the session ID received in the cookie of the\r
+   * request 2. Read the session record from the session table into ThreadLocalCache 3. Move the\r
+   * session state to PROCESSING and update the OPERATION_ID as null. 4a. Session Manager to check\r
+   * if any pending request being notified for the device using in memory cache - Not planned for\r
+   * this Drop 4. As an interim solution for drop1, Check the TR069_NBI_REQUEST table if any request\r
+   * is pending.\r
+   * \r
+   * if (anyPendingRequestExists) { 1. Pick the request with the least created_time 2. Create the\r
+   * response object and update the sessionID used for this transaction in the response which will\r
+   * be used in the cookie of the HTTP response 3. Post the response object into Response Queue 4.\r
+   * Change the session state to LOCKED and update the OPERATION_ID in the session table with the\r
+   * NBI requests OPERATION_ID. 5. Save the Session record with the values in ThreadLocalCache 6.\r
+   * Start the session timer and request timer (available in the request) } else { 1. Move the\r
+   * session state to PROCESSING and update the OPERATION_ID as null. 2. Save the Session record\r
+   * with the values in ThreadLocalCache. 3. Start the session timer. }\r
+   * \r
+   * 6. All the above steps to be executed in a single transaction\r
+   * \r
+   * @param deviceRPCResponse\r
+   * @return\r
+   * @throws SessionConcurrentAccessException\r
+   */\r
+  public DeviceRPCRequest processDeviceRPCResponse(DeviceRPCResponse deviceRPCResponse)\r
+      throws SessionConcurrentAccessException {\r
+    TR069DeviceDetails deviceDetails = (TR069DeviceDetails) deviceRPCResponse.getDeviceDetails();\r
+    String deviceId = deviceDetails.getDeviceId();\r
+    logger.info("Processing the operation response from device");\r
+\r
+    SessionDTO session;\r
+    DeviceRPCRequest deviceRPCRequest = null;\r
+    try {\r
+      session = acquireSessionLock(deviceId, null, false);\r
+      String newSessionId = session.getSessionId();\r
+      logger.debug("The session id used to process the device RPC response is: {}", newSessionId);\r
+\r
+      initThreadLocalCache(deviceDetails, session);\r
+      TR069RequestProcessorData tr069RequestProcessorData = getTR069RequestProcessorData();\r
+      updateSessionOnDeviceNotification(tr069RequestProcessorData, newSessionId);\r
+\r
+      Long operationId = deviceRPCResponse.getOperationId();\r
+\r
+      deviceRPCRequest = getNextRPCRequest(deviceId, operationId);\r
+\r
+      if (deviceRPCRequest != null) {\r
+        OperationDetails opDetails = deviceRPCRequest.getOpDetails();\r
+        if (opDetails != null) {\r
+          OperationCode opCode = opDetails.getOpCode();\r
+          if (opCode instanceof CustomOperationCode) {\r
+            CustomOperationCode customOperationCode = (CustomOperationCode) opCode;\r
+            String customOperationCodeName = customOperationCode.name();\r
+            logger.info(\r
+                "The Device RPC request is of custom type, the custom operation to be performed is: {}",\r
+                customOperationCodeName);\r
+            String jndiName = customOperationCode.getJndiName();\r
+            CustomOperationData customOperationData =\r
+                new CustomOperationData(deviceDetails, deviceRPCResponse, deviceRPCRequest);\r
+            customOperationData = executeCustomOperation(jndiName, customOperationData);\r
+\r
+            DeviceRPCRequest operationRequest = customOperationData.getDeviceRPCRequest();\r
+            deviceRPCResponse = customOperationData.getDeviceRPCResponse();\r
+            if (operationRequest != null) {\r
+              operationRequest.addContextParam(SESSION_ID, newSessionId);\r
+              updateSessionCurOpId(tr069RequestProcessorData, deviceRPCRequest.getOperationId());\r
+              changeSessionState(tr069RequestProcessorData, SessionState.LOCKED);\r
+              updateSession(session);\r
+              if (deviceRPCResponse != null && operationRequest.getOperationId() != null\r
+                  && !operationRequest.getOperationId()\r
+                      .equals(deviceRPCResponse.getOperationId())) {\r
+                logger.debug(\r
+                    "Sending the Device RPC response for a configure Multiple object prior operation");\r
+                // Sending the operation response to NBI\r
+                tr069EventNotificationService.sendOperationResultToNBI(deviceRPCResponse);\r
+              }\r
+              return operationRequest;\r
+            } else {\r
+              logger.debug(PENDING_RPC_CHECK);\r
+              deviceRPCRequest =\r
+                  deviceRPCRequestRepositoryHelper.findOldestDeviceRPCRequest(deviceId);\r
+            }\r
+          }\r
+        }\r
+      }\r
+\r
+      if (deviceRPCRequest != null) {\r
+        logger.info("A pending Device RPC request exists for the device with operation ID: {}",\r
+            deviceRPCRequest.getOperationId());\r
+        updateSessionCurOpId(tr069RequestProcessorData, deviceRPCRequest.getOperationId());\r
+        changeSessionState(tr069RequestProcessorData, SessionState.LOCKED);\r
+\r
+        deviceRPCRequest.addContextParam(SESSION_ID, newSessionId);\r
+\r
+      } else {\r
+        logger.info(\r
+            "No pending Device RPC request exists for the device, hence empty response will be sent to the device");\r
+        logger.debug("Updating the session to terminated state");\r
+        // To stop the session timer if any running for this device.\r
+        stopSessionTimer(newSessionId);\r
+        changeSessionState(tr069RequestProcessorData, SessionState.TERMINATED);\r
+      }\r
+\r
+      // To stop the request timer if any running for this operation.\r
+      stopDeviceRPCRequestTimer(deviceId, deviceRPCResponse.getOperationId());\r
+\r
+      // Sending the operation response to NBI\r
+      logger.debug("Sending the Device RPC Response to the Mapper");\r
+      tr069EventNotificationService.sendOperationResultToNBI(deviceRPCResponse);\r
+\r
+      updateSession(session);\r
+    } catch (DeviceOperationException doe) {\r
+      logger.error(doe.getMessage());\r
+    } catch (SessionConcurrentAccessException scae) {\r
+      throw scae;\r
+    }\r
+\r
+    return deviceRPCRequest;\r
+  }\r
+\r
+  /**\r
+   * \r
+   * 1. Stop the request timer for this device using the session ID received in the cookie of the\r
+   * request 2. Read the session record from the session table into ThreadLocalCache 3. Move the\r
+   * session state to PROCESSING and update the OPERATION_ID as null. 4a. Session Manager to check\r
+   * if any pending request being notified for the device using in memory cache - Not planned for\r
+   * this Drop 4. As an interim solution for drop1, Check the TR069_NBI_REQUEST table if any request\r
+   * is pending.\r
+   * \r
+   * if (anyPendingRequestExists) { 1. Pick the request with the least created_time 2. Create the\r
+   * response object and update the sessionID used for this transaction in the response which will\r
+   * be used in the cookie of the HTTP response 3. Post the response object into Response Queue 4.\r
+   * Change the session state to LOCKED and update the OPERATION_ID in the session table with the\r
+   * NBI requests OPERATION_ID. 5. Save the Session record with the values in ThreadLocalCache 6.\r
+   * Start the session timer and request timer (available in the request) } else { 1. Move the\r
+   * session state to PROCESSING and update the OPERATION_ID as null. 2. Save the Session record\r
+   * with the values in ThreadLocalCache. 3. Start the session timer. }\r
+   * \r
+   * 6. All the above steps to be executed in a single transaction\r
+   * \r
+   * @param deviceDetails\r
+   * @return\r
+   * @throws SessionConcurrentAccessException\r
+   */\r
+  public DeviceRPCRequest processEmptyDeviceRequest(TR069DeviceDetails deviceDetails)\r
+      throws SessionConcurrentAccessException {\r
+    String deviceId = deviceDetails.getDeviceId();\r
+    logger.info("Processing the empty request from device");\r
+\r
+    SessionDTO session;\r
+    DeviceRPCRequest nbiDeviceOperationRequest = null;\r
+    try {\r
+      session = acquireSessionLock(deviceId, null, false);\r
+      String newSessionId = session.getSessionId();\r
+      logger.debug("The session id used to process the empty device request is: {}", newSessionId);\r
+\r
+      initThreadLocalCache(deviceDetails, session);\r
+      TR069RequestProcessorData tr069RequestProcessorData = getTR069RequestProcessorData();\r
+      updateSessionOnDeviceNotification(tr069RequestProcessorData, newSessionId);\r
+\r
+      logger.debug(PENDING_RPC_CHECK);\r
+      nbiDeviceOperationRequest =\r
+          deviceRPCRequestRepositoryHelper.findOldestDeviceRPCRequest(deviceId);\r
+      if (nbiDeviceOperationRequest != null) {\r
+        Long operationId = nbiDeviceOperationRequest.getOperationId();\r
+        OperationDetails opDetails = nbiDeviceOperationRequest.getOpDetails();\r
+        if (opDetails != null) {\r
+          OperationCode opCode = opDetails.getOpCode();\r
+          if (opCode instanceof CustomOperationCode) {\r
+            CustomOperationCode customOperationCode = (CustomOperationCode) opCode;\r
+            String customOperationCodeName = customOperationCode.name();\r
+            logger.info(\r
+                "The Device RPC operation request is of custom type, the custom operation to be performed is: {}",\r
+                customOperationCodeName);\r
+            String jndiName = customOperationCode.getJndiName();\r
+            CustomOperationData customOperationData =\r
+                new CustomOperationData(deviceDetails, null, nbiDeviceOperationRequest);\r
+            customOperationData = executeCustomOperation(jndiName, customOperationData);\r
+\r
+            DeviceRPCRequest operationRequest = customOperationData.getDeviceRPCRequest();\r
+            if (operationRequest != null) {\r
+              operationRequest.addContextParam(SESSION_ID, newSessionId);\r
+              updateSessionCurOpId(tr069RequestProcessorData, operationId);\r
+              changeSessionState(tr069RequestProcessorData, SessionState.LOCKED);\r
+              updateSession(session);\r
+              return operationRequest;\r
+            } else {\r
+              logger.debug(PENDING_RPC_CHECK);\r
+              nbiDeviceOperationRequest =\r
+                  deviceRPCRequestRepositoryHelper.findOldestDeviceRPCRequest(deviceId);\r
+            }\r
+          }\r
+        }\r
+      }\r
+\r
+      if (nbiDeviceOperationRequest != null) {\r
+        Long operationId = nbiDeviceOperationRequest.getOperationId();\r
+        logger.info("A pending Device RPC request exists for the device with operation ID: {}",\r
+            operationId);\r
+        updateSessionCurOpId(tr069RequestProcessorData, operationId);\r
+        changeSessionState(tr069RequestProcessorData, SessionState.LOCKED);\r
+\r
+        nbiDeviceOperationRequest.addContextParam(SESSION_ID, newSessionId);\r
+\r
+      } else {\r
+        logger.info(\r
+            "No pending Device RPC request exists for the device, hence empty device response will be sent to the device");\r
+        logger.debug("Updating the session to terminated state");\r
+        // To stop the session timer if any running for this device.\r
+        stopSessionTimer(newSessionId);\r
+        changeSessionState(tr069RequestProcessorData, SessionState.TERMINATED);\r
+      }\r
+\r
+      updateSession(session);\r
+    } catch (DeviceOperationException doe) {\r
+      logger.error(doe.getMessage());\r
+    } catch (SessionConcurrentAccessException scae) {\r
+      throw scae;\r
+    }\r
+\r
+    return nbiDeviceOperationRequest;\r
+  }\r
+\r
+  /**\r
+   * @param sessionId\r
+   * @return\r
+   * @throws SessionManagerException\r
+   */\r
+  public DeviceOperationRequestDetails getOpRequestDetailsBySessionId(String sessionId)\r
+      throws SessionManagerException {\r
+    DeviceOperationRequestDetails deviceOperationRequestDetails =\r
+        new DeviceOperationRequestDetails();\r
+\r
+    logger.debug("Fetching Operation request details for session: {}", sessionId);\r
+    SessionDTO session = getSessionBySessionId(sessionId);\r
+    String deviceId = session.getDeviceId();\r
+    TR069DeviceDetails deviceDetails = null;\r
+    try {\r
+      deviceDetails = deviceOperationInterface.getDeviceDetails(deviceId);\r
+      if (session.getCurrentOperationId() == null) {\r
+        logger.debug("There exists no pending operation request for the session: {}", sessionId);\r
+      } else {\r
+        logger.debug("There exists pending operation request for the session: {}", sessionId);\r
+        List<TR069DeviceRPCRequestEntity> tr069DeviceRPCRequestEntityList =\r
+            deviceRPCRequestRepositoryHelper.findByDeviceIdAndOperationId(deviceId,\r
+                session.getCurrentOperationId());\r
+        if (tr069DeviceRPCRequestEntityList == null) {\r
+          SessionManagerException ex =\r
+              new SessionManagerException(ErrorCode.SESSION_EXPIRED, sessionId);\r
+          logger.error(ex.getMessage());\r
+          throw ex;\r
+        }\r
+        DeviceRPCRequest deviceRPCRequest =\r
+            TR069RequestProcessorUtility.convertToDTO(tr069DeviceRPCRequestEntityList);\r
+        OperationCode opCode = deviceRPCRequest.getOpDetails().getOpCode();\r
+\r
+        String operationName = null;\r
+\r
+        if (opCode instanceof TR069OperationCode) {\r
+          operationName = ((TR069OperationCode) opCode).name();\r
+        } else {\r
+          operationName = ((CustomOperationCode) opCode).name();\r
+          TR069OperationDetails tr069OperationDetails =\r
+              (TR069OperationDetails) deviceRPCRequest.getOpDetails();\r
+          opCode = getCustomOperationCode(tr069OperationDetails);\r
+        }\r
+        logger.info("The pending operation request for the session is of operation {}",\r
+            operationName);\r
+        deviceOperationRequestDetails.setOpCode(opCode);\r
+        deviceOperationRequestDetails.setOperationId(deviceRPCRequest.getOperationId());\r
+      }\r
+      deviceOperationRequestDetails.setDeviceDetails(deviceDetails);\r
+    } catch (DeviceOperationException e) {\r
+      SessionManagerException ex =\r
+          new SessionManagerException(ErrorCode.DEVICE_NOT_EXISTS, deviceId);\r
+      logger.error(ex.getMessage());\r
+      throw ex;\r
+    } catch (Exception e) {\r
+      logger.error(e.getMessage());\r
+      SessionManagerException ex =\r
+          new SessionManagerException(ErrorCode.SESSION_EXPIRED, sessionId);\r
+      logger.error(ex.getMessage());\r
+      throw ex;\r
+    }\r
+    return deviceOperationRequestDetails;\r
+  }\r
+\r
+  /**\r
+   * @param deviceId\r
+   * @return\r
+   * @throws DeviceOperationException\r
+   */\r
+  public TR069DeviceDetails getDeviceDetails(String deviceId) throws DeviceOperationException {\r
+    return deviceOperationInterface.getDeviceDetails(deviceId);\r
+  }\r
+\r
+  /**\r
+   * @param deviceId\r
+   * @param operationId\r
+   * @return\r
+   * @throws DeviceOperationException\r
+   * @throws SessionConcurrentAccessException\r
+   * @throws InterruptedException\r
+   */\r
+  private SessionDTO acquireSessionLockWithRetryOnFailure(String deviceId, Long operationId)\r
+      throws DeviceOperationException, SessionConcurrentAccessException, InterruptedException {\r
+    int sessionLockAcquireRetryCount = 0;\r
+    SessionDTO sessionDTO = null;\r
+    do {\r
+      try {\r
+        sessionDTO = acquireSessionLock(deviceId, null, false);\r
+        logger.info(\r
+            "Successfully acquired the session lock for processing NBI request with operation ID: {}",\r
+            operationId);\r
+        break;\r
+      } catch (SessionConcurrentAccessException ex) {\r
+        sessionLockAcquireRetryCount++;\r
+        if (sessionLockAcquireRetryCount == 3) {\r
+          logger.error("Failed acquiring the lock after retry, rolling back the transaction");\r
+          throw ex;\r
+        }\r
+        logger.warn(\r
+            "Session lock acquiring failed with SessionConcurrentAccessException, hence retrying");\r
+        Thread.sleep(1000L);\r
+      }\r
+    } while (sessionLockAcquireRetryCount < 3);\r
+\r
+    return sessionDTO;\r
+  }\r
+\r
+  /**\r
+   * @param deviceId\r
+   * @param deviceNotification\r
+   * @return\r
+   * @throws DeviceOperationException\r
+   */\r
+  private TR069DeviceDetails getPersistedDeviceDetails(String deviceId,\r
+      DeviceInform deviceNotification) throws DeviceOperationException {\r
+    TR069DeviceDetails deviceDetails = null;\r
+    try {\r
+      deviceDetails = deviceOperationInterface.getDeviceDetails(deviceId);\r
+    } catch (DeviceOperationException doe) {\r
+      if (ErrorCode.DEVICE_NOT_EXISTS.equals(doe.getErrorCode())) {\r
+        logger.info(\r
+            "Creating the device record in TR069_DEVICE_ table, as the device is authenticated successfully.");\r
+        createDevice(deviceNotification.getDeviceDetails());\r
+        deviceDetails = deviceOperationInterface.getDeviceDetails(deviceId);\r
+      }\r
+    }\r
+\r
+    return deviceDetails;\r
+  }\r
+\r
+  /**\r
+   * @param deviceDetails\r
+   * @param notificationType\r
+   * @throws TR069EventProcessingException\r
+   */\r
+  private void sendAbortedOperationResultForPendingRPCRequests(DeviceDetails deviceDetails,\r
+      TR069InformType notificationType) throws TR069EventProcessingException {\r
+    String deviceId = deviceDetails.getDeviceId();\r
+    String notificationName = notificationType.name();\r
+    logger.debug(\r
+        "Device Inform event received is {}, hence aborting all the pending operations if any exists",\r
+        notificationName);\r
+\r
+    List<DeviceRPCRequest> deviceRPCRequestList =\r
+        deviceRPCRequestRepositoryHelper.findAllDeviceRPCRequests(deviceId);\r
+\r
+    for (DeviceRPCRequest pendingDeviceRPCRequest : deviceRPCRequestList) {\r
+      DeviceRPCResponse deviceOpResult =\r
+          tr069RequestProcessEngineUtility.buildAbortedOperationresult(deviceDetails,\r
+              pendingDeviceRPCRequest, AcsFaultCode.FAULT_CODE_8002);\r
+      String operationName = null;\r
+      if (pendingDeviceRPCRequest.getOpDetails().getOpCode() instanceof CustomOperationCode) {\r
+        CustomOperationCode operationCode =\r
+            (CustomOperationCode) pendingDeviceRPCRequest.getOpDetails().getOpCode();\r
+        operationName = operationCode.name();\r
+      } else {\r
+        TR069OperationCode operationCode =\r
+            (TR069OperationCode) pendingDeviceRPCRequest.getOpDetails().getOpCode();\r
+        operationName = operationCode.name();\r
+      }\r
+      Long operationId = pendingDeviceRPCRequest.getOperationId();\r
+      logger.debug("Aborting the NBI Operation request with operation Id : {} for operation: {}",\r
+          operationId, operationName);\r
+      tr069EventNotificationService.sendOperationResultToNBI(deviceOpResult);\r
+      // Marking the NBI Request as processed\r
+      deviceRPCRequestRepositoryHelper.markDeviceRPCRequestAsProcessed(deviceId, operationId);\r
+      stopDeviceRPCRequestTimer(deviceId, operationId);\r
+    }\r
+  }\r
+\r
+  /**\r
+   * @param tr069RequestProcessorData\r
+   * @param deviceNotification\r
+   */\r
+  private void updateDeviceDetailsFromInform(TR069RequestProcessorData tr069RequestProcessorData,\r
+      DeviceInform deviceNotification) {\r
+    Boolean isDeviceDataChanged =\r
+        isDeviceUpdateExists(tr069RequestProcessorData, deviceNotification);\r
+    if (isDeviceDataChanged.booleanValue()) {\r
+      updateDeviceDetails(tr069RequestProcessorData, deviceNotification);\r
+      try {\r
+        logger.info(\r
+            "The device data like connection request URL/ Device SW/HW version has changed, hence updating the device details");\r
+        deviceOperationInterface\r
+            .updateDeviceDetails(tr069RequestProcessorData.getTr069DeviceDetails());\r
+      } catch (DeviceOperationException e) {\r
+        logger.error("Updating the device details with the notification details failed, Reason: {}",\r
+            e.getMessage());\r
+        logger.error(e.getMessage());\r
+      }\r
+    }\r
+  }\r
+\r
+  /**\r
+   * @param deviceId\r
+   * @param operationId\r
+   * @return\r
+   */\r
+  private DeviceRPCRequest getNextRPCRequest(String deviceId, Long operationId) {\r
+    DeviceRPCRequest deviceRPCRequest = null;\r
+    try {\r
+      List<TR069DeviceRPCRequestEntity> entityList =\r
+          deviceRPCRequestRepositoryHelper.findByDeviceIdAndOperationId(deviceId, operationId);\r
+      Integer operationCode = entityList.get(0).getOpCode();\r
+      if (CustomOperationCode.getByOperationCode(operationCode) == null) {\r
+        logger.info("Marking the Device RPC request with operation id - {} as processed.",\r
+            operationId);\r
+        deviceRPCRequestRepositoryHelper.markDeviceRPCRequestAsProcessed(deviceId, operationId);\r
+      }\r
+      logger.debug(PENDING_RPC_CHECK);\r
+      deviceRPCRequest = deviceRPCRequestRepositoryHelper.findOldestDeviceRPCRequest(deviceId);\r
+    } catch (TR069EventProcessingException e) {\r
+      logger.error("An unknown exception occurred while fetching the NBI request, Reason: {}",\r
+          e.getMessage());\r
+    }\r
+\r
+    return deviceRPCRequest;\r
+  }\r
+\r
+  /**\r
+   * Creates the device in the DM module if factory imported already\r
+   * \r
+   * @param deviceDetails\r
+   * @throws DeviceOperationException\r
+   */\r
+  private void createDevice(DeviceDetails deviceDetails) throws DeviceOperationException {\r
+    deviceOperationInterface.updateDeviceDetails(deviceDetails);\r
+  }\r
+}\r