-/*\r
- * ============LICENSE_START========================================================================\r
- * ONAP : tr-069-adapter\r
- * =================================================================================================\r
- * Copyright (C) 2020 CommScope Inc Intellectual Property.\r
- * =================================================================================================\r
- * This tr-069-adapter software file is distributed by CommScope Inc under the Apache License,\r
- * Version 2.0 (the "License"); you may not use this file except in compliance with the License. You\r
- * may obtain a copy of the License at\r
- *\r
- * http://www.apache.org/licenses/LICENSE-2.0\r
- *\r
- * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,\r
- * either express or implied. See the License for the specific language governing permissions and\r
- * limitations under the License.\r
- * ===============LICENSE_END=======================================================================\r
- */\r
-\r
-package org.commscope.tr069adapter.acs.requestprocessor.impl;\r
-\r
-import static org.commscope.tr069adapter.acs.common.utils.AcsConstants.SESSION_ID;\r
-\r
-import java.util.List;\r
-\r
-import org.commscope.tr069adapter.acs.common.DeviceDetails;\r
-import org.commscope.tr069adapter.acs.common.DeviceInform;\r
-import org.commscope.tr069adapter.acs.common.DeviceRPCRequest;\r
-import org.commscope.tr069adapter.acs.common.DeviceRPCResponse;\r
-import org.commscope.tr069adapter.acs.common.OperationCode;\r
-import org.commscope.tr069adapter.acs.common.OperationDetails;\r
-import org.commscope.tr069adapter.acs.common.dto.CustomOperationCode;\r
-import org.commscope.tr069adapter.acs.common.dto.DeviceOperationRequestDetails;\r
-import org.commscope.tr069adapter.acs.common.dto.TR069DeviceDetails;\r
-import org.commscope.tr069adapter.acs.common.dto.TR069InformType;\r
-import org.commscope.tr069adapter.acs.common.dto.TR069OperationCode;\r
-import org.commscope.tr069adapter.acs.common.dto.TR069OperationDetails;\r
-import org.commscope.tr069adapter.acs.common.exception.DeviceOperationException;\r
-import org.commscope.tr069adapter.acs.common.exception.SessionConcurrentAccessException;\r
-import org.commscope.tr069adapter.acs.common.exception.SessionManagerException;\r
-import org.commscope.tr069adapter.acs.common.exception.TR069EventProcessingException;\r
-import org.commscope.tr069adapter.acs.common.faults.AcsFaultCode;\r
-import org.commscope.tr069adapter.acs.common.inform.BootInform;\r
-import org.commscope.tr069adapter.acs.common.inform.BootstrapInform;\r
-import org.commscope.tr069adapter.acs.common.response.DeviceInformResponse;\r
-import org.commscope.tr069adapter.acs.common.utils.ErrorCode;\r
-import org.commscope.tr069adapter.acs.requestprocessor.DeviceOperationInterface;\r
-import org.commscope.tr069adapter.acs.requestprocessor.dao.DeviceRPCRequestRepositoryHelper;\r
-import org.commscope.tr069adapter.acs.requestprocessor.dto.CustomOperationData;\r
-import org.commscope.tr069adapter.acs.requestprocessor.dto.SessionDTO;\r
-import org.commscope.tr069adapter.acs.requestprocessor.dto.SessionState;\r
-import org.commscope.tr069adapter.acs.requestprocessor.dto.TR069RequestProcessorData;\r
-import org.commscope.tr069adapter.acs.requestprocessor.entity.TR069DeviceRPCRequestEntity;\r
-import org.commscope.tr069adapter.acs.requestprocessor.helper.TR069RequestProcessEngineHelper;\r
-import org.commscope.tr069adapter.acs.requestprocessor.util.TR069RequestProcessorUtility;\r
-import org.slf4j.Logger;\r
-import org.slf4j.LoggerFactory;\r
-import org.springframework.beans.factory.annotation.Autowired;\r
-import org.springframework.stereotype.Component;\r
-\r
-@Component\r
-public class TR069RequestProcessEngine extends TR069RequestProcessEngineHelper {\r
-\r
- private static final String PENDING_RPC_CHECK =\r
- "Checking if any pending Device RPC requests exists for the device";\r
-\r
- private static final Logger logger = LoggerFactory.getLogger(TR069RequestProcessEngine.class);\r
-\r
- @Autowired\r
- TR069EventNotificationService tr069EventNotificationService;\r
-\r
- @Autowired\r
- DeviceOperationInterface deviceOperationInterface;\r
-\r
- @Autowired\r
- protected DeviceRPCRequestRepositoryHelper deviceRPCRequestRepositoryHelper;\r
-\r
- /**\r
- * @param deviceRPCRequest\r
- * @throws TR069EventProcessingException\r
- * @throws SessionConcurrentAccessException\r
- */\r
- public void processDeviceRPCRequest(DeviceRPCRequest deviceRPCRequest)\r
- throws TR069EventProcessingException, SessionConcurrentAccessException {\r
-\r
- DeviceRPCResponse deviceRPCResponse = null;\r
- String deviceId = null;\r
-\r
- try {\r
- if (deviceRPCRequest == null) {\r
- TR069EventProcessingException ex =\r
- new TR069EventProcessingException(ErrorCode.INVALID_NBI_REQUEST);\r
- logger.error(ex.getMessage());\r
- throw ex;\r
- }\r
-\r
- Long operationId = deviceRPCRequest.getOperationId();\r
- logger.info("A Mapper request is received with operationID: {}", operationId);\r
- TR069DeviceDetails tr069DeviceDetails = null;\r
- deviceId = deviceRPCRequest.getDeviceDetails().getDeviceId();\r
- try {\r
- tr069DeviceDetails = deviceOperationInterface.getDeviceDetails(deviceId);\r
- } catch (DeviceOperationException deo) {\r
- logger.error(deo.getMessage());\r
- deviceRPCResponse = tr069RequestProcessEngineUtility.buildAbortedOperationresult(\r
- tr069DeviceDetails, deviceRPCRequest, AcsFaultCode.FAULT_CODE_8000);\r
- return;\r
- }\r
-\r
- try {\r
- SessionDTO sessionDTO = acquireSessionLockWithRetryOnFailure(deviceId, operationId);\r
-\r
- logger.debug("Persisting the Device RPC request, with operation ID: {}", operationId);\r
- List<TR069DeviceRPCRequestEntity> tr069DeviceRPCRequestEntities =\r
- TR069RequestProcessorUtility.convertToEntity(deviceRPCRequest);\r
- deviceRPCRequestRepositoryHelper.saveAll(tr069DeviceRPCRequestEntities);\r
- logger.info("Successfully persisted the Device RPC request");\r
-\r
- if (sessionDTO != null) {\r
- if (SessionState.TERMINATED.equals(sessionDTO.getSessionState())) {\r
- logger.debug("No active session exists, hence requesting for Connection request");\r
- requestForConnectionRequestInform(tr069DeviceDetails);\r
-\r
- // Start Request Timer\r
- startDeviceRPCRequestTimer(deviceId, deviceRPCRequest.getOperationId(),\r
- deviceRPCRequest.getOptions().getExecutionTimeout());\r
- } else {\r
- logger.debug(\r
- "Session is in processing state, Will be notified to session manager to pick the request on session availability");\r
- }\r
- } else {\r
- logger.warn(\r
- "The device is not activated yet, hence the NBI Operation result cannot be processed!");\r
- deviceRPCResponse = tr069RequestProcessEngineUtility.buildAbortedOperationresult(\r
- tr069DeviceDetails, deviceRPCRequest, AcsFaultCode.FAULT_CODE_8001);\r
- }\r
- } catch (SessionConcurrentAccessException scae) {\r
- throw scae;\r
- } catch (Exception e) {\r
- logger.error("An unknown exception occurred while processing the NBI request, Reason: {}",\r
- e.getMessage());\r
- deviceRPCResponse = tr069RequestProcessEngineUtility.buildAbortedOperationresult(\r
- tr069DeviceDetails, deviceRPCRequest, AcsFaultCode.FAULT_CODE_8004);\r
- }\r
- } finally {\r
- if (deviceRPCResponse != null) {\r
- logger.debug("Sending failed operation result for this NBI request");\r
- tr069EventNotificationService.sendOperationResultToNBI(deviceRPCResponse);\r
- // Marking the NBI Request as processed\r
- deviceRPCRequestRepositoryHelper.markDeviceRPCRequestAsProcessed(deviceId,\r
- deviceRPCRequest.getOperationId());\r
- }\r
- }\r
- }\r
-\r
- /**\r
- * Common Step 1. Since there can exist only one Inform from any device, which will be the\r
- * initiator of the session, following steps to be followed a. Stop the session timer for this\r
- * device if any running. b. Get the lock by reading the session record for this device from DB\r
- * and load the session object in Thread Local Cache c. Create a new session id from\r
- * SessionManager, and update the session object with new Session id and change the state to\r
- * PROCESSING. 2. Read the Device record from the DB, and load the device DTO also in the\r
- * ThreadLocalCache\r
- * \r
- * Common Notification Specific Step 1. Take the connection request URL from the\r
- * deviceNotification object 2. Update the Device DTO object with connection request URL,\r
- * swVersion, hwVersion if there is a difference. Update the last updated time if updated 3. Send\r
- * this Inform to the NBI, by calling the ProcessDeviceInform method on TR069NBIService module. 4.\r
- * Create the Inform response Object and update the sessionID used for this transaction in the\r
- * response which will be used in the cookie of the HTTP response 5. Post the message into\r
- * Response Queue 6. Change the session state to LOCKED 7. Save the Session and device records\r
- * with the values in ThreadLocalCache 8. Start the session timer and default request timer (As\r
- * configured for all the requests from device) 8. All the above steps to be executed in a single\r
- * transaction.\r
- * \r
- * @param deviceNotification\r
- * @return\r
- * @throws SessionConcurrentAccessException\r
- */\r
- public DeviceInformResponse processDeviceInform(DeviceInform deviceNotification)\r
- throws SessionConcurrentAccessException {\r
-\r
- String deviceId = deviceNotification.getDeviceDetails().getDeviceId();\r
- TR069InformType notificationType = (TR069InformType) deviceNotification.getInformType();\r
- logger.info("Processing the Device Inform Event: '{}'", notificationType.getNotificationCode());\r
- String newSessionId = null;\r
- SessionDTO session = null;\r
- TR069DeviceDetails deviceDetails = null;\r
- DeviceInformResponse informResponse = null;\r
-\r
- try {\r
-\r
- SessionDTO sessionDTO = getSession(deviceId);\r
- if (sessionDTO != null && !SessionState.TERMINATED.equals(sessionDTO.getSessionState())) {\r
- String sessionId = sessionDTO.getSessionId();\r
- logger.debug(\r
- "The session with session id {} is not terminated, hence stopping the associated timer",\r
- sessionId);\r
- stopSessionTimer(sessionDTO.getSessionId());\r
- }\r
-\r
- // To stop the request timer if any running for this device, and send failed operation\r
- // result for any such pending cases. Requests pending in DB should not be cleared\r
-\r
- /*\r
- * Read any pending records in TR069_NBI_REQUEST table for this device. Send abort operation\r
- * result for all the pending requests Delete the records from the TR069_NBI_Request table for\r
- * this device.\r
- */\r
- if (deviceNotification instanceof BootstrapInform\r
- || deviceNotification instanceof BootInform) {\r
- sendAbortedOperationResultForPendingRPCRequests(deviceNotification.getDeviceDetails(),\r
- notificationType);\r
- }\r
-\r
- session = acquireSessionLock(deviceId, notificationType, true);\r
- deviceDetails = getPersistedDeviceDetails(deviceId, deviceNotification);\r
-\r
- newSessionId = session.getSessionId();\r
- logger.debug("The session id generated to process the device notification request is: {} ",\r
- newSessionId);\r
-\r
- initThreadLocalCache(deviceDetails, session);\r
- TR069RequestProcessorData tr069RequestProcessorData = getTR069RequestProcessorData();\r
- updateSessionOnDeviceNotification(tr069RequestProcessorData, newSessionId);\r
-\r
- logger.debug("Sending the Device Inform Event to the Mapper");\r
- tr069EventNotificationService.sendDeviceInformToNBI(deviceNotification);\r
-\r
- updateDeviceDetailsFromInform(tr069RequestProcessorData, deviceNotification);\r
-\r
- logger.debug("Updating the session for the device with newly generated session id");\r
- changeSessionState(tr069RequestProcessorData, SessionState.LOCKED);\r
- updateSession(session);\r
-\r
- // Start session timer - Get a default timeout for session.\r
- startSessionTimer(session.getSessionId());\r
-\r
- informResponse =\r
- new DeviceInformResponse(newSessionId, deviceNotification.getDeviceDetails());\r
- } catch (\r
-\r
- DeviceOperationException doe) {\r
- logger.error(doe.getMessage());\r
- } catch (SessionConcurrentAccessException scae) {\r
- throw scae;\r
- } catch (Exception e) {\r
- throw new SessionConcurrentAccessException(ErrorCode.UNKNOWN_ERROR, e.getMessage());\r
- }\r
- return informResponse;\r
- }\r
-\r
- /**\r
- * \r
- * 1. Stop the request timer for this device using the session ID received in the cookie of the\r
- * request 2. Read the session record from the session table into ThreadLocalCache 3. Move the\r
- * session state to PROCESSING and update the OPERATION_ID as null. 4a. Session Manager to check\r
- * if any pending request being notified for the device using in memory cache - Not planned for\r
- * this Drop 4. As an interim solution for drop1, Check the TR069_NBI_REQUEST table if any request\r
- * is pending.\r
- * \r
- * if (anyPendingRequestExists) { 1. Pick the request with the least created_time 2. Create the\r
- * response object and update the sessionID used for this transaction in the response which will\r
- * be used in the cookie of the HTTP response 3. Post the response object into Response Queue 4.\r
- * Change the session state to LOCKED and update the OPERATION_ID in the session table with the\r
- * NBI requests OPERATION_ID. 5. Save the Session record with the values in ThreadLocalCache 6.\r
- * Start the session timer and request timer (available in the request) } else { 1. Move the\r
- * session state to PROCESSING and update the OPERATION_ID as null. 2. Save the Session record\r
- * with the values in ThreadLocalCache. 3. Start the session timer. }\r
- * \r
- * 6. All the above steps to be executed in a single transaction\r
- * \r
- * @param deviceRPCResponse\r
- * @return\r
- * @throws SessionConcurrentAccessException\r
- */\r
- public DeviceRPCRequest processDeviceRPCResponse(DeviceRPCResponse deviceRPCResponse)\r
- throws SessionConcurrentAccessException {\r
- TR069DeviceDetails deviceDetails = (TR069DeviceDetails) deviceRPCResponse.getDeviceDetails();\r
- String deviceId = deviceDetails.getDeviceId();\r
- logger.info("Processing the operation response from device");\r
-\r
- SessionDTO session;\r
- DeviceRPCRequest deviceRPCRequest = null;\r
- try {\r
- session = acquireSessionLock(deviceId, null, false);\r
- String newSessionId = session.getSessionId();\r
- logger.debug("The session id used to process the device RPC response is: {}", newSessionId);\r
-\r
- initThreadLocalCache(deviceDetails, session);\r
- TR069RequestProcessorData tr069RequestProcessorData = getTR069RequestProcessorData();\r
- updateSessionOnDeviceNotification(tr069RequestProcessorData, newSessionId);\r
-\r
- Long operationId = deviceRPCResponse.getOperationId();\r
-\r
- deviceRPCRequest = getNextRPCRequest(deviceId, operationId);\r
-\r
- if (deviceRPCRequest != null) {\r
- OperationDetails opDetails = deviceRPCRequest.getOpDetails();\r
- if (opDetails != null) {\r
- OperationCode opCode = opDetails.getOpCode();\r
- if (opCode instanceof CustomOperationCode) {\r
- CustomOperationCode customOperationCode = (CustomOperationCode) opCode;\r
- String customOperationCodeName = customOperationCode.name();\r
- logger.info(\r
- "The Device RPC request is of custom type, the custom operation to be performed is: {}",\r
- customOperationCodeName);\r
- String jndiName = customOperationCode.getJndiName();\r
- CustomOperationData customOperationData =\r
- new CustomOperationData(deviceDetails, deviceRPCResponse, deviceRPCRequest);\r
- customOperationData = executeCustomOperation(jndiName, customOperationData);\r
-\r
- DeviceRPCRequest operationRequest = customOperationData.getDeviceRPCRequest();\r
- deviceRPCResponse = customOperationData.getDeviceRPCResponse();\r
- if (operationRequest != null) {\r
- operationRequest.addContextParam(SESSION_ID, newSessionId);\r
- updateSessionCurOpId(tr069RequestProcessorData, deviceRPCRequest.getOperationId());\r
- changeSessionState(tr069RequestProcessorData, SessionState.LOCKED);\r
- updateSession(session);\r
- if (deviceRPCResponse != null && operationRequest.getOperationId() != null\r
- && !operationRequest.getOperationId()\r
- .equals(deviceRPCResponse.getOperationId())) {\r
- logger.debug(\r
- "Sending the Device RPC response for a configure Multiple object prior operation");\r
- // Sending the operation response to NBI\r
- tr069EventNotificationService.sendOperationResultToNBI(deviceRPCResponse);\r
- }\r
- return operationRequest;\r
- } else {\r
- logger.debug(PENDING_RPC_CHECK);\r
- deviceRPCRequest =\r
- deviceRPCRequestRepositoryHelper.findOldestDeviceRPCRequest(deviceId);\r
- }\r
- }\r
- }\r
- }\r
-\r
- if (deviceRPCRequest != null) {\r
- logger.info("A pending Device RPC request exists for the device with operation ID: {}",\r
- deviceRPCRequest.getOperationId());\r
- updateSessionCurOpId(tr069RequestProcessorData, deviceRPCRequest.getOperationId());\r
- changeSessionState(tr069RequestProcessorData, SessionState.LOCKED);\r
-\r
- deviceRPCRequest.addContextParam(SESSION_ID, newSessionId);\r
-\r
- } else {\r
- logger.info(\r
- "No pending Device RPC request exists for the device, hence empty response will be sent to the device");\r
- logger.debug("Updating the session to terminated state");\r
- // To stop the session timer if any running for this device.\r
- stopSessionTimer(newSessionId);\r
- changeSessionState(tr069RequestProcessorData, SessionState.TERMINATED);\r
- }\r
-\r
- // To stop the request timer if any running for this operation.\r
- stopDeviceRPCRequestTimer(deviceId, deviceRPCResponse.getOperationId());\r
-\r
- // Sending the operation response to NBI\r
- logger.debug("Sending the Device RPC Response to the Mapper");\r
- tr069EventNotificationService.sendOperationResultToNBI(deviceRPCResponse);\r
-\r
- updateSession(session);\r
- } catch (DeviceOperationException doe) {\r
- logger.error(doe.getMessage());\r
- } catch (SessionConcurrentAccessException scae) {\r
- throw scae;\r
- }\r
-\r
- return deviceRPCRequest;\r
- }\r
-\r
- /**\r
- * \r
- * 1. Stop the request timer for this device using the session ID received in the cookie of the\r
- * request 2. Read the session record from the session table into ThreadLocalCache 3. Move the\r
- * session state to PROCESSING and update the OPERATION_ID as null. 4a. Session Manager to check\r
- * if any pending request being notified for the device using in memory cache - Not planned for\r
- * this Drop 4. As an interim solution for drop1, Check the TR069_NBI_REQUEST table if any request\r
- * is pending.\r
- * \r
- * if (anyPendingRequestExists) { 1. Pick the request with the least created_time 2. Create the\r
- * response object and update the sessionID used for this transaction in the response which will\r
- * be used in the cookie of the HTTP response 3. Post the response object into Response Queue 4.\r
- * Change the session state to LOCKED and update the OPERATION_ID in the session table with the\r
- * NBI requests OPERATION_ID. 5. Save the Session record with the values in ThreadLocalCache 6.\r
- * Start the session timer and request timer (available in the request) } else { 1. Move the\r
- * session state to PROCESSING and update the OPERATION_ID as null. 2. Save the Session record\r
- * with the values in ThreadLocalCache. 3. Start the session timer. }\r
- * \r
- * 6. All the above steps to be executed in a single transaction\r
- * \r
- * @param deviceDetails\r
- * @return\r
- * @throws SessionConcurrentAccessException\r
- */\r
- public DeviceRPCRequest processEmptyDeviceRequest(TR069DeviceDetails deviceDetails)\r
- throws SessionConcurrentAccessException {\r
- String deviceId = deviceDetails.getDeviceId();\r
- logger.info("Processing the empty request from device");\r
-\r
- SessionDTO session;\r
- DeviceRPCRequest nbiDeviceOperationRequest = null;\r
- try {\r
- session = acquireSessionLock(deviceId, null, false);\r
- String newSessionId = session.getSessionId();\r
- logger.debug("The session id used to process the empty device request is: {}", newSessionId);\r
-\r
- initThreadLocalCache(deviceDetails, session);\r
- TR069RequestProcessorData tr069RequestProcessorData = getTR069RequestProcessorData();\r
- updateSessionOnDeviceNotification(tr069RequestProcessorData, newSessionId);\r
-\r
- logger.debug(PENDING_RPC_CHECK);\r
- nbiDeviceOperationRequest =\r
- deviceRPCRequestRepositoryHelper.findOldestDeviceRPCRequest(deviceId);\r
- if (nbiDeviceOperationRequest != null) {\r
- Long operationId = nbiDeviceOperationRequest.getOperationId();\r
- OperationDetails opDetails = nbiDeviceOperationRequest.getOpDetails();\r
- if (opDetails != null) {\r
- OperationCode opCode = opDetails.getOpCode();\r
- if (opCode instanceof CustomOperationCode) {\r
- CustomOperationCode customOperationCode = (CustomOperationCode) opCode;\r
- String customOperationCodeName = customOperationCode.name();\r
- logger.info(\r
- "The Device RPC operation request is of custom type, the custom operation to be performed is: {}",\r
- customOperationCodeName);\r
- String jndiName = customOperationCode.getJndiName();\r
- CustomOperationData customOperationData =\r
- new CustomOperationData(deviceDetails, null, nbiDeviceOperationRequest);\r
- customOperationData = executeCustomOperation(jndiName, customOperationData);\r
-\r
- DeviceRPCRequest operationRequest = customOperationData.getDeviceRPCRequest();\r
- if (operationRequest != null) {\r
- operationRequest.addContextParam(SESSION_ID, newSessionId);\r
- updateSessionCurOpId(tr069RequestProcessorData, operationId);\r
- changeSessionState(tr069RequestProcessorData, SessionState.LOCKED);\r
- updateSession(session);\r
- return operationRequest;\r
- } else {\r
- logger.debug(PENDING_RPC_CHECK);\r
- nbiDeviceOperationRequest =\r
- deviceRPCRequestRepositoryHelper.findOldestDeviceRPCRequest(deviceId);\r
- }\r
- }\r
- }\r
- }\r
-\r
- if (nbiDeviceOperationRequest != null) {\r
- Long operationId = nbiDeviceOperationRequest.getOperationId();\r
- logger.info("A pending Device RPC request exists for the device with operation ID: {}",\r
- operationId);\r
- updateSessionCurOpId(tr069RequestProcessorData, operationId);\r
- changeSessionState(tr069RequestProcessorData, SessionState.LOCKED);\r
-\r
- nbiDeviceOperationRequest.addContextParam(SESSION_ID, newSessionId);\r
-\r
- } else {\r
- logger.info(\r
- "No pending Device RPC request exists for the device, hence empty device response will be sent to the device");\r
- logger.debug("Updating the session to terminated state");\r
- // To stop the session timer if any running for this device.\r
- stopSessionTimer(newSessionId);\r
- changeSessionState(tr069RequestProcessorData, SessionState.TERMINATED);\r
- }\r
-\r
- updateSession(session);\r
- } catch (DeviceOperationException doe) {\r
- logger.error(doe.getMessage());\r
- } catch (SessionConcurrentAccessException scae) {\r
- throw scae;\r
- }\r
-\r
- return nbiDeviceOperationRequest;\r
- }\r
-\r
- /**\r
- * @param sessionId\r
- * @return\r
- * @throws SessionManagerException\r
- */\r
- public DeviceOperationRequestDetails getOpRequestDetailsBySessionId(String sessionId)\r
- throws SessionManagerException {\r
- DeviceOperationRequestDetails deviceOperationRequestDetails =\r
- new DeviceOperationRequestDetails();\r
-\r
- logger.debug("Fetching Operation request details for session: {}", sessionId);\r
- SessionDTO session = getSessionBySessionId(sessionId);\r
- String deviceId = session.getDeviceId();\r
- TR069DeviceDetails deviceDetails = null;\r
- try {\r
- deviceDetails = deviceOperationInterface.getDeviceDetails(deviceId);\r
- if (session.getCurrentOperationId() == null) {\r
- logger.debug("There exists no pending operation request for the session: {}", sessionId);\r
- } else {\r
- logger.debug("There exists pending operation request for the session: {}", sessionId);\r
- List<TR069DeviceRPCRequestEntity> tr069DeviceRPCRequestEntityList =\r
- deviceRPCRequestRepositoryHelper.findByDeviceIdAndOperationId(deviceId,\r
- session.getCurrentOperationId());\r
- if (tr069DeviceRPCRequestEntityList == null) {\r
- SessionManagerException ex =\r
- new SessionManagerException(ErrorCode.SESSION_EXPIRED, sessionId);\r
- logger.error(ex.getMessage());\r
- throw ex;\r
- }\r
- DeviceRPCRequest deviceRPCRequest =\r
- TR069RequestProcessorUtility.convertToDTO(tr069DeviceRPCRequestEntityList);\r
- OperationCode opCode = deviceRPCRequest.getOpDetails().getOpCode();\r
-\r
- String operationName = null;\r
-\r
- if (opCode instanceof TR069OperationCode) {\r
- operationName = ((TR069OperationCode) opCode).name();\r
- } else {\r
- operationName = ((CustomOperationCode) opCode).name();\r
- TR069OperationDetails tr069OperationDetails =\r
- (TR069OperationDetails) deviceRPCRequest.getOpDetails();\r
- opCode = getCustomOperationCode(tr069OperationDetails);\r
- }\r
- logger.info("The pending operation request for the session is of operation {}",\r
- operationName);\r
- deviceOperationRequestDetails.setOpCode(opCode);\r
- deviceOperationRequestDetails.setOperationId(deviceRPCRequest.getOperationId());\r
- }\r
- deviceOperationRequestDetails.setDeviceDetails(deviceDetails);\r
- } catch (DeviceOperationException e) {\r
- SessionManagerException ex =\r
- new SessionManagerException(ErrorCode.DEVICE_NOT_EXISTS, deviceId);\r
- logger.error(ex.getMessage());\r
- throw ex;\r
- } catch (Exception e) {\r
- logger.error(e.getMessage());\r
- SessionManagerException ex =\r
- new SessionManagerException(ErrorCode.SESSION_EXPIRED, sessionId);\r
- logger.error(ex.getMessage());\r
- throw ex;\r
- }\r
- return deviceOperationRequestDetails;\r
- }\r
-\r
- /**\r
- * @param deviceId\r
- * @return\r
- * @throws DeviceOperationException\r
- */\r
- public TR069DeviceDetails getDeviceDetails(String deviceId) throws DeviceOperationException {\r
- return deviceOperationInterface.getDeviceDetails(deviceId);\r
- }\r
-\r
- /**\r
- * @param deviceId\r
- * @param operationId\r
- * @return\r
- * @throws DeviceOperationException\r
- * @throws SessionConcurrentAccessException\r
- * @throws InterruptedException\r
- */\r
- private SessionDTO acquireSessionLockWithRetryOnFailure(String deviceId, Long operationId)\r
- throws DeviceOperationException, SessionConcurrentAccessException, InterruptedException {\r
- int sessionLockAcquireRetryCount = 0;\r
- SessionDTO sessionDTO = null;\r
- do {\r
- try {\r
- sessionDTO = acquireSessionLock(deviceId, null, false);\r
- logger.info(\r
- "Successfully acquired the session lock for processing NBI request with operation ID: {}",\r
- operationId);\r
- break;\r
- } catch (SessionConcurrentAccessException ex) {\r
- sessionLockAcquireRetryCount++;\r
- if (sessionLockAcquireRetryCount == 3) {\r
- logger.error("Failed acquiring the lock after retry, rolling back the transaction");\r
- throw ex;\r
- }\r
- logger.warn(\r
- "Session lock acquiring failed with SessionConcurrentAccessException, hence retrying");\r
- Thread.sleep(1000L);\r
- }\r
- } while (sessionLockAcquireRetryCount < 3);\r
-\r
- return sessionDTO;\r
- }\r
-\r
- /**\r
- * @param deviceId\r
- * @param deviceNotification\r
- * @return\r
- * @throws DeviceOperationException\r
- */\r
- private TR069DeviceDetails getPersistedDeviceDetails(String deviceId,\r
- DeviceInform deviceNotification) throws DeviceOperationException {\r
- TR069DeviceDetails deviceDetails = null;\r
- try {\r
- deviceDetails = deviceOperationInterface.getDeviceDetails(deviceId);\r
- } catch (DeviceOperationException doe) {\r
- if (ErrorCode.DEVICE_NOT_EXISTS.equals(doe.getErrorCode())) {\r
- logger.info(\r
- "Creating the device record in TR069_DEVICE_ table, as the device is authenticated successfully.");\r
- createDevice(deviceNotification.getDeviceDetails());\r
- deviceDetails = deviceOperationInterface.getDeviceDetails(deviceId);\r
- }\r
- }\r
-\r
- return deviceDetails;\r
- }\r
-\r
- /**\r
- * @param deviceDetails\r
- * @param notificationType\r
- * @throws TR069EventProcessingException\r
- */\r
- private void sendAbortedOperationResultForPendingRPCRequests(DeviceDetails deviceDetails,\r
- TR069InformType notificationType) throws TR069EventProcessingException {\r
- String deviceId = deviceDetails.getDeviceId();\r
- String notificationName = notificationType.name();\r
- logger.debug(\r
- "Device Inform event received is {}, hence aborting all the pending operations if any exists",\r
- notificationName);\r
-\r
- List<DeviceRPCRequest> deviceRPCRequestList =\r
- deviceRPCRequestRepositoryHelper.findAllDeviceRPCRequests(deviceId);\r
-\r
- for (DeviceRPCRequest pendingDeviceRPCRequest : deviceRPCRequestList) {\r
- DeviceRPCResponse deviceOpResult =\r
- tr069RequestProcessEngineUtility.buildAbortedOperationresult(deviceDetails,\r
- pendingDeviceRPCRequest, AcsFaultCode.FAULT_CODE_8002);\r
- String operationName = null;\r
- if (pendingDeviceRPCRequest.getOpDetails().getOpCode() instanceof CustomOperationCode) {\r
- CustomOperationCode operationCode =\r
- (CustomOperationCode) pendingDeviceRPCRequest.getOpDetails().getOpCode();\r
- operationName = operationCode.name();\r
- } else {\r
- TR069OperationCode operationCode =\r
- (TR069OperationCode) pendingDeviceRPCRequest.getOpDetails().getOpCode();\r
- operationName = operationCode.name();\r
- }\r
- Long operationId = pendingDeviceRPCRequest.getOperationId();\r
- logger.debug("Aborting the NBI Operation request with operation Id : {} for operation: {}",\r
- operationId, operationName);\r
- tr069EventNotificationService.sendOperationResultToNBI(deviceOpResult);\r
- // Marking the NBI Request as processed\r
- deviceRPCRequestRepositoryHelper.markDeviceRPCRequestAsProcessed(deviceId, operationId);\r
- stopDeviceRPCRequestTimer(deviceId, operationId);\r
- }\r
- }\r
-\r
- /**\r
- * @param tr069RequestProcessorData\r
- * @param deviceNotification\r
- */\r
- private void updateDeviceDetailsFromInform(TR069RequestProcessorData tr069RequestProcessorData,\r
- DeviceInform deviceNotification) {\r
- Boolean isDeviceDataChanged =\r
- isDeviceUpdateExists(tr069RequestProcessorData, deviceNotification);\r
- if (isDeviceDataChanged.booleanValue()) {\r
- updateDeviceDetails(tr069RequestProcessorData, deviceNotification);\r
- try {\r
- logger.info(\r
- "The device data like connection request URL/ Device SW/HW version has changed, hence updating the device details");\r
- deviceOperationInterface\r
- .updateDeviceDetails(tr069RequestProcessorData.getTr069DeviceDetails());\r
- } catch (DeviceOperationException e) {\r
- logger.error("Updating the device details with the notification details failed, Reason: {}",\r
- e.getMessage());\r
- logger.error(e.getMessage());\r
- }\r
- }\r
- }\r
-\r
- /**\r
- * @param deviceId\r
- * @param operationId\r
- * @return\r
- */\r
- private DeviceRPCRequest getNextRPCRequest(String deviceId, Long operationId) {\r
- DeviceRPCRequest deviceRPCRequest = null;\r
- try {\r
- List<TR069DeviceRPCRequestEntity> entityList =\r
- deviceRPCRequestRepositoryHelper.findByDeviceIdAndOperationId(deviceId, operationId);\r
- Integer operationCode = entityList.get(0).getOpCode();\r
- if (CustomOperationCode.getByOperationCode(operationCode) == null) {\r
- logger.info("Marking the Device RPC request with operation id - {} as processed.",\r
- operationId);\r
- deviceRPCRequestRepositoryHelper.markDeviceRPCRequestAsProcessed(deviceId, operationId);\r
- }\r
- logger.debug(PENDING_RPC_CHECK);\r
- deviceRPCRequest = deviceRPCRequestRepositoryHelper.findOldestDeviceRPCRequest(deviceId);\r
- } catch (TR069EventProcessingException e) {\r
- logger.error("An unknown exception occurred while fetching the NBI request, Reason: {}",\r
- e.getMessage());\r
- }\r
-\r
- return deviceRPCRequest;\r
- }\r
-\r
- /**\r
- * Creates the device in the DM module if factory imported already\r
- * \r
- * @param deviceDetails\r
- * @throws DeviceOperationException\r
- */\r
- private void createDevice(DeviceDetails deviceDetails) throws DeviceOperationException {\r
- deviceOperationInterface.updateDeviceDetails(deviceDetails);\r
- }\r
-}\r
+/*
+ * ============LICENSE_START========================================================================
+ * ONAP : tr-069-adapter
+ * =================================================================================================
+ * Copyright (C) 2020 CommScope Inc Intellectual Property.
+ * =================================================================================================
+ * This tr-069-adapter software file is distributed by CommScope Inc under the Apache License,
+ * Version 2.0 (the "License"); you may not use this file except in compliance with the License. You
+ * may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied. See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ===============LICENSE_END=======================================================================
+ */
+
+package org.commscope.tr069adapter.acs.requestprocessor.impl;
+
+import static org.commscope.tr069adapter.acs.common.utils.AcsConstants.SESSION_ID;
+
+import java.util.Date;
+import java.util.List;
+
+import org.commscope.tr069adapter.acs.common.DeviceDetails;
+import org.commscope.tr069adapter.acs.common.DeviceInform;
+import org.commscope.tr069adapter.acs.common.DeviceRPCRequest;
+import org.commscope.tr069adapter.acs.common.DeviceRPCResponse;
+import org.commscope.tr069adapter.acs.common.OperationCode;
+import org.commscope.tr069adapter.acs.common.OperationDetails;
+import org.commscope.tr069adapter.acs.common.OperationResponse;
+import org.commscope.tr069adapter.acs.common.dto.CustomOperationCode;
+import org.commscope.tr069adapter.acs.common.dto.DeviceOperationRequestDetails;
+import org.commscope.tr069adapter.acs.common.dto.TR069DeviceDetails;
+import org.commscope.tr069adapter.acs.common.dto.TR069InformType;
+import org.commscope.tr069adapter.acs.common.dto.TR069OperationCode;
+import org.commscope.tr069adapter.acs.common.dto.TR069OperationDetails;
+import org.commscope.tr069adapter.acs.common.exception.DeviceOperationException;
+import org.commscope.tr069adapter.acs.common.exception.SessionConcurrentAccessException;
+import org.commscope.tr069adapter.acs.common.exception.SessionManagerException;
+import org.commscope.tr069adapter.acs.common.exception.TR069EventProcessingException;
+import org.commscope.tr069adapter.acs.common.faults.AcsFaultCode;
+import org.commscope.tr069adapter.acs.common.inform.BootInform;
+import org.commscope.tr069adapter.acs.common.inform.BootstrapInform;
+import org.commscope.tr069adapter.acs.common.response.DeviceInformResponse;
+import org.commscope.tr069adapter.acs.common.utils.ErrorCode;
+import org.commscope.tr069adapter.acs.requestprocessor.DeviceOperationInterface;
+import org.commscope.tr069adapter.acs.requestprocessor.dao.DeviceRPCRequestRepositoryHelper;
+import org.commscope.tr069adapter.acs.requestprocessor.dao.DeviceRepository;
+import org.commscope.tr069adapter.acs.requestprocessor.dto.CustomOperationData;
+import org.commscope.tr069adapter.acs.requestprocessor.dto.SessionDTO;
+import org.commscope.tr069adapter.acs.requestprocessor.dto.SessionState;
+import org.commscope.tr069adapter.acs.requestprocessor.dto.TR069RequestProcessorData;
+import org.commscope.tr069adapter.acs.requestprocessor.entity.TR069DeviceEntity;
+import org.commscope.tr069adapter.acs.requestprocessor.entity.TR069DeviceRPCRequestEntity;
+import org.commscope.tr069adapter.acs.requestprocessor.helper.TR069RequestProcessEngineHelper;
+import org.commscope.tr069adapter.acs.requestprocessor.util.TR069RequestProcessorUtility;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+@Component
+public class TR069RequestProcessEngine extends TR069RequestProcessEngineHelper {
+
+ private static final String PENDING_RPC_CHECK =
+ "Checking if any pending Device RPC requests exists for the device";
+
+ private static final Logger logger = LoggerFactory.getLogger(TR069RequestProcessEngine.class);
+
+ @Autowired
+ TR069EventNotificationService tr069EventNotificationService;
+
+ @Autowired
+ DeviceOperationInterface deviceOperationInterface;
+
+ @Autowired
+ protected DeviceRPCRequestRepositoryHelper deviceRPCRequestRepositoryHelper;
+
+ @Autowired
+ private DeviceRepository deviceRepository;
+
+ /**
+ * @param deviceRPCRequest
+ * @throws TR069EventProcessingException
+ * @throws SessionConcurrentAccessException
+ */
+ public void processDeviceRPCRequest(DeviceRPCRequest deviceRPCRequest)
+ throws TR069EventProcessingException, SessionConcurrentAccessException {
+
+ DeviceRPCResponse deviceRPCResponse = null;
+ String deviceId = null;
+
+ try {
+ if (deviceRPCRequest == null) {
+ TR069EventProcessingException ex =
+ new TR069EventProcessingException(ErrorCode.INVALID_NBI_REQUEST);
+ logger.error(ex.getMessage());
+ throw ex;
+ }
+
+ Long operationId = deviceRPCRequest.getOperationId();
+ logger.info("A Mapper request is received with operationID: {}", operationId);
+ TR069DeviceDetails tr069DeviceDetails = null;
+ deviceId = deviceRPCRequest.getDeviceDetails().getDeviceId();
+ try {
+ tr069DeviceDetails = deviceOperationInterface.getDeviceDetails(deviceId);
+ deviceRPCResponse = checkForDeviceAvailabilityRequest(deviceRPCRequest, tr069DeviceDetails);
+ if (null != deviceRPCResponse) {
+ return;
+ }
+ } catch (DeviceOperationException deo) {
+ logger.error(deo.getMessage());
+ deviceRPCResponse = tr069RequestProcessEngineUtility.buildAbortedOperationresult(
+ tr069DeviceDetails, deviceRPCRequest, AcsFaultCode.FAULT_CODE_8000);
+ return;
+ }
+
+ try {
+ SessionDTO sessionDTO = acquireSessionLockWithRetryOnFailure(deviceId, operationId);
+
+ logger.debug("Persisting the Device RPC request, with operation ID: {}", operationId);
+ List<TR069DeviceRPCRequestEntity> tr069DeviceRPCRequestEntities =
+ TR069RequestProcessorUtility.convertToEntity(deviceRPCRequest);
+ deviceRPCRequestRepositoryHelper.saveAll(tr069DeviceRPCRequestEntities);
+ logger.info("Successfully persisted the Device RPC request");
+
+ if (sessionDTO != null) {
+ if (SessionState.TERMINATED.equals(sessionDTO.getSessionState())) {
+ logger.debug("No active session exists, hence requesting for Connection request");
+ requestForConnectionRequestInform(tr069DeviceDetails);
+
+ // Start Request Timer
+ startDeviceRPCRequestTimer(deviceId, deviceRPCRequest.getOperationId(),
+ deviceRPCRequest.getOptions().getExecutionTimeout());
+ } else {
+ logger.debug(
+ "Session is in processing state, Will be notified to session manager to pick the request on session availability");
+ }
+ } else {
+ logger.warn(
+ "The device is not activated yet, hence the NBI Operation result cannot be processed!");
+ deviceRPCResponse = tr069RequestProcessEngineUtility.buildAbortedOperationresult(
+ tr069DeviceDetails, deviceRPCRequest, AcsFaultCode.FAULT_CODE_8001);
+ }
+ } catch (SessionConcurrentAccessException scae) {
+ throw scae;
+ } catch (Exception e) {
+ logger.error("An unknown exception occurred while processing the NBI request, Reason: {}",
+ e.getMessage());
+ deviceRPCResponse = tr069RequestProcessEngineUtility.buildAbortedOperationresult(
+ tr069DeviceDetails, deviceRPCRequest, AcsFaultCode.FAULT_CODE_8004);
+ }
+ } finally {
+ if (deviceRPCResponse != null) {
+ logger.debug("Sending failed operation result for this NBI request");
+ tr069EventNotificationService.sendOperationResultToNBI(deviceRPCResponse);
+ // Marking the NBI Request as processed
+ deviceRPCRequestRepositoryHelper.markDeviceRPCRequestAsProcessed(deviceId,
+ deviceRPCRequest.getOperationId());
+ }
+ }
+ }
+
+ private DeviceRPCResponse checkForDeviceAvailabilityRequest(DeviceRPCRequest deviceRPCRequest,
+ TR069DeviceDetails tr069DeviceDetails) {
+ DeviceRPCResponse deviceRPCResponse = null;
+
+ if (!deviceRPCRequest.getOpDetails().getOpCode().equals(CustomOperationCode.CONNECT)) {
+ return deviceRPCResponse;
+ }
+
+ SessionDTO sessionDTO = getSession(tr069DeviceDetails.getDeviceId());
+ tr069DeviceDetails.setCrRetryCount(1);
+ if (null != sessionDTO && !SessionState.TERMINATED.equals(sessionDTO.getSessionState())) {
+ logger.debug("Device is reachable as device tr069 session is in {} state.",
+ sessionDTO.getSessionState());
+
+ deviceRPCResponse = new DeviceRPCResponse();
+ deviceRPCResponse.setDeviceDetails(tr069DeviceDetails);
+ deviceRPCResponse.setOperationId(deviceRPCRequest.getOperationId());
+
+ OperationResponse operationResponse = new OperationResponse();
+ // device reachable...change value 1 to some constant or enum
+ operationResponse.setStatus(TR069RequestProcessorUtility.DEVICE_REACHABLE_STATUS_CODE);
+ operationResponse.setOperationCode(deviceRPCRequest.getOpDetails().getOpCode());
+
+ deviceRPCResponse.setOperationResponse(operationResponse);
+ }
+
+ return deviceRPCResponse;
+ }
+
+ /**
+ * Common Step 1. Since there can exist only one Inform from any device, which will be the
+ * initiator of the session, following steps to be followed a. Stop the session timer for this
+ * device if any running. b. Get the lock by reading the session record for this device from DB
+ * and load the session object in Thread Local Cache c. Create a new session id from
+ * SessionManager, and update the session object with new Session id and change the state to
+ * PROCESSING. 2. Read the Device record from the DB, and load the device DTO also in the
+ * ThreadLocalCache
+ *
+ * Common Notification Specific Step 1. Take the connection request URL from the
+ * deviceNotification object 2. Update the Device DTO object with connection request URL,
+ * swVersion, hwVersion if there is a difference. Update the last updated time if updated 3. Send
+ * this Inform to the NBI, by calling the ProcessDeviceInform method on TR069NBIService module. 4.
+ * Create the Inform response Object and update the sessionID used for this transaction in the
+ * response which will be used in the cookie of the HTTP response 5. Post the message into
+ * Response Queue 6. Change the session state to LOCKED 7. Save the Session and device records
+ * with the values in ThreadLocalCache 8. Start the session timer and default request timer (As
+ * configured for all the requests from device) 8. All the above steps to be executed in a single
+ * transaction.
+ *
+ * @param deviceNotification
+ * @return
+ * @throws SessionConcurrentAccessException
+ */
+ public DeviceInformResponse processDeviceInform(DeviceInform deviceNotification)
+ throws SessionConcurrentAccessException {
+
+ String deviceId = deviceNotification.getDeviceDetails().getDeviceId();
+ TR069InformType notificationType = (TR069InformType) deviceNotification.getInformType();
+ logger.info("Processing the Device Inform Event: '{}'", notificationType.getNotificationCode());
+ String newSessionId = null;
+ SessionDTO session = null;
+ TR069DeviceDetails deviceDetails = null;
+ DeviceInformResponse informResponse = null;
+
+ try {
+
+ SessionDTO sessionDTO = getSession(deviceId);
+ if (sessionDTO != null && !SessionState.TERMINATED.equals(sessionDTO.getSessionState())) {
+ String sessionId = sessionDTO.getSessionId();
+ logger.debug(
+ "The session with session id {} is not terminated, hence stopping the associated timer",
+ sessionId);
+ stopSessionTimer(sessionDTO.getSessionId());
+ }
+
+ // To stop the request timer if any running for this device, and send failed operation
+ // result for any such pending cases. Requests pending in DB should not be cleared
+
+ /*
+ * Read any pending records in TR069_NBI_REQUEST table for this device. Send abort operation
+ * result for all the pending requests Delete the records from the TR069_NBI_Request table for
+ * this device.
+ */
+ if (deviceNotification instanceof BootstrapInform
+ || deviceNotification instanceof BootInform) {
+ sendAbortedOperationResultForPendingRPCRequests(deviceNotification.getDeviceDetails(),
+ notificationType);
+ }
+
+ session = acquireSessionLock(deviceId, notificationType, true);
+ deviceDetails = getPersistedDeviceDetails(deviceId, deviceNotification);
+
+ newSessionId = session.getSessionId();
+ logger.debug("The session id generated to process the device notification request is: {} ",
+ newSessionId);
+
+ initThreadLocalCache(deviceDetails, session);
+ TR069RequestProcessorData tr069RequestProcessorData = getTR069RequestProcessorData();
+ updateSessionOnDeviceNotification(tr069RequestProcessorData, newSessionId);
+
+ logger.debug("Sending the Device Inform Event to the Mapper");
+ tr069EventNotificationService.sendDeviceInformToNBI(deviceNotification);
+
+ updateDeviceDetailsFromInform(tr069RequestProcessorData, deviceNotification);
+
+ logger.debug("Updating the session for the device with newly generated session id");
+ changeSessionState(tr069RequestProcessorData, SessionState.LOCKED);
+ updateSession(session);
+
+ // Start session timer - Get a default timeout for session.
+ startSessionTimer(session.getSessionId());
+
+ informResponse =
+ new DeviceInformResponse(newSessionId, deviceNotification.getDeviceDetails());
+ } catch (
+
+ DeviceOperationException doe) {
+ logger.error(doe.getMessage());
+ } catch (SessionConcurrentAccessException scae) {
+ throw scae;
+ } catch (Exception e) {
+ throw new SessionConcurrentAccessException(ErrorCode.UNKNOWN_ERROR, e.getMessage());
+ }
+ return informResponse;
+ }
+
+ /**
+ *
+ * 1. Stop the request timer for this device using the session ID received in the cookie of the
+ * request 2. Read the session record from the session table into ThreadLocalCache 3. Move the
+ * session state to PROCESSING and update the OPERATION_ID as null. 4a. Session Manager to check
+ * if any pending request being notified for the device using in memory cache - Not planned for
+ * this Drop 4. As an interim solution for drop1, Check the TR069_NBI_REQUEST table if any request
+ * is pending.
+ *
+ * if (anyPendingRequestExists) { 1. Pick the request with the least created_time 2. Create the
+ * response object and update the sessionID used for this transaction in the response which will
+ * be used in the cookie of the HTTP response 3. Post the response object into Response Queue 4.
+ * Change the session state to LOCKED and update the OPERATION_ID in the session table with the
+ * NBI requests OPERATION_ID. 5. Save the Session record with the values in ThreadLocalCache 6.
+ * Start the session timer and request timer (available in the request) } else { 1. Move the
+ * session state to PROCESSING and update the OPERATION_ID as null. 2. Save the Session record
+ * with the values in ThreadLocalCache. 3. Start the session timer. }
+ *
+ * 6. All the above steps to be executed in a single transaction
+ *
+ * @param deviceRPCResponse
+ * @return
+ * @throws SessionConcurrentAccessException
+ */
+ public DeviceRPCRequest processDeviceRPCResponse(DeviceRPCResponse deviceRPCResponse)
+ throws SessionConcurrentAccessException {
+ TR069DeviceDetails deviceDetails = (TR069DeviceDetails) deviceRPCResponse.getDeviceDetails();
+ String deviceId = deviceDetails.getDeviceId();
+ logger.info("Processing the operation response from device");
+
+ SessionDTO session;
+ DeviceRPCRequest deviceRPCRequest = null;
+ try {
+ session = acquireSessionLock(deviceId, null, false);
+ String newSessionId = session.getSessionId();
+ logger.debug("The session id used to process the device RPC response is: {}", newSessionId);
+
+ initThreadLocalCache(deviceDetails, session);
+ TR069RequestProcessorData tr069RequestProcessorData = getTR069RequestProcessorData();
+ updateSessionOnDeviceNotification(tr069RequestProcessorData, newSessionId);
+
+ Long operationId = deviceRPCResponse.getOperationId();
+
+ deviceRPCRequest = getNextRPCRequest(deviceId, operationId);
+
+ if (deviceRPCRequest != null) {
+ OperationDetails opDetails = deviceRPCRequest.getOpDetails();
+ if (opDetails != null) {
+ OperationCode opCode = opDetails.getOpCode();
+ if (opCode instanceof CustomOperationCode) {
+ CustomOperationCode customOperationCode = (CustomOperationCode) opCode;
+ String customOperationCodeName = customOperationCode.name();
+ logger.info(
+ "The Device RPC request is of custom type, the custom operation to be performed is: {}",
+ customOperationCodeName);
+ String jndiName = customOperationCode.getJndiName();
+ CustomOperationData customOperationData =
+ new CustomOperationData(deviceDetails, deviceRPCResponse, deviceRPCRequest);
+ customOperationData = executeCustomOperation(jndiName, customOperationData);
+
+ DeviceRPCRequest operationRequest = customOperationData.getDeviceRPCRequest();
+ deviceRPCResponse = customOperationData.getDeviceRPCResponse();
+ if (operationRequest != null) {
+ return handleOperationRequest(deviceRPCResponse, session, deviceRPCRequest,
+ newSessionId, tr069RequestProcessorData, operationRequest);
+ } else {
+ logger.debug(PENDING_RPC_CHECK);
+ deviceRPCRequest =
+ deviceRPCRequestRepositoryHelper.findOldestDeviceRPCRequest(deviceId);
+ }
+ }
+ }
+ }
+
+ if (deviceRPCRequest != null) {
+ logger.info("A pending Device RPC request exists for the device with operation ID: {}",
+ deviceRPCRequest.getOperationId());
+ updateSessionCurOpId(tr069RequestProcessorData, deviceRPCRequest.getOperationId());
+ changeSessionState(tr069RequestProcessorData, SessionState.LOCKED);
+
+ deviceRPCRequest.addContextParam(SESSION_ID, newSessionId);
+
+ } else {
+ logger.info(
+ "No pending Device RPC request exists for the device, hence empty response will be sent to the device");
+ logger.debug("Updating the session to terminated state");
+ // To stop the session timer if any running for this device.
+ stopSessionTimer(newSessionId);
+ changeSessionState(tr069RequestProcessorData, SessionState.TERMINATED);
+ }
+
+ // To stop the request timer if any running for this operation.
+ stopDeviceRPCRequestTimer(deviceId, deviceRPCResponse.getOperationId());
+
+ // Sending the operation response to NBI
+ logger.debug("Sending the Device RPC Response to the Mapper");
+ tr069EventNotificationService.sendOperationResultToNBI(deviceRPCResponse);
+
+ updateSession(session);
+ } catch (DeviceOperationException doe) {
+ logger.error(doe.getMessage());
+ } catch (SessionConcurrentAccessException scae) {
+ throw scae;
+ }
+
+ return deviceRPCRequest;
+ }
+
+ private DeviceRPCRequest handleOperationRequest(DeviceRPCResponse deviceRPCResponse,
+ SessionDTO session, DeviceRPCRequest deviceRPCRequest, String newSessionId,
+ TR069RequestProcessorData tr069RequestProcessorData, DeviceRPCRequest operationRequest) {
+ operationRequest.addContextParam(SESSION_ID, newSessionId);
+ updateSessionCurOpId(tr069RequestProcessorData, deviceRPCRequest.getOperationId());
+ changeSessionState(tr069RequestProcessorData, SessionState.LOCKED);
+ updateSession(session);
+ if (deviceRPCResponse != null && operationRequest.getOperationId() != null
+ && !operationRequest.getOperationId().equals(deviceRPCResponse.getOperationId())) {
+ logger
+ .debug("Sending the Device RPC response for a configure Multiple object prior operation");
+ // Sending the operation response to NBI
+ tr069EventNotificationService.sendOperationResultToNBI(deviceRPCResponse);
+ }
+ return operationRequest;
+ }
+
+ /**
+ *
+ * 1. Stop the request timer for this device using the session ID received in the cookie of the
+ * request 2. Read the session record from the session table into ThreadLocalCache 3. Move the
+ * session state to PROCESSING and update the OPERATION_ID as null. 4a. Session Manager to check
+ * if any pending request being notified for the device using in memory cache - Not planned for
+ * this Drop 4. As an interim solution for drop1, Check the TR069_NBI_REQUEST table if any request
+ * is pending.
+ *
+ * if (anyPendingRequestExists) { 1. Pick the request with the least created_time 2. Create the
+ * response object and update the sessionID used for this transaction in the response which will
+ * be used in the cookie of the HTTP response 3. Post the response object into Response Queue 4.
+ * Change the session state to LOCKED and update the OPERATION_ID in the session table with the
+ * NBI requests OPERATION_ID. 5. Save the Session record with the values in ThreadLocalCache 6.
+ * Start the session timer and request timer (available in the request) } else { 1. Move the
+ * session state to PROCESSING and update the OPERATION_ID as null. 2. Save the Session record
+ * with the values in ThreadLocalCache. 3. Start the session timer. }
+ *
+ * 6. All the above steps to be executed in a single transaction
+ *
+ * @param deviceDetails
+ * @return
+ * @throws SessionConcurrentAccessException
+ */
+ public DeviceRPCRequest processEmptyDeviceRequest(TR069DeviceDetails deviceDetails)
+ throws SessionConcurrentAccessException {
+ String deviceId = deviceDetails.getDeviceId();
+ logger.info("Processing the empty request from device");
+
+ SessionDTO session;
+ DeviceRPCRequest nbiDeviceOperationRequest = null;
+ try {
+ session = acquireSessionLock(deviceId, null, false);
+ String newSessionId = session.getSessionId();
+ logger.debug("The session id used to process the empty device request is: {}", newSessionId);
+
+ initThreadLocalCache(deviceDetails, session);
+ TR069RequestProcessorData tr069RequestProcessorData = getTR069RequestProcessorData();
+ updateSessionOnDeviceNotification(tr069RequestProcessorData, newSessionId);
+
+ logger.debug(PENDING_RPC_CHECK);
+ nbiDeviceOperationRequest =
+ deviceRPCRequestRepositoryHelper.findOldestDeviceRPCRequest(deviceId);
+ if (nbiDeviceOperationRequest != null) {
+ Long operationId = nbiDeviceOperationRequest.getOperationId();
+ OperationDetails opDetails = nbiDeviceOperationRequest.getOpDetails();
+ if (opDetails != null) {
+ OperationCode opCode = opDetails.getOpCode();
+ if (opCode instanceof CustomOperationCode) {
+ CustomOperationCode customOperationCode = (CustomOperationCode) opCode;
+ String customOperationCodeName = customOperationCode.name();
+ logger.info(
+ "The Device RPC operation request is of custom type, the custom operation to be performed is: {}",
+ customOperationCodeName);
+ String jndiName = customOperationCode.getJndiName();
+ CustomOperationData customOperationData =
+ new CustomOperationData(deviceDetails, null, nbiDeviceOperationRequest);
+ customOperationData = executeCustomOperation(jndiName, customOperationData);
+
+ DeviceRPCRequest operationRequest = customOperationData.getDeviceRPCRequest();
+ if (operationRequest != null) {
+ operationRequest.addContextParam(SESSION_ID, newSessionId);
+ updateSessionCurOpId(tr069RequestProcessorData, operationId);
+ changeSessionState(tr069RequestProcessorData, SessionState.LOCKED);
+ updateSession(session);
+ return operationRequest;
+ } else {
+ logger.debug(PENDING_RPC_CHECK);
+ nbiDeviceOperationRequest =
+ deviceRPCRequestRepositoryHelper.findOldestDeviceRPCRequest(deviceId);
+ }
+ }
+ }
+ }
+
+ if (nbiDeviceOperationRequest != null) {
+ Long operationId = nbiDeviceOperationRequest.getOperationId();
+ logger.info("A pending Device RPC request exists for the device with operation ID: {}",
+ operationId);
+ updateSessionCurOpId(tr069RequestProcessorData, operationId);
+ changeSessionState(tr069RequestProcessorData, SessionState.LOCKED);
+
+ nbiDeviceOperationRequest.addContextParam(SESSION_ID, newSessionId);
+
+ } else {
+ logger.info(
+ "No pending Device RPC request exists for the device, hence empty device response will be sent to the device");
+ logger.debug("Updating the session to terminated state");
+ // To stop the session timer if any running for this device.
+ stopSessionTimer(newSessionId);
+ changeSessionState(tr069RequestProcessorData, SessionState.TERMINATED);
+ }
+
+ updateSession(session);
+ } catch (DeviceOperationException doe) {
+ logger.error(doe.getMessage());
+ } catch (SessionConcurrentAccessException scae) {
+ throw scae;
+ }
+
+ return nbiDeviceOperationRequest;
+ }
+
+ /**
+ * @param sessionId
+ * @return
+ * @throws SessionManagerException
+ */
+ public DeviceOperationRequestDetails getOpRequestDetailsBySessionId(String sessionId)
+ throws SessionManagerException {
+ DeviceOperationRequestDetails deviceOperationRequestDetails =
+ new DeviceOperationRequestDetails();
+
+ logger.debug("Fetching Operation request details for session: {}", sessionId);
+ SessionDTO session = getSessionBySessionId(sessionId);
+ String deviceId = session.getDeviceId();
+ TR069DeviceDetails deviceDetails = null;
+ try {
+ deviceDetails = deviceOperationInterface.getDeviceDetails(deviceId);
+ if (session.getCurrentOperationId() == null) {
+ logger.debug("There exists no pending operation request for the session: {}", sessionId);
+ } else {
+ sessionId = sessionId.replaceAll("[\n|\r|\t]", "_");
+ logger.debug("There exists pending operation request for the session: {}", sessionId);
+ List<TR069DeviceRPCRequestEntity> tr069DeviceRPCRequestEntityList =
+ deviceRPCRequestRepositoryHelper.findByDeviceIdAndOperationId(deviceId,
+ session.getCurrentOperationId());
+ if (tr069DeviceRPCRequestEntityList == null) {
+ SessionManagerException ex =
+ new SessionManagerException(ErrorCode.SESSION_EXPIRED, sessionId);
+ logger.error(ex.getMessage());
+ throw ex;
+ }
+ DeviceRPCRequest deviceRPCRequest =
+ TR069RequestProcessorUtility.convertToDTO(tr069DeviceRPCRequestEntityList);
+ OperationCode opCode = deviceRPCRequest.getOpDetails().getOpCode();
+
+ String operationName = null;
+
+ if (opCode instanceof TR069OperationCode) {
+ operationName = ((TR069OperationCode) opCode).name();
+ } else {
+ operationName = ((CustomOperationCode) opCode).name();
+ TR069OperationDetails tr069OperationDetails =
+ (TR069OperationDetails) deviceRPCRequest.getOpDetails();
+ opCode = getCustomOperationCode(tr069OperationDetails);
+ }
+ logger.info("The pending operation request for the session is of operation {}",
+ operationName);
+ deviceOperationRequestDetails.setOpCode(opCode);
+ deviceOperationRequestDetails.setOperationId(deviceRPCRequest.getOperationId());
+ }
+ deviceOperationRequestDetails.setDeviceDetails(deviceDetails);
+ } catch (DeviceOperationException e) {
+ SessionManagerException ex =
+ new SessionManagerException(ErrorCode.DEVICE_NOT_EXISTS, deviceId);
+ String exceptionMessage = ex.getMessage().replaceAll("[\n|\r|\t]", "_");
+ logger.error(exceptionMessage);
+ throw ex;
+ } catch (Exception e) {
+ logger.error(e.getMessage());
+ SessionManagerException ex =
+ new SessionManagerException(ErrorCode.SESSION_EXPIRED, sessionId);
+ logger.error(ex.getMessage());
+ throw ex;
+ }
+ return deviceOperationRequestDetails;
+ }
+
+ /**
+ * @param deviceId
+ * @return
+ * @throws DeviceOperationException
+ */
+ public TR069DeviceDetails getDeviceDetails(String deviceId) throws DeviceOperationException {
+ return deviceOperationInterface.getDeviceDetails(deviceId);
+ }
+
+ /**
+ * @param deviceId
+ * @param operationId
+ * @return
+ * @throws DeviceOperationException
+ * @throws SessionConcurrentAccessException
+ * @throws InterruptedException
+ */
+ private SessionDTO acquireSessionLockWithRetryOnFailure(String deviceId, Long operationId)
+ throws DeviceOperationException, SessionConcurrentAccessException, InterruptedException {
+ int sessionLockAcquireRetryCount = 0;
+ SessionDTO sessionDTO = null;
+ do {
+ try {
+ sessionDTO = acquireSessionLock(deviceId, null, false);
+ logger.info(
+ "Successfully acquired the session lock for processing NBI request with operation ID: {}",
+ operationId);
+ break;
+ } catch (SessionConcurrentAccessException ex) {
+ sessionLockAcquireRetryCount++;
+ if (sessionLockAcquireRetryCount == 3) {
+ logger.error("Failed acquiring the lock after retry, rolling back the transaction");
+ throw ex;
+ }
+ logger.warn(
+ "Session lock acquiring failed with SessionConcurrentAccessException, hence retrying");
+ Thread.sleep(1000L);
+ }
+ } while (sessionLockAcquireRetryCount < 3);
+
+ return sessionDTO;
+ }
+
+ /**
+ * @param deviceId
+ * @param deviceNotification
+ * @return
+ * @throws DeviceOperationException
+ */
+ private TR069DeviceDetails getPersistedDeviceDetails(String deviceId,
+ DeviceInform deviceNotification) throws DeviceOperationException {
+ TR069DeviceDetails deviceDetails = null;
+ try {
+ deviceDetails = deviceOperationInterface.getDeviceDetails(deviceId);
+ } catch (DeviceOperationException doe) {
+ if (ErrorCode.DEVICE_NOT_EXISTS.equals(doe.getErrorCode())) {
+ logger.info(
+ "Creating the device record in TR069_DEVICE_ table, as the device is authenticated successfully.");
+ createDevice(deviceNotification.getDeviceDetails());
+ deviceDetails = deviceOperationInterface.getDeviceDetails(deviceId);
+ }
+ }
+
+ return deviceDetails;
+ }
+
+ /**
+ * @param deviceDetails
+ * @param notificationType
+ * @throws TR069EventProcessingException
+ */
+ private void sendAbortedOperationResultForPendingRPCRequests(DeviceDetails deviceDetails,
+ TR069InformType notificationType) throws TR069EventProcessingException {
+ String deviceId = deviceDetails.getDeviceId();
+ String notificationName = notificationType.name();
+ logger.debug(
+ "Device Inform event received is {}, hence aborting all the pending operations if any exists",
+ notificationName);
+
+ List<DeviceRPCRequest> deviceRPCRequestList =
+ deviceRPCRequestRepositoryHelper.findAllDeviceRPCRequests(deviceId);
+
+ for (DeviceRPCRequest pendingDeviceRPCRequest : deviceRPCRequestList) {
+ DeviceRPCResponse deviceOpResult =
+ tr069RequestProcessEngineUtility.buildAbortedOperationresult(deviceDetails,
+ pendingDeviceRPCRequest, AcsFaultCode.FAULT_CODE_8002);
+ String operationName = null;
+ if (pendingDeviceRPCRequest.getOpDetails().getOpCode() instanceof CustomOperationCode) {
+ CustomOperationCode operationCode =
+ (CustomOperationCode) pendingDeviceRPCRequest.getOpDetails().getOpCode();
+ operationName = operationCode.name();
+ } else {
+ TR069OperationCode operationCode =
+ (TR069OperationCode) pendingDeviceRPCRequest.getOpDetails().getOpCode();
+ operationName = operationCode.name();
+ }
+ Long operationId = pendingDeviceRPCRequest.getOperationId();
+ logger.debug("Aborting the NBI Operation request with operation Id : {} for operation: {}",
+ operationId, operationName);
+ tr069EventNotificationService.sendOperationResultToNBI(deviceOpResult);
+ // Marking the NBI Request as processed
+ deviceRPCRequestRepositoryHelper.markDeviceRPCRequestAsProcessed(deviceId, operationId);
+ stopDeviceRPCRequestTimer(deviceId, operationId);
+ }
+ }
+
+ /**
+ * @param tr069RequestProcessorData
+ * @param deviceNotification
+ */
+ private void updateDeviceDetailsFromInform(TR069RequestProcessorData tr069RequestProcessorData,
+ DeviceInform deviceNotification) {
+ Boolean isDeviceDataChanged =
+ isDeviceUpdateExists(tr069RequestProcessorData, deviceNotification);
+ if (isDeviceDataChanged.booleanValue()) {
+ updateDeviceDetails(tr069RequestProcessorData, deviceNotification);
+ try {
+ logger.info(
+ "The device data like connection request URL/ Device SW/HW version has changed, hence updating the device details");
+ deviceOperationInterface
+ .updateDeviceDetails(tr069RequestProcessorData.getTr069DeviceDetails());
+ } catch (DeviceOperationException e) {
+ logger.error("Updating the device details with the notification details failed, Reason: {}",
+ e.getMessage());
+ logger.error(e.getMessage());
+ }
+ } else {
+ TR069DeviceDetails tr069DeviceDetails =
+ (TR069DeviceDetails) tr069RequestProcessorData.getTr069DeviceDetails();
+ TR069DeviceEntity tr069DeviceEntity =
+ deviceRepository.findByDeviceId(tr069DeviceDetails.getDeviceId());
+ logger.info("Setting connection status as true for device: {}",
+ tr069DeviceDetails.getDeviceId());
+ tr069DeviceEntity.setConnStatus(true);
+ tr069DeviceEntity.setLastUpdatedTime(new Date());
+ tr069DeviceEntity.setErrorMsg(null);
+ deviceRepository.save(tr069DeviceEntity);
+ }
+ }
+
+ /**
+ * @param deviceId
+ * @param operationId
+ * @return
+ */
+ private DeviceRPCRequest getNextRPCRequest(String deviceId, Long operationId) {
+ DeviceRPCRequest deviceRPCRequest = null;
+ try {
+ List<TR069DeviceRPCRequestEntity> entityList =
+ deviceRPCRequestRepositoryHelper.findByDeviceIdAndOperationId(deviceId, operationId);
+ Integer operationCode = entityList.get(0).getOpCode();
+ if (CustomOperationCode.getByOperationCode(operationCode) == null) {
+ logger.info("Marking the Device RPC request with operation id - {} as processed.",
+ operationId);
+ deviceRPCRequestRepositoryHelper.markDeviceRPCRequestAsProcessed(deviceId, operationId);
+ }
+ logger.debug(PENDING_RPC_CHECK);
+ deviceRPCRequest = deviceRPCRequestRepositoryHelper.findOldestDeviceRPCRequest(deviceId);
+ } catch (TR069EventProcessingException e) {
+ logger.error("An unknown exception occurred while fetching the NBI request, Reason: {}",
+ e.getMessage());
+ }
+
+ return deviceRPCRequest;
+ }
+
+ /**
+ * Creates the device in the DM module if factory imported already
+ *
+ * @param deviceDetails
+ * @throws DeviceOperationException
+ */
+ private void createDevice(DeviceDetails deviceDetails) throws DeviceOperationException {
+ deviceOperationInterface.updateDeviceDetails(deviceDetails);
+ }
+}