# O-RAN-SC Non-RealTime RIC DMaaP Information Producer
-This product is a generic information producer (as defined by the Information Coordinator Service (ICS)). It can produce any information that can be retrieved from DMaaP. Its main tasks is to register information types and itself as a producer using the ICS Data Producer API.
+This product is a generic information producer (as defined by the Information Coordinator Service (ICS)). It can produce any information that can be retrieved from DMaaP or Kafka. Its main tasks is to register information types and itself as a producer using the ICS Data Producer API.
A data consumer may create information jobs through the ICS Data Producer API.
-This service will retrieve data from the DMaaP Message Router (MR) and distribute it further to the data consumers (information job owners).
+This service will retrieve data from the DMaaP Message Router (MR) or from the Kafka streaming platform and will distribute it further to the data consumers (information job owners).
The component is a springboot service and is configured as any springboot service through the file `config/application.yaml`. The component log can be retrieved and logging can be controled by means of REST call. See the API documentation (api/api.yaml).
"types":
[
{
- "id": "STD_Fault_Messages",
- "dmaapTopicUrl": events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/STD-Fault-Messages_1.0.0",
+ "id": "ExampleInformationType1_1.0.0",
+ "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/STD-Fault-Messages_1.0.0",
+ "useHttpProxy": true
+ },
+ {
+ "id": "ExampleInformationType2_2.0.0",
+ "kafkaInputTopic": "KafkaInputTopic",
"useHttpProxy": false
}
]
}
```
-Each information has the following properties:
+Each information type has the following properties:
- id the information type identity as exposed in the Information Coordination Service data consumer API
- dmaapTopicUrl the URL to for fetching information from DMaaP
+ - kafkaInputTopic a Kafka topic to get input from
- useHttpProxy if true, the received information will be delivered using a HTTP proxy (provided that one is setup in the application.yaml file). This might for instance be needed if the data consumer is in the RAN or outside the cluster.
-The service producer will constantly poll MR for all configured job types. When receiving messages for a type, it will distribute these messages to all jobs registered for the type. If no jobs for that type are registered, the messages will be discarded. If a consumer is unavailable for distribution, the messages will be discarded for that consumer.
+The service producer will poll MR and/or listen to Kafka topics for all configured job types. When receiving messages for a type, it will distribute these messages to all jobs registered for the type. If a consumer is unavailable for distribution, the messages will be discarded for that consumer.
+
+When an Information Job is created in the Information Coordinator Service Consumer API, it is possible to define a number of job specific properties. For an Information type that has a Kafka topic defined, the following Json schema defines the properties that can be used:
+
+
+```sh
+{
+ "$schema": "http://json-schema.org/draft-04/schema#",
+ "type": "object",
+ "properties": {
+ "filter": {
+ "type": "string"
+ },
+ "maxConcurrency": {
+ "type": "integer"
+ },
+ "bufferTimeout": {
+ "type": "object",
+ "properties": {
+ "maxSize": {
+ "type": "integer"
+ },
+ "maxTimeMiliseconds": {
+ "type": "integer"
+ }
+ },
+ "additionalProperties": false,
+ "required": [
+ "maxSize",
+ "maxTimeMiliseconds"
+ ]
+ }
+ },
+ "additionalProperties": false
+}
+```
+-filter is a regular expression. Only strings that matches the expression will be pushed further to the consumer.
+-maxConcurrency the maximum number of concurrent REST session for the data delivery to the consumer.
+ The default is 1 and that is the number that must be used to guarantee that the object sequence is maintained.
+ A higher number will give higher throughtput.
+-bufferTimeout, can be used to reduce the number of REST calls to the consumer. If defined, a number of objects will be
+ buffered and sent in one REST call to the consumer.
+ The buffered objects will be put in a Json array and quoted. Example;
+ Object1 and Object2 may be posted in one call --> ["Object1", "Object2"]
+ The bufferTimeout is a Json object and the parameters in the object are:
+ - maxSize the maximum number of buffered objects before posting
+ - maxTimeMiliseconds the maximum delay time to buffer before posting
+ If no bufferTimeout is specified, each object will be posted as received in separate calls (not quoted and put in a Json array).
+
+
+For an information type that only has a DMaaP topic, the following Json schema defines the possible parameters to use when creating an information job:
+
+```sh
+{
+ "$schema": "http://json-schema.org/draft-04/schema#",
+ "type": "object",
+ "properties": {
+ "filter": {
+ "type": "string"
+ }
+ },
+ "additionalProperties": false
+}
+```
+-filter is a regular expression. Only strings that matches the expression will be pushed furter to the consumer. This
+ has a similar meaning as in jobs that receives data from Kafka.
## License
}},
"openapi": "3.0.1",
"paths": {
- "/dmaap_dataproducer/info_job": {
+ "/actuator/threaddump": {"get": {
+ "summary": "Actuator web endpoint 'threaddump'",
+ "operationId": "handle_2_1_3",
+ "responses": {"200": {
+ "description": "OK",
+ "content": {"*/*": {"schema": {"type": "object"}}}
+ }},
+ "tags": ["Actuator"]
+ }},
+ "/actuator/info": {"get": {
+ "summary": "Actuator web endpoint 'info'",
+ "operationId": "handle_9",
+ "responses": {"200": {
+ "description": "OK",
+ "content": {"*/*": {"schema": {"type": "object"}}}
+ }},
+ "tags": ["Actuator"]
+ }},
+ "/data-producer/v1/info-types/{infoTypeId}": {"put": {
+ "requestBody": {
+ "content": {"application/json": {"schema": {"$ref": "#/components/schemas/producer_info_type_info"}}},
+ "required": true
+ },
+ "operationId": "putInfoType",
+ "responses": {"200": {
+ "description": "OK",
+ "content": {"application/json": {"schema": {"type": "object"}}}
+ }},
+ "parameters": [{
+ "schema": {"type": "string"},
+ "in": "path",
+ "name": "infoTypeId",
+ "required": true
+ }],
+ "tags": ["Information Coordinator Service Simulator (exists only in test)"]
+ }},
+ "/generic_dataproducer/health_check": {"get": {
+ "summary": "Producer supervision",
+ "description": "The endpoint is provided by the Information Producer and is used for supervision of the producer.",
+ "operationId": "producerSupervision",
+ "responses": {"200": {
+ "description": "The producer is OK",
+ "content": {"application/json": {"schema": {"type": "string"}}}
+ }},
+ "tags": ["Producer job control API"]
+ }},
+ "/generic_dataproducer/info_job": {
"post": {
"summary": "Callback for Information Job creation/modification",
"requestBody": {
"tags": ["Producer job control API"]
}
},
- "/dmaap_dataproducer/health_check": {"get": {
- "summary": "Producer supervision",
- "description": "The endpoint is provided by the Information Producer and is used for supervision of the producer.",
- "operationId": "producerSupervision",
- "responses": {"200": {
- "description": "The producer is OK",
- "content": {"application/json": {"schema": {"type": "string"}}}
- }},
- "tags": ["Producer job control API"]
- }},
- "/actuator/threaddump": {"get": {
- "summary": "Actuator web endpoint 'threaddump'",
- "operationId": "handle_2_1_3",
- "responses": {"200": {
- "description": "OK",
- "content": {"*/*": {"schema": {"type": "object"}}}
- }},
- "tags": ["Actuator"]
- }},
- "/actuator/info": {"get": {
- "summary": "Actuator web endpoint 'info'",
- "operationId": "handle_9",
- "responses": {"200": {
- "description": "OK",
- "content": {"*/*": {"schema": {"type": "object"}}}
- }},
- "tags": ["Actuator"]
- }},
- "/data-producer/v1/info-types/{infoTypeId}": {"put": {
- "requestBody": {
- "content": {"application/json": {"schema": {"$ref": "#/components/schemas/producer_info_type_info"}}},
- "required": true
- },
- "operationId": "putInfoType",
- "responses": {"200": {
- "description": "OK",
- "content": {"application/json": {"schema": {"type": "object"}}}
- }},
- "parameters": [{
- "schema": {"type": "string"},
- "in": "path",
- "name": "infoTypeId",
- "required": true
- }],
- "tags": ["Information Coordinator Service Simulator (exists only in test)"]
- }},
"/actuator/loggers": {"get": {
"summary": "Actuator web endpoint 'loggers'",
"operationId": "handle_6",
"tags": ["Information Coordinator Service Simulator (exists only in test)"]
}
},
+ "/generic_dataproducer/info_job/{infoJobId}": {"delete": {
+ "summary": "Callback for Information Job deletion",
+ "description": "The call is invoked to terminate a data subscription. The endpoint is provided by the Information Producer.",
+ "operationId": "jobDeletedCallback",
+ "responses": {"200": {
+ "description": "OK",
+ "content": {"application/json": {"schema": {"$ref": "#/components/schemas/void"}}}
+ }},
+ "parameters": [{
+ "schema": {"type": "string"},
+ "in": "path",
+ "name": "infoJobId",
+ "required": true
+ }],
+ "tags": ["Producer job control API"]
+ }},
"/actuator/metrics/{requiredMetricName}": {"get": {
"summary": "Actuator web endpoint 'metrics-requiredMetricName'",
"operationId": "handle_5",
"tags": ["Actuator"]
}
},
- "/dmaap_dataproducer/info_job/{infoJobId}": {"delete": {
- "summary": "Callback for Information Job deletion",
- "description": "The call is invoked to terminate a data subscription. The endpoint is provided by the Information Producer.",
- "operationId": "jobDeletedCallback",
- "responses": {"200": {
- "description": "OK",
- "content": {"application/json": {"schema": {"$ref": "#/components/schemas/void"}}}
- }},
- "parameters": [{
- "schema": {"type": "string"},
- "in": "path",
- "name": "infoJobId",
- "required": true
- }],
- "tags": ["Producer job control API"]
- }},
"/actuator/health": {"get": {
"summary": "Actuator web endpoint 'health'",
"operationId": "handle_11",
"name": "Copyright (C) 2021 Nordix Foundation. Licensed under the Apache License.",
"url": "http://www.apache.org/licenses/LICENSE-2.0"
},
- "description": "Reads data from DMAAP and sends it further to information consumers",
- "title": "Generic Dmaap Information Producer",
+ "description": "Reads data from DMaaP and Kafka and posts it further to information consumers",
+ "title": "Generic Dmaap and Kafka Information Producer",
"version": "1.0"
},
"tags": [{
openapi: 3.0.1
info:
- title: Generic Dmaap Information Producer
- description: Reads data from DMAAP and sends it further to information consumers
+ title: Generic Dmaap and Kafka Information Producer
+ description: Reads data from DMaaP and Kafka and posts it further to information
+ consumers
license:
name: Copyright (C) 2021 Nordix Foundation. Licensed under the Apache License.
url: http://www.apache.org/licenses/LICENSE-2.0
description: Spring Boot Actuator Web API Documentation
url: https://docs.spring.io/spring-boot/docs/current/actuator-api/html/
paths:
- /dmaap_dataproducer/info_job:
- get:
- tags:
- - Producer job control API
- summary: Get all jobs
- description: Returns all info jobs, can be used for trouble shooting
- operationId: getJobs
- responses:
- 200:
- description: Information jobs
- content:
- application/json:
- schema:
- type: array
- items:
- $ref: '#/components/schemas/producer_info_job_request'
- post:
- tags:
- - Producer job control API
- summary: Callback for Information Job creation/modification
- description: The call is invoked to activate or to modify a data subscription.
- The endpoint is provided by the Information Producer.
- operationId: jobCreatedCallback
- requestBody:
- content:
- application/json:
- schema:
- type: string
- required: true
- responses:
- 200:
- description: OK
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/void'
- 400:
- description: Other error in the request
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/error_information'
- 404:
- description: Information type is not found
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/error_information'
- /dmaap_dataproducer/health_check:
- get:
- tags:
- - Producer job control API
- summary: Producer supervision
- description: The endpoint is provided by the Information Producer and is used
- for supervision of the producer.
- operationId: producerSupervision
- responses:
- 200:
- description: The producer is OK
- content:
- application/json:
- schema:
- type: string
/actuator/threaddump:
get:
tags:
application/json:
schema:
type: object
+ /generic_dataproducer/health_check:
+ get:
+ tags:
+ - Producer job control API
+ summary: Producer supervision
+ description: The endpoint is provided by the Information Producer and is used
+ for supervision of the producer.
+ operationId: producerSupervision
+ responses:
+ 200:
+ description: The producer is OK
+ content:
+ application/json:
+ schema:
+ type: string
+ /generic_dataproducer/info_job:
+ get:
+ tags:
+ - Producer job control API
+ summary: Get all jobs
+ description: Returns all info jobs, can be used for trouble shooting
+ operationId: getJobs
+ responses:
+ 200:
+ description: Information jobs
+ content:
+ application/json:
+ schema:
+ type: array
+ items:
+ $ref: '#/components/schemas/producer_info_job_request'
+ post:
+ tags:
+ - Producer job control API
+ summary: Callback for Information Job creation/modification
+ description: The call is invoked to activate or to modify a data subscription.
+ The endpoint is provided by the Information Producer.
+ operationId: jobCreatedCallback
+ requestBody:
+ content:
+ application/json:
+ schema:
+ type: string
+ required: true
+ responses:
+ 200:
+ description: OK
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/void'
+ 400:
+ description: Other error in the request
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/error_information'
+ 404:
+ description: Information type is not found
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/error_information'
/actuator/loggers:
get:
tags:
application/json:
schema:
type: object
+ /generic_dataproducer/info_job/{infoJobId}:
+ delete:
+ tags:
+ - Producer job control API
+ summary: Callback for Information Job deletion
+ description: The call is invoked to terminate a data subscription. The endpoint
+ is provided by the Information Producer.
+ operationId: jobDeletedCallback
+ parameters:
+ - name: infoJobId
+ in: path
+ required: true
+ style: simple
+ explode: false
+ schema:
+ type: string
+ responses:
+ 200:
+ description: OK
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/void'
/actuator/metrics/{requiredMetricName}:
get:
tags:
'*/*':
schema:
type: object
- /dmaap_dataproducer/info_job/{infoJobId}:
- delete:
- tags:
- - Producer job control API
- summary: Callback for Information Job deletion
- description: The call is invoked to terminate a data subscription. The endpoint
- is provided by the Information Producer.
- operationId: jobDeletedCallback
- parameters:
- - name: infoJobId
- in: path
- required: true
- style: simple
- explode: false
- schema:
- type: string
- responses:
- 200:
- description: OK
- content:
- application/json:
- schema:
- $ref: '#/components/schemas/void'
/actuator/health:
get:
tags:
# The HTTP proxy (if configured) will only be used for accessing NearRT RIC:s
http.proxy-host:
http.proxy-port: 0
- ecs-base-url: https://localhost:8434
+ ics-base-url: https://localhost:8434
# Location of the component configuration file. The file will only be used if the Consul database is not used;
# configuration from the Consul will override the file.
configuration-filepath: /opt/app/dmaap-adaptor-service/data/application_configuration.json
dmaap-base-url: http://dradmin:dradmin@localhost:2222
# The url used to adress this component. This is used as a callback url sent to other components.
dmaap-adapter-base-url: https://localhost:8435
- # KAFKA boostrap server. This is only needed if there are Information Types that uses a kafkaInputTopic
+ # KAFKA boostrap servers. This is only needed if there are Information Types that uses a kafkaInputTopic
+ # several redundant boostrap servers can be specified, separated by a comma ','.
kafka:
bootstrap-servers: localhost:9092
{
"types": [
{
- "id": "ExampleInformationType",
+ "id": "ExampleInformationType1",
"dmaapTopicUrl": "/events/unauthenticated.VES_NOTIFICATION_OUTPUT/OpenDcae-c12/C12",
"useHttpProxy": true
+ },
+ {
+ "id": "ExampleInformationType2",
+ "kafkaInputTopic": "TutorialTopic",
+ "useHttpProxy": false
}
+
]
-}
\ No newline at end of file
+}
package org.oran.dmaapadapter;
+import java.io.File;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.scheduling.annotation.Scheduled;
@SpringBootApplication
+@EnableConfigurationProperties
+@EnableScheduling
public class Application {
+ private static final Logger logger = LoggerFactory.getLogger(Application.class);
+
+ @Value("${app.configuration-filepath}")
+ private String localConfigurationFilePath;
+
+ private long configFileLastModification = 0;
+ private static ConfigurableApplicationContext applicationContext;
+
public static void main(String[] args) {
- SpringApplication.run(Application.class);
+ applicationContext = SpringApplication.run(Application.class);
}
+ @Scheduled(fixedRate = 10 * 1000)
+ public void checkConfigFileChanges() {
+ long timestamp = new File(localConfigurationFilePath).lastModified();
+ if (configFileLastModification != 0 && timestamp != configFileLastModification) {
+ logger.info("Restarting due to change in the file {}", localConfigurationFilePath);
+ restartApplication();
+ }
+ configFileLastModification = timestamp;
+ }
+
+ private static void restartApplication() {
+ if (applicationContext == null) {
+ logger.info("Cannot restart in unittest");
+ return;
+ }
+ ApplicationArguments args = applicationContext.getBean(ApplicationArguments.class);
+
+ Thread thread = new Thread(() -> {
+ applicationContext.close();
+ applicationContext = SpringApplication.run(Application.class, args.getSourceArgs());
+ });
+
+ thread.setDaemon(false);
+ thread.start();
+ }
}
public class SwaggerConfig {
private SwaggerConfig() {}
- static final String API_TITLE = "Generic Dmaap Information Producer";
- static final String DESCRIPTION = "Reads data from DMAAP and sends it further to information consumers";
+ static final String API_TITLE = "Generic Dmaap and Kafka Information Producer";
+ static final String DESCRIPTION = "Reads data from DMaaP and Kafka and posts it further to information consumers";
}
import java.util.Collections;
import lombok.Getter;
+import lombok.Setter;
import org.oran.dmaapadapter.configuration.WebClientConfig.HttpProxyConfig;
import org.oran.dmaapadapter.repository.InfoType;
private int httpProxyPort = 0;
@Getter
+ @Setter
@Value("${server.port}")
private int localServerHttpPort;
@Getter
- @Value("${app.ecs-base-url}")
- private String ecsBaseUrl;
+ @Value("${app.ics-base-url}")
+ private String icsBaseUrl;
@Getter
@Value("${app.dmaap-adapter-base-url}")
public static final String API_NAME = "Producer job control API";
public static final String API_DESCRIPTION = "";
- public static final String JOB_URL = "/dmaap_dataproducer/info_job";
- public static final String SUPERVISION_URL = "/dmaap_dataproducer/health_check";
+ public static final String JOB_URL = "/generic_dataproducer/info_job";
+ public static final String SUPERVISION_URL = "/generic_dataproducer/health_check";
private static Gson gson = new GsonBuilder().create();
private final Jobs jobs;
private final InfoTypes types;
return new ResponseEntity<>(HttpStatus.OK);
} catch (ServiceException e) {
logger.warn("jobCreatedCallback failed: {}", e.getMessage());
- return ErrorResponse.create(e, HttpStatus.NOT_FOUND);
+ return ErrorResponse.create(e, e.getHttpStatus());
} catch (Exception e) {
logger.warn("jobCreatedCallback failed: {}", e.getMessage());
return ErrorResponse.create(e, HttpStatus.BAD_REQUEST);
import org.immutables.gson.Gson;
@Gson.TypeAdapters
-@Schema(name = "consumer_job", description = "Information for an Enrichment Information Job")
+@Schema(name = "consumer_job", description = "Information for an Information Job")
public class ConsumerJobInfo {
@Schema(name = "info_type_id", description = "Information type Idenitifier of the subscription job",
import org.oran.dmaapadapter.exceptions.ServiceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpStatus;
public class InfoTypes {
private static final Logger logger = LoggerFactory.getLogger(InfoTypes.class);
public synchronized InfoType getType(String id) throws ServiceException {
InfoType type = allTypes.get(id);
if (type == null) {
- throw new ServiceException("Could not find type: " + id);
+ throw new ServiceException("Could not find type: " + id, HttpStatus.NOT_FOUND);
}
return type;
}
return null;
}
+ public T get(String key1, String key2) {
+ Map<String, T> innerMap = this.map.get(key1);
+ if (innerMap == null) {
+ return null;
+ }
+ return innerMap.get(key2);
+ }
+
public Collection<T> get(String key) {
Map<String, T> innerMap = this.map.get(key);
if (innerMap == null) {
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import reactor.core.publisher.Sinks.Many;
/**
* The class streams data from a multi cast sink and sends the data to the Job
this.job = job;
}
- public synchronized void start(Many<String> input) {
+ public synchronized void start(Flux<String> input) {
stop();
this.errorStats.resetKafkaErrors();
this.subscription = getMessagesFromKafka(input, job) //
public synchronized void stop() {
if (this.subscription != null) {
- subscription.dispose();
- subscription = null;
+ this.subscription.dispose();
+ this.subscription = null;
}
}
return this.subscription != null;
}
- private Flux<String> getMessagesFromKafka(Many<String> input, Job job) {
- Flux<String> result = input.asFlux() //
- .filter(job::isFilterMatch);
+ private Flux<String> getMessagesFromKafka(Flux<String> input, Job job) {
+ Flux<String> result = input.filter(job::isFilterMatch);
if (job.isBuffered()) {
result = result.map(this::quote) //
public void onJobRemoved(Job job) {
removeJob(job);
}
-
});
}
topicConsumer.start();
}
KafkaJobDataConsumer subscription = new KafkaJobDataConsumer(job);
- subscription.start(topicConsumer.getOutput());
+ subscription.start(topicConsumer.getOutput().asFlux());
consumers.put(job.getType().getId(), job.getId(), subscription);
}
}
}
@Scheduled(fixedRate = CONSUMER_SUPERVISION_INTERVAL_MS)
- public synchronized void restartNonRunningTasks() {
- this.consumers.keySet().forEach(typeId -> {
- this.consumers.get(typeId).forEach(consumer -> {
- if (!consumer.isRunning()) {
- restartTopic(consumer);
- }
- });
- });
+ public synchronized void restartNonRunningTopics() {
+ for (String typeId : this.consumers.keySet()) {
+ for (KafkaJobDataConsumer consumer : this.consumers.get(typeId)) {
+ restartTopic(consumer);
+ }
+ }
}
private void restartTopic(KafkaJobDataConsumer consumer) {
}
private void restartConsumersOfType(KafkaTopicListener topic, InfoType type) {
- this.consumers.get(type.getId()).forEach((consumer) -> {
- consumer.start(topic.getOutput());
- });
+ this.consumers.get(type.getId()).forEach(consumer -> consumer.start(topic.getOutput().asFlux()));
}
}
import reactor.core.publisher.Mono;
/**
- * Registers the types and this producer in ECS. This is done when needed.
+ * Registers the types and this producer in Innformation Coordinator Service.
+ * This is done when needed.
*/
@Component
@EnableScheduling
private static final String PRODUCER_ID = "DmaapGenericInfoProducer";
@Getter
- private boolean isRegisteredInEcs = false;
+ private boolean isRegisteredInIcs = false;
private static final int REGISTRATION_SUPERVISION_INTERVAL_MS = 1000 * 5;
public ProducerRegstrationTask(@Autowired ApplicationConfig applicationConfig, @Autowired InfoTypes types) {
@Scheduled(fixedRate = REGISTRATION_SUPERVISION_INTERVAL_MS)
public void supervisionTask() {
checkRegistration() //
- .filter(isRegistrationOk -> !isRegistrationOk || !this.isRegisteredInEcs) //
+ .filter(isRegistrationOk -> !isRegistrationOk || !this.isRegisteredInIcs) //
.flatMap(isRegisterred -> registerTypesAndProducer()) //
.subscribe( //
null, //
}
private void handleRegistrationCompleted() {
- isRegisteredInEcs = true;
+ isRegisteredInIcs = true;
}
private void handleRegistrationFailure(Throwable t) {
// Returns TRUE if registration is correct
private Mono<Boolean> checkRegistration() {
- final String url = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
+ final String url = applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
return restClient.get(url) //
.flatMap(this::isRegisterredInfoCorrect) //
.onErrorResume(t -> Mono.just(Boolean.FALSE));
private Mono<Boolean> isRegisterredInfoCorrect(String registerredInfoStr) {
ProducerRegistrationInfo registerredInfo = gson.fromJson(registerredInfoStr, ProducerRegistrationInfo.class);
if (isEqual(producerRegistrationInfo(), registerredInfo)) {
- logger.trace("Already registered in ECS");
+ logger.trace("Already registered in ICS");
return Mono.just(Boolean.TRUE);
} else {
return Mono.just(Boolean.FALSE);
}
private String registerTypeUrl(InfoType type) {
- return applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-types/" + type.getId();
+ return applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-types/" + type.getId();
}
private Mono<String> registerTypesAndProducer() {
final int CONCURRENCY = 20;
final String producerUrl =
- applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
+ applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
return Flux.fromIterable(this.types.getAll()) //
.doOnNext(type -> logger.info("Registering type {}", type.getId())) //
}
private Object jsonSchemaObject(InfoType type) throws IOException, ServiceException {
-
- if (type.isKafkaTopicDefined()) {
- String schemaStrKafka = readSchemaFile("/typeSchemaKafka.json");
- return jsonObject(schemaStrKafka);
- } else {
- // An object with no properties
- String schemaStr = "{" //
- + "\"type\": \"object\"," //
- + "\"properties\": {" //
- + " \"filter\": { \"type\": \"string\" }" //
- + "}," //
- + "\"additionalProperties\": false" //
- + "}"; //
-
- return
-
- jsonObject(schemaStr);
- }
-
+ String schemaFile = type.isKafkaTopicDefined() ? "/typeSchemaKafka.json" : "/typeSchemaDmaap.json";
+ return jsonObject(readSchemaFile(schemaFile));
}
private String readSchemaFile(String filePath) throws IOException, ServiceException {
return CharStreams.toString(new InputStreamReader(in, StandardCharsets.UTF_8));
}
+ @SuppressWarnings("java:S2139") // Log exception
private Object jsonObject(String json) {
try {
return JsonParser.parseString(json).getAsJsonObject();
} catch (Exception e) {
- logger.error("Bug, error in JSON: {}", json);
- throw new NullPointerException(e.toString());
+ logger.error("Bug, error in JSON: {} {}", json, e.getMessage());
+ throw new NullPointerException(e.getMessage());
}
}
}
private ProducerRegistrationInfo producerRegistrationInfo() {
-
return ProducerRegistrationInfo.builder() //
.jobCallbackUrl(baseUrl() + ProducerCallbacksController.JOB_URL) //
.producerSupervisionCallbackUrl(baseUrl() + ProducerCallbacksController.SUPERVISION_URL) //
--- /dev/null
+{
+ "$schema": "http://json-schema.org/draft-04/schema#",
+ "type": "object",
+ "properties": {
+ "filter": {
+ "type": "string"
+ }
+ },
+ "additionalProperties": false
+}
import org.json.JSONObject;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.oran.dmaapadapter.clients.AsyncRestClient;
import org.oran.dmaapadapter.repository.InfoTypes;
import org.oran.dmaapadapter.repository.Job;
import org.oran.dmaapadapter.repository.Jobs;
+import org.oran.dmaapadapter.tasks.KafkaJobDataConsumer;
+import org.oran.dmaapadapter.tasks.KafkaTopicConsumers;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.web.reactive.function.client.WebClientResponseException;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
@ExtendWith(SpringExtension.class)
-@SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)
+@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
@TestPropertySource(properties = { //
"server.ssl.key-store=./config/keystore.jks", //
"app.webclient.trust-store=./config/truststore.jks", //
private ConsumerController consumerController;
@Autowired
- private EcsSimulatorController ecsSimulatorController;
+ private IcsSimulatorController icsSimulatorController;
+
+ @Autowired
+ KafkaTopicConsumers kafkaTopicConsumers;
private com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
static class TestApplicationConfig extends ApplicationConfig {
@Override
- public String getEcsBaseUrl() {
+ public String getIcsBaseUrl() {
return thisProcessUrl();
}
}
}
+ @BeforeEach
+ void setPort() {
+ this.applicationConfig.setLocalServerHttpPort(this.localServerHttpPort);
+ }
+
@AfterEach
void reset() {
this.consumerController.testResults.reset();
- this.ecsSimulatorController.testResults.reset();
+ this.icsSimulatorController.testResults.reset();
this.jobs.clear();
}
}
@Test
- void testWholeChain() throws Exception {
+ void testReceiveAndPostDataFromKafka() {
+ final String JOB_ID = "ID";
+ final String TYPE_ID = "KafkaInformationType";
+ await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+
+ // Create a job
+ Job.Parameters param = new Job.Parameters("", new Job.BufferTimeout(123, 456), 1);
+ String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL;
+ ConsumerJobInfo kafkaJobInfo =
+ new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", targetUri, "");
+
+ this.icsSimulatorController.addJob(kafkaJobInfo, JOB_ID, restClient());
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+ KafkaJobDataConsumer kafkaConsumer = this.kafkaTopicConsumers.getConsumers().get(TYPE_ID, JOB_ID);
+
+ // Handle received data from Kafka, check that it has been posted to the
+ // consumer
+ kafkaConsumer.start(Flux.just("data"));
+
+ ConsumerController.TestResults consumer = this.consumerController.testResults;
+ await().untilAsserted(() -> assertThat(consumer.receivedBodies.size()).isEqualTo(1));
+ assertThat(consumer.receivedBodies.get(0)).isEqualTo("[\"data\"]");
+
+ // Test send an exception
+ kafkaConsumer.start(Flux.error(new NullPointerException()));
+
+ // Test regular restart of stopped
+ kafkaConsumer.stop();
+ this.kafkaTopicConsumers.restartNonRunningTopics();
+ await().untilAsserted(() -> assertThat(kafkaConsumer.isRunning()).isTrue());
+
+ // Delete the job
+ this.icsSimulatorController.deleteJob(JOB_ID, restClient());
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+ }
+
+ @Test
+ void testReceiveAndPostDataFromDmaap() throws Exception {
final String JOB_ID = "ID";
// Register producer, Register types
- await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
- assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+ await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
// Create a job
- this.ecsSimulatorController.addJob(consumerJobInfo(), JOB_ID, restClient());
+ this.icsSimulatorController.addJob(consumerJobInfo(), JOB_ID, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
// Return two messages from DMAAP and verify that these are sent to the owner of
assertThat(jobs).contains(JOB_ID);
// Delete the job
- this.ecsSimulatorController.deleteJob(JOB_ID, restClient());
+ this.icsSimulatorController.deleteJob(JOB_ID, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
}
@Test
void testReRegister() throws Exception {
// Wait foir register types and producer
- await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
- assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+ await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
// Clear the registration, should trigger a re-register
- ecsSimulatorController.testResults.reset();
- await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
- assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+ icsSimulatorController.testResults.reset();
+ await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
// Just clear the registerred types, should trigger a re-register
- ecsSimulatorController.testResults.types.clear();
+ icsSimulatorController.testResults.types.clear();
await().untilAsserted(
- () -> assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(2));
- }
-
- @Test
- void testCreateKafkaJob() {
- await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
- assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
-
- final String TYPE_ID = "KafkaInformationType";
-
- Job.Parameters param = new Job.Parameters("filter", new Job.BufferTimeout(123, 456), 1);
- String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL;
- ConsumerJobInfo jobInfo = new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", targetUri, "");
-
- // Create a job
- this.ecsSimulatorController.addJob(jobInfo, "JOB_ID", restClient());
- await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
-
- // Delete the job
- this.ecsSimulatorController.deleteJob("JOB_ID", restClient());
- await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+ () -> assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(2));
}
private void testErrorCode(Mono<?> request, HttpStatus expStatus, String responseContains) {
@RestController("IcsSimulatorController")
@Tag(name = "Information Coordinator Service Simulator (exists only in test)")
-public class EcsSimulatorController {
+public class IcsSimulatorController {
private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final static Gson gson = new GsonBuilder().create();
ProducerJobInfo request =
new ProducerJobInfo(job.jobDefinition, jobId, job.infoTypeId, job.jobResultUri, job.owner, "TIMESTAMP");
String body = gson.toJson(request);
- logger.info("ECS Simulator PUT job: {}", body);
+ logger.info("ICS Simulator PUT job: {}", body);
restClient.post(url, body, MediaType.APPLICATION_JSON).block();
}
public void deleteJob(String jobId, AsyncRestClient restClient) {
String url = this.testResults.registrationInfo.jobCallbackUrl + "/" + jobId;
- logger.info("ECS Simulator DELETE job: {}", url);
+ logger.info("ICS Simulator DELETE job: {}", url);
restClient.delete(url).block();
}
"server.ssl.key-store=./config/keystore.jks", //
"app.webclient.trust-store=./config/truststore.jks", //
"app.configuration-filepath=./src/test/resources/test_application_configuration.json", //
- "app.ecs-base-url=https://localhost:8434" //
+ "app.ics-base-url=https://localhost:8434" //
})
-class IntegrationWithEcs {
+class IntegrationWithIcs {
private static final String DMAAP_JOB_ID = "DMAAP_JOB_ID";
private static final String DMAAP_TYPE_ID = "DmaapInformationType";
static class TestApplicationConfig extends ApplicationConfig {
@Override
- public String getEcsBaseUrl() {
+ public String getIcsBaseUrl() {
return "https://localhost:8434";
}
return "https://localhost:" + this.applicationConfig.getLocalServerHttpPort();
}
- private String ecsBaseUrl() {
- return applicationConfig.getEcsBaseUrl();
+ private String icsBaseUrl() {
+ return applicationConfig.getIcsBaseUrl();
}
private String jobUrl(String jobId) {
- return ecsBaseUrl() + "/data-consumer/v1/info-jobs/" + jobId + "?typeCheck=true";
+ return icsBaseUrl() + "/data-consumer/v1/info-jobs/" + jobId + "?typeCheck=true";
}
- private void createInformationJobInEcs(String typeId, String jobId, String filter) {
+ private void createInformationJobInIcs(String typeId, String jobId, String filter) {
String body = gson.toJson(consumerJobInfo(typeId, filter));
try {
// Delete the job if it already exists
- deleteInformationJobInEcs(jobId);
+ deleteInformationJobInIcs(jobId);
} catch (Exception e) {
}
restClient().putForEntity(jobUrl(jobId), body).block();
}
- private void deleteInformationJobInEcs(String jobId) {
+ private void deleteInformationJobInIcs(String jobId) {
restClient().delete(jobUrl(jobId)).block();
}
@Test
void testCreateKafkaJob() {
- await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInEcs()).isTrue());
+ await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue());
final String TYPE_ID = "KafkaInformationType";
Job.Parameters param = new Job.Parameters("filter", new Job.BufferTimeout(123, 456), 1);
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
- deleteInformationJobInEcs("KAFKA_JOB_ID");
+ deleteInformationJobInIcs("KAFKA_JOB_ID");
await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
}
@Test
void testWholeChain() throws Exception {
- await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInEcs()).isTrue());
+ await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue());
- createInformationJobInEcs(DMAAP_TYPE_ID, DMAAP_JOB_ID, ".*DmaapResponse.*");
+ createInformationJobInIcs(DMAAP_TYPE_ID, DMAAP_JOB_ID, ".*DmaapResponse.*");
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
await().untilAsserted(() -> assertThat(results.receivedBodies.size()).isEqualTo(2));
assertThat(results.receivedBodies.get(0)).isEqualTo("DmaapResponse1");
- deleteInformationJobInEcs(DMAAP_JOB_ID);
+ deleteInformationJobInIcs(DMAAP_JOB_ID);
await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
- synchronized (this) {
- // logger.warn("**************** Keeping server alive! " +
- // this.applicationConfig.getLocalServerHttpPort());
- // this.wait();
- }
}
}
private ConsumerController consumerController;
@Autowired
- private EcsSimulatorController ecsSimulatorController;
+ private IcsSimulatorController icsSimulatorController;
@Autowired
private KafkaTopicConsumers kafkaTopicConsumers;
static class TestApplicationConfig extends ApplicationConfig {
@Override
- public String getEcsBaseUrl() {
+ public String getIcsBaseUrl() {
return thisProcessUrl();
}
@AfterEach
void reset() {
this.consumerController.testResults.reset();
- this.ecsSimulatorController.testResults.reset();
+ this.icsSimulatorController.testResults.reset();
this.jobs.clear();
}
final String JOB_ID2 = "ID2";
// Register producer, Register types
- await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
- assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+ await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
// Create two jobs. One buffering and one with a filter
- this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 20), JOB_ID1,
+ this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 20), JOB_ID1,
restClient());
- this.ecsSimulatorController.addJob(consumerJobInfo("^Message_1$", Duration.ZERO, 0, 1), JOB_ID2, restClient());
+ this.icsSimulatorController.addJob(consumerJobInfo("^Message_1$", Duration.ZERO, 0, 1), JOB_ID2, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
verifiedReceivedByConsumer("Message_1", "[\"Message_1\", \"Message_2\", \"Message_3\"]");
// Delete the jobs
- this.ecsSimulatorController.deleteJob(JOB_ID1, restClient());
- this.ecsSimulatorController.deleteJob(JOB_ID2, restClient());
+ this.icsSimulatorController.deleteJob(JOB_ID1, restClient());
+ this.icsSimulatorController.deleteJob(JOB_ID2, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers().keySet()).isEmpty());
final String JOB_ID2 = "ID2";
// Register producer, Register types
- await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
- assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+ await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
// Create two jobs.
- this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 1), JOB_ID1,
+ this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 1), JOB_ID1,
restClient());
- this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID2, restClient());
+ this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID2, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
await().untilAsserted(() -> assertThat(consumer.isRunning()).isFalse());
this.consumerController.testResults.reset();
- this.ecsSimulatorController.deleteJob(JOB_ID2, restClient()); // Delete one job
- kafkaTopicConsumers.restartNonRunningTasks();
+ this.icsSimulatorController.deleteJob(JOB_ID2, restClient()); // Delete one job
+ kafkaTopicConsumers.restartNonRunningTopics();
Thread.sleep(1000); // Restarting the input seems to take some asynch time
dataToSend = Flux.just(senderRecord("Howdy\""));
verifiedReceivedByConsumer("[\"Howdy\\\"\"]");
// Delete the jobs
- this.ecsSimulatorController.deleteJob(JOB_ID1, restClient());
- this.ecsSimulatorController.deleteJob(JOB_ID2, restClient());
+ this.icsSimulatorController.deleteJob(JOB_ID1, restClient());
+ this.icsSimulatorController.deleteJob(JOB_ID2, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers().keySet()).isEmpty());
>- INFO_PRODUCER_HOST **Required**. The host for the producer. Example: `https://mrproducer`
>- INFO_PRODUCER_PORT Optional. The port for the product. Defaults to `8085`.
->- INFO_COORD_ADDR Optional. The address of the Information Coordinator. Defaults to `https://enrichmentservice:8434`.
+>- INFO_COORD_ADDR Optional. The address of the Information Coordinator. Defaults to `https://informationservice:8434`.
>- DMAAP_MR_ADDR Optional. The address of the DMaaP Message Router. Defaults to `https://message-router.onap:3905`.
>- PRODUCER_CERT_PATH Optional. The path to the certificate to use for https. Defaults to `security/producer.crt`
>- PRODUCER_KEY_PATH Optional. The path to the key to the certificate to use for https. Defaults to `security/producer.key`
Once the initial registration is done, the producer will constantly poll MR for all configured job types. When receiving messages for a type, it will distribute these messages to all jobs registered for the type. If no jobs for that type are registered, the messages will be discarded. If a consumer is unavailable for distribution, the messages will be discarded for that consumer until it is available again.
+The producer provides a REST API to control the log level. The available levels are the same as the ones used in the configuration above.
+
+ PUT https://mrproducer:8085/admin/log?level=<new level>
+
## Development
To make it easy to test during development of the producer, two stubs are provided in the `stub` folder.
-One, under the `dmaap` folder, called `dmaap` that stubs MR and respond with an array with one message with `eventSeverity` alternating between `NORMAL` and `CRITICAL`. The default port is `3905`, but this can be overridden by passing a `-port [PORT]` flag when starting the stub. To build and start the stub, do the following:
+One, under the `dmaap` folder, called `dmaap` that stubs MR and respond with an array with one message with `eventSeverity` alternating between `NORMAL` and `CRITICAL`. The default port is `3905`, but this can be overridden by passing a `-port <PORT>` flag when starting the stub. To build and start the stub, do the following:
>1. cd stub/dmaap
>2. go build
->3. ./dmaap
+>3. ./dmaap [-port \<PORT>]
-One, under the `consumer` folder, called `consumer` that at startup will register a job of type `STD_Fault_Messages` in ICS, and then listen for REST calls and print the body of them. By default, it listens to the port `40935`, but his can be overridden by passing a `-port [PORT]` flag when starting the stub. To build and start the stub, do the following:
+One, under the `consumer` folder, called `consumer` that at startup will register a job of type `STD_Fault_Messages` in ICS, and then listen for REST calls and print the body of them. By default, it listens to the port `40935`, but his can be overridden by passing a `-port <PORT>` flag when starting the stub. To build and start the stub, do the following:
>1. cd stub/consumer
>2. go build
->3. ./consumer
+>3. ./consumer [-port \<PORT>]
Mocks needed for unit tests have been generated using `github.com/stretchr/testify/mock` and are checked in under the `mocks` folder. **Note!** Keep in mind that if any of the mocked interfaces change, a new mock for that interface must be generated and checked in.
go 1.17
require (
+ github.com/gorilla/mux v1.8.0
+ github.com/hashicorp/go-retryablehttp v0.7.0
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.7.0
)
require (
github.com/davecgh/go-spew v1.1.1 // indirect
- github.com/gorilla/mux v1.8.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.1 // indirect
- github.com/hashicorp/go-retryablehttp v0.7.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/objx v0.1.0 // indirect
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037 // indirect
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/hashicorp/go-cleanhttp v0.5.1 h1:dH3aiDG9Jvb5r5+bYHsikaOUIpcM0xvgMXVoDkXMzJM=
github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
+github.com/hashicorp/go-hclog v0.9.2 h1:CG6TE5H9/JXsFWJCfoIVpKFIkFe6ysEuHirp4DxCsHI=
github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ=
github.com/hashicorp/go-retryablehttp v0.7.0 h1:eu1EI/mbirUgP5C8hVsTNaGZreBDlYiwC1FZWkvQPQ4=
github.com/hashicorp/go-retryablehttp v0.7.0/go.mod h1:vAew36LZh98gCBJNLH42IQ1ER/9wtLZZ8meHqQvEYWY=
package config
import (
+ "encoding/json"
"fmt"
"os"
"strconv"
return &Config{
InfoProducerHost: getEnv("INFO_PRODUCER_HOST", ""),
InfoProducerPort: getEnvAsInt("INFO_PRODUCER_PORT", 8085),
- InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "https://enrichmentservice:8434"),
+ InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "https://informationservice:8434"),
DMaaPMRAddress: getEnv("DMAAP_MR_ADDR", "https://message-router.onap:3905"),
ProducerCertPath: getEnv("PRODUCER_CERT_PATH", "security/producer.crt"),
ProducerKeyPath: getEnv("PRODUCER_KEY_PATH", "security/producer.key"),
return log.InfoLevel
}
}
+
+func GetJobTypesFromConfiguration(configFile string) ([]TypeDefinition, error) {
+ typeDefsByte, err := os.ReadFile(configFile)
+ if err != nil {
+ return nil, err
+ }
+ typeDefs := struct {
+ Types []TypeDefinition `json:"types"`
+ }{}
+ err = json.Unmarshal(typeDefsByte, &typeDefs)
+ if err != nil {
+ return nil, err
+ }
+
+ return typeDefs.Types, nil
+}
import (
"bytes"
"os"
- "reflect"
+ "path/filepath"
"testing"
log "github.com/sirupsen/logrus"
LogLevel: log.InfoLevel,
InfoProducerHost: "",
InfoProducerPort: 8085,
- InfoCoordinatorAddress: "https://enrichmentservice:8434",
+ InfoCoordinatorAddress: "https://informationservice:8434",
DMaaPMRAddress: "https://message-router.onap:3905",
ProducerCertPath: "security/producer.crt",
ProducerKeyPath: "security/producer.key",
}
- if got := New(); !reflect.DeepEqual(got, &wantConfig) {
- t.Errorf("New() = %v, want %v", got, &wantConfig)
- }
+ got := New()
+ assertions.Equal(&wantConfig, got)
logString := buf.String()
assertions.Contains(logString, "Invalid int value: wrong for variable: INFO_PRODUCER_PORT. Default value: 8085 will be used")
}
LogLevel: log.InfoLevel,
InfoProducerHost: "",
InfoProducerPort: 8085,
- InfoCoordinatorAddress: "https://enrichmentservice:8434",
+ InfoCoordinatorAddress: "https://informationservice:8434",
DMaaPMRAddress: "https://message-router.onap:3905",
ProducerCertPath: "security/producer.crt",
ProducerKeyPath: "security/producer.key",
logString := buf.String()
assertions.Contains(logString, "Invalid log level: wrong. Log level will be Info!")
}
+
+const typeDefinition = `{"types": [{"id": "type1", "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1"}]}`
+
+func TestGetTypesFromConfiguration_fileOkShouldReturnSliceOfTypeDefinitions(t *testing.T) {
+ assertions := require.New(t)
+ typesDir, err := os.MkdirTemp("", "configs")
+ if err != nil {
+ t.Errorf("Unable to create temporary directory for types due to: %v", err)
+ }
+ fname := filepath.Join(typesDir, "type_config.json")
+ t.Cleanup(func() {
+ os.RemoveAll(typesDir)
+ })
+ if err = os.WriteFile(fname, []byte(typeDefinition), 0666); err != nil {
+ t.Errorf("Unable to create temporary config file for types due to: %v", err)
+ }
+
+ types, err := GetJobTypesFromConfiguration(fname)
+
+ wantedType := TypeDefinition{
+ Id: "type1",
+ DmaapTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1",
+ }
+ wantedTypes := []TypeDefinition{wantedType}
+ assertions.EqualValues(wantedTypes, types)
+ assertions.Nil(err)
+}
package jobs
import (
- "encoding/json"
"fmt"
- "os"
"sync"
log "github.com/sirupsen/logrus"
}
type JobTypesManager interface {
- LoadTypesFromConfiguration() ([]config.TypeDefinition, error)
+ LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition
GetSupportedTypes() []string
}
}
type JobsManagerImpl struct {
- configFile string
allTypes map[string]TypeData
pollClient restclient.HTTPClient
mrAddress string
distributeClient restclient.HTTPClient
}
-func NewJobsManagerImpl(typeConfigFilePath string, pollClient restclient.HTTPClient, mrAddr string, distributeClient restclient.HTTPClient) *JobsManagerImpl {
+func NewJobsManagerImpl(pollClient restclient.HTTPClient, mrAddr string, distributeClient restclient.HTTPClient) *JobsManagerImpl {
return &JobsManagerImpl{
- configFile: typeConfigFilePath,
allTypes: make(map[string]TypeData),
pollClient: pollClient,
mrAddress: mrAddr,
return nil
}
-func (jm *JobsManagerImpl) LoadTypesFromConfiguration() ([]config.TypeDefinition, error) {
- typeDefsByte, err := os.ReadFile(jm.configFile)
- if err != nil {
- return nil, err
- }
- typeDefs := struct {
- Types []config.TypeDefinition `json:"types"`
- }{}
- err = json.Unmarshal(typeDefsByte, &typeDefs)
- if err != nil {
- return nil, err
- }
- for _, typeDef := range typeDefs.Types {
+func (jm *JobsManagerImpl) LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition {
+ for _, typeDef := range types {
jm.allTypes[typeDef.Id] = TypeData{
TypeId: typeDef.Id,
DMaaPTopicURL: typeDef.DmaapTopicURL,
jobsHandler: newJobsHandler(typeDef.Id, typeDef.DmaapTopicURL, jm.pollClient, jm.distributeClient),
}
}
- return typeDefs.Types, nil
+ return types
}
func (jm *JobsManagerImpl) GetSupportedTypes() []string {
"bytes"
"io/ioutil"
"net/http"
- "os"
- "path/filepath"
"sync"
"testing"
"time"
func TestJobsManagerGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) {
assertions := require.New(t)
- typesDir, err := os.MkdirTemp("", "configs")
- if err != nil {
- t.Errorf("Unable to create temporary directory for types due to: %v", err)
- }
- fname := filepath.Join(typesDir, "type_config.json")
- managerUnderTest := NewJobsManagerImpl(fname, nil, "", nil)
- t.Cleanup(func() {
- os.RemoveAll(typesDir)
- })
- if err = os.WriteFile(fname, []byte(typeDefinition), 0666); err != nil {
- t.Errorf("Unable to create temporary config file for types due to: %v", err)
- }
- types, err := managerUnderTest.LoadTypesFromConfiguration()
+
+ managerUnderTest := NewJobsManagerImpl(nil, "", nil)
+
wantedType := config.TypeDefinition{
Id: "type1",
DmaapTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1",
}
wantedTypes := []config.TypeDefinition{wantedType}
+
+ types := managerUnderTest.LoadTypesFromConfiguration(wantedTypes)
+
assertions.EqualValues(wantedTypes, types)
- assertions.Nil(err)
supportedTypes := managerUnderTest.GetSupportedTypes()
assertions.EqualValues([]string{"type1"}, supportedTypes)
func TestJobsManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T) {
assertions := require.New(t)
- managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
+ managerUnderTest := NewJobsManagerImpl(nil, "", nil)
wantedJob := JobInfo{
Owner: "owner",
LastUpdated: "now",
func TestJobsManagerAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) {
assertions := require.New(t)
- managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
+ managerUnderTest := NewJobsManagerImpl(nil, "", nil)
jobInfo := JobInfo{
InfoTypeIdentity: "type1",
}
func TestJobsManagerAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) {
assertions := require.New(t)
- managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
+ managerUnderTest := NewJobsManagerImpl(nil, "", nil)
managerUnderTest.allTypes["type1"] = TypeData{
TypeId: "type1",
}
func TestJobsManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
assertions := require.New(t)
- managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
+ managerUnderTest := NewJobsManagerImpl(nil, "", nil)
managerUnderTest.allTypes["type1"] = TypeData{
TypeId: "type1",
}
func TestJobsManagerDeleteJob_shouldSendDeleteToChannel(t *testing.T) {
assertions := require.New(t)
- managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
+ managerUnderTest := NewJobsManagerImpl(nil, "", nil)
jobsHandler := jobsHandler{
deleteJobCh: make(chan string)}
managerUnderTest.allTypes["type1"] = TypeData{
})
jobsHandler := newJobsHandler("type1", "/topicUrl", pollClientMock, distributeClientMock)
- jobsManager := NewJobsManagerImpl("", pollClientMock, "http://mrAddr", distributeClientMock)
+ jobsManager := NewJobsManagerImpl(pollClientMock, "http://mrAddr", distributeClientMock)
jobsManager.allTypes["type1"] = TypeData{
DMaaPTopicURL: "/topicUrl",
TypeId: "type1",
"time"
"github.com/hashicorp/go-retryablehttp"
+ log "github.com/sirupsen/logrus"
)
// HTTPClient interface
func CreateRetryClient(cert tls.Certificate) *http.Client {
rawRetryClient := retryablehttp.NewClient()
+ rawRetryClient.Logger = leveledLogger{}
rawRetryClient.RetryWaitMax = time.Minute
rawRetryClient.RetryMax = math.MaxInt
rawRetryClient.HTTPClient.Transport = getSecureTransportWithoutVerify(cert)
u, _ := url.Parse(configUrl)
return u.Scheme == "https"
}
+
+// Used to get leveled logging in the RetryClient
+type leveledLogger struct {
+}
+
+func (ll leveledLogger) Error(msg string, keysAndValues ...interface{}) {
+ log.WithFields(getFields(keysAndValues)).Error(msg)
+}
+func (ll leveledLogger) Info(msg string, keysAndValues ...interface{}) {
+ log.WithFields(getFields(keysAndValues)).Info(msg)
+}
+func (ll leveledLogger) Debug(msg string, keysAndValues ...interface{}) {
+ log.WithFields(getFields(keysAndValues)).Debug(msg)
+}
+func (ll leveledLogger) Warn(msg string, keysAndValues ...interface{}) {
+ log.WithFields(getFields(keysAndValues)).Warn(msg)
+}
+
+func getFields(keysAndValues []interface{}) log.Fields {
+ fields := log.Fields{}
+ for i := 0; i < len(keysAndValues); i = i + 2 {
+ fields[fmt.Sprint(keysAndValues[i])] = keysAndValues[i+1]
+ }
+ return fields
+}
"net/http"
"github.com/gorilla/mux"
+ log "github.com/sirupsen/logrus"
"oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
)
const AddJobPath = "/jobs"
const jobIdToken = "infoJobId"
const deleteJobPath = AddJobPath + "/{" + jobIdToken + "}"
+const logLevelToken = "level"
+const logAdminPath = "/admin/log"
type ProducerCallbackHandler struct {
jobsManager jobs.JobsManager
r.HandleFunc(StatusPath, statusHandler).Methods(http.MethodGet).Name("status")
r.HandleFunc(AddJobPath, callbackHandler.addInfoJobHandler).Methods(http.MethodPost).Name("add")
r.HandleFunc(deleteJobPath, callbackHandler.deleteInfoJobHandler).Methods(http.MethodDelete).Name("delete")
+ r.HandleFunc(logAdminPath, callbackHandler.setLogLevel).Methods(http.MethodPut).Name("setLogLevel")
r.NotFoundHandler = ¬FoundHandler{}
r.MethodNotAllowedHandler = &methodNotAllowedHandler{}
return r
h.jobsManager.DeleteJobFromRESTCall(id)
}
+func (h *ProducerCallbackHandler) setLogLevel(w http.ResponseWriter, r *http.Request) {
+ query := r.URL.Query()
+ logLevelStr := query.Get(logLevelToken)
+ if loglevel, err := log.ParseLevel(logLevelStr); err == nil {
+ log.SetLevel(loglevel)
+ } else {
+ http.Error(w, fmt.Sprintf("Invalid log level: %v. Log level will not be changed!", logLevelStr), http.StatusBadRequest)
+ return
+ }
+}
+
type notFoundHandler struct{}
func (h *notFoundHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
handler.ServeHTTP(responseRecorder, newRequest(http.MethodPut, "/status", nil, t))
assertions.Equal(http.StatusMethodNotAllowed, responseRecorder.Code)
assertions.Contains(responseRecorder.Body.String(), "Method is not supported.")
+
+ setLogLevelRoute := r.Get("setLogLevel")
+ assertions.NotNil(setLogLevelRoute)
+ supportedMethods, err = setLogLevelRoute.GetMethods()
+ assertions.Equal([]string{http.MethodPut}, supportedMethods)
+ assertions.Nil(err)
+ path, _ = setLogLevelRoute.GetPathTemplate()
+ assertions.Equal("/admin/log", path)
}
func TestStatusHandler(t *testing.T) {
},
},
wantedStatus: http.StatusOK,
- wantedBody: "",
},
{
name: "AddInfoJobHandler with incorrect job info, should return BadRequest",
jobHandlerMock.AssertCalled(t, "DeleteJobFromRESTCall", "job1")
}
+func TestSetLogLevel(t *testing.T) {
+ assertions := require.New(t)
+
+ type args struct {
+ logLevel string
+ }
+ tests := []struct {
+ name string
+ args args
+ wantedStatus int
+ wantedBody string
+ }{
+ {
+ name: "Set to valid log level, should return OK",
+ args: args{
+ logLevel: "Debug",
+ },
+ wantedStatus: http.StatusOK,
+ },
+ {
+ name: "Set to invalid log level, should return BadRequest",
+ args: args{
+ logLevel: "bad",
+ },
+ wantedStatus: http.StatusBadRequest,
+ wantedBody: "Invalid log level: bad",
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ callbackHandlerUnderTest := NewProducerCallbackHandler(nil)
+
+ handler := http.HandlerFunc(callbackHandlerUnderTest.setLogLevel)
+ responseRecorder := httptest.NewRecorder()
+ r, _ := http.NewRequest(http.MethodPut, "/admin/log?level="+tt.args.logLevel, nil)
+
+ handler.ServeHTTP(responseRecorder, r)
+
+ assertions.Equal(tt.wantedStatus, responseRecorder.Code, tt.name)
+ assertions.Contains(responseRecorder.Body.String(), tt.wantedBody, tt.name)
+ })
+ }
+}
+
func newRequest(method string, url string, jobInfo *jobs.JobInfo, t *testing.T) *http.Request {
var body io.Reader
if jobInfo != nil {
}
retryClient := restclient.CreateRetryClient(cert)
- jobsManager := jobs.NewJobsManagerImpl("configs/type_config.json", retryClient, configuration.DMaaPMRAddress, restclient.CreateClientWithoutRetry(cert, 5*time.Second))
+ jobsManager := jobs.NewJobsManagerImpl(retryClient, configuration.DMaaPMRAddress, restclient.CreateClientWithoutRetry(cert, 10*time.Second))
if err := registerTypesAndProducer(jobsManager, configuration.InfoCoordinatorAddress, callbackAddress, retryClient); err != nil {
log.Fatalf("Stopping producer due to: %v", err)
}
}
func registerTypesAndProducer(jobTypesHandler jobs.JobTypesManager, infoCoordinatorAddress string, callbackAddress string, client restclient.HTTPClient) error {
registrator := config.NewRegistratorImpl(infoCoordinatorAddress, client)
- if types, err := jobTypesHandler.LoadTypesFromConfiguration(); err == nil {
- if regErr := registrator.RegisterTypes(types); regErr != nil {
- return fmt.Errorf("unable to register all types due to: %v", regErr)
- }
- } else {
- return fmt.Errorf("unable to get types to register due to: %v", err)
+ configTypes, err := config.GetJobTypesFromConfiguration("configs/type_config.json")
+ if err != nil {
+ return fmt.Errorf("unable to register all types due to: %v", err)
}
+ regErr := registrator.RegisterTypes(jobTypesHandler.LoadTypesFromConfiguration(configTypes))
+ if regErr != nil {
+ return fmt.Errorf("unable to register all types due to: %v", regErr)
+ }
+
producer := config.ProducerRegistrationInfo{
InfoProducerSupervisionCallbackUrl: callbackAddress + server.StatusPath,
SupportedInfoTypes: jobTypesHandler.GetSupportedTypes(),
registerJob(*port)
- fmt.Print("Starting consumer on port: ", *port)
+ fmt.Println("Starting consumer on port: ", *port)
fmt.Println(http.ListenAndServe(fmt.Sprintf(":%v", *port), nil))
}
InfoTypeId: "STD_Fault_Messages",
JobDefinition: "{}",
}
- fmt.Print("Registering consumer: ", jobInfo)
+ fmt.Println("Registering consumer: ", jobInfo)
body, _ := json.Marshal(jobInfo)
putErr := restclient.Put(fmt.Sprintf("http://localhost:8083/data-consumer/v1/info-jobs/job%v", port), body, &httpClient)
if putErr != nil {
- fmt.Printf("Unable to register consumer: %v", putErr)
+ fmt.Println("Unable to register consumer: ", putErr)
}
}
var responseBody []byte
if critical {
responseBody = getFaultMessage("CRITICAL")
+ fmt.Println("Sending CRITICAL")
critical = false
} else {
responseBody = getFaultMessage("NORMAL")
+ fmt.Println("Sending NORMAL")
critical = true
}
- // w.Write(responseBody)
fmt.Fprint(w, string(responseBody))
}
NONRTRIC_GATEWAY_IMAGE_TAG="1.0.0"
#ECS
-ECS_IMAGE_BASE="nexus3.o-ran-sc.org:10002/o-ran-sc/nonrtric-enrichment-coordinator-service"
+ECS_IMAGE_BASE="nexus3.o-ran-sc.org:10002/o-ran-sc/nonrtric-information-coordinator-service"
ECS_IMAGE_TAG="1.1.0"
#CONSUMER
#DMAAP_MEDIATOR_JAVA
DMAAP_MEDIATOR_JAVA_BASE="nexus3.o-ran-sc.org:10003/o-ran-sc/nonrtric-dmaap-adaptor"
-DMAAP_MEDIATOR_JAVA_TAG="1.0.0-SNAPSHOT"
\ No newline at end of file
+DMAAP_MEDIATOR_JAVA_TAG="1.0.0-SNAPSHOT"
## O-RAN-SC Control Panel
-The Non-RT RIC Control Panel is a graphical user interface that enables the user to view and manage the A1 policies in the RAN and also view producers and jobs for the Enrichement coordinator service.
+The Non-RT RIC Control Panel is a graphical user interface that enables the user to view and manage the A1 policies in the RAN and also view producers and jobs for the Information coordinator service.
### O-RAN-SC Control Panel Gateway:
-To view the policy or enrichment information in control panel gui along with Policy Management Service & Enrichment Coordinator Service you should also have nonrtric gateway because all the request from the gui is passed through this API gateway.
+To view the policy or information jobs and types in control panel gui along with Policy Management Service & Information Coordinator Service you should also have nonrtric gateway because all the request from the gui is passed through this API gateway.
#### Prerequisite:
To start all the necessary components, run the following command:
-docker-compose -f docker-compose.yaml -f control-panel/docker-compose.yaml -f nonrtric-gateway/docker-compose.yaml -f policy-service/docker-compose.yaml -f ecs/docker-compose.yaml -f a1-sim/docker-compose.yaml up
\ No newline at end of file
+docker-compose -f docker-compose.yaml -f control-panel/docker-compose.yaml -f nonrtric-gateway/docker-compose.yaml -f policy-service/docker-compose.yaml -f ecs/docker-compose.yaml -f a1-sim/docker-compose.yaml up
environment:
- INFO_PRODUCER_HOST=http://consumer
- INFO_PRODUCER_PORT=8088
- - INFO_COORD_ADDR=http://ecs:8083
+ - INFO_COORD_ADDR=http://ics:8083
- DMAAP_MR_ADDR=http://dmaap-mr:3904
- PRODUCER_CERT_PATH=security/producer.crt
- PRODUCER_KEY_PATH=security/producer.key
- LOG_LEVEL=Debug
networks:
- - default
\ No newline at end of file
+ - default
name: nonrtric-docker-net
services:
- ecs:
- image: "${ECS_IMAGE_BASE}:${ECS_IMAGE_TAG}"
- container_name: ecs
+ ics:
+ image: "${ICS_IMAGE_BASE}:${ICS_IMAGE_TAG}"
+ container_name: ics
networks:
default:
aliases:
- - enrichment-service-container
+ - information-service-container
ports:
- 8083:8083
- 8434:8434
+++ /dev/null
-information-coordinator-service
\ No newline at end of file
}
},
"consumer_job": {
- "description": "Information for an Enrichment Information Job",
+ "description": "Information for an Information Job",
"type": "object",
"required": [
"info_type_id",
status_notification_uri:
type: string
description: The target of Information subscription job status notifications
- description: Information for an Enrichment Information Job
+ description: Information for an Information Job
producer_status:
required:
- operational_state
import org.immutables.gson.Gson;
@Gson.TypeAdapters
-@Schema(name = "consumer_job", description = "Information for an Enrichment Information Job")
+@Schema(name = "consumer_job", description = "Information for an Information Job")
public class ConsumerJobInfo {
@Schema(
"server.ssl.key-store=./config/keystore.jks", //
"app.webclient.trust-store=./config/truststore.jks", "app.vardata-directory=./target"})
@SuppressWarnings("squid:S3577") // Not containing any tests since it is a mock.
-class MockEnrichmentService {
+class MockInformationService {
private static final Logger logger = LoggerFactory.getLogger(ApplicationTest.class);
@LocalServerPort
--- /dev/null
+{
+ "type": "record",
+ "name": "Std_Defined_Output",
+ "fields": [
+ {
+ "name": "radio_DasH_resource_DasH_management_DasH_policy_DasH_ratio",
+ "type": {
+ "type": "array",
+ "items": {
+ "name": "RRM_Policy_Ratio",
+ "type": "record",
+ "fields": [
+ {
+ "name": "id",
+ "type": "string"
+ },
+ {
+ "name": "administrative_DasH_state",
+ "type": "string"
+ },
+ {
+ "name": "user_DasH_label",
+ "type": "string"
+ },
+ {
+ "name": "radio_DasH_resource_DasH_management_DasH_policy_DasH_max_DasH_ratio",
+ "type": "string"
+ },
+ {
+ "name": "radio_DasH_resource_DasH_management_DasH_policy_DasH_min_DasH_ratio",
+ "type": "string"
+ },
+ {
+ "name": "radio_DasH_resource_DasH_management_DasH_policy_DasH_dedicated_DasH_ratio",
+ "type": "string"
+ },
+ {
+ "name": "resource_DasH_type",
+ "type": "string"
+ },
+ {
+ "name": "radio_DasH_resource_DasH_management_DasH_policy_DasH_members",
+ "type": {
+ "type": "array",
+ "items": {
+ "name": "RRM_Policy_Members",
+ "type": "record",
+ "fields": [
+ {
+ "name": "mobile_DasH_country_DasH_code",
+ "type": "string"
+ },
+ {
+ "name": "mobile_DasH_network_DasH_code",
+ "type": "string"
+ },
+ {
+ "name": "slice_DasH_differentiator",
+ "type": "int"
+ },
+ {
+ "name": "slice_DasH_service_DasH_type",
+ "type": "int"
+ }
+ ]
+ }
+ }
+ }
+ ]
+ }
+ }
+ }
+ ]
+}
{{- toYaml .Values.securityContext | nindent 12 }}
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
+ env:
+ - name: TOPIC_READ
+ value: http://dmaap-mr:3904/events/unauthenticated.SEC_FAULT_OUTPUT
+ - name: TOPIC_WRITE
+ value: http://dmaap-mr:3904/events/unauthenticated.SEC_FAULT_OUTPUT
+ - name: GENERIC_TOPICS_UPLOAD_BASEURL
+ value: http://dmaap-mr:3904
ports:
- name: http
containerPort: 3904