#!/bin/bash set -euo pipefail # Retention: messages older than 2 days are deleted (172800000 ms = 2 days) RETENTION_MS=172800000 KAFKA_BOOTSTRAP_SERVER="kafka:29092" wait_for_topic() { local topic="$1" # Topic metadata can take a moment to become available after creation. for i in $(seq 1 30); do if /opt/kafka/bin/kafka-topics.sh \ --bootstrap-server "$KAFKA_BOOTSTRAP_SERVER" \ --topic "$topic" \ --describe >/dev/null 2>&1; then return 0 fi sleep 1 done echo "timed out waiting for topic '$topic' to be available" >&2 return 1 } # create topic rawbeacons /opt/kafka/bin/kafka-topics.sh --bootstrap-server "$KAFKA_BOOTSTRAP_SERVER" \ --create --if-not-exists --topic rawbeacons \ --partitions 1 --replication-factor 1 --config retention.ms=$RETENTION_MS # create topic apibeacons /opt/kafka/bin/kafka-topics.sh --bootstrap-server "$KAFKA_BOOTSTRAP_SERVER" \ --create --if-not-exists --topic apibeacons \ --partitions 1 --replication-factor 1 --config retention.ms=$RETENTION_MS # create topic alertBeacons /opt/kafka/bin/kafka-topics.sh --bootstrap-server "$KAFKA_BOOTSTRAP_SERVER" \ --create --if-not-exists --topic alertbeacons \ --partitions 1 --replication-factor 1 --config retention.ms=$RETENTION_MS # create topic locevents /opt/kafka/bin/kafka-topics.sh --bootstrap-server "$KAFKA_BOOTSTRAP_SERVER" \ --create --if-not-exists --topic locevents \ --partitions 1 --replication-factor 1 --config retention.ms=$RETENTION_MS # create topic settings /opt/kafka/bin/kafka-topics.sh --bootstrap-server "$KAFKA_BOOTSTRAP_SERVER" \ --create --if-not-exists --topic settings \ --partitions 1 --replication-factor 1 --config retention.ms=$RETENTION_MS # create topic alert /opt/kafka/bin/kafka-topics.sh --bootstrap-server "$KAFKA_BOOTSTRAP_SERVER" \ --create --if-not-exists --topic alert \ --partitions 1 --replication-factor 1 --config retention.ms=$RETENTION_MS # create topic healthlocation /opt/kafka/bin/kafka-topics.sh --bootstrap-server "$KAFKA_BOOTSTRAP_SERVER" \ --create --if-not-exists --topic healthlocation \ --partitions 1 --replication-factor 1 --config retention.ms=$RETENTION_MS # create topic healthdecoder /opt/kafka/bin/kafka-topics.sh --bootstrap-server "$KAFKA_BOOTSTRAP_SERVER" \ --create --if-not-exists --topic healthdecoder \ --partitions 1 --replication-factor 1 --config retention.ms=$RETENTION_MS # create topic healthbridge /opt/kafka/bin/kafka-topics.sh --bootstrap-server "$KAFKA_BOOTSTRAP_SERVER" \ --create --if-not-exists --topic healthbridge \ --partitions 1 --replication-factor 1 --config retention.ms=$RETENTION_MS # create topic parser (server writes parser config, decoder reads it) /opt/kafka/bin/kafka-topics.sh --bootstrap-server "$KAFKA_BOOTSTRAP_SERVER" \ --create --if-not-exists --topic parser \ --partitions 1 --replication-factor 1 --config retention.ms=$RETENTION_MS # create topic mqtt (bridge reads/writes MQTT event payloads) /opt/kafka/bin/kafka-topics.sh --bootstrap-server "$KAFKA_BOOTSTRAP_SERVER" \ --create --if-not-exists --topic mqtt \ --partitions 1 --replication-factor 1 --config retention.ms=$RETENTION_MS wait_for_topic rawbeacons wait_for_topic apibeacons wait_for_topic alertbeacons wait_for_topic locevents wait_for_topic settings wait_for_topic alert wait_for_topic healthlocation wait_for_topic healthdecoder wait_for_topic healthbridge wait_for_topic parser wait_for_topic mqtt