TL;DR场景基于 Flink 流处理将 MySQL 原始地区表数据抽取、转换、加载到 HBase 构建 DIM 层维度数据结论通过 HBaseReader 读取源数据SQL 关联维度表生成宽表HBaseWriterSink 回填目标表实现离线实时融合的维度数据处理产出完整 Scala 源码HBaseReader、HBaseWriterSink、AreaDetailInfo可直接用于生产环境版本矩阵功能状态说明HBaseReader 数据读取✅ 已验证从 wzk_area 表读取原始数据支持 Scan 全表扫描SQL 维度关联✅ 已验证三表 INNER JOIN 构建省市区层级维度HBaseWriterSink 回填✅ 已验证批量 Put 写入 dim_wzk_area 宽表Flink Checkpoint✅ 已验证EXACTLY_ONCE 模式保证数据一致性Redis ADS 层输出✅ 已验证Jedis HSET 写入 Redis HashDIM 层处理实现思路最原始的表 MySQL 中 area 到 HBase 中转换 area 表 到 地区ID、地区的名字、城市ID、城市的名字、省份 ID、省份的名字 到 HBase 中在分析交易过程时可以通过卖家、买家、商品和时间等维度描述交易发生的环境所以维度的作用一般是查询约束、分类汇总以及排序等。HBaseReaderclassHBaseReaderextendsRichSourceFunction[(String,String)]{privatevarconn:Connectionnullprivatevartable:Tablenullprivatevarscan:Scannulloverridedefopen(parameters:configuration.Configuration):Unit{valconf:ConfigurationHBaseConfiguration.create()conf.set(HConstants.ZOOKEEPER_QUORUM,h121.wzk.icu,h122.wzk.icu,h123.wzk.icu)conf.set(HConstants.ZOOKEEPER_CLIENT_PORT,2181)conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,30000)conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,30000)valtableName:TableNameTableName.valueOf(wzk_area)valcf1:Stringf1connConnectionFactory.createConnection(conf)tableconn.getTable(tableName)scannewScan()scan.addFamily(Bytes.toBytes(cf1))}overridedefrun(ctx:SourceFunction.SourceContext[(String,String)]):Unit{valrs:ResultScannertable.getScanner(scan)valiterator:util.Iterator[Result]rs.iterator()while(iterator.hasNext){valresult:Resultiterator.next()valrowKey:StringBytes.toString(result.getRow)valbuffer:StringBuffernewStringBuffer()for(cell:Cell-result.listCells().asScala){valvalue:StringBytes.toString(cell.getValueArray,cell.getValueOffset,cell.getValueLength)buffer.append(value).append(-)}valvalueString:Stringbuffer.replace(buffer.length()-1,buffer.length(),).toString ctx.collect((rowKey,valueString))}}overridedefcancel():Unit{}overridedefclose():Unit{try{if(table!null){table.close()}if(conn!null){conn.close()}}catch{casee:Exceptionprintln(e.getMessage)}}}HBaseWriterSinkclassHBaseWriterSinkextendsRichSinkFunction[String]{varconnection:Connection_varhbTable:Table_overridedefopen(parameters:Configuration):Unit{connectionnewConnHBase().connToHbase hbTableconnection.getTable(TableName.valueOf(dim_wzk_area))}overridedefclose():Unit{if(hbTable!null){hbTable.close()}if(connection!null){connection.close()}}overridedefinvoke(value:String,context:SinkFunction.Context[_]):Unit{insertDimArea(hbTable,value)}definsertDimArea(hbTable:Table,value:String):Unit{valinfos:Array[String]value.split(,)valareaId:Stringinfos(0).trimvalaname:Stringinfos(1).trimvalcid:Stringinfos(2).trimvalcity:Stringinfos(3).trimvalproid:Stringinfos(4).trimvalprovince:Stringinfos(5).trimvalputnewPut(areaId.getBytes())put.addColumn(f1.getBytes(),aname.getBytes(),aname.getBytes())put.addColumn(f1.getBytes(),cid.getBytes(),cid.getBytes())put.addColumn(f1.getBytes(),city.getBytes(),city.getBytes())put.addColumn(f1.getBytes(),proId.getBytes(),proid.getBytes())put.addColumn(f1.getBytes(),province.getBytes(),province.getBytes())hbTable.put(put)}}AreaDetailcaseclassAreaDetail(id:Int,name:String,pid:Int)AreaDetailInfoobjectAreaDetailInfo{defmain(args:Array[String]):Unit{valenv:StreamExecutionEnvironmentStreamExecutionEnvironment.getExecutionEnvironment env.enableCheckpointing(5000)env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)valdata:DataStream[(String,String)]env.addSource(newHBaseReader)valdataStream:DataStream[AreaDetail]data.map(x{valid:Intx._1.toIntvaldatas:Array[String]x._2.split(-)valname:Stringdatas(5).trimvalpid:Intdatas(6).trim.toInt AreaDetail(id,name,pid)})valtableEnv:StreamTableEnvironmentStreamTableEnvironment.create(env)tableEnv.createTemporaryView(wzk_area,dataStream)valsql:String |select a.id as areaid,a.name as aname,a.pid as cid,b.name as city, c.id as proid,c.name as province |from wzk_area as a |inner join wzk_area as b on a.pid b.id |inner join wzk_area as c on b.pid c.id |.stripMarginvalareaTable:TabletableEnv.sqlQuery(sql)println(--- sqlQuery ---)valresultStream:DataStream[String]tableEnv.toRetractStream[Row](areaTable).map(x{valrow:Rowx._2valareaId:Stringrow.getField(0).toStringvalaname:Stringrow.getField(1).toStringvalcid:Stringrow.getField(2).toStringvalcity:Stringrow.getField(3).toStringvalproid:Stringrow.getField(4).toStringvalprovince:Stringrow.getField(5).toString areaId,aname,cid,city,proid,province})resultStream.addSink(newHBaseWriterSink)env.execute()}}DimAreacaseclassDimArea(areaId:Int,aname:String,cid:Int,city:String,proId:Int,province:String)运行测试执行 AreaDetailInfo.scala运行效果如下所示DW 层处理DWData WareHouse 数据仓库层包含 DWD、DWS、DIM 层数据加工而成主要完成数据架构与整合建立一致性的维度构建可复用的面向分析和统计的明细事实表以及汇总公共粒度的指标。DWDData Warehouse Detail 细节数据层是业务层与数据仓库的隔离层以业务过程作为建模驱动基于每个具体的业务过程特点构建细粒度的明细层事实表。可以结合企业的数据使用特点将明细事实表的某些重要维度属性字段做适当冗余也即宽表化处理。DWSData WareHouse Service 服务数据层基于 DWD 的基础数据整合汇总成分析某一个主题域的服务数据以分析的主题为建模驱动基于上层的应用和产品的指标需求构建公共粒度的汇总指标事实表公共维度层DIM基于维度建模理念思想建立一致性维度TMP 层临时层存放计算过程中临时产生的数据需要注意的是数据仓库层次划分不是固定不变的可以根据实际需求进行适当裁剪或者添加如果业务相对简单和独立可以将 DWD、DWS 进行合并。ADS层处理ADSApplication Data Store 应用层数据。基于 DW 数据整合汇总成主题域的服务数据用于提供后续的业务查询等。数据明细层从数据明细层分析结果到 ClickHouse、Redis、Druid 等写入到 Redis 中。classMySinkToRedisextendsRichSinkFunction[(CityOrder,Long)]{privatevarjedis:Jedis_;overridedefopen(parameters:Configuration):Unit{jedisnewJedis(h121.wzk.icu,6379,6000);jedis.select(0);// 选择 Redis 的第 0 个数据库}overridedefinvoke(value:(CityOrder,Long),context:SinkFunction.Context[_]):Unit{if(!jedis.isConnected){jedis.connect();}valmapnewHashMap[String,String]();map.put(totalMoney,value._1.totalMoney.toString);map.put(totalCount,value._1.totalCount.toString);map.put(time,value._2.toString);// 打印信息用于调试println(s${value._1.province}${value._1.city}数据条目:${map.size()}, 金额:${map.get(totalMoney)}, 数量:${map.get(totalCount)}, 时间:${map.get(time)});try{jedis.hset(s${value._1.province}${value._1.city},map);map.clear();}catch{casee:Exceptione.printStackTrace();}}overridedefclose():Unit{if(jedis!null){jedis.close();}}}错误速查卡症状根因定位修复HBaseConnection 连接超时Zookeeper 集群不可达或端口 2181 未开放检查 h121/h122/h123.wzk.icu 网络连通性确认 Zookeeper 服务运行中防火墙放行 2181SQL 关联结果为空wzk_area 表数据未按 pid 层级关联打印 dataStream 验证原始数据确认地区数据包含完整的三级省市区层级ResultScanner 超时HBase client scanner timeout 配置过小查看 HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD调大超时时间或减小每次 scan 的 batch sizeRedis HSET 写入失败Jedis 未正确 select database 或连接断开检查 jedis.isConnected 状态重连前先 select(0) 确认数据库