papfa package

Subpackages

Submodules

papfa.cli module

Console script for papfa.

papfa.cli.configure_papfa(app_dir)[source]
papfa.cli.get_list_of_consumers()[source]

papfa.config module

class papfa.config.KafkaConfig(bootstrap_servers: List[str], sasl_mechanism: str, security_protocol: str, sasl_username: str, sasl_password: str)[source]

Bases: object

bootstrap_servers: List[str]
sasl_mechanism: str
sasl_password: str
sasl_username: str
security_protocol: str
class papfa.config.KafkaConsumerConfig(group_id: str, deserializer: confluent_kafka.schema_registry.avro.AvroDeserializer, kafka_config: papfa.config.KafkaConfig, topics: List[str], deserialize_key: bool = False)[source]

Bases: object

deserialize_key: bool = False
deserializer: AvroDeserializer
group_id: str
kafka_config: KafkaConfig
topics: List[str]
class papfa.config.KafkaProducerConfig(serializer: confluent_kafka.schema_registry.avro.AvroSerializer, kafka_config: papfa.config.KafkaConfig)[source]

Bases: object

kafka_config: KafkaConfig
serializer: AvroSerializer
class papfa.config.SchemaRegistryConfig[source]

Bases: object

papfa.consumers module

class papfa.consumers.BaseConsumer[source]

Bases: ABC

abstract consume()[source]
abstract get_meta_data()[source]
class papfa.consumers.BatchConfig(size: int, timeout: datetime.timedelta)[source]

Bases: object

size: int
timeout: timedelta
class papfa.consumers.ConfluentAvroDeserializer(schema_registry_client: SchemaRegistryClient)[source]

Bases: Deserializer

deserialize(value: bytes) -> dict [source]
class papfa.consumers.Deserializer[source]

Bases: ABC

class papfa.consumers.KafkaConsumer(kafka_consumer_config: KafkaConsumerConfig, message_handler: MessageHandler, middlewares: Optional[List[ConsumerMiddleware]] = None, batch_config: BatchConfig = BatchConfig(size=100, timeout=datetime.timedelta(seconds=1)), raise_exception: bool = False, consumer_kwargs: Optional[dict] = None)[source]

Bases: BaseConsumer

commit()[source]
consume()[source]
property consumer
property consumer_name
exit_gracefully(signum, frame)[source]
flush()[source]
get_meta_data()[source]
class papfa.consumers.MessageHandler[source]

Bases: ABC

abstract handle_batch(message: List[Record]) -> None [source]
abstract is_satisfy(message: Record) -> bool [source]
papfa.consumers.consumer(topic: Optional[Union[str, List[str]]] = None, group_id: Optional[str] = None, satisfy_method: Optional[Callable] = None, batch_config: Optional[BatchConfig] = None, consumer_strategy: Optional[BaseConsumer] = None, deserialize_key: bool = False, kafka_config: Optional[KafkaConfig] = None, consumer_kwargs: Optional[dict] = None)[source]
papfa.consumers.get_default_kafka_consumer(func, satisfy_method, topics, group_id, batch_config, deserialize_key, kafka_config=None, consumer_kwargs=None)[source]

papfa.dtos module

class papfa.dtos.Record(value: dict, timestamp: Union[int, NoneType], key: Union[str, dict, NoneType] = None, headers: dict = <factory>, meta: dict = <factory>)[source]

Bases: object

headers: dict
key: Optional[Union[str, dict]] = None
meta: dict
timestamp: Optional[int]
value: dict

papfa.papfa module

Main module.

papfa.producers module

class papfa.producers.KafkaMessageProducer(topic: str, kafka_producer_config: KafkaProducerConfig, middlewares: Optional[List[ProducerMiddleware]] = None, batch_config: Optional[BatchConfig] = None, producer_kwargs: Optional[dict] = None)[source]

Bases: MessageProducer

flush(force=False)[source]
get_config()[source]
produce(message: Record, on_delivery: Optional[callable] = None, force_flush: bool = False) -> None [source]
property producer
class papfa.producers.KafkaMessageTransactionalProducer(topic: str, kafka_producer_config: KafkaProducerConfig, middlewares: Optional[List[ProducerMiddleware]] = None, batch_config: Optional[BatchConfig] = None, transaction_id: Optional[str] = None, producer_kwargs: Optional[dict] = None)[source]

Bases: KafkaMessageProducer

class Transaction(producer, topic)[source]

Bases: object

produce(message: Record, on_delivery: Optional[callable] = None)[source]
get_config()[source]
transaction(consumer=None)[source]
class papfa.producers.MessageProducer[source]

Bases: ABC

abstract produce(messages)[source]
papfa.producers.get_message_producer(topic, avro_model: Type[AvroModel], kafka_producer_config: Optional[KafkaProducerConfig] = None, middlewares: Optional[List[ProducerMiddleware]] = None, batch_config=None, producer_kwargs=None)[source]
papfa.producers.get_message_transactional_producer(topic, avro_model: Type[AvroModel], kafka_producer_config: Optional[KafkaProducerConfig] = None, middlewares: Optional[List[ProducerMiddleware]] = None, transaction_id='', batch_config=None, producer_kwargs=None)[source]

papfa.settings module

class papfa.settings.Papfa[source]

Bases: object

get_config(key, config: Optional[dict] = None, default=None)[source]
get_configs()[source]
get_default_or_raise_bad_config(key, default, raise_error=False)[source]
classmethod get_instance()[source]
classmethod papfa_configured()[source]
setup(config: Optional[dict] = None)[source]

papfa.utils module

papfa.utils.import_string(dotted_path)[source]

Import a dotted module path and return the attribute/class designated by the last name in the path. Raise ImportError if the import failed.

papfa.utils.make_function_from_path(path)[source]

Module contents

Top-level package for papfa.

class papfa.Papfa[source]

Bases: object

get_config(key, config: Optional[dict] = None, default=None)[source]
get_configs()[source]
get_default_or_raise_bad_config(key, default, raise_error=False)[source]
classmethod get_instance()[source]
classmethod papfa_configured()[source]
setup(config: Optional[dict] = None)[source]
papfa.consumer(topic: Optional[Union[str, List[str]]] = None, group_id: Optional[str] = None, satisfy_method: Optional[Callable] = None, batch_config: Optional[BatchConfig] = None, consumer_strategy: Optional[BaseConsumer] = None, deserialize_key: bool = False, kafka_config: Optional[KafkaConfig] = None, consumer_kwargs: Optional[dict] = None)[source]
papfa.get_message_producer(topic, avro_model: Type[AvroModel], kafka_producer_config: Optional[KafkaProducerConfig] = None, middlewares: Optional[List[ProducerMiddleware]] = None, batch_config=None, producer_kwargs=None)[source]
papfa.get_message_transactional_producer(topic, avro_model: Type[AvroModel], kafka_producer_config: Optional[KafkaProducerConfig] = None, middlewares: Optional[List[ProducerMiddleware]] = None, transaction_id='', batch_config=None, producer_kwargs=None)[source]