<div style="background-color: #04D7FD; padding: 20px; text-align: left;">
    <h1 style="color: #000000; font-size: 36px; margin: 0;">Demo: Data Prep Kit</h1>
    
</div>


## Overview
Welcome to the demo notebook! It contains an end-to-end sample data pipeline for processing language datasets, beginning with a folder of PDF documents and culminating in a working Retrieval-Augmented Generation (RAG) system. The notebook runs the following transforms to process the data:

- [pdf2parquet](#item1)
- [Chunk documents](#item2)
- [Exact Dedup](#item3)
- [Doc_ID generation](#item4)
- [Fuzzy Dedup](#item5)
- [Language detection](#item6)
- [Doc quality](#item7)
- [Filtering](#item8)
- [Text encoder](#item9)

### Getting started

TBA

### Import common Python modules

In [1]:

import os
import sys

from data_processing_ray.runtime.ray import RayTransformLauncher
from data_processing.runtime.pure_python import PythonTransformLauncher
from data_processing.utils import ParamsUtils

### Set input/output path variables for the pipeline

In [2]:
# Example
# We can set input paths here
pdf_input_folder = "input_data"

if not os.path.exists(pdf_input_folder):
    print ("NO INPUT DATA")
    print ("Please set `pdf_input_folder` variable to path containing data")

# make sure the paths are correct
data_base_path = "output"

parquet_data_output = os.path.join(data_base_path, "01_parquet_input")

chunk_out =  os.path.join(data_base_path, "02_chunk_out")
ededup_out =  os.path.join(data_base_path, "03_ededup_out")
doc_id_out =  os.path.join(data_base_path, "04_doc_id_out")
fdedup_out = os.path.join(data_base_path, "05_fdedup_out")
lang_out =  os.path.join(data_base_path,"06_lang_out")
dq_out = os.path.join(data_base_path,"07_dq_out")

filter_out = os.path.join(data_base_path ,"08_filter_out")
encoder_out = os.path.join(data_base_path ,"09_encoder_out")

# Main repo root
from utils import rootdir

## <span style="color: green"> 1. Convert data to parquet using pdf2parquet [<-](#top)<a class="anchor" id="item1"></a>
_pdf_ to _parquet_ </span>

This step reads the input folder containing all the PDF files and ingests them into a parquet table using the [Docling package](https://github.com/DS4SD/docling).
The documents are converted into a JSON format, which makes them easy to chunk in the later steps.



### Set Input/output Folder

In [3]:
# For this stage the input folder contains the PDF files to convert.

input_folder = pdf_input_folder
output_folder =  parquet_data_output

### Execute 

In [4]:
import ast
import os
import sys

from pdf2parquet_transform import (
    pdf2parquet_contents_type_cli_param,
    pdf2parquet_contents_types,
)
from pdf2parquet_transform_python import Pdf2ParquetPythonTransformConfiguration
from pdf2parquet_transform_ray import Pdf2ParquetRayTransformConfiguration

from data_processing.utils import GB, ParamsUtils


# create parameters
local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}
worker_options = {"num_cpus": 0.8, "memory": 2 * GB}
code_location = {"github": "github", "commit_hash": "12345", "path": "path"}
ingest_config = {
    pdf2parquet_contents_type_cli_param: pdf2parquet_contents_types.JSON,
}

params = {
    # where to run
    "run_locally": True,
    # Data access. Only required parameters are specified
    "data_local_config": ParamsUtils.convert_to_ast(local_conf),
    "data_files_to_use": ast.literal_eval("['.pdf']"),
    # orchestrator
    "runtime_worker_options": ParamsUtils.convert_to_ast(worker_options),
    # "runtime_num_workers": 3,
    "runtime_pipeline_id": "pipeline_id",
    "runtime_job_id": "job_id",
    "runtime_code_location": ParamsUtils.convert_to_ast(code_location),
}


sys.argv = ParamsUtils.dict_to_req(d=(params | ingest_config))
# create launcher
launcher = RayTransformLauncher(Pdf2ParquetRayTransformConfiguration())
# launcher = PythonTransformLauncher(Pdf2ParquetPythonTransformConfiguration())
# launch
launcher.launch()

17:16:00 INFO - Running locally
17:16:00 INFO - pdf2parquet parameters are : {'artifacts_path': None, 'contents_type': <pdf2parquet_contents_types.JSON: 'application/json'>, 'do_table_structure': True, 'do_ocr': False}
17:16:00 INFO - data factory data_ is using local data access: input_folder - input_data output_folder - output/parquet_input
17:16:00 INFO - data factory data_ max_files -1, n_sample -1
17:16:00 INFO - data factory data_ Not using data sets, checkpointing False, max files -1, random samples -1, files to use ['.pdf'], files to checkpoint ['.parquet']
17:16:00 INFO - pipeline id pipeline_id
17:16:00 INFO - code location {'github': 'github', 'commit_hash': '12345', 'path': 'path'}
17:16:00 INFO - number of workers 1 worker options {'num_cpus': 0.8, 'memory': 2147483648, 'max_restarts': -1}
17:16:00 INFO - actor creation delay 0
17:16:00 INFO - job details {'job category': 'preprocessing', 'job name': 'pdf2parquet', 'job type': 'ray', 'job id': 'job_id'}
2024-07-31 17:16:04

0

##  <span style="color: green">   2. Doc chunks [<-](#top)<a class="anchor" id="item2"></a> </span>

Split the documents into chunks according to their layout segmentation.

### Set Input/output Folder

In [5]:
## For this stage the input is the folder containing the parquet data output by the pdf2parquet step

input_folder = parquet_data_output
output_folder = chunk_out

print(input_folder)
print(output_folder)

output/parquet_input
output/chunk_out


### Execute 

In [6]:
# Import doc_json_chunk transform configuration
from doc_chunk_transform_ray import DocChunkRayTransformConfiguration


# Prepare the commandline params
local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}
worker_options = {"num_cpus": 0.8}
params = {
    # where to run
    "run_locally": True,
    # Data access. Only required parameters are specified
    "data_local_config": ParamsUtils.convert_to_ast(local_conf),
    # orchestrator
    "runtime_worker_options": ParamsUtils.convert_to_ast(worker_options),
    "runtime_num_workers": 3,
    # doc_chunk arguments
    # ...
}

# Pass the commandline params
sys.argv = ParamsUtils.dict_to_req(d=params)

# create launcher
launcher = RayTransformLauncher(DocChunkRayTransformConfiguration())
# launch
launcher.launch()

17:17:48 INFO - Running locally
17:17:48 INFO - doc_chunk parameters are : {'chunking_type': <chunking_types.DL_JSON: 'dl_json'>, 'content_column_name': 'contents', 'output_chunk_column_name': 'contents', 'output_jsonpath_column_name': 'doc_jsonpath', 'output_pageno_column_name': 'page_number', 'output_bbox_column_name': 'bbox'}
17:17:48 INFO - data factory data_ is using local data access: input_folder - output/parquet_input output_folder - output/chunk_out
17:17:48 INFO - data factory data_ max_files -1, n_sample -1
17:17:48 INFO - data factory data_ Not using data sets, checkpointing False, max files -1, random samples -1, files to use ['.parquet'], files to checkpoint ['.parquet']
17:17:48 INFO - pipeline id pipeline_id
17:17:48 INFO - code location None
17:17:48 INFO - number of workers 3 worker options {'num_cpus': 0.8, 'max_restarts': -1}
17:17:48 INFO - actor creation delay 0
17:17:48 INFO - job details {'job category': 'preprocessing', 'job name': 'doc_chunk', 'job type': 'ray

0

##  <span style="color: green">   3. Exact Dedup [<-](#top)<a class="anchor" id="item3"></a> </span>

Remove documents with identical content to reduce bias in the training data. A SHA256 hash is computed on the content of each document,
and records with identical hashes are then de-duplicated.
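As a rough illustration of the idea (not the ededup transform's actual implementation, which runs distributed on Ray), hash-based exact de-duplication can be sketched in a few lines. The function name `exact_dedup` and the sample strings are made up for this example.

```python
import hashlib


def exact_dedup(docs):
    """Keep the first occurrence of each distinct document body."""
    seen = set()  # SHA-256 digests of the documents kept so far
    unique = []
    for doc in docs:
        digest = hashlib.sha256(doc.encode("utf-8")).hexdigest()
        if digest not in seen:
            seen.add(digest)
            unique.append(doc)
    return unique


# Hypothetical chunk contents, only for illustration
chunks = ["alpha", "beta", "alpha", "gamma", "beta"]
deduped = exact_dedup(chunks)
print(deduped)
```

Comparing fixed-size digests instead of full strings keeps the set of seen documents small, which is what makes this approach practical on large tables.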

### Set Input/output Folder

In [7]:
## For this stage the input is the folder containing the parquet data output by the doc chunk step

input_folder = chunk_out
output_folder = ededup_out

print(input_folder)
print(output_folder)

output/chunk_out
output/ededup_out


### Execute 

In [8]:
# Import ededup transform configuration
from ededup_transform_ray import EdedupRayTransformConfiguration


# Prepare the commandline params
local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}
worker_options = {"num_cpus": 0.8}
params = {
    # where to run
    "run_locally": True,
    # Data access. Only required parameters are specified
    "data_local_config": ParamsUtils.convert_to_ast(local_conf),
    # orchestrator
    "runtime_worker_options": ParamsUtils.convert_to_ast(worker_options),
    "runtime_num_workers": 3,
    # ededup parameters
    "ededup_hash_cpu": 0.5,
    "ededup_num_hashes": 2,
    "ededup_doc_column": "contents",
}

# Pass the commandline params
sys.argv = ParamsUtils.dict_to_req(d=params)

# create launcher
launcher = RayTransformLauncher(EdedupRayTransformConfiguration())
# launch
launcher.launch()

17:18:05 INFO - Running locally
17:18:05 INFO - exact dedup params are {'doc_column': 'contents', 'hash_cpu': 0.5, 'num_hashes': 2}
17:18:05 INFO - data factory data_ is using local data access: input_folder - output/chunk_out output_folder - output/ededup_out
17:18:05 INFO - data factory data_ max_files -1, n_sample -1
17:18:05 INFO - data factory data_ Not using data sets, checkpointing False, max files -1, random samples -1, files to use ['.parquet'], files to checkpoint ['.parquet']
17:18:05 INFO - pipeline id pipeline_id
17:18:05 INFO - code location None
17:18:05 INFO - number of workers 3 worker options {'num_cpus': 0.8, 'max_restarts': -1}
17:18:05 INFO - actor creation delay 0
17:18:05 INFO - job details {'job category': 'preprocessing', 'job name': 'ededup', 'job type': 'ray', 'job id': 'job_id'}
2024-07-31 17:18:07,184	INFO worker.py:1744 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265
(orchestrate pid=98943) 17:18:07 INFO -

0

## <span style="color: green">  4. DOC ID generation [<-](#top)<a class="anchor" id="item4"></a> </span>

This transform annotates documents with document "ids". It supports the following transformations of the original data:

 - Adding document hash: this enables the addition of a document hash-based id to the data. The hash is calculated with `hashlib.sha256(doc.encode("utf-8")).hexdigest()`. To enable this annotation, set hash_column to the name of the column where you want to store it.
 - Adding integer document id: this allows the addition of an integer document id to the data that is unique across all rows in all tables provided to the transform() method. To enable this annotation, set int_id_column to the name of the column where you want to store it. **This is a prerequisite for fuzzy dedup** in the pipeline.
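The two annotations can be sketched on plain Python dictionaries as follows. This is only an illustration under stated assumptions: the helper `annotate`, the list-of-rows table representation, and starting the integer ids at 0 are choices made for this example, not the transform's API. The hash expression is the one quoted above.

```python
import hashlib
from itertools import count


def annotate(tables, doc_column="contents",
             hash_column="hash_column", int_id_column="int_id_column"):
    """Add a SHA-256 hash id and a globally unique integer id to every row."""
    next_id = count()  # one counter shared by all tables keeps ids unique
    annotated = []
    for table in tables:
        rows = []
        for row in table:
            doc = row[doc_column]
            rows.append({
                **row,
                hash_column: hashlib.sha256(doc.encode("utf-8")).hexdigest(),
                int_id_column: next(next_id),
            })
        annotated.append(rows)
    return annotated


# Two hypothetical tables, each a list of rows
tables = [
    [{"contents": "first chunk"}, {"contents": "second chunk"}],
    [{"contents": "first chunk"}],
]
annotated = annotate(tables)
for table in annotated:
    for row in table:
        print(row["int_id_column"], row["hash_column"][:12], row["contents"])
```

Identical contents receive the same hash id but different integer ids, which is why the integer id is the one suited to identifying individual rows in the fuzzy dedup step.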

In [9]:
# Input for this stage is the output of the exact dedup component.
# The output of this component makes it possible for the fdedup component to run on the data.

input_folder = ededup_out
output_folder = doc_id_out

print(input_folder)
print(output_folder)


output/ededup_out
output/doc_id_out


In [10]:
from doc_id_transform_ray import DocIDRayTransformConfiguration
local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}
worker_options = {"num_cpus": 0.8}
params = {
    # where to run
    "run_locally": True,
    # Data access. Only required parameters are specified
    "data_local_config": ParamsUtils.convert_to_ast(local_conf),
    # orchestrator
    "runtime_worker_options": ParamsUtils.convert_to_ast(worker_options),
    "runtime_num_workers": 3,
    # doc id configuration
    "doc_id_doc_column": "contents",
    "doc_id_hash_column": "hash_column",
    "doc_id_int_column": "int_id_column",
}
sys.argv = ParamsUtils.dict_to_req(d=params)

# launch

launcher = RayTransformLauncher(DocIDRayTransformConfiguration())
launcher.launch()

17:18:19 INFO - Running locally
17:18:19 INFO - Doc id parameters are : {'doc_column': 'contents', 'hash_column': 'hash_column', 'int_column': 'int_id_column'}
17:18:19 INFO - data factory data_ is using local data access: input_folder - output/ededup_out output_folder - output/doc_id_out
17:18:19 INFO - data factory data_ max_files -1, n_sample -1
17:18:19 INFO - data factory data_ Not using data sets, checkpointing False, max files -1, random samples -1, files to use ['.parquet'], files to checkpoint ['.parquet']
17:18:19 INFO - pipeline id pipeline_id
17:18:19 INFO - code location None
17:18:19 INFO - number of workers 3 worker options {'num_cpus': 0.8, 'max_restarts': -1}
17:18:19 INFO - actor creation delay 0
17:18:19 INFO - job details {'job category': 'preprocessing', 'job name': 'doc_id', 'job type': 'ray', 'job id': 'job_id'}
2024-07-31 17:18:21,364	INFO worker.py:1744 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265
(orchestrate p

0

## <span style="color: green">  5. Fuzzy Dedup [<-](#top)<a class="anchor" id="item5"></a> </span>

After exact deduplication, fuzzy deduplication is applied with
the goal of removing documents that differ only slightly, thereby reducing bias in
the data further. Small variations are quite common in code data, for example in the form
of different variable values or added logging statements. This step finds such
near-duplicates.
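The general technique behind this kind of near-duplicate detection is MinHash over word shingles. The sketch below is a simplified, single-process illustration and not the fdedup transform's implementation. The values of 64 hash functions, shingle size 5, a space delimiter and a 0.8 threshold mirror the parameters set in the cell below; treating shingles as word n-grams and using salted BLAKE2b as the hash family are assumptions made for this example.

```python
import hashlib


def shingles(text, size=5, delimiter=" "):
    """Return the set of word n-grams ("shingles") of a document."""
    words = [w for w in text.split(delimiter) if w]
    if len(words) < size:
        return {delimiter.join(words)}
    return {delimiter.join(words[i:i + size]) for i in range(len(words) - size + 1)}


def minhash(shingle_set, num_permutations=64):
    """Build a signature: the minimum hash of the set under each seeded hash."""
    signature = []
    for seed in range(num_permutations):
        salt = seed.to_bytes(8, "little")  # a different salt per hash function
        signature.append(min(
            hashlib.blake2b(s.encode("utf-8"), digest_size=8, salt=salt).digest()
            for s in shingle_set
        ))
    return signature


def estimated_similarity(sig_a, sig_b):
    """The fraction of matching positions estimates the Jaccard similarity."""
    matches = sum(a == b for a, b in zip(sig_a, sig_b))
    return matches / len(sig_a)


# Hypothetical sample texts
original = "the quick brown fox jumps over the lazy dog near the old river bank"
near_dup = "the quick brown fox jumps over the lazy dog near the old river bend"
unrelated = "completely different words appear in this other short sample text here"

sig = minhash(shingles(original))
for name, other in [("near_dup", near_dup), ("unrelated", unrelated)]:
    score = estimated_similarity(sig, minhash(shingles(other)))
    print(name, round(score, 2), "duplicate" if score >= 0.8 else "keep")
```

With only 64 hash functions the estimate is noisy, so a pair whose true similarity lies close to the threshold can fall on either side of it. More permutations reduce that variance at the cost of more hashing work.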

### Set Input/output Folder

In [11]:
## Input to this component is the output of doc_id generator component. 

input_folder = doc_id_out
output_folder = fdedup_out

print(input_folder)
print(output_folder)

output/doc_id_out
output/fdedup_out


### Execute 

In [12]:
import os
import sys

from data_processing.utils import ParamsUtils
from fdedup_transform_ray import FdedupRayTransformConfiguration

# create parameters

local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}
worker_options = {"num_cpus": 0.8}
code_location = {"github": "github", "commit_hash": "12345", "path": "path"}
params = {
    # where to run
    "run_locally": True,
    # Data access. Only required parameters are specified
    "data_local_config": ParamsUtils.convert_to_ast(local_conf),
    # Orchestration parameters
    "runtime_worker_options": ParamsUtils.convert_to_ast(worker_options),
    "runtime_num_workers": 3,
    # columns used
    "fdedup_doc_column": "contents",
    "fdedup_id_column": "int_id_column",
    "fdedup_cluster_column": "hash_column",
    # infrastructure
    "fdedup_bucket_cpu": 0.5,
    "fdedup_doc_cpu": 0.5,
    "fdedup_mhash_cpu": 0.5,
    "fdedup_num_doc_actors": 2,
    "fdedup_num_bucket_actors": 1,
    "fdedup_num_minhash_actors": 1,
    "fdedup_num_preprocessors": 2,
    # fuzzy parameters
    "fdedup_num_permutations": 64,
    "fdedup_threshold": 0.8,
    "fdedup_shingles_size": 5,
    "fdedup_delimiters": " "
}

# Pass commandline params
sys.argv = ParamsUtils.dict_to_req(d=params)

# launch

launcher = RayTransformLauncher(FdedupRayTransformConfiguration())
launcher.launch()

17:18:33 INFO - Running locally
17:18:33 INFO - fuzzy dedup params are {'doc_column': 'contents', 'id_column': 'int_id_column', 'cluster_column': 'hash_column', 'bucket_cpu': 0.5, 'mhash_cpu': 0.5, 'doc_cpu': 0.5, 'num_doc_actors': 2, 'num_minhash_actors': 1, 'num_bucket_actors': 1, 'num_preprocessors': 2, 'num_permutations': 64, 'threshold': 0.8, 'shingles_size': 5, 'delimiters': ' ', 'snapshot_delay': 1, 'use_bucket_snapshot': False, 'use_doc_snapshot': False, 'random_delay_limit': 10, 'worker_options': {'num_cpus': 0.8}}
17:18:33 INFO - data factory data_ is using local data access: input_folder - output/doc_id_out output_folder - output/fdedup_out
17:18:33 INFO - data factory data_ max_files -1, n_sample -1
17:18:33 INFO - data factory data_ Not using data sets, checkpointing False, max files -1, random samples -1, files to use ['.parquet'], files to checkpoint ['.parquet']
17:18:33 INFO - pipeline id pipeline_id
17:18:33 INFO - code location None
17:18:33 INFO - number of workers 

0

## <span style="color: green">  6. Language identification [<-](#top)<a class="anchor" id="item6"></a> </span>

This transform identifies the language of the document components.

### Set Input/output Folder

In [13]:

input_folder = fdedup_out
output_folder = lang_out 


### Execute 

In [14]:
import os
import sys

from data_processing.utils import ParamsUtils
from lang_id_transform import (
    content_column_name_cli_param,
    model_credential_cli_param,
    model_kind_cli_param,
    model_url_cli_param,
)
from lang_models import KIND_FASTTEXT
from lang_id_transform_ray import LangIdentificationRayTransformConfiguration


local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}
worker_options = {"num_cpus": 0.8}
langid_config = {
    model_credential_cli_param: None, #"PUT YOUR OWN HUGGINGFACE CREDENTIAL",
    model_kind_cli_param: KIND_FASTTEXT,
    model_url_cli_param: "facebook/fasttext-language-identification",
    # content_column_name_cli_param: "text",
}
params = {
    # where to run
    "run_locally": True,
    # Data access. Only required parameters are specified
    "data_local_config": ParamsUtils.convert_to_ast(local_conf),
    # orchestrator
    "runtime_worker_options": ParamsUtils.convert_to_ast(worker_options),
    "runtime_num_workers": 1,
    # language selection specific parameters
    **langid_config,
}

sys.argv = ParamsUtils.dict_to_req(d=params)

# create launcher
launcher = RayTransformLauncher(LangIdentificationRayTransformConfiguration())
launcher.launch()


17:19:09 INFO - Running locally
17:19:09 INFO - lang_id parameters are : {'model_credential': 'None', 'model_kind': 'fasttext', 'model_url': 'facebook/fasttext-language-identification', 'content_column_name': 'contents', 'output_lang_column_name': 'lang', 'output_score_column_name': 'score'}
17:19:09 INFO - data factory data_ is using local data access: input_folder - output/fdedup_out output_folder - output/lang_out
17:19:09 INFO - data factory data_ max_files -1, n_sample -1
17:19:09 INFO - data factory data_ Not using data sets, checkpointing False, max files -1, random samples -1, files to use ['.parquet'], files to checkpoint ['.parquet']
17:19:09 INFO - pipeline id pipeline_id
17:19:09 INFO - code location None
17:19:09 INFO - number of workers 1 worker options {'num_cpus': 0.8, 'max_restarts': -1}
17:19:09 INFO - actor creation delay 0
17:19:09 INFO - job details {'job category': 'preprocessing', 'job name': 'lang_id', 'job type': 'ray', 'job id': 'job_id'}
2024-07-31 17:19:11,4

0

## <span style="color: green">  7. Document Quality [<-](#top)<a class="anchor" id="item7"></a> </span>

TBA

### Set Input/output Folder

In [15]:
input_folder = lang_out
output_folder = dq_out

print(input_folder)
print(output_folder)

output/lang_out
output/dq_out


### Execute 

In [16]:
import os
import sys
from pathlib import Path

from doc_quality_transform import (
    text_lang_cli_param,
    doc_content_column_cli_param,
    bad_word_filepath_cli_param,
)
from doc_quality_transform_ray import DocQualityRayTransformConfiguration
from data_processing.utils import ParamsUtils

local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}

doc_quality_basedir = os.path.join(rootdir, "transforms", "language", "doc_quality", "python")
worker_options = {"num_cpus": 0.8}
params = {
    # where to run
    "run_locally": True,
    # Data access. Only required parameters are specified
    "data_local_config": ParamsUtils.convert_to_ast(local_conf),
    # orchestrator
    "runtime_worker_options": ParamsUtils.convert_to_ast(worker_options),
    "runtime_num_workers": 3,
    "runtime_pipeline_id": "pipeline_id",
    "runtime_job_id": "job_id",
    "runtime_creation_delay": 0,
    # doc quality configuration
    text_lang_cli_param: "en",
    doc_content_column_cli_param: "contents",
    bad_word_filepath_cli_param: os.path.join(doc_quality_basedir, "ldnoobw", "en"),
}


Path(output_folder).mkdir(parents=True, exist_ok=True)

sys.argv = ParamsUtils.dict_to_req(d=params)

# create launcher
launcher = RayTransformLauncher(DocQualityRayTransformConfiguration())
# launch
launcher.launch()

17:19:25 INFO - Running locally
17:19:25 INFO - doc_quality parameters are : {'text_lang': 'en', 'doc_content_column': 'contents', 'bad_word_filepath': '/Users/dol/scratch/dpk-dev/data-prep-kit/transforms/language/doc_quality/python/ldnoobw/en', 's3_cred': None, 'docq_data_factory': <data_processing.data_access.data_access_factory.DataAccessFactory object at 0x38b6df8e0>}
17:19:25 INFO - data factory docq_ is using local configuration without input/output path
17:19:25 INFO - data factory docq_ max_files -1, n_sample -1
17:19:25 INFO - data factory docq_ Not using data sets, checkpointing False, max files -1, random samples -1, files to use ['.parquet'], files to checkpoint ['.parquet']
17:19:25 INFO - data factory data_ is using local data access: input_folder - output/lang_out output_folder - output/dq_out
17:19:25 INFO - data factory data_ max_files -1, n_sample -1
17:19:25 INFO - data factory data_ Not using data sets, checkpointing False, max files -1, random samples -1, files to 

0

## <span style="color: green">  8. Filtering [<-](#top)<a class="anchor" id="item8"></a> </span>

Filter out documents that do not meet the quality threshold for each annotation. The thresholds are computed based on a distributional
analysis as well as manual inspection of samples, maintaining the balance between data quality and data volume.

### Set Input/output Folder

In [17]:
input_folder = dq_out
output_folder = filter_out

### Execute 

In [18]:
import os

from data_processing.data_access import DataAccessLocal
from filter_transform import (
    filter_columns_to_drop_cli_param,
    filter_criteria_cli_param,
    filter_logical_operator_cli_param,
)
from filter_transform_ray import FilterRayTransformConfiguration


local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}

# TODO
# - decide which rules to apply for filtering


# This is just an example criteria to filter
filter_criteria = [
    "total_num_lines > 10 AND total_num_lines < 90",
    "lang_selected = 1",
]
filter_logical_operator = "AND"
filter_columns_to_drop = ["lang_selected", "hash_column"]

filter_params = {
    filter_criteria_cli_param: filter_criteria,
    filter_columns_to_drop_cli_param: filter_columns_to_drop,
    filter_logical_operator_cli_param: filter_logical_operator,
}

worker_options = {"num_cpus": 0.8}
launcher_params = {
    # where to run
    "run_locally": True,
    # Data access. Only required parameters are specified
    "data_local_config": ParamsUtils.convert_to_ast(local_conf),
    # orchestrator
    "runtime_worker_options": ParamsUtils.convert_to_ast(worker_options),
    "runtime_num_workers": 5,
}


sys.argv = ParamsUtils.dict_to_req(launcher_params | filter_params)
# Create the launcher to run the filter transform.
launcher = RayTransformLauncher(FilterRayTransformConfiguration())
# Launch the ray actor(s) to process the input
# launcher.launch()
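To see what the example criteria would select, they can be tried on a handful of made-up rows. The sketch below assumes the criteria are read as SQL-style `WHERE` predicates joined by the logical operator. SQLite is used only as a convenient stand-in and is not necessarily the engine the filter transform uses. The sample rows are hypothetical.

```python
import sqlite3

filter_criteria = [
    "total_num_lines > 10 AND total_num_lines < 90",
    "lang_selected = 1",
]
filter_logical_operator = "AND"
filter_columns_to_drop = ["lang_selected", "hash_column"]

columns = ["contents", "total_num_lines", "lang_selected", "hash_column"]
rows = [
    ("doc a", 50, 1, "h1"),  # passes both criteria
    ("doc b", 5, 1, "h2"),   # too few lines
    ("doc c", 40, 0, "h3"),  # language not selected
]

conn = sqlite3.connect(":memory:")
conn.execute(f"CREATE TABLE docs ({', '.join(columns)})")
conn.executemany("INSERT INTO docs VALUES (?, ?, ?, ?)", rows)

# Parenthesise each criterion so the joining operator applies to whole criteria
where = f" {filter_logical_operator} ".join(f"({c})" for c in filter_criteria)
kept_columns = [c for c in columns if c not in filter_columns_to_drop]
query = f"SELECT {', '.join(kept_columns)} FROM docs WHERE {where}"
kept = conn.execute(query).fetchall()
print(kept)
conn.close()
```

Wrapping each criterion in parentheses matters when the operator is `OR`, because a criterion that itself contains `AND` would otherwise be regrouped by operator precedence.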

## <span style="color: green">  9. Text encoding [<-](#top)<a class="anchor" id="item9"></a> </span>

Encode text for the vector storage.

In [19]:
# input_folder = filter_out
input_folder = dq_out
output_folder = encoder_out

In [20]:
from text_encoder_transform_ray import TextEncoderRayTransformConfiguration

local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}
worker_options = {"num_cpus": 0.8}
params = {
    # where to run
    "run_locally": True,
    # Data access. Only required parameters are specified
    "data_local_config": ParamsUtils.convert_to_ast(local_conf),
    # orchestrator
    "runtime_worker_options": ParamsUtils.convert_to_ast(worker_options),
    "runtime_num_workers": 2,
    # text_encoder
    "text_encoder_model_name": "BAAI/bge-small-en-v1.5",
}

sys.argv = ParamsUtils.dict_to_req(d=params)
# create launcher
launcher = RayTransformLauncher(TextEncoderRayTransformConfiguration())
# Launch the ray actor(s) to process the input
launcher.launch()


17:19:40 INFO - Running locally
17:19:40 INFO - text_encoder parameters are : {'content_column_name': 'contents', 'output_embeddings_column_name': 'embeddings', 'model_name': 'BAAI/bge-small-en-v1.5'}
17:19:40 INFO - data factory data_ is using local data access: input_folder - output/dq_out output_folder - output/encoder_out
17:19:40 INFO - data factory data_ max_files -1, n_sample -1
17:19:40 INFO - data factory data_ Not using data sets, checkpointing False, max files -1, random samples -1, files to use ['.parquet'], files to checkpoint ['.parquet']
17:19:40 INFO - pipeline id pipeline_id
17:19:40 INFO - code location None
17:19:40 INFO - number of workers 2 worker options {'num_cpus': 0.8, 'max_restarts': -1}
17:19:40 INFO - actor creation delay 0
17:19:40 INFO - job details {'job category': 'preprocessing', 'job name': 'text_encoder', 'job type': 'ray', 'job id': 'job_id'}
2024-07-31 17:19:42,211	INFO worker.py:1744 -- Started a local Ray instance. View the dashboard at 1

0