Changeset 4166


Ignore:
Timestamp:
2010-07-20 13:38:32 (3 years ago)
Author:
bruno
Message:

Small metrics change, check thread interrupted state before processing each message, do the thread wait completely following the pattern from wait javadoc.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • trunk/rowlog/impl/src/main/java/org/lilycms/rowlog/impl/RowLogProcessorImpl.java

    r4160 r4166  
    308308                try { 
    309309                    List<RowLogMessage> messages = shard.next(consumer.getId()); 
    310                     metrics.incScanCount(); 
     310                    metrics.setScannedMessages(messages != null ? messages.size() : 0); 
    311311                    if (messages != null && !messages.isEmpty()) { 
    312                         metrics.incMessageCount(); 
    313312                        for (RowLogMessage message : messages) { 
     313                            if (isInterrupted()) 
     314                                return; 
     315 
     316                            metrics.incMessageCount(); 
    314317                            byte[] lock = rowLog.lockMessage(message, consumer.getId()); 
    315318                            if (lock != null) { 
     
    326329                        try { 
    327330                            long timeout = 5000; 
    328                             if (lastWakeup + timeout < System.currentTimeMillis()) { 
    329                                 synchronized (this) { 
     331                            synchronized (this) { 
     332                                while (lastWakeup + timeout < System.currentTimeMillis()) { 
    330333                                    wait(timeout); 
    331334                                } 
     
    344347        private class ProcessorMetrics implements Updater { 
    345348            private int scanCount = 0; 
     349            private long scannedMessageCount = 0; 
    346350            private int messageCount = 0; 
    347351            private int successCount = 0; 
     
    358362            public synchronized void doUpdates(MetricsContext unused) { 
    359363                record.setMetric("scanCount", scanCount); 
     364                record.setMetric("messagesPerScan", scanCount > 0 ? scannedMessageCount / scanCount : 0f); 
    360365                record.setMetric("messageCount", messageCount); 
    361366                record.setMetric("successCount", successCount); 
     
    365370 
    366371                scanCount = 0; 
     372                scannedMessageCount = 0; 
    367373                messageCount = 0; 
    368374                successCount = 0; 
     
    371377            } 
    372378 
    373             synchronized void incScanCount() { 
     379            synchronized void setScannedMessages(int read) { 
    374380                scanCount++; 
     381                scannedMessageCount += read; 
    375382            } 
    376383 
Note: See TracChangeset for help on using the changeset viewer.