Changeset 4621
- Timestamp:
- 2011-02-03 08:35:21 (2 years ago)
- Location:
- trunk
- Files:
-
- 5 added
- 37 edited
-
cr/indexer/batchbuild/src/main/java/org/lilyproject/indexer/batchbuild/IndexingMapper.java (modified) (1 diff)
-
cr/indexer/engine/src/main/java/org/lilyproject/indexer/engine/ValueEvaluator.java (modified) (1 diff)
-
cr/indexer/engine/src/test/java/org/lilyproject/indexer/engine/test/IndexerTest.java (modified) (3 diffs)
-
cr/linkindex/src/test/java/org/lilyproject/linkindex/test/LinkIndexTest.java (modified) (4 diffs)
-
cr/process/client/src/main/java/org/lilyproject/client/LilyClient.java (modified) (6 diffs)
-
cr/process/server/repository-module/src/main/java/org/lilyproject/server/modules/repository/BlobStoreConfig.java (modified) (3 diffs)
-
cr/process/server/repository-module/src/main/kauri/spring/services.xml (modified) (2 diffs)
-
cr/process/server/rest-module/src/main/java/org/lilyproject/rest/BlobByVersionAndFieldResource.java (modified) (2 diffs)
-
cr/repository-api-tutorial/src/test/java/org/lilyproject/repository/api/tutorial/TutorialTest.java (modified) (5 diffs)
-
cr/repository/api/src/main/java/org/lilyproject/repository/api/BlobInputStream.java (added)
-
cr/repository/api/src/main/java/org/lilyproject/repository/api/BlobManager.java (added)
-
cr/repository/api/src/main/java/org/lilyproject/repository/api/BlobNotFoundException.java (modified) (2 diffs)
-
cr/repository/api/src/main/java/org/lilyproject/repository/api/BlobReference.java (added)
-
cr/repository/api/src/main/java/org/lilyproject/repository/api/BlobStoreAccess.java (modified) (2 diffs)
-
cr/repository/api/src/main/java/org/lilyproject/repository/api/RecordId.java (modified) (1 diff)
-
cr/repository/api/src/main/java/org/lilyproject/repository/api/Repository.java (modified) (1 diff)
-
cr/repository/api/src/main/java/org/lilyproject/repository/api/ValueType.java (modified) (2 diffs)
-
cr/repository/impl/src/main/avro/lily.avpr (modified) (2 diffs)
-
cr/repository/impl/src/main/java/org/lilyproject/repository/avro/AvroConverter.java (modified) (3 diffs)
-
cr/repository/impl/src/main/java/org/lilyproject/repository/avro/AvroLilyImpl.java (modified) (2 diffs)
-
cr/repository/impl/src/main/java/org/lilyproject/repository/impl/BlobManagerImpl.java (added)
-
cr/repository/impl/src/main/java/org/lilyproject/repository/impl/BlobStoreAccessRegistry.java (modified) (6 diffs)
-
cr/repository/impl/src/main/java/org/lilyproject/repository/impl/ContainsValueComparator.java (added)
-
cr/repository/impl/src/main/java/org/lilyproject/repository/impl/DFSBlobStoreAccess.java (modified) (2 diffs)
-
cr/repository/impl/src/main/java/org/lilyproject/repository/impl/HBaseBlobStoreAccess.java (modified) (2 diffs)
-
cr/repository/impl/src/main/java/org/lilyproject/repository/impl/HBaseRepository.java (modified) (26 diffs)
-
cr/repository/impl/src/main/java/org/lilyproject/repository/impl/InlineBlobStoreAccess.java (modified) (1 diff)
-
cr/repository/impl/src/main/java/org/lilyproject/repository/impl/NettyTransceiverFactory.java (modified) (1 diff)
-
cr/repository/impl/src/main/java/org/lilyproject/repository/impl/RemoteRepository.java (modified) (5 diffs)
-
cr/repository/impl/src/main/java/org/lilyproject/repository/impl/ValueTypeImpl.java (modified) (10 diffs)
-
cr/repository/impl/src/test/java/org/lilyproject/repository/impl/test/AbstractBlobStoreTest.java (modified) (4 diffs)
-
cr/repository/impl/src/test/java/org/lilyproject/repository/impl/test/AbstractRepositoryTest.java (modified) (2 diffs)
-
cr/repository/impl/src/test/java/org/lilyproject/repository/impl/test/AvroRepositoryTest.java (modified) (5 diffs)
-
cr/repository/impl/src/test/java/org/lilyproject/repository/impl/test/AvroTypeManagerFieldTypeTest.java (modified) (5 diffs)
-
cr/repository/impl/src/test/java/org/lilyproject/repository/impl/test/AvroTypeManagerRecordTypeTest.java (modified) (6 diffs)
-
cr/repository/impl/src/test/java/org/lilyproject/repository/impl/test/BlobStoreTest.java (modified) (3 diffs)
-
cr/repository/impl/src/test/java/org/lilyproject/repository/impl/test/HBaseRepositoryTest.java (modified) (2 diffs)
-
cr/repository/impl/src/test/java/org/lilyproject/repository/impl/test/RemoteBlobStoreTest.java (modified) (5 diffs)
-
cr/repository/impl/src/test/java/org/lilyproject/repository/impl/test/ValueTypeTest.java (modified) (4 diffs)
-
global/hbase-util/src/main/java/org/lilyproject/util/hbase/HBaseTableFactory.java (modified) (1 diff)
-
global/hbase-util/src/main/java/org/lilyproject/util/hbase/HBaseTableFactoryImpl.java (modified) (3 diffs)
-
global/hbase-util/src/main/java/org/lilyproject/util/hbase/LilyHBaseSchema.java (modified) (3 diffs)
Legend:
- Unmodified
- Added
- Removed
-
trunk/cr/indexer/batchbuild/src/main/java/org/lilyproject/indexer/batchbuild/IndexingMapper.java
r4611 r4621 84 84 TypeManager typeManager = new HBaseTypeManager(idGenerator, conf, zk, hbaseTableFactory); 85 85 86 BlobStoreAccessFactory blobStoreAccessFactory = LilyClient.getBlobStoreAccess(zk);87 88 86 RowLog wal = new DummyRowLog("The write ahead log should not be called from within MapReduce jobs."); 89 repository = new HBaseRepository(typeManager, idGenerator, blobStoreAccessFactory, wal, conf, hbaseTableFactory); 87 88 BlobManager blobManager = LilyClient.getBlobManager(zk); 89 repository = new HBaseRepository(typeManager, idGenerator, wal, conf, hbaseTableFactory, blobManager); 90 90 91 91 byte[] indexerConfBytes = Base64.decode(jobConf.get("org.lilyproject.indexer.batchbuild.indexerconf")); -
trunk/cr/indexer/engine/src/main/java/org/lilyproject/indexer/engine/ValueEvaluator.java
r4592 r4621 91 91 InputStream is = null; 92 92 try { 93 is = repository.getInputStream(blob);93 // is = repository.getInputStream(blob); 94 94 95 95 // TODO make write limit configurable -
trunk/cr/indexer/engine/src/test/java/org/lilyproject/indexer/engine/test/IndexerTest.java
r4611 r4621 17 17 18 18 import static org.junit.Assert.assertEquals; 19 20 import org.apache.hadoop.fs.Path;21 import org.lilyproject.indexer.engine.*;22 import org.lilyproject.indexer.model.indexerconf.IndexerConfBuilder; 23 import org.lilyproject.linkindex.LinkIndexUpdater;24 import org.lilyproject.rowlog.api.*;25 import org.lilyproject.rowlog.impl.*;26 import org.lilyproject.solrtestfw.SolrTestingUtility;27 import org.lilyproject.util.hbase.HBaseTableFactory;28 import org.lilyproject.util.hbase.HBaseTableFactoryImpl;29 import org.lilyproject.util.hbase.LilyHBaseSchema;30 import org.lilyproject.util.io.Closer;31 import org.lilyproject.util.repo.RecordEvent;19 import static org.lilyproject.util.repo.RecordEvent.Type.CREATE; 20 import static org.lilyproject.util.repo.RecordEvent.Type.DELETE; 21 import static org.lilyproject.util.repo.RecordEvent.Type.UPDATE; 22 23 import java.io.ByteArrayOutputStream; 24 import java.io.IOException; 25 import java.io.InputStream; 26 import java.io.OutputStream; 27 import java.util.Arrays; 28 import java.util.Collections; 29 import java.util.HashMap; 30 import java.util.List; 31 import java.util.Map; 32 32 33 33 import org.apache.commons.logging.Log; 34 34 import org.apache.commons.logging.LogFactory; 35 import org.apache.hadoop.fs.Path; 35 36 import org.apache.solr.client.solrj.SolrQuery; 36 37 import org.apache.solr.client.solrj.SolrServerException; … … 45 46 import org.junit.Test; 46 47 import org.lilyproject.hbaseindex.IndexManager; 48 import org.lilyproject.indexer.engine.IndexLocker; 49 import org.lilyproject.indexer.engine.IndexUpdater; 50 import org.lilyproject.indexer.engine.IndexUpdaterMetrics; 51 import org.lilyproject.indexer.engine.Indexer; 52 import org.lilyproject.indexer.engine.IndexerMetrics; 53 import org.lilyproject.indexer.engine.SolrServers; 47 54 import org.lilyproject.indexer.model.indexerconf.IndexerConf; 55 import org.lilyproject.indexer.model.indexerconf.IndexerConfBuilder; 48 56 import org.lilyproject.linkindex.LinkIndex; 49 import org.lilyproject.repository.api.*; 50 import org.lilyproject.repository.impl.*; 51 import org.lilyproject.util.repo.VersionTag; 57 import org.lilyproject.linkindex.LinkIndexUpdater; 58 import org.lilyproject.repository.api.Blob; 59 import org.lilyproject.repository.api.BlobManager; 60 import org.lilyproject.repository.api.BlobStoreAccess; 61 import org.lilyproject.repository.api.FieldType; 62 import org.lilyproject.repository.api.HierarchyPath; 63 import org.lilyproject.repository.api.IdGenerator; 64 import org.lilyproject.repository.api.Link; 65 import org.lilyproject.repository.api.QName; 66 import org.lilyproject.repository.api.Record; 67 import org.lilyproject.repository.api.RecordId; 68 import org.lilyproject.repository.api.RecordType; 69 import org.lilyproject.repository.api.Scope; 70 import org.lilyproject.repository.api.TypeManager; 71 import org.lilyproject.repository.api.ValueType; 72 import org.lilyproject.repository.impl.BlobManagerImpl; 73 import org.lilyproject.repository.impl.DFSBlobStoreAccess; 74 import org.lilyproject.repository.impl.HBaseRepository; 75 import org.lilyproject.repository.impl.HBaseTypeManager; 76 import org.lilyproject.repository.impl.IdGeneratorImpl; 77 import org.lilyproject.repository.impl.SizeBasedBlobStoreAccessFactory; 78 import org.lilyproject.rowlog.api.RowLog; 79 import org.lilyproject.rowlog.api.RowLogConfig; 80 import org.lilyproject.rowlog.api.RowLogConfigurationManager; 81 import org.lilyproject.rowlog.api.RowLogException; 82 import org.lilyproject.rowlog.api.RowLogMessage; 83 import org.lilyproject.rowlog.api.RowLogMessageListener; 84 import org.lilyproject.rowlog.api.RowLogMessageListenerMapping; 85 import org.lilyproject.rowlog.api.RowLogShard; 86 import org.lilyproject.rowlog.api.RowLogSubscription; 87 import org.lilyproject.rowlog.impl.RowLogConfigurationManagerImpl; 88 import org.lilyproject.rowlog.impl.RowLogImpl; 89 import org.lilyproject.rowlog.impl.RowLogShardImpl; 90 import org.lilyproject.solrtestfw.SolrTestingUtility; 52 91 import org.lilyproject.testfw.HBaseProxy; 53 92 import org.lilyproject.testfw.TestHelper; 93 import org.lilyproject.util.hbase.HBaseTableFactory; 94 import org.lilyproject.util.hbase.HBaseTableFactoryImpl; 95 import org.lilyproject.util.hbase.LilyHBaseSchema; 96 import org.lilyproject.util.hbase.LilyHBaseSchema.RecordCf; 97 import org.lilyproject.util.io.Closer; 98 import org.lilyproject.util.repo.RecordEvent; 99 import org.lilyproject.util.repo.VersionTag; 54 100 import org.lilyproject.util.zookeeper.ZkUtil; 55 101 import org.lilyproject.util.zookeeper.ZooKeeperItf; 56 57 import static org.lilyproject.util.repo.RecordEvent.Type.*;58 import static org.lilyproject.util.hbase.LilyHBaseSchema.*;59 60 import java.io.ByteArrayOutputStream;61 import java.io.IOException;62 import java.io.InputStream;63 import java.io.OutputStream;64 import java.util.*;65 102 66 103 // To run this test from an IDE, set a property solr.war pointing to the SOLR war … … 130 167 RowLogShard walShard = new RowLogShardImpl("WS1", HBASE_PROXY.getConf(), wal, 100); 131 168 wal.registerShard(walShard); 132 133 repository = new HBaseRepository(typeManager, idGenerator, blobStoreAccessFactory, wal, HBASE_PROXY.getConf(), hbaseTableFactory);169 BlobManager blobManager = new BlobManagerImpl(hbaseTableFactory, blobStoreAccessFactory); 170 repository = new HBaseRepository(typeManager, idGenerator, wal, HBASE_PROXY.getConf(), hbaseTableFactory, blobManager); 134 171 135 172 IndexManager.createIndexMetaTableIfNotExists(HBASE_PROXY.getConf()); -
trunk/cr/linkindex/src/test/java/org/lilyproject/linkindex/test/LinkIndexTest.java
r4611 r4621 32 32 import org.lilyproject.linkindex.LinkIndex; 33 33 import org.lilyproject.linkindex.LinkIndexUpdater; 34 import org.lilyproject.repository.api.BlobManager; 34 35 import org.lilyproject.repository.api.BlobStoreAccess; 35 36 import org.lilyproject.repository.api.FieldType; … … 42 43 import org.lilyproject.repository.api.Scope; 43 44 import org.lilyproject.repository.api.TypeManager; 45 import org.lilyproject.repository.impl.BlobManagerImpl; 44 46 import org.lilyproject.repository.impl.DFSBlobStoreAccess; 45 47 import org.lilyproject.repository.impl.HBaseRepository; … … 94 96 BlobStoreAccess dfsBlobStoreAccess = new DFSBlobStoreAccess(HBASE_PROXY.getBlobFS(), new Path("/lily/blobs")); 95 97 SizeBasedBlobStoreAccessFactory blobStoreAccessFactory = new SizeBasedBlobStoreAccessFactory(dfsBlobStoreAccess); 98 BlobManager blobManager = new BlobManagerImpl(hbaseTableFactory, blobStoreAccessFactory); 96 99 97 100 rowLogConfMgr = new RowLogConfigurationManagerImpl(zk); … … 103 106 RowLogShard walShard = new RowLogShardImpl("WS1", HBASE_PROXY.getConf(), wal, 100); 104 107 wal.registerShard(walShard); 105 106 repository = new HBaseRepository(typeManager, idGenerator, blobStoreAccessFactory, wal, HBASE_PROXY.getConf(), hbaseTableFactory); 108 repository = new HBaseRepository(typeManager, idGenerator, wal, HBASE_PROXY.getConf(), hbaseTableFactory, blobManager); 107 109 ids = repository.getIdGenerator(); 108 110 IndexManager indexManager = new IndexManager(HBASE_PROXY.getConf()); -
trunk/cr/process/client/src/main/java/org/lilyproject/client/LilyClient.java
r4614 r4621 20 20 import java.net.InetSocketAddress; 21 21 import java.net.URI; 22 import java.util.*; 22 import java.util.ArrayList; 23 import java.util.Collections; 24 import java.util.HashSet; 25 import java.util.Iterator; 26 import java.util.List; 27 import java.util.Set; 23 28 24 29 import org.apache.commons.logging.Log; … … 32 37 import org.apache.zookeeper.Watcher; 33 38 import org.apache.zookeeper.data.Stat; 39 import org.lilyproject.repository.api.BlobManager; 34 40 import org.lilyproject.repository.api.BlobStoreAccess; 35 import org.lilyproject.repository.api.BlobStoreAccessFactory;36 41 import org.lilyproject.repository.api.Repository; 37 42 import org.lilyproject.repository.avro.AvroConverter; 43 import org.lilyproject.repository.impl.BlobManagerImpl; 38 44 import org.lilyproject.repository.impl.DFSBlobStoreAccess; 39 45 import org.lilyproject.repository.impl.HBaseBlobStoreAccess; … … 43 49 import org.lilyproject.repository.impl.RemoteTypeManager; 44 50 import org.lilyproject.repository.impl.SizeBasedBlobStoreAccessFactory; 51 import org.lilyproject.util.hbase.HBaseTableFactory; 52 import org.lilyproject.util.hbase.HBaseTableFactoryImpl; 45 53 import org.lilyproject.util.io.Closer; 46 54 import org.lilyproject.util.repo.DfsUri; … … 156 164 remoteConverter, idGenerator, zk); 157 165 158 Blob StoreAccessFactory blobStoreAccessFactory = getBlobStoreAccess(zk);166 BlobManager blobManager = getBlobManager(zk); 159 167 160 168 Repository repository = new RemoteRepository(parseAddressAndPort(server.lilyAddressAndPort), 161 remoteConverter, typeManager, idGenerator, blob StoreAccessFactory);169 remoteConverter, typeManager, idGenerator, blobManager); 162 170 163 171 remoteConverter.setRepository(repository); … … 165 173 server.repository = repository; 166 174 } 167 168 public static Blob StoreAccessFactory getBlobStoreAccess(ZooKeeperItf zk) throws IOException {175 176 public static BlobManager getBlobManager(ZooKeeperItf zk) throws IOException { 169 177 Configuration configuration = HBaseConfiguration.create(); 170 178 configuration.set("hbase.zookeeper.quorum", getBlobHBaseZkQuorum(zk)); 171 179 configuration.set("hbase.zookeeper.property.clientPort", getBlobHBaseZkPort(zk)); 172 180 HBaseTableFactory hbaseTableFactory = new HBaseTableFactoryImpl(configuration); 181 173 182 URI dfsUri = getDfsUri(zk); 174 183 FileSystem fs = FileSystem.get(DfsUri.getBaseDfsUri(dfsUri), configuration); … … 181 190 blobStoreAccessFactory.addBlobStoreAccess(5000, inlineBlobStoreAccess); 182 191 blobStoreAccessFactory.addBlobStoreAccess(200000, hbaseBlobStoreAccess); 183 return blobStoreAccessFactory; 192 193 return new BlobManagerImpl(hbaseTableFactory, blobStoreAccessFactory); 184 194 } 185 195 -
trunk/cr/process/server/repository-module/src/main/java/org/lilyproject/server/modules/repository/BlobStoreConfig.java
r4611 r4621 22 22 import org.apache.hadoop.fs.FileSystem; 23 23 import org.apache.hadoop.fs.Path; 24 import org.lilyproject.repository.api.BlobManager; 24 25 import org.lilyproject.repository.api.BlobStoreAccess; 25 import org.lilyproject.repository. api.BlobStoreAccessFactory;26 import org.lilyproject.repository.impl.BlobManagerImpl; 26 27 import org.lilyproject.repository.impl.DFSBlobStoreAccess; 27 28 import org.lilyproject.repository.impl.HBaseBlobStoreAccess; … … 32 33 33 34 public class BlobStoreConfig { 34 static BlobStoreAccessFactory get(URI dfsUri, Configuration configuration, HBaseTableFactory tableFactory) 35 throws IOException { 36 35 static BlobManager get(URI dfsUri, Configuration configuration, HBaseTableFactory tableFactory) throws IOException { 37 36 FileSystem fs = FileSystem.get(DfsUri.getBaseDfsUri(dfsUri), configuration); 38 37 Path blobRootPath = new Path(DfsUri.getDfsPath(dfsUri)); … … 41 40 BlobStoreAccess hbaseBlobStoreAccess = new HBaseBlobStoreAccess(tableFactory); 42 41 BlobStoreAccess inlineBlobStoreAccess = new InlineBlobStoreAccess(); 43 SizeBasedBlobStoreAccessFactory factory = new SizeBasedBlobStoreAccessFactory(dfsBlobStoreAccess);44 factory.addBlobStoreAccess(5000, inlineBlobStoreAccess);45 factory.addBlobStoreAccess(200000, hbaseBlobStoreAccess);46 return factory;42 SizeBasedBlobStoreAccessFactory blobStoreAccessFactory = new SizeBasedBlobStoreAccessFactory(dfsBlobStoreAccess); 43 blobStoreAccessFactory.addBlobStoreAccess(5000, inlineBlobStoreAccess); 44 blobStoreAccessFactory.addBlobStoreAccess(200000, hbaseBlobStoreAccess); 45 return new BlobManagerImpl(tableFactory, blobStoreAccessFactory); 47 46 } 48 47 } -
trunk/cr/process/server/repository-module/src/main/kauri/spring/services.xml
r4611 r4621 74 74 75 75 <bean 76 id="blob StoreAccessFactory"76 id="blobManager" 77 77 class="org.lilyproject.server.modules.repository.BlobStoreConfig" 78 78 factory-method="get"> … … 89 89 <constructor-arg ref="typeManager"/> 90 90 <constructor-arg ref="idGenerator"/> 91 <constructor-arg ref="blobStoreAccessFactory"/>92 91 <constructor-arg ref="writeAheadLog"/> 93 92 <constructor-arg ref="hbaseConf"/> 94 93 <constructor-arg ref="hbaseTableFactory"/> 94 <constructor-arg ref="blobManager"/> 95 95 </bean> 96 96 -
trunk/cr/process/server/rest-module/src/main/java/org/lilyproject/rest/BlobByVersionAndFieldResource.java
r4592 r4621 44 44 protected static Response getBlob(String id, String version, String fieldName, UriInfo uriInfo, 45 45 final Repository repository) { 46 RecordId recordId = repository.getIdGenerator().fromString(id);46 final RecordId recordId = repository.getIdGenerator().fromString(id); 47 47 48 QName fieldQName = ResourceClassUtil.parseQName(fieldName, uriInfo.getQueryParameters());48 final QName fieldQName = ResourceClassUtil.parseQName(fieldName, uriInfo.getQueryParameters()); 49 49 50 Long v ersionNr = null;50 Long vNr = null; 51 51 if (version != null) { 52 v ersionNr = Long.parseLong(version);52 vNr = Long.parseLong(version); 53 53 } 54 final Long versionNr = vNr; 54 55 55 56 Record record; … … 73 74 InputStream is = null; 74 75 try { 75 is = repository.getInputStream( blob);76 is = repository.getInputStream(recordId, versionNr, fieldQName, null, null); 76 77 IOUtils.copyLarge(is, output); 77 78 } catch (BlobNotFoundException e) { -
trunk/cr/repository-api-tutorial/src/test/java/org/lilyproject/repository/api/tutorial/TutorialTest.java
r4611 r4621 33 33 import org.junit.Test; 34 34 import org.lilyproject.repository.api.Blob; 35 import org.lilyproject.repository.api.BlobManager; 35 36 import org.lilyproject.repository.api.FieldType; 36 37 import org.lilyproject.repository.api.IdGenerator; … … 43 44 import org.lilyproject.repository.api.TypeManager; 44 45 import org.lilyproject.repository.api.ValueType; 46 import org.lilyproject.repository.impl.BlobManagerImpl; 45 47 import org.lilyproject.repository.impl.DFSBlobStoreAccess; 46 48 import org.lilyproject.repository.impl.HBaseRepository; … … 85 87 private static HBaseTableFactory hbaseTableFactory; 86 88 89 87 90 @BeforeClass 88 91 public static void setUpBeforeClass() throws Exception { … … 101 104 SizeBasedBlobStoreAccessFactory blobStoreAccessFactory = new SizeBasedBlobStoreAccessFactory(dfsBlobStoreAccess); 102 105 blobStoreAccessFactory.addBlobStoreAccess(Long.MAX_VALUE, dfsBlobStoreAccess); 106 BlobManager blobManager = new BlobManagerImpl(hbaseTableFactory, blobStoreAccessFactory); 103 107 setupWal(); 104 repository = new HBaseRepository(typeManager, idGenerator, blobStoreAccessFactory, wal, configuration, hbaseTableFactory);108 repository = new HBaseRepository(typeManager, idGenerator, wal, configuration, hbaseTableFactory, blobManager); 105 109 106 110 } … … 294 298 InputStream is = null; 295 299 try { 296 is = repository.getInputStream( (Blob)record.getField(new QName(NS, "description")));300 is = repository.getInputStream(record.getId(), new QName(NS, "description")); 297 301 System.out.println("Data read from blob is:"); 298 302 Reader reader = new InputStreamReader(is, "UTF-8"); -
trunk/cr/repository/api/src/main/java/org/lilyproject/repository/api/BlobNotFoundException.java
r4592 r4621 17 17 18 18 public class BlobNotFoundException extends RepositoryException { 19 private final Blob blob;19 private Blob blob = null; 20 20 21 public BlobNotFoundException(Blob blob) { 21 public BlobNotFoundException(Blob blob, String message, Throwable cause) { 22 super(message, cause); 22 23 this.blob = blob; 23 24 } 24 25 26 public BlobNotFoundException(Blob blob, String message) { 27 super(message); 28 this.blob = blob; 29 } 30 31 public BlobNotFoundException(String message, Throwable cause) { 32 super(message, cause); 33 } 34 35 public BlobNotFoundException(String message) { 36 super(message); 37 } 38 25 39 public Blob getBlob() { 26 40 return blob; … … 29 43 @Override 30 44 public String getMessage() { 31 return "Blob <" + blob + "> could not be found."; 45 if (blob != null) 46 return "Blob <" + blob + "> could not be found." + super.getMessage(); 47 return super.getMessage(); 32 48 } 33 49 } -
trunk/cr/repository/api/src/main/java/org/lilyproject/repository/api/BlobStoreAccess.java
r4592 r4621 54 54 * @param key a unique key identifying the written bytes on the blobstore, see {@link #getOutputStream(Blob)} 55 55 * 56 * @return an InputStream from whi h a stream of bytes can be read56 * @return an InputStream from which a stream of bytes can be read 57 57 * @throws BlobException when an unexpected exception occurred (e.g. an IOException of the underlying blobstore) 58 58 */ … … 67 67 */ 68 68 void delete(byte[] key) throws BlobException; 69 70 boolean incubate(); 69 71 } -
trunk/cr/repository/api/src/main/java/org/lilyproject/repository/api/RecordId.java
r4592 r4621 65 65 SortedMap<String, String> getVariantProperties(); 66 66 67 int hashCode(); 68 67 69 boolean equals(Object obj); 68 70 } -
trunk/cr/repository/api/src/main/java/org/lilyproject/repository/api/Repository.java
r4592 r4621 241 241 OutputStream getOutputStream(Blob blob) throws BlobException, InterruptedException; 242 242 243 /** 244 * Returns an {@link InputStream} from which the binary data of a blob can 245 * be read. The value of blob is used to identify the underlying blobstore 246 * and actual data to return through this InputStream, see {@link #getOutputStream(Blob)}. 247 * 248 * @param blob the blob for which to open an InputStream 249 * @return an InputStream 250 * @throws BlobNotFoundException when the blob does not contain a valid key in its value 251 * @throws RepositoryException when an unexpected exception occurs 252 */ 253 InputStream getInputStream(Blob blob) throws BlobNotFoundException, BlobException, InterruptedException; 254 255 /** 256 * Deletes the data identified by a blob from the underlying blobstore. See {@link #getOutputStream(Blob)} and {@link #getInputStream(Blob)}. 257 * @param blob the blob to delete 258 * @throws BlobNotFoundException when the blob does not contain a valid key in its value 259 * @throws RepositoryException when an unexpected exception occurs 260 */ 261 void delete(Blob blob) throws BlobNotFoundException, BlobException, InterruptedException; 243 244 245 /** 246 * Returns an {@link InputStream} from which the binary data of a blob can be read. 247 * The blob can only be retrieved by referring to location of the blob in the record 248 * and field where it is used. 249 * @param recordId the id of the record containing the blob 250 * @param fieldName the QName of the field containing the blob 251 * @param version optionally a version of the record, if null the latest record version is used 252 * @param multivalueIndex the position of the blob in a multivalue field 253 * @param hierarchyIndex the position of the blob in a hierarchical field 254 * @return a BlobInputStream 255 * @throws BlobNotFoundException thrown when no blob can be found at the given location 256 * @throws BlobException thrown when opening an InputStream on the blob fails 257 */ 258 BlobInputStream getInputStream(RecordId recordId, Long version, QName fieldName, Integer multivalueIndex, Integer hierarchyIndex) throws BlobNotFoundException, BlobException, InterruptedException, RecordNotFoundException, RecordTypeNotFoundException, FieldTypeNotFoundException, RecordException, VersionNotFoundException, TypeException; 259 260 /** 261 * Shortcut getInputStream method where version, multivalueIndex and hierarchyIndex are set to null. 262 */ 263 BlobInputStream getInputStream(RecordId recordId, QName fieldName) throws BlobNotFoundException, BlobException, InterruptedException, RecordNotFoundException, RecordTypeNotFoundException, FieldTypeNotFoundException, RecordException, VersionNotFoundException, TypeException; 262 264 263 265 /** -
trunk/cr/repository/api/src/main/java/org/lilyproject/repository/api/ValueType.java
r4592 r4621 15 15 */ 16 16 package org.lilyproject.repository.api; 17 18 import java.util.Set; 17 19 18 20 /** … … 93 95 byte[] toBytes(); 94 96 97 /** 98 * Returns a set of all values contained in this value. 99 * It flattens out the aspects of multivalue and hierarchy values. 100 */ 101 Set<Object> getValues(Object value); 102 95 103 boolean equals(Object obj); 96 104 } -
trunk/cr/repository/impl/src/main/avro/lily.avpr
r4620 r4621 156 156 "type": "error", 157 157 "fields": [ 158 {"name": "blob", "type": "AvroBlob"} 158 {"name": "blob", "type": "AvroBlob"}, 159 {"name": "message", "type": ["string", "null"]}, 160 {"name": "remoteCauses", "type": ["null", {"type": "array", "items": "AvroExceptionCause"}]} 159 161 ] 160 162 }, … … 358 360 }, 359 361 360 "deleteBlob": {361 "request": [{"name": "blob", "type": "AvroBlob"}],362 "response": "null",363 "errors": ["AvroBlobNotFoundException", "AvroBlobException", "AvroGenericException", "AvroInterruptedException"]364 },365 366 362 "getVariants": { 367 363 "request": [{"name": "recordId", "type": "bytes"}], -
trunk/cr/repository/impl/src/main/java/org/lilyproject/repository/avro/AvroConverter.java
r4620 r4621 16 16 package org.lilyproject.repository.avro; 17 17 18 import java.io.IOException;19 18 import java.nio.ByteBuffer; 20 19 import java.util.ArrayList; … … 29 28 import org.apache.avro.AvroRemoteException; 30 29 import org.apache.avro.util.Utf8; 31 import org.lilyproject.repository.api.*; 30 import org.lilyproject.repository.api.Blob; 31 import org.lilyproject.repository.api.BlobException; 32 import org.lilyproject.repository.api.BlobNotFoundException; 33 import org.lilyproject.repository.api.FieldType; 34 import org.lilyproject.repository.api.FieldTypeEntry; 35 import org.lilyproject.repository.api.FieldTypeExistsException; 36 import org.lilyproject.repository.api.FieldTypeNotFoundException; 37 import org.lilyproject.repository.api.FieldTypeUpdateException; 38 import org.lilyproject.repository.api.IdGenerator; 39 import org.lilyproject.repository.api.IdRecord; 40 import org.lilyproject.repository.api.InvalidRecordException; 41 import org.lilyproject.repository.api.QName; 42 import org.lilyproject.repository.api.Record; 43 import org.lilyproject.repository.api.RecordException; 44 import org.lilyproject.repository.api.RecordExistsException; 45 import org.lilyproject.repository.api.RecordId; 46 import org.lilyproject.repository.api.RecordLockedException; 47 import org.lilyproject.repository.api.RecordNotFoundException; 48 import org.lilyproject.repository.api.RecordType; 49 import org.lilyproject.repository.api.RecordTypeExistsException; 50 import org.lilyproject.repository.api.RecordTypeNotFoundException; 51 import org.lilyproject.repository.api.RemoteException; 52 import org.lilyproject.repository.api.Repository; 53 import org.lilyproject.repository.api.RepositoryException; 54 import org.lilyproject.repository.api.Scope; 55 import org.lilyproject.repository.api.TypeException; 56 import org.lilyproject.repository.api.TypeManager; 57 import org.lilyproject.repository.api.ValueType; 58 import org.lilyproject.repository.api.VersionNotFoundException; 32 59 import org.lilyproject.repository.impl.IdRecordImpl; 33 60 … … 601 628 602 629 public BlobNotFoundException convert(AvroBlobNotFoundException avroException) { 603 BlobNotFoundException exception = new BlobNotFoundException(convert(avroException.blob)); 604 return exception; 630 try { 631 BlobNotFoundException exception = new BlobNotFoundException(convert(avroException.blob), convert(avroException.message)); 632 restoreCauses(avroException.remoteCauses, exception); 633 return exception; 634 } catch (Exception e) { 635 // TODO this is not good, exceptions should never fail to deserialize 636 throw new RuntimeException("Error deserializing exception.", e); 637 } 605 638 } 606 639 -
trunk/cr/repository/impl/src/main/java/org/lilyproject/repository/avro/AvroLilyImpl.java
r4592 r4621 20 20 import java.util.List; 21 21 22 import org.lilyproject.repository.api.*; 22 import org.lilyproject.repository.api.FieldTypeExistsException; 23 import org.lilyproject.repository.api.FieldTypeNotFoundException; 24 import org.lilyproject.repository.api.FieldTypeUpdateException; 25 import org.lilyproject.repository.api.InvalidRecordException; 26 import org.lilyproject.repository.api.QName; 27 import org.lilyproject.repository.api.RecordException; 28 import org.lilyproject.repository.api.RecordExistsException; 29 import org.lilyproject.repository.api.RecordLockedException; 30 import org.lilyproject.repository.api.RecordNotFoundException; 31 import org.lilyproject.repository.api.RecordTypeExistsException; 32 import org.lilyproject.repository.api.RecordTypeNotFoundException; 33 import org.lilyproject.repository.api.Repository; 34 import org.lilyproject.repository.api.RepositoryException; 35 import org.lilyproject.repository.api.TypeException; 36 import org.lilyproject.repository.api.TypeManager; 37 import org.lilyproject.repository.api.VersionNotFoundException; 23 38 24 39 public class AvroLilyImpl implements AvroLily { … … 364 379 } 365 380 } 366 367 public Void deleteBlob(AvroBlob avroBlob) throws AvroBlobNotFoundException, AvroBlobException,368 AvroInterruptedException {369 try {370 repository.delete(converter.convert(avroBlob));371 } catch (BlobNotFoundException e) {372 throw converter.convert(e);373 } catch (BlobException e) {374 throw converter.convert(e);375 } catch (InterruptedException e) {376 throw converter.convert(e);377 }378 return null;379 }380 381 381 } -
trunk/cr/repository/impl/src/main/java/org/lilyproject/repository/impl/BlobStoreAccessRegistry.java
r4592 r4621 18 18 import java.io.FilterOutputStream; 19 19 import java.io.IOException; 20 import java.io.InputStream;21 20 import java.io.OutputStream; 22 21 import java.util.HashMap; … … 24 23 25 24 import org.apache.hadoop.hbase.util.Bytes; 26 import org.lilyproject.repository.api.*; 25 import org.lilyproject.repository.api.Blob; 26 import org.lilyproject.repository.api.BlobException; 27 import org.lilyproject.repository.api.BlobInputStream; 28 import org.lilyproject.repository.api.BlobManager; 27 29 import org.lilyproject.repository.api.BlobNotFoundException; 30 import org.lilyproject.repository.api.BlobStoreAccess; 31 import org.lilyproject.repository.api.BlobStoreAccessFactory; 28 32 import org.lilyproject.util.Pair; 29 33 … … 32 36 Map<String, BlobStoreAccess> registry = new HashMap<String, BlobStoreAccess>(); 33 37 private BlobStoreAccessFactory blobStoreAccessFactory; 38 private final BlobManager blobManager; 34 39 35 public BlobStoreAccessRegistry() { 40 public BlobStoreAccessRegistry(BlobManager blobManager) { 41 this.blobManager = blobManager; 36 42 } 37 43 … … 49 55 public OutputStream getOutputStream(Blob blob) throws BlobException { 50 56 BlobStoreAccess blobStoreAccess = blobStoreAccessFactory.get(blob); 51 return new BlobOutputStream(blobStoreAccess.getOutputStream(blob), blobStoreAccess.getId(), blob );57 return new BlobOutputStream(blobStoreAccess.getOutputStream(blob), blobStoreAccess.getId(), blob, blobManager, blobStoreAccess.incubate()); 52 58 } 53 59 54 public InputStream getInputStream(Blob blob) throws BlobNotFoundException, BlobException {60 public BlobInputStream getInputStream(Blob blob) throws BlobNotFoundException, BlobException { 55 61 Pair<String, byte[]> decodedKey = decodeKey(blob); 56 62 BlobStoreAccess blobStoreAccess = registry.get(decodedKey.getV1()); 57 return blobStoreAccess.getInputStream(decodedKey.getV2());63 return new BlobInputStream(blobStoreAccess.getInputStream(decodedKey.getV2()), blob); 58 64 } 59 65 60 66 private Pair<String, byte[]> decodeKey(Blob blob) throws BlobNotFoundException, BlobException { 61 67 if (blob.getValue() == null) { 62 throw new BlobNotFoundException(blob );68 throw new BlobNotFoundException(blob, "Blob has no reference to a blob in the blobstore", null); 63 69 } 64 70 Pair<String, byte[]> decodedKey; … … 97 103 private final Blob blob; 98 104 private final String blobStoreAccessId; 105 private final BlobManager blobManager; 106 private final boolean incubate; 99 107 100 public BlobOutputStream(OutputStream outputStream, String blobStoreAccessId, Blob blob ) {108 public BlobOutputStream(OutputStream outputStream, String blobStoreAccessId, Blob blob, BlobManager blobManager, boolean incubate) { 101 109 super(outputStream); 102 110 this.blobStoreAccessId = blobStoreAccessId; 103 111 this.blob = blob; 112 this.blobManager = blobManager; 113 this.incubate = incubate; 104 114 } 105 115 … … 107 117 public void close() throws IOException { 108 118 super.close(); 109 blob.setValue(encode(blobStoreAccessId, blob.getValue())); 119 byte[] encodedBlobKey = encode(blobStoreAccessId, blob.getValue()); 120 if (incubate) { 121 blobManager.incubateBlob(encodedBlobKey); 122 } 123 blob.setValue(encodedBlobKey); 110 124 } 111 125 } -
trunk/cr/repository/impl/src/main/java/org/lilyproject/repository/impl/DFSBlobStoreAccess.java
r4592 r4621 86 86 UUID uuid = decode(blobKey); 87 87 try { 88 fileSystem.delete( new Path(uuid.toString()), false);88 fileSystem.delete(createPath(uuid), false); 89 89 } catch (IOException e) { 90 90 throw new BlobException("Failed to delete blob with key <" +blobKey+ "> from the DFS blobstore", e); 91 91 } 92 } 93 94 public boolean incubate() { 95 return true; 92 96 } 93 97 … … 111 115 } 112 116 } 117 113 118 } -
trunk/cr/repository/impl/src/main/java/org/lilyproject/repository/impl/HBaseBlobStoreAccess.java
r4614 r4621 85 85 } 86 86 byte[] value = result.getValue(BLOBS_COLUMN_FAMILY_BYTES, BLOB_COLUMN); 87 if (value == null) { 88 throw new BlobException("Failed to open an inputstream for blobkey <" + blobKey + "> since no blob was found on the HBASE blobstore"); 89 } 87 90 return new ByteArrayInputStream(value); 88 91 } … … 95 98 throw new BlobException("Failed to delete blob with key <" + blobKey + "> from the DFS blobstore", e); 96 99 } 100 } 101 102 public boolean incubate() { 103 return true; 97 104 } 98 105 -
trunk/cr/repository/impl/src/main/java/org/lilyproject/repository/impl/HBaseRepository.java
r4620 r4621 20 20 21 21 import java.io.IOException; 22 import java.io.InputStream;23 22 import java.io.OutputStream; 24 23 import java.util.ArrayList; … … 36 35 import org.apache.commons.logging.LogFactory; 37 36 import org.apache.hadoop.conf.Configuration; 37 import org.apache.hadoop.hbase.KeyValue; 38 38 import org.apache.hadoop.hbase.client.Get; 39 39 import org.apache.hadoop.hbase.client.HTableInterface; … … 44 44 import org.apache.hadoop.hbase.client.Scan; 45 45 import org.apache.hadoop.hbase.filter.CompareFilter; 46 import org.apache.hadoop.hbase.filter.Filter; 46 47 import org.apache.hadoop.hbase.filter.FilterList; 47 48 import org.apache.hadoop.hbase.filter.PrefixFilter; 48 49 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 50 import org.apache.hadoop.hbase.filter.ValueFilter; 51 import org.apache.hadoop.hbase.filter.WritableByteArrayComparable; 52 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; 49 53 import org.apache.hadoop.hbase.util.Bytes; 50 import org.lilyproject.repository.api.*; 54 import org.lilyproject.repository.api.Blob; 55 import org.lilyproject.repository.api.BlobException; 56 import org.lilyproject.repository.api.BlobInputStream; 57 import org.lilyproject.repository.api.BlobManager; 58 import org.lilyproject.repository.api.BlobNotFoundException; 59 import org.lilyproject.repository.api.BlobReference; 60 import org.lilyproject.repository.api.BlobStoreAccess; 61 import org.lilyproject.repository.api.FieldNotFoundException; 62 import org.lilyproject.repository.api.FieldType; 63 import org.lilyproject.repository.api.FieldTypeEntry; 64 import org.lilyproject.repository.api.FieldTypeNotFoundException; 65 import org.lilyproject.repository.api.IdGenerator; 66 import org.lilyproject.repository.api.IdRecord; 67 import org.lilyproject.repository.api.InvalidRecordException; 68 import org.lilyproject.repository.api.PrimitiveValueType; 69 import org.lilyproject.repository.api.QName; 70 import org.lilyproject.repository.api.Record; 71 import org.lilyproject.repository.api.RecordException; 72 import org.lilyproject.repository.api.RecordExistsException; 73 import org.lilyproject.repository.api.RecordId; 74 import org.lilyproject.repository.api.RecordLockedException; 75 import org.lilyproject.repository.api.RecordNotFoundException; 76 import org.lilyproject.repository.api.RecordType; 77 import org.lilyproject.repository.api.RecordTypeNotFoundException; 78 import org.lilyproject.repository.api.Repository; 79 import org.lilyproject.repository.api.RepositoryException; 80 import org.lilyproject.repository.api.ResponseStatus; 81 import org.lilyproject.repository.api.Scope; 82 import org.lilyproject.repository.api.TypeException; 83 import org.lilyproject.repository.api.TypeManager; 84 import org.lilyproject.repository.api.ValueType; 85 import org.lilyproject.repository.api.VersionNotFoundException; 51 86 import org.lilyproject.repository.impl.RepositoryMetrics.Action; 52 87 import org.lilyproject.repository.impl.RepositoryMetrics.HBaseAction; 53 88 import org.lilyproject.repository.impl.lock.RowLocker; 89 import org.lilyproject.repository.impl.primitivevaluetype.BlobValueType; 54 90 import org.lilyproject.rowlog.api.RowLog; 55 91 import org.lilyproject.rowlog.api.RowLogException; … … 72 108 public class HBaseRepository implements Repository { 73 109 74 75 110 private HTableInterface recordTable; 76 111 private final TypeManager typeManager; … … 80 115 private Map<Scope, byte[]> recordTypeIdColumnNames = new HashMap<Scope, byte[]>(); 81 116 private Map<Scope, byte[]> recordTypeVersionColumnNames = new HashMap<Scope, byte[]>(); 82 private BlobStoreAccessRegistry blobStoreAccessRegistry;83 117 private RowLog wal; 84 118 private RowLocker rowLocker; … … 86 120 private Log log = LogFactory.getLog(getClass()); 87 121 private RepositoryMetrics metrics; 88 private final HBaseTableFactory hbaseTableFactory;122 private BlobManager blobManager; 89 123 90 124 public HBaseRepository(TypeManager typeManager, IdGenerator idGenerator, 91 BlobStoreAccessFactory blobStoreAccessFactory, RowLog wal, Configuration configuration, HBaseTableFactory hbaseTableFactory) throws IOException {125 RowLog wal, Configuration configuration, HBaseTableFactory hbaseTableFactory, BlobManager blobManager) throws IOException { 92 126 this.typeManager = typeManager; 93 127 this.idGenerator = idGenerator; 94 128 this.wal = wal; 95 this.hbaseTableFactory = hbaseTableFactory; 96 blobStoreAccessRegistry = new BlobStoreAccessRegistry(); 97 blobStoreAccessRegistry.setBlobStoreAccessFactory(blobStoreAccessFactory); 129 this.blobManager = blobManager; 130 98 131 recordTable = LilyHBaseSchema.getRecordTable(hbaseTableFactory); 99 132 … … 231 264 RecordEvent recordEvent = new RecordEvent(); 232 265 recordEvent.setType(Type.CREATE); 233 calculateRecordChanges(newRecord, dummyOriginalRecord, version, put, recordEvent, false); 266 Set<BlobReference> referencedBlobs = new HashSet<BlobReference>(); 267 Set<BlobReference> unReferencedBlobs = new HashSet<BlobReference>(); 268 269 calculateRecordChanges(newRecord, dummyOriginalRecord, version, put, recordEvent, referencedBlobs, unReferencedBlobs, false); 270 234 271 // Make sure the record type changed flag stays false for a newly 235 272 // created record 236 273 recordEvent.setRecordTypeChanged(false); 237 if (newRecord.getVersion() != null) 238 recordEvent.setVersionCreated(newRecord.getVersion()); 239 274 Long newVersion = newRecord.getVersion(); 275 if (newVersion != null) 276 recordEvent.setVersionCreated(newVersion); 277 278 // Reserve blobs so no other records can use them 279 reserveBlobs(null, referencedBlobs); 280 240 281 walMessage = wal.putMessage(recordId.toBytes(), null, recordEvent.toJsonBytes(), put); 241 282 long beforeHbase = System.currentTimeMillis(); 242 283 recordTable.put(put); 243 284 metrics.reportHBase(HBaseAction.PUT, System.currentTimeMillis()-beforeHbase); 285 286 // Remove the used blobs from the blobIncubator 287 blobManager.handleBlobReferences(recordId, referencedBlobs, unReferencedBlobs); 244 288 245 289 // Take Custom RowLock before releasing the HBase RowLock … … 258 302 Thread.currentThread().interrupt(); 259 303 throw new RecordException("Exception occurred while creating record <" + recordId + "> in HBase table", 304 e); 305 } catch (BlobException e) { 306 throw new RecordException("Exception occurred while creating record <" + recordId + ">", 260 307 e); 261 308 } finally { … … 284 331 285 332 newRecord.setResponseStatus(ResponseStatus.CREATED); 333 334 335 286 336 return newRecord; 287 337 } finally { … … 321 371 322 372 if (updateVersion) { 323 return updateMutableFields(record, useLatestRecordType); 373 try { 374 return updateMutableFields(record, useLatestRecordType); 375 } catch (BlobException e) { 376 throw new RecordException("Exception occurred while updating record <" + record.getId() + ">", 377 e); 378 } 324 379 } else { 325 380 return updateRecord(record, useLatestRecordType); … … 350 405 351 406 Put put = new Put(newRecord.getId().toBytes()); 407 Set<BlobReference> referencedBlobs = new HashSet<BlobReference>(); 408 Set<BlobReference> unReferencedBlobs = new HashSet<BlobReference>(); 352 409 RecordEvent recordEvent = new RecordEvent(); 353 410 recordEvent.setType(Type.UPDATE); 354 411 long newVersion = originalRecord.getVersion() == null ? 1 : originalRecord.getVersion() + 1; 355 if (calculateRecordChanges(newRecord, originalRecord, newVersion, put, recordEvent, useLatestRecordType)) { 356 putMessageOnWalAndProcess(recordId, rowLock, put, recordEvent); 357 newRecord.setResponseStatus(ResponseStatus.UPDATED); 358 } else { 359 newRecord.setResponseStatus(ResponseStatus.UP_TO_DATE); 360 } 412 413 if (calculateRecordChanges(newRecord, originalRecord, newVersion, put, recordEvent, referencedBlobs, unReferencedBlobs, useLatestRecordType)) { 414 // Reserve blobs so no other records can use them 415 reserveBlobs(record.getId(), referencedBlobs); 416 putMessageOnWalAndProcess(recordId, rowLock, put, recordEvent); 417 // Remove the used blobs from the blobIncubator and delete unreferenced blobs from the blobstore 418 blobManager.handleBlobReferences(recordId, referencedBlobs, unReferencedBlobs); 419 newRecord.setResponseStatus(ResponseStatus.UPDATED); 420 } else { 421 newRecord.setResponseStatus(ResponseStatus.UP_TO_DATE); 422 } 361 423 } catch (RowLogException e) { 362 424 throw new RecordException("Exception occurred while putting updated record <" + recordId … … 370 432 throw new RecordException("Exception occurred while updating record <" + recordId + "> on HBase table", 371 433 e); 434 } catch (BlobException e) { 435 throw new RecordException("Exception occurred while putting updated record <" + recordId 436 + "> on HBase table", e); 372 437 } finally { 373 438 unlockRow(rowLock); … … 397 462 // this information on the Put object and the RecordEvent 398 463 private boolean calculateRecordChanges(Record record, Record originalRecord, Long version, Put put, 399 RecordEvent recordEvent, boolean useLatestRecordType) throws RecordTypeNotFoundException, FieldTypeNotFoundException, 400 RecordException, TypeException, InvalidRecordException, InterruptedException { 464 RecordEvent recordEvent, Set<BlobReference> referencedBlobs, Set<BlobReference> unReferencedBlobs, boolean useLatestRecordType) throws InterruptedException, RecordTypeNotFoundException, TypeException, FieldTypeNotFoundException, BlobException, RecordException, InvalidRecordException { 401 465 QName recordTypeName = record.getRecordTypeName(); 402 466 Long recordTypeVersion = null; … … 410 474 411 475 // Check which fields have changed 412 Set<Scope> changedScopes = calculateChangedFields(record, originalRecord, recordType, version, put, recordEvent );476 Set<Scope> changedScopes = calculateChangedFields(record, originalRecord, recordType, version, put, recordEvent, referencedBlobs, unReferencedBlobs); 413 477 414 478 // If no versioned fields have changed, keep the original version … … 446 510 447 511 } 512 448 513 // Always set the version on the record. If no fields were changed this 449 514 // will give the latest version in the repository 450 515 record.setVersion(version); 451 516 452 517 if (versionedFieldsHaveChanged) { 453 518 recordEvent.setVersionCreated(version); 454 519 } 455 520 456 521 // Clear the list of deleted fields, as this is typically what the user will expect when using the 457 522 // record object for future updates. … … 486 551 487 552 private Set<Scope> calculateChangedFields(Record record, Record originalRecord, RecordType recordType, 488 Long version, Put put, RecordEvent recordEvent) throws FieldTypeNotFoundException, 489 RecordTypeNotFoundException, RecordException, TypeException, InterruptedException { 553 Long version, Put put, RecordEvent recordEvent, Set<BlobReference> referencedBlobs, Set<BlobReference> unReferencedBlobs) throws InterruptedException, FieldTypeNotFoundException, TypeException, BlobException, RecordTypeNotFoundException, RecordException { 490 554 Map<QName, Object> originalFields = originalRecord.getFields(); 491 555 Set<Scope> changedScopes = new HashSet<Scope>(); … … 493 557 Map<QName, Object> fields = getFieldsToUpdate(record); 494 558 495 changedScopes.addAll(calculateUpdateFields(fields, originalFields, null, version, put, recordEvent)); 496 559 changedScopes.addAll(calculateUpdateFields(fields, originalFields, null, version, put, recordEvent, referencedBlobs, unReferencedBlobs, false)); 560 for (BlobReference referencedBlob : referencedBlobs) { 561 referencedBlob.setRecordId(record.getId()); 562 } 563 for (BlobReference unReferencedBlob : unReferencedBlobs) { 564 unReferencedBlob.setRecordId(record.getId()); 565 } 497 566 // Update record types 498 567 for (Scope scope : changedScopes) { … … 525 594 526 595 private Set<Scope> calculateUpdateFields(Map<QName, Object> fields, Map<QName, Object> originalFields, Map<QName, Object> originalNextFields, 527 Long version, Put put, RecordEvent recordEvent) throws FieldTypeNotFoundException, 528 RecordTypeNotFoundException, RecordException, TypeException, InterruptedException { 596 Long version, Put put, RecordEvent recordEvent, Set<BlobReference> referencedBlobs, Set<BlobReference> unReferencedBlobs, boolean mutableUpdate) throws InterruptedException, FieldTypeNotFoundException, TypeException, BlobException, RecordTypeNotFoundException, RecordException { 529 597 Set<Scope> changedScopes = new HashSet<Scope>(); 530 598 for (Entry<QName, Object> field : fields.entrySet()) { … … 540 608 Scope scope = fieldType.getScope(); 541 609 byte[] fieldIdAsBytes = fieldType.getIdBytes(); 610 611 // Check if the newValue contains blobs 612 Set<BlobReference> newReferencedBlobs = getReferencedBlobs(fieldType, newValue); 613 referencedBlobs.addAll(newReferencedBlobs); 614 542 615 byte[] encodedFieldValue = encodeFieldValue(fieldType, newValue); 543 616 617 // Check if the previousValue contained blobs which should be deleted since they are no longer used 618 // In case of a mutable update, it is checked later if no other versions use the blob before deciding to delete it 619 if (Scope.NON_VERSIONED.equals(scope) || (mutableUpdate && Scope.VERSIONED_MUTABLE.equals(scope))) { 620 if (originalValue != null) { 621 Set<BlobReference> previouslyReferencedBlobs = getReferencedBlobs(fieldType, originalValue); 622 previouslyReferencedBlobs.removeAll(newReferencedBlobs); 623 unReferencedBlobs.addAll(previouslyReferencedBlobs); 624 } 625 } 626 627 // Set the value 544 628 if (Scope.NON_VERSIONED.equals(scope)) { 545 629 put.add(columnFamily, fieldIdAsBytes, 1L, encodedFieldValue); … … 558 642 return changedScopes; 559 643 } 560 561 private boolean updateNeeded(Object newValue, boolean fieldIsNewOrDeleted, Object originalValue) {562 return (!(((newValue == null) && (originalValue == null)) || (isDeleteMarker(newValue) && fieldIsNewOrDeleted) || (newValue.equals(originalValue))));563 }564 644 565 645 private byte[] encodeFieldValue(FieldType fieldType, Object fieldValue) throws FieldTypeNotFoundException, … … 569 649 ValueType valueType = fieldType.getValueType(); 570 650 571 // TODO validate with Class#isAssignableFrom()572 651 byte[] encodedFieldValue = valueType.toBytes(fieldValue); 573 652 encodedFieldValue = EncodingUtil.prefixValue(encodedFieldValue, EXISTS_FLAG); … … 581 660 private Record updateMutableFields(Record record, boolean latestRecordType) throws InvalidRecordException, 582 661 RecordNotFoundException, RecordTypeNotFoundException, FieldTypeNotFoundException, RecordException, 583 VersionNotFoundException, TypeException, RecordLockedException {662 VersionNotFoundException, TypeException, RecordLockedException, BlobException { 584 663 585 664 Record newRecord = record.clone(); … … 598 677 rowLock = lockRow(recordId); 599 678 600 Record originalRecord = read(record .getId(), version, null, new ReadContext());679 Record originalRecord = read(recordId, version, null, new ReadContext()); 601 680 602 681 // Update the mutable fields 603 Put put = new Put(record .getId().toBytes());682 Put put = new Put(recordId.toBytes()); 604 683 Map<QName, Object> fields = getFieldsToUpdate(record); 605 684 fields = filterMutableFields(fields); 606 685 Map<QName, Object> originalFields = filterMutableFields(originalRecord.getFields()); 686 Set<BlobReference> referencedBlobs = new HashSet<BlobReference>(); 687 Set<BlobReference> unReferencedBlobs = new HashSet<BlobReference>(); 607 688 RecordEvent recordEvent = new RecordEvent(); 608 689 recordEvent.setType(Type.UPDATE); 609 690 recordEvent.setVersionUpdated(version); 610 691 611 Set<Scope> changedScopes = calculateUpdateFields(fields, originalFields, getOriginalNextFields(recordId, version), version, put, recordEvent); 612 692 Set<Scope> changedScopes = calculateUpdateFields(fields, originalFields, getOriginalNextFields(recordId, version), version, put, recordEvent, referencedBlobs, unReferencedBlobs, true); 693 for (BlobReference referencedBlob : referencedBlobs) { 694 referencedBlob.setRecordId(recordId); 695 } 696 for (BlobReference unReferencedBlob : unReferencedBlobs) { 697 unReferencedBlob.setRecordId(recordId); 698 } 699 613 700 if (!changedScopes.isEmpty()) { 614 701 Long recordTypeVersion = latestRecordType ? null : record.getRecordTypeVersion(); … … 628 715 recordEvent.setVersionUpdated(version); 629 716 717 // Reserve blobs so no other records can use them 718 reserveBlobs(record.getId(), referencedBlobs); 719 630 720 putMessageOnWalAndProcess(recordId, rowLock, put, recordEvent); 721 722 // The unReferencedBlobs could still be in use in another version of the mutable field, 723 // therefore we filter them first 724 unReferencedBlobs = filterReferencedBlobs(recordId, unReferencedBlobs, version); 725 726 // Remove the used blobs from the blobIncubator 727 blobManager.handleBlobReferences(recordId, referencedBlobs, unReferencedBlobs); 631 728 } 632 729 } catch (RowLogException e) { … … 1018 1115 RecordEvent recordEvent = new RecordEvent(); 1019 1116 recordEvent.setType(Type.DELETE); 1020 1021 1117 RowLogMessage walMessage = wal.putMessage(recordId.toBytes(), null, recordEvent.toJsonBytes(), put); 1022 1118 if (!rowLocker.put(put, rowLock)) { … … 1077 1173 1078 1174 public void registerBlobStoreAccess(BlobStoreAccess blobStoreAccess) { 1079 blob StoreAccessRegistry.register(blobStoreAccess);1175 blobManager.register(blobStoreAccess); 1080 1176 } 1081 1177 1082 1178 public OutputStream getOutputStream(Blob blob) throws BlobException { 1083 return blobStoreAccessRegistry.getOutputStream(blob); 1084 } 1085 1086 public InputStream getInputStream(Blob blob) throws BlobNotFoundException, BlobException { 1087 return blobStoreAccessRegistry.getInputStream(blob); 1088 } 1089 1090 public void delete(Blob blob) throws BlobNotFoundException, BlobException { 1091 blobStoreAccessRegistry.delete(blob); 1179 return blobManager.getOutputStream(blob); 1180 } 1181 1182 public BlobInputStream getInputStream(RecordId recordId, Long version, QName fieldName, Integer multivalueIndex, Integer hierarchyIndex) throws BlobNotFoundException, BlobException, RecordNotFoundException, RecordTypeNotFoundException, FieldTypeNotFoundException, RecordException, VersionNotFoundException, TypeException, InterruptedException { 1183 Record record = read(recordId, version, Arrays.asList(new QName[]{fieldName})); 1184 FieldType fieldType = typeManager.getFieldTypeByName(fieldName); 1185 return blobManager.getInputStream(record, fieldName, multivalueIndex, hierarchyIndex, fieldType); 1186 } 1187 1188 public BlobInputStream getInputStream(RecordId recordId, QName fieldName) throws BlobNotFoundException, BlobException, RecordNotFoundException, RecordTypeNotFoundException, FieldTypeNotFoundException, RecordException, VersionNotFoundException, TypeException, InterruptedException { 1189 return getInputStream(recordId, null, fieldName, null, null); 1190 } 1191 1192 private Set<BlobReference> getReferencedBlobs(FieldTypeImpl fieldType, Object value) throws BlobException { 1193 fieldType.getValueType().getValues(value); 1194 HashSet<BlobReference> referencedBlobs = new HashSet<BlobReference>(); 1195 ValueType valueType = fieldType.getValueType(); 1196 PrimitiveValueType primitiveValueType = valueType.getPrimitive(); 1197 if ((primitiveValueType instanceof BlobValueType) && ! isDeleteMarker(value)) { 1198 Set<Object> values = valueType.getValues(value); 1199 for (Object object : values) { 1200 referencedBlobs.add(new BlobReference((Blob)object, null, fieldType)); 1201 } 1202 } 1203 return referencedBlobs; 1204 } 1205 1206 private void reserveBlobs(RecordId recordId, Set<BlobReference> referencedBlobs) throws IOException, 1207 InvalidRecordException { 1208 if (!referencedBlobs.isEmpty()) { 1209 // Check if the blob is newly uploaded 1210 Set<BlobReference> failedReservations = blobManager.reserveBlobs(referencedBlobs); 1211 // If not, filter those that are already used by the record 1212 failedReservations = filterReferencedBlobs(recordId, failedReservations, null); 1213 if (!failedReservations.isEmpty()) 1214 { 1215 throw new InvalidRecordException("Record references blobs which are not available for use", recordId); 1216 } 1217 } 1218 } 1219 1220 // Checks the set of blobs and returns a subset of those blobs which are not referenced anymore 1221 private Set<BlobReference> filterReferencedBlobs(RecordId recordId, Set<BlobReference> blobs, Long ignoreVersion) throws IOException { 1222 if (recordId == null) 1223 return blobs; 1224 Set<BlobReference> unReferencedBlobs = new HashSet<BlobReference>(); 1225 for (BlobReference blobReference : blobs) { 1226 FieldTypeImpl fieldType = (FieldTypeImpl)blobReference.getFieldType(); 1227 Get get = new Get(recordId.toBytes()); 1228 get.addColumn(columnFamily, fieldType.getIdBytes()); 1229 ValueType valueType = fieldType.getValueType(); 1230 byte[] valueToCompare; 1231 if (valueType.isMultiValue() && valueType.isHierarchical()) { 1232 valueToCompare = Bytes.toBytes(2); 1233 } else if (valueType.isMultiValue() || valueType.isHierarchical()) { 1234 valueToCompare = Bytes.toBytes(1); 1235 } else { 1236 valueToCompare = Bytes.toBytes(0); 1237 } 1238 valueToCompare = Bytes.add(valueToCompare, valueType.getPrimitive().toBytes(blobReference.getBlob())); 1239 WritableByteArrayComparable valueComparator = new ContainsValueComparator(valueToCompare); 1240 Filter filter = new ValueFilter(CompareOp.EQUAL, valueComparator); 1241 get.setFilter(filter); 1242 Result result = recordTable.get(get); 1243 1244 if (result.isEmpty()) { 1245 unReferencedBlobs.add(blobReference); 1246 } else { 1247 if (ignoreVersion != null) { 1248 boolean stillReferenced = false; 1249 List<KeyValue> column = result.getColumn(columnFamily, fieldType.getIdBytes()); 1250 for (KeyValue keyValue : column) { 1251 if (keyValue.getTimestamp() != ignoreVersion) { 1252 stillReferenced = true; 1253 break; 1254 } 1255 } 1256 if (!stillReferenced) { 1257 unReferencedBlobs.add(blobReference); 1258 } 1259 } 1260 } 1261 } 1262 return unReferencedBlobs; 1092 1263 } 1093 1264 -
trunk/cr/repository/impl/src/main/java/org/lilyproject/repository/impl/InlineBlobStoreAccess.java
r4592 r4621 49 49 // no-op 50 50 } 51 52 public boolean incubate() { 53 return false; 54 } 51 55 52 56 private class InlineBlobOutputStream extends ByteArrayOutputStream { -
trunk/cr/repository/impl/src/main/java/org/lilyproject/repository/impl/NettyTransceiverFactory.java
r4609 r4621 27 27 } 28 28 29 @Override30 29 public Thread newThread(Runnable r) { 31 30 Thread thread = delegate.newThread(r); -
trunk/cr/repository/impl/src/main/java/org/lilyproject/repository/impl/RemoteRepository.java
r4609 r4621 17 17 18 18 import java.io.IOException; 19 import java.io.InputStream;20 19 import java.io.OutputStream; 21 20 import java.lang.reflect.UndeclaredThrowableException; 22 21 import java.net.InetSocketAddress; 23 22 import java.util.ArrayList; 23 import java.util.Arrays; 24 24 import java.util.List; 25 25 import java.util.Set; … … 29 29 import org.apache.avro.ipc.Transceiver; 30 30 import org.apache.avro.ipc.specific.SpecificRequestor; 31 import org.lilyproject.repository.api.*; 32 import org.lilyproject.repository.avro.*; 31 import org.lilyproject.repository.api.Blob; 32 import org.lilyproject.repository.api.BlobException; 33 import org.lilyproject.repository.api.BlobInputStream; 34 import org.lilyproject.repository.api.BlobManager; 35 import org.lilyproject.repository.api.BlobNotFoundException; 36 import org.lilyproject.repository.api.BlobStoreAccess; 37 import org.lilyproject.repository.api.FieldType; 38 import org.lilyproject.repository.api.FieldTypeNotFoundException; 39 import org.lilyproject.repository.api.IORecordException; 40 import org.lilyproject.repository.api.IdGenerator; 41 import org.lilyproject.repository.api.IdRecord; 42 import org.lilyproject.repository.api.InvalidRecordException; 43 import org.lilyproject.repository.api.QName; 44 import org.lilyproject.repository.api.Record; 45 import org.lilyproject.repository.api.RecordException; 46 import org.lilyproject.repository.api.RecordExistsException; 47 import org.lilyproject.repository.api.RecordId; 48 import org.lilyproject.repository.api.RecordLockedException; 49 import org.lilyproject.repository.api.RecordNotFoundException; 50 import org.lilyproject.repository.api.RecordTypeNotFoundException; 51 import org.lilyproject.repository.api.Repository; 52 import org.lilyproject.repository.api.RepositoryException; 53 import org.lilyproject.repository.api.TypeException; 54 import org.lilyproject.repository.api.TypeManager; 55 import org.lilyproject.repository.api.VersionNotFoundException; 56 import org.lilyproject.repository.avro.AvroConverter; 57 import org.lilyproject.repository.avro.AvroFieldTypeNotFoundException; 58 import org.lilyproject.repository.avro.AvroGenericException; 59 import org.lilyproject.repository.avro.AvroInvalidRecordException; 60 import org.lilyproject.repository.avro.AvroLily; 61 import org.lilyproject.repository.avro.AvroQName; 62 import org.lilyproject.repository.avro.AvroRecordException; 63 import org.lilyproject.repository.avro.AvroRecordExistsException; 64 import org.lilyproject.repository.avro.AvroRecordLockedException; 65 import org.lilyproject.repository.avro.AvroRecordNotFoundException; 66 import org.lilyproject.repository.avro.AvroRecordTypeNotFoundException; 67 import org.lilyproject.repository.avro.AvroRepositoryException; 68 import org.lilyproject.repository.avro.AvroTypeException; 69 import org.lilyproject.repository.avro.AvroVersionNotFoundException; 33 70 import org.lilyproject.util.ArgumentValidator; 34 71 import org.lilyproject.util.io.Closer; … … 43 80 private IdGenerator idGenerator; 44 81 private final TypeManager typeManager; 45 private BlobStoreAccessRegistry blobStoreAccessRegistry;46 82 private Transceiver client; 83 private final BlobManager blobManager; 47 84 48 85 public RemoteRepository(InetSocketAddress address, AvroConverter converter, RemoteTypeManager typeManager, 49 IdGenerator idGenerator, Blob StoreAccessFactory blobStoreAccessFactory) throws IOException {86 IdGenerator idGenerator, BlobManager blobManager) throws IOException { 50 87 this.converter = converter; 51 88 this.typeManager = typeManager; 52 89 this.idGenerator = idGenerator; 53 blobStoreAccessRegistry = new BlobStoreAccessRegistry(); 54 blobStoreAccessRegistry.setBlobStoreAccessFactory(blobStoreAccessFactory); 55 90 this.blobManager = blobManager; 91 56 92 //client = new HttpTransceiver(new URL("http://" + address.getHostName() + ":" + address.getPort() + "/")); 57 93 client = new NettyTransceiver(address); … … 328 364 329 365 public void registerBlobStoreAccess(BlobStoreAccess blobStoreAccess) { 330 blobStoreAccessRegistry.register(blobStoreAccess); 331 } 332 333 public void delete(Blob blob) throws BlobNotFoundException, BlobException { 334 try { 335 lilyProxy.deleteBlob(converter.convert(blob)); 336 } catch (AvroBlobNotFoundException e) { 337 throw converter.convert(e); 338 } catch (AvroBlobException e) { 339 throw converter.convert(e); 340 } catch (AvroGenericException e) { 341 throw converter.convert(e); 342 } catch (AvroRemoteException e) { 343 throw converter.convert(e); 344 } catch (UndeclaredThrowableException e) { 345 throw handleUndeclaredBlobThrowable(e); 346 } 347 } 348 349 public InputStream getInputStream(Blob blob) throws BlobNotFoundException, BlobException { 350 return blobStoreAccessRegistry.getInputStream(blob); 366 blobManager.register(blobStoreAccess); 367 } 368 369 public BlobInputStream getInputStream(RecordId recordId, Long version, QName fieldName, Integer multivalueIndex, Integer hierarchyIndex) throws BlobNotFoundException, BlobException, RecordNotFoundException, RecordTypeNotFoundException, FieldTypeNotFoundException, RecordException, VersionNotFoundException, TypeException, InterruptedException { 370 Record record = read(recordId, version, Arrays.asList(new QName[]{fieldName})); 371 FieldType fieldType = typeManager.getFieldTypeByName(fieldName); 372 return blobManager.getInputStream(record, fieldName, multivalueIndex, hierarchyIndex, fieldType); 373 } 374 375 public BlobInputStream getInputStream(RecordId recordId, QName fieldName) throws BlobNotFoundException, BlobException, RecordNotFoundException, RecordTypeNotFoundException, FieldTypeNotFoundException, RecordException, VersionNotFoundException, TypeException, InterruptedException { 376 return getInputStream(recordId, null, fieldName, null, null); 351 377 } 352 378 353 379 public OutputStream getOutputStream(Blob blob) throws BlobException { 354 return blob StoreAccessRegistry.getOutputStream(blob);355 } 356 380 return blobManager.getOutputStream(blob); 381 } 382 357 383 private RuntimeException handleUndeclaredRecordThrowable(UndeclaredThrowableException e) throws RecordException { 358 384 if (e.getCause() instanceof IOException) { … … 362 388 } 363 389 } 364 365 private RuntimeException handleUndeclaredBlobThrowable(UndeclaredThrowableException e) throws BlobException {366 if (e.getCause() instanceof IOException) {367 throw new IOBlobException(e.getCause());368 } else {369 throw e;370 }371 }372 390 } 373 391 -
trunk/cr/repository/impl/src/main/java/org/lilyproject/repository/impl/ValueTypeImpl.java
r4592 r4621 17 17 18 18 import java.util.ArrayList; 19 import java.util.Arrays; 20 import java.util.HashSet; 19 21 import java.util.List; 22 import java.util.Set; 20 23 21 24 import org.apache.hadoop.hbase.util.Bytes; … … 41 44 return multiValue; 42 45 } 43 46 44 47 public boolean isHierarchical() { 45 48 return hierarchical; … … 79 82 return result; 80 83 } 81 84 82 85 private HierarchyPath fromHierarchicalBytes(byte[] bytes) { 83 86 List<Object> result = new ArrayList<Object>(); … … 94 97 return new HierarchyPath(result.toArray(new Object[result.size()])); 95 98 } 96 99 97 100 public byte[] toBytes(Object value) { 98 101 if (isMultiValue()) { … … 108 111 byte[] result; 109 112 result = new byte[0]; 110 for (Object element : ((List<Object>)value)) {113 for (Object element : ((List<Object>) value)) { 111 114 byte[] encodedValue; 112 115 if (isHierarchical()) { … … 120 123 return result; 121 124 } 122 125 123 126 private byte[] toHierarchyBytes(Object value) { 124 127 byte[] result; 125 128 result = new byte[0]; 126 for (Object element : ((HierarchyPath)value).getElements()) {129 for (Object element : ((HierarchyPath) value).getElements()) { 127 130 byte[] encodedValue = primitiveValueType.toBytes(element); 128 131 result = Bytes.add(result, Bytes.toBytes(encodedValue.length)); … … 135 138 return primitiveValueType; 136 139 } 137 140 138 141 public Class getType() { 139 142 if (isMultiValue()) { … … 142 145 return primitiveValueType.getType(); 143 146 } 144 147 145 148 public byte[] toBytes() { 146 149 StringBuilder stringBuilder = new StringBuilder(); … … 153 156 } 154 157 158 public Set<Object> getValues(Object value) { 159 Set<Object> result = new HashSet<Object>(); 160 if (isMultiValue()) { 161 result.addAll(getMultiValueValues(value)); 162 } else if (isHierarchical()) { 163 result.addAll(getHierarchyValues(value)); 164 } else { 165 result.add(value); 166 } 167 return result; 168 } 169 170 private Set<Object> getMultiValueValues(Object value) { 171 Set<Object> result = new HashSet<Object>(); 172 if (isHierarchical()) { 173 for (Object element : ((List<Object>) value)) { 174 result.addAll(getHierarchyValues(element)); 175 } 176 } else { 177 result.addAll((List<Object>) value); 178 } 179 return result; 180 } 181 182 private Set<Object> getHierarchyValues(Object value) { 183 Set<Object> result = new HashSet<Object>(); 184 result.addAll(Arrays.asList(((HierarchyPath) value).getElements())); 185 return result; 186 } 187 155 188 public static ValueType fromBytes(byte[] bytes, AbstractTypeManager typeManager) { 156 189 String encodedString = Bytes.toString(bytes); 157 190 int endOfPrimitiveValueTypeName = encodedString.indexOf(","); 158 191 String primitiveValueTypeName = encodedString.substring(0, endOfPrimitiveValueTypeName); 159 int endOfMultiValueBoolean = encodedString.indexOf(",", endOfPrimitiveValueTypeName+1); 160 boolean multiValue = Boolean.parseBoolean(encodedString.substring(endOfPrimitiveValueTypeName + 1, endOfMultiValueBoolean)); 192 int endOfMultiValueBoolean = encodedString.indexOf(",", endOfPrimitiveValueTypeName + 1); 193 boolean multiValue = Boolean.parseBoolean(encodedString.substring(endOfPrimitiveValueTypeName + 1, 194 endOfMultiValueBoolean)); 161 195 boolean hierarchical = Boolean.parseBoolean(encodedString.substring(endOfMultiValueBoolean + 1)); 162 196 return typeManager.getValueType(primitiveValueTypeName, multiValue, hierarchical); … … 190 224 return true; 191 225 } 192 193 226 } -
trunk/cr/repository/impl/src/test/java/org/lilyproject/repository/impl/test/AbstractBlobStoreTest.java
r4611 r4621 22 22 import java.io.InputStream; 23 23 import java.io.OutputStream; 24 import java.net.URISyntaxException; 24 25 import java.util.Arrays; 25 26 import java.util.Random; 26 27 27 28 import org.apache.hadoop.conf.Configuration; 29 import org.apache.hadoop.fs.Path; 28 30 import org.apache.hadoop.hbase.util.Bytes; 29 31 import org.junit.Test; 30 32 import org.lilyproject.repository.api.Blob; 31 33 import org.lilyproject.repository.api.BlobException; 34 import org.lilyproject.repository.api.BlobManager; 32 35 import org.lilyproject.repository.api.BlobNotFoundException; 36 import org.lilyproject.repository.api.BlobStoreAccess; 33 37 import org.lilyproject.repository.api.FieldType; 34 38 import org.lilyproject.repository.api.FieldTypeEntry; 39 import org.lilyproject.repository.api.FieldTypeNotFoundException; 40 import org.lilyproject.repository.api.HierarchyPath; 41 import org.lilyproject.repository.api.InvalidRecordException; 35 42 import org.lilyproject.repository.api.QName; 36 43 import org.lilyproject.repository.api.Record; 44 import org.lilyproject.repository.api.RecordException; 45 import org.lilyproject.repository.api.RecordId; 46 import org.lilyproject.repository.api.RecordNotFoundException; 37 47 import org.lilyproject.repository.api.RecordType; 48 import org.lilyproject.repository.api.RecordTypeNotFoundException; 38 49 import org.lilyproject.repository.api.Repository; 39 50 import org.lilyproject.repository.api.Scope; 51 import org.lilyproject.repository.api.TypeException; 40 52 import org.lilyproject.repository.api.TypeManager; 53 import org.lilyproject.repository.api.VersionNotFoundException; 54 import org.lilyproject.repository.impl.BlobManagerImpl; 55 import org.lilyproject.repository.impl.BlobStoreAccessRegistry; 56 import org.lilyproject.repository.impl.DFSBlobStoreAccess; 57 import org.lilyproject.repository.impl.HBaseBlobStoreAccess; 58 import org.lilyproject.repository.impl.InlineBlobStoreAccess; 59 import org.lilyproject.repository.impl.SizeBasedBlobStoreAccessFactory; 41 60 import org.lilyproject.rowlog.api.RowLog; 42 61 import org.lilyproject.rowlog.api.RowLogConfig; 43 62 import org.lilyproject.rowlog.api.RowLogConfigurationManager; 44 import org.lilyproject.rowlog.api.RowLogException;45 63 import org.lilyproject.rowlog.api.RowLogShard; 46 64 import org.lilyproject.rowlog.impl.RowLogConfigurationManagerImpl; 47 65 import org.lilyproject.rowlog.impl.RowLogImpl; 48 66 import org.lilyproject.rowlog.impl.RowLogShardImpl; 67 import org.lilyproject.testfw.HBaseProxy; 49 68 import org.lilyproject.util.hbase.HBaseTableFactory; 50 import org.lilyproject.util.hbase.HBaseTableFactoryImpl;51 69 import org.lilyproject.util.hbase.LilyHBaseSchema; 70 import org.lilyproject.util.hbase.LilyHBaseSchema.RecordCf; 52 71 import org.lilyproject.util.zookeeper.ZooKeeperItf; 53 import static org.lilyproject.util.hbase.LilyHBaseSchema.*;54 72 55 73 public abstract class AbstractBlobStoreTest { 74 protected final static HBaseProxy HBASE_PROXY = new HBaseProxy(); 56 75 protected static RowLog wal; 57 76 protected static Repository repository; … … 61 80 protected static RowLogConfigurationManager rowLogConfMgr; 62 81 protected static HBaseTableFactory hbaseTableFactory; 63 82 protected static SizeBasedBlobStoreAccessFactory blobStoreAccessFactory; 83 protected static BlobStoreAccess dfsBlobStoreAccess; 84 protected static BlobStoreAccess hbaseBlobStoreAccess; 85 protected static BlobStoreAccess inlineBlobStoreAccess; 86 protected static Random random = new Random(); 87 protected static BlobStoreAccessRegistry testBlobStoreAccessRegistry; 88 64 89 protected static void setupWal() throws Exception { 65 90 rowLogConfMgr = new RowLogConfigurationManagerImpl(zooKeeper); … … 70 95 wal.registerShard(walShard); 71 96 } 97 98 protected static BlobManager setupBlobManager() throws IOException, URISyntaxException { 99 dfsBlobStoreAccess = new DFSBlobStoreAccess(HBASE_PROXY.getBlobFS(), new Path("/lily/blobs")); 100 hbaseBlobStoreAccess = new HBaseBlobStoreAccess(configuration); 101 inlineBlobStoreAccess = new InlineBlobStoreAccess(); 102 blobStoreAccessFactory = new SizeBasedBlobStoreAccessFactory(dfsBlobStoreAccess); 103 blobStoreAccessFactory.addBlobStoreAccess(50, inlineBlobStoreAccess); 104 blobStoreAccessFactory.addBlobStoreAccess(1024, hbaseBlobStoreAccess); 105 return new BlobManagerImpl(hbaseTableFactory, blobStoreAccessFactory); 106 } 72 107 73 108 @Test 74 109 public void testCreate() throws Exception { 110 QName fieldName = new QName("test", "testCreate"); 111 FieldType fieldType = typeManager.newFieldType(typeManager.getValueType("BLOB", false, false), fieldName, 112 Scope.NON_VERSIONED); 113 fieldType = typeManager.createFieldType(fieldType); 114 RecordType recordType = typeManager.newRecordType(new QName(null, "testCreateRT")); 115 FieldTypeEntry fieldTypeEntry = typeManager.newFieldTypeEntry(fieldType.getId(), true); 116 recordType.addFieldTypeEntry(fieldTypeEntry); 117 recordType = typeManager.createRecordType(recordType); 118 75 119 byte[] bytes = Bytes.toBytes("someBytes"); 76 Blob blob = new Blob("aMediaType", (long)bytes.length, "testCreate"); 77 OutputStream outputStream = repository.getOutputStream(blob); 78 outputStream.write(bytes); 79 outputStream.close(); 80 81 InputStream inputStream = repository.getInputStream(blob); 82 byte[] readBytes = new byte[blob.getSize().intValue()]; 83 inputStream.read(readBytes); 84 inputStream.close(); 120 Blob blob = writeBlob(bytes, "aMediaType", "testCreate"); 121 Record record = repository.newRecord(); 122 record.setRecordType(recordType.getName()); 123 record.setField(fieldName, blob); 124 record = repository.create(record); 125 126 byte[] readBytes = readBlob(record.getId(), fieldName, blob.getSize()); 85 127 assertTrue(Arrays.equals(bytes, readBytes)); 86 128 } 87 129 88 130 @Test 89 131 public void testThreeSizes() throws Exception { 90 Random random = new Random(); 132 QName fieldName1 = new QName("test", "testThreeSizes1"); 133 QName fieldName2 = new QName("test", "testThreeSizes2"); 134 QName fieldName3 = new QName("test", "testThreeSizes3"); 135 FieldType fieldType1 = typeManager.newFieldType(typeManager.getValueType("BLOB", false, false), fieldName1, 136 Scope.NON_VERSIONED); 137 fieldType1 = typeManager.createFieldType(fieldType1); 138 FieldType fieldType2 = typeManager.newFieldType(typeManager.getValueType("BLOB", false, false), fieldName2, 139 Scope.NON_VERSIONED); 140 fieldType2 = typeManager.createFieldType(fieldType2); 141 FieldType fieldType3 = typeManager.newFieldType(typeManager.getValueType("BLOB", false, false), fieldName3, 142 Scope.NON_VERSIONED); 143 fieldType3 = typeManager.createFieldType(fieldType3); 144 RecordType recordType = typeManager.newRecordType(new QName(null, "testThreeSizes")); 145 recordType.addFieldTypeEntry(typeManager.newFieldTypeEntry(fieldType1.getId(), true)); 146 recordType.addFieldTypeEntry(typeManager.newFieldTypeEntry(fieldType2.getId(), true)); 147 recordType.addFieldTypeEntry(typeManager.newFieldTypeEntry(fieldType3.getId(), true)); 148 recordType = typeManager.createRecordType(recordType); 149 91 150 byte[] small = new byte[10]; 92 151 random.nextBytes(small); … … 95 154 byte[] large = new byte[2048]; 96 155 random.nextBytes(large); 97 Blob smallBlob = new Blob("mime/small", (long)10, "small"); 98 Blob mediumBlob = new Blob("mime/medium", (long)100, "medium"); 99 Blob largeBlob = new Blob("mime/large", (long)2048, "large"); 100 OutputStream outputStream = repository.getOutputStream(smallBlob); 101 outputStream.write(small); 102 outputStream.close(); 103 outputStream = repository.getOutputStream(mediumBlob); 104 outputStream.write(medium); 105 outputStream.close(); 106 outputStream = repository.getOutputStream(largeBlob); 107 outputStream.write(large); 108 outputStream.close(); 109 110 InputStream inputStream = repository.getInputStream(smallBlob); 111 byte[] readBytes = new byte[10]; 112 inputStream.read(readBytes); 113 inputStream.close(); 156 157 Blob smallBlob = writeBlob(small, "mime/small", "small"); 158 Blob mediumBlob = writeBlob(medium, "mime/medium", "medium"); 159 Blob largeBlob = writeBlob(large, "mime/large", "large"); 160 161 Record record = repository.newRecord(); 162 record.setRecordType(recordType.getName()); 163 record.setField(fieldName1, smallBlob); 164 record.setField(fieldName2, mediumBlob); 165 record.setField(fieldName3, largeBlob); 166 record = repository.create(record); 167 168 byte[] readBytes = readBlob(record.getId(), fieldName1, smallBlob.getSize()); 114 169 assertTrue(Arrays.equals(small, readBytes)); 115 inputStream = repository.getInputStream(mediumBlob); 116 readBytes = new byte[100]; 117 inputStream.read(readBytes); 118 inputStream.close(); 170 readBytes = readBlob(record.getId(), fieldName2, mediumBlob.getSize()); 119 171 assertTrue(Arrays.equals(medium, readBytes)); 120 inputStream = repository.getInputStream(largeBlob); 121 readBytes = new byte[2048]; 122 inputStream.read(readBytes); 123 inputStream.close(); 172 readBytes = readBlob(record.getId(), fieldName3, largeBlob.getSize()); 124 173 assertTrue(Arrays.equals(large, readBytes)); 125 174 } 126 127 @Test 128 public void testCreateRecordWithBlob() throws Exception { 129 QName fieldName = new QName("test", "ablob"); 130 FieldType fieldType = typeManager.newFieldType(typeManager.getValueType("BLOB", false, false), fieldName, Scope.VERSIONED); 131 fieldType = typeManager.createFieldType(fieldType); 132 RecordType recordType = typeManager.newRecordType(new QName(null, "testCreateRecordWithBlobRT")); 175 176 @Test 177 public void testCreateTwoRecordsWithSameBlob() throws Exception { 178 QName fieldName = new QName("test", "ablob2"); 179 FieldType fieldType = typeManager.newFieldType(typeManager.getValueType("BLOB", false, false), fieldName, 180 Scope.VERSIONED); 181 fieldType = typeManager.createFieldType(fieldType); 182 RecordType recordType = typeManager.newRecordType(new QName(null, "testCreateTwoRecordsWithSameBlobRT")); 133 183 FieldTypeEntry fieldTypeEntry = typeManager.newFieldTypeEntry(fieldType.getId(), true); 134 184 recordType.addFieldTypeEntry(fieldTypeEntry); 135 185 recordType = typeManager.createRecordType(recordType); 186 187 byte[] bytes = Bytes.toBytes("someBytes"); 188 Blob blob = writeBlob(bytes, "aMediaType", "testCreate"); 189 136 190 Record record = repository.newRecord(); 137 191 record.setRecordType(recordType.getName(), null); 138 139 byte[] bytes = Bytes.toBytes("someBytes"); 140 Blob blob = new Blob("aMediaType", (long)bytes.length, "testCreate"); 192 record.setField(fieldName, blob); 193 record = repository.create(record); 194 195 Record record2 = repository.newRecord(); 196 record2.setRecordType(recordType.getName(), null); 197 record2.setField(fieldName, blob); 198 record2 = repository.create(record2); // For an inline record this succeeds 199 200 byte[] bytesLarge = new byte[3000]; 201 random.nextBytes(bytesLarge); 202 Blob largeBlob = writeBlob(bytesLarge, "largeBlob", "testCreate"); 203 204 Record record3 = repository.newRecord(); 205 record3.setRecordType(recordType.getName(), null); 206 record3.setField(fieldName, largeBlob); 207 record3 = repository.create(record3); 208 209 Record record4 = repository.newRecord(); 210 record4.setRecordType(recordType.getName(), null); 211 record4.setField(fieldName, largeBlob); 212 213 try { 214 record4 = repository.create(record4); 215 fail("Using the same blob in two records should not succeed"); 216 } catch (InvalidRecordException expected) { 217 System.out.println("[Evert]"); 218 } 219 } 220 221 @Test 222 public void testUpdateNonVersionedBlobHDFS() throws Exception { 223 testUpdateNonVersionedBlob(3000, true); 224 } 225 226 @Test 227 public void testUpdateNonVersionedBlobHBase() throws Exception { 228 testUpdateNonVersionedBlob(150, true); 229 } 230 231 @Test 232 public void testUpdateNonVersionedBlobInline() throws Exception { 233 testUpdateNonVersionedBlob(50, false); 234 } 235 236 private void testUpdateNonVersionedBlob(int size, boolean expectDelete) throws Exception { 237 QName fieldName = new QName("test", "testUpdateNonVersionedBlob"+size); 238 FieldType fieldType = typeManager.newFieldType(typeManager.getValueType("BLOB", false, false), fieldName, 239 Scope.NON_VERSIONED); 240 fieldType = typeManager.createFieldType(fieldType); 241 RecordType recordType = typeManager.newRecordType(new QName(null, "testUpdateNonVersionedBlobRT"+size)); 242 FieldTypeEntry fieldTypeEntry = typeManager.newFieldTypeEntry(fieldType.getId(), true); 243 recordType.addFieldTypeEntry(fieldTypeEntry); 244 recordType = typeManager.createRecordType(recordType); 245 246 byte[] bytes = new byte[size]; 247 random.nextBytes(bytes); 248 Blob blob = writeBlob(bytes, "aMediaType", "testUpdateNonVersionedBlob"); 249 250 byte[] bytes2 = new byte[size]; 251 random.nextBytes(bytes2); 252 Blob blob2 = writeBlob(bytes2, "aMediaType", "testUpdateNonVersionedBlob2"); 253 254 Record record = repository.newRecord(); 255 record.setRecordType(recordType.getName(), null); 256 record.setField(fieldName, blob); 257 record = repository.create(record); 258 259 Record record2 = repository.newRecord(record.getId()); 260 record2.setRecordType(recordType.getName(), null); 261 record2.setField(fieldName, blob2); 262 263 record = repository.update(record2); 264 265 // Reading should return blob2 266 byte[] readBytes = readBlob(record.getId(), record.getVersion(), fieldName, null, null, size); 267 assertTrue(Arrays.equals(bytes2, readBytes)); 268 269 assertBlobDelete(expectDelete, blob); 270 } 271 272 @Test 273 public void testDeleteNonVersionedBlobHDFS() throws Exception { 274 testDeleteNonVersionedBlob(3000, true); 275 } 276 277 @Test 278 public void testDeleteNonVersionedBlobHBase() throws Exception { 279 testDeleteNonVersionedBlob(150, true); 280 } 281 282 @Test 283 public void testDeleteNonVersionedBlobInline() throws Exception { 284 testDeleteNonVersionedBlob(50, false); 285 } 286 287 private void testDeleteNonVersionedBlob(int size, boolean expectDelete) throws Exception { 288 QName fieldName = new QName("test", "testDeleteNonVersionedBlob"+size); 289 FieldType fieldType = typeManager.newFieldType(typeManager.getValueType("BLOB", false, false), fieldName, 290 Scope.NON_VERSIONED); 291 fieldType = typeManager.createFieldType(fieldType); 292 RecordType recordType = typeManager.newRecordType(new QName(null, "testDeleteNonVersionedBlobRT"+size)); 293 FieldTypeEntry fieldTypeEntry = typeManager.newFieldTypeEntry(fieldType.getId(), false); 294 recordType.addFieldTypeEntry(fieldTypeEntry); 295 recordType = typeManager.createRecordType(recordType); 296 297 byte[] bytes = new byte[size]; 298 random.nextBytes(bytes); 299 Blob blob = writeBlob(bytes, "aMediaType", "testDeleteNonVersionedBlob"); 300 301 Record record = repository.newRecord(); 302 record.setRecordType(recordType.getName(), null); 303 record.setField(fieldName, blob); 304 record = repository.create(record); 305 306 Record record2 = repository.newRecord(record.getId()); 307 record2.setRecordType(recordType.getName(), null); 308 record2.addFieldsToDelete(Arrays.asList(new QName[]{fieldName})); 309 310 record = repository.update(record2); 311 312 assertBlobDelete(expectDelete, blob); 313 } 314 315 @Test 316 public void testUpdateMutableBlobHDFS() throws Exception { 317 testUpdateMutableBlob(3000, true); 318 } 319 320 @Test 321 public void testUpdateMutableBlobHBase() throws Exception { 322 testUpdateMutableBlob(150, true); 323 } 324 325 @Test 326 public void testUpdateMutableBlobInline() throws Exception { 327 testUpdateMutableBlob(50, false); 328 } 329 330 private void testUpdateMutableBlob(int size, boolean expectDelete) throws Exception { 331 QName fieldName = new QName("test", "testUpdateMutableBlob"+size); 332 FieldType fieldType = typeManager.newFieldType(typeManager.getValueType("BLOB", false, false), fieldName, 333 Scope.VERSIONED_MUTABLE); 334 fieldType = typeManager.createFieldType(fieldType); 335 RecordType recordType = typeManager.newRecordType(new QName(null, "testUpdateMutableBlobRT"+size)); 336 FieldTypeEntry fieldTypeEntry = typeManager.newFieldTypeEntry(fieldType.getId(), true); 337 recordType.addFieldTypeEntry(fieldTypeEntry); 338 recordType = typeManager.createRecordType(recordType); 339 340 byte[] bytes = new byte[size]; 341 random.nextBytes(bytes); 342 Blob blob = writeBlob(bytes, "aMediaType", "testUpdateMutableBlob"); 343 344 byte[] bytes2 = new byte[size]; 345 random.nextBytes(bytes2); 346 Blob blob2 = writeBlob(bytes2, "aMediaType", "testUpdateMutableBlob2"); 347 348 Record record = repository.newRecord(); 349 record.setRecordType(recordType.getName(), null); 350 record.setField(fieldName, blob); 351 record = repository.create(record); 352 353 Record record2 = repository.newRecord(record.getId()); 354 record2.setRecordType(recordType.getName(), null); 355 record2.setField(fieldName, blob2); 356 record2.setVersion(record.getVersion()); 357 358 record = repository.update(record2, true, false); 359 360 // Blob2 should still exist 361 byte[] readBytes = readBlob(record.getId(), record.getVersion(), fieldName, null, null, size); 362 assertTrue(Arrays.equals(bytes2, readBytes)); 363 364 assertBlobDelete(expectDelete, blob); 365 } 366 367 @Test 368 public void testDeleteMutableBlobHDFS() throws Exception { 369 testDeleteMutableBlob(3000, true); 370 } 371 372 @Test 373 public void testDeleteMutableBlobHBase() throws Exception { 374 testDeleteMutableBlob(150, true); 375 } 376 377 @Test 378 public void testDeleteMutableBlobInline() throws Exception { 379 testDeleteMutableBlob(50, false); 380 } 381 382 private void testDeleteMutableBlob(int size, boolean expectDelete) throws Exception { 383 QName fieldName = new QName("test", "testDeleteMutableBlob"+size); 384 FieldType fieldType = typeManager.newFieldType(typeManager.getValueType("BLOB", false, false), fieldName, 385 Scope.VERSIONED_MUTABLE); 386 fieldType = typeManager.createFieldType(fieldType); 387 RecordType recordType = typeManager.newRecordType(new QName(null, "testDeleteMutableBlobRT"+size)); 388 FieldTypeEntry fieldTypeEntry = typeManager.newFieldTypeEntry(fieldType.getId(), false); 389 recordType.addFieldTypeEntry(fieldTypeEntry); 390 recordType = typeManager.createRecordType(recordType); 391 392 byte[] bytes = new byte[size]; 393 random.nextBytes(bytes); 394 Blob blob = writeBlob(bytes, "aMediaType", "testDeleteMutableBlob"); 395 396 byte[] bytes2 = new byte[size]; 397 random.nextBytes(bytes2); 398 Blob blob2 = writeBlob(bytes2, "aMediaType", "testDeleteMutableBlob2"); 399 400 Record record = repository.newRecord(); 401 record.setRecordType(recordType.getName(), null); 402 record.setField(fieldName, blob); 403 record = repository.create(record); 404 405 Record record2 = repository.newRecord(record.getId()); 406 record2.setRecordType(recordType.getName(), null); 407 record2.setField(fieldName, blob2); 408 repository.update(record2, false, false); 409 410 // Blob1 should still exist 411 byte[] readBytes = readBlob(record.getId(), record.getVersion(), fieldName, null, null, size); 412 assertTrue(Arrays.equals(bytes, readBytes)); 413 // Blob2 should still exist 414 readBytes = readBlob(record2.getId(), record2.getVersion(), fieldName, null, null, size); 415 assertTrue(Arrays.equals(bytes2, readBytes)); 416 417 Record record3 = repository.newRecord(record.getId()); 418 record3.setRecordType(recordType.getName(), null); 419 record3.addFieldsToDelete(Arrays.asList(new QName[]{fieldName})); 420 record3.setVersion(record.getVersion()); 421 repository.update(record3, true, false); 422 423 // Blob2 should still exist 424 readBytes = readBlob(record2.getId(), record2.getVersion(), fieldName, null, null, size); 425 assertTrue(Arrays.equals(bytes2, readBytes)); 426 427 assertBlobDelete(expectDelete, blob); 428 } 429 430 @Test 431 public void testUpdateMutableMultivalueBlobHDFS() throws Exception { 432 testUpdateMutableMultivalueBlob(3000, true); 433 } 434 435 @Test 436 public void testUpdateMutableMultivalueBlobHBase() throws Exception { 437 testUpdateMutableMultivalueBlob(150, true); 438 } 439 440 @Test 441 public void testUpdateMutableMultivalueBlobInline() throws Exception { 442 testUpdateMutableMultivalueBlob(50, false); 443 } 444 445 private void testUpdateMutableMultivalueBlob(int size, boolean expectDelete) throws Exception { 446 QName fieldName = new QName("test", "testUpdateMutableMultivalueBlob"+size); 447 FieldType fieldType = typeManager.newFieldType(typeManager.getValueType("BLOB", true, false), fieldName, 448 Scope.VERSIONED_MUTABLE); 449 fieldType = typeManager.createFieldType(fieldType); 450 RecordType recordType = typeManager.newRecordType(new QName(null, "testUpdateMutableMultivalueBlobRT"+size)); 451 FieldTypeEntry fieldTypeEntry = typeManager.newFieldTypeEntry(fieldType.getId(), true); 452 recordType.addFieldTypeEntry(fieldTypeEntry); 453 recordType = typeManager.createRecordType(recordType); 454 455 byte[] bytes = new byte[size]; 456 random.nextBytes(bytes); 457 Blob blob = writeBlob(bytes, "aMediaType", "testUpdateMutableMultivalueBlob"); 458 459 byte[] bytes2 = new byte[size]; 460 random.nextBytes(bytes2); 461 Blob blob2 = writeBlob(bytes2, "aMediaType", "testUpdateMutableMultivalueBlob2"); 462 463 byte[] bytes3 = new byte[size]; 464 random.nextBytes(bytes3); 465 Blob blob3 = writeBlob(bytes3, "aMediaType", "testUpdateMutableMultivalueBlob3"); 466 467 byte[] bytes4 = new byte[size]; 468 random.nextBytes(bytes4); 469 Blob blob4 = writeBlob(bytes4, "aMediaType", "testUpdateMutableMultivalueBlob4"); 470 471 Record record = repository.newRecord(); 472 record.setRecordType(recordType.getName(), null); 473 record.setField(fieldName, Arrays.asList(new Blob[]{blob, blob2})); 474 record = repository.create(record); 475 476 Record record2 = repository.newRecord(record.getId()); 477 record2.setRecordType(recordType.getName(), null); 478 record2.setField(fieldName, Arrays.asList(new Blob[]{blob2, blob3})); 479 record2 = repository.update(record2, false, false); 480 481 // Mutable update of first version 482 Record record3 = repository.newRecord(record.getId()); 483 record3.setVersion(record.getVersion()); 484 record3.setRecordType(recordType.getName(), null); 485 record3.setField(fieldName, Arrays.asList(new Blob[]{blob4})); 486 record3 = repository.update(record3, true, false); 487 488 //Blob2 489 byte[] readBytes = readBlob(record2.getId(), record2.getVersion(), fieldName, 0, null, size); 490 assertTrue(Arrays.equals(bytes2, readBytes)); 491 492 //Blob3 493 readBytes = readBlob(record2.getId(), record2.getVersion(), fieldName, 1, null, size); 494 assertTrue(Arrays.equals(bytes3, readBytes)); 495 496 //Blob4 in version 1 497 readBytes = readBlob(record.getId(), record.getVersion(), fieldName, 0, null, size); 498 assertTrue(Arrays.equals(bytes4, readBytes)); 499 500 assertBlobDelete(expectDelete, blob); 501 502 try { 503 readBlob(record.getId(), record.getVersion(), fieldName, null, null, size); 504 fail("BlobNotFoundException expected since index should not be null"); 505 } catch (BlobNotFoundException expected) { 506 507 } 508 509 try { 510 readBlob(record.getId(), record.getVersion(), fieldName, 1, null, size); 511 fail("BlobNotFoundException expected since index is out of bounds"); 512 } catch (BlobNotFoundException expected) { 513 514 } 515 516 } 517 518 @Test 519 public void testUpdateMutableHierarchyBlobHDFS() throws Exception { 520 testUpdateMutableHierarchyBlob(3000, true); 521 } 522 523 @Test 524 public void testUpdateMutableHierarchyBlobHBase() throws Exception { 525 testUpdateMutableHierarchyBlob(150, true); 526 } 527 528 @Test 529 public void testUpdateMutableHierarchyBlobInline() throws Exception { 530 testUpdateMutableHierarchyBlob(50, false); 531 } 532 533 private void testUpdateMutableHierarchyBlob(int size, boolean expectDelete) throws Exception { 534 QName fieldName = new QName("test", "testUpdateMutableHierarchyBlob"+size); 535 FieldType fieldType = typeManager.newFieldType(typeManager.getValueType("BLOB", false, true), fieldName, 536 Scope.VERSIONED_MUTABLE); 537 fieldType = typeManager.createFieldType(fieldType); 538 RecordType recordType = typeManager.newRecordType(new QName(null, "testUpdateMutableHierarchyBlobRT"+size)); 539 FieldTypeEntry fieldTypeEntry = typeManager.newFieldTypeEntry(fieldType.getId(), true); 540 recordType.addFieldTypeEntry(fieldTypeEntry); 541 recordType = typeManager.createRecordType(recordType); 542 543 byte[] bytes = new byte[size]; 544 random.nextBytes(bytes); 545 Blob blob = writeBlob(bytes, "aMediaType", "testUpdateMutableHierarchyBlob"); 546 547 byte[] bytes2 = new byte[size]; 548 random.nextBytes(bytes2); 549 Blob blob2 = writeBlob(bytes2, "aMediaType", "testUpdateMutableHierarchyBlob2"); 550 551 byte[] bytes3 = new byte[size]; 552 random.nextBytes(bytes3); 553 Blob blob3 = writeBlob(bytes3, "aMediaType", "testUpdateMutableHierarchyBlob3"); 554 555 byte[] bytes4 = new byte[size]; 556 random.nextBytes(bytes4); 557 Blob blob4 = writeBlob(bytes4, "aMediaType", "testUpdateMutableHierarchyBlob4"); 558 559 Record record = repository.newRecord(); 560 record.setRecordType(recordType.getName(), null); 561 record.setField(fieldName, new HierarchyPath(blob, blob2)); 562 record = repository.create(record); 563 564 Record record2 = repository.newRecord(record.getId()); 565 record2.setRecordType(recordType.getName(), null); 566 record2.setField(fieldName, new HierarchyPath(blob2, blob3, blob4)); 567 record2 = repository.update(record2, false, false); 568 569 // Mutable update of first version 570 Record record3 = repository.newRecord(record.getId()); 571 record3.setVersion(record.getVersion()); 572 record3.setRecordType(recordType.getName(), null); 573 record3.setField(fieldName, new HierarchyPath(blob4, blob4)); 574 record3 = repository.update(record3, true, false); 575 576 // Blob2 577 byte[] readBytes = readBlob(record2.getId(), record2.getVersion(), fieldName, null, 0, size); 578 assertTrue(Arrays.equals(bytes2, readBytes)); 579 580 // Blob3 581 readBytes = readBlob(record2.getId(), record2.getVersion(), fieldName, null, 1, size); 582 assertTrue(Arrays.equals(bytes3, readBytes)); 583 584 // Blob4 in version1 585 readBytes = readBlob(record.getId(), record.getVersion(), fieldName, null, 1, size); 586 assertTrue(Arrays.equals(bytes4, readBytes)); 587 588 assertBlobDelete(expectDelete, blob); 589 590 try { 591 readBlob(record.getId(), record.getVersion(), fieldName, null, null, size); 592 fail("BlobNotFoundException expected since index should not be null"); 593 } catch (BlobNotFoundException expected) { 594 595 } 596 597 try { 598 readBlob(record.getId(), record.getVersion(), fieldName, null, 2, size); 599 fail("BlobNotFoundException expected since index is out of bounds"); 600 } catch (BlobNotFoundException expected) { 601 602 } 603 } 604 605 @Test 606 public void testUpdateMutableMultivalueHierarchyBlobHDFS() throws Exception { 607 testUpdateMutableMultivalueHierarchyBlob(3000, true); 608 } 609 610 @Test 611 public void testUpdateMutableMultivalueHierarchyBlobHBase() throws Exception { 612 testUpdateMutableMultivalueHierarchyBlob(150, true); 613 } 614 615 @Test 616 public void testUpdateMutableMultivalueHierarchyBlobInline() throws Exception { 617 testUpdateMutableMultivalueHierarchyBlob(50, false); 618 } 619 620 private void testUpdateMutableMultivalueHierarchyBlob(int size, boolean expectDelete) throws Exception { 621 QName fieldName = new QName("test", "testUpdateMutableMultivalueHierarchyBlob"+size); 622 FieldType fieldType = typeManager.newFieldType(typeManager.getValueType("BLOB", true, true), fieldName, 623 Scope.VERSIONED_MUTABLE); 624 fieldType = typeManager.createFieldType(fieldType); 625 RecordType recordType = typeManager.newRecordType(new QName(null, "testUpdateMutableMultivalueHierarchyBlobRT"+size)); 626 FieldTypeEntry fieldTypeEntry = typeManager.newFieldTypeEntry(fieldType.getId(), true); 627 recordType.addFieldTypeEntry(fieldTypeEntry); 628 recordType = typeManager.createRecordType(recordType); 629 630 byte[] bytes = new byte[size]; 631 random.nextBytes(bytes); 632 Blob blob = writeBlob(bytes, "aMediaType", "testUpdateMutableMultivalueHierarchyBlob"); 633 634 byte[] bytes2 = new byte[size]; 635 random.nextBytes(bytes); 636 Blob blob2 = writeBlob(bytes2, "aMediaType", "testUpdateMutableMultivalueHierarchyBlob2"); 637 638 byte[] bytes3 = new byte[size]; 639 random.nextBytes(bytes3); 640 Blob blob3 = writeBlob(bytes3, "aMediaType", "testUpdateMutableMultivalueHierarchyBlob3"); 641 642 byte[] bytes4 = new byte[size]; 643 random.nextBytes(bytes4); 644 Blob blob4 = writeBlob(bytes4, "aMediaType", "testUpdateMutableMultivalueHierarchyBlob4"); 645 646 Record record = repository.newRecord(); 647 record.setRecordType(recordType.getName(), null); 648 record.setField(fieldName, Arrays.asList(new HierarchyPath[]{new HierarchyPath(blob, blob2), new HierarchyPath(blob3)})); 649 record = repository.create(record); 650 651 Record record2 = repository.newRecord(record.getId()); 652 record2.setRecordType(recordType.getName(), null); 653 record2.setField(fieldName, Arrays.asList(new HierarchyPath[]{new HierarchyPath(blob2), new HierarchyPath(blob3, blob4)})); 654 record2 = repository.update(record2, false, false); 655 656 // Mutable update of first version 657 Record record3 = repository.newRecord(record.getId()); 658 record3.setVersion(record.getVersion()); 659 record3.setRecordType(recordType.getName(), null); 660 record3.setField(fieldName, Arrays.asList(new HierarchyPath[]{new HierarchyPath(blob3, blob4), new HierarchyPath(blob4)})); 661 record3 = repository.update(record3, true, false); 662 663 // Blob2 664 byte[] readBytes = readBlob(record2.getId(), record2.getVersion(), fieldName, 0, 0, size); 665 assertTrue(Arrays.equals(bytes2, readBytes)); 666 667 // Blob3 668 readBytes = readBlob(record2.getId(), record2.getVersion(), fieldName, 1, 0, size); 669 assertTrue(Arrays.equals(bytes3, readBytes)); 670 671 // Blob4 in version1 672 readBytes = readBlob(record.getId(), record.getVersion(), fieldName, 0, 1, size); 673 assertTrue(Arrays.equals(bytes4, readBytes)); 674 675 assertBlobDelete(expectDelete, blob); 676 677 try { 678 readBlob(record.getId(), record.getVersion(), fieldName, null, null, size); 679 fail("BlobNotFoundException expected since index should not be null"); 680 } catch (BlobNotFoundException expected) { 681 682 } 683 684 try { 685 readBlob(record.getId(), record.getVersion(), fieldName, 0, null, size); 686 fail("BlobNotFoundException expected since index should not be null"); 687 } catch (BlobNotFoundException expected) { 688 689 } 690 691 try { 692 readBlob(record.getId(), record.getVersion(), fieldName, 2, 0, size); 693 fail("BlobNotFoundException expected since index is out of bounds"); 694 } catch (BlobNotFoundException expected) { 695 696 } 697 698 try { 699 readBlob(record.getId(), record.getVersion(), fieldName, 1, 1, size); 700 fail("BlobNotFoundException expected since index is out of bounds"); 701 } catch (BlobNotFoundException expected) { 702 703 } 704 } 705 706 private void assertBlobDelete(boolean expectDelete, Blob blob) throws BlobNotFoundException, BlobException { 707 if (expectDelete) { 708 try { 709 testBlobStoreAccessRegistry.getInputStream(blob); 710 fail("The blob " + blob + " should have been deleted."); 711 } catch (BlobException expected) { 712 } 713 } else { 714 testBlobStoreAccessRegistry.getInputStream(blob); 715 } 716 } 717 718 719 @Test 720 public void testBadEncoding() throws Exception { 721 Blob blob = new Blob("aMediaType", (long) 10, "aName"); 722 blob.setValue(new byte[0]); 723 try { 724 testBlobStoreAccessRegistry.getInputStream(blob); 725 fail(); 726 } catch (BlobException expected) { 727 } 728 } 729 730 private Blob writeBlob(byte[] bytes, String mediaType, String name) throws BlobException, InterruptedException, 731 IOException { 732 Blob blob = new Blob(mediaType, (long) bytes.length, name); 141 733 OutputStream outputStream = repository.getOutputStream(blob); 142 734 outputStream.write(bytes); 143 735 outputStream.close(); 144 record.setField(fieldName, blob); 145 record = repository.create(record); 146 147 record = repository.read(record.getId()); 148 blob = (Blob)record.getField(fieldName); 149 InputStream inputStream = repository.getInputStream(blob); 150 byte[] readBytes = new byte[blob.getSize().intValue()]; 736 return blob; 737 } 738 739 private byte[] readBlob(RecordId recordId, QName fieldName, long size) throws BlobNotFoundException, BlobException, RecordNotFoundException, RecordTypeNotFoundException, FieldTypeNotFoundException, RecordException, VersionNotFoundException, TypeException, InterruptedException, IOException { 740 return readBlob(recordId, null, fieldName, null, null, size); 741 } 742 743 private byte[] readBlob(RecordId recordId, Long version, QName fieldName, Integer multivalueIndex, Integer hierarchyIndex, long size) throws BlobNotFoundException, BlobException, InterruptedException, IOException, RecordNotFoundException, RecordTypeNotFoundException, FieldTypeNotFoundException, RecordException, VersionNotFoundException, TypeException { 744 InputStream inputStream = repository.getInputStream(recordId, version, fieldName, multivalueIndex, hierarchyIndex); 745 byte[] readBytes = new byte[(int)size]; 151 746 inputStream.read(readBytes); 152 747 inputStream.close(); 153 assertTrue(Arrays.equals(bytes, readBytes)); 154 } 155 156 @Test 157 public void testReadBlobWithoutName() throws Exception { 158 Blob blob = new Blob("aMediaType", (long)10, "aName"); 159 try { 160 repository.getInputStream(blob); 161 fail(); 162 } catch (BlobNotFoundException expected) { 163 } 164 } 165 166 @Test 167 public void testBadEncoding() throws Exception { 168 Blob blob = new Blob("aMediaType", (long)10, "aName"); 169 blob.setValue(new byte[0]); 170 try { 171 repository.getInputStream(blob); 172 fail(); 173 } catch (BlobException expected) { 174 } 175 } 176 177 @Test 178 public void testDelete() throws Exception { 179 byte[] small = new byte[10]; 180 byte[] medium = new byte[100]; 181 byte[] large = new byte[2048]; 182 Blob smallBlob = new Blob("mime/small", (long)10, "small"); 183 OutputStream outputStream = repository.getOutputStream(smallBlob); 184 outputStream.write(small); 185 outputStream.close(); 186 Blob mediumBlob = new Blob("mime/medium", (long)100, "medium"); 187 outputStream = repository.getOutputStream(mediumBlob); 188 outputStream.write(medium); 189 outputStream.close(); 190 Blob largeBlob = new Blob("mime/large", (long)10, "large"); 191 outputStream = repository.getOutputStream(largeBlob); 192 outputStream.write(large); 193 outputStream.close(); 194 195 repository.delete(smallBlob); 196 // TODO ok to ignore a delete of an inline blob? it will be deleted when the record is deleted 197 repository.getInputStream(smallBlob); 198 repository.delete(mediumBlob); 199 try { 200 repository.getInputStream(smallBlob); 201 } catch (BlobException expected) { 202 } 203 repository.delete(largeBlob); 204 try { 205 repository.getInputStream(smallBlob); 206 } catch (BlobException expected) { 207 } 748 return readBytes; 208 749 } 209 750 } -
trunk/cr/repository/impl/src/test/java/org/lilyproject/repository/impl/test/AbstractRepositoryTest.java
r4611 r4621 35 35 import org.junit.Before; 36 36 import org.junit.Test; 37 import org.lilyproject.repository.api.*; 37 import org.lilyproject.repository.api.FieldNotFoundException; 38 import org.lilyproject.repository.api.FieldType; 39 import org.lilyproject.repository.api.IdGenerator; 40 import org.lilyproject.repository.api.IdRecord; 41 import org.lilyproject.repository.api.InvalidRecordException; 42 import org.lilyproject.repository.api.QName; 43 import org.lilyproject.repository.api.Record; 44 import org.lilyproject.repository.api.RecordId; 45 import org.lilyproject.repository.api.RecordNotFoundException; 46 import org.lilyproject.repository.api.RecordType; 47 import org.lilyproject.repository.api.RecordTypeNotFoundException; 48 import org.lilyproject.repository.api.Repository; 49 import org.lilyproject.repository.api.ResponseStatus; 50 import org.lilyproject.repository.api.Scope; 51 import org.lilyproject.repository.api.TypeManager; 52 import org.lilyproject.repository.api.VersionNotFoundException; 38 53 import org.lilyproject.repository.impl.IdGeneratorImpl; 39 54 import org.lilyproject.rowlog.api.RowLog; … … 50 65 import org.lilyproject.testfw.HBaseProxy; 51 66 import org.lilyproject.util.hbase.HBaseTableFactory; 52 import org.lilyproject.util.hbase. HBaseTableFactoryImpl;67 import org.lilyproject.util.hbase.LilyHBaseSchema.RecordCf; 53 68 import org.lilyproject.util.hbase.LilyHBaseSchema; 54 69 import org.lilyproject.util.repo.VersionTag; 55 70 import org.lilyproject.util.zookeeper.ZooKeeperItf; 56 import static org.lilyproject.util.hbase.LilyHBaseSchema.*;57 71 58 72 public abstract class AbstractRepositoryTest { -
trunk/cr/repository/impl/src/test/java/org/lilyproject/repository/impl/test/AvroRepositoryTest.java
r4611 r4621 24 24 import org.junit.AfterClass; 25 25 import org.junit.BeforeClass; 26 import org.lilyproject.repository.api.BlobManager; 26 27 import org.lilyproject.repository.api.BlobStoreAccessFactory; 27 28 import org.lilyproject.repository.api.TypeManager; … … 30 31 import org.lilyproject.repository.avro.AvroLilyImpl; 31 32 import org.lilyproject.repository.avro.LilySpecificResponder; 33 import org.lilyproject.repository.impl.BlobManagerImpl; 32 34 import org.lilyproject.repository.impl.DFSBlobStoreAccess; 33 35 import org.lilyproject.repository.impl.HBaseRepository; … … 60 62 DFSBlobStoreAccess dfsBlobStoreAccess = new DFSBlobStoreAccess(HBASE_PROXY.getBlobFS(), new Path("/lily/blobs")); 61 63 BlobStoreAccessFactory blobStoreAccessFactory = new SizeBasedBlobStoreAccessFactory(dfsBlobStoreAccess); 64 BlobManager blobManager = new BlobManagerImpl(hbaseTableFactory, blobStoreAccessFactory); 62 65 setupWal(); 63 serverRepository = new HBaseRepository(serverTypeManager, idGenerator, blobStoreAccessFactory , wal, configuration, hbaseTableFactory);66 serverRepository = new HBaseRepository(serverTypeManager, idGenerator, wal, configuration, hbaseTableFactory, blobManager); 64 67 65 68 AvroConverter serverConverter = new AvroConverter(); … … 73 76 remoteConverter, idGenerator, zooKeeper); 74 77 repository = new RemoteRepository(new InetSocketAddress(lilyServer.getPort()), remoteConverter, 75 (RemoteTypeManager)typeManager, idGenerator, blob StoreAccessFactory);78 (RemoteTypeManager)typeManager, idGenerator, blobManager); 76 79 remoteConverter.setRepository(repository); 77 80 ((RemoteTypeManager)typeManager).start(); … … 80 83 setupMessageQueueProcessor(); 81 84 } 82 85 83 86 @AfterClass 84 87 public static void tearDownAfterClass() throws Exception { -
trunk/cr/repository/impl/src/test/java/org/lilyproject/repository/impl/test/AvroTypeManagerFieldTypeTest.java
r4611 r4621 28 28 import org.junit.Before; 29 29 import org.junit.BeforeClass; 30 import org.lilyproject.repository.api.BlobManager; 30 31 import org.lilyproject.repository.api.BlobStoreAccessFactory; 31 32 import org.lilyproject.repository.api.Repository; … … 35 36 import org.lilyproject.repository.avro.AvroLilyImpl; 36 37 import org.lilyproject.repository.avro.LilySpecificResponder; 38 import org.lilyproject.repository.impl.BlobManagerImpl; 37 39 import org.lilyproject.repository.impl.DFSBlobStoreAccess; 38 40 import org.lilyproject.repository.impl.HBaseRepository; … … 54 56 import org.lilyproject.util.hbase.HBaseTableFactoryImpl; 55 57 import org.lilyproject.util.hbase.LilyHBaseSchema; 58 import org.lilyproject.util.hbase.LilyHBaseSchema.RecordCf; 56 59 import org.lilyproject.util.io.Closer; 57 60 import org.lilyproject.util.zookeeper.ZkUtil; 58 61 import org.lilyproject.util.zookeeper.ZooKeeperItf; 59 import static org.lilyproject.util.hbase.LilyHBaseSchema.*;60 62 61 63 public class AvroTypeManagerFieldTypeTest extends AbstractTypeManagerFieldTypeTest { … … 84 86 DFSBlobStoreAccess dfsBlobStoreAccess = new DFSBlobStoreAccess(HBASE_PROXY.getBlobFS(), new Path("/lily/blobs")); 85 87 BlobStoreAccessFactory blobStoreAccessFactory = new SizeBasedBlobStoreAccessFactory(dfsBlobStoreAccess); 86 serverRepository = new HBaseRepository(serverTypeManager, idGenerator, blobStoreAccessFactory, wal, configuration, hbaseTableFactory); 88 BlobManager blobManager = new BlobManagerImpl(hbaseTableFactory, blobStoreAccessFactory); 89 serverRepository = new HBaseRepository(serverTypeManager, idGenerator, wal, configuration, hbaseTableFactory, blobManager); 87 90 88 91 AvroConverter serverConverter = new AvroConverter(); … … 96 99 remoteConverter, idGenerator, zooKeeper); 97 100 repository = new RemoteRepository(new InetSocketAddress(lilyServer.getPort()), 98 remoteConverter, (RemoteTypeManager)typeManager, idGenerator, blob StoreAccessFactory);101 remoteConverter, (RemoteTypeManager)typeManager, idGenerator, blobManager); 99 102 remoteConverter.setRepository(repository); 100 103 ((RemoteTypeManager)typeManager).start(); -
trunk/cr/repository/impl/src/test/java/org/lilyproject/repository/impl/test/AvroTypeManagerRecordTypeTest.java
r4611 r4621 27 27 import org.junit.Before; 28 28 import org.junit.BeforeClass; 29 import org.lilyproject.repository.api.BlobManager; 29 30 import org.lilyproject.repository.api.BlobStoreAccessFactory; 30 31 import org.lilyproject.repository.api.Repository; … … 34 35 import org.lilyproject.repository.avro.AvroLilyImpl; 35 36 import org.lilyproject.repository.avro.LilySpecificResponder; 37 import org.lilyproject.repository.impl.BlobManagerImpl; 36 38 import org.lilyproject.repository.impl.DFSBlobStoreAccess; 37 39 import org.lilyproject.repository.impl.HBaseRepository; … … 44 46 import org.lilyproject.rowlog.api.RowLogConfig; 45 47 import org.lilyproject.rowlog.api.RowLogConfigurationManager; 46 import org.lilyproject.rowlog.api.RowLogException;47 48 import org.lilyproject.rowlog.api.RowLogShard; 48 49 import org.lilyproject.rowlog.impl.RowLogConfigurationManagerImpl; … … 54 55 import org.lilyproject.util.hbase.HBaseTableFactoryImpl; 55 56 import org.lilyproject.util.hbase.LilyHBaseSchema; 57 import org.lilyproject.util.hbase.LilyHBaseSchema.RecordCf; 56 58 import org.lilyproject.util.io.Closer; 57 59 import org.lilyproject.util.zookeeper.StateWatchingZooKeeper; 58 60 import org.lilyproject.util.zookeeper.ZkUtil; 59 61 import org.lilyproject.util.zookeeper.ZooKeeperItf; 60 import static org.lilyproject.util.hbase.LilyHBaseSchema.*;61 62 62 63 /** … … 96 97 DFSBlobStoreAccess dfsBlobStoreAccess = new DFSBlobStoreAccess(HBASE_PROXY.getBlobFS(), new Path("/lily/blobs")); 97 98 BlobStoreAccessFactory blobStoreAccessFactory = new SizeBasedBlobStoreAccessFactory(dfsBlobStoreAccess); 99 BlobManager blobManager = new BlobManagerImpl(hbaseTableFactory, blobStoreAccessFactory); 98 100 setupWal(); 99 serverRepository = new HBaseRepository(serverTypeManager, idGenerator, blobStoreAccessFactory, wal, configuration, hbaseTableFactory);101 serverRepository = new HBaseRepository(serverTypeManager, idGenerator, wal, configuration, hbaseTableFactory, blobManager); 100 102 101 103 AvroConverter serverConverter = new AvroConverter(); … … 110 112 remoteConverter, idGenerator, zooKeeper); 111 113 Repository repository = new RemoteRepository(new InetSocketAddress(lilyServer.getPort()), 112 remoteConverter, (RemoteTypeManager)typeManager, idGenerator, blob StoreAccessFactory);114 remoteConverter, (RemoteTypeManager)typeManager, idGenerator, blobManager); 113 115 remoteConverter.setRepository(repository); 114 116 ((RemoteTypeManager)typeManager).start(); -
trunk/cr/repository/impl/src/test/java/org/lilyproject/repository/impl/test/BlobStoreTest.java
r4611 r4621 17 17 18 18 19 import org.apache.hadoop.fs.Path;20 19 import org.junit.After; 21 20 import org.junit.AfterClass; 22 21 import org.junit.Before; 23 22 import org.junit.BeforeClass; 24 import org.lilyproject.repository.api.Blob StoreAccess;23 import org.lilyproject.repository.api.BlobManager; 25 24 import org.lilyproject.repository.api.IdGenerator; 26 import org.lilyproject.repository.impl.DFSBlobStoreAccess; 27 import org.lilyproject.repository.impl.HBaseBlobStoreAccess; 25 import org.lilyproject.repository.impl.BlobStoreAccessRegistry; 28 26 import org.lilyproject.repository.impl.HBaseRepository; 29 27 import org.lilyproject.repository.impl.HBaseTypeManager; 30 28 import org.lilyproject.repository.impl.IdGeneratorImpl; 31 import org.lilyproject.repository.impl.InlineBlobStoreAccess;32 import org.lilyproject.repository.impl.SizeBasedBlobStoreAccessFactory;33 import org.lilyproject.testfw.HBaseProxy;34 29 import org.lilyproject.testfw.TestHelper; 35 30 import org.lilyproject.util.hbase.HBaseTableFactoryImpl; … … 39 34 public class BlobStoreTest extends AbstractBlobStoreTest { 40 35 41 private final static HBaseProxy HBASE_PROXY = new HBaseProxy();42 36 @BeforeClass 43 37 public static void setUpBeforeClass() throws Exception { … … 49 43 hbaseTableFactory = new HBaseTableFactoryImpl(configuration); 50 44 typeManager = new HBaseTypeManager(idGenerator, configuration, zooKeeper, hbaseTableFactory); 51 BlobStoreAccess dfsBlobStoreAccess = new DFSBlobStoreAccess(HBASE_PROXY.getBlobFS(), new Path("/lily/blobs"));52 BlobStoreAccess hbaseBlobStoreAccess = new HBaseBlobStoreAccess(configuration);53 BlobStoreAccess inlineBlobStoreAccess = new InlineBlobStoreAccess();54 SizeBasedBlobStoreAccessFactory factory = new SizeBasedBlobStoreAccessFactory(dfsBlobStoreAccess);55 factory.addBlobStoreAccess(50, inlineBlobStoreAccess);56 factory.addBlobStoreAccess(1024, hbaseBlobStoreAccess);57 45 setupWal(); 58 repository = new HBaseRepository(typeManager, idGenerator, factory, wal, configuration, hbaseTableFactory); 46 BlobManager blobManager = setupBlobManager(); 47 repository = new HBaseRepository(typeManager, idGenerator, wal, configuration, hbaseTableFactory, blobManager); 48 // Create a blobStoreAccessRegistry for testing purposes 49 testBlobStoreAccessRegistry = new BlobStoreAccessRegistry(blobManager); 50 testBlobStoreAccessRegistry.setBlobStoreAccessFactory(blobStoreAccessFactory); 59 51 } 60 52 -
trunk/cr/repository/impl/src/test/java/org/lilyproject/repository/impl/test/HBaseRepositoryTest.java
r4611 r4621 23 23 import org.junit.BeforeClass; 24 24 import org.junit.Test; 25 import org.lilyproject.repository.api.BlobManager; 25 26 import org.lilyproject.repository.api.BlobStoreAccessFactory; 26 27 import org.lilyproject.repository.api.Record; 27 28 import org.lilyproject.repository.api.TypeManager; 29 import org.lilyproject.repository.impl.BlobManagerImpl; 28 30 import org.lilyproject.repository.impl.DFSBlobStoreAccess; 29 31 import org.lilyproject.repository.impl.HBaseRepository; … … 51 53 DFSBlobStoreAccess dfsBlobStoreAccess = new DFSBlobStoreAccess(HBASE_PROXY.getBlobFS(), new Path("/lily/blobs")); 52 54 blobStoreAccessFactory = new SizeBasedBlobStoreAccessFactory(dfsBlobStoreAccess); 55 BlobManager blobManager = new BlobManagerImpl(hbaseTableFactory,blobStoreAccessFactory); 53 56 setupWal(); 54 repository = new HBaseRepository(typeManager, idGenerator, blobStoreAccessFactory, wal, configuration, hbaseTableFactory);57 repository = new HBaseRepository(typeManager, idGenerator, wal, configuration, hbaseTableFactory, blobManager); 55 58 setupTypes(); 56 59 setupMessageQueue(); -
trunk/cr/repository/impl/src/test/java/org/lilyproject/repository/impl/test/RemoteBlobStoreTest.java
r4611 r4621 21 21 import org.apache.avro.ipc.NettyServer; 22 22 import org.apache.avro.ipc.Server; 23 import org.apache.hadoop.fs.Path;24 23 import org.junit.After; 25 24 import org.junit.AfterClass; 26 25 import org.junit.Before; 27 26 import org.junit.BeforeClass; 28 import org.lilyproject.repository.api.Blob StoreAccess;27 import org.lilyproject.repository.api.BlobManager; 29 28 import org.lilyproject.repository.api.TypeManager; 30 29 import org.lilyproject.repository.avro.AvroConverter; … … 32 31 import org.lilyproject.repository.avro.AvroLilyImpl; 33 32 import org.lilyproject.repository.avro.LilySpecificResponder; 34 import org.lilyproject.repository.impl.DFSBlobStoreAccess; 35 import org.lilyproject.repository.impl.HBaseBlobStoreAccess; 33 import org.lilyproject.repository.impl.BlobStoreAccessRegistry; 36 34 import org.lilyproject.repository.impl.HBaseRepository; 37 35 import org.lilyproject.repository.impl.HBaseTypeManager; 38 36 import org.lilyproject.repository.impl.IdGeneratorImpl; 39 import org.lilyproject.repository.impl.InlineBlobStoreAccess;40 37 import org.lilyproject.repository.impl.RemoteRepository; 41 38 import org.lilyproject.repository.impl.RemoteTypeManager; 42 import org.lilyproject.repository.impl.SizeBasedBlobStoreAccessFactory;43 import org.lilyproject.testfw.HBaseProxy;44 39 import org.lilyproject.testfw.TestHelper; 45 40 import org.lilyproject.util.hbase.HBaseTableFactoryImpl; … … 49 44 public class RemoteBlobStoreTest extends AbstractBlobStoreTest { 50 45 51 private final static HBaseProxy HBASE_PROXY = new HBaseProxy();46 52 47 private static HBaseRepository serverRepository; 53 48 private static Server lilyServer; … … 64 59 hbaseTableFactory = new HBaseTableFactoryImpl(configuration); 65 60 serverTypeManager = new HBaseTypeManager(idGenerator, configuration, zooKeeper, hbaseTableFactory); 66 BlobStoreAccess dfsBlobStoreAccess = new DFSBlobStoreAccess(HBASE_PROXY.getBlobFS(), new Path("/lily/blobs")); 67 BlobStoreAccess hbaseBlobStoreAccess = new HBaseBlobStoreAccess(configuration); 68 BlobStoreAccess inlineBlobStoreAccess = new InlineBlobStoreAccess(); 69 SizeBasedBlobStoreAccessFactory blobStoreAccessFactory = new SizeBasedBlobStoreAccessFactory(dfsBlobStoreAccess); 70 blobStoreAccessFactory.addBlobStoreAccess(50, inlineBlobStoreAccess); 71 blobStoreAccessFactory.addBlobStoreAccess(1024, hbaseBlobStoreAccess); 61 BlobManager serverBlobManager = setupBlobManager(); 72 62 setupWal(); 73 serverRepository = new HBaseRepository(serverTypeManager, idGenerator, blobStoreAccessFactory, wal, configuration, hbaseTableFactory); 63 serverRepository = new HBaseRepository(serverTypeManager, idGenerator, wal, configuration, hbaseTableFactory, serverBlobManager); 64 // Create a blobStoreAccessRegistry for testing purposes 65 testBlobStoreAccessRegistry = new BlobStoreAccessRegistry(serverBlobManager); 66 testBlobStoreAccessRegistry.setBlobStoreAccessFactory(blobStoreAccessFactory); 74 67 75 68 AvroConverter serverConverter = new AvroConverter(); … … 82 75 typeManager = new RemoteTypeManager(new InetSocketAddress(lilyServer.getPort()), 83 76 remoteConverter, idGenerator, zooKeeper); 77 BlobManager blobManager = setupBlobManager(); 84 78 repository = new RemoteRepository(new InetSocketAddress(lilyServer.getPort()), remoteConverter, 85 (RemoteTypeManager)typeManager, idGenerator, blobStoreAccessFactory); 86 repository.registerBlobStoreAccess(dfsBlobStoreAccess); 87 repository.registerBlobStoreAccess(hbaseBlobStoreAccess); 88 repository.registerBlobStoreAccess(inlineBlobStoreAccess); 79 (RemoteTypeManager)typeManager, idGenerator, blobManager); 89 80 remoteConverter.setRepository(repository); 90 81 ((RemoteTypeManager)typeManager).start(); -
trunk/cr/repository/impl/src/test/java/org/lilyproject/repository/impl/test/ValueTypeTest.java
r4611 r4621 23 23 24 24 import org.apache.hadoop.conf.Configuration; 25 import org.apache.hadoop.fs.Path;26 25 import org.apache.hadoop.hbase.util.Bytes; 27 26 import org.joda.time.DateTime; … … 43 42 import org.lilyproject.repository.api.Scope; 44 43 import org.lilyproject.repository.impl.AbstractTypeManager; 45 import org.lilyproject.repository.impl. DFSBlobStoreAccess;44 import org.lilyproject.repository.impl.BlobManagerImpl; 46 45 import org.lilyproject.repository.impl.HBaseRepository; 47 46 import org.lilyproject.repository.impl.HBaseTypeManager; 48 47 import org.lilyproject.repository.impl.IdGeneratorImpl; 48 import org.lilyproject.repository.impl.InlineBlobStoreAccess; 49 49 import org.lilyproject.repository.impl.SizeBasedBlobStoreAccessFactory; 50 50 import org.lilyproject.rowlog.api.RowLog; … … 85 85 hbaseTableFactory = new HBaseTableFactoryImpl(HBASE_PROXY.getConf()); 86 86 typeManager = new HBaseTypeManager(idGenerator, HBASE_PROXY.getConf(), zooKeeper, hbaseTableFactory); 87 DFSBlobStoreAccess dfsBlobStoreAccess = new DFSBlobStoreAccess(HBASE_PROXY.getBlobFS(), new Path("/lily/blobs")); 88 BlobStoreAccessFactory blobStoreAccessFactory = new SizeBasedBlobStoreAccessFactory(dfsBlobStoreAccess); 89 repository = new HBaseRepository(typeManager, idGenerator, blobStoreAccessFactory, initializeWal(HBASE_PROXY.getConf()), HBASE_PROXY.getConf(), hbaseTableFactory); 90 87 InlineBlobStoreAccess inlineBlobStoreAccess = new InlineBlobStoreAccess(); 88 BlobStoreAccessFactory blobStoreAccessFactory = new SizeBasedBlobStoreAccessFactory(inlineBlobStoreAccess); 89 repository = new HBaseRepository(typeManager, idGenerator, initializeWal(HBASE_PROXY.getConf()), HBASE_PROXY.getConf(), hbaseTableFactory, new BlobManagerImpl(hbaseTableFactory, blobStoreAccessFactory)); 91 90 } 92 91 … … 174 173 Blob blob2 = new Blob(Bytes.toBytes("anotherKey"), "image/jpeg", Long.MIN_VALUE, "images/image.jpg"); 175 174 Blob blob3 = new Blob("text/plain", Long.valueOf(0), null); 175 176 176 runValueTypeTests("blobTypeId", "BLOB", blob1, blob2, blob3); 177 177 } -
trunk/global/hbase-util/src/main/java/org/lilyproject/util/hbase/HBaseTableFactory.java
r4614 r4621 23 23 */ 24 24 TableConfig getTableConfig(byte[] tableName); 25 25 26 26 /** 27 27 * -
trunk/global/hbase-util/src/main/java/org/lilyproject/util/hbase/HBaseTableFactoryImpl.java
r4614 r4621 23 23 import org.apache.commons.logging.LogFactory; 24 24 import org.apache.hadoop.conf.Configuration; 25 import org.apache.hadoop.hbase.*; 25 import org.apache.hadoop.hbase.HTableDescriptor; 26 import org.apache.hadoop.hbase.TableExistsException; 27 import org.apache.hadoop.hbase.TableNotFoundException; 26 28 import org.apache.hadoop.hbase.client.HBaseAdmin; 27 29 import org.apache.hadoop.hbase.client.HTableInterface; … … 75 77 return new LocalHTable(configuration, tableDescriptor.getName()); 76 78 } 79 77 80 78 81 public TableConfig getTableConfig(byte[] tableName) { … … 86 89 return splitKeys; 87 90 } 88 89 91 } -
trunk/global/hbase-util/src/main/java/org/lilyproject/util/hbase/LilyHBaseSchema.java
r4611 r4621 38 38 false, true, HConstants.FOREVER, HColumnDescriptor.DEFAULT_BLOOMFILTER)); 39 39 } 40 41 private static final HTableDescriptor blobIncubatorDescriptor; 42 43 static { 44 blobIncubatorDescriptor = new HTableDescriptor(Table.BLOBINCUBATOR.bytes); 45 blobIncubatorDescriptor.addFamily(new HColumnDescriptor(BlobIncubatorCf.REF.bytes)); 46 } 40 47 41 48 public static HTableInterface getRecordTable(HBaseTableFactory tableFactory) throws IOException { … … 46 53 return tableFactory.getTable(typeTableDescriptor); 47 54 } 55 56 public static HTableInterface getBlobIncubatorTable(HBaseTableFactory tableFactory) throws IOException { 57 return tableFactory.getTable(blobIncubatorDescriptor); 58 } 48 59 49 60 public static enum Table { 50 61 RECORD("record"), 51 TYPE("type"); 62 TYPE("type"), 63 BLOBINCUBATOR("blobincubator"); 52 64 53 65 public final byte[] bytes; … … 139 151 } 140 152 } 153 154 /** 155 * Column families in the blob incubator table. 156 */ 157 public static enum BlobIncubatorCf { 158 REF("ref"); 159 160 public final byte[] bytes; 161 public final String name; 162 163 BlobIncubatorCf(String name) { 164 this.name = name; 165 this.bytes = Bytes.toBytes(name); 166 } 167 } 168 169 /** 170 * Columns in the blob incubator table. 171 */ 172 public static enum BlobIncubatorColumn { 173 RECORD("record"), FIELD("field"); 174 175 public final byte[] bytes; 176 public final String name; 177 178 BlobIncubatorColumn(String name) { 179 this.name = name; 180 this.bytes = Bytes.toBytes(name); 181 } 182 } 141 183 }
Note: See TracChangeset
for help on using the changeset viewer.