001package votorola.a.diff.harvest.run; // Copyright 2012. Christian Weilbach.  Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Votorola Software"), to deal in the Votorola Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicence, and/or sell copies of the Votorola Software, and to permit persons to whom the Votorola Software is furnished to do so, subject to the following conditions: The preceding copyright notice and this permission notice shall be included in all copies or substantial portions of the Votorola Software. THE VOTOROLA SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE VOTOROLA SOFTWARE OR THE USE OR OTHER DEALINGS IN THE VOTOROLA SOFTWARE.
002
003import static java.util.concurrent.TimeUnit.SECONDS;
004
005import java.io.IOException;
006import java.util.Collections;
007import java.util.HashMap;
008import java.util.LinkedList;
009import java.util.List;
010import java.util.Map;
011import java.util.concurrent.Executors;
012import java.util.concurrent.ScheduledExecutorService;
013import java.util.concurrent.atomic.AtomicLong;
014import java.util.logging.Level;
015import java.util.logging.Logger;
016
017import org.apache.http.HttpEntity;
018import org.apache.http.HttpResponse;
019import org.apache.http.client.methods.HttpGet;
020import org.apache.http.concurrent.FutureCallback;
021import org.apache.http.impl.nio.client.DefaultHttpAsyncClient;
022import org.apache.http.nio.client.HttpAsyncClient;
023import org.apache.http.nio.reactor.IOReactorException;
024import org.apache.http.params.CoreConnectionPNames;
025import org.apache.http.params.CoreProtocolPNames;
026import org.apache.http.params.HttpParams;
027import org.apache.http.params.SyncBasicHttpParams;
028
029import votorola.g.lang.ThreadSafe;
030import votorola.g.logging.LoggerX;
031
032/**
033 * Service to schedule HTTP traffic asynchronously and gracefully on a per
034 * archive basis. The stepping is one second, fetchs are appended at the end of
035 * the future queue sorted by each individual base-url of the forum.
036 */
037public @ThreadSafe
038class HarvestRunner {
039
040    /**
041     * Manage queues for each host and forum.
042     * 
043     */
044    private final class Queue {
045        public final List<Fetcher> queue = Collections
046                .synchronizedList(new LinkedList<Fetcher>());
047    }
048
049    /**
050     * Internal runtime counter through the stepping.
051     */
052    private final static AtomicLong counter = new AtomicLong(0L);
053
054    /**
055     * Number of available processors on runtime.
056     */
057    private final static int CPUS = Runtime.getRuntime().availableProcessors();
058
059    private static HarvestRunner instance;
060
061    final static Logger LOGGER = LoggerX.i(HarvestRunner.class);
062
063    /**
064     * Version advocated by the HTTP-client.
065     */
066    public final static String VERSION = "0.0.1";
067
068    /**
069     * Sane default settings for the TCP sockets. Taken from Apache example.
070     */
071    private final static HttpParams params = new SyncBasicHttpParams()
072            .setIntParameter(CoreConnectionPNames.SO_TIMEOUT, 5000)
073            .setIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, 30000)
074            .setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 8 * 1024)
075            .setBooleanParameter(CoreConnectionPNames.STALE_CONNECTION_CHECK,
076                    false)
077            .setParameter(CoreProtocolPNames.USER_AGENT,
078                    "Votorola-HarvestBot/" + VERSION);
079
080    /**
081     * Stepping is one second.
082     */
083    public final static int STEPPING = 1;
084
085    /**
086     * Factor for number of worker threads. Total number of threads is
087     * 
088     * <pre>
089     * THREADFACTOR * Runtime.getRuntime().availableProcessors()
090     * </pre>
091     */
092    private final static int THREADFACTOR = 4;
093
094
095    /**
096     * Default singleton access.
097     * 
098     * @return instance of HarvestRunner
099     */
100    public static HarvestRunner i() {
101        if (instance == null) {
102            synchronized(HarvestRunner.class) {
103                instance = new HarvestRunner();
104            }
105        }
106        return instance;
107    }
108
109    private final HttpAsyncClient client;
110
111    /**
112     * Queues for each base url.
113     */
114    private final Map<String, Queue> queues = Collections
115            .synchronizedMap(new HashMap<String, Queue>());
116
117    private final ScheduledExecutorService scheduler;
118
119    private HarvestRunner() {
120        scheduler = Executors.newScheduledThreadPool(THREADFACTOR * CPUS);
121        HttpAsyncClient tempClient = null;
122        try {
123            final DefaultHttpAsyncClient defaultClient = new DefaultHttpAsyncClient();
124            defaultClient.setParams(params);
125            tempClient = defaultClient;
126        } catch (IOReactorException e) {
127            LOGGER.log(Level.SEVERE, "Cannot start async http client.", e);
128            System.exit(1);
129        }
130
131        client = tempClient;
132        client.start();
133        initScheduler();
134    }
135
136    protected void finalize() {
137        try {
138            client.shutdown();
139        } catch (InterruptedException e) {
140            // ignore
141        }
142    }
143
144    /**
145     * Directly fetch and run a fetch in the thread pool. The scheduler
146     * runs this every second for the first fetch per archive.
147     * 
148     * @param fetch
149     */
150    private void get(final Fetcher fetch) {
151        final String url = fetch.url();
152        LOGGER.fine("GETting: " + url);
153        client.execute(new HttpGet(url),
154                new FutureCallback<HttpResponse>() {
155
156                    public void cancelled() {
157                        LOGGER.info("Request cancelled(?):" + url);
158                    }
159
160                    public void completed(final HttpResponse res) {
161                        LOGGER.finest("Got response for: " + url);
162                        final HttpEntity entity = res.getEntity();
163                        if (entity == null) {
164                            return;
165                        }
166
167                        if (res.getStatusLine().getStatusCode() > 400) {
168                            LOGGER.info("HTTP error: " + res.getStatusLine());
169                        }
170                        
171                        try {
172                            fetch.setStatusCode(res.getStatusLine().getStatusCode());
173                            fetch.setInputStream(entity.getContent());
174                        } catch (IOException e) {
175                            LOGGER.log(Level.WARNING,
176                                    "Cannot get response' content.", e);
177                            return;
178                        }
179
180                        scheduler.execute(new Runnable() {
181                            public void run() {
182                                fetch.run();
183                            }
184                        });
185
186                    }
187
188                    public void failed(final Exception e) {
189                        LOGGER.log(Level.INFO, "Request failed:" + url, e);
190                    }
191                });
192    }
193
194    private synchronized Queue getOrInitQueue(final String archiveUrl) {
195        Queue queue = queues.get(archiveUrl);
196        if (queue == null) {
197            queue = new Queue();
198        }
199
200        queues.put(archiveUrl, queue);
201        return queue;
202    }
203
204    /**
205     * Initialize the stepping scheduler.
206     */
207    private synchronized void initScheduler() {
208
209        final Runnable fetcher = new Runnable() {
210
211            public void run() {
212                LOGGER.fine("Running step " + counter.getAndIncrement());
213
214                // O(n), n number of archives
215                for (final Queue queue : queues.values()) {
216                    if (queue.queue.isEmpty()) {
217                        continue;
218                    }
219
220                    // run one fetch from the regular queue
221                    final Fetcher fetch = queue.queue.remove(0);
222
223                    get(fetch);
224                }
225            }
226        };
227
228        scheduler.scheduleAtFixedRate(fetcher, 0, STEPPING, SECONDS);
229    }
230
231    /**
232     * Schedule fetchs for fetching and execution. The fetch gets the next empty
233     * slot in a queue for its archive. The archive is determined by base-url.
234     * Once the data has arrived, {@linkplain Runnable#run()} is executed in the
235     * thread pool.
236     */
237    public void scheduleFirst(final Fetcher fetch) {
238
239        final Queue queue = getOrInitQueue(fetch.archiveUrl());
240        queue.queue.add(0, fetch);
241    }
242
243    public void scheduleLast(final Fetcher fetch) {
244
245        final Queue queue = getOrInitQueue(fetch.archiveUrl());
246        queue.queue.add(fetch);
247    }
248
249}