Skip to content

Commit

Permalink
Merge pull request #618 from milan/guzzle-transport
Browse files Browse the repository at this point in the history
#465 use guzzle for http transport
  • Loading branch information
ruflin committed May 25, 2014
2 parents 58b0928 + 0eccf63 commit 071f262
Show file tree
Hide file tree
Showing 5 changed files with 397 additions and 0 deletions.
3 changes: 3 additions & 0 deletions changes.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
CHANGES

2014-05-25
- Added Guzzle transport as an alternative to the default Http transport

2014-05-13
- Add JSON compat library; Elasticsearch JSON flags and nicer error handling

Expand Down
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
},
"suggest": {
"munkie/elasticsearch-thrift-php": "Allow using thrift transport",
"guzzlehttp/guzzle": "Allow using guzzle 4.x as the http transport (requires php 5.4)",
"psr/log": "for logging",
"monolog/monolog": "Logging request"
},
Expand Down
51 changes: 51 additions & 0 deletions lib/Elastica/Exception/Connection/GuzzleException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
<?php

namespace Elastica\Exception\Connection;

use Elastica\Exception\ConnectionException;
use Elastica\Request;
use Elastica\Response;
use GuzzleHttp\Exception\TransferException;

/**
* Transport exception
*
* @package Elastica
* @author Milan Magudia <[email protected]>
*/
class GuzzleException extends ConnectionException
{
/**
* @var TransferException
*/
protected $_guzzleException;

/**
* @param \GuzzleHttp\Exception\TransferException $guzzleException
* @param \Elastica\Request $request
* @param \Elastica\Response $response
*/
public function __construct(TransferException $guzzleException, Request $request = null, Response $response = null)
{
$this->_guzzleException = $guzzleException;
$message = $this->getErrorMessage($this->getGuzzleException());
parent::__construct($message, $request, $response);
}

/**
* @param \GuzzleHttp\Exception\TransferException $guzzleException
* @return string
*/
public function getErrorMessage(TransferException $guzzleException)
{
return $guzzleException->getMessage();
}

/**
* @return TransferException
*/
public function getGuzzleException()
{
return $this->_guzzleException;
}
}
179 changes: 179 additions & 0 deletions lib/Elastica/Transport/Guzzle.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
<?php

namespace Elastica\Transport;

use Elastica\Exception\Connection\HttpException;
use Elastica\Exception\Connection\GuzzleException;
use Elastica\Exception\PartialShardFailureException;
use Elastica\Exception\ResponseException;
use Elastica\Exception\InvalidException;
use Elastica\Connection;
use Elastica\Request;
use Elastica\Response;
use Elastica\JSON;
use GuzzleHttp\Client;
use GuzzleHttp\Exception\TransferException;
use GuzzleHttp\Exception\ClientException;
use GuzzleHttp\Stream\Stream;

/**
* Elastica Guzzle Transport object
*
* @package Elastica
* @author Milan Magudia <[email protected]>
*/
class Guzzle extends AbstractTransport
{
/**
* Http scheme
*
* @var string Http scheme
*/
protected $_scheme = 'http';

/**
* Curl resource to reuse
*
* @var resource Guzzle resource to reuse
*/
protected static $_guzzleClientConnection = null;

/**
* Makes calls to the elasticsearch server
*
* All calls that are made to the server are done through this function
*
* @param \Elastica\Request $request
* @param array $params Host, Port, ...
* @throws \Elastica\Exception\ConnectionException
* @throws \Elastica\Exception\ResponseException
* @throws \Elastica\Exception\Connection\HttpException
* @return \Elastica\Response Response object
*/
public function exec(Request $request, array $params)
{
$connection = $this->getConnection();

try {
$client = $this->_getGuzzleClient($this->_getBaseUrl($connection), $connection->isPersistent());

$options = array();
if ($connection->getTimeout()) {
$options['timeout'] = $connection->getTimeout();
}

if ($connection->getProxy()) {
$options['proxy'] = $connection->getProxy();
}

$req = $client->createRequest($request->getMethod(), $this->_getActionPath($request), $options);
$req->setHeaders($connection->hasConfig('headers') ?: array());

$data = $request->getData();
if (isset($data) && !empty($data)) {

if ($req->getMethod() == Request::GET) {
$req->setMethod(Request::POST);
}

if ($this->hasParam('postWithRequestBody') && $this->getParam('postWithRequestBody') == true) {
$request->setMethod(Request::POST);
$req->setMethod(Request::POST);
}

if (is_array($data)) {
$content = JSON::stringify($data, 'JSON_ELASTICSEARCH');
} else {
$content = $data;
}
$req->setBody(Stream::factory($content));
}

$start = microtime(true);
$res = $client->send($req);
$end = microtime(true);

$response = new Response((string)$res->getBody(), $res->getStatusCode());

if (defined('DEBUG') && DEBUG) {
$response->setQueryTime($end - $start);
}

$response->setTransferInfo(
array(
'request_header' => $request->getMethod(),
'http_code' => $res->getStatusCode()
)
);

if ($response->hasError()) {
throw new ResponseException($request, $response);
}

if ($response->hasFailedShards()) {
throw new PartialShardFailureException($request, $response);
}

return $response;

} catch (ClientException $e) {
// ignore 4xx errors
} catch (TransferException $e) {
throw new GuzzleException($e, $request, new Response($e->getMessage()));
}

}

/**
* Return Guzzle resource
*
* @param bool $persistent False if not persistent connection
* @return resource Connection resource
*/
protected function _getGuzzleClient($baseUrl, $persistent = true)
{
if (!$persistent || !self::$_guzzleClientConnection) {
self::$_guzzleClientConnection = new Client(array('base_url' => $baseUrl));
}

return self::$_guzzleClientConnection;
}

/**
* Builds the base url for the guzzle connection
*
* @param \Elastica\Connection $connection
*/
protected function _getBaseUrl(Connection $connection)
{
// If url is set, url is taken. Otherwise port, host and path
$url = $connection->hasConfig('url') ? $connection->getConfig('url') : '';

if (!empty($url)) {
$baseUri = $url;
} else {
$baseUri = $this->_scheme . '://' . $connection->getHost() . ':' . $connection->getPort() . '/' . $connection->getPath();
}
return rtrim($baseUri, '/');
}

/**
* Builds the action path url for each request
*
* @param \Elastica\Request $request
*/
protected function _getActionPath(Request $request)
{
$action = $request->getPath();
if ($action) {
$action = '/'. ltrim($action, '/');
}
$query = $request->getQuery();

if (!empty($query)) {
$action .= '?' . http_build_query($query);
}

return $action;
}
}
Loading

0 comments on commit 071f262

Please sign in to comment.