services.task_dispatcher

 1from typing import Any, Dict, List, Tuple
 2
 3from services.api_clients import AgifyClient, JokeClient, PostmanClient
 4from utils.logger import info_logger, error_logger
 5
 6
 7class AsyncTaskDispatcher:
 8    """Dispatches tasks to external API clients and manages caching for age predictions."""
 9
10    def __init__(self) -> None:
11        """Initialize API clients and internal age cache."""
12        self.age_cache: Dict[Tuple[str, str], Dict[str, Any]] = {}
13        self.agify_client = AgifyClient()
14        self.joke_client = JokeClient()
15        self.postman_client = PostmanClient()
16
17    async def preload_age_predictions(
18        self, name_country_pairs: List[Tuple[str, str]]
19    ) -> None:
20        """
21        Preload and cache age predictions for batches of (name, country) pairs.
22
23        :param name_country_pairs: List of (name, country) tuples to preload predictions for.
24        """
25        country_groups: Dict[str, List[str]] = {}
26        for name, country in name_country_pairs:
27            country = country.upper()
28            country_groups.setdefault(country, []).append(name)
29
30        for country, names in country_groups.items():
31            for i in range(0, len(names), 10):
32                batch = names[i : i + 10]
33                try:
34                    results = await self.agify_client.get_batch_ages(batch, country)
35                    info_logger.info(
36                        f"[BATCH] {len(batch)} names in {country}: {', '.join(batch)}"
37                    )
38                    for result in results:
39                        self.age_cache[(result["name"], country)] = result
40                except Exception as e:
41                    error_logger.error(
42                        f"[BATCH] Failed batch request for {country}: {str(e)}"
43                    )
44
45    async def handle(self, data: Dict[str, Any]) -> Dict[str, Any]:
46        """
47        Process a single task based on its type ("age", "joke", etc.).
48
49        :param data: Input data dict with a "type" field.
50        :return: Processed result posted through PostmanClient.
51        :raises Exception: If any error occurs during processing.
52        """
53        task_type: str = data.get("type", "").lower()
54        name: str = data.get("name", "")
55        country: str = data.get("country", "").upper()
56
57        try:
58            if task_type == "age":
59                key = (name, country)
60                if key not in self.age_cache:
61                    result = await self.agify_client.get_age(name, country)
62                    info_logger.info(f"[SINGLE] Age fetched for {name} in {country}")
63                    self.age_cache[key] = result
64                response: Dict[str, Any] = self.age_cache[key]
65
66            elif task_type == "joke":
67                response = await self.joke_client.get_random_joke()
68                info_logger.info(f"[JOKE] Random joke fetched")
69
70            else:
71                response = data
72                info_logger.info(
73                    f"[RAW] Unrecognized task_type. Used input as response."
74                )
75
76            postman_response = await self.postman_client.post_response(response)
77            return postman_response.get("json", {})
78
79        except Exception as e:
80            error_logger.error(f"[TASK] Failed to handle task: {str(e)}")
81            raise
class AsyncTaskDispatcher:
 8class AsyncTaskDispatcher:
 9    """Dispatches tasks to external API clients and manages caching for age predictions."""
10
11    def __init__(self) -> None:
12        """Initialize API clients and internal age cache."""
13        self.age_cache: Dict[Tuple[str, str], Dict[str, Any]] = {}
14        self.agify_client = AgifyClient()
15        self.joke_client = JokeClient()
16        self.postman_client = PostmanClient()
17
18    async def preload_age_predictions(
19        self, name_country_pairs: List[Tuple[str, str]]
20    ) -> None:
21        """
22        Preload and cache age predictions for batches of (name, country) pairs.
23
24        :param name_country_pairs: List of (name, country) tuples to preload predictions for.
25        """
26        country_groups: Dict[str, List[str]] = {}
27        for name, country in name_country_pairs:
28            country = country.upper()
29            country_groups.setdefault(country, []).append(name)
30
31        for country, names in country_groups.items():
32            for i in range(0, len(names), 10):
33                batch = names[i : i + 10]
34                try:
35                    results = await self.agify_client.get_batch_ages(batch, country)
36                    info_logger.info(
37                        f"[BATCH] {len(batch)} names in {country}: {', '.join(batch)}"
38                    )
39                    for result in results:
40                        self.age_cache[(result["name"], country)] = result
41                except Exception as e:
42                    error_logger.error(
43                        f"[BATCH] Failed batch request for {country}: {str(e)}"
44                    )
45
46    async def handle(self, data: Dict[str, Any]) -> Dict[str, Any]:
47        """
48        Process a single task based on its type ("age", "joke", etc.).
49
50        :param data: Input data dict with a "type" field.
51        :return: Processed result posted through PostmanClient.
52        :raises Exception: If any error occurs during processing.
53        """
54        task_type: str = data.get("type", "").lower()
55        name: str = data.get("name", "")
56        country: str = data.get("country", "").upper()
57
58        try:
59            if task_type == "age":
60                key = (name, country)
61                if key not in self.age_cache:
62                    result = await self.agify_client.get_age(name, country)
63                    info_logger.info(f"[SINGLE] Age fetched for {name} in {country}")
64                    self.age_cache[key] = result
65                response: Dict[str, Any] = self.age_cache[key]
66
67            elif task_type == "joke":
68                response = await self.joke_client.get_random_joke()
69                info_logger.info(f"[JOKE] Random joke fetched")
70
71            else:
72                response = data
73                info_logger.info(
74                    f"[RAW] Unrecognized task_type. Used input as response."
75                )
76
77            postman_response = await self.postman_client.post_response(response)
78            return postman_response.get("json", {})
79
80        except Exception as e:
81            error_logger.error(f"[TASK] Failed to handle task: {str(e)}")
82            raise

Dispatches tasks to external API clients and manages caching for age predictions.

AsyncTaskDispatcher()
11    def __init__(self) -> None:
12        """Initialize API clients and internal age cache."""
13        self.age_cache: Dict[Tuple[str, str], Dict[str, Any]] = {}
14        self.agify_client = AgifyClient()
15        self.joke_client = JokeClient()
16        self.postman_client = PostmanClient()

Initialize API clients and internal age cache.

age_cache: Dict[Tuple[str, str], Dict[str, Any]]
agify_client
joke_client
postman_client
async def preload_age_predictions(self, name_country_pairs: List[Tuple[str, str]]) -> None:
18    async def preload_age_predictions(
19        self, name_country_pairs: List[Tuple[str, str]]
20    ) -> None:
21        """
22        Preload and cache age predictions for batches of (name, country) pairs.
23
24        :param name_country_pairs: List of (name, country) tuples to preload predictions for.
25        """
26        country_groups: Dict[str, List[str]] = {}
27        for name, country in name_country_pairs:
28            country = country.upper()
29            country_groups.setdefault(country, []).append(name)
30
31        for country, names in country_groups.items():
32            for i in range(0, len(names), 10):
33                batch = names[i : i + 10]
34                try:
35                    results = await self.agify_client.get_batch_ages(batch, country)
36                    info_logger.info(
37                        f"[BATCH] {len(batch)} names in {country}: {', '.join(batch)}"
38                    )
39                    for result in results:
40                        self.age_cache[(result["name"], country)] = result
41                except Exception as e:
42                    error_logger.error(
43                        f"[BATCH] Failed batch request for {country}: {str(e)}"
44                    )

Preload and cache age predictions for batches of (name, country) pairs.

Parameters
  • name_country_pairs: List of (name, country) tuples to preload predictions for.
async def handle(self, data: Dict[str, Any]) -> Dict[str, Any]:
46    async def handle(self, data: Dict[str, Any]) -> Dict[str, Any]:
47        """
48        Process a single task based on its type ("age", "joke", etc.).
49
50        :param data: Input data dict with a "type" field.
51        :return: Processed result posted through PostmanClient.
52        :raises Exception: If any error occurs during processing.
53        """
54        task_type: str = data.get("type", "").lower()
55        name: str = data.get("name", "")
56        country: str = data.get("country", "").upper()
57
58        try:
59            if task_type == "age":
60                key = (name, country)
61                if key not in self.age_cache:
62                    result = await self.agify_client.get_age(name, country)
63                    info_logger.info(f"[SINGLE] Age fetched for {name} in {country}")
64                    self.age_cache[key] = result
65                response: Dict[str, Any] = self.age_cache[key]
66
67            elif task_type == "joke":
68                response = await self.joke_client.get_random_joke()
69                info_logger.info(f"[JOKE] Random joke fetched")
70
71            else:
72                response = data
73                info_logger.info(
74                    f"[RAW] Unrecognized task_type. Used input as response."
75                )
76
77            postman_response = await self.postman_client.post_response(response)
78            return postman_response.get("json", {})
79
80        except Exception as e:
81            error_logger.error(f"[TASK] Failed to handle task: {str(e)}")
82            raise

Process a single task based on its type ("age", "joke", etc.).

Parameters
  • data: Input data dict with a "type" field.
Returns

Processed result posted through PostmanClient.

Raises
  • Exception: If any error occurs during processing.