Apache Flink Data Sources: Complete Guide with Code Examples
Key Methods: socketTextStream() | readTextFile() | addSource() | Kafka Connector
Apache Flink's StreamExecutionEnvironment provides multiple methods to ingest data from various sources. Whether you're working with sockets for testing, files for batch-style processing, or Kafka for production streams, Flink offers both built-in connectors and hooks for fully custom sources.
1. Socket-based Source
Reads data from a socket (useful for testing):
DataStream<String> stream = env.socketTextStream("hostname", port);
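For a quick local test, the sketch below wires the socket source into a complete job (the localhost:9999 address is an assumption): start a text server with nc -lk 9999, run the job, and type lines into the terminal.
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class SocketSourceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Each newline-terminated string from the socket becomes one stream element
        DataStream<String> lines = env.socketTextStream("localhost", 9999);
        lines.print();                      // print elements to stdout
        env.execute("Socket Source Demo");  // submit and run the job
    }
}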
2. File-based Sources
Reads data from files:
Text files:
DataStream<String> stream = env.readTextFile("file:///path/to/file");
CSV, Avro, Parquet, and similar formats via a concrete FileInputFormat (note that CsvInputFormat itself is abstract; TupleCsvInputFormat is the concrete tuple-based CSV variant):
TupleTypeInfo<Tuple2<String, Integer>> typeInfo =
    TupleTypeInfo.getBasicTupleTypeInfo(String.class, Integer.class);
DataStream<Tuple2<String, Integer>> stream = env.readFile(
    new TupleCsvInputFormat<>(new Path("file:///path/to/file"), typeInfo),
    "file:///path/to/file",
    FileProcessingMode.PROCESS_CONTINUOUSLY, // keep re-scanning the path; or PROCESS_ONCE for a single read
    1000 // monitoring interval in ms
);
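On Flink 1.15+ the unified FileSource (from the flink-connector-files module) supersedes readFile(); a minimal sketch for line-oriented text files, assuming the same path:
FileSource<String> source = FileSource
    .forRecordStreamFormat(new TextLineInputFormat(), new Path("file:///path/to/file"))
    .monitorContinuously(Duration.ofSeconds(1)) // omit this line for a bounded, one-shot read
    .build();
DataStream<String> stream = env.fromSource(
    source, WatermarkStrategy.noWatermarks(), "file-source");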
3. Collection-based Sources
Creates a stream from in-memory collections (useful for testing):
List<String> data = Arrays.asList("a", "b", "c");
DataStream<String> stream = env.fromCollection(data);
Or directly from elements:
DataStream<String> stream = env.fromElements("a", "b", "c");
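If Flink cannot infer the element type automatically (generic element types are the usual culprit), returns() lets you state it explicitly; a brief sketch using Tuple2:
DataStream<Tuple2<String, Integer>> pairs = env
    .fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2))
    .returns(Types.TUPLE(Types.STRING, Types.INT)); // explicit type information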
4. Kafka Source
Reads data from Apache Kafka (requires flink-connector-kafka dependency):
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>(
"topic-name",
new SimpleStringSchema(),
props
);
DataStream<String> stream = env.addSource(kafkaSource);
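Note that FlinkKafkaConsumer is deprecated in recent Flink releases; on Flink 1.14+ the builder-based KafkaSource is the recommended replacement. A minimal sketch (the group id and starting-offset choice are placeholders):
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers("localhost:9092")
    .setTopics("topic-name")
    .setGroupId("my-consumer-group")                    // placeholder group id
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();
DataStream<String> stream = env.fromSource(
    source, WatermarkStrategy.noWatermarks(), "kafka-source");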
Important: Remember to include the appropriate connector dependencies in your project (e.g., flink-connector-kafka) for external system sources.
5. Custom Source (SourceFunction)
Implement a custom source using SourceFunction:
DataStream<Long> stream = env.addSource(
    new SourceFunction<Long>() {
        private volatile boolean isRunning = true;
        private long counter = 0;

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            while (isRunning) {
                // Emit under the checkpoint lock so the counter state
                // stays consistent with checkpoints
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(counter++);
                }
                Thread.sleep(1000); // emit one element per second
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }
);
6. External Systems (Connectors)
Flink provides connectors for various systems:
- RabbitMQ: flink-connector-rabbitmq
- AWS Kinesis: flink-connector-kinesis
- Google Pub/Sub: flink-connector-google-pubsub
- JDBC: flink-connector-jdbc
- Elasticsearch: flink-connector-elasticsearch
Example (RabbitMQ):
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
.setHost("localhost")
.setPort(5672)
.setUserName("guest")
.setPassword("guest")
.setVirtualHost("/")
.build();
DataStream<String> stream = env.addSource(
    new RMQSource<>(
        connectionConfig,
        "queue-name",              // RabbitMQ queue to consume from
        true,                      // use correlation IDs for exactly-once deduplication
        new SimpleStringSchema()   // deserialization schema for message bodies
    )
);
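Exactly-once consumption with correlation IDs also requires checkpointing to be enabled on the environment; for example (the interval is an assumption):
env.enableCheckpointing(10_000); // checkpoint every 10 seconds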
7. Event Time Sources
For event time processing, you can assign timestamps and watermarks:
DataStream<Event> stream = env.addSource(new CustomSource())
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
.withTimestampAssigner((event, timestamp) -> event.getTimestamp())
);
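One related pitfall: if any source partition stops emitting, watermarks stall for downstream operators. WatermarkStrategy#withIdleness marks quiet partitions as idle so event time keeps advancing; a sketch reusing the same Event type:
WatermarkStrategy<Event> strategy = WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
    .withTimestampAssigner((event, ts) -> event.getTimestamp())
    .withIdleness(Duration.ofMinutes(1)); // treat partitions silent for 1 minute as idle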
8. Parallel Sources
For parallel sources, implement ParallelSourceFunction (or RichParallelSourceFunction when you need access to the runtime context):
DataStream<Long> stream = env.addSource(
    new ParallelSourceFunction<Long>() {
        private volatile boolean isRunning = true;
        @Override public void run(SourceContext<Long> ctx) throws Exception {
            // Each parallel instance runs this loop independently
            while (isRunning) { ctx.collect(System.currentTimeMillis()); Thread.sleep(1000); }
        }
        @Override public void cancel() { isRunning = false; }
    }
).setParallelism(4); // Run with 4 parallel instances
Summary Table
| Method | Description | Use Case |
|---|---|---|
| socketTextStream() | Reads from a socket | Testing |
| readTextFile() | Reads text files | Batch/Streaming |
| fromCollection() | Creates stream from a collection | Testing |
| fromElements() | Creates stream from elements | Testing |
| addSource() | Custom or connector-based sources | Production |
| readFile() | Reads files with custom FileInputFormat | Batch/Streaming |
| Kafka Connector | Reads from Kafka | Production |
| Custom SourceFunction | User-defined source logic | Custom needs |
Frequently Asked Questions
Q1. Which Flink source method is best for production systems?
A: For production systems, Kafka connector or other mature connectors (RabbitMQ, Kinesis) are recommended as they provide reliability, fault tolerance, and scalability.
Q2. How do I handle backpressure in custom sources?
A: Flink propagates backpressure automatically: SourceContext#collect() blocks when downstream operators cannot keep up, so the emitting loop slows down on its own. Beyond that, hold the checkpoint lock (SourceContext#getCheckpointLock()) while emitting and updating state so checkpoints stay consistent, and add explicit throttling if your source can fetch data far faster than the pipeline can absorb it.
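As a concrete illustration, a throttled emit loop inside a SourceFunction's run() method might look like the sketch below (the rate budget and nextRecord() helper are hypothetical):
long recordsPerSecond = 100;                    // hypothetical rate budget
long sleepMillis = 1000 / recordsPerSecond;
while (isRunning) {
    synchronized (ctx.getCheckpointLock()) {
        ctx.collect(nextRecord());              // nextRecord() is a hypothetical helper
    }
    Thread.sleep(sleepMillis);                  // crude throttle toward the target rate
}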

