Changeset 4148
- Timestamp:
- 2010-07-16 12:18:47 (3 years ago)
- Location:
- projects/lily/trunk
- Files:
-
- 5 edited
-
repository/impl/src/main/java/org/lilycms/repository/impl/HBaseRepository.java (modified) (10 diffs)
-
repository/impl/src/test/java/org/lilycms/repository/impl/test/AbstractRepositoryTest.java (modified) (3 diffs)
-
repository/impl/src/test/java/org/lilycms/repository/impl/test/AvroRepositoryTest.java (modified) (1 diff)
-
repository/impl/src/test/java/org/lilycms/repository/impl/test/HBaseRepositoryTest.java (modified) (1 diff)
-
testfw/src/main/java/org/lilycms/testfw/HBaseProxy.java (modified) (2 diffs)
Legend:
- Unmodified
- Added
- Removed
-
projects/lily/trunk/repository/impl/src/main/java/org/lilycms/repository/impl/HBaseRepository.java
r4146 r4148 84 84 private static final byte[] CURRENT_VERSION_COLUMN_NAME = Bytes.toBytes("$CurrentVersion"); 85 85 private static final byte[] LOCK_COLUMN_NAME = Bytes.toBytes("$Lock"); 86 private static final byte[] DELETED_COLUMN_NAME = Bytes.toBytes("$Deleted"); 86 87 private static final byte[] NON_VERSIONED_RECORDTYPEID_COLUMN_NAME = Bytes.toBytes("$NonVersionableRecordTypeId"); 87 88 private static final byte[] NON_VERSIONED_RECORDTYPEVERSION_COLUMN_NAME = Bytes … … 212 213 checkCreatePreconditions(record); 213 214 214 // Creating an existing record is not allowed. Use update instead. 215 if (recordTable.exists(new Get(rowId, rowLock))) { 216 throw new RecordExistsException(newRecord); 217 } 218 215 // Check if a previous incarnation of the record existed and clear data if needed 216 reincarnateRecord(newRecord, rowId, rowLock); 217 219 218 Record dummyOriginalRecord = newRecord(); 220 219 Put put = new Put(newRecord.getId().toBytes(), rowLock); 220 put.add(systemColumnFamilies.get(Scope.NON_VERSIONED), DELETED_COLUMN_NAME, Bytes.toBytes(false)); 221 221 RecordEvent recordEvent = new RecordEvent(); 222 222 recordEvent.setType(Type.CREATE); … … 272 272 } 273 273 274 private void reincarnateRecord(Record newRecord, byte[] rowId, RowLock rowLock) throws IOException, 275 RecordExistsException { 276 Get get = new Get(rowId, rowLock); 277 if (recordTable.exists(get)) { 278 get.addColumn(systemColumnFamilies.get(Scope.NON_VERSIONED), DELETED_COLUMN_NAME); 279 get.addColumn(systemColumnFamilies.get(Scope.NON_VERSIONED), CURRENT_VERSION_COLUMN_NAME); 280 Result result = recordTable.get(get); 281 if (result == null || result.isEmpty()) { 282 throw new RecordExistsException(newRecord); 283 } else { 284 byte[] deleted = result.getValue(systemColumnFamilies.get(Scope.NON_VERSIONED), DELETED_COLUMN_NAME); 285 if ((deleted != null) && (Bytes.toBoolean(deleted))) { 286 if (null != result.getValue(systemColumnFamilies.get(Scope.NON_VERSIONED), CURRENT_VERSION_COLUMN_NAME)) { 287 clearData(rowId, rowLock); 288 } 289 } else { 290 throw new RecordExistsException(newRecord); 291 } 292 } 293 } 294 } 295 296 private void clearData(byte[] rowId, RowLock rowLock) throws IOException { 297 Delete delete = new Delete(rowId, -1, rowLock); 298 delete.deleteFamily(columnFamilies.get(Scope.NON_VERSIONED)); 299 delete.deleteFamily(columnFamilies.get(Scope.VERSIONED)); 300 delete.deleteFamily(columnFamilies.get(Scope.VERSIONED_MUTABLE)); 301 delete.deleteColumns(systemColumnFamilies.get(Scope.NON_VERSIONED), CURRENT_VERSION_COLUMN_NAME); 302 delete.deleteColumns(systemColumnFamilies.get(Scope.NON_VERSIONED), NON_VERSIONED_RECORDTYPEID_COLUMN_NAME); 303 delete.deleteColumns(systemColumnFamilies.get(Scope.NON_VERSIONED), NON_VERSIONED_RECORDTYPEVERSION_COLUMN_NAME); 304 delete.deleteColumns(systemColumnFamilies.get(Scope.VERSIONED), VERSIONED_RECORDTYPEID_COLUMN_NAME); 305 delete.deleteColumns(systemColumnFamilies.get(Scope.VERSIONED), VERSIONED_RECORDTYPEVERSION_COLUMN_NAME); 306 delete.deleteColumns(systemColumnFamilies.get(Scope.VERSIONED), VERSIONED_MUTABLE_RECORDTYPEID_COLUMN_NAME); 307 delete.deleteColumns(systemColumnFamilies.get(Scope.VERSIONED), VERSIONED_MUTABLE_RECORDTYPEVERSION_COLUMN_NAME); 308 recordTable.delete(delete); 309 } 310 274 311 private void checkCreatePreconditions(Record record) throws InvalidRecordException { 275 312 ArgumentValidator.notNull(record, "record"); … … 325 362 326 363 } catch (IOException e) { 327 throw new RecordException("Exception occurred while updating record <" + recordId + "> in HBase table",364 throw new RecordException("Exception occurred while updating record <" + recordId + "> on HBase table", 328 365 e); 329 366 } finally { … … 344 381 if (record.getRecordTypeId() == null) { 345 382 throw new InvalidRecordException(record, "The recordType cannot be null for a record to be updated."); 346 }347 Get get = new Get(record.getId().toBytes());348 try {349 if (!recordTable.exists(get)) {350 throw new RecordNotFoundException(record);351 }352 } catch (IOException e) {353 throw new RecordException("Exception occurred while retrieving original record <" + record.getId()354 + "> from HBase table", e);355 383 } 356 384 } … … 703 731 RecordException, VersionNotFoundException, TypeException { 704 732 ArgumentValidator.notNull(recordId, "recordId"); 705 Record record = newRecord(); 706 record.setId(recordId); 707 708 Get get = new Get(recordId.toBytes()); 709 if (requestedVersion != null) { 710 get.setMaxVersions(); 711 } 712 // Add the columns for the fields to get 713 addFieldsToGet(get, fields); 714 Result result; 715 try { 716 if (!recordTable.exists(new Get(recordId.toBytes()))) { 717 throw new RecordNotFoundException(record); 718 } 719 // Retrieve the data from the repository 720 result = recordTable.get(get); 721 } catch (IOException e) { 722 throw new RecordException("Exception occurred while retrieving record <" + recordId 723 + "> from HBase table", e); 724 } 733 734 Result result = getRow(recordId, requestedVersion, fields); 735 736 Record record = newRecord(recordId); 725 737 726 738 // Set retrieved version on the record 739 extractVersion(requestedVersion, result, record); 740 741 // Extract the actual fields from the retrieved data 742 if (extractFields(result, record.getVersion(), record, readContext)) { 743 // Set the recordType explicitly in case only versioned fields were 744 // extracted 745 Pair<String, Long> recordTypePair = extractRecordType(Scope.NON_VERSIONED, result, null, record); 746 record.setRecordType(recordTypePair.getV1(), recordTypePair.getV2()); 747 } 748 return record; 749 } 750 751 private void extractVersion(Long requestedVersion, Result result, Record record) throws VersionNotFoundException { 727 752 byte[] latestVersionBytes = result.getValue(systemColumnFamilies.get(Scope.NON_VERSIONED), 728 753 CURRENT_VERSION_COLUMN_NAME); … … 736 761 record.setVersion(latestVersion); 737 762 } 738 739 // Extract the actual fields from the retrieved data 740 if (extractFields(result, record.getVersion(), record, readContext)) { 741 // Set the recordType explicitly in case only versioned fields were 742 // extracted 743 Pair<String, Long> recordTypePair = extractRecordType(Scope.NON_VERSIONED, result, null, record); 744 record.setRecordType(recordTypePair.getV1(), recordTypePair.getV2()); 745 } 746 return record; 763 } 764 765 // Retrieves the row from the table and check if it exists and has not been flagged as deleted 766 private Result getRow(RecordId recordId, Long requestedVersion, List<FieldType> fields) 767 throws RecordNotFoundException, RecordException { 768 Result result; 769 Get get = new Get(recordId.toBytes()); 770 if (requestedVersion != null) { 771 get.setMaxVersions(); 772 } 773 // Add the columns for the fields to get 774 addFieldsToGet(get, fields); 775 try { 776 if (!recordTable.exists(new Get(recordId.toBytes()))) { 777 throw new RecordNotFoundException(newRecord(recordId)); 778 } 779 780 // Retrieve the data from the repository 781 result = recordTable.get(get); 782 783 // Check if the record was deleted 784 byte[] deleted = result.getValue(systemColumnFamilies.get(Scope.NON_VERSIONED), DELETED_COLUMN_NAME); 785 if ((deleted != null) && (Bytes.toBoolean(deleted))) { 786 throw new RecordNotFoundException(newRecord(recordId)); 787 } 788 } catch (IOException e) { 789 throw new RecordException("Exception occurred while retrieving record <" + recordId 790 + "> from HBase table", e); 791 } 792 return result; 793 } 794 795 private boolean recordExists(byte[] rowId, RowLock rowLock) throws IOException { 796 Get get = new Get(rowId, rowLock); 797 if (!recordTable.exists(get)) return false; 798 799 get.addColumn(systemColumnFamilies.get(Scope.NON_VERSIONED), DELETED_COLUMN_NAME); 800 Result result = recordTable.get(get); 801 if (result == null || result.isEmpty()) { 802 return true; 803 } else { 804 byte[] deleted = result.getValue(systemColumnFamilies.get(Scope.NON_VERSIONED), DELETED_COLUMN_NAME); 805 if ((deleted != null) && (Bytes.toBoolean(deleted))) { 806 return false; 807 } else { 808 return true; 809 } 810 } 747 811 } 748 812 … … 829 893 } 830 894 831 private void addFieldsToGet(Get get, List<FieldType> fields) throws RecordNotFoundException, 832 FieldTypeNotFoundException, RecordTypeNotFoundException, RecordException { 895 private void addFieldsToGet(Get get, List<FieldType> fields) { 833 896 boolean added = false; 834 897 if (fields != null) { … … 846 909 847 910 private void addSystemColumnsToGet(Get get) { 911 get.addColumn(systemColumnFamilies.get(Scope.NON_VERSIONED), DELETED_COLUMN_NAME); 848 912 get.addColumn(systemColumnFamilies.get(Scope.NON_VERSIONED), CURRENT_VERSION_COLUMN_NAME); 849 913 get.addColumn(systemColumnFamilies.get(Scope.NON_VERSIONED), NON_VERSIONED_RECORDTYPEID_COLUMN_NAME); … … 889 953 public void delete(RecordId recordId) throws RecordException { 890 954 ArgumentValidator.notNull(recordId, "recordId"); 891 Delete delete = new Delete(recordId.toBytes()); 955 956 org.lilycms.repository.impl.lock.RowLock rowLock = null; 957 byte[] rowId = recordId.toBytes(); 892 958 try { 893 recordTable.delete(delete); 959 // Take Custom Lock 960 rowLock = rowLocker.lockRow(rowId); 961 if (rowLock == null) 962 throw new RecordException("Failed to lock row while updating record <" + recordId 963 + "> in HBase table", null); 964 965 if (recordExists(rowId, null)) { // Check if the record exists in the first place 966 Put put = new Put(rowId); 967 put.add(systemColumnFamilies.get(Scope.NON_VERSIONED), DELETED_COLUMN_NAME, Bytes.toBytes(true)); 968 RecordEvent recordEvent = new RecordEvent(); 969 recordEvent.setType(Type.DELETE); 970 971 RowLogMessage walMessage = wal.putMessage(recordId.toBytes(), null, recordEvent.toJsonBytes(), put); 972 if (!rowLocker.put(put, rowLock)) { 973 throw new RecordException("Exception occurred while deleting record <" + recordId + "> on HBase table", null); 974 } 975 clearData(rowId, null); 976 977 if (walMessage != null) { 978 try { 979 wal.processMessage(walMessage); 980 } catch (RowLogException e) { 981 // Processing the message failed, it will be retried later. 982 } 983 } 984 } 985 } catch (RowLogException e) { 986 throw new RecordException("Exception occurred while deleting record <" + recordId 987 + "> on HBase table", e); 988 894 989 } catch (IOException e) { 895 throw new RecordException( 896 "Exception occurred while deleting record <" + recordId + "> from HBase table", e); 897 } 898 899 // TODO bruno: I put this in here as a temp fix so that the Indexer would work 900 RecordEvent recordEvent = new RecordEvent(); 901 recordEvent.setType(Type.DELETE); 902 try { 903 RowLogMessage walMessage = wal.putMessage(recordId.toBytes(), null, recordEvent.toJsonBytes(), null); 904 wal.processMessage(walMessage); 905 recordTable.delete(delete); 906 } catch (RowLogException e) { 907 throw new RuntimeException(e); 908 } catch (IOException e) { 909 throw new RuntimeException("Temporarily introduced second delete of record row failed.", e); 990 throw new RecordException("Exception occurred while deleting record <" + recordId + "> on HBase table", 991 e); 992 } finally { 993 if (rowLock != null) { 994 try { 995 rowLocker.unlockRow(rowLock); 996 } catch (IOException e) { 997 // Ignore for now 998 } 999 } 910 1000 } 911 1001 } -
projects/lily/trunk/repository/impl/src/test/java/org/lilycms/repository/impl/test/AbstractRepositoryTest.java
r4111 r4148 2 2 3 3 import static org.easymock.EasyMock.createControl; 4 import static org.junit.Assert.assertEquals; 5 import static org.junit.Assert.assertFalse; 6 import static org.junit.Assert.assertNull; 7 import static org.junit.Assert.assertTrue; 8 import static org.junit.Assert.fail; 9 4 import static org.junit.Assert.*; 10 5 import java.util.Arrays; 11 6 import java.util.HashMap; … … 19 14 import org.lilycms.repository.api.*; 20 15 import org.lilycms.repository.impl.IdGeneratorImpl; 16 import org.lilycms.testfw.HBaseProxy; 21 17 22 18 public abstract class AbstractRepositoryTest { 23 19 20 protected static final HBaseProxy HBASE_PROXY = new HBaseProxy(); 24 21 protected static IdGenerator idGenerator = new IdGeneratorImpl(); 25 22 protected static TypeManager typeManager; … … 636 633 } catch (RecordNotFoundException expected) { 637 634 } 635 try { 636 repository.update(record); 637 fail(); 638 } catch (RecordNotFoundException expected) { 639 } 640 repository.delete(record.getId()); // Deleting a record twice does not throw an exception 641 } 642 643 @Test 644 public void TestDeleteRecordCleansUpDataBeforeRecreate() throws Exception { 645 Record record = createDefaultRecord(); 646 RecordId recordId = record.getId(); 647 repository.delete(recordId); 648 649 // Work around HBASE-2256 650 HBASE_PROXY.majorCompact("recordTable", "VCF"); 651 652 record = repository.newRecord(recordId); 653 record.setRecordType(recordType2.getId(), recordType2.getVersion()); 654 record.setField(fieldType4.getName(), 555); 655 record.setField(fieldType5.getName(), false); 656 record.setField(fieldType6.getName(), "zzz"); 657 repository.create(record); 658 Record readRecord = repository.read(recordId); 659 assertEquals(Long.valueOf(1), readRecord.getVersion()); 660 try { 661 readRecord.getField(fieldType1.getName()); 662 fail(); 663 } catch (FieldNotFoundException expected) { 664 } 665 try { 666 readRecord.getField(fieldType2.getName()); 667 fail(); 668 } catch (FieldNotFoundException expected) { 669 } 670 try { 671 readRecord.getField(fieldType3.getName()); 672 fail(); 673 } catch (FieldNotFoundException expected) { 674 } 675 676 677 678 679 680 assertEquals(555, readRecord.getField(fieldType4.getName())); 681 assertFalse((Boolean)readRecord.getField(fieldType5.getName())); 682 assertEquals("zzz", readRecord.getField(fieldType6.getName())); 638 683 } 639 684 -
projects/lily/trunk/repository/impl/src/test/java/org/lilycms/repository/impl/test/AvroRepositoryTest.java
r4139 r4148 36 36 import org.lilycms.repository.impl.SizeBasedBlobStoreAccessFactory; 37 37 import org.lilycms.repository.impl.TypeManagerRemoteImpl; 38 import org.lilycms.testfw.HBaseProxy;39 38 import org.lilycms.testfw.TestHelper; 40 39 41 40 public class AvroRepositoryTest extends AbstractRepositoryTest { 42 41 private static HBaseRepository serverRepository; 43 44 private final static HBaseProxy HBASE_PROXY = new HBaseProxy();45 42 46 43 @BeforeClass -
projects/lily/trunk/repository/impl/src/test/java/org/lilycms/repository/impl/test/HBaseRepositoryTest.java
r4100 r4148 28 28 import org.lilycms.repository.impl.HBaseTypeManager; 29 29 import org.lilycms.repository.impl.SizeBasedBlobStoreAccessFactory; 30 import org.lilycms.testfw.HBaseProxy;31 30 import org.lilycms.testfw.TestHelper; 32 31 33 32 public class HBaseRepositoryTest extends AbstractRepositoryTest { 34 33 35 private final static HBaseProxy HBASE_PROXY = new HBaseProxy();36 34 @BeforeClass 37 35 public static void setUpBeforeClass() throws Exception { -
projects/lily/trunk/testfw/src/main/java/org/lilycms/testfw/HBaseProxy.java
r4137 r4148 224 224 HTable htable = new HTable(CONF, tableName); 225 225 226 byte[] tmpRowKey = Bytes.toBytes("HBaseProxyDummyRow");227 226 byte[] CF = EXPLOIT_TIMESTAMP_TABLES.get(tableName); 228 byte[] COL = Bytes.toBytes("DummyColumn"); 229 230 byte[] value = null; 231 while (value == null) { 232 Put put = new Put(tmpRowKey); 233 put.add(CF, COL, 1, new byte[] { 0 }); 234 htable.put(put); 235 236 Get get = new Get(tmpRowKey); 237 Result result = htable.get(get); 238 value = result.getValue(CF, COL); 239 if (value == null) { 240 // If the value is null, it is because the delete marker has not yet been flushed/compacted away 241 System.out.println("Waiting for flush/compact of " + tableName + " to complete"); 242 } 243 Thread.sleep(100); 244 } 227 byte[] tmpRowKey = waitForCompact(tableName, CF); 245 228 246 229 // Delete our dummy row again … … 249 232 } 250 233 234 private byte[] waitForCompact(String tableName, byte[] CF) throws IOException, InterruptedException { 235 byte[] tmpRowKey = Bytes.toBytes("HBaseProxyDummyRow"); 236 byte[] COL = Bytes.toBytes("DummyColumn"); 237 HTable htable = new HTable(CONF, tableName); 238 239 byte[] value = null; 240 while (value == null) { 241 Put put = new Put(tmpRowKey); 242 put.add(CF, COL, 1, new byte[] { 0 }); 243 htable.put(put); 244 245 Get get = new Get(tmpRowKey); 246 Result result = htable.get(get); 247 value = result.getValue(CF, COL); 248 if (value == null) { 249 // If the value is null, it is because the delete marker has not yet been flushed/compacted away 250 System.out.println("Waiting for flush/compact of " + tableName + " to complete"); 251 Thread.sleep(100); 252 } 253 } 254 return tmpRowKey; 255 } 256 257 /** Force a major compaction and wait for it to finish. 258 * This method can be used in a test to avoid issue HBASE-2256 after performing a delete operation 259 * Uses same principle as {@link #cleanTables()} 260 */ 261 public void majorCompact(String tableName, String CFName) throws Exception { 262 byte[] tmpRowKey = Bytes.toBytes("HBaseProxyDummyRow"); 263 byte[] CF = Bytes.toBytes(CFName); 264 byte[] COL = Bytes.toBytes("DummyColumn"); 265 HBaseAdmin admin = new HBaseAdmin(getConf()); 266 HTable htable = new HTable(CONF, tableName); 267 268 // Write a dummy row 269 Put put = new Put(tmpRowKey); 270 put.add(CF, COL, 1, new byte[] { 0 }); 271 htable.put(put); 272 // Delete the value again 273 Delete delete = new Delete(tmpRowKey); 274 delete.deleteColumn(CF, COL); 275 htable.delete(delete); 276 277 // Perform major compaction 278 admin.flush(tableName); 279 admin.majorCompact(tableName); 280 281 // Wait for compact to finish 282 waitForCompact(tableName, CF); 283 } 251 284 }
Note: See TracChangeset
for help on using the changeset viewer.