Add Python application for real-time PRB prediction and automated resource optimization 27/15327/2 master
authorsunil.n <sunil.n@samsung.com>
Wed, 3 Dec 2025 08:48:17 +0000 (14:18 +0530)
committersunil.n <sunil.n@samsung.com>
Wed, 3 Dec 2025 08:48:17 +0000 (14:18 +0530)
- Implement Flask-based web service for PRB prediction with LSTM model integration
- Add real-time inference and automated slice resource optimization via RAN NSSMF
- Implement notification-driven architecture for event-based processing
- Add Service Management Environment (SME) API for dynamic InfluxDB endpoint discovery
- Include comprehensive error handling, thread safety, and logging
- Add requirements.txt with all necessary dependencies
- Update README.md with complete application documentation and deployment guide

Change-Id: I396c6e6e11a2b60b1b0c58dd7df12d2a45ad0cd5
Signed-off-by: sunil.n <sunil.n@samsung.com>
sample-rapp-generator/rapp-slice-prb-prediction/README.md
sample-rapp-generator/rapp-slice-prb-prediction/src/data.py
sample-rapp-generator/rapp-slice-prb-prediction/src/main.py [new file with mode: 0644]
sample-rapp-generator/rapp-slice-prb-prediction/src/requirements.txt [new file with mode: 0644]

index 109b2c1..97f24b8 100644 (file)
@@ -265,6 +265,22 @@ The method returns a pandas DataFrame with the following structure:
 - `{tag_nssi_id}`: Network Slice Subnet Instance identifier
 - KPI fields as configured in `field_names` (PRB usage, data volume, RRC connections)
 
 - `{tag_nssi_id}`: Network Slice Subnet Instance identifier
 - KPI fields as configured in `field_names` (PRB usage, data volume, RRC connections)
 
+##### get_url_from_sme()
+The `get_url_from_sme()` method enables dynamic discovery of InfluxDB endpoint through the Service Management Environment (SME). This method allows the DATABASE class to automatically locate and configure the InfluxDB connection without hardcoded endpoints.
+
+**Usage Example:**
+```python
+# Initialize database and discover InfluxDB endpoint
+db = DATABASE()
+db.get_url_from_sme()
+
+if db.address:
+    logger.info(f"Discovered InfluxDB at: {db.address}")
+    # Connect and use the discovered endpoint
+    if db.connect():
+        data = db.read_data()
+```
+
 ##### query()
 The `query()` method provides robust database query execution with automatic retry logic for handling temporary connection failures. This method ensures reliable data retrieval by implementing exponential backoff retry mechanism.
 
 ##### query()
 The `query()` method provides robust database query execution with automatic retry logic for handling temporary connection failures. This method ensures reliable data retrieval by implementing exponential backoff retry mechanism.
 
@@ -609,6 +625,237 @@ To train a new model:
 4. Monitor training progress and evaluation metrics
 5. Use the saved artifacts for deployment in the RAPP
 
 4. Monitor training progress and evaluation metrics
 5. Use the saved artifacts for deployment in the RAPP
 
+## Main Application (`src/main.py`)
+
+The `main.py` file implements a comprehensive Flask-based web application that provides real-time PRB prediction and automated resource optimization for 5G RAN slices. This application serves as the core runtime component that integrates machine learning predictions with RAN network management.
+
+### Key Features
+
+- **Real-time PRB Prediction**: Uses trained LSTM models to predict future PRB demand for each network slice
+- **Automated Resource Optimization**: Automatically adjusts slice PRB allocations based on prediction results
+- **Flask Web Service**: Provides RESTful API endpoints for external integration and notification handling
+- **RAN NSSMF Integration**: Seamlessly integrates with RAN Network Slice Subnet Management Function for slice control
+- **Thread-Safe Operations**: Implements locking mechanisms to prevent concurrent inference conflicts
+- **Service Discovery Support**: Optional SME integration for dynamic endpoint discovery
+- **Comprehensive Logging**: Detailed logging for monitoring, debugging, and operational visibility
+
+### Architecture Overview
+
+The application follows a modular architecture with the following key components:
+
+#### SlicePRBPrediction Class
+
+The main prediction engine that orchestrates the entire PRB prediction and optimization workflow.
+
+**Initialization Parameters:**
+- `use_sme` (bool): Enable Service Management Environment for dynamic service discovery
+
+**Core Components:**
+- **Database Integration**: Uses `DATABASE` class for InfluxDB connectivity and data retrieval
+- **RAN NSSMF Client**: Integrates with `RAN_NSSMF_CLIENT` for slice management operations
+- **ML Model Loading**: Loads pre-trained LSTM models and preprocessing artifacts
+- **Configuration Management**: Centralized configuration loading from `config.json`
+
+#### Flask Web Service
+
+The application exposes a RESTful API endpoint for handling notifications and triggering inference:
+
+**Endpoint: `POST /handleFileReadyNotification`**
+- Receives notifications from RAN NSSMF and external systems
+- Triggers the PRB prediction and optimization workflow
+- Returns JSON responses with operation status
+
+### Core Workflow
+
+#### 1. Initialization Process
+```python
+# Initialize the prediction application
+rapp = SlicePRBPrediction(use_sme=True)
+
+# Automatic initialization includes:
+# - Database connection setup
+# - Model and preprocessing artifacts loading
+# - RAN NSSMF client configuration
+# - Service discovery (if enabled)
+```
+
+#### 2. Notification-Driven Inference
+The application operates on a notification-driven model:
+
+1. **Notification Reception**: Receives HTTP POST notifications at `/handleFileReadyNotification`
+2. **Data Retrieval**: Fetches latest performance data from InfluxDB
+3. **Prediction Execution**: Runs LSTM model inference for each network slice
+4. **Resource Optimization**: Compares predictions with current allocations and adjusts as needed
+5. **Response Generation**: Returns operation status and results
+
+#### 3. Prediction Pipeline
+
+**Data Processing:**
+- Retrieves time-series performance data from InfluxDB
+- Standardizes column names and data types
+- Filters and sorts data by slice type and NSSI ID
+- Handles missing data and edge cases
+
+**Feature Engineering:**
+- One-hot encoding for slice types and NSSI IDs
+- MinMax scaling for numerical features (PRB, data volume, RRC connections)
+- Time series window creation with configurable window size
+- Feature concatenation for model input preparation
+
+**Model Inference:**
+- LSTM neural network prediction for each slice
+- Inverse transformation of scaled predictions
+- Confidence interval estimation (optional)
+
+#### 4. Automated Resource Optimization
+
+**Decision Logic:**
+```python
+# Compare predicted vs current PRB usage
+if predicted_prb > current_prb_allocation:
+    # Increase allocation to prevent congestion
+    modify_network_slice_subnet(nssi_id, new_prb_allocation=int(predicted_prb))
+else:
+    # Current allocation sufficient, no action needed
+    logger.info("No modification required")
+```
+
+**Optimization Features:**
+- Proactive resource allocation based on ML predictions
+- Automatic slice configuration updates via RAN NSSMF
+- Safety thresholds to prevent over-allocation
+- Detailed logging of all optimization actions
+
+### Configuration
+
+The application uses `src/config.json` for runtime configuration:
+
+```json
+{
+  "RAPP": {
+    "interval": "672",                                    // Processing interval
+    "ran_nssmf_address": "http://localhost:8080",         // RAN NSSMF endpoint
+    "callback_uri": "http://localhost:8080/handleFileReadyNotification"
+  },
+  "DB": {
+    "window_size": 672,                                   // Time series window size
+    "field_names": ["RRU.PrbDl.SNSSAI", "DRB.PdcpSduVolumeDL.SNSSAI", "RRC.ConnEstabSucc.Cause"]
+  }
+}
+```
+
+### Model Artifacts
+
+The application loads the following ML artifacts from the `models/` directory:
+
+- `best_prb_lstm.keras`: Trained LSTM model for PRB prediction
+- `slice_onehot.joblib`: One-hot encoder for slice types
+- `nssi_onehot.joblib`: One-hot encoder for NSSI IDs
+- `scaler_*.joblib`: Feature scaling transformers for different metrics
+- `scaler_y.joblib`: Target variable scaler for prediction inverse transformation
+
+### API Endpoints
+
+#### POST /handleFileReadyNotification
+
+Receives notifications and triggers the PRB prediction workflow.
+
+**Request Format:**
+```json
+{
+  "fileInfoList": [
+    {
+      "fileId": "performance_data_12345",
+      "fileSize": 1024000,
+      "fileLocation": "http://data-server/files/perf_data.csv"
+    }
+  ]
+}
+```
+
+**Response Format:**
+```json
+{
+  "status": "success",
+  "message": "Notification received and inference triggered"
+}
+```
+
+**Error Responses:**
+```json
+{
+  "status": "error",
+  "message": "Application not properly initialized"
+}
+```
+
+### Running the Application
+
+#### Command Line Options
+
+```bash
+# Run with static configuration
+python src/main.py
+
+# Run with Service Management Environment discovery
+python src/main.py --use_sme True
+
+# Run without SME (default)
+python src/main.py --use_sme False
+```
+
+#### Startup Process
+
+1. **Argument Parsing**: Parse command line arguments for SME configuration
+2. **Application Initialization**: Create `SlicePRBPrediction` instance
+3. **Service Discovery**: Optionally discover endpoints via SME
+4. **Notification Subscription**: Subscribe to RAN NSSMF notifications
+5. **Web Server Start**: Launch Flask application on port 8080
+
+
+### Operational Features
+
+#### Logging and Monitoring
+
+Detailed logging at multiple levels:
+
+```python
+# Configuration and status logging
+logger.info("Successfully subscribed to RAN NSSMF notifications.")
+logger.warning("Previous inference still running, skipping this iteration")
+logger.error("Failed to fetch details for NSSI ID: {nssi}")
+
+# Debug logging for detailed troubleshooting
+logger.debug(f"NSSI Details: {json.dumps(nssi_details, indent=2)}")
+```
+
+### Integration Examples
+
+#### External System Integration
+
+```python
+import requests
+
+# Trigger inference from external system
+response = requests.post(
+    "http://localhost:8080/handleFileReadyNotification",
+    json={
+        "fileInfoList": [
+            {
+                "fileId": "trigger_001",
+                "fileSize": 1000,
+                "fileLocation": "http://external-system/trigger"
+            }
+        ]
+    }
+)
+
+if response.status_code == 200:
+    print("Inference triggered successfully")
+else:
+    print(f"Failed to trigger inference: {response.text}")
+```
+
 ## Deployment and Usage
 
 ### Prerequisites
 ## Deployment and Usage
 
 ### Prerequisites
@@ -616,7 +863,27 @@ To train a new model:
 - Python 3.8+
 - InfluxDB 2.x for time series data storage
 - TensorFlow/Keras for model inference
 - Python 3.8+
 - InfluxDB 2.x for time series data storage
 - TensorFlow/Keras for model inference
+- Flask web framework for API service
 - O-RAN Service Management Environment (optional for service discovery)
 - O-RAN Service Management Environment (optional for service discovery)
+- RAN Network Slice Subnet Management Function (for slice control)
+
+### Dependencies
+
+Install required Python packages:
+
+```bash
+pip install -r src/requirements.txt
+```
+
+**Required Packages:**
+- `influxdb_client`: InfluxDB 2.x client library
+- `pandas`: Data manipulation and analysis
+- `requests`: HTTP client library
+- `tensorflow`: Machine learning framework
+- `numpy`: Numerical computing
+- `joblib`: Model serialization
+- `scikit-learn`: Machine learning utilities
+- `Flask`: Web framework for API service
 
 ### Running the Application
 
 
 ### Running the Application
 
@@ -634,3 +901,13 @@ To train a new model:
    - Update `src/config.json` with your environment settings
    - Configure InfluxDB connection parameters
    - Set up SME endpoints if using service discovery
    - Update `src/config.json` with your environment settings
    - Configure InfluxDB connection parameters
    - Set up SME endpoints if using service discovery
+   - Ensure model artifacts are present in `models/` directory
+
+4. **Start the Prediction Service**:
+   ```bash
+   # Run with static configuration
+   python src/main.py
+   
+   # Run with Service Management Environment discovery
+   python src/main.py --use_sme True
+   ```
index 0a1b303..0d3c241 100644 (file)
@@ -8,6 +8,8 @@ import os
 
 from requests import RequestException
 
 
 from requests import RequestException
 
+from sme_client import SMEClient
+
 logger = logging.getLogger(__name__)
 
 class DATABASE(object):
 logger = logging.getLogger(__name__)
 
 class DATABASE(object):
@@ -108,4 +110,21 @@ class DATABASE(object):
                 return result
             except (RequestException, ConnectionError) as e:
                 logger.error(f'Failed to query influxdb: {e}, retrying in 60 seconds...')
                 return result
             except (RequestException, ConnectionError) as e:
                 logger.error(f'Failed to query influxdb: {e}, retrying in 60 seconds...')
-                time.sleep(60)
\ No newline at end of file
+                time.sleep(60)
+
+    def get_url_from_sme(self):
+        sme_client = SMEClient(
+            invoker_id=self.invoker_id,
+            api_name=self.influx_api_name,
+            resource_name=self.influx_resource_name
+        )
+
+        self.influx_url = sme_client.discover_service()
+
+        logger.info("InfluxDB URL: {}".format(self.influx_url))
+
+        if self.influx_url is not None:
+            self.address = self.influx_url.rstrip('/')
+            logger.debug(f"InfluxDB Address: {self.address}")
+        else:
+            logger.error("Failed to discover InfluxDB URL from SME.")
\ No newline at end of file
diff --git a/sample-rapp-generator/rapp-slice-prb-prediction/src/main.py b/sample-rapp-generator/rapp-slice-prb-prediction/src/main.py
new file mode 100644 (file)
index 0000000..ab5bdd3
--- /dev/null
@@ -0,0 +1,264 @@
+import argparse
+import os
+from data import DATABASE
+
+from threading import Lock
+import logging
+
+from joblib import load
+
+import tensorflow as tf
+
+import numpy as np
+import pandas as pd
+import json
+
+from typing import Optional
+
+from ran_nssmf_client import RAN_NSSMF_CLIENT
+from flask import Flask, request, jsonify
+
+logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+logger = logging.getLogger(__name__)
+
+# Initialize Flask App
+app = Flask(__name__)
+
+class SlicePRBPrediction():
+    def __init__(self, use_sme=False):
+        self.interval = None
+
+        # Initialize the database and prediction client
+        self.db = DATABASE()
+        self.ran_nssmf_client = RAN_NSSMF_CLIENT()
+
+        if use_sme:
+            # Get the InfluxDB URL from SME
+            self.db.get_url_from_sme()
+            # self.ran_nssmf_client.get_url_from_sme()
+
+        self.db.connect()
+
+        self.inference_lock = Lock()
+        self._running = False
+
+        self.features = ["sliceType_enc", "RRU.PrbDl.SNSSAI","DRB.PdcpSduVolumeDL.SNSSAI","RRC.ConnEstabSucc.Cause"]
+
+        self.enc = load(os.path.join("models", "slice_onehot.joblib"))
+        self.nssi_enc = load(os.path.join("models", "nssi_onehot.joblib"))
+        self.scalers = {
+            "prb": load(os.path.join("models", "scaler_prb.joblib")),
+            "data": load(os.path.join("models", "scaler_data.joblib")),
+            "rrc": load(os.path.join("models", "scaler_rrc.joblib")),
+            "y": load(os.path.join("models", "scaler_y.joblib"))
+        }
+        self.model = tf.keras.models.load_model(os.path.join("models", "best_prb_lstm.keras"))
+
+        self.callback_uri = None
+
+        self.config()
+
+    def config(self):
+
+        with open('config.json', 'r') as f:
+            config = json.load(f)
+
+        # Load RAPP configuration, with a default interval of 10 seconds
+        rapp_config = config.get("RAPP", {}) # Renamed for clarity
+
+        # Get interval from config, or use 10 as a default if not found
+        interval_str = rapp_config.get("interval", "672") # Default to "10" string
+        self.interval = int(interval_str)
+        self.callback_uri = rapp_config.get("callback_uri", "http://localhost:8080/handleFileReadyNotification")
+
+    def subscribe_to_notifications(self):
+        # This method will be called after the app is created to subscribe to notifications
+        # The callback URI must point to this running Flask app's endpoint
+        # Assuming the rApp will be accessible on port 8080
+        
+        logger.info(f"Attempting to subscribe to RAN NSSMF notifications with callback: {self.callback_uri}")
+        response = self.ran_nssmf_client.subscribe_to_file_ready_notifications(self.callback_uri)
+        if response and response.status_code == 201:
+            logger.info("Successfully subscribed to RAN NSSMF notifications.")
+            # You might want to store the subscription ID or location from response.headers
+        else:
+            logger.error(f"Failed to subscribe to RAN NSSMF notifications. Status: {response.status_code if response else 'N/A'}")
+            if response:
+                logger.error(f"Response: {response.text}")
+    
+    def safe_inference(self):
+        if not self.inference_lock.acquire(blocking=False):
+            logger.warning("Previous inference still running, skipping this iteration")
+            return
+
+        try:
+            self.inference()
+        finally:
+            self.inference_lock.release()
+
+    def inference(self):
+        logger.info("Starting inference process...")
+        df = self.db.read_data()
+
+        # Standardize column names
+        df = df.rename(columns={
+        "_time": "time",
+        "measObjLdn": "nssi_id",
+        "sliceType": "slice_type",
+        "RRU.PrbDl.SNSSAI": "prb_dl",
+        "DRB.PdcpSduVolumeDL.SNSSAI": "data_dl",
+        "RRC.ConnEstabSucc.Cause": "rrc_succ"
+        })
+
+        if df.empty:
+            logger.info("No data to process... skipping this iteration of inference.")
+            return
+
+        # Ensure types
+        df["time"] = pd.to_datetime(df["time"], utc=True)
+        df = df.sort_values(["slice_type", "nssi_id", "time"]).reset_index(drop=True)
+
+        # Drop rows with any NA in core columns
+        df = df.dropna(subset=["slice_type", "nssi_id", "time", "prb_dl", "data_dl", "rrc_succ"])
+
+        target_groups = sorted(df[["slice_type", "nssi_id"]].drop_duplicates().values.tolist())
+        results = []
+        for st, nssi in target_groups:
+            X_latest = self.build_window_for_latest_slice(
+            df, 
+            target_slice_type=st, 
+            target_nssi_id=nssi,
+            window=self.db.window_size
+            )
+            if X_latest is None:
+                logger.warning(f"Not enough recent points for slice_type='{st}' and nssi='{nssi}' to build a window of {self.db.window_size}. Skipping.")
+                continue
+        
+            y_pred_scaled = self.model.predict(X_latest).reshape(-1, 1)
+            y_pred = self.scalers["y"].inverse_transform(y_pred_scaled)[0, 0]
+            results.append({"slice_type": st, "nssi_id": nssi, "predicted_prb_dl_next": float(y_pred)})
+
+            # Fetch NSSI details from RAN NSSMF simulator
+            logger.info(f"Fetching details for NSSI ID: {nssi} from RAN NSSMF simulator.")
+            nssi_details = self.ran_nssmf_client.get_network_slice_subnet(subnet_id=nssi)
+            if nssi_details:
+                logger.info(f"Successfully fetched details for NSSI ID {nssi}.")
+                # logger.debug(f"NSSI Details: {json.dumps(nssi_details, indent=2)}") # Uncomment for verbose details
+
+                # Extract current_prb_dl from nssi_details
+                # Path: nssi_details["attributes"]["sliceProfileList"][0]["ransliceSubnetProfile"]["RRU.PrbDl"]
+                try:
+                    current_prb_dl = nssi_details.get("attributes", {}) \
+                                                .get("sliceProfileList", [{}])[0] \
+                                                .get("ransliceSubnetProfile", {}) \
+                                                .get("RRU.PrbDl")
+                    
+                    if current_prb_dl is not None:
+                        logger.info(f"Current PRB DL for NSSI ID {nssi}: {current_prb_dl}. Predicted PRB DL: {y_pred:.2f}")
+                        if current_prb_dl < y_pred:
+                            logger.info(f"Current PRB DL ({current_prb_dl}) is less than predicted ({y_pred:.2f}). Sending modification request.")
+                            modification_response = self.ran_nssmf_client.modify_network_slice_subnet(
+                                subnet_id=nssi, 
+                                new_prb_dl=int(y_pred) # Cast to int as RRU.PrbDl is an integer
+                            )
+                            if modification_response:
+                                logger.info(f"Successfully sent modification request for NSSI ID {nssi}. Status: {modification_response.status_code}")
+                            else:
+                                logger.warning(f"Failed to send modification request for NSSI ID {nssi}.")
+                        else:
+                            logger.info(f"Current PRB DL ({current_prb_dl}) is not less than predicted ({y_pred:.2f}). No modification needed.")
+                    else:
+                        logger.warning(f"Could not find RRU.PrbDl in NSSI details for NSSI ID {nssi}.")
+                except (IndexError, TypeError) as e:
+                    logger.error(f"Error parsing RRU.PrbDl from NSSI details for NSSI ID {nssi}: {e}. NSSI Details: {json.dumps(nssi_details, indent=2)}")
+
+            else:
+                logger.warning(f"Failed to fetch details for NSSI ID: {nssi}.")
+        
+        logger.info(f"Inference results: {json.dumps({'results': results}, indent=2)}")
+        
+    
+    def build_window_for_latest_slice(
+        self,
+        df: pd.DataFrame,
+        target_slice_type: str,
+        target_nssi_id: str,
+        window: int
+    ) -> Optional[np.ndarray]:
+        g = df[(df["slice_type"] == target_slice_type) & (df["nssi_id"] == target_nssi_id)].sort_values("time").reset_index(drop=True)
+        if len(g) < window:
+            return None
+
+        oh = self.enc.transform(np.array([[target_slice_type]]))
+        oh_row = np.repeat(oh, window, axis=0)
+
+        nssi_oh = self.nssi_enc.transform(np.array([[target_nssi_id]]))
+        nssi_oh_row = np.repeat(nssi_oh, window, axis=0)
+
+        prb = self.scalers["prb"].transform(g[["prb_dl"]].iloc[-window:])
+        data = self.scalers["data"].transform(g[["data_dl"]].iloc[-window:])
+        rrc = self.scalers["rrc"].transform(g[["rrc_succ"]].iloc[-window:])
+
+        feat = np.concatenate([oh_row, nssi_oh_row, prb, data, rrc], axis=1).astype(np.float32)
+        return feat[np.newaxis, :, :]  # shape (1, window, features)
+
+# Global instance of SlicePRBPrediction to be used by Flask routes
+# This instance will be initialized after parsing arguments.
+rapp_instance = None
+
+@app.route('/handleFileReadyNotification', methods=['POST'])
+def handle_file_ready_notification():
+    logger.info("Received POST request on /handleFileReadyNotification")
+    if not rapp_instance:
+        logger.error("rapp_instance not initialized. Cannot process notification.")
+        return jsonify({"status": "error", "message": "Application not properly initialized"}), 500
+
+    notification_data = request.get_json()
+    if not notification_data:
+        logger.warning("No JSON data received in notification.")
+        return jsonify({"status": "error", "message": "Invalid JSON payload"}), 400
+
+    logger.info(f"Notification received: {json.dumps(notification_data, indent=2)}")
+
+    # Trigger the inference process
+    # The notification_data (e.g., fileInfoList) could be used to tailor the inference
+    # For now, we'll trigger the general inference process.
+    try:
+        rapp_instance.safe_inference()
+        logger.info("Inference process triggered successfully by notification.")
+        return jsonify({"status": "success", "message": "Notification received and inference triggered"}), 200
+    except Exception as e:
+        logger.error(f"Error during inference triggered by notification: {str(e)}", exc_info=True)
+        return jsonify({"status": "error", "message": f"Inference failed: {str(e)}"}), 500
+
+if __name__ == "__main__":
+    
+    def str2bool(v):
+        if isinstance(v, bool):
+            return v
+        if v.lower() in ('yes', 'true', 't', 'y', '1'):
+            return True
+        elif v.lower() in ('no', 'false', 'f', 'n', '0'):
+            return False
+        else:
+            raise argparse.ArgumentTypeError('Boolean value expected.')
+
+    parser = argparse.ArgumentParser(description="Run SlicePRBPrediction rApp")
+    parser.add_argument("--use_sme", type=str2bool, default=False, help="Set to True use SME url for DB.")
+    args = parser.parse_args()
+
+    # Instantiate the SlicePRBPrediction class
+    rapp_instance = SlicePRBPrediction(use_sme=args.use_sme)
+    logger.debug("Slice PRB Prediction rApp initialized")
+
+    # Subscribe to RAN NSSMF notifications at startup
+    if rapp_instance:
+        rapp_instance.subscribe_to_notifications()
+    else:
+        logger.error("rapp_instance not initialized. Cannot subscribe to notifications.")
+
+    # Run the Flask app
+    # The host is set to '0.0.0.0' to make it accessible from outside the container (if applicable)
+    # The port is set to 8080 as per the callback URI used in subscription.
+    logger.info("Starting Flask server on port 8080...")
+    app.run(host='0.0.0.0', port=8080)
diff --git a/sample-rapp-generator/rapp-slice-prb-prediction/src/requirements.txt b/sample-rapp-generator/rapp-slice-prb-prediction/src/requirements.txt
new file mode 100644 (file)
index 0000000..33e676c
--- /dev/null
@@ -0,0 +1,8 @@
+influxdb_client
+pandas
+requests
+tensorflow
+numpy
+joblib
+scikit-learn
+Flask