Skip to content

Commit

Permalink
Adding select_for_update lock on summary before reset
Browse files — browse the repository at this point in the history
  • Loading branch information
mcanu committed Feb 21, 2025
1 parent 7d086c2 commit 035f205
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 91 deletions.
111 changes: 59 additions & 52 deletions label_studio/data_import/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from core.utils.common import load_func
from django.conf import settings
from django.db import transaction
from projects.models import ProjectImport, ProjectReimport
from projects.models import ProjectImport, ProjectReimport, ProjectSummary
from users.models import User
from webhooks.models import WebhookAction
from webhooks.utils import emit_webhooks_for_instance
Expand Down Expand Up @@ -47,35 +47,39 @@ def async_import_background(
tasks = reformat_predictions(tasks, project_import.preannotated_from_fields)

if project_import.commit_to_project:
# Immediately create project tasks and update project states and counters
serializer = ImportApiSerializer(data=tasks, many=True, context={'project': project})
serializer.is_valid(raise_exception=True)
tasks = serializer.save(project_id=project.id)
emit_webhooks_for_instance(user.active_organization, project, WebhookAction.TASKS_CREATED, tasks)

task_count = len(tasks)
annotation_count = len(serializer.db_annotations)
prediction_count = len(serializer.db_predictions)
# Update counters (like total_annotations) for new tasks and after bulk update tasks stats. It should be a
# single operation as counters affect bulk is_labeled update

recalculate_stats_counts = {
'task_count': task_count,
'annotation_count': annotation_count,
'prediction_count': prediction_count,
}

project.update_tasks_counters_and_task_states(
tasks_queryset=tasks,
maximum_annotations_changed=False,
overlap_cohort_percentage_changed=False,
tasks_number_changed=True,
recalculate_stats_counts=recalculate_stats_counts,
)
logger.info('Tasks bulk_update finished (async import)')

project.summary.update_data_columns(tasks)
# TODO: project.summary.update_created_annotations_and_labels
with transaction.atomic():
# Lock summary for update to avoid race conditions
summary = ProjectSummary.objects.select_for_update().get(id=project.summary_id)

# Immediately create project tasks and update project states and counters
serializer = ImportApiSerializer(data=tasks, many=True, context={'project': project})
serializer.is_valid(raise_exception=True)
tasks = serializer.save(project_id=project.id)
emit_webhooks_for_instance(user.active_organization, project, WebhookAction.TASKS_CREATED, tasks)

task_count = len(tasks)
annotation_count = len(serializer.db_annotations)
prediction_count = len(serializer.db_predictions)
# Update counters (like total_annotations) for new tasks and after bulk update tasks stats. It should be a
# single operation as counters affect bulk is_labeled update

recalculate_stats_counts = {
'task_count': task_count,
'annotation_count': annotation_count,
'prediction_count': prediction_count,
}

project.update_tasks_counters_and_task_states(
tasks_queryset=tasks,
maximum_annotations_changed=False,
overlap_cohort_percentage_changed=False,
tasks_number_changed=True,
recalculate_stats_counts=recalculate_stats_counts,
)
logger.info('Tasks bulk_update finished (async import)')

summary.update_data_columns(tasks)
# TODO: summary.update_created_annotations_and_labels
else:
# Do nothing - just output file upload ids for further use
task_count = len(tasks)
Expand Down Expand Up @@ -148,35 +152,38 @@ def async_reimport_background(reimport_id, organization_id, user, **kwargs):
)

with transaction.atomic():
# Lock summary for update to avoid race conditions
summary = ProjectSummary.objects.select_for_update().get(id=project.summary_id)

project.remove_tasks_by_file_uploads(reimport.file_upload_ids)
serializer = ImportApiSerializer(data=tasks, many=True, context={'project': project, 'user': user})
serializer.is_valid(raise_exception=True)
tasks = serializer.save(project_id=project.id)
emit_webhooks_for_instance(organization_id, project, WebhookAction.TASKS_CREATED, tasks)

task_count = len(tasks)
annotation_count = len(serializer.db_annotations)
prediction_count = len(serializer.db_predictions)

recalculate_stats_counts = {
'task_count': task_count,
'annotation_count': annotation_count,
'prediction_count': prediction_count,
}

# Update counters (like total_annotations) for new tasks and after bulk update tasks stats. It should be a
# single operation as counters affect bulk is_labeled update
project.update_tasks_counters_and_task_states(
tasks_queryset=tasks,
maximum_annotations_changed=False,
overlap_cohort_percentage_changed=False,
tasks_number_changed=True,
recalculate_stats_counts=recalculate_stats_counts,
)
logger.info('Tasks bulk_update finished (async reimport)')
task_count = len(tasks)
annotation_count = len(serializer.db_annotations)
prediction_count = len(serializer.db_predictions)

recalculate_stats_counts = {
'task_count': task_count,
'annotation_count': annotation_count,
'prediction_count': prediction_count,
}

# Update counters (like total_annotations) for new tasks and after bulk update tasks stats. It should be a
# single operation as counters affect bulk is_labeled update
project.update_tasks_counters_and_task_states(
tasks_queryset=tasks,
maximum_annotations_changed=False,
overlap_cohort_percentage_changed=False,
tasks_number_changed=True,
recalculate_stats_counts=recalculate_stats_counts,
)
logger.info('Tasks bulk_update finished (async reimport)')

project.summary.update_data_columns(tasks)
# TODO: project.summary.update_created_annotations_and_labels
summary.update_data_columns(tasks)
# TODO: summary.update_created_annotations_and_labels

reimport.task_count = task_count
reimport.annotation_count = annotation_count
Expand Down
85 changes: 46 additions & 39 deletions label_studio/projects/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -524,38 +524,42 @@ def validate_config(self, config_string, strict=False):
if not hasattr(self, 'summary'):
return

if self.num_tasks == 0:
logger.debug(f'Project {self} has no tasks: nothing to validate here. Ensure project summary is empty')
logger.info(f'calling reset project_id={self.id} validate_config() num_tasks={self.num_tasks}')
self.summary.reset()
return
with transaction.atomic():
# Lock summary for update to avoid race conditions
summary = ProjectSummary.objects.select_for_update().get(id=self.summary_id)

# validate data columns consistency
fields_from_config = get_all_object_tag_names(config_string)
if not fields_from_config:
logger.debug('Data fields not found in labeling config')
return
if self.num_tasks == 0:
logger.debug(f'Project {self} has no tasks: nothing to validate here. Ensure project summary is empty')
logger.info(f'calling reset project_id={self.id} validate_config() num_tasks={self.num_tasks}')
summary.reset()
return

# validate data columns consistency
fields_from_config = get_all_object_tag_names(config_string)
if not fields_from_config:
logger.debug('Data fields not found in labeling config')
return

# TODO: DEV-2939 Add validation for fields addition in label config
"""fields_from_config = {field.split('[')[0] for field in fields_from_config} # Repeater tag support
fields_from_data = set(self.summary.common_data_columns)
fields_from_data.discard(settings.DATA_UNDEFINED_NAME)
if fields_from_data and not fields_from_config.issubset(fields_from_data):
different_fields = list(fields_from_config.difference(fields_from_data))
raise LabelStudioValidationErrorSentryIgnored(
f'These fields are not present in the data: {",".join(different_fields)}'
)"""

# TODO: DEV-2939 Add validation for fields addition in label config
"""fields_from_config = {field.split('[')[0] for field in fields_from_config} # Repeater tag support
fields_from_data = set(self.summary.common_data_columns)
fields_from_data.discard(settings.DATA_UNDEFINED_NAME)
if fields_from_data and not fields_from_config.issubset(fields_from_data):
different_fields = list(fields_from_config.difference(fields_from_data))
raise LabelStudioValidationErrorSentryIgnored(
f'These fields are not present in the data: {",".join(different_fields)}'
)"""

if self.num_annotations == 0 and self.num_drafts == 0:
logger.debug(
f'Project {self} has no annotations and drafts: nothing to validate here. '
f'Ensure annotations-related project summary is empty'
)
logger.info(
f'calling reset project_id={self.id} validate_config() num_annotations={self.num_annotations} num_drafts={self.num_drafts}'
)
self.summary.reset(tasks_data_based=False)
return
if self.num_annotations == 0 and self.num_drafts == 0:
logger.debug(
f'Project {self} has no annotations and drafts: nothing to validate here. '
f'Ensure annotations-related project summary is empty'
)
logger.info(
f'calling reset project_id={self.id} validate_config() num_annotations={self.num_annotations} num_drafts={self.num_drafts}'
)
summary.reset(tasks_data_based=False)
return

# validate annotations consistency
annotations_from_config = set(get_all_control_tag_tuples(config_string))
Expand Down Expand Up @@ -782,15 +786,18 @@ def save(self, *args, update_fields=None, recalc=True, **kwargs):
)

if hasattr(self, 'summary'):
# Ensure project.summary is consistent with current tasks / annotations
if self.num_tasks == 0:
logger.info(f'calling reset project_id={self.id} Project.save() num_tasks={self.num_tasks}')
self.summary.reset()
elif self.num_annotations == 0 and self.num_drafts == 0:
logger.info(
f'calling reset project_id={self.id} Project.save() num_annotations={self.num_annotations} num_drafts={self.num_drafts}'
)
self.summary.reset(tasks_data_based=False)
with transaction.atomic():
# Lock summary for update to avoid race conditions
summary = ProjectSummary.objects.select_for_update().get(id=self.summary_id)
# Ensure project.summary is consistent with current tasks / annotations
if self.num_tasks == 0:
logger.info(f'calling reset project_id={self.id} Project.save() num_tasks={self.num_tasks}')
summary.reset()
elif self.num_annotations == 0 and self.num_drafts == 0:
logger.info(
f'calling reset project_id={self.id} Project.save() num_annotations={self.num_annotations} num_drafts={self.num_drafts}'
)
summary.reset(tasks_data_based=False)

def get_member_ids(self):
if hasattr(self, 'team_link'):
Expand Down

0 comments on commit 035f205

Please sign in to comment.