Changeset 4654
- Timestamp:
- 2011-02-08 11:29:57 (2 years ago)
- Location:
- trunk/cr/repository/impl/src
- Files:
-
- 2 edited
Legend:
- Unmodified
- Added
- Removed
-
trunk/cr/repository/impl/src/main/java/org/lilyproject/repository/impl/HBaseRepository.java
r4653 r4654 195 195 Result result = recordTable.get(get); 196 196 if (!result.isEmpty()) { 197 byte[] oldVersion = result.getValue(systemColumnFamily, RecordColumn.VERSION.bytes);198 if (oldVersion != null) {199 version = Bytes.toLong(oldVersion) + 1;200 // Make sure any old data gets cleared and old blobs are deleted201 // This is to cover the failure scenario where a record was deleted, but a failure202 // occured before executing the clearData203 // If this was already done, this is a no-op204 clearData(recordId);205 }197 byte[] oldVersion = result.getValue(systemColumnFamily, RecordColumn.VERSION.bytes); 198 if (oldVersion != null) { 199 version = Bytes.toLong(oldVersion) + 1; 200 // Make sure any old data gets cleared and old blobs are deleted 201 // This is to cover the failure scenario where a record was deleted, but a failure 202 // occured before executing the clearData 203 // If this was already done, this is a no-op 204 clearData(recordId); 205 } 206 206 } 207 207 … … 286 286 } 287 287 288 // Check if the update is an update of mutable fields 288 289 if (updateVersion) { 289 290 try { … … 359 360 } 360 361 362 // This method takes a put object containing the row's data to be updated 363 // A wal message is added to this put object 364 // The rowLocker is asked to put the data and message on the record table using the given rowlock 365 // Finally the wal is asked to process the message 361 366 private void putRowWithWalProcessing(RecordId recordId, org.lilyproject.repository.impl.lock.RowLock rowLock, 362 367 Put put, RecordEvent recordEvent) throws InterruptedException, RowLogException, IOException, RecordException { … … 464 469 } 465 470 471 // Calculates which fields have changed and updates the record types of the scopes that have changed fields 466 472 private Set<Scope> calculateChangedFields(Record record, Record originalRecord, RecordType recordType, 467 473 Long version, Put put, RecordEvent recordEvent, Set<BlobReference> referencedBlobs, Set<BlobReference> unReferencedBlobs) throws InterruptedException, FieldTypeNotFoundException, TypeException, BlobException, RecordTypeNotFoundException, RecordException { … … 480 486 // Update record types 481 487 for (Scope scope : changedScopes) { 488 long versionOfRTField = version; 482 489 if (Scope.NON_VERSIONED.equals(scope)) { 483 put.add(systemColumnFamily, recordTypeIdColumnNames.get(scope), 1L, Bytes.toBytes(recordType 484 .getId())); 485 put.add(systemColumnFamily, recordTypeVersionColumnNames.get(scope), 1L, Bytes 486 .toBytes(recordType.getVersion())); 490 versionOfRTField = 1L; // For non-versioned fields the record type is always stored at version 1. 491 } 492 // Only update the recordTypeNames and versions if they have indeed changed 493 QName originalScopeRecordTypeName = originalRecord.getRecordTypeName(scope); 494 if (originalScopeRecordTypeName == null) { 495 put.add(systemColumnFamily, recordTypeIdColumnNames.get(scope), versionOfRTField, Bytes.toBytes(recordType.getId())); 496 put.add(systemColumnFamily, recordTypeVersionColumnNames.get(scope), versionOfRTField, Bytes.toBytes(recordType.getVersion())); 487 497 } else { 488 put.add(systemColumnFamily, recordTypeIdColumnNames.get(scope), version, Bytes 489 .toBytes(recordType.getId())); 490 put.add(systemColumnFamily, recordTypeVersionColumnNames.get(scope), version, Bytes 491 .toBytes(recordType.getVersion())); 492 498 RecordType originalScopeRecordType = typeManager.getRecordTypeByName(originalScopeRecordTypeName, originalRecord.getRecordTypeVersion(scope)); 499 if (!recordType.getId().equals(originalScopeRecordType.getId())) { 500 put.add(systemColumnFamily, recordTypeIdColumnNames.get(scope), versionOfRTField, Bytes.toBytes(recordType.getId())); 501 } 502 if (!recordType.getVersion().equals(originalScopeRecordType.getVersion())) { 503 put.add(systemColumnFamily, recordTypeVersionColumnNames.get(scope), versionOfRTField, Bytes.toBytes(recordType.getVersion())); 504 } 493 505 } 494 506 record.setRecordType(scope, recordType.getName(), recordType.getVersion()); … … 497 509 } 498 510 511 // Returns a map (fieldname -> field) of all fields that are indicated by the record to be updated. 512 // The map also includes the fields that need to be deleted. Their name is mapped onto the delete marker. 499 513 private Map<QName, Object> getFieldsToUpdate(Record record) { 500 514 // Work with a copy of the map … … 507 521 } 508 522 523 // Checks for each field if it is different from its previous value and indeed needs to be updated. 509 524 private Set<Scope> calculateUpdateFields(Map<QName, Object> fields, Map<QName, Object> originalFields, Map<QName, Object> originalNextFields, 510 525 Long version, Put put, RecordEvent recordEvent, Set<BlobReference> referencedBlobs, Set<BlobReference> unReferencedBlobs, boolean mutableUpdate) throws InterruptedException, FieldTypeNotFoundException, TypeException, BlobException, RecordTypeNotFoundException, RecordException { … … 544 559 } else { 545 560 put.add(dataColumnFamily, fieldIdAsBytes, version, encodedFieldValue); 561 // If it is a mutable update and the next version of the field was the same as the one that is being updated, 562 // the original value needs to be copied to that next version (due to sparseness of the table). 546 563 if (originalNextFields != null && !fieldIsNewOrDeleted && originalNextFields.containsKey(fieldName)) { 547 564 copyValueToNextVersionIfNeeded(version, put, originalNextFields, fieldName, originalValue); … … 591 608 rowLock = lockRow(recordId); 592 609 593 Record originalRecord = read(recordId, version, null, new ReadContext());594 595 // Update the mutable fields596 Put put = new Put(recordId.toBytes());597 610 Map<QName, Object> fields = getFieldsToUpdate(record); 598 611 fields = filterMutableFields(fields); 612 613 Record originalRecord = read(recordId, version, null, new ReadContext()); 599 614 Map<QName, Object> originalFields = filterMutableFields(originalRecord.getFields()); 615 616 Record originalNextRecord = null; 617 Map<QName, Object> originalNextFields = null; 618 try { 619 originalNextRecord = read(recordId, version + 1, null, new ReadContext()); 620 originalNextFields = filterMutableFields(originalNextRecord.getFields()); 621 } catch (VersionNotFoundException e) { 622 // There is no next version of the record 623 } 624 625 Put put = new Put(recordId.toBytes()); 600 626 Set<BlobReference> referencedBlobs = new HashSet<BlobReference>(); 601 627 Set<BlobReference> unReferencedBlobs = new HashSet<BlobReference>(); 628 602 629 RecordEvent recordEvent = new RecordEvent(); 603 630 recordEvent.setType(Type.UPDATE); 604 631 recordEvent.setVersionUpdated(version); 605 632 606 Set<Scope> changedScopes = calculateUpdateFields(fields, originalFields, getOriginalNextFields(recordId, version), version, put, recordEvent, referencedBlobs, unReferencedBlobs, true); 633 634 Set<Scope> changedScopes = calculateUpdateFields(fields, originalFields, originalNextFields, version, put, recordEvent, referencedBlobs, unReferencedBlobs, true); 607 635 for (BlobReference referencedBlob : referencedBlobs) { 608 636 referencedBlob.setRecordId(recordId); … … 613 641 614 642 if (!changedScopes.isEmpty()) { 643 // Update the record types 644 615 645 // If no record type is specified explicitly, use the current one of the non-versioned scope 616 646 QName recordTypeName = record.getRecordTypeName() != null ? record.getRecordTypeName() : originalRecord.getRecordTypeName(); … … 625 655 RecordType recordType = typeManager.getRecordTypeByName(recordTypeName, recordTypeVersion); 626 656 627 // Update the mutable record type 628 Scope scope = Scope.VERSIONED_MUTABLE;629 newRecord.setRecordType( scope, recordType.getName(), recordType.getVersion());657 // Update the mutable record type in the record object 658 Scope mutableScope = Scope.VERSIONED_MUTABLE; 659 newRecord.setRecordType(mutableScope, recordType.getName(), recordType.getVersion()); 630 660 661 // If the record type changed, update it on the record table 662 QName originalMutableScopeRecordTypeName = originalRecord.getRecordTypeName(mutableScope); 663 if (originalMutableScopeRecordTypeName == null) { // There was no initial mutable record type yet 664 put.add(systemColumnFamily, recordTypeIdColumnNames.get(mutableScope), version, Bytes.toBytes(recordType.getId())); 665 put.add(systemColumnFamily, recordTypeVersionColumnNames.get(mutableScope), version, Bytes.toBytes(recordType.getVersion())); 666 } else { 667 RecordType originalMutableScopeRecordType = typeManager.getRecordTypeByName(originalMutableScopeRecordTypeName, originalRecord.getRecordTypeVersion(mutableScope)); 668 if (!recordType.getId().equals(originalMutableScopeRecordType.getId())) { 669 // If the next record version had the same record type name, copy the original value to that one 670 if (originalNextRecord != null && originalMutableScopeRecordType.getName().equals(originalNextRecord.getRecordTypeName(mutableScope))) { 671 put.add(systemColumnFamily, recordTypeIdColumnNames.get(mutableScope), version+1, Bytes.toBytes(originalMutableScopeRecordType.getId())); 672 } 673 put.add(systemColumnFamily, recordTypeIdColumnNames.get(mutableScope), version, Bytes.toBytes(recordType.getId())); 674 } 675 if (!recordType.getVersion().equals(originalMutableScopeRecordType.getVersion())) { 676 // If the next record version had the same record type version, copy the original value to that one 677 if (originalNextRecord != null && originalMutableScopeRecordType.getVersion().equals(originalNextRecord.getRecordTypeVersion(mutableScope))) { 678 put.add(systemColumnFamily, recordTypeIdColumnNames.get(mutableScope), version+1, Bytes.toBytes(originalMutableScopeRecordType.getVersion())); 679 } 680 put.add(systemColumnFamily, recordTypeVersionColumnNames.get(mutableScope), version, Bytes.toBytes(recordType.getVersion())); 681 } 682 } 683 684 // Validate if the new values for the record are valid wrt the recordType (e.g. mandatory fields) 631 685 validateRecord(newRecord, originalRecord, recordType); 632 633 put.add(systemColumnFamily, recordTypeIdColumnNames.get(scope), version, Bytes634 .toBytes(recordType.getId()));635 put.add(systemColumnFamily, recordTypeVersionColumnNames.get(scope), version, Bytes636 .toBytes(recordType.getVersion()));637 686 638 687 recordEvent.setVersionUpdated(version); … … 701 750 } 702 751 703 private Map<QName, Object> getOriginalNextFields(RecordId recordId, Long version)704 throws RecordTypeNotFoundException, FieldTypeNotFoundException, RecordException,705 TypeException, RecordNotFoundException, InterruptedException {706 try {707 Record originalNextRecord = read(recordId, version + 1, null, new ReadContext());708 return filterMutableFields(originalNextRecord.getFields());709 } catch (VersionNotFoundException exception) {710 // There is no next record711 return null;712 }713 }714 715 752 public Record read(RecordId recordId) throws RecordNotFoundException, RecordTypeNotFoundException, 716 753 FieldTypeNotFoundException, RecordException, VersionNotFoundException, TypeException, … … 844 881 // extracted 845 882 Pair<String, Long> recordTypePair = extractRecordType(Scope.NON_VERSIONED, result, null, record); 846 RecordType recordType = typeManager.getRecordTypeById(recordTypePair.getV1(), recordTypePair.getV2()); 847 record.setRecordType(recordType.getName(), recordType.getVersion()); 848 readContext.setRecordTypeId(Scope.NON_VERSIONED, recordType); 883 if (recordTypePair != null) { 884 RecordType recordType = typeManager.getRecordTypeById(recordTypePair.getV1(), recordTypePair.getV2()); 885 record.setRecordType(recordType.getName(), recordType.getVersion()); 886 readContext.setRecordTypeId(Scope.NON_VERSIONED, recordType); 887 } 849 888 } 850 889 return record; … … 915 954 if (version == null) { 916 955 // Get latest version 917 return new Pair<String, Long>(Bytes.toString(result.getValue(systemColumnFamily, 918 recordTypeIdColumnNames.get(scope))), Bytes.toLong(result.getValue(systemColumnFamily, 919 recordTypeVersionColumnNames.get(scope)))); 956 byte[] idBytes = result.getValue(systemColumnFamily, 957 recordTypeIdColumnNames.get(scope)); 958 byte[] versionBytes = result.getValue(systemColumnFamily, 959 recordTypeVersionColumnNames.get(scope)); 960 if ((idBytes == null || idBytes.length == 0) || (versionBytes == null || versionBytes.length == 0)) 961 return null; 962 return new Pair<String, Long>(Bytes.toString(idBytes), Bytes.toLong(versionBytes)); 920 963 921 964 } else { … … 932 975 byte[] recordTypeIdColumnName = recordTypeIdColumnNames.get(scope); 933 976 byte[] recordTypeVersionColumnName = recordTypeVersionColumnNames.get(scope); 934 Entry<Long, byte[]> ceilingEntry = versionableSystemCFversions.get(recordTypeIdColumnName) 935 .ceilingEntry(version); 977 Entry<Long, byte[]> idCeilingEntry = null; 978 NavigableMap<Long, byte[]> recordTypeIdMap = versionableSystemCFversions.get(recordTypeIdColumnName); 979 if (recordTypeIdMap != null) { 980 idCeilingEntry = recordTypeIdMap.ceilingEntry(version); 981 } 936 982 String recordTypeId = null; 937 if (ceilingEntry != null) { 938 recordTypeId = Bytes.toString(ceilingEntry.getValue()); 939 } 983 if (idCeilingEntry == null) 984 return null; 985 recordTypeId = Bytes.toString(idCeilingEntry.getValue()); 986 940 987 Long recordTypeVersion = null; 941 ceilingEntry = versionableSystemCFversions.get(recordTypeVersionColumnName).ceilingEntry(version); 942 if (ceilingEntry != null) { 943 recordTypeVersion = Bytes.toLong(ceilingEntry.getValue()); 944 } 988 Entry<Long, byte[]> versionCeilingEntry = null; 989 NavigableMap<Long, byte[]> recordTypeVersionMap = versionableSystemCFversions.get(recordTypeVersionColumnName); 990 if (recordTypeVersionMap != null) { 991 versionCeilingEntry = recordTypeVersionMap.ceilingEntry(version); 992 } 993 if (versionCeilingEntry == null) 994 return null; 995 recordTypeVersion = Bytes.toLong(versionCeilingEntry.getValue()); 945 996 Pair<String, Long> recordType = new Pair<String, Long>(recordTypeId, recordTypeVersion); 946 997 return recordType; … … 1021 1072 record.setField(field.getV1(), field.getV2()); 1022 1073 } 1023 for (Scope readScope : context.getScopes()) { 1024 Pair<String, Long> recordTypePair = extractRecordType(readScope, result, version, record); 1025 RecordType recordType = typeManager.getRecordTypeById(recordTypePair.getV1(), recordTypePair.getV2()); 1026 record.setRecordType(readScope, recordType.getName(), recordType.getVersion()); 1027 context.setRecordTypeId(readScope, recordType); 1074 for (Scope scope : Scope.values()) { 1075 Pair<String, Long> recordTypePair = extractRecordType(scope, result, version, record); 1076 if (recordTypePair != null) { 1077 RecordType recordType = typeManager.getRecordTypeById(recordTypePair.getV1(), recordTypePair.getV2()); 1078 record.setRecordType(scope, recordType.getName(), recordType.getVersion()); 1079 context.setRecordTypeId(scope, recordType); 1080 } 1028 1081 } 1029 1082 return true; … … 1167 1220 1168 1221 private Set<BlobReference> getReferencedBlobs(FieldTypeImpl fieldType, Object value) throws BlobException { 1169 fieldType.getValueType().getValues(value);1170 1222 HashSet<BlobReference> referencedBlobs = new HashSet<BlobReference>(); 1171 1223 ValueType valueType = fieldType.getValueType(); -
trunk/cr/repository/impl/src/test/java/org/lilyproject/repository/impl/test/AbstractRepositoryTest.java
r4645 r4654 900 900 @Test 901 901 public void testUpdateMutableFieldWithNewRecordType() throws Exception { 902 Record record = createDefaultRecord(); 902 // Create default record 903 Record record = createDefaultRecord(); 904 905 // Update the record, creates a second version 903 906 Record updateRecord = record.clone(); 904 907 updateRecord.setField(fieldType1.getName(), "value2"); 905 908 updateRecord.setField(fieldType2.getName(), 789); 906 909 updateRecord.setField(fieldType3.getName(), false); 907 repository.update(updateRecord); 908 910 Record updatedRecord = repository.update(updateRecord, false, false); 911 912 // Read the first version of the record 913 Record readRecord = repository.read(record.getId(), Long.valueOf(1)); 914 assertEquals(recordType1.getName(), readRecord.getRecordTypeName()); 915 assertEquals(recordType1.getVersion(), readRecord.getRecordTypeVersion()); 916 assertEquals(recordType1.getName(), readRecord.getRecordTypeName(Scope.NON_VERSIONED)); 917 assertEquals(recordType1.getVersion(), readRecord.getRecordTypeVersion(Scope.NON_VERSIONED)); 918 assertEquals(recordType1.getName(), readRecord.getRecordTypeName(Scope.VERSIONED)); 919 assertEquals(recordType1.getVersion(), readRecord.getRecordTypeVersion(Scope.VERSIONED)); 920 assertEquals(recordType1.getName(), readRecord.getRecordTypeName(Scope.VERSIONED_MUTABLE)); 921 assertEquals(recordType1.getVersion(), readRecord.getRecordTypeVersion(Scope.VERSIONED_MUTABLE)); 922 923 // Do a mutable update of the first version of the record, change the record type 909 924 Record updateMutableRecord = repository.newRecord(record.getId()); 910 925 updateMutableRecord.setVersion(Long.valueOf(1)); … … 914 929 updateMutableRecord.setField(fieldType6.getName(), "value3"); 915 930 916 assertEquals(Long.valueOf(1), repository.update(updateMutableRecord, true, false).getVersion()); 917 918 Record readRecord = repository.read(record.getId(), Long.valueOf(1)); 931 Record updatedMutableRecord = repository.update(updateMutableRecord, true, false); 932 assertEquals(Long.valueOf(1), updatedMutableRecord.getVersion()); 933 934 // Read the first version of the record again 935 readRecord = repository.read(record.getId(), Long.valueOf(1)); 919 936 assertEquals(Long.valueOf(1), readRecord.getVersion()); 920 937 assertEquals("value2", readRecord.getField(fieldType1.getName())); … … 938 955 } 939 956 assertEquals(recordType1.getName(), readRecord.getRecordTypeName()); 957 assertEquals(recordType1.getVersion(), readRecord.getRecordTypeVersion()); 940 958 assertEquals(recordType1.getName(), readRecord.getRecordTypeName(Scope.NON_VERSIONED)); 959 assertEquals(recordType1.getVersion(), readRecord.getRecordTypeVersion(Scope.NON_VERSIONED)); 941 960 assertEquals(recordType1.getName(), readRecord.getRecordTypeName(Scope.VERSIONED)); 961 assertEquals(recordType1.getVersion(), readRecord.getRecordTypeVersion(Scope.VERSIONED)); 962 // The mutable record type should have been changed 942 963 assertEquals(recordType2.getName(), readRecord.getRecordTypeName(Scope.VERSIONED_MUTABLE)); 943 964 assertEquals(recordType2.getVersion(), readRecord.getRecordTypeVersion(Scope.VERSIONED_MUTABLE)); 965 966 // Read the second version again of the record 944 967 readRecord = repository.read(record.getId()); 945 968 assertEquals(Long.valueOf(2), readRecord.getVersion()); … … 951 974 assertEquals(recordType1.getName(), readRecord.getRecordTypeName(Scope.NON_VERSIONED)); 952 975 assertEquals(recordType1.getName(), readRecord.getRecordTypeName(Scope.VERSIONED)); 976 // The original mutable record type should have been copied to the next version 953 977 assertEquals(recordType1.getName(), readRecord.getRecordTypeName(Scope.VERSIONED_MUTABLE)); 954 978 }
Note: See TracChangeset
for help on using the changeset viewer.