<!-- This dependency is provided, because it should not be packaged into the JAR file. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <!-- <scope>provided</scope> -->
</dependency>
<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.7</version>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
    <scope>runtime</scope>
</dependency>
</dependencies>
<!-- We use the maven-shade plugin to create a fat jar that contains all necessary dependencies. -->
<!-- Change the value of <mainClass>...</mainClass> if your program entry point changes. -->
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.0.0</version>
    <executions>
        <!-- Run shade goal on package phase -->
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <artifactSet>
                    <excludes>
                        <exclude>org.apache.flink:force-shading</exclude>
                        <exclude>com.google.code.findbugs:jsr305</exclude>
                        <exclude>org.slf4j:*</exclude>
                        <exclude>log4j:*</exclude>
                    </excludes>
                </artifactSet>
                <filters>
                    <filter>
                        <!-- Do not copy the signatures in the META-INF folder.
                             Otherwise, this might cause SecurityExceptions when using the JAR. -->
                        <artifact>*:*</artifact>
                        <excludes>
                            <exclude>META-INF/*.SF</exclude>
                            <exclude>META-INF/*.DSA</exclude>
                            <exclude>META-INF/*.RSA</exclude>
                        </excludes>
                    </filter>
                </filters>
                <transformers>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass>spendreport.FraudDetectionJob</mainClass>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>
</plugins>
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Kafka connection properties (brokers, group id, credentials, ...)
    final Properties properties = new Properties();

    // Read the "sensor" topic as a stream of plain strings
    final DataStreamSource<String> sensor = env.addSource(
            new FlinkKafkaConsumer010<String>("sensor", new SimpleStringSchema(), properties));

    sensor.print("data");
    env.execute();
}
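The Properties object above is created empty. As a minimal sketch (the broker address and group id below are placeholders, not values from the original), a FlinkKafkaConsumer010 needs at least the bootstrap servers and a consumer group:

properties.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker address
properties.setProperty("group.id", "sensor-group");            // placeholder consumer group id
// Credentials (SASL/SSL settings) would also go into this Properties object if the cluster requires them.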
Reading from a Collection
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<SensorReading> dataStreamSource = env.fromCollection(Arrays.asList(
        new SensorReading("sen1", 1L, 37.5),
        new SensorReading("sen2", 2L, 38.5),
        new SensorReading("sen3", 3L, 39.5),
        new SensorReading("sen4", 4L, 40.5)));
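As in the Kafka example, adding a sink and calling execute() makes this snippet runnable end to end:

dataStreamSource.print("collection");
env.execute();
}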
public class MySensorce implements SourceFunction<SensorReading> {

    // volatile so that cancel(), called from another thread, is visible to run()
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<SensorReading> sourceContext) throws Exception {
        final Random random = new Random();

        // Initialize a baseline temperature for ten sensors with ids "1" to "10"
        final HashMap<String, Double> stringDoubleHashMap = new HashMap<>();
        for (int i = 1; i < 11; i++) {
            stringDoubleHashMap.put(i + "", random.nextGaussian());
        }

        // Every second, emit one reading per sensor: its baseline plus a fresh Gaussian offset
        while (running) {
            for (String id : stringDoubleHashMap.keySet()) {
                final double v = stringDoubleHashMap.get(id) + random.nextGaussian();
                final SensorReading sensorReading = new SensorReading(id, System.currentTimeMillis(), v);
                sourceContext.collect(sensorReading);
            }
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}
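A minimal sketch of wiring this custom source into a job, following the same pattern as the earlier examples:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new MySensorce()).print("custom");
env.execute();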
Operators
Basic Operators
map
final SingleOutputStreamOperator<Integer> map = inputStream.map(new MapFunction<String, Integer>() {
    @Override
    public Integer map(String s) throws Exception {
        return s.length();
    }
});
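Because MapFunction has a single abstract method, the same transformation can also be written with a lambda or method reference, in the same style as the keyBy example below:

final SingleOutputStreamOperator<Integer> lengths = inputStream.map(String::length);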
final DataStream<SensorReading> hight = split.select("hight");
final DataStream<SensorReading> low = split.select("low");
final DataStream<SensorReading> all = split.select("low", "hight");
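The select() calls above assume a SplitStream<SensorReading> named split that was created beforehand. A hedged sketch of that split() using the pre-1.12 split/select API (the dataStream variable and the 37.5 threshold are placeholders, not taken from the original):

final SplitStream<SensorReading> split = dataStream.split(new OutputSelector<SensorReading>() {
    @Override
    public Iterable<String> select(SensorReading value) {
        // Tag each reading so it can later be selected as "hight", "low", or both
        return value.getTemp() > 37.5
                ? Collections.singletonList("hight")
                : Collections.singletonList("low");
    }
});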
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final DataStream<String> inputStream = env.readTextFile("Sensor.txt");
final DataStream<SensorReading> map = inputStream.map(line -> {
    final String[] split = line.split(",");
    return new SensorReading(split[0], Long.parseLong(split[1]), Double.parseDouble(split[2]));
});
// Key the stream by sensor id
// final KeyedStream<SensorReading, Tuple> id = map.keyBy("id");
final KeyedStream<SensorReading, String> keyedStream = map.keyBy(SensorReading::getId);
// Reduce: for each sensor, keep the latest timestamp and the maximum temperature seen so far
final SingleOutputStreamOperator<SensorReading> reduce = keyedStream.reduce(new ReduceFunction<SensorReading>() {
    @Override
    public SensorReading reduce(SensorReading value1, SensorReading value2) throws Exception {
        return new SensorReading(value1.getId(), value2.getTimestamp(), Math.max(value1.getTemp(), value2.getTemp()));
    }
});
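A sink and an execute() call complete the job, as in the earlier examples:

reduce.print("maxTemp");
env.execute();
}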