Configuration

class kafkaaggregator.config.Configuration(broker: str = 'kafka://localhost:9092', registry_url: str = 'http://localhost:8081', internal_registry_url: str = 'http://localhost:8081', store: str = 'memory://', window_size: float = 1.0, window_expires: float = 1.0, min_sample_size: int = 2, operations: List[str] = <factory>, topic_partitions: int = 4, source_topic_name_prefix: str = 'example', ntopics: int = 10, nfields: int = 10, frequency: float = 10.0, max_messages: int = 10, topic_regex: str = '^example-[0-9][0-9][0-9]?$', excluded_topics: List[str] = <factory>, topic_rename_format: str = '{source_topic_name}-aggregated', excluded_field_names: List[str] = <factory>, agents_output_dir: str = 'agents', agent_template_file: str = 'agent.j2')

Bases: object

Configuration for kafkaaggregator.

Attributes Summary

agent_template_file Name of the agent Jinja2 template file.
agents_output_dir Name of output directory for the agents’ code.
broker The Kafka broker URL.
frequency The frequency, in Hz, at which messages are produced for the example topics.
internal_registry_url Internal Confluent Schema Registry URL.
max_messages The maximum number of messages to produce.
min_sample_size Minimum sample size to compute statistics.
nfields Number of fields for source topics used in the aggregation example.
ntopics Number of source topics used in the aggregation example.
registry_url The Confluent Schema Registry URL.
source_topic_name_prefix Prefix for the source topic name used in the aggregation example.
store The backend used for table storage.
topic_partitions Default number of partitions for new topics.
topic_regex Regex to select source topics to aggregate.
topic_rename_format A format string for the aggregation topic name, which must contain {source_topic_name} as a placeholder for the source topic name.
window_expires Window expiration time in seconds.
window_size Size of the tumbling window in seconds used to aggregate messages.

Attributes Documentation

agent_template_file = 'agent.j2'

Name of the agent Jinja2 template file.

agents_output_dir = 'agents'

Name of output directory for the agents’ code.

broker = 'kafka://localhost:9092'

The Kafka broker URL.

Currently, the only supported production transport is kafka://. It uses the aiokafka client under the hood to consume and produce messages.

frequency = 10.0

The frequency, in Hz, at which messages are produced for the example topics.

internal_registry_url = 'http://localhost:8081'

Internal Confluent Schema Registry URL.

Used in conjunction with faust-avro to register the aggregated topic schemas. Depending on your Kafka setup, you can use this internal Schema Registry to keep the aggregated topic schemas separate from other schemas and avoid Schema ID conflicts.

max_messages = 10

The maximum number of messages to produce. Set max_messages to a number smaller than 1 to produce an indefinite number of messages.
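As an illustration of how frequency and max_messages could interact in the example producer, the sketch below computes only the send schedule and does not talk to Kafka. The function send_offsets is hypothetical and is not part of kafkaaggregator.

```python
import itertools
from typing import Iterator


def send_offsets(frequency: float, max_messages: int) -> Iterator[float]:
    """Yield the offset, in seconds, at which each example message is sent.

    Hypothetical helper: messages are spaced 1 / frequency seconds apart,
    and a max_messages value smaller than 1 yields offsets forever.
    """
    interval = 1.0 / frequency
    # itertools.count() never stops; range() stops after max_messages items.
    indices = itertools.count() if max_messages < 1 else range(max_messages)
    for n in indices:
        yield n * interval


print(list(send_offsets(10.0, 3)))  # [0.0, 0.1, 0.2]
# Indefinite mode: take only the first five offsets.
print(len(list(itertools.islice(send_offsets(10.0, 0), 5))))  # 5
```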

min_sample_size = 2

Minimum sample size to compute statistics.

Given the size of the tumbling window and the frequency of incoming messages, this parameter sets the minimum sample size required to compute statistics. The Faust tumbling window always contains at least one message. If the number of messages in the tumbling window is smaller than min_sample_size, the values of the first message are used instead.

The default value, min_sample_size=2, makes sure that the standard deviation (stdev) can be computed.
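The fallback rule can be sketched for a single field in a single window as follows. The function aggregate_field and its output keys are hypothetical; the sketch only shows why a sample of two is the minimum for stdev, since Python's statistics.stdev() raises StatisticsError when given fewer than two values.

```python
import statistics
from typing import Dict, List


def aggregate_field(values: List[float], min_sample_size: int = 2) -> Dict[str, float]:
    """Summarize one field over one tumbling window (hypothetical helper).

    values holds the field's value from each message in the window;
    like a Faust tumbling window, it is assumed to be non-empty.
    """
    if len(values) < min_sample_size:
        # Too few samples: use the first message's value instead.
        return {"value": values[0]}
    return {
        "min": min(values),
        "max": max(values),
        "mean": statistics.mean(values),
        # statistics.stdev() raises StatisticsError with fewer than two values.
        "stdev": statistics.stdev(values),
    }


print(aggregate_field([3.0]))  # {'value': 3.0}
print(aggregate_field([1.0, 3.0])["mean"])  # 2.0
```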

nfields = 10

Number of fields for source topics used in the aggregation example.

ntopics = 10

Number of source topics used in the aggregation example.

registry_url = 'http://localhost:8081'

The Confluent Schema Registry URL.

Schema Registry used to read source topic schemas.

source_topic_name_prefix = 'example'

Prefix for the source topic name used in the aggregation example.

store = 'memory://'

The backend used for table storage.

Tables are stored in memory by default. In production, a persistent table store, such as rocksdb://, is preferred.

topic_partitions = 4

Default number of partitions for new topics.

This defines the maximum number of workers that can be used to distribute the application's workload, because each partition is consumed by at most one worker at a time.
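The reasoning behind this limit can be illustrated with a simplified round-robin assignment. The helper assign_partitions is hypothetical, and Kafka's real partition assignors differ in detail, but they share the rule that each partition has a single owner within a consumer group, so workers beyond the partition count receive nothing.

```python
from typing import Dict, List


def assign_partitions(num_partitions: int, workers: List[str]) -> Dict[str, List[int]]:
    """Spread partitions over workers round-robin, one owner per partition.

    Simplified illustration only; not Kafka's actual assignment algorithm.
    """
    assignment: Dict[str, List[int]] = {worker: [] for worker in workers}
    for partition in range(num_partitions):
        assignment[workers[partition % len(workers)]].append(partition)
    return assignment


# topic_partitions=4 with six workers: two workers receive nothing.
result = assign_partitions(4, [f"worker-{i}" for i in range(6)])
idle = [w for w, parts in result.items() if not parts]
print(idle)  # ['worker-4', 'worker-5']
```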

topic_regex = '^example-[0-9][0-9][0-9]?$'

Regex to select source topics to aggregate.
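A minimal sketch of topic selection with the default topic_regex follows. The helper select_source_topics is hypothetical, and it assumes that excluded_topics (present in the class signature but not documented on this page) lists topic names to skip even when they match the regex.

```python
import re
from typing import Iterable, List


def select_source_topics(
    topics: Iterable[str],
    topic_regex: str = "^example-[0-9][0-9][0-9]?$",
    excluded_topics: Iterable[str] = (),
) -> List[str]:
    """Return the topics that match topic_regex and are not excluded."""
    pattern = re.compile(topic_regex)
    excluded = set(excluded_topics)
    return [t for t in topics if pattern.match(t) and t not in excluded]


topics = ["example-000", "example-001", "example-1",
          "example-000-aggregated", "other-000"]
print(select_source_topics(topics, excluded_topics=["example-001"]))
# ['example-000']
```

The default pattern requires two or three digits after "example-", and its `$` anchor means that aggregated topic names such as example-000-aggregated do not match.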

topic_rename_format = '{source_topic_name}-aggregated'

A format string for the aggregation topic name, which must contain {source_topic_name} as a placeholder for the source topic name.
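Because the value is a Python format string, the aggregation topic name can be derived with str.format(). The helper aggregated_topic_name and its validation check are hypothetical, shown only to make the placeholder requirement concrete.

```python
def aggregated_topic_name(
    source_topic_name: str,
    topic_rename_format: str = "{source_topic_name}-aggregated",
) -> str:
    """Build the aggregation topic name from the source topic name."""
    if "{source_topic_name}" not in topic_rename_format:
        raise ValueError("topic_rename_format must contain {source_topic_name}")
    return topic_rename_format.format(source_topic_name=source_topic_name)


print(aggregated_topic_name("example-000"))  # example-000-aggregated
print(aggregated_topic_name("example-000", "agg.{source_topic_name}"))  # agg.example-000
```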

window_expires = 1.0

Window expiration time in seconds. This parameter controls when the callback function to process the expired window(s) is called.

The default value is set to the window size, which means that at least two tumbling windows will be filled up with messages before the callback function is called to process the expired window(s).

Note that if the worker (or the producer) stops, the next time the callback is called it might process windows from previous executions, because Faust persists the messages from the stream.

window_size = 1.0

Size of the tumbling window in seconds used to aggregate messages.

See also Faust’s windowing feature documentation.
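To make "tumbling window" concrete, the sketch below groups timestamped values into contiguous, non-overlapping windows of window_size seconds. It is a generic illustration with a hypothetical tumbling_windows helper; Faust manages windows internally, and this is not its implementation.

```python
import math
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def tumbling_windows(
    events: Iterable[Tuple[float, float]], window_size: float = 1.0
) -> Dict[float, List[float]]:
    """Group (timestamp, value) pairs into tumbling windows.

    Windows are contiguous, non-overlapping intervals
    [start, start + window_size); each is keyed by its start time.
    """
    windows: Dict[float, List[float]] = defaultdict(list)
    for timestamp, value in events:
        start = math.floor(timestamp / window_size) * window_size
        windows[start].append(value)
    return dict(windows)


events = [(0.1, 1.0), (0.9, 2.0), (1.0, 3.0), (2.5, 4.0)]
print(tumbling_windows(events))
# {0.0: [1.0, 2.0], 1.0: [3.0], 2.0: [4.0]}
```

A message whose timestamp falls exactly on a boundary, such as 1.0 here, starts the next window rather than closing the previous one.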