Reference¶
-
class
darq.app.Darq(*, queue_name: str = 'arq:queue', redis_settings: Optional[darq.connections.RedisSettings] = None, burst: bool = False, on_startup: Callable[[Dict[Any, Any]], Awaitable[None]] = None, on_shutdown: Callable[[Dict[Any, Any]], Awaitable[None]] = None, on_job_prerun: Optional[Callable[[Dict[Any, Any], Task, Sequence[Any], Mapping[str, Any]], Awaitable[None]]] = None, on_job_postrun: Optional[Callable[[Dict[Any, Any], Task, Sequence[Any], Mapping[str, Any], Any], Awaitable[None]]] = None, on_job_prepublish: Optional[Callable[[Dict[str, Any], Task, List[Any], Dict[str, Any], darq.types.JobEnqueueOptions], Awaitable[None]]] = None, on_scheduler_startup: Callable[[Dict[Any, Any]], Awaitable[None]] = None, on_scheduler_shutdown: Callable[[Dict[Any, Any]], Awaitable[None]] = None, max_jobs: int = 10, job_timeout: Union[int, float, datetime.timedelta] = 300, keep_result: Union[int, float, datetime.timedelta] = 3600, poll_delay: Union[int, float, datetime.timedelta] = 0.5, queue_read_limit: Optional[int] = None, max_tries: int = 5, health_check_interval: Union[int, float, datetime.timedelta] = 3600, health_check_key: Optional[str] = None, ctx: Optional[Dict[str, Any]] = None, retry_jobs: bool = True, max_burst_jobs: int = -1, job_serializer: Optional[Callable[[Dict[str, Any]], bytes]] = None, job_deserializer: Optional[Callable[[bytes], Dict[str, Any]]] = None, default_job_expires: Union[int, float, datetime.timedelta] = datetime.timedelta(days=1))¶ Darq application.
Parameters: - queue_name – queue name to get tasks from
- redis_settings – settings for creating a redis connection
- burst – whether to stop the worker once all tasks have been run
- on_startup – coroutine function to run at worker startup
- on_shutdown – coroutine function to run at worker shutdown
- on_job_prerun – coroutine function to run before a task starts
- on_job_postrun – coroutine function to run after a task finishes
- on_job_prepublish – coroutine function to run before a task is enqueued
- max_jobs – maximum number of tasks to run at a time
- job_timeout – default task timeout (max run time)
- keep_result – default duration to keep task results for
- poll_delay – duration between polling the queue for new tasks
- queue_read_limit – the maximum number of tasks to pull from the queue each time it’s polled; by default it equals max_jobs
- max_tries – default maximum number of times to retry a task
- health_check_interval – how often to set the health check key
- health_check_key – redis key under which health check is set
- ctx – dict object; its data will be passed to hooks: on_startup, on_shutdown - can modify ctx; on_job_prerun, on_job_postrun - read-only
- retry_jobs – whether to retry tasks on Retry or CancelledError
- max_burst_jobs – the maximum number of tasks to process in burst mode (disabled with negative values)
- job_serializer – a function that serializes Python objects to bytes, defaults to pickle.dumps
- job_deserializer – a function that deserializes bytes into Python objects, defaults to pickle.loads
- default_job_expires – default task expiry; if the task still hasn’t started after this duration, do not run it
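The hook signatures listed above can be illustrated with a small, self-contained sketch. It does not import darq: the `run_lifecycle` driver, the `add` coroutine, and the `calls` list are hypothetical stand-ins written only to show which arguments each hook receives and in what order. In a real application the same coroutine functions would presumably be passed to `Darq(on_startup=..., on_job_prerun=..., ...)`.

```python
import asyncio
from typing import Any, Dict, List, Mapping, Sequence

calls: List[str] = []  # records hook invocations, for illustration only


async def on_startup(ctx: Dict[Any, Any]) -> None:
    # Startup hooks may modify ctx, e.g. to share a resource with later hooks.
    ctx['greeting'] = 'hello'
    calls.append('startup')


async def on_shutdown(ctx: Dict[Any, Any]) -> None:
    calls.append('shutdown')


async def on_job_prerun(
    ctx: Dict[Any, Any], task: Any, args: Sequence[Any], kwargs: Mapping[str, Any],
) -> None:
    # ctx should be treated as read-only here.
    calls.append(f'prerun args={tuple(args)}')


async def on_job_postrun(
    ctx: Dict[Any, Any], task: Any, args: Sequence[Any],
    kwargs: Mapping[str, Any], result: Any,
) -> None:
    calls.append(f'postrun result={result}')


async def add(a: int, b: int) -> int:
    return a + b


async def run_lifecycle() -> Dict[Any, Any]:
    """Hypothetical driver standing in for the worker; not darq code."""
    ctx: Dict[Any, Any] = {}
    await on_startup(ctx)
    args, kwargs = (1, 2), {}
    await on_job_prerun(ctx, add, args, kwargs)
    result = await add(*args, **kwargs)
    await on_job_postrun(ctx, add, args, kwargs, result)
    await on_shutdown(ctx)
    return ctx


final_ctx = asyncio.run(run_lifecycle())
```

The driver only mimics the documented call order; the actual worker also handles timeouts, retries, and logging, which are left out here.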
-
add_cron_jobs(*cron_jobs) → None¶ Parameters: cron_jobs – list of cron jobs to run, use darq.cron.cron() to create them
-
task(func=None, *, keep_result=None, timeout=None, max_tries=None, queue=None, expires=<object object>, with_ctx=False)¶ Parameters: - func – coroutine function
- keep_result – duration to keep the result for, if 0 the result is not kept
- timeout – maximum time the task should take
- max_tries – maximum number of tries allowed for the task, use 1 to prevent retrying
- queue – queue of the task, can be used to send task to different queue
- expires – if the task still hasn’t started after this duration, do not run it
- with_ctx – pass context to the task as first argument
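The `expires=<object object>` default in the signature above looks like a sentinel: a unique object that lets the decorator tell "argument omitted" apart from an explicit `None`. The following is a minimal sketch of that general pattern, not darq's implementation; `_UNSET`, `DEFAULT_JOB_EXPIRES`, and `resolve_expires` are hypothetical names chosen for illustration.

```python
import datetime
from typing import Any, Optional, Union

Duration = Union[int, float, datetime.timedelta]

# Hypothetical sentinel: a unique object no caller can pass by accident.
_UNSET: Any = object()

# Mirrors the documented default of default_job_expires (1 day).
DEFAULT_JOB_EXPIRES = datetime.timedelta(days=1)


def resolve_expires(expires: Any = _UNSET) -> Optional[Duration]:
    """Return the effective expiry for a task.

    Omitted  -> fall back to the application-wide default.
    None     -> the explicit None replaces the default.
    Duration -> use the given value unchanged.
    """
    if expires is _UNSET:
        return DEFAULT_JOB_EXPIRES
    return expires
```

With a plain `None` default the first two cases would be indistinguishable, which is why a sentinel is a common choice for this kind of parameter.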
-
exception
darq.app.DarqConfigError¶
-
exception
darq.app.DarqConnectionError¶
-
exception
darq.app.DarqException¶
-
class
darq.connections.ArqRedis(pool_or_conn: Union[aioredis.pool.ConnectionsPool, aioredis.connection.RedisConnection], job_serializer: Optional[Callable[[Dict[str, Any]], bytes]] = None, job_deserializer: Optional[Callable[[bytes], Dict[str, Any]]] = None, **kwargs)¶ Thin subclass of
aioredis.Redis which adds darq.connections.enqueue_job().
Parameters: - redis_settings – an instance of darq.connections.RedisSettings.
- job_serializer – a function that serializes Python objects to bytes, defaults to pickle.dumps
- job_deserializer – a function that deserializes bytes into Python objects, defaults to pickle.loads
- kwargs – keyword arguments directly passed to aioredis.Redis.
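As an illustration of the `job_serializer` / `job_deserializer` parameters, here is a minimal sketch of a JSON-based pair matching the documented signatures (`Dict[str, Any] -> bytes` and `bytes -> Dict[str, Any]`). The function names and the sample payload keys are assumptions made for the example; they are not darq's actual job layout.

```python
import json
from typing import Any, Dict


def json_serializer(job: Dict[str, Any]) -> bytes:
    """Serialize a job dict to UTF-8 encoded JSON bytes."""
    return json.dumps(job).encode('utf-8')


def json_deserializer(data: bytes) -> Dict[str, Any]:
    """Inverse of json_serializer."""
    return json.loads(data.decode('utf-8'))


# Illustrative payload only; the real job dict is produced by the library.
payload = {'function': 'send_email', 'args': [1, 'a'], 'kwargs': {'retry': True}}
round_tripped = json_deserializer(json_serializer(payload))
```

Note that JSON cannot represent every Python object that pickle can (tuples come back as lists, and datetimes are not supported without extra handling), so a pair like this only suits jobs whose arguments and results are JSON-compatible.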
-
all_job_results() → List[darq.jobs.JobResult]¶ Get results for all jobs in redis.
-
enqueue_job(function: str, args: Sequence[Any], kwargs: Mapping[str, Any], *, job_id: Optional[str] = None, queue_name: Optional[str] = None, defer_until: Optional[datetime.datetime] = None, defer_by: Union[None, int, float, datetime.timedelta] = None, expires: Union[None, int, float, datetime.timedelta] = None, job_try: Optional[int] = None) → Optional[darq.jobs.Job]¶ Enqueue a job.
Parameters: - function – Name of the function to call
- args – args to pass to the function
- kwargs – kwargs to pass to the function
- job_id – ID of the job, can be used to enforce job uniqueness
- queue_name – queue of the job, can be used to create job in different queue
- defer_until – datetime at which to run the job
- defer_by – duration to wait before running the job
- expires – if the job still hasn’t started after this duration, do not run it
- job_try – useful when re-enqueueing jobs within a job
Returns: darq.jobs.Job instance or None if a job with this ID already exists
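The relationship between `defer_until` and `defer_by` can be sketched as follows. This is a hypothetical helper, not darq code, and it makes two assumptions that the reference above does not state: plain numbers are interpreted as seconds, and `defer_until` takes precedence when both are given.

```python
import datetime
from typing import Optional, Union

Duration = Union[int, float, datetime.timedelta]


def to_timedelta(value: Duration) -> datetime.timedelta:
    """Assumption: bare ints and floats are durations in seconds."""
    if isinstance(value, datetime.timedelta):
        return value
    return datetime.timedelta(seconds=value)


def scheduled_time(
    now: datetime.datetime,
    defer_until: Optional[datetime.datetime] = None,
    defer_by: Optional[Duration] = None,
) -> datetime.datetime:
    """Hypothetical helper: earliest time at which the job may run."""
    if defer_until is not None:  # assumed to win over defer_by
        return defer_until
    if defer_by is not None:
        return now + to_timedelta(defer_by)
    return now  # no deferral: eligible immediately


now = datetime.datetime(2022, 8, 3, 12, 0, 0)
in_90_seconds = scheduled_time(now, defer_by=90)
```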
-
queued_jobs(*, queue_name: str = 'arq:queue') → List[darq.jobs.JobDef]¶ Get information about queued jobs, mostly useful when testing.
-
class
darq.connections.RedisSettings(host: Union[str, List[Tuple[str, int]]] = 'localhost', port: int = 6379, database: int = 0, password: Optional[str] = None, ssl: Union[bool, None, ssl.SSLContext] = None, conn_timeout: int = 1, conn_retries: int = 5, conn_retry_delay: int = 1, sentinel: bool = False, sentinel_master: str = 'mymaster', sentinel_timeout: float = 0.2)¶ No-Op class used to hold redis connection settings.
Used by darq.connections.create_pool() and darq.worker.Worker.
-
darq.connections.create_pool(settings: darq.connections.RedisSettings = None, *, retry: int = 0, job_serializer: Optional[Callable[[Dict[str, Any]], bytes]] = None, job_deserializer: Optional[Callable[[bytes], Dict[str, Any]]] = None) → darq.connections.ArqRedis¶ Create a new redis pool, retrying up to
conn_retries times if the connection fails.
Similar to aioredis.create_redis_pool except it returns a darq.connections.ArqRedis instance, thus allowing job enqueuing.
-
exception
darq.worker.Retry(defer: Union[int, float, datetime.timedelta, None] = None)¶ Special exception to retry the job (if
max_retries hasn’t been reached).
Parameters: defer – duration to wait before rerunning the job
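The retry behaviour described above can be sketched without darq. The `Retry` class below is a local stand-in for `darq.worker.Retry`, and `run_with_retries` is a hypothetical runner; the real worker re-enqueues the job through Redis rather than looping in-process, so this only illustrates the control flow.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Optional


class Retry(Exception):
    """Stand-in for darq.worker.Retry; carries an optional delay in seconds."""

    def __init__(self, defer: Optional[float] = None) -> None:
        super().__init__()
        self.defer = defer


async def run_with_retries(
    job: Callable[[int], Awaitable[Any]], max_tries: int = 5,
) -> Any:
    """Hypothetical runner: re-run `job` each time it raises Retry."""
    for job_try in range(1, max_tries + 1):
        try:
            return await job(job_try)
        except Retry as exc:
            if job_try == max_tries:
                raise RuntimeError(f'gave up after {max_tries} tries') from exc
            # Wait for the requested delay before the next attempt.
            await asyncio.sleep(exc.defer or 0)


attempts: List[int] = []


async def flaky(job_try: int) -> str:
    """Fails on the first two tries, then succeeds."""
    attempts.append(job_try)
    if job_try < 3:
        raise Retry(defer=0.01)
    return 'done'


outcome = asyncio.run(run_with_retries(flaky))
```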
-
class
darq.worker.Worker(app: Darq, **replace_kwargs)¶ Main class for running jobs.
Parameters: - app – instance of darq.app.Darq()
- worker_settings – instance of darq.worker.WorkerSettings
-
async_run() → None¶ Asynchronously run the worker, does not close connections. Useful when testing.
-
run() → None¶ Sync function to run the worker, finally closes worker connections.
-
run_check(retry_jobs: Optional[bool] = None, max_burst_jobs: Optional[int] = None) → int¶ Run
darq.worker.Worker.async_run(), check for failed jobs and raise darq.worker.FailedJobs if any jobs have failed.
Returns: number of completed jobs
-
darq.cron.cron(task: Union[str, darq.types.DarqTask[typing.Callable[..., typing.Coroutine[typing.Any, typing.Any, typing.Any]]][Callable[[...], Coroutine[Any, Any, Any]]]], *, name: Optional[str] = None, month: Union[None, Set[int], List[int], Tuple[int], int] = None, day: Union[None, Set[int], List[int], Tuple[int], int] = None, weekday: Union[None, Set[int], List[int], Tuple[int], int, str] = None, hour: Union[None, Set[int], List[int], Tuple[int], int] = None, minute: Union[None, Set[int], List[int], Tuple[int], int] = None, second: Union[None, Set[int], List[int], Tuple[int], int] = 0, microsecond: int = 123456, run_at_startup: bool = False, unique: bool = True, timeout: Union[int, float, datetime.timedelta, None] = None, keep_result: Optional[float] = 0, max_tries: Optional[int] = 1) → darq.cron.CronJob¶ Create a cron job, eg. it should be executed at specific times.
Workers will enqueue this job at or just after the set times. If unique is true (the default) the job will only be run once even if multiple workers are running.
Parameters: - task – task function to run
- name – name of the job, if None, the name of the task is used
- month – month(s) to run the job on, 1 - 12
- day – day(s) to run the job on, 1 - 31
- weekday – week day(s) to run the job on, 0 - 6 or mon - sun
- hour – hour(s) to run the job on, 0 - 23
- minute – minute(s) to run the job on, 0 - 59
- second – second(s) to run the job on, 0 - 59
- microsecond – microsecond(s) to run the job on, defaults to 123456 as the world is busier at the top of a second, 0 - 1e6
- run_at_startup – whether to run as worker starts
- unique – whether the job should only be executed once at each time
- timeout – job timeout
- keep_result – how long to keep the result for
- max_tries – maximum number of tries for the job
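To make the schedule fields above concrete, here is a minimal sketch of how a datetime could be checked against them. `field_matches` and `cron_matches` are hypothetical helpers, not darq functions. The sketch assumes weekdays are numbered as in `datetime.weekday()` (Monday is 0) and, for brevity, supports only `None`, an int, or a set of ints (no `'mon'`-style names, lists, or tuples).

```python
import datetime
from typing import Set, Union

Option = Union[None, int, Set[int]]


def field_matches(option: Option, value: int) -> bool:
    """None matches anything; an int must be equal; a set must contain the value."""
    if option is None:
        return True
    if isinstance(option, int):
        return option == value
    return value in option


def cron_matches(
    dt: datetime.datetime,
    *,
    month: Option = None,
    day: Option = None,
    weekday: Option = None,
    hour: Option = None,
    minute: Option = None,
    second: Option = 0,  # same default as the documented signature
) -> bool:
    """Hypothetical check: does `dt` satisfy every specified field?"""
    return (
        field_matches(month, dt.month)
        and field_matches(day, dt.day)
        and field_matches(weekday, dt.weekday())  # assumption: Monday is 0
        and field_matches(hour, dt.hour)
        and field_matches(minute, dt.minute)
        and field_matches(second, dt.second)
    )


# 2022-08-03 was a Wednesday, i.e. weekday() == 2.
wednesday_0930 = datetime.datetime(2022, 8, 3, 9, 30, 0)
matches = cron_matches(wednesday_0930, hour={9, 12}, minute=30)
```

Because `second` defaults to 0, a schedule such as `hour={9, 12}, minute=30` matches once per selected minute rather than on every second of it.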
-
class
darq.jobs.JobStatus¶ Enum of job statuses.
-
complete= 'complete'¶ job is complete, result is available
-
deferred= 'deferred'¶ job is in the queue, time it should be run not yet reached
-
in_progress= 'in_progress'¶ job is in progress
-
not_found= 'not_found'¶ job not found in any way
-
queued= 'queued'¶ job is in the queue, time it should run has been reached
-
-
class
darq.jobs.Job(job_id: str, redis: ArqRedis, _queue_name: str = 'arq:queue', _deserializer: Optional[Callable[[bytes], Dict[str, Any]]] = None)¶ Holds data and a reference to a job.
-
info() → Optional[darq.jobs.JobDef]¶ All information on a job, including its result if it’s available, does not wait for the result.
-
result(timeout: Optional[float] = None, *, pole_delay: float = 0.5) → Any¶ Get the result of the job, including waiting if it’s not yet available. If the job raised an exception, it will be raised here.
Parameters: - timeout – maximum time to wait for the job result before raising TimeoutError; if None (the default), waits forever
- pole_delay – how often to poll redis for the job result
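The polling behaviour of `result()` can be sketched with plain asyncio. `wait_for_result`, `fake_fetch`, `never_ready`, and the `_MISSING` marker are hypothetical and do not touch Redis; the `pole_delay` parameter name simply mirrors the documented signature. The sketch raises `asyncio.TimeoutError`, which is the same class as the built-in `TimeoutError` only on Python 3.11 and later.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Dict, Optional

_MISSING = object()  # hypothetical marker for "no result stored yet"


async def wait_for_result(
    fetch: Callable[[], Awaitable[Any]],
    timeout: Optional[float] = None,
    pole_delay: float = 0.5,
) -> Any:
    """Hypothetical poller: call `fetch` until it yields a result or time runs out."""
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        value = await fetch()
        if value is not _MISSING:
            return value
        if deadline is not None and time.monotonic() >= deadline:
            raise asyncio.TimeoutError()
        await asyncio.sleep(pole_delay)


state: Dict[str, int] = {'polls': 0}


async def fake_fetch() -> Any:
    """Pretend the result becomes available on the third poll."""
    state['polls'] += 1
    return 42 if state['polls'] >= 3 else _MISSING


async def never_ready() -> Any:
    """Pretend the job never finishes."""
    return _MISSING


value = asyncio.run(wait_for_result(fake_fetch, timeout=1, pole_delay=0.01))
```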
-
result_info() → Optional[darq.jobs.JobResult]¶ Information about the job result if available, does not wait for the result. Does not raise an exception even if the job raised one.
-
status() → darq.jobs.JobStatus¶ Status of the job.
-
Changelog¶
0.11.1 (unreleased)¶
- Remove pydantic dependency
0.11.0 (2022-08-03)¶
- Added ability to optionally pass ctx to the task, like this:
@task(with_ctx=True)
async def foobar(ctx):
    log.info('Foobar try %s', ctx['job_try'])
ctx contains: job_id, job_try, enqueue_time, score, metadata + all worker’s ctx (including custom context which can be passed via on_startup). Thanks to @kindermax (https://github.com/seedofjoy/darq/pull/426)!
0.10.2 (2022-02-03)¶
- Add proper typing for functions wrapped with the @task decorator. Mypy will now check that parameters are passed correctly when calling func() and func.delay()
0.10.1 (2021-07-29)¶
- Add sentinel_timeout (defaults to 0.2) param to RedisSettings
0.10.0 (2021-07-09)¶
- Breaking change: Rename darq.worker.Function to darq.worker.Task
- Made job to task naming migration
- Add max_jobs parameter to CLI (thanks to @antonmyronyuk)
- Fixed bug with expires argument: default_job_expires could not be replaced with None in @task or .apply_async
0.9.0 (2020-06-24)¶
- Breaking change: Add scheduler_ctx param to on_scheduler_startup and on_scheduler_shutdown to share data between these callbacks. It already has ctx['redis'] - instance of ArqRedis
0.8.0 (2020-06-22)¶
- Breaking change: Changed CLI command format. Before: darq some_project.darq_app.darq. Now: darq -A some_project.darq_app.darq worker
- Breaking change: Scheduler (cron jobs) now runs separately from worker (see darq scheduler command)
- Breaking change: Changed some function signatures (rename arguments)
- Breaking change: Remove redis_pool param from Darq app
- Add on_scheduler_startup and on_scheduler_shutdown callbacks
0.7.2 (2020-06-18)¶
- Fix some types (cron, OnJobPrepublishType)
- on_job_prerun now runs before “task started” log and on_job_postrun now runs after “task finished” log
0.7.1 (2020-05-25)¶
- .apply_async: Make args and kwargs arguments optional
0.7.0 (2020-05-25)¶
- Fork arq to project and merge it with darq (It was easier to rewrite arq than to write a wrapper)
- Breaking change: Remove “magic” params from .delay. To enqueue a job with special params, .apply_async was added.
- Add watch-mode to CLI.
- Fix: Now worker will not run a cron job if its function’s queue does not match the worker’s
0.6.0 (2020-03-08)¶
- Breaking change: Changed Darq constructor from single config param to separate params.
- arq_function.coroutine now has .delay method.
0.5.0 (2020-03-03)¶
- Add on_job_prepublish(metadata, arq_function, args, kwargs) callback. metadata is a mutable dict, which will be available at ctx['metadata'].
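A minimal sketch of such a callback follows. It uses the five-argument form shown in the current `Darq` signature (the trailing job options argument was not part of the 0.5.0 form quoted above). The `request_id` key, its value, and the `demo` driver are illustrative assumptions; darq itself would call the hook when a task is published.

```python
import asyncio
from typing import Any, Dict, List


async def on_job_prepublish(
    metadata: Dict[str, Any],
    task: Any,
    args: List[Any],
    kwargs: Dict[str, Any],
    job_options: Dict[str, Any],
) -> None:
    """Attach data at enqueue time; `metadata` is mutated in place."""
    metadata['request_id'] = 'req-123'  # illustrative value


async def demo() -> Dict[str, Any]:
    """Hypothetical direct call standing in for the publish step."""
    metadata: Dict[str, Any] = {}
    await on_job_prepublish(metadata, None, [1], {}, {})
    # On the worker side the same dict is documented to appear here:
    return {'metadata': metadata}


job_ctx = asyncio.run(demo())
```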
0.4.0 (2020-03-03)¶
- Add default_job_expires param to Darq (if the job still hasn’t started after this duration, do not run it). Default - 1 day
- Add expires param to @task (if set - overwrites default_job_expires)
0.3.1 (2020-03-02)¶
- Rewrite warm shutdown: now during warm shutdown cron is disabled, on second signal the warm shutdown will be canceled
0.3.0 (2020-02-27)¶
- Breaking change: on_job_prerun and on_job_postrun now accept arq.worker.Function instead of the original function (it can still be accessed at arq_function.coroutine)
0.2.1 (2020-02-26)¶
- Fix add_cron_jobs method. Tests added.
0.2.0 (2020-02-26)¶
- Add on_job_prerun(ctx, function, args, kwargs) and on_job_postrun(ctx, function, args, kwargs, result) callbacks.
0.1.0 (2020-02-26)¶
- Breaking change: Jobs no longer explicitly get JobCtx as the first argument, as in 99.9% of cases they don’t need it. In a future release it will be possible to optionally pass JobCtx in some way.
- Breaking change: All cron jobs should be wrapped in @task decorator
- Directly pass functions to arq.Worker, not names.
0.0.3 (2020-02-25)¶
- .delay() now returns arq_redis.enqueue_job result (Optional[Job])
- Add py.typed file
- Fixed add_cron_jobs typing
0.0.2 (2020-02-24)¶
- Add add_cron_jobs method
0.0.1 (2020-02-21)¶
First release