Changeset 4113
- Timestamp:
- 2010-07-02 09:12:58 (3 years ago)
- Location:
- projects/lily/trunk
- Files:
-
- 4 edited
-
indexer/src/main/java/org/lilycms/indexer/IndexUpdater.java (modified) (4 diffs)
-
indexer/src/test/java/org/lilycms/indexer/test/IndexerTest.java (modified) (3 diffs)
-
linkindex/src/main/java/org/lilycms/linkindex/LinkIndexUpdater.java (modified) (3 diffs)
-
linkindex/src/test/java/org/lilycms/linkindex/test/LinkIndexTest.java (modified) (1 diff)
Legend:
- Unmodified
- Added
- Removed
-
projects/lily/trunk/indexer/src/main/java/org/lilycms/indexer/IndexUpdater.java
r4110 r4113 7 7 import org.lilycms.indexer.conf.IndexField; 8 8 import org.lilycms.linkindex.LinkIndex; 9 import org.lilycms.linkindex.LinkIndexUpdater; 9 10 import org.lilycms.repository.api.*; 10 11 import org.lilycms.repository.api.FieldTypeNotFoundException; … … 32 33 private IndexerListener indexerListener = new IndexerListener(); 33 34 private Indexer indexer; 35 private LinkIndexUpdater linkIndexUpdater; 34 36 35 37 private Log log = LogFactory.getLog(getClass()); … … 41 43 this.typeManager = repository.getTypeManager(); 42 44 this.linkIndex = linkIndex; 45 this.linkIndexUpdater = new LinkIndexUpdater(repository, linkIndex); 43 46 44 47 rowLog.registerConsumer(indexerListener); … … 58 61 RecordEvent event = new RecordEvent(msg.getPayload()); 59 62 RecordId recordId = repository.getIdGenerator().fromBytes(msg.getRowKey()); 63 64 // First, bring the link index up to date. 65 // We always make sure the link index is up to date with the current record state, before updating 66 // the index entry of the record. This way, from the moment the index entry is inserted, any 67 // denormalized data contained in it will be correctly updated. 68 linkIndexUpdater.update(recordId, event); 60 69 61 70 if (log.isDebugEnabled()) { -
projects/lily/trunk/indexer/src/test/java/org/lilycms/indexer/test/IndexerTest.java
r4110 r4113 11 11 import org.apache.commons.logging.Log; 12 12 import org.apache.commons.logging.LogFactory; 13 import org.apache.hadoop.hbase.TableExistsException;14 13 import org.apache.solr.client.solrj.SolrQuery; 15 14 import org.apache.solr.client.solrj.SolrServer; … … 29 28 import org.lilycms.indexer.conf.IndexerConfBuilder; 30 29 import org.lilycms.linkindex.LinkIndex; 31 import org.lilycms.linkindex.LinkIndexUpdater;32 30 import org.lilycms.repository.api.*; 33 31 import org.lilycms.repository.impl.*; … … 103 101 IndexManager indexManager = new IndexManager(HBASE_PROXY.getConf()); 104 102 105 try { LinkIndex.createIndexes(indexManager); } catch (TableExistsException e) { }103 LinkIndex.createIndexes(indexManager); 106 104 LinkIndex linkIndex = new LinkIndex(indexManager, repository); 107 new LinkIndexUpdater(repository, typeManager, linkIndex, repository.getWal());108 105 109 106 // Field types should exist before the indexer conf is loaded -
projects/lily/trunk/linkindex/src/main/java/org/lilycms/linkindex/LinkIndexUpdater.java
r4103 r4113 33 33 private Log log = LogFactory.getLog(getClass()); 34 34 35 public LinkIndexUpdater(Repository repository, TypeManager typeManager, LinkIndex linkIndex, RowLog rowLog) {35 public LinkIndexUpdater(Repository repository, LinkIndex linkIndex) { 36 36 this.repository = repository; 37 this.typeManager = typeManager;37 this.typeManager = repository.getTypeManager(); 38 38 this.linkIndex = linkIndex; 39 this.rowLog = rowLog; 40 41 rowLog.registerConsumer(listener); 39 } 40 41 /** 42 * This constructor registers a listener with the rowlog. 43 * 44 * <b>IMPORTANT: this should not be used, the link index update is now triggered as part 45 * of the indexer.</b> 46 */ 47 public LinkIndexUpdater(Repository repository, LinkIndex linkIndex, RowLog rowLog) { 48 this.repository = repository; 49 this.typeManager = repository.getTypeManager(); 50 this.linkIndex = linkIndex; 51 52 if (rowLog != null) { 53 this.rowLog = rowLog; 54 rowLog.registerConsumer(listener); 55 } 42 56 } 43 57 … … 47 61 48 62 private class MyListener implements RowLogMessageConsumer { 63 64 public int getId() { 65 return 102; 66 } 67 68 public boolean processMessage(RowLogMessage msg) { 69 try { 70 RecordId recordId = repository.getIdGenerator().fromBytes(msg.getRowKey()); 71 RecordEvent recordEvent = new RecordEvent(msg.getPayload()); 72 update(recordId, recordEvent); 73 } catch (Exception e) { 74 log.error("Error processing event in LinkIndexUpdater", e); 75 } 76 return true; 77 } 78 } 79 80 public void update(RecordId recordId, RecordEvent recordEvent) { 49 81 // This is the algorithm for updating the LinkIndex when a record changes. 50 82 // … … 61 93 // 62 94 63 public int getId() { 64 return 102; 65 } 66 67 public boolean processMessage(RowLogMessage msg) { 68 try { 69 RecordEvent recordEvent = new RecordEvent(msg.getPayload()); 70 RecordId recordId = repository.getIdGenerator().fromBytes(msg.getRowKey()); 71 72 if (recordEvent.getType().equals(DELETE)) { 73 // Delete everything from the link index for this record, thus for all vtags 95 try { 96 if (recordEvent.getType().equals(DELETE)) { 97 // Delete everything from the link index for this record, thus for all vtags 98 linkIndex.deleteLinks(recordId); 99 if (log.isDebugEnabled()) { 100 log.debug("Record " + recordId + " : delete event : deleted extracted links."); 101 } 102 } else if (recordEvent.getType().equals(CREATE) || recordEvent.getType().equals(UPDATE)) { 103 104 // If the record is not new but its first version was created now, there might be existing 105 // entries for the @@versionless vtag 106 if (recordEvent.getType() == RecordEvent.Type.UPDATE && recordEvent.getVersionCreated() == 1) { 107 linkIndex.deleteLinks(recordId, VersionTag.VERSIONLESS_TAG); 108 } 109 110 IdRecord record; 111 try { 112 record = repository.readWithIds(recordId, null, null); 113 } catch (RecordNotFoundException e) { 114 // record not found: delete all links for all vtags 74 115 linkIndex.deleteLinks(recordId); 75 116 if (log.isDebugEnabled()) { 76 log.debug("Record " + recordId + " : delete event : deleted extracted links."); 77 } 78 } else if (recordEvent.getType().equals(CREATE) || recordEvent.getType().equals(UPDATE)) { 79 80 // If the record is not new but its first version was created now, there might be existing 81 // entries for the @@versionless vtag 82 if (recordEvent.getType() == RecordEvent.Type.UPDATE && recordEvent.getVersionCreated() == 1) { 83 linkIndex.deleteLinks(recordId, VersionTag.VERSIONLESS_TAG); 84 } 85 86 IdRecord record; 87 try { 88 record = repository.readWithIds(recordId, null, null); 89 } catch (RecordNotFoundException e) { 90 // record not found: delete all links for all vtags 91 linkIndex.deleteLinks(recordId); 92 if (log.isDebugEnabled()) { 93 log.debug("Record " + recordId + " : does not exist : deleted extracted links."); 94 } 95 return true; 96 } 97 boolean hasVersions = record.getVersion() != null; 98 99 if (hasVersions) { 100 Map<String, Long> vtags = VersionTag.getTagsById(record, typeManager); 101 Map<Long, Set<String>> tagsByVersion = VersionTag.tagsByVersion(vtags); 102 103 // 104 // First find out for what vtags we need to re-perform the link extraction 105 // 106 Set<String> vtagsToProcess = new HashSet<String>(); 107 108 // Modified vtag fields 109 Set<String> changedVTags = VersionTag.filterVTagFields(recordEvent.getUpdatedFields(), typeManager); 110 vtagsToProcess.addAll(changedVTags); 111 112 // The vtags of the created/modified version, if any 113 Set<String> vtagsOfChangedVersion = null; 114 if (recordEvent.getVersionCreated() != -1) { 115 vtagsOfChangedVersion = tagsByVersion.get(recordEvent.getVersionCreated()); 116 } else if (recordEvent.getVersionUpdated() != -1) { 117 vtagsOfChangedVersion = tagsByVersion.get(recordEvent.getVersionUpdated()); 118 } 119 120 if (vtagsOfChangedVersion != null) { 121 vtagsToProcess.addAll(vtagsOfChangedVersion); 122 } 123 124 // 125 // For each of the vtags, perform the link extraction 126 // 127 Map<Long, Set<FieldedLink>> cache = new HashMap<Long, Set<FieldedLink>>(); 128 for (String vtag : vtagsToProcess) { 129 if (!vtags.containsKey(vtag)) { 130 // The vtag is not defined on the document: it is a deleted vtag, delete the 131 // links corresponding to it 132 linkIndex.deleteLinks(recordId, vtag); 133 if (log.isDebugEnabled()) { 134 log.debug(String.format("Record %1$s, vtag %2$s : deleted extracted links", 135 record.getId(), safeLoadTagName(vtag))); 136 } 117 log.debug("Record " + recordId + " : does not exist : deleted extracted links."); 118 } 119 return; 120 } 121 boolean hasVersions = record.getVersion() != null; 122 123 if (hasVersions) { 124 Map<String, Long> vtags = VersionTag.getTagsById(record, typeManager); 125 Map<Long, Set<String>> tagsByVersion = VersionTag.tagsByVersion(vtags); 126 127 // 128 // First find out for what vtags we need to re-perform the link extraction 129 // 130 Set<String> vtagsToProcess = new HashSet<String>(); 131 132 // Modified vtag fields 133 Set<String> changedVTags = VersionTag.filterVTagFields(recordEvent.getUpdatedFields(), typeManager); 134 vtagsToProcess.addAll(changedVTags); 135 136 // The vtags of the created/modified version, if any 137 Set<String> vtagsOfChangedVersion = null; 138 if (recordEvent.getVersionCreated() != -1) { 139 vtagsOfChangedVersion = tagsByVersion.get(recordEvent.getVersionCreated()); 140 } else if (recordEvent.getVersionUpdated() != -1) { 141 vtagsOfChangedVersion = tagsByVersion.get(recordEvent.getVersionUpdated()); 142 } 143 144 if (vtagsOfChangedVersion != null) { 145 vtagsToProcess.addAll(vtagsOfChangedVersion); 146 } 147 148 // 149 // For each of the vtags, perform the link extraction 150 // 151 Map<Long, Set<FieldedLink>> cache = new HashMap<Long, Set<FieldedLink>>(); 152 for (String vtag : vtagsToProcess) { 153 if (!vtags.containsKey(vtag)) { 154 // The vtag is not defined on the document: it is a deleted vtag, delete the 155 // links corresponding to it 156 linkIndex.deleteLinks(recordId, vtag); 157 if (log.isDebugEnabled()) { 158 log.debug(String.format("Record %1$s, vtag %2$s : deleted extracted links", 159 record.getId(), safeLoadTagName(vtag))); 160 } 161 } else { 162 // Since one version might have multiple vtags, we keep a little cache to avoid 163 // extracting the links from the same version twice. 164 long version = vtags.get(vtag); 165 Set<FieldedLink> links; 166 if (cache.containsKey(version)) { 167 links = cache.get(version); 137 168 } else { 138 // Since one version might have multiple vtags, we keep a little cache to avoid 139 // extracting the links from the same version twice. 140 long version = vtags.get(vtag); 141 Set<FieldedLink> links; 142 if (cache.containsKey(version)) { 143 links = cache.get(version); 144 } else { 145 links = extractLinks(recordId, version); 146 cache.put(version, links); 147 } 148 linkIndex.updateLinks(recordId, vtag, links); 149 if (log.isDebugEnabled()) { 150 log.debug(String.format("Record %1$s, vtag %2$s : extracted links count : %3$s", 151 record.getId(), safeLoadTagName(vtag), links.size())); 152 } 169 links = extractLinks(recordId, version); 170 cache.put(version, links); 171 } 172 linkIndex.updateLinks(recordId, vtag, links); 173 if (log.isDebugEnabled()) { 174 log.debug(String.format("Record %1$s, vtag %2$s : extracted links count : %3$s", 175 record.getId(), safeLoadTagName(vtag), links.size())); 153 176 } 154 177 } 155 } else { 156 // The record has no versions 157 Set<FieldedLink> links = extractLinks(recordId, null); 158 linkIndex.updateLinks(recordId, VersionTag.VERSIONLESS_TAG, links); 159 if (log.isDebugEnabled()) { 160 log.debug(String.format("Record %1$s, vtag %2$s : extracted links count : %3$s", 161 record.getId(), VersionTag.VERSIONLESS_TAG, links.size())); 162 } 163 } 164 165 } 166 } catch (Exception e) { 167 log.error("Error processing event in LinkIndexUpdater", e); 168 } 169 return true; 178 } 179 } else { 180 // The record has no versions 181 Set<FieldedLink> links = extractLinks(recordId, null); 182 linkIndex.updateLinks(recordId, VersionTag.VERSIONLESS_TAG, links); 183 if (log.isDebugEnabled()) { 184 log.debug(String.format("Record %1$s, vtag %2$s : extracted links count : %3$s", 185 record.getId(), VersionTag.VERSIONLESS_TAG, links.size())); 186 } 187 } 188 } 189 } catch (Exception e) { 190 log.error("Error processing event in LinkIndexUpdater", e); 170 191 } 171 192 } -
projects/lily/trunk/linkindex/src/test/java/org/lilycms/linkindex/test/LinkIndexTest.java
r4090 r4113 102 102 @Test 103 103 public void testLinkIndexUpdater() throws Exception { 104 LinkIndexUpdater linkIndexUpdater = new LinkIndexUpdater(repository, typeManager,linkIndex, repository.getWal());104 LinkIndexUpdater linkIndexUpdater = new LinkIndexUpdater(repository, linkIndex, repository.getWal()); 105 105 106 106 FieldType nonVersionedFt = typeManager.newFieldType(typeManager.getValueType("LINK", false, false),
Note: See TracChangeset
for help on using the changeset viewer.