flinksql读写redis( 五 )

4.3、使用HSET命令作为sink表写入命令(不指定key)@Testpublic void testHSet() throws Exception {long start = System.currentTimeMillis();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings environmentSettings =EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);String source ="CREATE TABLE students\n" +"(\n" +"numberBIGINT ,\n" +"namestring,\n" +"schoolstring, \n" +"class_idBIGINT, \n" +"proctime as PROCTIME() \n" +") \n" +"WITH (\n" +"'connector'='redis',\n" +"'host'='10.201.0.33', \n" +"'port'='6379',\n" +"'redis-mode'='single', \n" +"'password'='123456',\n" +"'database'='0',\n" +"'key'='students',\n" +"'format'='json',\n" +"'batch-fetch-rows'='1000',\n" +"'json.fail-on-missing-field' = 'false',\n" +"'json.ignore-parse-errors' = 'true',\n" +"'command'='BLPOP'\n" +" )";String daeamon ="CREATE TABLE classes\n" +"(\n" +"schoolstring, \n" +"class_idBIGINT,\n" +"class_namestring" +") \n" +"WITH (\n" +"'connector'='redis',\n" +"'host'='10.201.0.33', \n" +"'port'='6379',\n" +"'redis-mode'='single', \n" +"'password'='123456',\n" +"'lookup.cache.max-rows'='1000',\n" +"'lookup.cache.ttl'='3600',\n" +"'lookup.cache.load-all'='true',\n" +"'database'='0',\n" +"'command'='HGET'\n" +" )";/***1、这里因为command是HSET,所以需要一个key和一个field,这里是按照表申明的顺序,第一个作为key,*第二个作为field,由于需要更新,也需要一个主键,这里最好把前两个字段一起作为主键*2、作为sink有一个sink.key.ttl参数可以设置key保存在redis的ttl生存时间,单位秒,默认为-1表示长期保存*/String sink ="CREATE TABLE sink_students\n" +"(\n" +"schoolstring, \n" +"numberBIGINT ,\n" +"namestring,\n" +"class_idBIGINT, \n" +"class_namestring, \n" +"primary key(school,number) not enforced" +") \n" +"WITH (\n" +"'connector'='redis',\n" +"'host'='10.201.0.33', \n" +"'port'='6379',\n" +"'redis-mode'='single', \n" +"'password'='123456',\n" +"'database'='0',\n" +"'format'='json',\n" +"'batch-fetch-rows'='1000',\n" +"'json.fail-on-missing-field' = 'false',\n" +"'json.ignore-parse-errors' = 'true',\n" +"'sink.parallelism' = '16',\n" +"'sink.key.ttl' = '300',\n" +"'command'='HSET'\n" +" )";tEnv.executeSql(source);tEnv.executeSql(daeamon);tEnv.executeSql(sink);String sql =" insert into sink_students "+ " select s.school,s.number,s.name,s.class_id,d.class_namefrom students s"+ " left join classes for system_time as of s.proctime as d on d.class_id = s.class_id and d.school = s.school";TableResult tableResult = tEnv.executeSql(sql);tableResult.getJobClient().get().getJobExecutionResult().get();long end = System.currentTimeMillis();System.out.println("耗时:" + (end - start) + "ms");}4.4、使用HSET命令作为sink表写入命令(指定key)【flinksql读写redis】@Testpublic void testHSetWithKey() throws Exception {long start = System.currentTimeMillis();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();EnvironmentSettings environmentSettings =EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);String source ="CREATE TABLE students\n" +"(\n" +"numberBIGINT ,\n" +"namestring,\n" +"schoolstring, \n" +"class_idBIGINT, \n" +"proctime as PROCTIME() \n" +") \n" +"WITH (\n" +"'connector'='redis',\n" +"'host'='10.201.0.33', \n" +"'port'='6379',\n" +"'redis-mode'='single', \n" +"'password'='123456',\n" +"'database'='0',\n" +"'key'='students',\n" +"'format'='json',\n" +"'batch-fetch-rows'='1000',\n" +"'json.fail-on-missing-field' = 'false',\n" +"'json.ignore-parse-errors' = 'true',\n" +"'command'='BLPOP'\n" +" )";String daeamon ="CREATE TABLE classes\n" +"(\n" +"schoolstring, \n" +"class_idBIGINT,\n" +"class_namestring" +") \n" +"WITH (\n" +"'connector'='redis',\n" +"'host'='10.201.0.33', \n" +"'port'='6379',\n" +"'redis-mode'='single', \n" +"'password'='123456',\n" +"'lookup.cache.max-rows'='1000',\n" +"'lookup.cache.ttl'='3600',\n" +"'lookup.cache.load-all'='true',\n" +"'database'='0',\n" +"'command'='HGET'\n" +" )";/***1、这里因为command是HSET,所以需要一个key和一个field,这里配置项指定了key,那么主键拼接就作为field,*使用hset保存到redis*2、作为sink有一个sink.key.ttl参数可以设置key保存在redis的ttl生存时间,单位秒,默认-1表示长期保存*/String sink ="CREATE TABLE sink_students\n" +"(\n" +"schoolstring, \n" +"numberBIGINT ,\n" +"namestring,\n" +"class_idBIGINT, \n" +"class_namestring, \n" +"primary key(number) not enforced" +") \n" +"WITH (\n" +"'connector'='redis',\n" +"'host'='10.201.0.33', \n" +"'port'='6379',\n" +"'redis-mode'='single', \n" +"'password'='123456',\n" +"'database'='0',\n" +"'format'='json',\n" +"'key'='sink_students_hset',\n" +"'batch-fetch-rows'='1000',\n" +"'json.fail-on-missing-field' = 'false',\n" +"'json.ignore-parse-errors' = 'true',\n" +"'sink.parallelism' = '16',\n" +"'sink.key.ttl' = '300',\n" +"'command'='HSET'\n" +" )";tEnv.executeSql(source);tEnv.executeSql(daeamon);tEnv.executeSql(sink);String sql =" insert into sink_students "+ " select s.school,s.number,s.name,s.class_id,d.class_namefrom students s"+ " left join classes for system_time as of s.proctime as d on d.class_id = s.class_id and d.school = s.school";TableResult tableResult = tEnv.executeSql(sql);tableResult.getJobClient().get().getJobExecutionResult().get();long end = System.currentTimeMillis();System.out.println("耗时:" + (end - start) + "ms");}

经验总结扩展阅读