Practical Cassandra: A Developer's Approach

Russell Bradberry
Eric Lubow

Upper Saddle River, NJ • Boston • Indianapolis • San Francisco • New York • Toronto • Montreal • London • Munich • Paris • Madrid • Capetown • Sydney • Tokyo • Singapore • Mexico City

Many of the designations used by manufacturers and sellers to distinguish their products are claimed as trademarks. Where those designations appear in this book, and the publisher was aware of a trademark claim, the designations have been printed with initial capital letters or in all capitals.

The authors and publisher have taken care in the preparation of this book, but make no expressed or implied warranty of any kind and assume no responsibility for errors or omissions. No liability is assumed for incidental or consequential damages in connection with or arising out of the use of the information or programs contained herein.

For information about buying this title in bulk quantities, or for special sales opportunities (which may include electronic versions; custom cover designs; and content particular to your business, training goals, marketing focus, or branding interests), please contact our corporate sales department at or (800) 382-3419.

For government sales inquiries, please contact

For questions about sales outside the U.S., please contact

Visit us on the Web:

Cataloging-in-Publication Data is on file with the Library of Congress.

Copyright © 2014 Pearson Education, Inc.

All rights reserved. Printed in the United States of America. This publication is protected by copyright, and permission must be obtained from the publisher prior to any prohibited reproduction, storage in a retrieval system, or transmission in any form or by any means, electronic, mechanical, photocopying, recording, or likewise.
To obtain permission to use material from this work, please submit a written request to Pearson Education, Inc., Permissions Department, One Lake Street, Upper Saddle River, New Jersey 07458, or you may fax your request to (201) 236-3290.

ISBN-13: 978-0-321-93394-2
ISBN-10: 0-321-93394-X

Text printed in the United States on recycled paper at RR Donnelley in Crawfordsville, Indiana.
First printing, December 2013

❖
This book is for the community. We have been a part of the Cassandra community for a few years now, and they have been fantastic every step of the way. This book is our way of giving back to the people who have helped us and have allowed us to help pave the way for the future of Cassandra.
❖

Contents

Foreword by Jonathon Ellis
Foreword by Paul Dix
Preface
Acknowledgments
About the Authors
1 Introduction to Cassandra: A Greek Story; What Is NoSQL?; There’s No Such Thing as “Web Scale”; ACID, CAP, and BASE; ACID; CAP; BASE; Where Cassandra Fits In; What Is Cassandra?; History of Cassandra; Schema-less (If You Want); Who Uses Cassandra?; Is Cassandra Right for Me?; Cassandra Terminology; Cluster; Homogeneous Environment; Node; Replication Factor; Tunable Consistency; Our Hope
2 Installation: Prerequisites; Installation; Debian; RedHat/CentOS/Oracle; From Binaries; Configuration; Cluster Setup; Summary
3 Data Modeling: The Cassandra Data Model; Model Queries—Not Data; Collections; Sets; Lists; Maps; Summary
4 CQL: A Familiar Way of Doing Things; CQL 1; CQL 2; CQL 3; Data Types; Commands; Example Schemas; Summary
5 Deployment and Provisioning: Keyspace Creation; Replication Factor; Replication Strategies; SimpleStrategy; NetworkTopologyStrategy; Snitches; Simple; Dynamic; Rack Inferring; EC2; Ec2MultiRegion; Property File; PropertyFileSnitch Configuration; Partitioners; Byte Ordered; Random Partitioners; Node Layout; Virtual Nodes; Balanced Clusters; Firewalls; Platforms; Amazon Web Services; Other Platforms; Summary
6 Performance Tuning: Methodology; Testing in Production; Tuning; Timeouts; CommitLog; MemTables; Concurrency; Durability and Consistency; Compression; SnappyCompressor; DeflateCompressor; File System; Caching; How Cassandra Caching Works; General Caching Tips; Global Cache Tuning; ColumnFamily Cache Tuning; Bloom Filters; System Tuning; Testing I/O Concurrency; Virtual Memory and Swap; sysctl Network Settings; File Limit Settings; Solid-State Drives; JVM Tuning; Multiple JVM Options; Maximum Heap Size; Garbage Collection; Summary
7 Maintenance: Understanding nodetool; General Usage; Node Information; Ring Information; ColumnFamily Statistics; Thread Pool Statistics; Flushing and Draining; Cleaning; upgradesstables and scrub; Compactions; What, Where, Why, and How; Compaction Strategies; Impact; Backup and Restore; Are Backups Necessary?; Snapshots; CommitLog Archiving; archive_command; restore_command; restore_directories; restore_point_in_time; CommitLog Archiving Notes; Summary
8 Monitoring: Logging; Changing Log Levels; Example Error; JMX and MBeans; JConsole; Health Checks; Nagios; Cassandra-Specific Health Checks; Cassandra Interactions; Summary
9 Drivers and Sample Code: Java; C#; Python; Ruby; Summary
10 Troubleshooting: Toolkit; iostat; dstat; nodetool; Common Problems; Slow Reads, Fast Writes; Freezing Nodes; Tracking Down OOM Errors; Ring View Differs between Nodes; Insufficient User Resources; Summary
11 Architecture: Meta Keyspaces; System Keyspace; Authentication; Gossip Protocol; Failure Detection; CommitLogs and MemTables; SSTables; HintedHandoffs; Bloom Filters; Compaction Types; Tombstones; Staged Event-Driven Architecture; Summary
12 Case Studies: Ooyala; Hailo; Taking the Leap; Proof Is in the Pudding; Lessons Learned; Summary; eBay; eBay’s Transactional Data Platform; Why Cassandra?; Cassandra Growth; Many Use Cases; Cassandra Deployment; Challenges Faced and Lessons Learned; Summary
A Getting Help: Preparing Information; IRC; Mailing Lists
B Enterprise Cassandra: DataStax; Acunu; Titan by Aurelius; Pentaho; Instaclustr
Index

Foreword by Jonathon Ellis

I was excited to learn that Practical Cassandra would be released right at my five-year anniversary of working on Cassandra. During that time, Cassandra has achieved its goal of offering the world’s most reliable and performant scalable database. Along the way, Cassandra has changed significantly, and a modern book is, at this point, overdue.
Eric and Russell were early adopters of Cassandra at SimpleReach; in Practical Cassandra, you benefit from their experience in the trenches administering Cassandra, developing against it, and building one of the first CQL drivers.

If you are deploying Cassandra soon, or you inherited a Cassandra cluster to tend, spend some time with the deployment, performance tuning, and maintenance chapters. Some complexity is inherent in a distributed system, particularly one designed to push performance limits and scale without compromise; forewarned is, as they say, forearmed.

If you are new to Cassandra, I highly recommend the chapters on data modeling and CQL. The Cassandra Query Language represents a major shift in developing against Cassandra and dramatically lowers the learning curve from what you may expect or fear.

Here’s to the next five years of progress!

Jonathon Ellis, Apache Cassandra Chair

Foreword by Paul Dix

Cassandra is quickly becoming one of the backbone components for anyone working with large datasets and real-time analytics. Its ability to scale horizontally to handle hundreds of thousands (or millions) of writes per second makes it a great choice for high-volume systems that must also be highly available. That’s why I’m very pleased that this book is the first in the series to cover a key infrastructural component for the Addison-Wesley Data & Analytics Series: the data storage layer.

In 2011, I was making my second foray into working with Cassandra to create a high-volume, scalable time series data store. At the time, Cassandra 0.8 had been released, and the path to 1.0 was fairly clear, but the available literature was lagging sorely behind.

This book is exactly what I could have used at the time. It provides a great introduction to setting up and modeling your data in Cassandra. It has coverage of the most recent features, including CQL, sets, maps, and lists.
However, it doesn’t stop with the introductory stuff. There’s great material on how to run a cluster in production, how to tune performance, and on general operational concerns.

I can’t think of more qualified users of Cassandra to bring this material to you. Eric and Russell are DataStax Cassandra MVPs and have been working extensively with Cassandra and running it in production for years. Thankfully, they’ve done a great job of distilling their experience into this book so you won’t have to search for insight into how to develop against and run the most current release of Cassandra.

Paul Dix, Series Editor

Preface

Apache Cassandra is a massively scalable, open-source, NoSQL database. Cassandra is best suited to applications that need to store large amounts of structured, semistructured, and unstructured data. Cassandra offers asynchronous masterless replication to nodes in many data centers. This gives it the capability to have no single point of failure while still offering low-latency operations.

When we first embarked on the journey of writing a book, we had one goal in mind: We wanted to keep the book easily digestible by someone just getting started with Cassandra, but also make it a useful reference guide for day-to-day maintenance, tuning, and troubleshooting. We know the pain of scouring the Internet only to find outdated and contrived examples of how to get started with a new technology. We hope that Practical Cassandra will be the go-to guide for developers (both new and at an intermediate level) to get up and running with as little friction as possible.

This book describes, in detail, how to go from nothing to a fully functional Cassandra cluster. It shows how to bring up a cluster of Cassandra servers, choose the appropriate configuration options for the cluster, model your data, and monitor and troubleshoot any issues.
Toward the end of the book, we provide sample code, in-depth detail as to how Cassandra works under the covers, and real-world case studies from prominent users.

What’s in This Book?

This book is intended to guide a developer in getting started with Cassandra, from installation to common maintenance tasks to writing an application. If you are just starting with Cassandra, this book will be most helpful when read from start to finish. If you are familiar with Cassandra, you can skip around the chapters to easily find what you need.

• Chapter 1, Introduction to Cassandra: This chapter gives an introduction to Cassandra and the philosophies and history of the project. It provides an overview of terminology, what Cassandra is best suited for, and, most important, what we hope to accomplish with this book.
• Chapter 2, Installation: Chapter 2 is the start-to-finish guide to getting Cassandra up and running. Whether the installation is on a single node or a large cluster, this chapter guides you through the process. In addition to cluster setup, the most important configuration options are outlined.
• Chapter 3, Data Modeling: Data modeling is one of the most important aspects of using Cassandra. Chapter 3 discusses the primary differences between Cassandra and traditional RDBMSs, as well as going in depth into different design patterns, philosophies, and special features that make Cassandra the data store of tomorrow.
• Chapter 4, CQL: CQL is Cassandra’s answer to SQL. While not a full implementation of SQL, CQL helps to bridge the gap when transitioning from an RDBMS. This chapter explores in depth the features of CQL and provides several real-world examples of how to use it.
• Chapter 5, Deployment and Provisioning: After you’ve gotten an overview of installation and querying, this chapter guides you through real-world deployment and resource provisioning.
Whether you plan on deploying to the cloud or on bare-metal hardware, this chapter is for you. In addition to outlining provisioning in various types of configurations, it discusses the impact of the different configuration options and what is best for different types of workloads.
• Chapter 6, Performance Tuning: Now that you have a live production cluster deployed, this chapter guides you through tweaking the Cassandra dials to get the most out of your hardware, operating system, and the Java Virtual Machine (JVM).
• Chapter 7, Maintenance: Just as with everything in life, the key to having a performant and, more important, working Cassandra cluster is to maintain it properly. Chapter 7 describes all the different tools that take the headache out of maintaining the components of your system.
• Chapter 8, Monitoring: Any systems administrator will tell you that a healthy system is a monitored system. Chapter 8 outlines the different types of monitoring options, tools, and what to look out for when administering a Cassandra cluster.
• Chapter 9, Drivers and Sample Code: Now that you have a firm grasp on how to manage and maintain your Cassandra cluster, it is time to get your feet wet. In Chapter 9, we discuss the different drivers and driver features offered in various languages. We then go for the deep dive by presenting a working example application in not only one, but four of the most commonly used languages: Java, C#, Ruby, and Python.
• Chapter 10, Troubleshooting: Now that you have written your sample application, what happens when something doesn’t quite work right? Chapter 10 outlines the tools and techniques that can be used to get your application back on the fast track.
• Chapter 11, Architecture: Ever wonder what goes on under the Cassandra “hood”? In this chapter, we discuss how Cassandra works, how it keeps your data safe and accurate, and how it achieves such blazingly fast performance.
• Chapter 12, Case Studies: So who uses Cassandra, and how? Chapter 12 presents three case studies from forward-thinking companies that use Cassandra in unique ways. You will get the perspective straight from the mouths of the developers at Ooyala, Hailo, and eBay.
• Appendix A, Getting Help: Whether you’re stuck on a confusing problem or just have a theoretical question, having a place to go for help is paramount. This appendix tells you about the best places to get that help.
• Appendix B, Enterprise Cassandra: There are many reasons to use Cassandra, but sometimes it may be better for you to focus on your organization’s core competencies. This appendix describes a few companies that can help you leverage Cassandra efficiently and effectively while letting you focus on what you do best.

Code Samples

All code samples and more in-depth examples can be found on GitHub at

Acknowledgments

We would like to acknowledge everyone involved with Cassandra and the Cassandra community, everyone from the core contributors of Cassandra all the way down to the end users who have made it such a popular platform to work with. Without the community, Cassandra wouldn’t be where it is today.
Special thanks go to

• Jay Patel for putting together the eBay case study
• Al Tobey and Evan Chan for putting together the case study on Ooyala
• Dominic Wong for putting together the Hailo case study
• All the technical reviewers, including Adam Chalemian, Mark Herschberg, Joe Stein, and Bryan Smith, who helped give excellent feedback and ensured technical accuracy where possible
• Paul Dix for setting us up and getting us on the right track with writing

About the Authors

Russell Bradberry (Twitter: @devdazed) is the principal architect at SimpleReach, where he is responsible for designing and building out highly scalable, high-volume, distributed data solutions. He has brought to market a wide range of products, including a real-time bidding ad server, a rich media ad management tool, a content recommendation system, and, most recently, a real-time social intelligence platform. He is a U.S. Navy veteran, a DataStax MVP for Apache Cassandra, and the author of the NodeJS Cassandra driver Helenus.

Eric Lubow (Twitter: @elubow) is currently chief technology officer of SimpleReach, where he builds highly scalable, distributed systems for processing social data. He began his career building secure Linux systems. Since then he has worked on building and administering various types of ad systems, maintaining and deploying large-scale Web applications, and building email delivery and analytics systems. He is also a U.S. Army combat veteran and a DataStax MVP for Apache Cassandra.

Eric and Russ are regular speakers about Cassandra and distributed systems, and both live in New York City.

1 Introduction to Cassandra

Apache Cassandra is a powerful and massively scalable NoSQL database. It is architected to handle real-time big-data workloads across multiple data centers with no single point of failure.
It works on commodity hardware and can easily be deployed in a cloud-based infrastructure. But before we get into the nitty-gritty of things, here is a quick lesson in Greek mythology.

A Greek Story

In Greek mythology, Cassandra was the beautiful daughter of King Priam and Queen Hecuba of Troy, the twin sister of Helenus and younger sister to the great Trojan warrior Hector, and eventually a priestess of Apollo. She was believed to be the second-most beautiful woman in the world. Her beauty was compared to the likes of Aphrodite or Helen of Troy. She had red curly hair, blue eyes, and fair skin and was intelligent, charming, friendly, and very desirable. The other side of Cassandra was that she was generally considered to be insane.

When Apollo first saw Cassandra, he immediately fell in love with her. To show his love, he offered her the gift of prophecy if she would kiss him, and she agreed. But when Apollo went to kiss Cassandra, instead of a kiss, she spat in his mouth. Because Apollo had already granted Cassandra the gift of prophecy, he could not take it away. But he did change it so that even though Cassandra would always know what was going to happen, nobody would ever believe her. And in fabled fashion, when Cassandra told the people of Troy that the Trojan Horse was bad news, they ignored her and Troy was captured.

After the Trojans lost the war, a Greek warrior named Ajax took Cassandra prisoner and gave her to King Agamemnon as a slave. She told Agamemnon that his wife, Clytemnestra, was going to kill him. But Apollo’s curse did not allow anyone to believe her. After killing her husband, King Agamemnon, Clytemnestra then killed Cassandra.

The reason for telling this story is twofold. First, it shows a little about why the name Cassandra was chosen for this database. She was a repository of knowledge of things that were going to happen.
This is similar to the way you can use the Cassandra system to help you build a better product by having a keen understanding of what’s going on around you.

Second, the names of many of the characters in this and other Greek tragedies are used for the names of many of the applications that play well with Cassandra. These include Helenus (the Node.js driver), Priam (a Cassandra automation tool), and Hector (the Java driver), just to name a few.

What Is NoSQL?

There is no single definition for NoSQL. To some it stands for “Not Only SQL”; to others it means “No SQL.” Either way, it refers to the departure from the traditional relational database technologies that have dominated the development landscape for the past few decades. What is likely the largest driver of the NoSQL movement is a commonly held belief that relational databases are not well suited to large amounts of data and scale. Whether or not this is true, the emergence of the key/value, graph, document, and “big table” data storage engines shows that a new generation of database technologies is taking center stage.

There is no single database technology that is synonymous with the NoSQL movement. Branding and marketing seem to be mostly what determine how relevant a technology is to the terminology.

There’s No Such Thing as “Web Scale”

Another marketing term that gets thrown around quite frequently is “Web scale.” It is used quite often when discussing how to determine whether a database system is suitable for a particular Web application’s needs and whether it will hold up as the application grows. This is a very subjective term, as everyone’s needs are different. A simple SQL setup will achieve most scalability needs. Depending on the read/write patterns of an application, one may need a specialized database, such as Kyoto Cabinet (the successor to Tokyo Cabinet) for key/value or MongoDB as a document store.
In a system that needs high write throughput and linear scalability, Cassandra is a great fit and will hold up under some very heavy workloads.

The key point to remember when discussing the idea of Web scale technologies is that nearly everything out there will scale with enough money, hardware, and headaches. The trick is to figure out which piece of software is best suited for your usage patterns and workloads and will scale out in a way suitable for your application and your organization.

ACID, CAP, and BASE

Before we get too deep into Cassandra, it is important to understand some of the basic concepts that surround databases so you know what concessions you may have to make when choosing a system. There are three main sets of properties that define what database systems are capable of. Those are ACID, CAP, and BASE. ACID comprises some of the general properties of database systems. CAP covers a little more about distributed systems. BASE is a somewhat newer theory and includes the practical considerations of implementing a distributed system.

Understanding these theories will help you to understand where some of the design decisions come in, not only for Cassandra but also for your application and how it is developed. The idea of building distributed applications and distributed systems often comes down to give and take. You may give up consistency for availability. You may find it’s wiser for your application’s needs to give a little on availability in favor of consistency. ACID, CAP, and BASE are the driving technical theories behind many of these decisions. It is important to understand the trade-offs made in the design of the underlying systems (Cassandra) so you can ensure that your application performs the way you expect it to perform.

ACID

ACID stands for Atomicity, Consistency, Isolation, and Durability. In order to understand ACID and how it relates to databases, we need to talk about transactions.
When it comes to databases, a transaction is defined as a single logical operation. For example, if you are shopping online, every time you add an item to your shopping cart, that item and its quantity make up the database transaction. Even if you add multiple items or multiple quantities of the same item with a single click, that entire shopping cart addition is just a single transaction.

Atomicity means that each transaction either works or it doesn’t. This is to say that if any single part of the transaction fails, the entire transaction fails. This should hold true for every situation related to a transaction that could cause a failure. Network failure, power outage, or even a node outage occurring at transaction time should cause a complete transaction failure in an atomic system.

Consistency ensures that when a transaction is complete, whether it is successful or not, the database is still in a valid state. This means that any data written to the database must also be valid. When writing data to the database, you also need to include any database application-level rules such as constraints, cascades, triggers, or stored procedures. The application of those rules should also leave the data in a valid state.

Isolation is a property that ensures that all transactions that are run concurrently appear as if they were executed serially (one right after the other). Each transaction must be run in a vacuum (isolation). This is to say that if two transactions are run at the same time, they remain independent of each other during the transaction. Some examples of mechanisms and problems related to isolation are locks (table, row, column, etc.), dirty reads, and deadlocks. The reason these are relevant is concurrency. Multiple changes can be attempted on the same data or set of data. Knowing what version of the data is the correct one is important for keeping the entire system in a sane state.

Durability means that after the transaction is complete, it will remain that way.
In other words, the data change that is incurred by the transaction is stored permanently, regardless of external events (such as a power failure).

CAP

The CAP theorem, also known as Brewer’s theorem, asserts that it is impossible for a distributed system to satisfy all three CAP guarantees. CAP stands for Consistency, Availability, and Partition tolerance. The important thing to note about the CAP theorem is that all three parts of it cannot be satisfied at the same time.

Although the C in CAP also stands for “consistency” (similar to the C in ACID), the meaning is different. Consistency means that all nodes in a grouping see the same data at the same time. In other words, any particular query hitting any node in the system will return the same result for that specific query. Consistency also further implies that when a query updates a value in one node, the data will be updated to reflect the new value prior to the next query.

The availability of a system speaks to the guarantee that regardless of the success or failure of a request, the requestor will receive a response. This means that system operations will be able to continue even if part of the system is down, whatever the reason. Availability is what lets the software attempt to cope with and compensate for externalities such as hardware failures, network outages, power failures, and the like.

Partition tolerance refers to the capability of a distributed system to keep functioning when network failures split the cluster into groups of nodes that cannot communicate with each other (a partition). In practice this goes hand in hand with distributing the load, whether data or queries, across multiple nodes. This implies that even if a few nodes are down, the system will continue to function. Sharding is a commonly used management technique for distributing load across a cluster. Sharding, which is similar to horizontal partitioning, is a way of splitting data into separate parts and moving them to another server or physical location, generally for performance improvements.
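
As a rough illustration of the sharding idea just described, the following minimal Python sketch assigns keys to shards with a stable hash. The function names `shard_for` and `distribute` are hypothetical and chosen only for this example; this is a generic hash-modulo scheme, not the partitioner Cassandra itself uses.

```python
import hashlib


def shard_for(key: str, num_shards: int) -> int:
    """Map a key to a shard index in the range [0, num_shards)."""
    if num_shards <= 0:
        raise ValueError("num_shards must be positive")
    # Use a stable hash so the same key maps to the same shard in every
    # process (Python's built-in hash() is randomized per process).
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big") % num_shards


def distribute(keys, num_shards):
    """Group keys by the shard that owns them (illustrative helper)."""
    shards = {index: [] for index in range(num_shards)}
    for key in keys:
        shards[shard_for(key, num_shards)].append(key)
    return shards
```

One design consequence worth noting: with a plain modulo scheme, changing the number of shards remaps most keys, which is one reason distributed stores generally prefer ring-based or consistent hashing approaches.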
There are various reasons that all three parts of the theorem cannot be satisfied in distributed systems. Most have to do with the volume of the data and how long it takes to move data around and check to ensure that it is correct. CAP is often used to justify the use of weaker consistency models. Many of the CAP-based ideas have evolved into the idea of BASE.

BASE

Just as in chemistry, BASE is at the opposite end of the spectrum from ACID. BASE stands for Basically Available, Soft state, and Eventual consistency. The notion of BASE comes in when dealing with a distributed system so large that maintaining the principles of CAP becomes impractical. It is worth noting that the constraints on transactions from ACID are still in play at some level; they just happen at different times with slightly different rules.

Having a system be basically available means that the system will respond to any request. The caveat is that the response may be a failure to get the data or that the data may be in an inconsistent or changing state. This is equivalent in the real world to depositing a check in your bank account and waiting for it to go through the clearinghouse to make the funds available to you.

Using the BASE terminology, we can expand on the idea of banking with checks. If your bank has only one branch, consistency and availability are satisfied. No partitioning is necessary, and every transaction you make will be available and consistent with itself. If your bank has two branches, when you deposit a check into branch A, branch B will not see the funds instantaneously because the data needs time to become eventually consistent. What if you deposit two checks and one bounces? The entire transaction should not fail because of one check; each check will be processed in isolation. A problem with one check should not cause a problem with the whole system. That would not make for a very durable system.
If the computers at branch A go down, that shouldn’t stop branch B from working completely. That would mean that the system isn’t very available, so there are safety nets in place.

The idea of a soft-state system means the system is always changing. This is typically due to eventual consistency. It is common for soft-state systems to undergo changes even when there is no additional input to them.

Eventual consistency refers to the concept that once a system stops receiving input, the data will propagate to wherever else it needs to be in the system sooner or later. The beauty of this is that the system does not check for consistency on every transaction as is expected in an ACID-compliant system.

Where Cassandra Fits In

Now that we have a decent idea of the tenets of a distributed system, it’s time to take a look at where Cassandra excels. There are a lot of database systems, and nearly all of them were designed to handle a particular problem efficiently and effectively. But the most important thing that you need to know when deciding whether Cassandra is the right tool for the job is the goal of the job. In other words, if you can illustrate what it is you are trying to accomplish, you’ll be able to determine if Cassandra is what you need to be successful.

In the context of the Web analytics application that we are building, Cassandra is suitable for a variety of reasons. One of the most common use cases for Cassandra is dealing with time-series data. What this means is that there is a sequence of successive data points that are all related to the same topic. For example, every time a page view happens on your Web site, an entry is made into the logs with the time of the event (page view), including some metadata around that event (IP, browser, URL, etc.). Now let’s say your Web site isn’t made up of just one or two Web servers, but a whole cluster of Web servers is required to support your traffic.
And let’s also say that you want to store the resulting Web server data in a database and not just aggregate logs on a log server. How is Cassandra well suited for that? Before you can answer whether or not Cassandra is the right tool to help you solve your problem, we should talk about what Cassandra is and where it came from.

What Is Cassandra?

Cassandra is an open-source distributed database management system. It is designed to handle large amounts of data spread across many commodity servers while remaining highly available. Cassandra is loosely defined as a key/value store where one key can map to one or more values.

Although early in its life Cassandra was just a key/value store, it has evolved into much more. It is now commonly seen as a hybrid containing common properties of two types of databases: a key/value store and a row store. Unlike a relational database management system (RDBMS), Cassandra ColumnFamilys (similar to relational tables) do not need to have matching columns within a row. Even rows within a ColumnFamily are not required to always follow the same naming schema. The options are available, but data patterns are not strictly enforced. Data can also be added in very high volumes at very high velocities, and Cassandra will determine the correct version of a piece of data by resolving the timestamp at which it was inserted into the system.

Architecturally, its decentralized nature allows for no single point of failure and ensures that every node in the cluster has the same role. This means that every node in the cluster can serve any request. Cassandra also supports replication and multi-data-center replication. Since replication strategies are configurable, you can set up your distribution architecture to be as centralized or spread out, or as redundant or fail-safe, as you would like. Because data is automatically replicated to nodes, downed or faulty nodes are easily replaceable.
New nodes can be added at will, without downtime, to increase read and write throughput or even just availability. The consistency levels are tunable, which allows you to have the application enforce the amount of resources applied to data assurance at a transaction level.

Cassandra also has an ecosystem being built around it. There are monitoring systems like OpsCenter to help you see the health of your cluster and manage common administration tasks. There are drivers for many of the major languages. Cassandra now comes with integration points for Hadoop and MapReduce support, full text search with Solr, and Apache Pig and Hive support. There is even a SQL-like query language called CQL, or Cassandra Query Language, to help in the data modeling and access patterns.

History of Cassandra

Apache Cassandra was originally developed at Facebook in 2008 to power Facebook’s in-box search feature. The original authors were Avinash Lakshman, who also is one of the authors of the Amazon Dynamo paper, and Prashant Malik. After being in production at Facebook for a while, Cassandra was released as an open-source project on Google Code in July of 2008. In March of 2009, it was accepted to the Apache Foundation as an incubator project. In February of 2010, it became a top-level Apache project.

As of the time of this writing, the most recent version of Apache Cassandra is the 1.2 series. Cassandra has come a long way since the first major release after its graduation to a top-level Apache project. It has picked up support for Hadoop, text search integration through Solr, CQL, zero-downtime upgrades, virtual nodes (vnodes), and self-tuning caches, just to name a few of the major features. Cassandra is still in constant heavy development, and new features are always being added and tested.
Note
The central paper on Cassandra, written by the primary Facebook engineers, is called “Cassandra—A Decentralized Structured Storage System” and is available at www.cs

Schema-less (If You Want)

Cassandra ColumnFamilys are considered schema-less. This means that you do not need to define a schema ahead of time. If you want to add a column, you simply specify the column name at write-time and the column will be created if it doesn’t exist. This lends itself to allowing for extremely wide rows, even rows that have millions of columns. Additionally, rows do not need to contain all or even any of the columns that other rows in the same table contain.

Cassandra does give you the option to create a schema, however. If you know what your data structure looks like, you can add column names and specify default types for those columns. This also enables you to add secondary indexes for the columns that you know about.

Who Uses Cassandra?

Cassandra is in wide use around the world, and usage is growing all the time. Companies like Netflix, eBay, Twitter, Reddit, and Ooyala all use Cassandra to power pieces of their architecture, and it is critical to the day-to-day operations of those organizations. To date, the largest publicly known Cassandra cluster by machine count has over 300TB of data spanning 400 machines.

Because of Cassandra’s ability to handle high-volume data, it works well for a myriad of applications. This means that it’s well suited to handling projects from the high-speed world of advertising technology in real time to the high-volume world of big-data analytics and everything in between. It is important to know your use case before moving forward to ensure things like proper deployment and good schema design.

Is Cassandra Right for Me?

This isn’t a very easy question to answer. Using Cassandra requires thinking in a different way about how you store data.
While there are rows and columns, Cassandra is, at its base, a key/value store. There is no built-in full text search; there are no B-tree indexes or data manipulation functions. One of the biggest differences between Cassandra and standard SQL RDBMSs is that there are no manipulation functions. These include SUM, GROUP, JOIN, MAX, MIN, and any other method you would use to modify the data at query time.

While deciding if Cassandra is a good fit for your use case, know that a lot of data manipulation can be achieved at write-time rather than read-time. This, of course, means that you will be storing different views of the same data in multiple places. This is not necessarily a bad thing. One example of this is to use counter columns where you would need aggregation. This is as easy as incrementing a value for each of the different ways you want to see your data. This pattern does require that you know what questions you want to ask ahead of time; if you need ad hoc data analysis in real time, Cassandra may not be the right fit.

Cassandra Terminology

In order to understand Cassandra, a good place to start is the vocabulary.

Cluster

A cluster is two or more Cassandra instances working together. These instances communicate with each other using the Gossip protocol.

Homogeneous Environment

Cassandra is considered homogeneous. This means that each and every Cassandra node contains everything required to complete a cluster. This differs from other systems such as HBase, which have master servers, region servers, ZooKeeper servers, and other different types of nodes. With Cassandra, expanding a cluster is as easy as adding a new node that is identical to every other node with the exception of the configuration and data for which it is responsible. This takes some complexity out of managing an infrastructure with many nodes.

Node

A node is an instance of Cassandra. A Cassandra cluster is made up of many nodes.
If you are building a test cluster on a single machine and have multiple instances of Cassandra running, each instance would be considered a node.

Replication Factor

Replication factor (RF) is a setting on a keyspace that determines how many copies of the data will reside in the cluster. A replication factor of 3 means that there will be three copies of the data within a cluster. The replication factor also determines the number of nodes that must respond when using quorum reads/writes. A quorum read/write means that the query will be sent to (RF/2 + 1) nodes. Given an RF of 3, the query will be sent to two nodes (decimals are always rounded down). If you always do quorum reads and writes, you will always have consistent responses, because every read quorum overlaps every write quorum by at least one node, so at least one node answering the read has the latest data.

Tunable Consistency

Because different reads/writes may have different needs in terms of consistency, you can specify the consistency at read/write-time. Consistency level (CL) ANY is for writes only and ensures that the write will persist on any server in the cluster. CL ONE ensures that at least one server within the replica set will persist the write or respond to the read; this is the minimum consistency level for reads. CL QUORUM means the read/write will go to half of the nodes in the replica set plus one. CL LOCAL_QUORUM is like QUORUM but applies to only those nodes within the same data center. CL EACH_QUORUM is like QUORUM but ensures a quorum read/write on each of the data centers. CL ALL ensures that all nodes in a replica set will receive the read/write.

Due to the nature of quorum reads/writes, the minimum size cluster you can have, and still survive a single node outage with consistent reads/writes, is a three-node cluster with a replication factor of 3. This means that each quorum read/write goes to two nodes; the third node will be eventually consistent. This allows you to lose one node and still have consistent data.
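The quorum arithmetic described above can be checked with a few lines of Python. This is a minimal sketch rather than anything that ships with Cassandra: the function names quorum and tolerable_failures are hypothetical and exist only for this illustration.

```python
def quorum(replication_factor: int) -> int:
    """Nodes that must respond for a quorum: RF/2 + 1, with the division rounded down."""
    return replication_factor // 2 + 1


def tolerable_failures(replication_factor: int) -> int:
    """Replicas that can be down while a quorum can still be reached."""
    return replication_factor - quorum(replication_factor)


# Print the quorum size and outage tolerance for a few replication factors.
for rf in (1, 2, 3, 5):
    print(f"RF={rf}: quorum={quorum(rf)}, can lose {tolerable_failures(rf)} replica(s)")
```

With an RF of 2 the quorum is still two nodes, so no replica can be lost; this is why a replication factor of 3 is the smallest that survives a single-node outage at quorum.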
In the event that a consistency level cannot be met (e.g., enough nodes are down that the read/write cannot be guaranteed), Cassandra will respond with an UnavailableException. If this happens, the application can decide to lower the consistency level and try again or handle the error state.

Our Hope

We know there is a lot to learn when diving into Cassandra. Just like any other distributed system, it can be complex. But by the end of this book, we hope to have simplified it enough for you not only to build a new application based on Cassandra but also to be able to administer the cluster supporting your application.

We will walk you through building an application that will show the basics of data modeling, schema design, and common usage patterns best suited for Cassandra. We will cover the basics of what languages are available for use with Cassandra and how to get going with some of the most common ones. This book is geared toward people who are looking to get up and running on Cassandra quickly. The goal is to keep to the facts and mentalities that will enable you to design and build your own applications and administer your own clusters as quickly as possible.

2 Installation

Cassandra is a Java application at its core. Because of this, there are certain considerations that need to be taken into account when installing it. This chapter will highlight the different installation types for the base binary and packages. We will go over the important configuration items and files. Then, once Cassandra is installed, we will dive into setting up single- and multinode clusters, assigning the proper tokens for a cluster, and then ensuring that our cluster is up and running.

Prerequisites

Cassandra requires the most stable release of Java 1.6. While it may run on Java 1.7, it has not been fully tested and vetted for that version.
If you are running multiple versions of Java on a single system, set the JAVA_HOME variable to the path of the 1.6 version and add that version’s bin directory to the beginning of the user’s PATH.

There are several directories that you should be aware of when installing Cassandra. The data directories contain the SSTable files for all ColumnFamilys. This folder defaults to /var/lib/cassandra/data. The data directories should be on a separate volume from the system volume. The CommitLog directories store Cassandra’s append-only data; the default directory for the CommitLog is /var/lib/cassandra/commitlog. This directory should not reside on the same volume as the system or the data directory. The CommitLog relies on the performance of append-only files, and doing random seeks during append-only writes will greatly affect the write performance. The saved cache’s default directory is located in /var/lib/cassandra/saved_caches. This directory is where the key/row caches get saved for faster loading on start-up. By default, Cassandra writes all of its logs to /var/log/cassandra. All directories need to be writable by the user who is running Cassandra.

Installation

Installation coverage includes installation from Debian and RedHat/CentOS/Oracle packages and from binaries.

Debian

Add the following to /etc/apt/sources.list.d/cassandra.sources.list:

    deb stable main

Install DataStax Community:

    curl -L | sudo apt-key add -
    sudo apt-get update
    sudo apt-get install dsc12

The Cassandra service will start automatically.
RedHat/CentOS/Oracle

Make sure you have EPEL (Extra Packages for Enterprise Linux) installed:

    rpm -Uvh

Add a Yum repository for DataStax in /etc/yum.repos.d/datastax.repo:

    [datastax]
    name = DataStax Repo for Apache Cassandra
    baseurl =
    enabled = 1
    gpgcheck = 0

Install DataStax Community using Yum:

    yum install dsc12

Start DataStax Community (as a single-node cluster):

    sudo dsc cassandra

From Binaries

Download the .tar archive:

    curl -OL

Unpack the .tar archive for DataStax Community:

    tar -xzvf dsc.tar.gz

Go to the install directory:

    cd dsc-cassandra-1.2.x

Start DataStax Community from the install directory:

    sudo bin/cassandra

Configuration

The Cassandra configuration files, by default, are kept in the conf directory in the application path, or if you install via the .deb or .rpm packages, they will be in the /etc/cassandra directory. The cassandra.yaml file contains most of the configuration items required to successfully set up a Cassandra cluster. The following list describes each of the most important options in the cassandra.yaml file and their defaults:

cluster_name
Default: “Test Cluster.” The name of this cluster. The name of the cluster determines which cluster a node will belong to and prevents the node from joining an unwanted cluster by mistake.

num_tokens
Default: 256. The number of tokens randomly assigned to this node. When virtual nodes (vnodes) are used, this tells Cassandra how much data, in proportion to the cluster, this node owns. Given a three-node cluster, if one node has ten tokens and the other two have five tokens, the node with ten tokens will own 50% of the data in the entire cluster. One may want to do this when using commodity hardware that may not all be the same.

initial_token
Default: blank. When vnodes are not being used, or there is only one token per node, the initial token specifies where in the range this node belongs.
If this field is left blank, Cassandra will try to determine which nodes in the cluster have the largest load, and it will bisect the range of those nodes. If Cassandra cannot determine which nodes have the highest load, it will select a random token. Because of this, when setting up a new cluster, it is important to manually calculate the tokens for each node that will be in the cluster.

authenticator
Default: org.apache.cassandra.auth.AllowAllAuthenticator. This chooses the Java class used for authentication with Cassandra. By default, there is no authentication for a node in Cassandra. Optionally, you may specify org.apache.cassandra.auth.PasswordAuthenticator. That will keep usernames and hashed passwords in the system_auth.credentials table. It is recommended to use a replication factor of 2 or higher for the system_auth keyspace when using the PasswordAuthenticator to prevent data loss in the event of an outage.

authorizer
Default: org.apache.cassandra.auth.AllowAllAuthorizer. This chooses the Java class responsible for limiting and granting permissions to Cassandra objects. By default, the AllowAllAuthorizer turns off authorization. Optionally, the CassandraAuthorizer may be used, in which case the permissions will be stored in the system_auth.permissions table.

permissions_validity_in_ms
Default: 2000. When using the Authorizer, this specifies how long to cache permissions. If permissions do not change very often, increase this value to increase the read/write performance. This is ignored when using the AllowAllAuthorizer.

partitioner
Default: org.apache.cassandra.dht.Murmur3Partitioner. This specifies the Java class that is responsible for partitioning the data among the nodes. The Murmur3Partitioner is an extremely fast hash-based partitioner for random distribution. Optionally, one may also choose the ByteOrderedPartitioner, which orders the keys by their byte value, or the CollatingOPP, which orders based on the U.S.
English value. Note that the ordered partitioners make range slices possible but may also lead to hot spots. If you change this parameter, you will destroy all data in the data directories.

data_file_directories
Default: /var/lib/cassandra/data. Where Cassandra should store the data on disk.

commitlog_directory
Default: /var/lib/cassandra/commitlog. Where Cassandra should store the CommitLogs. This directory should reside on a different volume from the data_file_directories.

disk_failure_policy
Default: stop. When a disk fails, Cassandra can react differently depending on need. By default, Cassandra will stop Gossip and Thrift on the node, leaving it effectively dead. Optionally, you may specify best_effort, which will attempt to use the remaining good disks. The best_effort option is best when used with JBoD (“just a bunch of disks”) configurations. You may also specify ignore, which will respond with a failure when attempting to access the node.

saved_caches_directory
Default: /var/lib/cassandra/saved_caches. Specifies the directory in which to store the saved caches.

commitlog_sync
Default: periodic. By default, Cassandra will acknowledge writes immediately, then periodically fsync those writes to disk. Optionally, if batch is specified, Cassandra will not acknowledge writes until the batch has been fsynced.

commitlog_sync_period_in_ms
Default: 10000. Specifies the interval at which Cassandra will fsync writes to disk. When using batch syncs, this value should be low as writes will block until the sync happens.

commitlog_segment_size_in_mb
Default: 32. This specifies how large the CommitLog will grow before a new file is created.

seed_provider
Default: org.apache.cassandra.locator.SimpleSeedProvider. Specifies the Java class that will provide the seeds that will allow nodes to autodetect the cluster.
The SimpleSeedProvider by default takes a parameter of seeds, which is a comma-delimited list of nodes to act as seed nodes. When running a multiple-node cluster, it is important to have as many seeds as possible so new nodes will be able to bootstrap in the event of an outage of a seed node.

concurrent_reads
Default: 32. The number of concurrent reads that are allowed to take place. Because heavy read loads can pull large amounts of data from disk, the reads are going to be I/O bound. A general rule of thumb is to set this value to 16 * the number of disks in use by data_file_directories.

concurrent_writes
Default: 32. The number of concurrent writes. Because writes are appended to the CommitLog, they are almost never I/O bound. The general rule of thumb for concurrent writes is 16 * the number of cores in the machine.

memtable_total_space_in_mb
Default: not specified. When specified, Cassandra will flush the largest MemTable when this limit has been reached. When left unspecified, Cassandra will flush the largest MemTable when it reaches one-third of the heap.

listen_address
Default: localhost. This is the address that the host listens on. When left unspecified, the listen address will default to the local address. In most cases, this will work. If left at localhost, other nodes may not be able to communicate.

Cluster Setup

Simply installing the package and running Cassandra will run a single-node cluster. The default configuration options will all be suitable with the exception of listen_address, which needs to be changed from localhost in order to take external connections.

Multinode clusters are slightly more complex to set up as the new nodes need to know about the existing nodes in the cluster. New Cassandra nodes discover information about the other nodes via a protocol called Gossip.
To get everything working correctly, we will need to modify a few of the configuration options to ensure that Gossip will be able to communicate properly and to ensure that your data is distributed properly.

When bringing a new node into the cluster, you must specify a “seed node.” The seed nodes are a set of nodes that are used to give information about the cluster to newly joining nodes. The seed nodes should be stable and should also point to other seed nodes; that is, there should be no single seed node that will point to itself as the seed node. The seed nodes are specified in the seed_provider section of the configuration file.

The second option to specify is the initial_token. A token represents a node’s place in the cluster, as well as how much data that node is supposed to own. The token is a number between 0 and (2^127 - 1). This initial token is used only the first time the node boots, after which you will need to use nodetool move to change the token. In most situations, you will want to have a balanced cluster, where every node owns the same amount of data. When creating a balanced cluster, the formula to create the tokens is 2^127/K * N, where K is the total number of nodes in the cluster and N is the zero-based index of the node you are calculating for. Listing 2.1 shows pseudo code representing how to generate tokens for a six-node cluster.

Listing 2.1 Pseudo Code to Print Out Even Tokens in a Six-Node Cluster

    NODES = 6
    for each n in 0 to NODES - 1:
        token = 2^127 / NODES * n
        print token

Once the cluster is up and running, you can use the command-line tool nodetool to show various statistics about the nodes in the cluster. The option for nodetool to describe the ring, its tokens, load, and the effective ownership of the nodes in the cluster is nodetool ring. Just run the command from any node in the ring to get an output of the entire ring’s statistics.
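The pseudo code in Listing 2.1 translates almost directly into Python. The sketch below is an illustration rather than a tool that ships with Cassandra; the function name balanced_tokens is an assumption made for this example. It uses integer division so that the 39-digit tokens are not rounded by floating-point arithmetic.

```python
def balanced_tokens(node_count: int) -> list:
    """Return evenly spaced initial tokens: (2^127 / K) * N for N = 0..K-1."""
    step = 2**127 // node_count  # integer division keeps full precision
    return [step * n for n in range(node_count)]


# Print one initial_token per node for a six-node cluster.
for node_index, token in enumerate(balanced_tokens(6)):
    print(f"node {node_index}: initial_token = {token}")
```

For six nodes this yields 0, 28356863910078205288614550619314017621, and so on up to 141784319550391026443072753096570088105, the same tokens that appear in the nodetool ring output of Listing 2.2.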
Listing 2.2 shows the usage of nodetool ring and example output for a six-node cluster.

Listing 2.2 Using nodetool to Show Cluster State

    $ nodetool ring
    Address  DC           Rack   Status  State   Load      Owns    Token
                                                                   141784319550391026443072753096570088105
             datacenter1  rack1  Up      Normal  6.82 KB   16.67%  0
             datacenter1  rack1  Up      Normal  9.09 KB   16.67%  28356863910078205288614550619314017621
             datacenter1  rack1  Up      Normal  13.5 KB   16.67%  56713727820156410577229101238628035242
             datacenter1  rack1  Up      Normal  15.57 KB  16.67%  85070591730234615865843651857942052863
             datacenter1  rack1  Up      Normal  13.52 KB  16.67%  113427455640312821154458202477256070484
             datacenter1  rack1  Up      Normal  6.82 KB   16.67%  141784319550391026443072753096570088105

Summary

Installing Cassandra is made easier by the use of common package management systems. While the default configuration options will get you up and running, it is important to know each of the configuration options available and what they mean. In this chapter, we discussed only the basic, most commonly used configuration options. Chapter 6, “Performance Tuning,” will go deeper into the configuration options and how to use each of them to get the best performance out of your Cassandra cluster.

3 Data Modeling

When creating a data model for your keyspace, the most important thing to do is to forget everything you know about relational data modeling. Relational data models are designed for efficient storage, relational lookups, and associations between concerns. The Cassandra data model is designed for raw performance and storage of vast amounts of data.

Unlike relational databases, the data model for Cassandra is based on the query patterns required. This means that you have to know the read/write patterns before you create your data model. This also applies to indexes. Indexes in Cassandra are a requirement for specific types of queries, unlike a relational database where indexes are a performance-tuning device.
In this chapter, we will highlight some key differences between creating a relational model and a Cassandra model. We will then dive into an example data model for storing time-series data.

The Cassandra Data Model

To understand how to model in Cassandra, you must first understand how the Cassandra data model works. Cassandra gets its data distribution from the Dynamo whitepaper by Amazon and its data representation from the BigTable whitepaper by Google.

When creating a table using CQL, you are not only telling Cassandra what the name and type of data are, you are also telling it how to store and distribute your data. This is done via the PRIMARY KEY operator. The PRIMARY KEY tells the Cassandra storage system to distribute the data based on the value of this key; this is known as a partition key. When there are multiple fields in the PRIMARY KEY, as is the case with compound keys, the first field is the partition key (how the data is distributed) and the subsequent fields are known as the clustering keys (how the data is stored on disk). Clustering keys allow you to pregroup your data by the values in the keys. Using compound keys in Cassandra is commonly referred to as “wide rows.” “Wide rows” refers to the rows that Cassandra is storing on disk, rather than the rows that are represented to you when you make a query.

Figure 3.1 shows how the data in Listing 3.1 might be stored in a five-node cluster.

[Figure 3.1 Illustration of how data may be stored using a PRIMARY KEY]

Listing 3.1 Illustration of How Data May Be Stored Using a Single PRIMARY KEY

    CREATE TABLE animals (
      name TEXT PRIMARY KEY,
      species TEXT,
      subspecies TEXT,
      genus TEXT,
      family TEXT
    );

    SELECT * FROM animals;

    name | family   | genus | species          | subspecies
    dog  | Canidae  | Canis | C. lupus         | C. l. familiaris
    cat  | Felidae  | Felis | F. catus         | null
    duck | Anatidae | Anas  | A. platyrhynchos | null
    wolf | Canidae  | Canis | C. lupus         | null

Figure 3.2 shows how the data in Listing 3.2 might be stored in a five-node cluster using a COMPOUND KEY.

[Figure 3.2 Illustration of how data may be stored using a COMPOUND KEY]

Listing 3.2 Illustration of How Data May Be Stored Using a COMPOUND KEY

    CREATE TABLE animals (
      name TEXT,
      species TEXT,
      subspecies TEXT,
      genus TEXT,
      family TEXT,
      PRIMARY KEY(family, genus, name)
    );

    SELECT * FROM animals;

    name | family   | genus | species          | subspecies
    dog  | Canidae  | Canis | C. lupus         | C. l. familiaris
    wolf | Canidae  | Canis | C. lupus         | null
    cat  | Felidae  | Felis | F. catus         | null
    duck | Anatidae | Anas  | A. platyrhynchos | null

As you can see in Figure 3.2, when we use a COMPOUND KEY, the data for wolf and for dog is stored on the same server. This is because we changed the partition to “family” and clustered on “genus” (with “name” as a final clustering column, so that dog and wolf, which share a family and genus, remain distinct rows). Literally, this means that the data for each family will be stored on the same replica sets and presorted, or clustered, by the genus. This will allow for very fast lookups when the family and genus for an animal are known.

Model Queries—Not Data

The first thing you should consider when creating a data model in Cassandra is the performance characteristics of the query patterns. In Cassandra, rows are not segmented across nodes. This means that if a row exists on a node, the entire row will exist on that node. If you have a heavy read or write load on a particular key, this can lead to hot spots. Hot spots occur when a particular key (row) gets so many queries that it causes the load on the machine to spike. High load on a given machine can cause cluster-wide complications as the communication channels start to back up. Consideration also needs to be given to the row size. A single row has to fit on disk; if you have a single row containing billions of columns, it may extend past the amount of available disk space on the drive.
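To make the effect of the partition key concrete, the following Python sketch maps keys onto a ring of five hypothetical nodes. It is only an illustration: the node names are invented, and MD5 stands in for Cassandra’s actual partitioner (Murmur3Partitioner by default), so the real placement of any given key will differ.

```python
import bisect
import hashlib

RING_SIZE = 2**127
NODES = ["node0", "node1", "node2", "node3", "node4"]  # hypothetical node names
# Evenly spaced tokens, one per node, as in a balanced cluster.
TOKENS = [RING_SIZE // len(NODES) * n for n in range(len(NODES))]


def token_for(partition_key: str) -> int:
    """Hash a partition key onto the ring (MD5 is a stand-in hash, not Murmur3)."""
    digest = hashlib.md5(partition_key.encode("utf-8")).hexdigest()
    return int(digest, 16) % RING_SIZE


def node_for(partition_key: str) -> str:
    """Pick the first node whose token is >= the key's token, wrapping around the ring."""
    index = bisect.bisect_left(TOKENS, token_for(partition_key))
    return NODES[index % len(NODES)]


animals = {"dog": "Canidae", "wolf": "Canidae", "cat": "Felidae", "duck": "Anatidae"}
for name, family in animals.items():
    print(f"{name}: by name -> {node_for(name)}, by family -> {node_for(family)}")
```

Whatever nodes the individual names hash to, dog and wolf always land on the same node when the family is the partition key, which mirrors Figure 3.2. The same property is what produces hot spots: every read and write for a heavily used key goes to the one node (and its replicas) that owns it.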
In Listing 3.3, you can see a typical way to store event logs in a relational database. There is an atomically incrementing ID field, an event time, an event type ID that relates to an event type, and some information about the event. While you may be able to mimic this model in Cassandra, it would not be performant and would cause queries that would require two lookups (one for the event row and the other for the event type), as Cassandra does not support joins.

Listing 3.3 Example of a Relational Data Model for Log Storage

    CREATE TABLE events (
      id INT PRIMARY KEY,
      time TIME,
      event_type INT references event_types(id),
      data text
    );

    SELECT * FROM events;

    id | time           | event_type | data
    1  | 16:41:33.90814 | 4          | some event data
    2  | 16:59:48.9131  | 2          | some event data
    3  | 17:12:12.12758 | 4          | some event data
    4  | 17:32:17.83765 | 1          | some event data
    5  | 17:48:57.10934 | 0          | some event data

To get around the lack of joins, we can just store the event_type value in the column every time. This denormalization of data is certainly not frowned upon when using Cassandra. Modeling for Cassandra should follow the idea that disk space is cheap, so duplicating data (even multiple times) should not be an issue. In fact, “normalization” of data is an anti-pattern in Cassandra.

The Cassandra model would be similar; however, there are a few key differences that will make a world of difference in performance and usability. Listing 3.4 shows an exact copy of the relational version, but storing each event type as the value rather than the ID of a relation. The primary concern with this model is that each row has a single event. This makes it very difficult to find events that belong to a particular time or event type. Basically, you have to know the ID of the event to get its information without doing a full scan of the ColumnFamily.
Listing 3.4 Example of Cassandra Data Model for Log Storage (Copy of RDBMS)

    CREATE TABLE events (
      id UUID PRIMARY KEY,
      time TIMESTAMP,
      event_type TEXT,
      data text
    );

We can solve some of the issues here with indexes; however, the code would not be as performant as it could be. Let’s say you would like to get all events for a particular hour. We can easily add an index to the time field, but this will cause excessive load, as every event will need to be pulled from different rows scattered about the cluster. To compensate for this, since we know the hour we want ahead of time, we can key off that, and by using dynamic tables in Cassandra, we can ensure that every event for that particular hour exists, physically, on the same row. When you specify multiple fields in the PRIMARY KEY, Cassandra will key off the first field and every subsequent field will be a part of the column name. In Listing 3.5, we restructure the code to store fields by hour in the table to make hourly lookups of events easier.

Listing 3.5 Example of Cassandra Data Model for Log Storage (Low Traffic)

    CREATE TABLE events (
      hour TIMESTAMP,
      id UUID,
      time TIMESTAMP,
      event_type TEXT,
      data text,
      PRIMARY KEY(hour, id)
    );

With this new model, we can look up all events from a single hour easily as we can ensure that everything that happened within that hour exists in a single row on disk. This may be a perfectly suitable model for a low-volume application. However, if you have heavy reads and/or writes, you may need to segment the row into multiple rows. Seeing as an event per row is difficult to query, and a row every hour can lead to hot spots, as all reads and writes are going to the same files on disk, we can segment the row into multiple rows that are easy to read as long as we have a piece of information to query by; in this case, it is event_type. We can further improve performance by ensuring that the order is stored by event time rather than ID.
This will make it possible to do range queries based on the time of an event, without scanning the entire row. In Listing 3.6, we will use a composite row key and clustering order to remove hot spots and order by event time in a descending fashion.

Listing 3.6 Example of Cassandra Data Model for Log Storage (Optimized)

    CREATE TABLE events (
      hour TIMESTAMP,
      id TIMEUUID,
      time TIMESTAMP,
      event_type TEXT,
      data text,
      PRIMARY KEY((hour, event_type), time)
    ) WITH CLUSTERING ORDER BY (time DESC);

    SELECT * FROM events;

    hour                | event_type | time                | data      | id
    2013-06-13 11:00:00 | click      | 2013-06-13 11:00:00 | some data | 3...
    2013-06-13 11:00:00 | page_view  | 2013-06-13 11:00:01 | some data | 2...
    2013-06-13 11:00:00 | error      | 2013-06-13 11:00:05 | some data | 0...
    2013-06-13 11:00:00 | redirect   | 2013-06-13 11:00:09 | some data | 1...

Now that we have a mechanism for storing our raw events, we may also want to track metrics around the events. Cassandra does not have the ability to create aggregate metrics in an ad hoc manner. In order to track specific aggregate information, we first need to know what aggregate metrics we would like to track. In this example, we will track event types by hour. A good candidate for this type of tracking is Cassandra counter columns. Listing 3.7 shows the table creation for a counter ColumnFamily.

Listing 3.7 Example of Counter Column Table Creation

    CREATE TABLE event_metrics (
      hour TIMESTAMP,
      event_type TEXT,
      count COUNTER,
      PRIMARY KEY(hour, event_type)
    );

    CREATE TABLE url_metrics (
      hour TIMESTAMP,
      url TEXT,
      count COUNTER,
      PRIMARY KEY(hour, url)
    );

Now that we have created the event_metrics and url_metrics tables that contain our counters, we can update the counters at the same time using a BATCH statement. Listing 3.8 shows the insertion of the raw event and updating of the counters in a single atomic batch.
Listing 3.8 Example of Using an Atomic Counter BATCH to Insert and Update

    INSERT INTO events (hour, id, time, event_type, data)
    VALUES ('2013-06-13 11:00:00', NOW(), '2013-06-13 11:43:23', 'click', '{"url":""}');

    BEGIN COUNTER BATCH
      UPDATE event_metrics SET count = count + 1
        WHERE hour = '2013-06-13 11:00:00' AND event_type = 'click';
      UPDATE url_metrics SET count = count + 1
        WHERE hour = '2013-06-13 11:00:00' AND url = '';
    APPLY BATCH;

Collections

Cassandra also includes collections as part of its data model. Collections are a complex type that can provide flexibility in querying.

Sets

Cassandra sets provide a means of keeping a unique set of items without the need for read-before-write. This means that one can easily solve the problem of tracking unique e-mail addresses or unique IP addresses. Sets are ordered by the natural order of the type selected. Listing 3.9 shows how to create a table with a set type and query it.

Listing 3.9 Example of Using a Set

    CREATE TABLE users (
      email TEXT PRIMARY KEY,
      portfolios SET<UUID>,
      tickers SET<TEXT>
    );

    UPDATE users
      SET portfolios = portfolios + {756716f7-2e54-4715-9f00-91dcbea6cf50},
          tickers = tickers + {'AMZN'}
      WHERE email = '';

    UPDATE users
      SET portfolios = portfolios + {756716f7-2e54-4715-9f00-91dcbea6cf50},
          tickers = tickers + {'GOOG'}
      WHERE email = '';

    email | portfolios                             | tickers
          | {756716f7-2e54-4715-9f00-91dcbea6cf50} | {'AMZN', 'GOOG'}

    UPDATE users SET tickers = tickers - {'AMZN'} WHERE email = '';

    email | portfolios                             | tickers
          | {756716f7-2e54-4715-9f00-91dcbea6cf50} | {'GOOG'}

    DELETE tickers FROM users WHERE email = '';

    email | portfolios                             | tickers
          | {756716f7-2e54-4715-9f00-91dcbea6cf50} | null

Lists

When uniqueness is not required, and maintaining order is required, Cassandra lists come in handy. Let’s say we want to allow our users to specify the top five tickers from the previous example. Listing 3.10 shows an example of lists.
Listing 3.10 Example of Using Lists

    ALTER TABLE users ADD top_tickers list<text>;

    UPDATE users SET top_tickers = ['GOOG'] WHERE email = '';
    UPDATE users SET top_tickers = top_tickers + ['AMZN'] WHERE email = '';

    email | portfolios          | tickers | top_tickers
          | {756716f7-2e54...}  | null    | ['GOOG', 'AMZN']

    UPDATE users SET top_tickers[1] = 'FB' WHERE email = '';

    email | portfolios          | tickers | top_tickers
          | {756716f7-2e54...}  | null    | ['GOOG', 'FB']

    UPDATE users SET top_tickers = top_tickers - ['FB'] WHERE email = '';

    email | portfolios          | tickers | top_tickers
          | {756716f7-2e54...}  | null    | ['GOOG']

Maps

Cassandra maps provide a dictionary-like object with keys and values. Maps are useful when you want to store table-like data within a single Cassandra row. This can help eliminate the pain of not having joins, or remove the need to store JSON data within a single column value. Listing 3.11 shows an example of using maps.

Listing 3.11 Example of Using Maps

    ALTER TABLE users ADD ticker_updates map<text, timestamp>;

    UPDATE users
      SET ticker_updates = { 'AMZN':'2013-06-13 11:42:12' }
      WHERE email = '';

    email | portfolios    | ticker_updates
          | {756716f7...} | {'AMZN': '2013-06-13 11:42:12-0400'}

    UPDATE users
      SET ticker_updates['GOOG'] = '2013-06-13 12:51:31'
      WHERE email = '';

    email | portfolios    | ticker_updates
          | {756716f7...} | {'AMZN': '2013-06-13 11:42:12-0400', 'GOOG': '2013-06-13 12:51:31-0400'}

    DELETE ticker_updates['AMZN'] FROM users WHERE email = '';

    email | portfolios    | ticker_updates
          | {756716f7...} | {'GOOG': '2013-06-13 12:51:31-0400'}

Summary

Data modeling in Cassandra may seem counterintuitive to someone who is used to a relational database. Certain concessions need to be made to gain performance in high-volume workloads.
The main points to take away from this chapter are these: relational database modeling techniques will almost never apply; you should model your queries, not your data; and denormalization and duplication of data are not bad (in fact, they are recommended). Also, keep in mind that “normalization” of your data, in any way, is almost never recommended. Collections can be very powerful, but they may impact performance when it comes to very large data sets.

4 CQL

Due to the complex data structures that Cassandra has to offer, it is critical that there be a simple way to manipulate the data you have stored. SQL seemed to be the obvious choice, but there are many things that SQL offers in the way of RDBMSs that Cassandra just cannot yet do. Enter CQL.

Cassandra Query Language (CQL) strives to be as close to SQL as possible. Given that Cassandra is a nonrelational database, a fully featured SQL construct is not possible. The main thing to note about CQL is that it has no concept of GROUP or JOIN, and a very limited implementation of ORDER BY.

This chapter will focus on data creation and manipulation using CQL 3. We will show the parallels of CQL with other features of Cassandra and show the end-result data structures that are created when defining different CQL schemas. We will first discuss the differences between the major versions of CQL and how they represent the underlying data structures. We will then start our focus on CQL 3, moving from data types to available commands, usage, and finally to example data structures and their correlation to Cassandra storage mechanisms.

A Familiar Way of Doing Things

Before CQL there was Thrift. Thrift is a multilanguage Remote Procedure Call (RPC) layer that was designed to make client integration as easy as wrapping a library. As the complexity of Cassandra grew, more and more client drivers started falling behind in new feature implementation.
CQL arose out of the need for a server-side API that would make keeping up with client drivers much easier and require less frequent updates.

CQL 1

CQL 1 was a very basic implementation. It had the ability to create ColumnFamilies and indexes and to select and insert data. It did not have support for composite columns, wide rows, or other advanced data types. The tokens used in the query language were the same as those used in the Thrift client. This made it easy to move from Thrift to CQL, as the ideas were the same, just expressed in an SQL-like statement.

CQL 2

CQL 2 is often referred to as “a language for wide rows.” CQL 2 filled in the gaps that CQL 1 had. This meant that one could create a wide-row ColumnFamily and access it using CQL. Arguably the most used and most important feature of Cassandra is the ability to get a slice of columns from a row. This enables a user to create very wide rows where the column name may not be known ahead of time. In many ways, CQL 2 was just an SQL-like syntax of the Thrift API representation of data. This meant the user still had to be aware of the underlying data structures and concepts that come along with storing data in Cassandra. This made CQL 2 easy to implement for current Cassandra users as it mapped directly to the storage engine.

There were, however, many problems with this. Because the number of columns returned was indefinite, there had to be two LIMIT clauses, one for the row limit and another for the column limit. This created several issues when there could be different column counts in rows. Functions like COUNT would not work either, for the same reason. As a result, COUNT was never implemented. You also could not specify indexes on wide rows because of the underlying limitation that indexes could only be on predefined columns. Since the columns are not defined ahead of time, there is no way to specify an index for them.
Also, if you were using CompositeType columns in your wide rows, the column names needed to be unpacked manually. This led to the clients’ or the applications’ needing to know the composite packing structure for CQL.

CQL 3

CQL 3 compensates for the shortcomings of the previous versions by actually transposing the wide rows and unpacking them into named columns. This means that many rows can be visualized via CQL 3 from a single physical row in the data structure. CQL 3 can accomplish this by attaching metadata to the ColumnFamily and to each row. Since CQL 3 has special metadata attached, this presents a problem for backward compatibility with legacy tables. All of your current tables already exist in the CQL 3 space as COMPACT STORAGE or legacy tables.

In CQL 3, the PRIMARY KEY determines whether a ColumnFamily is static or dynamic. A static ColumnFamily will have a single PRIMARY KEY, while a dynamic ColumnFamily will have a compound PRIMARY KEY. When creating a dynamic ColumnFamily, the first part of the compound PRIMARY KEY is known as the partition key. In the Thrift API, this maps directly to the row key. Each subsequent part of the compound PRIMARY KEY is the column’s name. If there are more than two parts to the PRIMARY KEY, the column name becomes a column of CompositeType and each of the parts after the partition key is a part of the composite. CQL 3 uses this information to generate the rows with proper column names.

Data Types

CQL 3 supports many data types, including all data types available to Cassandra. Table 4.1 shows the supported data types and their associated meanings.

Dates

When using the Date type in Cassandra, you can specify the timestamp as either an integer representation of milliseconds since the UNIX epoch, or one of several variations on the ISO 8601 date format.
Here are a few examples of proper dates, all showing the date June 13, 2013:

- 1371081600000
- 2013-06-13 00:00+0000
- 2013-06-13 00:00:00+0000
- 2013-06-13T00:00+0000
- 2013-06-13T00:00:00+0000
- 2013-06-13 00:00
- 2013-06-13 00:00:00
- 2013-06-13T00:00
- 2013-06-13T00:00:00
- 2013-06-13
- 2013-06-13+0000

Table 4.1 CQL 3 Data Types

Type      | Description
ascii     | ASCII character string.
bigint    | 64-bit signed long.
blob      | Arbitrary bytes (no validation).
boolean   | True or false.
counter   | Counter column (64-bit signed value). See the “Counters” section for details.
decimal   | Variable-precision decimal.
double    | 64-bit IEEE 754 floating point.
float     | 32-bit IEEE 754 floating point.
inet      | An IP address. It can be either 4 bytes long (IPv4) or 16 bytes long (IPv6). There is no inet constant; IP addresses should be inputted as strings.
int       | 32-bit signed int.
text      | UTF-8 encoded string.
timestamp | A timestamp. Timestamps can be entered as either a string date or an integer as the number of milliseconds since the UNIX epoch (January 1, 1970, UTC).
timeuuid  | Type 1 UUID. This is generally used as a “conflict-free” timestamp.
uuid      | Type 1 or type 4 UUID.
varchar   | UTF-8 encoded string.
varint    | Arbitrary-precision integer.

It is important to note that when the time zone is omitted, the time zone used will be the one that is configured for that particular Cassandra node. This could have implications for data centers that are configured in different time zones. When the time is omitted, the time of 00:00:00 will be used in its place.

Counters

When working with counters, it is very important to note that you cannot create a table with columns that have a type of counter mixed with any other type. Tables created for counters are physically separate from other types of tables. The counter type may not be set; you can increment or decrement the counter only by a specified amount. Counters may not be part of the PRIMARY KEY of the table.
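To make the date formats listed under Dates more concrete, the following Python sketch converts several of the string forms that carry an explicit +0000 offset into the integer millisecond form. It is an illustration only and is not part of Cassandra or any driver: the helper name to_epoch_millis and the FORMATS list are made up for this example, and it assumes Python 3.7 or later.

```python
from datetime import datetime

# Formats with an explicit UTC offset, mirroring some of the string
# variations listed under Dates. Illustrative, not exhaustive.
FORMATS = [
    "%Y-%m-%d %H:%M%z",
    "%Y-%m-%d %H:%M:%S%z",
    "%Y-%m-%dT%H:%M%z",
    "%Y-%m-%dT%H:%M:%S%z",
    "%Y-%m-%d%z",
]

def to_epoch_millis(text):
    """Return milliseconds since the UNIX epoch for a date string with an offset."""
    for fmt in FORMATS:
        try:
            parsed = datetime.strptime(text, fmt)
        except ValueError:
            continue  # try the next candidate format
        return int(parsed.timestamp() * 1000)
    raise ValueError("unrecognized date format: " + text)

print(to_epoch_millis("2013-06-13 00:00+0000"))
print(to_epoch_millis("2013-06-13T00:00:00+0000"))
```

Both calls print 1371081600000, the integer form at the top of the list. Strings without an offset are deliberately left out of the sketch, because, as noted above, their meaning depends on the time zone configured on the node.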
TimeUUID

TimeUUID types have a few extra functions that allow you to extract the time information from the TimeUUID object.

now()

The now function when executed will return a new TimeUUID with the time of the current timestamp; this ensures that the TimeUUID created is globally unique and contains the current time when the statement was executed. This statement is effectively useless for WHERE clauses.

minTimeuuid() and maxTimeuuid()

The minTimeuuid and maxTimeuuid functions are used when querying ranges of TimeUUIDs by their embedded time. When using these methods, it is important to note that they do not generate RFC-4122-compliant UUIDs. This means that the values returned are not guaranteed to be globally unique and therefore should be used only when querying data and never for inserting data. The functions take a parameter that is compatible with the timestamp type. See Listing 4.1 for example usage.

Listing 4.1 Example Usage of minTimeuuid() and maxTimeuuid()

    SELECT * FROM events
      WHERE event_time > maxTimeuuid('2013-01-01 00:05+0000')
        AND event_time < minTimeuuid('2013-02-02 10:00+0000')

dateOf() and unixTimestampOf()

The dateOf and unixTimestampOf functions take an argument of TimeUUID and return the timestamp in it. The dateOf method will return a type of timestamp, whereas the unixTimestampOf method will return a bigint that is representative of the number of milliseconds since the UNIX epoch.

Commands

CQL 3 supports a subset of the SQL commands. In the following sections, we will describe the commands that are currently supported and show example usage of each one.

CREATE/ALTER KEYSPACE

CREATE KEYSPACE (Listing 4.2) and ALTER KEYSPACE are used to add or modify top-level strategy options for a collection of tables. When creating a keyspace, the keyspace name must be alphanumeric with a length of 32 or less.
There are two supported properties for the CREATE/ALTER KEYSPACE commands, replication and durable_writes. When specifying replication, you may choose one of two options, SimpleStrategy and NetworkTopologyStrategy. SimpleStrategy will create a replication factor that is consistent across the entire cluster, and the only option available is the replication_factor, which must be defined. NetworkTopologyStrategy allows you to decide the replication factor for each individual data center. The durable_writes option specifies whether or not to use the CommitLog when writing data. This is on by default and should not be turned off as data loss could result in the event of an outage.

Listing 4.2 Example Usage of CREATE KEYSPACE

    CREATE KEYSPACE Analytics WITH replication =
      {'class': 'SimpleStrategy', 'replication_factor': 3};

    CREATE KEYSPACE Analytics WITH replication =
      {'class': 'NetworkTopologyStrategy', 'west': 1, 'east': 3}
      AND durable_writes = false;

USE

The USE statement (Listing 4.3) switches the keyspace you are working in. This works exactly like the USE statement in SQL.

Listing 4.3 Example Usage of USE

    USE Analytics;

DROP KEYSPACE

The DROP KEYSPACE command (Listing 4.4) works exactly like the SQL DROP DATABASE command. This operation is irreversible and removes all information within the specified keyspace.

Listing 4.4 Example Usage of DROP KEYSPACE

    DROP KEYSPACE Analytics;

CREATE TABLE/COLUMNFAMILY

The CREATE TABLE statement creates a table. A table is defined as a collection of rows and columns. CREATE COLUMNFAMILY is an alias for CREATE TABLE.

Primary Keys

The PRIMARY KEY in the table definition defines the physical key in the underlying Cassandra data structure. Because of this, the PRIMARY KEY must be defined in the column definitions. Other than this, the syntax is similar to the corresponding SQL syntax.
When defining the PRIMARY KEY, if you decide to use a compound key, only the first part of the key will be used as the underlying row key. This is called the partition key. If you want to have a composite partition key, this is defined by adding a set of parentheses around the parts you would like to be the partition key. The remaining parts of the compound key will be used as the physical parts of the composite columns in the underlying data structure. These are called the clustering keys. The clustering keys will determine the order in which the columns are stored on disk. You can optionally specify a clustering order that will order the columns on disk and directly affect the ORDER BY clause.

Listing 4.5 demonstrates creating a static table in CQL 3.

Listing 4.5 Example Usage of CREATE TABLE for a Static Table

    CREATE TABLE users (
      email text PRIMARY KEY,
      first_name text,
      last_name text,
      password text
    ) WITH comment='Users information';

Listing 4.6 demonstrates creating a dynamic table in CQL 3 with an optional composite partition key.

Listing 4.6 Example Usage of CREATE TABLE for a Dynamic Table

    CREATE TABLE events (
      event_time timestamp,
      url text,
      event_id uuid,
      network_location inet,
      event_data text,
      PRIMARY KEY ((url, event_time), event_id, network_location)
    ) WITH compaction = { 'class' : 'LeveledCompactionStrategy' }
      AND comment='Event Data';

Other Options

The CREATE TABLE syntax allows for other options to be set. These settings are optional and are defined as key/value pairs in a WITH clause after the column definition. Specifying WITH COMPACT STORAGE will create a legacy table that will allow you to use your existing Thrift-created tables with CQL 3. The available options are specified in Table 4.2.

DROP TABLE

The DROP TABLE command (Listing 4.7) works exactly as the SQL DROP TABLE command does.
This operation is irreversible and removes all information within the specified table, as well as the table definition itself.

Listing 4.7 Example Usage of DROP TABLE

    DROP TABLE events;

Table 4.2 Additional Options for CREATE TABLE in CQL 3

Option                     | Default   | Description
comment                    | none      | A free-form, human-readable comment.
read_repair_chance         | 0.1       | The probability with which to query extra nodes for the purpose of read repairs.
dclocal_read_repair_chance | 0         | The probability with which to query extra nodes belonging to the same data center as the read coordinator for the purpose of read repairs.
gc_grace_seconds           | 864000    | Time to wait before garbage-collecting tombstones (deletion markers).
bloom_filter_fp_chance     | 0.00075   | The target probability of false positives of the SSTable bloom filters. Said bloom filters will be sized to provide the probability.
compaction                 |           | The compaction options to use.
compression                |           | The compression options to use.
replicate_on_write         | true      | Whether to replicate data on write. This can only be set to false for tables with counters values.
caching                    | keys_only | Whether to cache keys (“key cache”) and/or rows (“row cache”) for this table. Valid values are all, keys_only, rows_only, and none.

TRUNCATE

The TRUNCATE command (Listing 4.8) works exactly as the SQL TRUNCATE command does. This operation is irreversible and removes all information within the specified table.

Listing 4.8 Example Usage of TRUNCATE

    TRUNCATE events;

CREATE INDEX

The CREATE INDEX statement (Listing 4.9) creates a new secondary index for the column specified. Optionally, you can specify the index name prior to the ON statement. If there is existing data in the table during command execution, immediate indexing of current data will occur; after that, the indexes will automatically be updated when data is modified.
Listing 4.9 Example Usage of CREATE INDEX

    CREATE INDEX network_location_index ON events (network_location);

DROP INDEX

The DROP INDEX statement (Listing 4.10) removes an index that was created using CREATE INDEX.

Listing 4.10 Example Usage of DROP INDEX

    DROP INDEX network_location_index;

INSERT

The INSERT command (Listing 4.11) is similar to the SQL counterpart. The major difference between CQL INSERT and SQL INSERT is that CQL INSERT requires that the PRIMARY KEY be specified during statement execution.

Listing 4.11 Example Usage of INSERT Specifying a TTL

    INSERT INTO events (
      event_time, url, event_id, network_location, event_data
    ) VALUES (
      1365977131666, '', now(), '', '{"browser": "Firefox"}'
    ) USING TTL 345600;

UPDATE

The UPDATE command (Listing 4.12) is similar to the SQL UPDATE command. Just as with the INSERT statement, you must specify the PRIMARY KEY as part of the UPDATE WHERE clause. The only other difference is that in CQL the UPDATE command does not check for the existence of the row; if the row does not exist, CQL will just create it. Because of this, the application can exclusively use UPDATE and not have to use INSERT when modifying data.

Listing 4.12 Example Usage of UPDATE Specifying a TTL

    UPDATE events USING TTL 345600
      SET event_id=now(),
          network_location='',
          event_data='{"browser": "Firefox"}'
      WHERE event_time=1365977131666 AND url='';

DELETE

The DELETE statement (Listing 4.13) removes columns and rows. If columns are specified after the DELETE keyword, only those columns will be removed. If no columns are specified, the entire row will be removed. Just as with the INSERT and UPDATE statements, the PRIMARY KEY must be specified.

Listing 4.13 Example Usage of DELETE

    DELETE event_data FROM events
      WHERE event_time=1365977131666 AND url='';

BATCH

The BATCH statement (Listing 4.14) allows a user to specify multiple statements in one request.
All statements that have the same partition key will be applied atomically.

Listing 4.14 Example Usage of BATCH

    BEGIN BATCH
      UPDATE events USING TTL 345600
        SET event_id=now(),
            network_location='',
            event_data='{"browser": "Firefox"}'
        WHERE event_time=1365977131666 AND url='';
      DELETE event_data FROM events
        WHERE event_time=1365977131666 AND url='';
    APPLY BATCH;

SELECT

The SELECT statement (Listing 4.15) returns rows and columns. Just as in SQL, the SELECT statement takes a list of fields, or a “*” that tells the statement to return all columns. It can also optionally specify a WHERE clause, an ORDER BY clause, and a LIMIT clause. In addition to these clauses, there is also an optional ALLOW FILTERING clause.

Listing 4.15 Example Usage of SELECT

    SELECT * FROM events;

WHERE

The WHERE clause (Listing 4.16) specifies which rows must be queried. The syntax is just like that in SQL; however, the columns specified in the WHERE clause must be either part of the PRIMARY KEY or on a column that has a secondary index specified. In addition to this, non-equality-based expressions are not supported unless the table has been created with an ordered partitioner. Last, if a compound PRIMARY KEY is specified, the WHERE clause must have the contiguous parts of the PRIMARY KEY specified; that is to say, if the compound key has four parts, you can use parts 1, 2, and 3 in the WHERE clause but not 1 and 4, or 1, 2, and 4. This is a limitation of the way the data is stored in the underlying structure.

Listing 4.16 Example Usage of SELECT with WHERE

    SELECT * FROM events
      WHERE url='' AND event_time > 1365977131666;

ORDER BY

The ORDER BY option (Listing 4.17) allows you to change the order of the returned results. Just as in SQL, it takes the column name and either ASC or DESC for ascending order or descending order, respectively.
If the table was created with a clustering order, the available ordering options are those specified by that order or reversed; otherwise, it is limited to the order of the clustering key.

Listing 4.17 Example Usage of SELECT with ORDER BY

    SELECT * FROM events
      WHERE url='' AND event_time > 1365977131666
      ORDER BY event_id DESC;

LIMIT

The LIMIT option (Listing 4.18) restricts the number of rows returned to the number specified. This works exactly like its SQL counterpart.

Listing 4.18 Example Usage of SELECT with LIMIT

    SELECT * FROM events LIMIT 10;

ALLOW FILTERING

The ALLOW FILTERING option (Listing 4.19) enables the server to actively filter the results server-side. The default behavior of the WHERE clause is to only be able to reduce fields based on the PRIMARY KEY parts or items specified by the secondary indexes. With ALLOW FILTERING you can tell the server that it can manually reduce the results by the WHERE clause as long as at least one component of the WHERE clause is specified in a secondary index. If ALLOW FILTERING is used, it can have severe performance implications in situations where a lot of rows are returned. Due to these performance implications, this option should be used with extreme caution.

Listing 4.19 Example Usage of SELECT with ALLOW FILTERING

    SELECT * FROM events
      WHERE url='' AND network_location=''
      ALLOW FILTERING;

Example Schemas

To better understand how Cassandra physically stores data, we will provide a few example schemas. In these examples, we will show sample output, as CQL will give you. The alternating shaded and nonshaded rows will show you the physical rows as stored in the Cassandra subsystem. The different styles of text will show the possible node distribution in a three-node cluster using a partitioner that will evenly distribute the data.
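As a rough picture of what "a partitioner that will evenly distribute the data" means in a three-node cluster, the Python sketch below hashes a partition key onto a toy ring and looks up the owning node. Everything here is an assumption made for illustration: the ring size, the node names, the evenly spaced tokens, and the helpers token_for and owner are invented, and the hashing is a simplification rather than Cassandra's actual token computation.

```python
import hashlib
from bisect import bisect_left

RING_SIZE = 2 ** 127  # toy token space, chosen only for illustration

# Three placeholder nodes with evenly spaced tokens on the ring.
NODES = [(RING_SIZE // 3 * i, "node%d" % (i + 1)) for i in range(3)]
TOKENS = [token for token, _ in NODES]

def token_for(partition_key):
    """Hash a partition key to a position on the toy ring."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    return int.from_bytes(digest, "big") % RING_SIZE

def owner(partition_key):
    """Return the node whose token is the first at or after the key's token."""
    # Positions past the last token wrap around to the first node.
    index = bisect_left(TOKENS, token_for(partition_key)) % len(NODES)
    return NODES[index][1]

for key in ["alice@example.com", "bob@example.com", "carol@example.com"]:
    print(key, "->", owner(key))
```

The same key always lands on the same node, and because the hash spreads keys across the token space, a large set of partition keys ends up divided roughly evenly among the three nodes.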
Static Tables

Static tables in Cassandra are tables whose storage schema closely represents what is returned to the user when querying the data. These have a limited number of physical columns and are created by not specifying a compound PRIMARY KEY. In the basic users table in Listing 4.20, you can see that each physical row is returned as a logical row in CQL. In addition to that, each row resides on a different node.

Listing 4.20 Example of Static Table Data Storage

    CREATE TABLE users (
      email text PRIMARY KEY,
      first_name text,
      last_name text,
      password text
    );

    SELECT * FROM USERS;

    email | first_name | last_name | password
    ------+------------+-----------+------------------
          | Russ       | Bradberry | acbd18db4cc2f85c
          | Eric       | Lubow     | a48153c2edb888f9
          | Edward     | Kim       | c08992871dc02c13
          | John       | Dougherty | 8c4e114e301c4a7d
          | Steve      | Knightley | a4b18d8476c6caf5

Dynamic Tables

Dynamic tables in Cassandra are tables that, unlike static tables, do not map one CQL row to one physical row on disk. When using a compound PRIMARY KEY, the storage system will create a single wide row for all the possible logical rows that belong to the partition key of the table. This allows for very fast lookups and slices of data belonging to that particular partition key and also ensures that all pieces of data related to that partition key are stored near each other on disk and on the same node.

Listing 4.21 is a page-view-tracking database that tracks page views by URL and rolls them up by hour. While this could have been created as a static table, we used a compound PRIMARY KEY to ensure that all hour data for a particular URL is kept on the same node and close on disk. This will ensure that reads for that URL are much faster. Each physical row on disk in the Cassandra storage system maps to several logical rows in CQL.
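The relationship between logical CQL rows and physical wide rows can be pictured with a small Python model. This is only a sketch of the idea, not Cassandra's storage engine: the function name to_physical_rows, the dictionary layout, and the sample URLs are assumptions made up for illustration. Logical rows that share a partition key (url) are folded into one physical row whose columns are ordered by the clustering key (hour).

```python
from collections import defaultdict

def to_physical_rows(logical_rows):
    """Group logical (url, hour, page_views) rows into one wide row per url.

    Returns {partition_key: [(clustering_key, value), ...]}, with each
    wide row sorted by clustering key to loosely mimic on-disk order.
    """
    wide_rows = defaultdict(list)
    for url, hour, page_views in logical_rows:
        wide_rows[url].append((hour, page_views))
    return {url: sorted(columns) for url, columns in wide_rows.items()}

# Hypothetical sample data; the URLs are placeholders.
logical = [
    ("example.com/a", "2013-06-13 02:00", 43),
    ("example.com/a", "2013-06-13 00:00", 351),
    ("example.com/b", "2013-06-13 01:00", 9435),
]
physical = to_physical_rows(logical)
for url, columns in physical.items():
    print(url, columns)
```

In this model the three logical rows collapse into two physical rows, one per URL. Reading a range of hours for a single URL becomes a scan over one sorted list, which is the intuition behind the fast slices described above.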
Listing 4.21 Example of Dynamic Table Data Storage

    CREATE TABLE page_view_hour_rollups (
      hour timestamp,
      url text,
      page_views bigint,
      PRIMARY KEY (url, hour)
    );

    SELECT * FROM page_view_hour_rollups;

    url | hour                     | page_views
    ----+--------------------------+-----------
        | 2013-06-13 00:00:00+0000 | 351
        | 2013-06-13 02:00:00+0000 | 43
        | 2013-06-13 03:00:00+0000 | 914
        | 2013-06-13 01:00:00+0000 | 9435
        | 2013-06-13 02:00:00+0000 | 183
        | 2013-06-13 04:00:00+0000 | 98
        | 2013-06-13 08:00:00+0000 | 1363
        | 2013-06-13 09:00:00+0000 | 64
        | 2013-06-13 11:00:00+0000 | 736
        | 2013-06-13 01:00:00+0000 | 692
        | 2013-06-13 04:00:00+0000 | 23
        | 2013-06-13 09:00:00+0000 | 1553

Summary

CQL has come a long way since the first version. It truly bridges the gap between the Cassandra data model and the RDBMS data model. While not all features of SQL are supported, more and more features are added with every new version of Cassandra. It is important to keep in mind that just because CQL is an SQL-like language, it is not SQL and the data should definitely not be modeled as if you were using SQL. By taking advantage of CQL’s many diverse features, you can easily insert and read data that would otherwise be complicated with the Thrift RPC.

5 Deployment and Provisioning

Now that you have done a little development on the application and you have it working in a development environment, it’s time to think about what your production environment is going to look like. In this chapter, we will cover the different aspects of deploying Cassandra to production. These include choosing a replication factor and a placement strategy and understanding the various partitioners available with Cassandra.

Keyspace Creation

When creating your keyspace, you need to decide on a few things: your data center layout and your replication strategy. Your data center layout is part of a greater set of considerations that get bundled into the category of replication strategy.
In short, you need to answer these questions: How many copies of your data do you want, and where do you want to keep them? Much of this depends on your application and your usage patterns. We will examine some of those considerations in this chapter.

Replication Factor

Before we dive into making decisions about data center layout and strategy, we should outline what replication means. Replication is simply the process of storing copies of data in more than one place. This is done to ensure reliability and fault tolerance. The replication factor refers to the number of copies of each row that are stored. So if you have a replication factor of 3 in data center us-east, that means you have three copies of each row. It is important to note that you don’t have a master set of data and then two additional copies. There is no primary or master concept with Cassandra. Having three copies of the data means that you have three copies of the data that are equally valued by the cluster.

There is really only one main rule that applies to setting a replication factor. That rule is that the replication factor should not exceed the number of nodes in the cluster. If the replication factor exceeds the number of nodes, writes are rejected. Reads continue to be served if the consistency level of the request can be met. The key takeaway here is not to paint yourself into a corner by trying to be too safe and having too many copies of your data before your data center has grown to a size where it can handle the replication factor.

Replication Strategies

In order for Cassandra to be able to determine the physical location of nodes in the cluster and their proximity to each other, it needs to know how you are planning to lay things out. There are three parts to this: the replication strategy, the snitch, and the partitioner. For replication strategies, there are two possibilities: SimpleStrategy and NetworkTopologyStrategy.
SimpleStrategy

SimpleStrategy is to be used on an individual machine or with single-data-center clusters. As a result, it is also the default placement strategy when creating a keyspace. The way SimpleStrategy works is mostly based on the partitioner. The first replica is placed on whichever node the partitioner tells it. All additional replicas are added to the ring in a clockwise fashion. When queries are run under SimpleStrategy, there is no location awareness. In other words, the data center and rack locations of the other nodes are not taken into account.

NetworkTopologyStrategy

The best time to use the NetworkTopologyStrategy is when you have a Cassandra deployment across multiple data centers. The strategy describes the number of replicas that should be in each data center. This number is set at keyspace creation time.

Replica Counts

There are a few things to take into account when deciding how many replicas should be in each data center. The first has to do with satisfying reads within the data center. If the read has to leave the data center for a replica in another data center, you will incur network latency, which will ultimately slow down the response time of your read query. The other major factor to take into account when deciding the replica counts within a data center is what failure cases you are willing to accept in your application. These break down into two common scenarios:

- Two replicas per data center. If a single node in a replica set fails, reads that were requested with a consistency level of ONE will still succeed. It is generally not recommended to set a replication factor of 2 if you have fewer than four nodes in a data center.
- Three replicas per data center. If a single node in a replica set fails, reads that were requested with a consistency level of LOCAL_QUORUM will still succeed. It is generally not recommended to set a replication factor of 3 if you have fewer than six nodes in a data center.
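The two scenarios above follow from simple arithmetic. The Python sketch below assumes the usual quorum definition, floor(replication_factor / 2) + 1, and uses hypothetical helper names (quorum, read_succeeds) that are not part of any Cassandra API.

```python
def quorum(replication_factor):
    """Replicas that must respond for a QUORUM or LOCAL_QUORUM request."""
    return replication_factor // 2 + 1

def read_succeeds(replication_factor, nodes_down, required):
    """True if enough replicas remain alive to supply `required` responses."""
    return replication_factor - nodes_down >= required

# Two replicas, one down: ONE (1 response) works, LOCAL_QUORUM (2) does not.
rf2_one = read_succeeds(2, 1, required=1)
rf2_quorum = read_succeeds(2, 1, required=quorum(2))

# Three replicas, one down: LOCAL_QUORUM (2 responses) still works.
rf3_quorum = read_succeeds(3, 1, required=quorum(3))

print(rf2_one, rf2_quorum, rf3_quorum)
```

With two replicas, a quorum is both of them, so losing one node leaves only consistency level ONE available. With three replicas, a quorum is two, which is why a single failure is still tolerated at LOCAL_QUORUM.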
There is also a setup that is generally termed “asymmetrical replication.” This means that the number of replicas in one data center does not match the number of replicas in the other data centers. There are some very good use cases for this. For instance, if you do the majority of your reads in datacenter1 and the majority of your writes in datacenter2, having more replicas in datacenter1 to reduce read latency may be a good strategy. Another good use case would be if you have a single data center that acts as an analytics data center, using a technology such as Hadoop. This data center would have drastically different read and write patterns from the other data centers and could be configured accordingly.

Snitches

The first and most obvious question here is, what is a snitch? A snitch is simply a protocol that helps map IPs to racks and data centers. In other words, it creates a topology by grouping nodes together to help determine where data is read from. As an aside, there is no need to use a snitch for writes. Writes function by being sent to the receiving node, and then the receiving node blocks until the consistency level for that write operation has been met.

When a read request to Cassandra happens, the requesting application asks only one node, the coordinator node. The consistency level of the read request and the read_repair_chance for that ColumnFamily decide how the snitch steps in. Ultimately, only one node will send back the requested data. It is up to the snitch to determine, typically based on historical performance, which node or nodes that will be.

There are a few different possible snitches that can be used with Cassandra, and each comes with its pros and cons. We will discuss what each snitch is and what a good use case for it looks like. It should be noted that a snitch has an effect only on the way Cassandra talks to itself. It has no bearing on client applications and their communication with Cassandra.
Snitches are configured in your cassandra.yaml file. It is extremely important to remember that all nodes in a cluster must use the same snitch. If you want to change the snitch for your cluster, you must make the change in the configuration file and then restart the entire cluster.

Simple

Taking advantage of the SimpleSnitch requires very little setup or knowledge. The reason is that the SimpleSnitch does not know anything about the data center settings or racks. The SimpleSnitch is good for a single machine or even a single-data-center deployment. You can even think of a single data center as one zone in a cloud architecture. When you are using the SimpleSnitch, remember to put replication_factor=# in your keyspace strategy_options.

Dynamic

Dynamic snitching is actually a layer of the snitch protocol that exists in all snitches besides the SimpleSnitch. It wraps your chosen snitch and provides an additional adaptive layer for determining the best possible read locations. However, it is possible for that adaptive layer to become counterproductive. The DynamicSnitch uses latency possibility calculations to determine the best approach. There are certain circumstances in which those calculations can take longer than the query itself. In order to avoid that, the DynamicSnitch takes a two-pronged approach. The first option is for the DynamicSnitch to use recent update statistics. The second is to calculate a score for each host and run the query based on the best score.

There are a few potential issues with the calculation of scores and update checks with regard to the DynamicSnitch. What happens if a node is in the middle of a compaction? What happens if the hardware on that node isn’t as powerful as some of the other nodes (since Cassandra is made to run on commodity hardware)? What happens if a node hasn’t been able to check in because the coordinator node is overloaded?
What happens if there isn’t any information for the snitch to use to make a decision? And even more pressing, if all necessary information exists, is using latency the best approach for determining performance capabilities?

Rack Inferring

The RackInferringSnitch works by assuming it knows the topology of your network from the octets in a node’s IP address. For instance, if you have a Cassandra node whose IP address consists of the octets 10, 20, 30, and 40, the RackInferringSnitch will assume the following breakdown:

• 10: The first octet has no special meaning.
• 20: The second octet is the data center octet. If another node had a different second octet, Cassandra would assume that this node was in a different physical data center.
• 30: The third octet is the rack octet. Cassandra assumes that all nodes in this octet are in the same rack.
• 40: The final octet is the node octet.

When defining your keyspace strategy_options, be sure to include the second octet as the data center name for clarity.

EC2

The Ec2Snitch is for Amazon Web Services (AWS)-based deployments where the entire cluster sits within a single region. The region is then considered the data center and the availability zones are considered the racks in those data centers. For example, let’s assume you have a cluster set up in the us-east-1 region. If node1 is in us-east-1a and node2 is in us-east-1b, Cassandra would consider these nodes to be in two different racks within the same data center. Node1 would be considered rack 1a and node2 would be considered rack 1b.

Since the Ec2Snitch will work only in single-region deployments, you don’t have to worry about public IPs for the Cassandra nodes. It also makes your keyspace definition more straightforward. When defining the strategy_options for your keyspace, the EC2 region name (which was us-east in the preceding example) is what should be used as the data center name. Listing 5.1 shows an example of how to create a keyspace using the Ec2Snitch in the us-east data center.
Listing 5.1 Keyspace Creation for the Ec2Snitch in the us-east Data Center

    CREATE KEYSPACE VideoStore
      WITH replication = {'class': 'NetworkTopologyStrategy', 'us-east' : 3}
      AND durable_writes = true;

Ec2MultiRegion

The Ec2MultiRegionSnitch is for use in Amazon Web Services deployments where the Cassandra cluster spans multiple regions. This snitch views data centers and racks in a similar fashion to the Ec2Snitch. Using the Ec2MultiRegionSnitch, if a node is in us-east-1a, us-east is the data center and 1a is the rack name. The major difference is in setting the IP addresses. The Ec2MultiRegionSnitch requires the use of public IPs to be able to talk across regions. When configuring your node, the listen_address is still set to the private IP of the node. The broadcast_address is set to the public IP address of the node. This allows nodes in different regions to communicate. If Cassandra determines that the nodes are within the same data center, it will switch to using the private IP address of the node, the one that was set as the listen_address, when doing intra-region communication.

There are a few other important settings that need to be changed in order to make use of the Ec2MultiRegionSnitch. The seed nodes that are set in the cassandra.yaml file must be the public IPs of the seed machines. Since private IPs are not routable across regions, public IPs need to be used in order to get to the nodes that will initiate the bootstrap. The storage_port, default 7000 (or ssl_storage_port, default 7001), should be accessible to all the nodes in the cluster. This may require adjusting the security group settings.

The keyspace creation with regard to data centers should be handled in the same fashion as the Ec2Snitch. Listing 5.2 shows an example of how to create a keyspace using the Ec2MultiRegionSnitch in the us-east and us-west data centers. Each data center is being set up with a replication factor of 3.
Listing 5.2 Keyspace Creation for the Ec2MultiRegionSnitch in the us-east and us-west Data Centers

    CREATE KEYSPACE VideoStore
      WITH replication = {'class': 'NetworkTopologyStrategy', 'us-east' : 3, 'us-west': 3 }
      AND durable_writes = true;

Property File

As with the rest of the snitches, the PropertyFileSnitch helps to determine the location of nodes by rack and data center. The difference here is that the layout is user defined. This is good for more complex groupings that may lack the uniformity that the RackInferringSnitch and the EC2-based snitches rely on. The property file for the Cassandra node sits in the same directory as the cassandra.yaml file. It is named cassandra-topology.properties.

PropertyFileSnitch Configuration

Using the PropertyFileSnitch is difficult if you have a large cluster that is not under some configuration management (such as Chef or Puppet). The file must be in sync on every machine in the cluster for everything to work properly, and every node in the cluster must be in this file. The typical use case for the PropertyFileSnitch is when you have nonuniform IPs in multiple data centers. The ability to specify which IPs sit in which racks and which data centers is powerful for configuring a complex setup. Listing 5.3 shows an example file. In this file, there are two data centers with two racks each. In both racks, the IPs are laid out in a nonuniform fashion.

Listing 5.3 Example File. Setting Up Two Physical Data Centers with Two Racks Each, All with a Nonuniform Layout of IPs

    # Data Center One
    =DC1:RAC1
    =DC1:RAC1
    =DC1:RAC1
    =DC1:RAC1
    =DC1:RAC2
    =DC1:RAC2
    =DC1:RAC2

    # Data Center Two
    =DC2:RAC1
    =DC2:RAC1
    =DC2:RAC1
    =DC2:RAC2
    =DC2:RAC2
    =DC2:RAC2

The last thing you need to ensure is that whatever you name your data centers in the file, the names must match the names of your data centers in the keyspace definition. The corresponding keyspace creation would have to look like Listing 5.4.
Listing 5.4 Keyspace Creation for a PropertyFileSnitch-Based Keyspace

    CREATE KEYSPACE VideoStore
      WITH replication = {'class': 'NetworkTopologyStrategy', 'DC1' : 2, 'DC2' : 2}
      AND durable_writes = true;

Partitioners

The placement of replicas within a data center is determined by the partitioner. At its core, the partitioner is just a hashing function for computing the token (aka the hash) of a row key. Since the location of data within a cluster is determined by its token, it is ultimately up to the partitioner where the data ends up. Each row has its own token and is therefore uniquely identifiable. This is why each row must be able to fit in full on a single node, regardless of what the replication factor for the data center is.

Once the partitioner for a cluster has been chosen, you need to continue to use the same partitioner. Unlike compaction strategies, once the partitioner is in place, it is set for the duration of the cluster. That is not to say that you are without options. There are a few possible partitioners you can use. As of Cassandra 1.2, you will almost always want to use the Murmur3Partitioner. Let’s take a look at what options are available.

Byte Ordered

The ByteOrderedPartitioner was one of the first available in Cassandra. While it does have some advantages, using it is not recommended. The ByteOrderedPartitioner is used for ordered partitioning of data. This is achieved by ordering the rows lexically by the key bytes. Tokens are calculated using the hex representation of the leading character in a key.

The main advantage to using an ordered partitioner is that you can do scans by primary key. This means that Cassandra will be capable of doing range scans.
For example, you can say, “Show me all users in my database who have a last name between Bradberry and Lubow.” The reason this type of query isn’t possible with one of the random partitioners is that the token is a hashed value of the key and there is no guarantee of sequence.

Even though this all seems like a great idea, you can use secondary indexes to achieve the same thing and avoid a lot of the consequences of using an ordered partitioner. There are two major cons to using the ordered partitioner: poor load balancing and hot spots. Although it is entirely possible to get hot spots on the random partitioners as well, your chances increase with ordered partitioners. While this is application dependent, most applications tend to write sequentially or heavily favor certain types of data like timestamps or similar last names. If this is the case for your application, many of the reads and writes will go to the same few nodes and cause hot spots.

These types of hot spots will also cause trouble when attempting to load-balance your data. One table being balanced across the cluster does not mean that another table will be balanced across the cluster. This means regularly recalculating and rebalancing your partition ranges. Not only is this an additional administrative headache to deal with, but it is also easily avoidable by choosing a random partitioner and being intelligent about query patterns and using secondary indexes.

Random Partitioners

A random partitioner is a partitioner that distributes data in a random but consistent fashion around the cluster. Both the RandomPartitioner and the Murmur3Partitioner are examples of random partitioners. Unlike ordered partitioners such as the ByteOrderedPartitioner, random partitioners create a hashing function that makes the distribution of data appear more random. As of Cassandra 1.2, there are two types of random partitioners: RandomPartitioner and Murmur3Partitioner.
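To make the contrast concrete, here is a small sketch of why hashing scatters sequential keys while byte ordering keeps them adjacent. It uses an MD5 digest as a stand-in for a hashed token; this is an illustration only and does not reproduce Cassandra’s actual token computation. The sample keys and the helper name md5_token are hypothetical.

```python
import hashlib


def md5_token(key: str) -> int:
    """Illustrative token: the MD5 digest of the key read as an unsigned integer."""
    return int.from_bytes(hashlib.md5(key.encode("utf-8")).digest(), "big")


keys = ["user0001", "user0002", "user0003", "user0004"]

# An ordered partitioner keeps rows in key-byte order, so neighbors stay together.
byte_order = sorted(keys, key=lambda k: k.encode("utf-8"))

# A hashing partitioner orders rows by token, which is unrelated to key order.
token_order = sorted(keys, key=md5_token)

print("byte order: ", byte_order)
print("token order:", token_order)
```

The same key always hashes to the same token, which is the "random but consistent" property described above: placement looks random, yet any node can recompute it.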
Random

The RandomPartitioner was the default partitioner in Cassandra prior to 1.2. It should not be confused with the concept of a random partitioner. The RandomPartitioner uses an MD5 hash value of the row key to evenly distribute data across the cluster. The possible range of values for the hash is from 0 to 2^127 - 1.

If you are using virtual nodes in Cassandra 1.2, you won’t need to worry about calculating tokens. If you are not using vnodes, you will have to properly calculate token ranges in order to have a balanced cluster. In almost all cases, you will want to use the Murmur3Partitioner if you are using Cassandra 1.2. When using the RandomPartitioner, you can page through all rows using the token function in CQL 3.

Murmur3

As of Cassandra 1.2, the Murmur3Partitioner is the new default partitioner when creating a Cassandra cluster. It provides a faster hashing function called the MurmurHash. This function creates a 64-bit hash value of the row key. Its values range from -2^63 to +2^63 - 1. The other major added benefit to using the Murmur3Partitioner is that you can page through all rows using the token function in CQL 3.

Node Layout

Prior to Cassandra 1.2, one token was assigned to each node. This ensured that within the ring, each node was responsible for one contiguous range of data. Virtual nodes, or vnodes, provide a Cassandra node with the ability to be responsible for many token ranges. Within the cluster, they can be noncontiguous and selected at random. This provides a greater distribution of data than the non-vnode paradigm.

Virtual Nodes

There are many advantages to using vnodes over a setup in which each node has a single token. It should be noted that you can easily have Cassandra follow the old single-token paradigm by setting num_tokens to 1 instead of a larger setting like 256. If you do that, you won’t be able to take advantage of more efficient node bootstrapping.
For example, let’s say you have a 20-node cluster that has a replication factor of 3, and you lose a node and have to bring up a replacement. Without vnodes, you will be guaranteed to touch and likely saturate three nodes in your cluster (or about 15%). Since the RF is 3 and you are down one node, two replicas times three ranges (assuming that is the breakdown) means six nodes available to stream or bootstrap from. That’s the theory anyway. In practice, Cassandra will use only three nodes to rebuild the data. So out of your 20-node cluster, you will have one node down for a rebuild and three nodes streaming to rebuild it. Effectively 20% of your cluster is degraded. This is all assuming nothing else happens, such as losing another node while rebuilding the current one.

Enter vnodes. Instead of only being able to stream from other nodes in the replica set that was lost, the data is spread around the cluster and can be streamed from many nodes. When it comes to repairs, the way it works is that a validation compaction is run to see what data needs to be streamed to the node being repaired. Once the validation compaction is done, the node creates a Merkle tree and sends the data from that tree to the requesting node. Of the two operations, streaming is the faster one, whereas the validation can take a long time depending on the size of the data. With smaller distributions of data, the validations on each node can be completed more quickly. Since the streaming is fast anyway, the entire process is sped up because there are more nodes handling less data and providing it to the requesting node more quickly.

Another huge advantage is the fact that replacing or adding nodes can be done individually instead of doing full capacity planning. Before virtual nodes, if you had a 12-node cluster, you could not add just one node without rebalancing. This would leave the cluster in a state where there is an uneven distribution of data.
Among other things, this can create hot spots, which is bad for the cluster. With virtual nodes, you can add a single node and the shuffle method will redistribute data to ensure that everything is properly balanced throughout the cluster.

If you are using older commodity machines or slightly slower machines, setting the num_tokens field to something smaller than the default of 256 is probably the way to go. Otherwise, starting with the default of 256 is usually fine.

Balanced Clusters

If you are starting with Cassandra 1.2 or greater, you will likely be using virtual nodes, so you won’t need to worry about keeping your cluster balanced. Everything related to keeping the data distribution in the cluster balanced evenly will happen in the background. You need to worry about the balance of a cluster only if you are using a partitioner where token generation isn’t done for you.

Firewalls

Cassandra primarily operates using three ports: 7000 (or 7001 if you are using SSL), 7199, and 9160. In order for Cassandra to be able to properly talk to the other nodes in the ring, it should be able to speak TCP to port 7000 or 7001. This is the internode cluster communication port. Port 7001 is used only if internode communication is set to be encrypted. This is done by setting internode_encryption to either all, dc, or rack. The default setting of none will ensure that internode encryption is off and Cassandra will use port 7000 for communication.

Port 7199 is the JMX port used for the initial JMX handshake. Once the initial JMX handshake has been completed, Cassandra picks a high port, or a port greater than 1024, to continue the communication.

The last and likely most important port is the Thrift client port. This is port 9160. When clients or applications connect to Cassandra, they do so on port 9160.

The other port that should typically be open in a Cassandra cluster is port 22, generally known as the SSH port.
Though you can run nodetool queries from any Cassandra machine, you will typically want to run additional commands such as vmstat or dstat, or even look into the logs to get deeper insight into problems.

Platforms

Cassandra will run on any system that supports the Java Virtual Machine (JVM). The easiest way to get started with an actual deployment greater than your personal machine is to use a cloud service. For instance, on Amazon Web Services, there are ready-made AMIs (Amazon Machine Images) that come preloaded with Cassandra. You can start one up and basically go right into using your Cassandra cluster. This is the reason that so much work has been done to create a snitch that is specific to Amazon’s EC2 (Elastic Compute Cloud).

Amazon Web Services

If you decide to run your cluster on AWS, there are a few basic principles that should be followed. The smallest node that you should run Cassandra on is either an m1.large or an m1.xlarge with ephemeral storage. Since Amazon reduced the number of ephemeral disks per instance from four to two, you may need to attach two additional disks. The most efficient way to make use of a basic configuration is to set up RAID0 on the ephemeral drives and put the CommitLog on that same volume. Even though it is generally considered best practice to put the CommitLog on a separate volume, using Amazon’s EBS (Elastic Block Store) here would be slower, and using the root partition (which is also used by the OS itself) would cause slowdown. Therefore, sharing the RAID0 will likely yield the best results.

Even though Cassandra now has JBoD support, you will likely get better throughput by using RAID0. RAID0 takes the approach of splitting every block to be on a separate device. It does this to ensure that writes are handled in parallel instead of serially.

EBS volumes on Cassandra are generally not recommended as they contend for network I/O.
Adding capacity by increasing the number of EBS volumes per host also does not scale very well. It easily surpasses the ability of the system to keep effective buffer caches and concurrently serve requests for all of the data. Very large drives on a single system also keep buffer caches from being effective.

Using EBS volumes is still a good idea for backups within your cluster. If you have a regular backup strategy, EBS volumes as well as Amazon S3 (and even Amazon Glacier) should be a part of it.

Other Platforms

Cassandra performs very well on any of the cloud platforms, including Rackspace, Google Compute Engine, and Windows Azure. The requirements for distribution across zones, fast consistent disk I/O, and easy base configuration layout make it well suited for any of the cloud platforms, either individually or in tandem. The value and the performance benefits of a specific cloud platform will be found by figuring out how to tune your setup best for your use case.

Summary

In this chapter, we covered the basics of how a cluster is constructed and the decisions that need to be made when creating one. These include everything from choosing the partitioner to the replication strategy. We also delved into some of the pros and cons of those choices. Being a distributed system, Cassandra has a lot of moving parts, some that must be set prior to use and others that can be changed even while in production. Now you should have enough information to move forward and create your cluster, knowing what kinds of trade-offs you are willing to make and where you want to get started.

6 Performance Tuning

Once your Cassandra cluster is in production and running with its out-of-the-box configuration, it’s time to push the envelope a little with some performance tuning. There are multiple levels at which this tuning can take place. At the lowest level, there are the hardware and operating system.
And while some of this has to do with basic setup of the hardware (such as RAID levels), there are kernel-level tunables that you can take advantage of. At the next level is the Java Virtual Machine. The JVM also has a lot of ways that it can be started and manipulated to handle various types of workloads and applications. And finally there are the configurables within Cassandra itself.

The important thing to remember when attempting to do performance tuning of any kind is that not everything is universally applicable. Your mileage will vary depending on your OS, hardware, applications, and access patterns. Learning which things are tunable and what they do when changed will go a long way toward helping you pull the most out of your system. This chapter will talk about how some of the available knobs work.

Methodology

Much of the tuning that needs to be done on any system requires a baseline understanding of how the system operates under normal circumstances. There are a number of methods that can be used to obtain this information. The most common method is to use standard *nix tools like vmstat, iostat, dstat, and any of the top monitors like htop, atop, and top. Each of these tools will give you a picture of usage on the memory, CPU, I/O, disks, and any combinations of those on your system. It may sound boring and tedious, but get a little familiar with what the output of those tools should look like under normal operating conditions for your system.

Having instrumented graphs is also helpful. Applications such as Cacti, Munin, and Ganglia come with standard templates for graphing many of the system-level metrics. This means you can actively change settings and see how the applications and the system respond in real time. While some of these may be easier than others to get working, it is easier to understand a visual representation of changes than to determine whether or not one or two numbers in a row of other numbers changed significantly.
If you are using Amazon Web Services, you get a few of these graphs for free in the Web console just for having an instance running.

When making changes, you should have a control machine as well as an experimental machine. One method for testing changes is to have a query or a script run on both the control machine and the experimental machine and compare their response times. The problem with this method is that when running a query under normal circumstances, you will likely be using both of the nodes as coordinator nodes, and the response will be cached for one of the two queries by the node responsible for the data. If caching isn’t on for Cassandra, you may get the file system cache. There are a number of variables here, and even though there are ways around many of them, there might be an easier (and possibly better) way to measure the effectiveness of your changes within your application or cluster.

Testing in Production

A lot of performance optimization testing can be done in production. Although making changes in a production environment goes against conventional wisdom (and might even make you feel a little uneasy), Cassandra is built to be fault tolerant. There are obviously certain circumstances when this is not possible, and even when it is, it should be done with extreme caution. If you have a three-node cluster with an RF of 1, testing performance changes in production is probably not a good idea. If you have a six-node cluster with an RF of 2 or more that is not running at capacity, you can stand to lose a node and recover if you make a mistake. If you make too big a mistake, it might even be better to pull the node out of the cluster and start from scratch by re-bootstrapping it.

On the whole, Cassandra is intended to be a resilient system. If one node goes a little funky and becomes too slow to respond, the FailureDetector will remove it without your having to do a thing.
In this respect, Cassandra can protect you from yourself. The general idea is that you can take chances within reason as long as you are aware of what chances you are taking.

Tuning

Cassandra has a lot of knobs that can be twisted and turned to get different performance characteristics for different types of workloads. These settings apply to all different stages of the architecture and have varying impacts based on the types of workloads you are seeing.

Timeouts

There are quite a few configurable timeouts in Cassandra. The proper values for these settings are highly dependent on your environment and your system requirements. They include how long the coordinator node in a query should wait for operations to return. Setting the proper timeouts for your environment is critical. If you set the values too high, your queries will start to stack up while coordinator nodes wait for responses from slow or down nodes. If the settings are too low, coordinator nodes will give responses based on incomplete information and the replica sets will have been queried for data that wasn’t returned to the application.

Another configurable value is streaming_socket_timeout_in_ms. This is an important setting as it can control how much time is spent restreaming data between nodes in the event of a timeout. By default, there is no timeout in Cassandra for streaming operations. It is a good idea to set a timeout, but not too low a timeout. If a streaming operation times out, the file being streamed is started over from the beginning. As some SSTables can have a not insignificant amount of data, ensure that the value is set high enough to avoid unnecessary streaming restarts.

Cassandra provides a setting that allows nodes to communicate timeout information to each other. This option is called cross_node_timeout and defaults to false.
The reason this is initially off is that the timing can properly be synchronized only if system clocks on all nodes are in sync. This is usually accomplished with an NTP (Network Time Protocol) server. If this setting is disabled, Cassandra assumes that the request was instantly forwarded by a coordinator node to the replica.

CommitLog

The idea of a CommitLog and how Cassandra has implemented it is one of the reasons that Cassandra responds so well to write-heavy workloads. Here are some tricks for optimizing the CommitLog.

An easy optimization for Cassandra is putting your CommitLog directory on a separate drive from your data directories. CommitLog segments are written to every time a MemTable is flushed to disk. This might be easier said than done depending on your setup. If your servers are hosted in AWS, the instance stores are your best bet for CommitLog segments on standard machines. On the hi1.large instances in AWS, which allow you to use solid-state drives (SSDs), you have access to multiple devices that are faster than the ephemeral drives. But the idea is that you can search the CommitLog and the data SSTables simultaneously, giving better read and write performance.

It may not be as easy to add a drive to a Cassandra server running on iron. It would include downtime for the machine, which is not a problem for Cassandra assuming that your downtime period is less than your setting for max_hint_window_in_ms.

In the cassandra.yaml file, there is a setting called commitlog_directory. That setting is the one that determines where the CommitLog segments get written to. Check to see which disk or partition your data_file_directories setting points to in the cassandra.yaml file and make sure the directory of the CommitLog is set to be on a different disk or partition. By ensuring that the data_file_directories and commitlog_directory are on different partitions, the CommitLog reads/writes don’t affect the overall performance of the rest of the reads on the node.
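One way to verify the separation described above is to compare the device that each directory lives on. The following is a minimal sketch using the standard library’s os.stat; the two paths are hypothetical placeholders for the values in your own cassandra.yaml, and the helper name same_device is introduced only for this example.

```python
import os


def same_device(path_a: str, path_b: str) -> bool:
    """True when both paths report the same device ID (st_dev)."""
    return os.stat(path_a).st_dev == os.stat(path_b).st_dev


# Hypothetical locations; substitute the directories from your cassandra.yaml.
commitlog_dir = "/var/lib/cassandra/commitlog"
data_dir = "/var/lib/cassandra/data"

if os.path.isdir(commitlog_dir) and os.path.isdir(data_dir):
    if same_device(commitlog_dir, data_dir):
        print("CommitLog and data directories share a device")
    else:
        print("CommitLog and data directories are on separate devices")
else:
    print("One or both directories do not exist on this machine")
```

Note that differing device IDs show only that the directories are on different file systems or partitions. Two partitions can still sit on the same physical disk, so this check is a first pass rather than proof of separate spindles.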
There are also a few other settings in the cassandra.yaml file that affect the performance of the CommitLog. The commitlog_sync setting can be set to either batch or periodic. If the commitlog_sync is set to batch, Cassandra will block until the write has been synced to disk. Having the commitlog_sync set to batch is usually not needed, as under most circumstances writes aren’t acknowledged until another node has the data. On the other hand, periodic is the default setting and typically is the best for performance as well. You will also need to ensure that the commitlog_sync_period_in_ms is sensible for your write workload. For durability, if you have a high-volume write system, set this to something smaller than 10,000ms (or 10s) to ensure minimal data loss between flushes.

Although the default setting of 32 for commitlog_segment_size_in_mb is a sane default, depending on your backup strategy, this may be something you want to change. If you are doing CommitLog archiving as a form of backup, choosing a more granular setting of 8 or 16 might be better. This allows a finer point-in-time restore, depending on what your volume is.

MemTables

There are a few basic tenets to keep in mind when adjusting MemTable thresholds:

• Larger MemTables take memory away from caches. Since MemTables store the actual column data, they will take up at least that amount of space plus a little extra for index structure overhead. Therefore, your settings should take into account schema (ColumnFamily and column layout) in addition to overall memory.
• Larger MemTables do not improve write performance. This is because writes are happening to memory anyway. There is no way to speed up this process unless your CommitLog and SSTables are on separate volumes. If the CommitLog and SSTables were to share a volume, they would be in contention for I/O.
• Larger MemTables are better for unbatched writes.
If you do batch writing, you will likely not see a large benefit. But if you do unbatched writes, the compaction will have a better effect on the read performance as it will do a better job of grouping like data together.
• Larger MemTables lead to more effective compaction. Having a lot of little MemTables is bad as it leads to a lot of turnover. It also leads to a lot of additional seeks when the read requests hit memory.

The performance tuning of MemTables can double as a pressure release valve if your Cassandra nodes start to get overloaded. They shouldn’t be your only method of emergency release, but they can act as a good complement. In the cassandra.yaml file, there is a setting called flush_largest_memtables_at. The default setting is 0.75, expressed as a fraction of the heap. What is going on under the hood is that every time a full garbage collection (GC) is completed, the heap usage is checked. If the fraction of the heap in use is still greater than the threshold (0.75 by default), the largest MemTables will be flushed. This setting is more effective when used under read-heavy workloads. In write-heavy workloads, there will probably be too little memory freed too late in the cycle to be of significant value. If you notice the heap filling up from MemTables frequently, you may need to either add more capacity or adjust the heap setting in the JVM.

The memtable_total_space_in_mb setting in the cassandra.yaml is usually commented out by default. When it is commented out, Cassandra will automatically set the value to one-third the size of the heap. You typically don’t need to adjust this setting as one-third of the heap is sufficient. If you are in a write-heavy environment, you may want to increase this value. Since you already know the size of the JVM heap, you can just calculate what the new size of the total space allotted for MemTables should be.
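The arithmetic is simple enough to sketch. The helper names below are hypothetical, and the one-third default is taken from the description above rather than read from a running node; treat the output as a starting point for the value you would write into memtable_total_space_in_mb.

```python
def default_memtable_space_mb(heap_mb):
    """One-third of the heap: the value described above for when the setting is commented out."""
    return heap_mb // 3


def memtable_space_mb(heap_mb, heap_fraction):
    """MemTable space in MB for a chosen fraction of the heap (for example 0.5)."""
    if not 0 < heap_fraction < 1:
        raise ValueError("heap_fraction must be between 0 and 1")
    return int(heap_mb * heap_fraction)


if __name__ == "__main__":
    heap_mb = 8192  # assumed 8GB JVM heap, for illustration only
    print("default:", default_memtable_space_mb(heap_mb), "MB")
    print("half of the heap:", memtable_space_mb(heap_mb, 0.5), "MB")
```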
Try not to be too aggressive here as stealing memory from other parts of Cassandra can have negative consequences.

The setting of memtable_flush_writers is another one that comes unset out of the box. By default, it’s set to the number of data directories defined in the cassandra.yaml. If you have a large heap size and your use case is having Cassandra under a write-heavy workload, this value can be safely increased.

On a similar note, the memtable_flush_queue_size has an effect on the speed and efficiency of a MemTable flush. This value determines the number of MemTables that are allowed to wait for a writer thread to flush to disk. This should be set, at a minimum, to the maximum number of secondary indexes created on a single ColumnFamily. In other words, if you have a ColumnFamily with ten secondary indexes, this value should be 10. The reason for this is that if the memtable_flush_queue_size is set to 2 and you have three secondary indexes on a ColumnFamily, there are two options available: either flush the MemTable and each index not updated during the initial flush will be out of sync with the SSTable data, or the new SSTable won’t be loaded into memory until all the indexes have been updated. To avoid either of these scenarios, it is recommended that the value of memtable_flush_queue_size be set to ensure that all secondary indexes for a ColumnFamily can be updated at flush time.

Concurrency

Cassandra was designed to have faster write performance than read performance. One of the ways this is achieved is using threads for concurrency. There are two settings in the cassandra.yaml that allow control over the amount of concurrency: concurrent_reads and concurrent_writes. By default, both of these values are set to 32. Since the writes that come to Cassandra go to memory, the bottleneck for most performance is going to the disk for reads. To calculate the best setting for concurrent_reads, the value should be 16 * number_of_drives.
If you have two drives, you can leave the default value of 32. The reason for this is to ensure that the operating system has enough room to decide which reads should be serviced at what time.

Coming back to writes, since they go directly to memory, the work is CPU bound as opposed to being I/O bound like reads. To derive the best setting for concurrent_writes, you should use 8 * number_of_cores in your system. If you have a quad-core dual-processor system, the total number of cores is eight and the concurrent_writes should be set at 64.

Durability and Consistency

Something to always keep in mind when looking at the performance of your application is which trade-offs you are willing to make with regard to the durability of your data on write and the consistency of your data on read. Much of this can be achieved by setting consistency levels on reads and writes. For reference, a quorum is calculated (rounded down to a whole number) as (replication_factor / 2) + 1.

We have already covered consistency levels in detail, but the theory behind when to use which consistency level at what time, known as durability, is also important. When you are working under a write-heavy workload, it is unlikely that all the data being written is so important that it needs to be verified as received by a majority or all of the replicas (QUORUM, LOCAL_QUORUM, EACH_QUORUM, or ALL). Unless your node or cluster is under a heavy load, you will probably be safe with using CL.ANY or CL.ONE for most writes. This reduces the amount of network traffic and reduces the wait time on the application performing the write (which is typically a blocking operation to begin with). If you can decide at write time or connection time which data is important enough to require higher consistency levels in your writes, you can save quite a bit of round-trip and wait time on your write calls.
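To make the quorum formula above concrete, here is a small sketch that evaluates it for a few replication factors. The function names are hypothetical; the second helper simply derives how many replicas can be unavailable while a QUORUM request can still succeed.

```python
def quorum(replication_factor):
    """Quorum size: (replication_factor / 2) + 1, with the division rounded down."""
    return replication_factor // 2 + 1


def replicas_that_can_be_down(replication_factor):
    """Number of replicas that may be unavailable while QUORUM is still reachable."""
    return replication_factor - quorum(replication_factor)


if __name__ == "__main__":
    for rf in (1, 2, 3, 5, 6):
        print("RF=%d quorum=%d can_be_down=%d"
              % (rf, quorum(rf), replicas_that_can_be_down(rf)))
```

With a replication factor of 3, a quorum is 2 replicas, so a QUORUM write waits for one more acknowledgment than CL.ONE does. That extra wait is the cost being weighed in the paragraph above.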
On the read side of things, you can ask yourself a similar question: How important is the accuracy of the call I am making? Since you are working under eventual consistency, it is important to remember that the latest and greatest version of the data may not always be immediately available on every node. If you are running queries that require the latest version of the data, you may want to run the query with QUORUM, LOCAL_QUORUM, EACH_QUORUM, or ALL. It is important to note when using ALL that the read will fail if one of the replicas does not respond to the coordinator. If it is acceptable for the data not to have the latest timestamp, using CL.ONE may be a good option. By default, a read repair will run in the background to ensure that for whatever query you just ran, all data is consistent.

If latency is an issue, you should also consider using CL.ONE. If consistency is more important to you, you can ensure that a read will always reflect the most recent write by satisfying the following: (nodes_written + nodes_read) > replication_factor.

When thinking about consistency levels in the context of multiple data centers, it is important to remember the additional latency incurred by needing to wait for a response from the remote data center or data centers. Ideally, you want all of an application’s requests to be served from within the same data center, which will help avoid quite a bit of latency. It is important to keep in mind that even at a consistency level of ONE or LOCAL_QUORUM, a write is still sent to all replicas, including those in other data centers. In this case, the consistency level determines how many replicas are required to respond that they received the write.

Compression

There are a few options for using compression and taking advantage of what it has to offer. There is compression at the file system level, compression at the ColumnFamily level, and compression at the network level.
Network compression is available for dealing with internode communication. In the cassandra.yaml file, the option internode_compression controls whether traffic moving between Cassandra nodes should be compressed. There are a few options here. You can choose to ignore compression completely (none), compress all traffic (all), or only compress traffic between different data centers (dc). It is likely that this setting will not have a major effect on your system any way you set it. The default is to compress all traffic, and this is a sane default. Compression is CPU bound. If you are short on CPU resources (and it’s rare that Cassandra is CPU bound), not compressing any traffic will likely net a performance bonus. You can also incrementally save here by just setting it to only compress between data centers (assuming you have more than one data center).

Prior to Cassandra 1.1.0, compression at the ColumnFamily level was turned off by default. The option to use either SnappyCompressor or DeflateCompressor has been around since Cassandra 1.0.0. Cassandra post-1.1.0 has the Java Snappy compression library as the default compression for a ColumnFamily. Out of the box, you can get a pretty good speed increase by just enabling some compression algorithm across all of the ColumnFamilys. In addition to saving space on disk, compression also reduces actual disk I/O. This is especially true for read-heavy workloads. Since the data on disk is compressed, Cassandra only needs to find the location of the rows in the SSTable index and decompress the relevant row chunks. All this ultimately means that larger data sets can now fit into memory, which means quicker access times.

The speed increase on writes happens because the data is compressed when the MemTable is flushed to disk. This results in a lot less I/O. As a negative, it adds a little more CPU overhead to the flush. Typically, this is negligible compared to the performance gains.
With all these factors considered together, using compression is highly recommended.

With either SnappyCompressor or DeflateCompressor, you have the additional option of setting chunk_length_kb. This tells the compression library what the compression chunk size in kilobytes should be. The default value is 64KB. This is a fairly sane default and should probably be left alone unless you know the makeup of the ColumnFamily prior to creating it. With wide row ColumnFamilys, 64KB allows reading slices of column data without decompressing the entire row. On the skinny row side, it is slightly less efficient as you may end up decompressing more data than you want. If you can approximate the layout of your rows, you can adjust the value accordingly.

To determine a good chunk_length_kb value, you will need to determine how much data is typically requested at a time and figure out how that fits into the average size of a row in the ColumnFamily (which can be found using nodetool cfstats). You can also adjust the crc_check_chance. There is a cyclic redundancy check (CRC) checksum attached to every compressed block. By default, it is checked on every read. You can disable this by setting the value to 0, but it is not recommended to do this. A better approach would be a setting of 0.5, which checks the checksum on roughly every other read instead of every read.

If you have an existing ColumnFamily for which you are changing the compression settings, the already-created SSTables on disk are not compressed immediately. Any new SSTable that gets created will be compressed. You can then either let the existing SSTables get compressed during normal compactions or force the SSTables to be rebuilt by using nodetool scrub. Both are acceptable options and can easily be done in conjunction in larger clusters.

It is possible to use another compression option or even to implement your own. All that is necessary is to implement a compression class using the .compress.ICompressor interface.
While there may be good reason to implement your own compression class, the defaults that ship with Cassandra are good for the majority of use cases.

SnappyCompressor

Snappy is a compression/decompression library that is tailored to the 64-bit CPU architecture and aims for high speed with a reasonable compression ratio. As a barometer, it usually compresses things within the same range as LZF. Since Cassandra is written in Java, it uses the Java port of the original library written in C++. If you have a read-heavy workload, you will want to stick with SnappyCompressor.

DeflateCompressor

The DeflateCompressor, more commonly known as Java’s version of zip, is more often used when you want better compression ratios. Your reads might come a little slower, but your data on disk will be smaller than if you were using SnappyCompressor.

File System

There are a number of options for which file system to use. If you are interested in getting better use of your disk while still keeping some speed, it might be worthwhile to look into using ZFS. The common deployment file system for Linux is ext4, which is already supported by most modern Linux distributions. Formatting a device for ext4 is straightforward (see Listing 6.1), and the utility for doing so is available out of the box.

Listing 6.1  Format a Drive with the ext4 File System
    $ mke2fs -t ext4 /dev/md0

By default, ext4 is a fairly performant file system. By changing a few settings (also known as mount options) in the /etc/fstab, you can get a little extra performance push.
• noatime. This stands for “no access time” updates. It tells the operating system not to update the inode information on each access. This is useful because Cassandra doesn’t access inode information and there is no need to pay the performance penalty for updating it.
• barrier=0. This option, when set to 0, disables write barriers. Write barriers enforce the proper on-disk order of journal commits.
Since ext4 is a journal-based file system, you incur a performance penalty when ensuring the order of the journals. With hardware-based systems, it is safe to disable barriers if you have battery-backed disks (as is common to hardware RAIDs).
• data=journal, commit=15. By default, the operating system will sync all data (including metadata) to disk every five seconds. By extending this value, you can improve performance. commit applies only if you set data=journal.
• data=writeback, nobh. When you set data=writeback, metadata for files is lazily written after the file is written. While this won’t cause file system corruption, it may cause the most recent changes to be lost in the event of a crash. Since most files in Cassandra are written once and not edited in place, this is a low-risk change. nobh (no buffer heads) refers to the operating system’s attempt to avoid associating buffer heads to data pages. In other words, don’t create a map of the disk block information. This is a useful setting for Cassandra because it doesn’t update in place.

Note that if you make changes on a disk that is already mounted, you will have to unmount and remount the disk in order for the changes to take effect.

Caching

The use of caching is an important part of getting performance out of any system. Caching is the ability to store something in a temporary location that makes it faster or easier to access than getting it the way it would otherwise be retrieved. In other words, you don’t go to the store to buy a glass of milk. You buy a gallon of milk, store it in your fridge, and then go to your fridge when you want a glass of milk. Cache works the same way. Assuming that you have row caching on, if you are constantly going back to get the same row from a ColumnFamily and that row isn’t located on the coordinator node, the coordinator node and the queried node will cache it.
The next time the coordinator is asked for that row, it will go to the temporary store (cache) and check that location first. If it finds the data in the cache, this will negate the need to check the node actually responsible for the data. This means there will be less disk I/O, less network traffic, and less system overhead dedicated to listening for query results externally.

How Cassandra Caching Works

Before we talk specifically about tuning the cache, we should discuss how caching in Cassandra works. When both the row cache and key cache are configured and enabled, the standard order for searching for the data is row cache, key cache, SSTables, and then MemTables. Let’s break this down a little bit further. When your application requests a row of data, the first thing that Cassandra does is to go to the row cache. If Cassandra finds the row in the row cache, it returns it to the application and that is a win for caching. If the row is not cached, Cassandra checks the SSTables on disk. If the row isn’t in the SSTables, the MemTables are checked. Wherever the row was found, whether in the SSTables or the MemTables, the row cache will be populated with the value returned for that row. If the application requests a column and the row where that column resides is already in the row cache, Cassandra will grab the row from the cache, pull the column out, and return it to the application.

General Caching Tips

Here are some general rules that you can follow to get efficient use of caching:
• Store less recently used data or wide rows in a ColumnFamily with minimal or no caching.
• Try to create logical separations of heavily read data. This can be done by breaking your frequently used data apart into many ColumnFamilys and tuning the caching on each ColumnFamily individually.
• Add more Cassandra nodes.
Since Cassandra does a lot of caching for you, you will get a pretty solid benefit from adding nodes. Each node will then contain a smaller data set, and you will be able to fit more data in memory.

The other item to be aware of when it comes to caching is how it affects the MemTables. A Cassandra MemTable requires an index structure in addition to the data that it stores. This is so that the MemTable is easily searchable for data that has not been written to an SSTable yet. If the size of the values stored is small compared to the number of rows and columns in that MemTable, the overhead to support this indexing may not be worth it.

Global Cache Tuning

There is an easy performance gain to be had if you have a small data set per node. The first thing we need to do is define small in this scenario. Small refers to the data set being able to fit into memory. In the cassandra.yaml file, the setting populate_io_cache_on_flush is set to false by default. This is because it is expected that most data sets will not be able to fit into memory. If yours does, setting it to true means the cache will be populated on MemTable flush and compaction. This will greatly speed up your query times by having all data loaded into the cache immediately when it becomes available.

One of the most common types of caches for a database is a key cache. Cassandra gives you the ability to control the maximum size of the key cache. By default, the key_cache_size_in_mb is set to 5% of the heap in megabytes or 100MB, whichever is less. The key cache saves one seek on each hit. And although the default value is relatively sane, it probably makes sense to increase the size. The actual value that you set it to depends on how much memory you have. The amount of time that the keys are saved in the key cache is four hours by default. As with many of the other settings, changing it depends on your use case.
Four hours is a decent default, but if you have daily data, perhaps changing it to 24 hours would make more sense in the context of your application.

Similar options are available for tuning the row cache. The row cache, when a cache hit happens, saves at least two seeks, sometimes more. Since it is easy for the row cache to become very large very quickly, it needs to be adjusted for your use case. You need to ensure that you set a row_cache_size_in_mb that makes sense for the average size of your rows relative to your heap size. As with keys, you also have the ability to set the row_cache_save_period. This value comes disabled by default, meaning that the rows aren’t saved to disk. Saving caches to disk greatly increases Cassandra cold-start speeds and has relatively little impact on I/O. But in general, saving the row cache to disk is much more expensive than saving the key cache to disk and has a fairly limited use.

There is also the ability to change the provider being used for the row cache. As of Cassandra 1.2, there are two possibilities, ConcurrentLinkedHashCacheProvider and SerializingCacheProvider. The SerializingCacheProvider serializes the contents of the row and stores them off the JVM heap. Serialized rows use less memory than a full row inside the JVM, giving you the ability to store more information in a smaller memory footprint. Storing the cache off-heap also means smaller heap sizes, reducing the impact of GC pauses. The ConcurrentLinkedHashCacheProvider is better for workloads that are update heavy. This is because ConcurrentLinkedHashCacheProvider updates data in place. The SerializingCacheProvider, by contrast, only invalidates the cached row on update, meaning that the next read still has to go to disk to get the correct data.

Technically, there is a third option for a row cache. It would be to build a custom cache provider.
As long as the provider implements org.apache.cassandra.cache.IRowCacheProvider, it can be used as a caching engine. However, the default available options are fine for most use cases.

ColumnFamily Cache Tuning

There are four possible settings for caching within a ColumnFamily: ALL, NONE, KEYS_ONLY, and ROWS_ONLY. They all do what their names suggest. NONE is to disable caching on the ColumnFamily. KEYS_ONLY is to cache only by keys requested. ROWS_ONLY is to cache the entire row of a requested key. ALL refers to the ability to cache as much information as possible. The option that you choose should be relevant to the workload that you normally put on Cassandra. A good way to think about this is to look at your query pattern. If you have skinny rows and you are asking Cassandra for the same row over and over, using ROWS_ONLY is likely the route for you. It allows the row to be put into the cache and modified only if the row itself is modified. If you have wide rows and you don’t use most of the keys in the row, it might make more sense to use KEYS_ONLY as the cache. Using KEYS_ONLY makes sense only if you don’t have a lot of turnover in the cache. If you do have a lot of turnover, it may make sense not to do any caching at all. This means setting the cache to NONE. Listing 6.2 shows how to change the cache setting on the events table to KEYS_ONLY using CQL 3.

Listing 6.2  Change the Cache Setting on an Existing Table
    # ALTER TABLE events WITH caching = 'KEYS_ONLY';

Bloom Filters

Bloom filters are a space-efficient probabilistic data structure that allows you to test whether or not an element is a member of a set. Bloom filters allow for false positives, but not for false negatives. Cassandra uses these data structures to determine whether or not an SSTable has data for a particular row. They are used only in index queries, not in range queries.
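The “false positives, but never false negatives” property is easier to see in a toy implementation. The sketch below is illustrative only and is not how Cassandra implements its bloom filters: the class name, the SHA-256 based hashing scheme, and the sizes are all assumptions chosen for brevity.

```python
import hashlib


class BloomFilter:
    """A toy bloom filter: a bit array plus k hash positions per key."""

    def __init__(self, num_bits, num_hashes):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray((num_bits + 7) // 8)

    def _positions(self, key):
        # Derive k bit positions from two halves of one SHA-256 digest (double hashing).
        digest = hashlib.sha256(key.encode("utf-8")).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big")
        return [(h1 + i * h2) % self.num_bits for i in range(self.num_hashes)]

    def add(self, key):
        for pos in self._positions(key):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, key):
        # True means "possibly present"; False means "definitely absent".
        return all(self.bits[pos // 8] & (1 << (pos % 8))
                   for pos in self._positions(key))


if __name__ == "__main__":
    row_filter = BloomFilter(num_bits=1024, num_hashes=3)
    stored = ["row-%d" % i for i in range(100)]
    for key in stored:
        row_filter.add(key)

    # Every stored key is reported as possibly present: no false negatives.
    print(all(row_filter.might_contain(key) for key in stored))

    # Keys that were never added may still match: these are the false positives.
    absent = ["missing-%d" % i for i in range(1000)]
    false_positives = sum(row_filter.might_contain(key) for key in absent)
    print("false positives:", false_positives, "out of", len(absent))
```

Making the bit array larger relative to the number of keys lowers the false positive count at the cost of memory, which is the same trade-off that the false positive chance setting exposes.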
Since by definition bloom filters allow you to customize the level of accuracy of your data structure, Cassandra passes that ability along to you. If you set the bloom_filter_fp_chance high, the bloom filter will use less memory but will require more disk I/O. You can disable bloom filters completely if you set the bloom_filter_fp_chance to 1.0.

The right bloom filter setting depends on your workload. If you have an analytics cluster that does mostly range scanning, having bloom filters would not be necessary. Also, using LeveledCompaction typically causes slightly less fragmentation within the SSTable than SizeTieredCompaction. Therefore, the default value of the bloom_filter_fp_chance can be slightly higher. Keep in mind that memory savings are nonlinear. That means that going from a setting of 0.01 to 0.1 saves one-third of the memory even though you are changing the bloom_filter_fp_chance by an order of magnitude.

In Cassandra version 1.2, bloom filters are stored off-heap. This means that you don’t need to think about the size of the bloom filters when attempting to figure out the maximum memory size for the JVM.

You can easily alter the bloom_filter_fp_chance setting on a per-ColumnFamily basis, as shown in Listing 6.3.

Listing 6.3  Adjust the Bloom Filter False Positive Chance for an Existing ColumnFamily
    # ALTER TABLE events WITH bloom_filter_fp_chance = 0.01;

Once you update the bloom_filter_fp_chance for a ColumnFamily, you need to regenerate the bloom filters. This can be done either by forcing a compaction or by running upgradesstables through nodetool.

Another good way to see if your bloom filter settings can be adjusted is through a little bit of trial and error. If you run nodetool cfstats, you will be able to see the number of bloom filter false positives and the bloom filter false positive ratio for a specific ColumnFamily.
You want to minimize the number of bloom filter false positives you get in general. But you also have a little leeway when adjusting the bloom_filter_fp_chance before you actually start getting a significant number of false positives. You will have to tune the value to see where your false positive rate starts to increase.

System Tuning

Out of the box, Linux comes configured to run pretty well. Since running Cassandra is not a normal workload for the basic server configuration, you can make a few small tweaks and get a noticeable performance improvement.

Testing I/O Concurrency

A great way to tune for reads and writes at the same time is to run a quick test with iostat (*nix only). In a terminal window, start iostat running with extended disk information and have it refresh roughly every three seconds: iostat -x 3. Now open up two other terminal windows. In one of those windows, start a long write process using the dd command (see Listing 6.4).

Listing 6.4  Use dd to Measure Sequential Write Performance
    dd if=/dev/urandom of=/data/cassandra/test_outfile count=512 bs=1024k

In the other terminal window, start a long sequential read process (see Listing 6.5).

Listing 6.5  Use dd to Measure Sequential Read Performance
    dd if=/data/cassandra/test_outfile of=/dev/null bs=4096k

Now look back at the iostat output, which is still running in the first terminal window that you opened. Did either the read or write time drop significantly? If the answer is yes, you may want to think about better separation. If you have already decided to separate the CommitLog directory and the data directories onto different disks, you are on the right track. Extending this idea a little further, building a more performant Cassandra node from the I/O perspective can easily be accomplished with some planning. It is common to put Cassandra data directories on a RAID drive.
This isn’t a bad idea as RAID provides redundancy and a little bit of extra speed, depending on which RAID level you use. But Cassandra offers a feature called JBoD (just a bunch of disks).

This example is a little contrived, as it is not indicative of regular system performance. The idea here is just to get a feel for what happens to response times when the disks are under load. It also gives you the ability to tune your current setup when there is an existing simultaneous read and write load. Again, even though the reads and writes as a result of the queries aren’t normally sequential, they will likely be concurrent. This usage of dd mimics the concurrency aspect of the load.

Virtual Memory and Swap

If you are running Cassandra on a dedicated machine, the suggested swap setting is to keep it off. The ideal case in a bad scenario with regard to memory is that the OS kills off the Java process running Cassandra, leaving the OS reachable even if Cassandra is down. If the machine is allowed to swap itself “to death,” you will have no way to get to the machine to fix it. The settings in Linux that control these are swappiness, overcommit_memory, and overcommit_ratio. In the preceding scenario, overcommit_memory should be set to 2 and swappiness should be set to 0. An overcommit_memory setting of 2 will ensure that Linux will not hand out anonymous pages of memory unless it is sure it has a place to store them in physical memory (RAM). A swappiness setting of 0 tells the kernel to avoid swapping processes out of physical memory for as long as possible.

By default, Linux attempts to be smart and limits the maximum number of memory map areas that a process may have (vm.max_map_count). Most applications don’t need many maps. Since Cassandra works a lot with memory, it is a good idea to give it a little bit of headroom. The default for this setting is 65,535 maps. While this is fine most of the time, setting it to something higher is a good idea.
A safe value for a high-volume machine and one that is commonly used on Cassandra nodes is 262,140, or four times the default setting.

sysctl Network Settings

There are a few other sysctl settings that can be adjusted for getting more performance out of your Cassandra machine. They are relevant to the amount of network traffic that is allowed in and out of the node. Listing 6.6 shows changes to the send and receive buffer sizes to tell the Linux kernel to allow for higher throughput of network traffic.

Listing 6.6  sysctl Settings to Allow for Higher Throughput of Network Traffic
    net.core.rmem_max = 16777216
    net.core.wmem_max = 16777216
    net.ipv4.tcp_rmem = 4096 65536 16777216
    net.ipv4.tcp_wmem = 4096 65536 16777216

File Limit Settings

You will also need to ensure that Cassandra isn’t being limited by the number of files allowed by the kernel. This can be done by giving a larger value to fs.file-max in your sysctl settings. A good setting for this is 1,048,576. In addition to changing the sysctl setting, you will need to raise the open file limit for the system. By adding the two lines in Listing 6.7 to your /etc/security/limits.conf, you should be able to give Cassandra enough room to operate under normal and heavy loads.

Listing 6.7  /etc/security/limits.conf Settings for Allowing Additional Open Files
    * soft nofile 16384
    * hard nofile 32768

Solid-State Drives

Although Cassandra’s storage engine was optimized for spinning disks, you can still benefit a lot from the use of SSDs. This is evident because most modern SATA drives are best at sequential operations. Seek times are limited by the time it takes the drive to rotate. As good as the throughput of SATA drives is (a 7,200rpm drive typically gets around 125MB/s), solid-state drives are just better at it. They usually get about 250MB/s read and 205MB/s write throughput.
Since Cassandra writes sequentially and uses streaming write patterns, it minimizes the effects of write amplification associated with SSDs. Write amplification is a concept associated with solid-state drives where the amount of physical information written is a multiple of the logical amount intended to be written. This means that Cassandra does a good job of giving normal consumer-level SSDs a longer life span.

If you are running on hardware (or even in the cloud) and are lucky enough to have Cassandra on solid-state drives, there are a few settings you can change to get even better performance. The first change that you will want to make is to set multithreaded_compaction to true. The reason this value defaults to false is that compactions are usually I/O bound. With multithreaded compaction enabled, every compaction operation will use up to one thread per core plus one thread per SSTable being merged. This means a lot of extra disk access. If you are running on SSDs, this additional I/O won’t be a problem and your bottleneck for compactions will likely become the CPU.

The other Cassandra options for tuning SSDs are trickle_fsync and trickle_fsync_interval_in_kb. These values control the intervals at which fsyncs to disk happen. When doing sequential writes to SSDs, you want to keep sudden dirty buffer flushing from impacting read latencies. Setting trickle_fsync to true and giving a small interval (10MB is usually a good starting point) should make the dirty buffer flushes forced by the OS less frequent.

JVM Tuning

There are a lot of options for tuning the Java Virtual Machine. The best changes are going to be a result of your workload. There are a lot of “your mileage may vary”-type suggestions. In the following sections, we discuss a few basic principles that can be followed.

Multiple JVM Options

There are a few different JVMs available. The requirements for deciding which JVM to use for Cassandra are pretty straightforward.
It must, at a minimum, be Java 1.6 compatible or greater. It should support whichever hardware and operating systems you are using in your infrastructure as well. Although it should run on the IBM JVM and JRockit, the recommended JVM is the Oracle/Sun JVM. Different JVMs will give you different performance characteristics, and you will have to experiment with which virtual machine is best for you and your environment.

Maximum Heap Size

The first thing that administrators usually toy with when tuning JVM settings is the maximum heap size. The option for this is -Xmx$size, where $size is the desired maximum size of the JVM heap. This value is set in your conf/ file and is defined as MAX_HEAP_SIZE. A good place to start is figuring out how much memory you have on the machine and cutting it in half. You will definitely need to leave enough memory for the operating system to be able to function properly. The operating system is fairly intelligent about its memory management, so it is usually smarter to slightly underallocate to the JVM.

To determine the approximate size of Cassandra’s internal data structures, a good rule of thumb is memtable_throughput_in_mb * 3 * number_of_hot_CFs + 1G + internal_caches.

Another important option in tuning the JVM is to set the thread stack size, -Xss$size. In Java, each thread has its own stack that holds function call or method arguments, return addresses, and the like. In other words, by setting the -Xss parameter, you are able to limit the amount of memory consumed by an individual thread. Setting this to 128KB is typically a safe bet. You should consider raising this value only if you see OutOfMemoryError errors with the additional message that Java is unable to create new native threads. Even then, most of the time it will be indicative of another problem that should be handled first.
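As a rough illustration of the heap-sizing rules of thumb above, the following Python sketch works through the arithmetic. The function names and the example node (16GB of RAM, a 64MB memtable throughput, four hot column families, 512MB of internal caches) are hypothetical values chosen purely for illustration, not recommendations. The sketch also assumes every quantity is expressed in megabytes and treats the 1G term as 1024MB.

```python
def estimate_internal_structures_mb(memtable_throughput_in_mb,
                                    number_of_hot_cfs,
                                    internal_caches_mb):
    """Rule of thumb from the text, in megabytes:
    memtable_throughput_in_mb * 3 * number_of_hot_CFs + 1G + internal_caches
    """
    one_gigabyte_mb = 1024  # assumption: 1G is taken as 1024MB
    return (memtable_throughput_in_mb * 3 * number_of_hot_cfs
            + one_gigabyte_mb
            + internal_caches_mb)


def starting_max_heap_mb(total_ram_mb):
    """Starting point from the text: half of the machine's memory."""
    return total_ram_mb // 2


if __name__ == "__main__":
    # Hypothetical node used only to show the arithmetic.
    total_ram_mb = 16 * 1024
    estimate_mb = estimate_internal_structures_mb(
        memtable_throughput_in_mb=64,
        number_of_hot_cfs=4,
        internal_caches_mb=512,
    )
    heap_mb = starting_max_heap_mb(total_ram_mb)

    print("Estimated internal structures: %dMB" % estimate_mb)
    print("Starting maximum heap size:    %dMB" % heap_mb)
    print("Estimate fits in starting heap: %s" % (estimate_mb <= heap_mb))
```

With these assumed inputs the estimate comes to 2304MB against an 8192MB starting heap. Treat the result as a first guess to validate under your own workload rather than a final setting.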
If you find yourself needing to set a larger heap size and you are beginning to run short on memory (and you are using the Oracle JVM), you can look at allowing the JVM to compress ordinary object pointers. An ordinary object pointer (OOP) is a managed pointer to an object in the JVM. Normally, an object pointer in Java is the same size as the machine pointer. When CPU architectures switched to 64-bit, the size of the pointers in the JVM heap increased with it. By adding the option to compress pointers, you can reduce their impact on the heap. You can do this by adding the value -XX:+UseCompressedOops to your conf/ file.

Garbage Collection

Many issues that stem from JVM performance can usually be traced back to garbage collection. Periodic garbage collections usually are not a problem as most applications expect them and are fairly tolerant of them. Adjusting a few GC-related settings should help to minimize the JVM pauses that can be a result of rapid allocation and deallocation of memory inside the JVM. The following options should be set in the same file as well:

• -XX:+UseParNewGC. ParNew is a “stop the world while I collect the garbage” system. This may seem like a bad idea, but this GC usually runs quickly and is responsible only for cleaning up the “younger” generation.
• -XX:+UseConcMarkSweepGC. This setting tells the JVM to allow the concurrent collector to run with the parallel young generation collector (also known as ParNewGC in this case).
• -XX:+CMSParallelRemarkEnabled. CMS here stands for ConcurrentMarkSweep (referenced above). This setting tells the JVM that when the CMS GC is running, the garbage collector can use multiple threads. This usually decreases the length of a pause during the remark phase.
• -XX:CMSInitiatingOccupancyFraction=75. If this value is not set at runtime, the JVM will attempt to determine a good value. This is a percentage and refers to the percent occupancy of the tenured generation at which a CMS GC is triggered.
• -XX:+UseCMSInitiatingOccupancyOnly. This value just tells the JVM not to try to calculate its own value for CMSInitiatingOccupancyFraction and to use the one provided to it.

While these are good, generally applicable settings for Cassandra, they are mostly for systems with smaller amounts of memory, roughly 20GB or less. With anything larger, you will start to see diminishing returns on changes you make. Java will be forced to take a much larger part in memory management and garbage collection. This will invariably take up more CPU time and cause longer pauses. A common solution is to run multiple instances of Cassandra on the same node, ensuring that the token ranges don’t coincide with the same replica.

Summary

As with any performance optimizations, your mileage may vary. We have covered suggestions for how to get performance improvements up and down the stack from the hardware layer all the way up to the Cassandra application and a few things in between. As is common to most bits of advice, you’ll have to test things out and see what works for you. There are many additional options for tuning the Java Virtual Machine for your workload. Determining a specific configuration or set of configuration parameters that works for you will make a large difference in the long run. It’s also a good idea to always keep an eye out for best practices and to learn from people who run similar configurations or have similar requirements. There isn’t always one right answer, and getting the most out of your Cassandra cluster will require some testing. Start applying some of the ideas discussed in this chapter to your cluster piecemeal and watch for improvements. Always ensure that you have a proper baseline of performance before making a change.
The more you learn about how your cluster performs under load, the more you will be able to optimize its performance.

7 Maintenance

At this point, your cluster is up and running. There is data coming in and everything seems to be humming along. But Cassandra is not a set-it-and-forget-it system. Maintenance is required, and there are tools to help you perform the necessary maintenance tasks.

Understanding nodetool

The most basic command that you should be familiar with when administering a Cassandra cluster is nodetool. nodetool is a command-line interface to Cassandra cluster management. It can provide you with basic information about an individual node or all nodes in the cluster or ring. As of Cassandra 1.2, nodetool provides the options shown in Listing 7.1.

Listing 7.1  nodetool Output

usage: java --host
 -a,--include-all-sstables   includes sstables that are already on the most recent version during upgradesstables.
 -cf,--column-family         only take a snapshot of the specified column family.
 -et,--end-token             token at which repair range ends.
 -h,--host                   node hostname or ip address.
 -local,--in-local-dc        only repair against nodes in the same datacenter.
 -p,--port                   remote jmx agent port number.
 -pr,--partitioner-range     only repair the first range returned by the partitioner for the node.
 -pw,--password              remote jmx agent password.
 -snapshot,--with-snapshot   repair one node at a time using snapshots.
 -st,--start-token           token at which repair range starts.
 -T,--tokens                 display all tokens.
 -t,--tag                    optional name to give a snapshot.
 -u,--username               remote jmx agent username.

Available commands
  ring - Print information about the token ring.
  join - Join the ring.
  info [-T/--tokens] - Print node information (uptime, load, ...).
  status - Print cluster information (state, load, IDs, ...).
  cfstats - Print statistics on column families.
  version - Print cassandra version.
  tpstats - Print usage statistics of thread pools.
  proxyhistograms - Print statistic histograms for network operations.
  drain - Drain the node (stop accepting writes and flush all column families).
  decommission - Decommission the *node I am connecting to*.
  compactionstats - Print statistics on compactions.
  disablebinary - Disable native transport (binary protocol).
  enablebinary - Reenable native transport (binary protocol).
  statusbinary - Status of native transport (binary protocol).
  disablehandoff - Disable the future hints storing on the current node.
  enablehandoff - Reenable the future hints storing on the current node.
  resumehandoff - Resume hints delivery process.
  pausehandoff - Pause hints delivery process.
  disablegossip - Disable gossip (effectively marking the node down).
  enablegossip - Reenable gossip.
  disablethrift - Disable thrift server.
  enablethrift - Reenable thrift server.
  enablebackup - Enable incremental backup.
  disablebackup - Disable incremental backup.
  statusthrift - Status of thrift server.
  gossipinfo - Shows the gossip information for the cluster.
  invalidatekeycache - Invalidate the key cache.
  invalidaterowcache - Invalidate the row cache.
  resetlocalschema - Reset node’s local schema and resync.
  netstats [host] - Print network information on provided host (connecting node by default).
  move - Move node on the token ring to a new token. (for -ve tokens, use \\ to escape, Example: move \\-123).
  removenode status|force| - Show status of current node removal, force completion of pending removal or remove provided ID.
  setcompactionthroughput - Set the MB/s throughput cap for compaction in the system, or 0 to disable throttling.
  setstreamthroughput - Set the MB/s throughput cap for streaming in the system, or 0 to disable throttling.
  describering [keyspace] - Shows the token ranges info of a given keyspace.
  rangekeysample - Shows the sampled keys held across all keyspaces.
  rebuild [src-dc-name] - Rebuild data by streaming from other nodes (similarly to bootstrap).
  settraceprobability [value] - Sets the probability for tracing any given request to value. 0 disables, 1 enables for all requests, 0 is the default.
  snapshot [keyspaces...] -cf [columnfamilyName] -t [snapshotName] - Take a snapshot of the optionally specified column family of the specified keyspaces using optional name snapshotName.
  clearsnapshot [keyspaces...] -t [snapshotName] - Remove snapshots for the specified keyspaces. Either remove all snapshots or remove the snapshots with the given name.
  flush [keyspace] [cfnames] - Flush one or more column families.
  repair [keyspace] [cfnames] - Repair one or more column families (use -pr to repair only the first range returned by the partitioner).
  cleanup [keyspace] [cfnames] - Run cleanup on one or more column families.
  compact [keyspace] [cfnames] - Force a (major) compaction on one or more column families.
  scrub [keyspace] [cfnames] - Scrub (rebuild sstables for) one or more column families.
  upgradesstables [-a|--include-all-sstables] [keyspace] [cfnames] - Rewrite sstables (for the requested column families) that are not on the current version (thus upgrading them to said current version). Use -a to include all sstables, even those already on the current version.
  setcompactionthreshold - Set min and max compaction thresholds for a given column family.
  getcompactionthreshold - Print min and max compaction thresholds for a given column family.
  stop - Supported types are COMPACTION, VALIDATION, CLEANUP, SCRUB, INDEX_BUILD.
  cfhistograms - Print statistic histograms for a given column family.
  refresh - Load newly placed SSTables to the system without restart.
  rebuild_index - a full rebuilds of native secondary index for a given column family. IndexNameExample: Standard3.IdxName,Standard3.IdxName1.
  setcachecapacity - Set global key and row cache capacities (in MB units).
  getendpoints - Print the end points that own the key.
  getsstables - Print the sstable filenames that own the key.
  predictconsistency - Predict latency and consistency “t” ms after



