Flink起步安装和使用

2019-03-13 23:33:24来源:博客园 阅读 ()

新老客户大回馈,云服务器低至5折

下载安装

下载地址

下载对应操作系统和版本的flink

 # 首先确认下Java环境
 $ java -version
 java version "1.8.0_111"
 Java(TM) SE Runtime Environment (build 1.8.0_111-b14)
 Java HotSpot(TM) 64-Bit Server VM (build 25.111-b14, mixed mode)
 ?
 # Linux安装
 $ wget https://www.apache.org/dyn/closer.lua/flink/flink-1.7.2/flink-1.7.2-bin-scala_2.11.tgz # 下载二进制安装包
 $ tar xzf flink-*.tgz   # 解压安装包
 $ cd flink-1.7.1 # 切到安装包目录
 ?
 # Mac安装
 $ brew install apache-flink
 ...
 # 查看版本
 $ flink --version
 Version: 1.2.0, Commit ID: 1c659cf
 ?
 # 查看安装位置
 $ brew info apache-flink
 https://flink.apache.org/
 /usr/local/Cellar/apache-flink/1.7.1 (170 files, 321.1MB) *
  Built from source on 2019-02-14 at 09:32:35
 From: https://github.com/Homebrew/homebrew-core/blob/master/Formula/apache-flink.rb
 ==> Requirements
 Required: java = 1.8 ?
 ==> Options
 --HEAD
  Install HEAD version
 ==> Analytics
 install: 915 (30 days), 3,279 (90 days), 9,094 (365 days)
 install_on_request: 899 (30 days), 3,226 (90 days), 8,878 (365 days)
 build_error: 0 (30 days)
 ?
 # Mac上注意事项
 # Mac上对应Linux的bin目录在/usr/local/Cellar/apache-flink/1.7.1/libexec
 ?

 

配置运行

 $ cd flink-1.7.1
 $ ./bin/start-cluster.sh
 # 端口运行在localhost:8081

 

创建IDEA Maven工程 -> Add Archetype

GroupId: org.apache.flink

ArtifactId: flink-quickstart-java

Version: 1.7.1

 

添加代码

 public class SocketWindowWordCount {
 ?
     public static void main(String[] args) throws Exception {
 ?
         // the port to connect to
         final int port;
         try {
             final ParameterTool params = ParameterTool.fromArgs(args);
             port = params.getInt("port");
        } catch (Exception e) {
             System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'");
             return;
        }
 ?
         // get the execution environment
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 ?
         // get input data by connecting to the socket
         DataStream<String> text = env.socketTextStream("localhost", port, "\n");
 ?
         // parse the data, group it, window it, and aggregate the counts
         DataStream<WordWithCount> windowCounts = text
            .flatMap(new FlatMapFunction<String, WordWithCount>() {
                 @Override
                 public void flatMap(String value, Collector<WordWithCount> out) {
                     for (String word : value.split("\\s")) {
                         out.collect(new WordWithCount(word, 1L));
                    }
                }
            })
            .keyBy("word")
            .timeWindow(Time.seconds(5), Time.seconds(1))
            .reduce(new ReduceFunction<WordWithCount>() {
                 @Override
                 public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                     return new WordWithCount(a.word, a.count + b.count);
                }
            });
 ?
         // print the results with a single thread, rather than in parallel
         windowCounts.print().setParallelism(1);
 ?
         env.execute("Socket Window WordCount");
    }
 ?
     // Data type for words with count
     public static class WordWithCount {
 ?
         public String word;
         public long count;
 ?
         public WordWithCount() {}
 ?
         public WordWithCount(String word, long count) {
             this.word = word;
             this.count = count;
        }
 ?
         @Override
         public String toString() {
             return word + " : " + count;
        }
    }
 }

 

打包运行

 $ maven clean package -Dmaven.test.skip=true

 

开启监听端口

 $ nc -l 9000

 

运行上方代码

 # 连接到上方端口, 切到上方打好的jar包路径
 $ flink run -c 包路径.SocketWindowWordCount jar包路径 --port 9000 # 包路径指的是当前的java类的package

 

我们可以在nc中输入数据

 $ nc -l 9000
 hello hello hello
 hehe
 your world

 

查看结果

 $ tail -f log/flink-*-taskexecutor-*.out

 

停止flink

 $ ./bin/stop-cluster.sh

 


原文链接:https://www.cnblogs.com/yisen614/p/10521718.html
如有疑问请与原作者联系

标签:

版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有

上一篇:Java Object类

下一篇:java之接口开发-初级篇-webservice协议