Data Components
Data processing functions for the AutoRAG pipeline.
Discovery
documents_discovery
Classes
DiscoveryResult dataclass
DiscoveryResult(bucket: str, prefix: str, documents: list[DocumentDescriptor], total_size_bytes: int, count: int)
Outcome of a document discovery run.
Attributes:
-
bucket(str) –S3 bucket name.
-
prefix(str) –S3 key prefix used during listing.
-
documents(list[DocumentDescriptor]) –Discovered (and optionally sampled) documents.
-
total_size_bytes(int) –Combined size of all discovered documents.
-
count(int) –Number of discovered documents.
Functions
to_dict
Serialise the result to a JSON-compatible dictionary.
Source code in ai4rag/components/data/documents_discovery.py
save
Write documents_descriptor.json into the given directory.
Parameters:
-
path(str | Path) –Directory where the descriptor file will be created. The directory is created if it does not exist.
-
filename(str, default:DOCUMENTS_DESCRIPTOR_FILENAME) –Name of the file to be used within the output directory.
Source code in ai4rag/components/data/documents_discovery.py
DocumentDescriptor dataclass
Metadata for a single document discovered in an S3 bucket.
Attributes:
-
key(str) –Full S3 object key.
-
size_bytes(int) –Object size in bytes.
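The two dataclasses above can be approximated with the standard library alone. The following is a minimal sketch built only from the documented fields, not the code in documents_discovery.py: the value of DOCUMENTS_DESCRIPTOR_FILENAME, the use of dataclasses.asdict, the JSON indentation, and the return value of save are all assumptions made for illustration.

```python
import json
import tempfile
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Union

# Assumed value: the docs only state that save() writes documents_descriptor.json.
DOCUMENTS_DESCRIPTOR_FILENAME = "documents_descriptor.json"


@dataclass
class DocumentDescriptor:
    key: str          # full S3 object key
    size_bytes: int   # object size in bytes


@dataclass
class DiscoveryResult:
    bucket: str
    prefix: str
    documents: list[DocumentDescriptor]
    total_size_bytes: int
    count: int

    def to_dict(self) -> dict:
        # asdict() recurses into nested dataclasses, so the result holds
        # only plain dicts, lists, strings and ints.
        return asdict(self)

    def save(self, path: Union[str, Path], filename: str = DOCUMENTS_DESCRIPTOR_FILENAME) -> Path:
        out_dir = Path(path)
        out_dir.mkdir(parents=True, exist_ok=True)  # create the directory if missing
        target = out_dir / filename
        target.write_text(json.dumps(self.to_dict(), indent=2), encoding="utf-8")
        return target  # returning the path is a convenience of this sketch


# Usage: build a result by hand and write it to a temporary directory.
docs = [DocumentDescriptor("corpus/a.pdf", 10), DocumentDescriptor("corpus/b.md", 5)]
result = DiscoveryResult("my-bucket", "corpus/", docs, total_size_bytes=15, count=2)
with tempfile.TemporaryDirectory() as tmp:
    written = result.save(Path(tmp) / "nested")
    reloaded = json.loads(written.read_text(encoding="utf-8"))
```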
Functions
discover_documents
discover_documents(
bucket_name: str,
prefix: str = "",
test_data_doc_names: list[str] | None = None,
sampling_enabled: bool = True,
sampling_max_size_gb: float = SAMPLING_MAX_SIZE_GB,
supported_extensions: set[str] | None = None,
s3_client: Any | None = None,
) -> DiscoveryResult
Discover documents in an S3-compatible bucket and optionally sample them.
Lists objects under bucket_name/prefix, filters by file extension, and applies size-based sampling when enabled. Documents referenced by test_data_doc_names are prioritised during sampling so that benchmark-relevant files are always included when the budget permits.
Parameters:
-
bucket_name(str) –S3-compatible bucket name.
-
prefix(str, default:"") –Object-key prefix to narrow the listing.
-
test_data_doc_names(list[str] | None, default:None) –Filenames (stem + extension, no path) of documents referenced by the benchmark test data. These are sorted first so that sampling picks them before other files.
-
sampling_enabled(bool, default:True) –When True, only documents up to sampling_max_size_gb total are returned.
-
sampling_max_size_gb(float, default:1.0) –Maximum cumulative size (in gigabytes) when sampling is enabled.
-
supported_extensions(set[str] | None, default:None) –File extensions to accept. Defaults to {".pdf", ".docx", ".pptx", ".md", ".html", ".txt"}.
-
s3_client(Any | None, default:None) –Pre-configured boto3 S3 client. When None, one is created via ai4rag.components._s3.create_s3_client.
Returns:
-
DiscoveryResult–Discovery outcome with document metadata.
Raises:
-
RuntimeError–If no supported documents are found in the bucket.
-
ValueError–If sampling produces an empty selection.
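The prioritised, size-bounded sampling described above can be illustrated with a small stand-alone function. This is a sketch under stated assumptions rather than the library's implementation: the helper name sample_documents is hypothetical, a gigabyte is taken to be 1024**3 bytes, and files that do not fit the remaining budget are skipped instead of ending the selection.

```python
from pathlib import PurePosixPath

# Assumption: binary gigabytes. The docs do not say which unit is used.
BYTES_PER_GB = 1024 ** 3


def sample_documents(documents, test_data_doc_names=None, max_size_gb=1.0):
    """Greedy selection under a size budget, visiting benchmark files first.

    `documents` is a list of dicts with "key" and "size_bytes" entries.
    """
    priority = set(test_data_doc_names or [])
    budget = max_size_gb * BYTES_PER_GB

    # sorted() is stable and False sorts before True, so benchmark files move
    # to the front while the original order is kept within each group.
    ordered = sorted(
        documents,
        key=lambda doc: PurePosixPath(doc["key"]).name not in priority,
    )

    selected, used = [], 0
    for doc in ordered:
        if used + doc["size_bytes"] > budget:
            continue  # assumption: skip files that do not fit
        selected.append(doc)
        used += doc["size_bytes"]

    if not selected:
        raise ValueError("sampling produced an empty selection")
    return selected


# Usage: three files against a 1 GB budget, with c.pdf referenced by the benchmark.
MB = 1024 ** 2
candidates = [
    {"key": "data/a.pdf", "size_bytes": 600 * MB},
    {"key": "data/b.pdf", "size_bytes": 500 * MB},
    {"key": "data/c.pdf", "size_bytes": 300 * MB},
]
picked = sample_documents(candidates, test_data_doc_names=["c.pdf"], max_size_gb=1.0)
```

Matching on the file name alone follows the description of test_data_doc_names as filenames without a path.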
Source code in ai4rag/components/data/documents_discovery.py
Text Extraction
text_extraction
Classes
ExtractionResult dataclass
Outcome of a text extraction run.
Attributes:
-
processed_count(int) –Number of documents successfully extracted.
-
total_documents(int) –Total number of input documents.
-
error_count(int) –Number of documents that failed during download or extraction.
Functions
extract_text
extract_text(
documents: list[dict],
bucket: str,
output_dir: str | Path,
s3_endpoint: str | None = None,
s3_access_key: str | None = None,
s3_secret_key: str | None = None,
s3_region: str | None = None,
error_tolerance: float | None = None,
max_extraction_workers: int | None = None,
docling_artifacts_path: str | None = None,
) -> ExtractionResult
Download documents from S3 and extract text using Docling.
Each input document is downloaded from S3, converted to a DoclingDocument via the Docling library, and persisted as a JSON file in output_dir. Conversion runs in a separate process pool (multiprocess library, "spawn" context) while downloads happen concurrently in a thread pool.
Parameters:
-
documents(list[dict]) –List of document descriptor dicts, each with at least a "key" and "size_bytes" entry (as produced by ai4rag.components.data.documents_discovery.discover_documents).
-
bucket(str) –S3-compatible bucket name.
-
output_dir(str | Path) –Local directory where DoclingDocument JSON files are written.
-
s3_endpoint(str | None, default:None) –S3-compatible endpoint URL. Falls back to AWS_S3_ENDPOINT.
-
s3_access_key(str | None, default:None) –AWS access key. Falls back to AWS_ACCESS_KEY_ID.
-
s3_secret_key(str | None, default:None) –AWS secret key. Falls back to AWS_SECRET_ACCESS_KEY.
-
s3_region(str | None, default:None) –AWS region. Falls back to AWS_DEFAULT_REGION.
-
error_tolerance(float | None, default:None) –Fraction of documents (0.0 to 1.0) allowed to fail. None means zero tolerance.
-
max_extraction_workers(int | None, default:None) –Number of parallel worker processes. Defaults to min(max(1, cpu_count // 2), 8).
-
docling_artifacts_path(str | None, default:None) –Path to pre-downloaded Docling model artifacts for offline use. Falls back to the DOCLING_ARTIFACTS_PATH environment variable.
Returns:
-
ExtractionResult–Summary of the extraction run.
Raises:
-
RuntimeError–If the error count exceeds the allowed tolerance.
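Two of the defaults above reduce to short calculations. The sketch below shows the documented worker-count formula and one plausible reading of the error-tolerance rule; both helper names are hypothetical, and comparing the failed fraction against the tolerance with a strict "greater than" is an assumption, since the docs only say the error count must not exceed the allowed tolerance.

```python
import os


def default_extraction_workers() -> int:
    # Documented default: min(max(1, cpu_count // 2), 8).
    cpu_count = os.cpu_count() or 1  # os.cpu_count() may return None
    return min(max(1, cpu_count // 2), 8)


def check_error_tolerance(error_count, total_documents, error_tolerance=None):
    """Raise RuntimeError when too many documents failed."""
    # None means zero tolerance, as documented.
    tolerance = 0.0 if error_tolerance is None else error_tolerance
    if total_documents == 0:
        return  # nothing was processed, so nothing can exceed the tolerance
    # Assumption: the tolerance bounds the failed fraction, inclusive.
    if error_count / total_documents > tolerance:
        raise RuntimeError(
            f"{error_count} of {total_documents} documents failed, "
            f"above the allowed fraction of {tolerance}"
        )


# Usage: one failure in ten is accepted at a tolerance of 0.1.
workers = default_extraction_workers()
check_error_tolerance(error_count=1, total_documents=10, error_tolerance=0.1)
```

Under this reading, any single failure raises when error_tolerance is left at None.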
Source code in ai4rag/components/data/text_extraction.py
Document Indexing
documents_indexing
Functions
index_documents
index_documents(
extracted_text_dir: str | Path,
embedding_model_id: str,
vector_io_provider_id: str,
ogx_client: OgxClient,
embedding_params: dict | None = None,
distance_metric: str = "cosine",
chunking_method: str = "recursive",
chunk_size: int = 1024,
chunk_overlap: int = 0,
batch_size: int = 20,
collection_name: str | None = None,
) -> int
Chunk, embed, and index extracted documents into a vector store.
Reads DoclingDocument JSON files from extracted_text_dir, splits them into chunks, computes embeddings via OGX, and inserts the resulting vectors into the configured vector store. Documents are processed in batches to bound memory consumption.
Parameters:
-
extracted_text_dir(str | Path) –Directory containing DoclingDocument JSON files produced by the text extraction stage.
-
embedding_model_id(str) –Identifier of the embedding model served by OGX.
-
vector_io_provider_id(str) –OGX provider identifier for the vector database backend.
-
ogx_client(OgxClient) –Pre-configured OgxClient instance.
-
embedding_params(dict | None, default:None) –Optional dictionary forwarded to OGXEmbeddingParams.
-
distance_metric(str, default:'cosine') –Vector distance metric ("cosine" or "euclidean").
-
chunking_method(str, default:'recursive') –Chunking strategy: "recursive" (LangChain) or "hybrid" (Docling structure-aware).
-
chunk_size(int, default:1024) –Maximum chunk size in tokens. Must be in the range 128 to 2048.
-
chunk_overlap(int, default:0) –Token overlap between consecutive chunks (only used with the "recursive" method).
-
batch_size(int, default:20) –Number of documents per processing batch. 0 processes all documents in a single batch.
-
collection_name(str | None, default:None) –Name of an existing vector-store collection to reuse. When None, a new collection is created.
Returns:
-
int–Total number of chunks indexed.
Raises:
-
ValueError–If any of the validated parameters are out of range.
-
TypeError–If chunk_size or chunk_overlap have incorrect types.
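The documented constraints and the batching rule can be expressed without any OGX dependency. The following sketch uses hypothetical helper names (validate_indexing_params, iter_batches) and is not the code in documents_indexing.py; rejecting a negative chunk_overlap or batch_size is an assumption, as the docs do not list every validated range.

```python
def validate_indexing_params(chunk_size, chunk_overlap,
                             chunking_method="recursive", distance_metric="cosine"):
    # bool is a subclass of int, so it is rejected explicitly.
    for name, value in (("chunk_size", chunk_size), ("chunk_overlap", chunk_overlap)):
        if isinstance(value, bool) or not isinstance(value, int):
            raise TypeError(f"{name} must be an int, got {type(value).__name__}")
    if not 128 <= chunk_size <= 2048:
        raise ValueError("chunk_size must be in the range 128 to 2048")
    if chunk_overlap < 0:  # assumption: negative overlap is rejected
        raise ValueError("chunk_overlap must not be negative")
    if chunking_method not in {"recursive", "hybrid"}:
        raise ValueError(f"unknown chunking_method: {chunking_method!r}")
    if distance_metric not in {"cosine", "euclidean"}:
        raise ValueError(f"unknown distance_metric: {distance_metric!r}")


def iter_batches(items, batch_size=20):
    """Yield consecutive slices of `items`; batch_size=0 yields one batch."""
    if batch_size < 0:  # assumption: negative sizes are rejected
        raise ValueError("batch_size must not be negative")
    items = list(items)
    if not items:
        return
    step = batch_size or len(items)  # 0 means "everything in a single batch"
    for start in range(0, len(items), step):
        yield items[start:start + step]


# Usage: five document paths in batches of two, then all at once.
paths = ["a.json", "b.json", "c.json", "d.json", "e.json"]
validate_indexing_params(chunk_size=1024, chunk_overlap=0)
in_pairs = list(iter_batches(paths, batch_size=2))
single = list(iter_batches(paths, batch_size=0))
```

Processing one slice at a time is what keeps memory bounded: only the chunks and embeddings of the current batch need to be held before insertion.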
Source code in ai4rag/components/data/documents_indexing.py
Test Data Loading
test_data_loader
Classes
TestDataResult dataclass
Outcome of loading (and optionally sampling) benchmark test data.
Attributes:
-
data(list[dict]) –Benchmark records, each containing question, correct_answers, and correct_answer_document_ids.
-
record_count(int) –Number of records in data.
-
sampled(bool) –True if the data was randomly sampled down.
TestDataLoaderError
Bases: Exception
Raised when test data cannot be loaded or validated.
Functions
load_test_data
load_test_data(
bucket_name: str,
key: str,
benchmark_sample_size: int = BENCHMARK_SAMPLE_SIZE,
s3_client: Any | None = None,
) -> TestDataResult
Download benchmark test data from S3 and optionally sample it.
Parameters:
-
bucket_name(str) –S3-compatible bucket containing the test data file.
-
key(str) –Full S3 object key to the JSON test data file.
-
benchmark_sample_size(int, default:25) –Maximum number of records to keep. When the dataset exceeds this limit a reproducible random sample is drawn (seed 42). Set to 0 to disable sampling and keep all records.
-
s3_client(Any | None, default:None) –Pre-configured boto3 S3 client. When None, one is created via ai4rag.components.s3.create_s3_client.
Returns:
-
TestDataResult–Loaded (and optionally sampled) benchmark data.
Raises:
-
FileNotFoundError–If the object does not exist in S3.
-
TestDataLoaderError–If the file is not valid JSON or the records have an unexpected structure.
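The validation and seeded sampling described above can be sketched on an in-memory list, leaving the S3 download aside. This is an illustration under assumptions, not the loader's code: the helper sample_records and the stand-in exception LoaderError are hypothetical, and drawing the sample with random.Random(42).sample is only one way to obtain a reproducible selection.

```python
import random

# Fields each benchmark record is documented to contain.
REQUIRED_FIELDS = ("question", "correct_answers", "correct_answer_document_ids")


class LoaderError(Exception):
    """Hypothetical stand-in for the documented TestDataLoaderError."""


def sample_records(records, benchmark_sample_size=25, seed=42):
    """Validate records, then return (records, sampled_flag)."""
    for index, record in enumerate(records):
        if not isinstance(record, dict):
            raise LoaderError(f"record {index} is not an object")
        missing = [name for name in REQUIRED_FIELDS if name not in record]
        if missing:
            raise LoaderError(f"record {index} is missing {missing}")

    # 0 disables sampling; small datasets are returned unchanged.
    if benchmark_sample_size == 0 or len(records) <= benchmark_sample_size:
        return list(records), False

    # A dedicated Random instance keeps the draw reproducible without
    # touching the global random state.
    rng = random.Random(seed)
    return rng.sample(records, benchmark_sample_size), True


# Usage: 100 synthetic records reduced to the default sample size.
records = [
    {"question": f"q{i}", "correct_answers": [f"a{i}"], "correct_answer_document_ids": [f"d{i}"]}
    for i in range(100)
]
first, was_sampled = sample_records(records)
second, _ = sample_records(records)
everything, flag_all = sample_records(records, benchmark_sample_size=0)
```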