Connector¶
The connector takes advantage of the abstraction provided by Hadoop Common
through the org.apache.hadoop.fs.FileSystem
implementation. This makes it possible to use a
wide variety of file systems and, if yours is not covered by the Hadoop Common API, you can implement an extension
of this abstraction and use it transparently.
Among others, these are some of the file systems it supports:
- HDFS.
- S3.
- Google Cloud Storage.
- Azure Blob Storage & Azure Data Lake Store.
- FTP & SFTP.
- WebHDFS.
- Local File System.
- Hadoop Archive File System.
Getting started¶
Prerequisites¶
- Apache Kafka 2.6.0.
- Java 8.
- Confluent Schema Registry (recommended).
Building from source¶
mvn clean package
General config¶
The kafka-connect-fs.properties
file defines the following required properties:
name=FsSourceConnector
connector.class=com.github.mmolimar.kafka.connect.fs.FsSourceConnector
tasks.max=1
fs.uris=file:///data,hdfs://localhost:8020/data
topic=mytopic
policy.class=<Policy class>
policy.recursive=true
policy.regexp=.*
policy.batch_size=0
policy.cleanup=none
file_reader.class=<File reader class>
file_reader.batch_size=0
- name: The connector name.
- connector.class: Class indicating the connector.
- tasks.max: Number of tasks the connector is allowed to start.
- fs.uris: Comma-separated URIs of the FS(s). They can point directly to a file or to a directory in the FS. These URIs can also be dynamic, using expressions that are resolved at runtime.
- topic: Topic into which data from the FS is copied.
- policy.class: Policy class to apply (must implement the com.github.mmolimar.kafka.connect.fs.policy.Policy interface).
- policy.recursive: Flag to enable recursive traversal of subdirectories when listing files.
- policy.regexp: Regular expression to filter files from the FS.
- policy.batch_size: Number of files that should be handled at a time. Non-positive values disable batching.
- policy.cleanup: Cleanup strategy to manage processed files.
- file_reader.class: File reader class to read files from the FS (must implement the com.github.mmolimar.kafka.connect.fs.file.reader.FileReader interface).
- file_reader.batch_size: Number of records to process at a time. Non-positive values disable batching.
More detailed information about these properties can be found here.
Running in local¶
export KAFKA_HOME=/path/to/kafka/install/dir
mvn clean package
export CLASSPATH="$(find target/ -type f -name '*.jar'| grep '\-package' | tr '\n' ':')"
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/kafka-connect-fs.properties
Running in Docker¶
mvn clean package
docker build --build-arg PROJECT_VERSION=<VERSION> .
docker-compose build
docker-compose up -d
docker logs --tail="all" -f connect
curl -sX GET http://localhost:8083/connector-plugins | grep FsSourceConnector
Components¶
There are two main concepts to decouple concerns within the connector. They are policies and file readers, described below.
Policies¶
In order to ingest data from the FS(s), the connector needs a policy to define the rules to do it.
Basically, the policy tries to connect to each FS included in the fs.uris
connector property, lists files
(and filter them using the regular expression provided in the policy.regexp
property) and enables
a file reader to read records.
The policy to be used by the connector is defined in the policy.class
connector property.
Important
Records delivered from the connector to Kafka contain their own file offset. If a file is processed again in a later policy execution, the policy seeks the file to that offset (provided the offset was committed) and processes only the remaining records, if any.
Note
If the URIs included in the fs.uris
connector property contain any expression of the form ${XXX}
, the dynamic URI is resolved at the moment of the policy execution.
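As a sketch, a dynamic URI could be built from date/time patterns. The exact expression syntax and the ${yyyy}/${MM}/${dd} placeholders below are assumptions based on common date-format patterns; check the connector's documentation for the expressions actually supported:

```properties
# Hypothetical example: the ${...} expressions would be resolved at policy
# execution time, so each run lists the directory for the current date.
fs.uris=hdfs://localhost:8020/data/${yyyy}/${MM}/${dd}
```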
Currently, there are a few policies covering some common use cases but, of course, you can develop your own
if the existing policies don’t fit your needs.
The only restriction is that it must implement the interface
com.github.mmolimar.kafka.connect.fs.policy.Policy
.
Simple¶
It’s a policy which just filters and processes the files included in the corresponding URIs once.
Attention
This policy is mainly oriented towards testing purposes.
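For instance, this policy could be selected with the config below. The class name SimplePolicy is assumed from the package's naming convention; verify it against your connector version:

```properties
# Hypothetical class name following the policy package convention.
policy.class=com.github.mmolimar.kafka.connect.fs.policy.SimplePolicy
```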
Sleepy¶
The behaviour of this policy is similar to the Simple policy, but on each execution it sleeps and waits for the next one. Additionally, its custom properties allow you to end it.
You can learn more about the properties of this policy here.
Cron¶
This policy is scheduled based on cron expressions, whose format in the configuration follows the Quartz Scheduler library.
After finishing each execution, the policy sleeps until the next scheduled one, if applicable.
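A sketch of a Cron policy configuration. The class name CronPolicy and the property name policy.cron.expression are assumptions based on the connector's naming conventions; check the linked reference for the actual names. Quartz cron expressions have the form seconds-minutes-hours-day-of-month-month-day-of-week:

```properties
# Hypothetical class and property names.
policy.class=com.github.mmolimar.kafka.connect.fs.policy.CronPolicy
# Quartz cron expression: fire at second 0 of every 5th minute.
policy.cron.expression=0 0/5 * * * ?
```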
You can learn more about the properties of this policy here.
HDFS file watcher¶
It uses Hadoop notification events: all create/append/rename/close events are reported as files to be ingested.
Use it only when you have HDFS URIs.
You can learn more about the properties of this policy here.
Attention
The URIs included in the general property fs.uris
will be filtered, and only those which start with the prefix hdfs://
will be watched. Also, this policy will only work with Hadoop versions 2.6.0 or higher.
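A minimal sketch of this policy's configuration. The class name HdfsFileWatcherPolicy is assumed from the policy package's naming convention; verify it against your connector version:

```properties
# Only hdfs:// URIs will be watched by this policy.
fs.uris=hdfs://localhost:8020/data
# Hypothetical class name following the policy package convention.
policy.class=com.github.mmolimar.kafka.connect.fs.policy.HdfsFileWatcherPolicy
```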
S3 event notifications¶
It uses S3 event notifications to process files which have been created or modified in S3. These notifications are read from an AWS SQS queue, and they can be sent to SQS directly from S3 or via AWS SNS, either as an SNS notification or as a raw message in the subscription.
Use it only when you have S3 URIs; event notifications must be enabled in the S3 bucket, targeting an SNS topic or an SQS queue.
You can learn more about the properties of this policy here.
File readers¶
They read files from the FS and process each record. The file reader is needed by the policy to enable the connector to process each record; its implementation defines how to seek and iterate over the records within the file.
The file reader to be used when processing files is defined in the file_reader.class
connector property.
In the same way as policies, the connector provides several sorts of readers to parse and read records
in different file formats. If no existing file reader fits your needs, just implement your own;
the only restriction is that it must implement the interface
com.github.mmolimar.kafka.connect.fs.file.reader.FileReader
.
There are several file readers included, which can read the following file formats:
- Parquet.
- Avro.
- ORC.
- SequenceFile.
- Cobol / EBCDIC.
- Other binary files.
- CSV.
- TSV.
- Fixed-width.
- JSON.
- XML.
- YAML.
- Text.
Parquet¶
Reads files with Parquet format.
The reader takes advantage of the Parquet-Avro API and reads the Parquet file as if it were an Avro file, so the message sent to Kafka is built in the same way as by the Avro file reader.
More information about properties of this file reader here.
Avro¶
Files with Avro format can be read with this reader.
The Avro schema is not needed, since it is read from the file itself. The message sent to Kafka is created by transforming the record by means of the Confluent avro-converter API.
More information about properties of this file reader here.
ORC¶
ORC is a self-describing, type-aware columnar file format designed for Hadoop workloads.
This reader can process this file format, translating its schema and building a Kafka message with the content.
Warning
If you have ORC files with union
data types, they will be transformed into a map
object in the Kafka message. The value of each key will be fieldN
, where N
represents the index within the data type.
More information about properties of this file reader here.
SequenceFile¶
Sequence files are one of the Hadoop file formats, serialized as key-value pairs.
This reader can process this file format and build a Kafka message with the
key-value pair. These two values are named key
and value
in the message by default, but you can customize these field names.
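Customizing the field names might look like the sketch below. The property names under file_reader.sequence are assumptions following the connector's file_reader.* convention; check the linked reference for the actual ones:

```properties
file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.SequenceFileReader
# Hypothetical property names renaming the default "key"/"value" fields.
file_reader.sequence.field_name.key=record_key
file_reader.sequence.field_name.value=record_value
```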
More information about properties of this file reader here.
Cobol¶
Mainframe files (Cobol / EBCDIC binary files) can be processed with this reader, which uses the Cobrix parser.
By means of the corresponding copybook (representing its schema), it parses each record and translates it into a Kafka message with the schema.
More information about properties of this file reader here.
Binary¶
All other kinds of binary files can be ingested using this reader.
It simply extracts the content plus some metadata such as path, file owner, file group, length, access time, and modification time.
Each message will contain the following schema:
- path: File path (string).
- owner: Owner of the file (string).
- group: Group associated with the file (string).
- length: Length of this file, in bytes (long).
- access_time: Access time of the file (long).
- modification_time: Modification time of the file (long).
- content: Content of the file (bytes).
More information about properties of this file reader here.
CSV¶
CSV file reader using a custom token to distinguish different columns in each line.
It can detect a header in the files and use the header's column names
in the message sent to Kafka. If there is no header, the value of each column will be in
a field named column_N
(where N represents the column index) in the message.
Also, the token delimiter for columns is configurable.
This reader is based on the Univocity CSV parser.
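A sketch of how this reader might be configured. Since the reader is based on Univocity, its parser settings are typically exposed via properties; the property names below are assumptions, so verify them against the linked reference:

```properties
file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.CsvFileReader
# Hypothetical property names: treat the first line as a header
# and use ';' instead of the default delimiter.
file_reader.delimited.settings.header=true
file_reader.delimited.settings.format.delimiter=;
```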
More information about properties of this file reader here.
TSV¶
TSV file reader using a tab (\t)
to distinguish different columns in each line.
Its behaviour regarding the header and the column names is the same as the CSV file reader's.
This reader is based on the Univocity TSV parser.
More information about properties of this file reader here.
FixedWidth¶
FixedWidth is a plain-text file reader which distinguishes each column based on the length of each field.
Its behaviour regarding the header and the column names is the same as the CSV / TSV file readers'.
This reader is based on the Univocity Fixed-Width parser.
More information about properties of this file reader here.
JSON¶
Reads JSON files, which might contain multiple fields with their specified data types. The schema for these records is inferred from the first record, and all the fields contained are marked as optional in the schema.
More information about properties of this file reader here.
XML¶
Reads XML files, which might contain multiple fields with their specified data types. The schema for these records is inferred from the first record, and all the fields contained are marked as optional in the schema.
Warning
Take into account the current limitations.
More information about properties of this file reader here.
YAML¶
Reads YAML files, which might contain multiple fields with their specified data types. The schema for these records is inferred from the first record, and all the fields contained are marked as optional in the schema.
More information about properties of this file reader here.
Text¶
Reads plain text files.
Each line represents one record (by default), which will be placed in a field
named value
in the message sent to Kafka; this field name can be customized.
More information about properties of this file reader here.
Agnostic¶
Actually, this reader is a wrapper around the readers listed above.
It tries to read any kind of file format using an internal reader selected by the file extension, applying the proper one (Parquet, Avro, ORC, SequenceFile, Cobol / EBCDIC, CSV, TSV, FixedWidth, JSON, XML, YAML, or Text). If no extension matches, the Text file reader is applied.
Default extensions for each format (configurable):
- Parquet: .parquet
- Avro: .avro
- ORC: .orc
- SequenceFile: .seq
- Cobol / EBCDIC: .dat
- Other binary files: .bin
- CSV: .csv
- TSV: .tsv
- FixedWidth: .fixed
- JSON: .json
- XML: .xml
- YAML: .yaml
- Text: any other sort of file extension.
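A sketch of selecting this reader and overriding an extension mapping. The class name AgnosticFileReader and the file_reader.agnostic.extensions.* property pattern are assumptions based on the connector's naming conventions; check the linked reference for the actual names:

```properties
file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.AgnosticFileReader
# Hypothetical property: also treat ".jsn" files as JSON.
file_reader.agnostic.extensions.json=json,jsn
```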
More information about properties of this file reader here.