Skip to content

Commit

Permalink
added Scroll Iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
webdevsHub committed May 18, 2015
1 parent b226c37 commit dbfe675
Show file tree
Hide file tree
Showing 6 changed files with 355 additions and 189 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ All notable changes to this project will be documented in this file based on the
- Multiple rescore query [#820](https://github.com/ruflin/Elastica/issues/820/)
- Support for a custom connection timeout through a connectTimeout parameter. [#841](https://github.com/ruflin/Elastica/issues/841/)
- SignificantTerms Aggregation [#847](https://github.com/ruflin/Elastica/issues/847/)
- Scroll Iterator [#842](https://github.com/ruflin/Elastica/issues/842/)


### Improvements
Expand Down
133 changes: 29 additions & 104 deletions lib/Elastica/ScanAndScroll.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,154 +3,79 @@
namespace Elastica;

/**
* scan and scroll object
* Scan and Scroll Iterator
*
* @category Xodoa
* @package Elastica
* @author Manuel Andreo Garcia <[email protected]>
*
* @link http://www.elastic.co/guide/en/elasticsearch/guide/current/scan-scroll.html
*/
class ScanAndScroll implements \Iterator
class ScanAndScroll extends Scroll
{
/**
* time value parameter
*
* @var string
*/
public $expiryTime;

/**
* @var int
*/
public $sizePerShard;

/**
* @var Search
*/
protected $_search;

/**
* @var null|string
*/
protected $_nextScrollId = null;

/**
* @var null|string
*/
protected $_lastScrollId = null;

/**
* @var null|ResultSet
*/
protected $_currentResultSet = null;

/**
* Constructs scroll iterator object
* Constructor
*
* @param Search $search
* @param string $expiryTime
* @param int $sizePerShard
*/
public function __construct(Search $search, $expiryTime = '1m', $sizePerShard = 1000)
{
$this->_search = $search;
$this->expiryTime = $expiryTime;
$this->sizePerShard = $sizePerShard;
}

/**
* Return the current result set
*
* @link http://php.net/manual/en/iterator.current.php
* @return ResultSet
*/
public function current()
{
return $this->_currentResultSet;
parent::__construct($search, $expiryTime);
}

/**
* Perform next scroll search
* Initial scan search
*
* @link http://php.net/manual/en/iterator.next.php
* @return void
*/
public function next()
{
$this->_scroll();
}

/**
* Return the scroll id of current scroll request
*
* @link http://php.net/manual/en/iterator.key.php
* @return string
*/
public function key()
{
return $this->_lastScrollId;
}

/**
* Returns true if current result set contains one hit
*
* @link http://php.net/manual/en/iterator.valid.php
* @return boolean
*/
public function valid()
{
return
$this->_nextScrollId !== null
&& $this->_currentResultSet !== null
&& $this->_currentResultSet->count() > 0;
}

/**
* Start the initial scan search
* @link http://php.net/manual/en/iterator.rewind.php
* @throws \Elastica\Exception\InvalidException
*
* @return void
*/
public function rewind()
{
$this->_search->getQuery()->setSize($this->sizePerShard);
// reset state
$this->_nextScrollId = null;
$this->_options = array(null, null, null, null);

$this->_search->setOption(Search::OPTION_SEARCH_TYPE, Search::OPTION_SEARCH_TYPE_SCAN);
$this->_search->setOption(Search::OPTION_SCROLL, $this->expiryTime);
$this->_saveOptions();

// initial scan request
$this->_search->getQuery()->setSize($this->sizePerShard);
$this->_search->setOption(Search::OPTION_SCROLL, $this->expiryTime);
$this->_search->setOption(Search::OPTION_SCROLL_ID, null);
$this->_search->setOption(Search::OPTION_SEARCH_TYPE, Search::OPTION_SEARCH_TYPE_SCAN);
$this->_setScrollId($this->_search->search());

// trigger first scroll request
$this->_scroll();
$this->_revertOptions();

// first scroll request
$this->next();
}

/**
* Perform next scroll search
* @throws \Elastica\Exception\InvalidException
* Save all search options manipulated by Scroll
*/
protected function _scroll()
protected function _saveOptions()
{
$this->_search->setOption(Search::OPTION_SEARCH_TYPE, Search::OPTION_SEARCH_TYPE_SCROLL);
$this->_search->setOption(Search::OPTION_SCROLL_ID, $this->_nextScrollId);
$query = $this->_search->getQuery();
if ($query->hasParam('size')) {
$this->_options[3] = $query->getParam('size');
}

$resultSet = $this->_search->search();
$this->_currentResultSet = $resultSet;
$this->_setScrollId($resultSet);
parent::_saveOptions();
}

/**
* Save last scroll id and extract the new one if possible
* @param ResultSet $resultSet
* Revert search options to previously saved state
*/
protected function _setScrollId(ResultSet $resultSet)
protected function _revertOptions()
{
$this->_lastScrollId = $this->_nextScrollId;
$this->_search->getQuery()->setSize($this->_options[3]);

$this->_nextScrollId = null;
if ($resultSet->getResponse()->isOk()) {
$this->_nextScrollId = $resultSet->getResponse()->getScrollId();
}
parent::_revertOptions();
}
}
175 changes: 175 additions & 0 deletions lib/Elastica/Scroll.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
<?php

namespace Elastica;

/**
* Scroll Iterator
*
* @author Manuel Andreo Garcia <[email protected]>
*
* @link http://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-scroll.html
*/
class Scroll implements \Iterator
{
    /**
     * Value sent as the scroll option with every request issued by this iterator
     *
     * @var string
     */
    public $expiryTime;

    /**
     * Search object the scroll requests are executed on
     *
     * @var Search
     */
    protected $_search;

    /**
     * Scroll id taken from the most recent response, null if that response was not ok
     *
     * @var null|string
     */
    protected $_nextScrollId = null;

    /**
     * Result set of the most recent request, null before the first request
     *
     * @var null|ResultSet
     */
    protected $_currentResultSet = null;

    /**
     * Search options as they were before the iterator changed them.
     *
     * 0: scroll<br>
     * 1: scroll id<br>
     * 2: search type.
     *
     * @var array
     */
    protected $_options = array(null, null, null);

    /**
     * Constructor
     *
     * @param Search $search     search to iterate over
     * @param string $expiryTime value used for the scroll option of every request
     */
    public function __construct(Search $search, $expiryTime = '1m')
    {
        $this->_search = $search;
        $this->expiryTime = $expiryTime;
    }

    /**
     * Returns current result set
     *
     * @link http://php.net/manual/en/iterator.current.php
     *
     * @return null|ResultSet null as long as no request has been made
     */
    public function current()
    {
        return $this->_currentResultSet;
    }

    /**
     * Next scroll search
     *
     * Temporarily switches the search to a scroll request that carries the
     * scroll id of the previous response, stores the outcome and then puts
     * the caller's own options back.
     *
     * @link http://php.net/manual/en/iterator.next.php
     */
    public function next()
    {
        $this->_saveOptions();

        $this->_search->setOption(Search::OPTION_SCROLL, $this->expiryTime);
        $this->_search->setOption(Search::OPTION_SCROLL_ID, $this->_nextScrollId);
        $this->_search->setOption(Search::OPTION_SEARCH_TYPE, Search::OPTION_SEARCH_TYPE_SCROLL);
        $this->_setScrollId($this->_search->search());

        $this->_revertOptions();
    }

    /**
     * Returns scroll id
     *
     * Note that this is the id extracted from the latest response, i.e. the
     * one the following call to next() will send.
     *
     * @link http://php.net/manual/en/iterator.key.php
     *
     * @return null|string null if the latest response was not ok or no request was made yet
     */
    public function key()
    {
        return $this->_nextScrollId;
    }

    /**
     * Returns true if a scroll id is known and the current result set
     * contains at least one hit
     *
     * @link http://php.net/manual/en/iterator.valid.php
     *
     * @return bool
     */
    public function valid()
    {
        return
            $this->_nextScrollId !== null
            && $this->_currentResultSet !== null
            && $this->_currentResultSet->count() > 0;
    }

    /**
     * Initial scroll search
     *
     * Drops the state of a previous iteration and runs the search with the
     * scroll option set, but without scroll id and search type.
     *
     * @link http://php.net/manual/en/iterator.rewind.php
     */
    public function rewind()
    {
        // reset state
        $this->_nextScrollId = null;
        $this->_options = array(null, null, null);

        // initial search
        $this->_saveOptions();

        $this->_search->setOption(Search::OPTION_SCROLL, $this->expiryTime);
        $this->_search->setOption(Search::OPTION_SCROLL_ID, null);
        $this->_search->setOption(Search::OPTION_SEARCH_TYPE, null);
        $this->_setScrollId($this->_search->search());

        $this->_revertOptions();
    }

    /**
     * Prepares Scroll for next request
     *
     * Stores the result set as the current one and remembers its scroll id.
     * The scroll id is cleared when the response is not ok.
     *
     * @param ResultSet $resultSet
     */
    protected function _setScrollId(ResultSet $resultSet)
    {
        $this->_currentResultSet = $resultSet;

        $this->_nextScrollId = null;
        if ($resultSet->getResponse()->isOk()) {
            $this->_nextScrollId = $resultSet->getResponse()->getScrollId();
        }
    }

    /**
     * Save all search options manipulated by Scroll
     *
     * Every slot is rewritten on each call: an option that is not set at the
     * moment is stored as null. Otherwise a value captured by an earlier
     * request would survive and be written back by _revertOptions() although
     * the option has been removed from the search in the meantime.
     *
     * Only the slots 0, 1 and 2 are touched, so additional slots kept by
     * subclasses are left alone.
     */
    protected function _saveOptions()
    {
        $slots = array(
            0 => Search::OPTION_SCROLL,
            1 => Search::OPTION_SCROLL_ID,
            2 => Search::OPTION_SEARCH_TYPE,
        );

        foreach ($slots as $slot => $option) {
            $this->_options[$slot] = $this->_search->hasOption($option)
                ? $this->_search->getOption($option)
                : null;
        }
    }

    /**
     * Revert search options to previously saved state
     *
     * Options that were not set when _saveOptions() ran are written back as null.
     */
    protected function _revertOptions()
    {
        $this->_search->setOption(Search::OPTION_SCROLL, $this->_options[0]);
        $this->_search->setOption(Search::OPTION_SCROLL_ID, $this->_options[1]);
        $this->_search->setOption(Search::OPTION_SEARCH_TYPE, $this->_options[2]);
    }
}
12 changes: 12 additions & 0 deletions lib/Elastica/Search.php
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,18 @@ public function setSuggest(Suggest $suggest)
return $this->setOptionsAndQuery(array(self::OPTION_SEARCH_TYPE_SUGGEST => 'suggest'), $suggest);
}

/**
 * Creates a Scroll Iterator that runs on this search
 *
 * @see Elastica\Scroll
 *
 * @param string $expiryTime passed on unchanged as the iterator's expiry time
 *
 * @return Scroll
 */
public function scroll($expiryTime = '1m')
{
    $iterator = new Scroll($this, $expiryTime);

    return $iterator;
}

/**
* Returns the ScanAndScroll Iterator
*
Expand Down
Loading

0 comments on commit dbfe675

Please sign in to comment.