Changeset 4654


Ignore:
Timestamp:
2011-02-08 11:29:57 (2 years ago)
Author:
evert
Message:

Unchanged versioned system fields are no longer stored for each version. (#162)
In case of mutable updates, old values are copied to the next version if needed (cfr sparseness of the table).

Location:
trunk/cr/repository/impl/src
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • trunk/cr/repository/impl/src/main/java/org/lilyproject/repository/impl/HBaseRepository.java

    r4653 r4654  
    195195                Result result = recordTable.get(get); 
    196196                if (!result.isEmpty()) { 
    197                     byte[] oldVersion = result.getValue(systemColumnFamily, RecordColumn.VERSION.bytes); 
    198                     if (oldVersion != null) { 
    199                         version = Bytes.toLong(oldVersion) + 1; 
    200                         // Make sure any old data gets cleared and old blobs are deleted 
    201                         // This is to cover the failure scenario where a record was deleted, but a failure 
    202                         // occured before executing the clearData 
    203                         // If this was already done, this is a no-op 
    204                         clearData(recordId); 
    205                     } 
     197                    byte[] oldVersion = result.getValue(systemColumnFamily, RecordColumn.VERSION.bytes); 
     198                    if (oldVersion != null) { 
     199                        version = Bytes.toLong(oldVersion) + 1; 
     200                        // Make sure any old data gets cleared and old blobs are deleted 
     201                        // This is to cover the failure scenario where a record was deleted, but a failure 
     202                        // occured before executing the clearData 
     203                        // If this was already done, this is a no-op 
     204                        clearData(recordId); 
     205                    } 
    206206                } 
    207207                 
     
    286286            } 
    287287             
     288            // Check if the update is an update of mutable fields 
    288289            if (updateVersion) { 
    289290                try { 
     
    359360    } 
    360361 
     362    // This method takes a put object containing the row's data to be updated 
     363    // A wal message is added to this put object 
     364    // The rowLocker is asked to put the data and message on the record table using the given rowlock 
     365    // Finally the wal is asked to process the message 
    361366    private void putRowWithWalProcessing(RecordId recordId, org.lilyproject.repository.impl.lock.RowLock rowLock, 
    362367            Put put, RecordEvent recordEvent) throws InterruptedException, RowLogException, IOException, RecordException { 
     
    464469    } 
    465470 
     471    // Calculates which fields have changed and updates the record types of the scopes that have changed fields 
    466472    private Set<Scope> calculateChangedFields(Record record, Record originalRecord, RecordType recordType, 
    467473            Long version, Put put, RecordEvent recordEvent, Set<BlobReference> referencedBlobs, Set<BlobReference> unReferencedBlobs) throws InterruptedException, FieldTypeNotFoundException, TypeException, BlobException, RecordTypeNotFoundException, RecordException { 
     
    480486        // Update record types 
    481487        for (Scope scope : changedScopes) { 
     488            long versionOfRTField = version; 
    482489            if (Scope.NON_VERSIONED.equals(scope)) { 
    483                 put.add(systemColumnFamily, recordTypeIdColumnNames.get(scope), 1L, Bytes.toBytes(recordType 
    484                         .getId())); 
    485                 put.add(systemColumnFamily, recordTypeVersionColumnNames.get(scope), 1L, Bytes 
    486                         .toBytes(recordType.getVersion())); 
     490                versionOfRTField = 1L; // For non-versioned fields the record type is always stored at version 1. 
     491            } 
     492            // Only update the recordTypeNames and versions if they have indeed changed 
     493            QName originalScopeRecordTypeName = originalRecord.getRecordTypeName(scope); 
     494            if (originalScopeRecordTypeName == null) { 
     495                put.add(systemColumnFamily, recordTypeIdColumnNames.get(scope), versionOfRTField, Bytes.toBytes(recordType.getId())); 
     496                put.add(systemColumnFamily, recordTypeVersionColumnNames.get(scope), versionOfRTField, Bytes.toBytes(recordType.getVersion())); 
    487497            } else { 
    488                 put.add(systemColumnFamily, recordTypeIdColumnNames.get(scope), version, Bytes 
    489                         .toBytes(recordType.getId())); 
    490                 put.add(systemColumnFamily, recordTypeVersionColumnNames.get(scope), version, Bytes 
    491                         .toBytes(recordType.getVersion())); 
    492  
     498                RecordType originalScopeRecordType = typeManager.getRecordTypeByName(originalScopeRecordTypeName, originalRecord.getRecordTypeVersion(scope)); 
     499                if (!recordType.getId().equals(originalScopeRecordType.getId())) { 
     500                    put.add(systemColumnFamily, recordTypeIdColumnNames.get(scope), versionOfRTField, Bytes.toBytes(recordType.getId())); 
     501                } 
     502                if (!recordType.getVersion().equals(originalScopeRecordType.getVersion())) { 
     503                    put.add(systemColumnFamily, recordTypeVersionColumnNames.get(scope), versionOfRTField, Bytes.toBytes(recordType.getVersion())); 
     504                } 
    493505            } 
    494506            record.setRecordType(scope, recordType.getName(), recordType.getVersion()); 
     
    497509    } 
    498510 
     511    // Returns a map (fieldname -> field) of all fields that are indicated by the record to be updated. 
     512    // The map also includes the fields that need to be deleted. Their name is mapped onto the delete marker. 
    499513    private Map<QName, Object> getFieldsToUpdate(Record record) { 
    500514        // Work with a copy of the map 
     
    507521    } 
    508522 
     523    // Checks for each field if it is different from its previous value and indeed needs to be updated. 
    509524    private Set<Scope> calculateUpdateFields(Map<QName, Object> fields, Map<QName, Object> originalFields, Map<QName, Object> originalNextFields, 
    510525            Long version, Put put, RecordEvent recordEvent, Set<BlobReference> referencedBlobs, Set<BlobReference> unReferencedBlobs, boolean mutableUpdate) throws InterruptedException, FieldTypeNotFoundException, TypeException, BlobException, RecordTypeNotFoundException, RecordException { 
     
    544559                } else { 
    545560                    put.add(dataColumnFamily, fieldIdAsBytes, version, encodedFieldValue); 
     561                    // If it is a mutable update and the next version of the field was the same as the one that is being updated, 
     562                    // the original value needs to be copied to that next version (due to sparseness of the table).  
    546563                    if (originalNextFields != null && !fieldIsNewOrDeleted && originalNextFields.containsKey(fieldName)) { 
    547564                        copyValueToNextVersionIfNeeded(version, put, originalNextFields, fieldName, originalValue); 
     
    591608            rowLock = lockRow(recordId); 
    592609             
    593             Record originalRecord = read(recordId, version, null, new ReadContext()); 
    594  
    595             // Update the mutable fields 
    596             Put put = new Put(recordId.toBytes()); 
    597610            Map<QName, Object> fields = getFieldsToUpdate(record); 
    598611            fields = filterMutableFields(fields); 
     612 
     613            Record originalRecord = read(recordId, version, null, new ReadContext()); 
    599614            Map<QName, Object> originalFields = filterMutableFields(originalRecord.getFields()); 
     615 
     616            Record originalNextRecord = null; 
     617            Map<QName, Object> originalNextFields = null; 
     618            try { 
     619                originalNextRecord = read(recordId, version + 1, null, new ReadContext()); 
     620                originalNextFields = filterMutableFields(originalNextRecord.getFields()); 
     621            } catch (VersionNotFoundException e) { 
     622                // There is no next version of the record 
     623            } 
     624 
     625            Put put = new Put(recordId.toBytes()); 
    600626            Set<BlobReference> referencedBlobs = new HashSet<BlobReference>(); 
    601627            Set<BlobReference> unReferencedBlobs = new HashSet<BlobReference>(); 
     628 
    602629            RecordEvent recordEvent = new RecordEvent(); 
    603630            recordEvent.setType(Type.UPDATE); 
    604631            recordEvent.setVersionUpdated(version); 
    605632 
    606             Set<Scope> changedScopes = calculateUpdateFields(fields, originalFields, getOriginalNextFields(recordId, version), version, put, recordEvent, referencedBlobs, unReferencedBlobs, true); 
     633             
     634            Set<Scope> changedScopes = calculateUpdateFields(fields, originalFields, originalNextFields, version, put, recordEvent, referencedBlobs, unReferencedBlobs, true); 
    607635            for (BlobReference referencedBlob : referencedBlobs) { 
    608636                referencedBlob.setRecordId(recordId); 
     
    613641             
    614642            if (!changedScopes.isEmpty()) { 
     643                // Update the record types 
     644                 
    615645                // If no record type is specified explicitly, use the current one of the non-versioned scope 
    616646                QName recordTypeName = record.getRecordTypeName() != null ? record.getRecordTypeName() : originalRecord.getRecordTypeName(); 
     
    625655                RecordType recordType = typeManager.getRecordTypeByName(recordTypeName, recordTypeVersion); 
    626656                 
    627                 // Update the mutable record type 
    628                 Scope scope = Scope.VERSIONED_MUTABLE; 
    629                 newRecord.setRecordType(scope, recordType.getName(), recordType.getVersion()); 
     657                // Update the mutable record type in the record object 
     658                Scope mutableScope = Scope.VERSIONED_MUTABLE; 
     659                newRecord.setRecordType(mutableScope, recordType.getName(), recordType.getVersion()); 
    630660                 
     661                // If the record type changed, update it on the record table 
     662                QName originalMutableScopeRecordTypeName = originalRecord.getRecordTypeName(mutableScope); 
     663                if (originalMutableScopeRecordTypeName == null) { // There was no initial mutable record type yet 
     664                    put.add(systemColumnFamily, recordTypeIdColumnNames.get(mutableScope), version, Bytes.toBytes(recordType.getId())); 
     665                    put.add(systemColumnFamily, recordTypeVersionColumnNames.get(mutableScope), version, Bytes.toBytes(recordType.getVersion())); 
     666                } else { 
     667                    RecordType originalMutableScopeRecordType = typeManager.getRecordTypeByName(originalMutableScopeRecordTypeName, originalRecord.getRecordTypeVersion(mutableScope)); 
     668                    if (!recordType.getId().equals(originalMutableScopeRecordType.getId())) { 
     669                        // If the next record version had the same record type name, copy the original value to that one 
     670                        if (originalNextRecord != null && originalMutableScopeRecordType.getName().equals(originalNextRecord.getRecordTypeName(mutableScope))) { 
     671                            put.add(systemColumnFamily, recordTypeIdColumnNames.get(mutableScope), version+1, Bytes.toBytes(originalMutableScopeRecordType.getId())); 
     672                        } 
     673                        put.add(systemColumnFamily, recordTypeIdColumnNames.get(mutableScope), version, Bytes.toBytes(recordType.getId())); 
     674                    } 
     675                    if (!recordType.getVersion().equals(originalMutableScopeRecordType.getVersion())) { 
     676                        // If the next record version had the same record type version, copy the original value to that one 
     677                        if (originalNextRecord != null && originalMutableScopeRecordType.getVersion().equals(originalNextRecord.getRecordTypeVersion(mutableScope))) { 
     678                            put.add(systemColumnFamily, recordTypeIdColumnNames.get(mutableScope), version+1, Bytes.toBytes(originalMutableScopeRecordType.getVersion())); 
     679                        } 
     680                        put.add(systemColumnFamily, recordTypeVersionColumnNames.get(mutableScope), version, Bytes.toBytes(recordType.getVersion())); 
     681                    } 
     682                } 
     683                 
     684                // Validate if the new values for the record are valid wrt the recordType (e.g. mandatory fields) 
    631685                validateRecord(newRecord, originalRecord, recordType); 
    632  
    633                 put.add(systemColumnFamily, recordTypeIdColumnNames.get(scope), version, Bytes 
    634                         .toBytes(recordType.getId())); 
    635                 put.add(systemColumnFamily, recordTypeVersionColumnNames.get(scope), version, Bytes 
    636                         .toBytes(recordType.getVersion())); 
    637686 
    638687                recordEvent.setVersionUpdated(version); 
     
    701750    } 
    702751 
    703     private Map<QName, Object> getOriginalNextFields(RecordId recordId, Long version) 
    704             throws RecordTypeNotFoundException, FieldTypeNotFoundException, RecordException, 
    705             TypeException, RecordNotFoundException, InterruptedException { 
    706         try { 
    707             Record originalNextRecord = read(recordId, version + 1, null, new ReadContext()); 
    708             return filterMutableFields(originalNextRecord.getFields()); 
    709         } catch (VersionNotFoundException exception) { 
    710             // There is no next record 
    711             return null; 
    712         } 
    713     } 
    714  
    715752    public Record read(RecordId recordId) throws RecordNotFoundException, RecordTypeNotFoundException, 
    716753            FieldTypeNotFoundException, RecordException, VersionNotFoundException, TypeException, 
     
    844881            // extracted 
    845882            Pair<String, Long> recordTypePair = extractRecordType(Scope.NON_VERSIONED, result, null, record); 
    846             RecordType recordType = typeManager.getRecordTypeById(recordTypePair.getV1(), recordTypePair.getV2()); 
    847             record.setRecordType(recordType.getName(), recordType.getVersion()); 
    848             readContext.setRecordTypeId(Scope.NON_VERSIONED, recordType); 
     883            if (recordTypePair != null) { 
     884                RecordType recordType = typeManager.getRecordTypeById(recordTypePair.getV1(), recordTypePair.getV2()); 
     885                record.setRecordType(recordType.getName(), recordType.getVersion()); 
     886                readContext.setRecordTypeId(Scope.NON_VERSIONED, recordType); 
     887            } 
    849888        } 
    850889        return record; 
     
    915954        if (version == null) { 
    916955            // Get latest version 
    917             return new Pair<String, Long>(Bytes.toString(result.getValue(systemColumnFamily, 
    918                     recordTypeIdColumnNames.get(scope))), Bytes.toLong(result.getValue(systemColumnFamily, 
    919                     recordTypeVersionColumnNames.get(scope)))); 
     956            byte[] idBytes = result.getValue(systemColumnFamily, 
     957                    recordTypeIdColumnNames.get(scope)); 
     958            byte[] versionBytes = result.getValue(systemColumnFamily, 
     959                    recordTypeVersionColumnNames.get(scope)); 
     960            if ((idBytes == null || idBytes.length == 0) || (versionBytes == null || versionBytes.length == 0)) 
     961                return null; 
     962            return new Pair<String, Long>(Bytes.toString(idBytes), Bytes.toLong(versionBytes)); 
    920963 
    921964        } else { 
     
    932975        byte[] recordTypeIdColumnName = recordTypeIdColumnNames.get(scope); 
    933976        byte[] recordTypeVersionColumnName = recordTypeVersionColumnNames.get(scope); 
    934         Entry<Long, byte[]> ceilingEntry = versionableSystemCFversions.get(recordTypeIdColumnName) 
    935                 .ceilingEntry(version); 
     977        Entry<Long, byte[]> idCeilingEntry = null; 
     978        NavigableMap<Long, byte[]> recordTypeIdMap = versionableSystemCFversions.get(recordTypeIdColumnName); 
     979        if (recordTypeIdMap != null) { 
     980            idCeilingEntry = recordTypeIdMap.ceilingEntry(version); 
     981        } 
    936982        String recordTypeId = null; 
    937         if (ceilingEntry != null) { 
    938             recordTypeId = Bytes.toString(ceilingEntry.getValue()); 
    939         } 
     983        if (idCeilingEntry == null) 
     984            return null; 
     985        recordTypeId = Bytes.toString(idCeilingEntry.getValue()); 
     986         
    940987        Long recordTypeVersion = null; 
    941         ceilingEntry = versionableSystemCFversions.get(recordTypeVersionColumnName).ceilingEntry(version); 
    942         if (ceilingEntry != null) { 
    943             recordTypeVersion = Bytes.toLong(ceilingEntry.getValue()); 
    944         } 
     988        Entry<Long, byte[]> versionCeilingEntry = null; 
     989        NavigableMap<Long, byte[]> recordTypeVersionMap = versionableSystemCFversions.get(recordTypeVersionColumnName); 
     990        if (recordTypeVersionMap != null) { 
     991            versionCeilingEntry = recordTypeVersionMap.ceilingEntry(version); 
     992        } 
     993        if (versionCeilingEntry == null) 
     994            return null; 
     995        recordTypeVersion = Bytes.toLong(versionCeilingEntry.getValue()); 
    945996        Pair<String, Long> recordType = new Pair<String, Long>(recordTypeId, recordTypeVersion); 
    946997        return recordType; 
     
    10211072            record.setField(field.getV1(), field.getV2()); 
    10221073        } 
    1023         for (Scope readScope : context.getScopes()) { 
    1024             Pair<String, Long> recordTypePair = extractRecordType(readScope, result, version, record); 
    1025             RecordType recordType = typeManager.getRecordTypeById(recordTypePair.getV1(), recordTypePair.getV2()); 
    1026             record.setRecordType(readScope, recordType.getName(), recordType.getVersion()); 
    1027             context.setRecordTypeId(readScope, recordType); 
     1074        for (Scope scope : Scope.values()) { 
     1075            Pair<String, Long> recordTypePair = extractRecordType(scope, result, version, record); 
     1076            if (recordTypePair != null) { 
     1077                RecordType recordType = typeManager.getRecordTypeById(recordTypePair.getV1(), recordTypePair.getV2()); 
     1078                record.setRecordType(scope, recordType.getName(), recordType.getVersion()); 
     1079                context.setRecordTypeId(scope, recordType); 
     1080            } 
    10281081        } 
    10291082        return true; 
     
    11671220 
    11681221    private Set<BlobReference> getReferencedBlobs(FieldTypeImpl fieldType, Object value) throws BlobException { 
    1169         fieldType.getValueType().getValues(value); 
    11701222        HashSet<BlobReference> referencedBlobs = new HashSet<BlobReference>(); 
    11711223        ValueType valueType = fieldType.getValueType(); 
  • trunk/cr/repository/impl/src/test/java/org/lilyproject/repository/impl/test/AbstractRepositoryTest.java

    r4645 r4654  
    900900    @Test 
    901901    public void testUpdateMutableFieldWithNewRecordType() throws Exception { 
    902         Record record = createDefaultRecord(); 
     902        // Create default record 
     903        Record record = createDefaultRecord(); 
     904         
     905        // Update the record, creates a second version 
    903906        Record updateRecord = record.clone(); 
    904907        updateRecord.setField(fieldType1.getName(), "value2"); 
    905908        updateRecord.setField(fieldType2.getName(), 789); 
    906909        updateRecord.setField(fieldType3.getName(), false); 
    907         repository.update(updateRecord); 
    908  
     910        Record updatedRecord = repository.update(updateRecord, false, false); 
     911 
     912        // Read the first version of the record 
     913        Record readRecord = repository.read(record.getId(), Long.valueOf(1)); 
     914        assertEquals(recordType1.getName(), readRecord.getRecordTypeName()); 
     915        assertEquals(recordType1.getVersion(), readRecord.getRecordTypeVersion()); 
     916        assertEquals(recordType1.getName(), readRecord.getRecordTypeName(Scope.NON_VERSIONED)); 
     917        assertEquals(recordType1.getVersion(), readRecord.getRecordTypeVersion(Scope.NON_VERSIONED)); 
     918        assertEquals(recordType1.getName(), readRecord.getRecordTypeName(Scope.VERSIONED)); 
     919        assertEquals(recordType1.getVersion(), readRecord.getRecordTypeVersion(Scope.VERSIONED)); 
     920        assertEquals(recordType1.getName(), readRecord.getRecordTypeName(Scope.VERSIONED_MUTABLE)); 
     921        assertEquals(recordType1.getVersion(), readRecord.getRecordTypeVersion(Scope.VERSIONED_MUTABLE)); 
     922         
     923        // Do a mutable update of the first version of the record, change the record type 
    909924        Record updateMutableRecord = repository.newRecord(record.getId()); 
    910925        updateMutableRecord.setVersion(Long.valueOf(1)); 
     
    914929        updateMutableRecord.setField(fieldType6.getName(), "value3"); 
    915930 
    916         assertEquals(Long.valueOf(1), repository.update(updateMutableRecord, true, false).getVersion()); 
    917  
    918         Record readRecord = repository.read(record.getId(), Long.valueOf(1)); 
     931        Record updatedMutableRecord = repository.update(updateMutableRecord, true, false); 
     932        assertEquals(Long.valueOf(1), updatedMutableRecord.getVersion()); 
     933 
     934        // Read the first version of the record again 
     935        readRecord = repository.read(record.getId(), Long.valueOf(1)); 
    919936        assertEquals(Long.valueOf(1), readRecord.getVersion()); 
    920937        assertEquals("value2", readRecord.getField(fieldType1.getName())); 
     
    938955        } 
    939956        assertEquals(recordType1.getName(), readRecord.getRecordTypeName()); 
     957        assertEquals(recordType1.getVersion(), readRecord.getRecordTypeVersion()); 
    940958        assertEquals(recordType1.getName(), readRecord.getRecordTypeName(Scope.NON_VERSIONED)); 
     959        assertEquals(recordType1.getVersion(), readRecord.getRecordTypeVersion(Scope.NON_VERSIONED)); 
    941960        assertEquals(recordType1.getName(), readRecord.getRecordTypeName(Scope.VERSIONED)); 
     961        assertEquals(recordType1.getVersion(), readRecord.getRecordTypeVersion(Scope.VERSIONED)); 
     962        // The mutable record type should have been changed 
    942963        assertEquals(recordType2.getName(), readRecord.getRecordTypeName(Scope.VERSIONED_MUTABLE)); 
    943  
     964        assertEquals(recordType2.getVersion(), readRecord.getRecordTypeVersion(Scope.VERSIONED_MUTABLE)); 
     965 
     966        // Read the second version again of the record 
    944967        readRecord = repository.read(record.getId()); 
    945968        assertEquals(Long.valueOf(2), readRecord.getVersion()); 
     
    951974        assertEquals(recordType1.getName(), readRecord.getRecordTypeName(Scope.NON_VERSIONED)); 
    952975        assertEquals(recordType1.getName(), readRecord.getRecordTypeName(Scope.VERSIONED)); 
     976        // The original mutable record type should have been copied to the next version 
    953977        assertEquals(recordType1.getName(), readRecord.getRecordTypeName(Scope.VERSIONED_MUTABLE)); 
    954978    } 
Note: See TracChangeset for help on using the changeset viewer.