Skip to content

Commit

Permalink
websocket reconnection time default 60s
Browse files Browse the repository at this point in the history
  • Loading branch information
zhouaini528 committed Apr 23, 2021
1 parent 12f0c2b commit 0c87fae
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 13 deletions.
10 changes: 10 additions & 0 deletions src/Api/WebSocket/SocketClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -186,4 +186,14 @@ function test_reconnection(){
'public'=>['public'=>'close','kline'=>'close'],
];
}

function test_reconnection2(){
$this->client->debug2=1;
}

function test_reconnection3($key){
$this->client->debug=[
'private'=>[$key=>'close'],
];
}
}
22 changes: 18 additions & 4 deletions src/Api/WebSocket/SocketFunction.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

trait SocketFunction
{
//标记分隔符
static $USER_DELIMITER='===';

/**
* @param array $sub
* @return array
Expand Down Expand Up @@ -72,7 +75,7 @@ protected function log($message){
* @param $keysecret
*/
protected function userKey(array $keysecret,string $sub){
return $keysecret['key'].'='.$sub;
return $keysecret['key'].self::$USER_DELIMITER.$sub;
}

/**
Expand Down Expand Up @@ -113,7 +116,7 @@ private function sign(array $keysecret){
/**
* 重新订阅
*/
private function reconnection($global,$type='public'){
private function reconnection($global,$type='public',array $keysecret=[]){
$all_sub=$global->get('all_sub');
if(empty($all_sub)) return;

Expand All @@ -122,10 +125,21 @@ private function reconnection($global,$type='public'){
foreach ($all_sub as $v){
if(!is_array($v)) $temp[]=$v;
}

$global->save('add_sub',$this->resub($temp));
}else{
$this->keysecret=$keysecret;

$sub=$all_sub[$keysecret['key']];
if(empty($sub)) return;

foreach ($sub as $v){
$t=explode(self::$USER_DELIMITER,$v);
if($t[1]=='privatenotifications') $t[1]='privateNotifications';//该私有频道单独处理,订阅对大小写敏感
$temp[]=$t[1];
}
}

if(empty($temp)) return;

$global->save('add_sub',$this->resub($temp));
}
}
61 changes: 55 additions & 6 deletions src/Api/WebSocket/SocketServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ private function newConnection(){
//自定义属性
$this->connection[$this->connectionIndex]->tag=$tag;//标记公共连接还是私有连接
$this->connection[$this->connectionIndex]->tag_baseurl=$baseurl;
$this->connection[$this->connectionIndex]->tag_reconnection_num=0;//标记当前已重连次数
if(!empty($keysecret)) $this->connection[$this->connectionIndex]->tag_keysecret=$keysecret;//标记私有连接

$this->connection[$this->connectionIndex]->onConnect=$this->onConnect($keysecret);
Expand Down Expand Up @@ -112,6 +113,17 @@ private function onMessage($global){
if(isset($data['table']) && isset($data['data'][0]['symbol'])) {
$table=strtolower($data['table'].':'.$data['data'][0]['symbol']);
$global->save($table,$data);

/*$debug=$global->get('debug2');
if($debug==1){
$con->tag_data_time='1619151157';
return;
}*/

//最后数据更新时间
$con->tag_data_time=time();
//成功接收数据重连次数回归0
$con->tag_reconnection_num=0;
return;
}
}else{
Expand Down Expand Up @@ -142,14 +154,22 @@ private function onClose($global){
if(in_array($con->tag,$this->public_url)) {
//TODO如果连接失败 应该public private 都行重新加载
$this->log($con->tag.' reconnection');
$con->reConnect(10);

$this->reconnection($global,'public');
}else{
$this->log('connection close '.$con->tag_keysecret['key']);
$this->log('private connection close,ready to reconnect '.$con->tag_keysecret['key']);

Timer::del($con->timer_other);
//更改登录状态
$this->keysecretInit($con->tag_keysecret,[
'connection'=>2,
'auth'=>0,
]);

//重新订阅私有频道
$this->reconnection($global,'private',$con->tag_keysecret);
//Timer::del($con->timer_other);
}

$con->reConnect(10);
};
}

Expand All @@ -176,6 +196,26 @@ private function other($con,$global){
$this->debug($con,$global);

$this->log('listen '.$con->tag);

//公共数据如果60秒内无数据更新,则断开连接重新订阅,重试次数不超过10次
if(in_array($con->tag,$this->public_url)) {
/*if(isset($con->tag_data_time)){
//debug
echo time() - $con->tag_data_time;
echo PHP_EOL;
}*/

//public
if (isset($con->tag_data_time) && time() - $con->tag_data_time > 60 * ($con->tag_reconnection_num + 1) && $con->tag_reconnection_num <= 10) {
$con->close();

$con->tag_reconnection_num++;

$this->log('listen ' . $con->tag . ' reconnection_num:' . $con->tag_reconnection_num . ' tag_data_time:' . $con->tag_data_time);
}
}else{
//private
}
});
}

Expand All @@ -185,10 +225,11 @@ private function other($con,$global){
* @param $global
*/
private function debug($con,$global){
$debug=$global->get('debug');

if(in_array($con->tag,$this->public_url)) {
//public
$debug=$global->get('debug');
//print_r($debug);

if(isset($debug['public']) && $debug['public'][$con->tag]=='close'){
$this->log($con->tag.' debug '.json_encode($debug));

Expand All @@ -199,6 +240,14 @@ private function debug($con,$global){
}
}else{
//private
if(isset($debug['private'][$con->tag_keysecret['key']]) && $debug['private'][$con->tag_keysecret['key']]=='close'){
$this->log($con->tag_keysecret['key'].' debug '.json_encode($debug));

$debug['private'][$con->tag_keysecret['key']]='recon';
$global->save('debug',$debug);

$con->close();
}
}
}

Expand Down
12 changes: 11 additions & 1 deletion tests/websocket/client.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
'log'=>['filename'=>'future'],

//Daemons address and port,default 0.0.0.0:2216
//'global'=>'127.0.0.1:2216',
'global'=>'127.0.0.1:22160',

//Channel data update time,default 0.5 seconds
//'data_time'=>0.5,
Expand Down Expand Up @@ -276,6 +276,16 @@

break;
}

case 10006:{
$bitmex->client()->test_reconnection2();
break;
}

case 10008:{
$bitmex->client()->test_reconnection3($key_secret[0]['key']);
break;
}
}


4 changes: 2 additions & 2 deletions tests/websocket/server.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@

$bitmex->config([
//Do you want to enable local logging,default false
'log'=>true,
//'log'=>true,
//Or set the log name
'log'=>['filename'=>'bitmex'],

//Daemons address and port,default 0.0.0.0:2216
//'global'=>'127.0.0.1:2216',
'global'=>'127.0.0.1:22160',

//Channel data update time,default 0.5 seconds
//'data_time'=>0.5,
Expand Down

0 comments on commit 0c87fae

Please sign in to comment.