Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion database/istsos_auth.sql
Original file line number Diff line number Diff line change
Expand Up @@ -775,7 +775,7 @@ FOR tablename IN SELECT unnest(tables)
RAISE EXCEPTION USING ERRCODE = '42704';
END IF;

new_roles_ := old_roles_ || users_;
new_roles_ := (SELECT array_agg(DISTINCT x) FROM unnest(old_roles_ || users_) AS x);

EXECUTE format('ALTER POLICY %I ON sensorthings.%I TO %s', policyname_, tablename_, array_to_string(new_roles_, ','));

Expand Down
48 changes: 48 additions & 0 deletions test/database/test_auth_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,54 @@ def test_add_users_to_policy_raises_for_missing_policy(self, schema):
(["guest"], "nonexistent_policy_xyz"),
)

def test_add_users_to_policy_deduplicates_duplicate_users(self, schema):
"""
add_users_to_policy must remove duplicate usernames when the same user
is added multiple times (either across calls or within the same call).
"""
pname = "test-dedup-pol"
with schema.cursor() as cur:
# Create initial policy with alice, bob
cur.execute(
"SELECT sensorthings.viewer_policy(%s::text[], %s)",
(["alice", "bob"], pname),
)

# Verify initial state: each policy should have exactly alice, bob
cur.execute(
"""
SELECT roles FROM pg_policies
WHERE schemaname = 'sensorthings'
AND policyname LIKE %s
ORDER BY tablename
""",
(f"{pname}%",),
)
initial_roles = [row[0] for row in cur.fetchall()]
assert all(set(r) == {"alice", "bob"} for r in initial_roles)

# Add duplicates: alice and bob already present, add bob again + charlie
cur.execute(
"SELECT sensorthings.add_users_to_policy(%s::text[], %s)",
(["alice", "bob", "charlie"], pname),
)

# Verify deduplication: final set should be {alice, bob, charlie}
cur.execute(
"""
SELECT roles FROM pg_policies
WHERE schemaname = 'sensorthings'
AND policyname LIKE %s
ORDER BY tablename
""",
(f"{pname}%",),
)
final_roles = [row[0] for row in cur.fetchall()]
for roles in final_roles:
assert sorted(roles) == ["alice", "bob", "charlie"], (
f"Expected deduped ['alice','bob','charlie'], got {roles}"
)

# ------------------------------------------------------------------
# 10. Btree indexes on commit_id
# ------------------------------------------------------------------
Expand Down