Skip to content

Commit

Permalink
websocket reconnection time default 60s
Browse files Browse the repository at this point in the history
  • Loading branch information
zhouaini528 committed Apr 23, 2021
1 parent 634485d commit 124e578
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 29 deletions.
10 changes: 10 additions & 0 deletions src/Api/WebSocketV3/SocketClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -187,4 +187,14 @@ function test_reconnection(){
'public'=>['public'=>'close'],
];
}

function test_reconnection2(){
$this->client->debug2=1;
}

function test_reconnection3($key){
$this->client->debug=[
'private'=>[$key=>'close'],
];
}
}
12 changes: 10 additions & 2 deletions src/Api/WebSocketV3/SocketFunction.php
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ protected function userKey(array $keysecret,string $sub){
/**
* 重新订阅
*/
private function reconnection($global,$type='public'){
private function reconnection($global,$type='public',array $keysecret=[]){
$all_sub=$global->get('all_sub');
if(empty($all_sub)) return;

Expand All @@ -113,9 +113,17 @@ private function reconnection($global,$type='public'){
foreach ($all_sub as $v){
if(!is_array($v)) $temp[]=$v;
}
$global->save('add_sub',$this->resub($temp));
}else{
$this->keysecret=$keysecret;

$temp=[];
foreach ($all_sub as $v){
if(is_array($v) && $keysecret['key']==$v[1]['key']){
$temp[]=$v[0];
}
}
}

$global->save('add_sub',$this->resub($temp));
}
}
57 changes: 50 additions & 7 deletions src/Api/WebSocketV3/SocketServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ private function newConnection(){

//自定义属性
$this->connection[$this->connectionIndex]->tag=$tag;//标记公共连接还是私有连接
$this->connection[$this->connectionIndex]->tag_reconnection_num=0;//标记当前已重连次数
if(!empty($keysecret)) $this->connection[$this->connectionIndex]->tag_keysecret=$keysecret;//标记私有连接

$this->connection[$this->connectionIndex]->onConnect=$this->onConnect($keysecret);
Expand Down Expand Up @@ -96,6 +97,12 @@ private function onMessage($global){
$data=json_decode($data,true);

if(isset($data['table'])) {
/*$debug=$global->get('debug2');
if($debug==1){
$con->tag_data_time='1619149751';
return;
}*/

$table=$data['table'].':'.$this->getInstrumentId($data);
$table=strtolower($table);

Expand All @@ -104,6 +111,11 @@ private function onMessage($global){
$global->saveQueue($table,$data);
}else{
$global->save($table,$data);

//最后数据更新时间
$con->tag_data_time=time();
//成功接收数据重连次数回归0
$con->tag_reconnection_num=0;
}

return;
Expand Down Expand Up @@ -144,14 +156,18 @@ private function onClose($global){
$this->log($con->tag.' reconnection');

$this->reconnection($global,'public');

//这里连接失败 会轮询 connect
$con->reConnect(10);
}else{
$this->log('connection close '.$con->tag_keysecret['key']);
$this->log('private connection close,ready to reconnect '.$con->tag_keysecret['key']);

//更改登录状态
$global->keysecretUpdate($con->tag_keysecret['key'],2);

Timer::del($con->timer_other);
//重新订阅私有频道
$this->reconnection($global,'private',$con->tag_keysecret);
//Timer::del($con->timer_other);
}

$con->reConnect(10);
};
}

Expand Down Expand Up @@ -186,6 +202,26 @@ private function other($con,$global){
$this->debug($con,$global);

$this->log('listen '.$con->tag);

//公共数据如果60秒内无数据更新,则断开连接重新订阅,重试次数不超过10次
if($con->tag=='public'){
/*if(isset($con->tag_data_time)){
//debug
echo time() - $con->tag_data_time;
echo PHP_EOL;
}*/

//public
if (isset($con->tag_data_time) && time() - $con->tag_data_time > 60 * ($con->tag_reconnection_num + 1) && $con->tag_reconnection_num <= 10) {
$con->close();

$con->tag_reconnection_num++;

$this->log('listen ' . $con->tag . ' reconnection_num:' . $con->tag_reconnection_num . ' tag_data_time:' . $con->tag_data_time);
}
}else{
//private
}
});
}

Expand All @@ -195,10 +231,9 @@ private function other($con,$global){
* @param $global
*/
private function debug($con,$global){
$debug=$global->get('debug');
if($con->tag=='public'){
//public
$debug=$global->get('debug');

if(isset($debug['public']) && $debug['public'][$con->tag]=='close'){
$this->log($con->tag.' debug '.json_encode($debug));

Expand All @@ -209,6 +244,14 @@ private function debug($con,$global){
}
}else{
//private
if(isset($debug['private'][$con->tag_keysecret['key']]) && $debug['private'][$con->tag_keysecret['key']]=='close'){
$this->log($con->tag_keysecret['key'].' debug '.json_encode($debug));

$debug['private'][$con->tag_keysecret['key']]='recon';
$global->save('debug',$debug);

$con->close();
}
}
}

Expand Down
30 changes: 11 additions & 19 deletions tests/websocket_v3/client.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
//'log'=>['filename'=>'okex'],

//Daemons address and port,default 0.0.0.0:2207
//'global'=>'127.0.0.1:2208',
'global'=>'127.0.0.1:22080',

//Heartbeat time,default 20 seconds
//'ping_time'=>20,
Expand All @@ -46,7 +46,7 @@
case 1:{
$okex->subscribe([
'spot/depth5:BCH-USDT',
'futures/depth5:BCH-USD-210326',
'futures/depth5:BCH-USD-210924',
'swap/depth5:BCH-USD-SWAP',
]);

Expand All @@ -71,10 +71,10 @@
$okex->keysecret($key_secret[0]);
$okex->subscribe([
'spot/depth5:BCH-USDT',
'futures/depth5:BCH-USD-210326',
'futures/depth5:BCH-USD-210924',
'swap/depth5:BCH-USD-SWAP',

'futures/position:BCH-USD-210326',
'futures/position:BCH-USD-210924',
'futures/account:BCH-USDT',
'swap/position:BCH-USD-SWAP',
]);
Expand Down Expand Up @@ -239,12 +239,7 @@
}

case 10004:{
//$a='futures/position:bch-usd-210326';
//print_r($okex->client()->$a);
/*$data=$okex->getSubscribe();
print_r(json_encode($data));*/
$okex->client()->test2();

break;
}

Expand All @@ -253,18 +248,15 @@
break;
}

//subscribe
case 10006:{
$okex->keysecret($key_secret[1]);
$okex->subscribe([
'spot/depth5:BCH-USDT',
'futures/depth5:BCH-USD-210326',
'swap/depth5:BCH-USD-SWAP',
$okex->client()->test_reconnection2();
break;
}

'futures/position:BCH-USD-210326',
'futures/account:BCH-USDT',
'swap/position:BCH-USD-SWAP',
]);
case 10007:{
//private
//print_r($key_secret[0]);
$okex->client()->test_reconnection3($key_secret[0]['key']);
break;
}
}
Expand Down
2 changes: 1 addition & 1 deletion tests/websocket_v3/server.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
//'log'=>['filename'=>'okex'],

//Daemons address and port,default 0.0.0.0:2207
//'global'=>'127.0.0.1:2208',
'global'=>'127.0.0.1:22080',

//Heartbeat time,default 20 seconds
//'ping_time'=>20,
Expand Down

0 comments on commit 124e578

Please sign in to comment.