ZooKeeper典型应用场景一览

ZooKeeper是一个高可用的分布式数据管理与系统协调框架。基于对Paxos算法的实现,使该框架保证了分布式环境中数据的强一致性,也正是基于这样的特性,使得ZooKeeper解决很多分布式问题。网上对ZK的应用场景也有不少介绍,本文将结合作者身边的项目例子,系统地对ZK的应用场景进行一个分门归类的介绍。

值得注意的是,ZK并非天生就是为这些应用场景设计的,都是后来众多开发者根据其框架的特性,利用其提供的一系列API接口(或者称为原语集),摸索出来的典型使用方法。因此,也非常欢迎读者分享你在ZK使用上的奇技淫巧。

ZooKeeper典型应用场景一览

数据发布与订阅(配置中心) 

发布与订阅模型,即所谓的配置中心,顾名思义就是发布者将数据发布到ZK节点上,供订阅者动态获取数据,实现配置信息的集中式管理和动态更新。例如全局的配置信息,服务式服务框架的服务地址列表等就非常适合使用。

· 应用中用到的一些配置信息放到ZK上进行集中管理。这类场景通常是这样:应用在启动的时候会主动来获取一次配置,同时,在节点上注册一个Watcher,这样一来,以后每次配置有更新的时候,都会实时通知到订阅的客户端,从来达到获取最新配置信息的目的。

· 分布式搜索服务中,索引的元信息和服务器集群机器的节点状态存放在ZK的一些指定节点,供各个客户端订阅使用。

· 分布式日志收集系统。这个系统的核心工作是收集分布在不同机器的日志。收集器通常是按照应用来分配收集任务单元,因此需要在ZK上创建一个以应用名作为path的节点P,并将这个应用的所有机器ip,以子节点的形式注册到节点P上,这样一来就能够实现机器变动的时候,能够实时通知到收集器调整任务分配。

· 系统中有些信息需要动态获取,并且还会存在人工手动去修改这个信息的发问。通常是暴露出接口,例如JMX接口,来获取一些运行时的信息。引入ZK之后,就不用自己实现一套方案了,只要将这些信息存放到指定的ZK节点上即可。

注意:在上面提到的应用场景中,有个默认前提是:数据量很小,但是数据更新可能会比较快的场景。

负载均衡

这里说的负载均衡是指软负载均衡。在分布式环境中,为了保证高可用性,通常同一个应用或同一个服务的提供方都会部署多份,达到对等服务。而消费者就须要在这些对等的服务器中选择一个来执行相关的业务逻辑,其中比较典型的是消息中间件中的生产者,消费者负载均衡。

消息中间件中发布者和订阅者的负载均衡,linkedin开源的KafkaMQ和阿里开源的metaq都是通过zookeeper来做到生产者、消费者的负载均衡。这里以metaq为例如讲下:
生产者负载均衡metaq发送消息的时候,生产者在发送消息的时候必须选择一台broker上的一个分区来发送消息,因此metaq在运行过程中,会把所有broker和对应的分区信息全部注册到ZK指定节点上,默认的策略是一个依次轮询的过程,生产者在通过ZK获取分区列表之后,会按照brokerIdpartition的顺序排列组织成一个有序的分区列表,发送的时候按照从头到尾循环往复的方式选择一个分区来发送消息。

消费负载均衡:

在消费过程中,一个消费者会消费一个或多个分区中的消息,但是一个分区只会由一个消费者来消费。MetaQ的消费策略是:

· 每个分区针对同一个group只挂载一个消费者。

· 如果同一个group的消费者数目大于分区数目,则多出来的消费者将不参与消费。

· 如果同一个group的消费者数目小于分区数目,则有部分消费者需要额外承担消费任务。

在某个消费者故障或者重启等情况下,其他消费者会感知到这一变化(通过 zookeeper watch消费者列表),然后重新进行负载均衡,保证所有的分区都有消费者进行消费。

命名服务(Naming Service)

命名服务也是分布式系统中比较常见的一类场景。在分布式系统中,通过使用命名服务,客户端应用能够根据指定名字来获取资源或服务的地址,提供者等信息。被命名的实体通常可以是集群中的机器,提供的服务地址,远程对象等等——这些我们都可以统称他们为名字(Name)。其中较为常见的就是一些分布式服务框架中的服务地址列表。通过调用ZK提供的创建节点的 API,能够很容易创建一个全局唯一的path,这个path就可以作为一个名称。

阿里巴巴集团开源的分布式服务框架Dubbo中使用ZooKeeper来作为其命名服务,维护全局的服务地址列表,点击这里查看Dubbo开源项目。在Dubbo实现中:

服务提供者在启动的时候,向ZK上的指定节点/dubbo/${serviceName}/providers目录下写入自己的URL地址,这个操作就完成了服务的发布。

服务消费者启动的时候,订阅/dubbo/${serviceName}/providers目录下的提供者URL地址, 并向/dubbo/${serviceName} /consumers目录下写入自己的URL地址。

注意,所有向ZK上注册的地址都是临时节点,这样就能够保证服务提供者和消费者能够自动感应资源的变化。

另外,Dubbo还有针对服务粒度的监控,方法是订阅/dubbo/${serviceName}目录下所有提供者和消费者的信息。

分布式通知/协调

ZooKeeper中特有watcher注册与异步通知机制,能够很好的实现分布式环境下不同系统之间的通知与协调,实现对数据变更的实时处理。使用方法通常是不同系统都对ZK上同一个znode进行注册,监听znode的变化(包括znode本身内容及子节点的),其中一个系统updateznode,那么另一个系统能够收到通知,并作出相应处理

· 另一种心跳检测机制:检测系统和被检测系统之间并不直接关联起来,而是通过zk上某个节点关联,大大减少系统耦合。

· 另一种系统调度模式:某系统有控制台和推送系统两部分组成,控制台的职责是控制推送系统进行相应的推送工作。管理人员在控制台作的一些操作,实际上是修改了ZK上某些节点的状态,而ZK就把这些变化通知给他们注册Watcher的客户端,即推送系统,于是,作出相应的推送任务。

· 另一种工作汇报模式:一些类似于任务分发系统,子任务启动后,到zk来注册一个临时节点,并且定时将自己的进度进行汇报(将进度写回这个临时节点),这样任务管理者就能够实时知道任务进度。

总之,使用zookeeper来进行分布式通知和协调能够大大降低系统之间的耦合

集群管理与Master选举

· 集群机器监控:这通常用于那种对集群中机器状态,机器在线率有较高要求的场景,能够快速对集群中机器变化作出响应。这样的场景中,往往有一个监控系统,实时检测集群机器是否存活。过去的做法通常是:监控系统通过某种手段(比如ping)定时检测每个机器,或者每个机器自己定时向监控系统汇报我还活着 这种做法可行,但是存在两个比较明显的问题:

1. 集群中机器有变动的时候,牵连修改的东西比较多。

2. 有一定的延时。

利用ZooKeeper有两个特性,就可以实时另一种集群机器存活性监控系统:

1. 客户端在节点 x 上注册一个Watcher,那么如果 x?的子节点变化了,会通知该客户端。

2. 创建EPHEMERAL类型的节点,一旦客户端和服务器的会话结束或过期,那么该节点就会消失。

例如,监控系统在 /clusterServers 节点上注册一个Watcher,以后每动态加机器,那么就往 /clusterServers 下创建一个 EPHEMERAL类型的节点:/clusterServers/{hostname}. 这样,监控系统就能够实时知道机器的增减情况,至于后续处理就是监控系统的业务了。

· Master选举则是zookeeper中最为经典的应用场景了。

在分布式环境中,相同的业务应用分布在不同的机器上,有些业务逻辑(例如一些耗时的计算,网络I/O处理),往往只需要让整个集群中的某一台机器进行执行,其余机器可以共享这个结果,这样可以大大减少重复劳动,提高性能,于是这个master选举便是这种场景下的碰到的主要问题。

利用ZooKeeper的强一致性,能够保证在分布式高并发情况下节点创建的全局唯一性,即:同时有多个客户端请求创建 /currentMaster 节点,最终一定只有一个客户端请求能够创建成功。利用这个特性,就能很轻易的在分布式环境中进行集群选取了。

另外,这种场景演化一下,就是动态Master选举。这就要用到?EPHEMERAL_SEQUENTIAL类型节点的特性了。

上文中提到,所有客户端创建请求,最终只有一个能够创建成功。在这里稍微变化下,就是允许所有请求都能够创建成功,但是得有个创建顺序,于是所有的请求最终在ZK上创建结果的一种可能情况是这样: /currentMaster/{sessionId}-1 ,?/currentMaster/{sessionId}-2 ,?/currentMaster/{sessionId}-3 ….. 每次选取序列号最小的那个机器作为Master,如果这个机器挂了,由于他创建的节点会马上小时,那么之后最小的那个机器就是Master了。

· 在搜索系统中,如果集群中每个机器都生成一份全量索引,不仅耗时,而且不能保证彼此之间索引数据一致。因此让集群中的Master来进行全量索引的生成,然后同步到集群中其它机器。另外,Master选举的容灾措施是,可以随时进行手动指定master,就是说应用在zk在无法获取master信息时,可以通过比如http方式,向一个地方获取master

· Hbase中,也是使用ZooKeeper来实现动态HMaster的选举。在Hbase实现中,会在ZK上存储一些ROOT表的地址和 HMaster的地址,HRegionServer也会把自己以临时节点(Ephemeral)的方式注册到Zookeeper中,使得HMaster可以随时感知到各个HRegionServer的存活状态,同时,一旦HMaster出现问题,会重新选举出一个HMaster来运行,从而避免了 HMaster的单点问题

分布式锁

分布式锁,这个主要得益于ZooKeeper为我们保证了数据的强一致性。锁服务可以分为两类,一个是保持独占,另一个是控制时序

· 所谓保持独占,就是所有试图来获取这个锁的客户端,最终只有一个可以成功获得这把锁。通常的做法是把zk上的一个znode看作是一把锁,通过 create znode的方式来实现。所有客户端都去创建 /distribute_lock 节点,最终成功创建的那个客户端也即拥有了这把锁。

· 控制时序,就是所有视图来获取这个锁的客户端,最终都是会被安排执行,只是有个全局时序了。做法和上面基本类似,只是这里 /distribute_lock 已经预先存在,客户端在它下面创建临时有序节点(这个可以通过节点的属性控制:CreateMode.EPHEMERAL_SEQUENTIAL来指定)。Zk的父节点(/distribute_lock)维持一份sequence,保证子节点创建的时序性,从而也形成了每个客户端的全局时序。

分布式队列

队列方面,简单地讲有两种,一种是常规的先进先出队列,另一种是要等到队列成员聚齐之后的才统一按序执行。对于第一种先进先出队列,和分布式锁服务中的控制时序场景基本原理一致,这里不再赘述。

第二种队列其实是在FIFO队列的基础上作了一个增强。通常可以在 /queue 这个znode下预先建立一个/queue/num 节点,并且赋值为n(或者直接给/queue赋值n),表示队列大小,之后每次有队列成员加入后,就判断下是否已经到达队列大小,决定是否可以开始执行了。这种用法的典型场景是,分布式环境中,一个大任务Task A,需要在很多子任务完成(或条件就绪)情况下才能进行。这个时候,凡是其中一个子任务完成(就绪),那么就去 /taskList 下建立自己的临时时序节点(CreateMode.EPHEMERAL_SEQUENTIAL),当 /taskList 发现自己下面的子节点满足指定个数,就可以进行下一步按序进行处理了。

 

Zookeeper实现分布式锁

package com.billstudy.zookeeper;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

/**
 * 注册服务,并且自动监听可用服务列表
 * @author Bill
 * @since V1.0 2015年6月24日 - 上午10:03:24
 */
public class AppServer {

	private ZooKeeper zk = null;
	
	// 树前缀
	private static final String zkParentPrefix = "/appserver";

	private static final String zkChildPrefix = "/app"; 
	
	// 维护可用列表
	private final ArrayList availableServerList = new ArrayList();
	
	public void connectZk(String address){
		try {
			zk = new ZooKeeper("hadoop-server05:2181,hadoop-server06:2181,hadoop-server07:2181", 5000, new Watcher() {
				@Override
				public void process(WatchedEvent event) {
					
					System.out.println(event.toString());
					
					if (event.getType() == EventType.NodeChildrenChanged
						&& event.getPath().startsWith(zkParentPrefix)
							) {
						flushServerList();
					}
				}
			});
			
			// 如果根节点没有,则先创建
			if (zk.exists(zkParentPrefix, true) == null) {
				zk.create(zkParentPrefix, "AppServer root dir ".getBytes("UTF-8"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
				System.out.println("znode :" + zkParentPrefix + "is not exists , create successful !");
			}
			
			// 根据当前address创建临时连续子节点,这样多个不同的app child节点不会重复。 zk会自己维护序列
			String childPath = zk.create(zkParentPrefix + zkChildPrefix, address.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
			
			System.out.println("create " + childPath + " successful, address is :" + address);
			
			
			flushServerList();
		} catch (Exception e) {
			e.printStackTrace();
		}
		
	}

	/**
	 * 收到子节点更新后,刷新当前可用服务列表
	 * @author Bill
	 * @since V1.0 2015年6月24日 - 上午10:08:19
	 */
	protected void flushServerList() {
		availableServerList.clear();
		try {
			List children = zk.getChildren(zkParentPrefix, true);
			for (String child : children) {
				byte[] data = zk.getData(zkParentPrefix + "/" + child, true,new Stat());
				availableServerList.add(new String(data,"UTF-8"));
			}
			System.out.println("current available server list:" + availableServerList);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	/**
	 * 此处可以用来处理业务逻辑,目前让主线程挂起
	 * @author Bill
	 * @since V1.0 2015年6月24日 - 上午10:24:00
	 */
	public void handle(){
		try {
			// System.out.println("handle ...");
			TimeUnit.HOURS.sleep(Long.MAX_VALUE);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
	
	public static void main(String[] args) {
		
		if (args.length != 1) {
			System.err.println("The program first argument must be address !");
			System.exit(1);
		}
		AppServer appServer = new AppServer();
		appServer.connectZk(args[0]);
		appServer.handle();
	}
	
}

版权声明:本文为博主原创文章,未经博主允许不得转载。

Zookeeper实现服务上下线监控服务列表

package com.billstudy.zookeeper;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

/**
 * 注册服务,并且自动监听可用服务列表
 * @author Bill
 * @since V1.0 2015年6月24日 - 上午10:03:24
 */
public class AppServer {

	private ZooKeeper zk = null;
	
	// 树前缀
	private static final String zkParentPrefix = "/appserver";

	private static final String zkChildPrefix = "/app"; 
	
	// 维护可用列表
	private final ArrayList availableServerList = new ArrayList();
	
	public void connectZk(String address){
		try {
			zk = new ZooKeeper("hadoop-server05:2181,hadoop-server06:2181,hadoop-server07:2181", 5000, new Watcher() {
				@Override
				public void process(WatchedEvent event) {
					
					System.out.println(event.toString());
					
					if (event.getType() == EventType.NodeChildrenChanged
						&& event.getPath().startsWith(zkParentPrefix)
							) {
						flushServerList();
					}
				}
			});
			
			// 如果根节点没有,则先创建
			if (zk.exists(zkParentPrefix, true) == null) {
				zk.create(zkParentPrefix, "AppServer root dir ".getBytes("UTF-8"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
				System.out.println("znode :" + zkParentPrefix + "is not exists , create successful !");
			}
			
			// 根据当前address创建临时连续子节点,这样多个不同的app child节点不会重复。 zk会自己维护序列
			String childPath = zk.create(zkParentPrefix + zkChildPrefix, address.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
			
			System.out.println("create " + childPath + " successful, address is :" + address);
			
			
			flushServerList();
		} catch (Exception e) {
			e.printStackTrace();
		}
		
	}

	/**
	 * 收到子节点更新后,刷新当前可用服务列表
	 * @author Bill
	 * @since V1.0 2015年6月24日 - 上午10:08:19
	 */
	protected void flushServerList() {
		availableServerList.clear();
		try {
			List children = zk.getChildren(zkParentPrefix, true);
			for (String child : children) {
				byte[] data = zk.getData(zkParentPrefix + "/" + child, true,new Stat());
				availableServerList.add(new String(data,"UTF-8"));
			}
			System.out.println("current available server list:" + availableServerList);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

	/**
	 * 此处可以用来处理业务逻辑,目前让主线程挂起
	 * @author Bill
	 * @since V1.0 2015年6月24日 - 上午10:24:00
	 */
	public void handle(){
		try {
			// System.out.println("handle ...");
			TimeUnit.HOURS.sleep(Long.MAX_VALUE);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
	
	public static void main(String[] args) {
		
		if (args.length != 1) {
			System.err.println("The program first argument must be address !");
			System.exit(1);
		}
		AppServer appServer = new AppServer();
		appServer.connectZk(args[0]);
		appServer.handle();
	}
	
}

版权声明:本文为博主原创文章,未经博主允许不得转载。

MapReducer-找共同好友

package com.billstudy.mr.friends;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * 找朋友
 *  共同好友
	原始数据:每个人的好友列表
	A:B,C,D,F,E,O
	B:A,C,E,K
	C:F,A,D,I
	D:A,E,F,L
	E:B,C,D,M,L
	F:A,B,C,D,E,O,M
	G:A,C,D,E,F
	H:A,C,D,E,O
	I:A,O
	J:B,O
	K:A,C,D
	L:D,E,F
	M:E,F,G
	O:A,H,I,J
	……
	
	输出结果:每个人和其他各人所拥有的功能好友
	A-B	C,E,
	A-C	D,F,
	A-D	E,F,
	A-E	B,C,D,
	A-F	B,C,D,E,O,
	A-G	C,D,E,F,
	A-H	C,D,E,O,
	A-I	O,
	A-J	B,O,
	A-K	C,D,
	A-L	D,E,F,
	A-M	E,F,
	B-C	A,
	B-D	A,E,
	……
 * @author Bill
 * @since V1.0 2015年6月24日 - 下午4:53:01
 */
public class ShareFriends {

	/**
	 * 
	 * 把拥有同一个朋友的放到同一组 
	 * 
	 * 将
	 *  A:B,C,D,F,E,O
	 * 
	 * 输出:
	 * B	A
	 * C	A
	 * D	A
	 * F	A
	 * ....
	 * 
	 * @author Bill
	 * @since V1.0 2015年6月24日 - 下午5:15:21
	 */
	static class Mapper1 extends Mapper {
		
		private final Text k = new Text();
		
		private final Text v = new Text();
		
		@Override
		protected void map(LongWritable key, Text value,Context context)
				throws IOException, InterruptedException {
			
			String[] persons = value.toString().split(":");
			
			if(persons.length != 2){
				return;
			}
			
			// 切分字段
			String self = persons[0];
			String[] friends = persons[1].split(",");
			
			v.set(self);
			for (int i = 0; i < friends.length; i++) {
				k.set(friends[i]);
				context.write(k, v);
			}
		}
	}

	/**
	 * 
	 * 把拥有同一个朋友的拼接到一起,输出
	 * 
	 * 将
	 * A	{B,C,D}
	 * 
	 * 输出:
	 * A	B-C-D
	 * @author Bill
	 * @since V1.0 2015年6月24日 - 下午5:14:26
	 */
	static class Reducer1 extends Reducer{
		
		// private final Text k = new Text();
		
		private final Text v = new Text();
		
		@Override
		protected void reduce(Text key, Iterable friends, Context context)
				throws IOException, InterruptedException {
			
			StringBuilder friendNames = new StringBuilder();
			
			for (Text friend : friends) {
				friendNames.append(friend.toString() + "-");
			}
			
			// 去掉最后一个杠杠
			v.set(friendNames.length() > 0 ? friendNames.substring(0, friendNames.length() - 1) : "" );
			
			context.write(key, v);
			
		}
	}

	/**
	 * 将拥有同一个朋友的人排序后两两拼接输出,让朋友任意的一对组合都可以分到同一组
	 * 
	 * 将
	 * A	B-C-D-E-F
	 * 
	 * 输出:
	 * B-C	A
	 * B-D	A
	 * B-E	A
	 * B-F	A
	 * C-D	A
	 * ....
	 * @author Bill
	 * @since V1.0 2015年6月24日 - 下午5:12:44
	 */
	static class Mapper2 extends Mapper{
		
		private final Text k = new Text();
		
		private final Text v = new Text();
		
		@Override
		protected void map(LongWritable key, Text value,Context context)
				throws IOException, InterruptedException {
			
			String[] persons = value.toString().split("\t");
			String self = persons[0];
			String[] friends = persons[1].split("-");
			
			// 此处必须要对其朋友排序,否则交叉输出时会导致A-B:D / B-A:F 的问题出现,实际上述两个key为同一组。应该为:A-B:D,F
			Arrays.sort(friends);

			v.set(self);
			
			// 交叉打印
			for (int i = 0; i < friends.length - 1; i++) {
				for (int j = i + 1; j < friends.length; j++) {
					 k.set(friends[i] + "-" + friends[j]);
					 context.write(k, v);
				}
			}
		}
	}
	
	/**
	 * 
	 * 把分到同一组组合的朋友拼接输出
	 * 
	 * 将类似:
	 * 	A-B B
	 *  A-B C
	 *  A-B D
	 *  
	 * 输出:
	 * A-B	B,C,D
	 * @author Bill
	 * @since V1.0 2015年6月24日 - 下午5:11:24
	 */
	static class Reducer2 extends Reducer{
		
		// private final Text k = new Text();
		
		private final Text v = new Text();
		
		@Override
		protected void reduce(Text pair, Iterable friends,Context context)
				throws IOException, InterruptedException {
			
			StringBuilder friendNames = new StringBuilder();
			
			for (Text friend : friends) {
				friendNames.append(friend.toString() + ",");
			}
			
			// 去掉逗号
			v.set(friendNames.length() > 0 ? friendNames.substring(0, friendNames.length() - 1) : "");
			
			context.write(pair, v);
			
		}
		
	}
	
	
	public static void main(String[] args) throws Exception {
		
		if (args.length != 3) {
			System.err.println("Usage:  ");
			System.exit(1);
		}
		
		Configuration conf = new Configuration();
		
		// 创建路径,清除旧数据
		Path job1InputPath = new Path(args[0]);
		Path job1OutputPath = new Path(args[1]);
		Path job2OutputPath = new Path(args[2]);
		
		FileSystem fs = FileSystem.get(conf);
		if (fs.exists(job1OutputPath)) {
			fs.delete(job1OutputPath, true);
		}

		if (fs.exists(job2OutputPath)) {
			fs.delete(job2OutputPath, true);
		}

		
		// job1
		Job job1 = Job.getInstance(conf);
		job1.setJarByClass(ShareFriends.class);
		job1.setMapperClass(Mapper1.class);
		job1.setReducerClass(Reducer1.class);
		
		job1.setOutputKeyClass(Text.class);
		job1.setOutputValueClass(Text.class);
		
		FileInputFormat.addInputPath(job1, job1InputPath);
		FileOutputFormat.setOutputPath(job1, job1OutputPath);
		
		// job2
		Job job2 = Job.getInstance(conf);
		job2.setJarByClass(ShareFriends.class);
		job2.setMapperClass(Mapper2.class);
		job2.setReducerClass(Reducer2.class);
		
		job2.setOutputKeyClass(Text.class);
		job2.setOutputValueClass(Text.class);
		
		FileInputFormat.addInputPath(job2, job1OutputPath);
		FileOutputFormat.setOutputPath(job2, job2OutputPath);
		
		// 控制依赖
		ControlledJob controlledJob1 = new ControlledJob(conf);
		ControlledJob controlledJob2 = new ControlledJob(conf);
		
		controlledJob1.setJob(job1);
		controlledJob2.setJob(job2);
		controlledJob2.addDependingJob(controlledJob1);
		
		JobControl jobControl = new JobControl("share-friends");
		jobControl.addJob(controlledJob1);
		jobControl.addJob(controlledJob2);

		// 创建线程,开始执行任务
		Thread shareFriendExecuteThread = new Thread(jobControl);
		shareFriendExecuteThread.start();
		
		while(!jobControl.allFinished()){
			TimeUnit.SECONDS.sleep(1);
		}

		jobControl.stop();
		
		//  弹出两个job的输出结果文件夹
		Runtime.getRuntime().exec("cmd.exe /c start " + job1OutputPath.toUri().getPath().substring(1));
		Runtime.getRuntime().exec("cmd.exe /c start " + job2OutputPath.toUri().getPath().substring(1));
	}
	
}

版权声明:本文为博主原创文章,未经博主允许不得转载。

vim配置

runtime! debian.vim
"设置编码
set encoding=utf-8
set fencs=utf-8,ucs-bom,shift-jis,gb18030,gbk,gb2312,cp936
set fileencodings=utf-8,ucs-bom,chinese
 
"语言设置
set langmenu=zh_CN.UTF-8
 
"设置语法高亮
syntax enable
syntax on
 
"设置配色方案
colorscheme torte
 
"可以在buffer的任何地方使用鼠标
set mouse=a
set selection=exclusive
set selectmode=mouse,key
 
"高亮显示匹配的括号
set showmatch
 
"去掉vi一致性
set nocompatible
 
"设置缩进
set tabstop=4
set softtabstop=4
set shiftwidth=4
set autoindent
set cindent
if &term=="xterm"
    set t_Co=8
    set t_Sb=^[[4%dm
    set t_Sf=^[[3%dm
endif
 
"打开文件类型自动检测功能
filetype on
 
"设置taglist
let Tlist_Show_One_File=0   "显示多个文件的tags
let Tlist_File_Fold_Auto_Close=1 "非当前文件,函数列表折叠隐藏
let Tlist_Exit_OnlyWindow=1 "在taglist是最后一个窗口时退出vim
let Tlist_Use_SingleClick=1 "单击时跳转
let Tlist_GainFocus_On_ToggleOpen=1 "打开taglist时获得输入焦点
let Tlist_Process_File_Always=1 "不管taglist窗口是否打开,始终解析文件中的tag
 
"设置WinManager插件
let g:winManagerWindowLayout='FileExplorer|TagList'
nmap wm :WMToggle
map   :WMToggle "将F9绑定至WinManager,即打开WimManager
 
"设置CSCOPE
set cscopequickfix=s-,c-,d-,i-,t-,e- "设定是否使用quickfix窗口显示cscope结果
 
"设置Grep插件
nnoremap   :Grep
 
"设置一键编译
map  :make
 
"设置自动补全
filetype plugin indent on   "打开文件类型检测
set completeopt=longest,menu "关掉智能补全时的预览窗口
 
"启动vim时如果存在tags则自动加载
if exists("tags")
    set tags=./tags
endif
 
"设置按F12就更新tags的方法
map  :call Do_CsTag()
nmap s :cs find s =expand(""):copen
nmap g :cs find g =expand("")
nmap c :cs find c =expand(""):copen
nmap t :cs find t =expand(""):copen
nmap e :cs find e =expand(""):copen
nmap f :cs find f =expand(""):copen
nmap i :cs find i ^=expand("")$:copen
nmap d :cs find d =expand(""):copen
function Do_CsTag()
        let dir = getcwd()
        if filereadable("tags")
            if(g:iswindows==1)
                let tagsdeleted=delete(dir."\\"."tags")
            else
                let tagsdeleted=delete("./"."tags")
            endif
            if(tagsdeleted!=0)
                echohl WarningMsg | echo "Fail to do tags! I cannot delete the tags" | echohl None
                return
            endif
        endif
         
        if has("cscope")
            silent! execute "cs kill -1"
        endif
         
        if filereadable("cscope.files")
            if(g:iswindows==1)
                let csfilesdeleted=delete(dir."\\"."cscope.files")
            else
                let csfilesdeleted=delete("./"."cscope.files")
            endif
            if(csfilesdeleted!=0)
                echohl WarningMsg | echo "Fail to do cscope! I cannot delete the cscope.files" | echohl None
                return
            endif
        endif
                                             
        if filereadable("cscope.out")
            if(g:iswindows==1)
                let csoutdeleted=delete(dir."\\"."cscope.out")
            else
                let csoutdeleted=delete("./"."cscope.out")
            endif
            if(csoutdeleted!=0)
                echohl WarningMsg | echo "Fail to do cscope! I cannot delete the cscope.out" | echohl None
                return
            endif
        endif
                                             
        if(executable('ctags'))
            "silent! execute "!ctags -R --c-types=+p --fields=+S *"
            silent! execute "!ctags -R --c++-kinds=+p --fields=+iaS --extra=+q ."
        endif
             
        if(executable('cscope') && has("cscope") )
            if(g:iswindows!=1)
                silent! execute "!find . -name '*.h' -o -name '*.c' -o -name '*.cpp' -o -name '*.java' -o -name '*.cs' > cscope.files"
            else
                silent! execute "!dir /s/b *.c,*.cpp,*.h,*.java,*.cs >> cscope.files"
            endif
            silent! execute "!cscope -b"
            execute "normal :"
                                                                     
            if filereadable("cscope.out")
                execute "cs add cscope.out"
            endif
        endif
endfunction
 
"设置默认shell
set shell=bash
 
"设置VIM记录的历史数
set history=400
 
"设置当文件被外部改变的时侯自动读入文件
if exists("&autoread")
    set autoread
endif
 
"设置ambiwidth
set ambiwidth=double
 
"设置文件类型
set ffs=unix,dos,mac
 
"设置增量搜索模式
set incsearch
 
"设置静音模式
set noerrorbells
set novisualbell
set t_vb=
 
"不要备份文件
set nobackup
set nowb

转自:http://www.oschina.net/code/snippet_76_2010

Lvs + Ngnix + Haproxy + Keepalived + Tomcat 实现三种HA软负载均衡和Tomcat Session共享

环境准备:


一、11台测试机器
    
    
    由于在内网测试,需要搭建个内网yum源,方便安装软件。yum所在机器为192.168.33.101

二、待实现功能
    下面分别使用haproxy/nginx/lvs实现HA + 负载均衡,软件环境如下:
        v1:33.81 nginx
        v2:33.82 nginx    
        v3:33.83 tomcat
        v4:33.84 tomcat
        v5:33.85 tomcat
        v6:33.86 tomcat
        v7:33.87 haproxy
        v8:33.88 haproxy
        v9:33.89 lvs
        v10:33.90 lvs


  

1. 安装依赖包

yum -y install pcre-devel
 
yum -y install openssl-devel
 
yum -y install gcc
 
yum -y install lrzsz
 
yum -y install openssh-clients

2. 安装nginx

2.1. 上传、解压、重命名nginx

su – root

1cd /usr/local

2rz -y

3tar -xzvf nginx-1.7.7.tar.gz

4mv nginx-1.7.

1进入解压后的目录,指定安装路径,
 
注:不指定prefix,则可执行文件默认放在/usr /local/bin,库文件默认放在/usr/local/lib,配置文件默认放在/usr/local/etc
 
cd /usr/local/nginx
 
./configure --prefix=/usr/local/nginx --conf-path=/usr/local/nginx/nginx.conf
 
2编译:
 
make
 
3安装:
 
make install
 
4启动
 
/usr/local/nginx/sbin/nginx
 
5查看
 
http://192.168.xx.xxx
 
出现:welcome Nginx,就安装ok了。
 
6停止
 
/usr/local/nginx/sbin/nginx s stop

2.2. 安装nginx

2.3. 安装jdk

1su - root 用户
 
2、进入usr目录
 
cd /usr
 
3、在usr目录下建立java安装目录
 
mkdir m 755 java
 
4、将jdk-6u24-linux-i586.bin拷贝到java目录下
 
rz -y
 
4、安装jdk
 
cd /usr/java
 
chmod 755 jdk-6u24-linux-i586.bin
 
./jdk-6u24-linux-i586.bin
 
注意:如果出现/lib/ld-linux.so.2: bad ELF interpreter:No such file or directory,安装下glic即可: yum install glibc.i686
 
5、安装完毕为他建立一个链接以节省目录长度
 
ln -s /usr/java/jdk1.6.0_24//usr/jdk
 
6、配置环境变量
 
vim /etc/profile
 
添加内容:
 
vi /etc/profile
 
export JAVA_HOME=/usr/jdk
 
export PATH=$PATH:$JAVA_HOME/bin
 
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
 
export JAVA_HOME PATH CLASSPATH
 
7、执行下命令(source命令也称为“点命令”,也就是一个点符号(.)。source命令通常用于重新执行刚修改的初始化文件,使之立即生效,而不必注销并重新登录。)
 
source /etc/profile

2.4. 安装tomcat

1上传、解压:
 
rz -y
 
tar -zxvf apache-tomcat-6.0.37.tar.gz
 
2、重命名:
 
mv apache-tomcat-6.0.37 tomcat

 

2.5. 重新配置nginx

1cd /usr/local/nginx
 
2vi /usr/local/nginx/nginx.conf

添加:

user nobody nobody;#定义Nginx运行的用户和用户组
 
worker_processes 4;#nginx进程数,建议设置为等于CPU总核心数。
 
error_log logs/error.log info;#全局错误日志定义类型,[ debug | info | notice | warn | error | crit ]
 
worker_rlimit_nofile 1024;#一个nginx进程打开的最多文件描述符数目,所以建议与ulimit -n的值保持一致。
 
pid logs/nginx.pid;#进程文件
 
 
#工作模式及连接数上限
 
events {
 
use epoll;#参考事件模型,use [ kqueue | rtsig | epoll |/dev/poll | select | poll ]; epoll模型是Linux2.6以上版本内核中的高性能网络I/O模型
 
worker_connections 1024;#单个进程最大连接数(最大连接数=连接数*进程数)
 
}
 
 
#设定http服务器,利用它的反向代理功能提供负载均衡支持
 
http {
 
include mime.types;#文件扩展名与文件类型映射表
 
default_type application/octet-stream;#默认文件类型
 
#设定负载均衡的服务器列表
 
upstream tomcatxxxcom {
 
server 192.168.56.200:8080;
 
server 192.168.56.201:8080;
 
}
 
#设定日志格式
 
log_format www_xy_com '$remote_addr - $remote_user [$time_local] "$request" '
 
'$status $body_bytes_sent "$http_referer" '
 
'"$http_user_agent" "$http_x_forwarded_for"';
 
 
sendfile on;#开启高效文件传输模式,sendfile指令指定nginx是否调用sendfile函数来输出文件,对于普通应用设为 on,如果用来进行下载等应用磁盘IO重负载应用,可设置为off,以平衡磁盘与网络I/O处理速度,降低系统的负载。注意:如果图片显示不正常把这个改成off
 
keepalive_timeout 65;#长连接超时时间,单位是秒
 
 
#gzip on;
 
#设定虚拟主机,默认为监听80端口
 
server {
 
listen 80;
 
server_name tomcat.xxx.com;#域名可以有多个,用空格隔开
 
 
#charset koi8-r;
 
#设定本虚拟主机的访问日志
 
access_log /data/logs/access.log www_xy_com;
 
#对 "/" 启用反向代理
 
location /{
 
proxy_pass http://tomcatxxxcom;
 
proxy_set_header Host $host;
 
proxy_set_header X-Real-IP $remote_addr;
 
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
 
}
 
 
#error_page 500 502 503 504 /50x.html;
 
location =/50x.html {
 
root html;
 
}
 
}
 
}

3、创建logs所需要的文件夹/data /logs/

cd /
 
mkdir m 755 data
 
cd data
 
mkdir m 755 logs

 

4、启动tomcatnginx

/usr/local/tomcat/bin/startup.sh
 
/usr/local/nginx/sbin/nginx
 

5、修改hosts,加入

192.168.56.99 tomcat.xxx.com

 

6、访问http://redis.xxy.com


上面部分安装,我是用脚本装的。没按照上面来,使用上面配置时,启动nginx时,提示我”[emerg] 7458#0: unknown directive “user” in /usr/local/nginx/nginx.conf:1“,

于是,我就直接拿着nginx.conf.default修改了,改改也能用。下面是我修改后的配置(v1下:/usr/local/nginx/nginx.conf):

worker_processes 1;
error_log logs/error.log info;
pid logs/nginx.pid;
 
events {
use epoll;
worker_connections 1024;
}
 
http {
include mime.types;
default_type application/octet-stream;
# 配置需要代理的服务器列表
upstream servers {
server v3:8080;
server v4:8080;
}
 
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for"';
 
access_log logs/access.log main;
 
sendfile on;
 
keepalive_timeout 65;
 
server {
listen 80;
server_name localhost;
 
access_log logs/host.access.log main;
 
location /{
#引用上面的配置
proxy_pass http://servers;
proxy_set_header Host $host;
proxy_set_header X-Real_IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
root html;
index index.html index.htm;
}
 
error_page 500502503504/50x.html;
location =/50x.html {
root html;
}
}
 
}
访问v1:80,使用rr策略自动轮巡v3,v4效果图:
    


对了,上面那句v3,v4的提示语,是我echo一句话把tomcat默认的ROOT/index.jsp 给覆盖了,用作测试
    1.首先使用keepalived + nginx + tomcat 实现

测试过程中发现,我收到将nginx kill之后keepalived居然ip不飘,检查了配置也没发现问题。唯一能让keepavlied执行notify.sh脚本时候就是在启动kp时,
会检查nginx是否存活,不存活则启动。 现在已经实现了
于是又加了个脚本单独去监控kp ,脚本名称为:monitor.sh,代码在下边
v1的keepalived脚本:
    
!ConfigurationFilefor keepalived
 
global_defs {
}
 
vrrp_script chk_nginx {
script "killall -0 nginx"
interval 1
weight -2
}
 
vrrp_instance VI_1 {
state MASTER
interface eth0
virtual_router_id 1
priority 100
advert_int 1
authentication {
auth_type PASS
auth_pass 1111
}
 
track_script {
chk_nginx
}
 
virtual_ipaddress {
192.168.33.181/24
}
 
notify_master "/etc/keepalived/notify.sh master"
notify_backup "/etc/keepalived/notify.sh backup"
notify_fault "/etc/keepalived/notify.sh fault"
}
 
vrrp_instance VI_2 {
state BACKUP
interface eth0
virtual_router_id 2
priority 98
advert_int 1
authentication {
auth_type PASS
auth_pass 1111
}
 
track_script {
chk_nginx
}
 
virtual_ipaddress {
192.168.33.182/24
}
 
notify_master "/etc/keepalived/notify.sh master"
notify_backup "/etc/keepalived/notify.sh backup"
notify_fault "/etc/keepalived/notify.sh fault"
}
v2的keepalived脚本:
!ConfigurationFilefor keepalived
 
global_defs {
}
 
vrrp_script chk_nginx {
script "killall -0 nginx"
interval 1
weight -2
}
 
 
vrrp_instance VI_1 {
state BACKUP
interface eth0
virtual_router_id 1
priority 98
advert_int 1
authentication {
auth_type PASS
auth_pass 1111
}
 
track_script {
chk_nginx
}
 
virtual_ipaddress {
192.168.33.181/24
}
 
notify_master "/etc/keepalived/notify.sh master"
notify_backup "/etc/keepalived/notify.sh backup"
notify_fault "/etc/keepalived/notify.sh fault"
}
 
 
vrrp_instance VI_2 {
state MASTER
interface eth0
virtual_router_id 2
priority 100
advert_int 1
authentication {
auth_type PASS
auth_pass 1111
}
 
track_script {
chk_nginx
}
 
virtual_ipaddress {
192.168.33.182/24
}
 
notify_master "/etc/keepalived/notify.sh master"
notify_backup "/etc/keepalived/notify.sh backup"
notify_fault "/etc/keepalived/notify.sh fault"
}
v1,v2共同脚本:
monitor.sh:
#!/bin/bash
while true;
do
A=`ps -ef|grep nginx |wc -l`
B=`ps -ef|grep keepalived |wc -l`
if[ $A -eq 1];then
echo 'restart nginx!!!!'
/usr/local/nginx/sbin/nginx
if[ $A -eq 1];then
if[ $B -gt 1];then
killall keepalived
service keepalived start &
fi
fi
fi
 
if[ $B -eq 1];then
service keepalived start &
fi
 
sleep 5
done
notify.sh
#!/bin/bash
# keepalived notify script
contact='root@localhost'
notify(){
mailsubject="`hostname` to be $1: vip floating"
mailbody="`date '+%F %H:%M:%S'`: vrrp transition, `hostname` changed to be $1"
echo $mailbody | mail -s "$mailsubject" $contact
}
 
case"$1"in
master)
notify master
echo "变成主了,触发master!">>/root/k_tips.log
/usr/local/nginx/sbin/nginx
exit 0
;;
backup)
notify backup
echo "变成从了,触发backup事件!">>/root/k_tips.log
/usr/local/nginx/sbin/nginx
exit 0
;;
fault)
notify fault
echo "脑列了,触发fault事件!">>/root/k_tips.log
/usr/local/nginx/sbin/nginx -s stop
exit 0
;;
*)
echo 'Usage: notify.sh {master|backup|fault}'
exit 1
;;
esac

    2.使用keepalived + haproxy + tomcat 实现
        2.1 安装haproxy
            yum -y install haproxy
        2.2 修改配置
            
# to have these messages end up in /var/log/haproxy.log you will
# need to:
#
# 1) configure syslog to accept network log events. This is done
# by adding the '-r' option to the SYSLOGD_OPTIONS in
# /etc/sysconfig/syslog
#
# 2) configure local2 events to go to the /var/log/haproxy.log
# file. A line like the following can be added to
# /etc/sysconfig/syslog
#
# local2.* /var/log/haproxy.log
#
# 上面说我们要把日志写到/var/log/haproxy.log需要修改以下两个地方:
1./etc/sysconfig/syslog , 这个我现在用的centos6.6已经没有了,取代它的是同目录下的rsyslog
vi /etc/sysconfig/rsyslog
这里对它做了以下更改:
修改前:SYSLOGD_OPTIONS="-c 5"
修改后:SYSLOGD_OPTIONS="-c 2 -r"
2. /etc/rsyslog.conf 修改日志存放位置
vi /etc/rsyslog.conf
我在local7.*下面加了一行,修改后是这样的:
# Save boot messages also to boot.loglocal7.* /var/log/boot.loglocal2.* /var/log/haproxy.log
好了,基本的配置可以了。
下面开始修改/etc/haproxy/haproxy.cfg
    
    3.使用keepalived + lvs + tomcat 实现

未完待续。。。

版权声明:本文为博主原创文章,未经博主允许不得转载。

HDFS写文件解析

  1. client通过DistributedFileSystem对象调用create()方法创建文件,实际上通过RPC调用了NameNode的方法。
  2. NameNode收到client的请求之后,执行各种检查(1.确认要传文件目前在HDFS上不存在,2.client具有写的权限)如果通过则会为新文件创建一条记录,并返回一个FSDataOutputStream对象,该对象负责DataNode和NameNode通讯。若检查未通过则抛出IOException。
  3. 客户端拿到流后,进行文件写入。stream会将数据分成一个个数据包(packet)并写入内部队列,称为数据队列(data queue)DataStreamer处理数据队列,它的责任是根据DataNode列表来要求NameNode分配适合的新块来存储副本。这一组DataNode构成一个管道(Pipeline)
    如果存在多个DataNode,那么包会在DataNode之间进行流式转发。 一直到最后一个节点
  4. DFSOutputStream也维护着一个确认队列(ack queue)。收到管道中所有DataNode确认信息后,该数据包才会从确认队列中删除。
  5. 所有的数据块都写完以后,close流。并且等待NameNode返回确认信息。
    上述是在正常写文件,并且没有发生故障的理想情况下。

    补充:
        如果数据写入期间DataNode发生故障,则执行以下操作:
             
  1.关闭管道(Pipeline)

             
  2.将队列中的所有数据包都添加回数据队列的最前端,确保故障节点下游的DataNode不会漏掉任何一个数据库包

             
  3.为存储在另一节点的当前数据块指定一个新的标识,并将标识发给NameNode. 便于故障节点恢复后可以删除其原来已经上传的部分数据。

             
  4.从管道中删除故障节点,并把数据块继续传输到其他正常DataNode节点。NameNode发现副本数不足时,会在另一个节点上创建一个新的副本。

版权声明:本文为博主原创文章,未经博主允许不得转载。

CentOS6.4配置163的yum源

CentOS系统自带的更新源的速度实在是慢,为了让CentOS6使用速度更快的YUM更新源,可以选择163(网易)的更新源。

1.下载repo文件

wget http://mirrors.163.com/.help/CentOS6-Base-163.repo

2.备份并替换系统的repo文件

[root@localhost ~]# cd /etc/yum.repos.d/
[root@localhost ~]# mv CentOS-Base.repo CentOS-Base.repo.bak
[root@localhost ~]# mv CentOS6-Base-163.repo CentOS-Base.repo

3.执行yum源更新

[root@localhost ~]# yum clean all
[root@localhost ~]# yum makecache
[root@localhost ~]# yum update

发布于:http://www.hiceon.com/topic/centos-6-config-163-yum

sqoop 和mysql相关操作

数据迁移工具,可以和RDBMS相互迁移数据

需要先将db driver copy to sqoop lib dir
注意:
    sqoop是以mapreduce的方式来运行任务的,用hdfs来存储数据的。所以依赖NameNode和ResourceManager,只要机器上配置了这两个就可以正常运行,程序运行时会自动读取环境变量.
// 导出car数据库中的carinfo表到hdfs的user里面
sqoop import –connect jdbc:mysql://192.168.32.1:3306/car –username root –password bill –table carinfo 
// 导出到/sqoop/file(hdfs上)  
–target-dir /sqoop/file
使用2个map任务来跑
 -m 2
使用制表符作为分隔符
–fields-terminated-by ‘\t’
只导出id和name列
–columns “id,name”
带上过滤条件    
–where ‘id > 2 and id <= 9'
带上查询语句,加上了query必须要带上这个   $CONDTIONS:动态拼接条件
–query ‘select * from user where id > 100 and  $CONDTIONS’ 
// 根据trande_detail.id来分给不同的map,在map多个的时候需要指定
// 单个map不需要加这个选项
–split-by trande_detail.id
案例:
sqoop import –connect jdbc:mysql://192.168.32.1:3306/h1 –username root –password bill –target-dir /sqoop/table/radiotype/all/ –fields-terminated-by ‘\t’ –table radiotype
sqoop import –connect jdbc:mysql://192.168.32.1:3306/h1 –where ‘id >= 3 and id <= 5' --target-dir /sqoop/table/radiotype/id3-5_03/ --fields-terminated-by '\t' -m 2 –table radiotype –username root –password bill
// 上面 -m 2居然没用,但是改成1又有用,待解决。!

将h1数据库中的radiotype表的记录导出到/sqoop/table/radiotype/assign-columns中,只导出(id,typename,imageurl)字段
sqoop import –connect jdbc:mysql://192.168.32.1:3306/h1 –table radiotype –target-dir /sqoop/table/radiotype/assign-columns/ –columns ‘id,typename,imageurl’ –fields-terminated-by ‘^’ -m 4 –username root –password bill
sqoop import –connect jdbc:mysql://192.168.32.1:3306/h1  –target-dir /sqoop/table/radiotype/custom_sql/ –username root –password bill –fields-terminated-by ‘标’ –query ‘select * from radiotype where id > 1 and $CONDITIONS
-m 1

// 使用query时,要带上$CONDITIONS 若是使用多个map需要手动指定–split-by ,否则会报错
sqoop import –connect jdbc:mysql://192.168.32.1/h1 –username root –password bill –query ‘select * from radiotype where id != 3 and $CONDITIONS’ –target-dir /sqoop/table/radiotype/custom_sql_02 –fields-terminated-by ‘^’  -m
4 –split-by radiotype.id


sqoop export –connect jdbc:mysql://192.168.32.1/h1 –table radiotype_export –export-dir /sqoop/table/radiotype/custom_sql/ –username root –password bill –fields-terminated-by ‘标’

版权声明:本文为博主原创文章,未经博主允许不得转载。

HBase shell

进入hbase命令行
./hbase shell
显示hbase中的表
list
创建user表,包含info、data两个列族
create ‘user’, ‘info1’, ‘data1’
create ‘user’, {NAME => ‘info’, VERSIONS => ‘3’}
向user表中插入信息,row key为rk0001,列族info中添加name列标示符,值为zhangsan
put ‘user’, ‘rk0001’, ‘info:name’, ‘zhangsan’
向user表中插入信息,row key为rk0001,列族info中添加gender列标示符,值为female
put ‘user’, ‘rk0001’, ‘info:gender’, ‘female’
向user表中插入信息,row key为rk0001,列族info中添加age列标示符,值为20
put ‘user’, ‘rk0001’, ‘info:age’, 20
向user表中插入信息,row key为rk0001,列族data中添加pic列标示符,值为picture
put ‘user’, ‘rk0001’, ‘data:pic’, ‘picture’
获取user表中row key为rk0001的所有信息
get ‘user’, ‘rk0001’
获取user表中row key为rk0001,info列族的所有信息
get ‘user’, ‘rk0001’, ‘info’
获取user表中row key为rk0001,info列族的name、age列标示符的信息
get ‘user’, ‘rk0001’, ‘info:name’, ‘info:age’
获取user表中row key为rk0001,info、data列族的信息
get ‘user’, ‘rk0001’, ‘info’, ‘data’
get ‘user’, ‘rk0001’, {COLUMN => [‘info’, ‘data’]}
get ‘user’, ‘rk0001’, {COLUMN => [‘info:name’, ‘data:pic’]}
获取user表中row key为rk0001,列族为info,版本号最新5个的信息
get ‘people’, ‘rk0002’, {COLUMN => ‘info’, VERSIONS => 2}
get ‘user’, ‘rk0001’, {COLUMN => ‘info:name’, VERSIONS => 5}
get ‘user’, ‘rk0001’, {COLUMN => ‘info:name’, VERSIONS => 5, TIMERANGE => [1392368783980, 1392380169184]}
获取user表中row key为rk0001,cell的值为zhangsan的信息
get ‘people’, ‘rk0001’, {FILTER => “ValueFilter(=, ‘binary:图片’)”}
获取user表中row key为rk0001,列标示符中含有a的信息
get ‘people’, ‘rk0001′, {FILTER => “(QualifierFilter(=,’substring:a’))”}
put ‘user’, ‘rk0002’, ‘info:name’, ‘fanbingbing’
put ‘user’, ‘rk0002’, ‘info:gender’, ‘female’
put ‘user’, ‘rk0002’, ‘info:nationality’, ‘中国’
get ‘user’, ‘rk0002’, {FILTER => “ValueFilter(=, ‘binary:中国’)”}
查询user表中的所有信息
scan ‘user’
查询user表中列族为info的信息
scan ‘people’, {COLUMNS => ‘info’}
scan ‘user’, {COLUMNS => ‘info’, RAW => true, VERSIONS => 5}
scan ‘persion’, {COLUMNS => ‘info’, RAW => true, VERSIONS => 3}
查询user表中列族为info和data的信息
scan ‘user’, {COLUMNS => [‘info’, ‘data’]}
scan ‘user’, {COLUMNS => [‘info:name’, ‘data:pic’]}
查询user表中列族为info、列标示符为name的信息
scan ‘user’, {COLUMNS => ‘info:name’}
查询user表中列族为info、列标示符为name的信息,并且版本最新的5个
scan ‘user’, {COLUMNS => ‘info:name’, VERSIONS => 5}
查询user表中列族为info和data且列标示符中含有a字符的信息
scan ‘people’, {COLUMNS => [‘info’, ‘data’], FILTER => “(QualifierFilter(=,’substring:a’))”}
查询user表中列族为info,rk范围是[rk0001, rk0003)的数据
scan ‘people’, {COLUMNS => ‘info’, STARTROW => ‘rk0001’, ENDROW => ‘rk0003’}
查询user表中row key以rk字符开头的
scan ‘user’,{FILTER=>”PrefixFilter(‘rk’)”}
查询user表中指定范围的数据
scan ‘user’, {TIMERANGE => [1392368783980, 1392380169184]}
删除数据
删除user表row key为rk0001,列标示符为info:name的数据
delete ‘people’, ‘rk0001’, ‘info:name’
删除user表row key为rk0001,列标示符为info:name,timestamp为1392383705316的数据
delete ‘user’, ‘rk0001’, ‘info:name’, 1392383705316
清空user表中的数据
truncate ‘people’
修改表结构
首先停用user表(新版本不用)
disable ‘user’
添加两个列族f1和f2
alter ‘people’, NAME => ‘f1’
alter ‘user’, NAME => ‘f2’
启用表
enable ‘user’
###disable ‘user'(新版本不用)
删除一个列族:
alter ‘user’, NAME => ‘f1’, METHOD => ‘delete’ 或 alter ‘user’, ‘delete’ => ‘f1’
添加列族f1同时删除列族f2
alter ‘user’, {NAME => ‘f1’}, {NAME => ‘f2’, METHOD => ‘delete’}
将user表的f1列族版本号改为5
alter ‘people’, NAME => ‘info’, VERSIONS => 5
启用表
enable ‘user’
删除表
disable ‘user’
drop ‘user’
get ‘person’, ‘rk0001’, {FILTER => “ValueFilter(=, ‘binary:中国’)”}
get ‘person’, ‘rk0001′, {FILTER => “(QualifierFilter(=,’substring:a’))”}
scan ‘person’, {COLUMNS => ‘info:name’}
scan ‘person’, {COLUMNS => [‘info’, ‘data’], FILTER => “(QualifierFilter(=,’substring:a’))”}
scan ‘person’, {COLUMNS => ‘info’, STARTROW => ‘rk0001’, ENDROW => ‘rk0003’}
scan ‘person’, {COLUMNS => ‘info’, STARTROW => ‘20140201’, ENDROW => ‘20140301’}
scan ‘person’, {COLUMNS => ‘info:name’, TIMERANGE => [1395978233636, 1395987769587]}
delete ‘person’, ‘rk0001’, ‘info:name’
alter ‘person’, NAME => ‘ffff’
alter ‘person’, NAME => ‘info’, VERSIONS => 10
get ‘user’, ‘rk0002’, {COLUMN => [‘info:name’, ‘data:pic’]}
scan ‘people’, {COLUMNS => ‘info’,RAW => true, VERSIONS => 3}
 

版权声明:本文为博主原创文章,未经博主允许不得转载。