您的位置:首頁>設計>正文

[編織訊息方塊架][設計協定]分層模型設計

上篇有介紹過, 物件設計按職責劃分設計

為什麼不直全部實現不走QRpc?

開始設計時沒考慮到rpc功能, 後面才補上, 加上rpc有二級業務模型, 要進行二次解釋, 有性能損耗, 所以乾脆不改了

QPacket是個吸血模型(相對貧血模型來講 如 java bean物件只有getter setter 方法), 大多數邏輯實現寫在該物件上, 如轉換成應用層模型, 包編解碼

1 /*** 2 * 包 格式 [sn] + [c] + [b] +[sid] 3 * 4 * c [0000 1111] 16個協議 5 * 6 * @author solq 7 */ 8 public class QPacket implements IRecycle, IByte { 9 10 /** 11 * 包固定長度 12 */ 13 public final static int PACK_FIXED_LENG = Long.BYTES + Short.BYTES + Long.BYTES; 14 /** 15 * 回應遮罩 [0001 0000] 16 */ 17 public final static short MASK_RESPONSE = 0x10; 18 19 /** 20 * 壓縮遮罩[0010 0000] 21 */ 22 public final static short MASK_COMPRESS = 0x20; 23 24 public final static int MASK_OPCODE = MASK_RESPONSE | MASK_COMPRESS; 25 26 /** 序號 用於包ID, 解決冥等 **/ 27 private long sn; 28 /** sessionId **/ 29 private long sid; 30 /** opCode **/ 31 private short c; 32 /** 內容 **/ 33 private byte b; 34 /** 臨時數據 **/ 35 private Object tmpData; 36 37 @Override 38 public void recycle { 39 b = null; 40 tmpData = null; 41 } 42 43 @Override 44 public int toSize { 45 return QPacket.PACK_FIXED_LENG + b.length; 46 } 47 48 public int writeBytes(ByteBuf byteBuf) { 49 byteBuf.writeLong(sn); 50 byteBuf.writeShort(c); 51 byteBuf.writeBytes(b); 52 byteBuf.writeLong(sid); 53 return toSize; 54 } 55 56 public void writeToByteBuf(ByteBuf byteBuf) { 57 final int packetLen = toSize; 58 byteBuf.writeShort(QMConfig.getInstance.getPacketHeadFlag(packetLen)); 59 byteBuf.writeInt(packetLen); 60 writeBytes(byteBuf); 61 byteBuf.writeByte(QMConfig.getInstance.getPacketEndFlag(packetLen)); 62 } 63 64 /////////////////// toObject/////////////////////// 65 @Override 66 public byte toBytes { 67 final int len = toSize; 68 byte ret = new byte[len]; 69 int offset = 0; 70 PacketUtil.writeLong(offset, sn, ret); 71 PacketUtil.writeShort(offset += Long.BYTES, c, ret); 72 PacketUtil.writeBytes(offset += Short.BYTES, b, ret); 73 PacketUtil.writeLong(offset += b.length, sid, ret); 74 return ret; 75 } 76 77 public QProduce toProduce { 78 QProduce ret = toObject; 79 if (ret == null) { 80 ret = QProduce.of(getBytes); 81 tmpData = ret; 82 } 83 return ret; 84 } 85 86 public QConsume toConsume { 87 QConsume ret = toObject; 88 if (ret == null) { 89 ret = QConsume.byte2Object(getBytes); 90 tmpData = ret; 91 } 92 return ret; 93 } 94 95 public Collection toSubscribe { 96 Collection ret = toObject; 97 if (ret == null) { 98 ret = SerialUtil.readValue(getBytes, SerialUtil.subType); 99 tmpData = ret; 100 } 101 return ret; 102 } 103 104 public QRpc toRpc { 105 QRpc ret = toObject; 106 if (ret == null) { 107 ret = QRpc.toObject(getBytes); 108 tmpData = ret; 109 } 110 return ret; 111 } 112 113 public short toCode { 114 Short ret = toObject; 115 if (ret == null) { 116 ret = PacketUtil.readShort(0, b); 117 tmpData = ret; 118 } 119 return ret; 120 } 121 122 @SuppressWarnings("unchecked") 123 T toObject { 124 return (T) tmpData; 125 } 126 127 byte getBytes { 128 byte bytes = null; 129 if (hasStatus(MASK_COMPRESS)) { 130 bytes = SerialUtil.unZip(b); 131 } else { 132 bytes = b; 133 } 134 return bytes; 135 } 136 137 public void responseCode(short code) { 138 c = QOpCode.QCODE; 139 b = new byte[2]; 140 tmpData = code; 141 PacketUtil.writeShort(0, code, b); 142 } 143 144 public void setStatus(short value) { 145 c |= value; 146 } 147 148 public boolean hasStatus(short value) { 149 return (c & value) == value; 150 } 151 152 // static 153 @SuppressWarnings("unchecked") 154 public static QPacket of(Object data) { 155 if (data instanceof QProduce) { 156 return of((QProduce) data); 157 } 158 if (data instanceof QRpc) { 159 return of((QRpc) data); 160 } 161 if (data instanceof QConsume) { 162 return of((QConsume) data); 163 } 164 if (data instanceof byte[]) { 165 return of((byte[]) data); 166 } 167 if (data instanceof Integer) { 168 return of((Integer) data); 169 } 170 if (data instanceof QPacket) { 171 return (QPacket) data; 172 } 173 if (TypeUtils.isAssignable(data.getClass, SerialUtil.subType.getType)) { 174 return of((Collection) data); 175 } 176 throw new RuntimeException("未支援類型 :" + data.getClass); 177 } 178 179 public static QPacket of(ByteBuf byteBuf, int packetLen) { 180 long sn = byteBuf.readLong; 181 short c = byteBuf.readShort; 182 byte b = new byte[packetLen - QPacket.PACK_FIXED_LENG]; 183 byteBuf.readBytes(b); 184 long sid = byteBuf.readLong; 185 return of(c, sn, sid, null, b); 186 } 187 188 public static QPacket of(byte[] bytes) { 189 int offset = 0; 190 long sn = PacketUtil.readLong(offset, bytes); 191 short c = PacketUtil.readShort(offset += Long.BYTES, bytes); 192 byte b = PacketUtil.readBytes(offset += Short.BYTES, bytes.length - QPacket.PACK_FIXED_LENG, bytes); 193 long sid = PacketUtil.readLong(offset += b.length, bytes); 194 return of(c, sn, sid, null, b); 195 } 196 197 public static QPacket of(short code) { 198 byte b = new byte[2]; 199 PacketUtil.writeShort(0, code, b); 200 long sn = PacketUtil.getSn; 201 return of(QOpCode.QCODE, sn, -1, null, b); 202 } 203 204 public static QPacket of(QRpc obj) { 205 byte b = obj.toBytes; 206 long sn = PacketUtil.getSn; 207 return of(QOpCode.QRPC, sn, -1, null, b); 208 } 209 210 public static QPacket of(QProduce obj) { 211 byte b = obj.toBytes; 212 long sn = PacketUtil.getSn; 213 return of(QOpCode.QPRODUCE, sn, -1, null, b); 214 } 215 216 public static QPacket of(QConsume obj) { 217 byte b = obj.toBytes; 218 long sn = PacketUtil.getSn; 219 return of(QOpCode.QCONSUME, sn, -1, null, b); 220 } 221 222 public static QPacket of(Collection obj) { 223 byte b = SerialUtil.writeValueAsBytes(obj); 224 long sn = PacketUtil.getSn; 225 return of(QOpCode.QSUBSCIBE, sn, -1, null, b); 226 } 227 228 public static QPacket of(short c, long sn, long sid, Object value, byte[] body) { 229 QPacket ret = new QPacket; 230 ret.c = c; 231 ret.sn = sn; 232 ret.sid = sid; 233 // 未壓縮, 處理壓縮 234 if (!ret.hasStatus(MASK_COMPRESS) && body.length >= QMConfig.getInstance.COMPRESS_SIZE) { 235 body = SerialUtil.zip(body); 236 ret.setStatus(MASK_COMPRESS); 237 } 238 ret.b = body; 239 ret.tmpData = value; 240 return ret; 241 } 242 243 // getter 244 245 public short getC { 246 return (short) (c & ~QPacket.MASK_OPCODE); 247 } 248 249 250 public long getSid { 251 return sid; 252 } 253 254 public long getSn { 255 return sn; 256 } 257 258 public byte getB { 259 return b; 260 } 261 262 public void setSid(long sid) { 263 this.sid = sid; 264 } 265 266 }

QPacket

1 public class QConsume implements IRecycle { 2 /** 3 * 最後讀取指標記錄 4 */ 5 private long o; 6 /** 7 * 數據 8 */ 9 private Object b; 10 /** 11 * topic 12 */ 13 private String t; 14 /** 15 * raw數據 16 */ 17 private byte r; 18 }

QConsume

1 /*** 2 * 屬性名採取最少字母命名, 減少通信跟存儲 生產消息物件 3 * 臨時物件,
負責 業務與qpacket資料交互 4 * @author solq 5 */ 6 @QOpCode(QOpCode.QPRODUCE) 7 public class QProduce implements IRecycle { 8 /** 訂閱 **/ 9 private String t; 10 /** 內容資訊 **/ 11 private Object b; 12 /** 作用本地查詢 **/ 13 @JsonIgnore 14 private long offset; 15 }

QProduce

1 public class QSubscribe { 2 /** 3 * topic 4 */ 5 private String topic; 6 /** 7 * groudId 8 */ 9 private String groupId; 10 }

QSubscribe

1 /** 2 *

3 * 2byte model 1byte command + 4byte indexs len + indexs + params + 1byte 4 * compress 5 *

6 * 7 * @author solq 8 */ 9 public class QRpc implements IRecycle, IByte { 10 /**模組號**/ 11 private short model; 12 /**方法命令編號**/ 13 private byte command; 14 /**參數索引**/ 15 private byte indexs; 16 /**參數**/ 17 private byte params; 18 }

QRpc

public static QPacket of(short c, long sn, long sid, Object value, byte[] body) { QPacket ret = new QPacket; ret.c = c; ret.sn = sn; ret.sid = sid; // 未壓縮, 處理壓縮 當內容長度大於配置就啟動壓縮處理 if (!ret.hasStatus(MASK_COMPRESS) && body.length >= QMConfig.getInstance.COMPRESS_SIZE) { body = SerialUtil.zip(body); ret.setStatus(MASK_COMPRESS); } ret.b = body; ret.tmpData = value; return ret; }
Next Article
喜欢就按个赞吧!!!
点击关闭提示