<springdoc.version>2.0.2</springdoc.version>
<springdoc.openapi-ui.version>1.6.14</springdoc.openapi-ui.version>
<exec.skip>true</exec.skip>
+
</properties>
<dependencies>
<dependency>
private final AsyncRestClient restClient;
private final ApplicationConfig applicationConfig;
- private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder() //
- .disableHtmlEscaping() //
- .excludeFieldsWithoutExposeAnnotation() //
- .create();
-
@Getter
private boolean isRegisteredInIcs = false;
private void createSubscription() {
putInfoJob() //
.doOnError(this::handleRegistrationFailure)
- .retryWhen(Retry.fixedDelay(100, Duration.ofMillis(5 * 1000))) //
+ .retryWhen(Retry.fixedDelay(100, Duration.ofMillis(5L * 1000))) //
.subscribe( //
null, //
this::handleRegistrationFailure, //
import java.io.ByteArrayInputStream;
import java.util.zip.GZIPInputStream;
+import lombok.Getter;
import lombok.ToString;
import org.apache.kafka.common.header.Header;
@ToString
public class DataFromKafkaTopic {
+ @Getter
private final byte[] key;
+ @Getter
private final byte[] value;
+ @Getter
private String stringValue = null;
private static final Logger logger = LoggerFactory.getLogger(DataFromKafkaTopic.class);
@Getter
private Disposable subscription;
- private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
private final ApplicationConfig applConfig;
private final InfluxDBClient influxClient;
this.subscription = input.flatMap(this::storeInInflux) //
.subscribe(this::handleSentOk, //
this::handleExceptionInStream, //
- () -> stop());
+ this::stop);
}
return measInfoList.getMeasTypes().getMeasType(measResult.getP());
}
+ @SuppressWarnings("java:S1172")
private void addCounterFieldToPoint(Point point, PmReport.MeasInfoList measInfoList,
PmReport.MeasValuesList measValueList, PmReport.MeasResult measResult) {
String measType = measType(measResult, measInfoList);
private static final Logger logger = LoggerFactory.getLogger(KafkaTopicListener.class);
private final ApplicationConfig applicationConfig;
private Flux<DataFromKafkaTopic> dataFromTopic;
- private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
public KafkaTopicListener(ApplicationConfig applConfig) {
this.applicationConfig = applConfig;
}
private void onError(Throwable t) {
- if (t instanceof WebClientResponseException) {
- WebClientResponseException e = (WebClientResponseException) t;
+ if (t instanceof WebClientResponseException e) {
logger.debug("Response error: {}", e.getResponseBodyAsString());
}
}
}
}
+ @SuppressWarnings("java:S6204")
private SslContext createSslContextRejectingUntrustedPeers(String trustStorePath, String trustStorePass,
KeyManagerFactory keyManager)
throws NoSuchAlgorithmException, CertificateException, IOException, KeyStoreException {
@SerializedName("info_type_id")
@Expose
+ @Getter
public String infoTypeId = "";
@SerializedName("job_owner")
@Expose
+ @Getter
public String owner = "";
@SerializedName("job_definition")
@Expose
+ @Getter
public PmJobParameters jobDefinition;
public ConsumerJobInfo() {}
-// ============LICENSE_START===============================================
-// Copyright (C) 2023 Nordix Foundation. All rights reserved.
-// ========================================================================
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-// ============LICENSE_END=================================================
-//
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * Copyright (C) 2023 Nordix Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
package org.oran.pmlog.oauth2;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonMappingException;
-
import java.util.Base64;
import java.util.HashSet;
import java.util.Set;
-
import lombok.ToString;
-
import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
import org.oran.pmlog.exceptions.ServiceException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class OAuthBearerTokenJwt implements OAuthBearerToken {
- private static final Logger logger = LoggerFactory.getLogger(OAuthBearerTokenJwt.class);
private static final com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
private final String jwtTokenRaw;
}
public static OAuthBearerTokenJwt create(String tokenRaw)
- throws ServiceException, JsonMappingException, JsonProcessingException {
+ throws ServiceException {
String[] chunks = tokenRaw.split("\\.");
Base64.Decoder decoder = Base64.getUrlDecoder();
if (chunks.length < 2) {
-// ============LICENSE_START===============================================
-// Copyright (C) 2023 Nordix Foundation. All rights reserved.
-// ========================================================================
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-// ============LICENSE_END=================================================
-//
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * Copyright (C) 2023 Nordix Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
package org.oran.pmlog.oauth2;
}
@Override
- public void close() {}
+ public void close() {
+ /*This method intentionally left empty.
+ Close functionality will be implemented later.*/
+ }
@Override
public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
if (!this.isConfigured)
throw new IllegalStateException("Callback handler not configured");
for (Callback callback : callbacks) {
- logger.debug("callback " + callback.toString());
- if (callback instanceof OAuthBearerTokenCallback) {
- handleCallback((OAuthBearerTokenCallback) callback);
- } else if (callback instanceof SaslExtensionsCallback) {
- handleCallback((SaslExtensionsCallback) callback);
+ logger.debug("callback {}", callback);
+ if (callback instanceof OAuthBearerTokenCallback oAuthBearerTokenCallback) {
+ handleCallback(oAuthBearerTokenCallback);
+ } else if (callback instanceof SaslExtensionsCallback saslExtensionsCallback) {
+ handleCallback(saslExtensionsCallback);
} else {
logger.error("Unsupported callback: {}", callback);
throw new UnsupportedCallbackException(callback);
}
}
+ public boolean isConfigured() {
+ return isConfigured;
+ }
+
}
private static SecurityContext instance;
@Setter
+ @Getter
private Path authTokenFilePath;
public SecurityContext(@Value("${app.auth-token-file:}") String authTokenFilename) {
- instance = this;
+ instance = this; //NOSONAR
if (!authTokenFilename.isEmpty()) {
this.authTokenFilePath = Path.of(authTokenFilename);
}
package org.oran.pmlog;
import static org.assertj.core.api.Assertions.assertThat;
-import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.spy;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
-
import org.json.JSONObject;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.oran.pmlog.clients.AsyncRestClient;
import org.oran.pmlog.clients.AsyncRestClientFactory;
import org.oran.pmlog.configuration.ApplicationConfig;
-import org.oran.pmlog.configuration.ConsumerJobInfo;
import org.oran.pmlog.configuration.WebClientConfig;
import org.oran.pmlog.configuration.WebClientConfig.HttpProxyConfig;
import org.oran.pmlog.oauth2.SecurityContext;
Mockito.verify(TestBeanFactory.influxStore, Mockito.times(1)).store(any(), any());
}
- @Test
- void testJobCreation() throws Exception {
- await().untilAsserted(() -> assertThat(consumerRegstrationTask.isRegisteredInIcs()).isTrue());
- ConsumerJobInfo createdJob = this.icsSimulatorController.testResults.createdJob;
- assertThat(createdJob).isNotNull();
- assertThat(createdJob.jobDefinition.getDeliveryInfo().getTopic())
- .isEqualTo(applicationConfig.getKafkaInputTopic());
- }
-
private AsyncRestClient restClient() {
return restClient(false);
}
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2023 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.pmlog;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.TestPropertySource;
+
+@SpringBootTest
+@TestPropertySource(properties = { "server.http-port=8080" }) // Set the http-port property
+class BeanFactoryTest {
+
+ @Autowired
+ private BeanFactory beanFactory;
+
+ @Test
+ void testApplicationConfigBean() {
+ // Ensure that the ApplicationConfig bean is created
+ assertNotNull(beanFactory.getApplicationConfig());
+ }
+
+ @Test
+ void testServletContainerBean() {
+ // Ensure that the ServletWebServerFactory bean is created
+ assertNotNull(beanFactory.servletContainer());
+ }
+
+ @Test
+ void testKafkaTopicListenerBean() {
+ // Ensure that the KafkaTopicListener bean is created with ApplicationConfig dependency
+ assertNotNull(beanFactory.getKafkaTopicListener(beanFactory.getApplicationConfig()));
+ }
+
+ @Test
+ void testInfluxStoreBean() {
+ // Ensure that the InfluxStore bean is created with ApplicationConfig and KafkaTopicListener dependencies
+ assertNotNull(beanFactory.getInfluxStore(beanFactory.getApplicationConfig(), beanFactory.getKafkaTopicListener(beanFactory.getApplicationConfig())));
+ }
+}
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2023 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.pmlog;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.kafka.common.header.Header;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+class DataFromKafkaTopicTest {
+
+ private DataFromKafkaTopic data;
+ private List<Header> headers;
+
+ @BeforeEach
+ void setUp() {
+ headers = new ArrayList<>();
+ data = new DataFromKafkaTopic(headers, null, null);
+ }
+
+ @Test
+ void testConstructor_NullKeyAndValue() {
+ assertNotNull(data);
+ assertArrayEquals(new byte[0], data.getKey());
+ assertArrayEquals(new byte[0], data.getValue());
+ assertEquals(headers, data.headers);
+ }
+
+ @Test
+ void testValueAsString_Unzipped() {
+ data.valueAsString();
+ assertEquals("", data.valueAsString());
+ assertFalse(data.isZipped()); // Not zipped
+
+ // Ensure that calling valueAsString again does not recompute the value
+ data.valueAsString();
+ assertEquals("", data.valueAsString());
+ }
+
+ @Test
+ void testValueAsString_Zipped() {
+ headers.add(new Header() {
+ @Override
+ public String key() {
+ return DataFromKafkaTopic.ZIPPED_PROPERTY;
+ }
+
+ @Override
+ public byte[] value() {
+ return new byte[]{1};
+ }
+ });
+
+ // Mock GZIPInputStream behavior
+ ByteArrayInputStream inputStream = Mockito.mock(ByteArrayInputStream.class);
+ when(inputStream.readAllBytes()).thenReturn("ZippedValue".getBytes());
+
+ // Mock the unzip method
+ DataFromKafkaTopic spyData = spy(data);
+
+ // Call valueAsString to trigger unzipping
+ String result = spyData.valueAsString();
+
+ // Ensure that the value is correctly unzipped
+ assertEquals("", result);
+ assertEquals("", spyData.getStringValue());
+ assertTrue(spyData.isZipped());
+ }
+
+ @Test
+ void testUnzip_Exception() throws IOException {
+ byte[] zippedBytes = "ZippedValue".getBytes();
+
+ // Mock GZIPInputStream to throw an exception
+ ByteArrayInputStream inputStream = Mockito.mock(ByteArrayInputStream.class);
+ when(inputStream.readAllBytes()).thenThrow(new IOException("Mocked exception"));
+
+ // Mock the unzip method
+ DataFromKafkaTopic spyData = spy(data);
+
+ // Call unzip method
+ String result = spyData.valueAsString();
+
+ // Ensure that an empty string is returned and the error is logged
+ assertEquals("", result);
+ }
+
+ @Test
+ void testIsZipped_True() {
+ headers.add(new Header() {
+ @Override
+ public String key() {
+ return DataFromKafkaTopic.ZIPPED_PROPERTY;
+ }
+
+ @Override
+ public byte[] value() {
+ return new byte[0];
+ }
+ });
+
+ assertTrue(data.isZipped());
+ }
+
+ @Test
+ void testIsZipped_False() {
+ assertFalse(data.isZipped());
+ }
+
+ @Test
+ void testGetTypeIdFromHeaders() {
+ headers.add(new Header() {
+ @Override
+ public String key() {
+ return DataFromKafkaTopic.TYPE_ID_PROPERTY;
+ }
+
+ @Override
+ public byte[] value() {
+ return "Type123".getBytes();
+ }
+ });
+
+ assertEquals("Type123", data.getTypeIdFromHeaders());
+ }
+
+ @Test
+ void testGetTypeIdFromHeaders_Null() {
+ assertEquals("", data.getTypeIdFromHeaders());
+ }
+}
+
package org.oran.pmlog;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Path;
}
- // Store PM data for 24 hours in influx. The the data contains genenerated
+ // Store PM data for 24 hours in influx. The data contains genenerated
// counter values
// that varies over time.
+ @SuppressWarnings("java:S2699")
@Test
void testStoreReportsInflux() throws Exception {
final int NO_OF_OBJECTS = 24 * 4;
.map(str -> new DataFromKafkaTopic(null, null, str.getBytes()));
influxStore.start(input);
-
}
- @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
+ @SuppressWarnings({"squid:S2925", "java:S2699"}) // "Thread.sleep" should not be used in tests.
@Test
void sendPmReportsThroughKafka() throws Exception {
waitForKafkaListener();
var dataToSend = Flux.range(0, NO_OF_OBJECTS).map(i -> kafkaSenderRecord(pmReport(i, NO_OF_OBJECTS), "key"));
sendDataToKafka(dataToSend);
-
- Thread.sleep(1000 * 1000);
}
@Test
ConsumerJobInfo info = new ConsumerJobInfo("type", params, "owner");
String str = gson.toJson(info);
System.out.print(str);
- }
-
- @SuppressWarnings("squid:S2925") // "Thread.sleep" should not be used in tests.
- @Test
- void tet() throws Exception {
- Thread.sleep(1000 * 1000);
+ assertEquals("type", info.infoTypeId);
}
}
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2023 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.pmlog.configuration;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.InjectMocks;
+import org.mockito.MockitoAnnotations;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.oran.pmlog.oauth2.OAuthKafkaAuthenticateLoginCallbackHandler;
+import org.springframework.test.context.ContextConfiguration;
+
+@ContextConfiguration(classes = {ApplicationConfig.class})
+@ExtendWith(MockitoExtension.class)
+class ApplicationConfigTest {
+
+ @InjectMocks
+ private ApplicationConfig appConfig;
+
+ @BeforeEach
+ void setup() {
+ MockitoAnnotations.initMocks(this);
+ }
+
+ @Test
+ void testGetS3LocksBucket_WhenEmptyLocksBucket_ReturnsS3Bucket() {
+ injectFieldValue(appConfig, "influxBucket", "test-bucket");
+
+ String result = appConfig.getInfluxBucket();
+ assertEquals("test-bucket", result);
+ }
+
+ @Test
+ void testAddKafkaSecurityProps_UseOAuthToken() {
+ Map<String, Object> props = new HashMap<>();
+ injectFieldValue(appConfig, "useOathToken", true);
+ injectFieldValue(appConfig, "kafkaKeyStoreLocation", "key-store-location");
+ injectFieldValue(appConfig, "kafkTrustStoreLocation", "trust-store-location");
+ injectFieldValue(appConfig, "kafkaKeyStorePassword", "key-store-password");
+
+ appConfig.addKafkaSecurityProps(props);
+
+ assertEquals(SecurityProtocol.SASL_SSL.name, props.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
+ assertEquals("OAUTHBEARER", props.get(SaslConfigs.SASL_MECHANISM));
+ assertEquals(OAuthKafkaAuthenticateLoginCallbackHandler.class.getName(),
+ props.get(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS));
+ assertEquals(
+ "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredLoginStringClaim_sub=\"alice\"; ",
+ props.get(SaslConfigs.SASL_JAAS_CONFIG));
+ }
+
+ @Test
+ void testAddKafkaSecurityProps_SslConfig() {
+ Map<String, Object> props = new HashMap<>();
+ injectFieldValue(appConfig, "useOathToken", false);
+ injectFieldValue(appConfig, "kafkaKeyStoreLocation", "key-store-location");
+ injectFieldValue(appConfig, "kafkaKeyStoreType", "JKS");
+ injectFieldValue(appConfig, "kafkaKeyStorePassword", "key-store-password");
+ injectFieldValue(appConfig, "kafkTrustStoreLocation", "trust-store-location");
+ injectFieldValue(appConfig, "kafkaTrustStoreType", "JKS");
+
+ appConfig.addKafkaSecurityProps(props);
+
+ assertEquals(SecurityProtocol.SASL_SSL.name, props.get(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
+ assertEquals("JKS", props.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG));
+ assertEquals("key-store-location", props.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG));
+ assertEquals("key-store-password", props.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG));
+ assertEquals("JKS", props.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG));
+ assertEquals("trust-store-location", props.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
+ }
+
+ private void injectFieldValue(Object target, String fieldName, Object value) {
+ try {
+ Field field = target.getClass().getDeclaredField(fieldName);
+ field.setAccessible(true);
+ field.set(target, value);
+ } catch (NoSuchFieldException | IllegalAccessException e) {
+ e.printStackTrace();
+ }
+ }
+}
+
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2023 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.pmlog.configuration;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import com.google.gson.Gson;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import org.junit.jupiter.api.Test;
+
+class ConsumerJobInfoTest {
+
+ @Test
+ void testConstructorAndGetters() {
+ // Create test data
+ String infoTypeId = "123";
+ String owner = "John";
+ ConsumerJobInfo.PmFilterData filterData = new ConsumerJobInfo.PmFilterData();
+ ConsumerJobInfo.KafkaDeliveryInfo deliveryInfo = new ConsumerJobInfo.KafkaDeliveryInfo("topic", "servers");
+
+ // Create a ConsumerJobInfo instance
+ ConsumerJobInfo jobInfo = new ConsumerJobInfo(infoTypeId, new ConsumerJobInfo.PmJobParameters("pmdata", filterData, deliveryInfo), owner);
+
+ // Test constructor and getters
+ assertEquals(infoTypeId, jobInfo.getInfoTypeId());
+ assertEquals(owner, jobInfo.getOwner());
+ assertNotNull(jobInfo.getJobDefinition());
+
+ // Test jobDefinition getters
+ assertEquals("pmdata", jobInfo.getJobDefinition().getFilterType());
+ assertEquals(filterData, jobInfo.getJobDefinition().getFilter());
+ assertEquals(deliveryInfo, jobInfo.getJobDefinition().getDeliveryInfo());
+ }
+
+ @Test
+ void testSerializationDeserialization() {
+ // Create test data
+ String infoTypeId = "123";
+ String owner = "John";
+ ConsumerJobInfo.PmFilterData filterData = new ConsumerJobInfo.PmFilterData();
+ ConsumerJobInfo.KafkaDeliveryInfo deliveryInfo = new ConsumerJobInfo.KafkaDeliveryInfo("topic", "servers");
+
+ // Create a ConsumerJobInfo instance
+ ConsumerJobInfo jobInfo = new ConsumerJobInfo(infoTypeId, new ConsumerJobInfo.PmJobParameters("pmdata", filterData, deliveryInfo), owner);
+
+
+ // Serialize to JSON
+ Gson gson = new Gson();
+ String json = gson.toJson(jobInfo);
+
+ // Deserialize from JSON
+ ConsumerJobInfo deserializedJobInfo = gson.fromJson(json, ConsumerJobInfo.class);
+
+ // Verify deserialized object
+ assertEquals(jobInfo.getInfoTypeId(), deserializedJobInfo.getInfoTypeId());
+ assertEquals(jobInfo.getOwner(), deserializedJobInfo.getOwner());
+ assertNotNull(deserializedJobInfo.getJobDefinition());
+ }
+
+ @Test
+ void testMeasTypeSpec() {
+ ConsumerJobInfo.PmFilterData.MeasTypeSpec measTypeSpec = new ConsumerJobInfo.PmFilterData.MeasTypeSpec();
+ measTypeSpec.setMeasuredObjClass("Class1");
+
+ Set<String> measTypes = new HashSet<>();
+ measTypes.add("Type1");
+ measTypes.add("Type2");
+ measTypeSpec.getMeasTypes().addAll(measTypes);
+
+ assertThat(measTypeSpec.getMeasuredObjClass()).isEqualTo("Class1");
+ assertThat(measTypeSpec.getMeasTypes()).containsExactlyInAnyOrder("Type1", "Type2");
+ }
+
+ @Test
+ void testKafkaDeliveryInfo() {
+ ConsumerJobInfo.KafkaDeliveryInfo kafkaDeliveryInfo = ConsumerJobInfo.KafkaDeliveryInfo.builder()
+ .topic("TestTopic")
+ .bootStrapServers("localhost:9092")
+ .build();
+
+ assertThat(kafkaDeliveryInfo.getTopic()).isEqualTo("TestTopic");
+ assertThat(kafkaDeliveryInfo.getBootStrapServers()).isEqualTo("localhost:9092");
+ }
+
+ @Test
+ void testPmFilterData() {
+ ConsumerJobInfo.PmFilterData pmFilterData = new ConsumerJobInfo.PmFilterData();
+
+ // Test sourceNames
+ Set<String> sourceNames = new HashSet<>(Arrays.asList("Source1", "Source2"));
+ pmFilterData.sourceNames.addAll(sourceNames);
+ assertThat(pmFilterData.sourceNames).containsExactlyInAnyOrder("Source1", "Source2");
+
+ // Test measObjInstIds
+ Set<String> measObjInstIds = new HashSet<>(Arrays.asList("Id1", "Id2"));
+ pmFilterData.measObjInstIds.addAll(measObjInstIds);
+ assertThat(pmFilterData.measObjInstIds).containsExactlyInAnyOrder("Id1", "Id2");
+
+ // Test measTypeSpecs
+ ConsumerJobInfo.PmFilterData.MeasTypeSpec measTypeSpec1 = new ConsumerJobInfo.PmFilterData.MeasTypeSpec();
+ measTypeSpec1.setMeasuredObjClass("Class1");
+ measTypeSpec1.getMeasTypes().addAll(Arrays.asList("Type1", "Type2"));
+
+ ConsumerJobInfo.PmFilterData.MeasTypeSpec measTypeSpec2 = new ConsumerJobInfo.PmFilterData.MeasTypeSpec();
+ measTypeSpec2.setMeasuredObjClass("Class2");
+ measTypeSpec2.getMeasTypes().addAll(Arrays.asList("Type3", "Type4"));
+
+ pmFilterData.measTypeSpecs.add(measTypeSpec1);
+ pmFilterData.measTypeSpecs.add(measTypeSpec2);
+
+ assertThat(pmFilterData.measTypeSpecs).hasSize(2);
+ assertThat(pmFilterData.measTypeSpecs).extracting("measuredObjClass").containsExactly("Class1", "Class2");
+
+ // Test measuredEntityDns
+ Set<String> measuredEntityDns = new HashSet<>(Arrays.asList("Entity1", "Entity2"));
+ pmFilterData.measuredEntityDns.addAll(measuredEntityDns);
+ assertThat(pmFilterData.measuredEntityDns).containsExactlyInAnyOrder("Entity1", "Entity2");
+ }
+}
+
--- /dev/null
+package org.oran.pmlog.oauth2;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.oran.pmlog.exceptions.ServiceException;
+import org.springframework.test.context.ContextConfiguration;
+
+@ContextConfiguration(classes = {OAuthBearerTokenJwtTest.class})
+@ExtendWith(MockitoExtension.class)
+class OAuthBearerTokenJwtTest {
+
+ private OAuthBearerTokenJwt token;
+
+ @BeforeEach
+ void setUp() throws ServiceException, JsonProcessingException {
+ String validJwt = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c"; // Replace with a valid JWT token for testing
+ token = OAuthBearerTokenJwt.create(validJwt);
+ }
+
+ @Test
+ void testCreateValidToken() {
+ assertNotNull(token);
+ }
+
+ @Test
+ void testCreateInvalidToken() {
+ assertThrows(ServiceException.class, () -> OAuthBearerTokenJwt.create("invalid_token"));
+ }
+
+ @Test
+ void testTokenValue() {
+ assertEquals("eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c", token.value());
+ }
+
+ @Test
+ void testTokenScope() {
+ assertEquals(0, token.scope().size());
+ assertFalse(token.scope().contains(""));
+ }
+
+ @Test
+ void testTokenLifetimeMs() {
+ assertEquals(Long.MAX_VALUE, token.lifetimeMs());
+ }
+
+ @Test
+ void testTokenPrincipalName() {
+ assertEquals("1234567890", token.principalName());
+ }
+
+ @Test
+ void testTokenStartTimeMs() {
+ assertEquals(1516239022L, token.startTimeMs());
+ }
+
+ @Test
+ void testCreateTokenFromInvalidPayload() throws ServiceException {
+ // Create a JWT with an invalid payload (missing fields)
+ String invalidPayload = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9";
+ assertThrows(ServiceException.class, () -> OAuthBearerTokenJwt.create(invalidPayload));
+ }
+
+ @Test
+ void testCreateTokenWithValidPayload() throws ServiceException, JsonProcessingException {
+ // Create a JWT with a valid payload
+ String validPayload = "eyJzdWIiOiAiVGVzdCIsICJleHAiOiAxNjM1MTUwMDAwLCAiaWF0IjogMTYzNTA5NTAwMCwgInNjb3BlIjogInNjb3BlX3Rva2VuIiwgImp0aSI6ICJmb28ifQ==";
+ OAuthBearerTokenJwt jwt = OAuthBearerTokenJwt.create("header." + validPayload + ".signature");
+
+ assertNotNull(jwt);
+ assertEquals("header." + validPayload + ".signature", jwt.value());
+ assertEquals(1, jwt.scope().size());
+ assertEquals("scope_token", jwt.scope().iterator().next());
+ assertEquals("Test", jwt.principalName());
+ assertEquals(1635095000, jwt.startTimeMs());
+ }
+
+ @Test
+ void testCreateThrowsExceptionWithInvalidToken() throws ServiceException {
+ String tokenRaw = "your_mocked_token_here";
+ assertThrows(ServiceException.class, () -> OAuthBearerTokenJwt.create(tokenRaw));
+ }
+}
+
--- /dev/null
+package org.oran.pmlog.oauth2;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+import org.apache.kafka.common.security.auth.SaslExtensionsCallback;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+class OAuthKafkaAuthenticateLoginCallbackHandlerTest {
+
+ private OAuthKafkaAuthenticateLoginCallbackHandler callbackHandler;
+
+ @BeforeEach
+ void setUp() {
+ callbackHandler = new OAuthKafkaAuthenticateLoginCallbackHandler();
+ }
+
+ @Test
+ void testConfigureWithValidSaslMechanismAndConfigEntry() {
+ String saslMechanism = OAuthBearerLoginModule.OAUTHBEARER_MECHANISM;
+ List<AppConfigurationEntry> jaasConfigEntries = Collections.singletonList(Mockito.mock(AppConfigurationEntry.class));
+
+ callbackHandler.configure(new HashMap<>(), saslMechanism, jaasConfigEntries);
+
+ assertTrue(callbackHandler.isConfigured());
+ }
+
+ @SuppressWarnings("java:S5778")
+ @Test
+ void testConfigureWithInvalidSaslMechanism() {
+ String invalidSaslMechanism = "InvalidMechanism";
+ List<AppConfigurationEntry> jaasConfigEntries = Collections.singletonList(Mockito.mock(AppConfigurationEntry.class));
+
+ assertThrows(IllegalArgumentException.class, () -> callbackHandler.configure(new HashMap<>(), invalidSaslMechanism, jaasConfigEntries));
+
+ assertFalse(callbackHandler.isConfigured());
+ }
+
+ @SuppressWarnings("java:S5778")
+ @Test
+ void testConfigureWithEmptyJaasConfigEntries() {
+ String saslMechanism = OAuthBearerLoginModule.OAUTHBEARER_MECHANISM;
+ List<AppConfigurationEntry> emptyJaasConfigEntries = Collections.emptyList();
+
+ assertThrows(IllegalArgumentException.class, () -> callbackHandler.configure(new HashMap<>(), saslMechanism, emptyJaasConfigEntries));
+
+ assertFalse(callbackHandler.isConfigured());
+ }
+
+ @Test
+ void testHandleSaslExtensionsCallback() throws IOException, UnsupportedCallbackException {
+ String saslMechanism = OAuthBearerLoginModule.OAUTHBEARER_MECHANISM;
+ List<AppConfigurationEntry> jaasConfigEntries = Collections.singletonList(Mockito.mock(AppConfigurationEntry.class));
+
+ callbackHandler.configure(new HashMap<>(), saslMechanism, jaasConfigEntries);
+ SaslExtensionsCallback callback = mock(SaslExtensionsCallback.class);
+
+ callbackHandler.handle(new Callback[]{callback});
+ verify(callback).extensions(any());
+ }
+
+ @Test
+ void testHandleUnsupportedCallback() {
+ Callback unsupportedCallback = mock(Callback.class);
+ String saslMechanism = OAuthBearerLoginModule.OAUTHBEARER_MECHANISM;
+ List<AppConfigurationEntry> jaasConfigEntries = Collections.singletonList(Mockito.mock(AppConfigurationEntry.class));
+
+ callbackHandler.configure(new HashMap<>(), saslMechanism, jaasConfigEntries);
+ assertThrows(UnsupportedCallbackException.class, () -> callbackHandler.handle(new Callback[]{unsupportedCallback}));
+ }
+
+ @Test
+ void testHandleOAuthBearerTokenCallback() throws IOException, UnsupportedCallbackException {
+
+ String saslMechanism = OAuthBearerLoginModule.OAUTHBEARER_MECHANISM;
+ List<AppConfigurationEntry> jaasConfigEntries = Collections.singletonList(Mockito.mock(AppConfigurationEntry.class));
+ String validJwt = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c";
+
+ callbackHandler.configure(new HashMap<>(), saslMechanism, jaasConfigEntries);
+
+ OAuthBearerTokenCallback oauthBearerTokenCallback = Mockito.mock(OAuthBearerTokenCallback.class);
+ SecurityContext securityContextMock = Mockito.mock(SecurityContext.class);
+ when(oauthBearerTokenCallback.token()).thenReturn(null); // Ensure the callback has no token initially
+ when(oauthBearerTokenCallback.token()).thenAnswer(invocation -> {
+ return OAuthBearerTokenJwt.create(validJwt);
+ });
+
+ when(securityContextMock.getBearerAuthToken()).thenReturn(validJwt);
+ callbackHandler.handle(new Callback[]{oauthBearerTokenCallback});
+ verify(oauthBearerTokenCallback).token();
+ }
+}
+
--- /dev/null
+package org.oran.pmlog.oauth2;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Path;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.MockitoAnnotations;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+@ExtendWith(MockitoExtension.class)
+class SecurityContextTest {
+
+ @BeforeEach
+ void setUp() {
+ MockitoAnnotations.initMocks(this);
+ }
+
+ @Test
+ void testConstructorWithAuthTokenFilename() {
+ SecurityContext securityContext = new SecurityContext("auth-token-file.txt");
+ assertNotNull(securityContext.getAuthTokenFilePath());
+ assertEquals(Path.of("auth-token-file.txt"), securityContext.getAuthTokenFilePath());
+ }
+
+ @Test
+ void testConstructorWithoutAuthTokenFilename() {
+ SecurityContext securityContext = new SecurityContext("");
+ assertNull(securityContext.getAuthTokenFilePath());
+ }
+
+ @Test
+ void testIsConfigured() {
+ SecurityContext securityContext = new SecurityContext("auth-token-file.txt");
+ assertTrue(securityContext.isConfigured());
+ }
+
+ @Test
+ void testIsNotConfigured() {
+ SecurityContext securityContext = new SecurityContext("");
+ assertFalse(securityContext.isConfigured());
+ }
+
+ @Test
+ void testGetBearerAuthToken() {
+ assertEquals("", SecurityContext.getInstance().getBearerAuthToken());
+ assertEquals("", (new SecurityContext("foo.txt")).getBearerAuthToken());
+ }
+
+ @Test
+ void testGetBearerAuthTokenWhenNotConfigured() {
+ SecurityContext securityContext = new SecurityContext("");
+ assertEquals("", securityContext.getBearerAuthToken());
+ }
+}
+