본문 바로가기

Open Source/Kafka

Kafka JDBC SinkConnector

  1. JDBC Connector (Source and Sink) Kafka connect에 들어가서 해당 connector를 install 하던가 다운로드 받아서 설치해야한다.
  2. 메뉴얼로 설치하기 위해서 Connect에 접속해서 connector plugin들이 설치된 위치를 확인한다.
  3. 현재 로컬 위치에는 /usr/share/java,/usr/share/confluent-hub-components,/data/connect-jars 이렇게 3개의 폴더로 설정 되어있다.
  4. 처음의 ‘/usr/share/java,/usr/share’ 이 폴더에 위에서 다운받은 폴더를 복사한다.
  5. connecte를 재기동한다.
  6. 혹시 mysql driver 를 찾을수 없다고 나오면
    Maven Repository: mysql » mysql-connector-java » 8.0.27 여기서 driver를 다운받아서
    해당 connector에 lib 폴더밑에 복사하고 재기동 하면 된다.

 

Configuration

{
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"transforms.TimestampConverter.target.type": "Timestamp",
"connection.password": "*****",
"transforms.TimestampConverter.field": "update_date",
"tasks.max": "1",
"transforms": "unwrap, route, TimestampConverter",
"tombstones.on.delete": "true",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"schema.registry.url": "http://schema-registry:8081",
"transforms.route.regex": "([^.]+)\.([^.]+)\.([^.]+)",
"auto.evolve": "false",
"transforms.unwrap.drop.tombstones": "true",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"insert.mode": "upsert",
"transforms.route.replacement": "$3",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"table.name.format": "${topic}",
"transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss",
"topics.regex": "postgre_source.public.(.)",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"key.converter.schemas.enable": "false",
"delete.enabled": "true",
"connection.user": "root",
"value.converter.schemas.enable": "true",
"name": "My Sql Sink Connector2",
"auto.create": "true",
"connection.url": "jdbc:mysql://172.18.0.3:3306/customerdb?user=root&password=root",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"pk.mode": "record_key"
}