Flink Table API Hello World: A Complete Beginner's Example
Key Concepts: TableEnvironment | Programmatic Tables | Flink SQL | Data Generation
Problem Statement
This example demonstrates the fundamental concepts of Apache Flink's Table API by showing:
- How to create a basic TableEnvironment in streaming mode
- Different ways to create and manipulate tables (programmatic, SQL, descriptors)
- Integration between Table API and Flink SQL
- Basic data generation for testing purposes
Complete Code Example
package com.twalthr.flink.examples;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

/** Hello world example. */
public class Example_00_Table_Hello_World {

    public static void main(String[] args) {
        // 1. Create TableEnvironment in streaming mode
        TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // 2. Fully programmatic approach
        env.fromValues(1).execute().print();

        // 3. Flink SQL integration
        env.sqlQuery("SELECT 1").execute().print();

        // 4. API with SQL-like expressions
        Table table = env.fromValues("1").as("c").select($("c").cast(DataTypes.STRING()));

        // 5. Create temporary view
        env.createTemporaryView("InputTable", table);

        // 6. Output to blackhole connector
        env.from("InputTable").insertInto(TableDescriptor.forConnector("blackhole").build()).execute();

        // 7. Generate test data with datagen connector
        env.from(
                        TableDescriptor.forConnector("datagen")
                                .schema(
                                        Schema.newBuilder()
                                                .column("uid", DataTypes.BIGINT())
                                                .column("s", DataTypes.STRING())
                                                .column("ts", DataTypes.TIMESTAMP_LTZ(3))
                                                .build())
                                .build())
                .execute()
                .print();
    }
}
Step-by-Step Explanation
1. TableEnvironment Setup
TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
Creates the central entry point for Flink's Table API in streaming mode. This environment manages tables, catalogs, and executes queries.
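For comparison, the same entry point can be created in batch mode. The following is a minimal sketch, assuming a Flink release in which EnvironmentSettings.inStreamingMode() and inBatchMode() are static factory methods (as the example above already requires); the class name EnvironmentModes and the create(boolean) helper are hypothetical and exist only for illustration.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Hypothetical helper class, not part of the original example.
public class EnvironmentModes {

    /** Creates a TableEnvironment for unbounded (streaming) or bounded (batch) execution. */
    static TableEnvironment create(boolean streaming) {
        EnvironmentSettings settings =
                streaming
                        ? EnvironmentSettings.inStreamingMode()
                        : EnvironmentSettings.inBatchMode();
        return TableEnvironment.create(settings);
    }

    public static void main(String[] args) {
        // The same query runs unchanged in either mode; only the settings differ.
        create(true).sqlQuery("SELECT 1").execute().print();
        create(false).sqlQuery("SELECT 1").execute().print();
    }
}
```

Batch mode only accepts bounded inputs, so it suits the literal-value queries in this example but not an unbounded source.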
2. Programmatic Table Creation
env.fromValues(1).execute().print();
Creates a table from literal values (just the number 1) and immediately executes/prints it. Demonstrates the simplest way to create a table.
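A table with several rows and named columns can be built the same way. This is a small sketch rather than part of the original example: it assumes the row() helper from org.apache.flink.table.api.Expressions, and the class name, column names, and sample values are made up for illustration.

```java
import static org.apache.flink.table.api.Expressions.row;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

// Hypothetical class; column names and values are illustrative only.
public class FromValuesSketch {

    /** Builds a two-column table from literal rows and names the columns. */
    static Table twoUsers(TableEnvironment env) {
        return env.fromValues(row(1, "alice"), row(2, "bob")).as("id", "name");
    }

    public static void main(String[] args) {
        TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        Table users = twoUsers(env);
        users.printSchema();      // column types are derived from the literals
        users.execute().print();  // bounded input, so the job finishes on its own
    }
}
```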
3. Flink SQL Integration
env.sqlQuery("SELECT 1").execute().print();
Shows that a plain SQL query runs in the same environment as Table API calls. sqlQuery() returns a Table object, so the result can be processed further with either style.
4. Table API with Expressions
Table table = env.fromValues("1").as("c").select($("c").cast(DataTypes.STRING()));
Demonstrates the Table API's DSL-style operations:
- fromValues() creates a table from values
- as() assigns a column name
- select() with cast() performs type conversion
5. Temporary View Creation
env.createTemporaryView("InputTable", table);
Registers the table as a temporary view that can be referenced in subsequent queries, similar to SQL views.
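Once a view is registered, SQL can read it and the SQL result can flow back into Table API calls. The sketch below illustrates that round trip under stated assumptions: the three sample values, the filter, the INT cast, and the class name are hypothetical additions, not part of the original example.

```java
import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

// Hypothetical class showing a Table API -> SQL -> Table API round trip.
public class MixSqlAndTableApi {

    static Table build(TableEnvironment env) {
        // Table API: create a one-column table and register it under a name.
        Table table = env.fromValues("1", "2", "3").as("c");
        env.createTemporaryView("InputTable", table);

        // SQL: query the view that was registered through the Table API.
        Table viaSql = env.sqlQuery("SELECT c FROM InputTable WHERE c <> '2'");

        // Table API again: the SQL result is an ordinary Table.
        return viaSql.select($("c").cast(DataTypes.INT()).as("n"));
    }

    public static void main(String[] args) {
        TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        build(env).execute().print();
    }
}
```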
6. Blackhole Connector
env.from("InputTable").insertInto(TableDescriptor.forConnector("blackhole").build()).execute();
Shows how to:
- Read from the registered view
- Use TableDescriptor to define a sink
- Discard all input with the blackhole connector (useful for testing)
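The same sink can also be declared with SQL DDL and addressed by name. This is a hedged sketch, assuming a Flink release that offers Table.insertInto(String); the table name Sink, the class name, and the submit() helper are hypothetical. It also calls await() in main, because execute() only submits the insert job and does not wait for it to finish.

```java
import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;

// Hypothetical class; "Sink" is an illustrative table name.
public class BlackholeSinkSketch {

    /** Registers a blackhole sink via DDL and submits an insert job into it. */
    static TableResult submit(TableEnvironment env) {
        env.createTemporaryView(
                "InputTable",
                env.fromValues("1").as("c").select($("c").cast(DataTypes.STRING()).as("c")));
        env.executeSql(
                "CREATE TEMPORARY TABLE Sink (c STRING) WITH ('connector' = 'blackhole')");
        // execute() submits the job asynchronously and returns immediately.
        return env.from("InputTable").insertInto("Sink").execute();
    }

    public static void main(String[] args) throws Exception {
        TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Block until the insert job has finished before the JVM exits.
        submit(env).await();
    }
}
```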
7. Data Generation
env.from(
TableDescriptor.forConnector("datagen")
.schema(
Schema.newBuilder()
.column("uid", DataTypes.BIGINT())
.column("s", DataTypes.STRING())
.column("ts", DataTypes.TIMESTAMP_LTZ(3))
.build())
.build())
.execute()
.print();
Demonstrates:
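- Using the built-in datagen connector for test data
- Defining a schema with different data types
- Automatic generation of random data; without a row limit the source is unbounded, so print() keeps emitting rows until the job is cancelled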
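For a run that terminates on its own, the datagen source can be bounded. The sketch below reuses the schema from step 7 and adds the connector options number-of-rows and fields.s.length; the chosen values, the class name, and the boundedSource() helper are assumptions for illustration.

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;

// Hypothetical class; option values are illustrative only.
public class BoundedDatagen {

    /** Returns a datagen-backed table that stops after the given number of rows. */
    static Table boundedSource(TableEnvironment env, long rows) {
        return env.from(
                TableDescriptor.forConnector("datagen")
                        .schema(
                                Schema.newBuilder()
                                        .column("uid", DataTypes.BIGINT())
                                        .column("s", DataTypes.STRING())
                                        .column("ts", DataTypes.TIMESTAMP_LTZ(3))
                                        .build())
                        // Total rows to emit; this makes the source bounded.
                        .option("number-of-rows", String.valueOf(rows))
                        // Length of the random strings generated for column "s".
                        .option("fields.s.length", "8")
                        .build());
    }

    public static void main(String[] args) {
        TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Prints five random rows, then the job ends.
        boundedSource(env, 5).execute().print();
    }
}
```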
Key Takeaways:
- Table API provides multiple ways to create and manipulate tables
- SQL and Table API can be mixed seamlessly
- Connectors (datagen, blackhole) simplify testing
- TableEnvironment is the central coordination point
Frequently Asked Questions
Q1. What's the difference between Table API and DataStream API?
A: The Table API is a higher-level, SQL-like abstraction, while the DataStream API provides lower-level control over state and time. A Table can be converted to a DataStream and back through StreamTableEnvironment.
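A conversion in both directions might look like the following sketch. It assumes flink-table-api-java-bridge is on the classpath and a Flink release in which StreamExecutionEnvironment.fromElements() is available; the class name, the upperCased() helper, and the sample words are hypothetical.

```java
import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

// Hypothetical class showing DataStream -> Table -> DataStream.
public class StreamTableRoundTrip {

    /** Turns a stream of strings into a table and upper-cases its single column. */
    static Table upperCased(StreamTableEnvironment tableEnv, DataStream<String> words) {
        // A DataStream<String> becomes a one-column table; as() renames that column.
        Table table = tableEnv.fromDataStream(words).as("word");
        return table.select($("word").upperCase().as("word"));
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);

        DataStream<String> words = streamEnv.fromElements("hello", "world");

        // Back to the DataStream API: an insert-only table converts to a stream of Row.
        DataStream<Row> result = tableEnv.toDataStream(upperCased(tableEnv, words));
        result.print();

        streamEnv.execute("stream-table-round-trip");
    }
}
```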
Q2. How do I add dependencies for Table API?
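A: You need flink-table-api-java (or flink-table-api-java-bridge for DataStream integration) plus a planner. The flink-table-planner-blink artifact named in older guides was renamed to flink-table-planner in Flink 1.14; from Flink 1.15 onward the recommended planner dependencies are flink-table-planner-loader and flink-table-runtime. To run the example from an IDE you also need flink-clients.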
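As a rough guide, a Maven setup for running this example locally could look like the fragment below. It is a sketch that assumes Flink 1.15 or later, where the planner ships as flink-table-planner-loader plus flink-table-runtime; the flink.version property is a placeholder you define yourself, and scopes may need adjusting when deploying to a cluster.

```xml
<!-- Assumes a <flink.version> property set to the Flink release you run against. -->
<dependencies>
  <!-- Table API, including the bridge to the DataStream API -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-java-bridge</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <!-- Planner and runtime needed to execute Table API and SQL programs -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-loader</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-runtime</artifactId>
    <version>${flink.version}</version>
  </dependency>
  <!-- Required to submit and run jobs from an IDE -->
  <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
  </dependency>
</dependencies>
```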

