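0. Preface

I recently needed to read from and write to Redis with Flink SQL. Flink does not ship an official Redis connector, and after searching for quite a while I found that none of the open-source connectors met my requirements, so I wrote my own. It has been adapted to every Flink version from 1.12 to 1.15.

A quick overview of the features: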
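- As a streaming source table: supports the BLPOP, BRPOP, LPOP, RPOP and SPOP commands; batch popping is wrapped in a Lua script to improve consumption throughput.
- As a dimension (lookup) table: supports the GET and HGET commands, with lookup caching.
- As a sink table: supports the LPUSH, RPUSH, SADD, SET and HSET commands, and a TTL can be specified for the written keys.
- Supports Flink's common serialization formats such as JSON and CSV; see the Flink documentation for details: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/formats/overview/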
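The Lua script used for batch popping is not shown here. As a rough sketch, assuming it combines `LRANGE` and `LTRIM` (a common pattern; the connector's real script may differ), it could look like the `BATCH_LPOP_LUA` string below. Redis runs a Lua script atomically, so no other client can push or pop between the two calls, and one round trip fetches up to `batch-fetch-rows` records. The Java method only models the same semantics on an in-memory list so the behavior can be checked without a Redis server; `BatchPopSketch` and `batchLpop` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BatchPopSketch {

    // Hypothetical script: read up to ARGV[1] items from the head of KEYS[1], then trim them off.
    // Assumes ARGV[1] >= 1 (with 0, "LRANGE key 0 -1" would return the whole list).
    static final String BATCH_LPOP_LUA =
            "local items = redis.call('LRANGE', KEYS[1], 0, tonumber(ARGV[1]) - 1)\n"
          + "redis.call('LTRIM', KEYS[1], #items, -1)\n"
          + "return items";

    // In-memory model of the script: LRANGE 0..n-1, then LTRIM n..-1 (drop what was read).
    static List<String> batchLpop(List<String> list, int n) {
        if (n < 1) {
            throw new IllegalArgumentException("batch size must be >= 1");
        }
        int count = Math.min(n, list.size());
        List<String> items = new ArrayList<>(list.subList(0, count)); // LRANGE: copy the head
        list.subList(0, count).clear();                               // LTRIM: remove the head
        return items;
    }

    public static void main(String[] args) {
        List<String> students = new ArrayList<>(Arrays.asList("s0", "s1", "s2", "s3", "s4"));
        System.out.println(batchLpop(students, 3)); // [s0, s1, s2]
        System.out.println(batchLpop(students, 3)); // [s3, s4]
        System.out.println(batchLpop(students, 3)); // []
    }
}
```

Compared with issuing one `LPOP` per record, this trades a little latency for far fewer network round trips, which is where the throughput gain of batch popping comes from.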
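How the sink applies a key TTL is not shown here. One plausible approach, sketched as an assumption: `SET` can carry the expiry inline (`SET key value EX seconds`), while the collection commands (`LPUSH`, `RPUSH`, `SADD`, `HSET`) have no expiry argument and need a follow-up `EXPIRE key seconds`. The class below only builds the command argument lists and does not talk to Redis; `SinkCommandSketch`, `buildCommands` and the "`ttlSeconds <= 0` means no TTL" convention are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SinkCommandSketch {

    // Builds the Redis commands a sink might issue for one row (hypothetical helper).
    // field is only used by HSET; ttlSeconds <= 0 means "no TTL".
    static List<List<String>> buildCommands(String command, String key, String field,
                                            String value, long ttlSeconds) {
        List<List<String>> commands = new ArrayList<>();
        switch (command) {
            case "SET":
                if (ttlSeconds > 0) {
                    // SET accepts an inline expiry, so a single command is enough.
                    commands.add(Arrays.asList("SET", key, value, "EX", String.valueOf(ttlSeconds)));
                } else {
                    commands.add(Arrays.asList("SET", key, value));
                }
                return commands;
            case "HSET":
                commands.add(Arrays.asList("HSET", key, field, value));
                break;
            case "LPUSH":
            case "RPUSH":
            case "SADD":
                commands.add(Arrays.asList(command, key, value));
                break;
            default:
                throw new IllegalArgumentException("Unsupported command: " + command);
        }
        if (ttlSeconds > 0) {
            // Collection commands cannot set an expiry, so expire the whole key afterwards.
            commands.add(Arrays.asList("EXPIRE", key, String.valueOf(ttlSeconds)));
        }
        return commands;
    }

    public static void main(String[] args) {
        System.out.println(buildCommands("SET", "1", null, "v", 60));
        // [[SET, 1, v, EX, 60]]
        System.out.println(buildCommands("RPUSH", "students", null, "{...}", 3600));
        // [[RPUSH, students, {...}], [EXPIRE, students, 3600]]
    }
}
```

Note that with `EXPIRE` the TTL applies to the whole list, set or hash and is refreshed on every write; a real client would typically send the two commands in one pipeline or `MULTI`/`EXEC` block.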
```java
import java.util.ArrayList;
import java.util.List;

import org.junit.Before;
// RedisOptions, RedisOperator and RedisOperators come from the connector itself (their package is not shown here).

@Before
public void init() {
    /* Enable test mode: once the source table's data has been fully consumed, the job stops,
       which makes testing easier. This mode defaults to false. */
    RedisOptions.IS_TEST = true;
    RedisOperator redisOperator = RedisOperators.getSimple("10.201.0.33", 6379, "123456", 0);
    List<String> lists = new ArrayList<>();
    for (int i = 0; i < 1000; i++) {
        lists.add("{\n" +
                "\"number\": " + i + ",\n" +
                "\"name\": \"学生" + i + "\",\n" +
                "\"school\": \"学校" + ((i % 3) + 1) + "\",\n" +
                "\"class_id\": " + ((i % 10) + 1) + "\n" +
                "}");
    }
    /* Initialize student data (学生 = student, 学校 = school) */
    for (int i = 0; i < 1; i++) {
        redisOperator.rpush("students", lists.subList(1000 * i, 1000 * (i + 1)));
    }
    /* Initialize class data: key = class id, value = class name (银河N班 = "Galaxy class N") */
    for (int i = 0; i < 10; i++) {
        redisOperator.set(String.valueOf(i + 1), "银河" + (i + 1) + "班");
    }
    /* Initialize school/class data: one hash per school, field = class id, value = class name */
    for (int j = 1; j < 4; j++) {
        for (int i = 1; i < 11; i++) {
            redisOperator.hset("学校" + j, String.valueOf(i), "银河" + i + "班");
        }
    }
}
```

1.2. Consuming a list or set key with BLPOP, BRPOP, LPOP, RPOP or SPOP

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.junit.Test;

@Test
public void testBlpopSQL() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // The Blink planner is the default since Flink 1.11, so useBlinkPlanner() (deprecated later) is not needed.
    EnvironmentSettings environmentSettings =
            EnvironmentSettings.newInstance().inStreamingMode().build();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);

    // Source table: pops JSON records from the "students" list with BLPOP, up to 1000 per fetch.
    String source =
            "CREATE TABLE students\n" +
            "(\n" +
            "  number   BIGINT,\n" +
            "  name     string,\n" +
            "  school   string,\n" +
            "  class_id BIGINT\n" +
            ")\n" +
            "WITH (\n" +
            "  'connector'='redis',\n" +
            "  'host'='10.201.0.33',\n" +
            "  'port'='6379',\n" +
            "  'redis-mode'='single',\n" +
            "  'password'='123456',\n" +
            "  'database'='0',\n" +
            "  'key'='students',\n" +
            "  'format'='json',\n" +
            "  'batch-fetch-rows'='1000',\n" +
            "  'json.fail-on-missing-field' = 'false',\n" +
            "  'json.ignore-parse-errors' = 'true',\n" +
            "  'command'='BLPOP'\n" +
            ")";
    // Sink table: prints every row to stdout.
    String sink =
            "CREATE TABLE sink_students\n" +
            "(\n" +
            "  number   BIGINT,\n" +
            "  name     string,\n" +
            "  school   string,\n" +
            "  class_id BIGINT\n" +
            ")\n" +
            "WITH (\n" +
            "  'connector'='print'\n" +
            ")";
    tEnv.executeSql(source);
    tEnv.executeSql(sink);
    String sql = "insert into sink_students select * from students";
    TableResult tableResult = tEnv.executeSql(sql);
    // Block until the job finishes (in test mode it stops once the list has been drained).
    tableResult.getJobClient().get().getJobExecutionResult().get();
}
```

2. Redis as a dimension table (without format)

2.1. Data preparation

The data is prepared by the same `@Before init()` method shown above: it pushes 1000 student records to the `students` list, writes the class names with `SET`, and writes one hash per school with `HSET`.
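The feature list mentions a lookup cache for the dimension table, but its implementation is not shown here. A common design, sketched below as an assumption, is a bounded LRU map whose entries expire after a fixed TTL, so repeated `GET`/`HGET` lookups for hot keys (such as the ten class ids) skip the round trip to Redis. `LookupCacheSketch`, `maxRows`, `ttlMillis` and the `fakeRedisGet` loader are hypothetical names; the connector's actual cache options may differ.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

public class LookupCacheSketch<K, V> {

    // A cached value together with the time (ms) at which it stops being valid.
    private static final class Timed<T> {
        final T value;
        final long expireAt;

        Timed(T value, long expireAt) {
            this.value = value;
            this.expireAt = expireAt;
        }
    }

    private final long ttlMillis;
    private final LinkedHashMap<K, Timed<V>> cache;

    LookupCacheSketch(int maxRows, long ttlMillis) {
        this.ttlMillis = ttlMillis;
        // accessOrder = true keeps entries in least-recently-used order.
        this.cache = new LinkedHashMap<K, Timed<V>>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, Timed<V>> eldest) {
                return size() > maxRows; // evict the least recently used key
            }
        };
    }

    // Returns the cached value, or calls the loader (e.g. a Redis GET/HGET) on a miss or expiry.
    V get(K key, Function<K, V> loader, long nowMillis) {
        Timed<V> entry = cache.get(key);
        if (entry != null && nowMillis < entry.expireAt) {
            return entry.value;
        }
        V value = loader.apply(key);
        cache.put(key, new Timed<>(value, nowMillis + ttlMillis));
        return value;
    }

    public static void main(String[] args) {
        int[] redisCalls = {0};
        // Stand-in for a Redis GET; counts how often the "remote" lookup happens.
        Function<String, String> fakeRedisGet = k -> {
            redisCalls[0]++;
            return "class " + k;
        };
        LookupCacheSketch<String, String> cache = new LookupCacheSketch<>(100, 1000);
        cache.get("1", fakeRedisGet, 0);    // miss: loaded from "Redis"
        cache.get("1", fakeRedisGet, 500);  // hit: served from the cache
        cache.get("1", fakeRedisGet, 1500); // expired: loaded again
        System.out.println(redisCalls[0]);  // 2
    }
}
```

The TTL bounds how stale a cached dimension value can become, while `maxRows` bounds memory per task; both are trade-offs between lookup latency and freshness.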