The scheduler stopped running due to a deadlock problem #45275
The error indicates a database deadlock that occurs while the scheduler updates the DAG's scheduling information.

Error:

sqlalchemy.exc.PendingRollbackError: This Session's transaction has been rolled back due to a previous exception during flush. (pymysql.err.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')

It happens when Airflow tries to execute this query:

UPDATE dag SET next_dagrun=%(next_dagrun)s, next_dagrun_data_interval_start=%(next_dagrun_data_interval_start)s, next_dagrun_data_interval_end=%(next_dagrun_data_interval_end)s, next_dagrun_create_after=%(next_dagrun_create_after)s WHERE dag.dag_id = %(dag_dag_id)s

The deadlock occurs because concurrent transactions are trying to update the same DAG record. The very short schedule interval (30 seconds) may contribute to this by creating many concurrent operations. You could try changing the schedule interval to one minute and also setting the additional DAG arguments shown below. Your DAG would then look like this:

    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.decorators import task
    from airflow.hooks.subprocess import SubprocessHook

    # task_name, dag_config and XXXXX_PATH come from your existing DAG-generation code.
    with DAG(
        dag_id=task_name,
        schedule=timedelta(seconds=60),
        start_date=datetime(2024, 1, 1),
        dagrun_timeout=timedelta(seconds=300),
        catchup=False,
        max_active_runs=1,
    ):
        sub_hook = SubprocessHook()

        @task
        def run_command(execute, arguments, capture, work_dir, env):
            command = [execute] + arguments.split()
            result = sub_hook.run_command(command, env=env, cwd=work_dir)
            # Compare by value; "is not 0" tests object identity, not equality.
            if result.exit_code != 0:
                raise ValueError(result.output)
            return result.output

        run_command(dag_config['exec'], dag_config['arg'], dag_config.get("capture", False),
                    dag_config.get('work_dir', XXXXX_PATH / "config"), dag_config.get('env', None))
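
MySQL's message says to restart the transaction, so one general way to tolerate an occasional deadlock in your own database code is to roll back and retry. The following is only a minimal sketch of that pattern under stated assumptions; it is not how the Airflow scheduler handles the error internally. `FakeDeadlock`, `is_deadlock`, `run_with_deadlock_retry` and its parameters are hypothetical names used for illustration.

```python
import time


class FakeDeadlock(Exception):
    """Hypothetical stand-in for a driver error such as pymysql.err.OperationalError."""


def is_deadlock(exc):
    # SQLAlchemy wraps the driver error and exposes it as `.orig`;
    # fall back to the exception itself when there is no wrapper.
    orig = getattr(exc, "orig", exc)
    # MySQL reports a deadlock with error code 1213 as the first argument.
    return bool(orig.args) and orig.args[0] == 1213


def run_with_deadlock_retry(work, rollback, max_attempts=3, base_delay=0.1):
    """Call work(); on a deadlock, roll back, wait briefly, and try again."""
    for attempt in range(1, max_attempts + 1):
        try:
            return work()
        except Exception as exc:
            # Re-raise anything that is not a deadlock, or the final failed attempt.
            if not is_deadlock(exc) or attempt == max_attempts:
                raise
            rollback()  # discard the failed transaction before retrying
            time.sleep(base_delay * attempt)  # simple linear backoff
```

With a SQLAlchemy session, `work` could be a function that performs the update and commits, and `rollback` could be `session.rollback`. This only helps code that you control; it does not change the scheduler's own behaviour.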
Adding the related error output. It shows that the error was raised while the DagRun was being created:
[2025-01-03T14:51:00.411+0800] {scheduler_job_runner.py:1526} INFO - DAG OpsRt is at (or above) max_active_runs (1 of 1), not creating any more runs
[2025-01-03T14:51:00.424+0800] {scheduler_job_runner.py:1526} INFO - DAG EVCal is at (or above) max_active_runs (1 of 1), not creating any more runs
**[2025-01-03T14:51:00.704+0800] {scheduler_job_runner.py:1387} ERROR - Failed creating DagRun for OpsRo_data
Traceback (most recent call last):
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1910, in _execute_context
self.dialect.do_execute(
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
cursor.execute(statement, parameters)
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/pymysql/cursors.py", line 153, in execute
result = self._query(query)
^^^^^^^^^^^^^^^^^^
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/pymysql/cursors.py", line 322, in _query
conn.query(q)
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/pymysql/connections.py", line 563, in query
self._affected_rows = self._read_query_result(unbuffered=unbuffered)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/pymysql/connections.py", line 825, in _read_query_result
result.read()
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/pymysql/connections.py", line 1199, in read
first_packet = self.connection._read_packet()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/pymysql/connections.py", line 775, in _read_packet
packet.raise_for_error()
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/pymysql/protocol.py", line 219, in raise_for_error
err.raise_mysql_exception(self._data)
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/pymysql/err.py", line 150, in raise_mysql_exception
raise errorclass(errno, errval)**
**pymysql.err.OperationalError: (1213, 'Deadlock found when trying to get lock; try restarting transaction')**
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 1371, in _create_dag_runs
dag.create_dagrun(
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/airflow/utils/session.py", line 94, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/airflow/models/dag.py", line 3188, in create_dagrun
run = _create_orm_dagrun(
^^^^^^^^^^^^^^^^^^^
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/airflow/utils/session.py", line 94, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/airflow/models/dag.py", line 348, in _create_orm_dagrun
session.flush()
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 3449, in flush
self._flush(objects)
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 3588, in _flush
with util.safe_reraise():
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
compat.raise_(
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
raise exception
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush
flush_context.execute()
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
rec.execute(self)
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
util.preloaded.orm_persistence.save_obj(
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/sqlalchemy/orm/persistence.py", line 237, in save_obj
_emit_update_statements(
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/sqlalchemy/orm/persistence.py", line 1001, in _emit_update_statements
c = connection._execute_20(
^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20
return meth(self, args_10style, kwargs_10style, execution_options)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
return connection._execute_clauseelement(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
ret = self._execute_context(
^^^^^^^^^^^^^^^^^^^^^^
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1953, in _execute_context
self._handle_dbapi_exception(
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception
util.raise_(
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
raise exception
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/sqlalchemy/engine/base.py", line 1910, in _execute_context
self.dialect.do_execute(
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
cursor.execute(statement, parameters)
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/pymysql/cursors.py", line 153, in execute
result = self._query(query)
^^^^^^^^^^^^^^^^^^
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/pymysql/cursors.py", line 322, in _query
conn.query(q)
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/pymysql/connections.py", line 563, in query
self._affected_rows = self._read_query_result(unbuffered=unbuffered)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/pymysql/connections.py", line 825, in _read_query_result
result.read()
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/pymysql/connections.py", line 1199, in read
first_packet = self.connection._read_packet()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/pymysql/connections.py", line 775, in _read_packet
packet.raise_for_error()
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/pymysql/protocol.py", line 219, in raise_for_error
err.raise_mysql_exception(self._data)
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/pymysql/err.py", line 150, in raise_mysql_exception
raise errorclass(errno, errval)
sqlalchemy.exc.OperationalError: (pymysql.err.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
[SQL: UPDATE dag SET next_dagrun_create_after=%(next_dagrun_create_after)s WHERE dag.dag_id = %(dag_dag_id)s]
[parameters: {'next_dagrun_create_after': None, 'dag_dag_id': 'EVCal'}]
(Background on this error at: https://sqlalche.me/e/14/e3q8)
[2025-01-03T14:51:01.556+0800] {scheduler_job_runner.py:1016} ERROR - Exception when executing SchedulerJob._run_scheduler_loop
Traceback (most recent call last):
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 999, in _execute
self._run_scheduler_loop()
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 1138, in _run_scheduler_loop
num_queued_tis = self._do_scheduling(session)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 1244, in _do_scheduling
self._create_dagruns_for_dags(guard, session)
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/airflow/utils/retries.py", line 93, in wrapped_function
for attempt in run_with_db_retries(max_retries=retries, logger=logger, **retry_kwargs):
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/tenacity/__init__.py", line 443, in __iter__
do = self.iter(retry_state=retry_state)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/tenacity/__init__.py", line 376, in iter
result = action(retry_state)
^^^^^^^^^^^^^^^^^^^
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/tenacity/__init__.py", line 398, in <lambda>
self._add_action_func(lambda rs: rs.outcome.result())
^^^^^^^^^^^^^^^^^^^
File "/opt/tsysmart/local/python/lib/python3.11/concurrent/futures/_base.py", line 449, in result
return self.__get_result()
^^^^^^^^^^^^^^^^^^^
File "/opt/tsysmart/local/python/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
raise self._exception
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/airflow/utils/retries.py", line 102, in wrapped_function
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/airflow/jobs/scheduler_job_runner.py", line 1325, in _create_dagruns_for_dags
guard.commit()
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/airflow/utils/sqlalchemy.py", line 440, in commit
self.session.commit()
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 1454, in commit
self._transaction.commit(_to_root=self.future)
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 830, in commit
self._assert_active(prepared_ok=True)
File "/home/ttrs/tsysmart/platform/python/airflow/venv/lib/python3.11/site-packages/sqlalchemy/orm/session.py", line 604, in _assert_active
raise sa_exc.PendingRollbackError(
sqlalchemy.exc.PendingRollbackError: This Session's transaction has been rolled back due to a previous exception during flush. To begin a new transaction with this Session, first issue Session.rollback(). Original exception was: (pymysql.err.OperationalError) (1213, 'Deadlock found when trying to get lock; try restarting transaction')
[SQL: UPDATE dag SET next_dagrun_create_after=%(next_dagrun_create_after)s WHERE dag.dag_id = %(dag_dag_id)s]
[parameters: {'next_dagrun_create_after': None, 'dag_dag_id': 'EVCal'}]
(Background on this error at: https://sqlalche.me/e/14/e3q8) (Background on this error at: https://sqlalche.me/e/14/7s2a)
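
With 30+ DAGs it can help to see which `dag_id` values show up in the deadlock reports. Note that in the log above the failure is logged for OpsRo_data while the failing UPDATE carries `'dag_dag_id': 'EVCal'`. The following is a rough sketch for scanning scheduler log lines, assuming they have the same shape as the output above; `deadlocked_dag_ids` is a hypothetical helper, not part of Airflow.

```python
import re
from collections import Counter

# Marker taken from the MySQL error text shown in the log above.
DEADLOCK_MARKER = "(1213,"
# Matches the dag_id inside a "[parameters: {...}]" line.
DAG_ID_PATTERN = re.compile(r"'dag_dag_id': '([^']+)'")


def deadlocked_dag_ids(lines):
    """Count dag_id values in the [parameters: ...] line that follows a deadlock error."""
    counts = Counter()
    saw_deadlock = False
    for line in lines:
        if DEADLOCK_MARKER in line:
            saw_deadlock = True
        elif saw_deadlock and line.startswith("[parameters:"):
            match = DAG_ID_PATTERN.search(line)
            if match:
                counts[match.group(1)] += 1
            # Only the first parameters line after an error belongs to it.
            saw_deadlock = False
    return counts
```

One deadlock can be reported more than once (first as `OperationalError`, then again inside `PendingRollbackError`), so treat the counts as a hint about which DAGs are involved rather than as an exact number of deadlocks.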
I am dynamically generating 30+ DAG tasks in a for loop. Does this have any impact on how the tasks run?
Version: Airflow 2.10.4 on Ubuntu
DAG config: schedule = timedelta(seconds=30)