-
-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathdatabase.py
More file actions
142 lines (128 loc) · 6.47 KB
/
database.py
File metadata and controls
142 lines (128 loc) · 6.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import sqlite3
import logging
from contextlib import contextmanager
from typing import Dict, List, Optional
from dataclasses import dataclass
@dataclass
class JiraTicket:
    """Data class representing a JIRA ticket.

    The first five fields are required. The remaining three default to
    ``None`` and are therefore annotated as ``Optional[str]``.
    """
    # Primary key of the ticket row in the ``jira_tickets`` table.
    ticket_id: str
    summary: str
    description: str
    # Stored in a BOOLEAN column of the ``jira_tickets`` table.
    has_subtasks: bool
    status: str
    issue_type: Optional[str] = None
    # Presumably the identifier of the matching item in Things -- confirm
    # against the code that performs the sync.
    things_id: Optional[str] = None
    # Filled from the ``last_updated`` column when a ticket is read back
    # by DatabaseManager.
    last_updated: Optional[str] = None
class DatabaseManager:
    """Manages SQLite database operations for JIRA tickets.

    Every method opens its own connection through ``get_connection`` and
    closes it before returning; no connection is kept on the instance.
    """

    # Columns read when building a JiraTicket, listed in the dataclass's
    # positional field order so a row can be unpacked straight into it.
    _TICKET_COLUMNS = (
        "ticket_id, summary, description, has_subtasks, status, "
        "issue_type, things_id, last_updated"
    )

    def __init__(self, db_path: str, jira_base_url: str):
        """Store the configuration and make sure the schema exists.

        Args:
            db_path: Path handed to ``sqlite3.connect``.
            jira_base_url: Base URL of the JIRA instance; any trailing
                slashes are stripped before it is stored.
        """
        self.db_path = db_path
        self.jira_base_url = jira_base_url.rstrip('/')  # Remove trailing slash if present
        logging.debug(f"Initializing database at {db_path}")
        self._init_db()

    @contextmanager
    def get_connection(self):
        """Context manager for database connections with automatic cleanup.

        Yields a fresh ``sqlite3`` connection and always closes it on exit.
        Nothing is committed here: callers must call ``conn.commit()``
        themselves for their writes to persist.
        """
        logging.debug("Opening database connection")
        conn = sqlite3.connect(self.db_path)
        try:
            yield conn
        finally:
            conn.close()
            logging.debug("Closed database connection")

    def _init_db(self) -> None:
        """Initialize the database schema with proper constraints.

        Creates the ``jira_tickets`` table if it does not exist yet; an
        existing table is left untouched.
        """
        logging.info(f"Initializing database at path: {self.db_path}")
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS jira_tickets (
                    ticket_id TEXT PRIMARY KEY,
                    summary TEXT NOT NULL,
                    description TEXT,
                    has_subtasks BOOLEAN NOT NULL,
                    status TEXT,
                    issue_type TEXT,
                    things_id TEXT,
                    added_to_db TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    synced_to_things TEXT DEFAULT 'not synced' CHECK(synced_to_things IN ('synced', 'not synced', 'unknown')),
                    last_updated TIMESTAMP
                )
            ''')
            conn.commit()
        logging.info("Database initialization complete")

    @staticmethod
    def _row_to_ticket(row) -> "JiraTicket":
        """Build a JiraTicket from a row selected with ``_TICKET_COLUMNS``."""
        ticket_id, summary, description, has_subtasks, *optional_fields = row
        # SQLite hands the BOOLEAN column back as an integer (0/1); convert
        # it so the value matches the dataclass's ``bool`` annotation.
        return JiraTicket(ticket_id, summary, description,
                          bool(has_subtasks), *optional_fields)

    def save_ticket(self, ticket: "JiraTicket") -> None:
        """Save or update a ticket in the database.

        Only updates timestamps and sync status when actual changes are detected.

        * Unknown ``ticket_id``: the ticket is inserted as ``'not synced'``.
        * Known ``ticket_id`` with identical summary, description,
          has_subtasks, status and issue_type: nothing is written.
        * Known ``ticket_id`` with a difference in any of those fields: the
          row is updated, ``last_updated`` is refreshed and the ticket is
          marked ``'not synced'``. The stored ``things_id`` is kept unless
          ``ticket.things_id`` provides a new one.
        """
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute('''
                SELECT summary, description, has_subtasks, status, issue_type, things_id
                FROM jira_tickets WHERE ticket_id = ?
            ''', (ticket.ticket_id,))
            row = cursor.fetchone()

            if row is None:
                # New ticket - insert with current timestamps
                logging.debug(f"Inserting new ticket {ticket.ticket_id}")
                cursor.execute('''
                    INSERT INTO jira_tickets
                    (ticket_id, summary, description, has_subtasks, status, issue_type,
                    things_id, synced_to_things, added_to_db, last_updated)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
                ''', (ticket.ticket_id, ticket.summary, ticket.description, ticket.has_subtasks,
                      ticket.status, ticket.issue_type, ticket.things_id, 'not synced'))
                conn.commit()
                return

            # Ticket exists - check for content changes
            (existing_summary, existing_description, existing_has_subtasks,
             existing_status, existing_issue_type, existing_things_id) = row

            # has_subtasks is part of the comparison because the UPDATE below
            # writes it; leaving it out would keep a stale flag in the table
            # whenever it is the only field that changed. Both sides go
            # through bool() because SQLite stores the flag as 0/1.
            has_changes = (
                existing_summary != ticket.summary
                or existing_description != ticket.description
                or bool(existing_has_subtasks) != bool(ticket.has_subtasks)
                or existing_status != ticket.status
                or existing_issue_type != ticket.issue_type
            )
            if not has_changes:
                # No changes detected - exit early without DB writes
                logging.debug(f"No changes detected for ticket {ticket.ticket_id}, preserving sync status")
                return

            # Changes detected - update ticket and mark as unsynced
            logging.info(f"Changes detected for ticket {ticket.ticket_id}, marking as not synced")
            # Fall back to the stored things_id when the incoming ticket has none.
            things_id = ticket.things_id or existing_things_id
            cursor.execute('''
                UPDATE jira_tickets
                SET summary = ?, description = ?, has_subtasks = ?, status = ?,
                    issue_type = ?, things_id = ?, synced_to_things = ?, last_updated = CURRENT_TIMESTAMP
                WHERE ticket_id = ?
            ''', (ticket.summary, ticket.description, ticket.has_subtasks, ticket.status,
                  ticket.issue_type, things_id, 'not synced', ticket.ticket_id))
            conn.commit()

    def get_all_tickets(self) -> List["JiraTicket"]:
        """Retrieve all tickets from the database."""
        with self.get_connection() as conn:
            cursor = conn.cursor()
            logging.debug("Retrieving all tickets from database")
            cursor.execute(f'SELECT {self._TICKET_COLUMNS} FROM jira_tickets')
            return [self._row_to_ticket(row) for row in cursor.fetchall()]

    def get_unsynced_tickets(self) -> List["JiraTicket"]:
        """Get tickets that haven't been synced to Things (status = 'not synced')."""
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                f"SELECT {self._TICKET_COLUMNS} FROM jira_tickets "
                "WHERE synced_to_things = 'not synced'"
            )
            return [self._row_to_ticket(row) for row in cursor.fetchall()]

    def get_ticket_by_id(self, ticket_id: str) -> Optional["JiraTicket"]:
        """Retrieve a specific ticket by its ID. Returns None if not found."""
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                f'SELECT {self._TICKET_COLUMNS} FROM jira_tickets WHERE ticket_id = ?',
                (ticket_id,),
            )
            row = cursor.fetchone()
            return self._row_to_ticket(row) if row else None