001package votorola.a.diff.harvest; // Copyright 2012. Christian Weilbach.  Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Votorola Software"), to deal in the Votorola Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicence, and/or sell copies of the Votorola Software, and to permit persons to whom the Votorola Software is furnished to do so, subject to the following conditions: The preceding copyright notice and this permission notice shall be included in all copies or substantial portions of the Votorola Software. THE VOTOROLA SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE VOTOROLA SOFTWARE OR THE USE OR OTHER DEALINGS IN THE VOTOROLA SOFTWARE.
002
003import java.util.Date;
004import java.sql.PreparedStatement;
005import java.sql.ResultSet;
006import java.sql.SQLException;
007import java.sql.Timestamp;
008import java.util.logging.Level;
009import java.util.logging.Logger;
010
011import votorola.a.diff.harvest.run.HarvestRunner;
012import votorola.g.lang.ThreadSafe;
013import votorola.g.lang.Warning;
014import votorola.g.logging.LoggerX;
015import votorola.g.sql.Database;
016
017/**
018 * Database table to store harvester state for PipermailHarvester only atm. This
019 * is not part of the public API (yet).
020 * 
021 */
022@Warning("non-Api")
023@ThreadSafe
024final public class StateTable {
025
026    static enum Type {
027        TEMP, FIN, PERM
028    }
029
030    private final transient Database db;
031
032    private final static String SCHEMA_NAME = "harvest";
033
034    private final transient String statementKeyBase;
035
036    private final static String TABLE_NAME = "pipermail_state";
037
038    // SQL table string
039    private final static String TABLE = "\"" + SCHEMA_NAME + "\".\"" + TABLE_NAME
040            + "\"";
041
042    private final static Logger LOGGER = LoggerX.i(StateTable.class);
043
044    // TODO do not synchronize around db, this will be a bottleneck
045    /**
046     * Table which saves harvested urls for {@linkplain HarvestRunner}.
047     * 
048     * @param db
049     */
050    @Warning("non-Api")
051    public StateTable(final Database db) {
052        this.db = db;
053        statementKeyBase = getClass().getName() + ":" + SCHEMA_NAME + "/"
054                + TABLE_NAME + ".";
055    }
056
057    /**
058     * Creates this table in the database.
059     * 
060     * @throws SQLException
061     */
062    void create() throws SQLException {
063
064        try {
065            synchronized (db) {
066                db.ensureSchema(SCHEMA_NAME);
067            }
068
069            final String key = statementKeyBase + "create";
070            synchronized (db) {
071                PreparedStatement prepStatem = db.statementCache().get(key);
072                if (prepStatem == null) {
073                    // this also creates a b-tree on the hash column
074                    prepStatem = db
075                            .connection()
076                            .prepareStatement(
077                                    "CREATE TABLE IF NOT EXISTS "
078                                            + TABLE
079                                            + " (archive_url character varying NOT NULL"
080                                            + ", type character varying NOT NULL"
081                                            + ", marker character varying NOT NULL"
082                                            + ", sent_ts timestamp with time zone NOT NULL"
083                                            + ", PRIMARY KEY(archive_url, type, marker))");
084
085                    db.statementCache().put(key, prepStatem);
086                }
087                prepStatem.execute();
088            }
089        } catch (SQLException e) {
090            LOGGER.log(Level.SEVERE, "Cannot create StateTable.", e);
091            System.exit(1);
092        }
093    }
094
095    /**
096     * Drops this table from the database.
097     * 
098     * @throws SQLException
099     */
100    void drop() throws SQLException {
101        final String key = statementKeyBase + "drop";
102        synchronized (db) {
103            PreparedStatement prepStatem = db.statementCache().get(key);
104            if (prepStatem == null) {
105                prepStatem = db.connection().prepareStatement(
106                        "DROP TABLE \"" + SCHEMA_NAME + "\".\"" + TABLE_NAME
107                                + "\"");
108                db.statementCache().put(key, prepStatem);
109            }
110            prepStatem.execute();
111        }
112    }
113
114    /**
115     * Returns true if this table exists in the database; false otherwise.
116     * 
117     * @return whether database already exists or not.
118     * @throws SQLException
119     */
120    public boolean exists() throws SQLException {
121        final String key = statementKeyBase + "exists";
122        synchronized (db) {
123            PreparedStatement prepStatem = db.statementCache().get(key);
124            if (prepStatem == null) {
125                prepStatem = db.connection().prepareStatement(
126                        "SELECT * FROM " + TABLE );
127                prepStatem.setMaxRows(1);
128                db.statementCache().put(key, prepStatem);
129            }
130            try {
131                prepStatem.execute();
132            } catch (SQLException x) {
133                final String sqlState = x.getSQLState();
134                if ("3F000".equals(sqlState)) {
135                    return false; // 3F000 = [missing schema]
136                }
137
138                if ("42P01".equals(sqlState)) {
139                    return false; // 42P01 = UNDEFINED TABLE
140                }
141
142                throw x;
143            }
144        }
145        return true;
146    }
147
148    /**
149     * Get newest marker of any type for a forum. If there is none, a dummy
150     * value with empty string and date from beginning at unix time is used.
151     * 
152     * @param archiveUrl
153     * @throws SQLException
154     */
155    public Marker getNewest(final String archiveUrl) throws SQLException {
156        synchronized (db) {
157            Marker temp = Marker.dummy();
158
159            final String getNewest1Key = statementKeyBase + "getNewest1";
160            PreparedStatement s = db.statementCache().get(getNewest1Key);
161            if (s == null) {
162                s = db.connection().prepareStatement(
163                        "SELECT marker, sent_ts FROM " + TABLE
164                                + " where archive_url ~ ?"
165                                + " ORDER BY sent_ts DESC LIMIT 1");
166                db.statementCache().put(getNewest1Key, s);
167            }
168
169            s.setString(1, archiveUrl);
170            ResultSet rs = s.executeQuery();
171            if (rs.next()) {
172                temp = Marker.create(rs.getString(1), new Date(rs.getTimestamp(2)
173                        .getTime()));
174            }
175            return temp;
176        }
177
178    }
179
180    /**
181     * Get either permanent or temporary marker, the newest known marker.
182     * 
183     * @param archiveUrl
184     * @param type
185     * @return newest marker of type
186     * @throws SQLException
187     */
188    public Marker getNewest(final String archiveUrl, final Type type)
189            throws SQLException {
190        synchronized (db) {
191            Marker temp = Marker.dummy();
192
193            final String getNewest2Key = statementKeyBase + "getNewest2";
194            PreparedStatement s = db.statementCache().get(getNewest2Key);
195
196            if (s == null) {
197                s = db.connection().prepareStatement(
198                        "SELECT marker, sent_ts FROM " + TABLE
199                                + " WHERE archive_url ~ ? AND type = ?"
200                                + "ORDER BY sent_ts DESC LIMIT 1");
201                db.statementCache().put(getNewest2Key, s);
202            }
203
204            s.setString(1, archiveUrl);
205            s.setString(2, type.name());
206            ResultSet rs = s.executeQuery();
207            if (rs.next()) {
208                temp = Marker.create( rs.getString(1), new Date(rs.getTimestamp(2)
209                        .getTime()));
210            }
211            return temp;
212        }
213
214    }
215
216    /**
217     * Put a temporary marker.
218     * 
219     * @throws SQLException
220     */
221    public boolean put(final String archiveUrl, final Marker marker)
222            throws SQLException {
223        synchronized (db) {
224
225            // insert if update was not successful
226            final String insertKey = statementKeyBase + "insertMarker";
227            PreparedStatement s = db.statementCache().get(insertKey);
228            if (s == null) {
229                s = db.connection().prepareStatement(
230                        "INSERT INTO "
231                                + TABLE
232                                + "(archive_url, type, marker"
233                                + ", sent_ts)"
234                                // silently drop stale TEMP markers
235                                // http://stackoverflow.com/questions/1109061/insert-on-duplicate-update-postgresql
236                                + " SELECT ?, ?, ?, ? WHERE NOT EXISTS"
237                                + " (SELECT 1 FROM " + TABLE
238                                // primary key constraint
239                                + " WHERE archive_url = ? AND type = ?"
240                                + " AND sent_ts = ?)");
241                db.statementCache().put(insertKey, s);
242            }
243
244            s.setString(1, archiveUrl);
245            s.setString(2, Type.TEMP.name());
246            s.setString(3, marker.path());
247            s.setTimestamp(4, new Timestamp(marker.date().getTime()));
248
249            // repeat for EXCEPT constraint avoidance
250            s.setString(5, archiveUrl);
251            s.setString(6, Type.TEMP.name());
252            s.setTimestamp(7, new Timestamp(marker.date().getTime()));
253
254            // success?
255            return (s.executeUpdate()==0);
256        }
257    }
258
259    /**
260     * Remove all temporary markers in finished range and mark start marker as
261     * finished. The end marker is not removed and its type is not changed. It
262     * can still be temporary.
263     * 
264     * @param archiveUrl
265     * @param start
266     * @param end
267     * @throws SQLException
268     */
269    public void finish(final String archiveUrl, final Marker start,
270            final Marker end) throws SQLException {
271        synchronized (db) {
272            // cleanup covered temp markers
273            final String cleanupKey = statementKeyBase + "cleanup";
274            PreparedStatement clStat = db.statementCache().get(cleanupKey);
275            if (clStat == null) {
276                clStat = db.connection().prepareStatement(
277                        "DELETE FROM " + TABLE
278                                + " WHERE archive_url ~ ? AND type = ?"
279                                + " AND ( (sent_ts < ? AND sent_ts > ?)"
280                                // stale fin markers, about to be reset next
281                                // otherwise UPDATE fails on a stale marker
282                                + " OR (sent_ts = ? AND type = ?) )");
283                db.statementCache().put(cleanupKey, clStat);
284            }
285
286            clStat.setString(1, archiveUrl);
287            clStat.setString(2, Type.TEMP.name());
288            clStat.setTimestamp(3, new Timestamp(start.date().getTime()));
289            clStat.setTimestamp(4, new Timestamp(end.date().getTime()));
290            clStat.setTimestamp(5, new Timestamp(start.date().getTime()));
291            clStat.setString(6, Type.FIN.name());
292            int count = clStat.executeUpdate();
293            LOGGER.fine("Removed " + count + " stale entries.");
294
295            // mark our marker as finished
296            final String finishedKey = statementKeyBase + "finished";
297            PreparedStatement finStat = db.statementCache().get(finishedKey);
298            if (finStat == null) {
299                finStat = db.connection().prepareStatement(
300                        "UPDATE " + TABLE + " SET type = ?"
301                                + " WHERE archive_url ~ ? AND type = ?"
302                                + " AND sent_ts = ? AND marker = ?");
303                db.statementCache().put(finishedKey, finStat);
304            }
305            finStat.setString(1, Type.FIN.name());
306            finStat.setString(2, archiveUrl);
307            finStat.setString(3, Type.TEMP.name());
308            finStat.setTimestamp(4, new Timestamp(start.date().getTime()));
309            finStat.setString(5, start.path());
310            finStat.executeUpdate();
311
312        }
313
314    }
315
316    /**
317     * Recalculate the PERM marker by setting the newest FIN marked Marker,
318     * which is not newer than any temporary marker to PERM and deleting all
319     * older markers.
320     * 
321     * @param archiveUrl
322     * @throws SQLException
323     */
324    public void update(final String archiveUrl) throws SQLException {
325        synchronized (db) {
326            final String updateKey = statementKeyBase + "updateMarker";
327            PreparedStatement upStat = db.statementCache().get(updateKey);
328            if (upStat == null) {
329                upStat = db.connection().prepareStatement(
330                        "UPDATE " + TABLE + " set type = ?"
331                                + " where type = ? and archive_url = ?"
332                                + " and (sent_ts < "
333                                + "(select min(sent_ts) from " + TABLE
334                                + " where type = ? and archive_url = ?)"
335                                + " or (select count(*) from " + TABLE
336                                + "where type = ? and archive_url = ?) = 0)");
337                db.statementCache().put(updateKey, upStat);
338            }
339            upStat.setString(1, Type.PERM.name()); // new
340            upStat.setString(2, Type.FIN.name()); // old
341            upStat.setString(3, archiveUrl);
342            upStat.setString(4, Type.TEMP.name()); // active
343            upStat.setString(5, archiveUrl);
344            upStat.setString(6, Type.TEMP.name()); // active
345            upStat.setString(7, archiveUrl);
346
347            if (upStat.executeUpdate() != 0) { // success
348                LOGGER.fine("Updated PERM marker for " + archiveUrl);
349            }
350
351            // remove stale markers
352            final String deleteStaleKey = statementKeyBase + "deleteStale";
353            PreparedStatement delStat = db.statementCache().get(deleteStaleKey);
354            if (delStat == null) {
355                delStat = db.connection().prepareStatement(
356                        "DELETE FROM " + TABLE + " WHERE archive_url = ?"
357                                + " AND (type = ? or type = ?)"
358                                + " AND sent_ts <"
359                                + " (SELECT max(sent_ts) FROM " + TABLE
360                                + "  WHERE archive_url = ? AND type = ?)");
361                db.statementCache().put(deleteStaleKey, delStat);
362            }
363
364            delStat.setString(1, archiveUrl);
365            // don't remove TEMP markers as they should not occur, if they
366            // do this is a bug
367            delStat.setString(2, Type.PERM.name());
368            delStat.setString(3, Type.FIN.name());
369            delStat.setString(4, archiveUrl);
370            delStat.setString(5, Type.PERM.name());
371            delStat.executeUpdate();
372        }
373    }
374
375    /**
376     * Remove all states of an archive.
377     * @param archiveUrl
378     * @return number of removed entries
379     * @throws SQLException
380     */
381    public int removeArchive(final String archiveUrl) throws SQLException {
382        // remove an archive
383        final String deleteArchiveKey = statementKeyBase + "deleteArchive";
384        synchronized (db) {
385            PreparedStatement delStat = db.statementCache().get(deleteArchiveKey);
386            if (delStat == null) {
387                delStat = db.connection().prepareStatement(
388                        "DELETE FROM " + TABLE + " where archive_url = ?");
389                db.statementCache().put(deleteArchiveKey, delStat);
390            }
391
392            delStat.setString(1, archiveUrl);
393
394            return delStat.executeUpdate();
395        }
396    }
397
398}