/** * Packet sender implementation for the compressed MySQL protocol. For compressed transmission of multi-packets, split the packets up in the same way as the * uncompressed protocol. We fit up to MAX_PACKET_SIZE bytes of split uncompressed packet, including the header, into a compressed packet. The first packet * of the multi-packet is 4 bytes of header and MAX_PACKET_SIZE - 4 bytes of the payload. The next packet must send the remaining four bytes of the payload * followed by a new header and payload. If the second split packet is also around MAX_PACKET_SIZE in length, then only MAX_PACKET_SIZE - 4 (from the * previous packet) - 4 (for the new header) can be sent. This means the payload will be limited by 8 bytes and this will continue to increase by 4 at every * iteration. * * @param packet * data bytes * @param packetLen * packet length * @param packetSequence * sequence id * @throws IOException * if an I/O exception occurs */ public void send(byte[] packet, int packetLen, byte packetSequence) throws IOException { this.compressedSequenceId = packetSequence;
// short-circuit send small packets without compression and return if (packetLen < MIN_COMPRESS_LEN) { writeCompressedHeader(packetLen + NativeConstants.HEADER_LENGTH, this.compressedSequenceId, 0); writeUncompressedHeader(packetLen, packetSequence); this.outputStream.write(packet, 0, packetLen); this.outputStream.flush(); return; }
if (packetLen + NativeConstants.HEADER_LENGTH > NativeConstants.MAX_PACKET_SIZE) { this.compressedPacket = new byte[NativeConstants.MAX_PACKET_SIZE]; } else { this.compressedPacket = new byte[NativeConstants.HEADER_LENGTH + packetLen]; }
PacketSplitter packetSplitter = new PacketSplitter(packetLen);
int unsentPayloadLen = 0; int unsentOffset = 0; // loop over constructing and sending compressed packets while (true) { this.compressedPayloadLen = 0;
if (packetSplitter.nextPacket()) { // rest of previous packet if (unsentPayloadLen > 0) { addPayload(packet, unsentOffset, unsentPayloadLen); }
// current packet int remaining = NativeConstants.MAX_PACKET_SIZE - unsentPayloadLen; // if remaining is 0 then we are sending a packet so huge that the 4-byte header-size carryover from the last packet has accumulated to the size // of a whole packet itself. We don't handle this. Would require 4 million packet segments (64 gigs in one logical packet) int len = Math.min(remaining, NativeConstants.HEADER_LENGTH + packetSplitter.getPacketLen()); int lenNoHdr = len - NativeConstants.HEADER_LENGTH; addUncompressedHeader(packetSequence, packetSplitter.getPacketLen()); addPayload(packet, packetSplitter.getOffset(), lenNoHdr);
completeCompression(); // don't send payloads with incompressible data if (this.compressedPayloadLen >= len) { // combine the unsent and current packet in an uncompressed packet writeCompressedHeader(unsentPayloadLen + len, this.compressedSequenceId++, 0); this.outputStream.write(packet, unsentOffset, unsentPayloadLen); writeUncompressedHeader(lenNoHdr, packetSequence); this.outputStream.write(packet, packetSplitter.getOffset(), lenNoHdr); } else { sendCompressedPacket(len + unsentPayloadLen); }
packetSequence++; unsentPayloadLen = packetSplitter.getPacketLen() - lenNoHdr; unsentOffset = packetSplitter.getOffset() + lenNoHdr; resetPacket(); } else if (unsentPayloadLen > 0) { // no more packets, send remaining unsent data addPayload(packet, unsentOffset, unsentPayloadLen); completeCompression(); if (this.compressedPayloadLen >= unsentPayloadLen) { writeCompressedHeader(unsentPayloadLen, this.compressedSequenceId, 0); this.outputStream.write(packet, unsentOffset, unsentPayloadLen); } else { sendCompressedPacket(unsentPayloadLen); } resetPacket(); break; } else { // nothing left to send (only happens on boundaries) break; } }