diff --git a/queue_job/README.rst b/queue_job/README.rst index f22fd7bc10..bb46ba8374 100644 --- a/queue_job/README.rst +++ b/queue_job/README.rst @@ -1,7 +1,3 @@ -.. image:: https://odoo-community.org/readme-banner-image - :target: https://odoo-community.org/get-involved?utm_source=readme - :alt: Odoo Community Association - ========= Job Queue ========= @@ -17,7 +13,7 @@ Job Queue .. |badge1| image:: https://img.shields.io/badge/maturity-Mature-brightgreen.png :target: https://odoo-community.org/page/development-status :alt: Mature -.. |badge2| image:: https://img.shields.io/badge/license-LGPL--3-blue.png +.. |badge2| image:: https://img.shields.io/badge/licence-LGPL--3-blue.png :target: http://www.gnu.org/licenses/lgpl-3.0-standalone.html :alt: License: LGPL-3 .. |badge3| image:: https://img.shields.io/badge/github-OCA%2Fqueue-lightgray.png?logo=github @@ -697,10 +693,13 @@ promote its widespread use. .. |maintainer-guewen| image:: https://github.com/guewen.png?size=40px :target: https://github.com/guewen :alt: guewen +.. |maintainer-sbidoul| image:: https://github.com/sbidoul.png?size=40px + :target: https://github.com/sbidoul + :alt: sbidoul -Current `maintainer `__: +Current `maintainers `__: -|maintainer-guewen| +|maintainer-guewen| |maintainer-sbidoul| This module is part of the `OCA/queue `_ project on GitHub. diff --git a/queue_job/__manifest__.py b/queue_job/__manifest__.py index f32b20e2e2..5a88b8e7fa 100644 --- a/queue_job/__manifest__.py +++ b/queue_job/__manifest__.py @@ -29,7 +29,7 @@ }, "installable": True, "development_status": "Mature", - "maintainers": ["guewen"], + "maintainers": ["guewen", "sbidoul"], "post_init_hook": "post_init_hook", "post_load": "post_load", } diff --git a/queue_job/controllers/main.py b/queue_job/controllers/main.py index 4addf1be23..adc450d52d 100644 --- a/queue_job/controllers/main.py +++ b/queue_job/controllers/main.py @@ -26,15 +26,48 @@ class RunJobController(http.Controller): - def _try_perform_job(self, env, job): - """Try to perform the job.""" + @classmethod + def _acquire_job(cls, env: api.Environment, job_uuid: str) -> Job | None: + """Acquire a job for execution. + + - make sure it is in ENQUEUED state + - mark it as STARTED and commit the state change + - acquire the job lock + + If successful, return the Job instance, otherwise return None. This + function may fail to acquire the job is not in the expected state or is + already locked by another worker. + """ + env.cr.execute( + "SELECT uuid FROM queue_job WHERE uuid=%s AND state=%s " + "FOR NO KEY UPDATE SKIP LOCKED", + (job_uuid, ENQUEUED), + ) + if not env.cr.fetchone(): + _logger.warning( + "was requested to run job %s, but it does not exist, " + "or is not in state %s, or is being handled by another worker", + job_uuid, + ENQUEUED, + ) + return None + job = Job.load(env, job_uuid) + assert job and job.state == ENQUEUED job.set_started() job.store() env.cr.commit() - job.lock() + if not job.lock(): + _logger.warning( + "was requested to run job %s, but it could not be locked", + job_uuid, + ) + return None + return job + @classmethod + def _try_perform_job(cls, env, job): + """Try to perform the job, mark it done and commit if successful.""" _logger.debug("%s started", job) - job.perform() # Triggers any stored computed fields before calling 'set_done' # so that will be part of the 'exec_time' @@ -45,18 +78,20 @@ def _try_perform_job(self, env, job): env.cr.commit() _logger.debug("%s done", job) - def _enqueue_dependent_jobs(self, env, job): + @classmethod + def _enqueue_dependent_jobs(cls, env, job): tries = 0 while True: try: - job.enqueue_waiting() + with job.env.cr.savepoint(): + job.enqueue_waiting() except OperationalError as err: # Automatically retry the typical transaction serialization # errors if err.pgcode not in PG_CONCURRENCY_ERRORS_TO_RETRY: raise if tries >= DEPENDS_MAX_TRIES_ON_CONCURRENCY_FAILURE: - _logger.info( + _logger.error( "%s, maximum number of tries reached to update dependencies", errorcodes.lookup(err.pgcode), ) @@ -74,17 +109,8 @@ def _enqueue_dependent_jobs(self, env, job): else: break - @http.route( - "/queue_job/runjob", - type="http", - auth="none", - save_session=False, - readonly=False, - ) - def runjob(self, db, job_uuid, **kw): - http.request.session.db = db - env = http.request.env(user=SUPERUSER_ID) - + @classmethod + def _runjob(cls, env: api.Environment, job: Job) -> None: def retry_postpone(job, message, seconds=None): job.env.clear() with registry(job.env.cr.dbname).cursor() as new_cr: @@ -93,26 +119,9 @@ def retry_postpone(job, message, seconds=None): job.set_pending(reset_retry=False) job.store() - # ensure the job to run is in the correct state and lock the record - env.cr.execute( - "SELECT state FROM queue_job WHERE uuid=%s AND state=%s FOR UPDATE", - (job_uuid, ENQUEUED), - ) - if not env.cr.fetchone(): - _logger.warning( - "was requested to run job %s, but it does not exist, " - "or is not in state %s", - job_uuid, - ENQUEUED, - ) - return "" - - job = Job.load(env, job_uuid) - assert job and job.state == ENQUEUED - try: try: - self._try_perform_job(env, job) + cls._try_perform_job(env, job) except OperationalError as err: # Automatically retry the typical transaction serialization # errors @@ -141,7 +150,6 @@ def retry_postpone(job, message, seconds=None): # traceback in the logs we should have the traceback when all # retries are exhausted env.cr.rollback() - return "" except (FailedJobError, Exception) as orig_exception: buff = StringIO() @@ -151,19 +159,18 @@ def retry_postpone(job, message, seconds=None): job.env.clear() with registry(job.env.cr.dbname).cursor() as new_cr: job.env = job.env(cr=new_cr) - vals = self._get_failure_values(job, traceback_txt, orig_exception) + vals = cls._get_failure_values(job, traceback_txt, orig_exception) job.set_failed(**vals) job.store() buff.close() raise _logger.debug("%s enqueue depends started", job) - self._enqueue_dependent_jobs(env, job) + cls._enqueue_dependent_jobs(env, job) _logger.debug("%s enqueue depends done", job) - return "" - - def _get_failure_values(self, job, traceback_txt, orig_exception): + @classmethod + def _get_failure_values(cls, job, traceback_txt, orig_exception): """Collect relevant data from exception.""" exception_name = orig_exception.__class__.__name__ if hasattr(orig_exception, "__module__"): @@ -177,6 +184,22 @@ def _get_failure_values(self, job, traceback_txt, orig_exception): "exc_message": exc_message, } + @http.route( + "/queue_job/runjob", + type="http", + auth="none", + save_session=False, + readonly=False, + ) + def runjob(self, db, job_uuid, **kw): + http.request.session.db = db + env = http.request.env(user=SUPERUSER_ID) + job = self._acquire_job(env, job_uuid) + if not job: + return "" + self._runjob(env, job) + return "" + # flake8: noqa: C901 @http.route("/queue_job/create_test_job", type="http", auth="user") def create_test_job( @@ -187,6 +210,7 @@ def create_test_job( description="Test job", size=1, failure_rate=0, + job_duration=0, ): """Create test jobs @@ -207,6 +231,12 @@ def create_test_job( except (ValueError, TypeError): failure_rate = 0 + if job_duration is not None: + try: + job_duration = float(job_duration) + except (ValueError, TypeError): + job_duration = 0 + if not (0 <= failure_rate <= 1): raise BadRequest("failure_rate must be between 0 and 1") @@ -235,6 +265,7 @@ def create_test_job( channel=channel, description=description, failure_rate=failure_rate, + job_duration=job_duration, ) if size > 1: @@ -245,6 +276,7 @@ def create_test_job( channel=channel, description=description, failure_rate=failure_rate, + job_duration=job_duration, ) return "" @@ -256,6 +288,7 @@ def _create_single_test_job( description="Test job", size=1, failure_rate=0, + job_duration=0, ): delayed = ( http.request.env["queue.job"] @@ -265,7 +298,7 @@ def _create_single_test_job( channel=channel, description=description, ) - ._test_job(failure_rate=failure_rate) + ._test_job(failure_rate=failure_rate, job_duration=job_duration) ) return "job uuid: %s" % (delayed.db_record().uuid,) @@ -279,6 +312,7 @@ def _create_graph_test_jobs( channel=None, description="Test job", failure_rate=0, + job_duration=0, ): model = http.request.env["queue.job"] current_count = 0 @@ -301,7 +335,7 @@ def _create_graph_test_jobs( max_retries=max_retries, channel=channel, description="%s #%d" % (description, current_count), - )._test_job(failure_rate=failure_rate) + )._test_job(failure_rate=failure_rate, job_duration=job_duration) ) grouping = random.choice(possible_grouping_methods) diff --git a/queue_job/job.py b/queue_job/job.py index 790e07d90e..86407be3bb 100644 --- a/queue_job/job.py +++ b/queue_job/job.py @@ -236,7 +236,7 @@ def load_many(cls, env, job_uuids): recordset = cls.db_records_from_uuids(env, job_uuids) return {cls._load_from_db_record(record) for record in recordset} - def add_lock_record(self): + def add_lock_record(self) -> None: """ Create row in db to be locked while the job is being performed. """ @@ -256,13 +256,11 @@ def add_lock_record(self): [self.uuid], ) - def lock(self): - """ - Lock row of job that is being performed + def lock(self) -> bool: + """Lock row of job that is being performed. - If a job cannot be locked, - it means that the job wasn't started, - a RetryableJobError is thrown. + Return False if a job cannot be locked: it means that the job is not in + STARTED state or is already locked by another worker. """ self.env.cr.execute( """ @@ -278,18 +276,15 @@ def lock(self): queue_job WHERE uuid = %s - AND state='started' + AND state = %s ) - FOR UPDATE; + FOR NO KEY UPDATE SKIP LOCKED; """, - [self.uuid], + [self.uuid, STARTED], ) # 1 job should be locked - if 1 != len(self.env.cr.fetchall()): - raise RetryableJobError( - f"Trying to lock job that wasn't started, uuid: {self.uuid}" - ) + return bool(self.env.cr.fetchall()) @classmethod def _load_from_db_record(cls, job_db_record): diff --git a/queue_job/jobrunner/runner.py b/queue_job/jobrunner/runner.py index 846682a666..586c251128 100644 --- a/queue_job/jobrunner/runner.py +++ b/queue_job/jobrunner/runner.py @@ -361,23 +361,26 @@ def _query_requeue_dead_jobs(self): ELSE exc_info END) WHERE - id in ( - SELECT - queue_job_id - FROM - queue_job_lock - WHERE - queue_job_id in ( - SELECT - id - FROM - queue_job - WHERE - state IN ('enqueued','started') - AND date_enqueued < - (now() AT TIME ZONE 'utc' - INTERVAL '10 sec') - ) - FOR UPDATE SKIP LOCKED + state IN ('enqueued','started') + AND date_enqueued < (now() AT TIME ZONE 'utc' - INTERVAL '10 sec') + AND ( + id in ( + SELECT + queue_job_id + FROM + queue_job_lock + WHERE + queue_job_lock.queue_job_id = queue_job.id + FOR NO KEY UPDATE SKIP LOCKED + ) + OR NOT EXISTS ( + SELECT + 1 + FROM + queue_job_lock + WHERE + queue_job_lock.queue_job_id = queue_job.id + ) ) RETURNING uuid """ @@ -400,6 +403,12 @@ def requeue_dead_jobs(self): However, when the Odoo server crashes or is otherwise force-stopped, running jobs are interrupted while the runner has no chance to know they have been aborted. + + This also handles orphaned jobs (enqueued but never started, no lock). + This edge case occurs when the runner marks a job as 'enqueued' + but the HTTP request to start the job never reaches the Odoo server + (e.g., due to server shutdown/crash between setting enqueued and + the controller receiving the request). """ with closing(self.conn.cursor()) as cr: diff --git a/queue_job/models/queue_job.py b/queue_job/models/queue_job.py index df33e2c7c5..d538a2a75c 100644 --- a/queue_job/models/queue_job.py +++ b/queue_job/models/queue_job.py @@ -3,6 +3,7 @@ import logging import random +import time from datetime import datetime, timedelta from odoo import _, api, exceptions, fields, models @@ -104,6 +105,7 @@ class QueueJob(models.Model): exec_time = fields.Float( string="Execution Time (avg)", group_operator="avg", + readonly=True, help="Time required to execute this job in seconds. Average when grouped.", ) date_cancelled = fields.Datetime(readonly=True) @@ -457,7 +459,9 @@ def related_action_open_record(self): ) return action - def _test_job(self, failure_rate=0): + def _test_job(self, failure_rate=0, job_duration=0): _logger.info("Running test job.") if random.random() <= failure_rate: raise JobError("Job failed") + if job_duration: + time.sleep(job_duration) diff --git a/queue_job/static/description/index.html b/queue_job/static/description/index.html index 82bed11d0f..3dcf44c283 100644 --- a/queue_job/static/description/index.html +++ b/queue_job/static/description/index.html @@ -3,7 +3,7 @@ -README.rst +Job Queue -
+
+

Job Queue

- - -Odoo Community Association - -
-

Job Queue

-

Mature License: LGPL-3 OCA/queue Translate me on Weblate Try me on Runboat

+

Mature License: LGPL-3 OCA/queue Translate me on Weblate Try me on Runboat

This addon adds an integrated Job Queue to Odoo.

It allows to postpone method calls executed asynchronously.

Jobs are executed in the background by a Jobrunner, in their own transaction.

@@ -446,11 +441,11 @@

Job Queue

-

Installation

+

Installation

Be sure to have the requests library.

-

Configuration

+

Configuration

  • Using environment variables and command line:
    • Adjust environment variables (optional):
-

Usage

+

Usage

To use this module, you need to:

  1. Go to Job Queue menu
-

Developers

+

Developers

-

Delaying jobs

+

Delaying jobs

The fast way to enqueue a job for a method is to use with_delay() on a record or model:

@@ -626,7 +621,7 @@ 

Delaying jobs

-

Enqueing Job Options

+

Enqueing Job Options

  • priority: default is 10, the closest it is to 0, the faster it will be executed
  • @@ -643,7 +638,7 @@

    Enqueing Job Options

-

Configure default options for jobs

+

Configure default options for jobs

In earlier versions, jobs could be configured using the @job decorator. This is now obsolete, they can be configured using optional queue.job.function and queue.job.channel XML records.

@@ -764,7 +759,7 @@

Configure default options for job without delaying any jobs.

-

Testing

+

Testing

Asserting enqueued jobs

The recommended way to test jobs, rather than running them directly and synchronously is to split the tests in two parts:

@@ -884,14 +879,14 @@

Testing

-

Tips and tricks

+

Tips and tricks

  • Idempotency (https://www.restapitutorial.com/lessons/idempotency.html): The queue_job should be idempotent so they can be retried several times without impact on the data.
  • The job should test at the very beginning its relevance: the moment the job will be executed is unknown by design. So the first task of a job should be to check if the related work is still relevant at the moment of the execution.
-

Patterns

+

Patterns

Through the time, two main patterns emerged:

  1. For data exposed to users, a model should store the data and the model should be the creator of the job. The job is kept hidden from the users
  2. @@ -901,7 +896,7 @@

    Patterns

-

Known issues / Roadmap

+

Known issues / Roadmap

  • After creating a new database or installing queue_job on an existing database, Odoo must be restarted for the runner to detect it.
  • @@ -922,7 +917,7 @@

    Known issues / Roadmap

-

Changelog

+

Changelog

-

Bug Tracker

+

Bug Tracker

Bugs are tracked on GitHub Issues. In case of trouble, please check there if your issue has already been reported. If you spotted it first, help us to smash it by providing a detailed and welcomed @@ -952,16 +947,16 @@

Bug Tracker

Do not contact contributors directly about support or help with technical issues.

-

Credits

+

Credits

-

Authors

+

Authors

  • Camptocamp
  • ACSONE SA/NV
-

Contributors

+

Contributors

-

Maintainers

+

Maintainers

This module is maintained by the OCA.

Odoo Community Association @@ -986,13 +981,12 @@

Maintainers

OCA, or the Odoo Community Association, is a nonprofit organization whose mission is to support the collaborative development of Odoo features and promote its widespread use.

-

Current maintainer:

-

guewen

+

Current maintainers:

+

guewen sbidoul

This module is part of the OCA/queue project on GitHub.

You are welcome to contribute. To learn how please visit https://odoo-community.org/page/Contribute.

-
diff --git a/queue_job/tests/__init__.py b/queue_job/tests/__init__.py index 1062acdc25..16bcdff96b 100644 --- a/queue_job/tests/__init__.py +++ b/queue_job/tests/__init__.py @@ -8,4 +8,3 @@ from . import test_model_job_function from . import test_queue_job_protected_write from . import test_wizards -from . import test_requeue_dead_job diff --git a/test_queue_job/__manifest__.py b/test_queue_job/__manifest__.py index c3a29bf0c5..484f21d855 100644 --- a/test_queue_job/__manifest__.py +++ b/test_queue_job/__manifest__.py @@ -13,6 +13,8 @@ "data/queue_job_channel_data.xml", "data/queue_job_function_data.xml", "security/ir.model.access.csv", + "data/queue_job_test_job.xml", ], + "maintainers": ["sbidoul"], "installable": True, } diff --git a/test_queue_job/data/queue_job_test_job.xml b/test_queue_job/data/queue_job_test_job.xml new file mode 100644 index 0000000000..8a28ab70a0 --- /dev/null +++ b/test_queue_job/data/queue_job_test_job.xml @@ -0,0 +1,18 @@ + + + + + + diff --git a/test_queue_job/models/test_models.py b/test_queue_job/models/test_models.py index 03fa792137..585e2fd593 100644 --- a/test_queue_job/models/test_models.py +++ b/test_queue_job/models/test_models.py @@ -1,6 +1,8 @@ # Copyright 2016 Camptocamp SA # License LGPL-3.0 or later (http://www.gnu.org/licenses/lgpl.html) +from datetime import datetime, timedelta + from odoo import api, fields, models from odoo.addons.queue_job.delay import chain @@ -29,6 +31,35 @@ def testing_related__url(self, **kwargs): "url": kwargs["url"].format(subject=subject), } + @api.model + def _create_test_started_job(self, uuid): + """Create started jobs to be used within tests""" + self.env["queue.job"].with_context( + _job_edit_sentinel=self.env["queue.job"].EDIT_SENTINEL, + ).create( + { + "uuid": uuid, + "state": "started", + "model_name": "queue.job", + "method_name": "write", + } + ) + + @api.model + def _create_test_enqueued_job(self, uuid): + """Create enqueued jobs to be used within tests""" + self.env["queue.job"].with_context( + _job_edit_sentinel=self.env["queue.job"].EDIT_SENTINEL, + ).create( + { + "uuid": uuid, + "state": "enqueued", + "model_name": "queue.job", + "method_name": "write", + "date_enqueued": datetime.now() - timedelta(minutes=1), + } + ) + class ModelTestQueueJob(models.Model): @@ -110,6 +141,13 @@ def _register_hook(self): ) return super()._register_hook() + def _unregister_hook(self): + """Remove the patches installed by _register_hook()""" + self._revert_method("delay_me") + self._revert_method("delay_me_options") + self._revert_method("delay_me_context_key") + return super()._unregister_hook() + def _job_store_values(self, job): value = "JUST_TESTING" if job.state == "failed": diff --git a/test_queue_job/tests/__init__.py b/test_queue_job/tests/__init__.py index 0405022ce0..0cfacebdf3 100644 --- a/test_queue_job/tests/__init__.py +++ b/test_queue_job/tests/__init__.py @@ -1,3 +1,4 @@ +from . import test_acquire_job from . import test_autovacuum from . import test_delayable from . import test_dependencies @@ -7,3 +8,4 @@ from . import test_job_function from . import test_related_actions from . import test_delay_mocks +from . import test_requeue_dead_job diff --git a/test_queue_job/tests/common.py b/test_queue_job/tests/common.py index a32fcc380a..c1f7d88ca0 100644 --- a/test_queue_job/tests/common.py +++ b/test_queue_job/tests/common.py @@ -20,3 +20,13 @@ def _create_job(self): stored = Job.db_record_from_uuid(self.env, test_job.uuid) self.assertEqual(len(stored), 1) return stored + + def _get_demo_job(self, uuid): + # job created during load of demo data + job = self.env["queue.job"].search([("uuid", "=", uuid)], limit=1) + self.assertTrue( + job, + f"Demo data queue job {uuid!r} should be loaded in order " + "to make this test work", + ) + return job diff --git a/test_queue_job/tests/test_acquire_job.py b/test_queue_job/tests/test_acquire_job.py new file mode 100644 index 0000000000..3f0c92a2be --- /dev/null +++ b/test_queue_job/tests/test_acquire_job.py @@ -0,0 +1,51 @@ +# Copyright 2026 ACSONE SA/NV +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). +import logging +from unittest import mock + +from odoo.tests import tagged + +from odoo.addons.queue_job.controllers.main import RunJobController + +from .common import JobCommonCase + + +@tagged("post_install", "-at_install") +class TestRequeueDeadJob(JobCommonCase): + def test_acquire_enqueued_job(self): + job_record = self._get_demo_job(uuid="test_enqueued_job") + self.assertFalse( + self.env["queue.job.lock"].search( + [("queue_job_id", "=", job_record.id)], + ), + "A job lock record should not exist at this point", + ) + with mock.patch.object( + self.env.cr, "commit", mock.Mock(side_effect=self.env.flush_all) + ) as mock_commit: + job = RunJobController._acquire_job(self.env, job_uuid="test_enqueued_job") + mock_commit.assert_called_once() + self.assertIsNotNone(job) + self.assertEqual(job.uuid, "test_enqueued_job") + self.assertEqual(job.state, "started") + self.assertTrue( + self.env["queue.job.lock"].search( + [("queue_job_id", "=", job_record.id)] + ), + "A job lock record should exist at this point", + ) + + def test_acquire_started_job(self): + with ( + mock.patch.object( + self.env.cr, "commit", mock.Mock(side_effect=self.env.flush_all) + ) as mock_commit, + self.assertLogs(level=logging.WARNING) as logs, + ): + job = RunJobController._acquire_job(self.env, "test_started_job") + mock_commit.assert_not_called() + self.assertIsNone(job) + self.assertIn( + "was requested to run job test_started_job, but it does not exist", + logs.output[0], + ) diff --git a/test_queue_job/tests/test_autovacuum.py b/test_queue_job/tests/test_autovacuum.py index 09730a4fea..97aebcba1e 100644 --- a/test_queue_job/tests/test_autovacuum.py +++ b/test_queue_job/tests/test_autovacuum.py @@ -28,12 +28,16 @@ def test_autovacuum(self): date_done = datetime.now() - timedelta(days=29) stored.write({"date_done": date_done}) self.env["queue.job"].autovacuum() - self.assertEqual(len(self.env["queue.job"].search([])), 1) + self.assertEqual( + len(self.env["queue.job"].search([("channel", "!=", False)])), 1 + ) date_done = datetime.now() - timedelta(days=31) stored.write({"date_done": date_done}) self.env["queue.job"].autovacuum() - self.assertEqual(len(self.env["queue.job"].search([])), 0) + self.assertEqual( + len(self.env["queue.job"].search([("channel", "!=", False)])), 0 + ) def test_autovacuum_multi_channel(self): root_channel = self.env.ref("queue_job.channel_root") @@ -48,11 +52,17 @@ def test_autovacuum_multi_channel(self): {"channel": channel_60days.complete_name, "date_done": date_done} ) - self.assertEqual(len(self.env["queue.job"].search([])), 2) + self.assertEqual( + len(self.env["queue.job"].search([("channel", "!=", False)])), 2 + ) self.env["queue.job"].autovacuum() - self.assertEqual(len(self.env["queue.job"].search([])), 1) + self.assertEqual( + len(self.env["queue.job"].search([("channel", "!=", False)])), 1 + ) date_done = datetime.now() - timedelta(days=61) job_60days.write({"date_done": date_done}) self.env["queue.job"].autovacuum() - self.assertEqual(len(self.env["queue.job"].search([])), 0) + self.assertEqual( + len(self.env["queue.job"].search([("channel", "!=", False)])), 0 + ) diff --git a/queue_job/tests/test_requeue_dead_job.py b/test_queue_job/tests/test_requeue_dead_job.py similarity index 50% rename from queue_job/tests/test_requeue_dead_job.py rename to test_queue_job/tests/test_requeue_dead_job.py index c6c82a2f4d..a267c43c87 100644 --- a/queue_job/tests/test_requeue_dead_job.py +++ b/test_queue_job/tests/test_requeue_dead_job.py @@ -3,33 +3,16 @@ from contextlib import closing from datetime import datetime, timedelta -from odoo.tests.common import TransactionCase +from odoo.tests import tagged from odoo.addons.queue_job.job import Job from odoo.addons.queue_job.jobrunner.runner import Database +from .common import JobCommonCase -class TestRequeueDeadJob(TransactionCase): - def create_dummy_job(self, uuid): - """ - Create dummy job for tests - """ - return ( - self.env["queue.job"] - .with_context( - _job_edit_sentinel=self.env["queue.job"].EDIT_SENTINEL, - ) - .create( - { - "uuid": uuid, - "user_id": self.env.user.id, - "state": "pending", - "model_name": "queue.job", - "method_name": "write", - } - ) - ) +@tagged("post_install", "-at_install") +class TestRequeueDeadJob(JobCommonCase): def get_locks(self, uuid, cr=None): """ Retrieve lock rows @@ -52,7 +35,7 @@ def get_locks(self, uuid, cr=None): WHERE uuid = %s ) - FOR UPDATE SKIP LOCKED + FOR NO KEY UPDATE SKIP LOCKED """, [uuid], ) @@ -60,7 +43,8 @@ def get_locks(self, uuid, cr=None): return cr.fetchall() def test_add_lock_record(self): - queue_job = self.create_dummy_job("test_add_lock") + queue_job = self._get_demo_job("test_started_job") + self.assertEqual(len(queue_job), 1) job_obj = Job.load(self.env, queue_job.uuid) job_obj.set_started() @@ -71,19 +55,10 @@ def test_add_lock_record(self): self.assertEqual(1, len(locks)) def test_lock(self): - queue_job = self.create_dummy_job("test_lock") + queue_job = self._get_demo_job("test_started_job") job_obj = Job.load(self.env, queue_job.uuid) job_obj.set_started() - job_obj.store() - - locks = self.get_locks(job_obj.uuid) - - self.assertEqual(1, len(locks)) - - # commit to update queue_job records in DB - self.env.cr.commit() # pylint: disable=E8102 - job_obj.lock() with closing(self.env.registry.cursor()) as new_cr: @@ -92,42 +67,34 @@ def test_lock(self): # Row should be locked self.assertEqual(0, len(locks)) - # clean up - queue_job.unlink() - - self.env.cr.commit() # pylint: disable=E8102 - - # because we committed the cursor, the savepoint of the test method is - # gone, and this would break TransactionCase cleanups - self.cr.execute("SAVEPOINT test_%d" % self._savepoint_id) - def test_requeue_dead_jobs(self): - uuid = "test_requeue_dead_jobs" - - queue_job = self.create_dummy_job(uuid) + queue_job = self._get_demo_job("test_enqueued_job") job_obj = Job.load(self.env, queue_job.uuid) job_obj.set_enqueued() - # simulate enqueuing was in the past - job_obj.date_enqueued = datetime.now() - timedelta(minutes=1) job_obj.set_started() - + job_obj.date_enqueued = datetime.now() - timedelta(minutes=1) job_obj.store() - self.env.cr.commit() # pylint: disable=E8102 # requeue dead jobs using current cursor query = Database(self.env.cr.dbname)._query_requeue_dead_jobs() self.env.cr.execute(query) uuids_requeued = self.env.cr.fetchall() + self.assertTrue(queue_job.uuid in j[0] for j in uuids_requeued) - self.assertEqual(len(uuids_requeued), 1) - self.assertEqual(uuids_requeued[0][0], uuid) + def test_requeue_orphaned_jobs(self): + queue_job = self._get_demo_job("test_enqueued_job") + job_obj = Job.load(self.env, queue_job.uuid) - # clean up - queue_job.unlink() - self.env.cr.commit() # pylint: disable=E8102 + # Only enqueued job, don't set it to started to simulate the scenario + # that system shutdown before job is starting + job_obj.set_enqueued() + job_obj.date_enqueued = datetime.now() - timedelta(minutes=1) + job_obj.store() - # because we committed the cursor, the savepoint of the test method is - # gone, and this would break TransactionCase cleanups - self.cr.execute("SAVEPOINT test_%d" % self._savepoint_id) + # job is now picked up by the requeue query (which includes orphaned jobs) + query = Database(self.env.cr.dbname)._query_requeue_dead_jobs() + self.env.cr.execute(query) + uuids_requeued = self.env.cr.fetchall() + self.assertTrue(queue_job.uuid in j[0] for j in uuids_requeued)