How to Download io.confluent.connect.jdbc.jdbcsourceconnector
If you want to import data from any relational database with a JDBC driver into an Apache Kafka topic, you can use the Kafka Connect JDBC Source connector. This connector can support a wide variety of databases and query modes, and it can run in standalone or distributed mode. In this article, we will show you how to download and configure the connector, and how to use it to stream data from a database into Kafka.
download io.confluent.connect.jdbc.jdbcsourceconnector
What is io.confluent.connect.jdbc.jdbcsourceconnector and what does it do?
The io.confluent.connect.jdbc.jdbcsourceconnector is a Java class that implements the SourceConnector interface of Kafka Connect. It is part of the Confluent Platform, which is a distribution of Apache Kafka that provides additional features and integrations. The connector allows you to periodically execute a SQL query against a database and create an output record for each row in the result set. The output records are then sent to a Kafka topic, where they can be consumed by other applications or connectors.
What are the benefits and use cases of using it?
Using the JDBC source connector has several benefits and use cases, such as:
It simplifies data integration between databases and Kafka, as you don't need to write any custom code or use any intermediate tools.
It supports incremental query modes, which means that it can track which rows have been processed and which rows are new or updated, and only fetch the relevant data from the database.
It enables you to stream data from multiple tables or schemas, using whitelists or blacklists, or custom queries.
It enables you to leverage the scalability and fault-tolerance of Kafka Connect, as you can run the connector in standalone or distributed mode, depending on your needs.
It enables you to use Kafka as a central hub for data pipelines, streaming analytics, data integration, and mission-critical applications.
Prerequisites
Before you can download and configure the JDBC source connector, you need to have the following prerequisites:
A relational database with a JDBC driver. For this article, we will use MySQL as an example, but you can use any database that has a compatible driver.
A Kafka cluster with at least one broker and one ZooKeeper node. You can use Confluent Platform or Apache Kafka to set up your cluster.
Kafka Connect, which is included in Confluent Platform or Apache Kafka. You need to have at least one Kafka Connect worker node running.
The JDBC driver for your database. You need to download the driver JAR file and place it in a directory that is accessible by Kafka Connect.
How to install Kafka Connect and JDBC driver?
To install Kafka Connect, you can follow the instructions from , depending on which distribution you are using. You can run Kafka Connect in standalone mode (for development and testing) or distributed mode (for production).
To install the JDBC driver for your database, you can follow these steps:
Download the driver JAR file from your database vendor's website. For example, for MySQL, you can download it from .
Create a directory for the driver JAR file on your Kafka Connect worker node. For example, /usr/share/java/kafka-connect-jdbc.
Copy the driver JAR file to the directory you created.
<li Steps to download and configure the connector
Now that you have the prerequisites ready, you can proceed to download and configure the JDBC source connector. You can download the connector from Confluent Hub or Maven repository, depending on your preference.
How to download the connector from Confluent Hub or Maven repository?
To download the connector from Confluent Hub, you can follow these steps:
Go to and search for "JDBC Source Connector".
Select the latest version of the connector and click on "Download".
Extract the ZIP file to a directory of your choice. For example, /usr/share/confluent-hub-components.
Add the directory to the plugin.path property in your Kafka Connect worker configuration file. For example, plugin.path=/usr/share/confluent-hub-components.
Restart your Kafka Connect worker node.
To download the connector from Maven repository, you can follow these steps:
Go to and search for "io.confluent.connect.jdbc.jdbcsourceconnector".
Select the latest version of the connector and click on "Download JAR".
Create a directory for the connector JAR file on your Kafka Connect worker node. For example, /usr/share/java/kafka-connect-jdbc.
Copy the connector JAR file to the directory you created.
Add the directory to the plugin.path property in your Kafka Connect worker configuration file. For example, plugin.path=/usr/share/java/kafka-connect-jdbc.
Restart your Kafka Connect worker node.
How to create a configuration file for the connector?
To create a configuration file for the connector, you need to specify some mandatory and optional properties. The mandatory properties are:
How to download io.confluent.connect.jdbc.jdbcsourceconnector for Kafka Connect
Download io.confluent.connect.jdbc.jdbcsourceconnector and import data from any relational database
Download io.confluent.connect.jdbc.jdbcsourceconnector and configure JDBC source connector properties
Download io.confluent.connect.jdbc.jdbcsourceconnector and use catalog pattern to fetch table metadata
Download io.confluent.connect.jdbc.jdbcsourceconnector and use table whitelist or blacklist to filter tables
Download io.confluent.connect.jdbc.jdbcsourceconnector and use schema pattern to fetch table metadata
Download io.confluent.connect.jdbc.jdbcsourceconnector and use query mode to execute custom SQL queries
Download io.confluent.connect.jdbc.jdbcsourceconnector and use bulk mode to improve performance
Download io.confluent.connect.jdbc.jdbcsourceconnector and use incremental mode to capture changes
Download io.confluent.connect.jdbc.jdbcsourceconnector and use timestamp mode to track updates
Download io.confluent.connect.jdbc.jdbcsourceconnector and use timestamp+incrementing mode to track updates and inserts
Download io.confluent.connect.jdbc.jdbcsourceconnector and use incrementing mode to track inserts
Download io.confluent.connect.jdbc.jdbcsourceconnector and use query-based timestamp mode to track updates with custom queries
Download io.confluent.connect.jdbc.jdbcsourceconnector and use query-based timestamp+incrementing mode to track updates and inserts with custom queries
Download io.confluent.connect.jdbc.jdbcsourceconnector and use query-based incrementing mode to track inserts with custom queries
Download io.confluent.connect.jdbc.jdbcsourceconnector and use topic prefix to customize topic names
Download io.confluent.connect.jdbc.jdbcsourceconnector and use poll interval to control frequency of queries
Download io.confluent.connect.jdbc.jdbcsourceconnector and use batch size to control number of records per query
Download io.confluent.connect.jdbc.jdbcsourceconnector and use table types to specify types of tables to include
Download io.confluent.connect.jdbc.jdbcsourceconnector and use numeric mapping to specify how numeric values are represented in Kafka records
Download io.confluent.connect.jdbc.JdbcSourceConnector jar file from Confluent Maven repository
Download io.confluent.connect.jdbc.JdbcSourceConnector jar file from Confluent Hub client
Download io.confluent.connect.JdbcSourceConnector jar file from GitHub releases page
Download io.confluent.connect.JdbcSourceConnector jar file from Confluent Platform download page
Download io.confluent.connect.JdbcSourceConnector jar file from Confluent Cloud UI
Install io.confluent.connect.JdbcSourceConnector plugin on Kafka Connect cluster using Confluent CLI
Install io.confluent.connect.JdbcSourceConnector plugin on Kafka Connect cluster using REST API
Install io.confluent.connect.JdbcSourceConnector plugin on Kafka Connect cluster using Docker image
Install io.confluent.connect.JdbcSourceConnector plugin on Kafka Connect cluster using Helm chart
Install io.confluent.connect.JdbcSourceConnector plugin on Kafka Connect cluster using Ansible playbook
Deploy io.confluent.connect.JdbcSourceConnector on Confluent Cloud using Confluent Cloud UI
Deploy io.confluent.connect.JdbcSourceConnector on Confluent Cloud using Confluent CLI
Deploy io.confluent.connect.JdbcSourceConnector on Confluent Cloud using ccloud command-line tool
Deploy io.confluent.connect.JdbcSourceConnector on Confluent Cloud using Terraform provider
Deploy io.confluent.connect.JdbcSourceConnector on Confluent Cloud using Kubernetes Operator
Monitor io.confluent.connect.JdbcSourceConnector using Confluent Control Center UI
Monitor io.confluent.connect.JdbcSourceConnector using Confluent CLI
Monitor io.confluent.connect.JdbcSourceConnector using REST API
Monitor io.confluent.connect.JdbcSourceConnector using JMX metrics
Monitor io.confluent.connect.JdbcSourceConnector using Prometheus and Grafana dashboards
Troubleshoot io.confluent.connect.JdbcSourceConnector using Confluent Control Center UI
Troubleshoot io.confluent.connect.JdbcSourceConnector using Confluent CLI
Troubleshoot io.confluent.connect.JdbcSourceConnector using REST API
Troubleshoot io.confluent.connect.JdbcSourceConnector using Kafka Connect logs
Troubleshoot io.confluent.connect.JdbcSourceConnector using Kafka Connect status endpoint
Troubleshoot io.confluent.connect.JdbcSourceConnector using Kafka Connect error handling features
Troubleshoot io.confluent.connect.JdbcSourceConnector using Kafka Connect dead letter queue topic
Troubleshoot io.confluent.connect.JdbcSourceConnector using Kafka Connect retry policies and maximum retries
name: A unique name for the connector instance.
connector.class: The name of the Java class for the connector. In this case, it is io.confluent.connect.jdbc.JdbcSourceConnector.
tasks.max: The maximum number of tasks that the connector can run in parallel.
connection.url: The JDBC connection URL for your database. You need to include the database name, host, port, and any other parameters required by your driver.
mode: The query mode for the connector. You can choose from bulk, incrementing, timestamp, or timestamp+incrementing modes, depending on how you want to fetch data from your database.
topic.prefix: The prefix to use for the names of the Kafka topics where the data will be sent. The connector will append the table name or query name to the prefix to form the full topic name.
The optional properties are:
table.whitelist: A comma-separated list of tables to include in copying. If specified, only these tables will be processed.
table.blacklist: A comma-separated list of tables to exclude from copying. If specified, these tables will be skipped.
query: A custom SQL query to execute and copy data from. If specified, this query will override any table.whitelist or table.blacklist settings.
incrementing.column.name: The name of a column in your database that is monotonically increasing and unique for each row. This column will be used to track which rows have been processed and which rows are new or updated. This property is required if you use incrementing or timestamp+incrementing modes.
timestamp.column.name: The name of a column in your database that contains a timestamp value that indicates when each row was last modified. This column will be used to track which rows have been processed and which rows are new or updated. This property is required if you use timestamp or timestamp+incrementing modes.
validate.non.null: A boolean value that indicates whether the connector should validate that all primary key columns are not null in each row. If set to true, the connector will fail if any primary key column is null. If set to false, the connector will ignore null values and use them as part of the key. The default value is true.
batch.max.rows: The maximum number of rows to include in each batch when polling for new data. This setting can help reduce memory usage and network traffic. The default value is 100.
poll.interval.ms: The frequency in milliseconds to poll for new or updated data. This setting can affect the latency and throughput of the connector. The default value is 5000.
table.types: A comma-separated list of table types to include in copying. The supported types are TABLE, VIEW, SYSTEM TABLE, GLOBAL TEMPORARY, LOCAL TEMPORARY, ALIAS, and SYNONYM. By default, only TABLE is included.
numeric.mapping: A mapping of numeric types from the database to Kafka Connect types. The supported mappings are best_fit (the default), precision_only, and none.
dialect.name: The name of the database dialect that the connector should use. By default, the connector will automatically detect the dialect based on the JDBC connection URL. You can also specify a custom dialect class name if you have implemented one.
quote.sql.identifiers: A setting that controls how the connector quotes identifiers such as table and column names in SQL statements. The supported values are always (the default), never, or auto (only when needed).
timestamp.delay.interval.ms: The delay in milliseconds to allow between the current time and the latest time that is used for querying timestamp-based tables. This setting can help avoid missing data that has not been committed to the database yet. The default value is 0.
db.timezone: The name of the Java timezone that the connector should use to interpret date and time values from the database. By default, the connector will use UTC. You can also specify a custom timezone ID if you have one.
Here is an example of a configuration file for the JDBC source connector:
"name": "jdbc-source-connector", "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "tasks.max": "1", "connection.url": "jdbc:mysql://localhost:3306/testdb?user=root&password=root", "mode": "incrementing", "incrementing.column.name": "id", "topic.prefix": "test-", "table.whitelist": "customers,orders"
How to specify the connection properties, query modes, and topic names?
To specify the connection properties, query modes, and topic names for the connector, you need to use the following parameters in your configuration file:
Parameter
Description
Example
connection.url
The JDBC connection URL for your database. You need to include the database name, host, port, and any other parameters required by your driver.
"connection.url": "jdbc:mysql://localhost:3306/testdb?user=root&password=root"
mode
The query mode for the connector. You can choose from bulk, incrementing, timestamp, or timestamp+incrementing modes, depending on how you want to fetch data from your database.
"mode": "incrementing"
topic.prefix
The prefix to use for the names of the Kafka topics where the data will be sent. The connector will append the table name or query name to the prefix to form the full topic name.
"topic.prefix": "test-"
The connection.url parameter is mandatory and it depends on your database type and driver. You can find some examples of connection URLs for different databases .
The mode parameter is also mandatory and it determines how the connector queries data from your database. There are four possible modes:
Bulk mode: In this mode, the connector will perform a full table scan for each table and send all rows to Kafka. This mode is suitable for tables that do not have any primary key or timestamp columns, or for tables that do not change frequently.
Incrementing mode: In this mode, the connector will use a monotonically increasing column (such as an auto-incrementing ID) to track which rows have been processed and which rows are new or updated. This mode is suitable for tables that have a primary key column that is always increasing.
Timestamp mode: In this mode, the connector will use a timestamp column (such as a last modified date) to track which rows have been processed and which rows are new or updated. This mode is suitable for tables that have a timestamp column that is updated whenever a row is inserted or modified.
Timestamp+incrementing mode: In this mode, the connector will use a combination of a timestamp column and an incrementing column to track which rows have been processed and which rows are new or updated. This mode is suitable for tables that have both a timestamp column and a primary key column, and where the timestamp column may not be updated for some rows.
The topic.prefix parameter is also mandatory and it defines the common prefix for all the Kafka topics that the connector will create. The connector will append the table name or the query name to the prefix to form the full topic name. For example, if the prefix is "test-" and the table name is "customers", the topic name will be "test-customers".
How to run the connector in standalone or distributed mode?
To run the connector in standalone or distributed mode, you need to use different commands and configuration files. In standalone mode, you can run the connector on a single Kafka Connect worker node, using a single configuration file that contains both the worker and the connector properties. In distributed mode, you can run the connector on multiple Kafka Connect worker nodes, using separate configuration files for the worker and the connector properties. You also need to use a REST API or a command-line tool to submit the connector configuration to the Kafka Connect cluster.
To run the connector in standalone mode, you can follow these steps:
Create a configuration file that contains both the worker and the connector properties. For example, jdbc-source-connector-standalone.properties.
Run the following command from your Kafka Connect installation directory:
bin/connect-standalone.sh config/connect-standalone.properties jdbc-source-connector-standalone.properties
Check the logs and the Kafka topics to verify that the connector is running and sending data.
To run the connector in distributed mode, you can follow these steps:
Create a configuration file that contains only the connector properties. For example, jdbc-source-connector-distributed.json.
Run the following command from your Kafka Connect installation directory to start a Kafka Connect worker node in distributed mode:
bin/connect-distributed.sh config/connect-distributed.properties
Run the following command from another terminal to submit the connector configuration to the Kafka Connect cluster using the REST API:
curl -X POST -H "Content-Type: application/json" --data @jdbc-source-connector-distributed.json
Check the logs and the Kafka topics to verify that the connector is running and sending data.
Conclusion
In this article, we have shown you how to download and configure the JDBC source connector, and how to use it to stream data from a database into Kafka. We have also explained some of the benefits and use cases of using this connector, as well as some of the parameters and modes that you can use to customize its behavior. We hope that this article has helped you understand how to use this connector effectively and efficiently.
Here are some tips and best practices for using the JDBC source connector:
Choose an appropriate query mode for your database and data. For example, if your database supports triggers or change data capture (CDC), you can use timestamp or timestamp+incrementing modes to fetch only new or updated data. If your database does not support these features, you can use bulk or incrementing modes to fetch all data periodically.
Use whitelists or blacklists to filter out unwanted tables or schemas. For example, if you only want to copy data from certain tables, you can use table.whitelist to specify them. If you want to exclude some tables, you can use table.blacklist to specify them.
Use custom queries to transform or enrich data before sending it to Kafka. For example, if you want to join data from multiple tables, or add some computed columns, or filter out some rows, you can use query to specify a custom SQL query that does these operations.
Tune your batch size and poll interval according to your data volume and latency requirements. For example, if you have a large amount of data or low latency requirements, you can use a smaller batch size and a shorter poll interval to fetch data more frequently. If you have a small amount of data or high latency requirements, you can use a larger batch size and a longer poll interval to fetch data less frequently.
Monitor your connector performance and status using metrics and logs. For example, you can use JMX metrics or Confluent Control Center to monitor metrics such as throughput, error rate, lag, and offset. You can also use the Kafka Connect REST API or the Confluent CLI to check the status, configuration, and tasks of your connector. You can also use the logs to troubleshoot any issues or errors that may occur.
Here are some links for further reading and learning about the JDBC source connector:
FAQs
Here are some frequently asked questions about the JDBC source connector:
How can I update or delete data from Kafka using the JDBC source connector?
The JDBC source connector does not support updating or deleting data from Kafka. It only supports inserting new or updated data from the database to Kafka. If you want to update or delete data from Kafka, you need to use another connector or tool that supports these operations, such as the Kafka Connect SMTs, KSQL, or Kafka Streams.
How can I handle schema changes in the database using the JDBC source connector?
The JDBC source connector can handle schema changes in the database automatically, as long as the changes do not affect the primary key or timestamp columns that are used for query modes. The connector will detect the schema changes and update the schema of the output records accordingly. However, if the schema changes affect the primary key or timestamp columns, you need to restart the connector with a new configuration file that reflects the changes.
How can I secure the connection between the JDBC source connector and the database using SSL?
To secure the connection between the JDBC source connector and the database using SSL, you need to enable SSL on your database server and configure your JDBC driver accordingly. You also need to provide the SSL certificates and truststore files to your Kafka Connect worker node and specify them in your connection.url property. For example, for MySQL, you can use these parameters:
"connection.url": "jdbc:mysql://localhost:3306/testdb?user=root&password=root&useSSL=true&requireSSL=true&verifyServerCertificate=true&trustCertificateKeyStoreUrl=file:///path/to/truststore.jks&trustCertificateKeyStorePassword=truststorepassword"
How can I filter or transform data before sending it to Kafka using the JDBC source connector?
To filter or transform data before sending it to Kafka using the JDBC source connector, you can use one of these options:
Use a custom SQL query in your query property that performs the filtering or transformation operations on your database side.
Use a Kafka Connect Single Message Transform (SMT) in your transforms property that performs the filtering or transformation operations on your Kafka Connect side.
Use a KSQL or Kafka Streams application that consumes data from your Kafka topic and performs the filtering or transformation operations on your stream processing side.
How can I join data from multiple tables using the JDBC source connector?
To join data from multiple tables using the JDBC source connector, you can use one of these options:
Use a custom SQL query in your query property that performs the join operation on your database side.
Use multiple instances of the JDBC source connector, one for each table, and send data to different Kafka topics. Then use a KSQL or Kafka Streams application that consumes data from these topics and performs the join operation on your stream processing side.
44f88ac181
Comments