Data Repartitioning Operations in Flink
Published: 2019-05-25

Example 1: shuffle() and keyBy()

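import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// SensorReading is the author's own POJO (id, timestamp, temperature); it is not shown in this post.
public class TransformTest6_Partition {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // env.setParallelism(1);

        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("/Users/sundongping/IdeaProjects/FirstFlinkTest/src/main/resources/sensor.txt");
        inputStream.print("input");

        // Convert to SensorReading with a Java 8 lambda
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });

        // 1. shuffle: each record goes to a randomly chosen downstream subtask
        DataStream<String> shuffleStream = inputStream.shuffle();
        shuffleStream.print("shuffle");

        // 2. keyBy: records with the same id go to the same subtask
        // (keyBy(String) is deprecated in newer Flink versions in favor of a KeySelector)
        dataStream.keyBy("id").print("keyBy");

        env.execute();
    }
}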

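Output. In each name:N> prefix, N is the number of the subtask that printed the line (subtask index plus one). After keyBy, every sensor_1 record is printed by subtask 3. After shuffle, the same sensor_1 records are split between subtask 1 and subtask 2: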

input:3> sensor_1,154777324345,35.8
input:3> sensor_2,154777324346,15.4
input:3> sensor_3,154777324347,6.7
shuffle:2> sensor_2,154777324346,15.4
shuffle:2> sensor_3,154777324347,6.7
keyBy:1> SensorReading{id='sensor_2', timestamp=154777324346, temperature=15.4}
shuffle:1> sensor_1,154777324345,35.8
keyBy:1> SensorReading{id='sensor_3', timestamp=154777324347, temperature=6.7}
keyBy:3> SensorReading{id='sensor_1', timestamp=154777324345, temperature=35.8}
input:4> sensor_4,154777324348,38.1
input:4> sensor_1,154777324349,36.9
input:1> sensor_1,154777324350,32.9
shuffle:2> sensor_1,154777324349,36.9
input:1> sensor_1,154777324351,37.1
shuffle:1> sensor_4,154777324348,38.1
keyBy:3> SensorReading{id='sensor_1', timestamp=154777324349, temperature=36.9}
keyBy:4> SensorReading{id='sensor_4', timestamp=154777324348, temperature=38.1}
shuffle:1> sensor_1,154777324350,32.9
shuffle:2> sensor_1,154777324351,37.1
input:2> sensor_6,154777324351,15.4
input:2> sensor_7,154777324350,6.7
keyBy:3> SensorReading{id='sensor_1', timestamp=154777324350, temperature=32.9}
keyBy:3> SensorReading{id='sensor_1', timestamp=154777324351, temperature=37.1}
shuffle:1> sensor_6,154777324351,15.4
shuffle:2> sensor_7,154777324350,6.7
keyBy:3> SensorReading{id='sensor_6', timestamp=154777324351, temperature=15.4}
keyBy:4> SensorReading{id='sensor_7', timestamp=154777324350, temperature=6.7}
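The following plain-Java simulation (not Flink's source code) shows how shuffle() and keyBy() each pick a downstream subtask ("channel") for a record. shuffle() draws a random channel for every record, which is why the sensor_1 lines above jump between shuffle:1 and shuffle:2. keyBy() derives the channel from the key, so every sensor_1 record ends up on keyBy:3. The class PartitionSketch and its methods are hypothetical names. The key hashing is an assumed simplification: Flink actually murmur-hashes the key into key groups (bounded by maxParallelism) before mapping the group to a subtask. The subtask numbers printed here will therefore not match Flink's.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Plain-Java simulation (NOT Flink code) of how shuffle() and keyBy()
// choose a downstream channel for each record.
class PartitionSketch {

    // shuffle(): a uniformly random channel in [0, numChannels).
    static int shuffleChannel(Random random, int numChannels) {
        return random.nextInt(numChannels);
    }

    // keyBy(): a deterministic channel derived from the key's hash.
    // ASSUMPTION: Flink really applies a murmur hash and key groups first;
    // floorMod(hashCode) is a simplified stand-in, so channel numbers differ.
    static int keyByChannel(String key, int numChannels) {
        return Math.floorMod(key.hashCode(), numChannels);
    }

    // The key is the sensor id, the first field of "id,timestamp,temperature".
    static String keyOf(String line) {
        return line.split(",")[0];
    }

    // Route all lines with one strategy and collect them per channel.
    static List<List<String>> route(List<String> lines, int numChannels, boolean byKey, long seed) {
        Random random = new Random(seed);
        List<List<String>> channels = new ArrayList<>();
        for (int i = 0; i < numChannels; i++) {
            channels.add(new ArrayList<>());
        }
        for (String line : lines) {
            int channel = byKey
                    ? keyByChannel(keyOf(line), numChannels)
                    : shuffleChannel(random, numChannels);
            channels.get(channel).add(line);
        }
        return channels;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "sensor_1,154777324345,35.8",
                "sensor_2,154777324346,15.4",
                "sensor_1,154777324349,36.9",
                "sensor_6,154777324351,15.4",
                "sensor_1,154777324351,37.1");
        List<List<String>> shuffled = route(lines, 4, false, 42L);
        List<List<String>> keyed = route(lines, 4, true, 42L);
        for (int i = 0; i < 4; i++) {
            // "i + 1" mimics the subtask number in Flink's print prefix.
            System.out.println("shuffle:" + (i + 1) + "> " + shuffled.get(i));
            System.out.println("keyBy:" + (i + 1) + "> " + keyed.get(i));
        }
    }
}
```

Changing the seed moves the shuffle lines around but never the keyBy lines. That determinism is what keyed state and per-key aggregations rely on.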

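Example 2: global(). This is the same class as Example 1, with the shuffle and keyBy calls commented out and a global() call added.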

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// SensorReading is the author's own POJO (id, timestamp, temperature); it is not shown in this post.
public class TransformTest6_Partition {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // env.setParallelism(1);

        // Read data from a file
        DataStream<String> inputStream = env.readTextFile("/Users/sundongping/IdeaProjects/FirstFlinkTest/src/main/resources/sensor.txt");
        inputStream.print("input");

        // Convert to SensorReading with a Java 8 lambda
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], Long.valueOf(fields[1]), Double.valueOf(fields[2]));
        });

//        // 1. shuffle
//        DataStream<String> shuffleStream = inputStream.shuffle();
//        shuffleStream.print("shuffle");
//
//        // 2. keyBy
//        dataStream.keyBy("id").print("keyBy");

        // 3. global: every record goes to the first downstream subtask
        dataStream.global().print("global");

        env.execute();
    }
}

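Output. global() sends every record to the first subtask of the downstream operator. As a result, every global line carries the prefix global:1>, regardless of which input subtask read the record: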

input:4> sensor_1,154777324345,35.8
input:2> sensor_1,154777324350,32.9
input:2> sensor_1,154777324351,37.1
input:3> sensor_6,154777324351,15.4
input:3> sensor_7,154777324350,6.7
global:1> SensorReading{id='sensor_6', timestamp=154777324351, temperature=15.4}
global:1> SensorReading{id='sensor_7', timestamp=154777324350, temperature=6.7}
input:1> sensor_4,154777324348,38.1
input:1> sensor_1,154777324349,36.9
global:1> SensorReading{id='sensor_4', timestamp=154777324348, temperature=38.1}
global:1> SensorReading{id='sensor_1', timestamp=154777324349, temperature=36.9}
input:4> sensor_2,154777324346,15.4
input:4> sensor_3,154777324347,6.7
global:1> SensorReading{id='sensor_1', timestamp=154777324345, temperature=35.8}
global:1> SensorReading{id='sensor_2', timestamp=154777324346, temperature=15.4}
global:1> SensorReading{id='sensor_3', timestamp=154777324347, temperature=6.7}
global:1> SensorReading{id='sensor_1', timestamp=154777324350, temperature=32.9}
global:1> SensorReading{id='sensor_1', timestamp=154777324351, temperature=37.1}
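global() is the opposite extreme of shuffle(): the channel choice ignores the record entirely and always returns the first subtask. The plain-Java simulation below illustrates that rule. GlobalPartitionSketch is a hypothetical name, not Flink's own partitioner class.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java simulation (NOT Flink code) of global(): every record is sent
// to channel 0, which Flink's print sink shows with the prefix "global:1>".
class GlobalPartitionSketch {

    // Ignores both the value and the channel count: always channel 0.
    static int selectChannel(String value, int numChannels) {
        return 0;
    }

    // Route all values and collect them per channel.
    static List<List<String>> route(List<String> values, int numChannels) {
        List<List<String>> channels = new ArrayList<>();
        for (int i = 0; i < numChannels; i++) {
            channels.add(new ArrayList<>());
        }
        for (String value : values) {
            channels.get(selectChannel(value, numChannels)).add(value);
        }
        return channels;
    }

    public static void main(String[] args) {
        List<List<String>> channels = route(Arrays.asList(
                "sensor_6,154777324351,15.4",
                "sensor_4,154777324348,38.1",
                "sensor_1,154777324345,35.8"), 4);
        for (int i = 0; i < channels.size(); i++) {
            // prints "global:1> 3 record(s)", then "0 record(s)" for global:2 to global:4
            System.out.println("global:" + (i + 1) + "> " + channels.get(i).size() + " record(s)");
        }
    }
}
```

Because a single subtask receives everything, the other three print-sink instances in Example 2 stay idle. The downstream operator effectively runs with parallelism 1.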

Reposted from: http://zyxii.baihongyu.com/