Skip to content

Commit

Permalink
add public reconnection and debug
Browse files Browse the repository at this point in the history
  • Loading branch information
zhouaini528 committed Jan 14, 2021
1 parent 3500ca6 commit ce2d4f5
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 13 deletions.
8 changes: 8 additions & 0 deletions src/Api/WebSocket/SocketClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ protected function init(){
$this->add('del_sub',[]);//正在删除的频道

$this->add('keysecret',[]);//目前总共key

$this->add('debug',[]);
}

function keysecret(array $keysecret=[]){
Expand Down Expand Up @@ -179,4 +181,10 @@ function test2(){
echo json_encode($this->client->$v).PHP_EOL;
}
}

function test_reconnection(){
$this->client->debug=[
'public'=>['public'=>'close'],
];
}
}
18 changes: 18 additions & 0 deletions src/Api/WebSocket/SocketFunction.php
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,22 @@ protected function log($message){
protected function userKey(array $keysecret,string $sub){
return $keysecret['key'].self::$USER_DELIMITER.$sub;
}

/**
* 重新订阅
*/
private function reconnection($global,$type='public'){
$all_sub=$global->get('all_sub');
if(empty($all_sub)) return;

if($type=='public'){
$temp=[];
foreach ($all_sub as $v){
if(!is_array($v)) $temp[]=$v;
}
$global->save('add_sub',$this->resub($temp));
}else{

}
}
}
54 changes: 46 additions & 8 deletions src/Api/WebSocket/SocketServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,22 @@ private function addConnection(string $tag,array $keysecret=[]){

private function newConnection(){
return function($tag,$keysecret){
$baseurl='ws://real.okex.com:8443/ws/v3';

$global=$this->client();

$this->connection[$this->connectionIndex] = new AsyncTcpConnection('ws://real.okex.com:8443/ws/v3');
$this->connection[$this->connectionIndex] = new AsyncTcpConnection($baseurl);
$this->connection[$this->connectionIndex]->transport = 'ssl';

$this->log('Connection '.$baseurl);

//自定义属性
$this->connection[$this->connectionIndex]->tag=$tag;//标记公共连接还是私有连接
if(!empty($keysecret)) $this->connection[$this->connectionIndex]->tag_keysecret=$keysecret;//标记私有连接

$this->connection[$this->connectionIndex]->onConnect=$this->onConnect($keysecret);
$this->connection[$this->connectionIndex]->onMessage=$this->onMessage($global);
$this->connection[$this->connectionIndex]->onClose=$this->onClose();
$this->connection[$this->connectionIndex]->onClose=$this->onClose($global);
$this->connection[$this->connectionIndex]->onError=$this->onError();

$this->connect($this->connection[$this->connectionIndex]);
Expand Down Expand Up @@ -134,12 +138,20 @@ private function onMessage($global){
};
}

private function onClose(){
return function($con){
$this->log('reconnection');
private function onClose($global){
return function($con) use($global){
if($con->tag=='public'){
$this->log($con->tag.' reconnection');

$this->reconnection($global,'public');

//这里连接失败 会轮询 connect
$con->reConnect(10);
}else{
$this->log('connection close '.$con->tag_keysecret['key']);

//这里连接失败 会轮询 connect
$con->reConnect(5);
Timer::del($con->timer_other);
}
};
}

Expand All @@ -166,15 +178,40 @@ private function ping($con){
private function other($con,$global){
$time=isset($this->config['listen_time']) ? $this->config['listen_time'] : 2 ;

Timer::add($time, function() use($con,$global) {
$con->timer_other=Timer::add($time, function() use($con,$global) {
$this->subscribe($con,$global);

$this->unsubscribe($con,$global);

$this->debug($con,$global);

$this->log('listen '.$con->tag);
});
}

/**
* 调试用
* @param $con
* @param $global
*/
private function debug($con,$global){
if($con->tag=='public'){
//public
$debug=$global->get('debug');

if(isset($debug['public']) && $debug['public'][$con->tag]=='close'){
$this->log($con->tag.' debug '.json_encode($debug));

$debug['public'][$con->tag]='recon';
$global->save('debug',$debug);

$con->close();
}
}else{
//private
}
}

private function subscribe($con,$global){
if(empty($global->get('add_sub'))) return;

Expand Down Expand Up @@ -237,6 +274,7 @@ private function subscribePrivate($con,$global){
$temp=$global->get('add_sub');
//判断是否是私有连接,并判断该私有连接是否是 当前用户。
foreach ($temp as $v){
if(!isset($v[1])) continue;
$key=$v[1]['key'];
if(count($v)>1 && $key==$keysecret['key']) $sub[]=$v[0];
}
Expand Down
6 changes: 1 addition & 5 deletions tests/websocket/client.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
'spot/depth5:BCH-USDT',
'futures/depth5:BCH-USD-210326',
'swap/depth5:BCH-USD-SWAP',
'option/depth5:BTCUSD-20201021-11750-C',
]);

break;
Expand Down Expand Up @@ -250,10 +249,7 @@
}

case 10005:{
$okex->keysecret($key_secret[0]);
$data=$okex->getSubscribes();
print_r(json_encode($data));
die;
$okex->client()->test_reconnection();
break;
}

Expand Down

0 comments on commit ce2d4f5

Please sign in to comment.