001package votorola.a.diff.harvest.run; // Copyright 2012. Christian Weilbach. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Votorola Software"), to deal in the Votorola Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicence, and/or sell copies of the Votorola Software, and to permit persons to whom the Votorola Software is furnished to do so, subject to the following conditions: The preceding copyright notice and this permission notice shall be included in all copies or substantial portions of the Votorola Software. THE VOTOROLA SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE VOTOROLA SOFTWARE OR THE USE OR OTHER DEALINGS IN THE VOTOROLA SOFTWARE. 002 003import static java.util.concurrent.TimeUnit.SECONDS; 004 005import java.io.IOException; 006import java.util.Collections; 007import java.util.HashMap; 008import java.util.LinkedList; 009import java.util.List; 010import java.util.Map; 011import java.util.concurrent.Executors; 012import java.util.concurrent.ScheduledExecutorService; 013import java.util.concurrent.atomic.AtomicLong; 014import java.util.logging.Level; 015import java.util.logging.Logger; 016 017import org.apache.http.HttpEntity; 018import org.apache.http.HttpResponse; 019import org.apache.http.client.methods.HttpGet; 020import org.apache.http.concurrent.FutureCallback; 021import org.apache.http.impl.nio.client.DefaultHttpAsyncClient; 022import org.apache.http.nio.client.HttpAsyncClient; 023import org.apache.http.nio.reactor.IOReactorException; 024import org.apache.http.params.CoreConnectionPNames; 025import org.apache.http.params.CoreProtocolPNames; 026import org.apache.http.params.HttpParams; 027import org.apache.http.params.SyncBasicHttpParams; 028 029import votorola.g.lang.ThreadSafe; 030import votorola.g.logging.LoggerX; 031 032/** 033 * Service to schedule HTTP traffic asynchronously and gracefully on a per 034 * archive basis. The stepping is one second, fetchs are appended at the end of 035 * the future queue sorted by each individual base-url of the forum. 036 */ 037public @ThreadSafe 038class HarvestRunner { 039 040 /** 041 * Manage queues for each host and forum. 042 * 043 */ 044 private final class Queue { 045 public final List<Fetcher> queue = Collections 046 .synchronizedList(new LinkedList<Fetcher>()); 047 } 048 049 /** 050 * Internal runtime counter through the stepping. 051 */ 052 private final static AtomicLong counter = new AtomicLong(0L); 053 054 /** 055 * Number of available processors on runtime. 056 */ 057 private final static int CPUS = Runtime.getRuntime().availableProcessors(); 058 059 private static HarvestRunner instance; 060 061 final static Logger LOGGER = LoggerX.i(HarvestRunner.class); 062 063 /** 064 * Version advocated by the HTTP-client. 065 */ 066 public final static String VERSION = "0.0.1"; 067 068 /** 069 * Sane default settings for the TCP sockets. Taken from Apache example. 070 */ 071 private final static HttpParams params = new SyncBasicHttpParams() 072 .setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 5000) 073 .setIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 30000) 074 .setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 8 * 1024) 075 .setBooleanParameter(CoreConnectionPNames.STALE_CONNECTION_CHECK, 076 false) 077 .setParameter(CoreProtocolPNames.USER_AGENT, 078 "Votorola-HarvestBot/" + VERSION); 079 080 /** 081 * Stepping is one second. 082 */ 083 public final static int STEPPING = 1; 084 085 /** 086 * Factor for number of worker threads. Total number of threads is 087 * 088 * <pre> 089 * THREADFACTOR * Runtime.getRuntime().availableProcessors() 090 * </pre> 091 */ 092 private final static int THREADFACTOR = 4; 093 094 095 /** 096 * Default singleton access. 097 * 098 * @return instance of HarvestRunner 099 */ 100 public static HarvestRunner i() { 101 if (instance == null) { 102 synchronized(HarvestRunner.class) { 103 instance = new HarvestRunner(); 104 } 105 } 106 return instance; 107 } 108 109 private final HttpAsyncClient client; 110 111 /** 112 * Queues for each base url. 113 */ 114 private final Map<String, Queue> queues = Collections 115 .synchronizedMap(new HashMap<String, Queue>()); 116 117 private final ScheduledExecutorService scheduler; 118 119 private HarvestRunner() { 120 scheduler = Executors.newScheduledThreadPool(THREADFACTOR * CPUS); 121 HttpAsyncClient tempClient = null; 122 try { 123 final DefaultHttpAsyncClient defaultClient = new DefaultHttpAsyncClient(); 124 defaultClient.setParams(params); 125 tempClient = defaultClient; 126 } catch (IOReactorException e) { 127 LOGGER.log(Level.SEVERE, "Cannot start async http client.", e); 128 System.exit(1); 129 } 130 131 client = tempClient; 132 client.start(); 133 initScheduler(); 134 } 135 136 protected void finalize() { 137 try { 138 client.shutdown(); 139 } catch (InterruptedException e) { 140 // ignore 141 } 142 } 143 144 /** 145 * Directly fetch and run a fetch in the thread pool. The scheduler 146 * runs this every second for the first fetch per archive. 147 * 148 * @param fetch 149 */ 150 private void get(final Fetcher fetch) { 151 final String url = fetch.url(); 152 LOGGER.fine("GETting: " + url); 153 client.execute(new HttpGet(url), 154 new FutureCallback<HttpResponse>() { 155 156 public void cancelled() { 157 LOGGER.info("Request cancelled(?):" + url); 158 } 159 160 public void completed(final HttpResponse res) { 161 LOGGER.finest("Got response for: " + url); 162 final HttpEntity entity = res.getEntity(); 163 if (entity == null) { 164 return; 165 } 166 167 if (res.getStatusLine().getStatusCode() > 400) { 168 LOGGER.info("HTTP error: " + res.getStatusLine()); 169 } 170 171 try { 172 fetch.setStatusCode(res.getStatusLine().getStatusCode()); 173 fetch.setInputStream(entity.getContent()); 174 } catch (IOException e) { 175 LOGGER.log(Level.WARNING, 176 "Cannot get response' content.", e); 177 return; 178 } 179 180 scheduler.execute(new Runnable() { 181 public void run() { 182 fetch.run(); 183 } 184 }); 185 186 } 187 188 public void failed(final Exception e) { 189 LOGGER.log(Level.INFO, "Request failed:" + url, e); 190 } 191 }); 192 } 193 194 private synchronized Queue getOrInitQueue(final String archiveUrl) { 195 Queue queue = queues.get(archiveUrl); 196 if (queue == null) { 197 queue = new Queue(); 198 } 199 200 queues.put(archiveUrl, queue); 201 return queue; 202 } 203 204 /** 205 * Initialize the stepping scheduler. 206 */ 207 private synchronized void initScheduler() { 208 209 final Runnable fetcher = new Runnable() { 210 211 public void run() { 212 LOGGER.fine("Running step " + counter.getAndIncrement()); 213 214 // O(n), n number of archives 215 for (final Queue queue : queues.values()) { 216 if (queue.queue.isEmpty()) { 217 continue; 218 } 219 220 // run one fetch from the regular queue 221 final Fetcher fetch = queue.queue.remove(0); 222 223 get(fetch); 224 } 225 } 226 }; 227 228 scheduler.scheduleAtFixedRate(fetcher, 0, STEPPING, SECONDS); 229 } 230 231 /** 232 * Schedule fetchs for fetching and execution. The fetch gets the next empty 233 * slot in a queue for its archive. The archive is determined by base-url. 234 * Once the data has arrived, {@linkplain Runnable#run()} is executed in the 235 * thread pool. 236 */ 237 public void scheduleFirst(final Fetcher fetch) { 238 239 final Queue queue = getOrInitQueue(fetch.archiveUrl()); 240 queue.queue.add(0, fetch); 241 } 242 243 public void scheduleLast(final Fetcher fetch) { 244 245 final Queue queue = getOrInitQueue(fetch.archiveUrl()); 246 queue.queue.add(fetch); 247 } 248 249}