Skip to content

Commit

Permalink
Merge pull request #7 from ralph-tice/master
Browse files Browse the repository at this point in the history
route53 collection attempt
  • Loading branch information
coryb committed Feb 21, 2013
2 parents 2f003f9 + 2b83fda commit 1f02c1b
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 3 deletions.
2 changes: 1 addition & 1 deletion src/main/scala/com/netflix/edda/Collection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ abstract class Collection(val ctx: Collection.Context) extends Queryable {

/** stop elector, crawler and shutdown ForkJoin special scheduler */
override def stop() {
logger.info("Stoping " + this)
logger.info("Stopping " + this)
Option(elector).foreach(_.stop())
Option(crawler).foreach(_.stop())
fjScheduler.shutdown()
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/com/netflix/edda/MergedCollection.scala
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class MergedCollection(val name: String, val collections: Seq[Collection]) exten

/** stop the actors for all the merged collections then stop this actor */
override def stop() {
logger.info("Stoping " + this)
logger.info("Stopping " + this)
collections.foreach(_.stop())
super.stop()
}
Expand Down
8 changes: 8 additions & 0 deletions src/main/scala/com/netflix/edda/aws/AwsClient.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import com.amazonaws.services.elasticloadbalancing.AmazonElasticLoadBalancingCli
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.sqs.AmazonSQSClient
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient
import com.amazonaws.services.route53.AmazonRoute53Client

/** provides access to AWS service client objects
*
Expand Down Expand Up @@ -102,4 +103,11 @@ class AwsClient(val provider: AWSCredentialsProvider, val region: String) {
client.setEndpoint("monitoring." + region + ".amazonaws.com")
client
}

/** get [[com.amazonaws.services.route53.AmazonRoute53Client]] object */
def route53 = {
  val r53Client = new AmazonRoute53Client(provider)
  // Route53 uses a single global endpoint, so no region prefix is applied here
  r53Client.setEndpoint("route53.amazonaws.com")
  r53Client
}
}
48 changes: 47 additions & 1 deletion src/main/scala/com/netflix/edda/aws/AwsCollections.scala
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ object AwsCollectionBuilder {
val elb = new AwsLoadBalancerCollection(dsFactory, accountName, elector, ctx)
val asg = new AwsAutoScalingGroupCollection(dsFactory, accountName, elector, ctx)
val inst = new AwsInstanceCollection(res.crawler, dsFactory, accountName, elector, ctx)
val hostedZones = new AwsHostedZoneCollection(dsFactory, accountName, elector, ctx)
val hostedRecords = new AwsHostedRecordCollection(hostedZones.crawler, dsFactory, accountName, elector, ctx)
Seq(
new AwsAddressCollection(dsFactory, accountName, elector, ctx),
asg,
Expand All @@ -113,7 +115,10 @@ object AwsCollectionBuilder {
new AwsBucketCollection(dsFactory, accountName, elector, ctx),
new AwsSimpleQueueCollection(dsFactory, accountName, elector, ctx),
new AwsReservedInstanceCollection(dsFactory, accountName, elector, ctx),
new GroupAutoScalingGroups(asg, inst, dsFactory, elector, ctx))
new GroupAutoScalingGroups(asg, inst, dsFactory, elector, ctx),
hostedZones,
hostedRecords
)
}
}

Expand Down Expand Up @@ -591,3 +596,44 @@ class GroupAutoScalingGroups(
super.delta(modNewRecords, oldRecords)
}
}

/** collection for AWS Route53 hosted zones
 *
 * root collection name: aws.hostedZones
 *
 * see crawler details [[com.netflix.edda.aws.AwsHostedZoneCrawler]]
 *
 * @param dsFactory function that creates new DataStore object from collection name
 * @param accountName account name to be prefixed to collection name
 * @param elector Elector to determine leadership
 * @param ctx context for configuration and AWS clients objects
 */
class AwsHostedZoneCollection(
  dsFactory: String => Option[DataStore],
  val accountName: String,
  val elector: Elector,
  override val ctx: AwsCollection.Context) extends RootCollection("aws.hostedZones", accountName, ctx) {
  // optional persistent store for crawled records, resolved from the collection name
  val dataStore: Option[DataStore] = dsFactory(name)
  // crawler that fetches the account's hosted zones from Route53
  val crawler = new AwsHostedZoneCrawler(name, ctx)
}

/** collection for AWS Route53 record sets
 *
 * root collection name: aws.hostedRecords
 *
 * see crawler details [[com.netflix.edda.aws.AwsHostedRecordCrawler]]
 *
 * @param zoneCrawler the hosted-zone crawler whose results drive the record crawler
 * @param dsFactory function that creates new DataStore object from collection name
 * @param accountName account name to be prefixed to collection name
 * @param elector Elector to determine leadership
 * @param ctx context for configuration and AWS clients objects
 */
class AwsHostedRecordCollection(
  val zoneCrawler: AwsHostedZoneCrawler,
  dsFactory: String => Option[DataStore],
  val accountName: String,
  val elector: Elector,
  override val ctx: AwsCollection.Context) extends RootCollection("aws.hostedRecords", accountName, ctx) {
  // optional persistent store for crawled records, resolved from the collection name
  val dataStore: Option[DataStore] = dsFactory(name)
  // secondary crawler: observes zoneCrawler and expands each zone into its record sets
  val crawler = new AwsHostedRecordCrawler(name, ctx, zoneCrawler)
}
114 changes: 114 additions & 0 deletions src/main/scala/com/netflix/edda/aws/AwsCrawlers.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ import com.amazonaws.services.autoscaling.model.DescribePoliciesRequest
import com.amazonaws.services.elasticloadbalancing.model.DescribeLoadBalancersRequest
import com.amazonaws.services.elasticloadbalancing.model.DescribeInstanceHealthRequest

import com.amazonaws.services.route53.model.ListHostedZonesRequest
import com.amazonaws.services.route53.model.GetHostedZoneRequest
import com.amazonaws.services.route53.model.ListResourceRecordSetsRequest

import collection.JavaConverters._

import java.util.concurrent.Executors
Expand Down Expand Up @@ -577,3 +581,113 @@ class AwsReservedInstanceCrawler(val name: String, val ctx: AwsCrawler.Context)
override def doCrawl() = ctx.awsClient.ec2.describeReservedInstances(request).getReservedInstances.asScala.map(
item => Record(item.getReservedInstancesId, new DateTime(item.getStart), ctx.beanMapper(item))).toSeq
}


/** crawler for Route53 Hosted Zones (DNS records)
 *
 * @param name name of collection we are crawling for
 * @param ctx context to provide beanMapper and configuration
 */
class AwsHostedZoneCrawler(val name: String, val ctx: AwsCrawler.Context) extends Crawler(ctx) {
  val request = new ListHostedZonesRequest

  /** fetch all hosted zones for the account
   *
   * ListHostedZones is paginated (at most 100 zones per response), so follow
   * the NextMarker until the full zone list has been collected; the original
   * single-call version silently truncated accounts with more than one page.
   */
  override def doCrawl() = {
    var records: Seq[Record] = Seq()
    var done = false
    while (!done) {
      val response = ctx.awsClient.route53.listHostedZones(request)
      records = records ++ response.getHostedZones.asScala.map(
        item => Record(item.getName, ctx.beanMapper(item)))
      // NextMarker is null on the final page
      Option(response.getNextMarker) match {
        case Some(marker) => request.setMarker(marker)
        case None => done = true
      }
    }
    records
  }
}

case class AwsHostedRecordCrawlerState(hostedZones: Seq[Record] = Seq[Record]())

object AwsHostedRecordCrawler extends StateMachine.LocalState[AwsHostedRecordCrawlerState]

/** crawler for Route53 Resource Record Sets (DNS records)
 * this is a secondary crawler that crawls the resource recordsets for each hosted zone
 * and then pulls out each recordset in the zones to track them separately
 *
 * @param name name of collection we are crawling for
 * @param ctx context to provide beanMapper and configuration
 * @param crawler the awsHostedZone crawler
 */
class AwsHostedRecordCrawler(val name: String, val ctx: AwsCrawler.Context, val crawler: Crawler) extends Crawler(ctx) {

  import AwsHostedRecordCrawler._

  // crawling is driven entirely by zone-crawler updates, so the scheduled crawl is a no-op
  override def crawl() {}

  // we don't crawl directly, we just get updates from the zone crawler when it crawls
  override def doCrawl() = throw new java.lang.UnsupportedOperationException("doCrawl() should not be called on HostedRecordCrawler")

  private[this] val logger = LoggerFactory.getLogger(getClass)
  // at most 10 zones have their record sets fetched concurrently; the pool lives
  // for the lifetime of the crawler (it is never shut down here)
  private[this] val threadPool = Executors.newFixedThreadPool(10)

  /** for each zone call listResourceRecordSets and map that to a new document
   *
   * @param zones the hosted-zone records to crawl
   * @return one Record per resource record set, each annotated with its owning zone id/name
   */
  def doCrawl(zones: Seq[Record]): Seq[Record] = {

    val futures: Seq[java.util.concurrent.Future[Seq[Record]]] = zones.map(
      zone => {
        // the zone record's data map carries the Route53 zone id; zone.id is the DNS name
        val zoneId = zone.data.asInstanceOf[Map[String,Any]]("id").asInstanceOf[String]
        val zoneName = zone.id
        val request = new ListResourceRecordSetsRequest(zoneId)
        threadPool.submit(
          new Callable[Seq[Record]] {
            def call() = {
              // page through the zone's record sets; getNextRecordName is null on the last page
              val it = new AwsIterator() {
                def next() = {
                  // NOTE(review): assumes AwsIterator seeds nextToken with an initial
                  // Some value — .get would throw on the first page otherwise; confirm
                  // against the AwsIterator definition
                  val response = ctx.awsClient.route53.listResourceRecordSets(request.withStartRecordName(this.nextToken.get))
                  this.nextToken = Option(response.getNextRecordName)
                  response.getResourceRecordSets.asScala.map(
                    item => {
                      // embed the owning zone so records from different zones stay distinguishable
                      Record(item.getName, ctx.beanMapper(item).asInstanceOf[Map[String,Any]] ++ Map("zone" -> Map("id" -> zoneId, "name" -> zoneName)))
                    }
                  ).toList
                }
              }
              it.toList.flatten
            }
          }
        )
      }
    )
    var failed: Boolean = false
    // gather per-zone results, remembering (but not aborting on) individual failures
    val records = futures.map(
      f => {
        try Some(f.get)
        catch {
          case e: Exception => {
            failed = true
            // fixed: separate this.toString from the message text (was "...Crawlerexception from...")
            logger.error(this + " exception from listResourceRecordSets", e)
            None
          }
        }
      }
    ).collect({
      case Some(rec) => rec
    }).flatten

    // fail the whole crawl if any zone failed, so a partial result is never published as complete
    if (failed) {
      throw new java.lang.RuntimeException("failed to crawl resource record sets")
    }
    records
  }

  protected override def initState = addInitialState(super.initState, newLocalState(AwsHostedRecordCrawlerState()))

  // subscribe to CrawlResult events from the hosted-zone crawler
  protected override def init() {
    crawler.addObserver(this)
  }

  protected def localTransitions: PartialFunction[(Any, StateMachine.State), StateMachine.State] = {
    case (Crawler.CrawlResult(from, hostedZones), state) => {
      // this is blocking so we don't crawl in parallel
      // reference (ne) comparison: only re-crawl when the zone crawler delivered a new result object
      if (hostedZones ne localState(state).hostedZones) {
        val newRecords = doCrawl(hostedZones)
        Observable.localState(state).observers.foreach(_ ! Crawler.CrawlResult(this, newRecords))
        setLocalState(Crawler.setLocalState(state, CrawlerState(newRecords)), AwsHostedRecordCrawlerState(hostedZones))
      } else state
    }
  }

  override protected def transitions = localTransitions orElse super.transitions
}

0 comments on commit 1f02c1b

Please sign in to comment.