001package votorola.a.diff.harvest; // Copyright 2012. Christian Weilbach. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Votorola Software"), to deal in the Votorola Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicence, and/or sell copies of the Votorola Software, and to permit persons to whom the Votorola Software is furnished to do so, subject to the following conditions: The preceding copyright notice and this permission notice shall be included in all copies or substantial portions of the Votorola Software. THE VOTOROLA SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE VOTOROLA SOFTWARE OR THE USE OR OTHER DEALINGS IN THE VOTOROLA SOFTWARE. 002 003import java.util.Date; 004import java.sql.PreparedStatement; 005import java.sql.ResultSet; 006import java.sql.SQLException; 007import java.sql.Timestamp; 008import java.util.logging.Level; 009import java.util.logging.Logger; 010 011import votorola.a.diff.harvest.run.HarvestRunner; 012import votorola.g.lang.ThreadSafe; 013import votorola.g.lang.Warning; 014import votorola.g.logging.LoggerX; 015import votorola.g.sql.Database; 016 017/** 018 * Database table to store harvester state for PipermailHarvester only atm. This 019 * is not part of the public API (yet). 020 * 021 */ 022@Warning("non-Api") 023@ThreadSafe 024final public class StateTable { 025 026 static enum Type { 027 TEMP, FIN, PERM 028 } 029 030 private final transient Database db; 031 032 private final static String SCHEMA_NAME = "harvest"; 033 034 private final transient String statementKeyBase; 035 036 private final static String TABLE_NAME = "pipermail_state"; 037 038 // SQL table string 039 private final static String TABLE = "\"" + SCHEMA_NAME + "\".\"" + TABLE_NAME 040 + "\""; 041 042 private final static Logger LOGGER = LoggerX.i(StateTable.class); 043 044 // TODO do not synchronize around db, this will be a bottleneck 045 /** 046 * Table which saves harvested urls for {@linkplain HarvestRunner}. 047 * 048 * @param db 049 */ 050 @Warning("non-Api") 051 public StateTable(final Database db) { 052 this.db = db; 053 statementKeyBase = getClass().getName() + ":" + SCHEMA_NAME + "/" 054 + TABLE_NAME + "."; 055 } 056 057 /** 058 * Creates this table in the database. 059 * 060 * @throws SQLException 061 */ 062 void create() throws SQLException { 063 064 try { 065 synchronized (db) { 066 db.ensureSchema(SCHEMA_NAME); 067 } 068 069 final String key = statementKeyBase + "create"; 070 synchronized (db) { 071 PreparedStatement prepStatem = db.statementCache().get(key); 072 if (prepStatem == null) { 073 // this also creates a b-tree on the hash column 074 prepStatem = db 075 .connection() 076 .prepareStatement( 077 "CREATE TABLE IF NOT EXISTS " 078 + TABLE 079 + " (archive_url character varying NOT NULL" 080 + ", type character varying NOT NULL" 081 + ", marker character varying NOT NULL" 082 + ", sent_ts timestamp with time zone NOT NULL" 083 + ", PRIMARY KEY(archive_url, type, marker))"); 084 085 db.statementCache().put(key, prepStatem); 086 } 087 prepStatem.execute(); 088 } 089 } catch (SQLException e) { 090 LOGGER.log(Level.SEVERE, "Cannot create StateTable.", e); 091 System.exit(1); 092 } 093 } 094 095 /** 096 * Drops this table from the database. 097 * 098 * @throws SQLException 099 */ 100 void drop() throws SQLException { 101 final String key = statementKeyBase + "drop"; 102 synchronized (db) { 103 PreparedStatement prepStatem = db.statementCache().get(key); 104 if (prepStatem == null) { 105 prepStatem = db.connection().prepareStatement( 106 "DROP TABLE \"" + SCHEMA_NAME + "\".\"" + TABLE_NAME 107 + "\""); 108 db.statementCache().put(key, prepStatem); 109 } 110 prepStatem.execute(); 111 } 112 } 113 114 /** 115 * Returns true if this table exists in the database; false otherwise. 116 * 117 * @return whether database already exists or not. 118 * @throws SQLException 119 */ 120 public boolean exists() throws SQLException { 121 final String key = statementKeyBase + "exists"; 122 synchronized (db) { 123 PreparedStatement prepStatem = db.statementCache().get(key); 124 if (prepStatem == null) { 125 prepStatem = db.connection().prepareStatement( 126 "SELECT * FROM " + TABLE ); 127 prepStatem.setMaxRows(1); 128 db.statementCache().put(key, prepStatem); 129 } 130 try { 131 prepStatem.execute(); 132 } catch (SQLException x) { 133 final String sqlState = x.getSQLState(); 134 if ("3F000".equals(sqlState)) { 135 return false; // 3F000 = [missing schema] 136 } 137 138 if ("42P01".equals(sqlState)) { 139 return false; // 42P01 = UNDEFINED TABLE 140 } 141 142 throw x; 143 } 144 } 145 return true; 146 } 147 148 /** 149 * Get newest marker of any type for a forum. If there is none, a dummy 150 * value with empty string and date from beginning at unix time is used. 151 * 152 * @param archiveUrl 153 * @throws SQLException 154 */ 155 public Marker getNewest(final String archiveUrl) throws SQLException { 156 synchronized (db) { 157 Marker temp = Marker.dummy(); 158 159 final String getNewest1Key = statementKeyBase + "getNewest1"; 160 PreparedStatement s = db.statementCache().get(getNewest1Key); 161 if (s == null) { 162 s = db.connection().prepareStatement( 163 "SELECT marker, sent_ts FROM " + TABLE 164 + " where archive_url ~ ?" 165 + " ORDER BY sent_ts DESC LIMIT 1"); 166 db.statementCache().put(getNewest1Key, s); 167 } 168 169 s.setString(1, archiveUrl); 170 ResultSet rs = s.executeQuery(); 171 if (rs.next()) { 172 temp = Marker.create(rs.getString(1), new Date(rs.getTimestamp(2) 173 .getTime())); 174 } 175 return temp; 176 } 177 178 } 179 180 /** 181 * Get either permanent or temporary marker, the newest known marker. 182 * 183 * @param archiveUrl 184 * @param type 185 * @return newest marker of type 186 * @throws SQLException 187 */ 188 public Marker getNewest(final String archiveUrl, final Type type) 189 throws SQLException { 190 synchronized (db) { 191 Marker temp = Marker.dummy(); 192 193 final String getNewest2Key = statementKeyBase + "getNewest2"; 194 PreparedStatement s = db.statementCache().get(getNewest2Key); 195 196 if (s == null) { 197 s = db.connection().prepareStatement( 198 "SELECT marker, sent_ts FROM " + TABLE 199 + " WHERE archive_url ~ ? AND type = ?" 200 + "ORDER BY sent_ts DESC LIMIT 1"); 201 db.statementCache().put(getNewest2Key, s); 202 } 203 204 s.setString(1, archiveUrl); 205 s.setString(2, type.name()); 206 ResultSet rs = s.executeQuery(); 207 if (rs.next()) { 208 temp = Marker.create( rs.getString(1), new Date(rs.getTimestamp(2) 209 .getTime())); 210 } 211 return temp; 212 } 213 214 } 215 216 /** 217 * Put a temporary marker. 218 * 219 * @throws SQLException 220 */ 221 public boolean put(final String archiveUrl, final Marker marker) 222 throws SQLException { 223 synchronized (db) { 224 225 // insert if update was not successful 226 final String insertKey = statementKeyBase + "insertMarker"; 227 PreparedStatement s = db.statementCache().get(insertKey); 228 if (s == null) { 229 s = db.connection().prepareStatement( 230 "INSERT INTO " 231 + TABLE 232 + "(archive_url, type, marker" 233 + ", sent_ts)" 234 // silently drop stale TEMP markers 235 // http://stackoverflow.com/questions/1109061/insert-on-duplicate-update-postgresql 236 + " SELECT ?, ?, ?, ? WHERE NOT EXISTS" 237 + " (SELECT 1 FROM " + TABLE 238 // primary key constraint 239 + " WHERE archive_url = ? AND type = ?" 240 + " AND sent_ts = ?)"); 241 db.statementCache().put(insertKey, s); 242 } 243 244 s.setString(1, archiveUrl); 245 s.setString(2, Type.TEMP.name()); 246 s.setString(3, marker.path()); 247 s.setTimestamp(4, new Timestamp(marker.date().getTime())); 248 249 // repeat for EXCEPT constraint avoidance 250 s.setString(5, archiveUrl); 251 s.setString(6, Type.TEMP.name()); 252 s.setTimestamp(7, new Timestamp(marker.date().getTime())); 253 254 // success? 255 return (s.executeUpdate()==0); 256 } 257 } 258 259 /** 260 * Remove all temporary markers in finished range and mark start marker as 261 * finished. The end marker is not removed and its type is not changed. It 262 * can still be temporary. 263 * 264 * @param archiveUrl 265 * @param start 266 * @param end 267 * @throws SQLException 268 */ 269 public void finish(final String archiveUrl, final Marker start, 270 final Marker end) throws SQLException { 271 synchronized (db) { 272 // cleanup covered temp markers 273 final String cleanupKey = statementKeyBase + "cleanup"; 274 PreparedStatement clStat = db.statementCache().get(cleanupKey); 275 if (clStat == null) { 276 clStat = db.connection().prepareStatement( 277 "DELETE FROM " + TABLE 278 + " WHERE archive_url ~ ? AND type = ?" 279 + " AND ( (sent_ts < ? AND sent_ts > ?)" 280 // stale fin markers, about to be reset next 281 // otherwise UPDATE fails on a stale marker 282 + " OR (sent_ts = ? AND type = ?) )"); 283 db.statementCache().put(cleanupKey, clStat); 284 } 285 286 clStat.setString(1, archiveUrl); 287 clStat.setString(2, Type.TEMP.name()); 288 clStat.setTimestamp(3, new Timestamp(start.date().getTime())); 289 clStat.setTimestamp(4, new Timestamp(end.date().getTime())); 290 clStat.setTimestamp(5, new Timestamp(start.date().getTime())); 291 clStat.setString(6, Type.FIN.name()); 292 int count = clStat.executeUpdate(); 293 LOGGER.fine("Removed " + count + " stale entries."); 294 295 // mark our marker as finished 296 final String finishedKey = statementKeyBase + "finished"; 297 PreparedStatement finStat = db.statementCache().get(finishedKey); 298 if (finStat == null) { 299 finStat = db.connection().prepareStatement( 300 "UPDATE " + TABLE + " SET type = ?" 301 + " WHERE archive_url ~ ? AND type = ?" 302 + " AND sent_ts = ? AND marker = ?"); 303 db.statementCache().put(finishedKey, finStat); 304 } 305 finStat.setString(1, Type.FIN.name()); 306 finStat.setString(2, archiveUrl); 307 finStat.setString(3, Type.TEMP.name()); 308 finStat.setTimestamp(4, new Timestamp(start.date().getTime())); 309 finStat.setString(5, start.path()); 310 finStat.executeUpdate(); 311 312 } 313 314 } 315 316 /** 317 * Recalculate the PERM marker by setting the newest FIN marked Marker, 318 * which is not newer than any temporary marker to PERM and deleting all 319 * older markers. 320 * 321 * @param archiveUrl 322 * @throws SQLException 323 */ 324 public void update(final String archiveUrl) throws SQLException { 325 synchronized (db) { 326 final String updateKey = statementKeyBase + "updateMarker"; 327 PreparedStatement upStat = db.statementCache().get(updateKey); 328 if (upStat == null) { 329 upStat = db.connection().prepareStatement( 330 "UPDATE " + TABLE + " set type = ?" 331 + " where type = ? and archive_url = ?" 332 + " and (sent_ts < " 333 + "(select min(sent_ts) from " + TABLE 334 + " where type = ? and archive_url = ?)" 335 + " or (select count(*) from " + TABLE 336 + "where type = ? and archive_url = ?) = 0)"); 337 db.statementCache().put(updateKey, upStat); 338 } 339 upStat.setString(1, Type.PERM.name()); // new 340 upStat.setString(2, Type.FIN.name()); // old 341 upStat.setString(3, archiveUrl); 342 upStat.setString(4, Type.TEMP.name()); // active 343 upStat.setString(5, archiveUrl); 344 upStat.setString(6, Type.TEMP.name()); // active 345 upStat.setString(7, archiveUrl); 346 347 if (upStat.executeUpdate() != 0) { // success 348 LOGGER.fine("Updated PERM marker for " + archiveUrl); 349 } 350 351 // remove stale markers 352 final String deleteStaleKey = statementKeyBase + "deleteStale"; 353 PreparedStatement delStat = db.statementCache().get(deleteStaleKey); 354 if (delStat == null) { 355 delStat = db.connection().prepareStatement( 356 "DELETE FROM " + TABLE + " WHERE archive_url = ?" 357 + " AND (type = ? or type = ?)" 358 + " AND sent_ts <" 359 + " (SELECT max(sent_ts) FROM " + TABLE 360 + " WHERE archive_url = ? AND type = ?)"); 361 db.statementCache().put(deleteStaleKey, delStat); 362 } 363 364 delStat.setString(1, archiveUrl); 365 // don't remove TEMP markers as they should not occur, if they 366 // do this is a bug 367 delStat.setString(2, Type.PERM.name()); 368 delStat.setString(3, Type.FIN.name()); 369 delStat.setString(4, archiveUrl); 370 delStat.setString(5, Type.PERM.name()); 371 delStat.executeUpdate(); 372 } 373 } 374 375 /** 376 * Remove all states of an archive. 377 * @param archiveUrl 378 * @return number of removed entries 379 * @throws SQLException 380 */ 381 public int removeArchive(final String archiveUrl) throws SQLException { 382 // remove an archive 383 final String deleteArchiveKey = statementKeyBase + "deleteArchive"; 384 synchronized (db) { 385 PreparedStatement delStat = db.statementCache().get(deleteArchiveKey); 386 if (delStat == null) { 387 delStat = db.connection().prepareStatement( 388 "DELETE FROM " + TABLE + " where archive_url = ?"); 389 db.statementCache().put(deleteArchiveKey, delStat); 390 } 391 392 delStat.setString(1, archiveUrl); 393 394 return delStat.executeUpdate(); 395 } 396 } 397 398}