本文共 4419 字,大约阅读时间需要 14 分钟。
public class TransformTest6_Partition { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// env.setParallelism(1); // 从文件中读取数据 DataStreaminputStream = env.readTextFile("/Users/sundongping/IdeaProjects/FirstFlinkTest/src/main/resources/sensor.txt"); inputStream.print("input"); // 转换成SensorReading类型 // java8 中的lamda表达式 DataStream dataStream = inputStream.map(line -> { String[] fields = line.split(","); return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2])); }); //1、shuffle DataStream shuffleStream = inputStream.shuffle(); shuffleStream.print("shuffle"); //2、keyBy dataStream.keyBy("id").print("keyBy"); env.execute(); }}
运行结果:
input:3> sensor_1,154777324345,35.8input:3> sensor_2,154777324346,15.4input:3> sensor_3,154777324347,6.7shuffle:2> sensor_2,154777324346,15.4shuffle:2> sensor_3,154777324347,6.7keyBy:1> SensorReading{ id='sensor_2', timestamp=154777324346, temperature=15.4}shuffle:1> sensor_1,154777324345,35.8keyBy:1> SensorReading{ id='sensor_3', timestamp=154777324347, temperature=6.7}keyBy:3> SensorReading{ id='sensor_1', timestamp=154777324345, temperature=35.8}input:4> sensor_4,154777324348,38.1input:4> sensor_1,154777324349,36.9input:1> sensor_1,154777324350,32.9shuffle:2> sensor_1,154777324349,36.9input:1> sensor_1,154777324351,37.1shuffle:1> sensor_4,154777324348,38.1keyBy:3> SensorReading{ id='sensor_1', timestamp=154777324349, temperature=36.9}keyBy:4> SensorReading{ id='sensor_4', timestamp=154777324348, temperature=38.1}shuffle:1> sensor_1,154777324350,32.9shuffle:2> sensor_1,154777324351,37.1input:2> sensor_6,154777324351,15.4input:2> sensor_7,154777324350,6.7keyBy:3> SensorReading{ id='sensor_1', timestamp=154777324350, temperature=32.9}keyBy:3> SensorReading{ id='sensor_1', timestamp=154777324351, temperature=37.1}shuffle:1> sensor_6,154777324351,15.4shuffle:2> sensor_7,154777324350,6.7keyBy:3> SensorReading{ id='sensor_6', timestamp=154777324351, temperature=15.4}keyBy:4> SensorReading{ id='sensor_7', timestamp=154777324350, temperature=6.7}
public class TransformTest6_Partition { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// env.setParallelism(1); // 从文件中读取数据 DataStreaminputStream = env.readTextFile("/Users/sundongping/IdeaProjects/FirstFlinkTest/src/main/resources/sensor.txt"); inputStream.print("input"); // 转换成SensorReading类型 // java8 中的lamda表达式 DataStream dataStream = inputStream.map(line -> { String[] fields = line.split(","); return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2])); });// //1、shuffle// DataStream shuffleStream = inputStream.shuffle();// shuffleStream.print("shuffle");//// //2、keyBy// dataStream.keyBy("id").print("keyBy"); //3、global dataStream.global().print("global"); env.execute(); }}
运行结果:
input:4> sensor_1,154777324345,35.8input:2> sensor_1,154777324350,32.9input:2> sensor_1,154777324351,37.1input:3> sensor_6,154777324351,15.4input:3> sensor_7,154777324350,6.7global:1> SensorReading{ id='sensor_6', timestamp=154777324351, temperature=15.4}global:1> SensorReading{ id='sensor_7', timestamp=154777324350, temperature=6.7}input:1> sensor_4,154777324348,38.1input:1> sensor_1,154777324349,36.9global:1> SensorReading{ id='sensor_4', timestamp=154777324348, temperature=38.1}global:1> SensorReading{ id='sensor_1', timestamp=154777324349, temperature=36.9}input:4> sensor_2,154777324346,15.4input:4> sensor_3,154777324347,6.7global:1> SensorReading{ id='sensor_1', timestamp=154777324345, temperature=35.8}global:1> SensorReading{ id='sensor_2', timestamp=154777324346, temperature=15.4}global:1> SensorReading{ id='sensor_3', timestamp=154777324347, temperature=6.7}global:1> SensorReading{ id='sensor_1', timestamp=154777324350, temperature=32.9}global:1> SensorReading{ id='sensor_1', timestamp=154777324351, temperature=37.1}
转载地址:http://zyxii.baihongyu.com/