Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "sinonym"
version = "0.2.5"
version = "0.2.7"
description = "Chinese Name Detection and Normalization Module"
readme = "README.md"
requires-python = ">=3.10"
Expand All @@ -27,6 +27,7 @@ keywords = ["chinese", "names", "nlp", "romanization", "pinyin"]
dependencies = [
"joblib>=1.3",
"numpy>=1.24",
"pydantic>=1.10,<2",
"pypinyin>=0.44.0",
"requests>=2.31",
"scikit-learn>=1.2,<2",
Expand Down
Empty file added sinonym/timo/__init__.py
Empty file.
12 changes: 12 additions & 0 deletions sinonym/timo/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
config_version: 0.0.1
model_variants:
  sinonym_v1:
    instance: sinonym.timo.interface.Instance
    prediction: sinonym.timo.interface.Prediction
    predictor: sinonym.timo.interface.Predictor
    predictor_config: sinonym.timo.interface.PredictorConfig
    artifacts_s3_path: null
    python_version: "3.10"
    cuda: False
    integration_test: sinonym.timo.integration_test.TestIntegration
    docker_run_commands: []
70 changes: 70 additions & 0 deletions sinonym/timo/integration_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import unittest

from sinonym.timo.interface import Instance, Prediction, Predictor, PredictorConfig


class TestIntegration(unittest.TestCase):
    """Integration checks for the timo ``Predictor`` wrapper.

    A single ``Predictor`` is built once for the class and shared by all tests.
    Notes inside the test methods are written as comments (not docstrings) so
    the names reported by the unittest runner stay unchanged.
    """

    @classmethod
    def setUpClass(cls):
        # Default config; "." is passed as the artifacts directory.
        cls.predictor = Predictor(config=PredictorConfig(), artifacts_dir=".")

    def test_chinese_name(self):
        # A single recognizable name yields one successful, fully split Prediction.
        predictions = self.predictor.predict_batch([Instance(name="Li Wei")])
        self.assertEqual(len(predictions), 1)
        first = predictions[0]
        self.assertIsInstance(first, Prediction)
        self.assertTrue(first.success)
        self.assertIsNotNone(first.given_name)
        self.assertIsNotNone(first.surname)

    def test_non_chinese_name(self):
        # A rejected name still produces one row, flagged as failed with a reason.
        predictions = self.predictor.predict_batch([Instance(name="John Smith")])
        self.assertEqual(len(predictions), 1)
        first = predictions[0]
        self.assertFalse(first.success)
        self.assertIsNotNone(first.error_message)

    def test_batch_superset_output(self):
        batch = [Instance(name=raw) for raw in ("Li Wei", "Wang Weiming")]
        predictions = self.predictor.predict_batch(batch)
        self.assertEqual(len(predictions), 2)
        for row in predictions:
            self.assertIsInstance(row, Prediction)
            self.assertTrue(row.success)
            # superset: every Prediction carries confidence + format_pattern
            self.assertIsNotNone(row.confidence)
            self.assertIsNotNone(row.format_pattern)
        # shared batch pattern replicated onto each row
        first_pattern = predictions[0].format_pattern
        second_pattern = predictions[1].format_pattern
        self.assertEqual(
            first_pattern.dominant_format,
            second_pattern.dominant_format,
        )

    def test_predict_batch_empty(self):
        # An empty request must return an empty list.
        outcome = self.predictor.predict_batch([])
        self.assertEqual(outcome, [])

    def test_score_name_batch(self):
        raw_names = ["Li Wei", "Wang Weiming"]
        summary = self.predictor.score_name_batch(raw_names)
        self.assertEqual(summary.names, raw_names)
        self.assertEqual(len(summary.results), 2)
        self.assertEqual(len(summary.confidences), 2)
        self.assertIsNotNone(summary.format_pattern.dominant_format)

    def test_score_name_batch_tuned_threshold(self):
        raw_names = ["Li Wei", "Wang Weiming"]
        summary = self.predictor.score_name_batch(raw_names, format_threshold=0.9)
        self.assertEqual(len(summary.results), 2)
        # threshold_met has to agree with the threshold that was passed in.
        pattern = summary.format_pattern
        self.assertEqual(
            pattern.threshold_met,
            pattern.confidence >= 0.9,
        )

    def test_detect_batch_format(self):
        raw_names = ["Zhang Wei", "Li Ming", "Wang Xiaoli"]
        pattern = self.predictor.detect_batch_format(raw_names)
        allowed_formats = {"surname_first", "given_first", "mixed"}
        self.assertIn(pattern.dominant_format, allowed_formats)

    def test_analyze_name_batch_full(self):
        # The full result carries one parsed row and one analysis per input name.
        full_result = self.predictor.analyze_name_batch(["Li Wei", "Wang Weiming"])
        self.assertEqual(len(full_result.results), 2)
        self.assertEqual(len(full_result.individual_analyses), 2)
244 changes: 244 additions & 0 deletions sinonym/timo/interface.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
from typing import List, Optional

from pydantic import BaseModel, BaseSettings, Field

from sinonym.detector import ChineseNameDetector


# Input model for one name; listed as the `instance` type in the timo config.
# Predictor.predict_batch reads only the `name` field of each Instance.
class Instance(BaseModel):
    name: str = Field(description="Name string to detect/normalize as Chinese")


# Serializable copy of the detector's batch pattern object; every field is
# filled one-to-one by Predictor._to_format_pattern.
class FormatPattern(BaseModel):
    """Batch-level order detection (surname-first vs given-first)."""

    # String value of the detector's dominant-format enum (taken via `.value`).
    dominant_format: str = Field(description="surname_first | given_first | mixed")
    confidence: float = Field(description="dominant_count / total_count")
    # Tallies copied unchanged from the detector's pattern object
    # (presumably names per order and names considered -- confirm in sinonym).
    surname_first_count: int
    given_first_count: int
    total_count: int
    threshold_met: bool = Field(description="confidence >= format_threshold")


# Per-name output model; listed as the `prediction` type in the timo config.
# Predictor._to_prediction fills success, error_message and the name parts;
# confidence and format_pattern stay None unless Predictor.predict_batch
# assigns them afterwards.
class Prediction(BaseModel):
    success: bool = Field(description="Whether the name was recognized as Chinese")
    error_message: Optional[str] = Field(default=None, description="Reason for failure")
    # Name parts are None when the detector result carries no parsed name.
    given_name: Optional[str] = Field(default=None)
    surname: Optional[str] = Field(default=None)
    # A falsy middle name on the parsed result is also stored as None.
    middle_name: Optional[str] = Field(default=None)
    confidence: Optional[float] = Field(
        default=None, description="per-name confidence (softmax over candidate scores)"
    )
    format_pattern: Optional[FormatPattern] = Field(
        default=None, description="shared batch order pattern (same on every row)"
    )


# One candidate parse of a name, converted by Predictor._to_candidate.
class Candidate(BaseModel):
    # Token sequences copied into fresh lists from the detector's candidate.
    surname_tokens: List[str]
    given_tokens: List[str]
    score: float
    # String value of the candidate's format enum (taken via `.value`).
    format: str = Field(description="surname_first | given_first | mixed")
    # Passed through unchanged from the detector's candidate; may be None.
    original_compound_format: Optional[str] = None


# Converted by Predictor._to_individual_analysis from one detector analysis.
class IndividualAnalysis(BaseModel):
    """Per-name analysis, pre batch-override."""

    raw_name: str
    candidates: List[Candidate]
    # None when the detector's analysis has no best candidate.
    best_candidate: Optional[Candidate] = None
    confidence: float = Field(description="softmax over candidate scores for best candidate")


# Return type of Predictor.analyze_name_batch, built by
# Predictor._to_batch_prediction.
class BatchPrediction(BaseModel):
    """Full result of analyze_name_batch."""

    names: List[str]
    # Built without confidence/format_pattern; those Prediction fields stay None here.
    results: List[Prediction]
    format_pattern: FormatPattern
    individual_analyses: List[IndividualAnalysis]
    improvements: List[int] = Field(description="indices of names changed by batch context")


# Return type of Predictor.score_name_batch, built by
# Predictor._to_batch_summary.
class BatchSummary(BaseModel):
    """Trimmed analyze_name_batch result: drops candidates, keeps per-name confidence only."""

    names: List[str]
    # Built without confidence/format_pattern; those Prediction fields stay None here.
    results: List[Prediction]
    format_pattern: FormatPattern
    # One value per entry of the detector's individual_analyses, in that order.
    confidences: List[float] = Field(
        description="per-name confidence from individual_analyses, aligned with results"
    )


# Settings type listed as `predictor_config` in the timo config. It declares
# no fields; Predictor stores the instance it receives but never reads it.
class PredictorConfig(BaseSettings):
    pass


class Predictor:
    """timo-facing wrapper around sinonym's ``ChineseNameDetector``.

    ``predict_batch`` is the timo entrypoint. The remaining public methods
    delegate to the detector method of the same name (``score_name_batch``
    delegates to ``analyze_name_batch``) and convert the detector's result
    objects into the pydantic models declared in this module.
    """

    _config: PredictorConfig
    _artifacts_dir: str

    def __init__(self, config: PredictorConfig, artifacts_dir: str):
        """Keep the timo-supplied arguments and build the detector.

        Args:
            config: Predictor settings; stored, not read elsewhere in this class.
            artifacts_dir: Artifact directory; stored, not read elsewhere in
                this class.
        """
        self._config = config
        self._artifacts_dir = artifacts_dir
        self._detector = ChineseNameDetector()

    # ---- detector object -> pydantic model --------------------------------

    def _to_prediction(self, parse_result) -> Prediction:
        """Convert one detector parse result into a ``Prediction``.

        The name parts are ``None`` when the result has no parsed name, and a
        falsy middle name is reported as ``None`` as well.
        """
        parsed = parse_result.parsed
        if parsed:
            given_name = parsed.given_name
            surname = parsed.surname
            middle_name = parsed.middle_name or None
        else:
            given_name = surname = middle_name = None
        return Prediction(
            success=parse_result.success,
            error_message=parse_result.error_message,
            given_name=given_name,
            surname=surname,
            middle_name=middle_name,
        )

    def _to_format_pattern(self, pattern) -> FormatPattern:
        """Convert the detector's batch pattern into a ``FormatPattern``."""
        # The dominant format is an enum on the detector side; expose its value.
        dominant = pattern.dominant_format.value
        return FormatPattern(
            dominant_format=dominant,
            confidence=pattern.confidence,
            surname_first_count=pattern.surname_first_count,
            given_first_count=pattern.given_first_count,
            total_count=pattern.total_count,
            threshold_met=pattern.threshold_met,
        )

    def _to_candidate(self, candidate) -> Candidate:
        """Convert one detector candidate into a ``Candidate``."""
        surname_tokens = list(candidate.surname_tokens)
        given_tokens = list(candidate.given_tokens)
        return Candidate(
            surname_tokens=surname_tokens,
            given_tokens=given_tokens,
            score=candidate.score,
            format=candidate.format.value,
            original_compound_format=candidate.original_compound_format,
        )

    def _to_individual_analysis(self, analysis) -> IndividualAnalysis:
        """Convert one per-name detector analysis, candidates included."""
        best = analysis.best_candidate
        converted_best = None if best is None else self._to_candidate(best)
        return IndividualAnalysis(
            raw_name=analysis.raw_name,
            candidates=list(map(self._to_candidate, analysis.candidates)),
            best_candidate=converted_best,
            confidence=analysis.confidence,
        )

    def _to_batch_prediction(self, batch_result) -> BatchPrediction:
        """Convert a full detector batch result into a ``BatchPrediction``."""
        converted_results = list(map(self._to_prediction, batch_result.results))
        converted_analyses = list(
            map(self._to_individual_analysis, batch_result.individual_analyses)
        )
        return BatchPrediction(
            names=list(batch_result.names),
            results=converted_results,
            format_pattern=self._to_format_pattern(batch_result.format_pattern),
            individual_analyses=converted_analyses,
            improvements=list(batch_result.improvements),
        )

    def _to_batch_summary(self, batch_result) -> BatchSummary:
        """Convert a detector batch result into the trimmed ``BatchSummary``."""
        converted_results = list(map(self._to_prediction, batch_result.results))
        per_name_confidence = [
            analysis.confidence for analysis in batch_result.individual_analyses
        ]
        return BatchSummary(
            names=list(batch_result.names),
            results=converted_results,
            format_pattern=self._to_format_pattern(batch_result.format_pattern),
            confidences=per_name_confidence,
        )

    # ---- timo entrypoint ---------------------------------------------------

    def predict_batch(self, instances: List[Instance]) -> List[Prediction]:
        """Analyze all instances as one batch (timo HTTP entrypoint).

        The names go through a single ``analyze_name_batch`` call with the
        detector's default tuning. Each returned ``Prediction`` carries the
        name parts, the per-name ``confidence`` and the batch-level
        ``format_pattern``; the same pattern object is attached to every row.

        Returns:
            One ``Prediction`` per (parse result, analysis) pair of the batch
            result, or an empty list when ``instances`` is empty.
        """
        if not instances:
            return []

        batch_result = self._detector.analyze_name_batch(
            [instance.name for instance in instances]
        )
        shared_pattern = self._to_format_pattern(batch_result.format_pattern)

        def _with_batch_context(parse_result, analysis) -> Prediction:
            # Attach the batch-level extras to a freshly converted row.
            row = self._to_prediction(parse_result)
            row.confidence = analysis.confidence
            row.format_pattern = shared_pattern
            return row

        return [
            _with_batch_context(parse_result, analysis)
            for parse_result, analysis in zip(
                batch_result.results, batch_result.individual_analyses
            )
        ]

    # ---- detector functions exposed directly --------------------------------

    @staticmethod
    def _batch_kwargs(format_threshold, minimum_batch_size=...) -> dict:
        """Build keyword arguments holding only the tuning values that were set.

        ``format_threshold`` is included unless it is ``None``.
        ``minimum_batch_size`` is included unless it is ``None`` or was left
        at its ``...`` default. Omitted values fall back to sinonym's own
        defaults.
        """
        forwarded = {}
        if format_threshold is not None:
            forwarded["format_threshold"] = format_threshold
        if not (minimum_batch_size is ... or minimum_batch_size is None):
            forwarded["minimum_batch_size"] = minimum_batch_size
        return forwarded

    def analyze_name_batch(
        self,
        names: List[str],
        format_threshold: Optional[float] = None,
        minimum_batch_size: Optional[int] = None,
    ) -> BatchPrediction:
        """Run the detector's ``analyze_name_batch`` and return the full result."""
        tuning = self._batch_kwargs(format_threshold, minimum_batch_size)
        batch_result = self._detector.analyze_name_batch(names, **tuning)
        return self._to_batch_prediction(batch_result)

    def process_name_batch(
        self,
        names: List[str],
        format_threshold: Optional[float] = None,
        minimum_batch_size: Optional[int] = None,
    ) -> List[Prediction]:
        """Run the detector's ``process_name_batch``; one ``Prediction`` per result."""
        tuning = self._batch_kwargs(format_threshold, minimum_batch_size)
        parse_results = self._detector.process_name_batch(names, **tuning)
        return list(map(self._to_prediction, parse_results))

    def detect_batch_format(
        self,
        names: List[str],
        format_threshold: Optional[float] = None,
    ) -> FormatPattern:
        """Run the detector's ``detect_batch_format`` and convert its pattern."""
        # No minimum_batch_size is forwarded here, only the optional threshold.
        tuning = self._batch_kwargs(format_threshold)
        detected = self._detector.detect_batch_format(names, **tuning)
        return self._to_format_pattern(detected)

    def process_name_batch_multiprocess(
        self,
        names: List[str],
        max_workers: Optional[int] = None,
        chunk_size: int = 64,
    ) -> List[Prediction]:
        """Run the detector's ``process_name_batch_multiprocess``.

        ``max_workers`` and ``chunk_size`` are always passed through as given,
        including their defaults here (``None`` and ``64``).
        """
        parse_results = self._detector.process_name_batch_multiprocess(
            names, max_workers=max_workers, chunk_size=chunk_size
        )
        return list(map(self._to_prediction, parse_results))

    def score_name_batch(
        self,
        names: List[str],
        format_threshold: Optional[float] = None,
        minimum_batch_size: Optional[int] = None,
    ) -> BatchSummary:
        """Run ``analyze_name_batch`` and keep only the summary fields.

        The summary holds names, results, the format pattern and the per-name
        confidences; candidate details are not included.
        """
        tuning = self._batch_kwargs(format_threshold, minimum_batch_size)
        batch_result = self._detector.analyze_name_batch(names, **tuning)
        return self._to_batch_summary(batch_result)
Loading
Loading