On queryTimeout and the next() method in pg jdbc


Preface

This article looks at how pg jdbc implements queryTimeout on a statement and the next() method of a ResultSet.

Example program

    @Test
    public void testReadTimeout() throws SQLException {
        Connection connection = dataSource.getConnection();
        //https://jdbc.postgresql.org/documentation/head/query.html
        connection.setAutoCommit(false); // NOTE: autoCommit must be false for fetchSize to take effect

        String sql = "select * from demo_table";
        PreparedStatement pstmt;
        try {
            pstmt = connection.prepareStatement(sql);
            pstmt.setQueryTimeout(1); // NOTE: timeout (in seconds) for the statement to finish executing; only effective if the socket timeout is larger
            pstmt.setFetchSize(5000); // NOTE: set this way to simulate a query timeout exception
            System.out.println("ps.getQueryTimeout():" + pstmt.getQueryTimeout());
            System.out.println("ps.getFetchSize():" + pstmt.getFetchSize());
            System.out.println("ps.getFetchDirection():" + pstmt.getFetchDirection());
            System.out.println("ps.getMaxFieldSize():" + pstmt.getMaxFieldSize());

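            ResultSet rs = pstmt.executeQuery(); // NOTE: bounded by the query timeout set above, provided the socket timeout is larger
            // NOTE: once this returns, the statement has finished executing; by default the first fetchSize rows are returned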
            int col = rs.getMetaData().getColumnCount();
            System.out.println("============================");
            while (rs.next()) { // NOTE: the timeout here is governed by the socket timeout (for Oracle, oracle.jdbc.ReadTimeout=60000; for pg jdbc, the socketTimeout connection parameter)
                for (int i = 1; i <= col; i++) {
                    System.out.print(rs.getObject(i));
                }
                System.out.println("");
            }
            System.out.println("============================");
        } catch (SQLException e) {
            e.printStackTrace();
        } finally {
            //close resources
        }
    }
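
The comments in the example say that queryTimeout only helps when the socket timeout is larger. As a minimal sketch of that setup, the helper below builds connection properties with pg jdbc's socketTimeout parameter (in seconds). The class name, method name, and the credentials used in the test are hypothetical and only for illustration.

```java
import java.util.Properties;

// Hypothetical helper, not part of pg jdbc: builds connection properties
// with a socket read timeout that should exceed the statement's queryTimeout.
class PgTimeoutProps {
    static Properties build(String user, String password, int socketTimeoutSeconds) {
        Properties props = new Properties();
        props.setProperty("user", user);
        props.setProperty("password", password);
        // socketTimeout is a pg jdbc connection parameter, expressed in seconds
        props.setProperty("socketTimeout", String.valueOf(socketTimeoutSeconds));
        return props;
    }
}
```

The resulting Properties object can be passed to DriverManager.getConnection(url, props), where url is whatever JDBC URL your environment uses.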

PgStatement

postgresql-9.4.1212.jre7-sources.jar! /org/postgresql/jdbc/PgStatement.java

executeInternal()

private void executeInternal(CachedQuery cachedQuery, ParameterList queryParameters, int flags)
      throws SQLException {
    closeForNextExecution();

    // Enable cursor-based resultset if possible.
    if (fetchSize > 0 && !wantsScrollableResultSet() && !connection.getAutoCommit()
        && !wantsHoldableResultSet()) {
      flags |= QueryExecutor.QUERY_FORWARD_CURSOR;
    }

    if (wantsGeneratedKeysOnce || wantsGeneratedKeysAlways) {
      flags |= QueryExecutor.QUERY_BOTH_ROWS_AND_STATUS;

      // If the no results flag is set (from executeUpdate)
      // clear it so we get the generated keys results.
      //
      if ((flags & QueryExecutor.QUERY_NO_RESULTS) != 0) {
        flags &= ~(QueryExecutor.QUERY_NO_RESULTS);
      }
    }

    if (isOneShotQuery(cachedQuery)) {
      flags |= QueryExecutor.QUERY_ONESHOT;
    }
    // Only use named statements after we hit the threshold. Note that only
    // named statements can be transferred in binary format.

    if (connection.getAutoCommit()) {
      flags |= QueryExecutor.QUERY_SUPPRESS_BEGIN;
    }

    // updateable result sets do not yet support binary updates
    if (concurrency != ResultSet.CONCUR_READ_ONLY) {
      flags |= QueryExecutor.QUERY_NO_BINARY_TRANSFER;
    }

    Query queryToExecute = cachedQuery.query;

    if (queryToExecute.isEmpty()) {
      flags |= QueryExecutor.QUERY_SUPPRESS_BEGIN;
    }

    if (!queryToExecute.isStatementDescribed() && forceBinaryTransfers
        && (flags & QueryExecutor.QUERY_EXECUTE_AS_SIMPLE) == 0) {
      // Simple 'Q' execution does not need to know parameter types
      // When binaryTransfer is forced, then we need to know resulting parameter and column types,
      // thus sending a describe request.
      int flags2 = flags | QueryExecutor.QUERY_DESCRIBE_ONLY;
      StatementResultHandler handler2 = new StatementResultHandler();
      connection.getQueryExecutor().execute(queryToExecute, queryParameters, handler2, 0, 0,
          flags2);
      ResultWrapper result2 = handler2.getResults();
      if (result2 != null) {
        result2.getResultSet().close();
      }
    }

    StatementResultHandler handler = new StatementResultHandler();
    result = null;
    try {
      startTimer();
      connection.getQueryExecutor().execute(queryToExecute, queryParameters, handler, maxrows,
          fetchSize, flags);
    } finally {
      killTimerTask();
    }
    result = firstUnclosedResult = handler.getResults();

    if (wantsGeneratedKeysOnce || wantsGeneratedKeysAlways) {
      generatedKeys = result;
      result = result.getNext();

      if (wantsGeneratedKeysOnce) {
        wantsGeneratedKeysOnce = false;
      }
    }

  }

Note that startTimer() and killTimerTask() are called immediately before and after the query is executed, respectively.

startTimer()

private void startTimer() {
    /*
     * there shouldn't be any previous timer active, but better safe than sorry.
     */
    cleanupTimer();

    STATE_UPDATER.set(this, StatementCancelState.IN_QUERY);

    if (timeout == 0) {
      return;
    }

    TimerTask cancelTask = new TimerTask() {
      public void run() {
        try {
          if (!CANCEL_TIMER_UPDATER.compareAndSet(PgStatement.this, this, null)) {
            // Nothing to do here, statement has already finished and cleared
            // cancelTimerTask reference
            return;
          }
          PgStatement.this.cancel();
        } catch (SQLException e) {
        }
      }
    };

    CANCEL_TIMER_UPDATER.set(this, cancelTask);
    connection.addTimerTask(cancelTask, timeout);
  }
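  • startTimer() first calls cleanupTimer() and sets the statement state to IN_QUERY.
  • If the timeout is non-zero, it creates a cancelTask whose run() calls PgStatement.this.cancel().
  • Finally, it calls connection.addTimerTask() to schedule the task.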
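
The pattern in the steps above can be sketched outside the driver. This is a simplified illustration, not the pg jdbc implementation: QueryTimerSketch and runWithTimeout are hypothetical names, and the "query" is simulated by waiting on a latch.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of a timer-driven cancel; not the driver's code.
class QueryTimerSketch {
    // One shared daemon timer, similar in spirit to the driver's shared timer.
    private static final Timer SHARED_TIMER = new Timer("cancel-timer", true);

    /** Returns true if the simulated work was cancelled by the timer. */
    static boolean runWithTimeout(long workMillis, long timeoutMillis) {
        final AtomicReference<TimerTask> cancelTaskRef = new AtomicReference<>();
        final CountDownLatch cancelSignal = new CountDownLatch(1);

        if (timeoutMillis > 0) { // 0 means "no timeout"
            TimerTask cancelTask = new TimerTask() {
                @Override
                public void run() {
                    // Cancel only if the work has not already cleared the reference.
                    if (cancelTaskRef.compareAndSet(this, null)) {
                        cancelSignal.countDown();
                    }
                }
            };
            cancelTaskRef.set(cancelTask);
            SHARED_TIMER.schedule(cancelTask, timeoutMillis);
        }

        boolean cancelled = false;
        try {
            // Simulated query: runs for workMillis unless the cancel arrives first.
            cancelled = cancelSignal.await(workMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            // Equivalent of killing the timer task after execution.
            TimerTask pending = cancelTaskRef.getAndSet(null);
            if (pending != null) {
                pending.cancel();
                SHARED_TIMER.purge();
            }
        }
        return cancelled;
    }
}
```

Calling runWithTimeout(2000, 50) simulates a slow query that the timer cancels, while runWithTimeout(20, 5000) simulates a fast query that finishes before the timer fires.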

cleanupTimer()

/**
   * Clears {@link #cancelTimerTask} if any. Returns true if and only if "cancel" timer task would
   * never invoke {@link #cancel()}.
   */
  private boolean cleanupTimer() {
    TimerTask timerTask = CANCEL_TIMER_UPDATER.get(this);
    if (timerTask == null) {
      // If timeout is zero, then timer task did not exist, so we safely report "all clear"
      return timeout == 0;
    }
    if (!CANCEL_TIMER_UPDATER.compareAndSet(this, timerTask, null)) {
      // Failed to update reference -> timer has just fired, so we must wait for the query state to
      // become "cancelling".
      return false;
    }
    timerTask.cancel();
    connection.purgeTimerTasks();
    // All clear
    return true;
  }

Note that once the cancelTimerTask reference has been cleared here via compareAndSet, the task's cancel() is called, followed by connection.purgeTimerTasks().

cancel()

public void cancel() throws SQLException {
    if (!STATE_UPDATER.compareAndSet(this, StatementCancelState.IN_QUERY, StatementCancelState.CANCELING)) {
      // Not in query, there's nothing to cancel
      return;
    }
    try {
      // Synchronize on connection to avoid spinning in killTimerTask
      synchronized (connection) {
        connection.cancelQuery();
      }
    } finally {
      STATE_UPDATER.set(this, StatementCancelState.CANCELLED);
      synchronized (connection) {
        connection.notifyAll(); // wake-up killTimerTask
      }
    }
  }

cancelQuery()

  public void cancelQuery() throws SQLException {
    checkClosed();
    queryExecutor.sendQueryCancel();
  }

postgresql-9.4.1212.jre7-sources.jar! /org/postgresql/core/QueryExecutorBase.java

public void sendQueryCancel() throws SQLException {
    if (cancelPid <= 0) {
      return;
    }

    PGStream cancelStream = null;

    // Now we need to construct and send a cancel packet
    try {
      if (logger.logDebug()) {
        logger.debug(" FE=> CancelRequest(pid=" + cancelPid + ",ckey=" + cancelKey + ")");
      }

      cancelStream =
          new PGStream(pgStream.getSocketFactory(), pgStream.getHostSpec(), cancelSignalTimeout);
      if (cancelSignalTimeout > 0) {
        cancelStream.getSocket().setSoTimeout(cancelSignalTimeout);
      }
      cancelStream.sendInteger4(16);
      cancelStream.sendInteger2(1234);
      cancelStream.sendInteger2(5678);
      cancelStream.sendInteger4(cancelPid);
      cancelStream.sendInteger4(cancelKey);
      cancelStream.flush();
      cancelStream.receiveEOF();
    } catch (IOException e) {
      // Safe to ignore.
      if (logger.logDebug()) {
        logger.debug("Ignoring exception on cancel request:", e);
      }
    } finally {
      if (cancelStream != null) {
        try {
          cancelStream.close();
        } catch (IOException e) {
          // Ignored.
        }
      }
    }
  }

This opens a separate connection to the database server and sends a cancel request carrying the backend pid and cancel key.
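
The byte layout written by sendQueryCancel() can be reproduced with a ByteBuffer. The sketch below only builds the 16-byte packet and sends nothing; CancelRequestSketch and buildCancelRequest are hypothetical names, and the pid and key values in any call are placeholders.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch: builds the same 16 bytes that the driver code above writes.
class CancelRequestSketch {
    static byte[] buildCancelRequest(int pid, int key) {
        ByteBuffer buf = ByteBuffer.allocate(16); // ByteBuffer is big-endian by default
        buf.putInt(16);             // total message length, including this field
        buf.putShort((short) 1234); // high 16 bits of the cancel request code
        buf.putShort((short) 5678); // low 16 bits of the cancel request code
        buf.putInt(pid);            // backend process id
        buf.putInt(key);            // secret cancel key
        return buf.array();
    }
}
```

The two 16-bit values 1234 and 5678 together form the 32-bit request code 80877102.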

killTimerTask()

private void killTimerTask() {
    boolean timerTaskIsClear = cleanupTimer();
    // The order is important here: in case we need to wait for the cancel task, the state must be
    // kept StatementCancelState.IN_QUERY, so cancelTask would be able to cancel the query.
    // It is believed that this case is very rare, so "additional cancel and wait below" would not
    // harm it.
    if (timerTaskIsClear && STATE_UPDATER.compareAndSet(this, StatementCancelState.IN_QUERY, StatementCancelState.IDLE)) {
      return;
    }

    // Being here means someone managed to call .cancel() and our connection did not receive
    // "timeout error"
    // We wait till state becomes "cancelled"
    boolean interrupted = false;
    while (!STATE_UPDATER.compareAndSet(this, StatementCancelState.CANCELLED, StatementCancelState.IDLE)) {
      synchronized (connection) {
        try {
          // Note: wait timeout here is irrelevant since synchronized(connection) would block until
          // .cancel finishes
          connection.wait(10);
        } catch (InterruptedException e) { // NOSONAR
          // Either re-interrupt this method or rethrow the "InterruptedException"
          interrupted = true;
        }
      }
    }
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }

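Here cleanupTimer() is called first, and then the statement state is moved from IN_QUERY back to IDLE. If a cancel is in flight, the method waits until the state becomes CANCELLED before resetting it to IDLE.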
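
The state transitions used by cancel() and killTimerTask() can be sketched with an AtomicReference. This is a single-threaded simplification under assumed names (CancelStateSketch, beginQuery, endQuery); the real driver uses a field updater and waits on the connection monitor.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, single-threaded sketch of the statement cancel states.
class CancelStateSketch {
    enum State { IDLE, IN_QUERY, CANCELING, CANCELLED }

    private final AtomicReference<State> state = new AtomicReference<>(State.IDLE);

    void beginQuery() {
        state.set(State.IN_QUERY);
    }

    /** Returns true if a cancel was actually performed. */
    boolean cancel() {
        if (!state.compareAndSet(State.IN_QUERY, State.CANCELING)) {
            return false; // not in a query, nothing to cancel
        }
        // A real driver would send the cancel request to the server here.
        state.set(State.CANCELLED);
        return true;
    }

    /** Resets the state after the query has finished and returns the new state. */
    State endQuery() {
        if (!state.compareAndSet(State.IN_QUERY, State.IDLE)) {
            // A cancel ran during the query; in this sketch it has already completed.
            state.compareAndSet(State.CANCELLED, State.IDLE);
        }
        return state.get();
    }

    State current() {
        return state.get();
    }
}
```

The compareAndSet in cancel() is what makes a late-firing timer harmless: once the statement has left IN_QUERY, the cancel becomes a no-op.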

PgConnection

postgresql-9.4.1212.jre7-sources.jar! /org/postgresql/jdbc/PgConnection.java

getTimer()

private synchronized Timer getTimer() {
    if (cancelTimer == null) {
      cancelTimer = Driver.getSharedTimer().getTimer();
    }
    return cancelTimer;
  }

The timer is obtained lazily from the driver's shared timer and cached on the connection.

addTimerTask()

  public void addTimerTask(TimerTask timerTask, long milliSeconds) {
    Timer timer = getTimer();
    timer.schedule(timerTask, milliSeconds);
  }

The timerTask is scheduled directly on that timer, with the delay given in milliseconds.

purgeTimerTasks()

postgresql-9.4.1212.jre7-sources.jar! /org/postgresql/jdbc/PgConnection.java

public void purgeTimerTasks() {
    Timer timer = cancelTimer;
    if (timer != null) {
      timer.purge();
    }
  }

Called in cleanupTimer() to remove timer tasks that have already been cancelled from the timer's queue.
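
To see why the purge matters, the sketch below schedules several long-delay tasks, cancels them, and then calls Timer.purge(), which returns the number of cancelled tasks it removed from the queue. TimerPurgeSketch and cancelAndPurge are hypothetical names used only for this illustration.

```java
import java.util.Timer;
import java.util.TimerTask;

// Hypothetical sketch: cancelled tasks stay queued until purge() removes them.
class TimerPurgeSketch {
    static int cancelAndPurge(int taskCount) {
        Timer timer = new Timer("purge-demo", true);
        TimerTask[] tasks = new TimerTask[taskCount];
        for (int i = 0; i < taskCount; i++) {
            tasks[i] = new TimerTask() {
                @Override
                public void run() {
                    // never reached in this sketch: the task is cancelled first
                }
            };
            timer.schedule(tasks[i], 60_000L); // long delay, like a long query timeout
        }
        for (TimerTask task : tasks) {
            task.cancel(); // marks the task cancelled but leaves it in the queue
        }
        int removed = timer.purge(); // drops the cancelled tasks from the queue
        timer.cancel();
        return removed;
    }
}
```

Without the purge, a cancelled task would stay referenced by the timer until its scheduled time arrived, which is the accumulation problem a long timeout could otherwise cause.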

PgResultSet

postgresql-9.4.1212.jre7-sources.jar! /org/postgresql/jdbc/PgResultSet.java

next()

public boolean next() throws SQLException {
    checkClosed();

    if (onInsertRow) {
      throw new PSQLException(GT.tr("Can''t use relative move methods while on the insert row."),
          PSQLState.INVALID_CURSOR_STATE);
    }

    if (current_row + 1 >= rows.size()) {
      if (cursor == null || (maxRows > 0 && row_offset + rows.size() >= maxRows)) {
        current_row = rows.size();
        this_row = null;
        rowBuffer = null;
        return false; // End of the resultset.
      }

      // Ask for some more data.
      row_offset += rows.size(); // We are discarding some data.

      int fetchRows = fetchSize;
      if (maxRows != 0) {
        if (fetchRows == 0 || row_offset + fetchRows > maxRows) {
          // Fetch would exceed maxRows, limit it.
          fetchRows = maxRows - row_offset;
        }
      }

      // Execute the fetch and update this resultset.
      connection.getQueryExecutor().fetch(cursor, new CursorResultHandler(), fetchRows);

      current_row = 0;

      // Test the new rows array.
      if (rows.isEmpty()) {
        this_row = null;
        rowBuffer = null;
        return false;
      }
    } else {
      current_row++;
    }

    initRowBuffer();
    return true;
  }

Unlike executeQuery, the fetch here is not wrapped in a timer.

postgresql-9.4.1212.jre7-sources.jar! /org/postgresql/core/v3/QueryExecutorImpl.java

public synchronized void fetch(ResultCursor cursor, ResultHandler handler, int fetchSize)
      throws SQLException {
    waitOnLock();
    final Portal portal = (Portal) cursor;

    // Insert a ResultHandler that turns bare command statuses into empty datasets
    // (if the fetch returns no rows, we see just a CommandStatus..)
    final ResultHandler delegateHandler = handler;
    handler = new ResultHandlerDelegate(delegateHandler) {
      public void handleCommandStatus(String status, int updateCount, long insertOID) {
        handleResultRows(portal.getQuery(), null, new ArrayList<byte[][]>(), null);
      }
    };

    // Now actually run it.

    try {
      processDeadParsedQueries();
      processDeadPortals();

      sendExecute(portal.getQuery(), portal, fetchSize);
      sendSync();

      processResults(handler, 0);
      estimatedReceiveBufferBytes = 0;
    } catch (IOException e) {
      abort();
      handler.handleError(
          new PSQLException(GT.tr("An I/O error occurred while sending to the backend."),
              PSQLState.CONNECTION_FAILURE, e));
    }

    handler.handleCompletion();
  }
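
The batch-wise behaviour of next() can be illustrated with an in-memory stand-in. This is a simplified sketch, not the driver's logic: BatchCursorSketch is a hypothetical class, the "server" is a plain list, and unlike a real portal it knows up front when the rows are exhausted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of cursor-style iteration in batches of fetchSize.
class BatchCursorSketch {
    private final List<Integer> source; // stands in for rows held by the server
    private final int fetchSize;
    private List<Integer> rows = new ArrayList<>();
    private int sourcePos = 0;
    private int currentRow = -1;
    private int fetchCount = 0;

    BatchCursorSketch(List<Integer> source, int fetchSize) {
        this.source = source;
        this.fetchSize = fetchSize;
        fetch(); // the initial execute returns the first batch
    }

    // Loads the next batch; in the driver this is a network round trip.
    private void fetch() {
        fetchCount++;
        int end = Math.min(sourcePos + fetchSize, source.size());
        rows = new ArrayList<>(source.subList(sourcePos, end));
        sourcePos = end;
    }

    boolean next() {
        if (currentRow + 1 >= rows.size()) {
            if (sourcePos >= source.size()) {
                return false; // end of the result set
            }
            fetch(); // ask for more data; no timer guards this call
            currentRow = 0;
            return !rows.isEmpty();
        }
        currentRow++;
        return true;
    }

    int get() {
        return rows.get(currentRow);
    }

    int fetchCount() {
        return fetchCount;
    }
}
```

Most calls to next() only advance an index in the buffered batch; only the call that runs past the end of the batch triggers a fetch, and that is the call that can block on the socket.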

Summary

  • queryTimeout is implemented by scheduling a TimerTask on a timer shared through the Driver. With many concurrent requests there can be many pending TimerTasks.

The timeout should not be set too long. That said, after a statement finishes normally, killTimerTask() is called: it calls cleanupTimer(), which cancels the timerTask and then calls purgeTimerTasks() to remove cancelled tasks from the timer. This keeps tasks from piling up and eventually exhausting memory when the timeout is long.

  • When the timeout expires, the timer task sends a cancel request to the database server.
  • After the cancel request is sent, the client-side query should throw a SQLException (the exact mechanism needs further study; it may be that the server returns an error for the cancelled query).
  • With a cursor-based result set (fetchSize > 0 and autoCommit off), executeQuery fetches the first fetchSize rows and returns.
  • The next() method fetches further batches as needed. That fetch has no timer to limit it, but at the lowest level it should be bounded by socketTimeout.
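
One way to tell a cancelled query from other failures on the client is to check the SQLSTATE. PostgreSQL documents 57014 as query_canceled; that a timer-triggered cancel surfaces with exactly this code through the driver is an assumption here, and QueryCanceledCheck is a hypothetical helper.

```java
import java.sql.SQLException;

// Hypothetical helper: checks a SQLException chain for SQLSTATE 57014.
class QueryCanceledCheck {
    // 57014 is PostgreSQL's query_canceled error code.
    static final String QUERY_CANCELED = "57014";

    static boolean isQueryCanceled(SQLException e) {
        // Walk the chain, since drivers may link several exceptions together.
        for (SQLException cur = e; cur != null; cur = cur.getNextException()) {
            if (QUERY_CANCELED.equals(cur.getSQLState())) {
                return true;
            }
        }
        return false;
    }
}
```

In the example program, the catch block could call isQueryCanceled(e) to decide whether the failure was a timeout or something else.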
