Skip to content

Commit

Permalink
Fix some queries
Browse files Browse the repository at this point in the history
  • Loading branch information
Darkren committed Sep 23, 2019
1 parent 07d5b80 commit b2dcbbb
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 37 deletions.
20 changes: 10 additions & 10 deletions pkg/app2/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,24 @@ var (
errNoMoreAvailableValues = errors.New("no more available values")
)

// manager manages allows to store and retrieve arbitrary values
// idManager manages allows to store and retrieve arbitrary values
// associated with the `uint16` key in a thread-safe manner.
// Provides method to generate key.
type manager struct {
type idManager struct {
values map[uint16]interface{}
mx sync.RWMutex
lstKey uint16
}

// newManager constructs new `manager`.
func newManager() *manager {
return &manager{
// newIDManager constructs new `idManager`.
func newIDManager() *idManager {
return &idManager{
values: make(map[uint16]interface{}),
}
}

// `nextKey` reserves next free slot for the value and returns the key for it.
func (m *manager) nextKey() (*uint16, error) {
func (m *idManager) nextKey() (*uint16, error) {
m.mx.Lock()

nxtKey := m.lstKey + 1
Expand All @@ -50,9 +50,9 @@ func (m *manager) nextKey() (*uint16, error) {
return &nxtKey, nil
}

// getAndRemove removes value specified by `key` from the manager instance and
// pop removes value specified by `key` from the idManager instance and
// returns it.
func (m *manager) pop(key uint16) (interface{}, error) {
func (m *idManager) pop(key uint16) (interface{}, error) {
m.mx.Lock()
v, ok := m.values[key]
if !ok {
Expand All @@ -72,7 +72,7 @@ func (m *manager) pop(key uint16) (interface{}, error) {
}

// set sets value `v` associated with `key`.
func (m *manager) set(key uint16, v interface{}) error {
func (m *idManager) set(key uint16, v interface{}) error {
m.mx.Lock()

l, ok := m.values[key]
Expand All @@ -93,7 +93,7 @@ func (m *manager) set(key uint16, v interface{}) error {
}

// get gets the value associated with the `key`.
func (m *manager) get(key uint16) (interface{}, bool) {
func (m *idManager) get(key uint16) (interface{}, bool) {
m.mx.RLock()
lis, ok := m.values[key]
m.mx.RUnlock()
Expand Down
34 changes: 17 additions & 17 deletions pkg/app2/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"github.com/stretchr/testify/require"
)

func TestManager_NextID(t *testing.T) {
func TestIDManager_NextID(t *testing.T) {
t.Run("simple call", func(t *testing.T) {
m := newManager()
m := newIDManager()

nextKey, err := m.nextKey()
require.NoError(t, err)
Expand All @@ -29,7 +29,7 @@ func TestManager_NextID(t *testing.T) {
})

t.Run("call on full manager", func(t *testing.T) {
m := newManager()
m := newIDManager()
for i := uint16(0); i < math.MaxUint16; i++ {
m.values[i] = nil
}
Expand All @@ -40,7 +40,7 @@ func TestManager_NextID(t *testing.T) {
})

t.Run("concurrent run", func(t *testing.T) {
m := newManager()
m := newIDManager()

valsToReserve := 10000

Expand All @@ -66,9 +66,9 @@ func TestManager_NextID(t *testing.T) {
})
}

func TestManager_Pop(t *testing.T) {
func TestIDManager_Pop(t *testing.T) {
t.Run("simple call", func(t *testing.T) {
m := newManager()
m := newIDManager()

v := "value"

Expand All @@ -84,14 +84,14 @@ func TestManager_Pop(t *testing.T) {
})

t.Run("no value", func(t *testing.T) {
m := newManager()
m := newIDManager()

_, err := m.pop(1)
require.Error(t, err)
})

t.Run("value not set", func(t *testing.T) {
m := newManager()
m := newIDManager()

m.values[1] = nil

Expand All @@ -100,7 +100,7 @@ func TestManager_Pop(t *testing.T) {
})

t.Run("concurrent run", func(t *testing.T) {
m := newManager()
m := newIDManager()

m.values[1] = "value"

Expand Down Expand Up @@ -128,9 +128,9 @@ func TestManager_Pop(t *testing.T) {
})
}

func TestManager_Set(t *testing.T) {
func TestIDManager_Set(t *testing.T) {
t.Run("simple call", func(t *testing.T) {
m := newManager()
m := newIDManager()

nextKey, err := m.nextKey()
require.NoError(t, err)
Expand All @@ -145,7 +145,7 @@ func TestManager_Set(t *testing.T) {
})

t.Run("key is not reserved", func(t *testing.T) {
m := newManager()
m := newIDManager()

err := m.set(1, "value")
require.Error(t, err)
Expand All @@ -155,7 +155,7 @@ func TestManager_Set(t *testing.T) {
})

t.Run("value already exists", func(t *testing.T) {
m := newManager()
m := newIDManager()

v := "value"

Expand All @@ -169,7 +169,7 @@ func TestManager_Set(t *testing.T) {
})

t.Run("concurrent run", func(t *testing.T) {
m := newManager()
m := newIDManager()

concurrency := 1000

Expand Down Expand Up @@ -208,9 +208,9 @@ func TestManager_Set(t *testing.T) {
})
}

func TestManager_Get(t *testing.T) {
prepManagerWithVal := func(v interface{}) (*manager, uint16) {
m := newManager()
func TestIDManager_Get(t *testing.T) {
prepManagerWithVal := func(v interface{}) (*idManager, uint16) {
m := newIDManager()

nextKey, err := m.nextKey()
require.NoError(t, err)
Expand Down
13 changes: 10 additions & 3 deletions pkg/app2/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
type RPCClient interface {
Dial(remote network.Addr) (uint16, error)
Listen(local network.Addr) (uint16, error)
Accept(lisID uint16) (uint16, network.Addr, error)
Accept(lisID uint16) (connID uint16, remote network.Addr, err error)
Write(connID uint16, b []byte) (int, error)
Read(connID uint16, b []byte) (int, []byte, error)
CloseConn(id uint16) error
Expand Down Expand Up @@ -52,7 +52,7 @@ func (c *rpcCLient) Listen(local network.Addr) (uint16, error) {
}

// Accept sends `Accept` command to the server.
func (c *rpcCLient) Accept(lisID uint16) (uint16, network.Addr, error) {
func (c *rpcCLient) Accept(lisID uint16) (connID uint16, remote network.Addr, err error) {
var acceptResp AcceptResp
if err := c.rpc.Call("Accept", &lisID, &acceptResp); err != nil {
return 0, network.Addr{}, err
Expand All @@ -78,11 +78,18 @@ func (c *rpcCLient) Write(connID uint16, b []byte) (int, error) {

// Read sends `Read` command to the server.
func (c *rpcCLient) Read(connID uint16, b []byte) (int, []byte, error) {
req := ReadReq{
ConnID: connID,
BufLen: len(b),
}

var resp ReadResp
if err := c.rpc.Call("Read", &connID, &resp); err != nil {
if err := c.rpc.Call("Read", &req, &resp); err != nil {
return 0, nil, err
}

copy(b[:resp.N], resp.B[:resp.N])

return resp.N, resp.B, nil
}

Expand Down
24 changes: 17 additions & 7 deletions pkg/app2/rpc_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@ import (

// RPCGateway is a RPC interface for the app server.
type RPCGateway struct {
lm *manager
cm *manager
lm *idManager
cm *idManager
log *logging.Logger
}

// newRPCGateway constructs new server RPC interface.
func newRPCGateway(log *logging.Logger) *RPCGateway {
return &RPCGateway{
lm: newManager(),
cm: newManager(),
lm: newIDManager(), // contains listeners associated with their IDs
cm: newIDManager(), // contains connections associated with their IDs
log: log,
}
}
Expand Down Expand Up @@ -141,24 +141,34 @@ func (r *RPCGateway) Write(req *WriteReq, n *int) error {
return nil
}

// ReadReq contains arguments for `Read`.
type ReadReq struct {
ConnID uint16
BufLen int
}

// ReadResp contains response parameters for `Read`.
type ReadResp struct {
B []byte
N int
}

// Read reads data from connection specified by `connID`.
func (r *RPCGateway) Read(connID *uint16, resp *ReadResp) error {
conn, err := r.getConn(*connID)
func (r *RPCGateway) Read(req *ReadReq, resp *ReadResp) error {
conn, err := r.getConn(req.ConnID)
if err != nil {
return err
}

resp.N, err = conn.Read(resp.B)
buf := make([]byte, req.BufLen)
resp.N, err = conn.Read(buf)
if err != nil {
return err
}

resp.B = make([]byte, resp.N)
copy(resp.B, buf[:resp.N])

return nil
}

Expand Down

0 comments on commit b2dcbbb

Please sign in to comment.