Skip to content

Classifier

mailtag.classifier.Classifier

Classifies emails using a multi-signal strategy, prioritizing server-side labels, then historical data, semantic routing, and finally an AI model.

Source code in src/mailtag/classifier.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
class Classifier:
    """
    Classifies emails using a multi-signal strategy, prioritizing server-side
    labels, then historical data, semantic routing, and finally an AI model.
    """

    def __init__(self, config: AppConfig, database: ClassificationDatabase):
        """Set up classification state, thread-safety locks, and the category source.

        Args:
            config: Application configuration (general, classifier, mlx sections).
            database: Store of per-sender classification history and suggestions.
        """
        import threading

        self.config = config
        self.database = database
        self.proposal_file = Path("proposals.log")

        # In-memory cache of AI classifications, keyed by a sender+subject hash.
        self.ai_cache = {}

        # Reentrant lock: MLX initialization may be re-entered on the same thread.
        self._mlx_lock = threading.RLock()
        # Plain lock guarding all reads/writes of ai_cache.
        self._cache_lock = threading.Lock()

        # MLX components are created lazily on first use.
        self._embedder: MLXEmbedder | None = None
        self._semantic_router: SemanticRouter | None = None
        self._mlx_llm: MLXLLM | None = None
        self._mlx_initialized = False

        # The static portion of the LLM prompt is built once and memoized.
        self._llm_prompt_prefix: str | None = None

        # Proposal entries are buffered here and written by flush_proposals().
        self._proposal_buffer: list[str] = []

        # Categories come either from the live IMAP folder tree or from the
        # static YAML schema, depending on configuration.
        if config.general.use_imap_folders_for_classification:
            self.folder_analyzer = FolderAnalyzer()
            self.categories = self.folder_analyzer.get_all_categories()
            logger.info(f"Using dynamic IMAP folder structure with {len(self.categories)} categories")
        else:
            self.folder_analyzer = None
            self.categories = self._load_categories_from_schema()
            logger.info(f"Using static classification schema with {len(self.categories)} categories")

    def _init_mlx_components(self) -> bool:
        """Lazy initialize MLX components when first needed (thread-safe).

        Guarded by a reentrant lock so concurrent (and re-entrant) callers
        initialize at most once; subsequent calls only report whether a
        semantic router is available.

        Returns:
            True if initialization successful, False otherwise
        """
        with self._mlx_lock:
            if self._mlx_initialized:
                # Already attempted: availability is defined by whether the
                # semantic router was successfully created on that attempt.
                return self._semantic_router is not None

            # Mark as attempted up front so a failure below is not retried
            # on every subsequent call.
            self._mlx_initialized = True

            if not self.config.mlx.enabled:
                logger.debug("MLX classification is disabled in config")
                return False

            try:
                # Imported lazily: these modules pull in MLX, which is only
                # installable on Apple Silicon.
                from .mlx_provider import MLXLLM, MLXEmbedder
                from .semantic_router import SemanticRouter

                # Initialize embedder
                logger.info(f"Initializing MLX embedder with model: {self.config.mlx.embedding_model}")
                self._embedder = MLXEmbedder(self.config.mlx.embedding_model)

                # Initialize semantic router
                self._semantic_router = SemanticRouter(
                    self._embedder,
                    score_threshold=self.config.mlx.score_threshold,
                )

                # Try to load pre-computed embeddings; without them the router
                # exists but has zero categories and will never match.
                embeddings_path = Path(self.config.mlx.embeddings_file)
                if embeddings_path.exists():
                    if self._semantic_router.load_embeddings(embeddings_path):
                        num_cats = self._semantic_router.num_categories
                        logger.info(f"Loaded {num_cats} category embeddings from {embeddings_path}")
                    else:
                        logger.warning(f"Failed to load embeddings from {embeddings_path}")
                else:
                    logger.warning(
                        f"No embeddings file found at {embeddings_path}. "
                        "Run 'python scripts/build_category_embeddings.py' to generate."
                    )

                # Initialize LLM for fallback (Signal 6)
                logger.info(f"Initializing MLX LLM with model: {self.config.mlx.llm_model}")
                self._mlx_llm = MLXLLM(
                    model_name=self.config.mlx.llm_model,
                    max_tokens=self.config.mlx.llm_max_tokens,
                    temperature=self.config.mlx.llm_temperature,
                )

                logger.info("MLX components initialized successfully")
                return True

            except ImportError as e:
                # Expected on non-Apple platforms; caller falls back to litellm.
                logger.warning(f"MLX dependencies not available: {e}")
                return False
            except (OSError, RuntimeError, ValueError) as e:
                logger.error(f"Failed to initialize MLX components: {e}")
                return False
    def _load_categories_from_schema(self) -> list[str]:
        """Load classification categories from the YAML file (legacy method).

        Reads data/classification_schema.yml and flattens it into a list of
        category names, expanding each sublabel to "Parent/Sublabel".

        Returns:
            List of category names; empty if the file is missing or empty.
        """
        schema_path = Path("data/classification_schema.yml")
        if not schema_path.exists():
            logger.error(f"Classification schema not found at {schema_path}")
            return []
        with schema_path.open("r", encoding="utf-8") as f:
            schema = yaml.safe_load(f)

        # safe_load returns None for an empty document; iterating it would
        # raise TypeError, so bail out explicitly.
        if not schema:
            logger.error(f"Classification schema at {schema_path} is empty")
            return []

        categories = []
        for category in schema:
            if category.get("sublabels"):
                for sublabel in category["sublabels"]:
                    categories.append(f"{category['name']}/{sublabel['name']}")
            else:
                categories.append(category["name"])
        return categories

    def _get_category_from_validated_db(self, email: Email) -> str | None:
        """
        Signal 1: Check for a classification from the validated database.
        """
        return self.database.get_dominant_classification(email.sender_address)

    def _get_category_from_labels(self, email: Email) -> str | None:
        """
        Signal 2: Check for an existing server-side label that matches a known category.
        """
        for label in email.labels:
            if label in self.categories:
                logger.debug(f"Found matching server-side label: {label}")
                return label
        return None

    def _get_category_from_history(self, email: Email) -> str | None:
        """
        Signal 3: Check for a high-confidence classification from the sender's history in the suggestion DB.
        """
        sender_classifications = self.database.get_sender_classifications(email.sender_address)
        if not sender_classifications:
            return None

        total_count = sum(sender_classifications.values())
        if total_count < self.config.classifier.min_count:
            return None

        most_common_category = max(sender_classifications, key=sender_classifications.get)
        confidence = sender_classifications[most_common_category] / total_count

        if confidence >= self.config.classifier.historical_confidence_threshold:
            logger.info(
                f"Found high-confidence historical category for {email.sender_address}: "
                f"{most_common_category} ({confidence:.2f} confidence)."
            )
            return most_common_category
        return None

    def _get_category_from_domain(self, email: Email) -> str | None:
        """
        Signal 4: Check for domain-based classification.
        Skip non-commercial domains (gmail.com, yahoo.com, etc.)
        """
        if not email.sender_address:
            return None

        domain = extract_domain(email.sender_address)
        if not domain:
            return None

        # Skip non-commercial domains (personal email providers)
        if is_non_commercial_domain_cached(domain):
            logger.debug(f"Skipping non-commercial domain: {domain}")
            return None

        # Look up domain classification
        category = self.database.get_category_by_domain(domain)
        if category:
            logger.info(
                f"Found domain-based classification for {email.sender_address}: {category} (domain: {domain})"
            )
            return category

        logger.debug(f"No domain classification found for: {domain}")
        return None

    def _get_category_from_semantic_router(self, email: Email) -> tuple[str | None, float]:
        """
        Signal 5: Use semantic router for embedding-based classification.

        Returns:
            Tuple of (category, confidence_score) or (None, 0.0) if no match
        """
        if not self._init_mlx_components():
            return None, 0.0

        if self._semantic_router is None or self._semantic_router.num_categories == 0:
            logger.debug("Semantic router not available or no categories loaded")
            return None, 0.0

        # Build query text from email
        sender = email.sender_name or email.sender_address or "Unknown"
        query_text = f"Email from {sender}: {email.subject}"

        # Add truncated body if available
        if email.body:
            truncated_body = self._truncate_body(email.body, max_chars=500)
            if truncated_body:
                query_text += f"\n{truncated_body}"

        # Route using semantic similarity
        category, score = self._semantic_router.route(query_text)

        if category:
            # Validate category exists in our known categories
            if category in self.categories:
                logger.debug(f"Semantic router matched '{category}' with score {score:.3f}")
                return category, score
            else:
                logger.debug(f"Semantic router suggested unknown category '{category}', ignoring")

        return None, score

    def _get_cache_key(self, email: Email) -> str:
        """Generate a cache key for AI responses based on sender and subject."""
        sender = email.sender_address or "Unknown"
        subject = email.subject or "No Subject"
        # Create a hash of sender + subject for caching
        cache_input = f"{sender.lower()}:{subject.lower()}"
        return hashlib.md5(cache_input.encode(), usedforsecurity=False).hexdigest()

    def _truncate_body(self, body: str, max_chars: int = 1500) -> str:
        """Intelligently truncate email body to preserve important content.

        Uses smart_truncate to:
        - Remove quoted replies and signatures
        - Extract key paragraphs and sentences
        - Preserve high-signal keywords

        Default increased from 500 to 1500 chars for better context.
        """
        if not body:
            return ""
        return smart_truncate(body, max_chars=max_chars)

    def _parse_ai_json_response(self, raw_response: str) -> tuple[str, float, str]:
        """
        Parse JSON response from AI model.

        Returns:
            Tuple of (category, confidence, reason)
        """
        import json
        import re

        # Extract JSON from response (handles markdown code blocks)
        json_match = re.search(r"\{[^}]+\}", raw_response, re.DOTALL)
        if not json_match:
            # Fallback: try to parse the entire response
            json_match = re.search(r"\{.*\}", raw_response, re.DOTALL)

        if json_match:
            try:
                result = json.loads(json_match.group(0))
                category = result.get("category", "").strip()
                confidence = float(result.get("confidence", 0.0))
                reason = result.get("reason", "")

                # Validate confidence range
                if not 0.0 <= confidence <= 1.0:
                    logger.warning(f"AI confidence {confidence} out of range [0,1], clamping")
                    confidence = max(0.0, min(1.0, confidence))

                return category, confidence, reason
            except (json.JSONDecodeError, ValueError, KeyError) as e:
                logger.debug(f"JSON parse error: {e}, falling back to legacy parsing")
                return "", 0.0, ""

        return "", 0.0, ""

    def _parse_legacy_ai_response(self, raw_response: str) -> str:
        """
        Fallback parser for non-JSON responses (legacy format).

        Returns:
            Category string or empty string if invalid
        """
        category = raw_response.strip()

        # Handle legacy UNCERTAIN format
        if category.startswith("UNCERTAIN"):
            return ""

        # Handle folder-based classification responses
        if self.config.general.use_imap_folders_for_classification and self.folder_analyzer:
            if category.startswith("Parent/NewSub"):
                return ""

        return category

    def _build_llm_prompt_prefix(self) -> str:
        """Build and cache the static portion of the LLM prompt (category list + instructions).

        This avoids regenerating ~600-900 tokens of static text per email.
        """
        if self._llm_prompt_prefix is not None:
            return self._llm_prompt_prefix

        json_instructions = (
            "IMPORTANT: Réponds en format JSON structuré:\n"
            "{\n"
            '  "category": "NomExactCategorie",\n'
            '  "confidence": 0.95,\n'
            '  "reason": "brève explication (optionnel)"\n'
            "}\n\n"
            "- confidence: score entre 0.0 et 1.0 (0.0 = incertain, 1.0 = très confiant)\n"
            "- reason: pourquoi cette catégorie (1 phrase courte, optionnel)\n\n"
            "N'invente PAS de nouvelles catégories qui ne sont pas dans la liste."
        )

        if self.folder_analyzer:
            all_folders = self.folder_analyzer.get_all_categories()
            parent_folders = self.folder_analyzer.get_parent_folders()
            category_list = "\n".join([f"- {cat}" for cat in sorted(all_folders)])

            self._llm_prompt_prefix = (
                "Classe dans une des catégories suivantes (la plus spécifique possible):\n"
                f"{category_list}\n\n"
                "Si aucune catégorie n'existe, propose un sous-dossier: 'Parent/NewSub'\n"
                f"Parents valides: {', '.join(sorted(parent_folders))}\n\n"
                "- category: nom exact ou 'Parent/NewSub' pour nouveau sous-dossier\n"
                f"{json_instructions}"
            )
        else:
            category_list = "\n".join([f"- {cat}" for cat in self.categories])

            self._llm_prompt_prefix = (
                "Classe dans une catégorie de la liste suivante:\n"
                f"{category_list}\n\n"
                "- category: nom exact de la liste ci-dessus\n"
                f"{json_instructions}"
            )

        return self._llm_prompt_prefix

    def _get_category_from_ai(self, email: Email) -> str:
        """
        Signal 6: Fallback to AI classification with confidence scoring.
        Uses MLX LLM when available (Apple Silicon), otherwise falls back to litellm
        (Ollama, Gemini, OpenRouter, etc.) for Docker/cloud deployments.

        Returns:
            A validated category name; "À Classer" when the model is uncertain,
            below the confidence threshold, or proposes an unknown/new category
            (the proposal is buffered via _log_proposal); or "(Model Error)"
            when the MLX LLM call itself fails.
        """
        sender = (
            f"{email.sender_name} <{email.sender_address}>"
            if email.sender_name
            else email.sender_address or "Unknown"
        )

        # Try MLX first; if disabled/unavailable, fall back to litellm
        if not self._init_mlx_components() or self._mlx_llm is None:
            logger.debug(f"MLX unavailable, using litellm for {sender}")
            return self._get_category_from_litellm(email, sender)

        # Check cache first (thread-safe)
        cache_key = self._get_cache_key(email)
        with self._cache_lock:
            if cache_key in self.ai_cache:
                logger.debug(f"Using cached AI response for {email.sender_address}")
                return self.ai_cache[cache_key]

        # Truncate email body for better performance
        truncated_body = self._truncate_body(email.body)

        # Build prompt: per-email header + cached static suffix
        prompt = (
            f"Sujet: {email.subject}\n"
            f"De: {sender}\n"
            f"Corps: {truncated_body}\n\n"
            f"{self._build_llm_prompt_prefix()}"
        )

        try:
            # Use MLX LLM classify method
            category, confidence, reason = self._mlx_llm.classify(prompt)

            if category:
                logger.debug(
                    f"MLX LLM classification: category='{category}', "
                    f"confidence={confidence:.2f}, reason='{reason}'"
                )

                # Check against confidence threshold; low-confidence answers
                # are logged as proposals rather than applied.
                confidence_threshold = self.config.mlx.llm_confidence
                if confidence < confidence_threshold:
                    logger.info(
                        f"MLX LLM confidence {confidence:.2f} below threshold "
                        f"{confidence_threshold:.2f}, routing to 'À Classer'"
                    )
                    self._log_proposal(email, f"{category} (confidence: {confidence:.2f}, reason: {reason})")
                    return "À Classer"

                # Validate category exists or is a new folder proposal
                if self.config.general.use_imap_folders_for_classification and self.folder_analyzer:
                    # Handle new subfolder suggestions ("Parent/NewName"):
                    # valid parents are recorded as proposals, never applied.
                    if "/" in category and category not in self.categories:
                        parts = category.split("/", 1)
                        if len(parts) == 2:
                            parent, _subfolder = parts
                            if self.folder_analyzer.is_valid_parent_folder(parent):
                                logger.debug(f"Valid new folder proposal: '{category}'")
                                self._log_proposal(
                                    email, f"{category} (confidence: {confidence:.2f}, reason: {reason})"
                                )
                                return "À Classer"

                    # Check if category exists
                    if category not in self.categories:
                        logger.warning(f"MLX LLM suggested invalid category: '{category}'")
                        self._log_proposal(
                            email, f"{category} (confidence: {confidence:.2f}, reason: {reason})"
                        )
                        return "À Classer"
                else:
                    # Static schema mode: only exact known categories are valid.
                    if category not in self.categories:
                        logger.warning(f"MLX LLM suggested invalid category: '{category}'")
                        self._log_proposal(
                            email, f"{category} (confidence: {confidence:.2f}, reason: {reason})"
                        )
                        return "À Classer"

                # Cache and return valid classification (thread-safe)
                with self._cache_lock:
                    self.ai_cache[cache_key] = category
                return category

            else:
                # Empty category means parsing failed or uncertain
                logger.debug("MLX LLM returned empty category")
                self._log_proposal(email, f"(empty response, confidence: {confidence:.2f})")
                return "À Classer"

        except (RuntimeError, ValueError, KeyError, AttributeError) as e:
            logger.error(f"Error calling MLX LLM: {e}")
            return "(Model Error)"

    def _get_category_from_litellm(self, email: Email, sender: str) -> str:
        """Fallback AI classification via litellm (Ollama, Gemini, OpenRouter, etc.).

        Used when MLX is unavailable (Docker, Linux, or mlx.enabled=false).

        Args:
            email: The email to classify.
            sender: Preformatted sender string ("Name <addr>" or address).

        Returns:
            A validated category name; "À Classer" for uncertain/invalid
            answers (proposal buffered); "(Model Error)" on any LLM failure.
        """
        import json as json_mod

        # Check cache first (thread-safe)
        cache_key = self._get_cache_key(email)
        with self._cache_lock:
            if cache_key in self.ai_cache:
                logger.debug(f"Using cached AI response for {email.sender_address}")
                return self.ai_cache[cache_key]

        truncated_body = self._truncate_body(email.body)
        prompt_prefix = self._build_llm_prompt_prefix()

        user_content = f"Sujet: {email.subject}\nDe: {sender}\nCorps: {truncated_body}\n\n{prompt_prefix}"

        try:
            import litellm

            model = self.config.general.ollama_model
            api_base = self.config.general.api_base or None

            response = litellm.completion(
                model=model,
                api_base=api_base,
                messages=[{"role": "user", "content": user_content}],
                temperature=self.config.mlx.llm_temperature,
                max_tokens=self.config.mlx.llm_max_tokens,
                num_retries=2,
            )

            raw = response.choices[0].message.content.strip()

            # Strip markdown fences if present (e.g. ```json ... ```)
            if raw.startswith("```"):
                raw = raw.split("\n", 1)[-1].rsplit("```", 1)[0].strip()

            result = json_mod.loads(raw)
            category = result.get("category", "").strip()
            confidence = float(result.get("confidence", 0.0))
            reason = result.get("reason", "")

        except ImportError:
            logger.error("litellm not installed — cannot classify without MLX")
            return "(Model Error)"
        # NOTE(review): deliberately broad — litellm raises many provider- and
        # network-specific exception types; all are treated as a model error.
        except Exception as e:
            logger.error(f"litellm classification error: {e}")
            return "(Model Error)"

        if not category:
            logger.debug("litellm returned empty category")
            self._log_proposal(email, f"(empty response, confidence: {confidence:.2f})")
            return "À Classer"

        logger.debug(f"litellm classification: category='{category}', confidence={confidence:.2f}")

        # Check confidence threshold (shared with the MLX path)
        confidence_threshold = self.config.mlx.llm_confidence
        if confidence < confidence_threshold:
            logger.info(
                f"litellm confidence {confidence:.2f} below threshold "
                f"{confidence_threshold:.2f}, routing to 'À Classer'"
            )
            self._log_proposal(email, f"{category} (confidence: {confidence:.2f}, reason: {reason})")
            return "À Classer"

        # Validate category exists
        if category not in self.categories:
            logger.warning(f"litellm suggested invalid category: '{category}'")
            self._log_proposal(email, f"{category} (confidence: {confidence:.2f}, reason: {reason})")
            return "À Classer"

        # Cache and return (thread-safe)
        with self._cache_lock:
            self.ai_cache[cache_key] = category
        return category

    def classify_email(self, email: Email) -> str:
        """
        Classifies an email using the Adaptive Multi-Signal Classification (AMSC) strategy.
        Tracks classification metrics for each signal.

        Signals are tried in order of decreasing trust and increasing cost:
        validated DB, server labels, sender history, domain rules, semantic
        router, and finally the LLM. Any hit from signals 2, 4-6 also feeds
        the suggestion database for future history-based hits.

        Returns:
            The chosen category; may be "À Classer" (uncertain) or
            "(Model Error)" when only the LLM signal ran and it failed.
        """
        start_time = time.perf_counter()

        # Signal 1: Validated Database
        category = self._get_category_from_validated_db(email)
        if category:
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            logger.info(f"Classified via Validated DB: {category}")
            METRICS.classification_metrics.record_classification(
                email_id=email.msg_id,
                signal="validated_db",
                category=category,
                confidence=1.0,  # Validated = 100% confidence
                processing_time_ms=elapsed_ms,
            )
            return category

        # Signal 2: Server-Side Label
        category = self._get_category_from_labels(email)
        if category:
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            logger.info(f"Classified via Server Label: {category}")
            self.database.update_suggestion(email.sender_address, category)
            METRICS.classification_metrics.record_classification(
                email_id=email.msg_id,
                signal="server_labels",
                category=category,
                confidence=0.95,  # High confidence from user's existing organization
                processing_time_ms=elapsed_ms,
            )
            return category

        # Signal 3: Historical Suggestion Database
        category = self._get_category_from_history(email)
        if category:
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            logger.info(f"Classified via History: {category}")

            # Calculate actual confidence from historical data
            # (re-queries the DB; _get_category_from_history already did, but
            # it only returns the category, not its ratio)
            sender_classifications = self.database.get_sender_classifications(email.sender_address)
            total_count = sum(sender_classifications.values())
            confidence = sender_classifications.get(category, 0) / total_count if total_count > 0 else 0.0

            METRICS.classification_metrics.record_classification(
                email_id=email.msg_id,
                signal="historical_db",
                category=category,
                confidence=confidence,
                processing_time_ms=elapsed_ms,
            )
            return category

        # Signal 4: Domain-based Classification
        category = self._get_category_from_domain(email)
        if category:
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            logger.info(f"Classified via Domain: {category}")
            self.database.update_suggestion(email.sender_address, category)
            METRICS.classification_metrics.record_classification(
                email_id=email.msg_id,
                signal="domain_db",
                category=category,
                confidence=0.90,  # Domain rules are high confidence
                processing_time_ms=elapsed_ms,
            )
            return category

        # Signal 5: Semantic Router (Embedding-based classification)
        category, semantic_score = self._get_category_from_semantic_router(email)
        if category:
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            logger.info(f"Classified via Semantic Router: {category} (score: {semantic_score:.3f})")
            self.database.update_suggestion(email.sender_address, category)
            METRICS.classification_metrics.record_classification(
                email_id=email.msg_id,
                signal="semantic_router",
                category=category,
                confidence=semantic_score,
                processing_time_ms=elapsed_ms,
            )
            return category

        # Signal 6: MLX LLM (Fallback)
        logger.debug("No high-confidence signals found, falling back to MLX LLM.")
        category = self._get_category_from_ai(email)
        total_elapsed_ms = (time.perf_counter() - start_time) * 1000

        logger.info(f"Classified via MLX LLM: {category}")

        # Sentinel answers are recorded as errors, not classifications.
        if category not in ["À Classer", "(Model Error)"]:
            self.database.update_suggestion(email.sender_address, category)
            METRICS.classification_metrics.record_classification(
                email_id=email.msg_id,
                signal="mlx_llm",
                category=category,
                confidence=None,  # Confidence already tracked in _get_category_from_ai
                processing_time_ms=total_elapsed_ms,
            )
        elif category == "(Model Error)":
            METRICS.classification_metrics.record_error("mlx_llm_error", email.sender_address)
        else:
            # "À Classer" - low confidence or uncertain
            METRICS.classification_metrics.record_error("mlx_llm_uncertain", email.sender_address)

        return category

    def _log_proposal(self, email: Email, proposal: str):
        """Buffer a classification proposal for later flushing."""
        sender = (
            f"{email.sender_name} <{email.sender_address}>" if email.sender_name else email.sender_address
        )
        entry = (
            "=" * 80 + "\n"
            f"From: {sender}\n"
            f"Subject: {email.subject}\n"
            f"Proposed Category: {proposal}\n"
            f"Body:\n{email.body}\n\n"
        )
        self._proposal_buffer.append(entry)

    def flush_proposals(self) -> None:
        """Write buffered proposals to disk and clear the buffer."""
        if not self._proposal_buffer:
            return
        with self.proposal_file.open("a", encoding="utf-8") as f:
            f.writelines(self._proposal_buffer)
        self._proposal_buffer.clear()

    def classify_emails_batch(self, emails: list[Email]) -> list[str]:
        """Classify a batch of emails, using batch embeddings for Signal 5.

        Emails that pass Signals 1-4 are classified individually.
        Remaining emails get batch-encoded for Signal 5 (semantic router).
        Emails still unclassified fall through to Signal 6 (LLM) individually.

        Side effects: records per-email classification metrics, updates the
        sender-suggestion database for signals 2, 4, 5 and 6, and flushes any
        buffered proposals once the whole batch is done.

        Args:
            emails: List of Email objects to classify

        Returns:
            List of category strings, one per input email (same order as input)
        """
        # One slot per input email; None marks "not classified yet".
        results: list[str | None] = [None] * len(emails)
        pending_indices: list[int] = []

        # Signals 1-4: fast lookups (no batching needed)
        for i, email_obj in enumerate(emails):
            start_time = time.perf_counter()

            # Signal 1: validated sender database (confidence 1.0).
            category = self._get_category_from_validated_db(email_obj)
            if category:
                elapsed_ms = (time.perf_counter() - start_time) * 1000
                METRICS.classification_metrics.record_classification(
                    email_id=email_obj.msg_id,
                    signal="validated_db",
                    category=category,
                    confidence=1.0,
                    processing_time_ms=elapsed_ms,
                )
                results[i] = category
                continue

            # Signal 2: server-side labels; also feeds the suggestion DB.
            category = self._get_category_from_labels(email_obj)
            if category:
                elapsed_ms = (time.perf_counter() - start_time) * 1000
                self.database.update_suggestion(email_obj.sender_address, category)
                METRICS.classification_metrics.record_classification(
                    email_id=email_obj.msg_id,
                    signal="server_labels",
                    category=category,
                    confidence=0.95,
                    processing_time_ms=elapsed_ms,
                )
                results[i] = category
                continue

            # Signal 3: sender history; confidence is the share of this sender's
            # past classifications that match the chosen category.
            category = self._get_category_from_history(email_obj)
            if category:
                elapsed_ms = (time.perf_counter() - start_time) * 1000
                sender_cls = self.database.get_sender_classifications(email_obj.sender_address)
                total = sum(sender_cls.values())
                conf = sender_cls.get(category, 0) / total if total > 0 else 0.0
                METRICS.classification_metrics.record_classification(
                    email_id=email_obj.msg_id,
                    signal="historical_db",
                    category=category,
                    confidence=conf,
                    processing_time_ms=elapsed_ms,
                )
                results[i] = category
                continue

            # Signal 4: domain rules (confidence 0.90); also feeds the suggestion DB.
            category = self._get_category_from_domain(email_obj)
            if category:
                elapsed_ms = (time.perf_counter() - start_time) * 1000
                self.database.update_suggestion(email_obj.sender_address, category)
                METRICS.classification_metrics.record_classification(
                    email_id=email_obj.msg_id,
                    signal="domain_db",
                    category=category,
                    confidence=0.90,
                    processing_time_ms=elapsed_ms,
                )
                results[i] = category
                continue

            pending_indices.append(i)

        # Signal 5: batch semantic routing
        # Lazy MLX init; the router is usable only once it holds >= 1 category.
        has_router = (
            self._init_mlx_components() and self._semantic_router and self._semantic_router.num_categories > 0
        )
        if pending_indices and has_router:
            # Build one query text per pending email: sender + subject, plus
            # up to 500 chars of body when available.
            query_texts = []
            for i in pending_indices:
                e = emails[i]
                sender_text = e.sender_name or e.sender_address or "Unknown"
                qt = f"Email from {sender_text}: {e.subject}"
                if e.body:
                    tb = self._truncate_body(e.body, max_chars=500)
                    if tb:
                        qt += f"\n{tb}"
                query_texts.append(qt)

            batch_start = time.perf_counter()
            batch_results = self._semantic_router.route_batch(query_texts)
            batch_elapsed_ms = (time.perf_counter() - batch_start) * 1000
            still_pending: list[int] = []

            # strict=True: route_batch must return exactly one result per query.
            for idx, (cat, score) in zip(pending_indices, batch_results, strict=True):
                if cat and cat in self.categories:
                    self.database.update_suggestion(emails[idx].sender_address, cat)
                    METRICS.classification_metrics.record_classification(
                        email_id=emails[idx].msg_id,
                        signal="semantic_router",
                        category=cat,
                        confidence=score,
                        # Prorate batch embedding time evenly across the batch;
                        # pending_indices is non-empty in this branch.
                        processing_time_ms=batch_elapsed_ms / len(pending_indices),
                    )
                    results[idx] = cat
                else:
                    still_pending.append(idx)

            pending_indices = still_pending

        # Signal 6: LLM fallback (sequential)
        for i in pending_indices:
            category = self._get_category_from_ai(emails[i])
            # Don't store uncertain/error outcomes as sender suggestions.
            if category not in ["À Classer", "(Model Error)"]:
                self.database.update_suggestion(emails[i].sender_address, category)
            results[i] = category

        # Persist any proposals buffered by the AI path during this batch.
        self.flush_proposals()

        # Ensure no None results
        return [r if r is not None else "À Classer" for r in results]

    def export_metrics(self, output_dir: Path = Path("data/metrics")) -> Path:
        """Write current classification metrics to a timestamped JSON file.

        Args:
            output_dir: Directory to save the metrics file; created on demand
                (including parents).

        Returns:
            Path to the exported metrics file.
        """
        from datetime import datetime

        output_dir.mkdir(parents=True, exist_ok=True)
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        target = output_dir / f"classification_metrics_{stamp}.json"

        METRICS.classification_metrics.export_to_json(target)
        logger.info(f"Exported classification metrics to {target}")

        return target

    def log_metrics_summary(self, log_level: str = "INFO"):
        """Emit a formatted summary of classification metrics through the logger.

        Args:
            log_level: Log level to use for output
        """
        summary_metrics = METRICS.classification_metrics
        summary_metrics.log_summary(log_level)

__init__(config, database)

Source code in src/mailtag/classifier.py
def __init__(self, config: AppConfig, database: ClassificationDatabase):
    """Initialize caches, locks, lazily-loaded MLX slots, and the category schema.

    Args:
        config: Application configuration. ``general.use_imap_folders_for_classification``
            selects between the dynamic IMAP folder structure and the static schema
            as the source of categories.
        database: Persistent store for sender classification history and suggestions.
    """
    import threading

    self.config = config
    self.proposal_file = Path("proposals.log")
    self.database = database
    self.ai_cache = {}  # Simple in-memory cache for AI responses

    # Thread safety locks
    self._mlx_lock = threading.RLock()  # Reentrant lock for MLX initialization
    self._cache_lock = threading.Lock()  # Lock for ai_cache access

    # MLX components (lazy loaded)
    self._embedder: MLXEmbedder | None = None
    self._semantic_router: SemanticRouter | None = None
    self._mlx_llm: MLXLLM | None = None
    self._mlx_initialized = False

    # Cached prompt prefix (static category list portion of LLM prompt)
    self._llm_prompt_prefix: str | None = None

    # Buffered proposal writes
    self._proposal_buffer: list[str] = []

    # Use either folder analyzer or static schema based on configuration
    if config.general.use_imap_folders_for_classification:
        self.folder_analyzer = FolderAnalyzer()
        self.categories = self.folder_analyzer.get_all_categories()
        logger.info(f"Using dynamic IMAP folder structure with {len(self.categories)} categories")
    else:
        self.folder_analyzer = None
        self.categories = self._load_categories_from_schema()
        logger.info(f"Using static classification schema with {len(self.categories)} categories")

classify_email(email)

Classifies an email using the Adaptive Multi-Signal Classification (AMSC) strategy. Tracks classification metrics for each signal.

Source code in src/mailtag/classifier.py
def classify_email(self, email: Email) -> str:
    """
    Classifies an email using the Adaptive Multi-Signal Classification (AMSC) strategy.
    Tracks classification metrics for each signal.

    Signals are tried in decreasing order of confidence: validated DB (1.0)
    -> server labels (0.95) -> sender history -> domain rules (0.90)
    -> semantic router -> MLX LLM fallback. The first signal that yields a
    category wins.

    Returns:
        The chosen category name; "À Classer" when the LLM fallback is
        uncertain, or "(Model Error)" when the LLM call failed.
    """
    start_time = time.perf_counter()

    # Signal 1: Validated Database
    category = self._get_category_from_validated_db(email)
    if category:
        elapsed_ms = (time.perf_counter() - start_time) * 1000
        logger.info(f"Classified via Validated DB: {category}")
        METRICS.classification_metrics.record_classification(
            email_id=email.msg_id,
            signal="validated_db",
            category=category,
            confidence=1.0,  # Validated = 100% confidence
            processing_time_ms=elapsed_ms,
        )
        return category

    # Signal 2: Server-Side Label
    category = self._get_category_from_labels(email)
    if category:
        elapsed_ms = (time.perf_counter() - start_time) * 1000
        logger.info(f"Classified via Server Label: {category}")
        self.database.update_suggestion(email.sender_address, category)
        METRICS.classification_metrics.record_classification(
            email_id=email.msg_id,
            signal="server_labels",
            category=category,
            confidence=0.95,  # High confidence from user's existing organization
            processing_time_ms=elapsed_ms,
        )
        return category

    # Signal 3: Historical Suggestion Database
    category = self._get_category_from_history(email)
    if category:
        elapsed_ms = (time.perf_counter() - start_time) * 1000
        logger.info(f"Classified via History: {category}")

        # Calculate actual confidence from historical data
        # (share of this sender's past classifications matching the category)
        sender_classifications = self.database.get_sender_classifications(email.sender_address)
        total_count = sum(sender_classifications.values())
        confidence = sender_classifications.get(category, 0) / total_count if total_count > 0 else 0.0

        METRICS.classification_metrics.record_classification(
            email_id=email.msg_id,
            signal="historical_db",
            category=category,
            confidence=confidence,
            processing_time_ms=elapsed_ms,
        )
        return category

    # Signal 4: Domain-based Classification
    category = self._get_category_from_domain(email)
    if category:
        elapsed_ms = (time.perf_counter() - start_time) * 1000
        logger.info(f"Classified via Domain: {category}")
        self.database.update_suggestion(email.sender_address, category)
        METRICS.classification_metrics.record_classification(
            email_id=email.msg_id,
            signal="domain_db",
            category=category,
            confidence=0.90,  # Domain rules are high confidence
            processing_time_ms=elapsed_ms,
        )
        return category

    # Signal 5: Semantic Router (Embedding-based classification)
    category, semantic_score = self._get_category_from_semantic_router(email)
    if category:
        elapsed_ms = (time.perf_counter() - start_time) * 1000
        logger.info(f"Classified via Semantic Router: {category} (score: {semantic_score:.3f})")
        self.database.update_suggestion(email.sender_address, category)
        METRICS.classification_metrics.record_classification(
            email_id=email.msg_id,
            signal="semantic_router",
            category=category,
            confidence=semantic_score,
            processing_time_ms=elapsed_ms,
        )
        return category

    # Signal 6: MLX LLM (Fallback)
    logger.debug("No high-confidence signals found, falling back to MLX LLM.")
    category = self._get_category_from_ai(email)
    total_elapsed_ms = (time.perf_counter() - start_time) * 1000

    logger.info(f"Classified via MLX LLM: {category}")

    # Only confident LLM answers feed the suggestion DB and metrics;
    # uncertain/error outcomes are recorded as errors instead.
    if category not in ["À Classer", "(Model Error)"]:
        self.database.update_suggestion(email.sender_address, category)
        METRICS.classification_metrics.record_classification(
            email_id=email.msg_id,
            signal="mlx_llm",
            category=category,
            confidence=None,  # Confidence already tracked in _get_category_from_ai
            processing_time_ms=total_elapsed_ms,
        )
    elif category == "(Model Error)":
        METRICS.classification_metrics.record_error("mlx_llm_error", email.sender_address)
    else:
        # "À Classer" - low confidence or uncertain
        METRICS.classification_metrics.record_error("mlx_llm_uncertain", email.sender_address)

    return category

classify_emails_batch(emails)

Classify a batch of emails, using batch embeddings for Signal 5.

Emails that pass Signals 1-4 are classified individually. Remaining emails get batch-encoded for Signal 5 (semantic router). Emails still unclassified fall through to Signal 6 (LLM) individually.

Parameters:

emails (list[Email], required): List of Email objects to classify.

Returns:

list[str]: List of category strings, one per input email.

Source code in src/mailtag/classifier.py
def classify_emails_batch(self, emails: list[Email]) -> list[str]:
    """Classify a batch of emails, using batch embeddings for Signal 5.

    Emails that pass Signals 1-4 are classified individually.
    Remaining emails get batch-encoded for Signal 5 (semantic router).
    Emails still unclassified fall through to Signal 6 (LLM) individually.

    Side effects: records per-email classification metrics, updates the
    sender-suggestion database for signals 2, 4, 5 and 6, and flushes any
    buffered proposals once the whole batch is done.

    Args:
        emails: List of Email objects to classify

    Returns:
        List of category strings, one per input email (same order as input)
    """
    # One slot per input email; None marks "not classified yet".
    results: list[str | None] = [None] * len(emails)
    pending_indices: list[int] = []

    # Signals 1-4: fast lookups (no batching needed)
    for i, email_obj in enumerate(emails):
        start_time = time.perf_counter()

        # Signal 1: validated sender database (confidence 1.0).
        category = self._get_category_from_validated_db(email_obj)
        if category:
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            METRICS.classification_metrics.record_classification(
                email_id=email_obj.msg_id,
                signal="validated_db",
                category=category,
                confidence=1.0,
                processing_time_ms=elapsed_ms,
            )
            results[i] = category
            continue

        # Signal 2: server-side labels; also feeds the suggestion DB.
        category = self._get_category_from_labels(email_obj)
        if category:
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            self.database.update_suggestion(email_obj.sender_address, category)
            METRICS.classification_metrics.record_classification(
                email_id=email_obj.msg_id,
                signal="server_labels",
                category=category,
                confidence=0.95,
                processing_time_ms=elapsed_ms,
            )
            results[i] = category
            continue

        # Signal 3: sender history; confidence is the share of this sender's
        # past classifications that match the chosen category.
        category = self._get_category_from_history(email_obj)
        if category:
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            sender_cls = self.database.get_sender_classifications(email_obj.sender_address)
            total = sum(sender_cls.values())
            conf = sender_cls.get(category, 0) / total if total > 0 else 0.0
            METRICS.classification_metrics.record_classification(
                email_id=email_obj.msg_id,
                signal="historical_db",
                category=category,
                confidence=conf,
                processing_time_ms=elapsed_ms,
            )
            results[i] = category
            continue

        # Signal 4: domain rules (confidence 0.90); also feeds the suggestion DB.
        category = self._get_category_from_domain(email_obj)
        if category:
            elapsed_ms = (time.perf_counter() - start_time) * 1000
            self.database.update_suggestion(email_obj.sender_address, category)
            METRICS.classification_metrics.record_classification(
                email_id=email_obj.msg_id,
                signal="domain_db",
                category=category,
                confidence=0.90,
                processing_time_ms=elapsed_ms,
            )
            results[i] = category
            continue

        pending_indices.append(i)

    # Signal 5: batch semantic routing
    # Lazy MLX init; the router is usable only once it holds >= 1 category.
    has_router = (
        self._init_mlx_components() and self._semantic_router and self._semantic_router.num_categories > 0
    )
    if pending_indices and has_router:
        # Build one query text per pending email: sender + subject, plus
        # up to 500 chars of body when available.
        query_texts = []
        for i in pending_indices:
            e = emails[i]
            sender_text = e.sender_name or e.sender_address or "Unknown"
            qt = f"Email from {sender_text}: {e.subject}"
            if e.body:
                tb = self._truncate_body(e.body, max_chars=500)
                if tb:
                    qt += f"\n{tb}"
            query_texts.append(qt)

        batch_start = time.perf_counter()
        batch_results = self._semantic_router.route_batch(query_texts)
        batch_elapsed_ms = (time.perf_counter() - batch_start) * 1000
        still_pending: list[int] = []

        # strict=True: route_batch must return exactly one result per query.
        for idx, (cat, score) in zip(pending_indices, batch_results, strict=True):
            if cat and cat in self.categories:
                self.database.update_suggestion(emails[idx].sender_address, cat)
                METRICS.classification_metrics.record_classification(
                    email_id=emails[idx].msg_id,
                    signal="semantic_router",
                    category=cat,
                    confidence=score,
                    # Prorate batch embedding time evenly across the batch;
                    # pending_indices is non-empty in this branch.
                    processing_time_ms=batch_elapsed_ms / len(pending_indices),
                )
                results[idx] = cat
            else:
                still_pending.append(idx)

        pending_indices = still_pending

    # Signal 6: LLM fallback (sequential)
    for i in pending_indices:
        category = self._get_category_from_ai(emails[i])
        # Don't store uncertain/error outcomes as sender suggestions.
        if category not in ["À Classer", "(Model Error)"]:
            self.database.update_suggestion(emails[i].sender_address, category)
        results[i] = category

    # Persist any proposals buffered by the AI path during this batch.
    self.flush_proposals()

    # Ensure no None results
    return [r if r is not None else "À Classer" for r in results]

flush_proposals()

Write buffered proposals to disk and clear the buffer.

Source code in src/mailtag/classifier.py
def flush_proposals(self) -> None:
    """Persist buffered proposal entries to the proposal file, then empty the buffer.

    Does nothing when the buffer is empty, so the file is never touched
    (or created) unnecessarily.
    """
    if self._proposal_buffer:
        with self.proposal_file.open("a", encoding="utf-8") as fh:
            fh.write("".join(self._proposal_buffer))
        self._proposal_buffer.clear()

export_metrics(output_dir=Path('data/metrics'))

Export classification metrics to JSON file.

Parameters:

output_dir (Path, default Path('data/metrics')): Directory to save metrics file.

Returns:

Path: Path to exported metrics file.

Source code in src/mailtag/classifier.py
def export_metrics(self, output_dir: Path = Path("data/metrics")) -> Path:
    """Write current classification metrics to a timestamped JSON file.

    Args:
        output_dir: Directory to save the metrics file; created on demand
            (including parents).

    Returns:
        Path to the exported metrics file.
    """
    from datetime import datetime

    output_dir.mkdir(parents=True, exist_ok=True)
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    target = output_dir / f"classification_metrics_{stamp}.json"

    METRICS.classification_metrics.export_to_json(target)
    logger.info(f"Exported classification metrics to {target}")

    return target

log_metrics_summary(log_level='INFO')

Log a formatted summary of classification metrics.

Parameters:

log_level (str, default 'INFO'): Log level to use for output.
Source code in src/mailtag/classifier.py
def log_metrics_summary(self, log_level: str = "INFO"):
    """Emit a formatted summary of classification metrics through the logger.

    Args:
        log_level: Log level to use for output
    """
    summary_metrics = METRICS.classification_metrics
    summary_metrics.log_summary(log_level)