Skip to content

Commit

Permalink
update transaction middleware
Browse files Browse the repository at this point in the history
  • Loading branch information
pgorecki committed Jul 6, 2023
1 parent 5bade23 commit c71b5af
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 41 deletions.
5 changes: 4 additions & 1 deletion .github/workflows/pytest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@ jobs:
uses: actions/setup-python@v3
with:
python-version: "3.10"
- uses: Gr1N/setup-poetry@v7
- name: Install Poetry
run: |
pip install poetry
poetry self update
- name: Install dependencies
run: poetry install
- name: Run all tests
Expand Down
14 changes: 13 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ bcrypt = "^4.0.1"
poethepoet = "^0.10.0"
pytest-cov = "^2.12.1"
mypy = "^0.910"
vulture = "^2.7"

[build-system]
requires = ["poetry-core>=1.0.0"]
Expand Down
2 changes: 1 addition & 1 deletion src/api/tests/test_bidding.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def setup_app_for_bidding_tests(app, listing_id, seller_id, bidder_id):
)
)
app.execute_command(PublishListingDraftCommand(listing_id=listing_id))
logger.info("setup complete")
logger.info("test setup complete")


@pytest.mark.integration
Expand Down
23 changes: 17 additions & 6 deletions src/config/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,25 +108,36 @@ def on_enter_transaction_context(ctx):
db_session=session, correlation_id=correlation_id, logger=logger
)
ctx.dependency_provider = IocProvider(transaction_container)
logger.info(f"session={id(session)} transaction started")
logger.info(f"{id(session)} transaction started")

@application.on_exit_transaction_context
def on_exit_transaction_context(ctx, exc_type, exc_val, exc_tb):
session = ctx.dependency_provider.get_dependency("db_session")
if exc_type:
session.rollback()
logger.info(f"session={id(session)} rollback due to {exc_type}")
logger.info(f"{id(session)} rollback due to {exc_type}")
else:
session.commit()
logger.info(f"session={id(session)} committed")
logger.info(f"{id(session)} committed")
session.close()
logger.info(f"session={id(session)} transaction ended ")
logger.info(f"{id(session)} transaction ended ")

@application.transaction_middleware
def logging_middleware(ctx, next):
def logging_middleware(ctx, next, command=None, query=None, event=None):
if command:
prefix = "Executing"
task = command
elif query:
prefix = "Querying"
task = query
elif event:
prefix = "Handling"
task = event
task = command or query or event
session = ctx.dependency_provider.get_dependency("db_session")
logger.info(f"{id(session)} {prefix} {task}")
result = next()
logger.info(f"Task {ctx.task} executed successfully")
logger.info(f"{id(session)} {prefix} completed, result: {result}")
return result

return application
Expand Down
68 changes: 38 additions & 30 deletions src/seedwork/application/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,14 @@ def __exit__(self, exc_type, exc_val, exc_tb):
"""Should be used to commit/end a transaction"""
self.app._on_exit_transaction_context(self, exc_type, exc_val, exc_tb)

def _wrap_with_middlewares(
self, handler_func, command=None, query=None, event=None
):
p = handler_func
for middleware in self.app._transaction_middlewares:
p = partial(middleware, self, p, command, query, event)
return p

def execute_query(self, query):
assert (
self.task is None
Expand All @@ -119,13 +127,9 @@ def execute_query(self, query):
handler_kwargs = self.dependency_provider.get_handler_kwargs(
handler_func, **self.overrides
)

# prepare query handler
p = partial(handler_func, query, **handler_kwargs)

for middleware in self.app._transaction_middlewares:
p = partial(middleware, self, p)
result = p()
handler_func = partial(handler_func, query, **handler_kwargs)
wrapped_handler = self._wrap_with_middlewares(handler_func, query=query)
result = wrapped_handler()
return result

def execute_command(self, command):
Expand All @@ -138,39 +142,24 @@ def execute_command(self, command):
handler_kwargs = self.dependency_provider.get_handler_kwargs(
handler_func, **self.overrides
)

# prepare command handler
p = partial(handler_func, command, **handler_kwargs)

# wrap command handler in middlewares
handler_kwargs = self.dependency_provider.get_handler_kwargs(
handler_func, **self.overrides
)
p = partial(handler_func, command, **handler_kwargs)
for middleware in self.app._transaction_middlewares:
p = partial(middleware, self, p)
handler_func = partial(handler_func, command, **handler_kwargs)
wrapped_handler = self._wrap_with_middlewares(handler_func, command=command)

# execute wrapped command handler
command_result = p()
command_result = wrapped_handler()

self.next_commands = []
self.integration_events = []
event_queue = command_result.events.copy()
while len(event_queue) > 0:
event = event_queue.pop(0)
if isinstance(event, IntegrationEvent):
self.integration_events.append(result)
self._process_integration_event(event)

elif isinstance(event, DomainEvent):
for event_handler in self.app.get_event_handlers(event):
handler_kwargs = self.dependency_provider.get_handler_kwargs(
event_handler, **self.overrides
)
logger.info(f"handling event {event} with {event_handler}")
result = event_handler(event, **handler_kwargs)
if isinstance(result, Command):
self.next_commands.append(result)
elif isinstance(result, EventResult):
event_queue.extend(result.events)
new_command, new_events = self._process_domain_event(event)
self.next_commands.extend(new_command)
event_queue.extend(new_events)

return CommandResult.success(payload=command_result.payload)

Expand All @@ -181,6 +170,25 @@ def get_service(self, service_cls):
def current_user(self):
return self.dependency_provider.get_dependency("current_user")

def _process_integration_event(self, event):
self.integration_events.append(event)

def _process_domain_event(self, event):
new_commands = []
new_events = []
for handler_func in self.app.get_event_handlers(event):
handler_kwargs = self.dependency_provider.get_handler_kwargs(
handler_func, **self.overrides
)
event_handler = partial(handler_func, event, **handler_kwargs)
wrapped_handler = self._wrap_with_middlewares(event_handler, event=event)
result = wrapped_handler()
if isinstance(result, Command):
new_commands.append(result)
elif isinstance(result, EventResult):
new_events.extend(result.events)
return new_commands, new_events


class ApplicationModule:
def __init__(self, name, version=1.0):
Expand Down
4 changes: 2 additions & 2 deletions src/seedwork/tests/application/test_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,12 @@ def test_transaction_context_middleware():
app = Application(trace=[])

@app.transaction_middleware
def middleware1(ctx, next):
def middleware1(ctx, next, command=None, query=None, event=None):
ctx.dependency_provider["trace"].append("middleware1")
return next()

@app.transaction_middleware
def middleware1(ctx, next):
def middleware1(ctx, next, command=None, query=None, event=None):
ctx.dependency_provider["trace"].append("middleware2")
return next()

Expand Down

0 comments on commit c71b5af

Please sign in to comment.