flinksql读写redis( 二 )

2.2、使用GET作为维表查询命令@Testpublic void testGetSQL() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings environmentSettings =EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);String source ="CREATE TABLE students\n" +"(\n" +"numberBIGINT ,\n" +"namestring,\n" +"schoolstring, \n" +"class_idBIGINT, \n" +"proctime as PROCTIME() \n" +") \n" +"WITH (\n" +"'connector'='redis',\n" +"'host'='10.201.0.33', \n" +"'port'='6379',\n" +"'redis-mode'='single', \n" +"'password'='123456',\n" +"'database'='0',\n" +"'key'='students',\n" +"'format'='json',\n" +"'batch-fetch-rows'='1000',\n" +"'json.fail-on-missing-field' = 'false',\n" +"'json.ignore-parse-errors' = 'true',\n" +"'command'='BLPOP'\n" +" )";/**这里需要注意的是,由于使用get命令,而且没有加format属性,所以维表只能有两个字段,多了也识别不到,详细可以看源码里的注释*/String daeamon ="CREATE TABLE classes\n" +"(\n" +"class_idBIGINT,\n" +"class_namestring" +") \n" +"WITH (\n" +"'connector'='redis',\n" +"'host'='10.201.0.33', \n" +"'port'='6379',\n" +"'redis-mode'='single', \n" +"'password'='123456',\n" +"'lookup.cache.max-rows'='1000',\n" +"'lookup.cache.ttl'='3600',\n" +"'lookup.cache.load-all'='true',\n" +"'database'='0',\n" +"'command'='GET'\n" +" )";String sink ="CREATE TABLE sink_students\n" +"(\n" +"numberBIGINT ,\n" +"namestring,\n" +"schoolstring, \n" +"class_idBIGINT, \n" +"class_namestring \n" +") \n" +"WITH (\n" +"'connector'='print'\n" +" )";tEnv.executeSql(source);tEnv.executeSql(daeamon);tEnv.executeSql(sink);/**这里join的字段必须是GET命令的key*/String sql =" insert into sink_students "+ " select s.number,s.name,s.school,s.class_id,d.class_namefrom students s"+ "left join classes for system_time as of s.proctime as don d.class_id = s.class_id";TableResult tableResult = tEnv.executeSql(sql);tableResult.getJobClient().get().getJobExecutionResult().get();}2.3、使用HGET作为维表查询命令@Testpublic void testHGetSQL() throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings environmentSettings =EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);String source ="CREATE TABLE students\n" +"(\n" +"numberBIGINT ,\n" +"namestring,\n" +"schoolstring, \n" +"class_idBIGINT, \n" +"proctime as PROCTIME() \n" +") \n" +"WITH (\n" +"'connector'='redis',\n" +"'host'='10.201.0.33', \n" +"'port'='6379',\n" +"'redis-mode'='single', \n" +"'password'='123456',\n" +"'database'='0',\n" +"'key'='students',\n" +"'format'='json',\n" +"'batch-fetch-rows'='1000',\n" +"'json.fail-on-missing-field' = 'false',\n" +"'json.ignore-parse-errors' = 'true',\n" +"'command'='BLPOP'\n" +" )";/**这里需要注意的是,由于使用hget命令,而且没有加format属性,所以维表只能有三个字段,多了也识别不到,详细可以看源码里的注释*/String daeamon ="CREATE TABLE classes\n" +"(\n" +"schoolstring, \n" +"class_idBIGINT,\n" +"class_namestring" +") \n" +"WITH (\n" +"'connector'='redis',\n" +"'host'='10.201.0.33', \n" +"'port'='6379',\n" +"'redis-mode'='single', \n" +"'password'='123456',\n" +"'lookup.cache.max-rows'='1000',\n" +"'lookup.cache.ttl'='3600',\n" +"'lookup.cache.load-all'='true',\n" +"'database'='0',\n" +"'command'='HGET'\n" +" )";String sink ="CREATE TABLE sink_students\n" +"(\n" +"numberBIGINT ,\n" +"namestring,\n" +"schoolstring, \n" +"class_idBIGINT, \n" +"class_namestring \n" +") \n" +"WITH (\n" +"'connector'='print'\n" +" )";tEnv.executeSql(source);tEnv.executeSql(daeamon);tEnv.executeSql(sink);/**这里需要注意的是,由于使用hget命令,这里join的参数两个参数顺序没有关系,真正执行hget命令哪个字段作为key,哪个字段作为field只与维表定义的时候的字段顺序有关系*/String sql =" insert into sink_students "+ " select s.number,s.name,s.school,s.class_id,d.class_namefrom students s"+ "left join classes for system_time as of s.proctime as don d.class_id = s.class_id and d.school = s.school";TableResult tableResult = tEnv.executeSql(sql);tableResult.getJobClient().get().getJobExecutionResult().get();}

经验总结扩展阅读