How to run kafka-aggregator¶
Running locally with docker-compose¶
In this guide, we use docker-compose to illustrate how to run kafka-aggregator. To run kafka-aggregator on a Kubernetes environment see the Installation guide section instead.
kafka-aggregator docker-compose configuration includes services to run Confluent Kafka (zookeeper, broker, schema-registry and control-center) and was based on this example.
Clone the kafka-aggregator repository:
$ git clone https://github.com/lsst-sqre/kafka-aggregator.git
Start the zookeeper, broker, schema-registry, internal-schema-registry and control-center services:
docker-compose up zookeeper broker schema-registry internal-schema-registry control-center
You can check the status of the Kafka cluster by opening Confluent Control Center in the browser.
On another terminal session, create a new Python virtual environment and install kafka-aggregator locally:
$ cd kafka-aggregator
$ virtualenv -p Python3 venv
$ source venv/bin/activate
$ make update
Initializing source topics¶
Note
In a production environment we expect that the source topics already exist in Kafka and that their Avro schemas are available from the Schema Registry.
Using the kafka-aggregator example module, you can initialize source topics in Kafka, control the number of fields in each topic, and produce messages for those topics at a given frequency.
With the default Configuration settings, this command will initialize 10 source topics with 10 fields each and register their Avro schemas with the Schema Registry.
kafkaaggregator -l info init-example
You can check that the source topics were created in Kafka:
docker-compose exec broker /bin/bash
root@broker:/# kafka-topics --bootstrap-server broker:29092 --list
The Avro schemas were registered with the Schema Registry:
curl http://localhost:8081/subjects
Generating Faust agents¶
Use this command to generate the Faust agents to process the source topics.
kafkaaggregator -l info generate-agents
Note
By default agents are generated under the ./agents folder where kafka-aggregator runs.
For the source topics initialized with the kafka-aggregator example module you should have this output:
kafkaaggregator -l info agents
[2020-07-06 18:30:58,115] [54727] [INFO] [^Worker]: Starting...
┌Agents─────────────────────────────┬─────────────┬──────────────────────────────────────────────────────┐
│ name                              │ topic       │ help                                                 │
├───────────────────────────────────┼─────────────┼──────────────────────────────────────────────────────┤
│ @example-000.process_source_topic │ example-000 │ Process incoming messages for the example-000 topic. │
│ @example-001.process_source_topic │ example-001 │ Process incoming messages for the example-001 topic. │
│ @example-002.process_source_topic │ example-002 │ Process incoming messages for the example-002 topic. │
│ @example-003.process_source_topic │ example-003 │ Process incoming messages for the example-003 topic. │
│ @example-004.process_source_topic │ example-004 │ Process incoming messages for the example-004 topic. │
│ @example-005.process_source_topic │ example-005 │ Process incoming messages for the example-005 topic. │
│ @example-006.process_source_topic │ example-006 │ Process incoming messages for the example-006 topic. │
│ @example-007.process_source_topic │ example-007 │ Process incoming messages for the example-007 topic. │
│ @example-008.process_source_topic │ example-008 │ Process incoming messages for the example-008 topic. │
│ @example-009.process_source_topic │ example-009 │ Process incoming messages for the example-009 topic. │
└───────────────────────────────────┴─────────────┴──────────────────────────────────────────────────────┘
[2020-07-06 18:30:58,153] [54727] [INFO] [^Worker]: Stopping...
[2020-07-06 18:30:58,153] [54727] [INFO] [^Worker]: Gathering service tasks...
[2020-07-06 18:30:58,153] [54727] [INFO] [^Worker]: Gathering all futures...
[2020-07-06 18:30:59,156] [54727] [INFO] [^Worker]: Closing event loop
Starting a worker¶
Use this command to start a kafka-aggregator worker:
kafkaaggregator -l info worker
Producing messages¶
On another terminal use this command to produce messages for the source topics. This command produces 6000 messages at 10Hz.
kafkaaggregator -l info produce --frequency 10 --max-messages 6000
You can use Confluent Control Center to inspect the messages for the source and aggregation topics or use the following from the command line:
docker-compose exec broker /bin/bash
root@broker:/# kafka-console-consumer --bootstrap-server broker:9092 --topic example-000
...
root@broker:/# kafka-console-consumer --bootstrap-server broker:9092 --topic example-000-aggregated
Inspecting the consumer lag¶
An important aspect to look at is the consumer lag for the kafkaaggregator consumers. An advantage of Faust is that you can easily add more workers to distribute the workload of the application. If the source topics are created with multiple partitions, individual partitions are assigned to different workers.
Internal vs. external managed topics¶
Faust manages topics declared as internal by the agents, like the aggregation topic, which is created by Faust and whose schema is also controlled by a Faust Record.
The kafka-aggregator example also demonstrates that we can aggregate source topics that are declared as external, i.e. not managed by Faust.  The agents assume that external topics exist and the messages can be deserialized using the Avro schemas, without specifying a model for the external topic in Faust.