diff --git a/python_tests/test_memo_protect_fields.py b/python_tests/test_memo_protect_fields.py index d4d2a4a7..c0bef517 100644 --- a/python_tests/test_memo_protect_fields.py +++ b/python_tests/test_memo_protect_fields.py @@ -501,6 +501,69 @@ def test_protected_field_update_does_not_require_create_access(db0_fixture): obj.name = "beta" +def test_protected_field_update_requires_update_access(db0_fixture): + account_id = ContextVar("protected_update_denied_account_id") + obj = MemoProtectedFieldsClass("alpha", 1) + db0.set_field_access(MemoProtectedFieldsClass, 101, (FieldAccess.READ,), "name") + db0.set_field_access(MemoProtectedFieldsClass, 202, (FieldAccess.READ, FieldAccess.UPDATE), "name") + db0._init_data_masking(account_id) + + account_id.set(101) + with pytest.raises(PermissionError, match="update"): + obj.name = "denied" + assert obj.name == "alpha" + + account_id.set(202) + obj.name = "allowed" + assert obj.name == "allowed" + + +def test_protected_field_update_requires_initialized_data_masking(db0_fixture): + obj = MemoProtectedFieldsClass("alpha", 1) + + with pytest.raises(RuntimeError, match="data masking"): + obj.name = "beta" + + +def test_protected_field_update_allows_debug_full_access_super_account(db0_fixture): + account_id = ContextVar("protected_update_debug_full_access_account_id") + obj = MemoProtectedFieldsClass("alpha", 1) + db0._init_data_masking(account_id, mode="DEBUG") + account_id.set(-2) + + obj.name = "beta" + assert obj.name == "beta" + + +def test_protected_field_update_rejects_debug_read_only_super_account(db0_fixture): + account_id = ContextVar("protected_update_debug_read_only_account_id") + obj = MemoProtectedFieldsClass("alpha", 1) + db0._init_data_masking(account_id, mode="DEBUG") + account_id.set(-1) + + with pytest.raises(PermissionError, match="update"): + obj.name = "beta" + assert obj.name == "alpha" + + +def test_protected_field_delete_requires_delete_access(db0_fixture): + account_id = ContextVar("protected_delete_account_id") + obj = MemoProtectedFieldsClass("alpha", 1) + db0.set_field_access(MemoProtectedFieldsClass, 101, (FieldAccess.READ, FieldAccess.UPDATE), "name") + db0.set_field_access(MemoProtectedFieldsClass, 202, (FieldAccess.READ, FieldAccess.DELETE), "name") + db0._init_data_masking(account_id) + + account_id.set(101) + with pytest.raises(PermissionError, match="delete"): + del obj.name + assert obj.name == "alpha" + + account_id.set(202) + del obj.name + with pytest.raises(AttributeError): + _ = obj.name + + def test_protected_field_getter_requires_read_access(db0_fixture): account_id = ContextVar("protected_read_account_id") obj = MemoProtectedFieldsClass("alpha", 1) diff --git a/src/dbzero/bindings/python/Memo.cpp b/src/dbzero/bindings/python/Memo.cpp index ebed082d..9f98b9af 100644 --- a/src/dbzero/bindings/python/Memo.cpp +++ b/src/dbzero/bindings/python/Memo.cpp @@ -441,10 +441,19 @@ namespace db0::python return mask && (*mask)[access_option]; } + void setProtectedFieldPermissionError(db0::object_model::FieldMaskOptions access_option) + { + std::stringstream message; + message << "data masking denies " << access_option + << " access to protected field"; + PyErr_SetString(PyExc_PermissionError, message.str().c_str()); + } + template PyObject *checkProtectedFieldReadAccess(MemoImplT *memo_obj, const db0::object_model::MemberLoc &member_loc) { - if (checkProtectedFieldAccess(memo_obj, db0::object_model::FieldMaskOptions::READ, member_loc)) { + auto accessOption = db0::object_model::FieldMaskOptions::READ; + if (checkProtectedFieldAccess(memo_obj, accessOption, member_loc)) { return nullptr; } if (PyErr_Occurred()) { @@ -457,29 +466,28 @@ namespace db0::python return masking_state->missingValuePlaceholder; } - PyErr_SetString(PyExc_PermissionError, "data masking denies read access to protected field"); + setProtectedFieldPermissionError(accessOption); return nullptr; } template - bool checkProtectedFieldCreateAccess(MemoImplT *memo_obj, const char *field_name) + bool checkProtectedFieldMutateAccess(MemoImplT *memo_obj, db0::object_model::FieldMaskOptions accessOption, + const char *fieldName) { auto &memo_type = memo_obj->ext().getType(); - if (!memo_type.isProtectFields() || !Settings::m_data_masking_enabled) { + if (!memo_type.isProtectFields()) { return true; } - auto member_loc = memo_type.findField(field_name); - if (checkProtectedFieldAccess( - memo_obj, db0::object_model::FieldMaskOptions::CREATE, member_loc, field_name - )) { + auto memberLoc = memo_type.findField(fieldName); + if (checkProtectedFieldAccess(memo_obj, accessOption, memberLoc, fieldName)) { return true; } if (PyErr_Occurred()) { return false; } - PyErr_SetString(PyExc_PermissionError, "data masking denies create access to protected field"); + setProtectedFieldPermissionError(accessOption); return false; } @@ -550,6 +558,12 @@ namespace db0::python if (isPersistentAttrName(attr_name)) { try { + if (!value && !checkProtectedFieldMutateAccess( + self, db0::object_model::FieldMaskOptions::DELETE, attr_name + )) { + return -1; + } + // must materialize the object before setting as an attribute if (value && !db0::object_model::isMaterialized(value)) { db0::FixtureLock lock(self->ext().getFixture()); @@ -559,14 +573,31 @@ namespace db0::python auto maybe_type_id = PyToolkit::getTypeManager().tryGetTypeId(value); if (maybe_type_id) { if (self->ext().hasInstance()) { - auto member_loc = self->ext().findField(attr_name); - if (!self->ext().tryGet(member_loc) && !checkProtectedFieldCreateAccess(self, attr_name)) { - return -1; + if (value) { + auto member_loc = self->ext().findField(attr_name); + bool memberExists = !!self->ext().tryGet(member_loc); + auto accessOption = memberExists + ? db0::object_model::FieldMaskOptions::UPDATE + : db0::object_model::FieldMaskOptions::CREATE; + if ( + (memberExists || Settings::m_data_masking_enabled) + && !checkProtectedFieldMutateAccess( + self, accessOption, attr_name + ) + ) { + return -1; + } } db0::FixtureLock lock(self->ext().getFixture()); self->modifyExt().set(lock, attr_name, *maybe_type_id, value); } else { - if (!checkProtectedFieldCreateAccess(self, attr_name)) { + if ( + value + && Settings::m_data_masking_enabled + && !checkProtectedFieldMutateAccess( + self, db0::object_model::FieldMaskOptions::CREATE, attr_name + ) + ) { return -1; } // considered as a non-mutating operation