Connector

The connector takes advantage of the abstraction provided by Hadoop Common through the org.apache.hadoop.fs.FileSystem class. This makes it possible to use a wide variety of file systems; if yours is not included in the Hadoop Common API, you can implement an extension of this abstraction and use it transparently.

Among others, these are some of the file systems it supports:

  • HDFS.
  • S3.
  • Google Cloud Storage.
  • Azure Blob Storage & Azure Data Lake Store.
  • FTP & SFTP.
  • WebHDFS.
  • Local File System.
  • Hadoop Archive File System.
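
For instance, assuming the corresponding Hadoop filesystem implementations are on the classpath, the fs.uris property could combine several of these (a hypothetical example; adjust hosts, buckets and paths to your environment):

fs.uris=hdfs://namenode:8020/data,s3a://my-bucket/data,file:///tmp/data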

Getting started

Prerequisites

  • Apache Kafka 2.6.0.
  • Java 8.
  • Confluent Schema Registry (recommended).

Building from source

mvn clean package

General config

The kafka-connect-fs.properties file defines the following required properties:

name=FsSourceConnector
connector.class=com.github.mmolimar.kafka.connect.fs.FsSourceConnector
tasks.max=1
fs.uris=file:///data,hdfs://localhost:8020/data
topic=mytopic
policy.class=<Policy class>
policy.recursive=true
policy.regexp=.*
policy.batch_size=0
policy.cleanup=none
file_reader.class=<File reader class>
file_reader.batch_size=0
  1. The connector name.
  2. Class indicating the connector.
  3. Number of tasks the connector is allowed to start.
  4. Comma-separated URIs of the FS(s). They can point directly to a file or to a directory in the FS. These URIs can also be dynamic, using expressions that are resolved at runtime.
  5. Topic to which data from the FS is copied.
  6. Policy class to apply (must implement com.github.mmolimar.kafka.connect.fs.policy.Policy interface).
  7. Flag to enable recursive traversal of subdirectories when listing files.
  8. Regular expression to filter files from the FS.
  9. Number of files that should be handled at a time. Non-positive values disable batching.
  10. Cleanup strategy to manage processed files.
  11. File reader class to read files from the FS (must implement com.github.mmolimar.kafka.connect.fs.file.reader.FileReader interface).
  12. Number of records to process at a time. Non-positive values disable batching.

More detailed information about these properties can be found here.

Running locally

export KAFKA_HOME=/path/to/kafka/install/dir
mvn clean package
export CLASSPATH="$(find target/ -type f -name '*.jar'| grep '\-package' | tr '\n' ':')"
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/kafka-connect-fs.properties

Running in Docker

mvn clean package
docker build --build-arg PROJECT_VERSION=<VERSION> .
docker-compose build
docker-compose up -d
docker logs --tail="all" -f connect
curl -sX GET http://localhost:8083/connector-plugins | grep FsSourceConnector
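
Once the worker is up, an instance of the connector can be registered through the Kafka Connect REST API. The snippet below is a minimal sketch using the Simple policy and the Text file reader described later in this document (the policy and reader class names are assumed from the project's conventions; verify them against the policy and file reader documentation, and adjust URIs and topic to your environment):

curl -sX POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "FsSourceConnector",
  "config": {
    "connector.class": "com.github.mmolimar.kafka.connect.fs.FsSourceConnector",
    "tasks.max": "1",
    "fs.uris": "file:///data",
    "topic": "mytopic",
    "policy.class": "com.github.mmolimar.kafka.connect.fs.policy.SimplePolicy",
    "policy.recursive": "true",
    "policy.regexp": ".*",
    "file_reader.class": "com.github.mmolimar.kafka.connect.fs.file.reader.TextFileReader"
  }
}'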

Components

There are two main concepts that decouple concerns within the connector: policies and file readers, described below.

Policies

In order to ingest data from the FS(s), the connector needs a policy to define the rules to do it.

Basically, the policy tries to connect to each FS included in the fs.uris connector property, lists files (filtering them with the regular expression provided in the policy.regexp property), and enables a file reader to read records.

The policy to be used by the connector is defined in the policy.class connector property.

Important

Records delivered from the connector to Kafka contain their own file offset, so if the file is processed again in a later policy execution, the policy will seek the file to that offset and process the remaining records, if any (provided the offset was committed).

Note

If the URIs included in the fs.uris connector property contain any expression of the form ${XXX}, the dynamic URI is built at the moment of the policy execution.
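
For example, assuming the date-based expressions described in the properties documentation are available (the exact variable names should be checked there), a dynamic URI might look like this and be resolved to the current date on each policy execution:

fs.uris=hdfs://localhost:8020/data/${yyyy}/${MM}/${dd}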

Currently, there are a few policies covering common use cases, but you can certainly develop your own if the existing ones don’t fit your needs. The only restriction is that it must implement the com.github.mmolimar.kafka.connect.fs.policy.Policy interface.

Simple

A policy which simply filters and processes the files included in the corresponding URIs once.

Attention

This policy is mainly intended for testing purposes.

Sleepy

The behaviour of this policy is similar to the Simple policy, but on each execution it sleeps and waits for the next one. Additionally, its custom properties allow you to end it.
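
An illustrative configuration sketch (the property names below are assumed from the policy.sleepy.* pattern; the linked documentation is the authoritative reference):

policy.class=com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy
policy.sleepy.sleep=10000
policy.sleepy.max_execs=-1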

You can learn more about the properties of this policy here.

Cron

This policy is scheduled based on cron expressions, whose format in the configuration follows the Quartz Scheduler library.

After finishing each execution, the policy sleeps until the next scheduled one, if applicable.
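
For instance, a sketch that would run the policy every 10 minutes, assuming the policy.cron.expression property described in the linked documentation (note that Quartz expressions include a leading seconds field):

policy.class=com.github.mmolimar.kafka.connect.fs.policy.CronPolicy
policy.cron.expression=0 0/10 * ? * *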

You can learn more about the properties of this policy here.

HDFS file watcher

It uses Hadoop notification events: all create/append/rename/close events will be reported as files to be ingested.

Use it only when you have HDFS URIs.
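
A minimal sketch, assuming the HdfsFileWatcherPolicy class name follows the same package convention as the other policies:

fs.uris=hdfs://localhost:8020/data
policy.class=com.github.mmolimar.kafka.connect.fs.policy.HdfsFileWatcherPolicy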

You can learn more about the properties of this policy here.

Attention

The URIs included in the general property fs.uris will be filtered, and only those starting with the hdfs:// prefix will be watched. Also, this policy only works with Hadoop 2.6.0 or higher.

S3 event notifications

It uses S3 event notifications to process files which have been created or modified in S3. These notifications are read from an AWS SQS queue, and they can be sent to SQS directly from S3 or via AWS SNS, either as an SNS notification or as a raw message in the subscription.

Use it only when you have S3 URIs; event notifications in the S3 bucket must be enabled towards an SNS topic or an SQS queue.
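
A hypothetical sketch; the policy class name and the queue property are assumptions based on the naming conventions above, so check the linked documentation for the actual property names:

fs.uris=s3a://my-bucket/data
policy.class=com.github.mmolimar.kafka.connect.fs.policy.S3EventNotificationsPolicy
policy.s3_event_notifications.queue=my-sqs-queue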

You can learn more about the properties of this policy here.

File readers

They read files and process each record from the FS. The policy needs a file reader so the connector can process each record; the reader’s implementation defines how to seek and iterate over the records within the file.

The file reader to be used when processing files is defined in the file_reader.class connector property.

In the same way as policies, the connector provides several sorts of readers to parse and read records for different file formats. If none of them fits your needs, just implement your own; the only restriction is that it must implement the com.github.mmolimar.kafka.connect.fs.file.reader.FileReader interface.

There are several file readers included, which can read the following file formats:

  • Parquet.
  • Avro.
  • ORC.
  • SequenceFile.
  • Cobol / EBCDIC.
  • Other binary files.
  • CSV.
  • TSV.
  • Fixed-width.
  • JSON.
  • XML.
  • YAML.
  • Text.

Parquet

Reads files with Parquet format.

The reader takes advantage of the Parquet-Avro API and uses the Parquet file as if it were an Avro file, so the message sent to Kafka is built in the same way as the Avro file reader does.

More information about properties of this file reader here.

Avro

Files with Avro format can be read with this reader.

The Avro schema is not needed since it is read from the file itself. The message sent to Kafka is created by transforming the record by means of the Confluent avro-converter API.

More information about properties of this file reader here.

ORC

ORC is a self-describing, type-aware columnar file format designed for Hadoop workloads.

This reader can process this file format, translating its schema and building a Kafka message with the content.

Warning

If you have ORC files with union data types, these data types will be transformed into a map object in the Kafka message. The value of each key will be fieldN, where N represents the index within the data type.

More information about properties of this file reader here.

SequenceFile

Sequence files are one of the Hadoop file formats, serialized as key-value pairs.

This reader can process this file format and build a Kafka message with the key-value pair. These two values are named key and value in the message by default, but you can customize these field names.
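
For example, the field names could be customized like this (property names assumed from the file_reader.sequence.* pattern; see the linked documentation for the exact ones):

file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.SequenceFileReader
file_reader.sequence.field_name.key=custom_key
file_reader.sequence.field_name.value=custom_value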

More information about properties of this file reader here.

Cobol

Mainframe files (Cobol / EBCDIC binary files) can be processed with this reader, which uses the Cobrix parser.

By means of the corresponding copybook (representing its schema), it parses each record and translates it into a Kafka message with that schema.

More information about properties of this file reader here.

Binary

All other kinds of binary files can be ingested using this reader.

It just extracts the content plus some metadata, such as path, file owner, file group, length, access time, and modification time.

Each message will contain the following schema:

  • path: File path (string).
  • owner: Owner of the file (string).
  • group: Group associated with the file (string).
  • length: Length of this file, in bytes (long).
  • access_time: Access time of the file (long).
  • modification_time: Modification time of the file (long).
  • content: Content of the file (bytes).

More information about properties of this file reader here.

CSV

CSV file reader using a custom token to distinguish different columns in each line.

It allows the reader to recognize a header in the files and use its column names in the message sent to Kafka. If there is no header, the value of each column will be placed in a field named column_N (where N represents the column index) in the message. Also, the column delimiter token is configurable.

This reader is based on the Univocity CSV parser.
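
An illustrative sketch; the reader exposes the Univocity parser settings through connector properties, and the names below (file_reader.delimited.settings.*) are assumptions to be checked against the linked documentation:

file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.CsvFileReader
file_reader.delimited.settings.header=true
file_reader.delimited.settings.format.delimiter=;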

More information about properties of this file reader here.

TSV

TSV file reader using the tab character (\t) to distinguish different columns in each line.

Its behaviour regarding the header and the column names is the same as that of the CSV file reader.

This reader is based on the Univocity TSV parser.

More information about properties of this file reader here.

FixedWidth

FixedWidth is a plain text file reader which distinguishes each column based on the length of each field.

Its behaviour regarding the header and the column names is the same as that of the CSV / TSV file readers.

This reader is based on the Univocity Fixed-Width parser.

More information about properties of this file reader here.

JSON

Reads JSON files which might contain multiple fields with their specified data types. The schema for this sort of record is inferred by reading the first record, and all the fields contained are marked as optional in the schema.

More information about properties of this file reader here.

XML

Reads XML files which might contain multiple fields with their specified data types. The schema for this sort of record is inferred by reading the first record, and all the fields contained are marked as optional in the schema.

Warning

Take into account the current limitations.

More information about properties of this file reader here.

YAML

Reads YAML files which might contain multiple fields with their specified data types. The schema for this sort of record is inferred by reading the first record, and all the fields contained are marked as optional in the schema.

More information about properties of this file reader here.

Text

Reads plain text files.

By default, each line represents one record, which will be placed in a field named value in the message sent to Kafka, but you can customize this field name.
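
For instance, a sketch renaming the output field (the file_reader.text.field_name.value property is assumed from the naming pattern of the other readers; see the linked documentation):

file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.TextFileReader
file_reader.text.field_name.value=message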

More information about properties of this file reader here.

Agnostic

Actually, this reader is a wrapper of the readers listed above.

It tries to read any kind of file format using an internal reader selected by the file extension, applying the proper one (Parquet, Avro, ORC, SequenceFile, Cobol / EBCDIC, CSV, TSV, FixedWidth, JSON, XML, YAML, or Text). If no extension is matched, the Text file reader will be applied.

Default extensions for each format (configurable; see the example after the list):

  • Parquet: .parquet
  • Avro: .avro
  • ORC: .orc
  • SequenceFile: .seq
  • Cobol / EBCDIC: .dat
  • Other binary files: .bin
  • CSV: .csv
  • TSV: .tsv
  • FixedWidth: .fixed
  • JSON: .json
  • XML: .xml
  • YAML: .yaml
  • Text: any other sort of file extension.
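
For example, extra extensions could be mapped to a given internal reader like this (the file_reader.agnostic.extensions.* property names are assumptions based on the pattern above; check the linked page for the exact ones):

file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.AgnosticFileReader
file_reader.agnostic.extensions.json=json,jsonl
file_reader.agnostic.extensions.csv=csv,txt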

More information about properties of this file reader here.