From 30395f3b3a7e0ccb2240ff90efee79c826974d88 Mon Sep 17 00:00:00 2001 From: "sunil.n" Date: Wed, 3 Dec 2025 14:18:17 +0530 Subject: [PATCH] Add Python application for real-time PRB prediction and automated resource optimization - Implement Flask-based web service for PRB prediction with LSTM model integration - Add real-time inference and automated slice resource optimization via RAN NSSMF - Implement notification-driven architecture for event-based processing - Add Service Management Environment (SME) API for dynamic InfluxDB endpoint discovery - Include comprehensive error handling, thread safety, and logging - Add requirements.txt with all necessary dependencies - Update README.md with complete application documentation and deployment guide Change-Id: I396c6e6e11a2b60b1b0c58dd7df12d2a45ad0cd5 Signed-off-by: sunil.n --- .../rapp-slice-prb-prediction/README.md | 277 +++++++++++++++++++++ .../rapp-slice-prb-prediction/src/data.py | 21 +- .../rapp-slice-prb-prediction/src/main.py | 264 ++++++++++++++++++++ .../rapp-slice-prb-prediction/src/requirements.txt | 8 + 4 files changed, 569 insertions(+), 1 deletion(-) create mode 100644 sample-rapp-generator/rapp-slice-prb-prediction/src/main.py create mode 100644 sample-rapp-generator/rapp-slice-prb-prediction/src/requirements.txt diff --git a/sample-rapp-generator/rapp-slice-prb-prediction/README.md b/sample-rapp-generator/rapp-slice-prb-prediction/README.md index 109b2c1..97f24b8 100644 --- a/sample-rapp-generator/rapp-slice-prb-prediction/README.md +++ b/sample-rapp-generator/rapp-slice-prb-prediction/README.md @@ -265,6 +265,22 @@ The method returns a pandas DataFrame with the following structure: - `{tag_nssi_id}`: Network Slice Subnet Instance identifier - KPI fields as configured in `field_names` (PRB usage, data volume, RRC connections) +##### get_url_from_sme() +The `get_url_from_sme()` method enables dynamic discovery of InfluxDB endpoint through the Service Management Environment (SME). This method allows the DATABASE class to automatically locate and configure the InfluxDB connection without hardcoded endpoints. + +**Usage Example:** +```python +# Initialize database and discover InfluxDB endpoint +db = DATABASE() +db.get_url_from_sme() + +if db.address: + logger.info(f"Discovered InfluxDB at: {db.address}") + # Connect and use the discovered endpoint + if db.connect(): + data = db.read_data() +``` + ##### query() The `query()` method provides robust database query execution with automatic retry logic for handling temporary connection failures. This method ensures reliable data retrieval by implementing exponential backoff retry mechanism. @@ -609,6 +625,237 @@ To train a new model: 4. Monitor training progress and evaluation metrics 5. Use the saved artifacts for deployment in the RAPP +## Main Application (`src/main.py`) + +The `main.py` file implements a comprehensive Flask-based web application that provides real-time PRB prediction and automated resource optimization for 5G RAN slices. This application serves as the core runtime component that integrates machine learning predictions with RAN network management. + +### Key Features + +- **Real-time PRB Prediction**: Uses trained LSTM models to predict future PRB demand for each network slice +- **Automated Resource Optimization**: Automatically adjusts slice PRB allocations based on prediction results +- **Flask Web Service**: Provides RESTful API endpoints for external integration and notification handling +- **RAN NSSMF Integration**: Seamlessly integrates with RAN Network Slice Subnet Management Function for slice control +- **Thread-Safe Operations**: Implements locking mechanisms to prevent concurrent inference conflicts +- **Service Discovery Support**: Optional SME integration for dynamic endpoint discovery +- **Comprehensive Logging**: Detailed logging for monitoring, debugging, and operational visibility + +### Architecture Overview + +The application follows a modular architecture with the following key components: + +#### SlicePRBPrediction Class + +The main prediction engine that orchestrates the entire PRB prediction and optimization workflow. + +**Initialization Parameters:** +- `use_sme` (bool): Enable Service Management Environment for dynamic service discovery + +**Core Components:** +- **Database Integration**: Uses `DATABASE` class for InfluxDB connectivity and data retrieval +- **RAN NSSMF Client**: Integrates with `RAN_NSSMF_CLIENT` for slice management operations +- **ML Model Loading**: Loads pre-trained LSTM models and preprocessing artifacts +- **Configuration Management**: Centralized configuration loading from `config.json` + +#### Flask Web Service + +The application exposes a RESTful API endpoint for handling notifications and triggering inference: + +**Endpoint: `POST /handleFileReadyNotification`** +- Receives notifications from RAN NSSMF and external systems +- Triggers the PRB prediction and optimization workflow +- Returns JSON responses with operation status + +### Core Workflow + +#### 1. Initialization Process +```python +# Initialize the prediction application +rapp = SlicePRBPrediction(use_sme=True) + +# Automatic initialization includes: +# - Database connection setup +# - Model and preprocessing artifacts loading +# - RAN NSSMF client configuration +# - Service discovery (if enabled) +``` + +#### 2. Notification-Driven Inference +The application operates on a notification-driven model: + +1. **Notification Reception**: Receives HTTP POST notifications at `/handleFileReadyNotification` +2. **Data Retrieval**: Fetches latest performance data from InfluxDB +3. **Prediction Execution**: Runs LSTM model inference for each network slice +4. **Resource Optimization**: Compares predictions with current allocations and adjusts as needed +5. **Response Generation**: Returns operation status and results + +#### 3. Prediction Pipeline + +**Data Processing:** +- Retrieves time-series performance data from InfluxDB +- Standardizes column names and data types +- Filters and sorts data by slice type and NSSI ID +- Handles missing data and edge cases + +**Feature Engineering:** +- One-hot encoding for slice types and NSSI IDs +- MinMax scaling for numerical features (PRB, data volume, RRC connections) +- Time series window creation with configurable window size +- Feature concatenation for model input preparation + +**Model Inference:** +- LSTM neural network prediction for each slice +- Inverse transformation of scaled predictions +- Confidence interval estimation (optional) + +#### 4. Automated Resource Optimization + +**Decision Logic:** +```python +# Compare predicted vs current PRB usage +if predicted_prb > current_prb_allocation: + # Increase allocation to prevent congestion + modify_network_slice_subnet(nssi_id, new_prb_allocation=int(predicted_prb)) +else: + # Current allocation sufficient, no action needed + logger.info("No modification required") +``` + +**Optimization Features:** +- Proactive resource allocation based on ML predictions +- Automatic slice configuration updates via RAN NSSMF +- Safety thresholds to prevent over-allocation +- Detailed logging of all optimization actions + +### Configuration + +The application uses `src/config.json` for runtime configuration: + +```json +{ + "RAPP": { + "interval": "672", // Processing interval + "ran_nssmf_address": "http://localhost:8080", // RAN NSSMF endpoint + "callback_uri": "http://localhost:8080/handleFileReadyNotification" + }, + "DB": { + "window_size": 672, // Time series window size + "field_names": ["RRU.PrbDl.SNSSAI", "DRB.PdcpSduVolumeDL.SNSSAI", "RRC.ConnEstabSucc.Cause"] + } +} +``` + +### Model Artifacts + +The application loads the following ML artifacts from the `models/` directory: + +- `best_prb_lstm.keras`: Trained LSTM model for PRB prediction +- `slice_onehot.joblib`: One-hot encoder for slice types +- `nssi_onehot.joblib`: One-hot encoder for NSSI IDs +- `scaler_*.joblib`: Feature scaling transformers for different metrics +- `scaler_y.joblib`: Target variable scaler for prediction inverse transformation + +### API Endpoints + +#### POST /handleFileReadyNotification + +Receives notifications and triggers the PRB prediction workflow. + +**Request Format:** +```json +{ + "fileInfoList": [ + { + "fileId": "performance_data_12345", + "fileSize": 1024000, + "fileLocation": "http://data-server/files/perf_data.csv" + } + ] +} +``` + +**Response Format:** +```json +{ + "status": "success", + "message": "Notification received and inference triggered" +} +``` + +**Error Responses:** +```json +{ + "status": "error", + "message": "Application not properly initialized" +} +``` + +### Running the Application + +#### Command Line Options + +```bash +# Run with static configuration +python src/main.py + +# Run with Service Management Environment discovery +python src/main.py --use_sme True + +# Run without SME (default) +python src/main.py --use_sme False +``` + +#### Startup Process + +1. **Argument Parsing**: Parse command line arguments for SME configuration +2. **Application Initialization**: Create `SlicePRBPrediction` instance +3. **Service Discovery**: Optionally discover endpoints via SME +4. **Notification Subscription**: Subscribe to RAN NSSMF notifications +5. **Web Server Start**: Launch Flask application on port 8080 + + +### Operational Features + +#### Logging and Monitoring + +Detailed logging at multiple levels: + +```python +# Configuration and status logging +logger.info("Successfully subscribed to RAN NSSMF notifications.") +logger.warning("Previous inference still running, skipping this iteration") +logger.error("Failed to fetch details for NSSI ID: {nssi}") + +# Debug logging for detailed troubleshooting +logger.debug(f"NSSI Details: {json.dumps(nssi_details, indent=2)}") +``` + +### Integration Examples + +#### External System Integration + +```python +import requests + +# Trigger inference from external system +response = requests.post( + "http://localhost:8080/handleFileReadyNotification", + json={ + "fileInfoList": [ + { + "fileId": "trigger_001", + "fileSize": 1000, + "fileLocation": "http://external-system/trigger" + } + ] + } +) + +if response.status_code == 200: + print("Inference triggered successfully") +else: + print(f"Failed to trigger inference: {response.text}") +``` + ## Deployment and Usage ### Prerequisites @@ -616,7 +863,27 @@ To train a new model: - Python 3.8+ - InfluxDB 2.x for time series data storage - TensorFlow/Keras for model inference +- Flask web framework for API service - O-RAN Service Management Environment (optional for service discovery) +- RAN Network Slice Subnet Management Function (for slice control) + +### Dependencies + +Install required Python packages: + +```bash +pip install -r src/requirements.txt +``` + +**Required Packages:** +- `influxdb_client`: InfluxDB 2.x client library +- `pandas`: Data manipulation and analysis +- `requests`: HTTP client library +- `tensorflow`: Machine learning framework +- `numpy`: Numerical computing +- `joblib`: Model serialization +- `scikit-learn`: Machine learning utilities +- `Flask`: Web framework for API service ### Running the Application @@ -634,3 +901,13 @@ To train a new model: - Update `src/config.json` with your environment settings - Configure InfluxDB connection parameters - Set up SME endpoints if using service discovery + - Ensure model artifacts are present in `models/` directory + +4. **Start the Prediction Service**: + ```bash + # Run with static configuration + python src/main.py + + # Run with Service Management Environment discovery + python src/main.py --use_sme True + ``` diff --git a/sample-rapp-generator/rapp-slice-prb-prediction/src/data.py b/sample-rapp-generator/rapp-slice-prb-prediction/src/data.py index 0a1b303..0d3c241 100644 --- a/sample-rapp-generator/rapp-slice-prb-prediction/src/data.py +++ b/sample-rapp-generator/rapp-slice-prb-prediction/src/data.py @@ -8,6 +8,8 @@ import os from requests import RequestException +from sme_client import SMEClient + logger = logging.getLogger(__name__) class DATABASE(object): @@ -108,4 +110,21 @@ class DATABASE(object): return result except (RequestException, ConnectionError) as e: logger.error(f'Failed to query influxdb: {e}, retrying in 60 seconds...') - time.sleep(60) \ No newline at end of file + time.sleep(60) + + def get_url_from_sme(self): + sme_client = SMEClient( + invoker_id=self.invoker_id, + api_name=self.influx_api_name, + resource_name=self.influx_resource_name + ) + + self.influx_url = sme_client.discover_service() + + logger.info("InfluxDB URL: {}".format(self.influx_url)) + + if self.influx_url is not None: + self.address = self.influx_url.rstrip('/') + logger.debug(f"InfluxDB Address: {self.address}") + else: + logger.error("Failed to discover InfluxDB URL from SME.") \ No newline at end of file diff --git a/sample-rapp-generator/rapp-slice-prb-prediction/src/main.py b/sample-rapp-generator/rapp-slice-prb-prediction/src/main.py new file mode 100644 index 0000000..ab5bdd3 --- /dev/null +++ b/sample-rapp-generator/rapp-slice-prb-prediction/src/main.py @@ -0,0 +1,264 @@ +import argparse +import os +from data import DATABASE + +from threading import Lock +import logging + +from joblib import load + +import tensorflow as tf + +import numpy as np +import pandas as pd +import json + +from typing import Optional + +from ran_nssmf_client import RAN_NSSMF_CLIENT +from flask import Flask, request, jsonify + +logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +# Initialize Flask App +app = Flask(__name__) + +class SlicePRBPrediction(): + def __init__(self, use_sme=False): + self.interval = None + + # Initialize the database and prediction client + self.db = DATABASE() + self.ran_nssmf_client = RAN_NSSMF_CLIENT() + + if use_sme: + # Get the InfluxDB URL from SME + self.db.get_url_from_sme() + # self.ran_nssmf_client.get_url_from_sme() + + self.db.connect() + + self.inference_lock = Lock() + self._running = False + + self.features = ["sliceType_enc", "RRU.PrbDl.SNSSAI","DRB.PdcpSduVolumeDL.SNSSAI","RRC.ConnEstabSucc.Cause"] + + self.enc = load(os.path.join("models", "slice_onehot.joblib")) + self.nssi_enc = load(os.path.join("models", "nssi_onehot.joblib")) + self.scalers = { + "prb": load(os.path.join("models", "scaler_prb.joblib")), + "data": load(os.path.join("models", "scaler_data.joblib")), + "rrc": load(os.path.join("models", "scaler_rrc.joblib")), + "y": load(os.path.join("models", "scaler_y.joblib")) + } + self.model = tf.keras.models.load_model(os.path.join("models", "best_prb_lstm.keras")) + + self.callback_uri = None + + self.config() + + def config(self): + + with open('config.json', 'r') as f: + config = json.load(f) + + # Load RAPP configuration, with a default interval of 10 seconds + rapp_config = config.get("RAPP", {}) # Renamed for clarity + + # Get interval from config, or use 10 as a default if not found + interval_str = rapp_config.get("interval", "672") # Default to "10" string + self.interval = int(interval_str) + self.callback_uri = rapp_config.get("callback_uri", "http://localhost:8080/handleFileReadyNotification") + + def subscribe_to_notifications(self): + # This method will be called after the app is created to subscribe to notifications + # The callback URI must point to this running Flask app's endpoint + # Assuming the rApp will be accessible on port 8080 + + logger.info(f"Attempting to subscribe to RAN NSSMF notifications with callback: {self.callback_uri}") + response = self.ran_nssmf_client.subscribe_to_file_ready_notifications(self.callback_uri) + if response and response.status_code == 201: + logger.info("Successfully subscribed to RAN NSSMF notifications.") + # You might want to store the subscription ID or location from response.headers + else: + logger.error(f"Failed to subscribe to RAN NSSMF notifications. Status: {response.status_code if response else 'N/A'}") + if response: + logger.error(f"Response: {response.text}") + + def safe_inference(self): + if not self.inference_lock.acquire(blocking=False): + logger.warning("Previous inference still running, skipping this iteration") + return + + try: + self.inference() + finally: + self.inference_lock.release() + + def inference(self): + logger.info("Starting inference process...") + df = self.db.read_data() + + # Standardize column names + df = df.rename(columns={ + "_time": "time", + "measObjLdn": "nssi_id", + "sliceType": "slice_type", + "RRU.PrbDl.SNSSAI": "prb_dl", + "DRB.PdcpSduVolumeDL.SNSSAI": "data_dl", + "RRC.ConnEstabSucc.Cause": "rrc_succ" + }) + + if df.empty: + logger.info("No data to process... skipping this iteration of inference.") + return + + # Ensure types + df["time"] = pd.to_datetime(df["time"], utc=True) + df = df.sort_values(["slice_type", "nssi_id", "time"]).reset_index(drop=True) + + # Drop rows with any NA in core columns + df = df.dropna(subset=["slice_type", "nssi_id", "time", "prb_dl", "data_dl", "rrc_succ"]) + + target_groups = sorted(df[["slice_type", "nssi_id"]].drop_duplicates().values.tolist()) + results = [] + for st, nssi in target_groups: + X_latest = self.build_window_for_latest_slice( + df, + target_slice_type=st, + target_nssi_id=nssi, + window=self.db.window_size + ) + if X_latest is None: + logger.warning(f"Not enough recent points for slice_type='{st}' and nssi='{nssi}' to build a window of {self.db.window_size}. Skipping.") + continue + + y_pred_scaled = self.model.predict(X_latest).reshape(-1, 1) + y_pred = self.scalers["y"].inverse_transform(y_pred_scaled)[0, 0] + results.append({"slice_type": st, "nssi_id": nssi, "predicted_prb_dl_next": float(y_pred)}) + + # Fetch NSSI details from RAN NSSMF simulator + logger.info(f"Fetching details for NSSI ID: {nssi} from RAN NSSMF simulator.") + nssi_details = self.ran_nssmf_client.get_network_slice_subnet(subnet_id=nssi) + if nssi_details: + logger.info(f"Successfully fetched details for NSSI ID {nssi}.") + # logger.debug(f"NSSI Details: {json.dumps(nssi_details, indent=2)}") # Uncomment for verbose details + + # Extract current_prb_dl from nssi_details + # Path: nssi_details["attributes"]["sliceProfileList"][0]["ransliceSubnetProfile"]["RRU.PrbDl"] + try: + current_prb_dl = nssi_details.get("attributes", {}) \ + .get("sliceProfileList", [{}])[0] \ + .get("ransliceSubnetProfile", {}) \ + .get("RRU.PrbDl") + + if current_prb_dl is not None: + logger.info(f"Current PRB DL for NSSI ID {nssi}: {current_prb_dl}. Predicted PRB DL: {y_pred:.2f}") + if current_prb_dl < y_pred: + logger.info(f"Current PRB DL ({current_prb_dl}) is less than predicted ({y_pred:.2f}). Sending modification request.") + modification_response = self.ran_nssmf_client.modify_network_slice_subnet( + subnet_id=nssi, + new_prb_dl=int(y_pred) # Cast to int as RRU.PrbDl is an integer + ) + if modification_response: + logger.info(f"Successfully sent modification request for NSSI ID {nssi}. Status: {modification_response.status_code}") + else: + logger.warning(f"Failed to send modification request for NSSI ID {nssi}.") + else: + logger.info(f"Current PRB DL ({current_prb_dl}) is not less than predicted ({y_pred:.2f}). No modification needed.") + else: + logger.warning(f"Could not find RRU.PrbDl in NSSI details for NSSI ID {nssi}.") + except (IndexError, TypeError) as e: + logger.error(f"Error parsing RRU.PrbDl from NSSI details for NSSI ID {nssi}: {e}. NSSI Details: {json.dumps(nssi_details, indent=2)}") + + else: + logger.warning(f"Failed to fetch details for NSSI ID: {nssi}.") + + logger.info(f"Inference results: {json.dumps({'results': results}, indent=2)}") + + + def build_window_for_latest_slice( + self, + df: pd.DataFrame, + target_slice_type: str, + target_nssi_id: str, + window: int + ) -> Optional[np.ndarray]: + g = df[(df["slice_type"] == target_slice_type) & (df["nssi_id"] == target_nssi_id)].sort_values("time").reset_index(drop=True) + if len(g) < window: + return None + + oh = self.enc.transform(np.array([[target_slice_type]])) + oh_row = np.repeat(oh, window, axis=0) + + nssi_oh = self.nssi_enc.transform(np.array([[target_nssi_id]])) + nssi_oh_row = np.repeat(nssi_oh, window, axis=0) + + prb = self.scalers["prb"].transform(g[["prb_dl"]].iloc[-window:]) + data = self.scalers["data"].transform(g[["data_dl"]].iloc[-window:]) + rrc = self.scalers["rrc"].transform(g[["rrc_succ"]].iloc[-window:]) + + feat = np.concatenate([oh_row, nssi_oh_row, prb, data, rrc], axis=1).astype(np.float32) + return feat[np.newaxis, :, :] # shape (1, window, features) + +# Global instance of SlicePRBPrediction to be used by Flask routes +# This instance will be initialized after parsing arguments. +rapp_instance = None + +@app.route('/handleFileReadyNotification', methods=['POST']) +def handle_file_ready_notification(): + logger.info("Received POST request on /handleFileReadyNotification") + if not rapp_instance: + logger.error("rapp_instance not initialized. Cannot process notification.") + return jsonify({"status": "error", "message": "Application not properly initialized"}), 500 + + notification_data = request.get_json() + if not notification_data: + logger.warning("No JSON data received in notification.") + return jsonify({"status": "error", "message": "Invalid JSON payload"}), 400 + + logger.info(f"Notification received: {json.dumps(notification_data, indent=2)}") + + # Trigger the inference process + # The notification_data (e.g., fileInfoList) could be used to tailor the inference + # For now, we'll trigger the general inference process. + try: + rapp_instance.safe_inference() + logger.info("Inference process triggered successfully by notification.") + return jsonify({"status": "success", "message": "Notification received and inference triggered"}), 200 + except Exception as e: + logger.error(f"Error during inference triggered by notification: {str(e)}", exc_info=True) + return jsonify({"status": "error", "message": f"Inference failed: {str(e)}"}), 500 + +if __name__ == "__main__": + + def str2bool(v): + if isinstance(v, bool): + return v + if v.lower() in ('yes', 'true', 't', 'y', '1'): + return True + elif v.lower() in ('no', 'false', 'f', 'n', '0'): + return False + else: + raise argparse.ArgumentTypeError('Boolean value expected.') + + parser = argparse.ArgumentParser(description="Run SlicePRBPrediction rApp") + parser.add_argument("--use_sme", type=str2bool, default=False, help="Set to True use SME url for DB.") + args = parser.parse_args() + + # Instantiate the SlicePRBPrediction class + rapp_instance = SlicePRBPrediction(use_sme=args.use_sme) + logger.debug("Slice PRB Prediction rApp initialized") + + # Subscribe to RAN NSSMF notifications at startup + if rapp_instance: + rapp_instance.subscribe_to_notifications() + else: + logger.error("rapp_instance not initialized. Cannot subscribe to notifications.") + + # Run the Flask app + # The host is set to '0.0.0.0' to make it accessible from outside the container (if applicable) + # The port is set to 8080 as per the callback URI used in subscription. + logger.info("Starting Flask server on port 8080...") + app.run(host='0.0.0.0', port=8080) diff --git a/sample-rapp-generator/rapp-slice-prb-prediction/src/requirements.txt b/sample-rapp-generator/rapp-slice-prb-prediction/src/requirements.txt new file mode 100644 index 0000000..33e676c --- /dev/null +++ b/sample-rapp-generator/rapp-slice-prb-prediction/src/requirements.txt @@ -0,0 +1,8 @@ +influxdb_client +pandas +requests +tensorflow +numpy +joblib +scikit-learn +Flask -- 2.16.6