Changeset 4621


Ignore:
Timestamp:
2011-02-03 08:35:21 (2 years ago)
Author:
evert
Message:

Rework of the blobstore functionality where blobs can only be accessed through the record they are used in and not directly by using their blob key.
The general idea is that, to enable introducing record-level access control in the future, blobs should only be accessed through the record they are used in (via the repository API) and not directly using their blob key.
Only in the very initial phase, where blobs are uploaded to the blobstore, can they exist without being part of a record. Before a blob can be used in a record it must have been uploaded to the blobstore. During a certain amount of time (e.g. 1 hour) the uploaded blob can then be used in a record. If after that time the blob was not used in a record it will become unavailable and will be removed from the blobstore.
Blobs can be re-used, but only within different versions (also non-sequential ones) of the same field of the same record. Blobs cannot belong to multiple records or multiple fields at the same time.

See also #41, #95

Location:
trunk
Files:
5 added
37 edited

Legend:

Unmodified
Added
Removed
  • trunk/cr/indexer/batchbuild/src/main/java/org/lilyproject/indexer/batchbuild/IndexingMapper.java

    r4611 r4621  
    8484            TypeManager typeManager = new HBaseTypeManager(idGenerator, conf, zk, hbaseTableFactory); 
    8585 
    86             BlobStoreAccessFactory blobStoreAccessFactory = LilyClient.getBlobStoreAccess(zk); 
    87  
    8886            RowLog wal = new DummyRowLog("The write ahead log should not be called from within MapReduce jobs."); 
    89             repository = new HBaseRepository(typeManager, idGenerator, blobStoreAccessFactory, wal, conf, hbaseTableFactory); 
     87             
     88            BlobManager blobManager = LilyClient.getBlobManager(zk); 
     89            repository = new HBaseRepository(typeManager, idGenerator, wal, conf, hbaseTableFactory, blobManager); 
    9090 
    9191            byte[] indexerConfBytes = Base64.decode(jobConf.get("org.lilyproject.indexer.batchbuild.indexerconf")); 
  • trunk/cr/indexer/engine/src/main/java/org/lilyproject/indexer/engine/ValueEvaluator.java

    r4592 r4621  
    9191            InputStream is = null; 
    9292            try { 
    93                 is = repository.getInputStream(blob); 
     93//                is = repository.getInputStream(blob); 
    9494 
    9595                // TODO make write limit configurable 
  • trunk/cr/indexer/engine/src/test/java/org/lilyproject/indexer/engine/test/IndexerTest.java

    r4611 r4621  
    1717 
    1818import static org.junit.Assert.assertEquals; 
    19  
    20 import org.apache.hadoop.fs.Path; 
    21 import org.lilyproject.indexer.engine.*; 
    22 import org.lilyproject.indexer.model.indexerconf.IndexerConfBuilder; 
    23 import org.lilyproject.linkindex.LinkIndexUpdater; 
    24 import org.lilyproject.rowlog.api.*; 
    25 import org.lilyproject.rowlog.impl.*; 
    26 import org.lilyproject.solrtestfw.SolrTestingUtility; 
    27 import org.lilyproject.util.hbase.HBaseTableFactory; 
    28 import org.lilyproject.util.hbase.HBaseTableFactoryImpl; 
    29 import org.lilyproject.util.hbase.LilyHBaseSchema; 
    30 import org.lilyproject.util.io.Closer; 
    31 import org.lilyproject.util.repo.RecordEvent; 
     19import static org.lilyproject.util.repo.RecordEvent.Type.CREATE; 
     20import static org.lilyproject.util.repo.RecordEvent.Type.DELETE; 
     21import static org.lilyproject.util.repo.RecordEvent.Type.UPDATE; 
     22 
     23import java.io.ByteArrayOutputStream; 
     24import java.io.IOException; 
     25import java.io.InputStream; 
     26import java.io.OutputStream; 
     27import java.util.Arrays; 
     28import java.util.Collections; 
     29import java.util.HashMap; 
     30import java.util.List; 
     31import java.util.Map; 
    3232 
    3333import org.apache.commons.logging.Log; 
    3434import org.apache.commons.logging.LogFactory; 
     35import org.apache.hadoop.fs.Path; 
    3536import org.apache.solr.client.solrj.SolrQuery; 
    3637import org.apache.solr.client.solrj.SolrServerException; 
     
    4546import org.junit.Test; 
    4647import org.lilyproject.hbaseindex.IndexManager; 
     48import org.lilyproject.indexer.engine.IndexLocker; 
     49import org.lilyproject.indexer.engine.IndexUpdater; 
     50import org.lilyproject.indexer.engine.IndexUpdaterMetrics; 
     51import org.lilyproject.indexer.engine.Indexer; 
     52import org.lilyproject.indexer.engine.IndexerMetrics; 
     53import org.lilyproject.indexer.engine.SolrServers; 
    4754import org.lilyproject.indexer.model.indexerconf.IndexerConf; 
     55import org.lilyproject.indexer.model.indexerconf.IndexerConfBuilder; 
    4856import org.lilyproject.linkindex.LinkIndex; 
    49 import org.lilyproject.repository.api.*; 
    50 import org.lilyproject.repository.impl.*; 
    51 import org.lilyproject.util.repo.VersionTag; 
     57import org.lilyproject.linkindex.LinkIndexUpdater; 
     58import org.lilyproject.repository.api.Blob; 
     59import org.lilyproject.repository.api.BlobManager; 
     60import org.lilyproject.repository.api.BlobStoreAccess; 
     61import org.lilyproject.repository.api.FieldType; 
     62import org.lilyproject.repository.api.HierarchyPath; 
     63import org.lilyproject.repository.api.IdGenerator; 
     64import org.lilyproject.repository.api.Link; 
     65import org.lilyproject.repository.api.QName; 
     66import org.lilyproject.repository.api.Record; 
     67import org.lilyproject.repository.api.RecordId; 
     68import org.lilyproject.repository.api.RecordType; 
     69import org.lilyproject.repository.api.Scope; 
     70import org.lilyproject.repository.api.TypeManager; 
     71import org.lilyproject.repository.api.ValueType; 
     72import org.lilyproject.repository.impl.BlobManagerImpl; 
     73import org.lilyproject.repository.impl.DFSBlobStoreAccess; 
     74import org.lilyproject.repository.impl.HBaseRepository; 
     75import org.lilyproject.repository.impl.HBaseTypeManager; 
     76import org.lilyproject.repository.impl.IdGeneratorImpl; 
     77import org.lilyproject.repository.impl.SizeBasedBlobStoreAccessFactory; 
     78import org.lilyproject.rowlog.api.RowLog; 
     79import org.lilyproject.rowlog.api.RowLogConfig; 
     80import org.lilyproject.rowlog.api.RowLogConfigurationManager; 
     81import org.lilyproject.rowlog.api.RowLogException; 
     82import org.lilyproject.rowlog.api.RowLogMessage; 
     83import org.lilyproject.rowlog.api.RowLogMessageListener; 
     84import org.lilyproject.rowlog.api.RowLogMessageListenerMapping; 
     85import org.lilyproject.rowlog.api.RowLogShard; 
     86import org.lilyproject.rowlog.api.RowLogSubscription; 
     87import org.lilyproject.rowlog.impl.RowLogConfigurationManagerImpl; 
     88import org.lilyproject.rowlog.impl.RowLogImpl; 
     89import org.lilyproject.rowlog.impl.RowLogShardImpl; 
     90import org.lilyproject.solrtestfw.SolrTestingUtility; 
    5291import org.lilyproject.testfw.HBaseProxy; 
    5392import org.lilyproject.testfw.TestHelper; 
     93import org.lilyproject.util.hbase.HBaseTableFactory; 
     94import org.lilyproject.util.hbase.HBaseTableFactoryImpl; 
     95import org.lilyproject.util.hbase.LilyHBaseSchema; 
     96import org.lilyproject.util.hbase.LilyHBaseSchema.RecordCf; 
     97import org.lilyproject.util.io.Closer; 
     98import org.lilyproject.util.repo.RecordEvent; 
     99import org.lilyproject.util.repo.VersionTag; 
    54100import org.lilyproject.util.zookeeper.ZkUtil; 
    55101import org.lilyproject.util.zookeeper.ZooKeeperItf; 
    56  
    57 import static org.lilyproject.util.repo.RecordEvent.Type.*; 
    58 import static org.lilyproject.util.hbase.LilyHBaseSchema.*; 
    59  
    60 import java.io.ByteArrayOutputStream; 
    61 import java.io.IOException; 
    62 import java.io.InputStream; 
    63 import java.io.OutputStream; 
    64 import java.util.*; 
    65102 
    66103// To run this test from an IDE, set a property solr.war pointing to the SOLR war 
     
    130167        RowLogShard walShard = new RowLogShardImpl("WS1", HBASE_PROXY.getConf(), wal, 100); 
    131168        wal.registerShard(walShard); 
    132  
    133         repository = new HBaseRepository(typeManager, idGenerator, blobStoreAccessFactory, wal, HBASE_PROXY.getConf(), hbaseTableFactory); 
     169        BlobManager blobManager = new BlobManagerImpl(hbaseTableFactory, blobStoreAccessFactory); 
     170        repository = new HBaseRepository(typeManager, idGenerator, wal, HBASE_PROXY.getConf(), hbaseTableFactory, blobManager); 
    134171 
    135172        IndexManager.createIndexMetaTableIfNotExists(HBASE_PROXY.getConf()); 
  • trunk/cr/linkindex/src/test/java/org/lilyproject/linkindex/test/LinkIndexTest.java

    r4611 r4621  
    3232import org.lilyproject.linkindex.LinkIndex; 
    3333import org.lilyproject.linkindex.LinkIndexUpdater; 
     34import org.lilyproject.repository.api.BlobManager; 
    3435import org.lilyproject.repository.api.BlobStoreAccess; 
    3536import org.lilyproject.repository.api.FieldType; 
     
    4243import org.lilyproject.repository.api.Scope; 
    4344import org.lilyproject.repository.api.TypeManager; 
     45import org.lilyproject.repository.impl.BlobManagerImpl; 
    4446import org.lilyproject.repository.impl.DFSBlobStoreAccess; 
    4547import org.lilyproject.repository.impl.HBaseRepository; 
     
    9496        BlobStoreAccess dfsBlobStoreAccess = new DFSBlobStoreAccess(HBASE_PROXY.getBlobFS(), new Path("/lily/blobs")); 
    9597        SizeBasedBlobStoreAccessFactory blobStoreAccessFactory = new SizeBasedBlobStoreAccessFactory(dfsBlobStoreAccess); 
     98        BlobManager blobManager = new BlobManagerImpl(hbaseTableFactory, blobStoreAccessFactory); 
    9699 
    97100        rowLogConfMgr = new RowLogConfigurationManagerImpl(zk); 
     
    103106        RowLogShard walShard = new RowLogShardImpl("WS1", HBASE_PROXY.getConf(), wal, 100); 
    104107        wal.registerShard(walShard); 
    105  
    106         repository = new HBaseRepository(typeManager, idGenerator, blobStoreAccessFactory, wal, HBASE_PROXY.getConf(), hbaseTableFactory); 
     108        repository = new HBaseRepository(typeManager, idGenerator, wal, HBASE_PROXY.getConf(), hbaseTableFactory, blobManager); 
    107109        ids = repository.getIdGenerator(); 
    108110        IndexManager indexManager = new IndexManager(HBASE_PROXY.getConf()); 
  • trunk/cr/process/client/src/main/java/org/lilyproject/client/LilyClient.java

    r4614 r4621  
    2020import java.net.InetSocketAddress; 
    2121import java.net.URI; 
    22 import java.util.*; 
     22import java.util.ArrayList; 
     23import java.util.Collections; 
     24import java.util.HashSet; 
     25import java.util.Iterator; 
     26import java.util.List; 
     27import java.util.Set; 
    2328 
    2429import org.apache.commons.logging.Log; 
     
    3237import org.apache.zookeeper.Watcher; 
    3338import org.apache.zookeeper.data.Stat; 
     39import org.lilyproject.repository.api.BlobManager; 
    3440import org.lilyproject.repository.api.BlobStoreAccess; 
    35 import org.lilyproject.repository.api.BlobStoreAccessFactory; 
    3641import org.lilyproject.repository.api.Repository; 
    3742import org.lilyproject.repository.avro.AvroConverter; 
     43import org.lilyproject.repository.impl.BlobManagerImpl; 
    3844import org.lilyproject.repository.impl.DFSBlobStoreAccess; 
    3945import org.lilyproject.repository.impl.HBaseBlobStoreAccess; 
     
    4349import org.lilyproject.repository.impl.RemoteTypeManager; 
    4450import org.lilyproject.repository.impl.SizeBasedBlobStoreAccessFactory; 
     51import org.lilyproject.util.hbase.HBaseTableFactory; 
     52import org.lilyproject.util.hbase.HBaseTableFactoryImpl; 
    4553import org.lilyproject.util.io.Closer; 
    4654import org.lilyproject.util.repo.DfsUri; 
     
    156164                remoteConverter, idGenerator, zk); 
    157165         
    158         BlobStoreAccessFactory blobStoreAccessFactory = getBlobStoreAccess(zk); 
     166        BlobManager blobManager = getBlobManager(zk); 
    159167         
    160168        Repository repository = new RemoteRepository(parseAddressAndPort(server.lilyAddressAndPort), 
    161                 remoteConverter, typeManager, idGenerator, blobStoreAccessFactory); 
     169                remoteConverter, typeManager, idGenerator, blobManager); 
    162170         
    163171        remoteConverter.setRepository(repository); 
     
    165173        server.repository = repository; 
    166174    } 
    167  
    168     public static BlobStoreAccessFactory getBlobStoreAccess(ZooKeeperItf zk) throws IOException { 
     175     
     176    public static BlobManager getBlobManager(ZooKeeperItf zk) throws IOException { 
    169177        Configuration configuration = HBaseConfiguration.create(); 
    170178        configuration.set("hbase.zookeeper.quorum", getBlobHBaseZkQuorum(zk)); 
    171179        configuration.set("hbase.zookeeper.property.clientPort", getBlobHBaseZkPort(zk)); 
    172  
     180        HBaseTableFactory hbaseTableFactory = new HBaseTableFactoryImpl(configuration); 
     181         
    173182        URI dfsUri = getDfsUri(zk); 
    174183        FileSystem fs = FileSystem.get(DfsUri.getBaseDfsUri(dfsUri), configuration); 
     
    181190        blobStoreAccessFactory.addBlobStoreAccess(5000, inlineBlobStoreAccess); 
    182191        blobStoreAccessFactory.addBlobStoreAccess(200000, hbaseBlobStoreAccess); 
    183         return blobStoreAccessFactory; 
     192         
     193        return new BlobManagerImpl(hbaseTableFactory, blobStoreAccessFactory); 
    184194    } 
    185195     
  • trunk/cr/process/server/repository-module/src/main/java/org/lilyproject/server/modules/repository/BlobStoreConfig.java

    r4611 r4621  
    2222import org.apache.hadoop.fs.FileSystem; 
    2323import org.apache.hadoop.fs.Path; 
     24import org.lilyproject.repository.api.BlobManager; 
    2425import org.lilyproject.repository.api.BlobStoreAccess; 
    25 import org.lilyproject.repository.api.BlobStoreAccessFactory; 
     26import org.lilyproject.repository.impl.BlobManagerImpl; 
    2627import org.lilyproject.repository.impl.DFSBlobStoreAccess; 
    2728import org.lilyproject.repository.impl.HBaseBlobStoreAccess; 
     
    3233 
    3334public class BlobStoreConfig { 
    34     static BlobStoreAccessFactory get(URI dfsUri, Configuration configuration, HBaseTableFactory tableFactory) 
    35             throws IOException { 
    36  
     35    static BlobManager get(URI dfsUri, Configuration configuration, HBaseTableFactory tableFactory) throws IOException { 
    3736        FileSystem fs = FileSystem.get(DfsUri.getBaseDfsUri(dfsUri), configuration); 
    3837        Path blobRootPath = new Path(DfsUri.getDfsPath(dfsUri)); 
     
    4140        BlobStoreAccess hbaseBlobStoreAccess = new HBaseBlobStoreAccess(tableFactory); 
    4241        BlobStoreAccess inlineBlobStoreAccess = new InlineBlobStoreAccess();  
    43         SizeBasedBlobStoreAccessFactory factory = new SizeBasedBlobStoreAccessFactory(dfsBlobStoreAccess); 
    44         factory.addBlobStoreAccess(5000, inlineBlobStoreAccess); 
    45         factory.addBlobStoreAccess(200000, hbaseBlobStoreAccess); 
    46         return factory; 
     42        SizeBasedBlobStoreAccessFactory blobStoreAccessFactory = new SizeBasedBlobStoreAccessFactory(dfsBlobStoreAccess); 
     43        blobStoreAccessFactory.addBlobStoreAccess(5000, inlineBlobStoreAccess); 
     44        blobStoreAccessFactory.addBlobStoreAccess(200000, hbaseBlobStoreAccess); 
     45        return new BlobManagerImpl(tableFactory, blobStoreAccessFactory); 
    4746    } 
    4847} 
  • trunk/cr/process/server/repository-module/src/main/kauri/spring/services.xml

    r4611 r4621  
    7474 
    7575  <bean 
    76     id="blobStoreAccessFactory" 
     76    id="blobManager" 
    7777    class="org.lilyproject.server.modules.repository.BlobStoreConfig" 
    7878    factory-method="get"> 
     
    8989    <constructor-arg ref="typeManager"/> 
    9090    <constructor-arg ref="idGenerator"/> 
    91     <constructor-arg ref="blobStoreAccessFactory"/> 
    9291    <constructor-arg ref="writeAheadLog"/> 
    9392    <constructor-arg ref="hbaseConf"/> 
    9493    <constructor-arg ref="hbaseTableFactory"/> 
     94    <constructor-arg ref="blobManager"/> 
    9595  </bean> 
    9696 
  • trunk/cr/process/server/rest-module/src/main/java/org/lilyproject/rest/BlobByVersionAndFieldResource.java

    r4592 r4621  
    4444    protected static Response getBlob(String id, String version, String fieldName, UriInfo uriInfo, 
    4545            final Repository repository) { 
    46         RecordId recordId = repository.getIdGenerator().fromString(id); 
     46        final RecordId recordId = repository.getIdGenerator().fromString(id); 
    4747 
    48         QName fieldQName = ResourceClassUtil.parseQName(fieldName, uriInfo.getQueryParameters()); 
     48        final QName fieldQName = ResourceClassUtil.parseQName(fieldName, uriInfo.getQueryParameters()); 
    4949 
    50         Long versionNr = null; 
     50        Long vNr = null; 
    5151        if (version != null) { 
    52             versionNr = Long.parseLong(version); 
     52            vNr = Long.parseLong(version); 
    5353        } 
     54        final Long versionNr = vNr; 
    5455 
    5556        Record record; 
     
    7374                    InputStream is = null; 
    7475                    try { 
    75                         is = repository.getInputStream(blob); 
     76                        is = repository.getInputStream(recordId, versionNr, fieldQName, null, null); 
    7677                        IOUtils.copyLarge(is, output); 
    7778                    } catch (BlobNotFoundException e) { 
  • trunk/cr/repository-api-tutorial/src/test/java/org/lilyproject/repository/api/tutorial/TutorialTest.java

    r4611 r4621  
    3333import org.junit.Test; 
    3434import org.lilyproject.repository.api.Blob; 
     35import org.lilyproject.repository.api.BlobManager; 
    3536import org.lilyproject.repository.api.FieldType; 
    3637import org.lilyproject.repository.api.IdGenerator; 
     
    4344import org.lilyproject.repository.api.TypeManager; 
    4445import org.lilyproject.repository.api.ValueType; 
     46import org.lilyproject.repository.impl.BlobManagerImpl; 
    4547import org.lilyproject.repository.impl.DFSBlobStoreAccess; 
    4648import org.lilyproject.repository.impl.HBaseRepository; 
     
    8587    private static HBaseTableFactory hbaseTableFactory; 
    8688 
     89 
    8790    @BeforeClass 
    8891    public static void setUpBeforeClass() throws Exception { 
     
    101104        SizeBasedBlobStoreAccessFactory blobStoreAccessFactory = new SizeBasedBlobStoreAccessFactory(dfsBlobStoreAccess); 
    102105        blobStoreAccessFactory.addBlobStoreAccess(Long.MAX_VALUE, dfsBlobStoreAccess); 
     106        BlobManager blobManager = new BlobManagerImpl(hbaseTableFactory, blobStoreAccessFactory); 
    103107        setupWal(); 
    104         repository = new HBaseRepository(typeManager, idGenerator, blobStoreAccessFactory, wal, configuration, hbaseTableFactory); 
     108        repository = new HBaseRepository(typeManager, idGenerator, wal, configuration, hbaseTableFactory, blobManager); 
    105109 
    106110    } 
     
    294298        InputStream is = null; 
    295299        try { 
    296             is = repository.getInputStream((Blob)record.getField(new QName(NS, "description"))); 
     300            is = repository.getInputStream(record.getId(), new QName(NS, "description")); 
    297301            System.out.println("Data read from blob is:"); 
    298302            Reader reader = new InputStreamReader(is, "UTF-8"); 
  • trunk/cr/repository/api/src/main/java/org/lilyproject/repository/api/BlobNotFoundException.java

    r4592 r4621  
    1717 
    1818public class BlobNotFoundException extends RepositoryException { 
    19     private final Blob blob; 
     19    private Blob blob = null; 
    2020 
    21     public BlobNotFoundException(Blob blob) { 
     21    public BlobNotFoundException(Blob blob, String message, Throwable cause) { 
     22        super(message, cause); 
    2223        this.blob = blob; 
    2324    } 
    24  
     25     
     26    public BlobNotFoundException(Blob blob, String message) { 
     27        super(message); 
     28        this.blob = blob; 
     29    } 
     30     
     31    public BlobNotFoundException(String message, Throwable cause) { 
     32        super(message, cause); 
     33    } 
     34     
     35    public BlobNotFoundException(String message) { 
     36        super(message); 
     37    } 
     38     
    2539    public Blob getBlob() { 
    2640        return blob; 
     
    2943    @Override 
    3044    public String getMessage() { 
    31         return "Blob <" + blob + "> could not be found."; 
     45        if (blob != null) 
     46            return "Blob <" + blob + "> could not be found." + super.getMessage(); 
     47        return super.getMessage(); 
    3248    } 
    3349} 
  • trunk/cr/repository/api/src/main/java/org/lilyproject/repository/api/BlobStoreAccess.java

    r4592 r4621  
    5454     * @param key a unique key identifying the written bytes on the blobstore, see {@link #getOutputStream(Blob)} 
    5555     *  
    56      * @return an InputStream from whih a stream of bytes can be read 
     56     * @return an InputStream from which a stream of bytes can be read 
    5757     * @throws BlobException when an unexpected exception occurred (e.g. an IOException of the underlying blobstore) 
    5858     */ 
     
    6767     */ 
    6868    void delete(byte[] key) throws BlobException; 
     69     
     70    boolean incubate(); 
    6971} 
  • trunk/cr/repository/api/src/main/java/org/lilyproject/repository/api/RecordId.java

    r4592 r4621  
    6565    SortedMap<String, String> getVariantProperties(); 
    6666 
     67    int hashCode(); 
     68 
    6769    boolean equals(Object obj); 
    6870} 
  • trunk/cr/repository/api/src/main/java/org/lilyproject/repository/api/Repository.java

    r4592 r4621  
    241241    OutputStream getOutputStream(Blob blob) throws BlobException, InterruptedException; 
    242242 
    243     /** 
    244      * Returns an {@link InputStream} from which the binary data of a blob can 
    245      * be read. The value of blob is used to identify the underlying blobstore 
    246      * and actual data to return through this InputStream, see {@link #getOutputStream(Blob)}. 
    247      * 
    248      * @param blob the blob for which to open an InputStream 
    249      * @return an InputStream 
    250      * @throws BlobNotFoundException when the blob does not contain a valid key in its value 
    251      * @throws RepositoryException when an unexpected exception occurs 
    252      */ 
    253     InputStream getInputStream(Blob blob) throws BlobNotFoundException, BlobException, InterruptedException; 
    254  
    255     /** 
    256      * Deletes the data identified by a blob from the underlying blobstore. See {@link #getOutputStream(Blob)} and {@link #getInputStream(Blob)}. 
    257      * @param blob the blob to delete 
    258      * @throws BlobNotFoundException when the blob does not contain a valid key in its value 
    259      * @throws RepositoryException when an unexpected exception occurs 
    260      */ 
    261     void delete(Blob blob) throws BlobNotFoundException, BlobException, InterruptedException; 
     243     
     244 
     245    /** 
     246     * Returns an {@link InputStream} from which the binary data of a blob can be read. 
     247     * The blob can only be retrieved by referring to location of the blob in the record  
     248     * and field where it is used.    
     249     * @param recordId the id of the record containing the blob 
     250     * @param fieldName the QName of the field containing the blob 
     251     * @param version optionally a version of the record, if null the latest record version is used 
     252     * @param multivalueIndex the position of the blob in a multivalue field 
     253     * @param hierarchyIndex the position of the blob in a hierarchical field 
     254     * @return a BlobInputStream 
     255     * @throws BlobNotFoundException thrown when no blob can be found at the given location 
     256     * @throws BlobException thrown when opening an InputStream on the blob fails  
     257     */ 
     258    BlobInputStream getInputStream(RecordId recordId, Long version, QName fieldName, Integer multivalueIndex, Integer hierarchyIndex) throws BlobNotFoundException, BlobException, InterruptedException, RecordNotFoundException, RecordTypeNotFoundException, FieldTypeNotFoundException, RecordException, VersionNotFoundException, TypeException; 
     259 
     260    /** 
     261     * Shortcut getInputStream method where version, multivalueIndex and hierarchyIndex are set to null. 
     262     */ 
     263    BlobInputStream getInputStream(RecordId recordId, QName fieldName) throws BlobNotFoundException, BlobException, InterruptedException, RecordNotFoundException, RecordTypeNotFoundException, FieldTypeNotFoundException, RecordException, VersionNotFoundException, TypeException; 
    262264 
    263265    /** 
  • trunk/cr/repository/api/src/main/java/org/lilyproject/repository/api/ValueType.java

    r4592 r4621  
    1515 */ 
    1616package org.lilyproject.repository.api; 
     17 
     18import java.util.Set; 
    1719 
    1820/** 
     
    9395    byte[] toBytes(); 
    9496     
     97    /** 
     98     * Returns a set of all values contained in this value. 
     99     * It flattens out the aspects of multivalue and hierarchy values. 
     100     */ 
     101    Set<Object> getValues(Object value); 
     102     
    95103    boolean equals(Object obj); 
    96104} 
  • trunk/cr/repository/impl/src/main/avro/lily.avpr

    r4620 r4621  
    156156      "type": "error", 
    157157      "fields": [ 
    158         {"name": "blob", "type": "AvroBlob"} 
     158        {"name": "blob", "type": "AvroBlob"}, 
     159        {"name": "message", "type": ["string", "null"]}, 
     160        {"name": "remoteCauses", "type": ["null", {"type": "array", "items": "AvroExceptionCause"}]} 
    159161      ] 
    160162    }, 
     
    358360    }, 
    359361     
    360     "deleteBlob": { 
    361       "request": [{"name": "blob", "type": "AvroBlob"}], 
    362       "response": "null", 
    363       "errors": ["AvroBlobNotFoundException", "AvroBlobException", "AvroGenericException", "AvroInterruptedException"] 
    364     }, 
    365      
    366362    "getVariants": { 
    367363      "request": [{"name": "recordId", "type": "bytes"}], 
  • trunk/cr/repository/impl/src/main/java/org/lilyproject/repository/avro/AvroConverter.java

    r4620 r4621  
    1616package org.lilyproject.repository.avro; 
    1717 
    18 import java.io.IOException; 
    1918import java.nio.ByteBuffer; 
    2019import java.util.ArrayList; 
     
    2928import org.apache.avro.AvroRemoteException; 
    3029import org.apache.avro.util.Utf8; 
    31 import org.lilyproject.repository.api.*; 
     30import org.lilyproject.repository.api.Blob; 
     31import org.lilyproject.repository.api.BlobException; 
     32import org.lilyproject.repository.api.BlobNotFoundException; 
     33import org.lilyproject.repository.api.FieldType; 
     34import org.lilyproject.repository.api.FieldTypeEntry; 
     35import org.lilyproject.repository.api.FieldTypeExistsException; 
     36import org.lilyproject.repository.api.FieldTypeNotFoundException; 
     37import org.lilyproject.repository.api.FieldTypeUpdateException; 
     38import org.lilyproject.repository.api.IdGenerator; 
     39import org.lilyproject.repository.api.IdRecord; 
     40import org.lilyproject.repository.api.InvalidRecordException; 
     41import org.lilyproject.repository.api.QName; 
     42import org.lilyproject.repository.api.Record; 
     43import org.lilyproject.repository.api.RecordException; 
     44import org.lilyproject.repository.api.RecordExistsException; 
     45import org.lilyproject.repository.api.RecordId; 
     46import org.lilyproject.repository.api.RecordLockedException; 
     47import org.lilyproject.repository.api.RecordNotFoundException; 
     48import org.lilyproject.repository.api.RecordType; 
     49import org.lilyproject.repository.api.RecordTypeExistsException; 
     50import org.lilyproject.repository.api.RecordTypeNotFoundException; 
     51import org.lilyproject.repository.api.RemoteException; 
     52import org.lilyproject.repository.api.Repository; 
     53import org.lilyproject.repository.api.RepositoryException; 
     54import org.lilyproject.repository.api.Scope; 
     55import org.lilyproject.repository.api.TypeException; 
     56import org.lilyproject.repository.api.TypeManager; 
     57import org.lilyproject.repository.api.ValueType; 
     58import org.lilyproject.repository.api.VersionNotFoundException; 
    3259import org.lilyproject.repository.impl.IdRecordImpl; 
    3360 
     
    601628 
    602629    public BlobNotFoundException convert(AvroBlobNotFoundException avroException) { 
    603         BlobNotFoundException exception = new BlobNotFoundException(convert(avroException.blob)); 
    604         return exception; 
     630        try { 
     631            BlobNotFoundException exception = new BlobNotFoundException(convert(avroException.blob), convert(avroException.message)); 
     632            restoreCauses(avroException.remoteCauses, exception); 
     633            return exception; 
     634        } catch (Exception e) { 
     635            // TODO this is not good, exceptions should never fail to deserialize 
     636            throw new RuntimeException("Error deserializing exception.", e); 
     637        } 
    605638    } 
    606639 
  • trunk/cr/repository/impl/src/main/java/org/lilyproject/repository/avro/AvroLilyImpl.java

    r4592 r4621  
    2020import java.util.List; 
    2121 
    22 import org.lilyproject.repository.api.*; 
     22import org.lilyproject.repository.api.FieldTypeExistsException; 
     23import org.lilyproject.repository.api.FieldTypeNotFoundException; 
     24import org.lilyproject.repository.api.FieldTypeUpdateException; 
     25import org.lilyproject.repository.api.InvalidRecordException; 
     26import org.lilyproject.repository.api.QName; 
     27import org.lilyproject.repository.api.RecordException; 
     28import org.lilyproject.repository.api.RecordExistsException; 
     29import org.lilyproject.repository.api.RecordLockedException; 
     30import org.lilyproject.repository.api.RecordNotFoundException; 
     31import org.lilyproject.repository.api.RecordTypeExistsException; 
     32import org.lilyproject.repository.api.RecordTypeNotFoundException; 
     33import org.lilyproject.repository.api.Repository; 
     34import org.lilyproject.repository.api.RepositoryException; 
     35import org.lilyproject.repository.api.TypeException; 
     36import org.lilyproject.repository.api.TypeManager; 
     37import org.lilyproject.repository.api.VersionNotFoundException; 
    2338 
    2439public class AvroLilyImpl implements AvroLily { 
     
    364379        } 
    365380    } 
    366  
    367     public Void deleteBlob(AvroBlob avroBlob) throws AvroBlobNotFoundException, AvroBlobException, 
    368             AvroInterruptedException { 
    369         try { 
    370             repository.delete(converter.convert(avroBlob)); 
    371         } catch (BlobNotFoundException e) { 
    372             throw converter.convert(e); 
    373         } catch (BlobException e) { 
    374             throw converter.convert(e); 
    375         } catch (InterruptedException e) { 
    376             throw converter.convert(e); 
    377         } 
    378         return null; 
    379     } 
    380  
    381381} 
  • trunk/cr/repository/impl/src/main/java/org/lilyproject/repository/impl/BlobStoreAccessRegistry.java

    r4592 r4621  
    1818import java.io.FilterOutputStream; 
    1919import java.io.IOException; 
    20 import java.io.InputStream; 
    2120import java.io.OutputStream; 
    2221import java.util.HashMap; 
     
    2423 
    2524import org.apache.hadoop.hbase.util.Bytes; 
    26 import org.lilyproject.repository.api.*; 
     25import org.lilyproject.repository.api.Blob; 
     26import org.lilyproject.repository.api.BlobException; 
     27import org.lilyproject.repository.api.BlobInputStream; 
     28import org.lilyproject.repository.api.BlobManager; 
    2729import org.lilyproject.repository.api.BlobNotFoundException; 
     30import org.lilyproject.repository.api.BlobStoreAccess; 
     31import org.lilyproject.repository.api.BlobStoreAccessFactory; 
    2832import org.lilyproject.util.Pair; 
    2933 
     
    3236    Map<String, BlobStoreAccess> registry = new HashMap<String, BlobStoreAccess>(); 
    3337    private BlobStoreAccessFactory blobStoreAccessFactory; 
     38    private final BlobManager blobManager; 
    3439     
    35     public BlobStoreAccessRegistry() { 
     40    public BlobStoreAccessRegistry(BlobManager blobManager) { 
     41        this.blobManager = blobManager; 
    3642    } 
    3743     
     
    4955    public OutputStream getOutputStream(Blob blob) throws BlobException { 
    5056        BlobStoreAccess blobStoreAccess = blobStoreAccessFactory.get(blob); 
    51         return new BlobOutputStream(blobStoreAccess.getOutputStream(blob), blobStoreAccess.getId(), blob); 
     57        return new BlobOutputStream(blobStoreAccess.getOutputStream(blob), blobStoreAccess.getId(), blob, blobManager, blobStoreAccess.incubate()); 
    5258    } 
    5359 
    54     public InputStream getInputStream(Blob blob) throws BlobNotFoundException, BlobException { 
     60    public BlobInputStream getInputStream(Blob blob) throws BlobNotFoundException, BlobException { 
    5561        Pair<String, byte[]> decodedKey = decodeKey(blob); 
    5662        BlobStoreAccess blobStoreAccess = registry.get(decodedKey.getV1()); 
    57         return blobStoreAccess.getInputStream(decodedKey.getV2()); 
     63        return new BlobInputStream(blobStoreAccess.getInputStream(decodedKey.getV2()), blob); 
    5864    } 
    5965 
    6066    private Pair<String, byte[]> decodeKey(Blob blob) throws BlobNotFoundException, BlobException { 
    6167        if (blob.getValue() == null) { 
    62             throw new BlobNotFoundException(blob); 
     68            throw new BlobNotFoundException(blob, "Blob has no reference to a blob in the blobstore", null); 
    6369        } 
    6470        Pair<String, byte[]> decodedKey; 
     
    97103        private final Blob blob; 
    98104        private final String blobStoreAccessId; 
     105        private final BlobManager blobManager; 
     106        private final boolean incubate; 
    99107 
    100         public BlobOutputStream(OutputStream outputStream, String blobStoreAccessId, Blob blob) { 
     108        public BlobOutputStream(OutputStream outputStream, String blobStoreAccessId, Blob blob, BlobManager blobManager, boolean incubate) { 
    101109            super(outputStream); 
    102110            this.blobStoreAccessId = blobStoreAccessId; 
    103111            this.blob = blob; 
     112            this.blobManager = blobManager; 
     113            this.incubate = incubate; 
    104114        } 
    105115 
     
    107117        public void close() throws IOException { 
    108118            super.close(); 
    109             blob.setValue(encode(blobStoreAccessId, blob.getValue())); 
     119            byte[] encodedBlobKey = encode(blobStoreAccessId, blob.getValue()); 
     120            if (incubate) { 
     121                blobManager.incubateBlob(encodedBlobKey); 
     122            } 
     123            blob.setValue(encodedBlobKey); 
    110124        } 
    111125    } 
  • trunk/cr/repository/impl/src/main/java/org/lilyproject/repository/impl/DFSBlobStoreAccess.java

    r4592 r4621  
    8686        UUID uuid = decode(blobKey); 
    8787        try { 
    88             fileSystem.delete(new Path(uuid.toString()), false); 
     88            fileSystem.delete(createPath(uuid), false); 
    8989        } catch (IOException e) { 
    9090            throw new BlobException("Failed to delete blob with key <" +blobKey+ "> from the DFS blobstore", e); 
    9191        } 
     92    } 
     93 
     94    public boolean incubate() { 
     95        return true; 
    9296    } 
    9397 
     
    111115        } 
    112116    } 
     117     
    113118} 
  • trunk/cr/repository/impl/src/main/java/org/lilyproject/repository/impl/HBaseBlobStoreAccess.java

    r4614 r4621  
    8585        } 
    8686        byte[] value = result.getValue(BLOBS_COLUMN_FAMILY_BYTES, BLOB_COLUMN); 
     87        if (value == null) { 
     88            throw new BlobException("Failed to open an inputstream for blobkey <" + blobKey + "> since no blob was found on the HBASE blobstore"); 
     89        } 
    8790        return new ByteArrayInputStream(value); 
    8891    } 
     
    9598            throw new BlobException("Failed to delete blob with key <" + blobKey + "> from the DFS blobstore", e); 
    9699        } 
     100    } 
     101     
     102    public boolean incubate() { 
     103        return true; 
    97104    } 
    98105 
  • trunk/cr/repository/impl/src/main/java/org/lilyproject/repository/impl/HBaseRepository.java

    r4620 r4621  
    2020 
    2121import java.io.IOException; 
    22 import java.io.InputStream; 
    2322import java.io.OutputStream; 
    2423import java.util.ArrayList; 
     
    3635import org.apache.commons.logging.LogFactory; 
    3736import org.apache.hadoop.conf.Configuration; 
     37import org.apache.hadoop.hbase.KeyValue; 
    3838import org.apache.hadoop.hbase.client.Get; 
    3939import org.apache.hadoop.hbase.client.HTableInterface; 
     
    4444import org.apache.hadoop.hbase.client.Scan; 
    4545import org.apache.hadoop.hbase.filter.CompareFilter; 
     46import org.apache.hadoop.hbase.filter.Filter; 
    4647import org.apache.hadoop.hbase.filter.FilterList; 
    4748import org.apache.hadoop.hbase.filter.PrefixFilter; 
    4849import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 
     50import org.apache.hadoop.hbase.filter.ValueFilter; 
     51import org.apache.hadoop.hbase.filter.WritableByteArrayComparable; 
     52import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; 
    4953import org.apache.hadoop.hbase.util.Bytes; 
    50 import org.lilyproject.repository.api.*; 
     54import org.lilyproject.repository.api.Blob; 
     55import org.lilyproject.repository.api.BlobException; 
     56import org.lilyproject.repository.api.BlobInputStream; 
     57import org.lilyproject.repository.api.BlobManager; 
     58import org.lilyproject.repository.api.BlobNotFoundException; 
     59import org.lilyproject.repository.api.BlobReference; 
     60import org.lilyproject.repository.api.BlobStoreAccess; 
     61import org.lilyproject.repository.api.FieldNotFoundException; 
     62import org.lilyproject.repository.api.FieldType; 
     63import org.lilyproject.repository.api.FieldTypeEntry; 
     64import org.lilyproject.repository.api.FieldTypeNotFoundException; 
     65import org.lilyproject.repository.api.IdGenerator; 
     66import org.lilyproject.repository.api.IdRecord; 
     67import org.lilyproject.repository.api.InvalidRecordException; 
     68import org.lilyproject.repository.api.PrimitiveValueType; 
     69import org.lilyproject.repository.api.QName; 
     70import org.lilyproject.repository.api.Record; 
     71import org.lilyproject.repository.api.RecordException; 
     72import org.lilyproject.repository.api.RecordExistsException; 
     73import org.lilyproject.repository.api.RecordId; 
     74import org.lilyproject.repository.api.RecordLockedException; 
     75import org.lilyproject.repository.api.RecordNotFoundException; 
     76import org.lilyproject.repository.api.RecordType; 
     77import org.lilyproject.repository.api.RecordTypeNotFoundException; 
     78import org.lilyproject.repository.api.Repository; 
     79import org.lilyproject.repository.api.RepositoryException; 
     80import org.lilyproject.repository.api.ResponseStatus; 
     81import org.lilyproject.repository.api.Scope; 
     82import org.lilyproject.repository.api.TypeException; 
     83import org.lilyproject.repository.api.TypeManager; 
     84import org.lilyproject.repository.api.ValueType; 
     85import org.lilyproject.repository.api.VersionNotFoundException; 
    5186import org.lilyproject.repository.impl.RepositoryMetrics.Action; 
    5287import org.lilyproject.repository.impl.RepositoryMetrics.HBaseAction; 
    5388import org.lilyproject.repository.impl.lock.RowLocker; 
     89import org.lilyproject.repository.impl.primitivevaluetype.BlobValueType; 
    5490import org.lilyproject.rowlog.api.RowLog; 
    5591import org.lilyproject.rowlog.api.RowLogException; 
     
    72108public class HBaseRepository implements Repository { 
    73109  
    74  
    75110    private HTableInterface recordTable; 
    76111    private final TypeManager typeManager; 
     
    80115    private Map<Scope, byte[]> recordTypeIdColumnNames = new HashMap<Scope, byte[]>(); 
    81116    private Map<Scope, byte[]> recordTypeVersionColumnNames = new HashMap<Scope, byte[]>(); 
    82     private BlobStoreAccessRegistry blobStoreAccessRegistry; 
    83117    private RowLog wal; 
    84118    private RowLocker rowLocker; 
     
    86120    private Log log = LogFactory.getLog(getClass()); 
    87121    private RepositoryMetrics metrics; 
    88     private final HBaseTableFactory hbaseTableFactory; 
     122    private BlobManager blobManager; 
    89123 
    90124    public HBaseRepository(TypeManager typeManager, IdGenerator idGenerator, 
    91             BlobStoreAccessFactory blobStoreAccessFactory, RowLog wal, Configuration configuration, HBaseTableFactory hbaseTableFactory) throws IOException { 
     125            RowLog wal, Configuration configuration, HBaseTableFactory hbaseTableFactory, BlobManager blobManager) throws IOException { 
    92126        this.typeManager = typeManager; 
    93127        this.idGenerator = idGenerator; 
    94128        this.wal = wal; 
    95         this.hbaseTableFactory = hbaseTableFactory; 
    96         blobStoreAccessRegistry = new BlobStoreAccessRegistry(); 
    97         blobStoreAccessRegistry.setBlobStoreAccessFactory(blobStoreAccessFactory); 
     129        this.blobManager = blobManager; 
     130 
    98131        recordTable = LilyHBaseSchema.getRecordTable(hbaseTableFactory); 
    99132 
     
    231264                RecordEvent recordEvent = new RecordEvent(); 
    232265                recordEvent.setType(Type.CREATE); 
    233                 calculateRecordChanges(newRecord, dummyOriginalRecord, version, put, recordEvent, false); 
     266                Set<BlobReference> referencedBlobs = new HashSet<BlobReference>(); 
     267                Set<BlobReference> unReferencedBlobs = new HashSet<BlobReference>(); 
     268                 
     269                calculateRecordChanges(newRecord, dummyOriginalRecord, version, put, recordEvent, referencedBlobs, unReferencedBlobs, false); 
     270 
    234271                // Make sure the record type changed flag stays false for a newly 
    235272                // created record 
    236273                recordEvent.setRecordTypeChanged(false); 
    237                 if (newRecord.getVersion() != null) 
    238                     recordEvent.setVersionCreated(newRecord.getVersion()); 
    239      
     274                Long newVersion = newRecord.getVersion(); 
     275                if (newVersion != null) 
     276                    recordEvent.setVersionCreated(newVersion); 
     277 
     278                // Reserve blobs so no other records can use them 
     279                reserveBlobs(null, referencedBlobs); 
     280                 
    240281                walMessage = wal.putMessage(recordId.toBytes(), null, recordEvent.toJsonBytes(), put); 
    241282                long beforeHbase = System.currentTimeMillis(); 
    242283                recordTable.put(put); 
    243284                metrics.reportHBase(HBaseAction.PUT, System.currentTimeMillis()-beforeHbase); 
     285                 
     286                // Remove the used blobs from the blobIncubator 
     287                blobManager.handleBlobReferences(recordId, referencedBlobs, unReferencedBlobs); 
    244288                 
    245289                // Take Custom RowLock before releasing the HBase RowLock 
     
    258302                Thread.currentThread().interrupt(); 
    259303                throw new RecordException("Exception occurred while creating record <" + recordId + "> in HBase table", 
     304                        e); 
     305            } catch (BlobException e) { 
     306                throw new RecordException("Exception occurred while creating record <" + recordId + ">", 
    260307                        e); 
    261308            } finally { 
     
    284331 
    285332            newRecord.setResponseStatus(ResponseStatus.CREATED); 
     333             
     334             
     335             
    286336            return newRecord; 
    287337        } finally { 
     
    321371             
    322372            if (updateVersion) { 
    323                 return updateMutableFields(record, useLatestRecordType); 
     373                try { 
     374                    return updateMutableFields(record, useLatestRecordType); 
     375                } catch (BlobException e) { 
     376                    throw new RecordException("Exception occurred while updating record <" + record.getId() + ">", 
     377                            e); 
     378                } 
    324379            } else { 
    325380                return updateRecord(record, useLatestRecordType); 
     
    350405 
    351406            Put put = new Put(newRecord.getId().toBytes()); 
     407            Set<BlobReference> referencedBlobs = new HashSet<BlobReference>(); 
     408            Set<BlobReference> unReferencedBlobs = new HashSet<BlobReference>(); 
    352409            RecordEvent recordEvent = new RecordEvent(); 
    353410            recordEvent.setType(Type.UPDATE); 
    354411            long newVersion = originalRecord.getVersion() == null ? 1 : originalRecord.getVersion() + 1; 
    355             if (calculateRecordChanges(newRecord, originalRecord, newVersion, put, recordEvent, useLatestRecordType)) { 
    356                 putMessageOnWalAndProcess(recordId, rowLock, put, recordEvent); 
    357                 newRecord.setResponseStatus(ResponseStatus.UPDATED); 
    358             } else { 
    359                 newRecord.setResponseStatus(ResponseStatus.UP_TO_DATE); 
    360             } 
     412                 
     413                if (calculateRecordChanges(newRecord, originalRecord, newVersion, put, recordEvent, referencedBlobs, unReferencedBlobs, useLatestRecordType)) { 
     414                    // Reserve blobs so no other records can use them 
     415                    reserveBlobs(record.getId(), referencedBlobs); 
     416                    putMessageOnWalAndProcess(recordId, rowLock, put, recordEvent); 
     417                    // Remove the used blobs from the blobIncubator and delete unreferenced blobs from the blobstore 
     418                    blobManager.handleBlobReferences(recordId, referencedBlobs, unReferencedBlobs); 
     419                    newRecord.setResponseStatus(ResponseStatus.UPDATED); 
     420                } else { 
     421                    newRecord.setResponseStatus(ResponseStatus.UP_TO_DATE); 
     422                } 
    361423        } catch (RowLogException e) { 
    362424            throw new RecordException("Exception occurred while putting updated record <" + recordId 
     
    370432            throw new RecordException("Exception occurred while updating record <" + recordId + "> on HBase table", 
    371433                    e); 
     434        } catch (BlobException e) { 
     435            throw new RecordException("Exception occurred while putting updated record <" + recordId 
     436                    + "> on HBase table", e); 
    372437        } finally { 
    373438            unlockRow(rowLock); 
     
    397462    // this information on the Put object and the RecordEvent 
    398463    private boolean calculateRecordChanges(Record record, Record originalRecord, Long version, Put put, 
    399             RecordEvent recordEvent, boolean useLatestRecordType) throws RecordTypeNotFoundException, FieldTypeNotFoundException, 
    400             RecordException, TypeException, InvalidRecordException, InterruptedException { 
     464            RecordEvent recordEvent, Set<BlobReference> referencedBlobs, Set<BlobReference> unReferencedBlobs, boolean useLatestRecordType) throws InterruptedException, RecordTypeNotFoundException, TypeException, FieldTypeNotFoundException, BlobException, RecordException, InvalidRecordException { 
    401465        QName recordTypeName = record.getRecordTypeName(); 
    402466        Long recordTypeVersion = null; 
     
    410474         
    411475        // Check which fields have changed 
    412         Set<Scope> changedScopes = calculateChangedFields(record, originalRecord, recordType, version, put, recordEvent); 
     476        Set<Scope> changedScopes = calculateChangedFields(record, originalRecord, recordType, version, put, recordEvent, referencedBlobs, unReferencedBlobs); 
    413477 
    414478        // If no versioned fields have changed, keep the original version 
     
    446510 
    447511        } 
     512         
    448513        // Always set the version on the record. If no fields were changed this 
    449514        // will give the latest version in the repository 
    450515        record.setVersion(version); 
    451  
     516         
    452517        if (versionedFieldsHaveChanged) { 
    453518            recordEvent.setVersionCreated(version); 
    454519        } 
    455          
     520 
    456521        // Clear the list of deleted fields, as this is typically what the user will expect when using the 
    457522        // record object for future updates.  
     
    486551 
    487552    private Set<Scope> calculateChangedFields(Record record, Record originalRecord, RecordType recordType, 
    488             Long version, Put put, RecordEvent recordEvent) throws FieldTypeNotFoundException, 
    489             RecordTypeNotFoundException, RecordException, TypeException, InterruptedException { 
     553            Long version, Put put, RecordEvent recordEvent, Set<BlobReference> referencedBlobs, Set<BlobReference> unReferencedBlobs) throws InterruptedException, FieldTypeNotFoundException, TypeException, BlobException, RecordTypeNotFoundException, RecordException { 
    490554        Map<QName, Object> originalFields = originalRecord.getFields(); 
    491555        Set<Scope> changedScopes = new HashSet<Scope>(); 
     
    493557        Map<QName, Object> fields = getFieldsToUpdate(record); 
    494558         
    495         changedScopes.addAll(calculateUpdateFields(fields, originalFields, null, version, put, recordEvent)); 
    496          
     559        changedScopes.addAll(calculateUpdateFields(fields, originalFields, null, version, put, recordEvent, referencedBlobs, unReferencedBlobs, false)); 
     560        for (BlobReference referencedBlob : referencedBlobs) { 
     561            referencedBlob.setRecordId(record.getId()); 
     562        } 
     563        for (BlobReference unReferencedBlob : unReferencedBlobs) { 
     564            unReferencedBlob.setRecordId(record.getId()); 
     565        } 
    497566        // Update record types 
    498567        for (Scope scope : changedScopes) { 
     
    525594 
    526595    private Set<Scope> calculateUpdateFields(Map<QName, Object> fields, Map<QName, Object> originalFields, Map<QName, Object> originalNextFields, 
    527             Long version, Put put, RecordEvent recordEvent) throws FieldTypeNotFoundException, 
    528             RecordTypeNotFoundException, RecordException, TypeException, InterruptedException { 
     596            Long version, Put put, RecordEvent recordEvent, Set<BlobReference> referencedBlobs, Set<BlobReference> unReferencedBlobs, boolean mutableUpdate) throws InterruptedException, FieldTypeNotFoundException, TypeException, BlobException, RecordTypeNotFoundException, RecordException { 
    529597        Set<Scope> changedScopes = new HashSet<Scope>(); 
    530598        for (Entry<QName, Object> field : fields.entrySet()) { 
     
    540608                Scope scope = fieldType.getScope(); 
    541609                byte[] fieldIdAsBytes = fieldType.getIdBytes(); 
     610                 
     611                // Check if the newValue contains blobs  
     612                Set<BlobReference> newReferencedBlobs = getReferencedBlobs(fieldType, newValue); 
     613                referencedBlobs.addAll(newReferencedBlobs); 
     614 
    542615                byte[] encodedFieldValue = encodeFieldValue(fieldType, newValue); 
    543616 
     617                // Check if the previousValue contained blobs which should be deleted since they are no longer used 
     618                // In case of a mutable update, it is checked later if no other versions use the blob before deciding to delete it 
     619                if (Scope.NON_VERSIONED.equals(scope) || (mutableUpdate && Scope.VERSIONED_MUTABLE.equals(scope))) { 
     620                    if (originalValue != null) { 
     621                        Set<BlobReference> previouslyReferencedBlobs = getReferencedBlobs(fieldType, originalValue); 
     622                        previouslyReferencedBlobs.removeAll(newReferencedBlobs); 
     623                        unReferencedBlobs.addAll(previouslyReferencedBlobs); 
     624                    } 
     625                } 
     626 
     627                // Set the value  
    544628                if (Scope.NON_VERSIONED.equals(scope)) { 
    545629                    put.add(columnFamily, fieldIdAsBytes, 1L, encodedFieldValue); 
     
    558642        return changedScopes; 
    559643    } 
    560  
    561     private boolean updateNeeded(Object newValue, boolean fieldIsNewOrDeleted, Object originalValue) { 
    562         return (!(((newValue == null) && (originalValue == null)) || (isDeleteMarker(newValue) && fieldIsNewOrDeleted) || (newValue.equals(originalValue)))); 
    563     } 
    564644     
    565645    private byte[] encodeFieldValue(FieldType fieldType, Object fieldValue) throws FieldTypeNotFoundException, 
     
    569649        ValueType valueType = fieldType.getValueType(); 
    570650 
    571         // TODO validate with Class#isAssignableFrom() 
    572651        byte[] encodedFieldValue = valueType.toBytes(fieldValue); 
    573652        encodedFieldValue = EncodingUtil.prefixValue(encodedFieldValue, EXISTS_FLAG); 
     
    581660    private Record updateMutableFields(Record record, boolean latestRecordType) throws InvalidRecordException, 
    582661            RecordNotFoundException, RecordTypeNotFoundException, FieldTypeNotFoundException, RecordException, 
    583             VersionNotFoundException, TypeException, RecordLockedException { 
     662            VersionNotFoundException, TypeException, RecordLockedException, BlobException { 
    584663 
    585664        Record newRecord = record.clone(); 
     
    598677            rowLock = lockRow(recordId); 
    599678             
    600             Record originalRecord = read(record.getId(), version, null, new ReadContext()); 
     679            Record originalRecord = read(recordId, version, null, new ReadContext()); 
    601680 
    602681            // Update the mutable fields 
    603             Put put = new Put(record.getId().toBytes()); 
     682            Put put = new Put(recordId.toBytes()); 
    604683            Map<QName, Object> fields = getFieldsToUpdate(record); 
    605684            fields = filterMutableFields(fields); 
    606685            Map<QName, Object> originalFields = filterMutableFields(originalRecord.getFields()); 
     686            Set<BlobReference> referencedBlobs = new HashSet<BlobReference>(); 
     687            Set<BlobReference> unReferencedBlobs = new HashSet<BlobReference>(); 
    607688            RecordEvent recordEvent = new RecordEvent(); 
    608689            recordEvent.setType(Type.UPDATE); 
    609690            recordEvent.setVersionUpdated(version); 
    610691 
    611             Set<Scope> changedScopes = calculateUpdateFields(fields, originalFields, getOriginalNextFields(recordId, version), version, put, recordEvent); 
    612  
     692            Set<Scope> changedScopes = calculateUpdateFields(fields, originalFields, getOriginalNextFields(recordId, version), version, put, recordEvent, referencedBlobs, unReferencedBlobs, true); 
     693            for (BlobReference referencedBlob : referencedBlobs) { 
     694                referencedBlob.setRecordId(recordId); 
     695            } 
     696            for (BlobReference unReferencedBlob : unReferencedBlobs) { 
     697                unReferencedBlob.setRecordId(recordId); 
     698            } 
     699             
    613700            if (!changedScopes.isEmpty()) { 
    614701                Long recordTypeVersion = latestRecordType ? null : record.getRecordTypeVersion(); 
     
    628715                recordEvent.setVersionUpdated(version); 
    629716 
     717                // Reserve blobs so no other records can use them 
     718                reserveBlobs(record.getId(), referencedBlobs); 
     719                 
    630720                putMessageOnWalAndProcess(recordId, rowLock, put, recordEvent); 
     721                 
     722                // The unReferencedBlobs could still be in use in another version of the mutable field, 
     723                // therefore we filter them first 
     724                unReferencedBlobs = filterReferencedBlobs(recordId, unReferencedBlobs, version); 
     725                 
     726                // Remove the used blobs from the blobIncubator 
     727                blobManager.handleBlobReferences(recordId, referencedBlobs, unReferencedBlobs); 
    631728            } 
    632729        } catch (RowLogException e) { 
     
    10181115                RecordEvent recordEvent = new RecordEvent(); 
    10191116                recordEvent.setType(Type.DELETE); 
    1020                  
    10211117                RowLogMessage walMessage = wal.putMessage(recordId.toBytes(), null, recordEvent.toJsonBytes(), put); 
    10221118                if (!rowLocker.put(put, rowLock)) { 
     
    10771173 
    10781174    public void registerBlobStoreAccess(BlobStoreAccess blobStoreAccess) { 
    1079         blobStoreAccessRegistry.register(blobStoreAccess); 
     1175        blobManager.register(blobStoreAccess); 
    10801176    } 
    10811177 
    10821178    public OutputStream getOutputStream(Blob blob) throws BlobException { 
    1083         return blobStoreAccessRegistry.getOutputStream(blob); 
    1084     } 
    1085  
    1086     public InputStream getInputStream(Blob blob) throws BlobNotFoundException, BlobException { 
    1087         return blobStoreAccessRegistry.getInputStream(blob); 
    1088     } 
    1089  
    1090     public void delete(Blob blob) throws BlobNotFoundException, BlobException { 
    1091         blobStoreAccessRegistry.delete(blob); 
     1179        return blobManager.getOutputStream(blob); 
     1180    } 
     1181     
     1182    public BlobInputStream getInputStream(RecordId recordId, Long version, QName fieldName, Integer multivalueIndex, Integer hierarchyIndex) throws BlobNotFoundException, BlobException, RecordNotFoundException, RecordTypeNotFoundException, FieldTypeNotFoundException, RecordException, VersionNotFoundException, TypeException, InterruptedException { 
     1183        Record record = read(recordId, version, Arrays.asList(new QName[]{fieldName})); 
     1184        FieldType fieldType = typeManager.getFieldTypeByName(fieldName); 
     1185        return blobManager.getInputStream(record, fieldName, multivalueIndex, hierarchyIndex, fieldType); 
     1186    } 
     1187     
     1188    public BlobInputStream getInputStream(RecordId recordId, QName fieldName) throws BlobNotFoundException, BlobException, RecordNotFoundException, RecordTypeNotFoundException, FieldTypeNotFoundException, RecordException, VersionNotFoundException, TypeException, InterruptedException { 
     1189        return getInputStream(recordId, null, fieldName, null, null); 
     1190    } 
     1191 
     1192    private Set<BlobReference> getReferencedBlobs(FieldTypeImpl fieldType, Object value) throws BlobException { 
     1193        fieldType.getValueType().getValues(value); 
     1194        HashSet<BlobReference> referencedBlobs = new HashSet<BlobReference>(); 
     1195        ValueType valueType = fieldType.getValueType(); 
     1196        PrimitiveValueType primitiveValueType = valueType.getPrimitive(); 
     1197        if ((primitiveValueType instanceof BlobValueType) && ! isDeleteMarker(value)) { 
     1198            Set<Object> values = valueType.getValues(value); 
     1199            for (Object object : values) { 
     1200                referencedBlobs.add(new BlobReference((Blob)object, null, fieldType)); 
     1201            } 
     1202        } 
     1203        return referencedBlobs; 
     1204    } 
     1205 
     1206    private void reserveBlobs(RecordId recordId, Set<BlobReference> referencedBlobs) throws IOException, 
     1207            InvalidRecordException { 
     1208        if (!referencedBlobs.isEmpty()) { 
     1209            // Check if the blob is newly uploaded 
     1210            Set<BlobReference> failedReservations = blobManager.reserveBlobs(referencedBlobs); 
     1211            // If not, filter those that are already used by the record 
     1212            failedReservations = filterReferencedBlobs(recordId, failedReservations, null); 
     1213            if (!failedReservations.isEmpty()) 
     1214            { 
     1215                throw new InvalidRecordException("Record references blobs which are not available for use", recordId); 
     1216            } 
     1217        } 
     1218    } 
     1219     
     1220    // Checks the set of blobs and returns a subset of those blobs which are not referenced anymore 
     1221    private Set<BlobReference> filterReferencedBlobs(RecordId recordId, Set<BlobReference> blobs, Long ignoreVersion) throws IOException { 
     1222        if (recordId == null) 
     1223            return blobs; 
     1224        Set<BlobReference> unReferencedBlobs = new HashSet<BlobReference>(); 
     1225        for (BlobReference blobReference : blobs) { 
     1226            FieldTypeImpl fieldType = (FieldTypeImpl)blobReference.getFieldType(); 
     1227            Get get = new Get(recordId.toBytes()); 
     1228            get.addColumn(columnFamily, fieldType.getIdBytes()); 
     1229            ValueType valueType = fieldType.getValueType(); 
     1230            byte[] valueToCompare; 
     1231            if (valueType.isMultiValue() && valueType.isHierarchical()) { 
     1232                valueToCompare = Bytes.toBytes(2); 
     1233            } else if (valueType.isMultiValue() || valueType.isHierarchical()) { 
     1234                valueToCompare = Bytes.toBytes(1); 
     1235            } else { 
     1236                valueToCompare = Bytes.toBytes(0); 
     1237            } 
     1238            valueToCompare = Bytes.add(valueToCompare, valueType.getPrimitive().toBytes(blobReference.getBlob())); 
     1239            WritableByteArrayComparable valueComparator = new ContainsValueComparator(valueToCompare); 
     1240            Filter filter = new ValueFilter(CompareOp.EQUAL, valueComparator); 
     1241            get.setFilter(filter); 
     1242            Result result = recordTable.get(get); 
     1243             
     1244            if (result.isEmpty()) { 
     1245                unReferencedBlobs.add(blobReference); 
     1246            } else { 
     1247                if (ignoreVersion != null) { 
     1248                    boolean stillReferenced = false; 
     1249                    List<KeyValue> column = result.getColumn(columnFamily, fieldType.getIdBytes()); 
     1250                    for (KeyValue keyValue : column) { 
     1251                        if (keyValue.getTimestamp() != ignoreVersion) { 
     1252                            stillReferenced = true; 
     1253                            break; 
     1254                        } 
     1255                    } 
     1256                    if (!stillReferenced) { 
     1257                        unReferencedBlobs.add(blobReference); 
     1258                    } 
     1259                } 
     1260            } 
     1261        } 
     1262        return unReferencedBlobs; 
    10921263    } 
    10931264 
  • trunk/cr/repository/impl/src/main/java/org/lilyproject/repository/impl/InlineBlobStoreAccess.java

    r4592 r4621  
    4949        // no-op 
    5050    } 
     51     
     52    public boolean incubate() { 
     53        return false; 
     54    } 
    5155 
    5256    private class InlineBlobOutputStream extends ByteArrayOutputStream { 
  • trunk/cr/repository/impl/src/main/java/org/lilyproject/repository/impl/NettyTransceiverFactory.java

    r4609 r4621  
    2727        } 
    2828 
    29         @Override 
    3029        public Thread newThread(Runnable r) { 
    3130            Thread thread = delegate.newThread(r); 
  • trunk/cr/repository/impl/src/main/java/org/lilyproject/repository/impl/RemoteRepository.java

    r4609 r4621  
    1717 
    1818import java.io.IOException; 
    19 import java.io.InputStream; 
    2019import java.io.OutputStream; 
    2120import java.lang.reflect.UndeclaredThrowableException; 
    2221import java.net.InetSocketAddress; 
    2322import java.util.ArrayList; 
     23import java.util.Arrays; 
    2424import java.util.List; 
    2525import java.util.Set; 
     
    2929import org.apache.avro.ipc.Transceiver; 
    3030import org.apache.avro.ipc.specific.SpecificRequestor; 
    31 import org.lilyproject.repository.api.*; 
    32 import org.lilyproject.repository.avro.*; 
     31import org.lilyproject.repository.api.Blob; 
     32import org.lilyproject.repository.api.BlobException; 
     33import org.lilyproject.repository.api.BlobInputStream; 
     34import org.lilyproject.repository.api.BlobManager; 
     35import org.lilyproject.repository.api.BlobNotFoundException; 
     36import org.lilyproject.repository.api.BlobStoreAccess; 
     37import org.lilyproject.repository.api.FieldType; 
     38import org.lilyproject.repository.api.FieldTypeNotFoundException; 
     39import org.lilyproject.repository.api.IORecordException; 
     40import org.lilyproject.repository.api.IdGenerator; 
     41import org.lilyproject.repository.api.IdRecord; 
     42import org.lilyproject.repository.api.InvalidRecordException; 
     43import org.lilyproject.repository.api.QName; 
     44import org.lilyproject.repository.api.Record; 
     45import org.lilyproject.repository.api.RecordException; 
     46import org.lilyproject.repository.api.RecordExistsException; 
     47import org.lilyproject.repository.api.RecordId; 
     48import org.lilyproject.repository.api.RecordLockedException; 
     49import org.lilyproject.repository.api.RecordNotFoundException; 
     50import org.lilyproject.repository.api.RecordTypeNotFoundException; 
     51import org.lilyproject.repository.api.Repository; 
     52import org.lilyproject.repository.api.RepositoryException; 
     53import org.lilyproject.repository.api.TypeException; 
     54import org.lilyproject.repository.api.TypeManager; 
     55import org.lilyproject.repository.api.VersionNotFoundException; 
     56import org.lilyproject.repository.avro.AvroConverter; 
     57import org.lilyproject.repository.avro.AvroFieldTypeNotFoundException; 
     58import org.lilyproject.repository.avro.AvroGenericException; 
     59import org.lilyproject.repository.avro.AvroInvalidRecordException; 
     60import org.lilyproject.repository.avro.AvroLily; 
     61import org.lilyproject.repository.avro.AvroQName; 
     62import org.lilyproject.repository.avro.AvroRecordException; 
     63import org.lilyproject.repository.avro.AvroRecordExistsException; 
     64import org.lilyproject.repository.avro.AvroRecordLockedException; 
     65import org.lilyproject.repository.avro.AvroRecordNotFoundException; 
     66import org.lilyproject.repository.avro.AvroRecordTypeNotFoundException; 
     67import org.lilyproject.repository.avro.AvroRepositoryException; 
     68import org.lilyproject.repository.avro.AvroTypeException; 
     69import org.lilyproject.repository.avro.AvroVersionNotFoundException; 
    3370import org.lilyproject.util.ArgumentValidator; 
    3471import org.lilyproject.util.io.Closer; 
     
    4380    private IdGenerator idGenerator; 
    4481    private final TypeManager typeManager; 
    45     private BlobStoreAccessRegistry blobStoreAccessRegistry; 
    4682    private Transceiver client; 
     83    private final BlobManager blobManager; 
    4784 
    4885    public RemoteRepository(InetSocketAddress address, AvroConverter converter, RemoteTypeManager typeManager, 
    49             IdGenerator idGenerator, BlobStoreAccessFactory blobStoreAccessFactory) throws IOException {         
     86            IdGenerator idGenerator, BlobManager blobManager) throws IOException {         
    5087        this.converter = converter; 
    5188        this.typeManager = typeManager; 
    5289        this.idGenerator = idGenerator; 
    53         blobStoreAccessRegistry = new BlobStoreAccessRegistry(); 
    54         blobStoreAccessRegistry.setBlobStoreAccessFactory(blobStoreAccessFactory); 
    55  
     90        this.blobManager = blobManager; 
     91         
    5692        //client = new HttpTransceiver(new URL("http://" + address.getHostName() + ":" + address.getPort() + "/")); 
    5793        client = new NettyTransceiver(address); 
     
    328364 
    329365    public void registerBlobStoreAccess(BlobStoreAccess blobStoreAccess) { 
    330         blobStoreAccessRegistry.register(blobStoreAccess); 
    331     } 
    332      
    333     public void delete(Blob blob) throws BlobNotFoundException, BlobException { 
    334         try { 
    335             lilyProxy.deleteBlob(converter.convert(blob)); 
    336         } catch (AvroBlobNotFoundException e) { 
    337             throw converter.convert(e); 
    338         } catch (AvroBlobException e) { 
    339             throw converter.convert(e); 
    340         } catch (AvroGenericException e) { 
    341             throw converter.convert(e); 
    342         } catch (AvroRemoteException e) { 
    343             throw converter.convert(e); 
    344         } catch (UndeclaredThrowableException e) { 
    345             throw handleUndeclaredBlobThrowable(e); 
    346         } 
    347     } 
    348      
    349     public InputStream getInputStream(Blob blob) throws BlobNotFoundException, BlobException { 
    350         return blobStoreAccessRegistry.getInputStream(blob); 
     366        blobManager.register(blobStoreAccess); 
     367    } 
     368     
     369    public BlobInputStream getInputStream(RecordId recordId, Long version, QName fieldName, Integer multivalueIndex, Integer hierarchyIndex) throws BlobNotFoundException, BlobException, RecordNotFoundException, RecordTypeNotFoundException, FieldTypeNotFoundException, RecordException, VersionNotFoundException, TypeException, InterruptedException { 
     370        Record record = read(recordId, version, Arrays.asList(new QName[]{fieldName})); 
     371        FieldType fieldType = typeManager.getFieldTypeByName(fieldName); 
     372        return blobManager.getInputStream(record, fieldName, multivalueIndex, hierarchyIndex, fieldType); 
     373    } 
     374     
     375    public BlobInputStream getInputStream(RecordId recordId, QName fieldName) throws BlobNotFoundException, BlobException, RecordNotFoundException, RecordTypeNotFoundException, FieldTypeNotFoundException, RecordException, VersionNotFoundException, TypeException, InterruptedException { 
     376        return getInputStream(recordId, null, fieldName, null, null); 
    351377    } 
    352378     
    353379    public OutputStream getOutputStream(Blob blob) throws BlobException { 
    354         return blobStoreAccessRegistry.getOutputStream(blob); 
    355     } 
    356  
     380        return blobManager.getOutputStream(blob); 
     381    } 
     382     
    357383    private RuntimeException handleUndeclaredRecordThrowable(UndeclaredThrowableException e) throws RecordException { 
    358384        if (e.getCause() instanceof IOException) { 
     
    362388        } 
    363389    } 
    364  
    365     private RuntimeException handleUndeclaredBlobThrowable(UndeclaredThrowableException e) throws BlobException { 
    366         if (e.getCause() instanceof IOException) { 
    367             throw new IOBlobException(e.getCause()); 
    368         } else { 
    369             throw e; 
    370         } 
    371     } 
    372390} 
    373391 
  • trunk/cr/repository/impl/src/main/java/org/lilyproject/repository/impl/ValueTypeImpl.java

    r4592 r4621  
    1717 
    1818import java.util.ArrayList; 
     19import java.util.Arrays; 
     20import java.util.HashSet; 
    1921import java.util.List; 
     22import java.util.Set; 
    2023 
    2124import org.apache.hadoop.hbase.util.Bytes; 
     
    4144        return multiValue; 
    4245    } 
    43      
     46 
    4447    public boolean isHierarchical() { 
    4548        return hierarchical; 
     
    7982        return result; 
    8083    } 
    81      
     84 
    8285    private HierarchyPath fromHierarchicalBytes(byte[] bytes) { 
    8386        List<Object> result = new ArrayList<Object>(); 
     
    9497        return new HierarchyPath(result.toArray(new Object[result.size()])); 
    9598    } 
    96      
     99 
    97100    public byte[] toBytes(Object value) { 
    98101        if (isMultiValue()) { 
     
    108111        byte[] result; 
    109112        result = new byte[0]; 
    110         for(Object element : ((List<Object>)value)) { 
     113        for (Object element : ((List<Object>) value)) { 
    111114            byte[] encodedValue; 
    112115            if (isHierarchical()) { 
     
    120123        return result; 
    121124    } 
    122      
     125 
    123126    private byte[] toHierarchyBytes(Object value) { 
    124127        byte[] result; 
    125128        result = new byte[0]; 
    126         for(Object element : ((HierarchyPath)value).getElements()) { 
     129        for (Object element : ((HierarchyPath) value).getElements()) { 
    127130            byte[] encodedValue = primitiveValueType.toBytes(element); 
    128131            result = Bytes.add(result, Bytes.toBytes(encodedValue.length)); 
     
    135138        return primitiveValueType; 
    136139    } 
    137      
     140 
    138141    public Class getType() { 
    139142        if (isMultiValue()) { 
     
    142145        return primitiveValueType.getType(); 
    143146    } 
    144      
     147 
    145148    public byte[] toBytes() { 
    146149        StringBuilder stringBuilder = new StringBuilder(); 
     
    153156    } 
    154157 
     158    public Set<Object> getValues(Object value) { 
     159        Set<Object> result = new HashSet<Object>(); 
     160        if (isMultiValue()) { 
     161            result.addAll(getMultiValueValues(value)); 
     162        } else if (isHierarchical()) { 
     163            result.addAll(getHierarchyValues(value)); 
     164        } else { 
     165            result.add(value); 
     166        } 
     167        return result; 
     168    } 
     169 
     170    private Set<Object> getMultiValueValues(Object value) { 
     171        Set<Object> result = new HashSet<Object>(); 
     172        if (isHierarchical()) { 
     173            for (Object element : ((List<Object>) value)) { 
     174                result.addAll(getHierarchyValues(element)); 
     175            } 
     176        } else { 
     177            result.addAll((List<Object>) value); 
     178        } 
     179        return result; 
     180    } 
     181 
     182    private Set<Object> getHierarchyValues(Object value) { 
     183        Set<Object> result = new HashSet<Object>(); 
     184        result.addAll(Arrays.asList(((HierarchyPath) value).getElements())); 
     185        return result; 
     186    } 
     187     
    155188    public static ValueType fromBytes(byte[] bytes, AbstractTypeManager typeManager) { 
    156189        String encodedString = Bytes.toString(bytes); 
    157190        int endOfPrimitiveValueTypeName = encodedString.indexOf(","); 
    158191        String primitiveValueTypeName = encodedString.substring(0, endOfPrimitiveValueTypeName); 
    159         int endOfMultiValueBoolean = encodedString.indexOf(",", endOfPrimitiveValueTypeName+1);  
    160         boolean multiValue = Boolean.parseBoolean(encodedString.substring(endOfPrimitiveValueTypeName + 1, endOfMultiValueBoolean)); 
     192        int endOfMultiValueBoolean = encodedString.indexOf(",", endOfPrimitiveValueTypeName + 1); 
     193        boolean multiValue = Boolean.parseBoolean(encodedString.substring(endOfPrimitiveValueTypeName + 1, 
     194                endOfMultiValueBoolean)); 
    161195        boolean hierarchical = Boolean.parseBoolean(encodedString.substring(endOfMultiValueBoolean + 1)); 
    162196        return typeManager.getValueType(primitiveValueTypeName, multiValue, hierarchical); 
     
    190224        return true; 
    191225    } 
    192  
    193226} 
  • trunk/cr/repository/impl/src/test/java/org/lilyproject/repository/impl/test/AbstractBlobStoreTest.java

    r4611 r4621  
    2222import java.io.InputStream; 
    2323import java.io.OutputStream; 
     24import java.net.URISyntaxException; 
    2425import java.util.Arrays; 
    2526import java.util.Random; 
    2627 
    2728import org.apache.hadoop.conf.Configuration; 
     29import org.apache.hadoop.fs.Path; 
    2830import org.apache.hadoop.hbase.util.Bytes; 
    2931import org.junit.Test; 
    3032import org.lilyproject.repository.api.Blob; 
    3133import org.lilyproject.repository.api.BlobException; 
     34import org.lilyproject.repository.api.BlobManager; 
    3235import org.lilyproject.repository.api.BlobNotFoundException; 
     36import org.lilyproject.repository.api.BlobStoreAccess; 
    3337import org.lilyproject.repository.api.FieldType; 
    3438import org.lilyproject.repository.api.FieldTypeEntry; 
     39import org.lilyproject.repository.api.FieldTypeNotFoundException; 
     40import org.lilyproject.repository.api.HierarchyPath; 
     41import org.lilyproject.repository.api.InvalidRecordException; 
    3542import org.lilyproject.repository.api.QName; 
    3643import org.lilyproject.repository.api.Record; 
     44import org.lilyproject.repository.api.RecordException; 
     45import org.lilyproject.repository.api.RecordId; 
     46import org.lilyproject.repository.api.RecordNotFoundException; 
    3747import org.lilyproject.repository.api.RecordType; 
     48import org.lilyproject.repository.api.RecordTypeNotFoundException; 
    3849import org.lilyproject.repository.api.Repository; 
    3950import org.lilyproject.repository.api.Scope; 
     51import org.lilyproject.repository.api.TypeException; 
    4052import org.lilyproject.repository.api.TypeManager; 
     53import org.lilyproject.repository.api.VersionNotFoundException; 
     54import org.lilyproject.repository.impl.BlobManagerImpl; 
     55import org.lilyproject.repository.impl.BlobStoreAccessRegistry; 
     56import org.lilyproject.repository.impl.DFSBlobStoreAccess; 
     57import org.lilyproject.repository.impl.HBaseBlobStoreAccess; 
     58import org.lilyproject.repository.impl.InlineBlobStoreAccess; 
     59import org.lilyproject.repository.impl.SizeBasedBlobStoreAccessFactory; 
    4160import org.lilyproject.rowlog.api.RowLog; 
    4261import org.lilyproject.rowlog.api.RowLogConfig; 
    4362import org.lilyproject.rowlog.api.RowLogConfigurationManager; 
    44 import org.lilyproject.rowlog.api.RowLogException; 
    4563import org.lilyproject.rowlog.api.RowLogShard; 
    4664import org.lilyproject.rowlog.impl.RowLogConfigurationManagerImpl; 
    4765import org.lilyproject.rowlog.impl.RowLogImpl; 
    4866import org.lilyproject.rowlog.impl.RowLogShardImpl; 
     67import org.lilyproject.testfw.HBaseProxy; 
    4968import org.lilyproject.util.hbase.HBaseTableFactory; 
    50 import org.lilyproject.util.hbase.HBaseTableFactoryImpl; 
    5169import org.lilyproject.util.hbase.LilyHBaseSchema; 
     70import org.lilyproject.util.hbase.LilyHBaseSchema.RecordCf; 
    5271import org.lilyproject.util.zookeeper.ZooKeeperItf; 
    53 import static org.lilyproject.util.hbase.LilyHBaseSchema.*; 
    5472 
    5573public abstract class AbstractBlobStoreTest { 
     74    protected final static HBaseProxy HBASE_PROXY = new HBaseProxy(); 
    5675    protected static RowLog wal; 
    5776    protected static Repository repository; 
     
    6180    protected static RowLogConfigurationManager rowLogConfMgr; 
    6281    protected static HBaseTableFactory hbaseTableFactory; 
    63      
     82    protected static SizeBasedBlobStoreAccessFactory blobStoreAccessFactory; 
     83    protected static BlobStoreAccess dfsBlobStoreAccess; 
     84    protected static BlobStoreAccess hbaseBlobStoreAccess; 
     85    protected static BlobStoreAccess inlineBlobStoreAccess; 
     86    protected static Random random = new Random(); 
     87    protected static BlobStoreAccessRegistry testBlobStoreAccessRegistry; 
     88 
    6489    protected static void setupWal() throws Exception { 
    6590        rowLogConfMgr = new RowLogConfigurationManagerImpl(zooKeeper); 
     
    7095        wal.registerShard(walShard); 
    7196    } 
     97 
     98    protected static BlobManager setupBlobManager() throws IOException, URISyntaxException { 
     99        dfsBlobStoreAccess = new DFSBlobStoreAccess(HBASE_PROXY.getBlobFS(), new Path("/lily/blobs")); 
     100        hbaseBlobStoreAccess = new HBaseBlobStoreAccess(configuration); 
     101        inlineBlobStoreAccess = new InlineBlobStoreAccess(); 
     102        blobStoreAccessFactory = new SizeBasedBlobStoreAccessFactory(dfsBlobStoreAccess); 
     103        blobStoreAccessFactory.addBlobStoreAccess(50, inlineBlobStoreAccess); 
     104        blobStoreAccessFactory.addBlobStoreAccess(1024, hbaseBlobStoreAccess); 
     105        return new BlobManagerImpl(hbaseTableFactory, blobStoreAccessFactory); 
     106    } 
    72107     
    73108    @Test 
    74109    public void testCreate() throws Exception { 
     110        QName fieldName = new QName("test", "testCreate"); 
     111        FieldType fieldType = typeManager.newFieldType(typeManager.getValueType("BLOB", false, false), fieldName, 
     112                Scope.NON_VERSIONED); 
     113        fieldType = typeManager.createFieldType(fieldType); 
     114        RecordType recordType = typeManager.newRecordType(new QName(null, "testCreateRT")); 
     115        FieldTypeEntry fieldTypeEntry = typeManager.newFieldTypeEntry(fieldType.getId(), true); 
     116        recordType.addFieldTypeEntry(fieldTypeEntry); 
     117        recordType = typeManager.createRecordType(recordType); 
     118         
    75119        byte[] bytes = Bytes.toBytes("someBytes"); 
    76         Blob blob = new Blob("aMediaType", (long)bytes.length, "testCreate"); 
    77         OutputStream outputStream = repository.getOutputStream(blob); 
    78         outputStream.write(bytes); 
    79         outputStream.close(); 
    80          
    81         InputStream inputStream = repository.getInputStream(blob); 
    82         byte[] readBytes = new byte[blob.getSize().intValue()]; 
    83         inputStream.read(readBytes); 
    84         inputStream.close(); 
     120        Blob blob = writeBlob(bytes, "aMediaType", "testCreate"); 
     121        Record record = repository.newRecord(); 
     122        record.setRecordType(recordType.getName()); 
     123        record.setField(fieldName, blob); 
     124        record = repository.create(record); 
     125         
     126        byte[] readBytes = readBlob(record.getId(), fieldName, blob.getSize()); 
    85127        assertTrue(Arrays.equals(bytes, readBytes)); 
    86128    } 
    87      
     129 
    88130    @Test 
    89131    public void testThreeSizes() throws Exception { 
    90         Random random = new Random(); 
     132        QName fieldName1 = new QName("test", "testThreeSizes1"); 
     133        QName fieldName2 = new QName("test", "testThreeSizes2"); 
     134        QName fieldName3 = new QName("test", "testThreeSizes3"); 
     135        FieldType fieldType1 = typeManager.newFieldType(typeManager.getValueType("BLOB", false, false), fieldName1, 
     136                Scope.NON_VERSIONED); 
     137        fieldType1 = typeManager.createFieldType(fieldType1); 
     138        FieldType fieldType2 = typeManager.newFieldType(typeManager.getValueType("BLOB", false, false), fieldName2, 
     139                Scope.NON_VERSIONED); 
     140        fieldType2 = typeManager.createFieldType(fieldType2); 
     141        FieldType fieldType3 = typeManager.newFieldType(typeManager.getValueType("BLOB", false, false), fieldName3, 
     142                Scope.NON_VERSIONED); 
     143        fieldType3 = typeManager.createFieldType(fieldType3); 
     144        RecordType recordType = typeManager.newRecordType(new QName(null, "testThreeSizes")); 
     145        recordType.addFieldTypeEntry(typeManager.newFieldTypeEntry(fieldType1.getId(), true)); 
     146        recordType.addFieldTypeEntry(typeManager.newFieldTypeEntry(fieldType2.getId(), true)); 
     147        recordType.addFieldTypeEntry(typeManager.newFieldTypeEntry(fieldType3.getId(), true)); 
     148        recordType = typeManager.createRecordType(recordType); 
     149         
    91150        byte[] small = new byte[10]; 
    92151        random.nextBytes(small); 
     
    95154        byte[] large = new byte[2048]; 
    96155        random.nextBytes(large); 
    97         Blob smallBlob = new Blob("mime/small", (long)10, "small"); 
    98         Blob mediumBlob = new Blob("mime/medium", (long)100, "medium"); 
    99         Blob largeBlob = new Blob("mime/large", (long)2048, "large"); 
    100         OutputStream outputStream = repository.getOutputStream(smallBlob); 
    101         outputStream.write(small); 
    102         outputStream.close(); 
    103         outputStream = repository.getOutputStream(mediumBlob); 
    104         outputStream.write(medium); 
    105         outputStream.close(); 
    106         outputStream = repository.getOutputStream(largeBlob); 
    107         outputStream.write(large); 
    108         outputStream.close(); 
    109  
    110         InputStream inputStream = repository.getInputStream(smallBlob); 
    111         byte[] readBytes = new byte[10]; 
    112         inputStream.read(readBytes); 
    113         inputStream.close(); 
     156         
     157        Blob smallBlob = writeBlob(small, "mime/small", "small"); 
     158        Blob mediumBlob = writeBlob(medium, "mime/medium", "medium"); 
     159        Blob largeBlob = writeBlob(large, "mime/large", "large"); 
     160         
     161        Record record = repository.newRecord(); 
     162        record.setRecordType(recordType.getName()); 
     163        record.setField(fieldName1, smallBlob); 
     164        record.setField(fieldName2, mediumBlob); 
     165        record.setField(fieldName3, largeBlob); 
     166        record = repository.create(record); 
     167         
     168        byte[] readBytes = readBlob(record.getId(), fieldName1, smallBlob.getSize()); 
    114169        assertTrue(Arrays.equals(small, readBytes)); 
    115         inputStream = repository.getInputStream(mediumBlob); 
    116         readBytes = new byte[100]; 
    117         inputStream.read(readBytes); 
    118         inputStream.close(); 
     170        readBytes = readBlob(record.getId(), fieldName2, mediumBlob.getSize()); 
    119171        assertTrue(Arrays.equals(medium, readBytes)); 
    120         inputStream = repository.getInputStream(largeBlob); 
    121         readBytes = new byte[2048]; 
    122         inputStream.read(readBytes); 
    123         inputStream.close(); 
     172        readBytes = readBlob(record.getId(), fieldName3, largeBlob.getSize()); 
    124173        assertTrue(Arrays.equals(large, readBytes)); 
    125174    } 
    126      
    127     @Test 
    128     public void testCreateRecordWithBlob() throws Exception { 
    129         QName fieldName = new QName("test", "ablob"); 
    130         FieldType fieldType = typeManager.newFieldType(typeManager.getValueType("BLOB", false, false), fieldName, Scope.VERSIONED); 
    131         fieldType = typeManager.createFieldType(fieldType); 
    132         RecordType recordType = typeManager.newRecordType(new QName(null, "testCreateRecordWithBlobRT")); 
     175 
     176    @Test 
     177    public void testCreateTwoRecordsWithSameBlob() throws Exception { 
     178        QName fieldName = new QName("test", "ablob2"); 
     179        FieldType fieldType = typeManager.newFieldType(typeManager.getValueType("BLOB", false, false), fieldName, 
     180                Scope.VERSIONED); 
     181        fieldType = typeManager.createFieldType(fieldType); 
     182        RecordType recordType = typeManager.newRecordType(new QName(null, "testCreateTwoRecordsWithSameBlobRT")); 
    133183        FieldTypeEntry fieldTypeEntry = typeManager.newFieldTypeEntry(fieldType.getId(), true); 
    134184        recordType.addFieldTypeEntry(fieldTypeEntry); 
    135185        recordType = typeManager.createRecordType(recordType); 
     186 
     187        byte[] bytes = Bytes.toBytes("someBytes"); 
     188        Blob blob = writeBlob(bytes, "aMediaType", "testCreate"); 
     189 
    136190        Record record = repository.newRecord(); 
    137191        record.setRecordType(recordType.getName(), null); 
    138  
    139         byte[] bytes = Bytes.toBytes("someBytes"); 
    140         Blob blob = new Blob("aMediaType", (long)bytes.length, "testCreate"); 
     192        record.setField(fieldName, blob); 
     193        record = repository.create(record); 
     194 
     195        Record record2 = repository.newRecord(); 
     196        record2.setRecordType(recordType.getName(), null); 
     197        record2.setField(fieldName, blob); 
     198        record2 = repository.create(record2); // For an inline record this succeeds 
     199 
     200        byte[] bytesLarge = new byte[3000];  
     201        random.nextBytes(bytesLarge); 
     202        Blob largeBlob = writeBlob(bytesLarge, "largeBlob", "testCreate"); 
     203         
     204        Record record3 = repository.newRecord(); 
     205        record3.setRecordType(recordType.getName(), null); 
     206        record3.setField(fieldName, largeBlob); 
     207        record3 = repository.create(record3); 
     208 
     209        Record record4 = repository.newRecord(); 
     210        record4.setRecordType(recordType.getName(), null); 
     211        record4.setField(fieldName, largeBlob); 
     212         
     213        try { 
     214            record4 = repository.create(record4); 
     215            fail("Using the same blob in two records should not succeed"); 
     216        } catch (InvalidRecordException expected) { 
     217            System.out.println("[Evert]"); 
     218        } 
     219    } 
     220 
     221    @Test 
     222    public void testUpdateNonVersionedBlobHDFS() throws Exception { 
     223        testUpdateNonVersionedBlob(3000, true); 
     224    } 
     225     
     226    @Test 
     227    public void testUpdateNonVersionedBlobHBase() throws Exception { 
     228        testUpdateNonVersionedBlob(150, true); 
     229    } 
     230     
     231    @Test 
     232    public void testUpdateNonVersionedBlobInline() throws Exception { 
     233        testUpdateNonVersionedBlob(50, false); 
     234    } 
     235     
     236    private void testUpdateNonVersionedBlob(int size, boolean expectDelete) throws Exception { 
     237        QName fieldName = new QName("test", "testUpdateNonVersionedBlob"+size); 
     238        FieldType fieldType = typeManager.newFieldType(typeManager.getValueType("BLOB", false, false), fieldName, 
     239                Scope.NON_VERSIONED); 
     240        fieldType = typeManager.createFieldType(fieldType); 
     241        RecordType recordType = typeManager.newRecordType(new QName(null, "testUpdateNonVersionedBlobRT"+size)); 
     242        FieldTypeEntry fieldTypeEntry = typeManager.newFieldTypeEntry(fieldType.getId(), true); 
     243        recordType.addFieldTypeEntry(fieldTypeEntry); 
     244        recordType = typeManager.createRecordType(recordType); 
     245 
     246        byte[] bytes = new byte[size];  
     247        random.nextBytes(bytes); 
     248        Blob blob = writeBlob(bytes, "aMediaType", "testUpdateNonVersionedBlob"); 
     249 
     250        byte[] bytes2 = new byte[size];  
     251        random.nextBytes(bytes2); 
     252        Blob blob2 = writeBlob(bytes2, "aMediaType", "testUpdateNonVersionedBlob2"); 
     253         
     254        Record record = repository.newRecord(); 
     255        record.setRecordType(recordType.getName(), null); 
     256        record.setField(fieldName, blob); 
     257        record = repository.create(record); 
     258 
     259        Record record2 = repository.newRecord(record.getId()); 
     260        record2.setRecordType(recordType.getName(), null); 
     261        record2.setField(fieldName, blob2); 
     262         
     263        record = repository.update(record2); 
     264         
     265        // Reading should return blob2 
     266        byte[] readBytes = readBlob(record.getId(), record.getVersion(), fieldName, null, null, size); 
     267        assertTrue(Arrays.equals(bytes2, readBytes)); 
     268         
     269        assertBlobDelete(expectDelete, blob); 
     270    } 
     271     
     272    @Test 
     273    public void testDeleteNonVersionedBlobHDFS() throws Exception { 
     274        testDeleteNonVersionedBlob(3000, true); 
     275    } 
     276     
     277    @Test 
     278    public void testDeleteNonVersionedBlobHBase() throws Exception { 
     279        testDeleteNonVersionedBlob(150, true); 
     280    } 
     281     
     282    @Test 
     283    public void testDeleteNonVersionedBlobInline() throws Exception { 
     284        testDeleteNonVersionedBlob(50, false); 
     285    } 
     286     
     287    private void testDeleteNonVersionedBlob(int size, boolean expectDelete) throws Exception { 
     288        QName fieldName = new QName("test", "testDeleteNonVersionedBlob"+size); 
     289        FieldType fieldType = typeManager.newFieldType(typeManager.getValueType("BLOB", false, false), fieldName, 
     290                Scope.NON_VERSIONED); 
     291        fieldType = typeManager.createFieldType(fieldType); 
     292        RecordType recordType = typeManager.newRecordType(new QName(null, "testDeleteNonVersionedBlobRT"+size)); 
     293        FieldTypeEntry fieldTypeEntry = typeManager.newFieldTypeEntry(fieldType.getId(), false); 
     294        recordType.addFieldTypeEntry(fieldTypeEntry); 
     295        recordType = typeManager.createRecordType(recordType); 
     296 
     297        byte[] bytes = new byte[size];  
     298        random.nextBytes(bytes); 
     299        Blob blob = writeBlob(bytes, "aMediaType", "testDeleteNonVersionedBlob"); 
     300 
     301        Record record = repository.newRecord(); 
     302        record.setRecordType(recordType.getName(), null); 
     303        record.setField(fieldName, blob); 
     304        record = repository.create(record); 
     305 
     306        Record record2 = repository.newRecord(record.getId()); 
     307        record2.setRecordType(recordType.getName(), null); 
     308        record2.addFieldsToDelete(Arrays.asList(new QName[]{fieldName})); 
     309         
     310        record = repository.update(record2); 
     311 
     312        assertBlobDelete(expectDelete, blob); 
     313    } 
     314     
     315    @Test 
     316    public void testUpdateMutableBlobHDFS() throws Exception { 
     317        testUpdateMutableBlob(3000, true); 
     318    } 
     319 
     320    @Test 
     321    public void testUpdateMutableBlobHBase() throws Exception { 
     322        testUpdateMutableBlob(150, true); 
     323    } 
     324 
     325    @Test 
     326    public void testUpdateMutableBlobInline() throws Exception { 
     327        testUpdateMutableBlob(50, false); 
     328    } 
     329 
     330    private void testUpdateMutableBlob(int size, boolean expectDelete) throws Exception { 
     331        QName fieldName = new QName("test", "testUpdateMutableBlob"+size); 
     332        FieldType fieldType = typeManager.newFieldType(typeManager.getValueType("BLOB", false, false), fieldName, 
     333                Scope.VERSIONED_MUTABLE); 
     334        fieldType = typeManager.createFieldType(fieldType); 
     335        RecordType recordType = typeManager.newRecordType(new QName(null, "testUpdateMutableBlobRT"+size)); 
     336        FieldTypeEntry fieldTypeEntry = typeManager.newFieldTypeEntry(fieldType.getId(), true); 
     337        recordType.addFieldTypeEntry(fieldTypeEntry); 
     338        recordType = typeManager.createRecordType(recordType); 
     339 
     340        byte[] bytes = new byte[size];  
     341        random.nextBytes(bytes); 
     342        Blob blob = writeBlob(bytes, "aMediaType", "testUpdateMutableBlob"); 
     343 
     344        byte[] bytes2 = new byte[size];  
     345        random.nextBytes(bytes2); 
     346        Blob blob2 = writeBlob(bytes2, "aMediaType", "testUpdateMutableBlob2"); 
     347         
     348        Record record = repository.newRecord(); 
     349        record.setRecordType(recordType.getName(), null); 
     350        record.setField(fieldName, blob); 
     351        record = repository.create(record); 
     352 
     353        Record record2 = repository.newRecord(record.getId()); 
     354        record2.setRecordType(recordType.getName(), null); 
     355        record2.setField(fieldName, blob2); 
     356        record2.setVersion(record.getVersion()); 
     357         
     358        record = repository.update(record2, true, false); 
     359         
     360        // Blob2 should still exist 
     361        byte[] readBytes = readBlob(record.getId(), record.getVersion(), fieldName, null, null, size); 
     362        assertTrue(Arrays.equals(bytes2, readBytes)); 
     363         
     364        assertBlobDelete(expectDelete, blob); 
     365    } 
     366 
     367    @Test 
     368    public void testDeleteMutableBlobHDFS() throws Exception { 
     369        testDeleteMutableBlob(3000, true); 
     370    } 
     371 
     372    @Test 
     373    public void testDeleteMutableBlobHBase() throws Exception { 
     374        testDeleteMutableBlob(150, true); 
     375    } 
     376     
     377    @Test 
     378    public void testDeleteMutableBlobInline() throws Exception { 
     379        testDeleteMutableBlob(50, false); 
     380    } 
     381     
     382    private void testDeleteMutableBlob(int size, boolean expectDelete) throws Exception { 
     383        QName fieldName = new QName("test", "testDeleteMutableBlob"+size); 
     384        FieldType fieldType = typeManager.newFieldType(typeManager.getValueType("BLOB", false, false), fieldName, 
     385                Scope.VERSIONED_MUTABLE); 
     386        fieldType = typeManager.createFieldType(fieldType); 
     387        RecordType recordType = typeManager.newRecordType(new QName(null, "testDeleteMutableBlobRT"+size)); 
     388        FieldTypeEntry fieldTypeEntry = typeManager.newFieldTypeEntry(fieldType.getId(), false); 
     389        recordType.addFieldTypeEntry(fieldTypeEntry); 
     390        recordType = typeManager.createRecordType(recordType); 
     391 
     392        byte[] bytes = new byte[size];  
     393        random.nextBytes(bytes); 
     394        Blob blob = writeBlob(bytes, "aMediaType", "testDeleteMutableBlob"); 
     395 
     396        byte[] bytes2 = new byte[size];  
     397        random.nextBytes(bytes2); 
     398        Blob blob2 = writeBlob(bytes2, "aMediaType", "testDeleteMutableBlob2"); 
     399         
     400        Record record = repository.newRecord(); 
     401        record.setRecordType(recordType.getName(), null); 
     402        record.setField(fieldName, blob); 
     403        record = repository.create(record); 
     404 
     405        Record record2 = repository.newRecord(record.getId()); 
     406        record2.setRecordType(recordType.getName(), null); 
     407        record2.setField(fieldName, blob2); 
     408        repository.update(record2, false, false); 
     409         
     410        // Blob1 should still exist 
     411        byte[] readBytes = readBlob(record.getId(), record.getVersion(), fieldName, null, null, size); 
     412        assertTrue(Arrays.equals(bytes, readBytes)); 
     413        // Blob2 should still exist 
     414        readBytes = readBlob(record2.getId(), record2.getVersion(), fieldName, null, null, size); 
     415        assertTrue(Arrays.equals(bytes2, readBytes)); 
     416         
     417        Record record3 = repository.newRecord(record.getId()); 
     418        record3.setRecordType(recordType.getName(), null); 
     419        record3.addFieldsToDelete(Arrays.asList(new QName[]{fieldName})); 
     420        record3.setVersion(record.getVersion()); 
     421        repository.update(record3, true, false); 
     422         
     423        // Blob2 should still exist 
     424        readBytes = readBlob(record2.getId(), record2.getVersion(), fieldName, null, null, size); 
     425        assertTrue(Arrays.equals(bytes2, readBytes)); 
     426         
     427        assertBlobDelete(expectDelete, blob); 
     428    } 
     429     
     430    @Test 
     431    public void testUpdateMutableMultivalueBlobHDFS() throws Exception { 
     432        testUpdateMutableMultivalueBlob(3000, true); 
     433    } 
     434 
     435    @Test 
     436    public void testUpdateMutableMultivalueBlobHBase() throws Exception { 
     437        testUpdateMutableMultivalueBlob(150, true); 
     438    } 
     439     
     440    @Test 
     441    public void testUpdateMutableMultivalueBlobInline() throws Exception { 
     442        testUpdateMutableMultivalueBlob(50, false); 
     443    } 
     444     
     445    private void testUpdateMutableMultivalueBlob(int size, boolean expectDelete) throws Exception { 
     446        QName fieldName = new QName("test", "testUpdateMutableMultivalueBlob"+size); 
     447        FieldType fieldType = typeManager.newFieldType(typeManager.getValueType("BLOB", true, false), fieldName, 
     448                Scope.VERSIONED_MUTABLE); 
     449        fieldType = typeManager.createFieldType(fieldType); 
     450        RecordType recordType = typeManager.newRecordType(new QName(null, "testUpdateMutableMultivalueBlobRT"+size)); 
     451        FieldTypeEntry fieldTypeEntry = typeManager.newFieldTypeEntry(fieldType.getId(), true); 
     452        recordType.addFieldTypeEntry(fieldTypeEntry); 
     453        recordType = typeManager.createRecordType(recordType); 
     454 
     455        byte[] bytes = new byte[size];  
     456        random.nextBytes(bytes); 
     457        Blob blob = writeBlob(bytes, "aMediaType", "testUpdateMutableMultivalueBlob"); 
     458 
     459        byte[] bytes2 = new byte[size];  
     460        random.nextBytes(bytes2); 
     461        Blob blob2 = writeBlob(bytes2, "aMediaType", "testUpdateMutableMultivalueBlob2"); 
     462         
     463        byte[] bytes3 = new byte[size];  
     464        random.nextBytes(bytes3); 
     465        Blob blob3 = writeBlob(bytes3, "aMediaType", "testUpdateMutableMultivalueBlob3"); 
     466         
     467        byte[] bytes4 = new byte[size];  
     468        random.nextBytes(bytes4); 
     469        Blob blob4 = writeBlob(bytes4, "aMediaType", "testUpdateMutableMultivalueBlob4"); 
     470         
     471        Record record = repository.newRecord(); 
     472        record.setRecordType(recordType.getName(), null); 
     473        record.setField(fieldName, Arrays.asList(new Blob[]{blob, blob2})); 
     474        record = repository.create(record); 
     475 
     476        Record record2 = repository.newRecord(record.getId()); 
     477        record2.setRecordType(recordType.getName(), null); 
     478        record2.setField(fieldName, Arrays.asList(new Blob[]{blob2, blob3})); 
     479        record2 = repository.update(record2, false, false); 
     480 
     481        // Mutable update of first version 
     482        Record record3 = repository.newRecord(record.getId()); 
     483        record3.setVersion(record.getVersion()); 
     484        record3.setRecordType(recordType.getName(), null); 
     485        record3.setField(fieldName, Arrays.asList(new Blob[]{blob4})); 
     486        record3 = repository.update(record3, true, false); 
     487         
     488        //Blob2 
     489        byte[] readBytes = readBlob(record2.getId(), record2.getVersion(), fieldName, 0, null, size); 
     490        assertTrue(Arrays.equals(bytes2, readBytes)); 
     491 
     492        //Blob3 
     493        readBytes = readBlob(record2.getId(), record2.getVersion(), fieldName, 1, null, size); 
     494        assertTrue(Arrays.equals(bytes3, readBytes)); 
     495         
     496        //Blob4 in version 1 
     497        readBytes = readBlob(record.getId(), record.getVersion(), fieldName, 0, null, size); 
     498        assertTrue(Arrays.equals(bytes4, readBytes)); 
     499         
     500        assertBlobDelete(expectDelete, blob); 
     501         
     502        try { 
     503            readBlob(record.getId(), record.getVersion(), fieldName, null, null, size); 
     504            fail("BlobNotFoundException expected since index should not be null"); 
     505        } catch (BlobNotFoundException expected) { 
     506             
     507        } 
     508 
     509        try { 
     510            readBlob(record.getId(), record.getVersion(), fieldName, 1, null, size); 
     511            fail("BlobNotFoundException expected since index is out of bounds"); 
     512        } catch (BlobNotFoundException expected) { 
     513             
     514        } 
     515 
     516    } 
     517     
     518    @Test 
     519    public void testUpdateMutableHierarchyBlobHDFS() throws Exception { 
     520        testUpdateMutableHierarchyBlob(3000, true); 
     521    } 
     522     
     523    @Test 
     524    public void testUpdateMutableHierarchyBlobHBase() throws Exception { 
     525        testUpdateMutableHierarchyBlob(150, true); 
     526    } 
     527     
     528    @Test 
     529    public void testUpdateMutableHierarchyBlobInline() throws Exception { 
     530        testUpdateMutableHierarchyBlob(50, false); 
     531    } 
     532     
     533    private void testUpdateMutableHierarchyBlob(int size, boolean expectDelete) throws Exception { 
     534        QName fieldName = new QName("test", "testUpdateMutableHierarchyBlob"+size); 
     535        FieldType fieldType = typeManager.newFieldType(typeManager.getValueType("BLOB", false, true), fieldName, 
     536                Scope.VERSIONED_MUTABLE); 
     537        fieldType = typeManager.createFieldType(fieldType); 
     538        RecordType recordType = typeManager.newRecordType(new QName(null, "testUpdateMutableHierarchyBlobRT"+size)); 
     539        FieldTypeEntry fieldTypeEntry = typeManager.newFieldTypeEntry(fieldType.getId(), true); 
     540        recordType.addFieldTypeEntry(fieldTypeEntry); 
     541        recordType = typeManager.createRecordType(recordType); 
     542 
     543        byte[] bytes = new byte[size];  
     544        random.nextBytes(bytes); 
     545        Blob blob = writeBlob(bytes, "aMediaType", "testUpdateMutableHierarchyBlob"); 
     546 
     547        byte[] bytes2 = new byte[size];  
     548        random.nextBytes(bytes2); 
     549        Blob blob2 = writeBlob(bytes2, "aMediaType", "testUpdateMutableHierarchyBlob2"); 
     550         
     551        byte[] bytes3 = new byte[size];  
     552        random.nextBytes(bytes3); 
     553        Blob blob3 = writeBlob(bytes3, "aMediaType", "testUpdateMutableHierarchyBlob3"); 
     554         
     555        byte[] bytes4 = new byte[size];  
     556        random.nextBytes(bytes4); 
     557        Blob blob4 = writeBlob(bytes4, "aMediaType", "testUpdateMutableHierarchyBlob4"); 
     558         
     559        Record record = repository.newRecord(); 
     560        record.setRecordType(recordType.getName(), null); 
     561        record.setField(fieldName, new HierarchyPath(blob, blob2)); 
     562        record = repository.create(record); 
     563 
     564        Record record2 = repository.newRecord(record.getId()); 
     565        record2.setRecordType(recordType.getName(), null); 
     566        record2.setField(fieldName, new HierarchyPath(blob2, blob3, blob4)); 
     567        record2 = repository.update(record2, false, false); 
     568 
     569        // Mutable update of first version 
     570        Record record3 = repository.newRecord(record.getId()); 
     571        record3.setVersion(record.getVersion()); 
     572        record3.setRecordType(recordType.getName(), null); 
     573        record3.setField(fieldName, new HierarchyPath(blob4, blob4)); 
     574        record3 = repository.update(record3, true, false); 
     575         
     576        // Blob2 
     577        byte[] readBytes = readBlob(record2.getId(), record2.getVersion(), fieldName, null, 0, size); 
     578        assertTrue(Arrays.equals(bytes2, readBytes)); 
     579         
     580        // Blob3 
     581        readBytes = readBlob(record2.getId(), record2.getVersion(), fieldName, null, 1, size); 
     582        assertTrue(Arrays.equals(bytes3, readBytes)); 
     583         
     584        // Blob4 in version1 
     585        readBytes = readBlob(record.getId(), record.getVersion(), fieldName, null, 1, size); 
     586        assertTrue(Arrays.equals(bytes4, readBytes)); 
     587         
     588        assertBlobDelete(expectDelete, blob); 
     589 
     590        try { 
     591            readBlob(record.getId(), record.getVersion(), fieldName, null, null, size); 
     592            fail("BlobNotFoundException expected since index should not be null"); 
     593        } catch (BlobNotFoundException expected) { 
     594             
     595        } 
     596 
     597        try { 
     598            readBlob(record.getId(), record.getVersion(), fieldName, null, 2, size); 
     599            fail("BlobNotFoundException expected since index is out of bounds"); 
     600        } catch (BlobNotFoundException expected) { 
     601             
     602        } 
     603    } 
     604 
     605    @Test 
     606    public void testUpdateMutableMultivalueHierarchyBlobHDFS() throws Exception { 
     607        testUpdateMutableMultivalueHierarchyBlob(3000, true); 
     608    } 
     609     
     610    @Test 
     611    public void testUpdateMutableMultivalueHierarchyBlobHBase() throws Exception { 
     612        testUpdateMutableMultivalueHierarchyBlob(150, true); 
     613    } 
     614 
     615    @Test 
     616    public void testUpdateMutableMultivalueHierarchyBlobInline() throws Exception { 
     617        testUpdateMutableMultivalueHierarchyBlob(50, false); 
     618    } 
     619 
     620    private void testUpdateMutableMultivalueHierarchyBlob(int size, boolean expectDelete) throws Exception { 
     621        QName fieldName = new QName("test", "testUpdateMutableMultivalueHierarchyBlob"+size); 
     622        FieldType fieldType = typeManager.newFieldType(typeManager.getValueType("BLOB", true, true), fieldName, 
     623                Scope.VERSIONED_MUTABLE); 
     624        fieldType = typeManager.createFieldType(fieldType); 
     625        RecordType recordType = typeManager.newRecordType(new QName(null, "testUpdateMutableMultivalueHierarchyBlobRT"+size)); 
     626        FieldTypeEntry fieldTypeEntry = typeManager.newFieldTypeEntry(fieldType.getId(), true); 
     627        recordType.addFieldTypeEntry(fieldTypeEntry); 
     628        recordType = typeManager.createRecordType(recordType); 
     629 
     630        byte[] bytes = new byte[size];  
     631        random.nextBytes(bytes); 
     632        Blob blob = writeBlob(bytes, "aMediaType", "testUpdateMutableMultivalueHierarchyBlob"); 
     633 
     634        byte[] bytes2 = new byte[size];  
     635        random.nextBytes(bytes); 
     636        Blob blob2 = writeBlob(bytes2, "aMediaType", "testUpdateMutableMultivalueHierarchyBlob2"); 
     637         
     638        byte[] bytes3 = new byte[size];  
     639        random.nextBytes(bytes3); 
     640        Blob blob3 = writeBlob(bytes3, "aMediaType", "testUpdateMutableMultivalueHierarchyBlob3"); 
     641         
     642        byte[] bytes4 = new byte[size];  
     643        random.nextBytes(bytes4); 
     644        Blob blob4 = writeBlob(bytes4, "aMediaType", "testUpdateMutableMultivalueHierarchyBlob4"); 
     645         
     646        Record record = repository.newRecord(); 
     647        record.setRecordType(recordType.getName(), null); 
     648        record.setField(fieldName, Arrays.asList(new HierarchyPath[]{new HierarchyPath(blob, blob2), new HierarchyPath(blob3)})); 
     649        record = repository.create(record); 
     650 
     651        Record record2 = repository.newRecord(record.getId()); 
     652        record2.setRecordType(recordType.getName(), null); 
     653        record2.setField(fieldName, Arrays.asList(new HierarchyPath[]{new HierarchyPath(blob2), new HierarchyPath(blob3, blob4)})); 
     654        record2 = repository.update(record2, false, false); 
     655 
     656        // Mutable update of first version 
     657        Record record3 = repository.newRecord(record.getId()); 
     658        record3.setVersion(record.getVersion()); 
     659        record3.setRecordType(recordType.getName(), null); 
     660        record3.setField(fieldName, Arrays.asList(new HierarchyPath[]{new HierarchyPath(blob3, blob4), new HierarchyPath(blob4)})); 
     661        record3 = repository.update(record3, true, false); 
     662         
     663        // Blob2 
     664        byte[] readBytes = readBlob(record2.getId(), record2.getVersion(), fieldName, 0, 0, size); 
     665        assertTrue(Arrays.equals(bytes2, readBytes)); 
     666         
     667        // Blob3 
     668        readBytes = readBlob(record2.getId(), record2.getVersion(), fieldName, 1, 0, size); 
     669        assertTrue(Arrays.equals(bytes3, readBytes)); 
     670         
     671        // Blob4 in version1 
     672        readBytes = readBlob(record.getId(), record.getVersion(), fieldName, 0, 1, size); 
     673        assertTrue(Arrays.equals(bytes4, readBytes)); 
     674         
     675        assertBlobDelete(expectDelete, blob); 
     676         
     677        try { 
     678            readBlob(record.getId(), record.getVersion(), fieldName, null, null, size); 
     679            fail("BlobNotFoundException expected since index should not be null"); 
     680        } catch (BlobNotFoundException expected) { 
     681             
     682        } 
     683         
     684        try { 
     685            readBlob(record.getId(), record.getVersion(), fieldName, 0, null, size); 
     686            fail("BlobNotFoundException expected since index should not be null"); 
     687        } catch (BlobNotFoundException expected) { 
     688             
     689        } 
     690 
     691        try { 
     692            readBlob(record.getId(), record.getVersion(), fieldName, 2, 0, size); 
     693            fail("BlobNotFoundException expected since index is out of bounds"); 
     694        } catch (BlobNotFoundException expected) { 
     695             
     696        } 
     697         
     698        try { 
     699            readBlob(record.getId(), record.getVersion(), fieldName, 1, 1, size); 
     700            fail("BlobNotFoundException expected since index is out of bounds"); 
     701        } catch (BlobNotFoundException expected) { 
     702             
     703        } 
     704    } 
     705 
     706    private void assertBlobDelete(boolean expectDelete, Blob blob) throws BlobNotFoundException, BlobException { 
     707        if (expectDelete) { 
     708            try { 
     709                testBlobStoreAccessRegistry.getInputStream(blob); 
     710                fail("The blob " + blob + " should have been deleted."); 
     711            } catch (BlobException expected) { 
     712            } 
     713        } else { 
     714            testBlobStoreAccessRegistry.getInputStream(blob); 
     715        } 
     716    } 
     717 
     718     
     719    @Test 
     720    public void testBadEncoding() throws Exception { 
     721        Blob blob = new Blob("aMediaType", (long) 10, "aName"); 
     722        blob.setValue(new byte[0]); 
     723        try { 
     724            testBlobStoreAccessRegistry.getInputStream(blob); 
     725            fail(); 
     726        } catch (BlobException expected) { 
     727        } 
     728    } 
     729 
     730    private Blob writeBlob(byte[] bytes, String mediaType, String name) throws BlobException, InterruptedException, 
     731            IOException { 
     732        Blob blob = new Blob(mediaType, (long) bytes.length, name); 
    141733        OutputStream outputStream = repository.getOutputStream(blob); 
    142734        outputStream.write(bytes); 
    143735        outputStream.close(); 
    144         record.setField(fieldName, blob); 
    145         record = repository.create(record); 
    146          
    147         record = repository.read(record.getId()); 
    148         blob = (Blob)record.getField(fieldName); 
    149         InputStream inputStream = repository.getInputStream(blob); 
    150         byte[] readBytes = new byte[blob.getSize().intValue()]; 
     736        return blob; 
     737    } 
     738     
     739    private byte[] readBlob(RecordId recordId, QName fieldName, long size) throws BlobNotFoundException, BlobException, RecordNotFoundException, RecordTypeNotFoundException, FieldTypeNotFoundException, RecordException, VersionNotFoundException, TypeException, InterruptedException, IOException { 
     740        return readBlob(recordId, null, fieldName, null, null, size); 
     741    } 
     742     
     743    private byte[] readBlob(RecordId recordId, Long version, QName fieldName, Integer multivalueIndex, Integer hierarchyIndex, long size) throws BlobNotFoundException, BlobException, InterruptedException, IOException, RecordNotFoundException, RecordTypeNotFoundException, FieldTypeNotFoundException, RecordException, VersionNotFoundException, TypeException { 
     744        InputStream inputStream = repository.getInputStream(recordId, version, fieldName, multivalueIndex, hierarchyIndex); 
     745        byte[] readBytes = new byte[(int)size]; 
    151746        inputStream.read(readBytes); 
    152747        inputStream.close(); 
    153         assertTrue(Arrays.equals(bytes, readBytes)); 
    154     } 
    155      
    156     @Test 
    157     public void testReadBlobWithoutName() throws Exception { 
    158         Blob blob = new Blob("aMediaType", (long)10, "aName"); 
    159         try { 
    160             repository.getInputStream(blob); 
    161             fail(); 
    162         } catch (BlobNotFoundException expected) { 
    163         } 
    164     } 
    165      
    166     @Test 
    167     public void testBadEncoding() throws Exception { 
    168         Blob blob = new Blob("aMediaType", (long)10, "aName"); 
    169         blob.setValue(new byte[0]); 
    170         try { 
    171             repository.getInputStream(blob); 
    172             fail(); 
    173         } catch (BlobException expected) { 
    174         } 
    175     } 
    176      
    177     @Test 
    178     public void testDelete() throws Exception { 
    179         byte[] small = new byte[10]; 
    180         byte[] medium = new byte[100]; 
    181         byte[] large = new byte[2048]; 
    182         Blob smallBlob = new Blob("mime/small", (long)10, "small"); 
    183         OutputStream outputStream = repository.getOutputStream(smallBlob); 
    184         outputStream.write(small); 
    185         outputStream.close(); 
    186         Blob mediumBlob = new Blob("mime/medium", (long)100, "medium"); 
    187         outputStream = repository.getOutputStream(mediumBlob); 
    188         outputStream.write(medium); 
    189         outputStream.close(); 
    190         Blob largeBlob = new Blob("mime/large", (long)10, "large"); 
    191         outputStream = repository.getOutputStream(largeBlob); 
    192         outputStream.write(large); 
    193         outputStream.close(); 
    194          
    195         repository.delete(smallBlob); 
    196         // TODO ok to ignore a delete of an inline blob? it will be deleted when the record is deleted  
    197         repository.getInputStream(smallBlob); 
    198         repository.delete(mediumBlob); 
    199         try { 
    200             repository.getInputStream(smallBlob); 
    201         } catch (BlobException expected) { 
    202         } 
    203         repository.delete(largeBlob); 
    204         try { 
    205             repository.getInputStream(smallBlob); 
    206         } catch (BlobException expected) { 
    207         } 
     748        return readBytes; 
    208749    } 
    209750} 
  • trunk/cr/repository/impl/src/test/java/org/lilyproject/repository/impl/test/AbstractRepositoryTest.java

    r4611 r4621  
    3535import org.junit.Before; 
    3636import org.junit.Test; 
    37 import org.lilyproject.repository.api.*; 
     37import org.lilyproject.repository.api.FieldNotFoundException; 
     38import org.lilyproject.repository.api.FieldType; 
     39import org.lilyproject.repository.api.IdGenerator; 
     40import org.lilyproject.repository.api.IdRecord; 
     41import org.lilyproject.repository.api.InvalidRecordException; 
     42import org.lilyproject.repository.api.QName; 
     43import org.lilyproject.repository.api.Record; 
     44import org.lilyproject.repository.api.RecordId; 
     45import org.lilyproject.repository.api.RecordNotFoundException; 
     46import org.lilyproject.repository.api.RecordType; 
     47import org.lilyproject.repository.api.RecordTypeNotFoundException; 
     48import org.lilyproject.repository.api.Repository; 
     49import org.lilyproject.repository.api.ResponseStatus; 
     50import org.lilyproject.repository.api.Scope; 
     51import org.lilyproject.repository.api.TypeManager; 
     52import org.lilyproject.repository.api.VersionNotFoundException; 
    3853import org.lilyproject.repository.impl.IdGeneratorImpl; 
    3954import org.lilyproject.rowlog.api.RowLog; 
     
    5065import org.lilyproject.testfw.HBaseProxy; 
    5166import org.lilyproject.util.hbase.HBaseTableFactory; 
    52 import org.lilyproject.util.hbase.HBaseTableFactoryImpl; 
     67import org.lilyproject.util.hbase.LilyHBaseSchema.RecordCf; 
    5368import org.lilyproject.util.hbase.LilyHBaseSchema; 
    5469import org.lilyproject.util.repo.VersionTag; 
    5570import org.lilyproject.util.zookeeper.ZooKeeperItf; 
    56 import static org.lilyproject.util.hbase.LilyHBaseSchema.*; 
    5771 
    5872public abstract class AbstractRepositoryTest { 
  • trunk/cr/repository/impl/src/test/java/org/lilyproject/repository/impl/test/AvroRepositoryTest.java

    r4611 r4621  
    2424import org.junit.AfterClass; 
    2525import org.junit.BeforeClass; 
     26import org.lilyproject.repository.api.BlobManager; 
    2627import org.lilyproject.repository.api.BlobStoreAccessFactory; 
    2728import org.lilyproject.repository.api.TypeManager; 
     
    3031import org.lilyproject.repository.avro.AvroLilyImpl; 
    3132import org.lilyproject.repository.avro.LilySpecificResponder; 
     33import org.lilyproject.repository.impl.BlobManagerImpl; 
    3234import org.lilyproject.repository.impl.DFSBlobStoreAccess; 
    3335import org.lilyproject.repository.impl.HBaseRepository; 
     
    6062        DFSBlobStoreAccess dfsBlobStoreAccess = new DFSBlobStoreAccess(HBASE_PROXY.getBlobFS(), new Path("/lily/blobs")); 
    6163        BlobStoreAccessFactory blobStoreAccessFactory = new SizeBasedBlobStoreAccessFactory(dfsBlobStoreAccess); 
     64        BlobManager blobManager = new BlobManagerImpl(hbaseTableFactory, blobStoreAccessFactory); 
    6265        setupWal(); 
    63         serverRepository = new HBaseRepository(serverTypeManager, idGenerator, blobStoreAccessFactory , wal, configuration, hbaseTableFactory); 
     66        serverRepository = new HBaseRepository(serverTypeManager, idGenerator, wal, configuration, hbaseTableFactory, blobManager); 
    6467         
    6568        AvroConverter serverConverter = new AvroConverter(); 
     
    7376                remoteConverter, idGenerator, zooKeeper); 
    7477        repository = new RemoteRepository(new InetSocketAddress(lilyServer.getPort()), remoteConverter, 
    75                 (RemoteTypeManager)typeManager, idGenerator, blobStoreAccessFactory); 
     78                (RemoteTypeManager)typeManager, idGenerator, blobManager); 
    7679        remoteConverter.setRepository(repository); 
    7780        ((RemoteTypeManager)typeManager).start(); 
     
    8083        setupMessageQueueProcessor(); 
    8184    } 
    82  
     85     
    8386    @AfterClass 
    8487    public static void tearDownAfterClass() throws Exception { 
  • trunk/cr/repository/impl/src/test/java/org/lilyproject/repository/impl/test/AvroTypeManagerFieldTypeTest.java

    r4611 r4621  
    2828import org.junit.Before; 
    2929import org.junit.BeforeClass; 
     30import org.lilyproject.repository.api.BlobManager; 
    3031import org.lilyproject.repository.api.BlobStoreAccessFactory; 
    3132import org.lilyproject.repository.api.Repository; 
     
    3536import org.lilyproject.repository.avro.AvroLilyImpl; 
    3637import org.lilyproject.repository.avro.LilySpecificResponder; 
     38import org.lilyproject.repository.impl.BlobManagerImpl; 
    3739import org.lilyproject.repository.impl.DFSBlobStoreAccess; 
    3840import org.lilyproject.repository.impl.HBaseRepository; 
     
    5456import org.lilyproject.util.hbase.HBaseTableFactoryImpl; 
    5557import org.lilyproject.util.hbase.LilyHBaseSchema; 
     58import org.lilyproject.util.hbase.LilyHBaseSchema.RecordCf; 
    5659import org.lilyproject.util.io.Closer; 
    5760import org.lilyproject.util.zookeeper.ZkUtil; 
    5861import org.lilyproject.util.zookeeper.ZooKeeperItf; 
    59 import static org.lilyproject.util.hbase.LilyHBaseSchema.*; 
    6062 
    6163public class AvroTypeManagerFieldTypeTest extends AbstractTypeManagerFieldTypeTest { 
     
    8486        DFSBlobStoreAccess dfsBlobStoreAccess = new DFSBlobStoreAccess(HBASE_PROXY.getBlobFS(), new Path("/lily/blobs")); 
    8587        BlobStoreAccessFactory blobStoreAccessFactory = new SizeBasedBlobStoreAccessFactory(dfsBlobStoreAccess); 
    86         serverRepository = new HBaseRepository(serverTypeManager, idGenerator, blobStoreAccessFactory, wal, configuration, hbaseTableFactory); 
     88        BlobManager blobManager = new BlobManagerImpl(hbaseTableFactory, blobStoreAccessFactory); 
     89        serverRepository = new HBaseRepository(serverTypeManager, idGenerator, wal, configuration, hbaseTableFactory, blobManager); 
    8790         
    8891        AvroConverter serverConverter = new AvroConverter(); 
     
    9699                remoteConverter, idGenerator, zooKeeper); 
    97100        repository = new RemoteRepository(new InetSocketAddress(lilyServer.getPort()), 
    98                 remoteConverter, (RemoteTypeManager)typeManager, idGenerator, blobStoreAccessFactory); 
     101                remoteConverter, (RemoteTypeManager)typeManager, idGenerator, blobManager); 
    99102        remoteConverter.setRepository(repository); 
    100103        ((RemoteTypeManager)typeManager).start(); 
  • trunk/cr/repository/impl/src/test/java/org/lilyproject/repository/impl/test/AvroTypeManagerRecordTypeTest.java

    r4611 r4621  
    2727import org.junit.Before; 
    2828import org.junit.BeforeClass; 
     29import org.lilyproject.repository.api.BlobManager; 
    2930import org.lilyproject.repository.api.BlobStoreAccessFactory; 
    3031import org.lilyproject.repository.api.Repository; 
     
    3435import org.lilyproject.repository.avro.AvroLilyImpl; 
    3536import org.lilyproject.repository.avro.LilySpecificResponder; 
     37import org.lilyproject.repository.impl.BlobManagerImpl; 
    3638import org.lilyproject.repository.impl.DFSBlobStoreAccess; 
    3739import org.lilyproject.repository.impl.HBaseRepository; 
     
    4446import org.lilyproject.rowlog.api.RowLogConfig; 
    4547import org.lilyproject.rowlog.api.RowLogConfigurationManager; 
    46 import org.lilyproject.rowlog.api.RowLogException; 
    4748import org.lilyproject.rowlog.api.RowLogShard; 
    4849import org.lilyproject.rowlog.impl.RowLogConfigurationManagerImpl; 
     
    5455import org.lilyproject.util.hbase.HBaseTableFactoryImpl; 
    5556import org.lilyproject.util.hbase.LilyHBaseSchema; 
     57import org.lilyproject.util.hbase.LilyHBaseSchema.RecordCf; 
    5658import org.lilyproject.util.io.Closer; 
    5759import org.lilyproject.util.zookeeper.StateWatchingZooKeeper; 
    5860import org.lilyproject.util.zookeeper.ZkUtil; 
    5961import org.lilyproject.util.zookeeper.ZooKeeperItf; 
    60 import static org.lilyproject.util.hbase.LilyHBaseSchema.*; 
    6162 
    6263/** 
     
    9697        DFSBlobStoreAccess dfsBlobStoreAccess = new DFSBlobStoreAccess(HBASE_PROXY.getBlobFS(), new Path("/lily/blobs")); 
    9798        BlobStoreAccessFactory blobStoreAccessFactory = new SizeBasedBlobStoreAccessFactory(dfsBlobStoreAccess); 
     99        BlobManager blobManager = new BlobManagerImpl(hbaseTableFactory, blobStoreAccessFactory); 
    98100        setupWal(); 
    99         serverRepository = new HBaseRepository(serverTypeManager, idGenerator, blobStoreAccessFactory, wal, configuration, hbaseTableFactory); 
     101        serverRepository = new HBaseRepository(serverTypeManager, idGenerator, wal, configuration, hbaseTableFactory, blobManager); 
    100102         
    101103        AvroConverter serverConverter = new AvroConverter(); 
     
    110112                remoteConverter, idGenerator, zooKeeper); 
    111113        Repository repository = new RemoteRepository(new InetSocketAddress(lilyServer.getPort()), 
    112                 remoteConverter, (RemoteTypeManager)typeManager, idGenerator, blobStoreAccessFactory); 
     114                remoteConverter, (RemoteTypeManager)typeManager, idGenerator, blobManager); 
    113115        remoteConverter.setRepository(repository); 
    114116        ((RemoteTypeManager)typeManager).start(); 
  • trunk/cr/repository/impl/src/test/java/org/lilyproject/repository/impl/test/BlobStoreTest.java

    r4611 r4621  
    1717 
    1818 
    19 import org.apache.hadoop.fs.Path; 
    2019import org.junit.After; 
    2120import org.junit.AfterClass; 
    2221import org.junit.Before; 
    2322import org.junit.BeforeClass; 
    24 import org.lilyproject.repository.api.BlobStoreAccess; 
     23import org.lilyproject.repository.api.BlobManager; 
    2524import org.lilyproject.repository.api.IdGenerator; 
    26 import org.lilyproject.repository.impl.DFSBlobStoreAccess; 
    27 import org.lilyproject.repository.impl.HBaseBlobStoreAccess; 
     25import org.lilyproject.repository.impl.BlobStoreAccessRegistry; 
    2826import org.lilyproject.repository.impl.HBaseRepository; 
    2927import org.lilyproject.repository.impl.HBaseTypeManager; 
    3028import org.lilyproject.repository.impl.IdGeneratorImpl; 
    31 import org.lilyproject.repository.impl.InlineBlobStoreAccess; 
    32 import org.lilyproject.repository.impl.SizeBasedBlobStoreAccessFactory; 
    33 import org.lilyproject.testfw.HBaseProxy; 
    3429import org.lilyproject.testfw.TestHelper; 
    3530import org.lilyproject.util.hbase.HBaseTableFactoryImpl; 
     
    3934public class BlobStoreTest extends AbstractBlobStoreTest { 
    4035 
    41     private final static HBaseProxy HBASE_PROXY = new HBaseProxy(); 
    4236    @BeforeClass 
    4337    public static void setUpBeforeClass() throws Exception { 
     
    4943        hbaseTableFactory = new HBaseTableFactoryImpl(configuration); 
    5044        typeManager = new HBaseTypeManager(idGenerator, configuration, zooKeeper, hbaseTableFactory); 
    51         BlobStoreAccess dfsBlobStoreAccess = new DFSBlobStoreAccess(HBASE_PROXY.getBlobFS(), new Path("/lily/blobs")); 
    52         BlobStoreAccess hbaseBlobStoreAccess = new HBaseBlobStoreAccess(configuration); 
    53         BlobStoreAccess inlineBlobStoreAccess = new InlineBlobStoreAccess();  
    54         SizeBasedBlobStoreAccessFactory factory = new SizeBasedBlobStoreAccessFactory(dfsBlobStoreAccess); 
    55         factory.addBlobStoreAccess(50, inlineBlobStoreAccess); 
    56         factory.addBlobStoreAccess(1024, hbaseBlobStoreAccess); 
    5745        setupWal(); 
    58         repository = new HBaseRepository(typeManager, idGenerator, factory, wal, configuration, hbaseTableFactory); 
     46        BlobManager blobManager = setupBlobManager(); 
     47        repository = new HBaseRepository(typeManager, idGenerator, wal, configuration, hbaseTableFactory, blobManager); 
     48        // Create a blobStoreAccessRegistry for testing purposes 
     49        testBlobStoreAccessRegistry = new BlobStoreAccessRegistry(blobManager); 
     50        testBlobStoreAccessRegistry.setBlobStoreAccessFactory(blobStoreAccessFactory); 
    5951    } 
    6052 
  • trunk/cr/repository/impl/src/test/java/org/lilyproject/repository/impl/test/HBaseRepositoryTest.java

    r4611 r4621  
    2323import org.junit.BeforeClass; 
    2424import org.junit.Test; 
     25import org.lilyproject.repository.api.BlobManager; 
    2526import org.lilyproject.repository.api.BlobStoreAccessFactory; 
    2627import org.lilyproject.repository.api.Record; 
    2728import org.lilyproject.repository.api.TypeManager; 
     29import org.lilyproject.repository.impl.BlobManagerImpl; 
    2830import org.lilyproject.repository.impl.DFSBlobStoreAccess; 
    2931import org.lilyproject.repository.impl.HBaseRepository; 
     
    5153        DFSBlobStoreAccess dfsBlobStoreAccess = new DFSBlobStoreAccess(HBASE_PROXY.getBlobFS(), new Path("/lily/blobs")); 
    5254        blobStoreAccessFactory = new SizeBasedBlobStoreAccessFactory(dfsBlobStoreAccess); 
     55        BlobManager blobManager = new BlobManagerImpl(hbaseTableFactory,blobStoreAccessFactory); 
    5356        setupWal(); 
    54         repository = new HBaseRepository(typeManager, idGenerator, blobStoreAccessFactory, wal, configuration, hbaseTableFactory); 
     57        repository = new HBaseRepository(typeManager, idGenerator, wal, configuration, hbaseTableFactory, blobManager); 
    5558        setupTypes(); 
    5659        setupMessageQueue(); 
  • trunk/cr/repository/impl/src/test/java/org/lilyproject/repository/impl/test/RemoteBlobStoreTest.java

    r4611 r4621  
    2121import org.apache.avro.ipc.NettyServer; 
    2222import org.apache.avro.ipc.Server; 
    23 import org.apache.hadoop.fs.Path; 
    2423import org.junit.After; 
    2524import org.junit.AfterClass; 
    2625import org.junit.Before; 
    2726import org.junit.BeforeClass; 
    28 import org.lilyproject.repository.api.BlobStoreAccess; 
     27import org.lilyproject.repository.api.BlobManager; 
    2928import org.lilyproject.repository.api.TypeManager; 
    3029import org.lilyproject.repository.avro.AvroConverter; 
     
    3231import org.lilyproject.repository.avro.AvroLilyImpl; 
    3332import org.lilyproject.repository.avro.LilySpecificResponder; 
    34 import org.lilyproject.repository.impl.DFSBlobStoreAccess; 
    35 import org.lilyproject.repository.impl.HBaseBlobStoreAccess; 
     33import org.lilyproject.repository.impl.BlobStoreAccessRegistry; 
    3634import org.lilyproject.repository.impl.HBaseRepository; 
    3735import org.lilyproject.repository.impl.HBaseTypeManager; 
    3836import org.lilyproject.repository.impl.IdGeneratorImpl; 
    39 import org.lilyproject.repository.impl.InlineBlobStoreAccess; 
    4037import org.lilyproject.repository.impl.RemoteRepository; 
    4138import org.lilyproject.repository.impl.RemoteTypeManager; 
    42 import org.lilyproject.repository.impl.SizeBasedBlobStoreAccessFactory; 
    43 import org.lilyproject.testfw.HBaseProxy; 
    4439import org.lilyproject.testfw.TestHelper; 
    4540import org.lilyproject.util.hbase.HBaseTableFactoryImpl; 
     
    4944public class RemoteBlobStoreTest extends AbstractBlobStoreTest { 
    5045 
    51     private final static HBaseProxy HBASE_PROXY = new HBaseProxy(); 
     46     
    5247    private static HBaseRepository serverRepository; 
    5348    private static Server lilyServer; 
     
    6459        hbaseTableFactory = new HBaseTableFactoryImpl(configuration); 
    6560        serverTypeManager = new HBaseTypeManager(idGenerator, configuration, zooKeeper, hbaseTableFactory); 
    66         BlobStoreAccess dfsBlobStoreAccess = new DFSBlobStoreAccess(HBASE_PROXY.getBlobFS(), new Path("/lily/blobs")); 
    67         BlobStoreAccess hbaseBlobStoreAccess = new HBaseBlobStoreAccess(configuration); 
    68         BlobStoreAccess inlineBlobStoreAccess = new InlineBlobStoreAccess();  
    69         SizeBasedBlobStoreAccessFactory blobStoreAccessFactory = new SizeBasedBlobStoreAccessFactory(dfsBlobStoreAccess); 
    70         blobStoreAccessFactory.addBlobStoreAccess(50, inlineBlobStoreAccess); 
    71         blobStoreAccessFactory.addBlobStoreAccess(1024, hbaseBlobStoreAccess); 
     61        BlobManager serverBlobManager = setupBlobManager(); 
    7262        setupWal(); 
    73         serverRepository = new HBaseRepository(serverTypeManager, idGenerator, blobStoreAccessFactory, wal, configuration, hbaseTableFactory); 
     63        serverRepository = new HBaseRepository(serverTypeManager, idGenerator, wal, configuration, hbaseTableFactory, serverBlobManager); 
     64     // Create a blobStoreAccessRegistry for testing purposes 
     65        testBlobStoreAccessRegistry = new BlobStoreAccessRegistry(serverBlobManager); 
     66        testBlobStoreAccessRegistry.setBlobStoreAccessFactory(blobStoreAccessFactory); 
    7467         
    7568        AvroConverter serverConverter = new AvroConverter(); 
     
    8275        typeManager = new RemoteTypeManager(new InetSocketAddress(lilyServer.getPort()), 
    8376                remoteConverter, idGenerator, zooKeeper); 
     77        BlobManager blobManager = setupBlobManager(); 
    8478        repository = new RemoteRepository(new InetSocketAddress(lilyServer.getPort()), remoteConverter, 
    85                 (RemoteTypeManager)typeManager, idGenerator, blobStoreAccessFactory); 
    86         repository.registerBlobStoreAccess(dfsBlobStoreAccess); 
    87         repository.registerBlobStoreAccess(hbaseBlobStoreAccess); 
    88         repository.registerBlobStoreAccess(inlineBlobStoreAccess); 
     79                (RemoteTypeManager)typeManager, idGenerator, blobManager); 
    8980        remoteConverter.setRepository(repository); 
    9081        ((RemoteTypeManager)typeManager).start(); 
  • trunk/cr/repository/impl/src/test/java/org/lilyproject/repository/impl/test/ValueTypeTest.java

    r4611 r4621  
    2323 
    2424import org.apache.hadoop.conf.Configuration; 
    25 import org.apache.hadoop.fs.Path; 
    2625import org.apache.hadoop.hbase.util.Bytes; 
    2726import org.joda.time.DateTime; 
     
    4342import org.lilyproject.repository.api.Scope; 
    4443import org.lilyproject.repository.impl.AbstractTypeManager; 
    45 import org.lilyproject.repository.impl.DFSBlobStoreAccess; 
     44import org.lilyproject.repository.impl.BlobManagerImpl; 
    4645import org.lilyproject.repository.impl.HBaseRepository; 
    4746import org.lilyproject.repository.impl.HBaseTypeManager; 
    4847import org.lilyproject.repository.impl.IdGeneratorImpl; 
     48import org.lilyproject.repository.impl.InlineBlobStoreAccess; 
    4949import org.lilyproject.repository.impl.SizeBasedBlobStoreAccessFactory; 
    5050import org.lilyproject.rowlog.api.RowLog; 
     
    8585        hbaseTableFactory = new HBaseTableFactoryImpl(HBASE_PROXY.getConf()); 
    8686        typeManager = new HBaseTypeManager(idGenerator, HBASE_PROXY.getConf(), zooKeeper, hbaseTableFactory); 
    87         DFSBlobStoreAccess dfsBlobStoreAccess = new DFSBlobStoreAccess(HBASE_PROXY.getBlobFS(), new Path("/lily/blobs")); 
    88         BlobStoreAccessFactory blobStoreAccessFactory = new SizeBasedBlobStoreAccessFactory(dfsBlobStoreAccess); 
    89         repository = new HBaseRepository(typeManager, idGenerator, blobStoreAccessFactory, initializeWal(HBASE_PROXY.getConf()), HBASE_PROXY.getConf(), hbaseTableFactory); 
    90  
     87        InlineBlobStoreAccess inlineBlobStoreAccess = new InlineBlobStoreAccess(); 
     88        BlobStoreAccessFactory blobStoreAccessFactory = new SizeBasedBlobStoreAccessFactory(inlineBlobStoreAccess); 
     89        repository = new HBaseRepository(typeManager, idGenerator, initializeWal(HBASE_PROXY.getConf()), HBASE_PROXY.getConf(), hbaseTableFactory, new BlobManagerImpl(hbaseTableFactory, blobStoreAccessFactory)); 
    9190    } 
    9291 
     
    174173        Blob blob2 = new Blob(Bytes.toBytes("anotherKey"), "image/jpeg", Long.MIN_VALUE, "images/image.jpg"); 
    175174        Blob blob3 = new Blob("text/plain", Long.valueOf(0), null); 
     175         
    176176        runValueTypeTests("blobTypeId", "BLOB", blob1, blob2, blob3); 
    177177    } 
  • trunk/global/hbase-util/src/main/java/org/lilyproject/util/hbase/HBaseTableFactory.java

    r4614 r4621  
    2323     */ 
    2424    TableConfig getTableConfig(byte[] tableName); 
    25  
     25     
    2626    /** 
    2727     * 
  • trunk/global/hbase-util/src/main/java/org/lilyproject/util/hbase/HBaseTableFactoryImpl.java

    r4614 r4621  
    2323import org.apache.commons.logging.LogFactory; 
    2424import org.apache.hadoop.conf.Configuration; 
    25 import org.apache.hadoop.hbase.*; 
     25import org.apache.hadoop.hbase.HTableDescriptor; 
     26import org.apache.hadoop.hbase.TableExistsException; 
     27import org.apache.hadoop.hbase.TableNotFoundException; 
    2628import org.apache.hadoop.hbase.client.HBaseAdmin; 
    2729import org.apache.hadoop.hbase.client.HTableInterface; 
     
    7577        return new LocalHTable(configuration, tableDescriptor.getName()); 
    7678    } 
     79     
    7780 
    7881    public TableConfig getTableConfig(byte[] tableName) { 
     
    8689        return splitKeys; 
    8790    } 
    88  
    8991} 
  • trunk/global/hbase-util/src/main/java/org/lilyproject/util/hbase/LilyHBaseSchema.java

    r4611 r4621  
    3838                false, true, HConstants.FOREVER, HColumnDescriptor.DEFAULT_BLOOMFILTER)); 
    3939    } 
     40     
     41    private static final HTableDescriptor blobIncubatorDescriptor; 
     42     
     43    static { 
     44        blobIncubatorDescriptor = new HTableDescriptor(Table.BLOBINCUBATOR.bytes); 
     45        blobIncubatorDescriptor.addFamily(new HColumnDescriptor(BlobIncubatorCf.REF.bytes)); 
     46    } 
    4047 
    4148    public static HTableInterface getRecordTable(HBaseTableFactory tableFactory) throws IOException { 
     
    4653        return tableFactory.getTable(typeTableDescriptor); 
    4754    } 
     55     
     56    public static HTableInterface getBlobIncubatorTable(HBaseTableFactory tableFactory) throws IOException { 
     57        return tableFactory.getTable(blobIncubatorDescriptor); 
     58    } 
    4859 
    4960    public static enum Table { 
    5061        RECORD("record"), 
    51         TYPE("type"); 
     62        TYPE("type"), 
     63        BLOBINCUBATOR("blobincubator"); 
    5264 
    5365        public final byte[] bytes; 
     
    139151        } 
    140152    } 
     153     
     154    /** 
     155     * Column families in the blob incubator table. 
     156     */ 
     157    public static enum BlobIncubatorCf { 
     158        REF("ref"); 
     159         
     160        public final byte[] bytes; 
     161        public final String name; 
     162 
     163        BlobIncubatorCf(String name) { 
     164            this.name = name; 
     165            this.bytes = Bytes.toBytes(name); 
     166        } 
     167    } 
     168     
     169    /** 
     170     * Columns in the blob incubator table. 
     171     */ 
     172    public static enum BlobIncubatorColumn { 
     173        RECORD("record"), FIELD("field"); 
     174         
     175        public final byte[] bytes; 
     176        public final String name; 
     177         
     178        BlobIncubatorColumn(String name) { 
     179            this.name = name; 
     180            this.bytes = Bytes.toBytes(name); 
     181        } 
     182    } 
    141183} 
Note: See TracChangeset for help on using the changeset viewer.