- JDBC Connector (Source and Sink) Kafka connect에 들어가서 해당 connector를 install 하던가 다운로드 받아서 설치해야한다.
- 메뉴얼로 설치하기 위해서 Connect에 접속해서 connector plugin들이 설치된 위치를 확인한다.
- 현재 로컬 위치에는 /usr/share/java,/usr/share/confluent-hub-components,/data/connect-jars 이렇게 3개의 폴더로 설정 되어있다.
- 처음의 ‘/usr/share/java,/usr/share’ 이 폴더에 위에서 다운받은 폴더를 복사한다.
- connecte를 재기동한다.
- 혹시 mysql driver 를 찾을수 없다고 나오면
Maven Repository: mysql » mysql-connector-java » 8.0.27 여기서 driver를 다운받아서
해당 connector에 lib 폴더밑에 복사하고 재기동 하면 된다.
Configuration
{
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"transforms.TimestampConverter.target.type": "Timestamp",
"connection.password": "*****",
"transforms.TimestampConverter.field": "update_date",
"tasks.max": "1",
"transforms": "unwrap, route, TimestampConverter",
"tombstones.on.delete": "true",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"schema.registry.url": "http://schema-registry:8081",
"transforms.route.regex": "([^.]+)\.([^.]+)\.([^.]+)",
"auto.evolve": "false",
"transforms.unwrap.drop.tombstones": "true",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"insert.mode": "upsert",
"transforms.route.replacement": "$3",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"table.name.format": "${topic}",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss",
"topics.regex": "postgre_source.public.(.)",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"key.converter.schemas.enable": "false",
"delete.enabled": "true",
"connection.user": "root",
"value.converter.schemas.enable": "true",
"name": "My Sql Sink Connector2",
"auto.create": "true",
"connection.url": "jdbc:mysql://172.18.0.3:3306/customerdb?user=root&password=root",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"pk.mode": "record_key"
}