bartender.filesystems.queue
Attributes
FileStagingQueue | Custom type for file staging queue.
CurrentFileOutStagingQueue |
Functions
perform_download | Download files of job from job's destination filesystem to local filesystem.
_file_staging_worker |
setup_file_staging_queue | Create file staging queue and inject it into app state.
build_file_staging_queue | Create file staging queue and single worker task.
stop_file_staging_queue | Stop file staging queue and its task consumer.
teardown_file_staging_queue | Stop file staging queue and its task consumer.
get_file_staging_queue | Retrieve file staging queue.
Module Contents
- bartender.filesystems.queue.FileStagingQueue
Custom type for file staging queue.
The item passed to queue.put(item) should be an async function that takes no arguments.
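The zero-argument requirement can be illustrated with plain asyncio. This is a minimal sketch, assuming the queue behaves like an `asyncio.Queue` of async callables; the `StagingItem` alias and the `copy_files` helper are hypothetical stand-ins, not part of bartender. Arguments are bound up front with `functools.partial`, so the consumer can call each item without passing anything.

```python
import asyncio
from functools import partial
from typing import Any, Callable, Coroutine

# Assumption: illustrative alias only; the real type is FileStagingQueue.
StagingItem = Callable[[], Coroutine[Any, Any, None]]


async def copy_files(job_id: int, done: list[int]) -> None:
    """Hypothetical staging step that records which job it handled."""
    await asyncio.sleep(0)
    done.append(job_id)


async def main() -> list[int]:
    queue: asyncio.Queue[StagingItem] = asyncio.Queue()
    done: list[int] = []
    # partial() binds the arguments, leaving a zero-argument async callable.
    await queue.put(partial(copy_files, 1, done))
    await queue.put(partial(copy_files, 2, done))
    while not queue.empty():
        item = await queue.get()
        await item()  # the consumer calls the item without arguments
        queue.task_done()
    return done


if __name__ == "__main__":
    print(asyncio.run(main()))
```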
- async bartender.filesystems.queue.perform_download(job_root_dir: pathlib.Path, job_id: int, filesystem: bartender.filesystems.abstract.AbstractFileSystem, job_dao: bartender.db.dao.job_dao.JobDAO, state: bartender.db.models.job_model.State) → None
Download files of job from job’s destination filesystem to local filesystem.
When the download has completed, the state of the job is set to the given state.
- Parameters:
job_root_dir (pathlib.Path) – Local job root directory.
job_id (int) – Job identifier to download files for.
filesystem (bartender.filesystems.abstract.AbstractFileSystem) – Filesystem to download files from.
job_dao (bartender.db.dao.job_dao.JobDAO) – Object to update jobs in database.
state (bartender.db.models.job_model.State) – The new state, most likely retrieved from a scheduler.
- Raises:
FileNotFoundError – If the job directory does not exist in the supplied filesystem.
- Return type:
None
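The documented behaviour (download first, then update the job state, with `FileNotFoundError` when the remote job directory is missing) can be sketched as follows. Everything here is a hypothetical stand-in: `InMemoryFileSystem`, `InMemoryJobDAO`, their method names, the string-valued state, and the `job_root_dir / str(job_id)` layout are assumptions for illustration and are not bartender's actual classes or API.

```python
import asyncio
import tempfile
from pathlib import Path


class InMemoryFileSystem:
    """Hypothetical stand-in for AbstractFileSystem: job id -> {file name: text}."""

    def __init__(self, jobs: dict[int, dict[str, str]]) -> None:
        self.jobs = jobs

    async def download(self, job_id: int, target: Path) -> None:
        if job_id not in self.jobs:
            raise FileNotFoundError(f"no directory for job {job_id}")
        target.mkdir(parents=True, exist_ok=True)
        for name, text in self.jobs[job_id].items():
            (target / name).write_text(text)


class InMemoryJobDAO:
    """Hypothetical stand-in for JobDAO that only tracks job states."""

    def __init__(self) -> None:
        self.states: dict[int, str] = {}

    async def update_state(self, job_id: int, state: str) -> None:
        self.states[job_id] = state


async def download_then_update(
    job_root_dir: Path,
    job_id: int,
    filesystem: InMemoryFileSystem,
    job_dao: InMemoryJobDAO,
    state: str,
) -> None:
    # Download first; the state is written only after the download completed.
    await filesystem.download(job_id, job_root_dir / str(job_id))
    await job_dao.update_state(job_id, state)


async def demo() -> tuple[dict[int, str], list[str]]:
    fs = InMemoryFileSystem({1: {"stdout.txt": "hello"}})
    dao = InMemoryJobDAO()
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        await download_then_update(root, 1, fs, dao, "ok")
        files = sorted(p.name for p in (root / "1").iterdir())
        try:
            await download_then_update(root, 2, fs, dao, "ok")
        except FileNotFoundError:
            pass  # job 2 has no remote directory, so its state stays unset
    return dao.states, files
```

In this sketch a failed download leaves the job's state untouched, because the state update is the last step.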
- async bartender.filesystems.queue._file_staging_worker(queue: FileStagingQueue, job_root_dir: pathlib.Path, destinations: dict[str, bartender.destinations.Destination], factory: sqlalchemy.ext.asyncio.async_sessionmaker[sqlalchemy.ext.asyncio.AsyncSession]) → None
- Parameters:
queue (FileStagingQueue) –
job_root_dir (pathlib.Path) –
destinations (dict[str, bartender.destinations.Destination]) –
factory (sqlalchemy.ext.asyncio.async_sessionmaker[sqlalchemy.ext.asyncio.AsyncSession]) –
- Return type:
None
- bartender.filesystems.queue.setup_file_staging_queue(app: fastapi.FastAPI) → None
Create file staging queue and inject it into app state.
- Parameters:
app (fastapi.FastAPI) – FastAPI application.
- Return type:
None
- bartender.filesystems.queue.build_file_staging_queue(job_root_dir: pathlib.Path, destinations: dict[str, bartender.destinations.Destination], factory: sqlalchemy.ext.asyncio.async_sessionmaker[sqlalchemy.ext.asyncio.AsyncSession]) → tuple[FileStagingQueue, asyncio.Task[None]]
Create file staging queue and single worker task.
- Parameters:
job_root_dir (pathlib.Path) – Job root directory.
destinations (dict[str, bartender.destinations.Destination]) – Job destination dictionary.
factory (sqlalchemy.ext.asyncio.async_sessionmaker[sqlalchemy.ext.asyncio.AsyncSession]) – Database session factory.
- Returns:
Tuple with the queue and the task.
- Return type:
tuple[FileStagingQueue, asyncio.Task[None]]
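The "queue plus single worker task" pattern can be sketched with asyncio alone. This is an illustration under assumptions, not bartender's implementation: `StagingItem`, `worker`, and `build_queue` are hypothetical names, and the `job_root_dir`, `destinations`, and `factory` arguments are left out to keep the example self-contained.

```python
import asyncio
from typing import Any, Callable, Coroutine

# Assumption: illustrative alias for a zero-argument async callable.
StagingItem = Callable[[], Coroutine[Any, Any, None]]


async def worker(queue: asyncio.Queue[StagingItem]) -> None:
    """Consume queued items forever, one at a time."""
    while True:
        item = await queue.get()
        try:
            await item()
        finally:
            queue.task_done()


def build_queue() -> tuple[asyncio.Queue[StagingItem], asyncio.Task[None]]:
    """Create the queue and start its single consumer task.

    Must be called while an event loop is running.
    """
    queue: asyncio.Queue[StagingItem] = asyncio.Queue()
    task = asyncio.create_task(worker(queue))
    return queue, task


async def demo() -> list[str]:
    seen: list[str] = []

    async def stage_a() -> None:
        seen.append("a")

    async def stage_b() -> None:
        seen.append("b")

    queue, task = build_queue()
    await queue.put(stage_a)
    await queue.put(stage_b)
    await queue.join()  # wait until the worker has processed both items
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return seen
```

With a single worker, items are processed strictly in the order they were queued, so two staging operations never run concurrently.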
- async bartender.filesystems.queue.stop_file_staging_queue(task: asyncio.Task[None]) → None
Stop file staging queue and its task consumer.
- Parameters:
task (asyncio.Task[None]) – Task to cancel and wait for.
- Return type:
None
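"Cancel and wait for" a task usually means calling `cancel()` and then awaiting the task so that the cancellation is actually delivered. A minimal sketch of that idiom, where `stop_task` and `forever` are hypothetical names and the real function may differ in its details:

```python
import asyncio
from contextlib import suppress


async def stop_task(task: asyncio.Task[None]) -> None:
    """Request cancellation, then wait until the task has really finished."""
    task.cancel()
    # Awaiting a cancelled task raises CancelledError; it is expected here.
    with suppress(asyncio.CancelledError):
        await task


async def demo() -> bool:
    async def forever() -> None:
        while True:
            await asyncio.sleep(3600)

    task = asyncio.create_task(forever())
    await asyncio.sleep(0)  # give the task a chance to start
    await stop_task(task)
    return task.cancelled()
```

Awaiting after `cancel()` matters because `cancel()` only schedules the cancellation; the task is finished only once the awaiting call returns.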
- async bartender.filesystems.queue.teardown_file_staging_queue(app: fastapi.FastAPI) → None
Stop file staging queue and its task consumer.
- Parameters:
app (fastapi.FastAPI) – FastAPI application.
- Return type:
None
- bartender.filesystems.queue.get_file_staging_queue(request: fastapi.Request) → FileStagingQueue
Retrieve file staging queue.
- Parameters:
request (fastapi.Request) – The request injected by FastAPI.
- Returns:
Queue for downloading/uploading files from/to remote filesystems.
- Return type:
FileStagingQueue
Can be used in dependency injection in a FastAPI route. Requires setup_file_staging_queue() and teardown_file_staging_queue() to be added to the FastAPI startup and shutdown events.
Example
To make a route that returns the number of jobs waiting for their files to be staged out:
from fastapi import APIRouter

from bartender.filesystems.queue import CurrentFileOutStagingQueue

router = APIRouter()


@router.get("/staging-queue-size")
async def file_staging_queue_size(
    file_staging_queue: CurrentFileOutStagingQueue,
):
    # qsize() is the number of items still waiting in the queue.
    return file_staging_queue.qsize()
- bartender.filesystems.queue.CurrentFileOutStagingQueue