bartender.filesystems.queue#

Attributes#

FileStagingQueue

Custom type for file staging queue.

CurrentFileOutStagingQueue

Functions#

perform_download(→ None)

Download files of job from job's destination filesystem to local filesystem.

_file_staging_worker(→ None)

setup_file_staging_queue(→ None)

Create file staging queue and inject it into the app state.

build_file_staging_queue(→ tuple[FileStagingQueue, ...)

Create file staging queue and single worker task.

stop_file_staging_queue(→ None)

Stop file staging queue and its task consumer.

teardown_file_staging_queue(→ None)

Stop file staging queue and its task consumer.

get_file_staging_queue(→ FileStagingQueue)

Retrieve file staging queue.

Module Contents#

bartender.filesystems.queue.FileStagingQueue#

Custom type for file staging queue.

The item argument of the queue.put(item) method should be an async function that takes no arguments.

async bartender.filesystems.queue.perform_download(job_root_dir: pathlib.Path, job_id: int, filesystem: bartender.filesystems.abstract.AbstractFileSystem, job_dao: bartender.db.dao.job_dao.JobDAO, state: bartender.db.models.job_model.State) None#

Download files of job from job’s destination filesystem to local filesystem.

When completed, the state of the job is set to the given state.

Parameters:
Raises:

FileNotFoundError – If the job directory does not exist in supplied filesystem.

Return type:

None

async bartender.filesystems.queue._file_staging_worker(queue: FileStagingQueue, job_root_dir: pathlib.Path, destinations: dict[str, bartender.destinations.Destination], factory: sqlalchemy.ext.asyncio.async_sessionmaker[sqlalchemy.ext.asyncio.AsyncSession]) None#
Parameters:
Return type:

None

bartender.filesystems.queue.setup_file_staging_queue(app: fastapi.FastAPI) None#

Create file staging queue and inject it into the app state.

Parameters:

app (fastapi.FastAPI) – FastAPI application.

Return type:

None

bartender.filesystems.queue.build_file_staging_queue(job_root_dir: pathlib.Path, destinations: dict[str, bartender.destinations.Destination], factory: sqlalchemy.ext.asyncio.async_sessionmaker[sqlalchemy.ext.asyncio.AsyncSession]) tuple[FileStagingQueue, asyncio.Task[None]]#

Create file staging queue and single worker task.

Parameters:
Returns:

Tuple with the queue and the task.

Return type:

tuple[FileStagingQueue, asyncio.Task[None]]

async bartender.filesystems.queue.stop_file_staging_queue(task: asyncio.Task[None]) None#

Stop file staging queue and its task consumer.

Parameters:

task (asyncio.Task[None]) – Task to cancel and wait for.

Return type:

None

async bartender.filesystems.queue.teardown_file_staging_queue(app: fastapi.FastAPI) None#

Stop file staging queue and its task consumer.

Parameters:

app (fastapi.FastAPI) – FastAPI application.

Return type:

None

bartender.filesystems.queue.get_file_staging_queue(request: fastapi.Request) FileStagingQueue#

Retrieve file staging queue.

Parameters:

request (fastapi.Request) – The request injected by FastAPI.

Returns:

Queue for downloading/uploading files from/to remote filesystems.

Return type:

FileStagingQueue

Can be used in dependency injection in a FastAPI route. Requires setup_file_staging_queue() and teardown_file_staging_queue() to be added to FastAPI startup and shutdown events.

Example

To make a route that returns the number of jobs that are waiting for their files to be staged out:

from fastapi import APIRouter
from bartender.filesystems.queue import CurrentFileOutStagingQueue

router = APIRouter()

@router.get("/staging-queue-size")
async def file_staging_queue_size(
    file_staging_queue: CurrentFileOutStagingQueue
):
    return file_staging_queue.qsize()
bartender.filesystems.queue.CurrentFileOutStagingQueue#