Kafka Cluster using Containers - p2
Kafka Schema Registry and REST API
We bring these two up first and then the Kafka Connect containers, since those depend on the registry.
On kfk234 (the fourth server) within the /data/kafka directory on the second drive.
First, an environment file holding the keystore, truststore and key passwords. It sits in the same directory as the compose file, where Docker Compose reads it to fill in the ${...} placeholders.
.env
# Passwords for the JKS keystore, the JKS truststore and the private key inside the keystore.
# They must match the passwords the stores were created with. Password123 is a weak, guessable
# value; outside a lab, create the stores with strong, distinct passwords.
# Compose reads this file in clear text for ${...} substitution: keep it out of version control
# and readable only by the deploying user (for example chmod 600 .env).
SSL_KEYSTORE_PASSWD=Password123
SSL_TRUSTSTORE_PASSWD=Password123
SSL_KEY_PASSWD=Password123
kfk234-schema-reg-rest.yaml
# Compose project for kfk234: Confluent Schema Registry (port 8081) and Kafka REST Proxy
# (port 8082). Both talk to the brokers on 9093 over TLS, but both serve their own HTTP API
# in plain text on every host interface.
version: '2.4'
name: sch-reg-rest-pxy
services:
  # https://github.com/confluentinc/schema-registry
  schema-registry:
    image: confluentinc/cp-schema-registry:6.2.1
    hostname: kfk234.domain.dom
    container_name: schema-registry
    # Host networking: the listener below binds directly on the host's interfaces, so only
    # the host firewall limits who can reach it.
    network_mode: host
    restart: "unless-stopped"
    volumes:
      # Keystore and truststore (JKS) for the connection to the brokers.
      # NOTE(review): mounted read-write; a read-only mount (:ro) looks sufficient - confirm.
      - ./secrets:/etc/schema-registry/secrets
    # env_file injects the file's variables into the container. It is separate from the .env
    # file that Compose reads for the ${...} substitution further down.
    # NOTE(review): .env_sch_reg_rest_pxy is not shown on this page; `docker compose up`
    # fails when an env_file is missing - confirm it exists or drop this line.
    env_file: .env_sch_reg_rest_pxy
    environment:
      # Brokers are reached on their TLS listeners (9093); the PLAINTEXT variant is kept
      # commented out below.
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'SSL://kfk231:9093,SSL://kfk232:9093,SSL://kfk233:9093'
      #SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://kfk231:9092,PLAINTEXT://kfk232:9092,PLAINTEXT://kfk233:9092'
      # Plain HTTP on all interfaces, with no authentication configured in this file: anyone
      # who can reach port 8081 can read, register and delete schemas. Limit access with the
      # host firewall, or move to an https:// listener.
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
      SCHEMA_REGISTRY_HOST_NAME: "192.168.122.234"
      # KAFKASTORE_* settings cover the registry's own client connection to the brokers.
      SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: SSL
      SCHEMA_REGISTRY_KAFKASTORE_SSL_KEYSTORE_LOCATION: /etc/schema-registry/secrets/kafka.kfk-cluster.keystore.jks
      # NOTE(review): the SCHEMA_REGISTRY_SSL_* settings without the KAFKASTORE prefix are
      # for an HTTPS listener; with the http:// listener above they are presumably unused -
      # confirm.
      SCHEMA_REGISTRY_SSL_KEYSTORE_LOCATION: /etc/schema-registry/secrets/kafka.kfk-cluster.keystore.jks
      # ${...} values are filled in by Compose from .env. The resolved passwords end up in the
      # container environment, where `docker inspect` shows them to anyone with access to the
      # Docker daemon.
      SCHEMA_REGISTRY_KAFKASTORE_SSL_KEYSTORE_PASSWORD: ${SSL_KEYSTORE_PASSWD}
      SCHEMA_REGISTRY_SSL_KEYSTORE_PASSWORD: ${SSL_KEYSTORE_PASSWD}
      SCHEMA_REGISTRY_KAFKASTORE_SSL_KEY_PASSWORD: ${SSL_KEY_PASSWD}
      SCHEMA_REGISTRY_SSL_KEY_PASSWORD: ${SSL_KEY_PASSWD}
      SCHEMA_REGISTRY_KAFKASTORE_SSL_TRUSTSTORE_LOCATION: /etc/schema-registry/secrets/kafka.kfk-cluster.truststore.jks
      SCHEMA_REGISTRY_SSL_TRUSTSTORE_LOCATION: /etc/schema-registry/secrets/kafka.kfk-cluster.truststore.jks
      SCHEMA_REGISTRY_KAFKASTORE_SSL_TRUSTSTORE_PASSWORD: ${SSL_TRUSTSTORE_PASSWD}
      SCHEMA_REGISTRY_SSL_TRUSTSTORE_PASSWORD: ${SSL_TRUSTSTORE_PASSWD}
      # Matches the http:// listener: traffic between registry instances is not encrypted.
      SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: http
      SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _allschemas
      # No client certificate is requested from callers of the registry API.
      SCHEMA_REGISTRY_SSL_CLIENT_AUTHENTICATION: NONE
      # An empty value switches off hostname verification of the broker certificates: any
      # certificate that chains to the truststore is accepted, whichever host presents it.
      # Set it to https once the broker certificates carry matching SAN entries.
      SCHEMA_REGISTRY_KAFKASTORE_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ""
    # Static name-to-IP entries for all four hosts and their aliases.
    extra_hosts:
      - "kfk231:192.168.122.231"
      - "kfk231-kafka:192.168.122.231"
      - "kfk231-zk:192.168.122.231"
      - "kfk231-cnt:192.168.122.231"
      - "kfk231-reg:192.168.122.231"
      - "kfk231.domain.dom:192.168.122.231"
      - "kfk232:192.168.122.232"
      - "kfk232-kafka:192.168.122.232"
      - "kfk232-zk:192.168.122.232"
      - "kfk232-cnt:192.168.122.232"
      - "kfk232-reg:192.168.122.232"
      - "kfk232.domain.dom:192.168.122.232"
      - "kfk233:192.168.122.233"
      - "kfk233-kafka:192.168.122.233"
      - "kfk233-zk:192.168.122.233"
      - "kfk233-cnt:192.168.122.233"
      - "kfk233-reg:192.168.122.233"
      - "kfk233.domain.dom:192.168.122.233"
      - "kfk234:192.168.122.234"
      - "kfk234.domain.dom:192.168.122.234"
  # https://github.com/confluentinc/kafka-rest
  rest-proxy:
    image: confluentinc/cp-kafka-rest:6.2.1
    hostname: kfk234.domain.dom
    container_name: rest-proxy
    # Host networking: port 8082 is opened directly on the host's interfaces.
    network_mode: host
    restart: "unless-stopped"
    environment:
      KAFKA_REST_BOOTSTRAP_SERVERS: 'SSL://kfk231:9093,SSL://kfk232:9093,SSL://kfk233:9093'
      # Plain HTTP on all interfaces with no authentication configured in this file. The
      # proxy connects to the brokers with the client keystore below, so every caller that
      # can reach port 8082 produces and consumes with that certificate's permissions.
      # Restrict access with the host firewall or an authenticating reverse proxy.
      KAFKA_REST_LISTENERS: http://0.0.0.0:8082/
      # The registry is queried over plain HTTP.
      KAFKA_REST_SCHEMA_REGISTRY_URL: http://kfk234:8081/
      KAFKA_REST_HOST_NAME: kfk234 # "192.168.122.234"
      # CLIENT_* settings cover the proxy's own connection to the brokers.
      KAFKA_REST_CLIENT_SECURITY_PROTOCOL: SSL
      KAFKA_REST_CLIENT_SSL_KEYSTORE_LOCATION: /etc/rest-proxy/secrets/kafka.kfk-cluster.keystore.jks
      # Filled in from .env by Compose; visible afterwards through `docker inspect`.
      KAFKA_REST_CLIENT_SSL_KEYSTORE_PASSWORD: ${SSL_KEYSTORE_PASSWD}
      KAFKA_REST_CLIENT_SSL_TRUSTSTORE_LOCATION: /etc/rest-proxy/secrets/kafka.kfk-cluster.truststore.jks
      KAFKA_REST_CLIENT_SSL_TRUSTSTORE_PASSWORD: ${SSL_TRUSTSTORE_PASSWD}
      KAFKA_REST_CLIENT_SSL_KEY_PASSWORD: ${SSL_KEY_PASSWD}
      # An empty value switches off hostname verification of the broker certificates; set it
      # to https once the certificates carry matching SAN entries.
      KAFKA_REST_CLIENT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ""
    # Controls start order only; it does not wait for the registry to be ready.
    depends_on:
      - schema-registry
    volumes:
      # Same JKS files as the registry.
      # NOTE(review): mounted read-write; a read-only mount (:ro) looks sufficient - confirm.
      - ./secrets:/etc/rest-proxy/secrets
    # Static name-to-IP entries for all four hosts and their aliases.
    extra_hosts:
      - "kfk231:192.168.122.231"
      - "kfk231-kafka:192.168.122.231"
      - "kfk231-zk:192.168.122.231"
      - "kfk231-cnt:192.168.122.231"
      - "kfk231-reg:192.168.122.231"
      - "kfk231.domain.dom:192.168.122.231"
      - "kfk232:192.168.122.232"
      - "kfk232-kafka:192.168.122.232"
      - "kfk232-zk:192.168.122.232"
      - "kfk232-cnt:192.168.122.232"
      - "kfk232-reg:192.168.122.232"
      - "kfk232.domain.dom:192.168.122.232"
      - "kfk233:192.168.122.233"
      - "kfk233-kafka:192.168.122.233"
      - "kfk233-zk:192.168.122.233"
      - "kfk233-cnt:192.168.122.233"
      - "kfk233-reg:192.168.122.233"
      - "kfk233.domain.dom:192.168.122.233"
      - "kfk234:192.168.122.234"
      - "kfk234.domain.dom:192.168.122.234"
# Start both services in the background. Compose fills in ${SSL_*_PASSWD} from .env at this
# point; a variable it cannot find is replaced by an empty string with only a warning.
docker compose -f kfk234-schema-reg-rest.yaml up -d
tree -d -L 2
.
└── kafka
├── certificates
└── secrets
Kafka Connect
The same setup is used on each of the three cluster nodes (kfk231, kfk232, kfk233).
Working in /data/kafka directory
We need a .env file (in the same directory as the compose file) to hold the passwords.
.env
# Passwords for the JKS keystore, the JKS truststore and the private key inside the keystore.
# They must match the passwords the stores were created with. Password123 is a weak, guessable
# value; outside a lab, create the stores with strong, distinct passwords.
# Compose reads this file in clear text for ${...} substitution: keep it out of version control
# and readable only by the deploying user (for example chmod 600 .env).
SSL_KEYSTORE_PASSWD=Password123
SSL_TRUSTSTORE_PASSWD=Password123
SSL_KEY_PASSWD=Password123
On kfk231 - kfk231-connect-compose.yaml
# Kafka Connect worker for host kfk231. It reaches the brokers on 9093 over TLS and the
# Schema Registry on kfk234 over plain HTTP, and exposes the Connect REST API on port 8083.
version: '2.4'
name: kfk231-cnt
services:
  kfk231-cnt:
    image: confluentinc/cp-kafka-connect:6.2.1
    hostname: kfk231.domain.dom
    container_name: kfk231-cnt
    # Host networking: the REST port below is opened directly on the host, so only the host
    # firewall limits who can reach it.
    network_mode: host
    restart: "unless-stopped"
    volumes:
      # This directory is on CONNECT_PLUGIN_PATH, so jars placed in it are loaded as code by
      # the worker. Keep it writable by administrators only.
      - ./connectors:/etc/kafka-connect/jars/
      # Keystore and truststore (JKS).
      # NOTE(review): mounted read-write; a read-only mount (:ro) looks sufficient - confirm.
      - ./secrets:/var/lib/kafka/ssl/
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'kfk231:9093,kfk232:9093,kfk233:9093'
      # Connect REST API. No authentication or TLS for it is configured in this file, and the
      # API can create, change and delete connectors - restrict port 8083 to admin hosts.
      CONNECT_REST_PORT: 8083
      # Workers that share this group id and the three storage topics form one Connect cluster.
      CONNECT_GROUP_ID: connect-cluster-1
      CONNECT_CONFIG_STORAGE_TOPIC: connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: connect-status
      # Avro converters fetch and register schemas over plain HTTP.
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://kfk234:8081'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://kfk234:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      # Name the other workers use to reach this worker's REST API.
      CONNECT_REST_ADVERTISED_HOST_NAME: "kfk231"
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR,org.apache.kafka.connect=INFO"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "3"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "3"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "3"
      CONNECT_PLUGIN_PATH: '/usr/share/java,/etc/kafka-connect/jars,/usr/share/confluent-hub-components'
      # CONNECT_KAFKA_HEAP_OPTS: "-Xms4G -Xmx8G"
      CONNECT_OFFSET_FLUSH_TIMEOUT_MS: 30000
      # TLS settings for the worker's own broker connections.
      # NOTE(review): no CONNECT_PRODUCER_* / CONNECT_CONSUMER_* security protocol or
      # keystore settings are set; connector clients normally need their own prefixed
      # copies - confirm how connectors authenticate to the brokers.
      CONNECT_SECURITY_PROTOCOL: SSL
      CONNECT_SSL_KEYSTORE_LOCATION: /var/lib/kafka/ssl/kafka.kfk-cluster.keystore.jks
      # Filled in from .env by Compose; the resolved passwords are visible afterwards through
      # `docker inspect` to anyone with access to the Docker daemon.
      CONNECT_SSL_KEYSTORE_PASSWORD: ${SSL_KEYSTORE_PASSWD}
      CONNECT_SSL_TRUSTSTORE_PASSWORD: ${SSL_TRUSTSTORE_PASSWD}
      CONNECT_SSL_KEY_PASSWORD: ${SSL_KEY_PASSWD}
      CONNECT_SSL_TRUSTSTORE_LOCATION: /var/lib/kafka/ssl/kafka.kfk-cluster.truststore.jks
      CONNECT_SSL_ENABLED_PROTOCOLS: "TLSv1.2"
      # Empty values switch off hostname verification of the broker certificates for the
      # worker, consumer and producer clients: any certificate that chains to the truststore
      # is accepted, whichever host presents it. Set them to https once the broker
      # certificates carry matching SAN entries.
      CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ""
      CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ""
      CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ""
      # "All" lets a connector configuration override any producer/consumer client setting.
      # With an unauthenticated REST API, whoever can reach port 8083 picks those settings;
      # "Principal" or "None" narrows this.
      CONNECT_CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY: "All"
    # Static name-to-IP entries for all four hosts and their aliases.
    extra_hosts:
      - "kfk231:192.168.122.231"
      - "kfk231-kafka:192.168.122.231"
      - "kfk231-zk:192.168.122.231"
      - "kfk231-cnt:192.168.122.231"
      - "kfk231-reg:192.168.122.231"
      - "kfk231.domain.dom:192.168.122.231"
      - "kfk232:192.168.122.232"
      - "kfk232-kafka:192.168.122.232"
      - "kfk232-zk:192.168.122.232"
      - "kfk232-cnt:192.168.122.232"
      - "kfk232-reg:192.168.122.232"
      - "kfk232.domain.dom:192.168.122.232"
      - "kfk233:192.168.122.233"
      - "kfk233-kafka:192.168.122.233"
      - "kfk233-zk:192.168.122.233"
      - "kfk233-cnt:192.168.122.233"
      - "kfk233-reg:192.168.122.233"
      - "kfk233.domain.dom:192.168.122.233"
      - "kfk234:192.168.122.234"
      - "kfk234.domain.dom:192.168.122.234"
# Start the Connect worker in the background. Compose fills in ${SSL_*_PASSWD} from .env at
# this point; a variable it cannot find is replaced by an empty string with only a warning.
docker compose -f kfk231-connect-compose.yaml up -d
On kfk232 - kfk232-connect-compose.yaml
# Kafka Connect worker for host kfk232. It reaches the brokers on 9093 over TLS and the
# Schema Registry on kfk234 over plain HTTP, and exposes the Connect REST API on port 8083.
version: '2.4'
name: kfk232-cnt
services:
  kfk232-cnt:
    image: confluentinc/cp-kafka-connect:6.2.1
    hostname: kfk232.domain.dom
    container_name: kfk232-cnt
    # Host networking: the REST port below is opened directly on the host, so only the host
    # firewall limits who can reach it.
    network_mode: host
    restart: "unless-stopped"
    volumes:
      # This directory is on CONNECT_PLUGIN_PATH, so jars placed in it are loaded as code by
      # the worker. Keep it writable by administrators only.
      - ./connectors:/etc/kafka-connect/jars/
      # Keystore and truststore (JKS).
      # NOTE(review): mounted read-write; a read-only mount (:ro) looks sufficient - confirm.
      - ./secrets:/var/lib/kafka/ssl/
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'kfk231:9093,kfk232:9093,kfk233:9093'
      # Connect REST API. No authentication or TLS for it is configured in this file, and the
      # API can create, change and delete connectors - restrict port 8083 to admin hosts.
      CONNECT_REST_PORT: 8083
      # Workers that share this group id and the three storage topics form one Connect cluster.
      CONNECT_GROUP_ID: connect-cluster-1
      CONNECT_CONFIG_STORAGE_TOPIC: connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: connect-status
      # Avro converters fetch and register schemas over plain HTTP.
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://kfk234:8081'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://kfk234:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      # Name the other workers use to reach this worker's REST API.
      CONNECT_REST_ADVERTISED_HOST_NAME: "kfk232"
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR,org.apache.kafka.connect=INFO"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "3"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "3"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "3"
      CONNECT_PLUGIN_PATH: '/usr/share/java,/etc/kafka-connect/jars,/usr/share/confluent-hub-components'
      # CONNECT_KAFKA_HEAP_OPTS: "-Xms4G -Xmx8G"
      CONNECT_OFFSET_FLUSH_TIMEOUT_MS: 30000
      # TLS settings for the worker's own broker connections.
      # NOTE(review): no CONNECT_PRODUCER_* / CONNECT_CONSUMER_* security protocol or
      # keystore settings are set; connector clients normally need their own prefixed
      # copies - confirm how connectors authenticate to the brokers.
      CONNECT_SECURITY_PROTOCOL: SSL
      CONNECT_SSL_KEYSTORE_LOCATION: /var/lib/kafka/ssl/kafka.kfk-cluster.keystore.jks
      # Filled in from .env by Compose; the resolved passwords are visible afterwards through
      # `docker inspect` to anyone with access to the Docker daemon.
      CONNECT_SSL_KEYSTORE_PASSWORD: ${SSL_KEYSTORE_PASSWD}
      CONNECT_SSL_TRUSTSTORE_PASSWORD: ${SSL_TRUSTSTORE_PASSWD}
      CONNECT_SSL_KEY_PASSWORD: ${SSL_KEY_PASSWD}
      CONNECT_SSL_TRUSTSTORE_LOCATION: /var/lib/kafka/ssl/kafka.kfk-cluster.truststore.jks
      CONNECT_SSL_ENABLED_PROTOCOLS: "TLSv1.2"
      # Empty values switch off hostname verification of the broker certificates for the
      # worker, consumer and producer clients: any certificate that chains to the truststore
      # is accepted, whichever host presents it. Set them to https once the broker
      # certificates carry matching SAN entries.
      CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ""
      CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ""
      CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ""
      # "All" lets a connector configuration override any producer/consumer client setting.
      # With an unauthenticated REST API, whoever can reach port 8083 picks those settings;
      # "Principal" or "None" narrows this.
      CONNECT_CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY: "All"
    # Static name-to-IP entries for all four hosts and their aliases.
    extra_hosts:
      - "kfk231:192.168.122.231"
      - "kfk231-kafka:192.168.122.231"
      - "kfk231-zk:192.168.122.231"
      - "kfk231-cnt:192.168.122.231"
      - "kfk231-reg:192.168.122.231"
      - "kfk231.domain.dom:192.168.122.231"
      - "kfk232:192.168.122.232"
      - "kfk232-kafka:192.168.122.232"
      - "kfk232-zk:192.168.122.232"
      - "kfk232-cnt:192.168.122.232"
      - "kfk232-reg:192.168.122.232"
      - "kfk232.domain.dom:192.168.122.232"
      - "kfk233:192.168.122.233"
      - "kfk233-kafka:192.168.122.233"
      - "kfk233-zk:192.168.122.233"
      - "kfk233-cnt:192.168.122.233"
      - "kfk233-reg:192.168.122.233"
      - "kfk233.domain.dom:192.168.122.233"
      - "kfk234:192.168.122.234"
      - "kfk234.domain.dom:192.168.122.234"
# Start the Connect worker in the background. Compose fills in ${SSL_*_PASSWD} from .env at
# this point; a variable it cannot find is replaced by an empty string with only a warning.
docker compose -f kfk232-connect-compose.yaml up -d
On kfk233 - kfk233-connect-compose.yaml
# Kafka Connect worker for host kfk233. It reaches the brokers on 9093 over TLS and the
# Schema Registry on kfk234 over plain HTTP, and exposes the Connect REST API on port 8083.
version: '2.4'
name: kfk233-cnt
services:
  kfk233-cnt:
    image: confluentinc/cp-kafka-connect:6.2.1
    hostname: kfk233.domain.dom
    container_name: kfk233-cnt
    # Host networking: the REST port below is opened directly on the host, so only the host
    # firewall limits who can reach it.
    network_mode: host
    restart: "unless-stopped"
    volumes:
      # This directory is on CONNECT_PLUGIN_PATH, so jars placed in it are loaded as code by
      # the worker. Keep it writable by administrators only.
      - ./connectors:/etc/kafka-connect/jars/
      # Keystore and truststore (JKS).
      # NOTE(review): mounted read-write; a read-only mount (:ro) looks sufficient - confirm.
      - ./secrets:/var/lib/kafka/ssl/
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'kfk231:9093,kfk232:9093,kfk233:9093'
      # Connect REST API. No authentication or TLS for it is configured in this file, and the
      # API can create, change and delete connectors - restrict port 8083 to admin hosts.
      CONNECT_REST_PORT: 8083
      # Workers that share this group id and the three storage topics form one Connect cluster.
      CONNECT_GROUP_ID: connect-cluster-1
      CONNECT_CONFIG_STORAGE_TOPIC: connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: connect-status
      # Avro converters fetch and register schemas over plain HTTP.
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://kfk234:8081'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://kfk234:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      # Name the other workers use to reach this worker's REST API.
      CONNECT_REST_ADVERTISED_HOST_NAME: "kfk233"
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR,org.apache.kafka.connect=INFO"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "3"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "3"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "3"
      CONNECT_PLUGIN_PATH: '/usr/share/java,/etc/kafka-connect/jars,/usr/share/confluent-hub-components'
      # CONNECT_KAFKA_HEAP_OPTS: "-Xms4G -Xmx8G"
      CONNECT_OFFSET_FLUSH_TIMEOUT_MS: 30000
      # TLS settings for the worker's own broker connections.
      # NOTE(review): no CONNECT_PRODUCER_* / CONNECT_CONSUMER_* security protocol or
      # keystore settings are set; connector clients normally need their own prefixed
      # copies - confirm how connectors authenticate to the brokers.
      CONNECT_SECURITY_PROTOCOL: SSL
      CONNECT_SSL_KEYSTORE_LOCATION: /var/lib/kafka/ssl/kafka.kfk-cluster.keystore.jks
      # Filled in from .env by Compose; the resolved passwords are visible afterwards through
      # `docker inspect` to anyone with access to the Docker daemon.
      CONNECT_SSL_KEYSTORE_PASSWORD: ${SSL_KEYSTORE_PASSWD}
      CONNECT_SSL_TRUSTSTORE_PASSWORD: ${SSL_TRUSTSTORE_PASSWD}
      CONNECT_SSL_KEY_PASSWORD: ${SSL_KEY_PASSWD}
      CONNECT_SSL_TRUSTSTORE_LOCATION: /var/lib/kafka/ssl/kafka.kfk-cluster.truststore.jks
      CONNECT_SSL_ENABLED_PROTOCOLS: "TLSv1.2"
      # Empty values switch off hostname verification of the broker certificates for the
      # worker, consumer and producer clients: any certificate that chains to the truststore
      # is accepted, whichever host presents it. Set them to https once the broker
      # certificates carry matching SAN entries.
      CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ""
      CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ""
      CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ""
      # "All" lets a connector configuration override any producer/consumer client setting.
      # With an unauthenticated REST API, whoever can reach port 8083 picks those settings;
      # "Principal" or "None" narrows this.
      CONNECT_CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY: "All"
    # Static name-to-IP entries for all four hosts and their aliases.
    extra_hosts:
      - "kfk231:192.168.122.231"
      - "kfk231-kafka:192.168.122.231"
      - "kfk231-zk:192.168.122.231"
      - "kfk231-cnt:192.168.122.231"
      - "kfk231-reg:192.168.122.231"
      - "kfk231.domain.dom:192.168.122.231"
      - "kfk232:192.168.122.232"
      - "kfk232-kafka:192.168.122.232"
      - "kfk232-zk:192.168.122.232"
      - "kfk232-cnt:192.168.122.232"
      - "kfk232-reg:192.168.122.232"
      - "kfk232.domain.dom:192.168.122.232"
      - "kfk233:192.168.122.233"
      - "kfk233-kafka:192.168.122.233"
      - "kfk233-zk:192.168.122.233"
      - "kfk233-cnt:192.168.122.233"
      - "kfk233-reg:192.168.122.233"
      - "kfk233.domain.dom:192.168.122.233"
      - "kfk234:192.168.122.234"
      - "kfk234.domain.dom:192.168.122.234"
# Start the Connect worker in the background. Compose fills in ${SSL_*_PASSWD} from .env at
# this point; a variable it cannot find is replaced by an empty string with only a warning.
docker compose -f kfk233-connect-compose.yaml up -d