<properties>
<java.version>11</java.version>
<springfox.version>3.0.0</springfox.version>
- <immutable.version>2.8.2</immutable.version>
- <sdk.version>1.1.6</sdk.version>
+ <immutable.version>2.8.2</immutable.version>
<swagger.version>2.1.12</swagger.version>
<json.version>20190722</json.version>
<maven-compiler-plugin.version>3.8.0</maven-compiler-plugin.version>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
- <version>30.0-jre</version>
+ <version>31.0.1-jre</version>
</dependency>
<dependency>
<groupId>org.springdoc</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
- <dependency>
- <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
- <artifactId>cbs-client</artifactId>
- <version>${sdk.version}</version>
- </dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<system>JIRA</system>
<url>https://jira.o-ran-sc.org/</url>
</issueManagement>
-</project>
\ No newline at end of file
+</project>
RUN mkdir -p /var/log/dmaap-adaptor-service
RUN mkdir -p /opt/app/dmaap-adaptor-service/etc/cert/
RUN mkdir -p /var/dmaap-adaptor-service
-RUN chmod -R 777 /var/dmaap-adaptor-service
ADD /config/application.yaml /opt/app/dmaap-adaptor-service/config/application.yaml
ADD /config/application_configuration.json /opt/app/dmaap-adaptor-service/data/application_configuration.json_example
ADD /config/keystore.jks /opt/app/dmaap-adaptor-service/etc/cert/keystore.jks
ADD /config/truststore.jks /opt/app/dmaap-adaptor-service/etc/cert/truststore.jks
-RUN chmod -R 777 /opt/app/dmaap-adaptor-service/config/
+ARG user=nonrtric
+ARG group=nonrtric
+
+RUN groupadd $user && \
+ useradd -r -g $group $user
+RUN chown -R $user:$group /opt/app/dmaap-adaptor-service
+RUN chown -R $user:$group /var/log/dmaap-adaptor-service
+RUN chown -R $user:$group /var/dmaap-adaptor-service
+
+USER ${user}
ADD target/${JAR} /opt/app/dmaap-adaptor-service/dmaap-adaptor.jar
CMD ["java", "-jar", "/opt/app/dmaap-adaptor-service/dmaap-adaptor.jar"]
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
- <version>2.6.2</version>
+ <version>2.5.8</version>
<relativePath />
</parent>
<groupId>org.o-ran-sc.nonrtric</groupId>
<spotless-maven-plugin.version>1.24.3</spotless-maven-plugin.version>
<swagger-codegen-maven-plugin.version>3.0.11</swagger-codegen-maven-plugin.version>
<docker-maven-plugin>0.30.0</docker-maven-plugin>
- <javax.ws.rs-api.version>2.1.1</javax.ws.rs-api.version>
<sonar-maven-plugin.version>3.7.0.1746</sonar-maven-plugin.version>
<jacoco-maven-plugin.version>0.8.5</jacoco-maven-plugin.version>
<exec.skip>true</exec.skip>
<dependency>
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
- <version>1.3.7</version>
+ <version>1.3.9</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<system>JIRA</system>
<url>https://jira.o-ran-sc.org/</url>
</issueManagement>
-</project>
\ No newline at end of file
+</project>
public synchronized void restartNonRunningTopics() {
for (String typeId : this.consumers.keySet()) {
for (KafkaJobDataConsumer consumer : this.consumers.get(typeId)) {
- restartTopic(consumer);
+ if (!consumer.isRunning()) {
+ restartTopic(consumer);
+ }
}
}
}
"type": "string"
},
"maxConcurrency": {
- "type": "integer"
+ "type": "integer",
+ "minimum": 1
},
"bufferTimeout": {
"type": "object",
"properties": {
"maxSize": {
- "type": "integer"
+ "type": "integer",
+ "minimum": 1
},
"maxTimeMiliseconds": {
- "type": "integer"
+ "type": "integer",
+ "minimum": 0,
+ "maximum": 160000
}
},
"additionalProperties": false,
() -> assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(2));
}
- private void testErrorCode(Mono<?> request, HttpStatus expStatus, String responseContains) {
+ public static void testErrorCode(Mono<?> request, HttpStatus expStatus, String responseContains) {
testErrorCode(request, expStatus, responseContains, true);
}
- private void testErrorCode(Mono<?> request, HttpStatus expStatus, String responseContains,
+ public static void testErrorCode(Mono<?> request, HttpStatus expStatus, String responseContains,
boolean expectApplicationProblemJsonMediaType) {
StepVerifier.create(request) //
.expectSubscription() //
.verify();
}
- private boolean checkWebClientError(Throwable throwable, HttpStatus expStatus, String responseContains,
+ private static boolean checkWebClientError(Throwable throwable, HttpStatus expStatus, String responseContains,
boolean expectApplicationProblemJsonMediaType) {
assertTrue(throwable instanceof WebClientResponseException);
WebClientResponseException responseException = (WebClientResponseException) throwable;
import org.springframework.boot.web.embedded.tomcat.TomcatServletWebServerFactory;
import org.springframework.boot.web.servlet.server.ServletWebServerFactory;
import org.springframework.context.annotation.Bean;
+import org.springframework.http.HttpStatus;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringExtension;
+
@SuppressWarnings("java:S3577") // Rename class
@ExtendWith(SpringExtension.class)
@SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)
}
@Test
- void testWholeChain() throws Exception {
+ void testKafkaJobParameterOutOfRange() {
+ await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue());
+ final String TYPE_ID = "KafkaInformationType";
+
+ Job.Parameters param = new Job.Parameters("filter", new Job.BufferTimeout(123, 170 * 1000), 1);
+
+ ConsumerJobInfo jobInfo =
+ new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", consumerUri(), "");
+ String body = gson.toJson(jobInfo);
+
+ ApplicationTest.testErrorCode(restClient().put(jobUrl("KAFKA_JOB_ID"), body), HttpStatus.BAD_REQUEST,
+ "Json validation failure");
+
+ }
+
+ @Test
+ void testDmaapMessage() throws Exception {
await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue());
createInformationJobInIcs(DMAAP_TYPE_ID, DMAAP_JOB_ID, ".*DmaapResponse.*");
deleteInformationJobInIcs(DMAAP_JOB_ID);
await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
-
}
}
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
- props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producer");
+ props.put(ProducerConfig.CLIENT_ID_CONFIG, "sample-producerx");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
.doOnError(e -> logger.error("Send failed", e)) //
.blockLast();
+ sender.close();
+
}
private void verifiedReceivedByConsumer(String... strings) {
}
}
+ @Test
+ void simpleCase() throws InterruptedException {
+ final String JOB_ID = "ID";
+
+ // Register producer, Register types
+ await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+
+ this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID, restClient());
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+ Thread.sleep(4000);
+ var dataToSend = Flux.just(senderRecord("Message"));
+ sendDataToStream(dataToSend);
+
+ verifiedReceivedByConsumer("Message");
+
+ this.icsSimulatorController.deleteJob(JOB_ID, restClient());
+
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+ await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers().keySet()).isEmpty());
+ }
+
@Test
void kafkaIntegrationTest() throws Exception {
final String JOB_ID1 = "ID1";
assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
// Create two jobs. One buffering and one with a filter
- this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 20), JOB_ID1,
+ this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 10, 20), JOB_ID1,
restClient());
this.icsSimulatorController.addJob(consumerJobInfo("^Message_1$", Duration.ZERO, 0, 1), JOB_ID2, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
+ Thread.sleep(2000);
var dataToSend = Flux.range(1, 3).map(i -> senderRecord("Message_" + i)); // Message_1, Message_2 etc.
sendDataToStream(dataToSend);
}
func getAsJSONArray(rawMsgs [][]byte) []byte {
- json := `"[`
+ if len(rawMsgs) == 0 {
+ return []byte("")
+ }
+ strings := ""
for i := 0; i < len(rawMsgs); i++ {
- msg := string(rawMsgs[i])
- json = json + strings.ReplaceAll(msg, "\"", "\\\"")
- if i < len(rawMsgs)-1 {
- json = json + ","
- }
+ strings = strings + makeIntoString(rawMsgs[i])
+ strings = addSeparatorIfNeeded(strings, i, len(rawMsgs))
}
- return []byte(json + `]"`)
+ return []byte(wrapInJSONArray(strings))
+}
+
+func makeIntoString(rawMsg []byte) string {
+ return `"` + strings.ReplaceAll(string(rawMsg), "\"", "\\\"") + `"`
+}
+
+func addSeparatorIfNeeded(strings string, position, length int) string {
+ if position < length-1 {
+ strings = strings + ","
+ }
+ return strings
+}
+
+func wrapInJSONArray(strings string) string {
+ return "[" + strings + "]"
}
func (j *job) waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
distributeClientMock := NewTestClient(func(req *http.Request) *http.Response {
if req.URL.String() == "http://consumerHost/target" {
assertions.Equal(req.Method, "POST")
- assertions.Equal(`"[{\"data\": 1},{\"data\": 2}]"`, getBodyAsString(req, t))
+ assertions.Equal(`["{\"data\": 1}","{\"data\": 2}","ABCDEFGH"]`, getBodyAsString(req, t))
assertions.Equal("application/json", req.Header.Get("Content-Type"))
wg.Done()
return &http.Response{
go func() {
jobUnderTest.messagesChannel <- []byte(`{"data": 1}`)
jobUnderTest.messagesChannel <- []byte(`{"data": 2}`)
+ jobUnderTest.messagesChannel <- []byte("ABCDEFGH")
}()
if waitTimeout(&wg, 2*time.Second) {
MaxTimeMiliseconds: 200,
})
- assertions.Equal([]byte("\"[0,1]\""), msgs)
+ assertions.Equal([]byte("[\"0\",\"1\"]"), msgs)
}
func TestJobReadBufferedWhenTimeout_shouldOnlyReturnMessagesSentBeforeTimeout(t *testing.T) {
assertions := require.New(t)
MaxTimeMiliseconds: 30,
})
- assertions.Equal([]byte("\"[0,1]\""), msgs)
+ assertions.Equal([]byte("[\"0\",\"1\"]"), msgs)
}
func fillMessagesBuffer(mc chan []byte) {
RUN mkdir -p /var/log/information-coordinator-service
RUN mkdir -p /opt/app/information-coordinator-service/etc/cert/
RUN mkdir -p /var/information-coordinator-service
-RUN chmod -R 777 /var/information-coordinator-service
EXPOSE 8083 8434
ADD /config/keystore.jks /opt/app/information-coordinator-service/etc/cert/keystore.jks
ADD /config/truststore.jks /opt/app/information-coordinator-service/etc/cert/truststore.jks
+ARG user=nonrtric
+ARG group=nonrtric
-RUN chmod -R 777 /opt/app/information-coordinator-service/config/
+RUN groupadd $user && \
+ useradd -r -g $group $user
+RUN chown -R $user:$group /opt/app/information-coordinator-service
+RUN chown -R $user:$group /var/log/information-coordinator-service
+RUN chown -R $user:$group /var/information-coordinator-service
+
+USER ${user}
CMD ["java", "-jar", "/opt/app/information-coordinator-service/information-coordinator-service.jar"]
-Subproject commit c6cca20e3fd10842be0ca782d143dfa59d5c086e
+Subproject commit 0f8b20544745afaf9c7b38140b9516667d9c4752
ADD /config/r-app-catalogue-keystore.jks /opt/app/r-app-catalogue/etc/cert/keystore.jks
ADD target/${JAR} /opt/app/r-app-catalogue/r-app-catalogue.jar
+ARG user=nonrtric
+ARG group=nonrtric
-RUN chmod -R 777 /opt/app/r-app-catalogue/config/
+RUN groupadd $user && \
+ useradd -r -g $group $user
+RUN chown -R $user:$group /opt/app/r-app-catalogue
+RUN chown -R $user:$group /var/log/r-app-catalogue
+
+USER ${user}
CMD ["java", "-jar", "/opt/app/r-app-catalogue/r-app-catalogue.jar"]
RUN pip install -r requirements.txt
+ARG user=nonrtric
+ARG group=nonrtric
+
+RUN groupadd $user && \
+ useradd -r -g $group $user
+RUN chown -R $user:$group /usr/src/app/
+
+USER ${user}
+
CMD [ "python3", "-u", "main.py" ]