EmbeddingsPipeline
An EmbeddingsPipeline is a utility wrapping up all the transformations your documents are expected to go through in order to be vectorized, and therefore eligible for similarity search via a vector database.
An EmbeddingsPipeline is a Pydantic BaseModel. This means the provided arguments are expected to adhere to the specified types and constraints.
It expects 3 typed arguments representing its core components:
- document_loader: any Langchain Document Loader
- document_transformers: a list of Langchain Document Transformers
- vectorstore: a Rag-Core VectorStoreResource or a Langchain VectorStore
An EmbeddingsPipeline natively implements the methods required to trigger these three core components.
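For illustration, here is a minimal construction sketch. The rag_core import path and the file name are assumptions to adapt to your installation; the loader, splitter, and in-memory store are arbitrary Langchain choices, not requirements.
from langchain_community.document_loaders import TextLoader
from langchain_core.embeddings import FakeEmbeddings
from langchain_core.vectorstores import InMemoryVectorStore
from langchain_text_splitters import RecursiveCharacterTextSplitter

from rag_core import EmbeddingsPipeline  # hypothetical import path

pipeline = EmbeddingsPipeline(
    # Any Langchain document loader works here.
    document_loader=TextLoader("my_document.txt"),
    # Transformers are applied in order; here, a single chunking step.
    document_transformers=[
        RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
    ],
    # Any Langchain VectorStore; FakeEmbeddings keeps the sketch self-contained.
    vectorstore=InMemoryVectorStore(embedding=FakeEmbeddings(size=128)),
)
Being a Pydantic model, the pipeline raises a ValidationError at construction time if any argument does not match its expected type.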
Load documents
Load all documents stored in an archive or in remote storage (such as S3 or Snowflake) and turn them into a List[Document] using the load_raw_documents method.
Source EmbeddingsPipeline.load_raw_documents
def load_raw_documents(
    self, load_kwargs: Optional[Dict[str, Any]] = None
) -> List[Document]:
    """Load raw documents using the provided document_loader."""
    if self.document_loader is None:
        raise AttributeError("A document_loader must be set to load documents.")
    if load_kwargs is None:
        load_kwargs = {}
    return self.document_loader.load(**load_kwargs)
And its asynchronous version.
Source EmbeddingsPipeline.aload_raw_documents
async def aload_raw_documents(
    self, load_kwargs: Optional[Dict[str, Any]] = None
) -> List[Document]:
    """Asynchronous loading of raw documents using the provided document_loader."""
    if self.document_loader is None:
        raise AttributeError("A document_loader must be set to load documents.")
    if load_kwargs is None:
        load_kwargs = {}
    return await self.document_loader.aload(**load_kwargs)
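A minimal usage sketch, reusing the pipeline assembled in the construction example above; when provided, load_kwargs is forwarded verbatim to the loader.
import asyncio

# Synchronous loading.
docs = pipeline.load_raw_documents()

# Asynchronous loading, e.g. when called from an async application.
docs = asyncio.run(pipeline.aload_raw_documents())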
Memory issues
As the document loader stores all documents in an in-memory List, loading from a large storage can lead to memory issues.
Implementing a lazy pipeline would require significant refactoring and custom implementations.
If you face this need, please get in touch with the RAG-Core team for support.
Transform documents
Apply a sequence of document transformations using transform_documents.
This method simply loops over the document_transformers attribute and calls each transformer's transform_documents method.
Source EmbeddingsPipeline.transform_documents
def transform_documents(
    self,
    docs: List[Document],
    transformers_kwargs: Optional[List[Dict[str, Any]]] = None,
) -> List[Document]:
    """Sequentially transform each document using the provided list of document_transformers.

    Args:
        docs: List of documents to transform.
        transformers_kwargs: List of dictionaries containing the kwargs for each transformer.
            This list must have the same length as document_transformers.

    Returns:
        List of transformed documents.
    """
    transformers_kwargs = self.validate_transformers_kwargs(transformers_kwargs)
    if self.document_transformers:
        for transformer, kwargs in zip(
            self.document_transformers, transformers_kwargs
        ):
            docs = transformer.transform_documents(docs, **kwargs)
    return docs
And its asynchronous version.
Source EmbeddingsPipeline.atransform_documents
async def atransform_documents(
    self,
    docs: List[Document],
    transformers_kwargs: Optional[List[Dict[str, Any]]] = None,
) -> List[Document]:
    """Asynchronously transform each document sequentially using the provided list of document_transformers.

    Args:
        docs: List of documents to transform.
        transformers_kwargs: List of dictionaries containing the kwargs for each transformer.
            This list must have the same length as document_transformers.

    Returns:
        List of transformed documents.
    """
    transformers_kwargs = self.validate_transformers_kwargs(transformers_kwargs)
    if self.document_transformers:
        for transformer, kwargs in zip(
            self.document_transformers, transformers_kwargs
        ):
            docs = await transformer.atransform_documents(docs, **kwargs)
    return docs
Valid document transformer
Document transformers provided as arguments must implement langchain_core.documents.BaseDocumentTransformer.
This Langchain abstract class requires the implementation of only one method: transform_documents.
Example
from typing import Any, Sequence

from langchain_core.documents import BaseDocumentTransformer, Document


class ExpirationDateTransformer(BaseDocumentTransformer):
    def transform_documents(
        self, documents: Sequence[Document], **kwargs: Any
    ) -> Sequence[Document]:
        try:
            exp_dates = kwargs["expiration_dates"]
        except KeyError:
            raise ValueError(
                "ExpirationDateTransformer requires an `expiration_dates` keyword argument."
            )
        # Attach an expiration date to each document's metadata.
        for doc, exp_date in zip(documents, exp_dates):
            doc.metadata["expiration_date"] = exp_date
        return documents
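Such a transformer can be exercised on its own before wiring it into a pipeline. A short sketch with purely illustrative documents:
import datetime

from langchain_core.documents import Document

docs = [Document(page_content="contract A"), Document(page_content="contract B")]
transformer = ExpirationDateTransformer()
docs = transformer.transform_documents(
    docs, expiration_dates=[datetime.date(2030, 12, 31)] * len(docs)
)
print(docs[0].metadata["expiration_date"])  # 2030-12-31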
Extra arguments
transform_documents accepts a transformers_kwargs argument: a List[Dict[str, Any]] distributing one kwargs dictionary to each transformer.
Consider an EmbeddingsPipeline with three document transformers where:
- The second is ExpirationDateTransformer.
- The first and the last do not require any extra arguments.
# ✅ Working example
>>> doc_trans_kwargs = [
...     {},  # kwargs_1
...     {"expiration_dates": [datetime.date(2030, 12, 31) for _ in range(len(docs))]},  # kwargs_2
...     {},  # kwargs_3
... ]
>>> embeddings_pipeline.transform_documents(docs, doc_trans_kwargs)
# ❌ Failing example
# A single dict provided
>>> doc_trans_kwargs = {"expiration_dates": [datetime.date(2030, 12, 31) for _ in range(len(docs))]}
>>> embeddings_pipeline.transform_documents(docs, doc_trans_kwargs)
ValueError: Number of transformers_kwargs must match number
of document_transformers and respective order.
Embed & Store documents
The embedding and storing procedure is handled by the Langchain VectorStore instance, as it natively and efficiently handles this operation.
Info
Considering the risks and costs associated with this procedure, embedding and storage are only available through load_transform_and_store_documents, ensuring users perform all predefined transformation steps before populating a database.
This aims to prevent resource contamination with malformed documents or vectors.
All together
Perform all predefined steps of the pipeline with load_transform_and_store_documents
Danger
With a static file loader (such as an archive loader), this procedure can produce duplicated entries in your vector storage.
Consider carefully the data to be loaded before proceeding.
Source EmbeddingsPipeline.load_transform_and_store_documents
def load_transform_and_store_documents(
    self,
    load_kwargs: Optional[Dict[str, Any]] = None,
    transformers_kwargs: Optional[List[Dict[str, Any]]] = None,
) -> List[Document]:
    """Perform all steps in the pipeline and store the transformed documents in the vectorstore.

    Warning: This operation can lead to significant changes in remote vector databases.
    """
    assert self.vectorstore is not None, AttributeError(
        "A vectorstore must be set to perform storage operation."
    )
    docs = self.load_raw_documents(load_kwargs)
    docs = self.transform_documents(docs, transformers_kwargs)
    self._store_documents(docs)
    return docs
And its asynchronous version.
Source EmbeddingsPipeline.aload_transform_and_store_documents
async def aload_transform_and_store_documents(
    self,
    load_kwargs: Optional[Dict[str, Any]] = None,
    transformers_kwargs: Optional[List[Dict[str, Any]]] = None,
) -> List[Document]:
    """Asynchronously perform all steps in the pipeline and store the transformed documents in the vectorstore.

    Warning: This operation can lead to significant changes in remote vector databases.
    """
    assert self.vectorstore is not None, AttributeError(
        "A vectorstore must be set to perform storage operation."
    )
    docs = await self.aload_raw_documents(load_kwargs)
    docs = await self.atransform_documents(docs, transformers_kwargs)
    await self._astore_documents(docs)
    return docs
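An end-to-end usage sketch, assuming the pipeline from the construction example above; keep the duplication warning in mind before re-running it against the same storage.
import asyncio

# Synchronous: load, transform, embed, and store in a single call.
stored_docs = pipeline.load_transform_and_store_documents(
    transformers_kwargs=[{}],  # one kwargs dict per transformer, in order
)

# Asynchronous variant.
stored_docs = asyncio.run(pipeline.aload_transform_and_store_documents())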