bartender.schedulers.arq#

Exceptions#

JobFailureError

Error during job running.

Classes#

ArqSchedulerConfig

Configuration for ArqScheduler.

ArqScheduler

Arq scheduler.

Functions#

_map_arq_status(→ bartender.db.models.job_model.State)

_exec(→ None)

arq_worker(→ arq.Worker)

Worker that runs jobs submitted to arq queue.

run_workers(→ None)

Run worker for each arq scheduler config.

Module Contents#

bartender.schedulers.arq._map_arq_status(arq_status: arq.jobs.JobStatus, success: bool) bartender.db.models.job_model.State#
Parameters:
  • arq_status (arq.jobs.JobStatus) –

  • success (bool) –

Return type:

bartender.db.models.job_model.State

class bartender.schedulers.arq.ArqSchedulerConfig#

Bases: pydantic.BaseModel

Configuration for ArqScheduler.

type: Literal['arq'] = 'arq'#
redis_dsn: pydantic.RedisDsn#
queue: str = 'arq:queue'#
max_jobs: pydantic.types.PositiveInt = 10#

Maximum number of jobs to run at a time inside a single worker.

job_timeout: pydantic.types.PositiveInt | datetime.timedelta = 3600#

Maximum job run time.

Default is one hour.

In seconds or string in ISO 8601 duration format.

For example, “PT12H” represents a max runtime of “twelve hours”.

property redis_settings: arq.connections.RedisSettings#

Settings for arq.

Returns:

The settings based on redis_dsn.

Return type:

arq.connections.RedisSettings

class bartender.schedulers.arq.ArqScheduler(config: ArqSchedulerConfig)#

Bases: bartender.schedulers.abstract.AbstractScheduler

Arq scheduler.

See https://arq-docs.helpmanual.io/.

Parameters:

config (ArqSchedulerConfig) –

config: ArqSchedulerConfig#
connection: arq.ArqRedis | None = None#
async close() None#

Cancel all runnning jobs and make scheduler unable to work.

Return type:

None

async submit(description: bartender.schedulers.abstract.JobDescription) str#

Submit a job description for running.

Parameters:

description (bartender.schedulers.abstract.JobDescription) – Description for a job.

Returns:

Identifier that can be used later to interact with job.

Raises:

JobSubmissionError – If job submission failed.

Return type:

str

async state(job_id: str) bartender.db.models.job_model.State#

Get state of a job.

Once job is completed, then scheduler can forget job.

Parameters:

job_id (str) – Identifier of job.

Returns:

State of job.

Return type:

bartender.db.models.job_model.State

async cancel(job_id: str) None#

Cancel a queued or running job.

Once a queued job is cancelled, then the scheduler can forget job.

Parameters:

job_id (str) – Identifier of job.

Return type:

None

__eq__(other: object) bool#
Parameters:

other (object) –

Return type:

bool

__repr__() str#
Return type:

str

async _pool() arq.ArqRedis#
Return type:

arq.ArqRedis

exception bartender.schedulers.arq.JobFailureError#

Bases: Exception

Error during job running.

async bartender.schedulers.arq._exec(ctx: dict[Any, Any], description: bartender.schedulers.abstract.JobDescription) None#
Parameters:
Return type:

None

bartender.schedulers.arq.arq_worker(config: ArqSchedulerConfig, burst: bool = False) arq.Worker#

Worker that runs jobs submitted to arq queue.

Parameters:
  • config (ArqSchedulerConfig) – The config. Should be equal to the one used to submit job.

  • burst (bool) – Whether to stop the worker once all jobs have been run.

Returns:

A worker.

Return type:

arq.Worker

async bartender.schedulers.arq.run_workers(configs: list[ArqSchedulerConfig]) None#

Run worker for each arq scheduler config.

Parameters:

configs (list[ArqSchedulerConfig]) – The configs.

Return type:

None