본문 바로가기

카테고리 없음

Kafka Connector JDBC PostgreSQL

설정파일 생

url: http://localhost:8083/connectors

method : post

Content-Type: application/json

{
    "name": "postgres-source",
    "config":

      "connector.class""io.confluent.connect.jdbc.JdbcSourceConnector",
      "tasks.max""1",      
      "connection.url""jdbc:postgresql://172.19.0.4:5432/daekyo?user=user&password=user!234",
      "mode""timestamp"
      "timestamp.column.name""updated_at",
      "db.timezone""Asia/Seoul",
      "topic.prefix""",
      "transforms""createKey,extractInt",    
      "transforms.createKey.type""org.apache.kafka.connect.transforms.ValueToKey",
      "transforms.createKey.fields""id",
      "transforms.extractInt.type""org.apache.kafka.connect.transforms.ExtractField$Key",
      "transforms.extractInt.field""id" ,
      "poll.interval.ms""1000",
      "validate.non.null""false",
      "key.converter.schemas.enable""false",    
      "value.converter":"org.apache.kafka.connect.json.JsonConverter",
      "value.converter.schemas.enable"false,
      "key.converter":"org.apache.kafka.connect.json.JsonConverter",
      "key.converter.schemas.enable"false,
      "table.whitelist" : "public.account"


 }

}

수정

http://localhost:8083/connectors/postgres-source/config

{
      "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
      "tasks.max": "1",      
      "connection.url": "jdbc:postgresql://192.168.0.31:5432/daekyo?user=user&password=user!234",
      "mode": "incrementing",
      "incrementing.column.name": "id",
      "topic.prefix": "connect_"      
    }