Flink Table API Hello World: A Complete Beginner's Example

[Diagram: Flink Table API architecture]

Key Concepts: TableEnvironment | Programmatic Tables | Flink SQL | Data Generation

Problem Statement

This example demonstrates the fundamental concepts of Apache Flink's Table API by showing:

  • How to create a basic TableEnvironment in streaming mode
  • Different ways to create and manipulate tables (programmatic, SQL, descriptors)
  • Integration between Table API and Flink SQL
  • Basic data generation for testing purposes

Complete Code Example

package com.twalthr.flink.examples;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

/** Hello world example. */
public class Example_00_Table_Hello_World {

  public static void main(String[] args) {
    // 1. Create TableEnvironment in streaming mode
    TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

    // 2. Fully programmatic approach
    env.fromValues(1).execute().print();

    // 3. Flink SQL integration
    env.sqlQuery("SELECT 1").execute().print();

    // 4. API with SQL-like expressions
    Table table = env.fromValues("1").as("c").select($("c").cast(DataTypes.STRING()));

    // 5. Create temporary view
    env.createTemporaryView("InputTable", table);

    // 6. Output to blackhole connector
    env.from("InputTable").insertInto(TableDescriptor.forConnector("blackhole").build()).execute();

    // 7. Generate test data with datagen connector
    env.from(
            TableDescriptor.forConnector("datagen")
                .schema(
                    Schema.newBuilder()
                        .column("uid", DataTypes.BIGINT())
                        .column("s", DataTypes.STRING())
                        .column("ts", DataTypes.TIMESTAMP_LTZ(3))
                        .build())
                .build())
        .execute()
        .print();
  }
}

Step-by-Step Explanation

1. TableEnvironment Setup

TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

Creates the central entry point for Flink's Table API in streaming mode. This environment manages catalogs and registered tables and executes queries.
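The only thing that changes for bounded workloads is the environment settings. As a minimal sketch (the class name BatchHelloWorld is made up for this example):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

/** Hypothetical companion example: same entry point, but in batch (bounded) mode. */
public class BatchHelloWorld {

  public static void main(String[] args) {
    // Bounded execution; the rest of the Table API is identical to streaming mode.
    TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inBatchMode());

    // Prints a single-column table with rows 1, 2, 3 and then terminates.
    env.fromValues(1, 2, 3).execute().print();
  }
}
```

Batch mode lets the planner apply bounded-only optimizations, but the program structure stays the same.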

2. Programmatic Table Creation

env.fromValues(1).execute().print();

Creates a table from literal values (just the number 1) and immediately executes/prints it. Demonstrates the simplest way to create a table.
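fromValues also accepts several rows at once via the row(...) expression. A small variation (class name is illustrative only):

```java
import static org.apache.flink.table.api.Expressions.row;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

/** Hypothetical variation: multiple rows and explicit column names. */
public class FromValuesRows {

  public static void main(String[] args) {
    TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

    // Two rows with two columns each, renamed from the defaults (f0, f1).
    env.fromValues(row(1, "Alice"), row(2, "Bob"))
        .as("id", "name")
        .execute()
        .print();
  }
}
```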

3. Flink SQL Integration

env.sqlQuery("SELECT 1").execute().print();

Shows that pure SQL can be executed within the same environment, demonstrating the seamless integration between the Table API and SQL.
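Beyond sqlQuery, the environment also runs DDL statements through executeSql. A sketch (the view name Numbers is made up for this example):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

/** Hypothetical sketch: mixing SQL DDL and queries in one environment. */
public class SqlDdlExample {

  public static void main(String[] args) {
    TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

    // DDL via executeSql: register a view that later queries can reference.
    env.executeSql("CREATE TEMPORARY VIEW Numbers AS SELECT 1 AS n");

    // The registered view is visible to subsequent SQL queries.
    env.sqlQuery("SELECT n + 1 FROM Numbers").execute().print();
  }
}
```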

4. Table API with Expressions

Table table = env.fromValues("1").as("c").select($("c").cast(DataTypes.STRING()));

Demonstrates the Table API's DSL-style operations:

  • fromValues() creates a table from values
  • as() assigns a column name
  • select() with cast() performs type conversion
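The same fluent style extends to filtering and column renaming. A self-contained sketch (values and names chosen just for illustration):

```java
import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

/** Hypothetical sketch: chaining select, cast, as, and where. */
public class ExpressionChaining {

  public static void main(String[] args) {
    TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

    Table filtered =
        env.fromValues("1", "2", "3")
            .as("c")
            // cast(...) converts the type, as(...) names the result column
            .select($("c").cast(DataTypes.INT()).as("n"))
            // where(...) keeps only rows matching the predicate
            .where($("n").isGreater(1));

    filtered.execute().print();
  }
}
```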

5. Temporary View Creation

env.createTemporaryView("InputTable", table);

Registers the table as a temporary view that can be referenced in subsequent queries, similar to SQL views.
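Once registered, the view is also addressable from SQL. A self-contained sketch of the round trip:

```java
import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

/** Hypothetical sketch: a Table API table queried through SQL via a temporary view. */
public class ViewRoundTrip {

  public static void main(String[] args) {
    TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

    Table table = env.fromValues("1").as("c").select($("c").cast(DataTypes.STRING()));
    env.createTemporaryView("InputTable", table);

    // The Table API object is now visible to SQL under the view's name.
    env.sqlQuery("SELECT * FROM InputTable").execute().print();
  }
}
```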

6. Blackhole Connector

env.from("InputTable").insertInto(TableDescriptor.forConnector("blackhole").build()).execute();

Shows how to:

  • Read from the registered view
  • Define a sink with a TableDescriptor
  • Discard every row with the blackhole connector (useful for performance and pipeline testing)
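For debugging it is often handier to actually see the rows. Swapping in the built-in print connector follows the same pattern, only the sink changes (class name is illustrative):

```java
import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;

/** Hypothetical sketch: same insertInto pattern, but with the print sink. */
public class PrintSinkExample {

  public static void main(String[] args) {
    TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

    Table table = env.fromValues("1").as("c").select($("c").cast(DataTypes.STRING()));
    env.createTemporaryView("InputTable", table);

    // The print connector writes every row to the TaskManager's stdout.
    env.from("InputTable")
        .insertInto(TableDescriptor.forConnector("print").build())
        .execute();
  }
}
```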

7. Data Generation

env.from(
        TableDescriptor.forConnector("datagen")
            .schema(
                Schema.newBuilder()
                    .column("uid", DataTypes.BIGINT())
                    .column("s", DataTypes.STRING())
                    .column("ts", DataTypes.TIMESTAMP_LTZ(3))
                    .build())
            .build())
    .execute()
    .print();

Demonstrates:

  • Using the built-in datagen connector for test data
  • Defining a schema with different data types
  • Automatic generation of random data
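By default, datagen produces an unbounded stream of random rows; its connector options can bound and throttle it. A sketch using two documented options:

```java
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.TableEnvironment;

/** Hypothetical sketch: a bounded, rate-limited datagen source. */
public class BoundedDatagen {

  public static void main(String[] args) {
    TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

    TableDescriptor source =
        TableDescriptor.forConnector("datagen")
            .schema(Schema.newBuilder().column("uid", DataTypes.BIGINT()).build())
            .option("number-of-rows", "10") // makes the source bounded
            .option("rows-per-second", "5") // throttles generation
            .build();

    // Terminates after the configured number of rows has been emitted.
    env.from(source).execute().print();
  }
}
```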

Key Takeaways

  • Table API provides multiple ways to create and manipulate tables
  • SQL and Table API can be mixed seamlessly
  • Connectors (datagen, blackhole) simplify testing
  • TableEnvironment is the central coordination point

Frequently Asked Questions

Q1. What's the difference between the Table API and the DataStream API?
A: The Table API is a higher-level, SQL-like, relational abstraction, while the DataStream API offers lower-level control over state, time, and operators. Tables and DataStreams can be converted into each other.
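The conversion mentioned in the answer goes through StreamTableEnvironment from the flink-table-api-java-bridge module. A minimal sketch:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/** Hypothetical sketch: converting between DataStream and Table. */
public class StreamTableBridge {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    // DataStream -> Table
    Table table = tEnv.fromDataStream(env.fromElements(1, 2, 3));

    // Table -> DataStream, printed via the DataStream API
    tEnv.toDataStream(table).print();

    env.execute();
  }
}
```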

Q2. How do I add dependencies for the Table API?
A: You need flink-table-api-java (or flink-table-api-java-bridge for DataStream integration) plus a planner dependency: flink-table-planner-loader in recent releases, or flink-table-planner-blink in Flink versions before 1.14.
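For Maven users, a dependency sketch (artifact coordinates as documented for recent Flink releases; the ${flink.version} property is a placeholder you define yourself):

```xml
<!-- Table API itself -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java</artifactId>
  <version>${flink.version}</version>
</dependency>
<!-- Planner, needed at runtime to execute Table programs -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-planner-loader</artifactId>
  <version>${flink.version}</version>
  <scope>runtime</scope>
</dependency>
```

Running examples locally from the IDE typically also needs flink-clients and flink-table-runtime on the classpath.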

Ready to explore more? Official Table API Docs | Try the Flink Playgrounds
