class Mongo::Cursor

A cursor over query results. Returned objects are hashes.

Attributes

acceptable_latency[R]
collection[R]
comment[R]
compile_regex[R]
cursor_id[R]
fields[R]
full_collection_name[R]
hint[R]
options[R]
order[R]
read[R]
selector[R]
show_disk_loc[R]
snapshot[R]
tag_sets[R]
timeout[R]
transformer[R]

Public Class Methods

new(collection, opts={}) click to toggle source

Create a new cursor.

Note: cursors are created when executing queries using [Collection#find] and other similar methods. Application developers shouldn't have to create cursors manually.

@return [Cursor]

# File lib/mongo/cursor.rb, line 38
def initialize(collection, opts={})
  opts = opts.dup
  @cursor_id  = opts.delete(:cursor_id)
  @db         = collection.db
  @collection = collection
  @connection = @db.connection
  @logger     = @connection.logger

  # Query selector
  @selector   = opts.delete(:selector) || {}

  # Query pre-serialized bson to append
  @bson    = @selector.delete(:bson)

  # Special operators that form part of $query
  @order         = opts.delete(:order)
  @explain       = opts.delete(:explain)
  @hint          = opts.delete(:hint)
  @snapshot      = opts.delete(:snapshot)
  @max_scan      = opts.delete(:max_scan)
  @return_key    = opts.delete(:return_key)
  @show_disk_loc = opts.delete(:show_disk_loc)
  @comment       = opts.delete(:comment)
  @compile_regex = opts.key?(:compile_regex) ? opts.delete(:compile_regex) : true

  # Wire-protocol settings
  @fields   = convert_fields_for_query(opts.delete(:fields))
  @skip     = opts.delete(:skip)     || 0
  @limit    = opts.delete(:limit)    || 0
  @tailable = opts.delete(:tailable)
  @timeout  = opts.key?(:timeout) ? opts.delete(:timeout) : true
  @options  = 0

  # Use this socket for the query
  @socket = opts.delete(:socket)
  @pool   = opts.delete(:pool)

  @closed    = false
  @query_run = false

  @transformer        = opts.delete(:transformer)
  @read               = opts.delete(:read)               || @collection.read
  Mongo::ReadPreference::validate(@read)
  @tag_sets           = opts.delete(:tag_sets)           || @collection.tag_sets
  @acceptable_latency = opts.delete(:acceptable_latency) || @collection.acceptable_latency

  batch_size(opts.delete(:batch_size) || 0)

  @full_collection_name = "#{@collection.db.name}.#{@collection.name}"
  @cache                = opts.delete(:first_batch) || []
  @returned             = 0

  if(!@timeout)
    add_option(OP_QUERY_NO_CURSOR_TIMEOUT)
  end
  if(@read != :primary)
    add_option(OP_QUERY_SLAVE_OK)
  end
  if(@tailable)
    add_option(OP_QUERY_TAILABLE)
  end

  # If a cursor_id is provided, this is a cursor for a command
  if @cursor_id
    @command_cursor = true
    @query_run      = true
  end

  if @collection.name =~ /^\$cmd/ || @collection.name =~ /^system/
    @command = true
  else
    @command = false
  end

  @opts = opts
end

Public Instance Methods

add_option(opt) click to toggle source

Add an option to the query options bitfield.

@param opt a valid query option

@raise InvalidOperation if this method is run after the cursor has bee

iterated for the first time.

@return [Integer] the current value of the options bitfield for this cursor.

@see www.mongodb.org/display/DOCS/Mongo+Wire+Protocol#MongoWireProtocol-Mongo::Constants::OPQUERY

# File lib/mongo/cursor.rb, line 429
def add_option(opt)
  check_modifiable

  if exhaust?(opt)
    if @limit != 0
      raise MongoArgumentError, "Exhaust is incompatible with limit."
    elsif @connection.mongos?
      raise MongoArgumentError, "Exhaust is incompatible with mongos."
    end
  end

  @options |= opt
  @options
end
alive?() click to toggle source

Guess whether the cursor is alive on the server.

Note that this method only checks whether we have a cursor id. The cursor may still have timed out on the server. This will be indicated in the next call to #next.

@return [Boolean]

# File lib/mongo/cursor.rb, line 123
def alive?
  @cursor_id && @cursor_id != 0
end
batch_size(size=nil) click to toggle source

Set the batch size for server responses.

Note that the batch size will take effect only on queries where the number to be returned is greater than 100.

This can not override MongoDB's limit on the amount of data it will return to the client. Depending on server version this can be 4-16mb.

@param [Integer] size either 0 or some integer greater than 1. If 0,

the server will determine the batch size.

@return [Cursor]

# File lib/mongo/cursor.rb, line 309
def batch_size(size=nil)
  return @batch_size unless size
  check_modifiable
  if size < 0 || size == 1
    raise ArgumentError, "Invalid value for batch_size #{size}; must be 0 or > 1."
  else
    @batch_size = @limit != 0 && size > @limit ? @limit : size
  end

  self
end
close() click to toggle source

Close the cursor.

Note: if a cursor is read until exhausted (read until Mongo::Constants::OP_QUERY or Mongo::Constants::OP_GETMORE returns zero for the cursor id), there is no need to close it manually.

Note also: Mongo::Collection#find takes an optional block argument which can be used to ensure that your cursors get closed.

@return [True]

# File lib/mongo/cursor.rb, line 384
def close
  if @cursor_id && @cursor_id != 0
    message = BSON::ByteBuffer.new([0, 0, 0, 0])
    message.put_int(1)
    message.put_long(@cursor_id)
    log(:debug, "Cursor#close #{@cursor_id}")
    @connection.send_message(
      Mongo::Constants::OP_KILL_CURSORS,
      message,
      :pool => @pool
    )
  end
  @cursor_id = 0
  @closed    = true
end
closed?() click to toggle source

Is this cursor closed?

@return [Boolean]

# File lib/mongo/cursor.rb, line 403
def closed?
  @closed
end
count(skip_and_limit = false) click to toggle source

Get the size of the result set for this query.

@param [Boolean] skip_and_limit whether or not to take skip or limit into account.

@return [Integer] the number of objects in the result set for this query.

@raise [OperationFailure] on a database error.

# File lib/mongo/cursor.rb, line 197
def count(skip_and_limit = false)
  check_command_cursor
  command = BSON::OrderedHash["count",  @collection.name, "query",  @selector]

  if skip_and_limit
    command.merge!(BSON::OrderedHash["limit", @limit]) if @limit != 0
    command.merge!(BSON::OrderedHash["skip", @skip]) if @skip != 0
  end

  command.merge!(BSON::OrderedHash["fields", @fields])

  response = @db.command(command, :read => @read, :comment => @comment)
  return response['n'].to_i if Mongo::Support.ok?(response)
  return 0 if response['errmsg'] == "ns missing"
  raise OperationFailure.new("Count failed: #{response['errmsg']}", response['code'], response)
end
each() { |doc| ... } click to toggle source

Iterate over each document in this cursor, yielding it to the given block, if provided. An Enumerator is returned if no block is given.

Iterating over an entire cursor will close it.

@yield passes each document to a block for processing.

@example if 'comments' represents a collection of comments:

comments.find.each do |doc|
  puts doc['user']
end
# File lib/mongo/cursor.rb, line 332
def each
  if block_given? || !defined?(Enumerator)
    while doc = self.next
      yield doc
    end
  else
    Enumerator.new do |yielder|
      while doc = self.next
        yielder.yield doc
      end
    end
  end
end
explain() click to toggle source

Get the explain plan for this cursor.

@return [Hash] a document containing the explain plan for this cursor.

# File lib/mongo/cursor.rb, line 364
def explain
  check_command_cursor
  c = Cursor.new(@collection,
    query_options_hash.merge(:limit => -@limit.abs, :explain => true))
  explanation = c.next_document
  c.close

  explanation
end
has_next?() click to toggle source

Determine whether this cursor has any remaining results.

@return [Boolean]

# File lib/mongo/cursor.rb, line 186
def has_next?
  num_remaining > 0
end
inspect() click to toggle source

Clean output for inspect.

# File lib/mongo/cursor.rb, line 481
def inspect
  "<Mongo::Cursor:0x#{object_id.to_s(16)} namespace='#{@db.name}.#{@collection.name}' " +
    "@selector=#{@selector.inspect} @cursor_id=#{@cursor_id}>"
end
limit(number_to_return=nil) click to toggle source

Limit the number of results to be returned by this cursor.

This method overrides any limit specified in the Mongo::Collection#find method, and only the last limit applied has an effect.

@return [Integer] the current number_to_return if no parameter is given.

@raise [InvalidOperation] if this cursor has already been used.

# File lib/mongo/cursor.rb, line 243
def limit(number_to_return=nil)
  return @limit unless number_to_return
  check_modifiable

  if (number_to_return != 0) && exhaust?
    raise MongoArgumentError, "Limit is incompatible with exhaust option."
  end

  @limit = number_to_return
  self
end
max_time_ms(max_time_ms=nil) click to toggle source

Instruct the server to abort queries after they exceed the specified wall-clock execution time.

A query that completes in under its time limit will “roll over” remaining time to the first getmore op (which will then “roll over” its remaining time to the second getmore op and so on, until the time limit is hit).

Cursors returned by successful time-limited queries will still obey the default cursor idle timeout (unless the “no cursor idle timeout” flag has been set).

@note This will only have an effect in MongoDB 2.5+

@param #max_time_ms [Fixnum] max execution time (in milliseconds)

@return [Fixnum, Cursor] either the current #max_time_ms or cursor

# File lib/mongo/cursor.rb, line 289
def max_time_ms(max_time_ms=nil)
  return @max_time_ms unless max_time_ms
  check_modifiable

  @max_time_ms = max_time_ms
  self
end
next() click to toggle source

Get the next document specified the cursor options.

@return [Hash, Nil] the next document or Nil if no documents remain.

# File lib/mongo/cursor.rb, line 130
def next
  if @cache.length == 0
    if @query_run && exhaust?
      close
      return nil
    else
      refresh
    end
  end
  doc = @cache.shift

  if doc && (err = doc['errmsg'] || doc['$err']) # assignment
    code = doc['code'] || doc['assertionCode']

    # If the server has stopped being the master (e.g., it's one of a
    # pair but it has died or something like that) then we close that
    # connection. The next request will re-open on master server.
    if err.include?("not master")
      @connection.close
      raise ConnectionFailure.new(err, code, doc)
    end

    # Handle server side operation execution timeout
    if code == 50
      raise ExecutionTimeout.new(err, code, doc)
    end

    raise OperationFailure.new(err, code, doc)
  elsif doc && (write_concern_error = doc['writeConcernError']) # assignment
    raise WriteConcernError.new(write_concern_error['errmsg'], write_concern_error['code'], doc)
  end

  if @transformer.nil?
    doc
  else
    @transformer.call(doc) if doc
  end
end
Also aliased as: next_document
next_document()
Alias for: next
query_options_hash() click to toggle source

Get the query options for this Cursor.

@return [Hash]

# File lib/mongo/cursor.rb, line 464
def query_options_hash
  BSON::OrderedHash[
    :selector => @selector,
    :fields   => @fields,
    :skip     => @skip,
    :limit    => @limit,
    :order    => @order,
    :hint     => @hint,
    :snapshot => @snapshot,
    :timeout  => @timeout,
    :max_scan => @max_scan,
    :return_key => @return_key,
    :show_disk_loc => @show_disk_loc,
    :comment  => @comment ]
end
query_opts() click to toggle source

Returns an integer indicating which query options have been selected.

@return [Integer]

@see www.mongodb.org/display/DOCS/Mongo+Wire+Protocol#MongoWireProtocol-Mongo::Constants::OPQUERY The MongoDB wire protocol.

# File lib/mongo/cursor.rb, line 413
def query_opts
  warn "The method Cursor#query_opts has been deprecated " +
    "and will removed in v2.0. Use Cursor#options instead."
  @options
end
remove_option(opt) click to toggle source

Remove an option from the query options bitfield.

@param opt a valid query option

@raise InvalidOperation if this method is run after the cursor has bee

iterated for the first time.

@return [Integer] the current value of the options bitfield for this cursor.

@see www.mongodb.org/display/DOCS/Mongo+Wire+Protocol#MongoWireProtocol-Mongo::Constants::OPQUERY

# File lib/mongo/cursor.rb, line 454
def remove_option(opt)
  check_modifiable

  @options &= ~opt
  @options
end
rewind!() click to toggle source

Reset this cursor on the server. Cursor options, such as the query string and the values for skip and limit, are preserved.

# File lib/mongo/cursor.rb, line 172
def rewind!
  check_command_cursor
  close
  @cache.clear
  @cursor_id  = nil
  @closed     = false
  @query_run  = false
  @n_received = nil
  true
end
skip(number_to_skip=nil) click to toggle source

Skips the first number_to_skip results of this cursor. Returns the current number_to_skip if no parameter is given.

This method overrides any skip specified in the Mongo::Collection#find method, and only the last skip applied has an effect.

@return [Integer]

@raise [InvalidOperation] if this cursor has already been used.

# File lib/mongo/cursor.rb, line 264
def skip(number_to_skip=nil)
  return @skip unless number_to_skip
  check_modifiable

  @skip = number_to_skip
  self
end
sort(order, direction=nil) click to toggle source

Sort this cursor's results.

This method overrides any sort order specified in the Mongo::Collection#find method, and only the last sort applied has an effect.

@param [Symbol, Array, Hash, OrderedHash] order either 1) a key to sort by 2)

an array of [key, direction] pairs to sort by or 3) a hash of
field => direction pairs to sort by. Direction should be specified as
Mongo::ASCENDING (or :ascending / :asc) or Mongo::DESCENDING
(or :descending / :desc)

@raise [InvalidOperation] if this cursor has already been used.

@raise [InvalidSortValueError] if the specified order is invalid.

# File lib/mongo/cursor.rb, line 228
def sort(order, direction=nil)
  check_modifiable
  order = [[order, direction]] unless direction.nil?
  @order = order
  self
end
to_a() click to toggle source

Receive all the documents from this cursor as an array of hashes.

Notes:

If you've already started iterating over the cursor, the array returned by this method contains only the remaining documents. See #rewind! if you need to reset the cursor.

Use of this method is discouraged - in most cases, it's much more efficient to retrieve documents as you need them by iterating over the cursor.

@return [Array] an array of documents.

Calls superclass method
# File lib/mongo/cursor.rb, line 357
def to_a
  super
end

Private Instance Methods

check_command_cursor() click to toggle source
# File lib/mongo/cursor.rb, line 709
def check_command_cursor
  if @command_cursor
    raise InvalidOperation, "Cannot call #{caller.first} on command cursors"
  end
end
check_modifiable() click to toggle source
# File lib/mongo/cursor.rb, line 703
def check_modifiable
  if @query_run || @closed
    raise InvalidOperation, "Cannot modify the query once it has been run or closed."
  end
end
checkin_socket(sock) click to toggle source
# File lib/mongo/cursor.rb, line 625
def checkin_socket(sock)
  @connection.checkin(sock)
end
checkout_socket_from_connection() click to toggle source
# File lib/mongo/cursor.rb, line 608
def checkout_socket_from_connection
  begin
    if @pool
      socket = @pool.checkout
    elsif @command && !Mongo::ReadPreference::secondary_ok?(@selector)
      socket = @connection.checkout_reader({:mode => :primary})
    else
      socket = @connection.checkout_reader(read_preference)
    end
  rescue SystemStackError, NoMemoryError, SystemCallError => ex
    @connection.close
    raise ex
  end
  @pool = socket.pool
  socket
end
close_cursor_if_query_complete() click to toggle source
# File lib/mongo/cursor.rb, line 690
def close_cursor_if_query_complete
  if @limit > 0 && @returned >= @limit
    close
  end
end
compile_regex?() click to toggle source
# File lib/mongo/cursor.rb, line 715
def compile_regex?
  @compile_regex
end
construct_query_message() click to toggle source
# File lib/mongo/cursor.rb, line 629
def construct_query_message
  message = BSON::ByteBuffer.new("", @connection.max_bson_size + MongoClient::COMMAND_HEADROOM)
  message.put_int(@options)
  BSON::BSON_RUBY.serialize_cstr(message, "#{@db.name}.#{@collection.name}")
  message.put_int(@skip)
  @batch_size > 1 ? message.put_int(@batch_size) : message.put_int(@limit)
  if query_contains_special_fields? && @bson # costs two serialize calls
    query_message = BSON::BSON_CODER.serialize(@selector, false, false, @connection.max_bson_size + MongoClient::APPEND_HEADROOM)
    query_message.grow(@bson)
    query_spec = construct_query_spec
    query_spec.delete('$query')
    query_message.grow(BSON::BSON_CODER.serialize(query_spec, false, false, @connection.max_bson_size))
  else # costs only one serialize call
    spec = query_contains_special_fields? ? construct_query_spec : @selector
    spec.merge!(@opts)
    query_message = BSON::BSON_CODER.serialize(spec, false, false, @connection.max_bson_size + MongoClient::APPEND_HEADROOM)
    query_message.grow(@bson) if @bson
  end
  message.put_binary(query_message.to_s)
  message.put_binary(BSON::BSON_CODER.serialize(@fields, false, false, @connection.max_bson_size).to_s) if @fields
  message
end
construct_query_spec() click to toggle source
# File lib/mongo/cursor.rb, line 661
def construct_query_spec
  return @selector if @selector.has_key?('$query')
  spec = BSON::OrderedHash.new
  spec['$query']    = @selector
  spec['$orderby']  = Mongo::Support.format_order_clause(@order) if @order
  spec['$hint']     = @hint if @hint && @hint.length > 0
  spec['$explain']  = true if @explain
  spec['$snapshot'] = true if @snapshot
  spec['$maxScan']  = @max_scan if @max_scan
  spec['$returnKey']   = true if @return_key
  spec['$showDiskLoc'] = true if @show_disk_loc
  spec['$comment']  = @comment if @comment
  spec['$maxTimeMS'] = @max_time_ms if @max_time_ms
  if needs_read_pref?
    read_pref = Mongo::ReadPreference::mongos(@read, @tag_sets)
    spec['$readPreference'] = read_pref if read_pref
  end
  spec
end
convert_fields_for_query(fields) click to toggle source

Convert the :fields parameter from a single field name or an array of fields names to a hash, with the field names for keys and '1' for each value.

# File lib/mongo/cursor.rb, line 491
def convert_fields_for_query(fields)
  case fields
    when String, Symbol
      {fields => 1}
    when Array
      return nil if fields.length.zero?
      fields.inject({}) do |hash, field|
        field.is_a?(Hash) ? hash.merge!(field) : hash[field] = 1
        hash
      end
    when Hash
      return fields
  end
end
exhaust?(opts = options) click to toggle source

Check whether the exhaust option is set

@return [true, false] The state of the exhaust flag.

# File lib/mongo/cursor.rb, line 699
def exhaust?(opts = options)
  !(opts & OP_QUERY_EXHAUST).zero?
end
instrument_payload() click to toggle source
# File lib/mongo/cursor.rb, line 652
def instrument_payload
  log = { :database => @db.name, :collection => @collection.name, :selector => selector }
  log[:fields] = @fields if @fields
  log[:skip]   = @skip   if @skip && (@skip != 0)
  log[:limit]  = @limit  if @limit && (@limit != 0)
  log[:order]  = @order  if @order
  log
end
needs_read_pref?() click to toggle source
# File lib/mongo/cursor.rb, line 681
def needs_read_pref?
  @connection.mongos? && @read != :primary
end
num_remaining() click to toggle source

Return the number of documents remaining for this cursor.

# File lib/mongo/cursor.rb, line 507
def num_remaining
  if @cache.length == 0
    if @query_run && exhaust?
      close
      return 0
    else
      refresh
    end
  end

  @cache.length
end
query_contains_special_fields?() click to toggle source
# File lib/mongo/cursor.rb, line 685
def query_contains_special_fields?
  @order || @explain || @hint || @snapshot || @show_disk_loc ||
    @max_scan || @return_key || @comment || @max_time_ms || needs_read_pref?
end
refresh() click to toggle source

Refresh the documents in @cache. This means either sending the initial query or sending a GET_MORE operation.

# File lib/mongo/cursor.rb, line 522
def refresh
  if !@query_run
    send_initial_query
  elsif !@cursor_id.zero?
    send_get_more
  end
end
send_get_more() click to toggle source
# File lib/mongo/cursor.rb, line 572
def send_get_more
  message = BSON::ByteBuffer.new([0, 0, 0, 0])

  # DB name.
  BSON::BSON_RUBY.serialize_cstr(message, "#{@db.name}.#{@collection.name}")

  # Number of results to return.
  if @limit > 0
    limit = @limit - @returned
    if @batch_size > 0
      limit = limit < @batch_size ? limit : @batch_size
    end
    message.put_int(limit)
  else
    message.put_int(@batch_size)
  end

  # Cursor id.
  message.put_long(@cursor_id)
  log(:debug, "cursor.refresh() for cursor #{@cursor_id}") if @logger

  socket = @pool.checkout

  begin
    results, @n_received, @cursor_id = @connection.receive_message(
      Mongo::Constants::OP_GET_MORE, message, nil, socket, @command,
      nil, exhaust?, compile_regex?)
  ensure
    socket.checkin
  end

  @returned += @n_received
  @cache += results
  close_cursor_if_query_complete
end
send_initial_query() click to toggle source

Sends initial query – which is always a read unless it is a command

Upon ConnectionFailure, tries query 3 times if socket was not provided and the query is either not a command or is a secondary_ok command.

Pins pools upon successful read and unpins pool upon ConnectionFailure

# File lib/mongo/cursor.rb, line 537
def send_initial_query
  tries = 0
  instrument(:find, instrument_payload) do
    begin
      message = construct_query_message
      socket = @socket || checkout_socket_from_connection
      results, @n_received, @cursor_id = @connection.receive_message(
        Mongo::Constants::OP_QUERY, message, nil, socket, @command,
        nil, exhaust?, compile_regex?)
    rescue ConnectionFailure => ex
      socket.close if socket
      @pool = nil
      @connection.unpin_pool
      @connection.refresh
      if tries < 3 && !@socket && (!@command || Mongo::ReadPreference::secondary_ok?(@selector))
        tries += 1
        retry
      else
        raise ex
      end
    rescue OperationFailure, OperationTimeout => ex
      raise ex
    ensure
      socket.checkin unless @socket || socket.nil?
    end
    if !@socket && !@command
      @connection.pin_pool(socket.pool, read_preference)
    end
    @returned += @n_received
    @cache += results
    @query_run = true
    close_cursor_if_query_complete
  end
end