X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=o2ims%2Fadapter%2Falarm_repository.py;h=ebfe81661e2a2054f978e1ac940be14ed11faf4f;hb=e00040ca7c6149995c5883bcd9dd81e02b25d0da;hp=ef20e6a51aa6a639cc6212b9f1cadbafa25cf486;hpb=d2f6cc674bf3623caf114a8d7709e70d55ec9340;p=pti%2Fo2.git diff --git a/o2ims/adapter/alarm_repository.py b/o2ims/adapter/alarm_repository.py index ef20e6a..ebfe816 100644 --- a/o2ims/adapter/alarm_repository.py +++ b/o2ims/adapter/alarm_repository.py @@ -12,12 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import List +from typing import List, Tuple from o2ims.domain import alarm_obj from o2ims.domain.alarm_repo import AlarmDefinitionRepository, \ AlarmEventRecordRepository, AlarmSubscriptionRepository, \ - AlarmProbableCauseRepository + AlarmProbableCauseRepository, AlarmDictionaryRepository from o2common.helper import o2logging logger = o2logging.get_logger(__name__) @@ -34,11 +34,20 @@ class AlarmEventRecordSqlAlchemyRepository(AlarmEventRecordRepository): return self.session.query(alarm_obj.AlarmEventRecord).filter_by( alarmEventRecordId=alarm_event_record_id).first() - def _list(self) -> List[alarm_obj.AlarmEventRecord]: - return self.session.query(alarm_obj.AlarmEventRecord) + def _list(self, *args, **kwargs) -> Tuple[ + int, List[alarm_obj.AlarmEventRecord]]: + size = kwargs.pop('limit') if 'limit' in kwargs else None + offset = kwargs.pop('start') if 'start' in kwargs else 0 + + result = self.session.query(alarm_obj.AlarmEventRecord).filter( + *args).order_by('alarmEventRecordId') + count = result.count() + if size is not None and size != -1: + return (count, result.limit(size).offset(offset)) + return (count, result) def _update(self, alarm_event_record: alarm_obj.AlarmEventRecord): - self.session.add(alarm_event_record) + self.session.merge(alarm_event_record) def _delete(self, alarm_event_record_id): self.session.query(alarm_obj.AlarmEventRecord).filter_by( @@ -68,6 +77,26 @@ class AlarmDefinitionSqlAlchemyRepository(AlarmDefinitionRepository): alarmDefinitionId=alarm_definition_id).delete() +class AlarmDictionarySqlAlchemyRepository(AlarmDictionaryRepository): + def __init__(self, session): + super().__init__() + self.session = session + + def _add(self, alarm_dict: alarm_obj.AlarmDictionary): + self.session.add(alarm_dict) + + def _get(self, dictionary_id) -> alarm_obj.AlarmDictionary: + return self.session.query(alarm_obj.AlarmDictionary).filter_by( + id=dictionary_id).first() + + def _list(self) -> List[alarm_obj.AlarmDictionary]: + return self.session.query(alarm_obj.AlarmDictionary) + + def _delete(self, dictionary_id): + self.session.query(alarm_obj.AlarmDictionary).filter_by( + id=dictionary_id).delete() + + class AlarmSubscriptionSqlAlchemyRepository(AlarmSubscriptionRepository): def __init__(self, session): super().__init__() @@ -80,8 +109,17 @@ class AlarmSubscriptionSqlAlchemyRepository(AlarmSubscriptionRepository): return self.session.query(alarm_obj.AlarmSubscription).filter_by( alarmSubscriptionId=subscription_id).first() - def _list(self) -> List[alarm_obj.AlarmSubscription]: - return self.session.query(alarm_obj.AlarmSubscription) + def _list(self, *args, **kwargs) -> Tuple[ + int, List[alarm_obj.AlarmSubscription]]: + size = kwargs.pop('limit') if 'limit' in kwargs else None + offset = kwargs.pop('start') if 'start' in kwargs else 0 + + result = self.session.query(alarm_obj.AlarmSubscription).filter( + *args).order_by('alarmSubscriptionId') + count = result.count() + if size is not None and size != -1: + return (count, result.limit(size).offset(offset)) + return (count, result) def _update(self, subscription: alarm_obj.AlarmSubscription): self.session.add(subscription)