Changeset 4148


Ignore:
Timestamp:
2010-07-16 12:18:47 (3 years ago)
Author:
evert
Message:

Delete of a record now puts a deleted-flag and avoids that Rowlog information for that record is deleted as well.
A cleanup process will be needed for any records that are flaged as deleted and don't have any remaining rowlog tasks to perform.

Location:
projects/lily/trunk
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • projects/lily/trunk/repository/impl/src/main/java/org/lilycms/repository/impl/HBaseRepository.java

    r4146 r4148  
    8484    private static final byte[] CURRENT_VERSION_COLUMN_NAME = Bytes.toBytes("$CurrentVersion"); 
    8585    private static final byte[] LOCK_COLUMN_NAME = Bytes.toBytes("$Lock"); 
     86    private static final byte[] DELETED_COLUMN_NAME = Bytes.toBytes("$Deleted"); 
    8687    private static final byte[] NON_VERSIONED_RECORDTYPEID_COLUMN_NAME = Bytes.toBytes("$NonVersionableRecordTypeId"); 
    8788    private static final byte[] NON_VERSIONED_RECORDTYPEVERSION_COLUMN_NAME = Bytes 
     
    212213            checkCreatePreconditions(record); 
    213214 
    214             // Creating an existing record is not allowed. Use update instead. 
    215             if (recordTable.exists(new Get(rowId, rowLock))) { 
    216                 throw new RecordExistsException(newRecord); 
    217             } 
    218  
     215            // Check if a previous incarnation of the record existed and clear data if needed 
     216            reincarnateRecord(newRecord, rowId, rowLock); 
     217             
    219218            Record dummyOriginalRecord = newRecord(); 
    220219            Put put = new Put(newRecord.getId().toBytes(), rowLock); 
     220            put.add(systemColumnFamilies.get(Scope.NON_VERSIONED), DELETED_COLUMN_NAME, Bytes.toBytes(false)); 
    221221            RecordEvent recordEvent = new RecordEvent(); 
    222222            recordEvent.setType(Type.CREATE); 
     
    272272    } 
    273273 
     274    private void reincarnateRecord(Record newRecord, byte[] rowId, RowLock rowLock) throws IOException, 
     275            RecordExistsException { 
     276        Get get = new Get(rowId, rowLock); 
     277        if (recordTable.exists(get)) { 
     278            get.addColumn(systemColumnFamilies.get(Scope.NON_VERSIONED), DELETED_COLUMN_NAME); 
     279            get.addColumn(systemColumnFamilies.get(Scope.NON_VERSIONED), CURRENT_VERSION_COLUMN_NAME); 
     280            Result result = recordTable.get(get); 
     281            if (result == null || result.isEmpty()) { 
     282                throw new RecordExistsException(newRecord); 
     283            } else { 
     284                byte[] deleted = result.getValue(systemColumnFamilies.get(Scope.NON_VERSIONED), DELETED_COLUMN_NAME); 
     285                if ((deleted != null) && (Bytes.toBoolean(deleted))) { 
     286                    if (null != result.getValue(systemColumnFamilies.get(Scope.NON_VERSIONED), CURRENT_VERSION_COLUMN_NAME)) { 
     287                        clearData(rowId, rowLock); 
     288                    } 
     289                } else { 
     290                    throw new RecordExistsException(newRecord); 
     291                } 
     292            } 
     293        } 
     294    } 
     295 
     296    private void clearData(byte[] rowId, RowLock rowLock) throws IOException { 
     297        Delete delete = new Delete(rowId, -1, rowLock); 
     298        delete.deleteFamily(columnFamilies.get(Scope.NON_VERSIONED)); 
     299        delete.deleteFamily(columnFamilies.get(Scope.VERSIONED)); 
     300        delete.deleteFamily(columnFamilies.get(Scope.VERSIONED_MUTABLE)); 
     301        delete.deleteColumns(systemColumnFamilies.get(Scope.NON_VERSIONED), CURRENT_VERSION_COLUMN_NAME); 
     302        delete.deleteColumns(systemColumnFamilies.get(Scope.NON_VERSIONED), NON_VERSIONED_RECORDTYPEID_COLUMN_NAME); 
     303        delete.deleteColumns(systemColumnFamilies.get(Scope.NON_VERSIONED), NON_VERSIONED_RECORDTYPEVERSION_COLUMN_NAME); 
     304        delete.deleteColumns(systemColumnFamilies.get(Scope.VERSIONED), VERSIONED_RECORDTYPEID_COLUMN_NAME); 
     305        delete.deleteColumns(systemColumnFamilies.get(Scope.VERSIONED), VERSIONED_RECORDTYPEVERSION_COLUMN_NAME); 
     306        delete.deleteColumns(systemColumnFamilies.get(Scope.VERSIONED), VERSIONED_MUTABLE_RECORDTYPEID_COLUMN_NAME); 
     307        delete.deleteColumns(systemColumnFamilies.get(Scope.VERSIONED), VERSIONED_MUTABLE_RECORDTYPEVERSION_COLUMN_NAME); 
     308        recordTable.delete(delete); 
     309    } 
     310 
    274311    private void checkCreatePreconditions(Record record) throws InvalidRecordException { 
    275312        ArgumentValidator.notNull(record, "record"); 
     
    325362 
    326363        } catch (IOException e) { 
    327             throw new RecordException("Exception occurred while updating record <" + recordId + "> in HBase table", 
     364            throw new RecordException("Exception occurred while updating record <" + recordId + "> on HBase table", 
    328365                    e); 
    329366        } finally { 
     
    344381        if (record.getRecordTypeId() == null) { 
    345382            throw new InvalidRecordException(record, "The recordType cannot be null for a record to be updated."); 
    346         } 
    347         Get get = new Get(record.getId().toBytes()); 
    348         try { 
    349             if (!recordTable.exists(get)) { 
    350                 throw new RecordNotFoundException(record); 
    351             } 
    352         } catch (IOException e) { 
    353             throw new RecordException("Exception occurred while retrieving original record <" + record.getId() 
    354                     + "> from HBase table", e); 
    355383        } 
    356384    } 
     
    703731            RecordException, VersionNotFoundException, TypeException { 
    704732        ArgumentValidator.notNull(recordId, "recordId"); 
    705         Record record = newRecord(); 
    706         record.setId(recordId); 
    707  
    708         Get get = new Get(recordId.toBytes()); 
    709         if (requestedVersion != null) { 
    710             get.setMaxVersions(); 
    711         } 
    712         // Add the columns for the fields to get 
    713         addFieldsToGet(get, fields); 
    714         Result result; 
    715         try { 
    716             if (!recordTable.exists(new Get(recordId.toBytes()))) { 
    717                 throw new RecordNotFoundException(record); 
    718             } 
    719             // Retrieve the data from the repository 
    720             result = recordTable.get(get); 
    721         } catch (IOException e) { 
    722             throw new RecordException("Exception occurred while retrieving record <" + recordId 
    723                     + "> from HBase table", e); 
    724         } 
     733 
     734        Result result = getRow(recordId, requestedVersion, fields); 
     735 
     736        Record record = newRecord(recordId); 
    725737 
    726738        // Set retrieved version on the record 
     739        extractVersion(requestedVersion, result, record); 
     740 
     741        // Extract the actual fields from the retrieved data 
     742        if (extractFields(result, record.getVersion(), record, readContext)) { 
     743            // Set the recordType explicitly in case only versioned fields were 
     744            // extracted 
     745            Pair<String, Long> recordTypePair = extractRecordType(Scope.NON_VERSIONED, result, null, record); 
     746            record.setRecordType(recordTypePair.getV1(), recordTypePair.getV2()); 
     747        } 
     748        return record; 
     749    } 
     750 
     751    private void extractVersion(Long requestedVersion, Result result, Record record) throws VersionNotFoundException { 
    727752        byte[] latestVersionBytes = result.getValue(systemColumnFamilies.get(Scope.NON_VERSIONED), 
    728753                CURRENT_VERSION_COLUMN_NAME); 
     
    736761            record.setVersion(latestVersion); 
    737762        } 
    738  
    739         // Extract the actual fields from the retrieved data 
    740         if (extractFields(result, record.getVersion(), record, readContext)) { 
    741             // Set the recordType explicitly in case only versioned fields were 
    742             // extracted 
    743             Pair<String, Long> recordTypePair = extractRecordType(Scope.NON_VERSIONED, result, null, record); 
    744             record.setRecordType(recordTypePair.getV1(), recordTypePair.getV2()); 
    745         } 
    746         return record; 
     763    } 
     764 
     765    // Retrieves the row from the table and check if it exists and has not been flagged as deleted 
     766    private Result getRow(RecordId recordId, Long requestedVersion, List<FieldType> fields) 
     767            throws RecordNotFoundException, RecordException { 
     768        Result result; 
     769        Get get = new Get(recordId.toBytes()); 
     770        if (requestedVersion != null) { 
     771            get.setMaxVersions(); 
     772        } 
     773        // Add the columns for the fields to get 
     774        addFieldsToGet(get, fields); 
     775        try { 
     776            if (!recordTable.exists(new Get(recordId.toBytes()))) { 
     777                throw new RecordNotFoundException(newRecord(recordId)); 
     778            } 
     779             
     780            // Retrieve the data from the repository 
     781            result = recordTable.get(get); 
     782             
     783            // Check if the record was deleted 
     784            byte[] deleted = result.getValue(systemColumnFamilies.get(Scope.NON_VERSIONED), DELETED_COLUMN_NAME); 
     785            if ((deleted != null) && (Bytes.toBoolean(deleted))) { 
     786                throw new RecordNotFoundException(newRecord(recordId)); 
     787            } 
     788        } catch (IOException e) { 
     789            throw new RecordException("Exception occurred while retrieving record <" + recordId 
     790                    + "> from HBase table", e); 
     791        } 
     792        return result; 
     793    } 
     794     
     795    private boolean recordExists(byte[] rowId, RowLock rowLock) throws IOException { 
     796        Get get = new Get(rowId, rowLock); 
     797        if (!recordTable.exists(get)) return false; 
     798         
     799        get.addColumn(systemColumnFamilies.get(Scope.NON_VERSIONED), DELETED_COLUMN_NAME); 
     800        Result result = recordTable.get(get); 
     801        if (result == null || result.isEmpty()) { 
     802            return true; 
     803        } else { 
     804            byte[] deleted = result.getValue(systemColumnFamilies.get(Scope.NON_VERSIONED), DELETED_COLUMN_NAME); 
     805            if ((deleted != null) && (Bytes.toBoolean(deleted))) { 
     806                return false; 
     807            } else { 
     808                return true; 
     809            } 
     810        } 
    747811    } 
    748812 
     
    829893    } 
    830894 
    831     private void addFieldsToGet(Get get, List<FieldType> fields) throws RecordNotFoundException, 
    832             FieldTypeNotFoundException, RecordTypeNotFoundException, RecordException { 
     895    private void addFieldsToGet(Get get, List<FieldType> fields) { 
    833896        boolean added = false; 
    834897        if (fields != null) { 
     
    846909 
    847910    private void addSystemColumnsToGet(Get get) { 
     911        get.addColumn(systemColumnFamilies.get(Scope.NON_VERSIONED), DELETED_COLUMN_NAME); 
    848912        get.addColumn(systemColumnFamilies.get(Scope.NON_VERSIONED), CURRENT_VERSION_COLUMN_NAME); 
    849913        get.addColumn(systemColumnFamilies.get(Scope.NON_VERSIONED), NON_VERSIONED_RECORDTYPEID_COLUMN_NAME); 
     
    889953    public void delete(RecordId recordId) throws RecordException { 
    890954        ArgumentValidator.notNull(recordId, "recordId"); 
    891         Delete delete = new Delete(recordId.toBytes()); 
     955         
     956        org.lilycms.repository.impl.lock.RowLock rowLock = null; 
     957        byte[] rowId = recordId.toBytes(); 
    892958        try { 
    893             recordTable.delete(delete); 
     959            // Take Custom Lock 
     960            rowLock = rowLocker.lockRow(rowId); 
     961            if (rowLock == null) 
     962                throw new RecordException("Failed to lock row while updating record <" + recordId 
     963                        + "> in HBase table", null); 
     964 
     965            if (recordExists(rowId, null)) { // Check if the record exists in the first place  
     966                Put put = new Put(rowId); 
     967                put.add(systemColumnFamilies.get(Scope.NON_VERSIONED), DELETED_COLUMN_NAME, Bytes.toBytes(true)); 
     968                RecordEvent recordEvent = new RecordEvent(); 
     969                recordEvent.setType(Type.DELETE); 
     970                 
     971                RowLogMessage walMessage = wal.putMessage(recordId.toBytes(), null, recordEvent.toJsonBytes(), put); 
     972                if (!rowLocker.put(put, rowLock)) { 
     973                    throw new RecordException("Exception occurred while deleting record <" + recordId + "> on HBase table", null); 
     974                } 
     975                clearData(rowId, null); 
     976     
     977                if (walMessage != null) { 
     978                    try { 
     979                        wal.processMessage(walMessage); 
     980                    } catch (RowLogException e) { 
     981                        // Processing the message failed, it will be retried later. 
     982                    } 
     983                } 
     984            } 
     985        } catch (RowLogException e) { 
     986            throw new RecordException("Exception occurred while deleting record <" + recordId 
     987                    + "> on HBase table", e); 
     988 
    894989        } catch (IOException e) { 
    895             throw new RecordException( 
    896                     "Exception occurred while deleting record <" + recordId + "> from HBase table", e); 
    897         } 
    898  
    899         // TODO bruno: I put this in here as a temp fix so that the Indexer would work 
    900         RecordEvent recordEvent = new RecordEvent(); 
    901         recordEvent.setType(Type.DELETE); 
    902         try { 
    903             RowLogMessage walMessage = wal.putMessage(recordId.toBytes(), null, recordEvent.toJsonBytes(), null); 
    904             wal.processMessage(walMessage); 
    905             recordTable.delete(delete); 
    906         } catch (RowLogException e) { 
    907             throw new RuntimeException(e); 
    908         } catch (IOException e) { 
    909             throw new RuntimeException("Temporarily introduced second delete of record row failed.", e); 
     990            throw new RecordException("Exception occurred while deleting record <" + recordId + "> on HBase table", 
     991                    e); 
     992        } finally { 
     993            if (rowLock != null) { 
     994                try { 
     995                    rowLocker.unlockRow(rowLock); 
     996                } catch (IOException e) { 
     997                    // Ignore for now 
     998                } 
     999            } 
    9101000        } 
    9111001    } 
  • projects/lily/trunk/repository/impl/src/test/java/org/lilycms/repository/impl/test/AbstractRepositoryTest.java

    r4111 r4148  
    22 
    33import static org.easymock.EasyMock.createControl; 
    4 import static org.junit.Assert.assertEquals; 
    5 import static org.junit.Assert.assertFalse; 
    6 import static org.junit.Assert.assertNull; 
    7 import static org.junit.Assert.assertTrue; 
    8 import static org.junit.Assert.fail; 
    9  
     4import static org.junit.Assert.*; 
    105import java.util.Arrays; 
    116import java.util.HashMap; 
     
    1914import org.lilycms.repository.api.*; 
    2015import org.lilycms.repository.impl.IdGeneratorImpl; 
     16import org.lilycms.testfw.HBaseProxy; 
    2117 
    2218public abstract class AbstractRepositoryTest { 
    2319 
     20    protected static final HBaseProxy HBASE_PROXY = new HBaseProxy(); 
    2421    protected static IdGenerator idGenerator = new IdGeneratorImpl(); 
    2522    protected static TypeManager typeManager; 
     
    636633        } catch (RecordNotFoundException expected) { 
    637634        } 
     635        try { 
     636            repository.update(record); 
     637            fail(); 
     638        } catch (RecordNotFoundException expected) { 
     639        } 
     640        repository.delete(record.getId()); // Deleting a record twice does not throw an exception 
     641    } 
     642     
     643    @Test 
     644    public void TestDeleteRecordCleansUpDataBeforeRecreate() throws Exception { 
     645        Record record = createDefaultRecord(); 
     646        RecordId recordId = record.getId(); 
     647        repository.delete(recordId); 
     648         
     649     // Work around HBASE-2256 
     650        HBASE_PROXY.majorCompact("recordTable", "VCF"); 
     651 
     652        record = repository.newRecord(recordId); 
     653        record.setRecordType(recordType2.getId(), recordType2.getVersion()); 
     654        record.setField(fieldType4.getName(), 555); 
     655        record.setField(fieldType5.getName(), false); 
     656        record.setField(fieldType6.getName(), "zzz"); 
     657        repository.create(record); 
     658        Record readRecord = repository.read(recordId); 
     659        assertEquals(Long.valueOf(1), readRecord.getVersion()); 
     660        try { 
     661            readRecord.getField(fieldType1.getName()); 
     662            fail(); 
     663        } catch (FieldNotFoundException expected) { 
     664        } 
     665        try { 
     666            readRecord.getField(fieldType2.getName()); 
     667            fail(); 
     668        } catch (FieldNotFoundException expected) { 
     669        } 
     670        try { 
     671            readRecord.getField(fieldType3.getName()); 
     672            fail(); 
     673        } catch (FieldNotFoundException expected) { 
     674        } 
     675         
     676         
     677         
     678         
     679         
     680        assertEquals(555, readRecord.getField(fieldType4.getName())); 
     681        assertFalse((Boolean)readRecord.getField(fieldType5.getName())); 
     682        assertEquals("zzz", readRecord.getField(fieldType6.getName())); 
    638683    } 
    639684 
  • projects/lily/trunk/repository/impl/src/test/java/org/lilycms/repository/impl/test/AvroRepositoryTest.java

    r4139 r4148  
    3636import org.lilycms.repository.impl.SizeBasedBlobStoreAccessFactory; 
    3737import org.lilycms.repository.impl.TypeManagerRemoteImpl; 
    38 import org.lilycms.testfw.HBaseProxy; 
    3938import org.lilycms.testfw.TestHelper; 
    4039 
    4140public class AvroRepositoryTest extends AbstractRepositoryTest { 
    4241    private static HBaseRepository serverRepository; 
    43  
    44     private final static HBaseProxy HBASE_PROXY = new HBaseProxy(); 
    4542 
    4643    @BeforeClass 
  • projects/lily/trunk/repository/impl/src/test/java/org/lilycms/repository/impl/test/HBaseRepositoryTest.java

    r4100 r4148  
    2828import org.lilycms.repository.impl.HBaseTypeManager; 
    2929import org.lilycms.repository.impl.SizeBasedBlobStoreAccessFactory; 
    30 import org.lilycms.testfw.HBaseProxy; 
    3130import org.lilycms.testfw.TestHelper; 
    3231 
    3332public class HBaseRepositoryTest extends AbstractRepositoryTest { 
    3433 
    35     private final static HBaseProxy HBASE_PROXY = new HBaseProxy(); 
    3634    @BeforeClass 
    3735    public static void setUpBeforeClass() throws Exception { 
  • projects/lily/trunk/testfw/src/main/java/org/lilycms/testfw/HBaseProxy.java

    r4137 r4148  
    224224            HTable htable = new HTable(CONF, tableName); 
    225225 
    226             byte[] tmpRowKey = Bytes.toBytes("HBaseProxyDummyRow"); 
    227226            byte[] CF = EXPLOIT_TIMESTAMP_TABLES.get(tableName); 
    228             byte[] COL = Bytes.toBytes("DummyColumn"); 
    229  
    230             byte[] value = null; 
    231             while (value == null) { 
    232                 Put put = new Put(tmpRowKey); 
    233                 put.add(CF, COL, 1, new byte[] { 0 }); 
    234                 htable.put(put); 
    235  
    236                 Get get = new Get(tmpRowKey); 
    237                 Result result = htable.get(get); 
    238                 value = result.getValue(CF, COL); 
    239                 if (value == null) { 
    240                     // If the value is null, it is because the delete marker has not yet been flushed/compacted away 
    241                     System.out.println("Waiting for flush/compact of " + tableName + " to complete"); 
    242                 } 
    243                 Thread.sleep(100); 
    244             } 
     227            byte[] tmpRowKey = waitForCompact(tableName, CF); 
    245228 
    246229            // Delete our dummy row again 
     
    249232    } 
    250233 
     234    private byte[] waitForCompact(String tableName, byte[] CF) throws IOException, InterruptedException { 
     235        byte[] tmpRowKey = Bytes.toBytes("HBaseProxyDummyRow"); 
     236        byte[] COL = Bytes.toBytes("DummyColumn"); 
     237        HTable htable = new HTable(CONF, tableName); 
     238 
     239        byte[] value = null; 
     240        while (value == null) { 
     241            Put put = new Put(tmpRowKey); 
     242            put.add(CF, COL, 1, new byte[] { 0 }); 
     243            htable.put(put); 
     244 
     245            Get get = new Get(tmpRowKey); 
     246            Result result = htable.get(get); 
     247            value = result.getValue(CF, COL); 
     248            if (value == null) { 
     249                // If the value is null, it is because the delete marker has not yet been flushed/compacted away 
     250                System.out.println("Waiting for flush/compact of " + tableName + " to complete"); 
     251                Thread.sleep(100); 
     252            } 
     253        } 
     254        return tmpRowKey; 
     255    } 
     256 
     257    /** Force a major compaction and wait for it to finish. 
     258     *  This method can be used in a test to avoid issue HBASE-2256 after performing a delete operation  
     259     *  Uses same principle as {@link #cleanTables()} 
     260     */  
     261    public void majorCompact(String tableName, String CFName) throws Exception { 
     262        byte[] tmpRowKey = Bytes.toBytes("HBaseProxyDummyRow"); 
     263        byte[] CF = Bytes.toBytes(CFName); 
     264        byte[] COL = Bytes.toBytes("DummyColumn"); 
     265        HBaseAdmin admin = new HBaseAdmin(getConf()); 
     266        HTable htable = new HTable(CONF, tableName); 
     267         
     268        // Write a dummy row 
     269        Put put = new Put(tmpRowKey); 
     270        put.add(CF, COL, 1, new byte[] { 0 }); 
     271        htable.put(put); 
     272        // Delete the value again 
     273        Delete delete = new Delete(tmpRowKey); 
     274        delete.deleteColumn(CF, COL); 
     275        htable.delete(delete); 
     276         
     277        // Perform major compaction 
     278        admin.flush(tableName); 
     279        admin.majorCompact(tableName); 
     280         
     281        // Wait for compact to finish 
     282        waitForCompact(tableName, CF); 
     283    } 
    251284} 
Note: See TracChangeset for help on using the changeset viewer.