• 64160

    文章

  • 632

    评论

  • 59

    友链

  • 最近新加了换肤功能,大家多来逛逛吧~~~~
  • 喜欢这个网站的朋友可以加一下QQ群,我们一起交流技术。

Apache Flink 零基础入门(十八)Flink Table API&SQL

撸了今年阿里、腾讯和美团的面试,我有一个重要发现.......>>

什么是Flink关系型API?

虽然Flink已经支持了DataSet和DataStream API,但是有没有一种更好的方式去编程,而不用关心具体的API实现?不需要去了解Java和Scala的具体实现。

Flink provides three layered APIs. Each API offers a different trade-off between conciseness and expressiveness and targets different use cases.

Flink提供了三层API,每一层API提供了一个在简洁性和表达力之间的权衡 。

最低层是一个有状态的事件驱动。在这一层进行开发是非常麻烦的。

虽然很多功能基于DataSet和DataStreamAPI是可以完成的,需要熟悉这两套API,而且必须要熟悉Java和Scala,这是有一定的难度的。一个框架如果在使用的过程中没法使用SQL来处理,那么这个框架就有很大的限制。虽然对于开发人员无所谓,但是对于用户来说却不显示。因此SQL是非常面向大众语言。

好比MapReduce使用Hive SQL,Spark使用Spark SQL,Flink使用Flink SQL。

虽然Flink支持批处理/流处理,那么如何做到API层面的统一?

这样Table和SQL应运而生。

这其实就是一个关系型API,操作起来如同操作Mysql一样简单。

Apache Flink features two relational APIs - the Table API and SQL - for unified stream and batch processing. The Table API is a language-integrated query API for Scala and Java that allows the composition of queries from relational operators such as selection, filter, and join in a very intuitive way. 

Apache Flink通过使用Table API和SQL 两大特性,来统一批处理和流处理。 Table API是一个查询API,集成了Scala和Java语言,并且允许使用select filter join等操作。

使用Table SQL API需要额外依赖

java:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

scala:

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>

使用Table SQL API编程

首先导入上面的依赖,然后读取sales.csv文件,文件内容如下:

transactionId,customerId,itemId,amountPaid
111,1,1,100.0
112,2,2,505.0
113,1,3,510.0
114,2,4,600.0
115,3,2,500.0
116,4,2,500.0
117,1,2,500.0
118,1,2,500.0
119,1,3,500.0
120,1,2,500.0
121,2,4,500.0
122,1,2,500.0
123,1,4,500.0
124,1,2,500.0

Scala

object TableSQLAPI {

  def main(args: Array[String]): Unit = {
    val bEnv = ExecutionEnvironment.getExecutionEnvironment
    val bTableEnv = BatchTableEnvironment.create(bEnv)
    val filePath="E:/test/sales.csv"
    // 已经拿到DataSet
    val csv = bEnv.readCsvFile[SalesLog](filePath,ignoreFirstLine = true)
    // DataSet => Table
  }

  case class SalesLog(transactionId:String,customerId:String,itemId:String,amountPaid:Double
                     )
}

首先拿到DataSet,接下来将DataSet转为Table,然后就可以执行SQL了

    // DataSet => Table
    val salesTable = bTableEnv.fromDataSet(csv)
    // 注册成Table  Table => table
    bTableEnv.registerTable("sales", salesTable)
    // sql
    val resultTable = bTableEnv.sqlQuery("select customerId, sum(amountPaid) money from sales group by customerId")
    bTableEnv.toDataSet[Row](resultTable).print()

输出结果如下:

4,500.0
3,500.0
1,4110.0
2,1605.0

这种方式只需要使用SQL就可以实现之前写mapreduce的功能。大大方便了开发过程。

Java

package com.vincent.course06;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

public class JavaTableSQLAPI {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment bTableEnv = BatchTableEnvironment.create(bEnv);
        DataSource<Sales> salesDataSource = bEnv.readCsvFile("E:/test/sales.csv").ignoreFirstLine().
                pojoType(Sales.class, "transactionId", "customerId", "itemId", "amountPaid");
        Table sales = bTableEnv.fromDataSet(salesDataSource);
        bTableEnv.registerTable("sales", sales);
        Table resultTable = bTableEnv.sqlQuery("select customerId, sum(amountPaid) money from sales group by customerId");
        DataSet<Row> rowDataSet = bTableEnv.toDataSet(resultTable, Row.class);
        rowDataSet.print();
    }

    public static class Sales {
        public String transactionId;
        public String customerId;
        public String itemId;
        public Double amountPaid;

        @Override
        public String toString() {
            return "Sales{" +
                    "transactionId='" + transactionId + '\'' +
                    ", customerId='" + customerId + '\'' +
                    ", itemId='" + itemId + '\'' +
                    ", amountPaid=" + amountPaid +
                    '}';
        }
    }
}

 


 转载至链接:https://my.oschina.net/duanvincent/blog/3105069。

695856371Web网页设计师②群 | 喜欢本站的朋友可以收藏本站,或者加入我们大家一起来交流技术!

欢迎来到梁钟霖个人博客网站。本个人博客网站提供最新的站长新闻,各种互联网资讯。 还提供个人博客模板,最新最全的java教程,java面试题。在此我将尽我最大所能将此个人博客网站做的最好! 谢谢大家,愿大家一起进步!

转载原创文章请注明出处,转载至: 梁钟霖个人博客www.liangzl.com

0条评论

Loading...


发表评论

电子邮件地址不会被公开。 必填项已用*标注

自定义皮肤
注册梁钟霖个人博客