Add Python application for real-time PRB prediction and automated resource optimization 27/15327/2 master
authorsunil.n <sunil.n@samsung.com>
Wed, 3 Dec 2025 08:48:17 +0000 (14:18 +0530)
committersunil.n <sunil.n@samsung.com>
Wed, 3 Dec 2025 08:48:17 +0000 (14:18 +0530)
- Implement Flask-based web service for PRB prediction with LSTM model integration
- Add real-time inference and automated slice resource optimization via RAN NSSMF
- Implement notification-driven architecture for event-based processing
- Add Service Management Environment (SME) API for dynamic InfluxDB endpoint discovery
- Include comprehensive error handling, thread safety, and logging
- Add requirements.txt with all necessary dependencies
- Update README.md with complete application documentation and deployment guide

Change-Id: I396c6e6e11a2b60b1b0c58dd7df12d2a45ad0cd5
Signed-off-by: sunil.n <sunil.n@samsung.com>
sample-rapp-generator/rapp-slice-prb-prediction/README.md
sample-rapp-generator/rapp-slice-prb-prediction/src/data.py
sample-rapp-generator/rapp-slice-prb-prediction/src/main.py [new file with mode: 0644]
sample-rapp-generator/rapp-slice-prb-prediction/src/requirements.txt [new file with mode: 0644]

index 109b2c1..97f24b8 100644 (file)
@@ -265,6 +265,22 @@ The method returns a pandas DataFrame with the following structure:
 - `{tag_nssi_id}`: Network Slice Subnet Instance identifier
 - KPI fields as configured in `field_names` (PRB usage, data volume, RRC connections)
 
+##### get_url_from_sme()
+The `get_url_from_sme()` method enables dynamic discovery of InfluxDB endpoint through the Service Management Environment (SME). This method allows the DATABASE class to automatically locate and configure the InfluxDB connection without hardcoded endpoints.
+
+**Usage Example:**
+```python
+# Initialize database and discover InfluxDB endpoint
+db = DATABASE()
+db.get_url_from_sme()
+
+if db.address:
+    logger.info(f"Discovered InfluxDB at: {db.address}")
+    # Connect and use the discovered endpoint
+    if db.connect():
+        data = db.read_data()
+```
+
 ##### query()
 The `query()` method provides robust database query execution with automatic retry logic for handling temporary connection failures. This method ensures reliable data retrieval by implementing exponential backoff retry mechanism.
 
@@ -609,6 +625,237 @@ To train a new model:
 4. Monitor training progress and evaluation metrics
 5. Use the saved artifacts for deployment in the RAPP
 
+## Main Application (`src/main.py`)
+
+The `main.py` file implements a comprehensive Flask-based web application that provides real-time PRB prediction and automated resource optimization for 5G RAN slices. This application serves as the core runtime component that integrates machine learning predictions with RAN network management.
+
+### Key Features
+
+- **Real-time PRB Prediction**: Uses trained LSTM models to predict future PRB demand for each network slice
+- **Automated Resource Optimization**: Automatically adjusts slice PRB allocations based on prediction results
+- **Flask Web Service**: Provides RESTful API endpoints for external integration and notification handling
+- **RAN NSSMF Integration**: Seamlessly integrates with RAN Network Slice Subnet Management Function for slice control
+- **Thread-Safe Operations**: Implements locking mechanisms to prevent concurrent inference conflicts
+- **Service Discovery Support**: Optional SME integration for dynamic endpoint discovery
+- **Comprehensive Logging**: Detailed logging for monitoring, debugging, and operational visibility
+
+### Architecture Overview
+
+The application follows a modular architecture with the following key components:
+
+#### SlicePRBPrediction Class
+
+The main prediction engine that orchestrates the entire PRB prediction and optimization workflow.
+
+**Initialization Parameters:**
+- `use_sme` (bool): Enable Service Management Environment for dynamic service discovery
+
+**Core Components:**
+- **Database Integration**: Uses `DATABASE` class for InfluxDB connectivity and data retrieval
+- **RAN NSSMF Client**: Integrates with `RAN_NSSMF_CLIENT` for slice management operations
+- **ML Model Loading**: Loads pre-trained LSTM models and preprocessing artifacts
+- **Configuration Management**: Centralized configuration loading from `config.json`
+
+#### Flask Web Service
+
+The application exposes a RESTful API endpoint for handling notifications and triggering inference:
+
+**Endpoint: `POST /handleFileReadyNotification`**
+- Receives notifications from RAN NSSMF and external systems
+- Triggers the PRB prediction and optimization workflow
+- Returns JSON responses with operation status
+
+### Core Workflow
+
+#### 1. Initialization Process
+```python
+# Initialize the prediction application
+rapp = SlicePRBPrediction(use_sme=True)
+
+# Automatic initialization includes:
+# - Database connection setup
+# - Model and preprocessing artifacts loading
+# - RAN NSSMF client configuration
+# - Service discovery (if enabled)
+```
+
+#### 2. Notification-Driven Inference
+The application operates on a notification-driven model:
+
+1. **Notification Reception**: Receives HTTP POST notifications at `/handleFileReadyNotification`
+2. **Data Retrieval**: Fetches latest performance data from InfluxDB
+3. **Prediction Execution**: Runs LSTM model inference for each network slice
+4. **Resource Optimization**: Compares predictions with current allocations and adjusts as needed
+5. **Response Generation**: Returns operation status and results
+
+#### 3. Prediction Pipeline
+
+**Data Processing:**
+- Retrieves time-series performance data from InfluxDB
+- Standardizes column names and data types
+- Filters and sorts data by slice type and NSSI ID
+- Handles missing data and edge cases
+
+**Feature Engineering:**
+- One-hot encoding for slice types and NSSI IDs
+- MinMax scaling for numerical features (PRB, data volume, RRC connections)
+- Time series window creation with configurable window size
+- Feature concatenation for model input preparation
+
+**Model Inference:**
+- LSTM neural network prediction for each slice
+- Inverse transformation of scaled predictions
+- Confidence interval estimation (optional)
+
+#### 4. Automated Resource Optimization
+
+**Decision Logic:**
+```python
+# Compare predicted vs current PRB usage
+if predicted_prb > current_prb_allocation:
+    # Increase allocation to prevent congestion
+    modify_network_slice_subnet(nssi_id, new_prb_allocation=int(predicted_prb))
+else:
+    # Current allocation sufficient, no action needed
+    logger.info("No modification required")
+```
+
+**Optimization Features:**
+- Proactive resource allocation based on ML predictions
+- Automatic slice configuration updates via RAN NSSMF
+- Safety thresholds to prevent over-allocation
+- Detailed logging of all optimization actions
+
+### Configuration
+
+The application uses `src/config.json` for runtime configuration:
+
+```json
+{
+  "RAPP": {
+    "interval": "672",                                    // Processing interval
+    "ran_nssmf_address": "http://localhost:8080",         // RAN NSSMF endpoint
+    "callback_uri": "http://localhost:8080/handleFileReadyNotification"
+  },
+  "DB": {
+    "window_size": 672,                                   // Time series window size
+    "field_names": ["RRU.PrbDl.SNSSAI", "DRB.PdcpSduVolumeDL.SNSSAI", "RRC.ConnEstabSucc.Cause"]
+  }
+}
+```
+
+### Model Artifacts
+
+The application loads the following ML artifacts from the `models/` directory:
+
+- `best_prb_lstm.keras`: Trained LSTM model for PRB prediction
+- `slice_onehot.joblib`: One-hot encoder for slice types
+- `nssi_onehot.joblib`: One-hot encoder for NSSI IDs
+- `scaler_*.joblib`: Feature scaling transformers for different metrics
+- `scaler_y.joblib`: Target variable scaler for prediction inverse transformation
+
+### API Endpoints
+
+#### POST /handleFileReadyNotification
+
+Receives notifications and triggers the PRB prediction workflow.
+
+**Request Format:**
+```json
+{
+  "fileInfoList": [
+    {
+      "fileId": "performance_data_12345",
+      "fileSize": 1024000,
+      "fileLocation": "http://data-server/files/perf_data.csv"
+    }
+  ]
+}
+```
+
+**Response Format:**
+```json
+{
+  "status": "success",
+  "message": "Notification received and inference triggered"
+}
+```
+
+**Error Responses:**
+```json
+{
+  "status": "error",
+  "message": "Application not properly initialized"
+}
+```
+
+### Running the Application
+
+#### Command Line Options
+
+```bash
+# Run with static configuration
+python src/main.py
+
+# Run with Service Management Environment discovery
+python src/main.py --use_sme True
+
+# Run without SME (default)
+python src/main.py --use_sme False
+```
+
+#### Startup Process
+
+1. **Argument Parsing**: Parse command line arguments for SME configuration
+2. **Application Initialization**: Create `SlicePRBPrediction` instance
+3. **Service Discovery**: Optionally discover endpoints via SME
+4. **Notification Subscription**: Subscribe to RAN NSSMF notifications
+5. **Web Server Start**: Launch Flask application on port 8080
+
+
+### Operational Features
+
+#### Logging and Monitoring
+
+Detailed logging at multiple levels:
+
+```python
+# Configuration and status logging
+logger.info("Successfully subscribed to RAN NSSMF notifications.")
+logger.warning("Previous inference still running, skipping this iteration")
+logger.error("Failed to fetch details for NSSI ID: {nssi}")
+
+# Debug logging for detailed troubleshooting
+logger.debug(f"NSSI Details: {json.dumps(nssi_details, indent=2)}")
+```
+
+### Integration Examples
+
+#### External System Integration
+
+```python
+import requests
+
+# Trigger inference from external system
+response = requests.post(
+    "http://localhost:8080/handleFileReadyNotification",
+    json={
+        "fileInfoList": [
+            {
+                "fileId": "trigger_001",
+                "fileSize": 1000,
+                "fileLocation": "http://external-system/trigger"
+            }
+        ]
+    }
+)
+
+if response.status_code == 200:
+    print("Inference triggered successfully")
+else:
+    print(f"Failed to trigger inference: {response.text}")
+```
+
 ## Deployment and Usage
 
 ### Prerequisites
@@ -616,7 +863,27 @@ To train a new model:
 - Python 3.8+
 - InfluxDB 2.x for time series data storage
 - TensorFlow/Keras for model inference
+- Flask web framework for API service
 - O-RAN Service Management Environment (optional for service discovery)
+- RAN Network Slice Subnet Management Function (for slice control)
+
+### Dependencies
+
+Install required Python packages:
+
+```bash
+pip install -r src/requirements.txt
+```
+
+**Required Packages:**
+- `influxdb_client`: InfluxDB 2.x client library
+- `pandas`: Data manipulation and analysis
+- `requests`: HTTP client library
+- `tensorflow`: Machine learning framework
+- `numpy`: Numerical computing
+- `joblib`: Model serialization
+- `scikit-learn`: Machine learning utilities
+- `Flask`: Web framework for API service
 
 ### Running the Application
 
@@ -634,3 +901,13 @@ To train a new model:
    - Update `src/config.json` with your environment settings
    - Configure InfluxDB connection parameters
    - Set up SME endpoints if using service discovery
+   - Ensure model artifacts are present in `models/` directory
+
+4. **Start the Prediction Service**:
+   ```bash
+   # Run with static configuration
+   python src/main.py
+   
+   # Run with Service Management Environment discovery
+   python src/main.py --use_sme True
+   ```
index 0a1b303..0d3c241 100644 (file)
@@ -8,6 +8,8 @@ import os
 
 from requests import RequestException
 
+from sme_client import SMEClient
+
 logger = logging.getLogger(__name__)
 
 class DATABASE(object):
@@ -108,4 +110,21 @@ class DATABASE(object):
                 return result
             except (RequestException, ConnectionError) as e:
                 logger.error(f'Failed to query influxdb: {e}, retrying in 60 seconds...')
-                time.sleep(60)
\ No newline at end of file
+                time.sleep(60)
+
+    def get_url_from_sme(self):
+        sme_client = SMEClient(
+            invoker_id=self.invoker_id,
+            api_name=self.influx_api_name,
+            resource_name=self.influx_resource_name
+        )
+
+        self.influx_url = sme_client.discover_service()
+
+        logger.info("InfluxDB URL: {}".format(self.influx_url))
+
+        if self.influx_url is not None:
+            self.address = self.influx_url.rstrip('/')
+            logger.debug(f"InfluxDB Address: {self.address}")
+        else:
+            logger.error("Failed to discover InfluxDB URL from SME.")
\ No newline at end of file
diff --git a/sample-rapp-generator/rapp-slice-prb-prediction/src/main.py b/sample-rapp-generator/rapp-slice-prb-prediction/src/main.py
new file mode 100644 (file)
index 0000000..ab5bdd3
--- /dev/null
@@ -0,0 +1,264 @@
+import argparse
+import os
+from data import DATABASE
+
+from threading import Lock
+import logging
+
+from joblib import load
+
+import tensorflow as tf
+
+import numpy as np
+import pandas as pd
+import json
+
+from typing import Optional
+
+from ran_nssmf_client import RAN_NSSMF_CLIENT
+from flask import Flask, request, jsonify
+
+logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+logger = logging.getLogger(__name__)
+
+# Initialize Flask App
+app = Flask(__name__)
+
+class SlicePRBPrediction():
+    def __init__(self, use_sme=False):
+        self.interval = None
+
+        # Initialize the database and prediction client
+        self.db = DATABASE()
+        self.ran_nssmf_client = RAN_NSSMF_CLIENT()
+
+        if use_sme:
+            # Get the InfluxDB URL from SME
+            self.db.get_url_from_sme()
+            # self.ran_nssmf_client.get_url_from_sme()
+
+        self.db.connect()
+
+        self.inference_lock = Lock()
+        self._running = False
+
+        self.features = ["sliceType_enc", "RRU.PrbDl.SNSSAI","DRB.PdcpSduVolumeDL.SNSSAI","RRC.ConnEstabSucc.Cause"]
+
+        self.enc = load(os.path.join("models", "slice_onehot.joblib"))
+        self.nssi_enc = load(os.path.join("models", "nssi_onehot.joblib"))
+        self.scalers = {
+            "prb": load(os.path.join("models", "scaler_prb.joblib")),
+            "data": load(os.path.join("models", "scaler_data.joblib")),
+            "rrc": load(os.path.join("models", "scaler_rrc.joblib")),
+            "y": load(os.path.join("models", "scaler_y.joblib"))
+        }
+        self.model = tf.keras.models.load_model(os.path.join("models", "best_prb_lstm.keras"))
+
+        self.callback_uri = None
+
+        self.config()
+
+    def config(self):
+
+        with open('config.json', 'r') as f:
+            config = json.load(f)
+
+        # Load RAPP configuration, with a default interval of 10 seconds
+        rapp_config = config.get("RAPP", {}) # Renamed for clarity
+
+        # Get interval from config, or use 10 as a default if not found
+        interval_str = rapp_config.get("interval", "672") # Default to "10" string
+        self.interval = int(interval_str)
+        self.callback_uri = rapp_config.get("callback_uri", "http://localhost:8080/handleFileReadyNotification")
+
+    def subscribe_to_notifications(self):
+        # This method will be called after the app is created to subscribe to notifications
+        # The callback URI must point to this running Flask app's endpoint
+        # Assuming the rApp will be accessible on port 8080
+        
+        logger.info(f"Attempting to subscribe to RAN NSSMF notifications with callback: {self.callback_uri}")
+        response = self.ran_nssmf_client.subscribe_to_file_ready_notifications(self.callback_uri)
+        if response and response.status_code == 201:
+            logger.info("Successfully subscribed to RAN NSSMF notifications.")
+            # You might want to store the subscription ID or location from response.headers
+        else:
+            logger.error(f"Failed to subscribe to RAN NSSMF notifications. Status: {response.status_code if response else 'N/A'}")
+            if response:
+                logger.error(f"Response: {response.text}")
+    
+    def safe_inference(self):
+        if not self.inference_lock.acquire(blocking=False):
+            logger.warning("Previous inference still running, skipping this iteration")
+            return
+
+        try:
+            self.inference()
+        finally:
+            self.inference_lock.release()
+
+    def inference(self):
+        logger.info("Starting inference process...")
+        df = self.db.read_data()
+
+        # Standardize column names
+        df = df.rename(columns={
+        "_time": "time",
+        "measObjLdn": "nssi_id",
+        "sliceType": "slice_type",
+        "RRU.PrbDl.SNSSAI": "prb_dl",
+        "DRB.PdcpSduVolumeDL.SNSSAI": "data_dl",
+        "RRC.ConnEstabSucc.Cause": "rrc_succ"
+        })
+
+        if df.empty:
+            logger.info("No data to process... skipping this iteration of inference.")
+            return
+
+        # Ensure types
+        df["time"] = pd.to_datetime(df["time"], utc=True)
+        df = df.sort_values(["slice_type", "nssi_id", "time"]).reset_index(drop=True)
+
+        # Drop rows with any NA in core columns
+        df = df.dropna(subset=["slice_type", "nssi_id", "time", "prb_dl", "data_dl", "rrc_succ"])
+
+        target_groups = sorted(df[["slice_type", "nssi_id"]].drop_duplicates().values.tolist())
+        results = []
+        for st, nssi in target_groups:
+            X_latest = self.build_window_for_latest_slice(
+            df, 
+            target_slice_type=st, 
+            target_nssi_id=nssi,
+            window=self.db.window_size
+            )
+            if X_latest is None:
+                logger.warning(f"Not enough recent points for slice_type='{st}' and nssi='{nssi}' to build a window of {self.db.window_size}. Skipping.")
+                continue
+        
+            y_pred_scaled = self.model.predict(X_latest).reshape(-1, 1)
+            y_pred = self.scalers["y"].inverse_transform(y_pred_scaled)[0, 0]
+            results.append({"slice_type": st, "nssi_id": nssi, "predicted_prb_dl_next": float(y_pred)})
+
+            # Fetch NSSI details from RAN NSSMF simulator
+            logger.info(f"Fetching details for NSSI ID: {nssi} from RAN NSSMF simulator.")
+            nssi_details = self.ran_nssmf_client.get_network_slice_subnet(subnet_id=nssi)
+            if nssi_details:
+                logger.info(f"Successfully fetched details for NSSI ID {nssi}.")
+                # logger.debug(f"NSSI Details: {json.dumps(nssi_details, indent=2)}") # Uncomment for verbose details
+
+                # Extract current_prb_dl from nssi_details
+                # Path: nssi_details["attributes"]["sliceProfileList"][0]["ransliceSubnetProfile"]["RRU.PrbDl"]
+                try:
+                    current_prb_dl = nssi_details.get("attributes", {}) \
+                                                .get("sliceProfileList", [{}])[0] \
+                                                .get("ransliceSubnetProfile", {}) \
+                                                .get("RRU.PrbDl")
+                    
+                    if current_prb_dl is not None:
+                        logger.info(f"Current PRB DL for NSSI ID {nssi}: {current_prb_dl}. Predicted PRB DL: {y_pred:.2f}")
+                        if current_prb_dl < y_pred:
+                            logger.info(f"Current PRB DL ({current_prb_dl}) is less than predicted ({y_pred:.2f}). Sending modification request.")
+                            modification_response = self.ran_nssmf_client.modify_network_slice_subnet(
+                                subnet_id=nssi, 
+                                new_prb_dl=int(y_pred) # Cast to int as RRU.PrbDl is an integer
+                            )
+                            if modification_response:
+                                logger.info(f"Successfully sent modification request for NSSI ID {nssi}. Status: {modification_response.status_code}")
+                            else:
+                                logger.warning(f"Failed to send modification request for NSSI ID {nssi}.")
+                        else:
+                            logger.info(f"Current PRB DL ({current_prb_dl}) is not less than predicted ({y_pred:.2f}). No modification needed.")
+                    else:
+                        logger.warning(f"Could not find RRU.PrbDl in NSSI details for NSSI ID {nssi}.")
+                except (IndexError, TypeError) as e:
+                    logger.error(f"Error parsing RRU.PrbDl from NSSI details for NSSI ID {nssi}: {e}. NSSI Details: {json.dumps(nssi_details, indent=2)}")
+
+            else:
+                logger.warning(f"Failed to fetch details for NSSI ID: {nssi}.")
+        
+        logger.info(f"Inference results: {json.dumps({'results': results}, indent=2)}")
+        
+    
+    def build_window_for_latest_slice(
+        self,
+        df: pd.DataFrame,
+        target_slice_type: str,
+        target_nssi_id: str,
+        window: int
+    ) -> Optional[np.ndarray]:
+        g = df[(df["slice_type"] == target_slice_type) & (df["nssi_id"] == target_nssi_id)].sort_values("time").reset_index(drop=True)
+        if len(g) < window:
+            return None
+
+        oh = self.enc.transform(np.array([[target_slice_type]]))
+        oh_row = np.repeat(oh, window, axis=0)
+
+        nssi_oh = self.nssi_enc.transform(np.array([[target_nssi_id]]))
+        nssi_oh_row = np.repeat(nssi_oh, window, axis=0)
+
+        prb = self.scalers["prb"].transform(g[["prb_dl"]].iloc[-window:])
+        data = self.scalers["data"].transform(g[["data_dl"]].iloc[-window:])
+        rrc = self.scalers["rrc"].transform(g[["rrc_succ"]].iloc[-window:])
+
+        feat = np.concatenate([oh_row, nssi_oh_row, prb, data, rrc], axis=1).astype(np.float32)
+        return feat[np.newaxis, :, :]  # shape (1, window, features)
+
+# Global instance of SlicePRBPrediction to be used by Flask routes
+# This instance will be initialized after parsing arguments.
+rapp_instance = None
+
+@app.route('/handleFileReadyNotification', methods=['POST'])
+def handle_file_ready_notification():
+    logger.info("Received POST request on /handleFileReadyNotification")
+    if not rapp_instance:
+        logger.error("rapp_instance not initialized. Cannot process notification.")
+        return jsonify({"status": "error", "message": "Application not properly initialized"}), 500
+
+    notification_data = request.get_json()
+    if not notification_data:
+        logger.warning("No JSON data received in notification.")
+        return jsonify({"status": "error", "message": "Invalid JSON payload"}), 400
+
+    logger.info(f"Notification received: {json.dumps(notification_data, indent=2)}")
+
+    # Trigger the inference process
+    # The notification_data (e.g., fileInfoList) could be used to tailor the inference
+    # For now, we'll trigger the general inference process.
+    try:
+        rapp_instance.safe_inference()
+        logger.info("Inference process triggered successfully by notification.")
+        return jsonify({"status": "success", "message": "Notification received and inference triggered"}), 200
+    except Exception as e:
+        logger.error(f"Error during inference triggered by notification: {str(e)}", exc_info=True)
+        return jsonify({"status": "error", "message": f"Inference failed: {str(e)}"}), 500
+
+if __name__ == "__main__":
+    
+    def str2bool(v):
+        if isinstance(v, bool):
+            return v
+        if v.lower() in ('yes', 'true', 't', 'y', '1'):
+            return True
+        elif v.lower() in ('no', 'false', 'f', 'n', '0'):
+            return False
+        else:
+            raise argparse.ArgumentTypeError('Boolean value expected.')
+
+    parser = argparse.ArgumentParser(description="Run SlicePRBPrediction rApp")
+    parser.add_argument("--use_sme", type=str2bool, default=False, help="Set to True use SME url for DB.")
+    args = parser.parse_args()
+
+    # Instantiate the SlicePRBPrediction class
+    rapp_instance = SlicePRBPrediction(use_sme=args.use_sme)
+    logger.debug("Slice PRB Prediction rApp initialized")
+
+    # Subscribe to RAN NSSMF notifications at startup
+    if rapp_instance:
+        rapp_instance.subscribe_to_notifications()
+    else:
+        logger.error("rapp_instance not initialized. Cannot subscribe to notifications.")
+
+    # Run the Flask app
+    # The host is set to '0.0.0.0' to make it accessible from outside the container (if applicable)
+    # The port is set to 8080 as per the callback URI used in subscription.
+    logger.info("Starting Flask server on port 8080...")
+    app.run(host='0.0.0.0', port=8080)
diff --git a/sample-rapp-generator/rapp-slice-prb-prediction/src/requirements.txt b/sample-rapp-generator/rapp-slice-prb-prediction/src/requirements.txt
new file mode 100644 (file)
index 0000000..33e676c
--- /dev/null
@@ -0,0 +1,8 @@
+influxdb_client
+pandas
+requests
+tensorflow
+numpy
+joblib
+scikit-learn
+Flask