Kafka Cluster using Containers - p2

Kafka Schema Registry and REST API

We will bring these two up first, and then the Kafka Connect containers, since those depend on the Schema Registry.

Both run on kfk234 (the fourth server), from the /data/kafka directory on the second drive.

First, create an environment file holding the passwords for the certificate stores (the file lives in the same directory as the compose file):

.env

# Keystore, truststore and private-key passwords. The compose file reads them
# as ${SSL_KEYSTORE_PASSWD}, ${SSL_TRUSTSTORE_PASSWD} and ${SSL_KEY_PASSWD}.
SSL_KEYSTORE_PASSWD=Password123
SSL_TRUSTSTORE_PASSWD=Password123
SSL_KEY_PASSWD=Password123

kfk234-schema-reg-rest.yaml

# Compose project for kfk234; the schema-registry and rest-proxy services are
# defined under `services` below.
version: '2.4'
name: sch-reg-rest-pxy
services:

  # Confluent Schema Registry - https://github.com/confluentinc/schema-registry
  # Runs on the host network, listens on plain HTTP port 8081 and talks to the
  # three brokers (kfk231-kfk233, port 9093) over SSL using the JKS stores
  # mounted from ./secrets.
  schema-registry:
    image: confluentinc/cp-schema-registry:6.2.1
    hostname: kfk234.domain.dom
    container_name: schema-registry
    network_mode: host
    restart: "unless-stopped"
    volumes:
      - ./secrets:/etc/schema-registry/secrets
    # No env_file entry: the ${SSL_*_PASSWD} references below are substituted
    # by Compose from the .env file that sits next to this compose file, so the
    # container needs no additional environment file (same as rest-proxy).
    environment:
      # --- Connection to the Kafka cluster (schema storage) ---
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'SSL://kfk231:9093,SSL://kfk232:9093,SSL://kfk233:9093'
      #SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://kfk231:9092,PLAINTEXT://kfk232:9092,PLAINTEXT://kfk233:9092'
      SCHEMA_REGISTRY_KAFKASTORE_SECURITY_PROTOCOL: SSL
      SCHEMA_REGISTRY_KAFKASTORE_TOPIC: _allschemas
      SCHEMA_REGISTRY_KAFKASTORE_SSL_KEYSTORE_LOCATION: /etc/schema-registry/secrets/kafka.kfk-cluster.keystore.jks
      SCHEMA_REGISTRY_KAFKASTORE_SSL_KEYSTORE_PASSWORD: ${SSL_KEYSTORE_PASSWD}
      SCHEMA_REGISTRY_KAFKASTORE_SSL_KEY_PASSWORD: ${SSL_KEY_PASSWD}
      SCHEMA_REGISTRY_KAFKASTORE_SSL_TRUSTSTORE_LOCATION: /etc/schema-registry/secrets/kafka.kfk-cluster.truststore.jks
      SCHEMA_REGISTRY_KAFKASTORE_SSL_TRUSTSTORE_PASSWORD: ${SSL_TRUSTSTORE_PASSWD}
      # Empty value turns off broker hostname verification for the SSL client.
      SCHEMA_REGISTRY_KAFKASTORE_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ""

      # --- REST listener ---
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
      SCHEMA_REGISTRY_HOST_NAME: "192.168.122.234"
      SCHEMA_REGISTRY_SCHEMA_REGISTRY_INTER_INSTANCE_PROTOCOL: http
      # NOTE(review): the listener above is plain http, so the listener-side
      # SSL stores below presumably only matter once an https listener is
      # configured - confirm before relying on them.
      SCHEMA_REGISTRY_SSL_KEYSTORE_LOCATION: /etc/schema-registry/secrets/kafka.kfk-cluster.keystore.jks
      SCHEMA_REGISTRY_SSL_KEYSTORE_PASSWORD: ${SSL_KEYSTORE_PASSWD}
      SCHEMA_REGISTRY_SSL_KEY_PASSWORD: ${SSL_KEY_PASSWD}
      SCHEMA_REGISTRY_SSL_TRUSTSTORE_LOCATION: /etc/schema-registry/secrets/kafka.kfk-cluster.truststore.jks
      SCHEMA_REGISTRY_SSL_TRUSTSTORE_PASSWORD: ${SSL_TRUSTSTORE_PASSWD}
      SCHEMA_REGISTRY_SSL_CLIENT_AUTHENTICATION: NONE
    # Static name resolution for every cluster node and its service aliases.
    extra_hosts:
      - "kfk231:192.168.122.231"
      - "kfk231-kafka:192.168.122.231"
      - "kfk231-zk:192.168.122.231"
      - "kfk231-cnt:192.168.122.231"
      - "kfk231-reg:192.168.122.231"
      - "kfk231.domain.dom:192.168.122.231"
      - "kfk232:192.168.122.232"
      - "kfk232-kafka:192.168.122.232"
      - "kfk232-zk:192.168.122.232"
      - "kfk232-cnt:192.168.122.232"
      - "kfk232-reg:192.168.122.232"
      - "kfk232.domain.dom:192.168.122.232"
      - "kfk233:192.168.122.233"
      - "kfk233-kafka:192.168.122.233"
      - "kfk233-zk:192.168.122.233"
      - "kfk233-cnt:192.168.122.233"
      - "kfk233-reg:192.168.122.233"
      - "kfk233.domain.dom:192.168.122.233"
      - "kfk234:192.168.122.234"
      - "kfk234.domain.dom:192.168.122.234"

  # Confluent REST Proxy - https://github.com/confluentinc/kafka-rest
  # Plain-HTTP listener on port 8082; reaches the brokers over SSL and the
  # Schema Registry on kfk234:8081. Started after the schema-registry service.
  rest-proxy:
    image: confluentinc/cp-kafka-rest:6.2.1
    container_name: rest-proxy
    hostname: kfk234.domain.dom
    restart: unless-stopped
    network_mode: host
    depends_on:
      - schema-registry
    volumes:
      # JKS keystore/truststore referenced by the CLIENT_SSL_* settings below.
      - ./secrets:/etc/rest-proxy/secrets
    environment:
      # --- REST listener and identity ---
      KAFKA_REST_LISTENERS: 'http://0.0.0.0:8082/'
      # Alternative value kept from the original notes: "192.168.122.234"
      KAFKA_REST_HOST_NAME: 'kfk234'
      KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://kfk234:8081/'

      # --- Kafka client (SSL) ---
      KAFKA_REST_BOOTSTRAP_SERVERS: "SSL://kfk231:9093,SSL://kfk232:9093,SSL://kfk233:9093"
      KAFKA_REST_CLIENT_SECURITY_PROTOCOL: 'SSL'
      KAFKA_REST_CLIENT_SSL_KEYSTORE_LOCATION: '/etc/rest-proxy/secrets/kafka.kfk-cluster.keystore.jks'
      KAFKA_REST_CLIENT_SSL_KEYSTORE_PASSWORD: "${SSL_KEYSTORE_PASSWD}"
      KAFKA_REST_CLIENT_SSL_KEY_PASSWORD: "${SSL_KEY_PASSWD}"
      KAFKA_REST_CLIENT_SSL_TRUSTSTORE_LOCATION: '/etc/rest-proxy/secrets/kafka.kfk-cluster.truststore.jks'
      KAFKA_REST_CLIENT_SSL_TRUSTSTORE_PASSWORD: "${SSL_TRUSTSTORE_PASSWD}"
      # Empty value turns off broker hostname verification.
      KAFKA_REST_CLIENT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ''
    # Static name resolution for every cluster node and its service aliases.
    extra_hosts:
      - 'kfk231:192.168.122.231'
      - 'kfk231-kafka:192.168.122.231'
      - 'kfk231-zk:192.168.122.231'
      - 'kfk231-cnt:192.168.122.231'
      - 'kfk231-reg:192.168.122.231'
      - 'kfk231.domain.dom:192.168.122.231'
      - 'kfk232:192.168.122.232'
      - 'kfk232-kafka:192.168.122.232'
      - 'kfk232-zk:192.168.122.232'
      - 'kfk232-cnt:192.168.122.232'
      - 'kfk232-reg:192.168.122.232'
      - 'kfk232.domain.dom:192.168.122.232'
      - 'kfk233:192.168.122.233'
      - 'kfk233-kafka:192.168.122.233'
      - 'kfk233-zk:192.168.122.233'
      - 'kfk233-cnt:192.168.122.233'
      - 'kfk233-reg:192.168.122.233'
      - 'kfk233.domain.dom:192.168.122.233'
      - 'kfk234:192.168.122.234'
      - 'kfk234.domain.dom:192.168.122.234'
docker compose -f kfk234-schema-reg-rest.yaml up -d
tree -d -L 2
.
└── kafka
    ├── certificates
    └── secrets

Kafka Connect

The setup is the same on each of the three cluster nodes (kfk231, kfk232, kfk233).

We are working in the /data/kafka directory.

We need a .env file (in the same directory as the compose file) to hold the passwords.

.env

# Keystore, truststore and private-key passwords. The Connect compose files
# read them as ${SSL_KEYSTORE_PASSWD}, ${SSL_TRUSTSTORE_PASSWD} and
# ${SSL_KEY_PASSWD}.
SSL_KEYSTORE_PASSWD=Password123
SSL_TRUSTSTORE_PASSWD=Password123
SSL_KEY_PASSWD=Password123

On kfk231 - kfk231-connect-compose.yaml

# Kafka Connect worker for kfk231. Joins the Connect group "connect-cluster-1",
# exposes its REST API on port 8083 and talks to the brokers over SSL.
version: '2.4'
name: kfk231-cnt
services:

  kfk231-cnt:
    image: confluentinc/cp-kafka-connect:6.2.1
    hostname: kfk231.domain.dom
    container_name: kfk231-cnt
    network_mode: host
    restart: "unless-stopped"
    volumes:
      # Extra connector plugins; the directory is listed in CONNECT_PLUGIN_PATH.
      - ./connectors:/etc/kafka-connect/jars/
      # JKS keystore/truststore referenced by the *_SSL_*_LOCATION settings.
      - ./secrets:/var/lib/kafka/ssl/
    environment:
      # --- Worker identity and internal topics ---
      CONNECT_BOOTSTRAP_SERVERS: 'kfk231:9093,kfk232:9093,kfk233:9093'
      CONNECT_REST_PORT: "8083"
      CONNECT_REST_ADVERTISED_HOST_NAME: "kfk231"
      CONNECT_GROUP_ID: connect-cluster-1
      CONNECT_CONFIG_STORAGE_TOPIC: connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: connect-status
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "3"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "3"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "3"
      CONNECT_OFFSET_FLUSH_TIMEOUT_MS: "30000"

      # --- Converters (Avro via the Schema Registry on kfk234) ---
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://kfk234:8081'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://kfk234:8081'
      # NOTE(review): internal.*.converter are deprecated in Apache Kafka;
      # confirm they are still needed for this image version.
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"

      # --- Plugins, logging, JVM ---
      CONNECT_PLUGIN_PATH: '/usr/share/java,/etc/kafka-connect/jars,/usr/share/confluent-hub-components'
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR,org.apache.kafka.connect=INFO"
      # CONNECT_KAFKA_HEAP_OPTS: "-Xms4G -Xmx8G"

      # --- SSL for the worker's own Kafka clients ---
      CONNECT_SECURITY_PROTOCOL: SSL
      CONNECT_SSL_KEYSTORE_LOCATION: /var/lib/kafka/ssl/kafka.kfk-cluster.keystore.jks
      CONNECT_SSL_KEYSTORE_PASSWORD: ${SSL_KEYSTORE_PASSWD}
      CONNECT_SSL_KEY_PASSWORD: ${SSL_KEY_PASSWD}
      CONNECT_SSL_TRUSTSTORE_LOCATION: /var/lib/kafka/ssl/kafka.kfk-cluster.truststore.jks
      CONNECT_SSL_TRUSTSTORE_PASSWORD: ${SSL_TRUSTSTORE_PASSWD}
      CONNECT_SSL_ENABLED_PROTOCOLS: "TLSv1.2"
      # Empty value turns off broker hostname verification.
      CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ""

      # --- SSL for the producers used by source connectors ---
      # Worker-level SSL settings are not inherited by connector clients; the
      # producer.* / consumer.* prefixed copies are required, otherwise tasks
      # would default to PLAINTEXT against the SSL port 9093.
      CONNECT_PRODUCER_SECURITY_PROTOCOL: SSL
      CONNECT_PRODUCER_SSL_KEYSTORE_LOCATION: /var/lib/kafka/ssl/kafka.kfk-cluster.keystore.jks
      CONNECT_PRODUCER_SSL_KEYSTORE_PASSWORD: ${SSL_KEYSTORE_PASSWD}
      CONNECT_PRODUCER_SSL_KEY_PASSWORD: ${SSL_KEY_PASSWD}
      CONNECT_PRODUCER_SSL_TRUSTSTORE_LOCATION: /var/lib/kafka/ssl/kafka.kfk-cluster.truststore.jks
      CONNECT_PRODUCER_SSL_TRUSTSTORE_PASSWORD: ${SSL_TRUSTSTORE_PASSWD}
      CONNECT_PRODUCER_SSL_ENABLED_PROTOCOLS: "TLSv1.2"
      CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ""

      # --- SSL for the consumers used by sink connectors ---
      CONNECT_CONSUMER_SECURITY_PROTOCOL: SSL
      CONNECT_CONSUMER_SSL_KEYSTORE_LOCATION: /var/lib/kafka/ssl/kafka.kfk-cluster.keystore.jks
      CONNECT_CONSUMER_SSL_KEYSTORE_PASSWORD: ${SSL_KEYSTORE_PASSWD}
      CONNECT_CONSUMER_SSL_KEY_PASSWORD: ${SSL_KEY_PASSWD}
      CONNECT_CONSUMER_SSL_TRUSTSTORE_LOCATION: /var/lib/kafka/ssl/kafka.kfk-cluster.truststore.jks
      CONNECT_CONSUMER_SSL_TRUSTSTORE_PASSWORD: ${SSL_TRUSTSTORE_PASSWD}
      CONNECT_CONSUMER_SSL_ENABLED_PROTOCOLS: "TLSv1.2"
      CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ""

      # Individual connectors may still override any client setting.
      CONNECT_CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY: "All"
    # Static name resolution for every cluster node and its service aliases.
    extra_hosts:
      - "kfk231:192.168.122.231"
      - "kfk231-kafka:192.168.122.231"
      - "kfk231-zk:192.168.122.231"
      - "kfk231-cnt:192.168.122.231"
      - "kfk231-reg:192.168.122.231"
      - "kfk231.domain.dom:192.168.122.231"
      - "kfk232:192.168.122.232"
      - "kfk232-kafka:192.168.122.232"
      - "kfk232-zk:192.168.122.232"
      - "kfk232-cnt:192.168.122.232"
      - "kfk232-reg:192.168.122.232"
      - "kfk232.domain.dom:192.168.122.232"
      - "kfk233:192.168.122.233"
      - "kfk233-kafka:192.168.122.233"
      - "kfk233-zk:192.168.122.233"
      - "kfk233-cnt:192.168.122.233"
      - "kfk233-reg:192.168.122.233"
      - "kfk233.domain.dom:192.168.122.233"
      - "kfk234:192.168.122.234"
      - "kfk234.domain.dom:192.168.122.234"
docker compose -f kfk231-connect-compose.yaml up -d

On kfk232 - kfk232-connect-compose.yaml

# Kafka Connect worker for kfk232. Joins the Connect group "connect-cluster-1",
# exposes its REST API on port 8083 and talks to the brokers over SSL.
version: '2.4'
name: kfk232-cnt
services:

  kfk232-cnt:
    image: confluentinc/cp-kafka-connect:6.2.1
    hostname: kfk232.domain.dom
    container_name: kfk232-cnt
    network_mode: host
    restart: "unless-stopped"
    volumes:
      # Extra connector plugins; the directory is listed in CONNECT_PLUGIN_PATH.
      - ./connectors:/etc/kafka-connect/jars/
      # JKS keystore/truststore referenced by the *_SSL_*_LOCATION settings.
      - ./secrets:/var/lib/kafka/ssl/
    environment:
      # --- Worker identity and internal topics ---
      CONNECT_BOOTSTRAP_SERVERS: 'kfk231:9093,kfk232:9093,kfk233:9093'
      CONNECT_REST_PORT: "8083"
      CONNECT_REST_ADVERTISED_HOST_NAME: "kfk232"
      CONNECT_GROUP_ID: connect-cluster-1
      CONNECT_CONFIG_STORAGE_TOPIC: connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: connect-status
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "3"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "3"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "3"
      CONNECT_OFFSET_FLUSH_TIMEOUT_MS: "30000"

      # --- Converters (Avro via the Schema Registry on kfk234) ---
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://kfk234:8081'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://kfk234:8081'
      # NOTE(review): internal.*.converter are deprecated in Apache Kafka;
      # confirm they are still needed for this image version.
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"

      # --- Plugins, logging, JVM ---
      CONNECT_PLUGIN_PATH: '/usr/share/java,/etc/kafka-connect/jars,/usr/share/confluent-hub-components'
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR,org.apache.kafka.connect=INFO"
      # CONNECT_KAFKA_HEAP_OPTS: "-Xms4G -Xmx8G"

      # --- SSL for the worker's own Kafka clients ---
      CONNECT_SECURITY_PROTOCOL: SSL
      CONNECT_SSL_KEYSTORE_LOCATION: /var/lib/kafka/ssl/kafka.kfk-cluster.keystore.jks
      CONNECT_SSL_KEYSTORE_PASSWORD: ${SSL_KEYSTORE_PASSWD}
      CONNECT_SSL_KEY_PASSWORD: ${SSL_KEY_PASSWD}
      CONNECT_SSL_TRUSTSTORE_LOCATION: /var/lib/kafka/ssl/kafka.kfk-cluster.truststore.jks
      CONNECT_SSL_TRUSTSTORE_PASSWORD: ${SSL_TRUSTSTORE_PASSWD}
      CONNECT_SSL_ENABLED_PROTOCOLS: "TLSv1.2"
      # Empty value turns off broker hostname verification.
      CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ""

      # --- SSL for the producers used by source connectors ---
      # Worker-level SSL settings are not inherited by connector clients; the
      # producer.* / consumer.* prefixed copies are required, otherwise tasks
      # would default to PLAINTEXT against the SSL port 9093.
      CONNECT_PRODUCER_SECURITY_PROTOCOL: SSL
      CONNECT_PRODUCER_SSL_KEYSTORE_LOCATION: /var/lib/kafka/ssl/kafka.kfk-cluster.keystore.jks
      CONNECT_PRODUCER_SSL_KEYSTORE_PASSWORD: ${SSL_KEYSTORE_PASSWD}
      CONNECT_PRODUCER_SSL_KEY_PASSWORD: ${SSL_KEY_PASSWD}
      CONNECT_PRODUCER_SSL_TRUSTSTORE_LOCATION: /var/lib/kafka/ssl/kafka.kfk-cluster.truststore.jks
      CONNECT_PRODUCER_SSL_TRUSTSTORE_PASSWORD: ${SSL_TRUSTSTORE_PASSWD}
      CONNECT_PRODUCER_SSL_ENABLED_PROTOCOLS: "TLSv1.2"
      CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ""

      # --- SSL for the consumers used by sink connectors ---
      CONNECT_CONSUMER_SECURITY_PROTOCOL: SSL
      CONNECT_CONSUMER_SSL_KEYSTORE_LOCATION: /var/lib/kafka/ssl/kafka.kfk-cluster.keystore.jks
      CONNECT_CONSUMER_SSL_KEYSTORE_PASSWORD: ${SSL_KEYSTORE_PASSWD}
      CONNECT_CONSUMER_SSL_KEY_PASSWORD: ${SSL_KEY_PASSWD}
      CONNECT_CONSUMER_SSL_TRUSTSTORE_LOCATION: /var/lib/kafka/ssl/kafka.kfk-cluster.truststore.jks
      CONNECT_CONSUMER_SSL_TRUSTSTORE_PASSWORD: ${SSL_TRUSTSTORE_PASSWD}
      CONNECT_CONSUMER_SSL_ENABLED_PROTOCOLS: "TLSv1.2"
      CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ""

      # Individual connectors may still override any client setting.
      CONNECT_CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY: "All"
    # Static name resolution for every cluster node and its service aliases.
    extra_hosts:
      - "kfk231:192.168.122.231"
      - "kfk231-kafka:192.168.122.231"
      - "kfk231-zk:192.168.122.231"
      - "kfk231-cnt:192.168.122.231"
      - "kfk231-reg:192.168.122.231"
      - "kfk231.domain.dom:192.168.122.231"
      - "kfk232:192.168.122.232"
      - "kfk232-kafka:192.168.122.232"
      - "kfk232-zk:192.168.122.232"
      - "kfk232-cnt:192.168.122.232"
      - "kfk232-reg:192.168.122.232"
      - "kfk232.domain.dom:192.168.122.232"
      - "kfk233:192.168.122.233"
      - "kfk233-kafka:192.168.122.233"
      - "kfk233-zk:192.168.122.233"
      - "kfk233-cnt:192.168.122.233"
      - "kfk233-reg:192.168.122.233"
      - "kfk233.domain.dom:192.168.122.233"
      - "kfk234:192.168.122.234"
      - "kfk234.domain.dom:192.168.122.234"
docker compose -f kfk232-connect-compose.yaml up -d

On kfk233 - kfk233-connect-compose.yaml

# Kafka Connect worker for kfk233. Joins the Connect group "connect-cluster-1",
# exposes its REST API on port 8083 and talks to the brokers over SSL.
version: '2.4'
name: kfk233-cnt
services:

  kfk233-cnt:
    image: confluentinc/cp-kafka-connect:6.2.1
    hostname: kfk233.domain.dom
    container_name: kfk233-cnt
    network_mode: host
    restart: "unless-stopped"
    volumes:
      # Extra connector plugins; the directory is listed in CONNECT_PLUGIN_PATH.
      - ./connectors:/etc/kafka-connect/jars/
      # JKS keystore/truststore referenced by the *_SSL_*_LOCATION settings.
      - ./secrets:/var/lib/kafka/ssl/
    environment:
      # --- Worker identity and internal topics ---
      CONNECT_BOOTSTRAP_SERVERS: 'kfk231:9093,kfk232:9093,kfk233:9093'
      CONNECT_REST_PORT: "8083"
      CONNECT_REST_ADVERTISED_HOST_NAME: "kfk233"
      CONNECT_GROUP_ID: connect-cluster-1
      CONNECT_CONFIG_STORAGE_TOPIC: connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: connect-status
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "3"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "3"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "3"
      CONNECT_OFFSET_FLUSH_TIMEOUT_MS: "30000"

      # --- Converters (Avro via the Schema Registry on kfk234) ---
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://kfk234:8081'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://kfk234:8081'
      # NOTE(review): internal.*.converter are deprecated in Apache Kafka;
      # confirm they are still needed for this image version.
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"

      # --- Plugins, logging, JVM ---
      CONNECT_PLUGIN_PATH: '/usr/share/java,/etc/kafka-connect/jars,/usr/share/confluent-hub-components'
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR,org.apache.kafka.connect=INFO"
      # CONNECT_KAFKA_HEAP_OPTS: "-Xms4G -Xmx8G"

      # --- SSL for the worker's own Kafka clients ---
      CONNECT_SECURITY_PROTOCOL: SSL
      CONNECT_SSL_KEYSTORE_LOCATION: /var/lib/kafka/ssl/kafka.kfk-cluster.keystore.jks
      CONNECT_SSL_KEYSTORE_PASSWORD: ${SSL_KEYSTORE_PASSWD}
      CONNECT_SSL_KEY_PASSWORD: ${SSL_KEY_PASSWD}
      CONNECT_SSL_TRUSTSTORE_LOCATION: /var/lib/kafka/ssl/kafka.kfk-cluster.truststore.jks
      CONNECT_SSL_TRUSTSTORE_PASSWORD: ${SSL_TRUSTSTORE_PASSWD}
      CONNECT_SSL_ENABLED_PROTOCOLS: "TLSv1.2"
      # Empty value turns off broker hostname verification.
      CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ""

      # --- SSL for the producers used by source connectors ---
      # Worker-level SSL settings are not inherited by connector clients; the
      # producer.* / consumer.* prefixed copies are required, otherwise tasks
      # would default to PLAINTEXT against the SSL port 9093.
      CONNECT_PRODUCER_SECURITY_PROTOCOL: SSL
      CONNECT_PRODUCER_SSL_KEYSTORE_LOCATION: /var/lib/kafka/ssl/kafka.kfk-cluster.keystore.jks
      CONNECT_PRODUCER_SSL_KEYSTORE_PASSWORD: ${SSL_KEYSTORE_PASSWD}
      CONNECT_PRODUCER_SSL_KEY_PASSWORD: ${SSL_KEY_PASSWD}
      CONNECT_PRODUCER_SSL_TRUSTSTORE_LOCATION: /var/lib/kafka/ssl/kafka.kfk-cluster.truststore.jks
      CONNECT_PRODUCER_SSL_TRUSTSTORE_PASSWORD: ${SSL_TRUSTSTORE_PASSWD}
      CONNECT_PRODUCER_SSL_ENABLED_PROTOCOLS: "TLSv1.2"
      CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ""

      # --- SSL for the consumers used by sink connectors ---
      CONNECT_CONSUMER_SECURITY_PROTOCOL: SSL
      CONNECT_CONSUMER_SSL_KEYSTORE_LOCATION: /var/lib/kafka/ssl/kafka.kfk-cluster.keystore.jks
      CONNECT_CONSUMER_SSL_KEYSTORE_PASSWORD: ${SSL_KEYSTORE_PASSWD}
      CONNECT_CONSUMER_SSL_KEY_PASSWORD: ${SSL_KEY_PASSWD}
      CONNECT_CONSUMER_SSL_TRUSTSTORE_LOCATION: /var/lib/kafka/ssl/kafka.kfk-cluster.truststore.jks
      CONNECT_CONSUMER_SSL_TRUSTSTORE_PASSWORD: ${SSL_TRUSTSTORE_PASSWD}
      CONNECT_CONSUMER_SSL_ENABLED_PROTOCOLS: "TLSv1.2"
      CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: ""

      # Individual connectors may still override any client setting.
      CONNECT_CONNECTOR_CLIENT_CONFIG_OVERRIDE_POLICY: "All"
    # Static name resolution for every cluster node and its service aliases.
    extra_hosts:
      - "kfk231:192.168.122.231"
      - "kfk231-kafka:192.168.122.231"
      - "kfk231-zk:192.168.122.231"
      - "kfk231-cnt:192.168.122.231"
      - "kfk231-reg:192.168.122.231"
      - "kfk231.domain.dom:192.168.122.231"
      - "kfk232:192.168.122.232"
      - "kfk232-kafka:192.168.122.232"
      - "kfk232-zk:192.168.122.232"
      - "kfk232-cnt:192.168.122.232"
      - "kfk232-reg:192.168.122.232"
      - "kfk232.domain.dom:192.168.122.232"
      - "kfk233:192.168.122.233"
      - "kfk233-kafka:192.168.122.233"
      - "kfk233-zk:192.168.122.233"
      - "kfk233-cnt:192.168.122.233"
      - "kfk233-reg:192.168.122.233"
      - "kfk233.domain.dom:192.168.122.233"
      - "kfk234:192.168.122.234"
      - "kfk234.domain.dom:192.168.122.234"
docker compose -f kfk233-connect-compose.yaml up -d