# The HTTP proxy (if configured) will only be used for accessing NearRT RIC:s
http.proxy-host:
http.proxy-port: 0
- ecs-base-url: https://localhost:8434
+ ics-base-url: https://localhost:8434
# Location of the component configuration file. The file will only be used if the Consul database is not used;
# configuration from the Consul will override the file.
configuration-filepath: /opt/app/dmaap-adaptor-service/data/application_configuration.json
private int localServerHttpPort;
@Getter
- @Value("${app.ecs-base-url}")
- private String ecsBaseUrl;
+ @Value("${app.ics-base-url}")
+ private String icsBaseUrl;
@Getter
@Value("${app.dmaap-adapter-base-url}")
return null;
}
+ public T get(String key1, String key2) {
+ Map<String, T> innerMap = this.map.get(key1);
+ if (innerMap == null) {
+ return null;
+ }
+ return innerMap.get(key2);
+ }
+
public Collection<T> get(String key) {
Map<String, T> innerMap = this.map.get(key);
if (innerMap == null) {
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import reactor.core.publisher.Sinks.Many;
/**
* The class streams data from a multi cast sink and sends the data to the Job
this.job = job;
}
- public synchronized void start(Many<String> input) {
+ public synchronized void start(Flux<String> input) {
stop();
this.errorStats.resetKafkaErrors();
this.subscription = getMessagesFromKafka(input, job) //
public synchronized void stop() {
if (this.subscription != null) {
- subscription.dispose();
- subscription = null;
+ this.subscription.dispose();
+ this.subscription = null;
}
}
return this.subscription != null;
}
- private Flux<String> getMessagesFromKafka(Many<String> input, Job job) {
- Flux<String> result = input.asFlux() //
- .filter(job::isFilterMatch);
+ private Flux<String> getMessagesFromKafka(Flux<String> input, Job job) {
+ Flux<String> result = input.filter(job::isFilterMatch);
if (job.isBuffered()) {
result = result.map(this::quote) //
public void onJobRemoved(Job job) {
removeJob(job);
}
-
});
}
topicConsumer.start();
}
KafkaJobDataConsumer subscription = new KafkaJobDataConsumer(job);
- subscription.start(topicConsumer.getOutput());
+ subscription.start(topicConsumer.getOutput().asFlux());
consumers.put(job.getType().getId(), job.getId(), subscription);
}
}
}
@Scheduled(fixedRate = CONSUMER_SUPERVISION_INTERVAL_MS)
- public synchronized void restartNonRunningTasks() {
- this.consumers.keySet().forEach(typeId -> {
- this.consumers.get(typeId).forEach(consumer -> {
- if (!consumer.isRunning()) {
- restartTopic(consumer);
- }
- });
- });
+ public synchronized void restartNonRunningTopics() {
+ for (String typeId : this.consumers.keySet()) {
+ for (KafkaJobDataConsumer consumer : this.consumers.get(typeId)) {
+ restartTopic(consumer);
+ }
+ }
}
private void restartTopic(KafkaJobDataConsumer consumer) {
}
private void restartConsumersOfType(KafkaTopicListener topic, InfoType type) {
- this.consumers.get(type.getId()).forEach((consumer) -> {
- consumer.start(topic.getOutput());
- });
+ this.consumers.get(type.getId()).forEach(consumer -> consumer.start(topic.getOutput().asFlux()));
}
}
import reactor.core.publisher.Mono;
/**
- * Registers the types and this producer in ECS. This is done when needed.
+ * Registers the types and this producer in Innformation Coordinator Service.
+ * This is done when needed.
*/
@Component
@EnableScheduling
private static final String PRODUCER_ID = "DmaapGenericInfoProducer";
@Getter
- private boolean isRegisteredInEcs = false;
+ private boolean isRegisteredInIcs = false;
private static final int REGISTRATION_SUPERVISION_INTERVAL_MS = 1000 * 5;
public ProducerRegstrationTask(@Autowired ApplicationConfig applicationConfig, @Autowired InfoTypes types) {
@Scheduled(fixedRate = REGISTRATION_SUPERVISION_INTERVAL_MS)
public void supervisionTask() {
checkRegistration() //
- .filter(isRegistrationOk -> !isRegistrationOk || !this.isRegisteredInEcs) //
+ .filter(isRegistrationOk -> !isRegistrationOk || !this.isRegisteredInIcs) //
.flatMap(isRegisterred -> registerTypesAndProducer()) //
.subscribe( //
null, //
}
private void handleRegistrationCompleted() {
- isRegisteredInEcs = true;
+ isRegisteredInIcs = true;
}
private void handleRegistrationFailure(Throwable t) {
// Returns TRUE if registration is correct
private Mono<Boolean> checkRegistration() {
- final String url = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
+ final String url = applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
return restClient.get(url) //
.flatMap(this::isRegisterredInfoCorrect) //
.onErrorResume(t -> Mono.just(Boolean.FALSE));
private Mono<Boolean> isRegisterredInfoCorrect(String registerredInfoStr) {
ProducerRegistrationInfo registerredInfo = gson.fromJson(registerredInfoStr, ProducerRegistrationInfo.class);
if (isEqual(producerRegistrationInfo(), registerredInfo)) {
- logger.trace("Already registered in ECS");
+ logger.trace("Already registered in ICS");
return Mono.just(Boolean.TRUE);
} else {
return Mono.just(Boolean.FALSE);
}
private String registerTypeUrl(InfoType type) {
- return applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-types/" + type.getId();
+ return applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-types/" + type.getId();
}
private Mono<String> registerTypesAndProducer() {
final int CONCURRENCY = 20;
final String producerUrl =
- applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
+ applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
return Flux.fromIterable(this.types.getAll()) //
.doOnNext(type -> logger.info("Registering type {}", type.getId())) //
import org.oran.dmaapadapter.repository.InfoTypes;
import org.oran.dmaapadapter.repository.Job;
import org.oran.dmaapadapter.repository.Jobs;
+import org.oran.dmaapadapter.tasks.KafkaJobDataConsumer;
+import org.oran.dmaapadapter.tasks.KafkaTopicConsumers;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.web.reactive.function.client.WebClientResponseException;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
private ConsumerController consumerController;
@Autowired
- private EcsSimulatorController ecsSimulatorController;
+ private IcsSimulatorController icsSimulatorController;
+
+ @Autowired
+ KafkaTopicConsumers kafkaTopicConsumers;
private com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
static class TestApplicationConfig extends ApplicationConfig {
@Override
- public String getEcsBaseUrl() {
+ public String getIcsBaseUrl() {
return thisProcessUrl();
}
@AfterEach
void reset() {
this.consumerController.testResults.reset();
- this.ecsSimulatorController.testResults.reset();
+ this.icsSimulatorController.testResults.reset();
this.jobs.clear();
}
}
@Test
- void testWholeChain() throws Exception {
+ void testReceiveAndPostDataFromKafka() {
+ final String JOB_ID = "ID";
+ final String TYPE_ID = "KafkaInformationType";
+ await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+
+ // Create a job
+ Job.Parameters param = new Job.Parameters("", new Job.BufferTimeout(123, 456), 1);
+ String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL;
+ ConsumerJobInfo kafkaJobInfo =
+ new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", targetUri, "");
+
+ this.icsSimulatorController.addJob(kafkaJobInfo, JOB_ID, restClient());
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+ KafkaJobDataConsumer kafkaConsumer = this.kafkaTopicConsumers.getConsumers().get(TYPE_ID, JOB_ID);
+
+ // Handle received data from Kafka, check that it has been posted to the
+ // consumer
+ kafkaConsumer.start(Flux.just("data"));
+
+ ConsumerController.TestResults consumer = this.consumerController.testResults;
+ await().untilAsserted(() -> assertThat(consumer.receivedBodies.size()).isEqualTo(1));
+ assertThat(consumer.receivedBodies.get(0)).isEqualTo("[\"data\"]");
+
+ // Test send an exception
+ kafkaConsumer.start(Flux.error(new NullPointerException()));
+
+ // Test regular restart of stopped
+ kafkaConsumer.stop();
+ this.kafkaTopicConsumers.restartNonRunningTopics();
+ await().untilAsserted(() -> assertThat(kafkaConsumer.isRunning()).isTrue());
+
+ // Delete the job
+ this.icsSimulatorController.deleteJob(JOB_ID, restClient());
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+ }
+
+ @Test
+ void testReceiveAndPostDataFromDmaap() throws Exception {
final String JOB_ID = "ID";
// Register producer, Register types
- await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
- assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+ await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
// Create a job
- this.ecsSimulatorController.addJob(consumerJobInfo(), JOB_ID, restClient());
+ this.icsSimulatorController.addJob(consumerJobInfo(), JOB_ID, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
// Return two messages from DMAAP and verify that these are sent to the owner of
assertThat(jobs).contains(JOB_ID);
// Delete the job
- this.ecsSimulatorController.deleteJob(JOB_ID, restClient());
+ this.icsSimulatorController.deleteJob(JOB_ID, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
}
@Test
void testReRegister() throws Exception {
// Wait foir register types and producer
- await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
- assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+ await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
// Clear the registration, should trigger a re-register
- ecsSimulatorController.testResults.reset();
- await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
- assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+ icsSimulatorController.testResults.reset();
+ await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
// Just clear the registerred types, should trigger a re-register
- ecsSimulatorController.testResults.types.clear();
+ icsSimulatorController.testResults.types.clear();
await().untilAsserted(
- () -> assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(2));
- }
-
- @Test
- void testCreateKafkaJob() {
- await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
- assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
-
- final String TYPE_ID = "KafkaInformationType";
-
- Job.Parameters param = new Job.Parameters("filter", new Job.BufferTimeout(123, 456), 1);
- String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL;
- ConsumerJobInfo jobInfo = new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", targetUri, "");
-
- // Create a job
- this.ecsSimulatorController.addJob(jobInfo, "JOB_ID", restClient());
- await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
-
- // Delete the job
- this.ecsSimulatorController.deleteJob("JOB_ID", restClient());
- await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+ () -> assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(2));
}
private void testErrorCode(Mono<?> request, HttpStatus expStatus, String responseContains) {
@RestController("IcsSimulatorController")
@Tag(name = "Information Coordinator Service Simulator (exists only in test)")
-public class EcsSimulatorController {
+public class IcsSimulatorController {
private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final static Gson gson = new GsonBuilder().create();
static class TestApplicationConfig extends ApplicationConfig {
@Override
- public String getEcsBaseUrl() {
+ public String getIcsBaseUrl() {
return "https://localhost:8434";
}
}
private String ecsBaseUrl() {
- return applicationConfig.getEcsBaseUrl();
+ return applicationConfig.getIcsBaseUrl();
}
private String jobUrl(String jobId) {
@Test
void testCreateKafkaJob() {
- await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInEcs()).isTrue());
+ await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue());
final String TYPE_ID = "KafkaInformationType";
Job.Parameters param = new Job.Parameters("filter", new Job.BufferTimeout(123, 456), 1);
@Test
void testWholeChain() throws Exception {
- await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInEcs()).isTrue());
+ await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue());
createInformationJobInEcs(DMAAP_TYPE_ID, DMAAP_JOB_ID, ".*DmaapResponse.*");
private ConsumerController consumerController;
@Autowired
- private EcsSimulatorController ecsSimulatorController;
+ private IcsSimulatorController icsSimulatorController;
@Autowired
private KafkaTopicConsumers kafkaTopicConsumers;
static class TestApplicationConfig extends ApplicationConfig {
@Override
- public String getEcsBaseUrl() {
+ public String getIcsBaseUrl() {
return thisProcessUrl();
}
@AfterEach
void reset() {
this.consumerController.testResults.reset();
- this.ecsSimulatorController.testResults.reset();
+ this.icsSimulatorController.testResults.reset();
this.jobs.clear();
}
final String JOB_ID2 = "ID2";
// Register producer, Register types
- await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
- assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+ await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
// Create two jobs. One buffering and one with a filter
- this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 20), JOB_ID1,
+ this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 20), JOB_ID1,
restClient());
- this.ecsSimulatorController.addJob(consumerJobInfo("^Message_1$", Duration.ZERO, 0, 1), JOB_ID2, restClient());
+ this.icsSimulatorController.addJob(consumerJobInfo("^Message_1$", Duration.ZERO, 0, 1), JOB_ID2, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
verifiedReceivedByConsumer("Message_1", "[\"Message_1\", \"Message_2\", \"Message_3\"]");
// Delete the jobs
- this.ecsSimulatorController.deleteJob(JOB_ID1, restClient());
- this.ecsSimulatorController.deleteJob(JOB_ID2, restClient());
+ this.icsSimulatorController.deleteJob(JOB_ID1, restClient());
+ this.icsSimulatorController.deleteJob(JOB_ID2, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers().keySet()).isEmpty());
final String JOB_ID2 = "ID2";
// Register producer, Register types
- await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
- assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+ await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
// Create two jobs.
- this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 1), JOB_ID1,
+ this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 1), JOB_ID1,
restClient());
- this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID2, restClient());
+ this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID2, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
await().untilAsserted(() -> assertThat(consumer.isRunning()).isFalse());
this.consumerController.testResults.reset();
- this.ecsSimulatorController.deleteJob(JOB_ID2, restClient()); // Delete one job
- kafkaTopicConsumers.restartNonRunningTasks();
+ this.icsSimulatorController.deleteJob(JOB_ID2, restClient()); // Delete one job
+ kafkaTopicConsumers.restartNonRunningTopics();
Thread.sleep(1000); // Restarting the input seems to take some asynch time
dataToSend = Flux.just(senderRecord("Howdy\""));
verifiedReceivedByConsumer("[\"Howdy\\\"\"]");
// Delete the jobs
- this.ecsSimulatorController.deleteJob(JOB_ID1, restClient());
- this.ecsSimulatorController.deleteJob(JOB_ID2, restClient());
+ this.icsSimulatorController.deleteJob(JOB_ID1, restClient());
+ this.icsSimulatorController.deleteJob(JOB_ID2, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers().keySet()).isEmpty());
Once the initial registration is done, the producer will constantly poll MR for all configured job types. When receiving messages for a type, it will distribute these messages to all jobs registered for the type. If no jobs for that type are registered, the messages will be discarded. If a consumer is unavailable for distribution, the messages will be discarded for that consumer until it is available again.
+The producer provides a REST API to control the log level. The available levels are the same as the ones used in the configuration above.
+
+ PUT https://mrproducer:8085/admin/log?level=<new level>
+
## Development
To make it easy to test during development of the producer, two stubs are provided in the `stub` folder.
-One, under the `dmaap` folder, called `dmaap` that stubs MR and respond with an array with one message with `eventSeverity` alternating between `NORMAL` and `CRITICAL`. The default port is `3905`, but this can be overridden by passing a `-port [PORT]` flag when starting the stub. To build and start the stub, do the following:
+One, under the `dmaap` folder, called `dmaap` that stubs MR and respond with an array with one message with `eventSeverity` alternating between `NORMAL` and `CRITICAL`. The default port is `3905`, but this can be overridden by passing a `-port <PORT>` flag when starting the stub. To build and start the stub, do the following:
>1. cd stub/dmaap
>2. go build
->3. ./dmaap
+>3. ./dmaap [-port \<PORT>]
-One, under the `consumer` folder, called `consumer` that at startup will register a job of type `STD_Fault_Messages` in ICS, and then listen for REST calls and print the body of them. By default, it listens to the port `40935`, but his can be overridden by passing a `-port [PORT]` flag when starting the stub. To build and start the stub, do the following:
+One, under the `consumer` folder, called `consumer` that at startup will register a job of type `STD_Fault_Messages` in ICS, and then listen for REST calls and print the body of them. By default, it listens to the port `40935`, but his can be overridden by passing a `-port <PORT>` flag when starting the stub. To build and start the stub, do the following:
>1. cd stub/consumer
>2. go build
->3. ./consumer
+>3. ./consumer [-port \<PORT>]
Mocks needed for unit tests have been generated using `github.com/stretchr/testify/mock` and are checked in under the `mocks` folder. **Note!** Keep in mind that if any of the mocked interfaces change, a new mock for that interface must be generated and checked in.
go 1.17
require (
+ github.com/gorilla/mux v1.8.0
+ github.com/hashicorp/go-retryablehttp v0.7.0
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.7.0
)
require (
github.com/davecgh/go-spew v1.1.1 // indirect
- github.com/gorilla/mux v1.8.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.1 // indirect
- github.com/hashicorp/go-retryablehttp v0.7.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/objx v0.1.0 // indirect
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037 // indirect
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/hashicorp/go-cleanhttp v0.5.1 h1:dH3aiDG9Jvb5r5+bYHsikaOUIpcM0xvgMXVoDkXMzJM=
github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
+github.com/hashicorp/go-hclog v0.9.2 h1:CG6TE5H9/JXsFWJCfoIVpKFIkFe6ysEuHirp4DxCsHI=
github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ=
github.com/hashicorp/go-retryablehttp v0.7.0 h1:eu1EI/mbirUgP5C8hVsTNaGZreBDlYiwC1FZWkvQPQ4=
github.com/hashicorp/go-retryablehttp v0.7.0/go.mod h1:vAew36LZh98gCBJNLH42IQ1ER/9wtLZZ8meHqQvEYWY=
package config
import (
+ "encoding/json"
"fmt"
"os"
"strconv"
return log.InfoLevel
}
}
+
+func GetJobTypesFromConfiguration(configFile string) ([]TypeDefinition, error) {
+ typeDefsByte, err := os.ReadFile(configFile)
+ if err != nil {
+ return nil, err
+ }
+ typeDefs := struct {
+ Types []TypeDefinition `json:"types"`
+ }{}
+ err = json.Unmarshal(typeDefsByte, &typeDefs)
+ if err != nil {
+ return nil, err
+ }
+
+ return typeDefs.Types, nil
+}
import (
"bytes"
"os"
- "reflect"
+ "path/filepath"
"testing"
log "github.com/sirupsen/logrus"
ProducerCertPath: "security/producer.crt",
ProducerKeyPath: "security/producer.key",
}
- if got := New(); !reflect.DeepEqual(got, &wantConfig) {
- t.Errorf("New() = %v, want %v", got, &wantConfig)
- }
+ got := New()
+ assertions.Equal(&wantConfig, got)
logString := buf.String()
assertions.Contains(logString, "Invalid int value: wrong for variable: INFO_PRODUCER_PORT. Default value: 8085 will be used")
}
logString := buf.String()
assertions.Contains(logString, "Invalid log level: wrong. Log level will be Info!")
}
+
+const typeDefinition = `{"types": [{"id": "type1", "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1"}]}`
+
+func TestGetTypesFromConfiguration_fileOkShouldReturnSliceOfTypeDefinitions(t *testing.T) {
+ assertions := require.New(t)
+ typesDir, err := os.MkdirTemp("", "configs")
+ if err != nil {
+ t.Errorf("Unable to create temporary directory for types due to: %v", err)
+ }
+ fname := filepath.Join(typesDir, "type_config.json")
+ t.Cleanup(func() {
+ os.RemoveAll(typesDir)
+ })
+ if err = os.WriteFile(fname, []byte(typeDefinition), 0666); err != nil {
+ t.Errorf("Unable to create temporary config file for types due to: %v", err)
+ }
+
+ types, err := GetJobTypesFromConfiguration(fname)
+
+ wantedType := TypeDefinition{
+ Id: "type1",
+ DmaapTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1",
+ }
+ wantedTypes := []TypeDefinition{wantedType}
+ assertions.EqualValues(wantedTypes, types)
+ assertions.Nil(err)
+}
package jobs
import (
- "encoding/json"
"fmt"
- "os"
"sync"
log "github.com/sirupsen/logrus"
}
type JobTypesManager interface {
- LoadTypesFromConfiguration() ([]config.TypeDefinition, error)
+ LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition
GetSupportedTypes() []string
}
}
type JobsManagerImpl struct {
- configFile string
allTypes map[string]TypeData
pollClient restclient.HTTPClient
mrAddress string
distributeClient restclient.HTTPClient
}
-func NewJobsManagerImpl(typeConfigFilePath string, pollClient restclient.HTTPClient, mrAddr string, distributeClient restclient.HTTPClient) *JobsManagerImpl {
+func NewJobsManagerImpl(pollClient restclient.HTTPClient, mrAddr string, distributeClient restclient.HTTPClient) *JobsManagerImpl {
return &JobsManagerImpl{
- configFile: typeConfigFilePath,
allTypes: make(map[string]TypeData),
pollClient: pollClient,
mrAddress: mrAddr,
return nil
}
-func (jm *JobsManagerImpl) LoadTypesFromConfiguration() ([]config.TypeDefinition, error) {
- typeDefsByte, err := os.ReadFile(jm.configFile)
- if err != nil {
- return nil, err
- }
- typeDefs := struct {
- Types []config.TypeDefinition `json:"types"`
- }{}
- err = json.Unmarshal(typeDefsByte, &typeDefs)
- if err != nil {
- return nil, err
- }
- for _, typeDef := range typeDefs.Types {
+func (jm *JobsManagerImpl) LoadTypesFromConfiguration(types []config.TypeDefinition) []config.TypeDefinition {
+ for _, typeDef := range types {
jm.allTypes[typeDef.Id] = TypeData{
TypeId: typeDef.Id,
DMaaPTopicURL: typeDef.DmaapTopicURL,
jobsHandler: newJobsHandler(typeDef.Id, typeDef.DmaapTopicURL, jm.pollClient, jm.distributeClient),
}
}
- return typeDefs.Types, nil
+ return types
}
func (jm *JobsManagerImpl) GetSupportedTypes() []string {
"bytes"
"io/ioutil"
"net/http"
- "os"
- "path/filepath"
"sync"
"testing"
"time"
func TestJobsManagerGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) {
assertions := require.New(t)
- typesDir, err := os.MkdirTemp("", "configs")
- if err != nil {
- t.Errorf("Unable to create temporary directory for types due to: %v", err)
- }
- fname := filepath.Join(typesDir, "type_config.json")
- managerUnderTest := NewJobsManagerImpl(fname, nil, "", nil)
- t.Cleanup(func() {
- os.RemoveAll(typesDir)
- })
- if err = os.WriteFile(fname, []byte(typeDefinition), 0666); err != nil {
- t.Errorf("Unable to create temporary config file for types due to: %v", err)
- }
- types, err := managerUnderTest.LoadTypesFromConfiguration()
+
+ managerUnderTest := NewJobsManagerImpl(nil, "", nil)
+
wantedType := config.TypeDefinition{
Id: "type1",
DmaapTopicURL: "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1",
}
wantedTypes := []config.TypeDefinition{wantedType}
+
+ types := managerUnderTest.LoadTypesFromConfiguration(wantedTypes)
+
assertions.EqualValues(wantedTypes, types)
- assertions.Nil(err)
supportedTypes := managerUnderTest.GetSupportedTypes()
assertions.EqualValues([]string{"type1"}, supportedTypes)
func TestJobsManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T) {
assertions := require.New(t)
- managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
+ managerUnderTest := NewJobsManagerImpl(nil, "", nil)
wantedJob := JobInfo{
Owner: "owner",
LastUpdated: "now",
func TestJobsManagerAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) {
assertions := require.New(t)
- managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
+ managerUnderTest := NewJobsManagerImpl(nil, "", nil)
jobInfo := JobInfo{
InfoTypeIdentity: "type1",
}
func TestJobsManagerAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) {
assertions := require.New(t)
- managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
+ managerUnderTest := NewJobsManagerImpl(nil, "", nil)
managerUnderTest.allTypes["type1"] = TypeData{
TypeId: "type1",
}
func TestJobsManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
assertions := require.New(t)
- managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
+ managerUnderTest := NewJobsManagerImpl(nil, "", nil)
managerUnderTest.allTypes["type1"] = TypeData{
TypeId: "type1",
}
func TestJobsManagerDeleteJob_shouldSendDeleteToChannel(t *testing.T) {
assertions := require.New(t)
- managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
+ managerUnderTest := NewJobsManagerImpl(nil, "", nil)
jobsHandler := jobsHandler{
deleteJobCh: make(chan string)}
managerUnderTest.allTypes["type1"] = TypeData{
})
jobsHandler := newJobsHandler("type1", "/topicUrl", pollClientMock, distributeClientMock)
- jobsManager := NewJobsManagerImpl("", pollClientMock, "http://mrAddr", distributeClientMock)
+ jobsManager := NewJobsManagerImpl(pollClientMock, "http://mrAddr", distributeClientMock)
jobsManager.allTypes["type1"] = TypeData{
DMaaPTopicURL: "/topicUrl",
TypeId: "type1",
"time"
"github.com/hashicorp/go-retryablehttp"
+ log "github.com/sirupsen/logrus"
)
// HTTPClient interface
func CreateRetryClient(cert tls.Certificate) *http.Client {
rawRetryClient := retryablehttp.NewClient()
+ rawRetryClient.Logger = leveledLogger{}
rawRetryClient.RetryWaitMax = time.Minute
rawRetryClient.RetryMax = math.MaxInt
rawRetryClient.HTTPClient.Transport = getSecureTransportWithoutVerify(cert)
u, _ := url.Parse(configUrl)
return u.Scheme == "https"
}
+
+// Used to get leveled logging in the RetryClient
+type leveledLogger struct {
+}
+
+func (ll leveledLogger) Error(msg string, keysAndValues ...interface{}) {
+ log.WithFields(getFields(keysAndValues)).Error(msg)
+}
+func (ll leveledLogger) Info(msg string, keysAndValues ...interface{}) {
+ log.WithFields(getFields(keysAndValues)).Info(msg)
+}
+func (ll leveledLogger) Debug(msg string, keysAndValues ...interface{}) {
+ log.WithFields(getFields(keysAndValues)).Debug(msg)
+}
+func (ll leveledLogger) Warn(msg string, keysAndValues ...interface{}) {
+ log.WithFields(getFields(keysAndValues)).Warn(msg)
+}
+
+func getFields(keysAndValues []interface{}) log.Fields {
+ fields := log.Fields{}
+ for i := 0; i < len(keysAndValues); i = i + 2 {
+ fields[fmt.Sprint(keysAndValues[i])] = keysAndValues[i+1]
+ }
+ return fields
+}
"net/http"
"github.com/gorilla/mux"
+ log "github.com/sirupsen/logrus"
"oransc.org/nonrtric/dmaapmediatorproducer/internal/jobs"
)
const AddJobPath = "/jobs"
const jobIdToken = "infoJobId"
const deleteJobPath = AddJobPath + "/{" + jobIdToken + "}"
+const logLevelToken = "level"
+const logAdminPath = "/admin/log"
type ProducerCallbackHandler struct {
jobsManager jobs.JobsManager
r.HandleFunc(StatusPath, statusHandler).Methods(http.MethodGet).Name("status")
r.HandleFunc(AddJobPath, callbackHandler.addInfoJobHandler).Methods(http.MethodPost).Name("add")
r.HandleFunc(deleteJobPath, callbackHandler.deleteInfoJobHandler).Methods(http.MethodDelete).Name("delete")
+ r.HandleFunc(logAdminPath, callbackHandler.setLogLevel).Methods(http.MethodPut).Name("setLogLevel")
r.NotFoundHandler = ¬FoundHandler{}
r.MethodNotAllowedHandler = &methodNotAllowedHandler{}
return r
h.jobsManager.DeleteJobFromRESTCall(id)
}
+func (h *ProducerCallbackHandler) setLogLevel(w http.ResponseWriter, r *http.Request) {
+ query := r.URL.Query()
+ logLevelStr := query.Get(logLevelToken)
+ if loglevel, err := log.ParseLevel(logLevelStr); err == nil {
+ log.SetLevel(loglevel)
+ } else {
+ http.Error(w, fmt.Sprintf("Invalid log level: %v. Log level will not be changed!", logLevelStr), http.StatusBadRequest)
+ return
+ }
+}
+
type notFoundHandler struct{}
func (h *notFoundHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
handler.ServeHTTP(responseRecorder, newRequest(http.MethodPut, "/status", nil, t))
assertions.Equal(http.StatusMethodNotAllowed, responseRecorder.Code)
assertions.Contains(responseRecorder.Body.String(), "Method is not supported.")
+
+ setLogLevelRoute := r.Get("setLogLevel")
+ assertions.NotNil(setLogLevelRoute)
+ supportedMethods, err = setLogLevelRoute.GetMethods()
+ assertions.Equal([]string{http.MethodPut}, supportedMethods)
+ assertions.Nil(err)
+ path, _ = setLogLevelRoute.GetPathTemplate()
+ assertions.Equal("/admin/log", path)
}
func TestStatusHandler(t *testing.T) {
},
},
wantedStatus: http.StatusOK,
- wantedBody: "",
},
{
name: "AddInfoJobHandler with incorrect job info, should return BadRequest",
jobHandlerMock.AssertCalled(t, "DeleteJobFromRESTCall", "job1")
}
+func TestSetLogLevel(t *testing.T) {
+ assertions := require.New(t)
+
+ type args struct {
+ logLevel string
+ }
+ tests := []struct {
+ name string
+ args args
+ wantedStatus int
+ wantedBody string
+ }{
+ {
+ name: "Set to valid log level, should return OK",
+ args: args{
+ logLevel: "Debug",
+ },
+ wantedStatus: http.StatusOK,
+ },
+ {
+ name: "Set to invalid log level, should return BadRequest",
+ args: args{
+ logLevel: "bad",
+ },
+ wantedStatus: http.StatusBadRequest,
+ wantedBody: "Invalid log level: bad",
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ callbackHandlerUnderTest := NewProducerCallbackHandler(nil)
+
+ handler := http.HandlerFunc(callbackHandlerUnderTest.setLogLevel)
+ responseRecorder := httptest.NewRecorder()
+ r, _ := http.NewRequest(http.MethodPut, "/admin/log?level="+tt.args.logLevel, nil)
+
+ handler.ServeHTTP(responseRecorder, r)
+
+ assertions.Equal(tt.wantedStatus, responseRecorder.Code, tt.name)
+ assertions.Contains(responseRecorder.Body.String(), tt.wantedBody, tt.name)
+ })
+ }
+}
+
func newRequest(method string, url string, jobInfo *jobs.JobInfo, t *testing.T) *http.Request {
var body io.Reader
if jobInfo != nil {
}
retryClient := restclient.CreateRetryClient(cert)
- jobsManager := jobs.NewJobsManagerImpl("configs/type_config.json", retryClient, configuration.DMaaPMRAddress, restclient.CreateClientWithoutRetry(cert, 5*time.Second))
+ jobsManager := jobs.NewJobsManagerImpl(retryClient, configuration.DMaaPMRAddress, restclient.CreateClientWithoutRetry(cert, 10*time.Second))
if err := registerTypesAndProducer(jobsManager, configuration.InfoCoordinatorAddress, callbackAddress, retryClient); err != nil {
log.Fatalf("Stopping producer due to: %v", err)
}
}
func registerTypesAndProducer(jobTypesHandler jobs.JobTypesManager, infoCoordinatorAddress string, callbackAddress string, client restclient.HTTPClient) error {
registrator := config.NewRegistratorImpl(infoCoordinatorAddress, client)
- if types, err := jobTypesHandler.LoadTypesFromConfiguration(); err == nil {
- if regErr := registrator.RegisterTypes(types); regErr != nil {
- return fmt.Errorf("unable to register all types due to: %v", regErr)
- }
- } else {
- return fmt.Errorf("unable to get types to register due to: %v", err)
+ configTypes, err := config.GetJobTypesFromConfiguration("configs/type_config.json")
+ if err != nil {
+ return fmt.Errorf("unable to register all types due to: %v", err)
}
+ regErr := registrator.RegisterTypes(jobTypesHandler.LoadTypesFromConfiguration(configTypes))
+ if regErr != nil {
+ return fmt.Errorf("unable to register all types due to: %v", regErr)
+ }
+
producer := config.ProducerRegistrationInfo{
InfoProducerSupervisionCallbackUrl: callbackAddress + server.StatusPath,
SupportedInfoTypes: jobTypesHandler.GetSupportedTypes(),
registerJob(*port)
- fmt.Print("Starting consumer on port: ", *port)
+ fmt.Println("Starting consumer on port: ", *port)
fmt.Println(http.ListenAndServe(fmt.Sprintf(":%v", *port), nil))
}
InfoTypeId: "STD_Fault_Messages",
JobDefinition: "{}",
}
- fmt.Print("Registering consumer: ", jobInfo)
+ fmt.Println("Registering consumer: ", jobInfo)
body, _ := json.Marshal(jobInfo)
putErr := restclient.Put(fmt.Sprintf("http://localhost:8083/data-consumer/v1/info-jobs/job%v", port), body, &httpClient)
if putErr != nil {
- fmt.Printf("Unable to register consumer: %v", putErr)
+ fmt.Println("Unable to register consumer: ", putErr)
}
}
var responseBody []byte
if critical {
responseBody = getFaultMessage("CRITICAL")
+ fmt.Println("Sending CRITICAL")
critical = false
} else {
responseBody = getFaultMessage("NORMAL")
+ fmt.Println("Sending NORMAL")
critical = true
}
- // w.Write(responseBody)
fmt.Fprint(w, string(responseBody))
}
{{- toYaml .Values.securityContext | nindent 12 }}
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
+ env:
+ - name: TOPIC_READ
+ value: http://dmaap-mr:3904/events/unauthenticated.SEC_FAULT_OUTPUT
+ - name: TOPIC_WRITE
+ value: http://dmaap-mr:3904/events/unauthenticated.SEC_FAULT_OUTPUT
+ - name: GENERIC_TOPICS_UPLOAD_BASEURL
+ value: http://dmaap-mr:3904
ports:
- name: http
containerPort: 3904