Kafka Connect FileSystem Connector¶
Kafka Connect FileSystem Connector is a source connector for reading records from files in the specified file systems and loading them into Kafka.
The connector supports:
- Several sorts of file systems (FS) to use.
- Dynamic and static URIs to ingest data from.
- Policies to define rules about how to look for files and clean them up after processing.
- File readers to parse and read different kinds of file formats.
To learn more about the connector, you can read this section; for more detailed configuration options, you can read this other one.
Also, you can download the source code from here.
Connector¶
The connector builds on the file system abstraction provided by Hadoop Common
through the implementation of the org.apache.hadoop.fs.FileSystem
class. This makes it possible to use a
wide variety of FS; if your FS is not covered by the Hadoop Common API, you can implement an extension
of this abstraction and use it transparently.
Among others, these are some file systems it supports:
- HDFS.
- S3.
- Google Cloud Storage.
- Azure Blob Storage & Azure Data Lake Store.
- FTP & SFTP.
- WebHDFS.
- Local File System.
- Hadoop Archive File System.
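For instance, a single connector instance can mix several of these file systems in the fs.uris property. A minimal sketch (host and bucket names are placeholders, and the available URI schemes depend on the Hadoop FileSystem implementations on your classpath):
fs.uris=file:///data,hdfs://namenode:8020/data,s3a://my-bucket/data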
Getting started¶
Prerequisites¶
- Apache Kafka 2.6.0.
- Java 8.
- Confluent Schema Registry (recommended).
Building from source¶
mvn clean package
General config¶
The kafka-connect-fs.properties
file defines the following properties as required:
name=FsSourceConnector
connector.class=com.github.mmolimar.kafka.connect.fs.FsSourceConnector
tasks.max=1
fs.uris=file:///data,hdfs://localhost:8020/data
topic=mytopic
policy.class=<Policy class>
policy.recursive=true
policy.regexp=.*
policy.batch_size=0
policy.cleanup=none
file_reader.class=<File reader class>
file_reader.batch_size=0
- The connector name.
- Class indicating the connector.
- Number of tasks the connector is allowed to start.
- Comma-separated URIs of the FS(s). They can be URIs pointing directly to a file or a directory in the FS. These URIs can also be dynamic, using expressions that modify them at runtime.
- Topic to copy the data from the FS into.
- Policy class to apply (must implement the com.github.mmolimar.kafka.connect.fs.policy.Policy interface).
- Flag to enable recursion into subdirectories when listing files.
- Regular expression to filter files from the FS.
- Number of files that should be handled at a time. Non-positive values disable batching.
- Cleanup strategy to manage processed files.
- File reader class to read files from the FS (must implement the com.github.mmolimar.kafka.connect.fs.file.reader.FileReader interface).
- Number of records to process at a time. Non-positive values disable batching.
More detailed information about these properties can be found here.
Running locally¶
export KAFKA_HOME=/path/to/kafka/install/dir
mvn clean package
export CLASSPATH="$(find target/ -type f -name '*.jar'| grep '\-package' | tr '\n' ':')"
$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties config/kafka-connect-fs.properties
Running in Docker¶
mvn clean package
docker build --build-arg PROJECT_VERSION=<VERSION> .
docker-compose build
docker-compose up -d
docker logs --tail="all" -f connect
curl -sX GET http://localhost:8083/connector-plugins | grep FsSourceConnector
Components¶
Two main concepts decouple concerns within the connector: policies and file readers, described below.
Policies¶
In order to ingest data from the FS(s), the connector needs a policy that defines the rules to do it.
Basically, the policy tries to connect to each FS included in the fs.uris
connector property, lists files
(filtering them with the regular expression provided in the policy.regexp
property) and enables
a file reader to read records.
The policy to be used by the connector is defined in the policy.class
connector property.
Important
Records delivered from the connector to Kafka contain their own file offset, so if a later policy execution processes the same file again, the policy will seek the file to that offset and process only the remaining records, if any (provided the offset was committed).
Note
If the URIs included in the fs.uris
connector property contain any expression of the
form ${XXX}
, the dynamic URI is resolved at the moment of the policy execution.
Currently, there are a few policies covering common use cases but, of course, you can develop your own
if the existing policies don't fit your needs.
The only restriction is that it must implement the interface
com.github.mmolimar.kafka.connect.fs.policy.Policy
.
Simple¶
This policy just filters and processes the files included in the corresponding URIs once.
Attention
This policy is mainly intended for testing purposes.
Sleepy¶
This policy behaves like the Simple policy, but after each execution it sleeps and waits for the next one. Additionally, its custom properties allow you to end it.
You can learn more about the properties of this policy here.
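As a sketch, a Sleepy policy checking for files every minute might be configured as follows (the SleepyPolicy class name is assumed from the connector's policy package; verify it against your version):
# class name assumed; check the com.github.mmolimar.kafka.connect.fs.policy package
policy.class=com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy
# sleep 60 seconds between executions; never end on its own
policy.sleepy.sleep=60000
policy.sleepy.max_execs=-1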
Cron¶
This policy is scheduled based on cron expressions; the expression format used in the configuration is that of the Quartz Scheduler library.
After finishing each execution, the policy sleeps until the next scheduled one, if applicable.
You can learn more about the properties of this policy here.
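For example, a sketch of a Cron policy that runs every day at 01:00 (the CronPolicy class name is assumed; the expression follows the Quartz format):
# class name assumed; check the com.github.mmolimar.kafka.connect.fs.policy package
policy.class=com.github.mmolimar.kafka.connect.fs.policy.CronPolicy
# Quartz cron expression: every day at 01:00
policy.cron.expression=0 0 1 * * ?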
HDFS file watcher¶
It uses Hadoop notification events: all create/append/rename/close events will be reported as files to be ingested.
Use it only when you have HDFS URIs.
You can learn more about the properties of this policy here.
Attention
The URIs included in the general property fs.uris
will be filtered: only those
which start with the prefix hdfs://
will be watched. Also, this policy
will only work with Hadoop 2.6.0 or higher.
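A sketch of this policy, assuming the HdfsFileWatcherPolicy class name and a reachable namenode (host and port are placeholders):
fs.uris=hdfs://namenode:8020/data
# class name assumed; check the com.github.mmolimar.kafka.connect.fs.policy package
policy.class=com.github.mmolimar.kafka.connect.fs.policy.HdfsFileWatcherPolicy
policy.hdfs_file_watcher.poll=5000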
S3 event notifications¶
It uses S3 event notifications to process files which have been created or modified in S3. These notifications are read from an AWS SQS queue; they can be sent to SQS directly from S3 or via AWS SNS, either as an SNS notification or as a raw message in the subscription.
Use it only when you have S3 URIs, and event notifications must be enabled in the S3 bucket towards an SNS topic or an SQS queue.
You can learn more about the properties of this policy here.
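A sketch of this policy, assuming the S3EventNotificationsPolicy class name, an s3a URI and an SQS queue already receiving the bucket's notifications (bucket, queue and credentials are placeholders):
fs.uris=s3a://my-bucket/data
# class name assumed; check the com.github.mmolimar.kafka.connect.fs.policy package
policy.class=com.github.mmolimar.kafka.connect.fs.policy.S3EventNotificationsPolicy
policy.s3_event_notifications.queue=my-queue
policy.fs.fs.s3a.access.key=<ACCESS_KEY>
policy.fs.fs.s3a.secret.key=<SECRET_KEY>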
File readers¶
File readers read files from the FS and process each record. The policy needs a file reader so the connector can process each record; the reader implementation defines how to seek and iterate over the records within a file.
The file reader to be used when processing files is defined in the file_reader.class
connector property.
As with policies, the connector provides several sorts of readers to parse and read records
in different file formats. If no file reader fits your needs, just implement one;
the only restriction is that it must implement the interface
com.github.mmolimar.kafka.connect.fs.file.reader.FileReader
.
There are several file readers included which can read the following file formats:
- Parquet.
- Avro.
- ORC.
- SequenceFile.
- Cobol / EBCDIC.
- Other binary files.
- CSV.
- TSV.
- Fixed-width.
- JSON.
- XML.
- YAML.
- Text.
Parquet¶
Reads files with Parquet format.
The reader takes advantage of the Parquet-Avro API and uses the Parquet file as if it was an Avro file, so the message sent to Kafka is built in the same way as the Avro file reader does.
More information about properties of this file reader here.
Avro¶
Files with Avro format can be read with this reader.
The Avro schema is not needed since it is read from the file. The message sent to Kafka is created by transforming the record by means of the Confluent avro-converter API.
More information about properties of this file reader here.
ORC¶
ORC is a self-describing, type-aware columnar file format designed for Hadoop workloads.
This reader can process this file format, translating its schema and building a Kafka message with the content.
Warning
If you have ORC files with union
data types, these will be transformed into a map
object in the Kafka message.
Each key will be named fieldN
, where N
represents
the index within the data type.
More information about properties of this file reader here.
SequenceFile¶
Sequence files are a Hadoop file format in which data is serialized as key-value pairs.
This reader can process this file format and build a Kafka message with the
key-value pair. These two values are named key
and value
in the message
by default, but you can customize these field names.
More information about properties of this file reader here.
Cobol¶
Mainframe files (Cobol / EBCDIC binary files) can be processed with this reader, which uses the Cobrix parser.
By means of the corresponding copybook (representing the schema), it parses each record and translates it into a Kafka message with that schema.
More information about properties of this file reader here.
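A sketch of a Cobol reader configuration (the CobolFileReader class name is assumed and the copybook path is a placeholder):
# class name assumed; check the com.github.mmolimar.kafka.connect.fs.file.reader package
file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.CobolFileReader
file_reader.cobol.copybook.path=hdfs://namenode:8020/copybooks/mydata.cpy
file_reader.cobol.reader.is_ebcdic=true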
Binary¶
All other kinds of binary files can be ingested using this reader.
It just extracts the content plus some metadata such as: path, file owner, file group, length, access time, and modification time.
Each message will contain the following schema:
- path: File path (string).
- owner: Owner of the file (string).
- group: Group associated with the file (string).
- length: Length of this file, in bytes (long).
- access_time: Access time of the file (long).
- modification_time: Modification time of the file (long).
- content: Content of the file (bytes).
More information about properties of this file reader here.
CSV¶
CSV file reader using a custom token to distinguish the columns in each line.
It can detect a header in the files and use it to set the column names
in the message sent to Kafka. If there is no header, the value of each column will be placed in
a field named column_N
(where N represents the column index) in the message.
Also, the token used as column delimiter is configurable.
This reader is based on the Univocity CSV parser.
More information about properties of this file reader here.
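For instance, a sketch for reading semicolon-delimited files with a header row (the CsvFileReader class name is assumed; note that its property prefix is delimited):
# class name assumed; check the com.github.mmolimar.kafka.connect.fs.file.reader package
file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.CsvFileReader
file_reader.delimited.settings.format.delimiter=;
file_reader.delimited.settings.header=true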
TSV¶
TSV file reader using a tab \t
to distinguish the columns in each line.
Its behaviour regarding the header and the column names is the same as the CSV file reader's.
This reader is based on the Univocity TSV parser.
More information about properties of this file reader here.
FixedWidth¶
FixedWidth is a plain-text file reader which distinguishes each column based on the length of each field.
Its behaviour regarding the header and the column names is the same as the CSV / TSV file readers'.
This reader is based on the Univocity Fixed-Width parser.
More information about properties of this file reader here.
JSON¶
Reads JSON files, which may contain any number of fields with their specified data types. The schema for these records is inferred by reading the first record, and all the fields it contains are marked as optional in the schema.
More information about properties of this file reader here.
XML¶
Reads XML files, which may contain any number of fields with their specified data types. The schema for these records is inferred by reading the first record, and all the fields it contains are marked as optional in the schema.
Warning
Take into account the current limitations.
More information about properties of this file reader here.
YAML¶
Reads YAML files, which may contain any number of fields with their specified data types. The schema for these records is inferred by reading the first record, and all the fields it contains are marked as optional in the schema.
More information about properties of this file reader here.
Text¶
Reads plain text files.
Each line represents one record (by default), which will be placed in a field
named value
in the message sent to Kafka; you can
customize this field name.
More information about properties of this file reader here.
Agnostic¶
This reader is actually a wrapper around the readers listed above.
It tries to read any kind of file format using an internal reader chosen by file extension, applying the proper one (Parquet, Avro, ORC, SequenceFile, Cobol / EBCDIC, CSV, TSV, FixedWidth, JSON, XML, YAML, or Text). If no extension matches, the Text file reader will be applied.
Default extensions for each format (configurable):
- Parquet: .parquet
- Avro: .avro
- ORC: .orc
- SequenceFile: .seq
- Cobol / EBCDIC: .dat
- Other binary files: .bin
- CSV: .csv
- TSV: .tsv
- FixedWidth: .fixed
- JSON: .json
- XML: .xml
- YAML: .yaml
- Text: any other sort of file extension.
More information about properties of this file reader here.
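A sketch of an Agnostic reader configuration that also maps the .jsonl extension to the JSON reader (the AgnosticFileReader class name is assumed):
# class name assumed; check the com.github.mmolimar.kafka.connect.fs.file.reader package
file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.AgnosticFileReader
file_reader.agnostic.extensions.json=json,jsonl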
Configuration Options¶
General¶
General config properties for this connector.
name
The connector name.
- Type: string
- Importance: high
connector.class
Class indicating the connector.
- Type: string
- Importance: high
tasks.max
Number of tasks the connector is allowed to start.
- Type: int
- Importance: high
Tip
The URIs specified in the connector config will be grouped based on the number of tasks defined. If you have just one URI, one task is fine. Otherwise, if you want to improve performance and process URIs in parallel, you should adjust this number based on your requirements.
fs.uris
Comma-separated URIs of the FS(s). They can be URIs pointing directly to a file in the FS and can also be dynamic, using expressions that modify the URIs at runtime. These expressions have the form
${XXX}
, where XXX represents a pattern from the java.time.format.DateTimeFormatter
Java class.
- Type: string
- Importance: high
Tip
If you want to ingest data from dynamic directories, that is, directories created every day, and
avoid adding new URIs or looking for files from a parent directory, you can include expressions
in the URIs to do that. For example, the URI file:///data/${yyyy}
will be
converted to file:///data/2020
(when executing the policy).
You can use as many as you like in the URIs, for instance:
file:///data/${yyyy}/${MM}/${dd}/${HH}${mm}
Tip
If you want to ingest data from S3, you can add credentials with:
policy.fs.fs.s3a.access.key=<ACCESS_KEY>
and
policy.fs.fs.s3a.secret.key=<SECRET_KEY>
.
Also, in case you want to configure a custom credentials provider, you should use
policy.fs.fs.s3a.aws.credentials.provider=<CLASS>
property.
topic
Topic to copy the data into.
- Type: string
- Importance: high
poll.interval.ms
Frequency in milliseconds to poll for new data. This config just applies when the policies have ended.
- Type: int
- Default:
10000
- Importance: medium
policy.class
Policy class to apply (must implement the com.github.mmolimar.kafka.connect.fs.policy.Policy interface).
- Type: string
- Importance: high
policy.regexp
Regular expression to filter files from the FS.
- Type: string
- Importance: high
policy.recursive
Flag to enable recursion into subdirectories when listing files.
- Type: boolean
- Default:
false
- Importance: medium
policy.batch_size
Number of files that should be handled at a time. Non-positive values disable batching.
- Type: int
- Default:
0
- Importance: medium
policy.cleanup
Cleanup strategy to use when skipping files. It’s possible to move these files to another folder, remove them or do nothing.
- Type: enum (available values none, move and delete)
- Default: none
- Importance: medium
policy.cleanup.move
Target directory to move files to when using the move cleanup strategy. Mandatory only when using this strategy.
- Type: string
- Importance: medium
policy.cleanup.move.prefix
Prefix to prepend to the filename of moved files.
policy.<policy_name>.<policy_property>
This represents custom properties you can include based on the policy class specified.
- Type: based on the policy.
- Importance: based on the policy.
policy.fs.<fs_property>
Custom properties to use for the FS.
- Type: based on the FS.
- Importance: based on the FS.
file_reader.class
File reader class to read files from the FS (must implement the com.github.mmolimar.kafka.connect.fs.file.reader.FileReader interface).
- Type: string
- Importance: high
file_reader.batch_size
Number of records to process at a time. Non-positive values disable batching.
- Type: int
- Default:
0
- Importance: medium
file_reader.<file_reader_name>.<file_reader_property>
This represents custom properties you can include based on the file reader class specified.
- Type: based on the file reader.
- Importance: based on the file reader.
Policies¶
Some policies have custom properties to define and others don't. So, depending on your configuration, you may have to take their properties into account.
Simple¶
This policy does not have any additional configuration.
Sleepy¶
In order to configure custom properties for this policy, the name you must use is sleepy
.
policy.sleepy.sleep
Max sleep time (in ms) to wait before looking for files in the FS again. Once an execution has finished, the policy will sleep for this time until it is executed again.
- Type: long
- Importance: high
policy.sleepy.fraction
Sleep fraction to divide the sleep time to allow interrupting the policy faster.
- Type: long
- Default:
10
- Importance: medium
policy.sleepy.max_execs
Max executions allowed (negative to disable). After exceeding this number, the policy will end. An execution represents: listing files from the FS and its corresponding sleep time.
- Type: long
- Default:
-1
- Importance: medium
Cron¶
In order to configure custom properties for this policy, the name you must use is cron
.
policy.cron.expression
Cron expression to schedule the policy.
- Type: string
- Importance: high
policy.cron.end_date
End date (in ISO date-time format) at which the policy finishes.
- Type: date
- Default:
null
- Importance: medium
HDFS file watcher¶
In order to configure custom properties for this policy, the name you must use is hdfs_file_watcher
.
policy.hdfs_file_watcher.poll
Time to wait (in milliseconds) until the records retrieved from the file watcher will be sent to the source task.
- Type: long
- Default:
5000
- Importance: medium
policy.hdfs_file_watcher.retry
Sleep time before retrying connections to HDFS when connection errors happen.
- Type: long
- Default:
20000
- Importance: medium
S3 event notifications¶
In order to configure custom properties for this policy, the name you must use is s3_event_notifications
.
policy.s3_event_notifications.queue
SQS queue name to retrieve messages from.
- Type: string
- Importance: high
policy.s3_event_notifications.poll
Time to wait (in milliseconds) until the records retrieved from the queue will be sent to the source task.
- Type: long
- Default:
5000
- Importance: medium
policy.s3_event_notifications.event_regex
Regular expression to filter events based on their types.
- Type: string
- Default:
.*
- Importance: medium
policy.s3_event_notifications.delete_messages
If messages from SQS should be removed after reading them.
- Type: boolean
- Default:
true
- Importance: medium
policy.s3_event_notifications.max_messages
Maximum number of messages to retrieve at a time (must be between 1 and 10).
- Type: int
- Importance: medium
policy.s3_event_notifications.visibility_timeout
Duration (in seconds) that the received messages are hidden from subsequent retrieve requests.
- Type: int
- Importance: low
File readers¶
Some file readers have custom properties to define and others don't. So, depending on your configuration, you may have to take their properties into account.
Parquet¶
In order to configure custom properties for this reader, the name you must use is parquet
.
file_reader.parquet.schema
Avro schema in JSON format to use when reading a file.
- Type: string
- Importance: medium
file_reader.parquet.projection
Avro schema in JSON format to use for projecting fields from records in a file.
- Type: string
- Importance: medium
Avro¶
In order to configure custom properties for this reader, the name you must use is avro
.
file_reader.avro.schema
Avro schema in JSON format to use when reading a file. If not specified, the reader will use the schema defined in the file.
- Type: string
- Importance: medium
ORC¶
In order to configure custom properties for this reader, the name you must use is orc
.
file_reader.orc.use_zerocopy
Use zero-copy when reading an ORC file.
- Type: boolean
- Default:
false
- Importance: medium
file_reader.orc.skip_corrupt_records
Whether the reader will skip corrupt data or not. If disabled, an exception will be thrown when there is corrupt data in the file.
- Type: boolean
- Default:
false
- Importance: medium
SequenceFile¶
In order to configure custom properties for this reader, the name you must use is sequence
.
file_reader.sequence.field_name.key
Custom field name for the output key to include in the Kafka message.
- Type: string
- Default:
key
- Importance: medium
file_reader.sequence.field_name.value
Custom field name for the output value to include in the Kafka message.
- Type: string
- Default:
value
- Importance: medium
file_reader.sequence.buffer_size
Custom buffer size to read data from the Sequence file.
- Type: int
- Default:
4096
- Importance: low
Cobol¶
In order to configure custom properties for this reader, the name you must use is cobol
.
file_reader.cobol.copybook.content
The content of the copybook. It is mandatory if property file_reader.cobol.copybook.path is not set.
- Type: string
- Default: null
- Importance: high
file_reader.cobol.copybook.path
Copybook file path in the file system to be used. It is mandatory if property file_reader.cobol.copybook.content is not set.
- Type: string
- Default: null
- Importance: high
file_reader.cobol.reader.is_ebcdic
Whether the input data file encoding is EBCDIC; otherwise it is ASCII.
- Type: boolean
- Default:
true
- Importance: medium
file_reader.cobol.reader.is_text
If line-ending characters (LF / CRLF) will be used as the record separator.
- Type: boolean
- Default:
false
- Importance: medium
file_reader.cobol.reader.ebcdic_code_page
Code page to be used for EBCDIC to ASCII / Unicode conversions.
- Type: string
- Default:
common
- Importance: medium
file_reader.cobol.reader.is_record_sequence
If the input file has 4 byte record length headers.
- Type: boolean
- Default:
false
- Importance: medium
file_reader.cobol.reader.floating_point_format
Format used for the floating-point numbers.
- Type: enum (available values ibm, ibm_little_endian, ieee754, and ieee754_little_endian)
- Default: ibm
- Importance: medium
file_reader.cobol.reader.schema_policy
Specifies a policy to transform the input schema.
- Type: enum (available values keep_original and collapse_root)
- Default: keep_original
- Importance: medium
file_reader.cobol.reader.string_trimming_policy
The trim to apply for records with string data types.
- Type: enum (available values both, left, right and none)
- Default: both
- Importance: medium
file_reader.cobol.reader.start_offset
An offset to the start of the record in each binary data block.
- Type: int
- Default:
0
- Importance: medium
file_reader.cobol.reader.end_offset
An offset from the end of the record to the end of the binary data block.
- Type: int
- Default:
0
- Importance: medium
file_reader.cobol.reader.file_start_offset
A number of bytes to skip at the beginning of each file.
- Type: int
- Default:
0
- Importance: medium
file_reader.cobol.reader.file_end_offset
A number of bytes to skip at the end of each file.
- Type: int
- Default:
0
- Importance: medium
file_reader.cobol.reader.ebcdic_code_page_class
Custom code page conversion class provided.
- Type: string
- Default:
null
- Importance: low
file_reader.cobol.reader.ascii_charset
Charset for ASCII data.
file_reader.cobol.reader.is_uft16_big_endian
Flag to consider UTF-16 strings as big-endian.
- Type: boolean
- Default:
true
- Importance: low
file_reader.cobol.reader.variable_size_occurs
If true, the size of OCCURS DEPENDING ON data will depend on the number of elements.
- Type: boolean
- Default:
false
- Importance: low
file_reader.cobol.reader.record_length
Specifies the length of the record, disregarding the copybook record size. Implies the file has a fixed record length.
- Type: int
- Default:
null
- Importance: low
file_reader.cobol.reader.length_field_name
The name for a field that contains the record length. If not set, the copybook record length will be used.
- Type: string
- Default:
null
- Importance: low
file_reader.cobol.reader.is_rdw_big_endian
If the RDW is big endian.
- Type: boolean
- Default:
false
- Importance: low
file_reader.cobol.reader.is_rdw_part_rec_length
If the RDW counts itself as part of the record length.
- Type: boolean
- Default:
false
- Importance: low
file_reader.cobol.reader.rdw_adjustment
Controls a mismatch between RDW and record length.
- Type: int
- Default:
0
- Importance: low
file_reader.cobol.reader.is_index_generation_needed
If indexing the input file before processing is requested.
- Type: boolean
- Default:
false
- Importance: low
file_reader.cobol.reader.input_split_records
The number of records to include in each partition.
- Type: int
- Default:
null
- Importance: low
file_reader.cobol.reader.input_split_size_mb
A partition size to target.
- Type: int
- Default:
null
- Importance: low
file_reader.cobol.reader.hdfs_default_block_size
Default HDFS block size for the HDFS filesystem used.
- Type: int
- Default:
null
- Importance: low
file_reader.cobol.reader.drop_group_fillers
If true the parser will drop all FILLER fields, even GROUP FILLERS that have non-FILLER nested fields.
- Type: boolean
- Default:
false
- Importance: low
file_reader.cobol.reader.drop_value_fillers
If true the parser will drop all value FILLER fields.
- Type: boolean
- Default:
true
- Importance: low
file_reader.cobol.reader.non_terminals
A comma-separated list of group-type fields to combine and parse as primitive fields.
- Type: string[]
- Default:
null
- Importance: low
file_reader.cobol.reader.debug_fields_policy
Specifies if debugging fields need to be added and what they should contain.
- Type: enum (available values hex, raw and none)
- Default: none
- Importance: low
file_reader.cobol.reader.record_header_parser
Parser to be used to parse data field record headers.
- Type: string
- Default:
null
- Importance: low
file_reader.cobol.reader.record_extractor
Parser to be used to parse records.
- Type: string
- Default:
null
- Importance: low
file_reader.cobol.reader.rhp_additional_info
Extra option to be passed to a custom record header parser.
- Type: string
- Default:
null
- Importance: low
file_reader.cobol.reader.re_additional_info
A string provided for the raw record extractor.
file_reader.cobol.reader.input_file_name_column
A column name to add to each record containing the input file name.
Binary¶
There are no extra configuration options for this file reader.
CSV¶
To configure custom properties for this reader, the name you must use is delimited
(even though it’s for CSV).
file_reader.delimited.settings.format.delimiter
Field delimiter.
- Type: string
- Default:
,
- Importance: high
file_reader.delimited.settings.header
Whether the file contains a header or not.
- Type: boolean
- Default:
false
- Importance: high
file_reader.delimited.settings.schema
A comma-separated list of ordered data types for each field in the file. Possible values: byte, short, int, long, float, double, boolean, bytes and string.
- Type: string[]
- Default: null
- Importance: medium
file_reader.delimited.settings.data_type_mapping_error
Flag to enable/disable throwing errors when mapping data types based on the schema is not possible. If disabled, values that could not be mapped will be returned as null.
- Type: boolean
- Default: true
- Importance: medium
file_reader.delimited.settings.allow_nulls
If the schema supports nullable fields. If the file_reader.delimited.settings.data_type_mapping_error config flag is disabled, the value set for this config will be ignored and forced to true.
- Type: boolean
- Default: false
- Importance: medium
file_reader.delimited.settings.header_names
A comma-separated list of ordered field names to set when reading a file.
- Type: string[]
- Default:
null
- Importance: medium
file_reader.delimited.settings.null_value
Default value for null values.
- Type: string
- Default: null
- Importance: medium
file_reader.delimited.settings.empty_value
Default value for empty values (empty values within quotes).
- Type: string
- Default:
null
- Importance: medium
file_reader.delimited.settings.format.line_separator
Line separator to be used.
- Type: string
- Default:
\n
- Importance: medium
file_reader.delimited.settings.max_columns
Maximum number of columns allowed per record.
- Type: int
- Default: 512
- Importance: low
file_reader.delimited.settings.max_chars_per_column
Maximum number of characters allowed per column.
- Type: int
- Default: 4096
- Importance: low
file_reader.delimited.settings.rows_to_skip
Number of rows to skip.
- Type: long
- Default:
0
- Importance: low
file_reader.delimited.settings.line_separator_detection
If the reader should detect the line separator automatically.
- Type: boolean
- Default:
false
- Importance: low
file_reader.delimited.settings.delimiter_detection
If the reader should detect the delimiter automatically.
- Type: boolean
- Default:
false
- Importance: low
file_reader.delimited.settings.ignore_leading_whitespaces
Flag to enable/disable skipping leading whitespaces from values.
- Type: boolean
- Default:
true
- Importance: low
file_reader.delimited.settings.ignore_trailing_whitespaces
Flag to enable/disable skipping trailing whitespaces from values.
- Type: boolean
- Default:
true
- Importance: low
file_reader.delimited.settings.format.comment
Character that represents a line comment at the beginning of a line.
- Type: char
- Default:
#
- Importance: low
file_reader.delimited.settings.escape_unquoted
Flag to enable/disable processing escape sequences in unquoted values.
- Type: boolean
- Default:
false
- Importance: low
file_reader.delimited.settings.format.quote
Character used for escaping values where the field delimiter is part of the value.
- Type: char
- Default:
"
- Importance: low
file_reader.delimited.settings.format.quote_escape
Character used for escaping quotes inside an already quoted value.
- Type: char
- Default:
"
- Importance: low
file_reader.delimited.encoding
Encoding to use for reading a file. If not specified, the reader will use the default encoding.
- Type: string
- Default: based on the locale and charset of the underlying operating system.
- Importance: medium
file_reader.delimited.compression.type
Compression type to use when reading a file.
- Type: enum (available values bzip2, gzip and none)
- Default: none
- Importance: medium
file_reader.delimited.compression.concatenated
Flag to specify if the decompression of the reader will finish at the end of the file or after the first compressed stream.
- Type: boolean
- Default:
true
- Importance: low
TSV¶
To configure custom properties for this reader, the name you must use is delimited
(even though it’s for TSV).
file_reader.delimited.settings.header
Whether the file contains a header or not.
- Type: boolean
- Default:
false
- Importance: high
file_reader.delimited.settings.schema
A comma-separated list of ordered data types for each field in the file. Possible values: byte, short, int, long, float, double, boolean, bytes and string.
- Type: string[]
- Default: null
- Importance: medium
file_reader.delimited.settings.data_type_mapping_error
Flag to enable/disable throwing errors when mapping data types based on the schema is not possible. If disabled, values that could not be mapped will be returned as null.
- Type: boolean
- Default: true
- Importance: medium
file_reader.delimited.settings.allow_nulls
If the schema supports nullable fields. If the file_reader.delimited.settings.data_type_mapping_error config flag is disabled, the value set for this config will be ignored and forced to true.
- Type: boolean
- Default: false
- Importance: medium
file_reader.delimited.settings.header_names
A comma-separated list of ordered field names to set when reading a file.
- Type: string[]
- Default:
null
- Importance: medium
file_reader.delimited.settings.null_value
Default value for null values.
- Type: string
- Default: null
- Importance: medium
file_reader.delimited.settings.format.line_separator
Line separator to be used.
- Type: string
- Default:
\n
- Importance: medium
file_reader.delimited.settings.max_columns
Maximum number of columns allowed per record.
- Type: int
- Default: 512
- Importance: low
file_reader.delimited.settings.max_chars_per_column
Maximum number of characters allowed per column.
- Type: int
- Default: 4096
- Importance: low
file_reader.delimited.settings.rows_to_skip
Number of rows to skip.
- Type: long
- Default:
0
- Importance: low
file_reader.delimited.settings.line_separator_detection
If the reader should detect the line separator automatically.
- Type: boolean
- Default:
false
- Importance: low
file_reader.delimited.settings.line_joining
Identifies whether lines ending with the escape character and followed by a line separator should be joined with the following line.
- Type: boolean
- Default:
true
- Importance: low
file_reader.delimited.settings.ignore_leading_whitespaces
Flag to enable/disable skipping leading whitespaces from values.
- Type: boolean
- Default:
true
- Importance: low
file_reader.delimited.settings.ignore_trailing_whitespaces
Flag to enable/disable skipping trailing whitespaces from values.
- Type: boolean
- Default:
true
- Importance: low
file_reader.delimited.settings.format.comment
Character that represents a line comment at the beginning of a line.
- Type: char
- Default:
#
- Importance: low
file_reader.delimited.settings.format.escape
Character used for escaping special characters.
- Type: char
- Default:
\
- Importance: low
file_reader.delimited.settings.format.escaped_char
Character used to represent an escaped tab.
- Type: char
- Default:
t
- Importance: low
file_reader.delimited.encoding
Encoding to use for reading a file. If not specified, the reader will use the default encoding.
- Type: string
- Default: based on the locale and charset of the underlying operating system.
- Importance: medium
file_reader.delimited.compression.type
Compression type to use when reading a file.
- Type: enum (available values bzip2, gzip and none)
- Default: none
- Importance: medium
file_reader.delimited.compression.concatenated
Flag to specify if the decompression of the reader will finish at the end of the file or after the first compressed stream.
- Type: boolean
- Default:
true
- Importance: low
FixedWidth¶
To configure custom properties for this reader, the name you must use is delimited
(even though it’s for FixedWidth).
file_reader.delimited.settings.field_lengths
A comma-separated ordered list of integers with the lengths of each field.
- Type: int[]
- Importance: high
file_reader.delimited.settings.header
Whether the file contains a header or not.
- Type: boolean
- Default:
false
- Importance: high
file_reader.delimited.settings.schema
A comma-separated list of ordered data types for each field in the file. Possible values: byte, short, int, long, float, double, boolean, bytes and string.
- Type: string[]
- Default: null
- Importance: medium
file_reader.delimited.settings.data_type_mapping_error
Flag to enable/disable throwing errors when mapping data types based on the schema is not possible. If disabled, values that could not be mapped will be returned as null.
- Type: boolean
- Default: true
- Importance: medium
file_reader.delimited.settings.allow_nulls
If the schema supports nullable fields. If the file_reader.delimited.settings.data_type_mapping_error config flag is disabled, the value set for this config will be ignored and forced to true.
- Type: boolean
- Default: false
- Importance: medium
file_reader.delimited.settings.header_names
A comma-separated list of ordered field names to set when reading a file.
- Type: string[]
- Default:
null
- Importance: medium
file_reader.delimited.settings.keep_padding
If the padding character should be kept in each value.
- Type: boolean
- Default:
false
- Importance: medium
file_reader.delimited.settings.padding_for_headers
If headers have the default padding specified.
- Type: boolean
- Default:
true
- Importance: medium
file_reader.delimited.settings.null_value
Default value for null values.
- Type: string
- Default: null
- Importance: medium
file_reader.delimited.settings.format.ends_on_new_line
If each record is considered to end when a new line is found.
- Type: boolean
- Default:
true
- Importance: medium
file_reader.delimited.settings.format.line_separator
Line separator to be used.
- Type: string
- Default:
\n
- Importance: medium
file_reader.delimited.settings.format.padding
The padding character used to represent unwritten spaces.
- Type: char
- Default: (a space character)
- Importance: medium
file_reader.delimited.settings.max_columns
Maximum number of columns allowed per record.
- Type: int
- Default: 512
- Importance: low
file_reader.delimited.settings.max_chars_per_column
Maximum number of characters allowed per column.
- Type: int
- Default: 4096
- Importance: low
file_reader.delimited.settings.skip_trailing_chars
If the trailing characters beyond the record’s length should be skipped.
- Type: boolean
- Default:
false
- Importance: low
file_reader.delimited.settings.rows_to_skip
Number of rows to skip.
- Type: long
- Default:
0
- Importance: low
file_reader.delimited.settings.line_separator_detection
If the reader should detect the line separator automatically.
- Type: boolean
- Default:
false
- Importance: low
file_reader.delimited.settings.ignore_leading_whitespaces
Flag to enable/disable skipping leading whitespaces from values.
- Type: boolean
- Default:
true
- Importance: low
file_reader.delimited.settings.ignore_trailing_whitespaces
Flag to enable/disable skipping trailing whitespaces from values.
- Type: boolean
- Default:
true
- Importance: low
file_reader.delimited.settings.format.comment
Character that represents a line comment at the beginning of a line.
- Type: char
- Default:
#
- Importance: low
file_reader.delimited.encoding
Encoding to use for reading a file. If not specified, the reader will use the default encoding.
- Type: string
- Default: based on the locale and charset of the underlying operating system.
- Importance: medium
file_reader.delimited.compression.type
Compression type to use when reading a file.
- Type: enum (available values bzip2, gzip and none)
- Default: none
- Importance: medium
file_reader.delimited.compression.concatenated
Flag to specify if the decompression of the reader will finish at the end of the file or after the first compressed stream.
- Type: boolean
- Default:
true
- Importance: low
JSON¶
To configure custom properties for this reader, the name you must use is json
.
file_reader.json.record_per_line
If enabled, the reader will read each line as a record. Otherwise, the reader will read the full content of the file as a record.
- Type: boolean
- Default:
true
- Importance: medium
file_reader.json.deserialization.<deserialization_feature>
Deserialization feature to use when reading a JSON file. You can add as many as you like based on the ones defined here.
- Type: boolean
- Importance: medium
file_reader.json.encoding
Encoding to use for reading a file. If not specified, the reader will use the default encoding.
- Type: string
- Default: based on the locale and charset of the underlying operating system.
- Importance: medium
file_reader.json.compression.type
Compression type to use when reading a file.
- Type: enum (available values bzip2, gzip and none)
- Default: none
- Importance: medium
file_reader.json.compression.concatenated
Flag to specify if the decompression of the reader will finish at the end of the file or after the first compressed stream.
- Type: boolean
- Default:
true
- Importance: low
XML¶
To configure custom properties for this reader, the name you must use is xml
.
file_reader.xml.record_per_line
If enabled, the reader will read each line as a record. Otherwise, the reader will read the full content of the file as a record.
- Type: boolean
- Default:
true
- Importance: medium
file_reader.xml.deserialization.<deserialization_feature>
Deserialization feature to use when reading an XML file. You can add as many as you like based on the ones defined here.
- Type: boolean
- Importance: medium
file_reader.xml.encoding
Encoding to use for reading a file. If not specified, the reader will use the default encoding.
- Type: string
- Default: based on the locale and charset of the underlying operating system.
- Importance: medium
file_reader.xml.compression.type
Compression type to use when reading a file.
- Type: enum (available values bzip2, gzip and none)
- Default: none
- Importance: medium
file_reader.xml.compression.concatenated
Flag to specify if the decompression of the reader will finish at the end of the file or after the first compressed stream.
- Type: boolean
- Default:
true
- Importance: low
YAML¶
To configure custom properties for this reader, the name you must use is yaml
.
file_reader.yaml.deserialization.<deserialization_feature>
Deserialization feature to use when reading a YAML file. You can add as many as you like based on the ones defined here.
- Type: boolean
- Importance: medium
file_reader.yaml.encoding
Encoding to use for reading a file. If not specified, the reader will use the default encoding.
- Type: string
- Default: based on the locale and charset of the underlying operating system.
- Importance: medium
file_reader.yaml.compression.type
Compression type to use when reading a file.
- Type: enum (available values bzip2, gzip and none)
- Default: none
- Importance: medium
file_reader.yaml.compression.concatenated
Flag to specify if the decompression of the reader will finish at the end of the file or after the first compressed stream.
- Type: boolean
- Default:
true
- Importance: low
Text¶
To configure custom properties for this reader, the name you must use is text
.
file_reader.text.record_per_line
If enabled, the reader will read each line as a record. Otherwise, the reader will read the full content of the file as a record.
- Type: boolean
- Default:
true
- Importance: medium
file_reader.text.field_name.value
Custom field name for the output value to include in the Kafka message.
- Type: string
- Default:
value
- Importance: medium
file_reader.text.encoding
Encoding to use for reading a file. If not specified, the reader will use the default encoding.
- Type: string
- Default: based on the locale and charset of the underlying operating system.
- Importance: medium
file_reader.text.compression.type
Compression type to use when reading a file.
- Type: enum (available values bzip2, gzip and none)
- Default: none
- Importance: medium
file_reader.text.compression.concatenated
Flag to specify if the decompression of the reader will finish at the end of the file or after the first compressed stream.
- Type: boolean
- Default:
true
- Importance: low
Agnostic¶
To configure custom properties for this reader, the name you must use is agnostic
.
file_reader.agnostic.extensions.parquet
A comma-separated string list with the accepted extensions for Parquet files.
- Type: string[]
- Default:
parquet
- Importance: medium
file_reader.agnostic.extensions.avro
A comma-separated string list with the accepted extensions for Avro files.
- Type: string[]
- Default:
avro
- Importance: medium
file_reader.agnostic.extensions.orc
A comma-separated string list with the accepted extensions for ORC files.
- Type: string[]
- Default:
orc
- Importance: medium
file_reader.agnostic.extensions.sequence
A comma-separated string list with the accepted extensions for Sequence files.
- Type: string[]
- Default:
seq
- Importance: medium
file_reader.agnostic.extensions.cobol
A comma-separated string list with the accepted extensions for Cobol files.
- Type: string[]
- Default:
dat
- Importance: medium
file_reader.agnostic.extensions.binary
A comma-separated string list with the accepted extensions for binary files.
- Type: string[]
- Default:
bin
- Importance: medium
file_reader.agnostic.extensions.csv
A comma-separated string list with the accepted extensions for CSV files.
- Type: string[]
- Default:
csv
- Importance: medium
file_reader.agnostic.extensions.tsv
A comma-separated string list with the accepted extensions for TSV files.
- Type: string[]
- Default:
tsv
- Importance: medium
file_reader.agnostic.extensions.fixed
A comma-separated string list with the accepted extensions for fixed-width files.
- Type: string[]
- Default:
fixed
- Importance: medium
file_reader.agnostic.extensions.json
A comma-separated string list with the accepted extensions for JSON files.
- Type: string[]
- Default:
json
- Importance: medium
file_reader.agnostic.extensions.xml
A comma-separated string list with the accepted extensions for XML files.
- Type: string[]
- Default:
xml
- Importance: medium
file_reader.agnostic.extensions.yaml
A comma-separated string list with the accepted extensions for YAML files.
- Type: string[]
- Default:
yaml
- Importance: medium
Note
The Agnostic reader uses the previous readers as inner readers. So, when using this reader, you'll probably need to include the specific properties of those readers in the connector configuration as well.
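For example, a sketch that combines the Agnostic reader with properties for its inner CSV reader (the AgnosticFileReader class name is assumed; the extra .txt extension mapping is only illustrative):
# class name assumed; check the com.github.mmolimar.kafka.connect.fs.file.reader package
file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.AgnosticFileReader
file_reader.agnostic.extensions.csv=csv,txt
# properties for the inner CSV (delimited) reader
file_reader.delimited.settings.header=true
file_reader.delimited.settings.format.delimiter=|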
FAQs¶
My file was already processed and the connector, when it’s executed again, processes the same records again.
If during the previous executions the records were sent successfully to Kafka, their offsets were sent too. Then, when executing the policy again, it retrieves the offset and seeks into the file. If this didn't happen, it's possible that the offset was not committed yet and, consequently, the offset retrieved is non-existent or too old.
Check when the offsets are committed in Kafka and/or try to execute the policy once you are sure the offsets have been committed.
The connector started but does not process any kind of file.
This can happen for several reasons:
- Check if the files contained in the FS match the regexp provided.
- Check if there is any kind of problem with the FS. The connector tolerates FS connection exceptions and processes the files later, but you'll find these errors in the log files.
- The file reader is reading files with an invalid format so it cannot process the file and continues with the next one. You can see this as an error in the log.
I have directories in the FS created day by day and I have to modify the connector everyday.
Don’t do this! Take advantage of the dynamic URIs using expressions.
For instance, if you have this URI hdfs://host:9000/data/2020
, you can
use this URI hdfs://host:9000/data/${yyyy}
instead.
The connector is too slow to process all URIs I have.
Obviously, this depends on the files in the FS(s), but if you have several URIs in
the connector it might be a good idea to adjust the number of tasks
to process those URIs in parallel (the tasks.max
connector property).
Also, setting the properties policy.batch_size
and/or file_reader.batch_size
might help in case you have tons of files or very large files.
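As a sketch, a combination of these tuning properties might look like this (the values are illustrative and should be adjusted to your workload):
tasks.max=4
policy.batch_size=200
file_reader.batch_size=1000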
I removed a file from the FS but the connector is still sending messages with the contents of that file.
This is a tricky issue. The file reader is an iterator and processes record by record, but part of the file is buffered so, even though the file was removed from the FS, the file reader keeps producing records until it throws an exception. It's just a matter of time.
But the main thing is that you don’t have to worry about removing files from the FS when they are being processed. The connector tolerates errors when reading files and continues with the next file.