this.httpProxyConfig = httpProxyConfig;
}
- public Mono<ResponseEntity<String>> postForEntity(String uri, @Nullable String body) {
+ public Mono<ResponseEntity<String>> postForEntity(String uri, @Nullable String body,
+ @Nullable MediaType contentType) {
Object traceTag = createTraceTag();
logger.debug("{} POST uri = '{}{}''", traceTag, baseUrl, uri);
logger.trace("{} POST body: {}", traceTag, body);
RequestHeadersSpec<?> request = getWebClient() //
.post() //
.uri(uri) //
- .contentType(MediaType.APPLICATION_JSON) //
+ .contentType(contentType) //
.body(bodyProducer, String.class);
return retrieve(traceTag, request);
}
- public Mono<String> post(String uri, @Nullable String body) {
- return postForEntity(uri, body) //
+ public Mono<String> post(String uri, @Nullable String body, @Nullable MediaType contentType) {
+ return postForEntity(uri, body, contentType) //
.map(this::toBody);
}
- public Mono<String> postWithAuthHeader(String uri, String body, String username, String password) {
+ public Mono<String> postWithAuthHeader(String uri, String body, String username, String password,
+ MediaType mediaType) {
Object traceTag = createTraceTag();
logger.debug("{} POST (auth) uri = '{}{}''", traceTag, baseUrl, uri);
logger.trace("{} POST body: {}", traceTag, body);
.post() //
.uri(uri) //
.headers(headers -> headers.setBasicAuth(username, password)) //
- .contentType(MediaType.APPLICATION_JSON) //
+ .contentType(mediaType) //
.bodyValue(body);
return retrieve(traceTag, request) //
.map(this::toBody);
import org.oran.dmaapadapter.repository.Jobs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.http.MediaType;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
// Distibute the body to all jobs for this type
return Flux.fromIterable(this.jobs.getJobsForType(this.type)) //
.doOnNext(job -> logger.debug("Sending to consumer {}", job.getCallbackUrl())) //
- .flatMap(job -> job.getConsumerRestClient().post("", body), CONCURRENCY) //
+ .flatMap(job -> job.getConsumerRestClient().post("", body, MediaType.APPLICATION_JSON), CONCURRENCY) //
.onErrorResume(this::handleConsumerErrorResponse);
}
}
package org.oran.dmaapadapter.tasks;
+import lombok.Getter;
+
import org.oran.dmaapadapter.repository.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.Disposable;
* owner via REST calls.
*/
@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
-
public class KafkaJobDataConsumer {
private static final Logger logger = LoggerFactory.getLogger(KafkaJobDataConsumer.class);
- private final Many<String> input;
+ @Getter
private final Job job;
private Disposable subscription;
- private int errorCounter = 0;
+ private final ErrorStats errorStats = new ErrorStats();
+
+ private class ErrorStats {
+ private int consumerFaultCounter = 0;
+ private boolean kafkaError = false; // eg. overflow
+
+ public void handleOkFromConsumer() {
+ this.consumerFaultCounter = 0;
+ }
+
+ public void handleException(Throwable t) {
+ if (t instanceof WebClientResponseException) {
+ ++this.consumerFaultCounter;
+ } else {
+ kafkaError = true;
+ }
+ }
- KafkaJobDataConsumer(Many<String> input, Job job) {
- this.input = input;
+ public boolean isItHopeless() {
+ final int STOP_AFTER_ERRORS = 5;
+ return kafkaError || consumerFaultCounter > STOP_AFTER_ERRORS;
+ }
+
+ public void resetKafkaErrors() {
+ kafkaError = false;
+ }
+ }
+
+ public KafkaJobDataConsumer(Job job) {
this.job = job;
}
- public synchronized void start() {
+ public synchronized void start(Many<String> input) {
stop();
- this.subscription = getMessagesFromKafka(job) //
- .doOnNext(data -> logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), data))
- .flatMap(body -> job.getConsumerRestClient().post("", body), job.getParameters().getMaxConcurrency()) //
+ this.errorStats.resetKafkaErrors();
+ this.subscription = getMessagesFromKafka(input, job) //
+ .flatMap(this::postToClient, job.getParameters().getMaxConcurrency()) //
.onErrorResume(this::handleError) //
.subscribe(this::handleConsumerSentOk, //
- this::handleErrorInStream, //
- () -> logger.debug("KafkaMessageConsumer stopped, jobId: {}, type: {}", job.getId(),
- job.getType().getId()));
+ t -> stop(), //
+ () -> logger.warn("KafkaMessageConsumer stopped jobId: {}", job.getId()));
+ }
+
+ private Mono<String> postToClient(String body) {
+ logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), body);
+ MediaType contentType = this.job.isBuffered() ? MediaType.APPLICATION_JSON : null;
+ return job.getConsumerRestClient().post("", body, contentType);
}
public synchronized void stop() {
return this.subscription != null;
}
- private Flux<String> getMessagesFromKafka(Job job) {
+ private Flux<String> getMessagesFromKafka(Many<String> input, Job job) {
Flux<String> result = input.asFlux() //
.filter(job::isFilterMatch);
if (job.isBuffered()) {
- result = result.bufferTimeout( //
- job.getParameters().getBufferTimeout().getMaxSize(), //
- job.getParameters().getBufferTimeout().getMaxTime()) //
+ result = result.map(this::quote) //
+ .bufferTimeout( //
+ job.getParameters().getBufferTimeout().getMaxSize(), //
+ job.getParameters().getBufferTimeout().getMaxTime()) //
.map(Object::toString);
}
return result;
}
- private Mono<String> handleError(Throwable t) {
- logger.warn("exception: {} job: {}", t.getMessage(), job);
+ private String quote(String str) {
+ final String q = "\"";
+ return q + str.replace(q, "\\\"") + q;
+ }
- final int STOP_AFTER_ERRORS = 5;
- if (t instanceof WebClientResponseException) {
- if (++this.errorCounter > STOP_AFTER_ERRORS) {
- logger.error("Stopping job {}", job);
- return Mono.error(t);
- } else {
- return Mono.empty(); // Discard
- }
+ private Mono<String> handleError(Throwable t) {
+ logger.warn("exception: {} job: {}", t.getMessage(), job.getId());
+ this.errorStats.handleException(t);
+ if (this.errorStats.isItHopeless()) {
+ return Mono.error(t);
} else {
- // This can happen if there is an overflow.
- return Mono.empty();
+ return Mono.empty(); // Ignore
}
}
private void handleConsumerSentOk(String data) {
- this.errorCounter = 0;
- }
-
- private void handleErrorInStream(Throwable t) {
- logger.error("KafkaMessageConsumer jobId: {}, error: {}", job.getId(), t.getMessage());
- this.subscription = null;
+ this.errorStats.handleOkFromConsumer();
}
}
public class KafkaTopicConsumers {
private static final Logger logger = LoggerFactory.getLogger(KafkaTopicConsumers.class);
- private final Map<String, KafkaTopicListener> topicListeners = new HashMap<>();
+ private final Map<String, KafkaTopicListener> topicListeners = new HashMap<>(); // Key is typeId
+
@Getter
- private final Map<String, KafkaJobDataConsumer> activeSubscriptions = new HashMap<>();
+ private final Map<String, KafkaJobDataConsumer> consumers = new HashMap<>(); // Key is jobId
private static final int CONSUMER_SUPERVISION_INTERVAL_MS = 1000 * 60 * 3;
}
public synchronized void addJob(Job job) {
- if (this.activeSubscriptions.get(job.getId()) == null && job.getType().isKafkaTopicDefined()) {
+ if (this.consumers.get(job.getId()) == null && job.getType().isKafkaTopicDefined()) {
logger.debug("Kafka job added {}", job.getId());
KafkaTopicListener topicConsumer = topicListeners.get(job.getType().getId());
- KafkaJobDataConsumer subscription = new KafkaJobDataConsumer(topicConsumer.getOutput(), job);
- subscription.start();
- activeSubscriptions.put(job.getId(), subscription);
+ KafkaJobDataConsumer subscription = new KafkaJobDataConsumer(job);
+ subscription.start(topicConsumer.getOutput());
+ consumers.put(job.getId(), subscription);
}
}
public synchronized void removeJob(Job job) {
- KafkaJobDataConsumer d = activeSubscriptions.remove(job.getId());
+ KafkaJobDataConsumer d = consumers.remove(job.getId());
if (d != null) {
logger.debug("Kafka job removed {}", job.getId());
d.stop();
@Scheduled(fixedRate = CONSUMER_SUPERVISION_INTERVAL_MS)
public synchronized void restartNonRunningTasks() {
- for (KafkaJobDataConsumer consumer : activeSubscriptions.values()) {
+
+ for (KafkaJobDataConsumer consumer : consumers.values()) {
if (!consumer.isRunning()) {
- consumer.start();
+ restartTopic(consumer);
}
}
}
+ private void restartTopic(KafkaJobDataConsumer consumer) {
+ InfoType type = consumer.getJob().getType();
+ KafkaTopicListener topic = this.topicListeners.get(type.getId());
+ topic.start();
+ restartConsumersOfType(topic, type);
+ }
+
+ private void restartConsumersOfType(KafkaTopicListener topic, InfoType type) {
+ this.consumers.forEach((jobId, consumer) -> {
+ if (consumer.getJob().getType().getId().equals(type.getId())) {
+ consumer.start(topic.getOutput());
+ }
+ });
+ }
}
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.repository.InfoType;
private static final Logger logger = LoggerFactory.getLogger(KafkaTopicListener.class);
private final ApplicationConfig applicationConfig;
private final InfoType type;
- private final Many<String> output;
+ private Many<String> output;
+ private Disposable topicReceiverTask;
public KafkaTopicListener(ApplicationConfig applicationConfig, InfoType type) {
this.applicationConfig = applicationConfig;
-
- final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 1024 * 10;
- this.output = Sinks.many().multicast().onBackpressureBuffer(CONSUMER_BACKPRESSURE_BUFFER_SIZE);
-
this.type = type;
- startKafkaTopicReceiver();
+ start();
}
public Many<String> getOutput() {
return this.output;
}
- private Disposable startKafkaTopicReceiver() {
- return KafkaReceiver.create(kafkaInputProperties()) //
+ public void start() {
+ stop();
+ final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 1024 * 10;
+ this.output = Sinks.many().multicast().onBackpressureBuffer(CONSUMER_BACKPRESSURE_BUFFER_SIZE);
+ logger.debug("Listening to kafka topic: {} type :{}", this.type.getKafkaInputTopic(), type.getId());
+ topicReceiverTask = KafkaReceiver.create(kafkaInputProperties()) //
.receive() //
.doOnNext(this::onReceivedData) //
.subscribe(null, //
() -> logger.warn("KafkaTopicReceiver stopped"));
}
- private void onReceivedData(ConsumerRecord<Integer, String> input) {
+ private void stop() {
+ if (topicReceiverTask != null) {
+ topicReceiverTask.dispose();
+ topicReceiverTask = null;
+ }
+ }
+
+ private void onReceivedData(ConsumerRecord<String, String> input) {
logger.debug("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(), input.value());
output.emitNext(input.value(), Sinks.EmitFailureHandler.FAIL_FAST);
}
logger.error("KafkaTopicReceiver error: {}", t.getMessage());
}
- private ReceiverOptions<Integer, String> kafkaInputProperties() {
+ private ReceiverOptions<String, String> kafkaInputProperties() {
Map<String, Object> consumerProps = new HashMap<>();
if (this.applicationConfig.getKafkaBootStrapServers().isEmpty()) {
logger.error("No kafka boostrap server is setup");
}
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.applicationConfig.getKafkaBootStrapServers());
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "osc-dmaap-adaptor");
- consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
+ consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- return ReceiverOptions.<Integer, String>create(consumerProps)
+ return ReceiverOptions.<String, String>create(consumerProps)
.subscription(Collections.singleton(this.type.getKafkaInputTopic()));
}
}
private void handleRegistrationCompleted() {
- logger.debug("Registering types and producer completed");
isRegisteredInEcs = true;
}
private Mono<Boolean> isRegisterredInfoCorrect(String registerredInfoStr) {
ProducerRegistrationInfo registerredInfo = gson.fromJson(registerredInfoStr, ProducerRegistrationInfo.class);
if (isEqual(producerRegistrationInfo(), registerredInfo)) {
- logger.trace("Already registered");
+ logger.trace("Already registered in ECS");
return Mono.just(Boolean.TRUE);
} else {
return Mono.just(Boolean.FALSE);
ProducerJobInfo info = new ProducerJobInfo(null, "id", "typeId", "targetUri", "owner", "lastUpdated");
String body = gson.toJson(info);
- testErrorCode(restClient().post(jobUrl, body), HttpStatus.NOT_FOUND, "Could not find type");
+ testErrorCode(restClient().post(jobUrl, body, MediaType.APPLICATION_JSON), HttpStatus.NOT_FOUND,
+ "Could not find type");
}
@Test
new ProducerJobInfo(job.jobDefinition, jobId, job.infoTypeId, job.jobResultUri, job.owner, "TIMESTAMP");
String body = gson.toJson(request);
logger.info("ECS Simulator PUT job: {}", body);
- restClient.post(url, body).block();
+ restClient.post(url, body, MediaType.APPLICATION_JSON).block();
}
public void deleteJob(String jobId, AsyncRestClient restClient) {
var dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message_", i)); // Message_1, Message_2 etc.
sendDataToStream(dataToSend);
- verifiedReceivedByConsumer("Message_1", "[Message_1, Message_2, Message_3]");
+ verifiedReceivedByConsumer("Message_1", "[\"Message_1\", \"Message_2\", \"Message_3\"]");
+
+ // Just for testing quoting
+ this.consumerController.testResults.reset();
+ dataToSend = Flux.just(senderRecord("Message\"_", 1));
+ sendDataToStream(dataToSend);
+ verifiedReceivedByConsumer("[\"Message\\\"_1\"]");
// Delete the jobs
this.ecsSimulatorController.deleteJob(JOB_ID1, restClient());
this.ecsSimulatorController.deleteJob(JOB_ID2, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
- await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getActiveSubscriptions()).isEmpty());
+ await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers()).isEmpty());
}
@Test
void kafkaIOverflow() throws InterruptedException {
- // This does not work. After an overflow, the kafka stream does not seem to work
- //
final String JOB_ID1 = "ID1";
final String JOB_ID2 = "ID2";
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
var dataToSend = Flux.range(1, 1000000).map(i -> senderRecord("Message_", i)); // Message_1, Message_2 etc.
- sendDataToStream(dataToSend); // this will overflow
+ sendDataToStream(dataToSend); // this should overflow
- KafkaJobDataConsumer consumer = kafkaTopicConsumers.getActiveSubscriptions().values().iterator().next();
+ KafkaJobDataConsumer consumer = kafkaTopicConsumers.getConsumers().values().iterator().next();
await().untilAsserted(() -> assertThat(consumer.isRunning()).isFalse());
this.consumerController.testResults.reset();
kafkaTopicConsumers.restartNonRunningTasks();
+ this.ecsSimulatorController.deleteJob(JOB_ID2, restClient()); // Delete one job
+ Thread.sleep(1000); // Restarting the input seems to take some asynch time
- dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message__", i)); // Message_1
+ dataToSend = Flux.range(1, 1).map(i -> senderRecord("Howdy_", i));
sendDataToStream(dataToSend);
- verifiedReceivedByConsumer("Message__1", "Message__1");
+ verifiedReceivedByConsumer("Howdy_1");
}
}
##
## Build
##
-FROM golang:1.17-bullseye AS build
+FROM nexus3.o-ran-sc.org:10001/golang:1.17-bullseye AS build
WORKDIR /app
COPY go.mod .
COPY go.sum .
At start up the producer will register the configured job types in ICS and also register itself as a producer supporting these types. If ICS is unavailable, the producer will retry to connect indefinetely. The same goes for MR.
-Once the initial registration is done, the producer will constantly poll MR for all configured job types. When receiving messages for a type, it will distribute these messages to all jobs registered for the type. If no jobs for that type are registered, the messages will be discarded. If a consumer is unavailable for distribution, the messages will be discarded for that consumer.
+Once the initial registration is done, the producer will constantly poll MR for all configured job types. When receiving messages for a type, it will distribute these messages to all jobs registered for the type. If no jobs for that type are registered, the messages will be discarded. If a consumer is unavailable for distribution, the messages will be discarded for that consumer until it is available again.
## Development
type TypeData struct {
TypeId string `json:"id"`
DMaaPTopicURL string `json:"dmaapTopicUrl"`
- jobHandler *jobHandler
+ jobsHandler *jobsHandler
}
type JobInfo struct {
}
type JobsManager interface {
- AddJob(JobInfo) error
- DeleteJob(jobId string)
+ AddJobFromRESTCall(JobInfo) error
+ DeleteJobFromRESTCall(jobId string)
}
type JobsManagerImpl struct {
distributeClient restclient.HTTPClient
}
-type jobHandler struct {
- mu sync.Mutex
- typeId string
- topicUrl string
- jobs map[string]JobInfo
- addJobCh chan JobInfo
- deleteJobCh chan string
- pollClient restclient.HTTPClient
- distributeClient restclient.HTTPClient
-}
-
func NewJobsManagerImpl(typeConfigFilePath string, pollClient restclient.HTTPClient, mrAddr string, distributeClient restclient.HTTPClient) *JobsManagerImpl {
return &JobsManagerImpl{
configFile: typeConfigFilePath,
}
}
-func (jm *JobsManagerImpl) AddJob(ji JobInfo) error {
+func (jm *JobsManagerImpl) AddJobFromRESTCall(ji JobInfo) error {
if err := jm.validateJobInfo(ji); err == nil {
typeData := jm.allTypes[ji.InfoTypeIdentity]
- typeData.jobHandler.addJobCh <- ji
+ typeData.jobsHandler.addJobCh <- ji
log.Debug("Added job: ", ji)
return nil
} else {
}
}
-func (jm *JobsManagerImpl) DeleteJob(jobId string) {
+func (jm *JobsManagerImpl) DeleteJobFromRESTCall(jobId string) {
for _, typeData := range jm.allTypes {
log.Debugf("Deleting job %v from type %v", jobId, typeData.TypeId)
- typeData.jobHandler.deleteJobCh <- jobId
+ typeData.jobsHandler.deleteJobCh <- jobId
}
log.Debug("Deleted job: ", jobId)
}
return nil, err
}
for _, typeDef := range typeDefs.Types {
- addCh := make(chan JobInfo)
- deleteCh := make(chan string)
- jh := jobHandler{
- typeId: typeDef.Id,
- topicUrl: typeDef.DmaapTopicURL,
- jobs: make(map[string]JobInfo),
- addJobCh: addCh,
- deleteJobCh: deleteCh,
- pollClient: jm.pollClient,
- distributeClient: jm.distributeClient,
- }
jm.allTypes[typeDef.Id] = TypeData{
TypeId: typeDef.Id,
DMaaPTopicURL: typeDef.DmaapTopicURL,
- jobHandler: &jh,
+ jobsHandler: newJobsHandler(typeDef.Id, typeDef.DmaapTopicURL, jm.pollClient, jm.distributeClient),
}
}
return typeDefs.Types, nil
return supportedTypes
}
-func (jm *JobsManagerImpl) StartJobs() {
+func (jm *JobsManagerImpl) StartJobsForAllTypes() {
for _, jobType := range jm.allTypes {
- go jobType.jobHandler.start(jm.mrAddress)
+ go jobType.jobsHandler.startPollingAndDistribution(jm.mrAddress)
+
+ }
+}
+
+type jobsHandler struct {
+ mu sync.Mutex
+ typeId string
+ topicUrl string
+ jobs map[string]job
+ addJobCh chan JobInfo
+ deleteJobCh chan string
+ pollClient restclient.HTTPClient
+ distributeClient restclient.HTTPClient
+}
+func newJobsHandler(typeId string, topicURL string, pollClient restclient.HTTPClient, distributeClient restclient.HTTPClient) *jobsHandler {
+ return &jobsHandler{
+ typeId: typeId,
+ topicUrl: topicURL,
+ jobs: make(map[string]job),
+ addJobCh: make(chan JobInfo),
+ deleteJobCh: make(chan string),
+ pollClient: pollClient,
+ distributeClient: distributeClient,
}
}
-func (jh *jobHandler) start(mRAddress string) {
+func (jh *jobsHandler) startPollingAndDistribution(mRAddress string) {
go func() {
for {
jh.pollAndDistributeMessages(mRAddress)
}()
}
-func (jh *jobHandler) pollAndDistributeMessages(mRAddress string) {
+func (jh *jobsHandler) pollAndDistributeMessages(mRAddress string) {
log.Debugf("Processing jobs for type: %v", jh.typeId)
messagesBody, error := restclient.Get(mRAddress+jh.topicUrl, jh.pollClient)
if error != nil {
- log.Warnf("Error getting data from MR. Cause: %v", error)
+ log.Warn("Error getting data from MR. Cause: ", error)
}
- log.Debugf("Received messages: %v", string(messagesBody))
+ log.Debug("Received messages: ", string(messagesBody))
jh.distributeMessages(messagesBody)
}
-func (jh *jobHandler) distributeMessages(messages []byte) {
+func (jh *jobsHandler) distributeMessages(messages []byte) {
if len(messages) > 2 {
jh.mu.Lock()
defer jh.mu.Unlock()
- for _, jobInfo := range jh.jobs {
- go jh.sendMessagesToConsumer(messages, jobInfo)
+ for _, job := range jh.jobs {
+ if len(job.messagesChannel) < cap(job.messagesChannel) {
+ job.messagesChannel <- messages
+ } else {
+ jh.emptyMessagesBuffer(job)
+ }
}
}
}
-func (jh *jobHandler) sendMessagesToConsumer(messages []byte, jobInfo JobInfo) {
- log.Debugf("Processing job: %v", jobInfo.InfoJobIdentity)
- if postErr := restclient.Post(jobInfo.TargetUri, messages, jh.distributeClient); postErr != nil {
- log.Warnf("Error posting data for job: %v. Cause: %v", jobInfo, postErr)
+func (jh *jobsHandler) emptyMessagesBuffer(job job) {
+ log.Debug("Emptying message queue for job: ", job.jobInfo.InfoJobIdentity)
+out:
+ for {
+ select {
+ case <-job.messagesChannel:
+ default:
+ break out
+ }
}
- log.Debugf("Messages distributed to consumer: %v.", jobInfo.Owner)
}
-func (jh *jobHandler) monitorManagementChannels() {
+func (jh *jobsHandler) monitorManagementChannels() {
select {
case addedJob := <-jh.addJobCh:
- jh.mu.Lock()
- log.Debugf("received %v from addJobCh\n", addedJob)
- jh.jobs[addedJob.InfoJobIdentity] = addedJob
- jh.mu.Unlock()
+ jh.addJob(addedJob)
case deletedJob := <-jh.deleteJobCh:
- jh.mu.Lock()
- log.Debugf("received %v from deleteJobCh\n", deletedJob)
+ jh.deleteJob(deletedJob)
+ }
+}
+
+func (jh *jobsHandler) addJob(addedJob JobInfo) {
+ jh.mu.Lock()
+ log.Debug("Add job: ", addedJob)
+ newJob := newJob(addedJob, jh.distributeClient)
+ go newJob.start()
+ jh.jobs[addedJob.InfoJobIdentity] = newJob
+ jh.mu.Unlock()
+}
+
+func (jh *jobsHandler) deleteJob(deletedJob string) {
+ jh.mu.Lock()
+ log.Debug("Delete job: ", deletedJob)
+ j, exist := jh.jobs[deletedJob]
+ if exist {
+ j.controlChannel <- struct{}{}
delete(jh.jobs, deletedJob)
- jh.mu.Unlock()
}
+ jh.mu.Unlock()
+}
+
+type job struct {
+ jobInfo JobInfo
+ client restclient.HTTPClient
+ messagesChannel chan []byte
+ controlChannel chan struct{}
+}
+
+func newJob(j JobInfo, c restclient.HTTPClient) job {
+ return job{
+ jobInfo: j,
+ client: c,
+ messagesChannel: make(chan []byte, 10),
+ controlChannel: make(chan struct{}),
+ }
+}
+
+func (j *job) start() {
+out:
+ for {
+ select {
+ case <-j.controlChannel:
+ log.Debug("Stop distribution for job: ", j.jobInfo.InfoJobIdentity)
+ break out
+ case msg := <-j.messagesChannel:
+ j.sendMessagesToConsumer(msg)
+ }
+ }
+}
+
+func (j *job) sendMessagesToConsumer(messages []byte) {
+ log.Debug("Processing job: ", j.jobInfo.InfoJobIdentity)
+ if postErr := restclient.Post(j.jobInfo.TargetUri, messages, j.client); postErr != nil {
+ log.Warnf("Error posting data for job: %v. Cause: %v", j.jobInfo, postErr)
+ }
+ log.Debugf("Messages for job: %v distributed to consumer: %v", j.jobInfo.InfoJobIdentity, j.jobInfo.Owner)
}
const typeDefinition = `{"types": [{"id": "type1", "dmaapTopicUrl": "events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/type1"}]}`
-func TestGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) {
+func TestJobsManagerGetTypes_filesOkShouldReturnSliceOfTypesAndProvideSupportedTypes(t *testing.T) {
assertions := require.New(t)
typesDir, err := os.MkdirTemp("", "configs")
if err != nil {
assertions.EqualValues([]string{"type1"}, supportedTypes)
}
-func TestManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T) {
+func TestJobsManagerAddJobWhenTypeIsSupported_shouldAddJobToChannel(t *testing.T) {
assertions := require.New(t)
managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
wantedJob := JobInfo{
InfoJobData: "{}",
InfoTypeIdentity: "type1",
}
- jobHandler := jobHandler{
+ jobsHandler := jobsHandler{
addJobCh: make(chan JobInfo)}
managerUnderTest.allTypes["type1"] = TypeData{
- TypeId: "type1",
- jobHandler: &jobHandler,
+ TypeId: "type1",
+ jobsHandler: &jobsHandler,
}
var err error
go func() {
- err = managerUnderTest.AddJob(wantedJob)
+ err = managerUnderTest.AddJobFromRESTCall(wantedJob)
}()
assertions.Nil(err)
- addedJob := <-jobHandler.addJobCh
+ addedJob := <-jobsHandler.addJobCh
assertions.Equal(wantedJob, addedJob)
}
-func TestManagerAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) {
+func TestJobsManagerAddJobWhenTypeIsNotSupported_shouldReturnError(t *testing.T) {
assertions := require.New(t)
managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
jobInfo := JobInfo{
InfoTypeIdentity: "type1",
}
- err := managerUnderTest.AddJob(jobInfo)
+ err := managerUnderTest.AddJobFromRESTCall(jobInfo)
assertions.NotNil(err)
assertions.Equal("type not supported: type1", err.Error())
}
-func TestManagerAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) {
+func TestJobsManagerAddJobWhenJobIdMissing_shouldReturnError(t *testing.T) {
assertions := require.New(t)
managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
managerUnderTest.allTypes["type1"] = TypeData{
jobInfo := JobInfo{
InfoTypeIdentity: "type1",
}
- err := managerUnderTest.AddJob(jobInfo)
+ err := managerUnderTest.AddJobFromRESTCall(jobInfo)
assertions.NotNil(err)
assertions.Equal("missing required job identity: { <nil> type1}", err.Error())
}
-func TestManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
+func TestJobsManagerAddJobWhenTargetUriMissing_shouldReturnError(t *testing.T) {
assertions := require.New(t)
managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
managerUnderTest.allTypes["type1"] = TypeData{
InfoTypeIdentity: "type1",
InfoJobIdentity: "job1",
}
- err := managerUnderTest.AddJob(jobInfo)
+ err := managerUnderTest.AddJobFromRESTCall(jobInfo)
assertions.NotNil(err)
assertions.Equal("missing required target URI: { job1 <nil> type1}", err.Error())
}
-func TestManagerDeleteJob(t *testing.T) {
+func TestJobsManagerDeleteJob_shouldSendDeleteToChannel(t *testing.T) {
assertions := require.New(t)
managerUnderTest := NewJobsManagerImpl("", nil, "", nil)
- jobHandler := jobHandler{
+ jobsHandler := jobsHandler{
deleteJobCh: make(chan string)}
managerUnderTest.allTypes["type1"] = TypeData{
- TypeId: "type1",
- jobHandler: &jobHandler,
+ TypeId: "type1",
+ jobsHandler: &jobsHandler,
}
- go managerUnderTest.DeleteJob("job2")
+ go managerUnderTest.DeleteJobFromRESTCall("job2")
- assertions.Equal("job2", <-jobHandler.deleteJobCh)
+ assertions.Equal("job2", <-jobsHandler.deleteJobCh)
}
-func TestHandlerPollAndDistributeMessages(t *testing.T) {
+func TestAddJobToJobsManager_shouldStartPollAndDistributeMessages(t *testing.T) {
assertions := require.New(t)
- wg := sync.WaitGroup{}
+ called := false
messages := `[{"message": {"data": "data"}}]`
pollClientMock := NewTestClient(func(req *http.Request) *http.Response {
if req.URL.String() == "http://mrAddr/topicUrl" {
assertions.Equal(req.Method, "GET")
- wg.Done() // Signal that the poll call has been made
+ body := "[]"
+ if !called {
+ called = true
+ body = messages
+ }
return &http.Response{
StatusCode: 200,
- Body: ioutil.NopCloser(bytes.NewReader([]byte(messages))),
+ Body: ioutil.NopCloser(bytes.NewReader([]byte(body))),
Header: make(http.Header), // Must be set to non-nil value or it panics
}
}
t.Fail()
return nil
})
+
+ wg := sync.WaitGroup{}
distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
if req.URL.String() == "http://consumerHost/target" {
assertions.Equal(req.Method, "POST")
- assertions.Equal(messages, getBodyAsString(req))
+ assertions.Equal(messages, getBodyAsString(req, t))
assertions.Equal("application/json", req.Header.Get("Content-Type"))
- wg.Done() // Signal that the distribution call has been made
+ wg.Done()
return &http.Response{
StatusCode: 200,
Body: ioutil.NopCloser(bytes.NewBufferString(`OK`)),
t.Fail()
return nil
})
+ jobsHandler := newJobsHandler("type1", "/topicUrl", pollClientMock, distributeClientMock)
+
+ jobsManager := NewJobsManagerImpl("", pollClientMock, "http://mrAddr", distributeClientMock)
+ jobsManager.allTypes["type1"] = TypeData{
+ DMaaPTopicURL: "/topicUrl",
+ TypeId: "type1",
+ jobsHandler: jobsHandler,
+ }
+
+ jobsManager.StartJobsForAllTypes()
jobInfo := JobInfo{
InfoTypeIdentity: "type1",
InfoJobIdentity: "job1",
TargetUri: "http://consumerHost/target",
}
- handlerUnderTest := jobHandler{
- topicUrl: "/topicUrl",
- jobs: map[string]JobInfo{jobInfo.InfoJobIdentity: jobInfo},
- pollClient: pollClientMock,
- distributeClient: distributeClientMock,
- }
- wg.Add(2) // Two calls should be made to the server, one to poll and one to distribute
- handlerUnderTest.pollAndDistributeMessages("http://mrAddr")
+ wg.Add(1) // Wait till the distribution has happened
+ err := jobsManager.AddJobFromRESTCall(jobInfo)
+ assertions.Nil(err)
- if waitTimeout(&wg, 100*time.Millisecond) {
+ if waitTimeout(&wg, 2*time.Second) {
t.Error("Not all calls to server were made")
t.Fail()
}
}
-func TestHandlerAddJob_shouldAddJobToJobsMap(t *testing.T) {
- assertions := require.New(t)
+func TestJobsHandlerDeleteJob_shouldDeleteJobFromJobsMap(t *testing.T) {
+ jobToDelete := newJob(JobInfo{}, nil)
+ go jobToDelete.start()
+ jobsHandler := newJobsHandler("type1", "/topicUrl", nil, nil)
+ jobsHandler.jobs["job1"] = jobToDelete
- jobInfo := JobInfo{
- InfoTypeIdentity: "type1",
- InfoJobIdentity: "job1",
- TargetUri: "http://consumerHost/target",
- }
+ go jobsHandler.monitorManagementChannels()
- addCh := make(chan JobInfo)
- handlerUnderTest := jobHandler{
- mu: sync.Mutex{},
- jobs: map[string]JobInfo{},
- addJobCh: addCh,
- }
+ jobsHandler.deleteJobCh <- "job1"
- go func() {
- addCh <- jobInfo
- }()
-
- handlerUnderTest.monitorManagementChannels()
-
- assertions.Len(handlerUnderTest.jobs, 1)
- assertions.Equal(jobInfo, handlerUnderTest.jobs["job1"])
+ deleted := false
+ for i := 0; i < 100; i++ {
+ if len(jobsHandler.jobs) == 0 {
+ deleted = true
+ break
+ }
+ time.Sleep(time.Microsecond) // Need to drop control to let the job's goroutine do the job
+ }
+ require.New(t).True(deleted, "Job not deleted")
}
-func TestHandlerDeleteJob_shouldDeleteJobFromJobsMap(t *testing.T) {
- assertions := require.New(t)
+func TestJobsHandlerEmptyJobMessageBufferWhenItIsFull(t *testing.T) {
+ job := newJob(JobInfo{
+ InfoJobIdentity: "job",
+ }, nil)
- deleteCh := make(chan string)
- handlerUnderTest := jobHandler{
- mu: sync.Mutex{},
- jobs: map[string]JobInfo{"job1": {
- InfoJobIdentity: "job1",
- }},
- deleteJobCh: deleteCh,
- }
+ jobsHandler := newJobsHandler("type1", "/topicUrl", nil, nil)
+ jobsHandler.jobs["job1"] = job
- go func() {
- deleteCh <- "job1"
- }()
+ fillMessagesBuffer(job.messagesChannel)
- handlerUnderTest.monitorManagementChannels()
+ jobsHandler.distributeMessages([]byte("sent msg"))
- assertions.Len(handlerUnderTest.jobs, 0)
+ require.New(t).Len(job.messagesChannel, 0)
+}
+
+func fillMessagesBuffer(mc chan []byte) {
+ for i := 0; i < cap(mc); i++ {
+ mc <- []byte("msg")
+ }
}
type RoundTripFunc func(req *http.Request) *http.Response
}
}
-func getBodyAsString(req *http.Request) string {
+func getBodyAsString(req *http.Request, t *testing.T) string {
buf := new(bytes.Buffer)
- buf.ReadFrom(req.Body)
+ if _, err := buf.ReadFrom(req.Body); err != nil {
+ t.Fail()
+ }
return buf.String()
}
http.Error(w, fmt.Sprintf("Invalid json body. Cause: %v", unmarshalErr), http.StatusBadRequest)
return
}
- if err := h.jobsManager.AddJob(jobInfo); err != nil {
+ if err := h.jobsManager.AddJobFromRESTCall(jobInfo); err != nil {
http.Error(w, fmt.Sprintf("Invalid job info. Cause: %v", err), http.StatusBadRequest)
}
}
return
}
- h.jobsManager.DeleteJob(id)
+ h.jobsManager.DeleteJobFromRESTCall(id)
}
type notFoundHandler struct{}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
jobHandlerMock := jobhandler.JobHandler{}
- jobHandlerMock.On("AddJob", tt.args.job).Return(tt.args.mockReturn)
+ jobHandlerMock.On("AddJobFromRESTCall", tt.args.job).Return(tt.args.mockReturn)
callbackHandlerUnderTest := NewProducerCallbackHandler(&jobHandlerMock)
assertions.Equal(tt.wantedStatus, responseRecorder.Code, tt.name)
assertions.Contains(responseRecorder.Body.String(), tt.wantedBody, tt.name)
- jobHandlerMock.AssertCalled(t, "AddJob", tt.args.job)
+ jobHandlerMock.AssertCalled(t, "AddJobFromRESTCall", tt.args.job)
})
}
}
func TestDeleteJob(t *testing.T) {
assertions := require.New(t)
jobHandlerMock := jobhandler.JobHandler{}
- jobHandlerMock.On("DeleteJob", mock.Anything).Return(nil)
+ jobHandlerMock.On("DeleteJobFromRESTCall", mock.Anything).Return(nil)
callbackHandlerUnderTest := NewProducerCallbackHandler(&jobHandlerMock)
assertions.Equal("", responseRecorder.Body.String())
- jobHandlerMock.AssertCalled(t, "DeleteJob", "job1")
+ jobHandlerMock.AssertCalled(t, "DeleteJobFromRESTCall", "job1")
}
func newRequest(method string, url string, jobInfo *jobs.JobInfo, t *testing.T) *http.Request {
if err := registerTypesAndProducer(jobsManager, configuration.InfoCoordinatorAddress, callbackAddress, retryClient); err != nil {
log.Fatalf("Stopping producer due to: %v", err)
}
- jobsManager.StartJobs()
+ jobsManager.StartJobsForAllTypes()
log.Debug("Starting DMaaP Mediator Producer")
go func() {
}
// AddJob provides a mock function with given fields: _a0
-func (_m *JobHandler) AddJob(_a0 jobs.JobInfo) error {
+func (_m *JobHandler) AddJobFromRESTCall(_a0 jobs.JobInfo) error {
ret := _m.Called(_a0)
var r0 error
}
// DeleteJob provides a mock function with given fields: jobId
-func (_m *JobHandler) DeleteJob(jobId string) {
+func (_m *JobHandler) DeleteJobFromRESTCall(jobId string) {
_m.Called(jobId)
}
registerJob(*port)
fmt.Print("Starting consumer on port: ", *port)
- http.ListenAndServe(fmt.Sprintf(":%v", *port), nil)
+ fmt.Println(http.ListenAndServe(fmt.Sprintf(":%v", *port), nil))
}
func registerJob(port int) {
http.HandleFunc("/events/unauthenticated.SEC_FAULT_OUTPUT/dmaapmediatorproducer/STD_Fault_Messages", handleData)
fmt.Print("Starting mr on port: ", *port)
- http.ListenAndServeTLS(fmt.Sprintf(":%v", *port), "../../security/producer.crt", "../../security/producer.key", nil)
+ fmt.Println(http.ListenAndServeTLS(fmt.Sprintf(":%v", *port), "../../security/producer.crt", "../../security/producer.key", nil))
}
-Subproject commit 3b916e4dc5777863cb4ee873b41ee460fb9aec27
+Subproject commit 558d6d2de33bb8cf4b16df980a0cdf3b1747a8e2
.history
oruclosedloop
-simulator
+producer
+sdnr
This consumer creates a job of type `STD_Fault_Messages` in the Information Coordinator Service (ICS). When it recieves messages, it checks if they are link failure messages. If they are, it checks if the event severity is other than normal. If so, it looks up the O-DU ID mapped to the O-RU the message originates from and sends a configuration message to the O-DU through SDNC. If the event severity is normal, then it logs, on `Debug` level, that the link failure has been cleared.
-The producer takes a number of environment variables, described below, as configuration.
+## Configuration
+
+The consumer takes a number of environment variables, described below, as configuration.
>- CONSUMER_HOST **Required**. The host for the consumer. Example: `http://mrproducer`
->- CONSUMER_HOST **Required**. The port for the consumer. Example: `8095`
->- LOG_LEVEL Optional. The log level, which can be `Error`, `Warn`, `Info` or `Debug`. Defaults to `Info`.
+>- CONSUMER_PORT **Required**. The port for the consumer. Example: `8095`
+>- CONSUMER_CERT_PATH **Required**. The path to the certificate to use for https. Defaults to `security/producer.crt`
+>- CONSUMER_KEY_PATH **Required**. The path to the key to the certificate to use for https. Defaults to `security/producer.key`
>- INFO_COORD_ADDR Optional. The address of the Information Coordinator. Defaults to `http://enrichmentservice:8083`.
->- SDNR_HOST Optional. The host for SDNR. Defaults to `http://localhost`.
->- SDNR_PORT Optional. The port for SDNR. Defaults to `3904`.
+>- SDNR_ADDRESS Optional. The address for SDNR. Defaults to `http://localhost:3904`.
>- SDNR_USER Optional. The user for the SDNR. Defaults to `admin`.
>- SDNR_PASSWORD Optional. The password for the SDNR user. Defaults to `Kp8bJ4SXszM0WXlhak3eHlcse2gAw84vaoGGmJvUy2U`.
>- ORU_TO_ODU_MAP_FILE Optional. The file containing the mapping from O-RU ID to O-DU ID. Defaults to `o-ru-to-o-du-map.csv`.
+>- LOG_LEVEL Optional. The log level, which can be `Error`, `Warn`, `Info` or `Debug`. Defaults to `Info`.
+
+Any of the addresses used by this product can be configured to use https, by specifying it as the scheme of the address URI. The client will not use server certificate verification. The consumer's own callback will only listen to the scheme configured in the scheme of the consumer host address.
+
+The configured public key and cerificate shall be PEM-encoded. A self signed certificate and key are provided in the `security` folder of the project. These files should be replaced for production. To generate a self signed key and certificate, use the example code below:
+
+ openssl req -new -x509 -sha256 -key server.key -out server.crt -days 3650
-The creation of the job is not done when the consumer is started. Instead the consumer provides a REST API where it can be started and stopped, described below.
+T## Functionality
+
+he creation of the job is not done when the consumer is started. Instead the consumer provides a REST API where it can be started and stopped, described below.
>- /start Creates the job in ICS.
>- /stop Deletes the job in ICS.
If the consumer is shut down with a SIGTERM, it will also delete the job before exiting.
+## Development
+
+To make it easy to test during development of the consumer, two stubs are provided in the `stub` folder.
+
+One, under the `producer` folder, called `producer` that stubs the producer and pushes an array with one message with `eventSeverity` alternating between `NORMAL` and `CRITICAL`. To build and start the stub, do the following:
+>1. cd stub/producer
+>2. go build
+>3. ./producer
+
+One, under the `sdnr` folder, called `sdnr` that at startup will listen for REST calls and print the body of them. By default, it listens to the port `3904`, but his can be overridden by passing a `-port [PORT]` flag when starting the stub. To build and start the stub, do the following:
+>1. cd stub/sdnr
+>2. go build
+>3. ./sdnr
+
+Mocks needed for unit tests have been generated using `github.com/stretchr/testify/mock` and are checked in under the `mocks` folder. **Note!** Keep in mind that if any of the mocked interfaces change, a new mock for that interface must be generated and checked in.
+
## License
Copyright (C) 2021 Nordix Foundation.
require (
github.com/davecgh/go-spew v1.1.1 // indirect
- github.com/google/uuid v1.3.0 // indirect
- github.com/gorilla/mux v1.8.0 // indirect
+ github.com/gorilla/mux v1.8.0
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/objx v0.1.1 // indirect
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
)
+
+require (
+ github.com/hashicorp/go-cleanhttp v0.5.1 // indirect
+ github.com/hashicorp/go-retryablehttp v0.7.0 // indirect
+)
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
+github.com/hashicorp/go-cleanhttp v0.5.1 h1:dH3aiDG9Jvb5r5+bYHsikaOUIpcM0xvgMXVoDkXMzJM=
+github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
+github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ=
+github.com/hashicorp/go-retryablehttp v0.7.0 h1:eu1EI/mbirUgP5C8hVsTNaGZreBDlYiwC1FZWkvQPQ4=
+github.com/hashicorp/go-retryablehttp v0.7.0/go.mod h1:vAew36LZh98gCBJNLH42IQ1ER/9wtLZZ8meHqQvEYWY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.8.1 h1:dJKuHgqk1NNQlqoA6BTlM1Wf9DOH3NBjQyu0h9+AZZE=
package config
import (
+ "fmt"
"os"
"strconv"
)
type Config struct {
- LogLevel log.Level
ConsumerHost string
ConsumerPort int
InfoCoordinatorAddress string
- SDNRHost string
- SDNRPort int
+ SDNRAddress string
SDNRUser string
SDNPassword string
ORUToODUMapFile string
+ ConsumerCertPath string
+ ConsumerKeyPath string
+ LogLevel log.Level
}
func New() *Config {
return &Config{
- LogLevel: getLogLevel(),
ConsumerHost: getEnv("CONSUMER_HOST", ""),
ConsumerPort: getEnvAsInt("CONSUMER_PORT", 0),
InfoCoordinatorAddress: getEnv("INFO_COORD_ADDR", "http://enrichmentservice:8083"),
- SDNRHost: getEnv("SDNR_HOST", "http://localhost"),
- SDNRPort: getEnvAsInt("SDNR_PORT", 3904),
+ SDNRAddress: getEnv("SDNR_ADDR", "http://localhost:3904"),
SDNRUser: getEnv("SDNR_USER", "admin"),
SDNPassword: getEnv("SDNR_PASSWORD", "Kp8bJ4SXszM0WXlhak3eHlcse2gAw84vaoGGmJvUy2U"),
ORUToODUMapFile: getEnv("ORU_TO_ODU_MAP_FILE", "o-ru-to-o-du-map.csv"),
+ ConsumerCertPath: getEnv("CONSUMER_CERT_PATH", "security/consumer.crt"),
+ ConsumerKeyPath: getEnv("CONSUMER_KEY_PATH", "security/consumer.key"),
+ LogLevel: getLogLevel(),
}
}
+func (c Config) String() string {
+ return fmt.Sprintf("ConsumerHost: %v, ConsumerPort: %v, InfoCoordinatorAddress: %v, SDNRAddress: %v, SDNRUser: %v, SDNRPassword: %v, ORUToODUMapFile: %v, ConsumerCertPath: %v, ConsumerKeyPath: %v, LogLevel: %v", c.ConsumerHost, c.ConsumerPort, c.InfoCoordinatorAddress, c.SDNRAddress, c.SDNRUser, c.SDNPassword, c.ORUToODUMapFile, c.ConsumerCertPath, c.ConsumerKeyPath, c.LogLevel)
+}
+
func getEnv(key string, defaultVal string) string {
if value, exists := os.LookupEnv(key); exists {
return value
func TestNew_envVarsSetConfigContainSetValues(t *testing.T) {
assertions := require.New(t)
- os.Setenv("LOG_LEVEL", "Debug")
os.Setenv("CONSUMER_HOST", "consumerHost")
os.Setenv("CONSUMER_PORT", "8095")
os.Setenv("INFO_COORD_ADDR", "infoCoordAddr")
- os.Setenv("SDNR_HOST", "sdnrHost")
- os.Setenv("SDNR_PORT", "3908")
+ os.Setenv("SDNR_ADDR", "sdnrHost:3908")
os.Setenv("SDNR_USER", "admin")
os.Setenv("SDNR_PASSWORD", "pwd")
os.Setenv("ORU_TO_ODU_MAP_FILE", "file")
+ os.Setenv("CONSUMER_CERT_PATH", "cert")
+ os.Setenv("CONSUMER_KEY_PATH", "key")
+ os.Setenv("LOG_LEVEL", "Debug")
t.Cleanup(func() {
os.Clearenv()
})
wantConfig := Config{
- LogLevel: log.DebugLevel,
ConsumerHost: "consumerHost",
ConsumerPort: 8095,
InfoCoordinatorAddress: "infoCoordAddr",
- SDNRHost: "sdnrHost",
- SDNRPort: 3908,
+ SDNRAddress: "sdnrHost:3908",
SDNRUser: "admin",
SDNPassword: "pwd",
ORUToODUMapFile: "file",
+ ConsumerCertPath: "cert",
+ ConsumerKeyPath: "key",
+ LogLevel: log.DebugLevel,
}
got := New()
os.Clearenv()
})
wantConfig := Config{
- LogLevel: log.InfoLevel,
ConsumerHost: "",
ConsumerPort: 0,
InfoCoordinatorAddress: "http://enrichmentservice:8083",
- SDNRHost: "http://localhost",
- SDNRPort: 3904,
+ SDNRAddress: "http://localhost:3904",
SDNRUser: "admin",
SDNPassword: "Kp8bJ4SXszM0WXlhak3eHlcse2gAw84vaoGGmJvUy2U",
ORUToODUMapFile: "o-ru-to-o-du-map.csv",
+ ConsumerCertPath: "security/consumer.crt",
+ ConsumerKeyPath: "security/consumer.key",
+ LogLevel: log.InfoLevel,
}
got := New()
os.Clearenv()
})
wantConfig := Config{
- LogLevel: log.InfoLevel,
ConsumerHost: "",
ConsumerPort: 0,
InfoCoordinatorAddress: "http://enrichmentservice:8083",
- SDNRHost: "http://localhost",
- SDNRPort: 3904,
+ SDNRAddress: "http://localhost:3904",
SDNRUser: "admin",
SDNPassword: "Kp8bJ4SXszM0WXlhak3eHlcse2gAw84vaoGGmJvUy2U",
ORUToODUMapFile: "o-ru-to-o-du-map.csv",
+ ConsumerCertPath: "security/consumer.crt",
+ ConsumerKeyPath: "security/consumer.key",
+ LogLevel: log.InfoLevel,
}
got := New()
assertions.Equal(&wantConfig, got)
if error := restclient.Put(lfh.config.SDNRAddress+sdnrPath, unlockMessage, lfh.client, lfh.config.SDNRUser, lfh.config.SDNRPassword); error == nil {
log.Debugf("Sent unlock message for O-RU: %v to O-DU: %v.", oRuId, oDuId)
} else {
- log.Warn(error)
+ log.Warn("Send of unlock message failed due to ", error)
}
} else {
- log.Warn(err)
+ log.Warn("Send of unlock message failed due to ", err)
}
}
import (
"bytes"
+ "crypto/tls"
"fmt"
"io"
+ "math"
"net/http"
+ "net/url"
+ "time"
+
+ "github.com/hashicorp/go-retryablehttp"
)
type RequestError struct {
}
func (e RequestError) Error() string {
- return fmt.Sprintf("Request failed due to error response with status: %v and body: %v", e.StatusCode, string(e.Body))
+ return fmt.Sprintf("error response with status: %v and body: %v", e.StatusCode, string(e.Body))
}
// HTTPClient interface
return do(http.MethodDelete, url, nil, client)
}
+func CreateClientCertificate(certPath string, keyPath string) (tls.Certificate, error) {
+ if cert, err := tls.LoadX509KeyPair(certPath, keyPath); err == nil {
+ return cert, nil
+ } else {
+ return tls.Certificate{}, fmt.Errorf("cannot create x509 keypair from cert file %s and key file %s due to: %v", certPath, keyPath, err)
+ }
+}
+
+func CreateRetryClient(cert tls.Certificate) *http.Client {
+ rawRetryClient := retryablehttp.NewClient()
+ rawRetryClient.RetryWaitMax = time.Minute
+ rawRetryClient.RetryMax = math.MaxInt
+ rawRetryClient.HTTPClient.Transport = getSecureTransportWithoutVerify(cert)
+
+ client := rawRetryClient.StandardClient()
+ return client
+}
+
+func IsUrlSecure(configUrl string) bool {
+ u, _ := url.Parse(configUrl)
+ return u.Scheme == "https"
+}
+
+func getSecureTransportWithoutVerify(cert tls.Certificate) *http.Transport {
+ return &http.Transport{
+ TLSClientConfig: &tls.Config{
+ Certificates: []tls.Certificate{
+ cert,
+ },
+ InsecureSkipVerify: true,
+ },
+ }
+}
+
func do(method string, url string, body []byte, client HTTPClient, userInfo ...string) error {
if req, reqErr := http.NewRequest(method, url, bytes.NewBuffer(body)); reqErr == nil {
if body != nil {
import (
"bytes"
+ "crypto/tls"
"fmt"
"io/ioutil"
+ "math"
"net/http"
+ "reflect"
"testing"
+ "time"
+ "github.com/hashicorp/go-retryablehttp"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"oransc.org/usecase/oruclosedloop/mocks"
StatusCode: http.StatusBadRequest,
Body: []byte("error"),
}
- assertions.Equal("Request failed due to error response with status: 400 and body: error", actualError.Error())
+ assertions.Equal("error response with status: 400 and body: error", actualError.Error())
}
func TestPutWithoutAuth(t *testing.T) {
})
}
}
+
+func Test_createClientCertificate(t *testing.T) {
+ assertions := require.New(t)
+ wantedCert, _ := tls.LoadX509KeyPair("../../security/consumer.crt", "../../security/consumer.key")
+ type args struct {
+ certPath string
+ keyPath string
+ }
+ tests := []struct {
+ name string
+ args args
+ wantCert tls.Certificate
+ wantErr error
+ }{
+ {
+ name: "Paths to cert info ok should return cerftificate",
+ args: args{
+ certPath: "../../security/consumer.crt",
+ keyPath: "../../security/consumer.key",
+ },
+ wantCert: wantedCert,
+ },
+ {
+ name: "Paths to cert info not ok should return error with info about error",
+ args: args{
+ certPath: "wrong_cert",
+ keyPath: "wrong_key",
+ },
+ wantErr: fmt.Errorf("cannot create x509 keypair from cert file wrong_cert and key file wrong_key due to: open wrong_cert: no such file or directory"),
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ cert, err := CreateClientCertificate(tt.args.certPath, tt.args.keyPath)
+ assertions.Equal(tt.wantCert, cert, tt.name)
+ assertions.Equal(tt.wantErr, err, tt.name)
+ })
+ }
+}
+
+func Test_CreateRetryClient(t *testing.T) {
+ assertions := require.New(t)
+
+ client := CreateRetryClient(tls.Certificate{})
+
+ transport := client.Transport
+ assertions.Equal("*retryablehttp.RoundTripper", reflect.TypeOf(transport).String())
+ retryableTransport := transport.(*retryablehttp.RoundTripper)
+ retryableClient := retryableTransport.Client
+ assertions.Equal(time.Minute, retryableClient.RetryWaitMax)
+ assertions.Equal(math.MaxInt, retryableClient.RetryMax)
+}
+
+func TestIsUrlSecured(t *testing.T) {
+ assertions := require.New(t)
+
+ assertions.True(IsUrlSecure("https://url"))
+
+ assertions.False(IsUrlSecure("http://url"))
+}
package main
import (
+ "crypto/tls"
"encoding/json"
"fmt"
"net/http"
"os"
"os/signal"
"syscall"
- "time"
"github.com/gorilla/mux"
log "github.com/sirupsen/logrus"
ListenAndServe() error
}
-const timeoutHTTPClient = time.Second * 5
const jobId = "14e7bb84-a44d-44c1-90b7-6995a92ad43c"
var jobRegistrationInfo = struct {
configuration = config.New()
log.SetLevel(configuration.LogLevel)
-
- client = &http.Client{
- Timeout: timeoutHTTPClient,
- }
+ log.Debug("Using configuration: ", configuration)
consumerPort = fmt.Sprint(configuration.ConsumerPort)
jobRegistrationInfo.JobResultUri = configuration.ConsumerHost + ":" + consumerPort
linkfailureConfig = linkfailure.Configuration{
- SDNRAddress: configuration.SDNRHost + ":" + fmt.Sprint(configuration.SDNRPort),
+ SDNRAddress: configuration.SDNRAddress,
SDNRUser: configuration.SDNRUser,
SDNRPassword: configuration.SDNPassword,
}
log.Fatalf("Unable to create LookupService due to inability to get O-RU-ID to O-DU-ID map. Cause: %v", initErr)
}
+ var cert tls.Certificate
+ if c, err := restclient.CreateClientCertificate(configuration.ConsumerCertPath, configuration.ConsumerKeyPath); err == nil {
+ cert = c
+ } else {
+ log.Fatalf("Stopping producer due to error: %v", err)
+ }
+ client = restclient.CreateRetryClient(cert)
+
go func() {
- startServer(&http.Server{
- Addr: ":" + consumerPort,
- Handler: getRouter(),
- })
- deleteJob()
+ startServer()
os.Exit(1) // If the startServer function exits, it is because there has been a failure in the server, so we exit.
}()
if configuration.ConsumerHost == "" || configuration.ConsumerPort == 0 {
return fmt.Errorf("consumer host and port must be provided")
}
+
+ if configuration.ConsumerCertPath == "" || configuration.ConsumerKeyPath == "" {
+ return fmt.Errorf("missing CONSUMER_CERT and/or CONSUMER_KEY")
+ }
+
return nil
}
return r
}
-func startServer(server Server) {
- if err := server.ListenAndServe(); err != nil {
+func startServer() {
+ var err error
+ if restclient.IsUrlSecure(configuration.ConsumerHost) {
+ err = http.ListenAndServeTLS(fmt.Sprintf(":%v", configuration.ConsumerPort), configuration.ConsumerCertPath, configuration.ConsumerKeyPath, getRouter())
+ } else {
+ err = http.ListenAndServe(fmt.Sprintf(":%v", configuration.ConsumerPort), getRouter())
+ }
+ if err != nil {
log.Errorf("Server stopped unintentionally due to: %v. Deleteing job.", err)
if deleteErr := deleteJob(); deleteErr != nil {
log.Error(fmt.Sprintf("Unable to delete consumer job due to: %v. Please remove job %v manually.", deleteErr, jobId))
doInit()
wantedConfiguration := &config.Config{
- LogLevel: log.InfoLevel,
ConsumerHost: "consumerHost",
ConsumerPort: 8095,
InfoCoordinatorAddress: "http://enrichmentservice:8083",
- SDNRHost: "http://localhost",
- SDNRPort: 3904,
+ SDNRAddress: "http://localhost:3904",
SDNRUser: "admin",
SDNPassword: "Kp8bJ4SXszM0WXlhak3eHlcse2gAw84vaoGGmJvUy2U",
ORUToODUMapFile: "o-ru-to-o-du-map.csv",
+ ConsumerCertPath: "security/consumer.crt",
+ ConsumerKeyPath: "security/consumer.key",
+ LogLevel: log.InfoLevel,
}
assertions.Equal(wantedConfiguration, configuration)
assertions.Equal(wantedConfiguration.ConsumerHost+":"+fmt.Sprint(wantedConfiguration.ConsumerPort), jobRegistrationInfo.JobResultUri)
wantedLinkFailureConfig := linkfailure.Configuration{
- SDNRAddress: wantedConfiguration.SDNRHost + ":" + fmt.Sprint(wantedConfiguration.SDNRPort),
+ SDNRAddress: wantedConfiguration.SDNRAddress,
SDNRUser: wantedConfiguration.SDNRUser,
SDNRPassword: wantedConfiguration.SDNPassword,
}
name: "Valid config, should return nil",
args: args{
configuration: &config.Config{
- ConsumerHost: "host",
- ConsumerPort: 80,
+ ConsumerHost: "host",
+ ConsumerPort: 80,
+ ConsumerCertPath: "security/consumer.crt",
+ ConsumerKeyPath: "security/consumer.key",
},
},
},
assertions.Equal("/admin/stop", path)
}
-func Test_startServer_shouldDeleteJobWhenServerStopsWithErrorAndLog(t *testing.T) {
- assertions := require.New(t)
-
- var buf bytes.Buffer
- log.SetOutput(&buf)
-
- os.Setenv("CONSUMER_PORT", "wrong")
- t.Cleanup(func() {
- log.SetOutput(os.Stderr)
- })
-
- mockServer := &mocks.Server{}
- mockServer.On("ListenAndServe").Return(errors.New("Server failure"))
-
- startServer(mockServer)
-
- log := buf.String()
- assertions.Contains(log, "level=error")
- assertions.Contains(log, "Server stopped unintentionally due to: Server failure. Deleteing job.")
- assertions.Contains(log, "Please remove job 14e7bb84-a44d-44c1-90b7-6995a92ad43c manually")
-}
-
func Test_startHandler(t *testing.T) {
assertions := require.New(t)
+++ /dev/null
-// Code generated by mockery v1.0.0. DO NOT EDIT.
-
-package mocks
-
-import mock "github.com/stretchr/testify/mock"
-
-// Server is an autogenerated mock type for the Server type
-type Server struct {
- mock.Mock
-}
-
-// ListenAndServe provides a mock function with given fields:
-func (_m *Server) ListenAndServe() error {
- ret := _m.Called()
-
- var r0 error
- if rf, ok := ret.Get(0).(func() error); ok {
- r0 = rf()
- } else {
- r0 = ret.Error(0)
- }
-
- return r0
-}
--- /dev/null
+-----BEGIN CERTIFICATE-----
+MIIDXzCCAkegAwIBAgIUEbuDTP0ixwxCxCQ9tR5DijGCbtkwDQYJKoZIhvcNAQEL
+BQAwPzELMAkGA1UEBhMCc2UxDDAKBgNVBAoMA0VTVDERMA8GA1UECwwIRXJpY3Nz
+b24xDzANBgNVBAMMBnNlcnZlcjAeFw0yMTEwMTkxNDA1MzVaFw0zMTEwMTcxNDA1
+MzVaMD8xCzAJBgNVBAYTAnNlMQwwCgYDVQQKDANFU1QxETAPBgNVBAsMCEVyaWNz
+c29uMQ8wDQYDVQQDDAZzZXJ2ZXIwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEK
+AoIBAQDnH4imV8kx/mXz6BDbq8e4oZGqGgv7V837iNspj/zIZXhEMP9311fdsZEE
+Y6VWU47bSYRn2xJOP+wmfKewbw0OcEWu/RkdvO7Y0VIVrlbEJYu88ZjK14dMUpfe
+72iMbTc5q2uYi0ImB5/m3jyMSXgso6NDWuvXrp2VSWjb1tG++des9rhvyrZyNrua
+I4iOnMvvuc71gvHol7appRu3+LRTQFYsAizdfHEQ9k949MZH4fiIu5NmCT/wNJVo
+uUZYYJseFhOlIANaXn6qmz7kKVYfxfV+Z5EccaRixaClCFwyRdmjgLyyeuI4/QPD
+x9PjmGmf6eOEC2ZHBi4OHwjIzmLnAgMBAAGjUzBRMB0GA1UdDgQWBBRjeDLPpLm2
+W623wna7xBCbHxtxVjAfBgNVHSMEGDAWgBRjeDLPpLm2W623wna7xBCbHxtxVjAP
+BgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQAbFUAWFZaIMXmd5qv/
+xJYr1oPJpsmbgWGRWZWDZqbUabvWObyXlDJWIau60BerfcC5TmyElBjTyONSGwCT
+tq+SVB0PXpgqa8ZQ25Ytn2AMDFWhrGbOefCXs6te3HGq6BNubTWrOVIvJypCwC95
++iXVuDd4eg+n2fWv7h7fZRZHum/zLoRxB2lKoMMbc/BQX9hbtP6xyvIVvaYdhcJw
+VzJJGIDqpMiMH6IBaOFSmgfOyGblGKAicj3E3kpGBfadLx3R+9V6aG7zyBnVbr2w
+YJbV2Ay4PrF+PTpCMB/mNwC5RBTYHpSNdrCMSyq3I+QPVJq8dPJr7fd1Uwl3WHqX
+FV0h
+-----END CERTIFICATE-----
--- /dev/null
+-----BEGIN PRIVATE KEY-----
+MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDnH4imV8kx/mXz
+6BDbq8e4oZGqGgv7V837iNspj/zIZXhEMP9311fdsZEEY6VWU47bSYRn2xJOP+wm
+fKewbw0OcEWu/RkdvO7Y0VIVrlbEJYu88ZjK14dMUpfe72iMbTc5q2uYi0ImB5/m
+3jyMSXgso6NDWuvXrp2VSWjb1tG++des9rhvyrZyNruaI4iOnMvvuc71gvHol7ap
+pRu3+LRTQFYsAizdfHEQ9k949MZH4fiIu5NmCT/wNJVouUZYYJseFhOlIANaXn6q
+mz7kKVYfxfV+Z5EccaRixaClCFwyRdmjgLyyeuI4/QPDx9PjmGmf6eOEC2ZHBi4O
+HwjIzmLnAgMBAAECggEBAMq1lZyPkh8PCUyLVX3VhC4jRybyAWBI+piKx+4EI6l/
+laP5dZcegCoo+w/mdbTpRHqAWGjec4e9+Nkoq8rLG6B2SCfaRJUYiEQSEvSBHAid
+BZqKK4B82GXQavNU91Vy1OT3vD7mpPXF6jEK6gAA0C4Wt7Lzo7ZfqEavRBDMsNnV
+jOxLwWJCFSKhfeA6grJCnagmEDKSxxazlNBgCahjPf/+IRJZ7Vk4Zjq+I/5nWKf8
+lYaQExKDIANuM/jMRnYVq5k4g2MKHUADWGTSvG1DMJiMHzdxb2miZovpIkEE86bC
+wKBuele9IR6Rb/wygYj7WdaWysZ081OT7mNyju08e4ECgYEA8+q7vv4Nlz8bAcmY
+Ip5517s15M5D9iLsE2Q5m9Zs99rUyQv0E8ekpChhtTSdvj+eNl39O4hji46Gyceu
+MYPfNL7+YWaFDxuyaXEe/OFuKbFqgE1p08HXFcQJCvgqD1MWO5b9BRDc0qpNFIA8
+eN9xFBMQ2UFaALBMAup7Ef85q4kCgYEA8pKOAIsgmlnO8P9cPzkMC1oozslraAti
+1JnOJjwPLoHFubtH2u7WoIkSvNfeNwfrsVXwAP0m7C8p7qhYppS+0XGjKpYNSezK
+1GCqCVv8R1m+AsSseSUUaQCmEydd+gQbBq3r4u3wU3ylrgAoR3m+7SVyhvD+vbwI
+7+zfj+O3zu8CgYEAqaAsQH5c5Tm1hmCztB+RjD1dFWl8ScevdSzWA1HzJcrA/6+Y
+ZckI7kBG8sVMjemgFR735FbNI1hS1DBRK44Rw5SvQv0Qu5j/UeShMCt1ePkwn1k2
+p1S+Rxy1TTOXzGBzra0q+ELpzncwc3lalJSPBu7bYLrZ5HC167E1NSbQ7EECgYBo
+e/IIj+TyNz7pFcVhQixK84HiWGYYQddHJhzi4TnU2XcWonG3/uqZ6ZEVoJIJ+DJw
+h0jC1EggscwJDaBp2GY9Bwq2PD3rGsDfK+fx8ho/jYtH2/lCkVMyS2I9m9Zh68TM
+YrvZWo4LGASxZ0XyS6GOunOTZlkD1uuulMRTUU4KJwKBgQCwyjs0/ElVFvO0lPIC
+JJ//B5rqI7hNMJuTBvr4yiqVZdbgFukaU7FBVyNYDMpZi/nRbpglm+psFcwXtL8n
+bHOIGLkh8vB7OuETRYhXs567lPYtO4BmHZlXW70Sq/0xqi/Mmz1RuEg4SQ1Ug5oy
+wG6IV5EWSQAhsGirdybQ+bY7Kw==
+-----END PRIVATE KEY-----
--- /dev/null
+// -
+// ========================LICENSE_START=================================
+// O-RAN-SC
+// %%
+// Copyright (C) 2021: Nordix Foundation
+// %%
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ========================LICENSE_END===================================
+//
+
+package main
+
+import (
+ "flag"
+ "fmt"
+ "io"
+ "net/http"
+
+ "github.com/gorilla/mux"
+)
+
+func main() {
+ port := flag.Int("port", 3904, "The port this SDNR stub will listen on")
+ flag.Parse()
+
+ r := mux.NewRouter()
+ r.HandleFunc("/rests/data/network-topology:network-topology/topology=topology-netconf/node={O-DU-ID}/yang-ext:mount/o-ran-sc-du-hello-world:network-function/du-to-ru-connection={O-RU-ID}", handleData)
+
+ fmt.Println("Starting SDNR on port: ", *port)
+ http.ListenAndServe(fmt.Sprintf(":%v", *port), r)
+
+}
+
+func handleData(w http.ResponseWriter, req *http.Request) {
+ defer req.Body.Close()
+ if reqData, err := io.ReadAll(req.Body); err == nil {
+ fmt.Println("SDNR received body: ", string(reqData))
+ }
+}