Hive Dialect

Flink allows users to write SQL statements in Hive syntax when the Hive dialect is used. By providing compatibility with Hive syntax, Flink aims to improve interoperability with Hive and reduce the number of cases in which users need to switch between Flink and Hive to execute different statements.

Use Hive Dialect

Flink currently supports two SQL dialects: default and hive. You need to switch to the Hive dialect before you can write in Hive syntax. The following describes how to set the dialect using the SQL Client, a SQL Gateway configured with the HiveServer2 Endpoint, and the Table API. Note that you can switch the dialect dynamically for each statement you execute; there is no need to restart a session to use a different dialect.

Note:

  • To use Hive dialect, you have to add dependencies related to Hive. Please refer to Hive dependencies for how to add the dependencies.
  • Since Flink 1.15, if you want to use Hive dialect in Flink SQL Client or SQL Gateway, you have to move the jar flink-table-planner_2.12-1.16.0.jar from FLINK_HOME/opt to FLINK_HOME/lib, and move the jar flink-table-planner-loader-1.16.0.jar out of FLINK_HOME/lib. Otherwise, a ValidationException is thrown. Please refer to FLINK-25128 for more details.
  • Please make sure the current catalog is HiveCatalog. Otherwise, it will fall back to Flink’s default dialect. When using SQL Gateway configured with HiveServer2 Endpoint, the current catalog will be a HiveCatalog by default.
  • For better syntax and semantic compatibility, it’s highly recommended to load HiveModule and place it first in the module list, so that Hive built-in functions are picked up during function resolution. Please refer to the modules documentation for how to change the resolution order. When using SQL Gateway configured with HiveServer2 Endpoint, the Hive module is loaded automatically.
  • Hive dialect only supports 2-part identifiers, so you can’t specify catalog for an identifier.
  • While all Hive versions support the same syntax, whether a specific feature is available still depends on the Hive version you use. For example, updating database location is only supported in Hive-2.4.0 or later.
  • The Hive dialect is mainly used in batch mode. Some Hive syntax (SORT BY, CLUSTER BY, DISTRIBUTE BY, TRANSFORM, etc.) is not yet supported in streaming mode.
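
The prerequisites above can be sketched as a short SQL Client session. This is only an illustration under stated assumptions: the catalog name myhive, the configuration directory /opt/hive-conf, and the names mydb and mytable are hypothetical placeholders, and the Hive dependencies and planner jar are assumed to be in place already. The catalog and module statements are issued in Flink's default dialect before switching.

```sql
-- Register a HiveCatalog and make it the current catalog.
-- 'myhive' and '/opt/hive-conf' are placeholders; the directory is
-- assumed to contain hive-site.xml.
CREATE CATALOG myhive WITH (
  'type' = 'hive',
  'hive-conf-dir' = '/opt/hive-conf'
);
USE CATALOG myhive;

-- Load the Hive module and place it ahead of the core module so that
-- Hive built-in functions are resolved first.
LOAD MODULE hive;
USE MODULES hive, core;

-- Switch to the Hive dialect.
SET table.sql-dialect = hive;

-- Only two-part identifiers (database.table) are accepted in Hive dialect;
-- a three-part name such as myhive.mydb.mytable is not.
SELECT * FROM mydb.mytable;
```

When the SQL Gateway is configured with the HiveServer2 Endpoint, the catalog and module steps should not be needed, since the notes above state that both are set up by default.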

SQL Client

The SQL dialect can be specified via the table.sql-dialect property, so you can set the dialect after the SQL Client has launched.

  Flink SQL> SET table.sql-dialect = hive; -- to use Hive dialect
  [INFO] Session property has been set.
  Flink SQL> SET table.sql-dialect = default; -- to use Flink default dialect
  [INFO] Session property has been set.
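
Because the dialect can be changed per statement, one session can mix Hive and Flink syntax. The following is a minimal sketch, assuming the current catalog is a HiveCatalog and the Hive connector dependencies are available; the table names hive_orders and gen_orders are hypothetical.

```sql
-- Hive dialect: STORED AS is Hive DDL syntax.
SET table.sql-dialect = hive;
CREATE TABLE IF NOT EXISTS hive_orders (id INT, amount DOUBLE) STORED AS ORC;

-- Flink default dialect: connector options are given in a WITH clause.
SET table.sql-dialect = default;
CREATE TEMPORARY TABLE gen_orders (id INT, amount DOUBLE) WITH (
  'connector' = 'datagen',
  'number-of-rows' = '10'
);

-- Still in the default dialect: copy the generated rows into the Hive table.
INSERT INTO hive_orders SELECT id, amount FROM gen_orders;
```

Each SET applies to the statements that follow it within the same session, so no restart is needed between the two CREATE statements.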

SQL Gateway Configured With HiveServer2 Endpoint

When using the SQL Gateway configured with HiveServer2 Endpoint, the dialect will be Hive dialect by default, so you don’t need to do anything if you want to use Hive dialect. But you can still change the dialect to Flink default dialect.

  # assuming you have already connected to the SQL Gateway with beeline
  jdbc:hive2> SET table.sql-dialect = default; -- to use Flink default dialect
  jdbc:hive2> SET table.sql-dialect = hive; -- to use Hive dialect

Table API

You can set the dialect for your TableEnvironment with the Table API.

Java

  import org.apache.flink.table.api.EnvironmentSettings;
  import org.apache.flink.table.api.SqlDialect;
  import org.apache.flink.table.api.TableEnvironment;

  EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
  TableEnvironment tableEnv = TableEnvironment.create(settings);
  // to use Hive dialect
  tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
  // to use Flink default dialect
  tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

Python

  from pyflink.table import EnvironmentSettings, SqlDialect, TableEnvironment

  settings = EnvironmentSettings.in_batch_mode()
  t_env = TableEnvironment.create(settings)
  # to use Hive dialect
  t_env.get_config().set_sql_dialect(SqlDialect.HIVE)
  # to use Flink default dialect
  t_env.get_config().set_sql_dialect(SqlDialect.DEFAULT)