时间:2021-07-01 10:21:17 帮助过:212人阅读
3.消费者
import com.g2.flink.models.CustomerStatusChangedEvent; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; /** * Hello world! */ //@Slf4j public class KafkaTableStreamApiConsumerTest { public static void main(String[] args) { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings.newInstance() .inStreamingMode() //.useOldPlanner() // flink .useBlinkPlanner() // blink .build(); StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings); Long baseTimestamp = 1600855709000L; DataStream<CustomerStatusChangedEvent> eventDataSet = env.fromElements( new CustomerStatusChangedEvent(1010L, 2, baseTimestamp), new CustomerStatusChangedEvent(1011L, 2, baseTimestamp + 100), new CustomerStatusChangedEvent(1011L, 1, baseTimestamp - 100), new CustomerStatusChangedEvent(1010L, 3, baseTimestamp + 150) ); String ddl = "CREATE TABLE CustomerStatusChangedEvent(\n" + "customerId int,\n" + "oldStatus int,\n" + "newStatus int,\n" + "eventTime bigint\n" + ") WITH(\n" + "‘connector.type‘=‘kafka‘,\n" + "‘connector.version‘=‘universal‘,\n" + "‘connector.properties.group.id‘=‘g2_group‘,\n" + "‘connector.properties.bootstrap.servers‘=‘192.168.1.85:9092,192.168.1.86:9092,192.168.1.87:9092‘,\n" + "‘connector.topic‘=‘customer_statusChangedEvent‘,\n" + "‘connector.startup-mode‘ = ‘latest-offset‘,\n" + "‘format.type‘=‘json‘\n" + ")\n"; tableEnvironment.executeSql(ddl); Table resultTb = tableEnvironment.sqlQuery("select customerId,newStatus as status " + " from CustomerStatusChangedEvent" + " where newStatus in(1,2)" ); /* DataStream<Tuple2<Boolean, Tuple2<Integer, Integer>>> result = tableEnvironment.toRetractStream(resultTb, Types.TUPLE(Types.INT, Types.INT)); */ DataStream<CustomerStatusLog> result = tableEnvironment.toAppendStream(resultTb, CustomerStatusLog.class); result.print(); try { env.execute(); } catch (Exception ex) { } } public static class CustomerStatusLog { private Long customerId; private Integer status; public Long getCustomerId() { return customerId; } public void setCustomerId(Long customerId) { this.customerId = customerId; } public Integer getStatus() { return status; } public void setStatus(Integer newStatus) { this.status = newStatus; } public CustomerStatusLog() { } @Override public String toString() { return "CustomerStatusLog{" + "customerId=" + customerId + ", status=" + status + ‘}‘; } } }
4.消费者打印
4> CustomerStatusLog{customerId=1001, status=2}
4> CustomerStatusLog{customerId=1001, status=1}
4> CustomerStatusLog{customerId=1001, status=2}
4> CustomerStatusLog{customerId=1001, status=2}
flink 使用sql实现kafka生产者和消费者
标签:try public into ring tables bootstrap slf4j rap static