From 74e48fbcd57dd10957338490f404ddd73016d1ce Mon Sep 17 00:00:00 2001
From: Matteo Rosati
Date: Wed, 29 Apr 2026 14:46:41 +0200
Subject: [PATCH] replace existing file records on re-import

Re-importing a file now removes its previously stored records before the
new ones are added. The stale records are dropped only after chunking and
embedding have succeeded, so a failed re-import leaves the existing data
intact. The existence check fetches at most one id instead of loading
every matching record.
---
Review note: the hunks below were edited by hand during review, so the
post-image blob ids on the "index" lines no longer match the resulting
file contents.

 chromy/chroma_functions.py | 14 +++++++
 chromy/utilities.py        |  8 +++-
 tests/test_utilities.py    | 84 ++++++++++++++++++++++++++++++++++++++
 3 files changed, 105 insertions(+), 1 deletion(-)
 create mode 100644 tests/test_utilities.py

diff --git a/chromy/chroma_functions.py b/chromy/chroma_functions.py
index a945bee..08389bd 100644
--- a/chromy/chroma_functions.py
+++ b/chromy/chroma_functions.py
@@ -54,6 +54,20 @@ def delete_data(collection_name: str, where: dict[str, str]) -> int:
     return int(result.get("deleted", 0))
 
 
+def has_data_for_file(collection_name: str, file_name: str) -> bool:
+    """Return whether the collection holds any record for ``file_name``."""
+    _, collection = _get_client_and_collection(collection_name)
+    # Existence check only: cap the lookup at one record and skip loading
+    # documents/metadata, since ids are always part of the result.
+    result = collection.get(
+        where=cast(Where, {"file_name": file_name}),
+        limit=1,
+        include=[],
+    )
+
+    return bool(result.get("ids"))
+
+
 def count_collection(collection_name: str) -> int:
     _, collection = _get_client_and_collection(collection_name)
     return collection.count()
diff --git a/chromy/utilities.py b/chromy/utilities.py
index 8d940e1..98895bf 100644
--- a/chromy/utilities.py
+++ b/chromy/utilities.py
@@ -4,12 +4,18 @@ from pathlib import Path
 
 from chromadb import QueryResult
 
-from chromy.chroma_functions import add_data, query_data
+from chromy.chroma_functions import add_data, delete_data, has_data_for_file, query_data
 from chromy.chunk_functions import chunk_file
 from chromy.embed import embed
 
 
 def ingest_file(collection_name: str, file_path: str) -> int:
     chunks = chunk_file(file_path)
     embeddings = embed(chunks)
+
+    # Drop stale records only once chunking and embedding have succeeded, so a
+    # failed re-import cannot wipe the data that is already stored.
+    if has_data_for_file(collection_name, file_path):
+        delete_data(collection_name, {"file_name": file_path})
+
     add_data(collection_name, embeddings, file_path)
diff --git a/tests/test_utilities.py b/tests/test_utilities.py
new file mode 100644
index 0000000..177219a
--- /dev/null
+++ b/tests/test_utilities.py
@@ -0,0 +1,84 @@
+from __future__ import annotations
+
+import unittest
+from unittest.mock import MagicMock, call, patch
+
+from chromy.utilities import ingest_file
+
+
+class UtilityTests(unittest.TestCase):
+    def test_ingest_file_adds_new_file_without_deleting(self) -> None:
+        chunks = ["chunk 1", "chunk 2"]
+        embeddings = [
+            {"text": "chunk 1", "embedding": [0.1, 0.2]},
+            {"text": "chunk 2", "embedding": [0.3, 0.4]},
+        ]
+
+        with (
+            patch("chromy.utilities.has_data_for_file", return_value=False) as has_data,
+            patch("chromy.utilities.delete_data") as delete_data,
+            patch("chromy.utilities.chunk_file", return_value=chunks) as chunk_file,
+            patch("chromy.utilities.embed", return_value=embeddings) as embed,
+            patch("chromy.utilities.add_data") as add_data,
+        ):
+            records_added = ingest_file("notes", "/tmp/play.txt")
+
+        has_data.assert_called_once_with("notes", "/tmp/play.txt")
+        delete_data.assert_not_called()
+        chunk_file.assert_called_once_with("/tmp/play.txt")
+        embed.assert_called_once_with(chunks)
+        add_data.assert_called_once_with("notes", embeddings, "/tmp/play.txt")
+        self.assertEqual(records_added, 2)
+
+    def test_ingest_file_replaces_existing_file_records_before_adding(self) -> None:
+        chunks = ["chunk 1"]
+        embeddings = [{"text": "chunk 1", "embedding": [0.1, 0.2]}]
+        manager = MagicMock()
+
+        with (
+            patch("chromy.utilities.has_data_for_file", return_value=True) as has_data,
+            patch("chromy.utilities.delete_data") as delete_data,
+            patch("chromy.utilities.chunk_file", return_value=chunks) as chunk_file,
+            patch("chromy.utilities.embed", return_value=embeddings) as embed,
+            patch("chromy.utilities.add_data") as add_data,
+        ):
+            manager.attach_mock(has_data, "has_data")
+            manager.attach_mock(delete_data, "delete_data")
+            manager.attach_mock(chunk_file, "chunk_file")
+            manager.attach_mock(embed, "embed")
+            manager.attach_mock(add_data, "add_data")
+
+            records_added = ingest_file("notes", "/tmp/play.txt")
+
+        # Stale records are dropped only after chunking and embedding succeed.
+        self.assertEqual(
+            manager.mock_calls,
+            [
+                call.chunk_file("/tmp/play.txt"),
+                call.embed(chunks),
+                call.has_data("notes", "/tmp/play.txt"),
+                call.delete_data("notes", {"file_name": "/tmp/play.txt"}),
+                call.add_data("notes", embeddings, "/tmp/play.txt"),
+            ],
+        )
+        self.assertEqual(records_added, 1)
+
+    def test_ingest_file_keeps_existing_records_when_chunking_fails(self) -> None:
+        """A failed re-import must leave the stored records untouched."""
+        with (
+            patch("chromy.utilities.has_data_for_file", return_value=True),
+            patch("chromy.utilities.delete_data") as delete_data,
+            patch("chromy.utilities.chunk_file", side_effect=OSError("unreadable")),
+            patch("chromy.utilities.embed") as embed,
+            patch("chromy.utilities.add_data") as add_data,
+        ):
+            with self.assertRaises(OSError):
+                ingest_file("notes", "/tmp/play.txt")
+
+        delete_data.assert_not_called()
+        embed.assert_not_called()
+        add_data.assert_not_called()
+
+
+if __name__ == "__main__":
+    unittest.main()