Changeset 4113


Ignore:
Timestamp:
2010-07-02 09:12:58 (3 years ago)
Author:
bruno
Message:

Trigger updating of link index as part of the index updater, rather than registering this as its own rowlog listener, to assure that link index is brought up to date before creating/updating index entries of the record.

Location:
projects/lily/trunk
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • projects/lily/trunk/indexer/src/main/java/org/lilycms/indexer/IndexUpdater.java

    r4110 r4113  
    77import org.lilycms.indexer.conf.IndexField; 
    88import org.lilycms.linkindex.LinkIndex; 
     9import org.lilycms.linkindex.LinkIndexUpdater; 
    910import org.lilycms.repository.api.*; 
    1011import org.lilycms.repository.api.FieldTypeNotFoundException; 
     
    3233    private IndexerListener indexerListener = new IndexerListener(); 
    3334    private Indexer indexer; 
     35    private LinkIndexUpdater linkIndexUpdater; 
    3436 
    3537    private Log log = LogFactory.getLog(getClass()); 
     
    4143        this.typeManager = repository.getTypeManager(); 
    4244        this.linkIndex = linkIndex; 
     45        this.linkIndexUpdater = new LinkIndexUpdater(repository, linkIndex); 
    4346 
    4447        rowLog.registerConsumer(indexerListener); 
     
    5861                RecordEvent event = new RecordEvent(msg.getPayload()); 
    5962                RecordId recordId = repository.getIdGenerator().fromBytes(msg.getRowKey()); 
     63 
     64                // First, bring the link index up to date. 
     65                // We always make sure the link index is up to date with the current record state, before updating 
     66                // the index entry of the record. This way, from the moment the index entry is inserted, any 
     67                // denormalized data contained in it will be correctly updated. 
     68                linkIndexUpdater.update(recordId, event); 
    6069 
    6170                if (log.isDebugEnabled()) { 
  • projects/lily/trunk/indexer/src/test/java/org/lilycms/indexer/test/IndexerTest.java

    r4110 r4113  
    1111import org.apache.commons.logging.Log; 
    1212import org.apache.commons.logging.LogFactory; 
    13 import org.apache.hadoop.hbase.TableExistsException; 
    1413import org.apache.solr.client.solrj.SolrQuery; 
    1514import org.apache.solr.client.solrj.SolrServer; 
     
    2928import org.lilycms.indexer.conf.IndexerConfBuilder; 
    3029import org.lilycms.linkindex.LinkIndex; 
    31 import org.lilycms.linkindex.LinkIndexUpdater; 
    3230import org.lilycms.repository.api.*; 
    3331import org.lilycms.repository.impl.*; 
     
    103101        IndexManager indexManager = new IndexManager(HBASE_PROXY.getConf()); 
    104102 
    105         try { LinkIndex.createIndexes(indexManager); } catch (TableExistsException e) { } 
     103        LinkIndex.createIndexes(indexManager); 
    106104        LinkIndex linkIndex = new LinkIndex(indexManager, repository); 
    107         new LinkIndexUpdater(repository, typeManager, linkIndex, repository.getWal()); 
    108105 
    109106        // Field types should exist before the indexer conf is loaded 
  • projects/lily/trunk/linkindex/src/main/java/org/lilycms/linkindex/LinkIndexUpdater.java

    r4103 r4113  
    3333    private Log log = LogFactory.getLog(getClass()); 
    3434 
    35     public LinkIndexUpdater(Repository repository, TypeManager typeManager, LinkIndex linkIndex, RowLog rowLog) { 
     35    public LinkIndexUpdater(Repository repository, LinkIndex linkIndex) { 
    3636        this.repository = repository; 
    37         this.typeManager = typeManager; 
     37        this.typeManager = repository.getTypeManager(); 
    3838        this.linkIndex = linkIndex; 
    39         this.rowLog = rowLog; 
    40  
    41         rowLog.registerConsumer(listener); 
     39    } 
     40 
     41    /** 
     42     * This constructor registers a listener with the rowlog. 
     43     * 
     44     * <b>IMPORTANT: this should not be used, the link index update is now triggered as part 
     45     * of the indexer.</b> 
     46     */ 
     47    public LinkIndexUpdater(Repository repository, LinkIndex linkIndex, RowLog rowLog) { 
     48        this.repository = repository; 
     49        this.typeManager = repository.getTypeManager(); 
     50        this.linkIndex = linkIndex; 
     51 
     52        if (rowLog != null) { 
     53            this.rowLog = rowLog; 
     54            rowLog.registerConsumer(listener); 
     55        } 
    4256    } 
    4357 
     
    4761 
    4862    private class  MyListener implements RowLogMessageConsumer { 
     63 
     64        public int getId() { 
     65            return 102; 
     66        } 
     67 
     68        public boolean processMessage(RowLogMessage msg) { 
     69            try { 
     70                RecordId recordId = repository.getIdGenerator().fromBytes(msg.getRowKey()); 
     71                RecordEvent recordEvent = new RecordEvent(msg.getPayload()); 
     72                update(recordId, recordEvent); 
     73            } catch (Exception e) { 
     74                log.error("Error processing event in LinkIndexUpdater", e); 
     75            } 
     76            return true; 
     77        } 
     78    } 
     79 
     80    public void update(RecordId recordId, RecordEvent recordEvent) { 
    4981        // This is the algorithm for updating the LinkIndex when a record changes. 
    5082        // 
     
    6193        // 
    6294 
    63         public int getId() { 
    64             return 102; 
    65         } 
    66  
    67         public boolean processMessage(RowLogMessage msg) { 
    68             try { 
    69                 RecordEvent recordEvent = new RecordEvent(msg.getPayload()); 
    70                 RecordId recordId = repository.getIdGenerator().fromBytes(msg.getRowKey()); 
    71  
    72                 if (recordEvent.getType().equals(DELETE)) { 
    73                     // Delete everything from the link index for this record, thus for all vtags 
     95        try { 
     96            if (recordEvent.getType().equals(DELETE)) { 
     97                // Delete everything from the link index for this record, thus for all vtags 
     98                linkIndex.deleteLinks(recordId); 
     99                if (log.isDebugEnabled()) { 
     100                    log.debug("Record " + recordId + " : delete event : deleted extracted links."); 
     101                } 
     102            } else if (recordEvent.getType().equals(CREATE) || recordEvent.getType().equals(UPDATE)) { 
     103 
     104                // If the record is not new but its first version was created now, there might be existing 
     105                // entries for the @@versionless vtag 
     106                if (recordEvent.getType() == RecordEvent.Type.UPDATE && recordEvent.getVersionCreated() == 1) { 
     107                    linkIndex.deleteLinks(recordId, VersionTag.VERSIONLESS_TAG); 
     108                } 
     109 
     110                IdRecord record; 
     111                try { 
     112                    record = repository.readWithIds(recordId, null, null); 
     113                } catch (RecordNotFoundException e) { 
     114                    // record not found: delete all links for all vtags 
    74115                    linkIndex.deleteLinks(recordId); 
    75116                    if (log.isDebugEnabled()) { 
    76                         log.debug("Record " + recordId + " : delete event : deleted extracted links."); 
    77                     } 
    78                 } else if (recordEvent.getType().equals(CREATE) || recordEvent.getType().equals(UPDATE)) { 
    79  
    80                     // If the record is not new but its first version was created now, there might be existing 
    81                     // entries for the @@versionless vtag 
    82                     if (recordEvent.getType() == RecordEvent.Type.UPDATE && recordEvent.getVersionCreated() == 1) { 
    83                         linkIndex.deleteLinks(recordId, VersionTag.VERSIONLESS_TAG); 
    84                     } 
    85  
    86                     IdRecord record; 
    87                     try { 
    88                         record = repository.readWithIds(recordId, null, null); 
    89                     } catch (RecordNotFoundException e) { 
    90                         // record not found: delete all links for all vtags 
    91                         linkIndex.deleteLinks(recordId); 
    92                         if (log.isDebugEnabled()) { 
    93                             log.debug("Record " + recordId + " : does not exist : deleted extracted links."); 
    94                         } 
    95                         return true; 
    96                     } 
    97                     boolean hasVersions = record.getVersion() != null; 
    98  
    99                     if (hasVersions) { 
    100                         Map<String, Long> vtags = VersionTag.getTagsById(record, typeManager); 
    101                         Map<Long, Set<String>> tagsByVersion = VersionTag.tagsByVersion(vtags); 
    102  
    103                         // 
    104                         // First find out for what vtags we need to re-perform the link extraction 
    105                         // 
    106                         Set<String> vtagsToProcess = new HashSet<String>(); 
    107  
    108                         // Modified vtag fields 
    109                         Set<String> changedVTags = VersionTag.filterVTagFields(recordEvent.getUpdatedFields(), typeManager); 
    110                         vtagsToProcess.addAll(changedVTags); 
    111  
    112                         // The vtags of the created/modified version, if any 
    113                         Set<String> vtagsOfChangedVersion = null; 
    114                         if (recordEvent.getVersionCreated() != -1) { 
    115                             vtagsOfChangedVersion = tagsByVersion.get(recordEvent.getVersionCreated()); 
    116                         } else if (recordEvent.getVersionUpdated() != -1) { 
    117                             vtagsOfChangedVersion = tagsByVersion.get(recordEvent.getVersionUpdated()); 
    118                         } 
    119  
    120                         if (vtagsOfChangedVersion != null) { 
    121                             vtagsToProcess.addAll(vtagsOfChangedVersion); 
    122                         } 
    123  
    124                         // 
    125                         // For each of the vtags, perform the link extraction 
    126                         // 
    127                         Map<Long, Set<FieldedLink>> cache = new HashMap<Long, Set<FieldedLink>>(); 
    128                         for (String vtag : vtagsToProcess) { 
    129                             if (!vtags.containsKey(vtag)) { 
    130                                 // The vtag is not defined on the document: it is a deleted vtag, delete the 
    131                                 // links corresponding to it 
    132                                 linkIndex.deleteLinks(recordId, vtag); 
    133                                 if (log.isDebugEnabled()) { 
    134                                     log.debug(String.format("Record %1$s, vtag %2$s : deleted extracted links", 
    135                                             record.getId(), safeLoadTagName(vtag))); 
    136                                 } 
     117                        log.debug("Record " + recordId + " : does not exist : deleted extracted links."); 
     118                    } 
     119                    return; 
     120                } 
     121                boolean hasVersions = record.getVersion() != null; 
     122 
     123                if (hasVersions) { 
     124                    Map<String, Long> vtags = VersionTag.getTagsById(record, typeManager); 
     125                    Map<Long, Set<String>> tagsByVersion = VersionTag.tagsByVersion(vtags); 
     126 
     127                    // 
     128                    // First find out for what vtags we need to re-perform the link extraction 
     129                    // 
     130                    Set<String> vtagsToProcess = new HashSet<String>(); 
     131 
     132                    // Modified vtag fields 
     133                    Set<String> changedVTags = VersionTag.filterVTagFields(recordEvent.getUpdatedFields(), typeManager); 
     134                    vtagsToProcess.addAll(changedVTags); 
     135 
     136                    // The vtags of the created/modified version, if any 
     137                    Set<String> vtagsOfChangedVersion = null; 
     138                    if (recordEvent.getVersionCreated() != -1) { 
     139                        vtagsOfChangedVersion = tagsByVersion.get(recordEvent.getVersionCreated()); 
     140                    } else if (recordEvent.getVersionUpdated() != -1) { 
     141                        vtagsOfChangedVersion = tagsByVersion.get(recordEvent.getVersionUpdated()); 
     142                    } 
     143 
     144                    if (vtagsOfChangedVersion != null) { 
     145                        vtagsToProcess.addAll(vtagsOfChangedVersion); 
     146                    } 
     147 
     148                    // 
     149                    // For each of the vtags, perform the link extraction 
     150                    // 
     151                    Map<Long, Set<FieldedLink>> cache = new HashMap<Long, Set<FieldedLink>>(); 
     152                    for (String vtag : vtagsToProcess) { 
     153                        if (!vtags.containsKey(vtag)) { 
     154                            // The vtag is not defined on the document: it is a deleted vtag, delete the 
     155                            // links corresponding to it 
     156                            linkIndex.deleteLinks(recordId, vtag); 
     157                            if (log.isDebugEnabled()) { 
     158                                log.debug(String.format("Record %1$s, vtag %2$s : deleted extracted links", 
     159                                        record.getId(), safeLoadTagName(vtag))); 
     160                            } 
     161                        } else { 
     162                            // Since one version might have multiple vtags, we keep a little cache to avoid 
     163                            // extracting the links from the same version twice. 
     164                            long version = vtags.get(vtag); 
     165                            Set<FieldedLink> links; 
     166                            if (cache.containsKey(version)) { 
     167                                links = cache.get(version); 
    137168                            } else { 
    138                                 // Since one version might have multiple vtags, we keep a little cache to avoid 
    139                                 // extracting the links from the same version twice. 
    140                                 long version = vtags.get(vtag); 
    141                                 Set<FieldedLink> links; 
    142                                 if (cache.containsKey(version)) { 
    143                                     links = cache.get(version); 
    144                                 } else { 
    145                                     links = extractLinks(recordId, version); 
    146                                     cache.put(version, links); 
    147                                 } 
    148                                 linkIndex.updateLinks(recordId, vtag, links); 
    149                                 if (log.isDebugEnabled()) { 
    150                                     log.debug(String.format("Record %1$s, vtag %2$s : extracted links count : %3$s", 
    151                                             record.getId(), safeLoadTagName(vtag), links.size())); 
    152                                 } 
     169                                links = extractLinks(recordId, version); 
     170                                cache.put(version, links); 
     171                            } 
     172                            linkIndex.updateLinks(recordId, vtag, links); 
     173                            if (log.isDebugEnabled()) { 
     174                                log.debug(String.format("Record %1$s, vtag %2$s : extracted links count : %3$s", 
     175                                        record.getId(), safeLoadTagName(vtag), links.size())); 
    153176                            } 
    154177                        } 
    155                     } else { 
    156                         // The record has no versions 
    157                         Set<FieldedLink> links = extractLinks(recordId, null); 
    158                         linkIndex.updateLinks(recordId, VersionTag.VERSIONLESS_TAG, links); 
    159                         if (log.isDebugEnabled()) { 
    160                             log.debug(String.format("Record %1$s, vtag %2$s : extracted links count : %3$s", 
    161                                     record.getId(), VersionTag.VERSIONLESS_TAG, links.size())); 
    162                         } 
    163                     } 
    164  
    165                 } 
    166             } catch (Exception e) { 
    167                 log.error("Error processing event in LinkIndexUpdater", e); 
    168             } 
    169             return true; 
     178                    } 
     179                } else { 
     180                    // The record has no versions 
     181                    Set<FieldedLink> links = extractLinks(recordId, null); 
     182                    linkIndex.updateLinks(recordId, VersionTag.VERSIONLESS_TAG, links); 
     183                    if (log.isDebugEnabled()) { 
     184                        log.debug(String.format("Record %1$s, vtag %2$s : extracted links count : %3$s", 
     185                                record.getId(), VersionTag.VERSIONLESS_TAG, links.size())); 
     186                    } 
     187                } 
     188            } 
     189        } catch (Exception e) { 
     190            log.error("Error processing event in LinkIndexUpdater", e); 
    170191        } 
    171192    } 
  • projects/lily/trunk/linkindex/src/test/java/org/lilycms/linkindex/test/LinkIndexTest.java

    r4090 r4113  
    102102    @Test 
    103103    public void testLinkIndexUpdater() throws Exception { 
    104         LinkIndexUpdater linkIndexUpdater = new LinkIndexUpdater(repository, typeManager, linkIndex, repository.getWal()); 
     104        LinkIndexUpdater linkIndexUpdater = new LinkIndexUpdater(repository, linkIndex, repository.getWal()); 
    105105 
    106106        FieldType nonVersionedFt = typeManager.newFieldType(typeManager.getValueType("LINK", false, false), 
Note: See TracChangeset for help on using the changeset viewer.