Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

Categories

0 votes
609 views
in Technique[技术] by (71.8m points)

jdbc - Kafka JDBC Sink连接器,批量插入值(Kafka JDBC Sink Connector, insert values in batches)

I receive a lot of the messages (by http-protocol) per second (50000 - 100000) and want to save them to PostgreSql.

(我每秒(50000-100000)收到很多消息(通过http协议),并希望将它们保存到PostgreSql。)

I decided to use Kafka JDBC Sink for this purpose.

(我决定为此目的使用Kafka JDBC Sink。)

The messages are saved to database by one record, not in batches.

(消息通过一个记录而不是成批保存到数据库中。)

I want to insert records in PostgreSQL in batches with size 500-1000 records.

(我想以500-1000条记录的大小批量在PostgreSQL中插入记录。)

I found some answers on this problem in issue: How to use batch.size?

(我发现了有关此问题的一些答案: 如何使用batch.size?)

I tried to use related options in configuration, but it seems that they no have any effect.

(我尝试在配置中使用相关选项,但似乎它们没有任何作用。)

My Kafka JDBC Sink PostgreSql configuration ( etc/kafka-connect-jdbc/postgres.properties ):

(我的Kafka JDBC Sink PostgreSql配置( etc/kafka-connect-jdbc/postgres.properties ):)

name=test-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=3

# The topics to consume from - required for sink connectors like this one
topics=jsonb_pkgs

connection.url=jdbc:postgresql://localhost:5432/test?currentSchema=test
auto.create=false
auto.evolve=false

insert.mode=insert
connection.user=postgres
table.name.format=${topic}

connection.password=pwd

batch.size=500
# based on 500*3000byte message size
fetch.min.bytes=1500000
fetch.wait.max.ms=1500
max.poll.records=4000

I also added options to connect-distributed.properties :

(我还向connect-distributed.properties添加了选项:)

consumer.fetch.min.bytes=1500000
consumer.fetch.wait.max.ms=1500

Although each a partition gets more than 1000 records per second, records are saved to PostgreSQL by one.

(尽管每个分区每秒可获取1000条以上的记录,但记录会被一个保存到PostgreSQL。)

Edit: consumer options were added in other file with correct names

(编辑:消费者选项已添加到其他文件中具有正确的名称)

I also added options to etc/schema-registry/connect-avro-standalone.properties :

(我还向etc/schema-registry/connect-avro-standalone.properties添加了选项:)

# based on 500*3000 byte message size
consumer.fetch.min.bytes=1500000
consumer.fetch.wait.max.ms=1500
consumer.max.poll.records=4000
  ask by Mariya translate from so

与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
Welcome To Ask or Share your Answers For Others

1 Reply

0 votes
by (71.8m points)

I realised that I misunderstood the documentation.

(我意识到我误解了文档。)

The records are inserted in database one by one.

(记录被一一插入到数据库中。)

The count of the records inserted in one transaction depends on batch.size and consumer.max.poll.records .

(在一个事务中插入的记录数取决于batch.sizeconsumer.max.poll.records 。)

I expected that the batch insert was implemented the other way.

(我希望批量插入是通过其他方式实现的。)

I would like to have an option to insert records like this:

(我想有一个插入这样的记录的选项:)

INSERT INTO table1 (First, Last)
VALUES
    ('Fred', 'Smith'),
    ('John', 'Smith'),
    ('Michael', 'Smith'),
    ('Robert', 'Smith');

But that seems impossible.

(但这似乎是不可能的。)


与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…
OGeek|极客中国-欢迎来到极客的世界,一个免费开放的程序员编程交流平台!开放,进步,分享!让技术改变生活,让极客改变未来! Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Click Here to Ask a Question

...