Optimization Analysis: analysis_orchestrator.py
Date: 2025-11-27
File: src/ast_grep_mcp/features/deduplication/analysis_orchestrator.py
Lines: 334
Purpose: Orchestrates deduplication candidate analysis workflow
Executive Summary
Analysis identified 15 optimization opportunities across 4 categories:
- Performance: 5 critical improvements (parallel execution, caching, batching)
- Code Quality: 4 refactoring opportunities (complexity reduction, DRY violations)
- Architecture: 3 design improvements (separation of concerns, configuration)
- Error Handling: 3 robustness enhancements (edge cases, validation, resilience)
Priority Recommendations:
- HIGH: Implement batch test coverage detection (60-80% performance gain)
- HIGH: Add instance-level caching for components (avoid repeated instantiation)
- MEDIUM: Extract parallel execution to reusable utility
- MEDIUM: Refactor long methods (_enrich_and_summarize: 56 lines)
1. Performance Bottlenecks
1.1 Sequential Test Coverage Detection in coverage.py
Severity: CRITICAL
Location: coverage.py:340-372 (called from analysis_orchestrator.py:229-274)
Impact: O(n) sequential file I/O operations, glob searches, and regex matching
Current Implementation:
```python
# coverage.py:356-362
for file_path in file_paths:
    has_coverage = self.has_test_coverage(file_path, language, project_root)
    coverage_map[file_path] = has_coverage
```
Problem:
- `get_test_coverage_for_files()` iterates sequentially over all files
- Each call to `has_test_coverage()` performs:
  - File existence checks (lines 309-315)
  - Glob searches across the entire project (lines 318-335)
  - File reads with regex matching (lines 224-228, 234-285)
- For 100 candidates with 5 files each, this means 500 sequential I/O operations
Recommendation:
```python
# Add to coverage.py
import glob
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, List

def get_test_coverage_for_files_batch(
    self,
    file_paths: List[str],
    language: str,
    project_root: str,
    max_workers: int = 4,
) -> Dict[str, bool]:
    """Parallel batch test coverage detection."""
    coverage_map: Dict[str, bool] = {}

    # Pre-compute test file patterns once
    patterns = self.find_test_file_patterns(language)
    test_files = set()
    for pattern in patterns:
        test_files.update(glob.glob(os.path.join(project_root, pattern), recursive=True))

    # Parallel processing
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {
            executor.submit(
                self._has_test_coverage_optimized,
                fp, language, project_root, test_files
            ): fp
            for fp in file_paths
        }
        for future in as_completed(futures):
            file_path = futures[future]
            coverage_map[file_path] = future.result()

    return coverage_map
```
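The batch method above relies on a per-file helper that this report does not show; a minimal sketch of what `_has_test_coverage_optimized` could look like follows (hypothetical, assuming a test file is named after the source module it covers):

```python
# Hypothetical helper assumed by get_test_coverage_for_files_batch (not part of
# the current coverage.py). It reuses the pre-computed test_files set instead of
# re-globbing the project for every source file.
def _has_test_coverage_optimized(
    self,
    file_path: str,
    language: str,
    project_root: str,
    test_files: set,
) -> bool:
    """Check one source file against a pre-computed set of test files."""
    stem = os.path.splitext(os.path.basename(file_path))[0]
    for test_file in test_files:
        # Assumed naming convention, e.g. test_<stem>.py or <stem>_test.py.
        if stem in os.path.basename(test_file):
            return True
    return False
```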
Expected Gain: 60-80% reduction in test coverage detection time
1.2 Repeated Component Instantiation
Severity: HIGH
Location: analysis_orchestrator.py:19-25
Impact: Unnecessary object creation overhead
Current Implementation:
```python
def __init__(self):
    self.detector = DuplicationDetector()
    self.ranker = DuplicationRanker()
    self.coverage_detector = TestCoverageDetector()
    self.recommendation_engine = RecommendationEngine()
```
Problem:
- Each `DeduplicationAnalysisOrchestrator` instance creates new component instances
- Components are stateless/lightweight but still incur instantiation overhead
- No caching or singleton pattern for reusable components
Recommendation:
```python
# Option 1: Module-level singleton cache
_component_cache = {}

def _get_component(component_type: str):
    """Get or create a cached component instance."""
    if component_type not in _component_cache:
        if component_type == "detector":
            _component_cache[component_type] = DuplicationDetector()
        elif component_type == "ranker":
            _component_cache[component_type] = DuplicationRanker()
        # ... etc
    return _component_cache[component_type]

# Option 2: Lazy initialization
@property
def detector(self):
    if not hasattr(self, '_detector'):
        self._detector = DuplicationDetector()
    return self._detector
```
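A lighter variant of Option 2 is the standard-library `functools.cached_property` (Python 3.8+), which memoizes the attribute on first access; a minimal sketch, assuming the components stay stateless:

```python
from functools import cached_property

class DeduplicationAnalysisOrchestrator:
    @cached_property
    def detector(self) -> "DuplicationDetector":
        # Instantiated once per orchestrator instance, on first access.
        return DuplicationDetector()
```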
Expected Gain: 10-15% reduction in orchestrator initialization time
1.3 Duplicate ThreadPoolExecutor Pattern
Severity: MEDIUM
Location: analysis_orchestrator.py:229-274, 276-315
Impact: Code duplication, maintenance burden
Current Implementation:
```python
# Lines 246-265: Test coverage parallel execution
if parallel and len(candidates) > 1:
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {...}
        for future in as_completed(futures):
            try:
                future.result()
            except Exception as e:
                self.logger.error(...)

# Lines 289-306: Recommendations parallel execution (DUPLICATE PATTERN)
if parallel and len(candidates) > 1:
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {...}
        for future in as_completed(futures):
            try:
                future.result()
            except Exception as e:
                self.logger.error(...)
```
Problem:
- Exact same parallel execution pattern duplicated twice
- Violates DRY principle
- Error handling logic duplicated
Recommendation:
```python
def _parallel_enrich(
    self,
    candidates: List[Dict[str, Any]],
    enrich_func: Callable,
    operation_name: str,
    parallel: bool = True,
    max_workers: int = 4,
    **kwargs
) -> None:
    """Generic parallel enrichment helper."""
    if parallel and len(candidates) > 1:
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {
                executor.submit(enrich_func, candidate, **kwargs): candidate
                for candidate in candidates
            }
            for future in as_completed(futures):
                try:
                    future.result()
                except Exception as e:
                    self.logger.error(
                        f"{operation_name}_enrichment_failed",
                        error=str(e)
                    )
    else:
        for candidate in candidates:
            enrich_func(candidate, **kwargs)

    self.logger.debug(
        f"{operation_name}_added",
        candidate_count=len(candidates),
        parallel=parallel
    )

# Usage:
def _add_test_coverage(self, candidates, language, project_path, ...):
    self._parallel_enrich(
        candidates,
        self._enrich_with_test_coverage,
        "test_coverage",
        language=language,
        project_path=project_path
    )
```
Expected Gain: Reduces duplication by 40 lines, improves maintainability
1.4 No Caching for Repeated Analysis
Severity: MEDIUM
Location: analysis_orchestrator.py:73-76
Impact: Repeated scoring calculations
Current Implementation:
```python
# Lines 73-76
ranked_candidates = self.ranker.rank_deduplication_candidates(
    duplication_results.get("duplicates", [])
)
```
Problem:
- `rank_deduplication_candidates()` recalculates scores every time
- No memoization for identical candidate groups
- Scoring involves multiple calculations (savings, complexity, risk, effort)
Recommendation:
```python
# In ranker.py
def _calculate_candidate_hash(self, candidate: Dict[str, Any]) -> int:
    """Generate a stable, hashable key for candidate caching.

    Note: lru_cache cannot be applied here directly because dicts are
    unhashable; instead, build a hashable tuple and cache at the list level.
    """
    key_parts = (
        candidate.get("similarity", 0),
        tuple(sorted(candidate.get("files", []))),
        candidate.get("lines_saved", 0),
    )
    return hash(key_parts)

def rank_deduplication_candidates(self, candidates, include_analysis=True):
    # Check cache before calculating
    cache_key = tuple(self._calculate_candidate_hash(c) for c in candidates)
    if cache_key in self._score_cache:
        return self._score_cache[cache_key]
    # ... existing logic ...
    self._score_cache[cache_key] = ranked
    return ranked
```
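The sketch assumes the ranker owns an instance-level cache; a minimal, hypothetical initialization (the report does not show the ranker's `__init__`):

```python
# Assumed addition to DuplicationRanker.__init__ (hypothetical):
def __init__(self) -> None:
    self._score_cache: dict = {}  # maps tuple of candidate hashes -> ranked list
```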
Expected Gain: 20-30% speedup for repeated analysis runs
1.5 No Early Exit on Max Candidates
Severity: LOW
Location: analysis_orchestrator.py:157-158
Impact: Unnecessary enrichment of candidates that won’t be returned
Current Implementation:
```python
# Lines 157-165
top_candidates = self._get_top_candidates(ranked_candidates, max_candidates)
if include_test_coverage:
    self._add_test_coverage(top_candidates, language, project_path)
self._add_recommendations(top_candidates)
```
Problem:
- Gets top N candidates AFTER ranking all
- Enriches only top N (correct)
- But ranking itself processes all candidates unnecessarily
Recommendation:
```python
# In ranker.py - add early exit parameter
def rank_deduplication_candidates(
    self,
    candidates: List[Dict[str, Any]],
    max_results: Optional[int] = None,
    include_analysis: bool = True
) -> List[Dict[str, Any]]:
    # ... scoring logic ...

    # Sort by score
    ranked.sort(key=lambda x: x["score"], reverse=True)

    # Early exit if max_results specified
    if max_results:
        ranked = ranked[:max_results]

    # Add rank numbers only to returned candidates
    for i, candidate in enumerate(ranked):
        candidate["rank"] = i + 1

    return ranked

# In orchestrator - pass max_candidates to ranker
ranked_candidates = self.ranker.rank_deduplication_candidates(
    duplication_results.get("duplicates", []),
    max_results=max_candidates  # Early exit at ranker level
)
```
Expected Gain: 5-10% reduction in ranking time for large candidate sets
2. Code Quality Issues
2.1 Long Methods
Severity: MEDIUM
Location: analysis_orchestrator.py:27-87, 133-188
Impact: Reduced readability, testability, maintainability
Findings:
- `analyze_candidates()`: 61 lines (target: <30)
- `_enrich_and_summarize()`: 56 lines (target: <30)
Current Implementation:
```python
# Lines 27-87: analyze_candidates
def analyze_candidates(self, project_path, language, min_similarity, ...):
    self.logger.info(...)
    # Step 1: Find duplicates
    duplication_results = self.detector.find_duplication(...)
    # Step 2: Rank candidates
    ranked_candidates = self.ranker.rank_deduplication_candidates(...)
    # Step 3-5: Enrich and summarize
    return self._enrich_and_summarize(...)
```
Recommendation:
```python
def analyze_candidates(self, project_path, language, **options):
    """Main entry point - orchestrates 5-step workflow."""
    config = self._build_analysis_config(project_path, language, options)

    # Step 1: Find duplicates
    duplication_results = self._find_duplicates(config)

    # Step 2: Rank candidates
    ranked_candidates = self._rank_candidates(duplication_results)

    # Step 3-5: Enrich and build results
    return self._build_analysis_results(ranked_candidates, config)

def _build_analysis_config(self, project_path, language, options):
    """Extract config building to separate method."""
    return {
        "project_path": project_path,
        "language": language,
        "min_similarity": options.get("min_similarity", 0.8),
        "include_test_coverage": options.get("include_test_coverage", True),
        # ...
    }

def _find_duplicates(self, config):
    """Step 1: Duplicate detection."""
    self.logger.info("analysis_start", **config)
    return self.detector.find_duplication(
        project_folder=config["project_path"],
        language=config["language"],
        min_similarity=config["min_similarity"],
        min_lines=config["min_lines"],
        exclude_patterns=config["exclude_patterns"]
    )

def _rank_candidates(self, duplication_results):
    """Step 2: Ranking."""
    return self.ranker.rank_deduplication_candidates(
        duplication_results.get("duplicates", [])
    )

def _build_analysis_results(self, ranked_candidates, config):
    """Steps 3-5: Enrichment and summary."""
    top_candidates = ranked_candidates[:config["max_candidates"]]
    if config["include_test_coverage"]:
        self._add_test_coverage(
            top_candidates,
            config["language"],
            config["project_path"]
        )
    self._add_recommendations(top_candidates)
    return self._build_result_dict(
        top_candidates,
        ranked_candidates,
        config
    )
```
Expected Gain: Improved testability (can unit test each step), better readability
2.2 Excessive Parameter Passing
Severity: MEDIUM
Location: Throughout file
Impact: Method signatures with 6-8 parameters
Examples:
```python
# Line 27: 8 parameters
def analyze_candidates(
    self, project_path, language, min_similarity,
    include_test_coverage, min_lines, max_candidates, exclude_patterns
)

# Line 133: 8 parameters
def _enrich_and_summarize(
    self, ranked_candidates, max_candidates, include_test_coverage,
    language, project_path, min_similarity, min_lines
)

# Line 105: 6 parameters
def _build_analysis_metadata(
    self, language, min_similarity, min_lines,
    include_test_coverage, project_path
)
```
Recommendation:
```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional

@dataclass
class AnalysisConfig:
    """Configuration for deduplication analysis."""
    project_path: str
    language: str
    min_similarity: float = 0.8
    include_test_coverage: bool = True
    min_lines: int = 5
    max_candidates: int = 100
    exclude_patterns: Optional[List[str]] = None
    parallel: bool = True
    max_workers: int = 4

class DeduplicationAnalysisOrchestrator:
    def analyze_candidates(self, config: AnalysisConfig) -> Dict[str, Any]:
        """Analyze with config object instead of 8 parameters."""
        # ... implementation ...

    def _enrich_and_summarize(
        self,
        ranked_candidates: List[Dict[str, Any]],
        config: AnalysisConfig
    ) -> Dict[str, Any]:
        """2 parameters instead of 8."""
        # ... implementation ...
```
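For illustration, a call site would then build the config once and pass it through; a usage sketch based on the dataclass above (paths and values are placeholders):

```python
# Hypothetical call site using the config object defined above.
config = AnalysisConfig(
    project_path="/path/to/project",
    language="python",
    min_similarity=0.85,
    max_candidates=25,
)
orchestrator = DeduplicationAnalysisOrchestrator()
results = orchestrator.analyze_candidates(config)
```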
Expected Gain: Cleaner method signatures, easier to extend configuration
2.3 Magic Numbers
Severity: LOW
Location: analysis_orchestrator.py:234, 279
Impact: Reduced maintainability
Current Implementation:
```python
# Line 234
def _add_test_coverage(self, ..., parallel: bool = True, max_workers: int = 4):

# Line 279
def _add_recommendations(self, ..., parallel: bool = True, max_workers: int = 4):
```
Problem:
- Hardcoded `max_workers=4` repeated
- No justification for the value
- Not configurable at class level
Recommendation:
```python
class DeduplicationAnalysisOrchestrator:
    DEFAULT_MAX_WORKERS = 4  # CPU-bound tasks, conservative thread count

    def __init__(self, max_workers: Optional[int] = None):
        self.max_workers = max_workers or self.DEFAULT_MAX_WORKERS
        # ... rest of init ...

    def _add_test_coverage(
        self,
        candidates,
        language,
        project_path,
        parallel: bool = True,
        max_workers: Optional[int] = None
    ):
        workers = max_workers or self.max_workers
        # ... use workers instead of hardcoded 4 ...
```
Expected Gain: Better configurability, clearer intent
2.4 Inconsistent Naming
Severity: LOW
Location: analysis_orchestrator.py:157, 180
Impact: Confusion in variable naming
Current Implementation:
```python
# Line 157
top_candidates = self._get_top_candidates(ranked_candidates, max_candidates)

# Lines 179-180
return {
    "total_groups": len(ranked_candidates),    # All candidates
    "total_savings_potential": total_savings,  # From top candidates only
}
```
Problem:
- `total_savings` is calculated from `top_candidates` (line 168)
- But it is labeled as “total” in metadata (misleading)
- Should be `top_candidates_savings` or similar
Recommendation:
```python
# Line 168
top_candidates_savings = self._calculate_total_savings(top_candidates)

# Lines 177-182
return {
    "candidates": top_candidates,
    "total_groups_analyzed": len(ranked_candidates),
    "top_candidates_count": len(top_candidates),
    "top_candidates_savings_potential": top_candidates_savings,
    "analysis_metadata": self._build_analysis_metadata(...)
}
```
Expected Gain: Clearer API, less confusion
3. Architecture Improvements
3.1 Missing Configuration Validation
Severity: MEDIUM
Location: analysis_orchestrator.py:27-36
Impact: Runtime errors from invalid input
Current Implementation:
```python
def analyze_candidates(
    self,
    project_path: str,
    language: str,
    min_similarity: float = 0.8,
    include_test_coverage: bool = True,
    min_lines: int = 5,
    max_candidates: int = 100,
    exclude_patterns: List[str] | None = None
) -> Dict[str, Any]:
    # No validation of inputs
    self.logger.info(...)
    duplication_results = self.detector.find_duplication(...)
```
Problem:
- No validation of `min_similarity` range (should be 0.0-1.0)
- No validation of `min_lines` (should be positive)
- No validation of `max_candidates` (should be positive)
- No path existence check for `project_path`
- Invalid inputs cause errors deep in the workflow
Recommendation:
```python
def analyze_candidates(self, ...) -> Dict[str, Any]:
    # Validate inputs early
    self._validate_analysis_inputs(
        project_path, language, min_similarity,
        min_lines, max_candidates
    )
    # ... rest of method ...

def _validate_analysis_inputs(
    self,
    project_path: str,
    language: str,
    min_similarity: float,
    min_lines: int,
    max_candidates: int
) -> None:
    """Validate analysis inputs with clear error messages."""
    if not os.path.exists(project_path):
        raise ValueError(f"Project path does not exist: {project_path}")
    if not 0.0 <= min_similarity <= 1.0:
        raise ValueError(f"min_similarity must be 0.0-1.0, got {min_similarity}")
    if min_lines < 1:
        raise ValueError(f"min_lines must be positive, got {min_lines}")
    if max_candidates < 1:
        raise ValueError(f"max_candidates must be positive, got {max_candidates}")

    # Could add language validation against supported languages
    supported_languages = ["python", "javascript", "typescript", "java", "go"]
    if language.lower() not in supported_languages:
        self.logger.warning(
            "unsupported_language",
            language=language,
            supported=supported_languages
        )
```
Expected Gain: Fail-fast with clear errors, better user experience
3.2 Tight Coupling to Implementation Details
Severity: MEDIUM
Location: analysis_orchestrator.py:22-25, 65-76
Impact: Hard to test, hard to extend
Current Implementation:
```python
def __init__(self):
    self.detector = DuplicationDetector()                 # Concrete dependency
    self.ranker = DuplicationRanker()                     # Concrete dependency
    self.coverage_detector = TestCoverageDetector()       # Concrete dependency
    self.recommendation_engine = RecommendationEngine()   # Concrete dependency

def analyze_candidates(self, ...):
    duplication_results = self.detector.find_duplication(...)  # Direct call
    ranked_candidates = self.ranker.rank_deduplication_candidates(...)
```
Problem:
- Hard to mock for testing (need to patch multiple classes)
- Hard to extend with custom implementations
- Violates dependency inversion principle
Recommendation:
```python
# Option 1: Dependency injection
class DeduplicationAnalysisOrchestrator:
    def __init__(
        self,
        detector: Optional[DuplicationDetector] = None,
        ranker: Optional[DuplicationRanker] = None,
        coverage_detector: Optional[TestCoverageDetector] = None,
        recommendation_engine: Optional[RecommendationEngine] = None
    ):
        self.detector = detector or DuplicationDetector()
        self.ranker = ranker or DuplicationRanker()
        self.coverage_detector = coverage_detector or TestCoverageDetector()
        self.recommendation_engine = recommendation_engine or RecommendationEngine()

# Testing becomes easy:
from unittest.mock import Mock

def test_analyze_candidates():
    mock_detector = Mock()
    mock_detector.find_duplication.return_value = {"duplicates": []}
    orchestrator = DeduplicationAnalysisOrchestrator(detector=mock_detector)
    result = orchestrator.analyze_candidates(...)
```
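To move further toward dependency inversion, the injected dependencies could be typed against a `typing.Protocol` instead of the concrete classes; a sketch (the protocol name and the single assumed method are illustrative):

```python
from typing import Any, Dict, Optional, Protocol

class DetectorProtocol(Protocol):
    """Structural interface; only the method the orchestrator calls is assumed."""
    def find_duplication(self, **kwargs: Any) -> Dict[str, Any]: ...

# The constructor parameter would then become:
#   detector: Optional[DetectorProtocol] = None
```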
Expected Gain: Improved testability, easier to extend
3.3 No Progress Callbacks
Severity: LOW
Location: Throughout workflow
Impact: Poor UX for long-running analysis
Current Implementation:
```python
def analyze_candidates(self, ...):
    self.logger.info("analysis_start", ...)

    # Step 1 (could take minutes for large projects)
    duplication_results = self.detector.find_duplication(...)

    # Step 2 (could process thousands of candidates)
    ranked_candidates = self.ranker.rank_deduplication_candidates(...)

    # Step 3 (I/O intensive)
    if include_test_coverage:
        self._add_test_coverage(...)

    # Step 4
    self._add_recommendations(...)

    self.logger.info("analysis_complete", ...)
```
Problem:
- No progress feedback during long operations
- User doesn’t know which step is running
- Can’t cancel mid-workflow
Recommendation:
```python
from typing import Callable, Optional

ProgressCallback = Callable[[str, float], None]

class DeduplicationAnalysisOrchestrator:
    def analyze_candidates(
        self,
        ...,
        progress_callback: Optional[ProgressCallback] = None
    ) -> Dict[str, Any]:
        """Analyze with optional progress reporting."""
        def report_progress(stage: str, percent: float):
            if progress_callback:
                progress_callback(stage, percent)

        report_progress("Finding duplicates", 0.0)
        duplication_results = self.detector.find_duplication(...)

        report_progress("Ranking candidates", 0.25)
        ranked_candidates = self.ranker.rank_deduplication_candidates(...)

        report_progress("Getting top candidates", 0.50)
        top_candidates = self._get_top_candidates(...)

        if include_test_coverage:
            report_progress("Checking test coverage", 0.60)
            self._add_test_coverage(...)

        report_progress("Generating recommendations", 0.85)
        self._add_recommendations(...)

        report_progress("Complete", 1.0)
        return result

# Usage:
def show_progress(stage, percent):
    print(f"[{percent*100:.0f}%] {stage}")

orchestrator.analyze_candidates(..., progress_callback=show_progress)
```
Expected Gain: Better UX for long-running operations
4. Error Handling & Edge Cases
4.1 No Error Recovery in Parallel Enrichment
Severity: HIGH
Location: analysis_orchestrator.py:258-265, 299-306
Impact: Silent failures, partial results
Current Implementation:
```python
# Lines 258-265
for future in as_completed(futures):
    try:
        future.result()
    except Exception as e:
        self.logger.error(
            "test_coverage_enrichment_failed",
            error=str(e)
        )
```
Problem:
- Exception logged but enrichment continues
- No indication in result that some candidates failed enrichment
- Candidate left in inconsistent state (missing test_coverage field)
- No retry logic
Recommendation:
```python
def _add_test_coverage(self, candidates, language, project_path, ...):
    failed_candidates = []
    if parallel and len(candidates) > 1:
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {
                executor.submit(...): candidate
                for candidate in candidates
            }
            for future in as_completed(futures):
                candidate = futures[future]
                try:
                    future.result()
                except Exception as e:
                    self.logger.error(
                        "test_coverage_enrichment_failed",
                        candidate_id=candidate.get("id"),
                        error=str(e)
                    )
                    # Mark candidate as failed
                    candidate["test_coverage_error"] = str(e)
                    candidate["test_coverage"] = {}
                    candidate["has_tests"] = False
                    failed_candidates.append(candidate)

    # Log summary with failure count
    self.logger.info(
        "test_coverage_added",
        candidate_count=len(candidates),
        failed_count=len(failed_candidates),
        parallel=parallel
    )
    return failed_candidates  # Return for monitoring
```
Expected Gain: Better error visibility, consistent state
4.2 Missing Empty List Handling
Severity: MEDIUM
Location: analysis_orchestrator.py:128-130
Impact: Division by zero potential
Current Implementation:
```python
# Lines 125-130
self.logger.info(
    "candidates_ranked",
    total_candidates=len(ranked),
    top_score=ranked[0]["score"] if ranked else 0,  # Safe
    average_score=sum(c["score"] for c in ranked) / len(ranked) if ranked else 0  # Safe
)
```
Analysis:
This is actually handled correctly with the `if ranked else 0` guard. However, let’s check other locations:
```python
# Line 329 - POTENTIAL ISSUE
def _calculate_total_savings(self, candidates):
    total = sum(
        c.get("lines_saved", 0) * len(c.get("files", []))
        for c in candidates
    )
    return total
```
Issue:
- If `candidates` is an empty list, returns 0 (correct)
- If a candidate has no `"files"` key, `len(c.get("files", []))` returns 0 (correct)
- Actually safe due to `.get()` with defaults
Recommendation: Add explicit validation at entry points:
```python
def _enrich_and_summarize(self, ranked_candidates, ...):
    if not ranked_candidates:
        self.logger.warning("no_candidates_to_enrich")
        return {
            "candidates": [],
            "total_groups": 0,
            "total_savings_potential": 0,
            "analysis_metadata": self._build_analysis_metadata(...)
        }
    # Continue with normal flow
    top_candidates = self._get_top_candidates(...)
```
Expected Gain: Explicit empty handling, clearer intent
4.3 No Timeout for Parallel Operations
Severity: MEDIUM
Location: analysis_orchestrator.py:246-265, 289-306
Impact: Potential hangs on stuck operations
Current Implementation:
```python
# Lines 246-265
with ThreadPoolExecutor(max_workers=max_workers) as executor:
    futures = {...}
    for future in as_completed(futures):  # No timeout
        try:
            future.result()  # Blocks indefinitely
        except Exception as e:
            self.logger.error(...)
```
Problem:
- File I/O operations can hang (network drives, slow disks)
- Glob searches can take very long on large projects
- No mechanism to cancel stuck operations
Recommendation:
```python
from concurrent.futures import TimeoutError

def _add_test_coverage(
    self,
    candidates,
    language,
    project_path,
    parallel: bool = True,
    max_workers: int = 4,
    timeout_per_candidate: int = 30  # seconds
):
    if parallel and len(candidates) > 1:
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {...}
            for future in as_completed(futures, timeout=timeout_per_candidate * len(candidates)):
                candidate = futures[future]
                try:
                    future.result(timeout=timeout_per_candidate)
                except TimeoutError:
                    self.logger.error(
                        "test_coverage_timeout",
                        candidate_id=candidate.get("id"),
                        timeout=timeout_per_candidate
                    )
                    # Mark as timeout
                    candidate["test_coverage_error"] = "Timeout"
                    candidate["test_coverage"] = {}
                    candidate["has_tests"] = False
                except Exception as e:
                    # ... existing error handling ...
```
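One caveat worth noting: when `as_completed(..., timeout=...)` expires, the `TimeoutError` is raised by the iterator itself rather than by `future.result()`, so candidates whose futures never completed would otherwise be skipped silently. A minimal guard around the loop (variable names follow the sketch above):

```python
# Wrap the as_completed loop so an iterator-level timeout still marks the
# remaining candidates instead of dropping them.
try:
    for future in as_completed(futures, timeout=timeout_per_candidate * len(candidates)):
        ...  # per-future handling as above
except TimeoutError:
    for future, candidate in futures.items():
        if not future.done():
            future.cancel()
            candidate["test_coverage_error"] = "Timeout"
            candidate["test_coverage"] = {}
            candidate["has_tests"] = False
```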
Expected Gain: Prevents indefinite hangs, better resilience
5. Summary of Recommendations
High Priority (Implement First)
| # | Optimization | Location | Expected Impact | Effort |
|---|---|---|---|---|
| 1.1 | Batch test coverage detection | coverage.py:340-372 | 60-80% speedup | HIGH |
| 1.2 | Component instance caching | analysis_orchestrator.py:19-25 | 10-15% speedup | LOW |
| 4.1 | Error recovery in parallel ops | analysis_orchestrator.py:258-306 | Better resilience | MEDIUM |
| 3.1 | Input validation | analysis_orchestrator.py:27-36 | Fail-fast errors | LOW |
Medium Priority (Plan Next)
| # | Optimization | Location | Expected Impact | Effort |
|---|---|---|---|---|
| 1.3 | Extract parallel execution utility | analysis_orchestrator.py:229-315 | 40 lines saved | MEDIUM |
| 2.1 | Refactor long methods | analysis_orchestrator.py:27-188 | Better testability | HIGH |
| 2.2 | Config object pattern | Throughout | Cleaner API | MEDIUM |
| 4.3 | Operation timeouts | analysis_orchestrator.py:246-306 | Prevent hangs | MEDIUM |
Low Priority (Nice to Have)
| # | Optimization | Location | Expected Impact | Effort |
|---|---|---|---|---|
| 1.4 | Score caching | ranker.py | 20-30% speedup | MEDIUM |
| 1.5 | Early exit on max candidates | analysis_orchestrator.py:157 | 5-10% speedup | LOW |
| 2.3 | Extract magic numbers | analysis_orchestrator.py:234,279 | Better config | LOW |
| 2.4 | Fix naming inconsistency | analysis_orchestrator.py:157-180 | Clearer API | LOW |
| 3.2 | Dependency injection | analysis_orchestrator.py:22-25 | Better testing | MEDIUM |
| 3.3 | Progress callbacks | Throughout | Better UX | MEDIUM |
6. Implementation Roadmap
Phase 1: Quick Wins (1-2 days)
- Add component instance caching (1.2)
- Add input validation (3.1)
- Extract magic numbers (2.3)
- Fix naming inconsistencies (2.4)
Phase 2: Performance (3-5 days)
- Implement batch test coverage detection (1.1) - CRITICAL
- Add score caching (1.4)
- Implement early exit optimization (1.5)
Phase 3: Robustness (2-3 days)
- Add error recovery in parallel ops (4.1)
- Add operation timeouts (4.3)
- Add empty list validation (4.2)
Phase 4: Refactoring (5-7 days)
- Extract parallel execution utility (1.3)
- Refactor long methods (2.1)
- Implement config object pattern (2.2)
- Add dependency injection (3.2)
Phase 5: User Experience (2-3 days)
- Add progress callbacks (3.3)
- Improve error messages
- Add comprehensive logging
Total Estimated Effort: 13-20 days
7. Testing Strategy
For each optimization:
- Before: Benchmark current performance
  `uv run python scripts/benchmark_parallel_enrichment.py --baseline`
- After: Measure improvement
  `uv run python scripts/benchmark_parallel_enrichment.py --compare baseline.json`
- Regression Tests:
  - All existing tests must pass
  - Add tests for new edge cases (see the sketch below)
  - Add tests for error handling paths
- Performance Tests:
  - Test with 10, 100, 1000 candidates
  - Test with 1, 10, 100 files per candidate
  - Measure memory usage
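As an example of the edge-case tests referenced above, a regression test for the empty-candidates path from section 4.2 could look like this (a sketch; the test module name and the direct use of the private method are hypothetical):

```python
# test_analysis_orchestrator.py (hypothetical test module)
from ast_grep_mcp.features.deduplication.analysis_orchestrator import (
    DeduplicationAnalysisOrchestrator,
)

def test_enrich_and_summarize_handles_empty_candidates():
    orchestrator = DeduplicationAnalysisOrchestrator()
    # Assumes the empty-list guard from section 4.2 is in place.
    result = orchestrator._enrich_and_summarize(
        ranked_candidates=[],
        max_candidates=10,
        include_test_coverage=False,
        language="python",
        project_path=".",
        min_similarity=0.8,
        min_lines=5,
    )
    assert result["candidates"] == []
    assert result["total_groups"] == 0
```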
Appendix: Benchmark Script
Create scripts/benchmark_parallel_enrichment.py:
```python
#!/usr/bin/env python3
"""Benchmark parallel enrichment performance."""
import json
import time
from pathlib import Path

from ast_grep_mcp.features.deduplication.analysis_orchestrator import (
    DeduplicationAnalysisOrchestrator
)


def create_mock_candidates(count: int, files_per_candidate: int = 5):
    """Create mock candidates for benchmarking."""
    candidates = []
    for i in range(count):
        candidates.append({
            "id": f"candidate_{i}",
            "similarity": 0.85,
            "lines_saved": 50,
            "files": [f"/tmp/file_{i}_{j}.py" for j in range(files_per_candidate)],
            "complexity_score": 3,
            "score": 75.0
        })
    return candidates


def benchmark_test_coverage(
    orchestrator,
    candidates,
    project_path: str,
    parallel: bool
):
    """Benchmark test coverage enrichment."""
    start = time.time()
    orchestrator._add_test_coverage(
        candidates,
        language="python",
        project_path=project_path,
        parallel=parallel
    )
    elapsed = time.time() - start
    return elapsed


def main():
    orchestrator = DeduplicationAnalysisOrchestrator()
    project_path = "/Users/alyshialedlie/code/ast-grep-mcp"
    results = {}

    for candidate_count in [10, 50, 100]:
        print(f"\n=== Testing with {candidate_count} candidates ===")
        candidates = create_mock_candidates(candidate_count)

        # Sequential
        candidates_copy = [dict(c) for c in candidates]
        seq_time = benchmark_test_coverage(
            orchestrator, candidates_copy, project_path, parallel=False
        )
        print(f"Sequential: {seq_time:.2f}s")

        # Parallel
        candidates_copy = [dict(c) for c in candidates]
        par_time = benchmark_test_coverage(
            orchestrator, candidates_copy, project_path, parallel=True
        )
        print(f"Parallel: {par_time:.2f}s")
        print(f"Speedup: {seq_time/par_time:.2f}x")

        results[candidate_count] = {
            "sequential": seq_time,
            "parallel": par_time,
            "speedup": seq_time / par_time
        }

    # Save results
    output_file = Path("benchmark_results.json")
    with open(output_file, "w") as f:
        json.dump(results, f, indent=2)
    print(f"\nResults saved to {output_file}")


if __name__ == "__main__":
    main()
```
End of Analysis