Kafka Connect – это интеграционный фреймворк, который является частью проекта Apache Kafka. На платформах Kubernetes и Red Hat OpenShift его можно развернуть с помощью операторов Strimzi и Red Hat AMQ Streams. В Kafka Connect есть два вида коннекторов: source и sink. Первые предназначены для загрузки данных в Kafka из внешних систем, вторые – для выгрузки данных из Kafka. Подключение к внешним системам, как правило, требует аутентификации, поэтому при настройке коннектора надо указывать учетные данные. В этом посте мы покажем как пользоваться секретами Kubernetes для хранения учетных данных при настройке коннекторов.
Здесь мы будем использовать source-коннектор S3, который входит в состав Apache Camel Kafka (подробнее см. здесь), и на его примере покажем, как сконфигурировать коннектор, чтобы он использовал секрет.
Описанная здесь процедура настройки универсальна и подходит для коннекторов любых типов. Мы будем применять коннектор S3 для подключения к хранилищу Amazon AWS S3 и загрузки файлов из корзины (bucket) S3 в топик (topic) Apache Kafka. Для подключения к хранилищу S3 нам потребуются следующие учетные данные AWS: ключ доступа (access key) и секретный ключ (secret key). Итак, начнем с того, что подготовим секрет с учетными данными.
aws_access_key_id=AKIAIOSFODNN7EXAMPLE
aws_secret_access_key=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
$ kubectl create secret generic aws-credentials --from-file=./aws-credentials.properties
FROM strimzi/kafka:0.16.1-kafka-2.4.0
USER root:root
COPY ./my-plugins/ /opt/kafka/plugins/
USER 1001
FROM registry.redhat.io/amq7/amq-streams-kafka-23:1.3.0
USER root:root
COPY ./my-plugins/ /opt/kafka/plugins/
USER jboss:jboss
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnect
metadata:
name: my-connect-cluster
spec:
image: docker.io/scholzj/kafka:camel-kafka-2.4.0
replicas: 3
bootstrapServers: my-cluster-kafka-bootstrap:9092
externalConfiguration:
volumes:
- name: aws-credentials
secret:
secretName: aws-credentials
config:
config.providers: file
config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
key.converter: org.apache.kafka.connect.json.JsonConverter
value.converter: org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable: false
value.converter.schemas.enable: false
image: docker.io/scholzj/kafka:camel-kafka-2.4.0
externalConfiguration:
volumes:
- name: aws-credentials
secret:
secretName: aws-credentials
config:
config.providers: file
config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
{
"name": "s3-connector",
"config": {
"connector.class": "org.apache.camel.kafkaconnector.CamelSourceConnector",
"tasks.max": "1",
"camel.source.kafka.topic": "s3-topic",
"camel.source.maxPollDuration": "10000",
"camel.source.url": "aws-s3://camel-connector-test?autocloseBody=false",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.camel.kafkaconnector.converters.S3ObjectConverter",
"camel.component.aws-s3.configuration.access-key": "${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_access_key_id}",
"camel.component.aws-s3.configuration.secret-key": "${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_secret_access_key}",
"camel.component.aws-s3.configuration.region": "US_EAST_1"
}
}
"camel.component.aws-s3.configuration.access-key": "${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_access_key_id}"
"camel.component.aws-s3.configuration.secret-key": "${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_secret_access_key}"
$ curl -X POST -H "Content-Type: application/json" -d connector-config.json http://my-connect-cluster-connect-api:8083/connectors
$ curl http://my-connect-cluster-connect-api:8083/connectors/s3-connector
{
"name": "s3-connector",
"config": {
"connector.class": "org.apache.camel.kafkaconnector.CamelSourceConnector",
"camel.source.maxPollDuration": "10000",
"camel.source.url": "aws-s3://camel-connector-test?autocloseBody=false",
"camel.component.aws-s3.configuration.region": "US_EAST_1",
"camel.component.aws-s3.configuration.secret-key": "${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_secret_access_key}",
"tasks.max": "1",
"name": "s3-connector",
"value.converter": "org.apache.camel.kafkaconnector.converters.S3ObjectConverter",
"camel.component.aws-s3.configuration.access-key": "${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_access_key_id}",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"camel.source.kafka.topic": "s3-topic"
},
"tasks": [
{
"connector": "s3-connector",
"task": 0
}
],
"type": "source"
}
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
name: s3-connector
labels:
strimzi.io/cluster: my-connect-cluster
spec:
class: org.apache.camel.kafkaconnector.CamelSourceConnector
tasksMax: 1
config:
key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: org.apache.camel.kafkaconnector.converters.S3ObjectConverter
camel.source.kafka.topic: s3-topic
camel.source.url: aws-s3://camel-connector-test?autocloseBody=false
camel.source.maxPollDuration: 10000
camel.component.aws-s3.configuration.access-key: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_access_key_id}
camel.component.aws-s3.configuration.secret-key: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_secret_access_key}
camel.component.aws-s3.configuration.region: US_EAST_1
К сожалению, не доступен сервер mySQL