diff --git a/cassis/cas.py b/cassis/cas.py
index 213aa52..e81223f 100644
--- a/cassis/cas.py
+++ b/cassis/cas.py
@@ -15,19 +15,43 @@
     FEATURE_BASE_NAME_HEAD,
     FEATURE_BASE_NAME_LANGUAGE,
     TYPE_NAME_ANNOTATION,
+    TYPE_NAME_ARRAY_BASE,
     TYPE_NAME_DOCUMENT_ANNOTATION,
+    TYPE_NAME_EMPTY_FLOAT_LIST,
+    TYPE_NAME_EMPTY_INTEGER_LIST,
+    TYPE_NAME_EMPTY_STRING_LIST,
+    TYPE_NAME_FLOAT_LIST,
     TYPE_NAME_FS_ARRAY,
     TYPE_NAME_FS_LIST,
+    TYPE_NAME_INTEGER_LIST,
+    TYPE_NAME_NON_EMPTY_FLOAT_LIST,
+    TYPE_NAME_NON_EMPTY_INTEGER_LIST,
+    TYPE_NAME_NON_EMPTY_STRING_LIST,
     TYPE_NAME_SOFA,
+    TYPE_NAME_STRING_LIST,
     FeatureStructure,
     Annotation,
     Type,
     TypeCheckError,
+    TypeNotFoundError,
     TypeSystem,
     TypeSystemMode,
     is_annotation,
+    load_typesystem,
 )
 
+_PRIMITIVE_LIST_BASE_TYPE = {
+    TYPE_NAME_INTEGER_LIST: TYPE_NAME_INTEGER_LIST,
+    TYPE_NAME_EMPTY_INTEGER_LIST: TYPE_NAME_INTEGER_LIST,
+    TYPE_NAME_NON_EMPTY_INTEGER_LIST: TYPE_NAME_INTEGER_LIST,
+    TYPE_NAME_FLOAT_LIST: TYPE_NAME_FLOAT_LIST,
+    TYPE_NAME_EMPTY_FLOAT_LIST: TYPE_NAME_FLOAT_LIST,
+    TYPE_NAME_NON_EMPTY_FLOAT_LIST: TYPE_NAME_FLOAT_LIST,
+    TYPE_NAME_STRING_LIST: TYPE_NAME_STRING_LIST,
+    TYPE_NAME_EMPTY_STRING_LIST: TYPE_NAME_STRING_LIST,
+    TYPE_NAME_NON_EMPTY_STRING_LIST: TYPE_NAME_STRING_LIST,
+}
+
 _validator_optional_string = validators.optional(validators.instance_of(str))
 
 NAME_DEFAULT_SOFA = "_InitialView"
@@ -971,9 +995,8 @@ def _find_all_fs(
                     elif feature.rangeType.name == TYPE_NAME_FS_LIST and hasattr(feature_value, FEATURE_BASE_NAME_HEAD):
                         v = feature_value
                         while hasattr(v, FEATURE_BASE_NAME_HEAD):
-                            if not v.head or v.head.xmiID in all_fs:
-                                continue
-                            openlist.append(v.head)
+                            if v.head and v.head.xmiID not in all_fs:
+                                openlist.append(v.head)
                             v = v.tail
                     # For primitive arrays / lists, we do not need to handle the elements
                     continue
@@ -1005,6 +1028,312 @@ def _copy(self) -> "Cas":
         result._xmi_id_generator = self._xmi_id_generator
         return result
 
+    def deep_copy(self, copy_typesystem: bool = False) -> "Cas":
+        """Create and return a deep copy of this CAS.
+
+        All sofas, views and feature structures reachable from the view indexes or
+        from a sofa's ``sofaArray`` are copied; copies keep the ``xmiID`` of their
+        originals and references between feature structures are re-wired to the
+        copies.
+
+        Args:
+            copy_typesystem: If True, the typesystem is cloned through an XML
+                round-trip; otherwise the copy shares the typesystem of this CAS.
+
+        Returns:
+            The copied CAS. Its active view is set to match this CAS' active view.
+
+        Raises:
+            TypeNotFoundError: If the type of a feature structure is not declared
+                in the target typesystem.
+        """
+        ts = self.typesystem
+        if copy_typesystem:
+            # Round-trip through XML to obtain an independent typesystem instance.
+            ts = load_typesystem(self.typesystem.to_xml())
+
+        cas_copy = Cas(ts, lenient=self._lenient)
+        # Discard whatever `Cas.__init__` registered; sofas and views are rebuilt
+        # from this CAS below.
+        cas_copy._views = {}
+        cas_copy._sofas = {}
+
+        # xmiID -> copied feature structure. Filled by the copy pass, read by the
+        # helpers and re-wiring passes below.
+        all_copied_fs = {}
+
+        def _iter_list_heads(node: FeatureStructure):
+            """Yield the head of each non-empty node of a head/tail list."""
+            while hasattr(node, FEATURE_BASE_NAME_HEAD):
+                yield getattr(node, FEATURE_BASE_NAME_HEAD)
+                node = node.tail
+
+        def _member_ids(items, context: str) -> List[Optional[int]]:
+            """Map referenced FS to their xmiIDs, keeping None placeholders in position."""
+            member_ids = []
+            for item in items:
+                member_id = getattr(item, "xmiID", None)
+                if item is not None and member_id is None:
+                    warnings.warn(f"{context}; preserving as None in copy.")
+                member_ids.append(member_id)
+            return member_ids
+
+        def _resolve_members(member_ids: List[Optional[int]], context: str) -> list:
+            """Map xmiIDs to copied FS; unknown IDs become None and emit a warning."""
+            members = []
+            for member_id in member_ids:
+                member = None if member_id is None else all_copied_fs.get(member_id)
+                if member_id is not None and member is None:
+                    warnings.warn(f"Reference {member_id} not found for {context}; inserting None.")
+                members.append(member)
+            return members
+
+        def _build_list(empty_type_name: str, non_empty_type_name: str, heads: list) -> FeatureStructure:
+            """Build a new head/tail list containing `heads` in the given order."""
+            non_empty_type = ts.get_type(non_empty_type_name)
+            current = ts.get_type(empty_type_name)()
+            for head in reversed(heads):
+                node = non_empty_type()
+                node.tail = current
+                node.head = head
+                current = node
+            return current
+
+        for sofa in self.sofas:
+            sofa_copy = Sofa(
+                sofaID=sofa.sofaID,
+                sofaNum=sofa.sofaNum,
+                type=ts.get_type(sofa.type.name),
+                xmiID=sofa.xmiID,
+            )
+            sofa_copy.mimeType = sofa.mimeType
+            # Still the original array here; remapped to its copy further down.
+            sofa_copy.sofaArray = sofa.sofaArray
+            sofa_copy.sofaString = sofa.sofaString
+            sofa_copy.sofaURI = sofa.sofaURI
+
+            cas_copy._sofas[sofa_copy.sofaID] = sofa_copy
+            cas_copy._views[sofa_copy.sofaID] = View(sofa=sofa_copy)
+
+        # Point the current view at the initial view of the rebuilt mapping.
+        cas_copy._current_view = cas_copy._views[NAME_DEFAULT_SOFA]
+
+        # Work deferred until every feature structure has a copy:
+        # feature name -> [(owner xmiID, target xmiID)]
+        references = defaultdict(list)
+        # owner xmiID -> {feature name: member xmiIDs} for inline FSArray features
+        referenced_arrays = defaultdict(dict)
+        # FSArray xmiID -> member xmiIDs for arrays that are FS of their own
+        referenced_fs_arrays = {}
+        # primitive array xmiID -> element values
+        referenced_primitive_arrays = {}
+        # owner xmiID -> {feature name: member xmiIDs} for inline FSList features
+        referenced_lists = defaultdict(dict)
+        # owner xmiID -> {feature name: (base list type name, head values)}
+        referenced_primitive_lists = defaultdict(dict)
+
+        # xmiID -> IDs of the sofas whose view index contains the FS.
+        referenced_view = defaultdict(list)
+        for view in self.views:
+            for member in view.get_all_fs():
+                member_id = getattr(member, "xmiID", None)
+                if member_id is not None and view.sofa.sofaID not in referenced_view[member_id]:
+                    referenced_view[member_id].append(view.sofa.sofaID)
+
+        # `_find_all_fs(seeds=...)` replaces the default traversal roots, so pass
+        # the indexed members of every view plus any sofaArray, which need not be
+        # indexed in a view.
+        traversal_seeds = []
+        for sofa in self.sofas:
+            traversal_seeds.extend(self.get_view(sofa.sofaID).select_all_fs())
+            if getattr(sofa, "sofaArray", None) is not None:
+                traversal_seeds.append(sofa.sofaArray)
+
+        for fs in self._find_all_fs(seeds=traversal_seeds):
+            try:
+                t = ts.get_type(fs.type.name)
+            except TypeNotFoundError as e:
+                raise TypeNotFoundError(
+                    f"deep_copy() cannot copy feature structure of type '{fs.type.name}': "
+                    f"the type is not present in the target typesystem. This can happen when "
+                    f"the source CAS was loaded leniently against an incomplete typesystem and "
+                    f"contains feature structures whose types were not declared. deep_copy() "
+                    f"requires every feature structure's type to be present in the typesystem."
+                ) from e
+            fs_copy = t()
+            # A type without supertype must not break the array check.
+            is_array_type = t.supertype is not None and t.supertype.name == TYPE_NAME_ARRAY_BASE
+
+            if t.name == TYPE_NAME_FS_ARRAY and fs.elements is not None:
+                referenced_fs_arrays[fs.xmiID] = _member_ids(
+                    fs.elements, f"Standalone FSArray {fs.xmiID} contains an unidentifiable item"
+                )
+            elif is_array_type and fs.elements is not None:
+                referenced_primitive_arrays[fs.xmiID] = list(fs.elements)
+
+            for feature in t.all_features:
+                if is_array_type and feature.name == "elements":
+                    # Array contents are restored from the maps filled above.
+                    continue
+
+                if ts.is_primitive(feature.rangeType):
+                    fs_copy[feature.name] = fs.get(feature.name)
+                elif ts.is_primitive_collection(feature.rangeType):
+                    val = fs.get(feature.name)
+                    if val is None:
+                        continue
+
+                    if feature.multipleReferencesAllowed and getattr(val, "xmiID", None) is not None:
+                        # Shareable collection: wire it up via `references` so that its
+                        # identity is preserved.
+                        references[feature.name].append((fs.xmiID, val.xmiID))
+                        continue
+
+                    # Primitive arrays have `elements`; primitive lists use head/tail and
+                    # may be declared with the abstract base type or a concrete subtype.
+                    abstract_list_name = _PRIMITIVE_LIST_BASE_TYPE.get(feature.rangeType.name)
+                    if ts.is_array(feature.rangeType):
+                        array_copy = ts.get_type(feature.rangeType.name)()
+                        if val.elements is not None:
+                            # Copy the list so original and copy do not share it.
+                            array_copy.elements = list(val.elements)
+                        fs_copy[feature.name] = array_copy
+                    elif abstract_list_name is not None:
+                        # Keep the base type name so that the rebuild step can derive
+                        # the Empty*/NonEmpty* type names.
+                        referenced_primitive_lists[fs.xmiID][feature.name] = (
+                            abstract_list_name,
+                            list(_iter_list_heads(val)),
+                        )
+                    else:
+                        warnings.warn(
+                            f"Primitive collection feature '{feature.name}' on FS {fs.xmiID} has range type "
+                            f"'{feature.rangeType.name}' which is neither a primitive array nor a primitive list; "
+                            "value not copied."
+                        )
+                elif ts.is_array(feature.rangeType):
+                    val = fs[feature.name]
+                    if val is None:
+                        continue
+
+                    if feature.multipleReferencesAllowed and getattr(val, "xmiID", None) is not None:
+                        references[feature.name].append((fs.xmiID, val.xmiID))
+                    else:
+                        # Not shareable: copy inline and fill the members later.
+                        fs_copy[feature.name] = ts.get_type(TYPE_NAME_FS_ARRAY)()
+                        if val.elements is not None:
+                            referenced_arrays[fs.xmiID][feature.name] = _member_ids(
+                                val.elements,
+                                f"Array feature '{feature.name}' of FS {fs.xmiID} contains an unidentifiable item",
+                            )
+                elif ts.is_list(feature.rangeType):
+                    val = fs[feature.name]
+                    if val is None:
+                        continue
+
+                    if feature.multipleReferencesAllowed and getattr(val, "xmiID", None) is not None:
+                        references[feature.name].append((fs.xmiID, val.xmiID))
+                    else:
+                        referenced_lists[fs.xmiID][feature.name] = _member_ids(
+                            _iter_list_heads(val), "FSList item without xmiID encountered during deep copy"
+                        )
+                elif feature.rangeType.name == TYPE_NAME_SOFA:
+                    # ignore sofa references
+                    pass
+                else:
+                    val = fs[feature.name]
+                    # A reference that is None needs no copying and no warning.
+                    if val is None:
+                        continue
+                    if getattr(val, "xmiID", None) is not None:
+                        references[feature.name].append((fs.xmiID, val.xmiID))
+                    else:
+                        warnings.warn(
+                            f'Original non-primitive feature "{feature.name}" was not copied from feature structure {fs.xmiID}.'
+                        )
+
+            fs_copy.xmiID = fs.xmiID
+            all_copied_fs[fs_copy.xmiID] = fs_copy
+
+        # Wire up single-valued references, including shared arrays and lists.
+        for feature, pairs in references.items():
+            for current_ID, reference_ID in pairs:
+                try:
+                    all_copied_fs[current_ID][feature] = all_copied_fs[reference_ID]
+                except KeyError:
+                    warnings.warn(
+                        f"Reference {reference_ID} not found for feature '{feature}' of feature structure {current_ID}"
+                    )
+
+        # Fill inline FSArray features.
+        for current_ID, arrays in referenced_arrays.items():
+            for feature, member_ids in arrays.items():
+                all_copied_fs[current_ID][feature].elements = _resolve_members(
+                    member_ids, f"array feature '{feature}' of feature structure {current_ID}"
+                )
+
+        # Fill FSArrays that are feature structures of their own.
+        for current_ID, member_ids in referenced_fs_arrays.items():
+            all_copied_fs[current_ID].elements = _resolve_members(member_ids, f"standalone FSArray {current_ID}")
+
+        for current_ID, elements in referenced_primitive_arrays.items():
+            all_copied_fs[current_ID].elements = list(elements)
+
+        # Rebuild inline FSList features from the copied members.
+        for current_ID, lists in referenced_lists.items():
+            for feature, member_ids in lists.items():
+                all_copied_fs[current_ID][feature] = _build_list(
+                    "uima.cas.EmptyFSList",
+                    "uima.cas.NonEmptyFSList",
+                    _resolve_members(member_ids, f"FSList feature '{feature}' of feature structure {current_ID}"),
+                )
+
+        # Rebuild primitive head/tail lists (e.g. IntegerList, FloatList, StringList).
+        for current_ID, lists in referenced_primitive_lists.items():
+            for feature, (list_type_name, primitive_values) in lists.items():
+                # "uima.cas.IntegerList" -> "uima.cas.EmptyIntegerList" / "uima.cas.NonEmptyIntegerList"
+                suffix = list_type_name.split(".")[-1]
+                all_copied_fs[current_ID][feature] = _build_list(
+                    f"uima.cas.Empty{suffix}", f"uima.cas.NonEmpty{suffix}", primitive_values
+                )
+
+        # Remap each sofaArray from the original FS to its copy, when one exists.
+        for orig_sofa in self.sofas:
+            array_id = getattr(getattr(orig_sofa, "sofaArray", None), "xmiID", None)
+            if array_id is not None and array_id in all_copied_fs:
+                cas_copy._sofas[orig_sofa.sofaID].sofaArray = all_copied_fs[array_id]
+
+        # Re-index only the FS that were indexed in the original. FS that are merely
+        # referenced stay reachable through the FS referencing them.
+        for item in sorted(all_copied_fs.values(), key=lambda f: f.xmiID):
+            view_names = referenced_view.get(item.xmiID)
+            if not view_names:
+                continue
+
+            # Go through `add()` once so an FS with a `sofa` feature is bound to the
+            # copied sofa of its first view; further memberships are indexed
+            # directly to avoid re-assigning `sofa`.
+            cas_copy._current_view = cas_copy._views[view_names[0]]
+            cas_copy.add(item, keep_id=True)
+
+            for view_name in view_names[1:]:
+                cas_copy._views[view_name].add_annotation_to_index(item)
+
+        cas_copy._xmi_id_generator = IdGenerator(initial_id=self._xmi_id_generator._next_id)
+        cas_copy._sofa_num_generator = IdGenerator(initial_id=self._sofa_num_generator._next_id)
+
+        # Re-indexing switched the current view around; restore the view that is
+        # active in this CAS, falling back to the initial view (best effort).
+        try:
+            active_sofa_id = self.get_sofa().sofaID
+        except Exception:
+            active_sofa_id = NAME_DEFAULT_SOFA
+
+        if active_sofa_id in cas_copy._views:
+            cas_copy._current_view = cas_copy._views[active_sofa_id]
+
+        return cas_copy
+
 
 def _sort_func(a: FeatureStructure) -> Tuple[int, int, int]:
     xmi_id = getattr(a, "xmiID", None)
diff --git a/tests/test_cas.py b/tests/test_cas.py
index 79342c7..8218db9 100644
--- a/tests/test_cas.py
+++ b/tests/test_cas.py
@@ -5,15 +5,26 @@
 
 from cassis.typesystem import (
     TYPE_NAME_ANNOTATION,
+    TYPE_NAME_BYTE_ARRAY,
+    TYPE_NAME_EMPTY_FLOAT_LIST,
+    TYPE_NAME_EMPTY_FS_LIST,
+    TYPE_NAME_EMPTY_INTEGER_LIST,
+    TYPE_NAME_EMPTY_STRING_LIST,
+    TYPE_NAME_FS_LIST,
     TYPE_NAME_INTEGER,
     TYPE_NAME_INTEGER_ARRAY,
+    TYPE_NAME_NON_EMPTY_FLOAT_LIST,
+    TYPE_NAME_NON_EMPTY_INTEGER_LIST,
+    TYPE_NAME_NON_EMPTY_STRING_LIST,
     TYPE_NAME_STRING,
     TYPE_NAME_TOP,
     AnnotationHasNoSofa,
     FeatureStructure,
 )
 from tests.fixtures import *
+from tests.test_files.test_cas_generators import MultiFeatureRandomCasGenerator, MultiTypeRandomCasGenerator
 from cassis.util import overlapping
+from cassis.cas import Cas
 
 
 # Cas
@@ -352,7 +363,172 @@ def test_select_returns_feature_structures(cas_with_collections_xmi: str, typesy
     assert len(arrs) == 3
 
 
-# Covered text
+def test_deep_copy_with_primitive_integer_list():
+    """Deep-copying a CAS that contains a primitive IntegerList feature should succeed.
+ + This reproduces the scenario where `uima.cas.IntegerList` (a primitive list using + `head`/`tail`) is used as a feature value. The deep-copy implementation must not + assume an `elements` attribute for primitive lists. + """ + cas = Cas() + ts = cas.typesystem + + # Create a type that has a primitive integer list feature + MyType = ts.create_type("test.WithIntegerList") + ts.create_feature(MyType, name="ints", rangeType="uima.cas.IntegerList") + + # Build a simple NonEmptyIntegerList node: head=42, tail=EmptyIntegerList + nonempty = ts.get_type("uima.cas.NonEmptyIntegerList")() + nonempty.head = 42 + nonempty.tail = ts.get_type("uima.cas.EmptyIntegerList")() + + myfs = MyType() + myfs.ints = nonempty + + cas.add(myfs) + + # Should not raise and copied value should preserve the head element + cas_copy = cas.deep_copy() + copied = list(cas_copy.select("test.WithIntegerList"))[0] + assert copied.ints.head == 42 + + +def test_deep_copy_preserves_shared_primitive_array_identity(): + typesystem = TypeSystem() + Parent = typesystem.create_type("test.Parent") + typesystem.create_feature( + Parent, + "ints", + rangeType=typesystem.get_type(TYPE_NAME_INTEGER_ARRAY), + elementType=typesystem.get_type(TYPE_NAME_INTEGER), + multipleReferencesAllowed=True, + ) + + cas = Cas(typesystem=typesystem) + + IntegerArray = typesystem.get_type(TYPE_NAME_INTEGER_ARRAY) + int_arr = IntegerArray() + int_arr.elements = [1, 2, 3] + + first = Parent() + second = Parent() + first.ints = int_arr + second.ints = int_arr + cas.add(first) + cas.add(second) + + copy = cas.deep_copy(copy_typesystem=False) + + copied_parents = list(copy.select("test.Parent")) + assert len(copied_parents) == 2 + assert copied_parents[0].ints is copied_parents[1].ints + assert copied_parents[0].ints.elements == [1, 2, 3] + assert copied_parents[0].ints is not int_arr + + +def test_deep_copy_preserves_shared_primitive_list_identity(): + typesystem = TypeSystem() + Parent = typesystem.create_type("test.Parent") + 
typesystem.create_feature( + Parent, + "ints", + rangeType="uima.cas.IntegerList", + multipleReferencesAllowed=True, + ) + + cas = Cas(typesystem=typesystem) + + shared_list = typesystem.get_type("uima.cas.NonEmptyIntegerList")() + shared_list.head = 42 + shared_list.tail = typesystem.get_type("uima.cas.EmptyIntegerList")() + + first = Parent() + second = Parent() + first.ints = shared_list + second.ints = shared_list + cas.add(first) + cas.add(second) + + copy = cas.deep_copy(copy_typesystem=False) + + copied_parents = list(copy.select("test.Parent")) + assert len(copied_parents) == 2 + assert copied_parents[0].ints is copied_parents[1].ints + assert copied_parents[0].ints is not shared_list + assert copied_parents[0].ints.head == 42 + assert copied_parents[0].ints.tail.type.name == "uima.cas.EmptyIntegerList" + + +def test_deep_copy_preserves_shared_fsarray_identity(): + """If two feature structures share the same FSArray and the feature allows multiple references, + the shared array identity should be preserved after deep_copy. 
+ """ + cas = Cas() + ts = cas.typesystem + + # Create a simple element type and a parent type with a shareable FSArray feature + ElemType = ts.create_type("test.Elem") + ParentType = ts.create_type("test.Parent") + ts.create_feature( + ParentType, name="arr", rangeType="uima.cas.FSArray", elementType="test.Elem", multipleReferencesAllowed=True + ) + + # create a shared array FS and add it to the CAS so it receives an xmiID + array_fs = ts.get_type("uima.cas.FSArray")() + # give it one element: an Elem instance + elem = ElemType() + cas.add(elem) + array_fs.elements = [elem] + cas.add(array_fs) + + p1 = ParentType() + p2 = ParentType() + p1.arr = array_fs + p2.arr = array_fs + + cas.add(p1) + cas.add(p2) + + cas_copy = cas.deep_copy() + parents = list(cas_copy.select("test.Parent")) + assert len(parents) == 2 + + arr1 = parents[0].arr + arr2 = parents[1].arr + + # The two parents in the copied CAS must reference the same FS object + assert arr1 is arr2 + + +def test_deep_copy_inlines_fsarray_when_multiple_references_not_allowed(): + """FSArray-valued features without multipleReferencesAllowed should be copied inline.""" + cas = Cas() + ts = cas.typesystem + + ElemType = ts.create_type("test.Elem") + ParentType = ts.create_type("test.Parent") + ts.create_feature( + ParentType, name="arr", rangeType="uima.cas.FSArray", elementType="test.Elem", multipleReferencesAllowed=False + ) + + elem = ElemType() + cas.add(elem) + + array_fs = ts.get_type("uima.cas.FSArray")() + array_fs.elements = [elem] + + parent = ParentType() + parent.arr = array_fs + cas.add(parent) + + cas_copy = cas.deep_copy() + copied_parent = list(cas_copy.select("test.Parent"))[0] + + assert copied_parent.arr is not None + assert copied_parent.arr is not array_fs + assert len(copied_parent.arr.elements) == 1 + assert copied_parent.arr.elements[0] is not elem + assert copied_parent.arr.elements[0].type.name == "test.Elem" def test_get_covered_text_tokens(tokens: list[FeatureStructure]): @@ -376,6 
+552,38 @@ def test_get_covered_text_sentences(sentences: list[FeatureStructure]): assert actual_text == expected_text +def test_deep_copy_preserves_view_specific_language_and_mime(): + """A CAS with multiple views should preserve each view's language and mime on deep_copy.""" + cas = Cas() + + # initial view + cas.sofa_string = "initial" + cas.document_language = "en" + cas.sofa_mime = "text/plain" + + # create and set values on a second view + view2 = cas.create_view("other") + view2.sofa_string = "zweite" + view2.document_language = "de" + view2.sofa_mime = "text/html" + + # ensure both DocumentAnnotation instances exist on their views + assert cas.get_view("_InitialView").document_language == "en" + assert cas.get_view("other").document_language == "de" + + cas_copy = cas.deep_copy() + + # verify copy preserves per-view language and mime + copy_init = cas_copy.get_view("_InitialView") + copy_other = cas_copy.get_view("other") + + assert copy_init.document_language == "en" + assert copy_init.sofa_mime == "text/plain" + + assert copy_other.document_language == "de" + assert copy_other.sofa_mime == "text/html" + + def test_FeatureStructure_get_covered_text_sentences(sentences: list[FeatureStructure]): actual_text = [sentence.get_covered_text() for sentence in sentences] @@ -383,6 +591,39 @@ def test_FeatureStructure_get_covered_text_sentences(sentences: list[FeatureStru assert actual_text == expected_text +def test_deep_copy_preserves_active_view(): + """If deep_copy is called on a CAS whose current view is non-initial, + the copied CAS should have the same active view as the source. 
+ """ + cas = Cas() + + # initial view + cas.sofa_string = "initial" + + # create a second view and set its sofa string + view2 = cas.create_view("other") + view2.sofa_string = "zweite" + + # obtain a Cas object whose current view is the non-initial view + cas_other = cas.get_view("other") + assert cas_other.get_sofa().sofaID == "other" + + # Record active views on both the original CAS and the view-specific Cas + orig_active_on_cas = cas.get_sofa().sofaID + orig_active_on_cas_other = cas_other.get_sofa().sofaID + + # deep-copy the CAS while the non-initial view is current + cas_copy = cas_other.deep_copy() + + # the copied CAS should have the same active view name and sofa string + assert cas_copy.get_sofa().sofaID == cas_other.get_sofa().sofaID + assert cas_copy.sofa_string == cas_other.sofa_string + + # ensure the original CAS objects kept their active views + assert cas.get_sofa().sofaID == orig_active_on_cas + assert cas_other.get_sofa().sofaID == orig_active_on_cas_other + + # Adding annotations @@ -628,6 +869,190 @@ def test_covered_text_on_annotation_without_sofa(): ann.get_covered_text() +def test_deep_copy_without_typesystem(small_xmi, small_typesystem_xml): + org = load_cas_from_xmi(small_xmi, typesystem=load_typesystem(small_typesystem_xml)) + copy = org.deep_copy(copy_typesystem=False) + + assert org != copy + assert len(copy.to_json(pretty_print=True)) == len(org.to_json(pretty_print=True)) + assert copy.to_json(pretty_print=True) == org.to_json(pretty_print=True) + + assert org.typesystem == copy.typesystem + + +def test_deep_copy_with_typesystem(small_xmi, small_typesystem_xml): + org = load_cas_from_xmi(small_xmi, typesystem=load_typesystem(small_typesystem_xml)) + copy = org.deep_copy(copy_typesystem=True) + + assert org != copy + assert len(copy.to_json(pretty_print=True)) == len(org.to_json(pretty_print=True)) + assert copy.to_json(pretty_print=True) == org.to_json(pretty_print=True) + + assert org.typesystem != copy.typesystem + assert 
len(org.typesystem.to_xml()) == len(copy.typesystem.to_xml()) + assert org.typesystem.to_xml() == copy.typesystem.to_xml() + + +def test_random_multi_type_random_deep_copy(): + generator = MultiTypeRandomCasGenerator() + for i in range(0, 10): + generator.size = (i + 1) * 10 + generator.type_count = i + 1 + typesystem = generator.generate_type_system() + org = generator.generate_cas(typesystem) + # Debugging print removed to avoid noisy CI output; keep deep-copy call. + copy = org.deep_copy(copy_typesystem=True) + + org_text = org.to_xmi(pretty_print=True) + copy_text = copy.to_xmi(pretty_print=True) + + assert org != copy + assert len(org_text) == len(copy_text) + assert org_text == copy_text + + +def test_random_multi_feature_deep_copy(): + generator = MultiFeatureRandomCasGenerator() + for i in range(0, 10): + generator.size = (i + 1) * 10 + typesystem = generator.generate_type_system() + org = generator.generate_cas(typesystem) + copy = org.deep_copy(copy_typesystem=True) + + org_text = org.to_xmi(pretty_print=True) + copy_text = copy.to_xmi(pretty_print=True) + + assert org != copy + assert len(org_text) == len(copy_text) + assert org_text == copy_text + + +def _make_fs_list(typesystem, *elements): + empty_fs_list_type = typesystem.get_type("uima.cas.EmptyFSList") + non_empty_fs_list_type = typesystem.get_type("uima.cas.NonEmptyFSList") + + current = empty_fs_list_type() + for element in reversed(elements): + node = non_empty_fs_list_type() + node.head = element + node.tail = current + current = node + + return current + + +def _fs_list_elements(fs_list): + elements = [] + current = fs_list + + while hasattr(current, "head"): + elements.append(current.head) + current = current.tail + + return elements + + +def test_deep_copy_preserves_inline_fslist_feature(): + typesystem = TypeSystem() + Item = typesystem.create_type("test.Item", supertypeName=TYPE_NAME_ANNOTATION) + Container = typesystem.create_type("test.Container", supertypeName=TYPE_NAME_ANNOTATION) + 
typesystem.create_feature( + Container, + "items", + rangeType="uima.cas.FSList", + elementType=Item, + multipleReferencesAllowed=False, + ) + + cas = Cas(typesystem=typesystem) + cas.sofa_string = "abcd" + + first = Item(begin=0, end=1) + second = Item(begin=1, end=2) + container = Container(begin=0, end=2) + container.items = _make_fs_list(typesystem, first, second) + + cas.add_all([first, second, container]) + + copy = cas.deep_copy(copy_typesystem=False) + + copied_container = copy.select("test.Container")[0] + copied_items = _fs_list_elements(copied_container.items) + + assert copied_container.items is not None + assert [item.begin for item in copied_items] == [0, 1] + assert copied_container.items is not container.items + assert copied_items[0] is not first + assert copied_items[1] is not second + + +def test_deep_copy_preserves_referenced_fslist_feature(): + typesystem = TypeSystem() + Item = typesystem.create_type("test.Item", supertypeName=TYPE_NAME_ANNOTATION) + Container = typesystem.create_type("test.Container", supertypeName=TYPE_NAME_ANNOTATION) + typesystem.create_feature( + Container, + "items", + rangeType="uima.cas.FSList", + elementType=Item, + multipleReferencesAllowed=True, + ) + + cas = Cas(typesystem=typesystem) + cas.sofa_string = "abcd" + + first = Item(begin=0, end=1) + second = Item(begin=1, end=2) + container = Container(begin=0, end=2) + container.items = _make_fs_list(typesystem, first, second) + + cas.add_all([first, second, container]) + + copy = cas.deep_copy(copy_typesystem=False) + + copied_container = copy.select("test.Container")[0] + copied_items = _fs_list_elements(copied_container.items) + + assert copied_container.items is not None + assert [item.begin for item in copied_items] == [0, 1] + assert copied_container.items is not container.items + assert copied_items[0] is not first + assert copied_items[1] is not second + + +def test_deep_copy_fully_decoupled(small_xmi, small_typesystem_xml): + """Ensure deep copies do not 
share feature structure instances with the original. + + We create copies with and without copying the typesystem and assert that + none of the FeatureStructure objects returned by `_find_all_fs()` are the + identical (``is``) between the original and the copy. + """ + typesystem = load_typesystem(small_typesystem_xml) + org = load_cas_from_xmi(small_xmi, typesystem=typesystem) + + for copy_typesystem in (False, True): + copy = org.deep_copy(copy_typesystem=copy_typesystem) + + org_fs = list(org._find_all_fs()) + copy_fs = list(copy._find_all_fs()) + + # sanity: number of FS should match + assert len(org_fs) == len(copy_fs) + + for a in org_fs: + assert all(a is not b for b in copy_fs) + + # Ensure the CAS-level sofas are distinct objects + for org_sofa in org.sofas: + copy_sofa = copy._sofas.get(org_sofa.sofaID) + assert org_sofa is not copy_sofa + + # Ensure no FS references the same sofa object across original and copy + org_sofa_refs = {id(fs.sofa) for fs in org_fs if hasattr(fs, "sofa") and fs.sofa is not None} + copy_sofa_refs = {id(fs.sofa) for fs in copy_fs if hasattr(fs, "sofa") and fs.sofa is not None} + assert org_sofa_refs.isdisjoint(copy_sofa_refs) + + def test_runtime_generated_annotation_is_detected_and_shown_in_anchor(): ts = TypeSystem() # Create a new annotation subtype (should inherit from Annotation base) @@ -967,3 +1392,402 @@ def test_crop_sofa_string_serialization_roundtrip_transitive_refs_beyond_end(sma # Ensure child was serialized and reloaded (may have unmapped offsets) all_fs = list(new_cas._find_all_fs()) assert any(fs.type.name == "test.Child" for fs in all_fs) + + +def test_deep_copy_array_with_none_entries(): + """Ensure FSArray with None entries preserves positions and references are decoupled.""" + typesystem = TypeSystem() + Child = typesystem.create_type("test.Child") + Foo = typesystem.create_type("test.Foo") + typesystem.create_feature( + Foo, + "arr", + rangeType=typesystem.get_type("uima.cas.FSArray"), + elementType=Child, + 
multipleReferencesAllowed=True, + ) + + cas = Cas(typesystem=typesystem) + # create two child FS and an array with a None placeholder + child1 = Child() + child2 = Child() + cas.add(child1) + cas.add(child2) + + arr = typesystem.get_type("uima.cas.FSArray")() + arr.elements = [child1, None, child2] + + foo = Foo() + foo.arr = arr + cas.add(foo) + + copy = cas.deep_copy(copy_typesystem=False) + + copied_foos = list(copy.select("test.Foo")) + assert len(copied_foos) == 1 + copied_arr = copied_foos[0].arr + # preserve length and None placeholder + assert len(copied_arr.elements) == 3 + assert copied_arr.elements[1] is None + # ensure child objects were copied (not identical) + assert copied_arr.elements[0] is not child1 + assert copied_arr.elements[2] is not child2 + + +def test_deep_copy_none_non_primitive_feature(): + """Ensure non-primitive features set to None are preserved in the copy without warnings.""" + typesystem = TypeSystem() + Child = typesystem.create_type("test.Child") + Parent = typesystem.create_type("test.Parent") + typesystem.create_feature(Parent, "child", Child) + + cas = Cas(typesystem=typesystem) + parent = Parent() + parent.child = None + cas.add(parent) + + copy = cas.deep_copy(copy_typesystem=False) + copied_parent = list(copy.select("test.Parent"))[0] + assert getattr(copied_parent, "child") is None + + +def test_deep_copy_none_fsarray_feature(): + """Ensure an FSArray feature set to None is preserved in the copy.""" + typesystem = TypeSystem() + Foo = typesystem.create_type("test.Foo") + typesystem.create_feature( + Foo, + "arr", + rangeType=typesystem.get_type("uima.cas.FSArray"), + elementType=typesystem.get_type(TYPE_NAME_TOP), + multipleReferencesAllowed=True, + ) + + cas = Cas(typesystem=typesystem) + foo = Foo() + foo.arr = None + cas.add(foo) + + copy = cas.deep_copy(copy_typesystem=False) + copied_foo = list(copy.select("test.Foo"))[0] + assert getattr(copied_foo, "arr") is None + + +def 
test_deep_copy_none_primitive_collection_feature(): + """Ensure a primitive collection feature set to None is preserved in the copy.""" + typesystem = TypeSystem() + Foo = typesystem.create_type("test.Foo") + typesystem.create_feature( + Foo, + "ints", + rangeType=typesystem.get_type(TYPE_NAME_INTEGER_ARRAY), + elementType=typesystem.get_type(TYPE_NAME_INTEGER), + multipleReferencesAllowed=True, + ) + + cas = Cas(typesystem=typesystem) + foo = Foo() + foo.ints = None + cas.add(foo) + + copy = cas.deep_copy(copy_typesystem=False) + copied_foo = list(copy.select("test.Foo"))[0] + assert getattr(copied_foo, "ints") is None + + +def test_deep_copy_primitive_collection_elements_are_copied(): + """Ensure primitive collection `elements` list is copied, not shared.""" + typesystem = TypeSystem() + Foo = typesystem.create_type("test.Foo") + typesystem.create_feature( + Foo, + "ints", + rangeType=typesystem.get_type(TYPE_NAME_INTEGER_ARRAY), + elementType=typesystem.get_type(TYPE_NAME_INTEGER), + multipleReferencesAllowed=True, + ) + + cas = Cas(typesystem=typesystem) + foo = Foo() + + IntegerArray = typesystem.get_type(TYPE_NAME_INTEGER_ARRAY) + int_arr = IntegerArray() + int_arr.elements = [1, 2, 3] + foo.ints = int_arr + cas.add(foo) + + copy = cas.deep_copy(copy_typesystem=False) + copied_foo = list(copy.select("test.Foo"))[0] + + # content equal but container should be a different object + assert copied_foo.ints.elements == int_arr.elements + assert copied_foo.ints.elements is not int_arr.elements + + # mutation of original should not affect the copy + int_arr.elements.append(99) + assert 99 not in copied_foo.ints.elements + + +def test_deep_copy_preserves_standalone_primitive_array_elements(): + """Ensure copied standalone primitive array FS keep their elements.""" + typesystem = TypeSystem() + Foo = typesystem.create_type("test.Foo") + typesystem.create_feature( + Foo, + "ints", + rangeType=typesystem.get_type(TYPE_NAME_INTEGER_ARRAY), + 
elementType=typesystem.get_type(TYPE_NAME_INTEGER), + multipleReferencesAllowed=True, + ) + + cas = Cas(typesystem=typesystem) + foo = Foo() + + IntegerArray = typesystem.get_type(TYPE_NAME_INTEGER_ARRAY) + int_arr = IntegerArray() + int_arr.elements = [1, 2, 3] + foo.ints = int_arr + cas.add(foo) + + copy = cas.deep_copy(copy_typesystem=False) + copied_foo = list(copy.select("test.Foo"))[0] + + assert copied_foo.ints.elements == [1, 2, 3] + assert copied_foo.ints.elements is not int_arr.elements + assert list(copy.select(TYPE_NAME_INTEGER_ARRAY)) == [] + + +def test_deep_copy_empty_array(): + """Ensure empty FSArray is preserved as empty in the copy.""" + typesystem = TypeSystem() + Foo = typesystem.create_type("test.Foo") + typesystem.create_feature( + Foo, + "arr", + rangeType=typesystem.get_type("uima.cas.FSArray"), + elementType=typesystem.get_type(TYPE_NAME_TOP), + multipleReferencesAllowed=True, + ) + + cas = Cas(typesystem=typesystem) + foo = Foo() + arr = typesystem.get_type("uima.cas.FSArray")() + arr.elements = [] + foo.arr = arr + cas.add(foo) + + copy = cas.deep_copy(copy_typesystem=False) + copied_foo = list(copy.select("test.Foo"))[0] + assert hasattr(copied_foo, "arr") + assert copied_foo.arr.elements == [] + + +def test_deep_copy_multiple_views_and_sofas_are_decoupled(): + """Create multiple views, deep copy and ensure sofas and view-assignments are decoupled.""" + ts = TypeSystem() + Token = ts.create_type("test.Token", supertypeName=TYPE_NAME_ANNOTATION) + + cas = Cas(typesystem=ts) + cas.sofa_string = "01234567890123456789" + + # initial view: add token + t1 = Token(begin=0, end=2) + cas.add(t1) + + # create and populate second view + view2 = cas.create_view("v2") + view2.sofa_string = "abcdefghij" + t2 = Token(begin=0, end=3) + view2.add(t2) + + copy = cas.deep_copy(copy_typesystem=False) + + # ensure top-level sofas are distinct objects + for orig_sofa in cas.sofas: + copy_sofa = copy._sofas.get(orig_sofa.sofaID) + assert copy_sofa is not 
orig_sofa + + # ensure annotations were copied into respective views and are not identical + orig_tokens = list(cas._find_all_fs()) + copy_tokens = list(copy._find_all_fs()) + assert len(orig_tokens) == len(copy_tokens) + for a in orig_tokens: + assert all(a is not b for b in copy_tokens) + + +def test_deep_copy_should_remap_sofa_array(): + """Regression test: ensure `deep_copy()` remaps a Sofa.sofaArray even when + the sofaArray FS is not indexed in any view. + + Expected behavior: the copied CAS should reference a copied sofaArray, + not the original object. + """ + cas = Cas() + + # Create a standalone byte array FS and assign an XMI id as if parsed + # from external representation. Do NOT add it to any view index. + ByteArray = cas.typesystem.get_type(TYPE_NAME_BYTE_ARRAY) + byte_array = ByteArray(elements=[1, 2, 3]) + byte_array.xmiID = 9999 + + cas.get_sofa().sofaArray = byte_array + + cas_copy = cas.deep_copy() + + # The copy should not keep a direct reference to the original byte array + assert cas_copy.get_sofa().sofaArray is not byte_array + + +def test_deep_copy_empty_primitive_array(): + """Ensure an empty primitive array feature is preserved as empty in the copy.""" + typesystem = TypeSystem() + Foo = typesystem.create_type("test.Foo") + typesystem.create_feature( + Foo, + "ints", + rangeType=typesystem.get_type(TYPE_NAME_INTEGER_ARRAY), + elementType=typesystem.get_type(TYPE_NAME_INTEGER), + multipleReferencesAllowed=True, + ) + + cas = Cas(typesystem=typesystem) + foo = Foo() + int_arr = typesystem.get_type(TYPE_NAME_INTEGER_ARRAY)() + int_arr.elements = [] + foo.ints = int_arr + cas.add(foo) + + copy = cas.deep_copy(copy_typesystem=False) + copied_foo = list(copy.select("test.Foo"))[0] + assert copied_foo.ints.elements == [] + assert copied_foo.ints is not int_arr + + +def test_deep_copy_none_fslist_feature(): + """Ensure an FSList feature set to None is preserved in the copy.""" + typesystem = TypeSystem() + Item = 
typesystem.create_type("test.Item", supertypeName=TYPE_NAME_ANNOTATION) + Container = typesystem.create_type("test.Container", supertypeName=TYPE_NAME_ANNOTATION) + typesystem.create_feature( + Container, + "items", + rangeType=TYPE_NAME_FS_LIST, + elementType=Item, + multipleReferencesAllowed=False, + ) + + cas = Cas(typesystem=typesystem) + cas.sofa_string = "ab" + container = Container(begin=0, end=2) + container.items = None + cas.add(container) + + copy = cas.deep_copy(copy_typesystem=False) + copied_container = copy.select("test.Container")[0] + assert getattr(copied_container, "items") is None + + +def test_deep_copy_empty_fslist_feature(): + """Ensure an empty FSList (EmptyFSList) is preserved as empty in the copy.""" + typesystem = TypeSystem() + Item = typesystem.create_type("test.Item", supertypeName=TYPE_NAME_ANNOTATION) + Container = typesystem.create_type("test.Container", supertypeName=TYPE_NAME_ANNOTATION) + typesystem.create_feature( + Container, + "items", + rangeType=TYPE_NAME_FS_LIST, + elementType=Item, + multipleReferencesAllowed=False, + ) + + cas = Cas(typesystem=typesystem) + cas.sofa_string = "ab" + container = Container(begin=0, end=2) + container.items = _make_fs_list(typesystem) + cas.add(container) + + copy = cas.deep_copy(copy_typesystem=False) + copied_container = copy.select("test.Container")[0] + assert copied_container.items is not None + assert copied_container.items.type.name == TYPE_NAME_EMPTY_FS_LIST + assert _fs_list_elements(copied_container.items) == [] + + +def test_deep_copy_with_nonempty_integer_list_range(): + """Feature with rangeType NonEmptyIntegerList should preserve its head value across deep_copy.""" + cas = Cas() + ts = cas.typesystem + + MyType = ts.create_type("test.WithNonEmptyIntegerList") + ts.create_feature(MyType, name="ints", rangeType=TYPE_NAME_NON_EMPTY_INTEGER_LIST) + + nonempty = ts.get_type(TYPE_NAME_NON_EMPTY_INTEGER_LIST)() + nonempty.head = 42 + nonempty.tail = 
ts.get_type(TYPE_NAME_EMPTY_INTEGER_LIST)() + + myfs = MyType() + myfs.ints = nonempty + cas.add(myfs) + + copy = cas.deep_copy() + copied = list(copy.select("test.WithNonEmptyIntegerList"))[0] + assert copied.ints is not None + assert copied.ints.head == 42 + + +def test_deep_copy_with_nonempty_float_list_range(): + """Feature with rangeType NonEmptyFloatList should preserve its head value across deep_copy.""" + cas = Cas() + ts = cas.typesystem + + MyType = ts.create_type("test.WithNonEmptyFloatList") + ts.create_feature(MyType, name="floats", rangeType=TYPE_NAME_NON_EMPTY_FLOAT_LIST) + + nonempty = ts.get_type(TYPE_NAME_NON_EMPTY_FLOAT_LIST)() + nonempty.head = 3.14 + nonempty.tail = ts.get_type(TYPE_NAME_EMPTY_FLOAT_LIST)() + + myfs = MyType() + myfs.floats = nonempty + cas.add(myfs) + + copy = cas.deep_copy() + copied = list(copy.select("test.WithNonEmptyFloatList"))[0] + assert copied.floats is not None + assert copied.floats.head == 3.14 + + +def test_deep_copy_with_nonempty_string_list_range(): + """Feature with rangeType NonEmptyStringList should preserve its head value across deep_copy.""" + cas = Cas() + ts = cas.typesystem + + MyType = ts.create_type("test.WithNonEmptyStringList") + ts.create_feature(MyType, name="strings", rangeType=TYPE_NAME_NON_EMPTY_STRING_LIST) + + nonempty = ts.get_type(TYPE_NAME_NON_EMPTY_STRING_LIST)() + nonempty.head = "hello" + nonempty.tail = ts.get_type(TYPE_NAME_EMPTY_STRING_LIST)() + + myfs = MyType() + myfs.strings = nonempty + cas.add(myfs) + + copy = cas.deep_copy() + copied = list(copy.select("test.WithNonEmptyStringList"))[0] + assert copied.strings is not None + assert copied.strings.head == "hello" + + +def test_deep_copy_of_empty_cas(): + """Ensure deep_copy works on a freshly initialized CAS with no user FS.""" + cas = Cas() + + copy = cas.deep_copy(copy_typesystem=False) + + assert copy is not cas + assert [s.sofaID for s in copy.sofas] == [s.sofaID for s in cas.sofas] + for orig_sofa in cas.sofas: + copy_sofa = 
copy._sofas.get(orig_sofa.sofaID) + assert copy_sofa is not orig_sofa + assert list(copy._find_all_fs()) == [] diff --git a/tests/test_json.py b/tests/test_json.py index d29a4ec..24046d2 100644 --- a/tests/test_json.py +++ b/tests/test_json.py @@ -122,6 +122,142 @@ def test_deserialization_serialization_one_way(json_path, annotations): assert_json_equal(actual_json, expected_json, sort_keys=True) +def test_json_roundtrip_shared_fsarray_identity(): + cas = Cas() + ts = cas.typesystem + + ElemType = ts.create_type("test.Elem") + ParentType = ts.create_type("test.Parent") + ts.create_feature( + ParentType, + name="arr", + rangeType="uima.cas.FSArray", + elementType="test.Elem", + multipleReferencesAllowed=True, + ) + + elem = ElemType() + cas.add(elem) + + array_fs = ts.get_type("uima.cas.FSArray")() + array_fs.elements = [elem] + cas.add(array_fs) + + first = ParentType() + second = ParentType() + first.arr = array_fs + second.arr = array_fs + cas.add(first) + cas.add(second) + + expected_json = cas.to_json() + + cas_copy = cas.deep_copy() + copied_parents = list(cas_copy.select("test.Parent")) + assert len(copied_parents) == 2 + assert copied_parents[0].arr is copied_parents[1].arr + + actual_json = cas_copy.to_json() + assert_json_equal(actual_json, expected_json, sort_keys=True) + + +def test_json_roundtrip_shared_primitive_array_identity(): + typesystem = TypeSystem() + Parent = typesystem.create_type("test.Parent") + typesystem.create_feature( + Parent, + "ints", + rangeType="uima.cas.IntegerArray", + elementType="uima.cas.Integer", + multipleReferencesAllowed=True, + ) + + cas = Cas(typesystem) + int_array = typesystem.get_type("uima.cas.IntegerArray")() + int_array.elements = [1, 2, 3] + cas.add(int_array) + + first = Parent() + second = Parent() + first.ints = int_array + second.ints = int_array + cas.add(first) + cas.add(second) + + expected_json = cas.to_json() + + cas_copy = cas.deep_copy() + copied_parents = list(cas_copy.select("test.Parent")) + assert 
len(copied_parents) == 2 + assert copied_parents[0].ints is copied_parents[1].ints + + actual_json = cas_copy.to_json() + assert_json_equal(actual_json, expected_json, sort_keys=True) + + +def test_deep_copy_preserves_view_membership_for_non_annotation_fs_in_json(): + cas = Cas() + initial_view = cas.get_view("_InitialView") + secondary_view = cas.create_view("sofa2") + + initial_view.sofa_string = "First view" + secondary_view.sofa_string = "Second view contents" + + integer_array = cas.typesystem.get_type("uima.cas.IntegerArray")() + integer_array.elements = [1, 2, 3] + initial_view.add(integer_array) + + document_annotation = cas.typesystem.get_type(TYPE_NAME_DOCUMENT_ANNOTATION)() + document_annotation.begin = 0 + document_annotation.end = len(secondary_view.sofa_string) + secondary_view.add(document_annotation) + + expected_json = cas.to_json() + + cas_copy = cas.deep_copy() + + view1_members = list(cas_copy.get_view("_InitialView").select_all_fs()) + view2_members = list(cas_copy.get_view("sofa2").select_all_fs()) + + assert [fs.xmiID for fs in view1_members] == [integer_array.xmiID] + assert [fs.xmiID for fs in view2_members] == [document_annotation.xmiID] + + actual_json = cas_copy.to_json() + assert_json_equal(actual_json, expected_json, sort_keys=True) + + +def test_deep_copy_preserves_non_annotation_membership_in_multiple_views_in_json(): + cas = Cas() + initial_view = cas.get_view("_InitialView") + secondary_view = cas.create_view("sofa2") + + initial_view.sofa_string = "First view" + secondary_view.sofa_string = "Second view" + + shared_array = cas.typesystem.get_type("uima.cas.IntegerArray")() + shared_array.elements = [1, 2, 3] + initial_view.add(shared_array) + secondary_view.add(shared_array) + + annotation = cas.typesystem.get_type(TYPE_NAME_DOCUMENT_ANNOTATION)() + annotation.begin = 0 + annotation.end = len(secondary_view.sofa_string) + secondary_view.add(annotation) + + expected_json = cas.to_json() + + cas_copy = cas.deep_copy() + + 
view1_members = [fs.xmiID for fs in cas_copy.get_view("_InitialView").select_all_fs()] + view2_members = [fs.xmiID for fs in cas_copy.get_view("sofa2").select_all_fs()] + + assert view1_members == [shared_array.xmiID] + assert set(view2_members) == {annotation.xmiID, shared_array.xmiID} + + actual_json = cas_copy.to_json() + assert_json_equal(actual_json, expected_json, sort_keys=True) + + def test_multi_type_random_serialization_deserialization(): generator = MultiTypeRandomCasGenerator() for i in range(0, 10): @@ -129,7 +265,6 @@ def test_multi_type_random_serialization_deserialization(): generator.type_count = i + 1 typesystem = generator.generate_type_system() randomized_cas = generator.generate_cas(typesystem) - print(f"CAS size: {sum(len(view.get_all_fs()) for view in randomized_cas.views)}") expected_json = randomized_cas.to_json() loaded_cas = load_cas_from_json(expected_json) @@ -144,7 +279,6 @@ def test_multi_feature_random_serialization_deserialization(): generator.size = (i + 1) * 10 typesystem = generator.generate_type_system() randomized_cas = generator.generate_cas(typesystem) - print(f"CAS size: {sum(len(view.get_all_fs()) for view in randomized_cas.views)}") expected_json = randomized_cas.to_json() loaded_cas = load_cas_from_json(expected_json) @@ -175,9 +309,6 @@ def test_unicode(json_path, annotations): if not expected_covered_text: continue - for n in range(len(actual_covered_text)): - print(f"{n}: [{actual_covered_text[n]}] {hex(ord(actual_covered_text[n]))}") - if len(expected) >= 5: expected_utf8_bytes = expected[4] actual_utf8_bytes = bytes(actual_covered_text, "UTF-8") diff --git a/tests/test_util.py b/tests/test_util.py index b86cdfa..d0b14fe 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -55,7 +55,6 @@ def test_cas_to_comparable_text_on_multi_feature_random(): generator.size = (i + 1) * 10 typesystem = generator.generate_type_system() randomized_cas = generator.generate_cas(typesystem) - print(f"CAS size: 
{sum(len(view.get_all_fs()) for view in randomized_cas.views)}") cas_to_comparable_text(randomized_cas) # At this point, we are just testing if there is no exception during rendering @@ -66,7 +65,6 @@ def test_cas_to_comparable_text_on_multi_type_random(): generator.size = (i + 1) * 10 typesystem = generator.generate_type_system() randomized_cas = generator.generate_cas(typesystem) - print(f"CAS size: {sum(len(view.get_all_fs()) for view in randomized_cas.views)}") cas_to_comparable_text(randomized_cas) # At this point, we are just testing if there is no exception during rendering diff --git a/tests/test_xmi.py b/tests/test_xmi.py index f1be393..75c5a80 100644 --- a/tests/test_xmi.py +++ b/tests/test_xmi.py @@ -3,7 +3,7 @@ from lxml import etree -from cassis.typesystem import TYPE_NAME_ANNOTATION, TYPE_NAME_SOFA, TypeNotFoundError +from cassis.typesystem import TYPE_NAME_ANNOTATION, TYPE_NAME_DOCUMENT_ANNOTATION, TYPE_NAME_SOFA, TypeNotFoundError from tests.fixtures import * from pytest_lazy_fixtures import lf from tests.test_files.test_cas_generators import ( @@ -13,6 +13,91 @@ ) from tests.util import assert_xml_equal + +def test_xmi_roundtrip_shared_fsarray_identity(): + """Ensure that a CAS with two parents sharing the same FSArray deep-copies + while preserving shared-array identity and that the copy serializes to + the same XMI as the original. 
+ """ + from cassis.cas import Cas + + cas = Cas() + ts = cas.typesystem + + ElemType = ts.create_type("test.Elem") + ParentType = ts.create_type("test.Parent") + ts.create_feature( + ParentType, + name="arr", + rangeType="uima.cas.FSArray", + elementType="test.Elem", + multipleReferencesAllowed=True, + ) + + # shared array and element + elem = ElemType() + cas.add(elem) + array_fs = ts.get_type("uima.cas.FSArray")() + array_fs.elements = [elem] + cas.add(array_fs) + + p1 = ParentType() + p2 = ParentType() + p1.arr = array_fs + p2.arr = array_fs + cas.add(p1) + cas.add(p2) + + xmi_orig = cas.to_xmi() + + cas_copy = cas.deep_copy() + # identity preserved + parents = list(cas_copy.select("test.Parent")) + assert len(parents) == 2 + assert parents[0].arr is parents[1].arr + + # and XMI representation matches (structurally) + xmi_copy = cas_copy.to_xmi() + assert_xml_equal(xmi_copy, xmi_orig) + + +def test_xmi_roundtrip_shared_primitive_array_identity(): + """Ensure shared primitive arrays remain referenced after deep_copy and serialize unchanged.""" + from cassis.cas import Cas + + typesystem = TypeSystem() + Parent = typesystem.create_type("test.Parent") + typesystem.create_feature( + Parent, + "ints", + rangeType="uima.cas.IntegerArray", + elementType="uima.cas.Integer", + multipleReferencesAllowed=True, + ) + + cas = Cas(typesystem) + int_array = typesystem.get_type("uima.cas.IntegerArray")() + int_array.elements = [1, 2, 3] + cas.add(int_array) + + first = Parent() + second = Parent() + first.ints = int_array + second.ints = int_array + cas.add(first) + cas.add(second) + + xmi_orig = cas.to_xmi() + + cas_copy = cas.deep_copy() + copied_parents = list(cas_copy.select("test.Parent")) + assert len(copied_parents) == 2 + assert copied_parents[0].ints is copied_parents[1].ints + + xmi_copy = cas_copy.to_xmi() + assert_xml_equal(xmi_copy, xmi_orig) + + # Deserializing FIXTURES = [ @@ -113,6 +198,69 @@ def test_views_are_parsed(small_xmi, small_typesystem_xml): assert 1 
== len(list(view2.select_all_annotations())) +def test_deep_copy_preserves_view_membership_for_non_annotation_fs(small_typesystem_xml): + typesystem = load_typesystem(small_typesystem_xml) + cas_xmi = """ + + + + + + + + + + """ + + cas = load_cas_from_xmi(cas_xmi, typesystem=typesystem) + xmi_orig = cas.to_xmi() + cas_copy = cas.deep_copy() + + view1_members = list(cas_copy.get_view("sofa1").select_all_fs()) + view2_members = list(cas_copy.get_view("sofa2").select_all_fs()) + + assert [fs.xmiID for fs in view1_members] == [4] + assert [fs.xmiID for fs in view2_members] == [3] + + xmi_copy = cas_copy.to_xmi() + assert_xml_equal(xmi_copy, xmi_orig) + + +def test_deep_copy_preserves_non_annotation_membership_in_multiple_views(): + cas = Cas() + initial_view = cas.get_view("_InitialView") + secondary_view = cas.create_view("sofa2") + + initial_view.sofa_string = "First view" + secondary_view.sofa_string = "Second view" + + shared_array = cas.typesystem.get_type("uima.cas.IntegerArray")() + shared_array.elements = [1, 2, 3] + initial_view.add(shared_array) + secondary_view.add(shared_array) + + annotation = cas.typesystem.get_type(TYPE_NAME_DOCUMENT_ANNOTATION)() + annotation.begin = 0 + annotation.end = len(secondary_view.sofa_string) + secondary_view.add(annotation) + + xmi_orig = cas.to_xmi() + + cas_copy = cas.deep_copy() + + view1_members = [fs.xmiID for fs in cas_copy.get_view("_InitialView").select_all_fs()] + view2_members = [fs.xmiID for fs in cas_copy.get_view("sofa2").select_all_fs()] + + assert view1_members == [shared_array.xmiID] + assert set(view2_members) == {annotation.xmiID, shared_array.xmiID} + + xmi_copy = cas_copy.to_xmi() + assert_xml_equal(xmi_copy, xmi_orig) + + def test_deserializing_and_then_adding_annotations_works(small_xmi, small_typesystem_xml): typesystem = load_typesystem(small_typesystem_xml) TokenType = typesystem.get_type("cassis.Token")