Implement per column compression #3396

base: master
```diff
@@ -44,6 +44,7 @@
 import org.apache.parquet.column.values.bloomfilter.BloomFilter;
 import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
 import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter;
+import org.apache.parquet.compression.CompressionCodecFactory;
 import org.apache.parquet.compression.CompressionCodecFactory.BytesInputCompressor;
 import org.apache.parquet.crypto.AesCipher;
 import org.apache.parquet.crypto.InternalColumnEncryptionSetup;
```
```diff
@@ -672,6 +673,83 @@ public ColumnChunkPageWriteStore(
     }
   }

+  /**
+   * Construct a page write store with per-column compression support.
+   * Each column's compression codec is resolved from {@code props} via
+   * {@link ParquetProperties#getCompressionCodec(ColumnDescriptor)}.
+   *
+   * @param codecFactory factory to create compressors for each codec
+   * @param props properties containing per-column compression configuration
+   * @param schema the message schema
+   * @param allocator byte buffer allocator
+   * @param columnIndexTruncateLength truncate length for column indexes
+   * @param pageWriteChecksumEnabled whether to write page checksums
+   * @param fileEncryptor file encryptor (null if not encrypted)
+   * @param rowGroupOrdinal row group ordinal
+   */
+  public ColumnChunkPageWriteStore(
+      CompressionCodecFactory codecFactory,
+      ParquetProperties props,
+      MessageType schema,
+      ByteBufferAllocator allocator,
+      int columnIndexTruncateLength,
+      boolean pageWriteChecksumEnabled,
+      InternalFileEncryptor fileEncryptor,
+      int rowGroupOrdinal) {
+    this.schema = schema;
+    if (null == fileEncryptor) {
+      for (ColumnDescriptor path : schema.getColumns()) {
+        BytesInputCompressor compressor = codecFactory.getCompressor(props.getCompressionCodec(path));
+        writers.put(
+            path,
+            new ColumnChunkPageWriter(
```
> **Contributor:** Is this copied and pasted from the other constructors? I wonder if there is some refactoring that could be done to avoid duplication. (Should we have a `ColumnChunkPageWriterBuilder`?)
```diff
+                path,
+                compressor,
+                allocator,
+                columnIndexTruncateLength,
+                pageWriteChecksumEnabled,
+                null,
+                null,
+                null,
+                -1,
+                -1));
+      }
+      return;
+    }
+
+    // Encrypted file
+    int columnOrdinal = -1;
+    byte[] fileAAD = fileEncryptor.getFileAAD();
+    for (ColumnDescriptor path : schema.getColumns()) {
+      columnOrdinal++;
+      BlockCipher.Encryptor headerBlockEncryptor = null;
+      BlockCipher.Encryptor pageBlockEncryptor = null;
+      ColumnPath columnPath = ColumnPath.get(path.getPath());
+
+      BytesInputCompressor compressor = codecFactory.getCompressor(props.getCompressionCodec(path));
+
+      InternalColumnEncryptionSetup columnSetup = fileEncryptor.getColumnSetup(columnPath, true, columnOrdinal);
+      if (columnSetup.isEncrypted()) {
+        headerBlockEncryptor = columnSetup.getMetaDataEncryptor();
+        pageBlockEncryptor = columnSetup.getDataEncryptor();
+      }
+
+      writers.put(
+          path,
+          new ColumnChunkPageWriter(
+              path,
+              compressor,
+              allocator,
+              columnIndexTruncateLength,
+              pageWriteChecksumEnabled,
+              headerBlockEncryptor,
+              pageBlockEncryptor,
+              fileAAD,
+              rowGroupOrdinal,
+              columnOrdinal));
+    }
+  }
+
   @Override
   public PageWriter getPageWriter(ColumnDescriptor path) {
     return writers.get(path);
```
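On the `ColumnChunkPageWriterBuilder` idea raised above: a rough sketch of what such a builder might look like, assuming the ten-argument `ColumnChunkPageWriter` constructor shown in this diff. The class is hypothetical (nothing like it exists in this PR); the encryption-related arguments default to the unencrypted values, so the encrypted and unencrypted loops could share one construction path.

```java
// Hypothetical builder sketch. ColumnChunkPageWriter is package-private,
// so this would have to live in org.apache.parquet.hadoop.
final class ColumnChunkPageWriterBuilder {
  private final ColumnDescriptor path;
  private final CompressionCodecFactory.BytesInputCompressor compressor;
  private final ByteBufferAllocator allocator;
  private final int columnIndexTruncateLength;
  private final boolean pageWriteChecksumEnabled;
  // Encryption-related state defaults to "unencrypted".
  private BlockCipher.Encryptor headerBlockEncryptor = null;
  private BlockCipher.Encryptor pageBlockEncryptor = null;
  private byte[] fileAAD = null;
  private int rowGroupOrdinal = -1;
  private int columnOrdinal = -1;

  ColumnChunkPageWriterBuilder(
      ColumnDescriptor path,
      CompressionCodecFactory.BytesInputCompressor compressor,
      ByteBufferAllocator allocator,
      int columnIndexTruncateLength,
      boolean pageWriteChecksumEnabled) {
    this.path = path;
    this.compressor = compressor;
    this.allocator = allocator;
    this.columnIndexTruncateLength = columnIndexTruncateLength;
    this.pageWriteChecksumEnabled = pageWriteChecksumEnabled;
  }

  // Only the encrypted branch needs to call this.
  ColumnChunkPageWriterBuilder withEncryption(
      BlockCipher.Encryptor headerBlockEncryptor,
      BlockCipher.Encryptor pageBlockEncryptor,
      byte[] fileAAD,
      int rowGroupOrdinal,
      int columnOrdinal) {
    this.headerBlockEncryptor = headerBlockEncryptor;
    this.pageBlockEncryptor = pageBlockEncryptor;
    this.fileAAD = fileAAD;
    this.rowGroupOrdinal = rowGroupOrdinal;
    this.columnOrdinal = columnOrdinal;
    return this;
  }

  ColumnChunkPageWriter build() {
    return new ColumnChunkPageWriter(
        path, compressor, allocator, columnIndexTruncateLength, pageWriteChecksumEnabled,
        headerBlockEncryptor, pageBlockEncryptor, fileAAD, rowGroupOrdinal, columnOrdinal);
  }
}
```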
```diff
@@ -28,6 +28,7 @@
 import org.apache.parquet.column.ColumnWriteStore;
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore;
+import org.apache.parquet.compression.CompressionCodecFactory;
 import org.apache.parquet.compression.CompressionCodecFactory.BytesInputCompressor;
 import org.apache.parquet.crypto.InternalFileEncryptor;
 import org.apache.parquet.hadoop.api.WriteSupport;
```
```diff
@@ -52,6 +53,7 @@ class InternalParquetRecordWriter<T> {
   private final int rowGroupRecordCountThreshold;
   private long nextRowGroupSize;
   private final BytesInputCompressor compressor;
+  private final CompressionCodecFactory codecFactory;
   private final boolean validating;
   private final ParquetProperties props;
```
```diff
@@ -77,7 +79,9 @@ class InternalParquetRecordWriter<T> {
    * @param extraMetaData extra meta data to write in the footer of the file
    * @param rowGroupSize the size of a block in the file (this will be approximate)
    * @param compressor the codec used to compress
+   * @deprecated Use {@link #InternalParquetRecordWriter(ParquetFileWriter, WriteSupport, MessageType, Map, long, CompressionCodecFactory, boolean, ParquetProperties)} for per-column compression support
    */
+  @Deprecated
   public InternalParquetRecordWriter(
       ParquetFileWriter parquetFileWriter,
       WriteSupport<T> writeSupport,
```
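The migration the deprecation points at mirrors the `ParquetRecordWriter` change further down: pass the codec factory itself rather than a single compressor obtained from it, and let each column's codec be resolved from `props`.

```java
// Before (deprecated): one compressor, applied to every column
internalWriter = new InternalParquetRecordWriter<T>(
    w, writeSupport, schema, extraMetaData, blockSize,
    codecFactory.getCompressor(codec), validating, props);

// After: pass the factory; each column's codec is looked up via
// props.getCompressionCodec(ColumnDescriptor)
internalWriter = new InternalParquetRecordWriter<T>(
    w, writeSupport, schema, extraMetaData, blockSize,
    codecFactory, validating, props);
```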
```diff
@@ -95,6 +99,41 @@ public InternalParquetRecordWriter(
     this.rowGroupRecordCountThreshold = props.getRowGroupRowCountLimit();
     this.nextRowGroupSize = rowGroupSizeThreshold;
     this.compressor = compressor;
+    this.codecFactory = null;
     this.validating = validating;
     this.props = props;
     this.fileEncryptor = parquetFileWriter.getEncryptor();
     this.rowGroupOrdinal = 0;
     initStore();
     recordCountForNextMemCheck = props.getMinRowCountForPageSizeCheck();
   }
+
+  /**
+   * @param parquetFileWriter the file to write to
+   * @param writeSupport the class to convert incoming records
+   * @param schema the schema of the records
+   * @param extraMetaData extra meta data to write in the footer of the file
+   * @param rowGroupSize the size of a block in the file (this will be approximate)
+   * @param codecFactory the codec factory for per-column compression
+   */
+  public InternalParquetRecordWriter(
+      ParquetFileWriter parquetFileWriter,
+      WriteSupport<T> writeSupport,
+      MessageType schema,
+      Map<String, String> extraMetaData,
+      long rowGroupSize,
+      CompressionCodecFactory codecFactory,
+      boolean validating,
+      ParquetProperties props) {
+    this.parquetFileWriter = parquetFileWriter;
+    this.writeSupport = Objects.requireNonNull(writeSupport, "writeSupport cannot be null");
+    this.schema = schema;
+    this.extraMetaData = extraMetaData;
+    this.rowGroupSizeThreshold = rowGroupSize;
+    this.rowGroupRecordCountThreshold = props.getRowGroupRowCountLimit();
+    this.nextRowGroupSize = rowGroupSizeThreshold;
+    this.compressor = null;
+    this.codecFactory = codecFactory;
+    this.validating = validating;
+    this.props = props;
+    this.fileEncryptor = parquetFileWriter.getEncryptor();
```
```diff
@@ -108,14 +147,27 @@ public ParquetMetadata getFooter() {
   }

   private void initStore() {
-    ColumnChunkPageWriteStore columnChunkPageWriteStore = new ColumnChunkPageWriteStore(
-        compressor,
-        schema,
-        props.getAllocator(),
-        props.getColumnIndexTruncateLength(),
-        props.getPageWriteChecksumEnabled(),
-        fileEncryptor,
-        rowGroupOrdinal);
+    ColumnChunkPageWriteStore columnChunkPageWriteStore;
+    if (codecFactory != null) {
```
> **Contributor:** Is it possible to create a default `codecFactory` to avoid the if/else block below?
```diff
+      columnChunkPageWriteStore = new ColumnChunkPageWriteStore(
+          codecFactory,
+          props,
+          schema,
+          props.getAllocator(),
+          props.getColumnIndexTruncateLength(),
+          props.getPageWriteChecksumEnabled(),
+          fileEncryptor,
+          rowGroupOrdinal);
+    } else {
+      columnChunkPageWriteStore = new ColumnChunkPageWriteStore(
+          compressor,
+          schema,
+          props.getAllocator(),
+          props.getColumnIndexTruncateLength(),
+          props.getPageWriteChecksumEnabled(),
+          fileEncryptor,
+          rowGroupOrdinal);
+    }
     pageStore = columnChunkPageWriteStore;
     bloomFilterWriteStore = columnChunkPageWriteStore;
```
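On the default-`codecFactory` question above: one option is a small adapter that presents the single fixed compressor as a `CompressionCodecFactory`, so `initStore()` could always take the factory path. A minimal sketch, where `SingleCodecFactory` is a hypothetical name, not part of this PR:

```java
import org.apache.parquet.compression.CompressionCodecFactory;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

// Hypothetical adapter: presents one fixed compressor as a
// CompressionCodecFactory, ignoring whichever codec is requested.
final class SingleCodecFactory implements CompressionCodecFactory {
  private final BytesInputCompressor compressor;

  SingleCodecFactory(BytesInputCompressor compressor) {
    this.compressor = compressor;
  }

  @Override
  public BytesInputCompressor getCompressor(CompressionCodecName codecName) {
    return compressor; // always the same codec, as in the deprecated constructor
  }

  @Override
  public BytesInputDecompressor getDecompressor(CompressionCodecName codecName) {
    // Write path only; the record writer never decompresses.
    throw new UnsupportedOperationException("write-only codec factory");
  }

  @Override
  public void release() {
    // No-op: the caller that created the compressor keeps ownership.
  }
}
```

The deprecated constructor could then set `this.codecFactory = new SingleCodecFactory(compressor)` instead of null, and the if/else collapses to a single branch.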
```diff
@@ -201,15 +201,11 @@ public ParquetRecordWriter(
       MemoryManager memoryManager,
       Configuration conf) {
     this.codecFactory = new CodecFactory(conf, props.getPageSizeThreshold());
+    // Ensure the default compression codec from ParquetOutputFormat is set in props
+    ParquetProperties propsWithCodec =
+        ParquetProperties.copy(props).withCompressionCodec(codec).build();
```
> **Contributor:** Does this risk overwriting an already set compression codec?
```diff
     internalWriter = new InternalParquetRecordWriter<T>(
-        w,
-        writeSupport,
-        schema,
-        extraMetaData,
-        blockSize,
-        codecFactory.getCompressor(codec),
-        validating,
-        props);
+        w, writeSupport, schema, extraMetaData, blockSize, codecFactory, validating, propsWithCodec);
     this.memoryManager = Objects.requireNonNull(memoryManager, "memoryManager cannot be null");
     memoryManager.addWriter(internalWriter, blockSize);
   }
```
> Isn't there an existing API to set this? I have to look more closely at the convention, but would `withDefaultCompressionCodec` make sense?
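A sketch of what that could look like on `ParquetProperties.Builder`. The method is hypothetical (it does not exist in the codebase), and the null-means-unset convention is an assumption:

```java
// Hypothetical addition to ParquetProperties.Builder: apply the codec only
// when the caller has not already configured one, so ParquetRecordWriter's
// fallback cannot clobber an explicitly chosen codec.
public Builder withDefaultCompressionCodec(CompressionCodecName defaultCodec) {
  if (this.codec == null) { // assumes an unset codec is represented as null
    this.codec = defaultCodec;
  }
  return this;
}
```

`ParquetRecordWriter` would then build `propsWithCodec` via `ParquetProperties.copy(props).withDefaultCompressionCodec(codec).build()`, and the overwrite concern raised above would not arise.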