
Flink: implementing a Kafka producer and consumer with SQL


2. Producer

import com.g2.flink.models.CustomerStatusChangedEvent;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.concurrent.TimeUnit;

/**
 * Hello world!
 */
//@Slf4j
public class KafkaTableStreamApiProducerTest {

    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                //.useOldPlanner() // flink
                .useBlinkPlanner() // blink
                .build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);

        // Sample events (not used further in this example).
        Long baseTimestamp = 1600855709000L;
        DataStream<CustomerStatusChangedEvent> eventDataSet = env.fromElements(
                new CustomerStatusChangedEvent(1010L, 2, baseTimestamp),
                new CustomerStatusChangedEvent(1011L, 2, baseTimestamp + 100),
                new CustomerStatusChangedEvent(1011L, 1, baseTimestamp - 100),
                new CustomerStatusChangedEvent(1010L, 3, baseTimestamp + 150)
        );

        // Register a Kafka-backed table; rows inserted into it are produced to the topic.
        String ddl = "CREATE TABLE CustomerStatusChangedEvent(\n" +
                "customerId int,\n" +
                "oldStatus int,\n" +
                "newStatus int,\n" +
                "eventTime bigint\n" +
                ") WITH(\n" +
                "'connector.type'='kafka',\n" +
                "'connector.version'='universal',\n" +
                "'connector.properties.bootstrap.servers'='192.168.1.85:9092,192.168.1.86:9092,192.168.1.87:9092',\n" +
                "'connector.topic'='customer_statusChangedEvent',\n" +
                "'format.type'='json'\n" +
                ")\n";
        tableEnvironment.executeSql(ddl);

        // Emit one row every 3 seconds; each INSERT statement is submitted as its own job.
        while (true) {
            try {
                TimeUnit.SECONDS.sleep(3);
                int status = (int) (System.currentTimeMillis() % 3);
                String insert = "insert into CustomerStatusChangedEvent(customerId,oldStatus,newStatus,eventTime)" +
                        " values(1001,1," + status + "," + System.currentTimeMillis() + ")";
                tableEnvironment.executeSql(insert);
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }
    }
}
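Note: the WITH clause above uses the legacy connector.* option keys of Flink's old connector descriptor. On Flink 1.11 and later, the same table would normally be declared with the flat option keys instead; the following is a sketch assuming the same topic and brokers, not part of the original post:

CREATE TABLE CustomerStatusChangedEvent (
    customerId INT,
    oldStatus INT,
    newStatus INT,
    eventTime BIGINT
) WITH (
    'connector' = 'kafka',
    'topic' = 'customer_statusChangedEvent',
    'properties.bootstrap.servers' = '192.168.1.85:9092,192.168.1.86:9092,192.168.1.87:9092',
    'format' = 'json'
);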

 

3. Consumer

import com.g2.flink.models.CustomerStatusChangedEvent;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Hello world!
 */
//@Slf4j
public class KafkaTableStreamApiConsumerTest {

    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                //.useOldPlanner() // flink
                .useBlinkPlanner() // blink
                .build();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env, settings);

        Long baseTimestamp = 1600855709000L;
        DataStream<CustomerStatusChangedEvent> eventDataSet = env.fromElements(
                new CustomerStatusChangedEvent(1010L, 2, baseTimestamp),
                new CustomerStatusChangedEvent(1011L, 2, baseTimestamp + 100),
                new CustomerStatusChangedEvent(1011L, 1, baseTimestamp - 100),
                new CustomerStatusChangedEvent(1010L, 3, baseTimestamp + 150)
        );

        String ddl = "CREATE TABLE CustomerStatusChangedEvent(\n" +
                "customerId int,\n" +
                "oldStatus int,\n" +
                "newStatus int,\n" +
                "eventTime bigint\n" +
                ") WITH(\n" +
                "‘connector.type‘=‘kafka‘,\n" +
                "‘connector.version‘=‘universal‘,\n" +
                "‘connector.properties.group.id‘=‘g2_group‘,\n" +
                "‘connector.properties.bootstrap.servers‘=‘192.168.1.85:9092,192.168.1.86:9092,192.168.1.87:9092‘,\n" +
                "‘connector.topic‘=‘customer_statusChangedEvent‘,\n" +
                "‘connector.startup-mode‘ = ‘latest-offset‘,\n" +
                "‘format.type‘=‘json‘\n" +
                ")\n";
        tableEnvironment.executeSql(ddl);

        Table resultTb = tableEnvironment.sqlQuery("select customerId,newStatus as status " +
                " from CustomerStatusChangedEvent" +
                " where newStatus in(1,2)"
        );


        /*
        // toRetractStream would be needed if the query could update or delete
        // rows it has already emitted (e.g. a GROUP BY aggregation); each record
        // is then paired with a Boolean accumulate/retract flag.
        DataStream<Tuple2<Boolean, Tuple2<Integer, Integer>>> result = tableEnvironment.toRetractStream(resultTb,
                Types.TUPLE(Types.INT, Types.INT));
        */
        // A plain filter never retracts rows, so an append-only stream suffices here.
        DataStream<CustomerStatusLog> result = tableEnvironment.toAppendStream(resultTb, CustomerStatusLog.class);
        result.print();

        try {
            env.execute();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    public static class CustomerStatusLog {
        private Long customerId;

        private Integer status;

        public Long getCustomerId() {
            return customerId;
        }

        public void setCustomerId(Long customerId) {
            this.customerId = customerId;
        }

        public Integer getStatus() {
            return status;
        }

        public void setStatus(Integer newStatus) {
            this.status = newStatus;
        }

        public CustomerStatusLog() {

        }

        @Override
        public String toString() {
            return "CustomerStatusLog{" +
                    "customerId=" + customerId +
                    ", status=" + status +
                    '}';
        }
    }
}
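Note: both classes reference com.g2.flink.models.CustomerStatusChangedEvent, which the original post never shows. A minimal sketch consistent with the three-argument constructor calls above (field names beyond the constructor are assumptions, not the original class) could be:

// Hypothetical reconstruction of the event POJO used above; only the
// constructor shape (Long, int, Long) is known from the original code.
public class CustomerStatusChangedEvent {
    private Long customerId;
    private Integer newStatus;
    private Long eventTime;

    public CustomerStatusChangedEvent() {
    }

    public CustomerStatusChangedEvent(Long customerId, Integer newStatus, Long eventTime) {
        this.customerId = customerId;
        this.newStatus = newStatus;
        this.eventTime = eventTime;
    }

    public Long getCustomerId() { return customerId; }
    public void setCustomerId(Long customerId) { this.customerId = customerId; }
    public Integer getNewStatus() { return newStatus; }
    public void setNewStatus(Integer newStatus) { this.newStatus = newStatus; }
    public Long getEventTime() { return eventTime; }
    public void setEventTime(Long eventTime) { this.eventTime = eventTime; }
}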

 

4. Consumer output (the "4>" prefix is the parallel subtask index added by print())

4> CustomerStatusLog{customerId=1001, status=2}
4> CustomerStatusLog{customerId=1001, status=1}
4> CustomerStatusLog{customerId=1001, status=2}
4> CustomerStatusLog{customerId=1001, status=2}
