Data Components

Data processing functions for the AutoRAG pipeline.

Discovery

documents_discovery

Classes

DiscoveryResult dataclass

DiscoveryResult(bucket: str, prefix: str, documents: list[DocumentDescriptor], total_size_bytes: int, count: int)

Outcome of a document discovery run.

Attributes:

  • bucket (str) –

    S3 bucket name.

  • prefix (str) –

    S3 key prefix used during listing.

  • documents (list[DocumentDescriptor]) –

    Discovered (and optionally sampled) documents.

  • total_size_bytes (int) –

    Combined size of all discovered documents.

  • count (int) –

    Number of discovered documents.

Functions
to_dict
to_dict() -> dict

Serialise the result to a JSON-compatible dictionary.

Source code in ai4rag/components/data/documents_discovery.py
def to_dict(self) -> dict:
    """Serialise the result to a JSON-compatible dictionary."""
    return {
        "bucket": self.bucket,
        "prefix": self.prefix,
        "documents": [{"key": d.key, "size_bytes": d.size_bytes} for d in self.documents],
        "total_size_bytes": self.total_size_bytes,
        "count": self.count,
    }
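
As a minimal sketch of how the serialised form might be consumed, the snippet below round-trips a result through JSON. The two dataclasses are stand-ins that only mirror the documented fields (the real classes are not imported here), and the bucket and key names are made up for illustration.

```python
import json
from dataclasses import dataclass


@dataclass
class DocumentDescriptor:  # stand-in mirroring the documented fields
    key: str
    size_bytes: int


@dataclass
class DiscoveryResult:  # stand-in mirroring the documented fields
    bucket: str
    prefix: str
    documents: list[DocumentDescriptor]
    total_size_bytes: int
    count: int

    def to_dict(self) -> dict:
        return {
            "bucket": self.bucket,
            "prefix": self.prefix,
            "documents": [{"key": d.key, "size_bytes": d.size_bytes} for d in self.documents],
            "total_size_bytes": self.total_size_bytes,
            "count": self.count,
        }


docs = [DocumentDescriptor("reports/a.pdf", 2048), DocumentDescriptor("reports/b.md", 512)]
result = DiscoveryResult("my-bucket", "reports/", docs, sum(d.size_bytes for d in docs), len(docs))

# The dictionary holds only plain JSON types, so it survives a dump/load cycle unchanged.
payload = json.dumps(result.to_dict())
restored = json.loads(payload)
```

Note that nested DocumentDescriptor objects come back as plain dicts; rebuilding dataclass instances would be a separate step.
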
save
save(path: str | Path, filename: str = DOCUMENTS_DESCRIPTOR_FILENAME) -> None

Write documents_descriptor.json into the given directory.

Parameters:

  • path (str | Path) –

    Directory where the descriptor file will be created. The directory is created if it does not exist.

  • filename (str, default: DOCUMENTS_DESCRIPTOR_FILENAME ) –

    Name of the file to be used within the output directory.

Source code in ai4rag/components/data/documents_discovery.py
def save(self, path: str | Path, filename: str = DOCUMENTS_DESCRIPTOR_FILENAME) -> None:
    """Write ``documents_descriptor.json`` into the given directory.

    Parameters
    ----------
    path : str | Path
        Directory where the descriptor file will be created. The
        directory is created if it does not exist.
    filename : str
        Name of the file to be used within the output directory.
    """
    out_dir = Path(path)
    out_dir.mkdir(parents=True, exist_ok=True)
    descriptor_path = out_dir / filename
    with open(descriptor_path, "w", encoding="utf-8") as fh:
        json.dump(self.to_dict(), fh, indent=2)
    _logger.info("Documents descriptor written to %s", descriptor_path)

DocumentDescriptor dataclass

DocumentDescriptor(key: str, size_bytes: int)

Metadata for a single document discovered in an S3 bucket.

Attributes:

  • key (str) –

    Full S3 object key.

  • size_bytes (int) –

    Object size in bytes.

Functions

discover_documents

discover_documents(
    bucket_name: str,
    prefix: str = "",
    test_data_doc_names: list[str] | None = None,
    sampling_enabled: bool = True,
    sampling_max_size_gb: float = SAMPLING_MAX_SIZE_GB,
    supported_extensions: set[str] | None = None,
    s3_client: Any | None = None,
) -> DiscoveryResult

Discover documents in an S3-compatible bucket and optionally sample them.

Lists objects under bucket_name/prefix, filters by file extension, and applies size-based sampling when enabled. Documents referenced by test_data_doc_names are prioritised during sampling so that benchmark-relevant files are always included when the budget permits.

Parameters:

  • bucket_name (str) –

    S3-compatible bucket name.

  • prefix (str, default: "" ) –

    Object-key prefix to narrow the listing.

  • test_data_doc_names (list[str] | None, default: None ) –

    Filenames (stem + extension, no path) of documents referenced by the benchmark test data. These are sorted first so that sampling picks them before other files.

  • sampling_enabled (bool, default: True ) –

    When True, documents are returned only up to a cumulative size of sampling_max_size_gb.

  • sampling_max_size_gb (float, default: 1.0 ) –

    Maximum cumulative size (in gigabytes) when sampling is enabled.

  • supported_extensions (set[str] | None, default: None ) –

    File extensions to accept. Defaults to {".pdf", ".docx", ".pptx", ".md", ".html", ".txt"}.

  • s3_client (Any | None, default: None ) –

    Pre-configured boto3 S3 client. When None, one is created via ai4rag.components._s3.create_s3_client.

Returns:

  • DiscoveryResult

    Discovery outcome with document metadata.

Raises:

  • RuntimeError

    If no supported documents are found in the bucket.

  • ValueError

    If sampling produces an empty selection.
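
The prioritise-then-sample behaviour described above can be sketched on plain dictionaries shaped like list_objects_v2 entries. `select_within_budget` is a hypothetical name and the sizes are toy values in bytes; the logic mirrors the documented rule that benchmark files are tried first and that a file exceeding the remaining budget is skipped rather than ending the selection.

```python
from pathlib import Path


def select_within_budget(contents, max_size_bytes, test_data_doc_names=None):
    """Greedy size-budget selection over list_objects_v2-style entries."""
    files = list(contents)
    if test_data_doc_names:
        names = set(test_data_doc_names)
        # False sorts before True, so benchmark files move to the front;
        # list.sort() is stable, so the listing order is otherwise preserved.
        files.sort(key=lambda c: Path(c["Key"]).name not in names)

    selected, total = [], 0
    for entry in files:
        if total + entry["Size"] > max_size_bytes:
            continue  # skip this file but keep trying later, smaller ones
        selected.append(entry["Key"])
        total += entry["Size"]
    return selected, total


contents = [
    {"Key": "docs/big.pdf", "Size": 700},
    {"Key": "docs/huge.pdf", "Size": 900},
    {"Key": "docs/small.txt", "Size": 50},
    {"Key": "docs/benchmark.md", "Size": 250},
]
selected, total = select_within_budget(contents, max_size_bytes=1000, test_data_doc_names=["benchmark.md"])
```

Here `docs/huge.pdf` does not fit after the first two picks, yet `docs/small.txt` is still selected because the loop continues past oversized files.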

Source code in ai4rag/components/data/documents_discovery.py
def discover_documents(  # pylint: disable=too-many-locals
    bucket_name: str,
    prefix: str = "",
    test_data_doc_names: list[str] | None = None,
    sampling_enabled: bool = True,
    sampling_max_size_gb: float = SAMPLING_MAX_SIZE_GB,
    supported_extensions: set[str] | None = None,
    s3_client: Any | None = None,
) -> DiscoveryResult:
    """Discover documents in an S3-compatible bucket and optionally sample them.

    Lists objects under *bucket_name*/*prefix*, filters by file extension,
    and applies size-based sampling when enabled.  Documents referenced by
    ``test_data_doc_names`` are prioritised during sampling so that
    benchmark-relevant files are always included when the budget permits.

    Parameters
    ----------
    bucket_name : str
        S3-compatible bucket name.
    prefix : str, default=""
        Object-key prefix to narrow the listing.
    test_data_doc_names : list[str] | None, default=None
        Filenames (stem + extension, no path) of documents referenced by
        the benchmark test data.  These are sorted first so that sampling
        picks them before other files.
    sampling_enabled : bool, default=True
        When ``True``, only documents up to *sampling_max_size_gb* total
        are returned.
    sampling_max_size_gb : float, default=1.0
        Maximum cumulative size (in gigabytes) when sampling is enabled.
    supported_extensions : set[str] | None, default=None
        File extensions to accept.  Defaults to
        ``{".pdf", ".docx", ".pptx", ".md", ".html", ".txt"}``.
    s3_client : Any | None, default=None
        Pre-configured ``boto3`` S3 client.  When ``None``, one is created
        via :func:`ai4rag.components._s3.create_s3_client`.

    Returns
    -------
    DiscoveryResult
        Discovery outcome with document metadata.

    Raises
    ------
    RuntimeError
        If no supported documents are found in the bucket.
    ValueError
        If sampling produces an empty selection.
    """
    if supported_extensions is None:
        supported_extensions = set(SUPPORTED_EXTENSIONS)

    ext_tuple = tuple(supported_extensions)
    max_size_bytes = float(sampling_max_size_gb) * 1024**3 if sampling_enabled else float("inf")

    if s3_client is None:
        s3_client, contents = _list_objects_with_ssl_fallback(bucket_name, prefix)
    else:
        contents = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=prefix).get("Contents", [])
    supported_files = [c for c in contents if c["Key"].endswith(ext_tuple)]

    if not supported_files:
        raise RuntimeError("No supported documents found.")

    if test_data_doc_names:
        test_names_set = set(test_data_doc_names)
        test_keys = {c["Key"] for c in supported_files if Path(c["Key"]).name in test_names_set}
        supported_files.sort(key=lambda c: c["Key"] not in test_keys)

    total_size = 0
    selected: list[DocumentDescriptor] = []
    for file_info in supported_files:
        size = file_info["Size"]
        if total_size + size > max_size_bytes:
            continue
        selected.append(DocumentDescriptor(key=file_info["Key"], size_bytes=size))
        total_size += size

    if not selected:
        raise ValueError(
            "No documents to process. Check that the bucket/prefix is correct and contains supported files."
        )

    result = DiscoveryResult(
        bucket=bucket_name,
        prefix=prefix,
        documents=selected,
        total_size_bytes=total_size,
        count=len(selected),
    )
    _logger.info("Discovered %d document(s), total size %d bytes", result.count, result.total_size_bytes)
    return result

Text Extraction

text_extraction

Classes

ExtractionResult dataclass

ExtractionResult(processed_count: int, total_documents: int, error_count: int)

Outcome of a text extraction run.

Attributes:

  • processed_count (int) –

    Number of documents successfully extracted.

  • total_documents (int) –

    Total number of input documents.

  • error_count (int) –

    Number of documents that failed during download or extraction.

Functions

extract_text

extract_text(
    documents: list[dict],
    bucket: str,
    output_dir: str | Path,
    s3_endpoint: str | None = None,
    s3_access_key: str | None = None,
    s3_secret_key: str | None = None,
    s3_region: str | None = None,
    error_tolerance: float | None = None,
    max_extraction_workers: int | None = None,
    docling_artifacts_path: str | None = None,
) -> ExtractionResult

Download documents from S3 and extract text using Docling.

Each input document is downloaded from S3, converted to a DoclingDocument via the Docling library, and persisted as a JSON file in output_dir. Conversion runs in a separate process pool (multiprocess library, "spawn" context) while downloads happen concurrently in a thread pool.

Parameters:

  • documents (list[dict]) –

    List of document descriptor dicts, each with at least a "key" and "size_bytes" entry (as produced by ai4rag.components.data.documents_discovery.discover_documents).

  • bucket (str) –

    S3-compatible bucket name.

  • output_dir (str | Path) –

    Local directory where DoclingDocument JSON files are written.

  • s3_endpoint (str | None, default: None ) –

    S3-compatible endpoint URL. Falls back to AWS_S3_ENDPOINT.

  • s3_access_key (str | None, default: None ) –

    AWS access key. Falls back to AWS_ACCESS_KEY_ID.

  • s3_secret_key (str | None, default: None ) –

    AWS secret key. Falls back to AWS_SECRET_ACCESS_KEY.

  • s3_region (str | None, default: None ) –

    AWS region. Falls back to AWS_DEFAULT_REGION.

  • error_tolerance (float | None, default: None ) –

    Fraction of documents (0.0 to 1.0) allowed to fail. None means zero tolerance.

  • max_extraction_workers (int | None, default: None ) –

    Number of parallel worker processes. Defaults to min(max(1, cpu_count // 2), 8).

  • docling_artifacts_path (str | None, default: None ) –

    Path to pre-downloaded Docling model artifacts for offline use. Falls back to the DOCLING_ARTIFACTS_PATH environment variable.

Returns:

  • ExtractionResult

    Summary of the extraction run.

Raises:

  • RuntimeError

    If the error count exceeds the allowed tolerance.

Source code in ai4rag/components/data/text_extraction.py
def extract_text(  # pylint: disable=too-many-locals,too-many-arguments,too-many-positional-arguments
    documents: list[dict],
    bucket: str,
    output_dir: str | Path,
    s3_endpoint: str | None = None,
    s3_access_key: str | None = None,
    s3_secret_key: str | None = None,
    s3_region: str | None = None,
    error_tolerance: float | None = None,
    max_extraction_workers: int | None = None,
    docling_artifacts_path: str | None = None,
) -> ExtractionResult:
    """Download documents from S3 and extract text using Docling.

    Each input document is downloaded from S3, converted to a
    :class:`DoclingDocument` via the Docling library, and persisted as a
    JSON file in *output_dir*.  Conversion runs in a separate process pool
    (``multiprocess`` library, ``"spawn"`` context) while downloads happen
    concurrently in a thread pool.

    Parameters
    ----------
    documents
        List of document descriptor dicts, each with at least a ``"key"``
        and ``"size_bytes"`` entry (as produced by
        :func:`~ai4rag.components.data.documents_discovery.discover_documents`).
    bucket
        S3-compatible bucket name.
    output_dir
        Local directory where DoclingDocument JSON files are written.
    s3_endpoint
        S3-compatible endpoint URL.  Falls back to ``AWS_S3_ENDPOINT``.
    s3_access_key
        AWS access key.  Falls back to ``AWS_ACCESS_KEY_ID``.
    s3_secret_key
        AWS secret key.  Falls back to ``AWS_SECRET_ACCESS_KEY``.
    s3_region
        AWS region.  Falls back to ``AWS_DEFAULT_REGION``.
    error_tolerance
        Fraction of documents (0.0--1.0) allowed to fail.  ``None`` means
        zero tolerance.
    max_extraction_workers
        Number of parallel worker processes.  Defaults to
        ``min(max(1, cpu_count // 2), 8)``.
    docling_artifacts_path
        Path to pre-downloaded Docling model artifacts for offline use.
        Falls back to ``DOCLING_ARTIFACTS_PATH`` environment variable.

    Returns
    -------
    ExtractionResult
        Summary of the extraction run.

    Raises
    ------
    RuntimeError
        If the error count exceeds the allowed tolerance.
    """
    import tempfile

    import multiprocess as multiprocessing

    out_dir = Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    if not documents:
        _logger.info("No documents to process.")
        return ExtractionResult(processed_count=0, total_documents=0, error_count=0)

    s3_creds = _resolve_s3_credentials(s3_endpoint, s3_access_key, s3_secret_key, s3_region)
    artifacts_path = _resolve_artifacts_path(docling_artifacts_path)

    documents = sorted(documents, key=lambda d: d.get("size_bytes", 0), reverse=True)

    effective_workers = _effective_worker_count(max_extraction_workers)
    _logger.info(
        "Starting text extraction for %d documents. extraction_workers=%d, download_threads=%d.",
        len(documents),
        effective_workers,
        DOWNLOAD_MAX_THREADS,
    )

    if artifacts_path is not None:
        os.environ.setdefault("HF_HUB_OFFLINE", "1")

    mp_context = multiprocessing.get_context("spawn")  # pylint: disable=no-member
    with (
        tempfile.TemporaryDirectory() as download_dir,
        mp_context.Pool(
            processes=effective_workers,
            initializer=_text_extraction_pool_initializer,
        ) as process_pool,
    ):
        download_start = time.perf_counter()
        extraction_tasks, download_errors = _download_and_submit(
            docs=documents,
            bucket=bucket,
            download_path=Path(download_dir),
            process_pool=process_pool,
            out_dir=out_dir,
            s3_creds=s3_creds,
        )
        _logger.info(
            "Downloads finished in %.1fs; %d file(s) queued for extraction, %d download error(s).",
            time.perf_counter() - download_start,
            len(extraction_tasks),
            len(download_errors),
        )
        _raise_if_threshold_exceeded(download_errors, len(documents), error_tolerance)

        extraction_errors: list[dict] = []
        processed_count = 0
        pending = list(extraction_tasks)
        completed = 0

        while pending:
            still_pending = []
            for file_path, task in pending:
                if task.ready():
                    completed += 1
                    try:
                        success, tb = task.get()
                    except Exception:
                        tb = traceback.format_exc()
                        _logger.error("Worker crashed for %s:\n%s", file_path, tb)
                        success = False
                    Path(file_path).unlink(missing_ok=True)
                    if success:
                        processed_count += 1
                    else:
                        extraction_errors.append({"file": file_path, "traceback": tb})
                    _logger.info("Extraction progress %d/%d", completed, len(extraction_tasks))
                else:
                    still_pending.append((file_path, task))
            pending = still_pending
            if pending:
                time.sleep(0.01)

    all_errors = download_errors + extraction_errors
    total_errors = len(all_errors)
    _logger.info(
        "Text extraction completed. Total processed: %d/%d, Errors: %d",
        processed_count,
        len(documents),
        total_errors,
    )
    _raise_if_threshold_exceeded(
        error_details=all_errors,
        total_docs=len(documents),
        tolerance=error_tolerance,
    )

    return ExtractionResult(
        processed_count=processed_count,
        total_documents=len(documents),
        error_count=total_errors,
    )

Document Indexing

documents_indexing

Functions

index_documents

index_documents(
    extracted_text_dir: str | Path,
    embedding_model_id: str,
    vector_io_provider_id: str,
    ogx_client: OgxClient,
    embedding_params: dict | None = None,
    distance_metric: str = "cosine",
    chunking_method: str = "recursive",
    chunk_size: int = 1024,
    chunk_overlap: int = 0,
    batch_size: int = 20,
    collection_name: str | None = None,
) -> int

Chunk, embed, and index extracted documents into a vector store.

Reads DoclingDocument JSON files from extracted_text_dir, splits them into chunks, computes embeddings via OGX, and inserts the resulting vectors into the configured vector store. Documents are processed in batches to bound memory consumption.

Parameters:

  • extracted_text_dir (str | Path) –

    Directory containing DoclingDocument JSON files produced by the text extraction stage.

  • embedding_model_id (str) –

    Identifier of the embedding model served by OGX.

  • vector_io_provider_id (str) –

    OGX provider identifier for the vector database backend.

  • ogx_client (OgxClient) –

    Pre-configured OgxClient instance.

  • embedding_params (dict | None, default: None ) –

    Optional dictionary forwarded to OGXEmbeddingParams.

  • distance_metric (str, default: 'cosine' ) –

    Vector distance metric ("cosine" or "euclidean").

  • chunking_method (str, default: 'recursive' ) –

    Chunking strategy: "recursive" (LangChain) or "hybrid" (Docling structure-aware).

  • chunk_size (int, default: 1024 ) –

    Maximum chunk size in tokens. Must be in the range 128 to 2048.

  • chunk_overlap (int, default: 0 ) –

    Token overlap between consecutive chunks (only used with the "recursive" method).

  • batch_size (int, default: 20 ) –

    Number of documents per processing batch. 0 (or any non-positive value) processes all documents in a single batch.

  • collection_name (str | None, default: None ) –

    Name of an existing vector-store collection to reuse. When None, a new collection is created.

Returns:

  • int

    Total number of chunks indexed.

Raises:

  • ValueError

    If any of the validated parameters are out of range.

  • TypeError

    If chunk_size or chunk_overlap has an incorrect type.
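
The batching arithmetic can be sketched in isolation. `plan_batches` is a hypothetical helper that returns the slice bounds each batch would cover; the document counts are illustrative.

```python
def plan_batches(total_documents: int, batch_size: int) -> list[tuple[int, int]]:
    """Return (start, end) slice bounds for each processing batch."""
    if total_documents == 0:
        return []
    # A non-positive batch size means "everything in one batch".
    effective = batch_size if batch_size > 0 else total_documents
    return [
        (start, min(start + effective, total_documents))
        for start in range(0, total_documents, effective)
    ]


batches = plan_batches(45, 20)
num_batches = (45 + 20 - 1) // 20  # ceiling division without floating point
single = plan_batches(45, 0)
```

With 45 documents and a batch size of 20, the last batch holds only the remaining 5 documents, so peak memory is bounded by the batch size rather than the corpus size.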

Source code in ai4rag/components/data/documents_indexing.py
def index_documents(  # pylint: disable=too-many-locals,too-many-arguments,too-many-positional-arguments
    extracted_text_dir: str | Path,
    embedding_model_id: str,
    vector_io_provider_id: str,
    ogx_client: OgxClient,
    embedding_params: dict | None = None,
    distance_metric: str = "cosine",
    chunking_method: str = "recursive",
    chunk_size: int = 1024,
    chunk_overlap: int = 0,
    batch_size: int = 20,
    collection_name: str | None = None,
) -> int:
    """Chunk, embed, and index extracted documents into a vector store.

    Reads DoclingDocument JSON files from *extracted_text_dir*, splits them
    into chunks, computes embeddings via OGX, and inserts the resulting
    vectors into the configured vector store.  Documents are processed in
    batches to bound memory consumption.

    Parameters
    ----------
    extracted_text_dir
        Directory containing DoclingDocument JSON files produced by the
        text extraction stage.
    embedding_model_id
        Identifier of the embedding model served by OGX.
    vector_io_provider_id
        OGX provider identifier for the vector database backend.
    ogx_client
        Pre-configured :class:`OgxClient` instance.
    embedding_params
        Optional dictionary forwarded to :class:`OGXEmbeddingParams`.
    distance_metric
        Vector distance metric (``"cosine"`` or ``"euclidean"``).
    chunking_method
        Chunking strategy: ``"recursive"`` (LangChain) or ``"hybrid"``
        (Docling structure-aware).
    chunk_size
        Maximum chunk size in tokens.  Must be in the range 128--2048.
    chunk_overlap
        Token overlap between consecutive chunks (only used with the
        ``"recursive"`` method).
    batch_size
        Number of documents per processing batch.  ``0`` processes all
        documents in a single batch.
    collection_name
        Name of an existing vector-store collection to reuse.  When
        ``None``, a new collection is created.

    Returns
    -------
    int
        Total number of chunks indexed.

    Raises
    ------
    ValueError
        If any of the validated parameters are out of range.
    TypeError
        If *chunk_size* or *chunk_overlap* have incorrect types.
    """
    _validate_inputs(
        embedding_model_id, vector_io_provider_id, distance_metric, chunking_method, chunk_size, chunk_overlap
    )

    params = OGXEmbeddingParams(**(embedding_params or {}))

    base = Path(extracted_text_dir)
    paths = sorted(p for p in base.iterdir() if p.is_file() and p.suffix.lower() == ".json")
    total_documents = len(paths)
    _logger.info("Found %d documents to index", total_documents)

    if total_documents == 0:
        _logger.warning("No documents found in %s", extracted_text_dir)
        return 0

    chunker = _create_chunker(chunking_method, chunk_size, chunk_overlap)
    embedding_model = OGXEmbeddingModel(client=ogx_client, model_id=embedding_model_id, params=params)

    collection_kwargs = {"reuse_collection_name": collection_name} if collection_name is not None else {}
    ogx_vectorstore = OGXVectorStore(
        embedding_model=embedding_model,
        client=ogx_client,
        provider_id=vector_io_provider_id,
        distance_metric=distance_metric,
        **collection_kwargs,
    )

    effective_batch_size = batch_size if batch_size > 0 else total_documents
    total_chunks = 0
    num_batches = (total_documents + effective_batch_size - 1) // effective_batch_size

    for start in range(0, total_documents, effective_batch_size):
        batch_paths = paths[start : start + effective_batch_size]
        batch_documents = [DoclingDocument.load_from_json(p) for p in batch_paths]
        batch_chunks = chunker.split_documents(batch_documents)
        ogx_vectorstore.add_documents(batch_chunks)
        total_chunks += len(batch_chunks)
        batch_num = start // effective_batch_size + 1
        _logger.info(
            "Batch %d/%d: indexed %d documents (%d chunks), total chunks so far: %d",
            batch_num,
            num_batches,
            len(batch_documents),
            len(batch_chunks),
            total_chunks,
        )

    _logger.info("Documents indexing finished: %d documents, %d chunks", total_documents, total_chunks)
    return total_chunks
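
The internal validation helper is not shown above. A minimal sketch built only from the documented constraints might look like the following; `validate_indexing_inputs` is a hypothetical name, and treating the 128 to 2048 range as inclusive and rejecting booleans are assumptions.

```python
ALLOWED_DISTANCE_METRICS = {"cosine", "euclidean"}
ALLOWED_CHUNKING_METHODS = {"recursive", "hybrid"}


def validate_indexing_inputs(distance_metric, chunking_method, chunk_size, chunk_overlap):
    """Hypothetical validator derived from the documented parameter rules."""
    for name, value in (("chunk_size", chunk_size), ("chunk_overlap", chunk_overlap)):
        # bool is a subclass of int, so it is rejected explicitly (assumption).
        if isinstance(value, bool) or not isinstance(value, int):
            raise TypeError(f"{name} must be an int, got {type(value).__name__}")
    if distance_metric not in ALLOWED_DISTANCE_METRICS:
        raise ValueError(f"distance_metric must be one of {sorted(ALLOWED_DISTANCE_METRICS)}")
    if chunking_method not in ALLOWED_CHUNKING_METHODS:
        raise ValueError(f"chunking_method must be one of {sorted(ALLOWED_CHUNKING_METHODS)}")
    if not 128 <= chunk_size <= 2048:  # inclusive bounds are an assumption
        raise ValueError("chunk_size must be in the range 128 to 2048")


def check(**overrides) -> str:
    """Run the validator with the documented defaults plus overrides."""
    params = dict(distance_metric="cosine", chunking_method="recursive", chunk_size=1024, chunk_overlap=0)
    params.update(overrides)
    try:
        validate_indexing_inputs(**params)
    except (TypeError, ValueError) as exc:
        return type(exc).__name__
    return "ok"
```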

Test Data Loading

test_data_loader

Classes

TestDataResult dataclass

TestDataResult(data: list[dict], record_count: int, sampled: bool)

Outcome of loading (and optionally sampling) benchmark test data.

Attributes:

  • data (list[dict]) –

    Benchmark records, each containing question, correct_answers, and correct_answer_document_ids.

  • record_count (int) –

    Number of records in data.

  • sampled (bool) –

    True if the data was randomly sampled down.

TestDataLoaderError

Bases: Exception

Raised when test data cannot be loaded or validated.

Functions

load_test_data

load_test_data(
    bucket_name: str, key: str, benchmark_sample_size: int = BENCHMARK_SAMPLE_SIZE, s3_client: Any | None = None
) -> TestDataResult

Download benchmark test data from S3 and optionally sample it.

Parameters:

  • bucket_name (str) –

    S3-compatible bucket containing the test data file.

  • key (str) –

    Full S3 object key to the JSON test data file.

  • benchmark_sample_size (int, default: 25 ) –

    Maximum number of records to keep. When the dataset exceeds this limit a reproducible random sample is drawn (seed 42). Set to 0 to disable sampling and keep all records.

  • s3_client (Any | None, default: None ) –

    Pre-configured boto3 S3 client. When None, one is created via ai4rag.components.s3.create_s3_client.

Returns:

  • TestDataResult

    Loaded (and optionally sampled) benchmark data.

Raises:

  • FileNotFoundError

    If the object does not exist in S3.

  • TestDataLoaderError

    If the file is not valid JSON or the records have an unexpected structure.

  • TypeError

    If bucket_name is empty.

Source code in ai4rag/components/data/test_data_loader.py
def load_test_data(
    bucket_name: str,
    key: str,
    benchmark_sample_size: int = BENCHMARK_SAMPLE_SIZE,
    s3_client: Any | None = None,
) -> TestDataResult:
    """Download benchmark test data from S3 and optionally sample it.

    Parameters
    ----------
    bucket_name : str
        S3-compatible bucket containing the test data file.
    key : str
        Full S3 object key to the JSON test data file.
    benchmark_sample_size : int, default=25
        Maximum number of records to keep.  When the dataset exceeds this
        limit a reproducible random sample is drawn (seed 42).  Set to
        ``0`` to disable sampling and keep all records.
    s3_client : Any | None, default=None
        Pre-configured ``boto3`` S3 client.  When ``None``, one is created
        via :func:`ai4rag.components.s3.create_s3_client`.

    Returns
    -------
    TestDataResult
        Loaded (and optionally sampled) benchmark data.

    Raises
    ------
    FileNotFoundError
        If the object does not exist in S3.
    TestDataLoaderError
        If the file is not valid JSON or the records have an unexpected
        structure.
    """
    if not bucket_name:
        raise TypeError("bucket_name must be a non-empty string")

    if s3_client is None:
        s3_client = _make_s3_client_with_ssl_fallback(bucket_name, key)

    raw_data = _download_object(s3_client, bucket_name, key)
    benchmark_data = _parse_and_validate(raw_data)

    sampled = False
    if 0 < benchmark_sample_size < len(benchmark_data):
        original_count = len(benchmark_data)
        rng = random.Random(42)
        benchmark_data = rng.sample(benchmark_data, benchmark_sample_size)
        sampled = True
        _logger.info("Sampled %d records from %d total.", benchmark_sample_size, original_count)
    else:
        _logger.info("No sampling applied; record count: %d.", len(benchmark_data))

    return TestDataResult(data=benchmark_data, record_count=len(benchmark_data), sampled=sampled)
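
The sampling rule can be exercised on its own. In this sketch `sample_records` is a hypothetical helper and the records are placeholder dicts; it uses a private `random.Random(42)` instance as in the source, so repeated calls on the same input yield the same sample.

```python
import random


def sample_records(records: list[dict], sample_size: int, seed: int = 42) -> tuple[list[dict], bool]:
    """Return (records, sampled) following the documented sampling rule."""
    if 0 < sample_size < len(records):
        rng = random.Random(seed)  # private generator; the global random state is untouched
        return rng.sample(records, sample_size), True
    # sample_size of 0, or a dataset already within the limit: keep everything.
    return records, False


records = [{"question": f"q{i}"} for i in range(100)]
first, sampled_first = sample_records(records, 25)
second, _ = sample_records(records, 25)
everything, sampled_all = sample_records(records, 0)
```

Because the seed is fixed, two runs over the same input file benchmark the same subset, which keeps results comparable across experiments.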