+\r
+class FakeStxObjRepo(StxObjectRepository):\r
+ def __init__(self):\r
+ super().__init__()\r
+ self.oclouds = []\r
+\r
+ def _add(self, ocloud: ocloud.Ocloud):\r
+ self.oclouds.append(ocloud)\r
+\r
+ def _get(self, ocloudid) -> ocloud.Ocloud:\r
+ filtered = [o for o in self.oclouds if o.id == ocloudid]\r
+ return filtered.pop()\r
+\r
+ def _list(self, type: ResourceTypeEnum) -> List[ocloud.Ocloud]:\r
+ return [x for x in self.oclouds]\r
+\r
+ def _update(self, ocloud: ocloud.Ocloud):\r
+ filtered = [o for o in self.oclouds if o.id == ocloud.id]\r
+ assert len(filtered) == 1\r
+ ocloud1 = filtered.pop()\r
+ ocloud1.update_by(ocloud)\r
+\r
+\r
+class FakeUnitOfWork(AbstractUnitOfWork):\r
+ def __init__(self):\r
+ pass\r
+\r
+ def __enter__(self):\r
+ # self.session = self.session_factory() # type: Session\r
+ # self.oclouds = OcloudSqlAlchemyRepository(self.session)\r
+ self.stxobjects = FakeStxObjRepo()\r
+ return super().__enter__()\r
+\r
+ def __exit__(self, *args):\r
+ super().__exit__(*args)\r
+ # self.session.close()\r
+\r
+ def _commit(self):\r
+ pass\r
+ # self.session.commit()\r
+\r
+ def rollback(self):\r
+ pass\r
+ # self.session.rollback()\r
+\r
+ def collect_new_events(self):\r
+ yield\r
+ # return super().collect_new_events()\r
+\r
+\r
+def create_fake_bus(uow):\r
+ def update_ocloud(\r
+ cmd: commands.UpdateOCloud,\r
+ uow: AbstractUnitOfWork,\r
+ publish: Callable):\r
+ return\r
+\r
+ fakeuow = FakeUnitOfWork()\r
+ handlers.EVENT_HANDLERS = {}\r
+ handlers.COMMAND_HANDLERS = {\r
+ commands.UpdateOCloud: update_ocloud,\r
+ }\r
+ bus = bootstrap.bootstrap(False, fakeuow)\r
+ return bus\r
+\r
+\r