Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Push Publishing #82

Merged
merged 15 commits into from
Nov 17, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion mc/nat.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func NATConfigFromString(str string) (cfg NATConfig, err error) {
}

func GetPublicIP() (string, error) {
res, err := http.Get("http://ifconfig.me/ip")
res, err := http.Get("http://ifconfig.co/ip")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would be nice to fallback through a couple/one that we run

Copy link
Contributor Author

@vyzo vyzo Nov 17, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, that's probably a good idea.
The code running ifconfig.co is available at github: https://github.com/martinp/ipd
So we can run it at ifconfig.medichain.io and have fallback to ifconfig.co.

I'll open an enhancement issue and add support once we have the ifconfing server up.

if err != nil {
return "", err
}
Expand Down
4 changes: 4 additions & 0 deletions mc/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ func (q *Query) IsSimpleSelect(sel string) bool {
return false
}

func (q *Query) WithSimpleSelect(sel string) *Query {
return &Query{q.Op, q.namespace, SimpleSelector(sel), q.criteria, q.order, q.limit}
}

type QuerySelector interface {
selectorType() string
}
Expand Down
127 changes: 126 additions & 1 deletion mcnode/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ func (node *Node) httpRemoteQuery(w http.ResponseWriter, r *http.Request) {
// POST /merge/{peerId}
// DATA: MCQL SELECT query
// Queries a remote peer and merges the resulting statements into the local
// db; returns the number of statements merged
// db; returns the number of statements and objects merged
func (node *Node) httpMerge(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
peerId := vars["peerId"]
Expand Down Expand Up @@ -410,6 +410,63 @@ func (node *Node) httpMerge(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, ocount)
}

// POST /push/{peerId}
// DATA: MCQL SELECT query
// Pushes statements matching the query to peerId for merge; must be
// authorized for push by the peer
// returns the number of statements and objects merged
func (node *Node) httpPush(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
peerId := vars["peerId"]

body, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Printf("http/push: Error reading request body: %s", err.Error())
return
}

q := string(body)

qq, err := mcq.ParseQuery(q)
if err != nil {
apiError(w, http.StatusBadRequest, err)
return
}

if !qq.IsSimpleSelect("*") {
apiError(w, http.StatusBadRequest, BadQuery)
return
}

pid, err := p2p_peer.IDB58Decode(peerId)
if err != nil {
apiError(w, http.StatusBadRequest, err)
return
}

ctx, cancel := context.WithCancel(r.Context())
defer cancel()

count, ocount, err := node.doPush(ctx, pid, qq)
if err != nil {
apiError(w, http.StatusInternalServerError, err)
if count > 0 {
fmt.Fprintf(w, "Partial push: %d statements merged\n", count)
}
if ocount > 0 {
fmt.Fprintf(w, "Partial push: %d objects merged\n", ocount)
}
if count < 0 {
fmt.Fprintf(w, "Incomplete push: some statements may have been merged\n")
}

return
}

fmt.Fprintln(w, count)
fmt.Fprintln(w, ocount)
}

// POST /delete
// DATA: MCQL DELTE query
// Deletes statements from the statement db
Expand Down Expand Up @@ -719,6 +776,74 @@ func (node *Node) httpConfigInfoSet(w http.ResponseWriter, r *http.Request) {
fmt.Fprintln(w, "OK")
}

// GET /auth
// retrieves all peer authorization rules in json
func (node *Node) httpAuth(w http.ResponseWriter, r *http.Request) {
rules := node.auth.toJSON()

err := json.NewEncoder(w).Encode(rules)
if err != nil {
log.Printf("Error writing response body: %s", err.Error())
}
}

// GET /auth/{peerId}
// POST /auth/{peerId}
// gets/sets auth rules for peerId
// rules are specified as a comma separated list of namespaces (or ns wildcards)
func (node *Node) httpAuthPeer(w http.ResponseWriter, r *http.Request) {
apiConfigMethod(w, r, node.httpAuthPeerGet, node.httpAuthPeerSet)
}

func (node *Node) httpAuthPeerGet(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
peerId := vars["peerId"]

pid, err := p2p_peer.IDB58Decode(peerId)
if err != nil {
apiError(w, http.StatusBadRequest, err)
return
}

rules := node.auth.getRules(pid)
if len(rules) > 0 {
fmt.Fprintln(w, strings.Join(rules, ","))
}
}

func (node *Node) httpAuthPeerSet(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
peerId := vars["peerId"]

pid, err := p2p_peer.IDB58Decode(peerId)
if err != nil {
apiError(w, http.StatusBadRequest, err)
return
}

body, err := ioutil.ReadAll(r.Body)
if err != nil {
log.Printf("http/auth: Error reading request body: %s", err.Error())
return
}

rbody := strings.TrimSpace(string(body))
if rbody == "" {
node.auth.clearRules(pid)
} else {
rules := strings.Split(rbody, ",")
node.auth.setRules(pid, rules)
}

err = node.saveConfig()
if err != nil {
apiError(w, http.StatusInternalServerError, err)
return
}

fmt.Fprintln(w, "OK")
}

// POST /shutdown
// shutdown the node
func (node *Node) httpShutdown(w http.ResponseWriter, r *http.Request) {
Expand Down
3 changes: 3 additions & 0 deletions mcnode/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func main() {
router.HandleFunc("/query", node.httpQuery)
router.HandleFunc("/query/{peerId}", node.httpRemoteQuery)
router.HandleFunc("/merge/{peerId}", node.httpMerge)
router.HandleFunc("/push/{peerId}", node.httpPush)
router.HandleFunc("/delete", node.httpDelete)
router.HandleFunc("/data/put", node.httpPutData)
router.HandleFunc("/data/get/{objectId}", node.httpGetData)
Expand All @@ -88,6 +89,8 @@ func main() {
router.HandleFunc("/config/dir", node.httpConfigDir)
router.HandleFunc("/config/nat", node.httpConfigNAT)
router.HandleFunc("/config/info", node.httpConfigInfo)
router.HandleFunc("/auth", node.httpAuth)
router.HandleFunc("/auth/{peerId}", node.httpAuthPeer)
router.HandleFunc("/dir/list", node.httpDirList)
router.HandleFunc("/net/addr", node.httpNetAddr)
router.HandleFunc("/shutdown", node.httpShutdown)
Expand Down
131 changes: 127 additions & 4 deletions mcnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
ggproto "github.com/gogo/protobuf/proto"
p2p_crypto "github.com/libp2p/go-libp2p-crypto"
p2p_host "github.com/libp2p/go-libp2p-host"
p2p_peer "github.com/libp2p/go-libp2p-peer"
p2p_pstore "github.com/libp2p/go-libp2p-peerstore"
mc "github.com/mediachain/concat/mc"
mcq "github.com/mediachain/concat/mc/query"
Expand All @@ -18,6 +19,7 @@ import (
"log"
"os"
"path"
"strings"
"sync"
"time"
)
Expand All @@ -36,6 +38,7 @@ type Node struct {
home string
db StatementDB
ds Datastore
auth PeerAuth
mx sync.Mutex
counter int
}
Expand Down Expand Up @@ -65,6 +68,11 @@ type Datastore interface {
Close()
}

type PeerAuth struct {
peers map[p2p_peer.ID][]string
mx sync.Mutex
}

type NodeInfo struct {
Peer string `json:"peer"`
Publisher string `json:"publisher"`
Expand All @@ -85,6 +93,9 @@ var (
MissingData = errors.New("Missing statement metadata")
UnexpectedData = errors.New("Unexpected data object")
BadData = errors.New("Bad data object; hash mismatch")
BadPush = errors.New("Bad push value; unexpected object")
BadResponse = errors.New("Bad response; unexpected object")
BadRuleset = errors.New("Bad auth ruleset; unexpected object")
)

const (
Expand All @@ -95,6 +106,12 @@ const (

var statusString = []string{"offline", "online", "public"}

type PushError string

func (s PushError) Error() string {
return string(s)
}

type StreamError struct {
Err string `json:"error"`
}
Expand Down Expand Up @@ -256,9 +273,10 @@ func (node *Node) openDS() error {

// persistent configuration
type NodeConfig struct {
Info string `json:"info,omitempty"`
NAT string `json:"nat,omitempty"`
Dir string `json:"dir,omitempty"`
Info string `json:"info,omitempty"`
NAT string `json:"nat,omitempty"`
Dir string `json:"dir,omitempty"`
Auth map[string]interface{} `json:"auth,omitempty"`
}

func (node *Node) saveConfig() error {
Expand All @@ -268,6 +286,7 @@ func (node *Node) saveConfig() error {
if node.dir != nil {
cfg.Dir = mc.FormatHandle(*node.dir)
}
cfg.Auth = node.auth.toJSON()

bytes, err := json.Marshal(cfg)
if err != nil {
Expand Down Expand Up @@ -311,7 +330,8 @@ func (node *Node) loadConfig() error {
node.dir = &pinfo
}

return nil
err = node.auth.fromJSON(cfg.Auth)
return err
}

func (node *Node) doShutdown() {
Expand All @@ -323,3 +343,106 @@ func (node *Node) doShutdown() {
node.ds.Close()
os.Exit(0)
}

func (auth *PeerAuth) fromJSON(rmap map[string]interface{}) error {
auth.mx.Lock()
defer auth.mx.Unlock()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this just executes when it goes out of scope? weird. I guess it's like a finally a bit?

Copy link
Contributor Author

@vyzo vyzo Nov 17, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's more like unwind-protect at function-scope.
Each function has an unwind stack, where defer pushes thunks. And at the function epilogue, the unwind stack is poped and the thunks are executed.


auth.peers = make(map[p2p_peer.ID][]string)
for id58, xrules := range rmap {
pid, err := p2p_peer.IDB58Decode(id58)
if err != nil {
return err
}

xxrules, ok := xrules.([]interface{})
if !ok {
return BadRuleset
}

rules := make([]string, len(xxrules))
for x, xxrule := range xxrules {
rule, ok := xxrule.(string)
if !ok {
return BadRuleset
}

rules[x] = rule
}

auth.peers[pid] = rules
}

return nil
}

func (auth *PeerAuth) toJSON() map[string]interface{} {
auth.mx.Lock()
defer auth.mx.Unlock()

rmap := make(map[string]interface{})
for pid, rules := range auth.peers {
rmap[pid.Pretty()] = rules
}

return rmap
}

func (auth *PeerAuth) authorize(pid p2p_peer.ID, nss []string) bool {
if len(nss) == 0 {
return false
}

auth.mx.Lock()
defer auth.mx.Unlock()

rules := auth.peers[pid]
if len(rules) == 0 {
return false
}

for _, ns := range nss {
if !auth.authorizeAllow(rules, ns) {
return false
}
}

return true
}

func (auth *PeerAuth) authorizeAllow(rules []string, ns string) bool {
for _, rule := range rules {
switch {
case rule == "*":
return true

case strings.HasSuffix(rule, ".*"):
if strings.HasPrefix(ns, rule[:len(rule)-2]) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about foo.bar.*.*?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Uhm, I don't think it's worth the trouble.
I kept it simple and used the same wildcard semantics we have in MCQL.
If it proves to be insufficient for some use case, we can generalize to support regexps.

return true
}

case rule == ns:
return true
}
}

return false
}

func (auth *PeerAuth) getRules(pid p2p_peer.ID) []string {
auth.mx.Lock()
defer auth.mx.Unlock()
return auth.peers[pid]
}

func (auth *PeerAuth) setRules(pid p2p_peer.ID, rules []string) {
auth.mx.Lock()
auth.peers[pid] = rules
auth.mx.Unlock()
}

func (auth *PeerAuth) clearRules(pid p2p_peer.ID) {
auth.mx.Lock()
delete(auth.peers, pid)
auth.mx.Unlock()
}
Loading