本地执行

即使在单个Java虚拟机中,Flink也可以在一台机器上运行。这允许用户在本地测试和调试Flink程序。本节概述了本地执行机制。

本地环境和执行程序允许您在本地Java虚拟机中运行Flink程序,或在任何JVM中作为现有程序的一部分运行。只需点击IDE的“运行”按钮,即可在本地启动大多数示例。

Flink支持两种不同的本地执行。LocalExecutionEnvironment开始全面运行Flink,包括JobManager和TaskManager。这些包括内存管理和在群集模式下执行的所有内部算法。

CollectionEnvironment正在执行的Java集合Flink程序。此模式不会启动完整的Flink运行时,因此执行的开销和轻量级都非常低。例如,DataSet.map()通过将map()函数应用于Java列表中的所有数据元来执行转换。

调试

如果您在本地运行Flink程序,您也可以像调试任何其他Java程序一样调试程序。您可以使用System.out.println()写出一些内部变量,也可以使用调试器。可以在其中设置断点map()reduce()以及所有其他方法。另请参阅Java API文档中调试部分,以获取Java API中的测试和本地调试实用程序指南。

Maven依赖

如果您在Maven项目中开发程序,则必须flink-clients使用此依赖项添加模块:

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-clients_2.11</artifactId>
  4. <version>1.7-SNAPSHOT</version>
  5. </dependency>

本地环境

LocalEnvironment是Flink程序本地执行的句柄。使用它在本地JVM中运行程序 - 独立或嵌入其他程序。

通过该方法实例化本地环境ExecutionEnvironment.createLocalEnvironment()默认情况下,它将使用尽可能多的本地线程来执行,因为您的计算机具有CPU核心(硬件上下文)。您也可以指定所需的并行度。可以将本地环境配置为使用enableLogging()/ 登录控制台disableLogging()

在大多数情况下,呼叫ExecutionEnvironment.getExecutionEnvironment()是更好的方式。该方法LocalEnvironment在本地启动程序时(在命令行界面之外)返回,并在命令行界面调用程序时返回预先配置的集群运行环境

  1. public static void main(String[] args) throws Exception {
  2. ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
  3. DataSet<String> data = env.readTextFile("file:///path/to/file");
  4. data
  5. .filter(new FilterFunction<String>() {
  6. public boolean filter(String value) {
  7. return value.startsWith("http://");
  8. }
  9. })
  10. .writeAsText("file:///path/to/result");
  11. JobExecutionResult res = env.execute();
  12. }

JobExecutionResult执行完成后返回对象包含程序运行时和累加器结果。

LocalEnvironment还允许通过自定义的配置值Flink。

  1. Configuration conf = new Configuration();
  2. conf.setFloat(ConfigConstants.TASK_MANAGER_MEMORY_FRACTION_KEY, 0.5f);
  3. final ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(conf);

注意:本地运行环境不会启动任何Web前端来监视执行。

集合环境

使用Java执行Java Collections CollectionEnvironment是一种执行Flink程序的低开销方法。此模式的典型用例是自动测试,调试和代码重用。

对于更具交互性的情况,用户也可以使用为批处理实现的算法。可以在Java Application Server中使用稍微更改的Flink程序变体来处理传入的请求。

基于集合的执行的框架

  1. public static void main(String[] args) throws Exception {
  2. // initialize a new Collection-based execution environment
  3. final ExecutionEnvironment env = new CollectionEnvironment();
  4. DataSet<User> users = env.fromCollection( /* get elements from a Java Collection */);
  5. /* Data Set transformations ... */
  6. // retrieve the resulting Tuple2 elements into a ArrayList.
  7. Collection<...> result = new ArrayList<...>();
  8. resultDataSet.output(new LocalCollectionOutputFormat<...>(result));
  9. // kick off execution.
  10. env.execute();
  11. // Do some work with the resulting ArrayList (=Collection).
  12. for(... t : result) {
  13. System.err.println("Result = "+t);
  14. }
  15. }

flink-examples-batch模块包含一个完整的示例,称为CollectionExecutionExample

请注意,基于集合的Flink程序的执行仅适用于适合JVM堆的小数据。集合上的执行不是多线程的,只使用一个线程。