Source code for papfa.producers

import abc
import signal
from datetime import timedelta, datetime
from typing import Type, List
from uuid import uuid4

from confluent_kafka import SerializingProducer, KafkaError
from confluent_kafka.schema_registry.avro import AvroSerializer
from dataclasses_avroschema import AvroModel

from .config import KafkaProducerConfig
from .consumers import BatchConfig
from .dtos import Record
from .middlewares.producer import ProducerMiddleware
from .settings import Papfa


class MessageProducer(abc.ABC):
    """Abstract interface for objects that can produce messages."""

    @abc.abstractmethod
    def produce(self, messages):
        """Produce ``messages``; concrete subclasses must override this.

        The base implementation does nothing and returns ``None``.
        """
        return None
class KafkaMessageProducer(MessageProducer):
    """Produce records to a single Kafka topic and flush them in batches.

    The underlying client is created lazily on first access of
    :attr:`producer`. Every produced value is passed through the configured
    middlewares before and after it is handed to the client.

    A flush is triggered when ``force`` is requested, when
    ``batch_config.size`` records are pending, or when more than
    ``batch_config.timeout`` has elapsed since the first pending record.
    """

    def __init__(
        self,
        topic: str,
        kafka_producer_config: KafkaProducerConfig,
        middlewares: List[ProducerMiddleware] = None,
        batch_config: BatchConfig = None,
        producer_kwargs: dict = None,
    ):
        """Store the configuration; no Kafka client is created here.

        :param topic: topic every record is produced to.
        :param kafka_producer_config: serializer and connection settings.
        :param middlewares: hooks run around each produce call (default none).
        :param batch_config: flush thresholds; defaults to 100 records / 10 ms.
        :param producer_kwargs: extra client configuration entries; they
            override the values computed by :meth:`get_config`.
        """
        self.topic = topic
        self.not_flushed_count = 0
        self.__producer = None
        self.kafka_producer_config = kafka_producer_config
        self.middlewares = middlewares or []
        # 10 ms keeps the effective default ``linger.ms`` at 10 now that
        # get_config() converts the timeout to milliseconds correctly.
        self.batch_config = batch_config or BatchConfig(size=100, timeout=timedelta(milliseconds=10))
        self.producer_kwargs = producer_kwargs or dict()
        # Time of the first record produced since the last flush, or None
        # when nothing is pending.
        self.last_flush_time = None

    @property
    def producer(self):
        """Return the lazily created, middleware-aware serializing producer."""
        # ``is None`` instead of truthiness: confluent_kafka producers
        # implement ``__len__`` (number of queued messages), so an idle
        # producer is falsy and would otherwise be re-created on access.
        if self.__producer is None:

            class KafkaSerializingProducer(SerializingProducer):
                """SerializingProducer that runs middlewares around produce."""

                def __init__(self, config, middlewares):
                    self.middlewares = middlewares
                    super().__init__(config)

                def produce(
                    self, topic, key=None, value=None, partition=-1, on_delivery=None, timestamp=0, headers=None
                ):
                    for middleware in self.middlewares:
                        value = middleware.process_before_produce(value)
                    super().produce(topic, key, value, partition, on_delivery, timestamp, headers)
                    for middleware in self.middlewares:
                        middleware.process_after_produce(value)

            self.__producer = KafkaSerializingProducer(self.get_config(), middlewares=self.middlewares)
        return self.__producer

    def get_config(self):
        """Build the client configuration dictionary.

        Entries from ``producer_kwargs`` override the computed ones, and
        entries whose value is ``None`` are dropped. ``producer_kwargs`` is
        not modified, so repeated calls return the same result.

        :return: dict of configuration keys to non-``None`` values.
        """
        kafka_config = self.kafka_producer_config.kafka_config
        _config = {
            "bootstrap.servers": ','.join(kafka_config.bootstrap_servers),
            "value.serializer": self.kafka_producer_config.serializer,
            # ``linger.ms`` is expressed in milliseconds.
            "linger.ms": self.batch_config.timeout.total_seconds() * 1000,
            "security.protocol": kafka_config.security_protocol,
            "sasl.mechanism": kafka_config.sasl_mechanism,
            "sasl.username": kafka_config.sasl_username,
            "sasl.password": kafka_config.sasl_password,
            **self.producer_kwargs,
        }
        return {k: v for k, v in _config.items() if v is not None}

    def produce(self, message: Record, on_delivery: callable = None, force_flush: bool = False) -> None:
        """Produce one record to ``self.topic`` and flush if a threshold is hit.

        :param message: record providing key, value, headers and timestamp.
        :param on_delivery: callback forwarded to the client.
        :param force_flush: flush right after producing, whatever the
            batch state.
        """
        self.producer.produce(
            topic=self.topic,
            # A random UUID is used when the record has no key; it is sent
            # as a string because the client expects str/bytes keys.
            key=message.key or str(uuid4()),
            value=message.value,
            on_delivery=on_delivery,
            headers=message.headers,
            timestamp=message.timestamp,
        )
        self.not_flushed_count += 1
        # Start the batch timer on the first record after a flush.
        self.last_flush_time = self.last_flush_time or datetime.now()
        self.flush(force_flush)

    def flush(self, force=False):
        """Flush the client if forced or if a batch threshold is reached.

        :param force: flush unconditionally.
        """
        # With nothing pending (timer unset) the timeout cannot have expired;
        # this also makes flush() safe to call before the first produce().
        timed_out = (
            self.last_flush_time is not None
            and datetime.now() - self.last_flush_time > self.batch_config.timeout
        )
        if force or self.not_flushed_count >= self.batch_config.size or timed_out:
            self.producer.flush()
            self.not_flushed_count = 0
            # Reset the timer so the next batch gets a full timeout window
            # instead of flushing on every subsequent record.
            self.last_flush_time = None
class KafkaMessageTransactionalProducer(KafkaMessageProducer):
    """Kafka producer whose records are sent inside Kafka transactions.

    Use :meth:`transaction` as a context manager; the value bound by ``with``
    is a ``produce`` callable::

        with producer.transaction(consumer) as produce:
            produce(record)
    """

    class Transaction:
        """Reusable context manager wrapping one Kafka transaction per ``with``.

        Entering begins a transaction. Leaving commits it, after sending the
        consumer's offsets when a consumer was supplied. If the ``with`` body
        raised, or if committing fails, the transaction is aborted instead.
        """

        def __init__(self, producer, topic):
            """Bind to ``producer`` and ``topic`` and call ``init_transactions()``."""
            self.topic = topic
            self.producer = producer
            self.consumer = None
            # Handlers that were installed before __enter__, keyed by signal
            # number, so they can be put back when the transaction ends.
            self._previous_handlers = {}
            self.producer.init_transactions()

        def _handle_interrupt(self, signum=None, frame=None):
            """Abort the open transaction when SIGINT/SIGTERM is received.

            Signal handlers are invoked with ``(signum, frame)``; both are
            accepted (and optional, so direct calls keep working).
            """
            self.producer.abort_transaction()

        def _restore_signal_handlers(self):
            """Reinstall the handlers that were active before ``__enter__``."""
            while self._previous_handlers:
                signum, previous = self._previous_handlers.popitem()
                # ``None`` means the old handler was not installed from
                # Python and cannot be passed back to signal.signal().
                if previous is not None:
                    signal.signal(signum, previous)

        def __call__(self, consumer):
            """Attach ``consumer`` (may be ``None``) and return ``self``."""
            self.consumer = consumer
            return self

        def __enter__(self):
            """Install the interrupt handlers, begin a transaction, return ``produce``."""
            for signum in (signal.SIGINT, signal.SIGTERM):
                self._previous_handlers[signum] = signal.signal(signum, self._handle_interrupt)
            try:
                self.producer.begin_transaction()
            except BaseException:
                # __exit__ is not called when __enter__ fails, so undo the
                # handler installation here.
                self._restore_signal_handlers()
                raise
            return self.produce

        def produce(self, message: Record, on_delivery: callable = None):
            """Produce one record to ``self.topic`` within the open transaction.

            :param message: record providing key, value, headers and timestamp.
            :param on_delivery: callback forwarded to the client.
            """
            self.producer.produce(
                topic=self.topic,
                # Random string key when the record has none; the client
                # expects str/bytes keys, not UUID objects.
                key=message.key or str(uuid4()),
                value=message.value,
                on_delivery=on_delivery,
                headers=message.headers,
                timestamp=message.timestamp,
            )

        def __exit__(self, exc_type, exc_val, exc_tb):
            """Commit on success, abort on failure, and restore signal handlers.

            An exception raised inside the ``with`` body is never suppressed.
            A Kafka error raised while committing aborts the transaction and
            is re-raised so the caller learns the transaction did not commit.
            """
            # Local import: the transactional client API raises
            # KafkaException (wrapping a KafkaError), which the module-level
            # imports do not provide.
            from confluent_kafka import KafkaException

            try:
                if exc_type is not None:
                    self.producer.abort_transaction()
                    return
                try:
                    if self.consumer is not None:
                        self.producer.send_offsets_to_transaction(
                            self.consumer.position(self.consumer.assignment()),
                            self.consumer.consumer_group_metadata(),
                        )
                    self.producer.commit_transaction()
                except (KafkaError, KafkaException):
                    self.producer.abort_transaction()
                    raise
            finally:
                self._restore_signal_handlers()

    def __init__(
        self,
        topic: str,
        kafka_producer_config: KafkaProducerConfig,
        middlewares: List[ProducerMiddleware] = None,
        batch_config: BatchConfig = None,
        transaction_id: str = None,
        producer_kwargs: dict = None,
    ):
        """Create the producer and initialise its transaction helper.

        :param topic: topic every record is produced to.
        :param kafka_producer_config: serializer and connection settings.
        :param middlewares: hooks run around each produce call.
        :param batch_config: flush thresholds forwarded to the base class.
        :param transaction_id: value for ``transactional.id``; a random UUID
            string is generated when empty or ``None``.
        :param producer_kwargs: extra client configuration entries.
        """
        super().__init__(topic, kafka_producer_config, middlewares, batch_config, producer_kwargs)
        # Must be set before ``self.producer`` is first accessed below,
        # because get_config() reads it.
        self.transaction_id = transaction_id or str(uuid4())
        self.transaction_instance = self.Transaction(self.producer, self.topic)

    def transaction(self, consumer=None):
        """Return the transaction context manager, bound to ``consumer``.

        :param consumer: optional consumer whose positions are sent with the
            transaction when it is committed.
        :raises Exception: if no transaction helper is available.
        """
        if self.transaction_instance:
            return self.transaction_instance(consumer)
        raise Exception('Not a transactional producer.')

    def get_config(self):
        """Return the base configuration plus ``transactional.id``."""
        conf = super().get_config()
        conf.update({'transactional.id': self.transaction_id})
        return conf
def get_message_producer(
    topic,
    avro_model: Type[AvroModel],
    kafka_producer_config: KafkaProducerConfig = None,
    middlewares: List[ProducerMiddleware] = None,
    batch_config=None,
    producer_kwargs=None,
):
    """Build a :class:`KafkaMessageProducer` for ``topic``.

    When ``kafka_producer_config`` is not supplied, one is assembled from the
    ``Papfa`` settings: an ``AvroSerializer`` built from
    ``avro_model.avro_schema()`` and the ``"schema_registry"`` entry, plus the
    ``"kafka_config"`` entry. ``avro_model`` is only consulted in that case.

    :param topic: topic the producer writes to.
    :param avro_model: model class providing the Avro schema string.
    :param kafka_producer_config: ready-made configuration, used as is.
    :param middlewares: producer middlewares, forwarded unchanged.
    :param batch_config: batch settings, forwarded unchanged.
    :param producer_kwargs: extra client configuration, forwarded unchanged.
    :return: the configured producer.
    """
    producer_config = kafka_producer_config
    if not producer_config:
        registry_client = Papfa.get_instance()["schema_registry"]
        value_serializer = AvroSerializer(
            schema_registry_client=registry_client,
            schema_str=avro_model.avro_schema(),
        )
        producer_config = KafkaProducerConfig(
            serializer=value_serializer,
            kafka_config=Papfa.get_instance()["kafka_config"],
        )

    return KafkaMessageProducer(
        topic=topic,
        kafka_producer_config=producer_config,
        batch_config=batch_config,
        middlewares=middlewares,
        producer_kwargs=producer_kwargs,
    )
def get_message_transactional_producer(
    topic,
    avro_model: Type[AvroModel],
    kafka_producer_config: KafkaProducerConfig = None,
    middlewares: List[ProducerMiddleware] = None,
    transaction_id='',
    batch_config=None,
    producer_kwargs=None,
):
    """Build a :class:`KafkaMessageTransactionalProducer` for ``topic``.

    When ``kafka_producer_config`` is not supplied, one is assembled from the
    ``Papfa`` settings: an ``AvroSerializer`` built from
    ``avro_model.avro_schema()`` and the ``"schema_registry"`` entry, plus the
    ``"kafka_config"`` entry. ``avro_model`` is only consulted in that case.

    :param topic: topic the producer writes to.
    :param avro_model: model class providing the Avro schema string.
    :param kafka_producer_config: ready-made configuration, used as is.
    :param middlewares: producer middlewares, forwarded unchanged.
    :param transaction_id: transactional id, forwarded unchanged.
    :param batch_config: batch settings, forwarded unchanged.
    :param producer_kwargs: extra client configuration, forwarded unchanged.
    :return: the configured transactional producer.
    """
    producer_config = kafka_producer_config
    if not producer_config:
        registry_client = Papfa.get_instance()["schema_registry"]
        value_serializer = AvroSerializer(
            schema_registry_client=registry_client,
            schema_str=avro_model.avro_schema(),
        )
        producer_config = KafkaProducerConfig(
            serializer=value_serializer,
            kafka_config=Papfa.get_instance()["kafka_config"],
        )

    return KafkaMessageTransactionalProducer(
        topic=topic,
        kafka_producer_config=producer_config,
        batch_config=batch_config,
        middlewares=middlewares,
        transaction_id=transaction_id,
        producer_kwargs=producer_kwargs,
    )