# Source code for papfa.middlewares.consumer

import abc
import dataclasses
import json
from collections import defaultdict
from datetime import datetime
from pathlib import Path
from typing import List

from papfa.dtos import Record


class ConsumerMiddleware(abc.ABC):
    """Base class for consumer middleware hooks.

    Every hook has a pass-through default, so a subclass only overrides the
    ones it needs. No method is abstract, so the class itself can be
    instantiated as a no-op middleware.

    NOTE(review): the hook names suggest they run before polling, before a
    message is added to a batch, and before/after a batch is flushed; the
    call sites are not in this module -- confirm against the consumer.
    """

    def process_before_poll(self):
        """Do nothing and return ``None`` (default hook)."""

    def process_before_batching(self, message: Record) -> Record:
        """Return ``message`` unchanged (default hook).

        Args:
            message: The record handed to the middleware.

        Returns:
            The very same ``message`` object.
        """
        return message

    def process_before_flush(self, batch: List[Record]) -> List[Record]:
        """Return ``batch`` unchanged (default hook).

        Args:
            batch: The list of records handed to the middleware.

        Returns:
            The very same ``batch`` object (not a copy).
        """
        return batch

    def process_after_flush(self):
        """Do nothing and return ``None`` (default hook)."""
@dataclasses.dataclass(frozen=True, eq=True)
class MessageKey:
    """Immutable ``(topic, group_id)`` pair.

    ``frozen=True`` combined with ``eq=True`` makes instances hashable and
    compared by field values, so they can be used as dictionary keys.
    """

    topic: str
    group_id: str
class ConsumedMessageStatsMiddleware(ConsumerMiddleware):
    """Track per-``(topic, group_id)`` timestamps and dump them as JSON.

    For every key seen in a flushed batch two values are maintained:

    * ``last_message_timestamp`` -- the largest ``record.timestamp`` seen.
    * ``consumed_message_timestamp`` -- the latest wall-clock time, in
      milliseconds since the epoch, at which a batch containing that key was
      processed.

    Stats accumulate for the lifetime of the instance; nothing here resets
    them.
    """

    def __init__(self):
        super().__init__()
        # MessageKey -> {stat name -> value}; a stat that has not been
        # written yet reads as 0, which is what lets ``max`` work on first use.
        self.consumed_message_stats = defaultdict(lambda: defaultdict(int))

    def process_before_flush(self, batch: List[Record]) -> List[Record]:
        """Update the stats from ``batch`` and return ``batch`` unchanged.

        Args:
            batch: Records about to be flushed. Each record must expose
                ``meta["topic"]``, ``meta["group_id"]`` and ``timestamp``;
                a missing ``meta`` entry propagates as the lookup's error.

        Returns:
            The same ``batch`` object.
        """
        # One clock read per batch is enough: every record of the batch is
        # consumed at (effectively) the same moment.
        now_ms = datetime.now().timestamp() * 10**3  # to milliseconds
        for record in batch:
            key = MessageKey(
                topic=record.meta["topic"], group_id=record.meta["group_id"]
            )
            stats = self.consumed_message_stats[key]
            # NOTE(review): presumably record.timestamp is also in
            # milliseconds -- confirm against the Record producer.
            stats["last_message_timestamp"] = max(
                stats["last_message_timestamp"], record.timestamp
            )
            stats["consumed_message_timestamp"] = max(
                stats["consumed_message_timestamp"], now_ms
            )
        return batch

    def process_after_flush(self):
        """Write one ``<topic>-<group_id>.json`` file per key.

        Files go to ``./consume-data/`` (relative to the current working
        directory) and are overwritten on every call. Nothing is created
        when no stats have been collected yet.
        """
        if not self.consumed_message_stats:
            return

        out_dir = Path("./consume-data/")
        out_dir.mkdir(parents=True, exist_ok=True)  # once, not once per key
        for key, stats in self.consumed_message_stats.items():
            # NOTE(review): topic and group_id are used verbatim in the file
            # name; a value containing a path separator would land outside
            # out_dir -- confirm the allowed character set upstream.
            file_name = f"{key.topic}-{key.group_id}.json"
            with open(out_dir / file_name, "w", encoding="utf-8") as f:
                json.dump(stats, f)