001package votorola.a.count; // Copyright 2007-2013, Michael Allan. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Votorola Software"), to deal in the Votorola Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicence, and/or sell copies of the Votorola Software, and to permit persons to whom the Votorola Software is furnished to do so, subject to the following conditions: The preceding copyright notice and this permission notice shall be included in all copies or substantial portions of the Votorola Software. THE VOTOROLA SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE VOTOROLA SOFTWARE OR THE USE OR OTHER DEALINGS IN THE VOTOROLA SOFTWARE. 002 003import com.google.gson.stream.*; 004import com.sun.jersey.api.uri.*; 005import java.io.*; 006import java.net.*; 007import java.nio.charset.*; 008import java.sql.*; 009import java.text.*; 010import java.util.*; 011import java.util.logging.*; import votorola.g.logging.*; 012import java.util.regex.*; 013import javax.mail.internet.AddressException; 014import javax.script.*; 015import javax.xml.stream.*; 016import javax.xml.ws.Holder; 017import votorola.a.*; 018import votorola.a.position.*; 019import votorola.a.trust.*; 020import votorola.a.voter.*; 021import votorola.g.*; 022import votorola.g.hold.*; 023import votorola.g.io.*; 024import votorola.g.lang.*; 025import votorola.g.mail.*; 026import votorola.g.sql.*; 027import votorola.g.text.*; 028import votorola.g.xml.stream.*; 029import votorola.s.wap.PollspaceWAP; 030 031import static votorola.a.count.CountNode.DART_SECTOR_MAX; 032 033 034/** The path to a snap/readyCount record. It is the file part of the backing for a poll 035 * count. It is guaranteed to be in canonical form at the time of construction. 036 * 037 * @see <a href='../../../../../s/manual.xht#line-vocount'>snap/readyCount record</a> 038 * @see Count 039 */ 040public @ThreadSafe final class ReadyDirectory extends OutputStore.ReadyDirectory 041{ 042 043 // cf. a/trust/ReadyDirectory 044 045 private static final long serialVersionUID = 3L; 046 047 048 049 /** Creates a new ReadyDirectory. 050 * 051 * @param d the abstract pathname. It will be converted to canonical form if 052 * necessary. 053 * @param readyTracePathname the pathname of the ready directory of the trust 054 * trace to use. 055 * @see #pipeRecognizer() 056 */ 057 public static ReadyDirectory createReadyDirectory( final File d, 058 final String readyTracePathname, final PipeRecognizer pipeRecognizer ) throws IOException 059 { 060 FileX.writeObject( pipeRecognizer, new File(d,pipeRecognizerSerialName) ); 061 final ReadyDirectory r = new ReadyDirectory( d ); 062 FileX.symlink( readyTracePathname, r.readyTraceLink().getPath() ); 063 return r; 064 } 065 066 067 068 /** Constructs a ReadyDirectory from an abstract (File) pathname. 069 * 070 * @param _d the abstract pathname. It will be converted to canonical form if 071 * necessary. 072 * 073 * @throws FileNotFoundException if no directory exists at the specified pathname. 074 * @see #createReadyDirectory(File,String,PipeRecognizer) 075 */ 076 public ReadyDirectory( File _d ) throws IOException 077 { 078 super( _d ); 079 final File f = new File( ReadyDirectory.this, pipeRecognizerSerialName ); 080 if( f.exists() ) 081 { 082 PipeRecognizer p = null; 083 try{ p = (PipeRecognizer)FileX.readObject( f ); } 084 catch( final ClassNotFoundException x ) // class names have changed 085 { 086 logger.log( LoggerX.WARNING, /*message*/"defaulting to a null pipe recognizer, unable to deserialize from file: " + f, x ); 087 isInitWarned = true; 088 p = new PipeRecognizer0(); 089 } 090 pipeRecognizer = p; 091 } 092 else pipeRecognizer = new PipeRecognizer0(); 093 // directory apparently readied prior to addition of pipe support in May 2013 094 } 095 096 097 098 /** Constructs a ReadyDirectory from a string pathname. 099 * 100 * @param pathname the string pathname, per {@linkplain File#File(String) 101 * File}(pathname). It will be converted to canonical form if necessary. 102 * 103 * @throws FileNotFoundException if no directory exists at the specified pathname. 104 * @see #createReadyDirectory(File,String,PipeRecognizer) 105 */ 106 public ReadyDirectory( String pathname ) throws IOException { this( new File( pathname )); } 107 108 109 110 // ------------------------------------------------------------------------------------ 111 112 113 /** Answers whether any warnings were logged during the construction of this ready 114 * directory. 115 */ 116 public boolean isInitWarned() { return isInitWarned; } 117 118 119 private boolean isInitWarned; 120 121 122 123 /** Tallies the results and posts them to the database and filebase. 124 * 125 * @param networkTrace the previously mounted trace of the trust network. 126 * @param isVerbose if true, additional details are printed to standard output 127 * as the mount progresses. 128 * 129 * @return count of polls tallied. 130 * 131 * @see #isMounted() 132 * @see #unmount(VoteServer.Run) 133 */ 134 public int mount( final VoteServer.Run run, final NetworkTrace networkTrace, 135 final boolean isVerbose ) throws VotorolaException 136 { 137 // Currently the count depends directly on the trustserver's mounted trace, which 138 // it uses as a voter list. This works because the mounted trace is already an 139 // aggregate of the registration lists for all areas of the reference streetwiki. 140 // In future, we will interpose a 'volist' command that allows for aggregation 141 // from multiple remote trustservers (list lengthening), as well as merging from 142 // non-residential registration lists (widening). See 143 // http://reluk.ca/w/User:Mike-ZeleaCom/G/p/edor 144 145 final int nPruneTrigger = PollspaceWAP.MAX_RESPONSE_SIZE * 2; 146 final ArrayList<PollService> pollspace0List = new ArrayList<>( nPruneTrigger + /*spare*/1 ); 147 final Holder<Integer> pollCountH = new Holder<Integer>( 0 ); // for write access from inner class 148 final Database database = run.database(); 149 try 150 { 151 final Membership.Table membershipTable = new Membership.Table( 152 networkTrace.readyDirectory(), database ); 153 final TraceNodeW.Table traceNodeTable = new TraceNodeW.Table( 154 networkTrace.readyDirectory(), database ); 155 final CountTable countTable = new CountTable( ReadyDirectory.this, database ); 156 countTable.drop(); 157 countTable.create(); 158 mountedDirectory.mkdir(); 159 final URI inVoteDirectoryURI = inVoteDirectory.toURI(); 160 final Holder<VotorolaException> xH = new Holder<VotorolaException>(); // " 161 synchronized( database ){ database.connection().setAutoCommit( false ); } // bulk write 162 // otherwise too slow on Linux 3 kernels, which use stricter ext3 file synchronization 163 try 164 { 165 // Tally all polls. 166 // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 167 FileX.traverseDown( inVoteDirectory, new FileFilter() 168 { 169 public boolean accept( final File inVoteFile ) 170 { 171 if( !inVoteFile.isFile() ) return true; 172 173 final String suffix = ".xml"; 174 if( !inVoteFile.getName().endsWith( suffix )) return true; 175 176 final String pollName; 177 { 178 final URI inVoteFileURI = inVoteFile.toURI(); 179 final String relativePath = inVoteDirectoryURI.relativize( 180 inVoteFileURI ).getPath(); 181 pollName = relativePath.substring( 0, 182 relativePath.length() - suffix.length() ); 183 } 184 try 185 { 186 final PollService poll = run.scopePoll().ensurePoll( pollName ); 187 logger.fine( "counting votes for poll: " + pollName ); // after constructing, because it logs its own message 188 if( isVerbose ) 189 { 190 if( pollCountH.value > 0 ) System.out.print( ", " ); 191 System.out.print( pollName ); 192 } 193 final Count count = tally( poll, networkTrace, inVoteFile, countTable, 194 membershipTable, traceNodeTable, isVerbose ); 195 count.writeObjectToSerialFile(); 196 ++pollCountH.value; 197 pollspace0List.add( poll ); 198 if( pollspace0List.size() >= nPruneTrigger ) sortPrune( pollspace0List ); 199 return true; 200 } 201 catch( IOException|ParseException|ScriptException|SQLException|XMLStreamException x ) 202 { 203 xH.value = new VotorolaException( "Unable to count votes for poll: " 204 + pollName, x ); 205 return false; // escape from here and throw it 206 } 207 } 208 }); 209 if( xH.value != null ) throw xH.value; 210 211 // Create indices in count table (last, as it slows down table updates). 212 // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 213 countTable.createIndices(); 214 } 215 finally{ synchronized( database ){ database.connection().setAutoCommit( true ); }} 216 // does a commit, too 217 218 // Write the cache file for PollspaceWAP. (temporary hack, per pollspace0Cache) 219 // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 220 sortPrune( pollspace0List ); // sort at least is almost always needed 221 try 222 ( 223 final JsonWriter out = new JsonWriter( new BufferedWriter( new OutputStreamWriter( 224 new FileOutputStream(pollspace0Cache), StandardCharsets.UTF_8 ))); // see g.web GSONCLOSE 225 final JsonWriter outP = new JsonWriter( new BufferedWriter( new OutputStreamWriter( 226 new FileOutputStream(pollspace0PCache), StandardCharsets.UTF_8 )) ); // " 227 ){ 228 // outPJ.setIndent( votorola.a.web.wap.Responding.PRETTY_INDENT ); 229 /// gives NoClassDefFoundError ServletRequest, so just do this for now: 230 outP.setIndent( " " ); // otherwise it would be packed 231 out.beginArray(); outP.beginArray(); 232 for( int p = 0, pN = pollspace0List.size(); p < pN; ++p ) 233 { 234 final PollService poll = pollspace0List.get( p ); 235 out.beginObject(); outP.beginObject(); 236 String s; 237 s = poll.name(); 238 out.name( "name" ).value( s ); outP.name( "name" ).value( s ); 239 s = poll.displayTitle(); 240 if( s != null ) 241 { 242 out.name( "displayTitle" ).value( s ); 243 outP.name( "displayTitle" ).value( s ); 244 } 245 s = poll.issueType(); 246 out.name( "issueType" ).value( s ); outP.name( "issueType" ).value( s ); 247 out.endObject(); outP.endObject(); 248 } 249 out.endArray(); outP.endArray(); 250 } 251 } 252 catch( VotorolaException x ) { throw x; } 253 catch( IOException|SQLException x ) 254 { 255 throw new VotorolaException( "Unable to mount:" + ReadyDirectory.this, x ); 256 } 257 return pollCountH.value; 258 } 259 260 261 262 /** The directory to which the count is serialized when mounted. 263 */ 264 public File mountedDirectory() { return mountedDirectory; } 265 266 267 private final File mountedDirectory = new File( ReadyDirectory.this, "_mountedCount" ); 268 269 270 271 /** Constructs the formal file to which the count for the specified poll service is 272 * serialized when mounted. 273 */ 274 public File newSerialFile( final String serviceName ) 275 { 276 return new File( mountedDirectory(), serviceName + ".serial" ); 277 } 278 279 280 281 /** The pipe recognizer for this ready directory. 282 */ 283 public PipeRecognizer pipeRecognizer() { return pipeRecognizer; } 284 285 286 private final PipeRecognizer pipeRecognizer; 287 288 289 290 /** File to which JSON content for the first block of a {@linkplain PollspaceWAP 291 * PollspaceWAP} response is written, when the count is mounted. 292 */ 293 public File pollspace0Cache() { return pollspace0Cache; } 294 295 // These files are a temporary hack. PollspaceWAP will probably need a poll table 296 // later, and that table will be what the mount creates. Each row will be a 297 // compiled poll. It will be kept separate from the count summary (currently 298 // serialized as a file) because the two are conceptually distinct, and because 299 // polls might be compiled separately from counts in future, as trust traces are. 300 301 private final File pollspace0Cache = new File( mountedDirectory, "pollspace0.json" ); 302 303 304 305 /** File to which JSON content for the first block of a {@linkplain PollspaceWAP 306 * PollspaceWAP} response is written in prettified form, when the count is mounted. 307 */ 308 public File pollspace0PCache() { return pollspace0PCache; } 309 310 311 private final File pollspace0PCache = new File( mountedDirectory, "pollspace0P.json" ); 312 313 314 315 /** A file filter that accepts only apparent ready directories. 316 */ 317 public static final FileFilter READY_DIRECTORY_FILTER = new FileFilter() 318 { 319 public boolean accept( final File file ) 320 { 321 final String name = file.getName(); 322 return file.isDirectory() && name.startsWith( "readyCount-" ) 323 && OutputStore.isS( OutputStore.suffix( name )); 324 } 325 }; 326 327 328 329 /** The symbolic link to the ready directory of the trust network trace. 330 */ 331 public File readyTraceLink() { return readyTraceLink; } 332 333 334 private final File readyTraceLink = new File( ReadyDirectory.this, "readyTrace" ); 335 336 337 338 /** Reverses a previous mount, erasing the results from the database and filebase. 339 * 340 * @return true if the count was unmounted, false if it was not mounted to begin 341 * with, either in whole or part. 342 * 343 * @see #isMounted() 344 * @see #mount(VoteServer.Run,NetworkTrace,boolean) 345 */ 346 public boolean unmount( final VoteServer.Run run ) throws IOException, SQLException 347 { 348 boolean unmounted = false; // thus far 349 if( mountedDirectory.isDirectory() ) 350 { 351 FileX.deleteRecursiveSure( mountedDirectory ); 352 unmounted = true; 353 } 354 if( new CountTable(ReadyDirectory.this,run.database()).drop() ) unmounted = true; 355 return unmounted; 356 } 357 358 359 360 // - O u t p u t - S t o r e . R e a d y - D i r e c t o r y -------------------------- 361 362 363 /** Answers whether the count is nominally mounted. 364 */ 365 public boolean isMounted() { return mountedDirectory.isDirectory(); } 366 367 368 369//// P r i v a t e /////////////////////////////////////////////////////////////////////// 370 371 372 /** @param r the reader having just read the 'pos' start element. 373 */ 374 private void imagePos( final File mirrorSourceDirectory, final PollService poll, 375 final CountTable.PollView countTablePV, final XMLStreamReader r, 376 final SimpleDateFormat iso8601Formatter ) 377 throws ParseException, ScriptException, SQLException, XMLStreamException 378 { 379 final LocationI rLocPos = new LocationI( r.getLocation() ); // of position element in stream 380 final CountNodeW node = new CountNodeW( countTablePV ); 381 while( r.hasNext() ) 382 { 383 r.next(); 384 if( r.isEndElement() && "pos".equals( r.getLocalName() )) break; 385 386 if( r.isStartElement() ) 387 { 388 if( "doc".equals( r.getLocalName() )) 389 { 390 final String url = r.getAttributeValue( /*namespaceURI*/null, "url" ); 391 if( url != null ) node.setLocation( url ); 392 else logger.warning( "unlocated position doc, missing 'url' attribute: " + LocationX.toString(r.getLocation()) ); 393 } 394 else if( "subj".equals( r.getLocalName() )) 395 { 396 try 397 { 398 node.init( IDPair.fromEmail( InternetAddressX.newValidAddress( 399 r.getAttributeValue( /*namespaceURI*/null, "loc" ), 400 r.getAttributeValue( /*namespaceURI*/null, "dom" )))); 401 } 402 catch( AddressException x ) { logger.log( LoggerX.WARNING, /*message*/LocationX.toString(r.getLocation()), x ); } 403 } 404 else if( "vote".equals( r.getLocalName() )) inputVote( node, r, iso8601Formatter ); 405 } 406 } 407 final String email = node.email(); 408 if( email == null ) 409 { 410 logger.warning( "ignoring vote, unidentified subject: " + rLocPos ); 411 return; 412 } 413 414 final CountNodeW existingNode = countTablePV.get( email ); 415 if( existingNode != null ) 416 { 417 if( node.getTime() < existingNode.getTime() ) return; // image is stale 418 419 node.setDartSector( existingNode.dartSector() ); // retain voter's dart sector 420 } 421 node.setSource( mirrorSourceDirectory.getName() ); 422 if( node.isBarrable() ) 423 { 424 node.setBar( (String)poll.configurationScript().invokeKnownFunction( 425 "voterBarUnregistered", node.person() )); // till proven otherwise, in next pass 426 } 427 node.write(); 428 } 429 430 431 432 /** @param r the reader having just read the 'vote' start element. 433 */ 434 private void inputVote( final CountNodeW node, final XMLStreamReader r, 435 final SimpleDateFormat iso8601Formatter ) throws ParseException, XMLStreamException 436 { 437 final String tISO = r.getAttributeValue( /*namespaceURI*/null, "t" ); 438 if( tISO != null ) 439 { 440 final long time = iso8601Formatter.parse( 441 SimpleDateFormatX.simplifiedISO8601( tISO )).getTime(); 442 if( time <= System.currentTimeMillis() ) node.setTime( time ); 443 else logger.warning( "ignoring 't' attribute in future: " + LocationX.toString(r.getLocation()) ); 444 } 445 else logger.warning( "vote likely to be ignored, missing 't' attribute: " + LocationX.toString(r.getLocation()) ); 446 final String dS = r.getAttributeValue( /*namespaceURI*/null, "dS" ); 447 if( dS != null ) node.setDartSector( Integer.parseInt( dS )); 448 while( r.hasNext() ) 449 { 450 r.next(); 451 if( r.isEndElement() && "vote".equals( r.getLocalName() )) break; 452 453 if( r.isStartElement() && "obj".equals( r.getLocalName() )) 454 { 455 try 456 { 457 node.setCandidateEmail( InternetAddressX.newValidAddress( 458 r.getAttributeValue( /*namespaceURI*/null, "loc" ), 459 r.getAttributeValue( /*namespaceURI*/null, "dom" ))); 460 } 461 catch( AddressException x ) { logger.log( LoggerX.WARNING, /*message*/LocationX.toString(r.getLocation()), x ); } 462 } 463 } 464 return; 465 } 466 467 468 469 private final File inVoteDirectory = new File( snapDirectory(), "_in_vote" ); 470 471 472 473 private static final Logger logger = LoggerX.i( ReadyDirectory.class ); 474 475 476 477 /** @param r the reader having just read the 'pos' start element. 478 */ 479 private void originatePos( final PollService poll, final CountTable.PollView countTablePV, 480 final XMLStreamReader r, final SimpleDateFormat iso8601Formatter ) 481 throws ParseException, ScriptException, SQLException, XMLStreamException 482 { 483 final LocationI rLocPos = new LocationI( r.getLocation() ); // of position element in stream 484 final CountNodeW node = new CountNodeW( countTablePV ); 485 while( r.hasNext() ) 486 { 487 r.next(); 488 if( r.isEndElement() && "pos".equals( r.getLocalName() )) break; 489 490 if( r.isStartElement() ) 491 { 492 if( "subj".equals( r.getLocalName() )) 493 { 494 try 495 { 496 node.init( IDPair.fromEmail( InternetAddressX.newValidAddress( 497 r.getAttributeValue( /*namespaceURI*/null, "loc" ), 498 r.getAttributeValue( /*namespaceURI*/null, "dom" )))); 499 } 500 catch( final AddressException x ) 501 { 502 logger.log( LoggerX.WARNING, /*message*/LocationX.toString(r.getLocation()), x ); 503 } 504 } 505 else if( "vote".equals( r.getLocalName() )) inputVote( node, r, iso8601Formatter ); 506 } 507 } 508 final IDPair person = node.person(); 509 if( person == null ) 510 { 511 logger.warning( "ignoring vote, unidentified subject: " + rLocPos ); 512 return; 513 } 514 515 if( node.isBarrable() ) 516 { 517 node.setBar( (String)poll.configurationScript().invokeKnownFunction( 518 "voterBarUnregistered", person )); // till proven otherwise in next pass 519 } 520 node.write(); 521 } 522 523 524 525 private static final String pipeRecognizerSerialName = "pipeRecognizer.serial"; 526 527 528 529 /** Sorts the list and removes any trailing elements over MAX_RESPONSE_SIZE. 530 */ 531 private static void sortPrune( final ArrayList<PollService> pollspace0List ) 532 { 533 Collections.sort( pollspace0List ); 534 int p = pollspace0List.size(); 535 if( p <= PollspaceWAP.MAX_RESPONSE_SIZE ) return; 536 537 // pollspace0List.removeRange( PollspaceWAP.MAX_RESPONSE_SIZE, p ); 538 /// has protected access for some reason, so do it the hard way: 539 --p; 540 for( ;; ) 541 { 542 pollspace0List.remove( p ); 543 --p; 544 if( p < PollspaceWAP.MAX_RESPONSE_SIZE ) break; 545 } 546 assert pollspace0List.size() == PollspaceWAP.MAX_RESPONSE_SIZE; 547 } 548 549 550 551 private Count tally( final PollService poll, final NetworkTrace networkTrace, 552 final File inVoteFile, final CountTable countTable, final Membership.Table membershipTable, 553 final TraceNodeW.Table traceNodeTable, final boolean isVerbose ) 554 throws IOException, ParseException, ScriptException, SQLException, XMLStreamException 555 { 556 final String pollName = poll.name(); 557 final CountTable.PollView countTablePV = countTable.new PollView( pollName ); 558 final SimpleDateFormat iso8601Formatter = new SimpleDateFormat( 559 SimpleDateFormatX.ISO_8601_PATTERN ); 560 561 // Pass 1. Read snapshot of voter input and create one node per voter. 562 // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 563 if( isVerbose ) System.out.print( " 1" ); 564 // This pass pre-creates subject nodes (voters), thus allowing duplicate image 565 // votes to be reconciled in pass 2. 566 567 // It also pre-creates intermediate object nodes (delegates), thus allowing 568 // their bars to be recorded in pass 3 prior to casting in pass 4. The bars are 569 // here initialized to (configurationScript) barUnregistered. 570 571 // Finally, pre-creation enables end-to-end cascading of single votes in pass 4. 572 // End-to-end cascading is simpler (and more robust) than multiple partial 573 // cascades. The latter would involve carries that are complicated by cycles. 574 // See CountNodeW.castCyclic(trace). 575 { 576 final InputStream in = new BufferedInputStream( new FileInputStream( inVoteFile )); 577 // one stream for each read through, as using mark/reset on a single one 578 // would defeat buffering 579 try 580 { 581 final XMLStreamReader r = XMLColumnAppender.newStreamReader( 582 /*systemId*/inVoteFile.toString(), in ); 583 try 584 { 585 while( r.hasNext() ) 586 { 587 r.next(); 588 if( r.isStartElement() ) 589 { 590 if( "in".equals( r.getLocalName() )) verifyServiceName( poll, r ); 591 else if( "pos".equals( r.getLocalName() )) 592 { 593 originatePos( poll, countTablePV, r, iso8601Formatter ); 594 } 595 } 596 } 597 } 598 finally{ r.close(); } 599 } 600 catch( final XMLStreamException x ) { throw new RuntimeException( x ); } 601 finally{ in.close(); } 602 } 603 604 // Pass 2. Image votes from external engines. 605 // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 606 if( isVerbose ) System.out.print( "2" ); 607 final VoteServer vS = poll.vsRun().voteServer(); 608 { 609 for( final File mirrorSourceDirectory: 610 InputStore.U.mirrorSourceDirectories( vS )) 611 { 612 final File file = new File( mirrorSourceDirectory, // ought to be stored in output snap directory, for later verification (not yet implemented) 613 "_snap_current" + File.separator + pollName + ".xml" ).getCanonicalFile(); 614 if( !file.isFile() ) continue; 615 616 final InputStream in = new BufferedInputStream( new FileInputStream( file )); 617 try 618 { 619 final XMLStreamReader r = XMLColumnAppender.newStreamReader( 620 /*systemId*/file.toString(), in ); 621 try 622 { 623 while( r.hasNext() ) 624 { 625 r.next(); 626 if( r.isStartElement() ) 627 { 628 if( "in".equals( r.getLocalName() )) verifyServiceName( poll, r ); 629 else if( "pos".equals( r.getLocalName() )) 630 { 631 imagePos( mirrorSourceDirectory, poll, countTablePV, r, 632 iso8601Formatter ); 633 } 634 } 635 } 636 } 637 finally{ r.close(); } 638 } 639 catch( final XMLStreamException x ) { throw new RuntimeException( x ); } 640 finally{ in.close(); } 641 } 642 } 643 644 // Pass 3. Read reported trace of network and mark eligible casters. 645 // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 646 if( isVerbose ) System.out.print( "3" ); 647 // This pass will be slow for large trust networks, especially where poll 648 // turnout is low. OPT Speed it by traversing the poll instead of the network. 649 traceNodeTable.run( new TraceNodeW.Runner() 650 { 651 public void run( final TraceNodeW node ) 652 { 653 try 654 { 655 final IDPair registrant = node.registrant(); 656 if( !CountNodeW.isBarrable( registrant.username(), ReadyDirectory.this )) return; 657 // though it's unlikely a non-barrable would register 658 659 final CountNodeW countNode = countTablePV.get( registrant.email() ); 660 if( countNode == null ) return; // registrant has no input to this poll 661 662 if( !countNode.isVoter() ) 663 { 664 countNode.setBar( (String)poll.configurationScript().invokeKnownFunction( 665 "voterBarUnknown", registrant )); /* not voting, so save time by 666 bypassing the bar test */ 667 countNode.write(); 668 return; 669 } 670 671 // Poll eligibility bar? 672 // ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` 673 final Set<String> divisions = Collections.unmodifiableSet( 674 membershipTable.divisionSet( node.registrant().email() )); 675 final VoteCastingContext vCC = new VoteCastingContext( /*isRealCount*/true, 676 node, divisions ); 677 poll.configurationScript().invokeKnownFunction( "castingVote", vCC ); 678 countNode.setBar( vCC.getBar() ); 679 countNode.write(); 680 } 681 catch( final Exception x ) { throw VotorolaRuntimeException.castOrWrapped( x ); } 682 } 683 }); 684 685 // Pass 4. For each eligible caster, trace the route and effect the cast. 686 // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 687 if( isVerbose ) System.out.print( "4" ); 688 // after this pass, all participating nodes will have been created 689 synchronized( countTable.database() ) 690 { 691 final ResultSet r = countTablePV.getByIndeces(); 692 try 693 { 694 final CountTablePVC countTablePVC = new CountTablePVC( countTable, pollName ); 695 while( r.next() ) 696 { 697 countTablePVC.skimFlush(); 698 final String voterEmail = r.getString( 1 ); 699 final CountNodeW node = countTablePVC.getOrCreate( voterEmail ); 700 node.castSolo(); 701 } 702 countTablePVC.writeAll(); 703 } 704 finally{ r.close(); } 705 } 706 707 // Pass 5. Assign dart sectors. 708 // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 709 if( isVerbose ) System.out.print( "5" ); 710 final Random randomizer = new Random(); 711 { 712 // assign dart sectors to base candidates 713 // ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` 714 final NodeThrower nT = new NodeThrower( poll, randomizer ); 715 countTablePV.run( CountTable.BASE_CANDIDATE_TAIL + " " 716 + CountNodeW.RECEIVE_COUNT_ORDER_CLAUSE, nT ); 717 nT.writeDartBoard(); 718 } 719 { 720 // assign dart sectors to voters 721 // ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` 722 final class Thrower extends NodeThrower 723 { 724 Thrower() { super( poll, randomizer ); } 725 @Override boolean wasSectoredAsBase( final CountNodeW cN ) 726 { 727 return cN.isBaseCandidate(); // whether already sectored above 728 } 729 } 730 final class Runner implements CountNodeW.Runner 731 { 732 String candidateEmailLast; 733 Thrower nT; 734 void flush() 735 { 736 if( nT == null ) return; 737 738 try{ nT.writeDartBoard(); } 739 catch( final SQLException x ) { throw new RuntimeException( x ); } 740 } 741 public void run( final CountNodeW node ) 742 { 743 final String candidateEmail = node.getCandidateEmail(); 744 if( !candidateEmail.equals( candidateEmailLast )) 745 { 746 flush(); 747 nT = new Thrower(); 748 candidateEmailLast = candidateEmail; 749 } 750 nT.run( node ); 751 } 752 } 753 final Runner r = new Runner(); // adding email for stability: 754 countTablePV.run( "AND isCast ORDER BY candidateEmail, carryVolume DESC, email", r ); 755 r.flush(); 756 } 757 758 // Pass 6. Transfer position properties from pollwiki to nodes. 759 // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 760 if( isVerbose ) System.out.print( "6" ); 761 // This pass does not contribute to the tally proper, and therefore does not 762 // violate its repeatability. It merely decorates the nodes with non-tally 763 // properties, effectively using the nodes as a cache onto the wiki. 764 pass6: 765 { 766 final StringBuilder qB = new StringBuilder(); 767 qB.append( vS.pollwiki().scriptURI() ); 768 qB.append( "/api.php?action=ask&query=" ); // http://semantic-mediawiki.org/wiki/Ask_API 769 qB.append( UriComponent.encode( "[[Category:Position]][[Display title::+]][[Poll::", 770 UriComponent.Type.QUERY_PARAM )); 771 qB.append( pollName ); 772 final int limit = 500; 773 qB.append( UriComponent.encode( "]]|?Display title|limit=", 774 UriComponent.Type.QUERY_PARAM )); 775 qB.append( limit ); 776 qB.append( "&format=json" ); // JSON as XML element names are malformed (SMW 1.7.1) 777 final URL queryURL = new URL( qB.toString() ); 778 logger.fine( "querying pollwiki for position properties: " + queryURL ); 779 final Spool spool = new Spool1(); 780 try 781 { 782 final JsonReader in = MediaWiki.requestJSON( queryURL.openConnection(), spool ); 783 if( in.peek() != JsonToken.BEGIN_OBJECT ) break pass6; 784 785 in.beginObject(); 786 if( in.peek() != JsonToken.NAME || !in.nextName().equals("query") 787 || in.peek() != JsonToken.BEGIN_OBJECT ) break pass6; 788 789 in.beginObject(); 790 for( ;; ) // find the results 791 { 792 if( in.peek() != JsonToken.NAME ) break pass6; 793 794 if( in.nextName().equals( "results" )) break; 795 796 in.skipValue(); 797 } 798 if( in.peek() != JsonToken.BEGIN_OBJECT ) break pass6; 799 800 in.beginObject(); 801 results: while( in.peek() != JsonToken.END_OBJECT ) 802 { 803 if( in.peek() != JsonToken.NAME ) 804 { 805 in.skipValue(); 806 continue; 807 } 808 809 final String pageName = in.nextName(); 810 if( in.peek() != JsonToken.BEGIN_OBJECT ) break results; 811 812 in.beginObject(); 813 page: while( in.peek() != JsonToken.END_OBJECT ) 814 { 815 if( in.peek() != JsonToken.NAME ) 816 { 817 in.skipValue(); 818 continue; 819 } 820 821 String n = in.nextName(); 822 if( "printouts".equals(n) && in.peek() == JsonToken.BEGIN_OBJECT ) 823 { 824 in.beginObject(); 825 printouts: while( in.peek() != JsonToken.END_OBJECT ) 826 { 827 if( in.peek() != JsonToken.NAME ) 828 { 829 in.skipValue(); 830 continue; 831 } 832 833 n = in.nextName(); 834 if( "Display title".equals(n) 835 && in.peek() == JsonToken.BEGIN_ARRAY ) 836 { 837 in.beginArray(); 838 if( in.peek() == JsonToken.STRING ) 839 { 840 final String displayTitle = in.nextString(); 841 final MatchResult m = MediaWiki.parsePageNameS( pageName ); 842 if( m != null ) 843 { 844 CountNodeW node = null; 845 try 846 { 847 node = countTablePV.get( 848 IDPair.toInternetAddress(m.group(2)).getAddress() ); 849 } 850 catch( final AddressException x ) 851 { 852 logger.config( "skipping position page with malformed username: " + pageName ); 853 } 854 if( node != null ) // if a participant in this poll 855 { 856 node.setDisplayTitle( displayTitle ); 857 node.write(); 858 } 859 } 860 } 861 while( in.peek() != JsonToken.END_ARRAY ) in.skipValue(); 862 // just the one value expected, ignore any others 863 in.endArray(); 864 } 865 else in.skipValue(); 866 } 867 in.endObject(); // printouts 868 } 869 else in.skipValue(); 870 } 871 in.endObject(); // page 872 } 873 in.endObject(); // results 874 while( in.peek() != JsonToken.END_OBJECT ) in.skipValue(); 875 in.endObject(); // query 876 while( in.peek() != JsonToken.END_OBJECT ) 877 { 878 if( in.peek() == JsonToken.NAME ) 879 { 880 if( in.nextName().startsWith( "query-continue" )) 881 { 882 // expect "query-continue" if continuation not supported 883 // (e.g. SMW 1.7.1), or "query-continue-offset" if it is 884 // supported (1.8+ I think, which we don't have yet) 885 logger.warning( "continuation code needed, exceeded SMW query limit: " + limit ); 886 break; 887 } 888 } 889 else in.skipValue(); 890 } 891 } 892 finally{ spool.unwind(); } 893 } 894 895 // Pass 7. Rank candidates and construct the formal count. 896 // - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 897 if( isVerbose ) System.out.print( "7" ); 898 final class Finalizer implements CountNodeW.Runner 899 { 900 long baseCandidateCount; 901 long candidateCount; 902 long holdVolume; 903 long rankCount; 904 long castVolume; 905 906 private long n; // node count 907 private long receiveVolumeLast = Long.MAX_VALUE; 908 private long rank; 909 910 public void run( final CountNodeW node ) 911 { 912 // rank candidates 913 // ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` 914 if( node.receiveVolume() < receiveVolumeLast ) 915 { 916 ++rankCount; 917 rank = n + 1; 918 assert !(n == 0L && rank != 1L): "top rank is 1"; 919 receiveVolumeLast = node.receiveVolume(); 920 } 921 else assert node.receiveVolume() == receiveVolumeLast: "nodes are sorted"; 922 node.setRank( rank ); 923 node.setRankIndex( n ); 924 925 // accumulate other count data 926 // ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` ` 927 if( node.isCandidate() ) 928 { 929 ++candidateCount; 930 if( node.isCycler() || !node.isCast() ) 931 { 932 assert node.isBaseCandidate(); 933 ++baseCandidateCount; 934 } 935 } 936 castVolume += node.castVolume(); 937 holdVolume += node.holdVolume(); 938 939 // - - - 940 ++n; 941 try{ node.write(); } 942 catch( final SQLException x ) { throw new RuntimeException( x ); } 943 } 944 }; 945 946 final Finalizer f = new Finalizer(); 947 countTablePV.run( CountNodeW.RECEIVE_COUNT_ORDER_CLAUSE, f ); 948 assert f.castVolume == f.holdVolume; 949 return new Count( pollName, ReadyDirectory.this, f.baseCandidateCount, f.candidateCount, 950 f.castVolume, f.rankCount ); 951 } 952 953 954 955 /** @param r the reader having just read the 'in' start element. 956 */ 957 private void verifyServiceName( final PollService poll, final XMLStreamReader r ) 958 throws XMLStreamException 959 { 960 final String serviceName = r.getAttributeValue( /*namespaceURI*/null, "serviceName" ); 961 if( !poll.name().equals( serviceName )) 962 { 963 throw new VotorolaRuntimeException( "wrong service name '" + serviceName + "' at " 964 + LocationX.toString(r.getLocation()) ); 965 } 966 } 967 968 969 970 // ==================================================================================== 971 972 973 /** A cached view of a count table, restricted to a particular poll. 974 */ 975 private static @ThreadRestricted final class CountTablePVC extends CountTable.PollView 976 implements CacheControlledTable 977 { 978 979 // cf. CR_Vote.CountTablePVC 980 981 982 private CountTablePVC( CountTable t, String serviceName ) throws IOException 983 { 984 t.super( serviceName ); 985 } 986 987 988 private final LinkedHashMap<String,CountNodeW> cache; 989 { 990 final float loadFactor = 0.75f; 991 final int maximumOvergrow = 100; // in capacity, guess 992 cache = new LinkedHashMap<String,CountNodeW>( 993 /*initial capacity*/(int)((skimmedSize + maximumOvergrow) / loadFactor), 994 loadFactor, /*accessOrder*/true ); 995 } 996 997 998 private static final int skimmedSize = 1000; // to which it returns after skimming 999 1000 1001 // -------------------------------------------------------------------------------- 1002 1003 1004 public @Override CountNodeW get( final String voterEmail ) 1005 throws SQLException, XMLStreamException 1006 { 1007 CountNodeW node = cache.get( voterEmail ); 1008 // if( node == null && !cache.containsKey( voterEmail )) 1009 //// no need of null values 1010 if( node == null ) 1011 { 1012 node = super.get( voterEmail ); 1013 if( node != null ) cache.put( voterEmail, node ); 1014 } 1015 return node; 1016 } 1017 1018 1019 public @Override CountNodeW getOrCreate( final String voterEmail ) 1020 throws SQLException, XMLStreamException 1021 { 1022 final CountNodeW node = super.getOrCreate( voterEmail ); 1023 if( node instanceof CountNodeIC ) cache.put( voterEmail, node ); 1024 return node; 1025 } 1026 1027 1028 // - C a c h e - C o n t r o l l e d - T a b l e ---------------------------------- 1029 1030 1031 public void skimFlush() throws SQLException 1032 { 1033 if( cache.size() <= skimmedSize ) return; 1034 1035 final Iterator<CountNodeW> nodeIterator = cache.values().iterator(); 1036 for( ;; ) 1037 { 1038 CountNodeW node = nodeIterator.next(); 1039 if( node != null ) node.write(); // null guard probably redundant 1040 1041 nodeIterator.remove(); 1042 if( cache.size() <= skimmedSize ) break; 1043 } 1044 } 1045 1046 1047 public void writeAll() throws SQLException 1048 { 1049 for( CountNodeW node: cache.values() ) 1050 { 1051 if( node != null ) node.write(); // null guard probably redundant 1052 } 1053 } 1054 1055 } 1056 1057 1058 1059 // ==================================================================================== 1060 1061 1062 private static class NodeThrower implements CountNodeW.Runner 1063 { 1064 1065 NodeThrower( PollService _poll, Random _randomizer ) 1066 { 1067 poll = _poll; 1068 randomizer = _randomizer; 1069 } 1070 1071 1072 private final CountNodeW[] dartBoard = new CountNodeW[DART_SECTOR_MAX]; 1073 1074 1075 private int nodeCount; 1076 1077 1078 private final PollService poll; 1079 1080 1081 private final void set( final CountNodeW node, final int newSector ) throws SQLException 1082 { 1083 node.setDartSector( newSector ); 1084 node.write(); 1085 1086 final Vote vote = new Vote( node.person(), poll.voterInputTable() ); 1087 vote.setDartSector( newSector ); // persist 1088 try{ vote.write( poll.voterInputTable(), /*userSession*/null, /*toForce*/true ); } 1089 catch( final VoterInputTable.BadInputException x ) { throw new RuntimeException( x ); } 1090 } 1091 1092 1093 private final Random randomizer; 1094 1095 1096 // -------------------------------------------------------------------------------- 1097 1098 1099 /** Answers whether the node was already sectored as a base candidate. 1100 */ 1101 boolean wasSectoredAsBase( final CountNodeW node ) { return false; } 1102 1103 1104 final void writeDartBoard() throws SQLException 1105 { 1106 for( int s = 0; s < DART_SECTOR_MAX; ++s ) 1107 { 1108 final CountNodeW node = dartBoard[s]; 1109 if( node == null ) continue; 1110 1111 final int sector = s + 1; 1112 if( node.dartSector() == sector ) continue; 1113 1114 set( node, sector ); 1115 } 1116 } 1117 1118 1119 // - C o u n t - N o d e . R u n n e r -------------------------------------------- 1120 1121 1122 public final void run( final CountNodeW newNode ) 1123 { 1124 final int sector = newNode.dartSector(); // 0, 1..20 1125 1126 if( nodeCount >= DART_SECTOR_MAX ) // board is full 1127 { 1128 if( sector != 0 ) 1129 { 1130 if( wasSectoredAsBase( newNode )) throw new IllegalStateException(); 1131 // if in top 20 on end board, then in top 20 on all upstream boards 1132 try 1133 { 1134 set( newNode, 0 ); 1135 } 1136 catch( final Exception x ) { throw VotorolaRuntimeException.castOrWrapped( x ); } 1137 } 1138 } 1139 else // throw onto board 1140 { 1141 int s; // 0..19 1142 if( sector == 0 ) 1143 { 1144 if( wasSectoredAsBase( newNode )) return; 1145 // per CountNodeW.dartSector(), a cycler who does not 1146 // fit on end board will appear nowhere 1147 1148 s = randomizer.nextInt( DART_SECTOR_MAX ); 1149 } 1150 else s = sector - 1; 1151 1152 CountNodeW oldNode = dartBoard[s]; 1153 if( oldNode == null ) dartBoard[s] = newNode; 1154 else // collision, one of the nodes must be displaced 1155 { 1156 final CountNodeW nodeToDisplace; 1157 if( wasSectoredAsBase( newNode )) 1158 { 1159 assert !wasSectoredAsBase( oldNode ); // cyclers cannot be co-voters 1160 nodeToDisplace = oldNode; 1161 } 1162 else if( wasSectoredAsBase( oldNode )) 1163 { 1164 assert !wasSectoredAsBase( newNode ); // " 1165 nodeToDisplace = newNode; 1166 } 1167 else if( newNode.getTime() > oldNode.getTime() ) nodeToDisplace = newNode; 1168 else 1169 { 1170 dartBoard[s] = newNode; 1171 nodeToDisplace = oldNode; 1172 } 1173 s = randomizer.nextInt( DART_SECTOR_MAX ); 1174 while( dartBoard[s] != null ) // find next open sector 1175 { 1176 ++s; 1177 if( s == DART_SECTOR_MAX ) s = 0; 1178 } 1179 dartBoard[s] = nodeToDisplace; 1180 } 1181 } 1182 ++nodeCount; 1183 } 1184 1185 } 1186 1187 1188}