services.task_dispatcher
from typing import Any, Dict, List, Tuple

from services.api_clients import AgifyClient, JokeClient, PostmanClient
from utils.logger import info_logger, error_logger


class AsyncTaskDispatcher:
    """Dispatches tasks to external API clients and manages caching for age predictions."""

    def __init__(self) -> None:
        """Initialize API clients and internal age cache."""
        self.agify_client = AgifyClient()
        self.joke_client = JokeClient()
        self.postman_client = PostmanClient()
        # Keyed by (name, upper-cased country); written by
        # preload_age_predictions() and lazily by handle().
        self.age_cache: Dict[Tuple[str, str], Dict[str, Any]] = {}

    async def preload_age_predictions(
        self, name_country_pairs: List[Tuple[str, str]]
    ) -> None:
        """
        Preload and cache age predictions for batches of (name, country) pairs.

        Names are grouped by upper-cased country, de-duplicated (keeping first
        occurrence order) and requested in batches of at most ten names. A
        failed batch is logged and skipped so the remaining batches still run.

        :param name_country_pairs: List of (name, country) tuples to preload predictions for.
        """
        # Presumably mirrors the batch limit of the Agify API -- confirm
        # against AgifyClient.get_batch_ages before changing.
        batch_size = 10

        # A dict keeps insertion order, so it serves as an ordered set here:
        # repeated names would only waste batch slots and API calls.
        names_by_country: Dict[str, Dict[str, None]] = {}
        for name, country in name_country_pairs:
            # Treat a None country like a missing one ("") instead of crashing
            # the whole preload; handle() builds its lookup key the same way.
            country_code = (country or "").upper()
            names_by_country.setdefault(country_code, {})[name] = None

        for country_code, unique_names in names_by_country.items():
            names = list(unique_names)
            for start in range(0, len(names), batch_size):
                batch = names[start : start + batch_size]
                try:
                    results = await self.agify_client.get_batch_ages(
                        batch, country_code
                    )
                    info_logger.info(
                        f"[BATCH] {len(batch)} names in {country_code}: {', '.join(batch)}"
                    )
                    # NOTE(review): entries are keyed by the name echoed in the
                    # result, while handle() looks up by the requested name --
                    # confirm the API returns names unchanged.
                    for result in results:
                        self.age_cache[(result["name"], country_code)] = result
                except Exception as e:
                    # Best effort: one bad batch must not abort the preload.
                    error_logger.error(
                        f"[BATCH] Failed batch request for {country_code}: {e}"
                    )

    async def handle(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Process a single task based on its type ("age", "joke", etc.).

        "age" tasks are answered from the age cache, fetching and caching the
        prediction on a miss. "joke" tasks fetch a random joke. Any other type
        passes the input through unchanged. The chosen response is then posted
        through PostmanClient.

        :param data: Input data dict with a "type" field.
        :return: The "json" entry of the PostmanClient response, or an empty
            dict when that entry is missing or null.
        :raises Exception: If any error occurs during processing (logged, then re-raised).
        """
        try:
            # Normalise inside the try so malformed input is logged as well;
            # "or" maps an explicit null to the same value as a missing key.
            task_type: str = (data.get("type") or "").lower()
            response: Dict[str, Any]

            if task_type == "age":
                # Only age tasks need name/country, so only they validate them.
                name: str = data.get("name") or ""
                country: str = (data.get("country") or "").upper()
                key = (name, country)
                if key not in self.age_cache:
                    result = await self.agify_client.get_age(name, country)
                    info_logger.info(f"[SINGLE] Age fetched for {name} in {country}")
                    self.age_cache[key] = result
                response = self.age_cache[key]

            elif task_type == "joke":
                response = await self.joke_client.get_random_joke()
                info_logger.info("[JOKE] Random joke fetched")

            else:
                response = data
                info_logger.info(
                    "[RAW] Unrecognized task_type. Used input as response."
                )

            postman_response = await self.postman_client.post_response(response)
            # dict.get's default only covers a missing key; an explicit null
            # would otherwise leak None through the Dict return type.
            payload = postman_response.get("json")
            return {} if payload is None else payload

        except Exception as e:
            error_logger.error(f"[TASK] Failed to handle task: {e}")
            raise
class
AsyncTaskDispatcher:
class AsyncTaskDispatcher:
    """Dispatches tasks to external API clients and manages caching for age predictions."""

    def __init__(self) -> None:
        """Initialize API clients and internal age cache."""
        self.agify_client = AgifyClient()
        self.joke_client = JokeClient()
        self.postman_client = PostmanClient()
        # Keyed by (name, upper-cased country); written by
        # preload_age_predictions() and lazily by handle().
        self.age_cache: Dict[Tuple[str, str], Dict[str, Any]] = {}

    async def preload_age_predictions(
        self, name_country_pairs: List[Tuple[str, str]]
    ) -> None:
        """
        Preload and cache age predictions for batches of (name, country) pairs.

        Names are grouped by upper-cased country, de-duplicated (keeping first
        occurrence order) and requested in batches of at most ten names. A
        failed batch is logged and skipped so the remaining batches still run.

        :param name_country_pairs: List of (name, country) tuples to preload predictions for.
        """
        # Presumably mirrors the batch limit of the Agify API -- confirm
        # against AgifyClient.get_batch_ages before changing.
        batch_size = 10

        # A dict keeps insertion order, so it serves as an ordered set here:
        # repeated names would only waste batch slots and API calls.
        names_by_country: Dict[str, Dict[str, None]] = {}
        for name, country in name_country_pairs:
            # Treat a None country like a missing one ("") instead of crashing
            # the whole preload; handle() builds its lookup key the same way.
            country_code = (country or "").upper()
            names_by_country.setdefault(country_code, {})[name] = None

        for country_code, unique_names in names_by_country.items():
            names = list(unique_names)
            for start in range(0, len(names), batch_size):
                batch = names[start : start + batch_size]
                try:
                    results = await self.agify_client.get_batch_ages(
                        batch, country_code
                    )
                    info_logger.info(
                        f"[BATCH] {len(batch)} names in {country_code}: {', '.join(batch)}"
                    )
                    # NOTE(review): entries are keyed by the name echoed in the
                    # result, while handle() looks up by the requested name --
                    # confirm the API returns names unchanged.
                    for result in results:
                        self.age_cache[(result["name"], country_code)] = result
                except Exception as e:
                    # Best effort: one bad batch must not abort the preload.
                    error_logger.error(
                        f"[BATCH] Failed batch request for {country_code}: {e}"
                    )

    async def handle(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Process a single task based on its type ("age", "joke", etc.).

        "age" tasks are answered from the age cache, fetching and caching the
        prediction on a miss. "joke" tasks fetch a random joke. Any other type
        passes the input through unchanged. The chosen response is then posted
        through PostmanClient.

        :param data: Input data dict with a "type" field.
        :return: The "json" entry of the PostmanClient response, or an empty
            dict when that entry is missing or null.
        :raises Exception: If any error occurs during processing (logged, then re-raised).
        """
        try:
            # Normalise inside the try so malformed input is logged as well;
            # "or" maps an explicit null to the same value as a missing key.
            task_type: str = (data.get("type") or "").lower()
            response: Dict[str, Any]

            if task_type == "age":
                # Only age tasks need name/country, so only they validate them.
                name: str = data.get("name") or ""
                country: str = (data.get("country") or "").upper()
                key = (name, country)
                if key not in self.age_cache:
                    result = await self.agify_client.get_age(name, country)
                    info_logger.info(f"[SINGLE] Age fetched for {name} in {country}")
                    self.age_cache[key] = result
                response = self.age_cache[key]

            elif task_type == "joke":
                response = await self.joke_client.get_random_joke()
                info_logger.info("[JOKE] Random joke fetched")

            else:
                response = data
                info_logger.info(
                    "[RAW] Unrecognized task_type. Used input as response."
                )

            postman_response = await self.postman_client.post_response(response)
            # dict.get's default only covers a missing key; an explicit null
            # would otherwise leak None through the Dict return type.
            payload = postman_response.get("json")
            return {} if payload is None else payload

        except Exception as e:
            error_logger.error(f"[TASK] Failed to handle task: {e}")
            raise
Dispatches tasks to external API clients and manages caching for age predictions.
AsyncTaskDispatcher()
def __init__(self) -> None:
    """Set up the external API clients and an empty age-prediction cache."""
    # One client per external service this dispatcher talks to.
    self.agify_client = AgifyClient()
    self.joke_client = JokeClient()
    self.postman_client = PostmanClient()
    # Keyed by (name, upper-cased country); written by
    # preload_age_predictions() and lazily by handle().
    self.age_cache: Dict[Tuple[str, str], Dict[str, Any]] = {}
Initialize API clients and internal age cache.
async def
preload_age_predictions(self, name_country_pairs: List[Tuple[str, str]]) -> None:
async def preload_age_predictions(
    self, name_country_pairs: List[Tuple[str, str]]
) -> None:
    """
    Preload and cache age predictions for batches of (name, country) pairs.

    Names are grouped by upper-cased country, de-duplicated (keeping first
    occurrence order) and requested in batches of at most ten names. A
    failed batch is logged and skipped so the remaining batches still run.

    :param name_country_pairs: List of (name, country) tuples to preload predictions for.
    """
    # Presumably mirrors the batch limit of the Agify API -- confirm
    # against AgifyClient.get_batch_ages before changing.
    batch_size = 10

    # A dict keeps insertion order, so it serves as an ordered set here:
    # repeated names would only waste batch slots and API calls.
    names_by_country: Dict[str, Dict[str, None]] = {}
    for name, country in name_country_pairs:
        # Treat a None country like a missing one ("") instead of crashing
        # the whole preload; handle() builds its lookup key the same way.
        country_code = (country or "").upper()
        names_by_country.setdefault(country_code, {})[name] = None

    for country_code, unique_names in names_by_country.items():
        names = list(unique_names)
        for start in range(0, len(names), batch_size):
            batch = names[start : start + batch_size]
            try:
                results = await self.agify_client.get_batch_ages(
                    batch, country_code
                )
                info_logger.info(
                    f"[BATCH] {len(batch)} names in {country_code}: {', '.join(batch)}"
                )
                # NOTE(review): entries are keyed by the name echoed in the
                # result, while handle() looks up by the requested name --
                # confirm the API returns names unchanged.
                for result in results:
                    self.age_cache[(result["name"], country_code)] = result
            except Exception as e:
                # Best effort: one bad batch must not abort the preload.
                error_logger.error(
                    f"[BATCH] Failed batch request for {country_code}: {e}"
                )
Preload and cache age predictions for batches of (name, country) pairs.
Parameters
- name_country_pairs: List of (name, country) tuples to preload predictions for.
async def
handle(self, data: Dict[str, Any]) -> Dict[str, Any]:
async def handle(self, data: Dict[str, Any]) -> Dict[str, Any]:
    """
    Process a single task based on its type ("age", "joke", etc.).

    "age" tasks are answered from the age cache, fetching and caching the
    prediction on a miss. "joke" tasks fetch a random joke. Any other type
    passes the input through unchanged. The chosen response is then posted
    through PostmanClient.

    :param data: Input data dict with a "type" field.
    :return: The "json" entry of the PostmanClient response, or an empty
        dict when that entry is missing or null.
    :raises Exception: If any error occurs during processing (logged, then re-raised).
    """
    try:
        # Normalise inside the try so malformed input is logged as well;
        # "or" maps an explicit null to the same value as a missing key.
        task_type: str = (data.get("type") or "").lower()
        response: Dict[str, Any]

        if task_type == "age":
            # Only age tasks need name/country, so only they validate them.
            name: str = data.get("name") or ""
            country: str = (data.get("country") or "").upper()
            key = (name, country)
            if key not in self.age_cache:
                result = await self.agify_client.get_age(name, country)
                info_logger.info(f"[SINGLE] Age fetched for {name} in {country}")
                self.age_cache[key] = result
            response = self.age_cache[key]

        elif task_type == "joke":
            response = await self.joke_client.get_random_joke()
            info_logger.info("[JOKE] Random joke fetched")

        else:
            response = data
            info_logger.info(
                "[RAW] Unrecognized task_type. Used input as response."
            )

        postman_response = await self.postman_client.post_response(response)
        # dict.get's default only covers a missing key; an explicit null
        # would otherwise leak None through the Dict return type.
        payload = postman_response.get("json")
        return {} if payload is None else payload

    except Exception as e:
        error_logger.error(f"[TASK] Failed to handle task: {e}")
        raise
Process a single task based on its type ("age", "joke", etc.).
Parameters
- data: Input data dict with a "type" field.
Returns
Processed result posted through PostmanClient.
Raises
- Exception: If any error occurs during processing.