Skip to content

Commit

Permalink
Use select_for_update when creating Companny objs in Hermes and Org…
Browse files Browse the repository at this point in the history
…s in PD (#294)

Co-authored-by: Sebastian Prentice <[email protected]>
  • Loading branch information
tomhamiltonstubber and PrenSJ2 authored Oct 16, 2024
1 parent a0abb37 commit 139d5d3
Show file tree
Hide file tree
Showing 10 changed files with 109 additions and 77 deletions.
8 changes: 6 additions & 2 deletions app/callbooker/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from fastapi import APIRouter, Header, HTTPException
from starlette.background import BackgroundTasks
from starlette.responses import JSONResponse
from tortoise.exceptions import DoesNotExist

from app.callbooker._availability import get_admin_available_slots
from app.callbooker._process import (
Expand Down Expand Up @@ -81,8 +82,11 @@ async def generate_support_link(tc2_admin_id: int, tc2_cligency_id: int, Authori
if get_bearer(Authorization) != settings.tc2_api_key:
raise HTTPException(status_code=403, detail='Unauthorized key')

admin = await Admin.get(tc2_admin_id=tc2_admin_id)
company = await get_or_create_company(tc2_cligency_id=tc2_cligency_id)
try:
admin = await Admin.get(tc2_admin_id=tc2_admin_id)
company = await get_or_create_company(tc2_cligency_id=tc2_cligency_id)
except DoesNotExist:
return JSONResponse({'status': 'error', 'message': 'Admin or Company not found'}, status_code=404)

expiry = datetime.now() + timedelta(days=settings.support_ttl_days)
kwargs = {'admin_id': admin.id, 'company_id': company.id, 'e': int(expiry.timestamp())}
Expand Down
81 changes: 39 additions & 42 deletions app/main.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
import logging.config
import os
from contextlib import asynccontextmanager

import logfire
import sentry_sdk
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from fastapi_admin.app import app as admin_app
from logfire import PydanticPlugin
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from starlette.middleware.cors import CORSMiddleware
from tortoise.contrib.fastapi import register_tortoise
from tortoise.contrib.fastapi import RegisterTortoise

from app.admin import resources, views # noqa: F401
from app.admin.auth import AuthProvider
Expand All @@ -28,25 +28,8 @@
dsn=_app_settings.sentry_dsn,
)

app = FastAPI()

allowed_origins = ['https://tutorcruncher.com', 'http://localhost:3000']
if _app_settings.dev_mode:
allowed_origins = ['*']
app.add_middleware(CORSMiddleware, allow_origins=allowed_origins, allow_methods=['*'], allow_headers=['*'])
if bool(_app_settings.logfire_token):
logfire.instrument_fastapi(app)
logfire.configure(
send_to_logfire=True,
token=_app_settings.logfire_token,
pydantic_plugin=PydanticPlugin(record='all'),
)

FastAPIInstrumentor.instrument_app(app)

logging.config.dictConfig(config)

TORTOISE_ORM = {
TORTOISE_CONFIG = {
'connections': {'default': str(_app_settings.pg_dsn)},
'apps': {
'models': {
Expand All @@ -56,31 +39,18 @@
},
}

register_tortoise(
app,
modules={'models': ['app.models']},
generate_schemas=True,
add_exception_handlers=True,
config=TORTOISE_ORM,
)
app.include_router(tc2_router, prefix='/tc2')
app.include_router(cb_router, prefix='/callbooker')
app.include_router(pipedrive_router, prefix='/pipedrive')
app.include_router(main_router, prefix='')
# Has to go last otherwise it will override other routes
app.mount('/assets', StaticFiles(directory='app/assets'), name='assets')
app.mount('/', admin_app)

COMMIT = os.getenv('HEROKU_SLUG_COMMIT', '-')[:7]
RELEASE_CREATED_AT = os.getenv('HEROKU_RELEASE_CREATED_AT', '-')
if bool(_app_settings.logfire_token):
logfire.info('starting app {commit=} {release_created_at=}', commit=COMMIT, release_created_at=RELEASE_CREATED_AT)
@asynccontextmanager
async def lifespan(app: FastAPI):
_config = RegisterTortoise(app, config=TORTOISE_CONFIG, modules={'models': ['app.models']}, generate_schemas=True)
async with _config:
await _startup()
yield


@app.on_event('startup')
async def _startup():
from app.models import Admin
from app.utils import get_redis_client
from app.utils import get_config, get_redis_client

await admin_app.configure(
template_folders=[os.path.join(BASE_DIR, 'admin/templates/')],
Expand All @@ -90,7 +60,34 @@ async def _startup():
admin_path='',
favicon_url='/assets/favicon.ico',
)
from app.utils import get_config

await get_config()
await build_custom_field_schema()


app = FastAPI(lifespan=lifespan)

allowed_origins = ['https://tutorcruncher.com', 'http://localhost:3000']
if _app_settings.dev_mode:
allowed_origins = ['*']
app.add_middleware(CORSMiddleware, allow_origins=allowed_origins, allow_methods=['*'], allow_headers=['*'])
if bool(_app_settings.logfire_token):
logfire.instrument_fastapi(app)
logfire.instrument_pydantic()
logfire.configure(send_to_logfire=True, token=_app_settings.logfire_token)

FastAPIInstrumentor.instrument_app(app)

logging.config.dictConfig(config)

app.include_router(tc2_router, prefix='/tc2')
app.include_router(cb_router, prefix='/callbooker')
app.include_router(pipedrive_router, prefix='/pipedrive')
app.include_router(main_router, prefix='')
# Has to go last otherwise it will override other routes
app.mount('/assets', StaticFiles(directory='app/assets'), name='assets')
app.mount('/', admin_app)

COMMIT = os.getenv('HEROKU_SLUG_COMMIT', '-')[:7]
RELEASE_CREATED_AT = os.getenv('HEROKU_RELEASE_CREATED_AT', '-')
if bool(_app_settings.logfire_token):
logfire.info('starting app {commit=} {release_created_at=}', commit=COMMIT, release_created_at=RELEASE_CREATED_AT)
55 changes: 46 additions & 9 deletions app/pipedrive/tasks.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Type

import logfire
from tortoise.transactions import in_transaction

from app.base_schema import get_custom_fieldinfo
from app.models import Company, Contact, CustomField, Deal, Meeting
Expand All @@ -17,16 +18,52 @@
)


async def _transy_get_and_create_or_update_organisation(company: Company) -> Organisation:
"""
Create or update an Org in Pipedrive in a transaction
"""
async with in_transaction():
company = await Company.select_for_update().get(id=company.id)
return await get_and_create_or_update_organisation(company)


async def _transy_get_and_create_or_update_person(contact: Contact) -> Person:
"""
Create or update an Org in Pipedrive in a transaction
"""
async with in_transaction():
contact = await Contact.select_for_update().get(id=contact.id)
return await get_and_create_or_update_person(contact)


async def _transy_get_and_create_or_update_deal(deal: Deal) -> PDDeal:
"""
Create or update an Org in Pipedrive in a transaction
"""
async with in_transaction():
deal = await Deal.select_for_update().get(id=deal.id)
return await get_and_create_or_update_pd_deal(deal)


async def _transy_create_activity(meeting: Meeting, pd_deal: PDDeal = None) -> Activity:
"""
Create or update an Org in Pipedrive in a transaction
"""
async with in_transaction():
meeting = await Meeting.select_for_update().get(id=meeting.id)
return await create_activity(meeting, pd_deal)


async def pd_post_process_sales_call(company: Company, contact: Contact, meeting: Meeting, deal: Deal):
"""
Called after a sales call is booked. Creates/updates the Org & Person in pipedrive then creates the activity.
"""
with logfire.span('pd_post_process_sales_call'):
await get_and_create_or_update_organisation(company)
await get_and_create_or_update_person(contact)
pd_deal = await get_and_create_or_update_pd_deal(deal)
await _transy_get_and_create_or_update_organisation(company)
await _transy_get_and_create_or_update_person(contact)
pd_deal = await _transy_get_and_create_or_update_deal(deal)
await update_or_create_inherited_deal_custom_field_values(company)
await create_activity(meeting, pd_deal)
await _transy_create_activity(meeting, pd_deal)


async def pd_post_process_support_call(contact: Contact, meeting: Meeting):
Expand All @@ -35,20 +72,20 @@ async def pd_post_process_support_call(contact: Contact, meeting: Meeting):
"""
with logfire.span('pd_post_process_support_call'):
if (await contact.company).pd_org_id:
await get_and_create_or_update_person(contact)
await create_activity(meeting)
await _transy_get_and_create_or_update_person(contact)
await _transy_create_activity(meeting)


async def pd_post_process_client_event(company: Company, deal: Deal = None):
"""
Called after a client event from TC2. For example, a client paying an invoice.
"""
with logfire.span('pd_post_process_client_event'):
await get_and_create_or_update_organisation(company)
await _transy_get_and_create_or_update_organisation(company)
for contact in await company.contacts:
await get_and_create_or_update_person(contact)
await _transy_get_and_create_or_update_person(contact)
if deal:
await get_and_create_or_update_pd_deal(deal)
await _transy_get_and_create_or_update_deal(deal)
await update_or_create_inherited_deal_custom_field_values(company)


Expand Down
7 changes: 4 additions & 3 deletions app/tc2/_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,13 @@ async def _get_or_create_deal(company: Company, contact: Contact | None) -> Deal
else:
# these custom fields values are not stored on the model.
if cf.hermes_field_name:
val = getattr(company, cf.hermes_field_name, None)
# get the associated deal custom field
deal_cf = next((dcf for dcf in deal_custom_fields if dcf.machine_name == cf.machine_name), None)
if cf.field_type == CustomField.TYPE_FK_FIELD:
val = getattr(company, f'{cf.hermes_field_name}_id', None)
else:
val = getattr(company, cf.hermes_field_name, None)
if deal_cf and val:
if cf.field_type == CustomField.TYPE_FK_FIELD:
val = val.id
await CustomFieldValue.update_or_create(
**{'custom_field_id': deal_cf.id, 'deal': deal, 'defaults': {'value': val}}
)
Expand Down
2 changes: 1 addition & 1 deletion app/tc2/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ async def callback(
Ignores events that don't have a meta_agency.
"""
expected_sig = hmac.new(settings.tc2_api_key.encode(), (await request.body()), hashlib.sha256).hexdigest()
if not webhook_signature or not compare_digest(webhook_signature, expected_sig):
if not settings.dev_mode and (not webhook_signature or not compare_digest(webhook_signature, expected_sig)):
raise HTTPException(status_code=403, detail='Unauthorized key')
for event in webhook.events:
company, deal = None, None
Expand Down
4 changes: 2 additions & 2 deletions patch.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from datetime import datetime
import click

from app.main import TORTOISE_ORM
from app.main import TORTOISE_CONFIG
from app.tc2.tasks import update_client_from_company
from tortoise.expressions import Q
from tortoise import Tortoise
Expand All @@ -21,7 +21,7 @@

async def init():
# Initialize Tortoise ORM
await Tortoise.init(config=TORTOISE_ORM)
await Tortoise.init(config=TORTOISE_CONFIG)
await build_custom_field_schema()
await Tortoise.generate_schemas()

Expand Down
7 changes: 2 additions & 5 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@
[tool.pytest.ini_options]
addopts = '--tb=native'
filterwarnings = [
'ignore::DeprecationWarning:fastapi_admin.resources*', # Remove when fastapi-admin is updated
# 'ignore::DeprecationWarning:tortoise.contrib.fastapi.*', # Remove when tortoise is updated
'ignore::DeprecationWarning:fastapi.applications.*', # Remove when tortoise is updated
'ignore::DeprecationWarning:tortoise.contrib.test*',
'ignore::DeprecationWarning:fastapi_admin.resources*', # Remove when fastapi-admin is updated to us Pydantic2
]

[tool.coverage.run]
Expand All @@ -38,6 +35,6 @@
]

[tool.aerich]
tortoise_orm = 'app.main.TORTOISE_ORM'
tortoise_orm = 'app.main.TORTOISE_CONFIG'
location = './migrations'
src_folder = './.'
4 changes: 2 additions & 2 deletions tests/_common.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from httpx import AsyncClient
from httpx import ASGITransport, AsyncClient
from tortoise.contrib.test import TestCase

from app.main import app
Expand All @@ -10,7 +10,7 @@ class HermesTestCase(TestCase):
async def asyncSetUp(self) -> None:
await super().asyncSetUp()
settings.testing = True
self.client = AsyncClient(app=app, base_url='http://test')
self.client = AsyncClient(transport=ASGITransport(app=app), base_url='http://test')
self.stage = await Stage.create(name='New', pd_stage_id=1)
self.pipeline = await Pipeline.create(name='payg', pd_pipeline_id=1, dft_entry_stage=self.stage)
self.config = await get_config()
Expand Down
6 changes: 1 addition & 5 deletions tests/test_callbooker.py
Original file line number Diff line number Diff line change
Expand Up @@ -956,11 +956,7 @@ async def test_generate_support_link_admin_doesnt_exist(self):
name='Junes Ltd', website='https://junes.com', country='GB', tc2_cligency_id=10, sales_person=admin
)
headers = {'Authorization': f'token {settings.tc2_api_key}'}
r = await self.client.get(
self.gen_url,
params={'tc2_admin_id': 1, 'tc2_cligency_id': 10},
headers=headers,
)
r = await self.client.get(self.gen_url, params={'tc2_admin_id': 1, 'tc2_cligency_id': 10}, headers=headers)
assert r.status_code == 404

async def test_validate_support_link(self):
Expand Down
12 changes: 6 additions & 6 deletions tests/test_pipedrive.py
Original file line number Diff line number Diff line change
Expand Up @@ -895,7 +895,7 @@ async def test_tc2_client_event(self, mock_request):
1: {
'title': 'A deal with Julies Ltd',
'org_id': 1,
'person_id': None,
'person_id': (await Contact.get()).pd_person_id,
'user_id': 99,
'pipeline_id': 1,
'stage_id': 1,
Expand Down Expand Up @@ -977,7 +977,7 @@ async def test_tc2_client_event_company_cf_on_deal(self, mock_request):
1: {
'title': 'A deal with Julies Ltd',
'org_id': 1,
'person_id': None,
'person_id': (await Contact.get()).pd_person_id,
'user_id': 99,
'pipeline_id': 1,
'stage_id': 1,
Expand Down Expand Up @@ -1271,7 +1271,7 @@ async def test_tc2_client_event_org_exists_linked_by_company_id(self, mock_reque
1: {
'title': 'A deal with Julies Ltd',
'org_id': 1,
'person_id': None,
'person_id': (await Contact.get()).pd_person_id,
'user_id': 99,
'pipeline_id': 1,
'stage_id': 1,
Expand Down Expand Up @@ -1361,7 +1361,7 @@ async def test_tc2_client_event_org_exists_linked_by_contacts_emails(self, mock_
1: {
'title': 'A deal with Julies Ltd',
'org_id': 1,
'person_id': None,
'person_id': (await Contact.get()).pd_person_id,
'user_id': 99,
'pipeline_id': 1,
'stage_id': 1,
Expand Down Expand Up @@ -1451,7 +1451,7 @@ async def test_tc2_client_event_org_exists_linked_by_contacts_phones(self, mock_
1: {
'title': 'A deal with Julies Ltd',
'org_id': 1,
'person_id': None,
'person_id': (await Contact.get()).pd_person_id,
'user_id': 99,
'pipeline_id': 1,
'stage_id': 1,
Expand Down Expand Up @@ -1532,7 +1532,7 @@ async def test_tc2_client_event_org_exists_contact_exists_no_org(self, mock_requ
1: {
'title': 'A deal with Julies Ltd',
'org_id': 1,
'person_id': None,
'person_id': (await Contact.get()).pd_person_id,
'user_id': 99,
'pipeline_id': 1,
'stage_id': 1,
Expand Down

0 comments on commit 139d5d3

Please sign in to comment.