summary |
shortlog |
log |
commit | commitdiff |
review |
tree
raw |
patch |
inline | side by side (from parent 1:
083393d)
Decreased the sleep time when waiting for dmaap configuration to 10 seconds instead on 60.
Change-Id: I774c62d665efe84d249c486094de41168233c410
Issue-ID: NONRTRIC-217
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
15 files changed:
description: service
required: true
type: string
description: service
required: true
type: string
+ - name: transient
+ in: query
+ description: transient
+ required: false
+ type: boolean
+ default: false
- name: type
in: query
description: type
- name: type
in: query
description: type
@RequestParam(name = "id", required = true) String instanceId, //
@RequestParam(name = "ric", required = true) String ricName, //
@RequestParam(name = "service", required = true) String service, //
@RequestParam(name = "id", required = true) String instanceId, //
@RequestParam(name = "ric", required = true) String ricName, //
@RequestParam(name = "service", required = true) String service, //
+ @RequestParam(name = "transient", required = false, defaultValue = "false") boolean isTransient, //
@RequestBody Object jsonBody) {
String jsonString = gson.toJson(jsonBody);
@RequestBody Object jsonBody) {
String jsonString = gson.toJson(jsonBody);
.ric(ric) //
.ownerServiceName(service) //
.lastModified(getTimeStampUtc()) //
.ric(ric) //
.ownerServiceName(service) //
.lastModified(getTimeStampUtc()) //
+ .isTransient(isTransient) //
.build();
final boolean isCreate = this.policies.get(policy.id()) == null;
.build();
final boolean isCreate = this.policies.get(policy.id()) == null;
import org.oransc.policyagent.clients.AsyncRestClient;
import org.oransc.policyagent.configuration.ApplicationConfig;
import org.oransc.policyagent.exceptions.ServiceException;
import org.oransc.policyagent.clients.AsyncRestClient;
import org.oransc.policyagent.configuration.ApplicationConfig;
import org.oransc.policyagent.exceptions.ServiceException;
-import org.oransc.policyagent.tasks.RefreshConfigTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
- * Starts the consumer. If there is a DMaaP configuration, it will start polling for messages. Otherwise it will
- * check regularly for the configuration.
+ * Starts the consumer. If there is a DMaaP configuration, it will start polling
+ * for messages. Otherwise it will check regularly for the configuration.
*
* @return the running thread, for test purposes.
*/
public Thread start() {
*
* @return the running thread, for test purposes.
*/
public Thread start() {
- Thread thread = new Thread(this::checkConfigLoop);
+ Thread thread = new Thread(this::messageHandlingLoop);
thread.start();
return thread;
}
thread.start();
return thread;
}
- private void checkConfigLoop() {
- while (!isStopped()) {
- if (isDmaapConfigured()) {
- messageHandlingLoop();
- } else {
- sleep(RefreshConfigTask.CONFIG_REFRESH_INTERVAL);
- }
- }
- }
-
private void messageHandlingLoop() {
private void messageHandlingLoop() {
- while (!isStopped() && isDmaapConfigured()) {
- Iterable<String> dmaapMsgs = fetchAllMessages();
- if (dmaapMsgs != null && Iterables.size(dmaapMsgs) > 0) {
- logger.debug("Fetched all the messages from DMAAP and will start to process the messages");
- for (String msg : dmaapMsgs) {
- processMsg(msg);
+ if (isDmaapConfigured()) {
+ Iterable<String> dmaapMsgs = fetchAllMessages();
+ if (dmaapMsgs != null && Iterables.size(dmaapMsgs) > 0) {
+ logger.debug("Fetched all the messages from DMAAP and will start to process the messages");
+ for (String msg : dmaapMsgs) {
+ processMsg(msg);
+ }
+ } else {
+ sleep(TIME_BETWEEN_DMAAP_RETRIES); // wait for configuration
}
} catch (Exception e) {
logger.warn("Cannot fetch because of {}", e.getMessage());
}
} catch (Exception e) {
logger.warn("Cannot fetch because of {}", e.getMessage());
public PolicyType type();
public String lastModified();
public PolicyType type();
public String lastModified();
+
+ public boolean isTransient();
/**
* The time between refreshes of the configuration.
*/
/**
* The time between refreshes of the configuration.
*/
- public static final Duration CONFIG_REFRESH_INTERVAL = Duration.ofMinutes(1);
+ static final Duration CONFIG_REFRESH_INTERVAL = Duration.ofMinutes(1);
final ApplicationConfig appConfig;
@Getter(AccessLevel.PROTECTED)
final ApplicationConfig appConfig;
@Getter(AccessLevel.PROTECTED)
.flatMapMany(notUsed -> Flux.just(policy));
}
.flatMapMany(notUsed -> Flux.just(policy));
}
+ private boolean checkTransient(Policy policy) {
+ if (policy.isTransient()) {
+ this.policies.remove(policy);
+ }
+ return policy.isTransient();
+ }
+
private Flux<Policy> recreateAllPoliciesInRic(Ric ric, A1Client a1Client) {
return Flux.fromIterable(policies.getForRic(ric.name())) //
private Flux<Policy> recreateAllPoliciesInRic(Ric ric, A1Client a1Client) {
return Flux.fromIterable(policies.getForRic(ric.name())) //
+ .filter(policy -> !checkTransient(policy)) //
.flatMap(policy -> putPolicy(policy, ric, a1Client), CONCURRENCY_RIC);
}
.flatMap(policy -> putPolicy(policy, ric, a1Client), CONCURRENCY_RIC);
}
testErrorCode(restClient().get(url), HttpStatus.NOT_FOUND);
}
testErrorCode(restClient().get(url), HttpStatus.NOT_FOUND);
}
- private String putPolicyUrl(String serviceName, String ricName, String policyTypeName, String policyInstanceId) {
+ private String putPolicyUrl(String serviceName, String ricName, String policyTypeName, String policyInstanceId,
+ boolean isTransient) {
+ String url;
if (policyTypeName.isEmpty()) {
if (policyTypeName.isEmpty()) {
- return "/policy?id=" + policyInstanceId + "&ric=" + ricName + "&service=" + serviceName;
+ url = "/policy?id=" + policyInstanceId + "&ric=" + ricName + "&service=" + serviceName;
- return "/policy?id=" + policyInstanceId + "&ric=" + ricName + "&service=" + serviceName + "&type="
+ url = "/policy?id=" + policyInstanceId + "&ric=" + ricName + "&service=" + serviceName + "&type="
+ if (isTransient) {
+ url += "&transient=true";
+ }
+ return url;
+ }
+
+ private String putPolicyUrl(String serviceName, String ricName, String policyTypeName, String policyInstanceId) {
+ return putPolicyUrl(serviceName, ricName, policyTypeName, policyInstanceId, false);
putService(serviceName);
addPolicyType(policyTypeName, ricName);
putService(serviceName);
addPolicyType(policyTypeName, ricName);
- String url = putPolicyUrl(serviceName, ricName, policyTypeName, policyInstanceId);
+ // PUT a transient policy
+ String url = putPolicyUrl(serviceName, ricName, policyTypeName, policyInstanceId, true);
final String policyBody = jsonString();
this.rics.getRic(ricName).setState(Ric.RicState.AVAILABLE);
final String policyBody = jsonString();
this.rics.getRic(ricName).setState(Ric.RicState.AVAILABLE);
assertThat(policy.id()).isEqualTo(policyInstanceId);
assertThat(policy.ownerServiceName()).isEqualTo(serviceName);
assertThat(policy.ric().name()).isEqualTo("ric1");
assertThat(policy.id()).isEqualTo(policyInstanceId);
assertThat(policy.ownerServiceName()).isEqualTo(serviceName);
assertThat(policy.ric().name()).isEqualTo("ric1");
+ assertThat(policy.isTransient()).isEqualTo(true);
+
+ // Put a non transient policy
+ url = putPolicyUrl(serviceName, ricName, policyTypeName, policyInstanceId);
+ restClient().put(url, policyBody).block();
+ policy = policies.getPolicy(policyInstanceId);
+ assertThat(policy.isTransient()).isEqualTo(false);
url = "/policies";
String rsp = restClient().get(url).block();
url = "/policies";
String rsp = restClient().get(url).block();
private Policy addPolicy(String id, String typeName, String service, String ric) throws ServiceException {
addRic(ric);
private Policy addPolicy(String id, String typeName, String service, String ric) throws ServiceException {
addRic(ric);
- Policy p = ImmutablePolicy.builder().id(id) //
+ Policy p = ImmutablePolicy.builder() //
+ .id(id) //
.json(jsonString()) //
.ownerServiceName(service) //
.ric(rics.getRic(ric)) //
.type(addPolicyType(typeName, ric)) //
.json(jsonString()) //
.ownerServiceName(service) //
.ric(rics.getRic(ric)) //
.type(addPolicyType(typeName, ric)) //
- .lastModified("lastModified").build();
+ .lastModified("lastModified") //
+ .isTransient(false) //
+ .build();
policies.put(p);
return p;
}
policies.put(p);
return p;
}
.ric(ric) //
.ownerServiceName("") //
.lastModified("") //
.ric(ric) //
.ownerServiceName("") //
.lastModified("") //
.ric(ric) //
.type(unnamedPolicyType) //
.lastModified("now") //
.ric(ric) //
.type(unnamedPolicyType) //
.lastModified("now") //
.build();
this.policies.put(policy);
}
.build();
this.policies.put(policy);
}
.ric(createRic(nearRtRicUrl)) //
.type(createPolicyType(type)) //
.lastModified("now") //
.ric(createRic(nearRtRicUrl)) //
.type(createPolicyType(type)) //
.lastModified("now") //
import org.onap.dmaap.mr.client.MRConsumer;
import org.onap.dmaap.mr.client.response.MRConsumerResponse;
import org.oransc.policyagent.configuration.ApplicationConfig;
import org.onap.dmaap.mr.client.MRConsumer;
import org.onap.dmaap.mr.client.response.MRConsumerResponse;
import org.oransc.policyagent.configuration.ApplicationConfig;
-import org.oransc.policyagent.tasks.RefreshConfigTask;
import org.oransc.policyagent.utils.LoggingUtils;
import org.springframework.http.HttpStatus;
import org.oransc.policyagent.utils.LoggingUtils;
import org.springframework.http.HttpStatus;
messageConsumerUnderTest.start().join();
InOrder orderVerifier = inOrder(messageConsumerUnderTest);
messageConsumerUnderTest.start().join();
InOrder orderVerifier = inOrder(messageConsumerUnderTest);
- orderVerifier.verify(messageConsumerUnderTest).sleep(RefreshConfigTask.CONFIG_REFRESH_INTERVAL);
+ orderVerifier.verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES);
orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages();
}
orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages();
}
InOrder orderVerifier = inOrder(messageConsumerUnderTest);
orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages();
InOrder orderVerifier = inOrder(messageConsumerUnderTest);
orderVerifier.verify(messageConsumerUnderTest).fetchAllMessages();
- orderVerifier.verify(messageConsumerUnderTest).sleep(RefreshConfigTask.CONFIG_REFRESH_INTERVAL);
+ orderVerifier.verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES);
response.setResponseCode(Integer.toString(HttpStatus.OK.value()));
response.setActualMessages(Collections.emptyList());
response.setResponseCode(Integer.toString(HttpStatus.OK.value()));
response.setActualMessages(Collections.emptyList());
- doReturn(false, false, true).when(messageConsumerUnderTest).isStopped();
+ doReturn(false, true).when(messageConsumerUnderTest).isStopped();
doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
.getMessageRouterConsumer(any(Properties.class));
when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response);
doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
.getMessageRouterConsumer(any(Properties.class));
when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response);
messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class));
messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class));
- doReturn(false, false, true).when(messageConsumerUnderTest).isStopped();
+ doReturn(false, true).when(messageConsumerUnderTest).isStopped();
doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
.getMessageRouterConsumer(any(Properties.class));
doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
.getMessageRouterConsumer(any(Properties.class));
setUpMrConfig();
messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
setUpMrConfig();
messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
- doReturn(false, false, true).when(messageConsumerUnderTest).isStopped();
+ doReturn(false, true).when(messageConsumerUnderTest).isStopped();
doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
.getMessageRouterConsumer(any(Properties.class));
doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest)
.getMessageRouterConsumer(any(Properties.class));
.ric(ric) //
.json("{}") //
.ownerServiceName("ownerServiceName") //
.ric(ric) //
.json("{}") //
.ownerServiceName("ownerServiceName") //
.build();
return policy;
}
.build();
return policy;
}
.ric(RIC_1) //
.type(POLICY_TYPE_1) //
.lastModified("now") //
.ric(RIC_1) //
.type(POLICY_TYPE_1) //
.lastModified("now") //
.build();
private static final Policy POLICY_2 = ImmutablePolicy.builder() //
.build();
private static final Policy POLICY_2 = ImmutablePolicy.builder() //
.ric(RIC_1) //
.type(POLICY_TYPE_1) //
.lastModified("now") //
.ric(RIC_1) //
.type(POLICY_TYPE_1) //
.lastModified("now") //
.controllerName("controllerName") //
.build());
.controllerName("controllerName") //
.build());
- private static final Policy POLICY_1 = ImmutablePolicy.builder() //
- .id("policyId1") //
- .json("") //
- .ownerServiceName("service") //
- .ric(RIC_1) //
- .type(POLICY_TYPE_1) //
- .lastModified("now") //
- .build();
+ private static Policy createPolicy(boolean isTransient) {
+ return ImmutablePolicy.builder() //
+ .id("policyId1") //
+ .json("") //
+ .ownerServiceName("service") //
+ .ric(RIC_1) //
+ .type(POLICY_TYPE_1) //
+ .lastModified("now") //
+ .isTransient(isTransient) //
+ .build();
+ }
+
+ private static final Policy POLICY_1 = createPolicy(false);
private static final String SERVICE_1_NAME = "service1";
private static final String SERVICE_1_CALLBACK_URL = "callbackUrl";
private static final String SERVICE_1_NAME = "service1";
private static final String SERVICE_1_CALLBACK_URL = "callbackUrl";
public void ricIdleAndHavePolicies_thenSynchronizationWithRecreationOfPolicies() {
RIC_1.setState(RicState.AVAILABLE);
public void ricIdleAndHavePolicies_thenSynchronizationWithRecreationOfPolicies() {
RIC_1.setState(RicState.AVAILABLE);
+ Policy transientPolicy = createPolicy(true);
+
+ policies.put(transientPolicy);
policies.put(POLICY_1);
setUpCreationOfA1Client();
policies.put(POLICY_1);
setUpCreationOfA1Client();
verifyNoMoreInteractions(a1ClientMock);
assertThat(policyTypes.size()).isEqualTo(0);
verifyNoMoreInteractions(a1ClientMock);
assertThat(policyTypes.size()).isEqualTo(0);
- assertThat(policies.size()).isEqualTo(1);
+ assertThat(policies.size()).isEqualTo(1); // The transient policy shall be deleted
assertThat(RIC_1.getState()).isEqualTo(RicState.AVAILABLE);
}
assertThat(RIC_1.getState()).isEqualTo(RicState.AVAILABLE);
}
.ric(ric) //
.type(policyType) //
.lastModified("lastModified") //
.ric(ric) //
.type(policyType) //
.lastModified("lastModified") //